From 03951beb45e65b771210a244c097b4fddd191eff Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Wed, 24 Feb 2021 17:15:27 -0500 Subject: [PATCH 01/12] feat: adding ability to create subscriptions at head --- google/cloud/pubsublite/admin_client.py | 5 +++++ google/cloud/pubsublite/admin_client_interface.py | 13 +++++++++++++ .../pubsublite/internal/wire/admin_client_impl.py | 6 ++++++ 3 files changed, 24 insertions(+) diff --git a/google/cloud/pubsublite/admin_client.py b/google/cloud/pubsublite/admin_client.py index d5e82197..f83e449e 100644 --- a/google/cloud/pubsublite/admin_client.py +++ b/google/cloud/pubsublite/admin_client.py @@ -103,6 +103,11 @@ def list_topic_subscriptions(self, topic_path: TopicPath): @overrides def create_subscription(self, subscription: Subscription) -> Subscription: return self._impl.create_subscription(subscription) + @overrides + def create_subscription( + self, subscription: Subscription, location: AdminClientInterface.CursorLocation + ) -> Subscription: + return self._impl.create_subscription(subscription, location) @overrides def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: diff --git a/google/cloud/pubsublite/admin_client_interface.py b/google/cloud/pubsublite/admin_client_interface.py index 2c59a102..68b590bc 100644 --- a/google/cloud/pubsublite/admin_client_interface.py +++ b/google/cloud/pubsublite/admin_client_interface.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import proto + from abc import ABC, abstractmethod from typing import List @@ -30,6 +32,11 @@ class AdminClientInterface(ABC): An admin client for Pub/Sub Lite. Only operates on a single region. """ + class CursorLocation(proto.Enum): + """The location of the cursor on a subscription.""" + BEGINNING = 0 + END = 1 + @abstractmethod def region(self) -> CloudRegion: """The region this client is for.""" @@ -66,6 +73,12 @@ def list_topic_subscriptions(self, topic_path: TopicPath): def create_subscription(self, subscription: Subscription) -> Subscription: """Create a subscription, returns the created subscription.""" + @abstractmethod + def create_subscription( + self, subscription: Subscription, location: CursorLocation + ) -> Subscription: + """Create a subscription at the given location, returns the created subscription.""" + @abstractmethod def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: """Get the subscription object from the server.""" diff --git a/google/cloud/pubsublite/internal/wire/admin_client_impl.py b/google/cloud/pubsublite/internal/wire/admin_client_impl.py index 234916b9..9ecf4186 100644 --- a/google/cloud/pubsublite/internal/wire/admin_client_impl.py +++ b/google/cloud/pubsublite/internal/wire/admin_client_impl.py @@ -73,11 +73,17 @@ def list_topic_subscriptions(self, topic_path: TopicPath): return [SubscriptionPath.parse(x) for x in subscription_strings] def create_subscription(self, subscription: Subscription) -> Subscription: + return create_subscription(subscription, AdminClientInterface.CursorLocation.BEGINNING) + + def create_subscription( + self, subscription: Subscription, location: AdminClientInterface.CursorLocation + ) -> Subscription: path = SubscriptionPath.parse(subscription.name) return self._underlying.create_subscription( parent=str(path.to_location_path()), subscription=subscription, subscription_id=path.name, + #skip_backlog=(location == AdminClientInterface.CursorLocation.END), ) def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: From 94d0fed233bb7c406cfa241e97ae7fda64b55f13 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Wed, 24 Feb 2021 17:38:38 -0500 Subject: [PATCH 02/12] fix: lint errors --- google/cloud/pubsublite/admin_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/google/cloud/pubsublite/admin_client.py b/google/cloud/pubsublite/admin_client.py index f83e449e..d22e8f50 100644 --- a/google/cloud/pubsublite/admin_client.py +++ b/google/cloud/pubsublite/admin_client.py @@ -103,6 +103,7 @@ def list_topic_subscriptions(self, topic_path: TopicPath): @overrides def create_subscription(self, subscription: Subscription) -> Subscription: return self._impl.create_subscription(subscription) + @overrides def create_subscription( self, subscription: Subscription, location: AdminClientInterface.CursorLocation From 730a278358ad7d31c1c195ddbfba711de8182621 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Thu, 4 Mar 2021 12:30:34 -0500 Subject: [PATCH 03/12] fix: remove absl dependency --- google/cloud/pubsublite/internal/wire/assigner_impl.py | 6 ++++-- google/cloud/pubsublite/internal/wire/committer_impl.py | 7 +++++-- google/cloud/pubsublite/internal/wire/pubsub_context.py | 7 +++++-- .../pubsublite/internal/wire/single_partition_publisher.py | 6 ++++-- setup.py | 1 - testing/constraints-3.6.txt | 3 +-- 6 files changed, 19 insertions(+), 11 deletions(-) diff --git a/google/cloud/pubsublite/internal/wire/assigner_impl.py b/google/cloud/pubsublite/internal/wire/assigner_impl.py index 13db5b89..b4ce786e 100644 --- a/google/cloud/pubsublite/internal/wire/assigner_impl.py +++ b/google/cloud/pubsublite/internal/wire/assigner_impl.py @@ -15,7 +15,7 @@ import asyncio from typing import Optional, Set -from absl import logging +import logging from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_errors from google.cloud.pubsublite.internal.wire.assigner import Assigner @@ -36,6 +36,8 @@ PartitionAssignmentAck, ) +_LOGGER = logging.getLogger(__name__) + # Maximum bytes per batch at 3.5 MiB to avoid GRPC limit of 4 MiB _MAX_BYTES = int(3.5 * 1024 * 1024) @@ -120,7 +122,7 @@ async def get_assignment(self) -> Set[Partition]: self._outstanding_assignment = False except GoogleAPICallError as e: # If there is a failure to ack, keep going. The stream likely restarted. - logging.debug( + _LOGGER.debug( f"Assignment ack attempt failed due to stream failure: {e}" ) return await self._connection.await_unless_failed(self._new_assignment.get()) diff --git a/google/cloud/pubsublite/internal/wire/committer_impl.py b/google/cloud/pubsublite/internal/wire/committer_impl.py index 115845fe..df7dab18 100644 --- a/google/cloud/pubsublite/internal/wire/committer_impl.py +++ b/google/cloud/pubsublite/internal/wire/committer_impl.py @@ -15,7 +15,7 @@ import asyncio from typing import Optional, List, Iterable -from absl import logging +import logging from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_errors from google.cloud.pubsublite.internal.wire.committer import Committer @@ -41,6 +41,9 @@ from google.cloud.pubsublite.internal.wire.work_item import WorkItem +_LOGGER = logging.getLogger(__name__) + + class CommitterImpl( Committer, ConnectionReinitializer[ @@ -149,7 +152,7 @@ async def _flush(self): try: await self._connection.write(req) except GoogleAPICallError as e: - logging.debug(f"Failed commit on stream: {e}") + _LOGGER.debug(f"Failed commit on stream: {e}") self._fail_if_retrying_failed() async def commit(self, cursor: Cursor) -> None: diff --git a/google/cloud/pubsublite/internal/wire/pubsub_context.py b/google/cloud/pubsublite/internal/wire/pubsub_context.py index 032f5fc7..5cbad6f6 100644 --- a/google/cloud/pubsublite/internal/wire/pubsub_context.py +++ b/google/cloud/pubsublite/internal/wire/pubsub_context.py @@ -15,11 +15,14 @@ from base64 import b64encode from typing import Mapping, Optional, NamedTuple -from absl import logging +import logging import pkg_resources from google.protobuf import struct_pb2 +_LOGGER = logging.getLogger(__name__) + + class _Semver(NamedTuple): major: int minor: int @@ -29,7 +32,7 @@ def _version() -> _Semver: version = pkg_resources.get_distribution("google-cloud-pubsublite").version splits = version.split(".") if len(splits) != 3: - logging.info(f"Failed to extract semver from {version}.") + _LOGGER.info(f"Failed to extract semver from {version}.") return _Semver(0, 0) return _Semver(int(splits[0]), int(splits[1])) diff --git a/google/cloud/pubsublite/internal/wire/single_partition_publisher.py b/google/cloud/pubsublite/internal/wire/single_partition_publisher.py index 411d9b13..43b04127 100644 --- a/google/cloud/pubsublite/internal/wire/single_partition_publisher.py +++ b/google/cloud/pubsublite/internal/wire/single_partition_publisher.py @@ -15,7 +15,7 @@ import asyncio from typing import Optional, List, Iterable -from absl import logging +import logging from google.cloud.pubsub_v1.types import BatchSettings from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_errors @@ -43,6 +43,8 @@ ) from google.cloud.pubsublite.internal.wire.work_item import WorkItem +_LOGGER = logging.getLogger(__name__) + # Maximum bytes per batch at 3.5 MiB to avoid GRPC limit of 4 MiB _MAX_BYTES = int(3.5 * 1024 * 1024) @@ -156,7 +158,7 @@ async def _flush(self): try: await self._connection.write(aggregate) except GoogleAPICallError as e: - logging.debug(f"Failed publish on stream: {e}") + _LOGGER.debug(f"Failed publish on stream: {e}") self._fail_if_retrying_failed() async def publish(self, message: PubSubMessage) -> MessageMetadata: diff --git a/setup.py b/setup.py index 53bd470e..b638ab6b 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,6 @@ dependencies = [ "google-api-core >= 1.22.0", - "absl-py >= 0.9.0", "proto-plus >= 0.4.0", "google-cloud-pubsub >= 2.1.0", "grpcio", diff --git a/testing/constraints-3.6.txt b/testing/constraints-3.6.txt index 5886f65d..8bf36590 100644 --- a/testing/constraints-3.6.txt +++ b/testing/constraints-3.6.txt @@ -6,9 +6,8 @@ # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev", # Then this file should have foo==1.14.0 google-api-core==1.22.0 -absl-py==0.9.0 proto-plus==0.4.0 google-cloud-pubsub==2.1.0 grpcio==100000.0.0 setuptools==100000.0.0 -overrides==100000.0.0 \ No newline at end of file +overrides==100000.0.0 From cfad1e02eeba2155cd68010d7e5bb4453fa2055c Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Tue, 16 Mar 2021 11:08:25 -0400 Subject: [PATCH 04/12] fix: lint --- google/cloud/pubsublite/admin_client.py | 7 +++--- .../pubsublite/admin_client_interface.py | 14 ++++-------- .../internal/wire/admin_client_impl.py | 9 ++++---- google/cloud/pubsublite/types/__init__.py | 2 ++ .../cloud/pubsublite/types/offset_location.py | 22 +++++++++++++++++++ 5 files changed, 37 insertions(+), 17 deletions(-) create mode 100644 google/cloud/pubsublite/types/offset_location.py diff --git a/google/cloud/pubsublite/admin_client.py b/google/cloud/pubsublite/admin_client.py index d22e8f50..dab25131 100644 --- a/google/cloud/pubsublite/admin_client.py +++ b/google/cloud/pubsublite/admin_client.py @@ -30,6 +30,7 @@ SubscriptionPath, LocationPath, TopicPath, + OffsetLocation, ) from google.cloud.pubsublite_v1 import AdminServiceClient, Subscription, Topic @@ -105,10 +106,10 @@ def create_subscription(self, subscription: Subscription) -> Subscription: return self._impl.create_subscription(subscription) @overrides - def create_subscription( - self, subscription: Subscription, location: AdminClientInterface.CursorLocation + def create_subscription_at_offset( + self, subscription: Subscription, starting_offset: OffsetLocation ) -> Subscription: - return self._impl.create_subscription(subscription, location) + return self._impl.create_subscription(subscription, starting_offset) @overrides def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: diff --git a/google/cloud/pubsublite/admin_client_interface.py b/google/cloud/pubsublite/admin_client_interface.py index 68b590bc..50bd7d6e 100644 --- a/google/cloud/pubsublite/admin_client_interface.py +++ b/google/cloud/pubsublite/admin_client_interface.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import proto - from abc import ABC, abstractmethod from typing import List @@ -22,6 +20,7 @@ TopicPath, LocationPath, SubscriptionPath, + OffsetLocation, ) from google.cloud.pubsublite_v1 import Topic, Subscription from google.protobuf.field_mask_pb2 import FieldMask @@ -32,11 +31,6 @@ class AdminClientInterface(ABC): An admin client for Pub/Sub Lite. Only operates on a single region. """ - class CursorLocation(proto.Enum): - """The location of the cursor on a subscription.""" - BEGINNING = 0 - END = 1 - @abstractmethod def region(self) -> CloudRegion: """The region this client is for.""" @@ -74,10 +68,10 @@ def create_subscription(self, subscription: Subscription) -> Subscription: """Create a subscription, returns the created subscription.""" @abstractmethod - def create_subscription( - self, subscription: Subscription, location: CursorLocation + def create_subscription_at_offset( + self, subscription: Subscription, starting_offset: OffsetLocation ) -> Subscription: - """Create a subscription at the given location, returns the created subscription.""" + """Create a subscription at the given starting offset, returns the created subscription.""" @abstractmethod def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: diff --git a/google/cloud/pubsublite/internal/wire/admin_client_impl.py b/google/cloud/pubsublite/internal/wire/admin_client_impl.py index 9ecf4186..def502e5 100644 --- a/google/cloud/pubsublite/internal/wire/admin_client_impl.py +++ b/google/cloud/pubsublite/internal/wire/admin_client_impl.py @@ -22,6 +22,7 @@ SubscriptionPath, LocationPath, TopicPath, + OffsetLocation, ) from google.cloud.pubsublite_v1 import ( Subscription, @@ -73,17 +74,17 @@ def list_topic_subscriptions(self, topic_path: TopicPath): return [SubscriptionPath.parse(x) for x in subscription_strings] def create_subscription(self, subscription: Subscription) -> Subscription: - return create_subscription(subscription, AdminClientInterface.CursorLocation.BEGINNING) + return self.create_subscription_at_offset(subscription, OffsetLocation.END) - def create_subscription( - self, subscription: Subscription, location: AdminClientInterface.CursorLocation + def create_subscription_at_offset( + self, subscription: Subscription, starting_offset: OffsetLocation ) -> Subscription: path = SubscriptionPath.parse(subscription.name) return self._underlying.create_subscription( parent=str(path.to_location_path()), subscription=subscription, subscription_id=path.name, - #skip_backlog=(location == AdminClientInterface.CursorLocation.END), + skip_backlog=(starting_offset == OffsetLocation.END), ) def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: diff --git a/google/cloud/pubsublite/types/__init__.py b/google/cloud/pubsublite/types/__init__.py index 6304fd3c..cfb662ef 100644 --- a/google/cloud/pubsublite/types/__init__.py +++ b/google/cloud/pubsublite/types/__init__.py @@ -18,6 +18,7 @@ from .paths import LocationPath, TopicPath, SubscriptionPath from .message_metadata import MessageMetadata from .flow_control_settings import FlowControlSettings, DISABLED_FLOW_CONTROL +from .offset_location import OffsetLocation __all__ = ( "CloudRegion", @@ -28,4 +29,5 @@ "MessageMetadata", "SubscriptionPath", "TopicPath", + "OffsetLocation", ) diff --git a/google/cloud/pubsublite/types/offset_location.py b/google/cloud/pubsublite/types/offset_location.py new file mode 100644 index 00000000..bf145e21 --- /dev/null +++ b/google/cloud/pubsublite/types/offset_location.py @@ -0,0 +1,22 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import enum + + +class OffsetLocation(enum.Enum): + """The location of an offset with respect to the message backlog.""" + + BEGINNING = 0 + END = 1 From 02694ad3f47f90cf9f528606170a1c87a7ab684b Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Tue, 16 Mar 2021 15:04:09 -0400 Subject: [PATCH 05/12] feat: use default keyword args --- google/cloud/pubsublite/admin_client.py | 10 ++++------ google/cloud/pubsublite/admin_client_interface.py | 14 +++++++------- .../pubsublite/internal/wire/admin_client_impl.py | 9 ++++----- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/google/cloud/pubsublite/admin_client.py b/google/cloud/pubsublite/admin_client.py index dab25131..bd235083 100644 --- a/google/cloud/pubsublite/admin_client.py +++ b/google/cloud/pubsublite/admin_client.py @@ -102,12 +102,10 @@ def list_topic_subscriptions(self, topic_path: TopicPath): return self._impl.list_topic_subscriptions(topic_path) @overrides - def create_subscription(self, subscription: Subscription) -> Subscription: - return self._impl.create_subscription(subscription) - - @overrides - def create_subscription_at_offset( - self, subscription: Subscription, starting_offset: OffsetLocation + def create_subscription( + self, + subscription: Subscription, + starting_offset: OffsetLocation = OffsetLocation.END, ) -> Subscription: return self._impl.create_subscription(subscription, starting_offset) diff --git a/google/cloud/pubsublite/admin_client_interface.py b/google/cloud/pubsublite/admin_client_interface.py index 50bd7d6e..82a430ed 100644 --- a/google/cloud/pubsublite/admin_client_interface.py +++ b/google/cloud/pubsublite/admin_client_interface.py @@ -64,14 +64,14 @@ def list_topic_subscriptions(self, topic_path: TopicPath): """List the subscriptions that exist for a given topic.""" @abstractmethod - def create_subscription(self, subscription: Subscription) -> Subscription: - """Create a subscription, returns the created subscription.""" - - @abstractmethod - def create_subscription_at_offset( - self, subscription: Subscription, starting_offset: OffsetLocation + def create_subscription( + self, + subscription: Subscription, + starting_offset: OffsetLocation = OffsetLocation.END, ) -> Subscription: - """Create a subscription at the given starting offset, returns the created subscription.""" + """Create a subscription, returns the created subscription. By default + a subscription will only receive messages published after the + subscription was created.""" @abstractmethod def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: diff --git a/google/cloud/pubsublite/internal/wire/admin_client_impl.py b/google/cloud/pubsublite/internal/wire/admin_client_impl.py index def502e5..cf3166a3 100644 --- a/google/cloud/pubsublite/internal/wire/admin_client_impl.py +++ b/google/cloud/pubsublite/internal/wire/admin_client_impl.py @@ -73,11 +73,10 @@ def list_topic_subscriptions(self, topic_path: TopicPath): ] return [SubscriptionPath.parse(x) for x in subscription_strings] - def create_subscription(self, subscription: Subscription) -> Subscription: - return self.create_subscription_at_offset(subscription, OffsetLocation.END) - - def create_subscription_at_offset( - self, subscription: Subscription, starting_offset: OffsetLocation + def create_subscription( + self, + subscription: Subscription, + starting_offset: OffsetLocation = OffsetLocation.END, ) -> Subscription: path = SubscriptionPath.parse(subscription.name) return self._underlying.create_subscription( From abf1a4f13b865f9e07ccf921ae6fcf5a6fcdea71 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Wed, 17 Mar 2021 10:59:07 -0400 Subject: [PATCH 06/12] fix: rename offset location to backlog location --- google/cloud/pubsublite/admin_client.py | 4 ++-- google/cloud/pubsublite/admin_client_interface.py | 4 ++-- google/cloud/pubsublite/internal/wire/admin_client_impl.py | 6 +++--- google/cloud/pubsublite/types/__init__.py | 4 ++-- .../types/{offset_location.py => backlog_location.py} | 6 ++++-- 5 files changed, 13 insertions(+), 11 deletions(-) rename google/cloud/pubsublite/types/{offset_location.py => backlog_location.py} (69%) diff --git a/google/cloud/pubsublite/admin_client.py b/google/cloud/pubsublite/admin_client.py index bd235083..7d7a8dd5 100644 --- a/google/cloud/pubsublite/admin_client.py +++ b/google/cloud/pubsublite/admin_client.py @@ -30,7 +30,7 @@ SubscriptionPath, LocationPath, TopicPath, - OffsetLocation, + BacklogLocation, ) from google.cloud.pubsublite_v1 import AdminServiceClient, Subscription, Topic @@ -105,7 +105,7 @@ def list_topic_subscriptions(self, topic_path: TopicPath): def create_subscription( self, subscription: Subscription, - starting_offset: OffsetLocation = OffsetLocation.END, + starting_offset: BacklogLocation = BacklogLocation.END, ) -> Subscription: return self._impl.create_subscription(subscription, starting_offset) diff --git a/google/cloud/pubsublite/admin_client_interface.py b/google/cloud/pubsublite/admin_client_interface.py index 82a430ed..38b90ba3 100644 --- a/google/cloud/pubsublite/admin_client_interface.py +++ b/google/cloud/pubsublite/admin_client_interface.py @@ -20,7 +20,7 @@ TopicPath, LocationPath, SubscriptionPath, - OffsetLocation, + BacklogLocation, ) from google.cloud.pubsublite_v1 import Topic, Subscription from google.protobuf.field_mask_pb2 import FieldMask @@ -67,7 +67,7 @@ def list_topic_subscriptions(self, topic_path: TopicPath): def create_subscription( self, subscription: Subscription, - starting_offset: OffsetLocation = OffsetLocation.END, + starting_offset: BacklogLocation = BacklogLocation.END, ) -> Subscription: """Create a subscription, returns the created subscription. By default a subscription will only receive messages published after the diff --git a/google/cloud/pubsublite/internal/wire/admin_client_impl.py b/google/cloud/pubsublite/internal/wire/admin_client_impl.py index cf3166a3..00cb6e42 100644 --- a/google/cloud/pubsublite/internal/wire/admin_client_impl.py +++ b/google/cloud/pubsublite/internal/wire/admin_client_impl.py @@ -22,7 +22,7 @@ SubscriptionPath, LocationPath, TopicPath, - OffsetLocation, + BacklogLocation, ) from google.cloud.pubsublite_v1 import ( Subscription, @@ -76,14 +76,14 @@ def list_topic_subscriptions(self, topic_path: TopicPath): def create_subscription( self, subscription: Subscription, - starting_offset: OffsetLocation = OffsetLocation.END, + starting_offset: BacklogLocation = BacklogLocation.END, ) -> Subscription: path = SubscriptionPath.parse(subscription.name) return self._underlying.create_subscription( parent=str(path.to_location_path()), subscription=subscription, subscription_id=path.name, - skip_backlog=(starting_offset == OffsetLocation.END), + skip_backlog=(starting_offset == BacklogLocation.END), ) def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: diff --git a/google/cloud/pubsublite/types/__init__.py b/google/cloud/pubsublite/types/__init__.py index cfb662ef..fbf5cc00 100644 --- a/google/cloud/pubsublite/types/__init__.py +++ b/google/cloud/pubsublite/types/__init__.py @@ -18,7 +18,7 @@ from .paths import LocationPath, TopicPath, SubscriptionPath from .message_metadata import MessageMetadata from .flow_control_settings import FlowControlSettings, DISABLED_FLOW_CONTROL -from .offset_location import OffsetLocation +from .backlog_location import BacklogLocation __all__ = ( "CloudRegion", @@ -29,5 +29,5 @@ "MessageMetadata", "SubscriptionPath", "TopicPath", - "OffsetLocation", + "BacklogLocation", ) diff --git a/google/cloud/pubsublite/types/offset_location.py b/google/cloud/pubsublite/types/backlog_location.py similarity index 69% rename from google/cloud/pubsublite/types/offset_location.py rename to google/cloud/pubsublite/types/backlog_location.py index bf145e21..42e64218 100644 --- a/google/cloud/pubsublite/types/offset_location.py +++ b/google/cloud/pubsublite/types/backlog_location.py @@ -15,8 +15,10 @@ import enum -class OffsetLocation(enum.Enum): - """The location of an offset with respect to the message backlog.""" +class BacklogLocation(enum.Enum): + """A location with respect to the message backlog. BEGINNING refers to the + location of the oldest retained message. END refers to the location past + all currently published messages, skipping the entire message backlog.""" BEGINNING = 0 END = 1 From c7279ad67b29caff4ec0d1bd9a979986a28bcffa Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Thu, 18 Mar 2021 16:20:35 -0400 Subject: [PATCH 07/12] fix: broken samples --- .../pubsublite/internal/wire/admin_client_impl.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/google/cloud/pubsublite/internal/wire/admin_client_impl.py b/google/cloud/pubsublite/internal/wire/admin_client_impl.py index 00cb6e42..ab5acba6 100644 --- a/google/cloud/pubsublite/internal/wire/admin_client_impl.py +++ b/google/cloud/pubsublite/internal/wire/admin_client_impl.py @@ -80,10 +80,12 @@ def create_subscription( ) -> Subscription: path = SubscriptionPath.parse(subscription.name) return self._underlying.create_subscription( - parent=str(path.to_location_path()), - subscription=subscription, - subscription_id=path.name, - skip_backlog=(starting_offset == BacklogLocation.END), + request={ + "parent": str(path.to_location_path()), + "subscription": subscription, + "subscription_id": path.name, + "skip_backlog": (starting_offset == BacklogLocation.END), + } ) def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: From 700fb7d582c67696d8075bf6bb88333a778bd567 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Wed, 21 Apr 2021 12:21:00 -0400 Subject: [PATCH 08/12] fix: do not crash if pubsublite distribution can not be found when extracting semver --- google/cloud/pubsublite/internal/wire/pubsub_context.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/google/cloud/pubsublite/internal/wire/pubsub_context.py b/google/cloud/pubsublite/internal/wire/pubsub_context.py index 5cbad6f6..96740187 100644 --- a/google/cloud/pubsublite/internal/wire/pubsub_context.py +++ b/google/cloud/pubsublite/internal/wire/pubsub_context.py @@ -29,7 +29,13 @@ class _Semver(NamedTuple): def _version() -> _Semver: - version = pkg_resources.get_distribution("google-cloud-pubsublite").version + try: + version = pkg_resources.get_distribution("google-cloud-pubsublite").version + except pkg_resources.DistributionNotFound: + _LOGGER.info( + "Failed to extract the google-cloud-pubsublite semver version. DistributionNotFound." + ) + return _Semver(0, 0) splits = version.split(".") if len(splits) != 3: _LOGGER.info(f"Failed to extract semver from {version}.") From bdee7eb18f8272c589a7521403e63780ae5b74b9 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Mon, 26 Apr 2021 17:58:19 -0400 Subject: [PATCH 09/12] fix: properly shutdown event loop when failing to initialize publisher --- .../internal/multiplexed_publisher_client.py | 11 ++++++++--- .../pubsublite/cloudpubsub/internal/publisher_impl.py | 7 ++++++- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py index 4bfe9cf5..9d2520ee 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py @@ -50,9 +50,14 @@ def publish( ) -> "Future[str]": if isinstance(topic, str): topic = TopicPath.parse(topic) - publisher = self._multiplexer.get_or_create( - topic, lambda: self._publisher_factory(topic).__enter__() - ) + try: + publisher = self._multiplexer.get_or_create( + topic, lambda: self._publisher_factory(topic).__enter__() + ) + except GoogleAPICallError as e: + failed = Future() + failed.set_exception(e) + return failed future = publisher.publish(data=data, ordering_key=ordering_key, **attrs) future.add_done_callback( lambda fut: self._on_future_completion(topic, publisher, fut) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py index 7f25e77d..84b5d57c 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py @@ -15,6 +15,7 @@ from concurrent.futures import Future from typing import Mapping +from google.api_core.exceptions import GoogleAPICallError from google.cloud.pubsublite.cloudpubsub.internal.managed_event_loop import ( ManagedEventLoop, ) @@ -42,7 +43,11 @@ def publish( def __enter__(self): self._managed_loop.__enter__() - self._managed_loop.submit(self._underlying.__aenter__()).result() + try: + self._managed_loop.submit(self._underlying.__aenter__()).result() + except GoogleAPICallError: + self._managed_loop.__exit__(None, None, None) + raise return self def __exit__(self, __exc_type, __exc_value, __traceback): From df63818e09d9a1573b07eda0b08a269c2b98ce46 Mon Sep 17 00:00:00 2001 From: Hannah Rogers Date: Wed, 5 May 2021 17:02:59 -0400 Subject: [PATCH 10/12] fix: ensure proper shutdown on failure --- .../internal/multiplexed_publisher_client.py | 10 +++++++++- .../pubsublite/cloudpubsub/internal/publisher_impl.py | 6 +----- .../wire/partition_count_watching_publisher.py | 5 +++-- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py index 9d2520ee..55806ef4 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py @@ -52,7 +52,7 @@ def publish( topic = TopicPath.parse(topic) try: publisher = self._multiplexer.get_or_create( - topic, lambda: self._publisher_factory(topic).__enter__() + topic, lambda: self._create_and_start_publisher(topic) ) except GoogleAPICallError as e: failed = Future() @@ -64,6 +64,14 @@ def publish( ) return future + def _create_and_start_publisher(self, topic: Union[TopicPath, str]): + publisher = self._publisher_factory(topic) + try: + publisher.__enter__() + except GoogleAPICallError: + publisher.__exit__(None, None, None) + raise + def _on_future_completion( self, topic: TopicPath, publisher: SinglePublisher, future: "Future[str]" ): diff --git a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py index 84b5d57c..4a0dfdf8 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py @@ -43,11 +43,7 @@ def publish( def __enter__(self): self._managed_loop.__enter__() - try: - self._managed_loop.submit(self._underlying.__aenter__()).result() - except GoogleAPICallError: - self._managed_loop.__exit__(None, None, None) - raise + self._managed_loop.submit(self._underlying.__aenter__()).result() return self def __exit__(self, __exc_type, __exc_value, __traceback): diff --git a/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py b/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py index ba3a962e..cf3aecf8 100644 --- a/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py +++ b/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py @@ -56,8 +56,9 @@ async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): - self._partition_count_poller.cancel() - await wait_ignore_cancelled(self._partition_count_poller) + if hasattr(self, '_partition_count_poller'): + self._partition_count_poller.cancel() + await wait_ignore_cancelled(self._partition_count_poller) await self._watcher.__aexit__(exc_type, exc_val, exc_tb) for publisher in self._publishers.values(): await publisher.__aexit__(exc_type, exc_val, exc_tb) From 1e564345a40934f215ffdc8d1994865a12b931c1 Mon Sep 17 00:00:00 2001 From: Hannah Rogers Date: Wed, 5 May 2021 17:04:58 -0400 Subject: [PATCH 11/12] fix: remove unused dep --- google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py | 1 - 1 file changed, 1 deletion(-) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py index 7e8fd826..2e66a380 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py @@ -15,7 +15,6 @@ from concurrent.futures import Future from typing import Mapping -from google.api_core.exceptions import GoogleAPICallError from google.cloud.pubsublite.cloudpubsub.internal.managed_event_loop import ( ManagedEventLoop, ) From 2af6cbbd324d9c5515e4bd68c119f1516ebf8c39 Mon Sep 17 00:00:00 2001 From: Hannah Rogers Date: Wed, 12 May 2021 21:23:31 -0400 Subject: [PATCH 12/12] fix: adding tests and requested changes --- .../internal/multiplexed_publisher_client.py | 6 +- .../partition_count_watching_publisher.py | 5 +- log.txt | 0 .../multiplexed_publisher_client_test.py | 78 +++++++++++++++++++ ...partition_count_watching_publisher_test.py | 4 +- 5 files changed, 86 insertions(+), 7 deletions(-) create mode 100644 log.txt create mode 100644 tests/unit/pubsublite/cloudpubsub/internal/multiplexed_publisher_client_test.py diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py index 798e4e83..d6d58fde 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py @@ -67,10 +67,10 @@ def publish( def _create_and_start_publisher(self, topic: Union[TopicPath, str]): publisher = self._publisher_factory(topic) try: - publisher.__enter__() + return publisher.__enter__() except GoogleAPICallError: - publisher.__exit__(None, None, None) - raise + publisher.__exit__(None, None, None) + raise def _on_future_completion( self, topic: TopicPath, publisher: SinglePublisher, future: "Future[str]" diff --git a/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py b/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py index cf3aecf8..b625a0dc 100644 --- a/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py +++ b/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py @@ -42,6 +42,7 @@ def __init__( self._publisher_factory = publisher_factory self._policy_factory = policy_factory self._watcher = watcher + self._partition_count_poller = None async def __aenter__(self): try: @@ -56,10 +57,10 @@ async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): - if hasattr(self, '_partition_count_poller'): + if self._partition_count_poller is not None: self._partition_count_poller.cancel() await wait_ignore_cancelled(self._partition_count_poller) - await self._watcher.__aexit__(exc_type, exc_val, exc_tb) + await self._watcher.__aexit__(exc_type, exc_val, exc_tb) for publisher in self._publishers.values(): await publisher.__aexit__(exc_type, exc_val, exc_tb) diff --git a/log.txt b/log.txt new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_publisher_client_test.py b/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_publisher_client_test.py new file mode 100644 index 00000000..0b226257 --- /dev/null +++ b/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_publisher_client_test.py @@ -0,0 +1,78 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from asynctest.mock import MagicMock +import pytest + +from google.cloud.pubsublite.cloudpubsub.internal.multiplexed_publisher_client import ( + MultiplexedPublisherClient, +) +from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( + SinglePublisher, +) +from google.cloud.pubsublite.types import TopicPath +from google.api_core.exceptions import GoogleAPICallError + + +@pytest.fixture() +def topic1(): + return TopicPath.parse("projects/1/locations/us-central1-a/topics/topic1") + + +@pytest.fixture() +def topic2(): + return TopicPath.parse("projects/1/locations/us-central1-a/topics/topic2") + + +@pytest.fixture() +def topic1_publisher(): + topic1_publisher = MagicMock(spec=SinglePublisher) + return topic1_publisher + + +@pytest.fixture() +def topic2_publisher(): + topic2_publisher = MagicMock(spec=SinglePublisher) + return topic2_publisher + + +@pytest.fixture() +def multiplexed_publisher(topic1, topic1_publisher, topic2_publisher): + return MultiplexedPublisherClient( + lambda topic: topic1_publisher if topic == topic1 else topic2_publisher + ) + + +def test_multiplexed_publish( + topic1, topic2, topic1_publisher, topic2_publisher, multiplexed_publisher +): + topic1_publisher.__enter__.return_value = topic1_publisher + topic2_publisher.__enter__.return_value = topic2_publisher + with multiplexed_publisher: + multiplexed_publisher.publish(topic1, data=b"abc") + topic1_publisher.__enter__.assert_called_once() + topic1_publisher.publish.assert_called_once_with(data=b"abc", ordering_key="") + multiplexed_publisher.publish(topic2, data=b"abc") + topic2_publisher.__enter__.assert_called_once() + topic2_publisher.publish.assert_called_once_with(data=b"abc", ordering_key="") + topic1_publisher.__exit__.assert_called_once() + topic2_publisher.__exit__.assert_called_once() + + +def test_publisher_init_failure(topic1, topic1_publisher, multiplexed_publisher): + topic1_publisher.__enter__.side_effect = GoogleAPICallError("error") + with multiplexed_publisher: + future = multiplexed_publisher.publish(topic1, data=b"abc") + with pytest.raises(GoogleAPICallError): + future.result() diff --git a/tests/unit/pubsublite/internal/wire/partition_count_watching_publisher_test.py b/tests/unit/pubsublite/internal/wire/partition_count_watching_publisher_test.py index a31f12fa..ec9f6f4e 100644 --- a/tests/unit/pubsublite/internal/wire/partition_count_watching_publisher_test.py +++ b/tests/unit/pubsublite/internal/wire/partition_count_watching_publisher_test.py @@ -66,10 +66,10 @@ async def test_init(mock_watcher, publisher): async def test_failed_init(mock_watcher, publisher): mock_watcher.get_partition_count.side_effect = GoogleAPICallError("error") with pytest.raises(GoogleAPICallError): - async with publisher: - pass + await publisher.__aenter__() mock_watcher.__aenter__.assert_called_once() mock_watcher.__aexit__.assert_called_once() + await publisher.__aexit__(None, None, None) async def test_simple_publish(mock_publishers, mock_policies, mock_watcher, publisher):