Schema registry module tagged release #453
Comments
The main reason is that I want to either use this myself extensively or have some community feedback on how the package feels before stabilizing it, to be sure that the API isn't kludgy / that it shouldn't change. I would also partially like to have plugins for Avro / Schema Registry available, so the entire thing is plug & play for producing & consuming. The plugins would take a lot of time, which I don't have (this applies more to the schema registry, actually). But if you'd like to try things out and let me know how it is -- I'd try to set aside some time to move it forward. I don't think there's anything more that I want to do with this package, but look & feel feedback would be good. I'm planning to revisit this PR someday, which uses the …
Well, let me give you some context first. My plan is to use the library in Conduit to provide encoder and decoder processors which use schemas from the schema registry to encode/decode a payload (similar to Kafka Connect key/value converters). I've started playing around with the schema registry client and the serde interface, and the experience is quite good, especially compared to existing alternatives out there.

I can give you feedback on what I found so far. I found some minor issues, like some comments not matching function names and the mode not being set (PR: #456). There are some bigger issues as well. In my use case I am dealing with dynamic data, meaning that I don't know the structure and schema in advance. The Go type will always be the same (a map), while the schema used for encoding will be determined based on the user input (subject+version) or extracted from the data (reflection). This means the … In the decoder the situation is better, since …

```go
type WireFormat interface {
	// DecodeID extracts the schema ID from the wire format
	// and returns it alongside the number of bytes used by the ID.
	// If the ID can't be extracted it returns ErrBadHeader.
	DecodeID([]byte) (uint32, int, error)

	// EncodeID encodes the ID in the wire format and appends it
	// to the provided byte slice.
	EncodeID([]byte, uint32) ([]byte, error)
}

type ConfluentWireFormat struct{ /* ... */ }
```

I realize that …
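For concreteness, the standard Confluent framing (a zero magic byte followed by a big-endian 4-byte schema ID) could be expressed against an interface like the one above. This is only a sketch of the idea, not the library's actual implementation; `ErrBadHeader` here is a local stand-in:

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// ErrBadHeader is returned when the wire format header cannot be parsed.
// (Local stand-in for the error named in the interface's docs.)
var ErrBadHeader = errors.New("bad wire format header")

// ConfluentWireFormat frames encoded data the Confluent way:
// one zero magic byte, then the schema ID as a big-endian uint32.
type ConfluentWireFormat struct{}

// DecodeID extracts the schema ID and reports that 5 header bytes were used.
func (ConfluentWireFormat) DecodeID(b []byte) (uint32, int, error) {
	if len(b) < 5 || b[0] != 0 {
		return 0, 0, ErrBadHeader
	}
	return binary.BigEndian.Uint32(b[1:5]), 5, nil
}

// EncodeID appends the magic byte and the big-endian ID to b.
func (ConfluentWireFormat) EncodeID(b []byte, id uint32) ([]byte, error) {
	b = append(b, 0)
	return binary.BigEndian.AppendUint32(b, id), nil
}

func main() {
	var wf ConfluentWireFormat
	framed, _ := wf.EncodeID(nil, 3)
	id, n, _ := wf.DecodeID(framed)
	fmt.Println(id, n) // 3 5
}
```

A non-Confluent envelope (say, ID in a record header instead of the value) would then just be another implementation of the same interface.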
For the Serde problem, would the following functions handle the problem?

```go
// DynEncodeFn allows Serde to encode dynamic values.
func DynEncodeFn(func(any) ([]byte, error)) SerdeOpt

// DynAppendEncodeFn allows Serde to encode a dynamic value to an existing slice.
// This can be more efficient than DynEncodeFn; this function is used if it exists.
func DynAppendEncodeFn(func([]byte, any) ([]byte, error)) SerdeOpt

// DynEncode encodes a dynamic type with the given id. If index is
// non-nil, it is encoded as a type index into the schema (this is relevant for protobuf).
func (s *Serde) DynEncode(id int, index []int, v any) ([]byte, error)

// DynAppendEncode appends a dynamic type with the given id to b. If index is
// non-nil, it is encoded as a type index into the schema (this is relevant for protobuf).
func (s *Serde) DynAppendEncode(b []byte, id int, index []int, v any) ([]byte, error)
```

I don't understand this bit fully -- are you thinking of writing your own encode/decode logic? This isn't necessarily a bad idea, fwiw -- encoding the ID in the value itself has always seemed weird (vs. using a record header). The interface would also need to handle indices, though (for the protobuf index aspect). +1 to explore this idea more. However, even with that interface, I'm not entirely sure how you're going to dynamically map to a schema to decode into. I was thinking that there could be another option to dynamically fetch schemas:

```go
// DynDecode opts Serde into dynamically fetching a decoder when an unrecognized
// schema ID or schema index is seen. If fn returns a decoder, the decoder is
// permanently cached. If fn returns an error, the Decode function simply returns
// that error to the user and no value is cached.
func DynDecode(fn func(id int, index []int) (func([]byte, any), error)) SerdeOpt
```

I'm not really a fan of how any of these options have the … What do you think of the above?
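The "fetch once, cache permanently" semantics described for `DynDecode` can be sketched as a tiny registry. All names here (`DecoderFn`, `dynRegistry`) are hypothetical illustrations, not franz-go API:

```go
package main

import "fmt"

// DecoderFn decodes raw bytes into v. Hypothetical stand-in for the
// decoder returned by the proposed DynDecode fetch function.
type DecoderFn func(b []byte, v any) error

// dynRegistry caches decoders per schema ID, fetching unknown IDs once.
type dynRegistry struct {
	fetch func(id int) (DecoderFn, error)
	cache map[int]DecoderFn
}

func newDynRegistry(fetch func(id int) (DecoderFn, error)) *dynRegistry {
	return &dynRegistry{fetch: fetch, cache: make(map[int]DecoderFn)}
}

// Decode looks up the decoder for id, fetching and permanently caching
// it on first sight; on fetch error, nothing is cached.
func (r *dynRegistry) Decode(id int, b []byte, v any) error {
	fn, ok := r.cache[id]
	if !ok {
		var err error
		if fn, err = r.fetch(id); err != nil {
			return err
		}
		r.cache[id] = fn
	}
	return fn(b, v)
}

func main() {
	fetches := 0
	r := newDynRegistry(func(id int) (DecoderFn, error) {
		fetches++ // counts round trips to the (imaginary) registry
		return func(b []byte, v any) error {
			*(v.(*string)) = string(b)
			return nil
		}, nil
	})
	var s string
	r.Decode(1, []byte("hello"), &s)
	r.Decode(1, []byte("again"), &s) // second call hits the cache
	fmt.Println(s, fetches)          // again 1
}
```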
Essentially, yes. This is how I see it: one part of Serde is the format of the encoding (Avro, Protobuf, etc.), and that part is already customisable. The other part is the wire format itself, which combines the encoded data from part one with some metadata (schema ID and index, possibly more in the future, as you pointed out) and creates a wire format (or envelope) which can be sent to Kafka or stored somewhere. I'm thinking about how we could make the second part customisable as well. As for your proposal, I'll dive deeper and get back to you next week. Thanks for spending time on this! 👍
I had a second look at the proposed … I don't quite understand why we would need separate options … Regarding the options, here's what I have in mind:

```go
type (
	// EncodeOpt is an option to configure Serde.Encode and related functions.
	EncodeOpt interface{ apply(*encodeOpts) }

	encodeOpt  struct{ fn func(*encodeOpts) }
	encodeOpts struct {
		id    int
		index []int
	}
)

func (o encodeOpt) apply(opts *encodeOpts) { o.fn(opts) }

// WithIDAndIndex forces Encode to use the specified schema ID and index.
func WithIDAndIndex(id int, index []int) EncodeOpt {
	return encodeOpt{func(opts *encodeOpts) {
		opts.id = id
		opts.index = index
	}}
}

func (s *Serde) AppendEncode(b []byte, v any, opts ...EncodeOpt) ([]byte, error) {
	// apply opts and decide whether to fetch tserde by type or by ID
}
```

I admit that having only one encode option makes it seem a bit overengineered. Or future-proof, depending on how you look at it 🙃 Let me know what you think, I can open a PR this week.
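To see how that single option would read at the call site, here is a runnable toy version of the "apply opts and decide" step. The `resolveOpts` helper and the "-1 means look up by Go type" convention are invented for illustration only:

```go
package main

import "fmt"

type (
	// EncodeOpt configures an encode call.
	EncodeOpt interface{ apply(*encodeOpts) }

	encodeOpt  struct{ fn func(*encodeOpts) }
	encodeOpts struct {
		id    int
		index []int
	}
)

func (o encodeOpt) apply(opts *encodeOpts) { o.fn(opts) }

// WithIDAndIndex forces the encode to use the given schema ID and index.
func WithIDAndIndex(id int, index []int) EncodeOpt {
	return encodeOpt{func(opts *encodeOpts) {
		opts.id = id
		opts.index = index
	}}
}

// resolveOpts is a toy stand-in for the "apply opts and decide" step:
// id -1 means "look up the schema by Go type", anything else is forced.
func resolveOpts(opts ...EncodeOpt) encodeOpts {
	resolved := encodeOpts{id: -1}
	for _, o := range opts {
		o.apply(&resolved)
	}
	return resolved
}

func main() {
	byType := resolveOpts()                          // no option: fall back to type lookup
	forced := resolveOpts(WithIDAndIndex(7, []int{0})) // option: use schema 7, index [0]
	fmt.Println(byType.id, forced.id, forced.index)  // -1 7 [0]
}
```

The nice property of this shape is that future metadata (e.g. more envelope fields) only needs a new `EncodeOpt`, not a new `AppendEncode` variant.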
As you can see, I took the liberty to act on this and opened a PR. Looking forward to hearing what you think about this approach.
So there's another PR for extracting the header logic. I think that's the last one; that should provide all I need for my use case. I know you're busy and I feel bad putting more work on your plate 😬 Let me know if I can help you in any way with the reviews (e.g. pair and/or talk through the changes).
Hello, we are also experiencing headaches with our renovator bot regarding the pseudo-version of … Let me know how I can contribute to help move this forward to a proper tagging scheme. I want to mention that all of your hard work on this package has been greatly appreciated. We find it to be a great, robust solution for our current Kafka implementation.
I think the main hangup is #506 now -- I don't foresee API bits changing, now that Params is "generic" / forward compatible (albeit a bit odd by stuffing into ctx, maybe).
This issue fell off my radar (again). I spent some time now and updated the PR according to the last comments.
@lovromazgon wdyt of the current API, anything remaining? Stabilize?
I added one small fix (#745), otherwise looks good to me 👍 Looking forward to having a stable release for the …
From my side it looks great! Thank you two for the time and effort <3
I'm wondering if there's a reason that there's no tagged release for the schema registry code in pkg/sr? Having proper tags would make it easier to update the dependency with dependabot.