Panic when reading message from start of compacted topic #874

Open
GFriedrich opened this issue Mar 29, 2022 · 15 comments · May be fixed by #1164

@GFriedrich

GFriedrich commented Mar 29, 2022

Describe the bug
When trying to read from a compacted topic using the FirstOffset configuration without a consumer group, the library panics with the following stacktrace.

panic: markRead: negative count

goroutine 68 [running]:
github.com/segmentio/kafka-go.(*messageSetReader).markRead(0xc0004587a0)
    external/com_github_segmentio_kafka_go/message_reader.go:345 +0x11a
github.com/segmentio/kafka-go.(*messageSetReader).readMessageV2(0xc0004587a0, 0x2c709, 0xc000507ac8, 0xc000507ab8, 0x2c708, 0x2c708, 0xc000059800, 0xc0005078f8, 0x416b3b, 0xc0005078f8, ...)
    external/com_github_segmentio_kafka_go/message_reader.go:329 +0x49d
github.com/segmentio/kafka-go.(*messageSetReader).readMessage(0xc0004587a0, 0x2c709, 0xc000507ac8, 0xc000507ab8, 0x2c708, 0xc000507a5c, 0x17fd1b29700, 0x0, 0xc0004922a0, 0x0, ...)
    external/com_github_segmentio_kafka_go/message_reader.go:136 +0xc5
github.com/segmentio/kafka-go.(*Batch).readMessage(0xc000195880, 0xc000507ac8, 0xc000507ab8, 0x0, 0x17fd1b29700, 0x100000001, 0xc0004922a0, 0xc0004922a0, 0xc000507ab8, 0x2)
    external/com_github_segmentio_kafka_go/batch.go:240 +0x79
github.com/segmentio/kafka-go.(*Batch).ReadMessage(0xc000195880, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
    external/com_github_segmentio_kafka_go/batch.go:192 +0x11a
github.com/segmentio/kafka-go.(*reader).read(0xc000507ed8, 0xe30160, 0xc0004882c0, 0x2c709, 0xc0000e21e0, 0x0, 0x0, 0x0)
    external/com_github_segmentio_kafka_go/reader.go:1492 +0x3ec
github.com/segmentio/kafka-go.(*reader).run(0xc000507ed8, 0xe30160, 0xc0004882c0, 0x0)
    external/com_github_segmentio_kafka_go/reader.go:1310 +0x2d9
github.com/segmentio/kafka-go.(*Reader).start.func1(0xc0004d8000, 0xe30160, 0xc0004882c0, 0xc00004402c, 0x10, 0x0, 0xfffffffffffffffe, 0xc0004d8138)
    external/com_github_segmentio_kafka_go/reader.go:1211 +0x1d8
created by github.com/segmentio/kafka-go.(*Reader).start
    external/com_github_segmentio_kafka_go/reader.go:1191 +0x1a5

Kafka Version
2.4.0

To Reproduce
Sadly, I'm unable to reproduce the issue, but maybe you've already seen it in the past or can point me to a place I could check.
To be exact, I found the application panicking even when restarting it over and over again. My only solution was to truncate the topic, which brought the consumer back to life. But these messages were not special at all: I exported them and reimported them into a different cluster and couldn't make the application fail there. So it must be (additionally?!) connected to some internal state of the Kafka cluster.

Expected behavior
No panic at all should happen.

Additional context
The version of the library in use is 0.4.30.
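
For context, a minimal consumer matching the setup described above (a plain partition reader, no consumer group, starting from the first offset) would look roughly like the sketch below; the broker address, topic name and partition are placeholders rather than values from the report:

package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Plain partition reader, no consumer group. Broker, topic and partition
	// are placeholders.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"localhost:9092"},
		Topic:     "my-compacted-topic",
		Partition: 0,
	})
	defer r.Close()

	// Start reading from the beginning of the (compacted) partition.
	if err := r.SetOffset(kafka.FirstOffset); err != nil {
		log.Fatal(err)
	}

	for {
		// In the affected version this call could end in the
		// "markRead: negative count" panic shown in the stack trace above.
		msg, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("offset %d: %s", msg.Offset, msg.Value)
	}
}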

@GFriedrich added the bug label Mar 29, 2022
@achille-roussel
Contributor

Hello @GFriedrich, thanks for reporting the issue!

This is not something we have observed before, but from what you reported it seems like a legitimate issue. Next time it happens, it would be really useful if you could provide us with the state of the partition that triggered the bug (I understand this might be difficult depending on the kind of data you store in Kafka).

@achille-roussel self-assigned this Apr 8, 2022
@GFriedrich
Author

Hi @achille-roussel,
today the Kafka app panicked again, but this time with an error like the one in #457.
I found that the bugfix from #551 never made it to master and only ended up in the 0.4 branch.
Could we bring this bugfix to the master branch? Maybe that also fixes the problem here?!

@achille-roussel
Contributor

Thanks for pointing out #551, this indeed seems like a mistake, we'll address it 🙇

@GFriedrich
Author

GFriedrich commented Apr 11, 2022

Thanks @achille-roussel
But I've just checked the Kafka code and found that it is basically impossible for Kafka to send a -1 there (or any other negative number):
https://github.com/apache/kafka/blob/75795d1ed8402f185e00b5f3aedcc2bcbb914ca9/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L198
To me it now looks more like some other issue that just results in this error. Probably the deserialization gets off track at some other place and then ends up with a negative number right there. That would also explain the other deserialization error from the first post.
I'll try to hook into this somehow next time and debug the entire deserialization process.

@achille-roussel
Contributor

To me it now looks more like some other issue that just results in this error. Probably the deserialization gets off track at some other place and then ends up with a negative number right there. That would also explain the other deserialization error from the first post.

That's entirely possible. Addressing the panic there still seems like the right thing to do, though (we shouldn't trust data coming from the network), and not crashing the application might give us more opportunities to figure out the conditions that triggered the issue.
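
As a small illustration of that idea (surface an error rather than crash the process when the batch bookkeeping is off), here is a self-contained sketch; the struct below is a stand-in that only models a record counter, not kafka-go's real messageSetReader:

package main

import "fmt"

// messageSetReader is a stand-in for the library's internal type; only the
// record-count bookkeeping relevant to this issue is modeled here.
type messageSetReader struct {
	count int // records remaining in the current batch
}

// markRead is a hypothetical, non-panicking variant of the method seen in the
// stack traces in this thread: it returns an error instead of panicking when
// the remaining count is already zero (as happens with an empty record batch).
func (r *messageSetReader) markRead() error {
	if r.count <= 0 {
		return fmt.Errorf("markRead: %d records remaining, cannot mark another as read", r.count)
	}
	r.count--
	return nil
}

func main() {
	r := &messageSetReader{count: 0} // an empty record batch
	if err := r.markRead(); err != nil {
		fmt.Println("recoverable error instead of a panic:", err)
	}
}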

@DustinChaloupka

We have also run into this panic on a compacted topic, though using a consumer group, and we can reliably reproduce it. The reader gets to offset 3330 in this case and then panics. This is from a kafka-dump-log of the offsets before and after that one:

baseOffset: 3296 lastOffset: 3296 count: 1 baseSequence: 0 lastSequence: 0 producerId: 236033 producerEpoch: 0 partitionLeaderEpoch: 147 isTransactional: false isControl: false position: 107000 CreateTime: 1638893633736 size: 228 magic: 2 compresscodec: lz4 crc: 3664565143 isvalid: true
| offset: 3296 CreateTime: 1638893633736 keySize: 42 valueSize: 168 sequence: 0 headerKeys: [] key: �Hf679b464-bb68-1034-9385-473bdaf916c3 payload: �Hf679b464-bb68-1034-9385-473bdaf916ctest@test.com�https://test.com/f679b464-bb68-1034-9385-473bdaf916c3
baseOffset: 3302 lastOffset: 3302 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 147 isTransactional: false isControl: false position: 107228 CreateTime: 1638977719139 size: 231 magic: 2 compresscodec: lz4 crc: 2774672340 isvalid: true
| offset: 3302 CreateTime: 1638977719139 keySize: 42 valueSize: 174 sequence: -1 headerKeys: [] key: �H60db575a-eaf5-103b-933f-c3907e341545 payload: �H60db575a-eaf5-103b-933f-c3907e341545test@test.com�https://test.com/60db575a-eaf5-103b-933f-c3907e341545

Every offset before this one was successfully fetched/committed. This does not necessarily stop the consumer from being able to consume from a later offset, though. I checked a bunch of offsets between 3300 and 3692 to see if it was ever able to continue, and it did at 3392 (unsure if there are more in between, but it was panicking at 3691). The following are the offsets around 3692:

| offset: 3687 CreateTime: 1649176342358 keySize: 42 valueSize: 170 sequence: 1 headerKeys: [] key: �H5eee9676-67cc-103a-8df9-9d1647267a43 payload: �H5eee9676-67cc-103a-8df9-9d1647267a43test@test.com�https://test.com/5eee9676-67cc-103a-8df9-9d1647267a43
baseOffset: 3688 lastOffset: 3688 count: 1 baseSequence: 2 lastSequence: 2 producerId: 244713 producerEpoch: 0 partitionLeaderEpoch: 158 isTransactional: false isControl: false position: 137466 CreateTime: 1649177741486 size: 225 magic: 2 compresscodec: lz4 crc: 948681325 isvalid: true
baseOffset: 3691 lastOffset: 3691 count: 0 baseSequence: 2 lastSequence: 2 producerId: 245657 producerEpoch: 0 partitionLeaderEpoch: 158 isTransactional: false isControl: false position: 137691 CreateTime: 1649278518145 size: 61 magic: 2 compresscodec: none crc: 2615745801 isvalid: true
baseOffset: 3693 lastOffset: 3693 count: 0 baseSequence: 1 lastSequence: 1 producerId: 244734 producerEpoch: 0 partitionLeaderEpoch: 158 isTransactional: false isControl: false position: 137752 CreateTime: 1649280246610 size: 61 magic: 2 compresscodec: none crc: 2386546229 isvalid: true
baseOffset: 3695 lastOffset: 3695 count: 0 baseSequence: 1 lastSequence: 1 producerId: 243722 producerEpoch: 0 partitionLeaderEpoch: 158 isTransactional: false isControl: false position: 137813 CreateTime: 1649345324335 size: 61 magic: 2 compresscodec: none crc: 1810500351 isvalid: true
baseOffset: 3698 lastOffset: 3698 count: 1 baseSequence: 2 lastSequence: 2 producerId: 244745 producerEpoch: 0 partitionLeaderEpoch: 158 isTransactional: false isControl: false position: 137874 CreateTime: 1649357610796 size: 233 magic: 2 compresscodec: lz4 crc: 770161861 isvalid: true
| offset: 3698 CreateTime: 1649357610796 keySize: 42 valueSize: 178 sequence: 2 headerKeys: [] key: �He8631c5e-2d4b-1039-9f5a-198a47df2db6 payload: �He8631c5e-2d4b-1039-9f5a-198a47df2db6test@test.com�https://test.com/e8631c5e-2d4b-1039-9f5a-198a47df2db6

Unsure if this is helpful information, but if there is something else I can get to help debug this, let me know.

@GFriedrich
Author

@achille-roussel today I was finally able to debug the situation on one of our Kafka clusters. The application is still on version 0.4.30, so all the line numbers you'll see match this specific version:
The library started to read a batch of messages via https://github.com/segmentio/kafka-go/blob/v0.4.30/message_reader.go#L127
The result can be seen here:
[screenshot: debugger view of the parsed batch header, showing a message count of 0]
As you can see, the message count in the header is 0. And this is already the issue: Kafka sends some empty batches and the library does not expect this - but due to a special situation (see below), this only becomes a problem when two subsequent batches are empty.
But let's move on: as you can see in the stack trace above, I created the screenshot just before the headers were read a second time. This is because after the header was read at https://github.com/segmentio/kafka-go/blob/v0.4.30/message_reader.go#L127, and because the count is 0, it does another read of the next header at https://github.com/segmentio/kafka-go/blob/v0.4.30/message_reader.go#L249
Unfortunately (as said before) the next batch is also empty, resulting in this screenshot:
[screenshot: the next batch header, again with a message count of 0]
At this point a batch with one or more messages is expected, so the next lines of readMessageV2 try to read message data - but what they are actually reading is, of course, another batch header. This can then break easily: either by reading some data that causes the "header" value of the message to become negative, or it may even get to the point where markRead is called, but as the count is 0, the library panics.
The parsed data just before markRead was called looked like this in my scenario:
[screenshot: parsed message fields just before the call to markRead]

As said before, the wild thing is that because readHeader is called twice, an empty batch only becomes a problem when two of them are returned back to back. A single empty batch "accidentally" works.

Important to note here: I found that empty batches only exist from Kafka message format v2 onwards. You can find the corresponding Kafka code at https://github.com/apache/kafka/blob/a3a4323a5a2ea4ba1820da58a8c31532e8864f0d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L219

So what to do:
IMHO, first of all the readHeader call should be removed from https://github.com/segmentio/kafka-go/blob/v0.4.30/message_reader.go#L249 - I don't see a good reason for this method to be called a second time there. This made sense back in v1, where the read method loops internally, but for v2 the outer readHeader is always called first - or am I missing something?
Second, the readMessage method should check whether the batch is empty for v2 at https://github.com/segmentio/kafka-go/blob/v0.4.30/message_reader.go#L136 - if it is, this should be handled in some way (see the sketch at the end of this comment).

I think if these points are tackled, it should be possible to fix all of the previously mentioned deserialization issues.

Let me know if you need any more details - I've kept all the binary response data in case it is needed again. Unfortunately I can't share it here, as it contains some private data, but I hope the given information is already enough to reproduce the situation.
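
To make the second point above concrete, here is a self-contained sketch that models the reader over a pre-parsed list of batch headers instead of the real wire format. The field and method names (batches, pos, count, readHeader) are illustrative only and do not match message_reader.go exactly; the point is just the shape of the check: drain empty v2 batches by reading the next batch header instead of misinterpreting its bytes as record data.

package main

import (
	"errors"
	"fmt"
)

// batchHeader models only the header fields that matter here: the message
// format version (magic) and the record count.
type batchHeader struct {
	magic int
	count int
}

// messageSetReader is a simplified stand-in that walks pre-parsed batch
// headers instead of a network buffer.
type messageSetReader struct {
	batches []batchHeader
	pos     int
	count   int // records remaining in the current batch
}

var errShortRead = errors.New("no more batches in the fetch response")

// readHeader loads the next batch header, mirroring the step at
// message_reader.go#L127 discussed above.
func (r *messageSetReader) readHeader() error {
	if r.pos >= len(r.batches) {
		return errShortRead
	}
	r.count = r.batches[r.pos].count
	r.pos++
	return nil
}

// readMessage sketches the proposed handling: before decoding a record, skip
// over any batches whose header reports zero records (possible with message
// format v2 on compacted topics), instead of treating the next header's bytes
// as record data.
func (r *messageSetReader) readMessage() error {
	for r.count == 0 {
		if err := r.readHeader(); err != nil {
			return err
		}
	}
	r.count-- // one record consumed from the current batch
	return nil
}

func main() {
	// Two back-to-back empty v2 batches followed by a non-empty one: the
	// situation described above that triggers the panic in v0.4.30.
	r := &messageSetReader{batches: []batchHeader{{magic: 2, count: 0}, {magic: 2, count: 0}, {magic: 2, count: 1}}}
	if err := r.readMessage(); err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Println("read one record after skipping the empty batches")
}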

@sergeylarionov

Please, fix this bug ASAP!

@achille-roussel
Contributor

achille-roussel commented May 27, 2022

@GFriedrich thanks for the detailed bug report.

@sergeylarionov We always welcome contributions, would you have free cycles to help submit a pull request which fixes the bug?

@Gerifield

Gerifield commented Jun 15, 2022

We had a similar issue (sadly not for the first time), but this debugging write-up is very, very good.

I was looking at the code a little bit, and I am not that familiar with every detail so this might be wrong, but a possible fix could be to add a check at https://github.com/segmentio/kafka-go/blob/v0.4.30/message_reader.go#L247 on r.count after the readHeader call, and simply return an error if it is 0?

I am not sure, but I think that error should propagate back to the library's caller, which can then handle it however it wants?
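
As a toy version of that suggestion (bail out with an ordinary error when the freshly read header reports zero records), with the caveat that the function and error names below are made up for the sketch and are not part of kafka-go:

package main

import (
	"errors"
	"fmt"
)

// errEmptyBatch is a hypothetical sentinel for the case described above: a
// record batch whose header reports a count of zero.
var errEmptyBatch = errors.New("kafka: record batch contains no records")

// checkBatchCount is the suggested guard in miniature: run it right after the
// batch header has been (re)read and propagate the error instead of letting
// the decoder walk into the next batch header.
func checkBatchCount(count int) error {
	if count == 0 {
		return errEmptyBatch
	}
	return nil
}

func main() {
	if err := checkBatchCount(0); err != nil {
		// A caller of ReadMessage/FetchMessage would then see an error it can
		// log or retry on, instead of the process dying on a panic.
		fmt.Println("got:", err)
	}
}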

@SoMuchForSubtlety
Contributor

Here is a stack trace from v0.4.39

panic: markRead: negative count

goroutine 195 [running]:
github.com/segmentio/kafka-go.(*messageSetReader).markRead(0xc00036b110?)
	github.com/segmentio/kafka-go@v0.4.39/message_reader.go:351 +0xc5
github.com/segmentio/kafka-go.(*messageSetReader).readMessageV2(0xc00085b9c0, 0xc000637a30?, 0xc000637a04?, 0x2?)
	github.com/segmentio/kafka-go@v0.4.39/message_reader.go:335 +0x9c5
github.com/segmentio/kafka-go.(*messageSetReader).readMessage(0xc00085b9c0, 0x48b614?, 0xc101d506990d9857?, 0x3352a4467?)
	github.com/segmentio/kafka-go@v0.4.39/message_reader.go:138 +0x92
github.com/segmentio/kafka-go.(*Batch).readMessage(0xc0003b0e80, 0x0?, 0x0?)
	github.com/segmentio/kafka-go@v0.4.39/batch.go:248 +0x4a
github.com/segmentio/kafka-go.(*Batch).ReadMessage(0xc0003b0e80)
	github.com/segmentio/kafka-go@v0.4.39/batch.go:200 +0xe5
github.com/segmentio/kafka-go.(*reader).read(0xc000637ec8, {0xf7e400, 0xc0003da6c0}, 0x670c22, 0xc00036b0e0)
	github.com/segmentio/kafka-go@v0.4.39/reader.go:1513 +0x285
github.com/segmentio/kafka-go.(*reader).run(0xc000637ec8, {0xf7e400, 0xc0003da6c0}, 0x670c22)
	github.com/segmentio/kafka-go@v0.4.39/reader.go:1325 +0x417
github.com/segmentio/kafka-go.(*Reader).start.func1({0xf7e400?, 0xc0003da6c0?}, {{0xc00003c108?, 0x4?}, 0x5d6596?}, 0x5d63aa?, 0x1?)
	github.com/segmentio/kafka-go@v0.4.39/reader.go:1217 +0x1db
created by github.com/segmentio/kafka-go.(*Reader).start
	github.com/segmentio/kafka-go@v0.4.39/reader.go:1193 +0x145

SoMuchForSubtlety added a commit to SoMuchForSubtlety/kafka-go that referenced this issue Mar 31, 2023
With magic v2 and above, it's possible to have empty record batches.
This was previously not considered and could lead to panics because the
code relied on the record count being non-zero in multiple places.

Fixes segmentio#874
SoMuchForSubtlety added a commit to SoMuchForSubtlety/kafka-go that referenced this issue Apr 2, 2023
SoMuchForSubtlety added a commit to SoMuchForSubtlety/kafka-go that referenced this issue Apr 3, 2023
@maksadbek

Hi, I've started using the kafka-go package and this error keeps occurring. Which version should I downgrade to in order to temporarily get around it until this gets fixed?

SoMuchForSubtlety added a commit to SoMuchForSubtlety/kafka-go that referenced this issue May 8, 2023
@niafly linked a pull request Jul 14, 2023 that will close this issue
@PaulForgey-Discovery

We are also hitting this. Any progress? This is almost 9 months old.

@maksadbek

@PaulForgey-Discovery I've migrated to sarama and so far all good!

@GFriedrich
Author

I've migrated to the official Go client from Confluent

kf6nux added a commit to kf6nux/kafka-go that referenced this issue May 16, 2024
@kf6nux mentioned this issue May 16, 2024