Patrick Williams | c124f4f | 2015-09-15 14:41:29 -0500 | [diff] [blame^] | 1 | #!/usr/bin/env python |
| 2 | |
| 3 | import os |
| 4 | import sys |
| 5 | import warnings |
| 6 | sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) |
| 7 | from bb import fetch2 |
| 8 | import logging |
| 9 | import bb |
| 10 | import select |
| 11 | import errno |
| 12 | import signal |
| 13 | |
# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# The exact argument "decafbadbad" additionally switches profiling on
profiling = False
if sys.argv[1] == "decafbadbad":
    profiling = True
    try:
        import cProfile as profile
    except ImportError:
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except Exception:
    # Best effort only: carry on with the existing stdout if it cannot be
    # reopened, but let KeyboardInterrupt/SystemExit through.
    pass

logger = logging.getLogger("BitBake")

try:
    import cPickle as pickle
except ImportError:
    import pickle
    bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")


# Messages for the server are written to the raw stdout descriptor, which is
# switched to non-blocking mode; worker_flush() copes with partial writes.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Serialised messages not yet written to worker_pipe
worker_queue = ""
| 62 | |
def worker_fire(event, d):
    """
    Event hook for the worker process itself: pickle the event, wrap it
    in <event> tags and hand it to the outgoing queue.  ``d`` is accepted
    but not used.
    """
    pickled = pickle.dumps(event)
    wrapped = "<event>" + pickled + "</event>"
    worker_fire_prepickled(wrapped)
| 66 | |
def worker_fire_prepickled(event):
    """
    Append an already serialised message to the outgoing queue and
    immediately attempt to write the queue out.
    """
    global worker_queue

    worker_queue += event
    worker_flush()
| 72 | |
def worker_flush():
    """
    Write as much of the pending queue to worker_pipe as os.write() accepts.

    The written prefix is dropped from the queue.  EAGAIN and EPIPE are
    ignored and leave the queue as it was; any other I/O error propagates.
    """
    global worker_queue, worker_pipe

    if worker_queue:
        try:
            sent = os.write(worker_pipe, worker_queue)
        except (IOError, OSError) as err:
            if err.errno not in (errno.EAGAIN, errno.EPIPE):
                raise
        else:
            worker_queue = worker_queue[sent:]
| 85 | |
def worker_child_fire(event, d):
    """
    Event hook installed inside a forked task.

    The pickled event is written directly to worker_pipe, which
    fork_off_task() rebinds in the child to the write end of the task's
    own pipe.  If that write raises IOError, sigterm_handler() is invoked
    to take down the child's process group.  ``d`` is not used.
    """
    global worker_pipe

    message = "<event>" + pickle.dumps(event) + "</event>"
    try:
        worker_pipe.write(message)
    except IOError:
        sigterm_handler(None, None)
        raise

# The worker process itself uses the queueing hook
bb.event.worker_fire = worker_fire
| 97 | |
# Optional debug log of the commands handled; stays disabled while lf is None
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """
    Write msg to the debug log and flush it straight away.  Does nothing
    when no log file is open.
    """
    if not lf:
        return
    lf.write(msg)
    lf.flush()
| 104 | |
def sigterm_handler(signum, frame):
    """
    Signal handler used by forked tasks: SIGTERM the whole process group
    and exit.  Neither argument is inspected.
    """
    # Restore the default action first so the SIGTERM sent to our own
    # group below does not land back in this handler.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()
