-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathengine.py
92 lines (71 loc) · 2.67 KB
/
engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import dataclasses
import json
from decimal import Decimal
from shared.config import get_config
from shared.utils.ReportEncoder import ReportEncoder
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, scoped_session, sessionmaker
import database.events # noqa: F401
from database.models.timeseries import TimeseriesBaseModel
from helpers.timeseries import timeseries_enabled
from .base import Base
def create_all(engine):
    """Create every table registered on ``Base.metadata`` using *engine*."""
    metadata = Base.metadata
    metadata.create_all(engine)
class DatabaseEncoder(ReportEncoder):
    """JSON encoder that extends ``ReportEncoder`` for database payloads.

    Adds handling for two kinds of objects before deferring to the parent:
    dataclasses are flattened with ``dataclasses.astuple`` and ``Decimal``
    values are rendered with ``str``.
    """

    def default(self, obj):
        """Return a JSON-serializable stand-in for *obj*."""
        # The dataclass check stays first, matching the original precedence.
        if not dataclasses.is_dataclass(obj):
            if isinstance(obj, Decimal):
                return str(obj)
            # Anything else is the parent encoder's responsibility.
            return super().default(obj)
        return dataclasses.astuple(obj)
def json_dumps(d):
    """Serialize *d* to a JSON string using ``DatabaseEncoder``."""
    serialized = json.dumps(d, cls=DatabaseEncoder)
    return serialized
class SessionFactory:
    """Create SQLAlchemy engines and a scoped session for the configured URLs."""

    def __init__(self, database_url, timeseries_database_url=None):
        """Remember the connection URLs; no engine is created here."""
        # Engines stay unset until create_session() assigns them.
        self.main_engine = None
        self.timeseries_engine = None
        self.database_url = database_url
        self.timeseries_database_url = timeseries_database_url

    def create_session(self):
        """Build the engine(s) and return a ``scoped_session``.

        The main engine is always created. When ``timeseries_enabled()`` is
        true, a second engine is created from ``timeseries_database_url`` and
        sessions pick an engine per operation through ``get_bind``; otherwise
        sessions are bound directly to the main engine.
        """
        engine_options = {"json_serializer": json_dumps}
        self.main_engine = create_engine(self.database_url, **engine_options)

        if not timeseries_enabled():
            return scoped_session(sessionmaker(bind=self.main_engine))

        self.timeseries_engine = create_engine(
            self.timeseries_database_url, **engine_options
        )

        # Captured by the closure below so the session class does not need a
        # reference back to this factory.
        default_bind = self.main_engine
        timeseries_bind = self.timeseries_engine

        class RoutingSession(Session):
            """Session that picks an engine based on the mapper or clause."""

            def get_bind(self, mapper=None, clause=None, **kw):
                """Return the timeseries engine for timeseries work, else main."""
                if mapper is not None and issubclass(
                    mapper.class_, TimeseriesBaseModel
                ):
                    # Mapped class derives from the timeseries base model.
                    use_timeseries = True
                elif clause is None or not hasattr(clause, "table"):
                    use_timeseries = False
                else:
                    # Fall back to the table-name prefix of the clause.
                    use_timeseries = clause.table.name.startswith("timeseries_")
                return timeseries_bind if use_timeseries else default_bind

        return scoped_session(sessionmaker(class_=RoutingSession))
# Module-level factory. Both URLs are looked up under the "services" config
# section, with a fallback URL passed as `default`.
# NOTE(review): the fallback URLs use the "postgres://" scheme; newer
# SQLAlchemy releases appear to accept only "postgresql://" — confirm against
# the pinned SQLAlchemy version.
session_factory = SessionFactory(
    database_url=get_config(
        "services",
        "database_url",
        default="postgres://postgres:@postgres:5432/postgres",
    ),
    timeseries_database_url=get_config(
        "services",
        "timeseries_database_url",
        default="postgres://postgres:@timescale:5432/postgres",
    ),
)
# Runs at import time: importing this module calls create_session(), which
# creates the engine(s) and the scoped session.
session = session_factory.create_session()
# Second name bound to the same scoped session object as `session`.
get_db_session = session