# blob: affccd9b3b2733a17e9280324160ceb15d8ec1aa [file] [log] [blame]
import errno
import glob
import logging
import os
import Queue
import signal
import socket
import StringIO
import sys
import threading
import time
from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler

try:
    import sqlite3
except ImportError:
    from pysqlite2 import dbapi2 as sqlite3

import bb.server.xmlrpc
import prserv
import prserv.db
18
# Logger used by every function and class in this module.
logger = logging.getLogger("BitBake.PRserv")

# (2, 6, 0, 'final', 0) is the version_info spelling of hexversion 0x020600F0.
if sys.version_info < (2, 6, 0, 'final', 0):
    print("Sorry, python 2.6 or later is required.")
    sys.exit(1)
24
class Handler(SimpleXMLRPCRequestHandler):
    """Request handler that prints a traceback for any failing RPC call."""

    def _dispatch(self, method, params):
        """Call the function registered under *method* with *params*.

        Returns whatever the registered function returns.  Any exception
        (including a KeyError for an unregistered method name) is printed
        with traceback.print_exc() and then re-raised unchanged.
        """
        import traceback

        try:
            return self.server.funcs[method](*params)
        except:
            # Deliberately catch everything: the exception is only reported
            # here and always propagates to the caller.
            traceback.print_exc()
            raise
34
# Pidfile path template; the two %s slots are filled with (host-or-ip, port).
PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
# PRServSingleton created by auto_start() for the local server, None otherwise.
singleton = None
37
38
class PRServer(SimpleXMLRPCServer):
    """XML-RPC server that serves PR values out of a prserv database.

    The main loop in work_forever() accepts connections and queues them
    (see process_request); a single handler thread running
    process_request_thread() services the queue and syncs the database
    table.  start() forks the server into its own process.
    """

    # Both are assigned by work_forever(), i.e. only after all forks.  They
    # default to None at class level because the signal handlers are
    # installed earlier (in cleanup_handles) and test self.table.
    db = None
    table = None

    def __init__(self, dbfile, logfile, interface, daemon=True):
        """Bind the server socket and register the RPC methods.

        dbfile    -- path of the database, opened later by work_forever()
        logfile   -- file the forked server redirects stdout/stderr to
        interface -- (host, port) tuple to bind to
        daemon    -- if true start() daemonizes, otherwise it forks once

        Raises PRServiceConfigError if the socket cannot be bound.
        """
        try:
            SimpleXMLRPCServer.__init__(self, interface,
                                        logRequests=False, allow_none=True)
        except socket.error:
            ip = socket.gethostbyname(interface[0])
            port = interface[1]
            msg = "PR Server unable to bind to %s:%s\n" % (ip, port)
            sys.stderr.write(msg)
            raise PRServiceConfigError

        self.dbfile = dbfile
        self.daemon = daemon
        self.logfile = logfile
        self.working_thread = None
        # Ask the socket: the port differs from the request when 0 was given.
        self.host, self.port = self.socket.getsockname()
        self.pidfile = PIDPREFIX % (self.host, self.port)

        # NOTE: self.quit is still the bound method here; work_forever()
        # later shadows it with a boolean flag on the instance.
        self.register_function(self.getPR, "getPR")
        self.register_function(self.quit, "quit")
        self.register_function(self.ping, "ping")
        self.register_function(self.export, "export")
        self.register_function(self.dump_db, "dump_db")
        self.register_function(self.importone, "importone")
        self.register_introspection_functions()

        self.requestqueue = Queue.Queue()
        self.handlerthread = threading.Thread(target=self.process_request_thread)
        self.handlerthread.daemon = False

    def process_request_thread(self):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        iter_count = 1
        # 60 iterations between syncs or sync if dirty every ~30 seconds
        iterations_between_sync = 60

        bb.utils.set_process_name("PRServ Handler")

        while not self.quit:
            try:
                (request, client_address) = self.requestqueue.get(True, 30)
            except Queue.Empty:
                self.table.sync_if_dirty()
                continue
            try:
                self.finish_request(request, client_address)
                self.shutdown_request(request)
                iter_count = (iter_count + 1) % iterations_between_sync
                if iter_count == 0:
                    self.table.sync_if_dirty()
            except:
                # Best effort: report the failure, drop the request and
                # force a sync so nothing already written is lost.
                self.handle_error(request, client_address)
                self.shutdown_request(request)
                self.table.sync()
            self.table.sync_if_dirty()

    def sigint_handler(self, signum, stack):
        """SIGINT: sync the table if the database has been opened."""
        if self.table:
            self.table.sync()

    def sigterm_handler(self, signum, stack):
        """SIGTERM: sync the table if opened, then ask the loops to stop."""
        if self.table:
            self.table.sync()
        self.quit = True

    def process_request(self, request, client_address):
        """Queue the request for the handler thread instead of serving it."""
        self.requestqueue.put((request, client_address))

    def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
        """Return self.table.export(...); None (after logging) on sqlite3.Error."""
        try:
            return self.table.export(version, pkgarch, checksum, colinfo)
        except sqlite3.Error as exc:
            logger.error(str(exc))
            return None

    def dump_db(self):
        """
        Returns a script (string) that reconstructs the state of the
        entire database at the time this function is called. The script
        language is defined by the backing database engine, which is a
        function of server configuration.
        Returns None if the database engine does not support dumping to
        script or if some other error is encountered in processing.
        """
        buff = StringIO.StringIO()
        try:
            self.table.sync()
            self.table.dump_db(buff)
            return buff.getvalue()
        except Exception as exc:
            logger.error(str(exc))
            return None
        finally:
            buff.close()

    def importone(self, version, pkgarch, checksum, value):
        """Return self.table.importone(...); exceptions are not caught here."""
        return self.table.importone(version, pkgarch, checksum, value)

    def ping(self):
        """Return True while the server has not been asked to quit."""
        return not self.quit

    def getinfo(self):
        """Return the (host, port) the server socket is bound to."""
        return (self.host, self.port)

    def getPR(self, version, pkgarch, checksum):
        """Return self.table.getValue(...); None (after logging) on
        prserv.NotFoundError or sqlite3.Error."""
        try:
            return self.table.getValue(version, pkgarch, checksum)
        except prserv.NotFoundError:
            logger.error("can not find value for (%s, %s)", version, checksum)
            return None
        except sqlite3.Error as exc:
            logger.error(str(exc))
            return None

    def quit(self):
        """RPC entry point: replace this method by a true flag on the
        instance, which makes the loops in work_forever() and
        process_request_thread() terminate."""
        self.quit = True
        return

    def work_forever(self):
        """Open the database and serve requests until self.quit becomes true."""
        self.quit = False
        # handle_request() gives up after this many seconds, so the loop
        # below re-checks self.quit regularly.
        self.timeout = 0.5

        bb.utils.set_process_name("PRServ")

        # DB connection must be created after all forks
        self.db = prserv.db.PRData(self.dbfile)
        self.table = self.db["PRMAIN"]

        logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s",
                    self.dbfile, self.host, self.port, str(os.getpid()))

        self.handlerthread.start()
        while not self.quit:
            self.handle_request()
        self.handlerthread.join()
        self.db.disconnect()
        logger.info("PRServer: stopping...")
        self.server_close()
        return

    def start(self):
        """Launch the server process via daemonize() or fork() and log the
        pid that call returned to the parent."""
        if self.daemon:
            pid = self.daemonize()
        else:
            pid = self.fork()

        # Ensure both the parent sees this and the child from the work_forever log entry above
        logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s",
                    self.dbfile, self.host, self.port, str(pid))

    def delpid(self):
        """Remove the pidfile; a pidfile that is already gone is not an error."""
        try:
            os.remove(self.pidfile)
        except OSError as e:
            # stop_daemon() also removes the pidfile, so losing that race
            # must not turn a clean shutdown into a traceback.
            if e.errno != errno.ENOENT:
                raise

    def daemonize(self):
        """
        See Advanced Programming in the UNIX, Sec 13.3

        Returns, in the calling process only, the pid of the first child
        (which has already been waited for).  The grandchild runs the
        server and never returns from this method.
        """
        try:
            pid = os.fork()
            if pid > 0:
                os.waitpid(pid, 0)
                # parent return instead of exit to give control
                return pid
        except OSError as e:
            raise Exception("%s [%d]" % (e.strerror, e.errno))

        os.setsid()
        # fork again to make sure the daemon is not session leader,
        # which prevents it from acquiring controlling terminal
        try:
            pid = os.fork()
            if pid > 0:  # parent
                os._exit(0)
        except OSError as e:
            raise Exception("%s [%d]" % (e.strerror, e.errno))

        self.cleanup_handles()
        os._exit(0)

    def fork(self):
        """Fork once.  The parent gets the child's pid back; the child runs
        the server and exits without returning."""
        try:
            pid = os.fork()
            if pid > 0:
                return pid
        except OSError as e:
            raise Exception("%s [%d]" % (e.strerror, e.errno))

        bb.utils.signal_on_parent_exit("SIGTERM")
        self.cleanup_handles()
        os._exit(0)

    def cleanup_handles(self):
        """Set up the server process (signals, stdio, logging, pidfile) and
        run work_forever(); the pidfile is removed when it finishes."""
        signal.signal(signal.SIGINT, self.sigint_handler)
        signal.signal(signal.SIGTERM, self.sigterm_handler)
        os.chdir("/")

        sys.stdout.flush()
        sys.stderr.flush()
        # stdin <- /dev/null, stdout and stderr -> logfile
        si = open('/dev/null', 'r')
        so = open(self.logfile, 'a+')
        se = so
        os.dup2(si.fileno(), sys.stdin.fileno())
        os.dup2(so.fileno(), sys.stdout.fileno())
        os.dup2(se.fileno(), sys.stderr.fileno())

        # Clear out all log handlers prior to the fork() to avoid calling
        # event handlers not part of the PRserver
        for logger_iter in logging.Logger.manager.loggerDict.keys():
            logging.getLogger(logger_iter).handlers = []

        # Ensure logging makes it to the logfile
        streamhandler = logging.StreamHandler()
        streamhandler.setLevel(logging.DEBUG)
        formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
        streamhandler.setFormatter(formatter)
        logger.addHandler(streamhandler)

        # write pidfile
        with open(self.pidfile, 'w') as pf:
            pf.write("%s\n" % str(os.getpid()))

        try:
            self.work_forever()
        finally:
            # Never leave a stale pidfile behind, it would make
            # start_daemon() refuse to start a new server.
            self.delpid()
