blob: 881434d2e95079228503edf00f79b7dadeb2c5f9 [file] [log] [blame]
Andrew Geisslerc926e172021-05-07 16:11:35 -05001#
Patrick Williams92b42cb2022-09-03 06:53:57 -05002# Copyright BitBake Contributors
3#
Andrew Geisslerc926e172021-05-07 16:11:35 -05004# SPDX-License-Identifier: GPL-2.0-only
5#
6
7import abc
8import asyncio
9import json
10import os
11import socket
Andrew Geisslereff27472021-10-29 15:35:00 -050012import sys
Andrew Geisslerc926e172021-05-07 16:11:35 -050013from . import chunkify, DEFAULT_MAX_CHUNK
14
15
class AsyncClient(object):
    """Asynchronous client for a line-oriented JSON request/response server.

    A transport is selected with connect_tcp() or connect_unix(); the actual
    connection is opened lazily by connect(), which then sends a
    "<proto_name> <proto_version>" handshake.  Requests are JSON documents
    written through chunkify() and replies are read back one line at a time.
    """

    def __init__(self, proto_name, proto_version, logger, timeout=30):
        """Store the protocol identity, logger and per-read timeout.

        proto_name / proto_version: values sent in the connection handshake.
        logger: object whose warning() method is used to report I/O errors.
        timeout: value passed to asyncio.wait_for() for each reply line.
        """
        self.reader = None
        self.writer = None
        self.max_chunk = DEFAULT_MAX_CHUNK
        self.proto_name = proto_name
        self.proto_version = proto_version
        self.logger = logger
        self.timeout = timeout

    async def connect_tcp(self, address, port):
        """Select a TCP transport; the socket is opened later by connect()."""
        async def connect_sock():
            return await asyncio.open_connection(address, port)

        self._connect_sock = connect_sock

    async def connect_unix(self, path):
        """Select a UNIX-socket transport; opened later by connect()."""
        async def connect_sock():
            return await asyncio.open_unix_connection(path)

        self._connect_sock = connect_sock

    async def setup_connection(self):
        """Send the protocol handshake line followed by an empty line."""
        s = '%s %s\n\n' % (self.proto_name, self.proto_version)
        self.writer.write(s.encode("utf-8"))
        await self.writer.drain()

    async def connect(self):
        """Open the connection and perform the handshake if not connected."""
        if self.reader is None or self.writer is None:
            (self.reader, self.writer) = await self._connect_sock()
            await self.setup_connection()

    async def close(self):
        """Drop the reader and close the writer, if any.  Safe to repeat."""
        self.reader = None

        if self.writer is not None:
            self.writer.close()
            self.writer = None

    async def _send_wrapper(self, proc):
        """Run the coroutine function proc() with reconnect-and-retry.

        After the first failure the call is retried up to three more times on
        a fresh connection.  When the retries are exhausted a ConnectionError
        is raised (other caught error types are converted to it).

        Returns whatever proc() returns.
        """
        count = 0
        while True:
            try:
                await self.connect()
                return await proc()
            except (
                OSError,
                ConnectionError,
                json.JSONDecodeError,
                UnicodeDecodeError,
            ) as e:
                self.logger.warning("Error talking to server: %s", e)
                # Always discard the failed connection, including on the
                # final attempt.  Otherwise a stream in an unknown state
                # (e.g. with a late reply still pending after a timeout)
                # would be reused by the next request and its stale data
                # mistaken for that request's answer.
                await self.close()
                if count >= 3:
                    if not isinstance(e, ConnectionError):
                        raise ConnectionError(str(e)) from e
                    raise
                count += 1

    async def send_message(self, msg):
        """Send msg as JSON and return the decoded JSON reply.

        If the first reply object contains the key "chunk-stream", the real
        reply is read as a sequence of lines terminated by an empty line,
        concatenated, and decoded as a single JSON document.

        Raises ConnectionError on timeout, end of stream, a reply line
        without a trailing newline, or once the retries are exhausted.
        """
        async def get_line():
            try:
                line = await asyncio.wait_for(self.reader.readline(), self.timeout)
            except asyncio.TimeoutError:
                raise ConnectionError("Timed out waiting for server")

            # readline() returning nothing means the peer closed the stream.
            if not line:
                raise ConnectionError("Connection closed")

            line = line.decode("utf-8")

            # A line without a newline is a truncated message.
            if not line.endswith("\n"):
                raise ConnectionError("Bad message %r" % (line))

            return line

        async def proc():
            # chunkify() is defined elsewhere in this package; presumably it
            # yields pieces of the message bounded by max_chunk.
            for c in chunkify(json.dumps(msg), self.max_chunk):
                self.writer.write(c.encode("utf-8"))
            await self.writer.drain()

            reply = json.loads(await get_line())
            if reply and "chunk-stream" in reply:
                lines = []
                while True:
                    part = (await get_line()).rstrip("\n")
                    if not part:
                        break
                    lines.append(part)

                reply = json.loads("".join(lines))

            return reply

        return await self._send_wrapper(proc)

    async def ping(self):
        """Send a 'ping' request and return the server's reply."""
        return await self.send_message(
            {'ping': {}}
        )
