Pause and Resume Functionality in Processor #404

Open
johnmehan opened this issue Nov 30, 2022 · 4 comments

Comments

@johnmehan

Would it be possible to add functionality to pause and resume the Processor?

The sarama.Consumer already has these methods: Pause, PauseAll, Resume, ResumeAll.

Thanks

@roubanakhle
Contributor

@johnmehan Yes, it is possible to add pause and resume functionality.
Could you share the use case you need it for?

@johnmehan
Author

@roubanakhle To improve performance, our callback pushes each message onto a queue, and a separate process consumes from that queue and commits each message once it has been processed. This gave us a substantial speedup, but we need to pause and resume the consumer to keep memory usage under control.
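
A minimal sketch of the backpressure pattern described above, assuming high and low watermarks on the queue length. `Backlog`, `NewBacklog`, and the `pause`/`resume` hooks are hypothetical names for illustration and are not part of goka or sarama; the hooks stand in for whatever pause/resume API the Processor ends up exposing.

```go
package main

import "sync"

// Backlog is a hypothetical buffer between the consume callback and the
// worker that processes and commits messages.
type Backlog struct {
	mu        sync.Mutex
	items     []string
	low, high int    // resume at or below low, pause at or above high
	paused    bool   // whether we have asked the consumer to pause
	pause     func() // assumed hook, e.g. a PauseAll method value
	resume    func() // assumed hook, e.g. a ResumeAll method value
}

// NewBacklog wires the watermarks and the pause/resume hooks together.
func NewBacklog(low, high int, pause, resume func()) *Backlog {
	return &Backlog{low: low, high: high, pause: pause, resume: resume}
}

// Push is meant to be called from the consume callback. It never rejects a
// message, because messages already in flight may still arrive after a pause.
func (b *Backlog) Push(msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.items = append(b.items, msg)
	if !b.paused && len(b.items) >= b.high {
		b.paused = true
		b.pause()
	}
}

// Pop is meant to be called by the worker. It reports false when the
// backlog is empty.
func (b *Backlog) Pop() (string, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.items) == 0 {
		return "", false
	}
	msg := b.items[0]
	b.items = b.items[1:]
	if b.paused && len(b.items) <= b.low {
		b.paused = false
		b.resume()
	}
	return msg, true
}

// Len returns the number of buffered messages.
func (b *Backlog) Len() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.items)
}
```

With a `sarama.Consumer` held in a variable `c` (hypothetical here), the hooks could be the method values `c.PauseAll` and `c.ResumeAll`. Using two watermarks rather than one threshold avoids toggling pause and resume on every message when the queue hovers around the limit.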

@frairon
Contributor

frairon commented Dec 25, 2022

@johnmehan Sorry for the long delay. I gave it a quick shot in this PR: #411.
It's untested so far, but is that roughly what you had in mind?

@mariusw

mariusw commented Mar 28, 2023

This seems to be exactly what we need. Our use case: service A depends on another service B for its lookups. Service A consumes diagnostics, but for each consumed message it must check whether the source of the diagnostic message is still in active service (we want to ignore diagnostics from inactive sources).

Whether the source is active or inactive is determined by service B. To service A, these two scenarios will look identical:

  1. Source is inactive (i.e. not found in "active sources registry")
  2. Service B has stopped (causing TTLs to expire in "active sources registry")

To tell the two scenarios apart, service B can send a heartbeat; if the heartbeats stop, service A pauses consuming diagnostics until they resume.


4 participants