Skip to content

Commit

Permalink
feat(api): add events streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
stainless-app[bot] committed Feb 4, 2025
1 parent ec5eb7e commit 1d7a848
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .stats.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
configured_endpoints: 106
openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/gitpod%2Fgitpod-2e9f8b8666b2fd4e346a3acbf81a2c82a6f3793e01bc146499708efaf0c250c5.yml
openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/gitpod%2Fgitpod-922f204ec36b8a84ae8f96e73923e92cb2044a14c6497d173f4b7110a090ac30.yml
2 changes: 1 addition & 1 deletion api.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ from gitpod.types import EventListResponse, EventWatchResponse
Methods:

- <code title="get /gitpod.v1.EventService/ListAuditLogs">client.events.<a href="./src/gitpod/resources/events.py">list</a>(\*\*<a href="src/gitpod/types/event_list_params.py">params</a>) -> <a href="./src/gitpod/types/event_list_response.py">EventListResponse</a></code>
- <code title="post /gitpod.v1.EventService/WatchEvents">client.events.<a href="./src/gitpod/resources/events.py">watch</a>(\*\*<a href="src/gitpod/types/event_watch_params.py">params</a>) -> <a href="./src/gitpod/types/event_watch_response.py">EventWatchResponse</a></code>
- <code title="post /gitpod.v1.EventService/WatchEvents">client.events.<a href="./src/gitpod/resources/events.py">watch</a>(\*\*<a href="src/gitpod/types/event_watch_params.py">params</a>) -> <a href="./src/gitpod/types/event_watch_response.py">JSONLDecoder[EventWatchResponse]</a></code>

# Groups

Expand Down
101 changes: 101 additions & 0 deletions src/gitpod/_decoders/jsonl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from __future__ import annotations

import json
from typing_extensions import Generic, TypeVar, Iterator, AsyncIterator

import httpx

from .._models import construct_type_unchecked

_T = TypeVar("_T")


class JSONLDecoder(Generic[_T]):
"""A decoder for [JSON Lines](https://jsonlines.org) format.
This class provides an iterator over a byte-iterator that parses each JSON Line
into a given type.
"""

http_response: httpx.Response | None
"""The HTTP response this decoder was constructed from"""

def __init__(
self, *, raw_iterator: Iterator[bytes], line_type: type[_T], http_response: httpx.Response | None
) -> None:
super().__init__()
self.http_response = http_response
self._raw_iterator = raw_iterator
self._line_type = line_type
self._iterator = self.__decode__()

def __decode__(self) -> Iterator[_T]:
buf = b""
for chunk in self._raw_iterator:
for line in chunk.splitlines(keepends=True):
buf += line
if buf.endswith((b"\r", b"\n", b"\r\n")):
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)
buf = b""

# flush
if buf:
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)

def __next__(self) -> _T:
return self._iterator.__next__()

def __iter__(self) -> Iterator[_T]:
for item in self._iterator:
yield item


class AsyncJSONLDecoder(Generic[_T]):
"""A decoder for [JSON Lines](https://jsonlines.org) format.
This class provides an async iterator over a byte-iterator that parses each JSON Line
into a given type.
"""

http_response: httpx.Response | None

def __init__(
self, *, raw_iterator: AsyncIterator[bytes], line_type: type[_T], http_response: httpx.Response | None
) -> None:
super().__init__()
self.http_response = http_response
self._raw_iterator = raw_iterator
self._line_type = line_type
self._iterator = self.__decode__()

async def __decode__(self) -> AsyncIterator[_T]:
buf = b""
async for chunk in self._raw_iterator:
for line in chunk.splitlines(keepends=True):
buf += line
if buf.endswith((b"\r", b"\n", b"\r\n")):
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)
buf = b""

# flush
if buf:
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)

async def __anext__(self) -> _T:
return await self._iterator.__anext__()

async def __aiter__(self) -> AsyncIterator[_T]:
async for item in self._iterator:
yield item
22 changes: 22 additions & 0 deletions src/gitpod/_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from ._constants import RAW_RESPONSE_HEADER, OVERRIDE_CAST_TO_HEADER
from ._streaming import Stream, AsyncStream, is_stream_class_type, extract_stream_chunk_type
from ._exceptions import GitpodError, APIResponseValidationError
from ._decoders.jsonl import JSONLDecoder, AsyncJSONLDecoder

