From 939a62b15bcc933740dbf677d560c7a76f877c7f Mon Sep 17 00:00:00 2001
From: suvoonhou
Date: Sun, 22 Dec 2019 23:47:05 +0800
Subject: [PATCH] Added a few minor changes:

- Use tmp dir during ingest
- Tweak int to ceil for row_group_size
- Changed imap_unordered to imap so that results are ordered
---
 sdk/python/feast/client.py         | 29 +++++++++++++++++------------
 sdk/python/feast/loaders/ingest.py |  2 +-
 2 files changed, 18 insertions(+), 13 deletions(-)

diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py
index 4e5d63e2d86..719022ea7a3 100644
--- a/sdk/python/feast/client.py
+++ b/sdk/python/feast/client.py
@@ -15,10 +15,12 @@
 import json
 import logging
 import os
+import shutil
+import tempfile
 import time
 from collections import OrderedDict
-from typing import Dict, Union
-from typing import List
+from math import ceil
+from typing import Dict, List, Tuple, Union
 from urllib.parse import urlparse
 
 import fastavro
@@ -642,11 +644,11 @@ def ingest(
             raise Exception(f"Feature set name must be provided")
 
         # Read table and get row count
-        tmp_table_name = _read_table_from_source(
+        dir_path, dest_path = _read_table_from_source(
             source, chunk_size, max_workers
         )
 
-        pq_file = pq.ParquetFile(tmp_table_name)
+        pq_file = pq.ParquetFile(dest_path)
 
         row_count = pq_file.metadata.num_rows
 
@@ -688,7 +690,7 @@ def ingest(
             # Transform and push data to Kafka
             if feature_set.source.source_type == "Kafka":
                 for chunk in get_feature_row_chunks(
-                    file=tmp_table_name,
+                    file=dest_path,
                     row_groups=list(range(pq_file.num_row_groups)),
                     fs=feature_set,
                     max_workers=max_workers):
@@ -715,7 +717,7 @@ def ingest(
         finally:
             # Remove parquet file(s) that were created earlier
             print("Removing temporary file(s)...")
-            os.remove(tmp_table_name)
+            shutil.rmtree(dir_path)
 
         return None
 
@@ -753,7 +755,7 @@ def _read_table_from_source(
    source: Union[pd.DataFrame, str],
    chunk_size: int,
    max_workers: int
-) -> str:
+) -> Tuple[str, str]:
    """
    Infers a data source type (path or Pandas DataFrame) and reads it
    in as a PyArrow Table.
@@ -777,7 +779,9 @@ def _read_table_from_source(
        chunk_size:
            Amount of rows to load and ingest at a time.
 
    Returns:
-        str: Path to parquet file that was created.
+        Tuple[str, str]:
+            Tuple containing parent directory path and destination path to
+            parquet file.
    """
    # Pandas DataFrame detected
@@ -807,12 +811,13 @@ def _read_table_from_source(
    assert isinstance(table, pa.lib.Table)
 
    # Write table as parquet file with a specified row_group_size
+    dir_path = tempfile.mkdtemp()
    tmp_table_name = f"{int(time.time())}.parquet"
-    row_group_size = min(int(table.num_rows/max_workers), chunk_size)
-    pq.write_table(table=table, where=tmp_table_name,
-                   row_group_size=row_group_size)
+    dest_path = f"{dir_path}/{tmp_table_name}"
+    row_group_size = min(ceil(table.num_rows/max_workers), chunk_size)
+    pq.write_table(table=table, where=dest_path, row_group_size=row_group_size)
 
    # Remove table from memory
    del table
 
-    return tmp_table_name
+    return dir_path, dest_path
diff --git a/sdk/python/feast/loaders/ingest.py b/sdk/python/feast/loaders/ingest.py
index 527ab481fe0..cbe80086e67 100644
--- a/sdk/python/feast/loaders/ingest.py
+++ b/sdk/python/feast/loaders/ingest.py
@@ -127,7 +127,7 @@ def get_feature_row_chunks(
 
     pool = Pool(max_workers)
     func = partial(_encode_pa_tables, file, fs)
-    for chunk in pool.imap_unordered(func, row_groups):
+    for chunk in pool.imap(func, row_groups):
         yield chunk
     return
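
Note on the tmp-dir change: `_read_table_from_source` now stages the parquet file inside a directory created with `tempfile.mkdtemp()`, and cleanup switches from `os.remove` on a single file to `shutil.rmtree` on the whole directory. A minimal sketch of that lifecycle (a simplified stand-in, not the actual Feast ingest flow; the `open`/`write` is a placeholder for `pq.write_table`):

```python
import shutil
import tempfile
import time

# Isolated scratch directory; everything under it can be removed in one call.
dir_path = tempfile.mkdtemp()
dest_path = f"{dir_path}/{int(time.time())}.parquet"
try:
    with open(dest_path, "wb") as f:  # placeholder for pq.write_table(...)
        f.write(b"parquet bytes")
finally:
    # shutil.rmtree removes the file(s) and the directory together,
    # unlike os.remove, which only deletes one named file.
    shutil.rmtree(dir_path)
```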
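Note on `int` -> `ceil` for `row_group_size`: with truncating division, a table smaller than `max_workers` gives `int(table.num_rows / max_workers) == 0`, so `min(..., chunk_size)` becomes 0 and the parquet writer is handed a degenerate row group size. `ceil` guarantees at least 1 for any non-empty table. A quick illustration with hypothetical numbers:

```python
from math import ceil

# Hypothetical values: 3 rows spread across 8 workers.
num_rows, max_workers, chunk_size = 3, 8, 5000

truncated = min(int(num_rows / max_workers), chunk_size)    # int(0.375) == 0
rounded_up = min(ceil(num_rows / max_workers), chunk_size)  # ceil(0.375) == 1

print(truncated, rounded_up)  # 0 1
```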
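Note on `imap_unordered` -> `imap`: both map lazily over the worker pool, but `imap` yields results in the order of the input iterable, so the chunk yielded for row group i always corresponds to row group i; `imap_unordered` yields in completion order. A small standalone demonstration (not Feast code):

```python
from multiprocessing import Pool

def square(n):
    return n * n

if __name__ == "__main__":
    with Pool(4) as pool:
        # imap preserves input order regardless of which worker finishes first.
        print(list(pool.imap(square, range(5))))  # [0, 1, 4, 9, 16]
        # pool.imap_unordered(square, range(5)) may yield the same values
        # in any order, depending on worker scheduling.
```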