Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support seek subscription in AdminClient #176

Merged
merged 10 commits into from Aug 10, 2021
8 changes: 8 additions & 0 deletions google/cloud/pubsublite/__init__.py
Expand Up @@ -62,6 +62,9 @@
from google.cloud.pubsublite_v1.types.admin import ListTopicSubscriptionsResponse
from google.cloud.pubsublite_v1.types.admin import ListTopicsRequest
from google.cloud.pubsublite_v1.types.admin import ListTopicsResponse
from google.cloud.pubsublite_v1.types.admin import OperationMetadata
from google.cloud.pubsublite_v1.types.admin import SeekSubscriptionRequest
from google.cloud.pubsublite_v1.types.admin import SeekSubscriptionResponse
from google.cloud.pubsublite_v1.types.admin import TopicPartitions
from google.cloud.pubsublite_v1.types.admin import UpdateSubscriptionRequest
from google.cloud.pubsublite_v1.types.admin import UpdateTopicRequest
Expand All @@ -70,6 +73,7 @@
from google.cloud.pubsublite_v1.types.common import PubSubMessage
from google.cloud.pubsublite_v1.types.common import SequencedMessage
from google.cloud.pubsublite_v1.types.common import Subscription
from google.cloud.pubsublite_v1.types.common import TimeTarget
from google.cloud.pubsublite_v1.types.common import Topic
from google.cloud.pubsublite_v1.types.cursor import CommitCursorRequest
from google.cloud.pubsublite_v1.types.cursor import CommitCursorResponse
Expand Down Expand Up @@ -148,6 +152,7 @@
"MessagePublishRequest",
"MessagePublishResponse",
"MessageResponse",
"OperationMetadata",
"PartitionAssignment",
"PartitionAssignmentAck",
"PartitionAssignmentRequest",
Expand All @@ -159,6 +164,8 @@
"PublishResponse",
"PublisherServiceAsyncClient",
"PublisherServiceClient",
"SeekSubscriptionRequest",
"SeekSubscriptionResponse",
"SeekRequest",
"SeekResponse",
"SequencedCommitCursorRequest",
Expand All @@ -171,6 +178,7 @@
"SubscriberServiceAsyncClient",
"SubscriberServiceClient",
"Subscription",
"TimeTarget",
"Topic",
"TopicPartitions",
"TopicStatsServiceAsyncClient",
Expand Down
13 changes: 12 additions & 1 deletion google/cloud/pubsublite/admin_client.py
Expand Up @@ -12,10 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional, List
from typing import Optional, List, Union

from overrides import overrides
from google.api_core.client_options import ClientOptions
from google.api_core.operation import Operation
from google.auth.credentials import Credentials
from google.protobuf.field_mask_pb2 import FieldMask

Expand All @@ -31,6 +32,8 @@
LocationPath,
TopicPath,
BacklogLocation,
PublishTime,
EventTime,
)
from google.cloud.pubsublite.types.paths import ReservationPath
from google.cloud.pubsublite_v1 import (
Expand Down Expand Up @@ -129,6 +132,14 @@ def update_subscription(
) -> Subscription:
return self._impl.update_subscription(subscription, update_mask)

@overrides
def seek_subscription(
self,
subscription_path: SubscriptionPath,
target: Union[BacklogLocation, PublishTime, EventTime],
) -> Operation:
return self._impl.seek_subscription(subscription_path, target)

@overrides
def delete_subscription(self, subscription_path: SubscriptionPath):
return self._impl.delete_subscription(subscription_path)
Expand Down
23 changes: 22 additions & 1 deletion google/cloud/pubsublite/admin_client_interface.py
Expand Up @@ -13,14 +13,17 @@
# limitations under the License.

from abc import ABC, abstractmethod
from typing import List
from typing import List, Union

from google.api_core.operation import Operation
from google.cloud.pubsublite.types import (
CloudRegion,
TopicPath,
LocationPath,
SubscriptionPath,
BacklogLocation,
PublishTime,
EventTime,
)
from google.cloud.pubsublite.types.paths import ReservationPath
from google.cloud.pubsublite_v1 import Topic, Subscription, Reservation
Expand Down Expand Up @@ -88,6 +91,24 @@ def update_subscription(
) -> Subscription:
"""Update the masked fields of the provided subscription."""

@abstractmethod
def seek_subscription(
self,
subscription_path: SubscriptionPath,
target: Union[BacklogLocation, PublishTime, EventTime],
) -> Operation:
"""Initiate an out-of-band seek for a subscription to a specified target.

The seek target may be timestamps or named positions within the message
backlog See https://cloud.google.com/pubsub/lite/docs/seek for more
information.

Returns:
google.api_core.operation.Operation with:
result type: google.cloud.pubsublite.SeekSubscriptionResponse
metadata type: google.cloud.pubsublite.OperationMetadata
"""

@abstractmethod
def delete_subscription(self, subscription_path: SubscriptionPath):
"""Delete a subscription and all associated messages."""
Expand Down
27 changes: 26 additions & 1 deletion google/cloud/pubsublite/internal/wire/admin_client_impl.py
Expand Up @@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List
from typing import List, Union

from google.api_core.exceptions import InvalidArgument
from google.api_core.operation import Operation
from google.protobuf.field_mask_pb2 import FieldMask

from google.cloud.pubsublite.admin_client_interface import AdminClientInterface
Expand All @@ -23,6 +25,8 @@
LocationPath,
TopicPath,
BacklogLocation,
PublishTime,
EventTime,
)
from google.cloud.pubsublite.types.paths import ReservationPath
from google.cloud.pubsublite_v1 import (
Expand All @@ -31,6 +35,8 @@
AdminServiceClient,
TopicPartitions,
Reservation,
TimeTarget,
SeekSubscriptionRequest,
)


Expand Down Expand Up @@ -105,6 +111,25 @@ def update_subscription(
subscription=subscription, update_mask=update_mask
)

def seek_subscription(
self,
subscription_path: SubscriptionPath,
target: Union[BacklogLocation, PublishTime, EventTime],
) -> Operation:
request = SeekSubscriptionRequest(name=str(subscription_path))
if isinstance(target, PublishTime):
request.time_target = TimeTarget(publish_time=target.value)
elif isinstance(target, EventTime):
request.time_target = TimeTarget(event_time=target.value)
elif isinstance(target, BacklogLocation):
if target == BacklogLocation.END:
request.named_target = SeekSubscriptionRequest.NamedTarget.HEAD
else:
request.named_target = SeekSubscriptionRequest.NamedTarget.TAIL
else:
raise InvalidArgument("A valid seek target must be specified.")
return self._underlying.seek_subscription(request=request)

def delete_subscription(self, subscription_path: SubscriptionPath):
self._underlying.delete_subscription(name=str(subscription_path))

Expand Down
4 changes: 3 additions & 1 deletion google/cloud/pubsublite/types/__init__.py
Expand Up @@ -18,7 +18,7 @@
from .paths import LocationPath, TopicPath, SubscriptionPath
from .message_metadata import MessageMetadata
from .flow_control_settings import FlowControlSettings, DISABLED_FLOW_CONTROL
from .backlog_location import BacklogLocation
from .backlog_location import BacklogLocation, PublishTime, EventTime

__all__ = (
"CloudRegion",
Expand All @@ -30,4 +30,6 @@
"SubscriptionPath",
"TopicPath",
"BacklogLocation",
"PublishTime",
"EventTime",
)
14 changes: 14 additions & 0 deletions google/cloud/pubsublite/types/backlog_location.py
Expand Up @@ -13,6 +13,8 @@
# limitations under the License.

import enum
from datetime import datetime
from typing import NamedTuple


class BacklogLocation(enum.Enum):
Expand All @@ -22,3 +24,15 @@ class BacklogLocation(enum.Enum):

BEGINNING = 0
END = 1


class PublishTime(NamedTuple):
"""The publish timestamp of a message."""

value: datetime


class EventTime(NamedTuple):
"""A user-defined event timestamp of a message."""

value: datetime
11 changes: 11 additions & 0 deletions samples/snippets/quickstart_test.py
Expand Up @@ -22,6 +22,7 @@
from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import (
BacklogLocation,
CloudRegion,
CloudZone,
SubscriptionPath,
Expand Down Expand Up @@ -259,6 +260,16 @@ def test_subscriber_example(topic_path, subscription_path, capsys):
assert f"Received {message}" in out


def test_seek_lite_subscription_example(capsys):
import seek_lite_subscription_example

seek_lite_subscription_example.seek_lite_subscription(
PROJECT_NUMBER, CLOUD_REGION, ZONE_ID, SUBSCRIPTION_ID, BacklogLocation.BEGINNING, False
)
out, _ = capsys.readouterr()
assert "Seek operation" in out


def test_delete_lite_subscription_example(subscription_path, capsys):
import delete_lite_subscription_example

Expand Down
101 changes: 101 additions & 0 deletions samples/snippets/seek_lite_subscription_example.py
@@ -0,0 +1,101 @@
#!/usr/bin/env python

# Copyright 2021 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This application demonstrates how to initiate an out-of-band seek for a
subscription with the Pub/Sub Lite API. For more information, see the
documentation at https://cloud.google.com/pubsub/lite/docs/seek.
"""

import argparse


def seek_lite_subscription(project_number, cloud_region, zone_id, subscription_id, seek_target, wait_for_operation):
# [START pubsublite_seek_subscription]
from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False

# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
# messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
# than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
# greater than or equal to the specified timestamp.

# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.

cloud_region = CloudRegion(cloud_region)
location = CloudZone(cloud_region, zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)

client = AdminClient(cloud_region)
try:
# Initiate an out-of-band seek for a subscription to the specified
# target. If an operation is returned, the seek has been successfully
# registered and will eventually propagate to subscribers.
seek_operation = client.seek_subscription(subscription_path, seek_target)
print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
print(f"{subscription_path} not found.")
return

if wait_for_operation:
print("Waiting for operation to complete...")
seek_operation.result()
print(f"Operation completed. Metadata:\n{seek_operation.metadata}")
# [END pubsublite_seek_subscription]


if __name__ == "__main__":
from datetime import datetime
from google.cloud.pubsublite.types import BacklogLocation, PublishTime

parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("project_number", help="Your Google Cloud Project Number")
parser.add_argument("cloud_region", help="Your Cloud Region, e.g. 'us-central1'")
parser.add_argument("zone_id", help="Your Zone ID, e.g. 'a'")
parser.add_argument("subscription_id", help="Your subscription ID")
parser.add_argument("--target", default="BEGINNING", help="Seek target, e.g. 'BEGINNING, 'END' or a timestamp")
parser.add_argument("--wait_for_operation", help="Wait for the seek operation to complete")

args = parser.parse_args()

if args.target == "BEGINNING":
seek_target = BacklogLocation.BEGINNING
elif args.target == "END":
seek_target = BacklogLocation.END
else:
seek_target = PublishTime(datetime.strptime(args.target, "%Y-%m-%d %H:%M:%S"))

seek_lite_subscription(
args.project_number, args.cloud_region, args.zone_id,
args.subscription_id, seek_target, args.wait_for_operation
)