Skip to content

Commit

Permalink
Code linting
Browse files Browse the repository at this point in the history
  • Loading branch information
Vicbi committed Mar 24, 2024
1 parent 2c8c5e3 commit c48cf7b
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 95 deletions.
17 changes: 8 additions & 9 deletions data_access/firebase_FHIR_data_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,21 @@
# Standard library imports
import json
import os
from typing import Any, Dict, List, Tuple, Optional
from typing import Tuple, Optional

# Related third-party imports
import pandas as pd
from firebase_admin import credentials, firestore
import firebase_admin
from google.cloud.firestore_v1.base_query import FieldFilter, Or
from google.cloud.firestore_v1.base_query import FieldFilter

# Local application/library specific imports
from fhir.resources.observation import Observation


class EnhancedObservation:
def __init__(self, observation: Observation, UserId: str = None):
def __init__(self, observation: Observation, user_id: str = None):
self.observation = observation
self.UserId = UserId
self.user_id = user_id


class FirebaseFHIRAccess:
Expand Down Expand Up @@ -53,8 +52,8 @@ def fetch_data(
self,
collection_name: str = "users",
subcollection_name: str = "HealthKit",
loinc_codes: Optional[List[str]] = None,
) -> List[EnhancedObservation]:
loinc_codes: Optional[list[str]] = None,
) -> list[EnhancedObservation]:
resources = []
users = self.db.collection(collection_name).stream()

