From 08e77d428aa50bb53ed7d5b922e76c2da18ed6d1 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Fri, 31 Jan 2020 15:48:52 -0500 Subject: [PATCH] feat: add randomly generated UUID to outgoing initial streaming pull requests (#77) * google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java * Ensure that if a batch is started and the timeout completes before the currently outstanding message has finished publishing with an ordering key that the last batch does in fact get published. * add back in unit test * feat: add randomly generated UUID to outgoing initial streaming pull requests for better ordering keys affinity --- .../cloud/pubsub/v1/StreamingSubscriberConnection.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 45f7fb90c..047e1ba75 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -45,6 +45,7 @@ import com.google.pubsub.v1.StreamingPullResponse; import io.grpc.Status; import java.util.List; +import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -77,6 +78,13 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final Lock lock = new ReentrantLock(); private ClientStream clientStream; + /** + * The same clientId is used across all streaming pull connections that are created. This is + * intentional, as it indicates to the server that any guarantees made for a stream that + * disconnected will be made for the stream that is created to replace it. + */ + private final String clientId = UUID.randomUUID().toString(); + public StreamingSubscriberConnection( String subscription, MessageReceiver receiver, @@ -200,6 +208,7 @@ private void initialize() { StreamingPullRequest.newBuilder() .setSubscription(subscription) .setStreamAckDeadlineSeconds(60) + .setClientId(clientId) .build()); /**