Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Payload Kafka Key Types #563

Open
Sonic-Rage opened this issue Jan 25, 2024 · 3 comments
Open

Add support for Payload Kafka Key Types #563

Sonic-Rage opened this issue Jan 25, 2024 · 3 comments
Labels
enhancement New feature or request

Comments

@Sonic-Rage
Copy link

We utilize an Object as a Key to send events with Kafka. It would be great if the scanner could recognize objects used for Kafka Keys to add the schemas automatically to message binding.

Here is a sample of a listener that we use with Spring Kafka

@KafkaHandler
public void consumeMessage(
@headers MessageHeaders headers,
@Header(KafkaHeaders.RECEIVED_KEY) KeyObject key,
@payload(required = false) PayloadObject payload) {

@Sonic-Rage Sonic-Rage added the enhancement New feature or request label Jan 25, 2024
Copy link

Welcome to Springwolf. Thanks a lot for reporting your first issue. Please check out our contributors guide and feel free to join us on discord.

@ctasada
Copy link
Collaborator

ctasada commented Jan 28, 2024

Hi @Sonic-Rage I was looking into this ticket. Would be fair to assume that, given something like

    @KafkaHandler
    public void receiveExamplePayload(
            @Headers MessageHeaders headers,
            @Header(KafkaHeaders.RECEIVED_KEY) String key,
            @Payload(required = false) ExamplePayloadDto payload) {
        log.info("Received new message in {}: {}", TOPIC, payload.toString());
    }

Should generate

        "message": {
          "schemaFormat": "application/vnd.oai.openapi+json;version=3.0.0",
          "name": "io.github.stavshamir.springwolf.example.kafka.dtos.AnotherPayloadDto",
          "title": "AnotherPayloadDto",
          "payload": {
            "$ref": "#/components/schemas/io.github.stavshamir.springwolf.example.kafka.dtos.AnotherPayloadDto"
          },
          "headers": {
            "type": "object",
            "properties": {
                "id": {
                   "type": "string"
                 },
                "kafka_receivedMessageKey": {
                   "type": "string"
                 },
                "timestamp": {
                   "type": "string"
                 },
          },
          "bindings": {
            "kafka": {
              "bindingVersion": "0.4.0"
            }
          }
        }

We are heavily working on the next release, but we can check how to improve this case. Can you confirm that would be your expectation?

@ctasada ctasada self-assigned this Jan 30, 2024
ctasada added a commit to ctasada/springwolf-core that referenced this issue Jan 31, 2024
Fixes issue springwolf#563

Now, when a method is annotated with the `@Header` or `@Headers` annotation, the payload marked with `@Payload` will also include the defined headers.

Known limitations:
* If a @payload is indicated in different places, the AsyncAPI specification will include any correlated @Header
* When using `@Headers` we cannot provide further information
@Sonic-Rage
Copy link
Author

Sonic-Rage commented Jan 31, 2024

@ctasada Sorry it took me a little bit to go through the Async spec some. The key for Kafka doesn't appear to be well documented from what I can tell so hopefully I'm not making too many assumptions.
My use case involves all three( Kafka headers, key and payload) Headers for traceability, and objects in Key for filtering and partioning Kafka events as well as payload objects.

The Readme here shows the key as https://github.com/asyncapi/bindings/blob/master/kafka/README.md#message-binding-object gets configured as a message binding.

To keep the key separated from the actual Kafka headers I would expect the Key to go into message bindings. Instead of manually defining KafkaAsyncOperationBinding with key definition if @Header(KafkaHeaders.RECEIVED_KEY) Object key is found then add to message bindings.

Here is a sample I tossed together using studio.asyncapi.com

{
    "asyncapi": "3.0.0",
    "info": {
        "title": "SpringWolf Kafka Object Key Payload",
        "version": "1.0.0",
        "contact": {
            "name": "foo",
            "url": "https://foo.com",
            "email": "foo@gmail.com"
        },
        "tags": [],
        "x-generator": "springwolf"
    },
    "defaultContentType": "application/json",
    "servers": {
        "kafka": {
            "host": "localhost:9092",
            "protocol": "kafka"
        }
    },
    "channels": {
        "topic1": {
            "address": "Sample topic",
            "messages": {
                "topic1.message": {
                    "name": "Topic name",
                    "title": "Topic title",
                    "description": "Sample json for Kafka Key and Payload Objects Spring wolf",
                    "$ref": "#/components/messages/object1",
                    "headers": {
                        "$ref": "#/components/schemas/HeadersNotDocumented"
                    },
                    "bindings": {},
                    "payload": {
                        "schemaFormat": "application/vnd.oai.openapi+json;version=3.0.0"
                    }
                }
            }
        }
    },
    "operations": {
        "topic1": {
            "action": "send",
            "channel": {
                "$ref": "#/channels/topic1"
            },
            "description": "Sample topic with Kafka Key and payload as objects",
            "bindings": {},
            "messages": [
                {
                    "$ref": "#/channels/topic1/messages/topic1.message"
                }
            ]
        }
    },
    "components": {
        "messages": {
            "object1": {
                "name": "Topic 1 message",
                "title": "Topic 1 message",
                "headers": {
                    "$ref": "#/components/schemas/MessageHeaders"
                },
                "payload": {
                    "$ref": "#/components/schemas/MessagePayload"
                },
                "bindings": {
                    "kafka": {
                        "key": {
                            "$ref": "#/components/schemas/MessageKey"
                        }
                    }
                }
            }
        },
        "schemas": {
            "MessageHeaders": {
                "type": "object",
                "properties": {
                    "header1": {
                        "type": "string"
                    },
                    "header2": {
                        "type": "string"
                    }
                }
            },
            "MessagePayload": {
                "type": "object",
                "properties": {
                    "intField": {
                        "type": "integer",
                        "description": "some integer"
                    },
                    "stringField": {
                        "type": "string",
                        "description": "some string field"
                    }
                },
                "description": "Payload object",
                "example": {
                    "intField": 500,
                    "stringField": "foo"
                }
            },
            "MessageKey": {
                "type": "object",
                "properties": {
                    "keyField1": {
                        "type": "string",
                        "description": "Unique id that ensures equal distribution across partitions",
                        "example": "ea251adf-a778-4491-becb-f3ca9fd886a1"
                    },
                    "keyField2": {
                        "type": "string",
                        "description": "key field 2"
                    }
                },
                "description": "The key for the event",
                "example": {
                    "keyfield1": "ea251adf-a778-4491-becb-f3ca9fd886a1",
                    "keyfield2": "foo"
                }
            }
        }
    }
}

ctasada added a commit to ctasada/springwolf-core that referenced this issue Feb 2, 2024
Fixes issue springwolf#563

Now, when a method is annotated with the `@Header` or `@Headers` annotation, the payload marked with `@Payload` will also include the defined headers.

Known limitations:
* If a @payload is indicated in different places, the AsyncAPI specification will include any correlated @Header
* When using `@Headers` we cannot provide further information
ctasada added a commit to ctasada/springwolf-core that referenced this issue Feb 2, 2024
Fixes issue springwolf#563

* When using `@Headers` we cannot provide further information
ctasada added a commit to ctasada/springwolf-core that referenced this issue Feb 2, 2024
Fixes issue springwolf#563

Now, when a method is annotated with the @Header or @headers annotation, the payload marked with @payload will also include the defined headers.

Known limitations:

If a @payload is indicated in different places, the AsyncAPI specification will include any correlated @Header
When using @headers we cannot provide further information
ctasada added a commit to ctasada/springwolf-core that referenced this issue Mar 2, 2024
Fixes issue springwolf#563

Now, when a method is annotated with the @Header or @headers annotation, the payload marked with @payload will also include the defined headers.

Known limitations:

If a @payload is indicated in different places, the AsyncAPI specification will include any correlated @Header
When using @headers we cannot provide further information
ctasada added a commit to ctasada/springwolf-core that referenced this issue Mar 30, 2024
Fixes issue springwolf#563

Now, when a method is annotated with the @Header or @headers annotation, the payload marked with @payload will also include the defined headers.

Known limitations:

If a @payload is indicated in different places, the AsyncAPI specification will include any correlated @Header
When using @headers we cannot provide further information
@ctasada ctasada removed their assignment Mar 31, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants