Skip to content

Commit

Permalink
feat: Support seek subscription in AdminClient
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Jul 2, 2021
1 parent 6c84e24 commit def076e
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 4 deletions.
8 changes: 8 additions & 0 deletions google/cloud/pubsublite/__init__.py
Expand Up @@ -62,6 +62,9 @@
from google.cloud.pubsublite_v1.types.admin import ListTopicSubscriptionsResponse
from google.cloud.pubsublite_v1.types.admin import ListTopicsRequest
from google.cloud.pubsublite_v1.types.admin import ListTopicsResponse
from google.cloud.pubsublite_v1.types.admin import OperationMetadata
from google.cloud.pubsublite_v1.types.admin import SeekSubscriptionRequest
from google.cloud.pubsublite_v1.types.admin import SeekSubscriptionResponse
from google.cloud.pubsublite_v1.types.admin import TopicPartitions
from google.cloud.pubsublite_v1.types.admin import UpdateSubscriptionRequest
from google.cloud.pubsublite_v1.types.admin import UpdateTopicRequest
Expand All @@ -70,6 +73,7 @@
from google.cloud.pubsublite_v1.types.common import PubSubMessage
from google.cloud.pubsublite_v1.types.common import SequencedMessage
from google.cloud.pubsublite_v1.types.common import Subscription
from google.cloud.pubsublite_v1.types.common import TimeTarget
from google.cloud.pubsublite_v1.types.common import Topic
from google.cloud.pubsublite_v1.types.cursor import CommitCursorRequest
from google.cloud.pubsublite_v1.types.cursor import CommitCursorResponse
Expand Down Expand Up @@ -148,6 +152,7 @@
"MessagePublishRequest",
"MessagePublishResponse",
"MessageResponse",
"OperationMetadata",
"PartitionAssignment",
"PartitionAssignmentAck",
"PartitionAssignmentRequest",
Expand All @@ -159,6 +164,8 @@
"PublishResponse",
"PublisherServiceAsyncClient",
"PublisherServiceClient",
"SeekSubscriptionRequest",
"SeekSubscriptionResponse",
"SeekRequest",
"SeekResponse",
"SequencedCommitCursorRequest",
Expand All @@ -171,6 +178,7 @@
"SubscriberServiceAsyncClient",
"SubscriberServiceClient",
"Subscription",
"TimeTarget",
"Topic",
"TopicPartitions",
"TopicStatsServiceAsyncClient",
Expand Down
13 changes: 12 additions & 1 deletion google/cloud/pubsublite/admin_client.py
Expand Up @@ -12,10 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional, List
from typing import Optional, List, Union

from overrides import overrides
from google.api_core.client_options import ClientOptions
from google.api_core.operation import Operation
from google.auth.credentials import Credentials
from google.protobuf.field_mask_pb2 import FieldMask

