diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 9af734db2..e28427eea 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -527,6 +527,7 @@ public void shutdown() { currentAlarmFuture.cancel(false); } publishAllOutstanding(); + messagesWaiter.waitComplete(); backgroundResources.shutdown(); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 08fdee7b6..b1109c8fc 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -195,6 +195,37 @@ public void testSinglePublishByNumBytes() throws Exception { publisher.awaitTermination(1, TimeUnit.MINUTES); } + @Test + public void testPublishByShutdown() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setDelayThreshold(Duration.ofSeconds(100)) + .setElementCountThreshold(10L) + .build()) + .build(); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")); + + ApiFuture publishFuture1 = sendTestMessage(publisher, "A"); + ApiFuture publishFuture2 = sendTestMessage(publisher, "B"); + + // Note we are not advancing time or reaching the count threshold but messages should + // still get published by call to shutdown + + publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); + + // Verify the publishes completed + assertTrue(publishFuture1.isDone()); + assertTrue(publishFuture2.isDone()); + assertEquals("1", publishFuture1.get()); + assertEquals("2", publishFuture2.get()); + } + @Test public void testPublishMixedSizeAndDuration() throws Exception { Publisher publisher =