blob: 0d7cd85780d7d887987f3ce16ea22f4419cd8e92 [file] [log] [blame]
#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#
6
7import abc
8import asyncio
9import json
10import os
11import socket
Andrew Geisslereff27472021-10-29 15:35:00 -050012import sys
Patrick Williamsac13d5f2023-11-24 18:59:46 -060013from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK
14from .exceptions import ConnectionClosedError, InvokeError
Andrew Geisslerc926e172021-05-07 16:11:35 -050015
16
class AsyncClient(object):
    """Asynchronous client side of a request/response RPC protocol.

    One of the ``connect_*`` coroutines must be awaited first; each of them
    only records *how* to connect by storing a connection factory in
    ``self._connect_sock``. The connection itself is opened lazily by
    :meth:`connect`, which is also what allows :meth:`_send_wrapper` to
    reconnect transparently after a failure.
    """

    def __init__(self, proto_name, proto_version, logger, timeout=30):
        """Store the protocol identity and connection settings.

        Args:
            proto_name: Protocol name sent in the handshake header.
            proto_version: Protocol version sent in the handshake header.
            logger: Object whose ``warning()`` method is used to report
                communication errors.
            timeout: Value handed to the connection objects when they are
                created.
        """
        # No connection is opened here; see connect().
        self.socket = None
        self.max_chunk = DEFAULT_MAX_CHUNK
        self.proto_name = proto_name
        self.proto_version = proto_version
        self.logger = logger
        self.timeout = timeout

    async def connect_tcp(self, address, port):
        """Select a TCP connection to ``address``:``port`` for later use."""

        async def connect_sock():
            reader, writer = await asyncio.open_connection(address, port)
            return StreamConnection(reader, writer, self.timeout, self.max_chunk)

        self._connect_sock = connect_sock

    async def connect_unix(self, path):
        """Select a UNIX domain socket at ``path`` for later use.

        ``path`` may be a bare file name, in which case the socket is looked
        up in the current working directory.
        """

        async def connect_sock():
            # AF_UNIX has path length issues so chdir here to workaround
            cwd = os.getcwd()
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                try:
                    # os.chdir("") raises, so only change directory when the
                    # path actually has a directory component.
                    dirname = os.path.dirname(path)
                    if dirname:
                        os.chdir(dirname)
                    # The socket must be opened synchronously so that CWD
                    # doesn't get changed out from underneath us so we pass
                    # as a sock into asyncio
                    sock.connect(os.path.basename(path))
                finally:
                    os.chdir(cwd)
                reader, writer = await asyncio.open_unix_connection(sock=sock)
            except BaseException:
                # Do not leak the descriptor when connecting fails.
                sock.close()
                raise
            return StreamConnection(reader, writer, self.timeout, self.max_chunk)

        self._connect_sock = connect_sock

    async def connect_websocket(self, uri):
        """Select a websocket connection to ``uri`` for later use."""
        # Imported here so the dependency is only needed by websocket users.
        import websockets

        async def connect_sock():
            websocket = await websockets.connect(uri, ping_interval=None)
            return WebsocketConnection(websocket, self.timeout)

        self._connect_sock = connect_sock

    async def setup_connection(self):
        """Send the handshake on a freshly opened connection."""
        # Send headers
        await self.socket.send("%s %s" % (self.proto_name, self.proto_version))
        # End of headers
        await self.socket.send("")

    async def connect(self):
        """Open the connection and perform the handshake if not connected.

        Does nothing when a connection is already open.
        """
        if self.socket is None:
            self.socket = await self._connect_sock()
            await self.setup_connection()

    async def disconnect(self):
        """Close the current connection, if any, and forget it."""
        if self.socket is not None:
            await self.socket.close()
            self.socket = None

    async def close(self):
        """Release the client's resources; currently same as disconnect()."""
        await self.disconnect()

    async def _send_wrapper(self, proc):
        """Run ``proc`` against a live connection, reconnecting on failure.

        Args:
            proc: Zero-argument coroutine function performing the exchange.

        Returns:
            Whatever ``proc`` returns.

        Raises:
            ConnectionError: When the exchange still fails after the initial
                attempt plus three retries. Errors that are not already a
                ``ConnectionError`` are wrapped in one, chained to the
                original exception.
        """
        count = 0
        while True:
            try:
                await self.connect()
                return await proc()
            except (
                OSError,
                ConnectionError,
                ConnectionClosedError,
                json.JSONDecodeError,
                UnicodeDecodeError,
            ) as e:
                self.logger.warning("Error talking to server: %s" % e)
                if count >= 3:
                    if not isinstance(e, ConnectionError):
                        raise ConnectionError(str(e)) from e
                    raise
                # Drop the (possibly broken) connection so that the next
                # iteration starts from a fresh one.
                await self.close()
                count += 1

    def check_invoke_error(self, msg):
        """Raise InvokeError if ``msg`` is a dict carrying "invoke-error"."""
        if isinstance(msg, dict) and "invoke-error" in msg:
            raise InvokeError(msg["invoke-error"]["message"])

    async def invoke(self, msg):
        """Send ``msg`` and return the reply.

        Raises:
            InvokeError: When the reply reports an invocation error.
            ConnectionError: When communication keeps failing; see
                _send_wrapper().
        """

        async def proc():
            await self.socket.send_message(msg)
            return await self.socket.recv_message()

        result = await self._send_wrapper(proc)
        self.check_invoke_error(result)
        return result

    async def ping(self):
        """Invoke the "ping" request and return the reply."""
        return await self.invoke({"ping": {}})

    async def __aenter__(self):
        """Return the client itself; no connection is opened here."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Close the client when leaving the ``async with`` block."""
        await self.close()
