Skip to content

Commit abfd369

Browse files
authored
Merge pull request #26 from rust-amplify/timer
2 parents d43d1df + 370a15f commit abfd369

File tree

2 files changed

+66
-44
lines changed

2 files changed

+66
-44
lines changed

src/reactor.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
444444
fn run(mut self) {
445445
loop {
446446
let before_poll = Timestamp::now();
447-
let timeout = self.timeouts.next(before_poll).unwrap_or(WAIT_TIMEOUT);
447+
let timeout = self.timeouts.next_expiring_from(before_poll).unwrap_or(WAIT_TIMEOUT);
448448

449449
for (id, res) in &self.listeners {
450450
self.poller.set_interest(*id, res.interests());
@@ -463,7 +463,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
463463

464464
// Nb. The way this is currently used basically ignores which keys have
465465
// timed out. So as long as *something* timed out, we wake the service.
466-
let timers_fired = self.timeouts.expire(now);
466+
let timers_fired = self.timeouts.remove_expired_by(now);
467467
if timers_fired > 0 {
468468
#[cfg(feature = "log")]
469469
log::trace!(target: "reactor", "Timer has fired");
@@ -688,7 +688,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
688688
#[cfg(feature = "log")]
689689
log::debug!(target: "reactor", "Adding timer {duration:?} from now");
690690

691-
self.timeouts.set_timer(duration, time);
691+
self.timeouts.set_timeout(duration, time);
692692
}
693693
}
694694
Ok(())

src/timeouts.rs

+63-41
Original file line numberDiff line numberDiff line change
@@ -88,26 +88,23 @@ pub struct Timer {
8888
}
8989

9090
impl Timer {
91-
/// Create a new timeout manager.
92-
///
93-
/// Takes a threshold below which two timeouts cannot overlap.
91+
/// Create a new timer containing no timeouts.
9492
pub fn new() -> Self { Self { timeouts: bset! {} } }
9593

9694
/// Return the number of timeouts being tracked.
97-
pub fn len(&self) -> usize { self.timeouts.len() }
95+
pub fn count(&self) -> usize { self.timeouts.len() }
9896

9997
/// Check whether there are timeouts being tracked.
100-
pub fn is_empty(&self) -> bool { self.timeouts.is_empty() }
98+
pub fn has_timeouts(&self) -> bool { !self.timeouts.is_empty() }
10199

102-
/// Register a new timeout with an associated key and wake-up time from a
103-
/// UNIX time epoch.
104-
pub fn set_timer(&mut self, span: Duration, after: Timestamp) {
105-
let time = after + Timestamp(span.as_millis());
100+
/// Register a new timeout relative to a certain point in time.
101+
pub fn set_timeout(&mut self, timeout: Duration, after: Timestamp) {
102+
let time = after + Timestamp(timeout.as_millis());
106103
self.timeouts.insert(time);
107104
}
108105

109-
/// Get the minimum time duration we should wait for at least one timeout
110-
/// to be reached. Returns `None` if there are no timeouts.
106+
/// Get the first timeout expiring right at or after certain moment of time.
107+
/// Returns `None` if there are no timeouts.
111108
///
112109
/// ```
113110
/// # use std::time::{Duration};
@@ -116,30 +113,33 @@ impl Timer {
116113
/// let mut tm = Timer::new();
117114
///
118115
/// let now = Timestamp::now();
119-
/// tm.set_timer(Duration::from_secs(16), now);
120-
/// tm.set_timer(Duration::from_secs(8), now);
121-
/// tm.set_timer(Duration::from_secs(64), now);
116+
/// tm.set_timeout(Duration::from_secs(16), now);
117+
/// tm.set_timeout(Duration::from_secs(8), now);
118+
/// tm.set_timeout(Duration::from_secs(64), now);
122119
///
123120
/// let mut now = Timestamp::now();
124121
/// // We need to wait 8 secs to trigger the next timeout (1).
125-
/// assert!(tm.next(now) <= Some(Duration::from_secs(8)));
122+
/// assert!(tm.next_expiring_from(now) <= Some(Duration::from_secs(8)));
126123
///
127124
/// // ... sleep for a sec ...
128125
/// now += Duration::from_secs(1);
129126
///
130127
/// // Now we don't need to wait as long!
131-
/// assert!(tm.next(now).unwrap() <= Duration::from_secs(7));
128+
/// assert!(tm.next_expiring_from(now).unwrap() <= Duration::from_secs(7));
132129
/// ```
133-
pub fn next(&self, after: impl Into<Timestamp>) -> Option<Duration> {
134-
let after = after.into();
135-
self.timeouts
136-
.iter()
137-
.find(|t| **t >= after)
138-
.map(|t| Duration::from_millis((*t - after).as_millis()))
130+
pub fn next_expiring_from(&self, time: impl Into<Timestamp>) -> Option<Duration> {
131+
let time = time.into();
132+
let last = *self.timeouts.first()?;
133+
Some(if last >= time {
134+
Duration::from_millis(last.as_millis() - time.as_millis())
135+
} else {
136+
Duration::from_secs(0)
137+
})
139138
}
140139

141-
/// Returns vector of timers which has fired before certain time.
142-
pub fn expire(&mut self, time: Timestamp) -> usize {
140+
/// Removes timeouts which expire by a certain moment of time (inclusive),
141+
/// returning total number of timeouts which were removed.
142+
pub fn remove_expired_by(&mut self, time: Timestamp) -> usize {
143143
// Since `split_off` returns everything *after* the given key, including the key,
144144
// if a timer is set for exactly the given time, it would remain in the "after"
145145
// set of unexpired keys. This isn't what we want, therefore we add `1` to the
@@ -162,34 +162,56 @@ mod tests {
162162
let mut tm = Timer::new();
163163

164164
let now = Timestamp::now();
165-
tm.set_timer(Duration::from_secs(8), now);
166-
tm.set_timer(Duration::from_secs(9), now);
167-
tm.set_timer(Duration::from_secs(10), now);
165+
tm.set_timeout(Duration::from_secs(8), now);
166+
tm.set_timeout(Duration::from_secs(9), now);
167+
tm.set_timeout(Duration::from_secs(10), now);
168168

169-
assert_eq!(tm.expire(now + Duration::from_secs(9)), 2);
170-
assert_eq!(tm.len(), 1);
169+
assert_eq!(tm.remove_expired_by(now + Duration::from_secs(9)), 2);
170+
assert_eq!(tm.count(), 1);
171171
}
172172

173173
#[test]
174174
fn test_wake() {
175175
let mut tm = Timer::new();
176176

177177
let now = Timestamp::now();
178-
tm.set_timer(Duration::from_secs(8), now);
179-
tm.set_timer(Duration::from_secs(16), now);
180-
tm.set_timer(Duration::from_secs(64), now);
181-
tm.set_timer(Duration::from_secs(72), now);
178+
tm.set_timeout(Duration::from_secs(8), now);
179+
tm.set_timeout(Duration::from_secs(16), now);
180+
tm.set_timeout(Duration::from_secs(64), now);
181+
tm.set_timeout(Duration::from_secs(72), now);
182+
183+
assert_eq!(tm.remove_expired_by(now), 0);
184+
assert_eq!(tm.count(), 4);
185+
186+
assert_eq!(tm.remove_expired_by(now + Duration::from_secs(9)), 1);
187+
assert_eq!(tm.count(), 3, "one timeout has expired");
188+
189+
assert_eq!(tm.remove_expired_by(now + Duration::from_secs(66)), 2);
190+
assert_eq!(tm.count(), 1, "another two timeouts have expired");
191+
192+
assert_eq!(tm.remove_expired_by(now + Duration::from_secs(96)), 1);
193+
assert!(!tm.has_timeouts(), "all timeouts have expired");
194+
}
195+
196+
#[test]
197+
fn test_next() {
198+
let mut tm = Timer::new();
199+
200+
let mut now = Timestamp::now();
201+
tm.set_timeout(Duration::from_secs(3), now);
202+
assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(3)));
182203

183-
assert_eq!(tm.expire(now), 0);
184-
assert_eq!(tm.len(), 4);
204+
now += Duration::from_secs(2);
205+
assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(1)));
185206

186-
assert_eq!(tm.expire(now + Duration::from_secs(9)), 1);
187-
assert_eq!(tm.len(), 3, "one timeout has expired");
207+
now += Duration::from_secs(1);
208+
assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(0)));
188209

189-
assert_eq!(tm.expire(now + Duration::from_secs(66)), 2);
190-
assert_eq!(tm.len(), 1, "another two timeouts have expired");
210+
now += Duration::from_secs(1);
211+
assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(0)));
191212

192-
assert_eq!(tm.expire(now + Duration::from_secs(96)), 1);
193-
assert!(tm.is_empty(), "all timeouts have expired");
213+
assert_eq!(tm.remove_expired_by(now), 1);
214+
assert_eq!(tm.count(), 0);
215+
assert_eq!(tm.next_expiring_from(now), None);
194216
}
195217
}

0 commit comments

Comments
 (0)