-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathevent.proto
150 lines (124 loc) · 4.29 KB
/
event.proto
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
syntax = "proto3";
package io.opensergo.proto.event.v1;
option java_package = "io.opensergo.proto.event.v1";
option java_outer_classname = "EventProto";
option java_multiple_files = true;
option go_package = "github.com/opensergo/opensergo-control-plane/proto/event/v1";
import "fault_tolerance/v1/fault_tolerance.proto";
// 事件管道 对应消息队列主题
message EventChannel {
string unique_id = 1; // 全局唯一id
string url = 2; // 消息队列连接地址
string mq_topic_name = 3; // channel对应的消息队列主题
}
// 处理器对象
message EventProcessorRef {
string unique_id = 1; // 全局唯一id
string kind = 2; // 处理器类型 比如 Service 表示服务
string name = 3; // 处理器名称 比如 events-demo 表示服务名叫的 events-demo 的服务
}
// 事件源 生产者
message EventSource {
EventProcessorRef ref = 1;
}
// 事件触发器 消费者
message EventTrigger {
EventProcessorRef ref = 1;
}
// 事件组件
message EventComponents {
repeated EventChannel channels = 1;
repeated EventSource sources = 2;
repeated EventTrigger triggers = 3;
}
// 持久化
message Persistence {
enum PersistenceType {
MEMORY = 0; // 内存
LFS = 1; // 本地文件系统
DFS = 2; // 分布式文件系统
}
message PersistenceAddress {
// 存储地址 根据PersistenceType确定值
// 如 本地文件目录 /usr/local/data 远程文件系统地址 xxx.xxx.xxx.xxx
string address = 1;
}
enum FullStrategy {
BLOCK = 0; // 阻塞
DROP = 1; // 丢弃
}
PersistenceType persistence_type = 1;
int64 persistence_size = 2; // 存储大小
FullStrategy full_strategy = 3;
}
// 重试规则
message RetryRule {
int64 retry_max = 1;
// 重试之间的延时 使用 ISO-8601 规则指定时间段的字符串 比如P6S 表示持续时间6s
string back_off_delay = 2;
// back off 类型
enum BackoffPolicyType {
BACK_OFF_POLICY_LINEAR = 0; // 线性级增长策略
BACK_OFF_POLICY_EXPONENTIAL = 1; // 指数级增长策略 默认以2为底数
}
BackoffPolicyType back_off_policy_type = 3;
}
// 死信消息规则
message DeadLetterStrategy {
bool enable = 1; // 开启死信消息
int64 retry_trigger_threshold = 2; // 重试触发死信的阈值
oneof store {// 当enable_block=true时生效
EventChannel channel = 3; // 表示存放死信生产或者消费消息的channel
Persistence persistence = 4; // 表示使用本地或者远程存储
}
}
// 事件运行时策略
// 集中设置 封装 容错 限流 断路器
message EventRuntimeStrategy {
io.opensergo.proto.fault_tolerance.v1.FaultToleranceRule fault_tolerance_rule = 1;
io.opensergo.proto.fault_tolerance.v1.RateLimitStrategy rate_limit_strategy = 2;
io.opensergo.proto.fault_tolerance.v1.CircuitBreakerStrategy circuit_breaker_strategy = 3;
io.opensergo.proto.fault_tolerance.v1.ConcurrencyLimitStrategy concurrency_limit_strateg = 4;
RetryRule retry_rule = 5;
}
// 事件源 生产者的策略
message EventSourceStrategy {
string event_source_id = 1;
bool async_send = 2; // 是否开启异步发送
Persistence fault_tolerant_storage = 3; // 容错存储设置
EventRuntimeStrategy runtime_strategy = 4;
}
// 事件触发器 消费者的策略
message EventTriggerStrategy {
string event_trigger_id = 1;
int64 receive_buffer_size = 2; // 消费者缓存大小
bool enable_idempotence = 3; // 是否开启幂等
EventRuntimeStrategy runtime_strategy = 4;
DeadLetterStrategy dead_letter_strategy = 5;
}
// 事件的策略
message EventStrategies {
repeated EventSourceStrategy source_strategies = 1; // 生产者策略 详细如上
repeated EventTriggerStrategy trigger_strategies = 2; // 消费者策略 详细如上
}
// 事件路由规则 包括 source trigger filter
message EventRouterRules {
// 事件过滤器
message Filter {
string cesql = 1; // CloudEvents SQL Expression 基于 cloudevents sql表达式
}
// 事件路由
message Router {
string source_id = 1;
string trigger_id = 2;
string channel_id = 3;
Filter filter = 4;
}
repeated Router routers = 1;
}
// 事件规则 事件需遵循 CloudEvents 事件数据规范
message Event {
EventComponents components = 1; // 事件组件
EventStrategies strategy = 2; // 事件策略
EventRouterRules router_rules = 3; // 事件路由规则
}