Enable at-least-once delivery for ACK-based input sources via GC callback #229
Comments
I'd be interested in participating in the discussion around this. I'm working on using Bytewax with Memphis.dev. Like the JMS API, Memphis expects the consumer to acknowledge each message once it is processed.

For my simple prototype, I do not intend to do aggregations, so I was going to pass the message information along the flow and use a dummy output source to acknowledge the message after a previous output source writes the transformed message out.

I was also considering whether it would be possible to add a hook to `StatelessInputSource` or similar that gets called after `next()`. E.g., `next()` is called to get the next event and, once the flow processes it, the hook (e.g., `mark_done()`) is called.

edit: I realized that the hook might not be necessary. Some logic could be added to `next()` that does this:

```python
def next(self):
    # ACK the previously delivered message before fetching the next one.
    if self.previous is not None:
        self.previous.ack()
        self.previous = None
    self.previous = consumer.fetch()
    return self.previous.get_data()
```
Unfortunately, the "ACK the previous message on the next `next()` call" approach doesn't give you the guarantee you want either.

The idea of "send the message ID through the flow and ACK it in the output" is closer to correct, but it would get you at-most-once delivery over a crash and resume. The Bytewax recovery model is "coherently snapshot all state and IO locations, then on resume restore that state and rewind IO to those locations." Because the message would be ACKed on the first output, it will never be seen again at an input, even if the state of all the internal operators is rewound back further than that item. Depending on the specific operators you use and the delivery guarantees you want, that might be fine, though, e.g. if you have no stateful operators and are just doing stateless transformations.

The right way involves the recovery system being able to notify the input that a message has passed the point of "this will never need to be replayed" so you can ACK it and permanently move on, but we don't have those hooks yet.
I didn't realize that; I thought it worked in lock step. Yes, I can definitely see that being a problem.

I dug a bit into Memphis. We have the ability to get the sequence ID for each message we receive and to set the starting sequence number when we create consumers. We can use this to perform Kafka-esque state recovery, so we'll do that. We still need to acknowledge messages, but that's because of some logic on the broker side.
Right now, if you want to use ACK-based input sources (e.g. SQS, RabbitMQ), you have to ACK immediately upon message receipt, because otherwise you can never ACK unless you do some very hacky and input-specific things, like sending the raw message through the dataflow and ACKing on output in a custom connector. This means that those input systems are limited to at-most-once delivery.
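For concreteness, the hacky workaround mentioned above might look something like the following sketch: the raw message handle rides alongside the data through the flow, and a custom sink ACKs it after writing. The names here (`transform`, `AckingSink`, `ack_fn`, `write_fn`) are illustrative, not a real Bytewax or SQS API, and as discussed in the comments this still only gets you at-most-once delivery across a crash.

```python
# Hypothetical sketch: carry the broker's message handle through the
# dataflow as part of each item, then ACK it in a custom output sink
# after the transformed value is written.

def transform(item):
    # Keep the handle alongside the data as it flows through operators.
    handle, body = item
    return (handle, body.upper())

class AckingSink:
    def __init__(self, ack_fn, write_fn):
        # ack_fn / write_fn stand in for broker- and output-specific calls.
        self._ack = ack_fn
        self._write = write_fn

    def write(self, item):
        handle, result = item
        self._write(result)  # write the transformed value out...
        self._ack(handle)    # ...then ACK the original message
```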
A way we could facilitate at-least-once delivery is by adding a

```python
StatefulSource.garbage_collect(self, state)
```

callback that is called when an epoch is older than the cluster frontier (meaning all outputs and backups for it have completed), with the snapshot that was taken at the end of that epoch. Then, an SQS input source could do something like (made up SQS Python API):
I believe this will work because any messages consumed during an epoch that is "rolled back" during recovery will eventually hit their visibility timeout and be delivered again. This requires some coordination with the SQS visibility timeout value: it has to be longer than the epoch interval so that messages aren't redelivered before GC could even be called.
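That coordination could be made explicit with a trivial check like the one below; the knob names and safety factor are hypothetical, chosen only to illustrate leaving headroom for outputs and backups to finish after the epoch closes:

```python
from datetime import timedelta

# Hypothetical configuration values for illustration only.
epoch_interval = timedelta(seconds=10)

# Leave generous headroom so GC can run before SQS redelivers.
safety_factor = 6
visibility_timeout = epoch_interval * safety_factor

# The invariant this issue depends on:
assert visibility_timeout > epoch_interval
```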
Returning the snapshot state on GC, instead of baking the concept of message IDs directly into the API, is cool because then you can support other input sources that have "dynamic retention". The one that comes to mind is a Postgres logical replication input: you need to periodically "ACK" up to an LSN so the replication slot does not retain changes forever.
Something like (made up Postgres Python API, but inspired by psycopg2's replication API):
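A possible sketch, assuming a cursor shaped like psycopg2's `ReplicationCursor` (`read_message` returning messages with `data_start` and `payload`, and `send_feedback(flush_lsn=...)`); the class name and structure are hypothetical:

```python
# Sketch: the epoch snapshot is just the highest LSN consumed;
# garbage_collect() confirms that LSN so the server can discard
# older WAL instead of retaining it forever.

class PostgresReplicationSource:
    def __init__(self, cursor):
        self._cursor = cursor
        self._last_lsn = 0

    def next(self):
        msg = self._cursor.read_message()
        if msg is None:
            return None
        self._last_lsn = msg.data_start
        return msg.payload

    def snapshot(self):
        # State snapshotted at the end of the epoch.
        return self._last_lsn

    def garbage_collect(self, state):
        # This epoch will never be replayed, so tell the server it may
        # reclaim WAL up to its LSN.
        self._cursor.send_feedback(flush_lsn=state)
```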