
Expected behavior when producing to non-existent partition #711

Closed
alistairking opened this issue Apr 23, 2024 · 6 comments

Comments

@alistairking
Contributor

Hey again,

We recently had a bug in some code that manually assigned partitions to messages. It was configured with an incorrect number of partitions, so it was producing messages to partitions that did not exist (the topic had 100 partitions, but the partitioning code was assigning messages across 1000).

Naively, I would have expected the produce to fail, or at worst to log an error, but from what I could tell the messages were silently dropped (whether by franz-go or by Kafka, I don't know).

Now, obviously the solution is to use the correct number of partitions (and asking Kafka for this number is probably better than statically configuring it), but I'm curious whether this is the expected behavior when one produces to a partition that does not exist.
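(As an aside on asking Kafka for the partition count instead of hard-coding it: below is a rough sketch using the kadm admin package that ships alongside kgo. The broker address and the exact field names are my assumptions from reading kadm, not something confirmed in this thread.)

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // assumed broker address
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	adm := kadm.NewClient(cl)
	topics, err := adm.ListTopics(context.Background(), "foo")
	if err != nil {
		log.Fatal(err)
	}
	// TopicDetails is keyed by topic name; Partitions holds per-partition detail.
	detail, ok := topics["foo"]
	if !ok || detail.Err != nil {
		log.Fatalf("topic lookup failed: %v", detail.Err)
	}
	fmt.Println("partitions:", len(detail.Partitions))
}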

@twmb
Owner

twmb commented Apr 24, 2024

It sounds like you were using manual partitioning? The client forever retries sending to partitions that don't exist. Usually it just outright won't happen that you try to produce to a partition that does not exist: the number of partitions in a topic only goes up (stonks), so if a partition can be produced to via metadata discovery, the only way for it not to be producible in the future is if you delete the topic and recreate it -- or if you have a bug and manually try producing to a partition that doesn't exist.

There is an option, UnknownTopicRetries, that causes produces to fail when the entire topic does not exist. This could probably be extended -- though not super easily -- to partitions that keep repeatedly receiving this error. If you want to bite this off, feel free (I can try helping on Discord). I may have some time to address this once I finally finish the implementation for the new KIP (which, to be honest, I'm not prioritizing that highly).
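(For readers following along, a minimal sketch of wiring up UnknownTopicRetries, not something taken from this thread; the broker address and retry count are assumptions for illustration.)

package main

import (
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // assumed broker address
		// After this many consecutive unknown-topic failures, buffered
		// records for the topic are failed instead of retried forever.
		kgo.UnknownTopicRetries(3),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}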

@alistairking
Contributor Author

Right, that's exactly what happened in our case -- we had a bug in a manual partitioner so it was producing to partitions that didn't (and wouldn't) exist.

Now that I've fixed the bug I'm not super worried about it, but it was a bit surprising that there was no logging whatsoever. Would it be easier (and would it make sense) to emit a warning log after some number of retries producing to a non-existent partition?

@twmb
Owner

twmb commented Apr 24, 2024

There's definitely a log, but only at the debug level.
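(To actually see that log, the client needs a logger configured at debug level. A minimal sketch, assuming a local broker:)

package main

import (
	"log"
	"os"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // assumed broker address
		// Route client logs, including debug-level produce failures, to stderr.
		kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, nil)),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}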

Looking at the code though,

franz-go/pkg/kgo/sink.go

Lines 1298 to 1305 in 6a58760

func (recBuf *recBuf) checkUnknownFailLimit(err error) bool {
	if errors.Is(err, kerr.UnknownTopicOrPartition) {
		recBuf.unknownFailures++
	} else {
		recBuf.unknownFailures = 0
	}
	return recBuf.cl.cfg.maxUnknownFailures >= 0 && recBuf.unknownFailures > recBuf.cl.cfg.maxUnknownFailures
}

The unknown check actually is at the partition level (recBuf == per-partition buffer for records).

I think maybe something else is up -- IIRC, the client never even attempts to produce the records, because it knows the partition does not exist. In fact, they should be failed outright because of this chunk of code:

if pick < 0 || pick >= len(mapping) {
	cl.producer.promiseRecord(pr, fmt.Errorf("invalid record partitioning choice of %d from %d available", pick, len(mapping)))
	return
}

Something else is up here. I'll see about reproducing this locally with kfake...

@pikrzysztof

pikrzysztof commented Apr 24, 2024

The presenters in this talk, https://www.youtube.com/watch?v=paVdXL5vDzg (from 17:45 to 20:50), demonstrate that the Java client will synchronously block until the partition gets created. No log message is produced.

@twmb twmb added the TODO label May 23, 2024
@twmb
Owner

twmb commented May 26, 2024

Sorry for the delay on getting back to this, but I can't reproduce the initial problem when running the following locally against a topic ("foo") with 10 partitions:

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, _ := kgo.NewClient(
		kgo.DefaultProduceTopic("foo"),
		kgo.RecordPartitioner(kgo.ManualPartitioner()),
	)

	r := &kgo.Record{
		Value:     []byte("foo"),
		Partition: 0,
	}

	{
		r, err := cl.ProduceSync(context.Background(), r).First()
		fmt.Println(r, err)
	}

	r.Partition = 11
	{
		r, err := cl.ProduceSync(context.Background(), r).First()
		fmt.Println(r, err)
	}
}

$ go run main.go
&{[] [102 111 111] [] 2024-05-25 20:14:28.1 -0600 MDT foo 0 {0} 0 2039761367311953381 0 0 context.Background} <nil>
&{[] [102 111 111] [] 2024-05-25 20:14:28.1 -0600 MDT foo 0 {0} 0 0 0 0 context.Background} invalid record partitioning choice of 11 from 10 available

I'm going to close this for now, but if you have more details (maybe a slimmed-down repro) about how you ran into a record being invisibly dropped, I'm open to investigating further.

@alistairking
Contributor Author

Thanks for looking into this. It turns out that we were not passing an error callback function to our async Produce calls, so the errors were being silently dropped. Sorry for the fire drill!
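(For anyone who hits the same thing: the async Produce hands each record's error to the promise callback, so a callback that ignores err effectively drops failures. A rough sketch under the same manual-partitioner setup as the repro above; the broker address is an assumption.)

package main

import (
	"context"
	"log"
	"sync"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // assumed broker address
		kgo.DefaultProduceTopic("foo"),
		kgo.RecordPartitioner(kgo.ManualPartitioner()),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	var wg sync.WaitGroup
	wg.Add(1)
	cl.Produce(context.Background(), &kgo.Record{Value: []byte("v"), Partition: 11}, func(r *kgo.Record, err error) {
		defer wg.Done()
		// Without a callback that inspects err, failures such as
		// "invalid record partitioning choice" are easy to miss.
		if err != nil {
			log.Printf("produce to partition %d failed: %v", r.Partition, err)
		}
	})
	wg.Wait()
}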
