Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

k8s: Use contexts instead of namespaces for flexibility #228

Open
wants to merge 3 commits into
base: 0.11.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ services:
branches:
only:
- master
- 0.9.x
- /0\.[0-9]+\.x/

install:
- bin/install-promtool
Expand Down
1 change: 0 additions & 1 deletion core/src/main/resources/nelson/defaults.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ nelson {
# scheduler {
# scheduler = "nomad"
# kubernetes {
# in-cluster = false
# timeout = 10 seconds
# kubeconfig = "/opt/application/conf/kubeconfig"
# }
Expand Down
15 changes: 5 additions & 10 deletions core/src/main/scala/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package nelson

import nelson.BannedClientsConfig.HttpUserAgent
import nelson.Infrastructure.KubernetesMode
import nelson.audit.{Auditor,AuditEvent}
import nelson.cleanup.ExpirationPolicy
import nelson.docker.Docker
Expand Down Expand Up @@ -542,14 +541,10 @@ object Config {
})
}

def readKubernetesOutClusterParams(kfg: KConfig): Option[KubernetesMode] =
kfg.lookup[String]("kubeconfig").map(kubeconfig => KubernetesMode.OutCluster(Paths.get(kubeconfig)))

def readKubernetesInfrastructure(kfg: KConfig): Option[Infrastructure.Kubernetes] = for {
inCluster <- kfg.lookup[Boolean]("in-cluster")
mode <- if (inCluster) Some(KubernetesMode.InCluster) else readKubernetesOutClusterParams(kfg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will also break any existing configuration files in the field.

timeout <- kfg.lookup[FiniteDuration]("timeout")
} yield Infrastructure.Kubernetes(mode, timeout)
kubeconfig <- kfg.lookup[String]("kubeconfig").map(k => Paths.get(k))
timeout <- kfg.lookup[FiniteDuration]("timeout")
} yield Infrastructure.Kubernetes(kubeconfig, timeout)

