Skip to content

Commit bab6789

Browse files
committed
Keep track of absolute IDLE time for timeout
The code used to apply the set timeout value to the TcpStream before entering the IDLE loop. This effectively resets the timeout after receiving and handling incoming messages, which nullifies the purpose of the timeout when messages are received. This change remembers when the IDLE command is sent initially and uses that value to set the remaining time for the TcpStream timeout. This allows the IDLE loop to reconnect the IDLE connection at the appropriate time. Fixes #300.
1 parent 6fe22ed commit bab6789

File tree

1 file changed

+74
-63
lines changed

1 file changed

+74
-63
lines changed

src/extensions/idle.rs

+74-63
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use rustls_connector::TlsStream as RustlsStream;
1313
use std::io::{self, Read, Write};
1414
use std::net::TcpStream;
1515
use std::ops::DerefMut;
16-
use std::time::Duration;
16+
use std::time::{Duration, Instant};
1717

1818
/// `Handle` allows a client to block waiting for changes to the remote mailbox.
1919
///
@@ -58,7 +58,7 @@ pub struct Handle<'a, T: Read + Write> {
5858
session: &'a mut Session<T>,
5959
timeout: Duration,
6060
keepalive: bool,
61-
done: bool,
61+
last_idle: Option<Instant>,
6262
}
6363

6464
/// The result of a wait on a [`Handle`]
@@ -91,11 +91,13 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
9191
session,
9292
timeout: Duration::from_secs(29 * 60),
9393
keepalive: true,
94-
done: false,
94+
last_idle: None,
9595
}
9696
}
9797

9898
fn init(&mut self) -> Result<()> {
99+
self.last_idle = Some(Instant::now());
100+
99101
// https://tools.ietf.org/html/rfc2177
100102
//
101103
// The IDLE command takes no arguments.
@@ -108,39 +110,97 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
108110
let mut v = Vec::new();
109111
self.session.readline(&mut v)?;
110112
if v.starts_with(b"+") {
111-
self.done = false;
112113
return Ok(());
113114
}
114115

116+
self.last_idle = None;
115117
self.session.read_response_onto(&mut v)?;
116118
// We should *only* get a continuation on an error (i.e., it gives BAD or NO).
117119
unreachable!();
118120
}
119121

120122
fn terminate(&mut self) -> Result<()> {
121-
if !self.done {
122-
self.done = true;
123+
if let Some(_) = self.last_idle.take() {
123124
self.session.write_line(b"DONE")?;
124125
self.session.read_response().map(|_| ())
125126
} else {
126127
Ok(())
127128
}
128129
}
130+
}
129131

