Skip to content

Commit

Permalink
feat: Set initial location when connecting subscribe streams (#664)
Browse files Browse the repository at this point in the history
- Sets the InitialSubscribeRequest.initial_location field when reconnecting subscribe streams.
- Completes the handling of admin/out of band seeks.
- Removes the initial seek from all subscriber wrapper implementations.
  • Loading branch information
tmdiep committed Jun 8, 2021
1 parent 8f4d176 commit 65ced46
Show file tree
Hide file tree
Showing 16 changed files with 208 additions and 167 deletions.
Expand Up @@ -42,6 +42,8 @@
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.cloud.pubsublite.v1.CursorServiceClient;
import com.google.cloud.pubsublite.v1.CursorServiceSettings;
import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceClient;
Expand Down Expand Up @@ -244,7 +246,9 @@ Subscriber newPartitionSubscriber(Partition partition) throws CheckedApiExceptio
SubscriberBuilder.newBuilder()
.setPartition(partition)
.setSubscriptionPath(subscriptionPath())
.setServiceClient(newSubscriberServiceClient(partition));
.setServiceClient(newSubscriberServiceClient(partition))
.setInitialLocation(
SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build());

Committer wireCommitter =
CommitterSettings.newBuilder()
Expand Down
Expand Up @@ -129,7 +129,7 @@ public void onSuccess(Void result) {

@VisibleForTesting
boolean onSubscriberReset() throws CheckedApiException {
// TODO: handle reset.
return false;
ackSetTracker.waitUntilCommitted();
return true;
}
}
Expand Up @@ -27,14 +27,12 @@
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

public class BlockingPullSubscriberImpl implements BlockingPullSubscriber {

Expand All @@ -49,8 +47,7 @@ public class BlockingPullSubscriberImpl implements BlockingPullSubscriber {
@GuardedBy("this")
private Optional<SettableApiFuture<Void>> notification = Optional.empty();

public BlockingPullSubscriberImpl(
SubscriberFactory factory, FlowControlSettings settings, SeekRequest initialSeek)
public BlockingPullSubscriberImpl(SubscriberFactory factory, FlowControlSettings settings)
throws CheckedApiException {
underlying = factory.newSubscriber(this::addMessages);
underlying.addListener(
Expand All @@ -62,11 +59,6 @@ public void failed(State state, Throwable throwable) {
},
MoreExecutors.directExecutor());
underlying.startAsync().awaitRunning();
try {
underlying.seek(initialSeek).get();
} catch (InterruptedException | ExecutionException e) {
throw ExtractStatus.toCanonical(e);
}
underlying.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedMessages(settings.messagesOutstanding())
Expand Down
Expand Up @@ -24,8 +24,6 @@
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
Expand All @@ -35,7 +33,6 @@
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

public class BufferingPullSubscriber implements PullSubscriber<SequencedMessage> {
private final Subscriber underlying;
Expand All @@ -51,15 +48,6 @@ public class BufferingPullSubscriber implements PullSubscriber<SequencedMessage>

public BufferingPullSubscriber(SubscriberFactory factory, FlowControlSettings settings)
throws CheckedApiException {
this(
factory,
settings,
SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build());
}

public BufferingPullSubscriber(
SubscriberFactory factory, FlowControlSettings settings, SeekRequest initialSeek)
throws CheckedApiException {
underlying = factory.newSubscriber(this::addMessages);
underlying.addListener(
new Listener() {
Expand All @@ -70,11 +58,6 @@ public void failed(State state, Throwable throwable) {
},
MoreExecutors.directExecutor());
underlying.startAsync().awaitRunning();
try {
underlying.seek(initialSeek).get();
} catch (InterruptedException | ExecutionException e) {
throw ExtractStatus.toCanonical(e);
}
underlying.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedMessages(settings.messagesOutstanding())
Expand Down
Expand Up @@ -30,6 +30,9 @@
public interface Subscriber extends ApiService {
// Seek the subscriber using the given SeekRequest. Requires that no seeks are outstanding.
// Returns the seeked-to offset.
//
// Flow control tokens are reset when the seek response is received from the server and should be
// refilled after the future completes.
ApiFuture<Offset> seek(SeekRequest request);
// Whether or not a seek is in flight for this subscriber. If a seek is in flight, any further
// seek requests will result in a permanent error.
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.InitialSubscribeRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.common.collect.ImmutableList;
import java.util.function.Consumer;
Expand All @@ -37,6 +38,8 @@ public abstract class SubscriberBuilder {

abstract SubscriberServiceClient serviceClient();

abstract SeekRequest initialLocation();

// Optional parameters.
abstract SubscriberResetHandler resetHandler();

Expand All @@ -57,6 +60,8 @@ public abstract Builder setMessageConsumer(

public abstract Builder setServiceClient(SubscriberServiceClient serviceClient);

public abstract Builder setInitialLocation(SeekRequest initialLocation);

// Optional parameters.
public abstract Builder setResetHandler(SubscriberResetHandler resetHandler);

Expand All @@ -75,6 +80,7 @@ public Subscriber build() throws ApiException {
new SubscriberImpl(
autoBuilt.serviceClient(),
initialSubscribeRequest,
autoBuilt.initialLocation(),
autoBuilt.messageConsumer(),
autoBuilt.resetHandler()));
}
Expand Down
Expand Up @@ -34,12 +34,12 @@
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.InitialSubscribeRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.cloud.pubsublite.proto.SubscribeRequest;
import com.google.cloud.pubsublite.proto.SubscribeResponse;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Monitor;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand All @@ -54,7 +54,7 @@ public class SubscriberImpl extends ProxyService

private final SubscriberResetHandler resetHandler;

private final SubscribeRequest initialRequest;
private final InitialSubscribeRequest baseInitialRequest;

private final CloseableMonitor monitor = new CloseableMonitor();

Expand All @@ -73,7 +73,7 @@ public class SubscriberImpl extends ProxyService
private Optional<InFlightSeek> inFlightSeek = Optional.empty();

@GuardedBy("monitor.monitor")
private boolean internalSeekInFlight = false;
private SeekRequest initialLocation;

@GuardedBy("monitor.monitor")
private boolean shutdown = false;
Expand All @@ -92,28 +92,32 @@ private static class InFlightSeek {
SubscriberImpl(
StreamFactory<SubscribeRequest, SubscribeResponse> streamFactory,
ConnectedSubscriberFactory factory,
InitialSubscribeRequest initialRequest,
InitialSubscribeRequest baseInitialRequest,
SeekRequest initialLocation,
Consumer<ImmutableList<SequencedMessage>> messageConsumer,
SubscriberResetHandler resetHandler)
throws ApiException {
this.messageConsumer = messageConsumer;
this.resetHandler = resetHandler;
this.initialRequest = SubscribeRequest.newBuilder().setInitial(initialRequest).build();
this.baseInitialRequest = baseInitialRequest;
this.initialLocation = initialLocation;
this.connection =
new RetryingConnectionImpl<>(streamFactory, factory, this, this.initialRequest);
new RetryingConnectionImpl<>(streamFactory, factory, this, getInitialRequest());
addServices(this.connection);
}

public SubscriberImpl(
SubscriberServiceClient client,
InitialSubscribeRequest initialRequest,
InitialSubscribeRequest baseInitialRequest,
SeekRequest initialLocation,
Consumer<ImmutableList<SequencedMessage>> messageConsumer,
SubscriberResetHandler resetHandler)
throws ApiException {
this(
stream -> client.subscribeCallable().splitCall(stream),
new ConnectedSubscriberImpl.Factory(),
initialRequest,
baseInitialRequest,
initialLocation,
messageConsumer,
resetHandler);
addServices(backgroundResourceAsApiService(client));
Expand Down Expand Up @@ -157,24 +161,18 @@ protected void stop() {

@Override
public ApiFuture<Offset> seek(SeekRequest request) {
try (CloseableMonitor.Hold h =
monitor.enterWhenUninterruptibly(
new Monitor.Guard(monitor.monitor) {
@Override
public boolean isSatisfied() {
return !internalSeekInFlight || shutdown;
}
})) {
try (CloseableMonitor.Hold h = monitor.enter()) {
checkArgument(
Predicates.isValidSeekRequest(request), "Sent SeekRequest with no location set.");
checkState(!shutdown, "Seeked after the stream shut down.");
checkState(!inFlightSeek.isPresent(), "Seeked while seek is already in flight.");
SettableApiFuture<Offset> future = SettableApiFuture.create();
inFlightSeek = Optional.of(new InFlightSeek(request, future));
flowControlBatcher.onClientSeek();
connection.modifyConnection(
connectedSubscriber ->
connectedSubscriber.ifPresent(subscriber -> subscriber.seek(request)));
// Note: next offset and flow control tokens should be reset upon seek response. Pre-seek
// messages may still be received until the server receives the seek request.
return future;
} catch (CheckedApiException e) {
onPermanentError(e);
Expand Down Expand Up @@ -202,6 +200,18 @@ public void allowFlow(FlowControlRequest clientRequest) throws CheckedApiExcepti
}
}

private SubscribeRequest getInitialRequest() {
try (CloseableMonitor.Hold h = monitor.enter()) {
return SubscribeRequest.newBuilder()
.setInitial(
baseInitialRequest
.toBuilder()
.setInitialLocation(
nextOffsetTracker.requestForRestart().orElse(initialLocation)))
.build();
}
}

public void reset() {
try (CloseableMonitor.Hold h = monitor.enter()) {
if (shutdown) return;
Expand All @@ -211,6 +221,8 @@ public void reset() {
new CheckedApiException("Aborted due to out of band seek.", Code.ABORTED)));
inFlightSeek = Optional.empty();
nextOffsetTracker.reset();
initialLocation =
SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build();
}
}

Expand All @@ -230,25 +242,21 @@ public void triggerReinitialize(CheckedApiException streamError) {

try (CloseableMonitor.Hold h = monitor.enter()) {
if (shutdown) return;
connection.reinitialize(initialRequest);
connection.reinitialize(getInitialRequest());
connection.modifyConnection(
connectedSubscriber -> {
checkArgument(monitor.monitor.isOccupiedByCurrentThread());
checkArgument(connectedSubscriber.isPresent());
if (inFlightSeek.isPresent()) {
connectedSubscriber.get().seek(inFlightSeek.get().seekRequest);
} else {
nextOffsetTracker
// Flow control tokens should be cleared after the seek response is received, thus
// they are not sent after the subscribe stream is reconnected when there is an
// in-flight seek.
flowControlBatcher
.requestForRestart()
.ifPresent(
request -> {
internalSeekInFlight = true;
connectedSubscriber.get().seek(request);
});
.ifPresent(request -> connectedSubscriber.get().allowFlow(request));
}
flowControlBatcher
.requestForRestart()
.ifPresent(request -> connectedSubscriber.get().allowFlow(request));
});
} catch (CheckedApiException e) {
onPermanentError(e);
Expand Down Expand Up @@ -282,12 +290,9 @@ private void onMessageResponse(ImmutableList<SequencedMessage> messages)
private void onSeekResponse(Offset seekOffset) throws CheckedApiException {
try (CloseableMonitor.Hold h = monitor.enter()) {
if (shutdown) return;
if (internalSeekInFlight) {
internalSeekInFlight = false;
return;
}
checkState(inFlightSeek.isPresent(), "No in flight seek, but received a seek response.");
nextOffsetTracker.onClientSeek(seekOffset);
flowControlBatcher.onClientSeek();
inFlightSeek.get().seekFuture.set(seekOffset);
inFlightSeek = Optional.empty();
}
Expand Down
Expand Up @@ -223,7 +223,8 @@ public void singleMessageNackHandlerFailedFuture() throws CheckedApiException {
}

@Test
public void onSubscriberResetNotHandled() throws CheckedApiException {
assertThat(subscriber.onSubscriberReset()).isFalse();
public void onSubscriberResetWaitsForAckSetTracker() throws CheckedApiException {
assertThat(subscriber.onSubscriberReset()).isTrue();
verify(ackSetTracker).waitUntilCommitted();
}
}
Expand Up @@ -25,7 +25,6 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Message;
Expand All @@ -34,9 +33,7 @@
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.util.Timestamps;
import java.util.ArrayList;
Expand All @@ -53,11 +50,6 @@
public class BlockingPullSubscriberImplTest {
private final SubscriberFactory underlyingFactory = mock(SubscriberFactory.class);
private final Subscriber underlying = mock(Subscriber.class);
private final Offset initialOffset = Offset.of(5);
private final SeekRequest initialSeek =
SeekRequest.newBuilder()
.setCursor(Cursor.newBuilder().setOffset(initialOffset.value()))
.build();
private final FlowControlSettings flowControlSettings =
FlowControlSettings.builder().setBytesOutstanding(10).setMessagesOutstanding(20).build();
// Initialized in setUp.
Expand All @@ -69,11 +61,6 @@ public class BlockingPullSubscriberImplTest {
@Before
public void setUp() throws Exception {
when(underlying.startAsync()).thenReturn(underlying);
SeekRequest seek =
SeekRequest.newBuilder()
.setCursor(Cursor.newBuilder().setOffset(initialOffset.value()).build())
.build();
when(underlying.seek(seek)).thenReturn(ApiFutures.immediateFuture(initialOffset));
FlowControlRequest flow =
FlowControlRequest.newBuilder()
.setAllowedBytes(flowControlSettings.bytesOutstanding())
Expand All @@ -94,15 +81,13 @@ public void setUp() throws Exception {
.when(underlying)
.addListener(any(), any());

subscriber =
new BlockingPullSubscriberImpl(underlyingFactory, flowControlSettings, initialSeek);
subscriber = new BlockingPullSubscriberImpl(underlyingFactory, flowControlSettings);

InOrder inOrder = inOrder(underlyingFactory, underlying);
inOrder.verify(underlyingFactory).newSubscriber(any());
inOrder.verify(underlying).addListener(any(), any());
inOrder.verify(underlying).startAsync();
inOrder.verify(underlying).awaitRunning();
inOrder.verify(underlying).seek(seek);
inOrder.verify(underlying).allowFlow(flow);

assertThat(messageConsumer).isNotNull();
Expand Down

0 comments on commit 65ced46

Please sign in to comment.