blob: eafc3aab7bbbace17c1e963dd8606388c565a787 [file] [log] [blame]
Patrick Williamsc124f4f2015-09-15 14:41:29 -05001import os,sys,logging
2import signal, time
3from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
4import threading
5import Queue
Patrick Williamsf1e5d692016-03-30 15:21:19 -05006import socket
Patrick Williamsc124f4f2015-09-15 14:41:29 -05007
8try:
9 import sqlite3
10except ImportError:
11 from pysqlite2 import dbapi2 as sqlite3
12
13import bb.server.xmlrpc
14import prserv
15import prserv.db
16import errno
17
# Logger shared by every function and class in this module.
logger = logging.getLogger("BitBake.PRserv")

# Bail out early on interpreters older than the 2.6.0 final release.
if not sys.hexversion >= 0x020600F0:
    print("Sorry, python 2.6 or later is required.")
    sys.exit(1)
23
class Handler(SimpleXMLRPCRequestHandler):
    """XML-RPC request handler that prints a traceback when a call fails."""

    def _dispatch(self, method, params):
        """Invoke the function registered on the server under `method`.

        `params` is unpacked as the positional arguments of the call.  Any
        exception (including a failed lookup of `method`) is printed with
        its traceback and then re-raised unchanged.
        """
        try:
            target = self.server.funcs[method]
            result = target(*params)
        except:
            # Print the failure on the server side, then let it propagate.
            import traceback
            traceback.print_exc()
            raise
        else:
            return result
33
# Template for the daemon's pidfile path; filled with (host/ip, port).
PIDPREFIX = "/tmp/PRServer_%s_%s.pid"

