Create multiprocessing/multithreading helper functions #6604
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #6604 +/- ##
==========================================
- Coverage 97.81% 97.81% -0.01%
==========================================
Files 1061 1063 +2
Lines 91656 91732 +76
==========================================
+ Hits 89657 89730 +73
- Misses 1999 2002 +3
We need to change the module name so it does not shadow the builtin multiprocessing.
Also, please convert to a generator interface, i.e., yield from the ...with_progress_bar functions.
@@ -0,0 +1,119 @@
# Copyright 2024 The Cirq Developers
Please rename from multiprocessing. We should avoid overlap with builtin module names if at all possible.
Perhaps use progress_bar instead?
from cirq.work.progress_bar import starmap_with_progress_bar
General comment
Please check whether thread_map and process_map in tqdm.contrib.concurrent would suffice for your needs.
That would save us extra code to maintain, but if tqdm.contrib is not enough, let's proceed here.
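As a rough illustration of the suggestion above, the sketch below calls thread_map from tqdm.contrib.concurrent on a single-argument worker. The worker `_describe` and the `ImportError` fallback are hypothetical additions so the snippet runs even without tqdm installed; they are not part of this PR.

```python
from concurrent.futures import ThreadPoolExecutor

try:
    from tqdm.contrib.concurrent import thread_map
except ImportError:
    # Stand-in so the sketch still runs without tqdm installed (no bar shown).
    def thread_map(fn, *iterables, max_workers=None, **tqdm_kwargs):
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            return list(executor.map(fn, *iterables))


def _describe(x: int) -> str:
    """Hypothetical single-argument worker used only for illustration."""
    return f'{x=}'


# thread_map returns a list in input order and draws a progress bar on stderr.
results = thread_map(_describe, range(10), max_workers=2)
```

process_map has the same call shape, but the worker must be picklable and the call should sit under an `if __name__ == '__main__':` guard on platforms that spawn worker processes. Note that both helpers return a complete list, so they do not by themselves give the out-of-order or generator behavior discussed elsewhere in this review.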
def execute_with_progress_bar(
    func: Callable[..., _OUTPUT_T],
    inputs: Iterable[Any],
For consistency with multiprocessing.Pool methods, consider renaming to map_with_progress_bar.
Also, let's rename inputs --> iterable, which seems to be the name used for that argument in the builtin map and starmap functions.
    progress_bar: Callable[..., ContextManager] = tqdm.tqdm,
    **progress_bar_args,
Do you have any immediate need for progress bar implementations other than tqdm.tqdm?
If not, please remove it together with the progress_bar_args kwargs.
In general we should have a minimal interface that fulfills the needed functionality and add extra bells and whistles later, if they are ever needed. Otherwise we may end up with extra arguments that are never used.
    ] = None,
    progress_bar: Callable[..., ContextManager] = tqdm.tqdm,
    **progress_bar_args,
) -> List[_OUTPUT_T]:
Please convert to a generator function with an Iterator return type.
That would be in line with the Pool.map / Pool.starmap interface and would avoid blocking until everything is processed. If truly needed, we can add thin list-returning wrapper functions for convenience.
    Args:
        func: The callable to execute, the function takes a single argument.
        inputs: An iterable of the argument to pass to the function.
        pool: An optional multiprocessing or threading pool.
Please describe what the default pool is when none is provided.
    Returns:
        An out-of-order list of the results of the function calls.
    """
    sequential_inputs = [*inputs]
Nit - better to be verbose and more readable:
- sequential_inputs = [*inputs]
+ sequential_inputs = list(inputs)
    if pool is None:
        with progress_bar(total=len(sequential_inputs), **progress_bar_args) as progress:
            for args in sequential_inputs:
                results.append(func(args))
                progress.update(1)
        return results
The code is nearly the same as in starmap_with_progress_bar.
Can you please delegate to starmap_with_progress_bar after massaging the inputs?
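One way to read "massaging the inputs" is to wrap each single argument in a 1-tuple so the starmap variant can unpack it. The sketch below shows that delegation with two hypothetical generator functions; the progress bar and pool handling are left out to keep the idea visible.

```python
from typing import Any, Callable, Iterable, Iterator, Tuple, TypeVar

_OUTPUT_T = TypeVar('_OUTPUT_T')


def starmap_sketch(
    func: Callable[..., _OUTPUT_T], iterable: Iterable[Tuple[Any, ...]]
) -> Iterator[_OUTPUT_T]:
    """Hypothetical stand-in for the starmap variant: unpacks each tuple."""
    for args in iterable:
        yield func(*args)


def map_sketch(
    func: Callable[[Any], _OUTPUT_T], iterable: Iterable[Any]
) -> Iterator[_OUTPUT_T]:
    """Delegates to starmap_sketch by wrapping each input in a 1-tuple."""
    yield from starmap_sketch(func, ((x,) for x in iterable))


labels = list(map_sketch(str, range(3)))
```

With this shape, any progress reporting or pool logic only needs to live in the starmap variant.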
    'pool_creator', [None, multiprocessing.Pool, concurrent.futures.ThreadPoolExecutor]
)
def test_execute_with_progress_bar(pool_creator):
    desired = set([f'{x=}' for x in range(10)])
Nit - set can take a generator expression, so there is no need for a temporary list:
- desired = set([f'{x=}' for x in range(10)])
+ desired = set(f'{x=}' for x in range(10))
    if pool_creator is None:
        actual = set(execute_with_progress_bar(_sinle_arg_func, range(10), pool=None))
    else:
        with pool_creator(2) as pool:
            actual = set(execute_with_progress_bar(_sinle_arg_func, range(10), pool=pool))
Please pull / cherry-pick e5ef18f for slightly simplified test code:
git pull https://github.com/pavoljuhas/Cirq.git amend-6604
closing in favor of using |
Some code in experiments uses multiprocessing.Pool or ThreadPoolExecutor, and there are other places that could benefit from either to speed things up. The methods introduced in this PR standardize how this is done and run the batches with a progress bar. That improves UX, since batching, threading, and multiprocessing are typically used for long-running jobs, where a progress bar is a welcome feature.