Andrew Geissler09036742021-06-25 14:25:14 -0500120
Andrew Geisslerc926e172021-05-07 16:11:35 -0500121
class Client(object):
    """Synchronous wrapper around an asynchronous client.

    Subclasses provide the asynchronous client through
    :meth:`_get_async_client`. This class owns a private event loop and runs
    the client's coroutines on it to completion, so callers can use plain
    blocking calls. Instances are usable as context managers.
    """

    def __init__(self):
        """Create the async client and the event loop that drives it."""
        self.client = self._get_async_client()
        self.loop = asyncio.new_event_loop()

        # Override any pre-existing loop.
        # Without this, the PR server export selftest triggers a hang
        # when running with Python 3.7. The drawback is that there is
        # potential for issues if the PR and hash equiv (or some new)
        # clients need to both be instantiated in the same process.
        # This should be revisited if/when Python 3.9 becomes the
        # minimum required version for BitBake, as it seems not
        # required (but harmless) with it.
        asyncio.set_event_loop(self.loop)

        self._add_methods("connect_tcp", "ping")

    @abc.abstractmethod
    def _get_async_client(self):
        """Return the asynchronous client instance to wrap.

        Must be overridden; this base implementation returns None.
        """
        pass

    def _get_downcall_wrapper(self, downcall):
        """Return a blocking function that runs coroutine function ``downcall``."""

        def wrapper(*args, **kwargs):
            return self.loop.run_until_complete(downcall(*args, **kwargs))

        return wrapper

    def _add_methods(self, *methods):
        """Expose the named async client methods as blocking methods here."""
        for m in methods:
            downcall = getattr(self.client, m)
            setattr(self, m, self._get_downcall_wrapper(downcall))

    def connect_unix(self, path):
        """Select the UNIX socket at ``path`` and connect to it right away."""
        self.loop.run_until_complete(self.client.connect_unix(path))
        self.loop.run_until_complete(self.client.connect())

    @property
    def max_chunk(self):
        """The wrapped client's ``max_chunk`` value."""
        return self.client.max_chunk

    @max_chunk.setter
    def max_chunk(self, value):
        self.client.max_chunk = value

    def disconnect(self):
        """Close the wrapped client's connection but keep the event loop.

        Does nothing once close() has released the event loop.
        """
        if self.loop:
            self.loop.run_until_complete(self.client.close())

    def close(self):
        """Close the wrapped client and release the event loop.

        Safe to call more than once. The event loop is always closed and
        forgotten, even when closing the wrapped client raises; in that case
        the exception still propagates to the caller.
        """
        if self.loop:
            try:
                try:
                    self.loop.run_until_complete(self.client.close())
                finally:
                    if sys.version_info >= (3, 6):
                        self.loop.run_until_complete(self.loop.shutdown_asyncgens())
            finally:
                self.loop.close()
                self.loop = None

    def __enter__(self):
        """Return the client itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the client; exceptions from the block are not suppressed."""
        self.close()
        return False