Skip to content

Commit cbd1c2f

Browse files
author
litongjava
committed
add handler threadpool sse
1 parent 774f0a4 commit cbd1c2f

File tree

1 file changed

+140
-0
lines changed
  • docs/zh/06_内置组件

1 file changed

+140
-0
lines changed

docs/zh/06_内置组件/02.md

+140
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,146 @@ public class MessageHander {
193193

194194
```
195195

196+
### handler 配合线程池,读取音频文件并发送
197+
198+
实现了一个 SSE 服务端,用于从资源文件中读取音频数据,并将其作为 Base64 编码的二进制数据块发送给客户端。它使用了一个线程池来异步处理数据传输,并在应用程序关闭时正确关闭线程池。
199+
200+
```
201+
import java.util.concurrent.ExecutorService;
202+
203+
import com.litongjava.jfinal.aop.annotation.AConfiguration;
204+
import com.litongjava.jfinal.aop.annotation.AInitialization;
205+
import com.litongjava.tio.boot.server.TioBootServer;
206+
import com.litongjava.tio.utils.thread.ThreadUtils;
207+
208+
@AConfiguration
209+
public class ExecutorServiceConfig {
210+
211+
@AInitialization
212+
public void config() {
213+
// 创建包含10个线程的线程池
214+
ExecutorService executor = ThreadUtils.newFixedThreadPool(10);
215+
216+
// 项目关闭时,关闭线程池
217+
TioBootServer.me().addDestroyMethod(() -> {
218+
if (executor != null && !executor.isShutdown()) {
219+
executor.shutdown();
220+
}
221+
});
222+
223+
}
224+
}
225+
```
226+
227+
```
228+
import java.io.InputStream;
229+
import java.util.Arrays;
230+
import java.util.Base64;
231+
232+
import com.litongjava.tio.core.ChannelContext;
233+
import com.litongjava.tio.core.Tio;
234+
import com.litongjava.tio.http.common.HttpRequest;
235+
import com.litongjava.tio.http.common.HttpResponse;
236+
import com.litongjava.tio.http.server.sse.SsePacket;
237+
import com.litongjava.tio.utils.hutool.ResourceUtil;
238+
import com.litongjava.tio.utils.thread.ThreadUtils;
239+
240+
import lombok.extern.slf4j.Slf4j;
241+
242+
@Slf4j
243+
public class MessageHander {
244+
245+
public HttpResponse send(HttpRequest httpRequest) {
246+
247+
ChannelContext channelContext = httpRequest.getChannelContext();
248+
249+
// 设置sse请求头
250+
HttpResponse httpResponse = new HttpResponse(httpRequest).setServerSentEventsHeader();
251+
252+
// 发送http响应包,告诉客户端保持连接
253+
Tio.send(channelContext, httpResponse);
254+
255+
// 发送数据
256+
sendData(channelContext);
257+
258+
// 告诉处理器不要将消息发送给客户端
259+
return new HttpResponse().setSend(false);
260+
}
261+
262+
private void sendData(ChannelContext channelContext) {
263+
// 读取音频文件,准备音频流
264+
InputStream inputStream = ResourceUtil.getResourceAsStream("audio/01.mp3");
265+
266+
ThreadUtils.getFixedThreadPool().submit(() -> {
267+
// 手动移除连接
268+
String eventName = "data";
269+
270+
try {
271+
// 从InputStream读取数据并转换为字符串
272+
byte[] buffer = new byte[1024 * 10];
273+
int bytesRead;
274+
int id = 0;
275+
while ((bytesRead = inputStream.read(buffer)) != -1) {
276+
// 创建SsePacket并发送
277+
byte[] binaryData = Arrays.copyOf(buffer, bytesRead);
278+
String base64Data = Base64.getEncoder().encodeToString(binaryData);
279+
SsePacket ssePacket = new SsePacket().id(id).event(eventName).data(base64Data.getBytes());
280+
Tio.send(channelContext, ssePacket);
281+
id++;
282+
log.info("id:{}", id);
283+
}
284+
} catch (Exception e) {
285+
log.error("Error sending data: {}", e.getMessage());
286+
} finally {
287+
try {
288+
inputStream.close();
289+
} catch (Exception e) {
290+
log.error("Error closing input stream: {}", e.getMessage());
291+
}
292+
Tio.remove(channelContext, "remove sse");
293+
}
294+
});
295+
}
296+
}
297+
```
298+
299+
代码包含两个 Java 类: `ExecutorServiceConfig``MessageHander`。以下是每个类的文档说明:
300+
301+
**ExecutorServiceConfig**
302+
303+
这个类用于配置和管理线程池。它使用了 JFinal AOP 注解进行配置。
304+
305+
- `@AConfiguration`: 这个注解表示该类是一个配置类。
306+
- `config()` 方法:
307+
- 使用 `ThreadUtils.newFixedThreadPool(10)` 创建了一个包含 10 个线程的线程池。
308+
- 注册了一个 `DestroyMethod`,在应用程序关闭时关闭线程池。
309+
310+
**MessageHander**
311+
312+
这个类负责处理和发送音频数据。它实现了服务器发送事件 (Server-Sent Events, SSE) 的功能。
313+
314+
- `@Slf4j`: 这个注解自动配置了一个 slf4j 日志记录器。
315+
- `send(HttpRequest httpRequest)` 方法:
316+
-`HttpRequest` 对象中获取 `ChannelContext`
317+
- 设置 SSE 响应头。
318+
- 发送 HTTP 响应包以保持连接。
319+
- 调用 `sendData` 方法发送数据。
320+
- 返回一个新的 `HttpResponse` 对象,并设置 `setSend(false)` 以避免重复发送响应。
321+
- `sendData(ChannelContext channelContext)` 方法:
322+
- 从资源文件中读取音频文件。
323+
- 在线程池中执行以下操作:
324+
- 初始化一个 10KB 的缓冲区。
325+
- 从输入流中循环读取数据。
326+
- 为每个读取的数据块:
327+
- 创建一个新的字节数组,仅包含实际读取的字节数。
328+
- 使用 Base64 编码器将字节数组编码为字符串。
329+
- 创建一个新的 `SsePacket` 对象,包含 ID、事件名称和编码后的数据。
330+
- 使用 `Tio.send` 发送 `SsePacket` 对象。
331+
- 增加 ID 计数器。
332+
- 记录日志信息。
333+
- 在循环结束时关闭输入流。
334+
- 移除 `ChannelContext` 连接。
335+
196336
### 总结
197337

198338
通过上述步骤,你可以在 tio-boot 框架中成功整合 SSE,从而使你的应用能够实时地向客户端推送数据。这种方法的优点在于其简单性和低延迟,非常适用于需要服务器实时更新的场景。

0 commit comments

Comments
 (0)