Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: change AMQP on disk message format & speed up the AMQP parser #10964

Merged
merged 26 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
45694c9
Reword atoms
ansd Apr 11, 2024
5ce7f49
Use more efficient maps:from_list
ansd Apr 11, 2024
cf7a2a0
Speed up AMQP parser
ansd Apr 10, 2024
0a31d77
Parse until body and return position for bare sections
ansd Apr 12, 2024
4980669
New mc_amqp state
ansd Apr 14, 2024
bfe2854
Remove slow guard tests
ansd Apr 15, 2024
9fbbb47
Move frequent clauses to the top
ansd Apr 15, 2024
312d2af
Fix AMQP -> MQTT durable conversion
ansd Apr 16, 2024
2ee246c
Fix MQTT -> Stream
ansd Apr 16, 2024
2380b85
Do not create binaries in parse/1
ansd Apr 16, 2024
fc7f458
Fix tests
ansd Apr 17, 2024
0b51b8d
Introduce feature flag message_containers_store_amqp_v1
ansd Apr 21, 2024
81709d9
Fix MQTT QoS
ansd Apr 21, 2024
2e15704
Maintain footer
ansd Apr 22, 2024
eac469a
Change AMQP header durable to true by default
ansd Apr 21, 2024
e8e9ef3
Delete unused module rabbit_msg_record
ansd Apr 22, 2024
8040b8f
Do not store delivery-annotations
ansd Apr 22, 2024
1d02ea9
Fix crashes when message gets dead lettered
ansd Apr 23, 2024
0697c20
Support end-to-end checksums over the bare message
ansd Apr 24, 2024
6225dc9
Do not parse entire AMQP body
ansd Apr 25, 2024
9f42e40
Fix crash when old node receives new AMQP message
ansd Apr 26, 2024
6018155
Add property test for AMQP serializer and parser
ansd Apr 29, 2024
b721b1a
Support maps and lists in AMQP array elements
ansd May 1, 2024
4209f3f
Set durable annotation for AMQP messages
ansd May 2, 2024
c328968
Remove unused code
ansd May 2, 2024
d42e110
Apply PR feedback
ansd May 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 12 additions & 6 deletions deps/amqp10_client/src/amqp10_client_frame_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,17 @@ handle_input(expecting_frame_body, Data,
{<<_:BodyLength/binary, Rest/binary>>, 0} ->
% heartbeat
handle_input(expecting_frame_header, Rest, State);
{<<FrameBody:BodyLength/binary, Rest/binary>>, _} ->
{<<Body:BodyLength/binary, Rest/binary>>, _} ->
State1 = State#state{frame_state = undefined},
{PerfDesc, Payload} = amqp10_binary_parser:parse(FrameBody),
Perf = amqp10_framing:decode(PerfDesc),
State2 = route_frame(Channel, FrameType, {Perf, Payload}, State1),
BytesBody = size(Body),
{DescribedPerformative, BytesParsed} = amqp10_binary_parser:parse(Body),
Performative = amqp10_framing:decode(DescribedPerformative),
Payload = if BytesParsed < BytesBody ->
binary_part(Body, BytesParsed, BytesBody - BytesParsed);
BytesParsed =:= BytesBody ->
no_payload
end,
State2 = route_frame(Channel, FrameType, {Performative, Payload}, State1),
handle_input(expecting_frame_header, Rest, State2);
_ ->
{ok, expecting_frame_body, Data, State}
Expand Down Expand Up @@ -294,8 +300,8 @@ route_frame(Channel, FrameType, {Performative, Payload} = Frame, State0) ->
State0),
?DBG("FRAME -> ~tp ~tp~n ~tp", [Channel, DestinationPid, Performative]),
case Payload of
<<>> -> ok = gen_statem:cast(DestinationPid, Performative);
_ -> ok = gen_statem:cast(DestinationPid, Frame)
no_payload -> gen_statem:cast(DestinationPid, Performative);
_ -> gen_statem:cast(DestinationPid, Frame)
end,
State.

