Skip to content

Commit

Permalink
feat: Add ComputeHeadCursor RPC for Pub/Sub Lite. (#429)
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 347681363

Source-Author: Google APIs <noreply@google.com>
Source-Date: Tue Dec 15 13:31:04 2020 -0800
Source-Repo: googleapis/googleapis
Source-Sha: f967ea0c0437a269515665ff9dbb69fcf134ddd9
Source-Link: googleapis/googleapis@f967ea0

Co-authored-by: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com>
  • Loading branch information
yoshi-automation and dpcollins-google committed Dec 16, 2020
1 parent 86aecc9 commit 34d8d02
Show file tree
Hide file tree
Showing 28 changed files with 2,182 additions and 216 deletions.
Expand Up @@ -19,6 +19,8 @@
import com.google.api.core.BetaApi;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.v1.stub.TopicStatsServiceStub;
Expand Down Expand Up @@ -159,6 +161,34 @@ public final ComputeMessageStatsResponse computeMessageStats(ComputeMessageStats
return stub.computeMessageStatsCallable();
}

// AUTO-GENERATED DOCUMENTATION AND METHOD.
/**
* Compute the head cursor for the partition. The head cursor?s offset is guaranteed to be before
* or equal to all messages which have not yet been acknowledged to be published, and greater than
* the offset of any message whose publish has already been acknowledged. It is 0 if there have
* never been messages on the partition.
*
* @param request The request object containing all of the parameters for the API call.
* @throws com.google.api.gax.rpc.ApiException if the remote call fails
*/
public final ComputeHeadCursorResponse computeHeadCursor(ComputeHeadCursorRequest request) {
return computeHeadCursorCallable().call(request);
}

// AUTO-GENERATED DOCUMENTATION AND METHOD.
/**
* Compute the head cursor for the partition. The head cursor?s offset is guaranteed to be before
* or equal to all messages which have not yet been acknowledged to be published, and greater than
* the offset of any message whose publish has already been acknowledged. It is 0 if there have
* never been messages on the partition.
*
* <p>Sample code:
*/
public final UnaryCallable<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorCallable() {
return stub.computeHeadCursorCallable();
}

@Override
public final void close() {
stub.close();
Expand Down
Expand Up @@ -26,6 +26,8 @@
import com.google.api.gax.rpc.ClientSettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.v1.stub.TopicStatsServiceStubSettings;
Expand Down Expand Up @@ -74,6 +76,12 @@ public class TopicStatsServiceSettings extends ClientSettings<TopicStatsServiceS
return ((TopicStatsServiceStubSettings) getStubSettings()).computeMessageStatsSettings();
}

/** Returns the object with the settings used for calls to computeHeadCursor. */
public UnaryCallSettings<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings() {
return ((TopicStatsServiceStubSettings) getStubSettings()).computeHeadCursorSettings();
}

public static final TopicStatsServiceSettings create(TopicStatsServiceStubSettings stub)
throws IOException {
return new TopicStatsServiceSettings.Builder(stub.toBuilder()).build();
Expand Down Expand Up @@ -178,6 +186,12 @@ public Builder applyToAllUnaryMethods(
return getStubSettingsBuilder().computeMessageStatsSettings();
}

/** Returns the builder for the settings used for calls to computeHeadCursor. */
public UnaryCallSettings.Builder<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings() {
return getStubSettingsBuilder().computeHeadCursorSettings();
}

@Override
public TopicStatsServiceSettings build() throws IOException {
return new TopicStatsServiceSettings(this);
Expand Down
Expand Up @@ -23,6 +23,8 @@
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.RequestParamsExtractor;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -53,8 +55,21 @@ public class GrpcTopicStatsServiceStub extends TopicStatsServiceStub {
ProtoUtils.marshaller(ComputeMessageStatsResponse.getDefaultInstance()))
.build();

private static final MethodDescriptor<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorMethodDescriptor =
MethodDescriptor.<ComputeHeadCursorRequest, ComputeHeadCursorResponse>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName("google.cloud.pubsublite.v1.TopicStatsService/ComputeHeadCursor")
.setRequestMarshaller(
ProtoUtils.marshaller(ComputeHeadCursorRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(ComputeHeadCursorResponse.getDefaultInstance()))
.build();

private final UnaryCallable<ComputeMessageStatsRequest, ComputeMessageStatsResponse>
computeMessageStatsCallable;
private final UnaryCallable<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorCallable;

private final BackgroundResource backgroundResources;
private final GrpcOperationsStub operationsStub;
Expand Down Expand Up @@ -114,12 +129,31 @@ public Map<String, String> extract(ComputeMessageStatsRequest request) {
}
})
.build();
GrpcCallSettings<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorTransportSettings =
GrpcCallSettings.<ComputeHeadCursorRequest, ComputeHeadCursorResponse>newBuilder()
.setMethodDescriptor(computeHeadCursorMethodDescriptor)
.setParamsExtractor(
new RequestParamsExtractor<ComputeHeadCursorRequest>() {
@Override
public Map<String, String> extract(ComputeHeadCursorRequest request) {
ImmutableMap.Builder<String, String> params = ImmutableMap.builder();
params.put("topic", String.valueOf(request.getTopic()));
return params.build();
}
})
.build();

this.computeMessageStatsCallable =
callableFactory.createUnaryCallable(
computeMessageStatsTransportSettings,
settings.computeMessageStatsSettings(),
clientContext);
this.computeHeadCursorCallable =
callableFactory.createUnaryCallable(
computeHeadCursorTransportSettings,
settings.computeHeadCursorSettings(),
clientContext);

this.backgroundResources =
new BackgroundResourceAggregation(clientContext.getBackgroundResources());
Expand All @@ -134,6 +168,11 @@ public GrpcOperationsStub getOperationsStub() {
return computeMessageStatsCallable;
}

public UnaryCallable<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorCallable() {
return computeHeadCursorCallable;
}

@Override
public final void close() {
shutdown();
Expand Down
Expand Up @@ -18,6 +18,8 @@

import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import javax.annotation.Generated;
Expand All @@ -36,6 +38,11 @@ public abstract class TopicStatsServiceStub implements BackgroundResource {
throw new UnsupportedOperationException("Not implemented: computeMessageStatsCallable()");
}

public UnaryCallable<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorCallable() {
throw new UnsupportedOperationException("Not implemented: computeHeadCursorCallable()");
}

@Override
public abstract void close();
}
Expand Up @@ -31,6 +31,8 @@
import com.google.api.gax.rpc.StubSettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -84,13 +86,21 @@ public class TopicStatsServiceStubSettings extends StubSettings<TopicStatsServic

private final UnaryCallSettings<ComputeMessageStatsRequest, ComputeMessageStatsResponse>
computeMessageStatsSettings;
private final UnaryCallSettings<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings;

/** Returns the object with the settings used for calls to computeMessageStats. */
public UnaryCallSettings<ComputeMessageStatsRequest, ComputeMessageStatsResponse>
computeMessageStatsSettings() {
return computeMessageStatsSettings;
}

/** Returns the object with the settings used for calls to computeHeadCursor. */
public UnaryCallSettings<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings() {
return computeHeadCursorSettings;
}

@BetaApi("A restructuring of stub classes is planned, so this may break in the future")
public TopicStatsServiceStub createStub() throws IOException {
if (getTransportChannelProvider()
Expand Down Expand Up @@ -161,13 +171,16 @@ protected TopicStatsServiceStubSettings(Builder settingsBuilder) throws IOExcept
super(settingsBuilder);

computeMessageStatsSettings = settingsBuilder.computeMessageStatsSettings().build();
computeHeadCursorSettings = settingsBuilder.computeHeadCursorSettings().build();
}

/** Builder for TopicStatsServiceStubSettings. */
public static class Builder extends StubSettings.Builder<TopicStatsServiceStubSettings, Builder> {
private final ImmutableList<UnaryCallSettings.Builder<?, ?>> unaryMethodSettingsBuilders;
private final UnaryCallSettings.Builder<ComputeMessageStatsRequest, ComputeMessageStatsResponse>
computeMessageStatsSettings;
private final UnaryCallSettings.Builder<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings;
private static final ImmutableMap<String, ImmutableSet<StatusCode.Code>>
RETRYABLE_CODE_DEFINITIONS;

Expand All @@ -183,6 +196,7 @@ public static class Builder extends StubSettings.Builder<TopicStatsServiceStubSe
StatusCode.Code.ABORTED,
StatusCode.Code.INTERNAL,
StatusCode.Code.UNKNOWN)));
definitions.put("no_retry_codes", ImmutableSet.copyOf(Lists.<StatusCode.Code>newArrayList()));
RETRYABLE_CODE_DEFINITIONS = definitions.build();
}

