// BlockManager.scala — filodb.memory (GitHub page chrome and rendered line-number column removed)
package filodb.memory
import java.lang.{Long => jLong}
import java.util
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import com.kenai.jffi.{MemoryIO, PageManager}
import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon
import kamon.metric.Counter
import kamon.tag.TagSet
/** Thrown when a block request cannot be satisfied without forcing a reclamation,
  * e.g. a time-ordered (ODP) allocation attempted while no free blocks are available. */
final case class MemoryRequestException(msg: String) extends Exception(msg)
/**
 * Allows requesting blocks of native memory.
 */
trait BlockManager {

  /**
   * @return The size of the block in bytes which can be allocated by this BlockManager
   */
  def blockSizeInBytes: Long

  /**
   * @return The number of free blocks still available for consumption
   */
  def numFreeBlocks: Int

  /**
   * @return true if the time bucket has blocks allocated
   */
  def hasTimeBucket(bucket: Long): Boolean

  /**
   * @param memorySize The size of memory in bytes for which blocks are to be allocated
   * @param bucketTime the timebucket (timestamp) from which to allocate block(s), or None for the general list
   * @param owner the BlockMemFactory that will be owning this block, until reclaim. Used for debugging.
   * @return A sequence of blocks totaling up in memory requested or empty if unable to allocate
   */
  def requestBlocks(memorySize: Long, bucketTime: Option[Long], owner: Option[BlockMemFactory] = None): Seq[Block]

  /**
   * @param bucketTime the timebucket from which to allocate block(s), or None for the general list
   * @param owner the BlockMemFactory that will be owning this block, until reclaim. Used for debugging.
   * @return One block of memory
   */
  def requestBlock(bucketTime: Option[Long], owner: Option[BlockMemFactory] = None): Option[Block]

  /**
   * Attempts to reclaim as many blocks as necessary to ensure that enough free blocks are
   * available.
   *
   * @return numFreeBlocks
   */
  def ensureFreeBlocks(num: Int): Int

  /**
   * Attempts to reclaim as many blocks as necessary to ensure that enough free bytes are
   * available. The actual amount reclaimed might be higher than requested.
   *
   * @return numFreeBlocks
   */
  def ensureFreeBytes(amt: Long): Int = {
    // Round the byte amount up to a whole number of blocks, capped at Int range.
    val blocksNeeded = (amt + blockSizeInBytes - 1) / blockSizeInBytes
    val capped = math.min(Int.MaxValue.toLong, blocksNeeded).toInt
    ensureFreeBlocks(capped)
  }

  /**
   * Attempts to reclaim as many blocks as necessary to ensure that enough free bytes are
   * available as a percentage of total size. The actual amount reclaimed might be higher than
   * requested.
   *
   * @param pct percentage: 0.0 to 100.0
   * @return numFreeBlocks
   */
  def ensureFreePercent(pct: Double): Int = {
    val targetBytes = (totalMemorySizeInBytes * pct * 0.01).toLong
    ensureFreeBytes(targetBytes)
  }

  /** @return the percentage (0.0 to 100.0) of total memory currently held in free blocks */
  def currentFreePercent: Double = {
    val freeBytes = (numFreeBlocks * blockSizeInBytes).toDouble
    freeBytes / totalMemorySizeInBytes * 100.0
  }

  def totalMemorySizeInBytes: Long

  /**
   * Releases all blocks allocated by this store.
   */
  def releaseBlocks(): Unit

  /**
   * Marks all time-bucketed blocks in buckets up to upTo as reclaimable
   */
  def markBucketedBlocksReclaimable(upTo: Long): Unit

  /**
   * @return Memory stats for recording
   */
  def stats(): MemoryStats
}
/**
 * Kamon gauges and counters for block store memory accounting, all tagged with the
 * supplied tag map.
 */
class MemoryStats(tags: Map[String, String]) {
  // Build the tag set once and share it across every metric below.
  private val tagSet = TagSet.from(tags)

  val usedBlocksMetric = Kamon.gauge("blockstore-used-blocks").withTags(tagSet)
  val freeBlocksMetric = Kamon.gauge("blockstore-free-blocks").withTags(tagSet)
  val requestedBlocksMetric = Kamon.counter("blockstore-blocks-requested").withTags(tagSet)
  val usedBlocksTimeOrderedMetric = Kamon.gauge("blockstore-used-time-ordered-blocks").withTags(tagSet)
  val timeOrderedBlocksReclaimedMetric = Kamon.counter("blockstore-time-ordered-blocks-reclaimed").withTags(tagSet)
  val blocksReclaimedMetric = Kamon.counter("blockstore-blocks-reclaimed").withTags(tagSet)

  /**
   * How much time a thread was potentially stalled while attempting to ensure
   * free space. Unit is nanoseconds.
   */
  val blockHeadroomStall = Kamon.counter("blockstore-headroom-stall-nanos").withTags(tagSet)

  /**
   * How much time a thread was stalled while attempting to acquire the reclaim lock.
   * Unit is nanoseconds.
   */
  val blockReclaimStall = Kamon.counter("blockstore-reclaim-stall-nanos").withTags(tagSet)
}
/** Debug record of a single block reclaim: the block, the wall-clock reclaim time,
  * the owner it had before being reclaimed, and the block's remaining bytes. */
final case class ReclaimEvent(block: Block, reclaimTime: Long, oldOwner: Option[BlockMemFactory], remaining: Long)
object PageAlignedBlockManager {
  // Maximum number of ReclaimEvents retained in the reclaim log; the oldest entry is
  // dequeued once this size is reached.
  val MaxReclaimLogSize = 10000
}
/**
 * Pre-allocates blocks totalling to the passed memory size.
 * Each block size is the OS page size times numPagesPerBlock.
 * This class is thread safe.
 *
 * @param totalMemorySizeInBytes Controls the number of pages to allocate (totalling up to totalMemorySizeInBytes)
 * @param stats Memory metrics which need to be recorded
 * @param reclaimer ReclaimListener to use on block metadata when a block is freed
 * @param numPagesPerBlock The number of pages a block spans
 */
