Skip to content

Commit fc923af

Browse files
author
shiwenyan
committed
Based on the actual usage scenarios of customers, write customized MQTT message examples
1 parent 6f07e85 commit fc923af

File tree

3 files changed

+207
-21
lines changed

3 files changed

+207
-21
lines changed

example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java

Lines changed: 77 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,38 +21,56 @@
2121

2222
import org.apache.iotdb.db.protocol.mqtt.Message;
2323
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
24-
import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
24+
import org.apache.iotdb.db.protocol.mqtt.TableMessage;
2525

26+
import com.google.common.collect.Lists;
27+
import com.google.gson.Gson;
28+
import com.google.gson.GsonBuilder;
29+
import com.google.gson.JsonArray;
30+
import com.google.gson.JsonElement;
31+
import com.google.gson.JsonObject;
32+
import com.google.gson.JsonParseException;
2633
import io.netty.buffer.ByteBuf;
34+
import org.apache.tsfile.enums.TSDataType;
2735
import org.apache.tsfile.external.commons.lang3.NotImplementedException;
2836

37+
import java.nio.charset.StandardCharsets;
2938
import java.util.ArrayList;
3039
import java.util.Arrays;
31-
import java.util.Collections;
3240
import java.util.List;
3341

42+
/**
43+
* The Customized JSON payload formatter. one json format supported: { "time":1586076045523,
44+
* "deviceID":"car_1", "deviceType":"new energy vehicle", "point":"velocity", "value":80.0 }
45+
*/
3446
public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
47+
private static final String JSON_KEY_TIME = "time";
48+
private static final String JSON_KEY_DEVICEID = "deviceID";
49+
private static final String JSON_KEY_DEVICETYPE = "deviceType";
50+
private static final String JSON_KEY_POINT = "point";
51+
private static final String JSON_KEY_VALUE = "value";
52+
private static final Gson GSON = new GsonBuilder().create();
3553

