Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix(taps): Use nulls_first when available to order NULL results in incremental SQL streams #2094

Merged
merged 1 commit into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
# Show typehints in the description, along with parameter descriptions
autodoc_typehints = "signature"
autodoc_class_signature = "separated"
autodoc_member_order = "groupwise"

# -- Options for HTML output -------------------------------------------------

Expand Down
1 change: 1 addition & 0 deletions samples/sample_tap_sqlite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class SQLiteStream(SQLStream):
"""

connector_class = SQLiteConnector
supports_nulls_first = True

# Use a smaller state message frequency to check intermediate state.
STATE_MSG_FREQUENCY = 10
Expand Down
12 changes: 11 additions & 1 deletion singer_sdk/streams/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import abc
import typing as t

from sqlalchemy import nulls_first

import singer_sdk.helpers._catalog as catalog
from singer_sdk._singerlib import CatalogEntry, MetadataMapping
from singer_sdk.connectors import SQLConnector
Expand All @@ -20,6 +22,9 @@ class SQLStream(Stream, metaclass=abc.ABCMeta):
connector_class = SQLConnector
_cached_schema: dict | None = None

supports_nulls_first: bool = False
"""Whether the database supports the NULLS FIRST/LAST syntax."""

def __init__(
self,
tap: Tap,
Expand Down Expand Up @@ -189,7 +194,12 @@ def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:

if self.replication_key:
replication_key_col = table.columns[self.replication_key]
query = query.order_by(replication_key_col)
order_by = (
nulls_first(replication_key_col.asc())
if self.supports_nulls_first
else replication_key_col.asc()
)
query = query.order_by(order_by)

start_val = self.get_starting_replication_key_value(context)
if start_val:
Expand Down