Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAGO-74463: Add Dynamic Producer Destination to the binder #39839

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 4.18.0-beta.1 (Unreleased)

### Features Added
- Dynamic Destination Enablement. By setting the AzureHeaders.Name header, you are able to route to different destination dynamically.

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected MessageHandler createProducerMessageHandler(
ProducerDestination destination,
ExtendedProducerProperties<ServiceBusProducerProperties> producerProperties,
MessageChannel errorChannel) {
Assert.notNull(getServiceBusTemplate(), "ServiceBusTemplate can't be null when create a producer");
Assert.notNull(initServiceBusTemplate(producerProperties), "ServiceBusTemplate failed to be initialized. It can't be null when creating a producer");

extendedProducerPropertiesMap.put(destination.getName(), producerProperties);
DefaultMessageHandler handler = new DefaultMessageHandler(destination.getName(), this.serviceBusTemplate);
Expand Down Expand Up @@ -265,7 +265,7 @@ public void setBindingProperties(ServiceBusExtendedBindingProperties bindingProp
this.bindingProperties = bindingProperties;
}

private ServiceBusTemplate getServiceBusTemplate() {
private ServiceBusTemplate initServiceBusTemplate(ExtendedProducerProperties<ServiceBusProducerProperties> producerProperties) {
if (this.serviceBusTemplate == null) {
DefaultServiceBusNamespaceProducerFactory factory = new DefaultServiceBusNamespaceProducerFactory(
this.namespaceProperties, getProducerPropertiesSupplier());
Expand All @@ -278,6 +278,9 @@ private ServiceBusTemplate getServiceBusTemplate() {
instrumentationManager.addHealthInstrumentation(instrumentation);
});
this.serviceBusTemplate = new ServiceBusTemplate(factory);
if (producerProperties.getExtension() != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering why this entity type is missing from the producer properties retrieved from line 271, the getProducerPropertiesSupplier?

Copy link
Author

@Aryelmr Aryelmr May 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's because the key to return the extended properties is the destination name. When it uses the Dynamic Destination name to pull the properties to create the producer it's not going to find it. Right here:
Permalink

@Override
    public ServiceBusSenderAsyncClient createProducer(String name, ServiceBusEntityType entityType) {
        ProducerProperties producerProperties = this.propertiesSupplier.getProperties(name) != null
            ? this.propertiesSupplier.getProperties(name) : new ProducerProperties();
        if (entityType != null) {
            producerProperties.setEntityType(entityType);
        }
        return doCreateProducer(name, producerProperties);
    }

So it'll create a default ProducerProperty and if the defaultEntityType is set, it'll set the entityType. As the defaultEntityType is not set, then it'll fail to publish to the dynamic destination. That's why in there I'm not setting the entityType and setting the defaultEntityType in case the destination is dynamic is used and no ProducerProperty is register in its name.

I've tested in our internal connector manually and on integration test and it works fine when defaultEntityType is set.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks. So a better way to set the type is probably to use another header.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you help add some comments around this code? Thanks.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@saragluna
So you mean you would like to add a new producer property so the user can define in the YAML what would be the defaultEntityType?
About the code, do you mean adding some comments in the code or just in the PR so it makes easier to understand? If it's the comments in the code, wouldn't be better some Java Docs instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update, the other way to set the entity type via the header I was talking about is exactly like this setHeader(SolaceBinderHeaders.TARGET_DESTINATION_TYPE, "topic").

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try the NewDestinationBindingCallback to see what causes the issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Aryelmr, I've created this project https://github.com/saragluna/demo-servicebus-binder-dynamic-destinations to test the dynamic destinations. In my test, there are two ways to configure the default entity type:

Could you help look to see whether these two approaches work in your case? If so, I think we don't need to make changes in this PR now. Thanks in advance.

Copy link
Author

@Aryelmr Aryelmr May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@saragluna
Thank you for coming up with those alternatives to see if we could get the same results with the code changes :)
Unfortunately, they didn't work.
On DefaultServiceBusNamespaceProducerFactory:createProducer it just creates a plain ProducerProperties so it doesn't get the ExtendendProducerProperties to get the default values and the NewDestinationBindingCallback didn't work as well.

For the setHeader(SolaceBinderHeaders.TARGET_DESTINATION_TYPE, "topic") Solace approach is nice.
In the same way as the destination we could check if the EntityType header is set and if so pass the value. E.g.:

    @Override
    public <U> Mono<Void> sendAsync(String destination, Message<U> message) {
        Assert.hasText(destination, "destination can't be null or empty");
        ServiceBusEntityType entityType = getEntityType(message);
        ServiceBusSenderAsyncClient senderAsyncClient =
                     this.producerFactory.createProducer(destination, entityType);
        ServiceBusMessage serviceBusMessage = messageConverter.fromMessage(message, ServiceBusMessage.class);
        return senderAsyncClient.sendMessage(serviceBusMessage);
    }
    
    private ServiceBusEntityType getEntityType(Message<?> message) {
        if (message.getHeaders().containsKey(ServiceBusHeaders.TARGET_DESTINATION_TYPE)) {
            return message.getHeaders().get(ServiceBusHeaders.TARGET_DESTINATION_TYPE, ServiceBusEntityType.class);
        }

        return this.defaultEntityType;
    }

What do you think?

Copy link
Member

@saragluna saragluna May 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aryelmr, thanks. I like the header approach, and it looks better. But I am still wondering what your dynamic destination code looks like, and why it goes to the DefaultServiceBusNamespaceProducerFactory:createProducer? Could you help provide some code, so I could take a look.

this.serviceBusTemplate.setDefaultEntityType(producerProperties.getExtension().getEntityType());
}
}
return this.serviceBusTemplate;
}
Expand Down