1
1
"""
2
2
This module implements a backend for sample storage in ClickHouse.
3
3
"""
4
+
4
5
import base64
5
6
import logging
6
7
import time
45
46
}
46
47
47
48
49
+ class ClickHouseBackendError (Exception ):
50
+ """Something bad happened in the ClickHouse backend."""
51
+
52
+
48
53
def create_runs_table (client : clickhouse_driver .Client ):
49
54
query = """
50
55
CREATE TABLE IF NOT EXISTS runs (
@@ -79,7 +84,7 @@ def create_chain_table(client: clickhouse_driver.Client, meta: ChainMeta, rmeta:
79
84
# Check that it does not already exist
80
85
cid = chain_id (meta )
81
86
if client .execute (f"SHOW TABLES LIKE '{ cid } ';" ):
82
- raise Exception (f"A table for { cid } already exists." )
87
+ raise ClickHouseBackendError (f"A table for { cid } already exists." )
83
88
84
89
# Create a table with columns corresponding to the model variables
85
90
columns = []
@@ -235,7 +240,7 @@ def _get_row_at(
235
240
query = f"SELECT (`{ names } `,) FROM { self .cid } WHERE _draw_idx={ idx } ;"
236
241
data = self ._client .execute (query )
237
242
if not data :
238
- raise Exception (f"No record found for draw index { idx } ." )
243
+ raise ClickHouseBackendError (f"No record found for draw index { idx } ." )
239
244
result = dict (zip (var_names , data [0 ][0 ]))
240
245
return result
241
246
@@ -363,7 +368,10 @@ def __init__(
363
368
raise ValueError ("Either a `client` or a `client_fn` must be provided." )
364
369
365
370
if client_fn is None :
366
- client_fn = lambda : client
371
+
372
+ def client_fn ():
373
+ return client
374
+
367
375
if client is None :
368
376
client = client_fn ()
369
377
@@ -381,11 +389,11 @@ def init_run(self, meta: RunMeta) -> ClickHouseRun:
381
389
else :
382
390
created_at = datetime .now ().astimezone (timezone .utc )
383
391
query = "INSERT INTO runs (created_at, rid, proto) VALUES"
384
- params = dict (
385
- created_at = created_at ,
386
- rid = meta .rid ,
387
- proto = base64 .encodebytes (bytes (meta )).decode ("ascii" ),
388
- )
392
+ params = {
393
+ " created_at" : created_at ,
394
+ " rid" : meta .rid ,
395
+ " proto" : base64 .encodebytes (bytes (meta )).decode ("ascii" ),
396
+ }
389
397
self ._client .execute (query , [params ])
390
398
return ClickHouseRun (meta , client_fn = self ._client_fn , created_at = created_at )
391
399
@@ -407,7 +415,9 @@ def get_run(self, rid: str) -> ClickHouseRun:
407
415
{"rid" : rid },
408
416
)
409
417
if len (rows ) != 1 :
410
- raise Exception (f"Unexpected number of { len (rows )} results for rid='{ rid } '." )
418
+ raise ClickHouseBackendError (
419
+ f"Unexpected number of { len (rows )} results for rid='{ rid } '."
420
+ )
411
421
data = base64 .decodebytes (rows [0 ][2 ].encode ("ascii" ))
412
422
meta = RunMeta ().parse (data )
413
423
return ClickHouseRun (
0 commit comments