
Enable at-least-once delivery for ACK-based input sources via GC callback #229

Open
davidselassie opened this issue Apr 14, 2023 · 4 comments
Labels
type:feature New feature request

Comments


davidselassie commented Apr 14, 2023

Right now, if you want to use ACK-based input sources (e.g. SQS, RabbitMQ), you have to ACK immediately upon message receipt; otherwise you can never ACK at all, short of very hacky, input-specific workarounds like sending the raw message type through the dataflow and ACKing on output in a custom connector. This limits those input systems to at-most-once delivery.

One way we could facilitate at-least-once delivery is by adding a StatefulSource.garbage_collect(self, state) callback that is called when an epoch becomes older than the cluster frontier (meaning all outputs and backups for it have completed), passing the snapshot that was taken at the end of that epoch.

Then an SQS input source could do something like this (made-up SQS Python API):

class _SQSSource(StatefulSource):
    def __init__(self, queue_name, resume_state):
        self._queue = SQSQueueConsumer(queue_name)
        # Message IDs consumed but not yet ACKed.
        self._in_flight_msg_ids = resume_state or set()

    def next(self):
        msg = self._queue.poll(0.001)
        if msg is None:
            return None
        else:
            self._in_flight_msg_ids.add(msg.id())
            return msg.payload()

    def snapshot(self):
        # Copy so later mutation doesn't alias into older snapshots.
        return set(self._in_flight_msg_ids)

    def garbage_collect(self, state):
        # `state` is the snapshot taken at the end of the now-closed epoch.
        closed_in_flight_msg_ids = state
        for msg_id in closed_in_flight_msg_ids:
            self._queue.ack(msg_id)
            self._in_flight_msg_ids.remove(msg_id)

    def close(self):
        self._queue.close()

I believe this will work because any messages consumed during an epoch that is "rolled back" during recovery will eventually hit the visibility timeout and be delivered again. This requires some coordination with the SQS visibility timeout value: it has to be longer than the epoch interval so that messages aren't redelivered before GC can even be called.

Returning the snapshot state on GC, instead of baking the concept of message IDs directly into the API, is cool because it lets you support other input sources that have "dynamic retention". The one that comes to mind is a Postgres logical replication input: you need to periodically "ACK" up to an LSN so the replication slot does not retain changes forever.

Something like this (made-up Postgres Python API, inspired by psycopg2's replication API):

class _PGLRSource(StatefulSource):
    def __init__(self, conn, resume_state):
        self._lsn = resume_state
        self._cursor = conn.start_replication(start_lsn=self._lsn)

    def next(self):
        msg = self._cursor.read_message()
        if msg is None:
            return None
        elif msg.data_start > self._lsn:  # Skip the msg at the recovery LSN.
            self._lsn = msg.data_start
            return msg.payload

    def snapshot(self):
        return self._lsn

    def garbage_collect(self, state):
        # Tell Postgres it can discard WAL up through this LSN.
        lsn = state
        self._cursor.send_feedback(flush_lsn=lsn)

    def close(self):
        self._cursor.close()
@github-actions github-actions bot added the needs triage New issue, needs triage label Apr 14, 2023
@davidselassie

This callback might be better called commit, since this is effectively a two-phase commit. You could then also have an identical API, StatefulSink.commit(self, state), for outputs to get exactly-once output. The same recovery machinery would support both the input and the output functionality.

@whoahbot whoahbot removed the needs triage New issue, needs triage label Apr 17, 2023

rnowling-memphis commented Jul 25, 2023

I'd be interested in participating in the discussion around this. I'm working on using Bytewax with Memphis.dev. Like the JMS API, Memphis expects the consumer to acknowledge each message once it has been processed. For my simple prototype, I don't intend to do aggregations, so I was going to pass the message information along the flow and use a dummy output source to acknowledge it after a previous output source has written the transformed message out. I was also considering whether it would be possible to add a hook to StatelessInputSource (or similar) that gets called after next(). E.g., next() is called to get the next event, and once the flow has processed it, the hook (e.g., mark_done()) is called.

edit: I realized that the hook might not be necessary. Some logic could be added to next() that does this:

if self.previous is not None:
    self.previous.ack()
    self.previous = None

self.previous = consumer.fetch()

return self.previous.get_data()

@davidselassie davidselassie added the type:feature New feature request label Jul 27, 2023
@davidselassie

Unfortunately, the "ack on the next call to next" trick will not provide the guarantees you want. The execution model of dataflows is "loosely coupled independent operators with queues in between", so there is no guarantee that, when next is called, any particular output has been emitted by the rest of the dataflow. E.g. it's possible an input's next is called 3 times in a row, buffering items into internal dataflow queues, before any output is emitted. (You probably won't see this behavior in practice, but Bytewax might change it.)

The idea of "send the message ID through the flow and ack it in the output" is closer to correct, but would only get you at-most-once delivery across a crash and resume. The Bytewax recovery model is "coherently snapshot all state and IO locations, then on resume restore that state and rewind IO to those locations." Because the message is acked at the first output, it will never be seen again at the input, even if the state of all the internal operators is rewound back past that item. Depending on the specific operators you use and the delivery guarantees you want, that might be fine, though: e.g. if you have no stateful operators and are just doing map transforms, you'll only see each message output once.

The right approach is for the recovery system to be able to notify you that a message has passed the point of "this will never need to be replayed", so you can ack it and permanently move on; but we don't have those hooks yet.

@rnowling-memphis

> E.g. it's possible an input's next is called 3 times in a row, buffering items into internal dataflow queues, before any output is emitted. (You probably won't see this behavior in practice, but Bytewax might change it.)

I didn't realize that. I thought it worked in lock step. Yes, I can definitely see that being a problem.

I dug a bit into Memphis. We have the ability to get the sequence ID for each message we receive and to set the starting sequence number when we create consumers. We can use this to perform Kafka-esque state recovery, so we'll do that. We still need to acknowledge messages, but that's because of some logic on the broker side.
