33 changes: 28 additions & 5 deletions parquet/src/column/reader.rs
@@ -569,11 +569,15 @@ fn parse_v1_level(
     match encoding {
         Encoding::RLE => {
             let i32_size = std::mem::size_of::<i32>();
-            let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
-            Ok((
-                i32_size + data_size,
-                buf.slice(i32_size..i32_size + data_size),
-            ))
+            if i32_size <= buf.len() {
+                let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
+                let end =
+                    i32_size.checked_add(data_size).ok_or(general_err!("invalid level length"))?;
+                if end <= buf.len() {
+                    return Ok((end, buf.slice(i32_size..end)));
+                }
+            }
+            Err(general_err!("not enough data to read levels"))
Contributor:
This is definitely an improvement over the existing code, but it opens a question:

Given that we're reading bytes from a byte buffer, it seems like we must expect to hit this situation at least occasionally? And the correct response is to fetch more bytes, not fail? Is there some mechanism for handling that higher up in the call stack? Or is there some reason it should be impossible for this code to run off the end of the buffer?

Contributor:

Also -- it seems like read_num_bytes should do bounds checking internally and return Option<T>, so buffer overrun is obvious at the call site instead of a hidden panic footgun? The method has a half dozen other callers, and they all need to do manual bounds checking, in various ways and with varying degrees of safety. In particular, parquet/src/data_type.rs has two call sites that lack any visible bounds checks.

Contributor:

In this particular instance we're reading a buffer that should contain an entire page of data. If it doesn't, that likely points to a problem with the metadata.

Changes to read_num_bytes would likely need more careful consideration as I suspect it might be used in some performance critical sections.

Contributor:

I think changing read_num_bytes to return Option would be a good idea, as that would essentially replace the assert in that method. That assert is currently redundant if the caller already checks the bounds, so those two checks would be replaced by one against the option. In the BitReader that might actually improve performance.
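For illustration, a minimal sketch of the Option-returning variant under discussion (try_read_num_bytes is a hypothetical name; the T: FromBytes bound is assumed from the current signature):

    /// Bounds-checked variant of read_num_bytes: returns None instead of
    /// panicking when src holds fewer than size bytes.
    pub fn try_read_num_bytes<T: FromBytes>(size: usize, src: &[u8]) -> Option<T> {
        if size > src.len() {
            return None;
        }
        Some(read_num_bytes::<T>(size, src))
    }

Callers such as parse_v1_level could then fold their manual length checks into a single ok_or on the returned Option.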

         }
         #[allow(deprecated)]
         Encoding::BIT_PACKED => {
@@ -597,6 +601,25 @@ mod tests {
     use crate::util::test_common::page_util::InMemoryPageReader;
     use crate::util::test_common::rand_gen::make_pages;
 
+    #[test]
+    fn test_parse_v1_level_invalid_length() {
+        // Say length is 10, but buffer is only 4
+        let buf = Bytes::from(vec![10, 0, 0, 0]);
+        let err = parse_v1_level(1, 100, Encoding::RLE, buf).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: not enough data to read levels"
+        );
+
+        // Say length is 4, but buffer is only 3
+        let buf = Bytes::from(vec![4, 0, 0]);
+        let err = parse_v1_level(1, 100, Encoding::RLE, buf).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: not enough data to read levels"
+        );
+    }
+
     const NUM_LEVELS: usize = 128;
     const NUM_PAGES: usize = 2;
     const MAX_DEF_LEVEL: i16 = 5;
20 changes: 20 additions & 0 deletions parquet/src/encodings/decoding.rs
@@ -381,7 +381,17 @@ impl<T: DataType> DictDecoder<T> {
 impl<T: DataType> Decoder<T> for DictDecoder<T> {
     fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
         // First byte in `data` is bit width
+        if data.len() < 1 {
+            return Err(eof_err!("Not enough bytes to decode bit_width"));
+        }
+
         let bit_width = data.as_ref()[0];
+        if bit_width > 32 {
+            return Err(general_err!(
+                "Invalid or corrupted RLE bit width {}. Max allowed is 32",
+                bit_width
+            ));
+        }
         let mut rle_decoder = RleDecoder::new(bit_width);
         rle_decoder.set_data(data.slice(1..));
         self.num_values = num_values;
@@ -1380,6 +1390,16 @@ mod tests {
         test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 6, 4, &[]);
     }
 
+    #[test]
+    fn test_dict_decoder_empty_data() {
+        let mut decoder = DictDecoder::<Int32Type>::new();
+        let err = decoder.set_data(Bytes::new(), 10).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: Not enough bytes to decode bit_width"
+        );
+    }
+
     fn test_plain_decode<T: DataType>(
         data: Bytes,
         num_values: usize,
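A companion test for the new upper bound might look like this (a sketch, not part of the diff; it assumes the same test module as test_dict_decoder_empty_data):

    #[test]
    fn test_dict_decoder_invalid_bit_width() {
        let mut decoder = DictDecoder::<Int32Type>::new();
        // First byte (the bit width) is 33, which exceeds the maximum of 32
        let err = decoder.set_data(Bytes::from(vec![33u8]), 10).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Parquet error: Invalid or corrupted RLE bit width 33. Max allowed is 32"
        );
    }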
5 changes: 4 additions & 1 deletion parquet/src/encodings/rle.rs
@@ -513,7 +513,10 @@ impl RleDecoder {
                 self.rle_left = (indicator_value >> 1) as u32;
                 let value_width = bit_util::ceil(self.bit_width as usize, 8);
                 self.current_value = bit_reader.get_aligned::<u64>(value_width);
-                assert!(self.current_value.is_some());
+                assert!(
+                    self.current_value.is_some(),
+                    "parquet_data_error: not enough data for RLE decoding"
+                );
             }
             true
         } else {
45 changes: 45 additions & 0 deletions parquet/src/file/reader.rs
@@ -124,11 +124,25 @@ impl ChunkReader for Bytes {
 
     fn get_read(&self, start: u64) -> Result<Self::T> {
         let start = start as usize;
+        if start > self.len() {
+            return Err(eof_err!(
+                "Expected to read at offset {start}, while file has length {}",
+                self.len()
+            ));
+        }
         Ok(self.slice(start..).reader())
     }
 
     fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
         let start = start as usize;
+        if start > self.len() || start + length > self.len() {
+            return Err(eof_err!(
+                "Expected to read {} bytes at offset {}, while file has length {}",
+                length,
+                start,
+                self.len()
+            ));
+        }
         Ok(self.slice(start..start + length))
     }
 }
@@ -274,3 +288,34 @@ impl Iterator for FilePageIterator {
 }
 
 impl PageIterator for FilePageIterator {}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_bytes_chunk_reader_get_read_out_of_bounds() {
+        let data = Bytes::from(vec![0, 1, 2, 3]);
+        let err = data.get_read(5).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: Expected to read at offset 5, while file has length 4"
+        );
+    }
+
+    #[test]
+    fn test_bytes_chunk_reader_get_bytes_out_of_bounds() {
+        let data = Bytes::from(vec![0, 1, 2, 3]);
+        let err = data.get_bytes(5, 1).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: Expected to read 1 bytes at offset 5, while file has length 4"
+        );
+
+        let err = data.get_bytes(2, 3).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: Expected to read 3 bytes at offset 2, while file has length 4"
+        );
+    }
+}
34 changes: 33 additions & 1 deletion parquet/src/file/serialized_reader.rs
@@ -392,6 +392,9 @@ pub(crate) fn decode_page(
     let buffer = match decompressor {
         Some(decompressor) if can_decompress => {
             let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
+            if offset > buffer.len() || offset > uncompressed_page_size {
+                return Err(general_err!("Invalid page header"));
+            }
             let decompressed_size = uncompressed_page_size - offset;
             let mut decompressed = Vec::with_capacity(uncompressed_page_size);
             decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
@@ -458,7 +461,7 @@ pub(crate) fn decode_page(
         }
         _ => {
             // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-            unimplemented!("Page type {:?} is not supported", page_header.r#type)
+            return Err(general_err!("Page type {:?} is not supported", page_header.r#type));
         }
     };
 
@@ -1139,6 +1142,35 @@ mod tests {
 
     use super::*;
 
+    #[test]
+    fn test_decode_page_invalid_offset() {
+        use crate::file::metadata::thrift_gen::DataPageHeaderV2;
+
+        let mut page_header = PageHeader::default();
+        page_header.r#type = PageType::DATA_PAGE_V2;
+        page_header.uncompressed_page_size = 10;
+        page_header.compressed_page_size = 10;
+        let mut data_page_header_v2 = DataPageHeaderV2::default();
+        data_page_header_v2.definition_levels_byte_length = 11; // offset > uncompressed_page_size
+        page_header.data_page_header_v2 = Some(data_page_header_v2);
+
+        let buffer = Bytes::new();
+        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
+        assert_eq!(err.to_string(), "Parquet error: Invalid page header");
Contributor (commenting on lines +1149 to +1159):
We don't implement Default for the thrift structs, nor Debug for Page. Instead you'll have to instantiate the page header explicitly. Something like:

    let page_header = PageHeader {
        r#type: PageType::DATA_PAGE_V2,
        uncompressed_page_size: 10,
        compressed_page_size: 10,
        data_page_header: None,
        index_page_header: None,
        dictionary_page_header: None,
        crc: None,
        data_page_header_v2: Some(DataPageHeaderV2 {
            num_nulls: 0,
            num_rows: 0,
            num_values: 0,
            encoding: Encoding::PLAIN,
            definition_levels_byte_length: 11,
            repetition_levels_byte_length: 0,
            is_compressed: None,
            statistics: None,
        }),
    };

    let buffer = Bytes::new();
    match decode_page(page_header, buffer, Type::INT32, None) {
        Err(e) => assert_eq!(e.to_string(), "Parquet error: Invalid page header"),
        _ => panic!("should have failed"),
    }

+    }
+
+    #[test]
+    fn test_decode_unsupported_page() {
+        let mut page_header = PageHeader::default();
+        page_header.r#type = PageType::INDEX_PAGE;
+        let buffer = Bytes::new();
+        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: Page type INDEX_PAGE is not supported"
+        );
+    }
+
     #[test]
     fn test_cursor_and_file_has_the_same_behaviour() {
         let mut buf: Vec<u8> = Vec::new();
24 changes: 13 additions & 11 deletions parquet/src/schema/types.rs
@@ -1348,17 +1348,19 @@ fn schema_from_array_helper<'a>(
         .with_logical_type(logical_type)
         .with_fields(fields)
         .with_id(field_id);
-    if let Some(rep) = repetition {
-        // Sometimes parquet-cpp and parquet-mr set repetition level REQUIRED or
-        // REPEATED for root node.
-        //
-        // We only set repetition for group types that are not top-level message
-        // type. According to parquet-format:
-        //   Root of the schema does not have a repetition_type.
-        //   All other types must have one.
-        if !is_root_node {
-            builder = builder.with_repetition(rep);
-        }
+
+    // Sometimes parquet-cpp and parquet-mr set repetition level REQUIRED or
+    // REPEATED for root node.
+    //
+    // We only set repetition for group types that are not top-level message
+    // type. According to parquet-format:
+    //   Root of the schema does not have a repetition_type.
+    //   All other types must have one.
+    if !is_root_node {
+        let Some(rep) = repetition else {
+            return Err(general_err!("Repetition level must be defined for non-root types"));
+        };
+        builder = builder.with_repetition(rep);
     }
     Ok((next_index, Arc::new(builder.build().unwrap())))
Contributor:

How do we know the unwrap is safe?

Contributor:

build never returns an Err 😉. But good point, could replace unwrap with ?.
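For illustration, the change sketched in that reply (assuming build keeps returning Result) would make the final line:

    Ok((next_index, Arc::new(builder.build()?)))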

}
6 changes: 4 additions & 2 deletions parquet/tests/arrow_reader/bad_data.rs
@@ -84,10 +84,12 @@ fn test_parquet_1481() {
 }
 
 #[test]
-#[should_panic(expected = "assertion failed: self.current_value.is_some()")]
 fn test_arrow_gh_41321() {
     let err = read_file("ARROW-GH-41321.parquet").unwrap_err();
-    assert_eq!(err.to_string(), "TBD (currently panics)");
+    assert_eq!(
+        err.to_string(),
+        "External: Parquet argument error: Parquet error: Invalid or corrupted RLE bit width 254. Max allowed is 32"
+    );
 }

#[test]