From 66e7dc1c79af056e2693e9155cf8b7e3cad528fa Mon Sep 17 00:00:00 2001 From: Jan Katins Date: Thu, 4 Mar 2021 22:06:01 +0100 Subject: [PATCH] Add ability to specify modification value type in CopyIncrementally (#53) Some databases do not automatically convert from string to the actual value: e.g. `timestamp_column >= '2020-01-01'` fails. This would break incremental loading, because that is where we generate this kind of comparison. The new modification_comparison_type argument to `CopyIncrementally()` allows altering this to `timestamp_column >= TIMESTAMP '2020-01-01'`. Also fix up the docstring for the `comparison_value_placeholder` argument, which should actually be named `comparison_placeholder`; at least the docstring now reads like that. --- mara_pipelines/commands/sql.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/mara_pipelines/commands/sql.py b/mara_pipelines/commands/sql.py index d1ad020..f00dade 100644 --- a/mara_pipelines/commands/sql.py +++ b/mara_pipelines/commands/sql.py @@ -216,7 +216,8 @@ def __init__(self, source_db_alias: str, source_table: str, sql_file_name: Union[str, Callable] = None, sql_statement: Union[str, Callable] = None, target_db_alias: str = None, timezone: str = None, replace: {str: str} = None, use_explicit_upsert: bool = False, - csv_format: bool = None, delimiter_char: str = None) -> None: + csv_format: bool = None, delimiter_char: str = None, + modification_comparison_type: str = None) -> None: """ Incrementally loads data from one database into another. 
@@ -233,7 +234,9 @@ def __init__(self, source_db_alias: str, source_table: str, sql_file_name: The path of a file name that is run to query the source database replace: A set of replacements to perform against the sql query modification_comparison: SQL expression that evaluates to a comparable value - comparison_value_placeholder: A placeholder that is replaced with the last comparison value in the sql query + modification_comparison_type: type of the saved (as string) modification_comparison value + comparison_value_placeholder: A placeholder in the sql code that gets replaced with the + actual incremental load comparison or `1=1`. target_db_alias: The database to write to target_table: The table for loading data into primary_keys: A combination of primary key columns that are used for upserting into the target table @@ -244,6 +247,7 @@ def __init__(self, source_db_alias: str, source_table: str, self.source_db_alias = source_db_alias self.source_table = source_table self.modification_comparison = modification_comparison + self.modification_comparison_type = modification_comparison_type self.comparison_value_placeholder = comparison_value_placeholder self._target_db_alias = target_db_alias @@ -326,8 +330,9 @@ def run(self) -> bool: # perform the actual copy replacing the placeholder # with the comparison value from the latest successful execution + modification_comparison_type = self.modification_comparison_type or '' replace = {self.comparison_value_placeholder: - f'({self.modification_comparison} >= \'{last_comparison_value}\')'} + f'({self.modification_comparison} >= {modification_comparison_type} \'{last_comparison_value}\')'} if not shell.run_shell_command(self._copy_command(self.target_table + '_upsert', replace)): return False @@ -393,6 +398,7 @@ def html_doc_items(self) -> [(str, str)]: ('modification comparison', _.tt[self.modification_comparison])] \ + _SQLCommand.html_doc_items(self, self.source_db_alias) \ + [('comparison value placeholder', 
_.tt[self.comparison_value_placeholder]), + ('modification comparison type', _.tt[self.modification_comparison_type if self.modification_comparison_type else '(no cast)']), ('target db', _.tt[self.target_db_alias]), ('target table', _.tt[self.target_table]), ('primary_keys', _.tt[repr(self.primary_keys)]),