Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent not_found on list web mqtt connections #10693

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion deps/rabbit/src/rabbit_networking.erl
Expand Up @@ -34,7 +34,8 @@
force_connection_event_refresh/1, force_non_amqp_connection_event_refresh/1,
handshake/2, tcp_host/1,
ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1,
listener_of_protocol/1, stop_ranch_listener_of_protocol/1]).
listener_of_protocol/1, stop_ranch_listener_of_protocol/1,
list_local_connections_of_protocol/1]).

%% Used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1,
Expand Down Expand Up @@ -252,6 +253,13 @@ stop_ranch_listener_of_protocol(Protocol) ->
ranch:stop_listener(Ref)
end.

-spec list_local_connections_of_protocol(atom()) -> [pid()].
list_local_connections_of_protocol(Protocol) ->
case ranch_ref_of_protocol(Protocol) of
undefined -> [];
AcceptorRef -> ranch:procs(AcceptorRef, connections)
end.

-spec start_tcp_listener(
listener_config(), integer()) -> 'ok' | {'error', term()}.

Expand Down
3 changes: 3 additions & 0 deletions deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl
Expand Up @@ -12,7 +12,10 @@
-define(PERSISTENT_TERM_EXCHANGE, mqtt_exchange).
-define(DEFAULT_MQTT_EXCHANGE, <<"amq.topic">>).
-define(MQTT_GUIDE_URL, <<"https://rabbitmq.com/docs/mqtt/">>).
-define(WEB_MQTT_GUIDE_URL, <<"https://rabbitmq.com/docs/web-mqtt/">>).

-define(MQTT_TCP_PROTOCOL, 'mqtt').
-define(MQTT_TLS_PROTOCOL, 'mqtt/ssl').
-define(MQTT_PROTO_V3, mqtt310).
-define(MQTT_PROTO_V4, mqtt311).
-define(MQTT_PROTO_V5, mqtt50).
Expand Down
9 changes: 8 additions & 1 deletion deps/rabbitmq_mqtt/src/rabbit_mqtt.erl
Expand Up @@ -62,7 +62,7 @@ emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->

-spec emit_connection_info_local(rabbit_types:info_keys(), reference(), pid()) -> ok.
emit_connection_info_local(Items, Ref, AggregatorPid) ->
LocalPids = local_connection_pids(),
LocalPids = list_local_mqtt_connections(),
emit_connection_info(Items, Ref, AggregatorPid, LocalPids).

emit_connection_info(Items, Ref, AggregatorPid, Pids) ->
Expand Down Expand Up @@ -93,6 +93,13 @@ local_connection_pids() ->
end, pg:which_groups(PgScope))
end.

%% This function excludes Web MQTT connections
list_local_mqtt_connections() ->
PlainPids = rabbit_networking:list_local_connections_of_protocol(?MQTT_TCP_PROTOCOL),
TLSPids = rabbit_networking:list_local_connections_of_protocol(?MQTT_TLS_PROTOCOL),
PlainPids ++ TLSPids.


init_global_counters() ->
lists:foreach(fun init_global_counters/1, [?MQTT_PROTO_V3,
?MQTT_PROTO_V4,
Expand Down
15 changes: 6 additions & 9 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl
Expand Up @@ -13,9 +13,6 @@

-export([start_link/2, init/1, stop_listeners/0]).

-define(TCP_PROTOCOL, 'mqtt').
-define(TLS_PROTOCOL, 'mqtt/ssl').

start_link(Listeners, []) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [Listeners]).

Expand Down Expand Up @@ -66,8 +63,8 @@ init([{Listeners, SslListeners0}]) ->

-spec stop_listeners() -> ok.
stop_listeners() ->
_ = rabbit_networking:stop_ranch_listener_of_protocol(?TCP_PROTOCOL),
_ = rabbit_networking:stop_ranch_listener_of_protocol(?TLS_PROTOCOL),
_ = rabbit_networking:stop_ranch_listener_of_protocol(?MQTT_TCP_PROTOCOL),
_ = rabbit_networking:stop_ranch_listener_of_protocol(?MQTT_TLS_PROTOCOL),
ok.

%%
Expand All @@ -86,7 +83,7 @@ tcp_listener_spec([Address, SocketOpts, NumAcceptors, ConcurrentConnsSups]) ->
rabbit_mqtt_listener_sup,
Address,
SocketOpts,
transport(?TCP_PROTOCOL),
transport(?MQTT_TCP_PROTOCOL),
rabbit_mqtt_reader,
[],
mqtt,
Expand All @@ -101,7 +98,7 @@ ssl_listener_spec([Address, SocketOpts, SslOpts, NumAcceptors, ConcurrentConnsSu
rabbit_mqtt_listener_sup,
Address,
SocketOpts ++ SslOpts,
transport(?TLS_PROTOCOL),
transport(?MQTT_TLS_PROTOCOL),
rabbit_mqtt_reader,
[],
'mqtt/ssl',
Expand All @@ -111,7 +108,7 @@ ssl_listener_spec([Address, SocketOpts, SslOpts, NumAcceptors, ConcurrentConnsSu
"MQTT TLS listener"
).

transport(?TCP_PROTOCOL) ->
transport(?MQTT_TCP_PROTOCOL) ->
ranch_tcp;
transport(?TLS_PROTOCOL) ->
transport(?MQTT_TLS_PROTOCOL) ->
ranch_ssl.
7 changes: 7 additions & 0 deletions deps/rabbitmq_mqtt/test/command_SUITE.erl
Expand Up @@ -86,6 +86,13 @@ run(Config) ->
%% No connections
[] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),

%% Open a WebMQTT connection, command won't list it
WebMqttConfig = [{websocket, true} | Config],
_C0 = connect(<<"simpleWebMqttClient">>, WebMqttConfig, [{ack_timeout, 1}]),

[] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),

%% Open a connection
C1 = connect(<<"simpleClient">>, Config, [{ack_timeout, 1}]),

timer:sleep(100),
Expand Down
@@ -0,0 +1,86 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListWebMqttConnectionsCommand').

-include_lib("rabbitmq_mqtt/include/rabbit_mqtt.hrl").

-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').

-export([formatter/0,
scopes/0,
switches/0,
aliases/0,
usage/0,
usage_additional/0,
usage_doc_guides/0,
banner/2,
validate/2,
merge_defaults/2,
run/2,
output/2,
description/0,
help_section/0]).

formatter() -> 'Elixir.RabbitMQ.CLI.Formatters.Table'.
scopes() -> [ctl, diagnostics].
switches() -> [{verbose, boolean}].
aliases() -> [{'V', verbose}].

description() -> <<"Lists all Web MQTT connections">>.

help_section() ->
{plugin, web_mqtt}.

validate(Args, _) ->
InfoItems = lists:map(fun atom_to_list/1, ?INFO_ITEMS),
case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args,
InfoItems) of
{ok, _} -> ok;
Error -> Error
end.

merge_defaults([], Opts) ->
merge_defaults([<<"client_id">>, <<"conn_name">>], Opts);
merge_defaults(Args, Opts) ->
{Args, maps:merge(#{verbose => false}, Opts)}.

usage() ->
<<"list_web_mqtt_connections [<column> ...]">>.

usage_additional() ->
Prefix = <<" must be one of ">>,
InfoItems = 'Elixir.Enum':join(lists:usort(?INFO_ITEMS), <<", ">>),
[
{<<"<column>">>, <<Prefix/binary, InfoItems/binary>>}
].

usage_doc_guides() ->
[?WEB_MQTT_GUIDE_URL].

run(Args, #{node := NodeName,
timeout := Timeout,
verbose := Verbose}) ->
InfoKeys = case Verbose of
true -> ?INFO_ITEMS;
false -> 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args)
end,

Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName),

'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(
NodeName,
rabbit_web_mqtt_app,
emit_connection_info_all,
[Nodes, InfoKeys],
Timeout,
InfoKeys,
length(Nodes)).

banner(_, _) -> <<"Listing Web MQTT connections ...">>.

output(Result, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).
40 changes: 24 additions & 16 deletions deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl
Expand Up @@ -12,7 +12,9 @@
start/2,
prep_stop/1,
stop/1,
list_connections/0
list_connections/0,
emit_connection_info_all/4,
emit_connection_info_local/3
]).

