Skip to content

Commit

Permalink
Add ability to specify modification value type in CopyIncrementally (#53
Browse files Browse the repository at this point in the history
)

Some databases do not automatically convert a string to the actual value type: e.g. `timestamp_column >= '2020-01-01'` fails. This would break incremental loading, because that is exactly the kind of comparison we generated there. The new `modification_comparison_type` argument to `CopyIncrementally()` allows changing this to `timestamp_column >= TIMESTAMP '2020-01-01'`.

Also fix up the docstring for the `comparison_value_placeholder` argument, which should actually be named `comparison_placeholder`; now at least the docstring reads like that.
  • Loading branch information
jankatins authored Mar 4, 2021
1 parent 434b935 commit 66e7dc1
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions mara_pipelines/commands/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ def __init__(self, source_db_alias: str, source_table: str,
sql_file_name: Union[str, Callable] = None, sql_statement: Union[str, Callable] = None,
target_db_alias: str = None, timezone: str = None, replace: {str: str} = None,
use_explicit_upsert: bool = False,
csv_format: bool = None, delimiter_char: str = None) -> None:
csv_format: bool = None, delimiter_char: str = None,
modification_comparison_type: str = None) -> None:
"""
Incrementally loads data from one database into another.
Expand All @@ -233,7 +234,9 @@ def __init__(self, source_db_alias: str, source_table: str,
sql_file_name: The path of a file name that is run to query the source database
replace: A set of replacements to perform against the sql query
modification_comparison: SQL expression that evaluates to a comparable value
comparison_value_placeholder: A placeholder that is replaced with the last comparison value in the sql query
modification_comparison_type: type of the saved (as string) modification_comparison value
comparison_value_placeholder: A placeholder in the sql code that gets replaced with the
actual incremental load comparison or `1=1`.
target_db_alias: The database to write to
target_table: The table for loading data into
primary_keys: A combination of primary key columns that are used for upserting into the target table
Expand All @@ -244,6 +247,7 @@ def __init__(self, source_db_alias: str, source_table: str,
self.source_db_alias = source_db_alias
self.source_table = source_table
self.modification_comparison = modification_comparison
self.modification_comparison_type = modification_comparison_type
self.comparison_value_placeholder = comparison_value_placeholder

self._target_db_alias = target_db_alias
Expand Down Expand Up @@ -326,8 +330,9 @@ def run(self) -> bool:

# perform the actual copy replacing the placeholder
# with the comparison value from the latest successful execution
modification_comparison_type = self.modification_comparison_type or ''
replace = {self.comparison_value_placeholder:
f'({self.modification_comparison} >= \'{last_comparison_value}\')'}
f'({self.modification_comparison} >= {modification_comparison_type} \'{last_comparison_value}\')'}
if not shell.run_shell_command(self._copy_command(self.target_table + '_upsert', replace)):
return False

Expand Down Expand Up @@ -393,6 +398,7 @@ def html_doc_items(self) -> [(str, str)]:
('modification comparison', _.tt[self.modification_comparison])] \
+ _SQLCommand.html_doc_items(self, self.source_db_alias) \
+ [('comparison value placeholder', _.tt[self.comparison_value_placeholder]),
('modification comparison type', _.tt[self.modification_comparison_type if self.modification_comparison_type else '(no cast)']),
('target db', _.tt[self.target_db_alias]),
('target table', _.tt[self.target_table]),
('primary_keys', _.tt[repr(self.primary_keys)]),
Expand Down

0 comments on commit 66e7dc1

Please sign in to comment.