Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: change AMQP on disk message format & speed up the AMQP parser #10964

Merged
merged 26 commits into from May 3, 2024

Conversation

ansd
Copy link
Member

@ansd ansd commented Apr 10, 2024

This PR comes with the following benefits

1. Immutable bare message

According to the AMQP spec:

The bare message is immutable within the AMQP network. That is, none of the sections can be changed by any node acting as an AMQP intermediary. If a section of the bare message is omitted, one MUST NOT be inserted by an intermediary. The exact encoding of sections of the bare message MUST NOT be modified. This preserves message hashes, HMACs and signatures based on the binary encoding of the bare message.

Prior to this commit, this requirement was violated by RabbitMQ.
RabbitMQ used to decode the bare message from the sending client and encode the bare message before sending to the receiving client. RabbitMQ violated the immutability of the bare message for example:

  1. by not respecting the 1 vs 2 byte AMQP boolean encoding, and
  2. by always providing the optional null value in the trailing fields to indicate absence of a value

This PR ensures that the bare message is never modified if the message is sent and consumed by AMQP clients.

2. End-to-end checksumming

Thanks to 1. Immutable bare message, this PR adds a new feature to the AMQP Erlang client: end-to-end checksumming over the bare message (i.e. body + application defined headers).

This PR allows an app to configure the AMQP client:

  • for a sending link to automatically compute CRC-32 or Adler-32
    checksums over each bare message including the computed checksum
    as a footer annotation, and
  • for a receiving link to automatically lookup the expected CRC-32
    or Adler-32 checksum in the footer annotation and, if present, check
    the received checksum against the actually computed checksum.

The feature comes with the following advantages:

  1. Transparent end-to-end checksumming. Although checksumming is
    performed by TCP and RabbitMQ queues using the disk, end-to-end
    checksumming is a level higher up and can therefore detect bit flips
    within RabbitMQ nodes or load balancers and other bit flips that
    went unnoticed.
  2. Not only is the body checksummed, but also the properties and
    application-properties sections. This is an advantage over AMQP 0.9.1
    because the AMQP protocol disallows modification of the bare message.
  3. This feature is currently used for testing the RabbitMQ AMQP
    implementation, but it shows the feasibility of how apps could also
    get integrity guarantees of the whole bare message using HMACs or
    signatures.

3. Avoid parsing the AMQP body

Do not parse entire AMQP body.

Prior to this PR the entire amqp-value or amqp-sequence sections were parsed when receiving a message from a client.
Parsing the entire amqp-value or amqp-sequence sections can generate a huge amount of garbage depending on how large these sections are.

Not only will this PR not parse the amqp-value or amqp-sequence sections when receiving a message from a client, but also it won't parse the sections when converting from mc_amqp to another protocol, such as mc_amqpl or mc_mqtt. Given that other protocols cannot make use of amqp-value and amqp-sequence sections anyway, leave them AMQP encoded when converting from mc_amqp.

In fact prior to this commit, the entire body section was parsed generating huge amounts of garbage just to subsequently encode it again in mc_amqpl or mc_mqtt.

The new conversion interface from mc_amqp to other mc_* modules will either output amqp-data sections or the encoded amqp-value / amqp-sequence sections.

4. Faster AMQP parser

The AMQP parser got optimised.

When compiling file amqp10_binary_parser.erl with ERL_COMPILER_OPTIONS=bin_opt_info, all code now outputs OPTIMIZED: match context reused instead of BINARY CREATED or NOT OPTIMIZED. The only exception are arrays since arrays aren't used in the hot path.

Benchmarks

  1. To remove artificial bottlenecks, increase link credit RabbitMQ grants to the sending clients and the session window as follows:
diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl
index f26802be46..5c3ef1151e 100644
--- a/deps/rabbit/src/rabbit_amqp_session.erl
+++ b/deps/rabbit/src/rabbit_amqp_session.erl
@@ -29,7 +29,7 @@
 -define(HIBERNATE_AFTER, 6_000).
 -define(CREDIT_REPLY_TIMEOUT, 30_000).
 -define(UINT_OUTGOING_WINDOW, {uint, ?UINT_MAX}).
--define(MAX_INCOMING_WINDOW, 400).
+-define(MAX_INCOMING_WINDOW, 5000).
 %% "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
 -define(INITIAL_OUTGOING_TRANSFER_ID, ?UINT_MAX - 3).
 %% "Note that, despite its name, the delivery-count is not a count but a
