feat(python): expose MERGE operation #1685

Merged
merged 42 commits into main from feat/expose_merge_python
Oct 17, 2023
Changes from 35 commits

Commits (42)
80d328b
save work to continue on other pc
ion-elgreco Sep 23, 2023
f62abab
Setup merge skeleton on python side
ion-elgreco Sep 23, 2023
fd07d5f
use arrow
ion-elgreco Sep 26, 2023
57932f4
Merge branch 'main' into feat/expose_merge_python
ion-elgreco Sep 26, 2023
4e4e432
save to continue home pc
ion-elgreco Sep 29, 2023
ee7860f
save
ion-elgreco Sep 29, 2023
774ff55
Merge branch 'main' into feat/expose_merge_python
ion-elgreco Sep 29, 2023
7176771
Allow metrics to be serialized
ion-elgreco Sep 30, 2023
ec7c2b2
make datafusion_utils mod public
ion-elgreco Sep 30, 2023
2ded189
Comment the deny missing docs for now : )
ion-elgreco Sep 30, 2023
ecef07b
Add merge_execute
ion-elgreco Sep 30, 2023
239af35
Add logic to create (col, expr) hashmap for updates/set
ion-elgreco Oct 1, 2023
4a927a9
Remove println
ion-elgreco Oct 1, 2023
bcb8d4a
Use Typing type hints
ion-elgreco Oct 2, 2023
1c866b1
Fix rust lints
ion-elgreco Oct 2, 2023
2522d3e
Add merge when_matched_delete test
ion-elgreco Oct 2, 2023
08497d3
Move property to merge
ion-elgreco Oct 2, 2023
611d93a
Add all test cases
ion-elgreco Oct 2, 2023
6a05320
Fix lint and type hint
ion-elgreco Oct 3, 2023
7770b4e
Add type hints
ion-elgreco Oct 3, 2023
2e3c388
Merge branch 'main' into feat/expose_merge_python
ion-elgreco Oct 3, 2023
28734be
add into
ion-elgreco Oct 3, 2023
1fd7909
simplify code
ion-elgreco Oct 3, 2023
3c4493c
Move fixture
ion-elgreco Oct 3, 2023
15af04e
format
ion-elgreco Oct 3, 2023
ec82c91
Merge branch 'main' into feat/expose_merge_python
ion-elgreco Oct 6, 2023
98a83d6
Use target_alias, fix tests
ion-elgreco Oct 7, 2023
de56cec
Add passing tests
ion-elgreco Oct 7, 2023
70dc65a
formatting
ion-elgreco Oct 7, 2023
b0de38e
Make consistent with update
ion-elgreco Oct 7, 2023
a042273
Merge branch 'main' into feat/expose_merge_python
ion-elgreco Oct 12, 2023
0d9b8e6
Merge branch 'main' into feat/expose_merge_python
ion-elgreco Oct 12, 2023
cc7fd15
add target_alias
ion-elgreco Oct 12, 2023
547bcaf
Add update_all and insert_all method
ion-elgreco Oct 12, 2023
5a2e66f
formatting + update docs
ion-elgreco Oct 12, 2023
d045ec1
Adjust test cases to include less or different columns compared to ta…
ion-elgreco Oct 14, 2023
06480ef
Add when_not_matched_by_source_delete_wo_predicate test
ion-elgreco Oct 14, 2023
0c1af87
use recordbatchreader and fix bug
ion-elgreco Oct 14, 2023
e83d327
Merge branch 'main' into feat/expose_merge_python
ion-elgreco Oct 14, 2023
a35f67a
resolve lint errors
ion-elgreco Oct 14, 2023
c7709cb
clean up code
ion-elgreco Oct 15, 2023
ad41a7d
remove debug print
ion-elgreco Oct 15, 2023
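
Taken together, these commits expose a builder-style MERGE API in the Python bindings. A minimal end-to-end sketch of the API added by the diff below (the table path "tmp" is illustrative):

>>> from deltalake import DeltaTable
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> dt = DeltaTable("tmp")
>>> dt.merge(source=data, predicate="target.x = source.x", source_alias="source", target_alias="target") \
... .when_matched_update_all() \
... .when_not_matched_insert_all() \
... .execute()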
19 changes: 19 additions & 0 deletions python/deltalake/_internal.pyi
@@ -95,6 +95,25 @@ class RawDeltaTable:
writer_properties: Optional[Dict[str, int]],
safe_cast: bool = False,
) -> str: ...
def merge_execute(
self,
source: pa.RecordBatch,
predicate: str,
source_alias: Optional[str],
target_alias: Optional[str],
writer_properties: Optional[Dict[str, Optional[int]]],
safe_cast: bool,
matched_update_updates: Optional[Dict[str, str]],
matched_update_predicate: Optional[str],
matched_delete_predicate: Optional[str],
matched_delete_all: Optional[bool],
not_matched_insert_updates: Optional[Dict[str, str]],
not_matched_insert_predicate: Optional[str],
not_matched_by_source_update_updates: Optional[Dict[str, str]],
not_matched_by_source_update_predicate: Optional[str],
not_matched_by_source_delete_predicate: Optional[str],
not_matched_by_source_delete_all: Optional[bool],
) -> str: ...
def get_active_partitions(
self, partitions_filters: Optional[FilterType] = None
) -> Any: ...
337 changes: 337 additions & 0 deletions python/deltalake/table.py
@@ -525,6 +525,40 @@ def optimize(
) -> "TableOptimizer":
return TableOptimizer(self)

def merge(
self,
source: Union[pyarrow.Table, pyarrow.RecordBatch],
predicate: str,
source_alias: Optional[str] = None,
target_alias: Optional[str] = None,
error_on_type_mismatch: bool = True,
) -> "TableMerger":
"""Pass the source data which you want to merge on the target delta table, providing a
predicate in SQL query like format. You can also specify on what to do when the underlying data types do not
match the underlying table.

Args:
source (pyarrow.Table | pyarrow.RecordBatch): source data
predicate (str): SQL like predicate on how to merge
source_alias (str): Alias for the source table
target_alias (str): Alias for the target table
error_on_type_mismatch (bool): specify if merge will raise an error on data type mismatch. Defaults to True.

Returns:
TableMerger: TableMerger Object
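
Examples:

A minimal sketch, using the same toy data as the clause methods below:

>>> from deltalake import DeltaTable
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> dt = DeltaTable("tmp")
>>> dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target') \
... .when_matched_update_all() \
... .execute()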
"""
if isinstance(source, pyarrow.Table):
# Combine chunks first so that to_batches() returns a single batch;
# taking only the first batch of a multi-chunk table would drop rows.
source = source.combine_chunks().to_batches()[0]

return TableMerger(
self,
source=source,
predicate=predicate,
source_alias=source_alias,
target_alias=target_alias,
safe_cast=not error_on_type_mismatch,
)

def pyarrow_schema(self) -> pyarrow.Schema:
"""
Get the current schema of the DeltaTable with the Parquet PyArrow format.
@@ -747,6 +781,309 @@ def delete(self, predicate: Optional[str] = None) -> Dict[str, Any]:
return json.loads(metrics)


class TableMerger:
"""API for various table MERGE commands."""

def __init__(
self,
table: DeltaTable,
source: Union[pyarrow.Table, pyarrow.RecordBatch],
predicate: str,
source_alias: Optional[str] = None,
target_alias: Optional[str] = None,
safe_cast: bool = True,
):
self.table = table
self.source = source
self.predicate = predicate
self.source_alias = source_alias
self.target_alias = target_alias
self.safe_cast = safe_cast
self.writer_properties: Optional[Dict[str, Optional[int]]] = None
self.matched_update_updates: Optional[Dict[str, str]] = None
self.matched_update_predicate: Optional[str] = None
self.matched_delete_predicate: Optional[str] = None
self.matched_delete_all: Optional[bool] = None
self.not_matched_insert_updates: Optional[Dict[str, str]] = None
self.not_matched_insert_predicate: Optional[str] = None
self.not_matched_by_source_update_updates: Optional[Dict[str, str]] = None
self.not_matched_by_source_update_predicate: Optional[str] = None
self.not_matched_by_source_delete_predicate: Optional[str] = None
self.not_matched_by_source_delete_all: Optional[bool] = None

def with_writer_properties(
self,
data_page_size_limit: Optional[int] = None,
dictionary_page_size_limit: Optional[int] = None,
data_page_row_count_limit: Optional[int] = None,
write_batch_size: Optional[int] = None,
max_row_group_size: Optional[int] = None,
) -> "TableMerger":
"""Pass writer properties to the Rust parquet writer, see options https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html:

Args:
data_page_size_limit (int|None, optional): Limit DataPage size to this amount in bytes. Defaults to None.
dictionary_page_size_limit (int|None, optional): Limit the size of each dictionary page to this amount in bytes. Defaults to None.
data_page_row_count_limit (int|None, optional): Limit the number of rows in each DataPage. Defaults to None.
write_batch_size (int|None, optional): Split the written data internally into batches of this size. Defaults to None.
max_row_group_size (int|None, optional): Maximum number of rows in each row group. Defaults to None.

Returns:
TableMerger: TableMerger Object
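
Examples:

A sketch with illustrative values; any subset of the properties may be set:

>>> from deltalake import DeltaTable
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> dt = DeltaTable("tmp")
>>> dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target') \
... .with_writer_properties(max_row_group_size=10000, write_batch_size=1024) \
... .when_matched_update_all() \
... .execute()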
"""
writer_properties = {
"data_page_size_limit": data_page_size_limit,
"dictionary_page_size_limit": dictionary_page_size_limit,
"data_page_row_count_limit": data_page_row_count_limit,
"write_batch_size": write_batch_size,
"max_row_group_size": max_row_group_size,
}
self.writer_properties = writer_properties
return self

def when_matched_update(
self, updates: Dict[str, str], predicate: Optional[str] = None
) -> "TableMerger":
"""Update a matched table row based on the rules defined by ``updates``.
If a ``predicate`` is specified, then it must evaluate to true for the row to be updated.

Args:
updates (dict): a mapping of column name to update SQL expression.
predicate (str | None, optional): SQL like predicate on when to update. Defaults to None.

Returns:
TableMerger: TableMerger Object

Examples:

>>> from deltalake import DeltaTable
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> dt = DeltaTable("tmp")
>>> dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target') \
... .when_matched_update(
... updates = {
... "x": "source.x",
... "y": "source.y"
... }
... ).execute()
"""
self.matched_update_updates = updates
self.matched_update_predicate = predicate
return self

def when_matched_update_all(self, predicate: Optional[str] = None) -> "TableMerger":
"""Updating all source fields to target fields, source and target are required to have the same field names.
If a ``predicate`` is specified, then it must evaluate to true for the row to be updated.

Args:
predicate (str | None, optional): SQL like predicate on when to update all columns. Defaults to None.

Returns:
TableMerger: TableMerger Object

Examples:

>>> from deltalake import DeltaTable
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> dt = DeltaTable("tmp")
>>> dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target') \
... .when_matched_update_all().execute()
"""

src_alias = (self.source_alias + ".") if self.source_alias is not None else ""
trgt_alias = (self.target_alias + ".") if self.target_alias is not None else ""

self.matched_update_updates = {
f"{trgt_alias}{col.name}": f"{src_alias}{col.name}"
for col in self.source.schema
}
self.matched_update_predicate = predicate
return self

def when_matched_delete(self, predicate: Optional[str] = None) -> "TableMerger":
"""Delete a matched row from the table only if the given ``predicate`` (if specified) is
true for the matched row. If not specified it deletes all matches.

Args:
predicate (str | None, optional): SQL like predicate on when to delete. Defaults to None.

Returns:
TableMerger: TableMerger Object

Examples:

Delete on a predicate

>>> from deltalake import DeltaTable
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> dt = DeltaTable("tmp")
>>> dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target') \
... .when_matched_delete(predicate = "source.deleted = true") \
... .execute()

Delete all records that were matched

>>> from deltalake import DeltaTable
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> dt = DeltaTable("tmp")
>>> dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target') \
... .when_matched_delete() \
... .execute()
"""

if predicate is None:
self.matched_delete_all = True
else:
self.matched_delete_predicate = predicate
return self

def when_not_matched_insert(
self, updates: Dict[str, str], predicate: Optional[str] = None
) -> "TableMerger":
"""Insert a new row to the target table based on the rules defined by ``updates``. If a
``predicate`` is specified, then it must evaluate to true for the new row to be inserted.

Args:
updates (dict): a mapping of column name to insert SQL expression.
predicate (str | None, optional): SQL like predicate on when to insert. Defaults to None.

Returns:
TableMerger: TableMerger Object

Examples:

>>> from deltalake import DeltaTable
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> dt = DeltaTable("tmp")
>>> dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target') \
... .when_not_matched_insert(
... updates = {
... "x": "source.x",
... "y": "source.y"
... }
... ).execute()
"""

self.not_matched_insert_updates = updates
self.not_matched_insert_predicate = predicate

return self

def when_not_matched_insert_all(
self, predicate: Optional[str] = None
) -> "TableMerger":
"""Insert a new row to the target table, updating all source fields to target fields. Source and target are
required to have the same field names. If a ``predicate`` is specified, then it must evaluate to true for
the new row to be inserted.

Args:
predicate (str | None, optional): SQL like predicate on when to insert. Defaults to None.

Returns:
TableMerger: TableMerger Object

Examples:

>>> from deltalake import DeltaTable
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> dt = DeltaTable("tmp")
>>> dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target') \
... .when_not_matched_insert_all().execute()
"""

src_alias = (self.source_alias + ".") if self.source_alias is not None else ""
trgt_alias = (self.target_alias + ".") if self.target_alias is not None else ""
self.not_matched_insert_updates = {
f"{trgt_alias}{col.name}": f"{src_alias}{col.name}"
for col in self.source.schema
}
self.not_matched_insert_predicate = predicate
return self

def when_not_matched_by_source_update(
self, updates: Dict[str, str], predicate: Optional[str] = None
) -> "TableMerger":
"""Update a target row that has no matches in the source based on the rules defined by ``updates``.
If a ``predicate`` is specified, then it must evaluate to true for the row to be updated.

Args:
updates (dict): a mapping of column name to update SQL expression.
predicate (str | None, optional): SQL like predicate on when to update. Defaults to None.

Returns:
TableMerger: TableMerger Object

>>> from deltalake import DeltaTable
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> dt = DeltaTable("tmp")
>>> dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target') \
... .when_not_matched_by_source_update(
... predicate = "y > 3"
... updates = {
... "y": "0",
... }
... ).execute()
"""
self.not_matched_by_source_update_updates = updates
self.not_matched_by_source_update_predicate = predicate
return self

def when_not_matched_by_source_delete(
self, predicate: Optional[str] = None
) -> "TableMerger":
"""Delete a target row that has no matches in the source from the table only if the given
``predicate`` (if specified) is true for the target row.

Args:
updates (dict): a mapping of column name to update SQL expression.
predicate (str | None, optional): SQL like predicate on when to delete when not matched by source. Defaults to None.

Returns:
TableMerger: TableMerger Object
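
Examples:

Delete every target row that has no match in the source (no predicate):

>>> from deltalake import DeltaTable
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> dt = DeltaTable("tmp")
>>> dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target') \
... .when_not_matched_by_source_delete() \
... .execute()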
"""

if predicate is None:
self.not_matched_by_source_delete_all = True
else:
self.not_matched_by_source_delete_predicate = predicate
return self

def execute(self) -> Dict[str, Any]:
"""Executes MERGE with the previously provided settings in Rust with Apache Datafusion query engine.

Returns:
Dict[str, Any]: metrics of the MERGE operation
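
Examples:

A sketch of running a merge and reading the returned metrics dict:

>>> from deltalake import DeltaTable
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> dt = DeltaTable("tmp")
>>> metrics = dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target') \
... .when_matched_update_all() \
... .execute()
>>> isinstance(metrics, dict)
True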
"""
metrics = self.table._table.merge_execute(
source=self.source,
predicate=self.predicate,
source_alias=self.source_alias,
target_alias=self.target_alias,
safe_cast=self.safe_cast,
writer_properties=self.writer_properties,
matched_update_updates=self.matched_update_updates,
matched_update_predicate=self.matched_update_predicate,
matched_delete_predicate=self.matched_delete_predicate,
matched_delete_all=self.matched_delete_all,
not_matched_insert_updates=self.not_matched_insert_updates,
not_matched_insert_predicate=self.not_matched_insert_predicate,
not_matched_by_source_update_updates=self.not_matched_by_source_update_updates,
not_matched_by_source_update_predicate=self.not_matched_by_source_update_predicate,
not_matched_by_source_delete_predicate=self.not_matched_by_source_delete_predicate,
not_matched_by_source_delete_all=self.not_matched_by_source_delete_all,
)
self.table.update_incremental()
return json.loads(metrics)


class TableOptimizer:
"""API for various table optimization commands."""
