Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add concurrency config for processing message concurrently #1276

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

Zzocker
Copy link
Contributor

@Zzocker Zzocker commented Feb 9, 2023

concatMap pipe processes messages synchronously, this PR adds the concurrency config and replaces concatMap with mergeMap to allow messages to be processed concurrently. Now that multi-tenant is supported, it will be helpful to process multiple messages at a time.

Signed-off-by: Pritam Singh pkspritam16@gmail.com

@Zzocker Zzocker requested a review from a team as a code owner February 9, 2023 07:35
Copy link
Contributor

@TimoGlastra TimoGlastra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this change, it's a very easy and elegant way, which doesn't break the issues we've encountered on mobile devices.

Maybe we can however make the variable name a bit more specific, as concurency can mean a lot of things.

So maybe maxMessageProcessingConcurrency? Or just messageProcessingConcurrency?

@blu3beri any thoughts? You're always good at names

@codecov-commenter
Copy link

codecov-commenter commented Feb 9, 2023

Codecov Report

Merging #1276 (8df35f2) into main (115d897) will decrease coverage by 0.51%.
The diff coverage is 67.79%.

@@            Coverage Diff             @@
##             main    #1276      +/-   ##
==========================================
- Coverage   87.04%   86.53%   -0.51%     
==========================================
  Files         778      790      +12     
  Lines       18573    19070     +497     
  Branches     3157     3274     +117     
==========================================
+ Hits        16166    16503     +337     
- Misses       2400     2560     +160     
  Partials        7        7              
Impacted Files Coverage Δ
packages/core/src/types.ts 100.00% <ø> (ø)
packages/node/src/NodeFileSystem.ts 35.13% <0.00%> (-2.01%) ⬇️
packages/askar/src/AskarModule.ts 22.22% <22.22%> (ø)
packages/askar/src/utils/askarWalletConfig.ts 42.50% <42.50%> (ø)
packages/askar/src/wallet/AskarWallet.ts 60.78% <60.78%> (ø)
packages/askar/src/utils/assertAskarWallet.ts 66.66% <66.66%> (ø)
packages/core/src/agent/Agent.ts 95.76% <66.66%> (+0.03%) ⬆️
packages/askar/src/storage/AskarStorageService.ts 77.50% <77.50%> (ø)
packages/askar/src/storage/utils.ts 97.87% <97.87%> (ø)
packages/askar/src/storage/index.ts 100.00% <100.00%> (ø)
... and 7 more

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

@berendsliedrecht
Copy link
Contributor

I like this change, it's a very easy and elegant way, which doesn't break the issues we've encountered on mobile devices.

Maybe we can however make the variable name a bit more specific, as concurency can mean a lot of things.

So maybe maxMessageProcessingConcurrency? Or just messageProcessingConcurrency?

@blu3beri any thoughts? You're always good at names

I think maxConcurrentMessageProcessing might be good? But is there a specific reason why the user can set this?

@Zzocker
Copy link
Contributor Author

Zzocker commented Feb 9, 2023

I like this change, it's a very easy and elegant way, which doesn't break the issues we've encountered on mobile devices.
Maybe we can however make the variable name a bit more specific, as concurency can mean a lot of things.
So maybe maxMessageProcessingConcurrency? Or just messageProcessingConcurrency?
@blu3beri any thoughts? You're always good at names

I think maxConcurrentMessageProcessing might be good? But is there a specific reason why the user can set this?

Will change it to maxConcurrentMessageProcessing.

@berendsliedrecht
Copy link
Contributor

I like this change, it's a very easy and elegant way, which doesn't break the issues we've encountered on mobile devices.
Maybe we can however make the variable name a bit more specific, as concurency can mean a lot of things.
So maybe maxMessageProcessingConcurrency? Or just messageProcessingConcurrency?
@blu3beri any thoughts? You're always good at names

I think maxConcurrentMessageProcessing might be good? But is there a specific reason why the user can set this?

Will change it to maxConcurrentMessageProcessing.

I mean why the number of concurrent processes can be set. Is there any advantage over using the default from RxJS?

@Zzocker
Copy link
Contributor Author

Zzocker commented Feb 9, 2023

concatMap was explicitly used, so I thought it was a design decision to have synchronous message processing, which is why I have set the default of concurrency to 1. Then again no one can guess the exact level of concurrency they want. I think a boolean config processMessagesConcurrently should be used.

    this.messageSubscription = this.eventEmitter
      .observable<AgentMessageReceivedEvent>(AgentEventTypes.AgentMessageReceived)
      .pipe(
        takeUntil(stop$),
        mergeMap(
          (e) =>
            this.messageReceiver
              .receiveMessage(e.payload.message, {
                connection: e.payload.connection,
                contextCorrelationId: e.payload.contextCorrelationId,
              })
              .catch((error) => {
                this.logger.error('Failed to process message', { error })
              }),
          this.agentConfig.processMessagesConcurrently ?  undefined : 1
        )
      )
      .subscribe()

@TimoGlastra can you share more about the issue with the mobile devices which you are talking about?

…ncurrently

Signed-off-by: Pritam Singh <pkspritam16@gmail.com>
@TimoGlastra
Copy link
Contributor

it's not specific to mobile, but it occurs mostly on mobile as they use a mediator. the issue that arises is that multiple messages are picked up at once from the mediator, and that sometimes those messages assume prevous messages have been processed. this leads to issues when the agent suddenly processed 10 messages at the same.

An example where this happens is that certain issuers act when a connection reach state resposne. This means they will send a connection response in response to a connection request and immediately after send a credential offer. If the connection response is processed first and then the credential offer all is fine. But if they're processed at the same time (which happens due to batch pickup), the credential offer processing will throw an error because the connection is still in state request and thus can't be used yet according to AFJ rules.

According to the Aries RFCS, messages MUST be assumed to be processed in no specific order, but in this case it causes issues. The reason why issuers send an offer after sending the connection response is that not all agents acknowledge a connections (it's not required with RFC 0160), and it's an issue that is solved with basically with the oob RFC.

That's some context on the historic reasons for sticking with serial processing, but I think for cloud servers this doesn't make sense and can really limit the throughput of the agent. A boolean (to just make it concurrent or serial) would also work I think. I've been thinking about how to revamp the inbound / outbound queue for a while now, as at some point I would like to make it more of a persistent queue to prevent the agent from having to process an insame amount of messages concurrently

@TimoGlastra
Copy link
Contributor

One inconsistency to add here is that this message queue is only used for messages that are emitted using the AgentMessageReceivedEvent. Messages that are dirreclty sent to the message receiver (what happens in the inbound transport) are not handled by the queue

@Zzocker
Copy link
Contributor Author

Zzocker commented Feb 9, 2023

Oh, HttpInboundTransport, WsInboundTransport , SubjectInboundTransport and SubjectOutboundTransport directly call the receiveMessage. Should all of them also be using the queue ?

@TimoGlastra TimoGlastra self-requested a review July 6, 2023 13:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants