From 1eb5b39b7a35bf586ba23525af2fa358f93e44d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 17:08:15 -0600 Subject: [PATCH 1/6] fix: Use FS-specific `listdir` in folder tap --- singer_sdk/contrib/filesystem/tap.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/singer_sdk/contrib/filesystem/tap.py b/singer_sdk/contrib/filesystem/tap.py index 58ca673d8..cce52f409 100644 --- a/singer_sdk/contrib/filesystem/tap.py +++ b/singer_sdk/contrib/filesystem/tap.py @@ -178,7 +178,7 @@ def discover_streams(self) -> list: filepaths=[os.path.join(path, member)], # noqa: PTH118 filesystem=self.fs, ) - for member in os.listdir(path) + for member in self.fs.listdir(path, detail=False) if member.endswith(self.valid_extensions) ] @@ -189,7 +189,7 @@ def discover_streams(self) -> list: name=self.config["stream_name"], filepaths=[ os.path.join(path, member) # noqa: PTH118 - for member in os.listdir(path) + for member in self.fs.listdir(path, detail=False) if member.endswith(self.valid_extensions) ], filesystem=self.fs, From 14d6d40aa4548c7f891f657aede235b1dfa0db0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 19:16:25 -0600 Subject: [PATCH 2/6] Use details --- singer_sdk/contrib/filesystem/tap.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/singer_sdk/contrib/filesystem/tap.py b/singer_sdk/contrib/filesystem/tap.py index cce52f409..8584a6ca8 100644 --- a/singer_sdk/contrib/filesystem/tap.py +++ b/singer_sdk/contrib/filesystem/tap.py @@ -174,12 +174,12 @@ def discover_streams(self) -> list: return [ self.default_stream_class( tap=self, - name=file_path_to_stream_name(member), - filepaths=[os.path.join(path, member)], # noqa: PTH118 + name=file_path_to_stream_name(member["name"]), + filepaths=[os.path.join(path, member["name"])], # noqa: PTH118 filesystem=self.fs, ) - for member in self.fs.listdir(path, detail=False) - if member.endswith(self.valid_extensions) + for member in self.fs.listdir(path) + if member["name"].endswith(self.valid_extensions) ] # Merge @@ -188,9 +188,9 @@ def discover_streams(self) -> list: tap=self, name=self.config["stream_name"], filepaths=[ - os.path.join(path, member) # noqa: PTH118 - for member in self.fs.listdir(path, detail=False) - if member.endswith(self.valid_extensions) + os.path.join(path, member["name"]) # noqa: PTH118 + for member in self.fs.listdir(path) + if member["name"].endswith(self.valid_extensions) ], filesystem=self.fs, ) From cbc4486a71b41d9bec59b550b5cf1675949ffbba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 19:45:42 -0600 Subject: [PATCH 3/6] Fix tests --- singer_sdk/contrib/filesystem/tap.py | 5 +++-- singer_sdk/testing/templates.py | 2 +- tests/samples/test_tap_csv.py | 22 ++++++++++++++-------- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/singer_sdk/contrib/filesystem/tap.py b/singer_sdk/contrib/filesystem/tap.py index 8584a6ca8..c882385a3 100644 --- a/singer_sdk/contrib/filesystem/tap.py +++ b/singer_sdk/contrib/filesystem/tap.py @@ -188,9 +188,10 @@ def discover_streams(self) -> list: tap=self, name=self.config["stream_name"], filepaths=[ - os.path.join(path, member["name"]) # noqa: PTH118 + member["name"] for member in self.fs.listdir(path) - if member["name"].endswith(self.valid_extensions) + if member["type"] == "file" + and member["name"].endswith(self.valid_extensions) ], filesystem=self.fs, ) diff --git a/singer_sdk/testing/templates.py b/singer_sdk/testing/templates.py index 8a21f639c..4fa708ab0 100644 --- a/singer_sdk/testing/templates.py +++ b/singer_sdk/testing/templates.py @@ -104,7 +104,7 @@ def run( ValueError: if Test instance does not have `name` and `type` properties. """ if not self.name or not self.plugin_type: - msg = "Test must have 'name' and 'type' properties." + msg = "Test must have 'name' and 'plugin_type' properties." raise ValueError(msg) self.config = config diff --git a/tests/samples/test_tap_csv.py b/tests/samples/test_tap_csv.py index cb16e0e0e..8d735c3f2 100644 --- a/tests/samples/test_tap_csv.py +++ b/tests/samples/test_tap_csv.py @@ -1,11 +1,15 @@ from __future__ import annotations import datetime +import typing as t import pytest from samples.sample_tap_csv.sample_tap_csv import SampleTapCSV -from singer_sdk.testing import SuiteConfig, get_tap_test_class +from singer_sdk.testing import SuiteConfig, TapTestRunner, get_tap_test_class + +if t.TYPE_CHECKING: + from samples.sample_tap_csv.client import CSVStream _TestCSVMerge = get_tap_test_class( tap_class=SampleTapCSV, @@ -76,10 +80,12 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile): class TestCSVOneStreamPerFileIncremental(_TestCSVOneStreamPerFileIncremental): - @pytest.mark.xfail(reason="No records are extracted", strict=True) - def test_tap_stream_transformed_catalog_schema_matches_record(self, stream: str): - super().test_tap_stream_transformed_catalog_schema_matches_record(stream) - - @pytest.mark.xfail(reason="No records are extracted", strict=True) - def test_tap_stream_returns_record(self, stream: str): - super().test_tap_stream_returns_record(stream) + def test_tap_stream_returns_record( + self, + config: SuiteConfig, + resource: t.Any, + runner: TapTestRunner, + stream: CSVStream, + ): + with pytest.warns(UserWarning): + super().test_tap_stream_returns_record(config, resource, runner, stream) From edbd7e03c8d856629964716cd37d8e03fd632312 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 20:11:04 -0600 Subject: [PATCH 4/6] Use `DirFileSystem` wrapper --- singer_sdk/contrib/filesystem/tap.py | 30 +++++++++++++++++----------- tests/samples/test_tap_csv.py | 19 ++++++++++++++++-- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/singer_sdk/contrib/filesystem/tap.py b/singer_sdk/contrib/filesystem/tap.py index c882385a3..88b08ac34 100644 --- a/singer_sdk/contrib/filesystem/tap.py +++ b/singer_sdk/contrib/filesystem/tap.py @@ -5,11 +5,12 @@ import enum import functools import logging -import os import typing as t from pathlib import Path import fsspec +import fsspec.implementations +import fsspec.implementations.dirfs import singer_sdk.typing as th from singer_sdk import Tap @@ -138,6 +139,11 @@ def read_mode(self) -> ReadMode: """Folder read mode.""" return ReadMode(self.config["read_mode"]) + @functools.cached_property + def path(self) -> str: + """Return the path to the directory.""" + return self.config["path"] + @functools.cached_property def fs(self) -> fsspec.AbstractFileSystem: """Return the filesystem object. @@ -147,13 +153,14 @@ def fs(self) -> fsspec.AbstractFileSystem: """ protocol = self.config["filesystem"] if protocol != "local" and protocol not in self.config: # pragma: no cover - msg = "Filesytem configuration is missing" + msg = "Filesystem configuration is missing" raise ConfigValidationError( msg, errors=[f"Missing configuration for filesystem {protocol}"], ) - logger.info("Instatiating filesystem inteface: '%s'", protocol) - return fsspec.filesystem(protocol, **self.config.get(protocol, {})) + logger.info("Instantiating filesystem interface: '%s'", protocol) + fs = fsspec.filesystem(protocol, **self.config.get(protocol, {})) + return fsspec.implementations.dirfs.DirFileSystem(path=self.path, fs=fs) def discover_streams(self) -> list: """Return a list of discovered streams. @@ -162,11 +169,9 @@ def discover_streams(self) -> list: ValueError: If the path does not exist or is not a directory. """ # A directory for now, but could be a glob pattern. - path: str = self.config["path"] - - if not self.fs.exists(path) or not self.fs.isdir(path): # pragma: no cover + if not self.fs.exists(".") or not self.fs.isdir("."): # pragma: no cover # Raise a more specific error if the path is not a directory. - msg = f"Path {path} does not exist or is not a directory" + msg = f"Path {self.path} does not exist or is not a directory" raise ValueError(msg) # One stream per file @@ -175,11 +180,12 @@ def discover_streams(self) -> list: self.default_stream_class( tap=self, name=file_path_to_stream_name(member["name"]), - filepaths=[os.path.join(path, member["name"])], # noqa: PTH118 + filepaths=[member["name"]], filesystem=self.fs, ) - for member in self.fs.listdir(path) - if member["name"].endswith(self.valid_extensions) + for member in self.fs.listdir(".") + if member["type"] == "file" + and member["name"].endswith(self.valid_extensions) ] # Merge @@ -189,7 +195,7 @@ def discover_streams(self) -> list: name=self.config["stream_name"], filepaths=[ member["name"] - for member in self.fs.listdir(path) + for member in self.fs.listdir(".") if member["type"] == "file" and member["name"].endswith(self.valid_extensions) ], diff --git a/tests/samples/test_tap_csv.py b/tests/samples/test_tap_csv.py index 8d735c3f2..217022f2a 100644 --- a/tests/samples/test_tap_csv.py +++ b/tests/samples/test_tap_csv.py @@ -48,7 +48,7 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile): "customers": { "partitions": [ { - "context": {"_sdc_path": "fixtures/csv/customers.csv"}, + "context": {"_sdc_path": "./customers.csv"}, "replication_key": "_sdc_modified_at", "replication_key_value": FUTURE.isoformat(), } @@ -57,7 +57,7 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile): "employees": { "partitions": [ { - "context": {"_sdc_path": "fixtures/csv/employees.csv"}, + "context": {"_sdc_path": "./employees.csv"}, "replication_key": "_sdc_modified_at", "replication_key_value": FUTURE.isoformat(), } @@ -80,6 +80,21 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile): class TestCSVOneStreamPerFileIncremental(_TestCSVOneStreamPerFileIncremental): + def test_tap_stream_transformed_catalog_schema_matches_record( + self, + config: SuiteConfig, + resource: t.Any, + runner: TapTestRunner, + stream: CSVStream, + ): + with pytest.warns(UserWarning): + super().test_tap_stream_transformed_catalog_schema_matches_record( + config, + resource, + runner, + stream, + ) + def test_tap_stream_returns_record( self, config: SuiteConfig, From 53b5dcb406bf2641d6172cb90e800b9279c3e84a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 20:15:00 -0600 Subject: [PATCH 5/6] Make mypy happy --- singer_sdk/contrib/filesystem/tap.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/singer_sdk/contrib/filesystem/tap.py b/singer_sdk/contrib/filesystem/tap.py index 88b08ac34..ff0c98819 100644 --- a/singer_sdk/contrib/filesystem/tap.py +++ b/singer_sdk/contrib/filesystem/tap.py @@ -142,7 +142,7 @@ def read_mode(self) -> ReadMode: @functools.cached_property def path(self) -> str: """Return the path to the directory.""" - return self.config["path"] + return self.config["path"] # type: ignore[no-any-return] @functools.cached_property def fs(self) -> fsspec.AbstractFileSystem: @@ -159,8 +159,12 @@ def fs(self) -> fsspec.AbstractFileSystem: errors=[f"Missing configuration for filesystem {protocol}"], ) logger.info("Instantiating filesystem interface: '%s'", protocol) - fs = fsspec.filesystem(protocol, **self.config.get(protocol, {})) - return fsspec.implementations.dirfs.DirFileSystem(path=self.path, fs=fs) + + return fsspec.implementations.dirfs.DirFileSystem( + path=self.path, + target_protocol=protocol, + target_options=self.config.get(protocol), + ) def discover_streams(self) -> list: """Return a list of discovered streams. From 3fbe4c1093a1035e1c49cbcfd4f823749e9fcd5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= <16805946+edgarrmondragon@users.noreply.github.com> Date: Fri, 29 Nov 2024 20:21:25 -0600 Subject: [PATCH 6/6] Update singer_sdk/testing/templates.py --- singer_sdk/testing/templates.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer_sdk/testing/templates.py b/singer_sdk/testing/templates.py index 4fa708ab0..716caaf0a 100644 --- a/singer_sdk/testing/templates.py +++ b/singer_sdk/testing/templates.py @@ -103,7 +103,7 @@ def run( Raises: ValueError: if Test instance does not have `name` and `type` properties. """ - if not self.name or not self.plugin_type: + if not self.name or not self.plugin_type: # pragma: no cover msg = "Test must have 'name' and 'plugin_type' properties." raise ValueError(msg)