Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A new command for Web MQTT connection listing #10693 #9302 #10761

Merged
merged 11 commits into from Mar 19, 2024
10 changes: 9 additions & 1 deletion deps/rabbit/src/rabbit_networking.erl
Expand Up @@ -34,7 +34,8 @@
force_connection_event_refresh/1, force_non_amqp_connection_event_refresh/1,
handshake/2, tcp_host/1,
ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1,
listener_of_protocol/1, stop_ranch_listener_of_protocol/1]).
listener_of_protocol/1, stop_ranch_listener_of_protocol/1,
list_local_connections_of_protocol/1]).

%% Used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1,
Expand Down Expand Up @@ -252,6 +253,13 @@ stop_ranch_listener_of_protocol(Protocol) ->
ranch:stop_listener(Ref)
end.

-spec list_local_connections_of_protocol(atom()) -> [pid()].
list_local_connections_of_protocol(Protocol) ->
case ranch_ref_of_protocol(Protocol) of
undefined -> [];
AcceptorRef -> ranch:procs(AcceptorRef, connections)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function calls
supervisor:which_children/1 which documents:

Notice that calling this function when supervising many children under low memory conditions can cause an out of memory exception.

So, this implementation is dangerous when there are many connections, which is typical for MQTT.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, there isn't a more efficient way to do it in Ranch. Introducing connection tracking tables for Web MQTT would completely change the stope of this PR, which is meant to be a bug fix.

end.

-spec start_tcp_listener(
listener_config(), integer()) -> 'ok' | {'error', term()}.

Expand Down
3 changes: 3 additions & 0 deletions deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl
Expand Up @@ -12,7 +12,10 @@
-define(PERSISTENT_TERM_EXCHANGE, mqtt_exchange).
-define(DEFAULT_MQTT_EXCHANGE, <<"amq.topic">>).
-define(MQTT_GUIDE_URL, <<"https://rabbitmq.com/docs/mqtt/">>).
-define(WEB_MQTT_GUIDE_URL, <<"https://rabbitmq.com/docs/web-mqtt/">>).
ansd marked this conversation as resolved.
Show resolved Hide resolved

-define(MQTT_TCP_PROTOCOL, 'mqtt').
-define(MQTT_TLS_PROTOCOL, 'mqtt/ssl').
-define(MQTT_PROTO_V3, mqtt310).
-define(MQTT_PROTO_V4, mqtt311).
-define(MQTT_PROTO_V5, mqtt50).
Expand Down
9 changes: 8 additions & 1 deletion deps/rabbitmq_mqtt/src/rabbit_mqtt.erl
Expand Up @@ -62,7 +62,7 @@ emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->

-spec emit_connection_info_local(rabbit_types:info_keys(), reference(), pid()) -> ok.
emit_connection_info_local(Items, Ref, AggregatorPid) ->
LocalPids = local_connection_pids(),
LocalPids = list_local_mqtt_connections(),
emit_connection_info(Items, Ref, AggregatorPid, LocalPids).

emit_connection_info(Items, Ref, AggregatorPid, Pids) ->
Expand Down Expand Up @@ -93,6 +93,13 @@ local_connection_pids() ->
end, pg:which_groups(PgScope))
end.

%% This function excludes Web MQTT connections
list_local_mqtt_connections() ->
PlainPids = rabbit_networking:list_local_connections_of_protocol(?MQTT_TCP_PROTOCOL),
TLSPids = rabbit_networking:list_local_connections_of_protocol(?MQTT_TLS_PROTOCOL),
PlainPids ++ TLSPids.