Expand Down Expand Up @@ -84,7 +83,7 @@ def fetch_data(
observation_str = json.dumps(doc.to_dict())
fhir_obj = Observation.parse_raw(observation_str)
enhanced_fhir_obj = EnhancedObservation(
observation=fhir_obj, UserId=user.id
observation=fhir_obj, user_id=user.id
)
resources.append(enhanced_fhir_obj)
else:
Expand All @@ -93,7 +92,7 @@ def fetch_data(
observation_str = json.dumps(doc.to_dict())
fhir_obj = Observation.parse_raw(observation_str)
enhanced_fhir_obj = EnhancedObservation(
observation=fhir_obj, UserId=user.id
observation=fhir_obj, user_id=user.id
)
resources.append(enhanced_fhir_obj)

Expand Down
96 changes: 42 additions & 54 deletions data_analysis/data_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
# SPDX-License-Identifier: MIT
#

# Standard library imports
from datetime import datetime

# Related third-party imports
from typing import Any
import pandas as pd
import numpy as np

Expand All @@ -35,11 +33,9 @@ def __init__(self):
"9052-2": (0, 2700), # Dietary energy consumed in calories
}

def process_FHIR_data(
self, flattened_FHIRDataFrame: FHIRDataFrame
) -> FHIRDataFrame:
self.validate_columns(flattened_FHIRDataFrame)
flattened_fhir_df = flattened_FHIRDataFrame.df
def process_fhir_data(self, flattened_fhir_df: FHIRDataFrame) -> FHIRDataFrame:
self.validate_columns(flattened_fhir_df)
flattened_fhir_df = flattened_fhir_df.df

# Normalize 'EffectiveDateTime' to date only
flattened_fhir_df["EffectiveDateTime"] = pd.to_datetime(
Expand All @@ -49,22 +45,22 @@ def process_FHIR_data(
processed_fhir_df = pd.DataFrame()

for (
userId,
effectiveDateTime,
loincCode,
user_id,
effective_dat_time,
loinc_code,
), group_df in flattened_fhir_df.groupby(
["UserId", "EffectiveDateTime", "LoincCode"]
):
# Filter outliers for the group based on LOINC code-specific ranges
group_FHIRDataFrame = FHIRDataFrame(
group_df, flattened_FHIRDataFrame.resource_type
group_df, flattened_fhir_df.resource_type
)
filtered_group_FHIRDataFrame = self.filter_outliers(
group_FHIRDataFrame, self.default_value_ranges.get(loincCode)
group_FHIRDataFrame, self.default_value_ranges.get(loinc_code)
)

# Determine the processing function based on the LOINC code
process_function = self.code_to_function.get(loincCode)
process_function = self.code_to_function.get(loinc_code)
if process_function and filtered_group_FHIRDataFrame.df is not None:
# Apply the processing function to the filtered group
processed_group_FHIRDataFrame = process_function(
Expand All @@ -76,7 +72,7 @@ def process_FHIR_data(
)

processed_FHIRDataFrame = FHIRDataFrame(
processed_fhir_df, flattened_FHIRDataFrame.resource_type
processed_fhir_df, flattened_fhir_df.resource_type
)

return processed_FHIRDataFrame
Expand Down Expand Up @@ -134,19 +130,17 @@ def _finalize_group(
lambda x: f"{prefix} {x}"
)

# Optionally, update 'Display' or other fields as needed
# final_df['Display'] = final_df['Display'].apply(lambda x: f"Updated display info based on {prefix}")

return final_df

def filter_outliers(
self, flattened_FHIRDataFrame: FHIRDataFrame, value_range=None
self: Self @ FHIRDataProcessor,
flattened_fhir_df: FHIRDataFrame,
value_range: Any | None = None,
) -> FHIRDataFrame:
self.validate_columns(flattened_FHIRDataFrame)
self.validate_columns(flattened_fhir_df)

"""Filters outliers from the FHIRDataFrame."""
if value_range is None:
loinc_code = flattened_FHIRDataFrame.df["LoincCode"].iloc[
loinc_code = flattened_fhir_df.df["LoincCode"].iloc[
0
] # Assumes uniform LoincCode within the DataFrame
value_range = self.default_value_ranges.get(loinc_code)
Expand All @@ -156,15 +150,15 @@ def filter_outliers(
)

lower_bound, upper_bound = value_range
filtered_df = flattened_FHIRDataFrame.df[
(flattened_FHIRDataFrame.df["QuantityValue"] >= lower_bound)
& (flattened_FHIRDataFrame.df["QuantityValue"] <= upper_bound)
filtered_df = flattened_fhir_df.df[
(flattened_fhir_df.df["QuantityValue"] >= lower_bound)
& (flattened_fhir_df.df["QuantityValue"] <= upper_bound)
]
return FHIRDataFrame(filtered_df, flattened_FHIRDataFrame.resource_type)
return FHIRDataFrame(filtered_df, flattened_fhir_df.resource_type)

def validate_columns(self, flattened_FHIRDataFrame: FHIRDataFrame) -> None:
def validate_columns(self: Any, flattened_fhir_df: FHIRDataFrame) -> None:

if flattened_FHIRDataFrame.resource_type == "Observation":
if flattened_fhir_df.resource_type == "Observation":
REQUIRED_COLUMNS = [
"UserId",
"EffectiveDateTime",
Expand All @@ -177,61 +171,55 @@ def validate_columns(self, flattened_FHIRDataFrame: FHIRDataFrame) -> None:
]

missing_columns = [
col
for col in REQUIRED_COLUMNS
if col not in flattened_FHIRDataFrame.df.columns
col for col in REQUIRED_COLUMNS if col not in flattened_fhir_df.df.columns
]
if missing_columns:
raise ValueError(
f"The DataFrame is missing required columns: {missing_columns}"
)

def select_data_by_user(
self, flattened_FHIRDataFrame: FHIRDataFrame, user_id: str
self: Self @ FHIRDataProcessor, flattened_fhir_df: FHIRDataFrame, user_id: str
) -> FHIRDataFrame:
self.validate_columns(flattened_FHIRDataFrame)
self.validate_columns(flattened_fhir_df)

user_df = flattened_FHIRDataFrame.df[
flattened_FHIRDataFrame.df["UserId"] == user_id
]
user_df = flattened_fhir_df.df[flattened_fhir_df.df["UserId"] == user_id]
return FHIRDataFrame(
user_df.reset_index(drop=True), flattened_FHIRDataFrame.resource_type
user_df.reset_index(drop=True), flattened_fhir_df.resource_type
)

def select_data_by_dates(
self, flattened_FHIRDataFrame: FHIRDataFrame, start_date: str, end_date: str
self, flattened_fhir_df: FHIRDataFrame, start_date: str, end_date: str
) -> FHIRDataFrame:
"""Selects data within a specific date range within a DataFrame."""
self.validate_columns(flattened_FHIRDataFrame)
self.validate_columns(flattened_fhir_df)

start_datetime = pd.to_datetime(start_date).tz_localize(None)
end_datetime = pd.to_datetime(end_date).tz_localize(None)

flattened_FHIRDataFrame.df["EffectiveDateTime"] = pd.to_datetime(
flattened_FHIRDataFrame.df["EffectiveDateTime"]
flattened_fhir_df.df["EffectiveDateTime"] = pd.to_datetime(
flattened_fhir_df.df["EffectiveDateTime"]
).dt.tz_localize(None)

filtered_df = flattened_FHIRDataFrame.df[
(df.df["EffectiveDateTime"] >= start_datetime)
& (df.df["EffectiveDateTime"] <= end_datetime)
filtered_df = flattened_fhir_df.df[
(flattened_fhir_df.df["EffectiveDateTime"] >= start_datetime)
& (flattened_fhir_df.df["EffectiveDateTime"] <= end_datetime)
]

return FHIRDataFrame(
filtered_df.reset_index(drop=True), flattened_FHIRDataFrame.resource_type
filtered_df.reset_index(drop=True), flattened_fhir_df.resource_type
)

def calculate_moving_average(
self, flattened_FHIRDataFrame: FHIRDataFrame, n=7
self: Any, flattened_fhir_df: FHIRDataFrame, n=7
) -> FHIRDataFrame:
self.validate_columns(flattened_FHIRDataFrame)
self.validate_columns(flattened_fhir_df)

flattened_FHIRDataFrame.df["EffectiveDateTime"] = pd.to_datetime(
flattened_FHIRDataFrame.df["EffectiveDateTime"]
flattened_fhir_df.df["EffectiveDateTime"] = pd.to_datetime(
flattened_fhir_df.df["EffectiveDateTime"]
).dt.date

moving_avg_df = flattened_FHIRDataFrame.df.groupby(
["UserId", "LoincCode"]
).apply(
moving_avg_df = flattened_fhir_df.df.groupby(["UserId", "LoincCode"]).apply(
lambda x: x.sort_values("EffectiveDateTime")
.rolling(window=n, on="EffectiveDateTime")["QuantityValue"]
.mean()
Expand All @@ -240,7 +228,7 @@ def calculate_moving_average(

moving_avg_df = moving_avg_df.reset_index()
result_df = pd.merge(
flattened_FHIRDataFrame.df,
flattened_fhir_df.df,
moving_avg_df,
on=["UserId", "LoincCode", "EffectiveDateTime"],
suffixes=("", "_moving_avg"),
Expand All @@ -249,4 +237,4 @@ def calculate_moving_average(
columns={"QuantityValue_moving_avg": "QuantityValue"}, inplace=True
)

return FHIRDataFrame(result_df, flattened_FHIRDataFrame.resource_type)
return FHIRDataFrame(result_df, flattened_fhir_df.resource_type)
9 changes: 4 additions & 5 deletions data_export/data_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
#

# Local application/library specific imports
from data_flattening.FHIR_data_flattener import *
from utils.helpers import *
from data_visualization.data_visualizer import *
from data_flattening.FHIR_data_flattener import FHIRDataFrame
from data_visualization.data_visualizer import DataVisualizer


class DataExporter(DataVisualizer):
Expand All @@ -31,12 +30,12 @@ def export_to_csv(self, filename):
def create_and_save_plot(self, filename):
"""Generates a plot using inherited create_static_plot method and saves it."""
try:
if self.user_ids == None or len(self.user_ids) > 1:
if self.user_ids is None or len(self.user_ids) > 1:
print("Select a single user for enabling figure saving.")
else:
fig = super().create_static_plot(self.flattened_FHIRDataFrame)
fig.savefig(filename, dpi=self.dpi)
print("Plot saved successfully.")

except Exception as e:
except (TypeError, ValueError) as e:
print(f"An error occurred while generating the plot: {e}")
8 changes: 2 additions & 6 deletions data_flattening/FHIR_data_flattener.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
# SPDX-License-Identifier: MIT
#

# Standard library imports
from datetime import datetime
from typing import List, Dict, Any

# Related third-party imports
import pandas as pd

Expand All @@ -35,7 +31,7 @@ def df(self) -> pd.DataFrame:
return self.data_frame


def flatten_FHIR_resources(FHIR_resources: List[EnhancedObservation]) -> FHIRDataFrame:
def flatten_FHIR_resources(FHIR_resources: list[EnhancedObservation]) -> FHIRDataFrame:
flattened_data = []

for FHIR_obj in FHIR_resources:
Expand All @@ -58,7 +54,7 @@ def flatten_FHIR_resources(FHIR_resources: List[EnhancedObservation]) -> FHIRDat
)

flattened_entry = {
"UserId": FHIR_obj.UserId,
"UserId": FHIR_obj.user_id,
"DocumentId": FHIR_obj.observation.dict()["id"],
"EffectiveDateTime": effective_datetime if effective_datetime else None,
"QuantityName": quantity_name,
Expand Down
12 changes: 5 additions & 7 deletions data_visualization/data_visualizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,14 @@
#
# Standard library imports
from datetime import datetime, date
from typing import List, Optional
from typing import Any, List, Optional

# Related third-party imports
import matplotlib.pyplot as plt
import pandas as pd

# Local application/library specific imports
from data_analysis.data_analyzer import *
from data_flattening.FHIR_data_flattener import *
from utils.helpers import *
from data_analysis.data_analyzer import FHIRDataProcessor
from data_flattening.FHIR_data_flattener import FHIRDataFrame


class DataVisualizer(FHIRDataProcessor):
Expand Down Expand Up @@ -48,12 +46,12 @@ def set_dpi(self, dpi: float):
self.dpi = dpi

def create_static_plot(
self, flattened_FHIRDataFrame: FHIRDataFrame
self: Any, flattened_FHIRDataFrame: FHIRDataFrame
) -> Optional[plt.Figure]:
if not isinstance(
flattened_FHIRDataFrame.df["EffectiveDateTime"].iloc[0], date
):
print(f"The date type should be date.")
print("The date type should be of type date.")
return

if flattened_FHIRDataFrame.df["LoincCode"].nunique() != 1:
Expand Down
1 change: 1 addition & 0 deletions env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
PYTHONPATH=/Users/vicky/Desktop/Work/Stanford/Projects/Data pipeline project/Code/Spezi Data Pipeline Template
Empty file removed utils/__init__.py
Empty file.
14 changes: 0 additions & 14 deletions utils/helpers.py

This file was deleted.

0 comments on commit c48cf7b

Please sign in to comment.