@AsyncListener is not overriding @KafkaListener and both payloads appear #729

Closed
fcmdeveloper1 opened this issue Apr 28, 2024 · 11 comments
Labels
bug Something isn't working staged for release

Comments

@fcmdeveloper1

Dear @timonback,
I tried this with Java and I get duplicate messages, even though I used @AsyncListener, which should override the payload taken from the method itself.

However, the topic name and the group name should still be taken from the @KafkaListener annotation, unless you provide attributes for them in the @AsyncListener annotation.

example:

@AsyncListener(
    operation = @AsyncOperation(payloadType = LearningEvent.class, description = "description", channelName = "lms"
))
@KafkaListener(topics = "lms", groupId = "test")
public void receiveExamplePayload2(
        @Payload List<ConsumerRecord<String, LearningEvent>> records, Acknowledgment acknowledgment) {
    log.info("Received new message in example-queue: {}", records.toString());
}

The generated AsyncAPI document has:

"channels": {
  "lms": {
    "messages": {
      "com.asyncapi.kafka.dtos.LearningEvent": {
        "$ref": "#/components/messages/com.asyncapi.kafka.dtos.LearningEvent"
      },
      "org.apache.kafka.clients.consumer.ConsumerRecord": {
        "$ref": "#/components/messages/org.apache.kafka.clients.consumer.ConsumerRecord"
      }
    }
  }
}

Attached is a sample project that can be imported with Maven to check the output, so the issue should be easy to reproduce.

asyncApi_github.zip

image

My Best Regards

@fcmdeveloper1 fcmdeveloper1 added the bug Something isn't working label Apr 28, 2024

Welcome to Springwolf. Thanks a lot for reporting your first issue. Please check out our contributors guide and feel free to join us on discord.

@timonback
Member

Hi @fcmdeveloper1,
Thank you for the report.
There is indeed a small bug in the extraction of payloads from generics.

Once the bugfix is released, you will still need to add this to your Springwolf configuration so that ConsumerRecords are extracted correctly:

springwolf.payload.extractable-classes.org.apache.kafka.clients.consumer.ConsumerRecord=1

More details: https://www.springwolf.dev/docs/configuration/documenting-messages/#unwrapping-the-payload

@fcmdeveloper1
Author

fcmdeveloper1 commented May 5, 2024

Dear @timonback,

Does this mean that, by adding the `@AsyncListener` annotation, only the LearningEvent payload will be shown and the method's own payload (the ConsumerRecord) will be ignored? Or do you mean I have to add the property `springwolf.payload.extractable-classes.org.apache.kafka.clients.consumer.ConsumerRecord=0` to actually ignore the ConsumerRecord payload?

My Best Regards

@timonback
Member

timonback commented May 5, 2024

Hi @fcmdeveloper1 ,

Springwolf does read the payload from the method signature. It will see the ConsumerRecord and believe that it is the actual payload. However, ConsumerRecord is a wrapper around the actual LearningEvent payload.
To tell Springwolf to unwrap/extract the payload, you will have to add the configuration property from my post above.

This is independent of AsyncListener or KafkaListener. Also note the value of the property. For more details, have a look at the website documentation. As this is a bit complex, we happily take PRs to improve the documentation on it.
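
To picture the unwrapping described above, here is a minimal, self-contained sketch. It is not Springwolf's code: `PayloadUnwrapSketch`, `unwrap` and the nested stand-in classes are hypothetical names, and it assumes that the property value is the index of the generic type argument that carries the payload. The `java.util.List=0` entry is likewise an assumption, added only so the sketch can get past the outer `List`.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

final class PayloadUnwrapSketch {
    // Stand-ins for the real classes, so the sketch compiles without Kafka on the classpath.
    static final class ConsumerRecord<K, V> {}
    static final class LearningEvent {}

    // Hypothetical listener method with the same parameter shape as in this issue.
    static void receive(List<ConsumerRecord<String, LearningEvent>> records) {}

    // Follows wrapper types inward. The map goes from wrapper class name to the
    // index of the generic argument that holds the payload (assumed semantics).
    static Type unwrap(Type type, Map<String, Integer> extractable) {
        Type current = type;
        while (current instanceof ParameterizedType) {
            ParameterizedType parameterized = (ParameterizedType) current;
            String rawName = ((Class<?>) parameterized.getRawType()).getName();
            Integer index = extractable.get(rawName);
            if (index == null) {
                break; // not configured as a wrapper: treat it as the payload
            }
            current = parameterized.getActualTypeArguments()[index];
        }
        return current;
    }

    // Reads the generic parameter type of receive(...) via reflection.
    static Type listenerParameterType() {
        try {
            Method method = PayloadUnwrapSketch.class.getDeclaredMethod("receive", List.class);
            return method.getGenericParameterTypes()[0];
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Type parameter = listenerParameterType();
        Map<String, Integer> withRecord =
                Map.of(List.class.getName(), 0, ConsumerRecord.class.getName(), 1);
        Map<String, Integer> withoutRecord = Map.of(List.class.getName(), 0);

        // With ConsumerRecord configured at index 1, the value type is reached.
        System.out.println(unwrap(parameter, withRecord).getTypeName());
        // Without it, unwrapping stops at the ConsumerRecord wrapper.
        System.out.println(unwrap(parameter, withoutRecord).getTypeName());
    }
}
```

In this sketch, index 0 for `ConsumerRecord` would select the key type (`String`) rather than ignore the wrapper, which is why the value matters.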

@fcmdeveloper1
Author

Dear @timonback,
I thought so because the documentation mentions that @AsyncListener has higher precedence than the method payload itself. That is why I raised this defect, and this is what I expected (to get the message from the @AsyncListener annotation instead of the ConsumerRecord).

image

However, when I tried the mentioned property, "springwolf.payload.extractable-classes.org.apache.kafka.clients.consumer.ConsumerRecord=1", the generated AsyncAPI document has two messages: one for the LearningEvent and another that is just a String.

"lms": {
  "messages": {
    "com.asyncapi.kafka.dtos.LearningEvent": {
      "$ref": "#/components/messages/com.asyncapi.kafka.dtos.LearningEvent"
    },
    "java.lang.String": {
      "$ref": "#/components/messages/java.lang.String"
    }
  }
}

Should I try a newer patch or version?

My Best Regards

@timonback
Member

Yes, try with the SNAPSHOT version as it contains the bugfix. More details are in the README.

What actually happens is that the AsyncListener generates the documentation with your event.
Additionally, the method is detected a second time because of the KafkaListener annotation. This second detection sees the ConsumerRecord. In the current release there is a bug. In the SNAPSHOT version (and with the configuration setting), it should detect the same event and thereby resolve the reported issue.

@fcmdeveloper1
Author

Dear @timonback,
I tested it and it seems to work fine if the method's payload uses the same generic type:

public void receiveExamplePayload2(
        @Payload(required = true) List<ConsumerRecord<String, LearningEvent>> records, Acknowledgment acknowledgment)

and the @AsyncListener declares the same type:

@AsyncListener(
    operation = @AsyncOperation(payloadType = LearningEvent.class, description = "description", channelName = "lms"
))

However, if the method signature contains a different type, for example ParentEventDto, both types appear, which is still not what I expected.

@AsyncListener(
    operation = @AsyncOperation(payloadType = LearningEvent.class, description = "description", channelName = "lms"
))
@KafkaListener(topics = "lms", groupId = "test")
public void receiveExamplePayload2(
        @Payload(required = true) List<ConsumerRecord<String, ParentEventDto>> records, Acknowledgment acknowledgment) {
    log.info("Received new message in example-queue: {}", records.toString());
}

My Best Regards

@timonback
Member

timonback commented May 10, 2024

Hi @fcmdeveloper1 ,

We assume that the payload in the method signature (ParentEventDto) matches the actual payload (LearningEvent).

What happens in Springwolf is that we use scanners to detect listeners (and publishers), extract the payload, and build the asyncapi.json file.

One scanner will find the ParentEventDto for the topic and the other scanner will find the LearningEvent. Those are merged, and therefore both payloads are detected.
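
The merge described above can be pictured with a small sketch. This is an illustration under stated assumptions, not Springwolf's implementation: `ScannerMergeSketch`, `Finding` and `merge` are hypothetical names, and payload types are reduced to plain strings.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ScannerMergeSketch {
    // One scanner result: a listener on this channel receives this payload type.
    static final class Finding {
        final String channel;
        final String payloadType;

        Finding(String channel, String payloadType) {
            this.channel = channel;
            this.payloadType = payloadType;
        }
    }

    // Merges the findings of two scanners into channel -> distinct payload types.
    static Map<String, Set<String>> merge(List<Finding> first, List<Finding> second) {
        Map<String, Set<String>> channels = new LinkedHashMap<>();
        for (List<Finding> findings : List.of(first, second)) {
            for (Finding finding : findings) {
                channels.computeIfAbsent(finding.channel, key -> new LinkedHashSet<>())
                        .add(finding.payloadType);
            }
        }
        return channels;
    }

    public static void main(String[] args) {
        List<Finding> fromAsyncListener = List.of(new Finding("lms", "LearningEvent"));

        // Both scanners agree on the payload: one message on the channel.
        System.out.println(merge(fromAsyncListener,
                List.of(new Finding("lms", "LearningEvent"))));
        // prints {lms=[LearningEvent]}

        // The method signature uses a different type: both messages appear.
        System.out.println(merge(fromAsyncListener,
                List.of(new Finding("lms", "ParentEventDto"))));
        // prints {lms=[LearningEvent, ParentEventDto]}
    }
}
```

Seen this way, the two payloads collapse into one only when both detections arrive at the same type.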

Please check out the FAQ: https://www.springwolf.dev/docs/faq#consumers-are-detected-multiple-times-with-different-payloads

One option is to disable the 'other' scanner.

(As this seems unclear, you are more than welcome to simply edit the documentation and suggest a better description)

@fcmdeveloper1
Author

Dear @timonback,

I get your point, but would it be possible to add a flag to @AsyncListener that specifies whether the @KafkaListener on the same method should be picked up, or some other way to ignore that @KafkaListener when needed, for example an annotation on the method itself?

My Best Regards

@timonback
Member

Hi, I don't see a way at this point, besides the options mentioned above.


The change is available in the latest release. 🎉

Thank you for the report/contribution and making Springwolf better!


2 participants