# Set by auto_start() when a local server is spawned, cleared by auto_shutdown().
singleton = None
36
37
class PRServer(SimpleXMLRPCServer):
    """XML-RPC server exposing the PR table stored in a prserv.db database.

    The main thread accepts connections (work_forever) and pushes them on a
    queue; a single handler thread (process_request_thread) serves them and
    periodically syncs the table.  The server runs in a forked child, either
    fully daemonized or as a plain child of the caller, see start().
    """

    def __init__(self, dbfile, logfile, interface, daemon=True):
        """Bind the server socket and open the database.

        dbfile    -- path handed to prserv.db.PRData
        logfile   -- file that receives stdout/stderr of the server process
        interface -- (host, port) tuple to bind to
        daemon    -- True: start() double-forks; False: start() forks once

        Raises PRServiceConfigError when the socket cannot be bound.
        """
        try:
            SimpleXMLRPCServer.__init__(self, interface,
                                        logRequests=False, allow_none=True)
        except socket.error:
            ip = socket.gethostbyname(interface[0])
            port = interface[1]
            msg = "PR Server unable to bind to %s:%s\n" % (ip, port)
            sys.stderr.write(msg)
            raise PRServiceConfigError

        self.dbfile = dbfile
        self.daemon = daemon
        self.logfile = logfile
        self.working_thread = None
        self.host, self.port = self.socket.getsockname()
        self.pidfile = PIDPREFIX % (self.host, self.port)
        # Shutdown request flag.  Kept under its own name so that it never
        # replaces the quit() method on the instance.
        self.quitflag = False

        self.register_function(self.getPR, "getPR")
        self.register_function(self.quit, "quit")
        self.register_function(self.ping, "ping")
        self.register_function(self.export, "export")
        self.register_function(self.importone, "importone")
        self.register_introspection_functions()

        self.db = prserv.db.PRData(self.dbfile)
        self.table = self.db["PRMAIN"]

        self.requestqueue = Queue.Queue()
        self.handlerthread = threading.Thread(target=self.process_request_thread)
        self.handlerthread.daemon = False

    def process_request_thread(self):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        Runs until the quit flag is set, then flushes the table.
        """
        iter_count = 1
        # 60 iterations between syncs or sync if dirty every ~30 seconds
        iterations_between_sync = 60

        while not self.quitflag:
            try:
                (request, client_address) = self.requestqueue.get(True, 30)
            except Queue.Empty:
                # Idle for 30 seconds: use the pause to flush pending changes.
                self.table.sync_if_dirty()
                continue
            try:
                self.finish_request(request, client_address)
                self.shutdown_request(request)
                iter_count = (iter_count + 1) % iterations_between_sync
                if iter_count == 0:
                    self.table.sync_if_dirty()
            except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
        # Final flush once the loop has been asked to stop.
        self.table.sync()
        self.table.sync_if_dirty()

    def sigint_handler(self, signum, stack):
        """SIGINT: flush the table, keep running."""
        self.table.sync()

    def sigterm_handler(self, signum, stack):
        """SIGTERM: flush the table and exit the process via SystemExit."""
        self.table.sync()
        # Let the (non-daemon) handler thread leave its loop as well,
        # otherwise interpreter shutdown would wait on it forever.
        self.quitflag = True
        raise SystemExit

    def process_request(self, request, client_address):
        """Queue an accepted request for the handler thread."""
        self.requestqueue.put((request, client_address))

    def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
        """RPC: forward to table.export(); returns None on a sqlite error."""
        try:
            return self.table.export(version, pkgarch, checksum, colinfo)
        except sqlite3.Error as exc:
            logger.error(str(exc))
            return None

    def importone(self, version, pkgarch, checksum, value):
        """RPC: forward to table.importone() and return its result."""
        return self.table.importone(version, pkgarch, checksum, value)

    def ping(self):
        """RPC: return True while no shutdown has been requested."""
        return not self.quitflag

    def getinfo(self):
        """Return the (host, port) the server socket is bound to."""
        return (self.host, self.port)

    def getPR(self, version, pkgarch, checksum):
        """RPC: forward to table.getValue().

        Returns None (after logging) when the value is not found or a
        sqlite error occurs.
        """
        try:
            return self.table.getValue(version, pkgarch, checksum)
        except prserv.NotFoundError:
            logger.error("can not find value for (%s, %s)", version, checksum)
            return None
        except sqlite3.Error as exc:
            logger.error(str(exc))
            return None

    def quit(self):
        """RPC: ask both server loops to stop."""
        self.quitflag = True
        return

    def work_forever(self,):
        """Serve requests until quit is requested, then shut down cleanly."""
        self.quitflag = False
        # handle_request() gives up after this many seconds so the loop
        # below re-checks the quit flag regularly.
        self.timeout = 0.5

        logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s",
                    self.dbfile, self.host, self.port, str(os.getpid()))

        self.handlerthread.start()
        while not self.quitflag:
            self.handle_request()
        self.handlerthread.join()
        self.db.disconnect()
        logger.info("PRServer: stopping...")
        self.server_close()
        return

    def start(self):
        """Launch the server process (daemonized or plain fork) and log it."""
        if self.daemon:
            pid = self.daemonize()
        else:
            pid = self.fork()

        # Ensure both the parent sees this and the child from the work_forever log entry above
        logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s",
                    self.dbfile, self.host, self.port, str(pid))

    def delpid(self):
        """Remove the pidfile; a file that is already gone is not an error."""
        try:
            os.remove(self.pidfile)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

    def daemonize(self):
        """
        See Advanced Programming in the UNIX, Sec 13.3

        The calling process gets the pid of the first child back; the
        grandchild runs the server and never returns from here.
        """
        try:
            pid = os.fork()
            if pid > 0:
                os.waitpid(pid, 0)
                # parent return instead of exit to give control
                return pid
        except OSError as e:
            raise Exception("%s [%d]" % (e.strerror, e.errno))

        os.setsid()
        # fork again to make sure the daemon is not session leader,
        # which prevents it from acquiring controlling terminal
        try:
            pid = os.fork()
            if pid > 0:  # parent
                os._exit(0)
        except OSError as e:
            raise Exception("%s [%d]" % (e.strerror, e.errno))

        self.cleanup_handles()
        os._exit(0)

    def fork(self):
        """Fork once; the parent gets the child pid, the child runs the server."""
        try:
            pid = os.fork()
            if pid > 0:
                return pid
        except OSError as e:
            raise Exception("%s [%d]" % (e.strerror, e.errno))

        bb.utils.signal_on_parent_exit("SIGTERM")
        self.cleanup_handles()
        os._exit(0)

    def cleanup_handles(self):
        """Prepare the child process, run the server, then remove the pidfile.

        Installs the signal handlers, redirects stdin to /dev/null and
        stdout/stderr to the logfile, resets logging, writes the pidfile and
        enters work_forever().
        """
        signal.signal(signal.SIGINT, self.sigint_handler)
        signal.signal(signal.SIGTERM, self.sigterm_handler)
        os.umask(0)
        os.chdir("/")

        sys.stdout.flush()
        sys.stderr.flush()
        si = open('/dev/null', 'r')
        so = open(self.logfile, 'a+')
        se = so
        os.dup2(si.fileno(), sys.stdin.fileno())
        os.dup2(so.fileno(), sys.stdout.fileno())
        os.dup2(se.fileno(), sys.stderr.fileno())

        # Clear out all log handlers prior to the fork() to avoid calling
        # event handlers not part of the PRserver
        for logger_iter in list(logging.Logger.manager.loggerDict.keys()):
            logging.getLogger(logger_iter).handlers = []

        # Ensure logging makes it to the logfile
        streamhandler = logging.StreamHandler()
        streamhandler.setLevel(logging.DEBUG)
        formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
        streamhandler.setFormatter(formatter)
        logger.addHandler(streamhandler)

        # write pidfile
        with open(self.pidfile, 'w') as pf:
            pf.write("%s\n" % os.getpid())

        # Remove the pidfile however the server loop ends (normal quit,
        # SystemExit from SIGTERM, or an unexpected error).
        try:
            self.work_forever()
        finally:
            self.delpid()
244
class PRServSingleton(object):
    """Wrapper that starts one non-daemon PRServer and remembers its address."""

    def __init__(self, dbfile, logfile, interface):
        """Store the server parameters; nothing is started yet."""
        self.dbfile, self.logfile, self.interface = dbfile, logfile, interface
        # Unknown until start() has run.
        self.host = self.port = None

    def start(self):
        """Create and start the PRServer, then record where it listens."""
        server = PRServer(self.dbfile, self.logfile, self.interface, daemon=False)
        self.prserv = server
        server.start()
        self.host, self.port = server.getinfo()

    def getinfo(self):
        """Return the (host, port) pair recorded by start()."""
        return (self.host, self.port)
260
class PRServerConnection(object):
    """Client-side proxy that forwards calls to a PR server over XML-RPC."""

    def __init__(self, host, port):
        """Connect to host:port.

        When (host, port) is the "localhost, no port" special case, the
        address of the module-level singleton is used instead.
        """
        if is_local_special(host, port):
            self.host, self.port = singleton.getinfo()
        else:
            self.host, self.port = host, port
        self.connection, self.transport = bb.server.xmlrpc._create_server(self.host, self.port)

    def terminate(self):
        """Ask the server to quit; failures are written to stderr only."""
        try:
            logger.info("Terminating PRServer...")
            self.connection.quit()
        except Exception as exc:
            message = "%s\n" % str(exc)
            sys.stderr.write(message)

    def getPR(self, version, pkgarch, checksum):
        """Forward getPR to the server and return its answer."""
        answer = self.connection.getPR(version, pkgarch, checksum)
        return answer

    def ping(self):
        """Forward ping to the server and return its answer."""
        answer = self.connection.ping()
        return answer

    def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
        """Forward export to the server and return its answer."""
        answer = self.connection.export(version, pkgarch, checksum, colinfo)
        return answer

    def importone(self, version, pkgarch, checksum, value):
        """Forward importone to the server and return its answer."""
        answer = self.connection.importone(version, pkgarch, checksum, value)
        return answer

    def getinfo(self):
        """Return the (host, port) this connection talks to."""
        return self.host, self.port
290
def start_daemon(dbfile, host, port, logfile):
    """Start a daemonized PRServer unless its pidfile says one is running.

    dbfile  -- database path (made absolute before use)
    host    -- host name, resolved to an IP for binding and the pidfile name
    port    -- port to bind to
    logfile -- log path (made absolute before use)

    Returns 1 when a pidfile holding a pid already exists, 0 otherwise.
    """
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    try:
        with open(pidfile, 'r') as pf:
            pid = int(pf.readline().strip())
    except (IOError, ValueError):
        # Missing, unreadable, empty or corrupt pidfile: no known daemon.
        pid = None

    if pid:
        sys.stderr.write("pidfile %s already exist. Daemon already running?\n"
                         % pidfile)
        return 1

    server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip, port))
    server.start()

    # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with
    # the one the server actually is listening, so at least warn the user about it
    _, rport = server.getinfo()
    if port != rport:
        sys.stdout.write("Server is listening at port %s instead of %s\n"
                         % (rport, port))
    return 0
316
def stop_daemon(host, port):
    """Stop the daemon whose pidfile matches host/port.

    The server is first asked to quit over XML-RPC; if the process is still
    alive after about 5 seconds it is sent SIGTERM.  The pidfile is removed
    afterwards if it still exists.

    Returns 1 when no usable pidfile is found, 0 otherwise.  An OSError
    other than "no such process" raised while waiting/killing/removing is
    propagated.
    """
    import glob
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    try:
        with open(pidfile, 'r') as pf:
            pid = int(pf.readline().strip())
    except (IOError, ValueError):
        # Missing, unreadable, empty or corrupt pidfile: nothing to stop.
        pid = None

    if not pid:
        # when server starts at port=0 (i.e. localhost:0), server actually takes another port,
        # so at least advise the user which ports the corresponding server is listening
        ports = []
        for candidate in glob.glob(PIDPREFIX % (ip, '*')):
            root, _ = os.path.splitext(os.path.basename(candidate))
            ports.append(root.split('_')[-1])
        portstr = ""
        if ports:
            portstr = "Wrong port? Other ports listening at %s: %s" % (host, ' '.join(ports))

        sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n"
                         % (pidfile, portstr))
        return 1

    try:
        PRServerConnection(ip, port).terminate()
    except Exception:
        # %s rather than %d: the port may arrive as a string.
        logger.critical("Stop PRService %s:%s failed", host, port)

    try:
        wait_timeout = 0
        print("Waiting for pr-server to exit.")
        # Poll every 0.1s, at most 50 times.
        while is_running(pid) and wait_timeout < 50:
            time.sleep(0.1)
            wait_timeout += 1

        if is_running(pid):
            print("Sending SIGTERM to pr-server.")
            os.kill(pid, signal.SIGTERM)
            time.sleep(0.1)

        if os.path.exists(pidfile):
            os.remove(pidfile)

    except OSError as e:
        # The process vanishing between the check and the kill is fine;
        # test the error code instead of the (locale dependent) message.
        if e.errno != errno.ESRCH:
            raise

    return 0
371
def is_running(pid):
    """Return False only when signalling `pid` fails with ESRCH.

    Signal 0 is sent, which performs the existence/permission check
    without delivering a signal.  Any other OSError still yields True.
    """
    try:
        os.kill(pid, 0)
    except OSError as probe_error:
        return probe_error.errno != errno.ESRCH
    return True
379
def is_local_special(host, port):
    """Return True for host "localhost" (any case, padded) with a falsy port."""
    names_localhost = host.strip().upper() == 'localhost'.upper()
    return names_localhost and (not port)
385
class PRServiceConfigError(Exception):
    """Raised in this module when the PR service cannot be set up or reached."""
388
def auto_start(d):
    """Make sure the PR service named by PRSERV_HOST is reachable.

    d -- datastore; read through d.getVar(name, True).

    Returns None when PRSERV_HOST is unset or empty, otherwise the string
    "<host>:<port>" of the server that answered a ping.  For the special
    value "localhost:0" a local server is started once and remembered in
    the module-level `singleton`.

    Raises PRServiceConfigError on a malformed PRSERV_HOST, a non-numeric
    port, a missing PERSISTENT_DIR/CACHE, or when the server cannot be
    reached.
    """
    global singleton

    # Build a real list: the result is tested for emptiness and measured.
    raw_value = d.getVar('PRSERV_HOST', True) or ''
    host_params = [part for part in raw_value.split(':') if part]
    if not host_params:
        return None

    if len(host_params) != 2:
        logger.critical('\n'.join(['PRSERV_HOST: incorrect format',
                                   'Usage: PRSERV_HOST = "<hostname>:<port>"']))
        raise PRServiceConfigError

    try:
        requested_port = int(host_params[1])
    except ValueError:
        # Report a bad port as a configuration error, not a raw ValueError.
        logger.critical('\n'.join(['PRSERV_HOST: port must be an integer',
                                   'Usage: PRSERV_HOST = "<hostname>:<port>"']))
        raise PRServiceConfigError

    if is_local_special(host_params[0], requested_port) and not singleton:
        import bb.utils
        cachedir = (d.getVar("PERSISTENT_DIR", True) or d.getVar("CACHE", True))
        if not cachedir:
            logger.critical("Please set the 'PERSISTENT_DIR' or 'CACHE' variable")
            raise PRServiceConfigError
        bb.utils.mkdirhier(cachedir)
        dbfile = os.path.join(cachedir, "prserv.sqlite3")
        logfile = os.path.join(cachedir, "prserv.log")
        singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost", 0))
        singleton.start()
    if singleton:
        host, port = singleton.getinfo()
    else:
        host = host_params[0]
        port = requested_port

    try:
        connection = PRServerConnection(host, port)
        connection.ping()
        realhost, realport = connection.getinfo()
        return str(realhost) + ":" + str(realport)

    except Exception:
        logger.critical("PRservice %s:%d not available", host, port)
        raise PRServiceConfigError
427
def auto_shutdown(d=None):
    """Terminate the auto-started server, if any, and forget it.

    `d` is accepted but not used.
    """
    global singleton
    if not singleton:
        return
    host, port = singleton.getinfo()
    try:
        PRServerConnection(host, port).terminate()
    except:
        logger.critical("Stop PRService %s:%d failed" % (host,port))
    singleton = None
437
def ping(host, port):
    """Open a connection to host:port and return the server's ping answer."""
    return PRServerConnection(host, port).ping()