Remove inferred indices (#438)
fjetter authored Mar 16, 2021
1 parent b349cee commit b1e2535
Showing 6 changed files with 17 additions and 27 deletions.
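For context, the alias this commit deletes is reconstructed below from the utils.py hunk further down; `False` served as a sentinel for "no dataset in storage and no secondary indices requested", so every consumer had to handle both a list and a literal `False`:

from typing import List, Union

try:
    from typing_extensions import Literal
except ImportError:
    from typing import Literal  # type: ignore

# Removed in this commit: the Literal[False] sentinel forced
# Optional/Union handling throughout the write path.
InferredIndices = Union[Literal[False], List[str]]

After this change, `_ensure_compatible_indices` always returns a plain (possibly empty) `List[str]`, so the signatures below can drop `Optional[InferredIndices]`.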
7 changes: 3 additions & 4 deletions kartothek/io/dask/_shuffle.py
@@ -9,7 +9,6 @@
 from kartothek.core.typing import StoreFactory
 from kartothek.io.dask.compression import pack_payload, unpack_payload_pandas
 from kartothek.io_components.metapartition import MetaPartition
-from kartothek.io_components.utils import InferredIndices
 from kartothek.io_components.write import write_partition
 from kartothek.serialization import DataFrameSerializer
@@ -36,7 +35,7 @@ def _hash_bucket(df: pd.DataFrame, subset: Optional[Sequence[str]], num_buckets:
 def shuffle_store_dask_partitions(
     ddf: dd.DataFrame,
     table: str,
-    secondary_indices: Optional[InferredIndices],
+    secondary_indices: List[str],
     metadata_version: int,
     partition_on: List[str],
     store_factory: StoreFactory,
@@ -132,11 +131,11 @@ def shuffle_store_dask_partitions(
 
 def _unpack_store_partition(
     df: pd.DataFrame,
-    secondary_indices: Optional[InferredIndices],
+    secondary_indices: List[str],
     sort_partitions_by: List[str],
     table: str,
     dataset_uuid: str,
-    partition_on: Optional[List[str]],
+    partition_on: List[str],
     store_factory: StoreFactory,
     df_serializer: DataFrameSerializer,
     metadata_version: int,
3 changes: 1 addition & 2 deletions kartothek/io/dask/common_cube.py
@@ -73,8 +73,7 @@ def ensure_valid_cube_indices(
         dataset_columns = set(ds.schema.names)
         table_indices = cube.index_columns & dataset_columns
         compatible_indices = _ensure_compatible_indices(ds, table_indices)
-        if compatible_indices:
-            dataset_indices.append(set(compatible_indices))
+        dataset_indices.append(set(compatible_indices))
     required_indices = cube.index_columns.union(*dataset_indices)
     suppress_index_on = cube.suppress_index_on.difference(*dataset_indices)
 
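Since `_ensure_compatible_indices` now always returns a list, the dropped `if compatible_indices:` guard is unnecessary: appending an empty set leaves the subsequent `union` and `difference` results unchanged. A minimal sketch of that property (variable names are illustrative, not the kartothek API):

# A dataset with no compatible indices now contributes an empty set.
dataset_indices = [{"a", "b"}, set()]
index_columns = {"a", "c"}

# The empty set is a no-op for both operations used in ensure_valid_cube_indices.
required_indices = index_columns.union(*dataset_indices)
assert required_indices == {"a", "b", "c"}
suppressed = {"d"}.difference(*dataset_indices)
assert suppressed == {"d"}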
3 changes: 1 addition & 2 deletions kartothek/io/dask/dataframe.py
@@ -31,7 +31,6 @@
 from kartothek.io_components.read import dispatch_metapartitions_from_factory
 from kartothek.io_components.update import update_dataset_from_partitions
 from kartothek.io_components.utils import (
-    InferredIndices,
     _ensure_compatible_indices,
     normalize_arg,
     normalize_args,
@@ -307,7 +306,7 @@ def _write_dataframe_partitions(
     store: StoreFactory,
     dataset_uuid: str,
     table: str,
-    secondary_indices: Optional[InferredIndices],
+    secondary_indices: List[str],
     shuffle: bool,
     repartition_ratio: Optional[SupportsFloat],
     num_buckets: int,
3 changes: 3 additions & 0 deletions kartothek/io/eager.py
@@ -649,6 +649,9 @@ def update_dataset_from_dataframes(
         partition_on=partition_on,
     )
 
+    # ensured by normalize_args but mypy doesn't recognize it
+    assert secondary_indices is not None
+
     inferred_indices = _ensure_compatible_indices(ds_factory, secondary_indices)
     del secondary_indices
 
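The added `assert` is a standard way to narrow an `Optional` for mypy when a runtime guarantee (here the `normalize_args` decorator replacing `None` with `[]`) is invisible to the type checker. A self-contained sketch of the pattern, not the kartothek code itself:

from typing import List, Optional

def normalize_args(func):
    # Stand-in for kartothek's decorator: replaces a None argument with []
    # at runtime, which mypy cannot see through.
    def wrapper(secondary_indices: Optional[List[str]] = None):
        return func(secondary_indices if secondary_indices is not None else [])
    return wrapper

@normalize_args
def update_dataset(secondary_indices: Optional[List[str]]) -> List[str]:
    # Ensured by normalize_args, but mypy does not recognize it, so an
    # explicit assert narrows Optional[List[str]] to List[str].
    assert secondary_indices is not None
    return sorted(secondary_indices)

print(update_dataset())  # prints []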
19 changes: 5 additions & 14 deletions kartothek/io_components/utils.py
@@ -19,9 +19,6 @@
 except ImportError:
     from typing import Literal  # type: ignore
 
-# Literal false is sentinel, see function body of `_ensure_compatible_indices` for details
-InferredIndices = Union[Literal[False], List[str]]
-
 signature = inspect.signature
@@ -113,10 +110,10 @@ def _combine_metadata(dataset_metadata, append_to_list):
 
 
 def _ensure_compatible_indices(
-    dataset: Optional[DatasetMetadataBase], secondary_indices: Optional[Iterable[str]],
-) -> InferredIndices:
+    dataset: Optional[DatasetMetadataBase], secondary_indices: Iterable[str],
+) -> List[str]:
     if dataset:
-        ds_secondary_indices = list(dataset.secondary_indices.keys())
+        ds_secondary_indices = sorted(dataset.secondary_indices.keys())
 
         if secondary_indices and not set(secondary_indices).issubset(
             ds_secondary_indices
@@ -126,15 +123,9 @@
                 f"Expected: {ds_secondary_indices}\n"
                 f"But got: {secondary_indices}"
             )
 
         return ds_secondary_indices
-    else:
-        # We return `False` if there is no dataset in storage and `secondary_indices` is undefined
-        # (`secondary_indices` is normalized to `[]` by default).
-        # In consequence, `parse_input_to_metapartition` will not check indices at the partition level.
-        if secondary_indices:
-            return list(secondary_indices)
-        else:
-            return False
+
+    return sorted(secondary_indices)
 
 
 def validate_partition_keys(
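This function is the behavioral core of the commit: instead of returning `False` as a sentinel (which told `parse_input_to_metapartition` to skip index checks when no dataset existed and no indices were requested), it now always returns a sorted, possibly empty list. A simplified model of the new contract, with a plain list standing in for the real `DatasetMetadataBase`:

from typing import Iterable, List, Optional

def ensure_compatible_indices(
    dataset_indices: Optional[List[str]], secondary_indices: Iterable[str]
) -> List[str]:
    requested = list(secondary_indices)  # materialize once; input may be any iterable
    # dataset_indices stands in for a dataset already in storage (None if absent).
    if dataset_indices is not None:
        if requested and not set(requested).issubset(dataset_indices):
            raise ValueError(
                f"Expected: {sorted(dataset_indices)}\nBut got: {requested}"
            )
        return sorted(dataset_indices)
    return sorted(requested)

# The case that previously produced the sentinel `False` now yields []:
assert ensure_compatible_indices(None, []) == []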
9 changes: 4 additions & 5 deletions kartothek/io_components/write.py
@@ -1,5 +1,5 @@
 from functools import partial
-from typing import Dict, Iterable, Optional, Sequence, Union, cast
+from typing import Dict, Iterable, List, Optional, cast
 
 from simplekv import KeyValueStore
 
@@ -23,7 +23,6 @@
     partition_labels_from_mps,
 )
 from kartothek.io_components.utils import (
-    InferredIndices,
     combine_metadata,
     extract_duplicates,
     sort_values_categorical,
@@ -35,10 +34,10 @@
 
 def write_partition(
     partition_df: MetaPartitionInput,
-    secondary_indices: Optional[InferredIndices],
-    sort_partitions_by: Optional[Union[str, Sequence[str]]],
+    secondary_indices: List[str],
+    sort_partitions_by: List[str],
     dataset_uuid: str,
-    partition_on: Optional[Union[str, Sequence[str]]],
+    partition_on: List[str],
     store_factory: StoreFactory,
     df_serializer: Optional[DataFrameSerializer],
     metadata_version: int,
