# Copyright (C) 2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#

Andrew Geissler20137392023-10-12 04:59:14 -06006from datetime import datetime, timedelta
Brad Bishopa34c0302019-09-23 22:34:48 -04007import asyncio
Brad Bishopa34c0302019-09-23 22:34:48 -04008import logging
9import math
Brad Bishopa34c0302019-09-23 22:34:48 -040010import time
Patrick Williamsac13d5f2023-11-24 18:59:46 -060011import os
12import base64
13import hashlib
14from . import create_async_client
Andrew Geisslerc926e172021-05-07 16:11:35 -050015import bb.asyncrpc
16
# Module-level logger; handed to the base class by Server.__init__
logger = logging.getLogger("hashserv.server")


# This permission only exists to match nothing
NONE_PERM = "@none"

# Names of the individual permissions that RPC handlers are guarded with
READ_PERM = "@read"
REPORT_PERM = "@report"
DB_ADMIN_PERM = "@db-admin"
USER_ADMIN_PERM = "@user-admin"
# Holding this permission satisfies every permission check
ALL_PERM = "@all"

# Every permission name accepted when setting a user's permissions. Note that
# NONE_PERM is deliberately not a member.
ALL_PERMISSIONS = {
    READ_PERM,
    REPORT_PERM,
    DB_ADMIN_PERM,
    USER_ADMIN_PERM,
    ALL_PERM,
}

# Permissions granted to unauthenticated clients unless the server is
# constructed with a different anon_perms value
DEFAULT_ANON_PERMS = (
    READ_PERM,
    REPORT_PERM,
    DB_ADMIN_PERM,
)

# Hash algorithm name (as understood by hashlib.new) used for new token hashes
TOKEN_ALGORITHM = "sha256"

# 48 bytes of random data will result in 64 characters when base64
# encoded. This number also ensures that the base64 encoding won't have any
# trailing '=' characters.
TOKEN_SIZE = 48

# Number of random bytes in a salt; new_salt() hex-encodes them, so the
# resulting salt string is twice this many characters long
SALT_SIZE = 8
Brad Bishopa34c0302019-09-23 22:34:48 -040051
52
class Measurement(object):
    """
    Times one operation and reports the duration to a sample object.

    The timer can be driven by hand with start() and end(), or used as a
    context manager, in which case the duration of the "with" body is
    reported. The sample only needs to provide an add(elapsed) method.
    """

    def __init__(self, sample):
        self.sample = sample

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        # The measurement is reported even if the body raised
        self.end()

    def start(self):
        """Remember the current performance counter value."""
        self.start_time = time.perf_counter()

    def end(self):
        """Report the time elapsed since start() to the sample."""
        duration = time.perf_counter() - self.start_time
        self.sample.add(duration)
69
70
class Sample(object):
    """
    Collects one or more timings and reports their sum as a single entry.

    Individual durations are accumulated with add() (normally through a
    Measurement obtained from measure()). Calling end(), or leaving the
    "with" block, pushes the accumulated total to the stats object and
    clears the accumulator. Nothing is reported if no timing was added.
    """

    def __init__(self, stats):
        self.stats = stats
        self.num_samples = 0
        self.elapsed = 0

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.end()

    def measure(self):
        """Return a new Measurement that reports into this sample."""
        return Measurement(self)

    def add(self, elapsed):
        """Accumulate one measured duration."""
        self.num_samples += 1
        self.elapsed += elapsed

    def end(self):
        """Flush the accumulated time to the stats object, if there is any."""
        if not self.num_samples:
            return

        self.stats.add(self.elapsed)
        self.num_samples = 0
        self.elapsed = 0
