Minor bug fixes for Python SDK #383

Merged 1 commit on Dec 23, 2019
29 changes: 17 additions & 12 deletions sdk/python/feast/client.py
@@ -15,10 +15,12 @@
import json
import logging
import os
import shutil
import tempfile
import time
from collections import OrderedDict
from typing import Dict, Union
from typing import List
from math import ceil
from typing import Dict, List, Tuple, Union
from urllib.parse import urlparse

import fastavro
@@ -642,11 +644,11 @@ def ingest(
raise Exception(f"Feature set name must be provided")

# Read table and get row count
tmp_table_name = _read_table_from_source(
dir_path, dest_path = _read_table_from_source(
source, chunk_size, max_workers
)

pq_file = pq.ParquetFile(tmp_table_name)
pq_file = pq.ParquetFile(dest_path)

row_count = pq_file.metadata.num_rows

@@ -688,7 +690,7 @@ def ingest(
# Transform and push data to Kafka
if feature_set.source.source_type == "Kafka":
for chunk in get_feature_row_chunks(
file=tmp_table_name,
file=dest_path,
row_groups=list(range(pq_file.num_row_groups)),
fs=feature_set,
max_workers=max_workers):
@@ -715,7 +717,7 @@ def ingest(
finally:
# Remove parquet file(s) that were created earlier
print("Removing temporary file(s)...")
os.remove(tmp_table_name)
shutil.rmtree(dir_path)

return None

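The ingest() changes above stage the source data in a dedicated temporary directory and then remove that whole directory in the finally block, instead of deleting a single parquet file from the working directory. Below is a minimal standalone sketch of that staging-and-cleanup pattern; the stage_dataframe helper and the sample DataFrame are illustrative and not part of the SDK.

import shutil
import tempfile
import time
from typing import Tuple

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


def stage_dataframe(df: pd.DataFrame) -> Tuple[str, str]:
    """Write a DataFrame to a parquet file inside a fresh temporary directory."""
    dir_path = tempfile.mkdtemp()
    dest_path = f"{dir_path}/{int(time.time())}.parquet"
    pq.write_table(table=pa.Table.from_pandas(df), where=dest_path)
    return dir_path, dest_path


if __name__ == "__main__":
    dir_path, dest_path = stage_dataframe(pd.DataFrame({"value": [1, 2, 3]}))
    try:
        print(pq.ParquetFile(dest_path).metadata.num_rows)  # 3
    finally:
        # Removing the directory also removes any parquet file(s) staged inside it.
        shutil.rmtree(dir_path)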
@@ -753,7 +755,7 @@ def _read_table_from_source(
source: Union[pd.DataFrame, str],
chunk_size: int,
max_workers: int
) -> str:
) -> Tuple[str, str]:
"""
Infers a data source type (path or Pandas DataFrame) and reads it in as
a PyArrow Table.
@@ -777,7 +779,9 @@ def _read_table_from_source(
Amount of rows to load and ingest at a time.

Returns:
str: Path to parquet file that was created.
Tuple[str, str]:
Tuple containing parent directory path and destination path to
parquet file.
"""

# Pandas DataFrame detected
@@ -807,12 +811,13 @@
assert isinstance(table, pa.lib.Table)

# Write table as parquet file with a specified row_group_size
dir_path = tempfile.mkdtemp()
tmp_table_name = f"{int(time.time())}.parquet"
row_group_size = min(int(table.num_rows/max_workers), chunk_size)
pq.write_table(table=table, where=tmp_table_name,
row_group_size=row_group_size)
dest_path = f"{dir_path}/{tmp_table_name}"
row_group_size = min(ceil(table.num_rows/max_workers), chunk_size)
pq.write_table(table=table, where=dest_path, row_group_size=row_group_size)

# Remove table from memory
del table

return tmp_table_name
return dir_path, dest_path
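The switch from int() to ceil() in the row_group_size calculation matters when the table has fewer rows than max_workers: integer truncation would give 0, while ceil() guarantees at least one row per row group. A quick illustration with made-up numbers:

from math import ceil

# Hypothetical values: a tiny table and a comparatively large worker pool.
num_rows, max_workers, chunk_size = 5, 8, 5000

print(min(int(num_rows / max_workers), chunk_size))   # 0 -- truncation can yield an unusable row_group_size
print(min(ceil(num_rows / max_workers), chunk_size))  # 1 -- always at least one row per group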
2 changes: 1 addition & 1 deletion sdk/python/feast/loaders/ingest.py
@@ -127,7 +127,7 @@ def get_feature_row_chunks(

pool = Pool(max_workers)
func = partial(_encode_pa_tables, file, fs)
for chunk in pool.imap_unordered(func, row_groups):
for chunk in pool.imap(func, row_groups):
yield chunk
return

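In sdk/python/feast/loaders/ingest.py, swapping Pool.imap_unordered for Pool.imap means the encoded chunks are yielded in the same order as the input row groups, presumably so rows reach the sink in source order. A small self-contained sketch of that ordering difference; the worker function and inputs are illustrative only.

from functools import partial
from multiprocessing import Pool


def _encode(prefix: str, row_group: int) -> str:
    """Stand-in for the per-row-group encoding work."""
    return f"{prefix}-{row_group}"


if __name__ == "__main__":
    with Pool(4) as pool:
        func = partial(_encode, "chunk")
        # imap preserves input order in its output; imap_unordered yields
        # results as workers finish, so a downstream consumer could see
        # row groups out of order.
        for chunk in pool.imap(func, range(5)):
            print(chunk)  # chunk-0, chunk-1, chunk-2, chunk-3, chunk-4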