Skip to content

Commit

Permalink
feat: Add ReassignmentHandler which is notified on client reassignment (
Browse files Browse the repository at this point in the history
#886)

This enables clients to respond to a reassignment by cancelling outstanding actions and nacking all messages.

Fixes #869
  • Loading branch information
dpcollins-google committed Sep 16, 2021
1 parent 5abd97d commit 5bfef8d
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 42 deletions.
5 changes: 5 additions & 0 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
Expand Up @@ -164,6 +164,11 @@
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
Expand Down
@@ -0,0 +1,51 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.cloudpubsub;

import com.google.cloud.pubsublite.Partition;
import java.util.Set;

/**
* A ReassignmentHandler is called any time a new partition assignment is received from the server.
* It will be called with both the previous and new assignments as decided by the backend.
*
* <p>The client library will not acknowledge the assignment until handleReassignment returns. The
* assigning backend will not assign any of the partitions in `before` to another server unless the
* assignment is acknowledged, or a client takes too long to acknowledged (currently 30 seconds from
* the time the assignment is sent from server's point of view).
*
* <p>Because of the above, as long as reassignment handling is processed quickly, it can be used to
* abort outstanding operations on partitions which are being assigned away from this client, or to
* pre-warm state which will be used by the MessageReceiver.
*/
public interface ReassignmentHandler {
/**
* Called with the previous and new assignment delivered to this client on an assignment change.
* The assignment will not be acknowledged until this method returns, so it should complete
* quickly, or the backend will assume it is non-responsive and assign all partitions away without
* waiting for acknowledgement.
*
* <p>handleReassignment will only be called after no new messages will be delivered for the
* partition.
*
* <p>Acks or nacks on messages from partitions being assigned away will have no effect.
*
* @param before the previous assignment
* @param after the new assignment
*/
void handleReassignment(Set<Partition> before, Set<Partition> after);
}
Expand Up @@ -126,12 +126,16 @@ public abstract class SubscriberSettings {
*/
abstract Optional<NackHandler> nackHandler();

/** A handler that will be notified when partition assignments change from the backend. */
abstract ReassignmentHandler reassignmentHandler();

public static Builder newBuilder() {
return new AutoValue_SubscriberSettings.Builder()
.setFramework(Framework.of("CLOUD_PUBSUB_SHIM"))
.setPartitions(ImmutableList.of())
.setCredentialsProvider(
SubscriberServiceSettings.defaultCredentialsProviderBuilder().build());
SubscriberServiceSettings.defaultCredentialsProviderBuilder().build())
.setReassignmentHandler((before, after) -> {});
}

@AutoValue.Builder
Expand Down Expand Up @@ -203,6 +207,9 @@ public abstract Builder setSubscriberServiceClientSupplier(
*/
public abstract Builder setNackHandler(NackHandler nackHandler);

/** A handler that will be notified when partition assignments change from the backend. */
public abstract Builder setReassignmentHandler(ReassignmentHandler reassignmentHandler);

public abstract SubscriberSettings build();
}

Expand Down Expand Up @@ -301,7 +308,8 @@ Subscriber instantiate() throws ApiException {
.setServiceClient(getAssignmentServiceClient());
AssignerFactory assignerFactory =
receiver -> assignerSettings.setReceiver(receiver).build().instantiate();
return new AssigningSubscriber(partitionSubscriberFactory, assignerFactory);
return new AssigningSubscriber(
partitionSubscriberFactory, reassignmentHandler(), assignerFactory);
}

List<Subscriber> perPartitionSubscribers = new ArrayList<>();
Expand Down
Expand Up @@ -22,9 +22,9 @@

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.cloudpubsub.ReassignmentHandler;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.Assigner;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
Expand All @@ -36,42 +36,36 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class AssigningSubscriber extends ProxyService implements Subscriber {
private static final GoogleLogger LOG = GoogleLogger.forEnclosingClass();
private final PartitionSubscriberFactory subscriberFactory;
private final ReassignmentHandler reassignmentHandler;

private final CloseableMonitor monitor = new CloseableMonitor();

@GuardedBy("monitor.monitor")
@GuardedBy("this")
private final Map<Partition, Subscriber> liveSubscriberMap = new HashMap<>();

@GuardedBy("monitor.monitor")
private final List<Subscriber> stoppingSubscribers = new ArrayList<>();

@GuardedBy("monitor.monitor")
@GuardedBy("this")
private boolean shutdown = false;

public AssigningSubscriber(
PartitionSubscriberFactory subscriberFactory, AssignerFactory assignerFactory)
PartitionSubscriberFactory subscriberFactory,
ReassignmentHandler reassignmentHandler,
AssignerFactory assignerFactory)
throws ApiException {
this.subscriberFactory = subscriberFactory;
this.reassignmentHandler = reassignmentHandler;
Assigner assigner = assignerFactory.New(this::handleAssignment);
addServices(assigner);
}

@Override
protected void start() {}

@Override
protected void stop() {
try (CloseableMonitor.Hold h = monitor.enter()) {
shutdown = true;
blockingShutdown(liveSubscriberMap.values());
liveSubscriberMap.clear();
blockingShutdown(stoppingSubscribers);
}
protected synchronized void stop() {
shutdown = true;
blockingShutdown(liveSubscriberMap.values());
liveSubscriberMap.clear();
}

@Override
Expand All @@ -81,25 +75,30 @@ protected void handlePermanentError(CheckedApiException error) {

private void handleAssignment(Set<Partition> assignment) {
try {
try (CloseableMonitor.Hold h = monitor.enter()) {
Set<Partition> livePartitions;
List<Subscriber> removed = new ArrayList<>();
synchronized (this) {
if (shutdown) return;
Set<Partition> livePartitions = ImmutableSet.copyOf(liveSubscriberMap.keySet());
livePartitions = ImmutableSet.copyOf(liveSubscriberMap.keySet());
for (Partition partition : livePartitions) {
if (!assignment.contains(partition)) {
stopSubscriber(liveSubscriberMap.remove(partition));
removed.add(Objects.requireNonNull(liveSubscriberMap.remove(partition)));
}
}
for (Partition partition : assignment) {
if (!liveSubscriberMap.containsKey(partition)) startSubscriber(partition);
}
}
blockingShutdown(removed);
// Call reassignment handler outside lock so it won't deadlock if it is asynchronously
// reentrant such as by calling sub.stopAsync().awaitTerminated().
reassignmentHandler.handleReassignment(livePartitions, assignment);
} catch (Throwable t) {
onPermanentError(toCanonical(t));
}
}

@GuardedBy("monitor.monitor")
private void startSubscriber(Partition partition) throws CheckedApiException {
private synchronized void startSubscriber(Partition partition) throws CheckedApiException {
checkState(!liveSubscriberMap.containsKey(partition));
Subscriber subscriber = subscriberFactory.newSubscriber(partition);
subscriber.addListener(
Expand All @@ -109,22 +108,9 @@ public void failed(State from, Throwable failure) {
if (State.STOPPING.equals(from)) return;
onPermanentError(toCanonical(failure));
}

@Override
public void terminated(State from) {
try (CloseableMonitor.Hold h = monitor.enter()) {
stoppingSubscribers.remove(subscriber);
}
}
},
SystemExecutors.getFuturesExecutor());
liveSubscriberMap.put(partition, subscriber);
subscriber.startAsync();
}

@GuardedBy("monitor.monitor")
private void stopSubscriber(Subscriber subscriber) {
stoppingSubscribers.add(subscriber);
subscriber.stopAsync();
}
}
Expand Up @@ -78,6 +78,10 @@ protected void start() throws CheckedApiException {
@Override
protected void stop() {}

private boolean terminated() {
return State.TERMINATED.equals(state());
}

@VisibleForTesting
void onMessages(List<SequencedMessage> sequencedMessages) {
try {
Expand All @@ -89,6 +93,9 @@ void onMessages(List<SequencedMessage> sequencedMessages) {
new AckReplyConsumer() {
@Override
public void ack() {
if (terminated()) {
return; // Drop acks after shutdown
}
trackerConsumer.run();
try {
wireSubscriber.allowFlow(
Expand All @@ -103,6 +110,9 @@ public void ack() {

@Override
public void nack() {
if (terminated()) {
return; // Drop nacks after shutdown to allow nacking from reassignment handler
}
ApiFuture<Void> nackDone = nackHandler.nack(userMessage);
ApiFutures.addCallback(
nackDone,
Expand Down
Expand Up @@ -20,14 +20,19 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.core.ApiService.State;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.cloudpubsub.ReassignmentHandler;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
Expand All @@ -36,17 +41,21 @@
import com.google.cloud.pubsublite.internal.wire.PartitionAssignmentReceiver;
import com.google.common.collect.ImmutableSet;
import io.grpc.Status;
import java.util.Set;
import java.util.concurrent.Future;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.stubbing.Answer;

@RunWith(JUnit4.class)
public class AssigningSubscriberTest {
@Mock PartitionSubscriberFactory subscriberFactory;
@Mock ReassignmentHandler reassignmentHandler;
@Mock AssignerFactory assignerFactory;

private AssigningSubscriber assigningSubscriber;
Expand All @@ -68,7 +77,8 @@ public void setUp() {
leakedReceiver = args.getArgument(0);
return assigner;
});
assigningSubscriber = new AssigningSubscriber(subscriberFactory, assignerFactory);
assigningSubscriber =
new AssigningSubscriber(subscriberFactory, reassignmentHandler, assignerFactory);
verify(assignerFactory).New(any());
assertThat(leakedReceiver).isNotNull();
assigningSubscriber.startAsync().awaitRunning();
Expand All @@ -89,19 +99,52 @@ public void failedCreate() throws CheckedApiException {
assertThrows(IllegalStateException.class, assigningSubscriber::awaitTerminated);
}

@Test
public void assignmentHandlerFailure() throws Exception {
Subscriber sub1 = spy(FakeSubscriber.class);
when(subscriberFactory.newSubscriber(Partition.of(1))).thenReturn(sub1);
doThrow(new RuntimeException("Arbitrary error."))
.when(reassignmentHandler)
.handleReassignment(ImmutableSet.of(), ImmutableSet.of(Partition.of(1)));
leakedReceiver.handleAssignment(ImmutableSet.of(Partition.of(1)));
assertThrows(IllegalStateException.class, assigningSubscriber::awaitTerminated);
}

@Test
public void assignmentHandlerReentrantSafe() throws Exception {
Subscriber sub1 = spy(FakeSubscriber.class);
when(subscriberFactory.newSubscriber(Partition.of(1))).thenReturn(sub1);
doAnswer(
(Answer<Void>)
args -> {
assigningSubscriber.stopAsync().awaitTerminated();
return null;
})
.when(reassignmentHandler)
.handleReassignment(ImmutableSet.of(), ImmutableSet.of(Partition.of(1)));
leakedReceiver.handleAssignment(ImmutableSet.of(Partition.of(1)));
assigningSubscriber.awaitTerminated();
}

@Test
public void createSubscribers() throws CheckedApiException {
Subscriber sub1 = spy(FakeSubscriber.class);
when(subscriberFactory.newSubscriber(Partition.of(1))).thenReturn(sub1);
leakedReceiver.handleAssignment(ImmutableSet.of(Partition.of(1)));
verify(subscriberFactory).newSubscriber(Partition.of(1));
InOrder order = inOrder(reassignmentHandler, subscriberFactory);
order.verify(subscriberFactory).newSubscriber(Partition.of(1));
order
.verify(reassignmentHandler)
.handleReassignment(ImmutableSet.of(), ImmutableSet.of(Partition.of(1)));
verify(sub1).startAsync();
reset(sub1);

Subscriber sub2 = spy(FakeSubscriber.class);
when(subscriberFactory.newSubscriber(Partition.of(2))).thenReturn(sub2);
leakedReceiver.handleAssignment(ImmutableSet.of(Partition.of(1), Partition.of(2)));
Set<Partition> newAssignment = ImmutableSet.of(Partition.of(1), Partition.of(2));
leakedReceiver.handleAssignment(newAssignment);
verify(subscriberFactory).newSubscriber(Partition.of(2));
verify(reassignmentHandler).handleReassignment(ImmutableSet.of(Partition.of(1)), newAssignment);
verify(sub2).startAsync();
verifyNoMoreInteractions(sub1);
}
Expand All @@ -112,15 +155,31 @@ public void createAndEvict() throws CheckedApiException {
when(subscriberFactory.newSubscriber(Partition.of(1))).thenReturn(sub1);
leakedReceiver.handleAssignment(ImmutableSet.of(Partition.of(1)));
verify(subscriberFactory).newSubscriber(Partition.of(1));
verify(reassignmentHandler)
.handleReassignment(ImmutableSet.of(), ImmutableSet.of(Partition.of(1)));
verify(sub1).startAsync();
State sub1State = sub1.state();
assertThat(State.STARTING.equals(sub1State) || State.RUNNING.equals(sub1State)).isTrue();
reset(sub1);

Subscriber sub2 = spy(FakeSubscriber.class);
when(subscriberFactory.newSubscriber(Partition.of(2))).thenReturn(sub2);
doAnswer(
(Answer<Void>)
args -> {
// sub1 should already be stopped by the time the reassignmentHandler is called
assertThat(sub1.state()).isEqualTo(State.TERMINATED);
return null;
})
.when(reassignmentHandler)
.handleReassignment(ImmutableSet.of(Partition.of(1)), ImmutableSet.of(Partition.of(2)));
leakedReceiver.handleAssignment(ImmutableSet.of(Partition.of(2)));
verify(subscriberFactory).newSubscriber(Partition.of(2));
verify(reassignmentHandler)
.handleReassignment(ImmutableSet.of(Partition.of(1)), ImmutableSet.of(Partition.of(2)));
verify(sub2).startAsync();
verify(sub1).stopAsync();
verify(sub1).awaitTerminated();
}

private Subscriber initSub1() throws CheckedApiException {
Expand Down

0 comments on commit 5bfef8d

Please sign in to comment.