blob: 6293cf94ec5068f8a1558e17d06708e5a64e4ace [file] [log] [blame]
Brad Bishop1a4b7ee2018-12-16 17:11:34 -08001#!/usr/bin/env python3
2#
Brad Bishopc342db32019-05-15 21:57:59 -04003# SPDX-License-Identifier: GPL-2.0-or-later
4#
Brad Bishop1a4b7ee2018-12-16 17:11:34 -08005# Modified for use in OE by Richard Purdie, 2018
6#
7# Modified by: Corey Goldberg, 2013
8# License: GPLv2+
9#
10# Original code from:
11# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
12# Copyright (C) 2005-2011 Canonical Ltd
13# License: GPLv2+
14
15import os
16import sys
17import traceback
18import unittest
19import subprocess
20import testtools
21import threading
22import time
23import io
Brad Bishop79641f22019-09-10 07:20:22 -040024import json
Brad Bishopc342db32019-05-15 21:57:59 -040025import subunit
Brad Bishop1a4b7ee2018-12-16 17:11:34 -080026
27from queue import Queue
28from itertools import cycle
29from subunit import ProtocolTestCase, TestProtocolClient
30from subunit.test_results import AutoTimingTestResultDecorator
31from testtools import ThreadsafeForwardingResult, iterate_tests
Brad Bishop79641f22019-09-10 07:20:22 -040032from testtools.content import Content
33from testtools.content_type import ContentType
Brad Bishop19323692019-04-05 15:28:33 -040034from oeqa.utils.commands import get_test_layer
Brad Bishop1a4b7ee2018-12-16 17:11:34 -080035
36import bb.utils
37import oe.path
38
# Public API of this module, i.e. the names exported by a star-import.
# The list was previously bound to "_all__" (single leading underscore),
# which Python ignores, so it had no effect.
__all__ = [
    'ConcurrentTestSuite',
    'fork_for_tests',
    'partition_tests',
]
44
#
# Extend the testtools forwarding result so that its _test_start value can
# be used to record timing information and per-thread progress on the
# target result.
#
class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
    """ThreadsafeForwardingResult that also records timing and progress.

    Before each result is forwarded, the target result's ``starttime``,
    ``threadprogress`` and ``progressinfo`` mappings are updated while the
    shared semaphore is held.
    """

    def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests):
        """Remember which thread this is and the test counts used in progress text.

        target         -- result object that outcomes are forwarded to
        semaphore      -- semaphore guarding access to target
        threadnum      -- key of this thread in target.threadprogress
        totalinprocess -- number of tests handled by this thread's process
        totaltests     -- number of tests across all processes
        """
        super().__init__(target, semaphore)
        self.threadnum = threadnum
        self.totalinprocess = totalinprocess
        self.totaltests = totaltests

    def _add_result_with_semaphore(self, method, test, *args, **kwargs):
        """Record progress for ``test`` and then forward the outcome as usual."""
        self.semaphore.acquire()
        try:
            started = self._test_start
            if started:
                target = self.result
                test_id = test.id()
                begin = started.timestamp()
                target.starttime[test_id] = begin
                done_here = target.threadprogress[self.threadnum]
                done_here.append(test_id)
                done_overall = sum(len(ids) for ids in target.threadprogress.values())
                elapsed = "{0:.2f}".format(time.time() - begin)
                # "<thread>: <done in thread>/<total in process>
                #  <done overall>/<total> (<seconds>s) (<test id>)"
                target.progressinfo[test_id] = "%s: %s/%s %s/%s (%ss) (%s)" % (
                    self.threadnum,
                    len(done_here),
                    self.totalinprocess,
                    done_overall,
                    self.totaltests,
                    elapsed,
                    test_id)
        finally:
            self.semaphore.release()
        # The parent implementation takes the semaphore again by itself.
        super()._add_result_with_semaphore(method, test, *args, **kwargs)
