Skip to content

Commit

Permalink
feat(pubsub): Add Publisher Flow Control
Browse files Browse the repository at this point in the history
* Add flow_control to async options in Project#create_topic and Project#topic
* Add FlowControlLimitError

pr: #11383
  • Loading branch information
quartzmo committed Jun 10, 2021
1 parent 0c5ca60 commit cfa68ba
Show file tree
Hide file tree
Showing 18 changed files with 1,154 additions and 59 deletions.
5 changes: 3 additions & 2 deletions google-cloud-pubsub/.rubocop.yml
Expand Up @@ -11,6 +11,8 @@ AllCops:
Style/Documentation:
Enabled: false

Metrics/AbcSize:
Max: 35
Metrics/BlockLength:
Exclude:
- "google-cloud-pubsub.gemspec"
Expand All @@ -20,8 +22,7 @@ Metrics/ClassLength:
Metrics/CyclomaticComplexity:
Max: 15
Metrics/MethodLength:
Exclude:
- "lib/google/cloud/pubsub.rb"
Max: 35
Metrics/PerceivedComplexity:
Max: 15
Naming/FileName:
Expand Down
50 changes: 48 additions & 2 deletions google-cloud-pubsub/acceptance/pubsub/async_test.rb
Expand Up @@ -13,10 +13,11 @@
# limitations under the License.

require "pubsub_helper"
require "concurrent/atomics"

describe Google::Cloud::PubSub, :async, :pubsub do
def retrieve_topic topic_name
pubsub.get_topic(topic_name) || pubsub.create_topic(topic_name)
# Fetches the named topic, creating it when it does not yet exist.
# The optional +async+ hash is forwarded to both lookup and creation so
# the topic's async publisher is configured consistently either way.
def retrieve_topic topic_name, async: nil
  existing = pubsub.get_topic topic_name, async: async
  existing || pubsub.create_topic(topic_name, async: async)
end

def retrieve_subscription topic, subscription_name
Expand All @@ -27,6 +28,16 @@ def retrieve_subscription topic, subscription_name
let(:nonce) { rand 100 }
let(:topic) { retrieve_topic "#{$topic_prefix}-async#{nonce}" }
let(:sub) { retrieve_subscription topic, "#{$topic_prefix}-async-sub#{nonce}" }
# Async publisher options exercising publisher flow control: at most 2
# outstanding messages, with :error behavior so exceeding the limit
# raises FlowControlLimitError. The 30s interval presumably keeps
# queued messages from auto-publishing so the test controls flushing —
# TODO confirm against AsyncPublisher's interval semantics.
let(:async_flow_control) do
{
interval: 30,
flow_control: {
message_limit: 2,
limit_exceeded_behavior: :error
}
}
end
# Topic whose async publisher is created with the flow control options.
let(:topic_flow_control) { retrieve_topic "#{$topic_prefix}-async#{nonce}", async: async_flow_control }

it "publishes and pulls asyncronously" do
events = sub.pull
Expand Down Expand Up @@ -268,4 +279,39 @@ def retrieve_subscription topic, subscription_name
# Remove the subscription
sub.delete
end

it "publishes asyncronously with publisher flow control" do
# Events signalled by each message's publish callback, so the test can
# wait for completion without polling.
publish_1_done = Concurrent::Event.new
publish_2_done = Concurrent::Event.new
publish_3_done = Concurrent::Event.new

topic_flow_control.publish_async("a") { publish_1_done.set }

# The flow controller tracks messages acquired but not yet released.
flow_controller = topic_flow_control.async_publisher.flow_controller
_(flow_controller.outstanding_messages).must_equal 1

topic_flow_control.publish_async("b") { publish_2_done.set }
_(flow_controller.outstanding_messages).must_equal 2 # Limit

# A third message exceeds message_limit: 2; with
# limit_exceeded_behavior: :error the publish raises immediately.
expect do
topic_flow_control.publish_async "c"
end.must_raise Google::Cloud::PubSub::FlowControlLimitError

# Force the queued messages to be published and wait for events.
topic_flow_control.async_publisher.flush
assert publish_1_done.wait(1), "Publishing message 1 errored."
assert publish_2_done.wait(1), "Publishing message 2 errored."