272
class PRServSingleton(object):
    """Holds the settings of the locally started PRServer and, once
    start() has run, the address it is listening on."""

    def __init__(self, dbfile, logfile, interface):
        """Remember the server settings; host and port stay None until start()."""
        self.dbfile, self.logfile, self.interface = dbfile, logfile, interface
        self.host = self.port = None

    def start(self):
        """Create a non-daemon PRServer, start it and record its address."""
        server = PRServer(self.dbfile, self.logfile, self.interface, daemon=False)
        self.prserv = server
        server.start()
        self.host, self.port = server.getinfo()

    def getinfo(self):
        """Return (host, port); both are None before start()."""
        return (self.host, self.port)
288
class PRServerConnection(object):
    """Client-side wrapper around the XML-RPC proxy of a PR server."""

    def __init__(self, host, port):
        """Connect to host:port, or to the module-level singleton when the
        pair is the special "localhost with no port" form."""
        if is_local_special(host, port):
            host, port = singleton.getinfo()
        self.host, self.port = host, port
        proxy = bb.server.xmlrpc._create_server(host, port)
        self.connection, self.transport = proxy

    def terminate(self):
        """Ask the server to quit; any failure is written to stderr only."""
        try:
            logger.info("Terminating PRServer...")
            self.connection.quit()
        except Exception as exc:
            sys.stderr.write(str(exc) + "\n")

    def getPR(self, version, pkgarch, checksum):
        """Forward getPR to the server and return its answer."""
        result = self.connection.getPR(version, pkgarch, checksum)
        return result

    def ping(self):
        """Forward ping to the server and return its answer."""
        result = self.connection.ping()
        return result

    def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
        """Forward export to the server and return its answer."""
        result = self.connection.export(version, pkgarch, checksum, colinfo)
        return result

    def dump_db(self):
        """Forward dump_db to the server and return its answer."""
        result = self.connection.dump_db()
        return result

    def importone(self, version, pkgarch, checksum, value):
        """Forward importone to the server and return its answer."""
        result = self.connection.importone(version, pkgarch, checksum, value)
        return result

    def getinfo(self):
        """Return the (host, port) this connection was opened to."""
        return (self.host, self.port)
