diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb index 31bb38f9d128..6fd2bb011c9d 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb @@ -19,6 +19,7 @@ require "google/cloud/pubsub/version" require "google/cloud/pubsub/v1" require "google/gax/errors" +require "securerandom" module Google module Cloud @@ -28,6 +29,12 @@ module PubSub # methods. class Service attr_accessor :project, :credentials, :host, :timeout, :client_config + ### + # The same client_id is used across all streaming pull connections that are created by this client. This is + # intentional, as it indicates to the server that any guarantees, such as message ordering, made for a stream + # that is disconnected will be made for the stream that is created to replace it. The attr_accessor allows the + # value to be replaced for unit testing. + attr_accessor :client_id ## # Creates a new Service instance. @@ -38,6 +45,7 @@ def initialize project, credentials, host: nil, timeout: nil, @host = host || V1::PublisherClient::SERVICE_ADDRESS @timeout = timeout @client_config = client_config || {} + @client_id = SecureRandom.uuid.freeze end def channel diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/stream.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/stream.rb index 721b2efb602e..2daeed8d6cbe 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/stream.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/stream.rb @@ -363,6 +363,7 @@ def initial_input_request req.stream_ack_deadline_seconds = @subscriber.deadline req.modify_deadline_ack_ids += @inventory.ack_ids req.modify_deadline_seconds += @inventory.ack_ids.map { @subscriber.deadline } + req.client_id = @subscriber.service.client_id end end diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/acknowledge_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/acknowledge_test.rb index fab81860fffb..d475d77fe950 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/acknowledge_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/acknowledge_test.rb @@ -27,6 +27,7 @@ rec_message_hash("rec_message2-msg-goes-here", 1112) } let(:rec_msg3_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \ rec_message_hash("rec_message3-msg-goes-here", 1113) } + let(:client_id) { "my-client-uuid" } it "can acknowledge a single message" do rec_message_msg = "pulled-message" @@ -38,6 +39,8 @@ called = false subscription.service.mocked_subscriber = stub + subscription.service.client_id = client_id + subscriber = subscription.listen streams: 1 do |result| # flush the initial buffer before any callbacks are processed subscriber.buffer.flush! unless called @@ -63,6 +66,7 @@ _(stub.requests.map(&:to_a)).must_equal [ [Google::Cloud::PubSub::V1::StreamingPullRequest.new( + client_id: client_id, subscription: sub_path, stream_ack_deadline_seconds: 60 )] @@ -97,6 +101,8 @@ called = 0 subscription.service.mocked_subscriber = stub + subscription.service.client_id = client_id + subscriber = subscription.listen streams: 1 do |msg| # flush the initial buffer before any callbacks are processed subscriber.buffer.flush! if called.zero? @@ -119,6 +125,7 @@ _(stub.requests.map(&:to_a)).must_equal [ [Google::Cloud::PubSub::V1::StreamingPullRequest.new( + client_id: client_id, subscription: sub_path, stream_ack_deadline_seconds: 60 )] diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/inventory_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/inventory_test.rb index 317ff2ff7dda..89520a741203 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/inventory_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/inventory_test.rb @@ -27,6 +27,7 @@ rec_message_hash("rec_message2-msg-goes-here", 1112) } let(:rec_msg3_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \ rec_message_hash("rec_message3-msg-goes-here", 1113) } + let(:client_id) { "my-client-uuid" } it "removes a single message from inventory, even when ack or nack are not called" do rec_message_msg = "pulled-message" @@ -38,6 +39,8 @@ called = false subscription.service.mocked_subscriber = stub + subscription.service.client_id = client_id + subscriber = subscription.listen streams: 1 do |result| # flush the initial buffer before any callbacks are processed subscriber.buffer.flush! unless called @@ -68,6 +71,7 @@ _(stub.requests.map(&:to_a)).must_equal [ [Google::Cloud::PubSub::V1::StreamingPullRequest.new( + client_id: client_id, subscription: sub_path, stream_ack_deadline_seconds: 60 )] @@ -102,6 +106,8 @@ called = 0 subscription.service.mocked_subscriber = stub + subscription.service.client_id = client_id + subscriber = subscription.listen streams: 1 do |msg| # flush the initial buffer before any callbacks are processed subscriber.buffer.flush! if called.zero? @@ -128,6 +134,7 @@ _(stub.requests.map(&:to_a)).must_equal [ [Google::Cloud::PubSub::V1::StreamingPullRequest.new( + client_id: client_id, subscription: sub_path, stream_ack_deadline_seconds: 60 )] diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/modify_ack_deadline_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/modify_ack_deadline_test.rb index c82a89cd5026..228a8c0065f0 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/modify_ack_deadline_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/modify_ack_deadline_test.rb @@ -27,6 +27,7 @@ rec_message_hash("rec_message2-msg-goes-here", 1112) } let(:rec_msg3_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \ rec_message_hash("rec_message3-msg-goes-here", 1113) } + let(:client_id) { "my-client-uuid" } it "can modify_ack_deadline a single message" do rec_message_msg = "pulled-message" @@ -38,6 +39,8 @@ called = false subscription.service.mocked_subscriber = stub + subscription.service.client_id = client_id + subscriber = subscription.listen streams: 1 do |msg| # flush the initial buffer before any callbacks are processed subscriber.buffer.flush! unless called @@ -63,6 +66,7 @@ _(stub.requests.map(&:to_a)).must_equal [ [Google::Cloud::PubSub::V1::StreamingPullRequest.new( + client_id: client_id, subscription: sub_path, stream_ack_deadline_seconds: 60 )] @@ -92,6 +96,8 @@ called = 0 subscription.service.mocked_subscriber = stub + subscription.service.client_id = client_id + subscriber = subscription.listen streams: 1 do |msg| # flush the initial buffer before any callbacks are processed subscriber.buffer.flush! if called.zero? @@ -114,6 +120,7 @@ _(stub.requests.map(&:to_a)).must_equal [ [Google::Cloud::PubSub::V1::StreamingPullRequest.new( + client_id: client_id, subscription: sub_path, stream_ack_deadline_seconds: 60 )] diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/nack_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/nack_test.rb index 1eaf2e4e1af1..6887b87e8598 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/nack_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/subscriber/nack_test.rb @@ -27,6 +27,7 @@ rec_message_hash("rec_message2-msg-goes-here", 1112) } let(:rec_msg3_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \ rec_message_hash("rec_message3-msg-goes-here", 1113) } + let(:client_id) { "my-client-uuid" } it "can nack a single message" do rec_message_msg = "pulled-message" @@ -38,6 +39,8 @@ called = false subscription.service.mocked_subscriber = stub + subscription.service.client_id = client_id + subscriber = subscription.listen streams: 1 do |msg| # flush the initial buffer before any callbacks are processed subscriber.buffer.flush! unless called @@ -62,6 +65,7 @@ _(stub.requests.map(&:to_a)).must_equal [ [Google::Cloud::PubSub::V1::StreamingPullRequest.new( + client_id: client_id, subscription: sub_path, stream_ack_deadline_seconds: 60 )] @@ -91,6 +95,8 @@ called = 0 subscription.service.mocked_subscriber = stub + subscription.service.client_id = client_id + subscriber = subscription.listen streams: 1 do |msg| # flush the initial buffer before any callbacks are processed subscriber.buffer.flush! if called.zero? @@ -113,6 +119,7 @@ _(stub.requests.map(&:to_a)).must_equal [ [Google::Cloud::PubSub::V1::StreamingPullRequest.new( + client_id: client_id, subscription: sub_path, stream_ack_deadline_seconds: 60 )]