Skip to content

Commit 3b2f10a

Browse files
authored
Merge pull request #21 from rust-amplify/waker
Refactor poller to provide custom wakers
2 parents fbf573b + a7f48fd commit 3b2f10a

File tree

5 files changed

+99
-81
lines changed

5 files changed

+99
-81
lines changed

Cargo.lock

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ name = "reactor"
2121
[dependencies]
2222
amplify = { version = "4.0.0", features = ["hex"] }
2323
crossbeam-channel = "0.5.8"
24-
popol = { version = "2.2.0", optional = true }
24+
popol = { version = "3.0.0", optional = true }
2525
polling = { version = "2.7.0", optional = true }
2626
# epoll = { version = "4.3.1", optional = true } - NB: epoll not supported on MacOS
2727
mio = { version = "0.8.6", optional = true }

src/poller/mod.rs

+26
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ where
150150
Self: Send + Iterator<Item = (RawFd, Result<IoType, IoFail>)>,
151151
for<'a> &'a mut Self: Iterator<Item = (RawFd, Result<IoType, IoFail>)>,
152152
{
153+
/// Waker type used by the poll provider.
154+
type Waker: Waker;
155+
153156
/// Registers a file-descriptor based resource for a poll.
154157
fn register(&mut self, fd: &impl AsRawFd, interest: IoType);
155158
/// Unregisters a file-descriptor based resource from a poll.
@@ -165,3 +168,26 @@ where
165168
/// Number of generated events.
166169
fn poll(&mut self, timeout: Option<Duration>) -> io::Result<usize>;
167170
}
171+
172+
/// Waker object provided by the poller.
173+
pub trait Waker {
174+
/// Data type for sending wake signals to the poller.
175+
type Send: WakerSend;
176+
/// Data type for receiving wake signals inside the poller.
177+
type Recv: WakerRecv;
178+
179+
/// Constructs pair of waker receiver and sender objects.
180+
fn pair() -> Result<(Self::Send, Self::Recv), io::Error>;
181+
}
182+
183+
/// Sending part of the waker.
184+
pub trait WakerSend: Send + Sync + Clone {
185+
/// Awakes the poller to read events.
186+
fn wake(&self) -> io::Result<()>;
187+
}
188+
189+
/// Receiver part of the waker.
190+
pub trait WakerRecv: AsRawFd + Send + io::Read {
191+
/// Resets the waker reader.
192+
fn reset(&self);
193+
}

src/poller/popol.rs

+46-2
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@
2424
//! Poll engine provided by the [`popol`] crate.
2525
2626
use std::collections::VecDeque;
27-
use std::io;
27+
use std::io::{self, Error};
2828
use std::os::unix::io::{AsRawFd, RawFd};
29+
use std::sync::Arc;
2930
use std::time::Duration;
3031

31-
use crate::poller::{IoFail, IoType, Poll};
32+
use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend};
3233

3334
/// Manager for a set of reactor which are polled for an event loop by the
3435
/// re-actor by using [`popol`] library.
@@ -61,6 +62,8 @@ impl Poller {
6162
}
6263

6364
impl Poll for Poller {
65+
type Waker = PopolWaker;
66+
6467
fn register(&mut self, fd: &impl AsRawFd, interest: IoType) {
6568
#[cfg(feature = "log")]
6669
log::trace!(target: "popol", "Registering {}", fd.as_raw_fd());
@@ -156,3 +159,44 @@ impl From<IoType> for popol::Interest {
156159
e
157160
}
158161
}
162+
163+
/// Wrapper type around the waker provided by `popol` crate.
164+
#[derive(Clone)]
165+
pub struct PopolWaker(Arc<popol::Waker>);
166+
167+
impl Waker for PopolWaker {
168+
type Send = Self;
169+
type Recv = Self;
170+
171+
fn pair() -> Result<(Self::Send, Self::Recv), Error> {
172+
let waker = Arc::new(popol::Waker::new()?);
173+
Ok((PopolWaker(waker.clone()), PopolWaker(waker)))
174+
}
175+
}
176+
177+
impl io::Read for PopolWaker {
178+
fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
179+
self.reset();
180+
// Waker reads only when there is something which was sent.
181+
// That's why we just return here.
182+
Ok(0)
183+
}
184+
}
185+
186+
impl AsRawFd for PopolWaker {
187+
fn as_raw_fd(&self) -> RawFd { self.0.as_ref().as_raw_fd() }
188+
}
189+
190+
impl WakerRecv for PopolWaker {
191+
fn reset(&self) {
192+
if let Err(e) = popol::Waker::reset(self.0.as_ref()) {
193+
#[cfg(feature = "log")]
194+
log::error!(target: "reactor-controller", "Unable to reset waker queue: {e}");
195+
panic!("unable to reset waker queue. Details: {e}");
196+
}
197+
}
198+
}
199+
200+
impl WakerSend for PopolWaker {
201+
fn wake(&self) -> io::Result<()> { self.0.wake() }
202+
}

src/reactor.rs

