Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: fix flakiness in subscriberIT integration test (#476)
* fix: retrying sync pulls in subscriberIT test
  • Loading branch information
hannahrogers-google committed Jan 12, 2021
1 parent f01bb0d commit d981b4e
Showing 1 changed file with 35 additions and 8 deletions.
43 changes: 35 additions & 8 deletions samples/snippets/src/test/java/pubsub/SubscriberIT.java
Expand Up @@ -37,6 +37,7 @@
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import org.junit.After;
Expand Down Expand Up @@ -84,6 +85,32 @@ private static void publishSomeMessages(Integer numOfMessages) throws Exception
ApiFutures.allAsList(messageIdFutures).get();
}

// Helper function to retry synchronous pull attempts until all outstanding messages are received.
private void syncPullWithRetries(
Integer numOfMessages, Integer maxRetries, CheckedRunnable syncPull) throws Exception {
HashSet<String> outstandingMessages = new HashSet<>();
for (int i = 0; i < numOfMessages; i++) {
outstandingMessages.add("Hello " + i);
}
int attempt = 1;
while ((outstandingMessages.size() > 0) && (attempt <= maxRetries)) {
syncPull.run();
HashSet<String> clone = (HashSet) outstandingMessages.clone();
for (String message : clone) {
if (bout.toString().contains(message)) {
outstandingMessages.remove(message);
}
}
attempt++;
}
assertThat(outstandingMessages).isEmpty();
}

@FunctionalInterface
public interface CheckedRunnable {
void run() throws Exception;
}

@Rule public Timeout globalTimeout = Timeout.seconds(600); // 10 minute timeout

@BeforeClass
Expand Down Expand Up @@ -168,17 +195,17 @@ public void testSubscriber() throws Exception {
publishSomeMessages(3);
bout.reset();
// Test subscribe synchronously.
SubscribeSyncExample.subscribeSyncExample(projectId, subscriptionId, 10);
for (int i = 0; i < 3; i++) {
assertThat(bout.toString()).contains("Hello " + i);
}
syncPullWithRetries(
3, 3, () -> SubscribeSyncExample.subscribeSyncExample(projectId, subscriptionId, 3));

publishSomeMessages(3);
bout.reset();
// Test subscribe synchronously with lease management.
SubscribeSyncWithLeaseExample.subscribeSyncWithLeaseExample(projectId, subscriptionId, 10);
for (int i = 0; i < 3; i++) {
assertThat(bout.toString()).contains("Hello " + i);
}
syncPullWithRetries(
3,
3,
() ->
SubscribeSyncWithLeaseExample.subscribeSyncWithLeaseExample(
projectId, subscriptionId, 10));
}
}

0 comments on commit d981b4e

Please sign in to comment.