blob: 496d9e0c9031a6fb5ff58de78a3a80541a2e0e11 [file] [log] [blame]
#
# Copyright OpenEmbedded Contributors
#
# SPDX-License-Identifier: MIT
#
import enum
import os
import re
# A parser that can be used to identify weather a line is a test result or a section statement.
class PtestParser(object):
def __init__(self):
self.results = {}
self.sections = {}
def parse(self, logfile):
test_regex = {}
test_regex['PASSED'] = re.compile(r"^PASS:(.+)")
test_regex['FAILED'] = re.compile(r"^FAIL:([^(]+)")
test_regex['SKIPPED'] = re.compile(r"^SKIP:(.+)")
section_regex = {}
section_regex['begin'] = re.compile(r"^BEGIN: .*/(.+)/ptest")
section_regex['end'] = re.compile(r"^END: .*/(.+)/ptest")
section_regex['duration'] = re.compile(r"^DURATION: (.+)")
section_regex['exitcode'] = re.compile(r"^ERROR: Exit status is (.+)")
section_regex['timeout'] = re.compile(r"^TIMEOUT: .*/(.+)/ptest")
# Cache markers so we don't take the re.search() hit all the time.
markers = ("PASS:", "FAIL:", "SKIP:", "BEGIN:", "END:", "DURATION:", "ERROR: Exit", "TIMEOUT:")
def newsection():
return { 'name': "No-section", 'log': [] }
current_section = newsection()
with open(logfile, errors='replace') as f:
for line in f:
if not line.startswith(markers):
current_section['log'].append(line)
continue
result = section_regex['begin'].search(line)
if result:
current_section['name'] = result.group(1)
if current_section['name'] not in self.results:
self.results[current_section['name']] = {}
continue
result = section_regex['end'].search(line)
if result:
if current_section['name'] != result.group(1):
bb.warn("Ptest END log section mismatch %s vs. %s" % (current_section['name'], result.group(1)))
if current_section['name'] in self.sections:
bb.warn("Ptest duplicate section for %s" % (current_section['name']))
self.sections[current_section['name']] = current_section
del self.sections[current_section['name']]['name']
current_section = newsection()
continue
result = section_regex['timeout'].search(line)
if result:
if current_section['name'] != result.group(1):
bb.warn("Ptest TIMEOUT log section mismatch %s vs. %s" % (current_section['name'], result.group(1)))
current_section['timeout'] = True
continue
for t in ['duration', 'exitcode']:
result = section_regex[t].search(line)
if result:
current_section[t] = result.group(1)
continue
current_section['log'].append(line)
for t in test_regex:
result = test_regex[t].search(line)
if result:
try:
self.results[current_section['name']][result.group(1).strip()] = t
except KeyError:
bb.warn("Result with no section: %s - %s" % (t, result.group(1).strip()))
# Python performance for repeatedly joining long strings is poor, do it all at once at the end.
# For 2.1 million lines in a log this reduces 18 hours to 12s.
for section in self.sections:
self.sections[section]['log'] = "".join(self.sections[section]['log'])
return self.results, self.sections
# Log the results as files. The file name is the section name and the contents are the tests in that section.
def results_as_files(self, target_dir):
if not os.path.exists(target_dir):
raise Exception("Target directory does not exist: %s" % target_dir)
for section in self.results:
prefix = 'No-section'
if section:
prefix = section
section_file = os.path.join(target_dir, prefix)
# purge the file contents if it exists
with open(section_file, 'w') as f:
for test_name in sorted(self.results[section]):
status = self.results[section][test_name]
f.write(status + ": " + test_name + "\n")
class LtpParser:
"""
Parse the machine-readable LTP log output into a ptest-friendly data structure.
"""
def parse(self, logfile):
results = {}
# Aaccumulate the duration here but as the log rounds quick tests down
# to 0 seconds this is very much a lower bound. The caller can replace
# the value.
section = {"duration": 0, "log": ""}
class LtpExitCode(enum.IntEnum):
# Exit codes as defined in ltp/include/tst_res_flags.h
TPASS = 0 # Test passed flag
TFAIL = 1 # Test failed flag
TBROK = 2 # Test broken flag
TWARN = 4 # Test warning flag
TINFO = 16 # Test information flag
TCONF = 32 # Test not appropriate for configuration flag
with open(logfile, errors="replace") as f:
# Lines look like this:
# tag=cfs_bandwidth01 stime=1689762564 dur=0 exit=exited stat=32 core=no cu=0 cs=0
for line in f:
if not line.startswith("tag="):
continue
values = dict(s.split("=") for s in line.strip().split())
section["duration"] += int(values["dur"])
exitcode = int(values["stat"])
if values["exit"] == "exited" and exitcode == LtpExitCode.TCONF:
# Exited normally with the "invalid configuration" code
results[values["tag"]] = "SKIPPED"
elif exitcode == LtpExitCode.TPASS:
# Successful exit
results[values["tag"]] = "PASSED"
else:
# Other exit
results[values["tag"]] = "FAILED"
return results, section
# ltp Compliance log parsing
class LtpComplianceParser(object):
def __init__(self):
self.results = {}
self.section = {'duration': "", 'log': ""}
def parse(self, logfile):
test_regex = {}
test_regex['FAILED'] = re.compile(r"FAIL")
section_regex = {}
section_regex['test'] = re.compile(r"^Executing")
with open(logfile, errors='replace') as f:
name = logfile
result = "PASSED"
for line in f:
regex_result = section_regex['test'].search(line)
if regex_result:
name = line.split()[1].strip()
regex_result = test_regex['FAILED'].search(line)
if regex_result:
result = "FAILED"
self.results[name] = result
for test in self.results:
result = self.results[test]
print (self.results)
self.section['log'] = self.section['log'] + ("%s: %s\n" % (result.strip()[:-2], test.strip()))
return self.results, self.section