blob: f050289e612cd31bfb8912f8719a4f3c41633ef7 [file] [log] [blame]
Brad Bishop1a4b7ee2018-12-16 17:11:34 -08001#!/usr/bin/env python3
2#
3# Modified for use in OE by Richard Purdie, 2018
4#
5# Modified by: Corey Goldberg, 2013
6# License: GPLv2+
7#
8# Original code from:
9# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
10# Copyright (C) 2005-2011 Canonical Ltd
11# License: GPLv2+
12
13import os
14import sys
15import traceback
16import unittest
17import subprocess
18import testtools
19import threading
20import time
21import io
22
23from queue import Queue
24from itertools import cycle
25from subunit import ProtocolTestCase, TestProtocolClient
26from subunit.test_results import AutoTimingTestResultDecorator
27from testtools import ThreadsafeForwardingResult, iterate_tests
28
29import bb.utils
30import oe.path
31
# Public API of this module. The name must be spelled __all__ (two leading
# underscores) for Python to honour it in "from <module> import *".
__all__ = [
    'ConcurrentTestSuite',
    'fork_for_tests',
    'partition_tests',
]
37
38#
39# Patch the version from testtools to allow access to _test_start and allow
40# computation of timing information and threading progress
41#
class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
    """Forwarding result that also records timing and progress data.

    Before each outcome is forwarded, the start time, the per-thread list
    of finished test ids and a formatted progress string are stored on
    ``self.result`` (presumably the ``target`` handed to the base class --
    the attribute itself is set by testtools, not here).

    threadnum      -- key of this worker in ``result.threadprogress``
    totalinprocess -- number of tests assigned to this worker
    totaltests     -- number of tests across all workers
    """

    def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests):
        super().__init__(target, semaphore)
        self.threadnum = threadnum
        self.totalinprocess = totalinprocess
        self.totaltests = totaltests

    def _add_result_with_semaphore(self, method, test, *args, **kwargs):
        # The bookkeeping is done under the shared semaphore, which is
        # released again before the base class re-acquires it to forward
        # the actual outcome.
        with self.semaphore:
            shared = self.result
            testid = test.id()
            started = self._test_start.timestamp()

            shared.starttime[testid] = started
            finished_here = shared.threadprogress[self.threadnum]
            finished_here.append(testid)
            finished_overall = sum(len(done) for done in shared.threadprogress.values())
            elapsed = "{0:.2f}".format(time.time() - started)

            shared.progressinfo[testid] = "%s: %s/%s %s/%s (%ss) (%s)" % (
                self.threadnum,
                len(finished_here),
                self.totalinprocess,
                finished_overall,
                self.totaltests,
                elapsed,
                testid)
        super()._add_result_with_semaphore(method, test, *args, **kwargs)
67
68#
69# A dummy structure to add to io.StringIO so that the .buffer object
70# is available and accepts writes. This allows unittest with buffer=True
71# to interact ok with subunit which wants to access sys.stdout.buffer.
72#
class dummybuf(object):
    """Bytes-accepting ``.buffer`` shim for a text stream.

    Bytes passed to write() are decoded as UTF-8 and written to the parent
    text stream given at construction time.
    """

    def __init__(self, parent):
        # Text stream that receives the decoded data.
        self.p = parent

    def write(self, data):
        """Decode ``data`` (bytes) as UTF-8 and write the text to the parent."""
        text = data.decode("utf-8")
        self.p.write(text)