321
322def start_daemon(dbfile, host, port, logfile):
Patrick Williamsf1e5d692016-03-30 15:21:19 -0500323 ip = socket.gethostbyname(host)
324 pidfile = PIDPREFIX % (ip, port)
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500325 try:
326 pf = file(pidfile,'r')
327 pid = int(pf.readline().strip())
328 pf.close()
329 except IOError:
330 pid = None
331
332 if pid:
333 sys.stderr.write("pidfile %s already exist. Daemon already running?\n"
334 % pidfile)
335 return 1
336
Patrick Williamsf1e5d692016-03-30 15:21:19 -0500337 server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port))
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500338 server.start()
Patrick Williamsf1e5d692016-03-30 15:21:19 -0500339
340 # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with
341 # the one the server actually is listening, so at least warn the user about it
342 _,rport = server.getinfo()
343 if port != rport:
344 sys.stdout.write("Server is listening at port %s instead of %s\n"
345 % (rport,port))
Patrick Williamsc124f4f2015-09-15 14:41:29 -0500346 return 0
347
def stop_daemon(host, port):
    """Stop the PRServer daemon registered for host:port.

    Returns 1 (after a message on stderr) when no pidfile with a pid is
    found for that address, otherwise 0.  The server is first asked to
    quit over XML-RPC; if it is still alive after about five seconds it
    is sent SIGTERM.  The pidfile is removed if it is still present.
    """
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    try:
        with open(pidfile, 'r') as pf:
            pid = int(pf.readline().strip())
    except (IOError, ValueError):
        # Missing pidfile or no usable pid in it: nothing we could stop.
        pid = None

    if not pid:
        # when server starts at port=0 (i.e. localhost:0), server actually takes another port,
        # so at least advise the user which ports the corresponding server is listening
        ports = [os.path.splitext(os.path.basename(path))[0].split('_')[-1]
                 for path in glob.glob(PIDPREFIX % (ip, '*'))]
        portstr = ""
        if ports:
            portstr = "Wrong port? Other ports listening at %s: %s" % (host, ' '.join(ports))

        sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n"
                         % (pidfile, portstr))
        return 1

    try:
        PRServerConnection(ip, port).terminate()
    except Exception:
        logger.critical("Stop PRService %s:%s failed", host, port)

    try:
        wait_timeout = 0
        print("Waiting for pr-server to exit.")
        while is_running(pid) and wait_timeout < 50:
            time.sleep(0.1)
            wait_timeout += 1

        if is_running(pid):
            print("Sending SIGTERM to pr-server.")
            os.kill(pid, signal.SIGTERM)
            time.sleep(0.1)

        if os.path.exists(pidfile):
            os.remove(pidfile)

    except OSError as e:
        # ESRCH: the process exited between is_running() and kill().
        # ENOENT: the server removed its own pidfile before we did.
        # Checked by errno because the message text depends on the locale.
        if e.errno not in (errno.ESRCH, errno.ENOENT):
            raise

    return 0
