From cf8ce45e43e9fa794d25e8c520339e13014b4285 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 16 Dec 2021 00:05:12 -0800 Subject: [PATCH] CR updates Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 223 ++++++++++++++-------------- sdk/python/feast/repo_operations.py | 38 ++--- 2 files changed, 127 insertions(+), 134 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 7b648b9f1e1..b0b99061e98 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -18,7 +18,18 @@ from collections import Counter, OrderedDict, defaultdict from datetime import datetime from pathlib import Path -from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, cast +from typing import ( + Any, + Dict, + Iterable, + List, + NamedTuple, + Optional, + Set, + Tuple, + Union, + cast, +) import pandas as pd from colorama import Fore, Style @@ -68,6 +79,31 @@ warnings.simplefilter("once", DeprecationWarning) +class RepoContents(NamedTuple): + feature_views: Set[FeatureView] + on_demand_feature_views: Set[OnDemandFeatureView] + request_feature_views: Set[RequestFeatureView] + entities: Set[Entity] + feature_services: Set[FeatureService] + + def to_registry_proto(self) -> RegistryProto: + registry_proto = RegistryProto() + registry_proto.entities.extend([e.to_proto() for e in self.entities]) + registry_proto.feature_views.extend( + [fv.to_proto() for fv in self.feature_views] + ) + registry_proto.on_demand_feature_views.extend( + [fv.to_proto() for fv in self.on_demand_feature_views] + ) + registry_proto.request_feature_views.extend( + [fv.to_proto() for fv in self.request_feature_views] + ) + registry_proto.feature_services.extend( + [fs.to_proto() for fs in self.feature_services] + ) + return registry_proto + + class FeatureStore: """ A FeatureStore object is used to define, create, and retrieve features. 
@@ -360,7 +396,56 @@ def _get_features(self, features: Union[List[str], FeatureService],) -> List[str return _feature_refs @log_exceptions_and_usage - def plan( + def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff: + """Dry-run registering objects to metadata store. + + The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces + a list of all the changes that would be introduced in the feature repo. The changes computed by the plan + command are for informational purposes, and are not actually applied to the registry. + + Args: + objects: A single object, or a list of objects that are intended to be registered with the Feature Store. + objects_to_delete: A list of objects to be deleted from the registry. + partial: If True, apply will only handle the specified objects; if False, apply will also delete + all the objects in objects_to_delete. + + Raises: + ValueError: The 'objects' parameter could not be parsed properly. + + Examples: + Generate a plan adding an Entity and a FeatureView. + + >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig + >>> from feast.feature_store import RepoContents + >>> from datetime import timedelta + >>> fs = FeatureStore(repo_path="feature_repo") + >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") + >>> driver_hourly_stats = FileSource( + ... path="feature_repo/data/driver_stats.parquet", + ... event_timestamp_column="event_timestamp", + ... created_timestamp_column="created", + ... ) + >>> driver_hourly_stats_view = FeatureView( + ... name="driver_hourly_stats", + ... entities=["driver_id"], + ... ttl=timedelta(seconds=86400 * 1), + ... batch_source=driver_hourly_stats, + ... 
) + >>> diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view + """ + + current_registry_proto = ( + self._registry.cached_registry_proto.__deepcopy__() + if self._registry.cached_registry_proto + else RegistryProto() + ) + + desired_registry_proto = desired_repo_objects.to_registry_proto() + diffs = Registry.diff_between(current_registry_proto, desired_registry_proto) + return diffs + + @log_exceptions_and_usage + def apply( self, objects: Union[ Entity, @@ -390,24 +475,26 @@ def plan( ] ] = None, partial: bool = True, - ) -> Tuple[Registry, RegistryDiff]: - """Dry-run registering objects to metadata store. + ) -> RegistryDiff: + """Register objects to metadata store and update related infrastructure. - The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces - a list of all the changes the that would be introduced in the feature repo. The changes computed by the plan - command are for informational purpose, and are not actually applied to the registry. + The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these + objects in the Feast registry. Once the apply method has updated the infrastructure (e.g., create tables in + an online store), it will commit the updated registry. All operations are idempotent, meaning they can safely + be rerun. Args: - objects: A single object, or a list of objects that are intended to be registered with the Feature Store. - objects_to_delete: A list of objects to be deleted from the registry. + objects: A single object, or a list of objects that should be registered with the Feature Store. + objects_to_delete: A list of objects to be deleted from the registry and removed from the + provider's infrastructure. This deletion will only be performed if partial is set to False. 
partial: If True, apply will only handle the specified objects; if False, apply will also delete - all the objects in objects_to_delete. + all the objects in objects_to_delete, and tear down any associated cloud resources. Raises: ValueError: The 'objects' parameter could not be parsed properly. Examples: - Generate a plan adding an Entity and a FeatureView. + Register an Entity and a FeatureView. >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig >>> from datetime import timedelta @@ -424,12 +511,11 @@ def plan( ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... ) - >>> registry, diff = fs.plan([driver_hourly_stats_view, driver]) # register entity and feature view + >>> diff = fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view """ - + # TODO: Add locking if not isinstance(objects, Iterable): objects = [objects] - assert isinstance(objects, list) if not objects_to_delete: @@ -440,7 +526,6 @@ def plan( if self._registry.cached_registry_proto else RegistryProto() ) - new_registry = self._registry.clone() # Separate all objects into entities, feature services, and different feature view types. 
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] @@ -492,11 +577,11 @@ def plan( for view in itertools.chain( views_to_update, odfvs_to_update, request_views_to_update ): - new_registry.apply_feature_view(view, project=self.project, commit=False) + self._registry.apply_feature_view(view, project=self.project, commit=False) for ent in entities_to_update: - new_registry.apply_entity(ent, project=self.project, commit=False) + self._registry.apply_entity(ent, project=self.project, commit=False) for feature_service in services_to_update: - new_registry.apply_feature_service( + self._registry.apply_feature_service( feature_service, project=self.project, commit=False ) @@ -519,119 +604,33 @@ def plan( ] for entity in entities_to_delete: - new_registry.delete_entity( + self._registry.delete_entity( entity.name, project=self.project, commit=False ) for view in views_to_delete: - new_registry.delete_feature_view( + self._registry.delete_feature_view( view.name, project=self.project, commit=False ) for request_view in request_views_to_delete: - new_registry.delete_feature_view( + self._registry.delete_feature_view( request_view.name, project=self.project, commit=False ) for odfv in odfvs_to_delete: - new_registry.delete_feature_view( + self._registry.delete_feature_view( odfv.name, project=self.project, commit=False ) for service in services_to_delete: - new_registry.delete_feature_service( + self._registry.delete_feature_service( service.name, project=self.project, commit=False ) new_registry_proto = ( - new_registry.cached_registry_proto - if new_registry.cached_registry_proto + self._registry.cached_registry_proto + if self._registry.cached_registry_proto else RegistryProto() ) - return ( - new_registry, - Registry.diff_between(current_registry_proto, new_registry_proto), - ) - - @log_exceptions_and_usage - def apply( - self, - objects: Union[ - Entity, - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - FeatureService, - List[ - Union[ - 
FeatureView, - OnDemandFeatureView, - RequestFeatureView, - Entity, - FeatureService, - ] - ], - ], - objects_to_delete: Optional[ - List[ - Union[ - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - Entity, - FeatureService, - ] - ] - ] = None, - partial: bool = True, - ): - """Register objects to metadata store and update related infrastructure. - - The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these - objects in the Feast registry. Once the apply method has updated the infrastructure (e.g., create tables in - an online store), it will commit the updated registry. All operations are idempotent, meaning they can safely - be rerun. - - Args: - objects: A single object, or a list of objects that should be registered with the Feature Store. - objects_to_delete: A list of objects to be deleted from the registry and removed from the - provider's infrastructure. This deletion will only be performed if partial is set to False. - partial: If True, apply will only handle the specified objects; if False, apply will also delete - all the objects in objects_to_delete, and tear down any associated cloud resources. - - Raises: - ValueError: The 'objects' parameter could not be parsed properly. - - Examples: - Register an Entity and a FeatureView. - - >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig - >>> from datetime import timedelta - >>> fs = FeatureStore(repo_path="feature_repo") - >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") - >>> driver_hourly_stats = FileSource( - ... path="feature_repo/data/driver_stats.parquet", - ... event_timestamp_column="event_timestamp", - ... created_timestamp_column="created", - ... ) - >>> driver_hourly_stats_view = FeatureView( - ... name="driver_hourly_stats", - ... entities=["driver_id"], - ... ttl=timedelta(seconds=86400 * 1), - ... batch_source=driver_hourly_stats, - ... 
) - >>> diff = fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view - """ - # TODO: Add locking - if not isinstance(objects, Iterable): - objects = [objects] - - if not objects_to_delete: - objects_to_delete = [] - - new_registry, diffs = self.plan(objects, objects_to_delete, partial) - new_registry.cached_registry_proto_ttl = ( - self._registry.cached_registry_proto_ttl - ) - new_registry._registry_store = self._registry._registry_store - self._registry = new_registry + diffs = Registry.diff_between(current_registry_proto, new_registry_proto) entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index a05032ba088..86de6d1958a 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -6,7 +6,7 @@ import sys from importlib.abc import Loader from pathlib import Path -from typing import List, NamedTuple, Set, Union, cast +from typing import List, Set, Union, cast import click from click.exceptions import BadParameter @@ -15,7 +15,7 @@ from feast.diff.FcoDiff import TransitionType, tag_objects_for_keep_delete_add from feast.entity import Entity from feast.feature_service import FeatureService -from feast.feature_store import FeatureStore +from feast.feature_store import FeatureStore, RepoContents from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_NAME, FeatureView from feast.names import adjectives, animals from feast.on_demand_feature_view import OnDemandFeatureView @@ -33,14 +33,6 @@ def py_path_to_module(path: Path, repo_root: Path) -> str: ) -class ParsedRepo(NamedTuple): - feature_views: Set[FeatureView] - on_demand_feature_views: Set[OnDemandFeatureView] - request_feature_views: Set[RequestFeatureView] - entities: Set[Entity] - feature_services: Set[FeatureService] - - def read_feastignore(repo_root: Path) -> List[str]: 
"""Read .feastignore in the repo root directory (if exists) and return the list of user-defined ignore paths""" feast_ignore = repo_root / ".feastignore" @@ -94,9 +86,9 @@ def get_repo_files(repo_root: Path) -> List[Path]: return sorted(repo_files) -def parse_repo(repo_root: Path) -> ParsedRepo: +def parse_repo(repo_root: Path) -> RepoContents: """ Collect feature table definitions from feature repo """ - res = ParsedRepo( + res = RepoContents( entities=set(), feature_views=set(), feature_services=set(), @@ -135,16 +127,18 @@ def plan(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool) for data_source in data_sources: data_source.validate(store.config) - # For each object in the registry, determine whether it should be kept or deleted, - # and whether new objects need to be added. - ( - all_to_apply, - all_to_delete, - views_to_delete, - views_to_keep, - ) = extract_objects_for_apply_delete(project, registry, repo) - - _, diff = store.plan(all_to_apply, objects_to_delete=all_to_delete, partial=False) + diff = store.plan(repo) + views_to_delete = [ + v + for v in diff.fco_diffs + if v.fco_type == "feature view" and v.transition_type == TransitionType.DELETE + ] + views_to_keep = [ + v + for v in diff.fco_diffs + if v.fco_type == "feature view" + and v.transition_type in {TransitionType.CREATE, TransitionType.UNCHANGED} + ] log_cli_output(diff, views_to_delete, views_to_keep)