This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Merge pull request #6 from KPMP/develop
Version 1.0
zwright authored Nov 30, 2020
2 parents 80e1145 + 129d4f6 commit 55c023a
Showing 11 changed files with 328 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .env_example
@@ -0,0 +1,2 @@
mysql_user=root
mysql_pwd=welcome
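
Note: index_creator/core/index_creation.py (below) reads MYSQL_USER and MYSQL_ROOT_PASSWORD from the environment, while this example file defines lowercase mysql_user and mysql_pwd, so the mapping between the two is presumably handled by the deployment configuration rather than by anything in this commit. As a minimal sketch only (not part of the commit; the fallback to the lowercase keys is an assumption), the values could be loaded locally with the python-dotenv package pinned in requirements.txt:

import os
from dotenv import load_dotenv

load_dotenv()  # copies key=value pairs from a local .env file into os.environ

# Prefer the names index_creation.py expects; fall back to the lowercase keys
# from .env_example (an assumption, not something this commit defines).
mysql_user = os.environ.get('MYSQL_USER') or os.environ.get('mysql_user')
mysql_pwd = os.environ.get('MYSQL_ROOT_PASSWORD') or os.environ.get('mysql_pwd')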
132 changes: 132 additions & 0 deletions .gitignore
@@ -0,0 +1,132 @@
.env
.idea/*

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/
5 changes: 5 additions & 0 deletions Dockerfile
@@ -0,0 +1,5 @@
FROM python:3.8.6-alpine3.12
WORKDIR /project
ADD . /project
RUN pip install -r requirements.txt
CMD ["python", "application.py"]
58 changes: 58 additions & 0 deletions application.py
@@ -0,0 +1,58 @@
import flask
import sys
from index_creator.core.index_creation import generate_index
from index_creator.core.update_es import update_file_cases
import logging

app = flask.Flask('index-creation-worker')

# Log to stdout so messages are visible in the container logs.
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
app.logger.addHandler(handler)
app.logger.setLevel(logging.DEBUG)

@app.route('/api/v1/index/file_cases', methods=['PUT'])
def updateFileCases():
    try:
        update_statement = generate_index()
        if update_statement is not None:
            update_file_cases(update_statement)
            return "ok - updated with all files"
        else:
            app.logger.warning("Index not updated, no results to apply")
            return "Index not updated, no results to apply"
    except Exception:
        app.logger.error(str(sys.exc_info()[0]) + " on line: " + str(sys.exc_info()[-1].tb_lineno))
        return "There was an error updating the index. Check the logs"

@app.route('/api/v1/index/file_cases/file_id/<string:file_id>', methods=['PUT'])
def updateFileCase(file_id):
    try:
        update_statement = generate_index(file_id=file_id)
        if update_statement is not None:
            update_file_cases(update_statement)
            return "ok - updated with file: " + str(file_id)
        else:
            app.logger.warning("Index not updated, no results to apply")
            return "Index not updated, no results to apply"
    except Exception:
        app.logger.error(str(sys.exc_info()[0]) + " on line: " + str(sys.exc_info()[-1].tb_lineno))
        return "There was an error updating the index. Check the logs"

@app.route('/api/v1/index/file_cases/release_ver/<string:release_ver>', methods=['PUT'])
def updateFileCaseRelease(release_ver):
    try:
        update_statement = generate_index(release_ver=release_ver)
        if update_statement is not None:
            update_file_cases(update_statement)
            return "ok - updated release: " + release_ver
        else:
            app.logger.warning("Index not updated, no results to apply")
            return "Index not updated, no results to apply"
    except Exception:
        app.logger.error(str(sys.exc_info()[0]) + " on line: " + str(sys.exc_info()[-1].tb_lineno))
        return "There was an error updating the index. Check the logs"

if __name__ == '__main__':
    app.run(host='0.0.0.0', debug=False)
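
The three routes above only accept PUT. A minimal sketch of calling the full-reindex endpoint with the Python standard library (not part of this commit; localhost and Flask's default port 5000 are assumptions that depend on how the container is published):

import urllib.request

# PUT with an empty body; the endpoints take no payload.
request = urllib.request.Request("http://localhost:5000/api/v1/index/file_cases", method="PUT")
with urllib.request.urlopen(request) as response:
    print(response.read().decode("utf-8"))  # e.g. "ok - updated with all files"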
Empty file added index_creator/core/__init__.py
Empty file.
92 changes: 92 additions & 0 deletions index_creator/core/index_creation.py
@@ -0,0 +1,92 @@
import json
import mysql.connector
import os
from index_creator.models.FileCasesIndexDoc import FileCasesIndexDoc
from index_creator.models.IndexDoc import IndexDoc
import logging

log = logging.getLogger('index-creation-worker.index_creation')

def get_index_update_json(id):
    return '{"update":{"_index":"file_cases","_id":"' + str(id) + '"}}'

def get_index_doc_json(index_doc):
    doc = None
    try:
        index_doc.cases = index_doc.cases.__dict__
        json_doc = json.dumps(index_doc.__dict__)
        doc = '{"doc":' + json_doc + ',"doc_as_upsert":true}'
    except TypeError as err:
        log.error(err)
    return doc

def generate_index(file_id=None, release_ver=None):
    mysql_user = os.environ.get('MYSQL_USER')
    mysql_pwd = os.environ.get('MYSQL_ROOT_PASSWORD')

    mydb = mysql.connector.connect(
        host="mariadb",
        user=mysql_user,
        password=mysql_pwd,
        database="knowledge_environment",
        converter_class=MyConverter
    )
    try:
        mycursor = mydb.cursor(buffered=True, dictionary=True)

        # NOTE: the optional filters are concatenated into the SQL; parameterized
        # queries would be safer if these values ever come from untrusted input.
        where_clause = ""
        if file_id is not None:
            where_clause = " WHERE f.file_id = '" + str(file_id) + "' "
        elif release_ver is not None:
            where_clause = " WHERE f.release_ver = " + str(release_ver) + " "

        query = ("SELECT f.*, p.*, m.* FROM file f "
                 "JOIN file_participant fp on f.file_id = fp.file_id "
                 "JOIN participant p on fp.participant_id = p.participant_id "
                 "JOIN metadata_type m on f.metadata_type_id = m.metadata_type_id " + where_clause +
                 "order by f.file_id")

        mycursor.execute(query)
        documents = {}
        if not mycursor.rowcount:
            log.error("query returned 0 results. ES index will not be updated")

        update_statement = ''
        for row in mycursor:
            if row["file_id"] in documents:
                index_doc = documents[row["file_id"]]
                # Not adding a new tissue source because we should only have one tissue source per file
                index_doc.cases.samples["participant_id"].append(row['participant_id'])
                index_doc.cases.samples["sample_type"].append(row['sample_type'])
                index_doc.cases.samples["tissue_type"].append(row['tissue_type'])
                index_doc.cases.demographics["age"].append(row['age_binned'])
                index_doc.cases.demographics["sex"].append(row['sex'])
            else:
                cases_doc = FileCasesIndexDoc([row['tissue_source']], {"participant_id": [row['participant_id']], "tissue_type": [row['tissue_type']], "sample_type": [row['sample_type']]}, {"sex": [row['sex']], "age": [row['age_binned']]})
                index_doc = IndexDoc(row["access"], row["platform"], row["experimental_strategy"], row["data_category"], row["workflow_type"], row["data_format"], row["data_type"], row["file_id"], row["file_name"], row["file_size"], row["protocol"], row["package_id"], cases_doc)
                documents[row["file_id"]] = index_doc

        # One update action line plus one doc_as_upsert line per file.
        for id in documents:
            update_statement = update_statement + get_index_update_json(id) + "\n" + get_index_doc_json(documents[id]) + "\n"

        if not documents:
            log.error("Unable to process results")
            return None
        return update_statement
    finally:
        mycursor.close()
        mydb.close()

class MyConverter(mysql.connector.conversion.MySQLConverter):

    def row_to_python(self, row, fields):
        row = super(MyConverter, self).row_to_python(row, fields)

        def to_unicode(col):
            if isinstance(col, bytearray):
                return col.decode('utf-8')
            return col

        return [to_unicode(col) for col in row]
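
For reference, generate_index() assembles a newline-delimited bulk body: one update action line per file followed by a doc_as_upsert document line. A minimal sketch that reproduces that shape using the helpers above (every field value here is invented for illustration, not taken from the database):

from index_creator.core.index_creation import get_index_update_json, get_index_doc_json
from index_creator.models.FileCasesIndexDoc import FileCasesIndexDoc
from index_creator.models.IndexDoc import IndexDoc

# Invented example values; the real ones come from the MariaDB query above.
cases = FileCasesIndexDoc(["Biopsy"],
                          {"participant_id": ["P-1"], "tissue_type": ["Normal"], "sample_type": ["Tissue"]},
                          {"sex": ["Female"], "age": ["50-59"]})
doc = IndexDoc("open", "10x", "scRNA-seq", "Expression", "cellranger", "tsv",
               "Gene Expression Matrix", "file-1", "matrix.tsv", 1024, "proto-1", "pkg-1", cases)

# Two lines per document, exactly what update_file_cases() sends to the bulk API.
print(get_index_update_json("file-1"))
print(get_index_doc_json(doc))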
14 changes: 14 additions & 0 deletions index_creator/core/update_es.py
@@ -0,0 +1,14 @@
from elasticsearch import Elasticsearch, ConnectionError, ElasticsearchException
import logging

log = logging.getLogger('index-creation-worker.update_file_cases')

def update_file_cases(update_statement):
    try:
        elastic = Elasticsearch(['elasticsearch'])
        elastic.bulk(update_statement)
    except ConnectionError as connectionError:
        log.error("error communicating with elasticsearch: " + str(connectionError))
    except ElasticsearchException as e:
        log.error("other error with elasticsearch: " + str(e))
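
update_file_cases hands the newline-delimited statement to the Elasticsearch bulk API on a node resolved by the hostname elasticsearch. A minimal sketch of spot-checking the file_cases index afterwards (the index name comes from get_index_update_json above; the query itself is illustrative and not part of this commit):

from elasticsearch import Elasticsearch

elastic = Elasticsearch(['elasticsearch'])  # same hostname update_es.py uses
result = elastic.search(index="file_cases", body={"query": {"match_all": {}}, "size": 1})
print(result["hits"]["total"], [hit["_id"] for hit in result["hits"]["hits"]])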

5 changes: 5 additions & 0 deletions index_creator/models/FileCasesIndexDoc.py
@@ -0,0 +1,5 @@
class FileCasesIndexDoc:
    def __init__(self, tissue_source, samples, demographics):
        self.tissue_source = tissue_source
        self.samples = samples
        self.demographics = demographics
15 changes: 15 additions & 0 deletions index_creator/models/IndexDoc.py
@@ -0,0 +1,15 @@
class IndexDoc:
    def __init__(self, access, platform, experimental_strategy, data_category, workflow_type, data_format, data_type, file_id, file_name, file_size, protocol, package_id, cases):
        self.access = access
        self.platform = platform
        self.experimental_strategy = experimental_strategy
        self.data_category = data_category
        self.workflow_type = workflow_type
        self.file_id = file_id
        self.file_name = file_name
        self.data_format = data_format
        self.file_size = file_size
        self.data_type = data_type
        self.protocol = protocol
        self.package_id = package_id
        self.cases = cases
Empty file added index_creator/models/__init__.py
Empty file.
5 changes: 5 additions & 0 deletions requirements.txt
@@ -0,0 +1,5 @@
Flask==1.1.2
elasticsearch==7.9.1
mysql_connector_repackaged==0.3.1
mysql-connector==2.2.9
python-dotenv==0.15.0
