From 58b116cfafd4afe569d730fc1fd83b3fe782c48a Mon Sep 17 00:00:00 2001 From: ovalseven8 <8258609+ovalseven8@users.noreply.github.com> Date: Wed, 24 Jul 2019 09:35:31 +0200 Subject: [PATCH] Synchronize on Signaling class Before that change, the handler methods for the WebSocket library were synchronized on the WebSocketAdapter while other methods in Signaling class were synchronized on the class itself. Also, the public SaltyRTC API is also synchronized on the Signaling class now. With this change, the WebSocket library and the public API are synchronized on the same class which should guarantee thread-safety. --- README.md | 3 - .../java/org/saltyrtc/client/SaltyRTC.java | 40 +- .../client/signaling/InitiatorSignaling.java | 4 +- .../saltyrtc/client/signaling/Signaling.java | 394 +++++++++--------- 4 files changed, 230 insertions(+), 211 deletions(-) diff --git a/README.md b/README.md index 1eb4baf..d719a36 100644 --- a/README.md +++ b/README.md @@ -36,9 +36,6 @@ Maven: Documentation can be found at [https://saltyrtc.github.io/saltyrtc-client-java/](https://saltyrtc.github.io/saltyrtc-client-java/). -Plase note that instances of this library are not considered thread-safe. Thus, an application -using more than one thread needs to take care of synchronisation itself. - ## Manual Testing To try a development version of the library, you can build a local version to diff --git a/src/main/java/org/saltyrtc/client/SaltyRTC.java b/src/main/java/org/saltyrtc/client/SaltyRTC.java index 1207458..0f634e3 100644 --- a/src/main/java/org/saltyrtc/client/SaltyRTC.java +++ b/src/main/java/org/saltyrtc/client/SaltyRTC.java @@ -38,7 +38,7 @@ public class SaltyRTC { private boolean debug = false; // Reference to signaling class - private Signaling signaling; + private final Signaling signaling; // Event registry public final SaltyRTC.Events events = new SaltyRTC.Events(); @@ -102,22 +102,30 @@ public class SaltyRTC { } public KeyStore getKeyStore() { - return this.signaling.getKeyStore(); + synchronized (this.signaling) { + return this.signaling.getKeyStore(); + } } public byte[] getPublicPermanentKey() { - return this.signaling.getPublicPermanentKey(); + synchronized (this.signaling) { + return this.signaling.getPublicPermanentKey(); + } } public byte[] getAuthToken() { - return this.signaling.getAuthToken(); + synchronized (this.signaling) { + return this.signaling.getAuthToken(); + } } /** * Return the current signaling state. */ public SignalingState getSignalingState() { - return this.signaling.getState(); + synchronized (this.signaling) { + return this.signaling.getState(); + } } /** @@ -125,7 +133,9 @@ public SignalingState getSignalingState() { */ @Nullable public Task getTask() { - return this.signaling.getTask(); + synchronized (this.signaling) { + return this.signaling.getTask(); + } } /** @@ -137,7 +147,9 @@ public Task getTask() { * @throws ConnectionException if setting up the WebSocket connection fails. */ public void connect() throws ConnectionException { - this.signaling.connect(); + synchronized (this.signaling) { + this.signaling.connect(); + } } /** @@ -147,11 +159,13 @@ public void connect() throws ConnectionException { * @throws InvalidStateException if the SaltyRTC instance is not currently in the TASK signaling state. */ public void sendApplicationMessage(Object data) throws ConnectionException, InvalidStateException { - if (this.signaling.getState() != SignalingState.TASK) { - throw new InvalidStateException( - "Application messages can only be sent in TASK state, not in " + this.signaling.getState().name()); + synchronized (this.signaling) { + if (this.signaling.getState() != SignalingState.TASK) { + throw new InvalidStateException( + "Application messages can only be sent in TASK state, not in " + this.signaling.getState().name()); + } + this.signaling.sendApplication(new Application(data)); } - this.signaling.sendApplication(new Application(data)); } /** @@ -162,7 +176,9 @@ public void sendApplicationMessage(Object data) throws ConnectionException, Inva * this method again from within your `SignalingStateChangedEvent` event handlers, or deadlocks may occur! */ public void disconnect() { - this.signaling.disconnect(); + synchronized (this.signaling) { + this.signaling.disconnect(); + } } /** diff --git a/src/main/java/org/saltyrtc/client/signaling/InitiatorSignaling.java b/src/main/java/org/saltyrtc/client/signaling/InitiatorSignaling.java index 79e7adb..08c2c2f 100644 --- a/src/main/java/org/saltyrtc/client/signaling/InitiatorSignaling.java +++ b/src/main/java/org/saltyrtc/client/signaling/InitiatorSignaling.java @@ -78,7 +78,7 @@ public InitiatorSignaling(SaltyRTC saltyRTC, String host, int port, /** * Handle signaling errors during peer handshake. */ - synchronized void handlePeerHandshakeSignalingError(@NonNull SignalingException e, short source) { + void handlePeerHandshakeSignalingError(@NonNull SignalingException e, short source) { // Simply drop the responder Responder responder = this.responders.get(source); if (responder != null) { @@ -544,7 +544,7 @@ private void dropResponders() throws SignalingException, ConnectionException { } @Override - synchronized void handleSendError(short receiver) throws SignalingException { + void handleSendError(short receiver) throws SignalingException { // Validate receiver byte if (!this.isResponderId(receiver)) { throw new ProtocolException("Outgoing c2c messages must have been sent to a responder"); diff --git a/src/main/java/org/saltyrtc/client/signaling/Signaling.java b/src/main/java/org/saltyrtc/client/signaling/Signaling.java index 1e6b25b..556b937 100644 --- a/src/main/java/org/saltyrtc/client/signaling/Signaling.java +++ b/src/main/java/org/saltyrtc/client/signaling/Signaling.java @@ -71,14 +71,14 @@ public abstract class Signaling implements SignalingInterface { abstract Logger getLogger(); // WebSocket + private WebSocket ws; private final String host; private final int port; private final SSLContext sslContext; - private WebSocket ws; - final private int pingInterval; - final private int wsConnectTimeoutInitial; - final private int wsConnectAttemptsMax; - final private boolean wsConnectLinearBackoff; + private final int pingInterval; + private final int wsConnectTimeoutInitial; + private final int wsConnectAttemptsMax; + private final boolean wsConnectLinearBackoff; private int wsConnectTimeout; private int wsConnectAttempt = 0; @@ -217,12 +217,8 @@ public void connect() throws ConnectionException { /** * Disconnect from the SaltyRTC server. - * - * This is a synchronous operation. The event handlers for the `SignalingStateChangedEvent` - * will also be called synchronously with the states `CLOSING` and `CLOSED`. Therefore make sure not to call - * this method again from within your `SignalingStateChangedEvent` event handlers, or deadlocks may occur! */ - synchronized void disconnect(int reason) { + void disconnect(int reason) { this.setState(SignalingState.CLOSING); // Send close message if necessary @@ -249,10 +245,6 @@ synchronized void disconnect(int reason) { /** * Disconnect from the SaltyRTC server. - * - * This is a synchronous operation. The event handlers for the `SignalingStateChangedEvent` - * will also be called synchronously with the states `CLOSING` and `CLOSED`. Therefore make sure not to call - * this method again from within your `SignalingStateChangedEvent` event handlers, or deadlocks may occur! */ public void disconnect() { this.disconnect(CloseCode.CLOSING_NORMAL); @@ -261,7 +253,7 @@ public void disconnect() { /** * Reset the connection. */ - public synchronized void resetConnection(@Nullable Integer reason) { + public void resetConnection(@Nullable Integer reason) { // Disconnect if (this.state != SignalingState.NEW) { final int code = reason != null ? reason : CloseCode.CLOSING_NORMAL; @@ -294,215 +286,225 @@ private void initWebsocket() throws IOException { WebSocketAdapter listener = new WebSocketAdapter() { @Override @SuppressWarnings("UnqualifiedMethodAccess") - public synchronized void onConnected(WebSocket websocket, Map> headers) { - if (getState() == SignalingState.WS_CONNECTING) { - getLogger().info("WebSocket connection established"); - setState(SignalingState.SERVER_HANDSHAKE); - } else { - getLogger().warn("Got onConnected event, but WebSocket connection already open"); + public void onConnected(WebSocket websocket, Map> headers) { + synchronized (Signaling.this) { + if (getState() == SignalingState.WS_CONNECTING) { + getLogger().info("WebSocket connection established"); + setState(SignalingState.SERVER_HANDSHAKE); + } else { + getLogger().warn("Got onConnected event, but WebSocket connection already open"); + } } } @Override @SuppressWarnings("UnqualifiedMethodAccess") - public synchronized void onConnectError(WebSocket websocket, WebSocketException ex) throws Exception { - getLogger().error("Could not connect to websocket (" + ex.getError().toString() + "): " + ex.getMessage()); - if (Signaling.this.wsConnectAttemptsMax <= 0 || Signaling.this.wsConnectAttempt < Signaling.this.wsConnectAttemptsMax) { - // Increase #attempts (and timeout if needed) - if (Signaling.this.wsConnectLinearBackoff) { - Signaling.this.wsConnectTimeout += Signaling.this.wsConnectTimeoutInitial; - } - Signaling.this.wsConnectAttempt += 1; + public void onConnectError(WebSocket websocket, WebSocketException ex) throws Exception { + synchronized (Signaling.this) { + getLogger().error("Could not connect to websocket (" + ex.getError().toString() + "): " + ex.getMessage()); + if (Signaling.this.wsConnectAttemptsMax <= 0 || Signaling.this.wsConnectAttempt < Signaling.this.wsConnectAttemptsMax) { + // Increase #attempts (and timeout if needed) + if (Signaling.this.wsConnectLinearBackoff) { + Signaling.this.wsConnectTimeout += Signaling.this.wsConnectTimeoutInitial; + } + Signaling.this.wsConnectAttempt += 1; + + // Log retry attempt + final String retryConstraint; + if (Signaling.this.wsConnectAttemptsMax <= 0) { + retryConstraint = "infinitely"; + } else { + retryConstraint = Signaling.this.wsConnectAttempt + "/" + Signaling.this.wsConnectAttemptsMax; + } + getLogger().info("Retrying to reconnect (" + retryConstraint + ")..."); - // Log retry attempt - final String retryConstraint; - if (Signaling.this.wsConnectAttemptsMax <= 0) { - retryConstraint = "infinitely"; + // Retry WS connection + Signaling.this.setState(SignalingState.WS_CONNECTING); + Signaling.this.ws.recreate(Signaling.this.wsConnectTimeout).connectAsynchronously(); } else { - retryConstraint = Signaling.this.wsConnectAttempt + "/" + Signaling.this.wsConnectAttemptsMax; + getLogger().info("Giving up."); + setState(SignalingState.ERROR); } - getLogger().info("Retrying to reconnect (" + retryConstraint + ")..."); - - // Retry WS connection - Signaling.this.setState(SignalingState.WS_CONNECTING); - Signaling.this.ws.recreate(Signaling.this.wsConnectTimeout).connectAsynchronously(); - } else { - getLogger().info("Giving up."); - setState(SignalingState.ERROR); } } @Override @SuppressWarnings("UnqualifiedMethodAccess") - public synchronized void onTextMessage(WebSocket websocket, String text) { - getLogger().debug("New string message: " + text); - getLogger().error("Protocol error: Received string message, but only binary messages are valid."); - Signaling.this.resetConnection(CloseCode.PROTOCOL_ERROR); + public void onTextMessage(WebSocket websocket, String text) { + synchronized (Signaling.this) { + getLogger().debug("New string message: " + text); + getLogger().error("Protocol error: Received string message, but only binary messages are valid."); + Signaling.this.resetConnection(CloseCode.PROTOCOL_ERROR); + } } @Override @SuppressWarnings("UnqualifiedMethodAccess") - public synchronized void onBinaryMessage(WebSocket websocket, byte[] binary) { - getLogger().debug("New binary message (" + binary.length + " bytes)"); - switch (Signaling.this.getState()) { - case WS_CONNECTING: - getLogger().info("WebSocket connection open"); - Signaling.this.setState(SignalingState.SERVER_HANDSHAKE); - break; - case CLOSED: - getLogger().debug("Ignoring message in state " + Signaling.this.getState()); - return; - } - - SignalingChannelNonce nonce = null; - try { - // Parse buffer - final Box box = new Box(ByteBuffer.wrap(binary), SignalingChannelNonce.TOTAL_LENGTH); - - // Parse and validate nonce - nonce = new SignalingChannelNonce(ByteBuffer.wrap(box.getNonce())); - if (Signaling.this.getPeerWithId(nonce.getSource()) == null) { - // Note: This can happen when a responder has been dropped - // but a message was still in flight. - getLogger().debug("Ignoring message from unknown id: " + nonce.getSource()); - return; + public void onBinaryMessage(WebSocket websocket, byte[] binary) { + synchronized (Signaling.this) { + getLogger().debug("New binary message (" + binary.length + " bytes)"); + switch (Signaling.this.getState()) { + case WS_CONNECTING: + getLogger().info("WebSocket connection open"); + Signaling.this.setState(SignalingState.SERVER_HANDSHAKE); + break; + case CLOSED: + getLogger().debug("Ignoring message in state " + Signaling.this.getState()); + return; } - validateNonce(nonce); - // Check peer handover state - if (nonce.getSource() != SALTYRTC_ADDR_SERVER && Signaling.this.handoverState.getPeer()) { - getLogger().error("Protocol error: Received WebSocket message from peer " + - "even though it has already handed over to task."); - Signaling.this.resetConnection(CloseCode.PROTOCOL_ERROR); - return; - } + SignalingChannelNonce nonce = null; + try { + // Parse buffer + final Box box = new Box(ByteBuffer.wrap(binary), SignalingChannelNonce.TOTAL_LENGTH); + + // Parse and validate nonce + nonce = new SignalingChannelNonce(ByteBuffer.wrap(box.getNonce())); + if (Signaling.this.getPeerWithId(nonce.getSource()) == null) { + // Note: This can happen when a responder has been dropped + // but a message was still in flight. + getLogger().debug("Ignoring message from unknown id: " + nonce.getSource()); + return; + } + validateNonce(nonce); + + // Check peer handover state + if (nonce.getSource() != SALTYRTC_ADDR_SERVER && Signaling.this.handoverState.getPeer()) { + getLogger().error("Protocol error: Received WebSocket message from peer " + + "even though it has already handed over to task."); + Signaling.this.resetConnection(CloseCode.PROTOCOL_ERROR); + return; + } - // Dispatch message - switch (Signaling.this.getState()) { - case SERVER_HANDSHAKE: - Signaling.this.onServerHandshakeMessage(box, nonce); - break; - case PEER_HANDSHAKE: - Signaling.this.onPeerHandshakeMessage(box, nonce); - break; - case TASK: - Signaling.this.onSignalingMessage(box, nonce); - break; - default: - getLogger().warn("Received message in " + Signaling.this.getState().name() + + // Dispatch message + switch (Signaling.this.getState()) { + case SERVER_HANDSHAKE: + Signaling.this.onServerHandshakeMessage(box, nonce); + break; + case PEER_HANDSHAKE: + Signaling.this.onPeerHandshakeMessage(box, nonce); + break; + case TASK: + Signaling.this.onSignalingMessage(box, nonce); + break; + default: + getLogger().warn("Received message in " + Signaling.this.getState().name() + " signaling state. Ignoring."); - } - // TODO: The following errors could also be handled using `handleCallbackError` on the websocket. - } catch (ValidationError e) { - if (e.critical) { + } + // TODO: The following errors could also be handled using `handleCallbackError` on the websocket. + } catch (ValidationError e) { + if (e.critical) { + getLogger().error("Protocol error: Invalid incoming message: " + e.getMessage()); + e.printStackTrace(); + Signaling.this.resetConnection(CloseCode.PROTOCOL_ERROR); + } else { + getLogger().warn("Dropping invalid message: " + e.getMessage()); + e.printStackTrace(); + } + } catch (SerializationError e) { getLogger().error("Protocol error: Invalid incoming message: " + e.getMessage()); e.printStackTrace(); Signaling.this.resetConnection(CloseCode.PROTOCOL_ERROR); - } else { - getLogger().warn("Dropping invalid message: " + e.getMessage()); + } catch (InternalException e) { + getLogger().error("Internal server error: " + e.getMessage()); e.printStackTrace(); - } - } catch (SerializationError e) { - getLogger().error("Protocol error: Invalid incoming message: " + e.getMessage()); - e.printStackTrace(); - Signaling.this.resetConnection(CloseCode.PROTOCOL_ERROR); - } catch (InternalException e) { - getLogger().error("Internal server error: " + e.getMessage()); - e.printStackTrace(); - Signaling.this.resetConnection(CloseCode.INTERNAL_ERROR); - } catch (ConnectionException e) { - getLogger().error("Connection error: " + e.getMessage()); - e.printStackTrace(); - Signaling.this.resetConnection(CloseCode.INTERNAL_ERROR); - } catch (SignalingException e) { - getLogger().error("Signaling error: " + CloseCode.explain(e.getCloseCode())); - e.printStackTrace(); - switch (Signaling.this.getState()) { - case NEW: - case WS_CONNECTING: - case SERVER_HANDSHAKE: - // Close connection - Signaling.this.resetConnection(e.getCloseCode()); - break; - case PEER_HANDSHAKE: - // Handle error depending on role - Signaling.this.handlePeerHandshakeSignalingError(e, nonce.getSource()); - break; - case TASK: - // Close websocket connection - Signaling.this.sendClose(e.getCloseCode()); - Signaling.this.resetConnection(CloseCode.CLOSING_NORMAL); - break; - case CLOSING: - case CLOSED: - // Ignore - break; + Signaling.this.resetConnection(CloseCode.INTERNAL_ERROR); + } catch (ConnectionException e) { + getLogger().error("Connection error: " + e.getMessage()); + e.printStackTrace(); + Signaling.this.resetConnection(CloseCode.INTERNAL_ERROR); + } catch (SignalingException e) { + getLogger().error("Signaling error: " + CloseCode.explain(e.getCloseCode())); + e.printStackTrace(); + switch (Signaling.this.getState()) { + case NEW: + case WS_CONNECTING: + case SERVER_HANDSHAKE: + // Close connection + Signaling.this.resetConnection(e.getCloseCode()); + break; + case PEER_HANDSHAKE: + // Handle error depending on role + Signaling.this.handlePeerHandshakeSignalingError(e, nonce.getSource()); + break; + case TASK: + // Close websocket connection + Signaling.this.sendClose(e.getCloseCode()); + Signaling.this.resetConnection(CloseCode.CLOSING_NORMAL); + break; + case CLOSING: + case CLOSED: + // Ignore + break; + } } } } @Override @SuppressWarnings("UnqualifiedMethodAccess") - public synchronized void onDisconnected(WebSocket websocket, + public void onDisconnected(WebSocket websocket, @Nullable WebSocketFrame serverCloseFrame, @Nullable WebSocketFrame clientCloseFrame, boolean closedByServer) { - // Log details to debug log - final String closer = closedByServer ? "server" : "client"; - final WebSocketFrame frame = closedByServer ? serverCloseFrame : clientCloseFrame; - final int closeCode = frame == null ? 0 : frame.getCloseCode(); - String closeReason = frame == null ? null : frame.getCloseReason(); - if (closeReason == null) { - closeReason = CloseCode.explain(closeCode); - } - getLogger().debug("WebSocket connection closed by " + closer + - " with code " + closeCode + ": " + closeReason); - - // Log some of the codes on higher log levels too - if (closedByServer) { - switch (closeCode) { - case 0: - getLogger().warn("WebSocket closed (no close frame provided)"); - break; - case CloseCode.CLOSING_NORMAL: - getLogger().info("WebSocket closed normally"); - break; - case CloseCode.TIMEOUT: - getLogger().info("WebSocket closed due to timeout"); - break; - case CloseCode.GOING_AWAY: - getLogger().warn("WebSocket closed, server is being shut down"); - break; - case CloseCode.NO_SHARED_SUBPROTOCOL: - getLogger().warn("WebSocket closed: No shared sub-protocol could be found"); - break; - case CloseCode.PATH_FULL: - getLogger().warn("WebSocket closed: Path full (no free responder byte)"); - break; - case CloseCode.PROTOCOL_ERROR: - getLogger().error("WebSocket closed: Protocol error"); - break; - case CloseCode.INTERNAL_ERROR: - getLogger().error("WebSocket closed: Internal server error"); - break; - case CloseCode.DROPPED_BY_INITIATOR: - getLogger().info("WebSocket closed: Dropped by initiator"); - break; - case CloseCode.INITIATOR_COULD_NOT_DECRYPT: - getLogger().warn("WebSocket closed: Initiator could not decrypt message"); - break; - case CloseCode.NO_SHARED_TASK: - getLogger().warn("WebSocket closed: No shared task was found"); - break; - case CloseCode.INVALID_KEY: - getLogger().error("WebSocket closed: An invalid public permanent server key was specified"); - break; + synchronized (Signaling.this) { + // Log details to debug log + final String closer = closedByServer ? "server" : "client"; + final WebSocketFrame frame = closedByServer ? serverCloseFrame : clientCloseFrame; + final int closeCode = frame == null ? 0 : frame.getCloseCode(); + String closeReason = frame == null ? null : frame.getCloseReason(); + if (closeReason == null) { + closeReason = CloseCode.explain(closeCode); + } + getLogger().debug("WebSocket connection closed by " + closer + + " with code " + closeCode + ": " + closeReason); + + // Log some of the codes on higher log levels too + if (closedByServer) { + switch (closeCode) { + case 0: + getLogger().warn("WebSocket closed (no close frame provided)"); + break; + case CloseCode.CLOSING_NORMAL: + getLogger().info("WebSocket closed normally"); + break; + case CloseCode.TIMEOUT: + getLogger().info("WebSocket closed due to timeout"); + break; + case CloseCode.GOING_AWAY: + getLogger().warn("WebSocket closed, server is being shut down"); + break; + case CloseCode.NO_SHARED_SUBPROTOCOL: + getLogger().warn("WebSocket closed: No shared sub-protocol could be found"); + break; + case CloseCode.PATH_FULL: + getLogger().warn("WebSocket closed: Path full (no free responder byte)"); + break; + case CloseCode.PROTOCOL_ERROR: + getLogger().error("WebSocket closed: Protocol error"); + break; + case CloseCode.INTERNAL_ERROR: + getLogger().error("WebSocket closed: Internal server error"); + break; + case CloseCode.DROPPED_BY_INITIATOR: + getLogger().info("WebSocket closed: Dropped by initiator"); + break; + case CloseCode.INITIATOR_COULD_NOT_DECRYPT: + getLogger().warn("WebSocket closed: Initiator could not decrypt message"); + break; + case CloseCode.NO_SHARED_TASK: + getLogger().warn("WebSocket closed: No shared task was found"); + break; + case CloseCode.INVALID_KEY: + getLogger().error("WebSocket closed: An invalid public permanent server key was specified"); + break; + } + } + // Note: Don't check for signaling state here, it will already have been resetted. + if (closeCode != CloseCode.HANDOVER) { + Signaling.this.salty.events.close.notifyHandlers(new CloseEvent(closeCode)); + setState(SignalingState.CLOSED); } - } - // Note: Don't check for signaling state here, it will already have been resetted. - if (closeCode != CloseCode.HANDOVER) { - Signaling.this.salty.events.close.notifyHandlers(new CloseEvent(closeCode)); - setState(SignalingState.CLOSED); } } @@ -515,15 +517,19 @@ public synchronized void onDisconnected(WebSocket websocket, @Override @SuppressWarnings("UnqualifiedMethodAccess") public void onError(WebSocket websocket, WebSocketException cause) { - getLogger().warn("A WebSocket error occured: " + cause.getMessage(), cause); + synchronized (Signaling.this) { + getLogger().warn("A WebSocket error occured: " + cause.getMessage(), cause); + } } @Override @SuppressWarnings("UnqualifiedMethodAccess") - public synchronized void handleCallbackError(WebSocket websocket, Throwable cause) { - getLogger().error("WebSocket callback error: " + cause); - cause.printStackTrace(); - Signaling.this.resetConnection(CloseCode.INTERNAL_ERROR); + public void handleCallbackError(WebSocket websocket, Throwable cause) { + synchronized (Signaling.this) { + getLogger().error("WebSocket callback error: " + cause); + cause.printStackTrace(); + Signaling.this.resetConnection(CloseCode.INTERNAL_ERROR); + } } };