Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle database timeouts from Khepri minority #10915

Draft
wants to merge 23 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
572ec51
WIP: Bump Khepri to X
the-mikedavis Apr 3, 2024
b8d8a21
Introduce a rabbit_khepri:timeout_error() error type
the-mikedavis Apr 3, 2024
d3832f7
Improve cluster_minority_SUITE
the-mikedavis Apr 2, 2024
e1d785f
Handle database failures when declaring exchanges
the-mikedavis Apr 2, 2024
3fbad38
Handle database failures when declaring queues
the-mikedavis Apr 30, 2024
45fb884
Handle database failures when adding/removing bindings
the-mikedavis Apr 3, 2024
08572d9
Handle database failures when deleting exchanges
the-mikedavis Apr 30, 2024
b1ee0ce
rabbit_db_queue: Transactionally delete transient queues from Khepri
the-mikedavis Apr 30, 2024
0232bce
Ignore timeout errors from deleting transient queues on node down
the-mikedavis Apr 30, 2024
5fce957
minor: Correct outdated spec for rabbit_amqqueue:lookup/1
the-mikedavis May 6, 2024
bed8267
rabbit_db_queue: Bubble up errors in set_many/1 with Khepri enabled
the-mikedavis May 6, 2024
b8494be
rabbit_db_user: Raise instead of 'khepri_tx:abort/1'
the-mikedavis May 7, 2024
6ece1bf
rabbit_db_exchange: Reflect possible failure in update/2 spec
the-mikedavis May 7, 2024
e804034
rabbit_db_exchange: Bubble up database errors in set/1
the-mikedavis May 7, 2024
f0567ee
rabbit_db_exchange: Raise database errors in next_serial/1
the-mikedavis May 7, 2024
e81b064
rabbit_db_exchange: Bubble up errors in delete_serial/1
the-mikedavis May 7, 2024
1a6489f
rabbit_db_exchange: Raise Khepri errors instead of throwing in clear/0
the-mikedavis May 7, 2024
f069001
rabbit_db_vhost: Declare no-return in create_or_get/3 spec
the-mikedavis May 7, 2024
d45cda6
rabbit_db_vhost: Bubble up database errors in delete/1
the-mikedavis May 7, 2024
ba55fbb
rabbit_db_vhost: Bubble up database errors in clear/0
the-mikedavis May 7, 2024
be6644e
rabbit_db_rtparams: Handle timeout failures from set/set_global
the-mikedavis May 13, 2024
ca55031
rabbit_runtime_parameters: Remove unused value_global/2, value/4
the-mikedavis May 13, 2024
6add459
rabbit_runtime_parameters: Handle timeout failures in clear functions
the-mikedavis May 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,11 @@ erlang_package.hex_package(
version = "1.4.1",
)

