blob: 500f2ad1611505800dcf09e01ec34195a75b8a70 [file] [log] [blame]
#!/usr/bin/env python3

import errno
import logging
import os
import pickle
import select
import signal
import sys
import warnings
from multiprocessing import Lock

# The bitbake library lives in ../lib relative to this script; it must be on
# sys.path before any "bb" import below.
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import bb

# Refuse to start unless the filesystem encoding is UTF-8: it cannot be
# changed once the interpreter is running.
if sys.getfilesystemencoding() != "utf-8":
    sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.")

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# A "decafbadbad" prefix on the magic argument switches profiling on.
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except ImportError:
        # Fall back to the pure-Python profiler when cProfile is unavailable.
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        import fcntl
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except Exception:
    # Best effort only: a stdout that cannot be switched to O_SYNC must not
    # prevent the worker from starting.
    pass

logger = logging.getLogger("BitBake")

# In the worker itself, events are written to the raw stdout descriptor.
# Forked task children rebind worker_pipe to their own pipe object.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Bytes waiting to be written to worker_pipe; drained by worker_flush().
worker_queue = b""

def worker_fire(event, d):
    """
    Pickle an event, wrap it in <event>...</event> markers and queue it
    for sending through worker_fire_prepickled().

    The second argument is accepted for interface compatibility and is
    not used here.
    """
    data = b"<event>" + pickle.dumps(event) + b"</event>"
    worker_fire_prepickled(data)

def worker_fire_prepickled(event):
    """
    Append an already serialised message to the outgoing queue and try to
    flush the queue straight away.
    """
    global worker_queue

    worker_queue = worker_queue + event
    worker_flush()

def worker_flush():
    """
    Write as much of the outgoing queue as the pipe accepts right now.

    Whatever could not be written stays queued for the next call. EAGAIN
    (pipe full) leaves the queue untouched. EPIPE means the reading side
    has gone away, so the queued data can never be delivered and is
    dropped; otherwise callers that loop until the queue is empty would
    spin forever. Any other OS error is re-raised.
    """
    global worker_queue, worker_pipe

    if not worker_queue:
        return

    try:
        written = os.write(worker_pipe, worker_queue)
    except OSError as e:
        if e.errno == errno.EPIPE:
            worker_queue = b""
        elif e.errno != errno.EAGAIN:
            raise
    else:
        worker_queue = worker_queue[written:]

def worker_child_fire(event, d):
    """
    Send one event from a forked task process to the worker.

    The event is pickled, wrapped in <event>...</event> markers and written
    to worker_pipe while holding worker_pipe_lock. The lock is released
    even when the write fails, and short writes are retried until the
    whole message is out. On IOError the process group is terminated via
    sigterm_handler() and the error is re-raised.

    The second argument is accepted for interface compatibility and is
    not used here.
    """
    global worker_pipe
    global worker_pipe_lock

    data = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        with worker_pipe_lock:
            while data:
                # An unbuffered pipe object may accept only part of the
                # buffer; write() reports how many bytes were taken.
                written = worker_pipe.write(data) or 0
                data = data[written:]
    except IOError:
        sigterm_handler(None, None)
        raise

# In the worker process itself, events go out through worker_fire();
# forked task children replace this hook with worker_child_fire.
bb.event.worker_fire = worker_fire

# Optional debug log of the commands handled by the worker; disabled unless
# the open() line below is enabled.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Write msg to the debug log and flush it; no-op when the log is disabled."""
    if lf:
        lf.write(msg)
        lf.flush()

def sigterm_handler(signum, frame):
    """
    Signal handler for task children: terminate the whole process group.

    The default SIGTERM disposition is restored first so that the SIGTERM
    sent to our own group is not handled by this function again, then the
    process exits.
    """
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()

