Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dramatiq.composition.pipeline is not working as expected with reference to the documentation. #524

Open
7 tasks
priyanshu-mayank-tech opened this issue Jan 10, 2023 · 4 comments

Comments

@priyanshu-mayank-tech
Copy link

Issues

I think dramatiq.composition.pipeline is not working as expected. The output/result of first actor is not getting passed to second actor.

Checklist

  • Does your title concisely summarize the problem?
  • Did you include a minimal, reproducible example?
  • What OS are you using?
  • What version of Dramatiq are you using?
  • What did you do?
  • What did you expect would happen?
  • What happened?

What OS are you using?

macOS 13.1

What version of Dramatiq are you using?

dramatiq == 1.13.0

What did you do?

I tries to use the composition.pipeline([actor1.message(input), actor2.message()], broker=broker).run()

What did you expect would happen?

As per documentation, the output result/message from first actor i.e. actor1 should get passed to second actor i.e. actor2.
But that is not happening here.

What happened?

But the output of first actor i.e. actor1 is not getting passed to second actor i.e. actor2.

@synweap15
Copy link
Contributor

Can you show a minimal reproducible example? Make sure you're not overriding the default middlewares, or if that's intended, add Pipelines middleware manually.

@priyanshu-mayank-tech
Copy link
Author

priyanshu-mayank-tech commented Jan 10, 2023

Can you show a minimal reproducible example? Make sure you're not overriding the default middlewares, or if that's intended, add Pipelines middleware manually.

@synweap15 - Following is the code for your reference:

from dramatiq.middleware import Pipelines

from dramatiq.composition import pipeline
from dramatiq.brokers.rabbitmq import RabbitmqBroker
from dramatiq.middleware.retries import Retries
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend
from redis import Redis


result_backend = RedisBackend(client=Redis(host="redis", port=6379))
host = "rabbitmq"
port = 5672
user = "guest"
password = "guest"
rabbit_url = f"amqp://{user}:{password}@{host}:{port}"

broker = RabbitmqBroker(
    url=rabbit_url,
    middleware=[Retries(), Results(backend=result_backend), Pipelines()],)
dramatiq.set_broker(broker)
msg = {
    "artist_docs": {
        "type": "track",
    },
}


@dramatiq.actor()
def act_one(message):
    artist = "Myself"
    message["artist"] = artist
    return message


@dramatiq.actor(store_results=True)
def act_two(received_msg):
    sub_artist = "everyone"
    received_msg["sub_artist"] = sub_artist
    return received_msg


def main():
    pipe = pipeline([
        act_one.message(msg),
        act_two.message()
    ], broker=broker).run()

Error:

2023-01-10 11:54.16 [error    ] Received message for undefined actor 'act_one'. Moving it to the DLQ. 
[dramatiq.worker.ConsumerThread(default)]
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/dramatiq/worker.py", line 325, in handle_message
    actor = self.broker.get_actor(message.actor_name)
  File "/usr/local/lib/python3.10/dist-packages/dramatiq/broker.py", line 230, in get_actor
    raise ActorNotFound(actor_name) from None
dramatiq.errors.ActorNotFound: act_one

After this I tried replacing act_one.message(msg) with act_one(msg) only. In that case, I am getting the following error:

 AttributeError("'dict' object has no attribute 'options'")

@synweap15
Copy link
Contributor

Are the broker definition and actors definitions placed in the same file? If not, you need to start the worker also pointing to the file which contains the actor's definitions:

dramatiq [-h] [--processes PROCESSES] [--threads THREADS] [--path [PATH [PATH ...]]] [--queues [QUEUES [QUEUES ...]]] [--pid-file PID_FILE] [--log-file LOG_FILE] [--skip-logging] [--use-spawn] [--fork-function FORKS] [--version] [--verbose] broker [module [module ...]]

So eg. dramatiq broker actors, assuming you have the broker defined in broker.py and actors in actors.py

@priyanshu-mayank-tech
Copy link
Author

As you can see from above code, both the actors & broker are defined in the same file.

broker = RabbitmqBroker(
    url=rabbit_url,
    middleware=[Retries(), Results(backend=result_backend), Pipelines()],)
dramatiq.set_broker(broker)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants