Skip to content

Commit 5d011cd

Browse files
author
litongjava
committed
add mq/02.md
1 parent f0ca46e commit 5d011cd

File tree

2 files changed

+231
-1
lines changed

2 files changed

+231
-1
lines changed

docs/.vuepress/config/sidebar-zh.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@
9494
{
9595
"title": "10_MQ",
9696
"collapsable": false,
97-
"children": ["10_MQ/01.md"]
97+
"children": ["10_MQ/01.md", "10_MQ/02.md"]
9898
},
9999
{
100100
"title": "11_i18n",

docs/zh/10_MQ/02.md

+230
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
# EMQX
2+
3+
本文用来设置和运行一个基于 MQTT 协议的客户端程序,利用 Eclipse Paho MQTT 客户端库和 tio-boot 框架进行消息发布和订阅。
4+
5+
### Maven 依赖
6+
7+
首先,`<dependency>`部分是 Maven 构建配置,用于在你的项目中包含 Eclipse Paho MQTT 客户端库。
8+
9+
```xml
10+
<dependency>
11+
<groupId>org.eclipse.paho</groupId>
12+
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
13+
<version>1.2.5</version>
14+
</dependency>
15+
```
16+
17+
### app.properties
18+
19+
eqmx 配置
20+
21+
```
22+
emqx.broker=tcp://192.168.3.9:1883
23+
emqx.username=username
24+
emqx.password=password
25+
emqx.topic=test/topic
26+
emqx.qos=2
27+
```
28+
29+
### HelloApp 类
30+
31+
这是一个使用 Litongjava Tio 框架启动应用程序的主类。`@AComponentScan`注解自动扫描和加载组件。
32+
33+
```java
34+
@AComponentScan
35+
public class HelloApp {
36+
public static void main(String[] args) {
37+
long start = System.currentTimeMillis();
38+
TioApplication.run(HelloApp.class, args);
39+
long end = System.currentTimeMillis();
40+
System.out.println((end - start) + "ms");
41+
}
42+
}
43+
```
44+
45+
### SampleCallback 类
46+
47+
这是一个实现了 `MqttCallback` 接口的类。它提供了三个方法来处理与 MQTT 消息相关的事件:
48+
49+
1. `connectionLost(Throwable cause)`: 当连接丢失时调用。
50+
2. `messageArrived(String topic, MqttMessage message)`: 当收到新消息时调用。
51+
3. `deliveryComplete(IMqttDeliveryToken token)`: 当消息成功发送后调用。
52+
53+
```java
54+
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
55+
import org.eclipse.paho.client.mqttv3.MqttCallback;
56+
import org.eclipse.paho.client.mqttv3.MqttMessage;
57+
58+
public class SampleCallback implements MqttCallback {
59+
// 连接丢失
60+
public void connectionLost(Throwable cause) {
61+
System.out.println("connection lost:" + cause.getMessage());
62+
}
63+
64+
// 收到消息
65+
public void messageArrived(String topic, MqttMessage message) {
66+
System.out.println("Received message: \n topic:" + topic + "\n Qos:" + message.getQos() + "\n payload:"
67+
+ new String(message.getPayload()));
68+
}
69+
70+
// 消息传递成功
71+
public void deliveryComplete(IMqttDeliveryToken token) {
72+
System.out.println("deliveryComplete");
73+
}
74+
}
75+
```
76+
77+
### MqttClientUtils 类
78+
79+
这个工具类用于简化 MQTT 客户端的操作。它提供了保存客户端、发布消息等功能。
80+
81+
```java
82+
import org.eclipse.paho.client.mqttv3.MqttClient;
83+
import org.eclipse.paho.client.mqttv3.MqttException;
84+
import org.eclipse.paho.client.mqttv3.MqttMessage;
85+
86+
public class MqttClientUtils {
87+
private static MqttClient client;
88+
private static String topic;
89+
private static int qos;
90+
91+
public static void init(MqttClient client, String topic, int qos) {
92+
MqttClientUtils.client = client;
93+
MqttClientUtils.topic = topic;
94+
MqttClientUtils.qos = qos;
95+
}
96+
97+
public static MqttClient getClient() {
98+
return client;
99+
}
100+
101+
public static void publish(MqttMessage message) {
102+
message.setQos(qos);
103+
try {
104+
client.publish(topic, message);
105+
} catch (MqttException e) {
106+
e.printStackTrace();
107+
}
108+
}
109+
110+
public static void publishWithQos(MqttMessage message) {
111+
try {
112+
client.publish(topic, message);
113+
} catch (MqttException e) {
114+
e.printStackTrace();
115+
}
116+
}
117+
}
118+
```
119+
120+
### EmqxClientConfig 类
121+
122+
这是一个配置类,用于初始化 MQTT 客户端和连接设置。它使用 `EnvironmentUtils` 从环境或配置文件中获取配置,并初始化一个 MQTT 客户端实例。该实例配置了连接选项、回调处理以及订阅的 topic。此外,还有一个销毁方法,用于在应用程序关闭时正确地断开 MQTT 客户端的连接。
123+
124+
```java
125+
import org.eclipse.paho.client.mqttv3.MqttClient;
126+
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
127+
import org.eclipse.paho.client.mqttv3.MqttException;
128+
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
129+
130+
import com.litongjava.jfinal.aop.annotation.AConfiguration;
131+
import com.litongjava.jfinal.aop.annotation.AInitialization;
132+
import com.litongjava.tio.boot.server.TioBootServer;
133+
import com.litongjava.tio.utils.environment.EnvironmentUtils;
134+
import com.litongjava.tio.web.hello.utils.MqttClientUtils;
135+
136+
import lombok.extern.slf4j.Slf4j;
137+
138+
@AConfiguration
139+
@Slf4j
140+
public class EmqxClientConfig {
141+
@AInitialization
142+
public void config() {
143+
String broker = EnvironmentUtils.get("emqx.broker");
144+
String username = EnvironmentUtils.get("emqx.username");
145+
String password = EnvironmentUtils.get("emqx.password");
146+
String topic = EnvironmentUtils.get("emqx.topic");
147+
148+
int qos = EnvironmentUtils.getInt("emqx.qos", 2);
149+
150+
String clientId = MqttClient.generateClientId();
151+
// 持久化
152+
MemoryPersistence persistence = new MemoryPersistence();
153+
// MQTT 连接选项
154+
MqttConnectOptions connOpts = new MqttConnectOptions();
155+
// 设置认证信息
156+
connOpts.setUserName(username);
157+
connOpts.setPassword(password.toCharArray());
158+
try {
159+
MqttClient client = new MqttClient(broker, clientId, persistence);
160+
// 设置回调
161+
client.setCallback(new SampleCallback());
162+
// 建立连接
163+
log.info("Connecting to broker: " + broker);
164+
client.connect(connOpts);
165+
log.info("Connected to broker: " + broker);
166+
// 订阅 topic
167+
client.subscribe(topic, qos);
168+
log.info("Subscribed to topic: " + topic);
169+
170+
TioBootServer.addDestroyMethod(() -> {
171+
try {
172+
client.disconnect();
173+
log.info("Disconnected");
174+
client.close();
175+
} catch (MqttException e) {
176+
e.printStackTrace();
177+
}
178+
});
179+
180+
MqttClientUtils.init(client,topic,qos);
181+
182+
} catch (MqttException me) {
183+
log.error("reason " + me.getReasonCode());
184+
log.error("msg " + me.getMessage());
185+
log.error("loc " + me.getLocalizedMessage());
186+
log.error("cause " + me.getCause());
187+
log.error("excep " + me);
188+
me.printStackTrace();
189+
}
190+
}
191+
}
192+
```
193+
194+
### IndexController 类
195+
196+
这是一个简单的控制器,用于处理 HTTP 请求。当访问应用程序的根路径时,它会发布一个 "Hello World" 消息到 MQTT 服务器。
197+
198+
```java
199+
import org.eclipse.paho.client.mqttv3.MqttMessage;
200+
201+
import com.litongjava.tio.http.server.annotation.RequestPath;
202+
import com.litongjava.tio.web.hello.utils.MqttClientUtils;
203+
204+
@RequestPath("/")
205+
public class IndexController {
206+
@RequestPath()
207+
public String index() {
208+
String content = "Hello World";
209+
MqttMessage message = new MqttMessage(content.getBytes());
210+
MqttClientUtils.publish(message);
211+
return "index";
212+
}
213+
}
214+
```
215+
216+
### 发布消息
217+
218+
使用 `MqttClientUtils` 类来发布消息到 MQTT 服务器。
219+
220+
```java
221+
// 发布消息
222+
MqttClient client = MqttClientUtils.getClient();
223+
String content = "Hello World";
224+
MqttMessage message = new MqttMessage(content.getBytes());
225+
message.setQos(qos);
226+
client.publish(topic, message);
227+
log.info("Message published");
228+
```
229+
230+
主要涉及使用 Eclipse Paho MQTT 客户端库进行消息发布和订阅,以及使用 tio-boot 框架构建的 Web 应用程序。它演示了如何配置 MQTT 客户端,处理连接丢失、消息到达和消息传递完成的事件,发布和订阅消息,并通过 HTTP 接口触发消息发布。

0 commit comments

Comments
 (0)