#!/usr/bin/env python

import os
import sys
import warnings
# The bb package lives in ../lib relative to this script, so the path must
# be extended before anything from bb can be imported.
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal
from multiprocessing import Lock

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# A magic argument starting with "decafbadbad" additionally turns on profiling
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except ImportError:
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except Exception:
    # Best effort only: keep the existing stdout object if it cannot be reopened
    pass

logger = logging.getLogger("BitBake")

try:
    import cPickle as pickle
except ImportError:
    import pickle
    bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")


# Everything destined for the parent is written to the stdout file descriptor
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Data queued for the parent which has not been written to worker_pipe yet
worker_queue = ""
66
def worker_fire(event, d):
    """
    Pickle an event, wrap it in <event>...</event> markers and hand it to
    worker_fire_prepickled() for queueing. The d argument is accepted but
    not used here.
    """
    payload = pickle.dumps(event)
    worker_fire_prepickled("<event>" + payload + "</event>")
70
def worker_fire_prepickled(event):
    """
    Append an already serialised message to the outgoing queue and try to
    flush the queue straight away.
    """
    global worker_queue

    worker_queue += event
    worker_flush()
76
def worker_flush():
    """
    Write as much of worker_queue as possible to worker_pipe.

    Whatever os.write() accepted is removed from the front of the queue.
    EAGAIN leaves the queue untouched so a later call can retry. EPIPE
    means nothing is reading the pipe any more, so the queued data can
    never be delivered and is dropped; keeping it would make a caller
    that loops until the queue is empty spin forever. Any other
    IOError/OSError is re-raised.
    """
    global worker_queue, worker_pipe

    if not worker_queue:
        return

    try:
        written = os.write(worker_pipe, worker_queue)
        worker_queue = worker_queue[written:]
    except (IOError, OSError) as e:
        if e.errno == errno.EPIPE:
            # The reading end is gone, discard what can no longer be sent
            worker_queue = ""
        elif e.errno != errno.EAGAIN:
            raise
89
def worker_child_fire(event, d):
    """
    Send an event from a forked task process: pickle it, wrap it in
    <event>...</event> markers and write it to worker_pipe.

    The write happens while holding worker_pipe_lock, and the lock is
    released again even if the write fails. On IOError sigterm_handler()
    is invoked and the error is re-raised. The d argument is accepted but
    not used here.
    """
    data = "<event>" + pickle.dumps(event) + "</event>"
    try:
        # Serialise writers sharing the pipe; "with" guarantees the release
        with worker_pipe_lock:
            worker_pipe.write(data)
    except IOError:
        sigterm_handler(None, None)
        raise
102
# Install worker_fire as the hook bb.event uses to send events from this process
bb.event.worker_fire = worker_fire
104
# Optional debug log of the commands handled; disabled while lf is None
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """
    Write msg to the debug log file lf and flush it immediately.
    Does nothing when no log file is configured.
    """
    if not lf:
        return
    lf.write(msg)
    lf.flush()
111
def sigterm_handler(signum, frame):
    """
    Signal handler: restore the default SIGTERM action, send SIGTERM to
    process group 0 via os.killpg() and exit. Both arguments are ignored,
    so it may also be called directly with (None, None).
    """
    # Reset the disposition first so the SIGTERM sent below is not routed
    # back into this handler
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()
116
def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
    """
    Fork a child process which executes a single task.

    The parent gets back (pid, pipein, pipeout): the pid of the child and
    both ends of the pipe the child writes its events to. The child leaves
    through os._exit() with the result of the task.

    cfg         -- configuration object; dry_run and profile are read here
    data        -- datastore the child sets the BB_*/BUILDNAME/DATE/TIME
                   variables on before loading the recipe data
    workerdata  -- dict of per-build data, indexed here with "taskdeps",
                   "fakerootenv", "fakerootnoenv", "fakerootdirs",
                   "buildname", "date", "time", "sigdata" and "runq_hash"
    fn          -- recipe file name, used as key into the workerdata tables
    task        -- key into workerdata["runq_hash"]
    taskname    -- name of the task to run
    appends     -- passed through to bb.cache.Cache.loadDataFull()
    taskdepdata -- stored in BB_TASKDEPDATA
    quieterrors -- if true, set the "quieterrors" flag on the task and do
                   not log setup failures
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    use_fakeroot = 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run
    envkey = "fakerootenv" if use_fakeroot else "fakerootnoenv"

    envvars = (workerdata[envkey][fn] or "").split()
    for var in envvars:
        # Split on the first '=' only so values may themselves contain '='
        key, value = var.split('=', 1)
        # Remember the pre-existing value once, even if a key is listed twice
        if key not in envbackup:
            envbackup[key] = os.environ.get(key)
        os.environ[key] = value
        fakeenv[key] = value

    if use_fakeroot:
        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)
            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            # Compare against None: a requested umask of 0 is valid and
            # must still be applied
            if umask is not None:
                os.umask(umask)

            data.setVar("BB_WORKERCONTEXT", "1")
            data.setVar("BB_TASKDEPDATA", taskdepdata)
            data.setVar("BUILDNAME", workerdata["buildname"])
            data.setVar("DATE", workerdata["date"])
            data.setVar("TIME", workerdata["time"])
            bb.parse.siggen.set_taskdata(workerdata["sigdata"])
            try:
                the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
                the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN", True), taskname.replace("do_", "")))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)
                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception as exc:
                if not quieterrors:
                    logger.critical(str(exc))
                os._exit(1)
            try:
                if cfg.dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except:
                # Deliberately catches everything: whatever goes wrong while
                # running the task, the child exits with status 1
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            # Start from a failure code so the exit below always has a
            # defined status, even if the profiled call raises
            ret = 1
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: undo the environment changes made for the child
        for key, value in envbackup.iteritems():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
250
class runQueueWorkerPipe():
    """
    Reading side of the pipe connecting one forked task to the worker.
    Complete <event>...</event> messages are passed on through
    worker_fire_prepickled(); incomplete data is buffered in self.queue.
    """
    def __init__(self, pipein, pipeout):
        # Only the read end is kept, the write end (if given) is closed
        if pipeout:
            pipeout.close()
        self.input = pipein
        bb.utils.nonblockingfd(self.input)
        self.queue = ""

    def read(self):
        """
        Pull up to 102400 more bytes from the pipe and forward every
        complete message. Returns True if any new data arrived.
        """
        size_before = len(self.queue)
        try:
            self.queue += self.input.read(102400)
        except (OSError, IOError) as err:
            # EAGAIN just means nothing is available on the non-blocking fd
            if err.errno != errno.EAGAIN:
                raise
        got_data = len(self.queue) > size_before

        terminator = "</event>"
        while True:
            pos = self.queue.find(terminator)
            if pos == -1:
                break
            cut = pos + len(terminator)
            worker_fire_prepickled(self.queue[:cut])
            self.queue = self.queue[cut:]
        return got_data

    def close(self):
        """
        Read until no more data arrives, report any incomplete trailing
        message and close the pipe.
        """
        while self.read():
            pass
        if self.queue:
            # NOTE(review): this prints to stdout, which this file also uses
            # as worker_pipe - confirm the reader copes with the stray text
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
284
# Set by BitbakeWorker.handle_quit() so the exit path can tell a requested
# shutdown from a failure
normalexit = False

class BitbakeWorker(object):
    """
    Command loop of the worker. Commands arrive on the input stream as
    <name>payload</name> messages and are dispatched to the handle_*
    methods; forked tasks are tracked in build_pids / build_pipes.
    """
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = ""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        # pid -> task and pid -> runQueueWorkerPipe for every running task
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """
        Handler for SIGTERM/SIGHUP: warn, terminate the running tasks, then
        re-send SIGTERM to this process with the default action restored.
        """
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """
        Main loop: wait up to a second for input, dispatch queued commands,
        drain the task pipes, reap finished tasks and flush pending output.
        """
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            if len(self.queue):
                self.handle_item("cookerconfig", self.handle_cookercfg)
                self.handle_item("workerdata", self.handle_workerdata)
                self.handle_item("runtask", self.handle_runtask)
                self.handle_item("finishnow", self.handle_finishnow)
                self.handle_item("ping", self.handle_ping)
                self.handle_item("quit", self.handle_quit)

            for pipe in self.build_pipes:
                self.build_pipes[pipe].read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()


    def handle_item(self, item, func):
        """
        Dispatch every complete <item>...</item> message found at the front
        of the queue to func, removing each one after func returns.

        The opening tag is re-checked for every message. Once a message of
        a different type (or an incomplete one) is at the front, the rest of
        the queue is left alone, so a closing tag further along can never
        be paired with the wrong start of data.
        """
        opening = "<" + item + ">"
        closing = "</" + item + ">"
        while self.queue.startswith(opening):
            index = self.queue.find(closing)
            if index == -1:
                # Message not complete yet, wait for more input
                break
            func(self.queue[len(opening):index])
            self.queue = self.queue[index + len(closing):]

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and build the base datastore from it."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        """Unpickle the per-build worker data and apply its logging settings and PRSERV_HOST."""
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        """Answer a ping by logging a warning-level message."""
        workerlog_write("Handling ping\n")

        logger.warn("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Mark the shutdown as requested and exit."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Unpickle a task description, fork the task and start tracking its pid and pipe."""
        fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Collect the exit status of at most one finished child without
        blocking. If nothing has exited this returns None; otherwise the
        child's pipe is closed, the child is forgotten and an <exitcode>
        message carrying (task, status) is queued.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")

    def handle_finishnow(self, _):
        """
        Send SIGTERM to the process group of every running task, waiting
        for a child after each signal, then read what is left on the task
        pipes.
        """
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for pid in self.build_pids:
                try:
                    # A negative pid addresses the task's whole process group
                    os.kill(-pid, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except OSError:
                    # Best effort: the task may already be gone
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
425
# Run the worker on stdin, optionally under the profiler. Whatever ends the
# loop (including SystemExit from a quit command) is caught here so that
# pending output can still be delivered below.
try:
    worker = BitbakeWorker(sys.stdin)
    if profiling:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
    else:
        worker.serve()
except BaseException as e:
    # Only report the failure when the exit was not requested via "quit"
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
# Keep flushing until everything queued has been written
while worker_queue:
    worker_flush()
workerlog_write("exitting")
sys.exit(0)
447