Skip to content

Commit 9b6e7d0

Browse files
committed
规则引擎传输
1 parent d44bb01 commit 9b6e7d0

File tree

8 files changed

+19
-32
lines changed

8 files changed

+19
-32
lines changed

smqtt-common/src/main/java/io/github/quickmsg/common/rule/source/SourceBean.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public interface SourceBean {
3939
*
4040
* @param object {@link Map}
4141
*/
42-
void transmit(Map<String, Object> object);
42+
void transmit(Object object);
4343

4444
/**
4545
* 关闭资源

smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TransmitRuleNode.java

+5-9
Original file line numberDiff line numberDiff line change
@@ -34,22 +34,18 @@ public TransmitRuleNode(Source source, String script) {
3434
@Override
3535
public void execute(ContextView contextView) {
3636
HeapMqttMessage heapMqttMessage = contextView.get(HeapMqttMessage.class);
37-
Map<String, Object> param;
37+
Object param;
3838
if (script != null) {
39-
Object obj = triggerScript(script, context -> heapMqttMessage.getKeyMap().forEach(context::set));
40-
param = JacksonUtil.json2Map(obj.toString(), String.class, Object.class);
41-
39+
param = triggerScript(script, context -> heapMqttMessage.getKeyMap().forEach(context::set));
4240
} else {
4341
param = heapMqttMessage.getKeyMap();
4442
}
4543
SourceBean sourceBean = SourceManager.getSourceBean(source);
46-
if(sourceBean == null){
47-
log.warn("[ please set source {}]",source);
48-
}
49-
else{
44+
if (sourceBean != null) {
5045
sourceBean.transmit(param);
46+
executeNext(contextView);
47+
5148
}
52-
executeNext(contextView);
5349
}
5450

5551
@Override

smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/src/main/java/io/github/quickmsg/source/db/DbSourceBean.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ public Boolean bootstrap(Map<String, Object> sourceParam) {
5757
* @param object 对象
5858
*/
5959
@Override
60-
public void transmit(Map<String, Object> object) {
60+
public void transmit(Object object) {
6161
try (Connection connection = HikariCPConnectionProvider.singleTon().getConnection()) {
6262
DSLContext dslContext = DSL.using(connection);
63-
dslContext.execute(object.get("sql").toString());
63+
dslContext.execute(object.toString());
6464
} catch (Exception e) {
6565
log.error("execute sql error", e);
6666
}

smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/src/main/java/io/github/quickmsg/http/HttpSourceBean.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,11 @@ public Boolean bootstrap(Map<String, Object> sourceParam) {
4343

4444

4545
@Override
46-
public void transmit(Map<String, Object> object) {
47-
if (httpParam.getAdditions() != null && httpParam.getAdditions().size() > 0) {
48-
httpParam.getAdditions().forEach(object::put);
49-
}
46+
public void transmit(Object object) {
5047
httpClient
5148
.post()
5249
.uri(httpParam.getUrl())
53-
.send(Mono.just(PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(JacksonUtil.bean2Json(object).getBytes())))
50+
.send(Mono.just(PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(JacksonUtil.dynamicJson(object).getBytes())))
5451
.response()
5552
.log()
5653
.subscribe();

smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/src/main/java/io/github/quickmsg/source/mqtt/KafkaSourceBean.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,9 @@ public Boolean bootstrap(Map<String, Object> sourceParam) {
5959
* @param object 对象
6060
*/
6161
@Override
62-
public void transmit(Map<String, Object> object) {
63-
String json = JacksonUtil.bean2Json(object);
64-
log.info("kafka send msg {}", json);
62+
public void transmit(Object object) {
6563
if (producer != null) {
66-
ProducerRecord<String, Object> record = new ProducerRecord<>(topic, json);
64+
ProducerRecord<String, Object> record = new ProducerRecord<>(topic, JacksonUtil.dynamicJson(object));
6765
producer.send(record);
6866
}
6967
}

smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/src/main/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,11 @@ public Boolean bootstrap(Map<String, Object> sourceParam) {
110110
/**
111111
* 转发数据
112112
*
113-
* @param object 对象
113+
* @param param 对象
114114
*/
115115
@Override
116-
public void transmit(Map<String, Object> object) {
116+
public void transmit(Object param) {
117+
Map<String,Object> object = (Map<String,Object>)param;
117118
String topic = (String) object.get("topic");
118119
Object msg = object.get("msg");
119120
String bytes = msg instanceof Map ? JacksonUtil.map2Json((Map<? extends Object, ? extends Object>) msg) : msg.toString();

smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/src/main/java/io/github/quickmsg/source/rabbitmq/RabbitmqSourceBean.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,8 @@ public Boolean bootstrap(Map<String, Object> sourceParam) {
7272
* @param object 对象
7373
*/
7474
@Override
75-
public void transmit(Map<String, Object> object) {
76-
String json = JacksonUtil.bean2Json(object);
77-
log.info("transmit={}", json);
78-
// String clientId = (String) object.get("clientIdentifier");
79-
// String topic = (String) object.get("topic");
80-
// String msg = (String) object.get("msg");
81-
corePublish(queueName, json);
75+
public void transmit(Object object) {
76+
corePublish(queueName, JacksonUtil.dynamicJson(object));
8277
}
8378

8479
/**

smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/src/main/java/io/github/quickmsg/source/rocketmq/RocketmqSourceBean.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ public Boolean bootstrap(Map<String, Object> sourceParam) {
6565
* @param object 对象
6666
*/
6767
@Override
68-
public void transmit(Map<String, Object> object) {
69-
String json = JacksonUtil.bean2Json(object);
68+
public void transmit(Object object) {
69+
String json = JacksonUtil.dynamicJson(object);
7070
if (producer != null) {
7171
Message message = new Message(topic, tags, json.getBytes());
7272
//发送消息v

0 commit comments

Comments
 (0)