blob: db3c4b184f5940de1ed84759b13cdec9baf2e74d [file] [log] [blame]
Patrick Williamsc0f7c042017-02-23 20:41:17 -06001#!/usr/bin/env python3
Patrick Williamsc124f4f2015-09-15 14:41:29 -05002
3import os
4import sys
5import warnings
6sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
7from bb import fetch2
8import logging
9import bb
10import select
11import errno
12import signal
Patrick Williamsc0f7c042017-02-23 20:41:17 -060013import pickle
Brad Bishop37a0e4d2017-12-04 01:01:44 -050014import traceback
15import queue
Patrick Williamsf1e5d692016-03-30 15:21:19 -050016from multiprocessing import Lock
Brad Bishop37a0e4d2017-12-04 01:01:44 -050017from threading import Thread
Patrick Williamsc124f4f2015-09-15 14:41:29 -050018
Patrick Williamsc0f7c042017-02-23 20:41:17 -060019if sys.getfilesystemencoding() != "utf-8":
20 sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.")
21
Patrick Williamsc124f4f2015-09-15 14:41:29 -050022# Users shouldn't be running this code directly
23if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
24 print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
25 sys.exit(1)
26
27profiling = False
Patrick Williamsd8c66bc2016-06-20 12:57:21 -050028if sys.argv[1].startswith("decafbadbad"):
Patrick Williamsc124f4f2015-09-15 14:41:29 -050029 profiling = True
30 try:
31 import cProfile as profile
32 except:
33 import profile
34
35# Unbuffer stdout to avoid log truncation in the event
36# of an unorderly exit as well as to provide timely
37# updates to log files for use with tail
38try:
39 if sys.stdout.name == '<stdout>':
Patrick Williamsc0f7c042017-02-23 20:41:17 -060040 import fcntl
41 fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
42 fl |= os.O_SYNC
43 fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
44 #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
Patrick Williamsc124f4f2015-09-15 14:41:29 -050045except:
46 pass
47
48logger = logging.getLogger("BitBake")
49
Patrick Williamsc124f4f2015-09-15 14:41:29 -050050worker_pipe = sys.stdout.fileno()
51bb.utils.nonblockingfd(worker_pipe)
Patrick Williamsf1e5d692016-03-30 15:21:19 -050052# Need to guard against multiprocessing being used in child processes
53# and multiple processes trying to write to the parent at the same time
54worker_pipe_lock = None
Patrick Williamsc124f4f2015-09-15 14:41:29 -050055
56handler = bb.event.LogHandler()
57logger.addHandler(handler)
58
59if 0:
60 # Code to write out a log file of all events passing through the worker
61 logfilename = "/tmp/workerlogfile"
62 format_str = "%(levelname)s: %(message)s"
63 conlogformat = bb.msg.BBLogFormatter(format_str)
64 consolelog = logging.FileHandler(logfilename)
65 bb.msg.addDefaultlogFilter(consolelog)
66 consolelog.setFormatter(conlogformat)
67 logger.addHandler(consolelog)
68
Brad Bishop37a0e4d2017-12-04 01:01:44 -050069worker_queue = queue.Queue()
Patrick Williamsc124f4f2015-09-15 14:41:29 -050070
71def worker_fire(event, d):
Patrick Williamsc0f7c042017-02-23 20:41:17 -060072 data = b"<event>" + pickle.dumps(event) + b"</event>"
Patrick Williamsc124f4f2015-09-15 14:41:29 -050073 worker_fire_prepickled(data)
74
75def worker_fire_prepickled(event):
76 global worker_queue
77
Brad Bishop37a0e4d2017-12-04 01:01:44 -050078 worker_queue.put(event)
Patrick Williamsc124f4f2015-09-15 14:41:29 -050079
Brad Bishop37a0e4d2017-12-04 01:01:44 -050080#
81# We can end up with write contention with the cooker, it can be trying to send commands
82# and we can be trying to send event data back. Therefore use a separate thread for writing
83# back data to cooker.
84#
85worker_thread_exit = False
Patrick Williamsc124f4f2015-09-15 14:41:29 -050086
Brad Bishop37a0e4d2017-12-04 01:01:44 -050087def worker_flush(worker_queue):
88 worker_queue_int = b""
89 global worker_pipe, worker_thread_exit
Patrick Williamsc124f4f2015-09-15 14:41:29 -050090
Brad Bishop37a0e4d2017-12-04 01:01:44 -050091 while True:
92 try:
93 worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
94 except queue.Empty:
95 pass
96 while (worker_queue_int or not worker_queue.empty()):
97 try:
98 if not worker_queue.empty():
99 worker_queue_int = worker_queue_int + worker_queue.get()
100 written = os.write(worker_pipe, worker_queue_int)
101 worker_queue_int = worker_queue_int[written:]
102 except (IOError, OSError) as e:
103 if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
104 raise
105 if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
106 return
107
108worker_thread = Thread(target=worker_flush, args=(worker_queue,))
109worker_thread.start()
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500110
111def worker_child_fire(event, d):
112 global worker_pipe
Patrick Williamsf1e5d692016-03-30 15:21:19 -0500113 global worker_pipe_lock
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500114
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600115 data = b"<event>" + pickle.dumps(event) + b"</event>"
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500116 try:
Patrick Williamsf1e5d692016-03-30 15:21:19 -0500117 worker_pipe_lock.acquire()
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500118 worker_pipe.write(data)
Patrick Williamsf1e5d692016-03-30 15:21:19 -0500119 worker_pipe_lock.release()
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500120 except IOError:
121 sigterm_handler(None, None)
122 raise
123
124bb.event.worker_fire = worker_fire
125
126lf = None
127#lf = open("/tmp/workercommandlog", "w+")
128def workerlog_write(msg):
129 if lf:
130 lf.write(msg)
131 lf.flush()
132
133def sigterm_handler(signum, frame):
134 signal.signal(signal.SIGTERM, signal.SIG_DFL)
135 os.killpg(0, signal.SIGTERM)
136 sys.exit()
137
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600138def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500139 # We need to setup the environment BEFORE the fork, since
140 # a fork() or exec*() activates PSEUDO...
141
142 envbackup = {}
143 fakeenv = {}
144 umask = None
145
146 taskdep = workerdata["taskdeps"][fn]
147 if 'umask' in taskdep and taskname in taskdep['umask']:
148 # umask might come in as a number or text string..
149 try:
150 umask = int(taskdep['umask'][taskname],8)
151 except TypeError:
152 umask = taskdep['umask'][taskname]
153
154 # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
155 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run:
156 envvars = (workerdata["fakerootenv"][fn] or "").split()
157 for key, value in (var.split('=') for var in envvars):
158 envbackup[key] = os.environ.get(key)
159 os.environ[key] = value
160 fakeenv[key] = value
161
162 fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
163 for p in fakedirs:
164 bb.utils.mkdirhier(p)
165 logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
166 (fn, taskname, ', '.join(fakedirs)))
167 else:
168 envvars = (workerdata["fakerootnoenv"][fn] or "").split()
169 for key, value in (var.split('=') for var in envvars):
170 envbackup[key] = os.environ.get(key)
171 os.environ[key] = value
172 fakeenv[key] = value
173
174 sys.stdout.flush()
175 sys.stderr.flush()
176
177 try:
178 pipein, pipeout = os.pipe()
179 pipein = os.fdopen(pipein, 'rb', 4096)
180 pipeout = os.fdopen(pipeout, 'wb', 0)
181 pid = os.fork()
182 except OSError as e:
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600183 logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
184 sys.exit(1)
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500185
186 if pid == 0:
187 def child():
188 global worker_pipe
Patrick Williamsf1e5d692016-03-30 15:21:19 -0500189 global worker_pipe_lock
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500190 pipein.close()
191
192 signal.signal(signal.SIGTERM, sigterm_handler)
193 # Let SIGHUP exit as SIGTERM
194 signal.signal(signal.SIGHUP, sigterm_handler)
195 bb.utils.signal_on_parent_exit("SIGTERM")
196
197 # Save out the PID so that the event can include it the
198 # events
199 bb.event.worker_pid = os.getpid()
200 bb.event.worker_fire = worker_child_fire
201 worker_pipe = pipeout
Patrick Williamsf1e5d692016-03-30 15:21:19 -0500202 worker_pipe_lock = Lock()
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500203
204 # Make the child the process group leader and ensure no
205 # child process will be controlled by the current terminal
206 # This ensures signals sent to the controlling terminal like Ctrl+C
207 # don't stop the child processes.
208 os.setsid()
209 # No stdin
210 newsi = os.open(os.devnull, os.O_RDWR)
211 os.dup2(newsi, sys.stdin.fileno())
212
213 if umask:
214 os.umask(umask)
215
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500216 try:
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600217 bb_cache = bb.cache.NoCache(databuilder)
218 (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
219 the_data = databuilder.mcdata[mc]
220 the_data.setVar("BB_WORKERCONTEXT", "1")
221 the_data.setVar("BB_TASKDEPDATA", taskdepdata)
222 the_data.setVar("BUILDNAME", workerdata["buildname"])
223 the_data.setVar("DATE", workerdata["date"])
224 the_data.setVar("TIME", workerdata["time"])
225 bb.parse.siggen.set_taskdata(workerdata["sigdata"])
226 ret = 0
227
228 the_data = bb_cache.loadDataFull(fn, appends)
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500229 the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
230
Patrick Williamsd8c66bc2016-06-20 12:57:21 -0500231 bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN", True), taskname.replace("do_", "")))
232
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500233 # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
234 # successfully. We also need to unset anything from the environment which shouldn't be there
235 exports = bb.data.exported_vars(the_data)
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600236
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500237 bb.utils.empty_environment()
238 for e, v in exports:
239 os.environ[e] = v
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600240
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500241 for e in fakeenv:
242 os.environ[e] = fakeenv[e]
243 the_data.setVar(e, fakeenv[e])
244 the_data.setVarFlag(e, 'export', "1")
245
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600246 task_exports = the_data.getVarFlag(taskname, 'exports', True)
247 if task_exports:
248 for e in task_exports.split():
249 the_data.setVarFlag(e, 'export', '1')
250 v = the_data.getVar(e, True)
251 if v is not None:
252 os.environ[e] = v
253
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500254 if quieterrors:
255 the_data.setVarFlag(taskname, "quieterrors", "1")
256
Brad Bishop37a0e4d2017-12-04 01:01:44 -0500257 except Exception:
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500258 if not quieterrors:
Brad Bishop37a0e4d2017-12-04 01:01:44 -0500259 logger.critical(traceback.format_exc())
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500260 os._exit(1)
261 try:
262 if cfg.dry_run:
263 return 0
264 return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
265 except:
266 os._exit(1)
267 if not profiling:
268 os._exit(child())
269 else:
270 profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
271 prof = profile.Profile()
272 try:
273 ret = profile.Profile.runcall(prof, child)
274 finally:
275 prof.dump_stats(profname)
276 bb.utils.process_profilelog(profname)
277 os._exit(ret)
278 else:
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600279 for key, value in iter(envbackup.items()):
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500280 if value is None:
281 del os.environ[key]
282 else:
283 os.environ[key] = value
284
285 return pid, pipein, pipeout
286
287class runQueueWorkerPipe():
288 """
289 Abstraction for a pipe between a worker thread and the worker server
290 """
291 def __init__(self, pipein, pipeout):
292 self.input = pipein
293 if pipeout:
294 pipeout.close()
295 bb.utils.nonblockingfd(self.input)
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600296 self.queue = b""
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500297
298 def read(self):
299 start = len(self.queue)
300 try:
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600301 self.queue = self.queue + (self.input.read(102400) or b"")
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500302 except (OSError, IOError) as e:
303 if e.errno != errno.EAGAIN:
304 raise
305
306 end = len(self.queue)
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600307 index = self.queue.find(b"</event>")
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500308 while index != -1:
309 worker_fire_prepickled(self.queue[:index+8])
310 self.queue = self.queue[index+8:]
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600311 index = self.queue.find(b"</event>")
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500312 return (end > start)
313
314 def close(self):
315 while self.read():
316 continue
317 if len(self.queue) > 0:
318 print("Warning, worker child left partial message: %s" % self.queue)
319 self.input.close()
320
321normalexit = False
322
323class BitbakeWorker(object):
324 def __init__(self, din):
325 self.input = din
326 bb.utils.nonblockingfd(self.input)
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600327 self.queue = b""
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500328 self.cookercfg = None
329 self.databuilder = None
330 self.data = None
331 self.build_pids = {}
332 self.build_pipes = {}
333
334 signal.signal(signal.SIGTERM, self.sigterm_exception)
335 # Let SIGHUP exit as SIGTERM
336 signal.signal(signal.SIGHUP, self.sigterm_exception)
Patrick Williamsd8c66bc2016-06-20 12:57:21 -0500337 if "beef" in sys.argv[1]:
338 bb.utils.set_process_name("Worker (Fakeroot)")
339 else:
340 bb.utils.set_process_name("Worker")
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500341
342 def sigterm_exception(self, signum, stackframe):
343 if signum == signal.SIGTERM:
Patrick Williamsd8c66bc2016-06-20 12:57:21 -0500344 bb.warn("Worker received SIGTERM, shutting down...")
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500345 elif signum == signal.SIGHUP:
Patrick Williamsd8c66bc2016-06-20 12:57:21 -0500346 bb.warn("Worker received SIGHUP, shutting down...")
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500347 self.handle_finishnow(None)
348 signal.signal(signal.SIGTERM, signal.SIG_DFL)
349 os.kill(os.getpid(), signal.SIGTERM)
350
351 def serve(self):
352 while True:
353 (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
354 if self.input in ready:
355 try:
356 r = self.input.read()
357 if len(r) == 0:
358 # EOF on pipe, server must have terminated
359 self.sigterm_exception(signal.SIGTERM, None)
360 self.queue = self.queue + r
361 except (OSError, IOError):
362 pass
363 if len(self.queue):
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600364 self.handle_item(b"cookerconfig", self.handle_cookercfg)
365 self.handle_item(b"workerdata", self.handle_workerdata)
366 self.handle_item(b"runtask", self.handle_runtask)
367 self.handle_item(b"finishnow", self.handle_finishnow)
368 self.handle_item(b"ping", self.handle_ping)
369 self.handle_item(b"quit", self.handle_quit)
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500370
371 for pipe in self.build_pipes:
372 self.build_pipes[pipe].read()
373 if len(self.build_pids):
374 self.process_waitpid()
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500375
376
377 def handle_item(self, item, func):
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600378 if self.queue.startswith(b"<" + item + b">"):
379 index = self.queue.find(b"</" + item + b">")
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500380 while index != -1:
381 func(self.queue[(len(item) + 2):index])
382 self.queue = self.queue[(index + len(item) + 3):]
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600383 index = self.queue.find(b"</" + item + b">")
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500384
385 def handle_cookercfg(self, data):
386 self.cookercfg = pickle.loads(data)
387 self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
388 self.databuilder.parseBaseConfiguration()
389 self.data = self.databuilder.data
390
391 def handle_workerdata(self, data):
392 self.workerdata = pickle.loads(data)
393 bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
394 bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
395 bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
396 bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600397 for mc in self.databuilder.mcdata:
398 self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500399
400 def handle_ping(self, _):
401 workerlog_write("Handling ping\n")
402
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600403 logger.warning("Pong from bitbake-worker!")
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500404
405 def handle_quit(self, data):
406 workerlog_write("Handling quit\n")
407
408 global normalexit
409 normalexit = True
410 sys.exit(0)
411
412 def handle_runtask(self, data):
413 fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
414 workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
415
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600416 pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500417
418 self.build_pids[pid] = task
419 self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
420
421 def process_waitpid(self):
422 """
423 Return none is there are no processes awaiting result collection, otherwise
424 collect the process exit codes and close the information pipe.
425 """
426 try:
427 pid, status = os.waitpid(-1, os.WNOHANG)
428 if pid == 0 or os.WIFSTOPPED(status):
429 return None
430 except OSError:
431 return None
432
433 workerlog_write("Exit code of %s for pid %s\n" % (status, pid))
434
435 if os.WIFEXITED(status):
436 status = os.WEXITSTATUS(status)
437 elif os.WIFSIGNALED(status):
438 # Per shell conventions for $?, when a process exits due to
439 # a signal, we return an exit code of 128 + SIGNUM
440 status = 128 + os.WTERMSIG(status)
441
442 task = self.build_pids[pid]
443 del self.build_pids[pid]
444
445 self.build_pipes[pid].close()
446 del self.build_pipes[pid]
447
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600448 worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500449
450 def handle_finishnow(self, _):
451 if self.build_pids:
452 logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600453 for k, v in iter(self.build_pids.items()):
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500454 try:
455 os.kill(-k, signal.SIGTERM)
456 os.waitpid(-1, 0)
457 except:
458 pass
459 for pipe in self.build_pipes:
460 self.build_pipes[pipe].read()
461
462try:
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600463 worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500464 if not profiling:
465 worker.serve()
466 else:
467 profname = "profile-worker.log"
468 prof = profile.Profile()
469 try:
470 profile.Profile.runcall(prof, worker.serve)
471 finally:
472 prof.dump_stats(profname)
473 bb.utils.process_profilelog(profname)
474except BaseException as e:
475 if not normalexit:
476 import traceback
477 sys.stderr.write(traceback.format_exc())
478 sys.stderr.write(str(e))
Brad Bishop37a0e4d2017-12-04 01:01:44 -0500479
480worker_thread_exit = True
481worker_thread.join()
482
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500483workerlog_write("exitting")
484sys.exit(0)
485