Skip to content

Commit

Permalink
Merge pull request #120244 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.1.17-rc-119799

release-23.1.17-rc: sql: fix a couple of memory leaks around memory monitors
  • Loading branch information
yuzefovich committed Mar 13, 2024
2 parents 0534eb1 + 331f639 commit d50ccb5
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 58 deletions.
3 changes: 1 addition & 2 deletions pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,7 @@ func (ib *IndexBackfiller) Close(ctx context.Context) {
func (ib *IndexBackfiller) GrowBoundAccount(ctx context.Context, growBy int64) error {
ib.muBoundAccount.Lock()
defer ib.muBoundAccount.Unlock()
err := ib.muBoundAccount.boundAccount.Grow(ctx, growBy)
return err
return ib.muBoundAccount.boundAccount.Grow(ctx, growBy)
}

// ShrinkBoundAccount shrinks the mutex protected bound account backing the
Expand Down
28 changes: 13 additions & 15 deletions pkg/sql/backfill/mvcc_index_merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
Expand Down Expand Up @@ -87,7 +86,6 @@ type IndexBackfillMerger struct {

evalCtx *eval.Context

mon *mon.BytesMonitor
muBoundAccount muBoundAccount
}

Expand All @@ -109,6 +107,16 @@ func (ibm *IndexBackfillMerger) Run(ctx context.Context, output execinfra.RowRec
ctx = logtags.AddTag(ctx, opName, int(ibm.spec.Table.ID))
ctx, span := execinfra.ProcessorSpan(ctx, ibm.flowCtx, opName, ibm.processorID)
defer span.Finish()
// This method blocks until all worker goroutines exit, so it's safe to
// close memory monitoring infra in defers.
mergerMon := execinfra.NewMonitor(ctx, ibm.flowCtx.Cfg.BackfillerMonitor, "index-backfiller-merger-mon")
defer mergerMon.Stop(ctx)
ibm.muBoundAccount.boundAccount = mergerMon.MakeBoundAccount()
defer func() {
ibm.muBoundAccount.Lock()
defer ibm.muBoundAccount.Unlock()
ibm.muBoundAccount.boundAccount.Close(ctx)
}()
defer output.ProducerDone()
defer execinfra.SendTraceData(ctx, ibm.flowCtx, output)

Expand Down Expand Up @@ -511,25 +519,15 @@ func (ibm *IndexBackfillMerger) Resume(output execinfra.RowReceiver) {

// NewIndexBackfillMerger creates a new IndexBackfillMerger.
func NewIndexBackfillMerger(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
processorID int32,
spec execinfrapb.IndexBackfillMergerSpec,
) (*IndexBackfillMerger, error) {
mergerMon := execinfra.NewMonitor(ctx, flowCtx.Cfg.BackfillerMonitor,
"index-backfiller-merger-mon")

ibm := &IndexBackfillMerger{
flowCtx *execinfra.FlowCtx, processorID int32, spec execinfrapb.IndexBackfillMergerSpec,
) *IndexBackfillMerger {
return &IndexBackfillMerger{
processorID: processorID,
spec: spec,
desc: tabledesc.NewUnsafeImmutable(&spec.Table),
flowCtx: flowCtx,
evalCtx: flowCtx.NewEvalCtx(),
mon: mergerMon,
}

ibm.muBoundAccount.boundAccount = mergerMon.MakeBoundAccount()
return ibm, nil
}

// IndexBackfillMergerTestingKnobs is for testing the distributed processors for
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,7 @@ func (s *Server) newConnExecutor(
memMetrics.TxnMaxBytesHist,
-1 /* increment */, noteworthyMemoryUsageBytes, s.cfg.Settings,
)
txnFingerprintIDCacheAcc := sessionMon.MakeBoundAccount()

nodeIDOrZero, _ := s.cfg.NodeInfo.NodeID.OptionalNodeID()
ex := &connExecutor{
Expand Down Expand Up @@ -1059,7 +1060,8 @@ func (s *Server) newConnExecutor(
indexUsageStats: s.indexUsageStats,
txnIDCacheWriter: s.txnIDCache,
totalActiveTimeStopWatch: timeutil.NewStopWatch(),
txnFingerprintIDCache: NewTxnFingerprintIDCache(s.cfg.Settings, sessionRootMon),
txnFingerprintIDCache: NewTxnFingerprintIDCache(ctx, s.cfg.Settings, &txnFingerprintIDCacheAcc),
txnFingerprintIDAcc: &txnFingerprintIDCacheAcc,
}

ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount
Expand Down Expand Up @@ -1268,6 +1270,7 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.mu.IdleInSessionTimeout.Stop()
ex.mu.IdleInTransactionSessionTimeout.Stop()

ex.txnFingerprintIDAcc.Close(ctx)
if closeType != panicClose {
ex.state.mon.Stop(ctx)
ex.sessionPreparedMon.Stop(ctx)
Expand Down Expand Up @@ -1646,6 +1649,7 @@ type connExecutor struct {
// txnFingerprintIDCache is used to track the most recent
// txnFingerprintIDs executed in this session.
txnFingerprintIDCache *TxnFingerprintIDCache
txnFingerprintIDAcc *mon.BoundAccount

// totalActiveTimeStopWatch tracks the total active time of the session.
// This is defined as the time spent executing transactions and statements.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2915,7 +2915,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
transactionFingerprintID :=
appstatspb.TransactionFingerprintID(ex.extraTxnState.transactionStatementsHash.Sum())

err := ex.txnFingerprintIDCache.Add(transactionFingerprintID)
err := ex.txnFingerprintIDCache.Add(ctx, transactionFingerprintID)
if err != nil {
if log.V(1) {
log.Warningf(ctx, "failed to enqueue transactionFingerprintID = %d: %s", transactionFingerprintID, err)
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/delete_preserving_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,17 +704,14 @@ func TestMergeProcessor(t *testing.T) {
sp := tableDesc.IndexSpan(codec, srcIndex.GetID())

output := fakeReceiver{}
im, err := backfill.NewIndexBackfillMerger(ctx, &flowCtx, 0 /* processorID */, execinfrapb.IndexBackfillMergerSpec{
im := backfill.NewIndexBackfillMerger(&flowCtx, 0 /* processorID */, execinfrapb.IndexBackfillMergerSpec{
Table: tableDesc.TableDescriptor,
TemporaryIndexes: []descpb.IndexID{srcIndex.GetID()},
AddedIndexes: []descpb.IndexID{dstIndex.GetID()},
Spans: []roachpb.Span{sp},
SpanIdx: []int32{0},
MergeTimestamp: kvDB.Clock().Now(),
})
if err != nil {
t.Fatal(err)
}

im.Run(ctx, &output)
if output.err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func NewProcessor(
if err := checkNumIn(inputs, 0); err != nil {
return nil, err
}
return backfill.NewIndexBackfillMerger(ctx, flowCtx, processorID, *core.IndexBackfillMerger)
return backfill.NewIndexBackfillMerger(flowCtx, processorID, *core.IndexBackfillMerger), nil
}
if core.Ttl != nil {
if err := checkNumIn(inputs, 0); err != nil {
Expand Down
39 changes: 20 additions & 19 deletions pkg/sql/txn_fingerprint_id_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package sql

import (
"context"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -42,16 +43,19 @@ type TxnFingerprintIDCache struct {
acc *mon.BoundAccount
cache *cache.UnorderedCache
}

mon *mon.BytesMonitor
}

const (
cacheEntrySize = int64(unsafe.Sizeof(cache.Entry{}))
txnFingerprintIDSize = int64(unsafe.Sizeof(appstatspb.TransactionFingerprintID(0)))
)

// NewTxnFingerprintIDCache returns a new TxnFingerprintIDCache.
func NewTxnFingerprintIDCache(
st *cluster.Settings, parentMon *mon.BytesMonitor,
ctx context.Context, st *cluster.Settings, acc *mon.BoundAccount,
) *TxnFingerprintIDCache {
b := &TxnFingerprintIDCache{st: st}

b.mu.acc = acc
b.mu.cache = cache.NewUnorderedCache(cache.Config{
Policy: cache.CacheFIFO,
ShouldEvict: func(size int, _, _ interface{}) bool {
Expand All @@ -63,29 +67,26 @@ func NewTxnFingerprintIDCache(
return int64(size) > capacity
},
OnEvictedEntry: func(entry *cache.Entry) {
b.mu.acc.Shrink(context.Background(), 1)
// We must be holding the mutex already because this callback is
// executed during Cache.Add which we surround with the lock.
b.mu.AssertHeld()
b.mu.acc.Shrink(ctx, cacheEntrySize+txnFingerprintIDSize)
},
})

monitor := mon.NewMonitorInheritWithLimit("txn-fingerprint-id-cache", 0 /* limit */, parentMon)
b.mon = monitor
b.mon.StartNoReserved(context.Background(), parentMon)

return b
}

// Add adds a TxnFingerprintID to the cache, truncating the cache to the cache's capacity
// if necessary.
func (b *TxnFingerprintIDCache) Add(value appstatspb.TransactionFingerprintID) error {
// Add adds a TxnFingerprintID to the cache, truncating the cache to the cache's
// capacity if necessary.
func (b *TxnFingerprintIDCache) Add(
ctx context.Context, id appstatspb.TransactionFingerprintID,
) error {
b.mu.Lock()
defer b.mu.Unlock()

if err := b.mu.acc.Grow(context.Background(), 1); err != nil {
if err := b.mu.acc.Grow(ctx, cacheEntrySize+txnFingerprintIDSize); err != nil {
return err
}

b.mu.cache.Add(value, value)

b.mu.cache.Add(id, nil /* value */)
return nil
}

Expand All @@ -105,7 +106,7 @@ func (b *TxnFingerprintIDCache) GetAllTxnFingerprintIDs() []appstatspb.Transacti
txnFingerprintIDsRemoved := make([]appstatspb.TransactionFingerprintID, 0)

b.mu.cache.Do(func(entry *cache.Entry) {
id := entry.Value.(appstatspb.TransactionFingerprintID)
id := entry.Key.(appstatspb.TransactionFingerprintID)

if int64(len(txnFingerprintIDs)) == size {
txnFingerprintIDsRemoved = append(txnFingerprintIDsRemoved, id)
Expand Down
17 changes: 3 additions & 14 deletions pkg/sql/txn_fingerprint_id_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package sql
import (
"context"
"fmt"
"math"
"sort"
"strconv"
"testing"
Expand All @@ -28,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"
Expand All @@ -46,17 +44,8 @@ func TestTxnFingerprintIDCacheDataDriven(t *testing.T) {
var capacity int
d.ScanArgs(t, "capacity", &capacity)

st := &cluster.Settings{}
monitor := mon.NewUnlimitedMonitor(
ctx,
"test",
mon.MemoryResource,
nil, /* currCount */
nil, /* maxHist */
math.MaxInt64,
st,
)
txnFingerprintIDCache = NewTxnFingerprintIDCache(st, monitor)
st := cluster.MakeTestingClusterSettings()
txnFingerprintIDCache = NewTxnFingerprintIDCache(ctx, st, nil /* acc */)

TxnFingerprintIDCacheCapacity.Override(ctx, &st.SV, int64(capacity))

Expand All @@ -77,7 +66,7 @@ func TestTxnFingerprintIDCacheDataDriven(t *testing.T) {
require.NoError(t, err)
txnFingerprintID := appstatspb.TransactionFingerprintID(id)

err = txnFingerprintIDCache.Add(txnFingerprintID)
err = txnFingerprintIDCache.Add(ctx, txnFingerprintID)
require.NoError(t, err)

return fmt.Sprintf("size: %d", txnFingerprintIDCache.size())
Expand Down
17 changes: 16 additions & 1 deletion pkg/util/mon/bytes_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func NewMonitorInheritWithLimit(
}

// noReserved is safe to be used by multiple monitors as the "reserved" account
// since only its 'used' field will ever be read.
// since only its 'used' and 'reserved' fields will ever be read.
var noReserved = BoundAccount{}

// StartNoReserved is the same as Start when there is no pre-reserved budget.
Expand Down Expand Up @@ -623,6 +623,18 @@ func (mm *BytesMonitor) doStop(ctx context.Context, check bool) {
parent.mu.numChildren--
parent.mu.Unlock()
}
// If this monitor still has children, let's lose the reference to them as
// well as break the references between them to aid GC.
if mm.mu.head != nil {
nextChild := mm.mu.head
mm.mu.head = nil
for nextChild != nil {
next := nextChild.parentMu.nextSibling
nextChild.parentMu.prevSibling = nil
nextChild.parentMu.nextSibling = nil
nextChild = next
}
}

// Disable the pool for further allocations, so that further
// uses outside of monitor control get errors.
Expand All @@ -631,6 +643,9 @@ func (mm *BytesMonitor) doStop(ctx context.Context, check bool) {
// Release the reserved budget to its original pool, if any.
if mm.reserved != &noReserved {
mm.reserved.Clear(ctx)
// Make sure to lose reference to the reserved account because it has a
// pointer to the parent monitor.
mm.reserved = &noReserved
}
}

Expand Down

0 comments on commit d50ccb5

Please sign in to comment.