Skip to content

Commit

Permalink
Remove feature flag code handling potential code server deadlocks
Browse files Browse the repository at this point in the history
These test cases and rpc calls were concerned with deadlocks possible
from modifying the code server process from multiple callers. With the
switch to persistent_term in the parent commits these kinds of deadlocks
are no longer possible.
  • Loading branch information
the-mikedavis committed Apr 13, 2024
1 parent e6b5c36 commit e7de95c
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 254 deletions.
41 changes: 3 additions & 38 deletions deps/rabbit/src/rabbit_ff_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,8 @@ collect_inventory_on_nodes(Nodes, Timeout) ->
Rets = inventory_rpcs(Nodes, Timeout),
maps:fold(
fun
(_Node, init_required, {ok, Inventory}) ->
{ok, Inventory};
(Node,
#{feature_flags := FeatureFlags1,
applications := ScannedApps,
Expand All @@ -1144,44 +1146,7 @@ collect_inventory_on_nodes(Nodes, Timeout) ->
end, {ok, Inventory0}, Rets).

inventory_rpcs(Nodes, Timeout) ->
%% We must use `rabbit_ff_registry_wrapper' if it is available to avoid
%% any deadlock with the Code server. If it is unavailable, we fall back
%% to `rabbit_ff_registry'.
%%
%% See commit aacfa1978e24bcacd8de7d06a7c3c5d9d8bd098e and pull request
%% #8155.
Rets0 = rpc_calls(
Nodes,
rabbit_ff_registry_wrapper, inventory, [], Timeout),
OlderNodes = maps:fold(
fun
(Node,
{error,
{exception, undef,
[{rabbit_ff_registry_wrapper,_ , _, _} | _]}},
Acc) ->
[Node | Acc];
(_Node, _Ret, Acc) ->
Acc
end, [], Rets0),
case OlderNodes of
[] ->
Rets0;
_ ->
?LOG_INFO(
"Feature flags: the following nodes run an older version of "
"RabbitMQ causing the "
"\"rabbit_ff_registry_wrapper:inventory[] undefined\" error "
"above: ~2p~n"
"Feature flags: falling back to another method for those "
"nodes; this may trigger a bug causing them to hang",
[lists:sort(OlderNodes)],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
Rets1 = rpc_calls(
OlderNodes,
rabbit_ff_registry, inventory, [], Timeout),
maps:merge(Rets0, Rets1)
end.
rpc_calls(Nodes, rabbit_ff_registry, inventory, [], Timeout).

merge_feature_flags(FeatureFlagsA, FeatureFlagsB) ->
FeatureFlags = maps:merge(FeatureFlagsA, FeatureFlagsB),
Expand Down
216 changes: 0 additions & 216 deletions deps/rabbit/test/feature_flags_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@

registry_general_usage/1,
registry_concurrent_reloads/1,
try_to_deadlock_in_registry_reload_1/1,
try_to_deadlock_in_registry_reload_2/1,
registry_reset/1,
enable_feature_flag_in_a_healthy_situation/1,
enable_unsupported_feature_flag_in_a_healthy_situation/1,
Expand Down Expand Up @@ -104,8 +102,6 @@ groups() ->
[
registry_general_usage,
registry_concurrent_reloads,
try_to_deadlock_in_registry_reload_1,
try_to_deadlock_in_registry_reload_2,
registry_reset
]},
{feature_flags_v2, [], Groups}
Expand Down Expand Up @@ -573,218 +569,6 @@ registry_spammer1(FeatureNames) ->
?assertEqual(FeatureNames, ?list_ff(all)),
registry_spammer1(FeatureNames).

try_to_deadlock_in_registry_reload_1(_Config) ->
rabbit_ff_registry_factory:purge_old_registry(rabbit_ff_registry),
_ = code:delete(rabbit_ff_registry),
?assertEqual(false, code:is_loaded(rabbit_ff_registry)),

FeatureName = ?FUNCTION_NAME,
FeatureProps = #{provided_by => rabbit,
stability => stable},

Parent = self(),

%% Deadlock steps:
%% * Process A acquires the lock first.
%% * Process B loads the registry stub and waits for the lock.
%% * Process A deletes the registry stub and loads the initialized
%% registry.
%% * Process B wants to purge the deleted registry stub by sending a
%% request to the Code server.
%%
%% => Process B waits forever the return from the Code server because the
%% Code server waits for process B to be runnable again to handle the
%% signal.

ProcessA = spawn_link(
fun() ->
%% Process A acquires the lock manually first to
%% ensure the ordering of events. It can be acquired
%% recursively, so the feature flag injection can
%% "acquire" it again and proceed.
ct:pal("Process A: Acquire registry loading lock"),
Lock =
rabbit_ff_registry_factory:registry_loading_lock(),
global:set_lock(Lock, [node()]),
receive proceed -> ok end,

ct:pal(
"Process A: "
"Inject arbitrary feature flag to reload "
"registry"),
rabbit_feature_flags:inject_test_feature_flags(
#{FeatureName => FeatureProps}),

ct:pal("Process A: Release registry loading lock"),
global:del_lock(Lock, [node()]),

ct:pal("Process A: Exiting..."),
erlang:unlink(Parent)
end),
timer:sleep(500),

ProcessB = spawn_link(
fun() ->
%% Process B is the one loading the registry stub and
%% wants to initialize the real registry.
ct:pal(
"Process B: "
"Trigger automatic initial registry load"),
FF = rabbit_ff_registry_wrapper:get(FeatureName),

ct:pal("Process B: Exiting..."),
erlang:unlink(Parent),
Parent ! {self(), FF}
end),
timer:sleep(500),

begin
{_, StacktraceA} = erlang:process_info(ProcessA, current_stacktrace),
{_, StacktraceB} = erlang:process_info(ProcessB, current_stacktrace),
ct:pal(
"Process stacktraces:~n"
" Process A: ~p~n"
" Process B: ~p",
[StacktraceA, StacktraceB])
end,

%% Process A is resumed. Without a proper check, process B would try to
%% purge the copy of the registry it is currently using itself, causing a
%% deadlock because the Code server wants process A to handle a signal, but
%% process A is not runnable.
ProcessA ! proceed,

ct:pal("Waiting for process B to exit"),
receive
{ProcessB, FF} ->
?assertEqual(FeatureProps#{name => FeatureName}, FF),
ok
after 10000 ->
{_, StacktraceB} = erlang:process_info(
ProcessB, current_stacktrace),
ct:pal("Process B stuck; stacktrace: ~p", [StacktraceB]),
error(registry_reload_deadlock)
end.

try_to_deadlock_in_registry_reload_2(_Config) ->
rabbit_ff_registry_factory:purge_old_registry(rabbit_ff_registry),
_ = code:delete(rabbit_ff_registry),
?assertEqual(false, code:is_loaded(rabbit_ff_registry)),

FeatureName = ?FUNCTION_NAME,
FeatureProps = #{provided_by => rabbit,
stability => stable},

ct:pal("Inject arbitrary feature flag to reload registry"),
rabbit_feature_flags:inject_test_feature_flags(
#{FeatureName => FeatureProps},
false),

_ = erlang:process_flag(trap_exit, true),

ct:pal("Parent ~p: Acquire registry loading lock", [self()]),
Lock = rabbit_ff_registry_factory:registry_loading_lock(),
global:set_lock(Lock, [node()]),

Parent = self(),

%% Deadlock steps:
%% * Processes A, B1 and B2 wait for the lock. The registry stub is loaded.
%% * Process B1 acquires the lock.
%% * Process B1 deletes the registry stub and loads the initialized
%% registry.
%% * Process A acquires the lock.
%% * Process A wants to purge the deleted registry stub by sending a
%% request to the Code server.
%%
%% => Process A waits forever the return from the Code server because the
%% Code server waits for process B2 to stop lingering on the deleted
%% registry stub, but process B2 waits for the lock.

ProcessA = spawn_link(
fun() ->
%% Process A acquires the lock automatically as part
%% of requesting an explicit initialization of the
%% registry. Process A doesn't linger on the registry
%% stub.
ct:pal(
"Process A ~p: "
"Trigger manual initial registry load",
[self()]),
rabbit_ff_registry_factory:initialize_registry(),

ct:pal("Process A ~p: Exiting...", [self()]),
erlang:unlink(Parent),
Parent ! {self(), done}
end),

FunB = fun() ->
%% Processes B1 and B2 acquire the lock automatically as
%% part of trying to load the registry as part of they
%% querying a feature flag.
ct:pal(
"Process B ~p: "
"Trigger automatic initial registry load",
[self()]),
_ = rabbit_ff_registry_wrapper:get(FeatureName),

ct:pal(
"Process B ~p: Exiting...",
[self()]),
erlang:unlink(Parent),
Parent ! {self(), done}
end,
ProcessB1 = spawn_link(FunB),
ProcessB2 = spawn_link(FunB),
timer:sleep(500),

%% We use `erlang:suspend_process/1' and `erlang:resume_process/1' to
%% ensure the order in which processes acquire the lock.
erlang:suspend_process(ProcessA),
erlang:suspend_process(ProcessB1),
erlang:suspend_process(ProcessB2),
timer:sleep(500),

ct:pal("Parent ~p: Release registry loading lock", [self()]),
global:del_lock(Lock, [node()]),

erlang:resume_process(ProcessB1),
timer:sleep(500),
erlang:resume_process(ProcessA),
timer:sleep(500),
erlang:resume_process(ProcessB2),

ct:pal("Waiting for processes to exit"),
Procs = [ProcessA, ProcessB1, ProcessB2],
lists:foreach(
fun(Pid) ->
receive
{Pid, done} ->
ok;
{'EXIT', Pid, Reason} ->
ct:pal("Process ~p exited; reason: ~p", [Pid, Reason]),
error(test_process_killed)
after 10000 ->
lists:foreach(
fun(Pid1) ->
PI = erlang:process_info(
Pid1, current_stacktrace),
case PI of
undefined ->
ok;
{_, Stacktrace} ->
ct:pal(
"Process ~p stuck; "
"stacktrace: ~p",
[Pid1, Stacktrace])
end
end, Procs),
error(registry_reload_deadlock)
end
end, Procs),

ok.

registry_reset(_Config) ->
%% At first, the registry must be uninitialized.
?assertNot(rabbit_ff_registry:is_registry_initialized()),
Expand Down

0 comments on commit e7de95c

Please sign in to comment.