Skip to content

Commit

Permalink
CR updates
Browse files Browse the repository at this point in the history
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Dec 16, 2021
1 parent 40f6292 commit cf8ce45
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 134 deletions.
223 changes: 111 additions & 112 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,18 @@
from collections import Counter, OrderedDict, defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, cast
from typing import (
Any,
Dict,
Iterable,
List,
NamedTuple,
Optional,
Set,
Tuple,
Union,
cast,
)

import pandas as pd
from colorama import Fore, Style
Expand Down Expand Up @@ -68,6 +79,31 @@
warnings.simplefilter("once", DeprecationWarning)


class RepoContents(NamedTuple):
feature_views: Set[FeatureView]
on_demand_feature_views: Set[OnDemandFeatureView]
request_feature_views: Set[RequestFeatureView]
entities: Set[Entity]
feature_services: Set[FeatureService]

def to_registry_proto(self) -> RegistryProto:
registry_proto = RegistryProto()
registry_proto.entities.extend([e.to_proto() for e in self.entities])
registry_proto.feature_views.extend(
[fv.to_proto() for fv in self.feature_views]
)
registry_proto.on_demand_feature_views.extend(
[fv.to_proto() for fv in self.on_demand_feature_views]
)
registry_proto.request_feature_views.extend(
[fv.to_proto() for fv in self.request_feature_views]
)
registry_proto.feature_services.extend(
[fs.to_proto() for fs in self.feature_services]
)
return registry_proto


class FeatureStore:
"""
A FeatureStore object is used to define, create, and retrieve features.
Expand Down Expand Up @@ -360,7 +396,56 @@ def _get_features(self, features: Union[List[str], FeatureService],) -> List[str
return _feature_refs

@log_exceptions_and_usage
def plan(
def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
"""Dry-run registering objects to metadata store.
The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces
a list of all the changes the that would be introduced in the feature repo. The changes computed by the plan
command are for informational purpose, and are not actually applied to the registry.
Args:
objects: A single object, or a list of objects that are intended to be registered with the Feature Store.
objects_to_delete: A list of objects to be deleted from the registry.
partial: If True, apply will only handle the specified objects; if False, apply will also delete
all the objects in objects_to_delete.
Raises:
ValueError: The 'objects' parameter could not be parsed properly.
Examples:
Generate a plan adding an Entity and a FeatureView.
>>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig
>>> from feast.feature_store import RepoContents
>>> from datetime import timedelta
>>> fs = FeatureStore(repo_path="feature_repo")
>>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id")
>>> driver_hourly_stats = FileSource(
... path="feature_repo/data/driver_stats.parquet",
... event_timestamp_column="event_timestamp",
... created_timestamp_column="created",
... )
>>> driver_hourly_stats_view = FeatureView(
... name="driver_hourly_stats",
... entities=["driver_id"],
... ttl=timedelta(seconds=86400 * 1),
... batch_source=driver_hourly_stats,
... )
>>> diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view
"""

current_registry_proto = (
self._registry.cached_registry_proto.__deepcopy__()
if self._registry.cached_registry_proto
else RegistryProto()
)

desired_registry_proto = desired_repo_objects.to_registry_proto()
diffs = Registry.diff_between(current_registry_proto, desired_registry_proto)
return diffs

@log_exceptions_and_usage
def apply(
self,
objects: Union[
Entity,
Expand Down Expand Up @@ -390,24 +475,26 @@ def plan(
]
] = None,
partial: bool = True,
) -> Tuple[Registry, RegistryDiff]:
"""Dry-run registering objects to metadata store.
) -> RegistryDiff:
"""Register objects to metadata store and update related infrastructure.
The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces
a list of all the changes the that would be introduced in the feature repo. The changes computed by the plan
command are for informational purpose, and are not actually applied to the registry.
The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these
objects in the Feast registry. Once the apply method has updated the infrastructure (e.g., create tables in
an online store), it will commit the updated registry. All operations are idempotent, meaning they can safely
be rerun.
Args:
objects: A single object, or a list of objects that are intended to be registered with the Feature Store.
objects_to_delete: A list of objects to be deleted from the registry.
objects: A single object, or a list of objects that should be registered with the Feature Store.
objects_to_delete: A list of objects to be deleted from the registry and removed from the
provider's infrastructure. This deletion will only be performed if partial is set to False.
partial: If True, apply will only handle the specified objects; if False, apply will also delete
all the objects in objects_to_delete.
all the objects in objects_to_delete, and tear down any associated cloud resources.
Raises:
ValueError: The 'objects' parameter could not be parsed properly.
Examples:
Generate a plan adding an Entity and a FeatureView.
Register an Entity and a FeatureView.
>>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig
>>> from datetime import timedelta
Expand All @@ -424,12 +511,11 @@ def plan(
... ttl=timedelta(seconds=86400 * 1),
... batch_source=driver_hourly_stats,
... )
>>> registry, diff = fs.plan([driver_hourly_stats_view, driver]) # register entity and feature view
>>> diff = fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view
"""