erlang_package.hex_package(
erlang_package.git_package(
name = "khepri",
build_file = "@rabbitmq-server//bazel:BUILD.khepri",
sha256 = "1157d963eb5c002e040bbc86348669818d1da86d259a3705008655fefbd7f1c2",
version = "0.13.0",
commit = "e65e3b38d772b76ef3974feec5697fb9f995f0d7",
repository = "rabbitmq/khepri",
)

erlang_package.hex_package(
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1227,7 +1227,7 @@ rabbitmq_integration_suite(

rabbitmq_integration_suite(
name = "cluster_minority_SUITE",
size = "large",
size = "medium",
additional_beam = [
":test_clustering_utils_beam",
],
Expand Down
14 changes: 11 additions & 3 deletions deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,17 @@ handle_http_req(<<"PUT">>,
{error, not_found} ->
ok = prohibit_cr_lf(XNameBin),
ok = prohibit_reserved_amq(XName),
rabbit_exchange:declare(
XName, XTypeAtom, Durable, AutoDelete,
Internal, XArgs, Username)
case rabbit_exchange:declare(
XName, XTypeAtom, Durable, AutoDelete,
Internal, XArgs, Username) of
{ok, DeclaredX} ->
DeclaredX;
{error, timeout} ->
throw(
<<"500">>,
"Could not create exchange '~ts' due to timeout",
[XName])
end
end,
try rabbit_exchange:assert_equivalence(
X, XTypeAtom, Durable, AutoDelete, Internal, XArgs) of
Expand Down
34 changes: 22 additions & 12 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ find_recoverable_queues() ->
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
{'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} |
{'absent', amqqueue:amqqueue(), absent_reason()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()} |
{error, Reason :: term()}.
declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser, node()).

Expand All @@ -219,7 +220,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
node() | {'ignore_location', node()}) ->
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
{'absent', amqqueue:amqqueue(), absent_reason()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()} |
{error, Reason :: term()}.
declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
Owner, ActingUser, Node) ->
ok = check_declare_arguments(QueueName, Args),
Expand Down Expand Up @@ -258,8 +260,13 @@ get_queue_type(Args) ->
rabbit_queue_type:discover(V)
end.

-spec internal_declare(amqqueue:amqqueue(), boolean()) ->
{created | existing, amqqueue:amqqueue()} | queue_absent().
-spec internal_declare(Q, Recover) -> Ret when
Q :: amqqueue:amqqueue(),
Recover :: boolean(),
Ret :: {created, amqqueue:amqqueue()}
| {existing, amqqueue:amqqueue()}
| queue_absent()
| rabbit_khepri:timeout_error().

internal_declare(Q, Recover) ->
do_internal_declare(Q, Recover).
Expand Down Expand Up @@ -291,7 +298,7 @@ update(Name, Fun) ->
ensure_rabbit_queue_record_is_initialized(Q) ->
store_queue(Q).

-spec store_queue(amqqueue:amqqueue()) -> 'ok'.
-spec store_queue(amqqueue:amqqueue()) -> 'ok' | {error, timeeout}.

store_queue(Q0) ->
Q = rabbit_queue_decorator:set(Q0),
Expand Down Expand Up @@ -332,12 +339,10 @@ is_server_named_allowed(Args) ->
Type = get_queue_type(Args),
rabbit_queue_type:is_server_named_allowed(Type).

-spec lookup
(name()) ->
rabbit_types:ok(amqqueue:amqqueue()) |
rabbit_types:error('not_found');
([name()]) ->
[amqqueue:amqqueue()].
-spec lookup(QueueName) -> Ret when
QueueName :: name(),
Ret :: rabbit_types:ok(amqqueue:amqqueue())
| rabbit_types:error('not_found').

lookup(Name) when is_record(Name, resource) ->
rabbit_db_queue:get(Name).
Expand Down Expand Up @@ -1988,7 +1993,12 @@ maybe_clear_recoverable_node(Node) ->
on_node_down(Node) ->
{Time, Ret} = timer:tc(fun() -> rabbit_db_queue:delete_transient(filter_transient_queues_to_delete(Node)) end),
case Ret of
ok -> ok;
{error, timeout} ->
%% This type of failure is only possible with Khepri but transient
%% entities are going away as Khepri stabilizes.
rabbit_log:warning("Failed to delete transient queues on node "
"down due to a timeout"),
ok;
{QueueNames, Deletions} ->
case length(QueueNames) of
0 -> ok;
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
-type bind_ok_or_error() :: 'ok' | bind_errors() |
rabbit_types:error({'binding_invalid', string(), [any()]}) |
%% inner_fun() result
rabbit_types:error(rabbit_types:amqp_error()).
rabbit_types:error(rabbit_types:amqp_error()) |
rabbit_khepri:timeout_error().
-type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()).
-type inner_fun() ::
fun((rabbit_types:exchange(),
Expand Down
37 changes: 29 additions & 8 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1866,6 +1866,9 @@ binding_action(Action, Binding, Username, ConnPid) ->
rabbit_misc:protocol_error(precondition_failed, Fmt, Args);
{error, #amqp_error{} = Error} ->
rabbit_misc:protocol_error(Error);
{error, timeout} ->
rabbit_misc:protocol_error(
internal_error, "Could not ~s binding due to timeout", [Action]);
ok ->
ok
end.
Expand Down Expand Up @@ -2514,7 +2517,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% just so nothing fails.
{ok, QueueName, 0, 0};
{protocol_error, ErrorType, Reason, ReasonArgs} ->
rabbit_misc:protocol_error(ErrorType, Reason, ReasonArgs)
rabbit_misc:protocol_error(ErrorType, Reason, ReasonArgs);
{error, timeout} ->
rabbit_misc:protocol_error(
internal_error,
"Could not declare queue '~ts' due to timeout",
[rabbit_misc:rs(QueueName)])
end;
{error, {absent, Q, Reason}} ->
rabbit_amqqueue:absent(Q, Reason)
Expand Down Expand Up @@ -2557,6 +2565,11 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
ok;
{error, in_use} ->
rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]);
{error, timeout} ->
rabbit_misc:protocol_error(
internal_error,
"Could not delete exchange '~ts' due to timeout",
[rabbit_misc:rs(ExchangeName)]);
ok ->
ok
end;
Expand Down Expand Up @@ -2606,13 +2619,21 @@ handle_method(#'exchange.declare'{exchange = XNameBin,
check_write_permitted(AName, User, AuthzContext),
ok
end,
rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
AutoDelete,
Internal,
Args,
Username)
case rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
AutoDelete,
Internal,
Args,
Username) of
{ok, DeclaredX} ->
DeclaredX;
{error, timeout} ->
rabbit_misc:protocol_error(
internal_error,
"Could not create exchange '~ts' due to timeout",
[rabbit_misc:rs(ExchangeName)])
end
end,
ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
AutoDelete, Internal, Args);
Expand Down
84 changes: 43 additions & 41 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,10 @@ count_in_khepri() ->
%% update().
%% -------------------------------------------------------------------

-spec update(ExchangeName, UpdateFun) -> ok when
-spec update(ExchangeName, UpdateFun) -> Ret when
ExchangeName :: rabbit_exchange:name(),
UpdateFun :: fun((Exchange) -> Exchange).
UpdateFun :: fun((Exchange) -> Exchange),
Ret :: ok | rabbit_khepri:timeout_error().
%% @doc Updates an existing exchange record using the result of
%% `UpdateFun'.
%%
Expand Down Expand Up @@ -358,7 +359,9 @@ update_in_khepri_tx(Name, Fun) ->

-spec create_or_get(Exchange) -> Ret when
Exchange :: rabbit_types:exchange(),
Ret :: {new, Exchange} | {existing, Exchange}.
Ret :: {new, Exchange} |
{existing, Exchange} |
rabbit_khepri:timeout_error().
%% @doc Writes an exchange record if it doesn't exist already or returns
%% the existing one.
%%
Expand Down Expand Up @@ -390,15 +393,18 @@ create_or_get_in_khepri(#exchange{name = XName} = X) ->
ok ->
{new, X};
{error, {khepri, mismatching_node, #{node_props := #{data := ExistingX}}}} ->
{existing, ExistingX}
{existing, ExistingX};
{error, _} = Error ->
Error
end.

%% -------------------------------------------------------------------
%% set().
%% -------------------------------------------------------------------

-spec set([Exchange]) -> ok when
Exchange :: rabbit_types:exchange().
-spec set([Exchange]) -> Ret when
Exchange :: rabbit_types:exchange(),
Ret :: ok | rabbit_khepri:timeout_error().
%% @doc Writes the exchange records.
%%
%% @returns ok.
Expand All @@ -414,16 +420,16 @@ set(Xs) ->
set_in_mnesia(Xs) when is_list(Xs) ->
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
[mnesia:write(rabbit_durable_exchange, X, write) || X <- Xs]
end),
ok.
_ = [mnesia:write(rabbit_durable_exchange, X, write) || X <- Xs],
ok
end).

set_in_khepri(Xs) when is_list(Xs) ->
rabbit_khepri:transaction(
fun() ->
[set_in_khepri_tx(X) || X <- Xs]
end, rw),
ok.
_ = [set_in_khepri_tx(X) || X <- Xs],
ok
end, rw).

set_in_khepri_tx(X) ->
Path = khepri_exchange_path(X#exchange.name),
Expand Down Expand Up @@ -474,8 +480,9 @@ peek_serial_in_khepri(XName) ->
%% next_serial().
%% -------------------------------------------------------------------

-spec next_serial(ExchangeName) -> Serial when
-spec next_serial(ExchangeName) -> Ret when
ExchangeName :: rabbit_exchange:name(),
Ret :: Serial | no_return(),
Serial :: integer().
%% @doc Returns the next serial number and increases it.
%%
Expand Down Expand Up @@ -505,39 +512,30 @@ next_serial_in_mnesia_tx(XName) ->
Serial.

next_serial_in_khepri(XName) ->
%% Just storing the serial number is enough, no need to keep #exchange_serial{}
Path = khepri_exchange_serial_path(XName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Serial,
payload_version := Vsn}} ->
UpdatePath =
khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
case rabbit_khepri:put(UpdatePath, Serial + 1) of
ok ->
Serial;
{error, {khepri, mismatching_node, _}} ->
next_serial_in_khepri(XName);
Err ->
Err
end;
_ ->
Serial = 1,
ok = rabbit_khepri:put(Path, Serial + 1),
Ret = rabbit_khepri:transaction(
fun() ->
next_serial_in_khepri_tx(XName)
end, rw),
case Ret of
{error, Reason} ->
erlang:error(Reason);
Serial ->
Serial
end.

-spec next_serial_in_khepri_tx(Exchange) -> Serial when
Exchange :: rabbit_types:exchange(),
Exchange :: rabbit_types:exchange() | rabbit_exchange:name(),
Serial :: integer().

next_serial_in_khepri_tx(#exchange{name = XName}) ->
next_serial_in_khepri_tx(XName);
next_serial_in_khepri_tx(#resource{} = XName) ->
Path = khepri_exchange_serial_path(XName),
Serial = case khepri_tx:get(Path) of
{ok, Serial0} -> Serial0;
_ -> 1
end,
%% Just storing the serial number is enough, no need to keep #exchange_serial{}
ok = khepri_tx:put(Path, Serial + 1),
Serial.

Expand All @@ -551,7 +549,10 @@ next_serial_in_khepri_tx(#exchange{name = XName}) ->
Exchange :: rabbit_types:exchange(),
Binding :: rabbit_types:binding(),
Deletions :: dict:dict(),
Ret :: {error, not_found} | {error, in_use} | {deleted, Exchange, [Binding], Deletions}.
Ret :: {error, not_found}
| {error, in_use}
| {deleted, Exchange, [Binding], Deletions}
| rabbit_khepri:timeout_error().
%% @doc Deletes an exchange record from the database. If `IfUnused' is set
%% to `true', it is only deleted when there are no bindings present on the
%% exchange.
Expand Down Expand Up @@ -636,11 +637,12 @@ delete_in_khepri(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSour
%% delete_serial().
%% -------------------------------------------------------------------

-spec delete_serial(ExchangeName) -> ok when
ExchangeName :: rabbit_exchange:name().
-spec delete_serial(ExchangeName) -> Ret when
ExchangeName :: rabbit_exchange:name(),
Ret :: ok | rabbit_khepri:timeout_error().
%% @doc Deletes an exchange serial record from the database.
%%
%% @returns ok
%% @returns ok if the deletion succeeds or an error tuple otherwise.
%%
%% @private

Expand All @@ -658,7 +660,7 @@ delete_serial_in_mnesia(XName) ->

delete_serial_in_khepri(XName) ->
Path = khepri_exchange_serial_path(XName),
ok = rabbit_khepri:delete(Path).
rabbit_khepri:delete(Path).

%% -------------------------------------------------------------------
%% recover().
Expand Down Expand Up @@ -779,7 +781,7 @@ exists_in_khepri(Name) ->
%% clear().
%% -------------------------------------------------------------------

-spec clear() -> ok.
-spec clear() -> ok | no_return().
%% @doc Deletes all exchanges.
%%
%% @private
Expand All @@ -803,7 +805,7 @@ clear_in_khepri() ->
khepri_delete(Path) ->
case rabbit_khepri:delete(Path) of
ok -> ok;
Error -> throw(Error)
{error, _} = Error -> erlang:error(Error)
end.

%% -------------------------------------------------------------------
Expand Down