Skip to content

Commit

Permalink
feat: Add the TopicStats client (#179)
Browse files Browse the repository at this point in the history
* feat: Add the TopicStats client

Adds an interface and implementation for the TopicStatsClient. I've kept
them both in internal for now since we don't yet expose an API for
interacting with the CursorService.

* Changes to address review comments

* move comment
  • Loading branch information
palmere-google committed Jul 31, 2020
1 parent f3c4f79 commit 7eb7861
Show file tree
Hide file tree
Showing 10 changed files with 642 additions and 108 deletions.
Expand Up @@ -25,29 +25,21 @@
import io.grpc.StatusException;
import java.io.IOException;
import java.util.Optional;
import org.threeten.bp.Duration;

@AutoValue
public abstract class AdminClientSettings {
public static final RetrySettings DEFAULT_RETRY_SETTINGS =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(100))
.setRetryDelayMultiplier(1.3)
.setMaxRetryDelay(Duration.ofSeconds(60))
.setJittered(true)
.setTotalTimeout(Duration.ofMinutes(10))
.build();

// Required parameters.
abstract CloudRegion region();

// Optional parameters.
abstract Optional<RetrySettings> retrySettings();
abstract RetrySettings retrySettings();

abstract Optional<AdminServiceBlockingStub> stub();

public static Builder newBuilder() {
return new AutoValue_AdminClientSettings.Builder();
return new AutoValue_AdminClientSettings.Builder()
.setRetrySettings(Constants.DEFAULT_RETRY_SETTINGS);
}

@AutoValue.Builder
Expand Down Expand Up @@ -79,6 +71,6 @@ AdminClient instantiate() throws StatusException {
.asException();
}
}
return new AdminClientImpl(region(), stub, retrySettings().orElse(DEFAULT_RETRY_SETTINGS));
return new AdminClientImpl(region(), stub, retrySettings());
}
}
Expand Up @@ -16,7 +16,19 @@

package com.google.cloud.pubsublite;

import com.google.api.gax.retrying.RetrySettings;
import org.threeten.bp.Duration;

public class Constants {
public static final RetrySettings DEFAULT_RETRY_SETTINGS =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(100))
.setRetryDelayMultiplier(1.3)
.setMaxRetryDelay(Duration.ofSeconds(60))
.setJittered(true)
.setTotalTimeout(Duration.ofMinutes(10))
.build();

public static final long MAX_PUBLISH_BATCH_COUNT = 1_000;
public static final long MAX_PUBLISH_MESSAGE_BYTES = 1_000_000;
public static final long MAX_PUBLISH_BATCH_BYTES = 3_500_000;
Expand Down
Expand Up @@ -17,20 +17,12 @@
package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.BackgroundResourceAggregation;
import com.google.api.gax.core.ExecutorAsBackgroundResource;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.ErrorCodes;
import com.google.cloud.pubsublite.LocationPath;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.SubscriptionPaths;
Expand All @@ -53,10 +45,7 @@
import com.google.cloud.pubsublite.proto.UpdateTopicRequest;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.FieldMask;
import io.grpc.Status;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

Expand Down Expand Up @@ -91,59 +80,28 @@ private AdminClientImpl(
super(ImmutableList.of(new ExecutorAsBackgroundResource(executor)));
this.region = region;
this.stub = stub;
this.voidRetryingExecutor = retryingExecutor(retrySettings, executor);
this.topicRetryingExecutor = retryingExecutor(retrySettings, executor);
this.subscriptionRetryingExecutor = retryingExecutor(retrySettings, executor);
this.partitionCountRetryingExecutor = retryingExecutor(retrySettings, executor);
this.listTopicsRetryingExecutor = retryingExecutor(retrySettings, executor);
this.listSubscriptionsRetryingExecutor = retryingExecutor(retrySettings, executor);
this.listTopicSubscriptionsRetryingExecutor = retryingExecutor(retrySettings, executor);
}

private static <T> RetryingExecutor<T> retryingExecutor(
RetrySettings settings, ScheduledExecutorService executor) {
return new ScheduledRetryingExecutor<>(retryAlgorithm(settings), executor);
}

private static <T> RetryAlgorithm<T> retryAlgorithm(RetrySettings retrySettings) {
return new RetryAlgorithm<>(
resultRetryAlgorithm(),
new ExponentialRetryAlgorithm(retrySettings, NanoClock.getDefaultClock()));
}

