Skip to content

Commit

Permalink
feat: Support seek subscription in AdminClient (#740)
Browse files Browse the repository at this point in the history
Seek subscription performs an out-of-band seek for a subscription to a specified target, which may be a backlog location, publish timestamp or event timestamp.

Note: feature is currently pre-release.
  • Loading branch information
tmdiep committed Jul 8, 2021
1 parent 9af040b commit 5fe36cb
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 0 deletions.
6 changes: 6 additions & 0 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
Expand Up @@ -25,6 +25,12 @@
<to>*</to>
</difference>
<!-- END TODO: Remove on next release -->
<!-- Added method to AdminClient interface (Always okay) -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/pubsublite/AdminClient</className>
<method>*</method>
</difference>
<!-- Added abstract method to AutoValue.Builder class (Always okay) -->
<difference>
<differenceType>7013</differenceType>
Expand Down
Expand Up @@ -18,9 +18,12 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.internal.ApiBackgroundResource;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.Reservation;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.protobuf.FieldMask;
Expand Down Expand Up @@ -156,6 +159,25 @@ ApiFuture<Subscription> createSubscription(
*/
ApiFuture<Subscription> updateSubscription(Subscription subscription, FieldMask mask);

/**
* Initiate an out-of-band seek for a subscription to a specified target, which may be timestamps
* or named positions within the message backlog.
*
* <p>See https://cloud.google.com/pubsub/lite/docs/seek for more information.
*
* @param path The path of the subscription to seek.
* @param target The location to seek to.
* @return A {@link com.google.api.gax.longrunning.OperationFuture} that returns an operation name
* if the seek was successfully initiated, or otherwise throw an {@link
* com.google.api.gax.rpc.ApiException}. {@link
* com.google.api.gax.longrunning.OperationFuture.get()} will return a response if the seek
* operation completes successfully, or otherwise throw an {@link
* com.google.api.gax.rpc.ApiException}.
*/
@BetaApi("This may not be implemented in the backend, it is a pre-release feature.")
OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekSubscription(
SubscriptionPath path, SeekTarget target);

/**
* Delete the subscription with id {@code id} if it exists.
*
Expand Down
@@ -0,0 +1,54 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite;

import com.google.auto.value.AutoOneOf;
import com.google.protobuf.Timestamp;
import java.io.Serializable;

/** The target location to seek a subscription to. */
@AutoOneOf(SeekTarget.Kind.class)
public abstract class SeekTarget implements Serializable {
public enum Kind {
BACKLOG_LOCATION,
PUBLISH_TIME,
EVENT_TIME,
}

public abstract SeekTarget.Kind getKind();

public abstract BacklogLocation backlogLocation();

public abstract Timestamp publishTime();

public abstract Timestamp eventTime();

/** Seek to a named backlog location. */
public static SeekTarget of(BacklogLocation location) {
return AutoOneOf_SeekTarget.backlogLocation(location);
}

/** Seek to a message publish timestamp. */
public static SeekTarget ofPublishTime(Timestamp time) {
return AutoOneOf_SeekTarget.publishTime(time);
}

/** Seek to a message event timestamp. */
public static SeekTarget ofEventTime(Timestamp time) {
return AutoOneOf_SeekTarget.eventTime(time);
}
}
Expand Up @@ -18,11 +18,13 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.LocationPath;
import com.google.cloud.pubsublite.ReservationPath;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.CreateReservationRequest;
Expand All @@ -43,7 +45,11 @@
import com.google.cloud.pubsublite.proto.ListTopicSubscriptionsRequest;
import com.google.cloud.pubsublite.proto.ListTopicsRequest;
import com.google.cloud.pubsublite.proto.ListTopicsResponse;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.Reservation;
import com.google.cloud.pubsublite.proto.SeekSubscriptionRequest;
import com.google.cloud.pubsublite.proto.SeekSubscriptionRequest.NamedTarget;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.cloud.pubsublite.proto.TopicPartitions;
Expand Down Expand Up @@ -188,6 +194,32 @@ public ApiFuture<Subscription> updateSubscription(Subscription subscription, Fie
.build());
}

@Override
public OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekSubscription(
SubscriptionPath path, SeekTarget target) {
SeekSubscriptionRequest.Builder request =
SeekSubscriptionRequest.newBuilder().setName(path.toString());
switch (target.getKind()) {
case BACKLOG_LOCATION:
switch (target.backlogLocation()) {
case END:
request.setNamedTarget(NamedTarget.HEAD);
break;
case BEGINNING:
request.setNamedTarget(NamedTarget.TAIL);
break;
}
break;
case PUBLISH_TIME:
request.getTimeTargetBuilder().setPublishTime(target.publishTime());
break;
case EVENT_TIME:
request.getTimeTargetBuilder().setEventTime(target.eventTime());
break;
}
return serviceClient.seekSubscriptionOperationCallable().futureCall(request.build());
}

@Override
public ApiFuture<Void> deleteSubscription(SubscriptionPath path) {
return ApiFutures.transform(
Expand Down
Expand Up @@ -20,12 +20,16 @@
import static com.google.cloud.pubsublite.internal.ApiExceptionMatcher.assertFutureThrowsCode;
import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.OperationCallable;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.pubsublite.BacklogLocation;
Expand All @@ -35,6 +39,7 @@
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.ReservationName;
import com.google.cloud.pubsublite.ReservationPath;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
Expand All @@ -59,9 +64,14 @@
import com.google.cloud.pubsublite.proto.ListTopicSubscriptionsResponse;
import com.google.cloud.pubsublite.proto.ListTopicsRequest;
import com.google.cloud.pubsublite.proto.ListTopicsResponse;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.Reservation;
import com.google.cloud.pubsublite.proto.SeekSubscriptionRequest;
import com.google.cloud.pubsublite.proto.SeekSubscriptionRequest.NamedTarget;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig;
import com.google.cloud.pubsublite.proto.TimeTarget;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.cloud.pubsublite.proto.Topic.PartitionConfig;
import com.google.cloud.pubsublite.proto.TopicPartitions;
Expand All @@ -73,6 +83,7 @@
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Empty;
import com.google.protobuf.FieldMask;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -103,6 +114,8 @@ public class AdminClientImplTest {
.setThroughputCapacity(example(Reservation.class).getThroughputCapacity() + 1)
.build();

private static final String OPERATION_PATH = "/path/for/operation";

private static final <T> ApiFuture<T> failedPreconditionFuture() {
return ApiFutures.immediateFailedFuture(
new CheckedApiException(Code.FAILED_PRECONDITION).underlying);
Expand All @@ -128,6 +141,12 @@ private static final <T> ApiFuture<T> failedPreconditionFuture() {
@Mock UnaryCallable<UpdateSubscriptionRequest, Subscription> updateSubscriptionCallable;
@Mock UnaryCallable<DeleteSubscriptionRequest, Empty> deleteSubscriptionCallable;

@Mock
OperationCallable<SeekSubscriptionRequest, SeekSubscriptionResponse, OperationMetadata>
seekSubscriptionCallable;

@Mock OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekFuture;

@Mock UnaryCallable<CreateReservationRequest, Reservation> createReservationCallable;
@Mock UnaryCallable<GetReservationRequest, Reservation> getReservationCallable;
@Mock UnaryCallable<ListReservationsRequest, ListReservationsResponse> listReservationsCallable;
Expand Down Expand Up @@ -159,6 +178,7 @@ public void setUp() throws IOException {
when(stub.listSubscriptionsCallable()).thenReturn(listSubscriptionsCallable);
when(stub.updateSubscriptionCallable()).thenReturn(updateSubscriptionCallable);
when(stub.deleteSubscriptionCallable()).thenReturn(deleteSubscriptionCallable);
when(stub.seekSubscriptionOperationCallable()).thenReturn(seekSubscriptionCallable);

when(stub.createReservationCallable()).thenReturn(createReservationCallable);
when(stub.getReservationCallable()).thenReturn(getReservationCallable);
Expand Down Expand Up @@ -546,6 +566,104 @@ public void listSubscriptions_Error() {
client.listSubscriptions(example(LocationPath.class)), Code.FAILED_PRECONDITION);
}

@Test
public void seekSubscription_PublishTimeOk() throws Exception {
Timestamp publishTime = Timestamp.newBuilder().setSeconds(123).build();
SeekSubscriptionRequest request =
SeekSubscriptionRequest.newBuilder()
.setName(example(SubscriptionPath.class).toString())
.setTimeTarget(TimeTarget.newBuilder().setPublishTime(publishTime))
.build();

when(seekFuture.getName()).thenReturn(OPERATION_PATH);
when(seekSubscriptionCallable.futureCall(request)).thenReturn(seekFuture);

assertThat(
client
.seekSubscription(
example(SubscriptionPath.class), SeekTarget.ofPublishTime(publishTime))
.getName())
.isEqualTo(OPERATION_PATH);
}

@Test
public void seekSubscription_EventTimeOk() throws Exception {
Timestamp eventTime = Timestamp.newBuilder().setSeconds(456).build();
SeekSubscriptionRequest request =
SeekSubscriptionRequest.newBuilder()
.setName(example(SubscriptionPath.class).toString())
.setTimeTarget(TimeTarget.newBuilder().setEventTime(eventTime))
.build();

when(seekFuture.getName()).thenReturn(OPERATION_PATH);
when(seekSubscriptionCallable.futureCall(request)).thenReturn(seekFuture);

assertThat(
client
.seekSubscription(
example(SubscriptionPath.class), SeekTarget.ofEventTime(eventTime))
.getName())
.isEqualTo(OPERATION_PATH);
}

@Test
public void seekSubscription_BacklogBeginningOk() throws Exception {
SeekSubscriptionRequest request =
SeekSubscriptionRequest.newBuilder()
.setName(example(SubscriptionPath.class).toString())
.setNamedTarget(NamedTarget.TAIL)
.build();

when(seekFuture.getName()).thenReturn(OPERATION_PATH);
when(seekSubscriptionCallable.futureCall(request)).thenReturn(seekFuture);

assertThat(
client
.seekSubscription(
example(SubscriptionPath.class), SeekTarget.of(BacklogLocation.BEGINNING))
.getName())
.isEqualTo(OPERATION_PATH);
}

@Test
public void seekSubscription_BacklogEndOk() throws Exception {
SeekSubscriptionRequest request =
SeekSubscriptionRequest.newBuilder()
.setName(example(SubscriptionPath.class).toString())
.setNamedTarget(NamedTarget.HEAD)
.build();

when(seekFuture.getName()).thenReturn(OPERATION_PATH);
when(seekSubscriptionCallable.futureCall(request)).thenReturn(seekFuture);

assertThat(
client
.seekSubscription(
example(SubscriptionPath.class), SeekTarget.of(BacklogLocation.END))
.getName())
.isEqualTo(OPERATION_PATH);
}

@Test
public void seekSubscription_Error() throws Exception {
SeekSubscriptionRequest request =
SeekSubscriptionRequest.newBuilder()
.setName(example(SubscriptionPath.class).toString())
.setNamedTarget(NamedTarget.HEAD)
.build();

when(seekFuture.getName()).thenThrow(new CheckedApiException(Code.NOT_FOUND).underlying);
when(seekSubscriptionCallable.futureCall(request)).thenReturn(seekFuture);

assertThrows(
ApiException.class,
() ->
client
.seekSubscription(
example(SubscriptionPath.class), SeekTarget.of(BacklogLocation.END))
.getName());
}

@Test
public void createReservation_Ok() throws Exception {
CreateReservationRequest request =
Expand Down

0 comments on commit 5fe36cb

Please sign in to comment.