Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure proper cleanup of publisher in tests #310

Merged
merged 37 commits into from Aug 24, 2020
Merged
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
19dd129
Modifying Publish example in README to match other examples given, and
hannahrogers-google Nov 25, 2019
4158529
fix: Modifying Publish example in README to match other examples, and
hannahrogers-google Nov 25, 2019
7d704f0
Merge branch 'master' of github.com:hannahrogers-google/java-pubsub
hannahrogers-google Jan 9, 2020
9e0ebc6
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Jan 9, 2020
01d41b7
feat: Adding support for DLQs
hannahrogers-google Jan 13, 2020
ed3b1ee
Fix formatting
hannahrogers-google Jan 13, 2020
72f7996
fix: making changes requested in pull request
hannahrogers-google Jan 14, 2020
389cb86
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Jan 14, 2020
c9e4bd2
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Jan 29, 2020
3738ec8
fix: creating fix to not populate delivery attempt attribute when dead
hannahrogers-google Jan 29, 2020
4ce1d3b
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Jan 29, 2020
aecd4ca
Adding unit test for case in which a received message has no delivery…
hannahrogers-google Jan 30, 2020
77fa3b3
Making MessageWaiter class more generic to also be used for outstanding
hannahrogers-google Jan 30, 2020
b87023c
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Jan 30, 2020
7086b3c
Merge branch 'master' into fix-subscriber-stop
hannahrogers-google Jan 30, 2020
da0e355
Waiting for acks to complete before shutting down a streaming subscriber
hannahrogers-google Jan 30, 2020
dc55fc1
Fixing formatting error
hannahrogers-google Jan 31, 2020
45beee2
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Jan 31, 2020
548b7a7
fix: making sure all publishes complete before shutting down the
hannahrogers-google Jan 31, 2020
76a1de9
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Feb 20, 2020
a8989f1
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Feb 24, 2020
b4439fd
adding default max outstanding request bytes
hannahrogers-google Feb 24, 2020
01f607c
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Feb 27, 2020
c0cddb3
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Apr 29, 2020
9a6381d
fix: make push endpoint valid https
hannahrogers-google Apr 29, 2020
0bd165f
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google May 24, 2020
5d47be8
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google May 25, 2020
e3b7f6d
Merge branch 'master' of https://github.com/googleapis/java-pubsub in…
hannahrogers-google Jul 8, 2020
6c1a26c
Merge branch 'master' of https://github.com/googleapis/java-pubsub in…
hannahrogers-google Jul 23, 2020
f2fa6b8
fix: use default zero value if a flow control setting is not provided
hannahrogers-google Jul 23, 2020
0cd08dc
fix lint issues
hannahrogers-google Jul 23, 2020
9ba9764
Merge branch 'master' of https://github.com/googleapis/java-pubsub in…
hannahrogers-google Aug 4, 2020
f52eb61
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Aug 6, 2020
78d98e8
fix: better cleanup during publisher test
hannahrogers-google Aug 6, 2020
785c3ee
fix: format issues
hannahrogers-google Aug 7, 2020
dbe753f
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Aug 20, 2020
35c2c6e
fix: test timeouts should be a minute
hannahrogers-google Aug 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -31,18 +31,22 @@
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.rpc.DataLossException;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher.Builder;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -75,6 +79,8 @@ public class PublisherImplTest {

private FakePublisherServiceImpl testPublisherServiceImpl;

private ManagedChannel testChannel;

private Server testServer;

@Before
Expand All @@ -84,6 +90,7 @@ public void setUp() throws Exception {
InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("test-server");
serverBuilder.addService(testPublisherServiceImpl);
testServer = serverBuilder.build();
testChannel = InProcessChannelBuilder.forName("test-server").build();
testServer.start();

fakeExecutor = new FakeScheduledExecutorService();
Expand All @@ -92,6 +99,7 @@ public void setUp() throws Exception {
@After
public void tearDown() throws Exception {
testServer.shutdownNow().awaitTermination();
testChannel.shutdown();
}

@Test
Expand Down Expand Up @@ -122,8 +130,7 @@ public void testPublishByDuration() throws Exception {
assertEquals("2", publishFuture2.get());

assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
shutdownTestPublisher(publisher);
}

@Test
Expand Down Expand Up @@ -160,8 +167,9 @@ public void testPublishByNumBatchedMessages() throws Exception {

assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount());
assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);

fakeExecutor.advanceTime(Duration.ofSeconds(100));
shutdownTestPublisher(publisher);
}

@Test
Expand Down Expand Up @@ -195,8 +203,9 @@ public void testSinglePublishByNumBytes() throws Exception {
assertEquals("4", publishFuture4.get());

assertEquals(2, testPublisherServiceImpl.getCapturedRequests().size());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);

fakeExecutor.advanceTime(Duration.ofSeconds(100));
shutdownTestPublisher(publisher);
}

@Test
Expand All @@ -219,15 +228,16 @@ public void testPublishByShutdown() throws Exception {

// Note we are not advancing time or reaching the count threshold but messages should
// still get published by call to shutdown

publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);

// Verify the publishes completed
assertTrue(publishFuture1.isDone());
assertTrue(publishFuture2.isDone());
assertEquals("1", publishFuture1.get());
assertEquals("2", publishFuture2.get());

fakeExecutor.advanceTime(Duration.ofSeconds(100));
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
Expand Down Expand Up @@ -269,8 +279,7 @@ public void testPublishMixedSizeAndDuration() throws Exception {

assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount());
assertEquals(1, testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
shutdownTestPublisher(publisher);
}

private ApiFuture<String> sendTestMessage(Publisher publisher, String data) {
Expand Down Expand Up @@ -326,7 +335,9 @@ public void testBatchedMessagesWithOrderingKeyByNum() throws Exception {
}
}
}
publisher.shutdown();

fakeExecutor.advanceTime(Duration.ofSeconds(100));
shutdownTestPublisher(publisher);
}

@Test
Expand Down Expand Up @@ -389,7 +400,7 @@ public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception {
}
}
}
publisher.shutdown();
shutdownTestPublisher(publisher);
}

@Test
Expand Down Expand Up @@ -418,7 +429,8 @@ public void testLargeMessagesDoNotReorderBatches() throws Exception {
// Verify that messages with "OrderB" were delivered in order.
assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture3.get()));

publisher.shutdown();
fakeExecutor.advanceTime(Duration.ofSeconds(100));
shutdownTestPublisher(publisher);
}

@Test
Expand All @@ -431,7 +443,7 @@ public void testOrderingKeyWhenDisabled_throwsException() throws Exception {
} catch (IllegalStateException expected) {
// expected
}
publisher.shutdown();
shutdownTestPublisher(publisher);
}

@Test
Expand Down Expand Up @@ -461,6 +473,7 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception {

assertEquals(4, testPublisherServiceImpl.getCapturedRequests().size());
publisher.shutdown();
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
}

@Test
Expand Down Expand Up @@ -550,7 +563,7 @@ public void testResumePublish() throws Exception {
Assert.assertEquals("7", future7.get());
Assert.assertEquals("8", future8.get());

publisher.shutdown();
shutdownTestPublisher(publisher);
}

private ApiFuture<String> sendTestMessageWithOrderingKey(
Expand Down Expand Up @@ -604,8 +617,7 @@ public void testPublishFailureRetries() throws Exception {
assertEquals("1", publishFuture1.get());

assertEquals(2, testPublisherServiceImpl.getCapturedRequests().size());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
shutdownTestPublisher(publisher);
}

@Test(expected = ExecutionException.class)
Expand All @@ -629,8 +641,7 @@ public void testPublishFailureRetries_retriesDisabled() throws Exception {
publishFuture1.get();
} finally {
assertSame(testPublisherServiceImpl.getCapturedRequests().size(), 1);
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
shutdownTestPublisher(publisher);
}
}

Expand All @@ -656,8 +667,7 @@ public void testPublishFailureRetries_maxRetriesSetup() throws Exception {
assertEquals("1", publishFuture1.get());

assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
shutdownTestPublisher(publisher);
}

@Test
Expand All @@ -683,7 +693,7 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception

assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
}

@Test(expected = ExecutionException.class)
Expand Down Expand Up @@ -712,14 +722,15 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
} finally {
assertTrue(testPublisherServiceImpl.getCapturedRequests().size() >= 1);
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
}
}

@Test
public void testPublisherGetters() throws Exception {
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
builder.setChannelProvider(TEST_CHANNEL_PROVIDER);
builder.setChannelProvider(
FixedTransportChannelProvider.create(GrpcTransportChannel.create(testChannel)));
builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR);
builder.setBatchingSettings(
BatchingSettings.newBuilder()
Expand All @@ -735,7 +746,7 @@ public void testPublisherGetters() throws Exception {
assertEquals(Duration.ofMillis(11), publisher.getBatchingSettings().getDelayThreshold());
assertEquals(12, (long) publisher.getBatchingSettings().getElementCountThreshold());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
}

@Test
Expand Down Expand Up @@ -1115,7 +1126,14 @@ public void run() {
private Builder getTestPublisherBuilder() {
return Publisher.newBuilder(TEST_TOPIC)
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
.setChannelProvider(TEST_CHANNEL_PROVIDER)
.setChannelProvider(
FixedTransportChannelProvider.create(GrpcTransportChannel.create(testChannel)))
.setCredentialsProvider(NoCredentialsProvider.create());
}

private void shutdownTestPublisher(Publisher publisher) throws InterruptedException {
publisher.shutdown();
fakeExecutor.advanceTime(Duration.ofSeconds(10));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How was 10 seconds chosen? Specifically, how long does this test need to execute on average?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The gax layer implements a Watchdog (https://github.com/googleapis/gax-java/blob/master/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java) that periodically garbage collects idle streams. The check is scheduled at a fixed rate every 10 seconds. So, I chose 10 seconds to ensure that the watchdog check completes, otherwise we have to wait the full timeout.

assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
}
}