@@ -94,7 +94,7 @@ ldms_t setup_connection(const char *xprt, const char *host,
94
94
ts.tv_nsec = 0 ;
95
95
}
96
96
97
- ldms_g = ldms_xprt_new_with_auth (xprt, auth, NULL );
97
+ ldms_g = ldms_xprt_new_with_auth (xprt, NULL , auth, NULL );
98
98
if (!ldms_g) {
99
99
printf (" Error %d creating the '%s' transport\n " ,
100
100
errno, xprt);
@@ -127,8 +127,8 @@ void caliper_ldms_connector_initialize()
127
127
/* Check/set LDMS transport type */
128
128
if (!env_ldms_xprt || !env_ldms_host || !env_ldms_port || !env_ldms_auth){
129
129
Log (1 ).stream () << " Either the transport, host, port or authentication is not given. Setting to default.\n " ;
130
-
131
- if (env_ldms_xprt == NULL )
130
+
131
+ if (env_ldms_xprt == NULL )
132
132
env_ldms_xprt = " sock" ;
133
133
134
134
if (env_ldms_host == NULL )
@@ -139,7 +139,7 @@ void caliper_ldms_connector_initialize()
139
139
140
140
if (env_ldms_auth == NULL )
141
141
env_ldms_auth = " munge" ;
142
-
142
+
143
143
}
144
144
145
145
pthread_mutex_lock (&ln_lock);
@@ -158,28 +158,26 @@ void caliper_ldms_connector_initialize()
158
158
return ;
159
159
}
160
160
161
- std::ostream& write_ldms_record (std::ostream& os, int mpi_rank, RegionProfile& profile)
161
+ void write_ldms_record (int mpi_rank, RegionProfile& profile)
162
162
{
163
-
164
- caliper_ldms_connector_initialize ();
163
+ caliper_ldms_connector_initialize ();
165
164
166
165
std::map<std::string, double > region_times;
167
166
double total_time = 0 ;
168
-
169
167
170
168
int buffer_size = 4096 ;
171
169
char * buffer = (char *) malloc (sizeof (char ) * buffer_size);
172
-
170
+
173
171
const char * env_ldms_jobid_str = getenv (" SLURM_JOB_ID" );
174
172
const char * env_ldms_procid = getenv (" SLURM_PROCID" );
175
173
const char * env_ldms_slurm_nodelist = getenv (" SLURM_JOB_NODELIST" );
176
174
const char * env_ldms_caliper_verbose_str = getenv (" CALIPER_LDMS_VERBOSE" );
177
175
178
176
int env_ldms_jobid = env_ldms_jobid_str == NULL ? 0 : atoi ( env_ldms_jobid_str );
179
- int env_ldms_caliper_verbose = env_ldms_caliper_verbose_str == NULL ? 0 : atoi ( env_ldms_caliper_verbose_str );
177
+ int env_ldms_caliper_verbose = env_ldms_caliper_verbose_str == NULL ? 0 : atoi ( env_ldms_caliper_verbose_str );
180
178
181
179
std::tie (region_times, std::ignore, total_time) =
182
- profile.inclusive_path_profile ();
180
+ profile.inclusive_region_times ();
183
181
184
182
double unix_ts = 1e-6 * std::chrono::duration_cast<std::chrono::microseconds>(
185
183
std::chrono::system_clock::now ().time_since_epoch ()).count ();
@@ -189,78 +187,63 @@ std::ostream& write_ldms_record(std::ostream& os, int mpi_rank, RegionProfile& p
189
187
if (p.second < 0.05 * total_time)
190
188
continue ;
191
189
192
- os << " { \" timestamp\" : " << std::fixed << unix_ts
193
- << " , \" duration\" : " << std::fixed << p.second ;
194
-
195
- util::write_esc_string (os << " , \" path\" : \" " , p.first ) << " \" " ;
196
-
197
- std::string path_msg = " " + p.first ;
198
- std::string path = path_msg.c_str ();
190
+ // std::string path_msg = "" + p.first;
191
+ const char * path = p.first .c_str ();
199
192
200
193
if (mpi_rank >= 0 ) {
201
- os << " , \" rank\" : " << mpi_rank;
202
- snprintf (buffer, buffer_size, " { \" timestamp\" : %f, \" jobid\" : %d, \" rank\" : %d, \" procid\" : %d, \" nodelist\" : %s, \" caliper-perf-data\" , \" duration\" : %f, \" path\" : \" %s\" } \n " , unix_ts, env_ldms_jobid, mpi_rank, env_ldms_procid, env_ldms_slurm_nodelist, p.second , path);
203
- } else {
204
- snprintf (buffer, buffer_size, " { \" timestamp\" : %f, \" jobid\" : %d, \" rank\" : %d, \" procid\" : %d, \" nodelist\" : %s, \" caliper-perf-data\" , \" duration\" : %f, \" path\" : \" %s\" } \n " , unix_ts, env_ldms_jobid, 0 , env_ldms_procid, env_ldms_slurm_nodelist, p.second , path);
205
- }
206
- os << " } \n " ;
207
-
208
- if (env_ldms_caliper_verbose > 0 )
209
- puts (buffer);
194
+ snprintf (buffer, buffer_size, " { \" timestamp\" : %f, \" jobid\" : %d, \" rank\" : %d, \" procid\" : %s, \" nodelist\" : %s, \" caliper-perf-data\" , \" duration\" : %f, \" path\" : \" %s\" } \n " , unix_ts, env_ldms_jobid, mpi_rank, env_ldms_procid, env_ldms_slurm_nodelist, p.second , path);
195
+ } else {
196
+ snprintf (buffer, buffer_size, " { \" timestamp\" : %f, \" jobid\" : %d, \" rank\" : %d, \" procid\" : %s, \" nodelist\" : %s, \" caliper-perf-data\" , \" duration\" : %f, \" path\" : \" %s\" } \n " , unix_ts, env_ldms_jobid, 0 , env_ldms_procid, env_ldms_slurm_nodelist, p.second , path);
197
+ }
210
198
211
- int rc = ldmsd_stream_publish ( ldms_cali, " caliper-perf-data" , LDMSD_STREAM_JSON, buffer, strlen (buffer) + 1 );
212
- if (rc)
213
- Log (0 ).stream () << " Error " << rc << " publishing data.\n " ;
199
+ if (env_ldms_caliper_verbose > 0 )
200
+ puts (buffer);
214
201
215
- else if (env_ldms_caliper_verbose > 0 )
216
- Log (2 ).stream () << " Caliper Message published successfully\n " ;
217
- }
202
+ int rc = ldmsd_stream_publish ( ldms_cali, " caliper-perf-data" , LDMSD_STREAM_JSON, buffer, strlen (buffer) + 1 );
218
203
219
- return os;
204
+ if (rc)
205
+ Log (0 ).stream () << " Error " << rc << " publishing data.\n " ;
206
+ else if (env_ldms_caliper_verbose > 0 )
207
+ Log (2 ).stream () << " Caliper Message published successfully\n " ;
208
+ }
220
209
}
221
210
222
211
class LdmsForwarder
223
212
{
224
213
RegionProfile profile;
225
- OutputStream stream;
226
-
227
- std::string filename;
228
214
229
215
void snapshot (Caliper* c, Channel*) {
230
216
Entry e = c->get (c->get_attribute (" mpi.rank" ));
231
217
int rank = e.empty () ? -1 : e.value ().to_int ();
232
218
233
- write_ldms_record (*stream. stream (), rank, profile) << " \n " ;
219
+ write_ldms_record (rank, profile);
234
220
235
221
profile.clear (); // reset profile - skip to create a cumulative profile
236
222
}
237
223
238
224
void post_init (Caliper* c, Channel* channel) {
239
- std::vector<Entry> rec;
240
- stream.set_filename (filename.c_str (), *c, rec);
241
225
profile.start ();
242
226
}
243
227
244
- LdmsForwarder (const char * fname)
245
- : filename { fname }
228
+ LdmsForwarder ()
246
229
{ }
247
230
248
231
public:
249
-
232
+
250
233
static const char * s_spec;
251
234
252
235
static void create (Caliper* c, Channel* channel) {
253
- ConfigSet cfg =
236
+ ConfigSet cfg =
254
237
services::init_config_from_spec (channel->config (), s_spec);
255
238
256
- LdmsForwarder* instance = new LdmsForwarder (cfg. get ( " filename " ). to_string (). c_str () );
239
+ LdmsForwarder* instance = new LdmsForwarder ();
257
240
258
241
channel->events ().post_init_evt .connect (
259
242
[instance](Caliper* c, Channel* channel){
260
243
instance->post_init (c, channel);
261
244
});
262
245
channel->events ().snapshot .connect (
263
- [instance](Caliper* c, Channel* channel, int scope, SnapshotView, SnapshotBuilder&){
246
+ [instance](Caliper* c, Channel* channel, SnapshotView, SnapshotBuilder&){
264
247
instance->snapshot (c, channel);
265
248
});
266
249
channel->events ().finish_evt .connect (
@@ -271,21 +254,15 @@ class LdmsForwarder
271
254
Log (1 ).stream () << channel->name () << " Initialized LDMS forwarder\n " ;
272
255
}
273
256
274
-
257
+
275
258
};
276
259
277
260
const char * LdmsForwarder::s_spec = R"json(
278
- {
261
+ {
279
262
"name" : "ldms",
280
263
"description" : "Forward Caliper regions to LDMS (prototype)",
281
264
"config" :
282
265
[
283
- {
284
- "name" : "filename",
285
- "description" : "Output file name, or stdout/stderr",
286
- "type" : "string",
287
- "value" : "stdout"
288
- }
289
266
]
290
267
}
291
268
)json" ;
0 commit comments