From 6cfcbfffc51ba259fdc2440b25a4cbe6ef3ea41f Mon Sep 17 00:00:00 2001 From: Stiopa Koltsov Date: Tue, 11 Feb 2020 02:11:11 +0000 Subject: [PATCH] Implement BufQueue type `BufQueue` is a `VecDeque` which implements `Buf` and caches the remaining size. This utility can be used to implement outgoing network buffer. Imagine this use case: ``` type Header = Cursor<[u8; 5]>; type Payload = Bytes; type MyMessage = Chain; outgoing_queue: BufDeque; ``` New messages can be queues by simply appending to this `BufQueue`, and the queue can be flushed by invoking `bytes_vectored` on the queue object. And both enqueue and flush operations are zero-copy. `remaining` cache is needed to optimize `bytes` operation, which might be expensive if the queue is long. I'm not 100% sure this is how `Bytes` is meant to be used, but this `BufQueue` type seems to be the most natural way. --- src/buf/buf_queue.rs | 101 +++++++++++++++++++++++++++++++ src/buf/mod.rs | 3 +- tests/test_buf_queue.rs | 131 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 234 insertions(+), 1 deletion(-) create mode 100644 src/buf/buf_queue.rs create mode 100644 tests/test_buf_queue.rs diff --git a/src/buf/buf_queue.rs b/src/buf/buf_queue.rs new file mode 100644 index 000000000..144caa920 --- /dev/null +++ b/src/buf/buf_queue.rs @@ -0,0 +1,101 @@ +use alloc::collections::VecDeque; +use crate::Buf; +use core::cmp; +#[cfg(feature = "std")] +use std::io::IoSlice; + +/// Ring buffer of buffers. +/// +/// `push` operation appends a buffer to the tail of the buffer, +/// and read operations (`bytes`, `bytes_vectored`, `advance` etc) +/// pop elements from the head of the buffer. +/// +/// This type can be used to implement an outgoing network buffer, +/// when the front of the queue is written to the network and the back +/// of the queue gets new messages. +/// +/// # Note +/// +/// This type caches the remaining size (sum of all remaining sizes of all buffers). +/// If buffers owned by this `BufQueue` get their remaining size modified +/// not through this type, the behavior is undefined: +/// operations may hang forever, panic or produce otherwise unexpected results +/// (but not violate memory access). +#[derive(Debug)] +pub struct BufQueue { + deque: VecDeque, + remaining: usize, +} + +impl BufQueue { + /// Create an empty queue. + pub fn new() -> Self { + BufQueue::default() + } + + /// Push a buf to the back of the deque. + /// + /// This operation is no-op if the buf has no remaining. + /// + /// # Panics + /// + /// This struct tracks the total remaining, and panics if + /// the total overflows `usize`. + pub fn push(&mut self, buf: B) { + let rem = buf.remaining(); + if rem != 0 { + self.deque.push_back(buf); + self.remaining = self.remaining.checked_add(rem).expect("remaining overflow"); + } + } +} + +impl Default for BufQueue { + fn default() -> Self { + BufQueue { + deque: VecDeque::default(), + remaining: 0, + } + } +} + +impl Buf for BufQueue { + fn remaining(&self) -> usize { + self.remaining + } + + fn bytes(&self) -> &[u8] { + match self.deque.front() { + Some(b) => b.bytes(), + None => &[], + } + } + + #[cfg(feature = "std")] + fn bytes_vectored<'a>(&'a self, mut dst: &mut [IoSlice<'a>]) -> usize { + let mut n = 0; + for b in &self.deque { + if dst.is_empty() { + break; + } + let next = b.bytes_vectored(dst); + dst = &mut dst[next..]; + n += next; + } + n + } + + fn advance(&mut self, mut cnt: usize) { + while cnt != 0 { + let front = self.deque.front_mut().expect("must not be empty"); + let rem = front.remaining(); + let advance = cmp::min(cnt, rem); + front.advance(advance); + if rem == advance { + self.deque.pop_front().unwrap(); + } + cnt -= advance; + self.remaining -= advance; + } + } +} diff --git a/src/buf/mod.rs b/src/buf/mod.rs index d4538f21e..9d610a782 100644 --- a/src/buf/mod.rs +++ b/src/buf/mod.rs @@ -21,6 +21,7 @@ mod buf_mut; pub mod ext; mod iter; mod vec_deque; +mod buf_queue; pub use self::buf_impl::Buf; pub use self::buf_mut::BufMut; @@ -28,4 +29,4 @@ pub use self::ext::{BufExt, BufMutExt}; #[cfg(feature = "std")] pub use self::buf_mut::IoSliceMut; pub use self::iter::IntoIter; - +pub use self::buf_queue::BufQueue; diff --git a/tests/test_buf_queue.rs b/tests/test_buf_queue.rs new file mode 100644 index 000000000..de5e470ea --- /dev/null +++ b/tests/test_buf_queue.rs @@ -0,0 +1,131 @@ +#![deny(warnings, rust_2018_idioms)] + +use bytes::buf::BufQueue; +use bytes::Bytes; +use bytes::Buf; +use std::collections::VecDeque; +use std::cmp; +use std::io::IoSlice; + + +#[test] +fn simple() { + let mut queue = BufQueue::new(); + queue.push(Bytes::copy_from_slice(b"abc")); + queue.push(Bytes::copy_from_slice(b"de")); + assert_eq!(5, queue.remaining()); + assert_eq!(b"abc", queue.bytes()); + queue.advance(1); + assert_eq!(4, queue.remaining()); + assert_eq!(b"bc", queue.bytes()); + queue.advance(2); + assert_eq!(2, queue.remaining()); + assert_eq!(b"de", queue.bytes()); + queue.push(Bytes::copy_from_slice(b"fgh")); + assert_eq!(5, queue.remaining()); + assert_eq!(b"de", queue.bytes()); + // advance past front bytes + queue.advance(4); + assert_eq!(1, queue.remaining()); + assert_eq!(b"h", queue.bytes()); + queue.advance(1); + assert_eq!(0, queue.remaining()); + assert_eq!(b"", queue.bytes()); +} + +struct Rng { + state: u32, +} + +impl Rng { + // copy-paste from https://en.wikipedia.org/wiki/Xorshift + fn next(&mut self) -> u32 { + let mut x = self.state; + x ^= x << 13; + x ^= x >> 17; + x ^= x << 5; + self.state = x; + x + } +} + +#[test] +fn random() { + let mut rng = Rng { state: 1 }; + + // Update these two synchronously + let mut correct: VecDeque = Default::default(); + let mut testing: BufQueue> = Default::default(); + + for _ in 0..10000 { + // uncomment to have a look at what is tested + //println!("{:?}", testing); + + assert_eq!(correct.remaining(), testing.remaining()); + + let bytes = testing.bytes(); + assert!(correct.len() == 0 || bytes.len() != 0); + assert_eq!(bytes, &correct.iter().cloned().take(bytes.len()).collect::>()[..]); + + if correct.len() >= 1000 || rng.next() % 2 == 0 { + let take = cmp::min(rng.next() as usize % 10, correct.len()); + testing.advance(take); + correct.advance(take); + } else { + let mut inner = BufQueue::new(); + + let inner_len = rng.next() % 3; + for _ in 0..inner_len { + let bytes_len = rng.next() % 5; + let v: Vec = (0..bytes_len).map(|_| rng.next() as u8).collect(); + correct.extend(&v); + inner.push(Bytes::from(v)); + } + + testing.push(inner); + + assert_eq!(correct.len(), testing.remaining()); + } + } +} + +#[test] +fn vectored() { + let mut v: BufQueue> = Default::default(); + v.push({ + let mut i = BufQueue::new(); + i.push(Bytes::copy_from_slice(b"ab")); + i.push(Bytes::copy_from_slice(b"cde")); + i + }); + v.push({ + let mut i = BufQueue::new(); + i.push(Bytes::copy_from_slice(b"fg")); + i + }); + + let zero = &mut []; + assert_eq!(0, v.bytes_vectored(zero)); + + let mut one = [IoSlice::new(&[])]; + assert_eq!(1, v.bytes_vectored(&mut one)); + assert_eq!(b"ab", &*one[0]); + + let mut two = [IoSlice::new(&[]), IoSlice::new(&[])]; + assert_eq!(2, v.bytes_vectored(&mut two)); + assert_eq!(b"ab", &*two[0]); + assert_eq!(b"cde", &*two[1]); + + let mut three = [IoSlice::new(&[]), IoSlice::new(&[]), IoSlice::new(&[])]; + assert_eq!(3, v.bytes_vectored(&mut three)); + assert_eq!(b"ab", &*three[0]); + assert_eq!(b"cde", &*three[1]); + assert_eq!(b"fg", &*three[2]); + + let mut four = [IoSlice::new(&[]), IoSlice::new(&[]), IoSlice::new(&[]), IoSlice::new(&[])]; + assert_eq!(3, v.bytes_vectored(&mut four)); + assert_eq!(b"ab", &*four[0]); + assert_eq!(b"cde", &*four[1]); + assert_eq!(b"fg", &*four[2]); + assert_eq!(b"", &*four[3]); +}