New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement assigning subscriber #23
Conversation
This handles mapping a single partition to a Cloud Pub/Sub Like asynchronous subscriber.
This handles changing partition assignments and creates AsyncSubscribers per-partition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM after my comment.
for partition in added_partitions: | ||
await self._start_subscriber(partition) | ||
for partition in removed_partitions: | ||
await self._stop_subscriber(self._subscribers[partition]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit; I would make stop_subscriber take a partition, and have it remove it from the map. Then we can never forget to keep the map in sync after calling this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nack. Then this function cannot be reused on teardown when looping over the active subscribers without making an explicit copy of the key set, otherwise you'll get a RuntimeError: dictionary changed size during iteration.
This handles changing partition assignments and creates AsyncSubscribers per-partition.
diffbased on #22