75
class ProxyTestResult:
    """A very basic TestResult proxy, in order to modify add* calls.

    The five add* methods below are routed through _addResult(), which
    subclasses override to adjust the arguments. Every other attribute is
    looked up on the wrapped result.
    """

    def __init__(self, target):
        """Wrap ``target``, the result object that calls are forwarded to."""
        self.result = target

    def _addResult(self, method, test, *args, **kwargs):
        """Call ``method`` (a bound add* method of the target) unchanged.

        Subclasses override this hook to rewrite the arguments first.
        """
        return method(test, *args, **kwargs)

    def addError(self, test, *args, **kwargs):
        self._addResult(self.result.addError, test, *args, **kwargs)

    def addFailure(self, test, *args, **kwargs):
        self._addResult(self.result.addFailure, test, *args, **kwargs)

    def addSuccess(self, test, *args, **kwargs):
        self._addResult(self.result.addSuccess, test, *args, **kwargs)

    def addExpectedFailure(self, test, *args, **kwargs):
        self._addResult(self.result.addExpectedFailure, test, *args, **kwargs)

    def addUnexpectedSuccess(self, test, *args, **kwargs):
        self._addResult(self.result.addUnexpectedSuccess, test, *args, **kwargs)

    def __getattr__(self, attr):
        """Delegate attributes not found on the proxy to the wrapped result."""
        # __getattr__ only runs when normal lookup failed. If "result" itself
        # is missing (instance created without __init__, e.g. through
        # __new__ or copy/pickle), reading self.result below would re-enter
        # this method until RecursionError, so fail cleanly instead.
        if attr == "result":
            raise AttributeError(attr)
        return getattr(self.result, attr)
101
class ExtraResultsDecoderTestResult(ProxyTestResult):
    """Proxy that turns an encoded ``extraresults`` detail back into data.

    When the ``details`` keyword argument carries an ``extraresults`` entry
    that is a testtools Content object, its bytes are joined, decoded and
    parsed as JSON, and the parsed value replaces the Content object before
    the call is forwarded.
    """

    def _addResult(self, method, test, *args, **kwargs):
        details = kwargs.get("details")
        # details may be absent or passed explicitly as None; both mean
        # there is nothing to decode (a membership test on None would raise).
        if details is not None and "extraresults" in details:
            encoded = details["extraresults"]
            if isinstance(encoded, Content):
                payload = b"".join(encoded.iter_bytes())
                # Work on copies so the caller's kwargs/details stay untouched.
                details = details.copy()
                details["extraresults"] = json.loads(payload.decode())
                kwargs = dict(kwargs, details=details)
        return method(test, *args, **kwargs)
115
class ExtraResultsEncoderTestResult(ProxyTestResult):
    """Proxy that attaches a test's ``extraresults`` as a JSON detail.

    If the test object has an ``extraresults`` attribute, it is added to the
    ``details`` keyword argument as a testtools Content object of type
    application/json before the call is forwarded.
    """

    def _addResult(self, method, test, *args, **kwargs):
        if hasattr(test, "extraresults"):
            def extras():
                # Evaluated lazily, when the Content object is iterated.
                return [json.dumps(test.extraresults).encode()]

            # Copy so the caller's dict is not modified. A missing details
            # argument, or an explicit details=None, starts from an empty dict.
            details = dict(kwargs.get("details") or {})
            details["extraresults"] = Content(
                ContentType("application", "json", {'charset': 'utf8'}), extras)
            kwargs = dict(kwargs, details=details)
        return method(test, *args, **kwargs)
127
#
# We have to patch subunit since it doesn't understand how to handle an
# 'error:' directive outside of a running test case. This can happen if
# setUpClass() fails for a class of tests. The replacement below
# unfortunately relies on subunit parser internals.
#
def outSideTestaddError(self, offset, line):
    """An 'error:' directive has been read."""
    parser = self.parser
    # The test name is everything after the directive, minus the final byte.
    test_name = line[offset:-1].decode('utf8')
    parser._current_test = subunit.RemotedTestCase(test_name)
    parser.current_test_description = test_name
    # Switch the parser into its error-details state, then hand the line
    # back to it for processing in that state.
    error_state = parser._reading_error_details
    parser._state = error_state
    error_state.set_simple()
    parser.subunitLineReceived(line)


subunit._OutSideTest.addError = outSideTestaddError
143
144
#
# A dummy structure to add to io.StringIO so that the .buffer object
# is available and accepts writes. This allows unittest with buffer=True
# to interact ok with subunit which wants to access sys.stdout.buffer.
#
class dummybuf(object):
    """Minimal bytes-accepting ``.buffer`` stand-in for a text stream."""

    def __init__(self, parent):
        # Text stream that receives the decoded data.
        self.p = parent

    def write(self, data):
        """Decode ``data`` (UTF-8 bytes) and write the text to the parent.

        Returns the number of bytes accepted, as binary streams do.
        """
        self.p.write(data.decode("utf-8"))
        return len(data)