118
Andrew Geisslerc926e172021-05-07 16:11:35 -0500119
class Client(object):
    """Synchronous wrapper that drives an async client on a private loop.

    Subclasses implement _get_async_client() to supply the underlying async
    client object.  Selected coroutine methods of that object are exposed as
    blocking methods via _add_methods().  Instances can be used as context
    managers; close() is called on exit.
    """

    def __init__(self):
        self.client = self._get_async_client()
        self.loop = asyncio.new_event_loop()

        # Override any pre-existing loop.
        # Without this, the PR server export selftest triggers a hang
        # when running with Python 3.7. The drawback is that there is
        # potential for issues if the PR and hash equiv (or some new)
        # clients need to both be instantiated in the same process.
        # This should be revisited if/when Python 3.9 becomes the
        # minimum required version for BitBake, as it seems not
        # required (but harmless) with it.
        asyncio.set_event_loop(self.loop)

        self._add_methods('connect_tcp', 'ping')

    @abc.abstractmethod
    def _get_async_client(self):
        """Return the async client object to wrap (provided by subclasses)."""
        pass

    def _get_downcall_wrapper(self, downcall):
        """Return a blocking function that runs coroutine function downcall."""
        def wrapper(*args, **kwargs):
            return self.loop.run_until_complete(downcall(*args, **kwargs))

        return wrapper

    def _add_methods(self, *methods):
        """Expose each named coroutine method of the async client on self."""
        for m in methods:
            downcall = getattr(self.client, m)
            setattr(self, m, self._get_downcall_wrapper(downcall))

    def connect_unix(self, path):
        """Connect to the UNIX socket at path, restoring the cwd afterwards."""
        # AF_UNIX has path length issues so chdir here to workaround
        dirname = os.path.dirname(path)
        cwd = os.getcwd()
        try:
            # A bare filename has an empty dirname, and os.chdir('') raises;
            # the socket is then already relative to the current directory.
            if dirname:
                os.chdir(dirname)
            self.loop.run_until_complete(self.client.connect_unix(os.path.basename(path)))
            self.loop.run_until_complete(self.client.connect())
        finally:
            os.chdir(cwd)

    @property
    def max_chunk(self):
        """The max_chunk value of the wrapped async client."""
        return self.client.max_chunk

    @max_chunk.setter
    def max_chunk(self, value):
        self.client.max_chunk = value

    def close(self):
        """Close the async client and the event loop.  Safe to call twice."""
        # Running anything on a closed loop raises RuntimeError, so a
        # repeated close() must be a no-op.
        if self.loop.is_closed():
            return
        try:
            self.loop.run_until_complete(self.client.close())
            if sys.version_info >= (3, 6):
                self.loop.run_until_complete(self.loop.shutdown_asyncgens())
        finally:
            # Release the loop even if closing the client failed.
            self.loop.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False