95
96
class Stats(object):
    """
    Running statistics (count, total, maximum, mean and standard deviation)
    over a stream of elapsed times.

    The mean (self.m) and the sum of squared deviations (self.s) are updated
    incrementally for every value, so individual values are never stored.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Discard everything recorded so far."""
        self.num = 0
        self.total_time = 0
        self.max_time = 0
        self.m = 0
        self.s = 0
        self.current_elapsed = None

    def add(self, elapsed):
        """Record one elapsed time."""
        self.num += 1
        if self.num != 1:
            # Incremental update of the running mean and of the sum of
            # squared deviations from it
            delta = elapsed - self.m
            self.m = self.m + delta / self.num
            self.s = self.s + delta * (elapsed - self.m)
        else:
            # The first value defines the mean and has no deviation
            self.m = elapsed
            self.s = 0

        self.total_time += elapsed
        self.max_time = max(self.max_time, elapsed)

    def start_sample(self):
        """Return a new Sample that reports into these statistics."""
        return Sample(self)

    @property
    def average(self):
        """Mean of the recorded values, or 0 if there are none."""
        return self.total_time / self.num if self.num else 0

    @property
    def stdev(self):
        """Sample standard deviation, or 0 with fewer than two values."""
        if self.num > 1:
            return math.sqrt(self.s / (self.num - 1))
        return 0

    def todict(self):
        """Return the statistics as a plain dictionary."""
        return {
            "num": self.num,
            "total_time": self.total_time,
            "max_time": self.max_time,
            "average": self.average,
            "stdev": self.stdev,
        }
Brad Bishopa34c0302019-09-23 22:34:48 -0400144
145
# Single lock shared by every token request; see new_token()
token_refresh_semaphore = asyncio.Lock()


async def new_token():
    """
    Generate a new random authentication token.

    To prevent malicious users from using this API to deduce the state of the
    server's entropy pool (and thus guess a token), *all* token requests take
    the same global lock and then sleep for a short time while holding it.
    This effectively rate limits the total number of tokens that can be
    generated across all clients to 10 per second, which should be enough
    since a client has to be authenticated to make the request in the first
    place.

    Returns TOKEN_SIZE random bytes, base64 encoded using "." and "_" as the
    two non-alphanumeric characters.
    """
    async with token_refresh_semaphore:
        await asyncio.sleep(0.1)
        random_bytes = os.getrandom(TOKEN_SIZE, os.GRND_NONBLOCK)

    encoded = base64.b64encode(random_bytes, b"._")
    return encoded.decode("utf-8")
Andrew Geisslerd1e89492021-02-12 15:35:20 -0600162
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600163
def new_salt():
    """Return SALT_SIZE random bytes from the OS as a hexadecimal string."""
    salt_bytes = os.getrandom(SALT_SIZE, os.GRND_NONBLOCK)
    return salt_bytes.hex()
166
167
def hash_token(algo, salt, token):
    """
    Hash a token with a salt and return it in its storable form.

    algo is a hash algorithm name accepted by hashlib.new(). The salt is fed
    to the hash before the token, both UTF-8 encoded. The result is the string
    "<algo>:<salt>:<hex digest>", which carries everything needed to check a
    token against it later.
    """
    hasher = hashlib.new(algo)
    for part in (salt, token):
        hasher.update(part.encode("utf-8"))
    return f"{algo}:{salt}:{hasher.hexdigest()}"
