Skip to content

Commit

Permalink
rabbitmq_peer_discovery_consul: Implement the new pre/post discovery …
Browse files Browse the repository at this point in the history
…callbacks

[Why]
The Consul peer discovery backend needs to create a session before it
can acquire a lock. This session is also required for nodes to discover
each other.

It must open the session before the `list_nodes/0` callback can return
meaningful results.

[How]
The new `pre_discovery/0` and `post_discovery/1` callbacks are used to
create and delete that session before the whole discover/lock/join
process.

Fixes #10760.
  • Loading branch information
dumbbell committed Mar 18, 2024
1 parent f65353d commit 496cfde
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@
-include("rabbit_peer_discovery_consul.hrl").

-export([init/0, list_nodes/0, supports_registration/0, register/0, unregister/0,
post_registration/0, lock/1, unlock/1]).
post_registration/0, lock/2, unlock/2,
pre_discovery/0, post_discovery/1]).
-export([send_health_check_pass/0]).
-export([session_ttl_update_callback/1]).
%% for debugging from the REPL
-export([service_id/0, service_address/0]).
%% for debugging from the REPL
-export([http_options/1, http_options/2]).

-type backend_priv() :: {Config :: #{atom() => peer_discovery_config_value()},
SessionId :: string(),
TRef :: timer:tref()}.

%% for tests
-ifdef(TEST).
-compile(export_all).
Expand Down Expand Up @@ -160,10 +165,9 @@ post_registration() ->
send_health_check_pass(),
ok.

-spec lock(Nodes :: [node()]) ->
{ok, Data :: term()} | {error, Reason :: string()}.
-spec pre_discovery() -> {ok, backend_priv()} | {error, string()}.

lock(_Nodes) ->
pre_discovery() ->
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
?LOG_DEBUG(
"Effective Consul peer discovery configuration: ~tp", [M],
Expand All @@ -172,21 +176,33 @@ lock(_Nodes) ->
case create_session(Node, get_config_key(consul_svc_ttl, M)) of
{ok, SessionId} ->
TRef = start_session_ttl_updater(SessionId),
Now = erlang:system_time(seconds),
EndTime = Now + get_config_key(lock_wait_time, M),
lock(TRef, SessionId, Now, EndTime);
{ok, {M, SessionId, TRef}};
{error, Reason} ->
{error, lists:flatten(io_lib:format("Error while creating a session, reason: ~ts",
[Reason]))}
end.

-spec unlock({SessionId :: string(), TRef :: timer:tref()}) -> ok.
-spec post_discovery(backend_priv()) -> ok.

unlock({SessionId, TRef}) ->
post_discovery({_M, SessionId, TRef}) ->
_ = timer:cancel(TRef),
delete_session(SessionId),
?LOG_DEBUG(
"Stopped session renewal",
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
ok.

-spec lock(Nodes :: [node()], BackendPriv :: backend_priv()) ->
{ok, ok} | {error, Reason :: string()}.

lock(_Nodes, {M, SessionId, _TRef}) ->
Now = erlang:system_time(seconds),
EndTime = Now + get_config_key(lock_wait_time, M),
lock(SessionId, Now, EndTime).

-spec unlock(LockData :: ok, backend_priv()) -> ok.

unlock(ok, {_M, SessionId, _TRef}) ->
case release_lock(SessionId) of
{ok, true} ->
ok;
Expand Down Expand Up @@ -613,7 +629,11 @@ create_session(Name, TTL) ->
[{'Name', Name},
{'TTL', rabbit_data_coercion:to_atom(service_ttl(TTL))}]) of
{ok, Response} ->
{ok, get_session_id(Response)};
SessionId = get_session_id(Response),
?LOG_DEBUG(
"Consul created session ID: ~s", [SessionId],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
{ok, SessionId};
{error, _} = Err ->
Err
end.
Expand Down Expand Up @@ -645,6 +665,31 @@ consul_session_create(Query, Headers, Body) ->
Err
end.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Delete a session
%% @end
%%--------------------------------------------------------------------
-spec delete_session(string()) -> ok.
delete_session(SessionId) ->
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
Headers = maybe_add_acl([]),
HttpOpts = http_options(M),
Ret = rabbit_peer_discovery_httpc:put(
get_config_key(consul_scheme, M),
get_config_key(consul_host, M),
get_integer_config_key(consul_port, M),
"v1/session/destroy/" ++ SessionId,
[],
Headers,
HttpOpts,
<<>>),
?LOG_DEBUG(
"Consul deleted session: ~p", [Ret],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
ok.

%%--------------------------------------------------------------------
%% @private
%% @doc
Expand Down Expand Up @@ -692,31 +737,27 @@ start_session_ttl_updater(SessionId) ->
%% Tries to acquire lock. If the lock is held by someone else, waits until it
%% is released, or too much time has passed
%% @end
-spec lock(timer:tref(), string(), pos_integer(), pos_integer()) -> {ok, string()} | {error, string()}.
lock(TRef, _, Now, EndTime) when EndTime < Now ->
_ = timer:cancel(TRef),
-spec lock(string(), pos_integer(), pos_integer()) -> {ok, ok} | {error, string()}.
lock(_, Now, EndTime) when EndTime < Now ->
{error, "Acquiring lock taking too long, bailing out"};
lock(TRef, SessionId, _, EndTime) ->
lock(SessionId, _, EndTime) ->
case acquire_lock(SessionId) of
{ok, true} ->
{ok, {SessionId, TRef}};
{ok, ok};
{ok, false} ->
case get_lock_status() of
{ok, {SessionHeld, ModifyIndex}} ->
Wait = max(EndTime - erlang:system_time(seconds), 0),
case wait_for_lock_release(SessionHeld, ModifyIndex, Wait) of
ok ->
lock(TRef, SessionId, erlang:system_time(seconds), EndTime);
lock(SessionId, erlang:system_time(seconds), EndTime);
{error, Reason} ->
_ = timer:cancel(TRef),
{error, lists:flatten(io_lib:format("Error waiting for lock release, reason: ~ts",[Reason]))}
end;
{error, Reason} ->
_ = timer:cancel(TRef),
{error, lists:flatten(io_lib:format("Error obtaining lock status, reason: ~ts", [Reason]))}
end;
{error, Reason} ->
_ = timer:cancel(TRef),
{error, lists:flatten(io_lib:format("Error while acquiring lock, reason: ~ts", [Reason]))}
end.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
-behaviour(rabbit_peer_discovery_backend).

-export([init/0, list_nodes/0, supports_registration/0, register/0, unregister/0,
post_registration/0, lock/1, unlock/1]).
post_registration/0, lock/2, unlock/2,
pre_discovery/0, post_discovery/1]).
-export([send_health_check_pass/0]).
-export([session_ttl_update_callback/1]).

Expand Down Expand Up @@ -42,13 +43,31 @@ unregister() ->
post_registration() ->
?DELEGATE:post_registration().

-spec lock(Nodes :: [node()]) -> {ok, Data :: term()} | {error, Reason :: string()}.
lock(Node) ->
?DELEGATE:lock(Node).
-spec pre_discovery() ->
{ok, BackendPriv :: rabbit_peer_discovery_backend:backend_priv()} |
{error, Reason :: string()}.

-spec unlock({SessionId :: string(), TRef :: timer:tref()}) -> ok.
unlock(Data) ->
?DELEGATE:unlock(Data).
pre_discovery() ->
?DELEGATE:pre_discovery().

-spec post_discovery(BackendPriv :: rabbit_peer_discovery_backend:backend_priv()) ->
ok.

post_discovery(BackendPriv) ->
?DELEGATE:post_discovery(BackendPriv).

-spec lock(Nodes :: [node()],
BackendPriv :: rabbit_peer_discovery_backend:backend_priv()) ->
{ok, ok} | {error, Reason :: string()}.

lock(Node, BackendPriv) ->
?DELEGATE:lock(Node, BackendPriv).

-spec unlock(LockData :: rabbit_peer_discovery_backend:lock_data(),
BackendPriv :: rabbit_peer_discovery_backend:backend_priv()) ->
ok.
unlock(LockData, BackendPriv) ->
?DELEGATE:unlock(LockData, BackendPriv).

-spec send_health_check_pass() -> ok.
send_health_check_pass() ->
Expand Down

0 comments on commit 496cfde

Please sign in to comment.