| # |
| # Copyright OpenEmbedded Contributors |
| # |
| # SPDX-License-Identifier: MIT |
| # |
| |
| import collections |
| import os |
| import sys |
| |
| from shutil import rmtree |
| from oeqa.runtime.case import OERuntimeTestCase |
| from oeqa.core.decorator.depends import OETestDepends |
| |
| # importlib.resources.open_text in Python <3.10 doesn't search all directories |
| # when a package is split across multiple directories. Until we can rely on |
| # 3.10+, reimplement the searching logic. |
if sys.version_info < (3, 10):
    def _open_text(package, resource):
        """
        Open the text file @resource belonging to @package.

        Every directory listed in the package's __path__ is searched in order,
        so the resource is found even when the package is split across several
        directories. The first matching regular file is opened as UTF-8 text
        and returned; the caller is responsible for closing it.

        Raises FileNotFoundError if no directory of the package contains the
        resource.
        """
        import importlib
        import pathlib

        module = importlib.import_module(package)
        for path in module.__path__:
            candidate = pathlib.Path(path) / resource
            # Only regular files count: a directory with the same name would
            # otherwise fail later with an error callers do not expect.
            if candidate.is_file():
                return candidate.open(encoding='utf-8')
        raise FileNotFoundError(
            f"resource {resource!r} not found in package {package!r}")
else:
    from importlib.resources import open_text as _open_text
