Add reply-from reply-modes to ra:pipeline_command/4 #322

Open
the-mikedavis opened this issue Nov 2, 2022 · 0 comments

#314 adds new reply-mode options for ra:process_command/3 that control which member of the Ra cluster replies to a call with gen_statem:reply/2:

  • local - reply from a node local to the calling process
  • {member, Member} - reply from the given member

Both modes fall back to the leader if the given member is not part of the cluster membership. They are useful for blocking a caller until a command has been handled by the given member.
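The selection and fallback rule described above can be sketched as a pure function. This is only an illustration, not Ra's implementation: the module name, the function `replying_member/4`, and its argument layout are hypothetical, and it assumes members are identified by `{Name, Node}` tuples.

```erlang
-module(reply_from_sketch).
-export([replying_member/4]).

%% Hypothetical helper (not part of Ra): pick the member that should reply.
%%   Mode       :: leader | local | {member, Member}
%%   CallerNode :: node() of the calling process
%%   Members    :: [{Name, Node}], the current cluster membership
%%   Leader     :: {Name, Node}
replying_member(leader, _CallerNode, _Members, Leader) ->
    Leader;
replying_member(local, CallerNode, Members, Leader) ->
    %% use a member running on the caller's node, else fall back to the leader
    case lists:keyfind(CallerNode, 2, Members) of
        false -> Leader;
        Member -> Member
    end;
replying_member({member, Member}, _CallerNode, Members, Leader) ->
    %% use the requested member only if it is in the membership
    case lists:member(Member, Members) of
        true -> Member;
        false -> Leader
    end.
```

With `Members = [{a, n1}, {b, n2}]` and leader `{b, n2}`, a `local` request from node `n1` resolves to `{a, n1}`, while `{member, {c, n3}}` falls back to `{b, n2}`.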

These options should be added to ra:pipeline_command/4 as well, to control which member of the cluster sends notifications. This would involve refactoring how notifications are stored within the cluster state (in ra_server_proc's #state.pending_notifys field) and changing the shape of the notify ra_server:effect():

{notify, #{pid() => [term()]}} |

Currently, notifications are grouped by pid:

ra/src/ra_server.erl

Lines 2310 to 2321 in 09cc606

add_reply(_, Reply, {notify, Corr, Pid},
          Effects, Notifys) ->
    % notify are casts and thus have to include their own pid()
    % reply with the supplied correlation so that the sending can do their
    % own bookkeeping
    CorrData = {Corr, Reply},
    case Notifys of
        #{Pid := T} ->
            {Effects, Notifys#{Pid => [CorrData | T]}};
        _ ->
            {Effects, Notifys#{Pid => [CorrData]}}
    end;

and replied to in batches:

ra/src/ra_server_proc.erl

Lines 1708 to 1731 in 09cc606

send_applied_notifications(#state{pending_notifys = PendingNots} = State,
                           Nots0) when map_size(PendingNots) > 0 ->
    Nots = ra_lib:maps_merge_with(fun(_K, V1, V2) ->
                                          V1 ++ V2
                                  end, PendingNots, Nots0),
    send_applied_notifications(State#state{pending_notifys = #{}}, Nots);
send_applied_notifications(#state{} = State, Nots) ->
    Id = id(State),
    %% any notifications that could not be sent
    %% will be kept and retried
    RemNots = maps:filter(
                fun(Who, Correlations0) ->
                        %% correlations are build up in reverse order so we need
                        %% to reverse before sending
                        Correlations = lists:reverse(Correlations0),
                        ok =/= send_ra_event(Who, Correlations, Id,
                                             applied, State)
                end, Nots),
    case map_size(RemNots) of
        0 ->
            State;
        _ ->
            State#state{pending_notifys = RemNots}
    end.

This will need to be refactored so that notifications are grouped by both pid and reply-from mode.
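One possible shape for that grouping is to key the notifications map by a `{Pid, ReplyFrom}` tuple instead of the bare pid. The sketch below is a standalone illustration under that assumption, not a proposed patch: the module name and the `add_notify/5` and `for_mode/2` functions are hypothetical, and the real change would also have to touch the notify effect type and `send_applied_notifications`.

```erlang
-module(notify_grouping_sketch).
-export([add_notify/5, for_mode/2]).

%% Hypothetical map shape (an assumption, not Ra's current type):
%%   Notifys :: #{{pid(), ReplyFrom} => [{Corr, Reply}]}
%% As in add_reply, correlations are prepended, so each list is in
%% reverse order and must be reversed before sending.
add_notify(Corr, Reply, Pid, ReplyFrom, Notifys) ->
    CorrData = {Corr, Reply},
    maps:update_with({Pid, ReplyFrom},
                     fun(T) -> [CorrData | T] end,
                     [CorrData],
                     Notifys).

%% Keep only the groups registered under the given reply-from mode.
for_mode(ReplyFrom, Notifys) ->
    maps:filter(fun({_Pid, Mode}, _Corrs) -> Mode =:= ReplyFrom end,
                Notifys).
```

Keying on the pair keeps the existing batching behaviour per pid while letting each member select only the groups it is responsible for sending.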
