-
Notifications
You must be signed in to change notification settings - Fork 489
/
Copy pathflclient2.java
116 lines (99 loc) · 3.64 KB
/
flclient2.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package guide;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;
// Freelance client - Model 2
// Uses DEALER socket to blast one or more services
/**
 * Freelance client, model 2: a "shotgun" client that sends every request to
 * all connected servers over a single DEALER socket and returns the first
 * reply whose sequence number matches the request.
 *
 * <p>Instances are not designed for use from more than one thread.
 */
public class flclient2
{
    // If not a single service replies within this time (msec), give up
    private static final int GLOBAL_TIMEOUT = 2500;

    // Number of name-resolution requests issued by main()
    private static final int REQUEST_COUNT = 10000;

    // Each instance has a context, a DEALER socket it uses to talk to the
    // servers, a counter of how many servers it is connected to, and a
    // request sequence number.
    private final ZContext ctx;    // Our context wrapper
    private final Socket   socket; // DEALER socket talking to servers
    private int            servers;  // How many servers we have connected to
    private int            sequence; // Number of requests ever sent

    /**
     * Creates a client with its own context and an unconnected DEALER socket.
     */
    public flclient2()
    {
        ctx = new ZContext();
        socket = ctx.createSocket(SocketType.DEALER);
    }

    /**
     * Destroys the context owned by this client. The client must not be
     * used afterwards.
     */
    public void destroy()
    {
        ctx.destroy();
    }

    /**
     * Connects the DEALER socket to one more server and counts it, so that
     * {@link #request(ZMsg)} knows how many copies of a request to send.
     *
     * @param endpoint endpoint of the server to connect to
     */
    private void connect(String endpoint)
    {
        socket.connect(endpoint);
        servers++;
    }

    /**
     * Sends the request to every connected server and waits up to
     * {@link #GLOBAL_TIMEOUT} milliseconds for the first reply carrying the
     * matching sequence number. Stale or malformed replies are discarded.
     *
     * <p>The request message is always destroyed before this method returns.
     *
     * @param request request body; an envelope is pushed onto it in place
     * @return the reply with its envelope and sequence frames removed, or
     *         {@code null} if no matching reply arrived in time
     */
    private ZMsg request(ZMsg request)
    {
        try {
            // Prefix request with sequence number and empty envelope
            request.push(Integer.toString(++sequence));
            request.push("");

            // Blast the request to all connected servers
            for (int server = 0; server < servers; server++) {
                request.duplicate().send(socket);
            }

            return awaitMatchingReply();
        }
        finally {
            request.destroy();
        }
    }

    // Polls the socket until a reply for the current sequence number arrives
    // or the global deadline passes; returns null on timeout or interruption.
    private ZMsg awaitMatchingReply()
    {
        long endtime = System.currentTimeMillis() + GLOBAL_TIMEOUT;
        try (Poller poller = ctx.createPoller(1)) {
            poller.register(socket, Poller.POLLIN);
            while (true) {
                // Compute the remaining time once per pass: a zero or
                // negative value must end the wait, never reach poll(),
                // where a negative timeout can mean "wait forever".
                long remaining = endtime - System.currentTimeMillis();
                if (remaining <= 0) {
                    return null;
                }
                if (poller.poll(remaining) < 0) {
                    return null; // Polling failed (presumably interrupted or context closed)
                }
                if (!poller.pollin(0)) {
                    continue;
                }
                ZMsg reply = ZMsg.recvMsg(socket);
                if (reply == null) {
                    return null; // Nothing could be read; stop waiting
                }
                if (stripEnvelopeIfCurrent(reply)) {
                    return reply;
                }
                // Stale or malformed reply: release it and keep waiting.
                // It must never be handed back to the caller.
                reply.destroy();
            }
        }
    }

    // Checks that a reply is [empty][sequence][OK] for the current sequence
    // number and, while checking, pops the envelope and sequence frames.
    private boolean stripEnvelopeIfCurrent(ZMsg reply)
    {
        if (reply.size() != 3) {
            return false;
        }
        reply.pop();
        String sequenceStr = reply.popString();
        try {
            return Integer.parseInt(sequenceStr) == sequence;
        }
        catch (NumberFormatException e) {
            // Sequence frame is not a number, so it cannot be our reply
            return false;
        }
    }

    /**
     * Connects to every endpoint given on the command line, issues a batch
     * of requests and prints the average round trip time.
     *
     * @param argv one or more server endpoints
     */
    public static void main(String[] argv)
    {
        if (argv.length == 0) {
            System.out.print("I: syntax: flclient2 <endpoint> ...\n");
            System.exit(0);
        }
        // Create new freelance client object
        flclient2 client = new flclient2();
        try {
            // Connect to each endpoint
            for (String endpoint : argv) {
                client.connect(endpoint);
            }

            // Send a bunch of name resolution 'requests', measure time
            int completed = 0;
            long elapsedMillis = 0;
            long start = System.currentTimeMillis();
            while (completed < REQUEST_COUNT) {
                ZMsg request = new ZMsg();
                request.add("random name");
                ZMsg reply = client.request(request);
                if (reply == null) {
                    System.out.print("E: name service not available, aborting\n");
                    break;
                }
                reply.destroy();
                completed++;
                // Stamp after each success so a final timeout is not
                // counted in the average
                elapsedMillis = System.currentTimeMillis() - start;
            }

            // Average over the requests that actually completed
            if (completed > 0) {
                System.out.printf("Average round trip cost: %d usec\n", elapsedMillis * 1000 / completed);
            }
        }
        finally {
            client.destroy();
        }
    }
}