blob: a0dc0c170f2b469258b40e44db0ec2dcc6bae960 [file] [log] [blame]
Brad Bishopa34c0302019-09-23 22:34:48 -04001# Copyright (C) 2019 Garmin Ltd.
2#
3# SPDX-License-Identifier: GPL-2.0-only
4#
5
Andrew Geissler6ce62a22020-11-30 19:58:47 -06006from contextlib import closing, contextmanager
Brad Bishopa34c0302019-09-23 22:34:48 -04007from datetime import datetime
8import asyncio
9import json
10import logging
11import math
12import os
13import signal
14import socket
Andrew Geissler6ce62a22020-11-30 19:58:47 -060015import sys
Brad Bishopa34c0302019-09-23 22:34:48 -040016import time
Andrew Geissler6ce62a22020-11-30 19:58:47 -060017from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client, TABLE_COLUMNS
Brad Bishopa34c0302019-09-23 22:34:48 -040018
19logger = logging.getLogger('hashserv.server')
20
21
class Measurement(object):
    """Times a single operation and reports the elapsed wall-clock
    duration to the owning :class:`Sample`.

    Usable either as a context manager or via explicit start()/end()
    calls (the explicit form is used in the hot streaming path).
    """

    def __init__(self, sample):
        # Sample that will accumulate the measured duration
        self.sample = sample

    def start(self):
        """Record the measurement's starting timestamp."""
        self.start_time = time.perf_counter()

    def end(self):
        """Report the time elapsed since start() to the sample."""
        self.sample.add(time.perf_counter() - self.start_time)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *exc_info, **_kwargs):
        # Always record the elapsed time, even if the body raised
        self.end()
39
class Sample(object):
    """Accumulates one or more timed measurements and flushes their
    combined elapsed time to the parent :class:`Stats` as one sample."""

    def __init__(self, stats):
        # Destination for the aggregated elapsed time
        self.stats = stats
        self.num_samples = 0
        self.elapsed = 0

    def measure(self):
        """Return a Measurement whose duration is added to this sample."""
        return Measurement(self)

    def __enter__(self):
        return self

    def __exit__(self, *exc_info, **_kwargs):
        self.end()

    def add(self, elapsed):
        """Fold one measured duration into this sample."""
        self.num_samples += 1
        self.elapsed += elapsed

    def end(self):
        """Flush the accumulated time to stats and reset.

        A sample with no measurements contributes nothing; calling end()
        again after a flush is a no-op.
        """
        if self.num_samples:
            self.stats.add(self.elapsed)
            self.num_samples = 0
            self.elapsed = 0
