blob: 70d4607c296ae29979ff6a7ea0a08883814e8b91 [file] [log] [blame]
From 0136ca731cba8b056b3f2ff0e7df3953b94f1e87 Mon Sep 17 00:00:00 2001
From: Tim Orling <tim.orling@konsulko.com>
Date: Sun, 24 Dec 2023 09:41:57 -0800
Subject: [PATCH 1/2] test_functionality: convert line endings to Unix
Convert the Windows line endings with dos2unix to be like the
other files in tests/*
Upstream-Status: Submitted [https://github.com/sumerc/yappi/pull/164]
Signed-off-by: Tim Orling <tim.orling@konsulko.com>
---
tests/test_functionality.py | 3822 +++++++++++++++++------------------
1 file changed, 1911 insertions(+), 1911 deletions(-)
diff --git a/tests/test_functionality.py b/tests/test_functionality.py
index 0e99c47..38bbe67 100644
--- a/tests/test_functionality.py
+++ b/tests/test_functionality.py
@@ -1,1911 +1,1911 @@
-import os
-import sys
-import time
-import threading
-import unittest
-import yappi
-import _yappi
-import utils
-import multiprocessing
-import subprocess
-
-_counter = 0
-
-
-class BasicUsage(utils.YappiUnitTestCase):
-
- def test_callback_function_int_return_overflow(self):
- # this test is just here to check if any errors are generated, as the err
- # is printed in C side, I did not include it here. THere are ways to test
- # this deterministically, I did not bother
- import ctypes
-
- def _unsigned_overflow_margin():
- return 2**(ctypes.sizeof(ctypes.c_void_p) * 8) - 1
-
- def foo():
- pass
-
- #with utils.captured_output() as (out, err):
- yappi.set_context_id_callback(_unsigned_overflow_margin)
- yappi.set_tag_callback(_unsigned_overflow_margin)
- yappi.start()
- foo()
-
- def test_issue60(self):
-
- def foo():
- buf = bytearray()
- buf += b't' * 200
- view = memoryview(buf)[10:]
- view = view.tobytes()
- del buf[:10] # this throws exception
- return view
-
- yappi.start(builtins=True)
- foo()
- self.assertTrue(
- len(
- yappi.get_func_stats(
- filter_callback=lambda x: yappi.
- func_matches(x, [memoryview.tobytes])
- )
- ) > 0
- )
- yappi.stop()
-
- def test_issue54(self):
-
- def _tag_cbk():
- global _counter
- _counter += 1
- return _counter
-
- def a():
- pass
-
- def b():
- pass
-
- yappi.set_tag_callback(_tag_cbk)
- yappi.start()
- a()
- a()
- a()
- yappi.stop()
- stats = yappi.get_func_stats()
- self.assertEqual(stats.pop().ncall, 3) # aggregated if no tag is given
- stats = yappi.get_func_stats(tag=1)
-
- for i in range(1, 3):
- stats = yappi.get_func_stats(tag=i)
- stats = yappi.get_func_stats(
- tag=i, filter_callback=lambda x: yappi.func_matches(x, [a])
- )
-
- stat = stats.pop()
- self.assertEqual(stat.ncall, 1)
-
- yappi.set_tag_callback(None)
- yappi.clear_stats()
- yappi.start()
- b()
- b()
- stats = yappi.get_func_stats()
- self.assertEqual(len(stats), 1)
- stat = stats.pop()
- self.assertEqual(stat.ncall, 2)
-
- def test_filter(self):
-
- def a():
- pass
-
- def b():
- a()
-
- def c():
- b()
-
- _TCOUNT = 5
-
- ts = []
- yappi.start()
- for i in range(_TCOUNT):
- t = threading.Thread(target=c)
- t.start()
- ts.append(t)
-
- for t in ts:
- t.join()
-
- yappi.stop()
-
- ctx_ids = []
- for tstat in yappi.get_thread_stats():
- if tstat.name == '_MainThread':
- main_ctx_id = tstat.id
- else:
- ctx_ids.append(tstat.id)
-
- fstats = yappi.get_func_stats(filter={"ctx_id": 9})
- self.assertTrue(fstats.empty())
- fstats = yappi.get_func_stats(
- filter={
- "ctx_id": main_ctx_id,
- "name": "c"
- }
- ) # main thread
- self.assertTrue(fstats.empty())
-
- for i in ctx_ids:
- fstats = yappi.get_func_stats(
- filter={
- "ctx_id": i,
- "name": "a",
- "ncall": 1
- }
- )
- self.assertEqual(fstats.pop().ncall, 1)
- fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "b"})
- self.assertEqual(fstats.pop().ncall, 1)
- fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "c"})
- self.assertEqual(fstats.pop().ncall, 1)
-
- yappi.clear_stats()
- yappi.start(builtins=True)
- time.sleep(0.1)
- yappi.stop()
- fstats = yappi.get_func_stats(filter={"module": "time"})
- self.assertEqual(len(fstats), 1)
-
- # invalid filters`
- self.assertRaises(
- Exception, yappi.get_func_stats, filter={'tag': "sss"}
- )
- self.assertRaises(
- Exception, yappi.get_func_stats, filter={'ctx_id': "None"}
- )
-
- def test_filter_callback(self):
-
- def a():
- time.sleep(0.1)
-
- def b():
- a()
-
- def c():
- pass
-
- def d():
- pass
-
- yappi.set_clock_type("wall")
- yappi.start(builtins=True)
- a()
- b()
- c()
- d()
- stats = yappi.get_func_stats(
- filter_callback=lambda x: yappi.func_matches(x, [a, b])
- )
- #stats.print_all()
- r1 = '''
- tests/test_functionality.py:98 a 2 0.000000 0.200350 0.100175
- tests/test_functionality.py:101 b 1 0.000000 0.120000 0.100197
- '''
- self.assert_traces_almost_equal(r1, stats)
- self.assertEqual(len(stats), 2)
- stats = yappi.get_func_stats(
- filter_callback=lambda x: yappi.
- module_matches(x, [sys.modules[__name__]])
- )
- r1 = '''
- tests/test_functionality.py:98 a 2 0.000000 0.230130 0.115065
- tests/test_functionality.py:101 b 1 0.000000 0.120000 0.109011
- tests/test_functionality.py:104 c 1 0.000000 0.000002 0.000002
- tests/test_functionality.py:107 d 1 0.000000 0.000001 0.000001
- '''
- self.assert_traces_almost_equal(r1, stats)
- self.assertEqual(len(stats), 4)
-
- stats = yappi.get_func_stats(
- filter_callback=lambda x: yappi.func_matches(x, [time.sleep])
- )
- self.assertEqual(len(stats), 1)
- r1 = '''
- time.sleep 2 0.206804 0.220000 0.103402
- '''
- self.assert_traces_almost_equal(r1, stats)
-
- def test_print_formatting(self):
-
- def a():
- pass
-
- def b():
- a()
-
- func_cols = {
- 1: ("name", 48),
- 0: ("ncall", 5),
- 2: ("tsub", 8),
- }
- thread_cols = {
- 1: ("name", 48),
- 0: ("ttot", 8),
- }
-
- yappi.start()
- a()
- b()
- yappi.stop()
- fs = yappi.get_func_stats()
- cs = fs[1].children
- ts = yappi.get_thread_stats()
- #fs.print_all(out=sys.stderr, columns={1:("name", 70), })
- #cs.print_all(out=sys.stderr, columns=func_cols)
- #ts.print_all(out=sys.stderr, columns=thread_cols)
- #cs.print_all(out=sys.stderr, columns={})
-
- self.assertRaises(
- yappi.YappiError, fs.print_all, columns={1: ("namee", 9)}
- )
- self.assertRaises(
- yappi.YappiError, cs.print_all, columns={1: ("dd", 0)}
- )
- self.assertRaises(
- yappi.YappiError, ts.print_all, columns={1: ("tidd", 0)}
- )
-
- def test_get_clock(self):
- yappi.set_clock_type('cpu')
- self.assertEqual('cpu', yappi.get_clock_type())
- clock_info = yappi.get_clock_info()
- self.assertTrue('api' in clock_info)
- self.assertTrue('resolution' in clock_info)
-
- yappi.set_clock_type('wall')
- self.assertEqual('wall', yappi.get_clock_type())
-
- t0 = yappi.get_clock_time()
- time.sleep(0.1)
- duration = yappi.get_clock_time() - t0
- self.assertTrue(0.05 < duration < 0.3)
-
- def test_profile_decorator(self):
-
- def aggregate(func, stats):
- fname = f"tests/{func.__name__}.profile"
- try:
- stats.add(fname)
- except OSError:
- pass
- stats.save(fname)
- raise Exception("messing around")
-
- @yappi.profile(return_callback=aggregate)
- def a(x, y):
- if x + y == 25:
- raise Exception("")
- return x + y
-
- def b():
- pass
-
- try:
- os.remove(
- "tests/a.profile"
- ) # remove the one from prev test, if available
- except:
- pass
-
- # global profile is on to mess things up
- yappi.start()
- b()
-
- # assert functionality and call function at same time
- try:
- self.assertEqual(a(1, 2), 3)
- except:
- pass
- try:
- self.assertEqual(a(2, 5), 7)
- except:
- pass
- try:
- a(4, 21)
- except:
- pass
- stats = yappi.get_func_stats().add("tests/a.profile")
- fsa = utils.find_stat_by_name(stats, 'a')
- self.assertEqual(fsa.ncall, 3)
- self.assertEqual(len(stats), 1) # b() should be cleared out.
-
- @yappi.profile(return_callback=aggregate)
- def count_down_rec(n):
- if n == 0:
- return
- count_down_rec(n - 1)
-
- try:
- os.remove(
- "tests/count_down_rec.profile"
- ) # remove the one from prev test, if available
- except:
- pass
-
- try:
- count_down_rec(4)
- except:
- pass
- try:
- count_down_rec(3)
- except:
- pass
-
- stats = yappi.YFuncStats("tests/count_down_rec.profile")
- fsrec = utils.find_stat_by_name(stats, 'count_down_rec')
- self.assertEqual(fsrec.ncall, 9)
- self.assertEqual(fsrec.nactualcall, 2)
-
- def test_strip_dirs(self):
-
- def a():
- pass
-
- stats = utils.run_and_get_func_stats(a, )
- stats.strip_dirs()
- fsa = utils.find_stat_by_name(stats, "a")
- self.assertEqual(fsa.module, os.path.basename(fsa.module))
-
- @unittest.skipIf(os.name == "nt", "do not run on Windows")
- def test_run_as_script(self):
- import re
- p = subprocess.Popen(
- ['yappi', os.path.join('./tests', 'run_as_script.py')],
- stdout=subprocess.PIPE
- )
- out, err = p.communicate()
- self.assertEqual(p.returncode, 0)
- func_stats, thread_stats = re.split(
- b'name\\s+id\\s+tid\\s+ttot\\s+scnt\\s*\n', out
- )
- self.assertTrue(b'FancyThread' in thread_stats)
-
- def test_yappi_overhead(self):
- LOOP_COUNT = 100000
-
- def a():
- pass
-
- def b():
- for i in range(LOOP_COUNT):
- a()
-
- t0 = time.time()
- yappi.start()
- b()
- yappi.stop()
- time_with_yappi = time.time() - t0
- t0 = time.time()
- b()
- time_without_yappi = time.time() - t0
- if time_without_yappi == 0:
- time_without_yappi = 0.000001
-
- # in latest v0.82, I calculated this as close to "7.0" in my machine.
- # however, %83 of this overhead is coming from tickcount(). The other %17
- # seems to have been evenly distributed to the internal bookkeeping
- # structures/algorithms which seems acceptable. Note that our test only
- # tests one function being profiled at-a-time in a short interval.
- # profiling high number of functions in a small time
- # is a different beast, (which is pretty unlikely in most applications)
- # So as a conclusion: I cannot see any optimization window for Yappi that
- # is worth implementing as we will only optimize %17 of the time.
- sys.stderr.write("\r\nYappi puts %0.1f times overhead to the profiled application in average.\r\n" % \
- (time_with_yappi / time_without_yappi))
-
- def test_clear_stats_while_running(self):
-
- def a():
- pass
-
- yappi.start()
- a()
- yappi.clear_stats()
- a()
- stats = yappi.get_func_stats()
- fsa = utils.find_stat_by_name(stats, 'a')
- self.assertEqual(fsa.ncall, 1)
-
- def test_generator(self):
-
- def _gen(n):
- while (n > 0):
- yield n
- n -= 1
-
- yappi.start()
- for x in _gen(5):
- pass
- self.assertTrue(
- yappi.convert2pstats(yappi.get_func_stats()) is not None
- )
-
- def test_slice_child_stats_and_strip_dirs(self):
-
- def b():
- for i in range(10000000):
- pass
-
- def a():
- b()
-
- yappi.start(builtins=True)
- a()
- stats = yappi.get_func_stats()
- fsa = utils.find_stat_by_name(stats, 'a')
- fsb = utils.find_stat_by_name(stats, 'b')
- self.assertTrue(fsa.children[0:1] is not None)
- prev_afullname = fsa.full_name
- prev_bchildfullname = fsa.children[fsb].full_name
- stats.strip_dirs()
- self.assertTrue(len(prev_afullname) > len(fsa.full_name))
- self.assertTrue(
- len(prev_bchildfullname) > len(fsa.children[fsb].full_name)
- )
-
- def test_children_stat_functions(self):
- _timings = {"a_1": 5, "b_1": 3, "c_1": 1}
- _yappi._set_test_timings(_timings)
-
- def b():
- pass
-
- def c():
- pass
-
- def a():
- b()
- c()
-
- yappi.start()
- a()
- b() # non-child call
- c() # non-child call
- stats = yappi.get_func_stats()
- fsa = utils.find_stat_by_name(stats, 'a')
- childs_of_a = fsa.children.get().sort("tavg", "desc")
- prev_item = None
- for item in childs_of_a:
- if prev_item:
- self.assertTrue(prev_item.tavg > item.tavg)
- prev_item = item
- childs_of_a.sort("name", "desc")
- prev_item = None
- for item in childs_of_a:
- if prev_item:
- self.assertTrue(prev_item.name > item.name)
- prev_item = item
- childs_of_a.clear()
- self.assertTrue(childs_of_a.empty())
-
- def test_no_stats_different_clock_type_load(self):
-
- def a():
- pass
-
- yappi.start()
- a()
- yappi.stop()
- yappi.get_func_stats().save("tests/ystats1.ys")
- yappi.clear_stats()
- yappi.set_clock_type("WALL")
- yappi.start()
- yappi.stop()
- stats = yappi.get_func_stats().add("tests/ystats1.ys")
- fsa = utils.find_stat_by_name(stats, 'a')
- self.assertTrue(fsa is not None)
-
- def test_subsequent_profile(self):
- _timings = {"a_1": 1, "b_1": 1}
- _yappi._set_test_timings(_timings)
-
- def a():
- pass
-
- def b():
- pass
-
- yappi.start()
- a()
- yappi.stop()
- yappi.start()
- b()
- yappi.stop()
- stats = yappi.get_func_stats()
- fsa = utils.find_stat_by_name(stats, 'a')
- fsb = utils.find_stat_by_name(stats, 'b')
- self.assertTrue(fsa is not None)
- self.assertTrue(fsb is not None)
- self.assertEqual(fsa.ttot, 1)
- self.assertEqual(fsb.ttot, 1)
-
- def test_lambda(self):
- f = lambda: time.sleep(0.3)
- yappi.set_clock_type("wall")
- yappi.start()
- f()
- stats = yappi.get_func_stats()
- fsa = utils.find_stat_by_name(stats, '<lambda>')
- self.assertTrue(fsa.ttot > 0.1)
-
- def test_module_stress(self):
- self.assertEqual(yappi.is_running(), False)
-
- yappi.start()
- yappi.clear_stats()
- self.assertRaises(_yappi.error, yappi.set_clock_type, "wall")
-
- yappi.stop()
- yappi.clear_stats()
- yappi.set_clock_type("cpu")
- self.assertRaises(yappi.YappiError, yappi.set_clock_type, "dummy")
- self.assertEqual(yappi.is_running(), False)
- yappi.clear_stats()
- yappi.clear_stats()
-
- def test_stat_sorting(self):
- _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1}
- _yappi._set_test_timings(_timings)
-
- self._ncall = 1
-
- def a():
- b()
-
- def b():
- if self._ncall == 2:
- return
- self._ncall += 1
- a()
-
- stats = utils.run_and_get_func_stats(a)
- stats = stats.sort("totaltime", "desc")
- prev_stat = None
- for stat in stats:
- if prev_stat:
- self.assertTrue(prev_stat.ttot >= stat.ttot)
- prev_stat = stat
- stats = stats.sort("totaltime", "asc")
- prev_stat = None
- for stat in stats:
- if prev_stat:
- self.assertTrue(prev_stat.ttot <= stat.ttot)
- prev_stat = stat
- stats = stats.sort("avgtime", "asc")
- prev_stat = None
- for stat in stats:
- if prev_stat:
- self.assertTrue(prev_stat.tavg <= stat.tavg)
- prev_stat = stat
- stats = stats.sort("name", "asc")
- prev_stat = None
- for stat in stats:
- if prev_stat:
- self.assertTrue(prev_stat.name <= stat.name)
- prev_stat = stat
- stats = stats.sort("subtime", "asc")
- prev_stat = None
- for stat in stats:
- if prev_stat:
- self.assertTrue(prev_stat.tsub <= stat.tsub)
- prev_stat = stat
-
- self.assertRaises(
- yappi.YappiError, stats.sort, "invalid_func_sorttype_arg"
- )
- self.assertRaises(
- yappi.YappiError, stats.sort, "totaltime",
- "invalid_func_sortorder_arg"
- )
-
- def test_start_flags(self):
- self.assertEqual(_yappi._get_start_flags(), None)
- yappi.start()
-
- def a():
- pass
-
- a()
- self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0)
- self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1)
- self.assertEqual(len(yappi.get_thread_stats()), 1)
-
- def test_builtin_profiling(self):
-
- def a():
- time.sleep(0.4) # is a builtin function
-
- yappi.set_clock_type('wall')
-
- yappi.start(builtins=True)
- a()
- stats = yappi.get_func_stats()
- fsa = utils.find_stat_by_name(stats, 'sleep')
- self.assertTrue(fsa is not None)
- self.assertTrue(fsa.ttot > 0.3)
- yappi.stop()
- yappi.clear_stats()
-
- def a():
- pass
-
- yappi.start()
- t = threading.Thread(target=a)
- t.start()
- t.join()
- stats = yappi.get_func_stats()
-
- def test_singlethread_profiling(self):
- yappi.set_clock_type('wall')
-
- def a():
- time.sleep(0.2)
-
- class Worker1(threading.Thread):
-
- def a(self):
- time.sleep(0.3)
-
- def run(self):
- self.a()
-
- yappi.start(profile_threads=False)
-
- c = Worker1()
- c.start()
- c.join()
- a()
- stats = yappi.get_func_stats()
- fsa1 = utils.find_stat_by_name(stats, 'Worker1.a')
- fsa2 = utils.find_stat_by_name(stats, 'a')
- self.assertTrue(fsa1 is None)
- self.assertTrue(fsa2 is not None)
- self.assertTrue(fsa2.ttot > 0.1)
-
- def test_run(self):
-
- def profiled():
- pass
-
- yappi.clear_stats()
- try:
- with yappi.run():
- profiled()
- stats = yappi.get_func_stats()
- finally:
- yappi.clear_stats()
-
- self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled'))
-
- def test_run_recursive(self):
-
- def profiled():
- pass
-
- def not_profiled():
- pass
-
- yappi.clear_stats()
- try:
- with yappi.run():
- with yappi.run():
- profiled()
- # Profiling stopped here
- not_profiled()
- stats = yappi.get_func_stats()
- finally:
- yappi.clear_stats()
-
- self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled'))
- self.assertIsNone(utils.find_stat_by_name(stats, 'not_profiled'))
-
-
-class StatSaveScenarios(utils.YappiUnitTestCase):
-
- def test_pstats_conversion(self):
-
- def pstat_id(fs):
- return (fs.module, fs.lineno, fs.name)
-
- def a():
- d()
-
- def b():
- d()
-
- def c():
- pass
-
- def d():
- pass
-
- _timings = {"a_1": 12, "b_1": 7, "c_1": 5, "d_1": 2}
- _yappi._set_test_timings(_timings)
- stats = utils.run_and_get_func_stats(a, )
- stats.strip_dirs()
- stats.save("tests/a1.pstats", type="pstat")
- fsa_pid = pstat_id(utils.find_stat_by_name(stats, "a"))
- fsd_pid = pstat_id(utils.find_stat_by_name(stats, "d"))
- yappi.clear_stats()
- _yappi._set_test_timings(_timings)
- stats = utils.run_and_get_func_stats(a, )
- stats.strip_dirs()
- stats.save("tests/a2.pstats", type="pstat")
- yappi.clear_stats()
- _yappi._set_test_timings(_timings)
- stats = utils.run_and_get_func_stats(b, )
- stats.strip_dirs()
- stats.save("tests/b1.pstats", type="pstat")
- fsb_pid = pstat_id(utils.find_stat_by_name(stats, "b"))
- yappi.clear_stats()
- _yappi._set_test_timings(_timings)
- stats = utils.run_and_get_func_stats(c, )
- stats.strip_dirs()
- stats.save("tests/c1.pstats", type="pstat")
- fsc_pid = pstat_id(utils.find_stat_by_name(stats, "c"))
-
- # merge saved stats and check pstats values are correct
- import pstats
- p = pstats.Stats(
- 'tests/a1.pstats', 'tests/a2.pstats', 'tests/b1.pstats',
- 'tests/c1.pstats'
- )
- p.strip_dirs()
- # ct = ttot, tt = tsub
- (cc, nc, tt, ct, callers) = p.stats[fsa_pid]
- self.assertEqual(cc, nc, 2)
- self.assertEqual(tt, 20)
- self.assertEqual(ct, 24)
- (cc, nc, tt, ct, callers) = p.stats[fsd_pid]
- self.assertEqual(cc, nc, 3)
- self.assertEqual(tt, 6)
- self.assertEqual(ct, 6)
- self.assertEqual(len(callers), 2)
- (cc, nc, tt, ct) = callers[fsa_pid]
- self.assertEqual(cc, nc, 2)
- self.assertEqual(tt, 4)
- self.assertEqual(ct, 4)
- (cc, nc, tt, ct) = callers[fsb_pid]
- self.assertEqual(cc, nc, 1)
- self.assertEqual(tt, 2)
- self.assertEqual(ct, 2)
-
- def test_merge_stats(self):
- _timings = {
- "a_1": 15,
- "b_1": 14,
- "c_1": 12,
- "d_1": 10,
- "e_1": 9,
- "f_1": 7,
- "g_1": 6,
- "h_1": 5,
- "i_1": 1
- }
- _yappi._set_test_timings(_timings)
-
- def a():
- b()
-
- def b():
- c()
-
- def c():
- d()
-
- def d():
- e()
-
- def e():
- f()
-
- def f():
- g()
-
- def g():
- h()
-
- def h():
- i()
-
- def i():
- pass
-
- yappi.start()
- a()
- a()
- yappi.stop()
- stats = yappi.get_func_stats()
- self.assertRaises(
- NotImplementedError, stats.save, "", "INVALID_SAVE_TYPE"
- )
- stats.save("tests/ystats2.ys")
- yappi.clear_stats()
- _yappi._set_test_timings(_timings)
- yappi.start()
- a()
- stats = yappi.get_func_stats().add("tests/ystats2.ys")
- fsa = utils.find_stat_by_name(stats, "a")
- fsb = utils.find_stat_by_name(stats, "b")
- fsc = utils.find_stat_by_name(stats, "c")
- fsd = utils.find_stat_by_name(stats, "d")
- fse = utils.find_stat_by_name(stats, "e")
- fsf = utils.find_stat_by_name(stats, "f")
- fsg = utils.find_stat_by_name(stats, "g")
- fsh = utils.find_stat_by_name(stats, "h")
- fsi = utils.find_stat_by_name(stats, "i")
- self.assertEqual(fsa.ttot, 45)
- self.assertEqual(fsa.ncall, 3)
- self.assertEqual(fsa.nactualcall, 3)
- self.assertEqual(fsa.tsub, 3)
- self.assertEqual(fsa.children[fsb].ttot, fsb.ttot)
- self.assertEqual(fsa.children[fsb].tsub, fsb.tsub)
- self.assertEqual(fsb.children[fsc].ttot, fsc.ttot)
- self.assertEqual(fsb.children[fsc].tsub, fsc.tsub)
- self.assertEqual(fsc.tsub, 6)
- self.assertEqual(fsc.children[fsd].ttot, fsd.ttot)
- self.assertEqual(fsc.children[fsd].tsub, fsd.tsub)
- self.assertEqual(fsd.children[fse].ttot, fse.ttot)
- self.assertEqual(fsd.children[fse].tsub, fse.tsub)
- self.assertEqual(fse.children[fsf].ttot, fsf.ttot)
- self.assertEqual(fse.children[fsf].tsub, fsf.tsub)
- self.assertEqual(fsf.children[fsg].ttot, fsg.ttot)
- self.assertEqual(fsf.children[fsg].tsub, fsg.tsub)
- self.assertEqual(fsg.ttot, 18)
- self.assertEqual(fsg.tsub, 3)
- self.assertEqual(fsg.children[fsh].ttot, fsh.ttot)
- self.assertEqual(fsg.children[fsh].tsub, fsh.tsub)
- self.assertEqual(fsh.ttot, 15)
- self.assertEqual(fsh.tsub, 12)
- self.assertEqual(fsh.tavg, 5)
- self.assertEqual(fsh.children[fsi].ttot, fsi.ttot)
- self.assertEqual(fsh.children[fsi].tsub, fsi.tsub)
- #stats.debug_print()
-
- def test_merge_multithreaded_stats(self):
- import _yappi
- timings = {"a_1": 2, "b_1": 1}
- _yappi._set_test_timings(timings)
-
- def a():
- pass
-
- def b():
- pass
-
- yappi.start()
- t = threading.Thread(target=a)
- t.start()
- t.join()
- t = threading.Thread(target=b)
- t.start()
- t.join()
- yappi.get_func_stats().save("tests/ystats1.ys")
- yappi.clear_stats()
- _yappi._set_test_timings(timings)
- self.assertEqual(len(yappi.get_func_stats()), 0)
- self.assertEqual(len(yappi.get_thread_stats()), 1)
- t = threading.Thread(target=a)
- t.start()
- t.join()
-
- self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0)
- self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1)
- yappi.get_func_stats().save("tests/ystats2.ys")
-
- stats = yappi.YFuncStats([
- "tests/ystats1.ys",
- "tests/ystats2.ys",
- ])
- fsa = utils.find_stat_by_name(stats, "a")
- fsb = utils.find_stat_by_name(stats, "b")
- self.assertEqual(fsa.ncall, 2)
- self.assertEqual(fsb.ncall, 1)
- self.assertEqual(fsa.tsub, fsa.ttot, 4)
- self.assertEqual(fsb.tsub, fsb.ttot, 1)
-
- def test_merge_load_different_clock_types(self):
- yappi.start(builtins=True)
-
- def a():
- b()
-
- def b():
- c()
-
- def c():
- pass
-
- t = threading.Thread(target=a)
- t.start()
- t.join()
- yappi.get_func_stats().sort("name", "asc").save("tests/ystats1.ys")
- yappi.stop()
- yappi.clear_stats()
- yappi.start(builtins=False)
- t = threading.Thread(target=a)
- t.start()
- t.join()
- yappi.get_func_stats().save("tests/ystats2.ys")
- yappi.stop()
- self.assertRaises(_yappi.error, yappi.set_clock_type, "wall")
- yappi.clear_stats()
- yappi.set_clock_type("wall")
- yappi.start()
- t = threading.Thread(target=a)
- t.start()
- t.join()
- yappi.get_func_stats().save("tests/ystats3.ys")
- self.assertRaises(
- yappi.YappiError,
- yappi.YFuncStats().add("tests/ystats1.ys").add, "tests/ystats3.ys"
- )
- stats = yappi.YFuncStats(["tests/ystats1.ys",
- "tests/ystats2.ys"]).sort("name")
- fsa = utils.find_stat_by_name(stats, "a")
- fsb = utils.find_stat_by_name(stats, "b")
- fsc = utils.find_stat_by_name(stats, "c")
- self.assertEqual(fsa.ncall, 2)
- self.assertEqual(fsa.ncall, fsb.ncall, fsc.ncall)
-
- def test_merge_aabab_aabbc(self):
- _timings = {
- "a_1": 15,
- "a_2": 14,
- "b_1": 12,
- "a_3": 10,
- "b_2": 9,
- "c_1": 4
- }
- _yappi._set_test_timings(_timings)
-
- def a():
- if self._ncall == 1:
- self._ncall += 1
- a()
- elif self._ncall == 5:
- self._ncall += 1
- a()
- else:
- b()
-
- def b():
- if self._ncall == 2:
- self._ncall += 1
- a()
- elif self._ncall == 6:
- self._ncall += 1
- b()
- elif self._ncall == 7:
- c()
- else:
- return
-
- def c():
- pass
-
- self._ncall = 1
- stats = utils.run_and_get_func_stats(a, )
- stats.save("tests/ystats1.ys")
- yappi.clear_stats()
- _yappi._set_test_timings(_timings)
- #stats.print_all()
-
- self._ncall = 5
- stats = utils.run_and_get_func_stats(a, )
- stats.save("tests/ystats2.ys")
-
- #stats.print_all()
-
- def a(): # same name but another function(code object)
- pass
-
- yappi.start()
- a()
- stats = yappi.get_func_stats().add(
- ["tests/ystats1.ys", "tests/ystats2.ys"]
- )
- #stats.print_all()
- self.assertEqual(len(stats), 4)
-
- fsa = None
- for stat in stats:
- if stat.name == "a" and stat.ttot == 45:
- fsa = stat
- break
- self.assertTrue(fsa is not None)
-
- self.assertEqual(fsa.ncall, 7)
- self.assertEqual(fsa.nactualcall, 3)
- self.assertEqual(fsa.ttot, 45)
- self.assertEqual(fsa.tsub, 10)
- fsb = utils.find_stat_by_name(stats, "b")
- fsc = utils.find_stat_by_name(stats, "c")
- self.assertEqual(fsb.ncall, 6)
- self.assertEqual(fsb.nactualcall, 3)
- self.assertEqual(fsb.ttot, 36)
- self.assertEqual(fsb.tsub, 27)
- self.assertEqual(fsb.tavg, 6)
- self.assertEqual(fsc.ttot, 8)
- self.assertEqual(fsc.tsub, 8)
- self.assertEqual(fsc.tavg, 4)
- self.assertEqual(fsc.nactualcall, fsc.ncall, 2)
-
-
-class MultithreadedScenarios(utils.YappiUnitTestCase):
-
- def test_issue_32(self):
- '''
- Start yappi from different thread and we get Internal Error(15) as
- the current_ctx_id() called while enumerating the threads in start()
- and as it does not swap to the enumerated ThreadState* the THreadState_GetDict()
- returns wrong object and thus sets an invalid id for the _ctx structure.
-
- When this issue happens multiple Threads have same tid as the internal ts_ptr
- will be same for different contexts. So, let's see if that happens
- '''
-
- def foo():
- time.sleep(0.2)
-
- def bar():
- time.sleep(0.1)
-
- def thread_func():
- yappi.set_clock_type("wall")
- yappi.start()
-
- bar()
-
- t = threading.Thread(target=thread_func)
- t.start()
- t.join()
-
- foo()
-
- yappi.stop()
-
- thread_ids = set()
- for tstat in yappi.get_thread_stats():
- self.assertTrue(tstat.tid not in thread_ids)
- thread_ids.add(tstat.tid)
-
- def test_subsequent_profile(self):
- WORKER_COUNT = 5
-
- def a():
- pass
-
- def b():
- pass
-
- def c():
- pass
-
- _timings = {
- "a_1": 3,
- "b_1": 2,
- "c_1": 1,
- }
-
- yappi.start()
-
- def g():
- pass
-
- g()
- yappi.stop()
- yappi.clear_stats()
- _yappi._set_test_timings(_timings)
- yappi.start()
-
- _dummy = []
- for i in range(WORKER_COUNT):
- t = threading.Thread(target=a)
- t.start()
- t.join()
- for i in range(WORKER_COUNT):
- t = threading.Thread(target=b)
- t.start()
- _dummy.append(t)
- t.join()
- for i in range(WORKER_COUNT):
- t = threading.Thread(target=a)
- t.start()
- t.join()
- for i in range(WORKER_COUNT):
- t = threading.Thread(target=c)
- t.start()
- t.join()
- yappi.stop()
- yappi.start()
-
- def f():
- pass
-
- f()
- stats = yappi.get_func_stats()
- fsa = utils.find_stat_by_name(stats, 'a')
- fsb = utils.find_stat_by_name(stats, 'b')
- fsc = utils.find_stat_by_name(stats, 'c')
- self.assertEqual(fsa.ncall, 10)
- self.assertEqual(fsb.ncall, 5)
- self.assertEqual(fsc.ncall, 5)
- self.assertEqual(fsa.ttot, fsa.tsub, 30)
- self.assertEqual(fsb.ttot, fsb.tsub, 10)
- self.assertEqual(fsc.ttot, fsc.tsub, 5)
-
- # MACOSx optimizes by only creating one worker thread
- self.assertTrue(len(yappi.get_thread_stats()) >= 2)
-
- def test_basic(self):
- yappi.set_clock_type('wall')
-
- def dummy():
- pass
-
- def a():
- time.sleep(0.2)
-
- class Worker1(threading.Thread):
-
- def a(self):
- time.sleep(0.3)
-
- def run(self):
- self.a()
-
- yappi.start(builtins=False, profile_threads=True)
-
- c = Worker1()
- c.start()
- c.join()
- a()
- stats = yappi.get_func_stats()
- fsa1 = utils.find_stat_by_name(stats, 'Worker1.a')
- fsa2 = utils.find_stat_by_name(stats, 'a')
- self.assertTrue(fsa1 is not None)
- self.assertTrue(fsa2 is not None)
- self.assertTrue(fsa1.ttot > 0.2)
- self.assertTrue(fsa2.ttot > 0.1)
- tstats = yappi.get_thread_stats()
- self.assertEqual(len(tstats), 2)
- tsa = utils.find_stat_by_name(tstats, 'Worker1')
- tsm = utils.find_stat_by_name(tstats, '_MainThread')
- dummy() # call dummy to force ctx name to be retrieved again.
- self.assertTrue(tsa is not None)
- # TODO: I put dummy() to fix below, remove the comments after a while.
- self.assertTrue( # FIX: I see this fails sometimes?
- tsm is not None,
- f"Could not find \"_MainThread\". Found: {', '.join(utils.get_stat_names(tstats))}")
-
- def test_ctx_stats(self):
- from threading import Thread
- DUMMY_WORKER_COUNT = 5
- yappi.start()
-
- class DummyThread(Thread):
- pass
-
- def dummy():
- pass
-
- def dummy_worker():
- pass
-
- for i in range(DUMMY_WORKER_COUNT):
- t = DummyThread(target=dummy_worker)
- t.start()
- t.join()
- yappi.stop()
- stats = yappi.get_thread_stats()
- tsa = utils.find_stat_by_name(stats, "DummyThread")
- self.assertTrue(tsa is not None)
- yappi.clear_stats()
- time.sleep(1.0)
- _timings = {
- "a_1": 6,
- "b_1": 5,
- "c_1": 3,
- "d_1": 1,
- "a_2": 4,
- "b_2": 3,
- "c_2": 2,
- "d_2": 1
- }
- _yappi._set_test_timings(_timings)
-
- class Thread1(Thread):
- pass
-
- class Thread2(Thread):
- pass
-
- def a():
- b()
-
- def b():
- c()
-
- def c():
- d()
-
- def d():
- time.sleep(0.6)
-
- yappi.set_clock_type("wall")
- yappi.start()
- t1 = Thread1(target=a)
- t1.start()
- t2 = Thread2(target=a)
- t2.start()
- t1.join()
- t2.join()
- stats = yappi.get_thread_stats()
-
- # the fist clear_stats clears the context table?
- tsa = utils.find_stat_by_name(stats, "DummyThread")
- self.assertTrue(tsa is None)
-
- tst1 = utils.find_stat_by_name(stats, "Thread1")
- tst2 = utils.find_stat_by_name(stats, "Thread2")
- tsmain = utils.find_stat_by_name(stats, "_MainThread")
- dummy() # call dummy to force ctx name to be retrieved again.
- self.assertTrue(len(stats) == 3)
- self.assertTrue(tst1 is not None)
- self.assertTrue(tst2 is not None)
- # TODO: I put dummy() to fix below, remove the comments after a while.
- self.assertTrue( # FIX: I see this fails sometimes
- tsmain is not None,
- f"Could not find \"_MainThread\". Found: {', '.join(utils.get_stat_names(stats))}")
- self.assertTrue(1.0 > tst2.ttot >= 0.5)
- self.assertTrue(1.0 > tst1.ttot >= 0.5)
-
- # test sorting of the ctx stats
- stats = stats.sort("totaltime", "desc")
- prev_stat = None
- for stat in stats:
- if prev_stat:
- self.assertTrue(prev_stat.ttot >= stat.ttot)
- prev_stat = stat
- stats = stats.sort("totaltime", "asc")
- prev_stat = None
- for stat in stats:
- if prev_stat:
- self.assertTrue(prev_stat.ttot <= stat.ttot)
- prev_stat = stat
- stats = stats.sort("schedcount", "desc")
- prev_stat = None
- for stat in stats:
- if prev_stat:
- self.assertTrue(prev_stat.sched_count >= stat.sched_count)
- prev_stat = stat
- stats = stats.sort("name", "desc")
- prev_stat = None
- for stat in stats:
- if prev_stat:
- self.assertTrue(prev_stat.name.lower() >= stat.name.lower())
- prev_stat = stat
- self.assertRaises(
- yappi.YappiError, stats.sort, "invalid_thread_sorttype_arg"
- )
- self.assertRaises(
- yappi.YappiError, stats.sort, "invalid_thread_sortorder_arg"
- )
-
- def test_ctx_stats_cpu(self):
-
- def get_thread_name():
- try:
- return threading.current_thread().name
- except AttributeError:
- return "Anonymous"
-
- def burn_cpu(sec):
- t0 = yappi.get_clock_time()
- elapsed = 0
- while (elapsed < sec):
- for _ in range(1000):
- pass
- elapsed = yappi.get_clock_time() - t0
-
- def test():
-
- ts = []
- for i in (0.01, 0.05, 0.1):
- t = threading.Thread(target=burn_cpu, args=(i, ))
- t.name = f"burn_cpu-{str(i)}"
- t.start()
- ts.append(t)
- for t in ts:
- t.join()
-
- yappi.set_clock_type("cpu")
- yappi.set_context_name_callback(get_thread_name)
-
- yappi.start()
-
- test()
-
- yappi.stop()
-
- tstats = yappi.get_thread_stats()
- r1 = '''
- burn_cpu-0.1 3 123145356058624 0.100105 8
- burn_cpu-0.05 2 123145361313792 0.050149 8
- burn_cpu-0.01 1 123145356058624 0.010127 2
- MainThread 0 4321620864 0.001632 6
- '''
- self.assert_ctx_stats_almost_equal(r1, tstats)
-
- def test_producer_consumer_with_queues(self):
- # we currently just stress yappi, no functionality test is done here.
- yappi.start()
- from queue import Queue
- from threading import Thread
- WORKER_THREAD_COUNT = 50
- WORK_ITEM_COUNT = 2000
-
- def worker():
- while True:
- item = q.get()
- # do the work with item
- q.task_done()
-
- q = Queue()
- for i in range(WORKER_THREAD_COUNT):
- t = Thread(target=worker)
- t.daemon = True
- t.start()
-
- for item in range(WORK_ITEM_COUNT):
- q.put(item)
- q.join() # block until all tasks are done
- #yappi.get_func_stats().sort("callcount").print_all()
- yappi.stop()
-
- def test_temporary_lock_waiting(self):
- yappi.start()
- _lock = threading.Lock()
-
- def worker():
- _lock.acquire()
- try:
- time.sleep(1.0)
- finally:
- _lock.release()
-
- t1 = threading.Thread(target=worker)
- t2 = threading.Thread(target=worker)
- t1.start()
- t2.start()
- t1.join()
- t2.join()
- #yappi.get_func_stats().sort("callcount").print_all()
- yappi.stop()
-
- @unittest.skipIf(os.name != "posix", "requires Posix compliant OS")
- def test_signals_with_blocking_calls(self):
- import signal, os, time
-
- # just to verify if signal is handled correctly and stats/yappi are not corrupted.
- def handler(signum, frame):
- raise Exception("Signal handler executed!")
-
- yappi.start()
- signal.signal(signal.SIGALRM, handler)
- signal.alarm(1)
- self.assertRaises(Exception, time.sleep, 2)
- stats = yappi.get_func_stats()
- fsh = utils.find_stat_by_name(stats, "handler")
- self.assertTrue(fsh is not None)
-
- def test_concurrent_futures(self):
- yappi.start()
- from concurrent.futures import ThreadPoolExecutor
- with ThreadPoolExecutor(max_workers=5) as executor:
- f = executor.submit(pow, 5, 2)
- self.assertEqual(f.result(), 25)
- time.sleep(1.0)
- yappi.stop()
-
- def test_barrier(self):
- yappi.start()
- b = threading.Barrier(2, timeout=1)
-
- def worker():
- try:
- b.wait()
- except threading.BrokenBarrierError:
- pass
- except Exception:
- raise Exception("BrokenBarrierError not raised")
-
- t1 = threading.Thread(target=worker)
- t1.start()
- #b.wait()
- t1.join()
- yappi.stop()
-
-
-class NonRecursiveFunctions(utils.YappiUnitTestCase):
-
- def test_abcd(self):
- _timings = {"a_1": 6, "b_1": 5, "c_1": 3, "d_1": 1}
- _yappi._set_test_timings(_timings)
-
- def a():
- b()
-
- def b():
- c()
-
- def c():
- d()
-
- def d():
- pass
-
- stats = utils.run_and_get_func_stats(a)
- fsa = utils.find_stat_by_name(stats, 'a')
- fsb = utils.find_stat_by_name(stats, 'b')
- fsc = utils.find_stat_by_name(stats, 'c')
- fsd = utils.find_stat_by_name(stats, 'd')
- cfsab = fsa.children[fsb]
- cfsbc = fsb.children[fsc]
- cfscd = fsc.children[fsd]
-
- self.assertEqual(fsa.ttot, 6)
- self.assertEqual(fsa.tsub, 1)
- self.assertEqual(fsb.ttot, 5)
- self.assertEqual(fsb.tsub, 2)
- self.assertEqual(fsc.ttot, 3)
- self.assertEqual(fsc.tsub, 2)
- self.assertEqual(fsd.ttot, 1)
- self.assertEqual(fsd.tsub, 1)
- self.assertEqual(cfsab.ttot, 5)
- self.assertEqual(cfsab.tsub, 2)
- self.assertEqual(cfsbc.ttot, 3)
- self.assertEqual(cfsbc.tsub, 2)
- self.assertEqual(cfscd.ttot, 1)
- self.assertEqual(cfscd.tsub, 1)
-
- def test_stop_in_middle(self):
- _timings = {"a_1": 6, "b_1": 4}
- _yappi._set_test_timings(_timings)
-
- def a():
- b()
- yappi.stop()
-
- def b():
- time.sleep(0.2)
-
- yappi.start()
- a()
- stats = yappi.get_func_stats()
- fsa = utils.find_stat_by_name(stats, 'a')
- fsb = utils.find_stat_by_name(stats, 'b')
-
- self.assertEqual(fsa.ncall, 1)
- self.assertEqual(fsa.nactualcall, 0)
- self.assertEqual(fsa.ttot, 0) # no call_leave called
- self.assertEqual(fsa.tsub, 0) # no call_leave called
- self.assertEqual(fsb.ttot, 4)
-
-
-class RecursiveFunctions(utils.YappiUnitTestCase):
-
- def test_fibonacci(self):
-
- def fib(n):
- if n > 1:
- return fib(n - 1) + fib(n - 2)
- else:
- return n
-
- stats = utils.run_and_get_func_stats(fib, 22)
- fs = utils.find_stat_by_name(stats, 'fib')
- self.assertEqual(fs.ncall, 57313)
- self.assertEqual(fs.ttot, fs.tsub)
-
- def test_abcadc(self):
- _timings = {
- "a_1": 20,
- "b_1": 19,
- "c_1": 17,
- "a_2": 13,
- "d_1": 12,
- "c_2": 10,
- "a_3": 5
- }
- _yappi._set_test_timings(_timings)
-
- def a(n):
- if n == 3:
- return
- if n == 1 + 1:
- d(n)
- else:
- b(n)
-
- def b(n):
- c(n)
-
- def c(n):
- a(n + 1)
-
- def d(n):
- c(n)
-
- stats = utils.run_and_get_func_stats(a, 1)
- fsa = utils.find_stat_by_name(stats, 'a')
- fsb = utils.find_stat_by_name(stats, 'b')
- fsc = utils.find_stat_by_name(stats, 'c')
- fsd = utils.find_stat_by_name(stats, 'd')
- self.assertEqual(fsa.ncall, 3)
- self.assertEqual(fsa.nactualcall, 1)
- self.assertEqual(fsa.ttot, 20)
- self.assertEqual(fsa.tsub, 7)
- self.assertEqual(fsb.ttot, 19)
- self.assertEqual(fsb.tsub, 2)
- self.assertEqual(fsc.ttot, 17)
- self.assertEqual(fsc.tsub, 9)
- self.assertEqual(fsd.ttot, 12)
- self.assertEqual(fsd.tsub, 2)
- cfsca = fsc.children[fsa]
- self.assertEqual(cfsca.nactualcall, 0)
- self.assertEqual(cfsca.ncall, 2)
- self.assertEqual(cfsca.ttot, 13)
- self.assertEqual(cfsca.tsub, 6)
-
- def test_aaaa(self):
- _timings = {"d_1": 9, "d_2": 7, "d_3": 3, "d_4": 2}
- _yappi._set_test_timings(_timings)
-
- def d(n):
- if n == 3:
- return
- d(n + 1)
-
- stats = utils.run_and_get_func_stats(d, 0)
- fsd = utils.find_stat_by_name(stats, 'd')
- self.assertEqual(fsd.ncall, 4)
- self.assertEqual(fsd.nactualcall, 1)
- self.assertEqual(fsd.ttot, 9)
- self.assertEqual(fsd.tsub, 9)
- cfsdd = fsd.children[fsd]
- self.assertEqual(cfsdd.ttot, 7)
- self.assertEqual(cfsdd.tsub, 7)
- self.assertEqual(cfsdd.ncall, 3)
- self.assertEqual(cfsdd.nactualcall, 0)
-
- def test_abcabc(self):
- _timings = {
- "a_1": 20,
- "b_1": 19,
- "c_1": 17,
- "a_2": 13,
- "b_2": 11,
- "c_2": 9,
- "a_3": 6
- }
- _yappi._set_test_timings(_timings)
-
- def a(n):
- if n == 3:
- return
- else:
- b(n)
-
- def b(n):
- c(n)
-
- def c(n):
- a(n + 1)
-
- stats = utils.run_and_get_func_stats(a, 1)
- fsa = utils.find_stat_by_name(stats, 'a')
- fsb = utils.find_stat_by_name(stats, 'b')
- fsc = utils.find_stat_by_name(stats, 'c')
- self.assertEqual(fsa.ncall, 3)
- self.assertEqual(fsa.nactualcall, 1)
- self.assertEqual(fsa.ttot, 20)
- self.assertEqual(fsa.tsub, 9)
- self.assertEqual(fsb.ttot, 19)
- self.assertEqual(fsb.tsub, 4)
- self.assertEqual(fsc.ttot, 17)
- self.assertEqual(fsc.tsub, 7)
- cfsab = fsa.children[fsb]
- cfsbc = fsb.children[fsc]
- cfsca = fsc.children[fsa]
- self.assertEqual(cfsab.ttot, 19)
- self.assertEqual(cfsab.tsub, 4)
- self.assertEqual(cfsbc.ttot, 17)
- self.assertEqual(cfsbc.tsub, 7)
- self.assertEqual(cfsca.ttot, 13)
- self.assertEqual(cfsca.tsub, 8)
-
- def test_abcbca(self):
- _timings = {"a_1": 10, "b_1": 9, "c_1": 7, "b_2": 4, "c_2": 2, "a_2": 1}
- _yappi._set_test_timings(_timings)
- self._ncall = 1
-
- def a():
- if self._ncall == 1:
- b()
- else:
- return
-
- def b():
- c()
-
- def c():
- if self._ncall == 1:
- self._ncall += 1
- b()
- else:
- a()
-
- stats = utils.run_and_get_func_stats(a)
- fsa = utils.find_stat_by_name(stats, 'a')
- fsb = utils.find_stat_by_name(stats, 'b')
- fsc = utils.find_stat_by_name(stats, 'c')
- cfsab = fsa.children[fsb]
- cfsbc = fsb.children[fsc]
- cfsca = fsc.children[fsa]
- self.assertEqual(fsa.ttot, 10)
- self.assertEqual(fsa.tsub, 2)
- self.assertEqual(fsb.ttot, 9)
- self.assertEqual(fsb.tsub, 4)
- self.assertEqual(fsc.ttot, 7)
- self.assertEqual(fsc.tsub, 4)
- self.assertEqual(cfsab.ttot, 9)
- self.assertEqual(cfsab.tsub, 2)
- self.assertEqual(cfsbc.ttot, 7)
- self.assertEqual(cfsbc.tsub, 4)
- self.assertEqual(cfsca.ttot, 1)
- self.assertEqual(cfsca.tsub, 1)
- self.assertEqual(cfsca.ncall, 1)
- self.assertEqual(cfsca.nactualcall, 0)
-
- def test_aabccb(self):
- _timings = {
- "a_1": 13,
- "a_2": 11,
- "b_1": 9,
- "c_1": 5,
- "c_2": 3,
- "b_2": 1
- }
- _yappi._set_test_timings(_timings)
- self._ncall = 1
-
- def a():
- if self._ncall == 1:
- self._ncall += 1
- a()
- else:
- b()
-
- def b():
- if self._ncall == 3:
- return
- else:
- c()
-
- def c():
- if self._ncall == 2:
- self._ncall += 1
- c()
- else:
- b()
-
- stats = utils.run_and_get_func_stats(a)
- fsa = utils.find_stat_by_name(stats, 'a')
- fsb = utils.find_stat_by_name(stats, 'b')
- fsc = utils.find_stat_by_name(stats, 'c')
- cfsaa = fsa.children[fsa.index]
- cfsab = fsa.children[fsb]
- cfsbc = fsb.children[fsc.full_name]
- cfscc = fsc.children[fsc]
- cfscb = fsc.children[fsb]
- self.assertEqual(fsb.ttot, 9)
- self.assertEqual(fsb.tsub, 5)
- self.assertEqual(cfsbc.ttot, 5)
- self.assertEqual(cfsbc.tsub, 2)
- self.assertEqual(fsa.ttot, 13)
- self.assertEqual(fsa.tsub, 4)
- self.assertEqual(cfsab.ttot, 9)
- self.assertEqual(cfsab.tsub, 4)
- self.assertEqual(cfsaa.ttot, 11)
- self.assertEqual(cfsaa.tsub, 2)
- self.assertEqual(fsc.ttot, 5)
- self.assertEqual(fsc.tsub, 4)
-
- def test_abaa(self):
- _timings = {"a_1": 13, "b_1": 10, "a_2": 9, "a_3": 5}
- _yappi._set_test_timings(_timings)
-
- self._ncall = 1
-
- def a():
- if self._ncall == 1:
- b()
- elif self._ncall == 2:
- self._ncall += 1
- a()
- else:
- return
-
- def b():
- self._ncall += 1
- a()
-
- stats = utils.run_and_get_func_stats(a)
- fsa = utils.find_stat_by_name(stats, 'a')
- fsb = utils.find_stat_by_name(stats, 'b')
- cfsaa = fsa.children[fsa]
- cfsba = fsb.children[fsa]
- self.assertEqual(fsb.ttot, 10)
- self.assertEqual(fsb.tsub, 1)
- self.assertEqual(fsa.ttot, 13)
- self.assertEqual(fsa.tsub, 12)
- self.assertEqual(cfsaa.ttot, 5)
- self.assertEqual(cfsaa.tsub, 5)
- self.assertEqual(cfsba.ttot, 9)
- self.assertEqual(cfsba.tsub, 4)
-
- def test_aabb(self):
- _timings = {"a_1": 13, "a_2": 10, "b_1": 9, "b_2": 5}
- _yappi._set_test_timings(_timings)
-
- self._ncall = 1
-
- def a():
- if self._ncall == 1:
- self._ncall += 1
- a()
- elif self._ncall == 2:
- b()
- else:
- return
-
- def b():
- if self._ncall == 2:
- self._ncall += 1
- b()
- else:
- return
-
- stats = utils.run_and_get_func_stats(a)
- fsa = utils.find_stat_by_name(stats, 'a')
- fsb = utils.find_stat_by_name(stats, 'b')
- cfsaa = fsa.children[fsa]
- cfsab = fsa.children[fsb]
- cfsbb = fsb.children[fsb]
- self.assertEqual(fsa.ttot, 13)
- self.assertEqual(fsa.tsub, 4)
- self.assertEqual(fsb.ttot, 9)
- self.assertEqual(fsb.tsub, 9)
- self.assertEqual(cfsaa.ttot, 10)
- self.assertEqual(cfsaa.tsub, 1)
- self.assertEqual(cfsab.ttot, 9)
- self.assertEqual(cfsab.tsub, 4)
- self.assertEqual(cfsbb.ttot, 5)
- self.assertEqual(cfsbb.tsub, 5)
-
- def test_abbb(self):
- _timings = {"a_1": 13, "b_1": 10, "b_2": 6, "b_3": 1}
- _yappi._set_test_timings(_timings)
-
- self._ncall = 1
-
- def a():
- if self._ncall == 1:
- b()
-
- def b():
- if self._ncall == 3:
- return
- self._ncall += 1
- b()
-
- stats = utils.run_and_get_func_stats(a)
- fsa = utils.find_stat_by_name(stats, 'a')
- fsb = utils.find_stat_by_name(stats, 'b')
- cfsab = fsa.children[fsb]
- cfsbb = fsb.children[fsb]
- self.assertEqual(fsa.ttot, 13)
- self.assertEqual(fsa.tsub, 3)
- self.assertEqual(fsb.ttot, 10)
- self.assertEqual(fsb.tsub, 10)
- self.assertEqual(fsb.ncall, 3)
- self.assertEqual(fsb.nactualcall, 1)
- self.assertEqual(cfsab.ttot, 10)
- self.assertEqual(cfsab.tsub, 4)
- self.assertEqual(cfsbb.ttot, 6)
- self.assertEqual(cfsbb.tsub, 6)
- self.assertEqual(cfsbb.nactualcall, 0)
- self.assertEqual(cfsbb.ncall, 2)
-
- def test_aaab(self):
- _timings = {"a_1": 13, "a_2": 10, "a_3": 6, "b_1": 1}
- _yappi._set_test_timings(_timings)
-
- self._ncall = 1
-
- def a():
- if self._ncall == 3:
- b()
- return
- self._ncall += 1
- a()
-
- def b():
- return
-
- stats = utils.run_and_get_func_stats(a)
- fsa = utils.find_stat_by_name(stats, 'a')
- fsb = utils.find_stat_by_name(stats, 'b')
- cfsaa = fsa.children[fsa]
- cfsab = fsa.children[fsb]
- self.assertEqual(fsa.ttot, 13)
- self.assertEqual(fsa.tsub, 12)
- self.assertEqual(fsb.ttot, 1)
- self.assertEqual(fsb.tsub, 1)
- self.assertEqual(cfsaa.ttot, 10)
- self.assertEqual(cfsaa.tsub, 9)
- self.assertEqual(cfsab.ttot, 1)
- self.assertEqual(cfsab.tsub, 1)
-
- def test_abab(self):
- _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1}
- _yappi._set_test_timings(_timings)
-
- self._ncall = 1
-
- def a():
- b()
-
- def b():
- if self._ncall == 2:
- return
- self._ncall += 1
- a()
-
- stats = utils.run_and_get_func_stats(a)
- fsa = utils.find_stat_by_name(stats, 'a')
- fsb = utils.find_stat_by_name(stats, 'b')
- cfsab = fsa.children[fsb]
- cfsba = fsb.children[fsa]
- self.assertEqual(fsa.ttot, 13)
- self.assertEqual(fsa.tsub, 8)
- self.assertEqual(fsb.ttot, 10)
- self.assertEqual(fsb.tsub, 5)
- self.assertEqual(cfsab.ttot, 10)
- self.assertEqual(cfsab.tsub, 5)
- self.assertEqual(cfsab.ncall, 2)
- self.assertEqual(cfsab.nactualcall, 1)
- self.assertEqual(cfsba.ttot, 6)
- self.assertEqual(cfsba.tsub, 5)
-
-
-if __name__ == '__main__':
- # import sys;sys.argv = ['', 'BasicUsage.test_run_as_script']
- # import sys;sys.argv = ['', 'MultithreadedScenarios.test_subsequent_profile']
- unittest.main()
+import os
+import sys
+import time
+import threading
+import unittest
+import yappi
+import _yappi
+import utils
+import multiprocessing
+import subprocess
+
+_counter = 0
+
+
+class BasicUsage(utils.YappiUnitTestCase):
+
+ def test_callback_function_int_return_overflow(self):
+ # this test is just here to check if any errors are generated, as the err
+ # is printed in C side, I did not include it here. THere are ways to test
+ # this deterministically, I did not bother
+ import ctypes
+
+ def _unsigned_overflow_margin():
+ return 2**(ctypes.sizeof(ctypes.c_void_p) * 8) - 1
+
+ def foo():
+ pass
+
+ #with utils.captured_output() as (out, err):
+ yappi.set_context_id_callback(_unsigned_overflow_margin)
+ yappi.set_tag_callback(_unsigned_overflow_margin)
+ yappi.start()
+ foo()
+
+ def test_issue60(self):
+
+ def foo():
+ buf = bytearray()
+ buf += b't' * 200
+ view = memoryview(buf)[10:]
+ view = view.tobytes()
+ del buf[:10] # this throws exception
+ return view
+
+ yappi.start(builtins=True)
+ foo()
+ self.assertTrue(
+ len(
+ yappi.get_func_stats(
+ filter_callback=lambda x: yappi.
+ func_matches(x, [memoryview.tobytes])
+ )
+ ) > 0
+ )
+ yappi.stop()
+
+ def test_issue54(self):
+
+ def _tag_cbk():
+ global _counter
+ _counter += 1
+ return _counter
+
+ def a():
+ pass
+
+ def b():
+ pass
+
+ yappi.set_tag_callback(_tag_cbk)
+ yappi.start()
+ a()
+ a()
+ a()
+ yappi.stop()
+ stats = yappi.get_func_stats()
+ self.assertEqual(stats.pop().ncall, 3) # aggregated if no tag is given
+ stats = yappi.get_func_stats(tag=1)
+
+ for i in range(1, 3):
+ stats = yappi.get_func_stats(tag=i)
+ stats = yappi.get_func_stats(
+ tag=i, filter_callback=lambda x: yappi.func_matches(x, [a])
+ )
+
+ stat = stats.pop()
+ self.assertEqual(stat.ncall, 1)
+
+ yappi.set_tag_callback(None)
+ yappi.clear_stats()
+ yappi.start()
+ b()
+ b()
+ stats = yappi.get_func_stats()
+ self.assertEqual(len(stats), 1)
+ stat = stats.pop()
+ self.assertEqual(stat.ncall, 2)
+
+ def test_filter(self):
+
+ def a():
+ pass
+
+ def b():
+ a()
+
+ def c():
+ b()
+
+ _TCOUNT = 5
+
+ ts = []
+ yappi.start()
+ for i in range(_TCOUNT):
+ t = threading.Thread(target=c)
+ t.start()
+ ts.append(t)
+
+ for t in ts:
+ t.join()
+
+ yappi.stop()
+
+ ctx_ids = []
+ for tstat in yappi.get_thread_stats():
+ if tstat.name == '_MainThread':
+ main_ctx_id = tstat.id
+ else:
+ ctx_ids.append(tstat.id)
+
+ fstats = yappi.get_func_stats(filter={"ctx_id": 9})
+ self.assertTrue(fstats.empty())
+ fstats = yappi.get_func_stats(
+ filter={
+ "ctx_id": main_ctx_id,
+ "name": "c"
+ }
+ ) # main thread
+ self.assertTrue(fstats.empty())
+
+ for i in ctx_ids:
+ fstats = yappi.get_func_stats(
+ filter={
+ "ctx_id": i,
+ "name": "a",
+ "ncall": 1
+ }
+ )
+ self.assertEqual(fstats.pop().ncall, 1)
+ fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "b"})
+ self.assertEqual(fstats.pop().ncall, 1)
+ fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "c"})
+ self.assertEqual(fstats.pop().ncall, 1)
+
+ yappi.clear_stats()
+ yappi.start(builtins=True)
+ time.sleep(0.1)
+ yappi.stop()
+ fstats = yappi.get_func_stats(filter={"module": "time"})
+ self.assertEqual(len(fstats), 1)
+
+ # invalid filters`
+ self.assertRaises(
+ Exception, yappi.get_func_stats, filter={'tag': "sss"}
+ )
+ self.assertRaises(
+ Exception, yappi.get_func_stats, filter={'ctx_id': "None"}
+ )
+
+ def test_filter_callback(self):
+
+ def a():
+ time.sleep(0.1)
+
+ def b():
+ a()
+
+ def c():
+ pass
+
+ def d():
+ pass
+
+ yappi.set_clock_type("wall")
+ yappi.start(builtins=True)
+ a()
+ b()
+ c()
+ d()
+ stats = yappi.get_func_stats(
+ filter_callback=lambda x: yappi.func_matches(x, [a, b])
+ )
+ #stats.print_all()
+ r1 = '''
+ tests/test_functionality.py:98 a 2 0.000000 0.200350 0.100175
+ tests/test_functionality.py:101 b 1 0.000000 0.120000 0.100197
+ '''
+ self.assert_traces_almost_equal(r1, stats)
+ self.assertEqual(len(stats), 2)
+ stats = yappi.get_func_stats(
+ filter_callback=lambda x: yappi.
+ module_matches(x, [sys.modules[__name__]])
+ )
+ r1 = '''
+ tests/test_functionality.py:98 a 2 0.000000 0.230130 0.115065
+ tests/test_functionality.py:101 b 1 0.000000 0.120000 0.109011
+ tests/test_functionality.py:104 c 1 0.000000 0.000002 0.000002
+ tests/test_functionality.py:107 d 1 0.000000 0.000001 0.000001
+ '''
+ self.assert_traces_almost_equal(r1, stats)
+ self.assertEqual(len(stats), 4)
+
+ stats = yappi.get_func_stats(
+ filter_callback=lambda x: yappi.func_matches(x, [time.sleep])
+ )
+ self.assertEqual(len(stats), 1)
+ r1 = '''
+ time.sleep 2 0.206804 0.220000 0.103402
+ '''
+ self.assert_traces_almost_equal(r1, stats)
+
+ def test_print_formatting(self):
+
+ def a():
+ pass
+
+ def b():
+ a()
+
+ func_cols = {
+ 1: ("name", 48),
+ 0: ("ncall", 5),
+ 2: ("tsub", 8),
+ }
+ thread_cols = {
+ 1: ("name", 48),
+ 0: ("ttot", 8),
+ }
+
+ yappi.start()
+ a()
+ b()
+ yappi.stop()
+ fs = yappi.get_func_stats()
+ cs = fs[1].children
+ ts = yappi.get_thread_stats()
+ #fs.print_all(out=sys.stderr, columns={1:("name", 70), })
+ #cs.print_all(out=sys.stderr, columns=func_cols)
+ #ts.print_all(out=sys.stderr, columns=thread_cols)
+ #cs.print_all(out=sys.stderr, columns={})
+
+ self.assertRaises(
+ yappi.YappiError, fs.print_all, columns={1: ("namee", 9)}
+ )
+ self.assertRaises(
+ yappi.YappiError, cs.print_all, columns={1: ("dd", 0)}
+ )
+ self.assertRaises(
+ yappi.YappiError, ts.print_all, columns={1: ("tidd", 0)}
+ )
+
+ def test_get_clock(self):
+ yappi.set_clock_type('cpu')
+ self.assertEqual('cpu', yappi.get_clock_type())
+ clock_info = yappi.get_clock_info()
+ self.assertTrue('api' in clock_info)
+ self.assertTrue('resolution' in clock_info)
+
+ yappi.set_clock_type('wall')
+ self.assertEqual('wall', yappi.get_clock_type())
+
+ t0 = yappi.get_clock_time()
+ time.sleep(0.1)
+ duration = yappi.get_clock_time() - t0
+ self.assertTrue(0.05 < duration < 0.3)
+
+ def test_profile_decorator(self):
+
+ def aggregate(func, stats):
+ fname = f"tests/{func.__name__}.profile"
+ try:
+ stats.add(fname)
+ except OSError:
+ pass
+ stats.save(fname)
+ raise Exception("messing around")
+
+ @yappi.profile(return_callback=aggregate)
+ def a(x, y):
+ if x + y == 25:
+ raise Exception("")
+ return x + y
+
+ def b():
+ pass
+
+ try:
+ os.remove(
+ "tests/a.profile"
+ ) # remove the one from prev test, if available
+ except:
+ pass
+
+ # global profile is on to mess things up
+ yappi.start()
+ b()
+
+ # assert functionality and call function at same time
+ try:
+ self.assertEqual(a(1, 2), 3)
+ except:
+ pass
+ try:
+ self.assertEqual(a(2, 5), 7)
+ except:
+ pass
+ try:
+ a(4, 21)
+ except:
+ pass
+ stats = yappi.get_func_stats().add("tests/a.profile")
+ fsa = utils.find_stat_by_name(stats, 'a')
+ self.assertEqual(fsa.ncall, 3)
+ self.assertEqual(len(stats), 1) # b() should be cleared out.
+
+ @yappi.profile(return_callback=aggregate)
+ def count_down_rec(n):
+ if n == 0:
+ return
+ count_down_rec(n - 1)
+
+ try:
+ os.remove(
+ "tests/count_down_rec.profile"
+ ) # remove the one from prev test, if available
+ except:
+ pass
+
+ try:
+ count_down_rec(4)
+ except:
+ pass
+ try:
+ count_down_rec(3)
+ except:
+ pass
+
+ stats = yappi.YFuncStats("tests/count_down_rec.profile")
+ fsrec = utils.find_stat_by_name(stats, 'count_down_rec')
+ self.assertEqual(fsrec.ncall, 9)
+ self.assertEqual(fsrec.nactualcall, 2)
+
+ def test_strip_dirs(self):
+
+ def a():
+ pass
+
+ stats = utils.run_and_get_func_stats(a, )
+ stats.strip_dirs()
+ fsa = utils.find_stat_by_name(stats, "a")
+ self.assertEqual(fsa.module, os.path.basename(fsa.module))
+
+ @unittest.skipIf(os.name == "nt", "do not run on Windows")
+ def test_run_as_script(self):
+ import re
+ p = subprocess.Popen(
+ ['yappi', os.path.join('./tests', 'run_as_script.py')],
+ stdout=subprocess.PIPE
+ )
+ out, err = p.communicate()
+ self.assertEqual(p.returncode, 0)
+ func_stats, thread_stats = re.split(
+ b'name\\s+id\\s+tid\\s+ttot\\s+scnt\\s*\n', out
+ )
+ self.assertTrue(b'FancyThread' in thread_stats)
+
+ def test_yappi_overhead(self):
+ LOOP_COUNT = 100000
+
+ def a():
+ pass
+
+ def b():
+ for i in range(LOOP_COUNT):
+ a()
+
+ t0 = time.time()
+ yappi.start()
+ b()
+ yappi.stop()
+ time_with_yappi = time.time() - t0
+ t0 = time.time()
+ b()
+ time_without_yappi = time.time() - t0
+ if time_without_yappi == 0:
+ time_without_yappi = 0.000001
+
+ # in latest v0.82, I calculated this as close to "7.0" in my machine.
+ # however, %83 of this overhead is coming from tickcount(). The other %17
+ # seems to have been evenly distributed to the internal bookkeeping
+ # structures/algorithms which seems acceptable. Note that our test only
+ # tests one function being profiled at-a-time in a short interval.
+ # profiling high number of functions in a small time
+ # is a different beast, (which is pretty unlikely in most applications)
+ # So as a conclusion: I cannot see any optimization window for Yappi that
+ # is worth implementing as we will only optimize %17 of the time.
+ sys.stderr.write("\r\nYappi puts %0.1f times overhead to the profiled application in average.\r\n" % \
+ (time_with_yappi / time_without_yappi))
+
+ def test_clear_stats_while_running(self):
+
+ def a():
+ pass
+
+ yappi.start()
+ a()
+ yappi.clear_stats()
+ a()
+ stats = yappi.get_func_stats()
+ fsa = utils.find_stat_by_name(stats, 'a')
+ self.assertEqual(fsa.ncall, 1)
+
+ def test_generator(self):
+
+ def _gen(n):
+ while (n > 0):
+ yield n
+ n -= 1
+
+ yappi.start()
+ for x in _gen(5):
+ pass
+ self.assertTrue(
+ yappi.convert2pstats(yappi.get_func_stats()) is not None
+ )
+
+ def test_slice_child_stats_and_strip_dirs(self):
+
+ def b():
+ for i in range(10000000):
+ pass
+
+ def a():
+ b()
+
+ yappi.start(builtins=True)
+ a()
+ stats = yappi.get_func_stats()
+ fsa = utils.find_stat_by_name(stats, 'a')
+ fsb = utils.find_stat_by_name(stats, 'b')
+ self.assertTrue(fsa.children[0:1] is not None)
+ prev_afullname = fsa.full_name
+ prev_bchildfullname = fsa.children[fsb].full_name
+ stats.strip_dirs()
+ self.assertTrue(len(prev_afullname) > len(fsa.full_name))
+ self.assertTrue(
+ len(prev_bchildfullname) > len(fsa.children[fsb].full_name)
+ )
+
+ def test_children_stat_functions(self):
+ _timings = {"a_1": 5, "b_1": 3, "c_1": 1}
+ _yappi._set_test_timings(_timings)
+
+ def b():
+ pass
+
+ def c():
+ pass
+
+ def a():
+ b()
+ c()
+
+ yappi.start()
+ a()
+ b() # non-child call
+ c() # non-child call
+ stats = yappi.get_func_stats()
+ fsa = utils.find_stat_by_name(stats, 'a')
+ childs_of_a = fsa.children.get().sort("tavg", "desc")
+ prev_item = None
+ for item in childs_of_a:
+ if prev_item:
+ self.assertTrue(prev_item.tavg > item.tavg)
+ prev_item = item
+ childs_of_a.sort("name", "desc")
+ prev_item = None
+ for item in childs_of_a:
+ if prev_item:
+ self.assertTrue(prev_item.name > item.name)
+ prev_item = item
+ childs_of_a.clear()
+ self.assertTrue(childs_of_a.empty())
+
+ def test_no_stats_different_clock_type_load(self):
+
+ def a():
+ pass
+
+ yappi.start()
+ a()
+ yappi.stop()
+ yappi.get_func_stats().save("tests/ystats1.ys")
+ yappi.clear_stats()
+ yappi.set_clock_type("WALL")
+ yappi.start()
+ yappi.stop()
+ stats = yappi.get_func_stats().add("tests/ystats1.ys")
+ fsa = utils.find_stat_by_name(stats, 'a')
+ self.assertTrue(fsa is not None)
+
+ def test_subsequent_profile(self):
+ _timings = {"a_1": 1, "b_1": 1}
+ _yappi._set_test_timings(_timings)
+
+ def a():
+ pass
+
+ def b():
+ pass
+
+ yappi.start()
+ a()
+ yappi.stop()
+ yappi.start()
+ b()
+ yappi.stop()
+ stats = yappi.get_func_stats()
+ fsa = utils.find_stat_by_name(stats, 'a')
+ fsb = utils.find_stat_by_name(stats, 'b')
+ self.assertTrue(fsa is not None)
+ self.assertTrue(fsb is not None)
+ self.assertEqual(fsa.ttot, 1)
+ self.assertEqual(fsb.ttot, 1)
+
+ def test_lambda(self):
+ f = lambda: time.sleep(0.3)
+ yappi.set_clock_type("wall")
+ yappi.start()
+ f()
+ stats = yappi.get_func_stats()
+ fsa = utils.find_stat_by_name(stats, '<lambda>')
+ self.assertTrue(fsa.ttot > 0.1)
+
+ def test_module_stress(self):
+ self.assertEqual(yappi.is_running(), False)
+
+ yappi.start()
+ yappi.clear_stats()
+ self.assertRaises(_yappi.error, yappi.set_clock_type, "wall")
+
+ yappi.stop()
+ yappi.clear_stats()
+ yappi.set_clock_type("cpu")
+ self.assertRaises(yappi.YappiError, yappi.set_clock_type, "dummy")
+ self.assertEqual(yappi.is_running(), False)
+ yappi.clear_stats()
+ yappi.clear_stats()
+
+ def test_stat_sorting(self):
+ _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1}
+ _yappi._set_test_timings(_timings)
+
+ self._ncall = 1
+
+ def a():
+ b()
+
+ def b():
+ if self._ncall == 2:
+ return
+ self._ncall += 1
+ a()
+
+ stats = utils.run_and_get_func_stats(a)
+ stats = stats.sort("totaltime", "desc")
+ prev_stat = None
+ for stat in stats:
+ if prev_stat:
+ self.assertTrue(prev_stat.ttot >= stat.ttot)
+ prev_stat = stat
+ stats = stats.sort("totaltime", "asc")
+ prev_stat = None
+ for stat in stats:
+ if prev_stat:
+ self.assertTrue(prev_stat.ttot <= stat.ttot)
+ prev_stat = stat
+ stats = stats.sort("avgtime", "asc")
+ prev_stat = None
+ for stat in stats:
+ if prev_stat:
+ self.assertTrue(prev_stat.tavg <= stat.tavg)
+ prev_stat = stat
+ stats = stats.sort("name", "asc")
+ prev_stat = None
+ for stat in stats:
+ if prev_stat:
+ self.assertTrue(prev_stat.name <= stat.name)
+ prev_stat = stat
+ stats = stats.sort("subtime", "asc")
+ prev_stat = None
+ for stat in stats:
+ if prev_stat:
+ self.assertTrue(prev_stat.tsub <= stat.tsub)
+ prev_stat = stat
+
+ self.assertRaises(
+ yappi.YappiError, stats.sort, "invalid_func_sorttype_arg"
+ )
+ self.assertRaises(
+ yappi.YappiError, stats.sort, "totaltime",
+ "invalid_func_sortorder_arg"
+ )
+
+ def test_start_flags(self):
+ self.assertEqual(_yappi._get_start_flags(), None)
+ yappi.start()
+
+ def a():
+ pass
+
+ a()
+ self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0)
+ self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1)
+ self.assertEqual(len(yappi.get_thread_stats()), 1)
+
+ def test_builtin_profiling(self):
+
+ def a():
+ time.sleep(0.4) # is a builtin function
+
+ yappi.set_clock_type('wall')
+
+ yappi.start(builtins=True)
+ a()
+ stats = yappi.get_func_stats()
+ fsa = utils.find_stat_by_name(stats, 'sleep')
+ self.assertTrue(fsa is not None)
+ self.assertTrue(fsa.ttot > 0.3)
+ yappi.stop()
+ yappi.clear_stats()
+
+ def a():
+ pass
+
+ yappi.start()
+ t = threading.Thread(target=a)
+ t.start()
+ t.join()
+ stats = yappi.get_func_stats()
+
+ def test_singlethread_profiling(self):
+ yappi.set_clock_type('wall')
+
+ def a():
+ time.sleep(0.2)
+
+ class Worker1(threading.Thread):
+
+ def a(self):
+ time.sleep(0.3)
+
+ def run(self):
+ self.a()
+
+ yappi.start(profile_threads=False)
+
+ c = Worker1()
+ c.start()
+ c.join()
+ a()
+ stats = yappi.get_func_stats()
+ fsa1 = utils.find_stat_by_name(stats, 'Worker1.a')
+ fsa2 = utils.find_stat_by_name(stats, 'a')
+ self.assertTrue(fsa1 is None)
+ self.assertTrue(fsa2 is not None)
+ self.assertTrue(fsa2.ttot > 0.1)
+
+ def test_run(self):
+
+ def profiled():
+ pass
+
+ yappi.clear_stats()
+ try:
+ with yappi.run():
+ profiled()
+ stats = yappi.get_func_stats()
+ finally:
+ yappi.clear_stats()
+
+ self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled'))
+
+ def test_run_recursive(self):
+
+ def profiled():
+ pass
+
+ def not_profiled():
+ pass
+
+ yappi.clear_stats()
+ try:
+ with yappi.run():
+ with yappi.run():
+ profiled()
+ # Profiling stopped here
+ not_profiled()
+ stats = yappi.get_func_stats()
+ finally:
+ yappi.clear_stats()
+
+ self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled'))
+ self.assertIsNone(utils.find_stat_by_name(stats, 'not_profiled'))
+
+
+class StatSaveScenarios(utils.YappiUnitTestCase):
+
+ def test_pstats_conversion(self):
+
+ def pstat_id(fs):
+ return (fs.module, fs.lineno, fs.name)
+
+ def a():
+ d()
+
+ def b():
+ d()
+
+ def c():
+ pass
+
+ def d():
+ pass
+
+ _timings = {"a_1": 12, "b_1": 7, "c_1": 5, "d_1": 2}
+ _yappi._set_test_timings(_timings)
+ stats = utils.run_and_get_func_stats(a, )
+ stats.strip_dirs()
+ stats.save("tests/a1.pstats", type="pstat")
+ fsa_pid = pstat_id(utils.find_stat_by_name(stats, "a"))
+ fsd_pid = pstat_id(utils.find_stat_by_name(stats, "d"))
+ yappi.clear_stats()
+ _yappi._set_test_timings(_timings)
+ stats = utils.run_and_get_func_stats(a, )
+ stats.strip_dirs()
+ stats.save("tests/a2.pstats", type="pstat")
+ yappi.clear_stats()
+ _yappi._set_test_timings(_timings)
+ stats = utils.run_and_get_func_stats(b, )
+ stats.strip_dirs()
+ stats.save("tests/b1.pstats", type="pstat")
+ fsb_pid = pstat_id(utils.find_stat_by_name(stats, "b"))
+ yappi.clear_stats()
+ _yappi._set_test_timings(_timings)
+ stats = utils.run_and_get_func_stats(c, )
+ stats.strip_dirs()
+ stats.save("tests/c1.pstats", type="pstat")
+ fsc_pid = pstat_id(utils.find_stat_by_name(stats, "c"))
+
+ # merge saved stats and check pstats values are correct
+ import pstats
+ p = pstats.Stats(
+ 'tests/a1.pstats', 'tests/a2.pstats', 'tests/b1.pstats',
+ 'tests/c1.pstats'
+ )
+ p.strip_dirs()
+ # ct = ttot, tt = tsub
+ (cc, nc, tt, ct, callers) = p.stats[fsa_pid]
+ self.assertEqual(cc, nc, 2)
+ self.assertEqual(tt, 20)
+ self.assertEqual(ct, 24)
+ (cc, nc, tt, ct, callers) = p.stats[fsd_pid]
+ self.assertEqual(cc, nc, 3)
+ self.assertEqual(tt, 6)
+ self.assertEqual(ct, 6)
+ self.assertEqual(len(callers), 2)
+ (cc, nc, tt, ct) = callers[fsa_pid]
+ self.assertEqual(cc, nc, 2)
+ self.assertEqual(tt, 4)
+ self.assertEqual(ct, 4)
+ (cc, nc, tt, ct) = callers[fsb_pid]
+ self.assertEqual(cc, nc, 1)
+ self.assertEqual(tt, 2)
+ self.assertEqual(ct, 2)
+
+ def test_merge_stats(self):
+ _timings = {
+ "a_1": 15,
+ "b_1": 14,
+ "c_1": 12,
+ "d_1": 10,
+ "e_1": 9,
+ "f_1": 7,
+ "g_1": 6,
+ "h_1": 5,
+ "i_1": 1
+ }
+ _yappi._set_test_timings(_timings)
+
+ def a():
+ b()
+
+ def b():
+ c()
+
+ def c():
+ d()
+
+ def d():
+ e()
+
+ def e():
+ f()
+
+ def f():
+ g()
+
+ def g():
+ h()
+
+ def h():
+ i()
+
+ def i():
+ pass
+
+ yappi.start()
+ a()
+ a()
+ yappi.stop()
+ stats = yappi.get_func_stats()
+ self.assertRaises(
+ NotImplementedError, stats.save, "", "INVALID_SAVE_TYPE"
+ )
+ stats.save("tests/ystats2.ys")
+ yappi.clear_stats()
+ _yappi._set_test_timings(_timings)
+ yappi.start()
+ a()
+ stats = yappi.get_func_stats().add("tests/ystats2.ys")
+ fsa = utils.find_stat_by_name(stats, "a")
+ fsb = utils.find_stat_by_name(stats, "b")
+ fsc = utils.find_stat_by_name(stats, "c")
+ fsd = utils.find_stat_by_name(stats, "d")
+ fse = utils.find_stat_by_name(stats, "e")
+ fsf = utils.find_stat_by_name(stats, "f")
+ fsg = utils.find_stat_by_name(stats, "g")
+ fsh = utils.find_stat_by_name(stats, "h")
+ fsi = utils.find_stat_by_name(stats, "i")
+ self.assertEqual(fsa.ttot, 45)
+ self.assertEqual(fsa.ncall, 3)
+ self.assertEqual(fsa.nactualcall, 3)
+ self.assertEqual(fsa.tsub, 3)
+ self.assertEqual(fsa.children[fsb].ttot, fsb.ttot)
+ self.assertEqual(fsa.children[fsb].tsub, fsb.tsub)
+ self.assertEqual(fsb.children[fsc].ttot, fsc.ttot)
+ self.assertEqual(fsb.children[fsc].tsub, fsc.tsub)
+ self.assertEqual(fsc.tsub, 6)
+ self.assertEqual(fsc.children[fsd].ttot, fsd.ttot)
+ self.assertEqual(fsc.children[fsd].tsub, fsd.tsub)
+ self.assertEqual(fsd.children[fse].ttot, fse.ttot)
+ self.assertEqual(fsd.children[fse].tsub, fse.tsub)
+ self.assertEqual(fse.children[fsf].ttot, fsf.ttot)
+ self.assertEqual(fse.children[fsf].tsub, fsf.tsub)
+ self.assertEqual(fsf.children[fsg].ttot, fsg.ttot)
+ self.assertEqual(fsf.children[fsg].tsub, fsg.tsub)
+ self.assertEqual(fsg.ttot, 18)
+ self.assertEqual(fsg.tsub, 3)
+ self.assertEqual(fsg.children[fsh].ttot, fsh.ttot)
+ self.assertEqual(fsg.children[fsh].tsub, fsh.tsub)
+ self.assertEqual(fsh.ttot, 15)
+ self.assertEqual(fsh.tsub, 12)
+ self.assertEqual(fsh.tavg, 5)
+ self.assertEqual(fsh.children[fsi].ttot, fsi.ttot)
+ self.assertEqual(fsh.children[fsi].tsub, fsi.tsub)
+ #stats.debug_print()
+
+ def test_merge_multithreaded_stats(self):
+ import _yappi
+ timings = {"a_1": 2, "b_1": 1}
+ _yappi._set_test_timings(timings)
+
+ def a():
+ pass
+
+ def b():
+ pass
+
+ yappi.start()
+ t = threading.Thread(target=a)
+ t.start()
+ t.join()
+ t = threading.Thread(target=b)
+ t.start()
+ t.join()
+ yappi.get_func_stats().save("tests/ystats1.ys")
+ yappi.clear_stats()
+ _yappi._set_test_timings(timings)
+ self.assertEqual(len(yappi.get_func_stats()), 0)
+ self.assertEqual(len(yappi.get_thread_stats()), 1)
+ t = threading.Thread(target=a)
+ t.start()
+ t.join()
+
+ self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0)
+ self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1)
+ yappi.get_func_stats().save("tests/ystats2.ys")
+
+ stats = yappi.YFuncStats([
+ "tests/ystats1.ys",
+ "tests/ystats2.ys",
+ ])
+ fsa = utils.find_stat_by_name(stats, "a")
+ fsb = utils.find_stat_by_name(stats, "b")
+ self.assertEqual(fsa.ncall, 2)
+ self.assertEqual(fsb.ncall, 1)
+ self.assertEqual(fsa.tsub, fsa.ttot, 4)
+ self.assertEqual(fsb.tsub, fsb.ttot, 1)
+
+ def test_merge_load_different_clock_types(self):
+ yappi.start(builtins=True)
+
+ def a():
+ b()
+
+ def b():
+ c()
+
+ def c():
+ pass
+
+ t = threading.Thread(target=a)
+ t.start()
+ t.join()
+ yappi.get_func_stats().sort("name", "asc").save("tests/ystats1.ys")
+ yappi.stop()
+ yappi.clear_stats()
+ yappi.start(builtins=False)
+ t = threading.Thread(target=a)
+ t.start()
+ t.join()
+ yappi.get_func_stats().save("tests/ystats2.ys")
+ yappi.stop()
+ self.assertRaises(_yappi.error, yappi.set_clock_type, "wall")
+ yappi.clear_stats()
+ yappi.set_clock_type("wall")
+ yappi.start()
+ t = threading.Thread(target=a)
+ t.start()
+ t.join()
+ yappi.get_func_stats().save("tests/ystats3.ys")
+ self.assertRaises(
+ yappi.YappiError,
+ yappi.YFuncStats().add("tests/ystats1.ys").add, "tests/ystats3.ys"
+ )
+ stats = yappi.YFuncStats(["tests/ystats1.ys",
+ "tests/ystats2.ys"]).sort("name")
+ fsa = utils.find_stat_by_name(stats, "a")
+ fsb = utils.find_stat_by_name(stats, "b")
+ fsc = utils.find_stat_by_name(stats, "c")
+ self.assertEqual(fsa.ncall, 2)
+ self.assertEqual(fsa.ncall, fsb.ncall, fsc.ncall)
+
+ def test_merge_aabab_aabbc(self):
+ _timings = {
+ "a_1": 15,
+ "a_2": 14,
+ "b_1": 12,
+ "a_3": 10,
+ "b_2": 9,
+ "c_1": 4
+ }
+ _yappi._set_test_timings(_timings)
+
+ def a():
+ if self._ncall == 1:
+ self._ncall += 1
+ a()
+ elif self._ncall == 5:
+ self._ncall += 1
+ a()
+ else:
+ b()
+
+ def b():
+ if self._ncall == 2:
+ self._ncall += 1
+ a()
+ elif self._ncall == 6:
+ self._ncall += 1
+ b()
+ elif self._ncall == 7:
+ c()
+ else:
+ return
+
+ def c():
+ pass
+
+ self._ncall = 1
+ stats = utils.run_and_get_func_stats(a, )
+ stats.save("tests/ystats1.ys")
+ yappi.clear_stats()
+ _yappi._set_test_timings(_timings)
+ #stats.print_all()
+
+ self._ncall = 5
+ stats = utils.run_and_get_func_stats(a, )
+ stats.save("tests/ystats2.ys")
+
+ #stats.print_all()
+
+ def a(): # same name but another function(code object)
+ pass
+
+ yappi.start()
+ a()
+ stats = yappi.get_func_stats().add(
+ ["tests/ystats1.ys", "tests/ystats2.ys"]
+ )
+ #stats.print_all()
+ self.assertEqual(len(stats), 4)
+
+ fsa = None
+ for stat in stats:
+ if stat.name == "a" and stat.ttot == 45:
+ fsa = stat
+ break
+ self.assertTrue(fsa is not None)
+
+ self.assertEqual(fsa.ncall, 7)
+ self.assertEqual(fsa.nactualcall, 3)
+ self.assertEqual(fsa.ttot, 45)
+ self.assertEqual(fsa.tsub, 10)
+ fsb = utils.find_stat_by_name(stats, "b")
+ fsc = utils.find_stat_by_name(stats, "c")
+ self.assertEqual(fsb.ncall, 6)
+ self.assertEqual(fsb.nactualcall, 3)
+ self.assertEqual(fsb.ttot, 36)
+ self.assertEqual(fsb.tsub, 27)
+ self.assertEqual(fsb.tavg, 6)
+ self.assertEqual(fsc.ttot, 8)
+ self.assertEqual(fsc.tsub, 8)
+ self.assertEqual(fsc.tavg, 4)
+ self.assertEqual(fsc.nactualcall, fsc.ncall, 2)
+
+
+class MultithreadedScenarios(utils.YappiUnitTestCase):
+
+ def test_issue_32(self):
+ '''
+ Start yappi from different thread and we get Internal Error(15) as
+ the current_ctx_id() called while enumerating the threads in start()
+ and as it does not swap to the enumerated ThreadState* the THreadState_GetDict()
+ returns wrong object and thus sets an invalid id for the _ctx structure.
+
+ When this issue happens multiple Threads have same tid as the internal ts_ptr
+ will be same for different contexts. So, let's see if that happens
+ '''
+
+ def foo():
+ time.sleep(0.2)
+
+ def bar():
+ time.sleep(0.1)
+
+ def thread_func():
+ yappi.set_clock_type("wall")
+ yappi.start()
+
+ bar()
+
+ t = threading.Thread(target=thread_func)
+ t.start()
+ t.join()
+
+ foo()
+
+ yappi.stop()
+
+ thread_ids = set()
+ for tstat in yappi.get_thread_stats():
+ self.assertTrue(tstat.tid not in thread_ids)
+ thread_ids.add(tstat.tid)
+
+ def test_subsequent_profile(self):
+ WORKER_COUNT = 5
+
+ def a():
+ pass
+
+ def b():
+ pass
+
+ def c():
+ pass
+
+ _timings = {
+ "a_1": 3,
+ "b_1": 2,
+ "c_1": 1,
+ }
+
+ yappi.start()
+
+ def g():
+ pass
+
+ g()
+ yappi.stop()
+ yappi.clear_stats()
+ _yappi._set_test_timings(_timings)
+ yappi.start()
+
+ _dummy = []
+ for i in range(WORKER_COUNT):
+ t = threading.Thread(target=a)
+ t.start()
+ t.join()
+ for i in range(WORKER_COUNT):
+ t = threading.Thread(target=b)
+ t.start()
+ _dummy.append(t)
+ t.join()
+ for i in range(WORKER_COUNT):
+ t = threading.Thread(target=a)
+ t.start()
+ t.join()
+ for i in range(WORKER_COUNT):
+ t = threading.Thread(target=c)
+ t.start()
+ t.join()
+ yappi.stop()
+ yappi.start()
+
+ def f():
+ pass
+
+ f()
+ stats = yappi.get_func_stats()
+ fsa = utils.find_stat_by_name(stats, 'a')
+ fsb = utils.find_stat_by_name(stats, 'b')
+ fsc = utils.find_stat_by_name(stats, 'c')
+ self.assertEqual(fsa.ncall, 10)
+ self.assertEqual(fsb.ncall, 5)
+ self.assertEqual(fsc.ncall, 5)
+ self.assertEqual(fsa.ttot, fsa.tsub, 30)
+ self.assertEqual(fsb.ttot, fsb.tsub, 10)
+ self.assertEqual(fsc.ttot, fsc.tsub, 5)
+
+ # MACOSx optimizes by only creating one worker thread
+ self.assertTrue(len(yappi.get_thread_stats()) >= 2)
+
+ def test_basic(self):
+ yappi.set_clock_type('wall')
+
+ def dummy():
+ pass
+
+ def a():
+ time.sleep(0.2)
+
+ class Worker1(threading.Thread):
+
+ def a(self):
+ time.sleep(0.3)
+
+ def run(self):
+ self.a()
+
+ yappi.start(builtins=False, profile_threads=True)
+
+ c = Worker1()
+ c.start()
+ c.join()
+ a()
+ stats = yappi.get_func_stats()
+ fsa1 = utils.find_stat_by_name(stats, 'Worker1.a')
+ fsa2 = utils.find_stat_by_name(stats, 'a')
+ self.assertTrue(fsa1 is not None)
+ self.assertTrue(fsa2 is not None)
+ self.assertTrue(fsa1.ttot > 0.2)
+ self.assertTrue(fsa2.ttot > 0.1)
+ tstats = yappi.get_thread_stats()
+ self.assertEqual(len(tstats), 2)
+ tsa = utils.find_stat_by_name(tstats, 'Worker1')
+ tsm = utils.find_stat_by_name(tstats, '_MainThread')
+ dummy() # call dummy to force ctx name to be retrieved again.
+ self.assertTrue(tsa is not None)
+ # TODO: I put dummy() to fix below, remove the comments after a while.
+ self.assertTrue( # FIX: I see this fails sometimes?
+ tsm is not None,
+ f"Could not find \"_MainThread\". Found: {', '.join(utils.get_stat_names(tstats))}")
+
+ def test_ctx_stats(self):
+ from threading import Thread
+ DUMMY_WORKER_COUNT = 5
+ yappi.start()
+
+ class DummyThread(Thread):
+ pass
+
+ def dummy():
+ pass
+
+ def dummy_worker():
+ pass
+
+ for i in range(DUMMY_WORKER_COUNT):
+ t = DummyThread(target=dummy_worker)
+ t.start()
+ t.join()
+ yappi.stop()
+ stats = yappi.get_thread_stats()
+ tsa = utils.find_stat_by_name(stats, "DummyThread")
+ self.assertTrue(tsa is not None)
+ yappi.clear_stats()
+ time.sleep(1.0)
+ _timings = {
+ "a_1": 6,
+ "b_1": 5,
+ "c_1": 3,
+ "d_1": 1,
+ "a_2": 4,
+ "b_2": 3,
+ "c_2": 2,
+ "d_2": 1
+ }
+ _yappi._set_test_timings(_timings)
+
+ class Thread1(Thread):
+ pass
+
+ class Thread2(Thread):
+ pass
+
+ def a():
+ b()
+
+ def b():
+ c()
+
+ def c():
+ d()
+
+ def d():
+ time.sleep(0.6)
+
+ yappi.set_clock_type("wall")
+ yappi.start()
+ t1 = Thread1(target=a)
+ t1.start()
+ t2 = Thread2(target=a)
+ t2.start()
+ t1.join()
+ t2.join()
+ stats = yappi.get_thread_stats()
+
+ # the fist clear_stats clears the context table?
+ tsa = utils.find_stat_by_name(stats, "DummyThread")
+ self.assertTrue(tsa is None)
+
+ tst1 = utils.find_stat_by_name(stats, "Thread1")
+ tst2 = utils.find_stat_by_name(stats, "Thread2")
+ tsmain = utils.find_stat_by_name(stats, "_MainThread")
+ dummy() # call dummy to force ctx name to be retrieved again.
+ self.assertTrue(len(stats) == 3)
+ self.assertTrue(tst1 is not None)
+ self.assertTrue(tst2 is not None)
+ # TODO: I put dummy() to fix below, remove the comments after a while.
+ self.assertTrue( # FIX: I see this fails sometimes
+ tsmain is not None,
+ f"Could not find \"_MainThread\". Found: {', '.join(utils.get_stat_names(stats))}")
+ self.assertTrue(1.0 > tst2.ttot >= 0.5)
+ self.assertTrue(1.0 > tst1.ttot >= 0.5)
+
+ # test sorting of the ctx stats
+ stats = stats.sort("totaltime", "desc")
+ prev_stat = None
+ for stat in stats:
+ if prev_stat:
+ self.assertTrue(prev_stat.ttot >= stat.ttot)
+ prev_stat = stat
+ stats = stats.sort("totaltime", "asc")
+ prev_stat = None
+ for stat in stats:
+ if prev_stat:
+ self.assertTrue(prev_stat.ttot <= stat.ttot)
+ prev_stat = stat
+ stats = stats.sort("schedcount", "desc")
+ prev_stat = None
+ for stat in stats:
+ if prev_stat:
+ self.assertTrue(prev_stat.sched_count >= stat.sched_count)
+ prev_stat = stat
+ stats = stats.sort("name", "desc")
+ prev_stat = None
+ for stat in stats:
+ if prev_stat:
+ self.assertTrue(prev_stat.name.lower() >= stat.name.lower())
+ prev_stat = stat
+ self.assertRaises(
+ yappi.YappiError, stats.sort, "invalid_thread_sorttype_arg"
+ )
+ self.assertRaises(
+ yappi.YappiError, stats.sort, "invalid_thread_sortorder_arg"
+ )
+
+ def test_ctx_stats_cpu(self):
+
+ def get_thread_name():
+ try:
+ return threading.current_thread().name
+ except AttributeError:
+ return "Anonymous"
+
+ def burn_cpu(sec):
+ t0 = yappi.get_clock_time()
+ elapsed = 0
+ while (elapsed < sec):
+ for _ in range(1000):
+ pass
+ elapsed = yappi.get_clock_time() - t0
+
+ def test():
+
+ ts = []
+ for i in (0.01, 0.05, 0.1):
+ t = threading.Thread(target=burn_cpu, args=(i, ))
+ t.name = f"burn_cpu-{str(i)}"
+ t.start()
+ ts.append(t)
+ for t in ts:
+ t.join()
+
+ yappi.set_clock_type("cpu")
+ yappi.set_context_name_callback(get_thread_name)
+
+ yappi.start()
+
+ test()
+
+ yappi.stop()
+
+ tstats = yappi.get_thread_stats()
+ r1 = '''
+ burn_cpu-0.1 3 123145356058624 0.100105 8
+ burn_cpu-0.05 2 123145361313792 0.050149 8
+ burn_cpu-0.01 1 123145356058624 0.010127 2
+ MainThread 0 4321620864 0.001632 6
+ '''
+ self.assert_ctx_stats_almost_equal(r1, tstats)
+
+ def test_producer_consumer_with_queues(self):
+ # we currently just stress yappi, no functionality test is done here.
+ yappi.start()
+ from queue import Queue
+ from threading import Thread
+ WORKER_THREAD_COUNT = 50
+ WORK_ITEM_COUNT = 2000
+
+ def worker():
+ while True:
+ item = q.get()
+ # do the work with item
+ q.task_done()
+
+ q = Queue()
+ for i in range(WORKER_THREAD_COUNT):
+ t = Thread(target=worker)
+ t.daemon = True
+ t.start()
+
+ for item in range(WORK_ITEM_COUNT):
+ q.put(item)
+ q.join() # block until all tasks are done
+ #yappi.get_func_stats().sort("callcount").print_all()
+ yappi.stop()
+
+ def test_temporary_lock_waiting(self):
+ yappi.start()
+ _lock = threading.Lock()
+
+ def worker():
+ _lock.acquire()
+ try:
+ time.sleep(1.0)
+ finally:
+ _lock.release()
+
+ t1 = threading.Thread(target=worker)
+ t2 = threading.Thread(target=worker)
+ t1.start()
+ t2.start()
+ t1.join()
+ t2.join()
+ #yappi.get_func_stats().sort("callcount").print_all()
+ yappi.stop()
+
+ @unittest.skipIf(os.name != "posix", "requires Posix compliant OS")
+ def test_signals_with_blocking_calls(self):
+ import signal, os, time
+
+ # just to verify if signal is handled correctly and stats/yappi are not corrupted.
+ def handler(signum, frame):
+ raise Exception("Signal handler executed!")
+
+ yappi.start()
+ signal.signal(signal.SIGALRM, handler)
+ signal.alarm(1)
+ self.assertRaises(Exception, time.sleep, 2)
+ stats = yappi.get_func_stats()
+ fsh = utils.find_stat_by_name(stats, "handler")
+ self.assertTrue(fsh is not None)
+
+ def test_concurrent_futures(self):
+ yappi.start()
+ from concurrent.futures import ThreadPoolExecutor
+ with ThreadPoolExecutor(max_workers=5) as executor:
+ f = executor.submit(pow, 5, 2)
+ self.assertEqual(f.result(), 25)
+ time.sleep(1.0)
+ yappi.stop()
+
+ def test_barrier(self):
+ yappi.start()
+ b = threading.Barrier(2, timeout=1)
+
+ def worker():
+ try:
+ b.wait()
+ except threading.BrokenBarrierError:
+ pass
+ except Exception:
+ raise Exception("BrokenBarrierError not raised")
+
+ t1 = threading.Thread(target=worker)
+ t1.start()
+ #b.wait()
+ t1.join()
+ yappi.stop()
+
+
+class NonRecursiveFunctions(utils.YappiUnitTestCase):
+
+ def test_abcd(self):
+ _timings = {"a_1": 6, "b_1": 5, "c_1": 3, "d_1": 1}
+ _yappi._set_test_timings(_timings)
+
+ def a():
+ b()
+
+ def b():
+ c()
+
+ def c():
+ d()
+
+ def d():
+ pass
+
+ stats = utils.run_and_get_func_stats(a)
+ fsa = utils.find_stat_by_name(stats, 'a')
+ fsb = utils.find_stat_by_name(stats, 'b')
+ fsc = utils.find_stat_by_name(stats, 'c')
+ fsd = utils.find_stat_by_name(stats, 'd')
+ cfsab = fsa.children[fsb]
+ cfsbc = fsb.children[fsc]
+ cfscd = fsc.children[fsd]
+
+ self.assertEqual(fsa.ttot, 6)
+ self.assertEqual(fsa.tsub, 1)
+ self.assertEqual(fsb.ttot, 5)
+ self.assertEqual(fsb.tsub, 2)
+ self.assertEqual(fsc.ttot, 3)
+ self.assertEqual(fsc.tsub, 2)
+ self.assertEqual(fsd.ttot, 1)
+ self.assertEqual(fsd.tsub, 1)
+ self.assertEqual(cfsab.ttot, 5)
+ self.assertEqual(cfsab.tsub, 2)
+ self.assertEqual(cfsbc.ttot, 3)
+ self.assertEqual(cfsbc.tsub, 2)
+ self.assertEqual(cfscd.ttot, 1)
+ self.assertEqual(cfscd.tsub, 1)
+
+ def test_stop_in_middle(self):
+ _timings = {"a_1": 6, "b_1": 4}
+ _yappi._set_test_timings(_timings)
+
+ def a():
+ b()
+ yappi.stop()
+
+ def b():
+ time.sleep(0.2)
+
+ yappi.start()
+ a()
+ stats = yappi.get_func_stats()
+ fsa = utils.find_stat_by_name(stats, 'a')
+ fsb = utils.find_stat_by_name(stats, 'b')
+
+ self.assertEqual(fsa.ncall, 1)
+ self.assertEqual(fsa.nactualcall, 0)
+ self.assertEqual(fsa.ttot, 0) # no call_leave called
+ self.assertEqual(fsa.tsub, 0) # no call_leave called
+ self.assertEqual(fsb.ttot, 4)
+
+
+class RecursiveFunctions(utils.YappiUnitTestCase):
+
+ def test_fibonacci(self):
+
+ def fib(n):
+ if n > 1:
+ return fib(n - 1) + fib(n - 2)
+ else:
+ return n
+
+ stats = utils.run_and_get_func_stats(fib, 22)
+ fs = utils.find_stat_by_name(stats, 'fib')
+ self.assertEqual(fs.ncall, 57313)
+ self.assertEqual(fs.ttot, fs.tsub)
+
+ def test_abcadc(self):
+ _timings = {
+ "a_1": 20,
+ "b_1": 19,
+ "c_1": 17,
+ "a_2": 13,
+ "d_1": 12,
+ "c_2": 10,
+ "a_3": 5
+ }
+ _yappi._set_test_timings(_timings)
+
+ def a(n):
+ if n == 3:
+ return
+ if n == 1 + 1:
+ d(n)
+ else:
+ b(n)
+
+ def b(n):
+ c(n)
+
+ def c(n):
+ a(n + 1)
+
+ def d(n):
+ c(n)
+
+ stats = utils.run_and_get_func_stats(a, 1)
+ fsa = utils.find_stat_by_name(stats, 'a')
+ fsb = utils.find_stat_by_name(stats, 'b')
+ fsc = utils.find_stat_by_name(stats, 'c')
+ fsd = utils.find_stat_by_name(stats, 'd')
+ self.assertEqual(fsa.ncall, 3)
+ self.assertEqual(fsa.nactualcall, 1)
+ self.assertEqual(fsa.ttot, 20)
+ self.assertEqual(fsa.tsub, 7)
+ self.assertEqual(fsb.ttot, 19)
+ self.assertEqual(fsb.tsub, 2)
+ self.assertEqual(fsc.ttot, 17)
+ self.assertEqual(fsc.tsub, 9)
+ self.assertEqual(fsd.ttot, 12)
+ self.assertEqual(fsd.tsub, 2)
+ cfsca = fsc.children[fsa]
+ self.assertEqual(cfsca.nactualcall, 0)
+ self.assertEqual(cfsca.ncall, 2)
+ self.assertEqual(cfsca.ttot, 13)
+ self.assertEqual(cfsca.tsub, 6)
+
+ def test_aaaa(self):
+ _timings = {"d_1": 9, "d_2": 7, "d_3": 3, "d_4": 2}
+ _yappi._set_test_timings(_timings)
+
+ def d(n):
+ if n == 3:
+ return
+ d(n + 1)
+
+ stats = utils.run_and_get_func_stats(d, 0)
+ fsd = utils.find_stat_by_name(stats, 'd')
+ self.assertEqual(fsd.ncall, 4)
+ self.assertEqual(fsd.nactualcall, 1)
+ self.assertEqual(fsd.ttot, 9)
+ self.assertEqual(fsd.tsub, 9)
+ cfsdd = fsd.children[fsd]
+ self.assertEqual(cfsdd.ttot, 7)
+ self.assertEqual(cfsdd.tsub, 7)
+ self.assertEqual(cfsdd.ncall, 3)
+ self.assertEqual(cfsdd.nactualcall, 0)
+
+ def test_abcabc(self):
+ _timings = {
+ "a_1": 20,
+ "b_1": 19,
+ "c_1": 17,
+ "a_2": 13,
+ "b_2": 11,
+ "c_2": 9,
+ "a_3": 6
+ }
+ _yappi._set_test_timings(_timings)
+
+ def a(n):
+ if n == 3:
+ return
+ else:
+ b(n)
+
+ def b(n):
+ c(n)
+
+ def c(n):
+ a(n + 1)
+
+ stats = utils.run_and_get_func_stats(a, 1)
+ fsa = utils.find_stat_by_name(stats, 'a')
+ fsb = utils.find_stat_by_name(stats, 'b')
+ fsc = utils.find_stat_by_name(stats, 'c')
+ self.assertEqual(fsa.ncall, 3)
+ self.assertEqual(fsa.nactualcall, 1)
+ self.assertEqual(fsa.ttot, 20)
+ self.assertEqual(fsa.tsub, 9)
+ self.assertEqual(fsb.ttot, 19)
+ self.assertEqual(fsb.tsub, 4)
+ self.assertEqual(fsc.ttot, 17)
+ self.assertEqual(fsc.tsub, 7)
+ cfsab = fsa.children[fsb]
+ cfsbc = fsb.children[fsc]
+ cfsca = fsc.children[fsa]
+ self.assertEqual(cfsab.ttot, 19)
+ self.assertEqual(cfsab.tsub, 4)
+ self.assertEqual(cfsbc.ttot, 17)
+ self.assertEqual(cfsbc.tsub, 7)
+ self.assertEqual(cfsca.ttot, 13)
+ self.assertEqual(cfsca.tsub, 8)
+
+ def test_abcbca(self):
+ _timings = {"a_1": 10, "b_1": 9, "c_1": 7, "b_2": 4, "c_2": 2, "a_2": 1}
+ _yappi._set_test_timings(_timings)
+ self._ncall = 1
+
+ def a():
+ if self._ncall == 1:
+ b()
+ else:
+ return
+
+ def b():
+ c()
+
+ def c():
+ if self._ncall == 1:
+ self._ncall += 1
+ b()
+ else:
+ a()
+
+ stats = utils.run_and_get_func_stats(a)
+ fsa = utils.find_stat_by_name(stats, 'a')
+ fsb = utils.find_stat_by_name(stats, 'b')
+ fsc = utils.find_stat_by_name(stats, 'c')
+ cfsab = fsa.children[fsb]
+ cfsbc = fsb.children[fsc]
+ cfsca = fsc.children[fsa]
+ self.assertEqual(fsa.ttot, 10)
+ self.assertEqual(fsa.tsub, 2)
+ self.assertEqual(fsb.ttot, 9)
+ self.assertEqual(fsb.tsub, 4)
+ self.assertEqual(fsc.ttot, 7)
+ self.assertEqual(fsc.tsub, 4)
+ self.assertEqual(cfsab.ttot, 9)
+ self.assertEqual(cfsab.tsub, 2)
+ self.assertEqual(cfsbc.ttot, 7)
+ self.assertEqual(cfsbc.tsub, 4)
+ self.assertEqual(cfsca.ttot, 1)
+ self.assertEqual(cfsca.tsub, 1)
+ self.assertEqual(cfsca.ncall, 1)
+ self.assertEqual(cfsca.nactualcall, 0)
+
+ def test_aabccb(self):
+ _timings = {
+ "a_1": 13,
+ "a_2": 11,
+ "b_1": 9,
+ "c_1": 5,
+ "c_2": 3,
+ "b_2": 1
+ }
+ _yappi._set_test_timings(_timings)
+ self._ncall = 1
+
+ def a():
+ if self._ncall == 1:
+ self._ncall += 1
+ a()
+ else:
+ b()
+
+ def b():
+ if self._ncall == 3:
+ return
+ else:
+ c()
+
+ def c():
+ if self._ncall == 2:
+ self._ncall += 1
+ c()
+ else:
+ b()
+
+ stats = utils.run_and_get_func_stats(a)
+ fsa = utils.find_stat_by_name(stats, 'a')
+ fsb = utils.find_stat_by_name(stats, 'b')
+ fsc = utils.find_stat_by_name(stats, 'c')
+ cfsaa = fsa.children[fsa.index]
+ cfsab = fsa.children[fsb]
+ cfsbc = fsb.children[fsc.full_name]
+ cfscc = fsc.children[fsc]
+ cfscb = fsc.children[fsb]
+ self.assertEqual(fsb.ttot, 9)
+ self.assertEqual(fsb.tsub, 5)
+ self.assertEqual(cfsbc.ttot, 5)
+ self.assertEqual(cfsbc.tsub, 2)
+ self.assertEqual(fsa.ttot, 13)
+ self.assertEqual(fsa.tsub, 4)
+ self.assertEqual(cfsab.ttot, 9)
+ self.assertEqual(cfsab.tsub, 4)
+ self.assertEqual(cfsaa.ttot, 11)
+ self.assertEqual(cfsaa.tsub, 2)
+ self.assertEqual(fsc.ttot, 5)
+ self.assertEqual(fsc.tsub, 4)
+
+ def test_abaa(self):
+ _timings = {"a_1": 13, "b_1": 10, "a_2": 9, "a_3": 5}
+ _yappi._set_test_timings(_timings)
+
+ self._ncall = 1
+
+ def a():
+ if self._ncall == 1:
+ b()
+ elif self._ncall == 2:
+ self._ncall += 1
+ a()
+ else:
+ return
+
+ def b():
+ self._ncall += 1
+ a()
+
+ stats = utils.run_and_get_func_stats(a)
+ fsa = utils.find_stat_by_name(stats, 'a')
+ fsb = utils.find_stat_by_name(stats, 'b')
+ cfsaa = fsa.children[fsa]
+ cfsba = fsb.children[fsa]
+ self.assertEqual(fsb.ttot, 10)
+ self.assertEqual(fsb.tsub, 1)
+ self.assertEqual(fsa.ttot, 13)
+ self.assertEqual(fsa.tsub, 12)
+ self.assertEqual(cfsaa.ttot, 5)
+ self.assertEqual(cfsaa.tsub, 5)
+ self.assertEqual(cfsba.ttot, 9)
+ self.assertEqual(cfsba.tsub, 4)
+
+ def test_aabb(self):
+ _timings = {"a_1": 13, "a_2": 10, "b_1": 9, "b_2": 5}
+ _yappi._set_test_timings(_timings)
+
+ self._ncall = 1
+
+ def a():
+ if self._ncall == 1:
+ self._ncall += 1
+ a()
+ elif self._ncall == 2:
+ b()
+ else:
+ return
+
+ def b():
+ if self._ncall == 2:
+ self._ncall += 1
+ b()
+ else:
+ return
+
+ stats = utils.run_and_get_func_stats(a)
+ fsa = utils.find_stat_by_name(stats, 'a')
+ fsb = utils.find_stat_by_name(stats, 'b')
+ cfsaa = fsa.children[fsa]
+ cfsab = fsa.children[fsb]
+ cfsbb = fsb.children[fsb]
+ self.assertEqual(fsa.ttot, 13)
+ self.assertEqual(fsa.tsub, 4)
+ self.assertEqual(fsb.ttot, 9)
+ self.assertEqual(fsb.tsub, 9)
+ self.assertEqual(cfsaa.ttot, 10)
+ self.assertEqual(cfsaa.tsub, 1)
+ self.assertEqual(cfsab.ttot, 9)
+ self.assertEqual(cfsab.tsub, 4)
+ self.assertEqual(cfsbb.ttot, 5)
+ self.assertEqual(cfsbb.tsub, 5)
+
+ def test_abbb(self):
+ _timings = {"a_1": 13, "b_1": 10, "b_2": 6, "b_3": 1}
+ _yappi._set_test_timings(_timings)
+
+ self._ncall = 1
+
+ def a():
+ if self._ncall == 1:
+ b()
+
+ def b():
+ if self._ncall == 3:
+ return
+ self._ncall += 1
+ b()
+
+ stats = utils.run_and_get_func_stats(a)
+ fsa = utils.find_stat_by_name(stats, 'a')
+ fsb = utils.find_stat_by_name(stats, 'b')
+ cfsab = fsa.children[fsb]
+ cfsbb = fsb.children[fsb]
+ self.assertEqual(fsa.ttot, 13)
+ self.assertEqual(fsa.tsub, 3)
+ self.assertEqual(fsb.ttot, 10)
+ self.assertEqual(fsb.tsub, 10)
+ self.assertEqual(fsb.ncall, 3)
+ self.assertEqual(fsb.nactualcall, 1)
+ self.assertEqual(cfsab.ttot, 10)
+ self.assertEqual(cfsab.tsub, 4)
+ self.assertEqual(cfsbb.ttot, 6)
+ self.assertEqual(cfsbb.tsub, 6)
+ self.assertEqual(cfsbb.nactualcall, 0)
+ self.assertEqual(cfsbb.ncall, 2)
+
+ def test_aaab(self):
+ _timings = {"a_1": 13, "a_2": 10, "a_3": 6, "b_1": 1}
+ _yappi._set_test_timings(_timings)
+
+ self._ncall = 1
+
+ def a():
+ if self._ncall == 3:
+ b()
+ return
+ self._ncall += 1
+ a()
+
+ def b():
+ return
+
+ stats = utils.run_and_get_func_stats(a)
+ fsa = utils.find_stat_by_name(stats, 'a')
+ fsb = utils.find_stat_by_name(stats, 'b')
+ cfsaa = fsa.children[fsa]
+ cfsab = fsa.children[fsb]
+ self.assertEqual(fsa.ttot, 13)
+ self.assertEqual(fsa.tsub, 12)
+ self.assertEqual(fsb.ttot, 1)
+ self.assertEqual(fsb.tsub, 1)
+ self.assertEqual(cfsaa.ttot, 10)
+ self.assertEqual(cfsaa.tsub, 9)
+ self.assertEqual(cfsab.ttot, 1)
+ self.assertEqual(cfsab.tsub, 1)
+
+ def test_abab(self):
+ _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1}
+ _yappi._set_test_timings(_timings)
+
+ self._ncall = 1
+
+ def a():
+ b()
+
+ def b():
+ if self._ncall == 2:
+ return
+ self._ncall += 1
+ a()
+
+ stats = utils.run_and_get_func_stats(a)
+ fsa = utils.find_stat_by_name(stats, 'a')
+ fsb = utils.find_stat_by_name(stats, 'b')
+ cfsab = fsa.children[fsb]
+ cfsba = fsb.children[fsa]
+ self.assertEqual(fsa.ttot, 13)
+ self.assertEqual(fsa.tsub, 8)
+ self.assertEqual(fsb.ttot, 10)
+ self.assertEqual(fsb.tsub, 5)
+ self.assertEqual(cfsab.ttot, 10)
+ self.assertEqual(cfsab.tsub, 5)
+ self.assertEqual(cfsab.ncall, 2)
+ self.assertEqual(cfsab.nactualcall, 1)
+ self.assertEqual(cfsba.ttot, 6)
+ self.assertEqual(cfsba.tsub, 5)
+
+
+if __name__ == '__main__':
+ # import sys;sys.argv = ['', 'BasicUsage.test_run_as_script']
+ # import sys;sys.argv = ['', 'MultithreadedScenarios.test_subsequent_profile']
+ unittest.main()
--
2.34.1