Skip to content

Commit

Permalink
Fix restarting unhealthy instances. (#6882)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeschkies committed Apr 8, 2019
1 parent 69a4a14 commit 8a92920
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 7 deletions.
Expand Up @@ -28,9 +28,10 @@ private[health] class HealthCheckActor(
healthCheck: HealthCheck,
instanceTracker: InstanceTracker,
eventBus: EventStream,
healthCheckHub: Sink[(AppDefinition, Instance, MarathonHealthCheck, ActorRef), NotUsed])(implicit mat: ActorMaterializer)
healthCheckHub: Sink[(AppDefinition, Instance, MarathonHealthCheck, ActorRef), NotUsed])
extends Actor with StrictLogging {

implicit val mat = ActorMaterializer()
import context.dispatcher

var healthByInstanceId = TrieMap.empty[Instance.Id, Health]
Expand Down Expand Up @@ -74,11 +75,9 @@ private[health] class HealthCheckActor(
def purgeStatusOfDoneInstances(instances: Seq[Instance]): Unit = {
logger.debug(s"Purging health status of inactive instances for app ${app.id} version ${app.version} and healthCheck ${healthCheck}")

val activeInstanceIds: Set[Instance.Id] = instances.withFilter(_.isLaunched).map(_.instanceId)(collection.breakOut)
// The Map built with filterKeys wraps the original map and contains a reference to activeInstanceIds.
// Therefore we materialize it into a new map.
activeInstanceIds.foreach { activeId =>
healthByInstanceId.remove(activeId)
val inactiveInstanceIds: Set[Instance.Id] = instances.filterNot(_.isActive).map(_.instanceId)(collection.breakOut)
inactiveInstanceIds.foreach { inactiveId =>
healthByInstanceId.remove(inactiveId)
}

val checksToPurge = instances.withFilter(!_.isActive).map(instance => {
Expand Down Expand Up @@ -207,7 +206,7 @@ object HealthCheckActor {
healthCheck: HealthCheck,
instanceTracker: InstanceTracker,
eventBus: EventStream,
healthCheckHub: Sink[(AppDefinition, Instance, MarathonHealthCheck, ActorRef), NotUsed])(implicit mat: ActorMaterializer): Props = {
healthCheckHub: Sink[(AppDefinition, Instance, MarathonHealthCheck, ActorRef), NotUsed]): Props = {

Props(new HealthCheckActor(
app,
Expand Down
@@ -0,0 +1,77 @@
package mesosphere.marathon
package integration

import java.util.UUID

import mesosphere.AkkaIntegrationTest
import mesosphere.marathon.core.task.Task
import mesosphere.marathon.integration.setup.EmbeddedMarathonTest
import mesosphere.marathon.raml.{AppHealthCheck, AppHealthCheckProtocol}
import mesosphere.marathon.state.PathId

import scala.concurrent.duration._

class HealthCheckIntegrationTest extends AkkaIntegrationTest with EmbeddedMarathonTest {

def appId(suffix: Option[String] = None): PathId = testBasePath / s"app-${suffix.getOrElse(UUID.randomUUID)}"

"Health checks" should {
"kill instance with failing Marathon health checks" in {
Given("a deployed app with health checks")
val id = appId(Some(s"replace-marathon-http-health-check"))
val app = appProxy(id, "v1", instances = 1, healthCheck = None).
copy(healthChecks = Set(ramlHealthCheck(AppHealthCheckProtocol.Http)))
val check = registerAppProxyHealthCheck(id, "v1", state = true)
val result = marathon.createAppV2(app)
result should be(Created)
waitForDeployment(result)

When("the app becomes unhealthy")
val oldTaskId = marathon.tasks(id).value.head.id
check.afterDelay(1.seconds, false)

Then("the unhealthy instance is killed")
waitForEventWith("unhealthy_instance_kill_event", { event => event.info("taskId") == oldTaskId })

And("a replacement is started")
check.afterDelay(1.seconds, true)
eventually {
val currentTasks = marathon.tasks(id).value
currentTasks should have size (1)
currentTasks.map(_.id) should not contain (oldTaskId)
}
}

"kill instance with failing Mesos health checks" in {
Given("a deployed app with health checks")
val id = appId(Some(s"replace-mesos-http-health-check"))
val app = appProxy(id, "v1", instances = 1, healthCheck = None).
copy(healthChecks = Set(ramlHealthCheck(AppHealthCheckProtocol.Http)))
val check = registerAppProxyHealthCheck(id, "v1", state = true)
val result = marathon.createAppV2(app)
result should be(Created)
waitForDeployment(result)

When("the app becomes unhealthy")
val oldTaskId = marathon.tasks(id).value.head.id
val oldInstanceId = Task.Id(oldTaskId).instanceId.idString
check.afterDelay(1.seconds, false)

Then("the unhealthy instance is killed")
waitForEventWith("instance_changed_event", { event => event.info("condition") == "Killed" && event.info("instanceId") == oldInstanceId })

Then("the unhealthy instance is killed")
waitForEvent("unhealthy_instance_kill_event")
}
}

private def ramlHealthCheck(protocol: AppHealthCheckProtocol) = AppHealthCheck(
path = Some("/health"),
protocol = protocol,
gracePeriodSeconds = 3,
intervalSeconds = 1,
maxConsecutiveFailures = 3,
portIndex = Some(0),
delaySeconds = 3
)
}

0 comments on commit 8a92920

Please sign in to comment.