Skip to content

Commit

Permalink
feat(pubsub): Set client-scoped UUID in initial StreamingPullRequest#…
Browse files Browse the repository at this point in the history
…client_id

closes: googleapis#5924
  • Loading branch information
quartzmo committed May 6, 2020
1 parent f6b402f commit 29c1375
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 0 deletions.
8 changes: 8 additions & 0 deletions google-cloud-pubsub/lib/google/cloud/pubsub/service.rb
Expand Up @@ -19,6 +19,7 @@
require "google/cloud/pubsub/version"
require "google/cloud/pubsub/v1"
require "google/gax/errors"
require "securerandom"

module Google
module Cloud
Expand All @@ -28,6 +29,12 @@ module PubSub
# methods.
class Service
attr_accessor :project, :credentials, :host, :timeout, :client_config
###
# The same client_id is used across all streaming pull connections that are created by this client. This is
# intentional, as it indicates to the server that any guarantees, such as message ordering, made for a stream
# that is disconnected will be made for the stream that is created to replace it. The attr_accessor allows the
# value to be replaced for unit testing.
attr_accessor :client_id

##
# Creates a new Service instance.
Expand All @@ -38,6 +45,7 @@ def initialize project, credentials, host: nil, timeout: nil,
@host = host || V1::PublisherClient::SERVICE_ADDRESS
@timeout = timeout
@client_config = client_config || {}
@client_id = SecureRandom.uuid.freeze
end

def channel
Expand Down
Expand Up @@ -363,6 +363,7 @@ def initial_input_request
req.stream_ack_deadline_seconds = @subscriber.deadline
req.modify_deadline_ack_ids += @inventory.ack_ids
req.modify_deadline_seconds += @inventory.ack_ids.map { @subscriber.deadline }
req.client_id = @subscriber.service.client_id
end
end

Expand Down
Expand Up @@ -27,6 +27,7 @@
rec_message_hash("rec_message2-msg-goes-here", 1112) }
let(:rec_msg3_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \
rec_message_hash("rec_message3-msg-goes-here", 1113) }
let(:client_id) { "my-client-uuid" }

it "can acknowledge a single message" do
rec_message_msg = "pulled-message"
Expand All @@ -38,6 +39,8 @@
called = false

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |result|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! unless called
Expand All @@ -63,6 +66,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down Expand Up @@ -97,6 +101,8 @@
called = 0

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |msg|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! if called.zero?
Expand All @@ -119,6 +125,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down
Expand Up @@ -27,6 +27,7 @@
rec_message_hash("rec_message2-msg-goes-here", 1112) }
let(:rec_msg3_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \
rec_message_hash("rec_message3-msg-goes-here", 1113) }
let(:client_id) { "my-client-uuid" }

it "removes a single message from inventory, even when ack or nack are not called" do
rec_message_msg = "pulled-message"
Expand All @@ -38,6 +39,8 @@
called = false

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |result|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! unless called
Expand Down Expand Up @@ -68,6 +71,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down Expand Up @@ -102,6 +106,8 @@
called = 0

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |msg|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! if called.zero?
Expand All @@ -128,6 +134,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down
Expand Up @@ -27,6 +27,7 @@
rec_message_hash("rec_message2-msg-goes-here", 1112) }
let(:rec_msg3_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \
rec_message_hash("rec_message3-msg-goes-here", 1113) }
let(:client_id) { "my-client-uuid" }

it "can modify_ack_deadline a single message" do
rec_message_msg = "pulled-message"
Expand All @@ -38,6 +39,8 @@
called = false

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |msg|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! unless called
Expand All @@ -63,6 +66,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down Expand Up @@ -92,6 +96,8 @@
called = 0

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |msg|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! if called.zero?
Expand All @@ -114,6 +120,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down
Expand Up @@ -27,6 +27,7 @@
rec_message_hash("rec_message2-msg-goes-here", 1112) }
let(:rec_msg3_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \
rec_message_hash("rec_message3-msg-goes-here", 1113) }
let(:client_id) { "my-client-uuid" }

it "can nack a single message" do
rec_message_msg = "pulled-message"
Expand All @@ -38,6 +39,8 @@
called = false

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |msg|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! unless called
Expand All @@ -62,6 +65,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down Expand Up @@ -91,6 +95,8 @@
called = 0

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |msg|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! if called.zero?
Expand All @@ -113,6 +119,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down

0 comments on commit 29c1375

Please sign in to comment.