Skip to content

Commit f0ca46e

Browse files
author
litongjava
committed
add websocket client
1 parent 3d7ec16 commit f0ca46e

File tree

4 files changed

+368
-2
lines changed

4 files changed

+368
-2
lines changed

docs/zh/06_内置组件/03.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
### 异步响应
1+
# 异步响应
22

33
tio-boot 的响应本身就是异步的,但是如果你想要在发送 http 响应之后在做一些事情,可以参考本节的内容
44

@@ -229,4 +229,4 @@ public class AsyncController {
229229

230230
```
231231
{"code":1}
232-
```
232+
```

docs/zh/06_内置组件/05.md

+107
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# WebSocket
22

3+
## WebSocket Server
4+
35
配置 websocket 路由
46

57
```java
@@ -173,3 +175,108 @@ ws://127.0.0.1/hello
173175
#### 测试代码地址
174176

175177
https://github.com/litongjava/java-ee-tio-boot-study/tree/main/tio-boot-latest-study/tio-boot-websocket-hello
178+
179+
## WebSocket Client
180+
181+
### 1. 依赖添加
182+
183+
首先,您需要在项目的`pom.xml`文件中添加以下依赖,以使用`tio-websocket-client`库。
184+
185+
```xml
186+
<dependency>
187+
<groupId>com.litongjava</groupId>
188+
<artifactId>tio-websocket-client</artifactId>
189+
<version>3.7.3.v20210706</version>
190+
</dependency>
191+
```
192+
193+
这个依赖是 WebSocket 客户端的核心,提供了建立连接、发送和接收消息等功能。
194+
195+
### 2. WebSocket 客户端测试
196+
197+
### 主要组件和流程:
198+
199+
1. **消息发送跟踪:** 使用`ConcurrentHashMap`来存储和跟踪每条消息的发送状态。
200+
2. **消息确认机制:** 使用 RxJava 的`Subject``PublishSubject`来处理消息确认。当所有消息都确认发送后,会打印出“All sent success!”。
201+
3. **WebSocket 客户端配置:**
202+
- `onOpen`:连接打开时的回调。
203+
- `onMessage`:接收到消息时的回调。更新消息状态,并打印接收到的消息。
204+
- `onClose`:连接关闭时的回调。
205+
- `onError`:出现错误时的回调。
206+
- `onThrows`:异常处理。
207+
4. **连接建立:** 使用`WsClient.create`创建 WebSocket 客户端,并通过`connect`方法建立连接。
208+
5. **消息发送:** 循环发送一定数量的消息,并打印发送状态。
209+
210+
```
211+
212+
import java.util.List;
213+
import java.util.Map;
214+
import java.util.concurrent.ConcurrentHashMap;
215+
import java.util.function.Consumer;
216+
217+
import com.litongjava.tio.websocket.client.WebSocket;
218+
import com.litongjava.tio.websocket.client.WsClient;
219+
import com.litongjava.tio.websocket.client.config.WsClientConfig;
220+
import com.litongjava.tio.websocket.client.event.CloseEvent;
221+
import com.litongjava.tio.websocket.client.event.ErrorEvent;
222+
import com.litongjava.tio.websocket.client.event.MessageEvent;
223+
import com.litongjava.tio.websocket.client.event.OpenEvent;
224+
import com.litongjava.tio.websocket.common.WsPacket;
225+
226+
import io.reactivex.subjects.PublishSubject;
227+
import io.reactivex.subjects.Subject;
228+
229+
public class TioWebSocketDemo {
230+
231+
public static void main(String[] args) throws Exception {
232+
Map<Long, Boolean> sent = new ConcurrentHashMap<>();
233+
int total = 1000;
234+
String uri = "ws://localhost/hello";
235+
236+
// onNext
237+
io.reactivex.functions.Consumer<? super List<Object>> onNext = x -> {
238+
Boolean all = sent.values().stream().reduce(true, (p, c) -> p && c);
239+
if (all) {
240+
System.out.println("All sent success! ");
241+
}
242+
};
243+
244+
// complete
245+
Subject<Object> complete = PublishSubject.create().toSerialized();
246+
// subscribe
247+
complete.buffer(total).subscribe(onNext);
248+
249+
// wsClientConfig
250+
Consumer<OpenEvent> onOpen = e -> System.out.println("opened");
251+
252+
Consumer<MessageEvent> onMessage = e -> {
253+
WsPacket data = e.data;
254+
Long id = data.getId();
255+
String wsBodyText = data.getWsBodyText();
256+
sent.put(id, true);
257+
System.out.println("recv: " + wsBodyText);
258+
complete.onNext(id);
259+
};
260+
261+
Consumer<CloseEvent> onClose = e -> System.out.printf("on close: %d, %s, %s\n", e.code, e.reason, e.wasClean);
262+
Consumer<ErrorEvent> onError = e -> System.out.println(String.format("on error: %s", e.msg));
263+
Consumer<Throwable> onThrows = Throwable::printStackTrace;
264+
265+
// wsClientConfig
266+
WsClientConfig wsClientConfig = new WsClientConfig(onOpen, onMessage, onClose, onError, onThrows);
267+
268+
// create
269+
WsClient echo = WsClient.create(uri, wsClientConfig);
270+
271+
// connect
272+
WebSocket ws = echo.connect();
273+
274+
// sent
275+
for (int i = 0; i < total; i++) {
276+
ws.send("" + i);
277+
System.out.println("sent: " + i);
278+
}
279+
}
280+
}
281+
282+
```

docs/zh/06_内置组件/11.md

+219
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
# 独立启动 UDPServer
2+
3+
tio-boot 内置了 UDPServer,开发者口可以通过两种方式启动
4+
5+
- 独立方式 UDPServer 占用单独的端口
6+
- 联合方式 UDPServer 和其他协议共用一个端口
7+
8+
## 操作系统会自动区分 TCP 和 UDP
9+
10+
TCP 和 UDP 的区分是由操作系统的网络协议栈进行的。当数据包到达网络接口时,操作系统会检查数据包的协议头信息来确定它是 TCP 数据包还是 UDP 数据包。然后,根据这个信息,操作系统将数据包路由到相应的处理程序或应用程序。
11+
12+
TCP(传输控制协议)和 UDP(用户数据报协议)都建立在 IP(互联网协议)之上,但它们在传输层提供不同的服务模型:
13+
14+
1. **TCP** 提供一种可靠的数据传输服务,它确保数据准确无误地从发送方传输到接收方。TCP 通过序号、确认回应、重传机制、流量控制等方式来保证这种可靠性。由于这些特性,TCP 适用于要求高可靠性的应用,如网页浏览、文件传输、电子邮件等。
15+
16+
2. **UDP** 提供一种无连接的服务,允许数据以数据报文的形式发送,而不保证传输的可靠性、顺序或数据完整性。UDP 的这种简单性使其成为对实时性要求高(如在线游戏、语音或视频会议等)和/或对系统资源使用敏感的应用程序的理想选择。
17+
18+
当操作系统收到一个数据包时,它会检查 IP 头部中的协议字段来确定上层使用的是哪个协议(例如,TCP 或 UDP)。然后,它会检查数据包的端口号,并将数据包传递给在该端口监听的应用程序。如果是 TCP 数据包,它还会处理与 TCP 连接相关的各种控制消息,如 SYN、ACK 等。对于 UDP,由于其无连接的特性,操作系统的处理相对简单。
19+
20+
这种区分和处理都是透明的,通常对应用程序开发者和用户而言是不可见的。操作系统和网络协议栈负责确保网络通信的正确性和效率,而应用程序可以通过标准的网络接口(如套接字)来进行通信,无需关心底层的复杂性。
21+
22+
## 使用 Java 代码启动 TCP 端口和 UDP 端口
23+
24+
在 Java 中,要同时在同一个端口上处理 TCP 和 UDP,你需要分别为 TCP 和 UDP 创建不同的服务端套接字,并且这两个套接字监听同一个端口号。通常,你会为 TCP 使用`ServerSocket`类,并为 UDP 使用`DatagramSocket`类。
25+
26+
下面是一个简单的示例,展示了如何同时监听同一端口上的 TCP 和 UDP 请求:
27+
28+
```java
29+
package org.tio.showcase.udp.demo;
30+
31+
import java.io.IOException;
32+
import java.net.DatagramPacket;
33+
import java.net.DatagramSocket;
34+
import java.net.ServerSocket;
35+
import java.net.Socket;
36+
37+
public class TcpUdpServer {
38+
private static final int PORT = 12345; // 选择一个端口号
39+
40+
public static void main(String[] args) throws IOException {
41+
// 创建并启动TCP服务器线程
42+
Thread tcpThread = new Thread(() -> {
43+
try (ServerSocket serverSocket = new ServerSocket(PORT)) {
44+
System.out.println("TCP Server is running on port " + PORT);
45+
while (true) {
46+
Socket clientSocket = serverSocket.accept();
47+
// 处理TCP连接...
48+
System.out.println("TCP Connection established");
49+
// 可以为每个连接创建新线程或使用线程池来处理
50+
}
51+
} catch (IOException e) {
52+
e.printStackTrace();
53+
}
54+
});
55+
56+
// 创建并启动UDP服务器线程
57+
Thread udpThread = new Thread(() -> {
58+
try (DatagramSocket datagramSocket = new DatagramSocket(PORT)) {
59+
System.out.println("UDP Server is running on port " + PORT);
60+
byte[] receiveData = new byte[1024];
61+
while (true) {
62+
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
63+
datagramSocket.receive(receivePacket);
64+
// 处理UDP包...
65+
System.out.println("UDP Packet received");
66+
// 可以在这里处理数据或者创建新线程进行处理
67+
}
68+
} catch (IOException e) {
69+
e.printStackTrace();
70+
}
71+
});
72+
73+
tcpThread.start(); // 启动TCP服务器线程
74+
udpThread.start(); // 启动UDP服务器线程
75+
}
76+
}
77+
```
78+
79+
在这个例子中:
80+
81+
- 对于 TCP,我们创建了一个`ServerSocket`,它在指定的端口上监听传入的 TCP 连接请求。每当接受一个连接时,`accept`方法会返回一个新的`Socket`实例,用于与客户端通信。
82+
- 对于 UDP,我们创建了一个`DatagramSocket`,它可以在指定的端口上接收 UDP 数据包。每次调用`receive`方法都会从套接字的队列中取出一个数据包,我们可以处理这个数据包。
83+
84+
两个服务器(TCP 和 UDP)都在自己的线程中运行,这样它们就可以并行处理数据,并独立地接收和处理各自协议的数据。记住在实际应用中,你需要适当处理异常和多线程同步等问题。
85+
86+
## 独立启动 udp 服务
87+
88+
本文档提供了一个简单 UDP 服务器的实现方法
89+
90+
### 1. 主程序入口 (`Main.java`)
91+
92+
这个方法会自动扫描带有`@AComponentScan`注解的组件并进行初始化。
93+
94+
```java
95+
package demo.udp;
96+
97+
import com.litongjava.hotswap.wrapper.tio.boot.TioApplicationWrapper;
98+
import com.litongjava.jfinal.aop.annotation.AComponent;
99+
import com.litongjava.jfinal.aop.annotation.AComponentScan;
100+
101+
@AComponentScan
102+
public class Main {
103+
public static void main(String[] args) {
104+
long start = System.currentTimeMillis();
105+
TioApplicationWrapper.run(Main.class, args);
106+
long end = System.currentTimeMillis();
107+
System.out.println((end - start) + "(ms)");
108+
}
109+
}
110+
```
111+
112+
### 2. UDP 消息处理 (`DemoUdpHandler.java`)
113+
114+
该类负责接收和处理 UDP 消息。它读取数据包内容,记录发送者信息,并将收到的消息回发给发送者。
115+
116+
```java
117+
package demo.udp.handler;
118+
119+
import java.net.DatagramPacket;
120+
import java.net.DatagramSocket;
121+
import java.net.InetSocketAddress;
122+
123+
import com.litongjava.tio.core.Node;
124+
import com.litongjava.tio.core.udp.UdpPacket;
125+
import com.litongjava.tio.core.udp.intf.UdpHandler;
126+
127+
import lombok.extern.slf4j.Slf4j;
128+
129+
@Slf4j
130+
public class DemoUdpHandler implements UdpHandler {
131+
/**
132+
* 处理udp消息
133+
*/
134+
public void handler(UdpPacket udpPacket, DatagramSocket datagramSocket) {
135+
byte[] data = udpPacket.getData();
136+
String msg = new String(data);
137+
Node remote = udpPacket.getRemote();
138+
139+
log.info("收到来自{}的消息:【{}】", remote, msg);
140+
DatagramPacket datagramPacket = new DatagramPacket(data, data.length,
141+
new InetSocketAddress(remote.getIp(), remote.getPort()));
142+
try {
143+
datagramSocket.send(datagramPacket);
144+
} catch (Throwable e) {
145+
log.error(e.toString(), e);
146+
}
147+
}
148+
149+
}
150+
```
151+
152+
## 3. UDP 服务器配置 (`UdpServerConfig.java`)
153+
154+
这个类用于配置和启动 UDP 服务器。它创建了一个`UdpServerConf`实例,指定服务器的端口和处理器,然后启动服务器。
155+
156+
```
157+
import java.net.SocketException;
158+
159+
import com.litongjava.jfinal.aop.annotation.AConfiguration;
160+
import com.litongjava.jfinal.aop.annotation.AInitialization;
161+
import com.litongjava.tio.core.udp.UdpServer;
162+
import com.litongjava.tio.core.udp.UdpServerConf;
163+
164+
import demo.udp.handler.DemoUdpHandler;
165+
import lombok.extern.slf4j.Slf4j;
166+
167+
@Slf4j
168+
@AConfiguration
169+
public class UdpServerConfig {
170+
171+
@AInitialization
172+
public void config() {
173+
DemoUdpHandler fpmsUdpHandler = new DemoUdpHandler();
174+
UdpServerConf udpServerConf = new UdpServerConf(3000, fpmsUdpHandler, 5000);
175+
UdpServer udpServer;
176+
try {
177+
udpServer = new UdpServer(udpServerConf);
178+
udpServer.start();
179+
log.info("udp started");
180+
} catch (SocketException e) {
181+
e.printStackTrace();
182+
}
183+
184+
}
185+
}
186+
```
187+
188+
## Udp 客户端
189+
190+
tio-boot 同样提供了 UDP 客户端,你可以使用 UDP 客户端给上面的服务发送数据
191+
192+
```
193+
package demo.udp.client;
194+
195+
import com.litongjava.tio.core.udp.UdpClient;
196+
import com.litongjava.tio.core.udp.UdpClientConf;
197+
198+
public class UdpClientDemo {
199+
200+
public static void main(String[] args) {
201+
UdpClientConf udpClientConf = new UdpClientConf("127.0.0.1", 3000, 5000);
202+
UdpClient udpClient = new UdpClient(udpClientConf);
203+
udpClient.start();
204+
205+
long start = System.currentTimeMillis();
206+
for (int i = 0; i < 1000000; i++) {
207+
String str = i + "、" + "hello";
208+
udpClient.send(str.getBytes());
209+
}
210+
long end = System.currentTimeMillis();
211+
long iv = end - start;
212+
System.out.println("耗时:" + iv + "ms");
213+
}
214+
}
215+
```
216+
217+
## 测试代码地址
218+
219+
https://github.com/litongjava/java-ee-tio-boot-study/tree/main/tio-boot-latest-study/tio-boot-udp-demo01

docs/zh/06_内置组件/12.md

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# 使用内置 UDPServer
2+
3+
tio-boot 内置了 UDPServer tio-boot 内置了 UDP Server,你只需要配置 UDPHandler 即可使用
4+
5+
将 UDPServer 和 TioBoot 配置为相同的端口即可,其他细节可以参考上一章节
6+
7+
```
8+
import java.net.SocketException;
9+
10+
import com.litongjava.jfinal.aop.annotation.AConfiguration;
11+
import com.litongjava.jfinal.aop.annotation.AInitialization;
12+
import com.litongjava.tio.boot.constatns.ConfigKeys;
13+
import com.litongjava.tio.core.udp.UdpServer;
14+
import com.litongjava.tio.core.udp.UdpServerConf;
15+
import com.litongjava.tio.utils.environment.EnvironmentUtils;
16+
17+
import demo.udp.handler.DemoUdpHandler;
18+
import lombok.extern.slf4j.Slf4j;
19+
20+
@Slf4j
21+
@AConfiguration
22+
public class UdpServerConfig {
23+
24+
@AInitialization
25+
public void config() {
26+
int port = EnvironmentUtils.getInt(ConfigKeys.SERVER_PORT,80);
27+
DemoUdpHandler fpmsUdpHandler = new DemoUdpHandler();
28+
UdpServerConf udpServerConf = new UdpServerConf(port, fpmsUdpHandler, 5000);
29+
UdpServer udpServer;
30+
try {
31+
udpServer = new UdpServer(udpServerConf);
32+
udpServer.start();
33+
log.info("udp started");
34+
} catch (SocketException e) {
35+
e.printStackTrace();
36+
}
37+
38+
}
39+
}
40+
```

0 commit comments

Comments
 (0)