Skip to content

Commit 25bcdca

Browse files
committed
Update the strategy API
1 parent e17e907 commit 25bcdca

File tree

7 files changed

+165
-101
lines changed

7 files changed

+165
-101
lines changed

parquet/benches/row_selection_state.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ use bytes::Bytes;
2525
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
2626
use parquet::arrow::ArrowWriter;
2727
use parquet::arrow::arrow_reader::{
28-
AvgSelectorLenMaskThresholdGuard, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
29-
set_avg_selector_len_mask_threshold,
28+
ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionStrategy, RowSelector,
3029
};
3130
use rand::rngs::StdRng;
3231
use rand::{Rng, SeedableRng};
@@ -239,12 +238,11 @@ fn bench_over_lengths(
239238
BenchmarkId::new(mode.label(), &suffix),
240239
&bench_input,
241240
|b, input| {
242-
let _guard = mode.override_threshold();
243241
b.iter(|| {
244-
let total = run_read(&input.parquet_data, &input.selection);
242+
let total =
243+
run_read(&input.parquet_data, &input.selection, mode.strategy());
245244
hint::black_box(total);
246245
});
247-
drop(_guard);
248246
},
249247
);
250248
}
@@ -259,11 +257,16 @@ struct BenchInput {
259257
selection: RowSelection,
260258
}
261259

262-
fn run_read(parquet_data: &Bytes, selection: &RowSelection) -> usize {
260+
fn run_read(
261+
parquet_data: &Bytes,
262+
selection: &RowSelection,
263+
strategy: RowSelectionStrategy,
264+
) -> usize {
263265
let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data.clone())
264266
.unwrap()
265267
.with_batch_size(BATCH_SIZE)
266268
.with_row_selection(selection.clone())
269+
.with_row_selection_strategy(strategy)
267270
.build()
268271
.unwrap();
269272

@@ -462,10 +465,10 @@ impl BenchMode {
462465
}
463466
}
464467

465-
fn override_threshold(self) -> Option<AvgSelectorLenMaskThresholdGuard> {
468+
fn strategy(self) -> RowSelectionStrategy {
466469
match self {
467-
BenchMode::ReadSelector => Some(set_avg_selector_len_mask_threshold(0)),
468-
BenchMode::ReadMask => Some(set_avg_selector_len_mask_threshold(usize::MAX)),
470+
BenchMode::ReadSelector => RowSelectionStrategy::Selectors,
471+
BenchMode::ReadMask => RowSelectionStrategy::Mask,
469472
}
470473
}
471474
}

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,7 @@ use crate::schema::types::SchemaDescriptor;
4444

4545
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
4646
// Exposed so integration tests and benchmarks can temporarily override the threshold.
47-
pub use read_plan::{
48-
AvgSelectorLenMaskThresholdGuard, ReadPlan, ReadPlanBuilder,
49-
set_avg_selector_len_mask_threshold,
50-
};
47+
pub use read_plan::{ReadPlan, ReadPlanBuilder};
5148