155
#
# Taken from testtools.ConcurrentTestSuite but modified for OE use
#
class ConcurrentTestSuite(unittest.TestSuite):
    """Suite whose tests are split up by fork_for_tests() and run in parallel.

    Each test object returned by fork_for_tests() is run on its own thread,
    and its results are forwarded to the single result object passed to run().
    """

    def __init__(self, suite, processes):
        """Wrap ``suite``; ``processes`` is passed on to fork_for_tests()."""
        super().__init__([suite])
        self.processes = processes

    def run(self, result):
        """Run all tests, forwarding their outcomes to ``result``."""
        tests, totaltests = fork_for_tests(self.processes, self)
        try:
            threads = {}
            queue = Queue()
            semaphore = threading.Semaphore(1)
            result.threadprogress = {}
            for index, (test, testnum) in enumerate(tests):
                result.threadprogress[index] = []
                process_result = BBThreadsafeForwardingResult(
                    ExtraResultsDecoderTestResult(result),
                    semaphore, index, testnum, totaltests)
                # Force buffering of stdout/stderr so the console doesn't get
                # corrupted by test output, as per default in parent code
                process_result.buffer = True
                # Each capture stream needs a .buffer object to keep subunit happy
                for name in ("_stderr_buffer", "_stdout_buffer"):
                    capture = io.StringIO()
                    capture.buffer = dummybuf(capture)
                    setattr(process_result, name, capture)
                reader = threading.Thread(
                    target=self._run_test, args=(test, process_result, queue))
                threads[test] = (reader, process_result)
                reader.start()
            # _run_test() queues each test once it is done; reap its thread.
            while threads:
                finished = queue.get()
                reader, _ = threads[finished]
                reader.join()
                del threads[finished]
        except BaseException:
            # Ask the runs still outstanding to stop, then propagate.
            for _, process_result in threads.values():
                process_result.stop()
            raise
        finally:
            for test, _ in tests:
                test._stream.close()

    def _run_test(self, test, process_result, queue):
        """Thread body: run ``test`` and always announce completion on ``queue``."""
        try:
            test.run(process_result)
        except Exception:
            # The run logic itself failed
            holder = testtools.ErrorHolder("broken-runner", error=sys.exc_info())
            holder.run(process_result)
        finally:
            queue.put(test)
212 queue.put(test)
213
def removebuilddir(d):
    """Delete build directory ``d``.

    While ``d``/bitbake.lock exists, wait in one second steps (at most five
    of them) before removing the directory with bb.utils.prunedir().
    """
    lockfile = d + "/bitbake.lock"
    for _ in range(5):
        if not os.path.exists(lockfile):
            break
        time.sleep(1)
    bb.utils.prunedir(d, ionice=True)
