Skip to content

Commit d048a3d

Browse files
authored
Only count resubmission from worker failures (#142)
* Add logging to SinkClient to track uncleaned connections * Only count resubmission from worker failures Co-authored-by: Calvin Cheung <[email protected]>
1 parent 86556dd commit d048a3d

File tree

1 file changed

+3
-4
lines changed
  • mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job

1 file changed

+3
-4
lines changed

mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1328,7 +1328,7 @@ class WorkerManager implements IWorkerManager {
13281328
private String currentJobSchedulingInfoStr = null;
13291329
private final WorkerResubmitRateLimiter resubmitRateLimiter = new WorkerResubmitRateLimiter();
13301330
// Use expiring cache to effectively track worker resubmitted in the last hour.
1331-
private Cache<Integer, Boolean> recentlyResubmittedWorkersCache = CacheBuilder.newBuilder()
1331+
private Cache<Integer, Boolean> recentErrorWorkersCache = CacheBuilder.newBuilder()
13321332
.expireAfterWrite(1, TimeUnit.HOURS)
13331333
.build();
13341334
private volatile boolean stageAssignmentPotentiallyChanged;
@@ -2123,7 +2123,7 @@ public void processEvent(WorkerEvent event, JobState jobState) {
21232123
eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(WARN,
21242124
"resubmitting lost worker ", wm.getStageNum(),
21252125
wm.getWorkerId(), wm.getState()));
2126-
2126+
recentErrorWorkersCache.put(wm.getWorkerNumber(), true);
21272127
resubmitWorker(workerOp.get());
21282128
return;
21292129
} else if (WorkerState.isTerminalState(wm.getState())) { // worker has explicitly
@@ -2251,7 +2251,7 @@ private void resubmitWorker(JobWorker oldWorker) throws Exception {
22512251
Map<Integer, Integer> workerToStageMap = mantisJobMetaData.getWorkerNumberToStageMap();
22522252

22532253
IMantisWorkerMetadata oldWorkerMetadata = oldWorker.getMetadata();
2254-
if (recentlyResubmittedWorkersCache.size()
2254+
if (recentErrorWorkersCache.size()
22552255
< ConfigurationProvider.getConfig().getMaximumResubmissionsPerWorker()) {
22562256

22572257
Integer stageNo = workerToStageMap.get(oldWorkerMetadata.getWorkerId().getWorkerNum());
@@ -2298,7 +2298,6 @@ private void resubmitWorker(JobWorker oldWorker) throws Exception {
22982298
markStageAssignmentsChanged(true);
22992299
// queue the new worker for execution
23002300
queueTask(newWorker.getMetadata(), delayDuration);
2301-
recentlyResubmittedWorkersCache.put(oldWorkerMetadata.getWorkerNumber(), true);
23022301
LOGGER.info("Worker {} successfully queued for scheduling", newWorker);
23032302
numWorkerResubmissions.increment();
23042303
} else {

0 commit comments

Comments
 (0)