Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adding write capability to online store to on demand feature … #4418

Closed
wants to merge 45 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
d03a895
merged changes
franciscojavierarceo Sep 24, 2024
e0f42c6
saving progress
franciscojavierarceo Aug 17, 2024
856cea2
merged changes to odfv
franciscojavierarceo Sep 24, 2024
f6a7133
linted
franciscojavierarceo Aug 18, 2024
fe387fc
adding the test needed to show the expected behavior
franciscojavierarceo Aug 18, 2024
646a124
updated test case
franciscojavierarceo Aug 21, 2024
b345e7e
saving progress
franciscojavierarceo Aug 21, 2024
16e68f0
merging
franciscojavierarceo Sep 24, 2024
24c4ae2
merged
franciscojavierarceo Sep 24, 2024
3d21881
merged
franciscojavierarceo Sep 24, 2024
7afaa84
merging
franciscojavierarceo Sep 24, 2024
436478a
adding the entity keys for now to do retrieval
franciscojavierarceo Aug 29, 2024
c78aead
adding entity to odfv
franciscojavierarceo Aug 29, 2024
fbebc63
checking in progress...getting closer
franciscojavierarceo Aug 29, 2024
5b3e2f5
may have to revert some of this...looks like the challenge is getting…
franciscojavierarceo Aug 31, 2024
7879e21
moving things around to make it easier to debug
franciscojavierarceo Sep 1, 2024
72caa15
debugging
franciscojavierarceo Sep 1, 2024
6d23889
merged
franciscojavierarceo Sep 24, 2024
b76bf4e
merging
franciscojavierarceo Sep 24, 2024
3cf0369
Rebasing and merging changes from other PR
franciscojavierarceo Sep 6, 2024
3a6dfc4
Merging changes continued
franciscojavierarceo Sep 7, 2024
1c37e54
update the _make_inference to include odfvs with writes in the update…
franciscojavierarceo Sep 7, 2024
2f9546f
have the table being written now...the create table happens in the Sq…
franciscojavierarceo Sep 8, 2024
c46e157
checking in progress
franciscojavierarceo Sep 9, 2024
580b77f
adding logs
franciscojavierarceo Sep 10, 2024
815a352
updating permissions
franciscojavierarceo Sep 10, 2024
2eda92c
going to error out on purpose
franciscojavierarceo Sep 10, 2024
e326e9b
adding unit test and merging changes
franciscojavierarceo Sep 18, 2024
a54b5f8
almost got everything working and type validation behaving
franciscojavierarceo Sep 18, 2024
029c9cf
cleaned up and have tests behaving
franciscojavierarceo Sep 18, 2024
e2c6b35
adding print
franciscojavierarceo Sep 21, 2024
adf147f
removing print
franciscojavierarceo Sep 21, 2024
d34db1d
checking in progress
franciscojavierarceo Sep 23, 2024
731bacb
updating test
franciscojavierarceo Sep 25, 2024
8479296
adding test
franciscojavierarceo Sep 25, 2024
3068c3b
linted and updated
franciscojavierarceo Sep 25, 2024
238dc29
removed print
franciscojavierarceo Sep 25, 2024
06315b9
updated tests to test actual behavior
franciscojavierarceo Sep 25, 2024
05efa37
checking in progress
franciscojavierarceo Sep 28, 2024
8c559c8
changing typo
franciscojavierarceo Sep 28, 2024
8b8aa16
updating test
franciscojavierarceo Sep 28, 2024
4562a23
testing changes
franciscojavierarceo Sep 28, 2024
14f837d
checking to see if thing still working
franciscojavierarceo Sep 29, 2024
082a860
removed print
franciscojavierarceo Sep 29, 2024
1d52d29
undo change for odfv file
franciscojavierarceo Sep 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 59 additions & 17 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,12 @@ def _make_inferences(
update_feature_views_with_inferred_features_and_entities(
sfvs_to_update, entities + entities_to_update, self.config
)
# TODO(kevjumba): Update schema inferrence
# We need to attach the time stamp fields to the underlying data sources
# and cascade the dependencies
update_feature_views_with_inferred_features_and_entities(
odfvs_to_update, entities + entities_to_update, self.config
)
# TODO(kevjumba): Update schema inference
for sfv in sfvs_to_update:
if not sfv.schema:
raise ValueError(
Expand All @@ -618,8 +623,13 @@ def _make_inferences(
for odfv in odfvs_to_update:
odfv.infer_features()

odfvs_to_write = [
odfv for odfv in odfvs_to_update if odfv.write_to_online_store
]
# Update to include ODFVs with write to online store
fvs_to_update_map = {
view.name: view for view in [*views_to_update, *sfvs_to_update]
view.name: view
for view in [*views_to_update, *sfvs_to_update, *odfvs_to_write]
}
for feature_service in feature_services_to_update:
feature_service.infer_features(fvs_to_update=fvs_to_update_map)
Expand Down Expand Up @@ -847,6 +857,11 @@ def apply(
]
sfvs_to_update = [ob for ob in objects if isinstance(ob, StreamFeatureView)]
odfvs_to_update = [ob for ob in objects if isinstance(ob, OnDemandFeatureView)]
odfvs_with_writes_to_update = [
ob
for ob in objects
if isinstance(ob, OnDemandFeatureView) and ob.write_to_online_store
]
services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)]
data_sources_set_to_update = {
ob for ob in objects if isinstance(ob, DataSource)
Expand All @@ -868,10 +883,22 @@ def apply(
for batch_source in batch_sources_to_add:
data_sources_set_to_update.add(batch_source)

for fv in itertools.chain(views_to_update, sfvs_to_update):
data_sources_set_to_update.add(fv.batch_source)
if fv.stream_source:
data_sources_set_to_update.add(fv.stream_source)
for fv in itertools.chain(
views_to_update, sfvs_to_update, odfvs_with_writes_to_update
):
if isinstance(fv, FeatureView):
data_sources_set_to_update.add(fv.batch_source)
if isinstance(fv, StreamFeatureView):
if fv.stream_source:
data_sources_set_to_update.add(fv.stream_source)
if isinstance(fv, OnDemandFeatureView):
for source_fvp in fv.source_feature_view_projections:
if fv.source_feature_view_projections[source_fvp].batch_source:
data_sources_set_to_update.add(
fv.source_feature_view_projections[source_fvp].batch_source
)
else:
pass

for odfv in odfvs_to_update:
for v in odfv.source_request_sources.values():
Expand All @@ -884,7 +911,9 @@ def apply(

# Validate all feature views and make inferences.
self._validate_all_feature_views(
views_to_update, odfvs_to_update, sfvs_to_update
views_to_update,
odfvs_to_update,
sfvs_to_update,
)
self._make_inferences(
data_sources_to_update,
Expand Down Expand Up @@ -989,7 +1018,9 @@ def apply(
tables_to_delete: List[FeatureView] = (
views_to_delete + sfvs_to_delete if not partial else [] # type: ignore
)
tables_to_keep: List[FeatureView] = views_to_update + sfvs_to_update # type: ignore
tables_to_keep: List[FeatureView] = (
views_to_update + sfvs_to_update + odfvs_with_writes_to_update
) # type: ignore

self._get_provider().update_infra(
project=self.project,
Expand Down Expand Up @@ -1444,19 +1475,18 @@ def write_to_online_store(
inputs: Optional the dictionary object to be written
allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
"""
# TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
feature_view_dict = {
fv_proto.name: fv_proto
for fv_proto in self.list_all_feature_views(allow_registry_cache)
}
try:
feature_view: FeatureView = self.get_stream_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
feature_view = feature_view_dict[feature_view_name]
except FeatureViewNotFoundException:
feature_view = self.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
raise FeatureViewNotFoundException(feature_view_name, self.project)
if df is not None and inputs is not None:
raise ValueError("Both df and inputs cannot be provided at the same time.")
if df is None and inputs is not None:
if isinstance(inputs, dict):
if isinstance(inputs, dict) or isinstance(inputs, List):
try:
df = pd.DataFrame(inputs)
except Exception as _:
Expand All @@ -1465,8 +1495,20 @@ def write_to_online_store(
pass
else:
raise ValueError("inputs must be a dictionary or a pandas DataFrame.")
if df is not None and inputs is None:
if isinstance(df, dict) or isinstance(df, List):
try:
df = pd.DataFrame(df)
except Exception as _:
raise DataFrameSerializationError

provider = self._get_provider()
provider.ingest_df(feature_view, df)
if isinstance(feature_view, OnDemandFeatureView):
# TODO: add projection mapping
projection_mapping = {}
provider.ingest_df(feature_view, df, projection_mapping)
else:
provider.ingest_df(feature_view, df)

def write_to_offline_store(
self,
Expand Down
35 changes: 21 additions & 14 deletions sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def _infer_features_and_entities(
fv, join_keys, run_inference_for_features, config
)

entity_columns = []
columns_to_exclude = {
fv.batch_source.timestamp_field,
fv.batch_source.created_timestamp_column,
Expand All @@ -218,6 +219,7 @@ def _infer_features_and_entities(
columns_to_exclude.remove(mapped_col)
columns_to_exclude.add(original_col)

# this is what gets the right stuff
table_column_names_and_types = fv.batch_source.get_table_column_names_and_types(
config
)
Expand All @@ -233,9 +235,9 @@ def _infer_features_and_entities(
),
)
if field.name not in [
entity_column.name for entity_column in fv.entity_columns
entity_column.name for entity_column in entity_columns
]:
fv.entity_columns.append(field)
entity_columns.append(field)
elif not re.match(
"^__|__$", col_name
): # double underscores often signal an internal-use column
Expand All @@ -256,6 +258,8 @@ def _infer_features_and_entities(
if field.name not in [feature.name for feature in fv.features]:
fv.features.append(field)

fv.entity_columns = entity_columns


def _infer_on_demand_features_and_entities(
fv: OnDemandFeatureView,
Expand All @@ -282,26 +286,29 @@ def _infer_on_demand_features_and_entities(

batch_source = getattr(source_feature_view, "batch_source")
batch_field_mapping = getattr(batch_source or None, "field_mapping")
if batch_field_mapping:
for (
original_col,
mapped_col,
) in batch_field_mapping.items():
if mapped_col in columns_to_exclude:
columns_to_exclude.remove(mapped_col)
columns_to_exclude.add(original_col)
for (
original_col,
mapped_col,
) in batch_field_mapping.items():
if mapped_col in columns_to_exclude:
columns_to_exclude.remove(mapped_col)
columns_to_exclude.add(original_col)

table_column_names_and_types = batch_source.get_table_column_names_and_types(
config
)
batch_field_mapping = getattr(batch_source, "field_mapping", {})

table_column_names_and_types = (
batch_source.get_table_column_names_and_types(config)
)
for col_name, col_datatype in table_column_names_and_types:
if col_name in columns_to_exclude:
continue
elif col_name in join_keys:
field = Field(
name=col_name,
dtype=from_value_type(
batch_source.source_datatype_to_feast_value_type()(col_datatype)
batch_source.source_datatype_to_feast_value_type()(
col_datatype
)
),
)
if field.name not in [
Expand Down
41 changes: 28 additions & 13 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pyarrow as pa
from tqdm import tqdm

from feast import importer
from feast import OnDemandFeatureView, importer
from feast.batch_feature_view import BatchFeatureView
from feast.data_source import DataSource
from feast.entity import Entity
Expand Down Expand Up @@ -276,23 +276,38 @@ def ingest_df(
self,
feature_view: FeatureView,
df: pd.DataFrame,
field_mapping: Optional[Dict] = None,
):
table = pa.Table.from_pandas(df)

if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
if isinstance(feature_view, OnDemandFeatureView):
table = _run_pyarrow_field_mapping(table, field_mapping)
join_keys = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

self.online_write_batch(
self.repo_config, feature_view, rows_to_write, progress=None
)
else:
# Note: A dictionary mapping of column names in this data
# source to feature names in a feature table or view. Only used for feature
# columns, not entity or timestamp columns.
if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
)

join_keys = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
join_keys = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

self.online_write_batch(
self.repo_config, feature_view, rows_to_write, progress=None
)
self.online_write_batch(
self.repo_config, feature_view, rows_to_write, progress=None
)

def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table):
if feature_view.batch_source.field_mapping is not None:
Expand Down
36 changes: 34 additions & 2 deletions sdk/python/feast/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, Union

import pyarrow

from feast.value_type import ValueType

PRIMITIVE_FEAST_TYPES_TO_VALUE_TYPES = {
Expand All @@ -30,6 +33,10 @@
}


def _utc_now() -> datetime:
return datetime.now(tz=timezone.utc)


class ComplexFeastType(ABC):
"""
A ComplexFeastType represents a structured type that is recognized by Feast.
Expand Down Expand Up @@ -103,7 +110,6 @@ def __hash__(self):
Float64 = PrimitiveFeastType.FLOAT64
UnixTimestamp = PrimitiveFeastType.UNIX_TIMESTAMP


SUPPORTED_BASE_TYPES = [
Invalid,
String,
Expand Down Expand Up @@ -159,7 +165,6 @@ def __str__(self):

FeastType = Union[ComplexFeastType, PrimitiveFeastType]


VALUE_TYPES_TO_FEAST_TYPES: Dict["ValueType", FeastType] = {
ValueType.UNKNOWN: Invalid,
ValueType.BYTES: Bytes,
Expand All @@ -180,6 +185,33 @@ def __str__(self):
ValueType.UNIX_TIMESTAMP_LIST: Array(UnixTimestamp),
}

FEAST_TYPES_TO_PYARROW_TYPES = {
String: pyarrow.string(),
Bool: pyarrow.bool_(),
Int32: pyarrow.int32(),
Int64: pyarrow.int64(),
Float32: pyarrow.float32(),
Float64: pyarrow.float64(),
# Note: datetime only supports microseconds https://github.com/python/cpython/blob/3.8/Lib/datetime.py#L1559
UnixTimestamp: pyarrow.timestamp("us", tz=_utc_now().tzname()),
}


def from_feast_to_pyarrow_type(feast_type: FeastType) -> pyarrow.DataType:
"""
Converts a Feast type to a PyArrow type.

Args:
feast_type: The Feast type to be converted.

Raises:
ValueError: The conversion could not be performed.
"""
if feast_type in FEAST_TYPES_TO_PYARROW_TYPES:
return FEAST_TYPES_TO_PYARROW_TYPES[feast_type]

raise ValueError(f"Could not convert Feast type {feast_type} to PyArrow type.")


def from_value_type(
value_type: ValueType,
Expand Down
Loading
Loading