
Issues with _ParallelRead / Redesign adding optional Worker nodes #75

Open
leo-schick opened this issue May 13, 2022 · 0 comments
leo-schick commented May 13, 2022

User story

I came across an interesting problem while reading multiple files via a subclass of _ParallelRead: with the current design, _ParallelRead loads all files to be processed into RAM, decides which tasks to run first, and only then starts the parallel reading.

I have a folder on cloud storage with over 1.3 million files that need to be processed. It takes ages before mara even starts reading (and when an invalid file shows up, everything starts over again...).

In addition, this upfront calculation looks inefficient to me when loading a lot of files of different sizes (= different processing times).

Here is what I came up with

I redesigned the base class ParallelTask to support generic Worker nodes instead of pre-computed tasks. This is an optional mode which has to be activated with self.use_workers = True.
The Worker nodes receive their commands at runtime of the pipeline (in contrast to a Task node, whose commands must be defined upfront).
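
For illustration, a minimal sketch of how a subclass could opt in to the new mode (the subclass name is made up, and the constructor arguments are only assumed to match the usual ParallelTask signature):

```python
from mara_pipelines import pipelines


class ParallelProcessFiles(pipelines.ParallelTask):
    """Hypothetical parallel task that uses worker nodes instead of pre-computed sub-tasks."""

    def __init__(self, id: str, description: str, max_number_of_parallel_tasks: int = None):
        super().__init__(id=id, description=description,
                         max_number_of_parallel_tasks=max_number_of_parallel_tasks)
        self.use_workers = True  # opt in to the worker-based execution mode
```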

An additional function feed_workers in class ParallelTask can be overloaded. This method runs in a separate process during pipeline execution and yields the commands which are then passed on to the workers. You can either yield a single command or a command list. When you yield a command list, the whole list is passed to a single worker. (This is necessary because in some cases you want to execute several commands in order for a single file; see the sketch below.)
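
A sketch of what overloading feed_workers could look like (the class, folder, and script names are illustrative; RunBash is used here only as an example command):

```python
import pathlib

from mara_pipelines import pipelines
from mara_pipelines.commands.bash import RunBash


class ParallelProcessFiles(pipelines.ParallelTask):
    # __init__ as in the previous sketch, with self.use_workers = True

    def feed_workers(self):
        """Runs in a separate process; every yielded item is queued for the workers."""
        for file in sorted(pathlib.Path('data/incoming').glob('*.csv')):  # hypothetical folder
            # yielding a list keeps both commands on one worker, executed in order
            yield [RunBash(f'./validate_file.sh "{file}"'),  # hypothetical scripts
                   RunBash(f'./load_file.sh "{file}"')]
```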

Since the workers now get their files / commands handed over at runtime from a "message queue", I expect this logic to scale much better when many files need to be processed.
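
Conceptually, the feeder process and the workers just share a queue. A simplified standalone model of that flow (not the actual implementation in the PR, built only on plain multiprocessing primitives) also illustrates the failure behaviour listed in the notes below: even if feed_workers raises, commands already in the queue are still drained before the workers stop.

```python
import multiprocessing

_SENTINEL = None  # signals "no more work" to a worker


def feeder(queue: multiprocessing.Queue, feed_workers, number_of_workers: int):
    """Fills the queue from the feed_workers generator."""
    try:
        for command_or_list in feed_workers():
            queue.put(command_or_list)
    finally:
        # even if feed_workers raises, the sentinels land *after* the items that
        # are already queued, so the workers finish those before stopping
        for _ in range(number_of_workers):
            queue.put(_SENTINEL)


def worker(queue: multiprocessing.Queue):
    """Drains the queue until a sentinel arrives."""
    while True:
        item = queue.get()
        if item is _SENTINEL:
            break  # queue drained, this worker stops
        commands = item if isinstance(item, list) else [item]
        for command in commands:  # a yielded command list stays on this worker, in order
            command.run()  # assumes each command exposes run(), like mara commands do
```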

This new design does not work for all _ParallelRead execution options, so it is only used when possible.

PR: #74

Some points to note

  1. When the feed_workers function throws an exception, all commands already in the queue will still be processed by the worker tasks. There is no implementation yet to inform the worker nodes that they should stop their work; they stop once all open commands in the queue have been picked up.
  2. When a worker node fails, the other workers and the feed-worker process will continue their work until the queue is empty.

This implementation only works with ReadMode.ALL.

@leo-schick leo-schick added the enhancement New feature or request label May 13, 2022
@leo-schick leo-schick self-assigned this May 13, 2022