blob: 8e849897371606d840fbea4c175a49748633c044 [file] [log] [blame]
Brad Bishopa34c0302019-09-23 22:34:48 -04001# Copyright (C) 2019 Garmin Ltd.
2#
3# SPDX-License-Identifier: GPL-2.0-only
4#
5
Andrew Geissler6ce62a22020-11-30 19:58:47 -06006from contextlib import closing, contextmanager
Brad Bishopa34c0302019-09-23 22:34:48 -04007from datetime import datetime
8import asyncio
Brad Bishopa34c0302019-09-23 22:34:48 -04009import logging
10import math
Brad Bishopa34c0302019-09-23 22:34:48 -040011import time
Andrew Geisslerc926e172021-05-07 16:11:35 -050012from . import create_async_client, TABLE_COLUMNS
13import bb.asyncrpc
14
Brad Bishopa34c0302019-09-23 22:34:48 -040015
# Module-level logger shared by all classes in this server module.
logger = logging.getLogger('hashserv.server')
17
18
class Measurement(object):
    """Times a single event and reports the elapsed time to a Sample.

    Can be driven manually via start()/end(), or used as a context
    manager which calls them automatically.
    """
    def __init__(self, sample):
        self.sample = sample

    def start(self):
        # perf_counter() is monotonic and high resolution, which makes it
        # the right clock for measuring short intervals.
        self.start_time = time.perf_counter()

    def end(self):
        self.sample.add(time.perf_counter() - self.start_time)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # The elapsed time is recorded even if the timed body raised.
        self.end()
35
36
class Sample(object):
    """Accumulates one or more timed measurements into one statistics entry.

    Elapsed times added while the sample is open are summed; the total is
    pushed to the owning Stats object when the sample ends. A sample with
    no measurements contributes nothing.
    """
    def __init__(self, stats):
        self.stats = stats
        self.num_samples = 0
        self.elapsed = 0

    def measure(self):
        """Return a Measurement that reports its elapsed time into this sample."""
        return Measurement(self)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.end()

    def add(self, elapsed):
        """Record one measurement's elapsed time."""
        self.num_samples += 1
        self.elapsed += elapsed

    def end(self):
        """Flush the accumulated time to Stats and reset for reuse."""
        if not self.num_samples:
            # Nothing was measured; ending is a no-op.
            return
        self.stats.add(self.elapsed)
        self.num_samples = 0
        self.elapsed = 0
61
62
class Stats(object):
    """Online running statistics: count, total, max, mean and stdev.

    Uses Welford's single-pass algorithm, so the standard deviation can be
    computed without storing the individual samples.
    """
    def __init__(self):
        self.reset()

    def reset(self):
        """Zero out all accumulated statistics."""
        self.num = 0
        self.total_time = 0
        self.max_time = 0
        self.m = 0  # running mean (Welford)
        self.s = 0  # running sum of squared deviations (Welford)
        # Initialized here but never written by this class's methods.
        self.current_elapsed = None

    def add(self, elapsed):
        """Fold one sample into the running statistics."""
        self.num += 1
        if self.num == 1:
            self.m = elapsed
            self.s = 0
        else:
            prev_mean = self.m
            self.m = prev_mean + (elapsed - prev_mean) / self.num
            self.s = self.s + (elapsed - prev_mean) * (elapsed - self.m)

        self.total_time += elapsed
        self.max_time = max(self.max_time, elapsed)

    def start_sample(self):
        """Open a new Sample that will report into these statistics."""
        return Sample(self)

    @property
    def average(self):
        """Mean of all samples, or 0 when no samples were recorded."""
        return self.total_time / self.num if self.num else 0

    @property
    def stdev(self):
        """Sample standard deviation, or 0 with fewer than two samples."""
        return math.sqrt(self.s / (self.num - 1)) if self.num > 1 else 0

    def todict(self):
        """Export the summary statistics as a plain dict."""
        return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
107
108
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600109def insert_task(cursor, data, ignore=False):
110 keys = sorted(data.keys())
111 query = '''INSERT%s INTO tasks_v2 (%s) VALUES (%s)''' % (
112 " OR IGNORE" if ignore else "",
113 ', '.join(keys),
114 ', '.join(':' + k for k in keys))
115 cursor.execute(query, data)
116
async def copy_from_upstream(client, db, method, taskhash):
    """Fetch a taskhash entry from the upstream server and cache it locally.

    Returns the (filtered) row dict, or None when the upstream has no entry.
    """
    row = await client.get_taskhash(method, taskhash, True)
    if row is None:
        return None

    # Drop any columns the local schema does not know about
    row = {key: value for key, value in row.items() if key in TABLE_COLUMNS}

    with closing(db.cursor()) as cursor:
        insert_task(cursor, row)
    db.commit()

    return row
