@@ -29,13 +29,13 @@ qvi_pthread_group::m_start_init_by_a_single_thread(
29
29
int rc = pthread_barrier_init (&m_barrier, nullptr , m_size);
30
30
if (qvi_unlikely (rc != 0 )) return rc;
31
31
32
- m_gather_data = new qvi_bbuff *[m_size]() ;
32
+ m_gather_data = new qvi_bbuff *[m_size];
33
33
for (int i = 0 ; i < group_size ; i++) {
34
34
rc = qvi_bbuff_new (&m_gather_data[i]);
35
35
if (qvi_unlikely (rc != QV_SUCCESS)) return rc;
36
36
}
37
- m_scatter_data = new qvi_bbuff**() ;
38
- m_ckrs = new qvi_subgroup_color_key_rank[m_size]( );
37
+ m_scatter_data = new qvi_bbuff**;
38
+ m_ckrs. resize (m_size );
39
39
40
40
return QV_SUCCESS;
41
41
}
@@ -108,23 +108,6 @@ qvi_pthread_group::qvi_pthread_group(
108
108
m_context->groupid2thgroup .insert ({mygid, this });
109
109
}
110
110
111
- void *
112
- qvi_pthread_group::call_first_from_pthread_create (
113
- void *arg
114
- ) {
115
- auto args = (qvi_pthread_group_pthread_create_args *)arg;
116
- const qvi_pthread_routine_fun_ptr_t thread_routine = args->throutine ;
117
- void *const th_routine_argp = args->throutine_argp ;
118
-
119
- const int rc = m_finish_init_by_all_threads (args->group );
120
- // TODO(skg) Is this the correct thing to do? Shall we return something?
121
- if (qvi_unlikely (rc != QV_SUCCESS)) throw qvi_runtime_error ();
122
- // Free the provided argument container.
123
- qvi_delete (&args);
124
- // Finally, call the specified thread routine.
125
- return thread_routine (th_routine_argp);
126
- }
127
-
128
111
qvi_pthread_group::~qvi_pthread_group (void )
129
112
{
130
113
std::lock_guard<std::mutex> guard (m_mutex);
@@ -141,11 +124,32 @@ qvi_pthread_group::~qvi_pthread_group(void)
141
124
}
142
125
143
126
delete m_scatter_data;
144
- delete[] m_ckrs;
145
127
146
128
pthread_barrier_destroy (&m_barrier);
147
129
}
148
130
131
+ void *
132
+ qvi_pthread_group::call_first_from_pthread_create (
133
+ void *arg
134
+ ) {
135
+ auto args = (qvi_pthread_group_pthread_create_args *)arg;
136
+ const qvi_pthread_routine_fun_ptr_t thread_routine = args->throutine ;
137
+ void *const th_routine_argp = args->throutine_argp ;
138
+
139
+ const int rc = m_finish_init_by_all_threads (args->group );
140
+ if (qvi_unlikely (rc != QV_SUCCESS)) {
141
+ qvi_log_error (
142
+ " An error occurred in m_finish_init_by_all_threads(): {} ({})" ,
143
+ rc, qv_strerr (rc)
144
+ );
145
+ throw qvi_runtime_error ();
146
+ }
147
+ // Free the provided argument container.
148
+ qvi_delete (&args);
149
+ // Finally, call the specified thread routine.
150
+ return thread_routine (th_routine_argp);
151
+ }
152
+
149
153
int
150
154
qvi_pthread_group::size (void )
151
155
{
@@ -201,9 +205,9 @@ qvi_pthread_group::m_subgroup_info(
201
205
// Sort the color/key/rank array. First according to color, then by key,
202
206
// but in the same color realm. If color and key are identical, sort by
203
207
// the rank from given group.
204
- std::sort (m_ckrs, m_ckrs + m_size , qvi_subgroup_color_key_rank::by_color);
205
- std::sort (m_ckrs, m_ckrs + m_size , qvi_subgroup_color_key_rank::by_key);
206
- std::sort (m_ckrs, m_ckrs + m_size , qvi_subgroup_color_key_rank::by_rank);
208
+ std::sort (m_ckrs. begin () , m_ckrs. end () , qvi_subgroup_color_key_rank::by_color);
209
+ std::sort (m_ckrs. begin () , m_ckrs. end () , qvi_subgroup_color_key_rank::by_key);
210
+ std::sort (m_ckrs. begin () , m_ckrs. end () , qvi_subgroup_color_key_rank::by_rank);
207
211
// Calculate the number of distinct colors provided.
208
212
std::set<int > color_set;
209
213
for (int i = 0 ; i < m_size; ++i) {
@@ -293,21 +297,21 @@ qvi_pthread_group::gather(
293
297
qvi_bbuff_alloc_type_t *alloc_type,
294
298
qvi_bbuff ***rxbuffs
295
299
) {
300
+ int rc = QV_SUCCESS;
296
301
const int myrank = rank ();
297
- // I'm not sure why this barrier is needed, but it seems to help...
302
+
298
303
barrier ();
299
- int rc = QV_SUCCESS;
300
304
{
301
305
std::lock_guard<std::mutex> guard (m_mutex);
302
306
rc = qvi_bbuff_copy (*txbuff, m_gather_data[myrank]);
303
307
*alloc_type = QVI_BBUFF_ALLOC_SHARED_GLOBAL;
304
308
}
305
- // Need to ensure that all threads have contributed to m_data_g
309
+ // Ensure that all threads have contributed to m_gather_data.
306
310
barrier ();
307
311
308
312
if (qvi_unlikely (rc != QV_SUCCESS)) {
309
313
*rxbuffs = nullptr ;
310
- return QV_ERR_INTERNAL ;
314
+ return rc ;
311
315
}
312
316
*rxbuffs = m_gather_data;
313
317
return rc;
@@ -336,7 +340,7 @@ qvi_pthread_group::scatter(
336
340
337
341
if (qvi_unlikely (rc != QV_SUCCESS)) {
338
342
qvi_bbuff_delete (&mybbuff);
339
- return QV_ERR_INTERNAL ;
343
+ return rc ;
340
344
}
341
345
*rxbuff = mybbuff;
342
346
return rc;
0 commit comments