From 1d7a848908ba3cb5c527bc6253c19e5a84a28b18 Mon Sep 17 00:00:00 2001 From: "stainless-app[bot]" <142633134+stainless-app[bot]@users.noreply.github.com> Date: Tue, 4 Feb 2025 12:04:58 +0000 Subject: [PATCH] feat(api): add events streaming --- .stats.yml | 2 +- api.md | 2 +- src/gitpod/_decoders/jsonl.py | 101 +++++++++++++++++++++++++++++ src/gitpod/_response.py | 22 +++++++ src/gitpod/resources/events.py | 23 ++++--- tests/api_resources/test_events.py | 49 +++++++++----- 6 files changed, 171 insertions(+), 28 deletions(-) create mode 100644 src/gitpod/_decoders/jsonl.py diff --git a/.stats.yml b/.stats.yml index fdef86a..f1e1d64 100644 --- a/.stats.yml +++ b/.stats.yml @@ -1,2 +1,2 @@ configured_endpoints: 106 -openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/gitpod%2Fgitpod-2e9f8b8666b2fd4e346a3acbf81a2c82a6f3793e01bc146499708efaf0c250c5.yml +openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/gitpod%2Fgitpod-922f204ec36b8a84ae8f96e73923e92cb2044a14c6497d173f4b7110a090ac30.yml diff --git a/api.md b/api.md index 320dc6f..894da1c 100644 --- a/api.md +++ b/api.md @@ -167,7 +167,7 @@ from gitpod.types import EventListResponse, EventWatchResponse Methods: - client.events.list(\*\*params) -> EventListResponse -- client.events.watch(\*\*params) -> EventWatchResponse +- client.events.watch(\*\*params) -> JSONLDecoder[EventWatchResponse] # Groups diff --git a/src/gitpod/_decoders/jsonl.py b/src/gitpod/_decoders/jsonl.py new file mode 100644 index 0000000..e9d29a1 --- /dev/null +++ b/src/gitpod/_decoders/jsonl.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import json +from typing_extensions import Generic, TypeVar, Iterator, AsyncIterator + +import httpx + +from .._models import construct_type_unchecked + +_T = TypeVar("_T") + + +class JSONLDecoder(Generic[_T]): + """A decoder for [JSON Lines](https://jsonlines.org) format. + + This class provides an iterator over a byte-iterator that parses each JSON Line + into a given type. + """ + + http_response: httpx.Response | None + """The HTTP response this decoder was constructed from""" + + def __init__( + self, *, raw_iterator: Iterator[bytes], line_type: type[_T], http_response: httpx.Response | None + ) -> None: + super().__init__() + self.http_response = http_response + self._raw_iterator = raw_iterator + self._line_type = line_type + self._iterator = self.__decode__() + + def __decode__(self) -> Iterator[_T]: + buf = b"" + for chunk in self._raw_iterator: + for line in chunk.splitlines(keepends=True): + buf += line + if buf.endswith((b"\r", b"\n", b"\r\n")): + yield construct_type_unchecked( + value=json.loads(buf), + type_=self._line_type, + ) + buf = b"" + + # flush + if buf: + yield construct_type_unchecked( + value=json.loads(buf), + type_=self._line_type, + ) + + def __next__(self) -> _T: + return self._iterator.__next__() + + def __iter__(self) -> Iterator[_T]: + for item in self._iterator: + yield item + + +class AsyncJSONLDecoder(Generic[_T]): + """A decoder for [JSON Lines](https://jsonlines.org) format. + + This class provides an async iterator over a byte-iterator that parses each JSON Line + into a given type. + """ + + http_response: httpx.Response | None + + def __init__( + self, *, raw_iterator: AsyncIterator[bytes], line_type: type[_T], http_response: httpx.Response | None + ) -> None: + super().__init__() + self.http_response = http_response + self._raw_iterator = raw_iterator + self._line_type = line_type + self._iterator = self.__decode__() + + async def __decode__(self) -> AsyncIterator[_T]: + buf = b"" + async for chunk in self._raw_iterator: + for line in chunk.splitlines(keepends=True): + buf += line + if buf.endswith((b"\r", b"\n", b"\r\n")): + yield construct_type_unchecked( + value=json.loads(buf), + type_=self._line_type, + ) + buf = b"" + + # flush + if buf: + yield construct_type_unchecked( + value=json.loads(buf), + type_=self._line_type, + ) + + async def __anext__(self) -> _T: + return await self._iterator.__anext__() + + async def __aiter__(self) -> AsyncIterator[_T]: + async for item in self._iterator: + yield item diff --git a/src/gitpod/_response.py b/src/gitpod/_response.py index 9ffc616..de6c36a 100644 --- a/src/gitpod/_response.py +++ b/src/gitpod/_response.py @@ -30,6 +30,7 @@ from ._constants import RAW_RESPONSE_HEADER, OVERRIDE_CAST_TO_HEADER from ._streaming import Stream, AsyncStream, is_stream_class_type, extract_stream_chunk_type from ._exceptions import GitpodError, APIResponseValidationError +from ._decoders.jsonl import JSONLDecoder, AsyncJSONLDecoder if TYPE_CHECKING: from ._models import FinalRequestOptions @@ -138,6 +139,27 @@ def _parse(self, *, to: type[_T] | None = None) -> R | _T: origin = get_origin(cast_to) or cast_to + if inspect.isclass(origin): + if issubclass(cast(Any, origin), JSONLDecoder): + return cast( + R, + cast("type[JSONLDecoder[Any]]", cast_to)( + raw_iterator=self.http_response.iter_bytes(chunk_size=4096), + line_type=extract_type_arg(cast_to, 0), + http_response=self.http_response, + ), + ) + + if issubclass(cast(Any, origin), AsyncJSONLDecoder): + return cast( + R, + cast("type[AsyncJSONLDecoder[Any]]", cast_to)( + raw_iterator=self.http_response.aiter_bytes(chunk_size=4096), + line_type=extract_type_arg(cast_to, 0), + http_response=self.http_response, + ), + ) + if self._is_sse_stream: if to: if not is_stream_class_type(to): diff --git a/src/gitpod/resources/events.py b/src/gitpod/resources/events.py index 4dc3ff5..508d773 100644 --- a/src/gitpod/resources/events.py +++ b/src/gitpod/resources/events.py @@ -24,6 +24,7 @@ async_to_streamed_response_wrapper, ) from .._base_client import make_request_options +from .._decoders.jsonl import JSONLDecoder, AsyncJSONLDecoder from ..types.event_list_response import EventListResponse from ..types.event_watch_response import EventWatchResponse @@ -136,7 +137,7 @@ def watch( extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, - ) -> EventWatchResponse: + ) -> JSONLDecoder[EventWatchResponse]: """ WatchEvents streams all requests events to the client @@ -173,7 +174,7 @@ def watch( extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, - ) -> EventWatchResponse: + ) -> JSONLDecoder[EventWatchResponse]: """ WatchEvents streams all requests events to the client @@ -211,8 +212,8 @@ def watch( extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, - ) -> EventWatchResponse: - extra_headers = {"Accept": "application/connect+json", **(extra_headers or {})} + ) -> JSONLDecoder[EventWatchResponse]: + extra_headers = {"Accept": "application/jsonl", **(extra_headers or {})} extra_headers = { **strip_not_given( { @@ -234,7 +235,8 @@ def watch( options=make_request_options( extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout ), - cast_to=EventWatchResponse, + cast_to=JSONLDecoder[EventWatchResponse], + stream=True, ) @@ -344,7 +346,7 @@ async def watch( extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, - ) -> EventWatchResponse: + ) -> AsyncJSONLDecoder[EventWatchResponse]: """ WatchEvents streams all requests events to the client @@ -381,7 +383,7 @@ async def watch( extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, - ) -> EventWatchResponse: + ) -> AsyncJSONLDecoder[EventWatchResponse]: """ WatchEvents streams all requests events to the client @@ -419,8 +421,8 @@ async def watch( extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, - ) -> EventWatchResponse: - extra_headers = {"Accept": "application/connect+json", **(extra_headers or {})} + ) -> AsyncJSONLDecoder[EventWatchResponse]: + extra_headers = {"Accept": "application/jsonl", **(extra_headers or {})} extra_headers = { **strip_not_given( { @@ -442,7 +444,8 @@ async def watch( options=make_request_options( extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout ), - cast_to=EventWatchResponse, + cast_to=AsyncJSONLDecoder[EventWatchResponse], + stream=True, ) diff --git a/tests/api_resources/test_events.py b/tests/api_resources/test_events.py index f539a4b..d24091b 100644 --- a/tests/api_resources/test_events.py +++ b/tests/api_resources/test_events.py @@ -10,6 +10,7 @@ from gitpod import Gitpod, AsyncGitpod from tests.utils import assert_matches_type from gitpod.types import EventListResponse, EventWatchResponse +from gitpod._decoders.jsonl import JSONLDecoder, AsyncJSONLDecoder base_url = os.environ.get("TEST_API_BASE_URL", "http://127.0.0.1:4010") @@ -64,14 +65,16 @@ def test_streaming_response_list(self, client: Gitpod) -> None: assert cast(Any, response.is_closed) is True + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize def test_method_watch_overload_1(self, client: Gitpod) -> None: event = client.events.watch( environment_id="environmentId", connect_protocol_version=1, ) - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(JSONLDecoder[EventWatchResponse], event, path=["response"]) + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize def test_method_watch_with_all_params_overload_1(self, client: Gitpod) -> None: event = client.events.watch( @@ -79,8 +82,9 @@ def test_method_watch_with_all_params_overload_1(self, client: Gitpod) -> None: connect_protocol_version=1, connect_timeout_ms=0, ) - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(JSONLDecoder[EventWatchResponse], event, path=["response"]) + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize def test_raw_response_watch_overload_1(self, client: Gitpod) -> None: response = client.events.with_raw_response.watch( @@ -91,8 +95,9 @@ def test_raw_response_watch_overload_1(self, client: Gitpod) -> None: assert response.is_closed is True assert response.http_request.headers.get("X-Stainless-Lang") == "python" event = response.parse() - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(JSONLDecoder[EventWatchResponse], event, path=["response"]) + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize def test_streaming_response_watch_overload_1(self, client: Gitpod) -> None: with client.events.with_streaming_response.watch( @@ -103,18 +108,20 @@ def test_streaming_response_watch_overload_1(self, client: Gitpod) -> None: assert response.http_request.headers.get("X-Stainless-Lang") == "python" event = response.parse() - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(JSONLDecoder[EventWatchResponse], event, path=["response"]) assert cast(Any, response.is_closed) is True + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize def test_method_watch_overload_2(self, client: Gitpod) -> None: event = client.events.watch( organization=True, connect_protocol_version=1, ) - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(JSONLDecoder[EventWatchResponse], event, path=["response"]) + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize def test_method_watch_with_all_params_overload_2(self, client: Gitpod) -> None: event = client.events.watch( @@ -122,8 +129,9 @@ def test_method_watch_with_all_params_overload_2(self, client: Gitpod) -> None: connect_protocol_version=1, connect_timeout_ms=0, ) - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(JSONLDecoder[EventWatchResponse], event, path=["response"]) + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize def test_raw_response_watch_overload_2(self, client: Gitpod) -> None: response = client.events.with_raw_response.watch( @@ -134,8 +142,9 @@ def test_raw_response_watch_overload_2(self, client: Gitpod) -> None: assert response.is_closed is True assert response.http_request.headers.get("X-Stainless-Lang") == "python" event = response.parse() - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(JSONLDecoder[EventWatchResponse], event, path=["response"]) + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize def test_streaming_response_watch_overload_2(self, client: Gitpod) -> None: with client.events.with_streaming_response.watch( @@ -146,7 +155,7 @@ def test_streaming_response_watch_overload_2(self, client: Gitpod) -> None: assert response.http_request.headers.get("X-Stainless-Lang") == "python" event = response.parse() - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(JSONLDecoder[EventWatchResponse], event, path=["response"]) assert cast(Any, response.is_closed) is True @@ -201,14 +210,16 @@ async def test_streaming_response_list(self, async_client: AsyncGitpod) -> None: assert cast(Any, response.is_closed) is True + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize async def test_method_watch_overload_1(self, async_client: AsyncGitpod) -> None: event = await async_client.events.watch( environment_id="environmentId", connect_protocol_version=1, ) - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(AsyncJSONLDecoder[EventWatchResponse], event, path=["response"]) + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize async def test_method_watch_with_all_params_overload_1(self, async_client: AsyncGitpod) -> None: event = await async_client.events.watch( @@ -216,8 +227,9 @@ async def test_method_watch_with_all_params_overload_1(self, async_client: Async connect_protocol_version=1, connect_timeout_ms=0, ) - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(AsyncJSONLDecoder[EventWatchResponse], event, path=["response"]) + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize async def test_raw_response_watch_overload_1(self, async_client: AsyncGitpod) -> None: response = await async_client.events.with_raw_response.watch( @@ -228,8 +240,9 @@ async def test_raw_response_watch_overload_1(self, async_client: AsyncGitpod) -> assert response.is_closed is True assert response.http_request.headers.get("X-Stainless-Lang") == "python" event = await response.parse() - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(AsyncJSONLDecoder[EventWatchResponse], event, path=["response"]) + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize async def test_streaming_response_watch_overload_1(self, async_client: AsyncGitpod) -> None: async with async_client.events.with_streaming_response.watch( @@ -240,18 +253,20 @@ async def test_streaming_response_watch_overload_1(self, async_client: AsyncGitp assert response.http_request.headers.get("X-Stainless-Lang") == "python" event = await response.parse() - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(AsyncJSONLDecoder[EventWatchResponse], event, path=["response"]) assert cast(Any, response.is_closed) is True + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize async def test_method_watch_overload_2(self, async_client: AsyncGitpod) -> None: event = await async_client.events.watch( organization=True, connect_protocol_version=1, ) - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(AsyncJSONLDecoder[EventWatchResponse], event, path=["response"]) + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize async def test_method_watch_with_all_params_overload_2(self, async_client: AsyncGitpod) -> None: event = await async_client.events.watch( @@ -259,8 +274,9 @@ async def test_method_watch_with_all_params_overload_2(self, async_client: Async connect_protocol_version=1, connect_timeout_ms=0, ) - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(AsyncJSONLDecoder[EventWatchResponse], event, path=["response"]) + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize async def test_raw_response_watch_overload_2(self, async_client: AsyncGitpod) -> None: response = await async_client.events.with_raw_response.watch( @@ -271,8 +287,9 @@ async def test_raw_response_watch_overload_2(self, async_client: AsyncGitpod) -> assert response.is_closed is True assert response.http_request.headers.get("X-Stainless-Lang") == "python" event = await response.parse() - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(AsyncJSONLDecoder[EventWatchResponse], event, path=["response"]) + @pytest.mark.skip(reason="Prism doesn't support JSONL responses yet") @parametrize async def test_streaming_response_watch_overload_2(self, async_client: AsyncGitpod) -> None: async with async_client.events.with_streaming_response.watch( @@ -283,6 +300,6 @@ async def test_streaming_response_watch_overload_2(self, async_client: AsyncGitp assert response.http_request.headers.get("X-Stainless-Lang") == "python" event = await response.parse() - assert_matches_type(EventWatchResponse, event, path=["response"]) + assert_matches_type(AsyncJSONLDecoder[EventWatchResponse], event, path=["response"]) assert cast(Any, response.is_closed) is True