| |
| |
class ParseLogsTest(OERuntimeTestCase):
    """
    Runtime test that collects log files from the target, scans them for
    lines containing any of the @errors keywords and fails if such a line is
    not covered by @ignore_errors.
    """

    # Which log files should be collected
    log_locations = ["/var/log/", "/var/log/dmesg", "/tmp/dmesg_output.log"]

    # The keywords that identify error messages in the log files
    errors = ["error", "cannot", "can't", "failed"]

    # A list of error messages that should be ignored
    ignore_errors = []

    @classmethod
    def setUpClass(cls):
        """
        Prepare the case-folded keyword list and load the ignore lists.
        """
        # Work on a copy so the list shared with base/sibling classes is
        # never modified in place.
        keywords = list(cls.errors)

        # When systemd is enabled we need to notice errors on
        # circular dependencies in units.
        distro_features = cls.td.get('DISTRO_FEATURES') or ''
        if 'systemd' in distro_features.split():
            keywords.extend([
                'Found ordering cycle on',
                'Breaking ordering cycle by deleting job',
                'deleted to break ordering cycle',
                'Ordering cycle found, skipping',
            ])

        # dict.fromkeys() drops duplicates while keeping order, so running
        # this more than once does not grow the list.
        cls.errors = list(dict.fromkeys(s.casefold() for s in keywords))

        cls.load_machine_ignores()

    @classmethod
    def load_machine_ignores(cls):
        """
        Extend @ignore_errors with the entries of every
        parselogs-ignores-<candidate>.txt file found in oeqa.runtime.cases.

        Blank lines and lines starting with '#' are skipped; entries are
        stored case-folded. Candidates without a file are silently ignored.
        """
        # Add TARGET_ARCH explicitly as not every machine has that in MACHINEOVERRIDES (eg qemux86-64)
        overrides = cls.td.get("MACHINEOVERRIDES") or ""
        candidates = ["common", cls.td.get("TARGET_ARCH")] + overrides.split(":")

        # Build a new list instead of appending to the class attribute, which
        # may be the very list object owned by a parent class.
        ignores = list(cls.ignore_errors)
        for candidate in candidates:
            if not candidate:
                continue
            name = f"parselogs-ignores-{candidate}.txt"
            try:
                with _open_text("oeqa.runtime.cases", name) as f:
                    for line in f:
                        line = line.strip()
                        if line and not line.startswith("#"):
                            ignores.append(line.casefold())
            except FileNotFoundError:
                # No ignore file for this candidate, nothing to add.
                pass

        cls.ignore_errors = list(dict.fromkeys(ignores))

    # Go through the log locations provided and if it's a folder
    # create a list with all the .log files in it, if it's a file
    # just add it to that list.
    def getLogList(self, log_locations):
        """
        Return the paths on the target of all log files to collect.

        Each entry of @log_locations that is a regular file on the target is
        used as is; for a directory the *.log files directly inside it are
        listed. Locations that are neither are skipped.
        """
        logs = []
        for location in log_locations:
            status, _ = self.target.run('test -f %s' % location)
            if status == 0:
                logs.append(location)
                continue

            status, _ = self.target.run('test -d %s' % location)
            if status != 0:
                continue

            # -maxdepth is a global option: keeping it ahead of the tests
            # avoids the warning GNU find prints otherwise, which could end
            # up mixed into the captured output.
            cmd = 'find %s -maxdepth 1 -name \\*.log -type f' % location
            status, output = self.target.run(cmd)
            if status == 0:
                # find already prefixes every result with the starting point,
                # so the lines are usable paths as they are.
                logs.extend(logfile for logfile in output.splitlines() if logfile)
        return logs

    # Copy the log files to be parsed locally
    def transfer_logs(self, log_list):
        """
        Copy every file in @log_list from the target into a freshly created
        WORKDIR/target_logs directory, whose path is stored in
        self.target_logs. Any previous content of that directory is removed.
        """
        self.target_logs = os.path.join(self.td.get('WORKDIR'), 'target_logs')
        if os.path.exists(self.target_logs):
            rmtree(self.target_logs)
        os.makedirs(self.target_logs)
        for f in log_list:
            self.target.copyFrom(str(f), self.target_logs)

    # Get the local list of logs
    def get_local_log_list(self, log_locations):
        """
        Transfer the logs found in @log_locations and return the local paths
        of the regular files that ended up in self.target_logs.
        """
        self.transfer_logs(self.getLogList(log_locations))
        candidates = (os.path.join(self.target_logs, name)
                      for name in os.listdir(self.target_logs))
        return [path for path in candidates if os.path.isfile(path)]

    def get_context(self, lines, index, before=6, after=3):
        """
        Given a set of lines and the index of the line that is important, return
        a number of lines surrounding that line.

        The window normally holds @before lines ahead of @index, the line
        itself and @after lines following it. When it would run past either
        end of @lines it is shifted to keep its size; when @lines is shorter
        than the window, all of @lines is returned.
        """
        last = len(lines)

        start = index - before
        end = index + after + 1

        if start < 0:
            # Shift the window forwards by the amount it overshoots the start.
            end -= start
            start = 0
        if end > last:
            # Shift the window backwards by the amount it overshoots the end.
            start -= end - last
            end = last

        # Shifting backwards can push start below zero when there are fewer
        # lines than the window size; a negative slice index would count from
        # the end of the list and return the wrong lines.
        start = max(start, 0)

        return lines[start:end]

    def test_get_context(self):
        """
        A test case for the test case.
        """
        lines = list(range(0, 10))
        self.assertEqual(self.get_context(lines, 0, 2, 1), [0, 1, 2, 3])
        self.assertEqual(self.get_context(lines, 5, 2, 1), [3, 4, 5, 6])
        self.assertEqual(self.get_context(lines, 9, 2, 1), [6, 7, 8, 9])
        # Fewer lines than the window size: everything is returned.
        short = list(range(0, 8))
        self.assertEqual(self.get_context(short, 2, 6, 3), short)
        self.assertEqual(self.get_context(short, 6, 6, 3), short)

    def parse_logs(self, logs, lines_before=10, lines_after=10):
        """
        Search the log files @logs looking for error lines (marked by
        @self.errors), ignoring anything listed in @self.ignore_errors.

        Returns a dictionary of log filenames to a dictionary of error lines to
        the error context (controlled by @lines_before and @lines_after).
        """
        results = collections.defaultdict(dict)

        for log in logs:
            # Logs may contain bytes that are not valid text; substitute them
            # rather than aborting the whole scan with a decoding error.
            with open(log, errors="replace") as f:
                lines = f.readlines()

            for i, line in enumerate(lines):
                line = line.strip()
                line_lower = line.casefold()

                if not any(keyword in line_lower for keyword in self.errors):
                    continue
                if any(ignore in line_lower for ignore in self.ignore_errors):
                    continue
                results[log][line] = "".join(self.get_context(lines, i, lines_before, lines_after))

        return results

    # Get the output of dmesg and write it in a file.
    # This file is added to log_locations.
    def write_dmesg(self):
        """
        Run dmesg on the target, redirecting its output to
        /tmp/dmesg_output.log there. The result of the command is not checked.
        """
        self.target.run('dmesg > /tmp/dmesg_output.log')

    @OETestDepends(['ssh.SSHTest.test_ssh'])
    def test_parselogs(self):
        """
        Collect the target logs, scan them and fail with a report of every
        error found (stored in self.msg) unless there are none.
        """
        self.write_dmesg()
        log_list = self.get_local_log_list(self.log_locations)
        result = self.parse_logs(log_list)

        errcount = 0
        report = []
        for log, log_errors in result.items():
            report.append('Log: ' + log + '\n')
            report.append('-----------------------\n')
            for error, context in log_errors.items():
                errcount += 1
                report.append('Central error: ' + error + '\n')
                report.append('***********************\n')
                report.append(context + '\n')
                report.append('***********************\n')
        report.append('%s errors found in logs.' % errcount)

        self.msg = "".join(report)
        self.assertEqual(errcount, 0, msg=self.msg)