# Publishing released both messages from the flow controller.
_(flow_controller.outstanding_messages).must_equal 0

# Capacity is available again, so "c" is now accepted.
topic_flow_control.publish_async("c") { publish_3_done.set }

_(flow_controller.outstanding_messages).must_equal 1

# Force the queued message to be published and wait for event.
topic_flow_control.async_publisher.stop!
assert publish_3_done.wait(1), "Publishing message 3 errored."

_(flow_controller.outstanding_messages).must_equal 0
end
end
29 changes: 23 additions & 6 deletions google-cloud-pubsub/acceptance/pubsub/pubsub_test.rb
Expand Up @@ -252,7 +252,10 @@ def retrieve_snapshot project, subscription, snapshot_name
msg = topic.publish "hello"
_(msg).wont_be :nil?
# Check it received the published message
received_messages = pull_with_retry subscription
wait_for_condition description: "subscription pull" do
received_messages = subscription.pull immediate: false
received_messages.any?
end
_(received_messages).wont_be :empty?
_(received_messages.count).must_equal 1
received_message = received_messages.first
Expand Down Expand Up @@ -284,7 +287,10 @@ def retrieve_snapshot project, subscription, snapshot_name
snapshot = subscription.create_snapshot labels: labels

# Check it pulls the message
received_messages = pull_with_retry subscription
wait_for_condition description: "subscription pull" do
received_messages = subscription.pull immediate: false
received_messages.any?
end
_(received_messages).wont_be :empty?
_(received_messages.count).must_equal 1
received_message = received_messages.first
Expand All @@ -303,7 +309,10 @@ def retrieve_snapshot project, subscription, snapshot_name
subscription.seek snapshot

# Check it again pulls the message
received_messages = pull_with_retry subscription
wait_for_condition description: "subscription pull" do
received_messages = subscription.pull immediate: false
received_messages.any?
end
_(received_messages.count).must_equal 1
received_message = received_messages.first
_(received_message).wont_be :nil?
Expand Down Expand Up @@ -369,7 +378,11 @@ def retrieve_snapshot project, subscription, snapshot_name

# Nack the message
(1..7).each do |i|
received_messages = pull_with_retry subscription
received_messages = []
wait_for_condition description: "subscription pull" do
received_messages = subscription.pull immediate: false
received_messages.any?
end
_(received_messages.count).must_equal 1
received_message = received_messages.first
_(received_message.msg.data).must_equal msg.data
Expand All @@ -378,13 +391,17 @@ def retrieve_snapshot project, subscription, snapshot_name
end

# Check the dead letter subscription pulls the message
received_messages = pull_with_retry dead_letter_subscription
received_messages = []
wait_for_condition description: "dead letter subscription pull" do
# Pull from the dead letter subscription — the surrounding comment says
# this checks the dead letter subscription, but the code pulled from the
# source subscription (the pre-refactor line used dead_letter_subscription).
received_messages = dead_letter_subscription.pull immediate: false
received_messages.any?
end
_(received_messages).wont_be :empty?
_(received_messages.count).must_equal 1
received_message = received_messages.first
_(received_message).wont_be :nil?
_(received_message.msg.data).must_equal msg.data
_(received_message.delivery_attempt).must_be :nil?
_(received_message.delivery_attempt).must_be :>, 0

# update
dead_letter_topic_2 = retrieve_topic dead_letter_topic_name_2
Expand Down
18 changes: 8 additions & 10 deletions google-cloud-pubsub/acceptance/pubsub/schema_test.rb
Expand Up @@ -109,7 +109,10 @@
_(msg).wont_be :nil?

# Check it received the published message
received_messages = pull_with_retry subscription
wait_for_condition description: "subscription pull" do
received_messages = subscription.pull immediate: false
received_messages.any?
end
_(received_messages.count).must_equal 1
received_message = received_messages.first
_(received_message.data).must_equal msg.data
Expand All @@ -131,15 +134,10 @@
# delete
schema.delete

schema = pubsub.schema schema_name
wait_for_condition description: "schema delete" do
schema = pubsub.schema schema_name
schema.nil?
end
_(schema).must_be :nil?

