34
34
#include " qvi-utils.h"
35
35
36
36
struct qvi_rmi_msg_header {
37
- qvi_rmi_rpc_funid_t fid = QVI_RMI_FID_INVALID;
37
+ qvi_rmi_rpc_fid_t fid = QVI_RMI_FID_INVALID;
38
38
char picture[16 ] = {' \0 ' };
39
39
};
40
40
@@ -62,7 +62,7 @@ zwrn_msg(
62
62
qvi_log_warn (" {} with errno={} ({})" , ers, erno, qvi_strerr (erno));
63
63
}
64
64
65
- static void
65
+ static inline void
66
66
zsocket_close (
67
67
void **sock
68
68
) {
@@ -77,7 +77,7 @@ zsocket_close(
77
77
*sock = nullptr ;
78
78
}
79
79
80
- static void
80
+ static inline void
81
81
zctx_destroy (
82
82
void **ctx
83
83
) {
@@ -92,27 +92,34 @@ zctx_destroy(
92
92
*ctx = nullptr ;
93
93
}
94
94
95
- static void *
96
- zsocket_create_and_connect (
95
+ static inline void *
96
+ zsocket_create (
97
97
void *zctx,
98
- int sock_type,
99
- const char *addr
98
+ int sock_type
100
99
) {
101
100
void *zsock = zmq_socket (zctx, sock_type);
102
101
if (qvi_unlikely (!zsock)) {
103
102
zerr_msg (" zmq_socket() failed" , errno);
104
103
return nullptr ;
105
104
}
105
+ return zsock;
106
+ }
107
+
108
+ static inline int
109
+ zsocket_connect (
110
+ void *zsock,
111
+ const char *addr
112
+ ) {
106
113
const int rc = zmq_connect (zsock, addr);
107
114
if (qvi_unlikely (rc != 0 )) {
108
115
zerr_msg (" zmq_connect() failed" , errno);
109
116
zsocket_close (&zsock);
110
- return nullptr ;
117
+ return QV_ERR_RPC ;
111
118
}
112
- return zsock ;
119
+ return QV_SUCCESS ;
113
120
}
114
121
115
- static void *
122
+ static inline void *
116
123
zsocket_create_and_bind (
117
124
void *zctx,
118
125
int sock_type,
@@ -147,7 +154,7 @@ msg_free_byte_buffer_cb(
147
154
static inline int
148
155
buffer_append_header (
149
156
qvi_bbuff *buff,
150
- qvi_rmi_rpc_funid_t fid,
157
+ qvi_rmi_rpc_fid_t fid,
151
158
cstr_t picture
152
159
) {
153
160
qvi_rmi_msg_header hdr;
@@ -206,14 +213,13 @@ zmsg_send(
206
213
zmq_msg_t *msg,
207
214
int *bsent
208
215
) {
209
- int qvrc = QV_SUCCESS;
210
216
*bsent = zmq_msg_send (msg, zsock, 0 );
211
217
if (qvi_unlikely (*bsent == -1 )) {
212
218
zerr_msg (" zmq_msg_send() failed" , errno);
213
- qvrc = QV_ERR_RPC;
219
+ zmq_msg_close (msg);
220
+ return QV_ERR_RPC;
214
221
}
215
- if (qvi_unlikely (qvrc != QV_SUCCESS)) zmq_msg_close (msg);
216
- return qvrc;
222
+ return QV_SUCCESS;
217
223
}
218
224
219
225
static inline int
@@ -243,7 +249,7 @@ template <typename... Types>
243
249
static inline int
244
250
rpc_pack (
245
251
qvi_bbuff **buff,
246
- qvi_rmi_rpc_funid_t fid,
252
+ qvi_rmi_rpc_fid_t fid,
247
253
Types &&...args
248
254
) {
249
255
std::string picture;
@@ -297,24 +303,23 @@ template <typename... Types>
297
303
static inline int
298
304
rpc_req (
299
305
void *zsock,
300
- qvi_rmi_rpc_funid_t fid,
306
+ qvi_rmi_rpc_fid_t fid,
301
307
Types &&...args
302
308
) {
303
- int buffer_size = 0 ;
304
-
305
309
qvi_bbuff *buff = nullptr ;
306
310
int rc = rpc_pack (&buff, fid, std::forward<Types>(args)...);
307
311
if (qvi_unlikely (rc != QV_SUCCESS)) {
308
312
qvi_bbuff_delete (&buff);
309
313
return rc;
310
314
}
311
- zmq_msg_t msg;
312
- rc = zmsg_init_from_bbuff (buff, &msg);
313
- if (qvi_unlikely (rc != QV_SUCCESS)) goto out;
314
315
// Cache buffer size here because our call to qvi_bbuff_size() after
315
316
// zmsg_send() may be invalid because msg_free_byte_buffer_cb() may have
316
317
// already been called.
317
- buffer_size = (int )buff->size ();
318
+ const int buffer_size = (int )buff->size ();
319
+
320
+ zmq_msg_t msg;
321
+ rc = zmsg_init_from_bbuff (buff, &msg);
322
+ if (qvi_unlikely (rc != QV_SUCCESS)) goto out;
318
323
319
324
int nbytes_sent;
320
325
rc = zmsg_send (zsock, &msg, &nbytes_sent);
@@ -351,6 +356,9 @@ qvi_rmi_client::qvi_rmi_client(void)
351
356
// Create a new ZMQ context.
352
357
m_zctx = zmq_ctx_new ();
353
358
if (qvi_unlikely (!m_zctx)) throw qvi_runtime_error ();
359
+ // Create the ZMQ socket used for communication with the server.
360
+ m_zsock = zsocket_create (m_zctx, ZMQ_REQ);
361
+ if (qvi_unlikely (!m_zsock)) throw qvi_runtime_error ();
354
362
}
355
363
356
364
qvi_rmi_client::~qvi_rmi_client (void )
@@ -366,27 +374,21 @@ qvi_rmi_client::hwloc(void)
366
374
return m_config.hwloc ;
367
375
}
368
376
369
- int
370
- qvi_rmi_client::m_hello (void )
371
- {
372
- const int rc = rpc_req (m_zsock, QVI_RMI_FID_HELLO, getpid ());
373
- if (qvi_unlikely (rc != QV_SUCCESS)) return rc;
374
-
375
- return rpc_rep (m_zsock, m_config.url , m_config.hwtopo_path );
376
- }
377
-
378
377
int
379
378
qvi_rmi_client::connect (
380
379
const std::string &url
381
380
) {
382
- m_zsock = zsocket_create_and_connect (
383
- m_zctx, ZMQ_REQ, url.c_str ()
384
- );
385
- if (qvi_unlikely (!m_zsock)) return QV_ERR_RPC;
386
-
387
- int rc = m_hello ();
381
+ int rc = zsocket_connect (m_zsock, url.c_str ());
388
382
if (qvi_unlikely (rc != QV_SUCCESS)) return rc;
389
383
384
+ std::string hwtopo_path;
385
+ rc = m_hello (hwtopo_path);
386
+ if (qvi_unlikely (rc != QV_SUCCESS)) return rc;
387
+ // Now that we have all the info we need,
388
+ // finish populating the RMI config.
389
+ m_config.url = url;
390
+ m_config.hwtopo_path = hwtopo_path;
391
+ // Now we can initialize and load our topology.
390
392
rc = qvi_hwloc_topology_init (
391
393
m_config.hwloc , m_config.hwtopo_path .c_str ()
392
394
);
@@ -396,8 +398,23 @@ qvi_rmi_client::connect(
396
398
}
397
399
398
400
// //////////////////////////////////////////////////////////////////////////////
399
- // Client-Side (Public) RPC Definitions
401
+ // Client-Side RPC Definitions
400
402
// //////////////////////////////////////////////////////////////////////////////
403
+ int
404
+ qvi_rmi_client::m_hello (
405
+ std::string &hwtopo_path
406
+ ) {
407
+ int qvrc = rpc_req (m_zsock, QVI_RMI_FID_HELLO, qvi_gettid ());
408
+ if (qvi_unlikely (qvrc != QV_SUCCESS)) return qvrc;
409
+ // Should be set by rpc_rep, so assume an error.
410
+ int rpcrc = QV_ERR_RPC;
411
+ qvrc = rpc_rep (m_zsock, &rpcrc, hwtopo_path);
412
+ if (qvi_unlikely (qvrc != QV_SUCCESS)) {
413
+ return qvrc;
414
+ }
415
+ return rpcrc;
416
+ }
417
+
401
418
int
402
419
qvi_rmi_client::get_cpubind (
403
420
pid_t who,
@@ -584,9 +601,9 @@ qvi_rmi_server::s_rpc_hello(
584
601
const int rc = qvi_bbuff_rmi_unpack (input, &whoisit);
585
602
if (qvi_unlikely (rc != QV_SUCCESS)) return rc;
586
603
// Pack relevant configuration information.
604
+ const int rpcrc = QV_SUCCESS;
587
605
return rpc_pack (
588
- output, hdr->fid ,
589
- server->m_config .url ,
606
+ output, hdr->fid , rpcrc,
590
607
server->m_config .hwtopo_path
591
608
);
592
609
}
@@ -831,11 +848,11 @@ qvi_rmi_server::m_rpc_dispatch(
831
848
}
832
849
833
850
int
834
- qvi_rmi_server::m_go (void )
851
+ qvi_rmi_server::m_execute_main_server_loop (void )
835
852
{
836
853
int rc, bsent;
837
854
volatile int bsentt = 0 ;
838
- volatile std::atomic< bool > active{ true } ;
855
+ volatile bool active = true ;
839
856
do {
840
857
zmq_msg_t mrx, mtx;
841
858
rc = zmsg_recv (m_zsock, &mrx);
@@ -851,7 +868,7 @@ qvi_rmi_server::m_go(void)
851
868
// Nice to understand messaging characteristics.
852
869
qvi_log_debug (" Server Sent {} bytes" , bsentt);
853
870
#else
854
- QVI_UNUSED (bsentt);
871
+ qvi_unused (bsentt);
855
872
#endif
856
873
if (qvi_unlikely (rc != QV_SUCCESS && rc != QV_SUCCESS_SHUTDOWN)) {
857
874
qvi_log_error (" RX/TX loop exited with rc={} ({})" , rc, qv_strerr (rc));
@@ -867,9 +884,6 @@ qvi_rmi_server::configure(
867
884
return QV_SUCCESS;
868
885
}
869
886
870
- /* *
871
- * Populates base hardware pool.
872
- */
873
887
int
874
888
qvi_rmi_server::m_populate_base_hwpool (void )
875
889
{
@@ -891,7 +905,7 @@ qvi_rmi_server::start(void)
891
905
);
892
906
if (qvi_unlikely (!m_zsock)) return QV_ERR_SYS;
893
907
// Start the main service loop.
894
- return m_go ();
908
+ return m_execute_main_server_loop ();
895
909
}
896
910
897
911
/*
0 commit comments