From 4740d7797d66feb4e76161c093fd618ba58b413e Mon Sep 17 00:00:00 2001 From: Alex Edwards <130456418+alexeocto@users.noreply.github.com> Date: Fri, 22 Sep 2023 17:59:26 +0100 Subject: [PATCH] Bump version to v4.3.0 Co-Authored-By: Omer Korner --- CHANGELOG.md | 4 + setup.py | 4 +- tests/storage/test_storage.py | 343 ++++++++++++++++++++++++++++++++++ xocto/__init__.py | 2 +- xocto/storage/s3_select.py | 25 +++ xocto/storage/storage.py | 202 +++++++++++++++++++- 6 files changed, 569 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 81439ef..6e2f45a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## v4.3.0 - 2023-09-27 + +- Enable querying parquet files using `S3FileStore.fetch_object_contents_with_s3_select` and `LocalFileStore.fetch_object_contents_with_s3_select` [#95](https://github.com/octoenergy/xocto/pull/95/) + ## v4.2.1 - 2023-09-18 - Allow timzone override in `localtime.parse_dt` [#93](https://github.com/octoenergy/xocto/pull/93) diff --git a/setup.py b/setup.py index bc4b609..8c1e002 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ REPO_ROOT = path.abspath(path.dirname(__file__)) -VERSION = "4.2.1" +VERSION = "4.3.0" with open(path.join(REPO_ROOT, "README.md"), encoding="utf-8") as f: long_description = f.read() @@ -34,6 +34,7 @@ zip_safe=False, install_requires=[ "ddtrace>=1.9.0", + "duckdb==0.8.1", "django>=4.0", "openpyxl>=3.1.0", "pact-python>=1.6.0", @@ -55,6 +56,7 @@ "mypy==0.991", "numpy==1.22.2", "pre-commit>=3.2.0", + "pyarrow-stubs==10.0.1.6", "twine==4.0.2", "types-openpyxl==3.0.4.5", "types-python-dateutil==2.8.19.6", diff --git a/tests/storage/test_storage.py b/tests/storage/test_storage.py index 5dbc4ed..7d2fbe2 100644 --- a/tests/storage/test_storage.py +++ b/tests/storage/test_storage.py @@ -7,9 +7,12 @@ import boto3 import moto +import pandas as pd +import pyarrow import pytest import time_machine from django.test import override_settings +from pyarrow import parquet from xocto.storage import s3_select, storage @@ -30,6 +33,12 @@ def mock_s3_bucket(mocker): yield bucket +@pytest.fixture +def sample_dataframe(): + data = {"Name": ["Alice", "Bob", "Charlie"], "Age": [25, 30, 35]} + return pd.DataFrame(data) + + def test_memory_storage_used_during_tests(): store = storage.store("some-bucket") assert isinstance(store, storage.MemoryFileStore) @@ -494,6 +503,71 @@ def test_fetch_file_contents_using_s3_select_and_expect_output_in_json_format(se OutputSerialization={"JSON": {"RecordDelimiter": "\n"}}, ) + def test_fetch_file_contents_using_s3_select_with_parquet_as_input(self): + store = storage.S3FileStore("some-bucket") + + # Moto doesn't support faking a response from `select_object_content` that's why + # we need a stub that can return us a fake response. + boto_client = mock.Mock() + store._get_boto_client = mock.Mock(return_value=boto_client) + + # Mock a fake response from S3 Select. + # Note: Response has been heavily trimmed for the sake of this test. + boto_client.select_object_content.return_value = { + "ResponseMetadata": {"HTTPStatusCode": 200}, + # Payload returns an EventStream iterator, therefore mock it using a list. + "Payload": iter([{"Records": {"Payload": b"Foo\nBar"}}]), + } + + assert list( + store.fetch_file_contents_using_s3_select( + key_path="some_file.parquet", + # LIMIT 1 means we want to fetch a single row. 
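+                # A ParquetInputSerializer carries no options and serialises to
+                # InputSerialization={"Parquet": {}}, as asserted on the boto call below.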
+ raw_sql="""SELECT * FROM s3Object LIMIT 1""", + input_serializer=s3_select.ParquetInputSerializer(), + output_serializer=s3_select.JSONOutputSerializer(), + ) + ) == ["Foo\nBar"] + + boto_client.select_object_content.assert_called_with( + Bucket="some-bucket", + Key="some_file.parquet", + ExpressionType="SQL", + Expression="SELECT * FROM s3Object LIMIT 1", + InputSerialization={"Parquet": {}}, + OutputSerialization={"JSON": {"RecordDelimiter": "\n"}}, + ) + + def test_fetch_file_contents_using_s3_select_with_parquet_fails_with_scan_range(self): + store = storage.S3FileStore("some-bucket") + + # Moto doesn't support faking a response from `select_object_content` that's why + # we need a stub that can return us a fake response. + boto_client = mock.Mock() + store._get_boto_client = mock.Mock(return_value=boto_client) + + # Mock a fake response from S3 Select. + # Note: Response has been heavily trimmed for the sake of this test. + boto_client.select_object_content.return_value = { + "ResponseMetadata": {"HTTPStatusCode": 200}, + # Payload returns an EventStream iterator, therefore mock it using a list. + "Payload": iter([{"Records": {"Payload": b"Foo\nBar"}}]), + } + + with pytest.raises(ValueError) as error: + + list( + store.fetch_file_contents_using_s3_select( + key_path="some_file.parquet", + raw_sql="""SELECT * FROM s3Object LIMIT 1""", + input_serializer=s3_select.ParquetInputSerializer(), + output_serializer=s3_select.JSONOutputSerializer(), + scan_range=s3_select.ScanRange(0, 100), + ) + ) + + assert str(error.value) == "The scan_range parameter is not supported for parquet files" + @pytest.mark.parametrize("expected_error_code", [400, 401, 403, 500]) def test_fetch_file_contents_using_s3_select_raises_errors(self, expected_error_code): store = storage.S3FileStore("some-bucket") @@ -623,6 +697,18 @@ def test_download_file(self): class TestLocalFileStore: + @classmethod + def setup_class(cls): + cls.sample_dataframe = pd.DataFrame( + {"Name": ["Alice", "Bob", "Charlie"], "Age": [25, 30, 35]} + ) + + cls.csv_data = """Name,Age + Alice,25 + Bob,30 + Charlie,35 + """ + def test_store_and_fetch(self): with tempfile.TemporaryDirectory() as tdir: store = storage.LocalFileStore("bucket", tdir, use_date_in_key_path=False) @@ -771,6 +857,263 @@ def test_fetch_url_with_version(self): fetch_url = store.fetch_url("some/key", version_id="some-version") assert fetch_url == "/media-url/bucket-name/some/some-version/key" + @mock.patch.object(storage.LocalFileStore, "_filepath_for_key_path") + def test_fetch_csv_file_contents_using_s3_select(self, mock__filepath_for_key_path): + + store = storage.LocalFileStore("my_bucket") + mock_csv_data = "Name,Age\nAlice,25\nBob,30\nCharlie,35\n" + with tempfile.NamedTemporaryFile(delete=False, mode="w", suffix=".csv") as tmp_csv_file: + tmp_csv_file.write(mock_csv_data) + tmp_csv_file_path = tmp_csv_file.name + + mock__filepath_for_key_path.return_value = tmp_csv_file_path + + input_serializer = s3_select.CSVInputSerializer( + FileHeaderInfo=s3_select.FileHeaderInfo.USE, FieldDelimiter="," + ) + output_serializer = s3_select.JSONOutputSerializer() + + results = list( + store.fetch_file_contents_using_s3_select( + key_path="my_csv_file.csv", + raw_sql="SELECT * FROM s3object", + input_serializer=input_serializer, + output_serializer=output_serializer, + ) + ) + + expected_results = [ + '{"Name":"Alice","Age":25}\n' + '{"Name":"Bob","Age":30}\n' + '{"Name":"Charlie","Age":35}\n' + ] + assert results == expected_results + + 
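+    # The local emulation never talks to S3: fetch_file_contents_using_s3_select (added to
+    # storage.py in this patch) reads the CSV into a pandas DataFrame, registers it with an
+    # in-memory duckdb connection under the identifier "s3object", executes the SQL there,
+    # and serialises the result with DataFrame.to_json(orient="records", lines=True), which
+    # is why the expected output above is newline-delimited JSON.
+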
@mock.patch.object(storage.LocalFileStore, "_filepath_for_key_path") + def test_fetch_csv_file_contents_using_s3_select_and_where_statement( + self, mock__filepath_for_key_path + ): + + store = storage.LocalFileStore("my_bucket") + mock_csv_data = "Name,Age\nAlice,25\nBob,30\nCharlie,35\n" + with tempfile.NamedTemporaryFile(delete=False, mode="w", suffix=".csv") as tmp_csv_file: + tmp_csv_file.write(mock_csv_data) + tmp_csv_file_path = tmp_csv_file.name + + mock__filepath_for_key_path.return_value = tmp_csv_file_path + + input_serializer = s3_select.CSVInputSerializer( + FileHeaderInfo=s3_select.FileHeaderInfo.USE, FieldDelimiter="," + ) + output_serializer = s3_select.JSONOutputSerializer() + + results = list( + store.fetch_file_contents_using_s3_select( + key_path="my_csv_file.csv", + raw_sql="SELECT * FROM s3object WHERE Name = 'Alice'", + input_serializer=input_serializer, + output_serializer=output_serializer, + ) + ) + + expected_results = ['{"Name":"Alice","Age":25}\n'] + assert results == expected_results + + @mock.patch.object(storage.LocalFileStore, "_filepath_for_key_path") + def test_fetch_parquet_file_contents_using_s3_select(self, mock__filepath_for_key_path): + + store = storage.LocalFileStore("my_bucket") + mock_data = {"Name": ["Alice", "Bob", "Charlie"], "Age": [25, 30, 35]} + df = pd.DataFrame(mock_data) + + with tempfile.NamedTemporaryFile( + delete=False, mode="w", suffix=".parquet" + ) as tmp_parquet_file: + parquet_file_path = tmp_parquet_file.name + + table = pyarrow.Table.from_pandas(df) + parquet.write_table(table, parquet_file_path) + + mock__filepath_for_key_path.return_value = parquet_file_path + + input_serializer = s3_select.ParquetInputSerializer() + output_serializer = s3_select.JSONOutputSerializer() + + results = list( + store.fetch_file_contents_using_s3_select( + key_path="my_parquet_file.parquet", + raw_sql="SELECT * FROM s3object", + input_serializer=input_serializer, + output_serializer=output_serializer, + ) + ) + + expected_results = [ + '{"Name":"Alice","Age":25}\n' + '{"Name":"Bob","Age":30}\n' + '{"Name":"Charlie","Age":35}\n' + ] + assert results == expected_results + + def test_fetch_nonexistent_file_with_s3_select(self): + + input_serializer = s3_select.CSVInputSerializer(s3_select.FileHeaderInfo.USE) + output_serializer = s3_select.JSONOutputSerializer() + store = storage.LocalFileStore("my_bucket") + + with pytest.raises(FileNotFoundError): + list( + store.fetch_file_contents_using_s3_select( + key_path="nonexistent_file.csv", + raw_sql="SELECT * FROM s3Object", + input_serializer=input_serializer, + output_serializer=output_serializer, + ) + ) + + def test_fetch_file_with_s3_select_scan_range_raises_error(self): + + input_serializer = s3_select.CSVInputSerializer(s3_select.FileHeaderInfo.USE) + output_serializer = s3_select.JSONOutputSerializer() + store = storage.LocalFileStore("my_bucket") + + with pytest.raises(NotImplementedError): + list( + store.fetch_file_contents_using_s3_select( + key_path="nonexistent_file.csv", + raw_sql="SELECT * FROM s3Object", + input_serializer=input_serializer, + output_serializer=output_serializer, + scan_range=s3_select.ScanRange(0, 100), + ) + ) + + @mock.patch.object(storage.LocalFileStore, "_filepath_for_key_path") + def test_json_output_unsupported_record_separator_raises_exception( + self, mock__filepath_for_key_path + ): + + store = storage.LocalFileStore("my_bucket") + mock_data = {"Name": ["Alice", "Bob", "Charlie"], "Age": [25, 30, 35]} + df = pd.DataFrame(mock_data) + + with 
tempfile.NamedTemporaryFile( + delete=False, mode="w", suffix=".parquet" + ) as tmp_parquet_file: + parquet_file_path = tmp_parquet_file.name + + table = pyarrow.Table.from_pandas(df) + parquet.write_table(table, parquet_file_path) + + mock__filepath_for_key_path.return_value = parquet_file_path + + input_serializer = s3_select.ParquetInputSerializer() + output_serializer = s3_select.JSONOutputSerializer("\r") + + with pytest.raises(NotImplementedError): + list( + store.fetch_file_contents_using_s3_select( + key_path="my_parquet_file.parquet", + raw_sql="SELECT * FROM s3object", + input_serializer=input_serializer, + output_serializer=output_serializer, + ) + ) + + def test_output_csv_with_serializer_quoting_always(self): + store = storage.LocalFileStore("my_bucket") + serializer = s3_select.CSVOutputSerializer(QuoteFields=s3_select.QuoteFields.ALWAYS) + result = store.output_csv_with_serializer( + df=self.sample_dataframe, output_serializer=serializer + ) + expected = '"Name","Age"\n"Alice","25"\n"Bob","30"\n"Charlie","35"\n' + assert result == expected + + def test_output_csv_with_serializer_quoting_as_needed(self): + + sample_dataframe = pd.DataFrame( + {"Name": ["Ali,ce", "Bob", "Charlie"], "Age": [25, 30, 35]} + ) + store = storage.LocalFileStore("my_bucket") + serializer = s3_select.CSVOutputSerializer(QuoteFields=s3_select.QuoteFields.ASNEEDED) + result = store.output_csv_with_serializer( + df=sample_dataframe, output_serializer=serializer + ) + expected = 'Name,Age\n"Ali,ce",25\nBob,30\nCharlie,35\n' + assert result == expected + + def test_output_csv_with_serializer_custom_delimiter(self): + store = storage.LocalFileStore("my_bucket") + serializer = s3_select.CSVOutputSerializer(FieldDelimiter=";") + result = store.output_csv_with_serializer( + df=self.sample_dataframe, output_serializer=serializer + ) + expected = "Name;Age\nAlice;25\nBob;30\nCharlie;35\n" + assert result == expected + + def test_output_csv_with_serializer_custom_escapechar(self): + store = storage.LocalFileStore("my_bucket") + serializer = s3_select.CSVOutputSerializer(QuoteEscapeCharacter="\\") + result = store.output_csv_with_serializer( + df=self.sample_dataframe, output_serializer=serializer + ) + expected = "Name,Age\nAlice,25\nBob,30\nCharlie,35\n" + assert result == expected + + def test_output_csv_with_serializer_custom_record_delimiter(self): + store = storage.LocalFileStore("my_bucket") + serializer = s3_select.CSVOutputSerializer(RecordDelimiter="|") + result = store.output_csv_with_serializer( + df=self.sample_dataframe, output_serializer=serializer + ) + expected = "Name,Age|Alice,25|Bob,30|Charlie,35|" + assert result == expected + + def test_read_csv_with_serializer(self): + store = storage.LocalFileStore("my_bucket") + + with tempfile.NamedTemporaryFile(delete=False, mode="w", suffix=".csv") as tmp_csv_file: + tmp_csv_file.write(self.csv_data) + tmp_csv_file_path = tmp_csv_file.name + input_serializer = s3_select.CSVInputSerializer(s3_select.FileHeaderInfo.USE) + result = store.read_csv_with_serializer( + csv_file_path=tmp_csv_file_path, + csv_input_serializer=input_serializer, + ) + assert isinstance(result, pd.DataFrame) + + def test_query_dataframe_with_sql(self): + + data = { + "string_column": ["A", "B", "C"], + "array_column": [[1, 2, 3], [4, 5, 6], [7, 8, 9]], + } + dummy_df = pd.DataFrame(data) + + store = storage.LocalFileStore("my_bucket") + raw_sql = "SELECT * FROM S3Object WHERE string_column = 'A'" + + result_df = store.query_dataframe_with_sql(raw_sql, dummy_df) + + assert 
result_df.shape == (1, 2) + + assert result_df["string_column"][0] == "A" + assert result_df["array_column"][0] == [1, 2, 3] + + def test_query_dataframe_with_sql_with_capitalised_object_in_query(self): + + dummy_df = pd.DataFrame(self.sample_dataframe) + + store = storage.LocalFileStore("my_bucket") + raw_sql = "SELECT * FROM s3OBJeCT WHERE Name = 'Alice'" + + result_df = store.query_dataframe_with_sql(raw_sql, dummy_df) + + assert result_df.shape == (1, 2) + + assert result_df["Name"][0] == "Alice" + assert result_df["Age"][0] == 25 + class TestFromUri: def test_adds_set_acl_bucket_owner_if_in_s3_url(self): diff --git a/xocto/__init__.py b/xocto/__init__.py index aef46ac..111dc91 100644 --- a/xocto/__init__.py +++ b/xocto/__init__.py @@ -1 +1 @@ -__version__ = "4.2.1" +__version__ = "4.3.0" diff --git a/xocto/storage/s3_select.py b/xocto/storage/s3_select.py index a9e6a75..ab0472d 100644 --- a/xocto/storage/s3_select.py +++ b/xocto/storage/s3_select.py @@ -90,6 +90,11 @@ class CSVInputSerializer(BaseSerializer): AllowQuotedRecordDelimiter: bool | None = False +@dataclasses.dataclass(frozen=True) +class ParquetInputSerializer(BaseSerializer): + pass + + @dataclasses.dataclass(frozen=True) class CSVOutputSerializer(BaseSerializer): # Indicates whether to use quotation marks around output fields. @@ -165,3 +170,23 @@ def get_serializers_for_csv_file( temp_dict["ScanRange"] = scan_range.to_dict() return temp_dict + + +def get_serializers_for_parquet_file( + output_serializer: JSONOutputSerializer | CSVOutputSerializer, +) -> dict[str, dict[str, Any]]: + """ + Returns input and output serialization dictionaries that should be used to perform a select_object_content query. + + Parquet files do not support input serialization or scan ranges. + """ + return { + "input_serialization": { + "Parquet": {}, + }, + "output_serialization": { + "JSON" + if isinstance(output_serializer, JSONOutputSerializer) + else "CSV": output_serializer.to_dict() + }, + } diff --git a/xocto/storage/storage.py b/xocto/storage/storage.py index a45b444..c30c367 100644 --- a/xocto/storage/storage.py +++ b/xocto/storage/storage.py @@ -2,6 +2,7 @@ import abc import base64 +import csv import dataclasses import datetime import hashlib @@ -27,12 +28,15 @@ ) import boto3 +import duckdb import magic +import pandas as pd from botocore import exceptions as botocore_exceptions from botocore.response import StreamingBody from django.conf import settings from django.urls import reverse from django.utils.module_loading import import_string +from pyarrow import parquet from xocto import events, localtime @@ -550,24 +554,39 @@ def fetch_file_contents_using_s3_select( self, key_path: str, raw_sql: str, - input_serializer: s3_select.CSVInputSerializer, + input_serializer: s3_select.CSVInputSerializer | s3_select.ParquetInputSerializer, output_serializer: s3_select.CSVOutputSerializer | s3_select.JSONOutputSerializer, - compression_type: s3_select.CompressionType, + compression_type: s3_select.CompressionType | None = None, scan_range: s3_select.ScanRange | None = None, chunk_size: int | None = None, ) -> Iterator[str]: """ - Reads a CSV file from S3 using the given SQL statement. + Reads a CSV or Parquet file from S3 using the given SQL statement. Reference: https://dev.to/idrisrampurawala/efficiently-streaming-a-large-aws-s3-file-via-s3-select-4on + Does not support columnar compression for parquet files at present. 
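+        Passing scan_range together with a ParquetInputSerializer raises a ValueError,
+        as S3 Select does not support scan ranges for parquet input.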
""" boto_client = self._get_boto_client() - serialization = s3_select.get_serializers_for_csv_file( - input_serializer=input_serializer, - compression_type=compression_type, - output_serializer=output_serializer, - scan_range=scan_range, - ) + + if isinstance(input_serializer, s3_select.CSVInputSerializer): + serialization = s3_select.get_serializers_for_csv_file( + input_serializer=input_serializer, + compression_type=compression_type + if compression_type is not None + else s3_select.CompressionType.NONE, + output_serializer=output_serializer, + scan_range=scan_range, + ) + elif isinstance(input_serializer, s3_select.ParquetInputSerializer): + if scan_range is not None: + raise ValueError("The scan_range parameter is not supported for parquet files") + serialization = s3_select.get_serializers_for_parquet_file( + output_serializer=output_serializer + ) + else: + raise ValueError( + "input_serializer must be either CSVInputSerializer or ParquetInputSerializer" + ) select_object_content_parameters = dict( Bucket=self.bucket_name, @@ -1107,6 +1126,171 @@ def fetch_file(self, key_path: str, version_id: str | None = None) -> StreamingB def fetch_file_contents(self, key_path: str, version_id: str | None = None) -> bytes: return self.fetch_file(key_path, version_id).read() + def fetch_file_contents_using_s3_select( + self, + key_path: str, + raw_sql: str, + input_serializer: s3_select.CSVInputSerializer | s3_select.ParquetInputSerializer, + output_serializer: s3_select.CSVOutputSerializer | s3_select.JSONOutputSerializer, + compression_type: s3_select.CompressionType | None = None, + scan_range: s3_select.ScanRange | None = None, + chunk_size: int | None = None, + ) -> Iterator[str]: + """ + Localdev version of S3FileStore `fetch_file_contents_using_s3_select`. Does not support scan ranges. + Converts the file to a pandas dataframe and then runs the SQL query on it + by creating an in memory DB with the table which a query can be run on. As the file is loaded into memory, + this is not recommended for use with very large files. + + While this should return the same results as the S3FileStore version, + there may be small discrepancies in the results being returned. 
+ """ + + if scan_range is not None: + raise NotImplementedError("Scan ranges are not supported for localdev") + + input_file_path = self._filepath_for_key_path(key_path=key_path) + + if isinstance(input_serializer, s3_select.CSVInputSerializer): + # https://docs.aws.amazon.com/AmazonS3/latest/API/API_CSVInput.html#AmazonS3-Type-CSVInput-QuoteCharacter + df = self.read_csv_with_serializer( + csv_file_path=input_file_path, + csv_input_serializer=input_serializer, + compression_type=compression_type, + ) + elif isinstance(input_serializer, s3_select.ParquetInputSerializer): + parquet_file = parquet.ParquetFile(input_file_path) + df = parquet_file.read().to_pandas() + + filtered_df = self.query_dataframe_with_sql( + raw_sql=raw_sql, + df=df, + ) + + if isinstance(output_serializer, s3_select.JSONOutputSerializer): + if output_serializer.RecordDelimiter != "\n": + raise NotImplementedError( + "Only newline ('\n') is supported as the record delimiter for JSON output in localdev" + ) + result = filtered_df.to_json(orient="records", lines=True, date_format="iso") + elif isinstance(output_serializer, s3_select.CSVOutputSerializer): + result = self.output_csv_with_serializer( + df=filtered_df, + output_serializer=output_serializer, + ) + + yield result + + def query_dataframe_with_sql( + self, + raw_sql: str, + df: pd.DataFrame, + ) -> pd.DataFrame: + + # s3 select requires the from clause to use the identifier "s3object" + # it is case insensitive however so people's queries may use different cases + S3_OBJECT_QUERY_IDENTIFIER = "s3object" + + con = duckdb.connect(database=":memory:") + pattern = re.compile(rf"{S3_OBJECT_QUERY_IDENTIFIER}", re.IGNORECASE) + raw_sql = pattern.sub(S3_OBJECT_QUERY_IDENTIFIER, raw_sql) + con.register(S3_OBJECT_QUERY_IDENTIFIER, df) + + result = con.execute(raw_sql) + + df_filtered = result.fetchdf() + + return df_filtered + + def read_csv_with_serializer( + self, + csv_file_path: str, + csv_input_serializer: s3_select.CSVInputSerializer, + compression_type: s3_select.CompressionType | None = None, + ) -> pd.DataFrame: + + input_serializer = csv_input_serializer.to_dict() + + field_delimiter = input_serializer.get("FieldDelimiter", ",") + compression = compression_type.lower() if compression_type else "infer" + quotechar = input_serializer.get("QuoteCharacter", '"') + escapechar = input_serializer.get("QuoteEscapeCharacter", "\\") + comment = input_serializer.get("Comments") + record_delimiter = input_serializer.get("RecordDelimiter") + header: int | None | str + + if "FileHeaderInfo" in input_serializer.keys(): + + if input_serializer["FileHeaderInfo"] == "NONE": + header = None + elif input_serializer["FileHeaderInfo"] == "IGNORE": + raise NotImplementedError( + "Value `IGNORE` of column selection not implemented for localdev" + ) + elif input_serializer["FileHeaderInfo"] == "USE": + header = 0 + else: + header = "infer" + else: + header = "infer" + + # Define default values for each parameter + default_kwargs = { + "sep": field_delimiter, + "compression": compression, + "quotechar": quotechar, + "escapechar": escapechar, + "comment": comment, + "header": header, + "lineterminator": record_delimiter if record_delimiter != "\n" else None, + "engine": "c" if record_delimiter != "\n" else None, + } + + kwargs = { + key: value + for key, value in default_kwargs.items() + if value is not None or key == "header" + } + + df = pd.read_csv(filepath_or_buffer=csv_file_path, **kwargs) + + return df + + def output_csv_with_serializer( + self, + df: pd.DataFrame, + 
output_serializer: s3_select.CSVOutputSerializer, + ) -> str: + + output_serializer_dict = output_serializer.to_dict() + + field_delimiter = output_serializer_dict.get("FieldDelimiter", ",") + quotechar = output_serializer_dict.get("QuoteCharacter", '"') + escapechar = output_serializer_dict.get("QuoteEscapeCharacter", "\\") + record_delimiter = output_serializer_dict.get("RecordDelimiter", "\n") + + if "QuoteFields" in output_serializer_dict.keys(): + if output_serializer_dict["QuoteFields"] == "ALWAYS": + quoting = csv.QUOTE_ALL + else: + quoting = None + else: + quoting = None + + default_kwargs = { + "sep": field_delimiter, + "lineterminator": record_delimiter if record_delimiter != "\n" else None, + "quotechar": quotechar, + "escapechar": escapechar, + "quoting": quoting, + } + + kwargs = {key: value for key, value in default_kwargs.items() if value is not None} + + result = df.to_csv(index=False, **kwargs) + + return result + def fetch_url( self, key_path: str,