Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add argument for DeltaTable storage options #6

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions dask_deltatable/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@ def __init__(
columns: List[str],
datetime: Optional[str] = None,
storage_options: Dict[str, str] = None,
delta_storage_options: Dict[str, str] = None,
) -> None:
self.path: str = path
self.version: int = version
self.columns = columns
self.datetime = datetime
self.storage_options = storage_options
self.dt = DeltaTable(table_uri=self.path, version=self.version)
self.dt = DeltaTable(
table_uri=self.path,
version=self.version,
storage_options=delta_storage_options,
)
self.fs, self.fs_token, _ = get_fs_token_paths(
path, storage_options=storage_options
)
Expand Down Expand Up @@ -198,6 +203,7 @@ def read_delta_table(
columns: List[str] = None,
storage_options: Dict[str, str] = None,
datetime: str = None,
delta_storage_options: Dict[str, str] = None,
**kwargs,
):
"""
Expand Down Expand Up @@ -235,7 +241,9 @@ def read_delta_table(
columns: None or list(str)
Columns to load. If None, loads all.
storage_options : dict, default None
Key/value pairs to be passed on to the file-system backend, if any.
Key/value pairs to be passed on to the fsspec backend, if any.
delta_storage_options : dict, default None
Key/value pairs to be passed on to the delta-rs filesystem, if any.
kwargs: dict,optional
Some most used parameters can be passed here are:
1. schema
Expand Down Expand Up @@ -282,13 +290,17 @@ def read_delta_table(
columns=columns,
storage_options=storage_options,
datetime=datetime,
delta_storage_options=delta_storage_options,
)
resultdf = dtw.read_delta_table(columns=columns, **kwargs)
return resultdf


def read_delta_history(
path: str, limit: Optional[int] = None, storage_options: Dict[str, str] = None
path: str,
limit: Optional[int] = None,
storage_options: Dict[str, str] = None,
delta_storage_options: Dict[str, str] = None,
) -> dd.core.DataFrame:
"""
Run the history command on the DeltaTable.
Expand All @@ -310,7 +322,11 @@ def read_delta_history(
"""

dtw = DeltaTableWrapper(
path=path, version=None, columns=None, storage_options=storage_options
path=path,
version=None,
columns=None,
storage_options=storage_options,
delta_storage_options=delta_storage_options,
)
return dtw.history(limit=limit)

Expand All @@ -320,6 +336,7 @@ def vacuum(
retention_hours: int = 168,
dry_run: bool = True,
storage_options: Dict[str, str] = None,
delta_storage_options: Dict[str, str] = None,
) -> None:
"""
Run the Vacuum command on the Delta Table: list and delete
Expand All @@ -341,6 +358,10 @@ def vacuum(
"""

dtw = DeltaTableWrapper(
path=path, version=None, columns=None, storage_options=storage_options
path=path,
version=None,
columns=None,
storage_options=storage_options,
delta_storage_options=delta_storage_options,
)
return dtw.vacuum(retention_hours=retention_hours, dry_run=dry_run)