From 12ff1d86ec470846821b8b2b81f95cf3fb0980d7 Mon Sep 17 00:00:00 2001 From: Anton Ivanov Date: Fri, 10 Nov 2023 14:51:21 +0000 Subject: [PATCH] [twitter.server] offload ShutdownHandler # Problem ShutdownHandler is executed on a netty IO thread and calls app.close which can easily block this thread. # Solution Offload execution to a thread. JIRA Issues: STOR-1705 Differential Revision: https://phabricator.twitter.biz/D1109579 --- .../server/handler/ShutdownHandler.scala | 30 ++++++++++++++++--- .../server/handler/ShutdownHandlerTest.scala | 27 +++++++++++++---- 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/server/src/main/scala/com/twitter/server/handler/ShutdownHandler.scala b/server/src/main/scala/com/twitter/server/handler/ShutdownHandler.scala index 0108611f..b8ab28c4 100644 --- a/server/src/main/scala/com/twitter/server/handler/ShutdownHandler.scala +++ b/server/src/main/scala/com/twitter/server/handler/ShutdownHandler.scala @@ -2,11 +2,18 @@ package com.twitter.server.handler import com.twitter.app.App import com.twitter.finagle.Service -import com.twitter.finagle.http.{Method, Request, Response, Status, Uri} +import com.twitter.finagle.http.Method +import com.twitter.finagle.http.Request +import com.twitter.finagle.http.Response +import com.twitter.finagle.http.Status +import com.twitter.finagle.http.Uri import com.twitter.io.Buf -import com.twitter.server.util.HttpUtils.{newOk, newResponse} +import com.twitter.server.util.HttpUtils.newOk +import com.twitter.server.util.HttpUtils.newResponse import com.twitter.util.logging.Logger -import com.twitter.util.{Duration, Future} +import com.twitter.util.Duration +import com.twitter.util.Future +import com.twitter.util.Local class ShutdownHandler(app: App) extends Service[Request, Response] { private[this] val log = Logger[ShutdownHandler] @@ -38,7 +45,22 @@ class ShutdownHandler(app: App) extends Service[Request, Response] { s"[${req.uri}] from ${req.remoteAddress.getHostAddress} " + s"quitting with grace period $grace" ) - app.close(grace) + + // Isolate a current thread (likely netty IO thread) + // from probable blocking calls inside app.close. + val ctx = Local.save() + new Thread( + () => + try { + Local.restore(ctx) + app.close(grace) + } catch { + case th: Throwable => + log.error("Exception when asking an app to close", th) + }, + "shutdown-handler" // thread name + ).start() + newOk("quitting\n") } else { log.info( diff --git a/server/src/test/scala/com/twitter/server/handler/ShutdownHandlerTest.scala b/server/src/test/scala/com/twitter/server/handler/ShutdownHandlerTest.scala index 2d353721..16e9f80a 100644 --- a/server/src/test/scala/com/twitter/server/handler/ShutdownHandlerTest.scala +++ b/server/src/test/scala/com/twitter/server/handler/ShutdownHandlerTest.scala @@ -1,10 +1,21 @@ package com.twitter.server.handler import com.twitter.conversions.DurationOps._ -import com.twitter.finagle.http.{Method, Request, Status} +import com.twitter.finagle.http.Method +import com.twitter.finagle.http.Request +import com.twitter.finagle.http.Status import com.twitter.server.TwitterServer -import com.twitter.util.{Await, Awaitable, Closable, Duration, Future, Time} +import com.twitter.util.Await +import com.twitter.util.Awaitable +import com.twitter.util.Closable +import com.twitter.util.Duration +import com.twitter.util.Future +import com.twitter.util.Time +import org.scalatest.concurrent.Eventually.eventually +import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.time.Seconds +import org.scalatest.time.Span class ShutdownHandlerTest extends AnyFunSuite { @@ -38,7 +49,9 @@ class ShutdownHandlerTest extends AnyFunSuite { val handler = new ShutdownHandler(closer) val rsp = await(handler(Request(Method.Post, "/foo"))) assert(rsp.status == Status.Ok) - assert(closer.closed) + eventually(Timeout(Span(5, Seconds))) { + assert(closer.closed) + } }) test("close without a grace period, but closer overrides default grace")( @@ -53,7 +66,9 @@ class ShutdownHandlerTest extends AnyFunSuite { val handler = new ShutdownHandler(closer) val rsp = await(handler(Request(Method.Post, "/foo"))) assert(rsp.status == Status.Ok) - assert(closer.closed) + eventually(Timeout(Span(5, Seconds))) { + assert(closer.closed) + } }) test("close with a grace period") { @@ -66,7 +81,9 @@ class ShutdownHandlerTest extends AnyFunSuite { val handler = new ShutdownHandler(closer) val rsp = await(handler(Request(Method.Post, "/foo?grace=" + grace.toString))) assert(rsp.status == Status.Ok) - assert(closer.closed) + eventually(Timeout(Span(5, Seconds))) { + assert(closer.closed) + } } test("fail when an invalid grace parameter is specified") {