diff --git a/google/cloud/pubsublite/__init__.py b/google/cloud/pubsublite/__init__.py index 0ba5d579..86db3807 100644 --- a/google/cloud/pubsublite/__init__.py +++ b/google/cloud/pubsublite/__init__.py @@ -62,6 +62,9 @@ from google.cloud.pubsublite_v1.types.admin import ListTopicSubscriptionsResponse from google.cloud.pubsublite_v1.types.admin import ListTopicsRequest from google.cloud.pubsublite_v1.types.admin import ListTopicsResponse +from google.cloud.pubsublite_v1.types.admin import OperationMetadata +from google.cloud.pubsublite_v1.types.admin import SeekSubscriptionRequest +from google.cloud.pubsublite_v1.types.admin import SeekSubscriptionResponse from google.cloud.pubsublite_v1.types.admin import TopicPartitions from google.cloud.pubsublite_v1.types.admin import UpdateSubscriptionRequest from google.cloud.pubsublite_v1.types.admin import UpdateTopicRequest @@ -70,6 +73,7 @@ from google.cloud.pubsublite_v1.types.common import PubSubMessage from google.cloud.pubsublite_v1.types.common import SequencedMessage from google.cloud.pubsublite_v1.types.common import Subscription +from google.cloud.pubsublite_v1.types.common import TimeTarget from google.cloud.pubsublite_v1.types.common import Topic from google.cloud.pubsublite_v1.types.cursor import CommitCursorRequest from google.cloud.pubsublite_v1.types.cursor import CommitCursorResponse @@ -148,6 +152,7 @@ "MessagePublishRequest", "MessagePublishResponse", "MessageResponse", + "OperationMetadata", "PartitionAssignment", "PartitionAssignmentAck", "PartitionAssignmentRequest", @@ -159,6 +164,8 @@ "PublishResponse", "PublisherServiceAsyncClient", "PublisherServiceClient", + "SeekSubscriptionRequest", + "SeekSubscriptionResponse", "SeekRequest", "SeekResponse", "SequencedCommitCursorRequest", @@ -171,6 +178,7 @@ "SubscriberServiceAsyncClient", "SubscriberServiceClient", "Subscription", + "TimeTarget", "Topic", "TopicPartitions", "TopicStatsServiceAsyncClient", diff --git a/google/cloud/pubsublite/admin_client.py b/google/cloud/pubsublite/admin_client.py index bc7622c8..7dd20491 100644 --- a/google/cloud/pubsublite/admin_client.py +++ b/google/cloud/pubsublite/admin_client.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional, List +from typing import Optional, List, Union from overrides import overrides from google.api_core.client_options import ClientOptions +from google.api_core.operation import Operation from google.auth.credentials import Credentials from google.protobuf.field_mask_pb2 import FieldMask @@ -31,6 +32,8 @@ LocationPath, TopicPath, BacklogLocation, + PublishTime, + EventTime, ) from google.cloud.pubsublite.types.paths import ReservationPath from google.cloud.pubsublite_v1 import ( @@ -129,6 +132,14 @@ def update_subscription( ) -> Subscription: return self._impl.update_subscription(subscription, update_mask) + @overrides + def seek_subscription( + self, + subscription_path: SubscriptionPath, + target: Union[BacklogLocation, PublishTime, EventTime], + ) -> Operation: + return self._impl.seek_subscription(subscription_path, target) + @overrides def delete_subscription(self, subscription_path: SubscriptionPath): return self._impl.delete_subscription(subscription_path) diff --git a/google/cloud/pubsublite/admin_client_interface.py b/google/cloud/pubsublite/admin_client_interface.py index c300351d..74f8f6b3 100644 --- a/google/cloud/pubsublite/admin_client_interface.py +++ b/google/cloud/pubsublite/admin_client_interface.py @@ -13,14 +13,17 @@ # limitations under the License. from abc import ABC, abstractmethod -from typing import List +from typing import List, Union +from google.api_core.operation import Operation from google.cloud.pubsublite.types import ( CloudRegion, TopicPath, LocationPath, SubscriptionPath, BacklogLocation, + PublishTime, + EventTime, ) from google.cloud.pubsublite.types.paths import ReservationPath from google.cloud.pubsublite_v1 import Topic, Subscription, Reservation @@ -88,6 +91,32 @@ def update_subscription( ) -> Subscription: """Update the masked fields of the provided subscription.""" + @abstractmethod + def seek_subscription( + self, + subscription_path: SubscriptionPath, + target: Union[BacklogLocation, PublishTime, EventTime], + ) -> Operation: + """Initiate an out-of-band seek for a subscription to a specified target. + + If an operation is returned, the seek has been registered and subscribers + will eventually receive messages from the seek target, as long as the + subscriber client supports out-of-band seeks. The seek operation will be + aborted for unsupported clients, or if it is superseded by a newer seek + invocation for the same subscription. + + To determine when subscribers react to the seek (or it has been aborted), + wait for the returned operation to complete. The operation will succeed and + complete once subscribers are receiving messages from the seek target for all + partitions of the topic. The operation will not complete until all + subscribers come online. + + Returns: + google.api_core.operation.Operation with: + result type: google.cloud.pubsublite.SeekSubscriptionResponse + metadata type: google.cloud.pubsublite.OperationMetadata + """ + @abstractmethod def delete_subscription(self, subscription_path: SubscriptionPath): """Delete a subscription and all associated messages.""" diff --git a/google/cloud/pubsublite/internal/wire/admin_client_impl.py b/google/cloud/pubsublite/internal/wire/admin_client_impl.py index 3dc2cdcb..be7152e9 100644 --- a/google/cloud/pubsublite/internal/wire/admin_client_impl.py +++ b/google/cloud/pubsublite/internal/wire/admin_client_impl.py @@ -12,9 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List +from typing import List, Union +from google.api_core.exceptions import InvalidArgument +from google.api_core.operation import Operation from google.protobuf.field_mask_pb2 import FieldMask +from google.protobuf.timestamp_pb2 import Timestamp from google.cloud.pubsublite.admin_client_interface import AdminClientInterface from google.cloud.pubsublite.types import ( @@ -23,6 +26,8 @@ LocationPath, TopicPath, BacklogLocation, + PublishTime, + EventTime, ) from google.cloud.pubsublite.types.paths import ReservationPath from google.cloud.pubsublite_v1 import ( @@ -31,6 +36,8 @@ AdminServiceClient, TopicPartitions, Reservation, + TimeTarget, + SeekSubscriptionRequest, ) @@ -105,6 +112,29 @@ def update_subscription( subscription=subscription, update_mask=update_mask ) + def seek_subscription( + self, + subscription_path: SubscriptionPath, + target: Union[BacklogLocation, PublishTime, EventTime], + ) -> Operation: + request = SeekSubscriptionRequest(name=str(subscription_path)) + if isinstance(target, PublishTime): + ts = Timestamp() + ts.FromDatetime(target.value) + request.time_target = TimeTarget(publish_time=ts) + elif isinstance(target, EventTime): + ts = Timestamp() + ts.FromDatetime(target.value) + request.time_target = TimeTarget(event_time=ts) + elif isinstance(target, BacklogLocation): + if target == BacklogLocation.END: + request.named_target = SeekSubscriptionRequest.NamedTarget.HEAD + else: + request.named_target = SeekSubscriptionRequest.NamedTarget.TAIL + else: + raise InvalidArgument("A valid seek target must be specified.") + return self._underlying.seek_subscription(request=request) + def delete_subscription(self, subscription_path: SubscriptionPath): self._underlying.delete_subscription(name=str(subscription_path)) diff --git a/google/cloud/pubsublite/types/__init__.py b/google/cloud/pubsublite/types/__init__.py index fbf5cc00..ef4d842c 100644 --- a/google/cloud/pubsublite/types/__init__.py +++ b/google/cloud/pubsublite/types/__init__.py @@ -18,7 +18,7 @@ from .paths import LocationPath, TopicPath, SubscriptionPath from .message_metadata import MessageMetadata from .flow_control_settings import FlowControlSettings, DISABLED_FLOW_CONTROL -from .backlog_location import BacklogLocation +from .backlog_location import BacklogLocation, PublishTime, EventTime __all__ = ( "CloudRegion", @@ -30,4 +30,6 @@ "SubscriptionPath", "TopicPath", "BacklogLocation", + "PublishTime", + "EventTime", ) diff --git a/google/cloud/pubsublite/types/backlog_location.py b/google/cloud/pubsublite/types/backlog_location.py index 42e64218..81a53e2c 100644 --- a/google/cloud/pubsublite/types/backlog_location.py +++ b/google/cloud/pubsublite/types/backlog_location.py @@ -13,6 +13,8 @@ # limitations under the License. import enum +from datetime import datetime +from typing import NamedTuple class BacklogLocation(enum.Enum): @@ -22,3 +24,11 @@ class BacklogLocation(enum.Enum): BEGINNING = 0 END = 1 + + +class PublishTime(NamedTuple): + value: datetime + + +class EventTime(NamedTuple): + value: datetime diff --git a/samples/snippets/quickstart_test.py b/samples/snippets/quickstart_test.py index 470ca32f..ffe0bd55 100644 --- a/samples/snippets/quickstart_test.py +++ b/samples/snippets/quickstart_test.py @@ -259,6 +259,16 @@ def test_subscriber_example(topic_path, subscription_path, capsys): assert f"Received {message}" in out +def test_seek_lite_subscription_example(capsys): + import seek_lite_subscription_example + + seek_lite_subscription_example.seek_lite_subscription( + PROJECT_NUMBER, CLOUD_REGION, ZONE_ID, SUBSCRIPTION_ID, "BEGINNING", False + ) + out, _ = capsys.readouterr() + assert "Seek operation" in out + + def test_delete_lite_subscription_example(subscription_path, capsys): import delete_lite_subscription_example diff --git a/samples/snippets/seek_lite_subscription_example.py b/samples/snippets/seek_lite_subscription_example.py new file mode 100644 index 00000000..f2ae7048 --- /dev/null +++ b/samples/snippets/seek_lite_subscription_example.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python + +# Copyright 2021 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This application demonstrates how to invoke an out-of-band seek for a +subscription with the Pub/Sub Lite API. +""" + +import argparse + + +def seek_lite_subscription(project_number, cloud_region, zone_id, subscription_id, target, wait_for_operation): + # [START pubsublite_seek_subscription] + from datetime import datetime + from google.api_core.exceptions import NotFound, GoogleAPICallError + from google.cloud.pubsublite import AdminClient + from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath, BacklogLocation, PublishTime + + # TODO(developer): + # project_number = 1122334455 + # cloud_region = "us-central1" + # zone_id = "a" + # subscription_id = "your-subscription-id" + # target = "BEGINNING" + # wait_for_operation = 1 + + cloud_region = CloudRegion(cloud_region) + location = CloudZone(cloud_region, zone_id) + subscription_path = SubscriptionPath(project_number, location, subscription_id) + + if target == "BEGINNING": + seek_target = BacklogLocation.BEGINNING + elif target == "END": + seek_target = BacklogLocation.END + else: + seek_target = PublishTime(datetime.strptime(target, "%Y-%m-%d %H:%M:%S")) + + client = AdminClient(cloud_region) + try: + seek_operation = client.seek_subscription(subscription_path, seek_target) + print(f"Seek operation: {seek_operation.operation.name}") + print(f"Metadata:\n{seek_operation.metadata}") + except NotFound: + print(f"{subscription_path} not found.") + + # Note: In order for the operation to complete, a subscriber must be + # receiving messages for the subscription. + if wait_for_operation: + print("Waiting for operation to complete...") + try: + seek_operation.result() + print(f"Operation completed. Metadata:\n{seek_operation.metadata}") + except GoogleAPICallError as e: + print(e) + # [END pubsublite_seek_subscription] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("project_number", help="Your Google Cloud Project Number") + parser.add_argument("cloud_region", help="Your Cloud Region, e.g. 'us-central1'") + parser.add_argument("zone_id", help="Your Zone ID, e.g. 'a'") + parser.add_argument("subscription_id", help="Your subscription ID") + parser.add_argument("--target", default="BEGINNING", help="Seek target, e.g. 'BEGINNING, 'END' or a timestamp") + parser.add_argument("--wait_for_operation", help="Wait for the seek operation to complete") + + args = parser.parse_args() + + seek_lite_subscription( + args.project_number, args.cloud_region, args.zone_id, args.subscription_id, args.target, args.wait_for_operation + )