Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): Add Publisher Flow Control #11383

Merged
merged 19 commits into from Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions google-cloud-pubsub/.rubocop.yml
Expand Up @@ -11,6 +11,8 @@ AllCops:
Style/Documentation:
Enabled: false

Metrics/AbcSize:
Max: 35
Metrics/BlockLength:
Exclude:
- "google-cloud-pubsub.gemspec"
Expand All @@ -20,8 +22,7 @@ Metrics/ClassLength:
Metrics/CyclomaticComplexity:
Max: 15
Metrics/MethodLength:
Exclude:
- "lib/google/cloud/pubsub.rb"
Max: 35
Metrics/PerceivedComplexity:
Max: 15
Naming/FileName:
Expand Down
50 changes: 48 additions & 2 deletions google-cloud-pubsub/acceptance/pubsub/async_test.rb
Expand Up @@ -13,10 +13,11 @@
# limitations under the License.

require "pubsub_helper"
require "concurrent/atomics"

describe Google::Cloud::PubSub, :async, :pubsub do
def retrieve_topic topic_name
pubsub.get_topic(topic_name) || pubsub.create_topic(topic_name)
def retrieve_topic topic_name, async: nil
pubsub.get_topic(topic_name, async: async) || pubsub.create_topic(topic_name, async: async)
end

def retrieve_subscription topic, subscription_name
Expand All @@ -27,6 +28,16 @@ def retrieve_subscription topic, subscription_name
let(:nonce) { rand 100 }
let(:topic) { retrieve_topic "#{$topic_prefix}-async#{nonce}" }
let(:sub) { retrieve_subscription topic, "#{$topic_prefix}-async-sub#{nonce}" }
# Async publisher options that exercise publisher flow control: permit at
# most 2 outstanding (unacknowledged-by-publish) messages and raise
# FlowControlLimitError once the limit would be exceeded.
let(:async_flow_control) do
  {
    interval: 30,
    flow_control: {
      message_limit: 2,
      limit_exceeded_behavior: :error
    }
  }
end
let(:topic_flow_control) { retrieve_topic "#{$topic_prefix}-async#{nonce}", async: async_flow_control }

it "publishes and pulls asyncronously" do
events = sub.pull
Expand Down Expand Up @@ -268,4 +279,39 @@ def retrieve_subscription topic, subscription_name
# Remove the subscription
sub.delete
end

# Verifies publisher flow control end-to-end: the third publish exceeds the
# message_limit of 2 configured with limit_exceeded_behavior: :error, and the
# outstanding count returns to zero after flush/stop!.
# (Fixes misspelled description: "asyncronously" -> "asynchronously".)
it "publishes asynchronously with publisher flow control" do
  publish_1_done = Concurrent::Event.new
  publish_2_done = Concurrent::Event.new
  publish_3_done = Concurrent::Event.new

  topic_flow_control.publish_async("a") { publish_1_done.set }

  flow_controller = topic_flow_control.async_publisher.flow_controller
  _(flow_controller.outstanding_messages).must_equal 1

  topic_flow_control.publish_async("b") { publish_2_done.set }
  _(flow_controller.outstanding_messages).must_equal 2 # Limit

  # message_limit is 2 with :error behavior, so a third outstanding message raises.
  expect do
    topic_flow_control.publish_async "c"
  end.must_raise Google::Cloud::PubSub::FlowControlLimitError

  # Force the queued messages to be published and wait for events.
  topic_flow_control.async_publisher.flush
  assert publish_1_done.wait(1), "Publishing message 1 errored."
  assert publish_2_done.wait(1), "Publishing message 2 errored."

  _(flow_controller.outstanding_messages).must_equal 0

  topic_flow_control.publish_async("c") { publish_3_done.set }

  _(flow_controller.outstanding_messages).must_equal 1

  # Force the queued message to be published and wait for event.
  topic_flow_control.async_publisher.stop!
  assert publish_3_done.wait(1), "Publishing message 3 errored."

  _(flow_controller.outstanding_messages).must_equal 0
end
end
29 changes: 23 additions & 6 deletions google-cloud-pubsub/acceptance/pubsub/pubsub_test.rb
Expand Up @@ -252,7 +252,10 @@ def retrieve_snapshot project, subscription, snapshot_name
msg = topic.publish "hello"
_(msg).wont_be :nil?
# Check it received the published message
received_messages = pull_with_retry subscription
wait_for_condition description: "subscription pull" do
received_messages = subscription.pull immediate: false
received_messages.any?
end
_(received_messages).wont_be :empty?
_(received_messages.count).must_equal 1
received_message = received_messages.first
Expand Down Expand Up @@ -284,7 +287,10 @@ def retrieve_snapshot project, subscription, snapshot_name
snapshot = subscription.create_snapshot labels: labels

# Check it pulls the message
received_messages = pull_with_retry subscription
wait_for_condition description: "subscription pull" do
received_messages = subscription.pull immediate: false
received_messages.any?
end
_(received_messages).wont_be :empty?
_(received_messages.count).must_equal 1
received_message = received_messages.first
Expand All @@ -303,7 +309,10 @@ def retrieve_snapshot project, subscription, snapshot_name
subscription.seek snapshot

