blob: 34960197d138e7148217bfda2b8b6256faa1f2f4 [file] [log] [blame]
#
# SPDX-License-Identifier: GPL-2.0-only
#
4
5import abc
6import asyncio
7import json
8import os
9import socket
Andrew Geisslereff27472021-10-29 15:35:00 -050010import sys
Andrew Geisslerc926e172021-05-07 16:11:35 -050011from . import chunkify, DEFAULT_MAX_CHUNK
12
13
class AsyncClient(object):
    """Asyncio client for a newline-delimited JSON request/response protocol.

    The transport (TCP or UNIX domain socket) is selected with connect_tcp()
    or connect_unix(). The connection itself is opened lazily by connect()
    and is re-opened automatically by _send_wrapper() when a request fails.
    """

    def __init__(self, proto_name, proto_version, logger, timeout=30):
        """Store the protocol identity, logger and read timeout.

        proto_name / proto_version are written as the handshake line by
        setup_connection(). logger only needs a warning() method. timeout
        is the number of seconds to wait for each reply line.
        """
        self.reader = None
        self.writer = None
        self.max_chunk = DEFAULT_MAX_CHUNK
        self.proto_name = proto_name
        self.proto_version = proto_version
        self.logger = logger
        self.timeout = timeout

    async def connect_tcp(self, address, port):
        """Select TCP to address:port as the transport (does not connect yet)."""
        async def connect_sock():
            return await asyncio.open_connection(address, port)

        self._connect_sock = connect_sock

    async def connect_unix(self, path):
        """Select the UNIX domain socket at path as the transport.

        No connection is made here; connect() opens it. A relative path is
        resolved against the working directory in effect when this method
        is called, so that automatic reconnects keep working after the
        process (or a caller that only chdir'd temporarily) has moved to a
        different directory.
        """
        sock_dir = os.path.abspath(os.path.dirname(path))
        sock_name = os.path.basename(path)

        async def connect_sock():
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                # AF_UNIX has path length issues so chdir here to workaround
                cwd = os.getcwd()
                try:
                    os.chdir(sock_dir)
                    # Connect synchronously: there is no await between the
                    # two chdir calls, so no other coroutine can observe or
                    # change the temporary working directory.
                    sock.connect(sock_name)
                finally:
                    os.chdir(cwd)
                return await asyncio.open_unix_connection(sock=sock)
            except BaseException:
                sock.close()
                raise

        self._connect_sock = connect_sock

    async def setup_connection(self):
        """Send the "<proto_name> <proto_version>" handshake and flush it."""
        s = '%s %s\n\n' % (self.proto_name, self.proto_version)
        self.writer.write(s.encode("utf-8"))
        await self.writer.drain()

    async def connect(self):
        """Open the connection and perform the handshake if not connected.

        Requires connect_tcp() or connect_unix() to have been called first.
        """
        if self.reader is None or self.writer is None:
            (self.reader, self.writer) = await self._connect_sock()
            await self.setup_connection()

    async def close(self):
        """Drop the current connection, if any. Safe to call repeatedly."""
        self.reader = None

        if self.writer is not None:
            # Detach the writer first so a failing close() cannot leave a
            # dead writer attached to the client.
            writer, self.writer = self.writer, None
            writer.close()

    async def _send_wrapper(self, proc):
        """Run the coroutine function proc with reconnect-and-retry.

        proc is attempted up to four times. After every failure the
        connection is closed so the next attempt (or the next request)
        starts from a fresh connection and cannot read a stale reply.

        Raises ConnectionError once the retries are exhausted; errors of
        other types are wrapped in ConnectionError with the original as
        its cause.
        """
        count = 0
        while True:
            try:
                await self.connect()
                return await proc()
            except (
                OSError,
                ConnectionError,
                json.JSONDecodeError,
                UnicodeDecodeError,
            ) as e:
                self.logger.warning("Error talking to server: %s", e)
                # Always discard the connection: after an error the stream
                # may hold a partial or late reply belonging to this request.
                await self.close()
                if count >= 3:
                    if not isinstance(e, ConnectionError):
                        raise ConnectionError(str(e)) from e
                    raise
                count += 1

    async def send_message(self, msg):
        """Send msg as JSON and return the decoded reply.

        If the first reply is an object containing the key "chunk-stream",
        the following lines up to the first empty line are concatenated and
        decoded as the actual reply. Raises ConnectionError when the
        exchange keeps failing (see _send_wrapper).
        """
        async def get_line():
            # Read one newline-terminated, UTF-8 encoded line.
            try:
                line = await asyncio.wait_for(self.reader.readline(), self.timeout)
            except asyncio.TimeoutError:
                raise ConnectionError("Timed out waiting for server")

            if not line:
                raise ConnectionError("Connection closed")

            line = line.decode("utf-8")

            # readline() returns a partial line if EOF arrives mid-message.
            if not line.endswith("\n"):
                raise ConnectionError("Bad message %r" % (line))

            return line

        async def proc():
            # chunkify() is defined elsewhere in this package; presumably it
            # splits the serialized message into pieces bounded by max_chunk.
            for c in chunkify(json.dumps(msg), self.max_chunk):
                self.writer.write(c.encode("utf-8"))
            await self.writer.drain()

            reply = await get_line()

            m = json.loads(reply)
            if m and "chunk-stream" in m:
                lines = []
                while True:
                    part = (await get_line()).rstrip("\n")
                    if not part:
                        break
                    lines.append(part)

                m = json.loads("".join(lines))

            return m

        return await self._send_wrapper(proc)

    async def ping(self):
        """Send a {'ping': {}} request and return the server's reply."""
        return await self.send_message(
            {'ping': {}}
        )
