Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally queue outgoing data #3844

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.28"}}},
{if_var_true, stun,
{stun, ".*", {git, "https://github.com/processone/stun", {tag, "1.2.2"}}}},
{xmpp, ".*", {git, "https://github.com/processone/xmpp", {tag, "1.5.8"}}},
{xmpp, ".*", {git, "https://github.com/weiss/xmpp", {branch, "feature/send-queue"}}},
{yconf, ".*", {git, "https://github.com/processone/yconf", {tag, "1.0.13"}}}
]}.

Expand Down
17 changes: 16 additions & 1 deletion src/ejabberd_c2s.erl
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,18 @@ init([State, Opts]) ->
TLSVerify = proplists:get_bool(tls_verify, Opts),
Zlib = proplists:get_bool(zlib, Opts),
Timeout = ejabberd_option:negotiation_timeout(),
MaxQSize = case ejabberd_option:c2s_max_send_queue_size() of
undefined ->
proplists:get_value(max_send_queue_size, Opts, 10);
C2SMaxQSize ->
C2SMaxQSize
end,
MaxQDelay = case ejabberd_option:c2s_max_send_queue_delay() of
undefined ->
proplists:get_value(max_send_queue_delay, Opts, 0);
C2SMaxQDelay ->
C2SMaxQDelay
end,
State1 = State#{tls_options => TLSOpts2,
tls_required => TLSRequired,
tls_enabled => TLSEnabled,
Expand All @@ -567,7 +579,8 @@ init([State, Opts]) ->
access => Access,
shaper => Shaper},
State2 = xmpp_stream_in:set_timeout(State1, Timeout),
ejabberd_hooks:run_fold(c2s_init, {ok, State2}, [Opts]).
State3 = xmpp_stream_in:configure_queue(State2, MaxQSize, MaxQDelay),
ejabberd_hooks:run_fold(c2s_init, {ok, State3}, [Opts]).

handle_call(get_presence, From, #{jid := JID} = State) ->
Pres = case maps:get(pres_last, State, error) of
Expand Down Expand Up @@ -1022,4 +1035,6 @@ listen_options() ->
{tls_verify, false},
{zlib, false},
{max_stanza_size, infinity},
{max_send_queue_size, 10},
{max_send_queue_delay, 0},
{max_fsm_queue, 10000}].
4 changes: 4 additions & 0 deletions src/ejabberd_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,10 @@ listen_opt_type(tls) ->
econf:bool();
listen_opt_type(max_stanza_size) ->
econf:pos_int(infinity);
listen_opt_type(max_send_queue_size) ->
econf:non_neg_int();
listen_opt_type(max_send_queue_delay) ->
econf:non_neg_int();
listen_opt_type(max_fsm_queue) ->
econf:pos_int();
listen_opt_type(send_timeout) ->
Expand Down
26 changes: 26 additions & 0 deletions src/ejabberd_option.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
-export([c2s_cafile/0, c2s_cafile/1]).
-export([c2s_ciphers/0, c2s_ciphers/1]).
-export([c2s_dhfile/0, c2s_dhfile/1]).
-export([c2s_max_send_queue_delay/0]).
-export([c2s_max_send_queue_size/0]).
-export([c2s_protocol_options/0, c2s_protocol_options/1]).
-export([c2s_tls_compression/0, c2s_tls_compression/1]).
-export([ca_file/0]).
Expand Down Expand Up @@ -124,6 +126,8 @@
-export([s2s_dns_retries/0, s2s_dns_retries/1]).
-export([s2s_dns_timeout/0, s2s_dns_timeout/1]).
-export([s2s_max_retry_delay/0]).
-export([s2s_max_send_queue_delay/0, s2s_max_send_queue_delay/1]).
-export([s2s_max_send_queue_size/0, s2s_max_send_queue_size/1]).
-export([s2s_protocol_options/0, s2s_protocol_options/1]).
-export([s2s_queue_type/0, s2s_queue_type/1]).
-export([s2s_timeout/0, s2s_timeout/1]).
Expand Down Expand Up @@ -275,6 +279,14 @@ c2s_dhfile() ->
c2s_dhfile(Host) ->
ejabberd_config:get_option({c2s_dhfile, Host}).

-spec c2s_max_send_queue_delay() -> 'undefined' | non_neg_integer().
c2s_max_send_queue_delay() ->
ejabberd_config:get_option({c2s_max_send_queue_delay, global}).

-spec c2s_max_send_queue_size() -> 'undefined' | non_neg_integer().
c2s_max_send_queue_size() ->
ejabberd_config:get_option({c2s_max_send_queue_size, global}).

-spec c2s_protocol_options() -> 'undefined' | binary().
c2s_protocol_options() ->
c2s_protocol_options(global).
Expand Down Expand Up @@ -851,6 +863,20 @@ s2s_dns_timeout(Host) ->
s2s_max_retry_delay() ->
ejabberd_config:get_option({s2s_max_retry_delay, global}).

