# schemas-1-evaluatefhirpath.py
import csv
import fhirpathpy
import json
import os
import pathlib
import pyarrow as pa
import pyarrow.parquet as pq
import random
import sys # DEMO
import yaml
from functools import cache
from pathlib import Path
from phdi.azure import AzureFhirServerCredentialManager
from phdi.fhir import fhir_server_get
from typing import Literal, List, Union, Callable
def load_schema(path: str) -> dict:
"""
    Given the path to a local YAML file containing a user-defined schema, read the
    file and return the schema as a dictionary.
    :param path: Path specifying the location of a YAML file containing a schema.
    :return: The schema as a dictionary, or an empty dictionary if the file is not
        found.
"""
try:
with open(path, "r") as file:
schema = yaml.safe_load(file)
return schema
except FileNotFoundError:
return {}
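# Example (illustrative) of the YAML layout the rest of this module assumes; the
# table, resource, and field names below are hypothetical, but the three keys per
# field (fhir_path, new_name, selection_criteria) are the ones this module reads:
#
#   my_table:
#     Patient:
#       patient_id:
#         fhir_path: "Patient.id"
#         new_name: "patient_id"
#         selection_criteria: "first"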
def apply_selection_criteria(
value: list,
selection_criteria: Literal["first", "last", "random", "all"],
) -> Union[str, List[str]]:
"""
    Given a list of values parsed from a FHIR resource, return value(s) according to
    the selection criteria. In general a single value is returned, but when
    selection_criteria is set to "all" every parsed value is returned (currently
    joined into one comma-separated string; see the pyarrow workaround below).
:param value: A list containing the values parsed from a FHIR resource.
:param selection_criteria: A string indicating which element(s) of a list to select.
"""
if selection_criteria == "first":
value = value[0]
elif selection_criteria == "last":
value = value[-1]
elif selection_criteria == "random":
value = random.choice(value)
# Temporary hack to ensure no structured data is written using pyarrow.
# Currently Pyarrow does not support mixing non-structured and structured data.
# https://github.com/awslabs/aws-data-wrangler/issues/463
# Will need to consider other methods of writing to parquet if this is an essential
# feature.
    if isinstance(value, dict):
        value = json.dumps(value)
    elif isinstance(value, list):
        value = ",".join(value)
return value
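# Example (illustrative) of the selection behavior, including the temporary
# list-to-string workaround above:
#   apply_selection_criteria(["a", "b", "c"], "first")  # -> "a"
#   apply_selection_criteria(["a", "b", "c"], "all")    # -> "a,b,c"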
@cache
def __get_fhirpathpy_parser(fhirpath_expression: str) -> Callable:
"""
Return a fhirpathpy parser for a specific FHIRPath. This cached function minimizes
calls to the relatively expensive :func:`fhirpathpy.compile` function for any given
    `fhirpath_expression`.
    :param fhirpath_expression: The FHIRPath expression to evaluate.
"""
return fhirpathpy.compile(fhirpath_expression)
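# Example (illustrative): repeated calls with the same expression reuse the
# compiled parser via functools.cache instead of recompiling it each time;
# `resource` below is a hypothetical FHIR resource dictionary:
#   parser = __get_fhirpathpy_parser("Patient.name.given")
#   given_names = parser(resource)  # returns a list of matching values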
def apply_schema_to_resource(resource: dict, schema: dict) -> dict:
"""
Given a resource and a schema, return a dictionary with values of the data
specified by the schema and associated keys defined by the variable name provided
by the schema.
:param resource: A FHIR resource on which to apply a schema.
:param schema: A schema specifying the desired values by FHIR resource type.
"""
data = {}
resource_schema = schema.get(resource.get("resourceType", ""))
if resource_schema is None:
return data
for field in resource_schema.keys():
path = resource_schema[field]["fhir_path"]
        # This script evaluates the FHIRPath expression directly; the cached,
        # precompiled parser is left here, commented out, for comparison:
        # parse_function = __get_fhirpathpy_parser(path)
        # value = parse_function(resource)
        value = fhirpathpy.evaluate(resource, path)
if len(value) == 0:
data[resource_schema[field]["new_name"]] = ""
else:
selection_criteria = resource_schema[field]["selection_criteria"]
value = apply_selection_criteria(value, selection_criteria)
data[resource_schema[field]["new_name"]] = str(value)
return data
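# Example (illustrative) pairing a minimal resource with a one-field schema; the
# resource and schema below are hypothetical:
#
#   resource = {"resourceType": "Patient", "id": "some-id"}
#   schema = {
#       "Patient": {
#           "patient_id": {
#               "fhir_path": "Patient.id",
#               "new_name": "patient_id",
#               "selection_criteria": "first",
#           }
#       }
#   }
#   apply_schema_to_resource(resource, schema)  # -> {"patient_id": "some-id"}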
def make_table(
schema: dict,
output_path: pathlib.Path,
    output_format: Literal["parquet", "csv"],
fhir_url: str,
cred_manager: AzureFhirServerCredentialManager,
):
"""
Given the schema for a single table, make the table.
:param schema: A schema specifying the desired values by FHIR resource type.
:param output_path: A path specifying where the table should be written.
:param output_format: A string indicating the file format to be used.
:param fhir_url: URL to a FHIR server.
    :param cred_manager: Credential manager used to obtain an access token for
        requests to the FHIR server.
"""
output_path.mkdir(parents=True, exist_ok=True)
for resource_type in schema:
output_file_name = output_path / f"{resource_type}.{output_format}"
# TODO: make _count (and other query parameters) configurable
query = f"/{resource_type}?_count=1000"
url = fhir_url + query
demo_counter = 0 # DEMO
writer = None
next_page = True
while next_page and demo_counter < 5: # DEMO
demo_counter += 1 # DEMO
response = fhir_server_get(url, cred_manager)
if response.status_code != 200:
break
# Load queried data.
query_result = json.loads(response.content)
data = []
# Extract values specified by schema from each resource.
# values_from_resource is a dictionary of the form:
# {field1:value1, field2:value2, ...}.
            for resource in query_result.get("entry", []):
values_from_resource = apply_schema_to_resource(
resource["resource"], schema
)
if values_from_resource != {}:
data.append(values_from_resource)
# Write data to file.
writer = write_schema_table(data, output_file_name, output_format, writer)
# Check for an additional page of query results.
            for link in query_result.get("link", []):
if link.get("relation") == "next":
url = link.get("url")
break
else:
next_page = False
if writer is not None:
writer.close()
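# Example (illustrative) of the Bundle "link" entries consulted in the pagination
# loop above; a "next" relation signals another page of results, and the URLs are
# placeholders:
#   "link": [
#       {"relation": "self", "url": "https://fhir-server/Patient?_count=1000"},
#       {"relation": "next", "url": "https://fhir-server/Patient?page=2"}
#   ]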
def make_schema_tables(
schema_path: pathlib.Path,
base_output_path: pathlib.Path,
    output_format: Literal["parquet", "csv"],
fhir_url: str,
cred_manager: AzureFhirServerCredentialManager,
):
"""
    Given the URL for a FHIR server, the location of a schema file, and an output
    directory, generate the tables specified by the schema and store them in the
    desired location.
:param schema_path: A path to the location of a YAML schema config file.
:param base_output_path: A path to the directory where tables of the schema should
be written.
:param output_format: Specifies the file format of the tables to be generated.
:param fhir_url: URL to a FHIR server.
    :param cred_manager: Credential manager used to obtain an access token for
        requests to the FHIR server.
"""
schema = load_schema(schema_path)
for table in schema.keys():
output_path = base_output_path / table
make_table(schema[table], output_path, output_format, fhir_url, cred_manager)
def write_schema_table(
data: List[dict],
output_file_name: pathlib.Path,
    file_format: Literal["parquet", "csv"],
writer: pq.ParquetWriter = None,
):
"""
Write data extracted from the FHIR Server to a file.
:param data: A list of dictionaries specifying the data for each row of a table
where the keys of each dict correspond to the columns, and the values contain the
data for each entry in a row.
:param output_file_name: Full name for the file where the table is to be written.
    :param file_format: Specifies the file format of the table to be written.
:param writer: A writer object that can be kept open between calls of this function
to support file formats that cannot be appended to after being written
(e.g. parquet).
"""
    # Nothing to write for this batch; hand any open writer back unchanged.
    if not data:
        return writer
    if file_format == "parquet":
table = pa.Table.from_pylist(data)
if writer is None:
writer = pq.ParquetWriter(output_file_name, table.schema)
writer.write_table(table=table)
return writer
if file_format == "csv":
keys = data[0].keys()
        new_file = not os.path.isfile(output_file_name)
with open(output_file_name, "a", newline="") as output_file:
dict_writer = csv.DictWriter(output_file, keys)
if new_file:
dict_writer.writeheader()
dict_writer.writerows(data)
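# Example (illustrative) of the writer handoff across pages of query results, as
# done in make_table; the same ParquetWriter must stay open between calls because
# a parquet file cannot be appended to after it is closed:
#   writer = write_schema_table(page_1_rows, path, "parquet")          # opens file
#   writer = write_schema_table(page_2_rows, path, "parquet", writer)  # appends
#   writer.close()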
def print_schema_summary(
schema_directory: pathlib.Path,
display_head: bool = False,
):
"""
Given a directory containing tables of the specified file format, print a summary of
each table.
:param schema_directory: Path specifying location of schema tables.
    :param display_head: Print the head of each table when true. Note that,
        depending on the file format, this may require reading large amounts of
        data into memory.
"""
for (directory_path, _, file_names) in os.walk(schema_directory):
for file_name in file_names:
if file_name.endswith("parquet"):
# Read metadata from parquet file without loading the actual data.
parquet_file = pq.ParquetFile(Path(directory_path) / file_name)
print(parquet_file.metadata)
# Read data from parquet and convert to pandas data frame.
                if display_head:
parquet_table = pq.read_table(Path(directory_path) / file_name)
df = parquet_table.to_pandas()
print(df.head())
print(df.info())
            if file_name.endswith("csv"):
                with open(Path(directory_path) / file_name, "r") as csv_file:
                    reader = csv.reader(csv_file, dialect="excel")
                    print(next(reader))
# demo
if __name__ == "__main__":
schema_path = Path(sys.argv[1])
output_path = Path(sys.argv[2])
output_format = sys.argv[3]
fhir_url = sys.argv[4]
cred_manager = AzureFhirServerCredentialManager(fhir_url=fhir_url)
make_schema_tables(schema_path, output_path, output_format, fhir_url, cred_manager)
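# Example invocation (illustrative; the schema path, output directory, and FHIR
# server URL below are placeholders):
#   python schemas-1-evaluatefhirpath.py schema.yaml ./output parquet \
#       https://example-fhir-server.azurehealthcareapis.com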