def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
    """
    Fork a child process which executes a single task.

    cfg          -- cooker configuration; dry_run and profile are read
    data         -- not used by this function
    databuilder  -- provides mcdata and is handed to bb.cache.NoCache
    workerdata   -- dict sent by the server (taskdeps, fakeroot settings,
                    buildname/date/time, sigdata, runq_hash, ...)
    fn           -- recipe file name, possibly a virtual name
    task         -- task id, used to look up the task hash
    taskname     -- name of the task to run
    appends      -- passed to loadDataFull() with fn
    taskdepdata  -- stored in BB_TASKDEPDATA
    quieterrors  -- suppress the critical log on setup failure and set the
                    task's "quieterrors" flag

    Returns (pid, pipein, pipeout) in the parent. The child never returns
    from this function; it leaves through os._exit().
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run:
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        # Split on the first '=' only so values may themselves contain '='.
        for key, value in (var.split('=', 1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        for key, value in (var.split('=', 1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)
            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            # Compare against None: a umask of 0 is a valid explicit setting.
            if umask is not None:
                os.umask(umask)

            try:
                bb_cache = bb.cache.NoCache(databuilder)
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                bb.parse.siggen.set_taskdata(workerdata["sigdata"])

                the_data = bb_cache.loadDataFull(fn, appends)
                the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN", True), taskname.replace("do_", "")))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports', True)
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e, True)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception as exc:
                if not quieterrors:
                    logger.critical(str(exc))
                os._exit(1)
            try:
                if cfg.dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except BaseException:
                # Deliberately catches everything, including the SystemExit
                # raised by sigterm_handler: the child must leave through
                # os._exit() and never unwind into the worker's own code.
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: undo the environment changes made for the child above.
        for key, value in envbackup.items():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout

class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        """
        Keep the read end, close the write end (if one is given) and make
        the read end non-blocking.
        """
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        # Bytes received so far that do not yet form a complete event.
        self.queue = b""

    def read(self):
        """
        Pull whatever is available from the pipe and forward every complete
        <event>...</event> message through worker_fire_prepickled().

        Returns True when this call received new data, False otherwise.
        """
        start = len(self.queue)
        try:
            self.queue = self.queue + (self.input.read(102400) or b"")
        except (OSError, IOError) as e:
            # EAGAIN just means there is nothing to read right now.
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        index = self.queue.find(b"</event>")
        while index != -1:
            # 8 == len(b"</event>"): forward up to and including the marker.
            worker_fire_prepickled(self.queue[:index+8])
            self.queue = self.queue[index+8:]
            index = self.queue.find(b"</event>")
        return (end > start)

    def close(self):
        """
        Drain the pipe until no more data arrives, warn about any trailing
        partial message and close the read end.
        """
        while self.read():
            continue
        if len(self.queue) > 0:
            # NOTE(review): print() targets sys.stdout, which looks like the
            # same descriptor used as worker_pipe - confirm this is intended.
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()

# Set by handle_quit() so the top-level exception handler can tell a
# requested shutdown from a failure.
normalexit = False

class BitbakeWorker(object):
    """
    Command loop of the worker process.

    Reads tagged messages (<name>payload</name>) from the input stream,
    dispatches them to the handle_* methods, forks tasks on request and
    reports their events and exit codes through the outgoing queue.
    """
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        # Unparsed bytes received from the server.
        self.queue = b""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        # Filled in by handle_workerdata().
        self.workerdata = None
        # pid -> task id, and pid -> runQueueWorkerPipe, for running tasks.
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """
        Signal handler: terminate running tasks, then re-deliver SIGTERM to
        ourselves with the default disposition restored.
        """
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """
        Main loop: wait (up to one second) for input from the server or from
        task pipes, handle complete commands, forward task events, reap
        finished tasks and flush the outgoing queue.
        """
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [], [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    # A non-blocking read may yield None when nothing is
                    # available after all; that is not EOF.
                    if r is not None:
                        if len(r) == 0:
                            # EOF on pipe, server must have terminated
                            self.sigterm_exception(signal.SIGTERM, None)
                        self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            if len(self.queue):
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            for pipe in self.build_pipes:
                self.build_pipes[pipe].read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()

    def handle_item(self, item, func):
        """
        Consume consecutive complete <item>...</item> messages from the front
        of the queue, calling func(payload) for each.

        Stops as soon as the queue starts with a different tag or the
        closing tag has not arrived yet, so a message of another type is
        never handed to func.
        """
        opening = b"<" + item + b">"
        closing = b"</" + item + b">"
        while self.queue.startswith(opening):
            index = self.queue.find(closing, len(opening))
            if index == -1:
                # Message incomplete; wait for more data.
                return
            func(self.queue[len(opening):index])
            self.queue = self.queue[index + len(closing):]

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and parse the base configuration."""
        # NOTE: pickle.loads() executes code embedded in its input; this is
        # only acceptable because the stream comes from the bitbake server.
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        """Unpickle the worker data and apply its logging and PR server settings."""
        # NOTE: see handle_cookercfg() regarding pickle and input trust.
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        """Answer a ping by emitting a log message."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Mark the shutdown as requested and exit."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Unpickle a task request, fork the task and track its pid and pipe."""
        # NOTE: see handle_cookercfg() regarding pickle and input trust.
        fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return None if there are no processes awaiting result collection, otherwise
        collect the process exit codes and close the information pipe.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

    def handle_finishnow(self, _):
        """
        Send SIGTERM to the process group of every running task, wait for a
        child to exit after each signal, then read what is left on the
        task pipes.
        """
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for pid in self.build_pids:
                try:
                    # Negative pid: signal the task's whole process group.
                    os.kill(-pid, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except OSError:
                    # Best effort: the task or its group may already be gone.
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()

# Run the command loop (optionally under the profiler). Any exception other
# than a requested quit is reported on stderr; the exit path below runs in
# either case.
try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))

# Push out whatever is still queued for the server. Wait for the pipe to
# become writable instead of spinning, and give up once the pipe reports an
# error or hang-up, since nothing more can be delivered then.
flush_poller = select.poll()
flush_poller.register(worker_pipe, select.POLLOUT)
while len(worker_queue):
    events = flush_poller.poll(1000)
    if any(ev & (select.POLLERR | select.POLLHUP | select.POLLNVAL) for _, ev in events):
        break
    worker_flush()
workerlog_write("exitting")
sys.exit(0)
