17
17
#include " qvi-utils.h"
18
18
19
19
int
20
- qvi_pthread_group::m_init_by_a_single_thread (
20
+ qvi_pthread_group::m_start_init_by_a_single_thread (
21
21
qvi_pthread_group_context *ctx,
22
22
int group_size
23
23
) {
24
- m_context = ctx ;
24
+ assert (ctx && group_size > 0 ) ;
25
25
26
+ m_context = ctx;
26
27
m_size = group_size;
27
28
28
- int rc = pthread_barrier_init (&m_barrier, NULL , m_size);
29
- if (qvi_unlikely (rc != 0 )) throw qvi_runtime_error () ;
29
+ int rc = pthread_barrier_init (&m_barrier, nullptr , m_size);
30
+ if (qvi_unlikely (rc != 0 )) return rc ;
30
31
31
- m_data_g = new qvi_bbuff *[m_size]();
32
+ m_gather_data = new qvi_bbuff *[m_size]();
32
33
for (int i = 0 ; i < group_size ; i++) {
33
- const int rc = qvi_bbuff_new (&m_data_g [i]);
34
- if (qvi_unlikely (rc != 0 )) throw qvi_runtime_error () ;
34
+ rc = qvi_bbuff_new (&m_gather_data [i]);
35
+ if (qvi_unlikely (rc != QV_SUCCESS )) return rc ;
35
36
}
36
- m_data_s = new qvi_bbuff**();
37
+ m_scatter_data = new qvi_bbuff**();
37
38
m_ckrs = new qvi_subgroup_color_key_rank[m_size]();
38
39
39
- return rc ;
40
+ return QV_SUCCESS ;
40
41
}
41
42
42
43
/* static */ int
@@ -52,7 +53,7 @@ qvi_pthread_group::m_finish_init_by_all_threads(
52
53
group->m_tids .push_back (mytid);
53
54
}
54
55
// Make sure they all contribute before continuing.
55
- pthread_barrier_wait (& group->m_barrier );
56
+ group->barrier ( );
56
57
// Elect one thread to be the worker.
57
58
bool worker = false ;
58
59
{
@@ -61,14 +62,15 @@ qvi_pthread_group::m_finish_init_by_all_threads(
61
62
}
62
63
// The worker populates the TID to rank mapping, while the others wait.
63
64
if (worker) {
65
+ std::lock_guard<std::mutex> guard (group->m_mutex );
64
66
std::sort (group->m_tids .begin (), group->m_tids .end ());
65
67
66
68
for (int i = 0 ; i < group->m_size ; ++i) {
67
69
const pid_t tidi = group->m_tids [i];
68
70
group->m_tid2rank .insert ({tidi, i});
69
71
}
70
72
}
71
- pthread_barrier_wait (& group->m_barrier );
73
+ group->barrier ( );
72
74
// Everyone can now create their task and populate the mapping table.
73
75
{
74
76
std::lock_guard<std::mutex> guard (group->m_mutex );
@@ -78,27 +80,28 @@ qvi_pthread_group::m_finish_init_by_all_threads(
78
80
group->m_tid2task .insert ({mytid, task});
79
81
}
80
82
// Make sure they all finish before returning.
81
- pthread_barrier_wait (& group->m_barrier );
83
+ group->barrier ( );
82
84
return rc;
83
85
}
84
86
85
87
// Constructs a group of group_size members associated with the given context.
//
// Only the single-threaded part of the setup is performed here, via
// m_start_init_by_a_single_thread().
//
// Throws qvi_runtime_error if that setup does not return QV_SUCCESS.
qvi_pthread_group::qvi_pthread_group(
    qvi_pthread_group_context *ctx,
    int group_size
) {
    const int rc = m_start_init_by_a_single_thread(ctx, group_size);
    if (qvi_unlikely(rc != QV_SUCCESS)) throw qvi_runtime_error();
}
93
94
94
95
qvi_pthread_group::qvi_pthread_group (
95
96
qvi_pthread_group *parent_group,
96
97
const qvi_subgroup_info &sginfo
97
98
) {
98
- assert (sginfo.rank == 0 );
99
+ assert (sginfo.rank == qvi_subgroup_info::master_rank );
99
100
100
101
std::lock_guard<std::mutex> guard (parent_group->m_mutex );
101
- const int rc = m_init_by_a_single_thread (parent_group->m_context , sginfo.size );
102
+ const int rc = m_start_init_by_a_single_thread (
103
+ parent_group->m_context , sginfo.size
104
+ );
102
105
if (qvi_unlikely (rc != QV_SUCCESS)) throw qvi_runtime_error ();
103
106
104
107
const qvi_group_id_t mygid = parent_group->m_subgroup_gids [sginfo.index ];
@@ -124,30 +127,23 @@ qvi_pthread_group::call_first_from_pthread_create(
124
127
125
128
// Releases everything owned by the group: the per-thread tasks, the gather
// buffers, the scatter slot, the color/key/rank table, and finally the
// barrier.
qvi_pthread_group::~qvi_pthread_group(void)
{
    // Held for the whole teardown; released when the destructor body exits.
    std::lock_guard<std::mutex> guard(m_mutex);

    for (auto &tt : m_tid2task) {
        qvi_delete(&tt.second);
    }

    if (m_gather_data) {
        // Free each member's buffer before freeing the array that holds them.
        for (int i = 0; i < m_size; ++i) {
            qvi_bbuff_delete(&m_gather_data[i]);
        }
        delete[] m_gather_data;
    }

    delete m_scatter_data;
    delete[] m_ckrs;

    // Destroy the barrier last, after all other group state is gone.
    pthread_barrier_destroy(&m_barrier);
}
152
148
153
149
int
@@ -216,8 +212,8 @@ qvi_pthread_group::m_subgroup_info(
216
212
const size_t ncolors = color_set.size ();
217
213
m_ckrs[my_rank].ncolors = ncolors;
218
214
// Now that we know the number of distinct colors, populate the
219
- // sub-group IDs. Set rc here and continue until return so we don't hang
220
- // on an error path.
215
+ // sub-group IDs. Set rc here and continue until after the next barrier
216
+ // so we don't hang on an error path.
221
217
rc = qvi_group::next_ids (ncolors, m_subgroup_gids);
222
218
}
223
219
// All threads wait for the number of colors to be computed.
@@ -265,7 +261,9 @@ qvi_pthread_group::split(
265
261
qvi_subgroup_info sginfo;
266
262
int rc = m_subgroup_info (color, key, sginfo);
267
263
if (qvi_unlikely (rc != QV_SUCCESS)) goto out;
268
- if (sginfo.rank == 0 ) {
264
+ // One thread creates the child group. The rest wait for the instance and
265
+ // later grab a pointer to their group based on the sub-group index.
266
+ if (sginfo.rank == qvi_subgroup_info::master_rank) {
269
267
// Recall this is the parent group.
270
268
rc = qvi_new (&ichild, this , sginfo);
271
269
barrier ();
@@ -274,10 +272,11 @@ qvi_pthread_group::split(
274
272
barrier ();
275
273
const qvi_group_id_t mygid = m_subgroup_gids[sginfo.index ];
276
274
ichild = m_context->groupid2thgroup .at (mygid);
275
+ ichild->retain ();
277
276
}
278
277
// Now we can check if errors happened above.
279
278
if (qvi_unlikely (rc != QV_SUCCESS)) goto out;
280
-
279
+ // Collectively finish child instance initialization.
281
280
rc = m_finish_init_by_all_threads (ichild);
282
281
out:
283
282
if (qvi_unlikely (rc != QV_SUCCESS)) {
@@ -300,7 +299,7 @@ qvi_pthread_group::gather(
300
299
int rc = QV_SUCCESS;
301
300
{
302
301
std::lock_guard<std::mutex> guard (m_mutex);
303
- rc = qvi_bbuff_copy (*txbuff, m_data_g [myrank]);
302
+ rc = qvi_bbuff_copy (*txbuff, m_gather_data [myrank]);
304
303
*alloc_type = QVI_BBUFF_ALLOC_SHARED_GLOBAL;
305
304
}
306
305
// Need to ensure that all threads have contributed to m_gather_data
@@ -310,7 +309,7 @@ qvi_pthread_group::gather(
310
309
*rxbuffs = nullptr ;
311
310
return QV_ERR_INTERNAL;
312
311
}
313
- *rxbuffs = m_data_g ;
312
+ *rxbuffs = m_gather_data ;
314
313
return rc;
315
314
}
316
315
@@ -325,13 +324,13 @@ qvi_pthread_group::scatter(
325
324
qvi_bbuff *mybbuff = nullptr ;
326
325
327
326
if (rootid == myrank) {
328
- *m_data_s = txbuffs;
327
+ *m_scatter_data = txbuffs;
329
328
}
330
329
331
330
barrier ();
332
331
{
333
332
std::lock_guard<std::mutex> guard (m_mutex);
334
- rc = qvi_bbuff_dup (*((*m_data_s )[myrank]), &mybbuff);
333
+ rc = qvi_bbuff_dup (*((*m_scatter_data )[myrank]), &mybbuff);
335
334
}
336
335
barrier ();
337
336
0 commit comments