@@ -139,18 +139,6 @@ zsocket_create_and_bind(
139
139
return zsock;
140
140
}
141
141
142
- /* *
143
- * Buffer resource deallocation callback.
144
- */
145
- static void
146
- msg_free_byte_buffer_cb (
147
- void *,
148
- void *hint
149
- ) {
150
- qvi_bbuff *buff = (qvi_bbuff *)hint;
151
- qvi_bbuff_delete (&buff);
152
- }
153
-
154
142
static inline int
155
143
buffer_append_header (
156
144
qvi_bbuff *buff,
@@ -191,34 +179,20 @@ unpack_msg_header(
191
179
}
192
180
193
181
static inline int
194
- zmsg_init_from_bbuff (
195
- qvi_bbuff *bbuff,
196
- zmq_msg_t *zmsg
197
- ) {
198
- const size_t buffer_size = bbuff->size ();
199
- const int zrc = zmq_msg_init_data (
200
- zmsg, bbuff->data (), buffer_size,
201
- msg_free_byte_buffer_cb, bbuff
202
- );
203
- if (qvi_unlikely (zrc != 0 )) {
204
- zerr_msg (" zmq_msg_init_data() failed" , errno);
205
- return QV_ERR_RPC;
206
- }
207
- return QV_SUCCESS;
208
- }
209
-
210
- static inline int
211
- zmsg_send (
182
+ zsock_send_bbuff (
212
183
void *zsock,
213
- zmq_msg_t *msg ,
184
+ qvi_bbuff *bbuff ,
214
185
int *bsent
215
186
) {
216
- *bsent = zmq_msg_send (msg, zsock, 0 );
217
- if ( qvi_unlikely ( *bsent == - 1 )) {
218
- zerr_msg ( " zmq_msg_send() failed " , errno);
219
- zmq_msg_close (msg );
187
+ const int buff_size = bbuff-> size ( );
188
+ *bsent = zmq_send (zsock, bbuff-> data (), buff_size, 0 );
189
+ if ( qvi_unlikely (*bsent != buff_size)) {
190
+ zerr_msg ( " zmq_send() truncated " , errno );
220
191
return QV_ERR_RPC;
221
192
}
193
+ // We are resposible for freeing the buffer after ZMQ has
194
+ // taken ownership of its contents after zmq_send() returns.
195
+ qvi_bbuff_delete (&bbuff);
222
196
return QV_SUCCESS;
223
197
}
224
198
@@ -306,23 +280,14 @@ rpc_req(
306
280
qvi_rmi_rpc_fid_t fid,
307
281
Types &&...args
308
282
) {
309
- qvi_bbuff *buff = nullptr ;
310
- const int rc = rpc_pack (&buff , fid, std::forward<Types>(args)...);
283
+ qvi_bbuff *bbuff = nullptr ;
284
+ const int rc = rpc_pack (&bbuff , fid, std::forward<Types>(args)...);
311
285
if (qvi_unlikely (rc != QV_SUCCESS)) {
312
- qvi_bbuff_delete (&buff );
286
+ qvi_bbuff_delete (&bbuff );
313
287
return rc;
314
288
}
315
-
316
- const int buff_size = buff->size ();
317
- const int nbsent = zmq_send (zsock, buff->data (), buff_size, 0 );
318
- if (qvi_unlikely (nbsent != buff_size)) {
319
- zerr_msg (" zmq_send() truncated" , errno);
320
- return QV_ERR_RPC;
321
- }
322
- // We are resposible for freeing the buffer after ZMQ has
323
- // taken ownership of its contents after zmq_send() returns.
324
- qvi_bbuff_delete (&buff);
325
- return rc;
289
+ int bsent = 0 ;
290
+ return zsock_send_bbuff (zsock, bbuff, &bsent);
326
291
}
327
292
328
293
template <typename ... Types>
@@ -801,13 +766,14 @@ qvi_rmi_server::s_rpc_get_intrinsic_hwpool(
801
766
802
767
int
803
768
qvi_rmi_server::m_rpc_dispatch (
804
- zmq_msg_t *msg_in,
805
- zmq_msg_t *msg_out
769
+ void *zsock,
770
+ zmq_msg_t *command_msg,
771
+ int *bsent
806
772
) {
807
773
int rc = QV_SUCCESS;
808
774
bool shutdown = false ;
809
775
810
- void *data = zmq_msg_data (msg_in );
776
+ void *data = zmq_msg_data (command_msg );
811
777
qvi_rmi_msg_header hdr;
812
778
const size_t trim = unpack_msg_header (data, &hdr);
813
779
void *body = data_trim (data, trim);
@@ -819,8 +785,8 @@ qvi_rmi_server::m_rpc_dispatch(
819
785
goto out;
820
786
}
821
787
822
- qvi_bbuff *res ;
823
- rc = fidfunp->second (this , &hdr, body, &res );
788
+ qvi_bbuff *result ;
789
+ rc = fidfunp->second (this , &hdr, body, &result );
824
790
if (qvi_unlikely (rc != QV_SUCCESS && rc != QV_SUCCESS_SHUTDOWN)) {
825
791
cstr_t ers = " RPC dispatch failed" ;
826
792
qvi_log_error (" {} with rc={} ({})" , ers, rc, qv_strerr (rc));
@@ -830,30 +796,25 @@ qvi_rmi_server::m_rpc_dispatch(
830
796
if (qvi_unlikely (rc == QV_SUCCESS_SHUTDOWN)) {
831
797
shutdown = true ;
832
798
}
833
- rc = zmsg_init_from_bbuff (res, msg_out );
799
+ rc = zsock_send_bbuff (zsock, result, bsent );
834
800
out:
835
- zmq_msg_close (msg_in);
836
- if (rc != QV_SUCCESS && rc != QV_SUCCESS_SHUTDOWN) {
837
- zmq_msg_close (msg_out);
838
- }
801
+ zmq_msg_close (command_msg);
839
802
return (shutdown ? QV_SUCCESS_SHUTDOWN : rc);
840
803
}
841
804
842
805
int
843
- qvi_rmi_server::m_execute_main_server_loop (void )
806
+ qvi_rmi_server::m_enter_main_server_loop (void )
844
807
{
845
808
int rc, bsent;
846
809
volatile int bsentt = 0 ;
847
810
volatile bool active = true ;
811
+ zmq_msg_t mrx;
848
812
do {
849
- zmq_msg_t mrx, mtx;
850
813
rc = zmsg_recv (m_zsock, &mrx);
851
814
if (qvi_unlikely (rc != QV_SUCCESS)) break ;
852
- rc = m_rpc_dispatch (&mrx, &mtx );
815
+ rc = m_rpc_dispatch (m_zsock, &mrx, &bsent );
853
816
if (qvi_unlikely (rc != QV_SUCCESS && rc != QV_SUCCESS_SHUTDOWN)) break ;
854
817
if (qvi_unlikely (rc == QV_SUCCESS_SHUTDOWN)) active = false ;
855
- rc = zmsg_send (m_zsock, &mtx, &bsent);
856
- if (qvi_unlikely (rc != QV_SUCCESS)) break ;
857
818
bsentt += bsent;
858
819
} while (qvi_likely (active));
859
820
#if QVI_DEBUG_MODE == 1
@@ -897,7 +858,7 @@ qvi_rmi_server::start(void)
897
858
);
898
859
if (qvi_unlikely (!m_zsock)) return QV_ERR_SYS;
899
860
// Start the main service loop.
900
- return m_execute_main_server_loop ();
861
+ return m_enter_main_server_loop ();
901
862
}
902
863
903
864
/*
0 commit comments