Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement assigning subscriber #23

Merged
merged 4 commits into from Sep 24, 2020
Merged

Conversation

dpcollins-google
Copy link
Collaborator

@dpcollins-google dpcollins-google commented Sep 23, 2020

This handles changing partition assignments and creates AsyncSubscribers per-partition.

diffbased on #22

This handles mapping a single partition to a Cloud Pub/Sub Like asynchronous subscriber.
This handles changing partition assignments and creates AsyncSubscribers per-partition.
@dpcollins-google dpcollins-google requested a review from a team as a code owner September 23, 2020 15:55
@google-cla google-cla bot added the cla: yes This human has signed the Contributor License Agreement. label Sep 23, 2020
Copy link

@manuelmenzella-google manuelmenzella-google left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM after my comment.

for partition in added_partitions:
await self._start_subscriber(partition)
for partition in removed_partitions:
await self._stop_subscriber(self._subscribers[partition])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit; I would make stop_subscriber take a partition, and have it remove it from the map. Then we can never forget to keep the map in sync after calling this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nack. Then this function cannot be reused on teardown when looping over the active subscribers without making an explicit copy of the key set, otherwise you'll get a RuntimeError: dictionary changed size during iteration.

@dpcollins-google dpcollins-google merged commit 6afd477 into master Sep 24, 2020
@anguillanneuf anguillanneuf deleted the assigning_subscriber branch March 25, 2022 22:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla: yes This human has signed the Contributor License Agreement.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants