blob: a29af836d901698026fd1d8d5cee645081acb9c9 [file] [log] [blame]
Brad Bishopa34c0302019-09-23 22:34:48 -04001# Copyright (C) 2019 Garmin Ltd.
2#
3# SPDX-License-Identifier: GPL-2.0-only
4#
5
Brad Bishopa34c0302019-09-23 22:34:48 -04006import json
7import logging
8import socket
Brad Bishop00e122a2019-10-05 11:10:57 -04009import os
Andrew Geissler475cb722020-07-10 16:00:51 -050010from . import chunkify, DEFAULT_MAX_CHUNK
Brad Bishopa34c0302019-09-23 22:34:48 -040011
12
# Module-level logger for this client; records are emitted under the
# 'hashserv.client' logger name.
logger = logging.getLogger('hashserv.client')
14
15
class HashConnectionError(Exception):
    """Error raised when talking to the hash server fails.

    Used for closed connections, malformed or unexpected replies, and as a
    wrapper for lower-level I/O or decoding errors once retries are exhausted.
    """
18
19
class Client(object):
    """Blocking client for a server speaking the 'OEHASHEQUIV 1.1' protocol.

    A transport is selected with connect_tcp() or connect_unix(); the socket
    itself is created lazily by connect(), which every request calls through
    _send_wrapper(). The connection is in one of two modes:

    * MODE_NORMAL: JSON messages (optionally chunked) are exchanged.
    * MODE_GET_STREAM: plain '<method> <taskhash>' lines are exchanged.

    Public request methods switch to the mode they need before sending.
    """

    MODE_NORMAL = 0
    MODE_GET_STREAM = 1

    def __init__(self):
        self._socket = None
        self.reader = None
        self.writer = None
        self.mode = self.MODE_NORMAL
        self.max_chunk = DEFAULT_MAX_CHUNK

    def connect_tcp(self, address, port):
        """Select TCP as the transport; the socket is opened by connect()."""
        def connect_sock():
            s = socket.create_connection((address, port))
            try:
                s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
                # TCP_QUICKACK is not exposed by the socket module on every
                # platform; skip it where it is missing instead of failing
                # with an AttributeError the retry logic would not catch.
                if hasattr(socket, 'TCP_QUICKACK'):
                    s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
                s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            except Exception:
                # Do not leak the descriptor if option setup fails.
                s.close()
                raise
            return s

        self._connect_sock = connect_sock

    def connect_unix(self, path):
        """Select a Unix domain socket at *path* as the transport.

        The socket is opened by connect(), not here.
        """
        def connect_sock():
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            # AF_UNIX has path length issues so chdir here to workaround
            cwd = os.getcwd()
            try:
                dirname = os.path.dirname(path)
                # A bare filename has an empty dirname and os.chdir('')
                # raises; the socket is already relative to the cwd then.
                if dirname:
                    os.chdir(dirname)
                s.connect(os.path.basename(path))
            except Exception:
                # Do not leak the descriptor if the connection fails.
                s.close()
                raise
            finally:
                os.chdir(cwd)
            return s

        self._connect_sock = connect_sock

    def connect(self):
        """Open the connection if needed and return the socket.

        On a fresh socket the protocol header is sent and the mode that was
        active before (self.mode) is re-established on the new connection.
        If a socket already exists it is returned unchanged.
        """
        if self._socket is None:
            self._socket = self._connect_sock()

            self.reader = self._socket.makefile('r', encoding='utf-8')
            self.writer = self._socket.makefile('w', encoding='utf-8')

            self.writer.write('OEHASHEQUIV 1.1\n\n')
            self.writer.flush()

            # Restore mode if the socket is being re-created
            cur_mode = self.mode
            self.mode = self.MODE_NORMAL
            try:
                self._set_mode(cur_mode)
            except BaseException:
                # Remember the wanted mode so the next reconnect tries to
                # restore it again instead of silently staying in normal
                # mode while the caller keeps sending stream requests.
                self.mode = cur_mode
                raise

        return self._socket

    def close(self):
        """Close the socket (if any) and forget the reader/writer objects."""
        if self._socket is not None:
            self._socket.close()
            self._socket = None
            self.reader = None
            self.writer = None

    def _send_wrapper(self, proc):
        """Run *proc* on a connected client, reconnecting on failure.

        After an OSError, HashConnectionError, JSON decode error or Unicode
        decode error the connection is closed and the call is retried; the
        fourth failure is raised to the caller as HashConnectionError.
        """
        count = 0
        while True:
            try:
                self.connect()
                return proc()
            except (OSError, HashConnectionError, json.JSONDecodeError, UnicodeDecodeError) as e:
                logger.warning('Error talking to server: %s', e)
                if count >= 3:
                    if not isinstance(e, HashConnectionError):
                        raise HashConnectionError(str(e)) from e
                    raise
                self.close()
                count += 1

    def _read_line(self):
        """Read one complete, newline-terminated line from the server.

        Raises HashConnectionError if the stream is at EOF or the line is
        truncated (no trailing newline).
        """
        line = self.reader.readline()
        if not line:
            raise HashConnectionError('Connection closed')

        if not line.endswith('\n'):
            raise HashConnectionError('Bad message %r' % line)

        return line

    def send_message(self, msg):
        """Send *msg* as JSON and return the decoded JSON reply.

        The request is split with chunkify() using self.max_chunk. If the
        reply is an object containing 'chunk-stream', the following lines up
        to the first empty line are joined and decoded as the real reply.
        """
        def proc():
            for c in chunkify(json.dumps(msg), self.max_chunk):
                self.writer.write(c)
            self.writer.flush()

            reply = json.loads(self._read_line())
            # Only an object can announce a chunked reply; a null or scalar
            # reply must be returned as-is rather than probed with "in".
            if isinstance(reply, dict) and 'chunk-stream' in reply:
                lines = []
                while True:
                    chunk = self._read_line().rstrip('\n')
                    if not chunk:
                        break
                    lines.append(chunk)

                reply = json.loads(''.join(lines))

            return reply

        return self._send_wrapper(proc)

    def send_stream(self, msg):
        """Send the text line *msg* and return the reply line, stripped.

        A truncated reply (connection dropped mid-line) is treated as a
        connection error and retried instead of being returned as data.
        """
        def proc():
            self.writer.write("%s\n" % msg)
            self.writer.flush()
            return self._read_line().rstrip()

        return self._send_wrapper(proc)

    def _set_mode(self, new_mode):
        """Switch the connection between normal and get-stream mode.

        Raises HashConnectionError if the server does not answer 'ok', and
        Exception for a transition that is not defined.
        """
        if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
            r = self.send_stream('END')
            if r != 'ok':
                raise HashConnectionError('Bad response from server %r' % r)
        elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
            r = self.send_message({'get-stream': None})
            if r != 'ok':
                raise HashConnectionError('Bad response from server %r' % r)
        elif new_mode != self.mode:
            raise Exception('Undefined mode transition %r -> %r' % (self.mode, new_mode))

        self.mode = new_mode

    def get_unihash(self, method, taskhash):
        """Query '<method> <taskhash>' in stream mode.

        Returns the reply string, or None when the server replies with an
        empty line.
        """
        self._set_mode(self.MODE_GET_STREAM)
        r = self.send_stream('%s %s' % (method, taskhash))
        if not r:
            return None
        return r

    def report_unihash(self, taskhash, method, outhash, unihash, extra=None):
        """Send a 'report' message and return the server's reply.

        *extra* is an optional dict of additional fields; it is copied, not
        modified.
        """
        self._set_mode(self.MODE_NORMAL)
        m = extra.copy() if extra is not None else {}
        m['taskhash'] = taskhash
        m['method'] = method
        m['outhash'] = outhash
        m['unihash'] = unihash
        return self.send_message({'report': m})

    def report_unihash_equiv(self, taskhash, method, unihash, extra=None):
        """Send a 'report-equiv' message and return the server's reply.

        *extra* is an optional dict of additional fields; it is copied, not
        modified.
        """
        self._set_mode(self.MODE_NORMAL)
        m = extra.copy() if extra is not None else {}
        m['taskhash'] = taskhash
        m['method'] = method
        m['unihash'] = unihash
        return self.send_message({'report-equiv': m})

    def get_taskhash(self, method, taskhash, all_properties=False):
        """Send a 'get' message for *method*/*taskhash* and return the reply.

        *all_properties* is passed to the server as the 'all' field.
        """
        self._set_mode(self.MODE_NORMAL)
        return self.send_message({'get': {
            'taskhash': taskhash,
            'method': method,
            'all': all_properties
        }})

    def get_stats(self):
        """Send a 'get-stats' message and return the server's reply."""
        self._set_mode(self.MODE_NORMAL)
        return self.send_message({'get-stats': None})

    def reset_stats(self):
        """Send a 'reset-stats' message and return the server's reply."""
        self._set_mode(self.MODE_NORMAL)
        return self.send_message({'reset-stats': None})