Brad Bishop1a4b7ee2018-12-16 17:11:34 -0800220
def fork_for_tests(concurrency_num, suite):
    """Split ``suite`` into groups and run each group in a forked child.

    The tests are partitioned with partition_tests() and one child process
    is forked per partition. Each child reports its results as a subunit
    stream over a pipe. When BUILDDIR is set in the environment, each child
    first creates its own copy of the build directory and test layer and
    removes it again when done.

    Returns a tuple ``(tests, totaltests)``: ``tests`` is a list of
    ``(ProtocolTestCase, number_of_tests_in_that_child)`` pairs that read
    from the children's pipes, and ``totaltests`` is the overall test count.
    ``suite`` itself is emptied.
    """
    result = []
    if 'BUILDDIR' in os.environ:
        selftestdir = get_test_layer()

    test_blocks = partition_tests(suite, concurrency_num)
    # Clear the tests from the original suite so it doesn't keep them alive
    suite._tests[:] = []
    totaltests = sum(len(x) for x in test_blocks)
    for process_tests in test_blocks:
        numtests = len(process_tests)
        process_suite = unittest.TestSuite(process_tests)
        # Also clear each split list so new suite has only reference
        process_tests[:] = []
        c2pread, c2pwrite = os.pipe()
        # Clear buffers before fork to avoid duplicate output
        sys.stdout.flush()
        sys.stderr.flush()
        pid = os.fork()
        if pid == 0:
            ourpid = os.getpid()
            # Bind everything the error path reads before entering the try.
            # Otherwise "stream" could be unbound there, or still refer to the
            # parent's read stream from an earlier loop iteration.
            newbuilddir = None
            stream = None
            try:
                stream = os.fdopen(c2pwrite, 'wb', 1)
                os.close(c2pread)

                # Create a new separate BUILDDIR for each group of tests
                if 'BUILDDIR' in os.environ:
                    builddir = os.environ['BUILDDIR']
                    newbuilddir = builddir + "-st-" + str(ourpid)
                    newselftestdir = newbuilddir + "/meta-selftest"

                    bb.utils.mkdirhier(newbuilddir)
                    oe.path.copytree(builddir + "/conf", newbuilddir + "/conf")
                    oe.path.copytree(builddir + "/cache", newbuilddir + "/cache")
                    oe.path.copytree(selftestdir, newselftestdir)

                    # Point every environment variable that mentions the old
                    # build directory at the new one.
                    for e in os.environ:
                        if builddir in os.environ[e]:
                            os.environ[e] = os.environ[e].replace(builddir, newbuilddir)

                    # NOTE(review): the paths below are interpolated unquoted
                    # into shell commands; this assumes they contain no shell
                    # metacharacters and no '#' (the sed delimiter).
                    subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True)

                    # Tried to used bitbake-layers add/remove but it requires recipe parsing and hence is too slow
                    subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True)

                    os.chdir(newbuilddir)

                    # Rewrite the paths recorded on each test's context too.
                    for t in process_suite:
                        if not hasattr(t, "tc"):
                            continue
                        cp = t.tc.config_paths
                        for p in cp:
                            if selftestdir in cp[p] and newselftestdir not in cp[p]:
                                cp[p] = cp[p].replace(selftestdir, newselftestdir)
                            if builddir in cp[p] and newbuilddir not in cp[p]:
                                cp[p] = cp[p].replace(builddir, newbuilddir)

                # Leave stderr and stdout open so we can see test noise
                # Close stdin so that the child goes away if it decides to
                # read from stdin (otherwise its a roulette to see what
                # child actually gets keystrokes for pdb etc).
                newsi = os.open(os.devnull, os.O_RDWR)
                os.dup2(newsi, sys.stdin.fileno())

                subunit_client = TestProtocolClient(stream)
                # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
                # as per default in parent code
                subunit_client.buffer = True
                subunit_result = AutoTimingTestResultDecorator(subunit_client)
                process_suite.run(ExtraResultsEncoderTestResult(subunit_result))
                # A different pid means a test forked and we are its child.
                if ourpid != os.getpid():
                    os._exit(0)
                if newbuilddir:
                    removebuilddir(newbuilddir)
            except:
                # Deliberately bare: nothing may propagate out of the forked
                # child, or it would carry on executing the parent's code.
                # Don't do anything with process children
                if ourpid != os.getpid():
                    os._exit(1)
                # Try and report traceback on stream, but exit with error
                # even if stream couldn't be created or something else
                # goes wrong. The traceback is formatted to a string and
                # written in one go to avoid interleaving lines from
                # multiple failing children.
                report = traceback.format_exc()
                try:
                    if stream is None:
                        sys.stderr.write(report)
                    else:
                        try:
                            stream.write(report.encode('utf-8'))
                            # Flush now: os._exit() does not flush buffers,
                            # and the cleanup below may itself fail.
                            stream.flush()
                        except:
                            sys.stderr.write(traceback.format_exc())
                    if newbuilddir:
                        removebuilddir(newbuilddir)
                finally:
                    # Reached even if reporting or cleanup raised.
                    os._exit(1)
            stream.flush()
            os._exit(0)
        else:
            # Parent: keep only the read end and wrap it as a test that
            # replays the child's subunit stream.
            os.close(c2pwrite)
            stream = os.fdopen(c2pread, 'rb', 1)
            test = ProtocolTestCase(stream)
            result.append((test, numtests))
    return result, totaltests
322
def partition_tests(suite, count):
    """Split the tests in ``suite`` into at most ``count`` lists.

    Tests of the same class (same module and class name) always end up in
    the same list; the classes are dealt out round-robin in the order they
    are first seen. Lists that received nothing are dropped. A ``count``
    below one is treated as one.
    """
    # Keep tests from the same class together but allow tests from modules
    # to go to different processes to aid parallelisation.
    groups = {}
    for test in iterate_tests(suite):
        key = test.__module__ + "." + test.__class__.__name__
        groups.setdefault(key, []).append(test)

    # With no partitions at all, the zip() below would yield nothing and
    # every test would be silently discarded, so always keep at least one.
    count = max(1, count)

    # Simply divide the test blocks between the available processes
    partitions = [[] for _ in range(count)]
    for partition, tests in zip(cycle(partitions), groups.values()):
        partition.extend(tests)

    # No point in empty threads so drop them
    return [p for p in partitions if p]
340