@@ -55,7 +55,7 @@
 %% on target queue type. For the time being just use a static value that's something in between.
 %% An even better approach in future would be to dynamically grow (or shrink) the link credit
 %% we grant depending on how fast target queue(s) actually confirm messages.
--define(LINK_CREDIT_RCV, 128).
+-define(LINK_CREDIT_RCV, 5000).
 -define(MANAGEMENT_LINK_CREDIT_RCV, 8).
 -define(MANAGEMENT_NODE_ADDRESS, <<"/management">>).
 -define(DEFAULT_EXCHANGE_NAME, <<>>).
  1. Start RabbitMQ and create 2 classic queues:
make run-broker PLUGINS="rabbitmq_management" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 1 +JPperf true"
deps/rabbitmq_management/bin/rabbitmqadmin declare queue name=cq1 queue_type=classic durable=false
deps/rabbitmq_management/bin/rabbitmqadmin declare queue name=cq2 queue_type=classic durable=true
deps/rabbitmq_management/bin/rabbitmqadmin declare queue name=qq queue_type=quorum durable=true
  1. Start quiver:
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
  1. Benchmark 1: classic queue with transient messages:
bash-5.1# quiver //host.docker.internal//queue/cq1 --duration 60s --body-size 12 --credit 10000

Results with PR:

Count ............................................. 3,699,634 messages
Duration ............................................... 60.0 seconds
Sender rate .......................................... 61,675 messages/s
Receiver rate ........................................ 61,666 messages/s
End-to-end rate ...................................... 61,665 messages/s

Results prior to PR:

Count ............................................. 3,460,189 messages
Duration ............................................... 60.0 seconds
Sender rate .......................................... 57,713 messages/s
Receiver rate ........................................ 57,675 messages/s
End-to-end rate ...................................... 57,673 messages/s
  1. Benchmark 2: classic queue with persistent messages:
bash-5.1# quiver //host.docker.internal//queue/cq2 --duration 60s --body-size 12 --credit 10000 --durable

Results with PR:

Count ............................................. 2,450,989 messages
Duration ............................................... 60.0 seconds
Sender rate .......................................... 40,973 messages/s
Receiver rate ........................................ 40,856 messages/s
End-to-end rate ...................................... 40,851 messages/s

Results prior to PR:

Count ............................................. 1,990,530 messages
Duration ............................................... 60.0 seconds
Sender rate .......................................... 33,301 messages/s
Receiver rate ........................................ 33,182 messages/s
End-to-end rate ...................................... 33,177 messages/s

Using persistent messages and classic queues, the end to end throughput increase is 23% with this PR.

For the last benchmark, the CPU flame graphs show that this PR spends only 2.6% instead of 5.5% in function rabbit_amqp_reader:parse_frame_body/2 and that this PR spends 18.9% instead of 25.6% in garbage_collect.

  1. Benchmark 3: quorum queue with persistent messages:
bash-5.1# quiver //host.docker.internal//queue/qq --duration 60s --body-size 12 --credit 10000 --durable

Results with PR:

Count ............................................. 3,103,960 messages
Duration ............................................... 60.0 seconds
Sender rate .......................................... 51,875 messages/s
Receiver rate ........................................ 51,774 messages/s
End-to-end rate ...................................... 51,740 messages/s

Results prior to PR:

Count ............................................. 2,487,133 messages
Duration ............................................... 60.0 seconds
Sender rate .......................................... 41,603 messages/s
Receiver rate ........................................ 41,486 messages/s
End-to-end rate ...................................... 41,459 messages/s

Using quorum queues, the end to end throughput increase is 24% with this PR.

@ansd ansd force-pushed the amqp-parser branch 4 times, most recently from 9a4ea2a to 0464769 Compare April 15, 2024 10:24
@ansd ansd changed the title WIP speed up AMQP parser WIP change AMQP on disk message format Apr 15, 2024
@ansd ansd force-pushed the amqp-parser branch 10 times, most recently from 4dc0ef9 to 85e289f Compare April 16, 2024 16:28
@lukebakken lukebakken self-requested a review April 17, 2024 13:18
@ansd ansd force-pushed the amqp-parser branch 6 times, most recently from 557e5d3 to 73d697a Compare April 22, 2024 13:13
ansd added a commit that referenced this pull request Apr 22, 2024
This is a follow up to #11012

 ## What?
For incoming MQTT messages, always set the `durable` message container
annotation.

 ## Why?
