Skip to content

Commit 3c163fa

Browse files
authored
Fix SlaEnforcer error from no previous job record (#125)
* Avoid slaEnforcer on empty cluster * Use cluster default for quickSubmit * Refactor if format
1 parent fc65856 commit 3c163fa

File tree

3 files changed

+53
-15
lines changed

3 files changed

+53
-15
lines changed

mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107
import io.mantisrx.master.jobcluster.proto.JobClusterProto.KillJobRequest;
108108
import io.mantisrx.master.jobcluster.proto.JobProto;
109109
import io.mantisrx.runtime.JobConstraints;
110+
import io.mantisrx.runtime.JobSla;
110111
import io.mantisrx.runtime.descriptor.StageSchedulingInfo;
111112
import io.mantisrx.server.core.JobCompletedReason;
112113
import io.mantisrx.server.master.ConstraintsEvaluators;
@@ -127,8 +128,20 @@
127128
import io.mantisrx.shaded.com.google.common.collect.Lists;
128129
import java.time.Duration;
129130
import java.time.Instant;
130-
import java.util.*;
131-
import java.util.concurrent.*;
131+
import java.util.ArrayList;
132+
import java.util.Collections;
133+
import java.util.HashMap;
134+
import java.util.HashSet;
135+
import java.util.Iterator;
136+
import java.util.List;
137+
import java.util.Map;
138+
import java.util.Objects;
139+
import java.util.Optional;
140+
import java.util.Set;
141+
import java.util.TreeSet;
142+
import java.util.concurrent.CompletionStage;
143+
import java.util.concurrent.ConcurrentHashMap;
144+
import java.util.concurrent.ConcurrentMap;
132145
import java.util.stream.Collectors;
133146
import org.slf4j.Logger;
134147
import org.slf4j.LoggerFactory;
@@ -1235,6 +1248,7 @@ public void onJobClusterEnable(final EnableJobClusterRequest req) {
12351248

12361249
//start SLA timer
12371250
setBookkeepingTimer(BOOKKEEPING_INTERVAL_SECS);
1251+
12381252
eventPublisher.publishAuditEvent(
12391253
new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_ENABLED,
12401254
this.jobClusterMetadata.getJobClusterDefinition().getName(), name + " enabled")
@@ -1440,11 +1454,32 @@ private JobDefinition getResolvedJobDefinition(final String user, final Optional
14401454
// for request inheriting from non-terminal jobs, it has been sent to job actor instead.
14411455
Optional<JobDefinition> jobDefnOp = cloneJobDefinitionForQuickSubmitFromArchivedJobs(
14421456
jobManager.getCompletedJobsList(), empty(), jobStore);
1443-
if(jobDefnOp.isPresent()) {
1457+
if (jobDefnOp.isPresent()) {
14441458
logger.info("Inherited scheduling Info and parameters from previous job");
14451459
resolvedJobDefn = jobDefnOp.get();
1446-
} else {
1447-
throw new Exception("Job Definition could not retrieved from a previous submission (There may not be a previous submission)");
1460+
} else if (this.jobClusterMetadata != null
1461+
&& this.jobClusterMetadata.getJobClusterDefinition() != null &&
1462+
this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig() != null) {
1463+
logger.info("No previous job definition found. Fall back to cluster definition: {}", this.name);
1464+
IJobClusterDefinition clusterDefinition = this.jobClusterMetadata.getJobClusterDefinition();
1465+
JobClusterConfig clusterConfig =
1466+
this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig();
1467+
1468+
resolvedJobDefn = new JobDefinition.Builder()
1469+
.withJobSla(new JobSla.Builder().build())
1470+
.withArtifactName(clusterConfig.getArtifactName())
1471+
.withVersion(clusterConfig.getVersion())
1472+
.withLabels(clusterDefinition.getLabels())
1473+
.withName(this.name)
1474+
.withParameters(clusterDefinition.getParameters())
1475+
.withSchedulingInfo(clusterConfig.getSchedulingInfo())
1476+
.withUser(user)
1477+
.build();
1478+
logger.info("Built job definition from cluster definition: {}", resolvedJobDefn);
1479+
}
1480+
else {
1481+
throw new Exception("Job Definition could not retrieved from a previous submission (There may "
1482+
+ "not be a previous submission)");
14481483
}
14491484
}
14501485

mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/v0/JobClusterRouteTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -326,11 +326,11 @@ public void testJobClusterQuickSubmit() throws InterruptedException {
326326
HttpRequest.POST(namedJobAPIEndpoint("quicksubmit"))
327327
.withEntity(ContentTypes.create(MediaTypes.APPLICATION_X_WWW_FORM_URLENCODED), JobClusterPayloads.QUICK_SUBMIT));
328328
responseFuture
329-
.thenCompose(r -> processRespFut(r, 400))
329+
.thenCompose(r -> processRespFut(r, 200))
330330
.whenComplete((msg, t) -> {
331331
String responseMessage = getResponseMessage(msg, t);
332332
logger.info("got response {}", responseMessage);
333-
assertTrue(responseMessage.contains("Job Definition could not retrieved from a previous submission (There may not be a previous submission)"));
333+
assertTrue(responseMessage.contains("sine-function-1"));
334334
latch.countDown();
335335
});
336336
assertTrue(latch.await(1, TimeUnit.SECONDS));
@@ -385,7 +385,7 @@ public void testJobClusterGetDetail() throws InterruptedException {
385385
assertEquals("sine-function", jc.getName());
386386
// TODO fix Jars list
387387
assertEquals(2, jc.getJars().size());
388-
assertEquals(1, jc.getJars().get(0).getSchedulingInfo().getStages().size());
388+
assertEquals(2, jc.getJars().get(0).getSchedulingInfo().getStages().size());
389389
assertEquals(1, jc.getJars().get(0).getSchedulingInfo().getStages().get(1).getNumberOfInstances());
390390
assertEquals(true, jc.getJars().get(0).getSchedulingInfo().getStages().get(1).getScalable());
391391
assertEquals("sine-function", jc.getName());
@@ -412,7 +412,7 @@ public void testJobClusterGetJobIds() throws InterruptedException {
412412
logger.info("got response {}", responseMessage);
413413
List<JobClusterProtoAdapter.JobIdInfo> jobIdInfos = Jackson.fromJSON(responseMessage, new TypeReference<List<JobClusterProtoAdapter.JobIdInfo>>() {
414414
});
415-
assertEquals(0, jobIdInfos.size());
415+
assertEquals(1, jobIdInfos.size());
416416
} catch (Exception e) {
417417
fail("unexpected error "+ e.getMessage());
418418
}

mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterTest.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static io.mantisrx.master.jobcluster.JobClusterActor.JobInfo;
2020
import static io.mantisrx.master.jobcluster.JobClusterActor.props;
2121
import static io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode.CLIENT_ERROR;
22-
import static io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND;
2322
import static io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode.SERVER_ERROR;
2423
import static io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode.SUCCESS;
2524
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DisableJobClusterRequest;
@@ -51,7 +50,8 @@
5150
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterResponse;
5251
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterSLAResponse;
5352
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyResponse;
54-
import static java.util.Optional.*;
53+
import static java.util.Optional.empty;
54+
import static java.util.Optional.of;
5555
import static org.junit.Assert.assertEquals;
5656
import static org.junit.Assert.assertFalse;
5757
import static org.junit.Assert.assertTrue;
@@ -105,7 +105,10 @@
105105
import io.mantisrx.runtime.WorkerMigrationConfig;
106106
import io.mantisrx.runtime.WorkerMigrationConfig.MigrationStrategyEnum;
107107
import io.mantisrx.runtime.command.InvalidJobException;
108-
import io.mantisrx.runtime.descriptor.*;
108+
import io.mantisrx.runtime.descriptor.DeploymentStrategy;
109+
import io.mantisrx.runtime.descriptor.SchedulingInfo;
110+
import io.mantisrx.runtime.descriptor.StageDeploymentStrategy;
111+
import io.mantisrx.runtime.descriptor.StageScalingPolicy;
109112
import io.mantisrx.server.core.JobCompletedReason;
110113
import io.mantisrx.server.core.Status;
111114
import io.mantisrx.server.core.Status.TYPE;
@@ -1969,12 +1972,12 @@ public void testQuickJobSubmitWithNoPreviousHistoryFails() {
19691972
final JobDefinition jobDefn = null;
19701973
String jobId = clusterName + "-1";
19711974

1972-
JobTestHelper.submitJobAndVerifyStatus(probe, clusterName, jobClusterActor, jobDefn, null, CLIENT_ERROR);
1975+
JobTestHelper.submitJobAndVerifyStatus(probe, clusterName, jobClusterActor, jobDefn, jobId, SUCCESS);
19731976

1974-
JobTestHelper.getJobDetailsAndVerify(probe, jobClusterActor, jobId, CLIENT_ERROR_NOT_FOUND, JobState.Noop);
1977+
JobTestHelper.getJobDetailsAndVerify(probe, jobClusterActor, jobId, SUCCESS, JobState.Accepted);
19751978

19761979
verify(jobStoreMock, times(1)).createJobCluster(any());
1977-
verify(jobStoreMock, times(0)).updateJobCluster(any());
1980+
verify(jobStoreMock, times(1)).updateJobCluster(any());
19781981
} catch (Exception e) {
19791982
e.printStackTrace();
19801983
fail();

0 commit comments

Comments
 (0)