Expand Down
206 changes: 157 additions & 49 deletions deps/amqp10_client/src/amqp10_client_session.erl

Large diffs are not rendered by default.

38 changes: 27 additions & 11 deletions deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,16 @@
-spec from_amqp_records([amqp10_client_types:amqp10_msg_record()]) ->
amqp10_msg().
from_amqp_records([#'v1_0.transfer'{} = Transfer | Records]) ->
lists:foldl(fun parse_from_amqp/2, #amqp10_msg{transfer = Transfer,
body = unset}, Records).
case lists:foldl(fun parse_from_amqp/2,
#amqp10_msg{transfer = Transfer,
body = unset},
Records) of
#amqp10_msg{body = Body} = Msg
when is_list(Body) ->
Msg#amqp10_msg{body = lists:reverse(Body)};
Msg ->
Msg
end.

-spec to_amqp_records(amqp10_msg()) -> [amqp10_client_types:amqp10_msg_record()].
to_amqp_records(#amqp10_msg{transfer = T,
Expand Down Expand Up @@ -327,7 +335,7 @@ set_properties(Props, #amqp10_msg{properties = Current} = Msg) ->
P = maps:fold(fun(message_id, {T, _V} = TypeVal, Acc) when T =:= ulong orelse
T =:= uuid orelse
T =:= binary orelse
T =:= uf8 ->
T =:= utf8 ->
Acc#'v1_0.properties'{message_id = TypeVal};
(message_id, V, Acc) when is_binary(V) ->
%% backward compat clause
Expand Down Expand Up @@ -414,15 +422,17 @@ wrap_ap_value(true) ->
{boolean, true};
wrap_ap_value(false) ->
{boolean, false};
wrap_ap_value(V) when is_integer(V) ->
{uint, V};
wrap_ap_value(V) when is_binary(V) ->
utf8(V);
wrap_ap_value(V) when is_list(V) ->
utf8(list_to_binary(V));
wrap_ap_value(V) when is_atom(V) ->
utf8(atom_to_list(V)).

utf8(atom_to_binary(V));
wrap_ap_value(V) when is_integer(V) ->
case V < 0 of
true -> {int, V};
false -> {uint, V}
end.

%% LOCAL
header_value(durable, undefined) -> false;
Expand All @@ -444,10 +454,16 @@ parse_from_amqp(#'v1_0.application_properties'{} = APs, AmqpMsg) ->
AmqpMsg#amqp10_msg{application_properties = APs};
parse_from_amqp(#'v1_0.amqp_value'{} = Value, AmqpMsg) ->
AmqpMsg#amqp10_msg{body = Value};
parse_from_amqp(#'v1_0.amqp_sequence'{} = Seq, AmqpMsg) ->
AmqpMsg#amqp10_msg{body = [Seq]};
parse_from_amqp(#'v1_0.data'{} = Data, AmqpMsg) ->
AmqpMsg#amqp10_msg{body = [Data]};
parse_from_amqp(#'v1_0.amqp_sequence'{} = Seq, AmqpMsg = #amqp10_msg{body = Body0}) ->
Body = if Body0 =:= unset -> [Seq];
is_list(Body0) -> [Seq | Body0]
end,
AmqpMsg#amqp10_msg{body = Body};
parse_from_amqp(#'v1_0.data'{} = Data, AmqpMsg = #amqp10_msg{body = Body0}) ->
Body = if Body0 =:= unset -> [Data];
is_list(Body0) -> [Data | Body0]
end,
AmqpMsg#amqp10_msg{body = Body};
parse_from_amqp(#'v1_0.footer'{} = Header, AmqpMsg) ->
AmqpMsg#amqp10_msg{footer = Header}.

Expand Down
7 changes: 4 additions & 3 deletions deps/amqp10_client/test/mock_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ recv(Sock) ->
{ok, <<Length:32/unsigned, 2:8/unsigned,
_/unsigned, Ch:16/unsigned>>} = gen_tcp:recv(Sock, 8),
{ok, Data} = gen_tcp:recv(Sock, Length - 8),
{PerfDesc, Payload} = amqp10_binary_parser:parse(Data),
{PerfDesc, BytesParsed} = amqp10_binary_parser:parse(Data),
Perf = amqp10_framing:decode(PerfDesc),
Payload = binary_part(Data, BytesParsed, size(Data) - BytesParsed),
{Ch, Perf, Payload}.

amqp_step(Fun) ->
fun (Sock) ->
Recv = recv(Sock),
ct:pal("AMQP Step receieved ~tp~n", [Recv]),
ct:pal("AMQP Step received ~tp~n", [Recv]),
case Fun(Recv) of
{_Ch, []} -> ok;
{Ch, {multi, Records}} ->
Expand All @@ -81,4 +82,4 @@ send_amqp_header_step(Sock) ->
recv_amqp_header_step(Sock) ->
ct:pal("Receiving AMQP protocol header"),
{ok, R} = gen_tcp:recv(Sock, 8),
ct:pal("handshake Step receieved ~tp~n", [R]).
ct:pal("handshake Step received ~tp~n", [R]).
23 changes: 14 additions & 9 deletions deps/amqp10_client/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,17 @@ roundtrip(OpenConf, Body) ->
await_link(Sender, credited, link_credit_timeout),

Now = os:system_time(millisecond),
Props = #{creation_time => Now},
Props = #{creation_time => Now,
message_id => <<"my message ID">>,
correlation_id => <<"my correlation ID">>,
content_type => <<"my content type">>,
content_encoding => <<"my content encoding">>,
group_id => <<"my group ID">>},
Msg0 = amqp10_msg:new(<<"my-tag">>, Body, true),
Msg1 = amqp10_msg:set_properties(Props, Msg0),
Msg2 = amqp10_msg:set_application_properties(#{"a_key" => "a_value"}, Msg1),
Msg3 = amqp10_msg:set_message_annotations(#{<<"x_key">> => "x_value"}, Msg2),
Msg = amqp10_msg:set_delivery_annotations(#{<<"y_key">> => "y_value"}, Msg3),
Msg1 = amqp10_msg:set_application_properties(#{"a_key" => "a_value"}, Msg0),
Msg2 = amqp10_msg:set_properties(Props, Msg1),
Msg = amqp10_msg:set_message_annotations(#{<<"x-key">> => "x-value",
<<"x_key">> => "x_value"}, Msg2),
ok = amqp10_client:send_msg(Sender, Msg),
ok = amqp10_client:detach_link(Sender),
await_link(Sender, {detached, normal}, link_detach_timeout),
Expand All @@ -331,10 +336,10 @@ roundtrip(OpenConf, Body) ->
ok = amqp10_client:close_connection(Connection),

% ct:pal(?LOW_IMPORTANCE, "roundtrip message Out: ~tp~nIn: ~tp~n", [OutMsg, Msg]),
#{creation_time := Now} = amqp10_msg:properties(OutMsg),
#{<<"a_key">> := <<"a_value">>} = amqp10_msg:application_properties(OutMsg),
#{<<"x_key">> := <<"x_value">>} = amqp10_msg:message_annotations(OutMsg),
#{<<"y_key">> := <<"y_value">>} = amqp10_msg:delivery_annotations(OutMsg),
?assertMatch(Props, amqp10_msg:properties(OutMsg)),
?assertEqual(#{<<"a_key">> => <<"a_value">>}, amqp10_msg:application_properties(OutMsg)),
?assertMatch(#{<<"x-key">> := <<"x-value">>,
<<"x_key">> := <<"x_value">>}, amqp10_msg:message_annotations(OutMsg)),
?assertEqual([Body], amqp10_msg:body(OutMsg)),
ok.

Expand Down
2 changes: 1 addition & 1 deletion deps/amqp10_common/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
/logs/
/plugins/
/plugins.lock
/rebar.config
/rebar.lock
/sbin/
/sbin.lock
/test/ct.cover.spec
/xrefr
_build

/amqp10_common.d
/*.plt
Expand Down
9 changes: 9 additions & 0 deletions deps/amqp10_common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,26 @@ dialyze(

rabbitmq_suite(
name = "binary_generator_SUITE",
size = "small",
)

rabbitmq_suite(
name = "binary_parser_SUITE",
size = "small",
)

rabbitmq_suite(
name = "serial_number_SUITE",
size = "small",
)

rabbitmq_suite(
name = "prop_SUITE",
deps = [
"//deps/rabbitmq_ct_helpers:erlang_app",
],
)

assert_suites()

alias(
Expand Down
1 change: 1 addition & 0 deletions deps/amqp10_common/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ endef

DIALYZER_OPTS += --src -r test -DTEST
BUILD_DEPS = rabbit_common
TEST_DEPS = rabbitmq_ct_helpers proper

# Variables and recipes in development.*.mk are meant to be used from
# any Git clone. They are excluded from the files published to Hex.pm.
Expand Down
10 changes: 10 additions & 0 deletions deps/amqp10_common/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,13 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
app_name = "amqp10_common",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "prop_SUITE_beam_files",
testonly = True,
srcs = ["test/prop_SUITE.erl"],
outs = ["test/prop_SUITE.beam"],
hdrs = ["include/amqp10_framing.hrl"],
app_name = "amqp10_common",
erlc_opts = "//:test_erlc_opts",
deps = ["@proper//:erlang_app"],
)
5 changes: 5 additions & 0 deletions deps/amqp10_common/rebar.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{profiles,
[{test, [{deps, [proper
]}]}
]
}.
55 changes: 38 additions & 17 deletions deps/amqp10_common/src/amqp10_binary_generator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
{symbol, binary()} |
{binary, binary()} |
{list, [amqp10_type()]} |
{map, [{amqp10_prim(), amqp10_prim()}]} | %% TODO: make map a map
{map, [{amqp10_prim(), amqp10_prim()}]} |
{array, amqp10_ctor(), [amqp10_type()]}.

-type amqp10_described() ::
Expand Down Expand Up @@ -113,16 +113,20 @@ generate1({long, V}) when V<128 andalso V>-129 -> <<16#55,V:8/signed>>;
generate1({long, V}) -> <<16#81,V:64/signed>>;
generate1({float, V}) -> <<16#72,V:32/float>>;
generate1({double, V}) -> <<16#82,V:64/float>>;
generate1({char, V}) -> <<16#73,V:4/binary>>;
generate1({char,V}) when V>=0 andalso V=<16#10ffff -> <<16#73,V:32>>;
%% AMQP timestamp is "64-bit two's-complement integer representing milliseconds since the unix epoch".
%% For small integers (i.e. values that can be stored in a single word),
%% Erlang uses two’s complement to represent the signed integers.
generate1({timestamp,V}) -> <<16#83,V:64/signed>>;
generate1({uuid, V}) -> <<16#98,V:16/binary>>;

generate1({utf8, V}) when size(V) < ?VAR_1_LIMIT -> [16#a1, size(V), V];
generate1({utf8, V}) -> [<<16#b1, (size(V)):32>>, V];
generate1({symbol, V}) -> [16#a3, size(V), V];
generate1({utf8, V}) when size(V) =< ?VAR_1_LIMIT -> [16#a1, size(V), V];
generate1({utf8, V}) -> [<<16#b1, (size(V)):32>>, V];
generate1({symbol, V}) when size(V) =< ?VAR_1_LIMIT -> [16#a3, size(V), V];
generate1({symbol, V}) -> [<<16#b3, (size(V)):32>>, V];
generate1({binary, V}) ->
Size = iolist_size(V),
case Size < ?VAR_1_LIMIT of
case Size =< ?VAR_1_LIMIT of
true ->
[16#a0, Size, V];
false ->
Expand All @@ -145,12 +149,12 @@ generate1({list, List}) ->
[16#c0, S + 1, Count, Compound]
end;

generate1({map, ListOfPairs}) ->
Count = length(ListOfPairs) * 2,
generate1({map, KvList}) ->
Count = length(KvList) * 2,
Compound = lists:map(fun ({Key, Val}) ->
[(generate1(Key)),
(generate1(Val))]
end, ListOfPairs),
end, KvList),
S = iolist_size(Compound),
%% See generate1({list, ...}) for an explanation of this test.
if Count >= (256 - 1) orelse (S + 1) >= 256 ->
Expand All @@ -168,16 +172,12 @@ generate1({array, Type, List}) ->
if Count >= (256 - 1) orelse (S + 1) >= 256 ->
[<<16#f0, (S + 4):32, Count:32>>, Array];
true ->
[16#e0, S + 1, Count, Array]
[16#e0, S + 1, Count, Array]
end;

generate1({as_is, TypeCode, Bin}) ->
<<TypeCode, Bin>>.

%% TODO again these are a stub to get SASL working. New codec? Will
%% that ever happen? If not we really just need to split generate/1
%% up into things like these...
%% for these constructors map straight-forwardly
constructor(symbol) -> 16#b3;
constructor(ubyte) -> 16#50;
constructor(ushort) -> 16#60;
Expand All @@ -194,18 +194,23 @@ constructor(timestamp) -> 16#83;
constructor(uuid) -> 16#98;
constructor(null) -> 16#40;
constructor(boolean) -> 16#56;
constructor(array) -> 16#f0; % use large array type for all nested arrays
constructor(binary) -> 16#b0;
constructor(utf8) -> 16#b1;
constructor(list) -> 16#d0; % use large list type for all array elements
constructor(map) -> 16#d1; % use large map type for all array elements
constructor(array) -> 16#f0; % use large array type for all nested arrays
constructor({described, Descriptor, Primitive}) ->
[16#00, generate1(Descriptor), constructor(Primitive)].

% returns io_list
generate2(symbol, {symbol, V}) -> [<<(size(V)):32>>, V];
generate2(utf8, {utf8, V}) -> [<<(size(V)):32>>, V];
generate2(binary, {binary, V}) -> [<<(size(V)):32>>, V];
generate2(boolean, true) -> 16#01;
generate2(boolean, false) -> 16#00;
generate2(boolean, {boolean, true}) -> 16#01;
generate2(boolean, {boolean, false}) -> 16#00;
generate2(null, null) -> 16#40;
generate2(char, {char,V}) when V>=0 andalso V=<16#10ffff -> <<V:32>>;
generate2(ubyte, {ubyte, V}) -> V;
generate2(byte, {byte, V}) -> <<V:8/signed>>;
generate2(ushort, {ushort, V}) -> <<V:16/unsigned>>;
Expand All @@ -214,12 +219,28 @@ generate2(uint, {uint, V}) -> <<V:32/unsigned>>;
generate2(int, {int, V}) -> <<V:32/signed>>;
generate2(ulong, {ulong, V}) -> <<V:64/unsigned>>;
generate2(long, {long, V}) -> <<V:64/signed>>;
generate2(float, {float, V}) -> <<V:32/float>>;
generate2(double, {double, V}) -> <<V:64/float>>;
generate2(timestamp, {timestamp,V}) -> <<V:64/signed>>;
generate2(uuid, {uuid, V}) -> <<V:16/binary>>;
generate2({described, D, P}, {described, D, V}) ->
generate2(P, V);
generate2(list, {list, List}) ->
Count = length(List),
Compound = lists:map(fun generate1/1, List),
S = iolist_size(Compound),
[<<(S + 4):32, Count:32>>, Compound];
generate2(map, {map, KvList}) ->
Count = length(KvList) * 2,
Compound = lists:map(fun ({Key, Val}) ->
[(generate1(Key)),
(generate1(Val))]
end, KvList),
S = iolist_size(Compound),
[<<(S + 4):32, Count:32>>, Compound];
generate2(array, {array, Type, List}) ->
Count = length(List),
Array = [constructor(Type),
[generate2(Type, I) || I <- List]],
S = iolist_size(Array),
%% See generate1({list, ...}) for an explanation of this test.
[<<(S + 4):32, Count:32>>, Array].