Skip to content

Commit de1c932

Browse files
Cleanup RMI code. (#306)
Attempting to address server-side TSan warning. Signed-off-by: Samuel K. Gutierrez <[email protected]>
1 parent a105176 commit de1c932

File tree

2 files changed

+33
-71
lines changed

2 files changed

+33
-71
lines changed

src/qvi-rmi.cc

+29-68
Original file line numberDiff line numberDiff line change
@@ -139,18 +139,6 @@ zsocket_create_and_bind(
139139
return zsock;
140140
}
141141

142-
/**
143-
* Buffer resource deallocation callback.
144-
*/
145-
static void
146-
msg_free_byte_buffer_cb(
147-
void *,
148-
void *hint
149-
) {
150-
qvi_bbuff *buff = (qvi_bbuff *)hint;
151-
qvi_bbuff_delete(&buff);
152-
}
153-
154142
static inline int
155143
buffer_append_header(
156144
qvi_bbuff *buff,
@@ -191,39 +179,25 @@ unpack_msg_header(
191179
}
192180

193181
static inline int
194-
zmsg_init_from_bbuff(
195-
qvi_bbuff *bbuff,
196-
zmq_msg_t *zmsg
197-
) {
198-
const size_t buffer_size = bbuff->size();
199-
const int zrc = zmq_msg_init_data(
200-
zmsg, bbuff->data(), buffer_size,
201-
msg_free_byte_buffer_cb, bbuff
202-
);
203-
if (qvi_unlikely(zrc != 0)) {
204-
zerr_msg("zmq_msg_init_data() failed", errno);
205-
return QV_ERR_RPC;
206-
}
207-
return QV_SUCCESS;
208-
}
209-
210-
static inline int
211-
zmsg_send(
182+
zsock_send_bbuff(
212183
void *zsock,
213-
zmq_msg_t *msg,
184+
qvi_bbuff *bbuff,
214185
int *bsent
215186
) {
216-
*bsent = zmq_msg_send(msg, zsock, 0);
217-
if (qvi_unlikely(*bsent == -1)) {
218-
zerr_msg("zmq_msg_send() failed", errno);
219-
zmq_msg_close(msg);
187+
const int buff_size = bbuff->size();
188+
*bsent = zmq_send(zsock, bbuff->data(), buff_size, 0);
189+
if (qvi_unlikely(*bsent != buff_size)) {
190+
zerr_msg("zmq_send() truncated", errno);
220191
return QV_ERR_RPC;
221192
}
193+
// We are resposible for freeing the buffer after ZMQ has
194+
// taken ownership of its contents after zmq_send() returns.
195+
qvi_bbuff_delete(&bbuff);
222196
return QV_SUCCESS;
223197
}
224198

225199
static inline int
226-
zmsg_recv(
200+
zsock_recv_msg(
227201
void *zsock,
228202
zmq_msg_t *mrx
229203
) {
@@ -306,23 +280,14 @@ rpc_req(
306280
qvi_rmi_rpc_fid_t fid,
307281
Types &&...args
308282
) {
309-
qvi_bbuff *buff = nullptr;
310-
const int rc = rpc_pack(&buff, fid, std::forward<Types>(args)...);
283+
qvi_bbuff *bbuff = nullptr;
284+
const int rc = rpc_pack(&bbuff, fid, std::forward<Types>(args)...);
311285
if (qvi_unlikely(rc != QV_SUCCESS)) {
312-
qvi_bbuff_delete(&buff);
286+
qvi_bbuff_delete(&bbuff);
313287
return rc;
314288
}
315-
316-
const int buff_size = buff->size();
317-
const int nbsent = zmq_send(zsock, buff->data(), buff_size, 0);
318-
if (qvi_unlikely(nbsent != buff_size)) {
319-
zerr_msg("zmq_send() truncated", errno);
320-
return QV_ERR_RPC;
321-
}
322-
// We are resposible for freeing the buffer after ZMQ has
323-
// taken ownership of its contents after zmq_send() returns.
324-
qvi_bbuff_delete(&buff);
325-
return rc;
289+
int bsent = 0;
290+
return zsock_send_bbuff(zsock, bbuff, &bsent);
326291
}
327292

328293
template <typename... Types>
@@ -332,7 +297,7 @@ rpc_rep(
332297
Types &&...args
333298
) {
334299
zmq_msg_t msg;
335-
int rc = zmsg_recv(zsock, &msg);
300+
int rc = zsock_recv_msg(zsock, &msg);
336301
if (qvi_unlikely(rc != QV_SUCCESS)) goto out;
337302
rc = rpc_unpack(zmq_msg_data(&msg), std::forward<Types>(args)...);
338303
out:
@@ -801,13 +766,14 @@ qvi_rmi_server::s_rpc_get_intrinsic_hwpool(
801766

802767
int
803768
qvi_rmi_server::m_rpc_dispatch(
804-
zmq_msg_t *msg_in,
805-
zmq_msg_t *msg_out
769+
void *zsock,
770+
zmq_msg_t *command_msg,
771+
int *bsent
806772
) {
807773
int rc = QV_SUCCESS;
808774
bool shutdown = false;
809775

810-
void *data = zmq_msg_data(msg_in);
776+
void *data = zmq_msg_data(command_msg);
811777
qvi_rmi_msg_header hdr;
812778
const size_t trim = unpack_msg_header(data, &hdr);
813779
void *body = data_trim(data, trim);
@@ -819,8 +785,8 @@ qvi_rmi_server::m_rpc_dispatch(
819785
goto out;
820786
}
821787

822-
qvi_bbuff *res;
823-
rc = fidfunp->second(this, &hdr, body, &res);
788+
qvi_bbuff *result;
789+
rc = fidfunp->second(this, &hdr, body, &result);
824790
if (qvi_unlikely(rc != QV_SUCCESS && rc != QV_SUCCESS_SHUTDOWN)) {
825791
cstr_t ers = "RPC dispatch failed";
826792
qvi_log_error("{} with rc={} ({})", ers, rc, qv_strerr(rc));
@@ -830,30 +796,25 @@ qvi_rmi_server::m_rpc_dispatch(
830796
if (qvi_unlikely(rc == QV_SUCCESS_SHUTDOWN)) {
831797
shutdown = true;
832798
}
833-
rc = zmsg_init_from_bbuff(res, msg_out);
799+
rc = zsock_send_bbuff(zsock, result, bsent);
834800
out:
835-
zmq_msg_close(msg_in);
836-
if (rc != QV_SUCCESS && rc != QV_SUCCESS_SHUTDOWN) {
837-
zmq_msg_close(msg_out);
838-
}
801+
zmq_msg_close(command_msg);
839802
return (shutdown ? QV_SUCCESS_SHUTDOWN : rc);
840803
}
841804

842805
int
843-
qvi_rmi_server::m_execute_main_server_loop(void)
806+
qvi_rmi_server::m_enter_main_server_loop(void)
844807
{
845808
int rc, bsent;
846809
volatile int bsentt = 0;
847810
volatile bool active = true;
811+
zmq_msg_t mrx;
848812
do {
849-
zmq_msg_t mrx, mtx;
850-
rc = zmsg_recv(m_zsock, &mrx);
813+
rc = zsock_recv_msg(m_zsock, &mrx);
851814
if (qvi_unlikely(rc != QV_SUCCESS)) break;
852-
rc = m_rpc_dispatch(&mrx, &mtx);
815+
rc = m_rpc_dispatch(m_zsock, &mrx, &bsent);
853816
if (qvi_unlikely(rc != QV_SUCCESS && rc != QV_SUCCESS_SHUTDOWN)) break;
854817
if (qvi_unlikely(rc == QV_SUCCESS_SHUTDOWN)) active = false;
855-
rc = zmsg_send(m_zsock, &mtx, &bsent);
856-
if (qvi_unlikely(rc != QV_SUCCESS)) break;
857818
bsentt += bsent;
858819
} while(qvi_likely(active));
859820
#if QVI_DEBUG_MODE == 1
@@ -897,7 +858,7 @@ qvi_rmi_server::start(void)
897858
);
898859
if (qvi_unlikely(!m_zsock)) return QV_ERR_SYS;
899860
// Start the main service loop.
900-
return m_execute_main_server_loop();
861+
return m_enter_main_server_loop();
901862
}
902863

903864
/*

src/qvi-rmi.h

+4-3
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,9 @@ struct qvi_rmi_server {
8080
/** Performs RPC dispatch. */
8181
int
8282
m_rpc_dispatch(
83-
zmq_msg_t *msg_in,
84-
zmq_msg_t *msg_out
83+
void *zsock,
84+
zmq_msg_t *command_msg,
85+
int *bsent
8586
);
8687
/** */
8788
int
@@ -97,7 +98,7 @@ struct qvi_rmi_server {
9798
);
9899
/** Executes the main server loop. */
99100
int
100-
m_execute_main_server_loop(void);
101+
m_enter_main_server_loop(void);
101102
/** */
102103
static int
103104
s_rpc_invalid(

0 commit comments

Comments
 (0)