-
Notifications
You must be signed in to change notification settings - Fork 153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Graceful slow consumers handling #506
base: main
Are you sure you want to change the base?
Conversation
Merge upstrea master
…ntinue and eventually resolve.
src/NATS.Client/Conn.cs
Outdated
{ | ||
callbackScheduler.Add(()=> slowConsumerEventHandler(this, new SlowConsumerEventArgs(this, s, m))); | ||
} | ||
else //If the user does not care about slow consumers (has not registered an event handler) propagate the situation as error. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- This is essentially an api change, so I'll have to discuss this with Colin.
- The idea is fine, but if we add this, we will need to make a
SlowConsumerEventHandlerOrDefault
and always call that method. See the other OrDefault in the Nats.cs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- And should that
DefaultSlowConsumerEventHandler
do the fallback toAsyncErrorEventHandler
if noSlowConsumerEventHandler
is registered? And possibly even set the lastEx, so that it behaves like before if you don't register anSlowConsumerEventHandler
? Or should I remove the fallback completely?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good backward compatible solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I've moved that fallback to the DefaultSlowConsumerEventHandler
and set the Connection.lastEx
there. This will be quite close to the original API if no SlowConsumerEventHandler
is registered, although lastEx
is set a bit later because it's queued by the callbackScheduler
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like some of the ideas here.
My concern in adding a new handler is that this creates a non-trivial client parity issue - we'd have to add equivalent functionality for go, python, java, ruby, deno (node/ts) etc, so we'll want to touch base with the maintainers of the other NATS clients.
Great catch that the last error exception should be reset if the connection survives (the SC was initiated by the client). We'll want to look at the other clients to see what behavior in this situation is and make sure all clients adhere to desired behavior.
With respect to the JetStream sequence gap checking here, we will soon have an ordered consumer which avoids this situation via heartbeats and flow control and will automatically recover when a gap has been detected. So we like your idea. :)
The philosophical stance NATS has around slow consumer errors is that they should be rare and indicate that operational tuning on the system should occur rather than attempting to recover within NATS. That being said, we want to provide enough information to the application so it can attempt to recover itself if desired.
As a simpler option, in addition to resetting lastEx
, would a connection level API (available to the async error event handler) that would get the first dropped message work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your feedback, Colin!
I had a look at the Java Client and that already has a dedicated slow consumer detected event: https://github.com/nats-io/nats.java/blob/7525652472ac343ef3fa37e4db1aca496c4f6b49/src/main/java/io/nats/client/ErrorListener.java#L71
(Although it does not pass the first dropped message, which I thought is a nice thing to do but is actually not necessary for my usecase).
see also #238
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ColinSullivan1 did you had the opportunity to discuss the API changes with the other client maintainers? I would appreciate a decision how the API should look like.
Then I can update my PR to get it merged asap. I would love to get back on track with my services.
This is about client side slow consumer handling which happens when the receive buffers are full and an other message is received. Right now this is handled like an error, and
Options.AsyncErrorEventHandler
is invoked.There is also a
Connection.lastEx
field that is set with aNATSSlowConsumerException
:This
Connection.lastEx
will be thrown for example on every publish rendering theConnection
useless.It is better to discard messages if the buffers are full and notify the application about that. Afaiks, this pattern is also followed by the Java client.
I've added an
Options.SlowConsumerEventHandler
that will be called once when a slow consumer gets detected.The Subscription is marked as slow consumer (
Subscription.sc
). The flag will be reset when the buffers get free again (at 80% of their capacity), when they get full again a new slow consumer event will be fired.This enables the applications to continue processing and eventually catch up again. They have the full control what to do in case of a slow consumer and the
Options.SlowConsumerEventHandler
does provide the first dropped message so that the applications can get its JetStream sequence number for example.If the
Options.SlowConsumerEventHandler
is not registered theOptions.AsyncErrorEventHandler
will be called instead to gain some backward compatibility.