-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathWsWebfluxClient.java
68 lines (54 loc) · 2.52 KB
/
WsWebfluxClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.example;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.Mono;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class WsWebfluxClient {
public static void main(String[] args) throws Exception {
WebSocketClient client = new ReactorNettyWebSocketClient();
CountDownLatch latch = new CountDownLatch(2);
AtomicReference<WebSocketSession> s1 = new AtomicReference<>();
AtomicReference<WebSocketSession> s2 = new AtomicReference<>();
client.execute(URI.create("ws://localhost:8080/ws"), new WsHandle(s1, latch)).subscribe();
client.execute(URI.create("ws://localhost:8080/ws"), new WsHandle(s2, latch)).subscribe();
latch.await();
Mono<Void> s1Send = s1.get().send(Mono.just(s1.get().textMessage("s1")));
Mono<Void> s2Send = s2.get().send(Mono.just(s2.get().textMessage("s2")));
Mono.zip(s1Send, s2Send).block();
TimeUnit.SECONDS.sleep(1);
Mono.zip(s1.get().close(), s2.get().close()).block();
}
public static class WsHandle implements WebSocketHandler {
private final AtomicReference<WebSocketSession> sessionRef;
private final CountDownLatch latch;
public WsHandle(AtomicReference<WebSocketSession> sessionRef, CountDownLatch latch) {
this.sessionRef = sessionRef;
this.latch = latch;
}
@Override
public Mono<Void> handle(WebSocketSession session) {
sessionRef.set(session);
log.info("link: {}", session.getId());
return session.receive()
.doOnSubscribe(subscription -> latch.countDown())
.doOnComplete(() -> {
log.info("close: {}", session.getId());
})
.doOnError(throwable -> {
log.error("error[{}]: ", session.getId(), throwable);
})
.doOnNext(message -> {
log.info("receive[{}]: {}", session.getId(), message.getPayloadAsText());
})
.then();
}
}
}