Skip to content

Commit

Permalink
feat(targets): The last received Singer message is now logged when th…
Browse files Browse the repository at this point in the history
…e target fails
  • Loading branch information
edgarrmondragon committed Nov 14, 2024
1 parent 25527c4 commit 7ad9f5e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
15 changes: 14 additions & 1 deletion singer_sdk/_singerlib/encoding/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,25 @@ class SingerMessageType(str, enum.Enum):
class GenericSingerReader(t.Generic[T], metaclass=abc.ABCMeta):
"""Interface for all plugins reading Singer messages as strings or bytes."""

def __init__(self) -> None:
super().__init__()
self._current_message: T | None = None

@t.final
def listen(self, file_input: t.IO[T] | None = None) -> None:
"""Read from input until all messages are processed.
Args:
file_input: Readable stream of messages. Defaults to standard in.
"""
self._process_lines(file_input or self.default_input)
try:
self._process_lines(file_input or self.default_input)
except Exception:
logger.info(
"Failed while processing Singer message: %s",
self._current_message,
)
raise
self._process_endofpipe()

def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]:
Expand All @@ -59,6 +70,8 @@ def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]:
"""
stats: dict[str, int] = defaultdict(int)
for line in file_input:
self._current_message = line

line_dict = self.deserialize_json(line)
self._assert_line_requires(line_dict, requires={"type"})

Expand Down
24 changes: 24 additions & 0 deletions tests/core/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io
import itertools
import json
import logging
from contextlib import nullcontext, redirect_stdout
from textwrap import dedent

Expand Down Expand Up @@ -83,6 +84,29 @@ def test_listen_unknown_message():
reader.listen(input_lines)


def test_listen_error(caplog: pytest.LogCaptureFixture):
class ErrorReader(DummyReader):
def _process_record_message(self, message_dict: dict) -> None: # noqa: ARG002
msg = "Bad record"
raise ValueError(msg)

message = RecordMessage(
stream="users",
record={"id": 1, "value": 1.23},
)

input_lines = io.StringIO(json.dumps(message.to_dict()) + "\n")

reader = ErrorReader()
with caplog.at_level(logging.INFO), pytest.raises(ValueError, match="Bad record"):
reader.listen(input_lines)

assert caplog.records

message = caplog.records[0].message
assert "Failed while processing Singer message" in message


def test_write_message():
writer = SingerWriter()
message = RecordMessage(
Expand Down

0 comments on commit 7ad9f5e

Please # to comment.