Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce per-channel consumers limit (by @illotum, AWS) #10754

Merged
merged 4 commits into from Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Expand Up @@ -903,6 +903,24 @@ end}.
end
}.

%% Set the max allowed number of consumers per channel.
%% `infinity` means "no limit".
%%
%% {consumer_max_per_channel, infinity},

{mapping, "consumer_max_per_channel", "rabbit.consumer_max_per_channel",
[{datatype, [{atom, infinity}, integer]}]}.

{translation, "rabbit.consumer_max_per_channel",
fun(Conf) ->
case cuttlefish:conf_get("consumer_max_per_channel", Conf, undefined) of
undefined -> cuttlefish:unset();
infinity -> infinity;
Val when is_integer(Val) andalso Val > 0 -> Val;
_ -> cuttlefish:invalid("should be positive integer or 'infinity'")
end
end
}.

%% Set the max permissible number of client connections per node.
%% `infinity` means "no limit".
Expand Down
13 changes: 13 additions & 0 deletions deps/rabbit/src/rabbit_channel.erl
Expand Up @@ -105,6 +105,7 @@
consumer_prefetch,
consumer_timeout,
authz_context,
max_consumers, % taken from rabbit.consumer_max_per_channel
%% defines how ofter gc will be executed
writer_gc_threshold
}).
Expand Down Expand Up @@ -507,6 +508,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
ConsumerTimeout = get_consumer_timeout(),
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
{ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold),
MaxConsumers = application:get_env(rabbit, consumer_max_per_channel, infinity),
State = #ch{cfg = #conf{state = starting,
protocol = Protocol,
channel = Channel,
Expand All @@ -523,6 +525,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
consumer_prefetch = Prefetch,
consumer_timeout = ConsumerTimeout,
authz_context = OptionalVariables,
max_consumers = MaxConsumers,
writer_gc_threshold = GCThreshold
},
limiter = Limiter,
Expand Down Expand Up @@ -1313,8 +1316,13 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
no_ack = NoAck,
nowait = NoWait},
_, State = #ch{reply_consumer = ReplyConsumer,
cfg = #conf{max_consumers = MaxConsumers},
consumer_mapping = ConsumerMapping}) ->
CurrentConsumers = maps:size(ConsumerMapping),
case maps:find(CTag0, ConsumerMapping) of
error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity'
rabbit_misc:protocol_error(
not_allowed, "reached maximum (~ts) of consumers per channel", [MaxConsumers]);
error ->
case {ReplyConsumer, NoAck} of
{none, true} ->
Expand Down Expand Up @@ -1363,12 +1371,17 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
nowait = NoWait,
arguments = Args},
_, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch,
max_consumers = MaxConsumers,
user = User,
virtual_host = VHostPath,
authz_context = AuthzContext},
consumer_mapping = ConsumerMapping
}) ->
CurrentConsumers = length(maps:keys(ConsumerMapping)),
michaelklishin marked this conversation as resolved.
Show resolved Hide resolved
case maps:find(ConsumerTag, ConsumerMapping) of
error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity'
rabbit_misc:protocol_error(
not_allowed, "reached maximum (~ts) of consumers per channel", [MaxConsumers]);
error ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
check_read_permitted(QueueName, User, AuthzContext),
Expand Down
8 changes: 8 additions & 0 deletions deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
Expand Up @@ -404,6 +404,14 @@ tcp_listen_options.exit_on_close = false",
"channel_max_per_node = infinity",
[{rabbit,[{channel_max_per_node, infinity}]}],
[]},
{consumer_max_per_channel,
"consumer_max_per_channel = 16",
[{rabbit,[{consumer_max_per_channel, 16}]}],
[]},
{consumer_max_per_channel,
"consumer_max_per_channel = infinity",
[{rabbit,[{consumer_max_per_channel, infinity}]}],
[]},
{max_message_size,
"max_message_size = 131072",
[{rabbit, [{max_message_size, 131072}]}],
Expand Down
43 changes: 40 additions & 3 deletions deps/rabbit/test/per_node_limit_SUITE.erl
Expand Up @@ -23,6 +23,7 @@ groups() ->
{limit_tests, [], [
node_connection_limit,
vhost_limit,
channel_consumers_limit,
node_channel_limit
]}
].
Expand Down Expand Up @@ -62,13 +63,15 @@ init_per_testcase(Testcase, Config) ->

end_per_testcase(vhost_limit = Testcase, Config) ->
set_node_limit(Config, vhost_max, infinity),
set_node_limit(Config, channel_max_per_node, 0),
set_node_limit(Config, channel_max_per_node, infinity),
set_node_limit(Config, consumer_max_per_channel, infinity),
set_node_limit(Config, connection_max, infinity),
[rabbit_ct_broker_helpers:delete_vhost(Config, integer_to_binary(I)) || I <- lists:seq(1,4)],
rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(Testcase, Config) ->
set_node_limit(Config, vhost_max, infinity),
set_node_limit(Config, channel_max_per_node, 0),
set_node_limit(Config, channel_max_per_node, infinity),
set_node_limit(Config, consumer_max_per_channel, infinity),
set_node_limit(Config, connection_max, infinity),
rabbit_ct_helpers:testcase_finished(Config, Testcase).

Expand Down Expand Up @@ -111,12 +114,13 @@ vhost_limit(Config) ->
node_channel_limit(Config) ->
set_node_limit(Config, channel_max_per_node, 5),

VHost = <<"foobar">>,
VHost = <<"node_channel_limit">>,
User = <<"guest">>,
ok = rabbit_ct_broker_helpers:add_vhost(Config, VHost),
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
0 = count_channels_per_node(Config),

lists:foreach(fun(N) when (N band 1) == 1 -> {ok, _} = open_channel(Conn1);
(_) -> {ok,_ } = open_channel(Conn2)
Expand All @@ -137,6 +141,30 @@ node_channel_limit(Config) ->

%% Now all connections are closed, so there should be 0 open connections
0 = count_channels_per_node(Config),
close_all_connections([Conn1, Conn2]),

rabbit_ct_broker_helpers:delete_vhost(Config, VHost),

ok.

channel_consumers_limit(Config) ->
set_node_limit(Config, consumer_max_per_channel, 2),

VHost = <<"channel_consumers_limit">>,
User = <<"guest">>,
ok = rabbit_ct_broker_helpers:add_vhost(Config, VHost),
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
{ok, Ch} = open_channel(Conn1),
Q = <<"Q">>, Tag = <<"Tag">>,

{ok, _} = consume(Ch, Q, <<"Tag1">>),
{ok, _} = consume(Ch, Q, <<"Tag2">>),
{error, not_allowed_crash} = consume(Ch, Q, <<"Tag3">>), % Third consumer should fail

close_all_connections([Conn1]),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost),

ok.

%% -------------------------------------------------------------------
Expand All @@ -157,6 +185,15 @@ set_node_limit(Config, Type, Limit) ->
application,
set_env, [rabbit, Type, Limit]).

consume(Ch, Q, Tag) ->
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{queue = Q}),
try amqp_channel:call(Ch, #'basic.consume'{queue = Q, consumer_tag = Tag}) of
#'basic.consume_ok'{} = OK -> {ok, OK};
NotOk -> {error, NotOk}
catch
_:_Error -> {error, not_allowed_crash}
end.

open_channel(Conn) when is_pid(Conn) ->
try amqp_connection:open_channel(Conn) of
{ok, Ch} -> {ok, Ch};
Expand Down