Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce per-channel consumers limit (by @illotum, AWS) (backport #10754) (backport #10755) #10756

Merged
merged 5 commits into from Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Expand Up @@ -899,6 +899,24 @@ end}.
end
}.

%% Set the max allowed number of consumers per channel.
%% `infinity` means "no limit".
%%
%% {consumer_max_per_channel, infinity},

{mapping, "consumer_max_per_channel", "rabbit.consumer_max_per_channel",
[{datatype, [{atom, infinity}, integer]}]}.

{translation, "rabbit.consumer_max_per_channel",
fun(Conf) ->
case cuttlefish:conf_get("consumer_max_per_channel", Conf, undefined) of
undefined -> cuttlefish:unset();
infinity -> infinity;
Val when is_integer(Val) andalso Val > 0 -> Val;
_ -> cuttlefish:invalid("should be positive integer or 'infinity'")
end
end
}.

%% Set the max permissible number of client connections per node.
%% `infinity` means "no limit".
Expand Down
16 changes: 15 additions & 1 deletion deps/rabbit/src/rabbit_channel.erl
Expand Up @@ -109,6 +109,8 @@
max_message_size,
consumer_timeout,
authz_context,
%% taken from rabbit.consumer_max_per_channel
max_consumers,
%% defines how ofter gc will be executed
writer_gc_threshold,
%% true with AMQP 1.0 to include the publishing sequence
Expand Down Expand Up @@ -506,6 +508,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
UseExtendedReturnCallback = use_extended_return_callback(AmqpParams),
{ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold),
MaxConsumers = application:get_env(rabbit, consumer_max_per_channel, infinity),
State = #ch{cfg = #conf{state = starting,
protocol = Protocol,
channel = Channel,
Expand All @@ -524,7 +527,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
consumer_timeout = ConsumerTimeout,
authz_context = OptionalVariables,
writer_gc_threshold = GCThreshold,
extended_return_callback = UseExtendedReturnCallback
extended_return_callback = UseExtendedReturnCallback,
max_consumers = MaxConsumers
},
limiter = Limiter,
tx = none,
Expand Down Expand Up @@ -1355,8 +1359,13 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
no_ack = NoAck,
nowait = NoWait},
_, State = #ch{reply_consumer = ReplyConsumer,
cfg = #conf{max_consumers = MaxConsumers},
consumer_mapping = ConsumerMapping}) ->
CurrentConsumers = maps:size(ConsumerMapping),
case maps:find(CTag0, ConsumerMapping) of
error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity'
rabbit_misc:protocol_error(
not_allowed, "reached maximum (~ts) of consumers per channel", [MaxConsumers]);
error ->
case {ReplyConsumer, NoAck} of
{none, true} ->
Expand Down Expand Up @@ -1405,12 +1414,17 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
nowait = NoWait,
arguments = Args},
_, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch,
max_consumers = MaxConsumers,
user = User,
virtual_host = VHostPath,
authz_context = AuthzContext},
consumer_mapping = ConsumerMapping
}) ->
CurrentConsumers = maps:size(ConsumerMapping),
case maps:find(ConsumerTag, ConsumerMapping) of
error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity'
rabbit_misc:protocol_error(
not_allowed, "reached maximum (~ts) of consumers per channel", [MaxConsumers]);
error ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
check_read_permitted(QueueName, User, AuthzContext),
Expand Down
8 changes: 8 additions & 0 deletions deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
Expand Up @@ -320,6 +320,14 @@ tcp_listen_options.exit_on_close = false",
"channel_max_per_node = infinity",
[{rabbit,[{channel_max_per_node, infinity}]}],
[]},
{consumer_max_per_channel,
"consumer_max_per_channel = 16",
[{rabbit,[{consumer_max_per_channel, 16}]}],
[]},
{consumer_max_per_channel,
"consumer_max_per_channel = infinity",
[{rabbit,[{consumer_max_per_channel, infinity}]}],
[]},
{max_message_size,
"max_message_size = 131072",
[{rabbit, [{max_message_size, 131072}]}],
Expand Down
43 changes: 40 additions & 3 deletions deps/rabbit/test/per_node_limit_SUITE.erl
Expand Up @@ -23,6 +23,7 @@ groups() ->
{limit_tests, [], [
node_connection_limit,
vhost_limit,
channel_consumers_limit,
node_channel_limit
]}
].
Expand Down Expand Up @@ -62,13 +63,15 @@ init_per_testcase(Testcase, Config) ->

end_per_testcase(vhost_limit = Testcase, Config) ->
set_node_limit(Config, vhost_max, infinity),
set_node_limit(Config, channel_max_per_node, 0),
set_node_limit(Config, channel_max_per_node, infinity),
set_node_limit(Config, consumer_max_per_channel, infinity),
set_node_limit(Config, connection_max, infinity),
[rabbit_ct_broker_helpers:delete_vhost(Config, integer_to_binary(I)) || I <- lists:seq(1,4)],
rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(Testcase, Config) ->
set_node_limit(Config, vhost_max, infinity),
set_node_limit(Config, channel_max_per_node, 0),
set_node_limit(Config, channel_max_per_node, infinity),
set_node_limit(Config, consumer_max_per_channel, infinity),
set_node_limit(Config, connection_max, infinity),
rabbit_ct_helpers:testcase_finished(Config, Testcase).

Expand Down Expand Up @@ -111,12 +114,13 @@ vhost_limit(Config) ->
node_channel_limit(Config) ->
set_node_limit(Config, channel_max_per_node, 5),

VHost = <<"foobar">>,
VHost = <<"node_channel_limit">>,
User = <<"guest">>,
ok = rabbit_ct_broker_helpers:add_vhost(Config, VHost),
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
0 = count_channels_per_node(Config),

lists:foreach(fun(N) when (N band 1) == 1 -> {ok, _} = open_channel(Conn1);
(_) -> {ok,_ } = open_channel(Conn2)
Expand All @@ -137,6 +141,30 @@ node_channel_limit(Config) ->

%% Now all connections are closed, so there should be 0 open connections
0 = count_channels_per_node(Config),
close_all_connections([Conn1, Conn2]),

rabbit_ct_broker_helpers:delete_vhost(Config, VHost),

ok.

channel_consumers_limit(Config) ->
set_node_limit(Config, consumer_max_per_channel, 2),

VHost = <<"channel_consumers_limit">>,
User = <<"guest">>,
ok = rabbit_ct_broker_helpers:add_vhost(Config, VHost),
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
{ok, Ch} = open_channel(Conn1),
Q = <<"Q">>, Tag = <<"Tag">>,

{ok, _} = consume(Ch, Q, <<"Tag1">>),
{ok, _} = consume(Ch, Q, <<"Tag2">>),
{error, not_allowed_crash} = consume(Ch, Q, <<"Tag3">>), % Third consumer should fail

close_all_connections([Conn1]),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost),

ok.

%% -------------------------------------------------------------------
Expand All @@ -157,6 +185,15 @@ set_node_limit(Config, Type, Limit) ->
application,
set_env, [rabbit, Type, Limit]).

consume(Ch, Q, Tag) ->
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{queue = Q}),
try amqp_channel:call(Ch, #'basic.consume'{queue = Q, consumer_tag = Tag}) of
#'basic.consume_ok'{} = OK -> {ok, OK};
NotOk -> {error, NotOk}
catch
_:_Error -> {error, not_allowed_crash}
end.

open_channel(Conn) when is_pid(Conn) ->
try amqp_connection:open_channel(Conn) of
{ok, Ch} -> {ok, Ch};
Expand Down