Skip to content

Commit 0d674c7

Browse files
committed
Initial revision for comment.
1 parent f71303b commit 0d674c7

30 files changed

+1804
-6
lines changed

bindings/mqtt/core/pom.xml

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>cloudevents-parent</artifactId>
7+
<groupId>io.cloudevents</groupId>
8+
<version>2.5.0-SNAPSHOT</version>
9+
<relativePath>../../../pom.xml</relativePath>
10+
</parent>
11+
<modelVersion>4.0.0</modelVersion>
12+
13+
<artifactId>cloudevents-mqtt-core</artifactId>
14+
<name>CloudEvents - MQTT Common</name>
15+
<packaging>jar</packaging>
16+
17+
<properties>
18+
<module-name>io.cloudevents.mqtt.core</module-name>
19+
</properties>
20+
21+
<dependencies>
22+
23+
<dependency>
24+
<groupId>io.cloudevents</groupId>
25+
<artifactId>cloudevents-core</artifactId>
26+
<version>${project.version}</version>
27+
</dependency>
28+
29+
</dependencies>
30+
31+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package io.cloudevents.mqtt.core;
2+
3+
import io.cloudevents.SpecVersion;
4+
import io.cloudevents.core.data.BytesCloudEventData;
5+
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
6+
import io.cloudevents.core.v1.CloudEventV1;
7+
8+
import java.util.function.BiConsumer;
9+
import java.util.regex.Matcher;
10+
import java.util.regex.Pattern;
11+
12+
/**
13+
* Enable the hydration of a CloudEvent in binary mode from an MQTT message.
14+
* <p>
15+
* This abstract class provides common behavior across different MQTT
16+
* client implementations.
17+
*/
18+
public abstract class BaseMqttBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, Object> {
19+
20+
/**
21+
* CloudEvent attribute names must match this pattern.
22+
*/
23+
private static final Pattern CE_ATTR_NAME_REGEX = Pattern.compile("^[a-z\\d]+$");
24+
private final String contentType;
25+
26+
/**
27+
* Initialise the binary message reader.
28+
* @param version The CloudEvent message version.
29+
* @param contentType The assigned media content type.
30+
* @param payload The raw data payload from the MQTT message.
31+
*/
32+
protected BaseMqttBinaryMessageReader(final SpecVersion version, final String contentType, final byte[] payload) {
33+
super(version, payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null);
34+
this.contentType = contentType;
35+
}
36+
37+
// --- Overrides
38+
39+
@Override
40+
protected boolean isContentTypeHeader(String key) {
41+
return false; // The content type is not defined in a user-property
42+
}
43+
44+
@Override
45+
protected boolean isCloudEventsHeader(String key) {
46+
47+
// The binding specification does not require name prefixing,
48+
// as such any user-property is a potential CE Context Attribute.
49+
//
50+
// If the name complies with CE convention then we'll assume
51+
// it's a context attribute.
52+
//
53+
Matcher m = CE_ATTR_NAME_REGEX.matcher(key);
54+
return m.matches();
55+
}
56+
57+
@Override
58+
protected String toCloudEventsKey(String key) {
59+
return key; // No special prefixing occurs in the MQTT binding spec.
60+
}
61+
62+
63+
@Override
64+
protected void forEachHeader(BiConsumer<String, Object> fn) {
65+
66+
// If there is a content-type then we need set it.
67+
// Inspired by AMQP/Proton code :-)
68+
69+
if (contentType != null) {
70+
fn.accept(CloudEventV1.DATACONTENTTYPE, contentType);
71+
}
72+
73+
// Now process each MQTT User Property.
74+
forEachUserProperty(fn);
75+
76+
}
77+
78+
@Override
79+
protected String toCloudEventsValue(Object value) {
80+
return value.toString();
81+
}
82+
83+
/**
84+
* Visit each MQTT user-property and invoke the supplied function.
85+
* @param fn The function to invoke for each MQTT User property.
86+
*/
87+
protected abstract void forEachUserProperty(BiConsumer<String, Object> fn);
88+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.cloudevents.mqtt.core;
2+
3+
import io.cloudevents.core.format.EventFormat;
4+
import io.cloudevents.core.provider.EventFormatProvider;
5+
6+
/**
7+
* General MQTT Utilities and Helpers
8+
*/
9+
public class MqttUtils {
10+
11+
private MqttUtils() {}
12+
13+
private static final String DEFAULT_FORMAT = "application/cloudevents+json";
14+
15+
/**
16+
* Obtain the {@link EventFormat} to use when working with MQTT V3
17+
* messages.
18+
*
19+
* @return An event format.
20+
*/
21+
public static EventFormat getDefaultEventFormat () {
22+
23+
return EventFormatProvider.getInstance().resolveFormat(DEFAULT_FORMAT);
24+
25+
}
26+
27+
/**
28+
* Get the default content type to assume for MQTT messages.
29+
* @return A Content-Type
30+
*/
31+
public static final String getDefaultContentType() {
32+
return DEFAULT_FORMAT;
33+
}
34+
35+
}

bindings/mqtt/hivemq/pom.xml

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>cloudevents-parent</artifactId>
7+
<groupId>io.cloudevents</groupId>
8+
<version>2.5.0-SNAPSHOT</version>
9+
<relativePath>../../../pom.xml</relativePath>
10+
</parent>
11+
12+
<modelVersion>4.0.0</modelVersion>
13+
14+
<artifactId>cloudevents-mqtt-hivemq</artifactId>
15+
<name>CloudEvents - MQTT HiveMQ Binding</name>
16+
<packaging>jar</packaging>
17+
18+
<properties>
19+
<module-name>io.cloudevents.mqtt.hivemq</module-name>
20+
<hivemq.version>1.3.0</hivemq.version>
21+
</properties>
22+
23+
<dependencies>
24+
25+
<dependency>
26+
<groupId>io.cloudevents</groupId>
27+
<artifactId>cloudevents-core</artifactId>
28+
<version>${project.version}</version>
29+
</dependency>
30+
31+
<dependency>
32+
<groupId>io.cloudevents</groupId>
33+
<artifactId>cloudevents-mqtt-core</artifactId>
34+
<version>${project.version}</version>
35+
</dependency>
36+
37+
<dependency>
38+
<groupId>com.hivemq</groupId>
39+
<artifactId>hivemq-mqtt-client</artifactId>
40+
<version>${hivemq.version}</version>
41+
<scope>provided</scope>
42+
</dependency>
43+
44+
<!-- Testing Dependencies -->
45+
46+
<dependency>
47+
<groupId>io.cloudevents</groupId>
48+
<artifactId>cloudevents-core</artifactId>
49+
<classifier>tests</classifier>
50+
<type>test-jar</type>
51+
<version>${project.version}</version>
52+
<scope>test</scope>
53+
</dependency>
54+
55+
<!-- We need a JSON Format for V3 compliance checking -->
56+
57+
<dependency>
58+
<groupId>io.cloudevents</groupId>
59+
<artifactId>cloudevents-json-jackson</artifactId>
60+
<version>${project.version}</version>
61+
<scope>test</scope>
62+
</dependency>
63+
64+
<dependency>
65+
<groupId>org.assertj</groupId>
66+
<artifactId>assertj-core</artifactId>
67+
<version>${assertj-core.version}</version>
68+
<scope>test</scope>
69+
</dependency>
70+
71+
<dependency>
72+
<groupId>org.junit.jupiter</groupId>
73+
<artifactId>junit-jupiter</artifactId>
74+
<version>${junit-jupiter.version}</version>
75+
<scope>test</scope>
76+
</dependency>
77+
78+
</dependencies>
79+
80+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.cloudevents.mqtt.hivemq;
2+
3+
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
4+
import io.cloudevents.SpecVersion;
5+
import io.cloudevents.mqtt.core.BaseMqttBinaryMessageReader;
6+
7+
import java.util.function.BiConsumer;
8+
9+
final class BinaryMessageReader extends BaseMqttBinaryMessageReader {
10+
11+
Mqtt5Publish message;
12+
13+
BinaryMessageReader(final SpecVersion version, final String contentType, Mqtt5Publish message) {
14+
super(version, contentType, message.getPayloadAsBytes());
15+
16+
this.message = message;
17+
}
18+
19+
@Override
20+
protected void forEachUserProperty(BiConsumer<String, Object> fn) {
21+
22+
message.getUserProperties().asList().forEach(up -> {
23+
24+
final String key = up.getName().toString();
25+
final String val = up.getValue().toString();
26+
27+
if (key != null && val != null) {
28+
fn.accept(key, val);
29+
}
30+
});
31+
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package io.cloudevents.mqtt.hivemq;
2+
3+
import com.hivemq.client.mqtt.datatypes.MqttUtf8String;
4+
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
5+
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder;
6+
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty;
7+
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
8+
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder;
9+
import io.cloudevents.core.message.MessageReader;
10+
import io.cloudevents.core.message.MessageWriter;
11+
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
12+
import io.cloudevents.core.message.impl.MessageUtils;
13+
import io.cloudevents.core.v1.CloudEventV1;
14+
import io.cloudevents.mqtt.core.MqttUtils;
15+
16+
import java.util.List;
17+
import java.util.Optional;
18+
19+
/**
20+
* A factory to obtain:
21+
* - {@link MessageReader} instances to read CloudEvents from MQTT messages.
22+
* - {@link MessageWriter} instances to write CloudEvents into MQTT messages.
23+
*
24+
*/
25+
public class MqttMessageFactory {
26+
27+
// Prevent Instantiation.
28+
private MqttMessageFactory() {
29+
}
30+
31+
/**
32+
* Create a {@link MessageReader} for an MQTT V3 message.
33+
* <p>
34+
* As-Per MQTT Binding specification this only supports
35+
* a structured JSON Format message.
36+
*
37+
* @param message An MQTT V3 message.
38+
* @return MessageReader.
39+
*/
40+
public static MessageReader createReader(Mqtt3Publish message) {
41+
return new GenericStructuredMessageReader(MqttUtils.getDefaultEventFormat(), message.getPayloadAsBytes());
42+
}
43+
44+
/**
45+
* Create a {@link MessageReader} for an MQTT V5 message
46+
*
47+
* @param message An MQTT V5 message.
48+
* @return A message reader.
49+
*/
50+
public static MessageReader createReader(Mqtt5Publish message) {
51+
52+
Optional<MqttUtf8String> cType = message.getContentType();
53+
54+
String contentType = cType.isPresent() ? cType.get().toString() : null;
55+
56+
return MessageUtils.parseStructuredOrBinaryMessage(
57+
() -> contentType,
58+
format -> new GenericStructuredMessageReader(format, message.getPayloadAsBytes()),
59+
() -> getSpecVersion(message),
60+
sv -> new BinaryMessageReader(sv, contentType, message)
61+
);
62+
}
63+
64+
65+
/**
66+
* Create a {@link MessageWriter} for an MQTT V5 Message.
67+
*
68+
* @param builder {@link Mqtt5PublishBuilder.Complete}
69+
* @return A message writer.
70+
*/
71+
public static MessageWriter createWriter(Mqtt5PublishBuilder.Complete builder) {
72+
return new V5MessageWriter(builder);
73+
}
74+
75+
/**
76+
* Create a {@link MessageWriter} for an MQTT V3 Message.
77+
*
78+
* Only supports structured messages.
79+
*
80+
* @param builder {@link Mqtt3PublishBuilder.Complete}
81+
* @return A message writer.
82+
*/
83+
public static MessageWriter createWriter(Mqtt3PublishBuilder.Complete builder) {
84+
return new V3MessageWriter(builder);
85+
}
86+
87+
88+
// -- Private functions
89+
90+
/**
91+
* Find the value of the CloudEvent 'specversion' in the MQTT V5 User Properties.
92+
* @param message An MQTT message.
93+
* @return spec version attribute content.
94+
*/
95+
private static String getSpecVersion(Mqtt5Publish message) {
96+
97+
List<Mqtt5UserProperty> props = (List<Mqtt5UserProperty>) message.getUserProperties().asList();
98+
99+
Optional<Mqtt5UserProperty> up = props.stream().filter(p -> p.getName().toString().equals(CloudEventV1.SPECVERSION)).findFirst();
100+
101+
return (up.isPresent()) ? up.get().getValue().toString() : null;
102+
103+
}
104+
105+
}

0 commit comments

Comments
 (0)