blob: 29a5ab76aa92030d92d3a79d413661d489bbbae8 [file] [log] [blame]
Andrew Geisslerc926e172021-05-07 16:11:35 -05001#
Patrick Williams92b42cb2022-09-03 06:53:57 -05002# Copyright BitBake Contributors
3#
Andrew Geisslerc926e172021-05-07 16:11:35 -05004# SPDX-License-Identifier: GPL-2.0-only
5#
6
7import abc
8import asyncio
9import json
10import os
11import socket
Andrew Geisslereff27472021-10-29 15:35:00 -050012import sys
Patrick Williams73bd93f2024-02-20 08:07:48 -060013import contextlib
14from threading import Thread
Patrick Williamsac13d5f2023-11-24 18:59:46 -060015from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK
16from .exceptions import ConnectionClosedError, InvokeError
Andrew Geisslerc926e172021-05-07 16:11:35 -050017
18
19class AsyncClient(object):
Patrick Williams39653562024-03-01 08:54:02 -060020 def __init__(
21 self,
22 proto_name,
23 proto_version,
24 logger,
25 timeout=30,
26 server_headers=False,
27 headers={},
28 ):
Patrick Williamsac13d5f2023-11-24 18:59:46 -060029 self.socket = None
Andrew Geisslerc926e172021-05-07 16:11:35 -050030 self.max_chunk = DEFAULT_MAX_CHUNK
31 self.proto_name = proto_name
32 self.proto_version = proto_version
33 self.logger = logger
Patrick Williams213cb262021-08-07 19:21:33 -050034 self.timeout = timeout
Patrick Williams39653562024-03-01 08:54:02 -060035 self.needs_server_headers = server_headers
36 self.server_headers = {}
37 self.headers = headers
Andrew Geisslerc926e172021-05-07 16:11:35 -050038
39 async def connect_tcp(self, address, port):
40 async def connect_sock():
Patrick Williamsac13d5f2023-11-24 18:59:46 -060041 reader, writer = await asyncio.open_connection(address, port)
42 return StreamConnection(reader, writer, self.timeout, self.max_chunk)
Andrew Geisslerc926e172021-05-07 16:11:35 -050043
44 self._connect_sock = connect_sock
45
46 async def connect_unix(self, path):
47 async def connect_sock():
Andrew Geissler87f5cff2022-09-30 13:13:31 -050048 # AF_UNIX has path length issues so chdir here to workaround
49 cwd = os.getcwd()
50 try:
51 os.chdir(os.path.dirname(path))
52 # The socket must be opened synchronously so that CWD doesn't get
53 # changed out from underneath us so we pass as a sock into asyncio
54 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
55 sock.connect(os.path.basename(path))
56 finally:
Patrick Williamsac13d5f2023-11-24 18:59:46 -060057 os.chdir(cwd)
58 reader, writer = await asyncio.open_unix_connection(sock=sock)
59 return StreamConnection(reader, writer, self.timeout, self.max_chunk)
60
61 self._connect_sock = connect_sock
62
63 async def connect_websocket(self, uri):
64 import websockets
65
66 async def connect_sock():
67 websocket = await websockets.connect(uri, ping_interval=None)
68 return WebsocketConnection(websocket, self.timeout)
Andrew Geisslerc926e172021-05-07 16:11:35 -050069
70 self._connect_sock = connect_sock
71
72 async def setup_connection(self):
Patrick Williamsac13d5f2023-11-24 18:59:46 -060073 # Send headers
74 await self.socket.send("%s %s" % (self.proto_name, self.proto_version))
Patrick Williams39653562024-03-01 08:54:02 -060075 await self.socket.send(
76 "needs-headers: %s" % ("true" if self.needs_server_headers else "false")
77 )
78 for k, v in self.headers.items():
79 await self.socket.send("%s: %s" % (k, v))
80
Patrick Williamsac13d5f2023-11-24 18:59:46 -060081 # End of headers
82 await self.socket.send("")
Andrew Geisslerc926e172021-05-07 16:11:35 -050083
Patrick Williams39653562024-03-01 08:54:02 -060084 self.server_headers = {}
85 if self.needs_server_headers:
86 while True:
87 line = await self.socket.recv()
88 if not line:
89 # End headers
90 break
91 tag, value = line.split(":", 1)
92 self.server_headers[tag.lower()] = value.strip()
93
94 async def get_header(self, tag, default):
95 await self.connect()
96 return self.server_headers.get(tag, default)
97
Andrew Geisslerc926e172021-05-07 16:11:35 -050098 async def connect(self):
Patrick Williamsac13d5f2023-11-24 18:59:46 -060099 if self.socket is None:
100 self.socket = await self._connect_sock()
Andrew Geisslerc926e172021-05-07 16:11:35 -0500101 await self.setup_connection()
102
Patrick Williamsac13d5f2023-11-24 18:59:46 -0600103 async def disconnect(self):
104 if self.socket is not None:
105 await self.socket.close()
106 self.socket = None
Andrew Geisslerc926e172021-05-07 16:11:35 -0500107
Patrick Williamsac13d5f2023-11-24 18:59:46 -0600108 async def close(self):
109 await self.disconnect()
Andrew Geisslerc926e172021-05-07 16:11:35 -0500110
111 async def _send_wrapper(self, proc):
112 count = 0
113 while True:
114 try:
115 await self.connect()
116 return await proc()
117 except (
118 OSError,
119 ConnectionError,
Patrick Williamsac13d5f2023-11-24 18:59:46 -0600120 ConnectionClosedError,
Andrew Geisslerc926e172021-05-07 16:11:35 -0500121 json.JSONDecodeError,
122 UnicodeDecodeError,
123 ) as e:
124 self.logger.warning("Error talking to server: %s" % e)
125 if count >= 3:
126 if not isinstance(e, ConnectionError):
127 raise ConnectionError(str(e))
128 raise e
129 await self.close()
130 count += 1
131
Patrick Williamsac13d5f2023-11-24 18:59:46 -0600132 def check_invoke_error(self, msg):
133 if isinstance(msg, dict) and "invoke-error" in msg:
134 raise InvokeError(msg["invoke-error"]["message"])
Patrick Williams213cb262021-08-07 19:21:33 -0500135
Patrick Williamsac13d5f2023-11-24 18:59:46 -0600136 async def invoke(self, msg):
Andrew Geisslerc926e172021-05-07 16:11:35 -0500137 async def proc():
Patrick Williamsac13d5f2023-11-24 18:59:46 -0600138 await self.socket.send_message(msg)
139 return await self.socket.recv_message()
Andrew Geisslerc926e172021-05-07 16:11:35 -0500140
Patrick Williamsac13d5f2023-11-24 18:59:46 -0600141 result = await self._send_wrapper(proc)
142 self.check_invoke_error(result)
143 return result
Andrew Geisslerc926e172021-05-07 16:11:35 -0500144
Andrew Geissler09036742021-06-25 14:25:14 -0500145 async def ping(self):
Patrick Williamsac13d5f2023-11-24 18:59:46 -0600146 return await self.invoke({"ping": {}})
147
148 async def __aenter__(self):
149 return self
150
151 async def __aexit__(self, exc_type, exc_value, traceback):
152 await self.close()
Andrew Geissler09036742021-06-25 14:25:14 -0500153
Andrew Geisslerc926e172021-05-07 16:11:35 -0500154
155class Client(object):
156 def __init__(self):
157 self.client = self._get_async_client()
158 self.loop = asyncio.new_event_loop()
159
Andrew Geisslerd159c7f2021-09-02 21:05:58 -0500160 # Override any pre-existing loop.
161 # Without this, the PR server export selftest triggers a hang
162 # when running with Python 3.7. The drawback is that there is
163 # potential for issues if the PR and hash equiv (or some new)
164 # clients need to both be instantiated in the same process.
165 # This should be revisited if/when Python 3.9 becomes the
166 # minimum required version for BitBake, as it seems not
167 # required (but harmless) with it.
168 asyncio.set_event_loop(self.loop)
169
Patrick Williamsac13d5f2023-11-24 18:59:46 -0600170 self._add_methods("connect_tcp", "ping")
Andrew Geisslerc926e172021-05-07 16:11:35 -0500171
172 @abc.abstractmethod
173 def _get_async_client(self):
174 pass
175
176 def _get_downcall_wrapper(self, downcall):
177 def wrapper(*args, **kwargs):
178 return self.loop.run_until_complete(downcall(*args, **kwargs))
179
180 return wrapper
181
182 def _add_methods(self, *methods):
183 for m in methods:
184 downcall = getattr(self.client, m)
185 setattr(self, m, self._get_downcall_wrapper(downcall))
186
187 def connect_unix(self, path):
Andrew Geissler87f5cff2022-09-30 13:13:31 -0500188 self.loop.run_until_complete(self.client.connect_unix(path))
189 self.loop.run_until_complete(self.client.connect())
Andrew Geisslerc926e172021-05-07 16:11:35 -0500190
191 @property
192 def max_chunk(self):
193 return self.client.max_chunk
194
195 @max_chunk.setter
196 def max_chunk(self, value):
197 self.client.max_chunk = value
Andrew Geisslereff27472021-10-29 15:35:00 -0500198
Patrick Williamsac13d5f2023-11-24 18:59:46 -0600199 def disconnect(self):
Andrew Geisslereff27472021-10-29 15:35:00 -0500200 self.loop.run_until_complete(self.client.close())
Patrick Williamsac13d5f2023-11-24 18:59:46 -0600201
202 def close(self):
203 if self.loop:
204 self.loop.run_until_complete(self.client.close())
205 if sys.version_info >= (3, 6):
206 self.loop.run_until_complete(self.loop.shutdown_asyncgens())
207 self.loop.close()
208 self.loop = None
209
210 def __enter__(self):
211 return self
212
213 def __exit__(self, exc_type, exc_value, traceback):
214 self.close()
215 return False
Patrick Williams73bd93f2024-02-20 08:07:48 -0600216
217
218class ClientPool(object):
219 def __init__(self, max_clients):
220 self.avail_clients = []
221 self.num_clients = 0
222 self.max_clients = max_clients
223 self.loop = None
224 self.client_condition = None
225
226 @abc.abstractmethod
227 async def _new_client(self):
228 raise NotImplementedError("Must be implemented in derived class")
229
230 def close(self):
231 if self.client_condition:
232 self.client_condition = None
233
234 if self.loop:
235 self.loop.run_until_complete(self.__close_clients())
236 self.loop.run_until_complete(self.loop.shutdown_asyncgens())
237 self.loop.close()
238 self.loop = None
239
240 def run_tasks(self, tasks):
241 if not self.loop:
242 self.loop = asyncio.new_event_loop()
243
244 thread = Thread(target=self.__thread_main, args=(tasks,))
245 thread.start()
246 thread.join()
247
248 @contextlib.asynccontextmanager
249 async def get_client(self):
250 async with self.client_condition:
251 if self.avail_clients:
252 client = self.avail_clients.pop()
253 elif self.num_clients < self.max_clients:
254 self.num_clients += 1
255 client = await self._new_client()
256 else:
257 while not self.avail_clients:
258 await self.client_condition.wait()
259 client = self.avail_clients.pop()
260
261 try:
262 yield client
263 finally:
264 async with self.client_condition:
265 self.avail_clients.append(client)
266 self.client_condition.notify()
267
268 def __thread_main(self, tasks):
269 async def process_task(task):
270 async with self.get_client() as client:
271 await task(client)
272
273 asyncio.set_event_loop(self.loop)
274 if not self.client_condition:
275 self.client_condition = asyncio.Condition()
276 tasks = [process_task(t) for t in tasks]
277 self.loop.run_until_complete(asyncio.gather(*tasks))
278
279 async def __close_clients(self):
280 for c in self.avail_clients:
281 await c.close()
282 self.avail_clients = []
283 self.num_clients = 0
284
285 def __enter__(self):
286 return self
287
288 def __exit__(self, exc_type, exc_value, traceback):
289 self.close()
290 return False