Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close #10345. Add prometheus_rabbitmq_federation_collector. #10721

Merged
merged 2 commits into from Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions deps/rabbitmq_federation/app.bzl
Expand Up @@ -201,6 +201,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
hdrs = ["include/rabbit_federation.hrl"],
app_name = "rabbitmq_federation",
erlc_opts = "//:test_erlc_opts",
visibility = ["//visibility:public"],
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
Expand Down
7 changes: 5 additions & 2 deletions deps/rabbitmq_federation/src/rabbit_federation_status.erl
Expand Up @@ -13,7 +13,7 @@

-export([start_link/0]).

-export([report/4, remove_exchange_or_queue/1, remove/2, status/0, lookup/1]).
-export([report/4, remove_exchange_or_queue/1, remove/2, status/0, status/1, lookup/1]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
Expand Down Expand Up @@ -41,7 +41,10 @@ remove(Upstream, XorQName) ->
gen_server:call(?SERVER, {remove, Upstream, XorQName}, infinity).

status() ->
gen_server:call(?SERVER, status, infinity).
status(infinity).

%% Sends the same `status' request as status/0 to the ?SERVER gen_server, but
%% with a caller-supplied gen_server:call/3 timeout (milliseconds or
%% `infinity'). The server's reply is returned unchanged.
status(Timeout) ->
gen_server:call(?SERVER, status, Timeout).

%% Synchronously sends a {lookup, Id} request to the ?SERVER gen_server and
%% waits indefinitely for the reply, which is returned unchanged.
lookup(Id) ->
gen_server:call(?SERVER, {lookup, Id}, infinity).
Expand Down
11 changes: 11 additions & 0 deletions deps/rabbitmq_prometheus/BUILD.bazel
Expand Up @@ -52,6 +52,7 @@ rabbitmq_app(
priv = [":priv"],
deps = [
"//deps/rabbit:erlang_app",
"//deps/rabbitmq_federation:erlang_app",
"//deps/rabbitmq_management_agent:erlang_app",
"//deps/rabbitmq_web_dispatch:erlang_app",
"@accept//:erlang_app",
Expand Down Expand Up @@ -81,6 +82,7 @@ dialyze(

eunit(
name = "eunit",
compiled_suites = [":rabbitmq_prometheus_collector_test_proxy_beam_files"], #keep
target = ":test_erlang_app",
)

Expand All @@ -97,6 +99,15 @@ rabbitmq_integration_suite(
flaky = True,
)

# Integration suite for prometheus_rabbitmq_federation_collector.
# Besides the suite itself it needs two extra test beams: the federation
# test utilities and the collector test proxy used to invoke collect_mf.
rabbitmq_integration_suite(
name = "prometheus_rabbitmq_federation_collector_SUITE",
size = "small",
additional_beam = [
"//deps/rabbitmq_federation:test/rabbit_federation_test_util.beam", #keep
"test/rabbitmq_prometheus_collector_test_proxy.beam", #keep
],
)

assert_suites()

alias(
Expand Down
20 changes: 20 additions & 0 deletions deps/rabbitmq_prometheus/app.bzl
Expand Up @@ -13,6 +13,7 @@ def all_beam_files(name = "all_beam_files"):
"src/collectors/prometheus_rabbitmq_alarm_metrics_collector.erl",
"src/collectors/prometheus_rabbitmq_core_metrics_collector.erl",
"src/collectors/prometheus_rabbitmq_dynamic_collector.erl",
"src/collectors/prometheus_rabbitmq_federation_collector.erl",
"src/collectors/prometheus_rabbitmq_global_metrics_collector.erl",
"src/rabbit_prometheus_app.erl",
"src/rabbit_prometheus_dispatcher.erl",
Expand Down Expand Up @@ -44,6 +45,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/collectors/prometheus_rabbitmq_alarm_metrics_collector.erl",
"src/collectors/prometheus_rabbitmq_core_metrics_collector.erl",
"src/collectors/prometheus_rabbitmq_dynamic_collector.erl",
"src/collectors/prometheus_rabbitmq_federation_collector.erl",
"src/collectors/prometheus_rabbitmq_global_metrics_collector.erl",
"src/rabbit_prometheus_app.erl",
"src/rabbit_prometheus_dispatcher.erl",
Expand Down Expand Up @@ -86,6 +88,7 @@ def all_srcs(name = "all_srcs"):
"src/collectors/prometheus_rabbitmq_alarm_metrics_collector.erl",
"src/collectors/prometheus_rabbitmq_core_metrics_collector.erl",
"src/collectors/prometheus_rabbitmq_dynamic_collector.erl",
"src/collectors/prometheus_rabbitmq_federation_collector.erl",
"src/collectors/prometheus_rabbitmq_global_metrics_collector.erl",
"src/rabbit_prometheus_app.erl",
"src/rabbit_prometheus_dispatcher.erl",
Expand Down Expand Up @@ -124,3 +127,20 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
"//deps/rabbitmq_ct_helpers:erlang_app",
],
)
erlang_bytecode(
name = "prometheus_rabbitmq_federation_collector_SUITE_beam_files",
testonly = True,
srcs = ["test/prometheus_rabbitmq_federation_collector_SUITE.erl"],
outs = ["test/prometheus_rabbitmq_federation_collector_SUITE.beam"],
app_name = "rabbitmq_prometheus",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "@prometheus//:erlang_app"],
)
erlang_bytecode(
name = "rabbitmq_prometheus_collector_test_proxy_beam_files",
testonly = True,
srcs = ["test/rabbitmq_prometheus_collector_test_proxy.erl"],
outs = ["test/rabbitmq_prometheus_collector_test_proxy.beam"],
app_name = "rabbitmq_prometheus",
erlc_opts = "//:test_erlc_opts",
)
6 changes: 6 additions & 0 deletions deps/rabbitmq_prometheus/metrics.md
Expand Up @@ -258,6 +258,12 @@ These metrics are specific to the stream protocol.
| rabbitmq_raft_log_snapshot_index | Raft log snapshot index |
| rabbitmq_raft_term_total | Current Raft term number |

### Federation

| Metric | Description |
| --- | --- |
| rabbitmq_federation_links | Number of federation links, grouped by link status |

## Telemetry

| Metric | Description |
Expand Down
@@ -0,0 +1,45 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(prometheus_rabbitmq_federation_collector).
-export([deregister_cleanup/1,
collect_mf/2]).

-import(prometheus_model_helpers, [create_mf/4]).

-behaviour(prometheus_collector).

%% Metric families exposed by this collector, as {Name, Type, Help} tuples.
%% The list must not end with a trailing comma: a macro body is only parsed
%% where it is expanded, so `[..., ]' would turn the first use of ?METRICS
%% into a syntax error.
-define(METRICS, [{rabbitmq_federation_links, gauge,
                   "Current number of federation links."}
                 ]).

%% API exports
-export([]).

%%====================================================================
%% Collector API
%%====================================================================

%% @doc prometheus_collector callback. The argument is ignored and `ok' is
%% always returned.
deregister_cleanup(_Registry) ->
    ok.

%% @doc prometheus_collector callback: emits the `rabbitmq_federation_links'
%% gauge with one sample per distinct link status.
%%
%% The link list comes from rabbit_federation_status:status/1, called with a
%% timeout of 500. Each entry is read as a proplist and grouped by its
%% `status' property. The registry argument is ignored; always returns `ok'.
%% NOTE(review): gen_server:call/3 exits the caller when the timeout expires,
%% which would abort this collection - confirm that is acceptable.
collect_mf(_Registry, Callback) ->
    Links = rabbit_federation_status:status(500),
    %% The initial value is 1 because maps:update_with/4 stores it as-is,
    %% without calling the increment fun, the first time a status is seen.
    CountLink = fun(Link, Counts) ->
                        StatusValue = proplists:get_value(status, Link),
                        maps:update_with(StatusValue, fun(N) -> N + 1 end, 1, Counts)
                end,
    CountsByStatus = lists:foldl(CountLink, #{}, Links),
    Samples = [{[{status, Status}], Count}
               || {Status, Count} <- maps:to_list(CountsByStatus)],
    Families = [{rabbitmq_federation_links, gauge,
                 "Current number of federation links.", Samples}],
    lists:foreach(fun(Family) -> add_metric_family(Family, Callback) end,
                  Families),
    ok.

%% Builds a metric family from a {Name, Type, Help, Samples} tuple and passes
%% it to the collector callback. Note that create_mf/4 takes Help before
%% Type, so the tuple order is not the argument order.
add_metric_family({FamilyName, FamilyType, HelpText, Samples}, Callback) ->
    Family = create_mf(FamilyName, HelpText, FamilyType, Samples),
    Callback(Family).

%%====================================================================
%% Private Parts
%%====================================================================
Expand Up @@ -18,6 +18,7 @@ build_dispatcher() ->
prometheus_rabbitmq_global_metrics_collector,
prometheus_rabbitmq_alarm_metrics_collector,
prometheus_rabbitmq_dynamic_collector,
prometheus_rabbitmq_federation_collector,
prometheus_process_collector]),
prometheus_registry:register_collectors('per-object', [
prometheus_vm_system_info_collector,
Expand Down
@@ -0,0 +1,152 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(prometheus_rabbitmq_federation_collector_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("prometheus/include/prometheus_model.hrl").

-compile(export_all).

%% Expected metric family when exactly one link reports status `running'.
-define(ONE_RUNNING_METRIC, #'MetricFamily'{name = <<"rabbitmq_federation_links">>,
help = "Current number of federation links.",
type = 'GAUGE',
metric = [#'Metric'{label = [#'LabelPair'{name = <<"status">>,
value = <<"running">>}],
gauge = #'Gauge'{value = 1}}]}).

%% Expected metric family when two links report status `running'.
-define(TWO_RUNNING_METRIC, #'MetricFamily'{name = <<"rabbitmq_federation_links">>,
help = "Current number of federation links.",
type = 'GAUGE',
metric = [#'Metric'{label = [#'LabelPair'{name = <<"status">>,
value = <<"running">>}],
gauge = #'Gauge'{value = 2}}]}).

%% Expected metric family when one link is `running' and another is still
%% `starting'; the `running' sample is listed first.
-define(ONE_RUNNING_ONE_STARTING_METRIC, #'MetricFamily'{name = <<"rabbitmq_federation_links">>,
help = "Current number of federation links.",
type = 'GAUGE',
metric = [#'Metric'{label = [#'LabelPair'{name = <<"status">>,
value = <<"running">>}],
gauge = #'Gauge'{value = 1}},
#'Metric'{label = [#'LabelPair'{name = <<"status">>,
value = <<"starting">>}],
gauge = #'Gauge'{value = 1}}]}).

-import(rabbit_federation_test_util,
[expect/3, expect_empty/2,
set_upstream/4, clear_upstream/3, set_upstream_set/4,
set_policy/5, clear_policy/3,
set_policy_upstream/5, set_policy_upstreams/4,
no_plugins/1, with_ch/3, q/2, maybe_declare_queue/3, delete_all/2]).

%% Common Test callback: the suite consists of a single test group.
all() ->
    [{group, non_parallel_tests}].

%% Common Test callback: defines the `non_parallel_tests' group, with no
%% group properties, and its test cases.
groups() ->
    Testcases = [single_link_then_second_added,
                 two_links_from_the_start],
    [{non_parallel_tests, [], Testcases}].

%% Common Test callback: suite-wide settings; sets a five-minute timetrap.
suite() ->
    Timetrap = {timetrap, {minutes, 5}},
    [Timetrap].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
%% Logs the environment, sets the node name suffix to the suite module name,
%% then runs the broker, client and federation setup steps in that order.
init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    NodenameSuffix = {rmq_nodename_suffix, ?MODULE},
    SuiteConfig = rabbit_ct_helpers:set_config(Config, [NodenameSuffix]),
    SetupSteps = rabbit_ct_broker_helpers:setup_steps() ++
        rabbit_ct_client_helpers:setup_steps() ++
        [fun rabbit_federation_test_util:setup_federation/1],
    rabbit_ct_helpers:run_setup_steps(SuiteConfig, SetupSteps).
%% Runs the client teardown steps followed by the broker teardown steps.
end_per_suite(Config) ->
    TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
        rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_teardown_steps(Config, TeardownSteps).

%% No per-group setup: the configuration is returned unchanged.
init_per_group(_Group, Config) ->
    Config.

%% No per-group teardown: the configuration is returned unchanged.
end_per_group(_Group, Config) ->
    Config.

%% Delegates to rabbit_ct_helpers:testcase_started/2 and returns its result.
init_per_testcase(TestcaseName, Config) ->
    rabbit_ct_helpers:testcase_started(Config, TestcaseName).

%% Delegates to rabbit_ct_helpers:testcase_finished/2 and returns its result.
end_per_testcase(TestcaseName, Config) ->
    rabbit_ct_helpers:testcase_finished(Config, TestcaseName).

%% -------------------------------------------------------------------
%% Test cases
%% -------------------------------------------------------------------

%% Starts with a single federation link, checks that the collector reports
%% running=1, then declares a second downstream queue and waits until the
%% collector reports running=2. The extra queue is deleted at the end.
single_link_then_second_added(Config) ->
    with_ch(
      Config,
      fun (Ch) ->
              timer:sleep(3000),
              [_OnlyLink] = rabbit_ct_broker_helpers:rpc(Config, 0,
                                                        rabbit_federation_status, status, []),
              [?ONE_RUNNING_METRIC] = get_metrics(Config),
              %% Polls the collector until it reports two running links.
              AwaitTwoRunning =
                  fun () ->
                          rabbit_ct_helpers:eventually(
                            ?_assertEqual([?TWO_RUNNING_METRIC],
                                          get_metrics(Config)),
                            500,
                            5)
                  end,
              maybe_declare_queue(Config, Ch,
                                  q(<<"fed.downstream2">>,
                                    [{<<"x-queue-type">>, longstr, <<"classic">>}])),
              %% This read races against queue.declare. Most of the time the
              %% new link shows up as status=starting, and we then wait a bit
              %% longer for running=2. running=2 can also be seen right away
              %% if the rpc happens to be slow, and a plain running=1 is
              %% possible too if queue.declare is really slow.
              case get_metrics(Config) of
                  [?TWO_RUNNING_METRIC] -> ok;
                  [?ONE_RUNNING_METRIC] -> AwaitTwoRunning();
                  [?ONE_RUNNING_ONE_STARTING_METRIC] -> AwaitTwoRunning()
              end,
              delete_all(Ch, [q(<<"fed.downstream2">>,
                                [{<<"x-queue-type">>, longstr, <<"classic">>}])])
      end, upstream_downstream()).

%% Runs with both downstream queues passed to with_ch/3 up front and checks
%% that the status server knows exactly two links and that the collector
%% reports running=2.
two_links_from_the_start(Config) ->
    Queues = upstream_downstream() ++
        [q(<<"fed.downstream2">>, [{<<"x-queue-type">>, longstr, <<"classic">>}])],
    with_ch(
      Config,
      fun (_Ch) ->
              %% Give the federation links time to come up.
              timer:sleep(3000),
              %% Match a list of exactly two elements. A cons pattern such as
              %% [_ | _] would also accept a single link, because its tail
              %% may be the empty list.
              [_L1, _L2] = rabbit_ct_broker_helpers:rpc(Config, 0,
                                                       rabbit_federation_status, status, []),
              [?TWO_RUNNING_METRIC] = get_metrics(Config)
      end, Queues).

%% -------------------------------------------------------------------
%% Helpers.
%% -------------------------------------------------------------------

%% Returns the two base queue definitions, <<"upstream">> and
%% <<"fed.downstream">>, both built via q/2 with `undefined' arguments.
upstream_downstream() ->
    Upstream = q(<<"upstream">>, undefined),
    Downstream = q(<<"fed.downstream">>, undefined),
    [Upstream, Downstream].

%% Calls rabbitmq_prometheus_collector_test_proxy:collect_mf/2 on node 0 via
%% rpc, for the `default' registry and the federation collector, and returns
%% the rpc result.
get_metrics(Config) ->
    ProxyArgs = [default, prometheus_rabbitmq_federation_collector],
    rabbit_ct_broker_helpers:rpc(Config, 0,
                                 rabbitmq_prometheus_collector_test_proxy,
                                 collect_mf,
                                 ProxyArgs).
@@ -0,0 +1,12 @@
-module(rabbitmq_prometheus_collector_test_proxy).

-export([collect_mf/2]).

-define(PD_KEY, metric_families).

%% Runs Collector:collect_mf/2 and returns the metric families it handed to
%% the callback, in the order they were emitted.
%%
%% The callback has no way to return values to the caller here, so families
%% are accumulated in the process dictionary under ?PD_KEY; the key is
%% removed again before returning.
collect_mf(Registry, Collector) ->
    put(?PD_KEY, []),
    Accumulate = fun(MF) -> put(?PD_KEY, [MF | get(?PD_KEY)]) end,
    Collector:collect_mf(Registry, Accumulate),
    %% erase/1 removes the key and returns the value that was stored; the
    %% list was built by prepending, hence the reverse.
    lists:reverse(erase(?PD_KEY)).
1 change: 1 addition & 0 deletions moduleindex.yaml
Expand Up @@ -1109,6 +1109,7 @@ rabbitmq_prometheus:
- prometheus_rabbitmq_alarm_metrics_collector
- prometheus_rabbitmq_core_metrics_collector
- prometheus_rabbitmq_dynamic_collector
- prometheus_rabbitmq_federation_collector
- prometheus_rabbitmq_global_metrics_collector
- rabbit_prometheus_app
- rabbit_prometheus_dispatcher
Expand Down