Skip to content

Commit

Permalink
rabbitmq_peer_discovery_consul: Select the node to join
Browse files Browse the repository at this point in the history
[Why]
The default node selection of the peer discovery subsystem doesn't work
well with Consul. The reason is that that selection is based on the
nodes' uptime. However, the node with the highest uptime may not be the
first to register in Consul.

When this happens, the node that registered first will only discover
itself and boot as a standalone node. Then, the node with the highest
uptime will discover both of them, but will select itself as the node to
join because of its uptime. In the end, we end up with two clusters
instead of one.

[How]
We use the `CreateIndex` property in the Consul response to sort
services. We then derive the name of the node to join after the service
that has the lower `CreateIndex`, meaning it was the first to register.
  • Loading branch information
dumbbell committed May 3, 2024
1 parent 81110ef commit e47ca00
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ list_nodes() ->
HttpOpts) of
{ok, Nodes} ->
IncludeWithWarnings = get_config_key(consul_include_nodes_with_warnings, M),
Result = extract_nodes(
filter_nodes(Nodes, IncludeWithWarnings)),
Result = extract_node(
sort_nodes(
filter_nodes(Nodes, IncludeWithWarnings))),
{ok, {Result, disc}};
{error, _} = Error ->
Error
Expand Down Expand Up @@ -276,13 +277,24 @@ filter_nodes(Nodes, Warn) ->
false -> Nodes
end.

-spec extract_nodes(ConsulResult :: [#{binary() => term()}]) -> list().
extract_nodes(Data) -> extract_nodes(Data, []).

-spec extract_nodes(ConsulResult :: [#{binary() => term()}], Nodes :: list())
-> list().
extract_nodes([], Nodes) -> Nodes;
extract_nodes([H | T], Nodes) ->
-spec sort_nodes(ConsulResult :: [#{binary() => term()}]) -> [#{binary() => term()}].
sort_nodes(Nodes) ->
lists:sort(
fun(NodeA, NodeB) ->
IndexA = maps:get(
<<"CreateIndex">>,
maps:get(<<"Service">>, NodeA, #{}), undefined),
IndexB = maps:get(
<<"CreateIndex">>,
maps:get(<<"Service">>, NodeB, #{}), undefined),
%% `undefined' is always greater than an integer, so we are fine here.
IndexA =< IndexB
end, Nodes).

-spec extract_node(ConsulResult :: [#{binary() => term()}]) -> list().
extract_node([]) ->
[];
extract_node([H | _]) ->
Service = maps:get(<<"Service">>, H),
Meta = maps:get(<<"Meta">>, Service, #{}),
NodeName = case Meta of
Expand All @@ -299,7 +311,7 @@ extract_nodes([H | T], Nodes) ->
?UTIL_MODULE:node_name(Address)
end
end,
extract_nodes(T, lists:merge(Nodes, [NodeName])).
NodeName.

-spec maybe_add_acl(QArgs :: list()) -> list().
maybe_add_acl(List) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ list_nodes_return_value_basic_test(_Config) ->
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body))
end),
meck:expect(rabbit_nodes, name_type, fun() -> shortnames end),
?assertEqual({ok, {['rabbit@rabbit1', 'rabbit@rabbit2'], disc}},
?assertEqual({ok, {'rabbit@rabbit2', disc}},
rabbit_peer_discovery_consul:list_nodes()),
?assert(meck:validate(rabbit_peer_discovery_httpc)).

Expand Down Expand Up @@ -388,7 +388,7 @@ list_nodes_return_value_basic_long_node_name_test(_Config) ->
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body))
end),
meck:expect(rabbit_nodes, name_type, fun() -> longnames end),
?assertEqual({ok, {['rabbit@rabbit1.node.consul', 'rabbit@rabbit2.node.consul'], disc}},
?assertEqual({ok, {'rabbit@rabbit2.node.consul', disc}},
rabbit_peer_discovery_consul:list_nodes()),
?assert(meck:validate(rabbit_peer_discovery_httpc)).

Expand Down Expand Up @@ -419,7 +419,7 @@ list_nodes_return_value_long_node_name_and_custom_domain_test(_Config) ->


meck:expect(rabbit_nodes, name_type, fun() -> longnames end),
?assertEqual({ok, {['rabbit@rabbit1.node.internal', 'rabbit@rabbit2.node.internal'], disc}},
?assertEqual({ok, {'rabbit@rabbit2.node.internal', disc}},
rabbit_peer_discovery_consul:list_nodes()),
?assert(meck:validate(rabbit_peer_discovery_httpc)).

Expand All @@ -446,7 +446,7 @@ list_nodes_return_value_srv_address_test(_Config) ->
Body = "[{\"Node\": {\"Node\": \"rabbit2.internal.domain\", \"Address\": \"10.20.16.160\"}, \"Checks\": [{\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq:172.172.16.4.50\", \"Output\": \"\"}, {\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"172.16.4.51\", \"Port\": 5672, \"ID\": \"rabbitmq:172.16.4.51\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}, {\"Node\": {\"Node\": \"rabbit1.internal.domain\", \"Address\": \"10.20.16.159\"}, \"Checks\": [{\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"172.172.16.51\", \"Port\": 5672, \"ID\": \"rabbitmq:172.172.16.51\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}]",
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body))
end),
?assertEqual({ok, {['rabbit@172.16.4.51', 'rabbit@172.172.16.51'], disc}},
?assertEqual({ok, {'rabbit@172.16.4.51', disc}},
rabbit_peer_discovery_consul:list_nodes()),
?assert(meck:validate(rabbit_peer_discovery_httpc)).

Expand Down Expand Up @@ -475,7 +475,7 @@ list_nodes_return_value_nodes_in_warning_state_included_test(_Config) ->
rabbit_json:try_decode(list_of_nodes_without_warnings())
end),
os:putenv("CONSUL_INCLUDE_NODES_WITH_WARNINGS", "true"),
?assertEqual({ok, {['rabbit@172.16.4.51'], disc}},
?assertEqual({ok, {'rabbit@172.16.4.51', disc}},
rabbit_peer_discovery_consul:list_nodes()),
?assert(meck:validate(rabbit_peer_discovery_httpc)).

Expand Down Expand Up @@ -504,7 +504,7 @@ list_nodes_return_value_nodes_in_warning_state_filtered_out_test(_Config) ->
rabbit_json:try_decode(list_of_nodes_without_warnings())
end),
os:putenv("CONSUL_INCLUDE_NODES_WITH_WARNINGS", "false"),
?assertEqual({ok, {['rabbit@172.16.4.51', 'rabbit@172.172.16.51'], disc}},
?assertEqual({ok, {'rabbit@172.16.4.51', disc}},
rabbit_peer_discovery_consul:list_nodes()),
?assert(meck:validate(rabbit_peer_discovery_httpc)).

Expand Down

0 comments on commit e47ca00

Please sign in to comment.