Skip to content

Commit

Permalink
Have minimum queue TTL take effect for quorum queues
Browse files Browse the repository at this point in the history
For classic queues, if both policy and queue argument are set
for queue TTL, the minimum takes effect.

Prior to this commit, for quorum queues if both policy and
queue argument are set for queue TTL, the policy always overrides the
queue argument.

This commit brings the quorum queue queue TTL resolution to classic
queue's behaviour. This allows developers to provide a custom lower
queue TTL while the operator policy acts an upper bound safe-guard.
  • Loading branch information
ansd committed May 16, 2024
1 parent 58b36b8 commit 1eed7c2
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q),
MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q),
DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
Expires = args_policy_lookup(<<"expires">>, fun policyHasPrecedence/2, Q),
Expires = args_policy_lookup(<<"expires">>, fun min/2, Q),
MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q),
#{name => Name,
queue_resource => QName,
Expand Down
38 changes: 23 additions & 15 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3168,26 +3168,33 @@ delete_if_unused(Config) ->

queue_ttl(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-expires">>, long, 1000}])),
%% check queue no longer exists

%% Set policy to 10 seconds.
PolicyName = <<"my-queue-ttl-policy">>,
ok = rabbit_ct_broker_helpers:set_policy(
Config, 0, PolicyName, QQ, <<"queues">>,
[{<<"expires">>, 10000}]),
%% Set queue arg to 1 second.
QArgs = [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-expires">>, long, 1000}],
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, QArgs)),
%% The minimum should take effect.
?awaitMatch(
{'EXIT', {{shutdown,
{server_initiated_close,404,
<<"NOT_FOUND - no queue 'queue_ttl' in vhost '/'">>}},
_}},
catch amqp_channel:call(Ch, #'queue.declare'{queue = QQ,
passive = true,
durable = true,
auto_delete = false,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-expires">>, long, 1000}]}),
30000),
ok.
catch amqp_channel:call(Ch, #'queue.declare'{
queue = QQ,
passive = true,
durable = true,
auto_delete = false,
arguments = QArgs}),
5_000),
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, PolicyName).

consumer_priorities(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Expand Down Expand Up @@ -3512,8 +3519,9 @@ leader_locator_policy(Config) ->
Qs = [?config(queue_name, Config),
?config(alt_queue_name, Config),
?config(alt_2_queue_name, Config)],
PolicyName = <<"my-leader-locator">>,
ok = rabbit_ct_broker_helpers:set_policy(
Config, 0, <<"my-leader-locator">>, <<"leader_locator_policy_.*">>, <<"queues">>,
Config, 0, PolicyName, <<"leader_locator_policy_.*">>, <<"queues">>,
[{<<"queue-leader-locator">>, <<"balanced">>}]),

Leaders = [begin
Expand All @@ -3527,7 +3535,7 @@ leader_locator_policy(Config) ->
[?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|| Q <- Qs],
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"my-leader-locator">>),
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, PolicyName),

?assertEqual(3, length(lists:usort(Leaders))),
ok.
Expand Down

0 comments on commit 1eed7c2

Please sign in to comment.