From ad6e808624dad725c591e82518bcb59d6fcda406 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Fri, 28 Apr 2023 11:46:31 -0500 Subject: [PATCH 1/3] Add argument for DeltaTable storage options --- dask_deltatable/core.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/dask_deltatable/core.py b/dask_deltatable/core.py index f833c16..476423e 100644 --- a/dask_deltatable/core.py +++ b/dask_deltatable/core.py @@ -29,13 +29,14 @@ def __init__( columns: List[str], datetime: Optional[str] = None, storage_options: Dict[str, str] = None, + delta_storage_options: Dict[str, str] = None, ) -> None: self.path: str = path self.version: int = version self.columns = columns self.datetime = datetime self.storage_options = storage_options - self.dt = DeltaTable(table_uri=self.path, version=self.version) + self.dt = DeltaTable(table_uri=self.path, version=self.version, storage_options=delta_storage_options) self.fs, self.fs_token, _ = get_fs_token_paths( path, storage_options=storage_options ) @@ -198,6 +199,7 @@ def read_delta_table( columns: List[str] = None, storage_options: Dict[str, str] = None, datetime: str = None, + delta_storage_options: Dict[str, str] = None, **kwargs, ): """ @@ -235,7 +237,9 @@ def read_delta_table( columns: None or list(str) Columns to load. If None, loads all. storage_options : dict, default None - Key/value pairs to be passed on to the file-system backend, if any. + Key/value pairs to be passed on to the fsspec backend, if any. + delta_storage_options : dict, default None + Key/value pairs to be passed on to the delta-rs filesystem, if any. kwargs: dict,optional Some most used parameters can be passed here are: 1. schema @@ -282,13 +286,15 @@ def read_delta_table( columns=columns, storage_options=storage_options, datetime=datetime, + delta_storage_options=delta_storage_options, ) resultdf = dtw.read_delta_table(columns=columns, **kwargs) return resultdf def read_delta_history( - path: str, limit: Optional[int] = None, storage_options: Dict[str, str] = None + path: str, limit: Optional[int] = None, storage_options: Dict[str, str] = None, + delta_storage_options: Dict[str, str] = None, ) -> dd.core.DataFrame: """ Run the history command on the DeltaTable. @@ -310,7 +316,8 @@ def read_delta_history( """ dtw = DeltaTableWrapper( - path=path, version=None, columns=None, storage_options=storage_options + path=path, version=None, columns=None, storage_options=storage_options, + delta_storage_options=delta_storage_options, ) return dtw.history(limit=limit) @@ -320,6 +327,7 @@ def vacuum( retention_hours: int = 168, dry_run: bool = True, storage_options: Dict[str, str] = None, + delta_storage_options: Dict[str, str] = None, ) -> None: """ Run the Vacuum command on the Delta Table: list and delete @@ -341,6 +349,7 @@ def vacuum( """ dtw = DeltaTableWrapper( - path=path, version=None, columns=None, storage_options=storage_options + path=path, version=None, columns=None, storage_options=storage_options, + delta_storage_options=delta_storage_optoins, ) return dtw.vacuum(retention_hours=retention_hours, dry_run=dry_run) From b8cc9511ae2117862f0d77d629478c8bf2687779 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sat, 29 Apr 2023 07:55:20 -0500 Subject: [PATCH 2/3] fixup --- dask_deltatable/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_deltatable/core.py b/dask_deltatable/core.py index 476423e..91428ff 100644 --- a/dask_deltatable/core.py +++ b/dask_deltatable/core.py @@ -350,6 +350,6 @@ def vacuum( dtw = DeltaTableWrapper( path=path, version=None, columns=None, storage_options=storage_options, - delta_storage_options=delta_storage_optoins, + delta_storage_options=delta_storage_options, ) return dtw.vacuum(retention_hours=retention_hours, dry_run=dry_run) From 4ca8e35db7ecd03112f8f103cb67a611a326d7b9 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sat, 29 Apr 2023 07:56:08 -0500 Subject: [PATCH 3/3] formatting --- dask_deltatable/core.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/dask_deltatable/core.py b/dask_deltatable/core.py index 91428ff..383ded4 100644 --- a/dask_deltatable/core.py +++ b/dask_deltatable/core.py @@ -36,7 +36,11 @@ def __init__( self.columns = columns self.datetime = datetime self.storage_options = storage_options - self.dt = DeltaTable(table_uri=self.path, version=self.version, storage_options=delta_storage_options) + self.dt = DeltaTable( + table_uri=self.path, + version=self.version, + storage_options=delta_storage_options, + ) self.fs, self.fs_token, _ = get_fs_token_paths( path, storage_options=storage_options ) @@ -293,7 +297,9 @@ def read_delta_table( def read_delta_history( - path: str, limit: Optional[int] = None, storage_options: Dict[str, str] = None, + path: str, + limit: Optional[int] = None, + storage_options: Dict[str, str] = None, delta_storage_options: Dict[str, str] = None, ) -> dd.core.DataFrame: """ @@ -316,7 +322,10 @@ def read_delta_history( """ dtw = DeltaTableWrapper( - path=path, version=None, columns=None, storage_options=storage_options, + path=path, + version=None, + columns=None, + storage_options=storage_options, delta_storage_options=delta_storage_options, ) return dtw.history(limit=limit) @@ -349,7 +358,10 @@ def vacuum( """ dtw = DeltaTableWrapper( - path=path, version=None, columns=None, storage_options=storage_options, + path=path, + version=None, + columns=None, + storage_options=storage_options, delta_storage_options=delta_storage_options, ) return dtw.vacuum(retention_hours=retention_hours, dry_run=dry_run)