Skip to content

Commit 6738e9b

Browse files
committed
deal with more failures and null things; log more needful things
1 parent: 1f909f3 · commit: 6738e9b

File tree

1 file changed

+47
-33
lines changed

1 file changed

+47
-33
lines changed

app/org/thp/cortex/services/K8sJobRunnerSrv.scala

Lines changed: 47 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -1,30 +1,22 @@
11
package org.thp.cortex.services
22

33
import java.util.concurrent.TimeUnit
4-
import java.nio.charset.StandardCharsets
54
import java.nio.file._
65

76
import scala.concurrent.duration.FiniteDuration
87
import scala.concurrent.{ExecutionContext, Future}
9-
import scala.util.Try
8+
import scala.util.{Try, Success, Failure}
109
import scala.collection.JavaConverters._
1110

12-
import play.api.libs.json.Json
1311
import play.api.{Configuration, Logger}
1412

1513
import akka.actor.ActorSystem
16-
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, ConfigBuilder, Watcher}
17-
import io.fabric8.kubernetes.api.model.batch.{Job => KJob, JobBuilder => KJobBuilder}
14+
import io.fabric8.kubernetes.client.{DefaultKubernetesClient}
15+
import io.fabric8.kubernetes.api.model.batch.{JobBuilder => KJobBuilder}
1816
import io.fabric8.kubernetes.api.model.{PersistentVolumeClaimVolumeSourceBuilder}
19-
// import com.spotify.docker.client.DockerClient.LogsParam
20-
// import com.spotify.docker.client.messages.HostConfig.Bind
21-
// import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
22-
// import com.spotify.docker.client.{DefaultDockerClient, DockerClient}
2317
import javax.inject.{Inject, Singleton}
2418
import org.thp.cortex.models._
2519

26-
import org.elastic4play.utils.RichFuture
27-
2820
@Singleton
2921
class K8sJobRunnerSrv(
3022
client: DefaultKubernetesClient,
@@ -58,29 +50,27 @@ class K8sJobRunnerSrv(
5850
}.get
5951

6052
def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext): Future[Unit] = {
61-
// Spicy meatball: under Kubernetes, executions can fail for reasons
62-
// other than bad inputs, for example, if a node dies. So we maybe
63-
// can't say, "Kubernetes, only do this once! I will do any
64-
// necessary retrying." But there may be quotas on analyzer usage,
65-
// and retrying outside Cortex may use them up.
66-
6753
val cacertsFile = jobDirectory.resolve("input").resolve("cacerts")
68-
54+
val relativeJobDirectory = jobBaseDirectory.relativize(jobDirectory).toString()
55+
// make the default longer than likely values, but still not infinite
56+
val timeout_or_default = timeout getOrElse new FiniteDuration(8, TimeUnit.HOURS)
57+
// https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
6958
// FIXME: this collapses case, jeopardizing the uniqueness of the
70-
// identifier.
71-
val kname = "_".r.replaceAllIn(job.id.map(_.toLower), "-")
59+
// identifier. LDH: lowercase, digits, hyphens.
60+
val ldh_jobid = "_".r.replaceAllIn(job.id.map(_.toLower), "-")
61+
val kjobName = "neuron-job-" + ldh_jobid
7262
val pvcvs = new PersistentVolumeClaimVolumeSourceBuilder()
7363
.withClaimName(persistentVolumeClaimName)
7464
.withReadOnly(false)
7565
.build();
7666
val kjob1 = new KJobBuilder()
7767
.withApiVersion("batch/v1")
7868
.withNewMetadata()
79-
.withName(kname)
69+
.withName(kjobName)
8070
.withLabels(Map(
8171
"cortex-job-id" -> job.id,
8272
"cortex-worker-id" -> job.workerId(),
83-
"cortex-job" -> "true").asJava)
73+
"cortex-neuron-job" -> "true").asJava)
8474
.endMetadata()
8575
.withNewSpec()
8676
.withNewTemplate()
@@ -95,7 +85,7 @@ class K8sJobRunnerSrv(
9585
.withArgs("/job")
9686
.addNewEnv()
9787
.withName("CORTEX_JOB_FOLDER")
98-
.withValue(jobBaseDirectory.relativize(jobDirectory).toString())
88+
.withValue(relativeJobDirectory)
9989
.endEnv();
10090
val kjob2 = if (Files.exists(cacertsFile)) {
10191
kjob1.addNewEnv()
@@ -124,20 +114,44 @@ class K8sJobRunnerSrv(
124114
.endTemplate()
125115
.endSpec()
126116
.build();
127-
logger.info(s"Constructed k8s Job ${kjob3.getMetadata().getName()}\n")
128117

129118
val execution = Future {
130119
val created_kjob = client.batch().jobs().create(kjob3)
131-
logger.info(s"Created k8s Job ${created_kjob.getMetadata().getName()}")
132-
// FIXME: use the given timeout value
133-
val ended_kjob = client.batch().jobs().withName(kname)
134-
.waitUntilCondition(j => (j.getStatus().getFailed() > 0 || j.getStatus().getSucceeded() > 0),
135-
5, TimeUnit.MINUTES );
136-
()
120+
val created_env = created_kjob
121+
.getSpec().getTemplate().getSpec().getContainers().get(0)
122+
.getEnv().asScala;
123+
logger.info(
124+
s"Created Kubernetes Job ${created_kjob.getMetadata().getName()}\n" +
125+
s" timeout: ${timeout_or_default.toString}\n" +
126+
s" image : $dockerImage\n" +
127+
s" mount : pvc ${persistentVolumeClaimName} subdir ${relativeJobDirectory} as /job" +
128+
created_env.map(ev => s"\n env : ${ev.getName()} = ${ev.getValue()}").mkString)
129+
val ended_kjob = client.batch().jobs().withLabel("cortex-job-id", job.id)
130+
.waitUntilCondition((x => Option(x).flatMap(j =>
131+
Option(j.getStatus).flatMap(s =>
132+
Some(s.getConditions.asScala.map(_.getType).filter(t =>
133+
t.equals("Complete") || t.equals("Failed")).nonEmpty)))
134+
getOrElse false),
135+
timeout_or_default.length, timeout_or_default.unit);
136+
if(ended_kjob != null) {
137+
logger.info(s"Kubernetes Job ${ended_kjob.getMetadata().getName()} " +
138+
s"(for job ${job.id}) status is now ${ended_kjob.getStatus().toString()}")
139+
} else {
140+
logger.info(s"Kubernetes Job for ${job.id} no longer exists")
141+
}
137142
}.andThen {
138-
case r =>
139-
val foo_kjob = client.batch().jobs().withName(kname).get()
140-
logger.info(s"k8s Job ${foo_kjob.getMetadata().getUid()} status ${foo_kjob.getStatus().toString()}")
143+
// let's find the job by the attribute we know is fundamentally
144+
// unique, rather than one constructed from it
145+
case Success(r) =>
146+
val deleted = client.batch().jobs().withLabel("cortex-job-id", job.id).delete()
147+
if(deleted) {
148+
logger.info(s"Deleted Kubernetes Job for job ${job.id}")
149+
} else {
150+
logger.info(s"While trying to delete Kubernetes Job for ${job.id}, the job was not found; this is OK")
151+
}
152+
Future {}
153+
case Failure(t) =>
154+
logger.warn(s"Some problem happened; not deleting Kubernetes Job for job ${job.id}")
141155
Future {}
142156
}
143157
execution

0 commit comments

Comments (0)