blob: 80a7875ad98053f8569fed94b60cbe6cad90c5c1 [file] [log] [blame]
Patrick Williamsc124f4f2015-09-15 14:41:29 -05001#
2# BitBake Process based server.
3#
4# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
5#
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License version 2 as
8# published by the Free Software Foundation.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License along
16# with this program; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19"""
20 This module implements a multiprocessing.Process based server for bitbake.
21"""
22
23import bb
24import bb.event
Patrick Williamsc124f4f2015-09-15 14:41:29 -050025import logging
26import multiprocessing
Brad Bishopd7bf8c12018-02-25 22:55:05 -050027import threading
28import array
Patrick Williamsc124f4f2015-09-15 14:41:29 -050029import os
Patrick Williamsc124f4f2015-09-15 14:41:29 -050030import sys
31import time
32import select
Brad Bishopd7bf8c12018-02-25 22:55:05 -050033import socket
34import subprocess
35import errno
36import re
37import datetime
38import bb.server.xmlrpcserver
39from bb import daemonize
40from multiprocessing import queues
Patrick Williamsc124f4f2015-09-15 14:41:29 -050041
42logger = logging.getLogger('BitBake')
43
Brad Bishopd7bf8c12018-02-25 22:55:05 -050044class ProcessTimeout(SystemExit):
45 pass
Patrick Williamsc124f4f2015-09-15 14:41:29 -050046
Brad Bishopd7bf8c12018-02-25 22:55:05 -050047class ProcessServer(multiprocessing.Process):
Patrick Williamsc124f4f2015-09-15 14:41:29 -050048 profile_filename = "profile.log"
49 profile_processed_filename = "profile.log.processed"
50
Brad Bishopd7bf8c12018-02-25 22:55:05 -050051 def __init__(self, lock, sock, sockname):
52 multiprocessing.Process.__init__(self)
53 self.command_channel = False
54 self.command_channel_reply = False
Patrick Williamsc124f4f2015-09-15 14:41:29 -050055 self.quit = False
Brad Bishop6e60e8b2018-02-01 10:27:11 -050056 self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
57 self.next_heartbeat = time.time()
Patrick Williamsc124f4f2015-09-15 14:41:29 -050058
Brad Bishopd7bf8c12018-02-25 22:55:05 -050059 self.event_handle = None
60 self.haveui = False
61 self.lastui = False
62 self.xmlrpc = False
63
64 self._idlefuns = {}
65
66 self.bitbake_lock = lock
67 self.sock = sock
68 self.sockname = sockname
69
70 def register_idle_function(self, function, data):
71 """Register a function to be called while the server is idle"""
72 assert hasattr(function, '__call__')
73 self._idlefuns[function] = data
Patrick Williamsc124f4f2015-09-15 14:41:29 -050074
75 def run(self):
Brad Bishopd7bf8c12018-02-25 22:55:05 -050076
77 if self.xmlrpcinterface[0]:
78 self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)
79
80 print("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))
Patrick Williamsc124f4f2015-09-15 14:41:29 -050081
Brad Bishop6e60e8b2018-02-01 10:27:11 -050082 heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
83 if heartbeat_event:
84 try:
85 self.heartbeat_seconds = float(heartbeat_event)
86 except:
Brad Bishop6e60e8b2018-02-01 10:27:11 -050087 bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)
Brad Bishopd7bf8c12018-02-25 22:55:05 -050088
89 self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
90 try:
91 if self.timeout:
92 self.timeout = float(self.timeout)
93 except:
94 bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
95
96
97 try:
98 self.bitbake_lock.seek(0)
99 self.bitbake_lock.truncate()
100 if self.xmlrpc:
101 self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
102 else:
103 self.bitbake_lock.write("%s\n" % (os.getpid()))
104 self.bitbake_lock.flush()
105 except Exception as e:
106 print("Error writing to lock file: %s" % str(e))
107 pass
108
109 if self.cooker.configuration.profile:
110 try:
111 import cProfile as profile
112 except:
113 import profile
114 prof = profile.Profile()
115
116 ret = profile.Profile.runcall(prof, self.main)
117
118 prof.dump_stats("profile.log")
119 bb.utils.process_profilelog("profile.log")
120 print("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
121
122 else:
123 ret = self.main()
124
125 return ret
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500126
127 def main(self):
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500128 self.cooker.pre_serve()
129
Patrick Williamsd8c66bc2016-06-20 12:57:21 -0500130 bb.utils.set_process_name("Cooker")
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500131
132 ready = []
Brad Bishopf058f492019-01-28 23:50:33 -0500133 newconnections = []
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500134
135 self.controllersock = False
136 fds = [self.sock]
137 if self.xmlrpc:
138 fds.append(self.xmlrpc)
139 print("Entering server connection loop")
140
141 def disconnect_client(self, fds):
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500142 print("Disconnecting Client")
Brad Bishopf058f492019-01-28 23:50:33 -0500143 if self.controllersock:
144 fds.remove(self.controllersock)
145 self.controllersock.close()
146 self.controllersock = False
147 if self.haveui:
148 fds.remove(self.command_channel)
149 bb.event.unregister_UIHhandler(self.event_handle, True)
150 self.command_channel_reply.writer.close()
151 self.event_writer.writer.close()
152 self.command_channel.close()
153 self.command_channel = False
154 del self.event_writer
155 self.lastui = time.time()
156 self.cooker.clientComplete()
157 self.haveui = False
158 ready = select.select(fds,[],[],0)[0]
159 if newconnections:
160 print("Starting new client")
161 conn = newconnections.pop(-1)
162 fds.append(conn)
163 self.controllersock = conn
164 elif self.timeout is None and not ready:
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500165 print("No timeout, exiting.")
166 self.quit = True
167
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500168 while not self.quit:
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500169 if self.sock in ready:
Brad Bishopf058f492019-01-28 23:50:33 -0500170 while select.select([self.sock],[],[],0)[0]:
171 controllersock, address = self.sock.accept()
172 if self.controllersock:
173 print("Queuing %s (%s)" % (str(ready), str(newconnections)))
174 newconnections.append(controllersock)
175 else:
176 print("Accepting %s (%s)" % (str(ready), str(newconnections)))
177 self.controllersock = controllersock
178 fds.append(controllersock)
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500179 if self.controllersock in ready:
180 try:
Brad Bishopf058f492019-01-28 23:50:33 -0500181 print("Processing Client")
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500182 ui_fds = recvfds(self.controllersock, 3)
Brad Bishopf058f492019-01-28 23:50:33 -0500183 print("Connecting Client")
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500184
185 # Where to write events to
186 writer = ConnectionWriter(ui_fds[0])
187 self.event_handle = bb.event.register_UIHhandler(writer, True)
188 self.event_writer = writer
189
190 # Where to read commands from
191 reader = ConnectionReader(ui_fds[1])
192 fds.append(reader)
193 self.command_channel = reader
194
195 # Where to send command return values to
196 writer = ConnectionWriter(ui_fds[2])
197 self.command_channel_reply = writer
198
199 self.haveui = True
200
201 except (EOFError, OSError):
202 disconnect_client(self, fds)
203
204 if not self.timeout == -1.0 and not self.haveui and self.lastui and self.timeout and \
205 (self.lastui + self.timeout) < time.time():
206 print("Server timeout, exiting.")
207 self.quit = True
208
209 if self.command_channel in ready:
210 try:
211 command = self.command_channel.get()
212 except EOFError:
213 # Client connection shutting down
214 ready = []
215 disconnect_client(self, fds)
216 continue
217 if command[0] == "terminateServer":
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500218 self.quit = True
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500219 continue
220 try:
221 print("Running command %s" % command)
222 self.command_channel_reply.send(self.cooker.command.runCommand(command))
223 except Exception as e:
224 logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e)))
225
226 if self.xmlrpc in ready:
227 self.xmlrpc.handle_requests()
228
229 ready = self.idle_commands(.1, fds)
230
231 print("Exiting")
232 # Remove the socket file so we don't get any more connections to avoid races
233 os.unlink(self.sockname)
234 self.sock.close()
235
236 try:
237 self.cooker.shutdown(True)
Brad Bishop316dfdd2018-06-25 12:45:53 -0400238 self.cooker.notifier.stop()
239 self.cooker.confignotifier.stop()
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500240 except:
241 pass
242
243 self.cooker.post_serve()
244
245 # Finally release the lockfile but warn about other processes holding it open
246 lock = self.bitbake_lock
247 lockfile = lock.name
248 lock.close()
249 lock = None
250
251 while not lock:
252 with bb.utils.timeout(3):
253 lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=True)
Brad Bishopa5c52ff2018-11-23 10:55:50 +1300254 if lock:
255 # We hold the lock so we can remove the file (hide stale pid data)
256 bb.utils.remove(lockfile)
257 bb.utils.unlockfile(lock)
258 return
259
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500260 if not lock:
261 # Some systems may not have lsof available
262 procs = None
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500263 try:
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500264 procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
265 except OSError as e:
266 if e.errno != errno.ENOENT:
267 raise
268 if procs is None:
269 # Fall back to fuser if lsof is unavailable
270 try:
271 procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
272 except OSError as e:
273 if e.errno != errno.ENOENT:
274 raise
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500275
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500276 msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock"
277 if procs:
278 msg += ":\n%s" % str(procs)
279 print(msg)
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500280
281 def idle_commands(self, delay, fds=None):
282 nextsleep = delay
283 if not fds:
284 fds = []
285
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600286 for function, data in list(self._idlefuns.items()):
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500287 try:
288 retval = function(self, data, False)
289 if retval is False:
290 del self._idlefuns[function]
291 nextsleep = None
292 elif retval is True:
293 nextsleep = None
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600294 elif isinstance(retval, float) and nextsleep:
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500295 if (retval < nextsleep):
296 nextsleep = retval
297 elif nextsleep is None:
298 continue
299 else:
300 fds = fds + retval
301 except SystemExit:
302 raise
303 except Exception as exc:
304 if not isinstance(exc, bb.BBHandledException):
305 logger.exception('Running idle function')
306 del self._idlefuns[function]
307 self.quit = True
308
Brad Bishop6e60e8b2018-02-01 10:27:11 -0500309 # Create new heartbeat event?
310 now = time.time()
311 if now >= self.next_heartbeat:
312 # We might have missed heartbeats. Just trigger once in
313 # that case and continue after the usual delay.
314 self.next_heartbeat += self.heartbeat_seconds
315 if self.next_heartbeat <= now:
316 self.next_heartbeat = now + self.heartbeat_seconds
317 heartbeat = bb.event.HeartbeatEvent(now)
318 bb.event.fire(heartbeat, self.cooker.data)
319 if nextsleep and now + nextsleep > self.next_heartbeat:
320 # Shorten timeout so that we we wake up in time for
321 # the heartbeat.
322 nextsleep = self.next_heartbeat - now
323
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500324 if nextsleep is not None:
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500325 if self.xmlrpc:
326 nextsleep = self.xmlrpc.get_timeout(nextsleep)
327 try:
328 return select.select(fds,[],[],nextsleep)[0]
329 except InterruptedError:
330 # Ignore EINTR
331 return []
332 else:
333 return select.select(fds,[],[],0)[0]
334
335
336class ServerCommunicator():
337 def __init__(self, connection, recv):
338 self.connection = connection
339 self.recv = recv
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500340
341 def runCommand(self, command):
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500342 self.connection.send(command)
343 if not self.recv.poll(30):
344 raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server")
345 return self.recv.get()
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500346
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500347 def updateFeatureSet(self, featureset):
348 _, error = self.runCommand(["setFeatures", featureset])
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500349 if error:
350 logger.error("Unable to set the cooker to the correct featureset: %s" % error)
351 raise BaseException(error)
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500352
353 def getEventHandle(self):
354 handle, error = self.runCommand(["getUIHandlerNum"])
355 if error:
356 logger.error("Unable to get UI Handler Number: %s" % error)
357 raise BaseException(error)
358
359 return handle
360
361 def terminateServer(self):
362 self.connection.send(['terminateServer'])
363 return
364
365class BitBakeProcessServerConnection(object):
366 def __init__(self, ui_channel, recv, eq, sock):
367 self.connection = ServerCommunicator(ui_channel, recv)
368 self.events = eq
369 # Save sock so it doesn't get gc'd for the life of our connection
370 self.socket_connection = sock
371
372 def terminate(self):
373 self.socket_connection.close()
374 self.connection.connection.close()
375 self.connection.recv.close()
376 return
377
378class BitBakeServer(object):
379 start_log_format = '--- Starting bitbake server pid %s at %s ---'
380 start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
381
382 def __init__(self, lock, sockname, configuration, featureset):
383
384 self.configuration = configuration
385 self.featureset = featureset
386 self.sockname = sockname
387 self.bitbake_lock = lock
388 self.readypipe, self.readypipein = os.pipe()
389
390 # Create server control socket
391 if os.path.exists(sockname):
392 os.unlink(sockname)
393
Brad Bishop1a4b7ee2018-12-16 17:11:34 -0800394 # Place the log in the builddirectory alongside the lock file
395 logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log")
396
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500397 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
398 # AF_UNIX has path length issues so chdir here to workaround
399 cwd = os.getcwd()
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500400 try:
401 os.chdir(os.path.dirname(sockname))
402 self.sock.bind(os.path.basename(sockname))
403 finally:
404 os.chdir(cwd)
405 self.sock.listen(1)
406
407 os.set_inheritable(self.sock.fileno(), True)
408 startdatetime = datetime.datetime.now()
409 bb.daemonize.createDaemon(self._startServer, logfile)
410 self.sock.close()
411 self.bitbake_lock.close()
Brad Bishop1a4b7ee2018-12-16 17:11:34 -0800412 os.close(self.readypipein)
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500413
414 ready = ConnectionReader(self.readypipe)
Brad Bishopf058f492019-01-28 23:50:33 -0500415 r = ready.poll(5)
416 if not r:
417 bb.note("Bitbake server didn't start within 5 seconds, waiting for 90")
418 r = ready.poll(90)
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500419 if r:
Brad Bishop1a4b7ee2018-12-16 17:11:34 -0800420 try:
421 r = ready.get()
422 except EOFError:
423 # Trap the child exitting/closing the pipe and error out
424 r = None
Brad Bishopf058f492019-01-28 23:50:33 -0500425 if not r or r[0] != "r":
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500426 ready.close()
Brad Bishopf058f492019-01-28 23:50:33 -0500427 bb.error("Unable to start bitbake server (%s)" % str(r))
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500428 if os.path.exists(logfile):
429 logstart_re = re.compile(self.start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
430 started = False
431 lines = []
Brad Bishopa5c52ff2018-11-23 10:55:50 +1300432 lastlines = []
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500433 with open(logfile, "r") as f:
434 for line in f:
435 if started:
436 lines.append(line)
437 else:
Brad Bishopa5c52ff2018-11-23 10:55:50 +1300438 lastlines.append(line)
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500439 res = logstart_re.match(line.rstrip())
440 if res:
441 ldatetime = datetime.datetime.strptime(res.group(2), self.start_log_datetime_format)
442 if ldatetime >= startdatetime:
443 started = True
444 lines.append(line)
Brad Bishopa5c52ff2018-11-23 10:55:50 +1300445 if len(lastlines) > 60:
446 lastlines = lastlines[-60:]
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500447 if lines:
Brad Bishopa5c52ff2018-11-23 10:55:50 +1300448 if len(lines) > 60:
449 bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:])))
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500450 else:
451 bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines)))
Brad Bishopa5c52ff2018-11-23 10:55:50 +1300452 elif lastlines:
453 bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines)))
454 else:
455 bb.error("%s doesn't exist" % logfile)
456
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500457 raise SystemExit(1)
Brad Bishopa5c52ff2018-11-23 10:55:50 +1300458
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500459 ready.close()
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500460
461 def _startServer(self):
462 print(self.start_log_format % (os.getpid(), datetime.datetime.now().strftime(self.start_log_datetime_format)))
Brad Bishope2d5b612018-11-23 10:55:50 +1300463 sys.stdout.flush()
464
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500465 server = ProcessServer(self.bitbake_lock, self.sock, self.sockname)
466 self.configuration.setServerRegIdleCallback(server.register_idle_function)
Brad Bishop1a4b7ee2018-12-16 17:11:34 -0800467 os.close(self.readypipe)
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500468 writer = ConnectionWriter(self.readypipein)
Brad Bishop1a4b7ee2018-12-16 17:11:34 -0800469 self.cooker = bb.cooker.BBCooker(self.configuration, self.featureset)
Brad Bishopf058f492019-01-28 23:50:33 -0500470 writer.send("r")
Brad Bishop1a4b7ee2018-12-16 17:11:34 -0800471 writer.close()
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500472 server.cooker = self.cooker
473 server.server_timeout = self.configuration.server_timeout
474 server.xmlrpcinterface = self.configuration.xmlrpcinterface
475 print("Started bitbake server pid %d" % os.getpid())
Brad Bishope2d5b612018-11-23 10:55:50 +1300476 sys.stdout.flush()
477
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500478 server.start()
479
480def connectProcessServer(sockname, featureset):
481 # Connect to socket
482 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
483 # AF_UNIX has path length issues so chdir here to workaround
484 cwd = os.getcwd()
485
Brad Bishopf058f492019-01-28 23:50:33 -0500486 readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None
487 eq = command_chan_recv = command_chan = None
488
489 sock.settimeout(10)
490
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500491 try:
Brad Bishope2d5b612018-11-23 10:55:50 +1300492 try:
493 os.chdir(os.path.dirname(sockname))
Brad Bishopf058f492019-01-28 23:50:33 -0500494 finished = False
495 while not finished:
496 try:
497 sock.connect(os.path.basename(sockname))
498 finished = True
499 except IOError as e:
500 if e.errno == errno.EWOULDBLOCK:
501 pass
Richard Purdie3da11142019-02-05 21:34:37 +0000502 raise
Brad Bishope2d5b612018-11-23 10:55:50 +1300503 finally:
504 os.chdir(cwd)
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500505
506 # Send an fd for the remote to write events to
507 readfd, writefd = os.pipe()
508 eq = BBUIEventQueue(readfd)
509 # Send an fd for the remote to recieve commands from
510 readfd1, writefd1 = os.pipe()
511 command_chan = ConnectionWriter(writefd1)
512 # Send an fd for the remote to write commands results to
513 readfd2, writefd2 = os.pipe()
514 command_chan_recv = ConnectionReader(readfd2)
515
516 sendfds(sock, [writefd, readfd1, writefd2])
517
518 server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)
519
520 # Close the ends of the pipes we won't use
521 for i in [writefd, readfd1, writefd2]:
522 os.close(i)
523
524 server_connection.connection.updateFeatureSet(featureset)
525
526 except (Exception, SystemExit) as e:
527 if command_chan_recv:
528 command_chan_recv.close()
529 if command_chan:
530 command_chan.close()
531 for i in [writefd, readfd1, writefd2]:
532 try:
Brad Bishope2d5b612018-11-23 10:55:50 +1300533 if i:
534 os.close(i)
Brad Bishopd7bf8c12018-02-25 22:55:05 -0500535 except OSError:
536 pass
537 sock.close()
538 raise
539
540 return server_connection
541
542def sendfds(sock, fds):
543 '''Send an array of fds over an AF_UNIX socket.'''
544 fds = array.array('i', fds)
545 msg = bytes([len(fds) % 256])
546 sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
547
548def recvfds(sock, size):
549 '''Receive an array of fds over an AF_UNIX socket.'''
550 a = array.array('i')
551 bytes_size = a.itemsize * size
552 msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
553 if not msg and not ancdata:
554 raise EOFError
555 try:
556 if len(ancdata) != 1:
557 raise RuntimeError('received %d items of ancdata' %
558 len(ancdata))
559 cmsg_level, cmsg_type, cmsg_data = ancdata[0]
560 if (cmsg_level == socket.SOL_SOCKET and
561 cmsg_type == socket.SCM_RIGHTS):
562 if len(cmsg_data) % a.itemsize != 0:
563 raise ValueError
564 a.frombytes(cmsg_data)
565 assert len(a) % 256 == msg[0]
566 return list(a)
567 except (ValueError, IndexError):
568 pass
569 raise RuntimeError('Invalid data received')
570
571class BBUIEventQueue:
572 def __init__(self, readfd):
573
574 self.eventQueue = []
575 self.eventQueueLock = threading.Lock()
576 self.eventQueueNotify = threading.Event()
577
578 self.reader = ConnectionReader(readfd)
579
580 self.t = threading.Thread()
581 self.t.setDaemon(True)
582 self.t.run = self.startCallbackHandler
583 self.t.start()
584
585 def getEvent(self):
586 self.eventQueueLock.acquire()
587
588 if len(self.eventQueue) == 0:
589 self.eventQueueLock.release()
590 return None
591
592 item = self.eventQueue.pop(0)
593
594 if len(self.eventQueue) == 0:
595 self.eventQueueNotify.clear()
596
597 self.eventQueueLock.release()
598 return item
599
600 def waitEvent(self, delay):
601 self.eventQueueNotify.wait(delay)
602 return self.getEvent()
603
604 def queue_event(self, event):
605 self.eventQueueLock.acquire()
606 self.eventQueue.append(event)
607 self.eventQueueNotify.set()
608 self.eventQueueLock.release()
609
610 def send_event(self, event):
611 self.queue_event(pickle.loads(event))
612
613 def startCallbackHandler(self):
614 bb.utils.set_process_name("UIEventQueue")
615 while True:
616 try:
617 self.reader.wait()
618 event = self.reader.get()
619 self.queue_event(event)
620 except EOFError:
621 # Easiest way to exit is to close the file descriptor to cause an exit
622 break
623 self.reader.close()
624
625class ConnectionReader(object):
626
627 def __init__(self, fd):
628 self.reader = multiprocessing.connection.Connection(fd, writable=False)
629 self.rlock = multiprocessing.Lock()
630
631 def wait(self, timeout=None):
632 return multiprocessing.connection.wait([self.reader], timeout)
633
634 def poll(self, timeout=None):
635 return self.reader.poll(timeout)
636
637 def get(self):
638 with self.rlock:
639 res = self.reader.recv_bytes()
640 return multiprocessing.reduction.ForkingPickler.loads(res)
641
642 def fileno(self):
643 return self.reader.fileno()
644
645 def close(self):
646 return self.reader.close()
647
648
649class ConnectionWriter(object):
650
651 def __init__(self, fd):
652 self.writer = multiprocessing.connection.Connection(fd, readable=False)
653 self.wlock = multiprocessing.Lock()
654 # Why bb.event needs this I have no idea
655 self.event = self
656
657 def send(self, obj):
658 obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
659 with self.wlock:
660 self.writer.send_bytes(obj)
661
662 def fileno(self):
663 return self.writer.fileno()
664
665 def close(self):
666 return self.writer.close()