fix historical test for azure (#1262)
* fix historical test for azure

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* fix adlfs version

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* overwrite existing Azure blobs to match s3 and gcs

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* fix linting

Signed-off-by: Jacob Klegar <jacob@tecton.ai>
jklegar authored Jan 8, 2021
1 parent 468f417 commit d8a3795
Showing 4 changed files with 30 additions and 6 deletions.
3 changes: 1 addition & 2 deletions sdk/python/feast/pyspark/launchers/k8s/k8s.py
@@ -9,7 +9,6 @@
 import yaml
 from kubernetes.client.api import CustomObjectsApi
 
-from feast.constants import ConfigOptions as opt
 from feast.pyspark.abc import (
     BQ_SPARK_PACKAGE,
     BatchIngestionJob,
@@ -196,7 +195,7 @@ def _get_azure_credentials(self):
         account_key = self._azure_account_key
         if account_name is None or account_key is None:
             raise Exception(
-                f"Using Azure blob storage requires Azure blob account name and access key to be set in config"
+                "Using Azure blob storage requires Azure blob account name and access key to be set in config"
             )
         return {
             f"spark.hadoop.fs.azure.account.key.{account_name}.blob.core.windows.net": f"{account_key}"
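The dictionary returned by _get_azure_credentials is merged into the Spark job's configuration so that the Hadoop Azure connector can authenticate against wasbs:// staging paths. As a rough sketch of how such key/value pairs could be applied to a Spark session (the session-builder code and the account name/key below are hypothetical, not part of this commit):

from pyspark.sql import SparkSession

# Hypothetical account name and access key, for illustration only.
azure_credentials = {
    "spark.hadoop.fs.azure.account.key.myaccount.blob.core.windows.net": "my-access-key"
}

builder = SparkSession.builder.appName("historical-feature-retrieval")
for key, value in azure_credentials.items():
    # Each entry becomes one Spark config option; the spark.hadoop. prefix
    # forwards it to the underlying Hadoop FileSystem configuration.
    builder = builder.config(key, value)

spark = builder.getOrCreate()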
2 changes: 1 addition & 1 deletion sdk/python/feast/staging/storage_client.py
@@ -397,7 +397,7 @@ def upload_fileobj(
         )
         bucket, key = self._uri_to_bucket_key(remote_uri)
         container_client = self.blob_service_client.get_container_client(bucket)
-        container_client.upload_blob(name=key, data=fileobj)
+        container_client.upload_blob(name=key, data=fileobj, overwrite=True)
         return remote_uri


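Passing overwrite=True makes the Azure client replace an existing blob instead of failing, which matches how the S3 and GCS staging clients behave when the same key is uploaded twice. A minimal sketch of the difference, using a hypothetical connection string, container, and key:

from azure.storage.blob import BlobServiceClient

# Hypothetical connection string, container, and blob key, for illustration only.
service = BlobServiceClient.from_connection_string("<connection-string>")
container = service.get_container_client("feast-staging")

container.upload_blob(name="out/file.parquet", data=b"first run", overwrite=True)

# Re-running the same upload succeeds because overwrite=True; without it,
# azure-storage-blob raises ResourceExistsError when the blob already exists.
container.upload_blob(name="out/file.parquet", data=b"second run", overwrite=True)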
3 changes: 2 additions & 1 deletion sdk/python/requirements-ci.txt
@@ -21,4 +21,5 @@ pytest-timeout==1.4.2
 pytest-ordering==0.6.*
 pytest-mock==1.10.4
 PyYAML==5.3.1
-great-expectations==0.13.2
+great-expectations==0.13.2
+adlfs==0.5.9
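adlfs is pinned here because the updated test reads job output from Azure Blob Storage through adlfs.AzureBlobFileSystem, an fsspec-compatible filesystem that pyarrow can consume directly. A small sketch with hypothetical credentials and paths:

import adlfs
from pyarrow import parquet

# Hypothetical account, key, and container path, for illustration only.
fs = adlfs.AzureBlobFileSystem(account_name="myaccount", account_key="my-access-key")
files = fs.glob("feast-staging/out/historical/part-*")

# pyarrow accepts the fsspec filesystem object via the filesystem argument.
dataset = parquet.ParquetDataset(files, filesystem=fs)
df = dataset.read().to_pandas()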
28 changes: 26 additions & 2 deletions tests/e2e/test_historical_features.py
@@ -11,13 +11,14 @@
 from pyarrow import parquet
 
 from feast import Client, Entity, Feature, FeatureTable, ValueType
+from feast.constants import ConfigOptions as opt
 from feast.data_source import BigQuerySource, FileSource
 from feast.pyspark.abc import SparkJobStatus
 
 np.random.seed(0)
 
 
-def read_parquet(uri):
+def read_parquet(uri, azure_account_name=None, azure_account_key=None):
     parsed_uri = urlparse(uri)
     if parsed_uri.scheme == "file":
         return pd.read_parquet(parsed_uri.path)
@@ -42,6 +43,16 @@ def read_parquet(uri):
         files = ["s3://" + path for path in fs.glob(s3uri + "/part-*")]
         ds = parquet.ParquetDataset(files, filesystem=fs)
         return ds.read().to_pandas()
+    elif parsed_uri.scheme == "wasbs":
+        import adlfs
+
+        fs = adlfs.AzureBlobFileSystem(
+            account_name=azure_account_name, account_key=azure_account_key
+        )
+        uripath = parsed_uri.username + parsed_uri.path
+        files = fs.glob(uripath + "/part-*")
+        ds = parquet.ParquetDataset(files, filesystem=fs)
+        return ds.read().to_pandas()
     else:
         raise ValueError(f"Unsupported URL scheme {uri}")
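The new wasbs branch relies on how urlparse splits a wasbs URI: the container name lands in username and the blob path in path, so concatenating them gives the container-relative path that adlfs expects. For example, with a hypothetical output location:

from urllib.parse import urlparse

# Hypothetical job output URI, for illustration only.
uri = "wasbs://feast-staging@myaccount.blob.core.windows.net/out/historical"
parsed = urlparse(uri)

print(parsed.username)                # "feast-staging"  (the blob container)
print(parsed.path)                    # "/out/historical" (path within the container)
print(parsed.username + parsed.path)  # "feast-staging/out/historical", passed to fs.glob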

@@ -75,6 +86,13 @@ def generate_data():
     return transactions_df, customer_df
 
 
+def _get_azure_creds(feast_client: Client):
+    return (
+        feast_client._config.get(opt.AZURE_BLOB_ACCOUNT_NAME, None),
+        feast_client._config.get(opt.AZURE_BLOB_ACCOUNT_ACCESS_KEY, None),
+    )
+
+
 def test_historical_features(
     feast_client: Client,
     tfrecord_feast_client: Client,
@@ -108,7 +126,13 @@ def test_historical_features(
 
     job = feast_client.get_historical_features(feature_refs, customers_df)
     output_dir = job.get_output_file_uri()
-    joined_df = read_parquet(output_dir)
+
+    # will both be None if not using Azure blob storage
+    account_name, account_key = _get_azure_creds(feast_client)
+
+    joined_df = read_parquet(
+        output_dir, azure_account_name=account_name, azure_account_key=account_key
+    )
 
     expected_joined_df = pd.DataFrame(
         {
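_get_azure_creds pulls the account name and access key from the Feast client's config, so the test only needs them when the job output actually lands on Azure Blob Storage. A hedged sketch of how a client might be configured for that case, assuming the usual Feast convention that config options can be passed to Client as keyword arguments (or set as FEAST_-prefixed environment variables); the endpoints and credentials below are hypothetical:

from feast import Client

# Hypothetical endpoints and credentials, for illustration only.
feast_client = Client(
    core_url="feast-core:6565",
    serving_url="feast-serving:6566",
    azure_blob_account_name="myaccount",
    azure_blob_account_access_key="my-access-key",
)

# The test helper then reads the same options back from the client config:
# account_name, account_key = _get_azure_creds(feast_client)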