| 109 | |
def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
    """
    Fork a child process which runs a single task.

    cfg          -- cooker configuration; dry_run and profile are read from it
    data         -- datastore on which the worker context variables are set
                    in the child before the recipe data is loaded
    workerdata   -- dict of run data; the keys read here are "taskdeps",
                    "fakerootenv", "fakerootdirs", "fakerootnoenv",
                    "buildname", "date", "time", "sigdata" and "runq_hash"
    fn           -- key selecting the recipe in the per-recipe workerdata
                    tables (presumably the recipe file name)
    task         -- key into workerdata["runq_hash"]
    taskname     -- name of the task to execute
    appends      -- passed through to bb.cache.Cache.loadDataFull()
    taskdepdata  -- stored in BB_TASKDEPDATA for the task
    quieterrors  -- suppress the critical log message if task setup fails
                    and set the task's "quieterrors" flag

    In the parent, returns (pid, pipein, pipeout) where pipein/pipeout are
    file objects for the two ends of the pipe shared with the child.  In
    the child the task's result is passed to os._exit() instead of being
    returned.
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    use_fakeroot = 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run
    envkey = "fakerootenv" if use_fakeroot else "fakerootnoenv"

    envvars = (workerdata[envkey][fn] or "").split()
    for var in envvars:
        # Split on the first '=' only so that values may themselves contain '='
        key, value = var.split('=', 1)
        # Remember the previous value (None if unset) so the parent can restore it
        envbackup[key] = os.environ.get(key)
        os.environ[key] = value
        fakeenv[key] = value

    if use_fakeroot:
        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        # NOTE(review): relies on bb.msg.fatal() not returning - confirm
        bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))

    if pid == 0:
        def child():
            global worker_pipe
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)
            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            # Compare against None so that an explicit umask of 0 is honoured
            if umask is not None:
                os.umask(umask)

            data.setVar("BB_WORKERCONTEXT", "1")
            data.setVar("BB_TASKDEPDATA", taskdepdata)
            data.setVar("BUILDNAME", workerdata["buildname"])
            data.setVar("DATE", workerdata["date"])
            data.setVar("TIME", workerdata["time"])
            bb.parse.siggen.set_taskdata(workerdata["sigdata"])
            try:
                the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
                the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)
                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception as exc:
                if not quieterrors:
                    logger.critical(str(exc))
                os._exit(1)
            try:
                if cfg.dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except BaseException:
                # Deliberately catch everything: whatever goes wrong, the
                # child must exit here rather than unwind further.
                os._exit(1)

        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            # Default to failure so the finally block can always exit the
            # child, even when runcall() raises before ret is assigned.
            ret = 1
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: undo the environment changes made for the child
        for key, value in envbackup.items():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
| 239 | |
class runQueueWorkerPipe():
    """
    Read side of the pipe between one forked task and the worker server.
    Complete <event>...</event> messages are passed on to the worker's
    outgoing queue.
    """
    def __init__(self, pipein, pipeout):
        self.input = pipein
        # Only the read end is used here; close the write end if one was given
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = ""

    def read(self):
        """
        Pull whatever is available from the pipe and forward every complete
        message.  Returns True if any new data arrived.
        """
        before = len(self.queue)
        try:
            self.queue = self.queue + self.input.read(102400)
        except (OSError, IOError) as err:
            # Nothing available on the non-blocking pipe is not an error
            if err.errno != errno.EAGAIN:
                raise
        after = len(self.queue)

        terminator = "</event>"
        while True:
            pos = self.queue.find(terminator)
            if pos == -1:
                break
            cut = pos + len(terminator)
            worker_fire_prepickled(self.queue[:cut])
            self.queue = self.queue[cut:]
        return after > before

    def close(self):
        """
        Drain the pipe, warn about any incomplete trailing message and
        close the read end.
        """
        while self.read():
            pass
        if self.queue:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
| 273 | |
# Set by BitbakeWorker.handle_quit() so the exit it triggers is not reported as an error
normalexit = False

class BitbakeWorker(object):
    """
    Command loop of the worker process.

    Reads tagged messages (<cookerconfig>, <workerdata>, <runtask>,
    <finishnow>, <ping>, <quit>) from the input stream, forks tasks on
    request, relays what they write to their pipes and reports their exit
    codes.
    """
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = ""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        # Both keyed by the pid of the forked task
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)

    def sigterm_exception(self, signum, stackframe):
        """
        Signal handler: terminate the running tasks, then re-deliver
        SIGTERM to this process with the default action restored.
        """
        if signum == signal.SIGTERM:
            bb.warn("Worker recieved SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker recieved SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """
        Main loop: wait up to a second for input from the server or any
        task pipe, dispatch complete commands, relay task output, reap
        finished tasks and flush the outgoing queue.  Does not return.
        """
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            if self.queue:
                self._dispatch_queue()

            for pipe in self.build_pipes:
                self.build_pipes[pipe].read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()

    def _dispatch_queue(self):
        """
        Run the command handlers over the queue until a full pass consumes
        nothing, so that messages of different kinds interleaved in one
        read are all handled without waiting for the next loop iteration.
        """
        handlers = (
            ("cookerconfig", self.handle_cookercfg),
            ("workerdata", self.handle_workerdata),
            ("runtask", self.handle_runtask),
            ("finishnow", self.handle_finishnow),
            ("ping", self.handle_ping),
            ("quit", self.handle_quit),
        )
        remaining = None
        while self.queue and len(self.queue) != remaining:
            remaining = len(self.queue)
            for item, func in handlers:
                self.handle_item(item, func)

    def handle_item(self, item, func):
        """
        Consume consecutive complete <item>...</item> messages from the
        front of the queue, calling func with each payload.

        Stops as soon as the queue no longer starts with <item>, or the
        leading message has no closing tag yet.
        """
        opener = "<" + item + ">"
        closer = "</" + item + ">"
        # Re-check the opening tag on every pass: a later closing tag of the
        # same kind must not be paired with a different leading message.
        while self.queue.startswith(opener):
            index = self.queue.find(closer)
            if index == -1:
                break
            func(self.queue[len(opener):index])
            self.queue = self.queue[(index + len(closer)):]

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and parse the base configuration from it."""
        # NOTE(review): pickle.loads() on pipe input - only safe while the
        # peer is the trusted bitbake server.
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        """Unpickle the run data, apply its logging defaults and set PRSERV_HOST."""
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        """Answer a ping with a warning-level log message."""
        workerlog_write("Handling ping\n")

        logger.warn("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Mark the exit as expected and leave via sys.exit(0)."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Unpickle a task request, fork the task and record its pid and pipe."""
        fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Collect the exit status of at most one finished child without
        blocking.

        Returns None straight away if no child has exited, the child is
        merely stopped, or waitpid() fails.  Otherwise the task's pipe is
        drained and closed and an <exitcode> message carrying
        (task, status) is queued for the server.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")

    def handle_finishnow(self, _):
        """
        SIGTERM the process group of every running task, waiting for a
        child after each signal, then read what is left on the task pipes.
        """
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for pid in list(self.build_pids):
                try:
                    # Each task is its own session/group leader, so a
                    # negative pid signals the task's whole process group
                    os.kill(-pid, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except OSError:
                    # Best effort: the group or the child may already be gone
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
| 410 | |
# Run the command loop; anything other than a requested quit gets its
# traceback written to stderr before the final flush below.
try:
    worker = BitbakeWorker(sys.stdin)
    if profiling:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            prof.runcall(worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
    else:
        worker.serve()
except BaseException as e:
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))

# Keep writing until every queued message has gone out
while worker_queue:
    worker_flush()
workerlog_write("exitting")
sys.exit(0)
| 432 | |