173
174
def permissions(*permissions, allow_anon=True, allow_self_service=False):
    """
    Function decorator that can be used to decorate an RPC function call and
    check that the current user's permissions match the required permissions.

    The decorated function must be a coroutine method taking (self, request),
    where self provides "user", "user_has_permissions()", "server" and
    "logger".

    If allow_anon is True, the user will also be allowed to make the RPC call
    if the anonymous user permissions match the permissions.

    If allow_self_service is True, and the "username" property in the request
    is the currently logged in user, or not specified, the user will also be
    allowed to make the request. This allows users to access normally
    privileged API, as long as they are only modifying their own user
    properties (e.g. users can be allowed to reset their own token without
    @user-admin permissions, but not the token for any other user).

    Raises bb.asyncrpc.InvokeError if the call is not permitted.
    """
    # Imported here so the decorator does not depend on the module's import
    # block
    import functools

    def wrapper(func):
        # Preserve the handler's name and docstring on the wrapped coroutine
        @functools.wraps(func)
        async def wrap(self, request):
            if allow_self_service and self.user is not None:
                username = request.get("username", self.user.username)
                if username == self.user.username:
                    # Make sure the handler always sees an explicit username,
                    # even if the client left it out
                    request["username"] = self.user.username
                    return await func(self, request)

            if not self.user_has_permissions(*permissions, allow_anon=allow_anon):
                if not self.user:
                    username = "Anonymous user"
                    user_perms = self.server.anon_perms
                else:
                    username = self.user.username
                    user_perms = self.user.permissions

                self.logger.info(
                    "User %s with permissions %r denied from calling %s. Missing permissions(s) %r",
                    username,
                    ", ".join(user_perms),
                    func.__name__,
                    ", ".join(permissions),
                )
                raise bb.asyncrpc.InvokeError(
                    f"{username} is not allowed to access permissions(s) {', '.join(permissions)}"
                )

            return await func(self, request)

        return wrap

    return wrapper
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600223
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600224
class ServerClient(bb.asyncrpc.AsyncServerConnection):
    """
    Server side of a single hash equivalence client connection.

    A connection starts out anonymous (self.user is None) and can
    authenticate with the "auth" command. Incoming messages are dispatched to
    the handle_* coroutines registered in self.handlers; commands that modify
    the database are only registered when the server is not read-only.
    """

    def __init__(self, socket, server):
        """
        socket is the client connection (passed on to the base class) and
        server is the owning Server, whose logger, anonymous permissions,
        statistics, database engine and upstream settings are used.
        """
        super().__init__(socket, "OEHASHEQUIV", server.logger)
        self.server = server
        self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
        self.user = None

        self.handlers.update(
            {
                "get": self.handle_get,
                "get-outhash": self.handle_get_outhash,
                "get-stream": self.handle_get_stream,
                "exists-stream": self.handle_exists_stream,
                "get-stats": self.handle_get_stats,
                "get-db-usage": self.handle_get_db_usage,
                "get-db-query-columns": self.handle_get_db_query_columns,
                # Not always read-only, but internally checks if the server is
                # read-only
                "report": self.handle_report,
                "auth": self.handle_auth,
                "get-user": self.handle_get_user,
                "get-all-users": self.handle_get_all_users,
                "become-user": self.handle_become_user,
            }
        )

        if not self.server.read_only:
            self.handlers.update(
                {
                    "report-equiv": self.handle_equivreport,
                    "reset-stats": self.handle_reset_stats,
                    "backfill-wait": self.handle_backfill_wait,
                    "remove": self.handle_remove,
                    "gc-mark": self.handle_gc_mark,
                    "gc-sweep": self.handle_gc_sweep,
                    "gc-status": self.handle_gc_status,
                    "clean-unused": self.handle_clean_unused,
                    "refresh-token": self.handle_refresh_token,
                    "set-user-perms": self.handle_set_perms,
                    "new-user": self.handle_new_user,
                    "delete-user": self.handle_delete_user,
                }
            )

    def raise_no_user_error(self, username):
        """Raise the InvokeError used to report that a user does not exist."""
        raise bb.asyncrpc.InvokeError(f"No user named '{username}' exists")

    def user_has_permissions(self, *permissions, allow_anon=True):
        """
        Return True if the connection holds all of the given permissions.

        When allow_anon is True the anonymous permissions of the server are
        checked first, so they apply to authenticated users as well. After
        that the permissions of the authenticated user (if any) are checked.
        Holding ALL_PERM satisfies any request.
        """
        permissions = set(permissions)
        if allow_anon:
            if ALL_PERM in self.server.anon_perms:
                return True

            if not permissions - self.server.anon_perms:
                return True

        if self.user is None:
            return False

        if ALL_PERM in self.user.permissions:
            return True

        if not permissions - self.user.permissions:
            return True

        return False

    def validate_proto_version(self):
        """Accept protocol versions newer than 1.0, up to and including 1.1."""
        return (1, 0) < self.proto_version <= (1, 1)

    async def process_requests(self):
        """
        Serve the connection with a database connection (and, if the server
        has an upstream configured, an upstream client) available to the
        handlers. The upstream client is closed when request processing ends.
        """
        async with self.server.db_engine.connect(self.logger) as db:
            self.db = db
            if self.server.upstream is not None:
                self.upstream_client = await create_async_client(self.server.upstream)
            else:
                self.upstream_client = None

            try:
                await super().process_requests()
            finally:
                if self.upstream_client is not None:
                    await self.upstream_client.close()

    async def dispatch_message(self, msg):
        """
        Call the handler for the first registered command name found in msg.

        Stream commands do their own per-request timing; every other command
        is timed here as a single request. Raises bb.asyncrpc.ClientError if
        msg contains no known command.
        """
        for k in self.handlers:
            if k in msg:
                self.logger.debug("Handling %s", k)
                if "stream" in k:
                    return await self.handlers[k](msg[k])
                else:
                    with self.server.request_stats.start_sample() as self.request_sample, self.request_sample.measure():
                        return await self.handlers[k](msg[k])

        raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)

    @permissions(READ_PERM)
    async def handle_get(self, request):
        """Look up the unihash for the "method" and "taskhash" in request."""
        method = request["method"]
        taskhash = request["taskhash"]
        fetch_all = request.get("all", False)

        return await self.get_unihash(method, taskhash, fetch_all)

    async def get_unihash(self, method, taskhash, fetch_all=False):
        """
        Return the row for (method, taskhash) as a dictionary, or None.

        The local database is consulted first. If it has no match and an
        upstream client exists, upstream is queried and any result is stored
        locally before being returned. fetch_all selects the full row lookup
        instead of only the equivalence lookup.
        """
        d = None

        if fetch_all:
            row = await self.db.get_unihash_by_taskhash_full(method, taskhash)
            if row is not None:
                d = {k: row[k] for k in row.keys()}
            elif self.upstream_client is not None:
                d = await self.upstream_client.get_taskhash(method, taskhash, True)
                await self.update_unified(d)
        else:
            row = await self.db.get_equivalent(method, taskhash)

            if row is not None:
                d = {k: row[k] for k in row.keys()}
            elif self.upstream_client is not None:
                d = await self.upstream_client.get_taskhash(method, taskhash)
                # Upstream may not know the taskhash either, in which case
                # there is nothing to store locally
                if d is not None:
                    await self.db.insert_unihash(d["method"], d["taskhash"], d["unihash"])

        return d

    @permissions(READ_PERM)
    async def handle_get_outhash(self, request):
        """Look up the entry for the "method" and "outhash" in request."""
        method = request["method"]
        outhash = request["outhash"]
        taskhash = request["taskhash"]
        with_unihash = request.get("with_unihash", True)

        return await self.get_outhash(method, outhash, taskhash, with_unihash)

    async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
        """
        Return the row for (method, outhash) as a dictionary, or None.

        with_unihash selects which database lookup is used. If the local
        database has no match and an upstream client exists, upstream is
        queried and any result is stored locally.
        """
        d = None
        if with_unihash:
            row = await self.db.get_unihash_by_outhash(method, outhash)
        else:
            row = await self.db.get_outhash(method, outhash)

        if row is not None:
            d = {k: row[k] for k in row.keys()}
        elif self.upstream_client is not None:
            d = await self.upstream_client.get_outhash(method, outhash, taskhash)
            await self.update_unified(d)

        return d

    async def update_unified(self, data):
        """Store upstream data as both unihash and outhash; None is ignored."""
        if data is None:
            return

        await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
        await self.db.insert_outhash(data)

    async def _stream_handler(self, handler):
        """
        Run a streaming command.

        After acknowledging with "ok", lines are read from the socket and the
        string returned by "await handler(line)" is sent back for each one,
        until the client sends "END" or an empty read occurs. A final "ok" is
        sent and NO_RESPONSE returned so that no further reply is generated.
        """
        await self.socket.send_message("ok")

        while True:
            l = await self.socket.recv()
            if not l:
                break

            try:
                # This inner loop is very sensitive and must be as fast as
                # possible (which is why the request sample is handled manually
                # instead of using 'with', and also why logging statements are
                # commented out).
                self.request_sample = self.server.request_stats.start_sample()
                request_measure = self.request_sample.measure()
                request_measure.start()

                if l == "END":
                    break

                msg = await handler(l)
                await self.socket.send(msg)
            finally:
                request_measure.end()
                self.request_sample.end()

        await self.socket.send("ok")
        return self.NO_RESPONSE

    @permissions(READ_PERM)
    async def handle_get_stream(self, request):
        """
        Stream command: each line is "<method> <taskhash>" and is answered
        with the unihash, or an empty string if none is known.
        """

        async def handler(l):
            (method, taskhash) = l.split()
            # self.logger.debug('Looking up %s %s' % (method, taskhash))
            row = await self.db.get_equivalent(method, taskhash)

            if row is not None:
                # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
                return row["unihash"]

            if self.upstream_client is not None:
                upstream = await self.upstream_client.get_unihash(method, taskhash)
                if upstream:
                    # Answer right away and let the backfill worker copy the
                    # entry into the local database later
                    await self.server.backfill_queue.put((method, taskhash))
                    return upstream

            return ""

        return await self._stream_handler(handler)

    @permissions(READ_PERM)
    async def handle_exists_stream(self, request):
        """
        Stream command: each line is a unihash and is answered with "true" if
        it exists locally or upstream, otherwise "false".
        """

        async def handler(l):
            if await self.db.unihash_exists(l):
                return "true"

            if self.upstream_client is not None:
                if await self.upstream_client.unihash_exists(l):
                    return "true"

            return "false"

        return await self._stream_handler(handler)

    async def report_readonly(self, data):
        """
        Answer a report without storing the reported data: return the unihash
        already known for the outhash, or echo the reported one if none is.
        """
        method = data["method"]
        outhash = data["outhash"]
        taskhash = data["taskhash"]

        info = await self.get_outhash(method, outhash, taskhash)
        if info:
            unihash = info["unihash"]
        else:
            unihash = data["unihash"]

        return {
            "taskhash": taskhash,
            "method": method,
            "unihash": unihash,
        }

    # Since this can be called either read only or to report, the check to
    # report is made inside the function
    @permissions(READ_PERM)
    async def handle_report(self, data):
        """
        Record a reported output hash and return the unihash to use for its
        taskhash. Falls back to report_readonly() when the server is
        read-only or the user lacks REPORT_PERM.
        """
        if self.server.read_only or not self.user_has_permissions(REPORT_PERM):
            return await self.report_readonly(data)

        outhash_data = {
            "method": data["method"],
            "outhash": data["outhash"],
            "taskhash": data["taskhash"],
            "created": datetime.now(),
        }

        for k in ("owner", "PN", "PV", "PR", "task", "outhash_siginfo"):
            if k in data:
                outhash_data[k] = data[k]

        # An authenticated user always overrides any client supplied owner
        if self.user:
            outhash_data["owner"] = self.user.username

        # Insert the new entry, unless it already exists
        if await self.db.insert_outhash(outhash_data):
            # If this row is new, check if it is equivalent to another
            # output hash
            row = await self.db.get_equivalent_for_outhash(
                data["method"], data["outhash"], data["taskhash"]
            )

            if row is not None:
                # A matching output hash was found. Set our taskhash to the
                # same unihash since they are equivalent
                unihash = row["unihash"]
            else:
                # No matching output hash was found. This is probably the
                # first outhash to be added.
                unihash = data["unihash"]

                # Query upstream to see if it has a unihash we can use
                if self.upstream_client is not None:
                    upstream_data = await self.upstream_client.get_outhash(
                        data["method"], data["outhash"], data["taskhash"]
                    )
                    if upstream_data is not None:
                        unihash = upstream_data["unihash"]

            await self.db.insert_unihash(data["method"], data["taskhash"], unihash)

        unihash_data = await self.get_unihash(data["method"], data["taskhash"])
        if unihash_data is not None:
            unihash = unihash_data["unihash"]
        else:
            unihash = data["unihash"]

        return {
            "taskhash": data["taskhash"],
            "method": data["method"],
            "unihash": unihash,
        }

    @permissions(READ_PERM, REPORT_PERM)
    async def handle_equivreport(self, data):
        """
        Record that a taskhash maps to the given unihash and return the
        mapping that is in effect afterwards.
        """
        await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])

        # Fetch the unihash that will be reported for the taskhash. If the
        # unihash matches, it means this row was inserted (or the mapping
        # was already valid)
        row = await self.db.get_equivalent(data["method"], data["taskhash"])

        if row["unihash"] == data["unihash"]:
            self.logger.info(
                "Adding taskhash equivalence for %s with unihash %s",
                data["taskhash"],
                row["unihash"],
            )

        return {k: row[k] for k in ("taskhash", "method", "unihash")}

    @permissions(READ_PERM)
    async def handle_get_stats(self, request):
        """Return the request timing statistics of the server."""
        return {
            "requests": self.server.request_stats.todict(),
        }

    @permissions(DB_ADMIN_PERM)
    async def handle_reset_stats(self, request):
        """Return the request timing statistics and then reset them."""
        d = {
            "requests": self.server.request_stats.todict(),
        }

        self.server.request_stats.reset()
        return d

    @permissions(READ_PERM)
    async def handle_backfill_wait(self, request):
        """
        Wait until the backfill queue has been fully processed and return the
        number of entries that were queued when the request arrived.
        """
        queue = self.server.backfill_queue
        # The queue is only created when the server has an upstream; without
        # one there is never anything to wait for
        if queue is None:
            return {"tasks": 0}

        d = {
            "tasks": queue.qsize(),
        }
        await queue.join()
        return d

    @permissions(DB_ADMIN_PERM)
    async def handle_remove(self, request):
        """Remove the rows matching the "where" dictionary in request."""
        condition = request["where"]
        if not isinstance(condition, dict):
            raise TypeError("Bad condition type %s" % type(condition))

        return {"count": await self.db.remove(condition)}

    @permissions(DB_ADMIN_PERM)
    async def handle_gc_mark(self, request):
        """Apply the garbage collection "mark" to rows matching "where"."""
        condition = request["where"]
        mark = request["mark"]

        if not isinstance(condition, dict):
            raise TypeError("Bad condition type %s" % type(condition))

        if not isinstance(mark, str):
            raise TypeError("Bad mark type %s" % type(mark))

        return {"count": await self.db.gc_mark(mark, condition)}

    @permissions(DB_ADMIN_PERM)
    async def handle_gc_sweep(self, request):
        """
        Run the garbage collection sweep. The request must name the current
        mark, otherwise the sweep is refused with an InvokeError.
        """
        mark = request["mark"]

        if not isinstance(mark, str):
            raise TypeError("Bad mark type %s" % type(mark))

        current_mark = await self.db.get_current_gc_mark()

        if not current_mark or mark != current_mark:
            raise bb.asyncrpc.InvokeError(
                f"'{mark}' is not the current mark. Refusing to sweep"
            )

        count = await self.db.gc_sweep()

        return {"count": count}

    @permissions(DB_ADMIN_PERM)
    async def handle_gc_status(self, request):
        """Return the garbage collection status reported by the database."""
        (keep_rows, remove_rows, current_mark) = await self.db.gc_status()
        return {
            "keep": keep_rows,
            "remove": remove_rows,
            "mark": current_mark,
        }

    @permissions(DB_ADMIN_PERM)
    async def handle_clean_unused(self, request):
        """
        Clean unused database entries that are older than the
        "max_age_seconds" value in request and return how many were removed.
        """
        max_age = request["max_age_seconds"]
        # The cutoff lies max_age seconds in the past.
        # NOTE(review): assumes db.clean_unused() only removes entries created
        # before the given time - confirm against the database backends
        oldest = datetime.now() - timedelta(seconds=max_age)
        return {"count": await self.db.clean_unused(oldest)}

    @permissions(DB_ADMIN_PERM)
    async def handle_get_db_usage(self, request):
        """Return the usage information reported by the database."""
        return {"usage": await self.db.get_usage()}

    @permissions(DB_ADMIN_PERM)
    async def handle_get_db_query_columns(self, request):
        """Return the query columns reported by the database."""
        return {"columns": await self.db.get_query_columns()}

    # The authentication API is always allowed
    async def handle_auth(self, request):
        """
        Authenticate the connection with the "username" and "token" in
        request. On success the connection acts as that user from then on; on
        failure an InvokeError is raised after a one second delay.
        """
        username = str(request["username"])
        token = str(request["token"])

        async def fail_auth():
            # Rate limit bad login attempts
            await asyncio.sleep(1)
            raise bb.asyncrpc.InvokeError(f"Unable to authenticate as {username}")

        user, db_token = await self.db.lookup_user_token(username)

        if not user or not db_token:
            await fail_auth()

        # The stored token has the form produced by hash_token(); the
        # algorithm and salt are needed to hash the presented token the same
        # way
        try:
            algo, salt, _ = db_token.split(":")
        except ValueError:
            await fail_auth()

        if hash_token(algo, salt, token) != db_token:
            await fail_auth()

        self.user = user

        self.logger.info("Authenticated as %s", username)

        return {
            "result": True,
            "username": self.user.username,
            "permissions": sorted(self.user.permissions),
        }

    @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
    async def handle_refresh_token(self, request):
        """
        Replace the token of a user with a newly generated one and return it.
        Only the salted hash of the token is stored.
        """
        username = str(request["username"])

        token = await new_token()

        updated = await self.db.set_user_token(
            username,
            hash_token(TOKEN_ALGORITHM, new_salt(), token),
        )
        if not updated:
            self.raise_no_user_error(username)

        return {"username": username, "token": token}

    def get_perm_arg(self, arg):
        """
        Validate a permission list received from a client.

        Returns the permissions as a sorted list with NONE_PERM removed.
        Raises InvokeError if arg is not a list or names unknown permissions.
        """
        if not isinstance(arg, list):
            raise bb.asyncrpc.InvokeError("Unexpected type for permissions")

        arg = set(arg)
        # NONE_PERM is accepted from clients, but never stored
        arg.discard(NONE_PERM)

        unknown_perms = arg - ALL_PERMISSIONS
        if unknown_perms:
            raise bb.asyncrpc.InvokeError(
                "Unknown permissions %s" % ", ".join(sorted(unknown_perms))
            )

        return sorted(arg)

    def return_perms(self, permissions):
        """
        Return permissions as a sorted list for sending to a client, with
        ALL_PERM expanded to the complete list of permissions.
        """
        if ALL_PERM in permissions:
            return sorted(ALL_PERMISSIONS)
        return sorted(permissions)

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_set_perms(self, request):
        """Replace the permissions of the user named in request."""
        username = str(request["username"])
        permissions = self.get_perm_arg(request["permissions"])

        if not await self.db.set_user_perms(username, permissions):
            self.raise_no_user_error(username)

        return {
            "username": username,
            "permissions": self.return_perms(permissions),
        }

    @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
    async def handle_get_user(self, request):
        """Return the name and permissions of a user, or None if unknown."""
        username = str(request["username"])

        user = await self.db.lookup_user(username)
        if user is None:
            return None

        return {
            "username": user.username,
            "permissions": self.return_perms(user.permissions),
        }

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_get_all_users(self, request):
        """Return the name and permissions of every user."""
        users = await self.db.get_all_users()
        return {
            "users": [
                {
                    "username": u.username,
                    "permissions": self.return_perms(u.permissions),
                }
                for u in users
            ]
        }

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_new_user(self, request):
        """
        Create a user with the requested permissions and a newly generated
        token, which is returned to the caller. Only the salted hash of the
        token is stored.
        """
        username = str(request["username"])
        permissions = self.get_perm_arg(request["permissions"])

        token = await new_token()

        inserted = await self.db.new_user(
            username,
            permissions,
            hash_token(TOKEN_ALGORITHM, new_salt(), token),
        )
        if not inserted:
            raise bb.asyncrpc.InvokeError(f"Cannot create new user '{username}'")

        return {
            "username": username,
            "permissions": self.return_perms(permissions),
            "token": token,
        }

    @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
    async def handle_delete_user(self, request):
        """Delete the user named in request."""
        username = str(request["username"])

        if not await self.db.delete_user(username):
            self.raise_no_user_error(username)

        return {"username": username}

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_become_user(self, request):
        """Make the connection act as the user named in request."""
        username = str(request["username"])

        user = await self.db.lookup_user(username)
        if user is None:
            raise bb.asyncrpc.InvokeError(f"User {username} doesn't exist")

        self.user = user

        self.logger.info("Became user %s", username)

        return {
            "username": self.user.username,
            "permissions": self.return_perms(self.user.permissions),
        }