Even though defaulting to `durable=true` when no durable annotation is
set, as prior to this commit, is good enough, explicitly setting the
durable annotation makes the code a bit more future proof and
maintainable going forward in 4.0 where we will rely more on the durable
annotation because AMQP 1.0 message headers will be omitted in classic
and quorum queues (see
#10964)

For MQTT messages, it's important to know whether the message was
published with QoS 0 or QoS 1 because it affects the QoS the MQTT
message that will delivered to the MQTT subscriber.

The performance impact of always setting the durable annotation is
negligible.
ansd added a commit that referenced this pull request Apr 22, 2024
This is a follow up to #11012

 ## What?
For incoming MQTT messages, always set the `durable` message container
annotation.

 ## Why?
Even though defaulting to `durable=true` when no durable annotation is
set, as prior to this commit, is good enough, explicitly setting the
durable annotation makes the code a bit more future proof and
maintainable going forward in 4.0 where we will rely more on the durable
annotation because AMQP 1.0 message headers will be omitted in classic
and quorum queues (see #10964)

For MQTT messages, it's important to know whether the message was
published with QoS 0 or QoS 1 because it affects the QoS for the MQTT
message that will delivered to the MQTT subscriber.

The performance impact of always setting the durable annotation is
negligible.
@mergify mergify bot added the bazel label Apr 22, 2024
ansd added a commit that referenced this pull request Apr 22, 2024
This is a follow up to #11012

 ## What?
For incoming MQTT messages, always set the `durable` message container
annotation.

 ## Why?
Even though defaulting to `durable=true` when no durable annotation is
set, as prior to this commit, is good enough, explicitly setting the
durable annotation makes the code a bit more future proof and
maintainable going forward in 4.0 where we will rely more on the durable
annotation because AMQP 1.0 message headers will be omitted in classic
and quorum queues (see #10964)

For MQTT messages, it's important to know whether the message was
published with QoS 0 or QoS 1 because it affects the QoS for the MQTT
message that will delivered to the MQTT subscriber.

The performance impact of always setting the durable annotation is
negligible.
ansd added 19 commits May 2, 2024 07:55
and preserve original delivery_mode field
i.e. leave it undefined if it was sent as undefined
Prior to this commit test case
```
bazel test //deps/rabbit:amqp_client_SUITE-mixed -t- \
    --test_sharding_strategy=disabled --test_env \
    FOCUS="-group [cluster_size_3] -case quorum_queue_on_old_node"
```
was failing because `mc_amqp:size(#v1{})` was called on the old node
which doesn't understand the new AMQP on disk message format.

Even though the old 3.13 node never stored mc_amqp messages in classic
or quorum queues,
1. it will either need to understand the new mc_amqp message format, or
2. we should prevent the new format being sent to 3.13. nodes.

In this commit we decide for the 2nd solution.
In `mc:prepare(store, Msg)`, we convert the new mc_amqp format to
mc_amqpl which is guaranteed to be understood by the old node.
Note that `mc:prepare(store, Msg)` is not only stored before actual
storage, but already before the message is sent to the queue process
(which might be hosted by the old node).
The 2nd solution is easier to reason about over the 1st solution
because:
a) We don't have to backport code meant to be for 4.0 to 3.13, and
b) 3.13 is guaranteed to never store mc_amqp messages in classic or
   quorum queues, even in mixed version clusters.

The disadvantage of the 2nd solution is that messages are converted from
mc_amqp to mc_amqpl and back to mc_amqp if there is an AMQP sender and
AMQP receiver. However, this only happens while the new feature flag
is disabled during the rolling upgrade. In a certain sense, this is a
hybrid to how the AMQP 1.0 plugin worked in 3.13: Even though we don't
proxy via AMQP 0.9.1 anymore, we still convert to AMQP 0.9.1 (mc_amqpl)
messages when feature flag message_containers_store_amqp_v1 is disabled.
This commit fixes test
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed -t- \
    --test_sharding_strategy=disabled --test_env \
    FOCUS="-group [mqtt,v3,cluster_size_3] -case pubsub"
```

Fix some mixed version tests

Assume the AMQP body, especially amqp-value section won't be parsed.
Hence, omit smart conversions from AMQP to MQTT involving the
Payload-Format-Indicator bit.

Fix test

Fix
```
bazel test //deps/amqp10_client:system_SUITE-mixed -t- --test_sharding_strategy=disabled --test_env FOCUS="-group [rabbitmq]
```
Ensure footer gets deliverd to AMQP client as received from AMQP client
when feature flag message_containers_store_amqp_v1 is disabled.

Fixes test
```
bazel test //deps/rabbit:amqp_system_SUITE-mixed  -t- --test_sharding_strategy=disabled --test_env FOCUS="-group [dotnet] -case footer"
```
AMQP 3.2.1 defines durable=false to be the default.

However, the same section also mentions:
> If the header section is omitted the receiver MUST assume the appropriate
> default values (or the meaning implied by no value being set) for the
> fields within the header unless other target or node specific defaults
> have otherwise been set.

We want RabbitMQ to be secure by default, hence in RabbitMQ we set
durable=true to be the default.
as they are only meant to be used from sending to receiving peer
Fix crashes when message is originally sent via AMQP and
stored within a classic or quorum queue and subsequently
dead lettered where the dead letter exchange needs access to message
annotations or properties or application-properties.
This commit enables client apps to automatically perform end-to-end
checksumming over the bare message (i.e. body + application defined
headers).

This commit allows an app to configure the AMQP client:
* for a sending link to automatically compute CRC-32 or Adler-32
checksums over each bare message including the computed checksum
as a footer annotation, and
* for a receiving link to automatically lookup the expected CRC-32
or Adler-32 checksum in the footer annotation and, if present, check
the received checksum against the actually computed checksum.

The commit comes with the following advantages:
1. Transparent end-to-end checksumming. Although checksumming is
   performed by TCP and RabbitMQ queues using the disk, end-to-end
   checksumming is a level higher up and can therefore detect bit flips
   within RabbitMQ nodes or load balancers and other bit flips that
   went unnoticed.
2. Not only is the body checksummed, but also the properties and
   application-properties sections. This is an advantage over AMQP 0.9.1
   because the AMQP protocol disallows modification of the bare message.
3. This commit is currently used for testing the RabbitMQ AMQP
   implementation, but it shows the feasiblity of how apps could also
   get integrity guarantees of the whole bare message using HMACs or
   signatures.
Prior to this commit the entire amqp-value or amqp-sequence sections
were parsed when converting a message from mc_amqp.
Parsing the entire amqp-value or amqp-sequence section can generate a
huge amount of garbage depending on how large these sections are.

Given that other protocol cannot make use of amqp-value and
amqp-sequence sections anyway, leave them AMQP encoded when converting
from mc_amqp.

In fact prior to this commit, the entire body section was parsed
generating huge amounts of garbage just to subsequently encode it again
in mc_amqpl or mc_mqtt.

The new conversion interface from mc_amqp to other mc_* modules will
either output amqp-data sections or the encoded amqp-value /
amqp-sequence sections.
Similar to how we convert from mc_amqp to mc_amqpl before
sending to a classic queue or quorum queue process if
feature flag message_containers_store_amqp_v1 is disabled,
we also need to do the same conversion before sending to an MQTT QoS 0
queue on the old node.
This is similar to #11057

What?
For incoming AMQP messages, always set the durable message container annotation.

Why?
Even though defaulting to durable=true when no durable annotation is set, as prior
to this commit, is good enough, explicitly setting the durable annotation makes
the code a bit more future proof and maintainable going forward in 4.0 where we
will rely more on the durable annotation because AMQP 1.0 message headers will
be omitted in classic and quorum queues.

The performance impact of always setting the durable annotation is negligible.
@ansd ansd changed the title WIP change AMQP on disk message format Change AMQP on disk message format & Speed up AMQP parser May 2, 2024
@ansd ansd added this to the 4.0.0 milestone May 2, 2024
@ansd ansd marked this pull request as ready for review May 2, 2024 11:59
@kjnilsson kjnilsson self-requested a review May 2, 2024 12:21
@michaelklishin michaelklishin changed the title Change AMQP on disk message format & Speed up AMQP parser 4.x: change AMQP on disk message format & speed up the AMQP parser May 2, 2024
Copy link
Contributor

@kjnilsson kjnilsson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me - a couple of optional comments only.
I can't say I have evaluated every single line of code fully but the approach is good.

Still not 100% sure that concatenating multiple data sections is the right way but it should be extremely rare I hope.

deps/amqp10_client/src/amqp10_client_session.erl Outdated Show resolved Hide resolved
deps/rabbit/src/mc.erl Show resolved Hide resolved
@ansd ansd merged commit 1ecaaad into main May 3, 2024
18 checks passed
@ansd ansd deleted the amqp-parser branch May 3, 2024 12:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants