Skip to content

Commit

Permalink
Merge pull request #10754 from rabbitmq/amazon-mq-consumers-limit
Browse files Browse the repository at this point in the history
Introduce per-channel consumers limit (by @illotum, AWS)
  • Loading branch information
michaelklishin committed Mar 16, 2024
2 parents de6cff6 + 34b1cf6 commit 390a0b4
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 3 deletions.
18 changes: 18 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Expand Up @@ -903,6 +903,24 @@ end}.
end
}.

%% Set the max allowed number of consumers per channel.
%% `infinity` means "no limit".
%%
%% {consumer_max_per_channel, infinity},

{mapping, "consumer_max_per_channel", "rabbit.consumer_max_per_channel",
[{datatype, [{atom, infinity}, integer]}]}.

{translation, "rabbit.consumer_max_per_channel",
fun(Conf) ->
case cuttlefish:conf_get("consumer_max_per_channel", Conf, undefined) of
undefined -> cuttlefish:unset();
infinity -> infinity;
Val when is_integer(Val) andalso Val > 0 -> Val;
_ -> cuttlefish:invalid("should be positive integer or 'infinity'")
end
end
}.

%% Set the max permissible number of client connections per node.
%% `infinity` means "no limit".
Expand Down
13 changes: 13 additions & 0 deletions deps/rabbit/src/rabbit_channel.erl
Expand Up @@ -105,6 +105,7 @@
consumer_prefetch,
consumer_timeout,
authz_context,
max_consumers, % taken from rabbit.consumer_max_per_channel
%% defines how ofter gc will be executed
writer_gc_threshold
}).
Expand Down Expand Up @@ -507,6 +508,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
ConsumerTimeout = get_consumer_timeout(),
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
{ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold),
MaxConsumers = application:get_env(rabbit, consumer_max_per_channel, infinity),
State = #ch{cfg = #conf{state = starting,
protocol = Protocol,
channel = Channel,
Expand All @@ -523,6 +525,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
consumer_prefetch = Prefetch,
consumer_timeout = ConsumerTimeout,
authz_context = OptionalVariables,
max_consumers = MaxConsumers,
writer_gc_threshold = GCThreshold
},
limiter = Limiter,
Expand Down Expand Up @@ -1313,8 +1316,13 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
no_ack = NoAck,
nowait = NoWait},
_, State = #ch{reply_consumer = ReplyConsumer,
cfg = #conf{max_consumers = MaxConsumers},
consumer_mapping = ConsumerMapping}) ->
CurrentConsumers = maps:size(ConsumerMapping),
case maps:find(CTag0, ConsumerMapping) of
error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity'
rabbit_misc:protocol_error(
not_allowed, "reached maximum (~ts) of consumers per channel", [MaxConsumers]);
error ->
case {ReplyConsumer, NoAck} of
{none, true} ->
Expand Down Expand Up @@ -1363,12 +1371,17 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
nowait = NoWait,
arguments = Args},
_, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch,
max_consumers = MaxConsumers,
user = User,
virtual_host = VHostPath,
authz_context = AuthzContext},
consumer_mapping = ConsumerMapping
}) ->
CurrentConsumers = maps:size(ConsumerMapping),
case maps:find(ConsumerTag, ConsumerMapping) of
error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity'
rabbit_misc:protocol_error(
not_allowed, "reached maximum (~ts) of consumers per channel", [MaxConsumers]);
error ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
check_read_permitted(QueueName, User, AuthzContext),
Expand Down
8 changes: 8 additions & 0 deletions deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
Expand Up @@ -404,6 +404,14 @@ tcp_listen_options.exit_on_close = false",
"channel_max_per_node = infinity",
[{rabbit,[{channel_max_per_node, infinity}]}],
[]},
{consumer_max_per_channel,
"consumer_max_per_channel = 16",
[{rabbit,[{consumer_max_per_channel, 16}]}],
[]},
{consumer_max_per_channel,
"consumer_max_per_channel = infinity",
[{rabbit,[{consumer_max_per_channel, infinity}]}],
[]},
{max_message_size,
"max_message_size = 131072",
[{rabbit, [{max_message_size, 131072}]}],
Expand Down
43 changes: 40 additions & 3 deletions deps/rabbit/test/per_node_limit_SUITE.erl
Expand Up @@ -23,6 +23,7 @@ groups() ->
{limit_tests, [], [
node_connection_limit,
vhost_limit,
channel_consumers_limit,
node_channel_limit
]}
].
Expand Down Expand Up @@ -62,13 +63,15 @@ init_per_testcase(Testcase, Config) ->

