Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,11 @@ impl EncodingMask {
Self(mask)
}

/// Mark the given [`Encoding`] as present in this mask.
pub fn insert(&mut self, val: Encoding) {
self.0 |= 1 << (val as i32);
}

/// Test if a given [`Encoding`] is present in this mask.
pub fn is_set(&self, val: Encoding) -> bool {
self.0 & (1 << (val as i32)) != 0
Expand Down
63 changes: 34 additions & 29 deletions parquet/src/column/reader/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use bytes::Bytes;

use crate::basic::Encoding;
use crate::basic::{Encoding, EncodingMask};
use crate::data_type::DataType;
use crate::encodings::{
decoding::{Decoder, DictDecoder, PlainDecoder, get_decoder},
Expand Down Expand Up @@ -68,9 +66,9 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
}

pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
/// Read up to `num_levels` definition levels into `out`
/// Read up to `num_levels` definition levels into `out`.
///
/// Returns the number of values skipped, and the number of levels skipped
/// Returns the number of values read, and the number of levels read.
///
/// # Panics
///
Expand All @@ -81,9 +79,9 @@ pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
num_levels: usize,
) -> Result<(usize, usize)>;

/// Skips over `num_levels` definition levels
/// Skips over `num_levels` definition levels.
///
/// Returns the number of values skipped, and the number of levels skipped
/// Returns the number of values skipped, and the number of levels skipped.
fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>;
}

Expand Down Expand Up @@ -136,14 +134,22 @@ pub trait ColumnValueDecoder {
fn skip_values(&mut self, num_values: usize) -> Result<usize>;
}

/// Bucket-based storage for decoder instances keyed by `Encoding`.
///
/// This replaces `HashMap` lookups with direct indexing to avoid hashing overhead in the
/// hot decoding paths.
const ENCODING_SLOTS: usize = Encoding::BYTE_STREAM_SPLIT as usize + 1;

/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
pub struct ColumnValueDecoderImpl<T: DataType> {
descr: ColumnDescPtr,

current_encoding: Option<Encoding>,

// Cache of decoders for existing encodings
decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
/// Cache of decoders for existing encodings.
/// Uses `EncodingMask` and dense storage keyed by encoding discriminant.
decoder_mask: EncodingMask,
decoders: [Option<Box<dyn Decoder<T>>>; ENCODING_SLOTS],
}

impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
Expand All @@ -153,7 +159,8 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
Self {
descr: descr.clone(),
current_encoding: None,
decoders: Default::default(),
decoder_mask: EncodingMask::default(),
decoders: std::array::from_fn(|_| None),
}
}

Expand All @@ -168,7 +175,7 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
encoding = Encoding::RLE_DICTIONARY
}

if self.decoders.contains_key(&encoding) {
if self.decoder_mask.is_set(encoding) {
return Err(general_err!("Column cannot have more than one dictionary"));
}

Expand All @@ -178,7 +185,8 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {

let mut decoder = DictDecoder::new();
decoder.set_dict(Box::new(dictionary))?;
self.decoders.insert(encoding, Box::new(decoder));
self.decoders[encoding as usize] = Some(Box::new(decoder));
self.decoder_mask.insert(encoding);
Ok(())
} else {
Err(nyi_err!(
Expand All @@ -195,25 +203,24 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
num_levels: usize,
num_values: Option<usize>,
) -> Result<()> {
use std::collections::hash_map::Entry;

if encoding == Encoding::PLAIN_DICTIONARY {
encoding = Encoding::RLE_DICTIONARY;
}

let decoder = if encoding == Encoding::RLE_DICTIONARY {
self.decoders
.get_mut(&encoding)
self.decoders[encoding as usize]
.as_mut()
.expect("Decoder for dict should have been set")
} else {
// Search cache for data page decoder
match self.decoders.entry(encoding) {
Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(v) => {
let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
v.insert(data_decoder)
}
let slot = encoding as usize;
if self.decoders[slot].is_none() {
let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
self.decoders[slot] = Some(data_decoder);
self.decoder_mask.insert(encoding);
}
self.decoders[slot]
.as_mut()
.expect("decoder should have been inserted")
};

decoder.set_data(data, num_values.unwrap_or(num_levels))?;
Expand All @@ -226,9 +233,8 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
.current_encoding
.expect("current_encoding should be set");

let current_decoder = self
.decoders
.get_mut(&encoding)
let current_decoder = self.decoders[encoding as usize]
.as_mut()
.unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));

// TODO: Push vec into decoder (#5177)
Expand All @@ -244,9 +250,8 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
.current_encoding
.expect("current_encoding should be set");

let current_decoder = self
.decoders
.get_mut(&encoding)
let current_decoder = self.decoders[encoding as usize]
.as_mut()
.unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));

current_decoder.skip(num_values)
Expand Down
Loading