#! /usr/bin/env python3
#
# Copyright (C) 2023 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#
7import sqlite3
8import logging
9from contextlib import closing
10from . import User
11
# Logger shared by the sqlite backend.
logger = logging.getLogger("hashserv.sqlite")

# Every table definition below is a tuple of (column name, SQL type, flags)
# entries. Columns whose flags contain "UNIQUE" are combined by
# _make_table() into one table-level UNIQUE(...) constraint.

# Mapping from (method, taskhash) to the unified hash.
UNIHASH_TABLE_DEFINITION = (
    ("method", "TEXT NOT NULL", "UNIQUE"),
    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
    ("unihash", "TEXT NOT NULL", ""),
)

UNIHASH_TABLE_COLUMNS = tuple(column[0] for column in UNIHASH_TABLE_DEFINITION)

# Reported output hashes, unique per (method, taskhash, outhash).
OUTHASH_TABLE_DEFINITION = (
    ("method", "TEXT NOT NULL", "UNIQUE"),
    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
    ("outhash", "TEXT NOT NULL", "UNIQUE"),
    ("created", "DATETIME", ""),
    # Optional fields
    ("owner", "TEXT", ""),
    ("PN", "TEXT", ""),
    ("PV", "TEXT", ""),
    ("PR", "TEXT", ""),
    ("task", "TEXT", ""),
    ("outhash_siginfo", "TEXT", ""),
)

OUTHASH_TABLE_COLUMNS = tuple(column[0] for column in OUTHASH_TABLE_DEFINITION)

# User accounts; permissions are stored as one whitespace-separated string.
USERS_TABLE_DEFINITION = (
    ("username", "TEXT NOT NULL", "UNIQUE"),
    ("token", "TEXT NOT NULL", ""),
    ("permissions", "TEXT NOT NULL", ""),
)

USERS_TABLE_COLUMNS = tuple(column[0] for column in USERS_TABLE_DEFINITION)
45
46
47def _make_table(cursor, name, definition):
48 cursor.execute(
49 """
50 CREATE TABLE IF NOT EXISTS {name} (
51 id INTEGER PRIMARY KEY AUTOINCREMENT,
52 {fields}
53 UNIQUE({unique})
54 )
55 """.format(
56 name=name,
57 fields=" ".join("%s %s," % (name, typ) for name, typ, _ in definition),
58 unique=", ".join(
59 name for name, _, flags in definition if "UNIQUE" in flags
60 ),
61 )
62 )
63
64
def map_user(row):
    """Convert a users-table row into a ``User``.

    Returns None when ``row`` is None. Otherwise the row's whitespace
    separated ``permissions`` string is split into a set and passed to
    ``User`` together with the row's ``username``.
    """
    if row is not None:
        name = row["username"]
        perms = set(row["permissions"].split())
        return User(username=name, permissions=perms)
    return None
72
73
class DatabaseEngine(object):
    """Entry point for the sqlite backend.

    Stores the database file name and the synchronous setting, prepares the
    schema through create() and hands out Database objects through
    connect().
    """

    def __init__(self, dbname, sync):
        """Remember the connection settings; no database access happens here.

        Args:
            dbname: Database path later passed to ``sqlite3.connect``.
            sync: When true ``PRAGMA synchronous`` is set to ``NORMAL``,
                otherwise to ``OFF``.
        """
        self.dbname = dbname
        self.logger = logger
        self.sync = sync

    async def create(self):
        """Create missing tables and bring the indexes up to date.

        The connection opened here is only needed for the schema setup, so it
        is closed explicitly before returning instead of being left for the
        garbage collector.
        """
        with closing(sqlite3.connect(self.dbname)) as db:
            db.row_factory = sqlite3.Row

            with closing(db.cursor()) as cursor:
                _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION)
                _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION)
                _make_table(cursor, "users", USERS_TABLE_DEFINITION)

                cursor.execute("PRAGMA journal_mode = WAL")
                cursor.execute(
                    "PRAGMA synchronous = %s" % ("NORMAL" if self.sync else "OFF")
                )

                # Drop old indexes
                cursor.execute("DROP INDEX IF EXISTS taskhash_lookup")
                cursor.execute("DROP INDEX IF EXISTS outhash_lookup")
                cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2")
                cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2")

                # TODO: Upgrade from tasks_v2?
                cursor.execute("DROP TABLE IF EXISTS tasks_v2")

                # Create new indexes
                cursor.execute(
                    "CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)"
                )
                cursor.execute(
                    "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)"
                )

    def connect(self, logger):
        """Return a new Database for this engine's file and sync setting.

        Args:
            logger: Logger handed to the new Database.
        """
        return Database(logger, self.dbname, self.sync)
