blob: 55f48410d32568a62fd9d31455591d674d530c06 [file] [log] [blame]
Brad Bishop08902b02019-08-20 09:16:51 -04001# Copyright (C) 2018-2019 Garmin Ltd.
Brad Bishop19323692019-04-05 15:28:33 -04002#
Brad Bishopc342db32019-05-15 21:57:59 -04003# SPDX-License-Identifier: GPL-2.0-only
Brad Bishop19323692019-04-05 15:28:33 -04004#
Brad Bishop19323692019-04-05 15:28:33 -04005
Andrew Geissler6ce62a22020-11-30 19:58:47 -06006import asyncio
Brad Bishopa34c0302019-09-23 22:34:48 -04007from contextlib import closing
8import re
Brad Bishop19323692019-04-05 15:28:33 -04009import sqlite3
Andrew Geissler475cb722020-07-10 16:00:51 -050010import itertools
11import json
Brad Bishop19323692019-04-05 15:28:33 -040012
# Prefix that marks an address string as a UNIX domain socket path
UNIX_PREFIX = "unix://"

# Address kinds returned as the first element of parse_address()'s result
ADDR_TYPE_UNIX = 0
ADDR_TYPE_TCP = 1

# The maximum chunk size is hardcoded because the Python async server uses a
# 64K receive buffer by default. Having the client and server tell each other
# their maximum chunk sizes would be nicer, but the extra round trip would
# slow down connection setup, so that is avoided unless it becomes necessary.
DEFAULT_MAX_CHUNK = 32 * 1024
Brad Bishop08902b02019-08-20 09:16:51 -040024
# Column layout of the tasks table as (column name, SQL type) pairs, in the
# order the columns are declared when the table is created.
TABLE_DEFINITION = (
    # Required fields
    ("method", "TEXT NOT NULL"),
    ("outhash", "TEXT NOT NULL"),
    ("taskhash", "TEXT NOT NULL"),
    ("unihash", "TEXT NOT NULL"),
    ("created", "DATETIME"),

    # Optional fields
    ("owner", "TEXT"),
    ("PN", "TEXT"),
    ("PV", "TEXT"),
    ("PR", "TEXT"),
    ("task", "TEXT"),
    ("outhash_siginfo", "TEXT"),
)

