Skip to content

Commit

Permalink
Made simple feature names default on data retrieval, provides option …
Browse files Browse the repository at this point in the history
…for names prefixed with featureviews

Signed-off-by: Mwad22 <51929507+Mwad22@users.noreply.github.com>
  • Loading branch information
Mwad22 committed Jun 12, 2021
1 parent 68cacd9 commit 3c08212
Show file tree
Hide file tree
Showing 15 changed files with 316 additions and 34 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ training_df = store.get_historical_features(
'driver_hourly_stats:acc_rate',
'driver_hourly_stats:avg_daily_trips'
],
full_feature_names=True
).to_df()

print(training_df.head())
Expand Down
3 changes: 2 additions & 1 deletion docs/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ feature_vector = store.get_online_features(
'driver_hourly_stats:acc_rate',
'driver_hourly_stats:avg_daily_trips'
],
entity_rows=[{"driver_id": 1001}]
entity_rows=[{"driver_id": 1001}],
full_feature_names=True
).to_dict()
pprint(feature_vector)
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ def __init__(self, offline_store_name: str, data_source_name: str):
)


class FeatureNameCollisionError(Exception):
    """Raised when requested features share a name and prefixes are disabled.

    Args:
        feature_name_collisions: Comma-separated feature names that were
            requested more than once across the feature references.
    """

    def __init__(self, feature_name_collisions: str):
        # The retrieval functions expose this switch as `full_feature_names`;
        # setting it to True prefixes each feature with its feature view name,
        # which is what disambiguates the colliding columns.
        super().__init__(
            f"The following feature name(s) have collisions: {feature_name_collisions}. "
            "Set 'full_feature_names' argument in the data retrieval function to True "
            "to use the full feature name which is prefixed by the feature view name."
        )


