| #!/usr/bin/env python3 |
| # |
| # Copyright BitBake Contributors |
| # |
| # SPDX-License-Identifier: GPL-2.0-only |
| # |
| |
| import os |
| import sys |
| import warnings |
| warnings.simplefilter("default") |
| sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) |
| from bb import fetch2 |
| import logging |
| import bb |
| import select |
| import errno |
| import signal |
| import pickle |
| import traceback |
| import queue |
| import shlex |
| import subprocess |
| from multiprocessing import Lock |
| from threading import Thread |
| |
bb.utils.check_system_locale()

# Users shouldn't be running this code directly: the single argument must
# carry the "decafbad" magic prefix.
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# The longer "decafbadbad" magic prefix additionally switches profiling on.
profiling = sys.argv[1].startswith("decafbadbad")
if profiling:
    try:
        import cProfile as profile
    except ImportError:
        # Only a missing C profiler should trigger the pure-python fallback;
        # anything else is a real error and must not be hidden.
        import profile
| |
# Put stdout into synchronous mode (O_SYNC) so that log output is not
# truncated by a disorderly exit and so that log files being followed with
# tail are updated promptly. Failure here is deliberately non-fatal.
try:
    if sys.stdout.name == '<stdout>':
        import fcntl
        stdout_fd = sys.stdout.fileno()
        fl = fcntl.fcntl(stdout_fd, fcntl.F_GETFL) | os.O_SYNC
        fcntl.fcntl(stdout_fd, fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
    pass
| |
logger = logging.getLogger("BitBake")

# Data for the cooker is written to the stdout descriptor, which is switched
# to non-blocking mode.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Child processes may themselves use multiprocessing, so several processes
# could try to write to the parent at once; forked children replace this
# with a real lock.
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Debug aid (disabled): log every event passing through the worker to a file
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Byte strings queued here are written to worker_pipe by the flush thread
worker_queue = queue.Queue()
| |
def worker_fire(event, d):
    """
    Pickle event, wrap it in <event>...</event> tags and queue it for
    sending. The d argument is accepted but not used.
    """
    framed = b"".join((b"<event>", pickle.dumps(event), b"</event>"))
    worker_fire_prepickled(framed)
| |
def worker_fire_prepickled(event):
    """
    Queue an already framed and pickled message (bytes) on worker_queue.
    """
    # worker_queue is only read here, so no 'global' declaration is needed
    worker_queue.put(event)
| |
#
# The cooker may be sending us commands at the same time as we are sending
# event data back to it, which can lead to write contention. All writes back
# to the cooker are therefore done from a separate thread.
#
# Set to True to ask that thread to return once everything queued is written.
worker_thread_exit = False
| |
def worker_flush(worker_queue):
    """
    Writer thread body: drain worker_queue into worker_pipe.

    Byte strings taken from worker_queue are accumulated in a local buffer
    and written to the worker_pipe file descriptor. EAGAIN and EPIPE from
    the write are ignored (the write is simply retried); any other OS error
    is re-raised. The function returns once worker_thread_exit is set and
    both the queue and the local buffer are empty.
    """
    global worker_pipe, worker_thread_exit
    pending = bytearray()

    while True:
        # Wait up to a second for new data so the exit flag is polled regularly
        try:
            pending.extend(worker_queue.get(True, 1))
        except queue.Empty:
            pass
        while pending or not worker_queue.empty():
            try:
                # Wait (at most 1s) for the pipe to become writable; the
                # write is attempted regardless of the outcome.
                select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    pending.extend(worker_queue.get())
                written = os.write(worker_pipe, pending)
                # Trim in place rather than re-copying the remaining buffer
                del pending[:written]
            except (IOError, OSError) as e:
                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                    raise
        if worker_thread_exit and worker_queue.empty() and not pending:
            return
| |
# Start the writer thread that flushes worker_queue to the cooker
worker_thread = Thread(
    target=worker_flush,
    args=(worker_queue,),
)
worker_thread.start()
| |
def worker_child_fire(event, d):
    """
    Event hook for forked task processes: pickle event, frame it in
    <event>...</event> tags and write it straight to worker_pipe while
    holding worker_pipe_lock. The d argument is accepted but not used.

    If the write fails with IOError, sigterm_handler() is invoked.
    """
    remaining = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        with bb.utils.lock_timeout(worker_pipe_lock):
            # write() may be partial, so loop until the whole message is out
            while remaining:
                count = worker_pipe.write(remaining)
                remaining = remaining[count:]
    except IOError:
        sigterm_handler(None, None)
        raise
| |
# Install the queueing implementation as bb.event's worker_fire hook
bb.event.worker_fire = worker_fire
| |
# Optional debug log of commands handled by the worker; disabled unless
# lf is pointed at an open file.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """
    Write msg to the debug log file and flush it; a no-op when logging
    is disabled.
    """
    if not lf:
        return
    lf.write(msg)
    lf.flush()
| |
def sigterm_handler(signum, frame):
    """
    Signal handler: restore the default SIGTERM action, send SIGTERM to the
    caller's whole process group, then exit. Both arguments are unused.
    """
    # Reset the disposition first so the signal sent below is not routed
    # back into this handler.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()
| |
def _apply_env_assignments(assignments, envbackup, fakeenv):
    """Export whitespace-separated KEY=VALUE assignments into os.environ.

    The previous value of each key (None if it was unset) is stored in
    envbackup and the new value is stored in fakeenv.
    """
    for assignment in (assignments or "").split():
        # Split on the first '=' only, so values may themselves contain '='
        key, value = assignment.split('=', 1)
        envbackup[key] = os.environ.get(key)
        os.environ[key] = value
        fakeenv[key] = value


def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask):
    """
    Fork a child process which executes one task.

    cfg             -- cooker configuration; dry_run, limited_deps and profile are read
    data            -- not used by this function
    databuilder     -- provides mcdata and parseRecipe()
    workerdata      -- dict; "umask", "buildname", "date", "time", "sigdata"
                       and optionally "newhashes" are read
    extraconfigdata -- dict of extra variables set on the datastore
    runtask         -- dict describing the task ('fn', 'taskname', 'taskhash',
                       'unihash', 'appends', 'layername', 'taskdepdata',
                       'quieterrors', 'taskdep', 'dry_run' and the fakeroot* keys)

    In the parent, returns (pid, pipein, pipeout) where the pipe ends are
    file objects for the pipe created for the child. The child never
    returns from this function; it leaves via os._exit().
    """
    fn = runtask['fn']
    taskname = runtask['taskname']
    taskhash = runtask['taskhash']
    unihash = runtask['unihash']
    appends = runtask['appends']
    layername = runtask['layername']
    taskdepdata = runtask['taskdepdata']
    quieterrors = runtask['quieterrors']
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeroot = False
    fakeenv = {}
    umask = None

    uid = os.getuid()
    gid = os.getgid()

    taskdep = runtask['taskdep']
    if 'umask' in taskdep and taskname in taskdep['umask']:
        umask = taskdep['umask'][taskname]
    elif workerdata["umask"]:
        umask = workerdata["umask"]
    if umask is None or umask == "":
        umask = None
    else:
        # umask might come in as a number or text string..
        # A value of zero is a valid umask and must not be dropped.
        try:
            umask = int(umask, 8)
        except TypeError:
            pass

    dry_run = cfg.dry_run or runtask['dry_run']

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        fakeroot = True
        _apply_env_assignments(runtask['fakerootenv'], envbackup, fakeenv)

        fakedirs = (runtask['fakerootdirs'] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        _apply_env_assignments(runtask['fakerootnoenv'], envbackup, fakeenv)

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)

            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            if umask is not None:
                os.umask(umask)

            try:
                (_, _, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                if "newhashes" in workerdata:
                    bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
                ret = 0

                the_data = databuilder.parseRecipe(fn, appends, layername)
                the_data.setVar('BB_TASKHASH', taskhash)
                the_data.setVar('BB_UNIHASH', unihash)
                bb.parse.siggen.setup_datacache_from_datastore(fn, the_data)

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')):
                    if bb.utils.is_local_uid(uid):
                        logger.debug("Attempting to disable network for %s" % taskname)
                        bb.utils.disable_network(uid, gid)
                    else:
                        logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)
            try:
                if dry_run:
                    return 0
                try:
                    ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
                finally:
                    if fakeroot:
                        fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
                        subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
                return ret
            except:
                # Deliberately catches everything: the child must never
                # unwind back into the parent's code path.
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            # Exit status used if the profiled call raises before returning
            ret = 1
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: undo the environment changes made for the child
        for key, value in envbackup.items():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
| |
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server

    Bytes read from the input file object are buffered in self.queue and
    every complete <event>...</event> message is handed on via
    worker_fire_prepickled().
    """
    def __init__(self, pipein, pipeout):
        """
        pipein  -- file object to read from; switched to non-blocking mode
        pipeout -- if truthy, closed immediately
        """
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = bytearray()

    def read(self):
        """
        Read available data and forward each complete message.

        Returns True if any new bytes were read, False otherwise. EAGAIN
        from the read is ignored; other OS errors are re-raised.
        """
        start = len(self.queue)
        try:
            # read() may return None when nothing is available
            self.queue.extend(self.input.read(102400) or b"")
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        closing = b"</event>"
        index = self.queue.find(closing)
        while index != -1:
            msg_end = index + len(closing)
            msg = self.queue[:msg_end]
            assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1
            worker_fire_prepickled(msg)
            # Drop the consumed message in place instead of re-copying the
            # rest of the buffer for every message
            del self.queue[:msg_end]
            index = self.queue.find(closing)
        return (end > start)

    def close(self):
        """
        Read until no more data arrives, warn about any incomplete trailing
        message and close the input.
        """
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
| |
# Set to True by the "quit" command handler; controls the final exit status
normalexit = False
| |
class BitbakeWorker(object):
    """
    Command loop of the worker process.

    Reads tagged commands (<name>payload</name>) from the input file object,
    dispatches them to the handle_* methods, reads the pipes of forked tasks
    and reports task exit codes.
    """
    def __init__(self, din):
        """
        din -- binary file object commands are read from; it is switched to
               non-blocking mode. SIGTERM/SIGHUP handlers are installed and
               the process name is set.
        """
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = bytearray()
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.extraconfigdata = None
        # Both dicts are keyed by the pid of the forked task
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """
        Signal handler: warn, terminate remaining tasks, then re-send SIGTERM
        to this process with the default disposition restored.
        """
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """
        Main loop: wait (up to 1s) for input, dispatch queued commands, read
        task pipes and collect exited tasks. Only leaves via an exception or
        process exit.
        """
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    # A non-blocking read can yield None when no data is
                    # actually available; treat that as "nothing read".
                    if r is not None:
                        if len(r) == 0:
                            # EOF on pipe, server must have terminated
                            self.sigterm_exception(signal.SIGTERM, None)
                        self.queue.extend(r)
                except (OSError, IOError):
                    pass
            if len(self.queue):
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            for pipe in self.build_pipes:
                if self.build_pipes[pipe].input in ready:
                    self.build_pipes[pipe].read()
            if len(self.build_pids):
                while self.process_waitpid():
                    continue

    def handle_item(self, item, func):
        """
        Dispatch consecutive complete <item>...</item> messages found at the
        front of the queue to func, removing each one once handled.

        item -- tag name as bytes
        func -- callable taking the payload between the tags

        Processing stops as soon as the queue no longer starts with the
        opening tag or the closing tag has not arrived yet, so a message of
        another type is never passed to func as part of a payload.
        pickle.UnpicklingError raised by func is logged and re-raised.
        """
        opening = b"<" + item + b">"
        closing = b"</" + item + b">"
        while self.queue.startswith(opening):
            index = self.queue.find(closing)
            if index == -1:
                # Message incomplete; wait for more data
                return
            try:
                func(self.queue[len(opening):index])
            except pickle.UnpicklingError:
                workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
                raise
            del self.queue[:index + len(closing)]

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and build the base datastore."""
        # NOTE: pickle data is only safe because it comes from the parent bitbake process
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration(worker=True)
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        """Unpickle and store the extra configuration data."""
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        """
        Unpickle the worker data, apply its logging settings and set
        PRSERV_HOST, BB_HASHSERVE and __bbclasstype on every multiconfig
        datastore.
        """
        self.workerdata = pickle.loads(data)
        bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
        bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
        bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
            self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
            self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe")

    def handle_newtaskhashes(self, data):
        """Store the unpickled payload as workerdata["newhashes"]."""
        self.workerdata["newhashes"] = pickle.loads(data)

    def handle_ping(self, _):
        """Answer a ping by logging a warning-level pong message."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Mark the exit as normal and leave via sys.exit(0)."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Unpickle a task description, fork it off and track its pid and pipe."""
        runtask = pickle.loads(data)

        fn = runtask['fn']
        task = runtask['task']
        taskname = runtask['taskname']

        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask)
        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return False if there are no processes awaiting result collection,
        otherwise collect one process exit code, close its information pipe,
        queue an <exitcode> message and return True.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """
        Send SIGTERM to the process group of every remaining task, waiting
        for a child after each signal, then read all task pipes once.
        """
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for pid in self.build_pids:
                try:
                    os.kill(-pid, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except OSError:
                    # Best effort: the task may already be gone
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
| |
# Run the worker's command loop (optionally under the profiler), then shut
# the writer thread down and exit with a status reflecting how we stopped.
try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if profiling:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            prof.runcall(worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
    else:
        worker.serve()
except BaseException as e:
    # Anything other than a requested quit gets reported on stderr
    if not normalexit:
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
finally:
    # Ask the writer thread to finish flushing and wait for it
    worker_thread_exit = True
    worker_thread.join()

workerlog_write("exiting")
sys.exit(0 if normalexit else 1)