Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix blocking worker queue when scheduling in parallel #3013

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

riga
Copy link
Contributor

@riga riga commented Oct 28, 2020

Hi,

this PR is meant to fix* and improve the parallel scheduling of tasks. To demonstrate what is currently failing (* therefore the fix), I added the WIP label and only added one commit with a simple test that currently, against my expectation, does not pass but blocks the entire process.

Description

The blocking occurs when parallel scheduling is enabled but a task is not picklable. The scheduling is mostly implemented in Worker.add,

luigi/luigi/worker.py

Lines 730 to 774 in cf2abbd

def add(self, task, multiprocess=False, processes=0):
"""
Add a Task for the worker to check and possibly schedule and run.
Returns True if task and its dependencies were successfully scheduled or completed before.
"""
if self._first_task is None and hasattr(task, 'task_id'):
self._first_task = task.task_id
self.add_succeeded = True
if multiprocess:
queue = multiprocessing.Manager().Queue()
pool = multiprocessing.Pool(processes=processes if processes > 0 else None)
else:
queue = DequeQueue()
pool = SingleProcessPool()
self._validate_task(task)
pool.apply_async(check_complete, [task, queue])
# we track queue size ourselves because len(queue) won't work for multiprocessing
queue_size = 1
try:
seen = {task.task_id}
while queue_size:
current = queue.get()
queue_size -= 1
item, is_complete = current
for next in self._add(item, is_complete):
if next.task_id not in seen:
self._validate_task(next)
seen.add(next.task_id)
pool.apply_async(check_complete, [next, queue])
queue_size += 1
except (KeyboardInterrupt, TaskException):
raise
except Exception as ex:
self.add_succeeded = False
formatted_traceback = traceback.format_exc()
self._log_unexpected_error(task)
task.trigger_event(Event.BROKEN_TASK, task, ex)
self._email_unexpected_error(task, formatted_traceback)
raise
finally:
pool.close()
pool.join()
return self.add_succeeded

where processes of a pool put the results of task completeness checks back into a queue. However, when the task is not picklable, the queue is never filled so that queue.get() will block forever without being able to access exceptions raised in the process. Instead, the exception is hold by the AsyncResult object returned by apply_async which is not used in Worker.add, so there is currently no handle to catch these cases.

Possible solution

IMHO, the root cause for this issue is the difference between the actual multiprocessing.Pool.apply_async (returning an async result object) and the custom SingleProcessPool.apply_async (instantly returning the result of the function call). The latter should produce something like a SyncResult with the same API as AsyncResult. Following this, one should rely on its built-in functionality to wrap exceptions (re-raised when calling get()) and a lot of the custom exception handling in worker.py would be obsolete (such as AsyncCompletionException and TracebackWrapper) which simplifies things quite a lot.

I already implemented this (all tests pass) and will push it if you agree :)

I know that you guys are (understandably) very cautious when it comes to changes in core code, but I think the parallel scheduling could really benefit from this change / fix.

Motivation and Context

Our dependency trees sometimes take O(h) to build as our completeness checks require a lot of remote resources. Therefore, we make heavy use of the parallel scheduling feature. Besides this issue/PR, we plan to submit two additional PRs in the future:

  • We noticed that a list of tasks started with interface.build(many_tasks_in_a_list, ...) is not using the parallel scheduling at all (because of this loop).
  • We would like to start running tasks already while the dependency tree is being built. Most of the required functionality is actually already there and would need only minor additions.

Have you tested this? If so, how?

Yes, the test is included and will pass once the parallel scheduling is fixed.

i = luigi.IntParameter()

tasks = [UnpicklableTask(i=i) for i in range(5)]
self.assertFalse(self.schedule_parallel(tasks))
Copy link
Contributor Author

@riga riga Oct 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line never terminates as can be seen in the first build log.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 043b4c6.

