In Springwolf UI, add support for sending RabbitMQ Message towards an exchange using a RoutingKey #366

Open
pdalfarr opened this issue Sep 25, 2023 · 4 comments
Labels
amqp enhancement New feature or request

Comments

@pdalfarr
Contributor

Describe the feature request
In Springwolf UI, add support for sending RabbitMQ Message towards an exchange using a Routing Key.

Motivation
Springwolf lists all publishers and subscribers and presents a UI that not only lists them but also lets you test them by sending messages.
For RabbitMQ, this message-sending feature works for queues but does not work for exchanges.
This feature request aims to implement the missing part.

Technical details

Here are some technical details I gathered so far:

The Springwolf UI can send messages to queues (when plugin.amqp.publishing.enabled: true),
which works when the queue name is specified in AsyncOperation.channelName.

But sending a message to a topic exchange does not work as expected:

Let's take an example:

In one of your RabbitMQ examples, here:

https://github.com/springwolf/springwolf-core/blob/master/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/stavshamir/springwolf/example/amqp/producers/ExampleProducer.java#L17-L28

we have:

   @AsyncPublisher(
            operation =
                    @AsyncOperation(
                            channelName = "example-producer-channel-publisher",
                            description = "Custom, optional description defined in the AsyncPublisher annotation"))
    @AmqpAsyncOperationBinding()
    public void sendMessage(ExamplePayloadDto msg) {
        // send
        AnotherPayloadDto dto = new AnotherPayloadDto("fooValue", msg);
        rabbitTemplate.convertAndSend("example-topic-exchange", "example-topic-routing-key", dto);
    }

Now, let's suppose we have, in another micro-service, a method with the following annotation:

    @Override
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(
                            name = "microservice-2-private-queue-name" // this is a private queue of this micro-service: I do not want this string value to be known by the first micro-service (micro-service 1), i.e., your sample code
                    ),
                    exchange = @Exchange(
                            name = "example-topic-exchange", // the exchange from your sample code
                            type = ExchangeTypes.TOPIC
                    ),
                    key = "example-topic-routing-key" // the key from your sample code
            )
    )

So micro-service 2 declares a private queue bound to a topic exchange, and it receives the messages sent by the RabbitMQ sample code:

rabbitTemplate.convertAndSend("example-topic-exchange", "example-topic-routing-key", dto);

This works fine, but it does not work when the message is sent from the Springwolf UI.

It seems that messages sent from the UI go to a queue named "example-producer-channel-publisher", with a routing key of the same value, i.e. "example-producer-channel-publisher".

Ideas:
Maybe we could make use of amqpchannelbinding to express that we want the UI to send messages to a given exchange (a topic exchange in my example), along with a routing key?

In the example I gave here, your code (micro-service 1) sends a message to a topic exchange: rabbitTemplate.convertAndSend("example-topic-exchange", "example-topic-routing-key", dto);
Of course, I do NOT want the "microservice-2-private-queue-name" queue name (from micro-service 2) to be known by the AsyncPublisher annotation of micro-service 1.

I also want micro-service 2 to be able to change the name of its own internal/private queue at will.
Micro-service 1 should only know about the exchange and routing key of micro-service 2.
In other words, micro-service 1 must send messages to this (let's say public) exchange, and this is what the code actually does.
So I guess this should be reflected in the annotation, as well as in the behavior of the Springwolf UI.

I tested this in my own project: messages sent from the UI are delivered to micro-service 2 if I set the channelName in micro-service 1 to "microservice-2-private-queue-name".
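
To make the three cases above concrete, here is a minimal sketch of exchange-to-queue routing. It is a toy in-memory model, not RabbitMQ or Springwolf code: the `ToyBroker` class and its methods are hypothetical, and routing keys are matched exactly (no topic wildcards). The one RabbitMQ rule it relies on is that every queue is implicitly bound to the default exchange "" under its own name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of exchange -> binding -> queue routing (hypothetical, for illustration only). */
final class ToyBroker {
    // exchange name -> (routing key -> names of bound queues); keys are matched exactly
    private final Map<String, Map<String, List<String>>> bindings = new HashMap<>();
    // queue name -> messages delivered so far
    private final Map<String, List<String>> queues = new HashMap<>();

    /** Declares a queue; like in RabbitMQ, it is bound to the default exchange "" under its own name. */
    void declareQueue(String queue) {
        bind("", queue, queue);
    }

    /** Binds a queue to an exchange under the given routing key, creating the queue if needed. */
    void bind(String exchange, String routingKey, String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
        bindings.computeIfAbsent(exchange, e -> new HashMap<>())
                .computeIfAbsent(routingKey, k -> new ArrayList<>())
                .add(queue);
    }

    /** Routes a message and returns the number of queues it was delivered to. */
    int publish(String exchange, String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(exchange, Map.of())
                .getOrDefault(routingKey, List.of());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return targets.size();
    }

    /** Returns the messages delivered to a queue so far. */
    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, List.of());
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        // What micro-service 2 declares through its @RabbitListener binding.
        broker.declareQueue("microservice-2-private-queue-name");
        broker.bind("example-topic-exchange", "example-topic-routing-key",
                "microservice-2-private-queue-name");

        // What the UI appears to do today: default exchange, channelName as routing key.
        System.out.println(broker.publish("", "example-producer-channel-publisher", "from UI"));
        // What the producer code does: exchange plus routing key.
        System.out.println(broker.publish("example-topic-exchange", "example-topic-routing-key", "from code"));
        // The workaround: channelName set to the private queue name.
        System.out.println(broker.publish("", "microservice-2-private-queue-name", "workaround"));
    }
}
```

In this model the first publish reaches no queue, while the second and third each reach the private queue. The workaround only succeeds because micro-service 1 knows the private queue name, which is exactly the coupling this request wants to avoid.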

Describe alternatives you've considered
I haven't considered any alternatives yet.

@pdalfarr pdalfarr added the enhancement New feature or request label Sep 25, 2023
@timonback
Member

Thanks for this elaborate explanation.
Now, I understand that an exchange with a routingKey is a feature to hide the underlying queue name - or adjust it dynamically.

Basically you want to use springwolf-ui on microservice 1 to publish a message. It should take the info from rabbitTemplate.convertAndSend("example-topic-exchange", "example-topic-routing-key", dto);.
The exchange knows the private queue of microservice 2 and does the mapping internally.

Springwolf UI does not support this at this point.
I see three things that would need to change:

  1. Microservice 1 needs to be aware of the routingKey, i.e. through the AmqpAsyncChannelBinding annotation
  2. The Springwolf UI would need to extract the routingKey from the doc and render - specific for amqp - an additional publishing field - or at least pass it on.
  3. The SpringwolfAmqpProducer needs to send the message to the specified routingKey that is supplied from the frontend.

Feel free to contribute this feature, we are happy to help.

@timonback timonback added the amqp label Oct 6, 2023
@pdalfarr
Contributor Author

pdalfarr commented Nov 22, 2023

Just adding a link and a picture to illustrate the different kinds of exchanges supported by RabbitMQ:

image

src: https://hevodata.com/learn/rabbitmq-exchange-type/

So the 'complete chain' is:

Producer > Channel > Exchange > Binding > Routing Key > Queue > Consumer
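
For the "Binding > Routing Key" step of a topic exchange, here is a minimal sketch of how a binding key is compared with a routing key. It assumes the standard AMQP topic rules: words are separated by ".", "*" matches exactly one word, and "#" matches zero or more words. The `TopicPatternSketch` class is hypothetical and is not taken from RabbitMQ or Springwolf.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that mimics topic-exchange matching of a binding key against a routing key. */
final class TopicPatternSketch {

    static boolean matches(String bindingKey, String routingKey) {
        return matches(split(bindingKey), 0, split(routingKey), 0);
    }

    // Splits a key into its dot-separated words; an empty key has no words.
    private static List<String> split(String key) {
        return key.isEmpty() ? List.of() : Arrays.asList(key.split("\\.", -1));
    }

    private static boolean matches(List<String> pattern, int i, List<String> key, int j) {
        if (i == pattern.size()) {
            // Pattern exhausted: it matches only if the routing key is exhausted too.
            return j == key.size();
        }
        String word = pattern.get(i);
        if (word.equals("#")) {
            // '#' may absorb zero or more words; try every possible split point.
            for (int next = j; next <= key.size(); next++) {
                if (matches(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.size()) {
            // A literal word or '*' needs one word, but none is left.
            return false;
        }
        return (word.equals("*") || word.equals(key.get(j)))
                && matches(pattern, i + 1, key, j + 1);
    }
}
```

With a binding key without wildcards, such as "example-topic-routing-key" in the example above, a topic exchange behaves like an exact match on the routing key.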

@pdalfarr
Contributor Author

pdalfarr commented May 31, 2024

I am not really sure of myself here, but I'd still like to share some thoughts to try to move things forward.
The text below is a kind of brainstorming, if you will.

I had a look at the code, and I think we could first change it a bit without adding the new 'routingKey' attribute you mentioned.

1. What if we also add bindings here:

 ChannelObject channelItem = channelBuilder
         .messages(Map.of(message.getMessageId(), MessageReference.toComponentMessage(message)))
         .build();

So we would have something like

 ChannelObject channelItem = channelBuilder
         .messages(Map.of(message.getMessageId(), MessageReference.toComponentMessage(message)))
         .bindings(toChannelBindings(operation.getBindings()))
         .build();

2. So ChannelBinding would be available for these 2 methods here

(In my case, when I use AsyncPublisher + AsyncOperation and AmqpAsyncOperationBinding, the code below does not find any "amqp" bindings) :

private String getExchangeName(ChannelObject channelItem) {
    String exchange = "";
    if (channelItem.getBindings() != null && channelItem.getBindings().containsKey("amqp")) {
        AMQPChannelBinding channelBinding =
                (AMQPChannelBinding) channelItem.getBindings().get("amqp");
        if (channelBinding.getExchange() != null
                && channelBinding.getExchange().getName() != null) {
            exchange = channelBinding.getExchange().getName();
        }
    }
    return exchange;
}

private String getRoutingKey(Operation operation) {
    String routingKey = "";
    if (operation != null
            && operation.getBindings() != null
            && operation.getBindings().containsKey("amqp")) {
        AMQPOperationBinding operationBinding =
                (AMQPOperationBinding) operation.getBindings().get("amqp");
        if (!CollectionUtils.isEmpty(operationBinding.getCc())) {
            // the first cc entry is used as the routing key
            routingKey = operationBinding.getCc().get(0);
        }
    }
    return routingKey;
}

By doing this, I think we should be able to handle the type of the AMQPChannelBinding.

If it's AMQPChannelType.ROUTING_KEY, then we should publish like so:

    rabbitTemplate.convertAndSend("exchange-name", "routing-key-as-per-AMQPOperationBinding.cc[0]-OR-empty-string-if-not-defined", dto);

and if it's AMQPChannelType.QUEUE, then we should publish like so:

    rabbitTemplate.convertAndSend("", "queue-name-as-routing-key", dto); // we publish towards the 'default exchange'

For the latter case, maybe we should be able to specify, or obtain from somewhere, an exchange other than the default exchange?
Or maybe AMQPChannelType.QUEUE is meant to declare a queue, but not a binding to it, so there is no point in taking it into account when performing a rabbitTemplate.convertAndSend call. This I am not really sure of.
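
The two publishing cases above could be sketched as one small decision function. This is only an illustration under stated assumptions: `PublishTargetSketch`, `ChannelKind`, and `Target` are hypothetical names (with `ChannelKind` standing in for the two AMQPChannelType values mentioned above), and none of this is existing Springwolf code. It just computes the two string arguments that would be passed to rabbitTemplate.convertAndSend.

```java
import java.util.List;

/** Hypothetical sketch: picks the (exchange, routingKey) pair for a publish call. */
final class PublishTargetSketch {

    /** Stand-in for the two channel types discussed above (assumption, not the real enum). */
    enum ChannelKind { ROUTING_KEY, QUEUE }

    /** The exchange and routing key that would be handed to convertAndSend. */
    static final class Target {
        final String exchange;
        final String routingKey;

        Target(String exchange, String routingKey) {
            this.exchange = exchange;
            this.routingKey = routingKey;
        }
    }

    /**
     * @param kind channel type taken from the channel binding
     * @param name exchange name for ROUTING_KEY channels, queue name for QUEUE channels
     * @param cc   the cc list of the operation binding; may be null or empty
     */
    static Target resolve(ChannelKind kind, String name, List<String> cc) {
        if (kind == ChannelKind.QUEUE) {
            // Publish through the default exchange "", using the queue name as routing key.
            return new Target("", name);
        }
        // ROUTING_KEY: publish to the named exchange; the first cc entry is the routing key,
        // or the empty string when none is defined.
        String routingKey = (cc == null || cc.isEmpty()) ? "" : cc.get(0);
        return new Target(name, routingKey);
    }
}
```

For example, `resolve(ChannelKind.ROUTING_KEY, "example-topic-exchange", List.of("example-topic-routing-key"))` yields the same pair that the producer code in the first comment passes to convertAndSend.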

3. About existing annotations

FYI, the method I am testing in the frame of this explanation has the following annotations

    @AsyncPublisher(
            operation = @AsyncOperation(
                    channelName = "", // I want to publish to the exchange "". Is this the proper way to specify it? It generates odd names in asyncapi.yaml, such as '_send_other_service.get-observations.v1'
                    description = "other_service.get-observations.v1" + " description",
                    payloadType = GetObservationsRequest.class
            )
    )
    @AmqpAsyncOperationBinding(
            // cc[0] = routingKey, as defined by the other service
            // is this OK? or should I write cc = "other_service.get-observations.v1", without { } ?
            cc = {"other_service.get-observations.v1"}
    )
    @Override
    public void sendObservationsRequest(

So, here, I am expecting SpringWolf UI to do this:

rabbitTemplate.convertAndSend( AsyncPublisher.operation.channelName , AmqpAsyncOperationBinding.cc[0] )

which is, with actual values :

rabbitTemplate.convertAndSend( "" , "other_service.get-observations.v1")

but what I do observe is this:

rabbitTemplate.convertAndSend( "" , "")

so, routingKey is not 'taken from' AmqpAsyncOperationBinding.cc[0]

4. Possible or not ?

What is your opinion on the idea presented here?
Is it feasible and correct regarding the existing code base?
I am a bit worried about adding bindings in ChannelObject as I have no idea about the implication of such a change.
I do not have the big picture of SpringWolf in mind, so maybe it would break things here and there.

(
5. What's next

Once done, we could possibly think of a feature in the Springwolf UI to 'overwrite routingKey'.
But I guess we could first focus on the current proposal here.
)

@pdalfarr
Contributor Author

pdalfarr commented Jun 3, 2024

FYI, here is a screenshot of the RabbitMQ management UI.
This screenshot illustrates how one can send a message "towards a queue".
More precisely, the message will be sent

  • to the "Default Exchange" (i.e., the Direct Exchange with name "")
  • with routingKey being the name of the queue (here "contr.test")

(The "Default exchange" then will 'route' the message to the queue)

image

Notes

  • Headers can be specified: Springwolf already supports this thanks to @AsyncOperation.Headers, which is great.
  • Properties can be defined: as far as I know, Springwolf does not support properties. Or maybe I missed something.
    A properties section would be nice to have because it would allow specifying additional info, like 'correlation-id', 'reply-to', and other essential properties.
  • (FYI, the UI to send a message towards an exchange (instead of a queue) is a bit different:
    it takes into account the exchange name (of course) and also has an additional 'routingKey' form input.)


2 participants