Skip to content

Commit

Permalink
feat: Use TopicStats to implement getSplitBacklog (#228)
Browse files Browse the repository at this point in the history
* feat: Use TopicStats to implement getSplitBacklog

We use the TopicStatsService to compute the backlog size for the
assigned splits in the PubsubLiteUnbounded Reader.

We will refresh backlog information every ten seconds, and will use
stale information for up to a minute if the topic stats serivce is
unavailable.

Constructing the TopicBacklogReader in SubscriberOptions is a bit
awkward, but I wanted to make sure we don't need to re-resolve the
subscription -> topic mapping when we split or merge the source.
Otherwise we end up with a hard dependency on the admin service.

* Code review changes

* Change logging api to test dependency

* Switch to google logger
  • Loading branch information
palmere-google committed Sep 15, 2020
1 parent 7deb610 commit 9a889a9
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 12 deletions.
13 changes: 13 additions & 0 deletions pubsublite-beam-io/pom.xml
Expand Up @@ -77,7 +77,19 @@
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsub-v1</artifactId>
</dependency>
<dependency>
<groupId>com.google.flogger</groupId>
<artifactId>google-extensions</artifactId>
<version>0.5.1</version>
</dependency>

<!--test dependencies-->
<dependency>
<groupId>com.google.flogger</groupId>
<artifactId>flogger-system-backend</artifactId>
<version>0.5.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down Expand Up @@ -131,6 +143,7 @@
<ignoredUnusedDeclaredDependencies>
<ignoredUnusedDeclaredDependency>org.hamcrest:hamcrest</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>io.grpc:grpc-testing</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>com.google.flogger:flogger-system-backend</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>org.apache.beam:beam-runners-direct-java</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>javax.annotation:javax.annotation-api</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
Expand Down
Expand Up @@ -25,7 +25,13 @@
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import com.google.common.flogger.GoogleLogger;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
Expand All @@ -40,6 +46,8 @@
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.UnboundedSource;
Expand All @@ -50,7 +58,10 @@

class PubsubLiteUnboundedReader extends UnboundedReader<SequencedMessage>
implements OffsetFinalizer {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private final UnboundedSource<SequencedMessage, ?> source;
private final TopicBacklogReader backlogReader;
private final LoadingCache<String, Long> backlogCache;
private final CloseableMonitor monitor = new CloseableMonitor();

@GuardedBy("monitor.monitor")
Expand Down Expand Up @@ -88,7 +99,9 @@ protected void handlePermanentError(StatusException error) {

public PubsubLiteUnboundedReader(
UnboundedSource<SequencedMessage, ?> source,
ImmutableMap<Partition, SubscriberState> subscriberMap)
ImmutableMap<Partition, SubscriberState> subscriberMap,
TopicBacklogReader backlogReader,
Ticker ticker)
throws StatusException {
this.source = source;
this.subscriberMap = subscriberMap;
Expand All @@ -100,9 +113,32 @@ public PubsubLiteUnboundedReader(
permanentError = Optional.of(permanentError.orElse(error));
}
});
this.backlogReader = backlogReader;
this.backlogCache =
CacheBuilder.newBuilder()
.ticker(ticker)
.maximumSize(1)
.expireAfterWrite(1, TimeUnit.MINUTES)
.refreshAfterWrite(10, TimeUnit.SECONDS)
.build(
new CacheLoader<Object, Long>() {
public Long load(Object val) throws InterruptedException, ExecutionException {
return computeSplitBacklog().get().getMessageBytes();
}
});
this.committerProxy.startAsync().awaitRunning();
}

private ApiFuture<ComputeMessageStatsResponse> computeSplitBacklog() {
ImmutableMap.Builder<Partition, Offset> builder = ImmutableMap.builder();
try (CloseableMonitor.Hold h = monitor.enter()) {
subscriberMap.forEach(
(partition, subscriberState) ->
subscriberState.lastDelivered.ifPresent(offset -> builder.put(partition, offset)));
}
return backlogReader.computeMessageStats(builder.build());
}

@Override
public void finalizeOffsets(Map<Partition, Offset> offsets) throws StatusException {
List<ApiFuture<Void>> commitFutures = new ArrayList<>();
Expand Down Expand Up @@ -257,6 +293,20 @@ public CheckpointMark getCheckpointMark() {
}
}

@Override
public long getSplitBacklogBytes() {
try {
// We use the cache because it allows us to coalesce request, periodically refresh the value
// and expire the value after a maximum staleness, but there is only ever one key.
return backlogCache.get("Backlog");
} catch (ExecutionException e) {
logger.atWarning().log(
"Failed to retrieve backlog information, reporting the backlog size as UNKNOWN: {}",
e.getCause().getMessage());
return BACKLOG_UNKNOWN;
}
}

@Override
public UnboundedSource<SequencedMessage, ?> getCurrentSource() {
return source;
Expand Down
Expand Up @@ -23,6 +23,7 @@
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -93,7 +94,11 @@ public UnboundedReader<SequencedMessage> createReader(
}
statesBuilder.put(partition, state);
}
return new PubsubLiteUnboundedReader(this, statesBuilder.build());
return new PubsubLiteUnboundedReader(
this,
statesBuilder.build(),
subscriberOptions.topicBacklogReader(),
Ticker.systemTicker());
} catch (StatusException e) {
throw new IOException(e);
}
Expand Down
Expand Up @@ -17,10 +17,16 @@
package com.google.cloud.pubsublite.beam;

import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.internal.TopicStatsClientSettings;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.internal.wire.CommitterBuilder;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
Expand All @@ -34,6 +40,7 @@
import com.google.common.collect.ImmutableSet;
import io.grpc.StatusException;
import java.io.Serializable;
import java.util.concurrent.ExecutionException;

@AutoValue
public abstract class SubscriberOptions implements Serializable {
Expand All @@ -50,6 +57,9 @@ public abstract class SubscriberOptions implements Serializable {
/** A set of partitions. If empty, retrieve the set of partitions using an admin client. */
public abstract ImmutableSet<Partition> partitions();

/** The class used to read backlog for the subscription described by subscriptionPath() */
public abstract TopicBacklogReader topicBacklogReader();

/** A supplier for the subscriber stub to be used. */
public abstract Optional<SerializableSupplier<SubscriberServiceStub>> subscriberStubSupplier();

Expand Down Expand Up @@ -133,24 +143,54 @@ public abstract Builder setSubscriberStubSupplier(
public abstract Builder setCommitterStubSupplier(
SerializableSupplier<CursorServiceStub> stubSupplier);

public abstract Builder setTopicBacklogReader(TopicBacklogReader topicBacklogReader);

// Used in unit tests
abstract Builder setSubscriberFactory(SubscriberFactory subscriberFactory);

abstract Builder setCommitterSupplier(SerializableSupplier<Committer> committerSupplier);

// Used for implementing build();
abstract SubscriptionPath subscriptionPath();

abstract ImmutableSet<Partition> partitions();

abstract Optional<TopicBacklogReader> topicBacklogReader();

abstract SubscriberOptions autoBuild();

public SubscriberOptions build() throws StatusException {
SubscriberOptions built = autoBuild();
if (!built.partitions().isEmpty()) {
return built;
if (!partitions().isEmpty() && topicBacklogReader().isPresent()) {
return autoBuild();
}
int partition_count = PartitionLookupUtils.numPartitions(built.subscriptionPath());
SubscriberOptions.Builder builder = built.toBuilder();
ImmutableSet.Builder<Partition> partitions = ImmutableSet.builder();
for (int i = 0; i < partition_count; i++) {
partitions.add(Partition.of(i));
TopicPath path;
try (AdminClient adminClient =
AdminClient.create(
AdminClientSettings.newBuilder()
.setRegion(subscriptionPath().location().location().region())
.build())) {
path = TopicPath.parse(adminClient.getSubscription(subscriptionPath()).get().getTopic());
} catch (ExecutionException e) {
throw ExtractStatus.toCanonical(e.getCause());
} catch (Throwable t) {
throw ExtractStatus.toCanonical(t);
}
return builder.setPartitions(partitions.build()).autoBuild();

if (partitions().isEmpty()) {
int partition_count = PartitionLookupUtils.numPartitions(path);
ImmutableSet.Builder<Partition> partitions = ImmutableSet.builder();
for (int i = 0; i < partition_count; i++) {
partitions.add(Partition.of(i));
}
setPartitions(partitions.build());
}
if (!topicBacklogReader().isPresent()) {
setTopicBacklogReader(
new TopicBacklogReaderImpl(
TopicStatsClient.create(TopicStatsClientSettings.newBuilder().build()), path));
}

return autoBuild();
}
}
}
Expand Up @@ -32,10 +32,15 @@
import com.google.cloud.pubsublite.beam.PubsubLiteUnboundedReader.SubscriberState;
import com.google.cloud.pubsublite.internal.FakeApiService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Duration;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
import io.grpc.Status;
import io.grpc.StatusException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -46,6 +51,7 @@
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -62,12 +68,32 @@ public class PubsubLiteUnboundedReaderTest {

abstract static class CommitterFakeService extends FakeApiService implements Committer {}

private static class FakeTicker extends Ticker {
private Timestamp time;

FakeTicker(Timestamp start) {
time = start;
}

@Override
public long read() {
return Timestamps.toNanos(time);
}

public void advance(Duration duration) {
time = Timestamps.add(time, duration);
}
}

@Spy private CommitterFakeService committer5;
@Spy private CommitterFakeService committer8;

@SuppressWarnings("unchecked")
private final UnboundedSource<SequencedMessage, ?> source = mock(UnboundedSource.class);

private final TopicBacklogReader backlogReader = mock(TopicBacklogReader.class);
private final FakeTicker ticker = new FakeTicker(Timestamps.fromSeconds(450));

private final PubsubLiteUnboundedReader reader;

private static SequencedMessage exampleMessage(Offset offset, Timestamp publishTime) {
Expand All @@ -92,7 +118,10 @@ public PubsubLiteUnboundedReaderTest() throws StatusException {
state8.committer = committer8;
reader =
new PubsubLiteUnboundedReader(
source, ImmutableMap.of(Partition.of(5), state5, Partition.of(8), state8));
source,
ImmutableMap.of(Partition.of(5), state5, Partition.of(8), state8),
backlogReader,
ticker);
}

@Test
Expand Down Expand Up @@ -215,4 +244,72 @@ public void checkpointMarkFinalizeCommits() throws Exception {
mark.finalizeCheckpoint();
verify(committer5).commitOffset(Offset.of(10));
}

@Test
public void splitBacklogBytes_returnsUnknownBacklogOnError() throws Exception {
when(backlogReader.computeMessageStats(ImmutableMap.of()))
.thenReturn(ApiFutures.immediateFailedFuture(new StatusException(Status.UNAVAILABLE)));
Assert.assertEquals(PubsubLiteUnboundedReader.BACKLOG_UNKNOWN, reader.getSplitBacklogBytes());
}

@Test
public void splitBacklogBytes_computesBacklog() throws Exception {
ComputeMessageStatsResponse response =
ComputeMessageStatsResponse.newBuilder().setMessageBytes(40).build();
when(backlogReader.computeMessageStats(ImmutableMap.of()))
.thenReturn(ApiFutures.immediateFuture(response));
Assert.assertEquals(response.getMessageBytes(), reader.getSplitBacklogBytes());
}

@Test
public void splitBacklogBytes_computesBacklogOncePerTenSeconds() throws Exception {
ComputeMessageStatsResponse response1 =
ComputeMessageStatsResponse.newBuilder().setMessageBytes(40).build();
ComputeMessageStatsResponse response2 =
ComputeMessageStatsResponse.newBuilder().setMessageBytes(50).build();

when(backlogReader.computeMessageStats(ImmutableMap.of()))
.thenReturn(ApiFutures.immediateFuture(response1), ApiFutures.immediateFuture(response2));

Assert.assertEquals(response1.getMessageBytes(), reader.getSplitBacklogBytes());
ticker.advance(Durations.fromSeconds(10));
Assert.assertEquals(response1.getMessageBytes(), reader.getSplitBacklogBytes());
ticker.advance(Durations.fromSeconds(1));
Assert.assertEquals(response2.getMessageBytes(), reader.getSplitBacklogBytes());
}

@Test
public void splitBacklogBytes_oldValueExpiresAfterOneMinute() throws Exception {
ComputeMessageStatsResponse response =
ComputeMessageStatsResponse.newBuilder().setMessageBytes(40).build();

when(backlogReader.computeMessageStats(ImmutableMap.of()))
.thenReturn(
ApiFutures.immediateFuture(response),
ApiFutures.immediateFailedFuture(new StatusException(Status.UNAVAILABLE)));

Assert.assertEquals(response.getMessageBytes(), reader.getSplitBacklogBytes());
ticker.advance(Durations.fromSeconds(30));
Assert.assertEquals(response.getMessageBytes(), reader.getSplitBacklogBytes());
ticker.advance(Durations.fromSeconds(31));
Assert.assertEquals(PubsubLiteUnboundedReader.BACKLOG_UNKNOWN, reader.getSplitBacklogBytes());
}

@Test
public void splitBacklogBytes_usesCorrectCursorValues() throws Exception {
SequencedMessage message1 = exampleMessage(Offset.of(10), randomMilliAllignedTimestamp());
SequencedMessage message2 = exampleMessage(Offset.of(888), randomMilliAllignedTimestamp());
ComputeMessageStatsResponse response =
ComputeMessageStatsResponse.newBuilder().setMessageBytes(40).build();

when(subscriber5.pull()).thenReturn(ImmutableList.of(message1));
when(subscriber8.pull()).thenReturn(ImmutableList.of(message2));
when(backlogReader.computeMessageStats(
ImmutableMap.of(Partition.of(5), Offset.of(10), Partition.of(8), Offset.of(888))))
.thenReturn(ApiFutures.immediateFuture(response));

assertThat(reader.start()).isTrue();
assertThat(reader.advance()).isTrue();
Assert.assertEquals(response.getMessageBytes(), reader.getSplitBacklogBytes());
}
}

0 comments on commit 9a889a9

Please sign in to comment.