forked from rabbitmq/rabbitmq-server
/
rabbit_mqtt_sup.erl
114 lines (103 loc) · 3.5 KB
/
rabbit_mqtt_sup.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_mqtt_sup).
-behaviour(supervisor).
-include_lib("rabbit_common/include/rabbit.hrl").
-include("rabbit_mqtt.hrl").
-export([start_link/2, init/1, stop_listeners/0]).
%% Starts the MQTT supervisor and registers it locally under the module name.
%%
%% Only an empty list is accepted as the second argument; any other value
%% fails with function_clause. Listeners is handed to init/1 wrapped in a
%% single-element argument list.
start_link(Listeners, []) ->
    SupName = {local, ?MODULE},
    InitArgs = [Listeners],
    supervisor:start_link(SupName, ?MODULE, InitArgs).
%% supervisor callback.
%%
%% Expects a single-element list holding a {Listeners, SslListeners} pair.
%% Reads the acceptor / connection-supervisor counts and the TCP listen
%% options from the application environment, then returns a one_for_all
%% supervisor (at most 10 restarts within 10 seconds) whose children are:
%%   * a pg scope process dedicated to this node,
%%   * rabbit_mqtt_retainer_sup,
%%   * one listener child per address of every plain TCP listener,
%%   * one listener child per address of every TLS listener.
init([{Listeners, SslListeners0}]) ->
    NumTcpAcceptors = application:get_env(?APP_NAME, num_tcp_acceptors, 10),
    ConcurrentConnsSups = application:get_env(?APP_NAME, num_conns_sups, 1),
    %% Assertive match: crash right here if tcp_listen_options is not set.
    {ok, SocketOpts} = application:get_env(?APP_NAME, tcp_listen_options),
    %% Without TLS listeners rabbit_networking:ensure_ssl/0 is not called at
    %% all. The 'none' placeholder is never used in that case because
    %% listener_specs/3 produces no specs for an empty listener list.
    {SslOpts, NumSslAcceptors, SslListeners} =
        case SslListeners0 of
            [] ->
                {none, 0, []};
            _ ->
                {rabbit_networking:ensure_ssl(),
                 application:get_env(?APP_NAME, num_ssl_acceptors, 10),
                 SslListeners0}
        end,
    %% Use separate process group scope per RabbitMQ node. This achieves a local-only
    %% process group which requires less memory with millions of connections.
    %%
    %% io_lib:format/2 is specified to return chars(), which may be a deep
    %% list, whereas list_to_atom/1 only accepts a flat string. Flatten
    %% explicitly instead of relying on the formatter's internal behaviour.
    PgScopeName = lists:flatten(io_lib:format("~s_~s", [?PG_SCOPE, node()])),
    PgScope = list_to_atom(PgScopeName),
    %% Publish the generated scope name under the ?PG_SCOPE key.
    persistent_term:put(?PG_SCOPE, PgScope),
    SupFlags = #{strategy => one_for_all,
                 intensity => 10,
                 period => 10},
    PgSpec = #{id => PgScope,
               start => {pg, start_link, [PgScope]},
               restart => transient,
               shutdown => ?WORKER_WAIT,
               type => worker},
    RetainerSupSpec = #{id => rabbit_mqtt_retainer_sup,
                        start => {rabbit_mqtt_retainer_sup, start_link, []},
                        restart => transient,
                        shutdown => ?SUPERVISOR_WAIT,
                        type => supervisor},
    TcpSpecs = listener_specs(
                 fun tcp_listener_spec/1,
                 [SocketOpts, NumTcpAcceptors, ConcurrentConnsSups],
                 Listeners),
    TlsSpecs = listener_specs(
                 fun ssl_listener_spec/1,
                 [SocketOpts, SslOpts, NumSslAcceptors, ConcurrentConnsSups],
                 SslListeners),
    {ok, {SupFlags, [PgSpec, RetainerSupSpec | TcpSpecs ++ TlsSpecs]}}.
%% Asks rabbit_networking to stop the ranch listeners of the MQTT TCP
%% protocol and then of the MQTT TLS protocol. The individual results are
%% deliberately ignored; the function always returns ok unless one of the
%% calls raises.
-spec stop_listeners() -> ok.
stop_listeners() ->
    lists:foreach(
      fun(Protocol) ->
              _ = rabbit_networking:stop_ranch_listener_of_protocol(Protocol),
              ok
      end,
      [?MQTT_TCP_PROTOCOL, ?MQTT_TLS_PROTOCOL]).
%%
%% Implementation
%%
%% Builds one spec per listener address.
%%
%% Every listener is expanded with rabbit_networking:tcp_listener_addresses/1;
%% for each resulting address SpecFun is called with a single list argument
%% consisting of that address followed by ExtraArgs. Results are returned in
%% listener order, then address order. An empty listener list yields [].
listener_specs(SpecFun, ExtraArgs, Listeners) ->
    lists:flatmap(
      fun(Listener) ->
              Addresses = rabbit_networking:tcp_listener_addresses(Listener),
              [SpecFun([Addr | ExtraArgs]) || Addr <- Addresses]
      end,
      Listeners).
%% Builds the listener spec for one plain TCP MQTT address by delegating to
%% rabbit_networking:tcp_listener_spec/11.
%%
%% Takes a single list argument (the shape produced by listener_specs/3):
%% [Address, SocketOpts, NumAcceptors, ConcurrentConnsSups].
tcp_listener_spec([Address, SocketOpts, NumAcceptors, ConcurrentConnsSups]) ->
    rabbit_networking:tcp_listener_spec(
      rabbit_mqtt_listener_sup,
      Address,
      SocketOpts,
      transport(?MQTT_TCP_PROTOCOL),
      rabbit_mqtt_reader,
      [],
      %% Use the shared macro rather than a literal atom so the protocol
      %% passed here is always the one stop_listeners/0 hands to
      %% rabbit_networking:stop_ranch_listener_of_protocol/1.
      ?MQTT_TCP_PROTOCOL,
      NumAcceptors,
      ConcurrentConnsSups,
      worker,
      "MQTT TCP listener"
     ).
%% Builds the listener spec for one TLS MQTT address by delegating to
%% rabbit_networking:tcp_listener_spec/11.
%%
%% Takes a single list argument (the shape produced by listener_specs/3):
%% [Address, SocketOpts, SslOpts, NumAcceptors, ConcurrentConnsSups].
%% SslOpts must be a list: it is appended to SocketOpts to form the socket
%% options of the listener.
ssl_listener_spec([Address, SocketOpts, SslOpts, NumAcceptors, ConcurrentConnsSups]) ->
    rabbit_networking:tcp_listener_spec(
      rabbit_mqtt_listener_sup,
      Address,
      SocketOpts ++ SslOpts,
      transport(?MQTT_TLS_PROTOCOL),
      rabbit_mqtt_reader,
      [],
      %% Use the shared macro rather than a literal atom so the protocol
      %% passed here is always the one stop_listeners/0 hands to
      %% rabbit_networking:stop_ranch_listener_of_protocol/1.
      ?MQTT_TLS_PROTOCOL,
      NumAcceptors,
      ConcurrentConnsSups,
      worker,
      "MQTT TLS listener"
     ).
%% Maps an MQTT protocol identifier to the ranch transport module used for
%% it: ranch_tcp for the TCP protocol, ranch_ssl for the TLS protocol.
%% Any other value fails with function_clause.
transport(Protocol) when Protocol =:= ?MQTT_TCP_PROTOCOL ->
    ranch_tcp;
transport(Protocol) when Protocol =:= ?MQTT_TLS_PROTOCOL ->
    ranch_ssl.