From 8baf8028a954937bf56577cae08a39283bd6951a Mon Sep 17 00:00:00 2001 From: Mikhail Bezoyan Date: Fri, 15 Dec 2023 00:40:58 +0000 Subject: [PATCH] finagle-core: Limit the maximum number of jobs in the thread pool backing the offload filter Problem The existence of an unbound queue anywhere can cause the application to run out of memory and terminate unexpectedly. Solution Allow the service owner to limit the length of the queue with some reasonable number. Differential Revision: https://phabricator.twitter.biz/D1116102 --- .../offload/DefaultThreadPoolExecutor.scala | 7 +++++-- .../finagle/offload/OffloadFuturePool.scala | 3 ++- .../finagle/offload/OffloadThreadPool.scala | 16 ++++++++++++---- .../offload/OffloadThreadPoolFactory.scala | 10 ++++++---- .../twitter/finagle/offload/maxQueueLength.scala | 10 ++++++++++ 5 files changed, 35 insertions(+), 11 deletions(-) create mode 100644 finagle-core/src/main/scala/com/twitter/finagle/offload/maxQueueLength.scala diff --git a/finagle-core/src/main/scala/com/twitter/finagle/offload/DefaultThreadPoolExecutor.scala b/finagle-core/src/main/scala/com/twitter/finagle/offload/DefaultThreadPoolExecutor.scala index fcdf892510..cff7187c4a 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/offload/DefaultThreadPoolExecutor.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/offload/DefaultThreadPoolExecutor.scala @@ -9,13 +9,16 @@ import java.util.concurrent.RejectedExecutionHandler import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit -private[twitter] class DefaultThreadPoolExecutor(poolSize: Int, stats: StatsReceiver) +private[twitter] class DefaultThreadPoolExecutor( + poolSize: Int, + maxQueueLen: Int, + stats: StatsReceiver) extends ThreadPoolExecutor( poolSize /*corePoolSize*/, poolSize /*maximumPoolSize*/, 0L /*keepAliveTime*/, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue[Runnable]() /*workQueue*/, + new LinkedBlockingQueue[Runnable](maxQueueLen) /*workQueue*/, new NamedPoolThreadFactory("finagle/offload", makeDaemons = true) /*threadFactory*/, new RunsOnNettyThread(stats.counter("not_offloaded_tasks"))) diff --git a/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadFuturePool.scala b/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadFuturePool.scala index e74ae46a2a..275c3c566d 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadFuturePool.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadFuturePool.scala @@ -45,10 +45,11 @@ object OffloadFuturePool { lazy val configuredPool: Option[FuturePool] = { val workers = numWorkers.get.orElse(if (auto()) Some(com.twitter.jvm.numProcs().ceil.toInt) else None) + val maxQueueLen = maxQueueLength() workers.map { threads => val stats = FinagleStatsReceiver.scope("offload_pool") - val pool = new OffloadFuturePool(OffloadThreadPool(threads, stats), stats) + val pool = new OffloadFuturePool(OffloadThreadPool(threads, maxQueueLen, stats), stats) // Start sampling the offload delay if the interval isn't Duration.Top. if (statsSampleInterval().isFinite && statsSampleInterval() > Duration.Zero) { diff --git a/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPool.scala b/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPool.scala index 5778491ac5..92bca80b3a 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPool.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPool.scala @@ -10,21 +10,29 @@ private object OffloadThreadPool { private[this] val logger = Logger.get() /** Construct an `ExecutorService` with the proper thread names and metrics */ - def apply(poolSize: Int, stats: StatsReceiver): ExecutorService = { + def apply(poolSize: Int, maxQueueLen: Int, stats: StatsReceiver): ExecutorService = { LoadService[OffloadThreadPoolFactory]() match { case Seq() => logger.info("Constructing the default OffloadThreadPool executor service") - new DefaultThreadPoolExecutor(poolSize, stats) + new DefaultThreadPoolExecutor( + poolSize = poolSize, + maxQueueLen = maxQueueLen, + stats = stats + ) case Seq(factory) => logger.info(s"Constructing OffloadThreadPool using $factory") - factory.newPool(poolSize, stats) + factory.newPool(poolSize, maxQueueLen, stats) case multiple => logger.error( s"Found multiple `OffloadThreadPoolFactory`s: $multiple. " + s"Using the default implementation.") - new DefaultThreadPoolExecutor(poolSize, stats) + new DefaultThreadPoolExecutor( + poolSize = poolSize, + maxQueueLen = maxQueueLen, + stats = stats + ) } } } diff --git a/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPoolFactory.scala b/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPoolFactory.scala index 2dd8a013a0..fa3266dd83 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPoolFactory.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPoolFactory.scala @@ -14,12 +14,14 @@ abstract class OffloadThreadPoolFactory { /** Construct a new `ExecutorService` * - * @param poolSize The size of the pool as configured by the finagle flags - * `com.twitter.finagle.offload.numWorkers` and `com.twitter.finagle.offload.auto` - * @param stats `StatsReceiver` to use for observability. + * @param poolSize The size of the pool as configured by the finagle flags + * `com.twitter.finagle.offload.numWorkers` and `com.twitter.finagle.offload.auto` + * @param maxQueueLen The maximum length of the queue in the pool as configured by the finagle flag + * `com.twitter.finagle.offload.maxQueueLen` + * @param stats `StatsReceiver` to use for observability. */ - def newPool(poolSize: Int, stats: StatsReceiver): ExecutorService + def newPool(poolSize: Int, maxQueueLen: Int, stats: StatsReceiver): ExecutorService /** Implementors should make the `toString` method meaningful and it will be used in log entries */ override def toString: String diff --git a/finagle-core/src/main/scala/com/twitter/finagle/offload/maxQueueLength.scala b/finagle-core/src/main/scala/com/twitter/finagle/offload/maxQueueLength.scala new file mode 100644 index 0000000000..b234d93a35 --- /dev/null +++ b/finagle-core/src/main/scala/com/twitter/finagle/offload/maxQueueLength.scala @@ -0,0 +1,10 @@ +package com.twitter.finagle.offload + +import com.twitter.app.GlobalFlag + +object maxQueueLength + extends GlobalFlag[Int]( + default = Int.MaxValue, + help = + "Experimental flag. Sets the maximum number of jobs in the thread pool in the offload filter" + )