@@ -177,19 +177,23 @@ public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAn
177
177
LOG .error ("Someone tried to enqueue job {} but it already exists." , templateTask .jobId );
178
178
throw new RuntimeException ("Enqueued duplicate job " + templateTask .jobId );
179
179
}
180
+ // Create the Job object to share with the MultiOriginAssembler, but defer adding this job to the Multimap of
181
+ // active jobs until we're sure the result assembler was constructed without any errors. Always add and remove
182
+ // the Job and corresponding MultiOriginAssembler as a unit in the same synchronized block of code (see #887).
180
183
WorkerTags workerTags = WorkerTags .fromRegionalAnalysis (regionalAnalysis );
181
184
Job job = new Job (templateTask , workerTags );
182
- jobs .put (job .workerCategory , job );
183
185
184
186
// Register the regional job so results received from multiple workers can be assembled into one file.
187
+ // If any parameters fail checks here, an exception may cause this method to exit early.
185
188
// TODO encapsulate MultiOriginAssemblers in a new Component
186
- // Note: if this fails with an exception we'll have a job enqueued, possibly being processed, with no assembler.
187
- // That is not catastrophic, but the user may need to recognize and delete the stalled regional job.
188
189
MultiOriginAssembler assembler = new MultiOriginAssembler (regionalAnalysis , job , fileStorage );
189
190
resultAssemblers .put (templateTask .jobId , assembler );
190
191
192
+ // A MultiOriginAssembler was successfully put in place. It's now safe to register and start the Job.
193
+ jobs .put (job .workerCategory , job );
194
+
195
+ // If this is a fake job for testing, don't confuse the worker startup code below with its null graph ID.
191
196
if (config .testTaskRedelivery ()) {
192
- // This is a fake job for testing, don't confuse the worker startup code below with null graph ID.
193
197
return ;
194
198
}
195
199
@@ -385,14 +389,20 @@ public synchronized void markTaskCompleted (Job job, int taskId) {
385
389
}
386
390
387
391
/**
388
- * When job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false.
392
+ * Record an error that happened while a worker was processing a task on the given job. This method is tolerant
393
+ * of job being null, because it's called on a code path where any number of things could be wrong or missing.
394
+ * This method also ensures synchronization of writes to Jobs from any non-synchronized sections of an HTTP handler.
395
+ * Once job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false.
389
396
* The Job will stop delivering tasks, allowing workers to shut down, but will continue to exist allowing the user
390
397
* to see the error message. User will then need to manually delete it, which will remove the result assembler.
391
- * This method ensures synchronization of writes to Jobs from the unsynchronized worker poll HTTP handler.
392
398
*/
393
399
private synchronized void recordJobError (Job job , String error ) {
394
400
if (job != null ) {
395
- job .errors .add (error );
401
+ // Limit the number of errors recorded to one.
402
+ // Still using a Set<String> instead of just String since the set of errors is exposed in a UI-facing API.
403
+ if (job .errors .isEmpty ()) {
404
+ job .errors .add (error );
405
+ }
396
406
}
397
407
}
398
408
@@ -488,21 +498,23 @@ public void handleRegionalWorkResult(RegionalWorkResult workResult) {
488
498
// Once the job is retrieved, it can be used below to requestExtraWorkersIfAppropriate without synchronization,
489
499
// because that method only uses final fields of the job.
490
500
Job job = null ;
491
- MultiOriginAssembler assembler ;
492
501
try {
502
+ MultiOriginAssembler assembler ;
493
503
synchronized (this ) {
494
504
job = findJob (workResult .jobId );
505
+ // Record any error reported by the worker and don't pass bad results on to regional result assembly.
506
+ // This will mark the job as errored and not-active, stopping distribution of tasks to workers.
507
+ // To ensure that happens, record errors before any other conditional that could exit this method.
508
+ if (workResult .error != null ) {
509
+ recordJobError (job , workResult .error );
510
+ return ;
511
+ }
495
512
assembler = resultAssemblers .get (workResult .jobId );
496
513
if (job == null || assembler == null || !job .isActive ()) {
497
514
// This will happen naturally for all delivered tasks after a job is deleted or it errors out.
498
515
LOG .debug ("Ignoring result for unrecognized, deleted, or inactive job ID {}." , workResult .jobId );
499
516
return ;
500
517
}
501
- if (workResult .error != null ) {
502
- // Record any error reported by the worker and don't pass bad results on to regional result assembly.
503
- recordJobError (job , workResult .error );
504
- return ;
505
- }
506
518
// Mark tasks completed first before passing results to the assembler. On the final result received,
507
519
// this will minimize the risk of race conditions by quickly making the job invisible to incoming stray
508
520
// results from spurious redeliveries, before the assembler is busy finalizing and uploading results.
0 commit comments