Expand All @@ -202,6 +216,8 @@ public static class Builder extends StubSettings.Builder<TopicStatsServiceStubSe
.setTotalTimeout(Duration.ofMillis(600000L))
.build();
definitions.put("retry_policy_0_params", settings);
settings = RetrySettings.newBuilder().setRpcTimeoutMultiplier(1.0).build();
definitions.put("no_retry_params", settings);
RETRY_PARAM_DEFINITIONS = definitions.build();
}

Expand All @@ -213,19 +229,23 @@ protected Builder(ClientContext clientContext) {
super(clientContext);

computeMessageStatsSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
computeHeadCursorSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(computeMessageStatsSettings);
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
computeMessageStatsSettings, computeHeadCursorSettings);
initDefaults(this);
}

protected Builder(TopicStatsServiceStubSettings settings) {
super(settings);

computeMessageStatsSettings = settings.computeMessageStatsSettings.toBuilder();
computeHeadCursorSettings = settings.computeHeadCursorSettings.toBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(computeMessageStatsSettings);
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
computeMessageStatsSettings, computeHeadCursorSettings);
}

private static Builder createDefault() {
Expand All @@ -245,6 +265,11 @@ private static Builder initDefaults(Builder builder) {
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("retry_policy_0_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("retry_policy_0_params"));

builder
.computeHeadCursorSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("no_retry_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("no_retry_params"));

return builder;
}

Expand All @@ -270,6 +295,12 @@ public Builder applyToAllUnaryMethods(
return computeMessageStatsSettings;
}

/** Returns the builder for the settings used for calls to computeHeadCursor. */
public UnaryCallSettings.Builder<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings() {
return computeHeadCursorSettings;
}

@Override
public TopicStatsServiceStubSettings build() throws IOException {
return new TopicStatsServiceStubSettings(this);
Expand Down
Expand Up @@ -17,6 +17,8 @@
package com.google.cloud.pubsublite.v1;

import com.google.api.core.BetaApi;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.TopicStatsServiceGrpc.TopicStatsServiceImplBase;
Expand Down Expand Up @@ -75,4 +77,20 @@ public void computeMessageStats(
responseObserver.onError(new IllegalArgumentException("Unrecognized response type"));
}
}

@Override
public void computeHeadCursor(
ComputeHeadCursorRequest request,
StreamObserver<ComputeHeadCursorResponse> responseObserver) {
Object response = responses.remove();
if (response instanceof ComputeHeadCursorResponse) {
requests.add(request);
responseObserver.onNext(((ComputeHeadCursorResponse) response));
responseObserver.onCompleted();
} else if (response instanceof Exception) {
responseObserver.onError(((Exception) response));
} else {
responseObserver.onError(new IllegalArgumentException("Unrecognized response type"));
}
}
}
Expand Up @@ -23,6 +23,8 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.Cursor;
Expand Down Expand Up @@ -135,4 +137,49 @@ public void computeMessageStatsExceptionTest() throws Exception {
// Expected exception.
}
}

@Test
public void computeHeadCursorTest() throws Exception {
ComputeHeadCursorResponse expectedResponse =
ComputeHeadCursorResponse.newBuilder().setHeadCursor(Cursor.newBuilder().build()).build();
mockTopicStatsService.addResponse(expectedResponse);

ComputeHeadCursorRequest request =
ComputeHeadCursorRequest.newBuilder()
.setTopic(TopicName.of("[PROJECT]", "[LOCATION]", "[TOPIC]").toString())
.setPartition(-1799810326)
.build();

ComputeHeadCursorResponse actualResponse = client.computeHeadCursor(request);
Assert.assertEquals(expectedResponse, actualResponse);

List<AbstractMessage> actualRequests = mockTopicStatsService.getRequests();
Assert.assertEquals(1, actualRequests.size());
ComputeHeadCursorRequest actualRequest = ((ComputeHeadCursorRequest) actualRequests.get(0));

Assert.assertEquals(request.getTopic(), actualRequest.getTopic());
Assert.assertEquals(request.getPartition(), actualRequest.getPartition());
Assert.assertTrue(
channelProvider.isHeaderSent(
ApiClientHeaderProvider.getDefaultApiClientHeaderKey(),
GaxGrpcProperties.getDefaultApiClientHeaderPattern()));
}

@Test
public void computeHeadCursorExceptionTest() throws Exception {
StatusRuntimeException exception = new StatusRuntimeException(io.grpc.Status.INVALID_ARGUMENT);
mockTopicStatsService.addException(exception);

try {
ComputeHeadCursorRequest request =
ComputeHeadCursorRequest.newBuilder()
.setTopic(TopicName.of("[PROJECT]", "[LOCATION]", "[TOPIC]").toString())
.setPartition(-1799810326)
.build();
client.computeHeadCursor(request);
Assert.fail("No exception raised");
} catch (InvalidArgumentException e) {
// Expected exception.
}
}
}

0 comments on commit 34d8d02

Please sign in to comment.