Brad Bishop | 1a4b7ee | 2018-12-16 17:11:34 -0800 | [diff] [blame] | 1 | #!/usr/bin/env python3 |
| 2 | # |
Brad Bishop | c342db3 | 2019-05-15 21:57:59 -0400 | [diff] [blame] | 3 | # SPDX-License-Identifier: GPL-2.0-or-later |
| 4 | # |
Brad Bishop | 1a4b7ee | 2018-12-16 17:11:34 -0800 | [diff] [blame] | 5 | # Modified for use in OE by Richard Purdie, 2018 |
| 6 | # |
| 7 | # Modified by: Corey Goldberg, 2013 |
| 8 | # License: GPLv2+ |
| 9 | # |
| 10 | # Original code from: |
| 11 | # Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013) |
| 12 | # Copyright (C) 2005-2011 Canonical Ltd |
| 13 | # License: GPLv2+ |
| 14 | |
| 15 | import os |
| 16 | import sys |
| 17 | import traceback |
| 18 | import unittest |
| 19 | import subprocess |
| 20 | import testtools |
| 21 | import threading |
| 22 | import time |
| 23 | import io |
Brad Bishop | 79641f2 | 2019-09-10 07:20:22 -0400 | [diff] [blame] | 24 | import json |
Brad Bishop | c342db3 | 2019-05-15 21:57:59 -0400 | [diff] [blame] | 25 | import subunit |
Brad Bishop | 1a4b7ee | 2018-12-16 17:11:34 -0800 | [diff] [blame] | 26 | |
| 27 | from queue import Queue |
| 28 | from itertools import cycle |
| 29 | from subunit import ProtocolTestCase, TestProtocolClient |
| 30 | from subunit.test_results import AutoTimingTestResultDecorator |
| 31 | from testtools import ThreadsafeForwardingResult, iterate_tests |
Brad Bishop | 79641f2 | 2019-09-10 07:20:22 -0400 | [diff] [blame] | 32 | from testtools.content import Content |
| 33 | from testtools.content_type import ContentType |
Brad Bishop | 1932369 | 2019-04-05 15:28:33 -0400 | [diff] [blame] | 34 | from oeqa.utils.commands import get_test_layer |
Brad Bishop | 1a4b7ee | 2018-12-16 17:11:34 -0800 | [diff] [blame] | 35 | |
| 36 | import bb.utils |
| 37 | import oe.path |
| 38 | |
| 39 | _all__ = [ |
| 40 | 'ConcurrentTestSuite', |
| 41 | 'fork_for_tests', |
| 42 | 'partition_tests', |
| 43 | ] |
| 44 | |
| 45 | # |
| 46 | # Patch the version from testtools to allow access to _test_start and allow |
| 47 | # computation of timing information and threading progress |
| 48 | # |
| 49 | class BBThreadsafeForwardingResult(ThreadsafeForwardingResult): |
| 50 | |
| 51 | def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests): |
| 52 | super(BBThreadsafeForwardingResult, self).__init__(target, semaphore) |
| 53 | self.threadnum = threadnum |
| 54 | self.totalinprocess = totalinprocess |
| 55 | self.totaltests = totaltests |
| 56 | |
| 57 | def _add_result_with_semaphore(self, method, test, *args, **kwargs): |
| 58 | self.semaphore.acquire() |
| 59 | try: |
Brad Bishop | c342db3 | 2019-05-15 21:57:59 -0400 | [diff] [blame] | 60 | if self._test_start: |
| 61 | self.result.starttime[test.id()] = self._test_start.timestamp() |
| 62 | self.result.threadprogress[self.threadnum].append(test.id()) |
| 63 | totalprogress = sum(len(x) for x in self.result.threadprogress.values()) |
| 64 | self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s)" % ( |
Brad Bishop | 1a4b7ee | 2018-12-16 17:11:34 -0800 | [diff] [blame] | 65 | self.threadnum, |
| 66 | len(self.result.threadprogress[self.threadnum]), |
| 67 | self.totalinprocess, |
| 68 | totalprogress, |
| 69 | self.totaltests, |
| 70 | "{0:.2f}".format(time.time()-self._test_start.timestamp()), |
| 71 | test.id()) |
| 72 | finally: |
| 73 | self.semaphore.release() |
| 74 | super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs) |
| 75 | |
Brad Bishop | 79641f2 | 2019-09-10 07:20:22 -0400 | [diff] [blame] | 76 | class ProxyTestResult: |
| 77 | # a very basic TestResult proxy, in order to modify add* calls |
| 78 | def __init__(self, target): |
| 79 | self.result = target |
| 80 | |
| 81 | def _addResult(self, method, test, *args, **kwargs): |
| 82 | return method(test, *args, **kwargs) |
| 83 | |
| 84 | def addError(self, test, *args, **kwargs): |
| 85 | self._addResult(self.result.addError, test, *args, **kwargs) |
| 86 | |
| 87 | def addFailure(self, test, *args, **kwargs): |
| 88 | self._addResult(self.result.addFailure, test, *args, **kwargs) |
| 89 | |
| 90 | def addSuccess(self, test, *args, **kwargs): |
| 91 | self._addResult(self.result.addSuccess, test, *args, **kwargs) |
| 92 | |
| 93 | def addExpectedFailure(self, test, *args, **kwargs): |
| 94 | self._addResult(self.result.addExpectedFailure, test, *args, **kwargs) |
| 95 | |
| 96 | def addUnexpectedSuccess(self, test, *args, **kwargs): |
| 97 | self._addResult(self.result.addUnexpectedSuccess, test, *args, **kwargs) |
| 98 | |
| 99 | def __getattr__(self, attr): |
| 100 | return getattr(self.result, attr) |
| 101 | |
| 102 | class ExtraResultsDecoderTestResult(ProxyTestResult): |
| 103 | def _addResult(self, method, test, *args, **kwargs): |
| 104 | if "details" in kwargs and "extraresults" in kwargs["details"]: |
| 105 | if isinstance(kwargs["details"]["extraresults"], Content): |
| 106 | kwargs = kwargs.copy() |
| 107 | kwargs["details"] = kwargs["details"].copy() |
| 108 | extraresults = kwargs["details"]["extraresults"] |
| 109 | data = bytearray() |
| 110 | for b in extraresults.iter_bytes(): |
| 111 | data += b |
| 112 | extraresults = json.loads(data.decode()) |
| 113 | kwargs["details"]["extraresults"] = extraresults |
| 114 | return method(test, *args, **kwargs) |
| 115 | |
| 116 | class ExtraResultsEncoderTestResult(ProxyTestResult): |
| 117 | def _addResult(self, method, test, *args, **kwargs): |
| 118 | if hasattr(test, "extraresults"): |
| 119 | extras = lambda : [json.dumps(test.extraresults).encode()] |
| 120 | kwargs = kwargs.copy() |
| 121 | if "details" not in kwargs: |
| 122 | kwargs["details"] = {} |
| 123 | else: |
| 124 | kwargs["details"] = kwargs["details"].copy() |
| 125 | kwargs["details"]["extraresults"] = Content(ContentType("application", "json", {'charset': 'utf8'}), extras) |
| 126 | return method(test, *args, **kwargs) |
| 127 | |
Brad Bishop | 1a4b7ee | 2018-12-16 17:11:34 -0800 | [diff] [blame] | 128 | # |
Brad Bishop | c342db3 | 2019-05-15 21:57:59 -0400 | [diff] [blame] | 129 | # We have to patch subunit since it doesn't understand how to handle addError |
| 130 | # outside of a running test case. This can happen if classSetUp() fails |
| 131 | # for a class of tests. This unfortunately has horrible internal knowledge. |
| 132 | # |
| 133 | def outSideTestaddError(self, offset, line): |
| 134 | """An 'error:' directive has been read.""" |
| 135 | test_name = line[offset:-1].decode('utf8') |
| 136 | self.parser._current_test = subunit.RemotedTestCase(test_name) |
| 137 | self.parser.current_test_description = test_name |
| 138 | self.parser._state = self.parser._reading_error_details |
| 139 | self.parser._reading_error_details.set_simple() |
| 140 | self.parser.subunitLineReceived(line) |
| 141 | |
| 142 | subunit._OutSideTest.addError = outSideTestaddError |
| 143 | |
| 144 | |
| 145 | # |
Brad Bishop | 1a4b7ee | 2018-12-16 17:11:34 -0800 | [diff] [blame] | 146 | # A dummy structure to add to io.StringIO so that the .buffer object |
| 147 | # is available and accepts writes. This allows unittest with buffer=True |
| 148 | # to interact ok with subunit which wants to access sys.stdout.buffer. |
| 149 | # |
| 150 | class dummybuf(object): |
| 151 | def __init__(self, parent): |
| 152 | self.p = parent |
| 153 | def write(self, data): |
| 154 | self.p.write(data.decode("utf-8")) |
| 155 | |
| 156 | # |
| 157 | # Taken from testtools.ConncurrencyTestSuite but modified for OE use |
| 158 | # |
| 159 | class ConcurrentTestSuite(unittest.TestSuite): |
| 160 | |
| 161 | def __init__(self, suite, processes): |
| 162 | super(ConcurrentTestSuite, self).__init__([suite]) |
| 163 | self.processes = processes |
| 164 | |
| 165 | def run(self, result): |
| 166 | tests, totaltests = fork_for_tests(self.processes, self) |
| 167 | try: |
| 168 | threads = {} |
| 169 | queue = Queue() |
| 170 | semaphore = threading.Semaphore(1) |
| 171 | result.threadprogress = {} |
| 172 | for i, (test, testnum) in enumerate(tests): |
| 173 | result.threadprogress[i] = [] |
Brad Bishop | 79641f2 | 2019-09-10 07:20:22 -0400 | [diff] [blame] | 174 | process_result = BBThreadsafeForwardingResult( |
| 175 | ExtraResultsDecoderTestResult(result), |
| 176 | semaphore, i, testnum, totaltests) |
Brad Bishop | 1a4b7ee | 2018-12-16 17:11:34 -0800 | [diff] [blame] | 177 | # Force buffering of stdout/stderr so the console doesn't get corrupted by test output |
| 178 | # as per default in parent code |
| 179 | process_result.buffer = True |
| 180 | # We have to add a buffer object to stdout to keep subunit happy |
| 181 | process_result._stderr_buffer = io.StringIO() |
| 182 | process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer) |
| 183 | process_result._stdout_buffer = io.StringIO() |
| 184 | process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer) |
| 185 | reader_thread = threading.Thread( |
| 186 | target=self._run_test, args=(test, process_result, queue)) |
| 187 | threads[test] = reader_thread, process_result |
| 188 | reader_thread.start() |
| 189 | while threads: |
| 190 | finished_test = queue.get() |
| 191 | threads[finished_test][0].join() |
| 192 | del threads[finished_test] |
| 193 | except: |
| 194 | for thread, process_result in threads.values(): |
| 195 | process_result.stop() |
| 196 | raise |
| 197 | finally: |
| 198 | for test in tests: |
| 199 | test[0]._stream.close() |
| 200 | |
| 201 | def _run_test(self, test, process_result, queue): |
| 202 | try: |
| 203 | try: |
| 204 | test.run(process_result) |
| 205 | except Exception: |
| 206 | # The run logic itself failed |
| 207 | case = testtools.ErrorHolder( |
| 208 | "broken-runner", |
| 209 | error=sys.exc_info()) |
| 210 | case.run(process_result) |
| 211 | finally: |
| 212 | queue.put(test) |
| 213 | |
| 214 | def removebuilddir(d): |
| 215 | delay = 5 |
| 216 | while delay and os.path.exists(d + "/bitbake.lock"): |
| 217 | time.sleep(1) |
| 218 | delay = delay - 1 |
Brad Bishop | a34c030 | 2019-09-23 22:34:48 -0400 | [diff] [blame^] | 219 | bb.utils.prunedir(d, ionice=True) |
Brad Bishop | 1a4b7ee | 2018-12-16 17:11:34 -0800 | [diff] [blame] | 220 | |
| 221 | def fork_for_tests(concurrency_num, suite): |
| 222 | result = [] |
Brad Bishop | 1932369 | 2019-04-05 15:28:33 -0400 | [diff] [blame] | 223 | if 'BUILDDIR' in os.environ: |
| 224 | selftestdir = get_test_layer() |
| 225 | |
Brad Bishop | 1a4b7ee | 2018-12-16 17:11:34 -0800 | [diff] [blame] | 226 | test_blocks = partition_tests(suite, concurrency_num) |
| 227 | # Clear the tests from the original suite so it doesn't keep them alive |
| 228 | suite._tests[:] = [] |
| 229 | totaltests = sum(len(x) for x in test_blocks) |
| 230 | for process_tests in test_blocks: |
| 231 | numtests = len(process_tests) |
| 232 | process_suite = unittest.TestSuite(process_tests) |
| 233 | # Also clear each split list so new suite has only reference |
| 234 | process_tests[:] = [] |
| 235 | c2pread, c2pwrite = os.pipe() |
| 236 | # Clear buffers before fork to avoid duplicate output |
| 237 | sys.stdout.flush() |
| 238 | sys.stderr.flush() |
| 239 | pid = os.fork() |
| 240 | if pid == 0: |
| 241 | ourpid = os.getpid() |
| 242 | try: |
| 243 | newbuilddir = None |
| 244 | stream = os.fdopen(c2pwrite, 'wb', 1) |
| 245 | os.close(c2pread) |
| 246 | |
| 247 | # Create a new separate BUILDDIR for each group of tests |
| 248 | if 'BUILDDIR' in os.environ: |
| 249 | builddir = os.environ['BUILDDIR'] |
| 250 | newbuilddir = builddir + "-st-" + str(ourpid) |
Brad Bishop | 1a4b7ee | 2018-12-16 17:11:34 -0800 | [diff] [blame] | 251 | newselftestdir = newbuilddir + "/meta-selftest" |
| 252 | |
| 253 | bb.utils.mkdirhier(newbuilddir) |
| 254 | oe.path.copytree(builddir + "/conf", newbuilddir + "/conf") |
| 255 | oe.path.copytree(builddir + "/cache", newbuilddir + "/cache") |
| 256 | oe.path.copytree(selftestdir, newselftestdir) |
| 257 | |
| 258 | for e in os.environ: |
| 259 | if builddir in os.environ[e]: |
| 260 | os.environ[e] = os.environ[e].replace(builddir, newbuilddir) |
| 261 | |
| 262 | subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True) |
| 263 | |
| 264 | # Tried to used bitbake-layers add/remove but it requires recipe parsing and hence is too slow |
| 265 | subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True) |
| 266 | |
| 267 | os.chdir(newbuilddir) |
| 268 | |
| 269 | for t in process_suite: |
| 270 | if not hasattr(t, "tc"): |
| 271 | continue |
| 272 | cp = t.tc.config_paths |
| 273 | for p in cp: |
| 274 | if selftestdir in cp[p] and newselftestdir not in cp[p]: |
| 275 | cp[p] = cp[p].replace(selftestdir, newselftestdir) |
| 276 | if builddir in cp[p] and newbuilddir not in cp[p]: |
| 277 | cp[p] = cp[p].replace(builddir, newbuilddir) |
| 278 | |
| 279 | # Leave stderr and stdout open so we can see test noise |
| 280 | # Close stdin so that the child goes away if it decides to |
| 281 | # read from stdin (otherwise its a roulette to see what |
| 282 | # child actually gets keystrokes for pdb etc). |
| 283 | newsi = os.open(os.devnull, os.O_RDWR) |
| 284 | os.dup2(newsi, sys.stdin.fileno()) |
| 285 | |
| 286 | subunit_client = TestProtocolClient(stream) |
| 287 | # Force buffering of stdout/stderr so the console doesn't get corrupted by test output |
| 288 | # as per default in parent code |
| 289 | subunit_client.buffer = True |
| 290 | subunit_result = AutoTimingTestResultDecorator(subunit_client) |
Brad Bishop | 79641f2 | 2019-09-10 07:20:22 -0400 | [diff] [blame] | 291 | process_suite.run(ExtraResultsEncoderTestResult(subunit_result)) |
Brad Bishop | 1a4b7ee | 2018-12-16 17:11:34 -0800 | [diff] [blame] | 292 | if ourpid != os.getpid(): |
| 293 | os._exit(0) |
| 294 | if newbuilddir: |
| 295 | removebuilddir(newbuilddir) |
| 296 | except: |
| 297 | # Don't do anything with process children |
| 298 | if ourpid != os.getpid(): |
| 299 | os._exit(1) |
| 300 | # Try and report traceback on stream, but exit with error |
| 301 | # even if stream couldn't be created or something else |
| 302 | # goes wrong. The traceback is formatted to a string and |
| 303 | # written in one go to avoid interleaving lines from |
| 304 | # multiple failing children. |
| 305 | try: |
| 306 | stream.write(traceback.format_exc().encode('utf-8')) |
| 307 | except: |
| 308 | sys.stderr.write(traceback.format_exc()) |
| 309 | finally: |
| 310 | if newbuilddir: |
| 311 | removebuilddir(newbuilddir) |
| 312 | stream.flush() |
| 313 | os._exit(1) |
| 314 | stream.flush() |
| 315 | os._exit(0) |
| 316 | else: |
| 317 | os.close(c2pwrite) |
| 318 | stream = os.fdopen(c2pread, 'rb', 1) |
| 319 | test = ProtocolTestCase(stream) |
| 320 | result.append((test, numtests)) |
| 321 | return result, totaltests |
| 322 | |
| 323 | def partition_tests(suite, count): |
| 324 | # Keep tests from the same class together but allow tests from modules |
| 325 | # to go to different processes to aid parallelisation. |
| 326 | modules = {} |
| 327 | for test in iterate_tests(suite): |
| 328 | m = test.__module__ + "." + test.__class__.__name__ |
| 329 | if m not in modules: |
| 330 | modules[m] = [] |
| 331 | modules[m].append(test) |
| 332 | |
| 333 | # Simply divide the test blocks between the available processes |
| 334 | partitions = [list() for _ in range(count)] |
| 335 | for partition, m in zip(cycle(partitions), modules): |
| 336 | partition.extend(modules[m]) |
| 337 | |
| 338 | # No point in empty threads so drop them |
| 339 | return [p for p in partitions if p] |
| 340 | |