end_per_testcase(vhost_limit = Testcase, Config) ->
set_node_limit(Config, vhost_max, infinity),
set_node_limit(Config, channel_max_per_node, 0),
set_node_limit(Config, channel_max_per_node, infinity),
set_node_limit(Config, consumer_max_per_channel, infinity),
set_node_limit(Config, connection_max, infinity),
[rabbit_ct_broker_helpers:delete_vhost(Config, integer_to_binary(I)) || I <- lists:seq(1,4)],
rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(Testcase, Config) ->
set_node_limit(Config, vhost_max, infinity),
set_node_limit(Config, channel_max_per_node, 0),
set_node_limit(Config, channel_max_per_node, infinity),
set_node_limit(Config, consumer_max_per_channel, infinity),
set_node_limit(Config, connection_max, infinity),
rabbit_ct_helpers:testcase_finished(Config, Testcase).

Expand Down Expand Up @@ -111,12 +114,13 @@ vhost_limit(Config) ->
node_channel_limit(Config) ->
set_node_limit(Config, channel_max_per_node, 5),

VHost = <<"foobar">>,
VHost = <<"node_channel_limit">>,
User = <<"guest">>,
ok = rabbit_ct_broker_helpers:add_vhost(Config, VHost),
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
0 = count_channels_per_node(Config),

lists:foreach(fun(N) when (N band 1) == 1 -> {ok, _} = open_channel(Conn1);
(_) -> {ok,_ } = open_channel(Conn2)
Expand All @@ -137,6 +141,30 @@ node_channel_limit(Config) ->

%% Now all connections are closed, so there should be 0 open connections
0 = count_channels_per_node(Config),
close_all_connections([Conn1, Conn2]),

rabbit_ct_broker_helpers:delete_vhost(Config, VHost),

ok.

channel_consumers_limit(Config) ->
set_node_limit(Config, consumer_max_per_channel, 2),

VHost = <<"channel_consumers_limit">>,
User = <<"guest">>,
ok = rabbit_ct_broker_helpers:add_vhost(Config, VHost),
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
{ok, Ch} = open_channel(Conn1),
Q = <<"Q">>, Tag = <<"Tag">>,

{ok, _} = consume(Ch, Q, <<"Tag1">>),
{ok, _} = consume(Ch, Q, <<"Tag2">>),
{error, not_allowed_crash} = consume(Ch, Q, <<"Tag3">>), % Third consumer should fail

close_all_connections([Conn1]),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost),

ok.

%% -------------------------------------------------------------------
Expand All @@ -157,6 +185,15 @@ set_node_limit(Config, Type, Limit) ->
application,
set_env, [rabbit, Type, Limit]).

consume(Ch, Q, Tag) ->
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{queue = Q}),
try amqp_channel:call(Ch, #'basic.consume'{queue = Q, consumer_tag = Tag}) of
#'basic.consume_ok'{} = OK -> {ok, OK};
NotOk -> {error, NotOk}
catch
_:_Error -> {error, not_allowed_crash}
end.

open_channel(Conn) when is_pid(Conn) ->
try amqp_connection:open_channel(Conn) of
{ok, Ch} -> {ok, Ch};
Expand Down

0 comments on commit 390a0b4

Please sign in to comment.