private static <T> ResultRetryAlgorithm<T> resultRetryAlgorithm() {
return new ResultRetryAlgorithm<T>() {
@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, T prevResponse, TimedAttemptSettings prevSettings) {
return null; // Null means no specific settings.
}

@Override
public boolean shouldRetry(Throwable prevThrowable, T prevResponse) {
if (null != prevResponse) return false;
Optional<Status> statusOr = ExtractStatus.extract(prevThrowable);
if (!statusOr.isPresent()) return false; // Received a non-grpc error.
return ErrorCodes.IsRetryable(statusOr.get().getCode());
}
};
this.voidRetryingExecutor = RetryingExecutorUtil.retryingExecutor(retrySettings, executor);
this.topicRetryingExecutor = RetryingExecutorUtil.retryingExecutor(retrySettings, executor);
this.subscriptionRetryingExecutor =
RetryingExecutorUtil.retryingExecutor(retrySettings, executor);
this.partitionCountRetryingExecutor =
RetryingExecutorUtil.retryingExecutor(retrySettings, executor);
this.listTopicsRetryingExecutor =
RetryingExecutorUtil.retryingExecutor(retrySettings, executor);
this.listSubscriptionsRetryingExecutor =
RetryingExecutorUtil.retryingExecutor(retrySettings, executor);
this.listTopicSubscriptionsRetryingExecutor =
RetryingExecutorUtil.retryingExecutor(retrySettings, executor);
}

@Override
public CloudRegion region() {
return region;
}

private static <T> ApiFuture<T> runWithRetries(
Callable<T> callable, RetryingExecutor<T> executor) {
RetryingFuture<T> retryingFuture = executor.createFuture(callable);
retryingFuture.setAttemptFuture(executor.submit(retryingFuture));
return retryingFuture;
}

@Override
public ApiFuture<Topic> createTopic(Topic topic) {
return runWithRetries(
return RetryingExecutorUtil.runWithRetries(
() -> {
TopicPath path = TopicPath.of(topic.getName());
return stub.createTopic(
Expand All @@ -158,14 +116,14 @@ public ApiFuture<Topic> createTopic(Topic topic) {

@Override
public ApiFuture<Topic> getTopic(TopicPath path) {
return runWithRetries(
return RetryingExecutorUtil.runWithRetries(
() -> stub.getTopic(GetTopicRequest.newBuilder().setName(path.value()).build()),
topicRetryingExecutor);
}

@Override
public ApiFuture<Long> getTopicPartitionCount(TopicPath path) {
return runWithRetries(
return RetryingExecutorUtil.runWithRetries(
() ->
stub.getTopicPartitions(
GetTopicPartitionsRequest.newBuilder().setName(path.value()).build())
Expand All @@ -175,7 +133,7 @@ public ApiFuture<Long> getTopicPartitionCount(TopicPath path) {

@Override
public ApiFuture<List<Topic>> listTopics(LocationPath path) {
return runWithRetries(
return RetryingExecutorUtil.runWithRetries(
() -> {
return stub.listTopics(ListTopicsRequest.newBuilder().setParent(path.value()).build())
.getTopicsList();
Expand All @@ -185,7 +143,7 @@ public ApiFuture<List<Topic>> listTopics(LocationPath path) {

@Override
public ApiFuture<Topic> updateTopic(Topic topic, FieldMask mask) {
return runWithRetries(
return RetryingExecutorUtil.runWithRetries(
() -> {
return stub.updateTopic(
UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(mask).build());
Expand All @@ -196,7 +154,7 @@ public ApiFuture<Topic> updateTopic(Topic topic, FieldMask mask) {
@Override
@SuppressWarnings("UnusedReturnValue")
public ApiFuture<Void> deleteTopic(TopicPath path) {
return runWithRetries(
return RetryingExecutorUtil.runWithRetries(
() -> {
stub.deleteTopic(DeleteTopicRequest.newBuilder().setName(path.value()).build());
return null;
Expand All @@ -206,7 +164,7 @@ public ApiFuture<Void> deleteTopic(TopicPath path) {

@Override
public ApiFuture<List<SubscriptionPath>> listTopicSubscriptions(TopicPath path) {
return runWithRetries(
return RetryingExecutorUtil.runWithRetries(
() -> {
ImmutableList.Builder<SubscriptionPath> builder = ImmutableList.builder();
for (String subscription :
Expand All @@ -224,7 +182,7 @@ public ApiFuture<List<SubscriptionPath>> listTopicSubscriptions(TopicPath path)

@Override
public ApiFuture<Subscription> createSubscription(Subscription subscription) {
return runWithRetries(
return RetryingExecutorUtil.runWithRetries(
() -> {
SubscriptionPath path = SubscriptionPath.of(subscription.getName());
return stub.createSubscription(
Expand All @@ -239,15 +197,15 @@ public ApiFuture<Subscription> createSubscription(Subscription subscription) {

@Override
public ApiFuture<Subscription> getSubscription(SubscriptionPath path) {
return runWithRetries(
return RetryingExecutorUtil.runWithRetries(
() ->
stub.getSubscription(GetSubscriptionRequest.newBuilder().setName(path.value()).build()),
subscriptionRetryingExecutor);
}

@Override
public ApiFuture<List<Subscription>> listSubscriptions(LocationPath path) {
return runWithRetries(
return RetryingExecutorUtil.runWithRetries(
() -> {
return stub.listSubscriptions(
ListSubscriptionsRequest.newBuilder().setParent(path.value()).build())
Expand All @@ -258,7 +216,7 @@ public ApiFuture<List<Subscription>> listSubscriptions(LocationPath path) {

@Override
public ApiFuture<Subscription> updateSubscription(Subscription subscription, FieldMask mask) {
return runWithRetries(
return RetryingExecutorUtil.runWithRetries(
() -> {
return stub.updateSubscription(
UpdateSubscriptionRequest.newBuilder()
Expand All @@ -272,7 +230,7 @@ public ApiFuture<Subscription> updateSubscription(Subscription subscription, Fie
@Override
@SuppressWarnings("UnusedReturnValue")
public ApiFuture<Void> deleteSubscription(SubscriptionPath path) {
return runWithRetries(
return RetryingExecutorUtil.runWithRetries(
() -> {
stub.deleteSubscription(
DeleteSubscriptionRequest.newBuilder().setName(path.value()).build());
Expand Down
@@ -0,0 +1,77 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.cloud.pubsublite.ErrorCodes;
import io.grpc.Status;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;

public final class RetryingExecutorUtil {

private RetryingExecutorUtil() {}

public static <T> RetryingExecutor<T> retryingExecutor(
RetrySettings settings, ScheduledExecutorService executor) {
return new ScheduledRetryingExecutor<>(retryAlgorithm(settings), executor);
}

public static <T> ApiFuture<T> runWithRetries(
Callable<T> callable, RetryingExecutor<T> executor) {
RetryingFuture<T> retryingFuture = executor.createFuture(callable);
retryingFuture.setAttemptFuture(executor.submit(retryingFuture));
return retryingFuture;
}

private static <T> RetryAlgorithm<T> retryAlgorithm(RetrySettings retrySettings) {
return new RetryAlgorithm<>(
resultRetryAlgorithm(),
new ExponentialRetryAlgorithm(retrySettings, NanoClock.getDefaultClock()));
}

private static <T> ResultRetryAlgorithm<T> resultRetryAlgorithm() {
return new ResultRetryAlgorithm<T>() {
@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, T prevResponse, TimedAttemptSettings prevSettings) {
return null; // Null means no specific settings.
}

@Override
public boolean shouldRetry(Throwable prevThrowable, T prevResponse) {
if (null != prevResponse) {
return false;
}
Optional<Status> statusOr = ExtractStatus.extract(prevThrowable);
if (!statusOr.isPresent()) {
return false; // Received a non-grpc error.
}
return ErrorCodes.IsRetryable(statusOr.get().getCode());
}
};
}
}
@@ -0,0 +1,55 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.BackgroundResource;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import io.grpc.StatusException;

public interface TopicStatsClient extends BackgroundResource {

static TopicStatsClient create(TopicStatsClientSettings settings) throws StatusException {
return settings.instantiate();
}

/** The Google Cloud region this client operates on. */
CloudRegion region();

/**
* Compute statistics about the messages between two cursors in a topic partition.
*
* @param path The topic to compute statistics on
* @param partition The partition to compute statistics for
* @param start The start cursor
* @param end The end cursor
* @return A future that will have either an error {@link io.grpc.StatusException} or the
* ComputeMessageStatistics on success.
*/
ApiFuture<ComputeMessageStatsResponse> computeMessageStats(
TopicPath path, Partition partition, Offset start, Offset end);

/**
* Tear down this admin client.
*
* @throws StatusException on a failure to properly terminate.
*/
void close() throws StatusException;
}

0 comments on commit 7eb7861

Please sign in to comment.