78
#
# Taken from testtools.ConcurrentTestSuite but modified for OE use
#
class ConcurrentTestSuite(unittest.TestSuite):
    """Test suite that runs its tests in several forked processes.

    suite     -- the suite to run; it is wrapped as the single member of
                 this suite
    processes -- number of processes passed to fork_for_tests()
    """

    def __init__(self, suite, processes):
        super().__init__([suite])
        self.processes = processes

    def run(self, result):
        """Fork the workers and forward their outcomes to ``result``.

        One reader thread is started per forked worker; each thread
        forwards into ``result`` through a BBThreadsafeForwardingResult.
        ``result.threadprogress`` is reset to an empty dict and filled with
        one list per worker.
        """
        workers, totaltests = fork_for_tests(self.processes, self)
        try:
            readers = {}
            finished = Queue()
            lock = threading.Semaphore(1)
            result.threadprogress = {}
            for index, (case, count) in enumerate(workers):
                result.threadprogress[index] = []
                forwarder = BBThreadsafeForwardingResult(result, lock, index, count, totaltests)
                # Force buffering of stdout/stderr so the console doesn't get
                # corrupted by test output, as per default in parent code
                forwarder.buffer = True
                # subunit wants a .buffer attribute on the captured streams,
                # so give each StringIO a bytes-accepting shim
                for attr in ("_stderr_buffer", "_stdout_buffer"):
                    capture = io.StringIO()
                    capture.buffer = dummybuf(capture)
                    setattr(forwarder, attr, capture)
                reader = threading.Thread(
                    target=self._run_test, args=(case, forwarder, finished))
                readers[case] = reader, forwarder
                reader.start()
            # Each reader announces itself on the queue when done; join it
            # before dropping it from the table of live readers.
            while readers:
                done_case = finished.get()
                reader, _ = readers[done_case]
                reader.join()
                del readers[done_case]
        except BaseException:
            # Ask every reader still registered to stop, then propagate.
            for _, forwarder in readers.values():
                forwarder.stop()
            raise
        finally:
            for case, _ in workers:
                case._stream.close()

    def _run_test(self, test, process_result, queue):
        """Run ``test`` against ``process_result`` and always report completion.

        A failure of the run logic itself is recorded as a "broken-runner"
        error on ``process_result``. ``test`` is put on ``queue`` in every
        case so run() can join this thread.
        """
        try:
            try:
                test.run(process_result)
            except Exception:
                # The run logic itself failed
                broken = testtools.ErrorHolder(
                    "broken-runner",
                    error=sys.exc_info())
                broken.run(process_result)
        finally:
            queue.put(test)
133 queue.put(test)
134
def removebuilddir(d):
    """Delete build directory ``d`` via bb.utils.prunedir().

    While ``d``/bitbake.lock exists, wait in one-second steps (at most five
    of them) before deleting; the directory is removed afterwards whether
    or not the lock file has gone away.
    """
    lockfile = d + "/bitbake.lock"
    for _ in range(5):
        if not os.path.exists(lockfile):
            break
        time.sleep(1)
    bb.utils.prunedir(d)
