From ba9cd5af37d334537b29db8e15b903e7201b5123 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Thu, 1 Oct 2020 12:16:52 -0700 Subject: [PATCH 1/4] fix(core): Shutdown process if headroom task is failing. (#912) --- .../filodb.core/memstore/TimeSeriesShard.scala | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala index 5e9c194bb4..4968da4105 100644 --- a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala @@ -30,6 +30,7 @@ import filodb.core.metadata.{Schema, Schemas} import filodb.core.query.{ColumnFilter, QuerySession} import filodb.core.store._ import filodb.memory._ +import filodb.memory.data.ChunkMap import filodb.memory.format.{UnsafeUtils, ZeroCopyUTF8String} import filodb.memory.format.BinaryVector.BinaryVectorPtr import filodb.memory.format.ZeroCopyUTF8String._ @@ -1500,7 +1501,21 @@ class TimeSeriesShard(val ref: DatasetRef, private def startHeadroomTask(sched: Scheduler): Unit = { sched.scheduleWithFixedDelay(1, 1, TimeUnit.MINUTES, new Runnable { - def run() = blockStore.ensureHeadroom(storeConfig.ensureHeadroomPercent) + var numFailures = 0 + + def run() = { + val numFree = blockStore.ensureHeadroom(storeConfig.ensureHeadroomPercent) + if (numFree > 0) { + numFailures = 0 + } else { + numFailures += 1 + if (numFailures >= 5) { + logger.error(s"Headroom task was unable to free memory for $numFailures consecutive attempts. " + + s"Shutting down process. shard=$shardNum") + ChunkMap.haltAndCatchFire() + } + } + } }) } From 8a6a68c834d96d6f1de8ca931bb81743b139e1ed Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Thu, 1 Oct 2020 09:25:25 -0700 Subject: [PATCH 2/4] fix(core,memory): Allow blocks to be reclaimed when ODP fails. 
(#911) --- .../memstore/DemandPagedChunkStore.scala | 29 ++++++++++++++----- .../memstore/TimeSeriesShard.scala | 2 +- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/filodb.core/memstore/DemandPagedChunkStore.scala b/core/src/main/scala/filodb.core/memstore/DemandPagedChunkStore.scala index e6945bb55f..14048ba2ba 100644 --- a/core/src/main/scala/filodb.core/memstore/DemandPagedChunkStore.scala +++ b/core/src/main/scala/filodb.core/memstore/DemandPagedChunkStore.scala @@ -2,11 +2,13 @@ package filodb.core.memstore import java.nio.ByteBuffer +import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import com.typesafe.scalalogging.StrictLogging import monix.eval.Task import org.jctools.maps.NonBlockingHashMapLong +import spire.syntax.cfor._ import filodb.core.store._ import filodb.memory.{BlockManager, BlockMemFactory} @@ -65,6 +67,7 @@ extends RawToPartitionMaker with StrictLogging { /** * Stores raw chunks into offheap memory and populates chunks into partition */ + //scalastyle:off def populateRawChunks(rawPartition: RawPartData): Task[ReadablePartition] = Task { FiloSchedulers.assertThreadName(FiloSchedulers.PopulateChunksSched) // Find the right partition given the partition key @@ -85,10 +88,15 @@ extends RawToPartitionMaker with StrictLogging { val chunkID = ChunkSetInfo.getChunkID(infoBytes) if (!tsPart.chunkmapContains(chunkID)) { + val chunkPtrs = new ArrayBuffer[BinaryVectorPtr](rawVectors.length) memFactory.startMetaSpan() - val chunkPtrs = copyToOffHeap(rawVectors, memFactory) - val metaAddr = memFactory.endMetaSpan(writeMeta(_, tsPart.partID, infoBytes, chunkPtrs), - tsPart.schema.data.blockMetaSize.toShort) + var metaAddr: Long = 0 + try { + copyToOffHeap(rawVectors, memFactory, chunkPtrs) + } finally { + metaAddr = memFactory.endMetaSpan(writeMeta(_, tsPart.partID, infoBytes, chunkPtrs), + tsPart.schema.data.blockMetaSize.toShort) + } require(metaAddr != 0) val infoAddr = metaAddr + 4 // Important: don't point at partID val inserted = tsPart.addChunkInfoIfAbsent(chunkID, infoAddr) @@ -109,6 +117,7 @@ extends RawToPartitionMaker with StrictLogging { throw new RuntimeException(s"Partition [${new String(rawPartition.partitionKey)}] not found, this is bad") } } + //scalastyle:on /** * For a given chunkset, this method calculates the time bucket the chunks fall in. @@ -118,16 +127,20 @@ extends RawToPartitionMaker with StrictLogging { (ChunkSetInfo.getEndTime(infoBytes) / flushIntervalMillis) * flushIntervalMillis /** - * Copies the onHeap contents read from ColStore into off-heap using the given memFactory + * Copies the onHeap contents read from ColStore into off-heap using the given memFactory. + * If an exception is thrown by this method, the tail of chunkPtrs sequence isn't filled in. + * + * @param chunkPtrs filled in by this method */ private def copyToOffHeap(buffers: Array[ByteBuffer], - memFactory: BlockMemFactory): Array[BinaryVectorPtr] = { - buffers.map { buf => - // TODO: if in case the buffer is offheap/direct buffer already, maybe we don't need to copy it? 
+ memFactory: BlockMemFactory, + chunkPtrs: ArrayBuffer[BinaryVectorPtr]): Unit = { + cforRange { 0 until buffers.length } { i => + val buf = buffers(i) val (bufBase, bufOffset, bufLen) = UnsafeUtils.BOLfromBuffer(buf) val vectorAddr = memFactory.allocateOffheap(bufLen) UnsafeUtils.unsafe.copyMemory(bufBase, bufOffset, UnsafeUtils.ZeroPointer, vectorAddr, bufLen) - vectorAddr + chunkPtrs += vectorAddr } } diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala index 4968da4105..efd3a35a3a 100644 --- a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala @@ -123,7 +123,7 @@ object TimeSeriesShard { /** * Copies serialized ChunkSetInfo bytes from persistent storage / on-demand paging. */ - def writeMeta(addr: Long, partitionID: Int, bytes: Array[Byte], vectors: Array[BinaryVectorPtr]): Unit = { + def writeMeta(addr: Long, partitionID: Int, bytes: Array[Byte], vectors: ArrayBuffer[BinaryVectorPtr]): Unit = { UnsafeUtils.setInt(UnsafeUtils.ZeroPointer, addr, partitionID) ChunkSetInfo.copy(bytes, addr + 4) for { i <- 0 until vectors.size optimized } { From fd05e6302eab345c588b9dc8c3241acf9ee173d5 Mon Sep 17 00:00:00 2001 From: Vish Ramachandran Date: Tue, 6 Oct 2020 15:16:20 -0700 Subject: [PATCH 3/4] bug(core): Shutdown when out of ingestion blocks; Mark ingestion blocks reclaimable on cassandra failure (#915) --- .../memstore/TimeSeriesPartition.scala | 33 +++++++++++-------- .../memstore/TimeSeriesShard.scala | 22 ++++++++----- .../scala/filodb.memory/data/ChunkMap.scala | 17 +++------- .../scala/filodb.memory/data/Shutdown.scala | 16 +++++++++ 4 files changed, 54 insertions(+), 34 deletions(-) create mode 100644 memory/src/main/scala/filodb.memory/data/Shutdown.scala diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala index 871ae227c5..3ba56f451e 100644 --- a/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala +++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala @@ -8,7 +8,7 @@ import filodb.core.Types._ import filodb.core.metadata.{Column, PartitionSchema, Schema} import filodb.core.store._ import filodb.memory.{BinaryRegion, BinaryRegionLarge, BlockMemFactory, MemFactory} -import filodb.memory.data.ChunkMap +import filodb.memory.data.{ChunkMap, Shutdown} import filodb.memory.format._ import filodb.memory.format.MemoryReader._ @@ -221,20 +221,27 @@ extends ChunkMap(memFactory, initMapSize) with ReadablePartition { */ private def encodeOneChunkset(info: ChunkSetInfo, appenders: AppenderArray, blockHolder: BlockMemFactory) = { blockHolder.startMetaSpan() - // optimize and compact chunks - val frozenVectors = appenders.zipWithIndex.map { case (appender, i) => - // This assumption cannot break. We should ensure one vector can be written - // to one block always atleast as per the current design. 
- // If this gets triggered, decrease the max writebuffer size so smaller chunks are encoded - require(blockHolder.blockAllocationSize() > appender.frozenSize) - val optimized = appender.optimize(blockHolder) - shardStats.encodedBytes.increment(BinaryVector.totalBytes(nativePtrReader, optimized)) - if (schema.data.columns(i).columnType == Column.ColumnType.HistogramColumn) - shardStats.encodedHistBytes.increment(BinaryVector.totalBytes(nativePtrReader, optimized)) - optimized + val frozenVectors = try { + // optimize and compact chunks + appenders.zipWithIndex.map { case (appender, i) => + // This assumption cannot break. We should ensure one vector can be written + // to one block always atleast as per the current design. + // If this gets triggered, decrease the max writebuffer size so smaller chunks are encoded + require(blockHolder.blockAllocationSize() > appender.frozenSize) + val optimized = appender.optimize(blockHolder) + shardStats.encodedBytes.increment(BinaryVector.totalBytes(nativePtrReader, optimized)) + if (schema.data.columns(i).columnType == Column.ColumnType.HistogramColumn) + shardStats.encodedHistBytes.increment(BinaryVector.totalBytes(nativePtrReader, optimized)) + optimized + } + } catch { case e: Exception => + // Shutdown process right away! Reaching this state means that we could not reclaim + // a whole bunch of blocks possibly because they were not marked as reclaimable, + // because of some bug. Cleanup or rollback at this point is not viable. + Shutdown.haltAndCatchFire(new RuntimeException("Error occurred when encoding vectors", e)) + throw e } shardStats.numSamplesEncoded.increment(info.numRows) - // Now, write metadata into offheap block metadata space and update infosChunks val metaAddr = blockHolder.endMetaSpan(TimeSeriesShard.writeMeta(_, partID, info, frozenVectors), schema.data.blockMetaSize.toShort) diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala index efd3a35a3a..45fe29deeb 100644 --- a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala @@ -30,7 +30,7 @@ import filodb.core.metadata.{Schema, Schemas} import filodb.core.query.{ColumnFilter, QuerySession} import filodb.core.store._ import filodb.memory._ -import filodb.memory.data.ChunkMap +import filodb.memory.data.Shutdown import filodb.memory.format.{UnsafeUtils, ZeroCopyUTF8String} import filodb.memory.format.BinaryVector.BinaryVectorPtr import filodb.memory.format.ZeroCopyUTF8String._ @@ -332,7 +332,7 @@ class TimeSeriesShard(val ref: DatasetRef, private[memstore] final val reclaimLock = blockStore.reclaimLock // Requires blockStore. - startHeadroomTask(ingestSched) + private val headroomTask = startHeadroomTask(ingestSched) // Each shard has a single ingestion stream at a time. This BlockMemFactory is used for buffer overflow encoding // strictly during ingest() and switchBuffers(). 
@@ -951,8 +951,7 @@ class TimeSeriesShard(val ref: DatasetRef, val result = Future.sequence(Seq(writeChunksFuture, writeDirtyPartKeysFuture, pubDownsampleFuture)).map { _.find(_.isInstanceOf[ErrorResponse]).getOrElse(Success) }.flatMap { - case Success => blockHolder.markUsedBlocksReclaimable() - commitCheckpoint(ref, shardNum, flushGroup) + case Success => commitCheckpoint(ref, shardNum, flushGroup) case er: ErrorResponse => Future.successful(er) }.recover { case e => logger.error(s"Internal Error when persisting chunks in dataset=$ref shard=$shardNum - should " + @@ -962,6 +961,13 @@ class TimeSeriesShard(val ref: DatasetRef, result.onComplete { resp => assertThreadName(IngestSchedName) try { + // COMMENTARY ON BUG FIX DONE: Mark used blocks as reclaimable even on failure. Even if cassandra write fails + // or other errors occur, we cannot leave blocks as not reclaimable and also release the factory back into pool. + // Earlier, we were not calling this with the hope that next use of the blockMemFactory will mark them + // as reclaimable. But the factory could be used for a different flush group. Not the same one. It can + // succeed, and the wrong blocks can be marked as reclaimable. + // Can try out tracking unreclaimed blockMemFactories without releasing, but it needs to be separate PR. + blockHolder.markUsedBlocksReclaimable() blockFactoryPool.release(blockHolder) flushDoneTasks(flushGroup, resp) tracer.finish() @@ -1499,7 +1505,7 @@ class TimeSeriesShard(val ref: DatasetRef, }) } - private def startHeadroomTask(sched: Scheduler): Unit = { + private def startHeadroomTask(sched: Scheduler) = { sched.scheduleWithFixedDelay(1, 1, TimeUnit.MINUTES, new Runnable { var numFailures = 0 @@ -1510,9 +1516,8 @@ class TimeSeriesShard(val ref: DatasetRef, } else { numFailures += 1 if (numFailures >= 5) { - logger.error(s"Headroom task was unable to free memory for $numFailures consecutive attempts. " + - s"Shutting down process. shard=$shardNum") - ChunkMap.haltAndCatchFire() + Shutdown.haltAndCatchFire(new RuntimeException(s"Headroom task was unable to free memory " + + s"for $numFailures consecutive attempts. Shutting down process. shard=$shardNum")) } } } @@ -1556,6 +1561,7 @@ class TimeSeriesShard(val ref: DatasetRef, method to ensure that no threads are accessing the memory before it's freed. blockStore.releaseBlocks() */ + headroomTask.cancel() ingestSched.shutdown() } } diff --git a/memory/src/main/scala/filodb.memory/data/ChunkMap.scala b/memory/src/main/scala/filodb.memory/data/ChunkMap.scala index 86d9a5f1f6..1b036b2e27 100644 --- a/memory/src/main/scala/filodb.memory/data/ChunkMap.scala +++ b/memory/src/main/scala/filodb.memory/data/ChunkMap.scala @@ -113,20 +113,12 @@ object ChunkMap extends StrictLogging { def validateNoSharedLocks(unitTest: Boolean = false): Unit = { val numLocksReleased = ChunkMap.releaseAllSharedLocks() if (numLocksReleased > 0) { - val msg = s"Number of locks was non-zero: $numLocksReleased. " + - s"This is indicative of a possible lock acquisition/release bug." - if (unitTest) { - throw new Error(msg) - } - logger.error(msg) - haltAndCatchFire() + val ex = new RuntimeException(s"Number of locks was non-zero: $numLocksReleased. 
" + + s"This is indicative of a possible lock acquisition/release bug.") + Shutdown.haltAndCatchFire(ex) } } - def haltAndCatchFire(): Unit = { - logger.error(s"Shutting down process since it may be in an unstable/corrupt state.") - Runtime.getRuntime.halt(1) - } } /** @@ -273,8 +265,7 @@ class ChunkMap(val memFactory: MemFactory, var capacity: Int) { warned = true } else if (warned && timeoutNanos >= MaxExclusiveRetryTimeoutNanos) { val lockState = UnsafeUtils.getIntVolatile(this, lockStateOffset) - _logger.error(s"Unable to acquire exclusive lock: $lockState") - haltAndCatchFire() + Shutdown.haltAndCatchFire(new RuntimeException(s"Unable to acquire exclusive lock: $lockState")) } } } diff --git a/memory/src/main/scala/filodb.memory/data/Shutdown.scala b/memory/src/main/scala/filodb.memory/data/Shutdown.scala new file mode 100644 index 0000000000..3fdf2d6aff --- /dev/null +++ b/memory/src/main/scala/filodb.memory/data/Shutdown.scala @@ -0,0 +1,16 @@ +package filodb.memory.data + +import com.typesafe.scalalogging.StrictLogging +import kamon.Kamon + +object Shutdown extends StrictLogging { + + val forcedShutdowns = Kamon.counter("forced-shutdowns").withoutTags() + def haltAndCatchFire(e: Exception, unitTest: Boolean = false): Unit = { + forcedShutdowns.increment() + if (unitTest) throw e + logger.error(s"Shutting down process since it may be in an unstable/corrupt state", e) + Runtime.getRuntime.halt(189) + } + +} From 0b6ac2002d5ee22856513fa0a82d469e106343c6 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Tue, 6 Oct 2020 14:58:58 -0700 Subject: [PATCH 4/4] fix(memory): Forcibly reclaim time-ordered-blocks when running out of memory. (#914) --- .../scala/filodb.memory/BlockManager.scala | 59 ++++++++++++------- 1 file changed, 38 insertions(+), 21 deletions(-) diff --git a/memory/src/main/scala/filodb.memory/BlockManager.scala b/memory/src/main/scala/filodb.memory/BlockManager.scala index 2f8edbb5aa..de2ef21c63 100644 --- a/memory/src/main/scala/filodb.memory/BlockManager.scala +++ b/memory/src/main/scala/filodb.memory/BlockManager.scala @@ -397,40 +397,56 @@ class PageAlignedBlockManager(val totalMemorySizeInBytes: Long, reclaimLog += event } + //scalastyle:off protected def tryReclaim(num: Int): Unit = { var reclaimed = 0 - var currList = 0 - val timeOrderedListIt = usedBlocksTimeOrdered.entrySet.iterator - while ( reclaimed < num && - timeOrderedListIt.hasNext ) { - val entry = timeOrderedListIt.next - val prevReclaimed = reclaimed - val removed = reclaimFrom(entry.getValue, stats.timeOrderedBlocksReclaimedMetric) - if (removed.nonEmpty) { - logger.info(s"timeBlockReclaim: Reclaimed ${removed.length} time ordered blocks " + - s"from list at t=${entry.getKey} (${(System.currentTimeMillis - entry.getKey)/3600000} hrs ago) " + - s"\nReclaimed blocks: ${removed.map(b => jLong.toHexString(b.address)).mkString(" ")}") + + // First reclaim time-ordered blocks which are marked as reclaimable. + reclaimTimeOrdered(false); + + if (reclaimed < num) { + // Not enough reclaimed, so try reclaiming non-time-ordered blocks which are marked as reclaimable. + reclaimFrom(usedBlocks, stats.blocksReclaimedMetric, false) + + if (reclaimed < num) { + // Still not enough? Forcibly reclaim time-ordered blocks. + reclaimTimeOrdered(true); + + if (reclaimed < num) { + // Still not enough, but forcibly reclaiming non-time-ordered blocks is dangerous. + logger.warn(s"$num blocks to reclaim but only reclaimed $reclaimed. 
usedblocks=${usedBlocks.size} " + + s"usedBlocksTimeOrdered=${usedBlocksTimeOrdered.asScala.toList.map{case(n, l) => (n, l.size)}}") + } } - // If the block list is now empty, remove it from tree map - if (entry.getValue.isEmpty) timeOrderedListIt.remove() } - if (reclaimed < num) reclaimFrom(usedBlocks, stats.blocksReclaimedMetric) - // if we do not get required blocks even after reclaim call - if (reclaimed < num) { - logger.warn(s"$num blocks to reclaim but only reclaimed $reclaimed. usedblocks=${usedBlocks.size} " + - s"usedBlocksTimeOrdered=${usedBlocksTimeOrdered.asScala.toList.map{case(n, l) => (n, l.size)}}") + + def reclaimTimeOrdered(forced: Boolean): Unit = { + val timeOrderedListIt = usedBlocksTimeOrdered.entrySet.iterator + while ( reclaimed < num && + timeOrderedListIt.hasNext ) { + val entry = timeOrderedListIt.next + val prevReclaimed = reclaimed + val removed = reclaimFrom(entry.getValue, stats.timeOrderedBlocksReclaimedMetric, forced) + if (removed.nonEmpty) { + logger.info(s"timeBlockReclaim: Reclaimed ${removed.length} time ordered blocks " + + s"from list at t=${entry.getKey} (${(System.currentTimeMillis - entry.getKey)/3600000} hrs ago) " + + s"\nReclaimed blocks: ${removed.map(b => jLong.toHexString(b.address)).mkString(" ")}") + } + // If the block list is now empty, remove it from tree map + if (entry.getValue.isEmpty) timeOrderedListIt.remove() + } } - def reclaimFrom(list: util.ArrayDeque[Block], reclaimedCounter: Counter): Seq[Block] = { + def reclaimFrom(list: util.ArrayDeque[Block], reclaimedCounter: Counter, forced: Boolean): Seq[Block] = { val entries = list.iterator val removed = new collection.mutable.ArrayBuffer[Block] while (entries.hasNext && reclaimed < num) { val block = entries.next - if (block.canReclaim) { + if (forced || block.canReclaim) { entries.remove() removed += block addToReclaimLog(block) - block.reclaim() + block.reclaim(forced) block.clearOwner() freeBlocks.add(block) stats.freeBlocksMetric.update(freeBlocks.size()) @@ -441,6 +457,7 @@ class PageAlignedBlockManager(val totalMemorySizeInBytes: Long, removed } } + //scalastyle:on def numTimeOrderedBlocks: Int = usedBlocksTimeOrdered.values.asScala.map(_.size).sum
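
The pattern common to patches 1 and 3 above (count consecutive runs in which the headroom task frees nothing, then halt the process through Shutdown.haltAndCatchFire rather than attempt cleanup) can be exercised in isolation. The sketch below is a minimal model, not FiloDB code: FreeMemoryFn, maxFailures, and the injected halt function are hypothetical stand-ins for blockStore.ensureHeadroom(storeConfig.ensureHeadroomPercent) and the real Shutdown object, which increments a Kamon counter and halts the JVM with exit code 189.

    import java.util.concurrent.{Executors, TimeUnit}

    object HeadroomSketch {
      // Stand-in for blockStore.ensureHeadroom(...); returns how much memory was freed.
      type FreeMemoryFn = () => Long

      def headroomRunnable(freeMemory: FreeMemoryFn,
                           maxFailures: Int,
                           halt: RuntimeException => Unit): Runnable = new Runnable {
        private var numFailures = 0
        def run(): Unit = {
          if (freeMemory() > 0) {
            numFailures = 0                   // any successful reclaim resets the counter
          } else {
            numFailures += 1
            // Patch 1 uses 5 attempts and then calls Shutdown.haltAndCatchFire.
            if (numFailures >= maxFailures) {
              halt(new RuntimeException(s"Headroom task failed $numFailures consecutive attempts"))
            }
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val sched = Executors.newSingleThreadScheduledExecutor()
        // Simulate a block store that can never free memory; print instead of halting the JVM.
        val task = headroomRunnable(() => 0L, maxFailures = 3,
          e => { println(s"would halt: ${e.getMessage}"); sched.shutdown() })
        sched.scheduleWithFixedDelay(task, 1, 1, TimeUnit.SECONDS)
      }
    }

Patch 4 applies the same escalation idea to block reclamation: exhaust blocks already marked reclaimable before forcing anything, and force only time-ordered blocks, never the regular ingestion blocks (the code only logs a warning if it still falls short). The following simplified model uses hypothetical types (SimpleBlock, reclaimUpTo) in place of the real off-heap Block and ArrayDeque bookkeeping in PageAlignedBlockManager:

    import scala.collection.mutable

    final case class SimpleBlock(id: Int, canReclaim: Boolean)

    object ReclaimSketch {
      // Removes up to `need` blocks; when `forced` is true the canReclaim flag is
      // ignored, mirroring block.reclaim(forced) in patch 4.
      def reclaimUpTo(blocks: mutable.Buffer[SimpleBlock], need: Int, forced: Boolean): Int = {
        var freed = 0
        var i = 0
        while (i < blocks.length && freed < need) {
          if (forced || blocks(i).canReclaim) { blocks.remove(i); freed += 1 }
          else i += 1
        }
        freed
      }

      // Escalation order: reclaimable time-ordered blocks, then reclaimable regular
      // blocks, then forced reclaim of time-ordered blocks only.
      def tryReclaim(timeOrdered: mutable.Buffer[SimpleBlock],
                     regular: mutable.Buffer[SimpleBlock],
                     num: Int): Int = {
        var reclaimed = reclaimUpTo(timeOrdered, num, forced = false)
        if (reclaimed < num) reclaimed += reclaimUpTo(regular, num - reclaimed, forced = false)
        if (reclaimed < num) reclaimed += reclaimUpTo(timeOrdered, num - reclaimed, forced = true)
        reclaimed
      }
    }

In both sketches the design choice matches the patches' stated rationale: once normal reclamation fails, the off-heap state cannot safely be repaired in place, so the code prefers a forced reclaim of time-ordered blocks or an outright process halt over partial cleanup or rollback.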