116
Andrew Geisslerc926e172021-05-07 16:11:35 -0500117
class Client(object):
    """Synchronous facade that drives an async client on a private event loop.

    Subclasses implement _get_async_client() to supply the async client.
    Selected coroutine methods of that client are exposed here as blocking
    methods. Instances can be used as context managers; close() is called
    on exit.
    """

    def __init__(self):
        self.client = self._get_async_client()
        self.loop = asyncio.new_event_loop()

        # Override any pre-existing loop.
        # Without this, the PR server export selftest triggers a hang
        # when running with Python 3.7. The drawback is that there is
        # potential for issues if the PR and hash equiv (or some new)
        # clients need to both be instantiated in the same process.
        # This should be revisited if/when Python 3.9 becomes the
        # minimum required version for BitBake, as it seems not
        # required (but harmless) with it.
        asyncio.set_event_loop(self.loop)

        self._add_methods('connect_tcp', 'ping')

    # NOTE: this class does not use ABCMeta, so the decorator documents
    # intent only; an un-overridden call simply returns None.
    @abc.abstractmethod
    def _get_async_client(self):
        """Return the async client instance to wrap (subclass hook)."""
        pass

    def _get_downcall_wrapper(self, downcall):
        """Return a blocking function that runs coroutine function downcall."""
        def wrapper(*args, **kwargs):
            return self.loop.run_until_complete(downcall(*args, **kwargs))

        return wrapper

    def _add_methods(self, *methods):
        """Expose the named coroutine methods of the async client as
        blocking methods on this instance."""
        for m in methods:
            downcall = getattr(self.client, m)
            setattr(self, m, self._get_downcall_wrapper(downcall))

    def connect_unix(self, path):
        """Connect to the UNIX domain socket at path.

        A path without a directory component is looked up in the current
        working directory.
        """
        # AF_UNIX has path length issues so chdir here to workaround
        cwd = os.getcwd()
        dirname = os.path.dirname(path)
        try:
            # os.chdir("") raises FileNotFoundError, so only change
            # directory when the path actually names one.
            if dirname:
                os.chdir(dirname)
            self.loop.run_until_complete(self.client.connect_unix(os.path.basename(path)))
            self.loop.run_until_complete(self.client.connect())
        finally:
            os.chdir(cwd)

    @property
    def max_chunk(self):
        """Maximum chunk size, stored on the wrapped async client."""
        return self.client.max_chunk

    @max_chunk.setter
    def max_chunk(self, value):
        self.client.max_chunk = value

    def close(self):
        """Close the async client and the event loop.

        Calling close() again after the loop has been closed is a no-op.
        """
        if self.loop.is_closed():
            return
        try:
            self.loop.run_until_complete(self.client.close())
            if sys.version_info >= (3, 6):
                self.loop.run_until_complete(self.loop.shutdown_asyncgens())
        finally:
            # Release the loop even if shutting down the client failed.
            self.loop.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False