+24-76
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,14 @@
2323

2424
use std::collections::HashMap;
2525
use std::fmt::{Debug, Display, Formatter};
26-
use std::io::Write;
2726
use std::os::unix::io::{AsRawFd, RawFd};
28-
use std::os::unix::net::UnixStream;
29-
use std::sync::Arc;
3027
use std::thread::JoinHandle;
3128
use std::time::Duration;
3229
use std::{io, thread};
3330

3431
use crossbeam_channel as chan;
3532

36-
use crate::poller::{IoFail, IoType, Poll};
33+
use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend};
3734
use crate::resource::WriteError;
3835
use crate::{Resource, Timer, Timestamp, WriteAtomic};
3936

@@ -191,12 +188,12 @@ pub trait Handler: Send + Iterator<Item = Action<Self::Listener, Self::Transport
191188
///
192189
/// Apps running the [`Reactor`] can interface it and a [`Handler`] via use of the [`Controller`]
193190
/// API.
194-
pub struct Reactor<C> {
191+
pub struct Reactor<C, P: Poll> {
195192
thread: JoinHandle<()>,
196-
controller: Controller<C>,
193+
controller: Controller<C, <P::Waker as Waker>::Send>,
197194
}
198195

199-
impl<C> Reactor<C> {
196+
impl<C, P: Poll> Reactor<C, P> {
200197
/// Creates new reactor using provided [`Poll`] engine and a service exposing [`Handler`] API to
201198
/// the reactor.
202199
///
@@ -206,7 +203,7 @@ impl<C> Reactor<C> {
206203
/// # Error
207204
///
208205
/// Errors with a system/OS error if it was impossible to spawn a thread.
209-
pub fn new<P: Poll, H: Handler<Command = C>>(service: H, poller: P) -> Result<Self, io::Error>
206+
pub fn new<H: Handler<Command = C>>(service: H, poller: P) -> Result<Self, io::Error>
210207
where
211208
H: 'static,
212209
P: 'static,
@@ -225,7 +222,7 @@ impl<C> Reactor<C> {
225222
/// # Error
226223
///
227224
/// Errors with a system/OS error if it was impossible to spawn a thread.
228-
pub fn named<P: Poll, H: Handler<Command = C>>(
225+
pub fn named<H: Handler<Command = C>>(
229226
service: H,
230227
poller: P,
231228
thread_name: String,
@@ -248,7 +245,7 @@ impl<C> Reactor<C> {
248245
/// # Error
249246
///
250247
/// Errors with a system/OS error if it was impossible to spawn a thread.
251-
pub fn with<P: Poll, H: Handler<Command = C>>(
248+
pub fn with<H: Handler<Command = C>>(
252249
service: H,
253250
mut poller: P,
254251
builder: thread::Builder,
@@ -260,13 +257,11 @@ impl<C> Reactor<C> {
260257
{
261258
let (ctl_send, ctl_recv) = chan::unbounded();
262259

263-
let (waker_writer, waker_reader) = UnixStream::pair()?;
264-
waker_reader.set_nonblocking(true)?;
265-
waker_writer.set_nonblocking(true)?;
260+
let (waker_writer, waker_reader) = P::Waker::pair()?;
266261

267262
let controller = Controller {
268263
ctl_send,
269-
waker: Arc::new(waker_writer),
264+
waker: waker_writer,
270265
};
271266

272267
#[cfg(feature = "log")]
@@ -306,7 +301,7 @@ impl<C> Reactor<C> {
306301
/// running inside of its thread.
307302
///
308303
/// See [`Handler::Command`] for the details.
309-
pub fn controller(&self) -> Controller<C> { self.controller.clone() }
304+
pub fn controller(&self) -> Controller<C, <P::Waker as Waker>::Send> { self.controller.clone() }
310305

311306
/// Joins the reactor thread.
312307
pub fn join(self) -> thread::Result<()> { self.thread.join() }
@@ -323,12 +318,12 @@ enum Ctl<C> {
323318
/// API to the reactor itself for receiving reactor-generated events. This API is used by the
324319
/// reactor to inform the service about incoming commands, sent via this [`Controller`] API (see
325320
/// [`Handler::Command`] for the details).
326-
pub struct Controller<C> {
321+
pub struct Controller<C, W: WakerSend> {
327322
ctl_send: chan::Sender<Ctl<C>>,
328-
waker: Arc<UnixStream>,
323+
waker: W,
329324
}
330325

331-
impl<C> Clone for Controller<C> {
326+
impl<C, W: WakerSend> Clone for Controller<C, W> {
332327
fn clone(&self) -> Self {
333328
Controller {
334329
ctl_send: self.ctl_send.clone(),
@@ -337,7 +332,7 @@ impl<C> Clone for Controller<C> {
337332
}
338333
}
339334

340-
impl<C> Controller<C> {
335+
impl<C, W: WakerSend> Controller<C, W> {
341336
/// Send a command to the service inside a [`Reactor`] or a reactor [`Runtime`].
342337
#[allow(unused_mut)] // because of the `log` feature gate
343338
pub fn cmd(&self, mut command: C) -> Result<(), io::Error>
@@ -377,56 +372,9 @@ impl<C> Controller<C> {
377372
}
378373

379374
fn wake(&self) -> io::Result<()> {
380-
use io::ErrorKind::*;
381-
382375
#[cfg(feature = "log")]
383376
log::trace!(target: "reactor-controller", "Wakening the reactor");
384-
385-
loop {
386-
let mut waker = self.waker.as_ref();
387-
match (&mut waker).write_all(&[0x1]) {
388-
Ok(_) => return Ok(()),
389-
Err(e) if e.kind() == WouldBlock => {
390-
#[cfg(feature = "log")]
391-
log::error!(target: "reactor-controller", "Waker write queue got overfilled, resetting and repeating...");
392-
reset_fd(&self.waker.as_raw_fd())?;
393-
}
394-
Err(e) if e.kind() == Interrupted => {
395-
#[cfg(feature = "log")]
396-
log::error!(target: "reactor-controller", "Waker failure, repeating...");
397-
}
398-
Err(e) => {
399-
#[cfg(feature = "log")]
400-
log::error!(target: "reactor-controller", "Waker error: {e}");
401-
402-
return Err(e);
403-
}
404-
}
405-
}
406-
}
407-
}
408-
409-
fn reset_fd(fd: &impl AsRawFd) -> io::Result<()> {
410-
let mut buf = [0u8; 4096];
411-
412-
loop {
413-
// We use a low-level "read" here because the alternative is to create a `UnixStream`
414-
// from the `RawFd`, which has "drop" semantics which we want to avoid.
415-
match unsafe {
416-
libc::read(fd.as_raw_fd(), buf.as_mut_ptr() as *mut libc::c_void, buf.len())
417-
} {
418-
-1 => match io::Error::last_os_error() {
419-
e if e.kind() == io::ErrorKind::WouldBlock => return Ok(()),
420-
e => {
421-
#[cfg(feature = "log")]
422-
log::error!(target: "reactor-controller", "Unable to reset waker queue: {e}");
423-
424-
return Err(e);
425-
}
426-
},
427-
0 => return Ok(()),
428-
_ => continue,
429-
}
377+
self.waker.wake()
430378
}
431379
}
432380

@@ -440,13 +388,13 @@ fn reset_fd(fd: &impl AsRawFd) -> io::Result<()> {
440388
pub struct Runtime<H: Handler, P: Poll> {
441389
service: H,
442390
poller: P,
443-
controller: Controller<H::Command>,
391+
controller: Controller<H::Command, <P::Waker as Waker>::Send>,
444392
ctl_recv: chan::Receiver<Ctl<H::Command>>,
445393
listener_map: HashMap<RawFd, <H::Listener as Resource>::Id>,
446394
transport_map: HashMap<RawFd, <H::Transport as Resource>::Id>,
447395
listeners: HashMap<<H::Listener as Resource>::Id, H::Listener>,
448396
transports: HashMap<<H::Transport as Resource>::Id, H::Transport>,
449-
waker: UnixStream,
397+
waker: <P::Waker as Waker>::Recv,
450398
timeouts: Timer,
451399
}
452400

@@ -456,13 +404,11 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
456404
pub fn with(service: H, poller: P) -> io::Result<Self> {
457405
let (ctl_send, ctl_recv) = chan::unbounded();
458406

459-
let (waker_writer, waker_reader) = UnixStream::pair()?;
460-
waker_reader.set_nonblocking(true)?;
461-
waker_writer.set_nonblocking(true)?;
407+
let (waker_writer, waker_reader) = P::Waker::pair()?;
462408

463409
let controller = Controller {
464410
ctl_send,
465-
waker: Arc::new(waker_writer),
411+
waker: waker_writer,
466412
};
467413

468414
Ok(Runtime {
@@ -483,7 +429,9 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
483429
/// running inside of its thread.
484430
///
485431
/// See [`Handler::Command`] for the details.
486-
pub fn controller(&self) -> Controller<H::Command> { self.controller.clone() }
432+
pub fn controller(&self) -> Controller<H::Command, <P::Waker as Waker>::Send> {
433+
self.controller.clone()
434+
}
487435

488436
fn run(mut self) {
489437
loop {
@@ -565,7 +513,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
565513
#[cfg(feature = "log")]
566514
log::trace!(target: "reactor", "Awoken by the controller");
567515

568-
reset_fd(&self.waker).expect("waker failure");
516+
self.waker.reset();
569517
awoken = true;
570518
} else if let Some(id) = self.listener_map.get(&fd) {
571519
match res {
@@ -803,7 +751,7 @@ mod test {
803751
impl AsRawFd for DumbRes {
804752
fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() }
805753
}
806-
impl Write for DumbRes {
754+
impl io::Write for DumbRes {
807755
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { Ok(buf.len()) }
808756
fn flush(&mut self) -> io::Result<()> { Ok(()) }
809757
}

0 commit comments

Comments
 (0)