Skip to content

Commit dadd1f1

Browse files
sirkrypt01c3t3a
authored andcommitted
feat(socketio): notify client of transport completion
When the async client connects to the server, it spawns a new thread to handle the arriving messages asynchronously, which is immediately detached with no chance of awaiting its completion. For long-running programs (e.g. a client program that never really disconnects from the server) this can be problematic, as unexpected stream completions would go unnoticed. This may happen if the underlying tungstenite websocket shuts down ('Received close frame: None') but there are no engineio/socketio close frames. Hence, since the stream terminates, the message handling task stops without a Close or Error event being fired. Thus, we now fire an additional Event::Close when the stream terminates, to signal the (potentially unexpected) close to the user.
1 parent d5f40d9 commit dadd1f1

File tree

2 files changed

+66
-2
lines changed

2 files changed

+66
-2
lines changed

ci/socket-io.js

+8
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@ var callback = client => {
3636
ack(Buffer.from([1, 2, 3]));
3737
}
3838
});
39+
40+
// This event allows the test framework to arbitrarily close the underlying connection
41+
client.on('close_transport', data => {
42+
console.log(['close_transport', 'Request to close transport received'])
43+
// Close underlying websocket connection
44+
client.client.conn.close();
45+
})
46+
3947
client.emit('Hello from the message event!');
4048
client.emit('test', 'Hello from the test event!');
4149
client.emit(Buffer.from([4, 5, 6]));

socketio/src/asynchronous/client/client.rs

+58-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::{ops::DerefMut, pin::Pin, sync::Arc};
22

33
use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
44
use futures_util::{future::BoxFuture, stream, Stream, StreamExt};
5-
use log::trace;
5+
use log::{error, trace};
66
use rand::{thread_rng, Rng};
77
use serde_json::Value;
88
use tokio::{
@@ -162,7 +162,21 @@ impl Client {
162162
drop(stream);
163163

164164
let should_reconnect = match *(client_clone.disconnect_reason.read().await) {
165-
DisconnectReason::Unknown => reconnect,
165+
DisconnectReason::Unknown => {
166+
// If we disconnected for an unknown reason, the client might not have noticed
167+
// the closure yet. Hence, fire a transport close event to notify it.
168+
// We don't need to do that in the other cases, since proper server close
169+
// and manual client close are handled explicitly.
170+
if let Some(err) = client_clone
171+
.callback(&Event::Close, "transport close")
172+
.await
173+
.err()
174+
{
175+
error!("Error while notifying client of transport close: {err}")
176+
}
177+
178+
reconnect
179+
}
166180
DisconnectReason::Manual => false,
167181
DisconnectReason::Server => reconnect_on_disconnect,
168182
};
@@ -590,6 +604,7 @@ mod test {
590604
},
591605
error::Result,
592606
packet::{Packet, PacketId},
607+
Event,
593608
Payload, TransportType,
594609
};
595610

@@ -926,6 +941,47 @@ mod test {
926941
Ok(())
927942
}
928943

944+
#[tokio::test]
945+
async fn socket_io_transport_close() -> Result<()> {
946+
let url = crate::test::socket_io_server();
947+
948+
let (tx, mut rx) = mpsc::channel(1);
949+
950+
let notify = Arc::new(tokio::sync::Notify::new());
951+
let notify_clone = notify.clone();
952+
953+
let socket = ClientBuilder::new(url)
954+
.on(Event::Connect, move |_, _| {
955+
let cl = notify_clone.clone();
956+
async move {
957+
cl.notify_one();
958+
}
959+
.boxed()
960+
})
961+
.on(Event::Close, move |payload, _| {
962+
let clone_tx = tx.clone();
963+
async move { clone_tx.send(payload).await.unwrap() }.boxed()
964+
})
965+
.connect()
966+
.await?;
967+
968+
// Wait until socket is connected
969+
let connect_timeout = timeout(Duration::from_secs(1), notify.notified()).await;
970+
assert!(connect_timeout.is_ok());
971+
972+
// Instruct server to close transport
973+
let result = socket.emit("close_transport", Payload::from("")).await;
974+
assert!(result.is_ok());
975+
976+
// Wait for Event::Close
977+
let rx_timeout = timeout(Duration::from_secs(1), rx.recv()).await;
978+
assert!(rx_timeout.is_ok());
979+
980+
assert_eq!(rx_timeout.unwrap(), Some(Payload::from("transport close")));
981+
982+
Ok(())
983+
}
984+
929985
#[tokio::test]
930986
async fn socketio_polling_integration() -> Result<()> {
931987
let url = crate::test::socket_io_server();

0 commit comments

Comments
 (0)