Skip to content

Commit

Permalink
Better handle QQ delete member operation when member already removed
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Apr 29, 2024
1 parent 00b9aa5 commit 844992d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 4 deletions.
11 changes: 7 additions & 4 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
-define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT
-define(TICK_TIMEOUT, 5000). %% the ra server tick time
-define(DELETE_TIMEOUT, 5000).
-define(ADD_MEMBER_TIMEOUT, 5000).
-define(MEMBER_CHANGE_TIMEOUT, 20_000).
-define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096
-define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra

Expand Down Expand Up @@ -1201,7 +1201,7 @@ add_member(Q, Node) ->
add_member(Q, Node, promotable).

add_member(Q, Node, Membership) ->
add_member(Q, Node, Membership, ?ADD_MEMBER_TIMEOUT).
add_member(Q, Node, Membership, ?MEMBER_CHANGE_TIMEOUT).

add_member(VHost, Name, Node, Timeout) when is_binary(VHost) ->
%% NOTE needed to pass mixed cluster tests.
Expand Down Expand Up @@ -1278,8 +1278,11 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
%% deleting the last member is not allowed
{error, last_node};
Members ->
case ra:remove_member(Members, ServerId) of
{ok, _, _Leader} ->
case ra:remove_member(Members, ServerId, ?MEMBER_CHANGE_TIMEOUT) of
Res when element(1, Res) == ok orelse
Res == {error, not_member} ->
%% if the server is not a member we can still proceed with updating the
%% mnesia record and cleaning up the server if it is still running
Fun = fun(Q1) ->
update_type_state(
Q1,
Expand Down
27 changes: 27 additions & 0 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ groups() ->
delete_member_queue_not_found,
delete_member,
delete_member_not_a_member,
delete_member_member_already_deleted,
node_removal_is_quorum_critical]
++ memory_tests()},
{cluster_size_3, [], [
Expand Down Expand Up @@ -1954,6 +1955,32 @@ delete_member_not_a_member(Config) ->
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Server])).

%% Checks that rabbit_quorum_queue:delete_member/3 returns ok for a member
%% that was already removed through ra:leave_and_terminate/3, and that the
%% queue record afterwards lists a single node.
%% The two-element match on the node list means this case expects exactly
%% two broker nodes in Config.
delete_member_member_already_deleted(Config) ->
[Server, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
NServers = length(Servers),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
RaName = ra_name(QQ),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
%% wait until count_online_nodes/3 reports as many nodes as there are brokers
?awaitMatch(NServers, count_online_nodes(Server, <<"/">>, QQ), ?DEFAULT_AWAIT),
ServerId = {RaName, Server},
ServerId2 = {RaName, Server2},
%% use the ra API directly to simulate the situation where the
%% ra:remove_member/3 call timed out but later succeeded
?assertMatch(ok,
rpc:call(Server2, ra, leave_and_terminate,
[quorum_queues, ServerId, ServerId2])),

%% idempotent by design: deleting the already removed member must still
%% return ok
?assertEqual(ok,
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Server2])),
%% the queue record's type state should now list only one node
{ok, Q} = rpc:call(Server, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
#{nodes := Nodes} = amqqueue:get_type_state(Q),
?assertEqual(1, length(Nodes)),
ok.

delete_member_during_node_down(Config) ->
[Server, DownServer, Remove] = Servers = rabbit_ct_broker_helpers:get_node_configs(
Config, nodename),
Expand Down
1 change: 1 addition & 0 deletions rabbitmq-components.mk
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ dep_khepri_mnesia_migration = hex 0.4.0
dep_looking_glass = git https://github.com/rabbitmq/looking_glass.git main
dep_prometheus = hex 4.11.0
dep_ra = hex 2.10.0
dep_prometheus = hex 4.11.0
dep_ranch = hex 2.1.0
dep_recon = hex 2.5.3
dep_redbug = hex 2.0.7
Expand Down

0 comments on commit 844992d

Please sign in to comment.