Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rabbit_peer_discovery: Add pre/post discovery steps #10763

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
90 changes: 72 additions & 18 deletions deps/rabbit/src/rabbit_peer_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

-ifdef(TEST).
-export([sort_nodes_and_props/1,
join_selected_node/3]).
join_selected_node/4]).
-endif.

-type backend() :: atom().
Expand Down Expand Up @@ -164,6 +164,19 @@ sync_desired_cluster() ->
%% @private

sync_desired_cluster(Backend, RetriesLeft, RetryDelay) ->
case pre_discovery(Backend) of
{ok, BackendPriv} ->
try
sync_desired_cluster(
Backend, BackendPriv, RetriesLeft, RetryDelay)
after
post_discovery(Backend, BackendPriv)
end;
{error, _Reason} ->
retry_sync_desired_cluster(Backend, RetriesLeft, RetryDelay)
end.

sync_desired_cluster(Backend, BackendPriv, RetriesLeft, RetryDelay) ->
%% The peer discovery process follows the following steps:
%% 1. It uses the configured backend to query the nodes that should form
%% a cluster. It takes care of checking the validity of the returned
Expand Down Expand Up @@ -191,7 +204,8 @@ sync_desired_cluster(Backend, RetriesLeft, RetryDelay) ->
case select_node_to_join(NodesAndProps) of
SelectedNode when SelectedNode =/= false ->
Ret = join_selected_node(
Backend, SelectedNode, NodeType),
Backend, BackendPriv,
SelectedNode, NodeType),
case Ret of
ok ->
%% TODO: Check if there are multiple
Expand Down Expand Up @@ -840,8 +854,9 @@ select_node_to_join([{Node, _Members, _StartTime, false} | _]) ->
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
false.

-spec join_selected_node(Backend, Node, NodeType) -> Ret when
-spec join_selected_node(Backend, BackendPriv, Node, NodeType) -> Ret when
Backend :: backend(),
BackendPriv :: rabbit_peer_discovery_backend:backend_priv(),
Node :: node(),
NodeType :: rabbit_types:node_type(),
Ret :: ok | {error, Reason},
Expand All @@ -855,16 +870,17 @@ select_node_to_join([{Node, _Members, _StartTime, false} | _]) ->
%%
%% @private

