To subscribe to multiple subjects in one subscription #152

Open
fmvin opened this issue Jul 28, 2018 · 21 comments

@fmvin

fmvin commented Jul 28, 2018

This is a copy of the discussion from https://groups.google.com/d/topic/natsio/0nuqWz1BWBY.

The answer there was no, with an explanation of which wildcards should be used to subscribe to several subjects. But if a program has to subscribe to logically different data from different layers of a large real system (for example, global system mode, temperature sensor data, database, GUI, and system failures), it becomes impossible to organize subject names into a structure where wildcards remove the need to create several subscribers. And of course, subscribing to everything with “>” is not a good option.

So users have to write their own wrapper around cnats to provide multi-subscription (btw, Kafka has multi-subscription out of the box).

@kozlovic
Member

But what's the problem you are trying to solve by having subscriptions to several subjects into one? Is it to limit the number of threads per subscription? If so, you can have a thread pool managing all subscriptions for a given connection. Check natsOptions_UseGlobalMessageDelivery for details.

This would reduce the number of dispatcher threads if you have many subscriptions, but a given subscription will be bound to a given thread in the pool so that delivery of messages for this subscription is ordered.

Note that the server does not guarantee ordering for matching subscriptions. So suppose your application creates 2 subscriptions with overlapping subjects: foo.* and foo.> (or *.*, >, etc.). When a message is produced, say on foo.bar, the server will get the list of matching subscriptions in no particular order, and then send the messages. So for one incoming message, the server may get the list of subscriptions as foo.> then foo.*, and next time it could be the opposite. So grouping subscriptions in the client as a single subscription with the intent of preserving ordering is moot.

@fmvin
Author

fmvin commented Jul 31, 2018

But what's the problem you are trying to solve by having subscriptions to several subjects into one?

Just syntactic sugar to help create subscribers with the same natsMsgHandler callback, plus an easy way to unsubscribe using natsSubscription_Destroy(). Right now we need to loop over the subject list to subscribe and then loop again to unsubscribe. As I mentioned above, we have a lot of subject groups organized in several independent "trees", so wildcards don't help to automate subscriptions each time.

Is it to limit the number of threads per subscription?

No, it is not about limits.

@jim-king-2000

jim-king-2000 commented Oct 15, 2018

I have a scenario which depends on subscription to multiple subjects.

Let's say we have 3 rooms (room IDs are 1, 2, 3) which have temperature sensors installed. Three users want to monitor the real-time temperature stream (rendered in charts). User A wants to see the temperature charts of ROOM 1 and 2. User B wants the ones of ROOM 2 and 3, and User C, ROOM 3 and 1.

And we have the following extensions:

  1. Number of rooms could be N and number of users could be M. Both are unpredictable.
  2. Users may change the subscription. For example, User A may want to see the charts of ROOM 1, 2 and 3.

My solution is to use MQ together with websocket. One user consumes one websocket connection and one MQ subscription. One room creates one MQ subject. Thus, all messages generated by ROOM 1 are published to subject "1", ROOM 2 to subject "2", ROOM 3 to subject "3". User A subscribes to subjects "1" and "2", User B to "2" and "3", User C to "3" and "1". If a user changes the subscription, we have to unsubscribe from the old subjects and subscribe to the new ones. Wildcards are not helpful here.

That's why we need to subscribe to multiple subjects.
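
The "unsubscribe the old, subscribe the new" step can be expressed as two set differences over the subject lists. The following is only a minimal sketch in plain C with no NATS calls; `subjects_difference` is a hypothetical helper name, and subjects are assumed to be compared as exact strings (no wildcard awareness).

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Returns true if the string `s` is present in `set` (exact comparison). */
static bool contains(const char *const *set, size_t n, const char *s)
{
    for (size_t i = 0; i < n; i++)
        if (strcmp(set[i], s) == 0)
            return true;
    return false;
}

/* Hypothetical helper: writes into `out` every subject of `a` that is not
 * in `b` and returns how many were written. `out` must have room for `na`
 * pointers. The pointers in `out` are borrowed from `a`. */
size_t subjects_difference(const char *const *a, size_t na,
                           const char *const *b, size_t nb,
                           const char **out)
{
    size_t k = 0;
    for (size_t i = 0; i < na; i++)
        if (!contains(b, nb, a[i]))
            out[k++] = a[i];
    return k;
}
```

With `old` = {"1", "2"} and `new` = {"1", "2", "3"}, `subjects_difference(old, new)` gives the subjects to unsubscribe from (none) and `subjects_difference(new, old)` gives the subjects to subscribe to ("3"). Subjects present in both lists keep their existing subscriptions untouched.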

@kozlovic added the enhancement (Enhancement to existing functionality) label on Oct 15, 2018
@kozlovic
Member

@jim-king-2000 I am a bit confused about how subscribing to multiple subjects would be different. What I mean is that the server does not support that, so anything we would do in the client would be some kind of wrapper that would, under the hood, create subscriptions and unsubscribe them.

You say that "If a user changes the subscription, we have to unsubscribe from the old subjects and subscribe to the new ones. That's why we need to subscribe to multiple subjects." How would that be different? Say that we have a new API that allows you to specify a list of subjects, or add/remove subjects. How is that different? Do you have pseudo code in mind for how this would look?

@jim-king-2000

jim-king-2000 commented Oct 16, 2018

@kozlovic I'm investigating various message queues for implementing our scenario. I have real demo code using RabbitMQ, but do not have demo code using NATS yet.

I'm inclined to use NATS over RabbitMQ because of its extraordinary performance. But how can I write a wrapper that creates multiple subscriptions under the hood? Can you provide demo code?

@kozlovic
Member

@jim-king-2000 I don't, because I am not sure how you think things should behave. You say that you have a demo for RabbitMQ. Any code or pseudo code that you could share? I am curious about how you create the subscription on multiple subjects and how you remove some of the subjects and add new ones to the list.

I could imagine adding new APIs that subscribe based on a list of subjects. A single callback would be provided. You could remove subjects from this object to remove interest or add more. For instance something like:

natsStatus
natsConnection_SubscribeList(natsSubscription **sub, natsConnection *nc, char **subjects, int count, natsMsgHandler cb, void *closure);

natsStatus
natsSubscription_AddSubjects(natsSubscription *sub, char **subjects, int count);

natsStatus
natsSubscription_RemoveSubjects(natsSubscription *sub, char **subjects, int count);

This would not prevent users from adding overlapping subjects though. That is, if the list contains foo.* and foo.> then a message on foo.bar would be presented twice to the callback.
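
To make the overlap concrete, here is a minimal sketch of NATS-style wildcard matching in plain C. It is not the server's sublist code and not part of cnats; `subject_matches` and `count_matches` are hypothetical names, and the sketch assumes well-formed, dot-separated subjects where `*` matches exactly one token and `>` matches one or more trailing tokens.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: true if `subject` matches `pattern`.
 * '*' matches exactly one token; '>' must be the last token of the
 * pattern and matches one or more remaining tokens. */
bool subject_matches(const char *pattern, const char *subject)
{
    while (*pattern != '\0' && *subject != '\0') {
        size_t plen = strcspn(pattern, ".");   /* length of current token */
        size_t slen = strcspn(subject, ".");

        if (plen == 0 || slen == 0)
            return false;                      /* empty token: malformed */
        if (plen == 1 && pattern[0] == '>')
            return pattern[1] == '\0';         /* '>' swallows the rest */
        if (!(plen == 1 && pattern[0] == '*')) {
            /* Literal token: must be identical. */
            if (plen != slen || strncmp(pattern, subject, plen) != 0)
                return false;
        }
        pattern += plen;
        subject += slen;
        if (*pattern == '.' && *subject == '.') {
            pattern++;                         /* both have another token */
            subject++;
        } else {
            /* Match only if both strings end at the same token. */
            return *pattern == '\0' && *subject == '\0';
        }
    }
    return false;
}

/* Counts how many patterns in the list match `subject`. With one
 * subscription per pattern, this is how many times a single published
 * message would reach the shared callback. */
size_t count_matches(const char *const *patterns, size_t n,
                     const char *subject)
{
    size_t hits = 0;
    for (size_t i = 0; i < n; i++)
        if (subject_matches(patterns[i], subject))
            hits++;
    return hits;
}
```

For the list {foo.*, foo.>}, `count_matches` returns 2 for foo.bar (both patterns match) and 1 for foo.bar.baz (only foo.> matches).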

Does that look like something that could work for you and @fmvin too?

@jim-king-2000

jim-king-2000 commented Oct 16, 2018

Hi @kozlovic, let me share my demo code using RabbitMQ here. The exchange type is "direct" (the variable ch is the channel).

// Publisher: publish 3 messages whose keys (subjects) are 0, 1, and 2.
for (let i = 0; i < 3; ++i) {
    await ch.publish(
      q, // exchange to publish to
      `${i}`, // routing "key" for RabbitMQ ("subject" for NATS)
      Buffer.from(`Hello World!${i}`));
}

// First subscriber: ask for keys (subjects) 0 and 1.
await ch.bindQueue(queue0.queue, q, '0');
await ch.bindQueue(queue0.queue, q, '1');
await ch.consume(queue0.queue, msg => console.log(msg.content.toString()), {noAck: true});

// Second subscriber: ask for keys (subjects) 1 and 2.
await ch.bindQueue(queue1.queue, q, '1');
await ch.bindQueue(queue1.queue, q, '2');
await ch.consume(queue1.queue, msg => console.log(msg.content.toString()), {noAck: true});

// Third subscriber: ask for keys (subjects) 2 and 0.
await ch.bindQueue(queue2.queue, q, '2');
await ch.bindQueue(queue2.queue, q, '0');
await ch.consume(queue2.queue, msg => console.log(msg.content.toString()), {noAck: true});

Although only 3 messages are sent to the RabbitMQ exchange, the 3 subscribers receive 6 messages altogether. The broker is responsible for the message duplication and routing. If we want to change the subscription, we can use "ch.unbindQueue".

If NATS could support this, I'll choose NATS. Since I'm new to NATS, I need some help.

@fmvin
Author

fmvin commented Oct 16, 2018

kozlovic: Does that look like something that could work for you and @fmvin too?

It's actually what we are looking for. BTW, we already created a C++ wrapper which provides exactly the same functionality you proposed above ;) From my point of view, cnats should manage overlapping subjects internally and call the callback only once in your example.

@jim-king-2000

Hi @fmvin , could you show the demo code (the wrapper)? I'm really interested in it.

@fmvin
Author

fmvin commented Oct 16, 2018

Hi @fmvin , could you show the demo code (the wrapper)? I'm really interested in it.

I think it won't be informative because this stuff depends deeply on our system architecture. In short, we created a map with subjects as keys and natsSubscription objects as values. When our high-level protocol is opened, we loop over this map and create a subscription for each subject with the same callback. When our protocol is closed, we loop over this map one more time and destroy the opened subscriptions. Something like this...
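
A rough sketch of that map-based approach in plain C could look like the following. This is not the actual wrapper; every name here (`multi_sub`, `subscribe_fn`, `fake_subscribe`, and so on) is hypothetical. The real subscribe and unsubscribe operations are injected as function pointers so the sketch stays self-contained; with cnats they would presumably be thin wrappers around natsConnection_Subscribe and natsSubscription_Destroy, all passing the same callback.

```c
#include <stddef.h>
#include <string.h>

#define MULTI_SUB_MAX 32

/* Hypothetical backend hooks: subscribe returns 0 on success and stores an
 * opaque subscription handle; unsubscribe releases that handle. */
typedef int  (*subscribe_fn)(const char *subject, void **handle);
typedef void (*unsubscribe_fn)(void *handle);

typedef struct {
    const char *subject;   /* borrowed: caller keeps the string alive */
    void       *handle;    /* opaque per-subject subscription */
} multi_sub_entry;

typedef struct {
    multi_sub_entry entries[MULTI_SUB_MAX];
    size_t          count;
    subscribe_fn    subscribe;
    unsubscribe_fn  unsubscribe;
} multi_sub;

void multi_sub_init(multi_sub *ms, subscribe_fn sub, unsubscribe_fn unsub)
{
    ms->count = 0;
    ms->subscribe = sub;
    ms->unsubscribe = unsub;
}

/* Adds one subject. Returns 0 on success (or if the exact same subject is
 * already present), -1 if the table is full or the backend fails. */
int multi_sub_add(multi_sub *ms, const char *subject)
{
    for (size_t i = 0; i < ms->count; i++)
        if (strcmp(ms->entries[i].subject, subject) == 0)
            return 0;
    if (ms->count == MULTI_SUB_MAX)
        return -1;
    void *handle = NULL;
    if (ms->subscribe(subject, &handle) != 0)
        return -1;
    ms->entries[ms->count].subject = subject;
    ms->entries[ms->count].handle = handle;
    ms->count++;
    return 0;
}

/* Removes one subject. Returns 0 if it was removed, -1 if not found. */
int multi_sub_remove(multi_sub *ms, const char *subject)
{
    for (size_t i = 0; i < ms->count; i++) {
        if (strcmp(ms->entries[i].subject, subject) == 0) {
            ms->unsubscribe(ms->entries[i].handle);
            ms->entries[i] = ms->entries[ms->count - 1];  /* fill the gap */
            ms->count--;
            return 0;
        }
    }
    return -1;
}

/* Destroys every remaining subscription ("protocol closed"). */
void multi_sub_close(multi_sub *ms)
{
    while (ms->count > 0) {
        ms->count--;
        ms->unsubscribe(ms->entries[ms->count].handle);
    }
}

/* Stand-in backend for trying the sketch without a NATS server: it only
 * counts how many subscriptions are currently active. */
int fake_active = 0;

int fake_subscribe(const char *subject, void **handle)
{
    (void)subject;
    *handle = &fake_active;
    fake_active++;
    return 0;
}

void fake_unsubscribe(void *handle)
{
    (void)handle;
    fake_active--;
}
```

The duplicate check only catches identical subject strings. It does nothing about wildcard overlap such as foo.* and foo.>, and it does nothing to serialize the shared callback across subscriptions.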

@jim-king-2000

Thank you, @fmvin. Maybe I could try to create multiple subscriptions (different keys) with the same callback.

@kozlovic
Member

@jim-king-2000 Thanks for sending the demo code. The first thing you should be aware of with NATS is that there is no persistence and it is not a queuing system like a JMS broker. That is, in the demo code, you are publishing messages to a queue and later consuming them. With NATS, that would not work. If a message is published and there is no subscriber for that message at that moment, the message is dropped.

@fmvin @jim-king-2000 The issue with creating your own wrapper is that you won't be able to ensure that the callback is invoked serially for all those subjects. That is, each subscription has its own dispatcher thread and if you "share" the callback, it may be executed in parallel. If that's not an issue, then that's ok.

@fmvin Preventing the overlap is impossible. Internally, I would have no choice but to create individual subscriptions (one per given subject). Messages would then arrive one per subscription, but there is no way to detect whether a message should be suppressed or not. Even if we were to implement the sublist code that is in the server to figure out the number of matching subscriptions for a given message, the problem is that when getting a message, we would not know whether a second copy is coming, and so could not suppress it.

As an example, say that you have a list subscription for foo.* and foo.>, but also some other normal subscriptions like foo.bar. Say that internally, those subscriptions have ID 1, 2 and 3 respectively.

If a message M is published on foo.bar, the connection will receive 3 messages (1 per subscription; the message protocol has the subscription ID in it). The order in which they are received is not guaranteed. That is, the server may send the message for subscriptions 3, 1 and 2, or 1, 2 and 3, etc., and the order may change per message. So say that the messages are received like this:

M-S1, M-S3, M-S2

When getting M-S1 we would know that S1 is part of that list subscription and give it to the unique callback. M-S3 is given to the standalone subscription on foo.bar. M-S2 is again part of the list subscription, but how do I know that I should suppress it because it matches both foo.* and foo.>? There is no sequence or anything like that in NATS that would allow me to detect that M matched several subscriptions and was already presented to the callback. In other words, try to picture that you are the connection and receive a message with a subscription ID attached: you'll know that it is part of a "subscription list", but have no knowledge of what happened before or after, and no way to uniquely identify a message. How do you decide if you drop the message or give it to the "subscription list"?
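
The situation can be modelled with a small sketch in plain C. It does not use cnats; the `delivery` struct, `sid_in_list`, and `dispatch_all` are hypothetical names, and the routing table is hard-coded to the example above (SIDs 1 and 2 belong to the list subscription, SID 3 is the standalone one). The point is that each delivery carries only a subscription ID, a subject, and a payload, so routing by SID is all the connection can do.

```c
#include <stddef.h>

/* What the client sees for each delivery. Note that there is no
 * per-message ID or sequence number to tell copies apart. */
typedef struct {
    int         sid;
    const char *subject;
    const char *payload;
} delivery;

typedef struct {
    int list_calls;        /* invocations of the shared list callback */
    int standalone_calls;  /* invocations of the standalone callback */
} call_counts;

/* Hypothetical routing table taken from the example:
 * SID 1 (foo.*) and SID 2 (foo.>) form the list subscription. */
static int sid_in_list(int sid)
{
    return sid == 1 || sid == 2;
}

/* Routes each delivery by its SID, one at a time, with no knowledge of
 * what came before or what will come after. */
call_counts dispatch_all(const delivery *in, size_t n)
{
    call_counts c = {0, 0};
    for (size_t i = 0; i < n; i++) {
        if (sid_in_list(in[i].sid))
            c.list_calls++;          /* shared callback would run here */
        else if (in[i].sid == 3)
            c.standalone_calls++;    /* foo.bar callback would run here */
    }
    return c;
}
```

Feeding it M-S1, M-S3, M-S2 for a single publish on foo.bar yields two invocations of the list callback and one of the standalone callback, whatever the arrival order. Two separate publishes with identical subject and payload would produce deliveries that look exactly the same, which is why suppressing "duplicates" by comparing subject and payload would not be safe either.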

So if the multi-subscription feature would not make sense without handling subjects overlap, then there is no reason to proceed with this enhancement request. What are your thoughts?

@jim-king-2000

If a message is published and there is no subscriber for that message at that moment, the message is dropped.

That is exactly what we want. We don't need message queueing. Since I'm also new to RabbitMQ, I don't know how to drop messages when there is no subscriber in RabbitMQ. What we truly need is only message routing.

That is, each subscription has its own dispatcher thread and if you "share" the callback, it may be executed in parallel.

That's not an issue either. I will use the Node.js client, which is inherently single-threaded, although I would still like a single API for subscribing to multiple subjects.

It seems NATS could meet our requirements. I'll investigate it immediately. Thank you, @kozlovic, for your patience and replies.

@kozlovic
Member

If you use Node.js, then why ask for multiple subscriptions in the C client :-) I don't see the need for a new API then (especially in Node.js). Why can't you just add subscriptions and unsubscribe them when no longer needed?

@jim-king-2000

Sorry, I didn't know this thread was specific to the C client. I'll try adding subscriptions in Node.js immediately.

@kozlovic
Member

No worries...

@laugimethods

As a connector developer (Spark / NATS), it would be great to be able to specify multiple subscriptions at once (like "Subject1,Subject2"), instead of providing a list of subjects.

Remaining question: where to implement that feature?
a) natively on the NATS protocol
b) in the client libraries (Go, Java, etc.)
c) on the final clients (including the various connectors - as I did -)

I would choose option a) since 1) it should be the most efficient (broader filtering on the server side, though it is probably more complex than that), and 2) it would automatically be available to all libraries (without any code change).

@delaneyj

delaneyj commented Jan 9, 2023

With the addition of JetStream this issue is more apparent. Multiple subscriptions are less effective when you are doing reliable, ordered message handling. Otherwise, if you are subscribing to multiple specific messages, you have a lot of bookkeeping to do each time.

For example, suppose you have something like a subscription to us.zip.{89129,90210,12345}.* on a JetStream stream involving address changes. The number of zip codes a county would want to handle is very low compared to the total number of zip codes.

Another example, more related to my use case, is Slippy Map tiles. You may have someone looking at a city at Z15 who is also interested in changes up to Z0 (which is the single tile for the world). Each handler has overlap, but the entire possible key space is huge compared to what you really want to listen to. In the case of map data with large binaries, in the current setup you are throwing most of the messages on the floor if you use a wildcard.

@bruth
Member

bruth commented Jan 9, 2023

@delaneyj In the context of JetStream, nats-io/nats-server#2515 would be applicable.

For core pub/sub, I suppose a declaration of us.zip.{89129,90210,12345}.* would have the server manage the pattern matching and distribution over a single subscription, rather than the client having to manage that.

@levb self-assigned this on May 22, 2023
@nickchomey

nickchomey commented Mar 31, 2024

I just stumbled upon this - it would seem that the issue is now resolved with the 2.10 release of filterSubjects?

@levb
Collaborator

levb commented May 3, 2024

For JetStream, #750 adds js_SubscribeMulti and js_SubscribeSyncMulti. It will be included in the next v3.9 release.

I am 1/5 based on @kozlovic's early comments that we should leave this out of the "core" NATS API, since it would be merely syntactic sugar.

@delaneyj @nickchomey @bruth @laugimethods @fmvin @jim-king-2000 Do you think we can resolve this ticket?
