diff --git a/lib/duplex_connection.dart b/lib/duplex_connection.dart index 3579548..b3934e3 100644 --- a/lib/duplex_connection.dart +++ b/lib/duplex_connection.dart @@ -23,6 +23,7 @@ abstract class DuplexConnection implements Closeable, Availability { typedef TcpChunkHandler = void Function(Uint8List chunk); typedef CloseHandler = void Function(); +typedef SocketClosedCallback = void Function(); class TcpDuplexConnection extends DuplexConnection { Socket socket; @@ -60,8 +61,9 @@ class TcpDuplexConnection extends DuplexConnection { class WebSocketDuplexConnection extends DuplexConnection { WebSocketChannel webSocket; bool closed = true; + SocketClosedCallback? socketClosedCallback; - WebSocketDuplexConnection(this.webSocket); + WebSocketDuplexConnection(this.webSocket, {this.socketClosedCallback}); @override void init() { @@ -84,6 +86,7 @@ class WebSocketDuplexConnection extends DuplexConnection { closed = true; _availability = 0.0; closeHandler?.call(); + socketClosedCallback?.call(); } } @@ -94,7 +97,7 @@ class WebSocketDuplexConnection extends DuplexConnection { } } -Future connectRSocket(String url, TcpChunkHandler handler) { +Future connectRSocket(String url, TcpChunkHandler handler,SocketClosedCallback? socketClosedCallback) { var uri = Uri.parse(url); var scheme = uri.scheme; if (scheme == 'tcp') { @@ -104,7 +107,7 @@ Future connectRSocket(String url, TcpChunkHandler handler) { final websocket = WebSocketChannel.connect( Uri.parse(url), ); - return Future.value(WebSocketDuplexConnection(websocket)); + return Future.value(WebSocketDuplexConnection(websocket, socketClosedCallback: socketClosedCallback)); } else { return Future.error('${scheme} unsupported'); } diff --git a/lib/rsocket_connector.dart b/lib/rsocket_connector.dart index 8a70952..1512a25 100644 --- a/lib/rsocket_connector.dart +++ b/lib/rsocket_connector.dart @@ -14,6 +14,7 @@ class RSocketConnector { String _metadataMimeType = 'message/x.rsocket.composite-metadata.v0'; ErrorConsumer? _errorConsumer; SocketAcceptor? _acceptor; + SocketClosedCallback? _socketClosedCallback; RSocketConnector.create(); @@ -22,6 +23,11 @@ class RSocketConnector { return this; } + RSocketConnector setupClosedCallback(SocketClosedCallback socketClosedCallback) { + _socketClosedCallback = socketClosedCallback; + return this; + } + RSocketConnector setupPayload(Payload payload) { this.payload = payload; return this; @@ -53,7 +59,7 @@ class RSocketConnector { ..dataMimeType = _dataMimeType ..data = payload?.data ..metadata = payload?.metadata; - return connectRSocket(url, handler).then((conn) { + return connectRSocket(url, handler, _socketClosedCallback).then((conn) { var rsocketRequester = RSocketRequester('requester', connectionSetupPayload, conn); if (_acceptor != null) {