Andrew Geissler | c926e17 | 2021-05-07 16:11:35 -0500 | [diff] [blame] | 1 | # |
Patrick Williams | 92b42cb | 2022-09-03 06:53:57 -0500 | [diff] [blame] | 2 | # Copyright BitBake Contributors |
| 3 | # |
Andrew Geissler | c926e17 | 2021-05-07 16:11:35 -0500 | [diff] [blame] | 4 | # SPDX-License-Identifier: GPL-2.0-only |
| 5 | # |
| 6 | |
| 7 | import abc |
| 8 | import asyncio |
| 9 | import json |
| 10 | import os |
| 11 | import socket |
Andrew Geissler | eff2747 | 2021-10-29 15:35:00 -0500 | [diff] [blame] | 12 | import sys |
Patrick Williams | ac13d5f | 2023-11-24 18:59:46 -0600 | [diff] [blame] | 13 | from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK |
| 14 | from .exceptions import ConnectionClosedError, InvokeError |
Andrew Geissler | c926e17 | 2021-05-07 16:11:35 -0500 | [diff] [blame] | 15 | |
| 16 | |
class AsyncClient(object):
    """Asynchronous request/response client over a pluggable transport.

    A transport must first be selected by awaiting connect_tcp(),
    connect_unix() or connect_websocket(). Those calls only record how to
    connect; the connection itself is opened by connect(), which invoke()
    calls on demand (and again after a failure).
    """

    def __init__(self, proto_name, proto_version, logger, timeout=30):
        """Store the protocol identity and connection settings.

        proto_name and proto_version are sent as the header line when a
        connection is set up. logger receives a warning for each failed
        exchange. timeout is handed to the connection objects.
        """
        self.socket = None
        self.max_chunk = DEFAULT_MAX_CHUNK
        self.proto_name = proto_name
        self.proto_version = proto_version
        self.logger = logger
        self.timeout = timeout

    async def connect_tcp(self, address, port):
        """Select a TCP transport to address:port (does not connect yet)."""

        async def connect_sock():
            reader, writer = await asyncio.open_connection(address, port)
            return StreamConnection(reader, writer, self.timeout, self.max_chunk)

        self._connect_sock = connect_sock

    @staticmethod
    def _open_unix_socket(path):
        """Return a connected AF_UNIX stream socket for path.

        AF_UNIX has path length issues, so this changes into the socket's
        directory and connects using only the file name. Everything between
        the two chdir() calls is synchronous so that the working directory
        cannot be changed out from underneath us by other coroutines.

        The original working directory is always restored, and the socket
        is closed again if connecting fails.
        """
        directory, name = os.path.split(path)
        cwd = os.getcwd()
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
        try:
            # A bare file name has no directory part; chdir("") would raise,
            # and the socket is already relative to the current directory.
            if directory:
                os.chdir(directory)
            sock.connect(name)
        except BaseException:
            # Do not leak the descriptor when the connection attempt fails.
            sock.close()
            raise
        finally:
            os.chdir(cwd)
        return sock

    async def connect_unix(self, path):
        """Select a UNIX domain socket transport at path (does not connect yet)."""

        async def connect_sock():
            # The socket is opened synchronously and then passed into asyncio
            # as an already connected sock.
            sock = self._open_unix_socket(path)
            reader, writer = await asyncio.open_unix_connection(sock=sock)
            return StreamConnection(reader, writer, self.timeout, self.max_chunk)

        self._connect_sock = connect_sock

    async def connect_websocket(self, uri):
        """Select a websocket transport to uri (does not connect yet).

        The websockets module is imported here so that it is only required
        when this transport is actually used.
        """
        import websockets

        async def connect_sock():
            websocket = await websockets.connect(uri, ping_interval=None)
            return WebsocketConnection(websocket, self.timeout)

        self._connect_sock = connect_sock

    async def setup_connection(self):
        """Send the protocol header line followed by the end-of-headers line."""
        # Send headers
        await self.socket.send("%s %s" % (self.proto_name, self.proto_version))
        # End of headers
        await self.socket.send("")

    async def connect(self):
        """Open and set up the connection unless one is already open."""
        if self.socket is None:
            self.socket = await self._connect_sock()
            await self.setup_connection()

    async def disconnect(self):
        """Close the current connection, if any, and forget it."""
        if self.socket is not None:
            await self.socket.close()
            self.socket = None

    async def close(self):
        """Release the client's connection; currently the same as disconnect()."""
        await self.disconnect()

    async def _send_wrapper(self, proc):
        """Await proc() on a live connection, reconnecting on failure.

        proc is a zero-argument callable returning an awaitable. When the
        exchange fails with one of the errors listed below, a warning is
        logged, the connection is closed and the whole exchange is retried.
        The fourth consecutive failure is raised to the caller as a
        ConnectionError (other error types are wrapped in one).
        """
        count = 0
        while True:
            try:
                await self.connect()
                return await proc()
            except (
                OSError,
                ConnectionError,
                ConnectionClosedError,
                json.JSONDecodeError,
                UnicodeDecodeError,
            ) as e:
                self.logger.warning("Error talking to server: %s" % e)
                if count >= 3:
                    if not isinstance(e, ConnectionError):
                        # Keep the original error attached as the cause.
                        raise ConnectionError(str(e)) from e
                    raise
                await self.close()
                count += 1

    def check_invoke_error(self, msg):
        """Raise InvokeError if msg is a dict carrying an "invoke-error" entry."""
        if isinstance(msg, dict) and "invoke-error" in msg:
            raise InvokeError(msg["invoke-error"]["message"])

    async def invoke(self, msg):
        """Send msg, wait for the reply and return it.

        Raises InvokeError when the reply reports an invoke error, and
        ConnectionError when the exchange keeps failing (see _send_wrapper).
        """

        async def proc():
            await self.socket.send_message(msg)
            return await self.socket.recv_message()

        result = await self._send_wrapper(proc)
        self.check_invoke_error(result)
        return result

    async def ping(self):
        """Invoke the "ping" request and return the server's reply."""
        return await self.invoke({"ping": {}})

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()
Andrew Geissler | 0903674 | 2021-06-25 14:25:14 -0500 | [diff] [blame] | 120 | |
Andrew Geissler | c926e17 | 2021-05-07 16:11:35 -0500 | [diff] [blame] | 121 | |
class Client(object):
    """Blocking wrapper around the asynchronous client.

    Subclasses provide the asynchronous client through _get_async_client().
    Each wrapped call is run to completion on an event loop owned by this
    object, so callers do not need to use asyncio themselves.
    """

    def __init__(self):
        """Create the asynchronous client and the event loop that drives it."""
        self.client = self._get_async_client()
        self.loop = asyncio.new_event_loop()

        # Override any pre-existing loop.
        # Without this, the PR server export selftest triggers a hang
        # when running with Python 3.7. The drawback is that there is
        # potential for issues if the PR and hash equiv (or some new)
        # clients need to both be instantiated in the same process.
        # This should be revisited if/when Python 3.9 becomes the
        # minimum required version for BitBake, as it seems not
        # required (but harmless) with it.
        asyncio.set_event_loop(self.loop)

        self._add_methods("connect_tcp", "ping")

    # NOTE: this class does not use an ABC metaclass, so the decorator is
    # advisory only; subclasses are nevertheless expected to override this.
    @abc.abstractmethod
    def _get_async_client(self):
        """Return the asynchronous client instance that this object wraps."""
        pass

    def _get_downcall_wrapper(self, downcall):
        """Return a blocking function that runs coroutine function downcall."""

        def wrapper(*args, **kwargs):
            return self.loop.run_until_complete(downcall(*args, **kwargs))

        return wrapper

    def _add_methods(self, *methods):
        """Expose the named coroutine methods of the client as blocking methods."""
        for m in methods:
            downcall = getattr(self.client, m)
            setattr(self, m, self._get_downcall_wrapper(downcall))

    def connect_unix(self, path):
        """Select the UNIX domain socket at path and connect to it."""
        self.loop.run_until_complete(self.client.connect_unix(path))
        self.loop.run_until_complete(self.client.connect())

    @property
    def max_chunk(self):
        """The max_chunk setting of the wrapped asynchronous client."""
        return self.client.max_chunk

    @max_chunk.setter
    def max_chunk(self, value):
        self.client.max_chunk = value

    def disconnect(self):
        """Close the wrapped client's connection, keeping the event loop.

        Does nothing once close() has been called, because the event loop
        needed to run the request no longer exists.
        """
        if self.loop:
            self.loop.run_until_complete(self.client.close())

    def close(self):
        """Close the wrapped client and shut down the event loop.

        Safe to call more than once; later calls do nothing.
        """
        if self.loop:
            self.loop.run_until_complete(self.client.close())
            if sys.version_info >= (3, 6):
                self.loop.run_until_complete(self.loop.shutdown_asyncgens())
            self.loop.close()
        self.loop = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never suppress an exception raised inside the with block.
        return False