Skip to content

Commit 1abae95

Browse files
authored
Merge pull request #9 from alamb/alamb/add_policy
Split RowSelectionPolicy from RowSelectionStrategy
2 parents 59ee569 + c30dca7 commit 1abae95

File tree

7 files changed

+135
-92
lines changed

7 files changed

+135
-92
lines changed

parquet/benches/row_selection_state.rs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use bytes::Bytes;
2525
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
2626
use parquet::arrow::ArrowWriter;
2727
use parquet::arrow::arrow_reader::{
28-
ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionStrategy, RowSelector,
28+
ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionPolicy, RowSelector,
2929
};
3030
use rand::rngs::StdRng;
3131
use rand::{Rng, SeedableRng};
@@ -239,8 +239,7 @@ fn bench_over_lengths(
239239
&bench_input,
240240
|b, input| {
241241
b.iter(|| {
242-
let total =
243-
run_read(&input.parquet_data, &input.selection, mode.strategy());
242+
let total = run_read(&input.parquet_data, &input.selection, mode.policy());
244243
hint::black_box(total);
245244
});
246245
},
@@ -257,16 +256,12 @@ struct BenchInput {
257256
selection: RowSelection,
258257
}
259258

260-
fn run_read(
261-
parquet_data: &Bytes,
262-
selection: &RowSelection,
263-
strategy: RowSelectionStrategy,
264-
) -> usize {
259+
fn run_read(parquet_data: &Bytes, selection: &RowSelection, policy: RowSelectionPolicy) -> usize {
265260
let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data.clone())
266261
.unwrap()
267262
.with_batch_size(BATCH_SIZE)
268263
.with_row_selection(selection.clone())
269-
.with_row_selection_strategy(strategy)
264+
.with_row_selection_policy(policy)
270265
.build()
271266
.unwrap();
272267

@@ -465,10 +460,10 @@ impl BenchMode {
465460
}
466461
}
467462

468-
fn strategy(self) -> RowSelectionStrategy {
463+
fn policy(self) -> RowSelectionPolicy {
469464
match self {
470-
BenchMode::ReadSelector => RowSelectionStrategy::Selectors,
471-
BenchMode::ReadMask => RowSelectionStrategy::Mask,
465+
BenchMode::ReadSelector => RowSelectionPolicy::Selectors,
466+
BenchMode::ReadMask => RowSelectionPolicy::Mask,
472467
}
473468
}
474469
}

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use arrow_array::{Array, RecordBatch, RecordBatchReader};
2222
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
2323
use arrow_select::filter::filter_record_batch;
2424
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25-
pub use selection::{RowSelection, RowSelectionCursor, RowSelectionStrategy, RowSelector};
25+
pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
2626
use std::fmt::{Debug, Formatter};
2727
use std::sync::Arc;
2828

@@ -49,7 +49,7 @@ pub use read_plan::{ReadPlan, ReadPlanBuilder};
4949
mod filter;
5050
pub mod metrics;
5151
mod read_plan;
52-
mod selection;
52+
pub(crate) mod selection;
5353
pub mod statistics;
5454

5555
/// Builder for constructing Parquet readers that decode into [Apache Arrow]
@@ -125,7 +125,7 @@ pub struct ArrowReaderBuilder<T> {
125125

126126
pub(crate) selection: Option<RowSelection>,
127127

128-
pub(crate) selection_strategy: RowSelectionStrategy,
128+
pub(crate) row_selection_policy: RowSelectionPolicy,
129129

130130
pub(crate) limit: Option<usize>,
131131

@@ -148,7 +148,7 @@ impl<T: Debug> Debug for ArrowReaderBuilder<T> {
148148
.field("projection", &self.projection)
149149
.field("filter", &self.filter)
150150
.field("selection", &self.selection)
151-
.field("selection_strategy", &self.selection_strategy)
151+
.field("row_selection_policy", &self.row_selection_policy)
152152
.field("limit", &self.limit)
153153
.field("offset", &self.offset)
154154
.field("metrics", &self.metrics)
@@ -168,7 +168,7 @@ impl<T> ArrowReaderBuilder<T> {
168168
projection: ProjectionMask::all(),
169169
filter: None,
170170
selection: None,
171-
selection_strategy: RowSelectionStrategy::default(),
171+
row_selection_policy: RowSelectionPolicy::default(),
172172
limit: None,
173173
offset: None,
174174
metrics: ArrowReaderMetrics::Disabled,
@@ -218,9 +218,11 @@ impl<T> ArrowReaderBuilder<T> {
218218
}
219219

220220
/// Configure how row selections should be materialised during execution
221-
pub fn with_row_selection_strategy(self, strategy: RowSelectionStrategy) -> Self {
221+
///
222+
/// See [`RowSelectionPolicy`] for more details
223+
pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
222224
Self {
223-
selection_strategy: strategy,
225+
row_selection_policy: policy,
224226
..self
225227
}
226228
}
@@ -904,7 +906,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
904906
projection,
905907
mut filter,
906908
selection,
907-
selection_strategy,
909+
row_selection_policy,
908910
limit,
909911
offset,
910912
metrics,
@@ -925,7 +927,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
925927

926928
let mut plan_builder = ReadPlanBuilder::new(batch_size)
927929
.with_selection(selection)
928-
.with_selection_strategy(selection_strategy);
930+
.with_row_selection_policy(row_selection_policy);
929931

930932
// Update selection based on any filters
931933
if let Some(filter) = filter.as_mut() {

parquet/src/arrow/arrow_reader/read_plan.rs

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919
//! from a Parquet file
2020
2121
use crate::arrow::array_reader::ArrayReader;
22+
use crate::arrow::arrow_reader::selection::RowSelectionPolicy;
23+
use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
2224
use crate::arrow::arrow_reader::{
23-
ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor,
24-
RowSelectionStrategy, RowSelector,
25+
ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector,
2526
};
2627
use crate::errors::{ParquetError, Result};
2728
use arrow_array::Array;
@@ -32,10 +33,10 @@ use std::collections::VecDeque;
3233
#[derive(Clone, Debug)]
3334
pub struct ReadPlanBuilder {
3435
batch_size: usize,
35-
/// Current to apply, includes all filters
36+
/// Which rows to select. Includes the result of all filters applied so far
3637
selection: Option<RowSelection>,
37-
/// Strategy to use when materialising the row selection
38-
selection_strategy: RowSelectionStrategy,
38+
/// Policy to use when materializing the row selection
39+
row_selection_policy: RowSelectionPolicy,
3940
}
4041

4142
impl ReadPlanBuilder {
@@ -44,7 +45,7 @@ impl ReadPlanBuilder {
4445
Self {
4546
batch_size,
4647
selection: None,
47-
selection_strategy: RowSelectionStrategy::default(),
48+
row_selection_policy: RowSelectionPolicy::default(),
4849
}
4950
}
5051

@@ -54,12 +55,19 @@ impl ReadPlanBuilder {
5455
self
5556
}
5657

57-
/// Configure the strategy to use when materialising the [`RowSelection`]
58-
pub fn with_selection_strategy(mut self, strategy: RowSelectionStrategy) -> Self {
59-
self.selection_strategy = strategy;
58+
/// Configure the policy to use when materialising the [`RowSelection`]
59+
///
60+
/// Defaults to [`RowSelectionPolicy::Auto`]
61+
pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self {
62+
self.row_selection_policy = policy;
6063
self
6164
}
6265

66+
/// Returns the current row selection policy
67+
pub fn row_selection_policy(&self) -> &RowSelectionPolicy {
68+
&self.row_selection_policy
69+
}
70+
6371
/// Returns the current selection, if any
6472
pub fn selection(&self) -> Option<&RowSelection> {
6573
self.selection.as_ref()
@@ -89,14 +97,14 @@ impl ReadPlanBuilder {
8997
self.selection.as_ref().map(|s| s.row_count())
9098
}
9199

92-
/// Returns the preferred [`RowSelectionStrategy`] for materialising the current selection.
100+
/// Returns the [`RowSelectionStrategy`] for this plan.
93101
///
94102
/// Guarantees to return either `Selectors` or `Mask`, never `Auto`.
95-
pub fn preferred_selection_strategy(&self) -> RowSelectionStrategy {
96-
match self.selection_strategy {
97-
RowSelectionStrategy::Selectors => RowSelectionStrategy::Selectors,
98-
RowSelectionStrategy::Mask => RowSelectionStrategy::Mask,
99-
RowSelectionStrategy::Auto { threshold, .. } => {
103+
pub(crate) fn resolve_selection_strategy(&self) -> RowSelectionStrategy {
104+
match self.row_selection_policy {
105+
RowSelectionPolicy::Selectors => RowSelectionStrategy::Selectors,
106+
RowSelectionPolicy::Mask => RowSelectionStrategy::Mask,
107+
RowSelectionPolicy::Auto { threshold, .. } => {
100108
let selection = match self.selection.as_ref() {
101109
Some(selection) => selection,
102110
None => return RowSelectionStrategy::Selectors,
@@ -172,12 +180,12 @@ impl ReadPlanBuilder {
172180
}
173181

174182
// Preferred strategy must not be Auto
175-
let selection_strategy = self.preferred_selection_strategy();
183+
let selection_strategy = self.resolve_selection_strategy();
176184

177185
let Self {
178186
batch_size,
179187
selection,
180-
selection_strategy: _,
188+
row_selection_policy: _,
181189
} = self;
182190

183191
let selection = selection.map(|s| s.trim());
@@ -191,7 +199,6 @@ impl ReadPlanBuilder {
191199
RowSelectionCursor::new_mask_from_selectors(selectors)
192200
}
193201
RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
194-
RowSelectionStrategy::Auto { .. } => unreachable!(),
195202
}
196203
})
197204
.unwrap_or(RowSelectionCursor::new_all());
@@ -335,21 +342,18 @@ mod tests {
335342
let selection = RowSelection::from(vec![RowSelector::select(8)]);
336343
let builder = builder_with_selection(selection);
337344
assert_eq!(
338-
builder.preferred_selection_strategy(),
345+
builder.resolve_selection_strategy(),
339346
RowSelectionStrategy::Mask
340347
);
341348
}
342349

343350
#[test]
344351
fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
345352
let selection = RowSelection::from(vec![RowSelector::select(8)]);
346-
let builder =
347-
builder_with_selection(selection).with_selection_strategy(RowSelectionStrategy::Auto {
348-
threshold: 1,
349-
safe_strategy: true,
350-
});
353+
let builder = builder_with_selection(selection)
354+
.with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 1 });
351355
assert_eq!(
352-
builder.preferred_selection_strategy(),
356+
builder.resolve_selection_strategy(),
353357
RowSelectionStrategy::Selectors
354358
);
355359
}

parquet/src/arrow/arrow_reader/selection.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,12 @@ use std::cmp::Ordering;
2525
use std::collections::VecDeque;
2626
use std::ops::Range;
2727

28-
/// Strategy for materialising [`RowSelection`] during execution.
28+
/// Policy for picking a strategy to materialise [`RowSelection`] during execution.
29+
///
30+
/// Note that this is a user-provided preference, and the actual strategy used
31+
/// may differ based on safety considerations (e.g. page skipping).
2932
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
30-
pub enum RowSelectionStrategy {
33+
pub enum RowSelectionPolicy {
3134
/// Use a queue of [`RowSelector`] values
3235
Selectors,
3336
/// Use a boolean mask to materialise the selection
@@ -36,20 +39,27 @@ pub enum RowSelectionStrategy {
3639
Auto {
3740
/// Average selector length below which masks are preferred
3841
threshold: usize,
39-
/// Fallback to selectors when mask would be unsafe (e.g. page skipping)
40-
safe_strategy: bool,
4142
},
4243
}
4344

44-
impl Default for RowSelectionStrategy {
45+
impl Default for RowSelectionPolicy {
4546
fn default() -> Self {
46-
Self::Auto {
47-
threshold: 32,
48-
safe_strategy: true,
49-
}
47+
Self::Auto { threshold: 32 }
5048
}
5149
}
5250

51+
/// Fully resolved strategy for materializing [`RowSelection`] during execution.
52+
///
53+
/// This is determined from a combination of user preference (via [`RowSelectionPolicy`])
54+
/// and safety considerations (e.g. page skipping).
55+
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
56+
pub(crate) enum RowSelectionStrategy {
57+
/// Use a queue of [`RowSelector`] values
58+
Selectors,
59+
/// Use a boolean mask to materialise the selection
60+
Mask,
61+
}
62+
5363
/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
5464
/// scanning a parquet file
5565
#[derive(Debug, Clone, Copy, Eq, PartialEq)]

parquet/src/arrow/async_reader/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
486486
projection,
487487
filter,
488488
selection,
489-
selection_strategy,
489+
row_selection_policy: selection_strategy,
490490
limit,
491491
offset,
492492
metrics,
@@ -508,7 +508,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
508508
projection,
509509
filter,
510510
selection,
511-
selection_strategy,
511+
row_selection_policy: selection_strategy,
512512
batch_size,
513513
row_groups,
514514
limit,

parquet/src/arrow/push_decoder/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,6 @@ impl ParquetPushDecoderBuilder {
162162

163163
/// Create a [`ParquetPushDecoder`] with the configured options
164164
pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
165-
let selection_strategy = self.selection_strategy;
166165
let Self {
167166
input: NoInput,
168167
metadata: parquet_metadata,
@@ -176,7 +175,7 @@ impl ParquetPushDecoderBuilder {
176175
limit,
177176
offset,
178177
metrics,
179-
selection_strategy: _,
178+
row_selection_policy,
180179
max_predicate_cache_size,
181180
} = self;
182181

@@ -198,7 +197,7 @@ impl ParquetPushDecoderBuilder {
198197
metrics,
199198
max_predicate_cache_size,
200199
buffers,
201-
selection_strategy,
200+
row_selection_policy,
202201
);
203202

204203
// Initialize the decoder with the configured options

0 commit comments

Comments
 (0)