| From 0136ca731cba8b056b3f2ff0e7df3953b94f1e87 Mon Sep 17 00:00:00 2001 |
| From: Tim Orling <tim.orling@konsulko.com> |
| Date: Sun, 24 Dec 2023 09:41:57 -0800 |
| Subject: [PATCH 1/2] test_functionality: convert line endings to Unix |
| |
| Convert the Windows line endings with dos2unix to be like the |
| other files in tests/* |
| |
| Upstream-Status: Submitted [https://github.com/sumerc/yappi/pull/164] |
| |
| Signed-off-by: Tim Orling <tim.orling@konsulko.com> |
| --- |
| tests/test_functionality.py | 3822 +++++++++++++++++------------------ |
| 1 file changed, 1911 insertions(+), 1911 deletions(-) |
| |
| diff --git a/tests/test_functionality.py b/tests/test_functionality.py |
| index 0e99c47..38bbe67 100644 |
| --- a/tests/test_functionality.py |
| +++ b/tests/test_functionality.py |
| @@ -1,1911 +1,1911 @@ |
| -import os
|
| -import sys
|
| -import time
|
| -import threading
|
| -import unittest
|
| -import yappi
|
| -import _yappi
|
| -import utils
|
| -import multiprocessing
|
| -import subprocess
|
| -
|
| -_counter = 0
|
| -
|
| -
|
| -class BasicUsage(utils.YappiUnitTestCase):
|
| -
|
| - def test_callback_function_int_return_overflow(self):
|
| - # this test is just here to check if any errors are generated, as the err
|
| - # is printed in C side, I did not include it here. THere are ways to test
|
| - # this deterministically, I did not bother
|
| - import ctypes
|
| -
|
| - def _unsigned_overflow_margin():
|
| - return 2**(ctypes.sizeof(ctypes.c_void_p) * 8) - 1
|
| -
|
| - def foo():
|
| - pass
|
| -
|
| - #with utils.captured_output() as (out, err):
|
| - yappi.set_context_id_callback(_unsigned_overflow_margin)
|
| - yappi.set_tag_callback(_unsigned_overflow_margin)
|
| - yappi.start()
|
| - foo()
|
| -
|
| - def test_issue60(self):
|
| -
|
| - def foo():
|
| - buf = bytearray()
|
| - buf += b't' * 200
|
| - view = memoryview(buf)[10:]
|
| - view = view.tobytes()
|
| - del buf[:10] # this throws exception
|
| - return view
|
| -
|
| - yappi.start(builtins=True)
|
| - foo()
|
| - self.assertTrue(
|
| - len(
|
| - yappi.get_func_stats(
|
| - filter_callback=lambda x: yappi.
|
| - func_matches(x, [memoryview.tobytes])
|
| - )
|
| - ) > 0
|
| - )
|
| - yappi.stop()
|
| -
|
| - def test_issue54(self):
|
| -
|
| - def _tag_cbk():
|
| - global _counter
|
| - _counter += 1
|
| - return _counter
|
| -
|
| - def a():
|
| - pass
|
| -
|
| - def b():
|
| - pass
|
| -
|
| - yappi.set_tag_callback(_tag_cbk)
|
| - yappi.start()
|
| - a()
|
| - a()
|
| - a()
|
| - yappi.stop()
|
| - stats = yappi.get_func_stats()
|
| - self.assertEqual(stats.pop().ncall, 3) # aggregated if no tag is given
|
| - stats = yappi.get_func_stats(tag=1)
|
| -
|
| - for i in range(1, 3):
|
| - stats = yappi.get_func_stats(tag=i)
|
| - stats = yappi.get_func_stats(
|
| - tag=i, filter_callback=lambda x: yappi.func_matches(x, [a])
|
| - )
|
| -
|
| - stat = stats.pop()
|
| - self.assertEqual(stat.ncall, 1)
|
| -
|
| - yappi.set_tag_callback(None)
|
| - yappi.clear_stats()
|
| - yappi.start()
|
| - b()
|
| - b()
|
| - stats = yappi.get_func_stats()
|
| - self.assertEqual(len(stats), 1)
|
| - stat = stats.pop()
|
| - self.assertEqual(stat.ncall, 2)
|
| -
|
| - def test_filter(self):
|
| -
|
| - def a():
|
| - pass
|
| -
|
| - def b():
|
| - a()
|
| -
|
| - def c():
|
| - b()
|
| -
|
| - _TCOUNT = 5
|
| -
|
| - ts = []
|
| - yappi.start()
|
| - for i in range(_TCOUNT):
|
| - t = threading.Thread(target=c)
|
| - t.start()
|
| - ts.append(t)
|
| -
|
| - for t in ts:
|
| - t.join()
|
| -
|
| - yappi.stop()
|
| -
|
| - ctx_ids = []
|
| - for tstat in yappi.get_thread_stats():
|
| - if tstat.name == '_MainThread':
|
| - main_ctx_id = tstat.id
|
| - else:
|
| - ctx_ids.append(tstat.id)
|
| -
|
| - fstats = yappi.get_func_stats(filter={"ctx_id": 9})
|
| - self.assertTrue(fstats.empty())
|
| - fstats = yappi.get_func_stats(
|
| - filter={
|
| - "ctx_id": main_ctx_id,
|
| - "name": "c"
|
| - }
|
| - ) # main thread
|
| - self.assertTrue(fstats.empty())
|
| -
|
| - for i in ctx_ids:
|
| - fstats = yappi.get_func_stats(
|
| - filter={
|
| - "ctx_id": i,
|
| - "name": "a",
|
| - "ncall": 1
|
| - }
|
| - )
|
| - self.assertEqual(fstats.pop().ncall, 1)
|
| - fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "b"})
|
| - self.assertEqual(fstats.pop().ncall, 1)
|
| - fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "c"})
|
| - self.assertEqual(fstats.pop().ncall, 1)
|
| -
|
| - yappi.clear_stats()
|
| - yappi.start(builtins=True)
|
| - time.sleep(0.1)
|
| - yappi.stop()
|
| - fstats = yappi.get_func_stats(filter={"module": "time"})
|
| - self.assertEqual(len(fstats), 1)
|
| -
|
| - # invalid filters`
|
| - self.assertRaises(
|
| - Exception, yappi.get_func_stats, filter={'tag': "sss"}
|
| - )
|
| - self.assertRaises(
|
| - Exception, yappi.get_func_stats, filter={'ctx_id': "None"}
|
| - )
|
| -
|
| - def test_filter_callback(self):
|
| -
|
| - def a():
|
| - time.sleep(0.1)
|
| -
|
| - def b():
|
| - a()
|
| -
|
| - def c():
|
| - pass
|
| -
|
| - def d():
|
| - pass
|
| -
|
| - yappi.set_clock_type("wall")
|
| - yappi.start(builtins=True)
|
| - a()
|
| - b()
|
| - c()
|
| - d()
|
| - stats = yappi.get_func_stats(
|
| - filter_callback=lambda x: yappi.func_matches(x, [a, b])
|
| - )
|
| - #stats.print_all()
|
| - r1 = '''
|
| - tests/test_functionality.py:98 a 2 0.000000 0.200350 0.100175
|
| - tests/test_functionality.py:101 b 1 0.000000 0.120000 0.100197
|
| - '''
|
| - self.assert_traces_almost_equal(r1, stats)
|
| - self.assertEqual(len(stats), 2)
|
| - stats = yappi.get_func_stats(
|
| - filter_callback=lambda x: yappi.
|
| - module_matches(x, [sys.modules[__name__]])
|
| - )
|
| - r1 = '''
|
| - tests/test_functionality.py:98 a 2 0.000000 0.230130 0.115065
|
| - tests/test_functionality.py:101 b 1 0.000000 0.120000 0.109011
|
| - tests/test_functionality.py:104 c 1 0.000000 0.000002 0.000002
|
| - tests/test_functionality.py:107 d 1 0.000000 0.000001 0.000001
|
| - '''
|
| - self.assert_traces_almost_equal(r1, stats)
|
| - self.assertEqual(len(stats), 4)
|
| -
|
| - stats = yappi.get_func_stats(
|
| - filter_callback=lambda x: yappi.func_matches(x, [time.sleep])
|
| - )
|
| - self.assertEqual(len(stats), 1)
|
| - r1 = '''
|
| - time.sleep 2 0.206804 0.220000 0.103402
|
| - '''
|
| - self.assert_traces_almost_equal(r1, stats)
|
| -
|
| - def test_print_formatting(self):
|
| -
|
| - def a():
|
| - pass
|
| -
|
| - def b():
|
| - a()
|
| -
|
| - func_cols = {
|
| - 1: ("name", 48),
|
| - 0: ("ncall", 5),
|
| - 2: ("tsub", 8),
|
| - }
|
| - thread_cols = {
|
| - 1: ("name", 48),
|
| - 0: ("ttot", 8),
|
| - }
|
| -
|
| - yappi.start()
|
| - a()
|
| - b()
|
| - yappi.stop()
|
| - fs = yappi.get_func_stats()
|
| - cs = fs[1].children
|
| - ts = yappi.get_thread_stats()
|
| - #fs.print_all(out=sys.stderr, columns={1:("name", 70), })
|
| - #cs.print_all(out=sys.stderr, columns=func_cols)
|
| - #ts.print_all(out=sys.stderr, columns=thread_cols)
|
| - #cs.print_all(out=sys.stderr, columns={})
|
| -
|
| - self.assertRaises(
|
| - yappi.YappiError, fs.print_all, columns={1: ("namee", 9)}
|
| - )
|
| - self.assertRaises(
|
| - yappi.YappiError, cs.print_all, columns={1: ("dd", 0)}
|
| - )
|
| - self.assertRaises(
|
| - yappi.YappiError, ts.print_all, columns={1: ("tidd", 0)}
|
| - )
|
| -
|
| - def test_get_clock(self):
|
| - yappi.set_clock_type('cpu')
|
| - self.assertEqual('cpu', yappi.get_clock_type())
|
| - clock_info = yappi.get_clock_info()
|
| - self.assertTrue('api' in clock_info)
|
| - self.assertTrue('resolution' in clock_info)
|
| -
|
| - yappi.set_clock_type('wall')
|
| - self.assertEqual('wall', yappi.get_clock_type())
|
| -
|
| - t0 = yappi.get_clock_time()
|
| - time.sleep(0.1)
|
| - duration = yappi.get_clock_time() - t0
|
| - self.assertTrue(0.05 < duration < 0.3)
|
| -
|
| - def test_profile_decorator(self):
|
| -
|
| - def aggregate(func, stats):
|
| - fname = f"tests/{func.__name__}.profile"
|
| - try:
|
| - stats.add(fname)
|
| - except OSError:
|
| - pass
|
| - stats.save(fname)
|
| - raise Exception("messing around")
|
| -
|
| - @yappi.profile(return_callback=aggregate)
|
| - def a(x, y):
|
| - if x + y == 25:
|
| - raise Exception("")
|
| - return x + y
|
| -
|
| - def b():
|
| - pass
|
| -
|
| - try:
|
| - os.remove(
|
| - "tests/a.profile"
|
| - ) # remove the one from prev test, if available
|
| - except:
|
| - pass
|
| -
|
| - # global profile is on to mess things up
|
| - yappi.start()
|
| - b()
|
| -
|
| - # assert functionality and call function at same time
|
| - try:
|
| - self.assertEqual(a(1, 2), 3)
|
| - except:
|
| - pass
|
| - try:
|
| - self.assertEqual(a(2, 5), 7)
|
| - except:
|
| - pass
|
| - try:
|
| - a(4, 21)
|
| - except:
|
| - pass
|
| - stats = yappi.get_func_stats().add("tests/a.profile")
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - self.assertEqual(fsa.ncall, 3)
|
| - self.assertEqual(len(stats), 1) # b() should be cleared out.
|
| -
|
| - @yappi.profile(return_callback=aggregate)
|
| - def count_down_rec(n):
|
| - if n == 0:
|
| - return
|
| - count_down_rec(n - 1)
|
| -
|
| - try:
|
| - os.remove(
|
| - "tests/count_down_rec.profile"
|
| - ) # remove the one from prev test, if available
|
| - except:
|
| - pass
|
| -
|
| - try:
|
| - count_down_rec(4)
|
| - except:
|
| - pass
|
| - try:
|
| - count_down_rec(3)
|
| - except:
|
| - pass
|
| -
|
| - stats = yappi.YFuncStats("tests/count_down_rec.profile")
|
| - fsrec = utils.find_stat_by_name(stats, 'count_down_rec')
|
| - self.assertEqual(fsrec.ncall, 9)
|
| - self.assertEqual(fsrec.nactualcall, 2)
|
| -
|
| - def test_strip_dirs(self):
|
| -
|
| - def a():
|
| - pass
|
| -
|
| - stats = utils.run_and_get_func_stats(a, )
|
| - stats.strip_dirs()
|
| - fsa = utils.find_stat_by_name(stats, "a")
|
| - self.assertEqual(fsa.module, os.path.basename(fsa.module))
|
| -
|
| - @unittest.skipIf(os.name == "nt", "do not run on Windows")
|
| - def test_run_as_script(self):
|
| - import re
|
| - p = subprocess.Popen(
|
| - ['yappi', os.path.join('./tests', 'run_as_script.py')],
|
| - stdout=subprocess.PIPE
|
| - )
|
| - out, err = p.communicate()
|
| - self.assertEqual(p.returncode, 0)
|
| - func_stats, thread_stats = re.split(
|
| - b'name\\s+id\\s+tid\\s+ttot\\s+scnt\\s*\n', out
|
| - )
|
| - self.assertTrue(b'FancyThread' in thread_stats)
|
| -
|
| - def test_yappi_overhead(self):
|
| - LOOP_COUNT = 100000
|
| -
|
| - def a():
|
| - pass
|
| -
|
| - def b():
|
| - for i in range(LOOP_COUNT):
|
| - a()
|
| -
|
| - t0 = time.time()
|
| - yappi.start()
|
| - b()
|
| - yappi.stop()
|
| - time_with_yappi = time.time() - t0
|
| - t0 = time.time()
|
| - b()
|
| - time_without_yappi = time.time() - t0
|
| - if time_without_yappi == 0:
|
| - time_without_yappi = 0.000001
|
| -
|
| - # in latest v0.82, I calculated this as close to "7.0" in my machine.
|
| - # however, %83 of this overhead is coming from tickcount(). The other %17
|
| - # seems to have been evenly distributed to the internal bookkeeping
|
| - # structures/algorithms which seems acceptable. Note that our test only
|
| - # tests one function being profiled at-a-time in a short interval.
|
| - # profiling high number of functions in a small time
|
| - # is a different beast, (which is pretty unlikely in most applications)
|
| - # So as a conclusion: I cannot see any optimization window for Yappi that
|
| - # is worth implementing as we will only optimize %17 of the time.
|
| - sys.stderr.write("\r\nYappi puts %0.1f times overhead to the profiled application in average.\r\n" % \
|
| - (time_with_yappi / time_without_yappi))
|
| -
|
| - def test_clear_stats_while_running(self):
|
| -
|
| - def a():
|
| - pass
|
| -
|
| - yappi.start()
|
| - a()
|
| - yappi.clear_stats()
|
| - a()
|
| - stats = yappi.get_func_stats()
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - self.assertEqual(fsa.ncall, 1)
|
| -
|
| - def test_generator(self):
|
| -
|
| - def _gen(n):
|
| - while (n > 0):
|
| - yield n
|
| - n -= 1
|
| -
|
| - yappi.start()
|
| - for x in _gen(5):
|
| - pass
|
| - self.assertTrue(
|
| - yappi.convert2pstats(yappi.get_func_stats()) is not None
|
| - )
|
| -
|
| - def test_slice_child_stats_and_strip_dirs(self):
|
| -
|
| - def b():
|
| - for i in range(10000000):
|
| - pass
|
| -
|
| - def a():
|
| - b()
|
| -
|
| - yappi.start(builtins=True)
|
| - a()
|
| - stats = yappi.get_func_stats()
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - fsb = utils.find_stat_by_name(stats, 'b')
|
| - self.assertTrue(fsa.children[0:1] is not None)
|
| - prev_afullname = fsa.full_name
|
| - prev_bchildfullname = fsa.children[fsb].full_name
|
| - stats.strip_dirs()
|
| - self.assertTrue(len(prev_afullname) > len(fsa.full_name))
|
| - self.assertTrue(
|
| - len(prev_bchildfullname) > len(fsa.children[fsb].full_name)
|
| - )
|
| -
|
| - def test_children_stat_functions(self):
|
| - _timings = {"a_1": 5, "b_1": 3, "c_1": 1}
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - def b():
|
| - pass
|
| -
|
| - def c():
|
| - pass
|
| -
|
| - def a():
|
| - b()
|
| - c()
|
| -
|
| - yappi.start()
|
| - a()
|
| - b() # non-child call
|
| - c() # non-child call
|
| - stats = yappi.get_func_stats()
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - childs_of_a = fsa.children.get().sort("tavg", "desc")
|
| - prev_item = None
|
| - for item in childs_of_a:
|
| - if prev_item:
|
| - self.assertTrue(prev_item.tavg > item.tavg)
|
| - prev_item = item
|
| - childs_of_a.sort("name", "desc")
|
| - prev_item = None
|
| - for item in childs_of_a:
|
| - if prev_item:
|
| - self.assertTrue(prev_item.name > item.name)
|
| - prev_item = item
|
| - childs_of_a.clear()
|
| - self.assertTrue(childs_of_a.empty())
|
| -
|
| - def test_no_stats_different_clock_type_load(self):
|
| -
|
| - def a():
|
| - pass
|
| -
|
| - yappi.start()
|
| - a()
|
| - yappi.stop()
|
| - yappi.get_func_stats().save("tests/ystats1.ys")
|
| - yappi.clear_stats()
|
| - yappi.set_clock_type("WALL")
|
| - yappi.start()
|
| - yappi.stop()
|
| - stats = yappi.get_func_stats().add("tests/ystats1.ys")
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - self.assertTrue(fsa is not None)
|
| -
|
| - def test_subsequent_profile(self):
|
| - _timings = {"a_1": 1, "b_1": 1}
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - def a():
|
| - pass
|
| -
|
| - def b():
|
| - pass
|
| -
|
| - yappi.start()
|
| - a()
|
| - yappi.stop()
|
| - yappi.start()
|
| - b()
|
| - yappi.stop()
|
| - stats = yappi.get_func_stats()
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - fsb = utils.find_stat_by_name(stats, 'b')
|
| - self.assertTrue(fsa is not None)
|
| - self.assertTrue(fsb is not None)
|
| - self.assertEqual(fsa.ttot, 1)
|
| - self.assertEqual(fsb.ttot, 1)
|
| -
|
| - def test_lambda(self):
|
| - f = lambda: time.sleep(0.3)
|
| - yappi.set_clock_type("wall")
|
| - yappi.start()
|
| - f()
|
| - stats = yappi.get_func_stats()
|
| - fsa = utils.find_stat_by_name(stats, '<lambda>')
|
| - self.assertTrue(fsa.ttot > 0.1)
|
| -
|
| - def test_module_stress(self):
|
| - self.assertEqual(yappi.is_running(), False)
|
| -
|
| - yappi.start()
|
| - yappi.clear_stats()
|
| - self.assertRaises(_yappi.error, yappi.set_clock_type, "wall")
|
| -
|
| - yappi.stop()
|
| - yappi.clear_stats()
|
| - yappi.set_clock_type("cpu")
|
| - self.assertRaises(yappi.YappiError, yappi.set_clock_type, "dummy")
|
| - self.assertEqual(yappi.is_running(), False)
|
| - yappi.clear_stats()
|
| - yappi.clear_stats()
|
| -
|
| - def test_stat_sorting(self):
|
| - _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1}
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - self._ncall = 1
|
| -
|
| - def a():
|
| - b()
|
| -
|
| - def b():
|
| - if self._ncall == 2:
|
| - return
|
| - self._ncall += 1
|
| - a()
|
| -
|
| - stats = utils.run_and_get_func_stats(a)
|
| - stats = stats.sort("totaltime", "desc")
|
| - prev_stat = None
|
| - for stat in stats:
|
| - if prev_stat:
|
| - self.assertTrue(prev_stat.ttot >= stat.ttot)
|
| - prev_stat = stat
|
| - stats = stats.sort("totaltime", "asc")
|
| - prev_stat = None
|
| - for stat in stats:
|
| - if prev_stat:
|
| - self.assertTrue(prev_stat.ttot <= stat.ttot)
|
| - prev_stat = stat
|
| - stats = stats.sort("avgtime", "asc")
|
| - prev_stat = None
|
| - for stat in stats:
|
| - if prev_stat:
|
| - self.assertTrue(prev_stat.tavg <= stat.tavg)
|
| - prev_stat = stat
|
| - stats = stats.sort("name", "asc")
|
| - prev_stat = None
|
| - for stat in stats:
|
| - if prev_stat:
|
| - self.assertTrue(prev_stat.name <= stat.name)
|
| - prev_stat = stat
|
| - stats = stats.sort("subtime", "asc")
|
| - prev_stat = None
|
| - for stat in stats:
|
| - if prev_stat:
|
| - self.assertTrue(prev_stat.tsub <= stat.tsub)
|
| - prev_stat = stat
|
| -
|
| - self.assertRaises(
|
| - yappi.YappiError, stats.sort, "invalid_func_sorttype_arg"
|
| - )
|
| - self.assertRaises(
|
| - yappi.YappiError, stats.sort, "totaltime",
|
| - "invalid_func_sortorder_arg"
|
| - )
|
| -
|
| - def test_start_flags(self):
|
| - self.assertEqual(_yappi._get_start_flags(), None)
|
| - yappi.start()
|
| -
|
| - def a():
|
| - pass
|
| -
|
| - a()
|
| - self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0)
|
| - self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1)
|
| - self.assertEqual(len(yappi.get_thread_stats()), 1)
|
| -
|
| - def test_builtin_profiling(self):
|
| -
|
| - def a():
|
| - time.sleep(0.4) # is a builtin function
|
| -
|
| - yappi.set_clock_type('wall')
|
| -
|
| - yappi.start(builtins=True)
|
| - a()
|
| - stats = yappi.get_func_stats()
|
| - fsa = utils.find_stat_by_name(stats, 'sleep')
|
| - self.assertTrue(fsa is not None)
|
| - self.assertTrue(fsa.ttot > 0.3)
|
| - yappi.stop()
|
| - yappi.clear_stats()
|
| -
|
| - def a():
|
| - pass
|
| -
|
| - yappi.start()
|
| - t = threading.Thread(target=a)
|
| - t.start()
|
| - t.join()
|
| - stats = yappi.get_func_stats()
|
| -
|
| - def test_singlethread_profiling(self):
|
| - yappi.set_clock_type('wall')
|
| -
|
| - def a():
|
| - time.sleep(0.2)
|
| -
|
| - class Worker1(threading.Thread):
|
| -
|
| - def a(self):
|
| - time.sleep(0.3)
|
| -
|
| - def run(self):
|
| - self.a()
|
| -
|
| - yappi.start(profile_threads=False)
|
| -
|
| - c = Worker1()
|
| - c.start()
|
| - c.join()
|
| - a()
|
| - stats = yappi.get_func_stats()
|
| - fsa1 = utils.find_stat_by_name(stats, 'Worker1.a')
|
| - fsa2 = utils.find_stat_by_name(stats, 'a')
|
| - self.assertTrue(fsa1 is None)
|
| - self.assertTrue(fsa2 is not None)
|
| - self.assertTrue(fsa2.ttot > 0.1)
|
| -
|
| - def test_run(self):
|
| -
|
| - def profiled():
|
| - pass
|
| -
|
| - yappi.clear_stats()
|
| - try:
|
| - with yappi.run():
|
| - profiled()
|
| - stats = yappi.get_func_stats()
|
| - finally:
|
| - yappi.clear_stats()
|
| -
|
| - self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled'))
|
| -
|
| - def test_run_recursive(self):
|
| -
|
| - def profiled():
|
| - pass
|
| -
|
| - def not_profiled():
|
| - pass
|
| -
|
| - yappi.clear_stats()
|
| - try:
|
| - with yappi.run():
|
| - with yappi.run():
|
| - profiled()
|
| - # Profiling stopped here
|
| - not_profiled()
|
| - stats = yappi.get_func_stats()
|
| - finally:
|
| - yappi.clear_stats()
|
| -
|
| - self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled'))
|
| - self.assertIsNone(utils.find_stat_by_name(stats, 'not_profiled'))
|
| -
|
| -
|
| -class StatSaveScenarios(utils.YappiUnitTestCase):
|
| -
|
| - def test_pstats_conversion(self):
|
| -
|
| - def pstat_id(fs):
|
| - return (fs.module, fs.lineno, fs.name)
|
| -
|
| - def a():
|
| - d()
|
| -
|
| - def b():
|
| - d()
|
| -
|
| - def c():
|
| - pass
|
| -
|
| - def d():
|
| - pass
|
| -
|
| - _timings = {"a_1": 12, "b_1": 7, "c_1": 5, "d_1": 2}
|
| - _yappi._set_test_timings(_timings)
|
| - stats = utils.run_and_get_func_stats(a, )
|
| - stats.strip_dirs()
|
| - stats.save("tests/a1.pstats", type="pstat")
|
| - fsa_pid = pstat_id(utils.find_stat_by_name(stats, "a"))
|
| - fsd_pid = pstat_id(utils.find_stat_by_name(stats, "d"))
|
| - yappi.clear_stats()
|
| - _yappi._set_test_timings(_timings)
|
| - stats = utils.run_and_get_func_stats(a, )
|
| - stats.strip_dirs()
|
| - stats.save("tests/a2.pstats", type="pstat")
|
| - yappi.clear_stats()
|
| - _yappi._set_test_timings(_timings)
|
| - stats = utils.run_and_get_func_stats(b, )
|
| - stats.strip_dirs()
|
| - stats.save("tests/b1.pstats", type="pstat")
|
| - fsb_pid = pstat_id(utils.find_stat_by_name(stats, "b"))
|
| - yappi.clear_stats()
|
| - _yappi._set_test_timings(_timings)
|
| - stats = utils.run_and_get_func_stats(c, )
|
| - stats.strip_dirs()
|
| - stats.save("tests/c1.pstats", type="pstat")
|
| - fsc_pid = pstat_id(utils.find_stat_by_name(stats, "c"))
|
| -
|
| - # merge saved stats and check pstats values are correct
|
| - import pstats
|
| - p = pstats.Stats(
|
| - 'tests/a1.pstats', 'tests/a2.pstats', 'tests/b1.pstats',
|
| - 'tests/c1.pstats'
|
| - )
|
| - p.strip_dirs()
|
| - # ct = ttot, tt = tsub
|
| - (cc, nc, tt, ct, callers) = p.stats[fsa_pid]
|
| - self.assertEqual(cc, nc, 2)
|
| - self.assertEqual(tt, 20)
|
| - self.assertEqual(ct, 24)
|
| - (cc, nc, tt, ct, callers) = p.stats[fsd_pid]
|
| - self.assertEqual(cc, nc, 3)
|
| - self.assertEqual(tt, 6)
|
| - self.assertEqual(ct, 6)
|
| - self.assertEqual(len(callers), 2)
|
| - (cc, nc, tt, ct) = callers[fsa_pid]
|
| - self.assertEqual(cc, nc, 2)
|
| - self.assertEqual(tt, 4)
|
| - self.assertEqual(ct, 4)
|
| - (cc, nc, tt, ct) = callers[fsb_pid]
|
| - self.assertEqual(cc, nc, 1)
|
| - self.assertEqual(tt, 2)
|
| - self.assertEqual(ct, 2)
|
| -
|
| - def test_merge_stats(self):
|
| - _timings = {
|
| - "a_1": 15,
|
| - "b_1": 14,
|
| - "c_1": 12,
|
| - "d_1": 10,
|
| - "e_1": 9,
|
| - "f_1": 7,
|
| - "g_1": 6,
|
| - "h_1": 5,
|
| - "i_1": 1
|
| - }
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - def a():
|
| - b()
|
| -
|
| - def b():
|
| - c()
|
| -
|
| - def c():
|
| - d()
|
| -
|
| - def d():
|
| - e()
|
| -
|
| - def e():
|
| - f()
|
| -
|
| - def f():
|
| - g()
|
| -
|
| - def g():
|
| - h()
|
| -
|
| - def h():
|
| - i()
|
| -
|
| - def i():
|
| - pass
|
| -
|
| - yappi.start()
|
| - a()
|
| - a()
|
| - yappi.stop()
|
| - stats = yappi.get_func_stats()
|
| - self.assertRaises(
|
| - NotImplementedError, stats.save, "", "INVALID_SAVE_TYPE"
|
| - )
|
| - stats.save("tests/ystats2.ys")
|
| - yappi.clear_stats()
|
| - _yappi._set_test_timings(_timings)
|
| - yappi.start()
|
| - a()
|
| - stats = yappi.get_func_stats().add("tests/ystats2.ys")
|
| - fsa = utils.find_stat_by_name(stats, "a")
|
| - fsb = utils.find_stat_by_name(stats, "b")
|
| - fsc = utils.find_stat_by_name(stats, "c")
|
| - fsd = utils.find_stat_by_name(stats, "d")
|
| - fse = utils.find_stat_by_name(stats, "e")
|
| - fsf = utils.find_stat_by_name(stats, "f")
|
| - fsg = utils.find_stat_by_name(stats, "g")
|
| - fsh = utils.find_stat_by_name(stats, "h")
|
| - fsi = utils.find_stat_by_name(stats, "i")
|
| - self.assertEqual(fsa.ttot, 45)
|
| - self.assertEqual(fsa.ncall, 3)
|
| - self.assertEqual(fsa.nactualcall, 3)
|
| - self.assertEqual(fsa.tsub, 3)
|
| - self.assertEqual(fsa.children[fsb].ttot, fsb.ttot)
|
| - self.assertEqual(fsa.children[fsb].tsub, fsb.tsub)
|
| - self.assertEqual(fsb.children[fsc].ttot, fsc.ttot)
|
| - self.assertEqual(fsb.children[fsc].tsub, fsc.tsub)
|
| - self.assertEqual(fsc.tsub, 6)
|
| - self.assertEqual(fsc.children[fsd].ttot, fsd.ttot)
|
| - self.assertEqual(fsc.children[fsd].tsub, fsd.tsub)
|
| - self.assertEqual(fsd.children[fse].ttot, fse.ttot)
|
| - self.assertEqual(fsd.children[fse].tsub, fse.tsub)
|
| - self.assertEqual(fse.children[fsf].ttot, fsf.ttot)
|
| - self.assertEqual(fse.children[fsf].tsub, fsf.tsub)
|
| - self.assertEqual(fsf.children[fsg].ttot, fsg.ttot)
|
| - self.assertEqual(fsf.children[fsg].tsub, fsg.tsub)
|
| - self.assertEqual(fsg.ttot, 18)
|
| - self.assertEqual(fsg.tsub, 3)
|
| - self.assertEqual(fsg.children[fsh].ttot, fsh.ttot)
|
| - self.assertEqual(fsg.children[fsh].tsub, fsh.tsub)
|
| - self.assertEqual(fsh.ttot, 15)
|
| - self.assertEqual(fsh.tsub, 12)
|
| - self.assertEqual(fsh.tavg, 5)
|
| - self.assertEqual(fsh.children[fsi].ttot, fsi.ttot)
|
| - self.assertEqual(fsh.children[fsi].tsub, fsi.tsub)
|
| - #stats.debug_print()
|
| -
|
| - def test_merge_multithreaded_stats(self):
|
| - import _yappi
|
| - timings = {"a_1": 2, "b_1": 1}
|
| - _yappi._set_test_timings(timings)
|
| -
|
| - def a():
|
| - pass
|
| -
|
| - def b():
|
| - pass
|
| -
|
| - yappi.start()
|
| - t = threading.Thread(target=a)
|
| - t.start()
|
| - t.join()
|
| - t = threading.Thread(target=b)
|
| - t.start()
|
| - t.join()
|
| - yappi.get_func_stats().save("tests/ystats1.ys")
|
| - yappi.clear_stats()
|
| - _yappi._set_test_timings(timings)
|
| - self.assertEqual(len(yappi.get_func_stats()), 0)
|
| - self.assertEqual(len(yappi.get_thread_stats()), 1)
|
| - t = threading.Thread(target=a)
|
| - t.start()
|
| - t.join()
|
| -
|
| - self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0)
|
| - self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1)
|
| - yappi.get_func_stats().save("tests/ystats2.ys")
|
| -
|
| - stats = yappi.YFuncStats([
|
| - "tests/ystats1.ys",
|
| - "tests/ystats2.ys",
|
| - ])
|
| - fsa = utils.find_stat_by_name(stats, "a")
|
| - fsb = utils.find_stat_by_name(stats, "b")
|
| - self.assertEqual(fsa.ncall, 2)
|
| - self.assertEqual(fsb.ncall, 1)
|
| - self.assertEqual(fsa.tsub, fsa.ttot, 4)
|
| - self.assertEqual(fsb.tsub, fsb.ttot, 1)
|
| -
|
| - def test_merge_load_different_clock_types(self):
|
| - yappi.start(builtins=True)
|
| -
|
| - def a():
|
| - b()
|
| -
|
| - def b():
|
| - c()
|
| -
|
| - def c():
|
| - pass
|
| -
|
| - t = threading.Thread(target=a)
|
| - t.start()
|
| - t.join()
|
| - yappi.get_func_stats().sort("name", "asc").save("tests/ystats1.ys")
|
| - yappi.stop()
|
| - yappi.clear_stats()
|
| - yappi.start(builtins=False)
|
| - t = threading.Thread(target=a)
|
| - t.start()
|
| - t.join()
|
| - yappi.get_func_stats().save("tests/ystats2.ys")
|
| - yappi.stop()
|
| - self.assertRaises(_yappi.error, yappi.set_clock_type, "wall")
|
| - yappi.clear_stats()
|
| - yappi.set_clock_type("wall")
|
| - yappi.start()
|
| - t = threading.Thread(target=a)
|
| - t.start()
|
| - t.join()
|
| - yappi.get_func_stats().save("tests/ystats3.ys")
|
| - self.assertRaises(
|
| - yappi.YappiError,
|
| - yappi.YFuncStats().add("tests/ystats1.ys").add, "tests/ystats3.ys"
|
| - )
|
| - stats = yappi.YFuncStats(["tests/ystats1.ys",
|
| - "tests/ystats2.ys"]).sort("name")
|
| - fsa = utils.find_stat_by_name(stats, "a")
|
| - fsb = utils.find_stat_by_name(stats, "b")
|
| - fsc = utils.find_stat_by_name(stats, "c")
|
| - self.assertEqual(fsa.ncall, 2)
|
| - self.assertEqual(fsa.ncall, fsb.ncall, fsc.ncall)
|
| -
|
| - def test_merge_aabab_aabbc(self):
|
| - _timings = {
|
| - "a_1": 15,
|
| - "a_2": 14,
|
| - "b_1": 12,
|
| - "a_3": 10,
|
| - "b_2": 9,
|
| - "c_1": 4
|
| - }
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - def a():
|
| - if self._ncall == 1:
|
| - self._ncall += 1
|
| - a()
|
| - elif self._ncall == 5:
|
| - self._ncall += 1
|
| - a()
|
| - else:
|
| - b()
|
| -
|
| - def b():
|
| - if self._ncall == 2:
|
| - self._ncall += 1
|
| - a()
|
| - elif self._ncall == 6:
|
| - self._ncall += 1
|
| - b()
|
| - elif self._ncall == 7:
|
| - c()
|
| - else:
|
| - return
|
| -
|
| - def c():
|
| - pass
|
| -
|
| - self._ncall = 1
|
| - stats = utils.run_and_get_func_stats(a, )
|
| - stats.save("tests/ystats1.ys")
|
| - yappi.clear_stats()
|
| - _yappi._set_test_timings(_timings)
|
| - #stats.print_all()
|
| -
|
| - self._ncall = 5
|
| - stats = utils.run_and_get_func_stats(a, )
|
| - stats.save("tests/ystats2.ys")
|
| -
|
| - #stats.print_all()
|
| -
|
| - def a(): # same name but another function(code object)
|
| - pass
|
| -
|
| - yappi.start()
|
| - a()
|
| - stats = yappi.get_func_stats().add(
|
| - ["tests/ystats1.ys", "tests/ystats2.ys"]
|
| - )
|
| - #stats.print_all()
|
| - self.assertEqual(len(stats), 4)
|
| -
|
| - fsa = None
|
| - for stat in stats:
|
| - if stat.name == "a" and stat.ttot == 45:
|
| - fsa = stat
|
| - break
|
| - self.assertTrue(fsa is not None)
|
| -
|
| - self.assertEqual(fsa.ncall, 7)
|
| - self.assertEqual(fsa.nactualcall, 3)
|
| - self.assertEqual(fsa.ttot, 45)
|
| - self.assertEqual(fsa.tsub, 10)
|
| - fsb = utils.find_stat_by_name(stats, "b")
|
| - fsc = utils.find_stat_by_name(stats, "c")
|
| - self.assertEqual(fsb.ncall, 6)
|
| - self.assertEqual(fsb.nactualcall, 3)
|
| - self.assertEqual(fsb.ttot, 36)
|
| - self.assertEqual(fsb.tsub, 27)
|
| - self.assertEqual(fsb.tavg, 6)
|
| - self.assertEqual(fsc.ttot, 8)
|
| - self.assertEqual(fsc.tsub, 8)
|
| - self.assertEqual(fsc.tavg, 4)
|
| - self.assertEqual(fsc.nactualcall, fsc.ncall, 2)
|
| -
|
| -
|
| -class MultithreadedScenarios(utils.YappiUnitTestCase):
|
| -
|
| - def test_issue_32(self):
|
| - '''
|
| - Start yappi from different thread and we get Internal Error(15) as
|
| - the current_ctx_id() called while enumerating the threads in start()
|
| - and as it does not swap to the enumerated ThreadState* the THreadState_GetDict()
|
| - returns wrong object and thus sets an invalid id for the _ctx structure.
|
| -
|
| - When this issue happens multiple Threads have same tid as the internal ts_ptr
|
| - will be same for different contexts. So, let's see if that happens
|
| - '''
|
| -
|
| - def foo():
|
| - time.sleep(0.2)
|
| -
|
| - def bar():
|
| - time.sleep(0.1)
|
| -
|
| - def thread_func():
|
| - yappi.set_clock_type("wall")
|
| - yappi.start()
|
| -
|
| - bar()
|
| -
|
| - t = threading.Thread(target=thread_func)
|
| - t.start()
|
| - t.join()
|
| -
|
| - foo()
|
| -
|
| - yappi.stop()
|
| -
|
| - thread_ids = set()
|
| - for tstat in yappi.get_thread_stats():
|
| - self.assertTrue(tstat.tid not in thread_ids)
|
| - thread_ids.add(tstat.tid)
|
| -
|
| - def test_subsequent_profile(self):
|
| - WORKER_COUNT = 5
|
| -
|
| - def a():
|
| - pass
|
| -
|
| - def b():
|
| - pass
|
| -
|
| - def c():
|
| - pass
|
| -
|
| - _timings = {
|
| - "a_1": 3,
|
| - "b_1": 2,
|
| - "c_1": 1,
|
| - }
|
| -
|
| - yappi.start()
|
| -
|
| - def g():
|
| - pass
|
| -
|
| - g()
|
| - yappi.stop()
|
| - yappi.clear_stats()
|
| - _yappi._set_test_timings(_timings)
|
| - yappi.start()
|
| -
|
| - _dummy = []
|
| - for i in range(WORKER_COUNT):
|
| - t = threading.Thread(target=a)
|
| - t.start()
|
| - t.join()
|
| - for i in range(WORKER_COUNT):
|
| - t = threading.Thread(target=b)
|
| - t.start()
|
| - _dummy.append(t)
|
| - t.join()
|
| - for i in range(WORKER_COUNT):
|
| - t = threading.Thread(target=a)
|
| - t.start()
|
| - t.join()
|
| - for i in range(WORKER_COUNT):
|
| - t = threading.Thread(target=c)
|
| - t.start()
|
| - t.join()
|
| - yappi.stop()
|
| - yappi.start()
|
| -
|
| - def f():
|
| - pass
|
| -
|
| - f()
|
| - stats = yappi.get_func_stats()
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - fsb = utils.find_stat_by_name(stats, 'b')
|
| - fsc = utils.find_stat_by_name(stats, 'c')
|
| - self.assertEqual(fsa.ncall, 10)
|
| - self.assertEqual(fsb.ncall, 5)
|
| - self.assertEqual(fsc.ncall, 5)
|
| - self.assertEqual(fsa.ttot, fsa.tsub, 30)
|
| - self.assertEqual(fsb.ttot, fsb.tsub, 10)
|
| - self.assertEqual(fsc.ttot, fsc.tsub, 5)
|
| -
|
| - # MACOSx optimizes by only creating one worker thread
|
| - self.assertTrue(len(yappi.get_thread_stats()) >= 2)
|
| -
|
| - def test_basic(self):
|
| - yappi.set_clock_type('wall')
|
| -
|
| - def dummy():
|
| - pass
|
| -
|
| - def a():
|
| - time.sleep(0.2)
|
| -
|
| - class Worker1(threading.Thread):
|
| -
|
| - def a(self):
|
| - time.sleep(0.3)
|
| -
|
| - def run(self):
|
| - self.a()
|
| -
|
| - yappi.start(builtins=False, profile_threads=True)
|
| -
|
| - c = Worker1()
|
| - c.start()
|
| - c.join()
|
| - a()
|
| - stats = yappi.get_func_stats()
|
| - fsa1 = utils.find_stat_by_name(stats, 'Worker1.a')
|
| - fsa2 = utils.find_stat_by_name(stats, 'a')
|
| - self.assertTrue(fsa1 is not None)
|
| - self.assertTrue(fsa2 is not None)
|
| - self.assertTrue(fsa1.ttot > 0.2)
|
| - self.assertTrue(fsa2.ttot > 0.1)
|
| - tstats = yappi.get_thread_stats()
|
| - self.assertEqual(len(tstats), 2)
|
| - tsa = utils.find_stat_by_name(tstats, 'Worker1')
|
| - tsm = utils.find_stat_by_name(tstats, '_MainThread')
|
| - dummy() # call dummy to force ctx name to be retrieved again.
|
| - self.assertTrue(tsa is not None)
|
| - # TODO: I put dummy() to fix below, remove the comments after a while.
|
| - self.assertTrue( # FIX: I see this fails sometimes?
|
| - tsm is not None,
|
| - f"Could not find \"_MainThread\". Found: {', '.join(utils.get_stat_names(tstats))}")
|
| -
|
| - def test_ctx_stats(self):
|
| - from threading import Thread
|
| - DUMMY_WORKER_COUNT = 5
|
| - yappi.start()
|
| -
|
| - class DummyThread(Thread):
|
| - pass
|
| -
|
| - def dummy():
|
| - pass
|
| -
|
| - def dummy_worker():
|
| - pass
|
| -
|
| - for i in range(DUMMY_WORKER_COUNT):
|
| - t = DummyThread(target=dummy_worker)
|
| - t.start()
|
| - t.join()
|
| - yappi.stop()
|
| - stats = yappi.get_thread_stats()
|
| - tsa = utils.find_stat_by_name(stats, "DummyThread")
|
| - self.assertTrue(tsa is not None)
|
| - yappi.clear_stats()
|
| - time.sleep(1.0)
|
| - _timings = {
|
| - "a_1": 6,
|
| - "b_1": 5,
|
| - "c_1": 3,
|
| - "d_1": 1,
|
| - "a_2": 4,
|
| - "b_2": 3,
|
| - "c_2": 2,
|
| - "d_2": 1
|
| - }
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - class Thread1(Thread):
|
| - pass
|
| -
|
| - class Thread2(Thread):
|
| - pass
|
| -
|
| - def a():
|
| - b()
|
| -
|
| - def b():
|
| - c()
|
| -
|
| - def c():
|
| - d()
|
| -
|
| - def d():
|
| - time.sleep(0.6)
|
| -
|
| - yappi.set_clock_type("wall")
|
| - yappi.start()
|
| - t1 = Thread1(target=a)
|
| - t1.start()
|
| - t2 = Thread2(target=a)
|
| - t2.start()
|
| - t1.join()
|
| - t2.join()
|
| - stats = yappi.get_thread_stats()
|
| -
|
| - # the fist clear_stats clears the context table?
|
| - tsa = utils.find_stat_by_name(stats, "DummyThread")
|
| - self.assertTrue(tsa is None)
|
| -
|
| - tst1 = utils.find_stat_by_name(stats, "Thread1")
|
| - tst2 = utils.find_stat_by_name(stats, "Thread2")
|
| - tsmain = utils.find_stat_by_name(stats, "_MainThread")
|
| - dummy() # call dummy to force ctx name to be retrieved again.
|
| - self.assertTrue(len(stats) == 3)
|
| - self.assertTrue(tst1 is not None)
|
| - self.assertTrue(tst2 is not None)
|
| - # TODO: I put dummy() to fix below, remove the comments after a while.
|
| - self.assertTrue( # FIX: I see this fails sometimes
|
| - tsmain is not None,
|
| - f"Could not find \"_MainThread\". Found: {', '.join(utils.get_stat_names(stats))}")
|
| - self.assertTrue(1.0 > tst2.ttot >= 0.5)
|
| - self.assertTrue(1.0 > tst1.ttot >= 0.5)
|
| -
|
| - # test sorting of the ctx stats
|
| - stats = stats.sort("totaltime", "desc")
|
| - prev_stat = None
|
| - for stat in stats:
|
| - if prev_stat:
|
| - self.assertTrue(prev_stat.ttot >= stat.ttot)
|
| - prev_stat = stat
|
| - stats = stats.sort("totaltime", "asc")
|
| - prev_stat = None
|
| - for stat in stats:
|
| - if prev_stat:
|
| - self.assertTrue(prev_stat.ttot <= stat.ttot)
|
| - prev_stat = stat
|
| - stats = stats.sort("schedcount", "desc")
|
| - prev_stat = None
|
| - for stat in stats:
|
| - if prev_stat:
|
| - self.assertTrue(prev_stat.sched_count >= stat.sched_count)
|
| - prev_stat = stat
|
| - stats = stats.sort("name", "desc")
|
| - prev_stat = None
|
| - for stat in stats:
|
| - if prev_stat:
|
| - self.assertTrue(prev_stat.name.lower() >= stat.name.lower())
|
| - prev_stat = stat
|
| - self.assertRaises(
|
| - yappi.YappiError, stats.sort, "invalid_thread_sorttype_arg"
|
| - )
|
| - self.assertRaises(
|
| - yappi.YappiError, stats.sort, "invalid_thread_sortorder_arg"
|
| - )
|
| -
|
| - def test_ctx_stats_cpu(self):
|
| -
|
| - def get_thread_name():
|
| - try:
|
| - return threading.current_thread().name
|
| - except AttributeError:
|
| - return "Anonymous"
|
| -
|
| - def burn_cpu(sec):
|
| - t0 = yappi.get_clock_time()
|
| - elapsed = 0
|
| - while (elapsed < sec):
|
| - for _ in range(1000):
|
| - pass
|
| - elapsed = yappi.get_clock_time() - t0
|
| -
|
| - def test():
|
| -
|
| - ts = []
|
| - for i in (0.01, 0.05, 0.1):
|
| - t = threading.Thread(target=burn_cpu, args=(i, ))
|
| - t.name = f"burn_cpu-{str(i)}"
|
| - t.start()
|
| - ts.append(t)
|
| - for t in ts:
|
| - t.join()
|
| -
|
| - yappi.set_clock_type("cpu")
|
| - yappi.set_context_name_callback(get_thread_name)
|
| -
|
| - yappi.start()
|
| -
|
| - test()
|
| -
|
| - yappi.stop()
|
| -
|
| - tstats = yappi.get_thread_stats()
|
| - r1 = '''
|
| - burn_cpu-0.1 3 123145356058624 0.100105 8
|
| - burn_cpu-0.05 2 123145361313792 0.050149 8
|
| - burn_cpu-0.01 1 123145356058624 0.010127 2
|
| - MainThread 0 4321620864 0.001632 6
|
| - '''
|
| - self.assert_ctx_stats_almost_equal(r1, tstats)
|
| -
|
| - def test_producer_consumer_with_queues(self):
|
| - # we currently just stress yappi, no functionality test is done here.
|
| - yappi.start()
|
| - from queue import Queue
|
| - from threading import Thread
|
| - WORKER_THREAD_COUNT = 50
|
| - WORK_ITEM_COUNT = 2000
|
| -
|
| - def worker():
|
| - while True:
|
| - item = q.get()
|
| - # do the work with item
|
| - q.task_done()
|
| -
|
| - q = Queue()
|
| - for i in range(WORKER_THREAD_COUNT):
|
| - t = Thread(target=worker)
|
| - t.daemon = True
|
| - t.start()
|
| -
|
| - for item in range(WORK_ITEM_COUNT):
|
| - q.put(item)
|
| - q.join() # block until all tasks are done
|
| - #yappi.get_func_stats().sort("callcount").print_all()
|
| - yappi.stop()
|
| -
|
| - def test_temporary_lock_waiting(self):
|
| - yappi.start()
|
| - _lock = threading.Lock()
|
| -
|
| - def worker():
|
| - _lock.acquire()
|
| - try:
|
| - time.sleep(1.0)
|
| - finally:
|
| - _lock.release()
|
| -
|
| - t1 = threading.Thread(target=worker)
|
| - t2 = threading.Thread(target=worker)
|
| - t1.start()
|
| - t2.start()
|
| - t1.join()
|
| - t2.join()
|
| - #yappi.get_func_stats().sort("callcount").print_all()
|
| - yappi.stop()
|
| -
|
| - @unittest.skipIf(os.name != "posix", "requires Posix compliant OS")
|
| - def test_signals_with_blocking_calls(self):
|
| - import signal, os, time
|
| -
|
| - # just to verify if signal is handled correctly and stats/yappi are not corrupted.
|
| - def handler(signum, frame):
|
| - raise Exception("Signal handler executed!")
|
| -
|
| - yappi.start()
|
| - signal.signal(signal.SIGALRM, handler)
|
| - signal.alarm(1)
|
| - self.assertRaises(Exception, time.sleep, 2)
|
| - stats = yappi.get_func_stats()
|
| - fsh = utils.find_stat_by_name(stats, "handler")
|
| - self.assertTrue(fsh is not None)
|
| -
|
| - def test_concurrent_futures(self):
|
| - yappi.start()
|
| - from concurrent.futures import ThreadPoolExecutor
|
| - with ThreadPoolExecutor(max_workers=5) as executor:
|
| - f = executor.submit(pow, 5, 2)
|
| - self.assertEqual(f.result(), 25)
|
| - time.sleep(1.0)
|
| - yappi.stop()
|
| -
|
| - def test_barrier(self):
|
| - yappi.start()
|
| - b = threading.Barrier(2, timeout=1)
|
| -
|
| - def worker():
|
| - try:
|
| - b.wait()
|
| - except threading.BrokenBarrierError:
|
| - pass
|
| - except Exception:
|
| - raise Exception("BrokenBarrierError not raised")
|
| -
|
| - t1 = threading.Thread(target=worker)
|
| - t1.start()
|
| - #b.wait()
|
| - t1.join()
|
| - yappi.stop()
|
| -
|
| -
|
| -class NonRecursiveFunctions(utils.YappiUnitTestCase):
|
| -
|
| - def test_abcd(self):
|
| - _timings = {"a_1": 6, "b_1": 5, "c_1": 3, "d_1": 1}
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - def a():
|
| - b()
|
| -
|
| - def b():
|
| - c()
|
| -
|
| - def c():
|
| - d()
|
| -
|
| - def d():
|
| - pass
|
| -
|
| - stats = utils.run_and_get_func_stats(a)
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - fsb = utils.find_stat_by_name(stats, 'b')
|
| - fsc = utils.find_stat_by_name(stats, 'c')
|
| - fsd = utils.find_stat_by_name(stats, 'd')
|
| - cfsab = fsa.children[fsb]
|
| - cfsbc = fsb.children[fsc]
|
| - cfscd = fsc.children[fsd]
|
| -
|
| - self.assertEqual(fsa.ttot, 6)
|
| - self.assertEqual(fsa.tsub, 1)
|
| - self.assertEqual(fsb.ttot, 5)
|
| - self.assertEqual(fsb.tsub, 2)
|
| - self.assertEqual(fsc.ttot, 3)
|
| - self.assertEqual(fsc.tsub, 2)
|
| - self.assertEqual(fsd.ttot, 1)
|
| - self.assertEqual(fsd.tsub, 1)
|
| - self.assertEqual(cfsab.ttot, 5)
|
| - self.assertEqual(cfsab.tsub, 2)
|
| - self.assertEqual(cfsbc.ttot, 3)
|
| - self.assertEqual(cfsbc.tsub, 2)
|
| - self.assertEqual(cfscd.ttot, 1)
|
| - self.assertEqual(cfscd.tsub, 1)
|
| -
|
| - def test_stop_in_middle(self):
|
| - _timings = {"a_1": 6, "b_1": 4}
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - def a():
|
| - b()
|
| - yappi.stop()
|
| -
|
| - def b():
|
| - time.sleep(0.2)
|
| -
|
| - yappi.start()
|
| - a()
|
| - stats = yappi.get_func_stats()
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - fsb = utils.find_stat_by_name(stats, 'b')
|
| -
|
| - self.assertEqual(fsa.ncall, 1)
|
| - self.assertEqual(fsa.nactualcall, 0)
|
| - self.assertEqual(fsa.ttot, 0) # no call_leave called
|
| - self.assertEqual(fsa.tsub, 0) # no call_leave called
|
| - self.assertEqual(fsb.ttot, 4)
|
| -
|
| -
|
| -class RecursiveFunctions(utils.YappiUnitTestCase):
|
| -
|
| - def test_fibonacci(self):
|
| -
|
| - def fib(n):
|
| - if n > 1:
|
| - return fib(n - 1) + fib(n - 2)
|
| - else:
|
| - return n
|
| -
|
| - stats = utils.run_and_get_func_stats(fib, 22)
|
| - fs = utils.find_stat_by_name(stats, 'fib')
|
| - self.assertEqual(fs.ncall, 57313)
|
| - self.assertEqual(fs.ttot, fs.tsub)
|
| -
|
| - def test_abcadc(self):
|
| - _timings = {
|
| - "a_1": 20,
|
| - "b_1": 19,
|
| - "c_1": 17,
|
| - "a_2": 13,
|
| - "d_1": 12,
|
| - "c_2": 10,
|
| - "a_3": 5
|
| - }
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - def a(n):
|
| - if n == 3:
|
| - return
|
| - if n == 1 + 1:
|
| - d(n)
|
| - else:
|
| - b(n)
|
| -
|
| - def b(n):
|
| - c(n)
|
| -
|
| - def c(n):
|
| - a(n + 1)
|
| -
|
| - def d(n):
|
| - c(n)
|
| -
|
| - stats = utils.run_and_get_func_stats(a, 1)
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - fsb = utils.find_stat_by_name(stats, 'b')
|
| - fsc = utils.find_stat_by_name(stats, 'c')
|
| - fsd = utils.find_stat_by_name(stats, 'd')
|
| - self.assertEqual(fsa.ncall, 3)
|
| - self.assertEqual(fsa.nactualcall, 1)
|
| - self.assertEqual(fsa.ttot, 20)
|
| - self.assertEqual(fsa.tsub, 7)
|
| - self.assertEqual(fsb.ttot, 19)
|
| - self.assertEqual(fsb.tsub, 2)
|
| - self.assertEqual(fsc.ttot, 17)
|
| - self.assertEqual(fsc.tsub, 9)
|
| - self.assertEqual(fsd.ttot, 12)
|
| - self.assertEqual(fsd.tsub, 2)
|
| - cfsca = fsc.children[fsa]
|
| - self.assertEqual(cfsca.nactualcall, 0)
|
| - self.assertEqual(cfsca.ncall, 2)
|
| - self.assertEqual(cfsca.ttot, 13)
|
| - self.assertEqual(cfsca.tsub, 6)
|
| -
|
| - def test_aaaa(self):
|
| - _timings = {"d_1": 9, "d_2": 7, "d_3": 3, "d_4": 2}
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - def d(n):
|
| - if n == 3:
|
| - return
|
| - d(n + 1)
|
| -
|
| - stats = utils.run_and_get_func_stats(d, 0)
|
| - fsd = utils.find_stat_by_name(stats, 'd')
|
| - self.assertEqual(fsd.ncall, 4)
|
| - self.assertEqual(fsd.nactualcall, 1)
|
| - self.assertEqual(fsd.ttot, 9)
|
| - self.assertEqual(fsd.tsub, 9)
|
| - cfsdd = fsd.children[fsd]
|
| - self.assertEqual(cfsdd.ttot, 7)
|
| - self.assertEqual(cfsdd.tsub, 7)
|
| - self.assertEqual(cfsdd.ncall, 3)
|
| - self.assertEqual(cfsdd.nactualcall, 0)
|
| -
|
| - def test_abcabc(self):
|
| - _timings = {
|
| - "a_1": 20,
|
| - "b_1": 19,
|
| - "c_1": 17,
|
| - "a_2": 13,
|
| - "b_2": 11,
|
| - "c_2": 9,
|
| - "a_3": 6
|
| - }
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - def a(n):
|
| - if n == 3:
|
| - return
|
| - else:
|
| - b(n)
|
| -
|
| - def b(n):
|
| - c(n)
|
| -
|
| - def c(n):
|
| - a(n + 1)
|
| -
|
| - stats = utils.run_and_get_func_stats(a, 1)
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - fsb = utils.find_stat_by_name(stats, 'b')
|
| - fsc = utils.find_stat_by_name(stats, 'c')
|
| - self.assertEqual(fsa.ncall, 3)
|
| - self.assertEqual(fsa.nactualcall, 1)
|
| - self.assertEqual(fsa.ttot, 20)
|
| - self.assertEqual(fsa.tsub, 9)
|
| - self.assertEqual(fsb.ttot, 19)
|
| - self.assertEqual(fsb.tsub, 4)
|
| - self.assertEqual(fsc.ttot, 17)
|
| - self.assertEqual(fsc.tsub, 7)
|
| - cfsab = fsa.children[fsb]
|
| - cfsbc = fsb.children[fsc]
|
| - cfsca = fsc.children[fsa]
|
| - self.assertEqual(cfsab.ttot, 19)
|
| - self.assertEqual(cfsab.tsub, 4)
|
| - self.assertEqual(cfsbc.ttot, 17)
|
| - self.assertEqual(cfsbc.tsub, 7)
|
| - self.assertEqual(cfsca.ttot, 13)
|
| - self.assertEqual(cfsca.tsub, 8)
|
| -
|
| - def test_abcbca(self):
|
| - _timings = {"a_1": 10, "b_1": 9, "c_1": 7, "b_2": 4, "c_2": 2, "a_2": 1}
|
| - _yappi._set_test_timings(_timings)
|
| - self._ncall = 1
|
| -
|
| - def a():
|
| - if self._ncall == 1:
|
| - b()
|
| - else:
|
| - return
|
| -
|
| - def b():
|
| - c()
|
| -
|
| - def c():
|
| - if self._ncall == 1:
|
| - self._ncall += 1
|
| - b()
|
| - else:
|
| - a()
|
| -
|
| - stats = utils.run_and_get_func_stats(a)
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - fsb = utils.find_stat_by_name(stats, 'b')
|
| - fsc = utils.find_stat_by_name(stats, 'c')
|
| - cfsab = fsa.children[fsb]
|
| - cfsbc = fsb.children[fsc]
|
| - cfsca = fsc.children[fsa]
|
| - self.assertEqual(fsa.ttot, 10)
|
| - self.assertEqual(fsa.tsub, 2)
|
| - self.assertEqual(fsb.ttot, 9)
|
| - self.assertEqual(fsb.tsub, 4)
|
| - self.assertEqual(fsc.ttot, 7)
|
| - self.assertEqual(fsc.tsub, 4)
|
| - self.assertEqual(cfsab.ttot, 9)
|
| - self.assertEqual(cfsab.tsub, 2)
|
| - self.assertEqual(cfsbc.ttot, 7)
|
| - self.assertEqual(cfsbc.tsub, 4)
|
| - self.assertEqual(cfsca.ttot, 1)
|
| - self.assertEqual(cfsca.tsub, 1)
|
| - self.assertEqual(cfsca.ncall, 1)
|
| - self.assertEqual(cfsca.nactualcall, 0)
|
| -
|
| - def test_aabccb(self):
|
| - _timings = {
|
| - "a_1": 13,
|
| - "a_2": 11,
|
| - "b_1": 9,
|
| - "c_1": 5,
|
| - "c_2": 3,
|
| - "b_2": 1
|
| - }
|
| - _yappi._set_test_timings(_timings)
|
| - self._ncall = 1
|
| -
|
| - def a():
|
| - if self._ncall == 1:
|
| - self._ncall += 1
|
| - a()
|
| - else:
|
| - b()
|
| -
|
| - def b():
|
| - if self._ncall == 3:
|
| - return
|
| - else:
|
| - c()
|
| -
|
| - def c():
|
| - if self._ncall == 2:
|
| - self._ncall += 1
|
| - c()
|
| - else:
|
| - b()
|
| -
|
| - stats = utils.run_and_get_func_stats(a)
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - fsb = utils.find_stat_by_name(stats, 'b')
|
| - fsc = utils.find_stat_by_name(stats, 'c')
|
| - cfsaa = fsa.children[fsa.index]
|
| - cfsab = fsa.children[fsb]
|
| - cfsbc = fsb.children[fsc.full_name]
|
| - cfscc = fsc.children[fsc]
|
| - cfscb = fsc.children[fsb]
|
| - self.assertEqual(fsb.ttot, 9)
|
| - self.assertEqual(fsb.tsub, 5)
|
| - self.assertEqual(cfsbc.ttot, 5)
|
| - self.assertEqual(cfsbc.tsub, 2)
|
| - self.assertEqual(fsa.ttot, 13)
|
| - self.assertEqual(fsa.tsub, 4)
|
| - self.assertEqual(cfsab.ttot, 9)
|
| - self.assertEqual(cfsab.tsub, 4)
|
| - self.assertEqual(cfsaa.ttot, 11)
|
| - self.assertEqual(cfsaa.tsub, 2)
|
| - self.assertEqual(fsc.ttot, 5)
|
| - self.assertEqual(fsc.tsub, 4)
|
| -
|
| - def test_abaa(self):
|
| - _timings = {"a_1": 13, "b_1": 10, "a_2": 9, "a_3": 5}
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - self._ncall = 1
|
| -
|
| - def a():
|
| - if self._ncall == 1:
|
| - b()
|
| - elif self._ncall == 2:
|
| - self._ncall += 1
|
| - a()
|
| - else:
|
| - return
|
| -
|
| - def b():
|
| - self._ncall += 1
|
| - a()
|
| -
|
| - stats = utils.run_and_get_func_stats(a)
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - fsb = utils.find_stat_by_name(stats, 'b')
|
| - cfsaa = fsa.children[fsa]
|
| - cfsba = fsb.children[fsa]
|
| - self.assertEqual(fsb.ttot, 10)
|
| - self.assertEqual(fsb.tsub, 1)
|
| - self.assertEqual(fsa.ttot, 13)
|
| - self.assertEqual(fsa.tsub, 12)
|
| - self.assertEqual(cfsaa.ttot, 5)
|
| - self.assertEqual(cfsaa.tsub, 5)
|
| - self.assertEqual(cfsba.ttot, 9)
|
| - self.assertEqual(cfsba.tsub, 4)
|
| -
|
| - def test_aabb(self):
|
| - _timings = {"a_1": 13, "a_2": 10, "b_1": 9, "b_2": 5}
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - self._ncall = 1
|
| -
|
| - def a():
|
| - if self._ncall == 1:
|
| - self._ncall += 1
|
| - a()
|
| - elif self._ncall == 2:
|
| - b()
|
| - else:
|
| - return
|
| -
|
| - def b():
|
| - if self._ncall == 2:
|
| - self._ncall += 1
|
| - b()
|
| - else:
|
| - return
|
| -
|
| - stats = utils.run_and_get_func_stats(a)
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - fsb = utils.find_stat_by_name(stats, 'b')
|
| - cfsaa = fsa.children[fsa]
|
| - cfsab = fsa.children[fsb]
|
| - cfsbb = fsb.children[fsb]
|
| - self.assertEqual(fsa.ttot, 13)
|
| - self.assertEqual(fsa.tsub, 4)
|
| - self.assertEqual(fsb.ttot, 9)
|
| - self.assertEqual(fsb.tsub, 9)
|
| - self.assertEqual(cfsaa.ttot, 10)
|
| - self.assertEqual(cfsaa.tsub, 1)
|
| - self.assertEqual(cfsab.ttot, 9)
|
| - self.assertEqual(cfsab.tsub, 4)
|
| - self.assertEqual(cfsbb.ttot, 5)
|
| - self.assertEqual(cfsbb.tsub, 5)
|
| -
|
| - def test_abbb(self):
|
| - _timings = {"a_1": 13, "b_1": 10, "b_2": 6, "b_3": 1}
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - self._ncall = 1
|
| -
|
| - def a():
|
| - if self._ncall == 1:
|
| - b()
|
| -
|
| - def b():
|
| - if self._ncall == 3:
|
| - return
|
| - self._ncall += 1
|
| - b()
|
| -
|
| - stats = utils.run_and_get_func_stats(a)
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - fsb = utils.find_stat_by_name(stats, 'b')
|
| - cfsab = fsa.children[fsb]
|
| - cfsbb = fsb.children[fsb]
|
| - self.assertEqual(fsa.ttot, 13)
|
| - self.assertEqual(fsa.tsub, 3)
|
| - self.assertEqual(fsb.ttot, 10)
|
| - self.assertEqual(fsb.tsub, 10)
|
| - self.assertEqual(fsb.ncall, 3)
|
| - self.assertEqual(fsb.nactualcall, 1)
|
| - self.assertEqual(cfsab.ttot, 10)
|
| - self.assertEqual(cfsab.tsub, 4)
|
| - self.assertEqual(cfsbb.ttot, 6)
|
| - self.assertEqual(cfsbb.tsub, 6)
|
| - self.assertEqual(cfsbb.nactualcall, 0)
|
| - self.assertEqual(cfsbb.ncall, 2)
|
| -
|
| - def test_aaab(self):
|
| - _timings = {"a_1": 13, "a_2": 10, "a_3": 6, "b_1": 1}
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - self._ncall = 1
|
| -
|
| - def a():
|
| - if self._ncall == 3:
|
| - b()
|
| - return
|
| - self._ncall += 1
|
| - a()
|
| -
|
| - def b():
|
| - return
|
| -
|
| - stats = utils.run_and_get_func_stats(a)
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - fsb = utils.find_stat_by_name(stats, 'b')
|
| - cfsaa = fsa.children[fsa]
|
| - cfsab = fsa.children[fsb]
|
| - self.assertEqual(fsa.ttot, 13)
|
| - self.assertEqual(fsa.tsub, 12)
|
| - self.assertEqual(fsb.ttot, 1)
|
| - self.assertEqual(fsb.tsub, 1)
|
| - self.assertEqual(cfsaa.ttot, 10)
|
| - self.assertEqual(cfsaa.tsub, 9)
|
| - self.assertEqual(cfsab.ttot, 1)
|
| - self.assertEqual(cfsab.tsub, 1)
|
| -
|
| - def test_abab(self):
|
| - _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1}
|
| - _yappi._set_test_timings(_timings)
|
| -
|
| - self._ncall = 1
|
| -
|
| - def a():
|
| - b()
|
| -
|
| - def b():
|
| - if self._ncall == 2:
|
| - return
|
| - self._ncall += 1
|
| - a()
|
| -
|
| - stats = utils.run_and_get_func_stats(a)
|
| - fsa = utils.find_stat_by_name(stats, 'a')
|
| - fsb = utils.find_stat_by_name(stats, 'b')
|
| - cfsab = fsa.children[fsb]
|
| - cfsba = fsb.children[fsa]
|
| - self.assertEqual(fsa.ttot, 13)
|
| - self.assertEqual(fsa.tsub, 8)
|
| - self.assertEqual(fsb.ttot, 10)
|
| - self.assertEqual(fsb.tsub, 5)
|
| - self.assertEqual(cfsab.ttot, 10)
|
| - self.assertEqual(cfsab.tsub, 5)
|
| - self.assertEqual(cfsab.ncall, 2)
|
| - self.assertEqual(cfsab.nactualcall, 1)
|
| - self.assertEqual(cfsba.ttot, 6)
|
| - self.assertEqual(cfsba.tsub, 5)
|
| -
|
| -
|
| -if __name__ == '__main__':
|
| - # import sys;sys.argv = ['', 'BasicUsage.test_run_as_script']
|
| - # import sys;sys.argv = ['', 'MultithreadedScenarios.test_subsequent_profile']
|
| - unittest.main()
|
| +import os |
| +import sys |
| +import time |
| +import threading |
| +import unittest |
| +import yappi |
| +import _yappi |
| +import utils |
| +import multiprocessing |
| +import subprocess |
| + |
| +_counter = 0 |
| + |
| + |
| +class BasicUsage(utils.YappiUnitTestCase): |
| + |
| + def test_callback_function_int_return_overflow(self): |
| + # this test is just here to check if any errors are generated, as the err |
| + # is printed in C side, I did not include it here. THere are ways to test |
| + # this deterministically, I did not bother |
| + import ctypes |
| + |
| + def _unsigned_overflow_margin(): |
| + return 2**(ctypes.sizeof(ctypes.c_void_p) * 8) - 1 |
| + |
| + def foo(): |
| + pass |
| + |
| + #with utils.captured_output() as (out, err): |
| + yappi.set_context_id_callback(_unsigned_overflow_margin) |
| + yappi.set_tag_callback(_unsigned_overflow_margin) |
| + yappi.start() |
| + foo() |
| + |
| + def test_issue60(self): |
| + |
| + def foo(): |
| + buf = bytearray() |
| + buf += b't' * 200 |
| + view = memoryview(buf)[10:] |
| + view = view.tobytes() |
| + del buf[:10] # this throws exception |
| + return view |
| + |
| + yappi.start(builtins=True) |
| + foo() |
| + self.assertTrue( |
| + len( |
| + yappi.get_func_stats( |
| + filter_callback=lambda x: yappi. |
| + func_matches(x, [memoryview.tobytes]) |
| + ) |
| + ) > 0 |
| + ) |
| + yappi.stop() |
| + |
| + def test_issue54(self): |
| + |
| + def _tag_cbk(): |
| + global _counter |
| + _counter += 1 |
| + return _counter |
| + |
| + def a(): |
| + pass |
| + |
| + def b(): |
| + pass |
| + |
| + yappi.set_tag_callback(_tag_cbk) |
| + yappi.start() |
| + a() |
| + a() |
| + a() |
| + yappi.stop() |
| + stats = yappi.get_func_stats() |
| + self.assertEqual(stats.pop().ncall, 3) # aggregated if no tag is given |
| + stats = yappi.get_func_stats(tag=1) |
| + |
| + for i in range(1, 3): |
| + stats = yappi.get_func_stats(tag=i) |
| + stats = yappi.get_func_stats( |
| + tag=i, filter_callback=lambda x: yappi.func_matches(x, [a]) |
| + ) |
| + |
| + stat = stats.pop() |
| + self.assertEqual(stat.ncall, 1) |
| + |
| + yappi.set_tag_callback(None) |
| + yappi.clear_stats() |
| + yappi.start() |
| + b() |
| + b() |
| + stats = yappi.get_func_stats() |
| + self.assertEqual(len(stats), 1) |
| + stat = stats.pop() |
| + self.assertEqual(stat.ncall, 2) |
| + |
| + def test_filter(self): |
| + |
| + def a(): |
| + pass |
| + |
| + def b(): |
| + a() |
| + |
| + def c(): |
| + b() |
| + |
| + _TCOUNT = 5 |
| + |
| + ts = [] |
| + yappi.start() |
| + for i in range(_TCOUNT): |
| + t = threading.Thread(target=c) |
| + t.start() |
| + ts.append(t) |
| + |
| + for t in ts: |
| + t.join() |
| + |
| + yappi.stop() |
| + |
| + ctx_ids = [] |
| + for tstat in yappi.get_thread_stats(): |
| + if tstat.name == '_MainThread': |
| + main_ctx_id = tstat.id |
| + else: |
| + ctx_ids.append(tstat.id) |
| + |
| + fstats = yappi.get_func_stats(filter={"ctx_id": 9}) |
| + self.assertTrue(fstats.empty()) |
| + fstats = yappi.get_func_stats( |
| + filter={ |
| + "ctx_id": main_ctx_id, |
| + "name": "c" |
| + } |
| + ) # main thread |
| + self.assertTrue(fstats.empty()) |
| + |
| + for i in ctx_ids: |
| + fstats = yappi.get_func_stats( |
| + filter={ |
| + "ctx_id": i, |
| + "name": "a", |
| + "ncall": 1 |
| + } |
| + ) |
| + self.assertEqual(fstats.pop().ncall, 1) |
| + fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "b"}) |
| + self.assertEqual(fstats.pop().ncall, 1) |
| + fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "c"}) |
| + self.assertEqual(fstats.pop().ncall, 1) |
| + |
| + yappi.clear_stats() |
| + yappi.start(builtins=True) |
| + time.sleep(0.1) |
| + yappi.stop() |
| + fstats = yappi.get_func_stats(filter={"module": "time"}) |
| + self.assertEqual(len(fstats), 1) |
| + |
| + # invalid filters` |
| + self.assertRaises( |
| + Exception, yappi.get_func_stats, filter={'tag': "sss"} |
| + ) |
| + self.assertRaises( |
| + Exception, yappi.get_func_stats, filter={'ctx_id': "None"} |
| + ) |
| + |
| + def test_filter_callback(self): |
| + |
| + def a(): |
| + time.sleep(0.1) |
| + |
| + def b(): |
| + a() |
| + |
| + def c(): |
| + pass |
| + |
| + def d(): |
| + pass |
| + |
| + yappi.set_clock_type("wall") |
| + yappi.start(builtins=True) |
| + a() |
| + b() |
| + c() |
| + d() |
| + stats = yappi.get_func_stats( |
| + filter_callback=lambda x: yappi.func_matches(x, [a, b]) |
| + ) |
| + #stats.print_all() |
| + r1 = ''' |
| + tests/test_functionality.py:98 a 2 0.000000 0.200350 0.100175 |
| + tests/test_functionality.py:101 b 1 0.000000 0.120000 0.100197 |
| + ''' |
| + self.assert_traces_almost_equal(r1, stats) |
| + self.assertEqual(len(stats), 2) |
| + stats = yappi.get_func_stats( |
| + filter_callback=lambda x: yappi. |
| + module_matches(x, [sys.modules[__name__]]) |
| + ) |
| + r1 = ''' |
| + tests/test_functionality.py:98 a 2 0.000000 0.230130 0.115065 |
| + tests/test_functionality.py:101 b 1 0.000000 0.120000 0.109011 |
| + tests/test_functionality.py:104 c 1 0.000000 0.000002 0.000002 |
| + tests/test_functionality.py:107 d 1 0.000000 0.000001 0.000001 |
| + ''' |
| + self.assert_traces_almost_equal(r1, stats) |
| + self.assertEqual(len(stats), 4) |
| + |
| + stats = yappi.get_func_stats( |
| + filter_callback=lambda x: yappi.func_matches(x, [time.sleep]) |
| + ) |
| + self.assertEqual(len(stats), 1) |
| + r1 = ''' |
| + time.sleep 2 0.206804 0.220000 0.103402 |
| + ''' |
| + self.assert_traces_almost_equal(r1, stats) |
| + |
| + def test_print_formatting(self): |
| + |
| + def a(): |
| + pass |
| + |
| + def b(): |
| + a() |
| + |
| + func_cols = { |
| + 1: ("name", 48), |
| + 0: ("ncall", 5), |
| + 2: ("tsub", 8), |
| + } |
| + thread_cols = { |
| + 1: ("name", 48), |
| + 0: ("ttot", 8), |
| + } |
| + |
| + yappi.start() |
| + a() |
| + b() |
| + yappi.stop() |
| + fs = yappi.get_func_stats() |
| + cs = fs[1].children |
| + ts = yappi.get_thread_stats() |
| + #fs.print_all(out=sys.stderr, columns={1:("name", 70), }) |
| + #cs.print_all(out=sys.stderr, columns=func_cols) |
| + #ts.print_all(out=sys.stderr, columns=thread_cols) |
| + #cs.print_all(out=sys.stderr, columns={}) |
| + |
| + self.assertRaises( |
| + yappi.YappiError, fs.print_all, columns={1: ("namee", 9)} |
| + ) |
| + self.assertRaises( |
| + yappi.YappiError, cs.print_all, columns={1: ("dd", 0)} |
| + ) |
| + self.assertRaises( |
| + yappi.YappiError, ts.print_all, columns={1: ("tidd", 0)} |
| + ) |
| + |
| + def test_get_clock(self): |
| + yappi.set_clock_type('cpu') |
| + self.assertEqual('cpu', yappi.get_clock_type()) |
| + clock_info = yappi.get_clock_info() |
| + self.assertTrue('api' in clock_info) |
| + self.assertTrue('resolution' in clock_info) |
| + |
| + yappi.set_clock_type('wall') |
| + self.assertEqual('wall', yappi.get_clock_type()) |
| + |
| + t0 = yappi.get_clock_time() |
| + time.sleep(0.1) |
| + duration = yappi.get_clock_time() - t0 |
| + self.assertTrue(0.05 < duration < 0.3) |
| + |
| + def test_profile_decorator(self): |
| + |
| + def aggregate(func, stats): |
| + fname = f"tests/{func.__name__}.profile" |
| + try: |
| + stats.add(fname) |
| + except OSError: |
| + pass |
| + stats.save(fname) |
| + raise Exception("messing around") |
| + |
| + @yappi.profile(return_callback=aggregate) |
| + def a(x, y): |
| + if x + y == 25: |
| + raise Exception("") |
| + return x + y |
| + |
| + def b(): |
| + pass |
| + |
| + try: |
| + os.remove( |
| + "tests/a.profile" |
| + ) # remove the one from prev test, if available |
| + except: |
| + pass |
| + |
| + # global profile is on to mess things up |
| + yappi.start() |
| + b() |
| + |
| + # assert functionality and call function at same time |
| + try: |
| + self.assertEqual(a(1, 2), 3) |
| + except: |
| + pass |
| + try: |
| + self.assertEqual(a(2, 5), 7) |
| + except: |
| + pass |
| + try: |
| + a(4, 21) |
| + except: |
| + pass |
| + stats = yappi.get_func_stats().add("tests/a.profile") |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + self.assertEqual(fsa.ncall, 3) |
| + self.assertEqual(len(stats), 1) # b() should be cleared out. |
| + |
| + @yappi.profile(return_callback=aggregate) |
| + def count_down_rec(n): |
| + if n == 0: |
| + return |
| + count_down_rec(n - 1) |
| + |
| + try: |
| + os.remove( |
| + "tests/count_down_rec.profile" |
| + ) # remove the one from prev test, if available |
| + except: |
| + pass |
| + |
| + try: |
| + count_down_rec(4) |
| + except: |
| + pass |
| + try: |
| + count_down_rec(3) |
| + except: |
| + pass |
| + |
| + stats = yappi.YFuncStats("tests/count_down_rec.profile") |
| + fsrec = utils.find_stat_by_name(stats, 'count_down_rec') |
| + self.assertEqual(fsrec.ncall, 9) |
| + self.assertEqual(fsrec.nactualcall, 2) |
| + |
| + def test_strip_dirs(self): |
| + |
| + def a(): |
| + pass |
| + |
| + stats = utils.run_and_get_func_stats(a, ) |
| + stats.strip_dirs() |
| + fsa = utils.find_stat_by_name(stats, "a") |
| + self.assertEqual(fsa.module, os.path.basename(fsa.module)) |
| + |
| + @unittest.skipIf(os.name == "nt", "do not run on Windows") |
| + def test_run_as_script(self): |
| + import re |
| + p = subprocess.Popen( |
| + ['yappi', os.path.join('./tests', 'run_as_script.py')], |
| + stdout=subprocess.PIPE |
| + ) |
| + out, err = p.communicate() |
| + self.assertEqual(p.returncode, 0) |
| + func_stats, thread_stats = re.split( |
| + b'name\\s+id\\s+tid\\s+ttot\\s+scnt\\s*\n', out |
| + ) |
| + self.assertTrue(b'FancyThread' in thread_stats) |
| + |
| + def test_yappi_overhead(self): |
| + LOOP_COUNT = 100000 |
| + |
| + def a(): |
| + pass |
| + |
| + def b(): |
| + for i in range(LOOP_COUNT): |
| + a() |
| + |
| + t0 = time.time() |
| + yappi.start() |
| + b() |
| + yappi.stop() |
| + time_with_yappi = time.time() - t0 |
| + t0 = time.time() |
| + b() |
| + time_without_yappi = time.time() - t0 |
| + if time_without_yappi == 0: |
| + time_without_yappi = 0.000001 |
| + |
| + # in latest v0.82, I calculated this as close to "7.0" in my machine. |
| + # however, %83 of this overhead is coming from tickcount(). The other %17 |
| + # seems to have been evenly distributed to the internal bookkeeping |
| + # structures/algorithms which seems acceptable. Note that our test only |
| + # tests one function being profiled at-a-time in a short interval. |
| + # profiling high number of functions in a small time |
| + # is a different beast, (which is pretty unlikely in most applications) |
| + # So as a conclusion: I cannot see any optimization window for Yappi that |
| + # is worth implementing as we will only optimize %17 of the time. |
| + sys.stderr.write("\r\nYappi puts %0.1f times overhead to the profiled application in average.\r\n" % \ |
| + (time_with_yappi / time_without_yappi)) |
| + |
| + def test_clear_stats_while_running(self): |
| + |
| + def a(): |
| + pass |
| + |
| + yappi.start() |
| + a() |
| + yappi.clear_stats() |
| + a() |
| + stats = yappi.get_func_stats() |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + self.assertEqual(fsa.ncall, 1) |
| + |
| + def test_generator(self): |
| + |
| + def _gen(n): |
| + while (n > 0): |
| + yield n |
| + n -= 1 |
| + |
| + yappi.start() |
| + for x in _gen(5): |
| + pass |
| + self.assertTrue( |
| + yappi.convert2pstats(yappi.get_func_stats()) is not None |
| + ) |
| + |
| + def test_slice_child_stats_and_strip_dirs(self): |
| + |
| + def b(): |
| + for i in range(10000000): |
| + pass |
| + |
| + def a(): |
| + b() |
| + |
| + yappi.start(builtins=True) |
| + a() |
| + stats = yappi.get_func_stats() |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + fsb = utils.find_stat_by_name(stats, 'b') |
| + self.assertTrue(fsa.children[0:1] is not None) |
| + prev_afullname = fsa.full_name |
| + prev_bchildfullname = fsa.children[fsb].full_name |
| + stats.strip_dirs() |
| + self.assertTrue(len(prev_afullname) > len(fsa.full_name)) |
| + self.assertTrue( |
| + len(prev_bchildfullname) > len(fsa.children[fsb].full_name) |
| + ) |
| + |
| + def test_children_stat_functions(self): |
| + _timings = {"a_1": 5, "b_1": 3, "c_1": 1} |
| + _yappi._set_test_timings(_timings) |
| + |
| + def b(): |
| + pass |
| + |
| + def c(): |
| + pass |
| + |
| + def a(): |
| + b() |
| + c() |
| + |
| + yappi.start() |
| + a() |
| + b() # non-child call |
| + c() # non-child call |
| + stats = yappi.get_func_stats() |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + childs_of_a = fsa.children.get().sort("tavg", "desc") |
| + prev_item = None |
| + for item in childs_of_a: |
| + if prev_item: |
| + self.assertTrue(prev_item.tavg > item.tavg) |
| + prev_item = item |
| + childs_of_a.sort("name", "desc") |
| + prev_item = None |
| + for item in childs_of_a: |
| + if prev_item: |
| + self.assertTrue(prev_item.name > item.name) |
| + prev_item = item |
| + childs_of_a.clear() |
| + self.assertTrue(childs_of_a.empty()) |
| + |
| + def test_no_stats_different_clock_type_load(self): |
| + |
| + def a(): |
| + pass |
| + |
| + yappi.start() |
| + a() |
| + yappi.stop() |
| + yappi.get_func_stats().save("tests/ystats1.ys") |
| + yappi.clear_stats() |
| + yappi.set_clock_type("WALL") |
| + yappi.start() |
| + yappi.stop() |
| + stats = yappi.get_func_stats().add("tests/ystats1.ys") |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + self.assertTrue(fsa is not None) |
| + |
| + def test_subsequent_profile(self): |
| + _timings = {"a_1": 1, "b_1": 1} |
| + _yappi._set_test_timings(_timings) |
| + |
| + def a(): |
| + pass |
| + |
| + def b(): |
| + pass |
| + |
| + yappi.start() |
| + a() |
| + yappi.stop() |
| + yappi.start() |
| + b() |
| + yappi.stop() |
| + stats = yappi.get_func_stats() |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + fsb = utils.find_stat_by_name(stats, 'b') |
| + self.assertTrue(fsa is not None) |
| + self.assertTrue(fsb is not None) |
| + self.assertEqual(fsa.ttot, 1) |
| + self.assertEqual(fsb.ttot, 1) |
| + |
| + def test_lambda(self): |
| + f = lambda: time.sleep(0.3) |
| + yappi.set_clock_type("wall") |
| + yappi.start() |
| + f() |
| + stats = yappi.get_func_stats() |
| + fsa = utils.find_stat_by_name(stats, '<lambda>') |
| + self.assertTrue(fsa.ttot > 0.1) |
| + |
| + def test_module_stress(self): |
| + self.assertEqual(yappi.is_running(), False) |
| + |
| + yappi.start() |
| + yappi.clear_stats() |
| + self.assertRaises(_yappi.error, yappi.set_clock_type, "wall") |
| + |
| + yappi.stop() |
| + yappi.clear_stats() |
| + yappi.set_clock_type("cpu") |
| + self.assertRaises(yappi.YappiError, yappi.set_clock_type, "dummy") |
| + self.assertEqual(yappi.is_running(), False) |
| + yappi.clear_stats() |
| + yappi.clear_stats() |
| + |
| + def test_stat_sorting(self): |
| + _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1} |
| + _yappi._set_test_timings(_timings) |
| + |
| + self._ncall = 1 |
| + |
| + def a(): |
| + b() |
| + |
| + def b(): |
| + if self._ncall == 2: |
| + return |
| + self._ncall += 1 |
| + a() |
| + |
| + stats = utils.run_and_get_func_stats(a) |
| + stats = stats.sort("totaltime", "desc") |
| + prev_stat = None |
| + for stat in stats: |
| + if prev_stat: |
| + self.assertTrue(prev_stat.ttot >= stat.ttot) |
| + prev_stat = stat |
| + stats = stats.sort("totaltime", "asc") |
| + prev_stat = None |
| + for stat in stats: |
| + if prev_stat: |
| + self.assertTrue(prev_stat.ttot <= stat.ttot) |
| + prev_stat = stat |
| + stats = stats.sort("avgtime", "asc") |
| + prev_stat = None |
| + for stat in stats: |
| + if prev_stat: |
| + self.assertTrue(prev_stat.tavg <= stat.tavg) |
| + prev_stat = stat |
| + stats = stats.sort("name", "asc") |
| + prev_stat = None |
| + for stat in stats: |
| + if prev_stat: |
| + self.assertTrue(prev_stat.name <= stat.name) |
| + prev_stat = stat |
| + stats = stats.sort("subtime", "asc") |
| + prev_stat = None |
| + for stat in stats: |
| + if prev_stat: |
| + self.assertTrue(prev_stat.tsub <= stat.tsub) |
| + prev_stat = stat |
| + |
| + self.assertRaises( |
| + yappi.YappiError, stats.sort, "invalid_func_sorttype_arg" |
| + ) |
| + self.assertRaises( |
| + yappi.YappiError, stats.sort, "totaltime", |
| + "invalid_func_sortorder_arg" |
| + ) |
| + |
| + def test_start_flags(self): |
| + self.assertEqual(_yappi._get_start_flags(), None) |
| + yappi.start() |
| + |
| + def a(): |
| + pass |
| + |
| + a() |
| + self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0) |
| + self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1) |
| + self.assertEqual(len(yappi.get_thread_stats()), 1) |
| + |
| + def test_builtin_profiling(self): |
| + |
| + def a(): |
| + time.sleep(0.4) # is a builtin function |
| + |
| + yappi.set_clock_type('wall') |
| + |
| + yappi.start(builtins=True) |
| + a() |
| + stats = yappi.get_func_stats() |
| + fsa = utils.find_stat_by_name(stats, 'sleep') |
| + self.assertTrue(fsa is not None) |
| + self.assertTrue(fsa.ttot > 0.3) |
| + yappi.stop() |
| + yappi.clear_stats() |
| + |
| + def a(): |
| + pass |
| + |
| + yappi.start() |
| + t = threading.Thread(target=a) |
| + t.start() |
| + t.join() |
| + stats = yappi.get_func_stats() |
| + |
| + def test_singlethread_profiling(self): |
| + yappi.set_clock_type('wall') |
| + |
| + def a(): |
| + time.sleep(0.2) |
| + |
| + class Worker1(threading.Thread): |
| + |
| + def a(self): |
| + time.sleep(0.3) |
| + |
| + def run(self): |
| + self.a() |
| + |
| + yappi.start(profile_threads=False) |
| + |
| + c = Worker1() |
| + c.start() |
| + c.join() |
| + a() |
| + stats = yappi.get_func_stats() |
| + fsa1 = utils.find_stat_by_name(stats, 'Worker1.a') |
| + fsa2 = utils.find_stat_by_name(stats, 'a') |
| + self.assertTrue(fsa1 is None) |
| + self.assertTrue(fsa2 is not None) |
| + self.assertTrue(fsa2.ttot > 0.1) |
| + |
| + def test_run(self): |
| + |
| + def profiled(): |
| + pass |
| + |
| + yappi.clear_stats() |
| + try: |
| + with yappi.run(): |
| + profiled() |
| + stats = yappi.get_func_stats() |
| + finally: |
| + yappi.clear_stats() |
| + |
| + self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled')) |
| + |
| + def test_run_recursive(self): |
| + |
| + def profiled(): |
| + pass |
| + |
| + def not_profiled(): |
| + pass |
| + |
| + yappi.clear_stats() |
| + try: |
| + with yappi.run(): |
| + with yappi.run(): |
| + profiled() |
| + # Profiling stopped here |
| + not_profiled() |
| + stats = yappi.get_func_stats() |
| + finally: |
| + yappi.clear_stats() |
| + |
| + self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled')) |
| + self.assertIsNone(utils.find_stat_by_name(stats, 'not_profiled')) |
| + |
| + |
| +class StatSaveScenarios(utils.YappiUnitTestCase): |
| + |
| + def test_pstats_conversion(self): |
| + |
| + def pstat_id(fs): |
| + return (fs.module, fs.lineno, fs.name) |
| + |
| + def a(): |
| + d() |
| + |
| + def b(): |
| + d() |
| + |
| + def c(): |
| + pass |
| + |
| + def d(): |
| + pass |
| + |
| + _timings = {"a_1": 12, "b_1": 7, "c_1": 5, "d_1": 2} |
| + _yappi._set_test_timings(_timings) |
| + stats = utils.run_and_get_func_stats(a, ) |
| + stats.strip_dirs() |
| + stats.save("tests/a1.pstats", type="pstat") |
| + fsa_pid = pstat_id(utils.find_stat_by_name(stats, "a")) |
| + fsd_pid = pstat_id(utils.find_stat_by_name(stats, "d")) |
| + yappi.clear_stats() |
| + _yappi._set_test_timings(_timings) |
| + stats = utils.run_and_get_func_stats(a, ) |
| + stats.strip_dirs() |
| + stats.save("tests/a2.pstats", type="pstat") |
| + yappi.clear_stats() |
| + _yappi._set_test_timings(_timings) |
| + stats = utils.run_and_get_func_stats(b, ) |
| + stats.strip_dirs() |
| + stats.save("tests/b1.pstats", type="pstat") |
| + fsb_pid = pstat_id(utils.find_stat_by_name(stats, "b")) |
| + yappi.clear_stats() |
| + _yappi._set_test_timings(_timings) |
| + stats = utils.run_and_get_func_stats(c, ) |
| + stats.strip_dirs() |
| + stats.save("tests/c1.pstats", type="pstat") |
| + fsc_pid = pstat_id(utils.find_stat_by_name(stats, "c")) |
| + |
| + # merge saved stats and check pstats values are correct |
| + import pstats |
| + p = pstats.Stats( |
| + 'tests/a1.pstats', 'tests/a2.pstats', 'tests/b1.pstats', |
| + 'tests/c1.pstats' |
| + ) |
| + p.strip_dirs() |
| + # ct = ttot, tt = tsub |
| + (cc, nc, tt, ct, callers) = p.stats[fsa_pid] |
| + self.assertEqual(cc, nc, 2) |
| + self.assertEqual(tt, 20) |
| + self.assertEqual(ct, 24) |
| + (cc, nc, tt, ct, callers) = p.stats[fsd_pid] |
| + self.assertEqual(cc, nc, 3) |
| + self.assertEqual(tt, 6) |
| + self.assertEqual(ct, 6) |
| + self.assertEqual(len(callers), 2) |
| + (cc, nc, tt, ct) = callers[fsa_pid] |
| + self.assertEqual(cc, nc, 2) |
| + self.assertEqual(tt, 4) |
| + self.assertEqual(ct, 4) |
| + (cc, nc, tt, ct) = callers[fsb_pid] |
| + self.assertEqual(cc, nc, 1) |
| + self.assertEqual(tt, 2) |
| + self.assertEqual(ct, 2) |
| + |
| + def test_merge_stats(self): |
| + _timings = { |
| + "a_1": 15, |
| + "b_1": 14, |
| + "c_1": 12, |
| + "d_1": 10, |
| + "e_1": 9, |
| + "f_1": 7, |
| + "g_1": 6, |
| + "h_1": 5, |
| + "i_1": 1 |
| + } |
| + _yappi._set_test_timings(_timings) |
| + |
| + def a(): |
| + b() |
| + |
| + def b(): |
| + c() |
| + |
| + def c(): |
| + d() |
| + |
| + def d(): |
| + e() |
| + |
| + def e(): |
| + f() |
| + |
| + def f(): |
| + g() |
| + |
| + def g(): |
| + h() |
| + |
| + def h(): |
| + i() |
| + |
| + def i(): |
| + pass |
| + |
| + yappi.start() |
| + a() |
| + a() |
| + yappi.stop() |
| + stats = yappi.get_func_stats() |
| + self.assertRaises( |
| + NotImplementedError, stats.save, "", "INVALID_SAVE_TYPE" |
| + ) |
| + stats.save("tests/ystats2.ys") |
| + yappi.clear_stats() |
| + _yappi._set_test_timings(_timings) |
| + yappi.start() |
| + a() |
| + stats = yappi.get_func_stats().add("tests/ystats2.ys") |
| + fsa = utils.find_stat_by_name(stats, "a") |
| + fsb = utils.find_stat_by_name(stats, "b") |
| + fsc = utils.find_stat_by_name(stats, "c") |
| + fsd = utils.find_stat_by_name(stats, "d") |
| + fse = utils.find_stat_by_name(stats, "e") |
| + fsf = utils.find_stat_by_name(stats, "f") |
| + fsg = utils.find_stat_by_name(stats, "g") |
| + fsh = utils.find_stat_by_name(stats, "h") |
| + fsi = utils.find_stat_by_name(stats, "i") |
| + self.assertEqual(fsa.ttot, 45) |
| + self.assertEqual(fsa.ncall, 3) |
| + self.assertEqual(fsa.nactualcall, 3) |
| + self.assertEqual(fsa.tsub, 3) |
| + self.assertEqual(fsa.children[fsb].ttot, fsb.ttot) |
| + self.assertEqual(fsa.children[fsb].tsub, fsb.tsub) |
| + self.assertEqual(fsb.children[fsc].ttot, fsc.ttot) |
| + self.assertEqual(fsb.children[fsc].tsub, fsc.tsub) |
| + self.assertEqual(fsc.tsub, 6) |
| + self.assertEqual(fsc.children[fsd].ttot, fsd.ttot) |
| + self.assertEqual(fsc.children[fsd].tsub, fsd.tsub) |
| + self.assertEqual(fsd.children[fse].ttot, fse.ttot) |
| + self.assertEqual(fsd.children[fse].tsub, fse.tsub) |
| + self.assertEqual(fse.children[fsf].ttot, fsf.ttot) |
| + self.assertEqual(fse.children[fsf].tsub, fsf.tsub) |
| + self.assertEqual(fsf.children[fsg].ttot, fsg.ttot) |
| + self.assertEqual(fsf.children[fsg].tsub, fsg.tsub) |
| + self.assertEqual(fsg.ttot, 18) |
| + self.assertEqual(fsg.tsub, 3) |
| + self.assertEqual(fsg.children[fsh].ttot, fsh.ttot) |
| + self.assertEqual(fsg.children[fsh].tsub, fsh.tsub) |
| + self.assertEqual(fsh.ttot, 15) |
| + self.assertEqual(fsh.tsub, 12) |
| + self.assertEqual(fsh.tavg, 5) |
| + self.assertEqual(fsh.children[fsi].ttot, fsi.ttot) |
| + self.assertEqual(fsh.children[fsi].tsub, fsi.tsub) |
| + #stats.debug_print() |
| + |
| + def test_merge_multithreaded_stats(self): |
| + import _yappi |
| + timings = {"a_1": 2, "b_1": 1} |
| + _yappi._set_test_timings(timings) |
| + |
| + def a(): |
| + pass |
| + |
| + def b(): |
| + pass |
| + |
| + yappi.start() |
| + t = threading.Thread(target=a) |
| + t.start() |
| + t.join() |
| + t = threading.Thread(target=b) |
| + t.start() |
| + t.join() |
| + yappi.get_func_stats().save("tests/ystats1.ys") |
| + yappi.clear_stats() |
| + _yappi._set_test_timings(timings) |
| + self.assertEqual(len(yappi.get_func_stats()), 0) |
| + self.assertEqual(len(yappi.get_thread_stats()), 1) |
| + t = threading.Thread(target=a) |
| + t.start() |
| + t.join() |
| + |
| + self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0) |
| + self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1) |
| + yappi.get_func_stats().save("tests/ystats2.ys") |
| + |
| + stats = yappi.YFuncStats([ |
| + "tests/ystats1.ys", |
| + "tests/ystats2.ys", |
| + ]) |
| + fsa = utils.find_stat_by_name(stats, "a") |
| + fsb = utils.find_stat_by_name(stats, "b") |
| + self.assertEqual(fsa.ncall, 2) |
| + self.assertEqual(fsb.ncall, 1) |
| + self.assertEqual(fsa.tsub, fsa.ttot, 4) |
| + self.assertEqual(fsb.tsub, fsb.ttot, 1) |
| + |
| + def test_merge_load_different_clock_types(self): |
| + yappi.start(builtins=True) |
| + |
| + def a(): |
| + b() |
| + |
| + def b(): |
| + c() |
| + |
| + def c(): |
| + pass |
| + |
| + t = threading.Thread(target=a) |
| + t.start() |
| + t.join() |
| + yappi.get_func_stats().sort("name", "asc").save("tests/ystats1.ys") |
| + yappi.stop() |
| + yappi.clear_stats() |
| + yappi.start(builtins=False) |
| + t = threading.Thread(target=a) |
| + t.start() |
| + t.join() |
| + yappi.get_func_stats().save("tests/ystats2.ys") |
| + yappi.stop() |
| + self.assertRaises(_yappi.error, yappi.set_clock_type, "wall") |
| + yappi.clear_stats() |
| + yappi.set_clock_type("wall") |
| + yappi.start() |
| + t = threading.Thread(target=a) |
| + t.start() |
| + t.join() |
| + yappi.get_func_stats().save("tests/ystats3.ys") |
| + self.assertRaises( |
| + yappi.YappiError, |
| + yappi.YFuncStats().add("tests/ystats1.ys").add, "tests/ystats3.ys" |
| + ) |
| + stats = yappi.YFuncStats(["tests/ystats1.ys", |
| + "tests/ystats2.ys"]).sort("name") |
| + fsa = utils.find_stat_by_name(stats, "a") |
| + fsb = utils.find_stat_by_name(stats, "b") |
| + fsc = utils.find_stat_by_name(stats, "c") |
| + self.assertEqual(fsa.ncall, 2) |
| + self.assertEqual(fsa.ncall, fsb.ncall, fsc.ncall) |
| + |
| + def test_merge_aabab_aabbc(self): |
| + _timings = { |
| + "a_1": 15, |
| + "a_2": 14, |
| + "b_1": 12, |
| + "a_3": 10, |
| + "b_2": 9, |
| + "c_1": 4 |
| + } |
| + _yappi._set_test_timings(_timings) |
| + |
| + def a(): |
| + if self._ncall == 1: |
| + self._ncall += 1 |
| + a() |
| + elif self._ncall == 5: |
| + self._ncall += 1 |
| + a() |
| + else: |
| + b() |
| + |
| + def b(): |
| + if self._ncall == 2: |
| + self._ncall += 1 |
| + a() |
| + elif self._ncall == 6: |
| + self._ncall += 1 |
| + b() |
| + elif self._ncall == 7: |
| + c() |
| + else: |
| + return |
| + |
| + def c(): |
| + pass |
| + |
| + self._ncall = 1 |
| + stats = utils.run_and_get_func_stats(a, ) |
| + stats.save("tests/ystats1.ys") |
| + yappi.clear_stats() |
| + _yappi._set_test_timings(_timings) |
| + #stats.print_all() |
| + |
| + self._ncall = 5 |
| + stats = utils.run_and_get_func_stats(a, ) |
| + stats.save("tests/ystats2.ys") |
| + |
| + #stats.print_all() |
| + |
| + def a(): # same name but another function(code object) |
| + pass |
| + |
| + yappi.start() |
| + a() |
| + stats = yappi.get_func_stats().add( |
| + ["tests/ystats1.ys", "tests/ystats2.ys"] |
| + ) |
| + #stats.print_all() |
| + self.assertEqual(len(stats), 4) |
| + |
| + fsa = None |
| + for stat in stats: |
| + if stat.name == "a" and stat.ttot == 45: |
| + fsa = stat |
| + break |
| + self.assertTrue(fsa is not None) |
| + |
| + self.assertEqual(fsa.ncall, 7) |
| + self.assertEqual(fsa.nactualcall, 3) |
| + self.assertEqual(fsa.ttot, 45) |
| + self.assertEqual(fsa.tsub, 10) |
| + fsb = utils.find_stat_by_name(stats, "b") |
| + fsc = utils.find_stat_by_name(stats, "c") |
| + self.assertEqual(fsb.ncall, 6) |
| + self.assertEqual(fsb.nactualcall, 3) |
| + self.assertEqual(fsb.ttot, 36) |
| + self.assertEqual(fsb.tsub, 27) |
| + self.assertEqual(fsb.tavg, 6) |
| + self.assertEqual(fsc.ttot, 8) |
| + self.assertEqual(fsc.tsub, 8) |
| + self.assertEqual(fsc.tavg, 4) |
| + self.assertEqual(fsc.nactualcall, fsc.ncall, 2) |
| + |
| + |
| +class MultithreadedScenarios(utils.YappiUnitTestCase): |
| + |
| + def test_issue_32(self): |
| + ''' |
| + Start yappi from different thread and we get Internal Error(15) as |
| + the current_ctx_id() called while enumerating the threads in start() |
| + and as it does not swap to the enumerated ThreadState* the THreadState_GetDict() |
| + returns wrong object and thus sets an invalid id for the _ctx structure. |
| + |
| + When this issue happens multiple Threads have same tid as the internal ts_ptr |
| + will be same for different contexts. So, let's see if that happens |
| + ''' |
| + |
| + def foo(): |
| + time.sleep(0.2) |
| + |
| + def bar(): |
| + time.sleep(0.1) |
| + |
| + def thread_func(): |
| + yappi.set_clock_type("wall") |
| + yappi.start() |
| + |
| + bar() |
| + |
| + t = threading.Thread(target=thread_func) |
| + t.start() |
| + t.join() |
| + |
| + foo() |
| + |
| + yappi.stop() |
| + |
| + thread_ids = set() |
| + for tstat in yappi.get_thread_stats(): |
| + self.assertTrue(tstat.tid not in thread_ids) |
| + thread_ids.add(tstat.tid) |
| + |
| + def test_subsequent_profile(self): |
| + WORKER_COUNT = 5 |
| + |
| + def a(): |
| + pass |
| + |
| + def b(): |
| + pass |
| + |
| + def c(): |
| + pass |
| + |
| + _timings = { |
| + "a_1": 3, |
| + "b_1": 2, |
| + "c_1": 1, |
| + } |
| + |
| + yappi.start() |
| + |
| + def g(): |
| + pass |
| + |
| + g() |
| + yappi.stop() |
| + yappi.clear_stats() |
| + _yappi._set_test_timings(_timings) |
| + yappi.start() |
| + |
| + _dummy = [] |
| + for i in range(WORKER_COUNT): |
| + t = threading.Thread(target=a) |
| + t.start() |
| + t.join() |
| + for i in range(WORKER_COUNT): |
| + t = threading.Thread(target=b) |
| + t.start() |
| + _dummy.append(t) |
| + t.join() |
| + for i in range(WORKER_COUNT): |
| + t = threading.Thread(target=a) |
| + t.start() |
| + t.join() |
| + for i in range(WORKER_COUNT): |
| + t = threading.Thread(target=c) |
| + t.start() |
| + t.join() |
| + yappi.stop() |
| + yappi.start() |
| + |
| + def f(): |
| + pass |
| + |
| + f() |
| + stats = yappi.get_func_stats() |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + fsb = utils.find_stat_by_name(stats, 'b') |
| + fsc = utils.find_stat_by_name(stats, 'c') |
| + self.assertEqual(fsa.ncall, 10) |
| + self.assertEqual(fsb.ncall, 5) |
| + self.assertEqual(fsc.ncall, 5) |
| + self.assertEqual(fsa.ttot, fsa.tsub, 30) |
| + self.assertEqual(fsb.ttot, fsb.tsub, 10) |
| + self.assertEqual(fsc.ttot, fsc.tsub, 5) |
| + |
| + # MACOSx optimizes by only creating one worker thread |
| + self.assertTrue(len(yappi.get_thread_stats()) >= 2) |
| + |
| + def test_basic(self): |
| + yappi.set_clock_type('wall') |
| + |
| + def dummy(): |
| + pass |
| + |
| + def a(): |
| + time.sleep(0.2) |
| + |
| + class Worker1(threading.Thread): |
| + |
| + def a(self): |
| + time.sleep(0.3) |
| + |
| + def run(self): |
| + self.a() |
| + |
| + yappi.start(builtins=False, profile_threads=True) |
| + |
| + c = Worker1() |
| + c.start() |
| + c.join() |
| + a() |
| + stats = yappi.get_func_stats() |
| + fsa1 = utils.find_stat_by_name(stats, 'Worker1.a') |
| + fsa2 = utils.find_stat_by_name(stats, 'a') |
| + self.assertTrue(fsa1 is not None) |
| + self.assertTrue(fsa2 is not None) |
| + self.assertTrue(fsa1.ttot > 0.2) |
| + self.assertTrue(fsa2.ttot > 0.1) |
| + tstats = yappi.get_thread_stats() |
| + self.assertEqual(len(tstats), 2) |
| + tsa = utils.find_stat_by_name(tstats, 'Worker1') |
| + tsm = utils.find_stat_by_name(tstats, '_MainThread') |
| + dummy() # call dummy to force ctx name to be retrieved again. |
| + self.assertTrue(tsa is not None) |
| + # TODO: I put dummy() to fix below, remove the comments after a while. |
| + self.assertTrue( # FIX: I see this fails sometimes? |
| + tsm is not None, |
| + f"Could not find \"_MainThread\". Found: {', '.join(utils.get_stat_names(tstats))}") |
| + |
| + def test_ctx_stats(self): |
| + from threading import Thread |
| + DUMMY_WORKER_COUNT = 5 |
| + yappi.start() |
| + |
| + class DummyThread(Thread): |
| + pass |
| + |
| + def dummy(): |
| + pass |
| + |
| + def dummy_worker(): |
| + pass |
| + |
| + for i in range(DUMMY_WORKER_COUNT): |
| + t = DummyThread(target=dummy_worker) |
| + t.start() |
| + t.join() |
| + yappi.stop() |
| + stats = yappi.get_thread_stats() |
| + tsa = utils.find_stat_by_name(stats, "DummyThread") |
| + self.assertTrue(tsa is not None) |
| + yappi.clear_stats() |
| + time.sleep(1.0) |
| + _timings = { |
| + "a_1": 6, |
| + "b_1": 5, |
| + "c_1": 3, |
| + "d_1": 1, |
| + "a_2": 4, |
| + "b_2": 3, |
| + "c_2": 2, |
| + "d_2": 1 |
| + } |
| + _yappi._set_test_timings(_timings) |
| + |
| + class Thread1(Thread): |
| + pass |
| + |
| + class Thread2(Thread): |
| + pass |
| + |
| + def a(): |
| + b() |
| + |
| + def b(): |
| + c() |
| + |
| + def c(): |
| + d() |
| + |
| + def d(): |
| + time.sleep(0.6) |
| + |
| + yappi.set_clock_type("wall") |
| + yappi.start() |
| + t1 = Thread1(target=a) |
| + t1.start() |
| + t2 = Thread2(target=a) |
| + t2.start() |
| + t1.join() |
| + t2.join() |
| + stats = yappi.get_thread_stats() |
| + |
| + # the fist clear_stats clears the context table? |
| + tsa = utils.find_stat_by_name(stats, "DummyThread") |
| + self.assertTrue(tsa is None) |
| + |
| + tst1 = utils.find_stat_by_name(stats, "Thread1") |
| + tst2 = utils.find_stat_by_name(stats, "Thread2") |
| + tsmain = utils.find_stat_by_name(stats, "_MainThread") |
| + dummy() # call dummy to force ctx name to be retrieved again. |
| + self.assertTrue(len(stats) == 3) |
| + self.assertTrue(tst1 is not None) |
| + self.assertTrue(tst2 is not None) |
| + # TODO: I put dummy() to fix below, remove the comments after a while. |
| + self.assertTrue( # FIX: I see this fails sometimes |
| + tsmain is not None, |
| + f"Could not find \"_MainThread\". Found: {', '.join(utils.get_stat_names(stats))}") |
| + self.assertTrue(1.0 > tst2.ttot >= 0.5) |
| + self.assertTrue(1.0 > tst1.ttot >= 0.5) |
| + |
| + # test sorting of the ctx stats |
| + stats = stats.sort("totaltime", "desc") |
| + prev_stat = None |
| + for stat in stats: |
| + if prev_stat: |
| + self.assertTrue(prev_stat.ttot >= stat.ttot) |
| + prev_stat = stat |
| + stats = stats.sort("totaltime", "asc") |
| + prev_stat = None |
| + for stat in stats: |
| + if prev_stat: |
| + self.assertTrue(prev_stat.ttot <= stat.ttot) |
| + prev_stat = stat |
| + stats = stats.sort("schedcount", "desc") |
| + prev_stat = None |
| + for stat in stats: |
| + if prev_stat: |
| + self.assertTrue(prev_stat.sched_count >= stat.sched_count) |
| + prev_stat = stat |
| + stats = stats.sort("name", "desc") |
| + prev_stat = None |
| + for stat in stats: |
| + if prev_stat: |
| + self.assertTrue(prev_stat.name.lower() >= stat.name.lower()) |
| + prev_stat = stat |
| + self.assertRaises( |
| + yappi.YappiError, stats.sort, "invalid_thread_sorttype_arg" |
| + ) |
| + self.assertRaises( |
| + yappi.YappiError, stats.sort, "invalid_thread_sortorder_arg" |
| + ) |
| + |
| + def test_ctx_stats_cpu(self): |
| + |
| + def get_thread_name(): |
| + try: |
| + return threading.current_thread().name |
| + except AttributeError: |
| + return "Anonymous" |
| + |
| + def burn_cpu(sec): |
| + t0 = yappi.get_clock_time() |
| + elapsed = 0 |
| + while (elapsed < sec): |
| + for _ in range(1000): |
| + pass |
| + elapsed = yappi.get_clock_time() - t0 |
| + |
| + def test(): |
| + |
| + ts = [] |
| + for i in (0.01, 0.05, 0.1): |
| + t = threading.Thread(target=burn_cpu, args=(i, )) |
| + t.name = f"burn_cpu-{str(i)}" |
| + t.start() |
| + ts.append(t) |
| + for t in ts: |
| + t.join() |
| + |
| + yappi.set_clock_type("cpu") |
| + yappi.set_context_name_callback(get_thread_name) |
| + |
| + yappi.start() |
| + |
| + test() |
| + |
| + yappi.stop() |
| + |
| + tstats = yappi.get_thread_stats() |
| + r1 = ''' |
| + burn_cpu-0.1 3 123145356058624 0.100105 8 |
| + burn_cpu-0.05 2 123145361313792 0.050149 8 |
| + burn_cpu-0.01 1 123145356058624 0.010127 2 |
| + MainThread 0 4321620864 0.001632 6 |
| + ''' |
| + self.assert_ctx_stats_almost_equal(r1, tstats) |
| + |
| + def test_producer_consumer_with_queues(self): |
| + # we currently just stress yappi, no functionality test is done here. |
| + yappi.start() |
| + from queue import Queue |
| + from threading import Thread |
| + WORKER_THREAD_COUNT = 50 |
| + WORK_ITEM_COUNT = 2000 |
| + |
| + def worker(): |
| + while True: |
| + item = q.get() |
| + # do the work with item |
| + q.task_done() |
| + |
| + q = Queue() |
| + for i in range(WORKER_THREAD_COUNT): |
| + t = Thread(target=worker) |
| + t.daemon = True |
| + t.start() |
| + |
| + for item in range(WORK_ITEM_COUNT): |
| + q.put(item) |
| + q.join() # block until all tasks are done |
| + #yappi.get_func_stats().sort("callcount").print_all() |
| + yappi.stop() |
| + |
| + def test_temporary_lock_waiting(self): |
| + yappi.start() |
| + _lock = threading.Lock() |
| + |
| + def worker(): |
| + _lock.acquire() |
| + try: |
| + time.sleep(1.0) |
| + finally: |
| + _lock.release() |
| + |
| + t1 = threading.Thread(target=worker) |
| + t2 = threading.Thread(target=worker) |
| + t1.start() |
| + t2.start() |
| + t1.join() |
| + t2.join() |
| + #yappi.get_func_stats().sort("callcount").print_all() |
| + yappi.stop() |
| + |
| + @unittest.skipIf(os.name != "posix", "requires Posix compliant OS") |
| + def test_signals_with_blocking_calls(self): |
| + import signal, os, time |
| + |
| + # just to verify if signal is handled correctly and stats/yappi are not corrupted. |
| + def handler(signum, frame): |
| + raise Exception("Signal handler executed!") |
| + |
| + yappi.start() |
| + signal.signal(signal.SIGALRM, handler) |
| + signal.alarm(1) |
| + self.assertRaises(Exception, time.sleep, 2) |
| + stats = yappi.get_func_stats() |
| + fsh = utils.find_stat_by_name(stats, "handler") |
| + self.assertTrue(fsh is not None) |
| + |
| + def test_concurrent_futures(self): |
| + yappi.start() |
| + from concurrent.futures import ThreadPoolExecutor |
| + with ThreadPoolExecutor(max_workers=5) as executor: |
| + f = executor.submit(pow, 5, 2) |
| + self.assertEqual(f.result(), 25) |
| + time.sleep(1.0) |
| + yappi.stop() |
| + |
| + def test_barrier(self): |
| + yappi.start() |
| + b = threading.Barrier(2, timeout=1) |
| + |
| + def worker(): |
| + try: |
| + b.wait() |
| + except threading.BrokenBarrierError: |
| + pass |
| + except Exception: |
| + raise Exception("BrokenBarrierError not raised") |
| + |
| + t1 = threading.Thread(target=worker) |
| + t1.start() |
| + #b.wait() |
| + t1.join() |
| + yappi.stop() |
| + |
| + |
| +class NonRecursiveFunctions(utils.YappiUnitTestCase): |
| + |
| + def test_abcd(self): |
| + _timings = {"a_1": 6, "b_1": 5, "c_1": 3, "d_1": 1} |
| + _yappi._set_test_timings(_timings) |
| + |
| + def a(): |
| + b() |
| + |
| + def b(): |
| + c() |
| + |
| + def c(): |
| + d() |
| + |
| + def d(): |
| + pass |
| + |
| + stats = utils.run_and_get_func_stats(a) |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + fsb = utils.find_stat_by_name(stats, 'b') |
| + fsc = utils.find_stat_by_name(stats, 'c') |
| + fsd = utils.find_stat_by_name(stats, 'd') |
| + cfsab = fsa.children[fsb] |
| + cfsbc = fsb.children[fsc] |
| + cfscd = fsc.children[fsd] |
| + |
| + self.assertEqual(fsa.ttot, 6) |
| + self.assertEqual(fsa.tsub, 1) |
| + self.assertEqual(fsb.ttot, 5) |
| + self.assertEqual(fsb.tsub, 2) |
| + self.assertEqual(fsc.ttot, 3) |
| + self.assertEqual(fsc.tsub, 2) |
| + self.assertEqual(fsd.ttot, 1) |
| + self.assertEqual(fsd.tsub, 1) |
| + self.assertEqual(cfsab.ttot, 5) |
| + self.assertEqual(cfsab.tsub, 2) |
| + self.assertEqual(cfsbc.ttot, 3) |
| + self.assertEqual(cfsbc.tsub, 2) |
| + self.assertEqual(cfscd.ttot, 1) |
| + self.assertEqual(cfscd.tsub, 1) |
| + |
| + def test_stop_in_middle(self): |
| + _timings = {"a_1": 6, "b_1": 4} |
| + _yappi._set_test_timings(_timings) |
| + |
| + def a(): |
| + b() |
| + yappi.stop() |
| + |
| + def b(): |
| + time.sleep(0.2) |
| + |
| + yappi.start() |
| + a() |
| + stats = yappi.get_func_stats() |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + fsb = utils.find_stat_by_name(stats, 'b') |
| + |
| + self.assertEqual(fsa.ncall, 1) |
| + self.assertEqual(fsa.nactualcall, 0) |
| + self.assertEqual(fsa.ttot, 0) # no call_leave called |
| + self.assertEqual(fsa.tsub, 0) # no call_leave called |
| + self.assertEqual(fsb.ttot, 4) |
| + |
| + |
| +class RecursiveFunctions(utils.YappiUnitTestCase): |
| + |
| + def test_fibonacci(self): |
| + |
| + def fib(n): |
| + if n > 1: |
| + return fib(n - 1) + fib(n - 2) |
| + else: |
| + return n |
| + |
| + stats = utils.run_and_get_func_stats(fib, 22) |
| + fs = utils.find_stat_by_name(stats, 'fib') |
| + self.assertEqual(fs.ncall, 57313) |
| + self.assertEqual(fs.ttot, fs.tsub) |
| + |
| + def test_abcadc(self): |
| + _timings = { |
| + "a_1": 20, |
| + "b_1": 19, |
| + "c_1": 17, |
| + "a_2": 13, |
| + "d_1": 12, |
| + "c_2": 10, |
| + "a_3": 5 |
| + } |
| + _yappi._set_test_timings(_timings) |
| + |
| + def a(n): |
| + if n == 3: |
| + return |
| + if n == 1 + 1: |
| + d(n) |
| + else: |
| + b(n) |
| + |
| + def b(n): |
| + c(n) |
| + |
| + def c(n): |
| + a(n + 1) |
| + |
| + def d(n): |
| + c(n) |
| + |
| + stats = utils.run_and_get_func_stats(a, 1) |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + fsb = utils.find_stat_by_name(stats, 'b') |
| + fsc = utils.find_stat_by_name(stats, 'c') |
| + fsd = utils.find_stat_by_name(stats, 'd') |
| + self.assertEqual(fsa.ncall, 3) |
| + self.assertEqual(fsa.nactualcall, 1) |
| + self.assertEqual(fsa.ttot, 20) |
| + self.assertEqual(fsa.tsub, 7) |
| + self.assertEqual(fsb.ttot, 19) |
| + self.assertEqual(fsb.tsub, 2) |
| + self.assertEqual(fsc.ttot, 17) |
| + self.assertEqual(fsc.tsub, 9) |
| + self.assertEqual(fsd.ttot, 12) |
| + self.assertEqual(fsd.tsub, 2) |
| + cfsca = fsc.children[fsa] |
| + self.assertEqual(cfsca.nactualcall, 0) |
| + self.assertEqual(cfsca.ncall, 2) |
| + self.assertEqual(cfsca.ttot, 13) |
| + self.assertEqual(cfsca.tsub, 6) |
| + |
| + def test_aaaa(self): |
| + _timings = {"d_1": 9, "d_2": 7, "d_3": 3, "d_4": 2} |
| + _yappi._set_test_timings(_timings) |
| + |
| + def d(n): |
| + if n == 3: |
| + return |
| + d(n + 1) |
| + |
| + stats = utils.run_and_get_func_stats(d, 0) |
| + fsd = utils.find_stat_by_name(stats, 'd') |
| + self.assertEqual(fsd.ncall, 4) |
| + self.assertEqual(fsd.nactualcall, 1) |
| + self.assertEqual(fsd.ttot, 9) |
| + self.assertEqual(fsd.tsub, 9) |
| + cfsdd = fsd.children[fsd] |
| + self.assertEqual(cfsdd.ttot, 7) |
| + self.assertEqual(cfsdd.tsub, 7) |
| + self.assertEqual(cfsdd.ncall, 3) |
| + self.assertEqual(cfsdd.nactualcall, 0) |
| + |
| + def test_abcabc(self): |
| + _timings = { |
| + "a_1": 20, |
| + "b_1": 19, |
| + "c_1": 17, |
| + "a_2": 13, |
| + "b_2": 11, |
| + "c_2": 9, |
| + "a_3": 6 |
| + } |
| + _yappi._set_test_timings(_timings) |
| + |
| + def a(n): |
| + if n == 3: |
| + return |
| + else: |
| + b(n) |
| + |
| + def b(n): |
| + c(n) |
| + |
| + def c(n): |
| + a(n + 1) |
| + |
| + stats = utils.run_and_get_func_stats(a, 1) |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + fsb = utils.find_stat_by_name(stats, 'b') |
| + fsc = utils.find_stat_by_name(stats, 'c') |
| + self.assertEqual(fsa.ncall, 3) |
| + self.assertEqual(fsa.nactualcall, 1) |
| + self.assertEqual(fsa.ttot, 20) |
| + self.assertEqual(fsa.tsub, 9) |
| + self.assertEqual(fsb.ttot, 19) |
| + self.assertEqual(fsb.tsub, 4) |
| + self.assertEqual(fsc.ttot, 17) |
| + self.assertEqual(fsc.tsub, 7) |
| + cfsab = fsa.children[fsb] |
| + cfsbc = fsb.children[fsc] |
| + cfsca = fsc.children[fsa] |
| + self.assertEqual(cfsab.ttot, 19) |
| + self.assertEqual(cfsab.tsub, 4) |
| + self.assertEqual(cfsbc.ttot, 17) |
| + self.assertEqual(cfsbc.tsub, 7) |
| + self.assertEqual(cfsca.ttot, 13) |
| + self.assertEqual(cfsca.tsub, 8) |
| + |
| + def test_abcbca(self): |
| + _timings = {"a_1": 10, "b_1": 9, "c_1": 7, "b_2": 4, "c_2": 2, "a_2": 1} |
| + _yappi._set_test_timings(_timings) |
| + self._ncall = 1 |
| + |
| + def a(): |
| + if self._ncall == 1: |
| + b() |
| + else: |
| + return |
| + |
| + def b(): |
| + c() |
| + |
| + def c(): |
| + if self._ncall == 1: |
| + self._ncall += 1 |
| + b() |
| + else: |
| + a() |
| + |
| + stats = utils.run_and_get_func_stats(a) |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + fsb = utils.find_stat_by_name(stats, 'b') |
| + fsc = utils.find_stat_by_name(stats, 'c') |
| + cfsab = fsa.children[fsb] |
| + cfsbc = fsb.children[fsc] |
| + cfsca = fsc.children[fsa] |
| + self.assertEqual(fsa.ttot, 10) |
| + self.assertEqual(fsa.tsub, 2) |
| + self.assertEqual(fsb.ttot, 9) |
| + self.assertEqual(fsb.tsub, 4) |
| + self.assertEqual(fsc.ttot, 7) |
| + self.assertEqual(fsc.tsub, 4) |
| + self.assertEqual(cfsab.ttot, 9) |
| + self.assertEqual(cfsab.tsub, 2) |
| + self.assertEqual(cfsbc.ttot, 7) |
| + self.assertEqual(cfsbc.tsub, 4) |
| + self.assertEqual(cfsca.ttot, 1) |
| + self.assertEqual(cfsca.tsub, 1) |
| + self.assertEqual(cfsca.ncall, 1) |
| + self.assertEqual(cfsca.nactualcall, 0) |
| + |
| + def test_aabccb(self): |
| + _timings = { |
| + "a_1": 13, |
| + "a_2": 11, |
| + "b_1": 9, |
| + "c_1": 5, |
| + "c_2": 3, |
| + "b_2": 1 |
| + } |
| + _yappi._set_test_timings(_timings) |
| + self._ncall = 1 |
| + |
| + def a(): |
| + if self._ncall == 1: |
| + self._ncall += 1 |
| + a() |
| + else: |
| + b() |
| + |
| + def b(): |
| + if self._ncall == 3: |
| + return |
| + else: |
| + c() |
| + |
| + def c(): |
| + if self._ncall == 2: |
| + self._ncall += 1 |
| + c() |
| + else: |
| + b() |
| + |
| + stats = utils.run_and_get_func_stats(a) |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + fsb = utils.find_stat_by_name(stats, 'b') |
| + fsc = utils.find_stat_by_name(stats, 'c') |
| + cfsaa = fsa.children[fsa.index] |
| + cfsab = fsa.children[fsb] |
| + cfsbc = fsb.children[fsc.full_name] |
| + cfscc = fsc.children[fsc] |
| + cfscb = fsc.children[fsb] |
| + self.assertEqual(fsb.ttot, 9) |
| + self.assertEqual(fsb.tsub, 5) |
| + self.assertEqual(cfsbc.ttot, 5) |
| + self.assertEqual(cfsbc.tsub, 2) |
| + self.assertEqual(fsa.ttot, 13) |
| + self.assertEqual(fsa.tsub, 4) |
| + self.assertEqual(cfsab.ttot, 9) |
| + self.assertEqual(cfsab.tsub, 4) |
| + self.assertEqual(cfsaa.ttot, 11) |
| + self.assertEqual(cfsaa.tsub, 2) |
| + self.assertEqual(fsc.ttot, 5) |
| + self.assertEqual(fsc.tsub, 4) |
| + |
| + def test_abaa(self): |
| + _timings = {"a_1": 13, "b_1": 10, "a_2": 9, "a_3": 5} |
| + _yappi._set_test_timings(_timings) |
| + |
| + self._ncall = 1 |
| + |
| + def a(): |
| + if self._ncall == 1: |
| + b() |
| + elif self._ncall == 2: |
| + self._ncall += 1 |
| + a() |
| + else: |
| + return |
| + |
| + def b(): |
| + self._ncall += 1 |
| + a() |
| + |
| + stats = utils.run_and_get_func_stats(a) |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + fsb = utils.find_stat_by_name(stats, 'b') |
| + cfsaa = fsa.children[fsa] |
| + cfsba = fsb.children[fsa] |
| + self.assertEqual(fsb.ttot, 10) |
| + self.assertEqual(fsb.tsub, 1) |
| + self.assertEqual(fsa.ttot, 13) |
| + self.assertEqual(fsa.tsub, 12) |
| + self.assertEqual(cfsaa.ttot, 5) |
| + self.assertEqual(cfsaa.tsub, 5) |
| + self.assertEqual(cfsba.ttot, 9) |
| + self.assertEqual(cfsba.tsub, 4) |
| + |
| + def test_aabb(self): |
| + _timings = {"a_1": 13, "a_2": 10, "b_1": 9, "b_2": 5} |
| + _yappi._set_test_timings(_timings) |
| + |
| + self._ncall = 1 |
| + |
| + def a(): |
| + if self._ncall == 1: |
| + self._ncall += 1 |
| + a() |
| + elif self._ncall == 2: |
| + b() |
| + else: |
| + return |
| + |
| + def b(): |
| + if self._ncall == 2: |
| + self._ncall += 1 |
| + b() |
| + else: |
| + return |
| + |
| + stats = utils.run_and_get_func_stats(a) |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + fsb = utils.find_stat_by_name(stats, 'b') |
| + cfsaa = fsa.children[fsa] |
| + cfsab = fsa.children[fsb] |
| + cfsbb = fsb.children[fsb] |
| + self.assertEqual(fsa.ttot, 13) |
| + self.assertEqual(fsa.tsub, 4) |
| + self.assertEqual(fsb.ttot, 9) |
| + self.assertEqual(fsb.tsub, 9) |
| + self.assertEqual(cfsaa.ttot, 10) |
| + self.assertEqual(cfsaa.tsub, 1) |
| + self.assertEqual(cfsab.ttot, 9) |
| + self.assertEqual(cfsab.tsub, 4) |
| + self.assertEqual(cfsbb.ttot, 5) |
| + self.assertEqual(cfsbb.tsub, 5) |
| + |
| + def test_abbb(self): |
| + _timings = {"a_1": 13, "b_1": 10, "b_2": 6, "b_3": 1} |
| + _yappi._set_test_timings(_timings) |
| + |
| + self._ncall = 1 |
| + |
| + def a(): |
| + if self._ncall == 1: |
| + b() |
| + |
| + def b(): |
| + if self._ncall == 3: |
| + return |
| + self._ncall += 1 |
| + b() |
| + |
| + stats = utils.run_and_get_func_stats(a) |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + fsb = utils.find_stat_by_name(stats, 'b') |
| + cfsab = fsa.children[fsb] |
| + cfsbb = fsb.children[fsb] |
| + self.assertEqual(fsa.ttot, 13) |
| + self.assertEqual(fsa.tsub, 3) |
| + self.assertEqual(fsb.ttot, 10) |
| + self.assertEqual(fsb.tsub, 10) |
| + self.assertEqual(fsb.ncall, 3) |
| + self.assertEqual(fsb.nactualcall, 1) |
| + self.assertEqual(cfsab.ttot, 10) |
| + self.assertEqual(cfsab.tsub, 4) |
| + self.assertEqual(cfsbb.ttot, 6) |
| + self.assertEqual(cfsbb.tsub, 6) |
| + self.assertEqual(cfsbb.nactualcall, 0) |
| + self.assertEqual(cfsbb.ncall, 2) |
| + |
| + def test_aaab(self): |
| + _timings = {"a_1": 13, "a_2": 10, "a_3": 6, "b_1": 1} |
| + _yappi._set_test_timings(_timings) |
| + |
| + self._ncall = 1 |
| + |
| + def a(): |
| + if self._ncall == 3: |
| + b() |
| + return |
| + self._ncall += 1 |
| + a() |
| + |
| + def b(): |
| + return |
| + |
| + stats = utils.run_and_get_func_stats(a) |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + fsb = utils.find_stat_by_name(stats, 'b') |
| + cfsaa = fsa.children[fsa] |
| + cfsab = fsa.children[fsb] |
| + self.assertEqual(fsa.ttot, 13) |
| + self.assertEqual(fsa.tsub, 12) |
| + self.assertEqual(fsb.ttot, 1) |
| + self.assertEqual(fsb.tsub, 1) |
| + self.assertEqual(cfsaa.ttot, 10) |
| + self.assertEqual(cfsaa.tsub, 9) |
| + self.assertEqual(cfsab.ttot, 1) |
| + self.assertEqual(cfsab.tsub, 1) |
| + |
| + def test_abab(self): |
| + _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1} |
| + _yappi._set_test_timings(_timings) |
| + |
| + self._ncall = 1 |
| + |
| + def a(): |
| + b() |
| + |
| + def b(): |
| + if self._ncall == 2: |
| + return |
| + self._ncall += 1 |
| + a() |
| + |
| + stats = utils.run_and_get_func_stats(a) |
| + fsa = utils.find_stat_by_name(stats, 'a') |
| + fsb = utils.find_stat_by_name(stats, 'b') |
| + cfsab = fsa.children[fsb] |
| + cfsba = fsb.children[fsa] |
| + self.assertEqual(fsa.ttot, 13) |
| + self.assertEqual(fsa.tsub, 8) |
| + self.assertEqual(fsb.ttot, 10) |
| + self.assertEqual(fsb.tsub, 5) |
| + self.assertEqual(cfsab.ttot, 10) |
| + self.assertEqual(cfsab.tsub, 5) |
| + self.assertEqual(cfsab.ncall, 2) |
| + self.assertEqual(cfsab.nactualcall, 1) |
| + self.assertEqual(cfsba.ttot, 6) |
| + self.assertEqual(cfsba.tsub, 5) |
| + |
| + |
| +if __name__ == '__main__': |
| + # import sys;sys.argv = ['', 'BasicUsage.test_run_as_script'] |
| + # import sys;sys.argv = ['', 'MultithreadedScenarios.test_subsequent_profile'] |
| + unittest.main() |
| -- |
| 2.34.1 |
| |