From 883076b2d598541abee2a80002612cfca91cb1ae Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 14 Dec 2021 16:14:17 +0000 Subject: [PATCH 1/3] Respect `full_feature_names` for ODFVs Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> --- sdk/python/feast/feature_store.py | 19 +++++++++---------- .../infra/offline_stores/offline_store.py | 4 ++-- sdk/python/feast/on_demand_feature_view.py | 18 ++++++++++++++++-- sdk/python/feast/transformation_server.py | 2 +- 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 7a66bfb81d6..ea11eca5255 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1260,7 +1260,11 @@ def _augment_response_with_on_demand_transforms( for feature_ref in feature_refs: view_name, feature_name = feature_ref.split(":") if view_name in requested_odfv_feature_names: - odfv_feature_refs[view_name].append(feature_name) + odfv_feature_refs[view_name].append( + f"{requested_odfv_map[view_name].projection.name_to_use()}__{feature_name}" + if full_feature_names + else feature_name + ) initial_response = OnlineResponse( GetOnlineFeaturesResponse(field_values=result_rows) @@ -1272,7 +1276,7 @@ def _augment_response_with_on_demand_transforms( for odfv_name, _feature_refs in odfv_feature_refs.items(): odfv = requested_odfv_map[odfv_name] transformed_features_df = odfv.get_transformed_features_df( - initial_response_df + initial_response_df, full_feature_names, ) for row_idx in range(len(result_rows)): result_row = result_rows[row_idx] @@ -1282,18 +1286,13 @@ def _augment_response_with_on_demand_transforms( ] for transformed_feature in selected_subset: - transformed_feature_name = ( - f"{odfv.projection.name_to_use()}__{transformed_feature}" - if full_feature_names - else transformed_feature - ) - odfv_result_names.add(transformed_feature_name) + odfv_result_names.add(transformed_feature) proto_value = python_value_to_proto_value( transformed_features_df[transformed_feature].values[row_idx] ) - result_row.fields[transformed_feature_name].CopyFrom(proto_value) + result_row.fields[transformed_feature].CopyFrom(proto_value) result_row.statuses[ - transformed_feature_name + transformed_feature ] = GetOnlineFeaturesResponse.FieldStatus.PRESENT # Drop values that aren't needed diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 04bf34e3e2d..0ba81971543 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -47,7 +47,7 @@ def to_df(self) -> pd.DataFrame: # TODO(adchia): Fix requirement to specify dependent feature views in feature_refs for odfv in self.on_demand_feature_views: features_df = features_df.join( - odfv.get_transformed_features_df(features_df) + odfv.get_transformed_features_df(features_df, self.full_feature_names,) ) return features_df @@ -69,7 +69,7 @@ def to_arrow(self) -> pyarrow.Table: features_df = self._to_df_internal() for odfv in self.on_demand_feature_views: features_df = features_df.join( - odfv.get_transformed_features_df(features_df) + odfv.get_transformed_features_df(features_df, self.full_feature_names,) ) return pyarrow.Table.from_pandas(features_df) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index c89d122dded..4e2646f61ad 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -164,7 +164,7 @@ def get_request_data_schema(self) -> Dict[str, ValueType]: return schema def get_transformed_features_df( - self, df_with_features: pd.DataFrame + self, df_with_features: pd.DataFrame, full_feature_names: bool = False, ) -> pd.DataFrame: # Apply on demand transformations columns_to_cleanup = [] @@ -183,9 +183,23 @@ def get_transformed_features_df( # Compute transformed values and apply to each result row df_with_transformed_features = self.udf.__call__(df_with_features) + # Work out whether the correct columns names are used. + rename_columns: Dict[str, str] = {} + for feature in self.features: + short_name = feature.name + long_name = f"{self.projection.name_to_use()}__{feature.name}" + if ( + short_name in df_with_transformed_features.columns + and full_feature_names + ): + rename_columns[short_name] = long_name + elif not full_feature_names: + # Long name must be in dataframe. + rename_columns[long_name] = short_name + # Cleanup extra columns used for transformation df_with_features.drop(columns=columns_to_cleanup, inplace=True) - return df_with_transformed_features + return df_with_transformed_features.rename(columns=rename_columns) def infer_features(self): """ diff --git a/sdk/python/feast/transformation_server.py b/sdk/python/feast/transformation_server.py index c7e61f9d17e..83f4af749e3 100644 --- a/sdk/python/feast/transformation_server.py +++ b/sdk/python/feast/transformation_server.py @@ -47,7 +47,7 @@ def TransformFeatures(self, request, context): df = pa.ipc.open_file(request.transformation_input.arrow_value).read_pandas() - result_df = odfv.get_transformed_features_df(df) + result_df = odfv.get_transformed_features_df(df, True) result_arrow = pa.Table.from_pandas(result_df) sink = pa.BufferOutputStream() writer = pa.ipc.new_file(sink, result_arrow.schema) From 821062224805d3b8e15ff6a20c5b56c35b4d4f8e Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 14 Dec 2021 16:43:10 +0000 Subject: [PATCH 2/3] Add test for ODFV `full_feature_names` Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> --- .../test_universal_historical_retrieval.py | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 384f3c8d4f8..7d15e2370bb 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -227,16 +227,21 @@ def get_expected_training_df( expected_df[col] = expected_df[col].astype(typ) conv_feature_name = "driver_stats__conv_rate" if full_feature_names else "conv_rate" - expected_df["conv_rate_plus_100"] = expected_df[conv_feature_name] + 100 - expected_df["conv_rate_plus_100_rounded"] = ( - expected_df["conv_rate_plus_100"] + conv_plus_feature_name = response_feature_name( + "conv_rate_plus_100", full_feature_names + ) + expected_df[conv_plus_feature_name] = expected_df[conv_feature_name] + 100 + expected_df[ + response_feature_name("conv_rate_plus_100_rounded", full_feature_names) + ] = ( + expected_df[conv_plus_feature_name] .astype("float") .round() .astype(pd.Int32Dtype()) ) - expected_df["conv_rate_plus_val_to_add"] = ( - expected_df[conv_feature_name] + expected_df["val_to_add"] - ) + expected_df[ + response_feature_name("conv_rate_plus_val_to_add", full_feature_names) + ] = (expected_df[conv_feature_name] + expected_df["val_to_add"]) return expected_df @@ -638,7 +643,15 @@ def response_feature_name(feature: str, full_feature_names: bool) -> str: if feature in {"conv_rate", "avg_daily_trips"} and full_feature_names: return f"driver_stats__{feature}" - if feature in {"conv_rate_plus_100"} and full_feature_names: + if ( + feature + in { + "conv_rate_plus_100", + "conv_rate_plus_100_rounded", + "conv_rate_plus_val_to_add", + } + and full_feature_names + ): return f"conv_rate_plus_100__{feature}" return feature @@ -670,7 +683,7 @@ def assert_feature_service_correctness( "driver_id", "customer_id", response_feature_name("conv_rate", full_feature_names), - "conv_rate_plus_100", + response_feature_name("conv_rate_plus_100", full_feature_names), "driver_age", ] ] From 0b78adc7caca5d5a9c2ae8c200303bcad52a9cc4 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 17 Dec 2021 10:42:53 +0000 Subject: [PATCH 3/3] Correct feature names in tests Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> --- .../offline_store/test_universal_historical_retrieval.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 7d15e2370bb..dad14ac5aad 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -385,10 +385,10 @@ def test_historical_features(environment, universal_data_sources, full_feature_n # Not requesting the on demand transform with an entity_df query (can't add request data in them) expected_df_query = expected_df.drop( columns=[ - "conv_rate_plus_100", - "conv_rate_plus_100_rounded", + response_feature_name("conv_rate_plus_100", full_feature_names), + response_feature_name("conv_rate_plus_100_rounded", full_feature_names), + response_feature_name("conv_rate_plus_val_to_add", full_feature_names), "val_to_add", - "conv_rate_plus_val_to_add", "driver_age", ] )