Skip to content

Commit

Permalink
fix: Return an error on publish if the publisher's state is not RUNNI…
Browse files Browse the repository at this point in the history
…NG (#81)

* Throw an exception on publish if the publisher state is not RUNNING

* Immediately return failed future

* Moved check to PublisherImpl

* Tests for PublisherImplTest

* Throw a permanent error when publishing before starting the service

* Remove comment

* Check for permanent failure by trying to start publisher
  • Loading branch information
tmdiep committed Jun 8, 2020
1 parent a56858e commit 74d61fd
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
Expand Up @@ -20,6 +20,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsublite.Constants;
Expand Down Expand Up @@ -209,12 +210,11 @@ public ApiFuture<Offset> publish(Message message) {
return ApiFutures.immediateFailedFuture(error.asException());
}
try (CloseableMonitor.Hold h = monitor.enter()) {
if (shutdown) {
return ApiFutures.immediateFailedFuture(
Status.FAILED_PRECONDITION
.withDescription("Published after the stream shut down.")
.asException());
}
ApiService.State currentState = state();
checkState(
currentState == ApiService.State.RUNNING,
String.format("Cannot publish when Publisher state is %s.", currentState.name()));
checkState(!shutdown, "Published after the stream shut down.");
ApiFuture<Offset> messageFuture = batcher.add(proto);
if (batcher.shouldFlush()) {
processBatch(batcher.flush());
Expand Down
Expand Up @@ -116,20 +116,25 @@ public void setUp() throws StatusException {
INITIAL_PUBLISH_REQUEST.getInitialRequest(),
BATCHING_SETTINGS_THAT_NEVER_FIRE);
publisher.addListener(permanentErrorHandler, MoreExecutors.directExecutor());
}

private void startPublisher() {
publisher.startAsync().awaitRunning();

assertThat(leakedOffsetStream).isNotNull();
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
}

@Test
public void construct_CallsFactoryNew() {
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
startPublisher();
verifyNoMoreInteractions(mockPublisherFactory);
verifyZeroInteractions(mockBatchPublisher);
}

@Test
public void construct_FlushSendsBatched() throws Exception {
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
startPublisher();
Message message = Message.builder().build();
Future<Offset> future = publisher.publish(message);

Expand All @@ -151,7 +156,7 @@ public void construct_FlushSendsBatched() throws Exception {

@Test
public void construct_CloseSendsBatched() throws Exception {
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
startPublisher();
Message message = Message.builder().build();
Future<Offset> future = publisher.publish(message);

Expand All @@ -172,9 +177,18 @@ public void construct_CloseSendsBatched() throws Exception {
verifyNoMoreInteractions(mockBatchPublisher);
}

@Test
public void publishBeforeStart_IsPermanentError() throws Exception {
Message message = Message.builder().build();
assertThrows(IllegalStateException.class, () -> publisher.publish(message));
assertThrows(IllegalStateException.class, () -> publisher.startAsync().awaitRunning());
verifyZeroInteractions(mockPublisherFactory);
verifyZeroInteractions(mockBatchPublisher);
}

@Test
public void publishAfterError_IsError() throws Exception {
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
startPublisher();
leakedOffsetStream.onError(Status.PERMISSION_DENIED.asRuntimeException());
assertThrows(IllegalStateException.class, publisher::awaitTerminated);
errorOccurredLatch.await();
Expand All @@ -191,7 +205,7 @@ public void publishAfterError_IsError() throws Exception {

@Test
public void multipleBatches_Ok() throws Exception {
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
startPublisher();
Message message1 = Message.builder().build();
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("data")).build();
Message message3 = Message.builder().setData(ByteString.copyFromUtf8("other_data")).build();
Expand Down Expand Up @@ -226,7 +240,7 @@ public void multipleBatches_Ok() throws Exception {

@Test
public void retryableError_RecreatesAndRetriesAll() throws Exception {
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
startPublisher();
Message message1 = Message.builder().setData(ByteString.copyFromUtf8("message1")).build();
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("message2")).build();
Future<Offset> future1 = publisher.publish(message1);
Expand Down Expand Up @@ -276,7 +290,7 @@ public void retryableError_RecreatesAndRetriesAll() throws Exception {

@Test
public void invalidOffsetSequence_SetsPermanentException() throws Exception {
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
startPublisher();
Message message1 = Message.builder().build();
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("data")).build();
Message message3 = Message.builder().setData(ByteString.copyFromUtf8("other_data")).build();
Expand Down

0 comments on commit 74d61fd

Please sign in to comment.