Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Don't prevent apply from running given duplicate empty names in data sources. Also fix repeated apply of Spark data source. #2415

Merged
merged 15 commits into from
Mar 29, 2022
1 change: 0 additions & 1 deletion protos/feast/core/SavedDataset.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ option go_package = "github.com/feast-dev/feast/go/protos/feast/core";

import "google/protobuf/timestamp.proto";
import "feast/core/DataSource.proto";
import "feast/core/FeatureService.proto";

message SavedDatasetSpec {
// Name of the dataset. Must be unique since it's possible to overwrite dataset by name
Expand Down
11 changes: 11 additions & 0 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import logging
import warnings
from datetime import datetime
from pathlib import Path
from typing import List, Optional
Expand Down Expand Up @@ -151,6 +152,11 @@ def data_source_describe(ctx: click.Context, name: str):
print(e)
exit(1)

warnings.warn(
"Describing data sources will only work properly if all data sources have names or table names specified. "
"Starting Feast 0.21, data source unique names will be required to encourage data source discovery.",
RuntimeWarning,
)
print(
yaml.dump(
yaml.safe_load(str(data_source)), default_flow_style=False, sort_keys=False
Expand All @@ -173,6 +179,11 @@ def data_source_list(ctx: click.Context):

from tabulate import tabulate

warnings.warn(
"Listing data sources will only work properly if all data sources have names or table names specified. "
"Starting Feast 0.21, data source unique names will be required to encourage data source discovery",
RuntimeWarning,
)
print(tabulate(table, headers=["NAME", "CLASS"], tablefmt="plain"))


Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, Optional, Tuple

from google.protobuf.json_format import MessageToJson

from feast import type_map
from feast.data_format import StreamFormat
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
Expand Down Expand Up @@ -192,6 +194,9 @@ def __init__(
def __hash__(self):
return hash((id(self), self.name))

def __str__(self):
return str(MessageToJson(self.to_proto()))

def __eq__(self, other):
if not isinstance(other, DataSource):
raise TypeError("Comparisons should only involve DataSource class objects.")
Expand Down
10 changes: 8 additions & 2 deletions sdk/python/feast/diff/registry_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def to_string(self):
continue
if feast_object_diff.transition_type == TransitionType.UNCHANGED:
continue
if feast_object_diff.feast_object_type == FeastObjectType.DATA_SOURCE:
# TODO(adchia): Print statements out starting in Feast 0.21
continue
action, color = message_action_map[feast_object_diff.transition_type]
log_string += f"{action} {feast_object_diff.feast_object_type.value} {Style.BRIGHT + color}{feast_object_diff.name}{Style.RESET_ALL}\n"
if feast_object_diff.transition_type == TransitionType.UPDATE:
Expand All @@ -78,8 +81,11 @@ def to_string(self):
def tag_objects_for_keep_delete_update_add(
existing_objs: Iterable[FeastObject], desired_objs: Iterable[FeastObject]
) -> Tuple[Set[FeastObject], Set[FeastObject], Set[FeastObject], Set[FeastObject]]:
existing_obj_names = {e.name for e in existing_objs}
desired_obj_names = {e.name for e in desired_objs}
# TODO(adchia): Remove the "if X.name" condition when data sources are forced to have names
existing_obj_names = {e.name for e in existing_objs if e.name}
desired_objs = [obj for obj in desired_objs if obj.name]
existing_objs = [obj for obj in existing_objs if obj.name]
desired_obj_names = {e.name for e in desired_objs if e.name}

objs_to_add = {e for e in desired_objs if e.name not in existing_obj_names}
objs_to_update = {e for e in desired_objs if e.name in existing_obj_names}
Expand Down
17 changes: 10 additions & 7 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2063,14 +2063,17 @@ def _validate_feature_views(feature_views: List[BaseFeatureView]):
def _validate_data_sources(data_sources: List[DataSource]):
""" Verify data sources have case-insensitively unique names"""
ds_names = set()
for fv in data_sources:
case_insensitive_ds_name = fv.name.lower()
for ds in data_sources:
case_insensitive_ds_name = ds.name.lower()
if case_insensitive_ds_name in ds_names:
raise ValueError(
f"More than one data source with name {case_insensitive_ds_name} found. "
f"Please ensure that all data source names are case-insensitively unique. "
f"It may be necessary to ignore certain files in your feature repository by using a .feastignore file."
)
if case_insensitive_ds_name.strip():
warnings.warn(
f"More than one data source with name {case_insensitive_ds_name} found. "
f"Please ensure that all data source names are case-insensitively unique. "
f"It may be necessary to ignore certain files in your feature repository by using a .feastignore "
f"file. Starting in Feast 0.21, unique names (perhaps inferred from the table name) will be "
f"required in data sources to encourage data source discovery"
)
else:
ds_names.add(case_insensitive_ds_name)

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __init__(
else:
warnings.warn(
(
"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`."
f"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`: {self.query}"
),
DeprecationWarning,
)
Expand Down
16 changes: 8 additions & 8 deletions sdk/python/feast/infra/offline_stores/redshift_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ def __init__(
query (optional): The query to be executed to obtain the features.
name (optional): Name for the source. Defaults to the table_ref if not specified.
"""
# The default Redshift schema is named "public".
_schema = "public" if table and not schema else schema
self.redshift_options = RedshiftOptions(
table=table, schema=_schema, query=query
)

if table is None and query is None:
raise ValueError('No "table" argument provided.')
_name = name
Expand All @@ -50,7 +56,8 @@ def __init__(
else:
warnings.warn(
(
"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`."
f"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) "
f"or `table`: {self.query}"
),
DeprecationWarning,
)
Expand All @@ -63,13 +70,6 @@ def __init__(
date_partition_column,
)

# The default Redshift schema is named "public".
_schema = "public" if table and not schema else schema

self.redshift_options = RedshiftOptions(
table=table, schema=_schema, query=query
)

@staticmethod
def from_proto(data_source: DataSourceProto):
"""
Expand Down
16 changes: 8 additions & 8 deletions sdk/python/feast/infra/offline_stores/snowflake_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ def __init__(
"""
if table is None and query is None:
raise ValueError('No "table" argument provided.')
# The default Snowflake schema is named "PUBLIC".
_schema = "PUBLIC" if (database and table and not schema) else schema

self.snowflake_options = SnowflakeOptions(
database=database, schema=_schema, table=table, query=query
)

# If no name, use the table as the default name
_name = name
Expand All @@ -53,7 +59,8 @@ def __init__(
else:
warnings.warn(
(
"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`."
f"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) "
f"or `table`: {self.query}"
),
DeprecationWarning,
)
Expand All @@ -66,13 +73,6 @@ def __init__(
date_partition_column,
)

# The default Snowflake schema is named "PUBLIC".
_schema = "PUBLIC" if (database and table and not schema) else schema

self.snowflake_options = SnowflakeOptions(
database=database, schema=_schema, table=table, query=query
)

@staticmethod
def from_proto(data_source: DataSourceProto):
"""
Expand Down
4 changes: 3 additions & 1 deletion sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,13 @@ def apply_data_source(
commit: Whether to immediately commit to the registry
"""
registry = self._prepare_registry_for_changes()

for idx, existing_data_source_proto in enumerate(registry.data_sources):
if existing_data_source_proto.name == data_source.name:
del registry.data_sources[idx]
data_source_proto = data_source.to_proto()
data_source_proto.data_source_class_type = (
f"{data_source.__class__.__module__}.{data_source.__class__.__name__}"
)
data_source_proto.project = project
data_source_proto.data_source_class_type = (
f"{data_source.__class__.__module__}.{data_source.__class__.__name__}"
Expand Down
6 changes: 2 additions & 4 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,8 @@ def extract_objects_for_apply_delete(project, registry, repo):
return (
all_to_apply,
all_to_delete,
set(
objs_to_add[FeastObjectType.FEATURE_VIEW].union(
objs_to_update[FeastObjectType.FEATURE_VIEW]
)
set(objs_to_add[FeastObjectType.FEATURE_VIEW]).union(
set(objs_to_update[FeastObjectType.FEATURE_VIEW])
),
objs_to_delete[FeastObjectType.FEATURE_VIEW],
)
Expand Down
12 changes: 12 additions & 0 deletions sdk/python/tests/example_repos/example_feature_repo_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@
created_timestamp_column="created_timestamp",
)

driver_locations_source_query = BigQuerySource(
query="SELECT * from feast-oss.public.drivers",
event_timestamp_column="event_timestamp",
created_timestamp_column="created_timestamp",
)

driver_locations_source_query_2 = BigQuerySource(
query="SELECT lat * 2 FROM feast-oss.public.drivers",
event_timestamp_column="event_timestamp",
created_timestamp_column="created_timestamp",
)

customer_profile_source = BigQuerySource(
name="customer_profile_source",
table_ref="feast-oss.public.customers",
Expand Down