Skip to content

Commit d29086b

Browse files
authored
Implement socketio onAny (1c3t3a#169)
* test: auth use iter instead of emit_with_ack * feat: implement socketio onAny * fix: only call on_any on Message / Custom event * docs: add on_any documentation * test: rename on_any integration * test: add mpsc to on_any integration * docs: add sleep doc to on_any integration
1 parent 80a3720 commit d29086b

File tree

4 files changed

+162
-42
lines changed

4 files changed

+162
-42
lines changed

ci/socket-io-auth.js

+3-8
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,9 @@ const io = require('socket.io')(server);
33

44
console.log('Started');
55
var callback = client => {
6-
console.log('Connected!');;
7-
client.on('test', function (arg, ack) {
8-
console.log('Ack received')
9-
if (ack) {
10-
const payload = client.handshake.auth.password === '123' ? '456' : '789'
11-
ack(payload);
12-
}
13-
});
6+
console.log('Connected!');
7+
8+
client.emit('auth', client.handshake.auth.password === '123' ? 'success' : 'failed')
149
};
1510
io.on('connection', callback);
1611
io.of('/admin').on('connection', callback);

socketio/src/client/builder.rs

+37-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use rust_engineio::client::ClientBuilder as EngineIoClientBuilder;
66
use rust_engineio::header::{HeaderMap, HeaderValue};
77
use url::Url;
88

9+
use crate::client::callback::{SocketAnyCallback, SocketCallback};
910
use crate::error::{Error, Result};
1011
use std::collections::HashMap;
1112
use std::thread;
@@ -31,7 +32,8 @@ pub enum TransportType {
3132
/// acts the `build` method and returns a connected [`Client`].
3233
pub struct ClientBuilder {
3334
address: String,
34-
on: HashMap<Event, Callback>,
35+
on: HashMap<Event, Callback<SocketCallback>>,
36+
on_any: Option<Callback<SocketAnyCallback>>,
3537
namespace: String,
3638
tls_config: Option<TlsConnector>,
3739
opening_headers: Option<HeaderMap>,
@@ -74,6 +76,7 @@ impl ClientBuilder {
7476
Self {
7577
address: address.into(),
7678
on: HashMap::new(),
79+
on_any: None,
7780
namespace: "/".to_owned(),
7881
tls_config: None,
7982
opening_headers: None,
@@ -117,7 +120,32 @@ impl ClientBuilder {
117120
where
118121
F: for<'a> FnMut(Payload, Client) + 'static + Sync + Send,
119122
{
120-
self.on.insert(event.into(), Callback::new(callback));
123+
self.on
124+
.insert(event.into(), Callback::<SocketCallback>::new(callback));
125+
self
126+
}
127+
128+
/// Registers a Callback for all [`crate::event::Event::Custom`] and [`crate::event::Event::Message`].
129+
///
130+
/// # Example
131+
/// ```rust
132+
/// use rust_socketio::{ClientBuilder, Payload};
133+
///
134+
/// let client = ClientBuilder::new("http://localhost:4200/")
135+
/// .namespace("/admin")
136+
/// .on_any(|event, payload, _client| {
137+
/// if let Payload::String(str) = payload {
138+
/// println!("{} {}", String::from(event), str);
139+
/// }
140+
/// })
141+
/// .connect();
142+
///
143+
/// ```
144+
pub fn on_any<F>(mut self, callback: F) -> Self
145+
where
146+
F: for<'a> FnMut(Event, Payload, Client) + 'static + Sync + Send,
147+
{
148+
self.on_any = Some(Callback::<SocketAnyCallback>::new(callback));
121149
self
122150
}
123151

@@ -283,7 +311,13 @@ impl ClientBuilder {
283311

284312
let inner_socket = InnerSocket::new(engine_client)?;
285313

286-
let socket = Client::new(inner_socket, &self.namespace, self.on, self.auth)?;
314+
let socket = Client::new(
315+
inner_socket,
316+
&self.namespace,
317+
self.on,
318+
self.on_any,
319+
self.auth,
320+
)?;
287321
socket.connect()?;
288322

289323
Ok(socket)

socketio/src/client/callback.rs

+45-8
Original file line numberDiff line numberDiff line change
@@ -4,35 +4,39 @@ use std::{
44
};
55

66
use super::Client;
7-
use crate::Payload;
7+
use crate::{Event, Payload};
88

9-
type InnerCallback = Box<dyn for<'a> FnMut(Payload, Client) + 'static + Sync + Send>;
9+
pub(crate) type SocketCallback = Box<dyn for<'a> FnMut(Payload, Client) + 'static + Sync + Send>;
10+
pub(crate) type SocketAnyCallback =
11+
Box<dyn for<'a> FnMut(Event, Payload, Client) + 'static + Sync + Send>;
1012

11-
pub(crate) struct Callback {
12-
inner: InnerCallback,
13+
pub(crate) struct Callback<T> {
14+
inner: T,
1315
}
1416

15-
impl Debug for Callback {
17+
// SocketCallback implementations
18+
19+
impl Debug for Callback<SocketCallback> {
1620
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1721
f.write_str("Callback")
1822
}
1923
}
2024

21-
impl Deref for Callback {
25+
impl Deref for Callback<SocketCallback> {
2226
type Target = dyn for<'a> FnMut(Payload, Client) + 'static + Sync + Send;
2327

2428
fn deref(&self) -> &Self::Target {
2529
self.inner.as_ref()
2630
}
2731
}
2832

29-
impl DerefMut for Callback {
33+
impl DerefMut for Callback<SocketCallback> {
3034
fn deref_mut(&mut self) -> &mut Self::Target {
3135
self.inner.as_mut()
3236
}
3337
}
3438

35-
impl Callback {
39+
impl Callback<SocketCallback> {
3640
pub(crate) fn new<T>(callback: T) -> Self
3741
where
3842
T: for<'a> FnMut(Payload, Client) + 'static + Sync + Send,
@@ -42,3 +46,36 @@ impl Callback {
4246
}
4347
}
4448
}
49+
50+
// SocketAnyCallback implementations
51+
52+
impl Debug for Callback<SocketAnyCallback> {
53+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54+
f.write_str("Callback")
55+
}
56+
}
57+
58+
impl Deref for Callback<SocketAnyCallback> {
59+
type Target = dyn for<'a> FnMut(Event, Payload, Client) + 'static + Sync + Send;
60+
61+
fn deref(&self) -> &Self::Target {
62+
self.inner.as_ref()
63+
}
64+
}
65+
66+
impl DerefMut for Callback<SocketAnyCallback> {
67+
fn deref_mut(&mut self) -> &mut Self::Target {
68+
self.inner.as_mut()
69+
}
70+
}
71+
72+
impl Callback<SocketAnyCallback> {
73+
pub(crate) fn new<T>(callback: T) -> Self
74+
where
75+
T: for<'a> FnMut(Event, Payload, Client) + 'static + Sync + Send,
76+
{
77+
Callback {
78+
inner: Box::new(callback),
79+
}
80+
}
81+
}

socketio/src/client/client.rs

+77-23
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use super::callback::Callback;
33
use crate::packet::{Packet, PacketId};
44
use rand::{thread_rng, Rng};
55

6+
use crate::client::callback::{SocketAnyCallback, SocketCallback};
67
use crate::error::Result;
78
use std::collections::HashMap;
89
use std::ops::DerefMut;
@@ -21,7 +22,7 @@ pub struct Ack {
2122
pub id: i32,
2223
timeout: Duration,
2324
time_started: Instant,
24-
callback: Callback,
25+
callback: Callback<SocketCallback>,
2526
}
2627

2728
/// A socket which handles communication with the server. It's initialized with
@@ -31,7 +32,8 @@ pub struct Ack {
3132
pub struct Client {
3233
/// The inner socket client to delegate the methods to.
3334
socket: InnerSocket,
34-
on: Arc<RwLock<HashMap<Event, Callback>>>,
35+
on: Arc<RwLock<HashMap<Event, Callback<SocketCallback>>>>,
36+
on_any: Arc<RwLock<Option<Callback<SocketAnyCallback>>>>,
3537
outstanding_acks: Arc<RwLock<Vec<Ack>>>,
3638
// namespace, for multiplexing messages
3739
nsp: String,
@@ -47,13 +49,15 @@ impl Client {
4749
pub(crate) fn new<T: Into<String>>(
4850
socket: InnerSocket,
4951
namespace: T,
50-
on: HashMap<Event, Callback>,
52+
on: HashMap<Event, Callback<SocketCallback>>,
53+
on_any: Option<Callback<SocketAnyCallback>>,
5154
auth: Option<serde_json::Value>,
5255
) -> Result<Self> {
5356
Ok(Client {
5457
socket,
5558
nsp: namespace.into(),
5659
on: Arc::new(RwLock::new(on)),
60+
on_any: Arc::new(RwLock::new(on_any)),
5761
outstanding_acks: Arc::new(RwLock::new(Vec::new())),
5862
auth,
5963
})
@@ -202,7 +206,7 @@ impl Client {
202206
id,
203207
time_started: Instant::now(),
204208
timeout,
205-
callback: Callback::new(callback),
209+
callback: Callback::<SocketCallback>::new(callback),
206210
};
207211

208212
// add the ack to the tuple of outstanding acks
@@ -238,11 +242,25 @@ impl Client {
238242

239243
fn callback<P: Into<Payload>>(&self, event: &Event, payload: P) -> Result<()> {
240244
let mut on = self.on.write()?;
245+
let mut on_any = self.on_any.write()?;
241246
let lock = on.deref_mut();
247+
let on_any_lock = on_any.deref_mut();
248+
249+
let payload = payload.into();
250+
242251
if let Some(callback) = lock.get_mut(event) {
243-
callback(payload.into(), self.clone());
252+
callback(payload.clone(), self.clone());
253+
}
254+
match event.clone() {
255+
Event::Message | Event::Custom(_) => {
256+
if let Some(callback) = on_any_lock {
257+
callback(event.clone(), payload, self.clone())
258+
}
259+
}
260+
_ => {}
244261
}
245262
drop(on);
263+
drop(on_any);
246264
Ok(())
247265
}
248266

@@ -530,33 +548,69 @@ mod test {
530548
test_socketio_socket(socket, "/admin".to_owned())
531549
}
532550

551+
#[test]
552+
fn socket_io_on_any_integration() -> Result<()> {
553+
let url = crate::test::socket_io_server();
554+
555+
let (tx, rx) = mpsc::sync_channel(1);
556+
557+
let _socket = ClientBuilder::new(url)
558+
.namespace("/")
559+
.auth(json!({ "password": "123" }))
560+
.on("auth", |payload, _client| {
561+
if let Payload::String(msg) = payload {
562+
println!("{}", msg);
563+
}
564+
})
565+
.on_any(move |event, payload, _client| {
566+
if let Payload::String(str) = payload {
567+
println!("{} {}", String::from(event.clone()), str);
568+
}
569+
tx.send(String::from(event)).unwrap();
570+
})
571+
.connect()?;
572+
573+
// Sleep to give server enough time to send 2 events
574+
sleep(Duration::from_secs(2));
575+
576+
let event = rx.recv().unwrap();
577+
assert_eq!(event, "message");
578+
let event = rx.recv().unwrap();
579+
assert_eq!(event, "test");
580+
581+
Ok(())
582+
}
583+
533584
#[test]
534585
fn socket_io_auth_builder_integration() -> Result<()> {
535586
let url = crate::test::socket_io_auth_server();
587+
let nsp = String::from("/admin");
536588
let socket = ClientBuilder::new(url)
537-
.namespace("/admin")
589+
.namespace(nsp.clone())
538590
.auth(json!({ "password": "123" }))
539-
.connect()?;
591+
.connect_manual()?;
540592

541-
let (tx, rx) = mpsc::sync_channel(0);
593+
let mut iter = socket
594+
.iter()
595+
.map(|packet| packet.unwrap())
596+
.filter(|packet| packet.packet_type != PacketId::Connect);
542597

543-
// Send emit with ack after 1s, so socketio server has enough time to register it's listeners
544-
sleep(Duration::from_secs(1));
598+
let packet: Option<Packet> = iter.next();
599+
assert!(packet.is_some());
545600

546-
assert!(socket
547-
.emit_with_ack(
548-
"test",
549-
json!({ "msg": "1" }),
550-
Duration::from_secs(1),
551-
move |payload, _| {
552-
println!("Got ack");
553-
tx.send(Payload::from(json!(["456"])) == payload).unwrap();
554-
}
555-
)
556-
.is_ok());
601+
let packet = packet.unwrap();
557602

558-
let received = rx.recv();
559-
assert!(received.is_ok() && received.unwrap());
603+
assert_eq!(
604+
packet,
605+
Packet::new(
606+
PacketId::Event,
607+
nsp.clone(),
608+
Some("[\"auth\",\"success\"]".to_owned()),
609+
None,
610+
0,
611+
None
612+
)
613+
);
560614

561615
Ok(())
562616
}

0 commit comments

Comments
 (0)