Skip to content

Commit fc225c4

Browse files
committed
docs:专栏更新
1 parent 12427fc commit fc225c4

9 files changed

+1022
-7
lines changed

.vscode/.server-controller-port.log

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"port": 9145,
3-
"time": 1723122064966,
2+
"port": 13452,
3+
"time": 1723549160905,
44
"version": "0.0.3"
55
}

README.md

+6-4
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,16 @@ docs/.vuepress/public/images 存储网站本身展示所需宣传营销图片。
5959
注意本仓库分为 master、main两个分支,只在 main 分支操作文章,勿碰 master 分支!
6060

6161
## FAQ
62-
文章名称不要带有括号等特色字符!
62+
文章名称不要带有括号、#等特色字符
6363

64-
文章内容不要带有尖括号等特殊字符,如
64+
文章内容不要带有尖括号、#等特殊字符,如
6565
```
6666
- Java API中,用户需要使用Dataset<Row>表示DataFrame
6767
```
68-
会导致整篇文章不显示!对此,需将其包进一个代码块里,如:
68+
会导致整篇文章不显示!
69+
70+
对此,需将其包进一个代码块里,如:
6971
`Dataset<Row>`
7072
这样就能正常显示了。
7173

72-
文章内容不要使用 html 标签渲染,也会导致空白页!
74+
文章内容不要使用 html 标签渲染,也会导致空白页!

docs/.vuepress/config.js

+18
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,11 @@ module.exports = {
152152
text: '00-DDD专栏规划',
153153
link: '/md/DDD/00-DDD专栏规划.md'
154154
},
155+
156+
{
157+
text: '事件驱动',
158+
link: '/md/DDD/integrating-event-driven-microservices-with-request-response-APIs.md'
159+
},
155160
{
156161
text: '00-大厂实践',
157162
link: '/md/DDD/02-领域驱动设计DDD在B端营销系统的实践.md'
@@ -1169,6 +1174,16 @@ module.exports = {
11691174
]
11701175
},
11711176

