
Grouping message in the same queue #43

Open

khashish opened this issue Oct 14, 2020 · 6 comments
Labels
limitation (This is a system limitation)

Comments

khashish commented Oct 14, 2020

Is your feature request related to a problem? Please describe.
Messages in the queue don't have the option to carry a group id that marks them for FIFO execution.

Describe the solution you'd like.
When enqueuing a message, I should be able to provide a message group id, so that all messages in the queue with the same group id execute in sequence while concurrency is still allowed across different group ids.

Describe alternatives you've considered
Using SQS FIFO, since it allows concurrent execution across different group ids while guaranteeing sequential execution for messages with the same group id.

sonus21 (Owner) commented Oct 15, 2020

How many group ids do you expect?

There's an easy way to solve this if you have a handful of group ids: use the group id as a priority.

Use equal priorities and concurrency=1:

@RqueueListener(value = "event_queue", priority = "critical=1,high=1,medium=1,low=1", concurrency = "1")
public void onMessage(Event event) {
  log.info("Event : {}", event);
}

You can also define the group ids in the application configuration:

# g1,g2,g3,g4,g5,g6 are group ids
rqueue.event.queue.priority=g1=1,g2=1,g3=1,g4=1,g5=1,g6=1


@RqueueListener(value = "event_queue", priority = "${rqueue.event.queue.priority}", concurrency = "1")
public void onMessage(Event event) {
  log.info("Event : {}", event);
}

khashish (Author) commented Nov 1, 2020

@sonus21 Thanks for your reply.

The group ids are dynamic, not a predefined static list. Imagine you are building an online ordering service and want to group messages by order id, so that messages for the same order id are consumed in FIFO order while messages across different order ids can still be executed concurrently.

So if message 1 = {order_id: 1, action: "create_order"} and message 2 = {order_id: 1, action: "reserve_credit"}, then message 1 is always consumed before message 2.

If, however, message 1 = {order_id: 1, action: "create_order"} and message 2 = {order_id: 2, action: "reserve_credit"}, then they don't need to be executed in any particular order, so there is no problem executing them concurrently.

That way you get ordering guarantees when needed, while still maintaining good throughput by allowing concurrent consumption where possible.

sonus21 (Owner) commented Nov 2, 2020

@khashish Thanks for the details.

This feature is quite complex to support directly, but you can use a sharding/partitioning approach to deal with it. For example, in Kafka we create partitions; we can create partitions here in a similar fashion, but the application code has to handle the partitioning.

You can use any hashing strategy for this; for example, a simple one could be

group id = int(order id) % (number of partitions)

Once you have the group id, you can enqueue the item in the corresponding group of the queue. I would suggest setting the concurrency to less than or equal to the number of partitions, and you should use weight=1.
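Here's a minimal sketch of that strategy, assuming an illustrative OrderEvent type and Rqueue's RqueueMessageEnqueuer; the g&lt;N&gt; priority names are placeholders and must match the priorities configured on the listener:

import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
import org.springframework.stereotype.Service;

@Service
public class OrderEventPublisher {
  // keep the partition count fixed once chosen (see the note below)
  private static final int NUM_PARTITIONS = 127;

  private final RqueueMessageEnqueuer enqueuer;

  public OrderEventPublisher(RqueueMessageEnqueuer enqueuer) {
    this.enqueuer = enqueuer;
  }

  public void publish(OrderEvent event) {
    // group id = int(order id) % (number of partitions)
    int groupId = Math.floorMod(event.getOrderId(), NUM_PARTITIONS);
    // route the message to the matching priority group of the queue
    enqueuer.enqueueWithPriority("event_queue", "g" + groupId, event);
  }
}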

What about increasing/decreasing the number of partitions?

There's no rebalancing mechanism like Kafka's, so we need to deal with that ourselves. One simple strategy: set the number of partitions to a large prime like 127/257/509, but use a smaller concurrency like 10, so you have 10 workers; if you find you need more workers, you can increase the concurrency later. Changing the concurrency is much easier than changing the number of partitions.
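Listing 127 group ids by hand in the configuration is tedious, so here's an illustrative way to build the equal-weight priority string ("g0=1,g1=1,...,g126=1") programmatically; how you wire it into the property is up to your setup:

import java.util.stream.Collectors;
import java.util.stream.IntStream;

// builds "g0=1,g1=1,...,g126=1" for 127 equal-weight groups
String priority = IntStream.range(0, 127)
    .mapToObj(i -> "g" + i + "=1")
    .collect(Collectors.joining(","));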

What about the retry mechanism?

You should disable all retries on this queue, as retrying a queue entry could lead to out-of-order execution. You can, however, retry the same message multiple times within a single poll, before it's removed from the queue; this could be made per-queue if required, but currently it's a global value. The rqueue.retry.per.poll property controls it; the default value is 1, and you can change it to, say, 3 if that's applicable for you.
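For example, the global setting and a sketch of a no-retry listener (using @RqueueListener's numRetries attribute) could look like this:

# global per-poll retry count; the default of 1 avoids re-executing a
# failed message out of order
rqueue.retry.per.poll=1

// numRetries = "0" disables Rqueue's normal retry on this queue
@RqueueListener(value = "event_queue", numRetries = "0", priority = "${rqueue.event.queue.priority}", concurrency = "1")
public void onMessage(Event event) {
  log.info("Event : {}", event);
}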

sonus21 (Owner) commented Nov 4, 2020

There's one more issue left to solve here: with competing listeners/consumers, out-of-order execution can happen. To avoid this, each listener should be given only a subset of the group ids (as priorities), and no two listeners should compete for the same groups. For example, say I have 10 workers and 127 group ids, and I want to run 5 consumers. Since we need only 10 workers in total, each consumer should have a concurrency of 2 (number of workers / number of consumers), and the 127 group ids should be distributed across the consumers, e.g.:

[0,25) => Listener on machine 1
[25,50) => Listener on machine 2
[50,75) => Listener on machine 3
[75,101) => Listener on machine 4
[101,127) => Listener on machine 5
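As a sketch of how each machine could derive its own slice of group ids (assuming a machine index is supplied via configuration; the ceil-based split below produces slightly different ranges than the ones above, which is fine as long as the ranges are disjoint and cover all groups):

import java.util.stream.Collectors;
import java.util.stream.IntStream;

// machine k (0-based) of `machines` total claims a contiguous block of
// group ids and builds the priority string for its own listener
static String priorityForMachine(int machineIndex, int machines, int partitions) {
  int perMachine = (partitions + machines - 1) / machines; // ceil division
  int start = machineIndex * perMachine;
  int end = Math.min(start + perMachine, partitions);
  return IntStream.range(start, end)
      .mapToObj(i -> "g" + i + "=1")
      .collect(Collectors.joining(","));
}

// e.g. priorityForMachine(0, 5, 127) covers group ids [0, 26)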

sonus21 added the limitation label Nov 6, 2020
febinct commented Aug 7, 2021

Is there any plan to support this?

sonus21 (Owner) commented Aug 8, 2021

@febinct would the workaround above solve your use case?
