
Stop consuming messages after failure #30

Open
prabello opened this issue Apr 29, 2020 · 23 comments
Labels: contributions welcome

Comments

@prabello

How can I configure broadway_kafka to stop once an error occurs?
Once an error occurs, I would like to stop consuming messages and keep the same offset; this may be due to the publisher sending wrong information, or something that needs to be changed in my application pipeline.

Right now I'm using handle_failed to publish into a dead-letter topic, but it's not the ideal behavior for my use case.

Is it possible to change the offset of broadway consumer group to skip some messages or even replay?

@josevalim
Member

@prabello have you tried using Broadway.stop(NameOfYourTopology)?

@msaraiva
Collaborator

Is it possible to change the offset of broadway consumer group to skip some messages or even replay?

I don't think it's possible with the current implementation as the producer will always acknowledge the messages, even when they fail. See Handling failed messages. Maybe implementing a custom acknowledger could work somehow, but I think it can be tricky to make it play nicely with the current producer.

@prabello
Author

@josevalim no, I may have missed it. Are there any docs that explain it?
What exactly defines my Broadway topology?

@msaraiva thanks for pointing it out. In some cases I just want to stop everything and fix the upstream, and having to re-read things from the dead-letter topic can be cumbersome.
But I see the reason for this strategy, thanks for the fast reply

@josevalim
Member

@prabello it is the name you give to Broadway.start_link. Apparently the function is not documented, though; we would have to fix that first.
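For illustration, a minimal sketch, assuming a hypothetical pipeline module MyApp.MyPipeline that passes name: __MODULE__ to Broadway.start_link/2; the same name is then given to Broadway.stop/1 (or to GenServer.stop/1 on versions where Broadway.stop is not available):

```elixir
# Minimal sketch: MyApp.MyPipeline, the hosts, group_id and topics are all
# hypothetical placeholders.
defmodule MyApp.MyPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      # This name is the "name of your topology".
      name: __MODULE__,
      producer: [
        module: {BroadwayKafka.Producer,
          hosts: [localhost: 9092],
          group_id: "my_group",
          topics: ["my_topic"]},
        concurrency: 1
      ],
      processors: [default: [concurrency: 10]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    message
  end
end

# Elsewhere, when you decide the pipeline must stop consuming:
Broadway.stop(MyApp.MyPipeline)
```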

@jared-mackey

I am also curious about this. According to the docs, Broadway Kafka always acks messages. I don't think that is always the desired behaviour.

In some cases, such as a downstream dependency being unavailable, we may choose to completely stop this consumer and fail over. In other cases, such as bad serialization of this particular message, the current solution is ideal.

What does Broadway.stop() do in these scenarios?

@josevalim
Member

josevalim commented Jun 1, 2020

Broadway.stop will send a shutdown exit; this will make producers stop fetching and processors clear their buffers. For the partitions assigned to the processor that called Broadway.stop, no more elements will be processed. The other partitions running concurrently within other processes will all flush accordingly first.

For things such as downstream being unavailable, I would rather consider things like retries with back-offs (say 1s, 2s, 5s, 10s). This means that some messages may still go to the dead-letter queue but the processing speed will be quite reduced.
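A rough sketch of that retry-with-back-offs idea inside handle_message/3, assuming a hypothetical call_downstream/1 that returns {:ok, result} or {:error, reason}:

```elixir
# Sketch only: call_downstream/1 stands in for the flaky downstream
# dependency; the back-off sequence mirrors the 1s/2s/5s/10s idea above.
@backoffs [1_000, 2_000, 5_000, 10_000]

@impl true
def handle_message(_processor, message, _context) do
  case with_retries(fn -> call_downstream(message.data) end, @backoffs) do
    {:ok, _result} ->
      message

    {:error, reason} ->
      # Once the retries are exhausted the message is marked as failed,
      # so it still ends up in handle_failed/2 (and the dead-letter topic).
      Broadway.Message.failed(message, reason)
  end
end

defp with_retries(fun, backoffs) do
  case fun.() do
    {:ok, _} = ok ->
      ok

    {:error, _} = error ->
      case backoffs do
        [] ->
          error

        [delay | rest] ->
          Process.sleep(delay)
          with_retries(fun, rest)
      end
  end
end
```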

@CJPoll

CJPoll commented Jun 1, 2020

Thank you for the quick reply on that.

So anything that was pre-fetched will be acked too.

Does this mean that broadway_kafka will ack before sending messages to handle_message?

@josevalim
Member

josevalim commented Jun 1, 2020

No! It is always at the end. I have updated the previous comment for clarity. :)

@CJPoll

CJPoll commented Jun 1, 2020

Whew - scared me for a second there!

It's easy for Jared or me to say the right answer is to "stop consuming", but harder to translate that into something actionable. I have 3 thoughts in my head:

  1. Stop the consumer in the supervision tree
    Pros: Easy
    Cons: The problematic partition will be reassigned to another pod/node until all are shut down. That creates another manual step to restart apps or otherwise get consumers started again.

  2. Implement backoff in Broadway itself (looks like it's on the todo list)
    Pros: Easiest for application devs, looks like it's on the roadmap
    Cons: An existing ecosystem to migrate

  3. Recursively retry in the business logic, sleeping if backoff is needed.
    Pros: Requires no changes to broadway
    Cons: High likelihood of inconsistencies/mistakes

It looks like backoff is on the todo list for broadway - are there previous discussions I could review to see if I might be able to contribute?

@josevalim
Member

The back-off proposed for Broadway is really just a safety net. For example, imagine that in your handle_message you need to perform 3 tasks. If the task in the middle fails, it would be best to provide retries and back-offs specific to that task, so that once it succeeds it moves on to the third task.

If the back-off is in Broadway, then the best we can do is to assume it has failed altogether and just slow things down. So it is a safety net.

So my suggestion is to do #3 and potentially add #2 as a safety net.

@victorolinasc
Contributor

Perhaps we should add to the docs one very important piece of information: even if the handle_failed/2 callback raises/throws, the offset will be committed.

As I understand it (and I might be saying something dumb here), this is complicated. A common practice is to publish failed messages to another topic/queue so that you can handle them separately. But if, for any reason, publishing fails while committing the offset succeeds, we lose the ability to handle that failed message. Considering Murphy's law: if anything CAN go wrong, it WILL go wrong.

I think the only way to go about this in consumer code is to always wrap whatever code we put in handle_failed/2 in a catch clause and call Broadway.stop/1 in case it raises/throws; otherwise we risk losing messages on the stream.
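A rough sketch of that wrapping, assuming a hypothetical publish_to_dlq/1 helper and a pipeline registered as MyApp.MyPipeline; note that, per the docs quoted below, the offsets of the already-failed batch may still be committed, so this only prevents further consumption:

```elixir
# Sketch only: publish_to_dlq/1 and MyApp.MyPipeline are hypothetical.
@impl true
def handle_failed(messages, _context) do
  try do
    Enum.each(messages, &publish_to_dlq/1)
  rescue
    _exception ->
      # Publishing to the dead-letter topic blew up; stop the pipeline so we
      # stop consuming instead of silently committing further offsets.
      Broadway.stop(MyApp.MyPipeline)
  catch
    _kind, _value ->
      Broadway.stop(MyApp.MyPipeline)
  end

  messages
end
```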

@josevalim
Member

@victorolinasc the docs are already explicit about this:

broadway_kafka never stops the flow of the stream, i.e. it will always ack the messages even when they fail.

However, if you think there are other places we can add this note to make it clearer, pull requests are welcome!

@victorolinasc
Contributor

I've read that part, but somehow assumed that handle_failed/2 would not commit if it raised / returned an error. It is an error while rescuing a failure, after all (Inception feelings). I'll think about including it in that sentence, but I see now that I simply misread it.

Thanks!

@amacciola
Contributor

I am using the latest version of Broadway Kafka and I do not see any function called Broadway.stop/1. Where would this function be defined?

@lucacorti

Broadway.stop will send a shutdown exit; this will make producers stop fetching and processors clear their buffers. For the partitions assigned to the processor that called Broadway.stop, no more elements will be processed. The other partitions running concurrently within other processes will all flush accordingly first.

@josevalim as @amacciola says there seems to be no Broadway.stop function. Is there another way to shut down the producer?

@josevalim
Member

You should be able to call GenServer.stop(BroadwayPipeline), but keep in mind that starting and stopping Kafka partitions is asking for trouble, because Kafka doesn't deal well with nodes quickly going down and coming up. There is a discussion here: #47.

@amacciola
Contributor

@lucacorti I also found that I was shutting down Broadway pipelines when what I really wanted to be doing was suspending them.

You can see our conversation here

#56

If the goal is to just pause and restart pipelines, use the solution in that discussion.

If the goal is to completely shut down the pipeline, then use GenServer.stop(pipeline_name).

@aravindanck
Contributor

I'm using Broadway.stop(__MODULE__, :shutdown) to stop the pipeline, but the underlying :brod_sup restarts it after a minute. In the logs, I can see that the restart_type in the child's spec is set to permanent; I'm guessing that's the cause. Could you tell me whether you see the same behavior, or am I missing something?

Logs here - #86

@josevalim
Member

Btw, this discussion made it clear there are different desired approaches to handling failure. If anyone wants to submit pull requests documenting individual techniques, it will be very welcome!

@yordis
Contributor

yordis commented May 29, 2022

Full credit to @slashmili based on our conversation on Slack. Just collecting more ideas here.

So there are a few cases where an error can occur, which I already mentioned in the issue.

  1. You tag a message as an error
  2. An exception occurs; you can easily guard your handle_message (in try/catch) and tag the message as an error
  3. The BEAM crashes; no need to worry about that, since the offset was not committed and the message will be re-read

If you manage to tag a message as failed, your handle_failed will be called, and there you can push the message to some other topic as a DLQ.

When pushing the failed messages to the DLQ topic, add a retry_attempts to the body (if it's JSON content), or add a retry_attempts counter to the headers.

Set up another Broadway module that consumes the DLQ topic and does the same action as the original pipeline. If it fails, bump retry_attempts and push it as a new message to the DLQ.
Do this until retry_attempts reaches the max, then put the message in a third topic that needs human inspection to find out what is wrong with it. It's not optimal, and you'll have the message duplicated in multiple topics and in the DLQ (every time you put it back on the DLQ it becomes a new message).
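A rough sketch of that DLQ consumer, assuming the retry counter travels in a Kafka header, that broadway_kafka exposes the headers under message.metadata, and that process/1, republish_to_dlq/2 and publish_to_parking_lot/1 are hypothetical helpers:

```elixir
# Sketch only: process/1, republish_to_dlq/2 and publish_to_parking_lot/1 are
# hypothetical helpers, and the headers are assumed to arrive as a list of
# {key, value} binaries in message.metadata.
@max_retry_attempts 5

@impl true
def handle_message(_processor, message, _context) do
  case process(message.data) do
    :ok -> message
    {:error, reason} -> Broadway.Message.failed(message, reason)
  end
end

@impl true
def handle_failed(messages, _context) do
  Enum.each(messages, fn message ->
    attempts = retry_attempts(message)

    if attempts >= @max_retry_attempts do
      # Give up and park the message in the topic that needs human inspection.
      publish_to_parking_lot(message)
    else
      # Re-publish to the DLQ topic with the counter bumped.
      republish_to_dlq(message, attempts + 1)
    end
  end)

  messages
end

defp retry_attempts(message) do
  message.metadata
  |> Map.get(:headers, [])
  |> List.keyfind("retry_attempts", 0)
  |> case do
    {"retry_attempts", value} -> String.to_integer(value)
    nil -> 0
  end
end
```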

We use Kafka for two use cases. One is high throughput, where we can't add Oban and a database to the flow, so we deal with failures via a DLQ by putting the failed messages into another topic.
The other use case is more like event-driven architecture: we always enqueue the jobs into Oban (in handle_message) and never process any data in Broadway itself!

@amacciola
Contributor

@yordis I am just double-checking, but all of this you mentioned can already be done right now, correct? This would not need any new PR against the Broadway Kafka lib to support it? I ask because I am already doing something very similar to what you mentioned, just using RabbitMQ and the Broadway RabbitMQ lib for the retry queue instead of Kafka, since I don't need the failed messages to persist unless they reach max failed attempts and I push them into the DLQ.

@yordis
Contributor

yordis commented May 30, 2022

@yordis I am just double-checking but all of this you mentioned can already be done right now correct?

Yes

This would not need any new PR against Broadway Kafka lib to support ?

I am not sure what you mean by "supporting".

I am a big fan of the https://diataxis.fr/ documentation framework, so if it were up to me, I would love the Elixir ecosystem to start adding "How-To Guides" to showcase potential implementations, like your situation using RabbitMQ or my situation where we may use Oban for it...

I am not sure what the right call is here.

For this particular case: showcase the way we solve the problem.

@josevalim
Member

Correct. This is a specific request for more documentation to be added via PRs. :)
