blob: 3ff4c51ccbff744c60063b96acdd8e44050bb73e [file] [log] [blame]
Brad Bishopa34c0302019-09-23 22:34:48 -04001# Copyright (C) 2019 Garmin Ltd.
2#
3# SPDX-License-Identifier: GPL-2.0-only
4#
5
Andrew Geissler6ce62a22020-11-30 19:58:47 -06006from contextlib import closing, contextmanager
Brad Bishopa34c0302019-09-23 22:34:48 -04007from datetime import datetime
8import asyncio
9import json
10import logging
11import math
12import os
13import signal
14import socket
Andrew Geissler6ce62a22020-11-30 19:58:47 -060015import sys
Brad Bishopa34c0302019-09-23 22:34:48 -040016import time
Andrew Geissler6ce62a22020-11-30 19:58:47 -060017from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client, TABLE_COLUMNS
Brad Bishopa34c0302019-09-23 22:34:48 -040018
19logger = logging.getLogger('hashserv.server')
20
21
class Measurement(object):
    """Times a single operation and reports the elapsed time to a Sample.

    Usable either as a context manager or via explicit start()/end() calls
    (the latter is used in the hot streaming path to avoid 'with' overhead).
    """

    def __init__(self, sample):
        self.sample = sample

    def start(self):
        """Record the starting timestamp."""
        self.start_time = time.perf_counter()

    def end(self):
        """Report the time elapsed since start() to the owning sample."""
        elapsed = time.perf_counter() - self.start_time
        self.sample.add(elapsed)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        self.end()
39
class Sample(object):
    """Accumulates one or more measurements into a single statistics entry.

    Individual measured durations are summed locally; end() (or leaving the
    context) flushes the total into the parent Stats as one data point.
    """

    def __init__(self, stats):
        self.stats = stats
        self.num_samples = 0
        self.elapsed = 0

    def measure(self):
        """Return a Measurement that reports into this sample."""
        return Measurement(self)

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.end()

    def add(self, elapsed):
        """Accumulate one measured duration into this sample."""
        self.num_samples += 1
        self.elapsed += elapsed

    def end(self):
        """Flush the accumulated time to the parent Stats and reset.

        A sample with no measurements contributes nothing.
        """
        if not self.num_samples:
            return
        self.stats.add(self.elapsed)
        self.num_samples = 0
        self.elapsed = 0
65
class Stats(object):
    """Running timing statistics using Welford's online algorithm.

    Tracks count, total, max, mean, and sample standard deviation without
    storing the individual samples.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Clear all accumulated statistics."""
        self.num = 0
        self.total_time = 0
        self.max_time = 0
        # Welford running mean (m) and sum of squared deviations (s)
        self.m = 0
        self.s = 0
        self.current_elapsed = None

    def add(self, elapsed):
        """Fold one elapsed-time value into the running statistics."""
        self.num += 1
        if self.num > 1:
            prev_mean = self.m
            self.m = prev_mean + (elapsed - prev_mean) / self.num
            self.s = self.s + (elapsed - prev_mean) * (elapsed - self.m)
        else:
            # First sample: mean is the sample itself, no deviation yet
            self.m = elapsed
            self.s = 0

        self.total_time += elapsed
        self.max_time = max(self.max_time, elapsed)

    def start_sample(self):
        """Create a new Sample bound to these statistics."""
        return Sample(self)

    @property
    def average(self):
        """Mean elapsed time, or 0 when no samples have been recorded."""
        return self.total_time / self.num if self.num else 0

    @property
    def stdev(self):
        """Sample standard deviation, or 0 with fewer than two samples."""
        return math.sqrt(self.s / (self.num - 1)) if self.num > 1 else 0

    def todict(self):
        """Export the summary statistics as a plain dictionary."""
        names = ('num', 'total_time', 'max_time', 'average', 'stdev')
        return {name: getattr(self, name) for name in names}
111
class ClientError(Exception):
    """Raised when a client sends an unrecognized or malformed request.

    Caught in ServerClient.process_requests, where it is logged and the
    connection is closed rather than tearing down the server.
    """
    pass
