Skip to content

Commit

Permalink
Add consumers per channel limit
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Valiushko committed Mar 14, 2024
1 parent 210a685 commit fcc5500
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 2 deletions.
23 changes: 23 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Expand Up @@ -889,6 +889,11 @@ end}.

{mapping, "channel_max", "rabbit.channel_max", [{datatype, integer}]}.

%% Set the max allowed number of channels per node.
%% `infinity` means "no limit".
%%
%% {channel_max_per_node, infinity},

{mapping, "channel_max_per_node", "rabbit.channel_max_per_node",
[{datatype, [{atom, infinity}, integer]}]}.

Expand All @@ -903,6 +908,24 @@ end}.
end
}.

%% Set the max allowed number of consumers per channel.
%% `infinity` means "no limit".
%%
%% {consumer_max_per_channel, infinity},

{mapping, "consumer_max_per_channel", "rabbit.consumer_max_per_channel",
[{datatype, [{atom, infinity}, integer]}]}.

{translation, "rabbit.consumer_max_per_channel",
fun(Conf) ->
case cuttlefish:conf_get("consumer_max_per_channel", Conf, undefined) of
undefined -> cuttlefish:unset();
infinity -> infinity;
Val when is_integer(Val) andalso Val > 0 -> Val;
_ -> cuttlefish:invalid("should be positive integer or 'infinity'")
end
end
}.

%% Set the max permissible number of client connections per node.
%% `infinity` means "no limit".
Expand Down
13 changes: 13 additions & 0 deletions deps/rabbit/src/rabbit_channel.erl
Expand Up @@ -106,6 +106,7 @@
consumer_timeout,
authz_context,
%% defines how ofter gc will be executed
max_consumers % taken from rabbit.consumer_max_per_channel
writer_gc_threshold
}).

Expand Down Expand Up @@ -507,6 +508,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
ConsumerTimeout = get_consumer_timeout(),
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
{ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold),
MaxConsumers = application:get_env(rabbit, consumer_max_per_channel, infinity),
State = #ch{cfg = #conf{state = starting,
protocol = Protocol,
channel = Channel,
Expand All @@ -523,6 +525,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
consumer_prefetch = Prefetch,
consumer_timeout = ConsumerTimeout,
authz_context = OptionalVariables,
max_consumers = MaxConsumers,
writer_gc_threshold = GCThreshold
},
limiter = Limiter,
Expand Down Expand Up @@ -1313,8 +1316,13 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
no_ack = NoAck,
nowait = NoWait},
_, State = #ch{reply_consumer = ReplyConsumer,
cfg = #conf{max_consumers = MaxConsumers},
consumer_mapping = ConsumerMapping}) ->
CurrentConsumers = length(maps:keys(ConsumerMapping)),
case maps:find(CTag0, ConsumerMapping) of
error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity'
rabbit_misc:protocol_error(
not_allowed, "reached maximum (~ts) of consumers per channel", [MaxConsumers]);
error ->
case {ReplyConsumer, NoAck} of
{none, true} ->
Expand Down Expand Up @@ -1363,12 +1371,17 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
nowait = NoWait,
arguments = Args},
_, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch,
max_consumers = MaxConsumers,
user = User,
virtual_host = VHostPath,
authz_context = AuthzContext},
consumer_mapping = ConsumerMapping
}) ->
CurrentConsumers = length(maps:keys(ConsumerMapping)),
case maps:find(ConsumerTag, ConsumerMapping) of
error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity'
rabbit_misc:protocol_error(
not_allowed, "reached maximum (~ts) of consumers per channel", [MaxConsumers]);
error ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
check_read_permitted(QueueName, User, AuthzContext),
Expand Down
8 changes: 8 additions & 0 deletions deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
Expand Up @@ -404,6 +404,14 @@ tcp_listen_options.exit_on_close = false",
"channel_max_per_node = infinity",
[{rabbit,[{channel_max_per_node, infinity}]}],
[]},
{consumer_max_per_channel,
"consumer_max_per_channel = 16",
[{rabbit,[{consumer_max_per_channel, 16}]}],
[]},
{consumer_max_per_channel,
"consumer_max_per_channel = infinity",
[{rabbit,[{consumer_max_per_channel, infinity}]}],
[]},
{max_message_size,
"max_message_size = 131072",
[{rabbit, [{max_message_size, 131072}]}],
Expand Down
35 changes: 33 additions & 2 deletions deps/rabbit/test/per_node_limit_SUITE.erl
Expand Up @@ -23,6 +23,7 @@ groups() ->
{limit_tests, [], [
node_connection_limit,
vhost_limit,
channel_consumers_limit,
node_channel_limit
]}
].
Expand Down Expand Up @@ -62,13 +63,15 @@ init_per_testcase(Testcase, Config) ->

end_per_testcase(vhost_limit = Testcase, Config) ->
set_node_limit(Config, vhost_max, infinity),
set_node_limit(Config, channel_max_per_node, 0),
set_node_limit(Config, channel_max_per_node, infinity),
set_node_limit(Config, consumer_max_per_channel, infinity),
set_node_limit(Config, connection_max, infinity),
[rabbit_ct_broker_helpers:delete_vhost(Config, integer_to_binary(I)) || I <- lists:seq(1,4)],
rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(Testcase, Config) ->
set_node_limit(Config, vhost_max, infinity),
set_node_limit(Config, channel_max_per_node, 0),
set_node_limit(Config, channel_max_per_node, infinity),
set_node_limit(Config, consumer_max_per_channel, infinity),
set_node_limit(Config, connection_max, infinity),
rabbit_ct_helpers:testcase_finished(Config, Testcase).

Expand Down Expand Up @@ -117,6 +120,7 @@ node_channel_limit(Config) ->
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
0 = count_channels_per_node(Config),

lists:foreach(fun(N) when (N band 1) == 1 -> {ok, _} = open_channel(Conn1);
(_) -> {ok,_ } = open_channel(Conn2)
Expand All @@ -137,6 +141,24 @@ node_channel_limit(Config) ->

%% Now all connections are closed, so there should be 0 open connections
0 = count_channels_per_node(Config),
close_all_connections([Conn1, Conn2]),
ok.

channel_consumers_limit(Config) ->
set_node_limit(Config, consumer_max_per_channel, 1),

VHost = <<"foobar">>,
User = <<"guest">>,
ok = rabbit_ct_broker_helpers:add_vhost(Config, VHost),
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
{ok, Ch} = open_channel(Conn1),
Q = <<"Q">>, Tag = <<"Tag">>,

{ok, _} = consume(Ch, Q, Tag),
{error, not_allowed_crash} = consume(Ch, Q, Tag), % Reusing Tag should fail

close_all_connections([Conn1]),
ok.

%% -------------------------------------------------------------------
Expand All @@ -157,6 +179,15 @@ set_node_limit(Config, Type, Limit) ->
application,
set_env, [rabbit, Type, Limit]).

consume(Ch, Q, Tag) ->
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{queue = Q}),
try amqp_channel:call(Ch, #'basic.consume'{queue = Q, consumer_tag = Tag}) of
#'basic.consume_ok'{} = OK -> {ok, OK};
NotOk -> {error, NotOk}
catch
_:_Error -> {error, not_allowed_crash}
end.

open_channel(Conn) when is_pid(Conn) ->
try amqp_connection:open_channel(Conn) of
{ok, Ch} -> {ok, Ch};
Expand Down

0 comments on commit fcc5500

Please sign in to comment.