blob: 828159ed75691ca1aad558c295995adf34407b57 [file] [log] [blame]
Patrick Williamsc124f4f2015-09-15 14:41:29 -05001#
2# BitBake Process based server.
3#
4# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
5#
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License version 2 as
8# published by the Free Software Foundation.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License along
16# with this program; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19"""
20 This module implements a multiprocessing.Process based server for bitbake.
21"""
22
23import bb
24import bb.event
Patrick Williamsc124f4f2015-09-15 14:41:29 -050025import logging
26import multiprocessing
Brad Bishopd7bf8c12018-02-25 22:55:05 -050027import threading
28import array
Patrick Williamsc124f4f2015-09-15 14:41:29 -050029import os
Patrick Williamsc124f4f2015-09-15 14:41:29 -050030import sys
31import time
32import select
Brad Bishopd7bf8c12018-02-25 22:55:05 -050033import socket
34import subprocess
35import errno
36import re
37import datetime
38import bb.server.xmlrpcserver
39from bb import daemonize
40from multiprocessing import queues
Patrick Williamsc124f4f2015-09-15 14:41:29 -050041
42logger = logging.getLogger('BitBake')
43
Brad Bishopd7bf8c12018-02-25 22:55:05 -050044class ProcessTimeout(SystemExit):
45 pass
Patrick Williamsc124f4f2015-09-15 14:41:29 -050046
Brad Bishopd7bf8c12018-02-25 22:55:05 -050047class ProcessServer(multiprocessing.Process):
Patrick Williamsc124f4f2015-09-15 14:41:29 -050048 profile_filename = "profile.log"
49 profile_processed_filename = "profile.log.processed"
50
Brad Bishopd7bf8c12018-02-25 22:55:05 -050051 def __init__(self, lock, sock, sockname):
52 multiprocessing.Process.__init__(self)
53 self.command_channel = False
54 self.command_channel_reply = False
Patrick Williamsc124f4f2015-09-15 14:41:29 -050055 self.quit = False
Brad Bishop6e60e8b2018-02-01 10:27:11 -050056 self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
57 self.next_heartbeat = time.time()
Patrick Williamsc124f4f2015-09-15 14:41:29 -050058
Brad Bishopd7bf8c12018-02-25 22:55:05 -050059 self.event_handle = None
60 self.haveui = False
61 self.lastui = False
62 self.xmlrpc = False
63
64 self._idlefuns = {}
65
66 self.bitbake_lock = lock
67 self.sock = sock
68 self.sockname = sockname
69
70 def register_idle_function(self, function, data):
71 """Register a function to be called while the server is idle"""
72 assert hasattr(function, '__call__')
73 self._idlefuns[function] = data
Patrick Williamsc124f4f2015-09-15 14:41:29 -050074
75 def run(self):
Brad Bishopd7bf8c12018-02-25 22:55:05 -050076
77 if self.xmlrpcinterface[0]:
78 self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)
79
80 print("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))
Patrick Williamsc124f4f2015-09-15 14:41:29 -050081
Brad Bishop6e60e8b2018-02-01 10:27:11 -050082 heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
83 if heartbeat_event:
84 try:
85 self.heartbeat_seconds = float(heartbeat_event)
86 except:
Brad Bishop6e60e8b2018-02-01 10:27:11 -050087 bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)
Brad Bishopd7bf8c12018-02-25 22:55:05 -050088
89 self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
90 try:
91 if self.timeout:
92 self.timeout = float(self.timeout)
93 except:
94 bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
95
96
97 try:
98 self.bitbake_lock.seek(0)
99 self.bitbake_lock.truncate()
100 if self.xmlrpc:
101 self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
102 else:
103 self.bitbake_lock.write("%s\n" % (os.getpid()))
104 self.bitbake_lock.flush()
105 except Exception as e:
106 print("Error writing to lock file: %s" % str(e))
107 pass
108
109 if self.cooker.configuration.profile:
110 try:
111 import cProfile as profile
112 except:
113 import profile
114 prof = profile.Profile()
115
116 ret = profile.Profile.runcall(prof, self.main)
117
118 prof.dump_stats("profile.log")
119 bb.utils.process_profilelog("profile.log")
120 print("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
121
122 else:
123 ret = self.main()
124
125 return ret
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500126
127 def main(self):
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500128 self.cooker.pre_serve()
129
Patrick Williamsd8c66bc2016-06-20 12:57:21 -0500130 bb.utils.set_process_name("Cooker")
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500131
132 ready = []
133
134 self.controllersock = False
135 fds = [self.sock]
136 if self.xmlrpc:
137 fds.append(self.xmlrpc)
138 print("Entering server connection loop")
139
140 def disconnect_client(self, fds):
141 if not self.haveui:
142 return
143 print("Disconnecting Client")
144 fds.remove(self.controllersock)
145 fds.remove(self.command_channel)
146 bb.event.unregister_UIHhandler(self.event_handle, True)
147 self.command_channel_reply.writer.close()
148 self.event_writer.writer.close()
149 del self.event_writer
150 self.controllersock.close()
151 self.controllersock = False
152 self.haveui = False
153 self.lastui = time.time()
154 self.cooker.clientComplete()
155 if self.timeout is None:
156 print("No timeout, exiting.")
157 self.quit = True
158
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500159 while not self.quit:
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500160 if self.sock in ready:
161 self.controllersock, address = self.sock.accept()
162 if self.haveui:
163 print("Dropping connection attempt as we have a UI %s" % (str(ready)))
164 self.controllersock.close()
165 else:
166 print("Accepting %s" % (str(ready)))
167 fds.append(self.controllersock)
168 if self.controllersock in ready:
169 try:
170 print("Connecting Client")
171 ui_fds = recvfds(self.controllersock, 3)
172
173 # Where to write events to
174 writer = ConnectionWriter(ui_fds[0])
175 self.event_handle = bb.event.register_UIHhandler(writer, True)
176 self.event_writer = writer
177
178 # Where to read commands from
179 reader = ConnectionReader(ui_fds[1])
180 fds.append(reader)
181 self.command_channel = reader
182
183 # Where to send command return values to
184 writer = ConnectionWriter(ui_fds[2])
185 self.command_channel_reply = writer
186
187 self.haveui = True
188
189 except (EOFError, OSError):
190 disconnect_client(self, fds)
191
192 if not self.timeout == -1.0 and not self.haveui and self.lastui and self.timeout and \
193 (self.lastui + self.timeout) < time.time():
194 print("Server timeout, exiting.")
195 self.quit = True
196
197 if self.command_channel in ready:
198 try:
199 command = self.command_channel.get()
200 except EOFError:
201 # Client connection shutting down
202 ready = []
203 disconnect_client(self, fds)
204 continue
205 if command[0] == "terminateServer":
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500206 self.quit = True
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500207 continue
208 try:
209 print("Running command %s" % command)
210 self.command_channel_reply.send(self.cooker.command.runCommand(command))
211 except Exception as e:
212 logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e)))
213
214 if self.xmlrpc in ready:
215 self.xmlrpc.handle_requests()
216
217 ready = self.idle_commands(.1, fds)
218
219 print("Exiting")
220 # Remove the socket file so we don't get any more connections to avoid races
221 os.unlink(self.sockname)
222 self.sock.close()
223
224 try:
225 self.cooker.shutdown(True)
Brad Bishop316dfdd2018-06-25 12:45:53 -0400226 self.cooker.notifier.stop()
227 self.cooker.confignotifier.stop()
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500228 except:
229 pass
230
231 self.cooker.post_serve()
232
233 # Finally release the lockfile but warn about other processes holding it open
234 lock = self.bitbake_lock
235 lockfile = lock.name
236 lock.close()
237 lock = None
238
239 while not lock:
240 with bb.utils.timeout(3):
241 lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=True)
242 if not lock:
243 # Some systems may not have lsof available
244 procs = None
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500245 try:
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500246 procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
247 except OSError as e:
248 if e.errno != errno.ENOENT:
249 raise
250 if procs is None:
251 # Fall back to fuser if lsof is unavailable
252 try:
253 procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
254 except OSError as e:
255 if e.errno != errno.ENOENT:
256 raise
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500257
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500258 msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock"
259 if procs:
260 msg += ":\n%s" % str(procs)
261 print(msg)
262 return
263 # We hold the lock so we can remove the file (hide stale pid data)
264 bb.utils.remove(lockfile)
265 bb.utils.unlockfile(lock)
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500266
267 def idle_commands(self, delay, fds=None):
268 nextsleep = delay
269 if not fds:
270 fds = []
271
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600272 for function, data in list(self._idlefuns.items()):
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500273 try:
274 retval = function(self, data, False)
275 if retval is False:
276 del self._idlefuns[function]
277 nextsleep = None
278 elif retval is True:
279 nextsleep = None
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600280 elif isinstance(retval, float) and nextsleep:
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500281 if (retval < nextsleep):
282 nextsleep = retval
283 elif nextsleep is None:
284 continue
285 else:
286 fds = fds + retval
287 except SystemExit:
288 raise
289 except Exception as exc:
290 if not isinstance(exc, bb.BBHandledException):
291 logger.exception('Running idle function')
292 del self._idlefuns[function]
293 self.quit = True
294
Brad Bishop6e60e8b2018-02-01 10:27:11 -0500295 # Create new heartbeat event?
296 now = time.time()
297 if now >= self.next_heartbeat:
298 # We might have missed heartbeats. Just trigger once in
299 # that case and continue after the usual delay.
300 self.next_heartbeat += self.heartbeat_seconds
301 if self.next_heartbeat <= now:
302 self.next_heartbeat = now + self.heartbeat_seconds
303 heartbeat = bb.event.HeartbeatEvent(now)
304 bb.event.fire(heartbeat, self.cooker.data)
305 if nextsleep and now + nextsleep > self.next_heartbeat:
306 # Shorten timeout so that we we wake up in time for
307 # the heartbeat.
308 nextsleep = self.next_heartbeat - now
309
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500310 if nextsleep is not None:
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500311 if self.xmlrpc:
312 nextsleep = self.xmlrpc.get_timeout(nextsleep)
313 try:
314 return select.select(fds,[],[],nextsleep)[0]
315 except InterruptedError:
316 # Ignore EINTR
317 return []
318 else:
319 return select.select(fds,[],[],0)[0]
320
321
322class ServerCommunicator():
323 def __init__(self, connection, recv):
324 self.connection = connection
325 self.recv = recv
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500326
327 def runCommand(self, command):
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500328 self.connection.send(command)
329 if not self.recv.poll(30):
330 raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server")
331 return self.recv.get()
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500332
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500333 def updateFeatureSet(self, featureset):
334 _, error = self.runCommand(["setFeatures", featureset])
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500335 if error:
336 logger.error("Unable to set the cooker to the correct featureset: %s" % error)
337 raise BaseException(error)
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500338
339 def getEventHandle(self):
340 handle, error = self.runCommand(["getUIHandlerNum"])
341 if error:
342 logger.error("Unable to get UI Handler Number: %s" % error)
343 raise BaseException(error)
344
345 return handle
346
347 def terminateServer(self):
348 self.connection.send(['terminateServer'])
349 return
350
351class BitBakeProcessServerConnection(object):
352 def __init__(self, ui_channel, recv, eq, sock):
353 self.connection = ServerCommunicator(ui_channel, recv)
354 self.events = eq
355 # Save sock so it doesn't get gc'd for the life of our connection
356 self.socket_connection = sock
357
358 def terminate(self):
359 self.socket_connection.close()
360 self.connection.connection.close()
361 self.connection.recv.close()
362 return
363
364class BitBakeServer(object):
365 start_log_format = '--- Starting bitbake server pid %s at %s ---'
366 start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
367
368 def __init__(self, lock, sockname, configuration, featureset):
369
370 self.configuration = configuration
371 self.featureset = featureset
372 self.sockname = sockname
373 self.bitbake_lock = lock
374 self.readypipe, self.readypipein = os.pipe()
375
376 # Create server control socket
377 if os.path.exists(sockname):
378 os.unlink(sockname)
379
380 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
381 # AF_UNIX has path length issues so chdir here to workaround
382 cwd = os.getcwd()
383 logfile = os.path.join(cwd, "bitbake-cookerdaemon.log")
384
385 try:
386 os.chdir(os.path.dirname(sockname))
387 self.sock.bind(os.path.basename(sockname))
388 finally:
389 os.chdir(cwd)
390 self.sock.listen(1)
391
392 os.set_inheritable(self.sock.fileno(), True)
393 startdatetime = datetime.datetime.now()
394 bb.daemonize.createDaemon(self._startServer, logfile)
395 self.sock.close()
396 self.bitbake_lock.close()
397
398 ready = ConnectionReader(self.readypipe)
399 r = ready.poll(30)
400 if r:
401 r = ready.get()
402 if not r or r != "ready":
403 ready.close()
404 bb.error("Unable to start bitbake server")
405 if os.path.exists(logfile):
406 logstart_re = re.compile(self.start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
407 started = False
408 lines = []
409 with open(logfile, "r") as f:
410 for line in f:
411 if started:
412 lines.append(line)
413 else:
414 res = logstart_re.match(line.rstrip())
415 if res:
416 ldatetime = datetime.datetime.strptime(res.group(2), self.start_log_datetime_format)
417 if ldatetime >= startdatetime:
418 started = True
419 lines.append(line)
420 if lines:
421 if len(lines) > 10:
422 bb.error("Last 10 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-10:])))
423 else:
424 bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines)))
425 raise SystemExit(1)
426 ready.close()
427 os.close(self.readypipein)
428
429 def _startServer(self):
430 print(self.start_log_format % (os.getpid(), datetime.datetime.now().strftime(self.start_log_datetime_format)))
431 server = ProcessServer(self.bitbake_lock, self.sock, self.sockname)
432 self.configuration.setServerRegIdleCallback(server.register_idle_function)
433 writer = ConnectionWriter(self.readypipein)
434 try:
435 self.cooker = bb.cooker.BBCooker(self.configuration, self.featureset)
436 writer.send("ready")
437 except:
438 writer.send("fail")
439 raise
440 finally:
441 os.close(self.readypipein)
442 server.cooker = self.cooker
443 server.server_timeout = self.configuration.server_timeout
444 server.xmlrpcinterface = self.configuration.xmlrpcinterface
445 print("Started bitbake server pid %d" % os.getpid())
446 server.start()
447
448def connectProcessServer(sockname, featureset):
449 # Connect to socket
450 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
451 # AF_UNIX has path length issues so chdir here to workaround
452 cwd = os.getcwd()
453
454 try:
455 os.chdir(os.path.dirname(sockname))
456 sock.connect(os.path.basename(sockname))
457 finally:
458 os.chdir(cwd)
459
460 readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None
461 eq = command_chan_recv = command_chan = None
462
463 try:
464
465 # Send an fd for the remote to write events to
466 readfd, writefd = os.pipe()
467 eq = BBUIEventQueue(readfd)
468 # Send an fd for the remote to recieve commands from
469 readfd1, writefd1 = os.pipe()
470 command_chan = ConnectionWriter(writefd1)
471 # Send an fd for the remote to write commands results to
472 readfd2, writefd2 = os.pipe()
473 command_chan_recv = ConnectionReader(readfd2)
474
475 sendfds(sock, [writefd, readfd1, writefd2])
476
477 server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)
478
479 # Close the ends of the pipes we won't use
480 for i in [writefd, readfd1, writefd2]:
481 os.close(i)
482
483 server_connection.connection.updateFeatureSet(featureset)
484
485 except (Exception, SystemExit) as e:
486 if command_chan_recv:
487 command_chan_recv.close()
488 if command_chan:
489 command_chan.close()
490 for i in [writefd, readfd1, writefd2]:
491 try:
492 os.close(i)
493 except OSError:
494 pass
495 sock.close()
496 raise
497
498 return server_connection
499
500def sendfds(sock, fds):
501 '''Send an array of fds over an AF_UNIX socket.'''
502 fds = array.array('i', fds)
503 msg = bytes([len(fds) % 256])
504 sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
505
506def recvfds(sock, size):
507 '''Receive an array of fds over an AF_UNIX socket.'''
508 a = array.array('i')
509 bytes_size = a.itemsize * size
510 msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
511 if not msg and not ancdata:
512 raise EOFError
513 try:
514 if len(ancdata) != 1:
515 raise RuntimeError('received %d items of ancdata' %
516 len(ancdata))
517 cmsg_level, cmsg_type, cmsg_data = ancdata[0]
518 if (cmsg_level == socket.SOL_SOCKET and
519 cmsg_type == socket.SCM_RIGHTS):
520 if len(cmsg_data) % a.itemsize != 0:
521 raise ValueError
522 a.frombytes(cmsg_data)
523 assert len(a) % 256 == msg[0]
524 return list(a)
525 except (ValueError, IndexError):
526 pass
527 raise RuntimeError('Invalid data received')
528
529class BBUIEventQueue:
530 def __init__(self, readfd):
531
532 self.eventQueue = []
533 self.eventQueueLock = threading.Lock()
534 self.eventQueueNotify = threading.Event()
535
536 self.reader = ConnectionReader(readfd)
537
538 self.t = threading.Thread()
539 self.t.setDaemon(True)
540 self.t.run = self.startCallbackHandler
541 self.t.start()
542
543 def getEvent(self):
544 self.eventQueueLock.acquire()
545
546 if len(self.eventQueue) == 0:
547 self.eventQueueLock.release()
548 return None
549
550 item = self.eventQueue.pop(0)
551
552 if len(self.eventQueue) == 0:
553 self.eventQueueNotify.clear()
554
555 self.eventQueueLock.release()
556 return item
557
558 def waitEvent(self, delay):
559 self.eventQueueNotify.wait(delay)
560 return self.getEvent()
561
562 def queue_event(self, event):
563 self.eventQueueLock.acquire()
564 self.eventQueue.append(event)
565 self.eventQueueNotify.set()
566 self.eventQueueLock.release()
567
568 def send_event(self, event):
569 self.queue_event(pickle.loads(event))
570
571 def startCallbackHandler(self):
572 bb.utils.set_process_name("UIEventQueue")
573 while True:
574 try:
575 self.reader.wait()
576 event = self.reader.get()
577 self.queue_event(event)
578 except EOFError:
579 # Easiest way to exit is to close the file descriptor to cause an exit
580 break
581 self.reader.close()
582
583class ConnectionReader(object):
584
585 def __init__(self, fd):
586 self.reader = multiprocessing.connection.Connection(fd, writable=False)
587 self.rlock = multiprocessing.Lock()
588
589 def wait(self, timeout=None):
590 return multiprocessing.connection.wait([self.reader], timeout)
591
592 def poll(self, timeout=None):
593 return self.reader.poll(timeout)
594
595 def get(self):
596 with self.rlock:
597 res = self.reader.recv_bytes()
598 return multiprocessing.reduction.ForkingPickler.loads(res)
599
600 def fileno(self):
601 return self.reader.fileno()
602
603 def close(self):
604 return self.reader.close()
605
606
607class ConnectionWriter(object):
608
609 def __init__(self, fd):
610 self.writer = multiprocessing.connection.Connection(fd, readable=False)
611 self.wlock = multiprocessing.Lock()
612 # Why bb.event needs this I have no idea
613 self.event = self
614
615 def send(self, obj):
616 obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
617 with self.wlock:
618 self.writer.send_bytes(obj)
619
620 def fileno(self):
621 return self.writer.fileno()
622
623 def close(self):
624 return self.writer.close()