Skip to content

Commit 24bb1d0

Browse files
author
Achim Kraus
committed
Add on connecting and dtls retransmission.
Signed-off-by: Achim Kraus <[email protected]>
1 parent eed505f commit 24bb1d0

File tree

11 files changed

+235
-8
lines changed

11 files changed

+235
-8
lines changed

californium-core/src/main/java/org/eclipse/californium/core/coap/Message.java

+23
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
* Achim Kraus (Bosch Software Innovations GmbH) - move onContextEstablished
3939
* to MessageObserver.
4040
* Issue #487
41+
* Achim Kraus (Bosch Software Innovations GmbH) - add onConnect
4142
******************************************************************************/
4243
package org.eclipse.californium.core.coap;
4344

@@ -707,6 +708,28 @@ public void setReadyToSend() {
707708
}
708709
}
709710

711+
/**
712+
* Indicate, that this message triggered the connector to establish a
713+
* connection. Not part of the fluent API.
714+
*/
715+
public void onConnecting() {
716+
for (MessageObserver handler : getMessageObservers()) {
717+
handler.onConnecting();
718+
}
719+
}
720+
721+
/**
722+
* Indicate, that this message triggered the connector to establish a
723+
* connection and a dtls handshake flight was retransmitted.
724+
*
725+
* @param flight {@code 1 ... 6}, number of retransmitted flight.
726+
*/
727+
public void onDtlsRetransmission(int flight) {
728+
for (MessageObserver handler : getMessageObservers()) {
729+
handler.onDtlsRetransmission(flight);
730+
}
731+
}
732+
710733
/**
711734
* Checks if this message has been sent.
712735
*

californium-core/src/main/java/org/eclipse/californium/core/coap/MessageObserver.java

+19
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
* Achim Kraus (Bosch Software Innovations GmbH) - move onContextEstablished
2626
* to MessageObserver.
2727
* Issue #487
28+
* Achim Kraus (Bosch Software Innovations GmbH) - add onConnect
2829
******************************************************************************/
2930
package org.eclipse.californium.core.coap;
3031

@@ -42,6 +43,10 @@
4243
* still has not received anything from the remote endpoint</li>
4344
* <li>{@link #onCancel()} when the message has been canceled</li>
4445
* <li>{@link #onReadyToSend()} right before the message is being sent</li>
46+
* <li>{@link #onConnecting()} right before a connector establish a connection.
47+
* Not called, if the connection is already established or the connector doesn't
48+
* require to establish a connection.</li>
49+
* <li>{@link #onDtlsRetransmission()} when a dtls handshake flight is retransmitted.</li>
4550
* <li>{@link #onSent()} right after the message has been sent
4651
* (successfully)</li>
4752
* <li>{@link #onSendError(Throwable)} if the message cannot be sent</li>
@@ -109,6 +114,20 @@ public interface MessageObserver {
109114
*/
110115
void onReadyToSend();
111116

117+
/**
118+
* Invoked, when connector requires to establish a connection before sending
119+
* the message.
120+
*/
121+
void onConnecting();
122+
123+
/**
124+
* Indicate, that this message triggered the connector to establish a
125+
* connection and a dtls handshake flight was retransmitted.
126+
*
127+
* @param flight {@code 1 ... 6}, number of retransmitted flight.
128+
*/
129+
void onDtlsRetransmission(int flight);
130+
112131
/**
113132
* Invoked right after the message has been sent.
114133
* <p>

californium-core/src/main/java/org/eclipse/californium/core/coap/MessageObserverAdapter.java

+11
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
* Achim Kraus (Bosch Software Innovations GmbH) - move onContextEstablished
2828
* to MessageObserver.
2929
* Issue #487
30+
* Achim Kraus (Bosch Software Innovations GmbH) - add onConnect
3031
******************************************************************************/
3132
package org.eclipse.californium.core.coap;
3233

@@ -83,6 +84,16 @@ public void onReadyToSend() {
8384
// empty default implementation
8485
}
8586

87+
@Override
88+
public void onConnecting() {
89+
// empty default implementation
90+
}
91+
92+
@Override
93+
public void onDtlsRetransmission(int flight) {
94+
// empty default implementation
95+
}
96+
8697
@Override
8798
public void onSent() {
8899
// empty default implementation

californium-core/src/main/java/org/eclipse/californium/core/network/CoapEndpoint.java

+11
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
* Achim Kraus (Bosch Software Innovations GmbH) - reject messages with MID only
5959
* (therefore tcp messages are not rejected)
6060
* Achim Kraus (Bosch Software Innovations GmbH) - setup retransmitResponse for notifies
61+
* Achim Kraus (Bosch Software Innovations GmbH) - forward onConnect
6162
******************************************************************************/
6263
package org.eclipse.californium.core.network;
6364

@@ -1030,6 +1031,16 @@ public SendingCallback(final Message message) {
10301031
this.message = message;
10311032
}
10321033

1034+
@Override
1035+
public void onConnecting() {
1036+
message.onConnecting();
1037+
}
1038+
1039+
@Override
1040+
public void onDtlsRetransmission(int flight) {
1041+
message.onDtlsRetransmission(flight);
1042+
}
1043+
10331044
@Override
10341045
public void onContextEstablished(EndpointContext context) {
10351046

element-connector-tcp/src/main/java/org/eclipse/californium/elements/tcp/TcpClientConnector.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
* Bosch Software Innovations GmbH - migrate to SLF4J
3333
* Achim Kraus (Bosch Software Innovations GmbH) - add logs for create and close channel
3434
* Achim Kraus (Bosch Software Innovations GmbH) - adjust logging
35+
* Achim Kraus (Bosch Software Innovations GmbH) - add onConnect
3536
******************************************************************************/
3637
package org.eclipse.californium.elements.tcp;
3738

@@ -145,14 +146,17 @@ public void destroy() {
145146
@Override
146147
public void send(final RawData msg) {
147148
InetSocketAddress addressKey = new InetSocketAddress(msg.getAddress(), msg.getPort());
149+
boolean connected = poolMap.contains(addressKey);
148150
final EndpointContextMatcher endpointMatcher = getEndpointContextMatcher();
149151
/* check, if a new connection should be established */
150-
if (endpointMatcher != null && !poolMap.contains(addressKey)
151-
&& !endpointMatcher.isToBeSent(msg.getEndpointContext(), null)) {
152+
if (endpointMatcher != null && !connected && !endpointMatcher.isToBeSent(msg.getEndpointContext(), null)) {
152153
LOGGER.warn("TcpConnector drops {} bytes to new {}:{}", msg.getSize(), msg.getAddress(), msg.getPort());
153154
msg.onError(new EndpointMismatchException("no connection"));
154155
return;
155156
}
157+
if (!connected) {
158+
msg.onConnecting();
159+
}
156160
final ChannelPool channelPool = poolMap.get(addressKey);
157161
Future<Channel> acquire = channelPool.acquire();
158162
acquire.addListener(new GenericFutureListener<Future<Channel>>() {

element-connector-tcp/src/test/java/org/eclipse/californium/elements/tcp/TcpConnectorTest.java

+34
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import org.eclipse.californium.elements.Connector;
3535
import org.eclipse.californium.elements.RawData;
36+
import org.eclipse.californium.elements.util.SimpleMessageCallback;
3637
import org.junit.After;
3738
import org.junit.Rule;
3839
import org.junit.Test;
@@ -150,4 +151,37 @@ public void singleServerManyClients() throws Exception {
150151
assertTrue("Received unexpected message: " + received, matched);
151152
}
152153
}
154+
155+
@Test
156+
public void onConnect() throws Exception {
157+
TcpServerConnector server = new TcpServerConnector(createServerAddress(0), NUMBER_OF_THREADS,
158+
IDLE_TIMEOUT_IN_S);
159+
TcpClientConnector client = new TcpClientConnector(NUMBER_OF_THREADS, CONNECTION_TIMEOUT_IN_MS,
160+
IDLE_TIMEOUT_IN_S);
161+
162+
cleanup.add(server);
163+
cleanup.add(client);
164+
165+
Catcher serverCatcher = new Catcher();
166+
Catcher clientCatcher = new Catcher();
167+
server.setRawDataReceiver(serverCatcher);
168+
client.setRawDataReceiver(clientCatcher);
169+
server.start();
170+
client.start();
171+
172+
SimpleMessageCallback callback = new SimpleMessageCallback();
173+
RawData msg = createMessage(server.getAddress(), messageSize, callback);
174+
175+
client.send(msg);
176+
serverCatcher.blockUntilSize(1);
177+
assertTrue(callback.isConnecting());
178+
179+
callback = new SimpleMessageCallback();
180+
msg = createMessage(server.getAddress(), messageSize, callback);
181+
182+
client.send(msg);
183+
serverCatcher.blockUntilSize(2);
184+
assertFalse(callback.isConnecting());
185+
186+
}
153187
}

element-connector/src/main/java/org/eclipse/californium/elements/MessageCallback.java

+15
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,26 @@
1919
* issue #305
2020
* Achim Kraus (Bosch Software Innovations GmbH) - add comment on processing
2121
* onContextEstablished. issue #311
22+
* Achim Kraus (Bosch Software Innovations GmbH) - add onConnect
2223
******************************************************************************/
2324
package org.eclipse.californium.elements;
2425

2526
public interface MessageCallback {
2627

28+
/**
29+
* Called, when connector requires to establish a connection. Not called, if
30+
* the connection is already established or the connector doesn't require to
31+
* establish a connection.
32+
*/
33+
void onConnecting();
34+
35+
/**
36+
* Called, when the dtls connector retransmits a handshake flight.
37+
*
38+
* @param flight {@code 1 ... 6}, number of retransmitted flight.
39+
*/
40+
void onDtlsRetransmission(int flight);
41+
2742
/**
2843
* Called when the context information for an outbound message has been
2944
* established.

element-connector/src/main/java/org/eclipse/californium/elements/RawData.java

+26-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
* constructors.
2828
* Achim Kraus (Bosch Software Innovations GmbH) - replace isSecure by
2929
* connector's protocol
30+
* Achim Kraus (Bosch Software Innovations GmbH) - add onConnect
3031
******************************************************************************/
3132
package org.eclipse.californium.elements;
3233

@@ -56,18 +57,18 @@ public final class RawData {
5657
public final byte[] bytes;
5758

5859
/** Indicates if this message is a multicast message */
59-
private boolean multicast;
60+
private final boolean multicast;
6061

6162
/**
6263
* Endpoint context of the remote peer.
6364
*/
64-
private EndpointContext peerEndpointContext;
65+
private final EndpointContext peerEndpointContext;
6566

6667
/**
6768
* Message callback to receive the actual endpoint context the message is
6869
* sent in.
6970
*/
70-
private MessageCallback callback;
71+
private final MessageCallback callback;
7172

7273
/**
7374
* Instantiates a new raw data.
@@ -221,6 +222,28 @@ public EndpointContext getEndpointContext() {
221222
return peerEndpointContext;
222223
}
223224

225+
/**
226+
* Callback, when connector requires to establish a connection. Not called,
227+
* if the connection is already established or the connector doesn't require
228+
* to establish a connection.
229+
*/
230+
public void onConnecting() {
231+
if (null != callback) {
232+
callback.onConnecting();
233+
}
234+
}
235+
236+
/**
237+
* Callback, when the dtls connector retransmits a handshake flight.
238+
*
239+
* @param flight {@code 1 ... 6}, number of retransmitted flight.
240+
*/
241+
public void onDtlsRetransmission(int flight) {
242+
if (null != callback) {
243+
callback.onDtlsRetransmission(flight);
244+
}
245+
}
246+
224247
/**
225248
* Callback, when context gets available. Used on sending an message.
226249
*

element-connector/src/test/java/org/eclipse/californium/elements/util/SimpleMessageCallback.java

+36
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ public class SimpleMessageCallback implements MessageCallback {
5656
* Indicator for message sent.
5757
*/
5858
private boolean sent;
59+
/**
60+
* Indicator for connect.
61+
*/
62+
private boolean connecting;
63+
/**
64+
* Retransmitted dtls handshake flight.
65+
*/
66+
private int retransmittedDtlsFlight;
5967

6068
/**
6169
* Create new message callback.
@@ -92,6 +100,16 @@ public SimpleMessageCallback(int calls, boolean countContextEstablished, Message
92100
this.chained = chained;
93101
}
94102

103+
@Override
104+
public synchronized void onConnecting() {
105+
connecting = true;
106+
}
107+
108+
@Override
109+
public synchronized void onDtlsRetransmission(int flight) {
110+
retransmittedDtlsFlight = flight;
111+
}
112+
95113
@Override
96114
public synchronized void onContextEstablished(EndpointContext context) {
97115
this.context = context;
@@ -179,6 +197,24 @@ public synchronized boolean isSent() {
179197
return sent;
180198
}
181199

200+
/**
201+
* Check, if message requires to establish a connection.
202+
*
203+
* @return {@code true}, if requires a connection, {@code false}, otherwise
204+
*/
205+
public synchronized boolean isConnecting() {
206+
return connecting;
207+
}
208+
209+
/**
210+
* Check, if message requires to establish a connection.
211+
*
212+
* @return {@code true}, if requires a connection, {@code false}, otherwise
213+
*/
214+
public synchronized int getRetransmittedDtlsFlight() {
215+
return retransmittedDtlsFlight;
216+
}
217+
182218
/**
183219
* Get error of sending message.
184220
*

scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
* Move session listener callback out of sync
9292
* block of processApplicationDataRecord.
9393
* Achim Kraus (Bosch Software Innovations GmbH) - add handshakeFlightRetransmitted
94+
* Achim Kraus (Bosch Software Innovations GmbH) - add onConnect
9495
******************************************************************************/
9596
package org.eclipse.californium.scandium;
9697

@@ -1462,8 +1463,9 @@ private void sendMessage(final RawData message) throws HandshakeException {
14621463
if (!checkOutboundEndpointContext(message, null)) {
14631464
return;
14641465
}
1465-
// no session with peer established yet, create new empty session &
1466-
// start handshake
1466+
message.onConnecting();
1467+
// no session with peer established yet,
1468+
// create new empty session & start handshake
14671469
Handshaker handshaker = new ClientHandshaker(
14681470
DTLSSession.newClientSession(peerAddress, message.getEndpointContext().getVirtualHost()),
14691471
getRecordLayerForPeer(connection),
@@ -1477,6 +1479,7 @@ private void sendMessage(final RawData message) throws HandshakeException {
14771479
// TODO what if there already is an ongoing handshake with the peer
14781480
else if (connection.isResumptionRequired()){
14791481
// create the session to resume from the previous one.
1482+
message.onConnecting();
14801483
SessionId sessionId;
14811484
if (ticket == null) {
14821485
ticket = session.getSessionTicket();
@@ -1562,7 +1565,13 @@ public void sessionEstablished(Handshaker handshaker, DTLSSession establishedSes
15621565
LOGGER.debug("Session with [{}] established, now sending deferred message", establishedSession.getPeer());
15631566
sendMessage(message, establishedSession);
15641567
}
1565-
1568+
1569+
@Override
1570+
public void handshakeFlightRetransmitted(Handshaker handshaker, int flight) {
1571+
LOGGER.debug("Session with [{}] retransmit flight {}", handshaker.getPeerAddress(), flight);
1572+
message.onDtlsRetransmission(flight);
1573+
}
1574+
15661575
@Override
15671576
public void handshakeFailed(Handshaker handshaker, Throwable error) {
15681577
LOGGER.debug("Session with [{}] failed, report error", handshaker.getPeerAddress());

0 commit comments

Comments
 (0)