Commit

Merge pull request #189 from broadinstitute/sn_POD-2360_force_data_to_match_schema

Adding option ignore_existing_schema_mismatch to ingest
snovod authored Jan 8, 2025
2 parents 66feadf + e77f8d9 commit 552bd99
Showing 3 changed files with 49 additions and 19 deletions.
2 changes: 1 addition & 1 deletion python/copy_dataset.py
@@ -27,7 +27,7 @@ def get_args() -> Namespace:
help=f"Batch size for ingest. Default to {ARG_DEFAULTS['batch_size']}",
default=ARG_DEFAULTS['batch_size'], type=int
)
parser.add_argument("--update_strategy", choices=["REPLACE", "APPEND", "UPDATE"], default="REPLACE")
parser.add_argument("--update_strategy", choices=["REPLACE", "APPEND", "MERGE"], default="REPLACE")
parser.add_argument(
"--new_dataset_name", "-nd", required=True,
help="Cannot be named the same as original dataset"
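
(For illustration only, not from the repository: a minimal sketch of how the corrected choices list behaves. argparse rejects any value outside choices, so the old "UPDATE" value now fails at parse time.)

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--update_strategy", choices=["REPLACE", "APPEND", "MERGE"], default="REPLACE")

    print(parser.parse_args([]).update_strategy)  # REPLACE (the default)
    print(parser.parse_args(["--update_strategy", "MERGE"]).update_strategy)  # MERGE
    # parser.parse_args(["--update_strategy", "UPDATE"]) exits with an
    # "invalid choice: 'UPDATE'" error, since UPDATE is no longer accepted.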
18 changes: 17 additions & 1 deletion python/gcp_workspace_table_to_dataset_ingest.py
@@ -107,6 +107,12 @@ def get_args() -> argparse.Namespace:
help="If used, will attempt to soft-delete all TDR tables in the target dataset that correspond to the Terra "
"tables that were marked for ingest",
)
+ parser.add_argument(
+     "--ignore_existing_schema_mismatch",
+     action="store_true",
+     help="If used, will ignore schema mismatches between Terra and the existing TDR tables, attempt the "
+          "ingest anyway, and force the Terra data to match the existing TDR schema",
+ )

return parser.parse_args()
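
(A small self-contained sketch, for reference rather than from the diff: the new flag uses action="store_true", so it defaults to False and flips to True only when passed.)

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--ignore_existing_schema_mismatch", action="store_true")

    print(parser.parse_args([]).ignore_existing_schema_mismatch)  # False
    args = parser.parse_args(["--ignore_existing_schema_mismatch"])
    print(args.ignore_existing_schema_mismatch)  # True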

@@ -128,6 +134,7 @@ def get_args() -> argparse.Namespace:
all_fields_non_required = args.all_fields_non_required
force_disparate_rows_to_string = args.force_disparate_rows_to_string
trunc_and_reload = args.trunc_and_reload
+ ignore_existing_schema_mismatch = args.ignore_existing_schema_mismatch

# Initialize the Terra and TDR classes
token = Token(cloud=CLOUD_TYPE)
@@ -199,6 +206,7 @@ def get_args() -> argparse.Namespace:
table_info_dict=table_info_dict,
all_fields_non_required=all_fields_non_required,
force_disparate_rows_to_string=force_disparate_rows_to_string,
+     ignore_existing_schema_mismatch=ignore_existing_schema_mismatch
).run()

if trunc_and_reload:
@@ -227,6 +235,13 @@ def get_args() -> argparse.Namespace:
else:
file_uuids_dict = None

+ # Only used to make the Terra data match the existing TDR schema when ignore_existing_schema_mismatch is set.
+ # The ingest will then try to force the Terra data to the data types of the existing TDR schema.
+ if ignore_existing_schema_mismatch:
+     tdr_table_schema_info = tdr.get_table_schema_info(dataset_id=dataset_id, table_name=target_table_name)
+ else:
+     tdr_table_schema_info = None
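
(The coercion itself happens inside BatchIngest, which is not shown in this diff. As a hypothetical sketch of what forcing Terra values to an existing TDR schema could look like; the schema_info shape and datatype names below are assumptions, not taken from the repository:)

    from typing import Any

    # Assumed shape: {"column_name": {"name": ..., "datatype": "string" | "integer" | ...}}
    CASTS = {"string": str, "integer": int, "float64": float, "boolean": bool}

    def coerce_row_to_schema(row: dict[str, Any], schema_info: dict[str, dict]) -> dict[str, Any]:
        """Cast each value in a Terra row to the datatype of the existing TDR column, if known."""
        coerced = {}
        for column, value in row.items():
            datatype = schema_info.get(column, {}).get("datatype")
            cast = CASTS.get(datatype)
            coerced[column] = cast(value) if cast and value is not None else value
        return coerced

    print(coerce_row_to_schema({"age": "42"}, {"age": {"name": "age", "datatype": "integer"}}))  # {'age': 42}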

BatchIngest(
ingest_metadata=filtered_metrics,
tdr=tdr,
@@ -239,5 +254,6 @@ def get_args() -> argparse.Namespace:
waiting_time_to_poll=ARG_DEFAULTS['waiting_time_to_poll'],
test_ingest=TEST_INGEST,
load_tag=f"{billing_project}_{workspace_name}-{dataset_id}",
-     file_to_uuid_dict=file_uuids_dict
+     file_to_uuid_dict=file_uuids_dict,
+     schema_info=tdr_table_schema_info
).run()
48 changes: 31 additions & 17 deletions python/utils/tdr_utils/tdr_table_utils.py
@@ -23,6 +23,7 @@ def __init__(
table_info_dict: dict,
all_fields_non_required: bool = False,
force_disparate_rows_to_string: bool = False,
+ ignore_existing_schema_mismatch: bool = False
):
"""
Initialize the SetUpTDRTables class.
@@ -32,12 +33,17 @@
dataset_id (str): The ID of the dataset.
table_info_dict (dict): A dictionary containing table information.
all_fields_non_required (bool): A boolean indicating whether all columns are non-required.
force_disparate_rows_to_string (bool): A boolean indicating whether disparate rows should be forced to
string.
+ ignore_existing_schema_mismatch (bool): A boolean indicating whether to proceed instead of failing when
+     data types do not match the existing schema.
"""
self.tdr = tdr
self.dataset_id = dataset_id
self.table_info_dict = table_info_dict
self.all_fields_non_required = all_fields_non_required
self.force_disparate_rows_to_string = force_disparate_rows_to_string
+ self.ignore_existing_schema_mismatch = ignore_existing_schema_mismatch

@staticmethod
def _compare_table(reference_dataset_table: dict, target_dataset_table: list[dict], table_name: str) -> list[dict]:
@@ -66,6 +72,11 @@ def _compare_table(reference_dataset_table: dict, target_dataset_table: list[dict], table_name: str) -> list[dict]:
# Check if column exists but is not set up the same
if column_dict != target_dataset_table_dict[column_dict["name"]]:
column_dict["action"] = "modify"
+ logging.warning(
+     f'Column {column_dict["name"]} in table {table_name} does not match. Expected column info:\n'
+     f'{json.dumps(column_dict, indent=4)}\nexisting column info:\n'
+     f'{json.dumps(target_dataset_table_dict[column_dict["name"]], indent=4)}'
+ )
columns_to_update.append(column_dict)
return columns_to_update
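
(In other words, a column is flagged for modification whenever its expected dict differs from the existing one. A toy illustration, with column dicts invented for the example:)

    reference_column = {"name": "sample_id", "datatype": "string", "required": True}
    existing_column = {"name": "sample_id", "datatype": "fileref", "required": True}

    if reference_column != existing_column:
        flagged = dict(reference_column, action="modify")
        print(flagged)
        # {'name': 'sample_id', 'datatype': 'string', 'required': True, 'action': 'modify'}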

@@ -136,10 +147,7 @@ def run(self) -> dict:
# If any updates are needed, nothing is done for the whole ingest
valid = False
for column_to_update_dict in columns_to_update:
-     logging.warning(
-         f"Columns needs updates in {ingest_table_name}: "
-         f"{json.dumps(column_to_update_dict, indent=4)}"
-     )
+     logging.warning(f"Column {column_to_update_dict['name']} needs updates in {ingest_table_name}")
else:
logging.info(f"Table {ingest_table_name} exists and is up to date")
if valid:
@@ -156,17 +164,23 @@
)
else:
logging.info("All tables in dataset exist and are up to date")
-     # Return schema info for all existing tables after creation
-     data_set_info = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"])
-     # Return dict with key being table name and value being dict of columns with key being
-     # column name and value being column info
-     return {
-         table_dict["name"]: {
-             column_dict["name"]: column_dict
-             for column_dict in table_dict["columns"]
-         }
-         for table_dict in data_set_info["schema"]["tables"]
-     }
- else:
-     logging.error("Tables need manual updating. Exiting")
-     sys.exit(1)
logging.warning("Tables do not appear to be valid")
if self.ignore_existing_schema_mismatch:
logging.warning("Ignoring schema mismatch because ignore_existing_schema_mismatch was used")
else:
logging.error(
"Tables need manual updating. If want to force through use ignore_existing_schema_mismatch."
)
sys.exit(1)
# Return schema info for all existing tables after creation
data_set_info = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"])
# Return dict with key being table name and value being dict of columns with key being
# column name and value being column info
return {
table_dict["name"]: {
column_dict["name"]: column_dict
for column_dict in table_dict["columns"]
}
for table_dict in data_set_info["schema"]["tables"]
}
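
(So run() returns a nested mapping of table name to column name to column info. An assumed example of the shape, with table, column, and datatype values invented for illustration:)

    expected_shape = {
        "sample": {
            "sample_id": {"name": "sample_id", "datatype": "string", "required": True},
            "bam_path": {"name": "bam_path", "datatype": "fileref"},
        }
    }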
