Skip to content

Commit 8e7d276

Browse files
committed
Support reject setup.
1 parent 58da718 commit 8e7d276

File tree

7 files changed

+52
-20
lines changed

7 files changed

+52
-20
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
3333
.transport(&addr)
3434
.acceptor(|setup, _socket| {
3535
info!("accept setup: {:?}", setup);
36-
Box::new(EchoRSocket)
36+
Ok(Box::new(EchoRSocket))
37+
// Or you can reject setup
38+
// Err(From::from("SETUP_NOT_ALLOW"))
3739
})
3840
.serve()
3941
.await

examples/echo/main.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
1616
.transport(&addr)
1717
.acceptor(|setup, _socket| {
1818
info!("accept setup: {:?}", setup);
19-
Box::new(EchoRSocket)
19+
Ok(Box::new(EchoRSocket))
20+
// Or you can reject setup
21+
// Err(From::from("SETUP_NOT_ALLOW"))
2022
})
2123
.on_start(|| info!("+++++++ echo server started! +++++++"))
2224
.serve()

examples/proxy/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
1616
RSocketFactory::receive()
1717
.acceptor(|setup, _sending_socket| {
1818
info!("incoming socket: setup={:?}", setup);
19-
Box::new(block_on(async move {
19+
Ok(Box::new(block_on(async move {
2020
RSocketFactory::connect()
2121
.acceptor(|| Box::new(EchoRSocket))
2222
.setup(Payload::from("I'm Rust!"))
2323
.transport("tcp://127.0.0.1:7878")
2424
.start()
2525
.await
2626
.unwrap()
27-
}))
27+
})))
2828
})
2929
.transport("tcp://127.0.0.1:7979")
3030
.serve()

src/spi.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl RSocket for EchoRSocket {
5757
}
5858
}
5959

60-
pub struct EmptyRSocket;
60+
pub(crate) struct EmptyRSocket;
6161

