blob: 5fca3508b1055d61e14674ad80b2029ed60cd052 [file] [log] [blame]
Patrick Williamsc124f4f2015-09-15 14:41:29 -05001#
2# BitBake Process based server.
3#
4# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
5#
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License version 2 as
8# published by the Free Software Foundation.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License along
16# with this program; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19"""
20 This module implements a multiprocessing.Process based server for bitbake.
21"""
22
23import bb
24import bb.event
25import itertools
26import logging
27import multiprocessing
28import os
29import signal
30import sys
31import time
32import select
33from Queue import Empty
34from multiprocessing import Event, Process, util, Queue, Pipe, queues, Manager
35
36from . import BitBakeBaseServer, BitBakeBaseServerConnection, BaseImplServer
37
38logger = logging.getLogger('BitBake')
39
class ServerCommunicator():
    """
    Sends commands over a connection object and waits for the reply,
    checking that the associated server process is still alive.
    """
    def __init__(self, connection, event_handle, server):
        """
        Store the connection used for command/reply traffic, the shared
        event handle object and the server object whose liveness is checked.
        """
        self.connection = connection
        self.event_handle = event_handle
        self.server = server

    def runCommand(self, command):
        """
        Send command over the connection and return the reply that comes back.

        Raises SystemExit if the server reports it is not alive once the
        command has been sent. If no reply becomes available within a 20
        second poll, bb.fatal() is called.
        """
        # @todo try/except
        channel = self.connection
        channel.send(command)

        if not self.server.is_alive():
            raise SystemExit

        while True:
            # A KeyboardInterrupt (ctrl-c) raised while waiting is swallowed
            # and the wait simply starts over.
            try:
                if not channel.poll(20):
                    bb.fatal("Timeout while attempting to communicate with bitbake server")
                    continue
                return channel.recv()
            except KeyboardInterrupt:
                continue

    def getEventHandle(self):
        """
        Return the current value held by the event handle object.
        """
        handle = self.event_handle
        return handle.value
65
class EventAdapter():
    """
    Thin wrapper giving a queue a send() method. The caller (bb.event)
    wants to call send(), whereas the wrapped queue only offers put().
    """
    def __init__(self, queue):
        """
        Remember the queue that events are forwarded to.
        """
        self.queue = queue

    def send(self, event):
        """
        Put event on the wrapped queue.

        Any Exception raised while doing so is reported on stdout rather
        than propagated to the caller.
        """
        try:
            self.queue.put(event)
        except Exception as failure:
            message = "EventAdapter puked: %s" % str(failure)
            print(message)
79
80
class ProcessServer(Process, BaseImplServer):
    """
    BitBake server implemented as a multiprocessing.Process.

    Commands arrive over command_channel and their results are sent back
    over the same channel; events are pushed to event_queue. A private
    pipe (quitin/quitout) carries the shutdown request issued by stop().
    """
    profile_filename = "profile.log"
    profile_processed_filename = "profile.log.processed"

    def __init__(self, command_channel, event_queue, featurelist):
        """
        command_channel: connection used to receive commands and send replies
        event_queue: queue that events are put on
        featurelist: stored as self.featurelist (not used within this class)
        """
        BaseImplServer.__init__(self)
        Process.__init__(self)
        self.command_channel = command_channel
        self.event_queue = event_queue
        self.event = EventAdapter(event_queue)
        self.featurelist = featurelist
        self.quit = False

        # quitin is written by stop(); quitout is polled by main()
        self.quitin, self.quitout = Pipe()
        self.event_handle = multiprocessing.Value("i")

    def run(self):
        """
        Process entry point: forward the events already held in
        bb.event.ui_queue, register this object as a UI handler and hand
        control to bb.cooker.server_main() with main() as the callback.
        """
        for event in bb.event.ui_queue:
            self.event_queue.put(event)
        self.event_handle.value = bb.event.register_UIHhandler(self, True)

        # NOTE(review): self.cooker is not assigned in this class; presumably
        # it is attached before start() is called - confirm against callers.
        bb.cooker.server_main(self.cooker, self.main)

    def main(self):
        """
        Command loop: service commands and quit requests until self.quit is
        set, then close the channels and shut the cooker down.
        """
        # Ignore SIGINT within the server, as all SIGINT handling is done by
        # the UI and communicated to us
        self.quitin.close()
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        while not self.quit:
            # Reset on every pass so the error log below never references an
            # unbound name (failure before the first command was received)
            # nor a stale command from an earlier iteration.
            command = None
            try:
                if self.command_channel.poll():
                    command = self.command_channel.recv()
                    self.runCommand(command)
                if self.quitout.poll():
                    self.quitout.recv()
                    self.quit = True
                    # Best-effort shutdown request: deliberately swallow
                    # everything so a failure here cannot block the exit.
                    try:
                        self.runCommand(["stateForceShutdown"])
                    except:
                        pass

                self.idle_commands(.1, [self.command_channel, self.quitout])
            except Exception:
                logger.exception('Running command %s', command)

        self.event_queue.close()
        bb.event.unregister_UIHhandler(self.event_handle.value)
        self.command_channel.close()
        self.cooker.shutdown(True)
        self.quitout.close()

    def idle_commands(self, delay, fds=None):
        """
        Run every registered idle function once, then wait in select().

        delay: maximum time to wait in select() when no idle function asks
               to be called again immediately
        fds:   objects to wait on for readability (defaults to none)

        Idle function return values are interpreted as follows:
          False - the function is removed and no wait takes place
          True  - the function stays registered and no wait takes place
          float - upper bound for the wait
          other - added to fds (skipped if no wait will take place)

        An idle function that raises (other than SystemExit, which
        propagates) is removed and self.quit is set.
        """
        nextsleep = delay
        if not fds:
            fds = []

        # NOTE(review): self._idlefuns is not assigned in this class;
        # presumably a dict of function -> data provided by BaseImplServer.
        # Iterate over a snapshot because entries are deleted inside the loop.
        for function, data in list(self._idlefuns.items()):
            try:
                retval = function(self, data, False)
                if retval is False:
                    del self._idlefuns[function]
                    nextsleep = None
                elif retval is True:
                    nextsleep = None
                elif isinstance(retval, float):
                    # nextsleep is None once any function asked for an
                    # immediate re-run; never compare a float against None.
                    if nextsleep is not None and retval < nextsleep:
                        nextsleep = retval
                elif nextsleep is None:
                    continue
                else:
                    fds = fds + retval
            except SystemExit:
                raise
            except Exception as exc:
                if not isinstance(exc, bb.BBHandledException):
                    logger.exception('Running idle function')
                del self._idlefuns[function]
                self.quit = True

        if nextsleep is not None:
            select.select(fds,[],[],nextsleep)

    def runCommand(self, command):
        """
        Run a cooker command on the server and send its result back over
        the command channel.
        """
        self.command_channel.send(self.cooker.command.runCommand(command))

    def stop(self):
        """
        Ask the command loop to exit by writing to the quit pipe, then
        close the writing end.
        """
        self.quitin.send("quit")
        self.quitin.close()
172
class BitBakeProcessServerConnection(BitBakeBaseServerConnection):
    """
    UI-side connection to a process server: bundles the server object, the
    command channel and the event queue, and knows how to shut them down.
    """
    def __init__(self, serverImpl, ui_channel, event_queue):
        """
        serverImpl:  server object (must provide event_handle, stop(),
                     is_alive() and join())
        ui_channel:  channel handed to the ServerCommunicator
        event_queue: queue events are read from; also exposed as self.events
        """
        self.terminated = False
        self.procserver = serverImpl
        self.ui_channel = ui_channel
        self.event_queue = event_queue
        self.events = event_queue
        self.connection = ServerCommunicator(ui_channel, serverImpl.event_handle, serverImpl)

    def sigterm_terminate(self):
        """
        Report that SIGTERM was received, then terminate the connection.
        """
        bb.error("UI received SIGTERM")
        self.terminate()

    def terminate(self):
        """
        Stop the server, drain pending events until it has exited, then
        close the channel and the queue. Calls after the first do nothing.
        """
        if self.terminated:
            return
        self.terminated = True

        def drain_pending():
            # Empty the queue without blocking; only log records are
            # handled, every other event is discarded.
            while True:
                try:
                    pending = self.event_queue.get(block=False)
                except (Empty, IOError):
                    return
                if isinstance(pending, logging.LogRecord):
                    logger.handle(pending)

        signal.signal(signal.SIGINT, signal.SIG_IGN)
        self.procserver.stop()

        # Keep draining while waiting for the server process to go away
        while self.procserver.is_alive():
            drain_pending()
            self.procserver.join(0.1)

        self.ui_channel.close()
        self.event_queue.close()
        self.event_queue.setexit()
209
210# Wrap Queue to provide API which isn't server implementation specific
class ProcessEventQueue(multiprocessing.queues.Queue):
    """
    multiprocessing queue with waitEvent()/getEvent() accessors and an
    exit flag. The 'server' attribute consulted by the accessors is not
    set here; it has to be assigned on the instance by whoever creates it.
    """
    def __init__(self, maxsize):
        multiprocessing.queues.Queue.__init__(self, maxsize)
        self.exit = False

    def setexit(self):
        """
        Flag the queue so that the next waitEvent() call exits the process.
        """
        self.exit = True

    def waitEvent(self, timeout):
        """
        Return the next event, blocking for at most timeout.

        Exits with status 1 if the exit flag is set. Returns None when no
        event arrives in time or when the server is no longer alive (in
        which case the exit flag is set as well).
        """
        if self.exit:
            sys.exit(1)
        try:
            if self.server.is_alive():
                return self.get(True, timeout)
            self.setexit()
        except Empty:
            pass
        return None

    def getEvent(self):
        """
        Return the next event without blocking.

        Returns None when the queue is empty or when the server is no
        longer alive (in which case the exit flag is set as well).
        """
        try:
            if self.server.is_alive():
                return self.get(False)
            self.setexit()
        except Empty:
            pass
        return None
238
239
class BitBakeServer(BitBakeBaseServer):
    """
    Front end that wires a ProcessServer to its communication channels and
    hands out the connection object used by the UI.
    """
    def initServer(self):
        """
        Create the communication channels and the server object.

        A bidirectional pipe carries ui <--> server command/response pairs
        and a queue carries server -> ui event notifications.
        """
        ui_end, server_end = Pipe()
        self.ui_channel = ui_end
        self.server_channel = server_end
        self.event_queue = ProcessEventQueue(0)
        self.serverImpl = ProcessServer(self.server_channel, self.event_queue, None)
        # The queue consults the server's liveness when events are fetched
        self.event_queue.server = self.serverImpl

    def detach(self):
        """
        Start the server process.
        """
        self.serverImpl.start()

    def establishConnection(self, featureset):
        """
        Build the connection object, send the "setFeatures" command with
        featureset and install a SIGTERM handler that terminates the
        connection.

        Returns the connection. Logs and raises BaseException if the
        command reply carries an error.
        """
        self.connection = BitBakeProcessServerConnection(self.serverImpl, self.ui_channel, self.event_queue)

        _unused, error = self.connection.connection.runCommand(["setFeatures", featureset])
        if error:
            logger.error("Unable to set the cooker to the correct featureset: %s" % error)
            raise BaseException(error)

        def on_sigterm(signum, frame):
            self.connection.sigterm_terminate()

        signal.signal(signal.SIGTERM, on_sigterm)
        return self.connection