53
53
qvi_log_warn (" {} with errno={} ({})" , ers, (erno), qvi_strerr ((erno))); \
54
54
} while (0 )
55
55
56
- static constexpr cstr_t ZINPROC_ADDR = " inproc://qvi-rmi-workers" ;
57
-
58
56
typedef enum qvi_rpc_funid_e {
59
57
FID_INVALID = 0 ,
60
58
FID_SERVER_SHUTDOWN,
@@ -85,11 +83,6 @@ typedef int (*qvi_rpc_fun_ptr_t)(
85
83
qvi_bbuff **
86
84
);
87
85
88
- static void
89
- send_server_shutdown_msg (
90
- qvi_rmi_server_t *server
91
- );
92
-
93
86
static void
94
87
zsocket_close (
95
88
void **sock
@@ -129,10 +122,6 @@ struct qvi_rmi_server_s {
129
122
void *zctx = nullptr ;
130
123
/* * Loopback socket for managerial messages. */
131
124
void *zlo = nullptr ;
132
- /* * The worker thread. */
133
- pthread_t worker_thread;
134
- /* * Flag indicating if main thread blocks for workers to complete. */
135
- bool blocks = false ;
136
125
/* * Constructor. */
137
126
qvi_rmi_server_s (void )
138
127
{
@@ -145,14 +134,10 @@ struct qvi_rmi_server_s {
145
134
/* * Destructor. */
146
135
~qvi_rmi_server_s (void )
147
136
{
148
- send_server_shutdown_msg (this );
149
137
zsocket_close (&zlo);
150
138
zctx_destroy (&zctx);
151
139
unlink (config.hwtopo_path .c_str ());
152
140
qvi_delete (&hwpool);
153
- if (!blocks) {
154
- pthread_join (worker_thread, nullptr );
155
- }
156
141
}
157
142
};
158
143
@@ -772,16 +757,15 @@ server_rpc_dispatch(
772
757
return (shutdown ? QV_SUCCESS_SHUTDOWN : rc);
773
758
}
774
759
775
- static void *
760
+ static int
776
761
server_go (
777
- void *data
762
+ qvi_rmi_server_t *server
778
763
) {
779
- qvi_rmi_server_t *const server = (qvi_rmi_server_t *)data;
780
764
781
- void *zworksock = zsocket_create_and_connect (
782
- server->zctx , ZMQ_REP, ZINPROC_ADDR
765
+ void *zworksock = zsocket_create_and_bind (
766
+ server->zctx , ZMQ_REP, server-> config . url . c_str ()
783
767
);
784
- if (qvi_unlikely (!zworksock)) return nullptr ;
768
+ if (qvi_unlikely (!zworksock)) return QV_ERR_SYS ;
785
769
786
770
int rc, bsent;
787
771
volatile int bsentt = 0 ;
@@ -807,15 +791,7 @@ server_go(
807
791
if (qvi_unlikely (rc != QV_SUCCESS && rc != QV_SUCCESS_SHUTDOWN)) {
808
792
qvi_log_error (" RX/TX loop exited with rc={} ({})" , rc, qv_strerr (rc));
809
793
}
810
- return nullptr ;
811
- }
812
-
813
- static void
814
- send_server_shutdown_msg (
815
- qvi_rmi_server_t *server
816
- ) {
817
- (void )rpc_req (server->zlo , FID_SERVER_SHUTDOWN, QVI_BBUFF_RMI_ZERO_MSG);
818
- (void )rpc_rep (server->zlo , QVI_BBUFF_RMI_ZERO_MSG);
794
+ return QV_SUCCESS;
819
795
}
820
796
821
797
int
@@ -840,49 +816,9 @@ server_populate_base_hwpool(
840
816
return server->hwpool ->initialize (hwloc, cpuset);
841
817
}
842
818
843
- static void *
844
- server_start_threads (
845
- void *data
846
- ) {
847
- qvi_rmi_server_t *server = (qvi_rmi_server_t *)data;
848
-
849
- void *clients = zsocket_create_and_bind (
850
- server->zctx , ZMQ_ROUTER, server->config .url .c_str ()
851
- );
852
- if (qvi_unlikely (!clients)) {
853
- cstr_t ers = " zsocket_create_and_bind() failed" ;
854
- qvi_log_error (" {}" , ers);
855
- return nullptr ;
856
- }
857
-
858
- void *workers = zsocket_create_and_bind (
859
- server->zctx , ZMQ_DEALER, ZINPROC_ADDR
860
- );
861
- if (qvi_unlikely (!workers)) {
862
- cstr_t ers = " zsocket_create_and_bind() failed" ;
863
- qvi_log_error (" {}" , ers);
864
- return nullptr ;
865
- }
866
-
867
- pthread_t worker;
868
- int rc = pthread_create (&worker, nullptr , server_go, server);
869
- if (qvi_unlikely (rc != 0 )) {
870
- cstr_t ers = " pthread_create() failed" ;
871
- qvi_log_error (" {} with rc={} ({})" , ers, rc, qvi_strerr (rc));
872
- }
873
- // The zmq_proxy() function always returns -1 and errno set to ETERM.
874
- zmq_proxy (clients, workers, nullptr );
875
- pthread_join (worker, nullptr );
876
- zsocket_close (&workers);
877
-
878
- zsocket_close (&clients);
879
- return nullptr ;
880
- }
881
-
882
819
int
883
820
qvi_rmi_server_start (
884
- qvi_rmi_server_t *server,
885
- bool block
821
+ qvi_rmi_server_t *server
886
822
) {
887
823
// First populate the base hardware resource pool.
888
824
int qvrc = server_populate_base_hwpool (server);
@@ -891,23 +827,9 @@ qvi_rmi_server_start(
891
827
server->zlo = zsocket_create_and_connect (
892
828
server->zctx , ZMQ_REQ, server->config .url .c_str ()
893
829
);
894
- if (qvi_unlikely (!server->zlo )) return QV_ERR_RPC;
895
-
896
- const int rc = pthread_create (
897
- &server->worker_thread , nullptr ,
898
- server_start_threads, server
899
- );
900
- if (qvi_unlikely (rc != 0 )) {
901
- cstr_t ers = " pthread_create() failed" ;
902
- qvi_log_error (" {} with rc={} ({})" , ers, rc, qvi_strerr (rc));
903
- qvrc = QV_ERR_SYS;
904
- }
830
+ if (qvi_unlikely (!server->zlo )) return QV_ERR_SYS;
905
831
906
- if (block && qvrc == QV_SUCCESS) {
907
- server->blocks = true ;
908
- pthread_join (server->worker_thread , nullptr );
909
- }
910
- return qvrc;
832
+ return server_go (server);
911
833
}
912
834
913
835
static int
@@ -1096,6 +1018,15 @@ qvi_rmi_get_cpuset_for_nobjs(
1096
1018
);
1097
1019
}
1098
1020
1021
+ int
1022
+ qvi_rmi_send_shutdown_message (
1023
+ qvi_rmi_client_t *client
1024
+ ) {
1025
+ return rpc_req (
1026
+ client->zsock , FID_SERVER_SHUTDOWN
1027
+ );
1028
+ }
1029
+
1099
1030
/*
1100
1031
* vim: ft=cpp ts=4 sts=4 sw=4 expandtab
1101
1032
*/
0 commit comments