Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions parquet/benches/row_selection_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use bytes::Bytes;
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::{
ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionStrategy, RowSelector,
ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionPolicy, RowSelector,
};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
Expand Down Expand Up @@ -239,8 +239,7 @@ fn bench_over_lengths(
&bench_input,
|b, input| {
b.iter(|| {
let total =
run_read(&input.parquet_data, &input.selection, mode.strategy());
let total = run_read(&input.parquet_data, &input.selection, mode.policy());
hint::black_box(total);
});
},
Expand All @@ -257,16 +256,12 @@ struct BenchInput {
selection: RowSelection,
}

fn run_read(
parquet_data: &Bytes,
selection: &RowSelection,
strategy: RowSelectionStrategy,
) -> usize {
fn run_read(parquet_data: &Bytes, selection: &RowSelection, policy: RowSelectionPolicy) -> usize {
let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data.clone())
.unwrap()
.with_batch_size(BATCH_SIZE)
.with_row_selection(selection.clone())
.with_row_selection_strategy(strategy)
.with_row_selection_policy(policy)
.build()
.unwrap();

Expand Down Expand Up @@ -465,10 +460,10 @@ impl BenchMode {
}
}

fn strategy(self) -> RowSelectionStrategy {
fn policy(self) -> RowSelectionPolicy {
match self {
BenchMode::ReadSelector => RowSelectionStrategy::Selectors,
BenchMode::ReadMask => RowSelectionStrategy::Mask,
BenchMode::ReadSelector => RowSelectionPolicy::Selectors,
BenchMode::ReadMask => RowSelectionPolicy::Mask,
}
}
}
Expand Down
20 changes: 11 additions & 9 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow_array::{Array, RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
use arrow_select::filter::filter_record_batch;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelectionCursor, RowSelectionStrategy, RowSelector};
pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

Expand All @@ -49,7 +49,7 @@ pub use read_plan::{ReadPlan, ReadPlanBuilder};
mod filter;
pub mod metrics;
mod read_plan;
mod selection;
pub(crate) mod selection;
pub mod statistics;

/// Builder for constructing Parquet readers that decode into [Apache Arrow]
Expand Down Expand Up @@ -125,7 +125,7 @@ pub struct ArrowReaderBuilder<T> {

pub(crate) selection: Option<RowSelection>,

pub(crate) selection_strategy: RowSelectionStrategy,
pub(crate) row_selection_policy: RowSelectionPolicy,

pub(crate) limit: Option<usize>,

Expand All @@ -148,7 +148,7 @@ impl<T: Debug> Debug for ArrowReaderBuilder<T> {
.field("projection", &self.projection)
.field("filter", &self.filter)
.field("selection", &self.selection)
.field("selection_strategy", &self.selection_strategy)
.field("row_selection_policy", &self.row_selection_policy)
.field("limit", &self.limit)
.field("offset", &self.offset)
.field("metrics", &self.metrics)
Expand All @@ -168,7 +168,7 @@ impl<T> ArrowReaderBuilder<T> {
projection: ProjectionMask::all(),
filter: None,
selection: None,
selection_strategy: RowSelectionStrategy::default(),
row_selection_policy: RowSelectionPolicy::default(),
limit: None,
offset: None,
metrics: ArrowReaderMetrics::Disabled,
Expand Down Expand Up @@ -218,9 +218,11 @@ impl<T> ArrowReaderBuilder<T> {
}

/// Configure how row selections should be materialised during execution
pub fn with_row_selection_strategy(self, strategy: RowSelectionStrategy) -> Self {
///
/// See [`RowSelectionPolicy`] for more details
pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
Self {
selection_strategy: strategy,
row_selection_policy: policy,
..self
}
}
Expand Down Expand Up @@ -904,7 +906,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
projection,
mut filter,
selection,
selection_strategy,
row_selection_policy,
limit,
offset,
metrics,
Expand All @@ -925,7 +927,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {

let mut plan_builder = ReadPlanBuilder::new(batch_size)
.with_selection(selection)
.with_selection_strategy(selection_strategy);
.with_row_selection_policy(row_selection_policy);

// Update selection based on any filters
if let Some(filter) = filter.as_mut() {
Expand Down
54 changes: 29 additions & 25 deletions parquet/src/arrow/arrow_reader/read_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
//! from a Parquet file

use crate::arrow::array_reader::ArrayReader;
use crate::arrow::arrow_reader::selection::RowSelectionPolicy;
use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
use crate::arrow::arrow_reader::{
ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor,
RowSelectionStrategy, RowSelector,
ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector,
};
use crate::errors::{ParquetError, Result};
use arrow_array::Array;
Expand All @@ -32,10 +33,10 @@ use std::collections::VecDeque;
#[derive(Clone, Debug)]
pub struct ReadPlanBuilder {
batch_size: usize,
/// Current to apply, includes all filters
/// Which rows to select. Includes the result of all filters applied so far
selection: Option<RowSelection>,
/// Strategy to use when materialising the row selection
selection_strategy: RowSelectionStrategy,
/// Policy to use when materializing the row selection
row_selection_policy: RowSelectionPolicy,
}

impl ReadPlanBuilder {
Expand All @@ -44,7 +45,7 @@ impl ReadPlanBuilder {
Self {
batch_size,
selection: None,
selection_strategy: RowSelectionStrategy::default(),
row_selection_policy: RowSelectionPolicy::default(),
}
}

Expand All @@ -54,12 +55,19 @@ impl ReadPlanBuilder {
self
}

/// Configure the strategy to use when materialising the [`RowSelection`]
pub fn with_selection_strategy(mut self, strategy: RowSelectionStrategy) -> Self {
self.selection_strategy = strategy;
/// Configure the policy to use when materialising the [`RowSelection`]
///
/// Defaults to [`RowSelectionPolicy::Auto`]
pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self {
self.row_selection_policy = policy;
self
}

/// Returns the current row selection policy
pub fn row_selection_policy(&self) -> &RowSelectionPolicy {
&self.row_selection_policy
}

/// Returns the current selection, if any
pub fn selection(&self) -> Option<&RowSelection> {
self.selection.as_ref()
Expand Down Expand Up @@ -89,14 +97,14 @@ impl ReadPlanBuilder {
self.selection.as_ref().map(|s| s.row_count())
}

/// Returns the preferred [`RowSelectionStrategy`] for materialising the current selection.
/// Returns the [`RowSelectionStrategy`] for this plan.
///
/// Guarantees to return either `Selectors` or `Mask`, never `Auto`.
pub fn preferred_selection_strategy(&self) -> RowSelectionStrategy {
match self.selection_strategy {
RowSelectionStrategy::Selectors => RowSelectionStrategy::Selectors,
RowSelectionStrategy::Mask => RowSelectionStrategy::Mask,
RowSelectionStrategy::Auto { threshold, .. } => {
pub(crate) fn resolve_selection_strategy(&self) -> RowSelectionStrategy {
match self.row_selection_policy {
RowSelectionPolicy::Selectors => RowSelectionStrategy::Selectors,
RowSelectionPolicy::Mask => RowSelectionStrategy::Mask,
RowSelectionPolicy::Auto { threshold, .. } => {
let selection = match self.selection.as_ref() {
Some(selection) => selection,
None => return RowSelectionStrategy::Selectors,
Expand Down Expand Up @@ -172,12 +180,12 @@ impl ReadPlanBuilder {
}

// Resolve the strategy up front: the result is always `Selectors` or `Mask`, never `Auto`
let selection_strategy = self.preferred_selection_strategy();
let selection_strategy = self.resolve_selection_strategy();

let Self {
batch_size,
selection,
selection_strategy: _,
row_selection_policy: _,
} = self;

let selection = selection.map(|s| s.trim());
Expand All @@ -191,7 +199,6 @@ impl ReadPlanBuilder {
RowSelectionCursor::new_mask_from_selectors(selectors)
}
RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
RowSelectionStrategy::Auto { .. } => unreachable!(),
}
})
.unwrap_or(RowSelectionCursor::new_all());
Expand Down Expand Up @@ -335,21 +342,18 @@ mod tests {
let selection = RowSelection::from(vec![RowSelector::select(8)]);
let builder = builder_with_selection(selection);
assert_eq!(
builder.preferred_selection_strategy(),
builder.resolve_selection_strategy(),
RowSelectionStrategy::Mask
);
}

#[test]
fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
let selection = RowSelection::from(vec![RowSelector::select(8)]);
let builder =
builder_with_selection(selection).with_selection_strategy(RowSelectionStrategy::Auto {
threshold: 1,
safe_strategy: true,
});
let builder = builder_with_selection(selection)
.with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 1 });
assert_eq!(
builder.preferred_selection_strategy(),
builder.resolve_selection_strategy(),
RowSelectionStrategy::Selectors
);
}
Expand Down
28 changes: 19 additions & 9 deletions parquet/src/arrow/arrow_reader/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ use std::cmp::Ordering;
use std::collections::VecDeque;
use std::ops::Range;

/// Strategy for materialising [`RowSelection`] during execution.
/// Policy for picking a strategy to materialise [`RowSelection`] during execution.
///
/// Note that this is a user-provided preference, and the actual strategy used
/// may differ based on safety considerations (e.g. page skipping).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RowSelectionStrategy {
pub enum RowSelectionPolicy {
/// Use a queue of [`RowSelector`] values
Selectors,
/// Use a boolean mask to materialise the selection
Expand All @@ -36,20 +39,27 @@ pub enum RowSelectionStrategy {
Auto {
/// Average selector length below which masks are preferred
threshold: usize,
/// Fallback to selectors when mask would be unsafe (e.g. page skipping)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`safe_strategy` was always set to `true`, so I removed it, as I believe it is redundant.

safe_strategy: bool,
},
}

impl Default for RowSelectionStrategy {
impl Default for RowSelectionPolicy {
fn default() -> Self {
Self::Auto {
threshold: 32,
safe_strategy: true,
}
Self::Auto { threshold: 32 }
}
}

/// Fully resolved strategy for materializing [`RowSelection`] during execution.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I eventually concluded that using a separate enum for the resolved strategy was the clearest approach -- that way the type system encodes that `Auto` must already have been resolved.

///
/// This is determined from a combination of user preference (via [`RowSelectionPolicy`])
/// and safety considerations (e.g. page skipping).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum RowSelectionStrategy {
/// Use a queue of [`RowSelector`] values
Selectors,
/// Use a boolean mask to materialise the selection
Mask,
}

/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
/// scanning a parquet file
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
Expand Down
4 changes: 2 additions & 2 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
projection,
filter,
selection,
selection_strategy,
row_selection_policy: selection_strategy,
limit,
offset,
metrics,
Expand All @@ -508,7 +508,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
projection,
filter,
selection,
selection_strategy,
row_selection_policy: selection_strategy,
batch_size,
row_groups,
limit,
Expand Down
5 changes: 2 additions & 3 deletions parquet/src/arrow/push_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ impl ParquetPushDecoderBuilder {

/// Create a [`ParquetPushDecoder`] with the configured options
pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
let selection_strategy = self.selection_strategy;
let Self {
input: NoInput,
metadata: parquet_metadata,
Expand All @@ -176,7 +175,7 @@ impl ParquetPushDecoderBuilder {
limit,
offset,
metrics,
selection_strategy: _,
row_selection_policy,
max_predicate_cache_size,
} = self;

Expand All @@ -198,7 +197,7 @@ impl ParquetPushDecoderBuilder {
metrics,
max_predicate_cache_size,
buffers,
selection_strategy,
row_selection_policy,
);

// Initialize the decoder with the configured options
Expand Down
Loading
Loading