# Copyright (C) 2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#

from contextlib import closing, contextmanager
from datetime import datetime, timedelta
import enum
import asyncio
import logging
import math
import time
from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS
import bb.asyncrpc


logger = logging.getLogger('hashserv.server')


class Measurement(object):
    def __init__(self, sample):
        self.sample = sample

    def start(self):
        self.start_time = time.perf_counter()

    def end(self):
        self.sample.add(time.perf_counter() - self.start_time)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        self.end()


class Sample(object):
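    """
    Groups one or more Measurements into a single data point: elapsed times
    are accumulated via add() and reported to the owning Stats object exactly
    once when the sample ends.
    """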
39 def __init__(self, stats):
40 self.stats = stats
41 self.num_samples = 0
42 self.elapsed = 0
43
44 def measure(self):
45 return Measurement(self)
46
47 def __enter__(self):
48 return self
49
50 def __exit__(self, *args, **kwargs):
51 self.end()
52
53 def add(self, elapsed):
54 self.num_samples += 1
55 self.elapsed += elapsed
56
57 def end(self):
58 if self.num_samples:
59 self.stats.add(self.elapsed)
60 self.num_samples = 0
61 self.elapsed = 0
62
63
64class Stats(object):
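    """
    Running request statistics: count, total time, maximum, and an
    incrementally updated mean/standard deviation (Welford's online
    algorithm), so individual samples never need to be stored.
    """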
65 def __init__(self):
66 self.reset()
67
68 def reset(self):
69 self.num = 0
70 self.total_time = 0
71 self.max_time = 0
72 self.m = 0
73 self.s = 0
74 self.current_elapsed = None
75
76 def add(self, elapsed):
77 self.num += 1
78 if self.num == 1:
79 self.m = elapsed
80 self.s = 0
81 else:
82 last_m = self.m
83 self.m = last_m + (elapsed - last_m) / self.num
84 self.s = self.s + (elapsed - last_m) * (elapsed - self.m)
85
86 self.total_time += elapsed
87
88 if self.max_time < elapsed:
89 self.max_time = elapsed
90
91 def start_sample(self):
92 return Sample(self)
93
94 @property
95 def average(self):
96 if self.num == 0:
97 return 0
98 return self.total_time / self.num
99
100 @property
101 def stdev(self):
102 if self.num <= 1:
103 return 0
104 return math.sqrt(self.s / (self.num - 1))
105
106 def todict(self):
107 return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
108
109
Andrew Geisslereff27472021-10-29 15:35:00 -0500110@enum.unique
111class Resolve(enum.Enum):
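    """
    Conflict resolution strategies for insert_table(), mapping to SQLite's
    plain INSERT, INSERT OR IGNORE, and INSERT OR REPLACE statements.
    """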
    FAIL = enum.auto()
    IGNORE = enum.auto()
    REPLACE = enum.auto()


def insert_table(cursor, table, data, on_conflict):
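    """
    Insert `data` (a column name -> value mapping) into `table`, resolving
    conflicts according to `on_conflict`. For example, unihash data inserted
    with Resolve.IGNORE produces roughly:

        INSERT OR IGNORE INTO unihashes_v2 (method, taskhash, unihash)
            VALUES(:method, :taskhash, :unihash)

    Returns (rowid, inserted), where `inserted` is True only if a new row was
    actually written (a suppressed conflict leaves lastrowid unchanged).
    """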
    resolve = {
        Resolve.FAIL: "",
        Resolve.IGNORE: " OR IGNORE",
        Resolve.REPLACE: " OR REPLACE",
    }[on_conflict]

    keys = sorted(data.keys())
    query = 'INSERT{resolve} INTO {table} ({fields}) VALUES({values})'.format(
        resolve=resolve,
        table=table,
        fields=", ".join(keys),
        values=", ".join(":" + k for k in keys),
    )
    prevrowid = cursor.lastrowid
    cursor.execute(query, data)
    logger.debug(
        "Inserting %r into %s, %s",
        data,
        table,
        on_conflict
    )
    return (cursor.lastrowid, cursor.lastrowid != prevrowid)


def insert_unihash(cursor, data, on_conflict):
    return insert_table(cursor, "unihashes_v2", data, on_conflict)


def insert_outhash(cursor, data, on_conflict):
    return insert_table(cursor, "outhashes_v2", data, on_conflict)


async def copy_unihash_from_upstream(client, db, method, taskhash):
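    """
    Fetch the unihash entry for (method, taskhash) from the upstream server
    and cache it in the local database, ignoring the insert if an equivalent
    entry already exists. Returns whatever the upstream server reported
    (None when it has no entry).
    """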
    d = await client.get_taskhash(method, taskhash)
    if d is not None:
        with closing(db.cursor()) as cursor:
            insert_unihash(
                cursor,
                {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS},
                Resolve.IGNORE,
            )
            db.commit()
    return d


class ServerCursor(object):
    def __init__(self, db, cursor, upstream):
        self.db = db
        self.cursor = cursor
        self.upstream = upstream


class ServerClient(bb.asyncrpc.AsyncServerConnection):
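    """
    Serves a single hash equivalence client connection. Requests are
    dictionaries keyed by the command name ('get', 'report', ...), and
    dispatch_message() routes each one to the matching handle_* coroutine
    while timing it with request_stats. When the server is read-only, only
    the query handlers are registered, so mutating commands are rejected as
    unrecognized.
    """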
    def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only):
        super().__init__(reader, writer, 'OEHASHEQUIV', logger)
        self.db = db
        self.request_stats = request_stats
        self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
        self.backfill_queue = backfill_queue
        self.upstream = upstream

        self.handlers.update({
            'get': self.handle_get,
            'get-outhash': self.handle_get_outhash,
            'get-stream': self.handle_get_stream,
            'get-stats': self.handle_get_stats,
        })

        if not read_only:
            self.handlers.update({
                'report': self.handle_report,
                'report-equiv': self.handle_equivreport,
                'reset-stats': self.handle_reset_stats,
                'backfill-wait': self.handle_backfill_wait,
                'remove': self.handle_remove,
                'clean-unused': self.handle_clean_unused,
            })

    def validate_proto_version(self):
        return (self.proto_version > (1, 0) and self.proto_version <= (1, 1))

    async def process_requests(self):
        if self.upstream is not None:
            self.upstream_client = await create_async_client(self.upstream)
        else:
            self.upstream_client = None

        await super().process_requests()

        if self.upstream_client is not None:
            await self.upstream_client.close()

    async def dispatch_message(self, msg):
        for k in self.handlers.keys():
            if k in msg:
                logger.debug('Handling %s' % k)
                if 'stream' in k:
                    await self.handlers[k](msg[k])
                else:
                    with self.request_stats.start_sample() as self.request_sample, \
                            self.request_sample.measure():
                        await self.handlers[k](msg[k])
                return

        raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)

    async def handle_get(self, request):
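        """
        Look up the unified hash for a (method, taskhash) pair. The request
        body is roughly {'method': ..., 'taskhash': ..., 'all': False}; the
        reply is the matching row as a dict (taskhash, method, unihash, plus
        the outhash columns when 'all' is set), or None if nothing is found
        locally or upstream.
        """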
        method = request['method']
        taskhash = request['taskhash']
        fetch_all = request.get('all', False)

        with closing(self.db.cursor()) as cursor:
            d = await self.get_unihash(cursor, method, taskhash, fetch_all)

        self.write_message(d)

    async def get_unihash(self, cursor, method, taskhash, fetch_all=False):
        d = None

        if fetch_all:
            cursor.execute(
                '''
                SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
                WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                ''',
                {
                    'method': method,
                    'taskhash': taskhash,
                }
            )
            row = cursor.fetchone()

            if row is not None:
                d = {k: row[k] for k in row.keys()}
            elif self.upstream_client is not None:
                d = await self.upstream_client.get_taskhash(method, taskhash, True)
                self.update_unified(cursor, d)
                self.db.commit()
        else:
            row = self.query_equivalent(cursor, method, taskhash)

            if row is not None:
                d = {k: row[k] for k in row.keys()}
            elif self.upstream_client is not None:
                d = await self.upstream_client.get_taskhash(method, taskhash)
                d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}
                insert_unihash(cursor, d, Resolve.IGNORE)
                self.db.commit()

        return d

    async def handle_get_outhash(self, request):
        method = request['method']
        outhash = request['outhash']
        taskhash = request['taskhash']
        with_unihash = request.get("with_unihash", True)

        with closing(self.db.cursor()) as cursor:
            d = await self.get_outhash(cursor, method, outhash, taskhash, with_unihash)

        self.write_message(d)

    async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True):
        d = None
        if with_unihash:
            cursor.execute(
                '''
                SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                ''',
                {
                    'method': method,
                    'outhash': outhash,
                }
            )
        else:
            cursor.execute(
                """
                SELECT * FROM outhashes_v2
                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                """,
                {
                    'method': method,
                    'outhash': outhash,
                }
            )
        row = cursor.fetchone()

        if row is not None:
            d = {k: row[k] for k in row.keys()}
        elif self.upstream_client is not None:
            d = await self.upstream_client.get_outhash(method, outhash, taskhash)
            self.update_unified(cursor, d)
            self.db.commit()

        return d

    def update_unified(self, cursor, data):
        if data is None:
            return

        insert_unihash(
            cursor,
            {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS},
            Resolve.IGNORE
        )
        insert_outhash(
            cursor,
            {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS},
            Resolve.IGNORE
        )

    async def handle_get_stream(self, request):
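        """
        Line-oriented fast path for bulk lookups. After the initial 'ok'
        reply, the client sends one '<method> <taskhash>' line per query and
        receives the unihash (or an empty line when there is no match); the
        literal line 'END' terminates the stream with a final 'ok'. Misses
        answered by the upstream server are queued for backfilling into the
        local database after the reply is written.
        """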
        self.write_message('ok')

        while True:
            upstream = None

            l = await self.reader.readline()
            if not l:
                return

            try:
                # This inner loop is very sensitive and must be as fast as
                # possible (which is why the request sample is handled manually
                # instead of using 'with', and also why logging statements are
                # commented out).
                self.request_sample = self.request_stats.start_sample()
                request_measure = self.request_sample.measure()
                request_measure.start()

                l = l.decode('utf-8').rstrip()
                if l == 'END':
                    self.writer.write('ok\n'.encode('utf-8'))
                    return

                (method, taskhash) = l.split()
                #logger.debug('Looking up %s %s' % (method, taskhash))
                cursor = self.db.cursor()
                try:
                    row = self.query_equivalent(cursor, method, taskhash)
                finally:
                    cursor.close()

                if row is not None:
                    msg = ('%s\n' % row['unihash']).encode('utf-8')
                    #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
                elif self.upstream_client is not None:
                    upstream = await self.upstream_client.get_unihash(method, taskhash)
                    if upstream:
                        msg = ("%s\n" % upstream).encode("utf-8")
                    else:
                        msg = "\n".encode("utf-8")
                else:
                    msg = '\n'.encode('utf-8')

                self.writer.write(msg)
            finally:
                request_measure.end()
                self.request_sample.end()

            await self.writer.drain()

            # Post to the backfill queue after writing the result to minimize
            # the turn around time on a request
            if upstream is not None:
                await self.backfill_queue.put((method, taskhash))

    async def handle_report(self, data):
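        """
        Record an output hash reported by a client and resolve its unified
        hash. The outhash row is inserted unless it already exists; if it is
        new, the oldest existing row with the same output hash but a
        different taskhash supplies the unihash, otherwise the client's own
        unihash (or one fetched from upstream) is used. The reply contains
        the taskhash, method and resolved unihash.
        """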
        with closing(self.db.cursor()) as cursor:
            outhash_data = {
                'method': data['method'],
                'outhash': data['outhash'],
                'taskhash': data['taskhash'],
                'created': datetime.now()
            }

            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
                if k in data:
                    outhash_data[k] = data[k]

            # Insert the new entry, unless it already exists
            (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE)

            if inserted:
                # If this row is new, check if it is equivalent to another
                # output hash
                cursor.execute(
                    '''
                    SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2
                    INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
                    -- Select any matching output hash except the one we just inserted
                    WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
                    -- Pick the oldest hash
                    ORDER BY outhashes_v2.created ASC
                    LIMIT 1
                    ''',
                    {
                        'method': data['method'],
                        'outhash': data['outhash'],
                        'taskhash': data['taskhash'],
                    }
                )
                row = cursor.fetchone()

                if row is not None:
                    # A matching output hash was found. Set our taskhash to the
                    # same unihash since they are equivalent
                    unihash = row['unihash']
                    resolve = Resolve.IGNORE
                else:
                    # No matching output hash was found. This is probably the
                    # first outhash to be added.
                    unihash = data['unihash']
                    resolve = Resolve.IGNORE

                    # Query upstream to see if it has a unihash we can use
                    if self.upstream_client is not None:
                        upstream_data = await self.upstream_client.get_outhash(data['method'], data['outhash'], data['taskhash'])
                        if upstream_data is not None:
                            unihash = upstream_data['unihash']

                insert_unihash(
                    cursor,
                    {
                        'method': data['method'],
                        'taskhash': data['taskhash'],
                        'unihash': unihash,
                    },
                    resolve
                )

            unihash_data = await self.get_unihash(cursor, data['method'], data['taskhash'])
            if unihash_data is not None:
                unihash = unihash_data['unihash']
            else:
                unihash = data['unihash']

            self.db.commit()

            d = {
                'taskhash': data['taskhash'],
                'method': data['method'],
                'unihash': unihash,
            }

        self.write_message(d)

    async def handle_equivreport(self, data):
        with closing(self.db.cursor()) as cursor:
            insert_data = {
                'method': data['method'],
                'taskhash': data['taskhash'],
                'unihash': data['unihash'],
            }
            insert_unihash(cursor, insert_data, Resolve.IGNORE)
            self.db.commit()

            # Fetch the unihash that will be reported for the taskhash. If the
            # unihash matches, it means this row was inserted (or the mapping
            # was already valid)
            row = self.query_equivalent(cursor, data['method'], data['taskhash'])

            if row['unihash'] == data['unihash']:
                logger.info('Adding taskhash equivalence for %s with unihash %s',
                            data['taskhash'], row['unihash'])

            d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

        self.write_message(d)

    async def handle_get_stats(self, request):
        d = {
            'requests': self.request_stats.todict(),
        }

        self.write_message(d)

    async def handle_reset_stats(self, request):
        d = {
            'requests': self.request_stats.todict(),
        }

        self.request_stats.reset()
        self.write_message(d)

    async def handle_backfill_wait(self, request):
        d = {
            'tasks': self.backfill_queue.qsize(),
        }
        await self.backfill_queue.join()
        self.write_message(d)

    async def handle_remove(self, request):
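        """
        Delete rows matching the 'where' dict from both hash tables. Only
        keys that are real table columns are honoured and they are ANDed
        together; e.g. a condition of {'method': m, 'taskhash': t} becomes
        roughly:

            DELETE FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash

        The reply reports the total number of rows deleted.
        """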
        condition = request["where"]
        if not isinstance(condition, dict):
            raise TypeError("Bad condition type %s" % type(condition))

        def do_remove(columns, table_name, cursor):
            nonlocal condition
            where = {}
            for c in columns:
                if c in condition and condition[c] is not None:
                    where[c] = condition[c]

            if where:
                query = ('DELETE FROM %s WHERE ' % table_name) + ' AND '.join("%s=:%s" % (k, k) for k in where.keys())
                cursor.execute(query, where)
                return cursor.rowcount

            return 0

        count = 0
        with closing(self.db.cursor()) as cursor:
            count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor)
            count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor)
            self.db.commit()

        self.write_message({"count": count})

    async def handle_clean_unused(self, request):
        max_age = request["max_age_seconds"]
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS (
                    SELECT unihashes_v2.id FROM unihashes_v2 WHERE unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash LIMIT 1
                )
                """,
                {
                    # Only entries created more than max_age seconds ago are
                    # eligible for removal
                    "oldest": datetime.now() - timedelta(seconds=max_age)
                }
            )
            count = cursor.rowcount

        self.write_message({"count": count})

    def query_equivalent(self, cursor, method, taskhash):
        # This is part of the inner loop and must be as fast as possible
        cursor.execute(
            'SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash',
            {
                'method': method,
                'taskhash': taskhash,
            }
        )
        return cursor.fetchone()


class Server(bb.asyncrpc.AsyncServer):
    def __init__(self, db, upstream=None, read_only=False):
        if upstream and read_only:
            raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server")

        super().__init__(logger)

        self.request_stats = Stats()
        self.db = db
        self.upstream = upstream
        self.read_only = read_only

    def accept_client(self, reader, writer):
        return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)

    @contextmanager
    def _backfill_worker(self):
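        """
        Context manager that, when an upstream server is configured, runs a
        background task draining backfill_queue: each queued (method,
        taskhash) pair has its unihash copied from upstream into the local
        database. A None item is the sentinel used to shut the worker down.
        """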
        async def backfill_worker_task():
            client = await create_async_client(self.upstream)
            try:
                while True:
                    item = await self.backfill_queue.get()
                    if item is None:
                        self.backfill_queue.task_done()
                        break
                    method, taskhash = item
                    await copy_unihash_from_upstream(client, self.db, method, taskhash)
                    self.backfill_queue.task_done()
            finally:
                await client.close()

        async def join_worker(worker):
            await self.backfill_queue.put(None)
            await worker

        if self.upstream is not None:
            worker = asyncio.ensure_future(backfill_worker_task())
            try:
                yield
            finally:
                self.loop.run_until_complete(join_worker(worker))
        else:
            yield

    def run_loop_forever(self):
        self.backfill_queue = asyncio.Queue()

        with self._backfill_worker():
            super().run_loop_forever()