blob: 0b254beddd7fc2a0072741f9b8b73b4568a3847b [file] [log] [blame]
# Copyright (C) 2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#
5
Brad Bishopa34c0302019-09-23 22:34:48 -04006import logging
7import socket
Andrew Geisslerc926e172021-05-07 16:11:35 -05008import bb.asyncrpc
Patrick Williamsac13d5f2023-11-24 18:59:46 -06009import json
Andrew Geisslerc926e172021-05-07 16:11:35 -050010from . import create_async_client
Brad Bishopa34c0302019-09-23 22:34:48 -040011
12
# Module-level logger; AsyncClient hands it to its base class constructor.
logger = logging.getLogger("hashserv.client")
Brad Bishopa34c0302019-09-23 22:34:48 -040014
15
class AsyncClient(bb.asyncrpc.AsyncClient):
    """
    Asynchronous client for the hash equivalence server.

    Most requests are sent as single messages through invoke(). Bulk
    lookups (get_unihash() and unihash_exists()) instead switch the
    connection into a "stream" mode in which plain strings are sent and
    received until "END" is sent. The mode the connection is believed to
    be in is tracked in self.mode and changed only through _set_mode().

    If a username is given, the client authenticates on every (re)connect
    and re-applies any user previously selected with become_user().
    """

    MODE_NORMAL = 0
    MODE_GET_STREAM = 1
    MODE_EXIST_STREAM = 2

    def __init__(self, username=None, password=None):
        # NOTE(review): the first two arguments look like the protocol name
        # and version expected by bb.asyncrpc.AsyncClient -- confirm there.
        super().__init__("OEHASHEQUIV", "1.1", logger)
        self.mode = self.MODE_NORMAL
        self.username = username
        self.password = password
        # User last selected with become_user(), so it can be re-applied
        # after a reconnect. None means "acting as self.username".
        self.saved_become_user = None

    async def setup_connection(self):
        """
        Prepare a freshly established connection.

        Resets the tracked mode to normal and, if credentials are stored,
        authenticates and restores the previously "become" user.
        """
        await super().setup_connection()
        self.mode = self.MODE_NORMAL
        if self.username:
            # Save off become user temporarily because auth() resets it
            become = self.saved_become_user
            await self.auth(self.username, self.password)

            if become:
                await self.become_user(become)

    async def send_stream(self, mode, msg):
        """
        Send the string "msg" while in stream mode "mode" and return the
        string received in reply.

        The mode switch is performed inside the wrapped coroutine so that
        it is repeated if _send_wrapper() runs the coroutine again.
        """

        async def proc():
            await self._set_mode(mode)
            await self.socket.send(msg)
            return await self.socket.recv()

        return await self._send_wrapper(proc)

    async def invoke(self, *args, **kwargs):
        """
        Invoke a normal-mode request, first leaving stream mode if needed.
        """
        # It's OK if connection errors cause a failure here, because the mode
        # is also reset to normal on a new connection
        await self._set_mode(self.MODE_NORMAL)
        return await super().invoke(*args, **kwargs)

    async def _set_mode(self, new_mode):
        """
        Switch the connection to "new_mode" (one of the MODE_* constants).

        Raises ConnectionError if the server does not answer "ok" to a
        transition request, and Exception if "new_mode" is not a known mode.
        """

        async def stream_to_normal():
            # setup_connection() resets the mode to normal, so if the
            # connection was re-established before this coroutine ran
            # (presumably possible when _send_wrapper() retries -- confirm),
            # there is no stream left to terminate.
            if self.mode == self.MODE_NORMAL:
                return "ok"
            await self.socket.send("END")
            return await self.socket.recv()

        async def normal_to_stream(command):
            r = await self.invoke({command: None})
            if r != "ok":
                raise ConnectionError(
                    f"Unable to transition to stream mode: Bad response from server {r!r}"
                )

            self.logger.debug("Mode is now %s", command)

        if new_mode == self.mode:
            return

        old_mode = self.mode
        self.logger.debug("Transitioning mode %s -> %s", old_mode, new_mode)

        # Always transition to normal mode before switching to any other mode
        if self.mode != self.MODE_NORMAL:
            r = await self._send_wrapper(stream_to_normal)
            if r != "ok":
                self.check_invoke_error(r)
                raise ConnectionError(
                    f"Unable to transition to normal mode: Bad response from server {r!r}"
                )
            self.logger.debug("Mode is now normal")
            # Record the new state immediately: normal_to_stream() below goes
            # through invoke(), which calls _set_mode(MODE_NORMAL) again and
            # would otherwise send a second "END". It also keeps the tracked
            # mode correct if the next transition fails.
            self.mode = self.MODE_NORMAL

        if new_mode == self.MODE_GET_STREAM:
            await normal_to_stream("get-stream")
        elif new_mode == self.MODE_EXIST_STREAM:
            await normal_to_stream("exists-stream")
        elif new_mode != self.MODE_NORMAL:
            raise Exception(
                f"Undefined mode transition {old_mode!r} -> {new_mode!r}"
            )

        self.mode = new_mode

    async def get_unihash(self, method, taskhash):
        """
        Look up the unihash for (method, taskhash) using the get stream.

        Returns the server's reply, or None if the reply is empty.
        """
        r = await self.send_stream(self.MODE_GET_STREAM, "%s %s" % (method, taskhash))
        if not r:
            return None
        return r

    async def report_unihash(self, taskhash, method, outhash, unihash, extra=None):
        """
        Send a "report" request built from the given hashes.

        "extra" is an optional mapping of additional fields; it is copied,
        never modified. Returns the server's response.
        """
        m = extra.copy() if extra is not None else {}
        m["taskhash"] = taskhash
        m["method"] = method
        m["outhash"] = outhash
        m["unihash"] = unihash
        return await self.invoke({"report": m})

    async def report_unihash_equiv(self, taskhash, method, unihash, extra=None):
        """
        Send a "report-equiv" request for (taskhash, method, unihash).

        "extra" is an optional mapping of additional fields; it is copied,
        never modified. Returns the server's response.
        """
        m = extra.copy() if extra is not None else {}
        m["taskhash"] = taskhash
        m["method"] = method
        m["unihash"] = unihash
        return await self.invoke({"report-equiv": m})

    async def get_taskhash(self, method, taskhash, all_properties=False):
        """
        Send a "get" request for (method, taskhash) and return the response.

        "all_properties" is passed to the server as the "all" field.
        """
        return await self.invoke(
            {"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
        )

    async def unihash_exists(self, unihash):
        """
        Query "unihash" over the exists stream.

        Returns True only if the server replies with the string "true".
        """
        r = await self.send_stream(self.MODE_EXIST_STREAM, unihash)
        return r == "true"

    async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
        """
        Send a "get-outhash" request and return the server's response.
        """
        return await self.invoke(
            {
                "get-outhash": {
                    "outhash": outhash,
                    "taskhash": taskhash,
                    "method": method,
                    "with_unihash": with_unihash,
                }
            }
        )

    async def get_stats(self):
        """Send a "get-stats" request and return the server's response."""
        return await self.invoke({"get-stats": None})

    async def reset_stats(self):
        """Send a "reset-stats" request and return the server's response."""
        return await self.invoke({"reset-stats": None})

    async def backfill_wait(self):
        """
        Send a "backfill-wait" request and return the "tasks" field of the
        response.
        """
        return (await self.invoke({"backfill-wait": None}))["tasks"]

    async def remove(self, where):
        """Send a "remove" request with the given "where" clause."""
        return await self.invoke({"remove": {"where": where}})

    async def clean_unused(self, max_age):
        """
        Send a "clean-unused" request; "max_age" is passed to the server as
        "max_age_seconds".
        """
        return await self.invoke({"clean-unused": {"max_age_seconds": max_age}})

    async def auth(self, username, token):
        """
        Authenticate as "username" with "token".

        On success the credentials are stored so setup_connection() can
        re-authenticate, and any saved "become" user is cleared.
        """
        result = await self.invoke({"auth": {"username": username, "token": token}})
        self.username = username
        self.password = token
        self.saved_become_user = None
        return result

    async def refresh_token(self, username=None):
        """
        Send a "refresh-token" request, optionally for another "username".

        The stored password is replaced with the new token only when the
        client has stored credentials, is not acting as another user, and
        the response is for the stored username. Returns the response.
        """
        m = {}
        if username:
            m["username"] = username
        result = await self.invoke({"refresh-token": m})
        if (
            self.username
            and not self.saved_become_user
            and result["username"] == self.username
        ):
            self.password = result["token"]
        return result

    async def set_user_perms(self, username, permissions):
        """Send a "set-user-perms" request for "username"."""
        return await self.invoke(
            {"set-user-perms": {"username": username, "permissions": permissions}}
        )

    async def get_user(self, username=None):
        """
        Send a "get-user" request; the username is only included in the
        request when one is given.
        """
        m = {}
        if username:
            m["username"] = username
        return await self.invoke({"get-user": m})

    async def get_all_users(self):
        """
        Send a "get-all-users" request and return the "users" field of the
        response.
        """
        return (await self.invoke({"get-all-users": {}}))["users"]

    async def new_user(self, username, permissions):
        """Send a "new-user" request for "username" with "permissions"."""
        return await self.invoke(
            {"new-user": {"username": username, "permissions": permissions}}
        )

    async def delete_user(self, username):
        """Send a "delete-user" request for "username"."""
        return await self.invoke({"delete-user": {"username": username}})

    async def become_user(self, username):
        """
        Send a "become-user" request for "username".

        The user is remembered so it can be restored on reconnect, unless
        it is the same as the authenticated user.
        """
        result = await self.invoke({"become-user": {"username": username}})
        if username == self.username:
            self.saved_become_user = None
        else:
            self.saved_become_user = username
        return result

    async def get_db_usage(self):
        """
        Send a "get-db-usage" request and return the "usage" field of the
        response.
        """
        return (await self.invoke({"get-db-usage": {}}))["usage"]

    async def get_db_query_columns(self):
        """
        Send a "get-db-query-columns" request and return the "columns"
        field of the response.
        """
        return (await self.invoke({"get-db-query-columns": {}}))["columns"]

    async def gc_status(self):
        """Send a "gc-status" request and return the server's response."""
        return await self.invoke({"gc-status": {}})

    async def gc_mark(self, mark, where):
        """
        Starts a new garbage collection operation identified by "mark". If
        garbage collection is already in progress with "mark", the collection
        is continued.

        All unihash entries that match the "where" clause are marked to be
        kept. In addition, any new entries added to the database after this
        command will be automatically marked with "mark"
        """
        return await self.invoke({"gc-mark": {"mark": mark, "where": where}})

    async def gc_sweep(self, mark):
        """
        Finishes garbage collection for "mark". All unihash entries that have
        not been marked will be deleted.

        It is recommended to clean unused outhash entries after running this to
        cleanup any dangling outhashes
        """
        return await self.invoke({"gc-sweep": {"mark": mark}})
228
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600229
class Client(bb.asyncrpc.Client):
    """
    Client front end built on bb.asyncrpc.Client that uses AsyncClient
    for the actual communication.

    NOTE(review): the names passed to _add_methods() are presumably exposed
    as synchronous wrappers around the AsyncClient methods of the same
    name -- confirm against bb.asyncrpc.Client.
    """

    def __init__(self, username=None, password=None):
        # Stored before calling the base constructor because
        # _get_async_client() reads them; presumably the base constructor
        # may call it -- confirm against bb.asyncrpc.Client.
        self.username = username
        self.password = password

        super().__init__()
        self._add_methods(
            "connect_tcp",
            "connect_websocket",
            "get_unihash",
            "report_unihash",
            "report_unihash_equiv",
            "get_taskhash",
            "unihash_exists",
            "get_outhash",
            "get_stats",
            "reset_stats",
            "backfill_wait",
            "remove",
            "clean_unused",
            "auth",
            "refresh_token",
            "set_user_perms",
            "get_user",
            "get_all_users",
            "new_user",
            "delete_user",
            "become_user",
            "get_db_usage",
            "get_db_query_columns",
            "gc_status",
            "gc_mark",
            "gc_sweep",
        )

    def _get_async_client(self):
        """Return a new AsyncClient carrying this client's credentials."""
        return AsyncClient(self.username, self.password)
Patrick Williams73bd93f2024-02-20 08:07:48 -0600267
268
class ClientPool(bb.asyncrpc.ClientPool):
    """
    Pool of hash equivalence clients used to run many queries in parallel.

    Each pooled client is created with create_async_client() for "address"
    using the given credentials and, if "become" is set, switched to that
    user with become_user().
    """

    def __init__(
        self,
        address,
        max_clients,
        *,
        username=None,
        password=None,
        become=None,
    ):
        super().__init__(max_clients)
        self.address = address
        self.username = username
        self.password = password
        self.become = become

    async def _new_client(self):
        """Create, and if requested impersonate with, a new pooled client."""
        client = await create_async_client(
            self.address,
            username=self.username,
            password=self.password,
        )
        if self.become:
            await client.become_user(self.become)
        return client

    def _run_key_tasks(self, queries, call):
        """
        Run "call(client, args)" for every (key, args) item of "queries"
        through run_tasks() and collect the results by key.

        Every key starts out mapped to None, so a key whose task never
        stores a result keeps None in the returned dictionary.
        """
        results = dict.fromkeys(queries)

        def make_task(key, args):
            # Bind key/args per task; "results" is only mutated, never
            # rebound, so no nonlocal declaration is needed.
            async def task(client):
                results[key] = await call(client, args)

            return task

        # Tasks are produced lazily, one per query.
        self.run_tasks(make_task(key, args) for key, args in queries.items())
        return results

    def get_unihashes(self, queries):
        """
        Query multiple unihashes in parallel.

        The queries argument is a dictionary with arbitrary key. The values
        must be a tuple of (method, taskhash).

        Returns a dictionary with a corresponding key for each input key, and
        the value is the queried unihash (which might be None if the query
        failed)
        """

        async def call(client, args):
            method, taskhash = args
            return await client.get_unihash(method, taskhash)

        return self._run_key_tasks(queries, call)

    def unihashes_exist(self, queries):
        """
        Query multiple unihash existence checks in parallel.

        The queries argument is a dictionary with arbitrary key. The values
        must be a unihash.

        Returns a dictionary with a corresponding key for each input key, and
        the value is True or False if the unihash is known by the server (or
        None if there was a failure)
        """

        async def call(client, unihash):
            return await client.unihash_exists(unihash)

        return self._run_key_tasks(queries, call)