blob: c941c0e9dde018dc8b7a1a1db3aadac28bef7fed [file] [log] [blame]
Brad Bishopa34c0302019-09-23 22:34:48 -04001# Copyright (C) 2019 Garmin Ltd.
2#
3# SPDX-License-Identifier: GPL-2.0-only
4#
5
Andrew Geissler6ce62a22020-11-30 19:58:47 -06006from contextlib import closing, contextmanager
Brad Bishopa34c0302019-09-23 22:34:48 -04007from datetime import datetime
8import asyncio
9import json
10import logging
11import math
12import os
13import signal
14import socket
Andrew Geissler6ce62a22020-11-30 19:58:47 -060015import sys
Brad Bishopa34c0302019-09-23 22:34:48 -040016import time
Andrew Geisslerc926e172021-05-07 16:11:35 -050017from . import create_async_client, TABLE_COLUMNS
18import bb.asyncrpc
19
Brad Bishopa34c0302019-09-23 22:34:48 -040020
21logger = logging.getLogger('hashserv.server')
22
23
class Measurement(object):
    """Times one operation and reports the elapsed wall-clock time to the
    owning sample.

    Usable either explicitly via start()/end() or as a context manager.
    """

    def __init__(self, sample):
        self.sample = sample

    def start(self):
        """Record the starting timestamp."""
        self.start_time = time.perf_counter()

    def end(self):
        """Report the time elapsed since start() to the owning sample."""
        elapsed = time.perf_counter() - self.start_time
        self.sample.add(elapsed)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        self.end()
41
class Sample(object):
    """Accumulates elapsed time from one or more measurements and flushes
    the combined total into the parent Stats object as a single sample.
    """

    def __init__(self, stats):
        self.stats = stats
        self.num_samples = 0
        self.elapsed = 0

    def measure(self):
        """Return a Measurement that reports its elapsed time here."""
        return Measurement(self)

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.end()

    def add(self, elapsed):
        """Accumulate one measured duration into this sample."""
        self.num_samples += 1
        self.elapsed += elapsed

    def end(self):
        """Flush the accumulated time into stats and reset the sample.

        A sample with no measurements is discarded without touching stats,
        so calling end() repeatedly is harmless.
        """
        if self.num_samples > 0:
            self.stats.add(self.elapsed)
            self.num_samples = 0
            self.elapsed = 0
66
67
class Stats(object):
    """Running statistics (count, total, max, mean, standard deviation)
    over a stream of elapsed-time samples.

    Mean and variance are maintained incrementally with Welford's online
    algorithm, so samples never need to be stored.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Discard all accumulated statistics."""
        self.num = 0
        self.total_time = 0
        self.max_time = 0
        self.m = 0
        self.s = 0
        self.current_elapsed = None

    def add(self, elapsed):
        """Fold one sample into the running statistics."""
        self.num += 1
        if self.num == 1:
            self.m = elapsed
            self.s = 0
        else:
            prev_mean = self.m
            self.m = prev_mean + (elapsed - prev_mean) / self.num
            self.s = self.s + (elapsed - prev_mean) * (elapsed - self.m)

        self.total_time += elapsed
        self.max_time = max(self.max_time, elapsed)

    def start_sample(self):
        """Begin a new Sample that will report into these stats."""
        return Sample(self)

    @property
    def average(self):
        """Arithmetic mean of all samples (0 when there are none)."""
        return self.total_time / self.num if self.num else 0

    @property
    def stdev(self):
        """Sample standard deviation (0 with fewer than two samples)."""
        return math.sqrt(self.s / (self.num - 1)) if self.num > 1 else 0

    def todict(self):
        """Export the summary statistics as a plain dictionary."""
        keys = ('num', 'total_time', 'max_time', 'average', 'stdev')
        return {key: getattr(self, key) for key in keys}
112
113
def insert_task(cursor, data, ignore=False):
    """Insert one row into the tasks_v2 table.

    Column names and values come from *data*; values are bound as named
    parameters. When *ignore* is True, INSERT OR IGNORE is used so rows
    violating a uniqueness constraint are silently skipped.
    """
    columns = sorted(data.keys())
    placeholders = ', '.join(':' + column for column in columns)
    query = '''INSERT%s INTO tasks_v2 (%s) VALUES (%s)''' % (
        " OR IGNORE" if ignore else "",
        ', '.join(columns),
        placeholders,
    )
    cursor.execute(query, data)
