Analytics on a single machine using Docker
The repository includes a "Single Machine" Docker Compose configuration which brings up the FHIR Pipelines Controller plus a Spark Thrift server, letting you more easily run Spark SQL queries on the Parquet files output by the Pipelines Controller.
To learn how the Pipelines Controller works on its own, see "Try out the FHIR Pipelines Controller".
- A source HAPI FHIR server configured to use Postgres as its database
  - If you don't have a server, use a local test server by following the instructions to bring up a source HAPI FHIR server with Postgres
- Docker
  - If you are using Linux, Docker must be in sudoless mode
- Docker Compose
- The FHIR Data Pipes repository, cloned onto the host machine
Note: All file paths are relative to the root of the FHIR Data Pipes repository.
NOTE: You need to configure only one of the following options:
- For FHIR Search API (works for any FHIR server):
  - Open docker/config/application.yaml and edit the value of fhirServerUrl to match the FHIR server you are connecting to.
  - Comment out the dbConfig setting in this case.
- For direct DB access (specific to HAPI FHIR servers):
  - Comment out fhirServerUrl.
  - Set dbConfig to the DB connection config file, e.g., docker/config/hapi-postgres-config_local.json.
  - Edit the values in this file to match the database for the FHIR server you are connecting to.
With the default config, you will create both Parquet files (under dwhRootPrefix) and flattened views in the database configured by sinkDbConfigPath.
- If you don't need flattened views you can comment out that setting.
- If you do need them, make sure you create the DB referenced in the connection config file, e.g., with the following SQL query:
CREATE DATABASE views;
which you can run in Postgres like this:
PGPASSWORD=admin psql -h 127.0.0.1 -p 5432 -U admin postgres -c "CREATE DATABASE views"
For documentation of all config parameters, see here.
If you are using the local test servers, things should work with the default values. If not, use the IP address of the Docker default bridge network. To find it, run the following command and use the "Gateway" value:
docker network inspect bridge | grep Gateway
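If you would rather not rely on grep, the same value can be read from the JSON that docker network inspect prints. The following is a minimal sketch, not part of the repository: the function name bridge_gateways and the sample values are hypothetical, and it assumes the output has the usual IPAM.Config list with a Gateway field.

```python
import json


def bridge_gateways(inspect_output: str) -> list[str]:
    """Return every Gateway address found in `docker network inspect` JSON output."""
    gateways = []
    for network in json.loads(inspect_output):
        # IPAM or IPAM.Config can be missing or null, so fall back to empty values.
        for entry in (network.get("IPAM") or {}).get("Config") or []:
            gateway = entry.get("Gateway")
            if gateway:
                gateways.append(gateway)
    return gateways


# Illustrative input shaped like the output of `docker network inspect bridge`.
SAMPLE = """
[{"Name": "bridge",
  "IPAM": {"Driver": "default",
           "Config": [{"Subnet": "172.17.0.0/16", "Gateway": "172.17.0.1"}]}}]
"""

if __name__ == "__main__":
    print(bridge_gateways(SAMPLE))
```

On a real host, the input could come from subprocess.run(["docker", "network", "inspect", "bridge"], capture_output=True, text=True).stdout.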
The Single Machine Docker configuration uses two environment variables, DWH_ROOT and PIPELINE_CONFIG, whose default values are defined in the .env file. To override them, set the variable before running the docker-compose command. For example, to override the DWH_ROOT environment variable, run the following:
DWH_ROOT="$(pwd)/my-amazing-data" docker-compose -f docker/compose-controller-spark-sql-single.yaml up --force-recreate
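The override works because Docker Compose gives variables set in the shell precedence over the defaults in the .env file. The sketch below only illustrates that precedence rule; it is not how Compose is implemented, the helper names are hypothetical, and the default values in SAMPLE_ENV are placeholders rather than the repository's actual .env contents.

```python
def parse_env_file(text: str) -> dict[str, str]:
    """Parse simple KEY=VALUE lines, skipping blank lines and # comments."""
    values = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def effective_settings(env_file_text: str, shell_env: dict[str, str]) -> dict[str, str]:
    """Start from the .env defaults, then let shell variables win."""
    settings = parse_env_file(env_file_text)
    for key in settings:
        if key in shell_env:
            settings[key] = shell_env[key]
    return settings


# Placeholder defaults; the real .env file may use different values.
SAMPLE_ENV = """
# defaults
DWH_ROOT=./dwh
PIPELINE_CONFIG=./config
"""

if __name__ == "__main__":
    print(effective_settings(SAMPLE_ENV, {"DWH_ROOT": "/tmp/my-amazing-data"}))
```

Passing os.environ as shell_env would show which values a given shell session ends up with.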
To bring up the docker/compose-controller-spark-sql-single.yaml configuration for the first time, or if you have run this container in the past and want to include new changes pulled into the repo, run:
docker-compose -f docker/compose-controller-spark-sql-single.yaml up --force-recreate --build
Alternatively, to run without rebuilding, use:
docker-compose -f docker/compose-controller-spark-sql-single.yaml up --force-recreate
Alternatively, docker/compose-controller-spark-sql.yaml serves as a very simple example of how to integrate the Parquet output of Pipelines in a Spark cluster environment.
Once started, the Pipelines Controller is available at http://localhost:8090 and the Spark Thrift server is at http://localhost:10001.
The first time you run the Pipelines Controller, you must manually start a Full Pipeline run. In a browser, go to http://localhost:8090 and click the Run Full button.
After running the Full Pipeline, use the Incremental Pipeline to update the Parquet files and tables. By default it is scheduled to run every hour, or you can manually trigger it.
If the Incremental Pipeline does not work, or you see errors like:
ERROR o.openmrs.analytics.PipelineManager o.openmrs.analytics.PipelineManager$PipelineThread.run:343 - exception while running pipeline:
pipeline-controller | java.sql.SQLException: org.apache.hive.service.cli.HiveSQLException: Error running query: org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.
try running sudo chmod -R 755 on the Parquet file directory, by default located at docker/dwh.
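Before changing permissions, it can help to see which paths are affected. The following is a rough diagnostic sketch, assuming the problem is that users other than the owner cannot read the Parquet files or enter their directories (which is what chmod -R 755 corrects). The function name is hypothetical, and the script is not part of the repository.

```python
import os
import stat


def paths_missing_read_access(root: str) -> list[str]:
    """List paths under root that 'other' users cannot read (or, for directories, enter)."""
    blocked = []
    dir_bits = stat.S_IROTH | stat.S_IXOTH  # directories need both r and x for others
    for dirpath, _dirnames, filenames in os.walk(root):
        if (os.stat(dirpath).st_mode & dir_bits) != dir_bits:
            blocked.append(dirpath)
        for filename in filenames:
            path = os.path.join(dirpath, filename)
            if not os.stat(path).st_mode & stat.S_IROTH:
                blocked.append(path)
    return blocked


if __name__ == "__main__":
    # docker/dwh is the default Parquet location mentioned above.
    for path in paths_missing_read_access("docker/dwh"):
        print(path)
```

An empty result suggests that permissions are not the cause and the error has another source, such as an empty output directory.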
Connect to the Spark Thrift server using a client that supports Apache Hive. For example, if using the JDBC driver, the URL should be jdbc:hive2://localhost:10001. The pipeline will automatically create Patient, Encounter, and Observation tables when run.
Let's do some basic quality checks to make sure the data is uploaded properly (note that table names are case-insensitive):
SELECT COUNT(0) FROM Patient;
We should have exactly 79 patients:
+-----------+
| count(0) |
+-----------+
| 79 |
+-----------+
Doing the same for observations:
SELECT COUNT(0) FROM Observation;
+-----------+
| count(0) |
+-----------+
| 17279 |
+-----------+
One major challenge when querying exported data is that FHIR resources have many nested fields. One approach is to use LATERAL VIEW with EXPLODE to flatten repeated fields and then filter for specific values of interest.
The following queries explore the sample data loaded when using a local test server. They leverage LATERAL VIEW with EXPLODE to flatten the Observation.code.coding repeated field and filter for specific observation codes.
Note that the synthetic sample data simulates HIV patients. Observations for HIV viral load use the following code, which is not the actual LOINC code:
[
{
"id":null,
"coding":[
{
"id":null,
"system":"http://loinc.org",
"version":null,
"code":"856AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
"display":"HIV viral load",
"userSelected":null
}
],
"text":"HIV viral load"
}
]
Let's say we are interested only in certain observation codes. When working with repeated fields like Observation.code.coding, it is sometimes easier to first "flatten" the table on that field. Conceptually, this means that an Observation row with, say, 4 codes will be repeated 4 times, where each row has exactly one of those 4 values. Here is a query that does that, selecting only rows with "viral load" observations (code 856):
SELECT O.id AS obs_id, OCC.`system`, OCC.code, O.status AS status, O.value.quantity.value AS value
FROM Observation AS O LATERAL VIEW explode(code.coding) AS OCC
WHERE OCC.`system` = 'http://loinc.org'
AND OCC.code LIKE '856A%'
LIMIT 4;
Sample output:
+---------+-------------------+---------------------------------------+---------+-----------+
| obs_id | system | code | status | value |
+---------+-------------------+---------------------------------------+---------+-----------+
| 10393 | http://loinc.org | 856AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA | final | 224650.0 |
| 12446 | http://loinc.org | 856AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA | final | 823010.0 |
| 14456 | http://loinc.org | 856AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA | final | 166100.0 |
| 15991 | http://loinc.org | 856AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA | final | 64630.0 |
+---------+-------------------+---------------------------------------+---------+-----------+
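To make the flattening semantics concrete, here is a small pure-Python sketch of what LATERAL VIEW explode(code.coding) does conceptually: one output row per (observation, coding) pair. The records are made-up stand-ins for Observation rows, not the sample data set, and explode_codings is a hypothetical helper.

```python
def explode_codings(observations):
    """Yield one flat row per (observation, coding) pair."""
    for obs in observations:
        # An observation with an empty coding list yields no rows,
        # matching LATERAL VIEW explode without the OUTER keyword.
        for coding in obs["code"]["coding"]:
            yield {
                "obs_id": obs["id"],
                "system": coding["system"],
                "code": coding["code"],
                "status": obs["status"],
            }


# Made-up records: obs-1 carries two codings, obs-2 carries one.
observations = [
    {"id": "obs-1", "status": "final", "code": {"coding": [
        {"system": "http://loinc.org", "code": "856AAAA"},
        {"system": "http://example.org/local", "code": "VL"},
    ]}},
    {"id": "obs-2", "status": "final", "code": {"coding": [
        {"system": "http://loinc.org", "code": "1255AAAA"},
    ]}},
]

flat = list(explode_codings(observations))

# Equivalent of: WHERE system = 'http://loinc.org' AND code LIKE '856A%'
viral_load = [
    row for row in flat
    if row["system"] == "http://loinc.org" and row["code"].startswith("856A")
]

if __name__ == "__main__":
    print(len(flat), viral_load)
```

Two observations become three flat rows, and the filter then keeps only the row whose coding matches, which mirrors the WHERE clause in the query above.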
Now, let's say we are interested only in cases with high viral load, and for each patient we need some demographic information too. We can reuse the flattening query above and join it with the Patient resource table:
SELECT P.id AS pid, P.name.family AS family, P.gender AS gender, O.id AS obs_id, OCC.`system`,
OCC.code, O.status AS status, O.value.quantity.value AS value
FROM Patient AS P, Observation AS O LATERAL VIEW explode(code.coding) AS OCC
WHERE P.id = O.subject.PatientId
AND OCC.`system` = 'http://loinc.org'
AND OCC.code LIKE '856A%'
AND O.value.quantity.value > 10000
LIMIT 4;
Sample output:
+--------+-----------------------------------+---------+---------+-------------------+---------------------------------------+---------+-----------+
| pid | family | gender | obs_id | system | code | status | value |
+--------+-----------------------------------+---------+---------+-------------------+---------------------------------------+---------+-----------+
| 10091 | ["Fritsch593"] | male | 10393 | http://loinc.org | 856AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA | final | 224650.0 |
| 11689 | ["Dickinson688"] | male | 12446 | http://loinc.org | 856AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA | final | 823010.0 |
| 13230 | ["Jerde200","Ruecker817"] | female | 14456 | http://loinc.org | 856AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA | final | 166100.0 |
| 15315 | ["Pfeffer420","Pfannerstill264"] | female | 15991 | http://loinc.org | 856AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA | final | 64630.0 |
+--------+-----------------------------------+---------+---------+-------------------+---------------------------------------+---------+-----------+
SELECT COUNT(0)
FROM (
SELECT P.id AS pid, P.name.family AS family, P.gender AS gender, O.id AS obs_id, OCC.`system`,
OCC.code, O.status AS status, O.value.quantity.value
FROM Patient AS P, Observation AS O LATERAL VIEW explode(code.coding) AS OCC
WHERE P.id = O.subject.PatientId
AND OCC.`system` = 'http://loinc.org'
AND OCC.code LIKE '856A%'
);
Sample output:
+-----------+
| count(0) |
+-----------+
| 265 |
+-----------+
Once you have a query that filters to the data you're interested in, create a view with a simpler schema to work with in the future. This is a good way to create building blocks to combine with other data and create more complex queries.
SELECT
O.id AS obs_id, OCC.`system`, OCC.code, O.status AS status,
OVCC.code AS value_code, O.subject.PatientId AS patient_id
FROM Observation AS O LATERAL VIEW explode(code.coding) AS OCC
LATERAL VIEW explode(O.value.codeableConcept.coding) AS OVCC
WHERE OCC.code LIKE '1255%'
AND OVCC.code LIKE '1256%'
AND YEAR(O.effective.dateTime) = 2010
LIMIT 1;
Sample output:
+---------+-------------------+---------------------------------------+---------+---------------------------------------+-------------+
| obs_id | system | code | status | value_code | patient_id |
+---------+-------------------+---------------------------------------+---------+---------------------------------------+-------------+
| 33553 | http://loinc.org | 1255AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA | final | 1256AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA | 32003 |
+---------+-------------------+---------------------------------------+---------+---------------------------------------+-------------+
CREATE VIEW obs_arv_plan AS
SELECT
O.id AS obs_id, OCC.`system`, OCC.code, O.status AS status,
OVCC.code AS value_code, O.subject.PatientId AS patient_id
FROM Observation AS O LATERAL VIEW explode(code.coding) AS OCC
LATERAL VIEW explode(O.value.codeableConcept.coding) AS OVCC
WHERE OCC.code LIKE '1255%'
AND OVCC.code LIKE '1256%'
AND YEAR(O.effective.dateTime) = 2010;
SELECT COUNT(0) FROM obs_arv_plan;
Sample output:
+-----------+
| count(0) |
+-----------+
| 2 |
+-----------+
SELECT P.id AS pid, P.name.family AS family, P.gender AS gender, COUNT(0) AS num_start
FROM Patient P, obs_arv_plan
WHERE P.id = obs_arv_plan.patient_id
GROUP BY P.id, P.name.family, P.gender
ORDER BY num_start DESC
LIMIT 10;
Sample output:
+--------+-------------------+---------+------------+
| pid | family | gender | num_start |
+--------+-------------------+---------+------------+
| 20375 | ["VonRueden376"] | male | 1 |
| 32003 | ["Terry864"] | male | 1 |
+--------+-------------------+---------+------------+
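The view-as-building-block pattern can be tried without a running Spark Thrift server. The sketch below uses Python's built-in sqlite3 module as a stand-in: SQLite is not Spark SQL and has no nested fields, so the table observation_flat is assumed to be already flattened, and all table names, column names, and rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE patient (id TEXT PRIMARY KEY, gender TEXT);
CREATE TABLE observation_flat (
  obs_id TEXT, code TEXT, value_code TEXT, patient_id TEXT, year INTEGER
);
INSERT INTO patient VALUES ('p1', 'male'), ('p2', 'female');
INSERT INTO observation_flat VALUES
  ('o1', '1255A', '1256A', 'p1', 2010),
  ('o2', '1255A', '1256A', 'p1', 2010),
  ('o3', '1255A', '1256A', 'p2', 2011),
  ('o4', '856A',  NULL,    'p2', 2010);

-- The view captures the filtering logic once, under a simple schema.
CREATE VIEW obs_plan_2010 AS
  SELECT obs_id, value_code, patient_id
  FROM observation_flat
  WHERE code LIKE '1255%' AND value_code LIKE '1256%' AND year = 2010;
""")

# Later queries join against the view instead of repeating the filters.
rows = conn.execute("""
SELECT P.id, P.gender, COUNT(*) AS num_start
FROM patient AS P JOIN obs_plan_2010 AS V ON P.id = V.patient_id
GROUP BY P.id, P.gender
ORDER BY num_start DESC
""").fetchall()

if __name__ == "__main__":
    print(rows)
```

Only the two 2010 observations with matching codes pass through the view, so the aggregate sees a single patient. Keeping the filters in the view means a change to the code patterns or the year is made in one place.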