# Check it again pulls the message
received_messages = pull_with_retry subscription
wait_for_condition description: "subscription pull" do
received_messages = subscription.pull immediate: false
received_messages.any?
end
_(received_messages.count).must_equal 1
received_message = received_messages.first
_(received_message).wont_be :nil?
Expand Down Expand Up @@ -369,7 +378,11 @@ def retrieve_snapshot project, subscription, snapshot_name

# Nack the message
(1..7).each do |i|
received_messages = pull_with_retry subscription
received_messages = []
wait_for_condition description: "subscription pull" do
received_messages = subscription.pull immediate: false
received_messages.any?
end
_(received_messages.count).must_equal 1
received_message = received_messages.first
_(received_message.msg.data).must_equal msg.data
Expand All @@ -378,13 +391,17 @@ def retrieve_snapshot project, subscription, snapshot_name
end

# Check the dead letter subscription pulls the message
received_messages = pull_with_retry dead_letter_subscription
received_messages = []
wait_for_condition description: "subscription pull" do
received_messages = subscription.pull immediate: false
received_messages.any?
end
_(received_messages).wont_be :empty?
_(received_messages.count).must_equal 1
received_message = received_messages.first
_(received_message).wont_be :nil?
_(received_message.msg.data).must_equal msg.data
_(received_message.delivery_attempt).must_be :nil?
_(received_message.delivery_attempt).must_be :>, 0

# update
dead_letter_topic_2 = retrieve_topic dead_letter_topic_name_2
Expand Down
18 changes: 8 additions & 10 deletions google-cloud-pubsub/acceptance/pubsub/schema_test.rb
Expand Up @@ -109,7 +109,10 @@
_(msg).wont_be :nil?

# Check it received the published message
received_messages = pull_with_retry subscription
wait_for_condition description: "subscription pull" do
received_messages = subscription.pull immediate: false
received_messages.any?
end
_(received_messages.count).must_equal 1
received_message = received_messages.first
_(received_message.data).must_equal msg.data
Expand All @@ -131,15 +134,10 @@
# delete
schema.delete

schema = pubsub.schema schema_name
wait_for_condition description: "schema delete" do
schema = pubsub.schema schema_name
schema.nil?
end
_(schema).must_be :nil?

topic = pubsub.topic topic.name
_(topic.schema_name).must_equal "_deleted-schema_"
_(topic.message_encoding).must_equal :BINARY

expect do
pubsub.create_topic topic_name_2, schema_name: schema_name, message_encoding: :binary
end.must_raise Google::Cloud::NotFoundError
end
end
19 changes: 9 additions & 10 deletions google-cloud-pubsub/acceptance/pubsub_helper.rb
Expand Up @@ -60,17 +60,16 @@ def setup
super
end

def pull_with_retry sub
received_messages = []
retries = 0
while retries <= 5 do
received_messages = sub.pull immediate: false
break if received_messages.any?
retries += 1
puts "the subscription does not have the message yet. sleeping for #{retries*retries} second(s) and retrying."
sleep retries*retries
# Repeatedly invokes +callback+ until it returns a truthy value or the retry
# budget is exhausted. Makes at most +retries+ + 1 attempts; after each failed
# attempt +n+ (1-based) it logs and sleeps n*n seconds (quadratic backoff).
# Returns the last value produced by the callback (truthy on success,
# falsy/nil when every attempt failed or no attempt ran).
def wait_for_condition description: nil, retries: 10, &callback
  result = nil
  (retries + 1).times do |attempt|
    result = callback.call
    break if result
    pause = (attempt + 1)**2
    puts "The #{description} callback has not been satisfied yet. Sleeping for #{pause} second(s) and retrying."
    sleep pause
  end
  result
end

# Add spec DSL
Expand Down
4 changes: 0 additions & 4 deletions google-cloud-pubsub/lib/google/cloud/pubsub.rb
Expand Up @@ -33,8 +33,6 @@ module Cloud
# See {file:OVERVIEW.md Google Cloud Pub/Sub Overview}.
#
module PubSub
# rubocop:disable Metrics/AbcSize

##
# Creates a new object for connecting to the Pub/Sub service.
# Each call creates a new connection.
Expand Down Expand Up @@ -110,8 +108,6 @@ def self.new project_id: nil,
PubSub::Project.new service
end

# rubocop:enable Metrics/AbcSize

##
# Configure the Google Cloud PubSub library.
#
Expand Down
48 changes: 42 additions & 6 deletions google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb
Expand Up @@ -16,6 +16,7 @@
require "monitor"
require "concurrent"
require "google/cloud/pubsub/errors"
require "google/cloud/pubsub/flow_controller"
require "google/cloud/pubsub/async_publisher/batch"
require "google/cloud/pubsub/publish_result"
require "google/cloud/pubsub/service"
Expand Down Expand Up @@ -65,14 +66,21 @@ class AsyncPublisher
attr_reader :interval
attr_reader :publish_threads
attr_reader :callback_threads
attr_reader :flow_control
##
# @private Implementation accessors
attr_reader :service, :batch, :publish_thread_pool,
:callback_thread_pool
:callback_thread_pool, :flow_controller

