Skip to content

Commit

Permalink
Merge branch 'integration' into main-0.9.9
Browse files Browse the repository at this point in the history
  • Loading branch information
tjackpaul committed Oct 9, 2020
2 parents efbf985 + 7fc6222 commit ae0b356
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 61 deletions.
Expand Up @@ -2,11 +2,13 @@ package filodb.core.memstore

import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._

import com.typesafe.scalalogging.StrictLogging
import monix.eval.Task
import org.jctools.maps.NonBlockingHashMapLong
import spire.syntax.cfor._

import filodb.core.store._
import filodb.memory.{BlockManager, BlockMemFactory}
Expand Down Expand Up @@ -65,6 +67,7 @@ extends RawToPartitionMaker with StrictLogging {
/**
* Stores raw chunks into offheap memory and populates chunks into partition
*/
//scalastyle:off
def populateRawChunks(rawPartition: RawPartData): Task[ReadablePartition] = Task {
FiloSchedulers.assertThreadName(FiloSchedulers.PopulateChunksSched)
// Find the right partition given the partition key
Expand All @@ -85,10 +88,15 @@ extends RawToPartitionMaker with StrictLogging {
val chunkID = ChunkSetInfo.getChunkID(infoBytes)

if (!tsPart.chunkmapContains(chunkID)) {
val chunkPtrs = new ArrayBuffer[BinaryVectorPtr](rawVectors.length)
memFactory.startMetaSpan()
val chunkPtrs = copyToOffHeap(rawVectors, memFactory)
val metaAddr = memFactory.endMetaSpan(writeMeta(_, tsPart.partID, infoBytes, chunkPtrs),
tsPart.schema.data.blockMetaSize.toShort)
var metaAddr: Long = 0
try {
copyToOffHeap(rawVectors, memFactory, chunkPtrs)
} finally {
metaAddr = memFactory.endMetaSpan(writeMeta(_, tsPart.partID, infoBytes, chunkPtrs),
tsPart.schema.data.blockMetaSize.toShort)
}
require(metaAddr != 0)
val infoAddr = metaAddr + 4 // Important: don't point at partID
val inserted = tsPart.addChunkInfoIfAbsent(chunkID, infoAddr)
Expand All @@ -109,6 +117,7 @@ extends RawToPartitionMaker with StrictLogging {
throw new RuntimeException(s"Partition [${new String(rawPartition.partitionKey)}] not found, this is bad")
}
}
//scalastyle:on

/**
* For a given chunkset, this method calculates the time bucket the chunks fall in.
Expand All @@ -118,16 +127,20 @@ extends RawToPartitionMaker with StrictLogging {
(ChunkSetInfo.getEndTime(infoBytes) / flushIntervalMillis) * flushIntervalMillis

/**
* Copies the onHeap contents read from ColStore into off-heap using the given memFactory
* Copies the onHeap contents read from ColStore into off-heap using the given memFactory.
* If an exception is thrown by this method, the tail of chunkPtrs sequence isn't filled in.
*
* @param chunkPtrs filled in by this method
*/
private def copyToOffHeap(buffers: Array[ByteBuffer],
memFactory: BlockMemFactory): Array[BinaryVectorPtr] = {
buffers.map { buf =>
// TODO: if in case the buffer is offheap/direct buffer already, maybe we don't need to copy it?
memFactory: BlockMemFactory,
chunkPtrs: ArrayBuffer[BinaryVectorPtr]): Unit = {
cforRange { 0 until buffers.length } { i =>
val buf = buffers(i)
val (bufBase, bufOffset, bufLen) = UnsafeUtils.BOLfromBuffer(buf)
val vectorAddr = memFactory.allocateOffheap(bufLen)
UnsafeUtils.unsafe.copyMemory(bufBase, bufOffset, UnsafeUtils.ZeroPointer, vectorAddr, bufLen)
vectorAddr
chunkPtrs += vectorAddr
}
}

Expand Down
33 changes: 20 additions & 13 deletions core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala
Expand Up @@ -8,7 +8,7 @@ import filodb.core.Types._
import filodb.core.metadata.{Column, PartitionSchema, Schema}
import filodb.core.store._
import filodb.memory.{BinaryRegion, BinaryRegionLarge, BlockMemFactory, MemFactory}
import filodb.memory.data.ChunkMap
import filodb.memory.data.{ChunkMap, Shutdown}
import filodb.memory.format._
import filodb.memory.format.MemoryReader._

Expand Down Expand Up @@ -221,20 +221,27 @@ extends ChunkMap(memFactory, initMapSize) with ReadablePartition {
*/
private def encodeOneChunkset(info: ChunkSetInfo, appenders: AppenderArray, blockHolder: BlockMemFactory) = {
blockHolder.startMetaSpan()
// optimize and compact chunks
val frozenVectors = appenders.zipWithIndex.map { case (appender, i) =>
// This assumption cannot break. We should ensure one vector can be written
// to one block always atleast as per the current design.
// If this gets triggered, decrease the max writebuffer size so smaller chunks are encoded
require(blockHolder.blockAllocationSize() > appender.frozenSize)
val optimized = appender.optimize(blockHolder)
shardStats.encodedBytes.increment(BinaryVector.totalBytes(nativePtrReader, optimized))
if (schema.data.columns(i).columnType == Column.ColumnType.HistogramColumn)
shardStats.encodedHistBytes.increment(BinaryVector.totalBytes(nativePtrReader, optimized))
optimized
val frozenVectors = try {
// optimize and compact chunks
appenders.zipWithIndex.map { case (appender, i) =>
// This assumption cannot break. We should ensure one vector can be written
// to one block always atleast as per the current design.
// If this gets triggered, decrease the max writebuffer size so smaller chunks are encoded
require(blockHolder.blockAllocationSize() > appender.frozenSize)
val optimized = appender.optimize(blockHolder)
shardStats.encodedBytes.increment(BinaryVector.totalBytes(nativePtrReader, optimized))
if (schema.data.columns(i).columnType == Column.ColumnType.HistogramColumn)
shardStats.encodedHistBytes.increment(BinaryVector.totalBytes(nativePtrReader, optimized))
optimized
}
} catch { case e: Exception =>
// Shutdown process right away! Reaching this state means that we could not reclaim
// a whole bunch of blocks possibly because they were not marked as reclaimable,
// because of some bug. Cleanup or rollback at this point is not viable.
Shutdown.haltAndCatchFire(new RuntimeException("Error occurred when encoding vectors", e))
throw e
}
shardStats.numSamplesEncoded.increment(info.numRows)

// Now, write metadata into offheap block metadata space and update infosChunks
val metaAddr = blockHolder.endMetaSpan(TimeSeriesShard.writeMeta(_, partID, info, frozenVectors),
schema.data.blockMetaSize.toShort)
Expand Down
33 changes: 27 additions & 6 deletions core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala
Expand Up @@ -30,6 +30,7 @@ import filodb.core.metadata.{Schema, Schemas}
import filodb.core.query.{ColumnFilter, QuerySession}
import filodb.core.store._
import filodb.memory._
import filodb.memory.data.Shutdown
import filodb.memory.format.{UnsafeUtils, ZeroCopyUTF8String}
import filodb.memory.format.BinaryVector.BinaryVectorPtr
import filodb.memory.format.ZeroCopyUTF8String._
Expand Down Expand Up @@ -122,7 +123,7 @@ object TimeSeriesShard {
/**
* Copies serialized ChunkSetInfo bytes from persistent storage / on-demand paging.
*/
def writeMeta(addr: Long, partitionID: Int, bytes: Array[Byte], vectors: Array[BinaryVectorPtr]): Unit = {
def writeMeta(addr: Long, partitionID: Int, bytes: Array[Byte], vectors: ArrayBuffer[BinaryVectorPtr]): Unit = {
UnsafeUtils.setInt(UnsafeUtils.ZeroPointer, addr, partitionID)
ChunkSetInfo.copy(bytes, addr + 4)
for { i <- 0 until vectors.size optimized } {
Expand Down Expand Up @@ -331,7 +332,7 @@ class TimeSeriesShard(val ref: DatasetRef,
private[memstore] final val reclaimLock = blockStore.reclaimLock

// Requires blockStore.
startHeadroomTask(ingestSched)
private val headroomTask = startHeadroomTask(ingestSched)

// Each shard has a single ingestion stream at a time. This BlockMemFactory is used for buffer overflow encoding
// strictly during ingest() and switchBuffers().
Expand Down Expand Up @@ -950,8 +951,7 @@ class TimeSeriesShard(val ref: DatasetRef,
val result = Future.sequence(Seq(writeChunksFuture, writeDirtyPartKeysFuture, pubDownsampleFuture)).map {
_.find(_.isInstanceOf[ErrorResponse]).getOrElse(Success)
}.flatMap {
case Success => blockHolder.markUsedBlocksReclaimable()
commitCheckpoint(ref, shardNum, flushGroup)
case Success => commitCheckpoint(ref, shardNum, flushGroup)
case er: ErrorResponse => Future.successful(er)
}.recover { case e =>
logger.error(s"Internal Error when persisting chunks in dataset=$ref shard=$shardNum - should " +
Expand All @@ -961,6 +961,13 @@ class TimeSeriesShard(val ref: DatasetRef,
result.onComplete { resp =>
assertThreadName(IngestSchedName)
try {
// COMMENTARY ON BUG FIX DONE: Mark used blocks as reclaimable even on failure. Even if cassandra write fails
// or other errors occur, we cannot leave blocks as not reclaimable and also release the factory back into pool.
// Earlier, we were not calling this with the hope that next use of the blockMemFactory will mark them
// as reclaimable. But the factory could be used for a different flush group. Not the same one. It can
// succeed, and the wrong blocks can be marked as reclaimable.
// Can try out tracking unreclaimed blockMemFactories without releasing, but it needs to be separate PR.
blockHolder.markUsedBlocksReclaimable()
blockFactoryPool.release(blockHolder)
flushDoneTasks(flushGroup, resp)
tracer.finish()
Expand Down Expand Up @@ -1498,9 +1505,22 @@ class TimeSeriesShard(val ref: DatasetRef,
})
}

private def startHeadroomTask(sched: Scheduler): Unit = {
private def startHeadroomTask(sched: Scheduler) = {
sched.scheduleWithFixedDelay(1, 1, TimeUnit.MINUTES, new Runnable {
def run() = blockStore.ensureHeadroom(storeConfig.ensureHeadroomPercent)
var numFailures = 0

def run() = {
val numFree = blockStore.ensureHeadroom(storeConfig.ensureHeadroomPercent)
if (numFree > 0) {
numFailures = 0
} else {
numFailures += 1
if (numFailures >= 5) {
Shutdown.haltAndCatchFire(new RuntimeException(s"Headroom task was unable to free memory " +
s"for $numFailures consecutive attempts. Shutting down process. shard=$shardNum"))
}
}
}
})
}

Expand Down Expand Up @@ -1541,6 +1561,7 @@ class TimeSeriesShard(val ref: DatasetRef,
method to ensure that no threads are accessing the memory before it's freed.
blockStore.releaseBlocks()
*/
headroomTask.cancel()
ingestSched.shutdown()
}
}
59 changes: 38 additions & 21 deletions memory/src/main/scala/filodb.memory/BlockManager.scala
Expand Up @@ -397,40 +397,56 @@ class PageAlignedBlockManager(val totalMemorySizeInBytes: Long,
reclaimLog += event
}

//scalastyle:off
protected def tryReclaim(num: Int): Unit = {
var reclaimed = 0
var currList = 0
val timeOrderedListIt = usedBlocksTimeOrdered.entrySet.iterator
while ( reclaimed < num &&
timeOrderedListIt.hasNext ) {
val entry = timeOrderedListIt.next
val prevReclaimed = reclaimed
val removed = reclaimFrom(entry.getValue, stats.timeOrderedBlocksReclaimedMetric)
if (removed.nonEmpty) {
logger.info(s"timeBlockReclaim: Reclaimed ${removed.length} time ordered blocks " +
s"from list at t=${entry.getKey} (${(System.currentTimeMillis - entry.getKey)/3600000} hrs ago) " +
s"\nReclaimed blocks: ${removed.map(b => jLong.toHexString(b.address)).mkString(" ")}")

// First reclaim time-ordered blocks which are marked as reclaimable.
reclaimTimeOrdered(false);

if (reclaimed < num) {
// Not enough reclaimed, so try reclaiming non-time-ordered blocks which are marked as reclaimable.
reclaimFrom(usedBlocks, stats.blocksReclaimedMetric, false)

if (reclaimed < num) {
// Still not enough? Forcibly reclaim time-ordered blocks.
reclaimTimeOrdered(true);

if (reclaimed < num) {
// Still not enough, but forcibly reclaiming non-time-ordered blocks is dangerous.
logger.warn(s"$num blocks to reclaim but only reclaimed $reclaimed. usedblocks=${usedBlocks.size} " +
s"usedBlocksTimeOrdered=${usedBlocksTimeOrdered.asScala.toList.map{case(n, l) => (n, l.size)}}")
}
}
// If the block list is now empty, remove it from tree map
if (entry.getValue.isEmpty) timeOrderedListIt.remove()
}
if (reclaimed < num) reclaimFrom(usedBlocks, stats.blocksReclaimedMetric)
// if we do not get required blocks even after reclaim call
if (reclaimed < num) {
logger.warn(s"$num blocks to reclaim but only reclaimed $reclaimed. usedblocks=${usedBlocks.size} " +
s"usedBlocksTimeOrdered=${usedBlocksTimeOrdered.asScala.toList.map{case(n, l) => (n, l.size)}}")

def reclaimTimeOrdered(forced: Boolean): Unit = {
val timeOrderedListIt = usedBlocksTimeOrdered.entrySet.iterator
while ( reclaimed < num &&
timeOrderedListIt.hasNext ) {
val entry = timeOrderedListIt.next
val prevReclaimed = reclaimed
val removed = reclaimFrom(entry.getValue, stats.timeOrderedBlocksReclaimedMetric, forced)
if (removed.nonEmpty) {
logger.info(s"timeBlockReclaim: Reclaimed ${removed.length} time ordered blocks " +
s"from list at t=${entry.getKey} (${(System.currentTimeMillis - entry.getKey)/3600000} hrs ago) " +
s"\nReclaimed blocks: ${removed.map(b => jLong.toHexString(b.address)).mkString(" ")}")
}
// If the block list is now empty, remove it from tree map
if (entry.getValue.isEmpty) timeOrderedListIt.remove()
}
}

def reclaimFrom(list: util.ArrayDeque[Block], reclaimedCounter: Counter): Seq[Block] = {
def reclaimFrom(list: util.ArrayDeque[Block], reclaimedCounter: Counter, forced: Boolean): Seq[Block] = {
val entries = list.iterator
val removed = new collection.mutable.ArrayBuffer[Block]
while (entries.hasNext && reclaimed < num) {
val block = entries.next
if (block.canReclaim) {
if (forced || block.canReclaim) {
entries.remove()
removed += block
addToReclaimLog(block)
block.reclaim()
block.reclaim(forced)
block.clearOwner()
freeBlocks.add(block)
stats.freeBlocksMetric.update(freeBlocks.size())
Expand All @@ -441,6 +457,7 @@ class PageAlignedBlockManager(val totalMemorySizeInBytes: Long,
removed
}
}
//scalastyle:on

def numTimeOrderedBlocks: Int = usedBlocksTimeOrdered.values.asScala.map(_.size).sum

Expand Down
17 changes: 4 additions & 13 deletions memory/src/main/scala/filodb.memory/data/ChunkMap.scala
Expand Up @@ -113,20 +113,12 @@ object ChunkMap extends StrictLogging {
def validateNoSharedLocks(unitTest: Boolean = false): Unit = {
val numLocksReleased = ChunkMap.releaseAllSharedLocks()
if (numLocksReleased > 0) {
val msg = s"Number of locks was non-zero: $numLocksReleased. " +
s"This is indicative of a possible lock acquisition/release bug."
if (unitTest) {
throw new Error(msg)
}
logger.error(msg)
haltAndCatchFire()
val ex = new RuntimeException(s"Number of locks was non-zero: $numLocksReleased. " +
s"This is indicative of a possible lock acquisition/release bug.")
Shutdown.haltAndCatchFire(ex)
}
}

def haltAndCatchFire(): Unit = {
logger.error(s"Shutting down process since it may be in an unstable/corrupt state.")
Runtime.getRuntime.halt(1)
}
}

/**
Expand Down Expand Up @@ -273,8 +265,7 @@ class ChunkMap(val memFactory: MemFactory, var capacity: Int) {
warned = true
} else if (warned && timeoutNanos >= MaxExclusiveRetryTimeoutNanos) {
val lockState = UnsafeUtils.getIntVolatile(this, lockStateOffset)
_logger.error(s"Unable to acquire exclusive lock: $lockState")
haltAndCatchFire()
Shutdown.haltAndCatchFire(new RuntimeException(s"Unable to acquire exclusive lock: $lockState"))
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions memory/src/main/scala/filodb.memory/data/Shutdown.scala
@@ -0,0 +1,16 @@
package filodb.memory.data

import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon

object Shutdown extends StrictLogging {

val forcedShutdowns = Kamon.counter("forced-shutdowns").withoutTags()
def haltAndCatchFire(e: Exception, unitTest: Boolean = false): Unit = {
forcedShutdowns.increment()
if (unitTest) throw e
logger.error(s"Shutting down process since it may be in an unstable/corrupt state", e)
Runtime.getRuntime.halt(189)
}

}

0 comments on commit ae0b356

Please sign in to comment.