64
65
class Stats(object):
    """Running statistics (count, total, max, mean, standard deviation)
    over a stream of elapsed-time samples.

    Mean and variance are maintained incrementally with Welford's online
    algorithm, so no per-sample history is kept.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Zero all accumulated statistics."""
        self.num = 0
        self.total_time = 0
        self.max_time = 0
        # Welford accumulators: running mean and aggregated squared
        # distance from the mean
        self.m = 0
        self.s = 0
        self.current_elapsed = None

    def add(self, elapsed):
        """Fold one elapsed-time value into the running statistics."""
        self.num += 1
        if self.num == 1:
            self.m = elapsed
            self.s = 0
        else:
            prev_mean = self.m
            self.m = prev_mean + (elapsed - prev_mean) / self.num
            self.s += (elapsed - prev_mean) * (elapsed - self.m)

        self.total_time += elapsed
        self.max_time = max(self.max_time, elapsed)

    def start_sample(self):
        """Begin a new Sample that reports into these statistics."""
        return Sample(self)

    @property
    def average(self):
        """Mean elapsed time, or 0 when no samples have been recorded."""
        return self.total_time / self.num if self.num else 0

    @property
    def stdev(self):
        """Sample standard deviation, or 0 with fewer than two samples."""
        return math.sqrt(self.s / (self.num - 1)) if self.num > 1 else 0

    def todict(self):
        """Return the reportable statistics as a plain dictionary."""
        return {k: getattr(self, k)
                for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
110
111
class ClientError(Exception):
    """Raised for malformed or unrecognized client requests; logged by
    process_requests(), after which the connection is closed."""
    pass
114
class ServerError(Exception):
    """Raised for invalid server configuration (e.g. a read-only server
    configured with an upstream)."""
    pass
117
def insert_task(cursor, data, ignore=False):
    """Insert one row into tasks_v2 built from the *data* dictionary.

    Column names come from the (sorted) keys of *data* and values are
    bound as named parameters. When *ignore* is true the statement is an
    INSERT OR IGNORE, so an existing conflicting row is left untouched.
    """
    columns = sorted(data.keys())
    placeholders = ', '.join(':' + col for col in columns)
    query = '''INSERT%s INTO tasks_v2 (%s) VALUES (%s)''' % (
        " OR IGNORE" if ignore else "",
        ', '.join(columns),
        placeholders)
    cursor.execute(query, data)
125
async def copy_from_upstream(client, db, method, taskhash):
    """Fetch the full task row for (method, taskhash) from the upstream
    server and mirror it into the local database.

    Returns the column-filtered row dictionary, or None when the upstream
    server has no entry either.

    (Fix: removed an unused local ``keys = sorted(d.keys())`` left over
    from an earlier revision.)
    """
    d = await client.get_taskhash(method, taskhash, True)
    if d is not None:
        # Filter out unknown columns so upstream schema additions cannot
        # break the local INSERT
        d = {k: v for k, v in d.items() if k in TABLE_COLUMNS}

        with closing(db.cursor()) as cursor:
            insert_task(cursor, d)
            db.commit()

    return d
138
async def copy_outhash_from_upstream(client, db, method, outhash, taskhash):
    """Fetch the row matching (method, outhash, taskhash) from the upstream
    server's outhash index and mirror it into the local database.

    Returns the column-filtered row dictionary, or None when upstream has
    no match.

    (Fix: removed an unused local ``keys = sorted(d.keys())`` left over
    from an earlier revision.)
    """
    d = await client.get_outhash(method, outhash, taskhash)
    if d is not None:
        # Filter out unknown columns so upstream schema additions cannot
        # break the local INSERT
        d = {k: v for k, v in d.items() if k in TABLE_COLUMNS}

        with closing(db.cursor()) as cursor:
            insert_task(cursor, d)
            db.commit()

    return d
151
class ServerClient(object):
    """Serves the OEHASHEQUIV protocol to one connected client.

    Requests are single-line JSON objects keyed by command name; replies
    are written back as (possibly chunked) JSON. On a local cache miss the
    configured upstream server, if any, is consulted. In read-only mode
    all mutating commands are simply not registered, so they fall through
    to the "Unrecognized command" ClientError.

    Fixes relative to the previous revision:
      * handle_chunk / read_message error paths referenced an undefined or
        unbound name ``message`` and raised NameError instead of logging.
      * handle_get / handle_get_outhash passed a tuple as a single logging
        argument against a two-placeholder format string.
      * query_equivalent could hit an unbound ``cursor`` in its exception
        handler and used a bare ``except:``.
    """

    FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
    ALL_QUERY = 'SELECT * FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
    OUTHASH_QUERY = '''
        -- Find tasks with a matching outhash (that is, tasks that
        -- are equivalent)
        SELECT * FROM tasks_v2 WHERE method=:method AND outhash=:outhash

        -- If there is an exact match on the taskhash, return it.
        -- Otherwise return the oldest matching outhash of any
        -- taskhash
        ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
            created ASC

        -- Only return one row
        LIMIT 1
        '''

    def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only):
        self.reader = reader
        self.writer = writer
        self.db = db
        self.request_stats = request_stats
        self.max_chunk = DEFAULT_MAX_CHUNK
        self.backfill_queue = backfill_queue
        self.upstream = upstream

        self.handlers = {
            'get': self.handle_get,
            'get-outhash': self.handle_get_outhash,
            'get-stream': self.handle_get_stream,
            'get-stats': self.handle_get_stats,
            'chunk-stream': self.handle_chunk,
        }

        # Mutating commands are only registered on a writable server
        if not read_only:
            self.handlers.update({
                'report': self.handle_report,
                'report-equiv': self.handle_equivreport,
                'reset-stats': self.handle_reset_stats,
                'backfill-wait': self.handle_backfill_wait,
            })

    async def process_requests(self):
        """Run the protocol handshake, then dispatch messages until EOF.

        ClientErrors are logged; the upstream client connection (if any)
        and the writer are always closed on exit.
        """
        if self.upstream is not None:
            self.upstream_client = await create_async_client(self.upstream)
        else:
            self.upstream_client = None

        try:
            self.addr = self.writer.get_extra_info('peername')
            logger.debug('Client %r connected' % (self.addr,))

            # Read protocol and version
            protocol = await self.reader.readline()
            if protocol is None:
                return

            (proto_name, proto_version) = protocol.decode('utf-8').rstrip().split()
            if proto_name != 'OEHASHEQUIV':
                return

            # Accept protocol versions 1.0 through 1.1 only
            proto_version = tuple(int(v) for v in proto_version.split('.'))
            if proto_version < (1, 0) or proto_version > (1, 1):
                return

            # Read headers. Currently, no headers are implemented, so look for
            # an empty line to signal the end of the headers
            while True:
                line = await self.reader.readline()
                if line is None:
                    return

                line = line.decode('utf-8').rstrip()
                if not line:
                    break

            # Handle messages
            while True:
                d = await self.read_message()
                if d is None:
                    break
                await self.dispatch_message(d)
                await self.writer.drain()
        except ClientError as e:
            logger.error(str(e))
        finally:
            if self.upstream_client is not None:
                await self.upstream_client.close()

            self.writer.close()

    async def dispatch_message(self, msg):
        """Invoke the handler matching the message's command key.

        Streaming commands manage their own timing manually; everything
        else is wrapped in a request-stats sample/measurement.

        Raises ClientError when no registered command matches.
        """
        for k in self.handlers.keys():
            if k in msg:
                logger.debug('Handling %s' % k)
                if 'stream' in k:
                    await self.handlers[k](msg[k])
                else:
                    with self.request_stats.start_sample() as self.request_sample, \
                            self.request_sample.measure():
                        await self.handlers[k](msg[k])
                return

        raise ClientError("Unrecognized command %r" % msg)

    def write_message(self, msg):
        """Serialize *msg* as JSON and write it, chunking large payloads."""
        for c in chunkify(json.dumps(msg), self.max_chunk):
            self.writer.write(c.encode('utf-8'))

    async def read_message(self):
        """Read one newline-terminated JSON message.

        Returns the decoded object, or None on EOF or a partial (not
        newline-terminated) line. Bad JSON/encoding is logged and re-raised.
        """
        l = await self.reader.readline()
        if not l:
            return None

        try:
            message = l.decode('utf-8')

            if not message.endswith('\n'):
                return None

            return json.loads(message)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # Log the raw bytes: 'message' is unbound if the decode itself
            # failed (previously this was a NameError)
            logger.error('Bad message from client: %r' % l)
            raise e

    async def handle_chunk(self, request):
        """Reassemble a chunked message (terminated by an empty line) and
        dispatch it. Nested chunking is rejected."""
        lines = []
        try:
            while True:
                l = await self.reader.readline()
                l = l.rstrip(b"\n").decode("utf-8")
                if not l:
                    break
                lines.append(l)

            msg = json.loads(''.join(lines))
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # Previously logged an undefined name 'message' (NameError);
            # log the reassembled payload instead
            logger.error('Bad message from client: %r' % ''.join(lines))
            raise e

        if 'chunk-stream' in msg:
            raise ClientError("Nested chunks are not allowed")

        await self.dispatch_message(msg)

    async def handle_get(self, request):
        """Look up the unihash for (method, taskhash), falling back to the
        upstream server on a local miss. Replies with the row (all columns
        when 'all' is requested) or None."""
        method = request['method']
        taskhash = request['taskhash']

        if request.get('all', False):
            row = self.query_equivalent(method, taskhash, self.ALL_QUERY)
        else:
            row = self.query_equivalent(method, taskhash, self.FAST_QUERY)

        if row is not None:
            # Pass the values as separate lazy arguments (the old tuple
            # argument broke logging's %-formatting)
            logger.debug('Found equivalent task %s -> %s', row['taskhash'], row['unihash'])
            d = {k: row[k] for k in row.keys()}
        elif self.upstream_client is not None:
            d = await copy_from_upstream(self.upstream_client, self.db, method, taskhash)
        else:
            d = None

        self.write_message(d)

    async def handle_get_outhash(self, request):
        """Reply with the oldest row matching (method, outhash), preferring
        an exact taskhash match, or None."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(self.OUTHASH_QUERY,
                           {k: request[k] for k in ('method', 'outhash', 'taskhash')})

            row = cursor.fetchone()

        if row is not None:
            # Separate lazy logging arguments (was a single tuple argument)
            logger.debug('Found equivalent outhash %s -> %s', row['outhash'], row['unihash'])
            d = {k: row[k] for k in row.keys()}
        else:
            d = None

        self.write_message(d)

    async def handle_get_stream(self, request):
        """Switch to line-oriented streaming lookups.

        Each request line is '<method> <taskhash>'; the reply line is the
        unihash, or empty on a miss. The literal line 'END' terminates the
        stream. Upstream hits are queued for background backfill after the
        reply is written.
        """
        self.write_message('ok')

        while True:
            upstream = None

            l = await self.reader.readline()
            if not l:
                return

            try:
                # This inner loop is very sensitive and must be as fast as
                # possible (which is why the request sample is handled manually
                # instead of using 'with', and also why logging statements are
                # commented out.
                self.request_sample = self.request_stats.start_sample()
                request_measure = self.request_sample.measure()
                request_measure.start()

                l = l.decode('utf-8').rstrip()
                if l == 'END':
                    self.writer.write('ok\n'.encode('utf-8'))
                    return

                (method, taskhash) = l.split()
                #logger.debug('Looking up %s %s' % (method, taskhash))
                row = self.query_equivalent(method, taskhash, self.FAST_QUERY)
                if row is not None:
                    msg = ('%s\n' % row['unihash']).encode('utf-8')
                    #logger.debug('Found equivalent task %s -> %s', row['taskhash'], row['unihash'])
                elif self.upstream_client is not None:
                    upstream = await self.upstream_client.get_unihash(method, taskhash)
                    if upstream:
                        msg = ("%s\n" % upstream).encode("utf-8")
                    else:
                        msg = "\n".encode("utf-8")
                else:
                    msg = '\n'.encode('utf-8')

                self.writer.write(msg)
            finally:
                request_measure.end()
                self.request_sample.end()

            await self.writer.drain()

            # Post to the backfill queue after writing the result to minimize
            # the turn around time on a request
            if upstream is not None:
                await self.backfill_queue.put((method, taskhash))

    async def handle_report(self, data):
        """Record a (method, outhash, taskhash) report.

        If an equivalent outhash already exists, the new taskhash adopts
        its unihash; otherwise the client-provided unihash is stored.
        Replies with the canonical taskhash/method/unihash mapping.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(self.OUTHASH_QUERY,
                           {k: data[k] for k in ('method', 'outhash', 'taskhash')})

            row = cursor.fetchone()

            if row is None and self.upstream_client:
                # Try upstream
                row = await copy_outhash_from_upstream(self.upstream_client,
                                                       self.db,
                                                       data['method'],
                                                       data['outhash'],
                                                       data['taskhash'])

            # If no matching outhash was found, or one *was* found but it
            # wasn't an exact match on the taskhash, a new entry for this
            # taskhash should be added
            if row is None or row['taskhash'] != data['taskhash']:
                # If a row matching the outhash was found, the unihash for
                # the new taskhash should be the same as that one.
                # Otherwise the caller provided unihash is used.
                unihash = data['unihash']
                if row is not None:
                    unihash = row['unihash']

                insert_data = {
                    'method': data['method'],
                    'outhash': data['outhash'],
                    'taskhash': data['taskhash'],
                    'unihash': unihash,
                    'created': datetime.now()
                }

                # Optional metadata columns are copied through when present
                for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
                    if k in data:
                        insert_data[k] = data[k]

                insert_task(cursor, insert_data)
                self.db.commit()

                logger.info('Adding taskhash %s with unihash %s',
                            data['taskhash'], unihash)

                d = {
                    'taskhash': data['taskhash'],
                    'method': data['method'],
                    'unihash': unihash
                }
            else:
                d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

        self.write_message(d)

    async def handle_equivreport(self, data):
        """Record a direct taskhash -> unihash equivalence (no outhash).

        Uses INSERT OR IGNORE, then replies with whatever unihash the
        FAST_QUERY now reports for the taskhash — which equals the
        submitted one only if this mapping was actually accepted.
        """
        with closing(self.db.cursor()) as cursor:
            insert_data = {
                'method': data['method'],
                'outhash': "",
                'taskhash': data['taskhash'],
                'unihash': data['unihash'],
                'created': datetime.now()
            }

            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
                if k in data:
                    insert_data[k] = data[k]

            insert_task(cursor, insert_data, ignore=True)
            self.db.commit()

            # Fetch the unihash that will be reported for the taskhash. If the
            # unihash matches, it means this row was inserted (or the mapping
            # was already valid)
            row = self.query_equivalent(data['method'], data['taskhash'], self.FAST_QUERY)

            if row['unihash'] == data['unihash']:
                logger.info('Adding taskhash equivalence for %s with unihash %s',
                            data['taskhash'], row['unihash'])

            d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

        self.write_message(d)

    async def handle_get_stats(self, request):
        """Reply with the current request statistics."""
        d = {
            'requests': self.request_stats.todict(),
        }

        self.write_message(d)

    async def handle_reset_stats(self, request):
        """Reply with the current request statistics, then zero them."""
        d = {
            'requests': self.request_stats.todict(),
        }

        self.request_stats.reset()
        self.write_message(d)

    async def handle_backfill_wait(self, request):
        """Reply with the backfill queue depth, then block until the
        background backfill worker has drained the queue."""
        d = {
            'tasks': self.backfill_queue.qsize(),
        }
        await self.backfill_queue.join()
        self.write_message(d)

    def query_equivalent(self, method, taskhash, query):
        """Run *query* bound to (method, taskhash) and return the first row,
        or None (including on error — best-effort semantics preserved)."""
        # This is part of the inner loop and must be as fast as possible.
        # Create the cursor outside the try so the handler never sees an
        # unbound 'cursor' (the old bare except could NameError here).
        cursor = self.db.cursor()
        try:
            cursor.execute(query, {'method': method, 'taskhash': taskhash})
            return cursor.fetchone()
        except Exception:
            cursor.close()
