From fcc5500fdf4eacc97d80f95793e107d1fc8dad5c Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Thu, 14 Mar 2024 16:09:26 -0700 Subject: [PATCH 1/2] Add consumers per channel limit --- deps/rabbit/priv/schema/rabbit.schema | 23 ++++++++++++ deps/rabbit/src/rabbit_channel.erl | 13 +++++++ .../config_schema_SUITE_data/rabbit.snippets | 8 +++++ deps/rabbit/test/per_node_limit_SUITE.erl | 35 +++++++++++++++++-- 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index c271eb751cd6..f59479402b60 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -889,6 +889,11 @@ end}. {mapping, "channel_max", "rabbit.channel_max", [{datatype, integer}]}. +%% Set the max allowed number of channels per node. +%% `infinity` means "no limit". +%% +%% {channel_max_per_node, infinity}, + {mapping, "channel_max_per_node", "rabbit.channel_max_per_node", [{datatype, [{atom, infinity}, integer]}]}. @@ -903,6 +908,24 @@ end}. end }. +%% Set the max allowed number of consumers per channel. +%% `infinity` means "no limit". +%% +%% {consumer_max_per_channel, infinity}, + +{mapping, "consumer_max_per_channel", "rabbit.consumer_max_per_channel", + [{datatype, [{atom, infinity}, integer]}]}. + +{translation, "rabbit.consumer_max_per_channel", + fun(Conf) -> + case cuttlefish:conf_get("consumer_max_per_channel", Conf, undefined) of + undefined -> cuttlefish:unset(); + infinity -> infinity; + Val when is_integer(Val) andalso Val > 0 -> Val; + _ -> cuttlefish:invalid("should be positive integer or 'infinity'") + end + end +}. %% Set the max permissible number of client connections per node. %% `infinity` means "no limit". diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 1464794a90c2..f25ee6225196 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -106,6 +106,7 @@ consumer_timeout, authz_context, %% defines how ofter gc will be executed + max_consumers % taken from rabbit.consumer_max_per_channel writer_gc_threshold }). @@ -507,6 +508,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, ConsumerTimeout = get_consumer_timeout(), OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams), {ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold), + MaxConsumers = application:get_env(rabbit, consumer_max_per_channel, infinity), State = #ch{cfg = #conf{state = starting, protocol = Protocol, channel = Channel, @@ -523,6 +525,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, consumer_prefetch = Prefetch, consumer_timeout = ConsumerTimeout, authz_context = OptionalVariables, + max_consumers = MaxConsumers, writer_gc_threshold = GCThreshold }, limiter = Limiter, @@ -1313,8 +1316,13 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>, no_ack = NoAck, nowait = NoWait}, _, State = #ch{reply_consumer = ReplyConsumer, + cfg = #conf{max_consumers = MaxConsumers}, consumer_mapping = ConsumerMapping}) -> + CurrentConsumers = length(maps:keys(ConsumerMapping)), case maps:find(CTag0, ConsumerMapping) of + error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity' + rabbit_misc:protocol_error( + not_allowed, "reached maximum (~ts) of consumers per channel", [MaxConsumers]); error -> case {ReplyConsumer, NoAck} of {none, true} -> @@ -1363,12 +1371,17 @@ handle_method(#'basic.consume'{queue = QueueNameBin, nowait = NoWait, arguments = Args}, _, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch, + max_consumers = MaxConsumers, user = User, virtual_host = VHostPath, authz_context = AuthzContext}, consumer_mapping = ConsumerMapping }) -> + CurrentConsumers = length(maps:keys(ConsumerMapping)), case maps:find(ConsumerTag, ConsumerMapping) of + error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity' + rabbit_misc:protocol_error( + not_allowed, "reached maximum (~ts) of consumers per channel", [MaxConsumers]); error -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), check_read_permitted(QueueName, User, AuthzContext), diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index 50ab84106861..424bdaf97d44 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -404,6 +404,14 @@ tcp_listen_options.exit_on_close = false", "channel_max_per_node = infinity", [{rabbit,[{channel_max_per_node, infinity}]}], []}, + {consumer_max_per_channel, + "consumer_max_per_channel = 16", + [{rabbit,[{consumer_max_per_channel, 16}]}], + []}, + {consumer_max_per_channel, + "consumer_max_per_channel = infinity", + [{rabbit,[{consumer_max_per_channel, infinity}]}], + []}, {max_message_size, "max_message_size = 131072", [{rabbit, [{max_message_size, 131072}]}], diff --git a/deps/rabbit/test/per_node_limit_SUITE.erl b/deps/rabbit/test/per_node_limit_SUITE.erl index 12a940e58de0..21af12c09847 100644 --- a/deps/rabbit/test/per_node_limit_SUITE.erl +++ b/deps/rabbit/test/per_node_limit_SUITE.erl @@ -23,6 +23,7 @@ groups() -> {limit_tests, [], [ node_connection_limit, vhost_limit, + channel_consumers_limit, node_channel_limit ]} ]. @@ -62,13 +63,15 @@ init_per_testcase(Testcase, Config) -> end_per_testcase(vhost_limit = Testcase, Config) -> set_node_limit(Config, vhost_max, infinity), - set_node_limit(Config, channel_max_per_node, 0), + set_node_limit(Config, channel_max_per_node, infinity), + set_node_limit(Config, consumer_max_per_channel, infinity), set_node_limit(Config, connection_max, infinity), [rabbit_ct_broker_helpers:delete_vhost(Config, integer_to_binary(I)) || I <- lists:seq(1,4)], rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(Testcase, Config) -> set_node_limit(Config, vhost_max, infinity), - set_node_limit(Config, channel_max_per_node, 0), + set_node_limit(Config, channel_max_per_node, infinity), + set_node_limit(Config, consumer_max_per_channel, infinity), set_node_limit(Config, connection_max, infinity), rabbit_ct_helpers:testcase_finished(Config, Testcase). @@ -117,6 +120,7 @@ node_channel_limit(Config) -> ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost), Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost), Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost), + 0 = count_channels_per_node(Config), lists:foreach(fun(N) when (N band 1) == 1 -> {ok, _} = open_channel(Conn1); (_) -> {ok,_ } = open_channel(Conn2) @@ -137,6 +141,24 @@ node_channel_limit(Config) -> %% Now all connections are closed, so there should be 0 open connections 0 = count_channels_per_node(Config), + close_all_connections([Conn1, Conn2]), + ok. + +channel_consumers_limit(Config) -> + set_node_limit(Config, consumer_max_per_channel, 1), + + VHost = <<"foobar">>, + User = <<"guest">>, + ok = rabbit_ct_broker_helpers:add_vhost(Config, VHost), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost), + Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost), + {ok, Ch} = open_channel(Conn1), + Q = <<"Q">>, Tag = <<"Tag">>, + + {ok, _} = consume(Ch, Q, Tag), + {error, not_allowed_crash} = consume(Ch, Q, Tag), % Reusing Tag should fail + + close_all_connections([Conn1]), ok. %% ------------------------------------------------------------------- @@ -157,6 +179,15 @@ set_node_limit(Config, Type, Limit) -> application, set_env, [rabbit, Type, Limit]). +consume(Ch, Q, Tag) -> + #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{queue = Q}), + try amqp_channel:call(Ch, #'basic.consume'{queue = Q, consumer_tag = Tag}) of + #'basic.consume_ok'{} = OK -> {ok, OK}; + NotOk -> {error, NotOk} + catch + _:_Error -> {error, not_allowed_crash} + end. + open_channel(Conn) when is_pid(Conn) -> try amqp_connection:open_channel(Conn) of {ok, Ch} -> {ok, Ch}; From 67a1685ebe23d4890f2ada3026f4e58449e79f1c Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Fri, 15 Mar 2024 10:00:15 -0700 Subject: [PATCH 2/2] address feedback --- deps/rabbit/priv/schema/rabbit.schema | 5 ----- deps/rabbit/src/rabbit_channel.erl | 4 ++-- deps/rabbit/test/per_node_limit_SUITE.erl | 7 ++++--- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index f59479402b60..87fe8b47e773 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -889,11 +889,6 @@ end}. {mapping, "channel_max", "rabbit.channel_max", [{datatype, integer}]}. -%% Set the max allowed number of channels per node. -%% `infinity` means "no limit". -%% -%% {channel_max_per_node, infinity}, - {mapping, "channel_max_per_node", "rabbit.channel_max_per_node", [{datatype, [{atom, infinity}, integer]}]}. diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index f25ee6225196..c3d7202d5eec 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -105,8 +105,8 @@ consumer_prefetch, consumer_timeout, authz_context, + max_consumers, % taken from rabbit.consumer_max_per_channel %% defines how ofter gc will be executed - max_consumers % taken from rabbit.consumer_max_per_channel writer_gc_threshold }). @@ -1318,7 +1318,7 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>, _, State = #ch{reply_consumer = ReplyConsumer, cfg = #conf{max_consumers = MaxConsumers}, consumer_mapping = ConsumerMapping}) -> - CurrentConsumers = length(maps:keys(ConsumerMapping)), + CurrentConsumers = maps:size(ConsumerMapping), case maps:find(CTag0, ConsumerMapping) of error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity' rabbit_misc:protocol_error( diff --git a/deps/rabbit/test/per_node_limit_SUITE.erl b/deps/rabbit/test/per_node_limit_SUITE.erl index 21af12c09847..962a7f3ede9d 100644 --- a/deps/rabbit/test/per_node_limit_SUITE.erl +++ b/deps/rabbit/test/per_node_limit_SUITE.erl @@ -145,7 +145,7 @@ node_channel_limit(Config) -> ok. channel_consumers_limit(Config) -> - set_node_limit(Config, consumer_max_per_channel, 1), + set_node_limit(Config, consumer_max_per_channel, 2), VHost = <<"foobar">>, User = <<"guest">>, @@ -155,8 +155,9 @@ channel_consumers_limit(Config) -> {ok, Ch} = open_channel(Conn1), Q = <<"Q">>, Tag = <<"Tag">>, - {ok, _} = consume(Ch, Q, Tag), - {error, not_allowed_crash} = consume(Ch, Q, Tag), % Reusing Tag should fail + {ok, _} = consume(Ch, Q, <<"Tag1">>), + {ok, _} = consume(Ch, Q, <<"Tag2">>), + {error, not_allowed_crash} = consume(Ch, Q, <<"Tag3">>), % Third consumer should fail close_all_connections([Conn1]), ok.