Skip to content

Commit

Permalink
feat: Add SeekSubscription and Operations to API (#169)
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 380660182

Source-Link: googleapis/googleapis@b601f02

Source-Link: googleapis/googleapis-gen@de07f61
  • Loading branch information
gcf-owl-bot[bot] committed Jun 26, 2021
1 parent f95b4cc commit 2e29ba1
Show file tree
Hide file tree
Showing 10 changed files with 594 additions and 0 deletions.
6 changes: 6 additions & 0 deletions google/cloud/pubsublite_v1/__init__.py
Expand Up @@ -47,6 +47,9 @@
from .types.admin import ListTopicsResponse
from .types.admin import ListTopicSubscriptionsRequest
from .types.admin import ListTopicSubscriptionsResponse
from .types.admin import OperationMetadata
from .types.admin import SeekSubscriptionRequest
from .types.admin import SeekSubscriptionResponse
from .types.admin import TopicPartitions
from .types.admin import UpdateReservationRequest
from .types.admin import UpdateSubscriptionRequest
Expand Down Expand Up @@ -147,6 +150,7 @@
"MessagePublishRequest",
"MessagePublishResponse",
"MessageResponse",
"OperationMetadata",
"PartitionAssignment",
"PartitionAssignmentAck",
"PartitionAssignmentRequest",
Expand All @@ -159,6 +163,8 @@
"Reservation",
"SeekRequest",
"SeekResponse",
"SeekSubscriptionRequest",
"SeekSubscriptionResponse",
"SequencedCommitCursorRequest",
"SequencedCommitCursorResponse",
"SequencedMessage",
Expand Down
10 changes: 10 additions & 0 deletions google/cloud/pubsublite_v1/gapic_metadata.json
Expand Up @@ -85,6 +85,11 @@
"list_topics"
]
},
"SeekSubscription": {
"methods": [
"seek_subscription"
]
},
"UpdateReservation": {
"methods": [
"update_reservation"
Expand Down Expand Up @@ -180,6 +185,11 @@
"list_topics"
]
},
"SeekSubscription": {
"methods": [
"seek_subscription"
]
},
"UpdateReservation": {
"methods": [
"update_reservation"
Expand Down
87 changes: 87 additions & 0 deletions google/cloud/pubsublite_v1/services/admin_service/async_client.py
Expand Up @@ -26,6 +26,8 @@
from google.auth import credentials as ga_credentials # type: ignore
from google.oauth2 import service_account # type: ignore

from google.api_core import operation # type: ignore
from google.api_core import operation_async # type: ignore
from google.cloud.pubsublite_v1.services.admin_service import pagers
from google.cloud.pubsublite_v1.types import admin
from google.cloud.pubsublite_v1.types import common
Expand Down Expand Up @@ -1084,6 +1086,91 @@ async def delete_subscription(
request, retry=retry, timeout=timeout, metadata=metadata,
)

async def seek_subscription(
self,
request: admin.SeekSubscriptionRequest = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> operation_async.AsyncOperation:
r"""Performs an out-of-band seek for a subscription to a
specified target, which may be timestamps or named
positions within the message backlog. Seek translates
these targets to cursors for each partition and
orchestrates subscribers to start consuming messages
from these seek cursors.
If an operation is returned, the seek has been
registered and subscribers will eventually receive
messages from the seek cursors (i.e. eventual
consistency), as long as they are using a minimum
supported client library version and not a system that
tracks cursors independently of Pub/Sub Lite (e.g.
Apache Beam, Dataflow, Spark). The seek operation will
fail for unsupported clients.
If clients would like to know when subscribers react to
the seek (or not), they can poll the operation. The seek
operation will succeed and complete once subscribers are
ready to receive messages from the seek cursors for all
partitions of the topic. This means that the seek
operation will not complete until all subscribers come
online.
If the previous seek operation has not yet completed, it
will be aborted and the new invocation of seek will
supersede it.
Args:
request (:class:`google.cloud.pubsublite_v1.types.SeekSubscriptionRequest`):
The request object. Request for SeekSubscription.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
google.api_core.operation_async.AsyncOperation:
An object representing a long-running operation.
The result type for the operation will be
:class:`google.cloud.pubsublite_v1.types.SeekSubscriptionResponse`
Response for SeekSubscription long running operation.
"""
# Create or coerce a protobuf request object.
request = admin.SeekSubscriptionRequest(request)

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
self._client._transport.seek_subscription,
default_timeout=None,
client_info=DEFAULT_CLIENT_INFO,
)

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata) + (
gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
)

# Send the request.
response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,)

# Wrap the response in an operation future.
response = operation_async.from_gapic(
response,
self._client._transport.operations_client,
admin.SeekSubscriptionResponse,
metadata_type=admin.OperationMetadata,
)

# Done; return the response.
return response

async def create_reservation(
self,
request: admin.CreateReservationRequest = None,
Expand Down
88 changes: 88 additions & 0 deletions google/cloud/pubsublite_v1/services/admin_service/client.py
Expand Up @@ -30,6 +30,8 @@
from google.auth.exceptions import MutualTLSChannelError # type: ignore
from google.oauth2 import service_account # type: ignore

from google.api_core import operation # type: ignore
from google.api_core import operation_async # type: ignore
from google.cloud.pubsublite_v1.services.admin_service import pagers
from google.cloud.pubsublite_v1.types import admin
from google.cloud.pubsublite_v1.types import common
Expand Down Expand Up @@ -1299,6 +1301,92 @@ def delete_subscription(
request, retry=retry, timeout=timeout, metadata=metadata,
)

def seek_subscription(
self,
request: admin.SeekSubscriptionRequest = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> operation.Operation:
r"""Performs an out-of-band seek for a subscription to a
specified target, which may be timestamps or named
positions within the message backlog. Seek translates
these targets to cursors for each partition and
orchestrates subscribers to start consuming messages
from these seek cursors.
If an operation is returned, the seek has been
registered and subscribers will eventually receive
messages from the seek cursors (i.e. eventual
consistency), as long as they are using a minimum
supported client library version and not a system that
tracks cursors independently of Pub/Sub Lite (e.g.
Apache Beam, Dataflow, Spark). The seek operation will
fail for unsupported clients.
If clients would like to know when subscribers react to
the seek (or not), they can poll the operation. The seek
operation will succeed and complete once subscribers are
ready to receive messages from the seek cursors for all
partitions of the topic. This means that the seek
operation will not complete until all subscribers come
online.
If the previous seek operation has not yet completed, it
will be aborted and the new invocation of seek will
supersede it.
Args:
request (google.cloud.pubsublite_v1.types.SeekSubscriptionRequest):
The request object. Request for SeekSubscription.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
google.api_core.operation.Operation:
An object representing a long-running operation.
The result type for the operation will be
:class:`google.cloud.pubsublite_v1.types.SeekSubscriptionResponse`
Response for SeekSubscription long running operation.
"""
# Create or coerce a protobuf request object.
# Minor optimization to avoid making a copy if the user passes
# in a admin.SeekSubscriptionRequest.
# There's no risk of modifying the input as we've already verified
# there are no flattened fields.
if not isinstance(request, admin.SeekSubscriptionRequest):
request = admin.SeekSubscriptionRequest(request)

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = self._transport._wrapped_methods[self._transport.seek_subscription]

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata) + (
gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
)

# Send the request.
response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,)

# Wrap the response in an operation future.
response = operation.from_gapic(
response,
self._transport.operations_client,
admin.SeekSubscriptionResponse,
metadata_type=admin.OperationMetadata,
)

# Done; return the response.
return response

def create_reservation(
self,
request: admin.CreateReservationRequest = None,
Expand Down
Expand Up @@ -23,11 +23,13 @@
from google.api_core import exceptions as core_exceptions # type: ignore
from google.api_core import gapic_v1 # type: ignore
from google.api_core import retry as retries # type: ignore
from google.api_core import operations_v1 # type: ignore
from google.auth import credentials as ga_credentials # type: ignore
from google.oauth2 import service_account # type: ignore

from google.cloud.pubsublite_v1.types import admin
from google.cloud.pubsublite_v1.types import common
from google.longrunning import operations_pb2 # type: ignore
from google.protobuf import empty_pb2 # type: ignore

try:
Expand Down Expand Up @@ -196,6 +198,9 @@ def _prep_wrapped_messages(self, client_info):
self.delete_subscription: gapic_v1.method.wrap_method(
self.delete_subscription, default_timeout=None, client_info=client_info,
),
self.seek_subscription: gapic_v1.method.wrap_method(
self.seek_subscription, default_timeout=None, client_info=client_info,
),
self.create_reservation: gapic_v1.method.wrap_method(
self.create_reservation, default_timeout=None, client_info=client_info,
),
Expand All @@ -218,6 +223,11 @@ def _prep_wrapped_messages(self, client_info):
),
}

@property
def operations_client(self) -> operations_v1.OperationsClient:
"""Return the client designed to process long-running operations."""
raise NotImplementedError()

@property
def create_topic(
self,
Expand Down Expand Up @@ -327,6 +337,15 @@ def delete_subscription(
]:
raise NotImplementedError()

@property
def seek_subscription(
self,
) -> Callable[
[admin.SeekSubscriptionRequest],
Union[operations_pb2.Operation, Awaitable[operations_pb2.Operation]],
]:
raise NotImplementedError()

@property
def create_reservation(
self,
Expand Down

0 comments on commit 2e29ba1

Please sign in to comment.