feat: make publish futures compatible with concurrent.futures.as_completed() #397

Merged
plamut merged 6 commits into googleapis:master from iss-368-b on May 15, 2021

@plamut plamut commented Apr 26, 2021

Closes #368.
Possibly supersedes #374.

This is a different approach to the same feature request, one that does not rely on implementation details of concurrent.futures.Future (as discussed offline). Instead, the custom base Future implementation is replaced with the one from the concurrent.futures package, with only minor adjustments to fit the specific needs of publisher/subscriber futures.

The interface stays the same, and the isinstance() checks should keep working, as concurrent.futures.Future is injected into the right place in the MRO.
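
A minimal sketch of the MRO idea described above, not the actual Pub/Sub code: `AbstractFuture` is a hypothetical stand-in for whatever abstract interface the futures already implemented, and `Future` is a hypothetical concrete class. Putting `concurrent.futures.Future` first in the bases lets the standard-library class supply the behavior while both isinstance() checks still pass.

```python
import abc
import concurrent.futures


class AbstractFuture(abc.ABC):
    """Hypothetical stand-in for an existing abstract future interface."""

    @abc.abstractmethod
    def result(self, timeout=None):
        raise NotImplementedError

    @abc.abstractmethod
    def done(self):
        raise NotImplementedError


class Future(concurrent.futures.Future, AbstractFuture):
    """Hypothetical concrete future; the stdlib class comes first in the MRO,
    so its result() and done() satisfy the abstract interface."""


future = Future()

# Both the interface check and the stdlib check succeed.
assert isinstance(future, AbstractFuture)
assert isinstance(future, concurrent.futures.Future)

# Because it is a real concurrent.futures.Future, as_completed() accepts it.
future.set_result("message-id-1")
completed = list(concurrent.futures.as_completed([future]))
```

Since the abstract methods are resolved through the MRO, no method has to be re-implemented in the subclass for it to be instantiable.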

PR checklist

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

@plamut plamut requested a review from pradn April 26, 2021 14:12
@google-cla google-cla bot added the cla: yes This human has signed the Contributor License Agreement. label Apr 26, 2021
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/python-pubsub API. label Apr 26, 2021

plamut commented Apr 26, 2021

@dpcollins-google Could this break Pub/Sub Lite in any way? Does the Lite client currently use any Future internals?

The futures implementation is adjusted to work well with the built-in
as_completed() function in the `concurrent.futures` package.

plamut commented Apr 26, 2021

module 'concurrent.futures' has no attribute 'InvalidStateError' in Python 3.7 and below. Will refactor the affected unit tests.

If setting a result/exception on a concurrent.futures.Future object
that is already done, an exception is raised only in Python 3.8+, thus
we conditionally disable two unit tests.

This behavior change is fine, though, because users should never use
the set_result() and set_exception() methods directly.
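
A small sketch of the version difference mentioned above, using only the standard library. The helper name `set_result_twice` is made up for illustration; the lookup with getattr() is one possible way to stay importable on Python 3.7 and below, where `InvalidStateError` does not exist.

```python
import concurrent.futures

# InvalidStateError exists only on Python 3.8+; fall back to None elsewhere.
InvalidStateError = getattr(concurrent.futures, "InvalidStateError", None)


def set_result_twice():
    """Return the exception raised by a second set_result(), or None."""
    future = concurrent.futures.Future()
    future.set_result("first")
    try:
        future.set_result("second")
    except Exception as exc:
        return exc
    return None


error = set_result_twice()
if InvalidStateError is not None:
    # Python 3.8+: completing an already-done future is rejected.
    assert isinstance(error, InvalidStateError)
else:
    # Python 3.7 and below: the exception class is not available at all.
    assert not hasattr(concurrent.futures, "InvalidStateError")
```

A unit test that expects the error could be guarded by a check such as `sys.version_info >= (3, 8)`, which is presumably what conditionally disabling the two tests amounts to.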

plamut commented Apr 27, 2021

Status checks got stuck.

@plamut plamut added the kokoro:force-run Add this label to force Kokoro to re-run the tests. label Apr 27, 2021
@plamut plamut marked this pull request as ready for review April 27, 2021 10:35
@plamut plamut requested a review from a team as a code owner April 27, 2021 10:35
@plamut plamut added kokoro:force-run Add this label to force Kokoro to re-run the tests. and removed kokoro:force-run Add this label to force Kokoro to re-run the tests. labels Apr 28, 2021
@yoshi-kokoro yoshi-kokoro removed the kokoro:force-run Add this label to force Kokoro to re-run the tests. label Apr 28, 2021
@plamut plamut added kokoro:force-run Add this label to force Kokoro to re-run the tests. kokoro:run Add this label to force Kokoro to re-run the tests. labels Apr 30, 2021
@yoshi-kokoro yoshi-kokoro removed kokoro:run Add this label to force Kokoro to re-run the tests. kokoro:force-run Add this label to force Kokoro to re-run the tests. labels Apr 30, 2021
@jimfulton jimfulton self-requested a review May 13, 2021 18:43

@jimfulton jimfulton left a comment

This looks great. I love the code reduction!

I'm a little nervous about the subclassing.

I would minimize the number of overridden methods.

In StreamingPullFuture, I'd change the private attributes to use double- rather than single-underscore prefixes to get name mangling and greatly reduce the chance of a future conflict with the base class.

google/cloud/pubsub_v1/futures.py (outdated)
# Set the result and trigger the future.
self._result = result
self._trigger()
return super().set_result(result=result)

Why do we need this override?

self._completed.set()
for callback in self._callbacks:
    callback(self)
return super().set_exception(exception=exception)

Why do we need this override?

if err is None:
    return self._result
raise err
return super().result(timeout=timeout)

Why do we need this override?

plamut commented May 14, 2021

> In StreamingPullFuture, I'd change the private attributes to use double- rather than single-underscore prefixes to get name mangling and greatly reduce the chance of a future conflict with the base class.

This is a very good suggestion, a case where we really do want to assure there's no accidental overriding.
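
To illustrate why name mangling helps here, a minimal sketch with made-up classes: `Base` stands in for any base class that keeps its own private `_cancelled` attribute, and the two subclasses are hypothetical. They are not the real StreamingPullFuture.

```python
class Base:
    """Hypothetical base class that keeps its own private `_cancelled` flag."""

    def __init__(self):
        self._cancelled = False

    def base_cancelled(self):
        return self._cancelled


class SingleUnderscore(Base):
    def __init__(self):
        super().__init__()
        # Same attribute name as the base class: this overwrites its state.
        self._cancelled = "subclass value"


class DoubleUnderscore(Base):
    def __init__(self):
        super().__init__()
        # Mangled to `_DoubleUnderscore__cancelled`; the base flag is untouched.
        self.__cancelled = "subclass value"


clobbered = SingleUnderscore().base_cancelled()
preserved = DoubleUnderscore().base_cancelled()
mangled_names = [
    name for name in vars(DoubleUnderscore()) if name.endswith("__cancelled")
]
```

With the single underscore the base class silently sees the subclass value; with the double underscore both attributes coexist under different names.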

> Why do we need this override? (x3)

To override the base implementations' docstrings: we want to preserve the existing docstrings of the Pub/Sub futures' methods. Do you know if there's a more idiomatic way to do this than merely delegating the call to the implementation up the MRO chain?
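
For reference, the delegating pattern in question can be sketched as follows. The class name and docstring text are illustrative only, not the library's actual wording.

```python
import concurrent.futures


class Future(concurrent.futures.Future):
    """Hypothetical subclass that overrides a method only for its docstring."""

    def result(self, timeout=None):
        """Return the message ID, blocking until publishing completes.

        (Illustrative docstring; the behavior is entirely inherited.)
        """
        return super().result(timeout=timeout)


future = Future()
future.set_result("123")
value = future.result(timeout=1)
custom_doc = Future.result.__doc__
base_doc = concurrent.futures.Future.result.__doc__
```

The override adds no behavior of its own; its only effect is that help() and generated docs show the subclass docstring instead of the standard-library one.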

> Do we know of any clients that are passing this argument? (I see this package was one. :) )
>
> Do we plan to get rid of this in the near future?

The library only implements a thread-based batch, which is an internal default the publisher client uses. That batch class is the only thing that passed in the now-removed completed argument, and we are not aware of any 3rd parties that would considerably mangle the client code to change the publisher concurrency paradigm.

End users also do not instantiate the futures directly (and if they do, they violate the contract :) ).

No plans for removing the completed argument yet, but it could make sense to do so, since concurrent.futures.Future tracks its own state differently and doesn't need it.

@plamut plamut added the kokoro:force-run Add this label to force Kokoro to re-run the tests. label May 14, 2021
@yoshi-kokoro yoshi-kokoro removed the kokoro:force-run Add this label to force Kokoro to re-run the tests. label May 14, 2021
@jimfulton

> Why do we need this override? (x3)
>
> To override the base implementations' docstrings: we want to preserve the existing docstrings of the Pub/Sub futures' methods. Do you know if there's a more idiomatic way to do this than merely delegating the call to the implementation up the MRO chain?

IMO, it's not worth it.

The less overriding the better.

> Do we know of any clients that are passing this argument? (I see this package was one. :) )
> Do we plan to get rid of this in the near future?
>
> The library only implements a thread-based batch, which is an internal default the publisher client uses. That batch class is the only thing that passed in the now-removed completed argument, and we are not aware of any 3rd parties that would considerably mangle the client code to change the publisher concurrency paradigm.
>
> End users also do not instantiate the futures directly (and if they do, they violate the contract :) ).
>
> No plans for removing the completed argument yet, but it could make sense to do so, since concurrent.futures.Future tracks its own state differently and doesn't need it.

I would plan to remove it "soon".

If we're confident that no clients are calling this, then I'd remove it now.

Every line of code we own is a liability -- overridden inherited code from external (to us) dependencies doubly (or triply) so.

These are suggestions. My approval stands.

@jimfulton

I have nothing more to say. :)

plamut commented May 14, 2021

I'll reconsider actually removing the completed argument right now when I revisit this on Monday, since a loud error is probably better than silently changing the argument's effect to a no-op. Ditto for the other remark about overriding. Thanks for all the feedback!

@dpcollins-google

lgtm, shouldn't break pub/sub lite

@anguillanneuf

@plamut After this update is released, shall we update our publish/subscribe samples? Or add new samples to demonstrate how to work with these futures?

This parameter is unlikely to be used by any 3rd party code, but
even if it is, it's better to cause a loud error rather than silently
changing its effect to a no-op.

plamut commented May 15, 2021

On second thought, removing the completed parameter is actually better: it will cause a loud error for any code that uses it, instead of possibly introducing non-obvious bugs if its behavior were silently changed to a no-op. As for the overriding methods that preserve the existing docstrings and keep them under our control, I opted to keep them.
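
The trade-off can be sketched with two hypothetical classes (neither is the real Pub/Sub future): one keeps accepting `completed` but ignores it, the other drops the parameter so that stale callers fail immediately with a TypeError.

```python
import concurrent.futures


class SilentFuture(concurrent.futures.Future):
    """Hypothetical: still accepts `completed`, but silently ignores it."""

    def __init__(self, completed=None):
        super().__init__()


class StrictFuture(concurrent.futures.Future):
    """Hypothetical: the parameter is gone, so passing it is an error."""


def try_construct(cls):
    """Return the TypeError raised when passing `completed`, or None."""
    try:
        cls(completed=object())
    except TypeError as exc:
        return exc
    return None


silent_error = try_construct(SilentFuture)
strict_error = try_construct(StrictFuture)
```

Here `silent_error` is None, so a caller would never learn the argument no longer does anything, while `strict_error` is a TypeError raised at the call site.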

> After this update is released, shall we update our publish/subscribe samples? Or add new samples to demonstrate how to work with these futures?

@anguillanneuf The existing samples should not need any modifications, since that's actually one of the goals - futures should continue working as before.

On the other hand, it would make sense to demonstrate their new capability, i.e. compatibility with concurrent.futures.as_completed(). Maybe as a use case where somebody publishes multiple messages in a loop and then wants to block until all operations complete?

Something like the following:

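import concurrent.futures
import time

from google.cloud import pubsub_v1

# Placeholders: substitute your own project and topic IDs.
PROJECT_NAME = "my-project"
TOPIC_NAME = "my-topic"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_NAME, TOPIC_NAME)

future_to_msg = {}

for i in range(10):
    msg = f"message {i}".encode()
    # Even-numbered messages go to a nonexistent topic to provoke errors.
    suffix = "" if i % 2 else "x"
    future = publisher.publish(topic_path + suffix, msg)
    future_to_msg[future] = msg
    time.sleep(0.01)

# Futures are yielded in completion order, not in submission order.
for future in concurrent.futures.as_completed(future_to_msg):
    msg = future_to_msg[future].decode()
    try:
        future.result()  # Re-raises the publish error, if there was one.
    except Exception as exc:
        print(f"Error publishing message {msg}: {exc}")
    else:
        print(f"Message successfully published: {msg}")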

Sample output (with logging level set to CRITICAL to cut off background thread noise):

Message successfully published: message 5
Error publishing message message 0: 404 Resource not found (resource=topic-*****).
Error publishing message message 8: 404 Resource not found (resource=topic-*****).
Message successfully published: message 7
Message successfully published: message 9
Message successfully published: message 3
Message successfully published: message 1
Error publishing message message 6: 404 Resource not found (resource=topic-*****).
Error publishing message message 2: 404 Resource not found (resource=topic-*****).
Error publishing message message 4: 404 Resource not found (resource=topic-*****).
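
For trying the same pattern without a Pub/Sub project, here is a sketch that swaps the publisher for a hypothetical `fake_publish()` helper returning plain concurrent.futures.Future objects resolved from timer threads. It uses concurrent.futures.wait() rather than as_completed() to block until every operation has finished; that is an alternative from the same standard-library module, not something the sample above uses.

```python
import concurrent.futures
import threading


def fake_publish(index):
    """Hypothetical stand-in for publisher.publish(); resolves from a thread."""
    future = concurrent.futures.Future()

    def resolve():
        if index % 2:
            future.set_result(f"id-{index}")
        else:
            future.set_exception(RuntimeError(f"publish {index} failed"))

    # Resolve a little later, as a background publish would.
    threading.Timer(0.01 * index, resolve).start()
    return future


futures = [fake_publish(i) for i in range(4)]

# Block until every future has either a result or an exception.
done, not_done = concurrent.futures.wait(futures, timeout=5)

succeeded = sorted(f.result() for f in done if f.exception() is None)
failed = sorted(str(f.exception()) for f in done if f.exception() is not None)
```

wait() returns all futures at once as two sets, which suits "block until everything is done"; as_completed() is the better fit when each outcome should be handled as soon as it arrives.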

@plamut plamut added the kokoro:force-run Add this label to force Kokoro to re-run the tests. label May 15, 2021
@yoshi-kokoro yoshi-kokoro removed the kokoro:force-run Add this label to force Kokoro to re-run the tests. label May 15, 2021
@plamut plamut added the automerge Merge the pull request once unit tests and other checks pass. label May 15, 2021
@plamut plamut merged commit e29a2c0 into googleapis:master May 15, 2021
@gcf-merge-on-green gcf-merge-on-green bot removed the automerge Merge the pull request once unit tests and other checks pass. label May 15, 2021
@plamut plamut deleted the iss-368-b branch May 15, 2021 14:32
gcf-merge-on-green bot pushed a commit that referenced this pull request May 18, 2021