From 0c07836ed92294a74aea69cc3f5fe20b1ce37f92 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?=
Date: Thu, 7 Dec 2023 20:17:22 -0600
Subject: [PATCH] fix: Use `nulls_first` to order results in incremental SQL streams

---
 docs/conf.py                          |  1 +
 samples/sample_tap_sqlite/__init__.py |  1 +
 singer_sdk/streams/sql.py             | 12 +++++++++++-
 3 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/docs/conf.py b/docs/conf.py
index 9bae7ea2c..81636b492 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -57,6 +57,7 @@
 # Show typehints in the description, along with parameter descriptions
 autodoc_typehints = "signature"
 autodoc_class_signature = "separated"
+autodoc_member_order = "groupwise"
 
 # -- Options for HTML output -------------------------------------------------
 
diff --git a/samples/sample_tap_sqlite/__init__.py b/samples/sample_tap_sqlite/__init__.py
index 3aed5d21d..2cd34144f 100644
--- a/samples/sample_tap_sqlite/__init__.py
+++ b/samples/sample_tap_sqlite/__init__.py
@@ -33,6 +33,7 @@ class SQLiteStream(SQLStream):
     """
 
     connector_class = SQLiteConnector
+    supports_nulls_first = True
 
     # Use a smaller state message frequency to check intermediate state.
     STATE_MSG_FREQUENCY = 10
diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py
index e605471dc..4ed7be25b 100644
--- a/singer_sdk/streams/sql.py
+++ b/singer_sdk/streams/sql.py
@@ -5,6 +5,8 @@
 import abc
 import typing as t
 
+from sqlalchemy import nulls_first
+
 import singer_sdk.helpers._catalog as catalog
 from singer_sdk._singerlib import CatalogEntry, MetadataMapping
 from singer_sdk.connectors import SQLConnector
@@ -20,6 +22,9 @@ class SQLStream(Stream, metaclass=abc.ABCMeta):
     connector_class = SQLConnector
     _cached_schema: dict | None = None
 
+    supports_nulls_first: bool = False
+    """Whether the database supports the NULLS FIRST/LAST syntax."""
+
     def __init__(
         self,
         tap: Tap,
@@ -189,7 +194,12 @@ def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:
 
         if self.replication_key:
             replication_key_col = table.columns[self.replication_key]
-            query = query.order_by(replication_key_col)
+            order_by = (
+                nulls_first(replication_key_col.asc())
+                if self.supports_nulls_first
+                else replication_key_col.asc()
+            )
+            query = query.order_by(order_by)
 
             start_val = self.get_starting_replication_key_value(context)
             if start_val: