Skip to content

Commit 1cf12a5

Browse files
committed
switch back to custom message properties (device_id and tenant_id) instead of to field
1 parent c8164cc commit 1cf12a5

16 files changed

+265
-200
lines changed

client/src/main/java/org/eclipse/hono/client/TelemetryClient.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,16 @@
1212
package org.eclipse.hono.client;
1313

1414
import java.nio.ByteBuffer;
15+
import java.util.HashMap;
16+
import java.util.Map;
1517
import java.util.concurrent.CompletableFuture;
1618
import java.util.concurrent.atomic.AtomicLong;
1719
import java.util.function.Consumer;
1820

1921
import javax.annotation.PreDestroy;
2022

2123
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
24+
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
2225
import org.apache.qpid.proton.amqp.messaging.Data;
2326
import org.apache.qpid.proton.amqp.messaging.Section;
2427
import org.apache.qpid.proton.message.Message;
@@ -191,8 +194,10 @@ public void send(final String deviceId, final String body)
191194
final ByteBuffer b = ByteBuffer.allocate(8);
192195
b.putLong(messageTagCounter.getAndIncrement());
193196
b.flip();
194-
final String address = String.format(SENDER_TO_PROPERTY, tenantId, deviceId);
195-
final Message msg = ProtonHelper.message(address, body);
197+
final Message msg = ProtonHelper.message(body);
198+
final Map<String, String> properties = new HashMap<>();
199+
properties.put("device_id", deviceId);
200+
msg.setApplicationProperties(new ApplicationProperties(properties));
196201
honoSender.send(b.array(), msg);
197202
b.clear();
198203
}

server/src/main/java/org/eclipse/hono/registration/RegistrationConstants.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ public final class RegistrationConstants {
3737
public static final String ACTION_DEREGISTER = "deregister";
3838

3939
/* message fields */
40-
public static final String FIELD_NAME_MESSAGE_ID = "message-id";
41-
public static final String FIELD_NAME_ACTION = "action";
42-
public static final String FIELD_NAME_STATUS = "status";
40+
public static final String APP_PROPERTY_MESSAGE_ID = "message-id";
41+
public static final String APP_PROPERTY_ACTION = "action";
42+
public static final String APP_PROPERTY_STATUS = "status";
4343

4444
public static final String REGISTRATION_ENDPOINT = "registration";
4545
public static final String PATH_SEPARATOR = "/";
@@ -67,8 +67,8 @@ public static JsonObject getReply(final int status, final String messageId, fina
6767
final JsonObject jsonObject = new JsonObject();
6868
jsonObject.put(MessageHelper.APP_PROPERTY_TENANT_ID, tenantId);
6969
jsonObject.put(MessageHelper.APP_PROPERTY_DEVICE_ID, deviceId);
70-
jsonObject.put(RegistrationConstants.FIELD_NAME_STATUS, Integer.toString(status));
71-
jsonObject.put(RegistrationConstants.FIELD_NAME_MESSAGE_ID, messageId);
70+
jsonObject.put(RegistrationConstants.APP_PROPERTY_STATUS, Integer.toString(status));
71+
jsonObject.put(RegistrationConstants.APP_PROPERTY_MESSAGE_ID, messageId);
7272
return jsonObject;
7373
}
7474

@@ -77,7 +77,7 @@ public static Message getAmqpReply(final String status, final String messageId,
7777
final HashMap<String, String> map = new HashMap<>();
7878
map.put(MessageHelper.APP_PROPERTY_DEVICE_ID, deviceId);
7979
map.put(MessageHelper.APP_PROPERTY_TENANT_ID, tenantId);
80-
map.put(FIELD_NAME_STATUS, status);
80+
map.put(APP_PROPERTY_STATUS, status);
8181
final ApplicationProperties applicationProperties = new ApplicationProperties(map);
8282

8383
final ResourceIdentifier address = ResourceIdentifier.from(RegistrationConstants.REGISTRATION_ENDPOINT, tenantId, deviceId);
@@ -92,16 +92,16 @@ public static Message getAmqpReply(final String status, final String messageId,
9292

9393
private static JsonObject getRegistrationJson(final String action, final String messageId, final String tenantId, final String deviceId) {
9494
final JsonObject msg = new JsonObject();
95-
msg.put(FIELD_NAME_ACTION, action);
96-
msg.put(FIELD_NAME_MESSAGE_ID, messageId);
95+
msg.put(APP_PROPERTY_ACTION, action);
96+
msg.put(APP_PROPERTY_MESSAGE_ID, messageId);
9797
msg.put(APP_PROPERTY_DEVICE_ID, deviceId);
9898
msg.put(APP_PROPERTY_TENANT_ID, tenantId);
9999
return msg;
100100
}
101101

102102
private static String getAction(final Message msg) {
103103
Objects.requireNonNull(msg);
104-
return (String) getApplicationProperty(msg.getApplicationProperties(), FIELD_NAME_ACTION);
104+
return (String) getApplicationProperty(msg.getApplicationProperties(), APP_PROPERTY_ACTION);
105105
}
106106

107107
private RegistrationConstants() {

server/src/main/java/org/eclipse/hono/registration/RegistrationMessageFilter.java

+26-26
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
*/
1212
package org.eclipse.hono.registration;
1313

14+
import static org.eclipse.hono.registration.RegistrationConstants.APP_PROPERTY_ACTION;
15+
1416
import org.apache.qpid.proton.message.Message;
1517
import org.eclipse.hono.util.MessageHelper;
1618
import org.eclipse.hono.util.ResourceIdentifier;
@@ -31,34 +33,32 @@ private RegistrationMessageFilter() {
3133
/**
3234
* Checks whether a given registration message contains all required properties.
3335
*/
34-
public static boolean verify(final ResourceIdentifier linkTarget, final ResourceIdentifier messageAddress, final Message msg) {
35-
if (messageAddress == null
36-
|| !RegistrationConstants.REGISTRATION_ENDPOINT.equals(messageAddress.getEndpoint())) {
37-
LOG.trace("message [id: {}] has no registration endpoint address [to: {}]", msg.getMessageId(),
38-
messageAddress);
39-
return false;
40-
} else {
41-
return hasValidAddress(linkTarget, messageAddress, msg) && hasValidProperties(msg);
42-
}
43-
}
36+
public static boolean verify(final ResourceIdentifier linkTarget, final Message msg) {
37+
final String deviceIdProperty = MessageHelper.getDeviceId(msg);
38+
final String tenantIdProperty = MessageHelper.getTenantId(msg);
39+
final String actionProperty = (String) MessageHelper.getApplicationProperty(msg.getApplicationProperties(),
40+
APP_PROPERTY_ACTION);
4441

45-
private static boolean hasValidProperties(final Message msg) {
46-
return msg.getApplicationProperties() != null
47-
&& msg.getApplicationProperties().getValue().containsKey(MessageHelper.APP_PROPERTY_TENANT_ID)
48-
&& msg.getApplicationProperties().getValue().containsKey(MessageHelper.APP_PROPERTY_DEVICE_ID);
42+
if (tenantIdProperty != null && !linkTarget.getTenantId().equals(tenantIdProperty)) {
43+
LOG.trace("message property contains invalid tenant ID [expected: {}, but was: {}]",
44+
linkTarget.getTenantId(), tenantIdProperty);
45+
return false;
46+
} else if (deviceIdProperty == null) {
47+
LOG.trace("message [{}] contains no valid device ID", msg.getMessageId());
48+
return false;
49+
} else if (actionProperty == null) {
50+
LOG.trace("message [{}] contains no valid action.", msg.getMessageId());
51+
return false;
52+
} else if (msg.getMessageId() == null) {
53+
LOG.trace("message [{}] contains no valid message id.", msg.getMessageId());
54+
return false;
55+
} else {
56+
final ResourceIdentifier targetResource = ResourceIdentifier
57+
.from(linkTarget.getEndpoint(), linkTarget.getTenantId(), deviceIdProperty);
58+
MessageHelper.annotate(msg, targetResource);
59+
return true;
60+
}
4961
}
5062

51-
private static boolean hasValidAddress(final ResourceIdentifier linkTarget, final ResourceIdentifier messageAddress,
52-
final Message msg) {
53-
if (linkTarget.getTenantId().equals(messageAddress.getTenantId())) {
54-
MessageHelper.addAnnotation(msg, MessageHelper.APP_PROPERTY_TENANT_ID, messageAddress.getTenantId());
55-
MessageHelper.addAnnotation(msg, MessageHelper.APP_PROPERTY_DEVICE_ID, messageAddress.getDeviceId());
56-
return true;
57-
} else {
58-
LOG.trace("message address contains invalid tenant ID [expected: {}, but was: {}]", linkTarget.getTenantId(),
59-
messageAddress.getTenantId());
60-
return false;
61-
}
62-
}
6363

6464
}

server/src/main/java/org/eclipse/hono/registration/impl/BaseRegistrationAdapter.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ private void processMessage(final Message<JsonObject> message) {
9696
final JsonObject body = message.body();
9797
final String tenantId = body.getString(MessageHelper.APP_PROPERTY_TENANT_ID);
9898
final String deviceId = body.getString(MessageHelper.APP_PROPERTY_DEVICE_ID);
99-
final String action = body.getString(RegistrationConstants.FIELD_NAME_ACTION);
100-
final String msgId = body.getString(RegistrationConstants.FIELD_NAME_MESSAGE_ID);
99+
final String action = body.getString(RegistrationConstants.APP_PROPERTY_ACTION);
100+
final String msgId = body.getString(RegistrationConstants.APP_PROPERTY_MESSAGE_ID);
101101
processRegistrationMessage(tenantId, deviceId, action, msgId);
102102
}
103103

@@ -106,8 +106,8 @@ protected void reply(final int status, final String deviceId, final String tenan
106106
final JsonObject jsonObject = new JsonObject();
107107
jsonObject.put(MessageHelper.APP_PROPERTY_TENANT_ID, tenantId);
108108
jsonObject.put(MessageHelper.APP_PROPERTY_DEVICE_ID, deviceId);
109-
jsonObject.put(RegistrationConstants.FIELD_NAME_STATUS, Integer.toString(status));
110-
jsonObject.put(RegistrationConstants.FIELD_NAME_MESSAGE_ID, messageId);
109+
jsonObject.put(RegistrationConstants.APP_PROPERTY_STATUS, Integer.toString(status));
110+
jsonObject.put(RegistrationConstants.APP_PROPERTY_MESSAGE_ID, messageId);
111111

112112
LOG.debug("Publishing to event bus at {}: {}", EVENT_BUS_ADDRESS_REGISTRATION_REPLY, jsonObject);
113113

server/src/main/java/org/eclipse/hono/registration/impl/RegistrationEndpoint.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import static org.eclipse.hono.registration.RegistrationConstants.EVENT_BUS_ADDRESS_REGISTRATION_IN;
1515
import static org.eclipse.hono.registration.RegistrationConstants.getAmqpReply;
16+
import static org.eclipse.hono.util.MessageHelper.APP_PROPERTY_RESOURCE_ID;
1617
import static org.eclipse.hono.util.MessageHelper.getLinkName;
1718

1819
import java.util.Objects;
@@ -69,9 +70,8 @@ public void onLinkAttach(final ProtonReceiver receiver, final ResourceIdentifier
6970
LOG.debug("incoming message [{}]: {}", receiverImpl.getName(), message);
7071
LOG.debug("app properties: {}", message.getApplicationProperties());
7172

72-
final ResourceIdentifier messageAddress = getResourceIdentifier(message.getAddress());
73-
if (RegistrationMessageFilter.verify(targetAddress, messageAddress, message)) {
74-
sendRegistrationData(delivery, message, messageAddress);
73+
if (RegistrationMessageFilter.verify(targetAddress, message)) {
74+
sendRegistrationData(delivery, message);
7575
} else {
7676
onLinkDetach(receiver);
7777
}
@@ -94,8 +94,8 @@ private void onLinkDetach(final ProtonReceiver client) {
9494
client.close();
9595
}
9696

97-
private void sendRegistrationData(final ProtonDelivery delivery, final Message msg,
98-
final ResourceIdentifier messageAddress) {
97+
private void sendRegistrationData(final ProtonDelivery delivery, final Message msg) {
98+
final ResourceIdentifier messageAddress = ResourceIdentifier.fromString(MessageHelper.getAnnotation(msg, APP_PROPERTY_RESOURCE_ID));
9999
checkPermission(messageAddress, permissionGranted -> {
100100
if (permissionGranted) {
101101
vertx.runOnContext(run -> {
@@ -116,8 +116,8 @@ private void processReplyMessage(final ProtonSender reply, final io.vertx.core.e
116116
final JsonObject body = message.body();
117117
final String tenantId = body.getString(MessageHelper.APP_PROPERTY_TENANT_ID);
118118
final String deviceId = body.getString(MessageHelper.APP_PROPERTY_DEVICE_ID);
119-
final String status = body.getString(RegistrationConstants.FIELD_NAME_STATUS);
120-
final String msgId = body.getString(RegistrationConstants.FIELD_NAME_MESSAGE_ID);
119+
final String status = body.getString(RegistrationConstants.APP_PROPERTY_STATUS);
120+
final String msgId = body.getString(RegistrationConstants.APP_PROPERTY_MESSAGE_ID);
121121
final Message replyMsg = getAmqpReply(status, msgId, tenantId, deviceId);
122122
LOG.debug("Sending reply on link [{}]: {}", getLinkName(reply), replyMsg);
123123
reply.send(replyMsg);

server/src/main/java/org/eclipse/hono/telemetry/TelemetryMessageFilter.java

+18-25
Original file line numberDiff line numberDiff line change
@@ -35,38 +35,31 @@ private TelemetryMessageFilter() {
3535
* <em>annotations</em>:
3636
* </p>
3737
* <ul>
38-
* <li><em>device-id</em> - the ID of the device that reported the data.</li>
39-
* <li><em>tenant-id</em> - the ID of the tenant the device belongs to.</li>
38+
* <li><em>device_id</em> - the ID of the device that reported the data.</li>
39+
* <li><em>tenant_id</em> - the ID of the tenant the device belongs to.</li>
4040
* </ul>
4141
*
4242
* @param linkTarget the link target address to match the telemetry message's address against.
43-
* @param messageAddress the resource identifier representing the message's address as taken from its
44-
* <em>to</em> property.
4543
* @param msg the message to verify.
4644
* @return {@code true} if the given message complies with the <em>Telemetry</em> API specification, {@code false}
4745
* otherwise.
4846
*/
49-
public static boolean verify(final ResourceIdentifier linkTarget, final ResourceIdentifier messageAddress, final Message msg) {
50-
if (messageAddress == null
51-
|| !TelemetryConstants.TELEMETRY_ENDPOINT.equals(messageAddress.getEndpoint())) {
52-
LOG.trace("message [id: {}] has no telemetry endpoint address [to: {}]", msg.getMessageId(),
53-
messageAddress);
54-
return false;
55-
} else {
56-
return hasValidAddress(linkTarget, messageAddress, msg);
57-
}
58-
}
47+
public static boolean verify(final ResourceIdentifier linkTarget, final Message msg) {
48+
final String deviceIdProperty = MessageHelper.getDeviceId(msg);
49+
final String tenantIdProperty = MessageHelper.getTenantId(msg);
5950

60-
private static boolean hasValidAddress(final ResourceIdentifier linkTarget, final ResourceIdentifier messageAddress,
61-
final Message msg) {
62-
if (linkTarget.getTenantId().equals(messageAddress.getTenantId())) {
63-
MessageHelper.addAnnotation(msg, MessageHelper.APP_PROPERTY_TENANT_ID, messageAddress.getTenantId());
64-
MessageHelper.addAnnotation(msg, MessageHelper.APP_PROPERTY_DEVICE_ID, messageAddress.getDeviceId());
65-
return true;
66-
} else {
67-
LOG.trace("message address contains invalid tenant ID [expected: {}, but was: {}]", linkTarget.getTenantId(),
68-
messageAddress.getTenantId());
69-
return false;
70-
}
51+
if (tenantIdProperty != null && !linkTarget.getTenantId().equals(tenantIdProperty)) {
52+
LOG.trace("message property contains invalid tenant ID [expected: {}, but was: {}]",
53+
linkTarget.getTenantId(), tenantIdProperty);
54+
return false;
55+
} else if (deviceIdProperty == null) {
56+
LOG.trace("message [{}] contains no valid device ID", msg.getMessageId());
57+
return false;
58+
} else {
59+
final ResourceIdentifier targetResource = ResourceIdentifier
60+
.from(linkTarget.getEndpoint(), linkTarget.getTenantId(), deviceIdProperty);
61+
MessageHelper.annotate(msg, targetResource);
62+
return true;
63+
}
7164
}
7265
}

server/src/main/java/org/eclipse/hono/telemetry/impl/TelemetryEndpoint.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.eclipse.hono.telemetry.TelemetryConstants.getLinkDetachedMsg;
2424
import static org.eclipse.hono.telemetry.TelemetryConstants.isErrorMessage;
2525
import static org.eclipse.hono.telemetry.TelemetryConstants.isFlowControlMessage;
26+
import static org.eclipse.hono.util.MessageHelper.APP_PROPERTY_RESOURCE_ID;
2627

2728
import java.util.HashMap;
2829
import java.util.Map;
@@ -35,6 +36,7 @@
3536
import org.eclipse.hono.server.BaseEndpoint;
3637
import org.eclipse.hono.telemetry.TelemetryConstants;
3738
import org.eclipse.hono.telemetry.TelemetryMessageFilter;
39+
import org.eclipse.hono.util.MessageHelper;
3840
import org.eclipse.hono.util.ResourceIdentifier;
3941
import org.slf4j.Logger;
4042
import org.slf4j.LoggerFactory;
@@ -151,9 +153,9 @@ public void onLinkAttach(final ProtonReceiver receiver, final ResourceIdentifier
151153
// client has closed link -> inform TelemetryAdapter about client detach
152154
onLinkDetach(link);
153155
}).handler((delivery, message) -> {
154-
final ResourceIdentifier messageAddress = getResourceIdentifier(message.getAddress());
155-
if (TelemetryMessageFilter.verify(targetAddress, messageAddress, message)) {
156-
sendTelemetryData(link, delivery, message, messageAddress);
156+
157+
if (TelemetryMessageFilter.verify(targetAddress, message)) {
158+
sendTelemetryData(link, delivery, message);
157159
} else {
158160
onLinkDetach(link);
159161
}
@@ -174,10 +176,11 @@ private void onLinkDetach(final LinkWrapper client) {
174176
vertx.eventBus().send(linkControlAddress, msg);
175177
}
176178

177-
private void sendTelemetryData(final LinkWrapper link, final ProtonDelivery delivery, final Message msg, final ResourceIdentifier messageAddress) {
179+
private void sendTelemetryData(final LinkWrapper link, final ProtonDelivery delivery, final Message msg) {
178180
if (!delivery.remotelySettled()) {
179181
LOG.trace("received un-settled telemetry message on link [{}]", link.getLinkId());
180182
}
183+
final ResourceIdentifier messageAddress = ResourceIdentifier.fromString(MessageHelper.getAnnotation(msg, APP_PROPERTY_RESOURCE_ID));
181184
checkPermission(messageAddress, permissionGranted -> {
182185
if (permissionGranted) {
183186
vertx.runOnContext(run -> {

0 commit comments

Comments
 (0)