Skip to content

Commit

Permalink
WIP Do not create binaries in parse/1
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed Apr 16, 2024
1 parent 72fd0a6 commit 4dc0ef9
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 139 deletions.
18 changes: 12 additions & 6 deletions deps/amqp10_client/src/amqp10_client_frame_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,17 @@ handle_input(expecting_frame_body, Data,
{<<_:BodyLength/binary, Rest/binary>>, 0} ->
% heartbeat
handle_input(expecting_frame_header, Rest, State);
{<<FrameBody:BodyLength/binary, Rest/binary>>, _} ->
{<<Body:BodyLength/binary, Rest/binary>>, _} ->
State1 = State#state{frame_state = undefined},
{PerfDesc, Payload} = amqp10_binary_parser:parse(FrameBody),
Perf = amqp10_framing:decode(PerfDesc),
State2 = route_frame(Channel, FrameType, {Perf, Payload}, State1),
BytesBody = size(Body),
{DescribedPerformative, BytesParsed} = amqp10_binary_parser:parse(Body),
Performative = amqp10_framing:decode(DescribedPerformative),
Payload = if BytesParsed < BytesBody ->
binary_part(Body, BytesParsed, BytesBody - BytesParsed);
BytesParsed =:= BytesBody ->
no_payload
end,
State2 = route_frame(Channel, FrameType, {Performative, Payload}, State1),
handle_input(expecting_frame_header, Rest, State2);
_ ->
{ok, expecting_frame_body, Data, State}
Expand Down Expand Up @@ -294,8 +300,8 @@ route_frame(Channel, FrameType, {Performative, Payload} = Frame, State0) ->
State0),
?DBG("FRAME -> ~tp ~tp~n ~tp", [Channel, DestinationPid, Performative]),
case Payload of
<<>> -> ok = gen_statem:cast(DestinationPid, Performative);
_ -> ok = gen_statem:cast(DestinationPid, Frame)
no_payload -> gen_statem:cast(DestinationPid, Performative);
_ -> gen_statem:cast(DestinationPid, Frame)
end,
State.

Expand Down
188 changes: 81 additions & 107 deletions deps/amqp10_common/src/amqp10_binary_parser.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@
-export([parse/1,
parse_many/2]).

-ifdef(TEST).
-export([parse_many_slow/1]).
-endif.

