New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: make publish futures compatible with concurrent.futures.as_completed() #397
Conversation
@dpcollins-google Could this break Pub/Sub Lite in any way? Does the Lite client currently use any |
The futures implementation is adjusted to work well with the built-in function with the same name in `concurrent.futures` package.
|
If setting a result/exception on a concurrent.futures.Future object, an exception is raised only in Python3.8+, thus we conditionally disable two unit tests. This behavior change is fine, though, because users should never use the set_result() and set_exception() methods directly.
Status checks got stuck. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great. I love the code reduction!
I'm a little nervous about the subclassing.
I would minimize the number of overridden methods.
In StreamingPullFuture
, I'd change the private attributes to use a double- rather than a single-underscore prefixes to get name mangling and greatly reduce the chance of a future conflict with the base class.
# Set the result and trigger the future. | ||
self._result = result | ||
self._trigger() | ||
return super().set_result(result=result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this override?
self._completed.set() | ||
for callback in self._callbacks: | ||
callback(self) | ||
return super().set_exception(exception=exception) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this override?
if err is None: | ||
return self._result | ||
raise err | ||
return super().result(timeout=timeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this override?
This is a very good suggestion, a case where we really do want to assure there's no accidental overriding.
To override the base implementations' docstrings', we want to prereserve the existing docstrings of Pub/Sub futures' methods. Do you know if there's a more idiomatic way to do this other than merely delegating the call to the implementation up the MRO chain?
The library only implements a thread-based batch, which is an internal default the publisher client uses. That batch class is the only thing that passed in the now-removed End users also do not instantiate the futures directly (and if they do, they violate the contract :) ). No plans for removing the |
IMO. It's not worth it. The less overriding the better.
I would plan to remove it "soon". If we're confident that no clients are calling this, then I'd remove it now. Every line of code we own is a liability -- overridden inherited code from external (to us) dependencies doubly (or triply) so. These are suggestions. My approval stands. |
I have nothing more to say. :) |
I'll re-think about actually removing the |
lgtm, shouldn't break pub/sub lite |
@plamut After this update is released, shall we update our publish/subscribe samples? Or add new samples to demonstrate how to work with these futures? |
This parameter is unlikely to be used by any 3rd party code, but even if it is, it's better to cause a loud error rather than silently changing its effect to a no-op.
On a second thought, removing the
@anguillanneuf The existing samples should not need any modifications, since that's actually one of the goals - futures should continue working as before. On the other hand, it would make sense to demonstrate their new capability, i.e. compatibility with Something like the following: publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_NAME, TOPIC_NAME)
future_to_msg = {}
for i in range(10):
msg = f"message {i}".encode()
suffix = "" if i % 2 else "x"
future = publisher.publish(topic_path + suffix, msg)
future_to_msg[future] = msg
time.sleep(0.01)
for future in concurrent.futures.as_completed(future_to_msg):
msg = future_to_msg[future].decode()
try:
result = future.result()
except Exception as exc:
print(f"Error publishing message {msg}: {exc}")
else:
print(f"Message successfully published: {msg}") Sample output (with logging level set to CRITICAL to cut off background thread noise):
|
🤖 I have created a release \*beep\* \*boop\* --- ## [2.5.0](https://www.github.com/googleapis/python-pubsub/compare/v2.4.2...v2.5.0) (2021-05-18) ### Features * make publish futures compatible with concurrent.futures.as_completed() ([#397](https://www.github.com/googleapis/python-pubsub/issues/397)) ([e29a2c0](https://www.github.com/googleapis/python-pubsub/commit/e29a2c0ac6c5d2ebf2311646e552a02f184cfedc)) ### Bug Fixes * scheduler errors when executor in shutdown ([#399](https://www.github.com/googleapis/python-pubsub/issues/399)) ([39a83d3](https://www.github.com/googleapis/python-pubsub/commit/39a83d3eef196e88478ad8362201a2ab12e9f681)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Closes #368.
Possibly supersedes #374.
This is a different approach to the same feature request which does not rely on implementation details of
concurrent.futures.Future
(as discussed offline). Instead, the custom baseFuture
implementation is replaced with the one fromconcurrent.futures
package with only minor adjustments to fit specific needs of publisher/subscriber futures.The interface stays the same, and the
isinstance()
checks should keep working, asconcurrent.futures.Future
is injected is injected into the right place in MRO.PR checklist