
Batches task doesn't trigger next task in chain #26

Open
anotherbugmaster opened this issue May 18, 2021 · 10 comments
Labels
enhancement New feature or request

Comments

@anotherbugmaster

anotherbugmaster commented May 18, 2021

I'm trying to use celery-batches in a chain, like this:

chain(
	task_foo.s(*args, **kwargs),
	batch_task.s(*args, **kwargs),
	task_bar.s(*args, **kwargs),
).apply_async()

Celery never registers task_bar as a child of batch_task, and the chain cuts off there. What could be the cause, and how can I fix it?

@clokep
Owner

clokep commented May 18, 2021

Looks like we don't call into the chains (or groups) after a successful task run. This would need to port some of the code around https://github.com/celery/celery/blob/2411504f4164ac9acfa20007038d37591c6f57e5/celery/app/trace.py#L508-L516

This should be fairly easy to do and to add tests for.

@clokep added the enhancement (New feature or request) label May 18, 2021
@anotherbugmaster
Author

anotherbugmaster commented May 19, 2021

I'm trying to figure out how all of this works. Do I need to apply_async the next task from the chain inside the on_return callback?

def on_return(result):
    ...

@anotherbugmaster
Author

anotherbugmaster commented May 19, 2021

Another thing I don't understand is why we need a pickleable SimpleRequest at all. I can't see where we serialize it; it seems like we could just use an ordinary Request.

@clokep
Owner

clokep commented May 19, 2021

> I'm trying to figure out how all of this works. Do I need to apply_async the next task from the chain inside on_return callback?

If you're trying to modify celery-batches, I think the thing to do is to take a look near:

else:
    if success_receivers:
        send_success(sender=task, result=result)

Inside the else clause we need to copy the code I linked to above, but modify it to iterate over all the tasks (which are weirdly stored in args[0]). So it would end up being something like:

else:
    try:
        for task_request in args[0]:
            chain = task_request.chain
            if chain:
                ...
    except Exception:
        # handle errors from invoking the chain, as celery's trace.py does
        raise
    else:
        if success_receivers:
            send_success(sender=task, result=result)

But as you say below....this probably needs to not use SimpleRequest. 😄

> Another thing I don't understand is why do we need pickleable SimpleRequest at all? I can't see where we serialize it, seems like we could just use an ordinary Request.

I was trying to remember this yesterday when looking at #24. I believe it gets pickled when you use a prefork pool worker (or maybe it used to be pickled and no longer is in newer versions of Celery).
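To illustrate why pickling matters here: a prefork pool hands work to child processes by serializing the payload, so whatever crosses that boundary must survive a pickle round-trip. A full Request carries unpicklable worker state (connections, event-loop hooks), which is the usual motivation for a plain data container. A sketch with a hypothetical stand-in for SimpleRequest:

```python
import pickle
from collections import namedtuple

# Hypothetical stand-in for celery-batches' SimpleRequest: a plain,
# picklable container holding only the request fields a child needs.
SimpleRequestStandIn = namedtuple(
    "SimpleRequestStandIn",
    ["id", "name", "args", "kwargs", "delivery_info", "hostname"],
)

req = SimpleRequestStandIn("abc123", "tasks.add", (1, 2), {}, {}, "worker1")

# A prefork pool would do the equivalent of this round-trip when
# dispatching the request to a child process.
roundtripped = pickle.loads(pickle.dumps(req))
print(roundtripped.args)
```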

@anotherbugmaster
Author

Unfortunately, it seems that batch tasks also don't pass their result to the next element in a chain; the next element only gets the return value of the pseudo task. Here's a minimal example:

def test_chain(celery_app, celery_worker):
    # chain comes from celery; cumadd is the cumulative-add Batches
    # task from the test suite.
    result = chain(
        cumadd.s(1),
        cumadd.s(),
    ).apply()

    # An EagerResult that resolves to 1 should be returned.
    assert result.get() == 1

@anotherbugmaster
Author

anotherbugmaster commented May 19, 2021

By the way, the second cumadd does get called, so I'm not sure why it didn't work the first time.

@clokep
Owner

clokep commented May 19, 2021

There's a bit of a fan-in/fan-out question here. It isn't always clear what the different chains mean:

  • Normal -> Batch -> Normal
    • Call a task
    • Use the result as one of the potential requests to a Batch task
    • Call a task with each request of the batch task (?)
  • Batch -> Batch
    • Each batch request result should be sent to a new batch task?

So this might be trickier to solve than I first thought.

@anotherbugmaster
Author

I think of Batches as a control structure of sorts: we keep our task DAG the same, but some tasks are executed together. So I see your cases like this:

  • Normal -> Batch -> Normal
    • Call a task
    • Use the result as one of the potential requests to a Batch task
    • Call a task with each request of the batch task - yep, because the previous task returns a single instance of a result, not a batch
  • Batch -> Batch
    • Each batch request result should be sent to a new batch task - as soon as a single task from the batch is ready, we send it to the next task and start accumulating again
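That reading can be sketched in plain Python (no Celery; the function names and the doubling/incrementing bodies are invented): the batch step fans in to process the accumulated requests together, then fans out one downstream call per request, so the DAG keeps one edge per request rather than per batch.

```python
def normal_task(x):
    # stand-in for an ordinary (non-batch) task
    return x + 1

def batch_step(requests):
    # fan-in: the accumulated requests are processed together...
    results = [args[0] * 2 for args in requests]
    # ...fan-out: each request's result goes to its own downstream
    # call, preserving the shape of the task DAG
    return [normal_task(result) for result in results]

print(batch_step([(1,), (2,), (3,)]))  # → [3, 5, 7]
```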

@anotherbugmaster
Author

I suppose we can't implement that behavior using the parent task, as we do now?

@clokep
Owner

clokep commented May 19, 2021

You might be able to explicitly fire other tasks from the batch task, but that can cause issues if you're not careful (I can't seem to find the warning in the Celery docs at the moment...). It would probably be OK if you're using a separate queue for batch tasks (which I've found to be a good idea anyway; I should probably document that...).