join_selected_node(_Backend, ThisNode, _NodeType) when ThisNode =:= node() ->
join_selected_node(_Backend, _BackendPriv, ThisNode, _NodeType)
when ThisNode =:= node() ->
?LOG_DEBUG(
"Peer discovery: the selected node is this node; proceed with boot",
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok;
join_selected_node(Backend, SelectedNode, NodeType) ->
join_selected_node(Backend, BackendPriv, SelectedNode, NodeType) ->
?LOG_DEBUG(
"Peer discovery: trying to acquire lock",
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
LockResult = lock(Backend, SelectedNode),
LockResult = lock(Backend, SelectedNode, BackendPriv),
?LOG_DEBUG(
"Peer discovery: rabbit_peer_discovery:lock/0 returned ~0tp",
[LockResult],
Expand All @@ -875,7 +891,7 @@ join_selected_node(Backend, SelectedNode, NodeType) ->
"Peer discovery: no lock acquired",
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
join_selected_node_locked(SelectedNode, NodeType);
{ok, Data} ->
{ok, LockData} ->
?LOG_DEBUG(
"Peer discovery: lock acquired",
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Expand All @@ -885,7 +901,7 @@ join_selected_node(Backend, SelectedNode, NodeType) ->
?LOG_DEBUG(
"Peer discovery: lock released",
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
unlock(Backend, Data)
unlock(Backend, LockData, BackendPriv)
end;
{error, _Reason} = Error ->
?LOG_WARNING(
Expand Down Expand Up @@ -1039,14 +1055,43 @@ unregister(Backend) ->
ok
end.

-spec lock(Backend, SelectedNode) -> Ret when
pre_discovery(Backend) ->
case erlang:function_exported(Backend, pre_discovery, 0) of
true ->
Ret = Backend:pre_discovery(),
case Ret of
{ok, _} ->
Ret;
{error, Reason} = Error ->
?LOG_ERROR(
"Peer discovery: failed to run pre-discovery for "
"peer discovery backend ~ts: ~0tp",
[Backend, Reason],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Error;
Any ->
Any
end;
false ->
{ok, undefined}
end.

post_discovery(Backend, BackendPriv) ->
case erlang:function_exported(Backend, post_discovery, 1) of
true -> _ = Backend:post_discovery(BackendPriv);
false -> ok
end,
ok.

-spec lock(Backend, SelectedNode, BackendPriv) -> Ret when
Backend :: backend(),
SelectedNode :: node(),
Ret :: {ok, Data} | not_supported | {error, Reason},
Data :: any(),
BackendPriv :: rabbit_peer_discovery_backend:backend_priv(),
Ret :: {ok, LockData} | not_supported | {error, Reason},
LockData :: rabbit_peer_discovery_backend:lock_data(),
Reason :: string().

lock(Backend, SelectedNode) ->
lock(Backend, BackendPriv, SelectedNode) ->
?LOG_INFO(
"Peer discovery: will try to lock with peer discovery backend ~ts",
[Backend],
Expand All @@ -1066,7 +1111,11 @@ lock(Backend, SelectedNode) ->
%% specific node, not all of them.
ThisNode = node(),
NodesToLock = [ThisNode, SelectedNode],
case Backend:lock(NodesToLock) of
Ret = case erlang:function_exported(Backend, lock, 2) of
true -> Backend:lock(NodesToLock, BackendPriv);
false -> Backend:lock(NodesToLock)
end,
case Ret of
{error, Reason} = Error ->
?LOG_ERROR(
"Peer discovery: failed to lock with peer discovery "
Expand All @@ -1078,24 +1127,29 @@ lock(Backend, SelectedNode) ->
Any
end.

-spec unlock(Backend, Data) -> Ret when
-spec unlock(Backend, LockData, BackendPriv) -> Ret when
Backend :: backend(),
Data :: any(),
LockData :: rabbit_peer_discovery_backend:lock_data(),
BackendPriv :: rabbit_peer_discovery_backend:backend_priv(),
Ret :: ok | {error, Reason},
Reason :: string().

unlock(Backend, Data) ->
unlock(Backend, LockData, BackendPriv) ->
?LOG_INFO(
"Peer discovery: will try to unlock with peer discovery "
"backend ~ts",
[Backend],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
case Backend:unlock(Data) of
Ret = case erlang:function_exported(Backend, unlock, 3) of
true -> Backend:unlock(LockData, BackendPriv);
false -> Backend:unlock(LockData)
end,
case Ret of
{error, Reason} = Error ->
?LOG_ERROR(
"Peer discovery: failed to unlock with peer discovery "
"backend ~ts: ~0tp, lock data: ~0tp",
[Backend, Reason, Data],
[Backend, Reason, LockData],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Error;
Any ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ init_with_lock_exits_after_errors(_Config) ->
meck:expect(rabbit_peer_discovery_classic_config, lock, fun(_) -> {error, "test error"} end),
?assertEqual(
{error, "test error"},
rabbit_peer_discovery:join_selected_node(rabbit_peer_discovery_classic_config, missing@localhost, disc)),
rabbit_peer_discovery:join_selected_node(rabbit_peer_discovery_classic_config, undefined, missing@localhost, disc)),
?assert(meck:validate(rabbit_peer_discovery_classic_config)),
passed.

Expand All @@ -61,15 +61,15 @@ init_with_lock_ignore_after_errors(_Config) ->
meck:expect(rabbit_peer_discovery_classic_config, lock, fun(_) -> {error, "test error"} end),
?assertEqual(
{error, {aborted_feature_flags_compat_check, {error, feature_flags_file_not_set}}},
rabbit_peer_discovery:join_selected_node(rabbit_peer_discovery_classic_config, missing@localhost, disc)),
rabbit_peer_discovery:join_selected_node(rabbit_peer_discovery_classic_config, undefined, missing@localhost, disc)),
?assert(meck:validate(rabbit_peer_discovery_classic_config)),
passed.

init_with_lock_not_supported(_Config) ->
meck:expect(rabbit_peer_discovery_classic_config, lock, fun(_) -> not_supported end),
?assertEqual(
{error, {aborted_feature_flags_compat_check, {error, feature_flags_file_not_set}}},
rabbit_peer_discovery:join_selected_node(rabbit_peer_discovery_classic_config, missing@localhost, disc)),
rabbit_peer_discovery:join_selected_node(rabbit_peer_discovery_classic_config, undefined, missing@localhost, disc)),
?assert(meck:validate(rabbit_peer_discovery_classic_config)),
passed.

Expand All @@ -78,6 +78,6 @@ init_with_lock_supported(_Config) ->
meck:expect(rabbit_peer_discovery_classic_config, unlock, fun(data) -> ok end),
?assertEqual(
{error, {aborted_feature_flags_compat_check, {error, feature_flags_file_not_set}}},
rabbit_peer_discovery:join_selected_node(rabbit_peer_discovery_classic_config, missing@localhost, disc)),
rabbit_peer_discovery:join_selected_node(rabbit_peer_discovery_classic_config, undefined, missing@localhost, disc)),
?assert(meck:validate(rabbit_peer_discovery_classic_config)),
passed.
36 changes: 33 additions & 3 deletions deps/rabbit_common/src/rabbit_peer_discovery_backend.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,38 @@

-callback post_registration() -> ok | {error, Reason :: string()}.

-callback lock(Nodes :: [node()]) -> {ok, Data :: term()} | not_supported | {error, Reason :: string()}.
-callback pre_discovery() ->
{ok, BackendPriv :: backend_priv()} | {error, Reason :: string()}.

-callback unlock(Data :: term()) -> ok.
-callback post_discovery(BackendPriv :: backend_priv()) ->
ok.

-optional_callbacks([init/0]).
-callback lock(Nodes :: [node()]) ->
{ok, LockData :: lock_data()} |
not_supported |
{error, Reason :: string()}.

-callback lock(Nodes :: [node()], BackendPriv :: term()) ->
{ok, LockData :: lock_data()} |
not_supported |
{error, Reason :: string()}.

-callback unlock(LockData :: lock_data()) ->
ok.

-callback unlock(LockData :: lock_data(), BackendPriv :: backend_priv()) ->
ok.

-type backend_priv() :: any().
-type lock_data() :: any().

-optional_callbacks([init/0,
pre_discovery/0,
post_discovery/1,
lock/1,
lock/2,
unlock/1,
unlock/2]).

-export_type([backend_priv/0,
lock_data/0]).