Skip to content

Commit

Permalink
Merge pull request #917 from broneill/integration
Browse files Browse the repository at this point in the history
fix(memory): Forcibly reclaim time-ordered-blocks when running out of…
  • Loading branch information
broneill committed Oct 6, 2020
2 parents fd05e63 + 0b6ac20 commit 7fc6222
Showing 1 changed file with 38 additions and 21 deletions.
59 changes: 38 additions & 21 deletions memory/src/main/scala/filodb.memory/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -397,40 +397,56 @@ class PageAlignedBlockManager(val totalMemorySizeInBytes: Long,
reclaimLog += event
}

//scalastyle:off
protected def tryReclaim(num: Int): Unit = {
var reclaimed = 0
var currList = 0
val timeOrderedListIt = usedBlocksTimeOrdered.entrySet.iterator
while ( reclaimed < num &&
timeOrderedListIt.hasNext ) {
val entry = timeOrderedListIt.next
val prevReclaimed = reclaimed
val removed = reclaimFrom(entry.getValue, stats.timeOrderedBlocksReclaimedMetric)
if (removed.nonEmpty) {
logger.info(s"timeBlockReclaim: Reclaimed ${removed.length} time ordered blocks " +
s"from list at t=${entry.getKey} (${(System.currentTimeMillis - entry.getKey)/3600000} hrs ago) " +
s"\nReclaimed blocks: ${removed.map(b => jLong.toHexString(b.address)).mkString(" ")}")

// First reclaim time-ordered blocks which are marked as reclaimable.
reclaimTimeOrdered(false);

if (reclaimed < num) {
// Not enough reclaimed, so try reclaiming non-time-ordered blocks which are marked as reclaimable.
reclaimFrom(usedBlocks, stats.blocksReclaimedMetric, false)

if (reclaimed < num) {
// Still not enough? Forcibly reclaim time-ordered blocks.
reclaimTimeOrdered(true);

if (reclaimed < num) {
// Still not enough, but forcibly reclaiming non-time-ordered blocks is dangerous.
logger.warn(s"$num blocks to reclaim but only reclaimed $reclaimed. usedblocks=${usedBlocks.size} " +
s"usedBlocksTimeOrdered=${usedBlocksTimeOrdered.asScala.toList.map{case(n, l) => (n, l.size)}}")
}
}
// If the block list is now empty, remove it from tree map
if (entry.getValue.isEmpty) timeOrderedListIt.remove()
}
if (reclaimed < num) reclaimFrom(usedBlocks, stats.blocksReclaimedMetric)
// if we do not get required blocks even after reclaim call
if (reclaimed < num) {
logger.warn(s"$num blocks to reclaim but only reclaimed $reclaimed. usedblocks=${usedBlocks.size} " +
s"usedBlocksTimeOrdered=${usedBlocksTimeOrdered.asScala.toList.map{case(n, l) => (n, l.size)}}")

def reclaimTimeOrdered(forced: Boolean): Unit = {
val timeOrderedListIt = usedBlocksTimeOrdered.entrySet.iterator
while ( reclaimed < num &&
timeOrderedListIt.hasNext ) {
val entry = timeOrderedListIt.next
val prevReclaimed = reclaimed
val removed = reclaimFrom(entry.getValue, stats.timeOrderedBlocksReclaimedMetric, forced)
if (removed.nonEmpty) {
logger.info(s"timeBlockReclaim: Reclaimed ${removed.length} time ordered blocks " +
s"from list at t=${entry.getKey} (${(System.currentTimeMillis - entry.getKey)/3600000} hrs ago) " +
s"\nReclaimed blocks: ${removed.map(b => jLong.toHexString(b.address)).mkString(" ")}")
}
// If the block list is now empty, remove it from tree map
if (entry.getValue.isEmpty) timeOrderedListIt.remove()
}
}

def reclaimFrom(list: util.ArrayDeque[Block], reclaimedCounter: Counter): Seq[Block] = {
def reclaimFrom(list: util.ArrayDeque[Block], reclaimedCounter: Counter, forced: Boolean): Seq[Block] = {
val entries = list.iterator
val removed = new collection.mutable.ArrayBuffer[Block]
while (entries.hasNext && reclaimed < num) {
val block = entries.next
if (block.canReclaim) {
if (forced || block.canReclaim) {
entries.remove()
removed += block
addToReclaimLog(block)
block.reclaim()
block.reclaim(forced)
block.clearOwner()
freeBlocks.add(block)
stats.freeBlocksMetric.update(freeBlocks.size())
Expand All @@ -441,6 +457,7 @@ class PageAlignedBlockManager(val totalMemorySizeInBytes: Long,
removed
}
}
//scalastyle:on

def numTimeOrderedBlocks: Int = usedBlocksTimeOrdered.values.asScala.map(_.size).sum

Expand Down

0 comments on commit 7fc6222

Please sign in to comment.