def readNomadScheduler(kfg: KConfig): IO[SchedulerOp ~> IO] =
readNomadInfrastructure(kfg) match {
Expand Down Expand Up @@ -614,8 +609,8 @@ object Config {

case Some("kubernetes") =>
readKubernetesInfrastructure(schedConfig.subconfig("kubernetes")) match {
case Some(Infrastructure.Kubernetes(mode, timeout)) =>
val kubectl = new Kubectl(mode)
case Some(Infrastructure.Kubernetes(kubeconfig, timeout)) =>
val kubectl = new Kubectl(kubeconfig)
IO.pure((
new KubernetesShell(kubectl, timeout, ec, schedulerPool),
new KubernetesHealthClient(kubectl, timeout, ec, schedulerPool),
Expand Down
14 changes: 1 addition & 13 deletions core/src/main/scala/Datacenter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,10 @@ object Infrastructure {
)

final case class Kubernetes(
mode: KubernetesMode,
kubeconfig: Path,
timeout: FiniteDuration
)

sealed abstract class KubernetesMode extends Product with Serializable {
def environment: List[(String, String)] = this match {
case KubernetesMode.InCluster => List.empty
case KubernetesMode.OutCluster(path) => List(("KUBECONFIG", path.toString))
}
}

object KubernetesMode {
final case object InCluster extends KubernetesMode
final case class OutCluster(kubeconfig: Path) extends KubernetesMode
}

final case class Credentials(
username: String,
password: String
Expand Down
24 changes: 12 additions & 12 deletions core/src/main/scala/Kubectl.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package nelson

import nelson.Datacenter.StackName
import nelson.Infrastructure.KubernetesMode
import nelson.health.{HealthCheck, HealthStatus, Passing, Failing, Unknown}

import argonaut.{CursorHistory, DecodeJson, Parse}
Expand All @@ -14,25 +13,26 @@ import fs2.Stream

import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Path

import scala.sys.process.{Process, ProcessLogger}
import scala.collection.mutable.ListBuffer

final class Kubectl(mode: KubernetesMode) {
final class Kubectl(kubeconfig: Path) {
import Kubectl._

def apply(payload: String): IO[String] = {
def apply(payload: String, namespace: NamespaceName): IO[String] = {
val input = IO { new ByteArrayInputStream(payload.getBytes(UTF_8)) }
for {
result <- exec(List("kubectl", "apply", "-f", "-"), input)
result <- exec(List("kubectl", "apply", "--context", namespace.root.asString, "-f", "-"), input)
output <- result.output
} yield output.mkString("\n")
}

def delete(payload: String): IO[String] = {
def delete(payload: String, namespace: NamespaceName): IO[String] = {
val input = IO { new ByteArrayInputStream(payload.getBytes(UTF_8)) }
for {
result <- exec(List("kubectl", "delete", "-f", "-"), input)
result <- exec(List("kubectl", "delete", "--context", namespace.root.asString, "-f", "-"), input)
output <- result.output
} yield output.mkString("\n")
}
Expand All @@ -53,7 +53,7 @@ final class Kubectl(mode: KubernetesMode) {

def getPods(namespace: NamespaceName, stackName: StackName): IO[List[HealthStatus]] = {
implicit val healthStatusDecoder = healthStatusDecodeJson
exec(List("kubectl", "get", "pods", "-l", s"stackName=${stackName.toString}", "-n", namespace.root.asString, "-o", "json"), emptyStdin)
exec(List("kubectl", "get", "pods", "-l", s"stackName=${stackName.toString}", "--context", namespace.root.asString, "-o", "json"), emptyStdin)
.flatMap(_.output)
.flatMap { stdout =>
IO.fromEither(for {
Expand All @@ -64,14 +64,14 @@ final class Kubectl(mode: KubernetesMode) {
}

def getDeployment(namespace: NamespaceName, stackName: StackName): IO[DeploymentStatus] =
exec(List("kubectl", "get", "deployment", stackName.toString, "-n", namespace.root.asString, "-o", "json"), emptyStdin)
exec(List("kubectl", "get", "deployment", stackName.toString, "--context", namespace.root.asString, "-o", "json"), emptyStdin)
.flatMap(_.output)
.flatMap { stdout =>
IO.fromEither(Parse.decodeEither[DeploymentStatus](stdout.mkString("\n")).leftMap(kubectlJsonError))
}

def getCronJob(namespace: NamespaceName, stackName: StackName): IO[JobStatus] =
exec(List("kubectl", "get", "job", "-l", s"stackName=${stackName.toString}", "-n", namespace.root.asString, "-o", "json"), emptyStdin)
exec(List("kubectl", "get", "job", "-l", s"stackName=${stackName.toString}", "--context", namespace.root.asString, "-o", "json"), emptyStdin)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually this is very breaking... this assumes that the kubeconfig files people are using are structured in a particular manner, correct? This would break my setup today.

.flatMap(_.output)
.flatMap { stdout =>
IO.fromEither(for {
Expand All @@ -81,14 +81,14 @@ final class Kubectl(mode: KubernetesMode) {
}

def getJob(namespace: NamespaceName, stackName: StackName): IO[JobStatus] =
exec(List("kubectl", "get", "job", stackName.toString, "-n", namespace.root.asString, "-o", "json"), emptyStdin)
exec(List("kubectl", "get", "job", stackName.toString, "--context", namespace.root.asString, "-o", "json"), emptyStdin)
.flatMap(_.output)
.flatMap { stdout =>
IO.fromEither(Parse.decodeEither[JobStatus](stdout.mkString("\n")).leftMap(kubectlJsonError))
}

private def deleteV1(namespace: String, objectType: String, name: String): IO[Output] =
exec(List("kubectl", "delete", objectType, name, "-n", namespace), emptyStdin)
exec(List("kubectl", "delete", objectType, name, "--context", namespace), emptyStdin)

private def exec(cmd: List[String], stdin: IO[InputStream]): IO[Output] = {
// We need the new cats-effect resource safety hotness..
Expand All @@ -98,7 +98,7 @@ final class Kubectl(mode: KubernetesMode) {
stdout <- IO(ListBuffer.empty[String])
stderr <- IO(ListBuffer.empty[String])
logger <- IO(ProcessLogger(sout => { stdout += sout; () }, serr => { stderr += serr; () }))
exitCode <- IO((Process(cmd, None, mode.environment: _*) #< is).run(logger).exitValue)
exitCode <- IO((Process(cmd, None, ("KUBECONFIG", kubeconfig.toString)) #< is).run(logger).exitValue)
} yield Output(stdout.toList, stderr.toList, exitCode)
}
}, is => IO(is.close()))
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/scheduler/KubernetesShell.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ final class KubernetesShell(
}
}

deployment.renderedBlueprint.fold(fallback)(spec => kubectl.delete(spec).void)
deployment.renderedBlueprint.fold(fallback)(spec => kubectl.delete(spec, ns).void)
}

// Janky heuristic to see if an attempted (legacy) deletion failed because
Expand Down Expand Up @@ -89,7 +89,7 @@ final class KubernetesShell(

for {
t <- template
r <- kubectl.apply(t.render(env))
r <- kubectl.apply(t.render(env), ns)
} yield r
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/test/resources/nelson/datacenters-missing-consul.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ nelson {
scheduler {
scheduler = "kubernetes"
kubernetes {
in-cluster = true
kubeconfig = "~/.kube/config"
timeout = 1 second
}
}
Expand Down Expand Up @@ -68,7 +68,7 @@ nelson {
scheduler {
scheduler = "kubernetes"
kubernetes {
in-cluster = true
kubeconfig = /foo/bar/.kube/config
timeout = 1 second
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ nelson {
scheduler {
scheduler = "kubernetes"
kubernetes {
in-cluster = true
kubeconfig = "~/.kube/config"
timeout = 1 second
}
}
Expand Down
3 changes: 1 addition & 2 deletions core/src/test/resources/nelson/datacenters.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ nelson {
scheduler {
scheduler = "kubernetes"
kubernetes {
in-cluster = true
kubeconfig = "~/.kube/config"
timeout = 1 second
}
}
Expand Down Expand Up @@ -77,7 +77,6 @@ nelson {
scheduler {
scheduler = "kubernetes"
kubernetes {
in-cluster = false
kubeconfig = "/foo/bar/.kube/config"
timeout = 2 seconds
}
Expand Down
9 changes: 3 additions & 6 deletions docs/src/hugo/content/getting-started/install.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,10 @@ nelson {
scheduler {
scheduler = "kubernetes"
kubernetes {
# is nelson hosted on the cluster it is managing?
in-cluster = false
# in the event that Nelson is running on a substrate
# outside of the deployment clusters, specify the path
# to the kubeconfig which will dictate how kubectl does
# authentication with the cluster and so forth.
# specify the path to the kubeconfig which will dictate
# kubectl do authentication with the cluster and so forth.
kubeconfig = /path/to/.kube/config
timeout = 10 seconds
}
}

Expand Down
2 changes: 1 addition & 1 deletion etc/development/http/http.dev.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ nelson {
scheduler {
scheduler = "kubernetes"
kubernetes {
in-cluster = true
kubeconfig = "~/.kube/config"
timeout = 1 second
}
}
Expand Down
2 changes: 1 addition & 1 deletion project.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ lazy val http = project.dependsOn(core % "test->test;compile->compile")

enablePlugins(DisablePublishingPlugin)

addCommandAlias("ci", ";test;coverageReport;coverageAggregate;tut;unidoc")
addCommandAlias("ci", ";test;coverageReport;coverageAggregate")