121
async def copy_from_upstream(client, db, method, taskhash):
    """Fetch the full task row for (method, taskhash) from the upstream
    server and, if one exists, insert it into the local database.

    Returns the column-filtered row dict, or None if the upstream server
    has no entry for the taskhash.
    """
    d = await client.get_taskhash(method, taskhash, True)
    if d is not None:
        # Filter out any columns the local schema doesn't know about
        d = {k: v for k, v in d.items() if k in TABLE_COLUMNS}
        # (removed an unused 'keys = sorted(d.keys())' local)

        with closing(db.cursor()) as cursor:
            insert_task(cursor, d)
            db.commit()

    return d
134
async def copy_outhash_from_upstream(client, db, method, outhash, taskhash):
    """Fetch the task row matching (method, outhash, taskhash) from the
    upstream server and, if one exists, insert it into the local database.

    Returns the column-filtered row dict, or None if the upstream server
    has no matching entry.
    """
    d = await client.get_outhash(method, outhash, taskhash)
    if d is not None:
        # Filter out any columns the local schema doesn't know about
        d = {k: v for k, v in d.items() if k in TABLE_COLUMNS}
        # (removed an unused 'keys = sorted(d.keys())' local)

        with closing(db.cursor()) as cursor:
            insert_task(cursor, d)
            db.commit()

    return d
