Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create multiprocessing/multithreading helper functions #6604

Closed
wants to merge 7 commits into from

Conversation

NoureldinYosri
Copy link
Collaborator

@NoureldinYosri NoureldinYosri commented May 15, 2024

Some code in experiments uses multiprocessing.Pool or ThreadPoolExecutor + there are places that could benefit from using either to speed things up. The methods introduced in this PR standardize the way this is done and runs the batches with a progress bar which would improve UX since batching/threading/multiprocessing is used when running long running processes so getting a progress bar is a nice feature.

@CirqBot CirqBot added the size: M 50< lines changed <250 label May 15, 2024
@NoureldinYosri NoureldinYosri marked this pull request as ready for review May 15, 2024 18:46
@NoureldinYosri NoureldinYosri requested review from vtomole, cduck and a team as code owners May 15, 2024 18:46
Copy link

codecov bot commented May 15, 2024

Codecov Report

Attention: Patch coverage is 98.63014% with 1 lines in your changes are missing coverage. Please review.

Project coverage is 97.81%. Comparing base (cf86dda) to head (b82e95a).
Report is 1 commits behind head on main.

Files Patch % Lines
cirq-core/cirq/work/multiprocessing.py 97.91% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #6604      +/-   ##
==========================================
- Coverage   97.81%   97.81%   -0.01%     
==========================================
  Files        1061     1063       +2     
  Lines       91656    91732      +76     
==========================================
+ Hits        89657    89730      +73     
- Misses       1999     2002       +3     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Collaborator

@pavoljuhas pavoljuhas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to change module name so it does not shadow builtin multiprocessing.

Also please convert to generator interface, ie, yield from the ...with_progress_bar functions.

@@ -0,0 +1,119 @@
# Copyright 2024 The Cirq Developers
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename from multiprocessing. We should avoid overlap with builtin module names if at all possible.

Perhaps use progress_bar instead?

from cirq.work.progress_bar import  starmap_with_progress_bar

General comment

Please check thread_map and process_map in tqdm.contrib.concurrent if they would suffice for your needs.
We can save ourselves from extra code to maintain, but if tqdm.contrib is not enough, let's proceed here.

Comment on lines +36 to +38
def execute_with_progress_bar(
func: Callable[..., _OUTPUT_T],
inputs: Iterable[Any],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with multiprocessing.Pool methods, consider renaming to map_with_progress_bar.
Also let's rename inputs --> iterable which seems to be used as that argument name in the builtin map and starmap functions.

Comment on lines +42 to +43
progress_bar: Callable[..., ContextManager] = tqdm.tqdm,
**progress_bar_args,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have any immidiate need for other progress bar implementations than tqdm.tqdm?
If not, please remove together with progress_bar_args kwargs.

In general we should have minimum interface that fulfills the needed functionality and add extra bells/whistles later if ever needed. Otherwise we may end up with an extra, never used arguments.

] = None,
progress_bar: Callable[..., ContextManager] = tqdm.tqdm,
**progress_bar_args,
) -> List[_OUTPUT_T]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please convert to a generator function with Iterator return type.
That would be inline with the Pool.map / Pool.starmap interface and would avoid blocking until everything is processed. If truly needed we can add a list-returning thin wrapper functions for convenience.

Args:
func: The callable to execute, the function takes a single argument.
inputs: An iterable of the argument to pass to the function.
pool: An optional multiprocessing or threading pool.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please describe what is the default pool when not provided.

Returns:
An out-of-order list of the results of the function calls.
"""
sequential_inputs = [*inputs]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit - better to be verbose and more readable

Suggested change
sequential_inputs = [*inputs]
sequential_inputs = list(inputs)

Comment on lines +59 to +64
if pool is None:
with progress_bar(total=len(sequential_inputs), **progress_bar_args) as progress:
for args in sequential_inputs:
results.append(func(args))
progress.update(1)
return results
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code is quite the same as in starmap_with_progress_bar.
Can you please delegate to starmap_with_progress_bar after massaging the inputs?

'pool_creator', [None, multiprocessing.Pool, concurrent.futures.ThreadPoolExecutor]
)
def test_execute_with_progress_bar(pool_creator):
desired = set([f'{x=}' for x in range(10)])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit - set can take generator expression, no need for a temporary list -

Suggested change
desired = set([f'{x=}' for x in range(10)])
desired = set(f'{x=}' for x in range(10))

Comment on lines +36 to +40
if pool_creator is None:
actual = set(execute_with_progress_bar(_sinle_arg_func, range(10), pool=None))
else:
with pool_creator(2) as pool:
actual = set(execute_with_progress_bar(_sinle_arg_func, range(10), pool=pool))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please pull / cherry-pick e5ef18f for a bit simplified test code -

git pull https://github.com/pavoljuhas/Cirq.git amend-6604

@NoureldinYosri
Copy link
Collaborator Author

closing in favor of using tqmd.contrib.concurrent

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
size: M 50< lines changed <250
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants