Skip to content

Commit

Permalink
feat: ComputeTimeCursor RPC for Pub/Sub Lite (#615)
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 370536460

Source-Author: Google APIs <noreply@google.com>
Source-Date: Mon Apr 26 14:02:22 2021 -0700
Source-Repo: googleapis/googleapis
Source-Sha: ae5fb2884a296832c39867e8e8c81bbc72a32ce8
Source-Link: googleapis/googleapis@ae5fb28
  • Loading branch information
yoshi-automation committed Apr 26, 2021
1 parent f5e9cb9 commit f74b73c
Show file tree
Hide file tree
Showing 24 changed files with 3,973 additions and 203 deletions.
Expand Up @@ -23,6 +23,8 @@
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorResponse;
import com.google.cloud.pubsublite.v1.stub.TopicStatsServiceStub;
import com.google.cloud.pubsublite.v1.stub.TopicStatsServiceStubSettings;
import java.io.IOException;
Expand Down Expand Up @@ -259,6 +261,57 @@ public final ComputeHeadCursorResponse computeHeadCursor(ComputeHeadCursorReques
return stub.computeHeadCursorCallable();
}

// AUTO-GENERATED DOCUMENTATION AND METHOD.
/**
* Compute the corresponding cursor for a publish or event time in a topic partition.
*
* <p>Sample code:
*
* <pre>{@code
* try (TopicStatsServiceClient topicStatsServiceClient = TopicStatsServiceClient.create()) {
* ComputeTimeCursorRequest request =
* ComputeTimeCursorRequest.newBuilder()
* .setTopic(TopicName.of("[PROJECT]", "[LOCATION]", "[TOPIC]").toString())
* .setPartition(-1799810326)
* .setTarget(TimeTarget.newBuilder().build())
* .build();
* ComputeTimeCursorResponse response = topicStatsServiceClient.computeTimeCursor(request);
* }
* }</pre>
*
* @param request The request object containing all of the parameters for the API call.
* @throws com.google.api.gax.rpc.ApiException if the remote call fails
*/
public final ComputeTimeCursorResponse computeTimeCursor(ComputeTimeCursorRequest request) {
return computeTimeCursorCallable().call(request);
}

// AUTO-GENERATED DOCUMENTATION AND METHOD.
/**
* Compute the corresponding cursor for a publish or event time in a topic partition.
*
* <p>Sample code:
*
* <pre>{@code
* try (TopicStatsServiceClient topicStatsServiceClient = TopicStatsServiceClient.create()) {
* ComputeTimeCursorRequest request =
* ComputeTimeCursorRequest.newBuilder()
* .setTopic(TopicName.of("[PROJECT]", "[LOCATION]", "[TOPIC]").toString())
* .setPartition(-1799810326)
* .setTarget(TimeTarget.newBuilder().build())
* .build();
* ApiFuture<ComputeTimeCursorResponse> future =
* topicStatsServiceClient.computeTimeCursorCallable().futureCall(request);
* // Do something.
* ComputeTimeCursorResponse response = future.get();
* }
* }</pre>
*/
public final UnaryCallable<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorCallable() {
return stub.computeTimeCursorCallable();
}

@Override
public final void close() {
stub.close();
Expand Down
Expand Up @@ -30,6 +30,8 @@
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorResponse;
import com.google.cloud.pubsublite.v1.stub.TopicStatsServiceStubSettings;
import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -82,6 +84,12 @@ public class TopicStatsServiceSettings extends ClientSettings<TopicStatsServiceS
return ((TopicStatsServiceStubSettings) getStubSettings()).computeHeadCursorSettings();
}

/** Returns the object with the settings used for calls to computeTimeCursor. */
public UnaryCallSettings<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorSettings() {
return ((TopicStatsServiceStubSettings) getStubSettings()).computeTimeCursorSettings();
}

public static final TopicStatsServiceSettings create(TopicStatsServiceStubSettings stub)
throws IOException {
return new TopicStatsServiceSettings.Builder(stub.toBuilder()).build();
Expand Down Expand Up @@ -192,6 +200,12 @@ public Builder applyToAllUnaryMethods(
return getStubSettingsBuilder().computeHeadCursorSettings();
}

/** Returns the builder for the settings used for calls to computeTimeCursor. */
public UnaryCallSettings.Builder<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorSettings() {
return getStubSettingsBuilder().computeTimeCursorSettings();
}

@Override
public TopicStatsServiceSettings build() throws IOException {
return new TopicStatsServiceSettings(this);
Expand Down
Expand Up @@ -114,6 +114,9 @@
},
"ComputeMessageStats": {
"methods": ["computeMessageStats", "computeMessageStatsCallable"]
},
"ComputeTimeCursor": {
"methods": ["computeTimeCursor", "computeTimeCursorCallable"]
}
}
}
Expand Down
Expand Up @@ -27,6 +27,8 @@
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorResponse;
import com.google.common.collect.ImmutableMap;
import com.google.longrunning.stub.GrpcOperationsStub;
import io.grpc.MethodDescriptor;
Expand Down Expand Up @@ -66,10 +68,23 @@ public class GrpcTopicStatsServiceStub extends TopicStatsServiceStub {
ProtoUtils.marshaller(ComputeHeadCursorResponse.getDefaultInstance()))
.build();

private static final MethodDescriptor<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorMethodDescriptor =
MethodDescriptor.<ComputeTimeCursorRequest, ComputeTimeCursorResponse>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName("google.cloud.pubsublite.v1.TopicStatsService/ComputeTimeCursor")
.setRequestMarshaller(
ProtoUtils.marshaller(ComputeTimeCursorRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(ComputeTimeCursorResponse.getDefaultInstance()))
.build();

private final UnaryCallable<ComputeMessageStatsRequest, ComputeMessageStatsResponse>
computeMessageStatsCallable;
private final UnaryCallable<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorCallable;
private final UnaryCallable<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorCallable;

private final BackgroundResource backgroundResources;
private final GrpcOperationsStub operationsStub;
Expand Down Expand Up @@ -143,6 +158,20 @@ public Map<String, String> extract(ComputeHeadCursorRequest request) {
}
})
.build();
GrpcCallSettings<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorTransportSettings =
GrpcCallSettings.<ComputeTimeCursorRequest, ComputeTimeCursorResponse>newBuilder()
.setMethodDescriptor(computeTimeCursorMethodDescriptor)
.setParamsExtractor(
new RequestParamsExtractor<ComputeTimeCursorRequest>() {
@Override
public Map<String, String> extract(ComputeTimeCursorRequest request) {
ImmutableMap.Builder<String, String> params = ImmutableMap.builder();
params.put("topic", String.valueOf(request.getTopic()));
return params.build();
}
})
.build();

this.computeMessageStatsCallable =
callableFactory.createUnaryCallable(
Expand All @@ -154,6 +183,11 @@ public Map<String, String> extract(ComputeHeadCursorRequest request) {
computeHeadCursorTransportSettings,
settings.computeHeadCursorSettings(),
clientContext);
this.computeTimeCursorCallable =
callableFactory.createUnaryCallable(
computeTimeCursorTransportSettings,
settings.computeTimeCursorSettings(),
clientContext);

this.backgroundResources =
new BackgroundResourceAggregation(clientContext.getBackgroundResources());
Expand All @@ -175,6 +209,12 @@ public GrpcOperationsStub getOperationsStub() {
return computeHeadCursorCallable;
}

@Override
public UnaryCallable<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorCallable() {
return computeTimeCursorCallable;
}

@Override
public final void close() {
shutdown();
Expand Down
Expand Up @@ -22,6 +22,8 @@
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorResponse;
import javax.annotation.Generated;

// AUTO-GENERATED DOCUMENTATION AND CLASS.
Expand All @@ -43,6 +45,11 @@ public abstract class TopicStatsServiceStub implements BackgroundResource {
throw new UnsupportedOperationException("Not implemented: computeHeadCursorCallable()");
}

public UnaryCallable<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorCallable() {
throw new UnsupportedOperationException("Not implemented: computeTimeCursorCallable()");
}

@Override
public abstract void close();
}
Expand Up @@ -35,6 +35,8 @@
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorResponse;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -87,6 +89,8 @@ public class TopicStatsServiceStubSettings extends StubSettings<TopicStatsServic
computeMessageStatsSettings;
private final UnaryCallSettings<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings;
private final UnaryCallSettings<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorSettings;

/** Returns the object with the settings used for calls to computeMessageStats. */
public UnaryCallSettings<ComputeMessageStatsRequest, ComputeMessageStatsResponse>
Expand All @@ -100,6 +104,12 @@ public class TopicStatsServiceStubSettings extends StubSettings<TopicStatsServic
return computeHeadCursorSettings;
}

/** Returns the object with the settings used for calls to computeTimeCursor. */
public UnaryCallSettings<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorSettings() {
return computeTimeCursorSettings;
}

@BetaApi("A restructuring of stub classes is planned, so this may break in the future")
public TopicStatsServiceStub createStub() throws IOException {
if (getTransportChannelProvider()
Expand Down Expand Up @@ -171,6 +181,7 @@ protected TopicStatsServiceStubSettings(Builder settingsBuilder) throws IOExcept

computeMessageStatsSettings = settingsBuilder.computeMessageStatsSettings().build();
computeHeadCursorSettings = settingsBuilder.computeHeadCursorSettings().build();
computeTimeCursorSettings = settingsBuilder.computeTimeCursorSettings().build();
}

/** Builder for TopicStatsServiceStubSettings. */
Expand All @@ -180,6 +191,8 @@ public static class Builder extends StubSettings.Builder<TopicStatsServiceStubSe
computeMessageStatsSettings;
private final UnaryCallSettings.Builder<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings;
private final UnaryCallSettings.Builder<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorSettings;
private static final ImmutableMap<String, ImmutableSet<StatusCode.Code>>
RETRYABLE_CODE_DEFINITIONS;

Expand All @@ -195,7 +208,6 @@ public static class Builder extends StubSettings.Builder<TopicStatsServiceStubSe
StatusCode.Code.ABORTED,
StatusCode.Code.INTERNAL,
StatusCode.Code.UNKNOWN)));
definitions.put("no_retry_codes", ImmutableSet.copyOf(Lists.<StatusCode.Code>newArrayList()));
RETRYABLE_CODE_DEFINITIONS = definitions.build();
}

Expand All @@ -215,8 +227,6 @@ public static class Builder extends StubSettings.Builder<TopicStatsServiceStubSe
.setTotalTimeout(Duration.ofMillis(600000L))
.build();
definitions.put("retry_policy_0_params", settings);
settings = RetrySettings.newBuilder().setRpcTimeoutMultiplier(1.0).build();
definitions.put("no_retry_params", settings);
RETRY_PARAM_DEFINITIONS = definitions.build();
}

Expand All @@ -229,10 +239,11 @@ protected Builder(ClientContext clientContext) {

computeMessageStatsSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
computeHeadCursorSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
computeTimeCursorSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
computeMessageStatsSettings, computeHeadCursorSettings);
computeMessageStatsSettings, computeHeadCursorSettings, computeTimeCursorSettings);
initDefaults(this);
}

Expand All @@ -241,10 +252,11 @@ protected Builder(TopicStatsServiceStubSettings settings) {

computeMessageStatsSettings = settings.computeMessageStatsSettings.toBuilder();
computeHeadCursorSettings = settings.computeHeadCursorSettings.toBuilder();
computeTimeCursorSettings = settings.computeTimeCursorSettings.toBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
computeMessageStatsSettings, computeHeadCursorSettings);
computeMessageStatsSettings, computeHeadCursorSettings, computeTimeCursorSettings);
}

private static Builder createDefault() {
Expand All @@ -266,8 +278,13 @@ private static Builder initDefaults(Builder builder) {

builder
.computeHeadCursorSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("no_retry_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("no_retry_params"));
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("retry_policy_0_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("retry_policy_0_params"));

builder
.computeTimeCursorSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("retry_policy_0_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("retry_policy_0_params"));

return builder;
}
Expand Down Expand Up @@ -300,6 +317,12 @@ public Builder applyToAllUnaryMethods(
return computeHeadCursorSettings;
}

/** Returns the builder for the settings used for calls to computeTimeCursor. */
public UnaryCallSettings.Builder<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorSettings() {
return computeTimeCursorSettings;
}

@Override
public TopicStatsServiceStubSettings build() throws IOException {
return new TopicStatsServiceStubSettings(this);
Expand Down
Expand Up @@ -21,6 +21,8 @@
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorResponse;
import com.google.cloud.pubsublite.proto.TopicStatsServiceGrpc.TopicStatsServiceImplBase;
import com.google.protobuf.AbstractMessage;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -105,4 +107,26 @@ public void computeHeadCursor(
Exception.class.getName())));
}
}

@Override
public void computeTimeCursor(
ComputeTimeCursorRequest request,
StreamObserver<ComputeTimeCursorResponse> responseObserver) {
Object response = responses.poll();
if (response instanceof ComputeTimeCursorResponse) {
requests.add(request);
responseObserver.onNext(((ComputeTimeCursorResponse) response));
responseObserver.onCompleted();
} else if (response instanceof Exception) {
responseObserver.onError(((Exception) response));
} else {
responseObserver.onError(
new IllegalArgumentException(
String.format(
"Unrecognized response type %s for method ComputeTimeCursor, expected %s or %s",
response == null ? "null" : response.getClass().getName(),
ComputeTimeCursorResponse.class.getName(),
Exception.class.getName())));
}
}
}

0 comments on commit f74b73c

Please sign in to comment.