[FEATURE] Support message TTL and dead letter queue #832

Open
GhostBoyBoy opened this issue Feb 27, 2023 · 0 comments
Labels
type/feature Indicates new functionality

References:
1. https://www.rabbitmq.com/ttl.html
2. https://www.rabbitmq.com/dlx.html

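Implementation:

  1. The queue is created as a PersistentQueue, and the Pulsar topic is fetched at creation time. Going through topic -> managedLedger -> cursor, expired entries can be located with an API similar to cursor.asyncFindNewestMatching:

     // messageTTLInSeconds, topicName, subName and log come from the enclosing class;
     // `this` is the FindEntryCallback that receives the matching position.
     cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
         try {
             // Read the publish timestamp from the entry metadata and compare it with the TTL.
             long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
             return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
         } catch (Exception e) {
             log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
         } finally {
             // Always release the entry buffer, whether or not the check succeeded.
             entry.release();
         }
         // Treat entries that could not be parsed as not expired.
         return false;
     }, this, null);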
  2. A periodic task keeps checking for expired messages. According to the RabbitMQ TTL definition, only the message at the head of the queue needs to be checked.
  3. According to the dead letter queue definition, messages must be sent to a dead letter queue when they expire. The dead letter queue parameters are maintained by AmqpQueue, so the scheduled task in step 2 can send expired messages directly to the dead letter queue if the user configured one; otherwise the messages are deleted from the queue.
  4. When both a per-queue and a per-message TTL are specified, the lower of the two values is used.
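
The TTL rules above can be sketched in plain Java as follows. This is only an illustration under assumptions: `TtlQueueSketch`, `Msg`, `effectiveTtl` and `expireHead` are hypothetical names, the queue is an in-memory deque rather than a Pulsar topic, and the dead letter queue is modelled as a simple list. It is not the project's AmqpQueue implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory stand-in for an AMQP queue; not the project's AmqpQueue.
final class TtlQueueSketch {

    // A queued message with its publish time and an optional per-message TTL (null = none).
    static final class Msg {
        final String body;
        final long publishMillis;
        final Long perMessageTtlMillis;

        Msg(String body, long publishMillis, Long perMessageTtlMillis) {
            this.body = body;
            this.publishMillis = publishMillis;
            this.perMessageTtlMillis = perMessageTtlMillis;
        }
    }

    private final Deque<Msg> queue = new ArrayDeque<>();
    private final List<Msg> deadLetters = new ArrayList<>(); // stands in for the dead letter queue
    private final Long queueTtlMillis;                        // per-queue TTL, null = none
    private final boolean deadLetterConfigured;

    TtlQueueSketch(Long queueTtlMillis, boolean deadLetterConfigured) {
        this.queueTtlMillis = queueTtlMillis;
        this.deadLetterConfigured = deadLetterConfigured;
    }

    void publish(Msg m) {
        queue.addLast(m);
    }

    // When both TTLs are set the lower one wins; null means the message never expires.
    static Long effectiveTtl(Long queueTtl, Long messageTtl) {
        if (queueTtl == null) return messageTtl;
        if (messageTtl == null) return queueTtl;
        return Math.min(queueTtl, messageTtl);
    }

    // One pass of the periodic task: only the head is inspected, and the pass
    // stops as soon as the head has not expired. Returns the number of expired messages.
    int expireHead(long nowMillis) {
        int expired = 0;
        while (!queue.isEmpty()) {
            Msg head = queue.peekFirst();
            Long ttl = effectiveTtl(queueTtlMillis, head.perMessageTtlMillis);
            if (ttl == null || nowMillis - head.publishMillis < ttl) {
                break;
            }
            queue.pollFirst();
            if (deadLetterConfigured) {
                deadLetters.add(head); // route to the dead letter queue
            }                          // otherwise the message is simply deleted
            expired++;
        }
        return expired;
    }

    int size() {
        return queue.size();
    }

    List<Msg> deadLetters() {
        return deadLetters;
    }
}
```

In a real broker, `expireHead` would presumably be driven by a scheduler such as `ScheduledExecutorService.scheduleAtFixedRate`, passing `System.currentTimeMillis()` as the current time. Taking the time as a parameter keeps the sketch deterministic.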

Implementation of the dead letter queue. According to the RabbitMQ definition, a message is dead-lettered when any of the following occurs:
1. The message is negatively acknowledged by a consumer using basic.reject or basic.nack with the requeue parameter set to false; or
2. The message expires due to per-message TTL; or
3. The message is dropped because its queue exceeded a length limit.
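
As a rough sketch of how these three cases could be funnelled into one routing decision, the snippet below uses hypothetical names (`DeadLetterDecisionSketch`, `Reason`, `Action`). It only models the decision; publishing to the dead letter queue is left out and would depend on how AmqpQueue stores its dead letter parameters.

```java
// Hypothetical decision helper; names are illustrative and not part of the project.
final class DeadLetterDecisionSketch {

    // The three dead-lettering causes listed above.
    enum Reason { REJECTED, EXPIRED, MAXLEN }

    // What the broker does with the message.
    enum Action { REQUEUE, DEAD_LETTER, DISCARD }

    // basic.reject / basic.nack: requeue=true puts the message back on the queue,
    // requeue=false makes it a dead-letter candidate.
    static Action onNegativeAck(boolean requeue, boolean deadLetterConfigured) {
        if (requeue) {
            return Action.REQUEUE;
        }
        return route(Reason.REJECTED, deadLetterConfigured);
    }

    // Expired or overflowed messages follow the same routing rule:
    // dead-letter if the user configured a target, otherwise discard.
    static Action route(Reason reason, boolean deadLetterConfigured) {
        return deadLetterConfigured ? Action.DEAD_LETTER : Action.DISCARD;
    }
}
```

The `reason` argument does not change the outcome here. It is kept so that a fuller implementation could record why the message was dead-lettered.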

For 1: Implement the API semantics of basic.reject and basic.nack.
For 2: As described above.
For 3:
The maximum number of messages can be set by supplying the x-max-length queue declaration argument with a non-negative integer value.
The maximum length in bytes can be set by supplying the x-max-length-bytes queue declaration argument with a non-negative integer value.
Both parameters are available in AmqpQueue; maybe we can implement this with the backlog.
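
A minimal sketch of the length-limit case, assuming RabbitMQ's default overflow behaviour of dropping (or dead-lettering) messages from the head of the queue. `MaxLengthQueueSketch` and its members are hypothetical. The limits are enforced on an in-memory deque here, whereas the proposal above suggests using the backlog, which this sketch does not model.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory queue that enforces x-max-length and x-max-length-bytes.
final class MaxLengthQueueSketch {
    private final Deque<byte[]> queue = new ArrayDeque<>();
    private final List<byte[]> deadLetters = new ArrayList<>(); // stands in for the dead letter queue
    private final Integer maxLength;   // x-max-length, null = unlimited
    private final Long maxLengthBytes; // x-max-length-bytes, null = unlimited
    private long totalBytes;           // sum of the body sizes currently queued

    MaxLengthQueueSketch(Integer maxLength, Long maxLengthBytes) {
        this.maxLength = maxLength;
        this.maxLengthBytes = maxLengthBytes;
    }

    // Append the message, then evict from the head until both limits hold again.
    void publish(byte[] body) {
        queue.addLast(body);
        totalBytes += body.length;
        while (!queue.isEmpty() && overLimit()) {
            byte[] head = queue.pollFirst();
            totalBytes -= head.length;
            deadLetters.add(head); // a real broker would discard if no dead letter target is set
        }
    }

    private boolean overLimit() {
        boolean tooMany = maxLength != null && queue.size() > maxLength;
        boolean tooBig = maxLengthBytes != null && totalBytes > maxLengthBytes;
        return tooMany || tooBig;
    }

    int size() {
        return queue.size();
    }

    long totalBytes() {
        return totalBytes;
    }

    List<byte[]> deadLetters() {
        return deadLetters;
    }
}
```

When both arguments are set, whichever limit is hit first triggers eviction, which is why the loop tests the two conditions together.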

GhostBoyBoy added the type/feature label on Feb 27, 2023
GhostBoyBoy changed the title from "[FEATURE] Support message TTL" to "[FEATURE] Support message TTL and dead letter queue" on Feb 27, 2023