130-
/// Internal helper that doesn't consume self.
132+
impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> {
133+
/// Set the timeout duration on the connection. This will also set the frequency
134+
/// at which the connection is refreshed.
131135
///
132-
/// This is necessary so that we can keep using the inner `Session` in `wait_while`.
133-
fn wait_inner<F>(&mut self, reconnect: bool, mut callback: F) -> Result<WaitOutcome>
136+
/// The interval defaults to 29 minutes as given in RFC 2177.
137+
pub fn timeout(&mut self, interval: Duration) -> &mut Self {
138+
self.timeout = interval;
139+
self
140+
}
141+
142+
/// Do not continuously refresh the IDLE connection in the background.
143+
///
144+
/// By default, connections will periodically be refreshed in the background using the
145+
/// timeout duration set by [`Handle::timeout`]. If you do not want this behaviour, call
146+
/// this function and the connection will simply IDLE until `wait_while` returns or
147+
/// the timeout expires.
148+
pub fn keepalive(&mut self, keepalive: bool) -> &mut Self {
149+
self.keepalive = keepalive;
150+
self
151+
}
152+
153+
/// Block until the given callback returns `false`, or until a response
154+
/// arrives that is not explicitly handled by [`UnsolicitedResponse`].
155+
pub fn wait_while<F>(&mut self, mut callback: F) -> Result<WaitOutcome>
134156
where
135157
F: FnMut(UnsolicitedResponse) -> bool,
136158
{
137159
let mut v = Vec::new();
138160
let result = loop {
139-
match self.session.readline(&mut v) {
161+
match {
162+
// The server MAY consider a client inactive if it has an IDLE command
163+
// running, and if such a server has an inactivity timeout it MAY log
164+
// the client off implicitly at the end of its timeout period. Because
165+
// of that, clients using IDLE are advised to terminate the IDLE and
166+
// re-issue it at least every 29 minutes to avoid being logged off.
167+
// This still allows a client to receive immediate mailbox updates even
168+
// though it need only "poll" at half hour intervals.
169+
self.last_idle
170+
// Check if the time since last_idle has exceeded the timeout.
171+
.map(|last_idle| {
172+
let time_since_idle = last_idle.elapsed();
173+
if time_since_idle >= self.timeout {
174+
Err(Error::Io(io::ErrorKind::TimedOut.into()))
175+
} else {
176+
Ok(time_since_idle)
177+
}
178+
})
179+
// If there's no self.last_idle, initialize the connection (and return a 0 time since idle).
180+
.unwrap_or_else(|| self.init().map(|()| Duration::ZERO))
181+
// Finally, if no error occurred, read from the stream.
182+
.map(|time_since_idle| {
183+
self.session
184+
.stream
185+
.get_mut()
186+
.set_read_timeout(Some(self.timeout - time_since_idle))
187+
.expect("cannot be Some(0) since time is monotonically increasing");
188+
self.session.readline(&mut v)
189+
})
190+
} {
140191
Err(Error::Io(ref e))
141192
if e.kind() == io::ErrorKind::TimedOut
142193
|| e.kind() == io::ErrorKind::WouldBlock =>
143194
{
195+
if self.keepalive {
196+
match self.terminate() {
197+
Ok(()) => {
198+
// The connection gets initialized again on the next iteration.
199+
continue;
200+
}
201+
Err(e) => break Err(e),
202+
}
203+
}
144204
break Ok(WaitOutcome::TimedOut);
145205
}
146206
Ok(_len) => {
@@ -183,60 +243,11 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
183243
};
184244
};
185245

186-
// Reconnect on timeout if needed
187-
match (reconnect, result) {
188-
(true, Ok(WaitOutcome::TimedOut)) => {
189-
self.terminate()?;
190-
self.init()?;
191-
self.wait_inner(reconnect, callback)
192-
}
193-
(_, result) => result,
194-
}
195-
}
196-
}
197-
198-
impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> {
199-
/// Set the timeout duration on the connection. This will also set the frequency
200-
/// at which the connection is refreshed.
201-
///
202-
/// The interval defaults to 29 minutes as given in RFC 2177.
203-
pub fn timeout(&mut self, interval: Duration) -> &mut Self {
204-
self.timeout = interval;
205-
self
206-
}
207-
208-
/// Do not continuously refresh the IDLE connection in the background.
209-
///
210-
/// By default, connections will periodically be refreshed in the background using the
211-
/// timeout duration set by [`Handle::timeout`]. If you do not want this behaviour, call
212-
/// this function and the connection will simply IDLE until `wait_while` returns or
213-
/// the timeout expires.
214-
pub fn keepalive(&mut self, keepalive: bool) -> &mut Self {
215-
self.keepalive = keepalive;
216-
self
217-
}
246+
// set_read_timeout() can fail if the argument is Some(0), which can never be the
247+
// case here.
248+
self.session.stream.get_mut().set_read_timeout(None).unwrap();
218249

219-
/// Block until the given callback returns `false`, or until a response
220-
/// arrives that is not explicitly handled by [`UnsolicitedResponse`].
221-
pub fn wait_while<F>(&mut self, callback: F) -> Result<WaitOutcome>
222-
where
223-
F: FnMut(UnsolicitedResponse) -> bool,
224-
{
225-
self.init()?;
226-
// The server MAY consider a client inactive if it has an IDLE command
227-
// running, and if such a server has an inactivity timeout it MAY log
228-
// the client off implicitly at the end of its timeout period. Because
229-
// of that, clients using IDLE are advised to terminate the IDLE and
230-
// re-issue it at least every 29 minutes to avoid being logged off.
231-
// This still allows a client to receive immediate mailbox updates even
232-
// though it need only "poll" at half hour intervals.
233-
self.session
234-
.stream
235-
.get_mut()
236-
.set_read_timeout(Some(self.timeout))?;
237-
let res = self.wait_inner(self.keepalive, callback);
238-
let _ = self.session.stream.get_mut().set_read_timeout(None).is_ok();
239-
res
250+
result
240251
}
241252
}
242253

0 commit comments

Comments
 (0)