5249
mod filter;
5350
pub mod metrics;
@@ -121,6 +118,8 @@ pub struct ArrowReaderBuilder<T> {
121118

122119
pub(crate) selection: Option<RowSelection>,
123120

121+
pub(crate) selection_strategy: RowSelectionStrategy,
122+
124123
pub(crate) limit: Option<usize>,
125124

126125
pub(crate) offset: Option<usize>,
@@ -142,6 +141,7 @@ impl<T: Debug> Debug for ArrowReaderBuilder<T> {
142141
.field("projection", &self.projection)
143142
.field("filter", &self.filter)
144143
.field("selection", &self.selection)
144+
.field("selection_strategy", &self.selection_strategy)
145145
.field("limit", &self.limit)
146146
.field("offset", &self.offset)
147147
.field("metrics", &self.metrics)
@@ -161,6 +161,7 @@ impl<T> ArrowReaderBuilder<T> {
161161
projection: ProjectionMask::all(),
162162
filter: None,
163163
selection: None,
164+
selection_strategy: RowSelectionStrategy::default(),
164165
limit: None,
165166
offset: None,
166167
metrics: ArrowReaderMetrics::Disabled,
@@ -209,6 +210,14 @@ impl<T> ArrowReaderBuilder<T> {
209210
}
210211
}
211212

213+
/// Configure how row selections should be materialised during execution
214+
pub fn with_row_selection_strategy(self, strategy: RowSelectionStrategy) -> Self {
215+
Self {
216+
selection_strategy: strategy,
217+
..self
218+
}
219+
}
220+
212221
/// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
213222
/// data into memory.
214223
///
@@ -860,16 +869,19 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
860869
///
861870
/// Note: this will eagerly evaluate any `RowFilter` before returning
862871
pub fn build(self) -> Result<ParquetRecordBatchReader> {
872+
let selection_strategy = self.selection_strategy;
873+
863874
let Self {
864875
input,
865876
metadata,
866877
schema: _,
867878
fields,
868-
batch_size: _,
879+
batch_size,
869880
row_groups,
870881
projection,
871882
mut filter,
872883
selection,
884+
selection_strategy: _,
873885
limit,
874886
offset,
875887
metrics,
@@ -878,9 +890,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
878890
} = self;
879891

880892
// Try to avoid allocate large buffer
881-
let batch_size = self
882-
.batch_size
883-
.min(metadata.file_metadata().num_rows() as usize);
893+
let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);
884894

885895
let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
886896

@@ -890,7 +900,9 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
890900
row_groups,
891901
};
892902

893-
let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
903+
let mut plan_builder = ReadPlanBuilder::new(batch_size)
904+
.with_selection(selection)
905+
.with_selection_strategy(selection_strategy);
894906

895907
// Update selection based on any filters
896908
if let Some(filter) = filter.as_mut() {

parquet/src/arrow/arrow_reader/read_plan.rs

Lines changed: 76 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -26,55 +26,6 @@ use crate::arrow::arrow_reader::{
2626
use crate::errors::{ParquetError, Result};
2727
use arrow_array::Array;
2828
use arrow_select::filter::prep_null_mask_filter;
29-
use std::sync::atomic::{AtomicUsize, Ordering};
30-
31-
// The average selector length threshold for choosing between
32-
// `RowSelectionStrategy::Mask` and `RowSelectionStrategy::Selectors`.
33-
// If the average selector length is less than this value,
34-
// `RowSelectionStrategy::Mask` is preferred.
35-
const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 32;
36-
37-
// The logic in `preferred_selection_strategy` depends on the constant
38-
// `AVG_SELECTOR_LEN_MASK_THRESHOLD`. To allow unit testing of this logic,
39-
// we use a mutable global variable that can be temporarily changed during tests.
40-
//
41-
// An `AtomicUsize` is used because the Rust test runner (`cargo test`) runs tests
42-
// in parallel by default. The atomic operations prevent data races between
43-
// different test threads that might try to modify this value simultaneously.
44-
//
45-
// For the production code path, `load(Ordering::Relaxed)` is used. This is the
46-
// weakest memory ordering and for a simple load on most modern architectures,
47-
// it compiles down to a regular memory read with negligible performance overhead.
48-
// The more expensive atomic operations with stronger ordering are only used in the
49-
// test-only functions below.
50-
static AVG_SELECTOR_LEN_MASK_THRESHOLD_OVERRIDE: AtomicUsize =
51-
AtomicUsize::new(AVG_SELECTOR_LEN_MASK_THRESHOLD);
52-
53-
#[inline(always)]
54-
fn avg_selector_len_mask_threshold() -> usize {
55-
AVG_SELECTOR_LEN_MASK_THRESHOLD_OVERRIDE.load(Ordering::Relaxed)
56-
}
57-
58-
/// An RAII guard that restores the previous value of the override when it is dropped.
59-
/// This ensures that any change to the global threshold is temporary and scoped to
60-
/// the test or benchmark where it's used, even in the case of a panic.
61-
pub struct AvgSelectorLenMaskThresholdGuard {
62-
previous: usize,
63-
}
64-
65-
impl Drop for AvgSelectorLenMaskThresholdGuard {
66-
fn drop(&mut self) {
67-
AVG_SELECTOR_LEN_MASK_THRESHOLD_OVERRIDE.store(self.previous, Ordering::SeqCst);
68-
}
69-
}
70-
71-
/// Override AVG_SELECTOR_LEN_MASK_THRESHOLD (primarily for tests / benchmarks).
72-
///
73-
/// Returns an [`AvgSelectorLenMaskThresholdGuard`] that restores the previous value on drop.
74-
pub fn set_avg_selector_len_mask_threshold(value: usize) -> AvgSelectorLenMaskThresholdGuard {
75-
let previous = AVG_SELECTOR_LEN_MASK_THRESHOLD_OVERRIDE.swap(value, Ordering::SeqCst);
76-
AvgSelectorLenMaskThresholdGuard { previous }
77-
}
7829

7930
/// A builder for [`ReadPlan`]
8031
#[derive(Clone, Debug)]
@@ -102,7 +53,7 @@ impl ReadPlanBuilder {
10253
self
10354
}
10455

105-
/// Force a specific strategy when materialising the [`RowSelection`]
56+
/// Configure the strategy to use when materialising the [`RowSelection`]
10657
pub fn with_selection_strategy(mut self, strategy: RowSelectionStrategy) -> Self {
10758
self.selection_strategy = strategy;
10859
self
@@ -139,24 +90,45 @@ impl ReadPlanBuilder {
13990

14091
/// Returns the preferred [`RowSelectionStrategy`] for materialising the current selection.
14192
pub fn preferred_selection_strategy(&self) -> RowSelectionStrategy {
142-
let selection = match self.selection.as_ref() {
143-
Some(selection) => selection,
144-
None => return RowSelectionStrategy::Mask,
145-
};
146-
147-
let trimmed = selection.clone().trim();
148-
let selectors: Vec<RowSelector> = trimmed.into();
149-
if selectors.is_empty() {
150-
return RowSelectionStrategy::Mask;
93+
match self.selection_strategy {
94+
RowSelectionStrategy::Selectors => RowSelectionStrategy::Selectors,
95+
RowSelectionStrategy::Mask => RowSelectionStrategy::Mask,
96+
RowSelectionStrategy::Auto { threshold, .. } => {
97+
let selection = match self.selection.as_ref() {
98+
Some(selection) => selection,
99+
None => return RowSelectionStrategy::Mask,
100+
};
101+
102+
let trimmed = selection.clone().trim();
103+
let selectors: Vec<RowSelector> = trimmed.into();
104+
if selectors.is_empty() {
105+
return RowSelectionStrategy::Mask;
106+
}
107+
108+
let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
109+
let selector_count = selectors.len();
110+
if selector_count == 0 {
111+
return RowSelectionStrategy::Mask;
112+
}
113+
114+
if total_rows < selector_count.saturating_mul(threshold) {
115+
RowSelectionStrategy::Mask
116+
} else {
117+
RowSelectionStrategy::Selectors
118+
}
119+
}
151120
}
121+
}
152122

153-
let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
154-
let selector_count = selectors.len();
155-
if total_rows < selector_count.saturating_mul(avg_selector_len_mask_threshold()) {
156-
RowSelectionStrategy::Mask
157-
} else {
158-
RowSelectionStrategy::Selectors
159-
}
123+
/// Returns `true` if the configured strategy allows falling back to selectors for safety.
124+
pub(crate) fn selection_strategy_allows_safe_fallback(&self) -> bool {
125+
matches!(
126+
self.selection_strategy,
127+
RowSelectionStrategy::Auto {
128+
safe_strategy: true,
129+
..
130+
}
131+
)
160132
}
161133

162134
/// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
@@ -206,7 +178,10 @@ impl ReadPlanBuilder {
206178
if !self.selects_any() {
207179
self.selection = Some(RowSelection::from(vec![]));
208180
}
209-
let selection_strategy = self.selection_strategy;
181+
let selection_strategy = match self.selection_strategy {
182+
RowSelectionStrategy::Auto { .. } => self.preferred_selection_strategy(),
183+
strategy => strategy,
184+
};
210185
let Self {
211186
batch_size,
212187
selection,
@@ -355,12 +330,44 @@ mod tests {
355330

356331
#[test]
357332
fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
358-
let _guard = set_avg_selector_len_mask_threshold(1);
359333
let selection = RowSelection::from(vec![RowSelector::select(8)]);
360-
let builder = builder_with_selection(selection);
334+
let builder =
335+
builder_with_selection(selection).with_selection_strategy(RowSelectionStrategy::Auto {
336+
threshold: 1,
337+
safe_strategy: true,
338+
});
361339
assert_eq!(
362340
builder.preferred_selection_strategy(),
363341
RowSelectionStrategy::Selectors
364342
);
365343
}
344+
345+
#[test]
346+
fn selection_strategy_safe_fallback_detection() {
347+
let selection = RowSelection::from(vec![RowSelector::select(8)]);
348+
349+
let builder_safe = builder_with_selection(selection.clone()).with_selection_strategy(
350+
RowSelectionStrategy::Auto {
351+
threshold: 32,
352+
safe_strategy: true,
353+
},
354+
);
355+
assert!(builder_safe.selection_strategy_allows_safe_fallback());
356+
357+
let builder_unsafe = builder_with_selection(selection.clone()).with_selection_strategy(
358+
RowSelectionStrategy::Auto {
359+
threshold: 32,
360+
safe_strategy: false,
361+
},
362+
);
363+
assert!(!builder_unsafe.selection_strategy_allows_safe_fallback());
364+
365+
let builder_mask = builder_with_selection(selection.clone())
366+
.with_selection_strategy(RowSelectionStrategy::Mask);
367+
assert!(!builder_mask.selection_strategy_allows_safe_fallback());
368+
369+
let builder_selectors = builder_with_selection(selection)
370+
.with_selection_strategy(RowSelectionStrategy::Selectors);
371+
assert!(!builder_selectors.selection_strategy_allows_safe_fallback());
372+
}
366373
}

parquet/src/arrow/arrow_reader/selection.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,28 @@ use crate::arrow::ProjectionMask;
2626
use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
2727

2828
/// Strategy for materialising [`RowSelection`] during execution.
29-
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
29+
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3030
pub enum RowSelectionStrategy {
3131
/// Use a queue of [`RowSelector`] values
32-
#[default]
3332
Selectors,
3433
/// Use a boolean mask to materialise the selection
3534
Mask,
35+
/// Choose between [`Self::Mask`] and [`Self::Selectors`] based on selector density
36+
Auto {
37+
/// Average selector length below which masks are preferred
38+
threshold: usize,
39+
/// Fallback to selectors when mask would be unsafe (e.g. page skipping)
40+
safe_strategy: bool,
41+
},
42+
}
43+
44+
impl Default for RowSelectionStrategy {
45+
fn default() -> Self {
46+
Self::Auto {
47+
threshold: 32,
48+
safe_strategy: true,
49+
}
50+
}
3651
}
3752

3853
/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
@@ -780,6 +795,9 @@ impl RowSelectionCursor {
780795
RowSelectionBacking::Mask(boolean_mask_from_selectors(&selectors))
781796
}
782797
RowSelectionStrategy::Selectors => RowSelectionBacking::Selectors(selectors.into()),
798+
RowSelectionStrategy::Auto { .. } => {
799+
panic!("RowSelectionStrategy::Auto must be resolved before creating cursor")
800+
}
783801
};
784802

785803
Self {

0 commit comments

Comments
 (0)