Skip to content

Commit

Permalink
fix: adding a method to the internal wire publisher that will attempt…
Browse files Browse the repository at this point in the history
… to cancel all outstanding publishes (#434)

* fix: keep track of internal seek

* fix: check shutdown on seek

* fix: reset tokens on seek

* feat: add api client header to outbound requests

* fix format

* fix: use gccl instead of gapic

* feat: adding a method to the wire publisher to cancel outstanding publishes

* fix: fix typo

* fix: adding more tests to satisfy codecov report
  • Loading branch information
hannahrogers-google committed Dec 22, 2020
1 parent 9df4ccf commit 7b9776e
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 9 deletions.
Expand Up @@ -29,4 +29,7 @@ public interface Publisher<ResponseT> extends ApiService, Flushable {
// Guarantees that if a single publish future has an exception set, all publish calls made after
// that will also have an exception set.
ApiFuture<ResponseT> publish(Message message);

// Attempts to cancel all outstanding publishes.
void cancelOutstandingPublishes();
}
Expand Up @@ -38,6 +38,11 @@ public ApiFuture<T> publish(Message message) {
return toClientFuture(publisher.publish(message));
}

@Override
public void cancelOutstandingPublishes() {
publisher.cancelOutstandingPublishes();
}

@Override
public void flush() throws IOException {
publisher.flush();
Expand Down
Expand Up @@ -68,6 +68,12 @@ public ApiFuture<PublishMetadata> publish(Message message) throws CheckedApiExce
}
}

public void cancelOutstandingPublishes() {
for (Publisher<PublishMetadata> publisher : publishers.values()) {
publisher.cancelOutstandingPublishes();
}
}

public void flush() throws IOException {
for (Publisher<PublishMetadata> publisher : publishers.values()) {
publisher.flush();
Expand Down Expand Up @@ -113,6 +119,19 @@ public ApiFuture<PublishMetadata> publish(Message message) {
}
}

@Override
public void cancelOutstandingPublishes() {
Optional<PartitionsWithRouting> partitions;
try (CloseableMonitor.Hold h = monitor.enter()) {
partitions = partitionsWithRouting;
}
if (!partitions.isPresent()) {
throw new IllegalStateException(
"Cancel outstanding publishes called before start or after shutdown");
}
partitions.get().cancelOutstandingPublishes();
}

@Override
public void flush() throws IOException {
Optional<PartitionsWithRouting> partitions;
Expand Down
Expand Up @@ -156,10 +156,7 @@ public void triggerReinitialize() {
protected void handlePermanentError(CheckedApiException error) {
try (CloseableMonitor.Hold h = monitor.enter()) {
shutdown = true;
batchesInFlight.forEach(
batch -> batch.messageFutures.forEach(future -> future.setException(error)));
batcher.flush().forEach(m -> m.future().setException(error));
batchesInFlight.clear();
terminateOutstandingPublishes(error);
}
}

Expand Down Expand Up @@ -200,6 +197,14 @@ private void processBatch(Collection<UnbatchedMessage> batch) throws CheckedApiE
});
}

@GuardedBy("monitor.monitor")
private void terminateOutstandingPublishes(CheckedApiException e) {
batchesInFlight.forEach(
batch -> batch.messageFutures.forEach(future -> future.setException(e)));
batcher.flush().forEach(m -> m.future().setException(e));
batchesInFlight.clear();
}

@Override
public ApiFuture<Offset> publish(Message message) {
PubSubMessage proto = message.toProto();
Expand Down Expand Up @@ -235,6 +240,14 @@ public ApiFuture<Offset> publish(Message message) {
}
}

@Override
public void cancelOutstandingPublishes() {
try (CloseableMonitor.Hold h = monitor.enter()) {
terminateOutstandingPublishes(
new CheckedApiException("Cancelled by client.", Code.CANCELLED));
}
}

