remove stage_dataframe from the launcher interface #1220

Merged: 2 commits, merged on Dec 10, 2020
10 changes: 7 additions & 3 deletions sdk/python/feast/cli.py
@@ -482,14 +482,18 @@ def get_historical_features(
features: str, entity_df_path: str, entity_df_dtype: str, destination: str
):
"""
Get historical features
Get historical features. This CLI command is mostly for testing/easy demos; use the
corresponding API method in production.

The main reason why this command is unlikely to be more broadly useful is that we make quite a
few assumptions about the entity dataframe, namely:
* it has to have an `event_timestamp` column
* it has to parse cleanly with `pandas.read_csv()`, with no extra tuning of data types
"""
import pandas

client = Client()

# TODO: clean this up

if entity_df_dtype:
dtype = json.loads(entity_df_dtype)
entity_df = pandas.read_csv(
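For context, the assumptions listed in that docstring amount to an entity file like the sketch below. Everything here is illustrative (the file name, column names, and the `parse_dates` argument are not from the PR), and the actual `read_csv` call is truncated in the hunk above; the `entity_df_dtype` parameter is only needed when pandas' type inference is not enough.

```python
# Hypothetical entity dataframe that satisfies the CLI's assumptions:
# an `event_timestamp` column, and values that pandas.read_csv() parses cleanly.
import pandas as pd

entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002, 1003],
        "event_timestamp": pd.to_datetime(
            ["2020-12-01 10:00:00", "2020-12-01 11:00:00", "2020-12-01 12:00:00"]
        ),
    }
)
entity_df.to_csv("entity_df.csv", index=False)

# Round-tripping through CSV, roughly the way the CLI command does.
round_tripped = pd.read_csv("entity_df.csv", parse_dates=["event_timestamp"])
```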
2 changes: 1 addition & 1 deletion sdk/python/feast/client.py
@@ -1212,4 +1212,4 @@ def get_job_by_id(self, job_id: str) -> SparkJob:
def stage_dataframe(
self, df: pd.DataFrame, event_timestamp_column: str,
) -> FileSource:
-        return stage_dataframe(df, event_timestamp_column, self)
+        return stage_dataframe(df, event_timestamp_column, self._config)
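From a caller's point of view the client-level wrapper is unchanged; only the argument forwarded to the module-level helper switches from the client itself to its config. A minimal usage sketch, assuming a Client that is already configured with a Spark staging location:

```python
import pandas as pd

from feast import Client

client = Client()  # assumes feast config, including the Spark staging location, is set up

df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": pd.to_datetime(["2020-12-01 10:00:00", "2020-12-01 11:00:00"]),
    }
)

# Uploads the dataframe to the staging location as parquet and returns a FileSource
# that can be used as an entity source, e.g. for historical feature retrieval.
file_source = client.stage_dataframe(df, event_timestamp_column="event_timestamp")
```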
16 changes: 0 additions & 16 deletions sdk/python/feast/pyspark/abc.py
@@ -6,10 +6,6 @@
from enum import Enum
from typing import Dict, List, Optional

import pandas

from feast.data_source import FileSource


class SparkJobFailure(Exception):
"""
@@ -541,18 +537,6 @@ def start_stream_to_online_ingestion(
"""
raise NotImplementedError

@abc.abstractmethod
def stage_dataframe(
self, df: pandas.DataFrame, event_timestamp_column: str,
) -> FileSource:
"""
Upload a pandas dataframe so it is available to the Spark cluster.

Returns:
FileSource: representing the uploaded dataframe.
"""
raise NotImplementedError

@abc.abstractmethod
def get_job_by_id(self, job_id: str) -> SparkJob:
raise NotImplementedError
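The net effect of this removal is that launcher implementations no longer have to carry a `stage_dataframe` stub just to satisfy the ABC. A toy before/after illustration (not feast code; the real interface has many more abstract methods):

```python
import abc


class LauncherBefore(abc.ABC):
    @abc.abstractmethod
    def get_job_by_id(self, job_id: str): ...

    @abc.abstractmethod
    def stage_dataframe(self, df, event_timestamp_column: str): ...


class LauncherAfter(abc.ABC):
    @abc.abstractmethod
    def get_job_by_id(self, job_id: str): ...


class ToyLauncher(LauncherAfter):
    # No NotImplementedError stub for stage_dataframe is needed any more.
    def get_job_by_id(self, job_id: str):
        return f"job {job_id}"


launcher = ToyLauncher()  # instantiates fine without stage_dataframe
```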
37 changes: 34 additions & 3 deletions sdk/python/feast/pyspark/launcher.py
@@ -1,8 +1,12 @@
import os
import tempfile
from datetime import datetime
from typing import TYPE_CHECKING, List, Union
from urllib.parse import urlparse, urlunparse

from feast.config import Config
from feast.constants import ConfigOptions as opt
from feast.data_format import ParquetFormat
from feast.data_source import BigQuerySource, DataSource, FileSource, KafkaSource
from feast.feature_table import FeatureTable
from feast.pyspark.abc import (
@@ -16,6 +20,7 @@
StreamIngestionJobParameters,
)
from feast.staging.entities import create_bq_view_of_joined_features_and_entities
from feast.staging.storage_client import get_staging_client
from feast.value_type import ValueType

if TYPE_CHECKING:
@@ -297,6 +302,32 @@ def get_job_by_id(job_id: str, client: "Client") -> SparkJob:
return launcher.get_job_by_id(job_id)


-def stage_dataframe(df, event_timestamp_column: str, client: "Client") -> FileSource:
-    launcher = resolve_launcher(client._config)
-    return launcher.stage_dataframe(df, event_timestamp_column)
+def stage_dataframe(df, event_timestamp_column: str, config: Config) -> FileSource:
Review comment (Member): Can we add a basic description/comment?

"""
Helper function to upload a pandas dataframe in parquet format to a temporary location (under
SPARK_STAGING_LOCATION) and return it wrapped in a FileSource.

Args:
event_timestamp_column(str): the name of the timestamp column in the dataframe.
config(Config): feast config.
"""
staging_location = config.get(opt.SPARK_STAGING_LOCATION)
staging_uri = urlparse(staging_location)

with tempfile.NamedTemporaryFile() as f:
df.to_parquet(f)

file_url = urlunparse(
get_staging_client(staging_uri.scheme, config).upload_fileobj(
f,
f.name,
remote_path_prefix=os.path.join(staging_location, "dataframes"),
remote_path_suffix=".parquet",
)
)

return FileSource(
event_timestamp_column=event_timestamp_column,
file_format=ParquetFormat(),
file_url=file_url,
)
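Unlike the EMR-specific implementation removed below, which hard-coded the "s3" staging client, this helper derives the scheme from the configured staging URI, so the same code path works for S3, GCS, or local staging locations. A rough usage sketch, assuming a configured client; the staging location value is illustrative, and the helper is normally reached via Client.stage_dataframe rather than called directly:

```python
import pandas as pd

from feast import Client
from feast.pyspark.launcher import stage_dataframe

client = Client()  # assumes a staging location is configured, e.g. "s3://my-bucket/staging/"

df = pd.DataFrame(
    {
        "customer_id": [1, 2],
        "event_timestamp": pd.to_datetime(["2020-12-01", "2020-12-02"]),
    }
)

# Uploads the dataframe as parquet under <staging location>/dataframes/ and
# returns a FileSource pointing at the uploaded file.
source = stage_dataframe(df, "event_timestamp", client._config)
```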
25 changes: 0 additions & 25 deletions sdk/python/feast/pyspark/launchers/aws/emr.py
@@ -1,15 +1,11 @@
import os
import tempfile
from io import BytesIO
from typing import Any, Dict, List, Optional
from urllib.parse import urlunparse

import boto3
import pandas
from botocore.config import Config as BotoConfig

from feast.data_format import ParquetFormat
from feast.data_source import FileSource
from feast.pyspark.abc import (
BatchIngestionJob,
BatchIngestionJobParameters,
@@ -304,27 +300,6 @@ def start_stream_to_online_ingestion(

return EmrStreamIngestionJob(self._emr_client(), job_ref, job_hash)

def stage_dataframe(self, df: pandas.DataFrame, event_timestamp: str) -> FileSource:
with tempfile.NamedTemporaryFile() as f:
df.to_parquet(f)

file_url = urlunparse(
get_staging_client("s3").upload_fileobj(
f,
f.name,
remote_path_prefix=os.path.join(
self._staging_location, "dataframes"
),
remote_path_suffix=".parquet",
)
)

return FileSource(
event_timestamp_column=event_timestamp,
file_format=ParquetFormat(),
file_url=file_url,
)

def _job_from_job_info(self, job_info: JobInfo) -> SparkJob:
if job_info.job_type == HISTORICAL_RETRIEVAL_JOB_TYPE:
assert job_info.output_file_uri is not None
3 changes: 0 additions & 3 deletions sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -350,9 +350,6 @@ def start_stream_to_online_ingestion(
job_hash = ingestion_job_params.get_job_hash()
return DataprocStreamingIngestionJob(job, refresh_fn, cancel_fn, job_hash)

def stage_dataframe(self, df, event_timestamp_column: str):
raise NotImplementedError

def get_job_by_id(self, job_id: str) -> SparkJob:
job = self.job_client.get_job(
project_id=self.project_id, region=self.region, job_id=job_id
3 changes: 0 additions & 3 deletions sdk/python/feast/pyspark/launchers/standalone/local.py
@@ -332,9 +332,6 @@ def start_stream_to_online_ingestion(
global_job_cache.add_job(job)
return job

def stage_dataframe(self, df, event_timestamp_column: str):
raise NotImplementedError

def get_job_by_id(self, job_id: str) -> SparkJob:
return global_job_cache.get_job_by_id(job_id)
