diff --git a/data_integration/commands/sql.py b/data_integration/commands/sql.py index f00dcae..bda4719 100644 --- a/data_integration/commands/sql.py +++ b/data_integration/commands/sql.py @@ -140,7 +140,8 @@ class Copy(_SQLCommand): def __init__(self, source_db_alias: str, target_table: str, target_db_alias: str = None, sql_statement: str = None, sql_file_name: Union[Callable, str] = None, replace: {str: str} = None, - timezone: str = None, csv_format: bool = None, delimiter_char: str = None) -> None: + timezone: str = None, csv_format: bool = None, delimiter_char: str = None, + file_dependencies = None) -> None: _SQLCommand.__init__(self, sql_statement, sql_file_name, replace) self.source_db_alias = source_db_alias self.target_table = target_table @@ -148,6 +149,7 @@ def __init__(self, source_db_alias: str, target_table: str, target_db_alias: str self.timezone = timezone self.csv_format = csv_format self.delimiter_char = delimiter_char + self.file_dependencies = file_dependencies or [] @property def target_db_alias(self): @@ -156,6 +158,28 @@ def target_db_alias(self): def file_path(self) -> pathlib.Path: return self.parent.parent.base_path() / self.file_name + def run(self) -> bool: + if self.sql_file_name: + logger.log(self.sql_file_name, logger.Format.ITALICS) + + dependency_type = 'Copy ' + (self.sql_file_name or self.sql_statement) + + if self.file_dependencies: + assert (self.parent) + pipeline_base_path = self.parent.parent.base_path() + if not file_dependencies.is_modified(self.node_path(), dependency_type, + pipeline_base_path, + self.file_dependencies): + logger.log('no changes') + return True + + if not super().run(): + return False + + if self.file_dependencies: + file_dependencies.update(self.node_path(), dependency_type, pipeline_base_path, self.file_dependencies) + return True + def shell_command(self): return _SQLCommand.shell_command(self) \ + ' | ' + mara_db.shell.copy_command(self.source_db_alias, self.target_db_alias, self.target_table,