114
def insert_task(cursor, data, ignore=False):
    """Insert one row into tasks_v2 built from the *data* mapping.

    Column names are the (sorted) keys of *data*; values are bound as named
    parameters. When *ignore* is true the row is inserted with
    INSERT OR IGNORE so duplicates are silently skipped.
    """
    columns = sorted(data.keys())
    conflict = " OR IGNORE" if ignore else ""
    column_list = ', '.join(columns)
    placeholder_list = ', '.join(':' + column for column in columns)
    query = '''INSERT%s INTO tasks_v2 (%s) VALUES (%s)''' % (
        conflict, column_list, placeholder_list)
    cursor.execute(query, data)
122
async def copy_from_upstream(client, db, method, taskhash):
    """Fetch a task entry from the upstream server and mirror it locally.

    Queries *client* (an async hashserv client) for method/taskhash and, if
    found, inserts the row into the local database. Returns the
    column-filtered row as a dict, or None when upstream has no entry.
    """
    d = await client.get_taskhash(method, taskhash, True)
    if d is not None:
        # Filter out unknown columns so the INSERT cannot fail if a newer
        # upstream reports extra fields. (Also drops an unused local
        # 'keys = sorted(d.keys())' present in the original.)
        d = {k: v for k, v in d.items() if k in TABLE_COLUMNS}

        with closing(db.cursor()) as cursor:
            insert_task(cursor, d)
            db.commit()

    return d
