Skip to content

Commit

Permalink
db,wal: avoid Batch.refData allocation
Browse files Browse the repository at this point in the history
Passing in the Batch.refData func into (wal.Writer).WriteRecord forced an
allocation for every committed batch. This commit defines a new wal.RefCount
interface implemented by Batch to avoid the allocation.

Informs cockroachdb/cockroach#123236.
  • Loading branch information
jbowens committed Apr 30, 2024
1 parent 55642e0 commit 62fc20d
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 37 deletions.
27 changes: 14 additions & 13 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,11 @@ type Batch struct {
// To resolve this data race, this [lifecycle] atomic is used to determine
// safety and responsibility of reusing a batch. The low bits of the atomic
// are used as a reference count (really just the lowest bit—in practice
// there's only 1 code path that references). The [Batch.refData] func is
// passed into [wal.Writer]'s WriteRecord method. The wal.Writer guarantees
// that if it will read [Batch.data] after the call to WriteRecord returns,
// it will increment the reference count. When it's complete, it'll
// unreference through invoking [Batch.unrefData].
// there's only 1 code path that references). The [Batch] is passed into
// [wal.Writer]'s WriteRecord method as a [RefCount] implementation. The
// wal.Writer guarantees that if it will read [Batch.data] after the call to
// WriteRecord returns, it will increment the reference count. When it's
// complete, it'll unreference through invoking [Batch.Unref].
//
// When the committer of a batch indicates intent to recycle a Batch through
// calling [Batch.Reset] or [Batch.Close], the lifecycle atomic is read. If
Expand All @@ -218,7 +218,7 @@ type Batch struct {
// In [Batch.Close], we set a special high bit [batchClosedBit] on lifecycle
// that indicates that the user will not use [Batch] again and we're free to
// recycle it when safe. When the commit pipeline eventually calls
// [Batch.unrefData], the [batchClosedBit] is noticed and the batch is
// [Batch.Unref], the [batchClosedBit] is noticed and the batch is
// recycled.
lifecycle atomic.Int32
}
Expand All @@ -228,16 +228,17 @@ type Batch struct {
// prevented immediate recycling.
const batchClosedBit = 1 << 30

// refData is passed to (wal.Writer).WriteRecord. If the WAL writer may need to
// read b.data after it returns, it invokes refData to increment the lifecycle's
// reference count. When it's finished, it invokes the returned function
// [Batch.unrefData].
func (b *Batch) refData() (unref func()) {
// TODO(jackson): Hide the wal.RefCount implementation from the public Batch interface.

// Ref implements wal.RefCount. If the WAL writer may need to read b.data after
// it returns, it invokes Ref to increment the lifecycle's reference count. When
// it's finished, it invokes Unref.
func (b *Batch) Ref() {
b.lifecycle.Add(+1)
return b.unrefData
}

func (b *Batch) unrefData() {
// Unref implemets wal.RefCount.
func (b *Batch) Unref() {
if v := b.lifecycle.Add(-1); (v ^ batchClosedBit) == 0 {
// The [batchClosedBit] high bit is set, and there are no outstanding
// references. The user of the Batch called [Batch.Close], expecting the
Expand Down
4 changes: 2 additions & 2 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,11 +464,11 @@ func TestBatchReuse(t *testing.T) {
fmt.Fprintf(&buf, "%s = %b\n", l, v)
case fields.Index(1) == "refData":
// Command of the form: b1.refData()
batches[fields.Index(0).Str()].refData()
batches[fields.Index(0).Str()].Ref()
fmt.Fprintf(&buf, "%s\n", l)
case fields.Index(1) == "unrefData":
// Command of the form: b1.unrefData()
batches[fields.Index(0).Str()].unrefData()
batches[fields.Index(0).Str()].Unref()
fmt.Fprintf(&buf, "%s\n", l)
case fields.Index(1) == "Close":
// Command of the form: b1.Close()
Expand Down
4 changes: 2 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*mem
b.flushable.setSeqNum(b.SeqNum())
if !d.opts.DisableWAL {
var err error
size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b.refData)
size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -951,7 +951,7 @@ func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*mem
}

if b.flushable == nil {
size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b.refData)
size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b)
if err != nil {
panic(err)
}
Expand Down
21 changes: 10 additions & 11 deletions wal/failover_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import (
type recordQueueEntry struct {
p []byte
opts SyncOptions
unref func()
refCount RefCount
writeStartUnixNanos int64
}

type poppedEntry struct {
opts SyncOptions
unref func()
refCount RefCount
writeStartUnixNanos int64
}

Expand Down Expand Up @@ -151,7 +151,7 @@ func (q *recordQueue) init(failoverWriteAndSyncLatency prometheus.Histogram) {
func (q *recordQueue) push(
p []byte,
opts SyncOptions,
unref func(),
refCount RefCount,
writeStartUnixNanos int64,
latestLogSizeInWriteRecord int64,
latestWriterInWriteRecord *record.LogWriter,
Expand All @@ -175,7 +175,7 @@ func (q *recordQueue) push(
q.buffer[int(h)%m] = recordQueueEntry{
p: p,
opts: opts,
unref: unref,
refCount: refCount,
writeStartUnixNanos: writeStartUnixNanos,
}
// Reclaim memory for consumed entries. We couldn't do that in pop since
Expand Down Expand Up @@ -255,7 +255,7 @@ func (q *recordQueue) pop(index uint32, err error) (numSyncsPopped int) {
idx := (i + int(tail)) % n
b[i] = poppedEntry{
opts: q.buffer[idx].opts,
unref: q.buffer[idx].unref,
refCount: q.buffer[idx].refCount,
writeStartUnixNanos: q.buffer[idx].writeStartUnixNanos,
}
}
Expand All @@ -268,8 +268,8 @@ func (q *recordQueue) pop(index uint32, err error) (numSyncsPopped int) {
for i := 0; i < numEntriesToPop; i++ {
// Now that we've synced the entry, we can unref it to signal that we
// will not read the written byte slice again.
if b[i].unref != nil {
b[i].unref()
if b[i].refCount != nil {
b[i].refCount.Unref()
}
if b[i].opts.Done != nil {
numSyncsPopped++
Expand Down Expand Up @@ -499,11 +499,10 @@ func newFailoverWriter(

// WriteRecord implements Writer.
func (ww *failoverWriter) WriteRecord(
p []byte, opts SyncOptions, ref RefFunc,
p []byte, opts SyncOptions, ref RefCount,
) (logicalOffset int64, err error) {
var unref func()
if ref != nil {
unref = ref()
ref.Ref()
}
var writeStartUnixNanos int64
if opts.Done != nil {
Expand All @@ -512,7 +511,7 @@ func (ww *failoverWriter) WriteRecord(
recordIndex, writer, lastLogSize := ww.q.push(
p,
opts,
unref,
ref,
writeStartUnixNanos,
ww.logicalOffset.latestLogSizeInWriteRecord,
ww.logicalOffset.latestWriterInWriteRecord,
Expand Down
2 changes: 1 addition & 1 deletion wal/standalone_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ var _ Writer = &standaloneWriter{}

// WriteRecord implements Writer.
func (w *standaloneWriter) WriteRecord(
p []byte, opts SyncOptions, _ RefFunc,
p []byte, opts SyncOptions, _ RefCount,
) (logicalOffset int64, err error) {
return w.w.SyncRecord(p, opts.Done, opts.Err)
}
Expand Down
21 changes: 13 additions & 8 deletions wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,21 +384,26 @@ type Writer interface {
//
// Some Writer implementations may continue to read p after WriteRecord
// returns. This is an obstacle to reusing p's memory. If the caller would
// like to reuse p's memory, the caller may pass a non-nil [RefFunc].
// If the Writer will retain p, it will invoke the [RefFunc] before
// returning. When it's finished, it will invoke the func returned by the
// [RefFunc] to release its reference.
WriteRecord(p []byte, opts SyncOptions, ref RefFunc) (logicalOffset int64, err error)
// like to reuse p's memory, the caller may pass a non-nil [RefCount]. If
// the Writer will retain p, it will invoke the [RefCount] before returning.
// When it's finished, it will invoke [RefCount.Unref] to release its
// reference.
WriteRecord(p []byte, opts SyncOptions, ref RefCount) (logicalOffset int64, err error)
// Close the writer.
Close() (logicalOffset int64, err error)
// Metrics must be called after Close. The callee will no longer modify the
// returned LogWriterMetrics.
Metrics() record.LogWriterMetrics
}

// RefFunc holds funcs to increment a reference count associated with a record
// passed to [Writer.WriteRecord]. See the comment on WriteRecord.
type RefFunc func() (unref func())
// RefCount is a reference count associated with a record passed to
// [Writer.WriteRecord]. See the comment on WriteRecord.
type RefCount interface {
// Ref increments the reference count.
Ref()
// Unref increments the reference count.
Unref()
}

// Reader reads a virtual WAL.
type Reader interface {
Expand Down

0 comments on commit 62fc20d

Please sign in to comment.