init_global_counters() ->
lists:foreach(fun init_global_counters/1, [?MQTT_PROTO_V3,
?MQTT_PROTO_V4,
Expand Down
15 changes: 6 additions & 9 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl
Expand Up @@ -13,9 +13,6 @@

-export([start_link/2, init/1, stop_listeners/0]).

-define(TCP_PROTOCOL, 'mqtt').
-define(TLS_PROTOCOL, 'mqtt/ssl').

start_link(Listeners, []) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [Listeners]).

Expand Down Expand Up @@ -66,8 +63,8 @@ init([{Listeners, SslListeners0}]) ->

-spec stop_listeners() -> ok.
stop_listeners() ->
_ = rabbit_networking:stop_ranch_listener_of_protocol(?TCP_PROTOCOL),
_ = rabbit_networking:stop_ranch_listener_of_protocol(?TLS_PROTOCOL),
_ = rabbit_networking:stop_ranch_listener_of_protocol(?MQTT_TCP_PROTOCOL),
_ = rabbit_networking:stop_ranch_listener_of_protocol(?MQTT_TLS_PROTOCOL),
ok.

%%
Expand All @@ -86,7 +83,7 @@ tcp_listener_spec([Address, SocketOpts, NumAcceptors, ConcurrentConnsSups]) ->
rabbit_mqtt_listener_sup,
Address,
SocketOpts,
transport(?TCP_PROTOCOL),
transport(?MQTT_TCP_PROTOCOL),
rabbit_mqtt_reader,
[],
mqtt,
Expand All @@ -101,7 +98,7 @@ ssl_listener_spec([Address, SocketOpts, SslOpts, NumAcceptors, ConcurrentConnsSu
rabbit_mqtt_listener_sup,
Address,
SocketOpts ++ SslOpts,
transport(?TLS_PROTOCOL),
transport(?MQTT_TLS_PROTOCOL),
rabbit_mqtt_reader,
[],
'mqtt/ssl',
Expand All @@ -111,7 +108,7 @@ ssl_listener_spec([Address, SocketOpts, SslOpts, NumAcceptors, ConcurrentConnsSu
"MQTT TLS listener"
).

transport(?TCP_PROTOCOL) ->
transport(?MQTT_TCP_PROTOCOL) ->
ranch_tcp;
transport(?TLS_PROTOCOL) ->
transport(?MQTT_TLS_PROTOCOL) ->
ranch_ssl.
25 changes: 24 additions & 1 deletion deps/rabbitmq_mqtt/test/command_SUITE.erl
Expand Up @@ -57,7 +57,23 @@ init_per_group(unit, Config) ->
Config;
init_per_group(Group, Config) ->
Config1 = rabbit_ct_helpers:set_config(Config, {mqtt_version, Group}),
util:maybe_skip_v5(Config1).
case Group of
v4 ->
AllApps = rabbit_ct_broker_helpers:rpc_all(Config1, application, loaded_applications, []),
AllAppNames = lists:map(fun (AppList) ->
lists:map(fun ({Name, _, _}) -> Name end, AppList)
end, AllApps),
case lists:all(fun (NodeApps) ->
lists:member(rabbit_web_mqtt_app, NodeApps)
end, AllAppNames) of
true ->
Config1;
false ->
{skip, "rabbit_web_mqtt_app not available on all nodes"}
end;
v5 ->
util:maybe_skip_v5(Config1)
end.
ansd marked this conversation as resolved.
Show resolved Hide resolved

end_per_group(_, Config) ->
Config.
Expand Down Expand Up @@ -86,6 +102,13 @@ run(Config) ->
%% No connections
[] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),

%% Open a WebMQTT connection, command won't list it
WebMqttConfig = [{websocket, true} | Config],
_C0 = connect(<<"simpleWebMqttClient">>, WebMqttConfig, [{ack_timeout, 1}]),

[] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),

%% Open a connection
C1 = connect(<<"simpleClient">>, Config, [{ack_timeout, 1}]),

timer:sleep(100),
Expand Down
15 changes: 15 additions & 0 deletions deps/rabbitmq_web_mqtt/BUILD.bazel
Expand Up @@ -68,6 +68,9 @@ rabbitmq_app(

xref(
name = "xref",
additional_libs = [
"//deps/rabbitmq_cli:erlang_app", # keep
],
target = ":erlang_app",
)

Expand All @@ -77,6 +80,7 @@ plt(
ignore_warnings = True,
libs = ["//deps/rabbitmq_cli:elixir"], # keep
plt = "//:base_plt",
deps = ["//deps/rabbitmq_cli:erlang_app"], # keep
)

dialyze(
Expand All @@ -91,6 +95,7 @@ eunit(
compiled_suites = [
":test_src_rabbit_ws_test_util_beam",
":test_src_rfc6455_client_beam",
":test_rabbit_web_mqtt_test_util_beam",
],
target = ":test_erlang_app",
)
Expand All @@ -101,6 +106,16 @@ rabbitmq_integration_suite(
name = "config_schema_SUITE",
)

rabbitmq_integration_suite(
name = "command_SUITE",
additional_beam = [
"test/rabbit_web_mqtt_test_util.beam",
],
runtime_deps = [
"@emqtt//:erlang_app",
],
)

rabbitmq_integration_suite(
name = "proxy_protocol_SUITE",
additional_beam = [
Expand Down
22 changes: 22 additions & 0 deletions deps/rabbitmq_web_mqtt/app.bzl
Expand Up @@ -9,6 +9,7 @@ def all_beam_files(name = "all_beam_files"):
erlang_bytecode(
name = "other_beam",
srcs = [
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListWebMqttConnectionsCommand.erl",
"src/rabbit_web_mqtt_app.erl",
"src/rabbit_web_mqtt_handler.erl",
"src/rabbit_web_mqtt_stream_handler.erl",
Expand All @@ -19,6 +20,7 @@ def all_beam_files(name = "all_beam_files"):
erlc_opts = "//:erlc_opts",
deps = [
"//deps/rabbit_common:erlang_app",
"//deps/rabbitmq_cli:erlang_app",
"//deps/rabbitmq_mqtt:erlang_app",
"@cowboy//:erlang_app",
],
Expand All @@ -34,6 +36,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
name = "test_other_beam",
testonly = True,
srcs = [
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListWebMqttConnectionsCommand.erl",
"src/rabbit_web_mqtt_app.erl",
"src/rabbit_web_mqtt_handler.erl",
"src/rabbit_web_mqtt_stream_handler.erl",
Expand All @@ -44,6 +47,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = [
"//deps/rabbit_common:erlang_app",
"//deps/rabbitmq_cli:erlang_app",
"//deps/rabbitmq_mqtt:erlang_app",
"@cowboy//:erlang_app",
],
Expand All @@ -70,6 +74,7 @@ def all_srcs(name = "all_srcs"):
filegroup(
name = "srcs",
srcs = [
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListWebMqttConnectionsCommand.erl",
"src/rabbit_web_mqtt_app.erl",
"src/rabbit_web_mqtt_handler.erl",
"src/rabbit_web_mqtt_stream_handler.erl",
Expand Down Expand Up @@ -128,3 +133,20 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
app_name = "rabbitmq_web_mqtt",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "command_SUITE_beam_files",
testonly = True,
srcs = ["test/command_SUITE.erl"],
outs = ["test/command_SUITE.beam"],
app_name = "rabbitmq_web_mqtt",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_mqtt:erlang_app"],
)
erlang_bytecode(
name = "test_rabbit_web_mqtt_test_util_beam",
testonly = True,
srcs = ["test/rabbit_web_mqtt_test_util.erl"],
outs = ["test/rabbit_web_mqtt_test_util.beam"],
app_name = "rabbitmq_web_mqtt",
erlc_opts = "//:test_erlc_opts",
)
@@ -0,0 +1,86 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListWebMqttConnectionsCommand').

-include_lib("rabbitmq_mqtt/include/rabbit_mqtt.hrl").

-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').

-export([formatter/0,
scopes/0,
switches/0,
aliases/0,
usage/0,
usage_additional/0,
usage_doc_guides/0,
banner/2,
validate/2,
merge_defaults/2,
run/2,
output/2,
description/0,
help_section/0]).

formatter() -> 'Elixir.RabbitMQ.CLI.Formatters.Table'.
scopes() -> [ctl, diagnostics].
switches() -> [{verbose, boolean}].
aliases() -> [{'V', verbose}].

description() -> <<"Lists all Web MQTT connections">>.

help_section() ->
{plugin, web_mqtt}.

validate(Args, _) ->
InfoItems = lists:map(fun atom_to_list/1, ?INFO_ITEMS),
case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args,
InfoItems) of
{ok, _} -> ok;
Error -> Error
end.

merge_defaults([], Opts) ->
merge_defaults([<<"client_id">>, <<"conn_name">>], Opts);
merge_defaults(Args, Opts) ->
{Args, maps:merge(#{verbose => false}, Opts)}.

usage() ->
<<"list_web_mqtt_connections [<column> ...]">>.

usage_additional() ->
Prefix = <<" must be one of ">>,
InfoItems = 'Elixir.Enum':join(lists:usort(?INFO_ITEMS), <<", ">>),
[
{<<"<column>">>, <<Prefix/binary, InfoItems/binary>>}
].

usage_doc_guides() ->
[?WEB_MQTT_GUIDE_URL].

run(Args, #{node := NodeName,
timeout := Timeout,
verbose := Verbose}) ->
InfoKeys = case Verbose of
true -> ?INFO_ITEMS;
false -> 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args)
end,

Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName),

'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(
NodeName,
rabbit_web_mqtt_app,
emit_connection_info_all,
[Nodes, InfoKeys],
Timeout,
InfoKeys,
length(Nodes)).

banner(_, _) -> <<"Listing Web MQTT connections ...">>.

output(Result, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).
40 changes: 24 additions & 16 deletions deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl
Expand Up @@ -12,7 +12,9 @@
start/2,
prep_stop/1,
stop/1,
list_connections/0
list_connections/0,
emit_connection_info_all/4,
emit_connection_info_local/3
]).

%% Dummy supervisor - see Ulf Wiger's comment at
Expand Down Expand Up @@ -48,27 +50,33 @@ init([]) -> {ok, {{one_for_one, 1, 5}, []}}.

-spec list_connections() -> [pid()].
list_connections() ->
PlainPids = connection_pids_of_protocol(?TCP_PROTOCOL),
TLSPids = connection_pids_of_protocol(?TLS_PROTOCOL),
PlainPids = rabbit_networking:list_local_connections_of_protocol(?TCP_PROTOCOL),
TLSPids = rabbit_networking:list_local_connections_of_protocol(?TLS_PROTOCOL),
PlainPids ++ TLSPids.

-spec emit_connection_info_all([node()], rabbit_types:info_keys(), reference(), pid()) -> term().
emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
Pids = [spawn_link(Node, ?MODULE, emit_connection_info_local,
[Items, Ref, AggregatorPid])
|| Node <- Nodes],

rabbit_control_misc:await_emitters_termination(Pids).

-spec emit_connection_info_local(rabbit_types:info_keys(), reference(), pid()) -> ok.
emit_connection_info_local(Items, Ref, AggregatorPid) ->
LocalPids = list_connections(),
emit_connection_info(Items, Ref, AggregatorPid, LocalPids).

emit_connection_info(Items, Ref, AggregatorPid, Pids) ->
rabbit_control_misc:emitting_map_with_exit_handler(
AggregatorPid, Ref,
fun(Pid) ->
rabbit_web_mqtt_handler:info(Pid, Items)
end, Pids).
%%
%% Implementation
%%

connection_pids_of_protocol(Protocol) ->
case rabbit_networking:ranch_ref_of_protocol(Protocol) of
undefined -> [];
AcceptorRef ->
lists:map(fun cowboy_ws_connection_pid/1, ranch:procs(AcceptorRef, connections))
end.

-spec cowboy_ws_connection_pid(pid()) -> pid().
cowboy_ws_connection_pid(RanchConnPid) ->
Children = supervisor:which_children(RanchConnPid),
{cowboy_clear, Pid, _, _} = lists:keyfind(cowboy_clear, 1, Children),
Pid.

mqtt_init() ->
CowboyOpts0 = maps:from_list(get_env(cowboy_opts, [])),
CowboyWsOpts = maps:from_list(get_env(cowboy_ws_opts, [])),
Expand Down