-
Notifications
You must be signed in to change notification settings - Fork 182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: add concurrency config for processing message concurrently #1276
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this change, it's a very easy and elegant way, which doesn't break the issues we've encountered on mobile devices.
Maybe we can however make the variable name a bit more specific, as concurency
can mean a lot of things.
So maybe maxMessageProcessingConcurrency
? Or just messageProcessingConcurrency
?
@blu3beri any thoughts? You're always good at names
Codecov Report
@@ Coverage Diff @@
## main #1276 +/- ##
==========================================
- Coverage 87.04% 86.53% -0.51%
==========================================
Files 778 790 +12
Lines 18573 19070 +497
Branches 3157 3274 +117
==========================================
+ Hits 16166 16503 +337
- Misses 2400 2560 +160
Partials 7 7
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. |
I think |
Will change it to
|
I mean why the number of concurrent processes can be set. Is there any advantage over using the default from RxJS? |
this.messageSubscription = this.eventEmitter
.observable<AgentMessageReceivedEvent>(AgentEventTypes.AgentMessageReceived)
.pipe(
takeUntil(stop$),
mergeMap(
(e) =>
this.messageReceiver
.receiveMessage(e.payload.message, {
connection: e.payload.connection,
contextCorrelationId: e.payload.contextCorrelationId,
})
.catch((error) => {
this.logger.error('Failed to process message', { error })
}),
this.agentConfig.processMessagesConcurrently ? undefined : 1
)
)
.subscribe() @TimoGlastra can you share more about the issue with the mobile devices which you are talking about? |
…ncurrently Signed-off-by: Pritam Singh <pkspritam16@gmail.com>
it's not specific to mobile, but it occurs mostly on mobile as they use a mediator. the issue that arises is that multiple messages are picked up at once from the mediator, and that sometimes those messages assume prevous messages have been processed. this leads to issues when the agent suddenly processed 10 messages at the same. An example where this happens is that certain issuers act when a connection reach state resposne. This means they will send a connection response in response to a connection request and immediately after send a credential offer. If the connection response is processed first and then the credential offer all is fine. But if they're processed at the same time (which happens due to batch pickup), the credential offer processing will throw an error because the connection is still in state request and thus can't be used yet according to AFJ rules. According to the Aries RFCS, messages MUST be assumed to be processed in no specific order, but in this case it causes issues. The reason why issuers send an offer after sending the connection response is that not all agents acknowledge a connections (it's not required with RFC 0160), and it's an issue that is solved with basically with the oob RFC. That's some context on the historic reasons for sticking with serial processing, but I think for cloud servers this doesn't make sense and can really limit the throughput of the agent. A boolean (to just make it concurrent or serial) would also work I think. I've been thinking about how to revamp the inbound / outbound queue for a while now, as at some point I would like to make it more of a persistent queue to prevent the agent from having to process an insame amount of messages concurrently |
One inconsistency to add here is that this message queue is only used for messages that are emitted using the |
Oh, |
concatMap
pipe processes messages synchronously, this PR adds theconcurrency
config and replacesconcatMap
withmergeMap
to allow messages to be processed concurrently. Now that multi-tenant is supported, it will be helpful to process multiple messages at a time.Signed-off-by: Pritam Singh pkspritam16@gmail.com