Skip to content

Commit c8164cc

Browse files
author
Kai Hudalla
committed
Small improvements regarding test execution.
1 parent 421a71a commit c8164cc

File tree

4 files changed

+49
-15
lines changed

4 files changed

+49
-15
lines changed

Diff for: server/src/main/java/org/eclipse/hono/server/HonoServer.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,10 @@ ProtonServerOptions createServerOptions() {
119119
@Override
120120
public void stop(Future<Void> shutdownHandler) {
121121
if (server != null) {
122-
server.close(done -> shutdownHandler.complete());
122+
server.close(done -> {
123+
LOG.info("HonoServer has been shut down");
124+
shutdownHandler.complete();
125+
});
123126
}
124127
}
125128

Diff for: server/src/main/java/org/eclipse/hono/telemetry/impl/MessageDiscardingTelemetryAdapter.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import java.util.HashMap;
1515
import java.util.Map;
16+
import java.util.function.Consumer;
1617

1718
import org.apache.qpid.proton.message.Message;
1819
import org.slf4j.Logger;
@@ -34,20 +35,30 @@ public final class MessageDiscardingTelemetryAdapter extends BaseTelemetryAdapte
3435
private final long pauseThreshold;
3536
private final long pausePeriod;
3637
private Map<String, LinkStatus> statusMap = new HashMap<>();
38+
private Consumer<Message> messageConsumer;
3739

3840
public MessageDiscardingTelemetryAdapter() {
39-
this(0, 0);
41+
this(0, 0, null);
42+
}
43+
44+
/**
45+
*
46+
* @param consumer a consumer that is invoked for every message received.
47+
*/
48+
public MessageDiscardingTelemetryAdapter(final Consumer<Message> consumer) {
49+
this(0, 0, consumer);
4050
}
4151

4252
/**
4353
* @param pauseThreshold the number of messages after which the sender is paused. If set to 0 (zero) the sender will
4454
* never be paused.
4555
* @param pausePeriod the number of milliseconds after which the sender is resumed.
4656
*/
47-
public MessageDiscardingTelemetryAdapter(final long pauseThreshold, final long pausePeriod) {
57+
public MessageDiscardingTelemetryAdapter(final long pauseThreshold, final long pausePeriod, final Consumer<Message> consumer) {
4858
super(0, 1);
4959
this.pauseThreshold = pauseThreshold;
5060
this.pausePeriod = pausePeriod;
61+
this.messageConsumer = consumer;
5162
}
5263

5364
@Override
@@ -60,6 +71,9 @@ public void processTelemetryData(final Message data, final String linkId) {
6071
}
6172
LOG.debug("processing telemetry data [id: {}, to: {}, content-type: {}]", data.getMessageId(), data.getAddress(),
6273
data.getContentType());
74+
if (messageConsumer != null) {
75+
messageConsumer.accept(data);
76+
}
6377
status.onMsgReceived();
6478
}
6579

Diff for: server/src/test/java/org/eclipse/hono/server/HonoServerIntegrationTest.java

+17-11
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,17 @@ public class HonoServerIntegrationTest {
5656
private static final String NAME_HONO_CONNECTION_FACTORY = "hono";
5757
private static final Logger LOG = LoggerFactory.getLogger(HonoServerIntegrationTest.class);
5858
private static final String BIND_ADDRESS = InetAddress.getLoopbackAddress().getHostAddress();
59+
private Connection connection;
60+
private Vertx vertx;
5961

6062
@After
61-
public void disconnect(final TestContext ctx) {
63+
public void disconnect(final TestContext ctx) throws JMSException {
64+
if (connection != null) {
65+
connection.close();
66+
}
67+
if (vertx != null) {
68+
vertx.close(ctx.asyncAssertSuccess());
69+
}
6270
}
6371

6472
private static HonoServer createServer(final Endpoint telemetryEndpoint) {
@@ -72,14 +80,18 @@ private static HonoServer createServer(final Endpoint telemetryEndpoint) {
7280
@Test
7381
public void testTelemetryUpload(final TestContext ctx) throws Exception {
7482

75-
Vertx vertx = Vertx.vertx();
83+
vertx = Vertx.vertx();
7684
LOG.debug("starting telemetry upload test");
7785
int count = 110;
86+
final Async messagesReceived = ctx.async(count);
7887
final Async deployed = ctx.async();
7988

8089
TelemetryEndpoint telemetryEndpoint = new TelemetryEndpoint(vertx, false);
8190
HonoServer server = createServer(telemetryEndpoint);
82-
vertx.deployVerticle(new MessageDiscardingTelemetryAdapter());
91+
vertx.deployVerticle(new MessageDiscardingTelemetryAdapter(msg -> {
92+
messagesReceived.countDown();
93+
LOG.debug("Received message [id: {}]", msg.getMessageId());
94+
}));
8395
vertx.deployVerticle(InMemoryAuthorizationService.class.getName());
8496
vertx.deployVerticle(server, res -> {
8597
ctx.assertTrue(res.succeeded());
@@ -91,7 +103,7 @@ public void testTelemetryUpload(final TestContext ctx) throws Exception {
91103
ConnectionFactory factory = (ConnectionFactory) context.lookup(NAME_HONO_CONNECTION_FACTORY);
92104
Destination telemetryAddress = (Destination) context.lookup(TelemetryConstants.TELEMETRY_ENDPOINT);
93105

94-
Connection connection = factory.createConnection();
106+
connection = factory.createConnection();
95107
connection.setExceptionListener(new ExceptionListener() {
96108

97109
@Override
@@ -105,7 +117,6 @@ public void onException(JMSException exception) {
105117
MessageProducer messageProducer = session.createProducer(telemetryAddress);
106118
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
107119

108-
long start = System.currentTimeMillis();
109120
for (int i = 1; i <= count; i++) {
110121
BytesMessage message = createTelemetryMessage(session, i);
111122
messageProducer.send(message);
@@ -114,12 +125,7 @@ public void onException(JMSException exception) {
114125
LOG.debug("Sent message {}", i);
115126
}
116127
}
117-
118-
long finish = System.currentTimeMillis();
119-
long taken = finish - start;
120-
LOG.info("Sent {} messages in {}ms", count, taken);
121-
122-
connection.close();
128+
messagesReceived.awaitSuccess(2000);
123129
}
124130

125131
private static Context createInitialContext(final HonoServer server) throws NamingException {

Diff for: tests/src/test/java/org/eclipse/hono/tests/HonoTestSupport.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import org.eclipse.hono.Application;
1515
import org.eclipse.hono.client.TelemetryClient;
16+
import org.junit.After;
1617
import org.junit.Assert;
1718
import org.junit.Test;
1819
import org.junit.runner.RunWith;
@@ -54,13 +55,23 @@ public class HonoTestSupport {
5455
@Value(value = "${hono.telemetry.downstream.port}")
5556
private int downstreamPort;
5657

58+
@After
59+
public void closeClients() {
60+
if (receiver != null) {
61+
receiver.shutdown();
62+
}
63+
if (sender != null) {
64+
sender.shutdown();
65+
}
66+
}
67+
5768
@Test
5869
public void testTelemetry() throws Exception {
5970
final CountDownLatch received = new CountDownLatch(MSG_COUNT);
6071
receiver = new TelemetryClient(downstreamHostName, downstreamPort, TENANT_ID);
6172

6273
receiver.createReceiver(message -> {
63-
LOGGER.info("Received " + message);
74+
LOGGER.debug("Received message: {}", message);
6475
received.countDown();
6576
}, "telemetry" + pathSeparator + "%s").setHandler(r -> createSender());
6677

0 commit comments

Comments
 (0)