500
501
class Server(object):
    """Hash equivalence server.

    Owns the event loop, the listening socket (TCP or Unix domain), the
    shared database connection and request statistics, and runs one
    ServerClient per incoming connection.
    """

    def __init__(self, db, loop=None, upstream=None, read_only=False):
        # Pulling from an upstream requires writing the fetched rows to the
        # local database, which a read-only server cannot do
        if upstream and read_only:
            raise ServerError("Read-only hashserv cannot pull from an upstream server")

        self.request_stats = Stats()
        self.db = db

        # If no loop is supplied, create a private one and remember to
        # close it in serve_forever()
        if loop is None:
            self.loop = asyncio.new_event_loop()
            self.close_loop = True
        else:
            self.loop = loop
            self.close_loop = False

        self.upstream = upstream
        self.read_only = read_only

        # Set by start_unix_server() to remove the socket file on shutdown
        self._cleanup_socket = None

    def start_tcp_server(self, host, port):
        """Bind a TCP listening socket and record the bound address in
        self.address ('host:port', bracketed for IPv6)."""
        self.server = self.loop.run_until_complete(
            asyncio.start_server(self.handle_client, host, port, loop=self.loop)
        )

        for s in self.server.sockets:
            logger.info('Listening on %r' % (s.getsockname(),))
            # Newer python does this automatically. Do it manually here for
            # maximum compatibility
            s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
            # NOTE(review): TCP_QUICKACK is Linux-specific — confirm this
            # server never runs on other platforms
            s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)

        name = self.server.sockets[0].getsockname()
        if self.server.sockets[0].family == socket.AF_INET6:
            self.address = "[%s]:%d" % (name[0], name[1])
        else:
            self.address = "%s:%d" % (name[0], name[1])

    def start_unix_server(self, path):
        """Bind a Unix domain listening socket at *path* and record the
        address as 'unix://<abspath>'."""
        def cleanup():
            os.unlink(path)

        cwd = os.getcwd()
        try:
            # Work around path length limits in AF_UNIX
            os.chdir(os.path.dirname(path))
            self.server = self.loop.run_until_complete(
                asyncio.start_unix_server(self.handle_client, os.path.basename(path), loop=self.loop)
            )
        finally:
            os.chdir(cwd)

        logger.info('Listening on %r' % path)

        self._cleanup_socket = cleanup
        self.address = "unix://%s" % os.path.abspath(path)

    async def handle_client(self, reader, writer):
        """Per-connection entry point: run a ServerClient to completion,
        logging (not propagating) any error, then close the connection.

        NOTE(review): uses self.backfill_queue, which is only created in
        serve_forever() — connections accepted before that would fail.
        """
        # writer.transport.set_write_buffer_limits(0)
        try:
            client = ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)
            await client.process_requests()
        except Exception as e:
            import traceback
            logger.error('Error from client: %s' % str(e), exc_info=True)
            traceback.print_exc()
        writer.close()
        logger.info('Client disconnected')

    @contextmanager
    def _backfill_worker(self):
        """Context manager that, when an upstream is configured, runs a
        background task copying queued (method, taskhash) entries from the
        upstream server into the local database; otherwise a no-op."""
        async def backfill_worker_task():
            client = await create_async_client(self.upstream)
            try:
                while True:
                    item = await self.backfill_queue.get()
                    # None is the shutdown sentinel posted by join_worker()
                    if item is None:
                        self.backfill_queue.task_done()
                        break
                    method, taskhash = item
                    await copy_from_upstream(client, self.db, method, taskhash)
                    self.backfill_queue.task_done()
            finally:
                await client.close()

        async def join_worker(worker):
            # Post the sentinel, then wait for the worker to drain and exit
            await self.backfill_queue.put(None)
            await worker

        if self.upstream is not None:
            worker = asyncio.ensure_future(backfill_worker_task())
            try:
                yield
            finally:
                self.loop.run_until_complete(join_worker(worker))
        else:
            yield

    def serve_forever(self):
        """Run the event loop until SIGTERM (or KeyboardInterrupt), then
        shut the server down, stop the backfill worker, close the loop if
        we own it, and remove any Unix socket file."""
        def signal_handler():
            self.loop.stop()

        asyncio.set_event_loop(self.loop)
        try:
            self.backfill_queue = asyncio.Queue()

            self.loop.add_signal_handler(signal.SIGTERM, signal_handler)

            with self._backfill_worker():
                try:
                    self.loop.run_forever()
                except KeyboardInterrupt:
                    pass

                self.server.close()

                self.loop.run_until_complete(self.server.wait_closed())
                logger.info('Server shutting down')
        finally:
            if self.close_loop:
                # shutdown_asyncgens only exists on Python >= 3.6
                if sys.version_info >= (3, 6):
                    self.loop.run_until_complete(self.loop.shutdown_asyncgens())
                self.loop.close()

            if self._cleanup_socket is not None:
                self._cleanup_socket()