141
def fork_for_tests(concurrency_num, suite):
    """Split ``suite`` into groups and run each group in a forked child.

    The tests are partitioned with partition_tests() and ``suite`` is
    emptied. One child process is forked per partition; it runs its tests
    and reports the outcomes as a subunit stream over a pipe. When BUILDDIR
    is set in the environment, each child first creates its own copy of the
    build directory (conf, cache and meta-selftest) and removes it again
    when done.

    Returns a tuple ``(result, totaltests)`` where ``result`` is a list of
    ``(ProtocolTestCase, numtests)`` pairs, one per child, reading from that
    child's pipe, and ``totaltests`` is the number of tests over all
    partitions.

    A child never returns from this function: it always leaves through
    os._exit(), including when reporting or cleanup fails.
    """
    result = []
    test_blocks = partition_tests(suite, concurrency_num)
    # Clear the tests from the original suite so it doesn't keep them alive
    suite._tests[:] = []
    totaltests = sum(len(x) for x in test_blocks)
    for process_tests in test_blocks:
        numtests = len(process_tests)
        process_suite = unittest.TestSuite(process_tests)
        # Also clear each split list so new suite has only reference
        process_tests[:] = []
        c2pread, c2pwrite = os.pipe()
        # Clear buffers before fork to avoid duplicate output
        sys.stdout.flush()
        sys.stderr.flush()
        pid = os.fork()
        if pid == 0:
            ourpid = os.getpid()
            # Bound before the try block so the error path below can always
            # inspect them, even if the very first statement fails.
            newbuilddir = None
            stream = None
            try:
                # Default buffering: line buffering (buffering=1) is not
                # supported for binary streams and only triggers a
                # RuntimeWarning before falling back to the default anyway.
                stream = os.fdopen(c2pwrite, 'wb')
                os.close(c2pread)

                # Create a new separate BUILDDIR for each group of tests
                if 'BUILDDIR' in os.environ:
                    builddir = os.environ['BUILDDIR']
                    newbuilddir = builddir + "-st-" + str(ourpid)
                    selftestdir = os.path.abspath(builddir + "/../meta-selftest")
                    newselftestdir = newbuilddir + "/meta-selftest"

                    bb.utils.mkdirhier(newbuilddir)
                    oe.path.copytree(builddir + "/conf", newbuilddir + "/conf")
                    oe.path.copytree(builddir + "/cache", newbuilddir + "/cache")
                    oe.path.copytree(selftestdir, newselftestdir)

                    # Point every environment variable that mentions the old
                    # build directory at the new one
                    for e in os.environ:
                        if builddir in os.environ[e]:
                            os.environ[e] = os.environ[e].replace(builddir, newbuilddir)

                    subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True)

                    # Tried to use bitbake-layers add/remove but it requires recipe parsing and hence is too slow
                    subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True)

                    os.chdir(newbuilddir)

                    # Rewrite the paths held by each test's context so they
                    # refer to the new directories
                    for t in process_suite:
                        if not hasattr(t, "tc"):
                            continue
                        cp = t.tc.config_paths
                        for p in cp:
                            if selftestdir in cp[p] and newselftestdir not in cp[p]:
                                cp[p] = cp[p].replace(selftestdir, newselftestdir)
                            if builddir in cp[p] and newbuilddir not in cp[p]:
                                cp[p] = cp[p].replace(builddir, newbuilddir)

                # Leave stderr and stdout open so we can see test noise
                # Close stdin so that the child goes away if it decides to
                # read from stdin (otherwise its a roulette to see what
                # child actually gets keystrokes for pdb etc).
                newsi = os.open(os.devnull, os.O_RDWR)
                os.dup2(newsi, sys.stdin.fileno())

                subunit_client = TestProtocolClient(stream)
                # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
                # as per default in parent code
                subunit_client.buffer = True
                subunit_result = AutoTimingTestResultDecorator(subunit_client)
                process_suite.run(subunit_result)
                # A test may itself have forked; such grandchildren must not
                # carry on with our cleanup
                if ourpid != os.getpid():
                    os._exit(0)
                if newbuilddir:
                    removebuilddir(newbuilddir)
            except BaseException:
                # Don't do anything with process children
                if ourpid != os.getpid():
                    os._exit(1)
                # Try and report traceback on stream, but exit with error
                # even if stream couldn't be created or something else
                # goes wrong. The traceback is formatted to a string and
                # written in one go to avoid interleaving lines from
                # multiple failing children.
                try:
                    report = traceback.format_exc()
                    if stream is None:
                        sys.stderr.write(report)
                        sys.stderr.flush()
                    else:
                        try:
                            stream.write(report.encode('utf-8'))
                            stream.flush()
                        except BaseException:
                            sys.stderr.write(traceback.format_exc())
                            sys.stderr.flush()
                    if newbuilddir:
                        removebuilddir(newbuilddir)
                finally:
                    # Must run whatever happened above, otherwise the child
                    # would unwind into the parent's code path.
                    os._exit(1)
            # Reached only when the suite ran and cleanup succeeded. Exit
            # even if the final flush fails (e.g. the parent went away).
            exitcode = 1
            try:
                stream.flush()
                exitcode = 0
            finally:
                os._exit(exitcode)
        else:
            os.close(c2pwrite)
            stream = os.fdopen(c2pread, 'rb')
            test = ProtocolTestCase(stream)
            result.append((test, numtests))
    return result, totaltests
241
def partition_tests(suite, count):
    """Distribute the tests of ``suite`` over at most ``count`` lists.

    Tests are grouped by "<module>.<class name>" and whole groups are dealt
    out round-robin, so tests of one class always share a partition.
    Returns a list of non-empty lists of tests. A ``count`` below 1 is
    treated as 1 so that tests are never silently discarded.
    """
    # Keep tests from the same class together but allow tests from modules
    # to go to different processes to aid parallelisation.
    modules = {}
    for test in iterate_tests(suite):
        key = test.__module__ + "." + test.__class__.__name__
        modules.setdefault(key, []).append(test)

    # Simply divide the test blocks between the available processes.
    # cycle() over an empty list yields nothing, which would make zip()
    # drop every test, hence at least one partition is always created.
    partitions = [[] for _ in range(max(count, 1))]
    for partition, tests in zip(cycle(partitions), modules.values()):
        partition.extend(tests)

    # No point in empty threads so drop them
    return [p for p in partitions if p]
259