Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check remaining bytes when marking batch as empty #1192

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

zachxu42
Copy link

@zachxu42 zachxu42 commented Sep 8, 2023

Fixes issue-1188 and drastically reduce the number of reader errors and dials. Across a couple hundred reader instances, we saw the number of reader errors drop from a couple hundred per minute to 0.

When hwm==offset and remaining bytes in the buffer is not zero, it means the buffer contains information that has not been replicated yet, as least according to the hwm. However from our testing, this is mostly entirely due to race condition (i.e. we asked for hwm too soon, before the message is replicated.), and hwm always catches up so there doesn't seem to be any harm reading those bytes in the buffer.

Alternatively, we could discard the remaining bytes in the buffer when we mark the batch empty based on hwm==offset, similar to this PR.

@petedannemann
Copy link
Contributor

petedannemann commented Oct 6, 2023

@zachxu42 this seems like a plausible fix but given that we've broken kafka-go before trying to fix this issue I would feel a lot better if we first found some way to:

  1. Reproduce this issue in a test
  2. Add test coverage to prevent us from breaking this in a similar way as we did in Nettest fix #788

@petedannemann
Copy link
Contributor

We spent some more time comparing this to #1177 and we think that is more of a root cause fix, but I think we'll still need to make some testing investments prior to proceeding with it

@zachxu42
Copy link
Author

zachxu42 commented Oct 6, 2023

Thanks @petedannemann for the comment. I believe #1177 is a different case, when the error is not nil. In this PR I could change it to do the same

if highWaterMark == offset {
    msgs = &messageSetReader{empty: true}
    if remain > 0 {
		c.rbuf.Discard(remain)
    }
} else {
    msgs, err = newMessageSetReader(&c.rbuf, remain)
}

I think the issue is, whether there's an error or not, if we're about to return an empty message set, we need to make sure we clear the buffer if there's any remaining bytes, so the next reader doesn't see partial/corrupt data.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

ErrNoProgress due to parsing/concurrency issue?
2 participants