147
class ServerClient(bb.asyncrpc.AsyncServerConnection):
    """Handles a single client connection to the hash equivalence server.

    Query handlers are always registered; the mutating handlers (report,
    reset-stats, backfill-wait) are only registered when the server is
    writable. When an upstream server is configured, hashes missing from
    the local database are fetched from it on demand.
    """

    FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
    ALL_QUERY = 'SELECT * FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
    OUTHASH_QUERY = '''
        -- Find tasks with a matching outhash (that is, tasks that
        -- are equivalent)
        SELECT * FROM tasks_v2 WHERE method=:method AND outhash=:outhash

        -- If there is an exact match on the taskhash, return it.
        -- Otherwise return the oldest matching outhash of any
        -- taskhash
        ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
            created ASC

        -- Only return one row
        LIMIT 1
        '''

    def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only):
        super().__init__(reader, writer, 'OEHASHEQUIV', logger)
        self.db = db
        self.request_stats = request_stats
        self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
        self.backfill_queue = backfill_queue
        self.upstream = upstream

        self.handlers.update({
            'get': self.handle_get,
            'get-outhash': self.handle_get_outhash,
            'get-stream': self.handle_get_stream,
            'get-stats': self.handle_get_stats,
        })

        if not read_only:
            # Mutating commands are only exposed on writable servers
            self.handlers.update({
                'report': self.handle_report,
                'report-equiv': self.handle_equivreport,
                'reset-stats': self.handle_reset_stats,
                'backfill-wait': self.handle_backfill_wait,
            })

    def validate_proto_version(self):
        # Accept protocol versions strictly above 1.0 up to and including 1.1
        return (self.proto_version > (1, 0) and self.proto_version <= (1, 1))

    async def process_requests(self):
        """Open the upstream connection (if configured) for the lifetime of
        this client, then delegate to the base request loop.
        """
        if self.upstream is not None:
            self.upstream_client = await create_async_client(self.upstream)
        else:
            self.upstream_client = None

        try:
            await super().process_requests()
        finally:
            # FIX: close the upstream connection even if the request loop
            # raises; previously an exception leaked the connection
            if self.upstream_client is not None:
                await self.upstream_client.close()

    async def dispatch_message(self, msg):
        """Route a decoded request message to its registered handler.

        Raises bb.asyncrpc.ClientError if no handler key matches.
        """
        for k in self.handlers.keys():
            if k in msg:
                logger.debug('Handling %s' % k)
                if 'stream' in k:
                    # Streaming handlers manage their own stats sampling
                    # inside their tight inner loop
                    await self.handlers[k](msg[k])
                else:
                    with self.request_stats.start_sample() as self.request_sample, \
                            self.request_sample.measure():
                        await self.handlers[k](msg[k])
                return

        raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)

    async def handle_get(self, request):
        """Look up the unihash for a (method, taskhash) pair.

        With 'all' set in the request every column is returned; otherwise
        only the fast (taskhash, method, unihash) triple. Falls back to the
        upstream server when the local database has no row.
        """
        method = request['method']
        taskhash = request['taskhash']

        if request.get('all', False):
            row = self.query_equivalent(method, taskhash, self.ALL_QUERY)
        else:
            row = self.query_equivalent(method, taskhash, self.FAST_QUERY)

        if row is not None:
            # FIX: pass format arguments individually; the original passed a
            # single tuple to a two-placeholder format string, which raises a
            # logging formatting error when debug logging is enabled
            logger.debug('Found equivalent task %s -> %s', row['taskhash'], row['unihash'])
            d = {k: row[k] for k in row.keys()}
        elif self.upstream_client is not None:
            d = await copy_from_upstream(self.upstream_client, self.db, method, taskhash)
        else:
            d = None

        self.write_message(d)

    async def handle_get_outhash(self, request):
        """Look up a task row by (method, outhash, taskhash), preferring an
        exact taskhash match and otherwise the oldest equivalent outhash.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(self.OUTHASH_QUERY,
                           {k: request[k] for k in ('method', 'outhash', 'taskhash')})

            row = cursor.fetchone()

        if row is not None:
            # FIX: same tuple-vs-arguments logging bug as handle_get
            logger.debug('Found equivalent outhash %s -> %s', row['outhash'], row['unihash'])
            d = {k: row[k] for k in row.keys()}
        else:
            d = None

        self.write_message(d)

    async def handle_get_stream(self, request):
        """Switch the connection into line-oriented streaming mode.

        Each request line is '<method> <taskhash>'; the response line is the
        unihash, or an empty line when unknown. A literal 'END' line (answered
        with 'ok') terminates the stream.
        """
        self.write_message('ok')

        while True:
            upstream = None

            l = await self.reader.readline()
            if not l:
                return

            try:
                # This inner loop is very sensitive and must be as fast as
                # possible (which is why the request sample is handled manually
                # instead of using 'with', and also why logging statements are
                # commented out.
                self.request_sample = self.request_stats.start_sample()
                request_measure = self.request_sample.measure()
                request_measure.start()

                l = l.decode('utf-8').rstrip()
                if l == 'END':
                    self.writer.write('ok\n'.encode('utf-8'))
                    return

                (method, taskhash) = l.split()
                #logger.debug('Looking up %s %s' % (method, taskhash))
                row = self.query_equivalent(method, taskhash, self.FAST_QUERY)
                if row is not None:
                    msg = ('%s\n' % row['unihash']).encode('utf-8')
                    #logger.debug('Found equivalent task %s -> %s', row['taskhash'], row['unihash'])
                elif self.upstream_client is not None:
                    upstream = await self.upstream_client.get_unihash(method, taskhash)
                    if upstream:
                        msg = ("%s\n" % upstream).encode("utf-8")
                    else:
                        msg = "\n".encode("utf-8")
                else:
                    msg = '\n'.encode('utf-8')

                self.writer.write(msg)
            finally:
                request_measure.end()
                self.request_sample.end()

            await self.writer.drain()

            # Post to the backfill queue after writing the result to minimize
            # the turn around time on a request
            if upstream is not None:
                await self.backfill_queue.put((method, taskhash))

    async def handle_report(self, data):
        """Record a (method, outhash, taskhash, unihash) report.

        If an equivalent outhash already exists, the new taskhash is mapped
        to the existing unihash; otherwise the caller-provided unihash is
        stored. Responds with the resulting mapping.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(self.OUTHASH_QUERY,
                           {k: data[k] for k in ('method', 'outhash', 'taskhash')})

            row = cursor.fetchone()

            if row is None and self.upstream_client:
                # Try upstream
                row = await copy_outhash_from_upstream(self.upstream_client,
                                                       self.db,
                                                       data['method'],
                                                       data['outhash'],
                                                       data['taskhash'])

            # If no matching outhash was found, or one *was* found but it
            # wasn't an exact match on the taskhash, a new entry for this
            # taskhash should be added
            if row is None or row['taskhash'] != data['taskhash']:
                # If a row matching the outhash was found, the unihash for
                # the new taskhash should be the same as that one.
                # Otherwise the caller provided unihash is used.
                unihash = data['unihash']
                if row is not None:
                    unihash = row['unihash']

                insert_data = {
                    'method': data['method'],
                    'outhash': data['outhash'],
                    'taskhash': data['taskhash'],
                    'unihash': unihash,
                    'created': datetime.now()
                }

                # Optional metadata columns are only stored when supplied
                for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
                    if k in data:
                        insert_data[k] = data[k]

                insert_task(cursor, insert_data)
                self.db.commit()

                logger.info('Adding taskhash %s with unihash %s',
                            data['taskhash'], unihash)

                d = {
                    'taskhash': data['taskhash'],
                    'method': data['method'],
                    'unihash': unihash
                }
            else:
                d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

        self.write_message(d)

    async def handle_equivreport(self, data):
        """Record a taskhash -> unihash equivalence (no outhash known).

        The insert uses OR IGNORE, so an existing mapping wins; the response
        always carries the mapping that is actually in effect.
        """
        with closing(self.db.cursor()) as cursor:
            insert_data = {
                'method': data['method'],
                'outhash': "",
                'taskhash': data['taskhash'],
                'unihash': data['unihash'],
                'created': datetime.now()
            }

            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
                if k in data:
                    insert_data[k] = data[k]

            insert_task(cursor, insert_data, ignore=True)
            self.db.commit()

            # Fetch the unihash that will be reported for the taskhash. If the
            # unihash matches, it means this row was inserted (or the mapping
            # was already valid)
            row = self.query_equivalent(data['method'], data['taskhash'], self.FAST_QUERY)

            if row['unihash'] == data['unihash']:
                logger.info('Adding taskhash equivalence for %s with unihash %s',
                            data['taskhash'], row['unihash'])

            d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

        self.write_message(d)

    async def handle_get_stats(self, request):
        """Return the aggregated request-timing statistics."""
        d = {
            'requests': self.request_stats.todict(),
        }

        self.write_message(d)

    async def handle_reset_stats(self, request):
        """Return the current statistics, then reset them to zero."""
        d = {
            'requests': self.request_stats.todict(),
        }

        self.request_stats.reset()
        self.write_message(d)

    async def handle_backfill_wait(self, request):
        """Report the backfill queue depth and block until it drains."""
        d = {
            'tasks': self.backfill_queue.qsize(),
        }
        await self.backfill_queue.join()
        self.write_message(d)

    def query_equivalent(self, method, taskhash, query):
        """Run one of the equivalence queries and return the first row,
        or None when there is no match or the query fails.
        """
        # This is part of the inner loop and must be as fast as possible
        cursor = self.db.cursor()
        try:
            cursor.execute(query, {'method': method, 'taskhash': taskhash})
            return cursor.fetchone()
        except Exception:
            # FIX: the original used a bare 'except:' that silently swallowed
            # every error (including KeyboardInterrupt) and leaked the cursor
            # on the success path. Preserve the best-effort 'None on failure'
            # contract, but log the error and always close the cursor.
            logger.exception('Error running equivalence query')
            return None
        finally:
            cursor.close()
