diff --git a/parquet/benches/row_selection_state.rs b/parquet/benches/row_selection_state.rs index 893f1c382cc..49c9e6d68ac 100644 --- a/parquet/benches/row_selection_state.rs +++ b/parquet/benches/row_selection_state.rs @@ -25,7 +25,7 @@ use bytes::Bytes; use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; use parquet::arrow::ArrowWriter; use parquet::arrow::arrow_reader::{ - ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionStrategy, RowSelector, + ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionPolicy, RowSelector, }; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; @@ -239,8 +239,7 @@ fn bench_over_lengths( &bench_input, |b, input| { b.iter(|| { - let total = - run_read(&input.parquet_data, &input.selection, mode.strategy()); + let total = run_read(&input.parquet_data, &input.selection, mode.policy()); hint::black_box(total); }); }, @@ -257,16 +256,12 @@ struct BenchInput { selection: RowSelection, } -fn run_read( - parquet_data: &Bytes, - selection: &RowSelection, - strategy: RowSelectionStrategy, -) -> usize { +fn run_read(parquet_data: &Bytes, selection: &RowSelection, policy: RowSelectionPolicy) -> usize { let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data.clone()) .unwrap() .with_batch_size(BATCH_SIZE) .with_row_selection(selection.clone()) - .with_row_selection_strategy(strategy) + .with_row_selection_policy(policy) .build() .unwrap(); @@ -465,10 +460,10 @@ impl BenchMode { } } - fn strategy(self) -> RowSelectionStrategy { + fn policy(self) -> RowSelectionPolicy { match self { - BenchMode::ReadSelector => RowSelectionStrategy::Selectors, - BenchMode::ReadMask => RowSelectionStrategy::Mask, + BenchMode::ReadSelector => RowSelectionPolicy::Selectors, + BenchMode::ReadMask => RowSelectionPolicy::Mask, } } } diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index af5ee7db4cc..6667afa0f5b 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ 
b/parquet/src/arrow/arrow_reader/mod.rs @@ -22,7 +22,7 @@ use arrow_array::{Array, RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; use arrow_select::filter::filter_record_batch; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; -pub use selection::{RowSelection, RowSelectionCursor, RowSelectionStrategy, RowSelector}; +pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector}; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -49,7 +49,7 @@ pub use read_plan::{ReadPlan, ReadPlanBuilder}; mod filter; pub mod metrics; mod read_plan; -mod selection; +pub(crate) mod selection; pub mod statistics; /// Builder for constructing Parquet readers that decode into [Apache Arrow] @@ -125,7 +125,7 @@ pub struct ArrowReaderBuilder { pub(crate) selection: Option, - pub(crate) selection_strategy: RowSelectionStrategy, + pub(crate) row_selection_policy: RowSelectionPolicy, pub(crate) limit: Option, @@ -148,7 +148,7 @@ impl Debug for ArrowReaderBuilder { .field("projection", &self.projection) .field("filter", &self.filter) .field("selection", &self.selection) - .field("selection_strategy", &self.selection_strategy) + .field("row_selection_policy", &self.row_selection_policy) .field("limit", &self.limit) .field("offset", &self.offset) .field("metrics", &self.metrics) @@ -168,7 +168,7 @@ impl ArrowReaderBuilder { projection: ProjectionMask::all(), filter: None, selection: None, - selection_strategy: RowSelectionStrategy::default(), + row_selection_policy: RowSelectionPolicy::default(), limit: None, offset: None, metrics: ArrowReaderMetrics::Disabled, @@ -218,9 +218,11 @@ impl ArrowReaderBuilder { } /// Configure how row selections should be materialised during execution - pub fn with_row_selection_strategy(self, strategy: RowSelectionStrategy) -> Self { + /// + /// See [`RowSelectionPolicy`] for more details + pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> 
Self { Self { - selection_strategy: strategy, + row_selection_policy: policy, ..self } } @@ -904,7 +906,7 @@ impl ParquetRecordBatchReaderBuilder { projection, mut filter, selection, - selection_strategy, + row_selection_policy, limit, offset, metrics, @@ -925,7 +927,7 @@ impl ParquetRecordBatchReaderBuilder { let mut plan_builder = ReadPlanBuilder::new(batch_size) .with_selection(selection) - .with_selection_strategy(selection_strategy); + .with_row_selection_policy(row_selection_policy); // Update selection based on any filters if let Some(filter) = filter.as_mut() { diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index d0cbe434c07..3c17a358f08 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -19,9 +19,10 @@ //! from a Parquet file use crate::arrow::array_reader::ArrayReader; +use crate::arrow::arrow_reader::selection::RowSelectionPolicy; +use crate::arrow::arrow_reader::selection::RowSelectionStrategy; use crate::arrow::arrow_reader::{ - ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, - RowSelectionStrategy, RowSelector, + ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector, }; use crate::errors::{ParquetError, Result}; use arrow_array::Array; @@ -32,10 +33,10 @@ use std::collections::VecDeque; #[derive(Clone, Debug)] pub struct ReadPlanBuilder { batch_size: usize, - /// Current to apply, includes all filters + /// Which rows to select. 
Includes the result of all filters applied so far selection: Option, - /// Strategy to use when materialising the row selection - selection_strategy: RowSelectionStrategy, + /// Policy to use when materializing the row selection + row_selection_policy: RowSelectionPolicy, } impl ReadPlanBuilder { @@ -44,7 +45,7 @@ impl ReadPlanBuilder { Self { batch_size, selection: None, - selection_strategy: RowSelectionStrategy::default(), + row_selection_policy: RowSelectionPolicy::default(), } } @@ -54,12 +55,19 @@ impl ReadPlanBuilder { self } - /// Configure the strategy to use when materialising the [`RowSelection`] - pub fn with_selection_strategy(mut self, strategy: RowSelectionStrategy) -> Self { - self.selection_strategy = strategy; + /// Configure the policy to use when materialising the [`RowSelection`] + /// + /// Defaults to [`RowSelectionPolicy::Auto`] + pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self { + self.row_selection_policy = policy; self } + /// Returns the current row selection policy + pub fn row_selection_policy(&self) -> &RowSelectionPolicy { + &self.row_selection_policy + } + /// Returns the current selection, if any pub fn selection(&self) -> Option<&RowSelection> { self.selection.as_ref() @@ -89,14 +97,14 @@ impl ReadPlanBuilder { self.selection.as_ref().map(|s| s.row_count()) } - /// Returns the preferred [`RowSelectionStrategy`] for materialising the current selection. + /// Returns the [`RowSelectionStrategy`] for this plan. /// /// Guarantees to return either `Selectors` or `Mask`, never `Auto`. - pub fn preferred_selection_strategy(&self) -> RowSelectionStrategy { - match self.selection_strategy { - RowSelectionStrategy::Selectors => RowSelectionStrategy::Selectors, - RowSelectionStrategy::Mask => RowSelectionStrategy::Mask, - RowSelectionStrategy::Auto { threshold, .. 
} => { + pub(crate) fn resolve_selection_strategy(&self) -> RowSelectionStrategy { + match self.row_selection_policy { + RowSelectionPolicy::Selectors => RowSelectionStrategy::Selectors, + RowSelectionPolicy::Mask => RowSelectionStrategy::Mask, + RowSelectionPolicy::Auto { threshold, .. } => { let selection = match self.selection.as_ref() { Some(selection) => selection, None => return RowSelectionStrategy::Selectors, @@ -172,12 +180,12 @@ impl ReadPlanBuilder { } // Preferred strategy must not be Auto - let selection_strategy = self.preferred_selection_strategy(); + let selection_strategy = self.resolve_selection_strategy(); let Self { batch_size, selection, - selection_strategy: _, + row_selection_policy: _, } = self; let selection = selection.map(|s| s.trim()); @@ -191,7 +199,6 @@ impl ReadPlanBuilder { RowSelectionCursor::new_mask_from_selectors(selectors) } RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors), - RowSelectionStrategy::Auto { .. } => unreachable!(), } }) .unwrap_or(RowSelectionCursor::new_all()); @@ -335,7 +342,7 @@ mod tests { let selection = RowSelection::from(vec![RowSelector::select(8)]); let builder = builder_with_selection(selection); assert_eq!( - builder.preferred_selection_strategy(), + builder.resolve_selection_strategy(), RowSelectionStrategy::Mask ); } @@ -343,13 +350,10 @@ mod tests { #[test] fn preferred_selection_strategy_prefers_selectors_when_threshold_small() { let selection = RowSelection::from(vec![RowSelector::select(8)]); - let builder = - builder_with_selection(selection).with_selection_strategy(RowSelectionStrategy::Auto { - threshold: 1, - safe_strategy: true, - }); + let builder = builder_with_selection(selection) + .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 1 }); assert_eq!( - builder.preferred_selection_strategy(), + builder.resolve_selection_strategy(), RowSelectionStrategy::Selectors ); } diff --git a/parquet/src/arrow/arrow_reader/selection.rs 
b/parquet/src/arrow/arrow_reader/selection.rs index 5fcf494454f..2ddf812f9c3 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -25,9 +25,12 @@ use std::cmp::Ordering; use std::collections::VecDeque; use std::ops::Range; -/// Strategy for materialising [`RowSelection`] during execution. +/// Policy for picking a strategy to materialise [`RowSelection`] during execution. +/// +/// Note that this is a user-provided preference, and the actual strategy used +/// may differ based on safety considerations (e.g. page skipping). #[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum RowSelectionStrategy { +pub enum RowSelectionPolicy { /// Use a queue of [`RowSelector`] values Selectors, /// Use a boolean mask to materialise the selection @@ -36,20 +39,27 @@ pub enum RowSelectionStrategy { Auto { /// Average selector length below which masks are preferred threshold: usize, - /// Fallback to selectors when mask would be unsafe (e.g. page skipping) - safe_strategy: bool, }, } -impl Default for RowSelectionStrategy { +impl Default for RowSelectionPolicy { fn default() -> Self { - Self::Auto { - threshold: 32, - safe_strategy: true, - } + Self::Auto { threshold: 32 } } } +/// Fully resolved strategy for materializing [`RowSelection`] during execution. +/// +/// This is determined from a combination of user preference (via [`RowSelectionPolicy`]) +/// and safety considerations (e.g. page skipping). 
+#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub(crate) enum RowSelectionStrategy { + /// Use a queue of [`RowSelector`] values + Selectors, + /// Use a boolean mask to materialise the selection + Mask, +} + /// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when /// scanning a parquet file #[derive(Debug, Clone, Copy, Eq, PartialEq)] diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 9f2b22ed29d..d990361d21b 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -486,7 +486,7 @@ impl ParquetRecordBatchStreamBuilder { projection, filter, selection, - selection_strategy, + row_selection_policy: selection_strategy, limit, offset, metrics, @@ -508,7 +508,7 @@ impl ParquetRecordBatchStreamBuilder { projection, filter, selection, - selection_strategy, + row_selection_policy: selection_strategy, batch_size, row_groups, limit, diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index c5d09b5e7e8..9b3893258ba 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -162,7 +162,6 @@ impl ParquetPushDecoderBuilder { /// Create a [`ParquetPushDecoder`] with the configured options pub fn build(self) -> Result { - let selection_strategy = self.selection_strategy; let Self { input: NoInput, metadata: parquet_metadata, @@ -176,7 +175,7 @@ impl ParquetPushDecoderBuilder { limit, offset, metrics, - selection_strategy: _, + row_selection_policy, max_predicate_cache_size, } = self; @@ -198,7 +197,7 @@ impl ParquetPushDecoderBuilder { metrics, max_predicate_cache_size, buffers, - selection_strategy, + row_selection_policy, ); // Initialize the decoder with the configured options diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 4545c8d60e6..cd4a6fa3889 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs 
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -22,8 +22,9 @@ use crate::DecodeResult; use crate::arrow::ProjectionMask; use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache}; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; +use crate::arrow::arrow_reader::selection::RowSelectionStrategy; use crate::arrow::arrow_reader::{ - ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionStrategy, + ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy, }; use crate::arrow::in_memory_row_group::ColumnChunkData; use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder; @@ -157,7 +158,7 @@ pub(crate) struct RowGroupReaderBuilder { metrics: ArrowReaderMetrics, /// Strategy for materialising row selections - selection_strategy: RowSelectionStrategy, + row_selection_policy: RowSelectionPolicy, /// Current state of the decoder. /// @@ -183,7 +184,7 @@ impl RowGroupReaderBuilder { metrics: ArrowReaderMetrics, max_predicate_cache_size: usize, buffers: PushBuffers, - selection_strategy: RowSelectionStrategy, + row_selection_policy: RowSelectionPolicy, ) -> Self { Self { batch_size, @@ -195,7 +196,7 @@ impl RowGroupReaderBuilder { offset, metrics, max_predicate_cache_size, - selection_strategy, + row_selection_policy, state: Some(RowGroupDecoderState::Finished), buffers, } @@ -241,7 +242,7 @@ impl RowGroupReaderBuilder { } let plan_builder = ReadPlanBuilder::new(self.batch_size) .with_selection(selection) - .with_selection_strategy(self.selection_strategy); + .with_row_selection_policy(self.row_selection_policy); let row_group_info = RowGroupInfo { row_group_idx, @@ -532,33 +533,13 @@ impl RowGroupReaderBuilder { // so don't call with_cache_projection here .build(); - match self.selection_strategy { - RowSelectionStrategy::Auto { - threshold: _threshold, - safe_strategy, - } => { - let preferred_strategy = plan_builder.preferred_selection_strategy(); - let offset_index = 
self.row_group_offset_index(row_group_idx); - let force_selectors = safe_strategy - && matches!(preferred_strategy, RowSelectionStrategy::Mask) - && plan_builder.selection().is_some_and(|selection| { - selection.should_force_selectors(&self.projection, offset_index) - }); - - let resolved_strategy = if force_selectors { - RowSelectionStrategy::Selectors - } else { - preferred_strategy - }; - - plan_builder = plan_builder.with_selection_strategy(resolved_strategy); - } - _ => { - // If a non-auto strategy is specified, override any plan builder strategy - plan_builder = - plan_builder.with_selection_strategy(self.selection_strategy); - } - } + plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy); + + plan_builder = overide_selector_strategy_if_needed( + plan_builder, + &self.projection, + self.row_group_offset_index(row_group_idx), + ); let row_group_info = RowGroupInfo { row_group_idx, @@ -697,6 +678,58 @@ impl RowGroupReaderBuilder { } } +/// Override the selection strategy if needed. +/// +/// Some pages can be skipped during row-group construction if they are not read +/// by the selections. This means that the data pages for those rows are never +/// loaded and definition/repetition levels are never read. When using +/// `RowSelections` selection works because skip_records() handles this +/// case and skips the page accordingly. +/// +/// However, with the current Mask design, all values must be read and decoded +/// and then a Mask filter is applied. Thus if any pages are skipped +/// during row-group construction, the data pages are missing and cannot be +/// decoded. +/// +/// A simple example: +/// * the page size is 2, the mask is 100001, row selection should be read(1) skip(4) read(1) +/// * the ColumnChunkData would be page1(10), page2(skipped), page3(01) +/// +/// Using the row selection to skip(4), page2 won't be read at all, so in this +/// case we can't decode all the rows and apply a mask.
To correctly apply the +/// bit mask, we need all 6 values to be read, but page2 is not in memory. +fn overide_selector_strategy_if_needed( + plan_builder: ReadPlanBuilder, + projection_mask: &ProjectionMask, + offset_index: Option<&[OffsetIndexMetaData]>, +) -> ReadPlanBuilder { + // override only applies to Auto policy + let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else { + return plan_builder; + }; + + let preferred_strategy = plan_builder.resolve_selection_strategy(); + + let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask) + && plan_builder.selection().is_some_and(|selection| { + selection.should_force_selectors(projection_mask, offset_index) + }); + + let resolved_strategy = if force_selectors { + RowSelectionStrategy::Selectors + } else { + preferred_strategy + }; + + // override the plan builder strategy with the resolved one + let new_policy = match resolved_strategy { + RowSelectionStrategy::Mask => RowSelectionPolicy::Mask, + RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors, + }; + + plan_builder.with_row_selection_policy(new_policy) +} + #[cfg(test)] mod tests { use super::*;