blob: a86507830e8d8731dfd72e88de4eb03eb5a5b9bb [file] [log] [blame]
Brad Bishopa34c0302019-09-23 22:34:48 -04001# Copyright (C) 2019 Garmin Ltd.
2#
3# SPDX-License-Identifier: GPL-2.0-only
4#
5
import asyncio
import base64
import functools
import hashlib
import hmac
import logging
import math
import os
import secrets
import time
from datetime import datetime, timedelta

import bb.asyncrpc

from . import create_async_client
16
logger = logging.getLogger("hashserv.server")


# Placeholder permission that matches nothing; it is accepted wherever a
# permission list is expected, but grants no access.
NONE_PERM = "@none"

# Individual permissions that can be granted to users
READ_PERM = "@read"
REPORT_PERM = "@report"
DB_ADMIN_PERM = "@db-admin"
USER_ADMIN_PERM = "@user-admin"
# Implies every other permission
ALL_PERM = "@all"

# Every permission name that may be granted (NONE_PERM is not included)
ALL_PERMISSIONS = {READ_PERM, REPORT_PERM, DB_ADMIN_PERM, USER_ADMIN_PERM, ALL_PERM}

# Permissions given to unauthenticated clients unless the server is
# configured otherwise.
# NOTE(review): this includes DB_ADMIN_PERM, so by default anonymous clients
# may call the @db-admin handlers (remove, clean-unused, reset-stats, ...).
DEFAULT_ANON_PERMS = (READ_PERM, REPORT_PERM, DB_ADMIN_PERM)

# Hash algorithm used when storing user tokens
TOKEN_ALGORITHM = "sha256"

# 48 random bytes encode to exactly 64 base64 characters. Because 48 is a
# multiple of 3, the encoding never ends in '=' padding.
TOKEN_SIZE = 48

# Number of random bytes in each token salt
SALT_SIZE = 8
Brad Bishopa34c0302019-09-23 22:34:48 -040051
52
class Measurement(object):
    """Time one operation and add its duration to a Sample.

    Call start()/end() explicitly, or use the object as a context manager,
    which calls start() on entry and end() on exit.
    """

    def __init__(self, sample):
        # Receives the measured duration through its add() method
        self.sample = sample

    def start(self):
        self.start_time = time.perf_counter()

    def end(self):
        # Seconds elapsed since start()
        duration = time.perf_counter() - self.start_time
        self.sample.add(duration)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        self.end()
69
70
class Sample(object):
    """Sum the durations of one or more measurements into one Stats entry.

    add() accumulates durations. end() reports their sum to the owning
    Stats object (only if at least one duration was added) and clears the
    accumulator. Used as a context manager, end() runs on exit.
    """

    def __init__(self, stats):
        self.stats = stats
        self._clear()

    def _clear(self):
        # Count and sum of the durations added since the last report
        self.num_samples = 0
        self.elapsed = 0

    def measure(self):
        """Return a Measurement that adds its duration to this sample."""
        return Measurement(self)

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.end()

    def add(self, elapsed):
        self.num_samples += 1
        self.elapsed += elapsed

    def end(self):
        # Nothing recorded: do not create an empty entry in the stats
        if not self.num_samples:
            return
        self.stats.add(self.elapsed)
        self._clear()
95
96
class Stats(object):
    """Running statistics over a series of durations.

    Tracks the count, total, maximum, mean and sample standard deviation of
    the values passed to add(). The mean and the sum of squared deviations
    are updated incrementally using Welford's online algorithm.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Forget every recorded duration."""
        self.num = 0
        self.total_time = 0
        self.max_time = 0
        # Running mean (m) and running sum of squared deviations (s)
        self.m = 0
        self.s = 0
        self.current_elapsed = None

    def add(self, elapsed):
        """Record one duration."""
        self.num += 1
        if self.num > 1:
            prev_mean = self.m
            self.m = prev_mean + (elapsed - prev_mean) / self.num
            self.s = self.s + (elapsed - prev_mean) * (elapsed - self.m)
        else:
            self.m = elapsed
            self.s = 0

        self.total_time += elapsed
        self.max_time = max(self.max_time, elapsed)

    def start_sample(self):
        """Return a new Sample that reports into these statistics."""
        return Sample(self)

    @property
    def average(self):
        """Mean duration, or 0 when nothing has been recorded."""
        return self.total_time / self.num if self.num else 0

    @property
    def stdev(self):
        """Sample standard deviation, or 0 with fewer than two durations."""
        if self.num < 2:
            return 0
        return math.sqrt(self.s / (self.num - 1))

    def todict(self):
        """Return the summary statistics as a plain dict."""
        return {
            "num": self.num,
            "total_time": self.total_time,
            "max_time": self.max_time,
            "average": self.average,
            "stdev": self.stdev,
        }
Brad Bishopa34c0302019-09-23 22:34:48 -0400144
145
token_refresh_semaphore = asyncio.Lock()


async def new_token():
    """Generate a new random API token.

    The token is TOKEN_SIZE random bytes from the ``secrets`` module, base64
    encoded with '.' and '_' in place of '+' and '/'. With the default
    TOKEN_SIZE of 48 this gives 64 characters and no '=' padding.
    """
    # Prevent malicious users from using this API to deduce the state of the
    # server's entropy pool and thus be able to guess a token. *All* token
    # requests take the same global lock and then sleep for a short time.
    # This effectively rate limits the total number of tokens that can be
    # generated across all clients to 10/second, which should be enough
    # since a client has to be an authenticated user to make the request in
    # the first place.
    async with token_refresh_semaphore:
        await asyncio.sleep(0.1)
        # secrets.token_bytes is portable and waits for the OS CSPRNG to be
        # initialized, unlike os.getrandom(..., GRND_NONBLOCK), which is
        # Linux-only and can raise BlockingIOError.
        raw = secrets.token_bytes(TOKEN_SIZE)

    return base64.b64encode(raw, b"._").decode("utf-8")
Andrew Geisslerd1e89492021-02-12 15:35:20 -0600162
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600163
def new_salt():
    """Return a new random salt: SALT_SIZE random bytes as lowercase hex.

    Uses the ``secrets`` module, the same source as new tokens, instead of
    the Linux-only, possibly-failing os.getrandom(..., GRND_NONBLOCK). The
    output format (bytes.hex()) is unchanged.
    """
    return secrets.token_hex(SALT_SIZE)
166
167
def hash_token(algo, salt, token):
    """Return the storable form of *token* as "<algo>:<salt>:<hexdigest>".

    The digest is computed with the hashlib algorithm named *algo* over the
    UTF-8 encoding of *salt* followed by the UTF-8 encoding of *token*.
    """
    hasher = hashlib.new(algo)
    for part in (salt, token):
        hasher.update(part.encode("utf-8"))
    return f"{algo}:{salt}:{hasher.hexdigest()}"
173
174
def permissions(*permissions, allow_anon=True, allow_self_service=False):
    """
    Function decorator that can be used to decorate an RPC function call and
    check that the current user's permissions match the required permissions.

    If allow_anon is True, the user will also be allowed to make the RPC call
    if the anonymous user permissions match the permissions.

    If allow_self_service is True, and the "username" property in the request
    is the currently logged in user, or not specified, the user will also be
    allowed to make the request. This allows users to access normal privileged
    API, as long as they are only modifying their own user properties (e.g.
    users can be allowed to reset their own token without @user-admin
    permissions, but not the token for any other user). In that case
    request["username"] is set to the logged in user's name before the
    handler runs.

    A denied call is logged and raises bb.asyncrpc.InvokeError. The returned
    wrapper preserves the handler's name and docstring (functools.wraps).
    """

    def wrapper(func):
        @functools.wraps(func)
        async def wrap(self, request):
            if allow_self_service and self.user is not None:
                username = request.get("username", self.user.username)
                if username == self.user.username:
                    # Normalize the request so the handler always sees the
                    # logged in user's name, even if it was omitted
                    request["username"] = self.user.username
                    return await func(self, request)

            if not self.user_has_permissions(*permissions, allow_anon=allow_anon):
                if not self.user:
                    username = "Anonymous user"
                    user_perms = self.anon_perms
                else:
                    username = self.user.username
                    user_perms = self.user.permissions

                self.logger.info(
                    "User %s with permissions %r denied from calling %s. Missing permissions(s) %r",
                    username,
                    ", ".join(user_perms),
                    func.__name__,
                    ", ".join(permissions),
                )
                raise bb.asyncrpc.InvokeError(
                    f"{username} is not allowed to access permissions(s) {', '.join(permissions)}"
                )

            return await func(self, request)

        return wrap

    return wrapper
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600223
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600224
class ServerClient(bb.asyncrpc.AsyncServerConnection):
    """Handle the RPC requests of one client of the hash equivalence server.

    Command handlers are registered in ``self.handlers`` by command name.
    Handlers that modify the database are only registered when the server is
    not read-only. Access control is applied per handler by the
    ``permissions`` decorator, using ``self.user`` (the authenticated user,
    or None) and ``self.anon_perms`` (the permissions of anonymous clients).
    """

    def __init__(
        self,
        socket,
        db_engine,
        request_stats,
        backfill_queue,
        upstream,
        read_only,
        anon_perms,
    ):
        super().__init__(socket, "OEHASHEQUIV", logger)
        self.db_engine = db_engine
        self.request_stats = request_stats
        self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
        # May be None (the Server only creates a queue when it has an upstream)
        self.backfill_queue = backfill_queue
        self.upstream = upstream
        self.read_only = read_only
        # Set by "auth" / "become-user"; None means the client is anonymous
        self.user = None
        self.anon_perms = anon_perms

        self.handlers.update(
            {
                "get": self.handle_get,
                "get-outhash": self.handle_get_outhash,
                "get-stream": self.handle_get_stream,
                "get-stats": self.handle_get_stats,
                "get-db-usage": self.handle_get_db_usage,
                "get-db-query-columns": self.handle_get_db_query_columns,
                # Not always read-only, but internally checks if the server is
                # read-only
                "report": self.handle_report,
                "auth": self.handle_auth,
                "get-user": self.handle_get_user,
                "get-all-users": self.handle_get_all_users,
                "become-user": self.handle_become_user,
            }
        )

        if not read_only:
            self.handlers.update(
                {
                    "report-equiv": self.handle_equivreport,
                    "reset-stats": self.handle_reset_stats,
                    "backfill-wait": self.handle_backfill_wait,
                    "remove": self.handle_remove,
                    "clean-unused": self.handle_clean_unused,
                    "refresh-token": self.handle_refresh_token,
                    "set-user-perms": self.handle_set_perms,
                    "new-user": self.handle_new_user,
                    "delete-user": self.handle_delete_user,
                }
            )

    def raise_no_user_error(self, username):
        """Raise the InvokeError reported when *username* does not exist."""
        raise bb.asyncrpc.InvokeError(f"No user named '{username}' exists")

    def user_has_permissions(self, *permissions, allow_anon=True):
        """Return True if the client holds every one of *permissions*.

        When *allow_anon* is True, the anonymous permissions are checked
        first. Otherwise, or if they are insufficient, the logged in user's
        permissions are checked. ALL_PERM satisfies any request.
        """
        permissions = set(permissions)
        if allow_anon:
            if ALL_PERM in self.anon_perms:
                return True

            if not permissions - self.anon_perms:
                return True

        if self.user is None:
            return False

        if ALL_PERM in self.user.permissions:
            return True

        if not permissions - self.user.permissions:
            return True

        return False

    def validate_proto_version(self):
        # Accept protocol versions newer than 1.0, up to and including 1.1
        return self.proto_version > (1, 0) and self.proto_version <= (1, 1)

    async def process_requests(self):
        """Serve this connection's requests.

        A database connection, and an upstream client if one is configured,
        stay open for the lifetime of the connection. The upstream client is
        closed when request processing ends, even on error.
        """
        async with self.db_engine.connect(self.logger) as db:
            self.db = db
            if self.upstream is not None:
                self.upstream_client = await create_async_client(self.upstream)
            else:
                self.upstream_client = None

            try:
                await super().process_requests()
            finally:
                if self.upstream_client is not None:
                    await self.upstream_client.close()

    async def dispatch_message(self, msg):
        """Run the handler for the first registered command present in *msg*.

        Streaming commands time their own requests. Every other command is
        timed here as one request sample.

        Raises bb.asyncrpc.ClientError if no registered command is present.
        """
        for k in self.handlers.keys():
            if k in msg:
                self.logger.debug("Handling %s", k)
                if "stream" in k:
                    return await self.handlers[k](msg[k])
                else:
                    with self.request_stats.start_sample() as self.request_sample, self.request_sample.measure():
                        return await self.handlers[k](msg[k])

        raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)

    @permissions(READ_PERM)
    async def handle_get(self, request):
        """Look up the unihash for request["method"] / request["taskhash"].

        If request["all"] is true, the full row is returned (see get_unihash).
        """
        method = request["method"]
        taskhash = request["taskhash"]
        fetch_all = request.get("all", False)

        return await self.get_unihash(method, taskhash, fetch_all)

    async def get_unihash(self, method, taskhash, fetch_all=False):
        """Return the unihash row for (method, taskhash) as a dict, or None.

        With *fetch_all*, the full row from get_unihash_by_taskhash_full is
        returned. Otherwise the row from get_equivalent is returned. On a
        local miss, the upstream server (if any) is queried, and any result
        it returns is stored locally.
        """
        d = None

        if fetch_all:
            row = await self.db.get_unihash_by_taskhash_full(method, taskhash)
            if row is not None:
                d = {k: row[k] for k in row.keys()}
            elif self.upstream_client is not None:
                d = await self.upstream_client.get_taskhash(method, taskhash, True)
                await self.update_unified(d)
        else:
            row = await self.db.get_equivalent(method, taskhash)

            if row is not None:
                d = {k: row[k] for k in row.keys()}
            elif self.upstream_client is not None:
                d = await self.upstream_client.get_taskhash(method, taskhash)
                # Upstream may not know the taskhash either; only cache hits
                if d is not None:
                    await self.db.insert_unihash(d["method"], d["taskhash"], d["unihash"])

        return d

    @permissions(READ_PERM)
    async def handle_get_outhash(self, request):
        """Look up an outhash entry; see get_outhash for the result."""
        method = request["method"]
        outhash = request["outhash"]
        taskhash = request["taskhash"]
        with_unihash = request.get("with_unihash", True)

        return await self.get_outhash(method, outhash, taskhash, with_unihash)

    async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
        """Return the row for (method, outhash) as a dict, or None.

        With *with_unihash*, get_unihash_by_outhash is used; otherwise
        get_outhash is used. On a local miss, the upstream server (if any) is
        queried, and its result is stored locally via update_unified.
        """
        d = None
        if with_unihash:
            row = await self.db.get_unihash_by_outhash(method, outhash)
        else:
            row = await self.db.get_outhash(method, outhash)

        if row is not None:
            d = {k: row[k] for k in row.keys()}
        elif self.upstream_client is not None:
            d = await self.upstream_client.get_outhash(method, outhash, taskhash)
            await self.update_unified(d)

        return d

    async def update_unified(self, data):
        """Store an upstream result as both a unihash and an outhash entry.

        Does nothing if *data* is None.
        """
        if data is None:
            return

        await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
        await self.db.insert_outhash(data)

    @permissions(READ_PERM)
    async def handle_get_stream(self, request):
        """Answer a stream of "<method> <taskhash>" lines with unihashes.

        Each line is answered with its unihash, or an empty string if none is
        known. The stream ends when the client sends "END" or no data is
        received. Lookups answered from upstream (a non-None result) are
        queued for backfilling into the local database.
        """
        await self.socket.send_message("ok")

        while True:
            upstream = None

            l = await self.socket.recv()
            if not l:
                break

            try:
                # This inner loop is very sensitive and must be as fast as
                # possible (which is why the request sample is handled manually
                # instead of using 'with', and also why logging statements are
                # commented out).
                self.request_sample = self.request_stats.start_sample()
                request_measure = self.request_sample.measure()
                request_measure.start()

                if l == "END":
                    break

                (method, taskhash) = l.split()
                # self.logger.debug('Looking up %s %s' % (method, taskhash))
                row = await self.db.get_equivalent(method, taskhash)

                if row is not None:
                    msg = row["unihash"]
                    # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
                elif self.upstream_client is not None:
                    upstream = await self.upstream_client.get_unihash(method, taskhash)
                    if upstream:
                        msg = upstream
                    else:
                        msg = ""
                else:
                    msg = ""

                await self.socket.send(msg)
            finally:
                request_measure.end()
                self.request_sample.end()

            # Post to the backfill queue after writing the result to minimize
            # the turn around time on a request
            if upstream is not None:
                await self.backfill_queue.put((method, taskhash))

        await self.socket.send("ok")
        return self.NO_RESPONSE

    async def report_readonly(self, data):
        """Answer a report without writing to the database.

        Returns the unihash already known for the reported outhash, or the
        client's own unihash if none is known.
        """
        method = data["method"]
        outhash = data["outhash"]
        taskhash = data["taskhash"]

        info = await self.get_outhash(method, outhash, taskhash)
        if info:
            unihash = info["unihash"]
        else:
            unihash = data["unihash"]

        return {
            "taskhash": taskhash,
            "method": method,
            "unihash": unihash,
        }

    # Since this can be called either read only or to report, the check to
    # report is made inside the function
    @permissions(READ_PERM)
    async def handle_report(self, data):
        """Record a task's outhash and return the unihash to use for it.

        Falls back to report_readonly when the server is read-only or the
        client lacks REPORT_PERM.
        """
        if self.read_only or not self.user_has_permissions(REPORT_PERM):
            return await self.report_readonly(data)

        outhash_data = {
            "method": data["method"],
            "outhash": data["outhash"],
            "taskhash": data["taskhash"],
            "created": datetime.now(),
        }

        for k in ("owner", "PN", "PV", "PR", "task", "outhash_siginfo"):
            if k in data:
                outhash_data[k] = data[k]

        # An authenticated user is always recorded as the owner
        if self.user:
            outhash_data["owner"] = self.user.username

        # Insert the new entry, unless it already exists
        if await self.db.insert_outhash(outhash_data):
            # If this row is new, check if it is equivalent to another
            # output hash
            row = await self.db.get_equivalent_for_outhash(
                data["method"], data["outhash"], data["taskhash"]
            )

            if row is not None:
                # A matching output hash was found. Set our taskhash to the
                # same unihash since they are equivalent
                unihash = row["unihash"]
            else:
                # No matching output hash was found. This is probably the
                # first outhash to be added.
                unihash = data["unihash"]

                # Query upstream to see if it has a unihash we can use
                if self.upstream_client is not None:
                    upstream_data = await self.upstream_client.get_outhash(
                        data["method"], data["outhash"], data["taskhash"]
                    )
                    if upstream_data is not None:
                        unihash = upstream_data["unihash"]

            await self.db.insert_unihash(data["method"], data["taskhash"], unihash)

        unihash_data = await self.get_unihash(data["method"], data["taskhash"])
        if unihash_data is not None:
            unihash = unihash_data["unihash"]
        else:
            unihash = data["unihash"]

        return {
            "taskhash": data["taskhash"],
            "method": data["method"],
            "unihash": unihash,
        }

    @permissions(READ_PERM, REPORT_PERM)
    async def handle_equivreport(self, data):
        """Record a taskhash -> unihash mapping and return the stored mapping."""
        await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])

        # Fetch the unihash that will be reported for the taskhash. If the
        # unihash matches, it means this row was inserted (or the mapping
        # was already valid)
        row = await self.db.get_equivalent(data["method"], data["taskhash"])

        if row["unihash"] == data["unihash"]:
            self.logger.info(
                "Adding taskhash equivalence for %s with unihash %s",
                data["taskhash"],
                row["unihash"],
            )

        return {k: row[k] for k in ("taskhash", "method", "unihash")}

    @permissions(READ_PERM)
    async def handle_get_stats(self, request):
        """Return the server-wide request statistics."""
        return {
            "requests": self.request_stats.todict(),
        }

    @permissions(DB_ADMIN_PERM)
    async def handle_reset_stats(self, request):
        """Return the request statistics, then reset them."""
        d = {
            "requests": self.request_stats.todict(),
        }

        self.request_stats.reset()
        return d

    @permissions(READ_PERM)
    async def handle_backfill_wait(self, request):
        """Report the number of queued backfill tasks and wait for them all.

        A server without an upstream has no backfill queue; it reports zero
        tasks instead of failing with AttributeError.
        """
        if self.backfill_queue is None:
            return {"tasks": 0}

        d = {
            "tasks": self.backfill_queue.qsize(),
        }
        await self.backfill_queue.join()
        return d

    @permissions(DB_ADMIN_PERM)
    async def handle_remove(self, request):
        """Remove entries matching the request["where"] condition dict.

        Raises TypeError if the condition is not a dict.
        """
        condition = request["where"]
        if not isinstance(condition, dict):
            raise TypeError("Bad condition type %s" % type(condition))

        return {"count": await self.db.remove(condition)}

    @permissions(DB_ADMIN_PERM)
    async def handle_clean_unused(self, request):
        """Clean unused entries older than request["max_age_seconds"].

        The database is given the cutoff time ``now - max_age_seconds``.
        Previously the age was negated, which put the cutoff in the future
        and cleaned every unused entry regardless of its age.
        """
        max_age = request["max_age_seconds"]
        oldest = datetime.now() - timedelta(seconds=max_age)
        return {"count": await self.db.clean_unused(oldest)}

    @permissions(DB_ADMIN_PERM)
    async def handle_get_db_usage(self, request):
        """Return the database usage information."""
        return {"usage": await self.db.get_usage()}

    @permissions(DB_ADMIN_PERM)
    async def handle_get_db_query_columns(self, request):
        """Return the columns that can be used in queries."""
        return {"columns": await self.db.get_query_columns()}

    # The authentication API is always allowed
    async def handle_auth(self, request):
        """Authenticate this connection as request["username"].

        On success, self.user is set and the user's permissions are returned.
        A failed attempt sleeps for one second and then raises InvokeError.
        """
        username = str(request["username"])
        token = str(request["token"])

        async def fail_auth():
            # Rate limit bad login attempts
            await asyncio.sleep(1)
            raise bb.asyncrpc.InvokeError(f"Unable to authenticate as {username}")

        user, db_token = await self.db.lookup_user_token(username)

        if not user or not db_token:
            await fail_auth()

        try:
            algo, salt, _ = db_token.split(":")
        except ValueError:
            await fail_auth()

        # Constant-time comparison so response timing does not reveal how
        # much of the computed hash matches the stored one
        if not hmac.compare_digest(
            hash_token(algo, salt, token).encode("utf-8"),
            db_token.encode("utf-8"),
        ):
            await fail_auth()

        self.user = user

        self.logger.info("Authenticated as %s", username)

        return {
            "result": True,
            "username": self.user.username,
            "permissions": sorted(list(self.user.permissions)),
        }

    @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
    async def handle_refresh_token(self, request):
        """Give request["username"] a new token and return it in plain text.

        Only the salted hash of the token is stored.
        """
        username = str(request["username"])

        token = await new_token()

        updated = await self.db.set_user_token(
            username,
            hash_token(TOKEN_ALGORITHM, new_salt(), token),
        )
        if not updated:
            self.raise_no_user_error(username)

        return {"username": username, "token": token}

    def get_perm_arg(self, arg):
        """Validate a client-supplied permission list.

        Returns the sorted, de-duplicated permissions with NONE_PERM dropped.
        Raises InvokeError if *arg* is not a list or names an unknown
        permission.
        """
        if not isinstance(arg, list):
            raise bb.asyncrpc.InvokeError("Unexpected type for permissions")

        arg = set(arg)
        arg.discard(NONE_PERM)

        unknown_perms = arg - ALL_PERMISSIONS
        if unknown_perms:
            raise bb.asyncrpc.InvokeError(
                "Unknown permissions %s" % ", ".join(sorted(list(unknown_perms)))
            )

        return sorted(list(arg))

    def return_perms(self, permissions):
        """Return *permissions* sorted, expanding ALL_PERM to every permission."""
        if ALL_PERM in permissions:
            return sorted(list(ALL_PERMISSIONS))
        return sorted(list(permissions))

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_set_perms(self, request):
        """Replace the permissions of request["username"]."""
        username = str(request["username"])
        permissions = self.get_perm_arg(request["permissions"])

        if not await self.db.set_user_perms(username, permissions):
            self.raise_no_user_error(username)

        return {
            "username": username,
            "permissions": self.return_perms(permissions),
        }

    @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
    async def handle_get_user(self, request):
        """Return a user's name and permissions, or None if the user is unknown."""
        username = str(request["username"])

        user = await self.db.lookup_user(username)
        if user is None:
            return None

        return {
            "username": user.username,
            "permissions": self.return_perms(user.permissions),
        }

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_get_all_users(self, request):
        """Return the name and permissions of every user."""
        users = await self.db.get_all_users()
        return {
            "users": [
                {
                    "username": u.username,
                    "permissions": self.return_perms(u.permissions),
                }
                for u in users
            ]
        }

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_new_user(self, request):
        """Create a user and return its plain-text token.

        Raises InvokeError if the user could not be created.
        """
        username = str(request["username"])
        permissions = self.get_perm_arg(request["permissions"])

        token = await new_token()

        inserted = await self.db.new_user(
            username,
            permissions,
            hash_token(TOKEN_ALGORITHM, new_salt(), token),
        )
        if not inserted:
            raise bb.asyncrpc.InvokeError(f"Cannot create new user '{username}'")

        return {
            "username": username,
            "permissions": self.return_perms(permissions),
            "token": token,
        }

    @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
    async def handle_delete_user(self, request):
        """Delete request["username"]."""
        username = str(request["username"])

        if not await self.db.delete_user(username):
            self.raise_no_user_error(username)

        return {"username": username}

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_become_user(self, request):
        """Switch this connection to act as request["username"]."""
        username = str(request["username"])

        user = await self.db.lookup_user(username)
        if user is None:
            raise bb.asyncrpc.InvokeError(f"User {username} doesn't exist")

        self.user = user

        self.logger.info("Became user %s", username)

        return {
            "username": self.user.username,
            "permissions": self.return_perms(self.user.permissions),
        }
Brad Bishopa34c0302019-09-23 22:34:48 -0400740
741
class Server(bb.asyncrpc.AsyncServer):
    """Hash equivalence server.

    :param db_engine: database engine. ``create()`` runs on start, and
        ``connect()`` is used by each client and by the backfill worker.
    :param upstream: address of an upstream server to fall back on, or None
    :param read_only: if True, handlers that modify the database are not
        registered. Cannot be combined with *upstream*.
    :param anon_perms: permissions granted to unauthenticated clients
    :param admin_username: if set, this user is created (or updated) with
        ALL_PERM on start, using *admin_password* as its token
    :param admin_password: token for *admin_username*
    :raises bb.asyncrpc.ServerError: on an invalid combination of arguments
    """

    def __init__(
        self,
        db_engine,
        upstream=None,
        read_only=False,
        anon_perms=DEFAULT_ANON_PERMS,
        admin_username=None,
        admin_password=None,
    ):
        if upstream and read_only:
            raise bb.asyncrpc.ServerError(
                "Read-only hashserv cannot pull from an upstream server"
            )

        disallowed_perms = set(anon_perms) - set(
            [NONE_PERM, READ_PERM, REPORT_PERM, DB_ADMIN_PERM]
        )

        if disallowed_perms:
            raise bb.asyncrpc.ServerError(
                f"Permission(s) {' '.join(sorted(disallowed_perms))} are not allowed for anonymous users"
            )

        # Fail early: create_admin_user() would otherwise crash in start()
        # while hashing a None password
        if admin_username and admin_password is None:
            raise bb.asyncrpc.ServerError(
                "An admin password is required when an admin username is set"
            )

        super().__init__(logger)

        self.request_stats = Stats()
        self.db_engine = db_engine
        self.upstream = upstream
        self.read_only = read_only
        # Created in start() only when there is an upstream to backfill from
        self.backfill_queue = None
        self.anon_perms = set(anon_perms)
        self.admin_username = admin_username
        self.admin_password = admin_password

        self.logger.info(
            "Anonymous user permissions are: %s", ", ".join(sorted(self.anon_perms))
        )

    def accept_client(self, socket):
        """Create the per-connection handler for a newly accepted client."""
        return ServerClient(
            socket,
            self.db_engine,
            self.request_stats,
            self.backfill_queue,
            self.upstream,
            self.read_only,
            self.anon_perms,
        )

    async def create_admin_user(self):
        """Create the admin user, or reset an existing one's permissions and token."""
        admin_permissions = (ALL_PERM,)
        async with self.db_engine.connect(self.logger) as db:
            added = await db.new_user(
                self.admin_username,
                admin_permissions,
                hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password),
            )
            if added:
                self.logger.info("Created admin user '%s'", self.admin_username)
            else:
                await db.set_user_perms(
                    self.admin_username,
                    admin_permissions,
                )
                await db.set_user_token(
                    self.admin_username,
                    hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password),
                )
                self.logger.info("Admin user '%s' updated", self.admin_username)

    async def backfill_worker_task(self):
        """Copy unihashes queued by clients from the upstream server.

        Runs until a None item is dequeued. Every dequeued item is marked done,
        even if fetching or storing it fails. A failure is logged and does not
        stop the worker, so queue.join() (used by "backfill-wait") cannot hang.
        """
        async with await create_async_client(
            self.upstream
        ) as client, self.db_engine.connect(self.logger) as db:
            while True:
                item = await self.backfill_queue.get()
                if item is None:
                    self.backfill_queue.task_done()
                    break

                method, taskhash = item
                try:
                    d = await client.get_taskhash(method, taskhash)
                    if d is not None:
                        await db.insert_unihash(d["method"], d["taskhash"], d["unihash"])
                except Exception:
                    self.logger.exception(
                        "Unable to backfill %s %s from upstream", method, taskhash
                    )
                finally:
                    self.backfill_queue.task_done()

    def start(self):
        """Start the server, create the database and the optional admin user.

        Returns the list of tasks to run, including the backfill worker when
        an upstream server is configured.
        """
        tasks = super().start()
        if self.upstream:
            self.backfill_queue = asyncio.Queue()
            tasks += [self.backfill_worker_task()]

        self.loop.run_until_complete(self.db_engine.create())

        if self.admin_username:
            self.loop.run_until_complete(self.create_admin_user())

        return tasks

    async def stop(self):
        """Tell the backfill worker (if any) to exit, then stop the server."""
        if self.backfill_queue is not None:
            await self.backfill_queue.put(None)
        await super().stop()