Skip to content

Commit

Permalink
Include file_dependencies as variable for Copy Commands. (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
jieyinybl authored Aug 1, 2019
1 parent c1ad2ba commit 06dae52
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion data_integration/commands/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,16 @@ class Copy(_SQLCommand):

def __init__(self, source_db_alias: str, target_table: str, target_db_alias: str = None,
sql_statement: str = None, sql_file_name: Union[Callable, str] = None, replace: {str: str} = None,
timezone: str = None, csv_format: bool = None, delimiter_char: str = None) -> None:
timezone: str = None, csv_format: bool = None, delimiter_char: str = None,
file_dependencies = None) -> None:
_SQLCommand.__init__(self, sql_statement, sql_file_name, replace)
self.source_db_alias = source_db_alias
self.target_table = target_table
self._target_db_alias = target_db_alias
self.timezone = timezone
self.csv_format = csv_format
self.delimiter_char = delimiter_char
self.file_dependencies = file_dependencies or []

@property
def target_db_alias(self):
Expand All @@ -156,6 +158,28 @@ def target_db_alias(self):
def file_path(self) -> pathlib.Path:
return self.parent.parent.base_path() / self.file_name

def run(self) -> bool:
if self.sql_file_name:
logger.log(self.sql_file_name, logger.Format.ITALICS)

dependency_type = 'Copy ' + (self.sql_file_name or self.sql_statement)

if self.file_dependencies:
assert (self.parent)
pipeline_base_path = self.parent.parent.base_path()
if not file_dependencies.is_modified(self.node_path(), dependency_type,
pipeline_base_path,
self.file_dependencies):
logger.log('no changes')
return True

if not super().run():
return False

if self.file_dependencies:
file_dependencies.update(self.node_path(), dependency_type, pipeline_base_path, self.file_dependencies)
return True

def shell_command(self):
return _SQLCommand.shell_command(self) \
+ ' | ' + mara_db.shell.copy_command(self.source_db_alias, self.target_db_alias, self.target_table,
Expand Down

0 comments on commit 06dae52

Please # to comment.