| #!/usr/bin/env python3 |
| # ex:ts=4:sw=4:sts=4:et |
| # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- |
| # |
| # patchtest: execute all unittest test cases discovered for a single patch |
| # |
| # Copyright (C) 2016 Intel Corporation |
| # |
| # SPDX-License-Identifier: GPL-2.0-only |
| # |
| |
| import sys |
| import os |
| import unittest |
| import logging |
| import traceback |
| import json |
| |
| # Include current path so test cases can see it |
| sys.path.insert(0, os.path.dirname(os.path.realpath(__file__))) |
| |
| # Include patchtest library |
| sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), '../meta/lib/patchtest')) |
| |
| from data import PatchTestInput |
| from repo import PatchTestRepo |
| |
| import utils |
| logger = utils.logger_create('patchtest') |
| info = logger.info |
| error = logger.error |
| |
| import repo |
| |
| def getResult(patch, mergepatch, logfile=None): |
| |
| class PatchTestResult(unittest.TextTestResult): |
| """ Patchtest TextTestResult """ |
| shouldStop = True |
| longMessage = False |
| |
| success = 'PASS' |
| fail = 'FAIL' |
| skip = 'SKIP' |
| |
| def startTestRun(self): |
| # let's create the repo already, it can be used later on |
| repoargs = { |
| 'repodir': PatchTestInput.repodir, |
| 'commit' : PatchTestInput.basecommit, |
| 'branch' : PatchTestInput.basebranch, |
| 'patch' : patch, |
| } |
| |
| self.repo_error = False |
| self.test_error = False |
| self.test_failure = False |
| |
| try: |
| self.repo = PatchTestInput.repo = PatchTestRepo(**repoargs) |
| except: |
| logger.error(traceback.print_exc()) |
| self.repo_error = True |
| self.stop() |
| return |
| |
| if mergepatch: |
| self.repo.merge() |
| |
| def addError(self, test, err): |
| self.test_error = True |
| (ty, va, trace) = err |
| logger.error(traceback.print_exc()) |
| |
| def addFailure(self, test, err): |
| test_description = test.id().split('.')[-1].replace('_', ' ').replace("cve", "CVE").replace("signed off by", |
| "Signed-off-by").replace("upstream status", |
| "Upstream-Status").replace("non auh", |
| "non-AUH").replace("presence format", "presence") |
| self.test_failure = True |
| fail_str = '{}: {}: {} ({})'.format(self.fail, |
| test_description, json.loads(str(err[1]))["issue"], |
| test.id()) |
| print(fail_str) |
| if logfile: |
| with open(logfile, "a") as f: |
| f.write(fail_str + "\n") |
| |
| def addSuccess(self, test): |
| test_description = test.id().split('.')[-1].replace('_', ' ').replace("cve", "CVE").replace("signed off by", |
| "Signed-off-by").replace("upstream status", |
| "Upstream-Status").replace("non auh", |
| "non-AUH").replace("presence format", "presence") |
| success_str = '{}: {} ({})'.format(self.success, |
| test_description, test.id()) |
| print(success_str) |
| if logfile: |
| with open(logfile, "a") as f: |
| f.write(success_str + "\n") |
| |
| def addSkip(self, test, reason): |
| test_description = test.id().split('.')[-1].replace('_', ' ').replace("cve", "CVE").replace("signed off by", |
| "Signed-off-by").replace("upstream status", |
| "Upstream-Status").replace("non auh", |
| "non-AUH").replace("presence format", "presence") |
| skip_str = '{}: {}: {} ({})'.format(self.skip, |
| test_description, json.loads(str(reason))["issue"], |
| test.id()) |
| print(skip_str) |
| if logfile: |
| with open(logfile, "a") as f: |
| f.write(skip_str + "\n") |
| |
| def stopTestRun(self): |
| |
| # in case there was an error on repo object creation, just return |
| if self.repo_error: |
| return |
| |
| self.repo.clean() |
| |
| return PatchTestResult |
| |
| def _runner(resultklass, prefix=None): |
| # load test with the corresponding prefix |
| loader = unittest.TestLoader() |
| if prefix: |
| loader.testMethodPrefix = prefix |
| |
| # create the suite with discovered tests and the corresponding runner |
| suite = loader.discover(start_dir=PatchTestInput.testdir, pattern=PatchTestInput.pattern, top_level_dir=PatchTestInput.topdir) |
| ntc = suite.countTestCases() |
| |
| # if there are no test cases, just quit |
| if not ntc: |
| return 2 |
| runner = unittest.TextTestRunner(resultclass=resultklass, verbosity=0) |
| |
| try: |
| result = runner.run(suite) |
| except: |
| logger.error(traceback.print_exc()) |
| logger.error('patchtest: something went wrong') |
| return 1 |
| if result.test_failure or result.test_error: |
| return 1 |
| |
| return 0 |
| |
| def run(patch, logfile=None): |
| """ Load, setup and run pre and post-merge tests """ |
| # Get the result class and install the control-c handler |
| unittest.installHandler() |
| |
| # run pre-merge tests, meaning those methods with 'pretest' as prefix |
| premerge_resultklass = getResult(patch, False, logfile) |
| premerge_result = _runner(premerge_resultklass, 'pretest') |
| |
| # run post-merge tests, meaning those methods with 'test' as prefix |
| postmerge_resultklass = getResult(patch, True, logfile) |
| postmerge_result = _runner(postmerge_resultklass, 'test') |
| |
| print('----------------------------------------------------------------------\n') |
| if premerge_result == 2 and postmerge_result == 2: |
| logger.error('patchtest: No test cases found - did you specify the correct suite directory?') |
| if premerge_result == 1 or postmerge_result == 1: |
| logger.error('WARNING: patchtest: At least one patchtest caused a failure or an error - please check') |
| else: |
| logger.error('OK: patchtest: All patchtests passed') |
| print('----------------------------------------------------------------------\n') |
| return premerge_result or postmerge_result |
| |
| def main(): |
| tmp_patch = False |
| patch_path = PatchTestInput.patch_path |
| log_results = PatchTestInput.log_results |
| log_path = None |
| patch_list = None |
| |
| git_status = os.popen("(cd %s && git status)" % PatchTestInput.repodir).read() |
| status_matches = ["Changes not staged for commit", "Changes to be committed"] |
| if any([match in git_status for match in status_matches]): |
| logger.error("patchtest: there are uncommitted changes in the target repo that would be overwritten. Please commit or restore them before running patchtest") |
| return 1 |
| |
| if os.path.isdir(patch_path): |
| patch_list = [os.path.join(patch_path, filename) for filename in sorted(os.listdir(patch_path))] |
| else: |
| patch_list = [patch_path] |
| |
| for patch in patch_list: |
| if os.path.getsize(patch) == 0: |
| logger.error('patchtest: patch is empty') |
| return 1 |
| |
| logger.info('Testing patch %s' % patch) |
| |
| if log_results: |
| log_path = patch + ".testresult" |
| with open(log_path, "a") as f: |
| f.write("Patchtest results for patch '%s':\n\n" % patch) |
| |
| try: |
| if log_path: |
| run(patch, log_path) |
| else: |
| run(patch) |
| finally: |
| if tmp_patch: |
| os.remove(patch) |
| |
| if __name__ == '__main__': |
| ret = 1 |
| |
| # Parse the command line arguments and store it on the PatchTestInput namespace |
| PatchTestInput.set_namespace() |
| |
| # set debugging level |
| if PatchTestInput.debug: |
| logger.setLevel(logging.DEBUG) |
| |
| # if topdir not define, default it to testdir |
| if not PatchTestInput.topdir: |
| PatchTestInput.topdir = PatchTestInput.testdir |
| |
| try: |
| ret = main() |
| except Exception: |
| import traceback |
| traceback.print_exc(5) |
| |
| sys.exit(ret) |