From f78ec4d4ff9b1896491893e55b05298a84baa688 Mon Sep 17 00:00:00 2001 From: Jeremy Arbesfeld Date: Tue, 12 Jul 2022 12:08:58 -0400 Subject: [PATCH 1/2] added postgres data store, fix #10 --- setup.cfg | 1 + src/anyvar/storage/README-pg.md | 2 + src/anyvar/storage/__init__.py | 4 + src/anyvar/storage/pg_utility.py | 158 +++++++++++++++++++++++++++ src/anyvar/storage/postgres.py | 65 +++++++++++ src/anyvar/storage/postgres_init.sql | 4 + 6 files changed, 234 insertions(+) create mode 100644 src/anyvar/storage/README-pg.md create mode 100644 src/anyvar/storage/pg_utility.py create mode 100644 src/anyvar/storage/postgres.py create mode 100644 src/anyvar/storage/postgres_init.sql diff --git a/setup.cfg b/setup.cfg index 8d98e17..f8077de 100644 --- a/setup.cfg +++ b/setup.cfg @@ -11,6 +11,7 @@ install_requires = canonicaljson connexion[swagger-ui] ga4gh.vrs[extras]==0.7.6 + psycopg2-binary setup_requires = setuptools_scm packages = find: diff --git a/src/anyvar/storage/README-pg.md b/src/anyvar/storage/README-pg.md new file mode 100644 index 0000000..d321f4c --- /dev/null +++ b/src/anyvar/storage/README-pg.md @@ -0,0 +1,2 @@ +Docker pull postgres +docker run --name anyvar-pg -e POSTGRES_PASSWORD=mysecretpassword -d postgres \ No newline at end of file diff --git a/src/anyvar/storage/__init__.py b/src/anyvar/storage/__init__.py index c86f14d..00a4f3a 100644 --- a/src/anyvar/storage/__init__.py +++ b/src/anyvar/storage/__init__.py @@ -52,6 +52,10 @@ def create_storage(uri=None): from .redisobjectstore import RedisObjectStore storage = RedisObjectStore(redis.Redis.from_url(uri)) + elif parsed_uri.scheme == "postgres": + from .postgres import PostgresObjectStore + storage = PostgresObjectStore(uri) + else: raise ValueError(f"URI scheme {parsed_uri.scheme} is not implemented") diff --git a/src/anyvar/storage/pg_utility.py b/src/anyvar/storage/pg_utility.py new file mode 100644 index 0000000..fae5be9 --- /dev/null +++ b/src/anyvar/storage/pg_utility.py @@ -0,0 +1,158 @@ +import contextlib +import weakref +import psycopg2 +import hgvs +from hgvs.dataproviders.uta import _parse_url +import inspect +import logging +import os + +_logger = logging.getLogger(__name__) + + +class PostgresClient: + def __init__(self, + db_url, + pooling=False, + application_name=None, + mode=None, + cache=None): + url = _parse_url(db_url) + #if url.schema is None: + # raise Exception("No schema name provided in {url}".format(url=url)) + if url.scheme != "postgres": + raise Exception("Only Postgres databases supported for now") + self.application_name = application_name + self.pooling = pooling + self._conn = None + self.url = url + # If we're using connection pooling, track the set of DB + # connections we've seen; on first use we set the schema + # search path. Use weak references to avoid keeping connection + # objects alive unnecessarily. + self._conns_seen = weakref.WeakSet() + + def __del__(self): + self.close() + + def close(self): + if self.pooling: + self._pool.closeall() + else: + if self._conn is not None: + self._conn.close() + + def _connect(self): + if self.application_name is None: + st = inspect.stack() + self.application_name = os.path.basename(st[-1][1]) + conn_args = dict( + host=self.url.hostname, + port=self.url.port, + database=self.url.database, + user=self.url.username, + password=self.url.password, + application_name=self.application_name + "/" + hgvs.__version__, + ) + if self.pooling: + _logger.info("Using UTA ThreadedConnectionPool") + self._pool = psycopg2.pool.ThreadedConnectionPool( + hgvs.global_config.uta.pool_min, + hgvs.global_config.uta.pool_max, **conn_args) + else: + self._conn = psycopg2.connect(**conn_args) + self._conn.autocommit = True + with self._get_cursor() as cur: + self._set_search_path(cur) + + self.ensure_schema_exists() + + def _create_schema(self): + create_sql = "CREATE TABLE vrs_objects (id BIGSERIAL primary key, vrs_id text, vrs_object jsonb);" + self._insert_one(create_sql) + + def ensure_schema_exists(self): + # N.B. On AWS RDS, information_schema.schemata always returns zero rows + r = self._fetchone( + "select exists(SELECT 1 FROM pg_catalog.pg_tables WHERE tablename = 'vrs_objects')" + ) + if r[0]: + return + self._create_schema() + + def _fetchone(self, sql, *args): + with self._get_cursor() as cur: + cur.execute(sql, *args) + return cur.fetchone() + + def _fetchall(self, sql, *args): + with self._get_cursor() as cur: + cur.execute(sql, *args) + return cur.fetchall() + + def _insert_one(self, sql, *args): + with self._get_cursor() as cur: + cur.execute(sql, *args) + + @contextlib.contextmanager + def _get_cursor(self, n_retries=1): + """Returns a context manager for obtained from a single or pooled + connection, and sets the PostgreSQL search_path to the schema + specified in the connection URL. + Although *connections* are threadsafe, *cursors* are bound to + connections and are *not* threadsafe. Do not share cursors + across threads. + Use this funciton like this:: + with hdp._get_cursor() as cur: + # your code + Do not call this function outside a contextmanager. + """ + + n_tries_rem = n_retries + 1 + while n_tries_rem > 0: + try: + + conn = self._pool.getconn() if self.pooling else self._conn + + # autocommit=True obviates closing explicitly + conn.autocommit = True + + cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) + if self.pooling: + # this might be a new connection, in which case we + # need to set the search path + if conn not in self._conns_seen: + self._set_search_path(cur) + self._conns_seen.add(conn) + + yield cur + + # contextmanager executes these when context exits + cur.close() + if self.pooling: + self._pool.putconn(conn) + + break + + except psycopg2.OperationalError: + + _logger.warning( + "Lost connection to {url}; attempting reconnect".format( + url=self.url)) + if self.pooling: + self._pool.closeall() + self._connect() + _logger.warning("Reconnected to {url}".format(url=self.url)) + + n_tries_rem -= 1 + + else: + + # N.B. Probably never reached + raise RuntimeError( + "Permanently lost connection to {url} ({n} retries)".format( + url=self.url, n=n_retries)) + + def _set_search_path(self, cur): + cur.execute( + "set search_path = {self.url.schema},public;".format(self=self)) diff --git a/src/anyvar/storage/postgres.py b/src/anyvar/storage/postgres.py new file mode 100644 index 0000000..bfe367d --- /dev/null +++ b/src/anyvar/storage/postgres.py @@ -0,0 +1,65 @@ +import collections +import datetime +import functools +import logging +import os +import shelve +import json +import zlib + +import ga4gh.vrs +from ga4gh.core import is_pjs_instance +from .pg_utility import PostgresClient + +_logger = logging.getLogger(__name__) + +silos = "locations alleles haplotypes genotypes variationsets relations texts".split() + + +class PostgresObjectStore: + """Super simple key-value storage for GA4GH VRS objects""" + + def __init__(self, db_url): + self.conn = PostgresClient(db_url=db_url) + self.conn._connect() + + def __repr__(self): + return str(self.conn) + + def __setitem__(self, name, value): + assert is_pjs_instance(value), "ga4gh.vrs object value required" + name = str(name) # in case str-like + d = value.as_dict() + j = json.dumps(d) + self.conn._insert_one(f"insert into vrs_objects (vrs_id, vrs_object) values (%s,%s)", [name, j]) + + def __getitem__(self, name): + name = str(name) # in case str-like + data = self.conn._fetchone(f"select vrs_object from vrs_objects where vrs_id = %s", [name]) + if data: + data = data[0] + typ = data["type"] + vo = ga4gh.vrs.models[typ](**data) + return vo + + def __contains__(self, name): + name = str(name) # in case str-like + return self._db.__contains__(name) + + def __delitem__(self, name): + name = str(name) # in case str-like + del self._db[name] + + def __del__(self): + self._db.close() + + def __len__(self): + return self._db.__len__() + + def __iter__(self): + return self._db.__iter__() + + def keys(self): + return self._db.keys() + + diff --git a/src/anyvar/storage/postgres_init.sql b/src/anyvar/storage/postgres_init.sql new file mode 100644 index 0000000..35152ab --- /dev/null +++ b/src/anyvar/storage/postgres_init.sql @@ -0,0 +1,4 @@ +CREATE USER anyvar; +CREATE DATABASE anyvar_db; +GRANT ALL PRIVILEGES ON DATABASE anyvar_db TO anyvar; +CREATE TABLE vrs_objects (id BIGINT primary key, vrs_id text, vrs_object jsonb); \ No newline at end of file From 22000160d493e9e30d85b81310b649677995d74c Mon Sep 17 00:00:00 2001 From: Jeremy Arbesfeld Date: Tue, 12 Jul 2022 12:16:22 -0400 Subject: [PATCH 2/2] modified readme for setting up postgres in docker --- src/anyvar/storage/README-pg.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/anyvar/storage/README-pg.md b/src/anyvar/storage/README-pg.md index d321f4c..971f93b 100644 --- a/src/anyvar/storage/README-pg.md +++ b/src/anyvar/storage/README-pg.md @@ -1,2 +1,3 @@ -Docker pull postgres -docker run --name anyvar-pg -e POSTGRES_PASSWORD=mysecretpassword -d postgres \ No newline at end of file +docker pull postgres +docker run -d --name anyvar-pg -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres +psql -h localhost -U postgres -p 5432 \ No newline at end of file