##
# @private Create a new instance of the object.
def initialize topic_name, service, max_bytes: 1_000_000, max_messages: 100, interval: 0.01, threads: {}
def initialize topic_name,
service,
max_bytes: 1_000_000,
max_messages: 100,
interval: 0.01,
threads: {},
flow_control: {}
# init MonitorMixin
super()
@topic_name = service.topic_path topic_name
Expand All @@ -83,6 +91,10 @@ def initialize topic_name, service, max_bytes: 1_000_000, max_messages: 100, int
@interval = interval
@publish_threads = (threads[:publish] || 2).to_i
@callback_threads = (threads[:callback] || 4).to_i
@flow_control = {
message_limit: 10 * @max_messages,
byte_limit: 10 * @max_bytes
}.merge(flow_control).freeze

@published_at = nil
@publish_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @publish_threads
Expand All @@ -91,7 +103,7 @@ def initialize topic_name, service, max_bytes: 1_000_000, max_messages: 100, int
@ordered = false
@batches = {}
@cond = new_cond

@flow_controller = FlowController.new(**@flow_control)
@thread = Thread.new { run_background }
end

Expand Down Expand Up @@ -121,13 +133,22 @@ def initialize topic_name, service, max_bytes: 1_000_000, max_messages: 100, int
#
def publish data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback
msg = Convert.pubsub_message data, attributes, ordering_key, extra_attrs
begin
@flow_controller.acquire msg.to_proto.bytesize
rescue FlowControlLimitError => e
stop_publish ordering_key, e if ordering_key
raise
end

synchronize do
raise AsyncPublisherStopped if @stopped
raise OrderedMessagesDisabled if !@ordered && !msg.ordering_key.empty? # default is empty string

batch = resolve_batch_for_message msg
raise OrderingKeyError, batch.ordering_key if batch.canceled?
if batch.canceled?
@flow_controller.release msg.to_proto.bytesize
raise OrderingKeyError, batch.ordering_key
end
batch_action = batch.add msg, callback
if batch_action == :full
publish_batches!
Expand Down Expand Up @@ -305,6 +326,21 @@ def resolve_batch_for_ordering_key ordering_key
@batches[ordering_key]
end

# Cancels the batch associated with +ordering_key+ after a flow control
# failure: releases each canceled item's bytes back to the flow controller
# and, when the item has a callback, delivers an error PublishResult
# asynchronously. No-op when no batch exists for the key.
# (Also removes scrape artifacts — GitHub review-UI text — that had been
# embedded inside the loop body, breaking the Ruby syntax.)
def stop_publish ordering_key, err
  synchronize do
    batch = resolve_batch_for_ordering_key ordering_key
    return if batch.nil?
    items = batch.cancel!
    items.each do |item|
      @flow_controller.release item.bytesize
      next unless item.callback

      publish_result = PublishResult.from_error item.msg, err
      execute_callback_async item.callback, publish_result
    end
  end
end

def publish_batches! stop: nil
@batches.reject! { |_ordering_key, batch| batch.empty? }
@batches.each_value do |batch|
Expand All @@ -325,7 +361,6 @@ def publish_batch_async topic_name, batch
end

# rubocop:disable Metrics/AbcSize
# rubocop:disable Metrics/MethodLength

def publish_batch_sync topic_name, batch
# The only batch methods that are safe to call from the loop are
Expand All @@ -337,6 +372,7 @@ def publish_batch_sync topic_name, batch
unless items.empty?
grpc = @service.publish topic_name, items.map(&:msg)
items.zip Array(grpc.message_ids) do |item, id|
@flow_controller.release item.bytesize
next unless item.callback

item.msg.message_id = id
Expand All @@ -363,6 +399,7 @@ def publish_batch_sync topic_name, batch
end

items.each do |item|
@flow_controller.release item.bytesize
next unless item.callback

publish_result = PublishResult.from_error item.msg, e
Expand All @@ -374,7 +411,6 @@ def publish_batch_sync topic_name, batch
end

# rubocop:enable Metrics/AbcSize
# rubocop:enable Metrics/MethodLength

PUBLISH_RETRY_ERRORS = [
GRPC::Cancelled, GRPC::DeadlineExceeded, GRPC::Internal,
Expand Down
Expand Up @@ -129,8 +129,8 @@ def stopping?
end

##
# Rebalances the batch by moving any queued items that will fit into
# the active item list.
# Fills the batch by sequentially moving the queued items that will
# fit into the active item list.
#
# This method is only intended to be used by the active publishing
# job.
Expand All @@ -152,8 +152,6 @@ def rebalance!
end
end

# rubocop:disable Metrics/MethodLength

##
# Resets the batch after a successful publish. This clears the active
# item list and moves the queued items that will fit into the active
Expand Down Expand Up @@ -202,8 +200,6 @@ def reset!
true
end

# rubocop:enable Metrics/MethodLength

##
# Cancel the batch and halt further batches until resumed. See
# {#resume!} and {#canceled?}.
Expand Down