Skip to content

Commit 09eade5

Browse files
committed
Message length predicting parser
1 parent 17db0dd commit 09eade5

File tree

5 files changed

+455
-1
lines changed

5 files changed

+455
-1
lines changed

rmp/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "rmp"
33
version = "0.8.14"
4-
authors = ["Evgeny Safronov <[email protected]>"]
4+
authors = ["Evgeny Safronov <[email protected]>", "Kornel <[email protected]>"]
55
license = "MIT"
66
description = "Pure Rust MessagePack serialization implementation"
77
repository = "https://github.com/3Hren/msgpack-rust"

rmp/src/decode/est.rs

+332
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
use std::num::{NonZeroU32, NonZeroUsize};
2+
use crate::Marker;
3+
4+
/// Incremental MessagePack parser that can parse incomplete messages,
5+
/// and report their estimated total length.
6+
pub struct MessageLen {
7+
/// The last operation interrupted
8+
wip: Option<WIP>,
9+
/// Max size estimate
10+
max_position: NonZeroUsize,
11+
/// Bytes read so far
12+
position: usize,
13+
/// Stack of open arrays and maps
14+
/// It is not a complete stack. Used only when resumption is needed.
15+
sequences_wip: Vec<Seq>,
16+
/// Nesting of arrays and maps
17+
current_depth: u16,
18+
/// Configured limit
19+
max_depth: u16,
20+
/// Configured limit
21+
max_len: u32,
22+
}
23+
24+
/// [`MessageLen`] result
///
/// Implements [`std::error::Error`], so it can be propagated with `?` into
/// boxed/dynamic error types, and `PartialEq`, so it can be compared in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LenError {
    /// The message is truncated, and needs at least this many bytes to parse
    Truncated(NonZeroUsize),
    /// The message is invalid or exceeded size limits
    ParseError,
}

impl LenError {
    /// Get expected min length or 0 on error
    ///
    /// Returns the lower bound carried by [`LenError::Truncated`],
    /// and `0` for [`LenError::ParseError`].
    #[must_use]
    pub fn len(&self) -> usize {
        match *self {
            Self::ParseError => 0,
            Self::Truncated(l) => l.get(),
        }
    }
}

impl std::fmt::Display for LenError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Truncated(len) => {
                write!(f, "message is truncated, at least {} bytes are needed", len)
            }
            Self::ParseError => f.write_str("message is invalid or exceeds size limits"),
        }
    }
}