class FeastOnlineStoreUnsupportedDataSource(Exception):
def __init__(self, online_store_name: str, data_source_name: str):
super().__init__(
Expand Down
56 changes: 46 additions & 10 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@

from feast import utils
from feast.entity import Entity
from feast.errors import FeastProviderLoginError, FeatureViewNotFoundException
from feast.errors import (
FeastProviderLoginError,
FeatureNameCollisionError,
FeatureViewNotFoundException,
)
from feast.feature_view import FeatureView
from feast.inference import infer_entity_value_type_from_feature_views
from feast.infra.provider import Provider, RetrievalJob, get_provider
Expand Down Expand Up @@ -244,7 +248,10 @@ def apply(

@log_exceptions_and_usage
def get_historical_features(
self, entity_df: Union[pd.DataFrame, str], feature_refs: List[str],
self,
entity_df: Union[pd.DataFrame, str],
feature_refs: List[str],
full_feature_names: bool = False,
) -> RetrievalJob:
"""Enrich an entity dataframe with historical feature values for either training or batch scoring.
Expand All @@ -266,6 +273,10 @@ def get_historical_features(
SQL query. The query must be of a format supported by the configured offline store (e.g., BigQuery)
feature_refs: A list of features that should be retrieved from the offline store. Feature references are of
the format "feature_view:feature", e.g., "customer_fv:daily_transactions".
full_feature_names: By default, this value is set to False. This strips the feature view prefixes from the data
and returns only the feature name, changing them from the format "feature_view__feature" to "feature"
(e.g., "customer_fv__daily_transactions" changes to "daily_transactions"). Set the value to True for
the feature names to be prefixed by the feature view name in the format "feature_view__feature".
Returns:
RetrievalJob which can be used to materialize the results.
Expand All @@ -278,12 +289,12 @@ def get_historical_features(
>>> fs = FeatureStore(config=RepoConfig(provider="gcp"))
>>> retrieval_job = fs.get_historical_features(
>>> entity_df="SELECT event_timestamp, order_id, customer_id from gcp_project.my_ds.customer_orders",
>>> feature_refs=["customer:age", "customer:avg_orders_1d", "customer:avg_orders_7d"]
>>> )
>>> feature_refs=["customer:age", "customer:avg_orders_1d", "customer:avg_orders_7d"],
>>> full_feature_names=False
>>> )
>>> feature_data = retrieval_job.to_df()
>>> model.fit(feature_data) # insert your modeling framework here.
"""

all_feature_views = self._registry.list_feature_views(project=self.project)
try:
feature_views = _get_requested_feature_views(
Expand All @@ -301,6 +312,7 @@ def get_historical_features(
entity_df,
self._registry,
self.project,
full_feature_names,
)
except FeastProviderLoginError as e:
sys.exit(e)
Expand Down Expand Up @@ -467,7 +479,10 @@ def tqdm_builder(length):

@log_exceptions_and_usage
def get_online_features(
self, feature_refs: List[str], entity_rows: List[Dict[str, Any]],
self,
feature_refs: List[str],
entity_rows: List[Dict[str, Any]],
full_feature_names: bool = False,
) -> OnlineResponse:
"""
Retrieves the latest online feature data.
Expand Down Expand Up @@ -535,7 +550,7 @@ def get_online_features(
project=self.project, allow_cache=True
)

grouped_refs = _group_refs(feature_refs, all_feature_views)
grouped_refs = _group_refs(feature_refs, all_feature_views, full_feature_names)
for table, requested_features in grouped_refs:
entity_keys = _get_table_entity_keys(
table, union_of_entity_keys, entity_name_to_join_key_map
Expand All @@ -552,13 +567,21 @@ def get_online_features(

if feature_data is None:
for feature_name in requested_features:
feature_ref = f"{table.name}__{feature_name}"
feature_ref = (
f"{table.name}__{feature_name}"
if full_feature_names
else feature_name
)
result_row.statuses[
feature_ref
] = GetOnlineFeaturesResponse.FieldStatus.NOT_FOUND
else:
for feature_name in feature_data:
feature_ref = f"{table.name}__{feature_name}"
feature_ref = (
f"{table.name}__{feature_name}"
if full_feature_names
else feature_name
)
if feature_name in requested_features:
result_row.fields[feature_ref].CopyFrom(
feature_data[feature_name]
Expand Down Expand Up @@ -587,7 +610,9 @@ def _entity_row_to_field_values(


def _group_refs(
feature_refs: List[str], all_feature_views: List[FeatureView]
feature_refs: List[str],
all_feature_views: List[FeatureView],
full_feature_names: bool = False,
) -> List[Tuple[FeatureView, List[str]]]:
""" Get list of feature views and corresponding feature names based on feature references"""

Expand All @@ -597,12 +622,23 @@ def _group_refs(
# view name to feature names
views_features = defaultdict(list)

feature_set = set()
feature_collision_set = set()

for ref in feature_refs:
view_name, feat_name = ref.split(":")
if feat_name in feature_set:
feature_collision_set.add(feat_name)
else:
feature_set.add(feat_name)
if view_name not in view_index:
raise FeatureViewNotFoundException(view_name)
views_features[view_name].append(feat_name)

if not full_feature_names and len(feature_collision_set) > 0:
err = ", ".join(x for x in feature_collision_set)
raise FeatureNameCollisionError(err)

result = []
for view_name, feature_names in views_features.items():
result.append((view_index[view_name], feature_names))
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def get_historical_features(
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
job = self.offline_store.get_historical_features(
config=config,
Expand All @@ -136,5 +137,6 @@ def get_historical_features(
entity_df=entity_df,
registry=registry,
project=project,
full_feature_names=full_feature_names,
)
return job
2 changes: 2 additions & 0 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def get_historical_features(
entity_df: Union[pd.DataFrame, str],
registry: Registry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
return self.offline_store.get_historical_features(
config=config,
Expand All @@ -135,6 +136,7 @@ def get_historical_features(
entity_df=entity_df,
registry=registry,
project=project,
full_feature_names=full_feature_names,
)


Expand Down
13 changes: 9 additions & 4 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def get_historical_features(
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
# TODO: Add entity_df validation in order to fail before interacting with BigQuery

Expand All @@ -105,7 +106,7 @@ def get_historical_features(

# Build a query context containing all information required to template the BigQuery SQL query
query_context = get_feature_view_query_context(
feature_refs, feature_views, registry, project
feature_refs, feature_views, registry, project, full_feature_names
)

# TODO: Infer min_timestamp and max_timestamp from entity_df
Expand All @@ -116,6 +117,7 @@ def get_historical_features(
max_timestamp=datetime.now() + timedelta(days=1),
left_table_query_string=str(table.reference),
entity_df_event_timestamp_col=entity_df_event_timestamp_col,
full_feature_names=full_feature_names,
)

job = BigQueryRetrievalJob(query=query, client=client)
Expand Down Expand Up @@ -292,11 +294,12 @@ def get_feature_view_query_context(
feature_views: List[FeatureView],
registry: Registry,
project: str,
full_feature_names: bool = False,
) -> List[FeatureViewQueryContext]:
"""Build a query context containing all information required to template a BigQuery point-in-time SQL query"""

feature_views_to_feature_map = _get_requested_feature_views_to_features_dict(
feature_refs, feature_views
feature_refs, feature_views, full_feature_names
)

query_context = []
Expand Down Expand Up @@ -351,6 +354,7 @@ def build_point_in_time_query(
max_timestamp: datetime,
left_table_query_string: str,
entity_df_event_timestamp_col: str,
full_feature_names: bool = False,
):
"""Build point-in-time query between each feature view table and the entity dataframe"""
template = Environment(loader=BaseLoader()).from_string(
Expand All @@ -367,6 +371,7 @@ def build_point_in_time_query(
[entity for fv in feature_view_query_contexts for entity in fv.entities]
),
"featureviews": [asdict(context) for context in feature_view_query_contexts],
"full_feature_names": full_feature_names,
}

query = template.render(template_context)
Expand Down Expand Up @@ -440,7 +445,7 @@ def _get_bigquery_client():
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}},
{% for feature in featureview.features %}
{{ feature }} as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
),
Expand Down Expand Up @@ -533,7 +538,7 @@ def _get_bigquery_client():
SELECT
entity_row_unique_id,
{% for feature in featureview.features %}
{{ featureview.name }}__{{ feature }},
{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %},
{% endfor %}
FROM {{ featureview.name }}__cleaned
) USING (entity_row_unique_id)
Expand Down
14 changes: 8 additions & 6 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def get_historical_features(
entity_df: Union[pd.DataFrame, str],
registry: Registry,
project: str,
full_feature_names: bool = False,
) -> FileRetrievalJob:
if not isinstance(entity_df, pd.DataFrame):
raise ValueError(
Expand All @@ -59,9 +60,8 @@ def get_historical_features(
raise ValueError(
f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events."
)

feature_views_to_features = _get_requested_feature_views_to_features_dict(
feature_refs, feature_views
feature_refs, feature_views, full_feature_names
)

# Create lazy function that is only called from the RetrievalJob object
Expand Down Expand Up @@ -125,14 +125,16 @@ def evaluate_historical_retrieval():
# Modify the separator for feature refs in column names to double underscore. We are using
# double underscore as separator for consistency with other databases like BigQuery,
# where there are very few characters available for use as separators
prefixed_feature_name = f"{feature_view.name}__{feature}"

if full_feature_names:
formatted_feature_name = f"{feature_view.name}__{feature}"
else:
formatted_feature_name = feature
# Add the feature name to the list of columns
feature_names.append(prefixed_feature_name)
feature_names.append(formatted_feature_name)

# Rename the source dataframe feature column to the formatted feature name (prefixed with the feature view name only when full_feature_names is True)
df_to_join.rename(
columns={feature: prefixed_feature_name}, inplace=True,
columns={feature: formatted_feature_name}, inplace=True,
)

# Build a list of entity columns to join on (from the right table)
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,6 @@ def get_historical_features(
entity_df: Union[pd.DataFrame, str],
registry: Registry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
pass
21 changes: 18 additions & 3 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import importlib
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union

import pandas
import pyarrow
Expand Down Expand Up @@ -116,6 +116,7 @@ def get_historical_features(
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
pass

Expand Down Expand Up @@ -179,15 +180,24 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider:


def _get_requested_feature_views_to_features_dict(
feature_refs: List[str], feature_views: List[FeatureView]
feature_refs: List[str], feature_views: List[FeatureView], full_feature_names: bool
) -> Dict[FeatureView, List[str]]:
"""Create a dict of FeatureView -> List[Feature] for all requested features"""
"""Create a dict of FeatureView -> List[Feature] for all requested features.
When full_feature_names is False, a feature name that appears in more than one feature ref raises FeatureNameCollisionError."""

feature_views_to_feature_map = {} # type: Dict[FeatureView, List[str]]
feature_set = set() # type: Set[str]
feature_collision_set = set() # type: Set[str]

for ref in feature_refs:
ref_parts = ref.split(":")
feature_view_from_ref = ref_parts[0]
feature_from_ref = ref_parts[1]
if feature_from_ref in feature_set:
feature_collision_set.add(feature_from_ref)
else:
feature_set.add(feature_from_ref)

found = False
for feature_view_from_registry in feature_views:
if feature_view_from_registry.name == feature_view_from_ref:
Expand All @@ -203,6 +213,11 @@ def _get_requested_feature_views_to_features_dict(

if not found:
raise ValueError(f"Could not find feature view from reference {ref}")

if not full_feature_names and len(feature_collision_set) > 0:
err = ", ".join(x for x in feature_collision_set)
raise errors.FeatureNameCollisionError(err)

return feature_views_to_feature_map


Expand Down
1 change: 1 addition & 0 deletions sdk/python/tests/foo_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def get_historical_features(
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
pass

Expand Down
1 change: 1 addition & 0 deletions sdk/python/tests/test_e2e_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def _assert_online_features(
"driver_hourly_stats:avg_daily_trips",
],
entity_rows=[{"driver_id": 1001}],
full_feature_names=True,
)

assert "driver_hourly_stats__avg_daily_trips" in result.to_dict()
Expand Down
Loading

0 comments on commit 3c08212

Please sign in to comment.