Respect full_feature_names for ODFVs #2144

Merged: 3 commits, Dec 17, 2021
19 changes: 9 additions & 10 deletions sdk/python/feast/feature_store.py
@@ -1260,7 +1260,11 @@ def _augment_response_with_on_demand_transforms(
         for feature_ref in feature_refs:
             view_name, feature_name = feature_ref.split(":")
             if view_name in requested_odfv_feature_names:
-                odfv_feature_refs[view_name].append(feature_name)
+                odfv_feature_refs[view_name].append(
+                    f"{requested_odfv_map[view_name].projection.name_to_use()}__{feature_name}"
+                    if full_feature_names
+                    else feature_name
+                )
 
         initial_response = OnlineResponse(
             GetOnlineFeaturesResponse(field_values=result_rows)
@@ -1272,7 +1276,7 @@ def _augment_response_with_on_demand_transforms(
         for odfv_name, _feature_refs in odfv_feature_refs.items():
             odfv = requested_odfv_map[odfv_name]
             transformed_features_df = odfv.get_transformed_features_df(
-                initial_response_df
+                initial_response_df, full_feature_names,
             )
             for row_idx in range(len(result_rows)):
                 result_row = result_rows[row_idx]
@@ -1282,18 +1286,13 @@ def _augment_response_with_on_demand_transforms(
                 ]
 
                 for transformed_feature in selected_subset:
-                    transformed_feature_name = (
-                        f"{odfv.projection.name_to_use()}__{transformed_feature}"
-                        if full_feature_names
-                        else transformed_feature
-                    )
-                    odfv_result_names.add(transformed_feature_name)
+                    odfv_result_names.add(transformed_feature)
                     proto_value = python_value_to_proto_value(
                         transformed_features_df[transformed_feature].values[row_idx]
                     )
-                    result_row.fields[transformed_feature_name].CopyFrom(proto_value)
+                    result_row.fields[transformed_feature].CopyFrom(proto_value)
                     result_row.statuses[
-                        transformed_feature_name
+                        transformed_feature
                     ] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
 
         # Drop values that aren't needed
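The online path now builds ODFV result keys in whichever format the caller asked for, instead of always using the short form. A minimal sketch of the naming rule, assuming a hypothetical helper (qualified_feature_name is not part of the Feast API; the view and feature names are illustrative):

def qualified_feature_name(view_name: str, feature_name: str, full_feature_names: bool) -> str:
    # With full_feature_names, the ODFV's projection name is prefixed with a
    # double underscore; otherwise the bare feature name is used.
    return f"{view_name}__{feature_name}" if full_feature_names else feature_name

assert qualified_feature_name("conv_rate_plus_100", "conv_rate_plus_100", True) == "conv_rate_plus_100__conv_rate_plus_100"
assert qualified_feature_name("conv_rate_plus_100", "conv_rate_plus_100", False) == "conv_rate_plus_100"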
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/offline_store.py
@@ -47,7 +47,7 @@ def to_df(self) -> pd.DataFrame:
         # TODO(adchia): Fix requirement to specify dependent feature views in feature_refs
         for odfv in self.on_demand_feature_views:
             features_df = features_df.join(
-                odfv.get_transformed_features_df(features_df)
+                odfv.get_transformed_features_df(features_df, self.full_feature_names,)
             )
         return features_df
 
@@ -69,7 +69,7 @@ def to_arrow(self) -> pyarrow.Table:
         features_df = self._to_df_internal()
         for odfv in self.on_demand_feature_views:
             features_df = features_df.join(
-                odfv.get_transformed_features_df(features_df)
+                odfv.get_transformed_features_df(features_df, self.full_feature_names,)
             )
         return pyarrow.Table.from_pandas(features_df)
 
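Both to_df and to_arrow simply join the ODFV output onto the already retrieved feature dataframe, so the transformed columns must arrive under the naming scheme the job was configured with. A rough standalone sketch of that join under full feature names (the dataframes and column names here are illustrative, not Feast objects):

import pandas as pd

# Base features as an offline store might return them with full_feature_names=True.
features_df = pd.DataFrame(
    {"driver_id": [1001, 1002], "driver_stats__conv_rate": [0.2, 0.5]}
)

# What an ODFV transformation could produce, already using the long column name.
transformed_df = pd.DataFrame(
    {"conv_rate_plus_100__conv_rate_plus_100": [100.2, 100.5]}
)

# The retrieval job joins the transformed columns on by index; the renaming logic
# in on_demand_feature_view.py (next file) keeps the two naming schemes consistent.
result = features_df.join(transformed_df)
print(result.columns.tolist())
# ['driver_id', 'driver_stats__conv_rate', 'conv_rate_plus_100__conv_rate_plus_100']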
18 changes: 16 additions & 2 deletions sdk/python/feast/on_demand_feature_view.py
@@ -164,7 +164,7 @@ def get_request_data_schema(self) -> Dict[str, ValueType]:
         return schema
 
     def get_transformed_features_df(
-        self, df_with_features: pd.DataFrame
+        self, df_with_features: pd.DataFrame, full_feature_names: bool = False,
     ) -> pd.DataFrame:
         # Apply on demand transformations
         columns_to_cleanup = []
@@ -183,9 +183,23 @@ def get_transformed_features_df(
         # Compute transformed values and apply to each result row
         df_with_transformed_features = self.udf.__call__(df_with_features)
 
+        # Work out whether the correct columns names are used.
+        rename_columns: Dict[str, str] = {}
+        for feature in self.features:
+            short_name = feature.name
+            long_name = f"{self.projection.name_to_use()}__{feature.name}"
+            if (
+                short_name in df_with_transformed_features.columns
+                and full_feature_names
+            ):
+                rename_columns[short_name] = long_name
+            elif not full_feature_names:
+                # Long name must be in dataframe.
+                rename_columns[long_name] = short_name
+
         # Cleanup extra columns used for transformation
         df_with_features.drop(columns=columns_to_cleanup, inplace=True)
-        return df_with_transformed_features
+        return df_with_transformed_features.rename(columns=rename_columns)
 
     def infer_features(self):
         """
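The renaming step only touches columns the UDF actually produced: short names get the projection prefix when full names were requested, and long names are stripped back down when they were not. A standalone approximation of that behaviour with plain pandas (view_name stands in for self.projection.name_to_use(); the function name is illustrative):

from typing import Dict, List

import pandas as pd

def rename_output_columns(
    df: pd.DataFrame, view_name: str, feature_names: List[str], full_feature_names: bool
) -> pd.DataFrame:
    # Mirrors the rename_columns loop above, written against plain pandas.
    rename_columns: Dict[str, str] = {}
    for feature in feature_names:
        long_name = f"{view_name}__{feature}"
        if feature in df.columns and full_feature_names:
            rename_columns[feature] = long_name
        elif not full_feature_names:
            # The UDF produced the long form; map it back to the short form.
            rename_columns[long_name] = feature
    return df.rename(columns=rename_columns)

df = pd.DataFrame({"conv_rate_plus_100": [100.2]})
renamed = rename_output_columns(df, "conv_rate_plus_100", ["conv_rate_plus_100"], True)
print(renamed.columns.tolist())  # ['conv_rate_plus_100__conv_rate_plus_100']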
2 changes: 1 addition & 1 deletion sdk/python/feast/transformation_server.py
@@ -47,7 +47,7 @@ def TransformFeatures(self, request, context):
 
         df = pa.ipc.open_file(request.transformation_input.arrow_value).read_pandas()
 
-        result_df = odfv.get_transformed_features_df(df)
+        result_df = odfv.get_transformed_features_df(df, True)
         result_arrow = pa.Table.from_pandas(result_df)
         sink = pa.BufferOutputStream()
         writer = pa.ipc.new_file(sink, result_arrow.schema)
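The transformation server always requests fully qualified output columns (the hard-coded True). For reference, a small sketch of the Arrow IPC round trip this handler relies on, using only pandas and pyarrow:

import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"conv_rate": [0.2, 0.5], "val_to_add": [1, 2]})

# Serialize a DataFrame into an Arrow IPC file buffer, as a client would.
table = pa.Table.from_pandas(df)
sink = pa.BufferOutputStream()
with pa.ipc.new_file(sink, table.schema) as writer:
    writer.write_table(table)
buf = sink.getvalue()

# Read it back the same way TransformFeatures does before applying the ODFV.
round_tripped = pa.ipc.open_file(buf).read_pandas()
assert round_tripped.equals(df)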
@@ -227,16 +227,21 @@ def get_expected_training_df(
         expected_df[col] = expected_df[col].astype(typ)
 
     conv_feature_name = "driver_stats__conv_rate" if full_feature_names else "conv_rate"
-    expected_df["conv_rate_plus_100"] = expected_df[conv_feature_name] + 100
-    expected_df["conv_rate_plus_100_rounded"] = (
-        expected_df["conv_rate_plus_100"]
+    conv_plus_feature_name = response_feature_name(
+        "conv_rate_plus_100", full_feature_names
+    )
+    expected_df[conv_plus_feature_name] = expected_df[conv_feature_name] + 100
+    expected_df[
+        response_feature_name("conv_rate_plus_100_rounded", full_feature_names)
+    ] = (
+        expected_df[conv_plus_feature_name]
         .astype("float")
         .round()
         .astype(pd.Int32Dtype())
     )
-    expected_df["conv_rate_plus_val_to_add"] = (
-        expected_df[conv_feature_name] + expected_df["val_to_add"]
-    )
+    expected_df[
+        response_feature_name("conv_rate_plus_val_to_add", full_feature_names)
+    ] = (expected_df[conv_feature_name] + expected_df["val_to_add"])
 
     return expected_df
 
@@ -380,10 +385,10 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
     # Not requesting the on demand transform with an entity_df query (can't add request data in them)
     expected_df_query = expected_df.drop(
         columns=[
-            "conv_rate_plus_100",
-            "conv_rate_plus_100_rounded",
+            response_feature_name("conv_rate_plus_100", full_feature_names),
+            response_feature_name("conv_rate_plus_100_rounded", full_feature_names),
+            response_feature_name("conv_rate_plus_val_to_add", full_feature_names),
             "val_to_add",
-            "conv_rate_plus_val_to_add",
             "driver_age",
         ]
     )
@@ -638,7 +643,15 @@ def response_feature_name(feature: str, full_feature_names: bool) -> str:
     if feature in {"conv_rate", "avg_daily_trips"} and full_feature_names:
         return f"driver_stats__{feature}"
 
-    if feature in {"conv_rate_plus_100"} and full_feature_names:
+    if (
+        feature
+        in {
+            "conv_rate_plus_100",
+            "conv_rate_plus_100_rounded",
+            "conv_rate_plus_val_to_add",
+        }
+        and full_feature_names
+    ):
         return f"conv_rate_plus_100__{feature}"
 
     return feature
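With the helper widened to cover all three derived features, the expected column names in the tests track full_feature_names the same way the SDK now does. A quick illustration of what the updated helper returns, assuming the definition shown above is in scope:

assert response_feature_name("conv_rate", True) == "driver_stats__conv_rate"
assert response_feature_name("conv_rate_plus_100_rounded", True) == (
    "conv_rate_plus_100__conv_rate_plus_100_rounded"
)
assert response_feature_name("conv_rate_plus_val_to_add", False) == "conv_rate_plus_val_to_add"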
@@ -670,7 +683,7 @@ def assert_feature_service_correctness(
             "driver_id",
             "customer_id",
             response_feature_name("conv_rate", full_feature_names),
-            "conv_rate_plus_100",
+            response_feature_name("conv_rate_plus_100", full_feature_names),
             "driver_age",
         ]
     ]