class PageAlignedBlockManager(val totalMemorySizeInBytes: Long,
                              val stats: MemoryStats,
                              reclaimer: ReclaimListener,
                              numPagesPerBlock: Int)
  extends BlockManager with StrictLogging {
  import PageAlignedBlockManager._

  // Page protection mask: readable, writable and executable pages.
  val mask = PageManager.PROT_READ | PageManager.PROT_EXEC | PageManager.PROT_WRITE

  import collection.JavaConverters._

  // Base address of the single contiguous native allocation; reset to 0 once released.
  protected var firstPageAddress: Long = 0L

  protected val freeBlocks: util.ArrayDeque[Block] = allocate()
  protected[memory] val usedBlocks: util.ArrayDeque[Block] = new util.ArrayDeque[Block]()
  // Blocks allocated for on-demand paging, grouped by time bucket (timestamp key).
  protected[memory] val usedBlocksTimeOrdered = new util.TreeMap[Long, util.ArrayDeque[Block]]
  // Bounded debug log of recent reclaim events; capped at MaxReclaimLogSize.
  val reclaimLog = new collection.mutable.Queue[ReclaimEvent]

  // Guards the free/used block collections above.
  protected val lock = new ReentrantLock()
  // Acquired exclusively when reclaiming on demand. Readers acquire the shared side to
  // prevent block reclamation while they access block contents.
  final val reclaimLock = new Latch

  override def blockSizeInBytes: Long = PageManager.getInstance().pageSize() * numPagesPerBlock

  def usedMemory: Long = usedBlocks.size * blockSizeInBytes

  override def numFreeBlocks: Int = freeBlocks.size

  override def requestBlock(bucketTime: Option[Long], bmf: Option[BlockMemFactory] = None): Option[Block] = {
    val blocks = requestBlocks(blockSizeInBytes, bucketTime, bmf)
    blocks.size match {
      case 0 => None
      case 1 => Some(blocks.head)
      case _ => throw new IllegalStateException("Should not have gotten more than one block")
    }
  }

  /* Used in tests for assertion */
  def usedBlocksSize(bucketTime: Option[Long]): Int = {
    bucketTime match {
      // Guard against an absent bucket: a raw .get(t).size() would NPE when no list exists.
      case Some(t) => Option(usedBlocksTimeOrdered.get(t)).map(_.size).getOrElse(0)
      case None    => usedBlocks.size()
    }
  }

  /**
   * Allocates requested number of blocks. If enough blocks are not available,
   * then uses the ReclaimPolicy to check if blocks can be reclaimed
   * Uses a lock to ensure that concurrent requests are safe.
   *
   * If bucketTime is provided, a MemoryRequestException is thrown when no blocks are
   * currently available. In other words, time ordered block allocation doesn't force
   * reclamation. Instead, a background task must be running which calls ensureFreeBlocks.
   * Time ordered blocks are used for on-demand-paging only (ODP), initiated by a query, and
   * reclamation during ODP can end up causing the query results to have "holes". Throwing an
   * exception isn't a perfect solution, but it can suffice until a proper block pinning
   * mechanism is in place. Queries which fail with this exception can retry, perhaps after
   * calling ensureFreeBLocks explicitly.
   */
  override def requestBlocks(memorySize: Long,
                             bucketTime: Option[Long],
                             bmf: Option[BlockMemFactory] = None): Seq[Block] = {
    // Round up to a whole number of blocks using integer math. The previous
    // Math.ceil(memorySize / blockSizeInBytes) performed Long division first, making the
    // ceil a no-op and under-allocating when memorySize wasn't a multiple of the block size.
    val num: Int = ((memorySize + blockSizeInBytes - 1) / blockSizeInBytes).toInt
    stats.requestedBlocksMetric.increment(num)
    lock.lock()
    try {
      if (freeBlocks.size < num) {
        if (bucketTime.isEmpty) {
          tryReclaimOnDemand(num)
        } else {
          // Time-ordered (ODP) allocation never forces a reclaim; see scaladoc above.
          val msg = s"Unable to allocate time ordered block(s) without forcing a reclamation: " +
            s"num_blocks=$num num_bytes=$memorySize freeBlocks=${freeBlocks.size}"
          throw new MemoryRequestException(msg)
        }
      }
      if (freeBlocks.size >= num) {
        val allocated = new Array[Block](num)
        (0 until num).foreach { i =>
          val block = freeBlocks.remove()
          if (bmf.nonEmpty) block.setOwner(bmf.get)
          use(block, bucketTime)
          allocated(i) = block
        }
        allocated
      } else {
        logger.warn(s"Out of blocks to allocate! num_blocks=$num num_bytes=$memorySize freeBlocks=${freeBlocks.size}")
        Seq.empty[Block]
      }
    } finally {
      lock.unlock()
    }
  }

  /**
   * Internal variant of the tryReclaim method which is called when blocks are requested, but
   * none are available. Instead of blindly reclaiming blocks, it attempts to exclusively
   * acquire the reclaim lock. By doing so, it avoids reclaiming blocks which are currently
   * being accessed. To work properly, all threads which require this protection must hold the
   * shared reclaimLock. To prevent indefinite stalls, this method times out lock acquisition,
   * logs an error, and then reclaims anyhow.
   *
   * This method must be called with the primary lock object held. To avoid deadlock, this
   * method releases and re-acquires the lock.
   */
  private def tryReclaimOnDemand(num: Int): Unit = {
    lock.unlock()
    var acquired: Boolean = false
    try {
      val start = System.nanoTime()
      // Give up after waiting (in total) a little over 16 seconds.
      acquired = tryExclusiveReclaimLock(8192)
      if (!acquired) {
        // Don't stall ingestion forever. Some queries might return invalid results because
        // the lock isn't held. If the lock state is broken, then ingestion is really stuck
        // and the node must be restarted. Queries should always release the lock.
        logger.error(s"Lock for BlockManager.tryReclaimOnDemand timed out: ${reclaimLock}")
      } else {
        logger.debug("Lock for BlockManager.tryReclaimOnDemand acquired")
      }
      val stall = System.nanoTime() - start
      stats.blockReclaimStall.increment(stall)
    } finally {
      // Re-acquire the primary lock before touching the block lists again.
      lock.lock()
    }
    try {
      if (numFreeBlocks < num) { // double check since lock was released
        tryReclaim(num)
      }
    } finally {
      if (acquired) {
        reclaimLock.releaseExclusive()
      }
    }
  }

  private def tryExclusiveReclaimLock(finalTimeoutMillis: Int): Boolean = {
    // Attempting to acquire the exclusive lock must wait for concurrent queries to finish, but
    // waiting will also stall new queries from starting. To protect against this, attempt with
    // a timeout to let any stalled queries through. To prevent starvation of the exclusive
    // lock attempt, increase the timeout each time, but eventually give up. The reason why
    // waiting for an exclusive lock causes this problem is that the thread must enqueue itself
    // into the lock as a waiter, and all new shared requests must wait their turn. The risk
    // with timing out is that if there's a continuous stream of long running queries (more than
    // one second), then the exclusive lock will never be acquired, and then ensureFreeBlocks
    // won't be able to do its job. The timeout settings might need to be adjusted in that case.
    // Perhaps the timeout should increase automatically if ensureFreeBlocks failed the last time?
    // This isn't safe to do until we gain higher confidence that the shared lock is always
    // released by queries.
    var timeout = 1
    while (true) {
      val acquired = reclaimLock.tryAcquireExclusiveNanos(TimeUnit.MILLISECONDS.toNanos(timeout))
      if (acquired) {
        return true
      }
      if (timeout >= finalTimeoutMillis) {
        return false
      }
      Thread.`yield`()
      // Exponential backoff on the timeout, capped at finalTimeoutMillis.
      timeout = Math.min(finalTimeoutMillis, timeout << 1)
    }
    false // never reached, but scala compiler complains otherwise
  }

  /**
   * Expected to be called via a background task, to periodically ensure that enough blocks
   * are free for new allocations. This helps prevent ODP activity from reclaiming immediately
   * from itself.
   *
   * @param pct percentage: 0.0 to 100.0
   */
  def ensureHeadroom(pct: Double): Int = {
    // Ramp up the timeout as the current headroom shrinks. Max timeout per attempt is a little
    // over 2 seconds, and the total timeout can be double that, for a total of 4 seconds.
    val maxTimeoutMillis = 2048
    val timeoutMillis = ((1.0 - (currentFreePercent / pct)) * maxTimeoutMillis).toInt
    if (timeoutMillis <= 0) {
      // Headroom target is already met.
      return numFreeBlocks
    }
    var numFree: Int = 0
    val start = System.nanoTime()
    val acquired = tryExclusiveReclaimLock(timeoutMillis)
    if (!acquired) {
      if (timeoutMillis >= maxTimeoutMillis / 2) {
        // Start warning when the current headroom has dipped below the halfway point.
        // The lock state is logged in case it's stuck due to a runaway query somewhere.
        logger.warn(s"Lock for BlockManager.ensureHeadroom timed out: ${reclaimLock}")
      }
      numFree = numFreeBlocks
    } else {
      try {
        numFree = ensureFreePercent(pct)
      } finally {
        reclaimLock.releaseExclusive()
      }
      val numBytes = numFree * blockSizeInBytes
      logger.debug(s"BlockManager.ensureHeadroom numFree: $numFree ($numBytes bytes)")
    }
    val stall = System.nanoTime() - start
    stats.blockHeadroomStall.increment(stall)
    numFree
  }

  override def ensureFreeBlocks(num: Int): Int = {
    lock.lock()
    try {
      val require = num - numFreeBlocks
      if (require > 0) tryReclaim(require)
      numFreeBlocks
    } finally {
      lock.unlock()
    }
  }

  /** Carves the one contiguous native allocation into page-aligned blocks. Called once at init. */
  protected def allocate(): util.ArrayDeque[Block] = {
    // Long division already truncates; no Math.floor needed (the old floor-on-Double form
    // also risked precision loss for very large sizes).
    val numBlocks: Int = (totalMemorySizeInBytes / blockSizeInBytes).toInt
    val blocks = new util.ArrayDeque[Block]()
    logger.info(s"Allocating $numBlocks blocks of $blockSizeInBytes bytes each, total $totalMemorySizeInBytes")
    firstPageAddress = MemoryIO.getCheckedInstance().allocateMemory(totalMemorySizeInBytes, false)
    for (i <- 0 until numBlocks) {
      val address = firstPageAddress + (i * blockSizeInBytes)
      blocks.add(new Block(address, blockSizeInBytes, reclaimer))
    }
    stats.freeBlocksMetric.update(blocks.size())
    blocks
  }

  /** Marks a block in-use and files it under the proper used list, updating gauges. */
  protected def use(block: Block, bucketTime: Option[Long]) = {
    block.markInUse
    bucketTime match {
      case Some(bucket) => val blockList = Option(usedBlocksTimeOrdered.get(bucket)).getOrElse {
          // Lazily create the per-bucket list on first use.
          val list = new util.ArrayDeque[Block]()
          usedBlocksTimeOrdered.put(bucket, list)
          list
        }
        blockList.add(block)
        stats.usedBlocksTimeOrderedMetric.update(numTimeOrderedBlocks)
      case None => usedBlocks.add(block)
        stats.usedBlocksMetric.update(usedBlocks.size())
    }
    stats.freeBlocksMetric.update(freeBlocks.size())
  }

  /** Records a reclaim in the bounded debug log, evicting the oldest entry when full. */
  private def addToReclaimLog(block: Block): Unit = {
    val event = ReclaimEvent(block, System.currentTimeMillis, block.owner, block.remaining)
    if (reclaimLog.size >= MaxReclaimLogSize) { reclaimLog.dequeue }
    reclaimLog += event
  }

  //scalastyle:off
  /**
   * Attempts to reclaim at least num blocks, escalating in order of risk:
   * 1. time-ordered blocks already marked reclaimable,
   * 2. regular used blocks marked reclaimable,
   * 3. forcibly, any remaining time-ordered blocks.
   * Regular used blocks are never forcibly reclaimed; a warning is logged instead.
   * Must be called with the primary lock held.
   */
  protected def tryReclaim(num: Int): Unit = {
    var reclaimed = 0
    // First reclaim time-ordered blocks which are marked as reclaimable.
    reclaimTimeOrdered(false)
    if (reclaimed < num) {
      // Not enough reclaimed, so try reclaiming non-time-ordered blocks which are marked as reclaimable.
      reclaimFrom(usedBlocks, stats.blocksReclaimedMetric, false)
      if (reclaimed < num) {
        // Still not enough? Forcibly reclaim time-ordered blocks.
        reclaimTimeOrdered(true)
        if (reclaimed < num) {
          // Still not enough, but forcibly reclaiming non-time-ordered blocks is dangerous.
          logger.warn(s"$num blocks to reclaim but only reclaimed $reclaimed. usedblocks=${usedBlocks.size} " +
            s"usedBlocksTimeOrdered=${usedBlocksTimeOrdered.asScala.toList.map{case(n, l) => (n, l.size)}}")
        }
      }
    }

    // Walks the time-ordered buckets oldest-first, reclaiming until num is satisfied.
    def reclaimTimeOrdered(forced: Boolean): Unit = {
      val timeOrderedListIt = usedBlocksTimeOrdered.entrySet.iterator
      while ( reclaimed < num &&
              timeOrderedListIt.hasNext ) {
        val entry = timeOrderedListIt.next
        val prevReclaimed = reclaimed
        val removed = reclaimFrom(entry.getValue, stats.timeOrderedBlocksReclaimedMetric, forced)
        if (removed.nonEmpty) {
          logger.info(s"timeBlockReclaim: Reclaimed ${removed.length} time ordered blocks " +
            s"from list at t=${entry.getKey} (${(System.currentTimeMillis - entry.getKey)/3600000} hrs ago) " +
            s"\nReclaimed blocks: ${removed.map(b => jLong.toHexString(b.address)).mkString(" ")}")
        }
        // If the block list is now empty, remove it from tree map
        if (entry.getValue.isEmpty) timeOrderedListIt.remove()
      }
    }

    // Reclaims eligible (or, if forced, all) blocks from the given list; returns those removed.
    def reclaimFrom(list: util.ArrayDeque[Block], reclaimedCounter: Counter, forced: Boolean): Seq[Block] = {
      val entries = list.iterator
      val removed = new collection.mutable.ArrayBuffer[Block]
      while (entries.hasNext && reclaimed < num) {
        val block = entries.next
        if (forced || block.canReclaim) {
          entries.remove()
          removed += block
          addToReclaimLog(block)
          block.reclaim(forced)
          block.clearOwner()
          freeBlocks.add(block)
          stats.freeBlocksMetric.update(freeBlocks.size())
          reclaimedCounter.increment()
          reclaimed = reclaimed + 1
        }
      }
      removed
    }
  }
  //scalastyle:on

  def numTimeOrderedBlocks: Int = usedBlocksTimeOrdered.values.asScala.map(_.size).sum

  def timeBuckets: Seq[Long] = usedBlocksTimeOrdered.keySet.asScala.toSeq

  def markBucketedBlocksReclaimable(upTo: Long): Unit = {
    lock.lock()
    try {
      logger.info(s"timeBlockReclaim: Marking ($upTo) - this is -${(System.currentTimeMillis - upTo)/3600000}hrs")
      val keys = usedBlocksTimeOrdered.headMap(upTo).keySet.asScala
      logger.info(s"timeBlockReclaim: Marking lists $keys as reclaimable")
      usedBlocksTimeOrdered.headMap(upTo).values.asScala.foreach { list =>
        list.asScala.foreach(_.tryMarkReclaimable)
      }
    } finally {
      lock.unlock()
    }
  }

  def hasTimeBucket(bucket: Long): Boolean = {
    lock.lock()
    // try/finally ensures the lock is released even if containsKey throws; the previous
    // version unlocked outside any try block and could leak the lock on an exception.
    try {
      usedBlocksTimeOrdered.containsKey(bucket)
    } finally {
      lock.unlock()
    }
  }

  /**
   * Used during testing only to try and reclaim all existing blocks
   */
  def reclaimAll(): Unit = {
    logger.warn(s"Reclaiming all used blocks -- THIS BETTER BE A TEST!!!")
    markBucketedBlocksReclaimable(Long.MaxValue)
    usedBlocks.asScala.foreach(_.markReclaimable)
    tryReclaim(usedBlocks.size + numTimeOrderedBlocks)
  }

  /**
   * Finds all reclaim events in the log whose Block contains the pointer passed in.
   * Useful for debugging. O(n) - not performant.
   */
  def reclaimEventsForPtr(ptr: BinaryRegion.NativePointer): Seq[ReclaimEvent] =
    reclaimLog.filter { ev => ptr >= ev.block.address && ptr < (ev.block.address + ev.block.capacity) }

  /** Finds all time-ordered blocks whose range contains the given pointer. Debug aid. */
  def timeBlocksForPtr(ptr: BinaryRegion.NativePointer): Seq[Block] = {
    lock.lock()
    try {
      usedBlocksTimeOrdered.entrySet.iterator.asScala.flatMap { entry =>
        BlockDetective.containsPtr(ptr, entry.getValue)
      }.toBuffer
    } finally {
      lock.unlock()
    }
  }

  def releaseBlocks(): Unit = {
    lock.lock()
    try {
      if (firstPageAddress != 0) {
        MemoryIO.getCheckedInstance.freeMemory(firstPageAddress)
        firstPageAddress = 0
      }
    } catch {
      // Best-effort release: log and continue rather than propagate during shutdown.
      case e: Throwable => logger.warn(s"Could not release blocks at $firstPageAddress", e)
    } finally {
      lock.unlock()
    }
  }

  // Safety net: free the native memory if this manager is garbage collected un-released.
  override def finalize(): Unit = releaseBlocks()
}