impl std::error::Error for LenError {}
42+
43+
impl MessageLen {
44+
/// New parser with default limits
45+
///
46+
/// If you have all MessagePack data in memory already, you can use [`MessageLen::len_of`].
47+
/// If you're reading data in a streaming fashion, you can feed chunks of data
48+
/// to [`MessageLen::incremental_len`].
49+
pub fn new() -> Self {
50+
Self::with_limits(1024, (u32::MAX as usize).min(isize::MAX as usize / 2))
51+
}
52+
53+
/// * `max_depth` limits nesting of arrays and maps
54+
///
55+
/// * `max_len` is maximum size of any string, byte string, map, or array.
56+
/// For maps and arrays this is the number of items, not bytes.
57+
///
58+
/// Messages can be both deep and wide, being `max_depth` * `max_len` in size.
59+
/// You should also limit the maximum byte size of the message (outside of this parser).
60+
pub fn with_limits(max_depth: usize, max_len: usize) -> Self {
61+
Self {
62+
max_position: NonZeroUsize::new(1).unwrap(),
63+
position: 0,
64+
current_depth: 0,
65+
max_depth: max_depth.min(u16::MAX as _) as u16,
66+
max_len: max_len.min(u32::MAX as _) as u32,
67+
sequences_wip: Vec::new(),
68+
wip: Some(WIP::NextMarker),
69+
}
70+
}
71+
72+
/// Parse the entire message to find if it's complete, and what is its serialized size in bytes.
73+
///
74+
/// If it returns `Ok(len)`, then the first `len` bytes of the given slice
75+
/// parse as a single MessagePack object.
76+
/// The length may be shorter than the slice given (extra data is gracefully ignored).
77+
///
78+
/// `Err(LenError::Truncated(len))` means that the the object is incomplete, the slice is truncated,
79+
/// and it would need *at least* this many bytes to parse.
80+
/// The `len` is always the lower bound, and never exceeds actual message length.
81+
///
82+
/// `Err(LenError::ParseError)` — the end of the message is unknown.
83+
///
84+
/// Don't call this function in a loop. Use [`MessageLen::incremental_len`] instead.
85+
pub fn len_of(complete_message: &[u8]) -> Result<usize, LenError> {
86+
Self::with_limits(1024, 1<<30).incremental_len(&mut complete_message.as_ref())
87+
}
88+
89+
/// Parse more bytes, and re-evaluate required message length.
90+
///
91+
/// This function is stateful and keeps "appending" the data to its evaluation.
92+
///
93+
/// * `Ok(len)` — size of the whole MessagePack object, in bytes, starting at the beginning
94+
/// of all data given to this function, including previous calls (not just this slice).
95+
/// The object's data may end before the end of this slice. In such case the extra bytes
96+
/// are gracefully ignored, and have not been parsed.
97+
///
98+
/// * `Err(LenError::Truncated(len))` — all bytes of this slice have been consumed,
99+
/// and that was still not enough. The object needs at least `len` bytes in total
100+
/// (counting from the start of all data given to this function, not just this slice).
101+
/// The `len` is always the lower bound, and never exceeds actual message length,
102+
/// so it's safe to read the additional bytes without overshooting the end of the message.
103+
///
104+
/// * `Err(LenError::ParseError)` — the end of the message cannot be determined, and this
105+
/// is a non-recoverable error. Any further calls to this function may return nonsense.
106+
pub fn incremental_len(&mut self, mut next_message_fragment: &[u8]) -> Result<usize, LenError> {
107+
let data = &mut next_message_fragment;
108+
let wip = match self.wip.take() {
109+
Some(wip) => wip,
110+
None => return Ok(self.position), // must have succeded already
111+
};
112+
match wip {
113+
WIP::Data(Data { bytes_left }) => self.skip_data(data, bytes_left.get()),
114+
WIP::MarkerLen(wip) => self.read_marker_with_len(data, wip),
115+
WIP::NextMarker => self.read_one_item(data),
116+
WIP::LimitExceeded => {
117+
self.wip = Some(WIP::LimitExceeded); // put it back!
118+
return Err(LenError::ParseError);
119+
},
120+
}.ok_or(LenError::Truncated(self.max_position))?;
121+
122+
while let Some(seq) = self.sequences_wip.pop() {
123+
self.current_depth = seq.depth;
124+
debug_assert!(self.wip.is_none());
125+
self.read_sequence(data, seq.items_left.get() - 1).ok_or(LenError::Truncated(self.max_position))?;
126+
}
127+
debug_assert!(self.wip.is_none());
128+
debug_assert!(self.max_position.get() <= self.position);
129+
Ok(self.position)
130+
}
131+
132+
/// Forget all the state. The next call to `incremental_len` will assume it's the start of a new message.
133+
pub fn reset(&mut self) {
134+
self.max_position = NonZeroUsize::new(1).unwrap();
135+
self.position = 0;
136+
self.current_depth = 0;
137+
self.sequences_wip.clear();
138+
self.wip = Some(WIP::NextMarker);
139+
}
140+
141+
fn read_one_item(&mut self, data: &mut &[u8]) -> Option<()> {
142+
debug_assert!(self.wip.is_none());
143+
let marker = self.read_marker(data)?;
144+
match marker {
145+
Marker::FixPos(_) => Some(()),
146+
Marker::FixMap(len) => self.read_sequence(data, u32::from(len) * 2),
147+
Marker::FixArray(len) => self.read_sequence(data, u32::from(len)),
148+
Marker::FixStr(len) => self.skip_data(data, len.into()),
149+
Marker::Null |
150+
Marker::Reserved |
151+
Marker::False |
152+
Marker::True => Some(()),
153+
Marker::Str8 |
154+
Marker::Str16 |
155+
Marker::Str32 |
156+
Marker::Bin8 |
157+
Marker::Bin16 |
158+
Marker::Bin32 |
159+
Marker::Array16 |
160+
Marker::Array32 |
161+
Marker::Map16 |
162+
Marker::Map32 => self.read_marker_with_len(data, MarkerLen { marker, buf: [0; 4], has: 0 }),
163+
Marker::Ext8 |
164+
Marker::Ext16 |
165+
Marker::Ext32 => todo!(),
166+
Marker::F32 => self.skip_data(data, 4),
167+
Marker::F64 => self.skip_data(data, 8),
168+
Marker::U8 => self.skip_data(data, 1),
169+
Marker::U16 => self.skip_data(data, 2),
170+
Marker::U32 => self.skip_data(data, 4),
171+
Marker::U64 => self.skip_data(data, 8),
172+
Marker::I8 => self.skip_data(data, 1),
173+
Marker::I16 => self.skip_data(data, 2),
174+
Marker::I32 => self.skip_data(data, 4),
175+
Marker::I64 => self.skip_data(data, 8),
176+
Marker::FixExt1 |
177+
Marker::FixExt2 |
178+
Marker::FixExt4 |
179+
Marker::FixExt8 |
180+
Marker::FixExt16 => todo!(),
181+
Marker::FixNeg(_) => Some(()),
182+
}
183+
}
184+
185+
fn read_marker_with_len(&mut self, data: &mut &[u8], mut wip: MarkerLen) -> Option<()> {
186+
let size = wip.size();
187+
debug_assert!(wip.has < size && size > 0 && size <= 4);
188+
let dest = &mut wip.buf[0..size as usize];
189+
let wanted = dest.len().checked_sub(wip.has as _)?;
190+
191+
let taken = self.take_bytes(data, wanted as u32);
192+
dest[wip.has as usize..][..taken.len()].copy_from_slice(taken);
193+
wip.has += taken.len() as u8;
194+
if wip.has < size {
195+
return self.fail(WIP::MarkerLen(wip));
196+
}
197+
let len = match dest.len() {
198+
1 => dest[0].into(),
199+
2 => u16::from_be_bytes(dest.try_into().unwrap()).into(),
200+
4 => u32::from_be_bytes(dest.try_into().unwrap()),
201+
_ => {
202+
debug_assert!(false);
203+
return None
204+
},
205+
};
206+
if len >= self.max_len {
207+
return self.fail(WIP::LimitExceeded);
208+
}
209+
match wip.marker {
210+
Marker::Bin8 |
211+
Marker::Bin16 |
212+
Marker::Bin32 |
213+
Marker::Str8 |
214+
Marker::Str16 |
215+
Marker::Str32 => self.skip_data(data, len),
216+
Marker::Ext8 |
217+
Marker::Ext16 |
218+
Marker::Ext32 => todo!(),
219+
Marker::Array16 |
220+
Marker::Array32 => self.read_sequence(data, len),
221+
Marker::Map16 |
222+
Marker::Map32 => {
223+
if let Some(len) = len.checked_mul(2).filter(|&l| l < self.max_len) {
224+
self.read_sequence(data, len)
225+
} else {
226+
self.fail(WIP::LimitExceeded)
227+
}
228+
},
229+
_ => {
230+
debug_assert!(false);
231+
None
232+
}
233+
}
234+
}
235+
236+
fn read_sequence(&mut self, data: &mut &[u8], mut items_left: u32) -> Option<()> {
237+
self.current_depth += 1;
238+
if self.current_depth > self.max_depth {
239+
return self.fail(WIP::LimitExceeded);
240+
}
241+
while let Some(non_zero) = NonZeroU32::new(items_left) {
242+
let position_before_item = self.position;
243+
self.read_one_item(data).or_else(|| {
244+
self.set_max_position(position_before_item + items_left as usize);
245+
// -1, because it will increase depth again when resumed
246+
self.sequences_wip.push(Seq { items_left: non_zero, depth: self.current_depth-1 });
247+
None
248+
})?;
249+
items_left -= 1;
250+
}
251+
debug_assert!(self.current_depth > 0);
252+
self.current_depth -= 1;
253+
Some(())
254+
}
255+
256+
fn skip_data(&mut self, data: &mut &[u8], wanted: u32) -> Option<()> {
257+
let taken = self.take_bytes(data, wanted);
258+
if let Some(bytes_left) = NonZeroU32::new(wanted - taken.len() as u32) {
259+
debug_assert!(data.is_empty());
260+
self.fail(WIP::Data(Data { bytes_left }))
261+
} else {
262+
Some(())
263+
}
264+
}
265+
266+
fn read_marker(&mut self, data: &mut &[u8]) -> Option<Marker> {
267+
let Some((&b, rest)) = data.split_first() else {
268+
debug_assert!(data.is_empty());
269+
return self.fail(WIP::NextMarker);
270+
};
271+
self.position += 1;
272+
*data = rest;
273+
Some(Marker::from_u8(b))
274+
}
275+
276+
fn set_max_position(&mut self, position: usize) {
277+
self.max_position = NonZeroUsize::new(self.max_position.get().max(position)).unwrap();
278+
}
279+
280+
/// May return less than requested
281+
fn take_bytes<'data>(&mut self, data: &mut &'data [u8], wanted: u32) -> &'data [u8] {
282+
let (taken, rest) = data.split_at(data.len().min(wanted as usize));
283+
self.position += taken.len();
284+
*data = rest;
285+
taken
286+
}
287+
288+
#[inline(always)]
289+
fn fail<T>(&mut self, wip: WIP) -> Option<T> {
290+
debug_assert!(self.wip.is_none());
291+
let pos = match self.wip.insert(wip) {
292+
WIP::NextMarker => self.position + 1,
293+
WIP::Data(Data { bytes_left }) => self.position + bytes_left.get() as usize,
294+
WIP::MarkerLen(m) => self.position + (m.size() - m.has) as usize,
295+
WIP::LimitExceeded => 0,
296+
};
297+
self.set_max_position(pos);
298+
None
299+
}
300+
}
301+
302+
enum WIP {
303+
NextMarker,
304+
Data(Data),
305+
MarkerLen(MarkerLen),
306+
LimitExceeded,
307+
}
308+
309+
/// An array or map that was interrupted before all of its items were read.
struct Seq {
    /// Items not finished yet, counting the one that was interrupted
    items_left: NonZeroU32,
    /// Nesting depth to restore before reading of the sequence is resumed
    depth: u16,
}
310+
/// Payload bytes that are being skipped without inspection.
struct Data {
    /// Number of bytes that still have to be consumed
    bytes_left: NonZeroU32,
}
311+
/// A marker together with the (possibly incomplete) length field that follows it.
struct MarkerLen {
    /// Marker that announced the length field
    marker: Marker,
    /// Bytes of the length field collected so far; only the first `has` bytes are filled
    buf: [u8; 4],
    /// How many bytes of the length field have been read
    has: u8,
}
312+
313+
impl MarkerLen {
314+
fn size(&self) -> u8 {
315+
match self.marker {
316+
Marker::Bin8 => 1,
317+
Marker::Bin16 => 2,
318+
Marker::Bin32 => 4,
319+
Marker::Ext8 => 1,
320+
Marker::Ext16 => 2,
321+
Marker::Ext32 => 4,
322+
Marker::Str8 => 1,
323+
Marker::Str16 => 2,
324+
Marker::Str32 => 4,
325+
Marker::Array16 => 2,
326+
Marker::Array32 => 4,
327+
Marker::Map16 => 2,
328+
Marker::Map32 => 4,
329+
_ => unimplemented!(),
330+
}
331+
}
332+
}

rmp/src/decode/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ mod sint;
1515
mod str;
1616
mod uint;
1717

18+
#[cfg(feature = "std")]
19+
mod est;
20+
#[cfg(feature = "std")]
21+
pub use est::{MessageLen, LenError};
22+
1823
pub use self::dec::{read_f32, read_f64};
1924
pub use self::ext::{
2025
read_ext_meta, read_fixext1, read_fixext16, read_fixext2, read_fixext4, read_fixext8, ExtMeta,

0 commit comments

Comments
 (0)