diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs index 23517f05df11..f18b296c1c65 100644 --- a/parquet/src/column/page.rs +++ b/parquet/src/column/page.rs @@ -31,7 +31,7 @@ use crate::file::statistics::{Statistics, page_stats_to_thrift}; /// List of supported pages. /// These are 1-to-1 mapped from the equivalent Thrift definitions, except `buf` which /// used to store uncompressed bytes of the page. -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum Page { /// Data page Parquet format v1. DataPage { diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index b8ff38efa3c4..ebde79e6a7f2 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -569,11 +569,16 @@ fn parse_v1_level( match encoding { Encoding::RLE => { let i32_size = std::mem::size_of::(); - let data_size = read_num_bytes::(i32_size, buf.as_ref()) as usize; - Ok(( - i32_size + data_size, - buf.slice(i32_size..i32_size + data_size), - )) + if i32_size <= buf.len() { + let data_size = read_num_bytes::(i32_size, buf.as_ref()) as usize; + let end = i32_size + .checked_add(data_size) + .ok_or(general_err!("invalid level length"))?; + if end <= buf.len() { + return Ok((end, buf.slice(i32_size..end))); + } + } + Err(general_err!("not enough data to read levels")) } #[allow(deprecated)] Encoding::BIT_PACKED => { @@ -597,6 +602,25 @@ mod tests { use crate::util::test_common::page_util::InMemoryPageReader; use crate::util::test_common::rand_gen::make_pages; + #[test] + fn test_parse_v1_level_invalid_length() { + // Say length is 10, but buffer is only 4 + let buf = Bytes::from(vec![10, 0, 0, 0]); + let err = parse_v1_level(1, 100, Encoding::RLE, buf).unwrap_err(); + assert_eq!( + err.to_string(), + "Parquet error: not enough data to read levels" + ); + + // Say length is 4, but buffer is only 3 + let buf = Bytes::from(vec![4, 0, 0]); + let err = parse_v1_level(1, 100, Encoding::RLE, buf).unwrap_err(); + assert_eq!( + err.to_string(), + "Parquet error: not enough data to read levels" + ); + } + const NUM_LEVELS: usize = 128; const NUM_PAGES: usize = 2; const MAX_DEF_LEVEL: i16 = 5; diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs index 91b31dbdfcd2..b5ec5d1c1af5 100644 --- a/parquet/src/encodings/decoding.rs +++ b/parquet/src/encodings/decoding.rs @@ -381,7 +381,17 @@ impl DictDecoder { impl Decoder for DictDecoder { fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> { // First byte in `data` is bit width + if data.is_empty() { + return Err(eof_err!("Not enough bytes to decode bit_width")); + } + let bit_width = data.as_ref()[0]; + if bit_width > 32 { + return Err(general_err!( + "Invalid or corrupted RLE bit width {}. Max allowed is 32", + bit_width + )); + } let mut rle_decoder = RleDecoder::new(bit_width); rle_decoder.set_data(data.slice(1..)); self.num_values = num_values; @@ -1380,6 +1390,13 @@ mod tests { test_plain_skip::(Bytes::from(data_bytes), 3, 6, 4, &[]); } + #[test] + fn test_dict_decoder_empty_data() { + let mut decoder = DictDecoder::::new(); + let err = decoder.set_data(Bytes::new(), 10).unwrap_err(); + assert_eq!(err.to_string(), "EOF: Not enough bytes to decode bit_width"); + } + fn test_plain_decode( data: Bytes, num_values: usize, diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index db8227fcac3a..41c050132064 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -513,7 +513,10 @@ impl RleDecoder { self.rle_left = (indicator_value >> 1) as u32; let value_width = bit_util::ceil(self.bit_width as usize, 8); self.current_value = bit_reader.get_aligned::(value_width); - assert!(self.current_value.is_some()); + assert!( + self.current_value.is_some(), + "parquet_data_error: not enough data for RLE decoding" + ); } true } else { diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs index 61af21a68ec1..3adf10fac220 100644 --- a/parquet/src/file/reader.rs +++ b/parquet/src/file/reader.rs @@ -124,11 +124,25 @@ impl ChunkReader for Bytes { fn get_read(&self, start: u64) -> Result { let start = start as usize; + if start > self.len() { + return Err(eof_err!( + "Expected to read at offset {start}, while file has length {}", + self.len() + )); + } Ok(self.slice(start..).reader()) } fn get_bytes(&self, start: u64, length: usize) -> Result { let start = start as usize; + if start > self.len() || start + length > self.len() { + return Err(eof_err!( + "Expected to read {} bytes at offset {}, while file has length {}", + length, + start, + self.len() + )); + } Ok(self.slice(start..start + length)) } } @@ -274,3 +288,34 @@ impl Iterator for FilePageIterator { } impl PageIterator for FilePageIterator {} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_bytes_chunk_reader_get_read_out_of_bounds() { + let data = Bytes::from(vec![0, 1, 2, 3]); + let err = data.get_read(5).unwrap_err(); + assert_eq!( + err.to_string(), + "EOF: Expected to read at offset 5, while file has length 4" + ); + } + + #[test] + fn test_bytes_chunk_reader_get_bytes_out_of_bounds() { + let data = Bytes::from(vec![0, 1, 2, 3]); + let err = data.get_bytes(5, 1).unwrap_err(); + assert_eq!( + err.to_string(), + "EOF: Expected to read 1 bytes at offset 5, while file has length 4" + ); + + let err = data.get_bytes(2, 3).unwrap_err(); + assert_eq!( + err.to_string(), + "EOF: Expected to read 3 bytes at offset 2, while file has length 4" + ); + } +} diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 6da5c39d745b..3f95ea9d4982 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -392,6 +392,9 @@ pub(crate) fn decode_page( let buffer = match decompressor { Some(decompressor) if can_decompress => { let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?; + if offset > buffer.len() || offset > uncompressed_page_size { + return Err(general_err!("Invalid page header")); + } let decompressed_size = uncompressed_page_size - offset; let mut decompressed = Vec::with_capacity(uncompressed_page_size); decompressed.extend_from_slice(&buffer.as_ref()[..offset]); @@ -458,7 +461,10 @@ pub(crate) fn decode_page( } _ => { // For unknown page type (e.g., INDEX_PAGE), skip and read next. - unimplemented!("Page type {:?} is not supported", page_header.r#type) + return Err(general_err!( + "Page type {:?} is not supported", + page_header.r#type + )); } }; @@ -1130,6 +1136,7 @@ mod tests { use crate::column::reader::ColumnReader; use crate::data_type::private::ParquetValueType; use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type}; + use crate::file::metadata::thrift::DataPageHeaderV2; #[allow(deprecated)] use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes}; use crate::file::writer::SerializedFileWriter; @@ -1139,6 +1146,72 @@ mod tests { use super::*; + #[test] + fn test_decode_page_invalid_offset() { + let page_header = PageHeader { + r#type: PageType::DATA_PAGE_V2, + uncompressed_page_size: 10, + compressed_page_size: 10, + data_page_header: None, + index_page_header: None, + dictionary_page_header: None, + crc: None, + data_page_header_v2: Some(DataPageHeaderV2 { + num_nulls: 0, + num_rows: 0, + num_values: 0, + encoding: Encoding::PLAIN, + definition_levels_byte_length: 11, + repetition_levels_byte_length: 0, + is_compressed: None, + statistics: None, + }), + }; + + let buffer = Bytes::new(); + let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err(); + assert!( + err.to_string() + .contains("DataPage v2 header contains implausible values") + ); + } + + #[test] + fn test_decode_unsupported_page() { + let mut page_header = PageHeader { + r#type: PageType::INDEX_PAGE, + uncompressed_page_size: 10, + compressed_page_size: 10, + data_page_header: None, + index_page_header: None, + dictionary_page_header: None, + crc: None, + data_page_header_v2: None, + }; + let buffer = Bytes::new(); + let err = decode_page(page_header.clone(), buffer.clone(), Type::INT32, None).unwrap_err(); + assert_eq!( + err.to_string(), + "Parquet error: Page type INDEX_PAGE is not supported" + ); + + page_header.data_page_header_v2 = Some(DataPageHeaderV2 { + num_nulls: 0, + num_rows: 0, + num_values: 0, + encoding: Encoding::PLAIN, + definition_levels_byte_length: 11, + repetition_levels_byte_length: 0, + is_compressed: None, + statistics: None, + }); + let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err(); + assert!( + err.to_string() + .contains("DataPage v2 header contains implausible values") + ); + } + #[test] fn test_cursor_and_file_has_the_same_behaviour() { let mut buf: Vec = Vec::new(); diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs index 1ae37d0a462f..de6f855685a6 100644 --- a/parquet/src/schema/types.rs +++ b/parquet/src/schema/types.rs @@ -1348,19 +1348,23 @@ fn schema_from_array_helper<'a>( .with_logical_type(logical_type) .with_fields(fields) .with_id(field_id); - if let Some(rep) = repetition { - // Sometimes parquet-cpp and parquet-mr set repetition level REQUIRED or - // REPEATED for root node. - // - // We only set repetition for group types that are not top-level message - // type. According to parquet-format: - // Root of the schema does not have a repetition_type. - // All other types must have one. - if !is_root_node { - builder = builder.with_repetition(rep); - } + + // Sometimes parquet-cpp and parquet-mr set repetition level REQUIRED or + // REPEATED for root node. + // + // We only set repetition for group types that are not top-level message + // type. According to parquet-format: + // Root of the schema does not have a repetition_type. + // All other types must have one. + if !is_root_node { + let Some(rep) = repetition else { + return Err(general_err!( + "Repetition level must be defined for non-root types" + )); + }; + builder = builder.with_repetition(rep); } - Ok((next_index, Arc::new(builder.build().unwrap()))) + Ok((next_index, Arc::new(builder.build()?))) } } } diff --git a/parquet/tests/arrow_reader/bad_data.rs b/parquet/tests/arrow_reader/bad_data.rs index 235f81812468..54c92976e41c 100644 --- a/parquet/tests/arrow_reader/bad_data.rs +++ b/parquet/tests/arrow_reader/bad_data.rs @@ -84,10 +84,12 @@ fn test_parquet_1481() { } #[test] -#[should_panic(expected = "assertion failed: self.current_value.is_some()")] fn test_arrow_gh_41321() { let err = read_file("ARROW-GH-41321.parquet").unwrap_err(); - assert_eq!(err.to_string(), "TBD (currently panics)"); + assert_eq!( + err.to_string(), + "External: Parquet argument error: Parquet error: Invalid or corrupted RLE bit width 254. Max allowed is 32" + ); } #[test]