3654
@Override
3755
public List<Message> format(String topic, ByteBuf payload) {
38-
// Suppose the payload is a json format
3956
if (payload == null) {
40-
return Collections.emptyList();
57+
return new ArrayList<>();
4158
}
42-
43-
// parse data from the json and generate Messages and put them into List<Message> ret
44-
List<Message> ret = new ArrayList<>();
45-
// this is just an example, so we just generate some Messages directly
46-
for (int i = 0; i < 2; i++) {
47-
long ts = i;
48-
TreeMessage message = new TreeMessage();
49-
message.setDevice("d" + i);
50-
message.setTimestamp(ts);
51-
message.setMeasurements(Arrays.asList("s1", "s2"));
52-
message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
53-
ret.add(message);
59+
String txt = payload.toString(StandardCharsets.UTF_8);
60+
JsonElement jsonElement = GSON.fromJson(txt, JsonElement.class);
61+
if (jsonElement.isJsonObject()) {
62+
JsonObject jsonObject = jsonElement.getAsJsonObject();
63+
return formatTableRow(topic, jsonObject);
64+
} else if (jsonElement.isJsonArray()) {
65+
JsonArray jsonArray = jsonElement.getAsJsonArray();
66+
List<Message> messages = new ArrayList<>();
67+
for (JsonElement element : jsonArray) {
68+
JsonObject jsonObject = element.getAsJsonObject();
69+
messages.addAll(formatTableRow(topic, jsonObject));
70+
}
71+
return messages;
5472
}
55-
return ret;
73+
throw new JsonParseException("payload is invalidate");
5674
}
5775

5876
@Override
@@ -61,14 +79,54 @@ public List<Message> format(ByteBuf payload) {
6179
throw new NotImplementedException();
6280
}
6381

82+
private List<Message> formatTableRow(String topic, JsonObject jsonObject) {
83+
TableMessage message = new TableMessage();
84+
String database = !topic.contains("/") ? topic : topic.substring(0, topic.indexOf("/"));
85+
String table = "test_table";
86+
87+
// Parsing Database Name
88+
message.setDatabase((database));
89+
90+
// Parsing Table Name
91+
message.setTable(table);
92+
93+
// Parsing Tags
94+
List<String> tagKeys = new ArrayList<>();
95+
tagKeys.add(JSON_KEY_DEVICEID);
96+
List<Object> tagValues = new ArrayList<>();
97+
tagValues.add(jsonObject.get(JSON_KEY_DEVICEID).getAsString());
98+
message.setTagKeys(tagKeys);
99+
message.setTagValues(tagValues);
100+
101+
// Parsing Attributes
102+
List<String> attributeKeys = new ArrayList<>();
103+
List<Object> attributeValues = new ArrayList<>();
104+
attributeKeys.add(JSON_KEY_DEVICETYPE);
105+
attributeValues.add(jsonObject.get(JSON_KEY_DEVICETYPE).getAsString());
106+
message.setAttributeKeys(attributeKeys);
107+
message.setAttributeValues(attributeValues);
108+
109+
// Parsing Fields
110+
List<String> fields = Arrays.asList(JSON_KEY_POINT);
111+
List<TSDataType> dataTypes = Arrays.asList(TSDataType.FLOAT);
112+
List<Object> values = Arrays.asList(jsonObject.get(JSON_KEY_VALUE).getAsFloat());
113+
message.setFields(fields);
114+
message.setDataTypes(dataTypes);
115+
message.setValues(values);
116+
117+
// Parsing timestamp
118+
message.setTimestamp(jsonObject.get(JSON_KEY_TIME).getAsLong());
119+
return Lists.newArrayList(message);
120+
}
121+
64122
@Override
65123
public String getName() {
66124
// set the value of mqtt_payload_formatter in iotdb-common.properties as the following string:
67-
return "CustomizedJson";
125+
return "CustomizedJson2Table";
68126
}
69127

70128
@Override
71129
public String getType() {
72-
return PayloadFormatter.TREE_TYPE;
130+
return PayloadFormatter.TABLE_TYPE;
73131
}
74132
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package org.apache.iotdb.mqtt.server;
2+
3+
import org.apache.iotdb.db.protocol.mqtt.Message;
4+
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
5+
import org.apache.iotdb.db.protocol.mqtt.TableMessage;
6+
7+
import io.netty.buffer.ByteBuf;
8+
import org.apache.commons.lang3.NotImplementedException;
9+
import org.apache.tsfile.enums.TSDataType;
10+
11+
import java.nio.charset.StandardCharsets;
12+
import java.util.ArrayList;
13+
import java.util.Arrays;
14+
import java.util.List;
15+
16+
public class CustomizedLinePayloadFormatter implements PayloadFormatter {
17+
18+
@Override
19+
public List<Message> format(String topic, ByteBuf payload) {
20+
// Suppose the payload is a line format
21+
if (payload == null) {
22+
return null;
23+
}
24+
25+
String line = payload.toString(StandardCharsets.UTF_8);
26+
// parse data from the line and generate Messages and put them into List<Meesage> ret
27+
List<Message> ret = new ArrayList<>();
28+
// this is just an example, so we just generate some Messages directly
29+
for (int i = 0; i < 3; i++) {
30+
long ts = i;
31+
TableMessage message = new TableMessage();
32+
33+
// Parsing Database Name
34+
message.setDatabase("db" + i);
35+
36+
// Parsing Table Names
37+
message.setTable("t" + i);
38+
39+
// Parsing Tags
40+
List<String> tagKeys = new ArrayList<>();
41+
tagKeys.add("tag1" + i);
42+
tagKeys.add("tag2" + i);
43+
List<Object> tagValues = new ArrayList<>();
44+
tagValues.add("t_value1" + i);
45+
tagValues.add("t_value2" + i);
46+
message.setTagKeys(tagKeys);
47+
message.setTagValues(tagValues);
48+
49+
// Parsing Attributes
50+
List<String> attributeKeys = new ArrayList<>();
51+
List<Object> attributeValues = new ArrayList<>();
52+
attributeKeys.add("attr1" + i);
53+
attributeKeys.add("attr2" + i);
54+
attributeValues.add("a_value1" + i);
55+
attributeValues.add("a_value2" + i);
56+
message.setAttributeKeys(attributeKeys);
57+
message.setAttributeValues(attributeValues);
58+
59+
// Parsing Fields
60+
List<String> fields = Arrays.asList("field1" + i, "field2" + i);
61+
List<TSDataType> dataTypes = Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT);
62+
List<Object> values = Arrays.asList("4.0" + i, "5.0" + i);
63+
message.setFields(fields);
64+
message.setDataTypes(dataTypes);
65+
message.setValues(values);
66+
67+
//// Parsing timestamp
68+
message.setTimestamp(ts);
69+
ret.add(message);
70+
}
71+
return ret;
72+
}
73+
74+
@Override
75+
@Deprecated
76+
public List<Message> format(ByteBuf payload) {
77+
throw new NotImplementedException();
78+
}
79+
80+
@Override
81+
public String getName() {
82+
// set the value of mqtt_payload_formatter in iotdb-system.properties as the following string:
83+
return "CustomizedLine";
84+
}
85+
86+
@Override
87+
public String getType() {
88+
return PayloadFormatter.TABLE_TYPE;
89+
}
90+
}

example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,12 @@ public static void main(String[] args) throws Exception {
4040
connection.connect();
4141
// the config mqttPayloadFormatter must be tree-json
4242
// jsonPayloadFormatter(connection);
43+
4344
// the config mqttPayloadFormatter must be table-line
44-
linePayloadFormatter(connection);
45+
// linePayloadFormatter(connection);
46+
47+
// test customized json formatter of mqtt payload to insert as table row
48+
customizedJsonPayloadFormatter2Table(connection);
4549
connection.disconnect();
4650
}
4751

@@ -58,7 +62,10 @@ private static void jsonPayloadFormatter(BlockingConnection connection) throws E
5862
+ "\"values\":[%f]\n"
5963
+ "}",
6064
System.currentTimeMillis(), random.nextDouble());
61-
sb.append(payload).append(",");
65+
sb.append(payload);
66+
if (i < 9) {
67+
sb.append(",");
68+
}
6269

6370
// publish a json object
6471
Thread.sleep(1);
@@ -109,4 +116,35 @@ private static void linePayloadFormatter(BlockingConnection connection) throws E
109116
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
110117
Thread.sleep(10);
111118
}
119+
120+
/**
121+
* The Customized JSON payload formatter. one json format supported: { "time":1586076045523,
122+
* "deviceID":"car_1", "deviceType":"新能源车", "point":"速度", "value":80.0 }
123+
*/
124+
private static void customizedJsonPayloadFormatter2Table(BlockingConnection connection)
125+
throws Exception {
126+
Random random = new Random();
127+
StringBuilder sb = new StringBuilder();
128+
for (int i = 0; i < 10; i++) {
129+
String payload =
130+
String.format(
131+
"{\n"
132+
+ "\"time\":%d,\n"
133+
+ "\"deviceID\":\"car_1\",\n"
134+
+ "\"deviceType\":\"新能源车\",\n"
135+
+ "\"point\":\"速度\",\n"
136+
+ "\"value\":%.2f\n"
137+
+ "}",
138+
System.currentTimeMillis(), random.nextFloat());
139+
sb.append(payload).append(",");
140+
141+
// publish a json object
142+
Thread.sleep(1);
143+
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
144+
}
145+
// publish a json array
146+
sb.insert(0, "[");
147+
sb.replace(sb.lastIndexOf(","), sb.length(), "]");
148+
connection.publish(DATABASE + "/myTopic", sb.toString().getBytes(), QoS.AT_LEAST_ONCE, false);
149+
}
112150
}

0 commit comments

Comments
 (0)