diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py index 928984ae..3c4d9dfe 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py @@ -13,7 +13,7 @@ # limitations under the License. from concurrent.futures.thread import ThreadPoolExecutor -from typing import Union, Optional, Set +from typing import Union, Optional, Set, Callable, cast from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture @@ -61,12 +61,13 @@ def cancel_streaming_pull_future(fut: StreamingPullFuture): def subscribe( self, subscription: Union[SubscriptionPath, str], - callback: MessageCallback, + callback: Callable, # TODO(dpcollins): Change to MessageCallback, per_partition_flow_control_settings: FlowControlSettings, fixed_partitions: Optional[Set[Partition]] = None, ) -> StreamingPullFuture: if isinstance(subscription, str): subscription = SubscriptionPath.parse(subscription) + callback = cast(MessageCallback, callback) def create_and_open(): underlying = self._underlying_factory( diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py index 13705beb..12ecdae5 100644 --- a/google/cloud/pubsublite/cloudpubsub/subscriber_client.py +++ b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py @@ -13,7 +13,7 @@ # limitations under the License. from concurrent.futures.thread import ThreadPoolExecutor -from typing import Optional, Union, Set, AsyncIterator +from typing import Optional, Union, Set, AsyncIterator, Callable from google.api_core.client_options import ClientOptions from google.auth.credentials import Credentials @@ -33,7 +33,6 @@ from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import ( SubscriberClientInterface, - MessageCallback, AsyncSubscriberClientInterface, ) from google.cloud.pubsublite.internal.constructable_from_service_account import ( @@ -100,7 +99,7 @@ def __init__( def subscribe( self, subscription: Union[SubscriptionPath, str], - callback: MessageCallback, + callback: Callable, # TODO(dpcollins): Change to MessageCallback, per_partition_flow_control_settings: FlowControlSettings, fixed_partitions: Optional[Set[Partition]] = None, ) -> StreamingPullFuture: diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py b/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py index 8dd23506..e9e43113 100644 --- a/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py +++ b/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py @@ -81,7 +81,7 @@ class SubscriberClientInterface(ContextManager): def subscribe( self, subscription: Union[SubscriptionPath, str], - callback: MessageCallback, + callback: Callable, # TODO(dpcollins): Change to MessageCallback, per_partition_flow_control_settings: FlowControlSettings, fixed_partitions: Optional[Set[Partition]] = None, ) -> StreamingPullFuture: