@@ -180,19 +180,20 @@ public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListe
180
180
@ SuppressWarnings ("deprecation" )
181
181
ActionListener <CreateIndexResponse > actionListener = ActionListener .wrap (r -> {
182
182
if (r .isAcknowledged ()) {
183
- logger .info ("create index:{}" , indexName );
183
+ logger .info ("create index: {}" , indexName );
184
184
internalListener .onResponse (true );
185
185
} else {
186
186
internalListener .onResponse (false );
187
187
}
188
188
}, e -> {
189
- logger .error ("Failed to create index {}" , indexName , e );
190
- internalListener .onFailure (new FlowFrameworkException (e .getMessage (), ExceptionsHelper .status (e )));
189
+ String errorMessage = "Failed to create index " + indexName ;
190
+ logger .error (errorMessage , e );
191
+ internalListener .onFailure (new FlowFrameworkException (errorMessage , ExceptionsHelper .status (e )));
191
192
});
192
193
CreateIndexRequest request = new CreateIndexRequest (indexName ).mapping (mapping ).settings (indexSettings );
193
194
client .admin ().indices ().create (request , actionListener );
194
195
} else {
195
- logger .debug ("index:{} is already created" , indexName );
196
+ logger .debug ("index: {} is already created" , indexName );
196
197
if (indexMappingUpdated .containsKey (indexName ) && !indexMappingUpdated .get (indexName ).get ()) {
197
198
shouldUpdateIndex (indexName , index .getVersion (), ActionListener .wrap (r -> {
198
199
if (r ) {
@@ -223,10 +224,7 @@ public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListe
223
224
String errorMessage = "Failed to update index setting for: " + indexName ;
224
225
logger .error (errorMessage , exception );
225
226
internalListener .onFailure (
226
- new FlowFrameworkException (
227
- errorMessage + " : " + exception .getMessage (),
228
- ExceptionsHelper .status (exception )
229
- )
227
+ new FlowFrameworkException (errorMessage , ExceptionsHelper .status (exception ))
230
228
);
231
229
}));
232
230
} else {
@@ -238,10 +236,7 @@ public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListe
238
236
String errorMessage = "Failed to update index " + indexName ;
239
237
logger .error (errorMessage , exception );
240
238
internalListener .onFailure (
241
- new FlowFrameworkException (
242
- errorMessage + " : " + exception .getMessage (),
243
- ExceptionsHelper .status (exception )
244
- )
239
+ new FlowFrameworkException (errorMessage , ExceptionsHelper .status (exception ))
245
240
);
246
241
})
247
242
);
@@ -251,11 +246,9 @@ public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListe
251
246
internalListener .onResponse (true );
252
247
}
253
248
}, e -> {
254
- String errorMessage = "Failed to update index mapping" ;
249
+ String errorMessage = "Failed to update index mapping for " + indexName ;
255
250
logger .error (errorMessage , e );
256
- internalListener .onFailure (
257
- new FlowFrameworkException (errorMessage + " : " + e .getMessage (), ExceptionsHelper .status (e ))
258
- );
251
+ internalListener .onFailure (new FlowFrameworkException (errorMessage , ExceptionsHelper .status (e )));
259
252
}));
260
253
} else {
261
254
// No need to update index if it's already updated.
@@ -265,7 +258,7 @@ public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListe
265
258
} catch (Exception e ) {
266
259
String errorMessage = "Failed to init index " + indexName ;
267
260
logger .error (errorMessage , e );
268
- listener .onFailure (new FlowFrameworkException (errorMessage + " : " + e . getMessage () , ExceptionsHelper .status (e )));
261
+ listener .onFailure (new FlowFrameworkException (errorMessage , ExceptionsHelper .status (e )));
269
262
}
270
263
}
271
264
@@ -328,11 +321,11 @@ public void putTemplateToGlobalContext(Template template, ActionListener<IndexRe
328
321
client .index (request , ActionListener .runBefore (listener , context ::restore ));
329
322
} catch (Exception e ) {
330
323
String errorMessage = "Failed to index global_context index" ;
331
- logger .error (errorMessage );
332
- listener .onFailure (new FlowFrameworkException (errorMessage + " : " + e . getMessage () , ExceptionsHelper .status (e )));
324
+ logger .error (errorMessage , e );
325
+ listener .onFailure (new FlowFrameworkException (errorMessage , ExceptionsHelper .status (e )));
333
326
}
334
327
}, e -> {
335
- logger .error ("Failed to create global_context index" , e );
328
+ logger .error ("Failed to create global_context index" );
336
329
listener .onFailure (e );
337
330
}));
338
331
}
@@ -349,7 +342,7 @@ public void initializeConfigIndex(ActionListener<Boolean> listener) {
349
342
}
350
343
encryptorUtils .initializeMasterKey (listener );
351
344
}, createIndexException -> {
352
- logger .error ("Failed to create config index" , createIndexException );
345
+ logger .error ("Failed to create config index" );
353
346
listener .onFailure (createIndexException );
354
347
}));
355
348
}
@@ -385,13 +378,13 @@ public void putInitialStateToWorkflowState(String workflowId, User user, ActionL
385
378
} catch (Exception e ) {
386
379
String errorMessage = "Failed to put state index document" ;
387
380
logger .error (errorMessage , e );
388
- listener .onFailure (new FlowFrameworkException (errorMessage + " : " + e . getMessage () , ExceptionsHelper .status (e )));
381
+ listener .onFailure (new FlowFrameworkException (errorMessage , ExceptionsHelper .status (e )));
389
382
}
390
383
391
384
}, e -> {
392
385
String errorMessage = "Failed to create workflow_state index" ;
393
386
logger .error (errorMessage , e );
394
- listener .onFailure (new FlowFrameworkException (errorMessage + " : " + e . getMessage () , ExceptionsHelper .status (e )));
387
+ listener .onFailure (new FlowFrameworkException (errorMessage , ExceptionsHelper .status (e )));
395
388
}));
396
389
}
397
390
@@ -403,11 +396,9 @@ public void putInitialStateToWorkflowState(String workflowId, User user, ActionL
403
396
*/
404
397
public void updateTemplateInGlobalContext (String documentId , Template template , ActionListener <IndexResponse > listener ) {
405
398
if (!doesIndexExist (GLOBAL_CONTEXT_INDEX )) {
406
- String exceptionMessage = "Failed to update template for workflow_id : "
407
- + documentId
408
- + ", global_context index does not exist." ;
409
- logger .error (exceptionMessage );
410
- listener .onFailure (new FlowFrameworkException (exceptionMessage , RestStatus .BAD_REQUEST ));
399
+ String errorMessage = "Failed to update template for workflow_id : " + documentId + ", global_context index does not exist." ;
400
+ logger .error (errorMessage );
401
+ listener .onFailure (new FlowFrameworkException (errorMessage , RestStatus .BAD_REQUEST ));
411
402
return ;
412
403
}
413
404
doesTemplateExist (documentId , templateExists -> {
@@ -426,9 +417,7 @@ public void updateTemplateInGlobalContext(String documentId, Template template,
426
417
} catch (Exception e ) {
427
418
String errorMessage = "Failed to update global_context entry : " + documentId ;
428
419
logger .error (errorMessage , e );
429
- listener .onFailure (
430
- new FlowFrameworkException (errorMessage + " : " + e .getMessage (), ExceptionsHelper .status (e ))
431
- );
420
+ listener .onFailure (new FlowFrameworkException (errorMessage , ExceptionsHelper .status (e )));
432
421
}
433
422
} else {
434
423
String errorMessage = "The template has already been provisioned so it can't be updated: " + documentId ;
@@ -457,12 +446,14 @@ public <T> void doesTemplateExist(String documentId, Consumer<Boolean> booleanRe
457
446
try (ThreadContext .StoredContext context = client .threadPool ().getThreadContext ().stashContext ()) {
458
447
client .get (getRequest , ActionListener .wrap (response -> { booleanResultConsumer .accept (response .isExists ()); }, exception -> {
459
448
context .restore ();
460
- logger .error ("Failed to get template " + documentId , exception );
461
- listener .onFailure (new FlowFrameworkException (exception .getMessage (), ExceptionsHelper .status (exception )));
449
+ String errorMessage = "Failed to get template " + documentId ;
450
+ logger .error (errorMessage );
451
+ listener .onFailure (new FlowFrameworkException (errorMessage , ExceptionsHelper .status (exception )));
462
452
}));
463
453
} catch (Exception e ) {
464
- logger .error ("Failed to retrieve template from global context." , e );
465
- listener .onFailure (new FlowFrameworkException (e .getMessage (), ExceptionsHelper .status (e )));
454
+ String errorMessage = "Failed to retrieve template from global context: " + documentId ;
455
+ logger .error (errorMessage , e );
456
+ listener .onFailure (new FlowFrameworkException (errorMessage , ExceptionsHelper .status (e )));
466
457
}
467
458
}
468
459
@@ -490,17 +481,18 @@ public <T> void isWorkflowNotStarted(String documentId, Consumer<Boolean> boolea
490
481
WorkflowState workflowState = WorkflowState .parse (parser );
491
482
booleanResultConsumer .accept (workflowState .getProvisioningProgress ().equals (ProvisioningProgress .NOT_STARTED .name ()));
492
483
} catch (Exception e ) {
493
- String message = "Failed to parse workflow state " + documentId ;
494
- logger .error (message , e );
495
- listener .onFailure (new FlowFrameworkException (message , RestStatus .INTERNAL_SERVER_ERROR ));
484
+ String errorMessage = "Failed to parse workflow state " + documentId ;
485
+ logger .error (errorMessage , e );
486
+ listener .onFailure (new FlowFrameworkException (errorMessage , RestStatus .INTERNAL_SERVER_ERROR ));
496
487
}
497
488
}, exception -> {
498
- logger .error ("Failed to get workflow state " + documentId , exception );
489
+ logger .error ("Failed to get workflow state for {} " , documentId );
499
490
booleanResultConsumer .accept (false );
500
491
}));
501
492
} catch (Exception e ) {
502
- logger .error ("Failed to retrieve workflow state to check provisioning status" , e );
503
- listener .onFailure (new FlowFrameworkException (e .getMessage (), ExceptionsHelper .status (e )));
493
+ String errorMessage = "Failed to retrieve workflow state to check provisioning status" ;
494
+ logger .error (errorMessage , e );
495
+ listener .onFailure (new FlowFrameworkException (errorMessage , ExceptionsHelper .status (e )));
504
496
}
505
497
}
506
498
@@ -516,9 +508,9 @@ public void updateFlowFrameworkSystemIndexDoc(
516
508
ActionListener <UpdateResponse > listener
517
509
) {
518
510
if (!doesIndexExist (WORKFLOW_STATE_INDEX )) {
519
- String exceptionMessage = "Failed to update document for given workflow due to missing " + WORKFLOW_STATE_INDEX + " index" ;
520
- logger .error (exceptionMessage );
521
- listener .onFailure (new FlowFrameworkException (exceptionMessage , RestStatus .BAD_REQUEST ));
511
+ String errorMessage = "Failed to update document for given workflow due to missing " + WORKFLOW_STATE_INDEX + " index" ;
512
+ logger .error (errorMessage );
513
+ listener .onFailure (new FlowFrameworkException (errorMessage , RestStatus .BAD_REQUEST ));
522
514
} else {
523
515
try (ThreadContext .StoredContext context = client .threadPool ().getThreadContext ().stashContext ()) {
524
516
UpdateRequest updateRequest = new UpdateRequest (WORKFLOW_STATE_INDEX , documentId );
@@ -531,7 +523,7 @@ public void updateFlowFrameworkSystemIndexDoc(
531
523
} catch (Exception e ) {
532
524
String errorMessage = "Failed to update " + WORKFLOW_STATE_INDEX + " entry : " + documentId ;
533
525
logger .error (errorMessage , e );
534
- listener .onFailure (new FlowFrameworkException (errorMessage + " : " + e . getMessage () , ExceptionsHelper .status (e )));
526
+ listener .onFailure (new FlowFrameworkException (errorMessage , ExceptionsHelper .status (e )));
535
527
}
536
528
}
537
529
}
@@ -550,9 +542,9 @@ public void updateFlowFrameworkSystemIndexDocWithScript(
550
542
ActionListener <UpdateResponse > listener
551
543
) {
552
544
if (!doesIndexExist (indexName )) {
553
- String exceptionMessage = "Failed to update document for given workflow due to missing " + indexName + " index" ;
554
- logger .error (exceptionMessage );
555
- listener .onFailure (new Exception (exceptionMessage ));
545
+ String errorMessage = "Failed to update document for given workflow due to missing " + indexName + " index" ;
546
+ logger .error (errorMessage );
547
+ listener .onFailure (new Exception (errorMessage ));
556
548
} else {
557
549
try (ThreadContext .StoredContext context = client .threadPool ().getThreadContext ().stashContext ()) {
558
550
UpdateRequest updateRequest = new UpdateRequest (indexName , documentId );
@@ -563,10 +555,9 @@ public void updateFlowFrameworkSystemIndexDocWithScript(
563
555
// TODO: Implement our own concurrency control to improve on retry mechanism
564
556
client .update (updateRequest , ActionListener .runBefore (listener , context ::restore ));
565
557
} catch (Exception e ) {
566
- logger .error ("Failed to update {} entry : {}. {}" , indexName , documentId , e .getMessage ());
567
- listener .onFailure (
568
- new FlowFrameworkException ("Failed to update " + indexName + "entry: " + documentId , ExceptionsHelper .status (e ))
569
- );
558
+ String errorMessage = "Failed to update " + indexName + " entry : " + documentId ;
559
+ logger .error (errorMessage , e );
560
+ listener .onFailure (new FlowFrameworkException (errorMessage , ExceptionsHelper .status (e )));
570
561
}
571
562
}
572
563
}
0 commit comments