Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure only canonical are added to RabbitmqBroker.queues #3

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

es
Copy link

@es es commented Apr 14, 2023

When a worker is started, it'll eventually start two consumers: one for reading off the queue and another for reading off the delayed queue. When it starts consuming off the delayed queue, it calls RabbitmqBroker#consume('queue_name.DQ'). In turn, queue_name.DQ goes through the declare queue initialization which results in its name being added to the RabbitmqBroker.queues set. This becomes an issue as later on in commands such as RabbitmqBroker#flush, where assumptions are made that only the canonical queue name is present.

When a worker is started, it'll eventually start two consumers: one for reading off the queue and another for reading off the delayed queue. When it starts consuming off the delayed queue, it calls
`RabbitmqBroker#consume('queue_name.DQ')`. In tern, 'queue_name.DQ' goes through the declare queue initialization which results in its name being added to the `queues` set. This becomes an issue as later
on in commands such as flush, assumptions are made that only the canonical queue name is present.
@es es requested a review from twooster April 14, 2023 18:00
@twooster
Copy link

Hmm. No I don't think this is a good change. It's expected that all queues that might be consumed from are predefined. Are you seeing bugs in dramatiq as a result of this, or are you seeing bugs in our middleware as a result?

If bugs in our middleware, the "dramatiq way" seems to be to call q_name on any queue names you might be handling to ascertain the canonical queue name.

@es
Copy link
Author

es commented Apr 19, 2023

I'm realizing that the previous implementation allows for lazy queue creation 🤔 I'll look at adding some tests to validate that's the case?

The problem I'm running into was around the changes in #1. It seems like a lot of the code assumes RabbitmqBroker.queues will have only canonical names. When we create or delete them, it calls all three (canonical, dead letter, and delay queue names). The specific issue I was running into was when calling flush_all, it would flush all three default queues twice (RabbitmqBroker.queues contained both default and default.DQ). This is fine for flushing, but would break when attempting to delete the queues 😕

One option would be to modify the deletion to be more defensive? In theory, it's possible that the queues could be deleted on the RMQ hosts manually or out of band, so we can't assume they're always there.

The larger change that I kept avoiding in this PR was refactoring Queue out into a class of its own and adding delete, create, etc. as methods on it. I'm wondering maybe it's worth it to just bite the bullet on that change?

@twooster
Copy link

Definitely Dramatiq is a bit loosey-goosey with some concepts here, and I see the overlap now in the flush function.

To cut the Gordian's knot: What about moving the queue deletion to the test infrastructure, or just using different queue names for testing quorum vs not?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants