diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 45f7fb90c..047e1ba75 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -45,6 +45,7 @@ import com.google.pubsub.v1.StreamingPullResponse; import io.grpc.Status; import java.util.List; +import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -77,6 +78,13 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final Lock lock = new ReentrantLock(); private ClientStream clientStream; + /** + * The same clientId is used across all streaming pull connections that are created. This is + * intentional, as it indicates to the server that any guarantees made for a stream that + * disconnected will be made for the stream that is created to replace it. + */ + private final String clientId = UUID.randomUUID().toString(); + public StreamingSubscriberConnection( String subscription, MessageReceiver receiver, @@ -200,6 +208,7 @@ private void initialize() { StreamingPullRequest.newBuilder() .setSubscription(subscription) .setStreamAckDeadlineSeconds(60) + .setClientId(clientId) .build()); /**