-
Notifications
You must be signed in to change notification settings - Fork 91
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Recovery mode in a new force_restart_server call/3 #308
Closed
Closed
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
31b54cc
Added distribition config
erlmachinedev 4217e0c
Documentation fix
erlmachinedev 7b085ea
Removed repeated code
erlmachinedev 64548d6
Added force start methods
erlmachinedev a362453
Code spaces fix
erlmachinedev 8dd58ee
Fixed and old state argument
erlmachinedev 79cd883
Added API test
erlmachinedev 9188bc5
Intendation format
erlmachinedev e8c7b22
Sanity check on test (force_restart)
erlmachinedev 95d9e68
Add docs sections
erlmachinedev 96c3127
Added recovery SUITE
erlmachinedev 3c4a3b1
Improved cluster through invaliadtion pointt
erlmachinedev 550ba1a
Format improvement
erlmachinedev File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -84,7 +84,8 @@ | |
query_index := non_neg_integer(), | ||
queries_waiting_heartbeats := queue:queue({non_neg_integer(), consistent_query_ref()}), | ||
pending_consistent_queries := [consistent_query_ref()], | ||
commit_latency => 'maybe'(non_neg_integer()) | ||
commit_latency => 'maybe'(non_neg_integer()), | ||
filter_nodes => 'maybe'([node()]) | ||
}. | ||
|
||
-type ra_state() :: leader | follower | candidate | ||
|
@@ -201,7 +202,9 @@ | |
install_snap_rpc_timeout => non_neg_integer(), % ms | ||
await_condition_timeout => non_neg_integer(), | ||
max_pipeline_count => non_neg_integer(), | ||
ra_event_formatter => {module(), atom(), [term()]}}. | ||
ra_event_formatter => {module(), atom(), [term()]}, | ||
%% distribution setup | ||
filter_nodes => 'maybe'([node()])}. | ||
|
||
-type config() :: ra_server_config(). | ||
|
||
|
@@ -333,7 +336,8 @@ init(#{id := Id, | |
aux_state => ra_machine:init_aux(MacMod, Name), | ||
query_index => 0, | ||
queries_waiting_heartbeats => queue:new(), | ||
pending_consistent_queries => []}. | ||
pending_consistent_queries => [], | ||
filter_nodes => maps:get(filter_nodes, Config, undefined)}. | ||
|
||
recover(#{cfg := #cfg{log_id = LogId, | ||
machine_version = MacVer, | ||
|
@@ -343,7 +347,7 @@ recover(#{cfg := #cfg{log_id = LogId, | |
?DEBUG("~s: recovering state machine version ~b:~b from index ~b to ~b", | ||
[LogId, EffMacVer, MacVer, LastApplied, CommitIndex]), | ||
Before = erlang:system_time(millisecond), | ||
{#{log := Log0} = State, _} = | ||
{#{log := Log0} = State1, _} = | ||
apply_to(CommitIndex, | ||
fun(E, S) -> | ||
%% Clear out the effects to avoid building | ||
|
@@ -359,9 +363,12 @@ recover(#{cfg := #cfg{log_id = LogId, | |
[LogId, EffMacVer, MacVer, LastApplied, CommitIndex, After - Before]), | ||
%% disable segment read cache by setting random access pattern | ||
Log = ra_log:release_resources(1, random, Log0), | ||
State#{log => Log, | ||
%% reset commit latency as recovery may calculate a very old value | ||
commit_latency => 0}. | ||
|
||
State2 = State1#{cluster => validate_cluster(State1), | ||
log => Log, | ||
%% reset commit latency as recovery may calculate a very old value | ||
commit_latency => 0}, | ||
State2. | ||
|
||
-spec handle_leader(ra_msg(), ra_server_state()) -> | ||
{ra_state(), ra_server_state(), effects()}. | ||
|
@@ -1231,16 +1238,17 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term, | |
end, | ||
|
||
{#{cluster := ClusterIds}, MacState} = ra_log:recover_snapshot(Log), | ||
State = State0#{cfg => Cfg, | ||
log => Log, | ||
current_term => Term, | ||
commit_index => SnapIndex, | ||
last_applied => SnapIndex, | ||
cluster => make_cluster(Id, ClusterIds), | ||
machine_state => MacState}, | ||
State1 = State0#{cfg => Cfg, | ||
log => Log, | ||
current_term => Term, | ||
commit_index => SnapIndex, | ||
last_applied => SnapIndex, | ||
cluster => make_cluster(Id, ClusterIds), | ||
machine_state => MacState}, | ||
State2 = State1#{ cluster => validate_cluster(State1) }, | ||
%% it was the last snapshot chunk so we can revert back to | ||
%% follower status | ||
{follower, persist_last_applied(State), [{reply, Reply} | Effs]}; | ||
{follower, persist_last_applied(State2), [{reply, Reply} | Effs]}; | ||
next -> | ||
Log = ra_log:set_snapshot_state(SnapState, Log0), | ||
State = State0#{log => Log}, | ||
|
@@ -1452,7 +1460,6 @@ machine_query(QueryFun, #{cfg := #cfg{effective_machine_module = MacMod}, | |
{{Last, Term}, Res}. | ||
|
||
|
||
|
||
% Internal | ||
|
||
become(leader, #{cluster := Cluster, log := Log0} = State) -> | ||
|
@@ -2107,6 +2114,20 @@ make_cluster(Self, Nodes) -> | |
Cluster#{Self => new_peer()} | ||
end. | ||
|
||
validate_cluster(State) -> | ||
Filter = maps:get(filter_nodes, State), | ||
Cluster0 = maps:get(cluster, State), | ||
case Filter of | ||
undefined -> | ||
Cluster0; | ||
_ -> | ||
maps:filter(fun ({Name, Node}, _) -> Res = lists:member(Node, Filter), | ||
Res orelse ?INFO("~p is filtered on node: ~s", [Name, Node]), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I find this log message to be confusing. "[machine name] is filtered on node […]" does not really tell the user about the consequences. |
||
Res | ||
end, Cluster0) | ||
end. | ||
|
||
|
||
initialise_peers(State = #{log := Log, cluster := Cluster0}) -> | ||
PeerIds = peer_ids(State), | ||
NextIdx = ra_log:next_index(Log), | ||
|
@@ -2209,9 +2230,11 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}}, | |
?DEBUG("~s: applying ra cluster change to ~w", | ||
[log_id(State0), maps:keys(NewCluster)]), | ||
%% we are recovering and should apply the cluster change | ||
State0#{cluster => NewCluster, | ||
cluster_change_permitted => true, | ||
cluster_index_term => {Idx, Term}}; | ||
State1 = State0#{cluster => NewCluster, | ||
cluster_change_permitted => true, | ||
cluster_index_term => {Idx, Term}}, | ||
State2 = State1#{cluster => validate_cluster(State1)}, | ||
State2; | ||
_ -> | ||
?DEBUG("~s: committing ra cluster change to ~w", | ||
[log_id(State0), maps:keys(NewCluster)]), | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not clear from the name what nodes are being filtered, or whether these nodes act as filters, or what kind of filtering will be performed.