Merge pull request #11 from GenomicMedLab/issue-10-postgres
added postgres data store, fix #10
andreasprlic authored Jul 12, 2022
2 parents d31a1b9 + db545bf commit 442b734
Showing 6 changed files with 235 additions and 0 deletions.
1 change: 1 addition & 0 deletions setup.cfg
@@ -11,6 +11,7 @@ install_requires =
    canonicaljson
    connexion[swagger-ui]
    ga4gh.vrs[extras]==0.7.6
    psycopg2-binary
setup_requires =
    setuptools_scm
packages = find:
3 changes: 3 additions & 0 deletions src/anyvar/storage/README-pg.md
@@ -0,0 +1,3 @@
docker pull postgres
docker run -d --name anyvar-pg -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres
psql -h localhost -U postgres -p 5432
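These commands assume the stock postgres image with the superuser password set to postgres, as in the docker run line above. As a quick sanity check that the container accepts connections, a minimal psycopg2 sketch (host, port, and credentials are assumptions taken from that command) might look like:

import psycopg2

# Parameters mirror the docker run command above; adjust them if your
# deployment uses different credentials or a different port.
conn = psycopg2.connect(host="localhost", port=5432, user="postgres",
                        password="postgres", dbname="postgres")
with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone()[0])
conn.close()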
4 changes: 4 additions & 0 deletions src/anyvar/storage/__init__.py
@@ -52,6 +52,10 @@ def create_storage(uri=None):
        from .redisobjectstore import RedisObjectStore
        storage = RedisObjectStore(redis.Redis.from_url(uri))

    elif parsed_uri.scheme == "postgres":
        from .postgres import PostgresObjectStore
        storage = PostgresObjectStore(uri)

    else:
        raise ValueError(f"URI scheme {parsed_uri.scheme} is not implemented")

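With this branch in place, create_storage() dispatches a postgres:// URI to the new backend. A minimal sketch of selecting it (the URL, credentials, and database name are illustrative; see README-pg.md and postgres_init.sql):

from anyvar.storage import create_storage

# Illustrative URL; host, credentials, and database name depend on your deployment.
storage = create_storage("postgres://postgres:postgres@localhost:5432/anyvar_db")
print(type(storage).__name__)  # PostgresObjectStore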
158 changes: 158 additions & 0 deletions src/anyvar/storage/pg_utility.py
@@ -0,0 +1,158 @@
import contextlib
import weakref
import psycopg2
import psycopg2.extras  # DictCursor; submodules are not imported by `import psycopg2` alone
import psycopg2.pool    # ThreadedConnectionPool
import hgvs
from hgvs.dataproviders.uta import _parse_url
import inspect
import logging
import os

_logger = logging.getLogger(__name__)


class PostgresClient:
    def __init__(self,
                 db_url,
                 pooling=False,
                 application_name=None,
                 mode=None,
                 cache=None):
        url = _parse_url(db_url)
        # if url.schema is None:
        #     raise Exception("No schema name provided in {url}".format(url=url))
        if url.scheme != "postgres":
            raise Exception("Only Postgres databases supported for now")
        self.application_name = application_name
        self.pooling = pooling
        self._conn = None
        self.url = url
        # If we're using connection pooling, track the set of DB
        # connections we've seen; on first use we set the schema
        # search path. Use weak references to avoid keeping connection
        # objects alive unnecessarily.
        self._conns_seen = weakref.WeakSet()

    def __del__(self):
        self.close()

    def close(self):
        if self.pooling:
            self._pool.closeall()
        else:
            if self._conn is not None:
                self._conn.close()

    def _connect(self):
        if self.application_name is None:
            st = inspect.stack()
            self.application_name = os.path.basename(st[-1][1])
        conn_args = dict(
            host=self.url.hostname,
            port=self.url.port,
            database=self.url.database,
            user=self.url.username,
            password=self.url.password,
            application_name=self.application_name + "/" + hgvs.__version__,
        )
        if self.pooling:
            _logger.info("Using UTA ThreadedConnectionPool")
            self._pool = psycopg2.pool.ThreadedConnectionPool(
                hgvs.global_config.uta.pool_min,
                hgvs.global_config.uta.pool_max, **conn_args)
        else:
            self._conn = psycopg2.connect(**conn_args)
            self._conn.autocommit = True
            with self._get_cursor() as cur:
                self._set_search_path(cur)

        self.ensure_schema_exists()

    def _create_schema(self):
        create_sql = "CREATE TABLE vrs_objects (id BIGSERIAL primary key, vrs_id text, vrs_object jsonb);"
        self._insert_one(create_sql)

    def ensure_schema_exists(self):
        # N.B. On AWS RDS, information_schema.schemata always returns zero rows
        r = self._fetchone(
            "select exists(SELECT 1 FROM pg_catalog.pg_tables WHERE tablename = 'vrs_objects')"
        )
        if r[0]:
            return
        self._create_schema()

    def _fetchone(self, sql, *args):
        with self._get_cursor() as cur:
            cur.execute(sql, *args)
            return cur.fetchone()

    def _fetchall(self, sql, *args):
        with self._get_cursor() as cur:
            cur.execute(sql, *args)
            return cur.fetchall()

    def _insert_one(self, sql, *args):
        with self._get_cursor() as cur:
            cur.execute(sql, *args)

    @contextlib.contextmanager
    def _get_cursor(self, n_retries=1):
        """Returns a context manager for a cursor obtained from a single or
        pooled connection, and sets the PostgreSQL search_path to the schema
        specified in the connection URL.

        Although *connections* are threadsafe, *cursors* are bound to
        connections and are *not* threadsafe. Do not share cursors
        across threads.

        Use this function like this::

            with client._get_cursor() as cur:
                # your code

        Do not call this function outside a context manager.
        """

        n_tries_rem = n_retries + 1
        while n_tries_rem > 0:
            try:

                conn = self._pool.getconn() if self.pooling else self._conn

                # autocommit=True obviates closing explicitly
                conn.autocommit = True

                cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
                if self.pooling:
                    # this might be a new connection, in which case we
                    # need to set the search path
                    if conn not in self._conns_seen:
                        self._set_search_path(cur)
                        self._conns_seen.add(conn)

                yield cur

                # contextmanager executes these when context exits
                cur.close()
                if self.pooling:
                    self._pool.putconn(conn)

                break

            except psycopg2.OperationalError:

                _logger.warning(
                    "Lost connection to {url}; attempting reconnect".format(
                        url=self.url))
                if self.pooling:
                    self._pool.closeall()
                self._connect()
                _logger.warning("Reconnected to {url}".format(url=self.url))

                n_tries_rem -= 1

        else:

            # N.B. Probably never reached
            raise RuntimeError(
                "Permanently lost connection to {url} ({n} retries)".format(
                    url=self.url, n=n_retries))

    def _set_search_path(self, cur):
        cur.execute(
            "set search_path = {self.url.schema},public;".format(self=self))
65 changes: 65 additions & 0 deletions src/anyvar/storage/postgres.py
@@ -0,0 +1,65 @@
import collections
import datetime
import functools
import logging
import os
import shelve
import json
import zlib

import ga4gh.vrs
from ga4gh.core import is_pjs_instance
from .pg_utility import PostgresClient

_logger = logging.getLogger(__name__)

silos = "locations alleles haplotypes genotypes variationsets relations texts".split()


class PostgresObjectStore:
    """Super simple key-value storage for GA4GH VRS objects"""

    def __init__(self, db_url):
        self.conn = PostgresClient(db_url=db_url)
        self.conn._connect()

    def __repr__(self):
        return str(self.conn)

    def __setitem__(self, name, value):
        assert is_pjs_instance(value), "ga4gh.vrs object value required"
        name = str(name)  # in case str-like
        d = value.as_dict()
        j = json.dumps(d)
        self.conn._insert_one("insert into vrs_objects (vrs_id, vrs_object) values (%s,%s)", [name, j])

    def __getitem__(self, name):
        name = str(name)  # in case str-like
        data = self.conn._fetchone("select vrs_object from vrs_objects where vrs_id = %s", [name])
        if data:
            data = data[0]
            typ = data["type"]
            vo = ga4gh.vrs.models[typ](**data)
            return vo

    def __contains__(self, name):
        name = str(name)  # in case str-like
        data = self.conn._fetchone("select exists(select 1 from vrs_objects where vrs_id = %s)", [name])
        return data[0]

    def __delitem__(self, name):
        name = str(name)  # in case str-like
        self.conn._insert_one("delete from vrs_objects where vrs_id = %s", [name])

    def __del__(self):
        self.conn.close()

    def __len__(self):
        data = self.conn._fetchone("select count(*) from vrs_objects")
        return data[0]

    def __iter__(self):
        data = self.conn._fetchall("select vrs_id from vrs_objects")
        return iter(row[0] for row in data)

    def keys(self):
        data = self.conn._fetchall("select vrs_id from vrs_objects")
        return [row[0] for row in data]
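PostgresObjectStore keeps the dict-like interface of the existing storage backends: __setitem__ serializes a ga4gh.vrs object to JSON and inserts it into vrs_objects, and __getitem__ rebuilds the typed model from the stored jsonb. A minimal round-trip sketch, assuming ga4gh.vrs 0.7.x and a reachable database as in README-pg.md (the connection URL and the Text variation are illustrative assumptions only):

from ga4gh.vrs import models
from anyvar.storage.postgres import PostgresObjectStore

# Hypothetical connection URL; adjust to your deployment.
store = PostgresObjectStore("postgres://postgres:postgres@localhost:5432/anyvar_db")

# A small VRS object; other ga4gh.vrs models round-trip the same way.
vo = models.Text(definition="BRAF V600E")

store["ga4gh:VT.example"] = vo          # serialized with as_dict()/json.dumps, stored as jsonb
roundtrip = store["ga4gh:VT.example"]   # rebuilt as a typed model from the stored JSON
print(roundtrip.as_dict())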


4 changes: 4 additions & 0 deletions src/anyvar/storage/postgres_init.sql
@@ -0,0 +1,4 @@
CREATE USER anyvar;
CREATE DATABASE anyvar_db;
GRANT ALL PRIVILEGES ON DATABASE anyvar_db TO anyvar;
CREATE TABLE vrs_objects (id BIGSERIAL primary key, vrs_id text, vrs_object jsonb);
