Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close #10543. Add If-Unused and If-Empty support for delete_queue for QQs #10711

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions deps/rabbit/docs/rabbitmqctl.8
Expand Up @@ -2130,9 +2130,9 @@ Evaluates an Erlang expression on the target node
.It Ar queue_name
The name of the queue to delete.
.It Ar --if-empty
Delete the queue if it is empty (has no messages ready for delivery)
Delete the queue if it is empty (has no messages ready for delivery). Ignored for Streams
.It Ar --if-unused
Delete the queue only if it has no consumers
Delete the queue only if it has no consumers. Ignored for Streams
.El
.Pp
Deletes a queue.
Expand Down
91 changes: 44 additions & 47 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Expand Up @@ -713,59 +713,56 @@ restart_server({_, _} = Ref) ->
-spec delete(amqqueue:amqqueue(),
boolean(), boolean(),
rabbit_types:username()) ->
{ok, QLen :: non_neg_integer()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
delete(Q, true, _IfEmpty, _ActingUser) when ?amqqueue_is_quorum(Q) ->
{protocol_error, not_implemented,
"cannot delete ~ts. queue.delete operations with if-unused flag set are not supported by quorum queues",
[rabbit_misc:rs(amqqueue:get_name(Q))]};
delete(Q, _IfUnused, true, _ActingUser) when ?amqqueue_is_quorum(Q) ->
{protocol_error, not_implemented,
"cannot delete ~ts. queue.delete operations with if-empty flag set are not supported by quorum queues",
[rabbit_misc:rs(amqqueue:get_name(Q))]};
delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
{ok, QLen :: non_neg_integer()}.
delete(Q, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
{Name, _} = amqqueue:get_pid(Q),
QName = amqqueue:get_name(Q),
QNodes = get_nodes(Q),
%% TODO Quorum queue needs to support consumer tracking for IfUnused
Timeout = ?DELETE_TIMEOUT,
{ok, ReadyMsgs, _} = stat(Q),
Servers = [{Name, Node} || Node <- QNodes],
case ra:delete_cluster(Servers, Timeout) of
{ok, {_, LeaderNode} = Leader} ->
MRef = erlang:monitor(process, Leader),
receive
{'DOWN', MRef, process, _, _} ->

{ok, ReadyMsgs, Consumers} = stat(Q),
IsEmpty = ReadyMsgs == 0,
IsUnused = Consumers == 0,
if IfEmpty and not(IsEmpty) -> {error, not_empty};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to line up bits of code like that. Please only use a single white space between things.
If you want the start of clauses to line up in columns use newlines.

IfUnused and not(IsUnused) -> {error, in_use};
true ->
Timeout = ?DELETE_TIMEOUT,
Servers = [{Name, Node} || Node <- QNodes],
case ra:delete_cluster(Servers, Timeout) of
{ok, {_, LeaderNode} = Leader} ->
MRef = erlang:monitor(process, Leader),
receive
{'DOWN', MRef, process, _, _} ->
ok
after Timeout ->
erlang:demonitor(MRef, [flush]),
ok = force_delete_queue(Servers)
end,
notify_decorators(QName, shutdown),
ok = delete_queue_data(Q, ActingUser),
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
{ok, ReadyMsgs};
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
(_) -> false
end, Errs) of
true ->
%% If all ra nodes were already down, the delete
%% has succeed
delete_queue_data(Q, ActingUser),
after Timeout ->
erlang:demonitor(MRef, [flush]),
ok = force_delete_queue(Servers)
end,
notify_decorators(QName, shutdown),
ok = delete_queue_data(Q, ActingUser),
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
{ok, ReadyMsgs};
false ->
%% attempt forced deletion of all servers
rabbit_log:warning(
"Could not delete quorum '~ts', not enough nodes "
" online to reach a quorum: ~255p."
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
(_) -> false
end, Errs) of
true ->
%% If all ra nodes were already down, the delete
%% has succeed
delete_queue_data(Q, ActingUser),
{ok, ReadyMsgs};
false ->
%% attempt forced deletion of all servers
rabbit_log:warning(
"Could not delete quorum '~ts', not enough nodes "
" online to reach a quorum: ~255p."
" Attempting force delete.",
[rabbit_misc:rs(QName), Errs]),
ok = force_delete_queue(Servers),
notify_decorators(QName, shutdown),
delete_queue_data(Q, ActingUser),
{ok, ReadyMsgs}
[rabbit_misc:rs(QName), Errs]),
ok = force_delete_queue(Servers),
notify_decorators(QName, shutdown),
delete_queue_data(Q, ActingUser),
{ok, ReadyMsgs}
end
end
end.

Expand Down