1177+
{
1178+
title: "SpringMVC",
1179+
collapsable: false,
1180+
sidebarDepth: 0,
1181+
children: [
1182+
"SpringMVC-service-doDispatch",
1183+
"SpringMVC-DispatcherServlet-doDispatch",
1184+
]
1185+
},
1186+
11721187
{
11731188
title: "SpringBoot",
11741189
collapsable: false,
@@ -1246,6 +1261,7 @@ module.exports = {
12461261
sidebarDepth: 0,
12471262
children: [
12481263
"integrating-event-driven-microservices-with-request-response-APIs",
1264+
"decouple-event-retrieval-from-processing",
12491265
]
12501266
},
12511267
{
@@ -1875,9 +1891,11 @@ module.exports = {
18751891
children: [
18761892
"01-Flink实战-概述",
18771893
"flink-beginner-case-study",
1894+
"Flink部署及任务提交",
18781895
"flink-programming-paradigms-core-concepts",
18791896
"flink-architecture",
18801897
"flink-state-management",
1898+
"flink-state-backend",
18811899
"05-Flink实战DataStream API编程",
18821900
"streaming-connectors-programming",
18831901
"flink-data-latency-solution",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# 将事件检索与事件处理解耦
2+
3+
## 0 前言
4+
5+
part1讨论了集成过程中遇到的挑战以及幂等事件处理的作用。解决集成问题之后,我们需要反思事件检索的问题。我们的经验教训表明,将事件检索与事件处理解耦至关重要。
6+
7+
## 1 事件处理与请求/响应 API 紧耦合
8+
9+
part1讨论了将请求/响应 API 集成到事件驱动微服务中时,由于基于请求/响应的通信,导致紧耦合。单个事件的处理速度取决于请求/响应 API 及其响应时间,因为事件处理会阻塞直到收到响应。
10+
11+
像我们在part1中使用的简单事件循环实现或 [AWS SQS Java Messaging Library](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-jms-code-examples.html#example-synchronous-message-receiver) 中的工作示例,会顺序处理事件。
12+
13+
不推荐这种方法,因为整体处理时间是所有单个处理时间的总和。
14+
15+
## 2 并发处理事件
16+
17+
幸运的是,像 [Spring Cloud AWS](https://awspring.io/) 这种库提供支持并发处理事件的更高效实现。属性 [ALWAYS_POLL_MAX_MESSAGES](https://docs.awspring.io/spring-cloud-aws/docs/3.0.2/reference/html/index.html#message-processing-throughput) 的行为在下图概述:并发事件处理
18+
19+
![](https://www.thoughtworks.com/content/dam/thoughtworks/images/photography/inline-image/insights/blog/apis/blg_inline_Konrad_MS_Part_Two_Figure_1.jpg)
20+
21+
检索到一批事件后,每个事件在一个单独的线程中并发处理。当所有线程完成处理后,将检索下一批事件。由于基于请求/响应的通信导致的紧耦合,可能使事件处理速度不同。较快的线程会在较慢的线程处理事件时处于等待状态。因此,一批事件的处理时间对应于处理最慢的事件的时间。
22+
23+
当事件顺序不重要时,并发处理可以是一个合理的默认设置。但根据经验,某些情况下,事件处理可进一步优化。当单个事件的处理时间差异较大时,线程可能长时间处于等待状态。
24+
25+
如集成了一个性能波动较大的请求/响应 API。平均而言,该 API 在 0.5s 后响应。但第 95 百分位和第 99 百分位值经常分别为 1.5s 和超过 10s。在这种并发事件处理方式中,由于响应缓慢的 API,线程经常会等待几s,然后才能处理新事件。
26+
27+
## 3 将事件检索与事件处理解耦
28+
29+
即可进一步优化事件处理。这样,处理时间较长的单个事件不会减慢其他事件的处理速度。Spring Cloud AWS 提供了 [FIXED_HIGH_THROUGHPUT](https://docs.awspring.io/spring-cloud-aws/docs/3.0.2/reference/html/index.html#message-processing-throughput) 属性,展示了这种解耦可能的实现方式。
30+
31+
具体描述如下。详细信息可在[文档](https://docs.awspring.io/spring-cloud-aws/docs/3.0.2/reference/html/index.html#message-processing-throughput)中找到。
32+
33+
解耦的事件处理策略:
34+
35+
![](https://www.thoughtworks.com/content/dam/thoughtworks/images/photography/inline-image/insights/blog/apis/blg_inline_Konrad_MS_Part_Two_Figure_2.jpg)
36+
37+
为此,定义一个额外属性,用于在两次事件检索之间的最大等待时间。当所有事件已处理完毕或等待时间已过期时,将检索新事件。若在等待时间过期后,如一个事件仍未处理完毕,则会提前接收九个新事件,并可以开始处理。这意味着这九个线程不会等到最后一个事件处理完毕后才开始工作。
38+
39+
根据经验,如果等待时间和其他参数配置得当,解耦可提高单个线程的利用率。一个可能缺点,由于事件往往以更频繁但较小批次的方式被检索,因此可能增加成本。因此,了解 API 性能特征,对于在并发和解耦事件处理之间做出选择至关重要。
40+
41+
## 4 结论
42+
43+
当你将事件驱动微服务与请求/响应 API 集成时,会引入紧耦合。请求/响应 API 的性能特征很重要,因为它们有助于你在并发和解耦事件处理之间做出选择。
44+
45+
本文重点讨论了请求/响应 API 的请求时间性能及其如何影响事件驱动微服务的性能。
Original file line numberDiff line numberDiff line change
@@ -1 +1,122 @@
1-
# 将事件驱动的微服务与请求响应API集成
1+
# 将事件驱动的微服务与请求响应API集成
2+
3+
## 0 前言
4+
5+
事件驱动微服务架构在当今非常流行,广泛采用的原因之一是它们促进了松耦合。
6+
7+
但使用基于请求/响应的通信也有很好的理由。如系统现代化过程中,有些系统已迁移到事件驱动架构,而有些系统还没。或你可能使用通过HTTP提供REST API服务的第三方SaaS解决方案。在这些情况下,将事件驱动的微服务与请求/响应API集成并不罕见。这种集成引入了新的挑战,因为带来紧耦合问题。
8+
9+
本系列分享在将事件驱动微服务与请求/响应API集成过程中学到的经验:
10+
11+
- 实现幂等的事件处理(本文)
12+
- 解耦事件检索与事件处理(part2)
13+
- 使用断路器暂停事件检索(part3)
14+
- 限制事件处理的速率(part4)
15+
16+
## 1 挑战:将事件驱动的微服务与请求/响应API集成
17+
18+
要理解为啥要实现幂等的事件处理,先关注事件驱动的微服务与请求/响应API的集成。
19+
20+
事件驱动通信是一种间接的通信方式,微服务通过生成和消费事件并通过类似AWS SQS这种中间件进行交换来相互通信。事件驱动的微服务通常实现一个事件循环,不断从中间件中检索事件并处理它们(有时称 [轮询消费者](https://www.enterpriseintegrationpatterns.com/patterns/messaging/PollingConsumer.html))。
21+
22+
## 2 代码示例
23+
24+
25+
26+
```java
27+
// 简化的事件循环
28+
29+
void eventLoop() {
30+
// 无限循环
31+
while(true) {
32+
// 获取事件
33+
List<Events> events = retrieveEvents();
34+
for (Event event : events) {
35+
// 处理事件
36+
processEvent(event);
37+
}
38+
}
39+
}
40+
```
41+
42+
将事件驱动的微服务与基于请求/响应的API集成,实际上意味着在事件处理过程中从事件循环中发送请求到API。当请求发送到API后,事件的处理会被阻塞,直到收到响应。然后,事件循环才会继续处理剩余的业务逻辑。
43+
44+
如下代码示例说明这点:
45+
46+
```java
47+
// 集成请求/响应API的简化事件处理
48+
49+
void processEvent(Event event) {
50+
/* ... */
51+
// 根据事件创建一个请求对象
52+
Request request = createRequest(event);
53+
// 发送请求并等待响应
54+
Response response = sendRequestAndWaitForResponse(request);
55+
// 处理事件和响应的进一步业务逻辑
56+
moreBusinessLogic(event, response);
57+
/* ... */
58+
}
59+
```
60+
61+
## 3 图解
62+
63+
描述了中间件、事件驱动的微服务和请求/响应API之间的详细交互:
64+
65+
![](https://www.thoughtworks.com/content/dam/thoughtworks/images/photography/inline-image/insights/blog/apis/blg_inline_Konrad_MS_Part_One_Figure_1.jpg)
66+
67+
集成挑战在于两种不同的通信风格:
68+
69+
- 事件驱动通信是异步的
70+
- 而基于请求/响应的通信是同步的
71+
72+
事件驱动的微服务从中间件中检索事件,并以一种与生成事件的微服务解耦的方式处理这些事件。它还独立于生成微服务发出新事件的速度。这意味着,即使消费微服务暂时不可用,生成微服务也可以继续发出事件,而中间件则缓冲这些事件,以便稍后检索。
73+
74+
相反,基于请求/响应的通信是两个微服务之间的直接通信。如果上游微服务向下游微服务的请求/响应API发送请求,它会阻塞自己的处理,直到收到下游微服务的响应。这种紧耦合意味着两个微服务都需要可用。下游微服务的处理还会受到上游微服务发送请求速度的影响。
75+
76+
## 4 经验:实现幂等的事件处理
77+
78+
在处理事件驱动的微服务时,重试不可避免;某些事件会被多次消费。这是因为中间件通常提供某种程度的传递保证,如 [至少一次](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html) 传递(以AWS SQS为例),以及在处理失败或耗时过长时使用的重试功能(如 [可见性超时](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html))。
79+
80+
这就是为啥事件处理过程中需要考虑重试并使处理幂等化。即当一个事件被多次处理时,其结果应与处理一次时相同。通过实现 [幂等接收者](https://martinfowler.com/articles/patterns-of-distributed-systems/idempotent-receiver.html) 模式,可以忽略已成功处理的重复事件;这可以通过为事件赋予唯一标识符([幂等性键](https://stripe.com/docs/api/idempotent_requests))并在事件处理过程中存储和检查这些标识符来实现。
81+
82+
但根据经验,仅检测和忽略已成功处理的重复事件还不够。请求/响应API也应该幂等,以便能处理重复的请求。想象某请求由于网络不稳定而丢失或上游微服务没收到响应。如果API不是幂等,当再次发出请求时,重复请求可能会失败或产生错误的结果。
83+
84+
因此,最好解决方案是使请求/响应API幂等化。某些API操作(如GET或PUT)易实现幂等,而其他操作(如POST)则需要幂等性键和像幂等接收者这样的实现模式。如果你无法影响API的设计使其幂等化,至少在事件处理过程中需要考虑到这一点,以避免因使用非幂等API而导致的失败和错误结果。不过,是否考虑这点在很大程度上取决于API的设计。
85+
86+
### 示例
87+
88+
代码显示了一个响应重复请求时返回错误的集成请求/响应API。在这种情况下,重要的是识别重复错误和其他类型的错误。
89+
90+
考虑一个POST操作,它在处理重复请求时会返回422 UNPROCESSABLE_ENTITY错误,指出资源已经存在。错误响应中通常会包含进一步的信息,例如资源标识符,允许我们获取现有资源并继续业务逻辑。
91+
92+
```java
93+
// 在新资源创建失败时获取资源
94+
95+
void processEvent(Event event) {
96+
97+
Response response;
98+
99+
try {
100+
101+
response = sendRequestAndWaitForResponse(create_Post_Request(event));
102+
103+
} catch(UnprocessableEntityException ex) {
104+
105+
var resourceIdentifier = extractResourceIdentifier(ex.errorDetails());
106+
107+
response = sendRequestAndWaitForResponse(create_Get_Request(resourceIdentifier);
108+
109+
}
110+
111+
moreBusinessLogic(event, response);
112+
113+
/* ... */
114+
115+
}
116+
```
117+
118+
## 5 结论
119+
120+
上述讨论表明,幂等的请求/响应API更容易集成。然而,非幂等的请求/响应API也可以以幂等的方式集成。如何集成非幂等API最终取决于上下文和API的设计。
121+
122+
无论API设计如何,重要的是通过提供合理的恢复选项来应对重复事件和重复请求。重试是不可避免的,事件处理包括请求/响应API应当是幂等的。
+118
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# Flink部署及任务提交
2+
3+
## 1 本地安装
4+
5+
只需几个简单的步骤即可启动并运行Flink示例程序。
6+
7+
### 1.1 安装:下载并启动Flink
8+
9+
Java 8。
10+
直接下载flink二进制包到本地并解压。
11+
12+
## 2 配置 flink-conf.yaml
13+
14+
jobmanager.rpc.address: 10.0.0.1 配置主节点的ip
15+
16+
jobmanager 主节点
17+
taskmanager 从节点
18+
19+
### 配置. bash_profile
20+
21+
```bash
22+
vim ~/.bash_profile
23+
24+
# Flink
25+
export FLINK_HOME=/Users/javaedge/Downloads/soft/flink-1.17.0
26+
export PATH=$FLINK_HOME/bin:$PATH
27+
28+
source ~/.bash_profile
29+
```
30+
31+
## 3 启动集群
32+
33+
```bash
34+
$ flink-1.17.0 % cd bin
35+
$ bin % ./start-cluster.sh
36+
Starting cluster.
37+
Starting standalonesession daemon on host JavaEdgedeMac-mini.local.
38+
Starting taskexecutor daemon on host JavaEdgedeMac-mini.local.
39+
# 验证集群启动成功
40+
$ jps
41+
```
42+
43+
![](https://img-blog.csdnimg.cn/e5a1cdcd747144ffa9d42b9393c01b2e.png)
44+
45+
## 4 提交任务
46+
47+
先启动一个 socket 传输:
48+
49+
````bash
50+
$ nc -lk 9527
51+
javaedge
52+
666
53+
888
54+
````
55+
56+
再提交任务:
57+
58+
```bash
59+
./flink run -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount ../examples/streaming/SocketwindowWordCount.jar --hostname localhost --port 9527
60+
```
61+
62+
打开[控制台](http://localhost:18081),可见有个运行中任务了:
63+
64+
![](https://img-blog.csdnimg.cn/531cf22842a342cd868a796a821fbe1d.png)
65+
66+
![](https://img-blog.csdnimg.cn/5dedc9a9132b4437a49e0dd51938404c.png)
67+
68+
![](https://img-blog.csdnimg.cn/8200b82d10ed49f58346af85a8631e04.png)
69+
70+
任务执行结果:
71+
72+
![](https://img-blog.csdnimg.cn/a7d4c2ddfce449ceaf7710542fb4c055.png)
73+
74+
## 5 并行度
75+
76+
任务执行时,将一个任务划分为多个并行子任务来执行的能力。
77+
78+
- Flink中每个并行子任务被称为一个Task
79+
- 整个任务则被称为一个Job
80+
81+
### 5.1 env全局设置
82+
83+
使用`StreamExecutionEnvironment` 对象设置并行度,影响该环境中所有算子的并行度:
84+
85+
```java
86+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
87+
env.setParallelism(4);
88+
```
89+
90+
### 5.2 算子级别设置
91+
92+
直接在算子上设置并行度,这会覆盖全局设置的并行度。
93+
94+
每个算子都可以单独设置并行度, 1个job可以包含不同的并行度。
95+
96+
```java
97+
final DataStream<String> input = env.addSource(new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), props));
98+
input.flatMap(new MyFlatMapFunction()).setParallelism(2).print();
99+
```
100+
101+
### 5.3 配置文件级别
102+
103+
flink-conf.yaml
104+
105+
### 5.4 Client 级别
106+
107+
任务提交的命令行进行设置
108+
109+
### 5.5 并行度越大越好吗?
110+
111+
设置要根据具体场景和资源而调整:
112+
113+
- 过高的并行度可能会导致资源浪费和性能下降
114+
- 过低的并行度可能会导致无法充分利用资源,影响任务的执行效率
115+
116+
```bash
117+
./flink run -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount -p 2 ../examples/streaming/SocketwindowWordCount.jar --hostname localhost --port 9527
118+
```

0 commit comments

Comments
 (0)