
Commit 15ed398

add K8sJobRunnerSrv, which runs Cortex jobs using Kubernetes Jobs
1 parent da75a6e commit 15ed398

8 files changed, 184 additions and 3 deletions

app/org/thp/cortex/services/JobRunnerSrv.scala

Lines changed: 11 additions & 0 deletions
@@ -30,6 +30,7 @@ class JobRunnerSrv @Inject() (
     artifactModel: ArtifactModel,
     processJobRunnerSrv: ProcessJobRunnerSrv,
     dockerJobRunnerSrv: DockerJobRunnerSrv,
+    k8sJobRunnerSrv: K8sJobRunnerSrv,
     workerSrv: WorkerSrv,
     createSrv: CreateSrv,
     updateSrv: UpdateSrv,
@@ -49,6 +50,7 @@ class JobRunnerSrv @Inject() (
       .getOrElse(Seq("docker", "process"))
       .map(_.toLowerCase)
       .collect {
+        case "kubernetes" if k8sJobRunnerSrv.isAvailable => "kubernetes"
         case "docker" if dockerJobRunnerSrv.isAvailable => "docker"
         case "process" =>
           Seq("", "2", "3").foreach { pythonVersion =>
@@ -67,6 +69,7 @@ class JobRunnerSrv @Inject() (

   lazy val processRunnerIsEnable: Boolean = runners.contains("process")
   lazy val dockerRunnerIsEnable: Boolean = runners.contains("docker")
+  lazy val k8sRunnerIsEnable: Boolean = runners.contains("kubernetes")

   private object deleteVisitor extends SimpleFileVisitor[Path] {
     override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
@@ -209,6 +212,14 @@ class JobRunnerSrv @Inject() (
       _ <- startJob(job)
       j <- runners
         .foldLeft[Option[Future[Unit]]](None) {
+          case (None, "kubernetes") =>
+            worker
+              .dockerImage()
+              .map(dockerImage => k8sJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes))(executionContext))
+              .orElse {
+                logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)")
+                None
+              }
           case (None, "docker") =>
             worker
               .dockerImage()
app/org/thp/cortex/services/K8sJobRunnerSrv.scala

Lines changed: 159 additions & 0 deletions
@@ -0,0 +1,159 @@
+package org.thp.cortex.services
+
+import java.util.concurrent.TimeUnit
+import java.nio.file._
+
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Try, Success, Failure}
+import scala.collection.JavaConverters._
+
+import play.api.{Configuration, Logger}
+
+import akka.actor.ActorSystem
+import io.fabric8.kubernetes.client.{DefaultKubernetesClient}
+import io.fabric8.kubernetes.api.model.batch.{JobBuilder => KJobBuilder}
+import io.fabric8.kubernetes.api.model.{PersistentVolumeClaimVolumeSourceBuilder}
+import javax.inject.{Inject, Singleton}
+import org.thp.cortex.models._
+
+@Singleton
+class K8sJobRunnerSrv(
+    client: DefaultKubernetesClient,
+    autoUpdate: Boolean,
+    jobBaseDirectory: Path,
+    persistentVolumeClaimName: String,
+    implicit val system: ActorSystem
+) {
+
+  @Inject()
+  def this(config: Configuration, system: ActorSystem) =
+    this(
+      new DefaultKubernetesClient(),
+      config.getOptional[Boolean]("job.kubernetes.autoUpdate").getOrElse(true),
+      Paths.get(config.get[String]("job.directory")),
+      config.get[String]("job.kubernetes.persistentVolumeClaimName"),
+      system: ActorSystem
+    )
+
+  lazy val logger = Logger(getClass)
+
+  lazy val isAvailable: Boolean =
+    Try {
+      val ver = client.getVersion()
+      logger.info(s"Kubernetes is available: major ${ver.getMajor()} minor ${ver.getMinor()} git ${ver.getGitCommit()}")
+      true
+    }.recover {
+      case error =>
+        logger.info(s"Kubernetes is not available", error)
+        false
+    }.get
+
+  def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext): Future[Unit] = {
+    val cacertsFile = jobDirectory.resolve("input").resolve("cacerts")
+    val relativeJobDirectory = jobBaseDirectory.relativize(jobDirectory).toString()
+    // make the default longer than likely values, but still not infinite
+    val timeout_or_default = timeout getOrElse new FiniteDuration(8, TimeUnit.HOURS)
+    // https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
+    // FIXME: this collapses case, jeopardizing the uniqueness of the
+    // identifier. LDH: lowercase, digits, hyphens.
+    val ldh_jobid = "_".r.replaceAllIn(job.id.map(_.toLower), "-")
+    val kjobName = "neuron-job-" + ldh_jobid
+    val pvcvs = new PersistentVolumeClaimVolumeSourceBuilder()
+      .withClaimName(persistentVolumeClaimName)
+      .withReadOnly(false)
+      .build();
+    val kjob1 = new KJobBuilder()
+      .withApiVersion("batch/v1")
+      .withNewMetadata()
+      .withName(kjobName)
+      .withLabels(Map(
+        "cortex-job-id" -> job.id,
+        "cortex-worker-id" -> job.workerId(),
+        "cortex-neuron-job" -> "true").asJava)
+      .endMetadata()
+      .withNewSpec()
+      .withNewTemplate()
+      .withNewSpec()
+      .addNewVolume()
+      .withName("job-directory")
+      .withPersistentVolumeClaim(pvcvs)
+      .endVolume()
+      .addNewContainer()
+      .withName("neuron")
+      .withImage(dockerImage)
+      .withArgs("/job")
+      .addNewEnv()
+      .withName("CORTEX_JOB_FOLDER")
+      .withValue(relativeJobDirectory)
+      .endEnv();
+    val kjob2 = if (Files.exists(cacertsFile)) {
+      kjob1.addNewEnv()
+        .withName("REQUESTS_CA_BUNDLE")
+        .withValue("/job/input/cacerts")
+        .endEnv()
+    } else {
+      kjob1
+    }
+    val kjob3 = kjob2
+      .addNewVolumeMount()
+      .withName("job-directory")
+      .withSubPathExpr("$(CORTEX_JOB_FOLDER)/input")
+      .withMountPath("/job/input")
+      .withReadOnly(true)
+      .endVolumeMount()
+      .addNewVolumeMount()
+      .withName("job-directory")
+      .withSubPathExpr("$(CORTEX_JOB_FOLDER)/output")
+      .withMountPath("/job/output")
+      .withReadOnly(false)
+      .endVolumeMount()
+      .endContainer()
+      .withRestartPolicy("Never")
+      .endSpec()
+      .endTemplate()
+      .endSpec()
+      .build();
+
+    val execution = Future {
+      val created_kjob = client.batch().jobs().create(kjob3)
+      val created_env = created_kjob
+        .getSpec().getTemplate().getSpec().getContainers().get(0)
+        .getEnv().asScala;
+      logger.info(
+        s"Created Kubernetes Job ${created_kjob.getMetadata().getName()}\n" +
+          s"  timeout: ${timeout_or_default.toString}\n" +
+          s"  image  : $dockerImage\n" +
+          s"  mount  : pvc ${persistentVolumeClaimName} subdir ${relativeJobDirectory} as /job" +
+          created_env.map(ev => s"\n  env    : ${ev.getName()} = ${ev.getValue()}").mkString)
+      val ended_kjob = client.batch().jobs().withLabel("cortex-job-id", job.id)
+        .waitUntilCondition((x => Option(x).flatMap(j =>
+          Option(j.getStatus).flatMap(s =>
+            Some(s.getConditions.asScala.map(_.getType).filter(t =>
+              t.equals("Complete") || t.equals("Failed")).nonEmpty)))
+          getOrElse false),
+          timeout_or_default.length, timeout_or_default.unit);
+      if(ended_kjob != null) {
+        logger.info(s"Kubernetes Job ${ended_kjob.getMetadata().getName()} " +
+          s"(for job ${job.id}) status is now ${ended_kjob.getStatus().toString()}")
+      } else {
+        logger.info(s"Kubernetes Job for ${job.id} no longer exists")
+      }
+    }.andThen {
+      // let's find the job by the attribute we know is fundamentally
+      // unique, rather than one constructed from it
+      case Success(r) =>
+        val deleted = client.batch().jobs().withLabel("cortex-job-id", job.id).delete()
+        if(deleted) {
+          logger.info(s"Deleted Kubernetes Job for job ${job.id}")
+        } else {
+          logger.info(s"While trying to delete Kubernetes Job for ${job.id}, the job was not found; this is OK")
+        }
+        Future {}
+      case Failure(t) =>
+        logger.warn(s"Some problem happened; not deleting Kubernetes Job for job ${job.id}")
+        Future {}
+    }
+    execution
+  }
+}
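
The new runner expects the Cortex job directory to live on storage that analyzer pods can also reach: each Kubernetes Job mounts the named ReadWriteMany claim and uses subPathExpr to expose only the current job's input and output folders under /job. A minimal configuration sketch of the keys this service reads, assuming the claim is called cortex-job-pvc and the shared volume is mounted at /opt/cortex/jobs inside the Cortex pod (both names are illustrative, not part of this commit):

job {
  runners = [kubernetes, docker, process]
  # with the kubernetes runner, job folders should be created on the shared volume
  directory = "/opt/cortex/jobs"
  kubernetes {
    # ReadWriteMany claim that holds the job directories (example name)
    persistentVolumeClaimName = "cortex-job-pvc"
    # read by the service; defaults to true
    autoUpdate = true
  }
}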

app/org/thp/cortex/services/WorkerSrv.scala

Lines changed: 5 additions & 1 deletion
@@ -173,10 +173,14 @@ class WorkerSrv @Inject() (
       workerDefinitions.filter {
         case w if w.command.isDefined && jobRunnerSrv.processRunnerIsEnable => true
         case w if w.dockerImage.isDefined && jobRunnerSrv.dockerRunnerIsEnable => true
+        case w if w.dockerImage.isDefined && jobRunnerSrv.k8sRunnerIsEnable => true
         case w =>
           val reason =
             if (w.command.isDefined) "process runner is disabled"
-            else if (w.dockerImage.isDefined) "Docker runner is disabled"
+            else if (w.dockerImage.isDefined && !jobRunnerSrv.dockerRunnerIsEnable)
+              "Docker runner is disabled"
+            else if (w.dockerImage.isDefined && !jobRunnerSrv.k8sRunnerIsEnable)
+              "Kubernetes runner is disabled"
             else "it doesn't have image nor command"

           logger.warn(s"$workerType ${w.name} is disabled because $reason")

build.sbt

Lines changed: 1 addition & 0 deletions
@@ -15,6 +15,7 @@ libraryDependencies ++= Seq(
   Dependencies.reflections,
   Dependencies.zip4j,
   Dependencies.dockerClient,
+  Dependencies.k8sClient,
   Dependencies.akkaCluster,
   Dependencies.akkaClusterTyped
 )

conf/reference.conf

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@ cache {

 job {
   timeout = 30 minutes
-  runners = [docker, process]
+  runners = [kubernetes, docker, process]
   directory = ${java.io.tmpdir}
   dockerDirectory = ${job.directory}
 }

package/docker/entrypoint

Lines changed: 4 additions & 0 deletions
@@ -20,6 +20,7 @@ SHOW_SECRET=${show_secret:-0}
 DAEMON_USER=${daemon_user:-cortex}
 JOB_DIRECTORY=${job_directory:-/tmp/cortex-jobs}
 DOCKER_JOB_DIRECTORY=${docker_job_directory:-}
+KUBERNETES_JOB_PVC=${kubernetes_job_pvc:-}

 function usage {
   cat <<- _EOF_
@@ -33,6 +34,7 @@ function usage {
     --show-secret | show the generated secret
     --job-directory <dir> | use this directory to store job files
     --docker-job-directory <dir> | indicate the job directory in the host (not inside container)
+    --kubernetes-job-pvc <name> | indicate the ReadWriteMany persistent volume claim holding job directory
     --analyzer-url <url> | where analyzers are located (url or path)
     --responder-url <url> | where responders are located (url or path)
     --start-docker | start a internal docker (inside container) to run analyzers/responders
@@ -56,6 +58,7 @@ do
         "--show-secret") SHOW_SECRET=1;;
         "--job-directory") shift; JOB_DIRECTORY=$1;;
         "--docker-job-directory") shift; DOCKER_JOB_DIRECTORY=$1;;
+        "--kubernetes-job-pvc") shift; KUBERNETES_JOB_PVC=$1;;
         "--analyzer-path") echo "--analyzer-path is deprecated, please use --analyzer-url"
                            shift; ANALYZER_URLS+=("$1");;
         "--responder-path") echo "--responder-path is deprecated, please use --responder-url"
@@ -112,6 +115,7 @@ then

   test -n "$JOB_DIRECTORY" && echo "job.directory=\"$JOB_DIRECTORY\"" >> "$CONFIG_FILE"
   test -n "$DOCKER_JOB_DIRECTORY" && echo "job.dockerDirectory=\"$DOCKER_JOB_DIRECTORY\"" >> "$CONFIG_FILE"
+  test -n "$KUBERNETES_JOB_PVC" && echo "job.kubernetes.persistentVolumeClaimName=\"$KUBERNETES_JOB_PVC\"" >> "$CONFIG_FILE"

   function join_urls {
     echo -n "\"$1\""
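
The new entrypoint option only forwards the claim name into the generated configuration as job.kubernetes.persistentVolumeClaimName. A hypothetical invocation of the container, with image tag, directory, and claim name chosen purely for illustration:

docker run thehiveproject/cortex:latest \
  --job-directory /opt/cortex/jobs \
  --kubernetes-job-pvc cortex-job-pvc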

project/Dependencies.scala

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ object Dependencies {
   val zip4j = "net.lingala.zip4j" % "zip4j" % "1.3.2"
   val elastic4play = "org.thehive-project" %% "elastic4play" % "1.12.3"
   val dockerClient = "com.spotify" % "docker-client" % "8.14.4"
+  val k8sClient = "io.fabric8" % "kubernetes-client" % "5.0.2"
   val akkaCluster = "com.typesafe.akka" %% "akka-cluster" % play.core.PlayVersion.akkaVersion
   val akkaClusterTyped = "com.typesafe.akka" %% "akka-cluster-typed" % play.core.PlayVersion.akkaVersion
 }

www/src/app/pages/analyzers/analyzers.service.js

Lines changed: 2 additions & 1 deletion
@@ -43,6 +43,7 @@ export default class AnalyzerService {

       if (def.dockerImage && def.dockerImage !== null) {
         def.runners.push('Docker');
+        def.runners.push('Kubernetes');
       }
     });

@@ -232,4 +233,4 @@ export default class AnalyzerService {
     return this.$http.post('./api/analyzer/' + id + '/run', postData);
   }
 }
-}
+}
