Merge pull request #119 from VIDA-NYU/dev-0.3.2
Dev 0.3.2
heikomuller authored Apr 6, 2021
2 parents 584abd7 + 3ea320a commit da6c501
Showing 9 changed files with 111 additions and 28 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -33,3 +33,9 @@
### 0.3.1 - 2021-03-30

* Add optional version parameter when requesting metadata for a dataset version in `openclean.engine.dataset.DatasetHandle`.


### 0.3.2 - 2021-04-06

* Make checking out a committed dataset in the `openclean.data.archive.base.ArchiveStore` optional.
* Enable cache refresh for cached datasets in `openclean.data.archive.cache.CachedDatastore`.
14 changes: 11 additions & 3 deletions openclean/data/archive/base.py
@@ -70,18 +70,26 @@ def checkout(self, version: Optional[int] = None) -> pd.DataFrame:
raise NotImplementedError() # pragma: no cover

@abstractmethod
def commit(self, df: pd.DataFrame, action: Optional[ActionHandle] = None) -> pd.DataFrame:
def commit(
self, df: pd.DataFrame, action: Optional[ActionHandle] = None,
checkout: Optional[bool] = False
) -> pd.DataFrame:
"""Insert a new dataset snapshot.
Returns the inserted data frame (after potentially modifying the row
indexes) and the version identifier for the committed version.
Returns the inserted data frame with potentially modified row indexes.
Parameters
----------
df: pd.DataFrame
Data frame containing the new dataset version that is being stored.
action: openclean.data.archive.base.ActionHandle, default=None
Optional handle of the action that created the new dataset version.
checkout: bool, default=False
Checkout the committed snapshot and return the result. This option
is required only if the row index of the given data frame has been
modified by the commit operation, i.e., if the index of the given
data frame contained non-integers, negative values, or duplicate
values.
Returns
-------
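The new ``checkout`` flag on ``ArchiveStore.commit`` avoids an extra checkout round-trip when the row index is known to be stable. A minimal usage sketch, assuming ``store`` is an already-created concrete ``ArchiveStore`` implementation (the variable name and the sample data frame are illustrative only):

import pandas as pd

# Assumption: `store` is a concrete ArchiveStore implementation that was
# created elsewhere (for example, a HISTORE-backed datastore).
df = pd.DataFrame({'A': [1, 2], 'B': ['x', 'y']}, index=['r1', 'r2'])

# By default the given data frame is returned as-is and no checkout is
# performed. Passing checkout=True checks out the committed snapshot instead,
# which is needed here because the non-integer index ['r1', 'r2'] is modified
# by the commit operation.
committed = store.commit(df=df, checkout=True)
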
25 changes: 20 additions & 5 deletions openclean/data/archive/cache.py
@@ -45,7 +45,9 @@ def __init__(self, datastore: ArchiveStore):
self.datastore = datastore
self._cache = None

def checkout(self, version: Optional[int] = None) -> pd.DataFrame:
def checkout(
self, version: Optional[int] = None, no_cache: Optional[bool] = False
) -> pd.DataFrame:
"""Get a specific version of a dataset. The dataset snapshot is
identified by the unique version identifier.
@@ -55,6 +57,9 @@ def checkout(self, version: Optional[int] = None) -> pd.DataFrame:
----------
version: int
Unique dataset version identifier.
no_cache: bool, default=False
If True, ignore the cached dataset version and check out the dataset
from the associated data store.
Returns
-------
@@ -68,7 +73,7 @@ def checkout(self, version: Optional[int] = None) -> pd.DataFrame:
if version is None:
version = self.datastore.last_version()
# Serve dataset from cache if present.
if self._cache is not None:
if self._cache is not None and not no_cache:
# If the requested version matches the cached version return the
# cached data frame.
if version == self._cache.version:
@@ -79,16 +84,26 @@ def checkout(self, version: Optional[int] = None) -> pd.DataFrame:
self._cache = CacheEntry(df=df, version=version)
return df

def commit(self, df: pd.DataFrame, action: Optional[ActionHandle] = None) -> pd.DataFrame:
"""Insert a new version for a dataset. Returns the inserted data frame
(after potentially modifying the row indexes).
def commit(
self, df: pd.DataFrame, action: Optional[ActionHandle] = None,
checkout: Optional[bool] = False
) -> pd.DataFrame:
"""Insert a new version for a dataset.
Returns the inserted data frame with potentially modified row indexes.
Parameters
----------
df: pd.DataFrame
Data frame containing the new dataset version that is being stored.
action: openclean.data.archive.base.ActionHandle, default=None
Optional handle of the action that created the new dataset version.
checkout: bool, default=False
Checkout the committed snapshot and return the result. This option
is required only if the row index of the given data frame has been
modified by the commit operation, i.e., if the index of the given
data frame contained non-integers, negative values, or duplicate
values.
Returns
-------
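A short sketch of the new ``no_cache`` option, assuming ``store`` is an existing ``ArchiveStore`` that already holds at least one snapshot (the setup is illustrative and mirrors the unit test further down):

from openclean.data.archive.cache import CachedDatastore

# Assumption: `store` is an existing ArchiveStore that already holds at least
# one committed snapshot (version 0).
cached = CachedDatastore(datastore=store)

# The first checkout populates the in-memory cache entry.
df = cached.checkout(version=0)

# A repeated checkout of the same version is normally served from the cache.
# Passing no_cache=True bypasses the cached copy, re-reads the version from
# the wrapped datastore, and refreshes the cache entry with the result.
df = cached.checkout(version=0, no_cache=True)
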
20 changes: 16 additions & 4 deletions openclean/data/archive/histore.py
@@ -63,16 +63,28 @@ def checkout(self, version: Optional[int] = None) -> pd.DataFrame:
"""
return self.archive.checkout(version=version)

def commit(self, df: pd.DataFrame, action: Optional[ActionHandle] = None) -> pd.DataFrame:
"""Insert a new version for a dataset. Returns the inserted data frame
(after potentially modifying the row indexes).
def commit(
self, df: pd.DataFrame, action: Optional[ActionHandle] = None,
checkout: Optional[bool] = False
) -> pd.DataFrame:
"""Insert a new version for a dataset.
Returns the inserted data frame. If the ``checkout`` flag is True, the
committed dataset is checked out to account for possible changes to the
row index. If the flag is False, the given data frame is returned.
Parameters
----------
df: pd.DataFrame
Data frame containing the new dataset version that is being stored.
action: openclean.data.archive.base.ActionHandle, default=None
Optional handle of the action that created the new dataset version.
checkout: bool, default=False
Checkout the committed snapshot and return the result. This option
is required only if the row index of the given data frame has been
modified by the commit operation, i.e., if the index of the given
data frame contained non-integers, negative values, or duplicate
values.
Returns
-------
@@ -82,7 +94,7 @@ def commit(self, df: pd.DataFrame, action: Optional[ActionHandle] = None) -> pd.DataFrame:
doc=df,
action=action.to_dict() if action is not None else None
)
return self.archive.checkout(version=self._last_snapshot.version)
return self.archive.checkout(version=self._last_snapshot.version) if checkout else df

def last_version(self) -> int:
"""Get a identifier for the last version of a dataset.
10 changes: 7 additions & 3 deletions openclean/engine/library.py
@@ -18,7 +18,7 @@
"""

from __future__ import annotations
from typing import Callable, Iterable, List, Optional
from typing import Callable, Iterable, List, Optional, Union

from openclean.data.mapping import Mapping
from openclean.data.store.mem import VolatileDataStore
@@ -69,8 +69,8 @@ def __init__(
def eval(
self, name: Optional[str] = None, namespace: Optional[str] = None,
label: Optional[str] = None, description: Optional[str] = None,
columns: Optional[int] = None, outputs: Optional[int] = None,
parameters: Optional[List[Parameter]] = None
columns: Optional[int] = None, collabels: Optional[Union[str, List[str]]] = None,
outputs: Optional[int] = None, parameters: Optional[List[Parameter]] = None
) -> Callable:
"""Decorator that adds a new function to the registered set of data
frame transformers.
@@ -93,6 +93,9 @@ def eval(
each column plus arguments for any additional parameter. The
column values will be the first arguments that are passed to the
registered function.
collabels: string or list of string, default=None
Display labels for the input columns. If given, the number of values
has to match the ``columns`` value.
outputs: int, default=None
Defines the number of scalar output values that the registered
function returns. By default it is assumed that the function will
@@ -118,6 +121,7 @@ def register_eval(func: Callable) -> Callable:
label=label,
description=description,
columns=columns,
collabels=collabels,
outputs=outputs,
parameters=parameters
)
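A hedged usage sketch for the new ``collabels`` argument of the ``eval`` decorator; ``library`` stands for an instance of the registry class defined in this module, and all other names below are illustrative:

# Assumption: `library` is an instance of the registry class defined in
# openclean.engine.library that exposes the eval() decorator.

@library.eval(
    name='add',
    namespace='demo',
    columns=2,
    collabels=['Left operand', 'Right operand'],  # one display label per input column
    outputs=1
)
def add(x, y):
    # Called with the values of the two selected input columns for each row.
    return x + y
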
20 changes: 17 additions & 3 deletions openclean/engine/object/function.py
@@ -26,7 +26,7 @@
"""

from flowserv.model.parameter.factory import ParameterDeserializer
from typing import Callable, Dict, List, Optional, Tuple
from typing import Callable, Dict, List, Optional, Tuple, Union

import dill

@@ -55,8 +55,8 @@ class FunctionHandle(ObjectHandle):
def __init__(
self, func: Callable, name: Optional[str] = None, namespace: Optional[str] = None,
label: Optional[str] = None, description: Optional[str] = None,
columns: Optional[int] = None, outputs: Optional[int] = None,
parameters: Optional[List[Parameter]] = None
columns: Optional[int] = None, collabels: Optional[Union[str, List[str]]] = None,
outputs: Optional[int] = None, parameters: Optional[List[Parameter]] = None
):
"""Initialize the object properties.
@@ -80,6 +80,9 @@ def __init__(
each column plus arguments for any additional parameter. The
column values will be the first arguments that are passed to the
registered function.
collabels: string or list of string, default=None
Display labels for the input columns. If given, the number of values
has to match the ``columns`` value.
outputs: int, default=None
Defines the number of scalar output values that the registered
function returns. By default it is assumed that the function will
@@ -99,6 +102,15 @@ def __init__(
self.columns = columns if columns is not None else 1
self.outputs = outputs if outputs is not None else 1
self.parameters = parameters if parameters is not None else list()
# Get the column labels from the function code descriptor (if possible).
if not collabels:
try:
collabels = list(func.__code__.co_varnames)[:self.columns]
except AttributeError:
pass
elif len(collabels) != self.columns:
raise ValueError('expected {} column labels'.format(self.columns))
self.collabels = collabels
# The function handle can be used as a substitute for the registered
# function. Use the function name as the name for the handle.
self.__name__ = func.__name__
@@ -134,6 +146,7 @@ def deserialize(self, descriptor: Dict, data: str) -> ObjectHandle:
label=descriptor.get('label'),
description=descriptor.get('description'),
columns=descriptor['columns'],
collabels=descriptor['columnLabels'],
outputs=descriptor['outputs'],
parameters=[
ParameterDeserializer.from_dict(obj) for obj in descriptor['parameters']
@@ -156,6 +169,7 @@ def serialize(self, object: ObjectHandle) -> Tuple[Dict, str]:
"""
descriptor = object.to_dict()
descriptor['columns'] = object.columns
descriptor['columnLabels'] = object.collabels
descriptor['outputs'] = object.outputs
descriptor['parameters'] = [p.to_dict() for p in object.parameters]
data = dill.dumps(object.func).decode(encoding=DEFAULT_ENCODING)
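The handle now keeps display labels for its input columns: if none are given they are derived from the function's argument names (via ``func.__code__.co_varnames``), and a label list whose length does not match ``columns`` raises a ``ValueError``. A small sketch (function names and data are illustrative):

from openclean.engine.object.function import FunctionHandle

def to_fahrenheit(temp):
    return temp * 9 / 5 + 32

# No collabels given: the label is derived from the function's argument name.
f = FunctionHandle(func=to_fahrenheit, name='to_fahrenheit')
assert f.collabels == ['temp']

# Explicit labels must have exactly `columns` entries; a mismatch raises a
# ValueError.
g = FunctionHandle(func=lambda a, b: a + b, name='add', columns=2, collabels=['A', 'B'])
assert g.collabels == ['A', 'B']
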
2 changes: 1 addition & 1 deletion openclean/version.py
@@ -7,4 +7,4 @@

"""Version information for the openclean package."""

__version__ = '0.3.1'
__version__ = '0.3.2'
24 changes: 15 additions & 9 deletions tests/data/archive/test_cached_datastore.py
@@ -24,24 +24,30 @@ def test_cache_dataframe(dataset, store):
"""Test maintaining the last dataset in the cache."""
cached_store = CachedDatastore(datastore=store)
# -- First snapshot -------------------------------------------------------
df = cached_store.commit(df=dataset)
assert df.shape == (2, 3)
df_1 = cached_store.commit(df=dataset)
assert df_1.shape == (2, 3)
assert cached_store._cache is not None
assert cached_store._cache.df.shape == (2, 3)
assert cached_store._cache.version == 0
assert cached_store.last_version() == 0
df = cached_store.checkout()
assert df.shape == (2, 3)
df = df[df['A'] == 1]
df_1 = cached_store.checkout()
assert df_1.shape == (2, 3)
df_2 = df_1[df_1['A'] == 1]
# -- Second snapshot ------------------------------------------------------
df = cached_store.commit(df=df)
assert df.shape == (1, 3)
df_2 = cached_store.commit(df=df_2)
assert df_2.shape == (1, 3)
assert cached_store._cache.df.shape == (1, 3)
assert cached_store._cache.version == 1
assert len(cached_store.snapshots()) == 2
# -- Checkout first snapshot ----------------------------------------------
df = cached_store.checkout(version=0)
assert df.shape == (2, 3)
df_1 = cached_store.checkout(version=0)
assert df_1.shape == (2, 3)
assert cached_store._cache.df.shape == (2, 3)
assert cached_store._cache.version == 0
# -- Refresh cache --------------------------------------------------------
cached_store._cache.df = df_2
df_1 = cached_store.checkout(version=0, no_cache=True)
assert df_1.shape == (2, 3)
assert cached_store._cache.df.shape == (2, 3)
assert cached_store._cache.version == 0

18 changes: 18 additions & 0 deletions tests/engine/object/test_function_objects.py
@@ -7,6 +7,8 @@

"""Unit tests for (de-)serialization of function handles."""

import pytest

from openclean.engine.object.function import FunctionHandle, FunctionFactory
from openclean.engine.object.function import Int

@@ -27,6 +29,7 @@ def test_serialize_function():
assert f.label is None
assert f.description is None
assert f.columns == 1
assert f.collabels == ['n']
assert f.outputs == 1
assert f.parameters == []
f.func(0.1)
@@ -38,6 +41,7 @@
label='My Name',
description='Just a test',
columns=2,
collabels=['a', 'b'],
outputs=3,
parameters=[Int('sleep')]
)
@@ -48,7 +52,21 @@
assert f.label == 'My Name'
assert f.description == 'Just a test'
assert f.columns == 2
assert f.collabels == ['a', 'b']
assert f.outputs == 3
assert len(f.parameters) == 1
assert f.parameters[0].is_int()
f.func(0.1)
# Error case: the number of column labels does not match the columns value.
with pytest.raises(ValueError):
FunctionHandle(
func=my_func,
name='myname',
namespace='mynamespace',
label='My Name',
description='Just a test',
columns=2,
collabels=['a'],
outputs=3,
parameters=[Int('sleep')]
)
