-
Notifications
You must be signed in to change notification settings - Fork 100
feat: async adapter #208
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
base: 1.0.x-alpha
Are you sure you want to change the base?
feat: async adapter #208
Conversation
// TODO: is using `@rsocket/rxjs` as intermediary adapter a bad idea? | ||
// - considerations: | ||
// - do we lose support for backpressure that we wouldn't have otherwise? | ||
// - what is bundle size consequences of relying on `@rsocket/rxjs`? | ||
// - what is bundle size consequences of relying on `rxjs` and `rxjs-for-await` | ||
const $responderObs = RxRequestersFactory.requestChannel( | ||
$requesterObs, | ||
inputCodec, | ||
outputCodec, | ||
prefetch | ||
)(rsocket, metadata); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is where I leveraged the Rx adapter to support requestChannel more easily, but doing so raises some questions, which I've detailed in the comment, and below:
- do we lose support for backpressure that we wouldn't have otherwise?
- what is bundle size consequences of relying on
@rsocket/rxjs
? - what is bundle size consequences of relying on
rxjs
andrxjs-for-await
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@OlegDokuka can you help out with providing an alternative async adapter implementation for requestChannel here?
const subscriberFactory = RxRespondersFactory.requestChannel( | ||
($in) => from(handler(eachValueFrom($in))), | ||
codecs, | ||
prefetch | ||
); | ||
|
||
return subscriberFactory(payload, initialRequestN, isCompleted, s); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same idea here as above, I leveraged the RX adapter to more easily provide this functionality, but not sure if it's the best idea.
export function requestStream<T, R>( | ||
handler: DefaultResponderHandlerSignature<T>, | ||
codecs: { | ||
inputCodec: Codec<T>; | ||
outputCodec: Codec<R>; | ||
} | ||
) { | ||
return Object.assign< | ||
DefaultResponderHandlerSignature<Payload>, | ||
{ requestType: FrameTypes.REQUEST_STREAM } | ||
>( | ||
(payload, initialRequestN, subscriber) => { | ||
return handler( | ||
codecs.inputCodec.decode(payload.data), | ||
initialRequestN, | ||
subscriber | ||
); | ||
}, | ||
{ requestType: FrameTypes.REQUEST_STREAM } | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I mentioned in the last PR, I started adding a set of "default" responders to aid with my testing, as well as an additional feature. I only got as far as this single responder though, so if we don't want to land this with only a single responder, we can pick it out and loop back around to adding the rest later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is also the question of if these default request/responders should be in the messaging package or elsewhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: Reviewed where I am using this default responder and its only in an example currently so lets not land this API addition with this effort.
- requester fireAndForget - requester requestResponse - requester requestStream refactor: renamed to SubscribingAsyncIterator + added more tests feat: (wip) add async responders - fireAndForget - requestResponse feat: AsyncIterable requestStream responder refactor: use rxjs observer for async iterable requestStream example feat: add requesChannel responders and requesters refactor: remove unnecessary passing of scheduler test: (wip) requester tests test: async requestResponse requesters tests test: async adapter fireAndForget requester tests refactor: apply linting fix: resolve issues from rebasing test: add tests for requestStream requester refactor: rename async package to adapter-async Signed-off-by: Kevin Viglucci <kviglucci@gmail.com>
Signed-off-by: Kevin Viglucci <kviglucci@gmail.com>
Signed-off-by: Kevin Viglucci <kviglucci@gmail.com>
ee225e6
to
f30d5b6
Compare
Signed-off-by: Kevin Viglucci <kviglucci@gmail.com>
04fca05
to
43cd833
Compare
Signed-off-by: Kevin Viglucci <kviglucci@gmail.com>
479727a
to
303c6f3
Compare
Adds an adapter package that supports
async/await
and async generators for the various interaction patterns.