@@ -55,6 +55,8 @@ struct ompi_continuation_t {
55
55
opal_atomic_int32_t cont_failed ; /**< the continuation has failed */
56
56
opal_atomic_int32_t cont_request_check ; /**< flag set by the failed continuation handler to block
57
57
* completing threads from freeing their request */
58
+ int cont_rc ; /** return code to be passed to callback */
59
+ bool cont_invoke_failed ; /** if true, failed continuations will be invoked and passed the error code */
58
60
};
59
61
60
62
/* Convenience typedef */
@@ -70,6 +72,8 @@ static void ompi_continuation_construct(ompi_continuation_t* cont)
70
72
cont -> cont_opreqs = NULL ;
71
73
cont -> cont_failed = 0 ;
72
74
cont -> cont_request_check = 0 ;
75
+ cont -> cont_rc = MPI_SUCCESS ;
76
+ cont -> cont_invoke_failed = false;
73
77
}
74
78
75
79
static void ompi_continuation_destruct (ompi_continuation_t * cont )
@@ -306,7 +310,7 @@ int ompi_continue_cont_invoke(ompi_continuation_t *cont)
306
310
307
311
MPIX_Continue_cb_function * fn = cont -> cont_cb ;
308
312
void * cont_data = cont -> cont_data ;
309
- int rc = fn (MPI_SUCCESS , cont_data );
313
+ int rc = fn (cont -> cont_rc , cont_data );
310
314
ompi_continue_cont_release (cont , rc );
311
315
return rc ;
312
316
}
@@ -711,22 +715,31 @@ static void handle_failed_cont(ompi_continuation_t *cont, int status, bool have_
711
715
/* wait for other threads in request_completion_cb to decrement the counter */
712
716
cont -> cont_request_check = 0 ;
713
717
while (cont -> cont_num_active != 1 ) { }
714
- cont -> cont_num_active = 0 ;
715
718
}
716
719
opal_list_remove_item (& cont_req -> cont_incomplete_list , & cont -> super .super );
717
- opal_list_append (& cont_req -> cont_failed_list , & cont -> super .super );
718
- int32_t num_active = OPAL_THREAD_ADD_FETCH32 (& cont_req -> cont_num_active , -1 );
719
- if (MPI_SUCCESS == cont_req -> super .req_status .MPI_ERROR ) {
720
- cont_req -> super .req_status .MPI_ERROR = status ;
721
- cont_req -> cont_errorinfo .mpi_object = error_object ;
722
- cont_req -> cont_errorinfo .type = error_object_type ;
723
- }
724
720
725
- if (0 == num_active ) {
726
- opal_atomic_wmb ();
727
- ompi_request_complete (& cont_req -> super , true);
728
- }
721
+ if (cont -> cont_invoke_failed ) {
722
+ /* make sure all requests have completed and enqueue the continuation for execution */
723
+ cont -> cont_rc = status ;
724
+ if (0 == OPAL_THREAD_ADD_FETCH32 (& cont -> cont_num_active , -1 )) {
725
+ ompi_continue_enqueue_runnable (cont );
726
+ }
727
+ } else {
728
+ cont -> cont_num_active = 0 ;
729
+ /* put the continuation into the list of failed continuations */
730
+ opal_list_append (& cont_req -> cont_failed_list , & cont -> super .super );
731
+ int32_t num_active = OPAL_THREAD_ADD_FETCH32 (& cont_req -> cont_num_active , -1 );
732
+ if (MPI_SUCCESS == cont_req -> super .req_status .MPI_ERROR ) {
733
+ cont_req -> super .req_status .MPI_ERROR = status ;
734
+ cont_req -> cont_errorinfo .mpi_object = error_object ;
735
+ cont_req -> cont_errorinfo .type = error_object_type ;
736
+ }
729
737
738
+ if (0 == num_active ) {
739
+ opal_atomic_wmb ();
740
+ ompi_request_complete (& cont_req -> super , true);
741
+ }
742
+ }
730
743
if (!have_cont_req_lock ) {
731
744
opal_atomic_unlock (& cont_req -> cont_lock );
732
745
}
@@ -770,7 +783,6 @@ static int request_completion_cb(ompi_request_t *request)
770
783
ompi_request_free (& request );
771
784
}
772
785
opal_atomic_wmb ();
773
- //int32_t num_active = OPAL_THREAD_ADD_FETCH32(&cont->cont_num_active, -1);
774
786
775
787
if (1 == cont -> cont_num_active || 0 == OPAL_THREAD_ADD_FETCH32 (& cont -> cont_num_active , -1 )) {
776
788
/* the continuation is ready for execution */
@@ -783,7 +795,14 @@ static int request_completion_cb(ompi_request_t *request)
783
795
handle_failed_cont (cont , request -> req_status .MPI_ERROR , false);
784
796
} else {
785
797
/* someone else handles the fault, so just signal that we're done with the continuation object */
786
- OPAL_THREAD_ADD_FETCH32 (& cont -> cont_num_active , -1 );
798
+ if (1 == cont -> cont_num_active || 0 == OPAL_THREAD_ADD_FETCH32 (& cont -> cont_num_active , -1 )) {
799
+ /* we're responsible for enqueuing the continuation for execution if:
800
+ * 1) we don't have access to the requests (handle_failed_cont couldn't handle all pending requests); and
801
+ * 2) this is the last outstanding request
802
+ */
803
+ cont -> cont_num_active = 0 ;
804
+ ompi_continue_enqueue_runnable (cont );
805
+ }
787
806
}
788
807
789
808
opal_free_list_return (& ompi_request_cont_data_freelist , & req_cont_data -> super );
@@ -824,6 +843,7 @@ int ompi_continue_global_wakeup(int status) {
824
843
}
825
844
826
845
opal_mutex_atomic_unlock (& cont_req_list_mtx );
846
+ return OMPI_SUCCESS ;
827
847
}
828
848
829
849
int ompi_continue_attach (
@@ -839,8 +859,9 @@ int ompi_continue_attach(
839
859
return OMPI_ERR_REQUEST ;
840
860
}
841
861
842
- bool req_volatile = (flags & MPIX_CONT_REQBUF_VOLATILE );
862
+ bool req_volatile = (flags & MPIX_CONT_REQBUF_VOLATILE );
843
863
bool defer_complete = (flags & MPIX_CONT_DEFER_COMPLETE );
864
+ bool invoke_failed = (flags & MPIX_CONT_INVOKE_FAILED ) | (flags & MPIX_CONT_REQBUF_VOLATILE );
844
865
845
866
ompi_cont_request_t * cont_req = (ompi_cont_request_t * )continuation_request ;
846
867
ompi_continuation_t * cont = ompi_continue_cont_create (count , cont_req , cont_cb ,
@@ -857,6 +878,8 @@ int ompi_continue_attach(
857
878
reset_requests = false;
858
879
}
859
880
881
+ cont -> cont_invoke_failed = invoke_failed ;
882
+
860
883
/* memory barrier to make sure a thread completing a request sees
861
884
* a correct continuation object */
862
885
opal_atomic_wmb ();
0 commit comments