Add support for Payload Kafka Key Types #563
Comments
Welcome to Springwolf. Thanks a lot for reporting your first issue. Please check out our contributors guide and feel free to join us on discord.
Hi @Sonic-Rage, I was looking into this ticket. Would it be fair to assume that, given something like
@KafkaHandler
public void receiveExamplePayload(
@Headers MessageHeaders headers,
@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Payload(required = false) ExamplePayloadDto payload) {
log.info("Received new message in {}: {}", TOPIC, payload.toString());
}
the scanner should generate
"message": {
"schemaFormat": "application/vnd.oai.openapi+json;version=3.0.0",
"name": "io.github.stavshamir.springwolf.example.kafka.dtos.AnotherPayloadDto",
"title": "AnotherPayloadDto",
"payload": {
"$ref": "#/components/schemas/io.github.stavshamir.springwolf.example.kafka.dtos.AnotherPayloadDto"
},
"headers": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"kafka_receivedMessageKey": {
"type": "string"
},
"timestamp": {
"type": "string"
}
}
},
"bindings": {
"kafka": {
"bindingVersion": "0.4.0"
}
}
}
We are working hard on the next release, but we can check how to improve this case. Can you confirm that this would be your expectation?
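For illustration only, the mapping asked about here (from `@Header` parameters to a `headers` schema) might be sketched roughly as follows. This is not Springwolf's scanner code: `HeaderSchemaSketch`, `ExampleListener`, and the local `@Header` annotation are hypothetical stand-ins so the sketch runs without Spring, and typing every header as a string is an assumption taken from the JSON in this comment.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderSchemaSketch {

    // Hypothetical stand-in for Spring's @Header, so the sketch has no Spring dependency.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface Header {
        String value();
    }

    // Hypothetical listener; the header name is copied from the JSON in this comment.
    static class ExampleListener {
        public void receive(@Header("kafka_receivedMessageKey") String key, String payload) {
        }
    }

    // Builds {"type": "object", "properties": {<header name>: {"type": "string"}}}.
    static Map<String, Object> headersSchema(Method method) {
        Map<String, Object> properties = new LinkedHashMap<>();
        for (Parameter parameter : method.getParameters()) {
            Header header = parameter.getAnnotation(Header.class);
            if (header != null) {
                // Assumption: every header is documented as a string.
                properties.put(header.value(), Map.of("type", "string"));
            }
        }
        Map<String, Object> schema = new LinkedHashMap<>();
        schema.put("type", "object");
        schema.put("properties", properties);
        return schema;
    }

    public static void main(String[] args) throws NoSuchMethodException {
        Method method = ExampleListener.class.getMethod("receive", String.class, String.class);
        System.out.println(headersSchema(method));
    }
}
```

Parameters without a `@Header` annotation (the payload here) are simply skipped, so only explicitly named headers end up in the schema.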
Fixes issue springwolf#563
Now, when a method is annotated with the `@Header` or `@Headers` annotation, the payload marked with `@Payload` will also include the defined headers.
Known limitations:
* If a `@Payload` is indicated in different places, the AsyncAPI specification will include any correlated `@Header`
* When using `@Headers` we cannot provide further information
@ctasada Sorry it took me a little while to go through the AsyncAPI spec. The key for Kafka doesn't appear to be well documented from what I can tell, so hopefully I'm not making too many assumptions. The README at https://github.com/asyncapi/bindings/blob/master/kafka/README.md#message-binding-object shows the key being configured as a message binding. To keep the key separate from the actual Kafka headers, I would expect the key to go into the message bindings. Instead of manually defining KafkaAsyncOperationBinding with a key definition, if @Header(KafkaHeaders.RECEIVED_KEY) Object key is found, the scanner could add it to the message bindings. Here is a sample I tossed together using studio.asyncapi.com:
{
"asyncapi": "3.0.0",
"info": {
"title": "SpringWolf Kafka Object Key Payload",
"version": "1.0.0",
"contact": {
"name": "foo",
"url": "https://foo.com",
"email": "foo@gmail.com"
},
"tags": [],
"x-generator": "springwolf"
},
"defaultContentType": "application/json",
"servers": {
"kafka": {
"host": "localhost:9092",
"protocol": "kafka"
}
},
"channels": {
"topic1": {
"address": "Sample topic",
"messages": {
"topic1.message": {
"name": "Topic name",
"title": "Topic title",
"description": "Sample json for Kafka Key and Payload Objects Spring wolf",
"$ref": "#/components/messages/object1",
"headers": {
"$ref": "#/components/schemas/HeadersNotDocumented"
},
"bindings": {},
"payload": {
"schemaFormat": "application/vnd.oai.openapi+json;version=3.0.0"
}
}
}
}
},
"operations": {
"topic1": {
"action": "send",
"channel": {
"$ref": "#/channels/topic1"
},
"description": "Sample topic with Kafka Key and payload as objects",
"bindings": {},
"messages": [
{
"$ref": "#/channels/topic1/messages/topic1.message"
}
]
}
},
"components": {
"messages": {
"object1": {
"name": "Topic 1 message",
"title": "Topic 1 message",
"headers": {
"$ref": "#/components/schemas/MessageHeaders"
},
"payload": {
"$ref": "#/components/schemas/MessagePayload"
},
"bindings": {
"kafka": {
"key": {
"$ref": "#/components/schemas/MessageKey"
}
}
}
}
},
"schemas": {
"MessageHeaders": {
"type": "object",
"properties": {
"header1": {
"type": "string"
},
"header2": {
"type": "string"
}
}
},
"MessagePayload": {
"type": "object",
"properties": {
"intField": {
"type": "integer",
"description": "some integer"
},
"stringField": {
"type": "string",
"description": "some string field"
}
},
"description": "Payload object",
"example": {
"intField": 500,
"stringField": "foo"
}
},
"MessageKey": {
"type": "object",
"properties": {
"keyField1": {
"type": "string",
"description": "Unique id that ensures equal distribution across partitions",
"example": "ea251adf-a778-4491-becb-f3ca9fd886a1"
},
"keyField2": {
"type": "string",
"description": "key field 2"
}
},
"description": "The key for the event",
"example": {
"keyField1": "ea251adf-a778-4491-becb-f3ca9fd886a1",
"keyField2": "foo"
}
}
}
}
}
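As a rough sketch of the behaviour requested here (detect the key parameter, then reference its schema from the Kafka message binding), something like the following could work. Everything in it is hypothetical: the local `@Header` annotation and the `RECEIVED_KEY` constant only stand in for Spring's `@Header` and `KafkaHeaders.RECEIVED_KEY`, and naming the schema after the class's simple name is an assumption, not how Springwolf necessarily names schemas.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Map;
import java.util.Optional;

public class KafkaKeyBindingSketch {

    // Hypothetical stand-in for Spring's @Header.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface Header {
        String value();
    }

    // Placeholder for KafkaHeaders.RECEIVED_KEY; the real constant's value is not assumed here.
    static final String RECEIVED_KEY = "receivedKey";

    static class KeyObject {
    }

    static class PayloadObject {
    }

    // Hypothetical listener shaped like the one in the issue description.
    static class ExampleListener {
        public void consumeMessage(@Header(RECEIVED_KEY) KeyObject key, PayloadObject payload) {
        }
    }

    // Returns the declared type of the parameter marked as the record key, if there is one.
    static Optional<Class<?>> findKeyType(Method method) {
        for (Parameter parameter : method.getParameters()) {
            Header header = parameter.getAnnotation(Header.class);
            if (header != null && RECEIVED_KEY.equals(header.value())) {
                return Optional.of(parameter.getType());
            }
        }
        return Optional.empty();
    }

    // Builds {"kafka": {"key": {"$ref": "#/components/schemas/<SimpleName>"}}}.
    static Map<String, Object> messageBindings(Class<?> keyType) {
        // Assumption: the key schema is registered under the class's simple name.
        Map<String, Object> keySchema =
                Map.of("$ref", "#/components/schemas/" + keyType.getSimpleName());
        return Map.of("kafka", Map.of("key", keySchema));
    }

    public static void main(String[] args) throws NoSuchMethodException {
        Method method = ExampleListener.class.getMethod(
                "consumeMessage", KeyObject.class, PayloadObject.class);
        findKeyType(method)
                .map(KafkaKeyBindingSketch::messageBindings)
                .ifPresent(System.out::println);
    }
}
```

Keeping the key out of the `headers` schema and in `bindings.kafka.key` matches the layout of the sample document, where `MessageKey` is referenced only from the message binding.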
We utilize an Object as a Key to send events with Kafka. It would be great if the scanner could recognize objects used for Kafka Keys to add the schemas automatically to message binding.
Here is a sample of a listener that we use with Spring Kafka
@KafkaHandler
public void consumeMessage(
@Headers MessageHeaders headers,
@Header(KafkaHeaders.RECEIVED_KEY) KeyObject key,
@Payload(required = false) PayloadObject payload) {