diff --git a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java index 8c3a962173985..73940d8b486e7 100644 --- a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java +++ b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java @@ -21,38 +21,57 @@ import org.apache.iotdb.db.protocol.mqtt.Message; import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; -import org.apache.iotdb.db.protocol.mqtt.TreeMessage; +import org.apache.iotdb.db.protocol.mqtt.TableMessage; +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; import io.netty.buffer.ByteBuf; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.external.commons.lang3.NotImplementedException; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.Pair; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.List; +/** + * The Customized JSON payload formatter. one json format supported: { "time":1586076045523, + * "deviceID":"car_1", "deviceType":"new energy vehicle", "point":"velocity", "value":80.0 } + */ public class CustomizedJsonPayloadFormatter implements PayloadFormatter { + private static final String JSON_KEY_TIME = "time"; + private static final String JSON_KEY_DEVICEID = "deviceID"; + private static final String JSON_KEY_DEVICETYPE = "deviceType"; + private static final String JSON_KEY_POINT = "point"; + private static final String JSON_KEY_VALUE = "value"; + private static final Gson GSON = new GsonBuilder().create(); @Override public List format(String topic, ByteBuf payload) { - // Suppose the payload is a json format if (payload == null) { - return Collections.emptyList(); + return new ArrayList<>(); } - - // parse data from the json and generate Messages and put them into List ret - List ret = new ArrayList<>(); - // this is just an example, so we just generate some Messages directly - for (int i = 0; i < 2; i++) { - long ts = i; - TreeMessage message = new TreeMessage(); - message.setDevice("d" + i); - message.setTimestamp(ts); - message.setMeasurements(Arrays.asList("s1", "s2")); - message.setValues(Arrays.asList("4.0" + i, "5.0" + i)); - ret.add(message); + String txt = payload.toString(StandardCharsets.UTF_8); + JsonElement jsonElement = GSON.fromJson(txt, JsonElement.class); + if (jsonElement.isJsonObject()) { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + return formatTableRow(topic, jsonObject); + } else if (jsonElement.isJsonArray()) { + JsonArray jsonArray = jsonElement.getAsJsonArray(); + List messages = new ArrayList<>(); + for (JsonElement element : jsonArray) { + JsonObject jsonObject = element.getAsJsonObject(); + messages.addAll(formatTableRow(topic, jsonObject)); + } + return messages; } - return ret; + throw new JsonParseException("payload is invalidate"); } @Override @@ -61,14 +80,109 @@ public List format(ByteBuf payload) { throw new NotImplementedException(); } + private List formatTableRow(String topic, JsonObject jsonObject) { + TableMessage message = new TableMessage(); + String database = !topic.contains("/") ? topic : topic.substring(0, topic.indexOf("/")); + String table = "test_table"; + + // Parsing Database Name + message.setDatabase((database)); + + // Parsing Table Name + message.setTable(table); + + // Parsing Tags + List tagKeys = new ArrayList<>(); + tagKeys.add(JSON_KEY_DEVICEID); + List tagValues = new ArrayList<>(); + tagValues.add( + new Binary[] { + new Binary( + (jsonObject.get(JSON_KEY_DEVICEID).getAsString()).getBytes(StandardCharsets.UTF_8)) + }); + message.setTagKeys(tagKeys); + message.setTagValues(tagValues); + + // Parsing Attributes + List attributeKeys = new ArrayList<>(); + List attributeValues = new ArrayList<>(); + attributeKeys.add(JSON_KEY_DEVICETYPE); + attributeValues.add( + new Binary[] { + new Binary( + (jsonObject.get(JSON_KEY_DEVICETYPE).getAsString()).getBytes(StandardCharsets.UTF_8)) + }); + message.setAttributeKeys(attributeKeys); + message.setAttributeValues(attributeValues); + + // Parsing Fields + List fields = new ArrayList<>(); + List dataTypes = new ArrayList<>(); + List values = new ArrayList<>(); + fields.add(JSON_KEY_POINT); + dataTypes.add(TSDataType.STRING); + values.add( + new Binary[] { + new Binary( + (jsonObject.get(JSON_KEY_POINT).getAsString()).getBytes(StandardCharsets.UTF_8)) + }); + fields.add(JSON_KEY_VALUE); + Pair typeAndValue = + analyticValue(jsonObject.get(JSON_KEY_VALUE).getAsString()); + values.add(typeAndValue.getRight()); + dataTypes.add(typeAndValue.getLeft()); + + message.setFields(fields); + message.setDataTypes(dataTypes); + message.setValues(values); + + // Parsing timestamp + message.setTimestamp(jsonObject.get(JSON_KEY_TIME).getAsLong()); + return Lists.newArrayList(message); + } + + private Pair analyticValue(String value) { + if (value.startsWith("\"") && value.endsWith("\"")) { + // String + return new Pair<>( + TSDataType.TEXT, + new Binary[] { + new Binary(value.substring(1, value.length() - 1).getBytes(StandardCharsets.UTF_8)) + }); + } else if (value.equalsIgnoreCase("t") + || value.equalsIgnoreCase("true") + || value.equalsIgnoreCase("f") + || value.equalsIgnoreCase("false")) { + // boolean + return new Pair<>( + TSDataType.BOOLEAN, + new boolean[] {value.equalsIgnoreCase("t") || value.equalsIgnoreCase("true")}); + } else if (value.endsWith("f")) { + // float + return new Pair<>( + TSDataType.FLOAT, new float[] {Float.parseFloat(value.substring(0, value.length() - 1))}); + } else if (value.endsWith("i32")) { + // int + return new Pair<>( + TSDataType.INT32, new int[] {Integer.parseInt(value.substring(0, value.length() - 3))}); + } else if (value.endsWith("u") || value.endsWith("i")) { + // long + return new Pair<>( + TSDataType.INT64, new long[] {Long.parseLong(value.substring(0, value.length() - 1))}); + } else { + // double + return new Pair<>(TSDataType.DOUBLE, new double[] {Double.parseDouble(value)}); + } + } + @Override public String getName() { // set the value of mqtt_payload_formatter in iotdb-common.properties as the following string: - return "CustomizedJson"; + return "CustomizedJson2Table"; } @Override public String getType() { - return PayloadFormatter.TREE_TYPE; + return PayloadFormatter.TABLE_TYPE; } } diff --git a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedLinePayloadFormatter.java b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedLinePayloadFormatter.java new file mode 100644 index 0000000000000..9156fe0c7b74f --- /dev/null +++ b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedLinePayloadFormatter.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.mqtt.server; + +import org.apache.iotdb.db.protocol.mqtt.Message; +import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.TableMessage; + +import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.tsfile.enums.TSDataType; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class CustomizedLinePayloadFormatter implements PayloadFormatter { + + @Override + public List format(String topic, ByteBuf payload) { + // Suppose the payload is a line format + if (payload == null) { + return null; + } + + String line = payload.toString(StandardCharsets.UTF_8); + // parse data from the line and generate Messages and put them into List ret + List ret = new ArrayList<>(); + // this is just an example, so we just generate some Messages directly + for (int i = 0; i < 3; i++) { + long ts = i; + TableMessage message = new TableMessage(); + + // Parsing Database Name + message.setDatabase("db" + i); + + // Parsing Table Names + message.setTable("t" + i); + + // Parsing Tags + List tagKeys = new ArrayList<>(); + tagKeys.add("tag1" + i); + tagKeys.add("tag2" + i); + List tagValues = new ArrayList<>(); + tagValues.add("t_value1" + i); + tagValues.add("t_value2" + i); + message.setTagKeys(tagKeys); + message.setTagValues(tagValues); + + // Parsing Attributes + List attributeKeys = new ArrayList<>(); + List attributeValues = new ArrayList<>(); + attributeKeys.add("attr1" + i); + attributeKeys.add("attr2" + i); + attributeValues.add("a_value1" + i); + attributeValues.add("a_value2" + i); + message.setAttributeKeys(attributeKeys); + message.setAttributeValues(attributeValues); + + // Parsing Fields + List fields = Arrays.asList("field1" + i, "field2" + i); + List dataTypes = Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT); + List values = Arrays.asList("4.0" + i, "5.0" + i); + message.setFields(fields); + message.setDataTypes(dataTypes); + message.setValues(values); + + //// Parsing timestamp + message.setTimestamp(ts); + ret.add(message); + } + return ret; + } + + @Override + @Deprecated + public List format(ByteBuf payload) { + throw new NotImplementedException(); + } + + @Override + public String getName() { + // set the value of mqtt_payload_formatter in iotdb-system.properties as the following string: + return "CustomizedLine"; + } + + @Override + public String getType() { + return PayloadFormatter.TABLE_TYPE; + } +} diff --git a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java index ec15ad23567a3..5ac2e7e5ff79f 100644 --- a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java +++ b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java @@ -40,8 +40,12 @@ public static void main(String[] args) throws Exception { connection.connect(); // the config mqttPayloadFormatter must be tree-json // jsonPayloadFormatter(connection); + // the config mqttPayloadFormatter must be table-line - linePayloadFormatter(connection); + // linePayloadFormatter(connection); + + // test customized json formatter of mqtt payload to insert as table row + customizedJsonPayloadFormatter2Table(connection); connection.disconnect(); } @@ -58,7 +62,10 @@ private static void jsonPayloadFormatter(BlockingConnection connection) throws E + "\"values\":[%f]\n" + "}", System.currentTimeMillis(), random.nextDouble()); - sb.append(payload).append(","); + sb.append(payload); + if (i < 9) { + sb.append(","); + } // publish a json object Thread.sleep(1); @@ -109,4 +116,35 @@ private static void linePayloadFormatter(BlockingConnection connection) throws E connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); Thread.sleep(10); } + + /** + * The Customized JSON payload formatter. one json format supported: { "time":1586076045523, + * "deviceID":"car_1", "deviceType":"新能源车", "point":"速度", "value":80.0 } + */ + private static void customizedJsonPayloadFormatter2Table(BlockingConnection connection) + throws Exception { + Random random = new Random(); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 10; i++) { + String payload = + String.format( + "{\n" + + "\"time\":%d,\n" + + "\"deviceID\":\"car_1\",\n" + + "\"deviceType\":\"新能源车\",\n" + + "\"point\":\"速度\",\n" + + "\"value\":%.2f\n" + + "}", + System.currentTimeMillis(), random.nextFloat()); + sb.append(payload).append(","); + + // publish a json object + Thread.sleep(1); + connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + } + // publish a json array + sb.insert(0, "["); + sb.replace(sb.lastIndexOf(","), sb.length(), "]"); + connection.publish(DATABASE + "/myTopic", sb.toString().getBytes(), QoS.AT_LEAST_ONCE, false); + } }