# TODO: Add locking
if not isinstance(objects, Iterable):
objects = [objects]

assert isinstance(objects, list)

if not objects_to_delete:
Expand All @@ -440,7 +526,6 @@ def plan(
if self._registry.cached_registry_proto
else RegistryProto()
)
new_registry = self._registry.clone()

# Separate all objects into entities, feature services, and different feature view types.
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)]
Expand Down Expand Up @@ -492,11 +577,11 @@ def plan(
for view in itertools.chain(
views_to_update, odfvs_to_update, request_views_to_update
):
new_registry.apply_feature_view(view, project=self.project, commit=False)
self._registry.apply_feature_view(view, project=self.project, commit=False)
for ent in entities_to_update:
new_registry.apply_entity(ent, project=self.project, commit=False)
self._registry.apply_entity(ent, project=self.project, commit=False)
for feature_service in services_to_update:
new_registry.apply_feature_service(
self._registry.apply_feature_service(
feature_service, project=self.project, commit=False
)

Expand All @@ -519,119 +604,33 @@ def plan(
]

for entity in entities_to_delete:
new_registry.delete_entity(
self._registry.delete_entity(
entity.name, project=self.project, commit=False
)
for view in views_to_delete:
new_registry.delete_feature_view(
self._registry.delete_feature_view(
view.name, project=self.project, commit=False
)
for request_view in request_views_to_delete:
new_registry.delete_feature_view(
self._registry.delete_feature_view(
request_view.name, project=self.project, commit=False
)
for odfv in odfvs_to_delete:
new_registry.delete_feature_view(
self._registry.delete_feature_view(
odfv.name, project=self.project, commit=False
)
for service in services_to_delete:
new_registry.delete_feature_service(
self._registry.delete_feature_service(
service.name, project=self.project, commit=False
)

new_registry_proto = (
new_registry.cached_registry_proto
if new_registry.cached_registry_proto
self._registry.cached_registry_proto
if self._registry.cached_registry_proto
else RegistryProto()
)

return (
new_registry,
Registry.diff_between(current_registry_proto, new_registry_proto),
)

@log_exceptions_and_usage
def apply(
self,
objects: Union[
Entity,
FeatureView,
OnDemandFeatureView,
RequestFeatureView,
FeatureService,
List[
Union[
FeatureView,
OnDemandFeatureView,
RequestFeatureView,
Entity,
FeatureService,
]
],
],
objects_to_delete: Optional[
List[
Union[
FeatureView,
OnDemandFeatureView,
RequestFeatureView,
Entity,
FeatureService,
]
]
] = None,
partial: bool = True,
):
"""Register objects to metadata store and update related infrastructure.
The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these
objects in the Feast registry. Once the apply method has updated the infrastructure (e.g., create tables in
an online store), it will commit the updated registry. All operations are idempotent, meaning they can safely
be rerun.
Args:
objects: A single object, or a list of objects that should be registered with the Feature Store.
objects_to_delete: A list of objects to be deleted from the registry and removed from the
provider's infrastructure. This deletion will only be performed if partial is set to False.
partial: If True, apply will only handle the specified objects; if False, apply will also delete
all the objects in objects_to_delete, and tear down any associated cloud resources.
Raises:
ValueError: The 'objects' parameter could not be parsed properly.
Examples:
Register an Entity and a FeatureView.
>>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig
>>> from datetime import timedelta
>>> fs = FeatureStore(repo_path="feature_repo")
>>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id")
>>> driver_hourly_stats = FileSource(
... path="feature_repo/data/driver_stats.parquet",
... event_timestamp_column="event_timestamp",
... created_timestamp_column="created",
... )
>>> driver_hourly_stats_view = FeatureView(
... name="driver_hourly_stats",
... entities=["driver_id"],
... ttl=timedelta(seconds=86400 * 1),
... batch_source=driver_hourly_stats,
... )
>>> diff = fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view
"""
# TODO: Add locking
if not isinstance(objects, Iterable):
objects = [objects]

if not objects_to_delete:
objects_to_delete = []

new_registry, diffs = self.plan(objects, objects_to_delete, partial)
new_registry.cached_registry_proto_ttl = (
self._registry.cached_registry_proto_ttl
)
new_registry._registry_store = self._registry._registry_store
self._registry = new_registry
diffs = Registry.diff_between(current_registry_proto, new_registry_proto)

entities_to_update = [ob for ob in objects if isinstance(ob, Entity)]
views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)]
Expand Down
38 changes: 16 additions & 22 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import sys
from importlib.abc import Loader
from pathlib import Path
from typing import List, NamedTuple, Set, Union, cast
from typing import List, Set, Union, cast

import click
from click.exceptions import BadParameter
Expand All @@ -15,7 +15,7 @@
from feast.diff.FcoDiff import TransitionType, tag_objects_for_keep_delete_add
from feast.entity import Entity
from feast.feature_service import FeatureService
from feast.feature_store import FeatureStore
from feast.feature_store import FeatureStore, RepoContents
from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_NAME, FeatureView
from feast.names import adjectives, animals
from feast.on_demand_feature_view import OnDemandFeatureView
Expand All @@ -33,14 +33,6 @@ def py_path_to_module(path: Path, repo_root: Path) -> str:
)


class ParsedRepo(NamedTuple):
feature_views: Set[FeatureView]
on_demand_feature_views: Set[OnDemandFeatureView]
request_feature_views: Set[RequestFeatureView]
entities: Set[Entity]
feature_services: Set[FeatureService]


def read_feastignore(repo_root: Path) -> List[str]:
"""Read .feastignore in the repo root directory (if exists) and return the list of user-defined ignore paths"""
feast_ignore = repo_root / ".feastignore"
Expand Down Expand Up @@ -94,9 +86,9 @@ def get_repo_files(repo_root: Path) -> List[Path]:
return sorted(repo_files)


def parse_repo(repo_root: Path) -> ParsedRepo:
def parse_repo(repo_root: Path) -> RepoContents:
""" Collect feature table definitions from feature repo """
res = ParsedRepo(
res = RepoContents(
entities=set(),
feature_views=set(),
feature_services=set(),
Expand Down Expand Up @@ -135,16 +127,18 @@ def plan(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool)
for data_source in data_sources:
data_source.validate(store.config)

# For each object in the registry, determine whether it should be kept or deleted,
# and whether new objects need to be added.
(
all_to_apply,
all_to_delete,
views_to_delete,
views_to_keep,
) = extract_objects_for_apply_delete(project, registry, repo)

_, diff = store.plan(all_to_apply, objects_to_delete=all_to_delete, partial=False)
diff = store.plan(repo)
views_to_delete = [
v
for v in diff.fco_diffs
if v.fco_type == "feature view" and v.transition_type == TransitionType.DELETE
]
views_to_keep = [
v
for v in diff.fco_diffs
if v.fco_type == "feature view"
and v.transition_type in {TransitionType.CREATE, TransitionType.UNCHANGED}
]

log_cli_output(diff, views_to_delete, views_to_keep)

Expand Down

0 comments on commit cf8ce45

Please # to comment.