| import sys, os, re, struct, operator, math |
| from collections import defaultdict |
| import unittest |
| |
| sys.path.insert(0, os.getcwd()) |
| |
| import pybootchartgui.parsing as parsing |
| import pybootchartgui.main as main |
| |
| debug = False |
| |
| def floatEq(f1, f2): |
| return math.fabs(f1-f2) < 0.00001 |
| |
| bootchart_dir = os.path.join(os.path.dirname(sys.argv[0]), '../../examples/1/') |
| parser = main._mk_options_parser() |
| options, args = parser.parse_args(['--q', bootchart_dir]) |
| writer = main._mk_writer(options) |
| |
| class TestBCParser(unittest.TestCase): |
| |
| def setUp(self): |
| self.name = "My first unittest" |
| self.rootdir = bootchart_dir |
| |
| def mk_fname(self,f): |
| return os.path.join(self.rootdir, f) |
| |
| def testParseHeader(self): |
| trace = parsing.Trace(writer, args, options) |
| state = parsing.parse_file(writer, trace, self.mk_fname('header')) |
| self.assertEqual(6, len(state.headers)) |
| self.assertEqual(2, parsing.get_num_cpus(state.headers)) |
| |
| def test_parseTimedBlocks(self): |
| trace = parsing.Trace(writer, args, options) |
| state = parsing.parse_file(writer, trace, self.mk_fname('proc_diskstats.log')) |
| self.assertEqual(141, len(state.disk_stats)) |
| |
| def testParseProcPsLog(self): |
| trace = parsing.Trace(writer, args, options) |
| state = parsing.parse_file(writer, trace, self.mk_fname('proc_ps.log')) |
| samples = state.ps_stats |
| processes = samples.process_map |
| sorted_processes = [processes[k] for k in sorted(processes.keys())] |
| |
| ps_data = open(self.mk_fname('extract2.proc_ps.log')) |
| for index, line in enumerate(ps_data): |
| tokens = line.split(); |
| process = sorted_processes[index] |
| if debug: |
| print(tokens[0:4]) |
| print(process.pid / 1000, process.cmd, process.ppid, len(process.samples)) |
| print('-------------------') |
| |
| self.assertEqual(tokens[0], str(process.pid // 1000)) |
| self.assertEqual(tokens[1], str(process.cmd)) |
| self.assertEqual(tokens[2], str(process.ppid // 1000)) |
| self.assertEqual(tokens[3], str(len(process.samples))) |
| ps_data.close() |
| |
| def testparseProcDiskStatLog(self): |
| trace = parsing.Trace(writer, args, options) |
| state_with_headers = parsing.parse_file(writer, trace, self.mk_fname('header')) |
| state_with_headers.headers['system.cpu'] = 'xxx (2)' |
| samples = parsing.parse_file(writer, state_with_headers, self.mk_fname('proc_diskstats.log')).disk_stats |
| self.assertEqual(141, len(samples)) |
| |
| diskstats_data = open(self.mk_fname('extract.proc_diskstats.log')) |
| for index, line in enumerate(diskstats_data): |
| tokens = line.split('\t') |
| sample = samples[index] |
| if debug: |
| print(line.rstrip()) |
| print(sample) |
| print('-------------------') |
| |
| self.assertEqual(tokens[0], str(sample.time)) |
| self.assert_(floatEq(float(tokens[1]), sample.read)) |
| self.assert_(floatEq(float(tokens[2]), sample.write)) |
| self.assert_(floatEq(float(tokens[3]), sample.util)) |
| diskstats_data.close() |
| |
| def testparseProcStatLog(self): |
| trace = parsing.Trace(writer, args, options) |
| samples = parsing.parse_file(writer, trace, self.mk_fname('proc_stat.log')).cpu_stats |
| self.assertEqual(141, len(samples)) |
| |
| stat_data = open(self.mk_fname('extract.proc_stat.log')) |
| for index, line in enumerate(stat_data): |
| tokens = line.split('\t') |
| sample = samples[index] |
| if debug: |
| print(line.rstrip()) |
| print(sample) |
| print('-------------------') |
| self.assert_(floatEq(float(tokens[0]), sample.time)) |
| self.assert_(floatEq(float(tokens[1]), sample.user)) |
| self.assert_(floatEq(float(tokens[2]), sample.sys)) |
| self.assert_(floatEq(float(tokens[3]), sample.io)) |
| stat_data.close() |
| |
| if __name__ == '__main__': |
| unittest.main() |
| |