Skip to content

Commit 9c70435

Browse files
author
litongjava
committed
add disruptor
1 parent 1aa3d20 commit 9c70435

File tree

2 files changed

+285
-1
lines changed

2 files changed

+285
-1
lines changed

docs/.vuepress/config/sidebar-zh.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@
9494
{
9595
"title": "10_MQ",
9696
"collapsable": false,
97-
"children": ["10_MQ/01.md", "10_MQ/02.md"]
97+
"children": ["10_MQ/01.md", "10_MQ/02.md", "10_MQ/03.md"]
9898
},
9999
{
100100
"title": "11_i18n",

docs/zh/10_MQ/03.md

+284
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
# Disruptor
2+
3+
## 背景
4+
5+
工作中遇到项目使用 Disruptor 做消息队列,对你没看错,不是 Kafka,也不是 rabbitmq;Disruptor 有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录.
6+
7+
## Disruptor 介绍
8+
9+
Disruptor 的 github 主页:https://github.com/LMAX-Exchange/disruptor
10+
Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与 I/O 操作处于同样的数量级)。基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 QCon 演讲后,获得了业界关注。
11+
12+
- Disruptor 是一个开源的 Java 框架,它被设计用于在生产者—消费者(producer-consumer problem,简称 PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。
13+
- 从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。
14+
- Disruptor 是 LMAX 在线交易平台的关键组成部分,LMAX 平台使用该框架对订单处理速度能达到 600 万 TPS,除金融领域之外,其他一般的应用中都可以用到 Disruptor,它可以带来显著的性能提升。
15+
- 实 Disruptor 与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序来说,Disruptor 提出了一种大幅提升性能(TPS)的方案。
16+
17+
## Disruptor 的核心概念
18+
19+
先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。
20+
21+
1. Ring Buffer
22+
23+
如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从 3.0 版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
24+
25+
2. Sequence Disruptor
26+
27+
通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的 CPU 缓存伪共享(Flase Sharing)问题。
28+
29+
(注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。
30+
31+
3. Sequencer
32+
33+
Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
34+
35+
4. Sequence Barrier
36+
37+
用于保持对 RingBuffer 的 main published Sequence 和 Consumer 依赖的其它 Consumer 的 Sequence 的引用。Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
38+
39+
5. Wait Strategy
40+
41+
定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
42+
43+
6. Event
44+
45+
在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
46+
47+
7. EventProcessor
48+
49+
EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
50+
51+
8. EventHandler
52+
53+
Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
54+
55+
9. Producer
56+
57+
即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。
58+
59+
## 案例-demo
60+
61+
通过下面几个个步骤,你就能将 Disruptor Get 回家啦:
62+
63+
1、添加 pom.xml 依赖
64+
65+
```
66+
<dependency>
67+
<groupId>com.lmax</groupId>
68+
<artifactId>disruptor</artifactId>
69+
<version>3.4.4</version>
70+
</dependency>
71+
```
72+
73+
2、消息体 Model
74+
75+
```java
76+
import lombok.Data;
77+
78+
/**
79+
* 消息体
80+
*/
81+
@Data
82+
public class MessageModel {
83+
private String message;
84+
}
85+
```
86+
87+
3、构造 EventFactory
88+
89+
```java
90+
import com.litongjava.tio.boot.demo.disruptor.model.MessageModel;
91+
import com.lmax.disruptor.EventFactory;
92+
93+
public class HelloEventFactory implements EventFactory<MessageModel> {
94+
95+
public MessageModel newInstance() {
96+
return new MessageModel();
97+
}
98+
}
99+
```
100+
101+
4、构造 EventHandler-消费者
102+
103+
```java
104+
import com.litongjava.tio.boot.demo.disruptor.model.MessageModel;
105+
import com.lmax.disruptor.EventHandler;
106+
107+
import lombok.extern.slf4j.Slf4j;
108+
109+
@Slf4j
110+
public class HelloEventHandler implements EventHandler<MessageModel> {
111+
public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
112+
try {
113+
// 这里停止1000ms是为了确定消费消息是异步的
114+
Thread.sleep(1000);
115+
log.info("消费者处理消息开始");
116+
if (event != null) {
117+
log.info("消费者消费的信息是:{}", event);
118+
}
119+
} catch (Exception e) {
120+
log.info("消费者处理消息失败");
121+
}
122+
log.info("消费者处理消息结束");
123+
}
124+
}
125+
```
126+
127+
5、构造 DisruptorConfiguration
128+
129+
```java
130+
package com.litongjava.tio.boot.demo.disruptor.config;
131+
132+
import java.util.concurrent.ExecutorService;
133+
import java.util.concurrent.Executors;
134+
135+
import com.litongjava.jfinal.aop.annotation.ABean;
136+
import com.litongjava.jfinal.aop.annotation.AConfiguration;
137+
import com.litongjava.tio.boot.demo.disruptor.factory.HelloEventFactory;
138+
import com.litongjava.tio.boot.demo.disruptor.handler.HelloEventHandler;
139+
import com.litongjava.tio.boot.demo.disruptor.model.MessageModel;
140+
import com.litongjava.tio.boot.server.TioBootServer;
141+
import com.lmax.disruptor.BlockingWaitStrategy;
142+
import com.lmax.disruptor.RingBuffer;
143+
import com.lmax.disruptor.dsl.Disruptor;
144+
import com.lmax.disruptor.dsl.ProducerType;
145+
146+
@AConfiguration
147+
public class DisruptorConfiguration {
148+
@ABean
149+
public RingBuffer<MessageModel> messageModelRingBuffer() {
150+
// 定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
151+
ExecutorService executor = Executors.newFixedThreadPool(2);
152+
// 指定事件工厂
153+
HelloEventFactory factory = new HelloEventFactory();
154+
// 指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
155+
int bufferSize = 1024 * 256;
156+
157+
BlockingWaitStrategy waitStrategy = new BlockingWaitStrategy();
158+
// 单线程模式,获取额外的性能
159+
@SuppressWarnings("deprecation")
160+
Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE,
161+
waitStrategy);
162+
// 设置事件业务处理器---消费者
163+
disruptor.handleEventsWith(new HelloEventHandler());
164+
// 启动disruptor线程
165+
disruptor.start();
166+
// 获取ringbuffer环,用于接取生产者生产的事件
167+
RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
168+
// 关闭程序时关闭时disruptor
169+
TioBootServer.me().addDestroyMethod(disruptor::shutdown);
170+
return ringBuffer;
171+
}
172+
173+
}
174+
```
175+
176+
6、构造 Mqservice 和实现类-生产者
177+
178+
```java
179+
public interface DisruptorMqService {
180+
/**
181+
* 消息
182+
* @param message
183+
*/
184+
void sayHelloMq(String message);
185+
}
186+
```
187+
188+
```java
189+
import com.litongjava.jfinal.aop.annotation.AAutowired;
190+
import com.litongjava.jfinal.aop.annotation.AService;
191+
import com.litongjava.tio.boot.demo.disruptor.model.MessageModel;
192+
import com.lmax.disruptor.RingBuffer;
193+
194+
import lombok.extern.slf4j.Slf4j;
195+
196+
@Slf4j
197+
@AService
198+
public class DisruptorMqServiceImpl implements DisruptorMqService {
199+
200+
@AAutowired
201+
private RingBuffer<MessageModel> messageModelRingBuffer;
202+
203+
public void sayHelloMq(String message) {
204+
log.info("record the message: {}", message);
205+
// 获取下一个Event槽的下标
206+
long sequence = messageModelRingBuffer.next();
207+
try {
208+
// 给Event填充数据
209+
MessageModel event = messageModelRingBuffer.get(sequence);
210+
event.setMessage(message);
211+
log.info("往消息队列中添加消息:{}", event);
212+
} catch (Exception e) {
213+
log.error("failed to add event to messageModelRingBuffer for : e = {},{}", e, e.getMessage());
214+
} finally {
215+
// 发布Event,激活观察者去消费,将sequence传递给改消费者
216+
// 注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
217+
messageModelRingBuffer.publish(sequence);
218+
}
219+
}
220+
}
221+
```
222+
223+
8、构造测试类及方法
224+
225+
```java
226+
package com.litongjava.tio.boot.demo.disruptor.service;
227+
228+
import org.junit.Before;
229+
import org.junit.Test;
230+
231+
import com.litongjava.jfinal.aop.Aop;
232+
import com.litongjava.tio.boot.demo.disruptor.HelloApp;
233+
import com.litongjava.tio.boot.tesing.TioBootTest;
234+
235+
import lombok.extern.slf4j.Slf4j;
236+
237+
@Slf4j
238+
public class DisruptorMqServiceImplTest {
239+
240+
@Before
241+
public void before() throws Exception {
242+
//TioBootTest.before();
243+
TioBootTest.scan(HelloApp.class);
244+
}
245+
246+
/**
247+
* 项目内部使用Disruptor做消息队列
248+
* @throws Exception
249+
*/
250+
@Test
251+
public void sayHelloMqTest() throws Exception {
252+
DisruptorMqService disruptorMqService = Aop.get(DisruptorMqService.class);
253+
// send
254+
disruptorMqService.sayHelloMq("消息到了,Hello world!");
255+
log.info("消息队列已发送完毕");
256+
// 这里停止2000ms是为了确定是处理消息是异步的
257+
Thread.sleep(2000);
258+
}
259+
}
260+
```
261+
262+
测试运行结果
263+
264+
```
265+
11:49:13.605 [main] INFO com.litongjava.jfinal.aop.scaner.ComponentScanner - resource:file:/D:/dev_workspace/eclipse-jee-2022-6/tio-boot-disruptor-demo01/target/test-classes/com/litongjava/tio/boot/demo/disruptor
266+
11:49:13.609 [main] INFO com.litongjava.jfinal.aop.scaner.ComponentScanner - resource:file:/D:/dev_workspace/eclipse-jee-2022-6/tio-boot-disruptor-demo01/target/classes/com/litongjava/tio/boot/demo/disruptor
267+
11:49:13.663 [main] INFO com.litongjava.jfinal.aop.process.ConfigurationAnnotaionProcess - start init config bean:public com.lmax.disruptor.RingBuffer com.litongjava.tio.boot.demo.disruptor.config.DisruptorConfiguration.messageModelRingBuffer()
268+
11:49:13.731 [main] INFO com.litongjava.tio.boot.demo.disruptor.service.DisruptorMqServiceImpl - record the message: 消息到了,Hello world!
269+
11:49:13.731 [main] INFO com.litongjava.tio.boot.demo.disruptor.service.DisruptorMqServiceImpl - 往消息队列中添加消息:MessageModel(message=消息到了,Hello world!)
270+
11:49:13.731 [main] INFO com.litongjava.tio.boot.demo.disruptor.service.DisruptorMqServiceImplTest - 消息队列已发送完毕
271+
11:49:14.739 [pool-1-thread-1] INFO com.litongjava.tio.boot.demo.disruptor.handler.HelloEventHandler - 消费者处理消息开始
272+
11:49:14.739 [pool-1-thread-1] INFO com.litongjava.tio.boot.demo.disruptor.handler.HelloEventHandler - 消费者消费的信息是:MessageModel(message=消息到了,Hello world!)
273+
11:49:14.739 [pool-1-thread-1] INFO com.litongjava.tio.boot.demo.disruptor.handler.HelloEventHandler - 消费者处理消息结束
274+
275+
```
276+
277+
### 总结
278+
279+
其实 生成者 -> 消费者 模式是很常见的,通过一些消息队列也可以轻松做到上述的效果。不同的地方在于,Disruptor 是在内存中以队列的方式去实现的,而且是无锁的。这也是 Disruptor 为什么高效的原因。
280+
281+
###
282+
283+
测试代码地址
284+
https://github.com/litongjava/java-ee-tio-boot-study/tree/main/tio-boot-latest-study/tio-boot-disruptor-demo01

0 commit comments

Comments
 (0)