diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index c68367601..d7687ae07 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -31,8 +31,10 @@ import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.rpc.DataLossException; +import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.pubsub.v1.Publisher.Builder; import com.google.protobuf.ByteString; @@ -40,9 +42,11 @@ import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; import io.grpc.Server; import io.grpc.Status; import io.grpc.StatusException; +import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -75,6 +79,8 @@ public class PublisherImplTest { private FakePublisherServiceImpl testPublisherServiceImpl; + private ManagedChannel testChannel; + private Server testServer; @Before @@ -84,6 +90,7 @@ public void setUp() throws Exception { InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("test-server"); serverBuilder.addService(testPublisherServiceImpl); testServer = serverBuilder.build(); + testChannel = InProcessChannelBuilder.forName("test-server").build(); testServer.start(); fakeExecutor = new FakeScheduledExecutorService(); @@ -92,6 +99,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { testServer.shutdownNow().awaitTermination(); + testChannel.shutdown(); } @Test @@ -122,8 +130,7 @@ public void testPublishByDuration() throws Exception { assertEquals("2", publishFuture2.get()); assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount()); - publisher.shutdown(); - publisher.awaitTermination(1, TimeUnit.MINUTES); + shutdownTestPublisher(publisher); } @Test @@ -160,8 +167,9 @@ public void testPublishByNumBatchedMessages() throws Exception { assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount()); assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount()); - publisher.shutdown(); - publisher.awaitTermination(1, TimeUnit.MINUTES); + + fakeExecutor.advanceTime(Duration.ofSeconds(100)); + shutdownTestPublisher(publisher); } @Test @@ -195,8 +203,9 @@ public void testSinglePublishByNumBytes() throws Exception { assertEquals("4", publishFuture4.get()); assertEquals(2, testPublisherServiceImpl.getCapturedRequests().size()); - publisher.shutdown(); - publisher.awaitTermination(1, TimeUnit.MINUTES); + + fakeExecutor.advanceTime(Duration.ofSeconds(100)); + shutdownTestPublisher(publisher); } @Test @@ -219,15 +228,16 @@ public void testPublishByShutdown() throws Exception { // Note we are not advancing time or reaching the count threshold but messages should // still get published by call to shutdown - publisher.shutdown(); - publisher.awaitTermination(1, TimeUnit.MINUTES); // Verify the publishes completed assertTrue(publishFuture1.isDone()); assertTrue(publishFuture2.isDone()); assertEquals("1", publishFuture1.get()); assertEquals("2", publishFuture2.get()); + + fakeExecutor.advanceTime(Duration.ofSeconds(100)); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test @@ -269,8 +279,7 @@ public void testPublishMixedSizeAndDuration() throws Exception { assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount()); assertEquals(1, testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount()); - publisher.shutdown(); - publisher.awaitTermination(1, TimeUnit.MINUTES); + shutdownTestPublisher(publisher); } private ApiFuture sendTestMessage(Publisher publisher, String data) { @@ -326,7 +335,9 @@ public void testBatchedMessagesWithOrderingKeyByNum() throws Exception { } } } - publisher.shutdown(); + + fakeExecutor.advanceTime(Duration.ofSeconds(100)); + shutdownTestPublisher(publisher); } @Test @@ -389,7 +400,7 @@ public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception { } } } - publisher.shutdown(); + shutdownTestPublisher(publisher); } @Test @@ -418,7 +429,8 @@ public void testLargeMessagesDoNotReorderBatches() throws Exception { // Verify that messages with "OrderB" were delivered in order. assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture3.get())); - publisher.shutdown(); + fakeExecutor.advanceTime(Duration.ofSeconds(100)); + shutdownTestPublisher(publisher); } @Test @@ -431,7 +443,7 @@ public void testOrderingKeyWhenDisabled_throwsException() throws Exception { } catch (IllegalStateException expected) { // expected } - publisher.shutdown(); + shutdownTestPublisher(publisher); } @Test @@ -461,6 +473,7 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { assertEquals(4, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } @Test @@ -550,7 +563,7 @@ public void testResumePublish() throws Exception { Assert.assertEquals("7", future7.get()); Assert.assertEquals("8", future8.get()); - publisher.shutdown(); + shutdownTestPublisher(publisher); } private ApiFuture sendTestMessageWithOrderingKey( @@ -604,8 +617,7 @@ public void testPublishFailureRetries() throws Exception { assertEquals("1", publishFuture1.get()); assertEquals(2, testPublisherServiceImpl.getCapturedRequests().size()); - publisher.shutdown(); - publisher.awaitTermination(1, TimeUnit.MINUTES); + shutdownTestPublisher(publisher); } @Test(expected = ExecutionException.class) @@ -629,8 +641,7 @@ public void testPublishFailureRetries_retriesDisabled() throws Exception { publishFuture1.get(); } finally { assertSame(testPublisherServiceImpl.getCapturedRequests().size(), 1); - publisher.shutdown(); - publisher.awaitTermination(1, TimeUnit.MINUTES); + shutdownTestPublisher(publisher); } } @@ -656,8 +667,7 @@ public void testPublishFailureRetries_maxRetriesSetup() throws Exception { assertEquals("1", publishFuture1.get()); assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size()); - publisher.shutdown(); - publisher.awaitTermination(1, TimeUnit.MINUTES); + shutdownTestPublisher(publisher); } @Test @@ -683,7 +693,7 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); - publisher.awaitTermination(1, TimeUnit.MINUTES); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } @Test(expected = ExecutionException.class) @@ -712,14 +722,15 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce } finally { assertTrue(testPublisherServiceImpl.getCapturedRequests().size() >= 1); publisher.shutdown(); - publisher.awaitTermination(1, TimeUnit.MINUTES); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } } @Test public void testPublisherGetters() throws Exception { Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC); - builder.setChannelProvider(TEST_CHANNEL_PROVIDER); + builder.setChannelProvider( + FixedTransportChannelProvider.create(GrpcTransportChannel.create(testChannel))); builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR); builder.setBatchingSettings( BatchingSettings.newBuilder() @@ -735,7 +746,7 @@ public void testPublisherGetters() throws Exception { assertEquals(Duration.ofMillis(11), publisher.getBatchingSettings().getDelayThreshold()); assertEquals(12, (long) publisher.getBatchingSettings().getElementCountThreshold()); publisher.shutdown(); - publisher.awaitTermination(1, TimeUnit.MINUTES); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } @Test @@ -1115,7 +1126,14 @@ public void run() { private Builder getTestPublisherBuilder() { return Publisher.newBuilder(TEST_TOPIC) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) - .setChannelProvider(TEST_CHANNEL_PROVIDER) + .setChannelProvider( + FixedTransportChannelProvider.create(GrpcTransportChannel.create(testChannel))) .setCredentialsProvider(NoCredentialsProvider.create()); } + + private void shutdownTestPublisher(Publisher publisher) throws InterruptedException { + publisher.shutdown(); + fakeExecutor.advanceTime(Duration.ofSeconds(10)); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); + } }