417
418
class Server(bb.asyncrpc.AsyncServer):
    """Hash equivalence server.

    Combines the generic async RPC server with the hashserv database,
    request-timing statistics, and an optional upstream server that
    missing hashes are pulled from in the background.
    """

    def __init__(self, db, loop=None, upstream=None, read_only=False):
        # A read-only server could never store rows pulled from an
        # upstream, so the combination is rejected up front
        if upstream and read_only:
            raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server")

        super().__init__(logger, loop)

        # Shared across all client connections so stats are server-wide
        self.request_stats = Stats()
        self.db = db
        self.upstream = upstream
        self.read_only = read_only

    def accept_client(self, reader, writer):
        # Called by the base server for each incoming connection; every
        # client shares the database, stats, and backfill queue
        return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)

    @contextmanager
    def _backfill_worker(self):
        """Context manager that runs a background asyncio task draining the
        backfill queue: each (method, taskhash) pair queued by a client is
        copied from the upstream server into the local database.

        A no-op when no upstream server is configured.
        """
        async def backfill_worker_task():
            client = await create_async_client(self.upstream)
            try:
                while True:
                    item = await self.backfill_queue.get()
                    # None is the sentinel telling the worker to exit
                    if item is None:
                        self.backfill_queue.task_done()
                        break
                    method, taskhash = item
                    await copy_from_upstream(client, self.db, method, taskhash)
                    self.backfill_queue.task_done()
            finally:
                await client.close()

        async def join_worker(worker):
            # Wake the worker with the exit sentinel, then wait for it
            await self.backfill_queue.put(None)
            await worker

        if self.upstream is not None:
            worker = asyncio.ensure_future(backfill_worker_task())
            try:
                yield
            finally:
                # The main serving loop has stopped by the time we get here;
                # run the loop once more so the worker can drain the queue
                # and shut down cleanly
                self.loop.run_until_complete(join_worker(worker))
        else:
            yield

    def run_loop_forever(self):
        # Created here rather than in __init__, presumably so the queue is
        # bound to the event loop that actually runs — TODO confirm; it must
        # at least exist before accept_client() is first called
        self.backfill_queue = asyncio.Queue()

        with self._backfill_worker():
            super().run_loop_forever()