Expand All @@ -31,6 +32,8 @@
LocationPath,
TopicPath,
BacklogLocation,
PublishTime,
EventTime,
)
from google.cloud.pubsublite.types.paths import ReservationPath
from google.cloud.pubsublite_v1 import (
Expand Down Expand Up @@ -129,6 +132,14 @@ def update_subscription(
) -> Subscription:
return self._impl.update_subscription(subscription, update_mask)

@overrides
def seek_subscription(
self,
subscription_path: SubscriptionPath,
target: Union[BacklogLocation, PublishTime, EventTime],
) -> Operation:
return self._impl.seek_subscription(subscription_path, target)

@overrides
def delete_subscription(self, subscription_path: SubscriptionPath):
return self._impl.delete_subscription(subscription_path)
Expand Down
31 changes: 30 additions & 1 deletion google/cloud/pubsublite/admin_client_interface.py
Expand Up @@ -13,14 +13,17 @@
# limitations under the License.

from abc import ABC, abstractmethod
from typing import List
from typing import List, Union

from google.api_core.operation import Operation
from google.cloud.pubsublite.types import (
CloudRegion,
TopicPath,
LocationPath,
SubscriptionPath,
BacklogLocation,
PublishTime,
EventTime,
)
from google.cloud.pubsublite.types.paths import ReservationPath
from google.cloud.pubsublite_v1 import Topic, Subscription, Reservation
Expand Down Expand Up @@ -88,6 +91,32 @@ def update_subscription(
) -> Subscription:
"""Update the masked fields of the provided subscription."""

@abstractmethod
def seek_subscription(
self,
subscription_path: SubscriptionPath,
target: Union[BacklogLocation, PublishTime, EventTime],
) -> Operation:
"""Initiate an out-of-band seek for a subscription to a specified target.
If an operation is returned, the seek has been registered and subscribers
will eventually receive messages from the seek target, as long as the
subscriber client supports out-of-band seeks. The seek operation will be
aborted for unsupported clients, or if it is superseded by a newer seek
invocation for the same subscription.
To determine when subscribers react to the seek (or it has been aborted),
wait for the returned operation to complete. The operation will succeed and
complete once subscribers are receiving messages from the seek target for all
partitions of the topic. The operation will not complete until all
subscribers come online.
Returns:
google.api_core.operation.Operation with:
result type: google.cloud.pubsublite.SeekSubscriptionResponse
metadata type: google.cloud.pubsublite.OperationMetadata
"""

@abstractmethod
def delete_subscription(self, subscription_path: SubscriptionPath):
"""Delete a subscription and all associated messages."""
Expand Down
32 changes: 31 additions & 1 deletion google/cloud/pubsublite/internal/wire/admin_client_impl.py
Expand Up @@ -12,9 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List
from typing import List, Union

from google.api_core.exceptions import InvalidArgument
from google.api_core.operation import Operation
from google.protobuf.field_mask_pb2 import FieldMask
from google.protobuf.timestamp_pb2 import Timestamp

from google.cloud.pubsublite.admin_client_interface import AdminClientInterface
from google.cloud.pubsublite.types import (
Expand All @@ -23,6 +26,8 @@
LocationPath,
TopicPath,
BacklogLocation,
PublishTime,
EventTime,
)
from google.cloud.pubsublite.types.paths import ReservationPath
from google.cloud.pubsublite_v1 import (
Expand All @@ -31,6 +36,8 @@
AdminServiceClient,
TopicPartitions,
Reservation,
TimeTarget,
SeekSubscriptionRequest,
)


Expand Down Expand Up @@ -105,6 +112,29 @@ def update_subscription(
subscription=subscription, update_mask=update_mask
)

def seek_subscription(
self,
subscription_path: SubscriptionPath,
target: Union[BacklogLocation, PublishTime, EventTime],
) -> Operation:
request = SeekSubscriptionRequest(name=str(subscription_path))
if isinstance(target, PublishTime):
ts = Timestamp()
ts.FromDatetime(target.value)
request.time_target = TimeTarget(publish_time=ts)
elif isinstance(target, EventTime):
ts = Timestamp()
ts.FromDatetime(target.value)
request.time_target = TimeTarget(event_time=ts)
elif isinstance(target, BacklogLocation):
if target == BacklogLocation.END:
request.named_target = SeekSubscriptionRequest.NamedTarget.HEAD
else:
request.named_target = SeekSubscriptionRequest.NamedTarget.TAIL
else:
raise InvalidArgument("A valid seek target must be specified.")
return self._underlying.seek_subscription(request=request)

def delete_subscription(self, subscription_path: SubscriptionPath):
self._underlying.delete_subscription(name=str(subscription_path))

Expand Down
4 changes: 3 additions & 1 deletion google/cloud/pubsublite/types/__init__.py
Expand Up @@ -18,7 +18,7 @@
from .paths import LocationPath, TopicPath, SubscriptionPath
from .message_metadata import MessageMetadata
from .flow_control_settings import FlowControlSettings, DISABLED_FLOW_CONTROL
from .backlog_location import BacklogLocation
from .backlog_location import BacklogLocation, PublishTime, EventTime

__all__ = (
"CloudRegion",
Expand All @@ -30,4 +30,6 @@
"SubscriptionPath",
"TopicPath",
"BacklogLocation",
"PublishTime",
"EventTime",
)
10 changes: 10 additions & 0 deletions google/cloud/pubsublite/types/backlog_location.py
Expand Up @@ -13,6 +13,8 @@
# limitations under the License.

import enum
from datetime import datetime
from typing import NamedTuple


class BacklogLocation(enum.Enum):
Expand All @@ -22,3 +24,11 @@ class BacklogLocation(enum.Enum):

BEGINNING = 0
END = 1


class PublishTime(NamedTuple):
value: datetime


class EventTime(NamedTuple):
value: datetime
10 changes: 10 additions & 0 deletions samples/snippets/quickstart_test.py
Expand Up @@ -259,6 +259,16 @@ def test_subscriber_example(topic_path, subscription_path, capsys):
assert f"Received {message}" in out


def test_seek_lite_subscription_example(capsys):
import seek_lite_subscription_example

seek_lite_subscription_example.seek_lite_subscription(
PROJECT_NUMBER, CLOUD_REGION, ZONE_ID, SUBSCRIPTION_ID, "BEGINNING", False
)
out, _ = capsys.readouterr()
assert "Seek operation" in out


def test_delete_lite_subscription_example(subscription_path, capsys):
import delete_lite_subscription_example

Expand Down
85 changes: 85 additions & 0 deletions samples/snippets/seek_lite_subscription_example.py
@@ -0,0 +1,85 @@
#!/usr/bin/env python

# Copyright 2021 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This application demonstrates how to invoke an out-of-band seek for a
subscription with the Pub/Sub Lite API.
"""

import argparse


def seek_lite_subscription(project_number, cloud_region, zone_id, subscription_id, target, wait_for_operation):
# [START pubsublite_seek_subscription]
from datetime import datetime
from google.api_core.exceptions import NotFound, GoogleAPICallError
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath, BacklogLocation, PublishTime

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# target = "BEGINNING"
# wait_for_operation = 1

cloud_region = CloudRegion(cloud_region)
location = CloudZone(cloud_region, zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)

if target == "BEGINNING":
seek_target = BacklogLocation.BEGINNING
elif target == "END":
seek_target = BacklogLocation.END
else:
seek_target = PublishTime(datetime.strptime(target, "%Y-%m-%d %H:%M:%S"))

client = AdminClient(cloud_region)
try:
seek_operation = client.seek_subscription(subscription_path, seek_target)
print(f"Seek operation: {seek_operation.operation.name}")
print(f"Metadata:\n{seek_operation.metadata}")
except NotFound:
print(f"{subscription_path} not found.")

# Note: In order for the operation to complete, a subscriber must be
# receiving messages for the subscription.
if wait_for_operation:
print("Waiting for operation to complete...")
try:
seek_operation.result()
print(f"Operation completed. Metadata:\n{seek_operation.metadata}")
except GoogleAPICallError as e:
print(e)
# [END pubsublite_seek_subscription]


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("project_number", help="Your Google Cloud Project Number")
parser.add_argument("cloud_region", help="Your Cloud Region, e.g. 'us-central1'")
parser.add_argument("zone_id", help="Your Zone ID, e.g. 'a'")
parser.add_argument("subscription_id", help="Your subscription ID")
parser.add_argument("--target", default="BEGINNING", help="Seek target, e.g. 'BEGINNING, 'END' or a timestamp")
parser.add_argument("--wait_for_operation", help="Wait for the seek operation to complete")

args = parser.parse_args()

seek_lite_subscription(
args.project_number, args.cloud_region, args.zone_id, args.subscription_id, args.target, args.wait_for_operation
)

0 comments on commit def076e

Please sign in to comment.