%% §1.6
-define(CODE_ULONG, 16#80).
-define(CODE_SMALL_ULONG, 16#53).
Expand All @@ -37,100 +33,91 @@

-export_type([opts/0]).


%% Parses only the 1st AMQP type (including possible nested AMQP types).
-spec parse(binary()) ->
{amqp10_binary_generator:amqp10_type(), Rest :: binary()}.
parse(<<?DESCRIBED, Bin/binary>>) ->
{Descriptor, Rest0} = parse(Bin),
{Value, Rest} = parse(Rest0),
{{described, Descriptor, Value}, Rest};
parse(<<16#40, R/binary>>) -> {null, R};
parse(<<16#41, R/binary>>) -> {true, R};
parse(<<16#42, R/binary>>) -> {false, R};
parse(<<16#43, R/binary>>) -> {{uint, 0}, R};
parse(<<16#44, R/binary>>) -> {{ulong, 0}, R};
{amqp10_binary_generator:amqp10_type(), BytesParsed :: non_neg_integer()}.
parse(Binary) ->
parse(Binary, 0).

parse(<<?DESCRIBED, Rest/binary>>, B) ->
{Descriptor, B1} = parse(Rest),
<<_ParsedDescriptorBin:B1/binary, Rest1/binary>> = Rest,
{Value, B2} = parse(Rest1),
{{described, Descriptor, Value}, B+1+B1+B2};
parse(<<16#40, _/binary>>, B) -> {null, B+1};
parse(<<16#41, _/binary>>, B) -> {true, B+1};
parse(<<16#42, _/binary>>, B) -> {false, B+1};
parse(<<16#43, _/binary>>, B) -> {{uint, 0}, B+1};
parse(<<16#44, _/binary>>, B) -> {{ulong, 0}, B+1};
%% Fixed-widths. Most integral types have a compact encoding as a byte.
parse(<<16#50, V:8/unsigned, R/binary>>) -> {{ubyte, V}, R};
parse(<<16#51, V:8/signed, R/binary>>) -> {{byte, V}, R};
parse(<<16#52, V:8/unsigned, R/binary>>) -> {{uint, V}, R};
parse(<<?CODE_SMALL_ULONG, V:8/unsigned, R/binary>>) -> {{ulong, V}, R};
parse(<<16#54, V:8/signed, R/binary>>) -> {{int, V}, R};
parse(<<16#55, V:8/signed, R/binary>>) -> {{long, V}, R};
parse(<<16#56, 0:8/unsigned, R/binary>>) -> {false, R};
parse(<<16#56, 1:8/unsigned, R/binary>>) -> {true, R};
parse(<<16#60, V:16/unsigned, R/binary>>) -> {{ushort, V}, R};
parse(<<16#61, V:16/signed, R/binary>>) -> {{short, V}, R};
parse(<<16#70, V:32/unsigned, R/binary>>) -> {{uint, V}, R};
parse(<<16#71, V:32/signed, R/binary>>) -> {{int, V}, R};
parse(<<16#72, V:32/float, R/binary>>) -> {{float, V}, R};
parse(<<16#73, Utf32:4/binary,R/binary>>) -> {{char, Utf32}, R};
parse(<<?CODE_ULONG, V:64/unsigned, R/binary>>) -> {{ulong, V},R};
parse(<<16#81, V:64/signed, R/binary>>) -> {{long, V}, R};
parse(<<16#82, V:64/float, R/binary>>) -> {{double, V}, R};
parse(<<16#83, TS:64/signed, R/binary>>) -> {{timestamp, TS}, R};
parse(<<16#98, Uuid:16/binary,R/binary>>) -> {{uuid, Uuid}, R};
parse(<<16#50, V:8/unsigned, _/binary>>, B) -> {{ubyte, V}, B+2};
parse(<<16#51, V:8/signed, _/binary>>, B) -> {{byte, V}, B+2};
parse(<<16#52, V:8/unsigned, _/binary>>, B) -> {{uint, V}, B+2};
parse(<<?CODE_SMALL_ULONG, V:8/unsigned, _/binary>>, B) -> {{ulong, V}, B+2};
parse(<<16#54, V:8/signed, _/binary>>, B) -> {{int, V}, B+2};
parse(<<16#55, V:8/signed, _/binary>>, B) -> {{long, V}, B+2};
parse(<<16#56, 0:8/unsigned, _/binary>>, B) -> {false, B+2};
parse(<<16#56, 1:8/unsigned, _/binary>>, B) -> {true, B+2};
parse(<<16#60, V:16/unsigned, _/binary>>, B) -> {{ushort, V}, B+3};
parse(<<16#61, V:16/signed, _/binary>>, B) -> {{short, V}, B+3};
parse(<<16#70, V:32/unsigned, _/binary>>, B) -> {{uint, V}, B+5};
parse(<<16#71, V:32/signed, _/binary>>, B) -> {{int, V}, B+5};
parse(<<16#72, V:32/float, _/binary>>, B) -> {{float, V}, B+5};
parse(<<16#73, Utf32:4/binary,_/binary>>, B) -> {{char, Utf32}, B+5};
parse(<<?CODE_ULONG, V:64/unsigned, _/binary>>, B) -> {{ulong, V},B+9};
parse(<<16#81, V:64/signed, _/binary>>, B) -> {{long, V}, B+9};
parse(<<16#82, V:64/float, _/binary>>, B) -> {{double, V}, B+9};
parse(<<16#83, TS:64/signed, _/binary>>, B) -> {{timestamp, TS}, B+9};
parse(<<16#98, Uuid:16/binary,_/binary>>, B) -> {{uuid, Uuid}, B+17};
%% Variable-widths
parse(<<16#a0, S:8, V:S/binary,R/binary>>)-> {{binary, V}, R};
parse(<<16#a1, S:8, V:S/binary,R/binary>>)-> {{utf8, V}, R};
parse(<<?CODE_SYM_8, S:8, V:S/binary,R/binary>>)-> {{symbol, V}, R};
parse(<<?CODE_SYM_32, S:32,V:S/binary,R/binary>>)-> {{symbol, V}, R};
parse(<<16#b0, S:32,V:S/binary,R/binary>>)-> {{binary, V}, R};
parse(<<16#b1, S:32,V:S/binary,R/binary>>)-> {{utf8, V}, R};
parse(<<16#a0, S:8, V:S/binary,_/binary>>, B)-> {{binary, V}, B+2+S};
parse(<<16#a1, S:8, V:S/binary,_/binary>>, B)-> {{utf8, V}, B+2+S};
parse(<<?CODE_SYM_8, S:8, V:S/binary,_/binary>>, B)-> {{symbol, V}, B+2+S};
parse(<<?CODE_SYM_32, S:32,V:S/binary,_/binary>>, B)-> {{symbol, V}, B+5+S};
parse(<<16#b0, S:32,V:S/binary,_/binary>>, B)-> {{binary, V}, B+5+S};
parse(<<16#b1, S:32,V:S/binary,_/binary>>, B)-> {{utf8, V}, B+5+S};
%% Compounds
parse(<<16#45, R/binary>>) ->
{{list, []}, R};
parse(<<16#c0, Size, Count, Value:(Size-1)/binary, R/binary>>) ->
%%TODO avoid lists:foldl => use recursion instead
{L, <<>>} = lists:foldl(fun(_, {AccL, AccBin}) ->
{V, Rest} = parse(AccBin),
{[V | AccL], Rest}
end, {[], Value}, lists:seq(1, Count)),
{{list, lists:reverse(L)}, R};
parse(<<16#c1, Size, Count, Value:(Size-1)/binary, R/binary>>) ->
{L, <<>>} = lists:foldl(fun(_, {AccL, AccBin}) ->
{V, Rest} = parse(AccBin),
{[V | AccL], Rest}
end, {[], Value}, lists:seq(1, Count)),
{{map, mapify(lists:reverse(L))}, R};
parse(<<16#d0, Size:32, Count:32, Value:(Size-4)/binary, R/binary>>) ->
{L, <<>>} = lists:foldl(fun(_, {AccL, AccBin}) ->
{V, Rest} = parse(AccBin),
{[V | AccL], Rest}
end, {[], Value}, lists:seq(1, Count)),
{{list, lists:reverse(L)}, R};
parse(<<16#d1, Size:32, Count:32, Value:(Size-4)/binary,R/binary>>) ->
{L, <<>>} = lists:foldl(fun(_, {AccL, AccBin}) ->
{V, Rest} = parse(AccBin),
{[V | AccL], Rest}
end, {[], Value}, lists:seq(1, Count)),
{{map, mapify(lists:reverse(L))}, R};
parse(<<16#45, _/binary>>, B) ->
{{list, []}, B+1};
parse(<<16#c0, Size, _IgnoreCount, Value:(Size-1)/binary, _/binary>>, B) ->
{{list, parse_many(Value, [])}, B+2+Size};
parse(<<16#c1, Size, _IgnoreCount, Value:(Size-1)/binary, _/binary>>, B) ->
List = parse_many(Value, []),
{{map, mapify(List)}, B+2+Size};
parse(<<16#d0, Size:32, _IgnoreCount:32, Value:(Size-4)/binary, _/binary>>, B) ->
{{list, parse_many(Value, [])}, B+5+Size};
parse(<<16#d1, Size:32, _IgnoreCount:32, Value:(Size-4)/binary, _/binary>>, B) ->
List = parse_many(Value, []),
{{map, mapify(List)}, B+5+Size};
%% Arrays
parse(<<16#e0, S:8,CountAndV:S/binary,R/binary>>) ->
{parse_array(8, CountAndV), R};
parse(<<16#f0, S:32,CountAndV:S/binary,R/binary>>) ->
{parse_array(32, CountAndV), R};
parse(<<16#e0, S:8,CountAndV:S/binary,_/binary>>, B) ->
{parse_array(8, CountAndV), B+2+S};
parse(<<16#f0, S:32,CountAndV:S/binary,_/binary>>, B) ->
{parse_array(32, CountAndV), B+5+S};
%% NaN or +-inf
parse(<<16#72, V:32, R/binary>>) ->
{{as_is, 16#72, <<V:32>>}, R};
parse(<<16#82, V:64, R/binary>>) ->
{{as_is, 16#82, <<V:64>>}, R};
parse(<<16#72, V:32, _/binary>>, B) ->
{{as_is, 16#72, <<V:32>>}, B+5};
parse(<<16#82, V:64, _/binary>>, B) ->
{{as_is, 16#82, <<V:64>>}, B+9};
%% decimals
parse(<<16#74, V:32, R/binary>>) ->
{{as_is, 16#74, <<V:32>>}, R};
parse(<<16#84, V:64, R/binary>>) ->
{{as_is, 16#84, <<V:64>>}, R};
parse(<<16#94, V:128, R/binary>>) ->
{{as_is, 16#94, <<V:128>>}, R};
parse(<<Type, _R/binary>>) ->
throw({primitive_type_unsupported, Type}).
parse(<<16#74, V:32, _/binary>>, B) ->
{{as_is, 16#74, <<V:32>>}, B+5};
parse(<<16#84, V:64, _/binary>>, B) ->
{{as_is, 16#84, <<V:64>>}, B+9};
parse(<<16#94, V:128, _/binary>>, B) ->
{{as_is, 16#94, <<V:128>>}, B+17};
parse(<<Type, _/binary>>, B) ->
throw({primitive_type_unsupported, Type, {position, B}}).

parse_array_primitive(16#40, <<_:8/unsigned, R/binary>>) -> {null, R};
parse_array_primitive(16#41, <<_:8/unsigned, R/binary>>) -> {true, R};
parse_array_primitive(16#42, <<_:8/unsigned, R/binary>>) -> {false, R};
parse_array_primitive(16#43, <<_:8/unsigned, R/binary>>) -> {{uint, 0}, R};
parse_array_primitive(16#44, <<_:8/unsigned, R/binary>>) -> {{ulong, 0}, R};
parse_array_primitive(16#40, <<_:8/unsigned, _/binary>>) -> {null, 1};
parse_array_primitive(16#41, <<_:8/unsigned, _/binary>>) -> {true, 1};
parse_array_primitive(16#42, <<_:8/unsigned, _/binary>>) -> {false, 1};
parse_array_primitive(16#43, <<_:8/unsigned, _/binary>>) -> {{uint, 0}, 1};
parse_array_primitive(16#44, <<_:8/unsigned, _/binary>>) -> {{ulong, 0}, 1};
parse_array_primitive(ElementType, Data) ->
parse(<<ElementType, Data/binary>>).
{Val, B} = parse(<<ElementType, Data/binary>>),
{Val, B-1}.

%% array structure is {array, Ctor, [Data]}
%% e.g. {array, symbol, [<<"amqp:accepted:list">>]}
Expand All @@ -139,7 +126,8 @@ parse_array(UnitSize, Bin) ->
parse_array1(Count, Bin1).

parse_array1(Count, <<?DESCRIBED, Rest/binary>>) ->
{Descriptor, Rest1} = parse(Rest),
{Descriptor, B1} = parse(Rest),
<<_ParsedDescriptorBin:B1/binary, Rest1/binary>> = Rest,
{array, Type, List} = parse_array1(Count, Rest1),
Values = lists:map(fun (Value) ->
{described, Descriptor, Value}
Expand All @@ -156,7 +144,8 @@ parse_array2(0, Type, Bin, Acc) ->
parse_array2(Count, Type, <<>>, Acc) when Count > 0 ->
exit({failed_to_parse_array_insufficient_input, Type, Count, Acc});
parse_array2(Count, Type, Bin, Acc) ->
{Value, Rest} = parse_array_primitive(Type, Bin),
{Value, B} = parse_array_primitive(Type, Bin),
<<_ParsedValue:B/binary, Rest/binary>> = Bin,
parse_array2(Count - 1, Type, Rest, [Value | Acc]).

parse_constructor(?CODE_SYM_8) -> symbol;
Expand All @@ -174,7 +163,7 @@ parse_constructor(16#81) -> long;
parse_constructor(16#40) -> null;
parse_constructor(16#56) -> boolean;
parse_constructor(16#f0) -> array;
parse_constructor(0) -> described;
parse_constructor(0) -> described; %%TODO add test with descriptor in constructor
parse_constructor(X) ->
exit({failed_to_parse_constructor, X}).

Expand All @@ -183,29 +172,14 @@ mapify([]) ->
mapify([Key, Value | Rest]) ->
[{Key, Value} | mapify(Rest)].

-ifdef(TEST).
%% This is the old, slow, original parser implemenation before
%% https://github.com/rabbitmq/rabbitmq-server/pull/4811
%% where many sub binaries are being created.
parse_many_slow(ValueBin)
when is_binary(ValueBin) ->
Res = parse_many_slow(parse(ValueBin), []),
lists:reverse(Res).

parse_many_slow({Value, <<>>}, Acc) ->
[Value | Acc];
parse_many_slow({Value, Rest}, Acc) ->
parse_many_slow(parse(Rest), [Value | Acc]).
-endif.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% NB: When compiling this file with "ERL_COMPILER_OPTIONS=bin_opt_info"
%% make sure that all code below here outputs only "OPTIMIZED: match context reused"!
%% Neither "BINARY CREATED" nor "NOT OPTIMIZED" must be output!
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% Parses all AMQP types (or, in server_mode, stops when the body is reached).
%% This is an optimisation over calling parse/1 repeatedly which is done by parse_many_slow/1
%% This is an optimisation over calling parse/1 repeatedly.
%% We re-use the match context avoiding creation of sub binaries.
-spec parse_many(binary(), opts()) ->
[amqp10_binary_generator:amqp10_type() |
Expand Down
13 changes: 7 additions & 6 deletions deps/amqp10_common/test/binary_generator_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,11 @@ list(_Config) ->
%% list:list0
roundtrip({list, []}),
%% list:list8
roundtrip({list, [{utf8, <<"hi">>},
roundtrip({list, [
{utf8, <<"hi">>},
{int, 123},
{binary, <<"data">>},
{array, int, [{int, 1}, {int, 2}, {int, 3}]},
{array, int, [{int, 1}, {int, -2147483648}, {int, 2147483647}]},
{described,
{utf8, <<"URL">>},
{utf8, <<"http://example.org/hello-world">>}}
Expand Down Expand Up @@ -185,15 +186,15 @@ array(_Config) ->

roundtrip(Term) ->
Bin = iolist_to_binary(amqp10_binary_generator:generate(Term)),
% generate returns an iolist but parse expects a binary
?assertEqual({Term, <<>>}, amqp10_binary_parser:parse(Bin)),
?assertEqual({Term, size(Bin)}, amqp10_binary_parser:parse(Bin)),
?assertEqual([Term], amqp10_binary_parser:parse_many(Bin, [])).

%% Return the roundtripped term.
roundtrip_return(Term) ->
Bin = iolist_to_binary(amqp10_binary_generator:generate(Term)),
%% We assert only that amqp10_binary_parser:parse/1 and
%% amqp10_binary_parser:parse_all/1 return the same term.
{RoundTripTerm, <<>>} = amqp10_binary_parser:parse(Bin),
%% amqp10_binary_parser:parse_many/2 return the same term.
{RoundTripTerm, BytesParsed} = amqp10_binary_parser:parse(Bin),
?assertEqual(size(Bin), BytesParsed),
?assertEqual([RoundTripTerm], amqp10_binary_parser:parse_many(Bin, [])),
RoundTripTerm.
8 changes: 2 additions & 6 deletions deps/amqp10_common/test/binary_parser_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ roundtrip(_Config) ->
<<Acc/binary, B/binary>>
end, <<>>, Terms),

?assertEqual(Terms, amqp10_binary_parser:parse_many_slow(Bin)),
?assertEqual(Terms, amqp10_binary_parser:parse_many(Bin, [])).

array_with_extra_input(_Config) ->
Expand All @@ -80,13 +79,10 @@ array_with_extra_input(_Config) ->
%% element type, input, accumulated result
65, <<105,45,70,73,5,101,110,45,85,83>>, [true,true]},

?assertExit(Expected, amqp10_binary_parser:parse_many_slow(Bin)),
?assertExit(Expected, amqp10_binary_parser:parse_many(Bin, [])).

unsupported_type(_Config) ->
UnsupportedType = 16#02,
Bin = <<UnsupportedType, "hey">>,
?assertThrow({primitive_type_unsupported, UnsupportedType},
amqp10_binary_parser:parse_many_slow(Bin)),
?assertThrow({primitive_type_unsupported, UnsupportedType, {position, 0}},
amqp10_binary_parser:parse_many(Bin, [])).
Expected = {primitive_type_unsupported, UnsupportedType, {position, 0}},
?assertThrow(Expected, amqp10_binary_parser:parse_many(Bin, [])).
28 changes: 14 additions & 14 deletions deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -364,21 +364,21 @@ handle_frame0(Mode, Channel, Body, State) ->

%% "The frame body is defined as a performative followed by an opaque payload." [2.3.2]
parse_frame_body(Body, _Channel) ->
%% TODO test binary_part() here instead of returning binaries in amqp10_binary_parser:parse/1.
%% The latter can return the number of bytes parsed instead.
%% This should remove all warnings from the parser when compiliation option bin_opt_info is set.
{DescribedPerformative, Payload} = amqp10_binary_parser:parse(Body),
BytesBody = size(Body),
{DescribedPerformative, BytesParsed} = amqp10_binary_parser:parse(Body),
Performative = amqp10_framing:decode(DescribedPerformative),
?DEBUG("~s Channel ~tp ->~n~tp~n~ts~n",
[?MODULE, _Channel, amqp10_framing:pprint(Performative),
case Payload of
<<>> -> <<>>;
_ -> rabbit_misc:format(
" followed by ~tb bytes of payload", [size(Payload)])
end]),
case Payload of
<<>> -> Performative;
_ -> {Performative, Payload}
% ?DEBUG("~s Channel ~tp ->~n~tp~n~ts~n",
% [?MODULE, _Channel, amqp10_framing:pprint(Performative),
% case Payload of
% <<>> -> <<>>;
% _ -> rabbit_misc:format(
% " followed by ~tb bytes of payload", [size(Payload)])
% end]),
if BytesParsed < BytesBody ->
Payload = binary_part(Body, BytesParsed, BytesBody - BytesParsed),
{Performative, Payload};
BytesParsed =:= BytesBody ->
Performative
end.

handle_connection_frame(
Expand Down

0 comments on commit 4dc0ef9

Please sign in to comment.