blob: e050818f0f4f32c2d233ddbe010d97b42e476caa [file] [log] [blame]
Brad Bishop1a4b7ee2018-12-16 17:11:34 -08001#!/usr/bin/env python3
2#
3# Modified for use in OE by Richard Purdie, 2018
4#
5# Modified by: Corey Goldberg, 2013
6# License: GPLv2+
7#
8# Original code from:
9# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
10# Copyright (C) 2005-2011 Canonical Ltd
11# License: GPLv2+
12
13import os
14import sys
15import traceback
16import unittest
17import subprocess
18import testtools
19import threading
20import time
21import io
22
23from queue import Queue
24from itertools import cycle
25from subunit import ProtocolTestCase, TestProtocolClient
26from subunit.test_results import AutoTimingTestResultDecorator
27from testtools import ThreadsafeForwardingResult, iterate_tests
Brad Bishop19323692019-04-05 15:28:33 -040028from oeqa.utils.commands import get_test_layer
Brad Bishop1a4b7ee2018-12-16 17:11:34 -080029
30import bb.utils
31import oe.path
32
33_all__ = [
34 'ConcurrentTestSuite',
35 'fork_for_tests',
36 'partition_tests',
37]
38
39#
40# Patch the version from testtools to allow access to _test_start and allow
41# computation of timing information and threading progress
42#
43class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
44
45 def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests):
46 super(BBThreadsafeForwardingResult, self).__init__(target, semaphore)
47 self.threadnum = threadnum
48 self.totalinprocess = totalinprocess
49 self.totaltests = totaltests
50
51 def _add_result_with_semaphore(self, method, test, *args, **kwargs):
52 self.semaphore.acquire()
53 try:
54 self.result.starttime[test.id()] = self._test_start.timestamp()
55 self.result.threadprogress[self.threadnum].append(test.id())
56 totalprogress = sum(len(x) for x in self.result.threadprogress.values())
57 self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s)" % (
58 self.threadnum,
59 len(self.result.threadprogress[self.threadnum]),
60 self.totalinprocess,
61 totalprogress,
62 self.totaltests,
63 "{0:.2f}".format(time.time()-self._test_start.timestamp()),
64 test.id())
65 finally:
66 self.semaphore.release()
67 super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs)
68
69#
70# A dummy structure to add to io.StringIO so that the .buffer object
71# is available and accepts writes. This allows unittest with buffer=True
72# to interact ok with subunit which wants to access sys.stdout.buffer.
73#
74class dummybuf(object):
75 def __init__(self, parent):
76 self.p = parent
77 def write(self, data):
78 self.p.write(data.decode("utf-8"))
79
80#
81# Taken from testtools.ConncurrencyTestSuite but modified for OE use
82#
83class ConcurrentTestSuite(unittest.TestSuite):
84
85 def __init__(self, suite, processes):
86 super(ConcurrentTestSuite, self).__init__([suite])
87 self.processes = processes
88
89 def run(self, result):
90 tests, totaltests = fork_for_tests(self.processes, self)
91 try:
92 threads = {}
93 queue = Queue()
94 semaphore = threading.Semaphore(1)
95 result.threadprogress = {}
96 for i, (test, testnum) in enumerate(tests):
97 result.threadprogress[i] = []
98 process_result = BBThreadsafeForwardingResult(result, semaphore, i, testnum, totaltests)
99 # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
100 # as per default in parent code
101 process_result.buffer = True
102 # We have to add a buffer object to stdout to keep subunit happy
103 process_result._stderr_buffer = io.StringIO()
104 process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer)
105 process_result._stdout_buffer = io.StringIO()
106 process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer)
107 reader_thread = threading.Thread(
108 target=self._run_test, args=(test, process_result, queue))
109 threads[test] = reader_thread, process_result
110 reader_thread.start()
111 while threads:
112 finished_test = queue.get()
113 threads[finished_test][0].join()
114 del threads[finished_test]
115 except:
116 for thread, process_result in threads.values():
117 process_result.stop()
118 raise
119 finally:
120 for test in tests:
121 test[0]._stream.close()
122
123 def _run_test(self, test, process_result, queue):
124 try:
125 try:
126 test.run(process_result)
127 except Exception:
128 # The run logic itself failed
129 case = testtools.ErrorHolder(
130 "broken-runner",
131 error=sys.exc_info())
132 case.run(process_result)
133 finally:
134 queue.put(test)
135
136def removebuilddir(d):
137 delay = 5
138 while delay and os.path.exists(d + "/bitbake.lock"):
139 time.sleep(1)
140 delay = delay - 1
141 bb.utils.prunedir(d)
142
143def fork_for_tests(concurrency_num, suite):
144 result = []
Brad Bishop19323692019-04-05 15:28:33 -0400145 if 'BUILDDIR' in os.environ:
146 selftestdir = get_test_layer()
147
Brad Bishop1a4b7ee2018-12-16 17:11:34 -0800148 test_blocks = partition_tests(suite, concurrency_num)
149 # Clear the tests from the original suite so it doesn't keep them alive
150 suite._tests[:] = []
151 totaltests = sum(len(x) for x in test_blocks)
152 for process_tests in test_blocks:
153 numtests = len(process_tests)
154 process_suite = unittest.TestSuite(process_tests)
155 # Also clear each split list so new suite has only reference
156 process_tests[:] = []
157 c2pread, c2pwrite = os.pipe()
158 # Clear buffers before fork to avoid duplicate output
159 sys.stdout.flush()
160 sys.stderr.flush()
161 pid = os.fork()
162 if pid == 0:
163 ourpid = os.getpid()
164 try:
165 newbuilddir = None
166 stream = os.fdopen(c2pwrite, 'wb', 1)
167 os.close(c2pread)
168
169 # Create a new separate BUILDDIR for each group of tests
170 if 'BUILDDIR' in os.environ:
171 builddir = os.environ['BUILDDIR']
172 newbuilddir = builddir + "-st-" + str(ourpid)
Brad Bishop1a4b7ee2018-12-16 17:11:34 -0800173 newselftestdir = newbuilddir + "/meta-selftest"
174
175 bb.utils.mkdirhier(newbuilddir)
176 oe.path.copytree(builddir + "/conf", newbuilddir + "/conf")
177 oe.path.copytree(builddir + "/cache", newbuilddir + "/cache")
178 oe.path.copytree(selftestdir, newselftestdir)
179
180 for e in os.environ:
181 if builddir in os.environ[e]:
182 os.environ[e] = os.environ[e].replace(builddir, newbuilddir)
183
184 subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True)
185
186 # Tried to used bitbake-layers add/remove but it requires recipe parsing and hence is too slow
187 subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True)
188
189 os.chdir(newbuilddir)
190
191 for t in process_suite:
192 if not hasattr(t, "tc"):
193 continue
194 cp = t.tc.config_paths
195 for p in cp:
196 if selftestdir in cp[p] and newselftestdir not in cp[p]:
197 cp[p] = cp[p].replace(selftestdir, newselftestdir)
198 if builddir in cp[p] and newbuilddir not in cp[p]:
199 cp[p] = cp[p].replace(builddir, newbuilddir)
200
201 # Leave stderr and stdout open so we can see test noise
202 # Close stdin so that the child goes away if it decides to
203 # read from stdin (otherwise its a roulette to see what
204 # child actually gets keystrokes for pdb etc).
205 newsi = os.open(os.devnull, os.O_RDWR)
206 os.dup2(newsi, sys.stdin.fileno())
207
208 subunit_client = TestProtocolClient(stream)
209 # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
210 # as per default in parent code
211 subunit_client.buffer = True
212 subunit_result = AutoTimingTestResultDecorator(subunit_client)
213 process_suite.run(subunit_result)
214 if ourpid != os.getpid():
215 os._exit(0)
216 if newbuilddir:
217 removebuilddir(newbuilddir)
218 except:
219 # Don't do anything with process children
220 if ourpid != os.getpid():
221 os._exit(1)
222 # Try and report traceback on stream, but exit with error
223 # even if stream couldn't be created or something else
224 # goes wrong. The traceback is formatted to a string and
225 # written in one go to avoid interleaving lines from
226 # multiple failing children.
227 try:
228 stream.write(traceback.format_exc().encode('utf-8'))
229 except:
230 sys.stderr.write(traceback.format_exc())
231 finally:
232 if newbuilddir:
233 removebuilddir(newbuilddir)
234 stream.flush()
235 os._exit(1)
236 stream.flush()
237 os._exit(0)
238 else:
239 os.close(c2pwrite)
240 stream = os.fdopen(c2pread, 'rb', 1)
241 test = ProtocolTestCase(stream)
242 result.append((test, numtests))
243 return result, totaltests
244
245def partition_tests(suite, count):
246 # Keep tests from the same class together but allow tests from modules
247 # to go to different processes to aid parallelisation.
248 modules = {}
249 for test in iterate_tests(suite):
250 m = test.__module__ + "." + test.__class__.__name__
251 if m not in modules:
252 modules[m] = []
253 modules[m].append(test)
254
255 # Simply divide the test blocks between the available processes
256 partitions = [list() for _ in range(count)]
257 for partition, m in zip(cycle(partitions), modules):
258 partition.extend(modules[m])
259
260 # No point in empty threads so drop them
261 return [p for p in partitions if p]
262