6262
impl EmptyRSocket {
6363
fn must_failed(&self) -> RSocketError {

src/transport/socket.rs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::misc::{self, Counter, StreamID};
22
use super::spi::*;
3-
use crate::errors::{ErrorKind, RSocketError, ERR_APPLICATION};
3+
use crate::errors::{self, ErrorKind, RSocketError};
44
use crate::frame::{self, Body, Frame};
55
use crate::payload::{Payload, SetupPayload};
66
use crate::result::RSocketResult;
@@ -103,7 +103,17 @@ impl DuplexSocket {
103103
let flag = msg.get_flag();
104104
misc::debug_frame(false, &msg);
105105
match msg.get_body() {
106-
Body::Setup(v) => self.on_setup(&acceptor, sid, flag, SetupPayload::from(v)),
106+
Body::Setup(v) => {
107+
if let Err(e) = self.on_setup(&acceptor, sid, flag, SetupPayload::from(v)) {
108+
let errmsg = format!("{}", e);
109+
let sending = frame::Error::builder(0, 0)
110+
.set_code(errors::ERR_REJECT_SETUP)
111+
.set_data(Bytes::from(errmsg))
112+
.build();
113+
self.tx.send(sending).unwrap();
114+
return;
115+
}
116+
}
107117
Body::Resume(v) => {
108118
// TODO: support resume
109119
}
@@ -225,16 +235,28 @@ impl DuplexSocket {
225235
}
226236

227237
#[inline]
228-
fn on_setup(&self, acceptor: &Acceptor, sid: u32, flag: u16, setup: SetupPayload) {
238+
fn on_setup(
239+
&self,
240+
acceptor: &Acceptor,
241+
sid: u32,
242+
flag: u16,
243+
setup: SetupPayload,
244+
) -> Result<(), Box<dyn Error>> {
229245
match acceptor {
230246
Acceptor::Simple(gen) => {
231247
self.responder.set(gen());
248+
Ok(())
232249
}
233-
Acceptor::Generate(gen) => {
234-
self.responder.set(gen(setup, Box::new(self.clone())));
235-
}
250+
Acceptor::Generate(gen) => match gen(setup, Box::new(self.clone())) {
251+
Ok(it) => {
252+
self.responder.set(it);
253+
Ok(())
254+
}
255+
Err(e) => Err(e),
256+
},
236257
Acceptor::Empty() => {
237258
self.responder.set(Box::new(EmptyRSocket));
259+
Ok(())
238260
}
239261
}
240262
}
@@ -277,7 +299,7 @@ impl DuplexSocket {
277299
bu.build()
278300
}
279301
Err(e) => frame::Error::builder(sid, 0)
280-
.set_code(ERR_APPLICATION)
302+
.set_code(errors::ERR_APPLICATION)
281303
.set_data(Bytes::from("TODO: should be error details"))
282304
.build(),
283305
};

src/transport/spi.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::frame::Frame;
22
use crate::payload::SetupPayload;
33
use crate::spi::RSocket;
4+
use std::error::Error;
45
use std::future::Future;
6+
use std::result::Result;
57
use std::sync::Arc;
68
use tokio::sync::{mpsc, oneshot};
79

@@ -33,7 +35,8 @@ impl Transport {
3335
}
3436
}
3537

36-
type FnAcceptorWithSetup = fn(SetupPayload, Box<dyn RSocket>) -> Box<dyn RSocket>;
38+
pub type FnAcceptorWithSetup =
39+
fn(SetupPayload, Box<dyn RSocket>) -> Result<Box<dyn RSocket>, Box<dyn Error>>;
3740

3841
pub(crate) enum Acceptor {
3942
Simple(Arc<fn() -> Box<dyn RSocket>>),

src/x/server.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,19 @@ use crate::errors::RSocketError;
33
use crate::frame::{self, Frame};
44
use crate::payload::SetupPayload;
55
use crate::spi::{EmptyRSocket, RSocket};
6-
use crate::transport::{Acceptor, DuplexSocket};
6+
use crate::transport::{Acceptor, DuplexSocket, FnAcceptorWithSetup};
77
use std::error::Error;
88
use std::net::SocketAddr;
9+
use std::result::Result;
910
use std::sync::Arc;
1011
use tokio::net::TcpListener;
1112
use tokio::sync::mpsc;
1213

13-
type FnSetup = fn(SetupPayload, Box<dyn RSocket>) -> Box<dyn RSocket>;
1414
type FnStart = fn();
1515

1616
pub struct ServerBuilder {
1717
uri: Option<String>,
18-
on_setup: FnSetup,
18+
on_setup: FnAcceptorWithSetup,
1919
start_handler: Option<FnStart>,
2020
}
2121

@@ -34,7 +34,7 @@ impl ServerBuilder {
3434
}
3535
}
3636

37-
pub fn acceptor(mut self, handler: FnSetup) -> Self {
37+
pub fn acceptor(mut self, handler: FnAcceptorWithSetup) -> Self {
3838
self.on_setup = handler;
3939
self
4040
}
@@ -64,7 +64,7 @@ impl ServerBuilder {
6464
#[inline]
6565
async fn serve_tcp(
6666
addr: SocketAddr,
67-
on_setup: FnSetup,
67+
on_setup: FnAcceptorWithSetup,
6868
on_start: Option<FnStart>,
6969
) -> Result<(), Box<dyn Error>> {
7070
let mut listener = TcpListener::bind(&addr).await.unwrap();
@@ -91,6 +91,9 @@ impl ServerBuilder {
9191
}
9292

9393
#[inline]
94-
fn on_setup_noop(_setup: SetupPayload, _socket: Box<dyn RSocket>) -> Box<dyn RSocket> {
95-
Box::new(EmptyRSocket)
94+
fn on_setup_noop(
95+
_setup: SetupPayload,
96+
_socket: Box<dyn RSocket>,
97+
) -> Result<Box<dyn RSocket>, Box<dyn Error>> {
98+
Ok(Box::new(EmptyRSocket))
9699
}

0 commit comments

Comments
 (0)