Brad Bishopa34c0302019-09-23 22:34:48 -0400784
785
class Server(bb.asyncrpc.AsyncServer):
    """
    Hash equivalence server.

    Holds the state shared by all client connections: the database engine,
    the request statistics, the anonymous user permissions and, when an
    upstream server is configured, the queue of entries waiting to be copied
    ("backfilled") from upstream into the local database.
    """

    def __init__(
        self,
        db_engine,
        upstream=None,
        read_only=False,
        anon_perms=DEFAULT_ANON_PERMS,
        admin_username=None,
        admin_password=None,
    ):
        """
        db_engine:      database engine used to create database connections
        upstream:       address of an upstream server to pull from, if any
        read_only:      if True, clients cannot modify the database. Cannot
                        be combined with upstream
        anon_perms:     permissions granted to unauthenticated clients. The
                        user administration and "all" permissions are not
                        allowed here
        admin_username: if set, this user is created or updated with all
                        permissions when the server starts
        admin_password: token for the admin user

        Raises bb.asyncrpc.ServerError if the arguments are inconsistent.
        """
        if upstream and read_only:
            raise bb.asyncrpc.ServerError(
                "Read-only hashserv cannot pull from an upstream server"
            )

        disallowed_perms = set(anon_perms) - {
            NONE_PERM,
            READ_PERM,
            REPORT_PERM,
            DB_ADMIN_PERM,
        }

        if disallowed_perms:
            # Sorted so that the message does not depend on set ordering
            raise bb.asyncrpc.ServerError(
                f"Permission(s) {' '.join(sorted(disallowed_perms))} are not allowed for anonymous users"
            )

        super().__init__(logger)

        self.request_stats = Stats()
        self.db_engine = db_engine
        self.upstream = upstream
        self.read_only = read_only
        # Created in start(), and only if there is an upstream server
        self.backfill_queue = None
        self.anon_perms = set(anon_perms)
        self.admin_username = admin_username
        self.admin_password = admin_password

        self.logger.info(
            "Anonymous user permissions are: %s", ", ".join(sorted(self.anon_perms))
        )

    def accept_client(self, socket):
        """Return the connection handler for a newly connected client."""
        return ServerClient(socket, self)

    async def create_admin_user(self):
        """
        Make sure the configured admin user exists, has all permissions and
        authenticates with the configured admin password.
        """
        admin_permissions = (ALL_PERM,)
        async with self.db_engine.connect(self.logger) as db:
            # Only the salted hash of the password is stored
            token_hash = hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password)

            added = await db.new_user(
                self.admin_username,
                admin_permissions,
                token_hash,
            )
            if added:
                self.logger.info("Created admin user '%s'", self.admin_username)
            else:
                # The user already exists; bring it in line with the
                # configured permissions and password
                await db.set_user_perms(
                    self.admin_username,
                    admin_permissions,
                )
                await db.set_user_token(
                    self.admin_username,
                    token_hash,
                )
                self.logger.info("Admin user '%s' updated", self.admin_username)

    async def backfill_worker_task(self):
        """
        Copy the entries named on the backfill queue from the upstream server
        into the local database. Runs until None is read from the queue.
        """
        async with await create_async_client(
            self.upstream
        ) as client, self.db_engine.connect(self.logger) as db:
            while True:
                item = await self.backfill_queue.get()
                if item is None:
                    # None is queued by stop() to end the worker
                    self.backfill_queue.task_done()
                    break

                method, taskhash = item
                d = await client.get_taskhash(method, taskhash)
                if d is not None:
                    await db.insert_unihash(d["method"], d["taskhash"], d["unihash"])
                self.backfill_queue.task_done()

    def start(self):
        """
        Prepare the server and return the tasks that make it up.

        Adds the backfill worker when there is an upstream server, creates
        the database and, if an admin user is configured, creates or updates
        that user.
        """
        tasks = super().start()
        if self.upstream:
            self.backfill_queue = asyncio.Queue()
            tasks.append(self.backfill_worker_task())

        self.loop.run_until_complete(self.db_engine.create())

        if self.admin_username:
            self.loop.run_until_complete(self.create_admin_user())

        return tasks

    async def stop(self):
        """Tell the backfill worker (if any) to exit and stop the server."""
        if self.backfill_queue is not None:
            await self.backfill_queue.put(None)
        await super().stop()