Merge #123257
123257: colexecdisk: speed up external sort with low distsql_workmem r=yuzefovich a=yuzefovich

This commit adjusts the setup of the external sorter to have a minimum amount of memory to work with. We already had a special case for `distsql_workmem = '1B'` (which usually means "force disk spill" testing scenario), but if the workmem was set to something larger than 1B but relatively small (e.g. 20KiB), we could still get into a pathological behavior where the external sort would process the data one tuple at a time, with each tuple comprising the full "partition". This is because we unconditionally consume "disk queue buffers usage" from the available memory, and this consumption is on the order of 1MiB by default. To prevent the pathological behavior we now always bump the memory limit to be at least "disk queue buffers usage" plus 100KiB.

Fixes: #123185.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
craig[bot] and yuzefovich committed Apr 29, 2024
2 parents 4e178de + fc0c4a0 commit 5a4bae3
Showing 1 changed file with 11 additions and 11 deletions.
pkg/sql/colexec/colexecdisk/external_sort.go
@@ -259,15 +259,22 @@ func NewExternalSorter(
 	if testingVecFDsToAcquire > 0 {
 		maxNumberPartitions = testingVecFDsToAcquire
 	}
+	// Each disk queue will use up to BufferSizeBytes of RAM, so we'll reduce
+	// the memoryLimit of the partitions to sort in memory by those cache sizes.
+	diskQueueBuffersUsage := int64(maxNumberPartitions * diskQueueCfg.BufferSizeBytes)
 	if memoryLimit == 1 {
 		// If memory limit is 1, we're likely in a "force disk spill"
 		// scenario, but we don't want to artificially limit batches when we
 		// have already spilled, so we'll use a larger limit.
 		memoryLimit = execinfra.DefaultMemoryLimit
-	}
-	// Each disk queue will use up to BufferSizeBytes of RAM, so we reduce the
-	// memoryLimit of the partitions to sort in memory by those cache sizes.
-	memoryLimit -= int64(maxNumberPartitions * diskQueueCfg.BufferSizeBytes)
+	} else if budget := int64(100 << 10); memoryLimit < diskQueueBuffersUsage+budget {
+		// If the limit is not 1 but relatively small, we'll ensure that we have
+		// at least 100KiB of memory to work with. This memory is only utilized
+		// after we have spilled to disk, so we want to avoid sorting the data
+		// one tuple at a time (with each tuple forcing us to repartition).
+		memoryLimit = diskQueueBuffersUsage + budget
+	}
+	memoryLimit -= diskQueueBuffersUsage
 	// We give half of the available RAM to the in-memory sorter. Note that we
 	// will reuse that memory for each partition and will be holding onto it all
 	// the time, so we cannot "return" this usage after spilling each partition.
@@ -276,13 +283,6 @@ func NewExternalSorter(
 	inMemSortOutputLimit := inMemSortTotalMemoryLimit / 5
 	// We give another half of the available RAM to the merge operation.
 	mergeMemoryLimit := memoryLimit / 2
-	if inMemSortPartitionLimit < 1 {
-		// If the memory limit is 0, the input partitioning operator will return
-		// a zero-length batch, so make it at least 1.
-		inMemSortPartitionLimit = 1
-		inMemSortOutputLimit = 1
-		mergeMemoryLimit = 1
-	}
 	inputPartitioner := newInputPartitioningOperator(sortUnlimitedAllocator, input, inputTypes, inMemSortPartitionLimit)
 	var inMemSorter colexecop.ResettableOperator
 	if topK > 0 {