128
async def copy_outhash_from_upstream(client, db, method, outhash, taskhash):
    """Fetch an outhash entry from the upstream server and cache it locally.

    Returns the (filtered) row dict, or None when the upstream has no entry.
    """
    row = await client.get_outhash(method, outhash, taskhash)
    if row is None:
        return None

    # Drop any columns the local schema does not know about
    row = {key: value for key, value in row.items() if key in TABLE_COLUMNS}

    with closing(db.cursor()) as cursor:
        insert_task(cursor, row)
    db.commit()

    return row
140
Andrew Geisslerc926e172021-05-07 16:11:35 -0500141class ServerClient(bb.asyncrpc.AsyncServerConnection):
Andrew Geissler475cb722020-07-10 16:00:51 -0500142 FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
143 ALL_QUERY = 'SELECT * FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
Andrew Geisslerd1e89492021-02-12 15:35:20 -0600144 OUTHASH_QUERY = '''
145 -- Find tasks with a matching outhash (that is, tasks that
146 -- are equivalent)
147 SELECT * FROM tasks_v2 WHERE method=:method AND outhash=:outhash
Andrew Geissler475cb722020-07-10 16:00:51 -0500148
Andrew Geisslerd1e89492021-02-12 15:35:20 -0600149 -- If there is an exact match on the taskhash, return it.
150 -- Otherwise return the oldest matching outhash of any
151 -- taskhash
152 ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
153 created ASC
154
155 -- Only return one row
156 LIMIT 1
157 '''
158
159 def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only):
Andrew Geisslerc926e172021-05-07 16:11:35 -0500160 super().__init__(reader, writer, 'OEHASHEQUIV', logger)
Brad Bishopa34c0302019-09-23 22:34:48 -0400161 self.db = db
162 self.request_stats = request_stats
Andrew Geisslerc926e172021-05-07 16:11:35 -0500163 self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600164 self.backfill_queue = backfill_queue
165 self.upstream = upstream
Andrew Geissler475cb722020-07-10 16:00:51 -0500166
Andrew Geisslerc926e172021-05-07 16:11:35 -0500167 self.handlers.update({
Andrew Geissler475cb722020-07-10 16:00:51 -0500168 'get': self.handle_get,
Andrew Geisslerd1e89492021-02-12 15:35:20 -0600169 'get-outhash': self.handle_get_outhash,
Andrew Geissler475cb722020-07-10 16:00:51 -0500170 'get-stream': self.handle_get_stream,
171 'get-stats': self.handle_get_stats,
Andrew Geisslerc926e172021-05-07 16:11:35 -0500172 })
Brad Bishopa34c0302019-09-23 22:34:48 -0400173
Andrew Geisslerd1e89492021-02-12 15:35:20 -0600174 if not read_only:
175 self.handlers.update({
176 'report': self.handle_report,
177 'report-equiv': self.handle_equivreport,
178 'reset-stats': self.handle_reset_stats,
179 'backfill-wait': self.handle_backfill_wait,
180 })
181
Andrew Geisslerc926e172021-05-07 16:11:35 -0500182 def validate_proto_version(self):
183 return (self.proto_version > (1, 0) and self.proto_version <= (1, 1))
184
Brad Bishopa34c0302019-09-23 22:34:48 -0400185 async def process_requests(self):
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600186 if self.upstream is not None:
187 self.upstream_client = await create_async_client(self.upstream)
188 else:
189 self.upstream_client = None
190
Andrew Geisslerc926e172021-05-07 16:11:35 -0500191 await super().process_requests()
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600192
Andrew Geisslerc926e172021-05-07 16:11:35 -0500193 if self.upstream_client is not None:
194 await self.upstream_client.close()
Brad Bishopa34c0302019-09-23 22:34:48 -0400195
Andrew Geissler475cb722020-07-10 16:00:51 -0500196 async def dispatch_message(self, msg):
197 for k in self.handlers.keys():
198 if k in msg:
199 logger.debug('Handling %s' % k)
200 if 'stream' in k:
201 await self.handlers[k](msg[k])
202 else:
203 with self.request_stats.start_sample() as self.request_sample, \
204 self.request_sample.measure():
205 await self.handlers[k](msg[k])
206 return
207
Andrew Geisslerc926e172021-05-07 16:11:35 -0500208 raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)
Andrew Geissler475cb722020-07-10 16:00:51 -0500209
Brad Bishopa34c0302019-09-23 22:34:48 -0400210 async def handle_get(self, request):
211 method = request['method']
212 taskhash = request['taskhash']
213
Andrew Geissler475cb722020-07-10 16:00:51 -0500214 if request.get('all', False):
215 row = self.query_equivalent(method, taskhash, self.ALL_QUERY)
216 else:
217 row = self.query_equivalent(method, taskhash, self.FAST_QUERY)
218
Brad Bishopa34c0302019-09-23 22:34:48 -0400219 if row is not None:
220 logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
Andrew Geissler475cb722020-07-10 16:00:51 -0500221 d = {k: row[k] for k in row.keys()}
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600222 elif self.upstream_client is not None:
223 d = await copy_from_upstream(self.upstream_client, self.db, method, taskhash)
Brad Bishopa34c0302019-09-23 22:34:48 -0400224 else:
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600225 d = None
226
227 self.write_message(d)
Brad Bishopa34c0302019-09-23 22:34:48 -0400228
Andrew Geisslerd1e89492021-02-12 15:35:20 -0600229 async def handle_get_outhash(self, request):
230 with closing(self.db.cursor()) as cursor:
231 cursor.execute(self.OUTHASH_QUERY,
232 {k: request[k] for k in ('method', 'outhash', 'taskhash')})
233
234 row = cursor.fetchone()
235
236 if row is not None:
237 logger.debug('Found equivalent outhash %s -> %s', (row['outhash'], row['unihash']))
238 d = {k: row[k] for k in row.keys()}
239 else:
240 d = None
241
242 self.write_message(d)
243
Brad Bishopa34c0302019-09-23 22:34:48 -0400244 async def handle_get_stream(self, request):
245 self.write_message('ok')
246
247 while True:
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600248 upstream = None
249
Brad Bishopa34c0302019-09-23 22:34:48 -0400250 l = await self.reader.readline()
251 if not l:
252 return
253
254 try:
255 # This inner loop is very sensitive and must be as fast as
256 # possible (which is why the request sample is handled manually
257 # instead of using 'with', and also why logging statements are
258 # commented out.
259 self.request_sample = self.request_stats.start_sample()
260 request_measure = self.request_sample.measure()
261 request_measure.start()
262
263 l = l.decode('utf-8').rstrip()
264 if l == 'END':
265 self.writer.write('ok\n'.encode('utf-8'))
266 return
267
268 (method, taskhash) = l.split()
269 #logger.debug('Looking up %s %s' % (method, taskhash))
Andrew Geissler475cb722020-07-10 16:00:51 -0500270 row = self.query_equivalent(method, taskhash, self.FAST_QUERY)
Brad Bishopa34c0302019-09-23 22:34:48 -0400271 if row is not None:
272 msg = ('%s\n' % row['unihash']).encode('utf-8')
273 #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600274 elif self.upstream_client is not None:
275 upstream = await self.upstream_client.get_unihash(method, taskhash)
276 if upstream:
277 msg = ("%s\n" % upstream).encode("utf-8")
278 else:
279 msg = "\n".encode("utf-8")
Brad Bishopa34c0302019-09-23 22:34:48 -0400280 else:
281 msg = '\n'.encode('utf-8')
282
283 self.writer.write(msg)
284 finally:
285 request_measure.end()
286 self.request_sample.end()
287
288 await self.writer.drain()
289
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600290 # Post to the backfill queue after writing the result to minimize
291 # the turn around time on a request
292 if upstream is not None:
293 await self.backfill_queue.put((method, taskhash))
294
Brad Bishopa34c0302019-09-23 22:34:48 -0400295 async def handle_report(self, data):
296 with closing(self.db.cursor()) as cursor:
Andrew Geisslerd1e89492021-02-12 15:35:20 -0600297 cursor.execute(self.OUTHASH_QUERY,
298 {k: data[k] for k in ('method', 'outhash', 'taskhash')})
Brad Bishopa34c0302019-09-23 22:34:48 -0400299
300 row = cursor.fetchone()
301
Andrew Geisslerd1e89492021-02-12 15:35:20 -0600302 if row is None and self.upstream_client:
303 # Try upstream
304 row = await copy_outhash_from_upstream(self.upstream_client,
305 self.db,
306 data['method'],
307 data['outhash'],
308 data['taskhash'])
309
Brad Bishopa34c0302019-09-23 22:34:48 -0400310 # If no matching outhash was found, or one *was* found but it
311 # wasn't an exact match on the taskhash, a new entry for this
312 # taskhash should be added
313 if row is None or row['taskhash'] != data['taskhash']:
314 # If a row matching the outhash was found, the unihash for
315 # the new taskhash should be the same as that one.
316 # Otherwise the caller provided unihash is used.
317 unihash = data['unihash']
318 if row is not None:
319 unihash = row['unihash']
320
321 insert_data = {
322 'method': data['method'],
323 'outhash': data['outhash'],
324 'taskhash': data['taskhash'],
325 'unihash': unihash,
326 'created': datetime.now()
327 }
328
329 for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
330 if k in data:
331 insert_data[k] = data[k]
332
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600333 insert_task(cursor, insert_data)
Brad Bishopa34c0302019-09-23 22:34:48 -0400334 self.db.commit()
335
336 logger.info('Adding taskhash %s with unihash %s',
337 data['taskhash'], unihash)
338
339 d = {
340 'taskhash': data['taskhash'],
341 'method': data['method'],
342 'unihash': unihash
343 }
344 else:
345 d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
346
347 self.write_message(d)
348
Andrew Geissler82c905d2020-04-13 13:39:40 -0500349 async def handle_equivreport(self, data):
350 with closing(self.db.cursor()) as cursor:
351 insert_data = {
352 'method': data['method'],
353 'outhash': "",
354 'taskhash': data['taskhash'],
355 'unihash': data['unihash'],
356 'created': datetime.now()
357 }
358
359 for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
360 if k in data:
361 insert_data[k] = data[k]
362
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600363 insert_task(cursor, insert_data, ignore=True)
Andrew Geissler82c905d2020-04-13 13:39:40 -0500364 self.db.commit()
365
366 # Fetch the unihash that will be reported for the taskhash. If the
367 # unihash matches, it means this row was inserted (or the mapping
368 # was already valid)
Andrew Geissler475cb722020-07-10 16:00:51 -0500369 row = self.query_equivalent(data['method'], data['taskhash'], self.FAST_QUERY)
Andrew Geissler82c905d2020-04-13 13:39:40 -0500370
371 if row['unihash'] == data['unihash']:
372 logger.info('Adding taskhash equivalence for %s with unihash %s',
373 data['taskhash'], row['unihash'])
374
375 d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
376
377 self.write_message(d)
378
379
Brad Bishopa34c0302019-09-23 22:34:48 -0400380 async def handle_get_stats(self, request):
381 d = {
382 'requests': self.request_stats.todict(),
383 }
384
385 self.write_message(d)
386
387 async def handle_reset_stats(self, request):
388 d = {
389 'requests': self.request_stats.todict(),
390 }
391
392 self.request_stats.reset()
393 self.write_message(d)
394
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600395 async def handle_backfill_wait(self, request):
396 d = {
397 'tasks': self.backfill_queue.qsize(),
398 }
399 await self.backfill_queue.join()
400 self.write_message(d)
401
Andrew Geissler475cb722020-07-10 16:00:51 -0500402 def query_equivalent(self, method, taskhash, query):
Brad Bishopa34c0302019-09-23 22:34:48 -0400403 # This is part of the inner loop and must be as fast as possible
404 try:
405 cursor = self.db.cursor()
Andrew Geissler475cb722020-07-10 16:00:51 -0500406 cursor.execute(query, {'method': method, 'taskhash': taskhash})
Brad Bishopa34c0302019-09-23 22:34:48 -0400407 return cursor.fetchone()
408 except:
409 cursor.close()
410
411
Andrew Geisslerc926e172021-05-07 16:11:35 -0500412class Server(bb.asyncrpc.AsyncServer):
Andrew Geisslerd1e89492021-02-12 15:35:20 -0600413 def __init__(self, db, loop=None, upstream=None, read_only=False):
414 if upstream and read_only:
Andrew Geisslerc926e172021-05-07 16:11:35 -0500415 raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server")
416
417 super().__init__(logger, loop)
Andrew Geisslerd1e89492021-02-12 15:35:20 -0600418
Brad Bishopa34c0302019-09-23 22:34:48 -0400419 self.request_stats = Stats()
420 self.db = db
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600421 self.upstream = upstream
Andrew Geisslerd1e89492021-02-12 15:35:20 -0600422 self.read_only = read_only
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600423
Andrew Geisslerc926e172021-05-07 16:11:35 -0500424 def accept_client(self, reader, writer):
425 return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)
Brad Bishopa34c0302019-09-23 22:34:48 -0400426
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600427 @contextmanager
428 def _backfill_worker(self):
429 async def backfill_worker_task():
430 client = await create_async_client(self.upstream)
431 try:
432 while True:
433 item = await self.backfill_queue.get()
434 if item is None:
435 self.backfill_queue.task_done()
436 break
437 method, taskhash = item
438 await copy_from_upstream(client, self.db, method, taskhash)
439 self.backfill_queue.task_done()
440 finally:
441 await client.close()
442
443 async def join_worker(worker):
444 await self.backfill_queue.put(None)
445 await worker
446
447 if self.upstream is not None:
448 worker = asyncio.ensure_future(backfill_worker_task())
449 try:
450 yield
451 finally:
452 self.loop.run_until_complete(join_worker(worker))
453 else:
454 yield
455
Andrew Geisslerc926e172021-05-07 16:11:35 -0500456 def run_loop_forever(self):
457 self.backfill_queue = asyncio.Queue()
Brad Bishopa34c0302019-09-23 22:34:48 -0400458
Andrew Geisslerc926e172021-05-07 16:11:35 -0500459 with self._backfill_worker():
460 super().run_loop_forever()