if TYPE_CHECKING:
from ._models import FinalRequestOptions
Expand Down Expand Up @@ -138,6 +139,27 @@ def _parse(self, *, to: type[_T] | None = None) -> R | _T:

origin = get_origin(cast_to) or cast_to

if inspect.isclass(origin):
if issubclass(cast(Any, origin), JSONLDecoder):
return cast(
R,
cast("type[JSONLDecoder[Any]]", cast_to)(
raw_iterator=self.http_response.iter_bytes(chunk_size=4096),
line_type=extract_type_arg(cast_to, 0),
http_response=self.http_response,
),
)

if issubclass(cast(Any, origin), AsyncJSONLDecoder):
return cast(
R,
cast("type[AsyncJSONLDecoder[Any]]", cast_to)(
raw_iterator=self.http_response.aiter_bytes(chunk_size=4096),
line_type=extract_type_arg(cast_to, 0),
http_response=self.http_response,
),
)

if self._is_sse_stream:
if to:
if not is_stream_class_type(to):
Expand Down
23 changes: 13 additions & 10 deletions src/gitpod/resources/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
async_to_streamed_response_wrapper,
)
from .._base_client import make_request_options
from .._decoders.jsonl import JSONLDecoder, AsyncJSONLDecoder
from ..types.event_list_response import EventListResponse
from ..types.event_watch_response import EventWatchResponse

Expand Down Expand Up @@ -136,7 +137,7 @@ def watch(
extra_query: Query | None = None,
extra_body: Body | None = None,
timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
) -> EventWatchResponse:
) -> JSONLDecoder[EventWatchResponse]:
"""
WatchEvents streams all requests events to the client
Expand Down Expand Up @@ -173,7 +174,7 @@ def watch(
extra_query: Query | None = None,
extra_body: Body | None = None,
timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
) -> EventWatchResponse:
) -> JSONLDecoder[EventWatchResponse]:
"""
WatchEvents streams all requests events to the client
Expand Down Expand Up @@ -211,8 +212,8 @@ def watch(
extra_query: Query | None = None,
extra_body: Body | None = None,
timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
) -> EventWatchResponse:
extra_headers = {"Accept": "application/connect+json", **(extra_headers or {})}
) -> JSONLDecoder[EventWatchResponse]:
extra_headers = {"Accept": "application/jsonl", **(extra_headers or {})}
extra_headers = {
**strip_not_given(
{
Expand All @@ -234,7 +235,8 @@ def watch(
options=make_request_options(
extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
),
cast_to=EventWatchResponse,
cast_to=JSONLDecoder[EventWatchResponse],
stream=True,
)


Expand Down Expand Up @@ -344,7 +346,7 @@ async def watch(
extra_query: Query | None = None,
extra_body: Body | None = None,
timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
) -> EventWatchResponse:
) -> AsyncJSONLDecoder[EventWatchResponse]:
"""
WatchEvents streams all requests events to the client
Expand Down Expand Up @@ -381,7 +383,7 @@ async def watch(
extra_query: Query | None = None,
extra_body: Body | None = None,
timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
) -> EventWatchResponse:
) -> AsyncJSONLDecoder[EventWatchResponse]:
"""
WatchEvents streams all requests events to the client
Expand Down Expand Up @@ -419,8 +421,8 @@ async def watch(
extra_query: Query | None = None,
extra_body: Body | None = None,
timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
) -> EventWatchResponse:
extra_headers = {"Accept": "application/connect+json", **(extra_headers or {})}
) -> AsyncJSONLDecoder[EventWatchResponse]:
extra_headers = {"Accept": "application/jsonl", **(extra_headers or {})}
extra_headers = {
**strip_not_given(
{
Expand All @@ -442,7 +444,8 @@ async def watch(
options=make_request_options(
extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
),
cast_to=EventWatchResponse,
cast_to=AsyncJSONLDecoder[EventWatchResponse],
stream=True,
)


Expand Down
Loading

0 comments on commit 1d7a848

Please # to comment.