Skip to content

Commit

Permalink
Prevent consulting Web MQTT cons pids for MQTT
Browse files Browse the repository at this point in the history
Discussion #9302
Previous commit tried to do the same as this one but changed an exported
function so current commit provides and makes use of a new internal
function listing only MQTT connections.
  • Loading branch information
LoisSotoLopez committed Mar 8, 2024
1 parent f48d518 commit d8fd771
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 20 deletions.
10 changes: 9 additions & 1 deletion deps/rabbit/src/rabbit_networking.erl
Expand Up @@ -34,7 +34,8 @@
force_connection_event_refresh/1, force_non_amqp_connection_event_refresh/1,
handshake/2, tcp_host/1,
ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1,
listener_of_protocol/1, stop_ranch_listener_of_protocol/1]).
listener_of_protocol/1, stop_ranch_listener_of_protocol/1,
list_local_connections_of_protocol/1]).

%% Used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1,
Expand Down Expand Up @@ -252,6 +253,13 @@ stop_ranch_listener_of_protocol(Protocol) ->
ranch:stop_listener(Ref)
end.

-spec list_local_connections_of_protocol(atom()) -> [pid()].
list_local_connections_of_protocol(Protocol) ->
case ranch_ref_of_protocol(Protocol) of
undefined -> [];
AcceptorRef -> ranch:procs(AcceptorRef, connections)
end.

-spec start_tcp_listener(
listener_config(), integer()) -> 'ok' | {'error', term()}.

Expand Down
5 changes: 4 additions & 1 deletion deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl
Expand Up @@ -11,8 +11,11 @@
-define(PERSISTENT_TERM_MAILBOX_SOFT_LIMIT, mqtt_mailbox_soft_limit).
-define(PERSISTENT_TERM_EXCHANGE, mqtt_exchange).
-define(DEFAULT_MQTT_EXCHANGE, <<"amq.topic">>).
-define(MQTT_GUIDE_URL, <<"https://rabbitmq.com/mqtt.html">>).
-define(MQTT_GUIDE_URL, <<"https://rabbitmq.com/docs/mqtt">>).
-define(WEB_MQTT_GUIDE_URL, <<"https://rabbitmq.com/docs/web-mqtt">>).

-define(MQTT_TCP_PROTOCOL, 'mqtt').
-define(MQTT_TLS_PROTOCOL, 'mqtt/ssl').
-define(MQTT_PROTO_V3, mqtt310).
-define(MQTT_PROTO_V4, mqtt311).
-define(MQTT_PROTO_V5, mqtt50).
Expand Down
Expand Up @@ -71,15 +71,14 @@ run(Args, #{node := NodeName,

Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName),

AllItemsEnum = 'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(
'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(
NodeName,
rabbit_mqtt,
emit_connection_info_all,
[Nodes, InfoKeys],
Timeout,
InfoKeys,
length(Nodes)),
'Elixir.Enum':to_list(AllItemsEnum).
length(Nodes)).


banner(_, _) -> <<"Listing MQTT connections ...">>.
Expand Down
17 changes: 12 additions & 5 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt.erl
Expand Up @@ -62,7 +62,7 @@ emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->

-spec emit_connection_info_local(rabbit_types:info_keys(), reference(), pid()) -> ok.
emit_connection_info_local(Items, Ref, AggregatorPid) ->
LocalPids = local_connection_pids(),
LocalPids = list_local_mqtt_connections(),
emit_connection_info(Items, Ref, AggregatorPid, LocalPids).

emit_connection_info(Items, Ref, AggregatorPid, Pids) ->
Expand All @@ -87,12 +87,19 @@ local_connection_pids() ->
AllPids = rabbit_mqtt_collector:list_pids(),
lists:filter(fun(Pid) -> node(Pid) =:= node() end, AllPids);
false ->
case rabbit_networking:ranch_ref_of_protocol('mqtt') of
undefined -> [];
AcceptorRef -> ranch:procs(AcceptorRef, connections)
end
PgScope = persistent_term:get(?PG_SCOPE),
lists:flatmap(fun(Group) ->
pg:get_local_members(PgScope, Group)
end, pg:which_groups(PgScope))
end.

%% This function excludes Web MQTT connections
list_local_mqtt_connections() ->
PlainPids = rabbit_networking:list_local_connections_of_protocol(?MQTT_TCP_PROTOCOL),
TLSPids = rabbit_networking:list_local_connections_of_protocol(?MQTT_TLS_PROTOCOL),
PlainPids ++ TLSPids.


init_global_counters() ->
lists:foreach(fun init_global_counters/1, [?MQTT_PROTO_V3,
?MQTT_PROTO_V4,
Expand Down
15 changes: 6 additions & 9 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl
Expand Up @@ -13,9 +13,6 @@

-export([start_link/2, init/1, stop_listeners/0]).

-define(TCP_PROTOCOL, 'mqtt').
-define(TLS_PROTOCOL, 'mqtt/ssl').

start_link(Listeners, []) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [Listeners]).

Expand Down Expand Up @@ -66,8 +63,8 @@ init([{Listeners, SslListeners0}]) ->

-spec stop_listeners() -> ok.
stop_listeners() ->
_ = rabbit_networking:stop_ranch_listener_of_protocol(?TCP_PROTOCOL),
_ = rabbit_networking:stop_ranch_listener_of_protocol(?TLS_PROTOCOL),
_ = rabbit_networking:stop_ranch_listener_of_protocol(?MQTT_TCP_PROTOCOL),
_ = rabbit_networking:stop_ranch_listener_of_protocol(?MQTT_TLS_PROTOCOL),
ok.

%%
Expand All @@ -86,7 +83,7 @@ tcp_listener_spec([Address, SocketOpts, NumAcceptors, ConcurrentConnsSups]) ->
rabbit_mqtt_listener_sup,
Address,
SocketOpts,
transport(?TCP_PROTOCOL),
transport(?MQTT_TCP_PROTOCOL),
rabbit_mqtt_reader,
[],
mqtt,
Expand All @@ -101,7 +98,7 @@ ssl_listener_spec([Address, SocketOpts, SslOpts, NumAcceptors, ConcurrentConnsSu
rabbit_mqtt_listener_sup,
Address,
SocketOpts ++ SslOpts,
transport(?TLS_PROTOCOL),
transport(?MQTT_TLS_PROTOCOL),
rabbit_mqtt_reader,
[],
'mqtt/ssl',
Expand All @@ -111,7 +108,7 @@ ssl_listener_spec([Address, SocketOpts, SslOpts, NumAcceptors, ConcurrentConnsSu
"MQTT TLS listener"
).

transport(?TCP_PROTOCOL) ->
transport(?MQTT_TCP_PROTOCOL) ->
ranch_tcp;
transport(?TLS_PROTOCOL) ->
transport(?MQTT_TLS_PROTOCOL) ->
ranch_ssl.
Expand Up @@ -59,7 +59,7 @@ usage_additional() ->
].

usage_doc_guides() ->
[?MQTT_GUIDE_URL].
[?WEB_MQTT_GUIDE_URL].

run(Args, #{node := NodeName,
timeout := Timeout,
Expand Down

0 comments on commit d8fd771

Please sign in to comment.