blob: 0ffd0c2ae28f6f1e8f46a1f68020999ee6eff0cf [file] [log] [blame]
# Copyright (C) 2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#
5
import asyncio
import json
import logging
import os
import socket

from . import DEFAULT_MAX_CHUNK, chunkify, create_async_client
Brad Bishopa34c0302019-09-23 22:34:48 -040012
13
# Module-level logger; the name is fixed (not __name__) so that logging
# configuration keyed on "hashserv.client" keeps working.
logger = logging.getLogger("hashserv.client")
Brad Bishopa34c0302019-09-23 22:34:48 -040015
16
class HashConnectionError(Exception):
    """Raised when talking to the hash equivalence server fails.

    Used for closed connections, malformed or unexpected responses, and as
    the wrapper for low-level errors once the retry budget is exhausted.
    """
19
20
class AsyncClient(object):
    """Asyncio client for the hash equivalence server.

    The connection is opened lazily: ``connect_tcp`` / ``connect_unix`` only
    record *how* to open the socket, and ``connect`` (invoked before every
    request) opens it on demand.  Requests that fail with a connection or
    decoding error are retried after closing and re-opening the connection.
    """

    # Request/response mode using JSON messages.
    MODE_NORMAL = 0
    # Line-oriented mode, entered with a "get-stream" message and left by
    # sending "END".
    MODE_GET_STREAM = 1

    def __init__(self):
        self.reader = None
        self.writer = None
        self.mode = self.MODE_NORMAL
        self.max_chunk = DEFAULT_MAX_CHUNK

    async def connect_tcp(self, address, port):
        """Select a TCP endpoint; the socket is opened later by ``connect``."""

        async def connect_sock():
            return await asyncio.open_connection(address, port)

        self._connect_sock = connect_sock

    async def connect_unix(self, path):
        """Select a UNIX socket path; the socket is opened later by ``connect``."""

        async def connect_sock():
            return await asyncio.open_unix_connection(path)

        self._connect_sock = connect_sock

    async def connect(self):
        """Open the connection if it is not already open.

        After connecting, the protocol header is sent and the mode the client
        was in before the (re)connect is re-established on the new connection.
        Does nothing when a reader and writer are already present.
        """
        if self.reader is None or self.writer is None:
            (self.reader, self.writer) = await self._connect_sock()

            self.writer.write("OEHASHEQUIV 1.1\n\n".encode("utf-8"))
            await self.writer.drain()

            # A fresh connection always starts in normal mode on our side;
            # replay the transition into whatever mode we were in before.
            cur_mode = self.mode
            self.mode = self.MODE_NORMAL
            await self._set_mode(cur_mode)

    async def close(self):
        """Drop the reader and close the writer, if any."""
        self.reader = None

        if self.writer is not None:
            self.writer.close()
            self.writer = None

    async def _send_wrapper(self, proc):
        """Run the coroutine function ``proc`` with reconnect-and-retry.

        The connection is (re)established before every attempt.  On a
        connection or decoding error the connection is closed and the call
        retried; after the fourth failed attempt the error is raised as a
        HashConnectionError.
        """
        count = 0
        while True:
            try:
                await self.connect()
                return await proc()
            except (
                OSError,
                HashConnectionError,
                json.JSONDecodeError,
                UnicodeDecodeError,
            ) as e:
                logger.warning("Error talking to server: %s", e)
                if count >= 3:
                    if not isinstance(e, HashConnectionError):
                        # Normalize to the one exception type callers catch,
                        # keeping the original error as the cause.
                        raise HashConnectionError(str(e)) from e
                    raise
                await self.close()
                count += 1

    async def send_message(self, msg):
        """Send ``msg`` as JSON and return the decoded JSON response.

        The serialized message is passed through ``chunkify`` with
        ``self.max_chunk`` and each resulting piece is written in turn.  If
        the first response line is a JSON object containing "chunk-stream",
        the following lines up to the first empty line are concatenated and
        decoded as the actual response.

        Raises HashConnectionError if the request keeps failing.
        """

        async def get_line():
            line = await self.reader.readline()
            if not line:
                raise HashConnectionError("Connection closed")

            line = line.decode("utf-8")

            # readline() returns a partial line if EOF is hit before "\n".
            if not line.endswith("\n"):
                raise HashConnectionError("Bad message %r" % line)

            return line

        async def proc():
            for c in chunkify(json.dumps(msg), self.max_chunk):
                self.writer.write(c.encode("utf-8"))
            await self.writer.drain()

            line = await get_line()

            m = json.loads(line)
            if "chunk-stream" in m:
                lines = []
                while True:
                    line = (await get_line()).rstrip("\n")
                    if not line:
                        break
                    lines.append(line)

                m = json.loads("".join(lines))

            return m

        return await self._send_wrapper(proc)

    async def send_stream(self, msg):
        """Send one line in stream mode and return the stripped reply line.

        Raises HashConnectionError if the request keeps failing.
        """

        async def proc():
            self.writer.write(("%s\n" % msg).encode("utf-8"))
            await self.writer.drain()
            line = await self.reader.readline()
            if not line:
                raise HashConnectionError("Connection closed")
            return line.decode("utf-8").rstrip()

        return await self._send_wrapper(proc)

    async def _set_mode(self, new_mode):
        """Switch the connection between normal and stream mode if needed.

        Raises HashConnectionError if the server does not answer "ok", and
        Exception for a transition that is not defined.
        """
        if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
            r = await self.send_stream("END")
            if r != "ok":
                raise HashConnectionError("Bad response from server %r" % r)
        elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
            r = await self.send_message({"get-stream": None})
            if r != "ok":
                raise HashConnectionError("Bad response from server %r" % r)
        elif new_mode != self.mode:
            raise Exception(
                "Undefined mode transition %r -> %r" % (self.mode, new_mode)
            )

        self.mode = new_mode

    async def get_unihash(self, method, taskhash):
        """Query "<method> <taskhash>" in stream mode.

        Returns the server's reply line, or None when the reply is empty.
        """
        await self._set_mode(self.MODE_GET_STREAM)
        r = await self.send_stream("%s %s" % (method, taskhash))
        return r or None

    async def report_unihash(self, taskhash, method, outhash, unihash, extra={}):
        """Send a "report" message and return the server's response.

        ``extra`` supplies additional fields; it is copied, never modified
        (which is why the shared ``{}`` default is safe here).
        """
        await self._set_mode(self.MODE_NORMAL)
        m = extra.copy()
        m["taskhash"] = taskhash
        m["method"] = method
        m["outhash"] = outhash
        m["unihash"] = unihash
        return await self.send_message({"report": m})

    async def report_unihash_equiv(self, taskhash, method, unihash, extra={}):
        """Send a "report-equiv" message and return the server's response.

        ``extra`` supplies additional fields; it is copied, never modified.
        """
        await self._set_mode(self.MODE_NORMAL)
        m = extra.copy()
        m["taskhash"] = taskhash
        m["method"] = method
        m["unihash"] = unihash
        return await self.send_message({"report-equiv": m})

    async def get_taskhash(self, method, taskhash, all_properties=False):
        """Send a "get" message for the task and return the response."""
        await self._set_mode(self.MODE_NORMAL)
        return await self.send_message(
            {"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
        )

    async def get_stats(self):
        """Send "get-stats" and return the response."""
        await self._set_mode(self.MODE_NORMAL)
        return await self.send_message({"get-stats": None})

    async def reset_stats(self):
        """Send "reset-stats" and return the response."""
        await self._set_mode(self.MODE_NORMAL)
        return await self.send_message({"reset-stats": None})

    async def backfill_wait(self):
        """Send "backfill-wait" and return the "tasks" field of the response."""
        await self._set_mode(self.MODE_NORMAL)
        return (await self.send_message({"backfill-wait": None}))["tasks"]
184
185
class Client(object):
    """Blocking wrapper around AsyncClient.

    Owns a private event loop and exposes the AsyncClient coroutines listed
    in ``__init__`` as ordinary methods that run to completion on that loop.
    """

    def __init__(self):
        self.client = AsyncClient()
        self.loop = asyncio.new_event_loop()

        # Expose each coroutine of the async client as a synchronous method
        # of the same name on this instance.
        for call in (
            "connect_tcp",
            "close",
            "get_unihash",
            "report_unihash",
            "report_unihash_equiv",
            "get_taskhash",
            "get_stats",
            "reset_stats",
            "backfill_wait",
        ):
            downcall = getattr(self.client, call)
            setattr(self, call, self._get_downcall_wrapper(downcall))

    def _get_downcall_wrapper(self, downcall):
        """Return a blocking function that runs coroutine function ``downcall``."""

        def wrapper(*args, **kwargs):
            return self.loop.run_until_complete(downcall(*args, **kwargs))

        return wrapper

    def connect_unix(self, path):
        """Connect to the UNIX socket at ``path`` immediately.

        The working directory is restored before returning, including when
        connecting fails.
        """
        # AF_UNIX has path length issues so chdir here to workaround
        cwd = os.getcwd()
        try:
            dirname = os.path.dirname(path)
            # A bare filename has an empty dirname, and os.chdir("") raises;
            # the socket is already relative to the current directory then.
            if dirname:
                os.chdir(dirname)
            self.loop.run_until_complete(
                self.client.connect_unix(os.path.basename(path))
            )
            # Connect now, while the relative socket path is still valid.
            self.loop.run_until_complete(self.client.connect())
        finally:
            os.chdir(cwd)

    @property
    def max_chunk(self):
        """Maximum chunk size, stored on the underlying async client."""
        return self.client.max_chunk

    @max_chunk.setter
    def max_chunk(self, value):
        self.client.max_chunk = value