#
# Copyright OpenEmbedded Contributors
#
# SPDX-License-Identifier: MIT
#
import collections
import os
import sys

from shutil import rmtree

from oeqa.runtime.case import OERuntimeTestCase
from oeqa.core.decorator.depends import OETestDepends

# importlib.resources.open_text in Python <3.10 doesn't search all directories
# when a package is split across multiple directories. Until we can rely on
# 3.10+, reimplement the searching logic.
if sys.version_info < (3, 10):
    def _open_text(package, resource):
        import importlib
        import pathlib
        module = importlib.import_module(package)
        for path in module.__path__:
            candidate = pathlib.Path(path) / resource
            if candidate.exists():
                return candidate.open(encoding='utf-8')
        raise FileNotFoundError
else:
    from importlib.resources import open_text as _open_text


class ParseLogsTest(OERuntimeTestCase):
    # Which log files should be collected
    log_locations = ["/var/log/", "/var/log/dmesg", "/tmp/dmesg_output.log"]

    # The keywords that identify error messages in the log files
    errors = ["error", "cannot", "can't", "failed"]

    # A list of error messages that should be ignored
    ignore_errors = []

    @classmethod
    def setUpClass(cls):
        # When systemd is enabled we need to catch errors about
        # circular dependencies in units.
        if 'systemd' in cls.td.get('DISTRO_FEATURES'):
            cls.errors.extend([
                'Found ordering cycle on',
                'Breaking ordering cycle by deleting job',
                'deleted to break ordering cycle',
                'Ordering cycle found, skipping',
            ])
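        # Fold every pattern to lowercase so that matching against log
        # lines is case-insensitive.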
        cls.errors = [s.casefold() for s in cls.errors]

        cls.load_machine_ignores()

    @classmethod
    def load_machine_ignores(cls):
        # Add TARGET_ARCH explicitly as not every machine has it in MACHINEOVERRIDES (e.g. qemux86-64)
        for candidate in ["common", cls.td.get("TARGET_ARCH")] + cls.td.get("MACHINEOVERRIDES").split(":"):
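            # Not every override has a corresponding ignore file; missing
            # files are simply skipped.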
            try:
                name = f"parselogs-ignores-{candidate}.txt"
                for line in _open_text("oeqa.runtime.cases", name):
                    line = line.strip()
                    if line and not line.startswith("#"):
                        cls.ignore_errors.append(line.casefold())
            except FileNotFoundError:
                pass

    # Go through the log locations provided: if a location is a directory,
    # build a list of all the .log files in it; if it is a file, just add
    # it to the list.
    def getLogList(self, log_locations):
        logs = []
        for location in log_locations:
            status, _ = self.target.run('test -f %s' % location)
            if status == 0:
                logs.append(location)
            else:
                status, _ = self.target.run('test -d %s' % location)
                if status == 0:
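                    # Only pick up *.log files in the top level of the
                    # directory, not in any subdirectories.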
                    cmd = 'find %s -name \\*.log -maxdepth 1 -type f' % location
                    status, output = self.target.run(cmd)
                    if status == 0:
                        output = output.splitlines()
                        for logfile in output:
                            logs.append(os.path.join(location, logfile))

        return logs

    # Copy the log files to be parsed locally
    def transfer_logs(self, log_list):
        workdir = self.td.get('WORKDIR')
        self.target_logs = workdir + '/' + 'target_logs'
        target_logs = self.target_logs
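        # Start from an empty directory so logs left over from a previous
        # run are not parsed again.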
        if os.path.exists(target_logs):
            rmtree(self.target_logs)
        os.makedirs(target_logs)
        for f in log_list:
            self.target.copyFrom(str(f), target_logs)

    # Get the local list of logs
    def get_local_log_list(self, log_locations):
        self.transfer_logs(self.getLogList(log_locations))
        list_dir = os.listdir(self.target_logs)
        dir_files = [os.path.join(self.target_logs, f) for f in list_dir]
        logs = [f for f in dir_files if os.path.isfile(f)]
        return logs

    def get_context(self, lines, index, before=6, after=3):
        """
        Given a set of lines and the index of the line that is important, return
        a number of lines surrounding that line.
        """
        last = len(lines)

        start = index - before
        end = index + after + 1

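        # If the window runs off either end of the file, shift it back
        # inside the bounds rather than shrinking it, so a full window of
        # context is returned whenever possible.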
        if start < 0:
            end -= start
            start = 0
        if end > last:
            start -= end - last
            end = last

        return lines[start:end]

    def test_get_context(self):
        """
        A test case for the test case.
        """
        lines = list(range(0, 10))
        self.assertEqual(self.get_context(lines, 0, 2, 1), [0, 1, 2, 3])
        self.assertEqual(self.get_context(lines, 5, 2, 1), [3, 4, 5, 6])
        self.assertEqual(self.get_context(lines, 9, 2, 1), [6, 7, 8, 9])

    def parse_logs(self, logs, lines_before=10, lines_after=10):
        """
        Search the log files @logs looking for error lines (marked by
        @self.errors), ignoring anything listed in @self.ignore_errors.

        Returns a dictionary of log filenames to a dictionary of error lines to
        the error context (controlled by @lines_before and @lines_after).
        """
        results = collections.defaultdict(dict)

        for log in logs:
            with open(log) as f:
                lines = f.readlines()

            for i, line in enumerate(lines):
                line = line.strip()
                line_lower = line.casefold()
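                # Report the line if it contains any of the error keywords
                # and does not match any of the ignore patterns.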
                if any(keyword in line_lower for keyword in self.errors):
                    if not any(ignore in line_lower for ignore in self.ignore_errors):
                        results[log][line] = "".join(self.get_context(lines, i, lines_before, lines_after))

        return results

    # Get the output of dmesg and write it to a file.
    # This file is added to log_locations.
    def write_dmesg(self):
        (status, dmesg) = self.target.run('dmesg > /tmp/dmesg_output.log')

    @OETestDepends(['ssh.SSHTest.test_ssh'])
    def test_parselogs(self):
        self.write_dmesg()
        log_list = self.get_local_log_list(self.log_locations)
        result = self.parse_logs(log_list)

        errcount = 0
        self.msg = ""
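        # Build a readable report listing every error found along with its
        # surrounding context, then fail if any errors were found.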
        for log in result:
            self.msg += 'Log: ' + log + '\n'
            self.msg += '-----------------------\n'
            for error in result[log]:
                errcount += 1
                self.msg += 'Central error: ' + error + '\n'
                self.msg += '***********************\n'
                self.msg += result[log][error] + '\n'
                self.msg += '***********************\n'
        self.msg += '%s errors found in logs.' % errcount
        self.assertEqual(errcount, 0, msg=self.msg)