topic = pubsub.topic topic.name
_(topic.schema_name).must_equal "_deleted-schema_"
_(topic.message_encoding).must_equal :BINARY

expect do
pubsub.create_topic topic_name_2, schema_name: schema_name, message_encoding: :binary
end.must_raise Google::Cloud::NotFoundError
end
end
19 changes: 9 additions & 10 deletions google-cloud-pubsub/acceptance/pubsub_helper.rb
Expand Up @@ -60,17 +60,16 @@ def setup
super
end

def pull_with_retry sub
received_messages = []
retries = 0
while retries <= 5 do
received_messages = sub.pull immediate: false
break if received_messages.any?
retries += 1
puts "the subscription does not have the message yet. sleeping for #{retries*retries} second(s) and retrying."
sleep retries*retries
##
# Polls the given callback until it returns a truthy value, backing off
# quadratically (1, 4, 9, ... seconds) between attempts.
#
# Fix: the previous loop slept (up to 121 seconds) AFTER the final
# failed attempt, even though no further attempt would be made — e.g.
# `retries: 0` made one call and then slept 1 second for nothing.
#
# @param description [String, nil] Label used in the retry log output.
# @param retries [Integer] Maximum number of retries after the first call
#   (so the callback runs at most `retries + 1` times).
# @yieldreturn [Object] The condition result; truthy stops the polling.
# @return [Object] The last value returned by the callback — truthy on
#   success, falsy if the retry budget was exhausted.
def wait_for_condition description: nil, retries: 10, &callback
  result = nil
  count = 0
  loop do
    result = callback.call
    break if result
    count += 1
    break if count > retries # no more attempts coming; don't sleep
    puts "The #{description} callback has not been satisfied yet. Sleeping for #{count*count} second(s) and retrying."
    sleep count*count
  end
  result
end

# Add spec DSL
Expand Down
4 changes: 0 additions & 4 deletions google-cloud-pubsub/lib/google/cloud/pubsub.rb
Expand Up @@ -33,8 +33,6 @@ module Cloud
# See {file:OVERVIEW.md Google Cloud Pub/Sub Overview}.
#
module PubSub
# rubocop:disable Metrics/AbcSize

##
# Creates a new object for connecting to the Pub/Sub service.
# Each call creates a new connection.
Expand Down Expand Up @@ -110,8 +108,6 @@ def self.new project_id: nil,
PubSub::Project.new service
end

# rubocop:enable Metrics/AbcSize

##
# Configure the Google Cloud PubSub library.
#
Expand Down
48 changes: 42 additions & 6 deletions google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb
Expand Up @@ -16,6 +16,7 @@
require "monitor"
require "concurrent"
require "google/cloud/pubsub/errors"
require "google/cloud/pubsub/flow_controller"
require "google/cloud/pubsub/async_publisher/batch"
require "google/cloud/pubsub/publish_result"
require "google/cloud/pubsub/service"
Expand Down Expand Up @@ -65,14 +66,21 @@ class AsyncPublisher
attr_reader :interval
attr_reader :publish_threads
attr_reader :callback_threads
attr_reader :flow_control
##
# @private Implementation accessors
attr_reader :service, :batch, :publish_thread_pool,
:callback_thread_pool
:callback_thread_pool, :flow_controller

##
# @private Create a new instance of the object.
def initialize topic_name, service, max_bytes: 1_000_000, max_messages: 100, interval: 0.01, threads: {}
def initialize topic_name,
service,
max_bytes: 1_000_000,
max_messages: 100,
interval: 0.01,
threads: {},
flow_control: {}
# init MonitorMixin
super()
@topic_name = service.topic_path topic_name
Expand All @@ -83,6 +91,10 @@ def initialize topic_name, service, max_bytes: 1_000_000, max_messages: 100, int
@interval = interval
@publish_threads = (threads[:publish] || 2).to_i
@callback_threads = (threads[:callback] || 4).to_i
@flow_control = {
message_limit: 10 * @max_messages,
byte_limit: 10 * @max_bytes
}.merge(flow_control).freeze