-spec s2s_max_send_queue_delay() -> 'undefined' | non_neg_integer().
s2s_max_send_queue_delay() ->
s2s_max_send_queue_delay(global).
-spec s2s_max_send_queue_delay(global | binary()) -> 'undefined' | non_neg_integer().
s2s_max_send_queue_delay(Host) ->
ejabberd_config:get_option({s2s_max_send_queue_delay, Host}).

-spec s2s_max_send_queue_size() -> 'undefined' | non_neg_integer().
s2s_max_send_queue_size() ->
s2s_max_send_queue_size(global).
-spec s2s_max_send_queue_size(global | binary()) -> 'undefined' | non_neg_integer().
s2s_max_send_queue_size(Host) ->
ejabberd_config:get_option({s2s_max_send_queue_size, Host}).

-spec s2s_protocol_options() -> 'undefined' | binary().
s2s_protocol_options() ->
s2s_protocol_options(global).
Expand Down
16 changes: 16 additions & 0 deletions src/ejabberd_options.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ opt_type(c2s_ciphers) ->
end;
opt_type(c2s_dhfile) ->
econf:file();
opt_type(c2s_max_send_queue_delay) ->
econf:non_neg_int();
opt_type(c2s_max_send_queue_size) ->
econf:non_neg_int();
opt_type(c2s_protocol_options) ->
econf:and_then(
econf:list(econf:binary(), [unique]),
Expand Down Expand Up @@ -337,6 +341,10 @@ opt_type(s2s_dns_timeout) ->
econf:timeout(second, infinity);
opt_type(s2s_max_retry_delay) ->
econf:timeout(second);
opt_type(s2s_max_send_queue_delay) ->
econf:non_neg_int();
opt_type(s2s_max_send_queue_size) ->
econf:non_neg_int();
opt_type(s2s_protocol_options) ->
opt_type(c2s_protocol_options);
opt_type(s2s_queue_type) ->
Expand Down Expand Up @@ -527,6 +535,8 @@ options() ->
{c2s_cafile, undefined},
{c2s_ciphers, undefined},
{c2s_dhfile, undefined},
{c2s_max_send_queue_delay, undefined},
{c2s_max_send_queue_size, undefined},
{c2s_protocol_options, undefined},
{c2s_tls_compression, undefined},
{ca_file, iolist_to_binary(pkix:get_cafile())},
Expand Down Expand Up @@ -635,6 +645,8 @@ options() ->
{s2s_dns_retries, 2},
{s2s_dns_timeout, timer:seconds(10)},
{s2s_max_retry_delay, timer:seconds(300)},
{s2s_max_send_queue_delay, 0},
{s2s_max_send_queue_size, 10},
{s2s_protocol_options, undefined},
{s2s_queue_type,
fun(Host) -> ejabberd_config:get_option({queue_type, Host}) end},
Expand Down Expand Up @@ -705,6 +717,8 @@ globals() ->
auth_cache_life_time,
auth_cache_missed,
auth_cache_size,
c2s_max_send_queue_delay,
c2s_max_send_queue_size,
ca_file,
captcha_cmd,
captcha_host,
Expand Down Expand Up @@ -752,6 +766,8 @@ globals() ->
router_use_cache,
rpc_timeout,
s2s_max_retry_delay,
c2s_max_send_queue_delay,
c2s_max_send_queue_size,
shaper,
sm_cache_life_time,
sm_cache_missed,
Expand Down
50 changes: 50 additions & 0 deletions src/ejabberd_options_doc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,31 @@ doc() ->
"dhparam -out dh.pem 2048\". If this option is not specified, "
"2048-bit MODP Group with 256-bit Prime Order Subgroup will be "
"used as defined in RFC5114 Section 2.3.")}},
{c2s_max_send_queue_delay,
#{value => ?T("non_neg_integer()"),
desc =>
[?T("Specifies the maximum number of milliseconds to queue an "
"outgoing stanza or stream management element. Setting this "
"option to a positive (non-zero) number allows for batching up "
"multiple XML elements into a single TCP packet in order to "
"reduce the TCP/IP overhead. The default value is '0', which "
"disables queueing."), "",
?T("To set a specific file per listener, use the listener's "
"http://../listen-options/#max_send_queue_delay[max_send_queue_delay] "
"option. Please note that 'c2s_max_send_queue_delay' overrides "
"the listener's 'max_send_queue_delay' option."), ""]}},
{c2s_max_send_queue_size,
#{value => ?T("non_neg_integer()"),
desc =>
[?T("Specifies the maximum number of elements to add to the send "
"queue. The default value is '10'. Note that this option has "
"no effect if 'max_send_queue_delay' isn't set to a value "
"larger than '0'. Setting this option to '0' disables "
"queueing."), "",
?T("To set a specific file per listener, use the listener's "
"http://../listen-options/#max_send_queue_size[max_send_queue_size] "
"option. Please note that 'c2s_max_send_queue_size' overrides "
"the listener's 'max_send_queue_size' option."), ""]}},
{c2s_protocol_options,
#{value => "[Option, ...]",
desc =>
Expand Down Expand Up @@ -1118,6 +1143,31 @@ doc() ->
"dhparam -out dh.pem 2048\". If this option is not specified, "
"2048-bit MODP Group with 256-bit Prime Order Subgroup will be "
"used as defined in RFC5114 Section 2.3.")}},
{s2s_max_send_queue_delay,
#{value => ?T("non_neg_integer()"),
desc =>
[?T("Specifies the maximum number of milliseconds to queue an "
"outgoing stanza or stream management element. Setting this "
"option to a positive (non-zero) number allows for batching up "
"multiple XML elements into a single TCP packet in order to "
"reduce the TCP/IP overhead. The default value is '0', which "
"disables queueing."), "",
?T("To set a specific file per listener, use the listener's "
"http://../listen-options/#max_send_queue_delay[max_send_queue_delay] "
"option. Please note that 's2s_max_send_queue_delay' overrides "
"the listener's 'max_send_queue_delay' option."), ""]}},
{s2s_max_send_queue_size,
#{value => ?T("non_neg_integer()"),
desc =>
[?T("Specifies the maximum number of elements to add to the send "
"queue. The default value is '10'. Note that this option has "
"no effect if 'max_send_queue_delay' isn't set to a value "
"larger than '0'. Setting this option to '0' disables "
"queueing."), "",
?T("To set a specific file per listener, use the listener's "
"http://../listen-options/#max_send_queue_size[max_send_queue_size] "
"option. Please note that 's2s_max_send_queue_size' overrides "
"the listener's 'max_send_queue_size' option."), ""]}},
{s2s_protocol_options,
#{value => "[Option, ...]",
desc =>
Expand Down
5 changes: 4 additions & 1 deletion src/ejabberd_s2s_out.erl
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,19 @@ init([#{server := LServer, remote_server := RServer} = State, Opts]) ->
false -> unlimited
end,
Timeout = ejabberd_option:negotiation_timeout(),
MaxQSize = ejabberd_option:s2s_max_send_queue_size(),
MaxQDelay = ejabberd_option:s2s_max_send_queue_delay(),
State1 = State#{on_route => queue,
queue => p1_queue:new(QueueType, QueueLimit),
xmlns => ?NS_SERVER,
lang => ejabberd_option:language(),
server_host => ServerHost,
shaper => none},
State2 = xmpp_stream_out:set_timeout(State1, Timeout),
State3 = xmpp_stream_out:configure_queue(State2, MaxQSize, MaxQDelay),
?INFO_MSG("Outbound s2s connection started: ~ts -> ~ts",
[LServer, RServer]),
ejabberd_hooks:run_fold(s2s_out_init, ServerHost, {ok, State2}, [Opts]).
ejabberd_hooks:run_fold(s2s_out_init, ServerHost, {ok, State3}, [Opts]).

handle_call(Request, From, #{server_host := ServerHost} = State) ->
ejabberd_hooks:run_fold(s2s_out_handle_call, ServerHost, State, [Request, From]).
Expand Down
9 changes: 7 additions & 2 deletions src/ejabberd_service.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,13 @@ init([State, Opts]) ->
true -> TLSOpts1
end,
GlobalRoutes = proplists:get_value(global_routes, Opts, true),
MaxQSize = proplists:get_value(max_send_queue_size, Opts, 10),
MaxQDelay = proplists:get_value(max_send_queue_delay, Opts, 0),
Timeout = ejabberd_option:negotiation_timeout(),
State1 = xmpp_stream_in:change_shaper(State, ejabberd_shaper:new(Shaper)),
State2 = xmpp_stream_in:set_timeout(State1, Timeout),
State3 = State2#{access => Access,
State3 = xmpp_stream_in:configure_queue(State2, MaxQSize, MaxQDelay),
State4 = State3#{access => Access,
xmlns => ?NS_COMPONENT,
lang => ejabberd_option:language(),
server => ejabberd_config:get_myname(),
Expand All @@ -129,7 +132,7 @@ init([State, Opts]) ->
tls_options => TLSOpts,
global_routes => GlobalRoutes,
check_from => CheckFrom},
ejabberd_hooks:run_fold(component_init, {ok, State3}, [Opts]).
ejabberd_hooks:run_fold(component_init, {ok, State4}, [Opts]).

handle_stream_start(_StreamStart,
#{remote_server := RemoteServer,
Expand Down Expand Up @@ -302,6 +305,8 @@ listen_options() ->
{tls, false},
{tls_compression, false},
{max_stanza_size, infinity},
{max_send_queue_size, 10},
{max_send_queue_delay, 0},
{max_fsm_queue, 10000},
{password, undefined},
{hosts, []},
Expand Down