
Graceful slow consumers handling #506

Open
kleferbe wants to merge 15 commits into main
Conversation

@kleferbe (Contributor) commented Nov 3, 2021

This is about client-side slow consumer handling, which happens when the receive buffers are full and another message is received. Right now this is handled like an error, and Options.AsyncErrorEventHandler is invoked.
There is also a Connection.lastEx field that is set to a NATSSlowConsumerException:

        internal void processSlowConsumer(Subscription s)
        {
            lastEx = new NATSSlowConsumerException();
            ...

This Connection.lastEx will then be thrown, for example on every subsequent publish, rendering the Connection useless.

It is better to discard messages if the buffers are full and to notify the application about it. As far as I can see, this pattern is also followed by the Java client.

I've added an Options.SlowConsumerEventHandler that is called once when a slow consumer is detected.
The Subscription is marked as a slow consumer (Subscription.sc). The flag is reset when the buffers free up again (at 80% of their capacity); when they fill up again, a new slow consumer event is fired.

This enables applications to continue processing and eventually catch up again. They have full control over what to do in case of a slow consumer, and the Options.SlowConsumerEventHandler provides the first dropped message, so that an application can, for example, read its JetStream sequence number.

If no Options.SlowConsumerEventHandler is registered, the Options.AsyncErrorEventHandler is called instead, to retain some backward compatibility.
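
For illustration, a minimal sketch of how an application might register the proposed handler. The shape of SlowConsumerEventArgs and member names such as Subscription and DroppedMessage are assumptions based on this description, not taken from the actual diff:

    using System;
    using NATS.Client;

    var opts = ConnectionFactory.GetDefaultOptions();
    opts.SlowConsumerEventHandler = (sender, args) =>
    {
        // args carries the subscription that overflowed and the first dropped
        // message; the member names used here are assumptions for this sketch.
        Console.WriteLine($"Slow consumer on {args.Subscription.Subject}; " +
                          $"first dropped message subject: {args.DroppedMessage?.Subject}");
    };

    using (IConnection c = new ConnectionFactory().CreateConnection(opts))
    {
        // Subscribe and process as usual; after a slow consumer event the
        // application keeps running instead of every publish throwing lastEx.
    }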

    {
        callbackScheduler.Add(() => slowConsumerEventHandler(this, new SlowConsumerEventArgs(this, s, m)));
    }
    else // If the user does not care about slow consumers (has not registered an event handler) propagate the situation as error.
@scottf (Collaborator) commented

  1. This is essentially an API change, so I'll have to discuss this with Colin.
  2. The idea is fine, but if we add this, we will need a SlowConsumerEventHandlerOrDefault and to always call that method (a sketch of the pattern follows below). See the other OrDefault members in Nats.cs.
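
For readers unfamiliar with that pattern, a rough sketch of what such an OrDefault member could look like; the member name, delegate type, and fallback target are assumptions based on this thread, not the repository's actual code:

    // Illustrative only: an OrDefault member in the style the reviewer describes.
    internal EventHandler<SlowConsumerEventArgs> SlowConsumerEventHandlerOrDefault
    {
        // Always resolve to a usable handler so call sites never need a null
        // check: the user-registered handler if present, otherwise the default.
        get { return SlowConsumerEventHandler ?? DefaultSlowConsumerEventHandler; }
    }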

@kleferbe (Contributor, Author) commented Nov 3, 2021

  1. And should that DefaultSlowConsumerEventHandler fall back to the AsyncErrorEventHandler if no SlowConsumerEventHandler is registered? And possibly even set lastEx, so that it behaves like before when you don't register a SlowConsumerEventHandler? Or should I remove the fallback completely?

@scottf (Collaborator) commented Nov 3, 2021

That's a good backward compatible solution.

@kleferbe (Contributor, Author) commented

OK, I've moved that fallback into the DefaultSlowConsumerEventHandler and set Connection.lastEx there. This stays quite close to the original behavior if no SlowConsumerEventHandler is registered, although lastEx is set a bit later because it's queued by the callbackScheduler.
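
Going by this thread alone, a rough sketch of what that fallback could look like; the names of the surrounding members (opts, lastEx, ErrEventArgs and its constructor, SlowConsumerEventArgs.Subscription) are assumptions, not the PR's exact diff:

    // Illustrative sketch only; names of surrounding members are assumed.
    private void DefaultSlowConsumerEventHandler(object sender, SlowConsumerEventArgs args)
    {
        // Keep the previous behavior when no SlowConsumerEventHandler is
        // registered: record the exception so later operations surface it,
        // then notify the generic async error handler.
        lastEx = new NATSSlowConsumerException();
        opts.AsyncErrorEventHandler?.Invoke(this,
            new ErrEventArgs(this, args.Subscription, "Slow Consumer"));
    }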

@ColinSullivan1 (Member) commented

I really like some of the ideas here.

My concern in adding a new handler is that this creates a non-trivial client parity issue - we'd have to add equivalent functionality for go, python, java, ruby, deno (node/ts), etc., so we'll want to touch base with the maintainers of the other NATS clients.

Great catch that the last error exception should be reset if the connection survives (the SC was initiated by the client). We'll want to look at the other clients to see what the behavior in this situation is and make sure all clients adhere to the desired behavior.

With respect to the JetStream sequence gap checking here, we will soon have an ordered consumer which avoids this situation via heartbeats and flow control and will automatically recover when a gap has been detected. So we like your idea. :)

The philosophical stance NATS takes on slow consumer errors is that they should be rare and that they indicate the system needs operational tuning, rather than being something to recover from within NATS. That being said, we want to provide enough information to the application so it can attempt to recover itself if desired.

As a simpler option, in addition to resetting lastEx, would a connection-level API (available to the async error event handler) that returns the first dropped message work?
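
To make that proposal concrete, usage from the application side might look roughly like this; FirstDroppedMessage is purely hypothetical here, named only to illustrate the idea, and the exact slow consumer error text is also an assumption:

    opts.AsyncErrorEventHandler = (sender, args) =>
    {
        // The exact error text for a slow consumer is an assumption here.
        if (args.Error.Contains("Slow Consumer"))
        {
            // Hypothetical connection-level accessor illustrating the proposal;
            // no such member exists in the client today.
            Msg dropped = args.Conn.FirstDroppedMessage;
            // The application could read dropped's JetStream sequence number
            // and decide how to recover, e.g. re-request from that sequence.
        }
    };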

@kleferbe (Contributor, Author) commented

Thank you for your feedback, Colin!
I had a look at the Java client, and it already has a dedicated slow-consumer-detected event: https://github.com/nats-io/nats.java/blob/7525652472ac343ef3fa37e4db1aca496c4f6b49/src/main/java/io/nats/client/ErrorListener.java#L71
(Although it does not pass the first dropped message, which I thought would be a nice thing to do but is actually not necessary for my use case.)

see also #238

@kleferbe (Contributor, Author) commented

@ColinSullivan1 did you have the opportunity to discuss the API changes with the other client maintainers? I would appreciate a decision on how the API should look.
Then I can update my PR and get it merged as soon as possible. I would love to get back on track with my services.
