Skip to content

Commit 3df945a

Browse files
committed
Keep track of absolute IDLE time for timeout
The code used to apply the set timeout value to the TcpStream before entering the IDLE loop. This effectively resets the timeout after receiving and handling incoming messages, which nullifies the purpose of the timeout when messages are received. This change remembers when the IDLE command is sent initially and uses that value to set the remaining time for the TcpStream timeout. This allows the IDLE loop to reconnect the IDLE connection at the appropriate time. Fixes #300.
1 parent 6fe22ed commit 3df945a

File tree

1 file changed

+75
-63
lines changed

1 file changed

+75
-63
lines changed

src/extensions/idle.rs

+75-63
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use rustls_connector::TlsStream as RustlsStream;
1313
use std::io::{self, Read, Write};
1414
use std::net::TcpStream;
1515
use std::ops::DerefMut;
16-
use std::time::Duration;
16+
use std::time::{Duration, Instant};
1717

1818
/// `Handle` allows a client to block waiting for changes to the remote mailbox.
1919
///
@@ -58,7 +58,7 @@ pub struct Handle<'a, T: Read + Write> {
5858
session: &'a mut Session<T>,
5959
timeout: Duration,
6060
keepalive: bool,
61-
done: bool,
61+
last_idle: Option<Instant>,
6262
}
6363

6464
/// The result of a wait on a [`Handle`]
@@ -91,11 +91,14 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
9191
session,
9292
timeout: Duration::from_secs(29 * 60),
9393
keepalive: true,
94-
done: false,
94+
last_idle: None,
9595
}
9696
}
9797

9898
fn init(&mut self) -> Result<()> {
99+
let last_idle = Instant::now();
100+
self.last_idle = Some(last_idle);
101+
99102
// https://tools.ietf.org/html/rfc2177
100103
//
101104
// The IDLE command takes no arguments.
@@ -108,39 +111,97 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
108111
let mut v = Vec::new();
109112
self.session.readline(&mut v)?;
110113
if v.starts_with(b"+") {
111-
self.done = false;
112114
return Ok(());
113115
}
114116

117+
self.last_idle = None;
115118
self.session.read_response_onto(&mut v)?;
116119
// We should *only* get a continuation on an error (i.e., it gives BAD or NO).
117120
unreachable!();
118121
}
119122

120123
fn terminate(&mut self) -> Result<()> {
121-
if !self.done {
122-
self.done = true;
124+
if let Some(_) = self.last_idle.take() {
123125
self.session.write_line(b"DONE")?;
124126
self.session.read_response().map(|_| ())
125127
} else {
126128
Ok(())
127129
}
128130
}
131+
}
129132

130-
/// Internal helper that doesn't consume self.
133+
impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> {
134+
/// Set the timeout duration on the connection. This will also set the frequency
135+
/// at which the connection is refreshed.
131136
///
132-
/// This is necessary so that we can keep using the inner `Session` in `wait_while`.
133-
fn wait_inner<F>(&mut self, reconnect: bool, mut callback: F) -> Result<WaitOutcome>
137+
/// The interval defaults to 29 minutes as given in RFC 2177.
138+
pub fn timeout(&mut self, interval: Duration) -> &mut Self {
139+
self.timeout = interval;
140+
self
141+
}
142+
143+
/// Do not continuously refresh the IDLE connection in the background.
144+
///
145+
/// By default, connections will periodically be refreshed in the background using the
146+
/// timeout duration set by [`Handle::timeout`]. If you do not want this behaviour, call
147+
/// this function and the connection will simply IDLE until `wait_while` returns or
148+
/// the timeout expires.
149+
pub fn keepalive(&mut self, keepalive: bool) -> &mut Self {
150+
self.keepalive = keepalive;
151+
self
152+
}
153+
154+
/// Block until the given callback returns `false`, or until a response
155+
/// arrives that is not explicitly handled by [`UnsolicitedResponse`].
156+
pub fn wait_while<F>(&mut self, mut callback: F) -> Result<WaitOutcome>
134157
where
135158
F: FnMut(UnsolicitedResponse) -> bool,
136159
{
137160
let mut v = Vec::new();
138161
let result = loop {
139-
match self.session.readline(&mut v) {
162+
match {
163+
// The server MAY consider a client inactive if it has an IDLE command
164+
// running, and if such a server has an inactivity timeout it MAY log
165+
// the client off implicitly at the end of its timeout period. Because
166+
// of that, clients using IDLE are advised to terminate the IDLE and
167+
// re-issue it at least every 29 minutes to avoid being logged off.
168+
// This still allows a client to receive immediate mailbox updates even
169+
// though it need only "poll" at half hour intervals.
170+
self.last_idle
171+
// Check if the time since last_idle has exceeded the timeout.
172+
.map(|last_idle| {
173+
let time_since_idle = last_idle.elapsed();
174+
if time_since_idle >= self.timeout {
175+
Err(Error::Io(io::ErrorKind::TimedOut.into()))
176+
} else {
177+
Ok(time_since_idle)
178+
}
179+
})
180+
// If there's no self.last_idle, initialize the connection (and return a 0 time since idle).
181+
.unwrap_or_else(|| self.init().map(|()| Duration::ZERO))
182+
// Finally, if no error occurred, read from the stream.
183+
.map(|time_since_idle| {
184+
self.session
185+
.stream
186+
.get_mut()
187+
.set_read_timeout(Some(self.timeout - time_since_idle))
188+
.expect("cannot be Some(0) since time is monotonically increasing");
189+
self.session.readline(&mut v)
190+
})
191+
} {
140192
Err(Error::Io(ref e))
141193
if e.kind() == io::ErrorKind::TimedOut
142194
|| e.kind() == io::ErrorKind::WouldBlock =>
143195
{
196+
if self.keepalive {
197+
match self.terminate() {
198+
Ok(()) => {
199+
// The connection gets initialized again on the next iteration.
200+
continue,
201+
}
202+
Err(e) => break Err(e),
203+
}
204+
}
144205
break Ok(WaitOutcome::TimedOut);
145206
}
146207
Ok(_len) => {
@@ -183,60 +244,11 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
183244
};
184245
};
185246

186-
// Reconnect on timeout if needed
187-
match (reconnect, result) {
188-
(true, Ok(WaitOutcome::TimedOut)) => {
189-
self.terminate()?;
190-
self.init()?;
191-
self.wait_inner(reconnect, callback)
192-
}
193-
(_, result) => result,
194-
}
195-
}
196-
}
197-
198-
impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> {
199-
/// Set the timeout duration on the connection. This will also set the frequency
200-
/// at which the connection is refreshed.
201-
///
202-
/// The interval defaults to 29 minutes as given in RFC 2177.
203-
pub fn timeout(&mut self, interval: Duration) -> &mut Self {
204-
self.timeout = interval;
205-
self
206-
}
207-
208-
/// Do not continuously refresh the IDLE connection in the background.
209-
///
210-
/// By default, connections will periodically be refreshed in the background using the
211-
/// timeout duration set by [`Handle::timeout`]. If you do not want this behaviour, call
212-
/// this function and the connection will simply IDLE until `wait_while` returns or
213-
/// the timeout expires.
214-
pub fn keepalive(&mut self, keepalive: bool) -> &mut Self {
215-
self.keepalive = keepalive;
216-
self
217-
}
247+
// set_read_timeout() can fail if the argument is Some(0), which can never be the
248+
// case here.
249+
self.session.stream.get_mut().set_read_timeout(None).unwrap();
218250

219-
/// Block until the given callback returns `false`, or until a response
220-
/// arrives that is not explicitly handled by [`UnsolicitedResponse`].
221-
pub fn wait_while<F>(&mut self, callback: F) -> Result<WaitOutcome>
222-
where
223-
F: FnMut(UnsolicitedResponse) -> bool,
224-
{
225-
self.init()?;
226-
// The server MAY consider a client inactive if it has an IDLE command
227-
// running, and if such a server has an inactivity timeout it MAY log
228-
// the client off implicitly at the end of its timeout period. Because
229-
// of that, clients using IDLE are advised to terminate the IDLE and
230-
// re-issue it at least every 29 minutes to avoid being logged off.
231-
// This still allows a client to receive immediate mailbox updates even
232-
// though it need only "poll" at half hour intervals.
233-
self.session
234-
.stream
235-
.get_mut()
236-
.set_read_timeout(Some(self.timeout))?;
237-
let res = self.wait_inner(self.keepalive, callback);
238-
let _ = self.session.stream.get_mut().set_read_timeout(None).is_ok();
239-
res
251+
result
240252
}
241253
}
242254

0 commit comments

Comments
 (0)