@VisibleForTesting
void flushToStream() {
try (CloseableMonitor.Hold h = monitor.enter()) {
Expand Down
Expand Up @@ -63,6 +63,13 @@ public ApiFuture<PublishMetadata> publish(Message message) {
}
}

@Override
public void cancelOutstandingPublishes() {
for (Publisher<PublishMetadata> publisher : partitionPublishers.values()) {
publisher.cancelOutstandingPublishes();
}
}

@Override
public void flush() throws IOException {
for (Publisher<PublishMetadata> publisher : partitionPublishers.values()) {
Expand Down
Expand Up @@ -48,6 +48,11 @@ public ApiFuture<PublishMetadata> publish(Message message) {
MoreExecutors.directExecutor());
}

@Override
public void cancelOutstandingPublishes() {
publisher.cancelOutstandingPublishes();
}

@Override
public void flush() throws IOException {
publisher.flush();
Expand Down
Expand Up @@ -167,6 +167,13 @@ public void testFlush() throws Exception {
verify(publisher1).flush();
}

@Test
public void testCancelOutstandingPublishes() throws Exception {
publisher.cancelOutstandingPublishes();
verify(publisher0).cancelOutstandingPublishes();
verify(publisher1).cancelOutstandingPublishes();
}

@Test
public void testIncreaseSucceeds() throws Exception {
leakedConsumer.accept(3L);
Expand Down
Expand Up @@ -310,4 +310,31 @@ public void invalidOffsetSequence_SetsPermanentException() throws Exception {

verifyNoMoreInteractions(mockBatchPublisher);
}

@Test
public void cancelOutstandingPublishes_terminatesFutures() throws Exception {
startPublisher();

// Publish a message and flush to stream.
Message message1 = Message.builder().setData(ByteString.copyFromUtf8("data")).build();
Future<Offset> future1 = publisher.publish(message1);
publisher.flushToStream();
verify(mockBatchPublisher)
.publish((Collection<PubSubMessage>) argThat(hasItems(message1.toProto())));

// Publish another message but do not flush to stream yet.
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("other_data")).build();
Future<Offset> future2 = publisher.publish(message2);

// Cancel outstanding publishes and verify that both futures complete with a cancelled status.
assertThat(future1.isDone()).isFalse();
assertThat(future2.isDone()).isFalse();
publisher.cancelOutstandingPublishes();
assertThat(future1.isDone()).isTrue();
ExecutionException e1 = assertThrows(ExecutionException.class, future1::get);
assertThat(ExtractStatus.extract(e1.getCause()).get().code()).isEqualTo(Code.CANCELLED);
assertThat(future2.isDone()).isTrue();
ExecutionException e2 = assertThrows(ExecutionException.class, future2::get);
assertThat(ExtractStatus.extract(e2.getCause()).get().code()).isEqualTo(Code.CANCELLED);
}
}
Expand Up @@ -72,6 +72,14 @@ public void flushFlushesAll() throws Exception {
this.routing.stopAsync().awaitTerminated();
}

@Test
public void cancelOutstandingCancelsAll() throws Exception {
routing.cancelOutstandingPublishes();
verify(publisher0, times(1)).cancelOutstandingPublishes();
verify(publisher1, times(1)).cancelOutstandingPublishes();
this.routing.stopAsync().awaitTerminated();
}

@Test
public void publishValidRoute() throws Exception {
Message message = Message.builder().setKey(ByteString.copyFromUtf8("abc")).build();
Expand Down
Expand Up @@ -19,6 +19,8 @@
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.RETURNS_SELF;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

Expand Down Expand Up @@ -48,13 +50,12 @@ abstract static class FakeOffsetPublisher extends FakeApiService implements Publ

@Spy private FakeOffsetPublisher underlying;

private Publisher<PublishMetadata> pub;

@Before
public void setUp() {
initMocks(this);
}

@Test
public void publishResultTransformed() throws Exception {
TopicPath topic =
TopicPath.newBuilder()
.setName(TopicName.of("abc"))
Expand All @@ -67,13 +68,17 @@ public void publishResultTransformed() throws Exception {
when(mockBuilder.build()).thenReturn(underlying);

when(mockBuilder.setTopic(topic)).thenReturn(mockBuilder);
Publisher<PublishMetadata> pub =
this.pub =
SinglePartitionPublisherBuilder.newBuilder()
.setTopic(topic)
.setPartition(partition)
.setUnderlyingBuilder(mockBuilder)
.build();
pub.startAsync().awaitRunning();
this.pub.startAsync().awaitRunning();
}

@Test
public void publishResultTransformed() throws Exception {
SettableApiFuture<Offset> offsetFuture = SettableApiFuture.create();
Message message = Message.builder().setData(ByteString.copyFromUtf8("xyz")).build();
when(underlying.publish(message)).thenReturn(offsetFuture);
Expand All @@ -82,5 +87,20 @@ public void publishResultTransformed() throws Exception {
offsetFuture.set(Offset.of(7));
assertThat(metadataFuture.isDone()).isTrue();
assertThat(metadataFuture.get()).isEqualTo(PublishMetadata.of(Partition.of(3), Offset.of(7)));
pub.stopAsync().awaitTerminated();
}

@Test
public void flushFlushesUnderlying() throws Exception {
pub.flush();
verify(underlying, times(1)).flush();
pub.stopAsync().awaitTerminated();
}

@Test
public void cancelOutstandingCancelsUnderlyingPublishes() throws Exception {
pub.cancelOutstandingPublishes();
verify(underlying, times(1)).cancelOutstandingPublishes();
pub.stopAsync().awaitTerminated();
}
}

0 comments on commit 7b9776e

Please sign in to comment.