
Log truncation and committing #123

Open
tylertreat opened this issue May 10, 2018 · 4 comments

Comments

tylertreat (Contributor) commented May 10, 2018

My understanding of Kafka is that a follower truncates its log up to the high watermark on recovery to remove any potentially uncommitted messages (this might no longer be the case with the leader epoch changes).

jocko/jocko/broker.go, lines 1166 to 1169 at e2a8d10:

hw := replica.Log.NewestOffset()
if err := replica.Log.Truncate(hw); err != nil {
	return protocol.ErrUnknown.WithErr(err)
}

However, the implementation of Truncate here looks like it deletes messages from the oldest offset up to the given offset rather than from the newest.

func (l *CommitLog) Truncate(offset int64) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	var segments []*Segment
	for _, segment := range l.segments {
		if segment.BaseOffset < offset {
			if err := segment.Delete(); err != nil {
				return err
			}
		} else {
			segments = append(segments, segment)
		}
	}
	l.segments = segments
	return nil
}
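To make the directionality concrete, here's a minimal simulation of the quoted logic (the Segment/CommitLog types below are simplified stand-ins, not the real jocko types): calling Truncate with the newest offset, as the becomeFollower snippet does, drops every segment.

```go
package main

import "fmt"

// Simplified stand-ins for jocko's commitlog types (illustrative only).
type Segment struct{ BaseOffset int64 }

type CommitLog struct{ segments []*Segment }

// Truncate mirrors the quoted logic: it drops every segment whose
// BaseOffset is *below* the given offset and keeps the rest.
func (l *CommitLog) Truncate(offset int64) {
	var kept []*Segment
	for _, s := range l.segments {
		if s.BaseOffset < offset {
			continue // "deleted"
		}
		kept = append(kept, s)
	}
	l.segments = kept
}

func main() {
	log := &CommitLog{segments: []*Segment{
		{BaseOffset: 0}, {BaseOffset: 100}, {BaseOffset: 200},
	}}
	// Pretend NewestOffset() returned 250: every segment starts below it.
	log.Truncate(250)
	fmt.Println(len(log.segments)) // prints 0: the whole log is gone
}
```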

From looking through some of the code, it looks like Jocko isn't implementing the normal ISR commit "flow"; i.e., in Kafka, a message written to the log isn't committed until the ISR has replicated it.
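For reference, the Kafka commit rule mentioned above can be sketched as the leader computing its high watermark as the minimum log-end offset across the in-sync replica set (a hand-rolled illustration, not jocko or Kafka code):

```go
package main

import "fmt"

// highWatermark (illustrative name) returns the minimum log-end offset
// across the ISR: a message counts as committed only once every in-sync
// replica has replicated past it.
func highWatermark(isrLogEndOffsets []int64) int64 {
	hw := isrLogEndOffsets[0]
	for _, leo := range isrLogEndOffsets[1:] {
		if leo < hw {
			hw = leo
		}
	}
	return hw
}

func main() {
	// Leader has written through offset 42, but one follower is at 40,
	// so the high watermark (commit point) lags at 40.
	fmt.Println(highWatermark([]int64{42, 40, 42})) // prints 40
}
```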

Is this truncation in Jocko serving a different purpose, am I misunderstanding Kafka, or am I just misreading the code?

Thanks!

@kempjeng

From what I understand, you're talking about the WAL, which as you note is used to remove uncommitted entries.

I believe this is message log truncation so that you don't fill up your hdd and crash.

@travisjeffery (Owner)

Yeah, @kempjeng has it. The recovery may not match up 100% yet, and I'm working on the replication now. But that truncation is needed to periodically remove old segments.

@tylertreat (Contributor, Author)

Isn't the Cleaner interface responsible for compacting/truncating the log for purposes of preventing unbounded disk usage, i.e. DeleteCleaner? It appears this is what uses MaxLogBytes to control the log size:

cleaner: NewDeleteCleaner(opts.MaxLogBytes),

The problem I see is that upon becoming a follower, the broker is deleting all of its log messages because it truncates to its newest offset starting from the beginning of the log.

hw := replica.Log.NewestOffset()
if err := replica.Log.Truncate(hw); err != nil {
	return protocol.ErrUnknown.WithErr(err)
}

This happens inside becomeFollower, so it seemed like the intention was to mimic what Kafka does here which is truncating uncommitted messages from the log.

travisjeffery (Owner) commented May 14, 2018

Ahhh. Yes, we need to truncate from the head here rather than from the tail.
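A minimal sketch of that direction, using the same kind of simplified stand-in types (truncateTail is a hypothetical name, not jocko API): keep segments below the offset and drop the ones at or beyond it, so only the newest, potentially uncommitted entries are removed. Note this only drops whole segments; a complete fix would also have to trim entries inside the segment that straddles the offset.

```go
package main

import "fmt"

// Simplified stand-ins for jocko's commitlog types (illustrative only).
type Segment struct{ BaseOffset int64 }

type CommitLog struct{ segments []*Segment }

// truncateTail (hypothetical) drops every segment that starts at or
// beyond offset and keeps the older ones -- the opposite direction of
// the Truncate implementation discussed in this issue.
func (l *CommitLog) truncateTail(offset int64) {
	var kept []*Segment
	for _, s := range l.segments {
		if s.BaseOffset >= offset {
			continue // segment starts past the truncation point: delete it
		}
		kept = append(kept, s)
	}
	l.segments = kept
}

func main() {
	log := &CommitLog{segments: []*Segment{
		{BaseOffset: 0}, {BaseOffset: 100}, {BaseOffset: 200},
	}}
	log.truncateTail(150) // remove everything at/after offset 150
	fmt.Println(len(log.segments)) // prints 2: segments 0 and 100 survive
}
```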
