1
1
/*******************************************************************************
2
- * Copyright (c) 2019, 2022 Contributors to the Eclipse Foundation
2
+ * Copyright (c) 2019 Contributors to the Eclipse Foundation
3
3
*
4
4
* See the NOTICE file(s) distributed with this work for additional
5
5
* information regarding copyright ownership.
75
75
import io .vertx .mqtt .messages .MqttPublishMessage ;
76
76
import io .vertx .proton .ProtonHelper ;
77
77
78
-
79
78
/**
80
79
* Integration tests for sending commands to device connected to the MQTT adapter.
81
80
*
@@ -164,7 +163,7 @@ public void testSendOneWayCommandSucceeds(
164
163
final String commandTargetDeviceId = endpointConfig .isSubscribeAsGateway ()
165
164
? helper .setupGatewayDeviceBlocking (tenantId , deviceId , 5 )
166
165
: deviceId ;
167
- final Checkpoint commandsReceived = ctx . checkpoint (COMMANDS_TO_SEND );
166
+ final var remainingCommandsToBeProcessed = new CountDownLatch (COMMANDS_TO_SEND );
168
167
169
168
final AtomicInteger counter = new AtomicInteger ();
170
169
testSendCommandSucceeds (ctx , commandTargetDeviceId , msg -> {
@@ -173,7 +172,7 @@ public void testSendOneWayCommandSucceeds(
173
172
ctx .verify (() -> {
174
173
endpointConfig .assertCommandPublishTopicStructure (topic , commandTargetDeviceId , true , "setValue" );
175
174
});
176
- commandsReceived . flag ();
175
+ remainingCommandsToBeProcessed . countDown ();
177
176
}, payload -> {
178
177
counter .incrementAndGet ();
179
178
return helper .sendOneWayCommand (
@@ -183,7 +182,7 @@ public void testSendOneWayCommandSucceeds(
183
182
"text/plain" ,
184
183
payload ,
185
184
helper .getSendCommandTimeout (counter .get () == 1 ));
186
- }, endpointConfig , COMMANDS_TO_SEND , MqttQoS .AT_MOST_ONCE );
185
+ }, endpointConfig , remainingCommandsToBeProcessed , MqttQoS .AT_MOST_ONCE );
187
186
}
188
187
189
188
/**
@@ -196,6 +195,7 @@ public void testSendOneWayCommandSucceeds(
196
195
*/
197
196
@ ParameterizedTest (name = IntegrationTestSupport .PARAMETERIZED_TEST_NAME_PATTERN )
198
197
@ MethodSource ("allCombinations" )
198
+ @ Timeout (timeUnit = TimeUnit .SECONDS , value = 10 )
199
199
public void testSendCommandSucceedsWithQos0 (
200
200
final MqttCommandEndpointConfiguration endpointConfig ,
201
201
final VertxTestContext ctx ) throws InterruptedException {
@@ -213,6 +213,7 @@ public void testSendCommandSucceedsWithQos0(
213
213
*/
214
214
@ ParameterizedTest (name = IntegrationTestSupport .PARAMETERIZED_TEST_NAME_PATTERN )
215
215
@ MethodSource ("allCombinations" )
216
+ @ Timeout (timeUnit = TimeUnit .SECONDS , value = 10 )
216
217
public void testSendCommandSucceedsWithQos1 (
217
218
final MqttCommandEndpointConfiguration endpointConfig ,
218
219
final VertxTestContext ctx ) throws InterruptedException {
@@ -230,6 +231,7 @@ private void testSendCommandSucceeds(
230
231
: deviceId ;
231
232
232
233
final AtomicInteger counter = new AtomicInteger ();
234
+ final var remainingCommandsToBeProcessed = new CountDownLatch (COMMANDS_TO_SEND );
233
235
testSendCommandSucceeds (ctx , commandTargetDeviceId , msg -> {
234
236
LOGGER .trace ("received command [{}]" , msg .topicName ());
235
237
final ResourceIdentifier topic = ResourceIdentifier .fromString (msg .topicName ());
@@ -267,9 +269,10 @@ private void testSendCommandSucceeds(
267
269
DownstreamMessageAssertions .assertCommandAndControlApiProperties (
268
270
response , tenantId , commandTargetDeviceId );
269
271
});
272
+ remainingCommandsToBeProcessed .countDown ();
270
273
return (Void ) null ;
271
274
});
272
- }, endpointConfig , COMMANDS_TO_SEND , qos );
275
+ }, endpointConfig , remainingCommandsToBeProcessed , qos );
273
276
}
274
277
275
278
private void testSendCommandSucceeds (
@@ -278,27 +281,29 @@ private void testSendCommandSucceeds(
278
281
final Handler <MqttPublishMessage > commandConsumer ,
279
282
final Function <Buffer , Future <Void >> commandSender ,
280
283
final MqttCommandEndpointConfiguration endpointConfig ,
281
- final int totalNoOfCommandsToSend ,
284
+ final CountDownLatch remainingCommandsToBeProcessed ,
282
285
final MqttQoS subscribeQos ) throws InterruptedException {
283
286
284
287
final VertxTestContext setup = new VertxTestContext ();
285
288
final Checkpoint ready = setup .checkpoint (2 );
286
289
287
290
helper .registry
288
- .addDeviceToTenant (tenantId , deviceId , password )
289
- .compose (ok -> connectToAdapter (IntegrationTestSupport .getUsername (deviceId , tenantId ), password ))
290
- .compose (ok -> createConsumer (tenantId , msg -> {
291
- // expect empty notification with TTD -1
292
- setup .verify (() -> assertThat (msg .getContentType ()).isEqualTo (EventConstants .CONTENT_TYPE_EMPTY_NOTIFICATION ));
293
- final TimeUntilDisconnectNotification notification = msg .getTimeUntilDisconnectNotification ().orElse (null );
294
- LOGGER .info ("received notification [{}]" , notification );
295
- setup .verify (() -> assertThat (notification ).isNotNull ());
296
- if (notification .getTtd () == -1 ) {
297
- ready .flag ();
298
- }
299
- }))
300
- .compose (conAck -> subscribeToCommands (commandTargetDeviceId , commandConsumer , endpointConfig , subscribeQos ))
301
- .onComplete (setup .succeeding (ok -> ready .flag ()));
291
+ .addDeviceToTenant (tenantId , deviceId , password )
292
+ .compose (ok -> connectToAdapter (IntegrationTestSupport .getUsername (deviceId , tenantId ), password ))
293
+ .compose (ok -> createConsumer (tenantId , msg -> {
294
+ // expect empty notification with TTD -1
295
+ setup .verify (() -> assertThat (msg .getContentType ())
296
+ .isEqualTo (EventConstants .CONTENT_TYPE_EMPTY_NOTIFICATION ));
297
+ final TimeUntilDisconnectNotification notification = msg .getTimeUntilDisconnectNotification ()
298
+ .orElse (null );
299
+ LOGGER .info ("received notification [{}]" , notification );
300
+ setup .verify (() -> assertThat (notification ).isNotNull ());
301
+ if (notification .getTtd () == -1 ) {
302
+ ready .flag ();
303
+ }
304
+ }))
305
+ .compose (conAck -> subscribeToCommands (commandTargetDeviceId , commandConsumer , endpointConfig , subscribeQos ))
306
+ .onComplete (setup .succeeding (ok -> ready .flag ()));
302
307
303
308
assertWithMessage ("setup of adapter finished within %s seconds" , IntegrationTestSupport .getTestSetupTimeout ())
304
309
.that (setup .awaitCompletion (IntegrationTestSupport .getTestSetupTimeout (), TimeUnit .SECONDS ))
@@ -307,30 +312,23 @@ private void testSendCommandSucceeds(
307
312
ctx .failNow (setup .causeOfFailure ());
308
313
return ;
309
314
}
310
-
311
- final Checkpoint sendCommandsSucceeded = ctx .checkpoint ();
312
- final CountDownLatch commandsSucceeded = new CountDownLatch (totalNoOfCommandsToSend );
315
+ final var totalNoOfCommandsToSend = remainingCommandsToBeProcessed .getCount ();
313
316
final AtomicInteger commandsSent = new AtomicInteger (0 );
314
- final AtomicLong lastReceivedTimestamp = new AtomicLong (0 );
317
+ final AtomicLong lastSentTimestamp = new AtomicLong (0 );
315
318
final long start = System .currentTimeMillis ();
316
319
317
320
while (commandsSent .get () < totalNoOfCommandsToSend ) {
318
321
final CountDownLatch commandSent = new CountDownLatch (1 );
319
322
context .runOnContext (go -> {
320
- commandsSent .getAndIncrement ();
323
+ commandsSent .incrementAndGet ();
321
324
final Buffer msg = commandsSent .get () % 2 == 0
322
325
? Buffer .buffer ("value: " + commandsSent .get ())
323
326
: null ; // use 'null' payload for half the commands, ensuring such commands also get forwarded
324
327
commandSender .apply (msg ).onComplete (sendAttempt -> {
325
328
if (sendAttempt .failed ()) {
326
329
LOGGER .info ("error sending command {}" , commandsSent .get (), sendAttempt .cause ());
327
- } else {
328
- lastReceivedTimestamp .set (System .currentTimeMillis ());
329
- commandsSucceeded .countDown ();
330
- if (commandsSucceeded .getCount () % 20 == 0 ) {
331
- LOGGER .info ("commands succeeded: {}" , totalNoOfCommandsToSend - commandsSucceeded .getCount ());
332
- }
333
330
}
331
+ lastSentTimestamp .set (System .currentTimeMillis ());
334
332
if (commandsSent .get () % 20 == 0 ) {
335
333
LOGGER .info ("commands sent: " + commandsSent .get ());
336
334
}
@@ -342,20 +340,20 @@ private void testSendCommandSucceeds(
342
340
}
343
341
344
342
final long timeToWait = totalNoOfCommandsToSend * 200 ;
345
- if (!commandsSucceeded .await (timeToWait , TimeUnit .MILLISECONDS )) {
346
- LOGGER .info ("Timeout of {} milliseconds reached, stop waiting for commands to succeed " , timeToWait );
343
+ if (!remainingCommandsToBeProcessed .await (timeToWait , TimeUnit .MILLISECONDS )) {
344
+ LOGGER .info ("Timeout of {} milliseconds reached, stop waiting for commands to be processed " , timeToWait );
347
345
}
348
- if (lastReceivedTimestamp .get () == 0L ) {
346
+ if (lastSentTimestamp .get () == 0L ) {
349
347
// no message has been received at all
350
- lastReceivedTimestamp .set (System .currentTimeMillis ());
348
+ lastSentTimestamp .set (System .currentTimeMillis ());
351
349
}
352
- final long commandsCompleted = totalNoOfCommandsToSend - commandsSucceeded .getCount ();
353
- LOGGER .info ("commands sent: {}, commands succeeded : {} after {} milliseconds" ,
354
- commandsSent .get (), commandsCompleted , lastReceivedTimestamp .get () - start );
355
- if (commandsCompleted == commandsSent .get ()) {
356
- sendCommandsSucceeded . flag ();
350
+ final long commandsProcessed = totalNoOfCommandsToSend - remainingCommandsToBeProcessed .getCount ();
351
+ LOGGER .info ("commands sent: {}, commands processed : {} after {} milliseconds" ,
352
+ commandsSent .get (), commandsProcessed , lastSentTimestamp .get () - start );
353
+ if (commandsProcessed == commandsSent .get ()) {
354
+ ctx . completeNow ();
357
355
} else {
358
- ctx .failNow (new IllegalStateException ("did not complete all commands sent" ));
356
+ ctx .failNow (new IllegalStateException ("device did not process all commands that had been sent" ));
359
357
}
360
358
}
361
359
@@ -394,18 +392,18 @@ public void testSendCommandViaAmqpFailsForMalformedMessage(
394
392
ready .flag ();
395
393
}
396
394
})
397
- .compose (consumer -> helper .registry .addDeviceToTenant (tenantId , deviceId , password ))
398
- .compose (ok -> connectToAdapter (IntegrationTestSupport .getUsername (deviceId , tenantId ), password ))
399
- .compose (conAck -> subscribeToCommands (commandTargetDeviceId , msg -> {
400
- // all commands should get rejected because they fail to pass the validity check
401
- ctx .failNow (new IllegalStateException ("should not have received command" ));
402
- }, endpointConfig , MqttQoS .AT_MOST_ONCE ))
403
- .compose (ok -> helper .createGenericAmqpMessageSender (endpointConfig .getNorthboundEndpoint (), tenantId ))
404
- .onComplete (setup .succeeding (genericSender -> {
405
- LOGGER .debug ("created generic sender for sending commands [target address: {}]" , linkTargetAddress );
406
- amqpCmdSenderRef .set (genericSender );
407
- ready .flag ();
408
- }));
395
+ .compose (consumer -> helper .registry .addDeviceToTenant (tenantId , deviceId , password ))
396
+ .compose (ok -> connectToAdapter (IntegrationTestSupport .getUsername (deviceId , tenantId ), password ))
397
+ .compose (conAck -> subscribeToCommands (commandTargetDeviceId , msg -> {
398
+ // all commands should get rejected because they fail to pass the validity check
399
+ ctx .failNow (new IllegalStateException ("should not have received command" ));
400
+ }, endpointConfig , MqttQoS .AT_MOST_ONCE ))
401
+ .compose (ok -> helper .createGenericAmqpMessageSender (endpointConfig .getNorthboundEndpoint (), tenantId ))
402
+ .onComplete (setup .succeeding (genericSender -> {
403
+ LOGGER .debug ("created generic sender for sending commands [target address: {}]" , linkTargetAddress );
404
+ amqpCmdSenderRef .set (genericSender );
405
+ ready .flag ();
406
+ }));
409
407
410
408
assertWithMessage ("setup of adapter finished within %s seconds" , IntegrationTestSupport .getTestSetupTimeout ())
411
409
.that (setup .awaitCompletion (IntegrationTestSupport .getTestSetupTimeout (), TimeUnit .SECONDS ))
@@ -423,10 +421,11 @@ public void testSendCommandViaAmqpFailsForMalformedMessage(
423
421
messageWithoutSubject .setAddress (messageAddress );
424
422
messageWithoutSubject .setMessageId ("message-id" );
425
423
messageWithoutSubject .setReplyTo ("reply/to/address" );
426
- amqpCmdSenderRef .get ().sendAndWaitForOutcome (messageWithoutSubject , NoopSpan .INSTANCE ).onComplete (ctx .failing (t -> {
427
- ctx .verify (() -> assertThat (t ).isInstanceOf (ClientErrorException .class ));
428
- failedAttempts .flag ();
429
- }));
424
+ amqpCmdSenderRef .get ().sendAndWaitForOutcome (messageWithoutSubject , NoopSpan .INSTANCE )
425
+ .onComplete (ctx .failing (t -> {
426
+ ctx .verify (() -> assertThat (t ).isInstanceOf (ClientErrorException .class ));
427
+ failedAttempts .flag ();
428
+ }));
430
429
431
430
LOGGER .debug ("sending command message lacking message ID and correlation ID" );
432
431
final Message messageWithoutId = ProtonHelper .message ("input data" );
@@ -465,12 +464,13 @@ public void testSendCommandViaKafkaFailsForMalformedMessage(
465
464
final VertxTestContext setup = new VertxTestContext ();
466
465
final Checkpoint ready = setup .checkpoint (2 );
467
466
468
- final Future <MessageConsumer > kafkaAsyncErrorResponseConsumer = helper .createDeliveryFailureCommandResponseConsumer (
469
- ctx ,
470
- tenantId ,
471
- HttpURLConnection .HTTP_BAD_REQUEST ,
472
- response -> expectedCommandResponses .countDown (),
473
- null );
467
+ final Future <MessageConsumer > kafkaAsyncErrorResponseConsumer = helper
468
+ .createDeliveryFailureCommandResponseConsumer (
469
+ ctx ,
470
+ tenantId ,
471
+ HttpURLConnection .HTTP_BAD_REQUEST ,
472
+ response -> expectedCommandResponses .countDown (),
473
+ null );
474
474
475
475
createConsumer (tenantId , msg -> {
476
476
// expect empty notification with TTD -1
@@ -505,8 +505,7 @@ public void testSendCommandViaKafkaFailsForMalformedMessage(
505
505
final Map <String , Object > properties1 = Map .of (
506
506
MessageHelper .APP_PROPERTY_DEVICE_ID , deviceId ,
507
507
MessageHelper .SYS_PROPERTY_CONTENT_TYPE , MessageHelper .CONTENT_TYPE_OCTET_STREAM ,
508
- KafkaRecordHelper .HEADER_RESPONSE_REQUIRED , true
509
- );
508
+ KafkaRecordHelper .HEADER_RESPONSE_REQUIRED , true );
510
509
kafkaSenderRef .get ().sendAndWaitForOutcome (commandTopic , tenantId , deviceId , Buffer .buffer (), properties1 )
511
510
.onComplete (ctx .succeeding (ok -> {}));
512
511
@@ -516,8 +515,7 @@ public void testSendCommandViaKafkaFailsForMalformedMessage(
516
515
MessageHelper .SYS_PROPERTY_CORRELATION_ID , correlationId ,
517
516
MessageHelper .APP_PROPERTY_DEVICE_ID , deviceId ,
518
517
MessageHelper .SYS_PROPERTY_CONTENT_TYPE , MessageHelper .CONTENT_TYPE_OCTET_STREAM ,
519
- KafkaRecordHelper .HEADER_RESPONSE_REQUIRED , true
520
- );
518
+ KafkaRecordHelper .HEADER_RESPONSE_REQUIRED , true );
521
519
kafkaSenderRef .get ().sendAndWaitForOutcome (commandTopic , tenantId , deviceId , Buffer .buffer (), properties2 )
522
520
.onComplete (ctx .succeeding (ok -> {}));
523
521
@@ -576,7 +574,7 @@ public void testSendCommandFailsForCommandNotAcknowledgedByDevice(
576
574
counter .incrementAndGet ();
577
575
return helper .sendCommand (tenantId , commandTargetDeviceId , "setValue" , "text/plain" , payload ,
578
576
helper .getSendCommandTimeout (counter .get () == 1 ))
579
- .mapEmpty ();
577
+ .mapEmpty ();
580
578
};
581
579
582
580
helper .registry
@@ -703,16 +701,16 @@ private Future<Void> injectMqttClientPubAckBlocker(final AtomicBoolean outboundP
703
701
final NetSocketInternal connection = (NetSocketInternal ) connectionMethod .invoke (mqttClient );
704
702
connection .channelHandlerContext ().pipeline ().addBefore ("handler" , "OutboundPubAckBlocker" ,
705
703
new ChannelOutboundHandlerAdapter () {
706
- @ Override
704
+ @ Override
707
705
public void write (final ChannelHandlerContext ctx , final Object msg , final ChannelPromise promise )
708
- throws Exception {
706
+ throws Exception {
709
707
if (outboundPubAckBlocked .get () && msg instanceof io .netty .handler .codec .mqtt .MqttPubAckMessage ) {
710
708
LOGGER .debug ("suppressing PubAck, message id: {}" , ((MqttPubAckMessage ) msg ).variableHeader ().messageId ());
711
- } else {
712
- super .write (ctx , msg , promise );
713
- }
714
- }
715
- });
709
+ } else {
710
+ super .write (ctx , msg , promise );
711
+ }
712
+ }
713
+ });
716
714
return Future .succeededFuture ();
717
715
} catch (final Exception e ) {
718
716
LOGGER .error ("failed to inject PubAck blocking handler" );
0 commit comments