@published_at = nil
@publish_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @publish_threads
Expand All @@ -91,7 +103,7 @@ def initialize topic_name, service, max_bytes: 1_000_000, max_messages: 100, int
@ordered = false
@batches = {}
@cond = new_cond

@flow_controller = FlowController.new(**@flow_control)
@thread = Thread.new { run_background }
end

Expand Down Expand Up @@ -121,13 +133,22 @@ def initialize topic_name, service, max_bytes: 1_000_000, max_messages: 100, int
#
def publish data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback
msg = Convert.pubsub_message data, attributes, ordering_key, extra_attrs
begin
@flow_controller.acquire msg.to_proto.bytesize
rescue FlowControlLimitError => e
stop_publish ordering_key, e if ordering_key
raise
end

synchronize do
raise AsyncPublisherStopped if @stopped
raise OrderedMessagesDisabled if !@ordered && !msg.ordering_key.empty? # default is empty string

batch = resolve_batch_for_message msg
raise OrderingKeyError, batch.ordering_key if batch.canceled?
if batch.canceled?
@flow_controller.release msg.to_proto.bytesize
raise OrderingKeyError, batch.ordering_key
end
batch_action = batch.add msg, callback
if batch_action == :full
publish_batches!
Expand Down Expand Up @@ -305,6 +326,21 @@ def resolve_batch_for_ordering_key ordering_key
@batches[ordering_key]
end

##
# @private Fails and discards the pending batch for +ordering_key+.
# Each canceled item releases its bytes back to the flow controller;
# items that carry a callback receive an error PublishResult built from
# +err+, delivered on the callback thread pool. No-op when no batch
# exists for the key.
def stop_publish ordering_key, err
  synchronize do
    batch = resolve_batch_for_ordering_key ordering_key
    return if batch.nil?

    batch.cancel!.each do |item|
      @flow_controller.release item.bytesize
      if item.callback
        error_result = PublishResult.from_error item.msg, err
        execute_callback_async item.callback, error_result
      end
    end
  end
end

def publish_batches! stop: nil
@batches.reject! { |_ordering_key, batch| batch.empty? }
@batches.each_value do |batch|
Expand All @@ -325,7 +361,6 @@ def publish_batch_async topic_name, batch
end

# rubocop:disable Metrics/AbcSize
# rubocop:disable Metrics/MethodLength

def publish_batch_sync topic_name, batch
# The only batch methods that are safe to call from the loop are
Expand All @@ -337,6 +372,7 @@ def publish_batch_sync topic_name, batch
unless items.empty?
grpc = @service.publish topic_name, items.map(&:msg)
items.zip Array(grpc.message_ids) do |item, id|
@flow_controller.release item.bytesize
next unless item.callback

item.msg.message_id = id
Expand All @@ -363,6 +399,7 @@ def publish_batch_sync topic_name, batch
end

items.each do |item|
@flow_controller.release item.bytesize
next unless item.callback

publish_result = PublishResult.from_error item.msg, e
Expand All @@ -374,7 +411,6 @@ def publish_batch_sync topic_name, batch
end

# rubocop:enable Metrics/AbcSize
# rubocop:enable Metrics/MethodLength

PUBLISH_RETRY_ERRORS = [
GRPC::Cancelled, GRPC::DeadlineExceeded, GRPC::Internal,
Expand Down
Expand Up @@ -129,8 +129,8 @@ def stopping?
end

##
# Rebalances the batch by moving any queued items that will fit into
# the active item list.
# Fills the batch by sequentially moving the queued items that will
# fit into the active item list.
#
# This method is only intended to be used by the active publishing
# job.
Expand All @@ -152,8 +152,6 @@ def rebalance!
end
end

# rubocop:disable Metrics/MethodLength

##
# Resets the batch after a successful publish. This clears the active
# item list and moves the queued items that will fit into the active
Expand Down Expand Up @@ -202,8 +200,6 @@ def reset!
true
end

# rubocop:enable Metrics/MethodLength

##
# Cancel the batch and halt further batches until resumed. See
# {#resume!} and {#canceled?}.
Expand Down

0 comments on commit cfa68ba

Please sign in to comment.