Skip to content

Commit 745b373

Browse files
authored
Merge pull request #28 from rust-amplify/fix/hangup
Fix missed service notifications on all resource unregistrations
2 parents ba5f1b0 + ceb7c48 commit 745b373

File tree

2 files changed

+14
-36
lines changed

2 files changed

+14
-36
lines changed

src/poller/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,9 @@ impl Display for IoType {
133133
#[derive(Copy, Clone, Debug, Display, Error)]
134134
#[display(doc_comments)]
135135
pub enum IoFail {
136-
/// connection is absent (POSIX events {0:#b})
136+
/// hung up (POSIX events {0:#b})
137137
Connectivity(i16),
138-
/// OS-level error (POSIX events {0:#b})
138+
/// errored (POSIX events {0:#b})
139139
Os(i16),
140140
}
141141

src/reactor.rs

+12-34
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use std::{io, thread};
3232

3333
use crossbeam_channel as chan;
3434

35-
use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend};
35+
use crate::poller::{IoType, Poll, Waker, WakerRecv, WakerSend};
3636
use crate::resource::WriteError;
3737
use crate::{Resource, ResourceId, ResourceType, Timer, Timestamp, WriteAtomic};
3838

@@ -505,11 +505,10 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
505505

506506
/// # Returns
507507
///
508-
/// Whether it was awaken by a waker
508+
/// Whether it was awakened by a waker
509509
fn handle_events(&mut self, time: Timestamp) -> bool {
510510
let mut awoken = false;
511511

512-
let mut unregister_queue = vec![];
513512
while let Some((id, res)) = self.poller.next() {
514513
if id == ResourceId::WAKER {
515514
if let Err(err) = res {
@@ -536,19 +535,13 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
536535
}
537536
}
538537
}
539-
Err(IoFail::Connectivity(flags)) => {
538+
Err(err) => {
540539
#[cfg(feature = "log")]
541-
log::trace!(target: "reactor", "Listener {id} hung up (OS flags {flags:#b})");
542-
543-
let listener = self.listeners.remove(&id).expect("resource disappeared");
544-
unregister_queue.push(id);
540+
log::trace!(target: "reactor", "Listener {id} {err}");
541+
let listener =
542+
self.unregister_listener(id).expect("listener has disappeared");
545543
self.service.handle_error(Error::ListenerDisconnect(id, listener));
546544
}
547-
Err(IoFail::Os(flags)) => {
548-
#[cfg(feature = "log")]
549-
log::trace!(target: "reactor", "Listener {id} errored (OS flags {flags:#b})");
550-
self.unregister_listener(id);
551-
}
552545
}
553546
} else if self.transports.contains_key(&id) {
554547
match res {
@@ -563,19 +556,13 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
563556
}
564557
}
565558
}
566-
Err(IoFail::Connectivity(posix_events)) => {
559+
Err(err) => {
567560
#[cfg(feature = "log")]
568-
log::trace!(target: "reactor", "Transport {id} hanged up (POSIX events are {posix_events:#b})");
569-
570-
let transport = self.transports.remove(&id).expect("resource disappeared");
571-
unregister_queue.push(id);
561+
log::trace!(target: "reactor", "Transport {id} {err}");
562+
let transport =
563+
self.unregister_transport(id).expect("transport has disappeared");
572564
self.service.handle_error(Error::TransportDisconnect(id, transport));
573565
}
574-
Err(IoFail::Os(posix_events)) => {
575-
#[cfg(feature = "log")]
576-
log::trace!(target: "reactor", "Transport {id} errored (POSIX events are {posix_events:#b})");
577-
self.unregister_transport(id);
578-
}
579566
}
580567
} else {
581568
panic!(
@@ -584,11 +571,6 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
584571
}
585572
}
586573

587-
// We need this b/c of borrow checker
588-
for id in unregister_queue {
589-
self.poller.unregister(id);
590-
}
591-
592574
awoken
593575
}
594576

@@ -708,10 +690,8 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
708690
return None;
709691
};
710692

711-
let fd = listener.as_raw_fd();
712-
713693
#[cfg(feature = "log")]
714-
log::debug!(target: "reactor", "Handling over listener {id} (fd={fd})");
694+
log::debug!(target: "reactor", "Handling over listener {id} (fd={})", listener.as_raw_fd());
715695

716696
self.poller.unregister(id);
717697

@@ -725,10 +705,8 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
725705
return None;
726706
};
727707

728-
let fd = transport.as_raw_fd();
729-
730708
#[cfg(feature = "log")]
731-
log::debug!(target: "reactor", "Unregistering over transport {id} (fd={fd})");
709+
log::debug!(target: "reactor", "Unregistering over transport {id} (fd={})", transport.as_raw_fd());
732710

733711
self.poller.unregister(id);
734712

0 commit comments

Comments
 (0)