From 637b40136073a45adb01dfeb4bb5e77654855bf7 Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Tue, 30 Mar 2021 16:12:47 -0400 Subject: [PATCH 1/2] Add optional column labels for registered functions --- openclean/engine/library.py | 10 +++++++--- openclean/engine/object/function.py | 20 +++++++++++++++++--- tests/engine/object/test_function_objects.py | 18 ++++++++++++++++++ 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/openclean/engine/library.py b/openclean/engine/library.py index cd6c2de..b1bee7f 100644 --- a/openclean/engine/library.py +++ b/openclean/engine/library.py @@ -18,7 +18,7 @@ """ from __future__ import annotations -from typing import Callable, Iterable, List, Optional +from typing import Callable, Iterable, List, Optional, Union from openclean.data.mapping import Mapping from openclean.data.store.mem import VolatileDataStore @@ -69,8 +69,8 @@ def __init__( def eval( self, name: Optional[str] = None, namespace: Optional[str] = None, label: Optional[str] = None, description: Optional[str] = None, - columns: Optional[int] = None, outputs: Optional[int] = None, - parameters: Optional[List[Parameter]] = None + columns: Optional[int] = None, collabels: Optional[Union[str, List[str]]] = None, + outputs: Optional[int] = None, parameters: Optional[List[Parameter]] = None ) -> Callable: """Decorator that adds a new function to the registered set of data frame transformers. @@ -93,6 +93,9 @@ def eval( each column plus arguments for any additional parameter. The column values will be the first arguments that are passed to the registered function. + collabels: string or list of string, default=None + Display labels for the input columns. If given, the number of values + has to match the ``columns`` value. outputs: int, default=None Defines the number of scalar output values that the registered function returns. 
By default it is assumed that the function will @@ -118,6 +121,7 @@ def register_eval(func: Callable) -> Callable: label=label, description=description, columns=columns, + collabels=collabels, outputs=outputs, parameters=parameters ) diff --git a/openclean/engine/object/function.py b/openclean/engine/object/function.py index 448d0f8..be2e2f1 100644 --- a/openclean/engine/object/function.py +++ b/openclean/engine/object/function.py @@ -26,7 +26,7 @@ """ from flowserv.model.parameter.factory import ParameterDeserializer -from typing import Callable, Dict, List, Optional, Tuple +from typing import Callable, Dict, List, Optional, Tuple, Union import dill @@ -55,8 +55,8 @@ class FunctionHandle(ObjectHandle): def __init__( self, func: Callable, name: Optional[str] = None, namespace: Optional[str] = None, label: Optional[str] = None, description: Optional[str] = None, - columns: Optional[int] = None, outputs: Optional[int] = None, - parameters: Optional[List[Parameter]] = None + columns: Optional[int] = None, collabels: Optional[Union[str, List[str]]] = None, + outputs: Optional[int] = None, parameters: Optional[List[Parameter]] = None ): """Initialize the object properties. @@ -80,6 +80,9 @@ def __init__( each column plus arguments for any additional parameter. The column values will be the first arguments that are passed to the registered function. + collabels: string or list of string, default=None + Display labels for the input columns. If given, the number of values + has to match the ``columns`` value. outputs: int, default=None Defines the number of scalar output values that the registered function returns. By default it is assumed that the function will @@ -99,6 +102,15 @@ def __init__( self.columns = columns if columns is not None else 1 self.outputs = outputs if outputs is not None else 1 self.parameters = parameters if parameters is not None else list() + # Get the column labels from the function code descriptor (if possible). 
+ if not collabels: + try: + collabels = list(func.__code__.co_varnames)[:self.columns] + except AttributeError: + pass + elif len(collabels) != self.columns: + raise ValueError('expected {} column labels'.format(self.columns)) + self.collabels = collabels # The function handle can be used as a substitue for the registered # function. Use the function name as the name for the handle. self.__name__ = func.__name__ @@ -134,6 +146,7 @@ def deserialize(self, descriptor: Dict, data: str) -> ObjectHandle: label=descriptor.get('label'), description=descriptor.get('description'), columns=descriptor['columns'], + collabels=descriptor['columnLabels'], outputs=descriptor['outputs'], parameters=[ ParameterDeserializer.from_dict(obj) for obj in descriptor['parameters'] @@ -156,6 +169,7 @@ def serialize(self, object: ObjectHandle) -> Tuple[Dict, str]: """ descriptor = object.to_dict() descriptor['columns'] = object.columns + descriptor['columnLabels'] = object.collabels descriptor['outputs'] = object.outputs descriptor['parameters'] = [p.to_dict() for p in object.parameters] data = dill.dumps(object.func).decode(encoding=DEFAULT_ENCODING) diff --git a/tests/engine/object/test_function_objects.py b/tests/engine/object/test_function_objects.py index f617d00..37a19c2 100644 --- a/tests/engine/object/test_function_objects.py +++ b/tests/engine/object/test_function_objects.py @@ -7,6 +7,8 @@ """Unit tests for (de-)serialization of function handles.""" +import pytest + from openclean.engine.object.function import FunctionHandle, FunctionFactory from openclean.engine.object.function import Int @@ -27,6 +29,7 @@ def test_serialize_function(): assert f.label is None assert f.description is None assert f.columns == 1 + assert f.collabels == ['n'] assert f.outputs == 1 assert f.parameters == [] f.func(0.1) @@ -38,6 +41,7 @@ def test_serialize_function(): label='My Name', description='Just a test', columns=2, + collabels=['a', 'b'], outputs=3, parameters=[Int('sleep')] ) @@ -48,7 +52,21 @@ 
def test_serialize_function(): assert f.label == 'My Name' assert f.description == 'Just a test' assert f.columns == 2 + assert f.collabels == ['a', 'b'] assert f.outputs == 3 assert len(f.parameters) == 1 assert f.parameters[0].is_int() f.func(0.1) + # Error case + with pytest.raises(ValueError): + FunctionHandle( + func=my_func, + name='myname', + namespace='mynamespace', + label='My Name', + description='Just a test', + columns=2, + collabels=['a'], + outputs=3, + parameters=[Int('sleep')] + ) From 3ea320ab3ec510759bd7e7aa06401abd835a3e1e Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Tue, 6 Apr 2021 16:23:28 -0400 Subject: [PATCH 2/2] Changes to the dataset archive (version 0.3.2) --- CHANGELOG.md | 6 +++++ openclean/data/archive/base.py | 14 +++++++++--- openclean/data/archive/cache.py | 25 ++++++++++++++++----- openclean/data/archive/histore.py | 20 +++++++++++++---- openclean/version.py | 2 +- tests/data/archive/test_cached_datastore.py | 24 ++++++++++++-------- 6 files changed, 69 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e65f98..990bca2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,3 +33,9 @@ ### 0.3.1 - 2021-03-30 * Add optional version parameter when requesting metadata for a dataset version in `openclean.engine.dataset.DatasetHandle`. + + +### 0.3.2 - 2021-04-06 + +* Make checking out a committed dataset in the `openclean.data.archive.base.ArchiveStore` optional. +* Enable cache refresh for cached datasets in `openclean.data.archive.cache.CachedDatastore`. 
diff --git a/openclean/data/archive/base.py b/openclean/data/archive/base.py index 8f1a21f..9f607f4 100644 --- a/openclean/data/archive/base.py +++ b/openclean/data/archive/base.py @@ -70,11 +70,13 @@ def checkout(self, version: Optional[int] = None) -> pd.DataFrame: raise NotImplementedError() # pragma: no cover @abstractmethod - def commit(self, df: pd.DataFrame, action: Optional[ActionHandle] = None) -> pd.DataFrame: + def commit( + self, df: pd.DataFrame, action: Optional[ActionHandle] = None, + checkout: Optional[bool] = False + ) -> pd.DataFrame: """Insert a new dataset snapshot. - Returns the inserted data frame (after potentially modifying the row - indexes) and the version identifier for the commited version. + Returns the inserted data frame with potentially modified row indexes. Parameters ---------- @@ -82,6 +84,12 @@ def commit(self, df: pd.DataFrame, action: Optional[ActionHandle] = None) -> pd. Data frame containing the new dataset version that is being stored. action: openclean.data.archive.base.ActionHandle, default=None Optional handle of the action that created the new dataset version. + checkout: bool, default=False + Check out the committed snapshot and return the result. This option + is required only if the row index of the given data frame has been + modified by the commit operation, i.e., if the index of the given + data frame contained non-integers, negative values, or duplicate + values. Returns ------- diff --git a/openclean/data/archive/cache.py b/openclean/data/archive/cache.py index 2c8dd34..57a9cd2 100644 --- a/openclean/data/archive/cache.py +++ b/openclean/data/archive/cache.py @@ -45,7 +45,9 @@ def __init__(self, datastore: ArchiveStore): self.datastore = datastore self._cache = None - def checkout(self, version: Optional[int] = None) -> pd.DataFrame: + def checkout( + self, version: Optional[int] = None, no_cache: Optional[bool] = False + ) -> pd.DataFrame: """Get a specific version of a dataset. 
The dataset snapshot is identified by the unique version identifier. @@ -55,6 +57,9 @@ def checkout(self, version: Optional[int] = None) -> pd.DataFrame: ---------- version: int Unique dataset version identifier. + no_cache: bool, default=False + If True, ignore the cached dataset version and check out the dataset + from the associated data store. Returns ------- @@ -68,7 +73,7 @@ def checkout(self, version: Optional[int] = None) -> pd.DataFrame: if version is None: version = self.datastore.last_version() # Serve dataset from cache if present. - if self._cache is not None: + if self._cache is not None and not no_cache: # If the requested version matches the cached version return the # cached data frame. if version == self._cache.version: @@ -79,9 +84,13 @@ def checkout(self, version: Optional[int] = None) -> pd.DataFrame: self._cache = CacheEntry(df=df, version=version) return df - def commit(self, df: pd.DataFrame, action: Optional[ActionHandle] = None) -> pd.DataFrame: - """Insert a new version for a dataset. Returns the inserted data frame - (after potentially modifying the row indexes). + def commit( + self, df: pd.DataFrame, action: Optional[ActionHandle] = None, + checkout: Optional[bool] = False + ) -> pd.DataFrame: + """Insert a new version for a dataset. + + Returns the inserted data frame with potentially modified row indexes. Parameters ---------- @@ -89,6 +98,12 @@ def commit(self, df: pd.DataFrame, action: Optional[ActionHandle] = None) -> pd. Data frame containing the new dataset version that is being stored. action: openclean.data.archive.base.ActionHandle, default=None Optional handle of the action that created the new dataset version. + checkout: bool, default=False + Check out the committed snapshot and return the result. This option + is required only if the row index of the given data frame has been + modified by the commit operation, i.e., if the index of the given + data frame contained non-integers, negative values, or duplicate + values. 
Returns ------- diff --git a/openclean/data/archive/histore.py b/openclean/data/archive/histore.py index 600eecc..2780b62 100644 --- a/openclean/data/archive/histore.py +++ b/openclean/data/archive/histore.py @@ -63,9 +63,15 @@ def checkout(self, version: Optional[int] = None) -> pd.DataFrame: """ return self.archive.checkout(version=version) - def commit(self, df: pd.DataFrame, action: Optional[ActionHandle] = None) -> pd.DataFrame: - """Insert a new version for a dataset. Returns the inserted data frame - (after potentially modifying the row indexes). + def commit( + self, df: pd.DataFrame, action: Optional[ActionHandle] = None, + checkout: Optional[bool] = False + ) -> pd.DataFrame: + """Insert a new version for a dataset. + + Returns the inserted data frame. If the ``checkout`` flag is True the + committed dataset is checked out to account for possible changes to the + row index. If the flag is set to False the given data frame is returned. Parameters ---------- @@ -73,6 +79,12 @@ def commit(self, df: pd.DataFrame, action: Optional[ActionHandle] = None) -> pd. Data frame containing the new dataset version that is being stored. action: openclean.data.archive.base.ActionHandle, default=None Optional handle of the action that created the new dataset version. + checkout: bool, default=False + Check out the committed snapshot and return the result. This option + is required only if the row index of the given data frame has been + modified by the commit operation, i.e., if the index of the given + data frame contained non-integers, negative values, or duplicate + values. Returns ------- @@ -82,7 +94,7 @@ def commit(self, df: pd.DataFrame, action: Optional[ActionHandle] = None) -> pd. 
doc=df, action=action.to_dict() if action is not None else None ) - return self.archive.checkout(version=self._last_snapshot.version) + return self.archive.checkout(version=self._last_snapshot.version) if checkout else df def last_version(self) -> int: """Get a identifier for the last version of a dataset. diff --git a/openclean/version.py b/openclean/version.py index 86bb8b3..33de770 100644 --- a/openclean/version.py +++ b/openclean/version.py @@ -7,4 +7,4 @@ """Version information for the openclean package.""" -__version__ = '0.3.1' +__version__ = '0.3.2' diff --git a/tests/data/archive/test_cached_datastore.py b/tests/data/archive/test_cached_datastore.py index f56342f..d7391d4 100644 --- a/tests/data/archive/test_cached_datastore.py +++ b/tests/data/archive/test_cached_datastore.py @@ -24,24 +24,30 @@ def test_cache_dataframe(dataset, store): """Test maintaining the last dataset in the cache.""" cached_store = CachedDatastore(datastore=store) # -- First snapshot ------------------------------------------------------- - df = cached_store.commit(df=dataset) - assert df.shape == (2, 3) + df_1 = cached_store.commit(df=dataset) + assert df_1.shape == (2, 3) assert cached_store._cache is not None assert cached_store._cache.df.shape == (2, 3) assert cached_store._cache.version == 0 assert cached_store.last_version() == 0 - df = cached_store.checkout() - assert df.shape == (2, 3) - df = df[df['A'] == 1] + df_1 = cached_store.checkout() + assert df_1.shape == (2, 3) + df_2 = df_1[df_1['A'] == 1] # -- Second snapshot ------------------------------------------------------ - df = cached_store.commit(df=df) - assert df.shape == (1, 3) + df_2 = cached_store.commit(df=df_2) + assert df_2.shape == (1, 3) assert cached_store._cache.df.shape == (1, 3) assert cached_store._cache.version == 1 assert len(cached_store.snapshots()) == 2 # -- Checkout first snapshot ---------------------------------------------- - df = cached_store.checkout(version=0) - assert df.shape == (2, 3) + 
df_1 = cached_store.checkout(version=0) + assert df_1.shape == (2, 3) + assert cached_store._cache.df.shape == (2, 3) + assert cached_store._cache.version == 0 + # -- Refresh cache -------------------------------------------------------- + cached_store._cache.df = df_2 + df_1 = cached_store.checkout(version=0, no_cache=True) + assert df_1.shape == (2, 3) assert cached_store._cache.df.shape == (2, 3) assert cached_store._cache.version == 0