diff --git a/google/cloud/pubsublite/__init__.py b/google/cloud/pubsublite/__init__.py index 0ba5d579..86db3807 100644 --- a/google/cloud/pubsublite/__init__.py +++ b/google/cloud/pubsublite/__init__.py @@ -62,6 +62,9 @@ from google.cloud.pubsublite_v1.types.admin import ListTopicSubscriptionsResponse from google.cloud.pubsublite_v1.types.admin import ListTopicsRequest from google.cloud.pubsublite_v1.types.admin import ListTopicsResponse +from google.cloud.pubsublite_v1.types.admin import OperationMetadata +from google.cloud.pubsublite_v1.types.admin import SeekSubscriptionRequest +from google.cloud.pubsublite_v1.types.admin import SeekSubscriptionResponse from google.cloud.pubsublite_v1.types.admin import TopicPartitions from google.cloud.pubsublite_v1.types.admin import UpdateSubscriptionRequest from google.cloud.pubsublite_v1.types.admin import UpdateTopicRequest @@ -70,6 +73,7 @@ from google.cloud.pubsublite_v1.types.common import PubSubMessage from google.cloud.pubsublite_v1.types.common import SequencedMessage from google.cloud.pubsublite_v1.types.common import Subscription +from google.cloud.pubsublite_v1.types.common import TimeTarget from google.cloud.pubsublite_v1.types.common import Topic from google.cloud.pubsublite_v1.types.cursor import CommitCursorRequest from google.cloud.pubsublite_v1.types.cursor import CommitCursorResponse @@ -148,6 +152,7 @@ "MessagePublishRequest", "MessagePublishResponse", "MessageResponse", + "OperationMetadata", "PartitionAssignment", "PartitionAssignmentAck", "PartitionAssignmentRequest", @@ -159,6 +164,8 @@ "PublishResponse", "PublisherServiceAsyncClient", "PublisherServiceClient", + "SeekSubscriptionRequest", + "SeekSubscriptionResponse", "SeekRequest", "SeekResponse", "SequencedCommitCursorRequest", @@ -171,6 +178,7 @@ "SubscriberServiceAsyncClient", "SubscriberServiceClient", "Subscription", + "TimeTarget", "Topic", "TopicPartitions", "TopicStatsServiceAsyncClient", diff --git a/google/cloud/pubsublite/admin_client.py b/google/cloud/pubsublite/admin_client.py index bc7622c8..7dd20491 100644 --- a/google/cloud/pubsublite/admin_client.py +++ b/google/cloud/pubsublite/admin_client.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional, List +from typing import Optional, List, Union from overrides import overrides from google.api_core.client_options import ClientOptions +from google.api_core.operation import Operation from google.auth.credentials import Credentials from google.protobuf.field_mask_pb2 import FieldMask @@ -31,6 +32,8 @@ LocationPath, TopicPath, BacklogLocation, + PublishTime, + EventTime, ) from google.cloud.pubsublite.types.paths import ReservationPath from google.cloud.pubsublite_v1 import ( @@ -129,6 +132,14 @@ def update_subscription( ) -> Subscription: return self._impl.update_subscription(subscription, update_mask) + @overrides + def seek_subscription( + self, + subscription_path: SubscriptionPath, + target: Union[BacklogLocation, PublishTime, EventTime], + ) -> Operation: + return self._impl.seek_subscription(subscription_path, target) + @overrides def delete_subscription(self, subscription_path: SubscriptionPath): return self._impl.delete_subscription(subscription_path) diff --git a/google/cloud/pubsublite/admin_client_interface.py b/google/cloud/pubsublite/admin_client_interface.py index c300351d..becf9ea7 100644 --- a/google/cloud/pubsublite/admin_client_interface.py +++ b/google/cloud/pubsublite/admin_client_interface.py @@ -13,14 +13,17 @@ # limitations under the License. from abc import ABC, abstractmethod -from typing import List +from typing import List, Union +from google.api_core.operation import Operation from google.cloud.pubsublite.types import ( CloudRegion, TopicPath, LocationPath, SubscriptionPath, BacklogLocation, + PublishTime, + EventTime, ) from google.cloud.pubsublite.types.paths import ReservationPath from google.cloud.pubsublite_v1 import Topic, Subscription, Reservation @@ -88,6 +91,24 @@ def update_subscription( ) -> Subscription: """Update the masked fields of the provided subscription.""" + @abstractmethod + def seek_subscription( + self, + subscription_path: SubscriptionPath, + target: Union[BacklogLocation, PublishTime, EventTime], + ) -> Operation: + """Initiate an out-of-band seek for a subscription to a specified target. + + The seek target may be timestamps or named positions within the message + backlog See https://cloud.google.com/pubsub/lite/docs/seek for more + information. + + Returns: + google.api_core.operation.Operation with: + result type: google.cloud.pubsublite.SeekSubscriptionResponse + metadata type: google.cloud.pubsublite.OperationMetadata + """ + @abstractmethod def delete_subscription(self, subscription_path: SubscriptionPath): """Delete a subscription and all associated messages.""" diff --git a/google/cloud/pubsublite/internal/wire/admin_client_impl.py b/google/cloud/pubsublite/internal/wire/admin_client_impl.py index 3dc2cdcb..a5a9366c 100644 --- a/google/cloud/pubsublite/internal/wire/admin_client_impl.py +++ b/google/cloud/pubsublite/internal/wire/admin_client_impl.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List +from typing import List, Union +from google.api_core.exceptions import InvalidArgument +from google.api_core.operation import Operation from google.protobuf.field_mask_pb2 import FieldMask from google.cloud.pubsublite.admin_client_interface import AdminClientInterface @@ -23,6 +25,8 @@ LocationPath, TopicPath, BacklogLocation, + PublishTime, + EventTime, ) from google.cloud.pubsublite.types.paths import ReservationPath from google.cloud.pubsublite_v1 import ( @@ -31,6 +35,8 @@ AdminServiceClient, TopicPartitions, Reservation, + TimeTarget, + SeekSubscriptionRequest, ) @@ -105,6 +111,25 @@ def update_subscription( subscription=subscription, update_mask=update_mask ) + def seek_subscription( + self, + subscription_path: SubscriptionPath, + target: Union[BacklogLocation, PublishTime, EventTime], + ) -> Operation: + request = SeekSubscriptionRequest(name=str(subscription_path)) + if isinstance(target, PublishTime): + request.time_target = TimeTarget(publish_time=target.value) + elif isinstance(target, EventTime): + request.time_target = TimeTarget(event_time=target.value) + elif isinstance(target, BacklogLocation): + if target == BacklogLocation.END: + request.named_target = SeekSubscriptionRequest.NamedTarget.HEAD + else: + request.named_target = SeekSubscriptionRequest.NamedTarget.TAIL + else: + raise InvalidArgument("A valid seek target must be specified.") + return self._underlying.seek_subscription(request=request) + def delete_subscription(self, subscription_path: SubscriptionPath): self._underlying.delete_subscription(name=str(subscription_path)) diff --git a/google/cloud/pubsublite/types/__init__.py b/google/cloud/pubsublite/types/__init__.py index fbf5cc00..ef4d842c 100644 --- a/google/cloud/pubsublite/types/__init__.py +++ b/google/cloud/pubsublite/types/__init__.py @@ -18,7 +18,7 @@ from .paths import LocationPath, TopicPath, SubscriptionPath from .message_metadata import MessageMetadata from .flow_control_settings import FlowControlSettings, DISABLED_FLOW_CONTROL -from .backlog_location import BacklogLocation +from .backlog_location import BacklogLocation, PublishTime, EventTime __all__ = ( "CloudRegion", @@ -30,4 +30,6 @@ "SubscriptionPath", "TopicPath", "BacklogLocation", + "PublishTime", + "EventTime", ) diff --git a/google/cloud/pubsublite/types/backlog_location.py b/google/cloud/pubsublite/types/backlog_location.py index 42e64218..dbd0c603 100644 --- a/google/cloud/pubsublite/types/backlog_location.py +++ b/google/cloud/pubsublite/types/backlog_location.py @@ -13,6 +13,8 @@ # limitations under the License. import enum +from datetime import datetime +from typing import NamedTuple class BacklogLocation(enum.Enum): @@ -22,3 +24,15 @@ class BacklogLocation(enum.Enum): BEGINNING = 0 END = 1 + + +class PublishTime(NamedTuple): + """The publish timestamp of a message.""" + + value: datetime + + +class EventTime(NamedTuple): + """A user-defined event timestamp of a message.""" + + value: datetime diff --git a/samples/snippets/quickstart_test.py b/samples/snippets/quickstart_test.py index 470ca32f..a47d54ed 100644 --- a/samples/snippets/quickstart_test.py +++ b/samples/snippets/quickstart_test.py @@ -22,6 +22,7 @@ from google.api_core.exceptions import NotFound from google.cloud.pubsublite import AdminClient from google.cloud.pubsublite.types import ( + BacklogLocation, CloudRegion, CloudZone, SubscriptionPath, @@ -259,6 +260,16 @@ def test_subscriber_example(topic_path, subscription_path, capsys): assert f"Received {message}" in out +def test_seek_lite_subscription_example(capsys): + import seek_lite_subscription_example + + seek_lite_subscription_example.seek_lite_subscription( + PROJECT_NUMBER, CLOUD_REGION, ZONE_ID, SUBSCRIPTION_ID, BacklogLocation.BEGINNING, False + ) + out, _ = capsys.readouterr() + assert "Seek operation" in out + + def test_delete_lite_subscription_example(subscription_path, capsys): import delete_lite_subscription_example diff --git a/samples/snippets/seek_lite_subscription_example.py b/samples/snippets/seek_lite_subscription_example.py new file mode 100644 index 00000000..0bfe0acd --- /dev/null +++ b/samples/snippets/seek_lite_subscription_example.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python + +# Copyright 2021 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This application demonstrates how to initiate an out-of-band seek for a +subscription with the Pub/Sub Lite API. For more information, see the +documentation at https://cloud.google.com/pubsub/lite/docs/seek. +""" + +import argparse + + +def seek_lite_subscription(project_number, cloud_region, zone_id, subscription_id, seek_target, wait_for_operation): + # [START pubsublite_seek_subscription] + from google.api_core.exceptions import NotFound + from google.cloud.pubsublite import AdminClient + from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath + + # TODO(developer): + # project_number = 1122334455 + # cloud_region = "us-central1" + # zone_id = "a" + # subscription_id = "your-subscription-id" + # seek_target = BacklogLocation.BEGINNING + # wait_for_operation = False + + # Possible values for seek_target: + # - BacklogLocation.BEGINNING: replays from the beginning of all retained + # messages. + # - BacklogLocation.END: skips past all current published messages. + # - PublishTime(): delivers messages with publish time greater + # than or equal to the specified timestamp. + # - EventTime(): seeks to the first message with event time + # greater than or equal to the specified timestamp. + + # Waiting for the seek operation to complete is optional. It indicates when + # subscribers for all partitions are receiving messages from the seek + # target. If subscribers are offline, the operation will complete once they + # are online. + + cloud_region = CloudRegion(cloud_region) + location = CloudZone(cloud_region, zone_id) + subscription_path = SubscriptionPath(project_number, location, subscription_id) + + client = AdminClient(cloud_region) + try: + # Initiate an out-of-band seek for a subscription to the specified + # target. If an operation is returned, the seek has been successfully + # registered and will eventually propagate to subscribers. + seek_operation = client.seek_subscription(subscription_path, seek_target) + print(f"Seek operation: {seek_operation.operation.name}") + except NotFound: + print(f"{subscription_path} not found.") + return + + if wait_for_operation: + print("Waiting for operation to complete...") + seek_operation.result() + print(f"Operation completed. Metadata:\n{seek_operation.metadata}") + # [END pubsublite_seek_subscription] + + +if __name__ == "__main__": + from datetime import datetime + from google.cloud.pubsublite.types import BacklogLocation, PublishTime + + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("project_number", help="Your Google Cloud Project Number") + parser.add_argument("cloud_region", help="Your Cloud Region, e.g. 'us-central1'") + parser.add_argument("zone_id", help="Your Zone ID, e.g. 'a'") + parser.add_argument("subscription_id", help="Your subscription ID") + parser.add_argument("--target", default="BEGINNING", help="Seek target, e.g. 'BEGINNING, 'END' or a timestamp") + parser.add_argument("--wait_for_operation", help="Wait for the seek operation to complete") + + args = parser.parse_args() + + if args.target == "BEGINNING": + seek_target = BacklogLocation.BEGINNING + elif args.target == "END": + seek_target = BacklogLocation.END + else: + seek_target = PublishTime(datetime.strptime(args.target, "%Y-%m-%d %H:%M:%S")) + + seek_lite_subscription( + args.project_number, args.cloud_region, args.zone_id, + args.subscription_id, seek_target, args.wait_for_operation + )