%% Dummy supervisor - see Ulf Wiger's comment at
Expand Down Expand Up @@ -48,27 +50,33 @@ init([]) -> {ok, {{one_for_one, 1, 5}, []}}.

-spec list_connections() -> [pid()].
list_connections() ->
PlainPids = connection_pids_of_protocol(?TCP_PROTOCOL),
TLSPids = connection_pids_of_protocol(?TLS_PROTOCOL),
PlainPids = rabbit_networking:list_local_connections_of_protocol(?TCP_PROTOCOL),
TLSPids = rabbit_networking:list_local_connections_of_protocol(?TLS_PROTOCOL),
PlainPids ++ TLSPids.

-spec emit_connection_info_all([node()], rabbit_types:info_keys(), reference(), pid()) -> term().
emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
Pids = [spawn_link(Node, ?MODULE, emit_connection_info_local,
[Items, Ref, AggregatorPid])
|| Node <- Nodes],

rabbit_control_misc:await_emitters_termination(Pids).

-spec emit_connection_info_local(rabbit_types:info_keys(), reference(), pid()) -> ok.
emit_connection_info_local(Items, Ref, AggregatorPid) ->
LocalPids = list_connections(),
emit_connection_info(Items, Ref, AggregatorPid, LocalPids).

emit_connection_info(Items, Ref, AggregatorPid, Pids) ->
rabbit_control_misc:emitting_map_with_exit_handler(
AggregatorPid, Ref,
fun(Pid) ->
rabbit_web_mqtt_handler:info(Pid, Items)
end, Pids).
%%
%% Implementation
%%

connection_pids_of_protocol(Protocol) ->
case rabbit_networking:ranch_ref_of_protocol(Protocol) of
undefined -> [];
AcceptorRef ->
lists:map(fun cowboy_ws_connection_pid/1, ranch:procs(AcceptorRef, connections))
end.

-spec cowboy_ws_connection_pid(pid()) -> pid().
cowboy_ws_connection_pid(RanchConnPid) ->
Children = supervisor:which_children(RanchConnPid),
{cowboy_clear, Pid, _, _} = lists:keyfind(cowboy_clear, 1, Children),
Pid.

mqtt_init() ->
CowboyOpts0 = maps:from_list(get_env(cowboy_opts, [])),
CowboyWsOpts = maps:from_list(get_env(cowboy_ws_opts, [])),
Expand Down
18 changes: 18 additions & 0 deletions deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
Expand Up @@ -23,6 +23,7 @@
]).

-export([conserve_resources/3]).
-export([info/2]).

%% cowboy_sub_protocol
-export([upgrade/4,
Expand Down Expand Up @@ -94,6 +95,19 @@ init(Req, Opts) ->
end
end.

%% We cannot use a gen_server call, because the handler process is a
%% special cowboy_websocket process (not a gen_server) which assumes
%% all gen_server calls are supervisor calls, and does not pass on the
%% request to this callback module. (see cowboy_websocket:loop/3 and
%% cowboy_children:handle_supervisor_call/4) However using a generic
%% gen:call with a special label ?MODULE works fine.
-spec info(pid(), rabbit_types:info_keys()) ->
rabbit_types:infos().
info(Pid, all) ->
info(Pid, ?INFO_ITEMS);
info(Pid, Items) ->
{ok, Res} = gen:call(Pid, ?MODULE, {info, Items}),
Res.
-spec websocket_init(state()) ->
{cowboy_websocket:commands(), state()} |
{cowboy_websocket:commands(), state(), hibernate}.
Expand Down Expand Up @@ -244,6 +258,10 @@ websocket_info(connection_created, State) ->
rabbit_core_metrics:connection_created(self(), Infos),
rabbit_event:notify(connection_created, Infos),
{[], State, hibernate};
websocket_info({?MODULE, From, {info, Items}}, State) ->
Infos = infos(Items, State),
gen:reply(From, Infos),
{[], State, hibernate};
websocket_info(Msg, State) ->
?LOG_WARNING("Web MQTT: unexpected message ~tp", [Msg]),
{[], State, hibernate}.
Expand Down