blob: a1c824f7b7c35fbb9e563332557a0137391ba06e [file] [log] [blame]
Patrick Williamsac13d5f2023-11-24 18:59:46 -06001#!/usr/bin/env python3
2# ex:ts=4:sw=4:sts=4:et
3# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4#
5# patchtest: execute all unittest test cases discovered for a single patch
6#
7# Copyright (C) 2016 Intel Corporation
8#
9# SPDX-License-Identifier: GPL-2.0-only
10#
11
12import sys
13import os
14import unittest
15import logging
16import traceback
17import json
18
19# Include current path so test cases can see it
20sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)))
21
22# Include patchtest library
23sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), '../meta/lib/patchtest'))
24
25from data import PatchTestInput
26from repo import PatchTestRepo
27
28import utils
29logger = utils.logger_create('patchtest')
30info = logger.info
31error = logger.error
32
33import repo
34
35def getResult(patch, mergepatch, logfile=None):
36
37 class PatchTestResult(unittest.TextTestResult):
38 """ Patchtest TextTestResult """
39 shouldStop = True
40 longMessage = False
41
42 success = 'PASS'
43 fail = 'FAIL'
44 skip = 'SKIP'
45
46 def startTestRun(self):
47 # let's create the repo already, it can be used later on
48 repoargs = {
49 'repodir': PatchTestInput.repodir,
50 'commit' : PatchTestInput.basecommit,
51 'branch' : PatchTestInput.basebranch,
52 'patch' : patch,
53 }
54
55 self.repo_error = False
56 self.test_error = False
57 self.test_failure = False
58
59 try:
60 self.repo = PatchTestInput.repo = PatchTestRepo(**repoargs)
61 except:
62 logger.error(traceback.print_exc())
63 self.repo_error = True
64 self.stop()
65 return
66
67 if mergepatch:
68 self.repo.merge()
69
70 def addError(self, test, err):
71 self.test_error = True
72 (ty, va, trace) = err
73 logger.error(traceback.print_exc())
74
75 def addFailure(self, test, err):
76 test_description = test.id().split('.')[-1].replace('_', ' ').replace("cve", "CVE").replace("signed off by",
77 "Signed-off-by").replace("upstream status",
78 "Upstream-Status").replace("non auh",
79 "non-AUH").replace("presence format", "presence")
80 self.test_failure = True
81 fail_str = '{}: {}: {} ({})'.format(self.fail,
82 test_description, json.loads(str(err[1]))["issue"],
83 test.id())
84 print(fail_str)
85 if logfile:
86 with open(logfile, "a") as f:
87 f.write(fail_str + "\n")
88
89 def addSuccess(self, test):
90 test_description = test.id().split('.')[-1].replace('_', ' ').replace("cve", "CVE").replace("signed off by",
91 "Signed-off-by").replace("upstream status",
92 "Upstream-Status").replace("non auh",
93 "non-AUH").replace("presence format", "presence")
94 success_str = '{}: {} ({})'.format(self.success,
95 test_description, test.id())
96 print(success_str)
97 if logfile:
98 with open(logfile, "a") as f:
99 f.write(success_str + "\n")
100
101 def addSkip(self, test, reason):
102 test_description = test.id().split('.')[-1].replace('_', ' ').replace("cve", "CVE").replace("signed off by",
103 "Signed-off-by").replace("upstream status",
104 "Upstream-Status").replace("non auh",
105 "non-AUH").replace("presence format", "presence")
106 skip_str = '{}: {}: {} ({})'.format(self.skip,
107 test_description, json.loads(str(reason))["issue"],
108 test.id())
109 print(skip_str)
110 if logfile:
111 with open(logfile, "a") as f:
112 f.write(skip_str + "\n")
113
114 def stopTestRun(self):
115
116 # in case there was an error on repo object creation, just return
117 if self.repo_error:
118 return
119
120 self.repo.clean()
121
122 return PatchTestResult
123
124def _runner(resultklass, prefix=None):
125 # load test with the corresponding prefix
126 loader = unittest.TestLoader()
127 if prefix:
128 loader.testMethodPrefix = prefix
129
130 # create the suite with discovered tests and the corresponding runner
131 suite = loader.discover(start_dir=PatchTestInput.testdir, pattern=PatchTestInput.pattern, top_level_dir=PatchTestInput.topdir)
132 ntc = suite.countTestCases()
133
134 # if there are no test cases, just quit
135 if not ntc:
136 return 2
137 runner = unittest.TextTestRunner(resultclass=resultklass, verbosity=0)
138
139 try:
140 result = runner.run(suite)
141 except:
142 logger.error(traceback.print_exc())
143 logger.error('patchtest: something went wrong')
144 return 1
145
146 return 0
147
148def run(patch, logfile=None):
149 """ Load, setup and run pre and post-merge tests """
150 # Get the result class and install the control-c handler
151 unittest.installHandler()
152
153 # run pre-merge tests, meaning those methods with 'pretest' as prefix
154 premerge_resultklass = getResult(patch, False, logfile)
155 premerge_result = _runner(premerge_resultklass, 'pretest')
156
157 # run post-merge tests, meaning those methods with 'test' as prefix
158 postmerge_resultklass = getResult(patch, True, logfile)
159 postmerge_result = _runner(postmerge_resultklass, 'test')
160
161 if premerge_result == 2 and postmerge_result == 2:
162 logger.error('patchtest: any test cases found - did you specify the correct suite directory?')
163
164 return premerge_result or postmerge_result
165
166def main():
167 tmp_patch = False
168 patch_path = PatchTestInput.patch_path
169 log_results = PatchTestInput.log_results
170 log_path = None
171 patch_list = None
172
173 git_status = os.popen("(cd %s && git status)" % PatchTestInput.repodir).read()
174 status_matches = ["Changes not staged for commit", "Changes to be committed"]
175 if any([match in git_status for match in status_matches]):
176 logger.error("patchtest: there are uncommitted changes in the target repo that would be overwritten. Please commit or restore them before running patchtest")
177 return 1
178
179 if os.path.isdir(patch_path):
180 patch_list = [os.path.join(patch_path, filename) for filename in sorted(os.listdir(patch_path))]
181 else:
182 patch_list = [patch_path]
183
184 for patch in patch_list:
185 if os.path.getsize(patch) == 0:
186 logger.error('patchtest: patch is empty')
187 return 1
188
189 logger.info('Testing patch %s' % patch)
190
191 if log_results:
192 log_path = patch + ".testresult"
193 with open(log_path, "a") as f:
194 f.write("Patchtest results for patch '%s':\n\n" % patch)
195
196 try:
197 if log_path:
198 run(patch, log_path)
199 else:
200 run(patch)
201 finally:
202 if tmp_patch:
203 os.remove(patch)
204
205if __name__ == '__main__':
206 ret = 1
207
208 # Parse the command line arguments and store it on the PatchTestInput namespace
209 PatchTestInput.set_namespace()
210
211 # set debugging level
212 if PatchTestInput.debug:
213 logger.setLevel(logging.DEBUG)
214
215 # if topdir not define, default it to testdir
216 if not PatchTestInput.topdir:
217 PatchTestInput.topdir = PatchTestInput.testdir
218
219 try:
220 ret = main()
221 except Exception:
222 import traceback
223 traceback.print_exc(5)
224
225 sys.exit(ret)