# Just the column names, in the same order as TABLE_DEFINITION
TABLE_COLUMNS = tuple(column for column, _sql_type in TABLE_DEFINITION)
42
def setup_database(database, sync=True):
    """Open the SQLite database and make sure its schema is in place.

    Creates the ``tasks_v2`` table (columns taken from TABLE_DEFINITION) if it
    does not exist, switches the journal to WAL mode, sets the ``synchronous``
    pragma, drops the old ``taskhash_lookup`` / ``outhash_lookup`` indexes and
    creates the ``*_v2`` indexes if they are missing.

    Args:
        database: Database location, passed straight to ``sqlite3.connect``.
        sync: When true ``PRAGMA synchronous`` is set to ``NORMAL``, otherwise
            to ``OFF``.

    Returns:
        The open ``sqlite3.Connection`` with ``row_factory`` set to
        ``sqlite3.Row``. The caller owns the connection.

    Raises:
        sqlite3.Error: If any of the setup statements fail. The connection is
            closed before the exception propagates.
    """
    db = sqlite3.connect(database)
    try:
        db.row_factory = sqlite3.Row

        with closing(db.cursor()) as cursor:
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS tasks_v2 (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    %s
                    UNIQUE(method, outhash, taskhash)
                    )
                ''' % " ".join("%s %s," % (name, typ) for name, typ in TABLE_DEFINITION))
            cursor.execute('PRAGMA journal_mode = WAL')
            cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF'))

            # Drop old indexes
            cursor.execute('DROP INDEX IF EXISTS taskhash_lookup')
            cursor.execute('DROP INDEX IF EXISTS outhash_lookup')

            # Create new indexes
            cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)')
            cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)')
    except BaseException:
        # The connection is only handed to the caller on success, so nobody
        # else can close it if setup fails part way through.
        db.close()
        raise

    return db
67
68
def parse_address(addr):
    """Split an address string into its type and connection arguments.

    Three forms are recognised:

    * ``unix://PATH`` gives ``(ADDR_TYPE_UNIX, (PATH,))``
    * ``[HOST]:PORT`` gives ``(ADDR_TYPE_TCP, (HOST, PORT))``; the brackets
      allow HOST itself to contain colons
    * ``HOST:PORT`` gives ``(ADDR_TYPE_TCP, (HOST, PORT))``

    PORT is returned as an ``int``.

    Raises:
        ValueError: If an unbracketed address does not contain exactly one
            ``:``, or if the port is not a valid integer.
    """
    if addr.startswith(UNIX_PREFIX):
        socket_path = addr[len(UNIX_PREFIX):]
        return (ADDR_TYPE_UNIX, (socket_path,))

    bracketed = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
    if bracketed is None:
        host, port = addr.split(':')
    else:
        host, port = bracketed.group('host', 'port')

    return (ADDR_TYPE_TCP, (host, int(port)))
81
82
def chunkify(msg, max_chunk):
    """Yield *msg* as one or more newline-terminated lines.

    A message shorter than ``max_chunk - 1`` characters is yielded as a single
    line. Any other message is sent as a chunk stream: first a header line
    holding the JSON object ``{"chunk-stream": null}``, then the message cut
    into pieces of at most ``max_chunk - 1`` characters (each followed by a
    newline, so no piece exceeds ``max_chunk`` characters), and finally an
    empty line that terminates the stream.

    Args:
        msg: The message text (``str``) to send.
        max_chunk: Maximum length of a message line including its newline.
            Must be at least 2 so every piece can carry some payload.

    Raises:
        ValueError: If ``max_chunk`` is less than 2. As this is a generator,
            the error is raised when iteration starts.
    """
    payload_size = max_chunk - 1
    if payload_size < 1:
        # With no room for payload the message could only be dropped; fail
        # loudly instead of emitting an empty chunk stream.
        raise ValueError("max_chunk must be at least 2, got %r" % (max_chunk,))

    if len(msg) < payload_size:
        yield msg + "\n"
        return

    yield json.dumps({'chunk-stream': None}) + "\n"

    for start in range(0, len(msg), payload_size):
        yield msg[start:start + payload_size] + "\n"

    # Empty line marks the end of the chunk stream
    yield "\n"
95
96
def create_server(addr, dbname, *, sync=True, upstream=None):
    """Build a server on top of the database *dbname* and start it on *addr*.

    The database is opened with setup_database() and handed to
    ``server.Server`` together with *upstream*. *addr* is interpreted by
    parse_address(): a UNIX address results in a call to
    ``start_unix_server``, anything else in a call to ``start_tcp_server``.

    Args:
        addr: Address string in one of the forms parse_address() accepts.
        dbname: Database location passed to setup_database().
        sync: Forwarded to setup_database().
        upstream: Forwarded to ``server.Server``.

    Returns:
        The ``server.Server`` instance.
    """
    from . import server

    database = setup_database(dbname, sync=sync)
    srv = server.Server(database, upstream=upstream)

    addr_type, params = parse_address(addr)
    start = srv.start_unix_server if addr_type == ADDR_TYPE_UNIX else srv.start_tcp_server
    start(*params)

    return srv
109
110
def create_client(addr):
    """Create a ``client.Client`` and connect it to *addr*.

    *addr* is interpreted by parse_address(): a UNIX address results in a
    call to ``connect_unix``, anything else in a call to ``connect_tcp``.

    Returns:
        The ``client.Client`` instance.
    """
    from . import client

    conn = client.Client()

    addr_type, params = parse_address(addr)
    connect = conn.connect_unix if addr_type == ADDR_TYPE_UNIX else conn.connect_tcp
    connect(*params)

    return conn
Andrew Geissler6ce62a22020-11-30 19:58:47 -0600122
async def create_async_client(addr):
    """Create a ``client.AsyncClient`` and connect it to *addr*.

    Coroutine counterpart of create_client(). *addr* is interpreted by
    parse_address(): a UNIX address results in awaiting ``connect_unix``,
    anything else in awaiting ``connect_tcp``.

    Returns:
        The ``client.AsyncClient`` instance.
    """
    from . import client

    conn = client.AsyncClient()

    addr_type, params = parse_address(addr)
    connect = conn.connect_unix if addr_type == ADDR_TYPE_UNIX else conn.connect_tcp
    await connect(*params)

    return conn