Skip to content

Commit

Permalink
finagle-core: Limit the maximum number of jobs in the thread pool bac…
Browse files Browse the repository at this point in the history
…king the offload filter

Problem
An unbounded queue anywhere in the application can grow without limit, causing the process to run out of memory and terminate unexpectedly.

Solution
Allow the service owner to cap the length of the work queue in the thread pool backing the offload filter at a reasonable value.

Differential Revision: https://phabricator.twitter.biz/D1116102
  • Loading branch information
mbezoyan authored and jenkins committed Dec 15, 2023
1 parent 1609ebb commit 8baf802
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import java.util.concurrent.RejectedExecutionHandler
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

private[twitter] class DefaultThreadPoolExecutor(poolSize: Int, stats: StatsReceiver)
private[twitter] class DefaultThreadPoolExecutor(
poolSize: Int,
maxQueueLen: Int,
stats: StatsReceiver)
extends ThreadPoolExecutor(
poolSize /*corePoolSize*/,
poolSize /*maximumPoolSize*/,
0L /*keepAliveTime*/,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue[Runnable]() /*workQueue*/,
new LinkedBlockingQueue[Runnable](maxQueueLen) /*workQueue*/,
new NamedPoolThreadFactory("finagle/offload", makeDaemons = true) /*threadFactory*/,
new RunsOnNettyThread(stats.counter("not_offloaded_tasks")))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ object OffloadFuturePool {
lazy val configuredPool: Option[FuturePool] = {
val workers =
numWorkers.get.orElse(if (auto()) Some(com.twitter.jvm.numProcs().ceil.toInt) else None)
val maxQueueLen = maxQueueLength()

workers.map { threads =>
val stats = FinagleStatsReceiver.scope("offload_pool")
val pool = new OffloadFuturePool(OffloadThreadPool(threads, stats), stats)
val pool = new OffloadFuturePool(OffloadThreadPool(threads, maxQueueLen, stats), stats)

// Start sampling the offload delay if the interval isn't Duration.Top.
if (statsSampleInterval().isFinite && statsSampleInterval() > Duration.Zero) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,29 @@ private object OffloadThreadPool {
private[this] val logger = Logger.get()

/** Construct an `ExecutorService` with the proper thread names and metrics */
def apply(poolSize: Int, stats: StatsReceiver): ExecutorService = {
def apply(poolSize: Int, maxQueueLen: Int, stats: StatsReceiver): ExecutorService = {
LoadService[OffloadThreadPoolFactory]() match {
case Seq() =>
logger.info("Constructing the default OffloadThreadPool executor service")
new DefaultThreadPoolExecutor(poolSize, stats)
new DefaultThreadPoolExecutor(
poolSize = poolSize,
maxQueueLen = maxQueueLen,
stats = stats
)

case Seq(factory) =>
logger.info(s"Constructing OffloadThreadPool using $factory")
factory.newPool(poolSize, stats)
factory.newPool(poolSize, maxQueueLen, stats)

case multiple =>
logger.error(
s"Found multiple `OffloadThreadPoolFactory`s: $multiple. " +
s"Using the default implementation.")
new DefaultThreadPoolExecutor(poolSize, stats)
new DefaultThreadPoolExecutor(
poolSize = poolSize,
maxQueueLen = maxQueueLen,
stats = stats
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ abstract class OffloadThreadPoolFactory {

/** Construct a new `ExecutorService`
*
* @param poolSize The size of the pool as configured by the finagle flags
* `com.twitter.finagle.offload.numWorkers` and `com.twitter.finagle.offload.auto`
* @param stats `StatsReceiver` to use for observability.
* @param poolSize The size of the pool as configured by the finagle flags
* `com.twitter.finagle.offload.numWorkers` and `com.twitter.finagle.offload.auto`
* @param maxQueueLen The maximum length of the queue in the pool as configured by the finagle flag
* `com.twitter.finagle.offload.maxQueueLength`
* @param stats `StatsReceiver` to use for observability.
*/

def newPool(poolSize: Int, stats: StatsReceiver): ExecutorService
def newPool(poolSize: Int, maxQueueLen: Int, stats: StatsReceiver): ExecutorService

/** Implementors should make the `toString` method meaningful and it will be used in log entries */
override def toString: String
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.twitter.finagle.offload

import com.twitter.app.GlobalFlag

/**
 * Global flag bounding how many jobs may wait in the offload thread pool's queue.
 *
 * The value is read when the offload pool is configured and is handed to the
 * executor that backs the pool; the default executor uses it as the capacity of
 * its work queue. The default of `Int.MaxValue` leaves the queue effectively
 * unbounded.
 */
object maxQueueLength
    extends GlobalFlag[Int](
      default = Int.MaxValue,
      help =
        "Experimental flag. Sets the maximum number of jobs in the thread pool in the offload filter"
    )

0 comments on commit 8baf802

Please sign in to comment.