blob: fa042bbe87cc71c34f61f1ed944399fa34ccc2e3 [file] [log] [blame]
Andrew Geisslerc926e172021-05-07 16:11:35 -05001#
Patrick Williams92b42cb2022-09-03 06:53:57 -05002# Copyright BitBake Contributors
3#
Andrew Geisslerc926e172021-05-07 16:11:35 -05004# SPDX-License-Identifier: GPL-2.0-only
5#
6
7import abc
8import asyncio
9import json
10import os
11import socket
Andrew Geisslereff27472021-10-29 15:35:00 -050012import sys
Andrew Geisslerc926e172021-05-07 16:11:35 -050013from . import chunkify, DEFAULT_MAX_CHUNK
14
15
16class AsyncClient(object):
Patrick Williams213cb262021-08-07 19:21:33 -050017 def __init__(self, proto_name, proto_version, logger, timeout=30):
Andrew Geisslerc926e172021-05-07 16:11:35 -050018 self.reader = None
19 self.writer = None
20 self.max_chunk = DEFAULT_MAX_CHUNK
21 self.proto_name = proto_name
22 self.proto_version = proto_version
23 self.logger = logger
Patrick Williams213cb262021-08-07 19:21:33 -050024 self.timeout = timeout
Andrew Geisslerc926e172021-05-07 16:11:35 -050025
26 async def connect_tcp(self, address, port):
27 async def connect_sock():
28 return await asyncio.open_connection(address, port)
29
30 self._connect_sock = connect_sock
31
32 async def connect_unix(self, path):
33 async def connect_sock():
Andrew Geissler87f5cff2022-09-30 13:13:31 -050034 # AF_UNIX has path length issues so chdir here to workaround
35 cwd = os.getcwd()
36 try:
37 os.chdir(os.path.dirname(path))
38 # The socket must be opened synchronously so that CWD doesn't get
39 # changed out from underneath us so we pass as a sock into asyncio
40 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
41 sock.connect(os.path.basename(path))
42 finally:
43 os.chdir(cwd)
44 return await asyncio.open_unix_connection(sock=sock)
Andrew Geisslerc926e172021-05-07 16:11:35 -050045
46 self._connect_sock = connect_sock
47
48 async def setup_connection(self):
49 s = '%s %s\n\n' % (self.proto_name, self.proto_version)
50 self.writer.write(s.encode("utf-8"))
51 await self.writer.drain()
52
53 async def connect(self):
54 if self.reader is None or self.writer is None:
55 (self.reader, self.writer) = await self._connect_sock()
56 await self.setup_connection()
57
58 async def close(self):
59 self.reader = None
60
61 if self.writer is not None:
62 self.writer.close()
63 self.writer = None
64
65 async def _send_wrapper(self, proc):
66 count = 0
67 while True:
68 try:
69 await self.connect()
70 return await proc()
71 except (
72 OSError,
73 ConnectionError,
74 json.JSONDecodeError,
75 UnicodeDecodeError,
76 ) as e:
77 self.logger.warning("Error talking to server: %s" % e)
78 if count >= 3:
79 if not isinstance(e, ConnectionError):
80 raise ConnectionError(str(e))
81 raise e
82 await self.close()
83 count += 1
84
85 async def send_message(self, msg):
86 async def get_line():
Patrick Williams213cb262021-08-07 19:21:33 -050087 try:
88 line = await asyncio.wait_for(self.reader.readline(), self.timeout)
89 except asyncio.TimeoutError:
90 raise ConnectionError("Timed out waiting for server")
91
Andrew Geisslerc926e172021-05-07 16:11:35 -050092 if not line:
93 raise ConnectionError("Connection closed")
94
95 line = line.decode("utf-8")
96
97 if not line.endswith("\n"):
Patrick Williams213cb262021-08-07 19:21:33 -050098 raise ConnectionError("Bad message %r" % (line))
Andrew Geisslerc926e172021-05-07 16:11:35 -050099
100 return line
101
102 async def proc():
103 for c in chunkify(json.dumps(msg), self.max_chunk):
104 self.writer.write(c.encode("utf-8"))
105 await self.writer.drain()
106
107 l = await get_line()
108
109 m = json.loads(l)
110 if m and "chunk-stream" in m:
111 lines = []
112 while True:
113 l = (await get_line()).rstrip("\n")
114 if not l:
115 break
116 lines.append(l)
117
118 m = json.loads("".join(lines))
119
120 return m
121
122 return await self._send_wrapper(proc)
123
Andrew Geissler09036742021-06-25 14:25:14 -0500124 async def ping(self):
125 return await self.send_message(
126 {'ping': {}}
127 )
128
Andrew Geisslerc926e172021-05-07 16:11:35 -0500129
130class Client(object):
131 def __init__(self):
132 self.client = self._get_async_client()
133 self.loop = asyncio.new_event_loop()
134
Andrew Geisslerd159c7f2021-09-02 21:05:58 -0500135 # Override any pre-existing loop.
136 # Without this, the PR server export selftest triggers a hang
137 # when running with Python 3.7. The drawback is that there is
138 # potential for issues if the PR and hash equiv (or some new)
139 # clients need to both be instantiated in the same process.
140 # This should be revisited if/when Python 3.9 becomes the
141 # minimum required version for BitBake, as it seems not
142 # required (but harmless) with it.
143 asyncio.set_event_loop(self.loop)
144
Andrew Geisslereff27472021-10-29 15:35:00 -0500145 self._add_methods('connect_tcp', 'ping')
Andrew Geisslerc926e172021-05-07 16:11:35 -0500146
147 @abc.abstractmethod
148 def _get_async_client(self):
149 pass
150
151 def _get_downcall_wrapper(self, downcall):
152 def wrapper(*args, **kwargs):
153 return self.loop.run_until_complete(downcall(*args, **kwargs))
154
155 return wrapper
156
157 def _add_methods(self, *methods):
158 for m in methods:
159 downcall = getattr(self.client, m)
160 setattr(self, m, self._get_downcall_wrapper(downcall))
161
162 def connect_unix(self, path):
Andrew Geissler87f5cff2022-09-30 13:13:31 -0500163 self.loop.run_until_complete(self.client.connect_unix(path))
164 self.loop.run_until_complete(self.client.connect())
Andrew Geisslerc926e172021-05-07 16:11:35 -0500165
166 @property
167 def max_chunk(self):
168 return self.client.max_chunk
169
170 @max_chunk.setter
171 def max_chunk(self, value):
172 self.client.max_chunk = value
Andrew Geisslereff27472021-10-29 15:35:00 -0500173
174 def close(self):
175 self.loop.run_until_complete(self.client.close())
176 if sys.version_info >= (3, 6):
177 self.loop.run_until_complete(self.loop.shutdown_asyncgens())
178 self.loop.close()