Feature request: pipeline with dynamic children #625

Open
yoav-orca opened this issue Apr 30, 2024 · 0 comments

@yoav-orca

I'm trying to migrate away from Celery, but I have the following workflow:

  • Build a list of tasks
  • process tasks (fanout)
  • Join the results
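
For illustration only, the three steps above can be sketched in plain Python with no task queue involved. This is a minimal sketch, assuming a thread pool stands in for the workers; `run_workflow` is a hypothetical helper name, and the other function names mirror the ones used in this issue.

```python
from concurrent.futures import ThreadPoolExecutor


def build_task_list(num: int) -> list[str]:
    # Step 1: the number of children is only known at runtime.
    return [f"Task-{i}" for i in range(num)]


def process(task: str) -> str:
    # Step 2: the work performed for each child (fan-out).
    return f"{task}: done"


def combine_results(results: list[str]) -> str:
    # Step 3: join the child results into a single value.
    return "\n".join(results)


def run_workflow(num: int) -> str:
    # Hypothetical driver: threads stand in for queue workers here.
    tasks = build_task_list(num)
    with ThreadPoolExecutor(max_workers=4) as pool:
        # map() yields results in input order, so they line up with tasks.
        results = list(pool.map(process, tasks))
    return combine_results(results)


if __name__ == "__main__":
    print(run_workflow(3))
```

The point of the sketch is that the fan-out width depends on the output of the first step, which is what a statically declared pipeline cannot express.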

In Celery, I can do the following:

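from celery import group, shared_task
from celery.canvas import Signature
from celery.result import AsyncResult

@shared_task
def build_task_list(num: int) -> list[str]:
    return [f"Task-{i}" for i in range(num)]

@shared_task
def process_and_gather(task_list: list[str], gather_task: Signature) -> AsyncResult:
    # Fan out one `process` task per item, then pipe the group into the gather task.
    return (group(process.s(t) for t in task_list) | gather_task).delay()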

@shared_task
def process(task: str) -> str:
    return f"{task}: done"

@shared_task
def combine_results(results: list[str]) -> str:
    return '\n'.join(results)

# full workflow

workflow = build_task_list.s(100) | process_and_gather.s(gather_task=combine_results.s())
result = workflow.delay()
print(result.get())

This isn't currently possible in Dramatiq. I believe it would be a great addition to the library.
