blob: b269879ecfd208d21d5762f4ea08f21281529f46 [file] [log] [blame]
# Copyright (C) 2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#
5
Brad Bishopa34c0302019-09-23 22:34:48 -04006import logging
7import socket
Andrew Geisslerc926e172021-05-07 16:11:35 -05008import bb.asyncrpc
Patrick Williamsac13d5f2023-11-24 18:59:46 -06009import json
Andrew Geisslerc926e172021-05-07 16:11:35 -050010from . import create_async_client
Brad Bishopa34c0302019-09-23 22:34:48 -040011
12
# Module-level logger; it is handed to the asyncrpc base class by
# AsyncClient.__init__ below.
logger = logging.getLogger("hashserv.client")
Brad Bishopa34c0302019-09-23 22:34:48 -040014
15
class AsyncClient(bb.asyncrpc.AsyncClient):
    """
    Asynchronous client for the "OEHASHEQUIV" protocol (version "1.1").

    The client tracks which mode the connection is in. In MODE_NORMAL,
    requests are sent with invoke(). In MODE_GET_STREAM and
    MODE_EXIST_STREAM, bare strings are exchanged with send_stream(). Every
    request method first switches the connection into the mode it needs.
    """

    MODE_NORMAL = 0
    MODE_GET_STREAM = 1
    MODE_EXIST_STREAM = 2

    def __init__(self, username=None, password=None):
        """
        Create a new client.

        If "username" is set, setup_connection() authenticates with
        "username" and "password" every time it runs.
        """
        super().__init__("OEHASHEQUIV", "1.1", logger)
        self.mode = self.MODE_NORMAL
        self.username = username
        self.password = password
        # User last selected with become_user(), or None when that user is
        # the authenticated user itself (or nobody was selected)
        self.saved_become_user = None

    async def setup_connection(self):
        """
        Set up the connection, then restore the client state: the mode
        recorded in self.mode, the authentication, and the become-user
        selection.
        """
        await super().setup_connection()
        # _set_mode() is a no-op when the requested mode equals self.mode, so
        # reset the recorded mode first to force the transition to happen
        # (presumably a freshly set up connection is in normal mode)
        cur_mode = self.mode
        self.mode = self.MODE_NORMAL
        await self._set_mode(cur_mode)
        if self.username:
            # Save off become user temporarily because auth() resets it
            become = self.saved_become_user
            await self.auth(self.username, self.password)

            if become:
                await self.become_user(become)

    async def send_stream(self, msg):
        """
        Send the string "msg" on the socket and return the next message
        received, going through _send_wrapper().
        """

        async def proc():
            await self.socket.send(msg)
            return await self.socket.recv()

        return await self._send_wrapper(proc)

    async def _set_mode(self, new_mode):
        """
        Switch the connection to "new_mode" (one of the MODE_* constants).

        Nothing is sent if the connection is already in that mode. Otherwise
        a stream mode is first left by sending "END", and the new stream mode
        (if any) is then entered with the matching request.

        Raises ConnectionError if the server does not answer "ok" to a
        transition, and Exception if "new_mode" is not a known mode.
        """

        async def stream_to_normal():
            await self.socket.send("END")
            return await self.socket.recv()

        async def normal_to_stream(command):
            r = await self.invoke({command: None})
            if r != "ok":
                raise ConnectionError(
                    f"Unable to transition to stream mode: Bad response from server {r!r}"
                )

            self.logger.debug("Mode is now %s", command)

        if new_mode == self.mode:
            return

        # Remember the starting mode for error reporting, because self.mode
        # is updated as soon as the server acknowledges leaving stream mode
        old_mode = self.mode
        self.logger.debug("Transitioning mode %s -> %s", old_mode, new_mode)

        # Always transition to normal mode before switching to any other mode
        if self.mode != self.MODE_NORMAL:
            r = await self._send_wrapper(stream_to_normal)
            if r != "ok":
                self.check_invoke_error(r)
                raise ConnectionError(
                    f"Unable to transition to normal mode: Bad response from server {r!r}"
                )
            self.logger.debug("Mode is now normal")
            # Record the acknowledged state right away. If the transition
            # below fails, self.mode must not keep claiming a stream mode
            # that the server has already left.
            self.mode = self.MODE_NORMAL

        if new_mode == self.MODE_GET_STREAM:
            await normal_to_stream("get-stream")
        elif new_mode == self.MODE_EXIST_STREAM:
            await normal_to_stream("exists-stream")
        elif new_mode != self.MODE_NORMAL:
            raise Exception(f"Undefined mode transition {old_mode!r} -> {new_mode!r}")

        self.mode = new_mode

    async def get_unihash(self, method, taskhash):
        """
        Query "<method> <taskhash>" in get-stream mode. Returns the reply
        from the server, or None if the reply is empty.
        """
        await self._set_mode(self.MODE_GET_STREAM)
        r = await self.send_stream(f"{method} {taskhash}")
        return r or None

    async def report_unihash(self, taskhash, method, outhash, unihash, extra=None):
        """
        Send a "report" request built from a copy of "extra" (if given) plus
        the "taskhash", "method", "outhash" and "unihash" keys. Returns the
        reply from the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        # Work on a copy so the dictionary passed by the caller is untouched
        m = extra.copy() if extra is not None else {}
        m["taskhash"] = taskhash
        m["method"] = method
        m["outhash"] = outhash
        m["unihash"] = unihash
        return await self.invoke({"report": m})

    async def report_unihash_equiv(self, taskhash, method, unihash, extra=None):
        """
        Send a "report-equiv" request built from a copy of "extra" (if given)
        plus the "taskhash", "method" and "unihash" keys. Returns the reply
        from the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        # Work on a copy so the dictionary passed by the caller is untouched
        m = extra.copy() if extra is not None else {}
        m["taskhash"] = taskhash
        m["method"] = method
        m["unihash"] = unihash
        return await self.invoke({"report-equiv": m})

    async def get_taskhash(self, method, taskhash, all_properties=False):
        """
        Send a "get" request for "taskhash" and "method"; "all_properties" is
        passed as the "all" field. Returns the reply from the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        return await self.invoke(
            {"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
        )

    async def unihash_exists(self, unihash):
        """
        Query "unihash" in exists-stream mode. Returns True if the server
        replies "true", otherwise False.
        """
        await self._set_mode(self.MODE_EXIST_STREAM)
        r = await self.send_stream(unihash)
        return r == "true"

    async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
        """
        Send a "get-outhash" request with the given "outhash", "taskhash",
        "method" and "with_unihash" fields. Returns the reply from the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        return await self.invoke(
            {
                "get-outhash": {
                    "outhash": outhash,
                    "taskhash": taskhash,
                    "method": method,
                    "with_unihash": with_unihash,
                }
            }
        )

    async def get_stats(self):
        """
        Send a "get-stats" request and return the reply from the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        return await self.invoke({"get-stats": None})

    async def reset_stats(self):
        """
        Send a "reset-stats" request and return the reply from the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        return await self.invoke({"reset-stats": None})

    async def backfill_wait(self):
        """
        Send a "backfill-wait" request and return the "tasks" entry of the
        reply.
        """
        await self._set_mode(self.MODE_NORMAL)
        return (await self.invoke({"backfill-wait": None}))["tasks"]

    async def remove(self, where):
        """
        Send a "remove" request with the given "where" clause and return the
        reply from the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        return await self.invoke({"remove": {"where": where}})

    async def clean_unused(self, max_age):
        """
        Send a "clean-unused" request; "max_age" is passed as the
        "max_age_seconds" field. Returns the reply from the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        return await self.invoke({"clean-unused": {"max_age_seconds": max_age}})

    async def auth(self, username, token):
        """
        Send an "auth" request for "username" with "token".

        After the request returns, the credentials are remembered so that
        setup_connection() can authenticate again, and any saved become-user
        selection is cleared. Returns the reply from the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        result = await self.invoke({"auth": {"username": username, "token": token}})
        self.username = username
        self.password = token
        self.saved_become_user = None
        return result

    async def refresh_token(self, username=None):
        """
        Send a "refresh-token" request, naming "username" in it if given.

        The stored password is replaced by the new token only when the reply
        is for the stored username and no become-user selection is saved.
        Returns the reply from the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        m = {}
        if username:
            m["username"] = username
        result = await self.invoke({"refresh-token": m})
        if (
            self.username
            and not self.saved_become_user
            and result["username"] == self.username
        ):
            self.password = result["token"]
        return result

    async def set_user_perms(self, username, permissions):
        """
        Send a "set-user-perms" request for "username" with "permissions" and
        return the reply from the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        return await self.invoke(
            {"set-user-perms": {"username": username, "permissions": permissions}}
        )

    async def get_user(self, username=None):
        """
        Send a "get-user" request, naming "username" in it if given, and
        return the reply from the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        m = {}
        if username:
            m["username"] = username
        return await self.invoke({"get-user": m})

    async def get_all_users(self):
        """
        Send a "get-all-users" request and return the "users" entry of the
        reply.
        """
        await self._set_mode(self.MODE_NORMAL)
        return (await self.invoke({"get-all-users": {}}))["users"]

    async def new_user(self, username, permissions):
        """
        Send a "new-user" request for "username" with "permissions" and
        return the reply from the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        return await self.invoke(
            {"new-user": {"username": username, "permissions": permissions}}
        )

    async def delete_user(self, username):
        """
        Send a "delete-user" request for "username" and return the reply from
        the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        return await self.invoke({"delete-user": {"username": username}})

    async def become_user(self, username):
        """
        Send a "become-user" request for "username".

        The selection is saved so setup_connection() can repeat it, unless
        "username" is the stored (authenticated) username, in which case the
        saved selection is cleared. Returns the reply from the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        result = await self.invoke({"become-user": {"username": username}})
        if username == self.username:
            self.saved_become_user = None
        else:
            self.saved_become_user = username
        return result

    async def get_db_usage(self):
        """
        Send a "get-db-usage" request and return the "usage" entry of the
        reply.
        """
        await self._set_mode(self.MODE_NORMAL)
        return (await self.invoke({"get-db-usage": {}}))["usage"]

    async def get_db_query_columns(self):
        """
        Send a "get-db-query-columns" request and return the "columns" entry
        of the reply.
        """
        await self._set_mode(self.MODE_NORMAL)
        return (await self.invoke({"get-db-query-columns": {}}))["columns"]

    async def gc_status(self):
        """
        Send a "gc-status" request and return the reply from the server.
        """
        await self._set_mode(self.MODE_NORMAL)
        return await self.invoke({"gc-status": {}})

    async def gc_mark(self, mark, where):
        """
        Starts a new garbage collection operation identified by "mark". If
        garbage collection is already in progress with "mark", the collection
        is continued.

        All unihash entries that match the "where" clause are marked to be
        kept. In addition, any new entries added to the database after this
        command will be automatically marked with "mark"
        """
        await self._set_mode(self.MODE_NORMAL)
        return await self.invoke({"gc-mark": {"mark": mark, "where": where}})

    async def gc_sweep(self, mark):
        """
        Finishes garbage collection for "mark". All unihash entries that have
        not been marked will be deleted.

        It is recommended to clean unused outhash entries after running this to
        cleanup any dangling outhashes
        """
        await self._set_mode(self.MODE_NORMAL)
        return await self.invoke({"gc-sweep": {"mark": mark}})
247
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600248
class Client(bb.asyncrpc.Client):
    """
    bb.asyncrpc.Client subclass backed by AsyncClient.

    The names of the AsyncClient methods to expose are handed to the base
    class through _add_methods().
    """

    def __init__(self, username=None, password=None):
        """
        Create a client whose AsyncClient is built with "username" and
        "password".
        """
        # Keep these assignments ahead of the base constructor: they are read
        # by _get_async_client() (presumably the base constructor may call
        # it - TODO confirm)
        self.username = username
        self.password = password

        super().__init__()

        exposed_methods = (
            "connect_tcp",
            "connect_websocket",
            "get_unihash",
            "report_unihash",
            "report_unihash_equiv",
            "get_taskhash",
            "unihash_exists",
            "get_outhash",
            "get_stats",
            "reset_stats",
            "backfill_wait",
            "remove",
            "clean_unused",
            "auth",
            "refresh_token",
            "set_user_perms",
            "get_user",
            "get_all_users",
            "new_user",
            "delete_user",
            "become_user",
            "get_db_usage",
            "get_db_query_columns",
            "gc_status",
            "gc_mark",
            "gc_sweep",
        )
        self._add_methods(*exposed_methods)

    def _get_async_client(self):
        """
        Return a new AsyncClient carrying the stored credentials.
        """
        return AsyncClient(self.username, self.password)
Patrick Williams73bd93f2024-02-20 08:07:48 -0600286
287
class ClientPool(bb.asyncrpc.ClientPool):
    """
    Client pool whose clients connect to "address" and are used to run
    keyed queries through run_tasks().
    """

    def __init__(
        self, address, max_clients, *, username=None, password=None, become=None
    ):
        """
        Remember the connection settings; "max_clients" is passed on to the
        base class.
        """
        super().__init__(max_clients)
        self.address = address
        self.username = username
        self.password = password
        self.become = become

    async def _new_client(self):
        """
        Create a client for the stored address and credentials, switching to
        the "become" user when one was configured.
        """
        new_client = await create_async_client(
            self.address, username=self.username, password=self.password
        )
        if self.become:
            await new_client.become_user(self.become)
        return new_client

    def _run_key_tasks(self, queries, call):
        """
        Run "call(client, args)" for every (key, args) item of "queries" and
        return a dictionary mapping each key to the value its call returned.
        Every key starts out mapped to None.
        """
        answers = dict.fromkeys(queries)

        def make_task(key, args):
            # key and args are bound per task by being passed as parameters
            async def task(client):
                answers[key] = await call(client, args)

            return task

        self.run_tasks(make_task(key, args) for key, args in queries.items())
        return answers

    def get_unihashes(self, queries):
        """
        Look up several unihashes in one call.

        "queries" is a dictionary with arbitrary keys whose values are
        (method, taskhash) tuples.

        The returned dictionary has the same keys as "queries". Each value is
        the result of get_unihash() for that entry, or None if no result was
        stored for it.
        """

        async def call(client, args):
            method, taskhash = args
            return await client.get_unihash(method, taskhash)

        return self._run_key_tasks(queries, call)

    def unihashes_exist(self, queries):
        """
        Check several unihashes for existence in one call.

        "queries" is a dictionary with arbitrary keys whose values are
        unihashes.

        The returned dictionary has the same keys as "queries". Each value is
        the True/False result of unihash_exists() for that entry, or None if
        no result was stored for it.
        """

        async def call(client, unihash):
            return await client.unihash_exists(unihash)

        return self._run_key_tasks(queries, call)
365 return self._run_key_tasks(queries, call)