402
def is_running(pid):
    """Return False only if signalling *pid* with signal 0 fails with ESRCH.

    Any other outcome, including a different OSError, counts as running.
    """
    try:
        os.kill(pid, 0)
    except OSError as err:
        return err.errno != errno.ESRCH
    return True
410
def is_local_special(host, port):
    """Return True when *host* is "localhost" (ignoring case and surrounding
    whitespace) and *port* is falsy, otherwise False."""
    names_localhost = host.strip().upper() == 'LOCALHOST'
    return names_localhost and not port
416
class PRServiceConfigError(Exception):
    """Raised in this module when the PR service is misconfigured, cannot
    bind its socket, or cannot be reached."""
419
def auto_start(d):
    """Connect to the PR service named by PRSERV_HOST, starting a local
    server first when it is the special "localhost:0" form.

    d -- datastore-like object; only d.getVar(name, True) is used.

    Returns None when PRSERV_HOST is unset or empty, otherwise the
    "host:port" string of the server that answered a ping.
    Raises PRServiceConfigError when PRSERV_HOST is malformed, when no
    cache directory is configured for the local server, or when the
    server cannot be reached.
    """
    global singleton

    # A real list (not a filter object) so truthiness and len() work.
    host_params = [part
                   for part in (d.getVar('PRSERV_HOST', True) or '').split(':')
                   if part]
    if not host_params:
        return None

    format_error = '\n'.join(['PRSERV_HOST: incorrect format',
                              'Usage: PRSERV_HOST = "<hostname>:<port>"'])
    if len(host_params) != 2:
        logger.critical(format_error)
        raise PRServiceConfigError

    try:
        requested_port = int(host_params[1])
    except ValueError:
        # A non-numeric port is a configuration error, not a crash.
        logger.critical(format_error)
        raise PRServiceConfigError

    if is_local_special(host_params[0], requested_port) and not singleton:
        import bb.utils
        cachedir = (d.getVar("PERSISTENT_DIR", True) or d.getVar("CACHE", True))
        if not cachedir:
            logger.critical("Please set the 'PERSISTENT_DIR' or 'CACHE' variable")
            raise PRServiceConfigError
        bb.utils.mkdirhier(cachedir)
        dbfile = os.path.join(cachedir, "prserv.sqlite3")
        logfile = os.path.join(cachedir, "prserv.log")
        singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost", 0))
        singleton.start()
    if singleton:
        # NOTE(review): an existing singleton takes precedence even when
        # PRSERV_HOST names a different server -- confirm this is intended.
        host, port = singleton.getinfo()
    else:
        host = host_params[0]
        port = requested_port

    try:
        connection = PRServerConnection(host, port)
        connection.ping()
        realhost, realport = connection.getinfo()
        return str(realhost) + ":" + str(realport)

    except Exception:
        logger.critical("PRservice %s:%s not available", host, port)
        raise PRServiceConfigError
458
def auto_shutdown(d=None):
    """Terminate the server started by auto_start(), if any, and forget it.

    d is accepted for the caller's convenience and is not used.
    """
    global singleton
    if not singleton:
        return

    host, port = singleton.getinfo()
    try:
        PRServerConnection(host, port).terminate()
    except Exception:
        # %s rather than %d: host/port are still None if start() never ran.
        logger.critical("Stop PRService %s:%s failed", host, port)
    finally:
        singleton = None
468
def ping(host, port):
    """Open a connection to the PR server at host:port and return the
    result of its ping call."""
    return PRServerConnection(host, port).ping()