blob: c3c1450a50b47460f57bb0066a557af6624f0b06 [file] [log] [blame]
Patrick Williamsc124f4f2015-09-15 14:41:29 -05001#
2# BitBake Process based server.
3#
4# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
5#
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License version 2 as
8# published by the Free Software Foundation.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License along
16# with this program; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19"""
20 This module implements a multiprocessing.Process based server for bitbake.
21"""
22
23import bb
24import bb.event
25import itertools
26import logging
27import multiprocessing
28import os
29import signal
30import sys
31import time
32import select
Patrick Williamsc0f7c042017-02-23 20:41:17 -060033from queue import Empty
Patrick Williamsc124f4f2015-09-15 14:41:29 -050034from multiprocessing import Event, Process, util, Queue, Pipe, queues, Manager
35
36from . import BitBakeBaseServer, BitBakeBaseServerConnection, BaseImplServer
37
38logger = logging.getLogger('BitBake')
39
class ServerCommunicator():
    """
    UI-side helper which sends commands to the server over a connection
    object and waits for the matching reply.
    """
    def __init__(self, connection, event_handle, server):
        self.connection = connection
        self.event_handle = event_handle
        self.server = server

    def runCommand(self, command):
        """
        Send command to the server and return whatever it answers.

        Raises SystemExit if the server is no longer alive once the command
        has been sent. Each round polls the connection up to four times
        (five seconds each), warning after every timeout, and then calls
        bb.fatal().
        """
        # @todo try/except
        channel = self.connection
        channel.send(command)

        if not self.server.is_alive():
            raise SystemExit

        while True:
            # don't let the user ctrl-c while we're waiting for a response
            try:
                attempts_left = 4
                while attempts_left:
                    if channel.poll(5):
                        return channel.recv()
                    bb.warn("Timeout while attempting to communicate with bitbake server")
                    attempts_left -= 1
                bb.fatal("Gave up; Too many tries: timeout while attempting to communicate with bitbake server")
            except KeyboardInterrupt:
                # Swallow the interrupt and start another round of polling.
                continue

    def getEventHandle(self):
        """Return the value currently stored in the event handle."""
        return self.event_handle.value
67
class EventAdapter():
    """
    Thin wrapper giving our event queue a send() method.

    The caller (bb.event) expects to call send(), whereas the underlying
    queue object only offers put().
    """
    def __init__(self, queue):
        self.queue = queue

    def send(self, event):
        """
        Put event on the wrapped queue.

        Any Exception raised by put() is reported on stdout and otherwise
        ignored, so send() itself does not propagate it.
        """
        target = self.queue
        try:
            target.put(event)
        except Exception as err:
            message = "EventAdapter puked: %s" % str(err)
            print(message)
81
82
class ProcessServer(Process, BaseImplServer):
    """
    BitBake server running as a separate multiprocessing.Process.

    Commands are received and answered over command_channel, events are
    pushed to event_queue, and a private pipe (quitin/quitout) is used to
    ask the server loop to shut down.
    """
    profile_filename = "profile.log"
    profile_processed_filename = "profile.log.processed"

    def __init__(self, command_channel, event_queue, featurelist):
        """
        Store the communication endpoints and set up the heartbeat defaults.

        command_channel -- connection used to receive commands and send replies
        event_queue     -- queue events are put() on; wrapped in an EventAdapter
        featurelist     -- stored as-is on the instance
        """
        BaseImplServer.__init__(self)
        Process.__init__(self)
        self.command_channel = command_channel
        self.event_queue = event_queue
        self.event = EventAdapter(event_queue)
        self.featurelist = featurelist
        self.quit = False
        self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
        self.next_heartbeat = time.time()

        # quitin is written to by stop(); quitout is polled by main().
        self.quitin, self.quitout = Pipe()
        self.event_handle = multiprocessing.Value("i")

    def run(self):
        """
        Process entry point.

        Forwards the events already queued in bb.event.ui_queue, registers
        this object as a UI handler, reads BB_HEARTBEAT_EVENT and then hands
        control to bb.cooker.server_main() with main() as the loop function.
        """
        # NOTE(review): self.cooker is not assigned in this class; presumably
        # it is attached before start() is called -- confirm against callers.
        for event in bb.event.ui_queue:
            self.event_queue.put(event)
        self.event_handle.value = bb.event.register_UIHhandler(self, True)

        heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
        if heartbeat_event:
            try:
                self.heartbeat_seconds = float(heartbeat_event)
            except (TypeError, ValueError):
                # Throwing an exception here causes bitbake to hang.
                # Just warn about the invalid setting and continue
                bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)
        bb.cooker.server_main(self.cooker, self.main)

    def main(self):
        """
        Server loop: execute incoming commands, honour quit requests and run
        the idle functions until self.quit is set, then close everything down.
        """
        # Ignore SIGINT within the server, as all SIGINT handling is done by
        # the UI and communicated to us
        self.quitin.close()
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        bb.utils.set_process_name("Cooker")
        while not self.quit:
            # Reset on every pass so the error handler below never sees an
            # unbound name (failure before any command arrived) nor a stale
            # command left over from an earlier iteration.
            command = None
            try:
                if self.command_channel.poll():
                    command = self.command_channel.recv()
                    self.runCommand(command)
                if self.quitout.poll():
                    self.quitout.recv()
                    self.quit = True
                    try:
                        self.runCommand(["stateForceShutdown"])
                    except:
                        # Deliberately best-effort: we are shutting down
                        # regardless of whether this command succeeds.
                        pass

                self.idle_commands(.1, [self.command_channel, self.quitout])
            except Exception:
                logger.exception('Running command %s', command)

        self.event_queue.close()
        bb.event.unregister_UIHhandler(self.event_handle.value)
        self.command_channel.close()
        self.cooker.shutdown(True)
        self.quitout.close()

    def idle_commands(self, delay, fds=None):
        """
        Run every registered idle function once, fire a heartbeat event when
        one is due, then wait in select().

        delay -- maximum time in seconds to wait in select()
        fds   -- objects select() should watch for readability (default: none)

        Each idle function is called as function(self, data, False) and its
        return value is interpreted as follows:
          False -- the function is removed and no sleep is done
          True  -- no sleep is done
          float -- the sleep is shortened to this value if it is smaller
          other -- concatenated onto fds, unless sleeping was already
                   cancelled (presumably a list of waitable objects --
                   confirm against the idle functions)
        An idle function raising an exception is removed and the server is
        asked to quit; SystemExit is propagated.
        """
        nextsleep = delay
        if not fds:
            fds = []

        # Iterate over a copy since entries may be deleted while looping.
        for function, data in list(self._idlefuns.items()):
            try:
                retval = function(self, data, False)
                if retval is False:
                    del self._idlefuns[function]
                    nextsleep = None
                elif retval is True:
                    nextsleep = None
                elif isinstance(retval, float):
                    # Compare against None rather than truthiness: with a
                    # sleep of 0 a float must not fall through to the
                    # "fds + retval" branch, which would raise TypeError and
                    # take the whole server down.
                    if nextsleep is not None and retval < nextsleep:
                        nextsleep = retval
                elif nextsleep is None:
                    continue
                else:
                    fds = fds + retval
            except SystemExit:
                raise
            except Exception as exc:
                if not isinstance(exc, bb.BBHandledException):
                    logger.exception('Running idle function')
                del self._idlefuns[function]
                self.quit = True

        # Create new heartbeat event?
        now = time.time()
        if now >= self.next_heartbeat:
            # We might have missed heartbeats. Just trigger once in
            # that case and continue after the usual delay.
            self.next_heartbeat += self.heartbeat_seconds
            if self.next_heartbeat <= now:
                self.next_heartbeat = now + self.heartbeat_seconds
            heartbeat = bb.event.HeartbeatEvent(now)
            bb.event.fire(heartbeat, self.cooker.data)
        if nextsleep and now + nextsleep > self.next_heartbeat:
            # Shorten timeout so that we wake up in time for
            # the heartbeat.
            nextsleep = self.next_heartbeat - now

        if nextsleep is not None:
            select.select(fds,[],[],nextsleep)

    def runCommand(self, command):
        """
        Run a cooker command on the server and send its result back over
        the command channel.
        """
        self.command_channel.send(self.cooker.command.runCommand(command))

    def stop(self):
        """
        Ask the server loop to quit by writing to the quit pipe, then close
        the writing end.
        """
        self.quitin.send("quit")
        self.quitin.close()
200
class BitBakeProcessServerConnection(BitBakeBaseServerConnection):
    """
    UI-side view of a running ProcessServer: bundles the command
    communicator, the event queue and the logic to shut the server down.
    """
    def __init__(self, serverImpl, ui_channel, event_queue):
        self.procserver = serverImpl
        self.ui_channel = ui_channel
        self.event_queue = event_queue
        self.connection = ServerCommunicator(ui_channel, serverImpl.event_handle, serverImpl)
        self.events = event_queue
        self.terminated = False

    def sigterm_terminate(self):
        """Report that the UI received SIGTERM, then terminate the server."""
        bb.error("UI received SIGTERM")
        self.terminate()

    def terminate(self):
        """
        Stop the server process and close the communication channels.

        Only the first call does any work. While waiting for the server to
        exit, queued events are drained and any log records among them are
        passed to the logger.
        """
        if self.terminated:
            return
        self.terminated = True

        def drain_pending():
            # Empty the queue without blocking; only log records are handled,
            # every other event is dropped.
            pending = self.event_queue
            while True:
                try:
                    item = pending.get(block=False)
                except (Empty, IOError):
                    return
                if isinstance(item, logging.LogRecord):
                    logger.handle(item)

        server = self.procserver
        server.stop()

        while server.is_alive():
            drain_pending()
            server.join(0.1)

        self.ui_channel.close()
        self.event_queue.close()
        self.event_queue.setexit()
        # XXX: Explicitly close the queue's _writer here to avoid leaking its
        # file descriptor, because Queue.close() does not do so.
        self.event_queue._writer.close()
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500239
240# Wrap Queue to provide API which isn't server implementation specific
class ProcessEventQueue(multiprocessing.queues.Queue):
    """
    multiprocessing queue extended with waitEvent()/getEvent() helpers.

    The owner is expected to assign a `server` attribute (an object with an
    is_alive() method) before either helper is used.
    """
    def __init__(self, maxsize):
        super().__init__(maxsize, ctx=multiprocessing.get_context())
        self.exit = False
        bb.utils.set_process_name("ProcessEQueue")

    def setexit(self):
        """Flag the queue as exiting."""
        self.exit = True

    def waitEvent(self, timeout):
        """
        Return the next event, waiting up to timeout seconds for one.

        Falls back to getEvent() once the queue is flagged as exiting or the
        server is no longer alive. Returns None if nothing arrived in time.
        """
        if self.exit or not self.server.is_alive():
            return self.getEvent()
        try:
            return self.get(block=True, timeout=timeout)
        except Empty:
            return None

    def getEvent(self):
        """
        Return the next event without blocking, or None if there is none.

        A server that is no longer alive flags the queue as exiting; an
        empty queue that is flagged as exiting ends the process through
        sys.exit(1).
        """
        if not self.server.is_alive():
            self.setexit()
        try:
            return self.get(block=False)
        except Empty:
            if self.exit:
                sys.exit(1)
            return None
269
class BitBakeServer(BitBakeBaseServer):
    """
    Sets up a ProcessServer together with the pipe and event queue used to
    talk to it, and hands out the UI-side connection object.
    """
    def initServer(self, single_use=True):
        """Create the communication channels and the server object."""
        # establish communication channels.  We use bidirectional pipes for
        # ui <--> server command/response pairs
        # and a queue for server -> ui event notifications
        #
        self.ui_channel, self.server_channel = Pipe()
        events = ProcessEventQueue(0)
        self.event_queue = events
        server = ProcessServer(self.server_channel, events, None)
        self.serverImpl = server
        # The queue consults the server's is_alive() when fetching events.
        events.server = server

    def detach(self):
        """Start the server process."""
        self.serverImpl.start()

    def establishConnection(self, featureset):
        """
        Build the UI connection, send the setFeatures command with
        featureset, and install a SIGTERM handler that terminates the
        connection.

        Returns the connection object. Raises BaseException carrying the
        server's error if the setFeatures command reports one.
        """
        self.connection = BitBakeProcessServerConnection(self.serverImpl, self.ui_channel, self.event_queue)

        reply = self.connection.connection.runCommand(["setFeatures", featureset])
        _, error = reply
        if error:
            logger.error("Unable to set the cooker to the correct featureset: %s" % error)
            raise BaseException(error)

        def on_sigterm(signum, frame):
            # Look self.connection up at signal time, not at install time.
            return self.connection.sigterm_terminate()

        signal.signal(signal.SIGTERM, on_sigterm)
        return self.connection