From 5a18a2858ec270bbff03649d59cc549765674ef9 Mon Sep 17 00:00:00 2001 From: Sam Novod Date: Wed, 8 Jan 2025 11:21:40 -0500 Subject: [PATCH 1/2] Adding option ignore_existing_schema_mismatch to ingest --- python/copy_dataset.py | 2 +- .../gcp_workspace_table_to_dataset_ingest.py | 18 +++++++- python/utils/tdr_utils/tdr_table_utils.py | 46 ++++++++++++------- 3 files changed, 47 insertions(+), 19 deletions(-) diff --git a/python/copy_dataset.py b/python/copy_dataset.py index 42dc6775..774ed46c 100644 --- a/python/copy_dataset.py +++ b/python/copy_dataset.py @@ -27,7 +27,7 @@ def get_args() -> Namespace: help=f"Batch size for ingest. Default to {ARG_DEFAULTS['batch_size']}", default=ARG_DEFAULTS['batch_size'], type=int ) - parser.add_argument("--update_strategy", choices=["REPLACE", "APPEND", "UPDATE"], default="REPLACE") + parser.add_argument("--update_strategy", choices=["REPLACE", "APPEND", "MERGE"], default="REPLACE") parser.add_argument( "--new_dataset_name", "-nd", required=True, help="Cannot be named the same as original dataset" diff --git a/python/gcp_workspace_table_to_dataset_ingest.py b/python/gcp_workspace_table_to_dataset_ingest.py index e7d9518e..41f28af4 100644 --- a/python/gcp_workspace_table_to_dataset_ingest.py +++ b/python/gcp_workspace_table_to_dataset_ingest.py @@ -107,6 +107,12 @@ def get_args() -> argparse.Namespace: help="If used, will attempt to soft-delete all TDR tables in the target dataset that correspond to the Terra " "tables that were marked for ingest", ) + parser.add_argument( + "--ignore_existing_schema_mismatch", + action="store_true", + help="If used, will ignore schema mismatch between Terra and existing TDR tables and attempt to ingest anyway " + "and force Terra data to match TDR schema", + ) return parser.parse_args() @@ -128,6 +134,7 @@ def get_args() -> argparse.Namespace: all_fields_non_required = args.all_fields_non_required force_disparate_rows_to_string = args.force_disparate_rows_to_string trunc_and_reload = 
args.trunc_and_reload + ignore_existing_schema_mismatch = args.ignore_existing_schema_mismatch # Initialize the Terra and TDR classes token = Token(cloud=CLOUD_TYPE) @@ -199,6 +206,7 @@ def get_args() -> argparse.Namespace: table_info_dict=table_info_dict, all_fields_non_required=all_fields_non_required, force_disparate_rows_to_string=force_disparate_rows_to_string, + ignore_existing_schema_mismatch=ignore_existing_schema_mismatch ).run() if trunc_and_reload: @@ -227,6 +235,13 @@ def get_args() -> argparse.Namespace: else: file_uuids_dict = None + # Only use this to make Terra data match what TDR schema is set to if ignore_existing_schema_mismatch is used + # This will make it so it will try to force the Terra data to match the existing TDR schema data types + if ignore_existing_schema_mismatch: + tdr_table_schema_info = tdr.get_table_schema_info(dataset_id=dataset_id, table_name=target_table_name) + else: + tdr_table_schema_info = None + BatchIngest( ingest_metadata=filtered_metrics, tdr=tdr, @@ -239,5 +254,6 @@ def get_args() -> argparse.Namespace: waiting_time_to_poll=ARG_DEFAULTS['waiting_time_to_poll'], test_ingest=TEST_INGEST, load_tag=f"{billing_project}_{workspace_name}-{dataset_id}", - file_to_uuid_dict=file_uuids_dict + file_to_uuid_dict=file_uuids_dict, + schema_info=tdr_table_schema_info ).run() diff --git a/python/utils/tdr_utils/tdr_table_utils.py b/python/utils/tdr_utils/tdr_table_utils.py index 3c171181..994a53ba 100644 --- a/python/utils/tdr_utils/tdr_table_utils.py +++ b/python/utils/tdr_utils/tdr_table_utils.py @@ -23,6 +23,7 @@ def __init__( table_info_dict: dict, all_fields_non_required: bool = False, force_disparate_rows_to_string: bool = False, + ignore_existing_schema_mismatch: bool = False ): """ Initialize the SetUpTDRTables class. @@ -32,12 +33,15 @@ def __init__( dataset_id (str): The ID of the dataset. table_info_dict (dict): A dictionary containing table information. 
all_fields_non_required (bool): A boolean indicating whether all columns are non-required. + force_disparate_rows_to_string (bool): A boolean indicating whether to not fail on data type not + matching existing schema. """ self.tdr = tdr self.dataset_id = dataset_id self.table_info_dict = table_info_dict self.all_fields_non_required = all_fields_non_required self.force_disparate_rows_to_string = force_disparate_rows_to_string + self.ignore_existing_schema_mismatch = ignore_existing_schema_mismatch @staticmethod def _compare_table(reference_dataset_table: dict, target_dataset_table: list[dict], table_name: str) -> list[dict]: @@ -66,6 +70,11 @@ def _compare_table(reference_dataset_table: dict, target_dataset_table: list[dic # Check if column exists but is not set up the same if column_dict != target_dataset_table_dict[column_dict["name"]]: column_dict["action"] = "modify" + logging.warning( + f'Column {column_dict["name"]} in table {table_name} does not match. Expected column info:\n' + f'{json.dumps(column_dict, indent=4)}\nexisting column info:\n' + f'{json.dumps(target_dataset_table_dict[column_dict["name"]], indent=4)}' + ) columns_to_update.append(column_dict) return columns_to_update @@ -136,10 +145,7 @@ def run(self) -> dict: # If any updates needed nothing is done for whole ingest valid = False for column_to_update_dict in columns_to_update: - logging.warning( - f"Columns needs updates in {ingest_table_name}: " - f"{json.dumps(column_to_update_dict, indent=4)}" - ) + logging.warning(f"Column {column_to_update_dict['name']} needs updates in {ingest_table_name}") else: logging.info(f"Table {ingest_table_name} exists and is up to date") if valid: @@ -156,17 +162,23 @@ def run(self) -> dict: ) else: logging.info("All tables in dataset exist and are up to date") - # Return schema info for all existing tables after creation - data_set_info = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"]) - # Return dict with key being table name 
and value being dict of columns with key being - # column name and value being column info - return { - table_dict["name"]: { - column_dict["name"]: column_dict - for column_dict in table_dict["columns"] - } - for table_dict in data_set_info["schema"]["tables"] - } else: - logging.error("Tables need manual updating. Exiting") - sys.exit(1) + logging.warning("Tables do not appear to be valid") + if self.ignore_existing_schema_mismatch: + logging.warning("Ignoring schema mismatch because ignore_existing_schema_mismatch was used") + else: + logging.error( + "Tables need manual updating. If want to force through use ignore_existing_schema_mismatch." + ) + sys.exit(1) + # Return schema info for all existing tables after creation + data_set_info = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"]) + # Return dict with key being table name and value being dict of columns with key being + # column name and value being column info + return { + table_dict["name"]: { + column_dict["name"]: column_dict + for column_dict in table_dict["columns"] + } + for table_dict in data_set_info["schema"]["tables"] + } From e77f8d9496e28c462b5855de4e10a6ea20d1d1ac Mon Sep 17 00:00:00 2001 From: Sam Novod Date: Wed, 8 Jan 2025 11:23:35 -0500 Subject: [PATCH 2/2] Updating docs for method --- python/utils/tdr_utils/tdr_table_utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/utils/tdr_utils/tdr_table_utils.py b/python/utils/tdr_utils/tdr_table_utils.py index 994a53ba..85f3a084 100644 --- a/python/utils/tdr_utils/tdr_table_utils.py +++ b/python/utils/tdr_utils/tdr_table_utils.py @@ -33,7 +33,9 @@ def __init__( dataset_id (str): The ID of the dataset. table_info_dict (dict): A dictionary containing table information. all_fields_non_required (bool): A boolean indicating whether all columns are non-required. 
- force_disparate_rows_to_string (bool): A boolean indicating whether to not fail on data type not + force_disparate_rows_to_string (bool): A boolean indicating whether disparate rows should be forced to + string. + ignore_existing_schema_mismatch (bool): A boolean indicating whether to not fail on data type not matching existing schema. """ self.tdr = tdr