Patrick Williamsac13d5f2023-11-24 18:59:46 -0600113
114
class Database(object):
    """One sqlite3 connection to the hash equivalence database.

    Usable as an async context manager; leaving the ``async with`` block
    closes the connection. Lookup methods return ``sqlite3.Row`` objects, or
    None when nothing matched, unless stated otherwise.
    """

    def __init__(self, logger, dbname, sync):
        """Open ``dbname`` and configure the connection.

        Args:
            logger: Logger stored on the instance as ``self.logger``.
            dbname: Database path passed to ``sqlite3.connect``.
            sync: When true ``PRAGMA synchronous`` is set to ``NORMAL``,
                otherwise to ``OFF``.
        """
        self.dbname = dbname
        self.logger = logger

        self.db = sqlite3.connect(self.dbname)
        self.db.row_factory = sqlite3.Row

        with closing(self.db.cursor()) as cursor:
            cursor.execute("PRAGMA journal_mode = WAL")
            cursor.execute(
                "PRAGMA synchronous = %s" % ("NORMAL" if sync else "OFF")
            )

            cursor.execute("SELECT sqlite_version()")

            # Store the version as a tuple so it can be compared, e.g.
            # (3, 33) <= self.sqlite_version. Non-numeric components are kept
            # as strings.
            version = []
            for v in cursor.fetchone()[0].split("."):
                try:
                    version.append(int(v))
                except ValueError:
                    version.append(v)

            self.sqlite_version = tuple(version)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()

    async def close(self):
        """Close the underlying sqlite3 connection."""
        self.db.close()

    def _fetch_one(self, query, params):
        """Run ``query`` with ``params`` and return the first row or None."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(query, params)
            return cursor.fetchone()

    def _modify(self, query, params):
        """Run a data-modifying ``query``, commit, and return its row count."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(query, params)
            self.db.commit()
            return cursor.rowcount

    async def get_unihash_by_taskhash_full(self, method, taskhash):
        """Return the oldest outhash row for (method, taskhash) joined with
        its unihash, or None."""
        return self._fetch_one(
            """
            SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
            INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
            WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
            ORDER BY outhashes_v2.created ASC
            LIMIT 1
            """,
            {
                "method": method,
                "taskhash": taskhash,
            },
        )

    async def get_unihash_by_outhash(self, method, outhash):
        """Return the oldest outhash row for (method, outhash) joined with
        its unihash, or None."""
        return self._fetch_one(
            """
            SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
            INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
            WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
            ORDER BY outhashes_v2.created ASC
            LIMIT 1
            """,
            {
                "method": method,
                "outhash": outhash,
            },
        )

    async def get_outhash(self, method, outhash):
        """Return the oldest outhash row for (method, outhash), or None."""
        return self._fetch_one(
            """
            SELECT * FROM outhashes_v2
            WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
            ORDER BY outhashes_v2.created ASC
            LIMIT 1
            """,
            {
                "method": method,
                "outhash": outhash,
            },
        )

    async def get_equivalent_for_outhash(self, method, outhash, taskhash):
        """Return (taskhash, unihash) of the oldest row with the same method
        and outhash but a taskhash different from ``taskhash``, or None."""
        return self._fetch_one(
            """
            SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2
            INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
            -- Select any matching output hash except the one we just inserted
            WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
            -- Pick the oldest hash
            ORDER BY outhashes_v2.created ASC
            LIMIT 1
            """,
            {
                "method": method,
                "outhash": outhash,
                "taskhash": taskhash,
            },
        )

    async def get_equivalent(self, method, taskhash):
        """Return the (taskhash, method, unihash) row for (method, taskhash),
        or None."""
        return self._fetch_one(
            "SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash",
            {
                "method": method,
                "taskhash": taskhash,
            },
        )

    async def remove(self, condition):
        """Delete rows matching ``condition`` from both hash tables.

        Args:
            condition: Mapping of column name to required value. For each
                table only its own columns with a non-None value are used,
                combined with AND. A table for which no column applies is
                left untouched.

        Returns:
            Total number of deleted rows.
        """
        tables = (
            (OUTHASH_TABLE_COLUMNS, "outhashes_v2"),
            (UNIHASH_TABLE_COLUMNS, "unihashes_v2"),
        )

        count = 0
        with closing(self.db.cursor()) as cursor:
            for columns, table_name in tables:
                # Column names come from the fixed table definitions, so they
                # are safe to place into the statement; values are bound.
                where = {
                    c: condition[c]
                    for c in columns
                    if c in condition and condition[c] is not None
                }
                if not where:
                    continue

                query = ("DELETE FROM %s WHERE " % table_name) + " AND ".join(
                    "%s=:%s" % (k, k) for k in where
                )
                cursor.execute(query, where)
                count += cursor.rowcount
            self.db.commit()

        return count

    async def clean_unused(self, oldest):
        """Delete outhash rows created before ``oldest`` that have no unihash
        row with the same method and taskhash. Returns the number deleted."""
        return self._modify(
            """
            DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS (
                SELECT unihashes_v2.id FROM unihashes_v2 WHERE unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash LIMIT 1
            )
            """,
            {
                "oldest": oldest,
            },
        )

    async def insert_unihash(self, method, taskhash, unihash):
        """Insert a unihash row unless (method, taskhash) already exists.

        Returns:
            True when a row was inserted, False when the insert was ignored.
        """
        # rowcount is 0 when INSERT OR IGNORE skipped the row. Comparing
        # cursor.lastrowid against the value of a fresh cursor (None) cannot
        # tell the two cases apart, because lastrowid is set to an integer by
        # every INSERT statement.
        return (
            self._modify(
                """
                INSERT OR IGNORE INTO unihashes_v2 (method, taskhash, unihash) VALUES(:method, :taskhash, :unihash)
                """,
                {
                    "method": method,
                    "taskhash": taskhash,
                    "unihash": unihash,
                },
            )
            > 0
        )

    async def insert_outhash(self, data):
        """Insert an outhash row unless an identical key already exists.

        Args:
            data: Mapping of column name to value; keys that are not outhash
                table columns are dropped.

        Returns:
            True when a row was inserted, False when the insert was ignored.
        """
        data = {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS}
        keys = sorted(data.keys())
        query = "INSERT OR IGNORE INTO outhashes_v2 ({fields}) VALUES({values})".format(
            fields=", ".join(keys),
            values=", ".join(":" + k for k in keys),
        )
        # See insert_unihash() for why rowcount is used instead of lastrowid.
        return self._modify(query, data) > 0

    def _get_user(self, username):
        """Return the (username, permissions, token) row of ``username``, or
        None."""
        return self._fetch_one(
            """
            SELECT username, permissions, token FROM users WHERE username=:username
            """,
            {
                "username": username,
            },
        )

    async def lookup_user_token(self, username):
        """Return ``(user, token)`` for ``username`` or ``(None, None)``."""
        row = self._get_user(username)
        if row is None:
            return None, None
        return map_user(row), row["token"]

    async def lookup_user(self, username):
        """Return the user object for ``username`` or None."""
        return map_user(self._get_user(username))

    async def set_user_token(self, username, token):
        """Store a new token; returns True if a user row was updated."""
        return (
            self._modify(
                """
                UPDATE users SET token=:token WHERE username=:username
                """,
                {
                    "username": username,
                    "token": token,
                },
            )
            != 0
        )

    async def set_user_perms(self, username, permissions):
        """Store ``permissions`` joined by spaces; returns True if a user row
        was updated."""
        return (
            self._modify(
                """
                UPDATE users SET permissions=:permissions WHERE username=:username
                """,
                {
                    "username": username,
                    "permissions": " ".join(permissions),
                },
            )
            != 0
        )

    async def get_all_users(self):
        """Return a list with one mapped user per row of the users table."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute("SELECT username, permissions FROM users")
            return [map_user(r) for r in cursor.fetchall()]

    async def new_user(self, username, permissions, token):
        """Create a user.

        Returns:
            True on success, False when the insert violated a constraint
            (``sqlite3.IntegrityError``), e.g. the username already exists.
        """
        with closing(self.db.cursor()) as cursor:
            try:
                cursor.execute(
                    """
                    INSERT INTO users (username, token, permissions) VALUES (:username, :token, :permissions)
                    """,
                    {
                        "username": username,
                        "token": token,
                        "permissions": " ".join(permissions),
                    },
                )
                self.db.commit()
                return True
            except sqlite3.IntegrityError:
                # sqlite3 implicitly opened a transaction for the INSERT and
                # the constraint failure does not end it. Roll it back so the
                # connection is not left inside a pending transaction.
                self.db.rollback()
                return False

    async def delete_user(self, username):
        """Delete ``username``; returns True if a row was removed."""
        return (
            self._modify(
                """
                DELETE FROM users WHERE username=:username
                """,
                {
                    "username": username,
                },
            )
            != 0
        )

    async def get_usage(self):
        """Return ``{table name: {"rows": row count}}`` for every table whose
        name does not match ``sqlite_%``."""
        usage = {}
        with closing(self.db.cursor()) as cursor:
            if self.sqlite_version >= (3, 33):
                table_name = "sqlite_schema"
            else:
                table_name = "sqlite_master"

            cursor.execute(
                f"""
                SELECT name FROM {table_name} WHERE type = 'table' AND name NOT LIKE 'sqlite_%'
                """
            )
            # fetchall() materialises the names first, so the cursor can be
            # reused for the per-table counts.
            for row in cursor.fetchall():
                name = row["name"]
                # Quote the identifier so unusual table names stay valid SQL.
                cursor.execute(
                    """
                    SELECT COUNT() FROM "%s"
                    """
                    % name.replace('"', '""'),
                )
                usage[name] = {
                    "rows": cursor.fetchone()[0],
                }
        return usage

    async def get_query_columns(self):
        """Return the names of all TEXT columns of the unihash and outhash
        tables, without duplicates and in no particular order."""
        columns = set()
        for name, typ, _ in UNIHASH_TABLE_DEFINITION + OUTHASH_TABLE_DEFINITION:
            if typ.startswith("TEXT"):
                columns.add(name)
        return list(columns)