Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: async adapter #208

Draft
wants to merge 6 commits into
base: 1.0.x-alpha
Choose a base branch
from
Draft

Conversation

viglucci
Copy link
Member

@viglucci viglucci commented Jan 12, 2022

Adds an adapter package that supports async/await and async generators for the various interaction patterns.

Comment on lines 23 to 33
// TODO: is using `@rsocket/rxjs` as intermediary adapter a bad idea?
// - considerations:
// - do we lose support for backpressure that we wouldn't have otherwise?
// - what is bundle size consequences of relying on `@rsocket/rxjs`?
// - what is bundle size consequences of relying on `rxjs` and `rxjs-for-await`
const $responderObs = RxRequestersFactory.requestChannel(
$requesterObs,
inputCodec,
outputCodec,
prefetch
)(rsocket, metadata);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is where I leveraged the Rx adapter to support requestChannel more easily, but doing so raises some questions, which I've detailed in the comment, and below:

  • do we lose support for backpressure that we wouldn't have otherwise?
  • what is bundle size consequences of relying on @rsocket/rxjs?
  • what is bundle size consequences of relying on rxjs and rxjs-for-await

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@OlegDokuka can you help out with providing an alternative async adapter implementation for requestChannel here?

Comment on lines +67 to +73
const subscriberFactory = RxRespondersFactory.requestChannel(
($in) => from(handler(eachValueFrom($in))),
codecs,
prefetch
);

return subscriberFactory(payload, initialRequestN, isCompleted, s);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same idea here as above, I leveraged the RX adapter to more easily provide this functionality, but not sure if it's the best idea.

Comment on lines +18 to +38
export function requestStream<T, R>(
handler: DefaultResponderHandlerSignature<T>,
codecs: {
inputCodec: Codec<T>;
outputCodec: Codec<R>;
}
) {
return Object.assign<
DefaultResponderHandlerSignature<Payload>,
{ requestType: FrameTypes.REQUEST_STREAM }
>(
(payload, initialRequestN, subscriber) => {
return handler(
codecs.inputCodec.decode(payload.data),
initialRequestN,
subscriber
);
},
{ requestType: FrameTypes.REQUEST_STREAM }
);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned in the last PR, I started adding a set of "default" responders to aid with my testing, as well as an additional feature. I only got as far as this single responder though, so if we don't want to land this with only a single responder, we can pick it out and loop back around to adding the rest later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is also the question of if these default request/responders should be in the messaging package or elsewhere.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Reviewed where I am using this default responder and its only in an example currently so lets not land this API addition with this effort.

- requester fireAndForget
- requester requestResponse
- requester requestStream

refactor: renamed to SubscribingAsyncIterator + added more tests

feat: (wip) add async responders

- fireAndForget
- requestResponse

feat: AsyncIterable requestStream responder

refactor: use rxjs observer for async iterable requestStream example

feat: add requesChannel responders and requesters

refactor: remove unnecessary passing of scheduler

test: (wip) requester tests

test: async requestResponse requesters tests

test: async adapter fireAndForget requester tests

refactor: apply linting

fix: resolve issues from rebasing

test: add tests for requestStream requester

refactor: rename async package to adapter-async
Signed-off-by: Kevin Viglucci <kviglucci@gmail.com>
Signed-off-by: Kevin Viglucci <kviglucci@gmail.com>
Signed-off-by: Kevin Viglucci <kviglucci@gmail.com>
@viglucci viglucci force-pushed the feature/async-iterable-adapter branch from ee225e6 to f30d5b6 Compare January 12, 2022 15:28
Signed-off-by: Kevin Viglucci <kviglucci@gmail.com>
@viglucci viglucci force-pushed the feature/async-iterable-adapter branch from 04fca05 to 43cd833 Compare January 12, 2022 15:35
Signed-off-by: Kevin Viglucci <kviglucci@gmail.com>
@viglucci viglucci force-pushed the feature/async-iterable-adapter branch from 479727a to 303c6f3 Compare January 12, 2022 15:39
@viglucci viglucci marked this pull request as ready for review January 12, 2022 15:40
@viglucci viglucci changed the base branch from 1.0.x to 1.0.x-alpha June 21, 2023 00:59
@viglucci viglucci marked this pull request as draft June 21, 2023 01:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant