diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py index 249463c6..4bfe9cf5 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py @@ -47,7 +47,7 @@ def publish( data: bytes, ordering_key: str = "", **attrs: Mapping[str, str] - ) -> Future: + ) -> "Future[str]": if isinstance(topic, str): topic = TopicPath.parse(topic) publisher = self._multiplexer.get_or_create( diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py index 3c4d9dfe..928984ae 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py @@ -13,7 +13,7 @@ # limitations under the License. from concurrent.futures.thread import ThreadPoolExecutor -from typing import Union, Optional, Set, Callable, cast +from typing import Union, Optional, Set from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture @@ -61,13 +61,12 @@ def cancel_streaming_pull_future(fut: StreamingPullFuture): def subscribe( self, subscription: Union[SubscriptionPath, str], - callback: Callable, # TODO(dpcollins): Change to MessageCallback, + callback: MessageCallback, per_partition_flow_control_settings: FlowControlSettings, fixed_partitions: Optional[Set[Partition]] = None, ) -> StreamingPullFuture: if isinstance(subscription, str): subscription = SubscriptionPath.parse(subscription) - callback = cast(MessageCallback, callback) def create_and_open(): underlying = self._underlying_factory( diff --git a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py index 2e66a380..7f25e77d 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py @@ -35,7 +35,7 @@ def __init__(self, underlying: AsyncSinglePublisher): def publish( self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str] - ) -> Future: + ) -> "Future[str]": return self._managed_loop.submit( self._underlying.publish(data=data, ordering_key=ordering_key, **attrs) ) diff --git a/google/cloud/pubsublite/cloudpubsub/publisher_client.py b/google/cloud/pubsublite/cloudpubsub/publisher_client.py index 7a984ff9..9ddd4da5 100644 --- a/google/cloud/pubsublite/cloudpubsub/publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/publisher_client.py @@ -94,7 +94,7 @@ def publish( data: bytes, ordering_key: str = "", **attrs: Mapping[str, str] - ) -> Future: + ) -> "Future[str]": self._require_stared.require_started() return self._impl.publish( topic=topic, data=data, ordering_key=ordering_key, **attrs diff --git a/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py b/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py index 6499b407..883a9518 100644 --- a/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py +++ b/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py @@ -67,7 +67,7 @@ def publish( data: bytes, ordering_key: str = "", **attrs: Mapping[str, str], - ) -> Future: + ) -> "Future[str]": """ Publish a message. diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py index 12ecdae5..eda098fb 100644 --- a/google/cloud/pubsublite/cloudpubsub/subscriber_client.py +++ b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py @@ -13,7 +13,7 @@ # limitations under the License. from concurrent.futures.thread import ThreadPoolExecutor -from typing import Optional, Union, Set, AsyncIterator, Callable +from typing import Optional, Union, Set, AsyncIterator from google.api_core.client_options import ClientOptions from google.auth.credentials import Credentials @@ -34,6 +34,7 @@ from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import ( SubscriberClientInterface, AsyncSubscriberClientInterface, + MessageCallback, ) from google.cloud.pubsublite.internal.constructable_from_service_account import ( ConstructableFromServiceAccount, @@ -99,7 +100,7 @@ def __init__( def subscribe( self, subscription: Union[SubscriptionPath, str], - callback: Callable, # TODO(dpcollins): Change to MessageCallback, + callback: MessageCallback, per_partition_flow_control_settings: FlowControlSettings, fixed_partitions: Optional[Set[Partition]] = None, ) -> StreamingPullFuture: diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py b/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py index e9e43113..8dd23506 100644 --- a/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py +++ b/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py @@ -81,7 +81,7 @@ class SubscriberClientInterface(ContextManager): def subscribe( self, subscription: Union[SubscriptionPath, str], - callback: Callable, # TODO(dpcollins): Change to MessageCallback, + callback: MessageCallback, per_partition_flow_control_settings: FlowControlSettings, fixed_partitions: Optional[Set[Partition]] = None, ) -> StreamingPullFuture: diff --git a/setup.py b/setup.py index ff939886..14a34701 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ dependencies = [ "google-cloud-pubsub >= 2.1.0, <3.0.0dev", - "overrides>=2.0.0, <7.0.0", + "overrides>=6.0.1, <7.0.0", ] setuptools.setup(