diff --git a/samples/snippets/src/test/java/pubsub/SubscriberIT.java b/samples/snippets/src/test/java/pubsub/SubscriberIT.java index f69acd8cb..aae4fb5af 100644 --- a/samples/snippets/src/test/java/pubsub/SubscriberIT.java +++ b/samples/snippets/src/test/java/pubsub/SubscriberIT.java @@ -37,6 +37,7 @@ import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.UUID; import org.junit.After; @@ -84,6 +85,32 @@ private static void publishSomeMessages(Integer numOfMessages) throws Exception ApiFutures.allAsList(messageIdFutures).get(); } + // Helper function to retry synchronous pull attempts until all outstanding messages are received. + private void syncPullWithRetries( + Integer numOfMessages, Integer maxRetries, CheckedRunnable syncPull) throws Exception { + HashSet outstandingMessages = new HashSet<>(); + for (int i = 0; i < numOfMessages; i++) { + outstandingMessages.add("Hello " + i); + } + int attempt = 1; + while ((outstandingMessages.size() > 0) && (attempt <= maxRetries)) { + syncPull.run(); + HashSet clone = (HashSet) outstandingMessages.clone(); + for (String message : clone) { + if (bout.toString().contains(message)) { + outstandingMessages.remove(message); + } + } + attempt++; + } + assertThat(outstandingMessages).isEmpty(); + } + + @FunctionalInterface + public interface CheckedRunnable { + void run() throws Exception; + } + @Rule public Timeout globalTimeout = Timeout.seconds(600); // 10 minute timeout @BeforeClass @@ -168,17 +195,17 @@ public void testSubscriber() throws Exception { publishSomeMessages(3); bout.reset(); // Test subscribe synchronously. - SubscribeSyncExample.subscribeSyncExample(projectId, subscriptionId, 10); - for (int i = 0; i < 3; i++) { - assertThat(bout.toString()).contains("Hello " + i); - } + syncPullWithRetries( + 3, 3, () -> SubscribeSyncExample.subscribeSyncExample(projectId, subscriptionId, 3)); publishSomeMessages(3); bout.reset(); // Test subscribe synchronously with lease management. - SubscribeSyncWithLeaseExample.subscribeSyncWithLeaseExample(projectId, subscriptionId, 10); - for (int i = 0; i < 3; i++) { - assertThat(bout.toString()).contains("Hello " + i); - } + syncPullWithRetries( + 3, + 3, + () -> + SubscribeSyncWithLeaseExample.subscribeSyncWithLeaseExample( + projectId, subscriptionId, 10)); } }