136
class ServerClient(object):
    """Serves one connected client of the hash equivalence server.

    One instance exists per connection (created by Server.handle_client).
    It speaks the line/JSON based OEHASHEQUIV protocol over the supplied
    asyncio stream reader/writer, answers queries from the sqlite database
    and, when an upstream server is configured, falls back to (and mirrors
    entries from) that upstream.
    """

    FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
    ALL_QUERY = 'SELECT * FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'

    def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream):
        self.reader = reader
        self.writer = writer
        self.db = db
        self.request_stats = request_stats
        self.max_chunk = DEFAULT_MAX_CHUNK
        self.backfill_queue = backfill_queue
        self.upstream = upstream

        # Protocol command name -> handler coroutine
        self.handlers = {
            'get': self.handle_get,
            'report': self.handle_report,
            'report-equiv': self.handle_equivreport,
            'get-stream': self.handle_get_stream,
            'get-stats': self.handle_get_stats,
            'reset-stats': self.handle_reset_stats,
            'chunk-stream': self.handle_chunk,
            'backfill-wait': self.handle_backfill_wait,
        }

    async def process_requests(self):
        """Per-connection main loop.

        Negotiates the protocol banner and headers, then dispatches JSON
        messages until the client disconnects or a ClientError occurs.
        Always closes the writer (and any upstream client) on exit.
        """
        if self.upstream is not None:
            self.upstream_client = await create_async_client(self.upstream)
        else:
            self.upstream_client = None

        try:
            self.addr = self.writer.get_extra_info('peername')
            logger.debug('Client %r connected' % (self.addr,))

            # Read protocol and version
            protocol = await self.reader.readline()
            if protocol is None:
                return

            (proto_name, proto_version) = protocol.decode('utf-8').rstrip().split()
            if proto_name != 'OEHASHEQUIV':
                return

            proto_version = tuple(int(v) for v in proto_version.split('.'))
            if proto_version < (1, 0) or proto_version > (1, 1):
                return

            # Read headers. Currently, no headers are implemented, so look for
            # an empty line to signal the end of the headers
            while True:
                line = await self.reader.readline()
                if line is None:
                    return

                line = line.decode('utf-8').rstrip()
                if not line:
                    break

            # Handle messages
            while True:
                d = await self.read_message()
                if d is None:
                    break
                await self.dispatch_message(d)
                await self.writer.drain()
        except ClientError as e:
            logger.error(str(e))
        finally:
            if self.upstream_client is not None:
                await self.upstream_client.close()

            self.writer.close()

    async def dispatch_message(self, msg):
        """Invoke the handler whose command key appears in *msg*.

        Raises ClientError when no recognized command key is present.
        """
        for k in self.handlers.keys():
            if k in msg:
                logger.debug('Handling %s' % k)
                if 'stream' in k:
                    # Stream handlers do their own (lower overhead) stats
                    # accounting in their inner loops
                    await self.handlers[k](msg[k])
                else:
                    with self.request_stats.start_sample() as self.request_sample, \
                            self.request_sample.measure():
                        await self.handlers[k](msg[k])
                return

        raise ClientError("Unrecognized command %r" % msg)

    def write_message(self, msg):
        """JSON-encode *msg* and write it, split into protocol chunks."""
        for c in chunkify(json.dumps(msg), self.max_chunk):
            self.writer.write(c.encode('utf-8'))

    async def read_message(self):
        """Read one newline-terminated JSON message.

        Returns the decoded object, or None on EOF or a truncated line.
        Re-raises JSON/Unicode decoding errors after logging them.
        """
        l = await self.reader.readline()
        if not l:
            return None

        message = None
        try:
            message = l.decode('utf-8')

            if not message.endswith('\n'):
                return None

            return json.loads(message)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # Fall back to the raw bytes when decoding failed before
            # 'message' was assigned (previously an unbound-name error in
            # the UnicodeDecodeError path)
            logger.error('Bad message from client: %r' % (message if message is not None else l))
            raise e

    async def handle_chunk(self, request):
        """Reassemble a chunked message (terminated by an empty line) and
        dispatch the complete message. Nested chunking is rejected."""
        lines = []
        try:
            while True:
                l = await self.reader.readline()
                l = l.rstrip(b"\n").decode("utf-8")
                if not l:
                    break
                lines.append(l)

            msg = json.loads(''.join(lines))
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # Log the reassembled payload; the original referenced an
            # undefined 'message' variable here, raising NameError instead
            # of the actual decoding error
            logger.error('Bad message from client: %r' % ''.join(lines))
            raise e

        if 'chunk-stream' in msg:
            raise ClientError("Nested chunks are not allowed")

        await self.dispatch_message(msg)

    async def handle_get(self, request):
        """Reply with the task row for request['method']/['taskhash'].

        Returns all columns when request['all'] is true, otherwise just
        taskhash/method/unihash. On a local miss, consults (and mirrors
        from) the upstream server when one is configured; replies None
        when nothing is found.
        """
        method = request['method']
        taskhash = request['taskhash']

        if request.get('all', False):
            row = self.query_equivalent(method, taskhash, self.ALL_QUERY)
        else:
            row = self.query_equivalent(method, taskhash, self.FAST_QUERY)

        if row is not None:
            # Pass values as separate logging args; the original passed a
            # single tuple for two %s placeholders, breaking the format
            logger.debug('Found equivalent task %s -> %s', row['taskhash'], row['unihash'])
            d = {k: row[k] for k in row.keys()}
        elif self.upstream_client is not None:
            d = await copy_from_upstream(self.upstream_client, self.db, method, taskhash)
        else:
            d = None

        self.write_message(d)

    async def handle_get_stream(self, request):
        """Answer a stream of 'method taskhash' lines with one unihash (or
        blank line) each, until the client sends 'END'. Upstream hits are
        queued for backfill after the reply is written."""
        self.write_message('ok')

        while True:
            upstream = None

            l = await self.reader.readline()
            if not l:
                return

            try:
                # This inner loop is very sensitive and must be as fast as
                # possible (which is why the request sample is handled manually
                # instead of using 'with', and also why logging statements are
                # commented out.
                self.request_sample = self.request_stats.start_sample()
                request_measure = self.request_sample.measure()
                request_measure.start()

                l = l.decode('utf-8').rstrip()
                if l == 'END':
                    self.writer.write('ok\n'.encode('utf-8'))
                    return

                (method, taskhash) = l.split()
                #logger.debug('Looking up %s %s' % (method, taskhash))
                row = self.query_equivalent(method, taskhash, self.FAST_QUERY)
                if row is not None:
                    msg = ('%s\n' % row['unihash']).encode('utf-8')
                    #logger.debug('Found equivalent task %s -> %s', row['taskhash'], row['unihash'])
                elif self.upstream_client is not None:
                    upstream = await self.upstream_client.get_unihash(method, taskhash)
                    if upstream:
                        msg = ("%s\n" % upstream).encode("utf-8")
                    else:
                        msg = "\n".encode("utf-8")
                else:
                    msg = '\n'.encode('utf-8')

                self.writer.write(msg)
            finally:
                request_measure.end()
                self.request_sample.end()

            await self.writer.drain()

            # Post to the backfill queue after writing the result to minimize
            # the turn around time on a request
            if upstream is not None:
                await self.backfill_queue.put((method, taskhash))

    async def handle_report(self, data):
        """Record a reported task outhash.

        If an equivalent task (same method and outhash) already exists, the
        new taskhash inherits the oldest equivalent unihash; otherwise the
        caller-provided unihash is stored. Replies with the canonical
        taskhash/method/unihash mapping.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute('''
                -- Find tasks with a matching outhash (that is, tasks that
                -- are equivalent)
                SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND outhash=:outhash

                -- If there is an exact match on the taskhash, return it.
                -- Otherwise return the oldest matching outhash of any
                -- taskhash
                ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
                    created ASC

                -- Only return one row
                LIMIT 1
                ''', {k: data[k] for k in ('method', 'outhash', 'taskhash')})

            row = cursor.fetchone()

            # If no matching outhash was found, or one *was* found but it
            # wasn't an exact match on the taskhash, a new entry for this
            # taskhash should be added
            if row is None or row['taskhash'] != data['taskhash']:
                # If a row matching the outhash was found, the unihash for
                # the new taskhash should be the same as that one.
                # Otherwise the caller provided unihash is used.
                unihash = data['unihash']
                if row is not None:
                    unihash = row['unihash']

                insert_data = {
                    'method': data['method'],
                    'outhash': data['outhash'],
                    'taskhash': data['taskhash'],
                    'unihash': unihash,
                    'created': datetime.now()
                }

                for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
                    if k in data:
                        insert_data[k] = data[k]

                insert_task(cursor, insert_data)
                self.db.commit()

                logger.info('Adding taskhash %s with unihash %s',
                            data['taskhash'], unihash)

                d = {
                    'taskhash': data['taskhash'],
                    'method': data['method'],
                    'unihash': unihash
                }
            else:
                d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

        self.write_message(d)

    async def handle_equivreport(self, data):
        """Record a client-reported taskhash -> unihash equivalence.

        The row is inserted with an empty outhash and OR IGNORE semantics,
        then the winning mapping for the taskhash is fetched and returned,
        so the client learns which unihash is canonical.
        """
        with closing(self.db.cursor()) as cursor:
            insert_data = {
                'method': data['method'],
                'outhash': "",
                'taskhash': data['taskhash'],
                'unihash': data['unihash'],
                'created': datetime.now()
            }

            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
                if k in data:
                    insert_data[k] = data[k]

            insert_task(cursor, insert_data, ignore=True)
            self.db.commit()

            # Fetch the unihash that will be reported for the taskhash. If the
            # unihash matches, it means this row was inserted (or the mapping
            # was already valid)
            row = self.query_equivalent(data['method'], data['taskhash'], self.FAST_QUERY)

            if row['unihash'] == data['unihash']:
                logger.info('Adding taskhash equivalence for %s with unihash %s',
                            data['taskhash'], row['unihash'])

            d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

        self.write_message(d)

    async def handle_get_stats(self, request):
        """Reply with the current request statistics."""
        d = {
            'requests': self.request_stats.todict(),
        }

        self.write_message(d)

    async def handle_reset_stats(self, request):
        """Reply with the current request statistics, then reset them."""
        d = {
            'requests': self.request_stats.todict(),
        }

        self.request_stats.reset()
        self.write_message(d)

    async def handle_backfill_wait(self, request):
        """Report the backfill queue depth, then block until the queue has
        fully drained before replying."""
        d = {
            'tasks': self.backfill_queue.qsize(),
        }
        await self.backfill_queue.join()
        self.write_message(d)

    def query_equivalent(self, method, taskhash, query):
        """Run *query* (FAST_QUERY or ALL_QUERY) for method/taskhash.

        Returns the first matching row, or None when there is no match or
        the query fails.
        """
        # This is part of the inner loop and must be as fast as possible
        cursor = self.db.cursor()
        try:
            cursor.execute(query, {'method': method, 'taskhash': taskhash})
            return cursor.fetchone()
        except Exception:
            # Preserve the original best-effort contract (None on failure),
            # but log the error instead of silently discarding it with a
            # bare 'except:'
            logger.exception('query_equivalent failed for %s %s', method, taskhash)
            return None
        finally:
            # The original leaked the cursor on the success path
            cursor.close()
455
456
class Server(object):
    """Asyncio-based hash equivalence server.

    Owns the sqlite database handle, the event loop, the shared request
    statistics, and (optionally) the address of an upstream server whose
    entries are lazily mirrored via a backfill queue and worker task.
    """

    def __init__(self, db, loop=None, upstream=None):
        # Timing statistics shared by all client connections
        self.request_stats = Stats()
        self.db = db

        # Only close the loop at shutdown if this object created it
        if loop is None:
            self.loop = asyncio.new_event_loop()
            self.close_loop = True
        else:
            self.loop = loop
            self.close_loop = False

        # Upstream hashserv address to consult on local misses, or None
        self.upstream = upstream

        # Set by start_unix_server() so the socket file is removed on exit
        self._cleanup_socket = None

    def start_tcp_server(self, host, port):
        """Bind a TCP listening socket and record the bound address."""
        self.server = self.loop.run_until_complete(
            asyncio.start_server(self.handle_client, host, port, loop=self.loop)
        )

        for s in self.server.sockets:
            logger.info('Listening on %r' % (s.getsockname(),))
            # Newer python does this automatically. Do it manually here for
            # maximum compatibility
            s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
            # NOTE(review): TCP_QUICKACK is Linux-specific — confirm this
            # server never runs on a platform where socket lacks it
            s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)

        name = self.server.sockets[0].getsockname()
        if self.server.sockets[0].family == socket.AF_INET6:
            self.address = "[%s]:%d" % (name[0], name[1])
        else:
            self.address = "%s:%d" % (name[0], name[1])

    def start_unix_server(self, path):
        """Bind a Unix domain listening socket at *path*."""
        def cleanup():
            os.unlink(path)

        cwd = os.getcwd()
        try:
            # Work around path length limits in AF_UNIX
            os.chdir(os.path.dirname(path))
            self.server = self.loop.run_until_complete(
                asyncio.start_unix_server(self.handle_client, os.path.basename(path), loop=self.loop)
            )
        finally:
            os.chdir(cwd)

        logger.info('Listening on %r' % path)

        self._cleanup_socket = cleanup
        self.address = "unix://%s" % os.path.abspath(path)

    async def handle_client(self, reader, writer):
        """Per-connection entry point: delegate to a ServerClient.

        ServerClient.process_requests closes the writer itself on normal
        exit; the close here covers failures raised before/around it.
        """
        # writer.transport.set_write_buffer_limits(0)
        try:
            client = ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream)
            await client.process_requests()
        except Exception as e:
            import traceback
            logger.error('Error from client: %s' % str(e), exc_info=True)
            traceback.print_exc()
            writer.close()
        logger.info('Client disconnected')

    @contextmanager
    def _backfill_worker(self):
        """Run the upstream backfill worker for the duration of the context.

        When an upstream is configured, a background task drains
        (method, taskhash) items from backfill_queue and copies each entry
        from the upstream server; a None item tells the worker to exit.
        Without an upstream this is a no-op context.
        """
        async def backfill_worker_task():
            client = await create_async_client(self.upstream)
            try:
                while True:
                    item = await self.backfill_queue.get()
                    if item is None:
                        # Sentinel pushed by join_worker(): shut down
                        self.backfill_queue.task_done()
                        break
                    method, taskhash = item
                    await copy_from_upstream(client, self.db, method, taskhash)
                    self.backfill_queue.task_done()
            finally:
                await client.close()

        async def join_worker(worker):
            await self.backfill_queue.put(None)
            await worker

        if self.upstream is not None:
            worker = asyncio.ensure_future(backfill_worker_task())
            try:
                yield
            finally:
                self.loop.run_until_complete(join_worker(worker))
        else:
            yield

    def serve_forever(self):
        """Run the event loop until SIGTERM or KeyboardInterrupt, then shut
        down the listening server (and the loop, if this object owns it)."""
        def signal_handler():
            self.loop.stop()

        asyncio.set_event_loop(self.loop)
        try:
            # NOTE(review): created here, after set_event_loop, presumably so
            # the queue binds to this loop on older Pythons — confirm
            self.backfill_queue = asyncio.Queue()

            self.loop.add_signal_handler(signal.SIGTERM, signal_handler)

            with self._backfill_worker():
                try:
                    self.loop.run_forever()
                except KeyboardInterrupt:
                    pass

                self.server.close()

                self.loop.run_until_complete(self.server.wait_closed())
                logger.info('Server shutting down')
        finally:
            if self.close_loop:
                if sys.version_info >= (3, 6):
                    self.loop.run_until_complete(self.loop.shutdown_asyncgens())
                self.loop.close()

            if self._cleanup_socket is not None:
                self._cleanup_socket()