@riga riga changed the title [WIP] Fix blocking worker queue when scheduling in parallel [WIP, test fails intentionally] Fix blocking worker queue when scheduling in parallel Oct 30, 2020
@riga riga changed the title [WIP, test fails intentionally] Fix blocking worker queue when scheduling in parallel [WIP, test fails intentionally, waiting for feedback] Fix blocking worker queue when scheduling in parallel Nov 5, 2020
@sognetic
Copy link
Contributor

sognetic commented Jun 18, 2021

This seems like a really interesting improvement, is there any chance of this making it into luigi? In addition, I would be also very interested in the two improvements mentioned above, could these be included in luigi without the PR suggested here?

@lallea
Copy link
Contributor

lallea commented Jun 18, 2021

I am not a maintainer, but I think it would be great to get this in. We also use parallel scheduling to cut down latency until first task execution.

@riga
Copy link
Contributor Author

riga commented Jun 23, 2021

I essentially have all changes I mentioned above ready in personal branches (and then work kicked in and couldn't follow up on this anymore).

@dlstadther If you agree with the ideas above, I would open PRs.

@dlstadther
Copy link
Collaborator

I'm good with the submission of a fix to resolve this issue. Note that I don't utilize the Luigi project for work anymore, so i can only review as an outsider now. I'll do my best to be responsive, but times might be slow.

Also, Parallel scheduling is optional behaviour and is not enabled by default.

@riga riga requested a review from a team as a code owner July 5, 2021 15:12
@riga riga changed the title [WIP, test fails intentionally, waiting for feedback] Fix blocking worker queue when scheduling in parallel Fix blocking worker queue when scheduling in parallel Jul 5, 2021
@riga
Copy link
Contributor Author

riga commented Jul 5, 2021

I committed the fix of the loop doing the completeness checks during initial scheduling.

The main change is that queue objects are no longer required for storing the results of complete() calls across processes, but the current implementation rather fully relies on Async / SyncResult's for the multiprocess.Pool / SingleProcessPool cases. Actually, AsyncResult's have been used before as well, but somewhat mixed with said queues.

IMHO, this simplifies the code quite a lot and allows to add the 2 additional features I mentioned in the PR description (in future PRs), but I understand that changes to the core require a lot of scrutiny. @dlstadther Any idea who we can include for additional review?

The tests pass, but for some reason the coverage drops by ~8% and I fail to see where / how exactly ...

@dlstadther
Copy link
Collaborator

@riga ; sorry, i've been a bit busy lately and haven't gotten to review this yet. I'll try to get to it in the next week.

As for others to review, that's a bit challenging. Unsure if @honnix or other Spotify folks can review.

@lallea
Copy link
Contributor

lallea commented Jul 18, 2021

With the usual disclaimer that I am not a maintainer, I am familiar with the code and would like this PR to get in, so I'll contribute a review. It overall looks good. The polling itches a bit, since it might add up to measurable CPU usage for long tasks, but I don't have a suggestion for a way to avoid it that wouldn't add complexity. Luigi is built on a concept of polling, so I guess we should not be too picky. :-)

I'll add a few nitpicks inline.

break
else:
time.sleep(wait_interval)
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The for/else + continue makes the flow unclear. How about packing up the rest of the loop into a new method and calling it before break?

Copy link
Collaborator

@dlstadther dlstadther left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have an optional comment and a question. Assuming no impact, i'm good to approve. (Sorry for taking a month to return to this review when I said I'd be a week)!

"""

def __init__(self, func, args=None, kwargs=None):
super(SyncResult, self).__init__()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

luigi isn't officially supporting py 2.7 anymore so this could just be super().__init__(), but totally optional

Comment on lines -400 to +449
try:
is_complete = task.complete()
except Exception:
is_complete = TracebackWrapper(traceback.format_exc())
out_queue.put((task, is_complete))
return task.complete()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the potential for hitting an exception here? Was the previous catch never caught and thus pointless? Or are we introducing the opportunity here for an unhandled exception?

@stale
Copy link

stale bot commented Jan 9, 2022

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If closed, you may revisit when your time allows and reopen! Thank you for your contributions.

@stale stale bot added the wontfix label Jan 9, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants