blob: af17b874aa3460e48534f7de7887e1786cf62aab [file] [log] [blame]
Patrick Williamsc124f4f2015-09-15 14:41:29 -05001#!/usr/bin/env python
2
3import os
4import sys
5import warnings
6sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
7from bb import fetch2
8import logging
9import bb
10import select
11import errno
12import signal
13
# Users shouldn't be running this code directly: the script requires exactly
# one argument, and that argument must start with the magic string "decafbad".
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# Passing exactly "decafbadbad" as the argument turns profiling on.
profiling = False
if sys.argv[1] == "decafbadbad":
    profiling = True
    # Prefer the C profiler; only a failed import should trigger the fallback,
    # so catch ImportError rather than everything.
    try:
        import cProfile as profile
    except ImportError:
        import profile
26
# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except Exception:
    # Best effort only: keep the existing stdout if it cannot be reopened.
    # (Exception rather than a bare except, so SystemExit/KeyboardInterrupt
    # are not silently swallowed.)
    pass

logger = logging.getLogger("BitBake")

try:
    import cPickle as pickle
except ImportError:
    import pickle
    bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")


# Events are written to the stdout file descriptor, switched to non-blocking.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Serialised messages that have not yet been written to worker_pipe.
worker_queue = ""
62
def worker_fire(event, d):
    """Pickle event, wrap it in <event>...</event> and queue it for sending.

    The d argument is accepted but not used by this function.
    """
    pickled = pickle.dumps(event)
    worker_fire_prepickled("<event>" + pickled + "</event>")
66
def worker_fire_prepickled(event):
    """Append an already serialised message to worker_queue, then try to flush it."""
    global worker_queue

    worker_queue += event
    worker_flush()
72
def worker_flush():
    """Write as much of worker_queue to worker_pipe as the pipe accepts.

    Whatever os.write() reports as written is removed from the front of the
    queue; the rest stays queued for a later call.

    EAGAIN leaves the queue untouched so the write is retried later.
    EPIPE means the reading end of the pipe is closed, so the queued data can
    never be delivered: it is discarded, otherwise the queue would never
    become empty and any loop waiting for it to drain would never finish.

    Raises:
        IOError/OSError: for any write error other than EAGAIN or EPIPE.
    """
    global worker_queue, worker_pipe

    if not worker_queue:
        return

    try:
        written = os.write(worker_pipe, worker_queue)
    except (IOError, OSError) as e:
        if e.errno == errno.EPIPE:
            worker_queue = ""
        elif e.errno != errno.EAGAIN:
            raise
    else:
        worker_queue = worker_queue[written:]
85
def worker_child_fire(event, d):
    """Pickle event and write it, wrapped in <event>...</event>, to worker_pipe.

    Unlike worker_fire, this calls worker_pipe.write() directly instead of
    going through worker_queue. If the write raises IOError,
    sigterm_handler is invoked and the error is re-raised. The d argument
    is accepted but not used.
    """
    global worker_pipe

    pickled = pickle.dumps(event)
    try:
        worker_pipe.write("<event>" + pickled + "</event>")
    except IOError:
        sigterm_handler(None, None)
        raise
95
# Point bb.event.worker_fire at the local worker_fire, which pickles each
# event and queues it for the worker pipe.
bb.event.worker_fire = worker_fire
97
# Optional debug log file; logging is disabled while this is None.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Write msg to the debug log file lf and flush it; do nothing when lf is unset."""
    if not lf:
        return
    lf.write(msg)
    lf.flush()
104
def sigterm_handler(signum, frame):
    """Signal handler: send SIGTERM to process group 0, then exit.

    Both arguments are ignored, so the function can also be called directly
    with (None, None).
    """
    term = signal.SIGTERM
    # Restore the default SIGTERM action before signalling the group, so this
    # handler is not re-entered by the signal sent on the next line.
    signal.signal(term, signal.SIG_DFL)
    os.killpg(0, term)
    sys.exit()
109
def _override_environ(assignments, envbackup, fakeenv):
    """Apply whitespace-separated KEY=VALUE assignments to os.environ.

    For each key, the value it had before the first override is stored in
    envbackup (None when it was unset) and the new value is recorded in
    fakeenv. assignments may be None or empty.
    """
    for var in (assignments or "").split():
        # Split on the first '=' only, so values that themselves contain '='
        # are kept intact.
        key, value = var.split('=', 1)
        # Only remember the original value, even if a key is listed twice.
        if key not in envbackup:
            envbackup[key] = os.environ.get(key)
        os.environ[key] = value
        fakeenv[key] = value


def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
    """Fork a child process that executes one task and return handles to it.

    Parameters:
        cfg: configuration object; dry_run and profile are read from it.
        data: datastore; worker context variables are set on it in the child
            and it is passed to bb.cache.Cache.loadDataFull().
        workerdata: dict; the keys read here are "taskdeps", "fakerootenv",
            "fakerootdirs", "fakerootnoenv", "buildname", "date", "time",
            "sigdata" and "runq_hash".
        fn: recipe file name, used as key into the per-file workerdata tables.
        task: key into workerdata["runq_hash"].
        taskname: name of the task to execute.
        appends: passed through to loadDataFull().
        taskdepdata: stored in BB_TASKDEPDATA in the child.
        quieterrors: when true, set the task's "quieterrors" flag and do not
            log set-up exceptions.

    Returns:
        (pid, pipein, pipeout) in the parent: the child's pid plus the file
        objects for the read and write ends of the event pipe. The child
        never returns from this function; it leaves through os._exit().
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run:
        _override_environ(workerdata["fakerootenv"][fn], envbackup, fakeenv)

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        _override_environ(workerdata["fakerootnoenv"][fn], envbackup, fakeenv)

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))

    if pid == 0:
        def child():
            global worker_pipe
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)
            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            # Compare against None: a umask of 0 is valid and must be applied.
            if umask is not None:
                os.umask(umask)

            data.setVar("BB_WORKERCONTEXT", "1")
            data.setVar("BB_TASKDEPDATA", taskdepdata)
            data.setVar("BUILDNAME", workerdata["buildname"])
            data.setVar("DATE", workerdata["date"])
            data.setVar("TIME", workerdata["time"])
            bb.parse.siggen.set_taskdata(workerdata["sigdata"])
            try:
                the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
                the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)
                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception as exc:
                if not quieterrors:
                    logger.critical(str(exc))
                os._exit(1)
            try:
                if cfg.dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except:
                # Deliberately catch everything: the forked child must exit
                # here instead of unwinding into the parent's code path.
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            # Default to failure so the exit in the finally clause has a
            # defined status even when the profiled call raises.
            ret = 1
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: undo the environment changes made for the child.
        for key, value in envbackup.items():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
239
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        """Keep pipein as a non-blocking input; close pipeout if one is given."""
        if pipeout:
            pipeout.close()
        self.input = pipein
        bb.utils.nonblockingfd(self.input)
        self.queue = ""

    def read(self):
        """Read available data and forward every complete event it contains.

        Each chunk of the buffer up to and including an "</event>"
        terminator is handed to worker_fire_prepickled(); an incomplete
        trailing message stays buffered. EAGAIN from the read is ignored.
        Returns True when this call added data to the buffer.
        """
        size_before = len(self.queue)
        try:
            self.queue += self.input.read(102400)
        except (OSError, IOError) as err:
            if err.errno != errno.EAGAIN:
                raise
        size_after = len(self.queue)

        terminator = "</event>"
        while True:
            pos = self.queue.find(terminator)
            if pos == -1:
                break
            cut = pos + len(terminator)
            worker_fire_prepickled(self.queue[:cut])
            self.queue = self.queue[cut:]
        return size_after > size_before

    def close(self):
        """Drain the pipe, warn about any partial message left over, then close it."""
        while self.read():
            pass
        if self.queue:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
273
# Set to True by BitbakeWorker.handle_quit; the top-level exception handler
# only prints a traceback while this is still False.
normalexit = False
275
class BitbakeWorker(object):
    """Command loop of the worker process.

    Commands arrive on the input stream as "<name>payload</name>" messages
    and are dispatched to the matching handle_* method. Tasks are forked
    through fork_off_task(); their event pipes and pids are tracked in
    build_pipes and build_pids, both keyed by child pid.
    """
    def __init__(self, din):
        """Make din non-blocking and install the SIGTERM/SIGHUP handlers."""
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = ""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.workerdata = None
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)

    def sigterm_exception(self, signum, stackframe):
        """Warn, terminate the running tasks, then kill this process with SIGTERM."""
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        # Restore the default action so the signal below terminates the
        # process instead of re-entering this handler.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Run the command loop until the process exits.

        Each iteration waits up to one second for input, dispatches the
        queued commands, forwards events from the task pipes, reaps at most
        one finished task and flushes the outgoing event queue.
        """
        handlers = (
            ("cookerconfig", self.handle_cookercfg),
            ("workerdata", self.handle_workerdata),
            ("runtask", self.handle_runtask),
            ("finishnow", self.handle_finishnow),
            ("ping", self.handle_ping),
            ("quit", self.handle_quit),
        )
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            # Keep dispatching while progress is made, so a command queued
            # behind one of a different type is handled in this iteration.
            while self.queue:
                pending = len(self.queue)
                for item, func in handlers:
                    self.handle_item(item, func)
                if len(self.queue) == pending:
                    # Head of the queue is incomplete (or not a known
                    # command): wait for more input.
                    break

            for pipe in self.build_pipes:
                self.build_pipes[pipe].read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()


    def handle_item(self, item, func):
        """Dispatch the complete "<item>...</item>" messages at the head of the queue.

        func is called with the payload of each such message, which is then
        removed from the queue. Processing stops as soon as the queue no
        longer starts with "<item>" or the closing tag has not arrived yet,
        so bytes belonging to a different command are never passed to func.
        """
        opening = "<" + item + ">"
        closing = "</" + item + ">"
        while self.queue.startswith(opening):
            index = self.queue.find(closing)
            if index == -1:
                break
            func(self.queue[len(opening):index])
            self.queue = self.queue[index + len(closing):]

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and parse the base configuration."""
        # NOTE(review): pickle.loads() on the input stream is only safe as
        # long as that stream is trusted.
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        """Unpickle the worker data and apply its logging and PRSERV_HOST settings."""
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        """Answer a ping by logging a warning-level "Pong" message."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Mark the exit as normal and leave through sys.exit(0)."""
        global normalexit

        workerlog_write("Handling quit\n")

        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Unpickle a task description, fork it off and track its pid and pipe."""
        fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Reap at most one finished child without blocking.

        Returns None when no child has exited (or waitpid fails). Otherwise
        the child's event pipe is drained and closed, its bookkeeping is
        removed and an <exitcode> message carrying (task, status) is queued.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids.pop(pid)

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")

    def handle_finishnow(self, _):
        """Send SIGTERM to every tracked task's process group, then read their pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for pid in list(self.build_pids):
                try:
                    # A negative pid addresses the whole process group.
                    os.kill(-pid, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except OSError:
                    # Best effort: the task may already be gone.
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
410
# Run the worker loop, under the profiler when profiling was requested.
try:
    worker = BitbakeWorker(sys.stdin)
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # BaseException is caught on purpose: the quit command leaves serve()
    # through sys.exit(), which must still reach the final flush below.
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
# Drain the events that are still queued before exiting.
while len(worker_queue):
    # Wait (at most a second) until the pipe can take more data instead of
    # spinning on EAGAIN. Errors from the wait itself are ignored here;
    # worker_flush() reports real write errors.
    try:
        select.select([], [worker_pipe], [], 1)
    except select.error:
        pass
    worker_flush()
workerlog_write("exitting")
sys.exit(0)
432