Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recovery mode in a new force_restart_server call/3 #308

Closed
wants to merge 13 commits into from
44 changes: 36 additions & 8 deletions src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
restart_server/1,
restart_server/2,
restart_server/3,
force_restart_server/3,
force_restart_server/4,
% deprecated
stop_server/1,
stop_server/2,
Expand Down Expand Up @@ -168,13 +170,7 @@ restart_server(ServerId) ->
ok | {error, term()}.
%% the scraped diff interleaved the removed catch-based body with the new
%% delegating body; this is the resulting (post-diff) implementation.
restart_server(System, ServerId)
  when is_atom(System) ->
    %% delegate to restart_server/3 with an empty additional config
    restart_server(System, ServerId, #{}).

%% @doc Restarts a previously successfully started ra server
%% @param System the system identifier
Expand All @@ -198,6 +194,38 @@ restart_server(System, ServerId, AddConfig)
{'EXIT', Err} -> {error, Err}
end.

%% @doc Restarts a previously successfully started ra server in a new,
%% restricted cluster configuration obtained by filtering the membership
%% down to the given list of nodes.
%% This function is designed primarily for data recovery purposes: it
%% allows a server to be forcibly restarted with a restricted membership
%% configuration.
%% @param System the system identifier
%% @param ServerId the ra_server_id() of the server
%% @param FilterNodes the nodes to test the membership configuration against
%% @returns the same as ra:restart_server/2
%% @end

-spec force_restart_server(atom(), ra_server_id(), [node()]) ->
    ok | {error, term()}.
force_restart_server(System, ServerId, FilterNodes)
  when is_list(FilterNodes) ->
    force_restart_server(System, ServerId, FilterNodes, #{}).

%% @doc Same as `force_restart_server/3' but accepts additional config
%% parameters that are merged into the original config.
%% @see ra:restart_server/3.
%% @param System the system identifier
%% @param ServerId the ra_server_id() of the server
%% @param FilterNodes the nodes to test the membership configuration against
%% @param AddConfig additional config parameters to be merged into the
%% original config; the node filter is passed as the `filter_nodes' key.
%% @returns the same as ra:restart_server/2
%% @end

-spec force_restart_server(atom(), ra_server_id(), [node()], ra_server:mutable_config()) ->
    ok | {error, term()}.
force_restart_server(System, ServerId, FilterNodes, AddConfig)
  when is_list(FilterNodes) ->
    restart_server(System, ServerId, AddConfig#{filter_nodes => FilterNodes}).

%% @doc Stops a ra server in the default system
%% @param ServerId the ra_server_id() of the server
%% @returns `{ok | error, nodedown}'
Expand Down Expand Up @@ -446,7 +474,7 @@ start_cluster(System, [#{cluster_name := ClusterName} | _] = ServerConfigs,
end
end.

%% @doc Starts a new distributed ra cluster.
%% @doc Starts a server in a new distributed ra cluster.
%% @param ClusterName the name of the cluster.
%% @param ServerId the ra_server_id() of the server
%% @param Machine The {@link ra_machine:machine/0} configuration.
Expand Down
61 changes: 42 additions & 19 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@
query_index := non_neg_integer(),
queries_waiting_heartbeats := queue:queue({non_neg_integer(), consistent_query_ref()}),
pending_consistent_queries := [consistent_query_ref()],
commit_latency => 'maybe'(non_neg_integer())
commit_latency => 'maybe'(non_neg_integer()),
filter_nodes => 'maybe'([node()])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear from the name what nodes are being filtered, or whether these nodes act as filters, or what kind of filtering will be performed.

}.

-type ra_state() :: leader | follower | candidate
Expand Down Expand Up @@ -201,7 +202,9 @@
install_snap_rpc_timeout => non_neg_integer(), % ms
await_condition_timeout => non_neg_integer(),
max_pipeline_count => non_neg_integer(),
ra_event_formatter => {module(), atom(), [term()]}}.
ra_event_formatter => {module(), atom(), [term()]},
%% distribution setup
filter_nodes => 'maybe'([node()])}.

-type config() :: ra_server_config().

Expand Down Expand Up @@ -333,7 +336,8 @@ init(#{id := Id,
aux_state => ra_machine:init_aux(MacMod, Name),
query_index => 0,
queries_waiting_heartbeats => queue:new(),
pending_consistent_queries => []}.
pending_consistent_queries => [],
filter_nodes => maps:get(filter_nodes, Config, undefined)}.

recover(#{cfg := #cfg{log_id = LogId,
machine_version = MacVer,
Expand All @@ -343,7 +347,7 @@ recover(#{cfg := #cfg{log_id = LogId,
?DEBUG("~s: recovering state machine version ~b:~b from index ~b to ~b",
[LogId, EffMacVer, MacVer, LastApplied, CommitIndex]),
Before = erlang:system_time(millisecond),
{#{log := Log0} = State, _} =
{#{log := Log0} = State1, _} =
apply_to(CommitIndex,
fun(E, S) ->
%% Clear out the effects to avoid building
Expand All @@ -359,9 +363,12 @@ recover(#{cfg := #cfg{log_id = LogId,
[LogId, EffMacVer, MacVer, LastApplied, CommitIndex, After - Before]),
%% disable segment read cache by setting random access pattern
Log = ra_log:release_resources(1, random, Log0),
State#{log => Log,
%% reset commit latency as recovery may calculate a very old value
commit_latency => 0}.

State2 = State1#{cluster => validate_cluster(State1),
log => Log,
%% reset commit latency as recovery may calculate a very old value
commit_latency => 0},
State2.

-spec handle_leader(ra_msg(), ra_server_state()) ->
{ra_state(), ra_server_state(), effects()}.
Expand Down Expand Up @@ -1231,16 +1238,17 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
end,

{#{cluster := ClusterIds}, MacState} = ra_log:recover_snapshot(Log),
State = State0#{cfg => Cfg,
log => Log,
current_term => Term,
commit_index => SnapIndex,
last_applied => SnapIndex,
cluster => make_cluster(Id, ClusterIds),
machine_state => MacState},
State1 = State0#{cfg => Cfg,
log => Log,
current_term => Term,
commit_index => SnapIndex,
last_applied => SnapIndex,
cluster => make_cluster(Id, ClusterIds),
machine_state => MacState},
State2 = State1#{ cluster => validate_cluster(State1) },
%% it was the last snapshot chunk so we can revert back to
%% follower status
{follower, persist_last_applied(State), [{reply, Reply} | Effs]};
{follower, persist_last_applied(State2), [{reply, Reply} | Effs]};
next ->
Log = ra_log:set_snapshot_state(SnapState, Log0),
State = State0#{log => Log},
Expand Down Expand Up @@ -1452,7 +1460,6 @@ machine_query(QueryFun, #{cfg := #cfg{effective_machine_module = MacMod},
{{Last, Term}, Res}.



% Internal

become(leader, #{cluster := Cluster, log := Log0} = State) ->
Expand Down Expand Up @@ -2107,6 +2114,20 @@ make_cluster(Self, Nodes) ->
Cluster#{Self => new_peer()}
end.

%% Restricts the recovered cluster membership to the nodes listed in the
%% server's `filter_nodes' configuration, when one is present. Peers whose
%% node is not in the list are dropped from the cluster map. This is a data
%% recovery aid used by ra:force_restart_server/3,4.
validate_cluster(#{cluster := Cluster0} = State) ->
    %% `filter_nodes' is optional; absence means no filtering at all
    case maps:get(filter_nodes, State, undefined) of
        undefined ->
            Cluster0;
        FilterNodes when is_list(FilterNodes) ->
            maps:filter(
              fun ({Name, Node}, _Peer) ->
                      case lists:member(Node, FilterNodes) of
                          true ->
                              true;
                          false ->
                              %% explain the consequence, not just the fact
                              %% (addresses review feedback on the original
                              %% confusing message)
                              ?INFO("~p is removed from the cluster "
                                    "configuration as its node ~s is not in "
                                    "the filter_nodes list",
                                    [Name, Node]),
                              false
                      end
              end, Cluster0)
    end.


initialise_peers(State = #{log := Log, cluster := Cluster0}) ->
PeerIds = peer_ids(State),
NextIdx = ra_log:next_index(Log),
Expand Down Expand Up @@ -2209,9 +2230,11 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
?DEBUG("~s: applying ra cluster change to ~w",
[log_id(State0), maps:keys(NewCluster)]),
%% we are recovering and should apply the cluster change
State0#{cluster => NewCluster,
cluster_change_permitted => true,
cluster_index_term => {Idx, Term}};
State1 = State0#{cluster => NewCluster,
cluster_change_permitted => true,
cluster_index_term => {Idx, Term}},
State2 = State1#{cluster => validate_cluster(State1)},
State2;
_ ->
?DEBUG("~s: committing ra cluster change to ~w",
[log_id(State0), maps:keys(NewCluster)]),
Expand Down
3 changes: 2 additions & 1 deletion src/ra_server_sup_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
install_snap_rpc_timeout,
await_condition_timeout,
max_pipeline_count,
ra_event_formatter]).
ra_event_formatter,
filter_nodes]).

%% API functions
-export([start_server/2,
Expand Down
24 changes: 20 additions & 4 deletions test/nemesis.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ handle_info(next_step, State0) ->
{noreply, State}
end.


%% gen_server callback; the nemesis process holds no external resources,
%% so there is nothing to clean up on termination.
terminate(_Reason, _State) ->
    ok.

Expand All @@ -101,6 +100,7 @@ partition(Partitions) ->
handle_step(#state{steps = [{wait, Time} | Rem]} = State) ->
erlang:send_after(Time, self(), next_step),
State#state{steps = Rem};

handle_step(#state{steps = [{part, Partition0, Time} | Rem],
nodes = Nodes} = State) ->
%% first we need to always heal
Expand All @@ -110,9 +110,16 @@ handle_step(#state{steps = [{part, Partition0, Time} | Rem],
ok = partition(Partitions),
% always heal after part
State#state{steps = [heal | Rem]};

handle_step(#state{steps = [heal | Rem]} = State) ->
heal(State#state.nodes),
handle_step(State#state{steps = Rem});

handle_step(#state{steps = [{app_stop, Servers} | Rem]} = State) ->
ct:pal("doing app stop of ~w", [Servers]),
[begin rpc:call(N, application, stop, [ra]) end || {_, N} <- Servers],
handle_step(State#state{steps = Rem});

handle_step(#state{steps = [{app_restart, Servers} | Rem]} = State) ->
ct:pal("doing app restart of ~w", [Servers]),
[begin
Expand All @@ -121,10 +128,19 @@ handle_step(#state{steps = [{app_restart, Servers} | Rem]} = State) ->
rpc:call(N, ra, restart_server, [?SYS, Id])
end || {_, N} = Id <- Servers],
handle_step(State#state{steps = Rem});
handle_step(#state{steps = []}) ->
done.

handle_step(#state{steps = [{app_start, Servers} | Rem]} = State) ->
ct:pal("doing app start of ~w", [Servers]),
[begin rpc:call(N, ra, start, []) end || {_, N} <- Servers],
handle_step(State#state{steps = Rem});

%% force-restart each server with a restricted membership (FilterNodes);
%% fixed copy-paste log message (said "app restart", copied from the
%% app_restart clause) and dropped the redundant begin..end wrapper.
handle_step(#state{steps = [{force_restart_server, Servers, FilterNodes} | Rem]} = State) ->
    ct:pal("doing force restart of ~w", [Servers]),
    [rpc:call(N, ra, force_restart_server, [?SYS, Id, FilterNodes])
     || {_, N} = Id <- Servers],
    handle_step(State#state{steps = Rem});

handle_step(#state{steps = []}) ->
done.

partition({Partition1, Partition2}, PartitionFun) ->
lists:foreach(
Expand All @@ -141,4 +157,4 @@ heal(Nodes) ->
Node <- Nodes,
OtherNode =/= Node],
[net_kernel:connect_node(N)
|| N <- Nodes].
|| N <- Nodes].
28 changes: 25 additions & 3 deletions test/partitions_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ all() -> [

groups() ->
Tests = [
enq_drain_basic
enq_drain_basic,
enq_drain_recovery
% prop_enq_drain
],
[{tests, [], Tests}].
Expand Down Expand Up @@ -104,6 +105,19 @@ enq_drain_basic(Config) ->
{wait, 5000}],
true = do_enq_drain_scenario(ClusterName, Nodes, Servers, Scenario).

%% Recovery scenario: stop the ra application on a subset of nodes (DC0)
%% and force-restart the remaining servers with the membership filtered
%% down to the surviving nodes (DC1).
enq_drain_recovery(Config) ->
    ClusterName = ?config(cluster_name, Config),
    Nodes = ?config(nodes, Config),
    %% select_some/2 with 1.5 selects roughly two thirds of the nodes
    DC0 = select_some(Nodes, 1.5),
    DC1 = Nodes -- DC0,
    Servers0 = ?config(servers, Config),
    %% NOTE(review): this comprehension filters Servers0 against itself and
    %% is therefore a no-op (Servers1 =:= Servers0); presumably it was meant
    %% to keep only the servers running on DC1 nodes - confirm the intent.
    Servers1 = [S || S <- Servers0, lists:member(S, Servers0)],
    ct:pal("DC0: ~p DC1: ~p",[DC0, DC1]),
    Scenario = [{wait, 5000},
                {app_stop, DC0},
                {force_restart_server, DC1, DC1}],
    true = do_enq_drain_scenario(ClusterName, Nodes, Servers1, Scenario).

do_enq_drain_scenario(ClusterName, Nodes, Servers, Scenario) ->
ct:pal("Running ~p", [Scenario]),
NemConf = #{nodes => Nodes,
Expand Down Expand Up @@ -169,7 +183,10 @@ validate_machine_state(Servers, Num) ->
end.

%% select roughly half of the given servers at random; the scraped diff
%% interleaved the removed inline body with the new delegation - this is
%% the resulting (post-diff) implementation.
select_some(Servers) ->
    select_some(Servers, 2).

select_some(Servers, Count) ->
N = trunc(length(Servers) / Count),
element(1,
lists:foldl(fun (_, {Selected, Rem0}) ->
{S, Rem} = random_element(Rem0),
Expand All @@ -186,12 +203,17 @@ scenario_time([heal | Rest], Acc) ->
scenario_time(Rest, Acc);
scenario_time([{app_restart, _} | Rest], Acc) ->
scenario_time(Rest, Acc + 100);
scenario_time([{app_stop, _} | Rest], Acc) ->
scenario_time(Rest, Acc + 100);
scenario_time([{app_start, _} | Rest], Acc) ->
scenario_time(Rest, Acc + 100);
scenario_time([{_, T} | Rest], Acc) ->
scenario_time(Rest, Acc + T);
scenario_time([{force_restart_server, _, _} | Rest], Acc) ->
scenario_time(Rest, Acc + 100);
scenario_time([{_, _, T} | Rest], Acc) ->
scenario_time(Rest, Acc + T).


drain(ClusterName, Nodes) ->
ct:pal("draining ~w", [ClusterName]),
F = ra_fifo_client:init(ClusterName, Nodes),
Expand Down
33 changes: 33 additions & 0 deletions test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ all_tests() ->
stop_server_idemp,
minority,
start_servers,
force_restart,
server_recovery,
process_command,
pipeline_command,
Expand Down Expand Up @@ -271,6 +272,36 @@ start_servers(Config) ->
{ok, _, _} = ra:process_command(N3, 5, ?PROCESS_COMMAND_TIMEOUT),
terminate_cluster([N1, N2, N3] -- [element(1, Target)]).

%% Sanity check that force_restart_server/3 with a filter covering the
%% (single) local test node preserves cluster consistency: commands applied
%% before the restart must remain visible afterwards.
force_restart(Config) ->
    Name = ?config(test_name, Config),

    N1 = nth_server_name(Config, 1),
    N2 = nth_server_name(Config, 2),
    N3 = nth_server_name(Config, 3),
    Nodes = [N1, N2, N3],

    %% assert that every server started (original bound the started/failed
    %% lists to unused variables and never checked the failed list)
    {ok, _Started, []} = ra:start_cluster(default, Name, add_machine(), Nodes),

    {ok, 10, _} = ra:process_command(N1, 10),
    {ok, 15, _} = ra:process_command(N2, 5),
    {ok, 20, _} = ra:process_command(N3, 5),

    [_, _, _] = terminate_cluster(Nodes),

    %% restart each server with the membership filtered to the local node
    [ok = ra:force_restart_server(default, N, [node()]) || N <- Nodes],

    %% the machine state must have been recovered intact
    {ok, 25, _} = ra:process_command(N1, 5),
    {ok, 30, _} = ra:process_command(N2, 5),
    {ok, 35, _} = ra:process_command(N3, 5).

server_recovery(Config) ->
N1 = nth_server_name(Config, 1),
Expand All @@ -296,6 +327,8 @@ server_recovery(Config) ->
{ok, _, _Leader} = ra:process_command(N, 5, ?PROCESS_COMMAND_TIMEOUT),
terminate_cluster([N1, N2]).

%%

process_command(Config) ->
[A, _B, _C] = Cluster =
start_local_cluster(3, ?config(test_name, Config),
Expand Down