-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
Improve exception handling, logging, and validation #1477
Changes from 9 commits
25eabd0
5f70cff
1204f6e
2996fb1
6d44edc
56fed83
7d23ee1
32c630a
0af71ad
b3f9021
5c9a57c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,10 +5,12 @@ | |
|
||
import pandas | ||
import pyarrow | ||
from google.auth.exceptions import DefaultCredentialsError | ||
from google.cloud import bigquery | ||
from jinja2 import BaseLoader, Environment | ||
|
||
from feast.data_source import BigQuerySource, DataSource | ||
from feast.errors import FeastProviderLoginError | ||
from feast.feature_view import FeatureView | ||
from feast.infra.offline_stores.offline_store import OfflineStore | ||
from feast.infra.provider import ( | ||
|
@@ -59,12 +61,32 @@ def pull_latest_from_table_or_query( | |
|
||
@staticmethod | ||
def _pull_query(query: str) -> pyarrow.Table: | ||
from google.cloud import bigquery | ||
|
||
client = bigquery.Client() | ||
client = BigQueryOfflineStore._get_bigquery_client() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can make this a |
||
query_job = client.query(query) | ||
return query_job.to_arrow() | ||
|
||
@staticmethod | ||
def _get_bigquery_client(): | ||
try: | ||
from google.cloud import bigquery | ||
|
||
client = bigquery.Client() | ||
except DefaultCredentialsError as e: | ||
raise FeastProviderLoginError( | ||
str(e) | ||
+ '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your ' | ||
"local Google Cloud account" | ||
) | ||
except EnvironmentError as e: | ||
raise FeastProviderLoginError( | ||
"GCP error: " | ||
+ str(e) | ||
+ "\nIt may be necessary to set a default GCP project by running " | ||
'"gcloud config set project your-project"' | ||
) | ||
|
||
return client | ||
|
||
@staticmethod | ||
def get_historical_features( | ||
config: RepoConfig, | ||
|
@@ -76,14 +98,18 @@ def get_historical_features( | |
) -> RetrievalJob: | ||
# TODO: Add entity_df validation in order to fail before interacting with BigQuery | ||
|
||
client = BigQueryOfflineStore._get_bigquery_client() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here. Generally if you're accessing class methods, it's better to make the method a classmethod. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've been tainted by Java There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lol :D |
||
|
||
if type(entity_df) is str: | ||
entity_df_sql_table = f"({entity_df})" | ||
elif isinstance(entity_df, pandas.DataFrame): | ||
if "event_timestamp" not in entity_df.columns: | ||
raise ValueError( | ||
"Please provide an entity_df with a column named event_timestamp representing the time of events." | ||
) | ||
table_id = _upload_entity_df_into_bigquery(config.project, entity_df) | ||
table_id = _upload_entity_df_into_bigquery( | ||
config.project, entity_df, client | ||
) | ||
entity_df_sql_table = f"`{table_id}`" | ||
else: | ||
raise ValueError( | ||
|
@@ -104,18 +130,19 @@ def get_historical_features( | |
max_timestamp=datetime.now() + timedelta(days=1), | ||
left_table_query_string=entity_df_sql_table, | ||
) | ||
job = BigQueryRetrievalJob(query=query) | ||
|
||
job = BigQueryRetrievalJob(query=query, client=client) | ||
return job | ||
|
||
|
||
class BigQueryRetrievalJob(RetrievalJob): | ||
def __init__(self, query): | ||
def __init__(self, query, client): | ||
self.query = query | ||
self.client = client | ||
|
||
def to_df(self): | ||
# TODO: Ideally only start this job when the user runs "get_historical_features", not when they run to_df() | ||
client = bigquery.Client() | ||
df = client.query(self.query).to_dataframe(create_bqstorage_client=True) | ||
df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True) | ||
return df | ||
|
||
|
||
|
@@ -135,9 +162,8 @@ class FeatureViewQueryContext: | |
entity_selections: List[str] | ||
|
||
|
||
def _upload_entity_df_into_bigquery(project, entity_df) -> str: | ||
def _upload_entity_df_into_bigquery(project, entity_df, client) -> str: | ||
"""Uploads a Pandas entity dataframe into a BigQuery table and returns a reference to the resulting table""" | ||
client = bigquery.Client() | ||
|
||
# First create the BigQuery dataset if it doesn't exist | ||
dataset = bigquery.Dataset(f"{client.project}.feast_{project}") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,26 +45,38 @@ class Registry: | |
cached_registry_proto_ttl: timedelta | ||
cache_being_updated: bool = False | ||
|
||
def __init__(self, registry_path: str, cache_ttl: timedelta): | ||
def __init__(self, registry_path: str, repo_path: Path, cache_ttl: timedelta): | ||
""" | ||
Create the Registry object. | ||
|
||
Args: | ||
repo_path: Path to the base of the Feast repository | ||
cache_ttl: The amount of time that cached registry state stays valid | ||
registry_path: filepath or GCS URI that is the location of the object store registry, | ||
or where it will be created if it does not exist yet. | ||
""" | ||
uri = urlparse(registry_path) | ||
if uri.scheme == "gs": | ||
self._registry_store: RegistryStore = GCSRegistryStore(registry_path) | ||
elif uri.scheme == "file" or uri.scheme == "": | ||
self._registry_store = LocalRegistryStore(registry_path) | ||
self._registry_store = LocalRegistryStore( | ||
repo_path=repo_path, registry_path_string=registry_path | ||
) | ||
else: | ||
raise Exception( | ||
f"Registry path {registry_path} has unsupported scheme {uri.scheme}. Supported schemes are file and gs." | ||
) | ||
self.cached_registry_proto_ttl = cache_ttl | ||
return | ||
|
||
def _initialize_registry(self): | ||
"""Explicitly forces the initialization of a registry""" | ||
|
||
def updater(registry_proto: RegistryProto): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. might be better to allow |
||
return registry_proto # no-op | ||
|
||
self._registry_store.update_registry_proto(updater) | ||
|
||
def apply_entity(self, entity: Entity, project: str): | ||
""" | ||
Registers a single entity with Feast | ||
|
@@ -264,7 +276,7 @@ def get_feature_view(self, name: str, project: str) -> FeatureView: | |
and feature_view_proto.spec.project == project | ||
): | ||
return FeatureView.from_proto(feature_view_proto) | ||
raise FeatureViewNotFoundException(project, name) | ||
raise FeatureViewNotFoundException(name, project) | ||
|
||
def delete_feature_table(self, name: str, project: str): | ||
""" | ||
|
@@ -309,7 +321,7 @@ def updater(registry_proto: RegistryProto): | |
): | ||
del registry_proto.feature_views[idx] | ||
return registry_proto | ||
raise FeatureViewNotFoundException(project, name) | ||
raise FeatureViewNotFoundException(name, project) | ||
|
||
self._registry_store.update_registry_proto(updater) | ||
|
||
|
@@ -381,9 +393,12 @@ def update_registry_proto(self, updater: Callable[[RegistryProto], RegistryProto | |
|
||
|
||
class LocalRegistryStore(RegistryStore): | ||
def __init__(self, filepath: str): | ||
self._filepath = Path(filepath) | ||
return | ||
def __init__(self, repo_path: Path, registry_path_string: str): | ||
registry_path = Path(registry_path_string) | ||
if registry_path.is_absolute(): | ||
self._filepath = registry_path | ||
else: | ||
self._filepath = repo_path.joinpath(registry_path) | ||
|
||
def get_registry_proto(self): | ||
registry_proto = RegistryProto() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this isn't necessary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, good catch