@@ -192,8 +192,6 @@ qvi_pthread_group::m_subgroup_info(
192
192
int rc = QV_SUCCESS;
193
193
const int master_rank = 0 ;
194
194
const int my_rank = rank ();
195
- // TODO(skg)
196
- // qvi_log_debug("color={}, key={}, rank={}", color, key, my_rank);
197
195
// Gather colors and keys from ALL threads in the parent group.
198
196
// NOTE: this (i.e., the caller) is a member of the parent group).
199
197
m_ckrs[my_rank].color = color;
@@ -277,9 +275,10 @@ qvi_pthread_group::split(
277
275
const qvi_group_id_t mygid = m_subgroup_gids[sginfo.index ];
278
276
ichild = m_context->groupid2thgroup .at (mygid);
279
277
}
278
+ // Now we can check if errors happened above.
279
+ if (qvi_unlikely (rc != QV_SUCCESS)) goto out;
280
280
281
281
rc = m_finish_init_by_all_threads (ichild);
282
-
283
282
out:
284
283
if (qvi_unlikely (rc != QV_SUCCESS)) {
285
284
qvi_delete (&ichild);
@@ -295,16 +294,22 @@ qvi_pthread_group::gather(
295
294
qvi_bbuff_alloc_type_t *alloc_type,
296
295
qvi_bbuff ***rxbuffs
297
296
) {
298
- const int rc = qvi_bbuff_copy (*txbuff, m_data_g[rank ()]);
297
+ const int myrank = rank ();
298
+ // I'm not sure why this barrier is needed, but it seems to help...
299
+ barrier ();
300
+ int rc = QV_SUCCESS;
301
+ {
302
+ std::lock_guard<std::mutex> guard (m_mutex);
303
+ rc = qvi_bbuff_copy (*txbuff, m_data_g[myrank]);
304
+ *alloc_type = QVI_BBUFF_ALLOC_SHARED_GLOBAL;
305
+ }
299
306
// Need to ensure that all threads have contributed to m_data_g
300
- pthread_barrier_wait (&m_barrier);
301
- *alloc_type = QVI_BBUFF_ALLOC_SHARED_GLOBAL;
307
+ barrier ();
302
308
303
309
if (qvi_unlikely (rc != QV_SUCCESS)) {
304
310
*rxbuffs = nullptr ;
305
311
return QV_ERR_INTERNAL;
306
312
}
307
-
308
313
*rxbuffs = m_data_g;
309
314
return rc;
310
315
}
@@ -315,16 +320,20 @@ qvi_pthread_group::scatter(
315
320
int rootid,
316
321
qvi_bbuff **rxbuff
317
322
) {
323
+ int rc = QV_SUCCESS;
318
324
const int myrank = rank ();
325
+ qvi_bbuff *mybbuff = nullptr ;
319
326
320
327
if (rootid == myrank) {
321
328
*m_data_s = txbuffs;
322
329
}
323
- pthread_barrier_wait (&m_barrier);
324
330
325
- qvi_bbuff *mybbuff = nullptr ;
326
- const int rc = qvi_bbuff_dup ( *((*m_data_s)[myrank]), &mybbuff);
327
- pthread_barrier_wait (&m_barrier);
331
+ barrier ();
332
+ {
333
+ std::lock_guard<std::mutex> guard (m_mutex);
334
+ rc = qvi_bbuff_dup (*((*m_data_s)[myrank]), &mybbuff);
335
+ }
336
+ barrier ();
328
337
329
338
if (qvi_unlikely (rc != QV_SUCCESS)) {
330
339
qvi_bbuff_delete (&mybbuff);
0 commit comments