Schema registry module tagged release #453

Open
lovromazgon opened this issue May 17, 2023 · 14 comments
Labels
enhancement New feature or request

Comments

@lovromazgon
Contributor

I'm wondering if there's a reason that there's no tagged release for the schema registry code in pkg/sr? Having proper tags would make it easier to update the dependency with dependabot.

@twmb
Owner

twmb commented May 18, 2023

The main reason is that I want to either use this myself extensively or have some community feedback on how the package feels before stabilizing it, to be sure that the API isn't kludgy / that it shouldn't change. I partially would like to have plugins for Avro / Schema Registry available as well, so the entire thing is just plug & play for producing & consuming. The plugins would take a lot of time / I don't have time (this is more for the schema registry actually). But, if you'd like to try things out and let me know how it is -- I'd try to set aside some time to move it forward. I don't think there's anything more that I want to do with this package, but look&feel feedback would be good. I'm planning to revisit this PR someday which uses the sr package and that might be a forcing function for me to play with the look&feel more. I don't anticipate changes at this point, though, since writing that PR originally is what allowed me to play with the sr package in the first place. The main thing to add to that PR is the producing & consuming components.

@lovromazgon
Contributor Author

lovromazgon commented May 22, 2023

Well, let me give you some context first. My plan is to use the library in Conduit to provide encoder and decoder processors which use schemas from the schema registry to encode/decode a payload (similar to Kafka Connect key/value converters). I've started playing around with the schema registry client and the serde interface and the experience is quite good, especially compared to existing alternatives out there. I can give you feedback on what I found so far.

I found some minor issues like some comments not matching function names and the mode not being set (PR: #456).

There are some bigger issues. In my use case I am dealing with dynamic data, meaning that I don't know the structure and schema in advance. The Go type will always be the same (map), the schema used for encoding will be determined based on the user input (subject+version) or extracted from the data (reflection). This means the Serde type will be almost useless in the encoder since it assumes that tserde is determined based on the type that's being encoded. I'm not sure if there's a way around that other than writing the logic on my own, but I'd be happy to hear what you think.

In the decoder the situation is better, since Serde will determine tserde based on the ID extracted from the wire format. It will choose the correct DecodeFn used for decoding, therefore the correct schema will be chosen for decoding the value. The problem is if the schema is not registered in Serde yet. In that case, I'd like to download the schema and try decoding again. I could check for the ErrNotRegistered error, but then I'd have to extract the schema ID again and Serde already contains the logic for that. To solve this I'd either need ErrNotRegistered to contain a field ID that tells me which ID is not registered, or I'd need to be able to execute the wire format logic separately. The first approach is obviously simpler, while the second one could allow the user to provide their own wire format logic, making the interface a bit more flexible. The Confluent wire format could be provided and used as a default. Something along the lines of:

type WireFormat interface {
	// DecodeID extracts the schema ID from the wire format
	// and returns it alongside the number of bytes used by the ID.
	// If the ID can't be extracted it returns ErrBadHeader.
	DecodeID([]byte) (uint32, int, error)

	// EncodeID encodes the ID in the wire format and appends it
	// to the provided byte slice.
	EncodeID([]byte, uint32) ([]byte, error)
}

type ConfluentWireFormat struct{ /* ... */ }

I realize that WireFormat might not be the right naming here, since it only takes care of the ID and not the whole format (e.g. there might be a footer). It also assumes that the wire format always encodes the ID in front of the payload. But it would at least solve my immediate issue of having to duplicate the logic around schema IDs.
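To make the idea concrete, here is a minimal sketch of what a default implementation of the proposed interface could look like. It assumes the usual Confluent framing of a single zero magic byte followed by a 4-byte big-endian schema ID; `ConfluentWireFormat` and `ErrBadHeader` are declared locally for illustration and are not taken from the sr package.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// ErrBadHeader is redeclared here only to keep the sketch self-contained.
var ErrBadHeader = errors.New("bad header")

// ConfluentWireFormat is a hypothetical implementation of the proposed
// WireFormat interface: magic byte 0, then a 4-byte big-endian schema ID.
type ConfluentWireFormat struct{}

// DecodeID returns the schema ID and the number of header bytes consumed.
func (ConfluentWireFormat) DecodeID(b []byte) (uint32, int, error) {
	if len(b) < 5 || b[0] != 0 {
		return 0, 0, ErrBadHeader
	}
	return binary.BigEndian.Uint32(b[1:5]), 5, nil
}

// EncodeID appends the magic byte and the schema ID to b.
func (ConfluentWireFormat) EncodeID(b []byte, id uint32) ([]byte, error) {
	b = append(b, 0)
	return binary.BigEndian.AppendUint32(b, id), nil
}

func main() {
	var wf ConfluentWireFormat
	msg, _ := wf.EncodeID(nil, 42)
	msg = append(msg, "payload"...)

	id, n, err := wf.DecodeID(msg)
	fmt.Println(id, n, string(msg[n:]), err) // 42 5 payload <nil>
}
```

With something like this, a caller that gets a "not registered" error can call `DecodeID` itself to learn which schema to download, without duplicating the header parsing.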

@twmb
Owner

twmb commented May 26, 2023

For the Serde problem, would the following function handle the problem?

// DynEncodeFn allows Serde to encode dynamic values.
func DynEncodeFn(fn func(any) ([]byte, error)) SerdeOpt

// DynAppendEncodeFn allows Serde to encode a dynamic value to an existing slice.
// This can be more efficient than DynEncodeFn; this function is used if it exists.
func DynAppendEncodeFn(fn func([]byte, any) ([]byte, error)) SerdeOpt

// DynEncode encodes a dynamic type with the given id. If index is
// non-nil, it is encoded as a type index into the schema (this is relevant for protobuf).
func (s *Serde) DynEncode(id int, index []int, v any) ([]byte, error)

// DynAppendEncode appends a dynamic type with the given id to b. If index is
// non-nil, it is encoded as a type index into the schema (this is relevant for protobuf).
func (s *Serde) DynAppendEncode(b []byte, id int, index []int, v any) ([]byte, error)

> The first approach is obviously simpler, while the second one could allow the user to provide their own wire format logic, making the interface a bit more flexible

I don't understand this bit fully -- are you thinking of writing your own encode / decode logic? This isn't necessarily a bad idea fwiw -- encoding the ID in the value itself has always seemed weird (vs. using a record header). The interface would also need to handle indices though (for the protobuf index aspect). +1 to explore this idea more.
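For context on the index aspect, a rough sketch of how a protobuf message index could be appended after the schema ID. It assumes the Confluent convention of zigzag varints (the array length first, then each index), with the common `[0]` case shortened to a single zero byte; `appendIndex` is a hypothetical helper name, not an existing function.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendIndex is a hypothetical helper that appends a protobuf message
// index to b. Assumption: indices are written as zigzag varints, prefixed
// by the array length, and the index [0] collapses to one zero byte.
func appendIndex(b []byte, index []int) []byte {
	if len(index) == 1 && index[0] == 0 {
		return append(b, 0)
	}
	// binary.AppendVarint uses zigzag encoding for signed integers.
	b = binary.AppendVarint(b, int64(len(index)))
	for _, i := range index {
		b = binary.AppendVarint(b, int64(i))
	}
	return b
}

func main() {
	fmt.Println(appendIndex(nil, []int{0}))    // [0]
	fmt.Println(appendIndex(nil, []int{1, 0})) // [4 2 0]
}
```

Any pluggable header interface would have to produce and parse this variable-length part too, which is why an ID-only `DecodeID`/`EncodeID` pair is not quite enough for protobuf.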

However, even with that interface, I'm not entirely sure how you're going to dynamically map to a schema to decode into. I was thinking that there could be another option to dynamically fetch schemas:

// DynDecode opts Serde into dynamically fetching a decoder when an unrecognized
// schema ID or schema index is seen. If fn returns a decoder, the decoder is
// permanently cached. If fn returns an error, the Decode function simply returns
// that error to the user and no value is cached.
func DynDecode(fn func(id int, index []int) (func([]byte, any) error, error)) SerdeOpt

I'm not really a fan of how any of these options have the []int index field, but I'm not sure how to avoid it. I also hope Confluent doesn't decide to add yet another means in the future of setting further options that need further parsing / selecting. One way to future proof this would be to use a type Header struct { ID int; Index []int } type and use that in the options, so that new fields could be added.

What do you think of the above?

@lovromazgon
Contributor Author

> I don't understand this bit fully -- are you thinking of writing your own encode / decode logic?

Essentially, yes. This is how I see it: one part of Serde is the format of the encoding (Avro, Protobuf etc.), and that part is already customisable. The other part is the wire format itself, which combines the encoded data from part one with some metadata (schema ID and index, possibly more in the future, as you pointed out) and creates a wire format (or envelope), which can be sent to Kafka or stored somewhere. I'm thinking about how we could make the second part customisable as well.

As for your proposal, I'll dive deeper and get back to you next week. Thanks for spending time on this! 👍

@lovromazgon
Contributor Author

I had a second look at the proposed DynEncode functions. I played around with them and they actually address my issue and allow me to use the type with dynamic data. I'm wondering whether we can simplify some things though.

I don't quite understand why we would need separate options DynEncodeFn and DynAppendEncodeFn in addition to EncodeFn and AppendEncodeFn? IIUC they mostly serve the same purpose - defining the encoding logic. I understand that reusing the same option would force you to use the same function when encoding dynamic data vs. when encoding a Go struct, while they might require different encoding functions. To me it seems unlikely that someone would want to use both approaches simultaneously though. I'd expect people to use either one approach or the other, especially on the same schema.

Regarding Serde.DynEncode and Serde.DynAppendEncode I'm on board! Still, let me present an alternative for you to consider. I'm wondering whether we could collapse both of them into Serde.Encode and Serde.AppendEncode by letting them accept options (as is already done in Serde.Register). Something like this:

type (
	// EncodeOpt is an option to configure Serde.Encode and related functions.
	EncodeOpt  interface{ apply(*encodeOpts) }
	encodeOpt  struct{ fn func(*encodeOpts) }
	encodeOpts struct {
		id    int
		index []int
	}
)

func (o encodeOpt) apply(opts *encodeOpts) { o.fn(opts) }

// WithIDAndIndex forces Encode to use the specified schema ID and index.
func WithIDAndIndex(id int, index []int) EncodeOpt {
	return encodeOpt{func(opts *encodeOpts) {
		opts.id = id
		opts.index = index
	}}
}

func (s *Serde) AppendEncode(b []byte, v any, opts ...EncodeOpt) ([]byte, error) {
	// apply opts and decide to fetch tserde using type or ID
}

I admit that having only one encode option makes it seem a bit overengineered. Or future proof, depending on how you look at it 🙃

Let me know what you think, I can open a PR this week.

@lovromazgon
Contributor Author

As you see I took the liberty to act on this and opened a PR, looking forward to hearing what you think about this approach.

@twmb
Owner

twmb commented Jun 1, 2023

Sorry, I wasn't sure what you meant at first (I didn't open a PR, only merged other open PRs and tagged kotel / franz-go); I missed the "I". Yep, will look -- sorry I'm a bit behind, very busy at the moment.

@lovromazgon
Contributor Author

So there's another PR for extracting the header logic. I think that's the last one, that should provide all I need for my use case.

I know you're busy and I feel bad putting more work on your plate 😬 Let me know if I can help you in any way with the reviews (e.g. pair and/or talk through the changes).

@twmb twmb added the enhancement New feature or request label Jun 27, 2023
@law-dwg

law-dwg commented Feb 20, 2024

Hello, we are also experiencing headaches with our renovatorbot regarding the pseudo-version of pkg/sr. I'm hesitant to completely exclude it from our renovator config because I wouldn't want to miss any potentially important updates.

Let me know how I can contribute to help move it forward to a proper tagging scheme.

I want to mention that all of your hard work on this package has been greatly appreciated. We find it to be a great, robust solution for our current Kafka implementation.

@twmb
Owner

twmb commented Feb 22, 2024

I think the main hangup is #506 now -- I don't foresee API bits changing now that Params is "generic" / forward compatible (albeit a bit odd by stuffing into ctx, maybe)

@lovromazgon
Contributor Author

This issue fell off my radar (again). I spent some time now and updated the PR according to the last comments.

@twmb twmb added the has pr label Apr 3, 2024
@twmb
Owner

twmb commented May 26, 2024

@lovromazgon wdyt of the current API, anything remaining? Stabilize?
IMO it's basically stable, but I'd like a second opinion (or third if you also want to check, @law-dwg :) )

@twmb twmb removed the has pr label May 26, 2024
@lovromazgon
Contributor Author

I added one small fix (#745), otherwise looks good to me 👍 Looking forward to having a stable release for the sr package!

@law-dwg

law-dwg commented May 27, 2024

From my side looks great! Thanks you two for the time and effort <3
