Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): Set client-scoped UUID in initial StreamingPullRequest#client_id #5925

Merged
merged 1 commit into from May 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions google-cloud-pubsub/lib/google/cloud/pubsub/service.rb
Expand Up @@ -19,6 +19,7 @@
require "google/cloud/pubsub/version"
require "google/cloud/pubsub/v1"
require "google/gax/errors"
require "securerandom"

module Google
module Cloud
Expand All @@ -28,6 +29,12 @@ module PubSub
# methods.
class Service
attr_accessor :project, :credentials, :host, :timeout, :client_config
###
# The same client_id is used across all streaming pull connections that are created by this client. This is
# intentional, as it indicates to the server that any guarantees, such as message ordering, made for a stream
# that is disconnected will be made for the stream that is created to replace it. The attr_accessor allows the
# value to be replaced for unit testing.
attr_accessor :client_id

##
# Creates a new Service instance.
Expand All @@ -38,6 +45,7 @@ def initialize project, credentials, host: nil, timeout: nil,
@host = host || V1::PublisherClient::SERVICE_ADDRESS
@timeout = timeout
@client_config = client_config || {}
@client_id = SecureRandom.uuid.freeze
quartzmo marked this conversation as resolved.
Show resolved Hide resolved
end

def channel
Expand Down
Expand Up @@ -363,6 +363,7 @@ def initial_input_request
req.stream_ack_deadline_seconds = @subscriber.deadline
req.modify_deadline_ack_ids += @inventory.ack_ids
req.modify_deadline_seconds += @inventory.ack_ids.map { @subscriber.deadline }
req.client_id = @subscriber.service.client_id
end
end

Expand Down
Expand Up @@ -27,6 +27,7 @@
rec_message_hash("rec_message2-msg-goes-here", 1112) }
let(:rec_msg3_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \
rec_message_hash("rec_message3-msg-goes-here", 1113) }
let(:client_id) { "my-client-uuid" }

it "can acknowledge a single message" do
rec_message_msg = "pulled-message"
Expand All @@ -38,6 +39,8 @@
called = false

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |result|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! unless called
Expand All @@ -63,6 +66,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down Expand Up @@ -97,6 +101,8 @@
called = 0

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |msg|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! if called.zero?
Expand All @@ -119,6 +125,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down
Expand Up @@ -27,6 +27,7 @@
rec_message_hash("rec_message2-msg-goes-here", 1112) }
let(:rec_msg3_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \
rec_message_hash("rec_message3-msg-goes-here", 1113) }
let(:client_id) { "my-client-uuid" }

it "removes a single message from inventory, even when ack or nack are not called" do
rec_message_msg = "pulled-message"
Expand All @@ -38,6 +39,8 @@
called = false

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |result|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! unless called
Expand Down Expand Up @@ -68,6 +71,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down Expand Up @@ -102,6 +106,8 @@
called = 0

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |msg|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! if called.zero?
Expand All @@ -128,6 +134,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down
Expand Up @@ -27,6 +27,7 @@
rec_message_hash("rec_message2-msg-goes-here", 1112) }
let(:rec_msg3_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \
rec_message_hash("rec_message3-msg-goes-here", 1113) }
let(:client_id) { "my-client-uuid" }

it "can modify_ack_deadline a single message" do
rec_message_msg = "pulled-message"
Expand All @@ -38,6 +39,8 @@
called = false

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |msg|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! unless called
Expand All @@ -63,6 +66,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down Expand Up @@ -92,6 +96,8 @@
called = 0

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |msg|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! if called.zero?
Expand All @@ -114,6 +120,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down
Expand Up @@ -27,6 +27,7 @@
rec_message_hash("rec_message2-msg-goes-here", 1112) }
let(:rec_msg3_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \
rec_message_hash("rec_message3-msg-goes-here", 1113) }
let(:client_id) { "my-client-uuid" }

it "can nack a single message" do
rec_message_msg = "pulled-message"
Expand All @@ -38,6 +39,8 @@
called = false

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |msg|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! unless called
Expand All @@ -62,6 +65,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down Expand Up @@ -91,6 +95,8 @@
called = 0

subscription.service.mocked_subscriber = stub
subscription.service.client_id = client_id

subscriber = subscription.listen streams: 1 do |msg|
# flush the initial buffer before any callbacks are processed
subscriber.buffer.flush! if called.zero?
Expand All @@ -113,6 +119,7 @@

_(stub.requests.map(&:to_a)).must_equal [
[Google::Cloud::PubSub::V1::StreamingPullRequest.new(
client_id: client_id,
subscription: sub_path,
stream_ack_deadline_seconds: 60
)]
Expand Down