Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Having bizarre issues with custom pytest fixture #574

Open
dragonpaw opened this issue Sep 1, 2023 · 0 comments
Open

Having bizarre issues with custom pytest fixture #574

dragonpaw opened this issue Sep 1, 2023 · 0 comments

Comments

@dragonpaw
Copy link

What OS are you using?

Debian Bookworm

What version of Dramatiq are you using?

1.14.2

What did you do?

In order to facilitate a lot of functional tests with Dramatiq, we wrote this little fixture for use with pytest:

@dataclasses.dataclass
class DramatiqFixture:
    """A fixture for running dramatiq jobs in the background.

    This encapsulates the broker, and the worker, and provides methods to pause and
    resume the worker, and to wait for it to finish.
    """

    worker: dramatiq.Worker
    broker: dramatiq.brokers.stub.StubBroker

    _is_paused: bool = False

    def get_actor(self, actor_name: str) -> dramatiq.Actor:
        return self.broker.get_actor(actor_name)

    @property
    def queues(self) -> dict[str, queue.Queue]:
        return self.broker.queues

    def wait(self, *, queue_name: str = "default", fail_fast: bool = False):
        """Wait for the background worker to finish."""

        if self._is_paused:
            self.resume()
        self.broker.join(queue_name=queue_name, fail_fast=fail_fast)
        self.worker.join()

    def pause(self):
        """Pause the background worker."""
        self.worker.pause()
        self._is_paused = True

    def resume(self):
        """Resume the background worker."""
        self.worker.resume()
        self._is_paused = False


@pytest.fixture(name="dramatiq_q")
def dramatiq_broker_worker_fixture(
    transactional_db, restore_db
) -> Generator[DramatiqFixture, None, None]:
    """Using this fixture starts up a background worker to run dramatiq jobs."""

    broker = dramatiq.get_broker()
    assert isinstance(broker, dramatiq.brokers.stub.StubBroker)

    broker.flush_all()

    worker = dramatiq.Worker(broker, worker_timeout=100)
    worker.start()

    yield DramatiqFixture(worker=worker, broker=broker)

    broker.flush_all()
    worker.stop(timeout=5 * 1000)

The idea being you can import it in any test, and call things like wait() on it to force jobs to run. However, when redently adding more tests that use this, a prior test that was working, started always failing by timing out on the wait. So somehow the multiple tests are interfering with each other, but I can't figure out why. Running both the new test, or the old test, individually they run fine. It's only in combination that it fails.

What did you expect would happen?

Ideally we could have all tests be idempotent.

What happened?

Crashed. 400s timeout on calling dramatiq_q.wait() on a test that only runs one job and that job is being aborted by raising an error in the middleware and setting message.failed = True

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant