diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 0d2ffa5361f6..c447c4f2b145 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -260,5 +260,10 @@ name = "row_selector" harness = false required-features = ["arrow"] +[[bench]] +name = "row_selection_cursor" +harness = false +required-features = ["arrow"] + [lib] bench = false diff --git a/parquet/benches/row_selection_cursor.rs b/parquet/benches/row_selection_cursor.rs new file mode 100644 index 000000000000..49c9e6d68acf --- /dev/null +++ b/parquet/benches/row_selection_cursor.rs @@ -0,0 +1,501 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::hint; +use std::sync::Arc; + +use arrow_array::builder::StringViewBuilder; +use arrow_array::{ArrayRef, Float64Array, Int32Array, RecordBatch, StringViewArray}; +use arrow_schema::{DataType, Field, Schema}; +use bytes::Bytes; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_reader::{ + ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionPolicy, RowSelector, +}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; + +const TOTAL_ROWS: usize = 1 << 20; +const BATCH_SIZE: usize = 1 << 10; +const BASE_SEED: u64 = 0xA55AA55A; +const AVG_SELECTOR_LENGTHS: &[usize] = &[4, 8, 12, 16, 20, 24, 28, 32, 36, 40]; +const COLUMN_WIDTHS: &[usize] = &[2, 4, 8, 16, 32]; +const UTF8VIEW_LENS: &[usize] = &[4, 8, 16, 32, 64, 128, 256]; +const BENCH_MODES: &[BenchMode] = &[BenchMode::ReadSelector, BenchMode::ReadMask]; + +struct DataProfile { + name: &'static str, + build_batch: fn(usize) -> RecordBatch, +} + +const DATA_PROFILES: &[DataProfile] = &[ + DataProfile { + name: "int32", + build_batch: build_int32_batch, + }, + DataProfile { + name: "float64", + build_batch: build_float64_batch, + }, + DataProfile { + name: "utf8view", + build_batch: build_utf8view_batch, + }, +]; + +fn criterion_benchmark(c: &mut Criterion) { + let scenarios = [ + /* uniform50 (50% selected, constant run lengths, starts with skip) + ```text + ┌───────────────┐ + │ │ skip + │ │ + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + │ │ skip + │ │ + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + │ ... │ + └───────────────┘ + ``` */ + Scenario { + name: "uniform50", + select_ratio: 0.5, + start_with_select: false, + distribution: RunDistribution::Constant, + }, + /* spread50 (50% selected, large jitter in run lengths, starts with skip) + ```text + ┌───────────────┐ + │ │ skip (long) + │ │ + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (short) + │ │ skip (short) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (long) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + │ │ skip (medium) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (medium) + │ ... 
│ + └───────────────┘ + ``` */ + Scenario { + name: "spread50", + select_ratio: 0.5, + start_with_select: false, + distribution: RunDistribution::Uniform { spread: 0.9 }, + }, + /* sparse20 (20% selected, bimodal: occasional long runs, starts with skip) + ```text + ┌───────────────┐ + │ │ skip (long) + │ │ + │ │ + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (short) + │ │ skip (long) + │ │ + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (occasional long) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + │ ... │ + └───────────────┘ + ``` */ + Scenario { + name: "sparse20", + select_ratio: 0.2, + start_with_select: false, + distribution: RunDistribution::Bimodal { + long_factor: 6.0, + long_prob: 0.1, + }, + }, + /* dense80 (80% selected, bimodal: occasional long runs, starts with select) + ```text + ┌───────────────┐ + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (long) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + │ │ skip (short) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (long) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + │ │ skip (very short) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (long) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + │ ... │ + └───────────────┘ + ``` */ + Scenario { + name: "dense80", + select_ratio: 0.8, + start_with_select: true, + distribution: RunDistribution::Bimodal { + long_factor: 4.0, + long_prob: 0.05, + }, + }, + ]; + + let base_parquet = build_parquet_data(TOTAL_ROWS, build_int32_batch); + let base_scenario = &scenarios[0]; + + for (idx, scenario) in scenarios.iter().enumerate() { + // The first scenario is a special case for backwards compatibility with + // existing benchmark result formats. + let suite = if idx == 0 { "len" } else { "scenario" }; + bench_over_lengths( + c, + suite, + scenario.name, + &base_parquet, + scenario, + BASE_SEED ^ ((idx as u64) << 16), + ); + } + + for (profile_idx, profile) in DATA_PROFILES.iter().enumerate() { + let parquet_data = build_parquet_data(TOTAL_ROWS, profile.build_batch); + bench_over_lengths( + c, + "dtype", + profile.name, + &parquet_data, + base_scenario, + BASE_SEED ^ ((profile_idx as u64) << 24), + ); + } + + for (offset, &column_count) in COLUMN_WIDTHS.iter().enumerate() { + let parquet_data = write_parquet_batch(build_int32_columns_batch(TOTAL_ROWS, column_count)); + let variant_label = format!("C{:02}", column_count); + bench_over_lengths( + c, + "columns", + &variant_label, + &parquet_data, + base_scenario, + BASE_SEED ^ ((offset as u64) << 32), + ); + } + + for (offset, &len) in UTF8VIEW_LENS.iter().enumerate() { + let batch = build_utf8view_batch_with_len(TOTAL_ROWS, len); + let parquet_data = write_parquet_batch(batch); + let variant_label = format!("utf8view-L{:03}", len); + bench_over_lengths( + c, + "utf8view-len", + &variant_label, + &parquet_data, + base_scenario, + BASE_SEED ^ ((offset as u64) << 40), + ); + } +} + +fn bench_over_lengths( + c: &mut Criterion, + suite: &str, + variant: &str, + parquet_data: &Bytes, + scenario: &Scenario, + seed_base: u64, +) { + for (offset, &avg_len) in AVG_SELECTOR_LENGTHS.iter().enumerate() { + let selectors = + generate_selectors(avg_len, TOTAL_ROWS, scenario, seed_base + offset as u64); + let stats = SelectorStats::new(&selectors); + let selection = RowSelection::from(selectors); + let suffix = format!( + "{}-{}-{}-L{:02}-avg{:.1}-sel{:02}", + suite, + scenario.name, + variant, + avg_len, + stats.average_selector_len, + (stats.select_ratio * 100.0).round() as u32 + ); + + let bench_input = BenchInput { + parquet_data: parquet_data.clone(), + selection, + }; + + for &mode in BENCH_MODES { + c.bench_with_input( + BenchmarkId::new(mode.label(), &suffix), + &bench_input, + |b, input| { + b.iter(|| { + let total = run_read(&input.parquet_data, 
&input.selection, mode.policy()); + hint::black_box(total); + }); + }, + ); + } + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); + +struct BenchInput { + parquet_data: Bytes, + selection: RowSelection, +} + +fn run_read(parquet_data: &Bytes, selection: &RowSelection, policy: RowSelectionPolicy) -> usize { + let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data.clone()) + .unwrap() + .with_batch_size(BATCH_SIZE) + .with_row_selection(selection.clone()) + .with_row_selection_policy(policy) + .build() + .unwrap(); + + let mut total_rows = 0usize; + for batch in reader { + let batch = batch.unwrap(); + total_rows += batch.num_rows(); + } + total_rows +} + +fn build_parquet_data(total_rows: usize, build_batch: fn(usize) -> RecordBatch) -> Bytes { + let batch = build_batch(total_rows); + write_parquet_batch(batch) +} + +fn build_single_column_batch(data_type: DataType, array: ArrayRef) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("value", data_type, false)])); + RecordBatch::try_new(schema, vec![array]).unwrap() +} + +fn build_int32_batch(total_rows: usize) -> RecordBatch { + let values = Int32Array::from_iter_values((0..total_rows).map(|v| v as i32)); + build_single_column_batch(DataType::Int32, Arc::new(values) as ArrayRef) +} + +fn build_float64_batch(total_rows: usize) -> RecordBatch { + let values = Float64Array::from_iter_values((0..total_rows).map(|v| v as f64)); + build_single_column_batch(DataType::Float64, Arc::new(values) as ArrayRef) +} + +fn build_utf8view_batch(total_rows: usize) -> RecordBatch { + let mut builder = StringViewBuilder::new(); + // Mix short and long values. + for i in 0..total_rows { + match i % 5 { + 0 => builder.append_value("alpha"), + 1 => builder.append_value("beta"), + 2 => builder.append_value("gamma"), + 3 => builder.append_value("delta"), + _ => builder.append_value("a longer utf8 string payload to test view storage"), + } + } + let values: StringViewArray = builder.finish(); + build_single_column_batch(DataType::Utf8View, Arc::new(values) as ArrayRef) +} + +fn build_utf8view_batch_with_len(total_rows: usize, len: usize) -> RecordBatch { + let mut builder = StringViewBuilder::new(); + let value: String = "a".repeat(len); + for _ in 0..total_rows { + builder.append_value(&value); + } + let values: StringViewArray = builder.finish(); + build_single_column_batch(DataType::Utf8View, Arc::new(values) as ArrayRef) +} + +fn build_int32_columns_batch(total_rows: usize, num_columns: usize) -> RecordBatch { + let base_values: ArrayRef = Arc::new(Int32Array::from_iter_values( + (0..total_rows).map(|v| v as i32), + )); + let mut fields = Vec::with_capacity(num_columns); + let mut columns = Vec::with_capacity(num_columns); + for idx in 0..num_columns { + fields.push(Field::new(format!("value{}", idx), DataType::Int32, false)); + columns.push(base_values.clone()); + } + let schema = Arc::new(Schema::new(fields)); + RecordBatch::try_new(schema, columns).unwrap() +} + +fn write_parquet_batch(batch: RecordBatch) -> Bytes { + let schema = batch.schema(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema.clone(), None).unwrap(); + writer.write(&batch).unwrap(); + let buffer = writer.into_inner().unwrap(); + Bytes::from(buffer) +} + +#[derive(Clone)] +struct Scenario { + name: &'static str, + select_ratio: f64, + start_with_select: bool, + distribution: RunDistribution, +} + +#[derive(Clone)] +enum RunDistribution { + Constant, + Uniform { spread: f64 }, + Bimodal { long_factor: f64, long_prob: 
f64 }, +} + +fn generate_selectors( + avg_selector_len: usize, + total_rows: usize, + scenario: &Scenario, + seed: u64, +) -> Vec { + assert!( + (0.0..=1.0).contains(&scenario.select_ratio), + "select_ratio must be in [0, 1]" + ); + + let mut select_mean = scenario.select_ratio * 2.0 * avg_selector_len as f64; + let mut skip_mean = (1.0 - scenario.select_ratio) * 2.0 * avg_selector_len as f64; + + select_mean = select_mean.max(1.0); + skip_mean = skip_mean.max(1.0); + + let sum = select_mean + skip_mean; + // Rebalance the sampled select/skip run lengths so their sum matches the requested + // average selector length while respecting the configured selectivity ratio. + let scale = if sum == 0.0 { + 1.0 + } else { + (2.0 * avg_selector_len as f64) / sum + }; + select_mean *= scale; + skip_mean *= scale; + + let mut rng = StdRng::seed_from_u64(seed ^ (avg_selector_len as u64).wrapping_mul(0x9E3779B1)); + let mut selectors = Vec::with_capacity(total_rows / avg_selector_len.max(1)); + let mut remaining = total_rows; + let mut is_select = scenario.start_with_select; + + while remaining > 0 { + let mean = if is_select { select_mean } else { skip_mean }; + let len = sample_length(mean, &scenario.distribution, &mut rng).max(1); + let len = len.min(remaining); + selectors.push(if is_select { + RowSelector::select(len) + } else { + RowSelector::skip(len) + }); + remaining -= len; + if remaining == 0 { + break; + } + is_select = !is_select; + } + + let selection: RowSelection = selectors.into(); + selection.into() +} + +fn sample_length(mean: f64, distribution: &RunDistribution, rng: &mut StdRng) -> usize { + match distribution { + RunDistribution::Constant => mean.round().max(1.0) as usize, + RunDistribution::Uniform { spread } => { + let spread = spread.clamp(0.0, 0.99); + let lower = (mean * (1.0 - spread)).max(1.0); + let upper = (mean * (1.0 + spread)).max(lower + f64::EPSILON); + if (upper - lower) < 1.0 { + lower.round().max(1.0) as usize + } else { + let low = lower.floor() as usize; + let high = upper.ceil() as usize; + rng.random_range(low..=high).max(1) + } + } + RunDistribution::Bimodal { + long_factor, + long_prob, + } => { + let long_prob = long_prob.clamp(0.0, 0.5); + let short_prob = 1.0 - long_prob; + let short_factor = if short_prob == 0.0 { + 1.0 / long_factor.max(f64::EPSILON) + } else { + (1.0 - long_prob * long_factor).max(0.0) / short_prob + }; + let use_long = rng.random_bool(long_prob); + let factor = if use_long { + *long_factor + } else { + short_factor.max(0.1) + }; + (mean * factor).round().max(1.0) as usize + } + } +} + +#[derive(Clone, Copy)] +enum BenchMode { + ReadSelector, + ReadMask, +} + +impl BenchMode { + fn label(self) -> &'static str { + match self { + BenchMode::ReadSelector => "read_selector", + BenchMode::ReadMask => "read_mask", + } + } + + fn policy(self) -> RowSelectionPolicy { + match self { + BenchMode::ReadSelector => RowSelectionPolicy::Selectors, + BenchMode::ReadMask => RowSelectionPolicy::Mask, + } + } +} + +struct SelectorStats { + average_selector_len: f64, + select_ratio: f64, +} + +impl SelectorStats { + fn new(selectors: &[RowSelector]) -> Self { + if selectors.is_empty() { + return Self { + average_selector_len: 0.0, + select_ratio: 0.0, + }; + } + + let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); + let selected_rows: usize = selectors + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + + Self { + average_selector_len: total_rows as f64 / selectors.len() as f64, + select_ratio: if total_rows == 0 { + 0.0 
+ } else { + selected_rows as f64 / total_rows as f64 + }, + } + } +} diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 4e12c55c9f33..b15e402fa7aa 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -17,12 +17,12 @@ //! Contains reader which reads parquet data into arrow [`RecordBatch`] -use arrow_array::Array; use arrow_array::cast::AsArray; -use arrow_array::{RecordBatch, RecordBatchReader}; +use arrow_array::{Array, RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; +use arrow_select::filter::filter_record_batch; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; -pub use selection::{RowSelection, RowSelector}; +pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector}; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -43,12 +43,13 @@ use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; +// Exposed so integration tests and benchmarks can temporarily override the threshold. pub use read_plan::{ReadPlan, ReadPlanBuilder}; mod filter; pub mod metrics; mod read_plan; -mod selection; +pub(crate) mod selection; pub mod statistics; /// Builder for constructing Parquet readers that decode into [Apache Arrow] @@ -124,6 +125,8 @@ pub struct ArrowReaderBuilder { pub(crate) selection: Option, + pub(crate) row_selection_policy: RowSelectionPolicy, + pub(crate) limit: Option, pub(crate) offset: Option, @@ -145,6 +148,7 @@ impl Debug for ArrowReaderBuilder { .field("projection", &self.projection) .field("filter", &self.filter) .field("selection", &self.selection) + .field("row_selection_policy", &self.row_selection_policy) .field("limit", &self.limit) .field("offset", &self.offset) .field("metrics", &self.metrics) @@ -164,6 +168,7 @@ impl ArrowReaderBuilder { projection: ProjectionMask::all(), filter: None, selection: None, + row_selection_policy: RowSelectionPolicy::default(), limit: None, offset: None, metrics: ArrowReaderMetrics::Disabled, @@ -212,6 +217,16 @@ impl ArrowReaderBuilder { } } + /// Configure how row selections should be materialised during execution + /// + /// See [`RowSelectionPolicy`] for more details + pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self { + Self { + row_selection_policy: policy, + ..self + } + } + /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their /// data into memory. 
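    ///
    /// As an illustrative sketch (assuming an in-memory `file: Bytes`; builder
    /// setup and error handling elided), a selection can be paired with an
    /// explicit materialisation policy:
    ///
    /// ```ignore
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::skip(100),
    ///     RowSelector::select(50),
    /// ]);
    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
    ///     .with_row_selection(selection)
    ///     .with_row_selection_policy(RowSelectionPolicy::Mask)
    ///     .build()?;
    /// ```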
/// @@ -886,11 +901,12 @@ impl ParquetRecordBatchReaderBuilder { metadata, schema: _, fields, - batch_size: _, + batch_size, row_groups, projection, mut filter, selection, + row_selection_policy, limit, offset, metrics, @@ -899,9 +915,7 @@ impl ParquetRecordBatchReaderBuilder { } = self; // Try to avoid allocate large buffer - let batch_size = self - .batch_size - .min(metadata.file_metadata().num_rows() as usize); + let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize); let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect()); @@ -911,7 +925,9 @@ impl ParquetRecordBatchReaderBuilder { row_groups, }; - let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection); + let mut plan_builder = ReadPlanBuilder::new(batch_size) + .with_selection(selection) + .with_row_selection_policy(row_selection_policy); // Update selection based on any filters if let Some(filter) = filter.as_mut() { @@ -1058,10 +1074,83 @@ impl ParquetRecordBatchReader { fn next_inner(&mut self) -> Result> { let mut read_records = 0; let batch_size = self.batch_size(); - match self.read_plan.selection_mut() { - Some(selection) => { - while read_records < batch_size && !selection.is_empty() { - let front = selection.pop_front().unwrap(); + if batch_size == 0 { + return Ok(None); + } + match self.read_plan.row_selection_cursor_mut() { + RowSelectionCursor::Mask(mask_cursor) => { + // Stream the record batch reader using contiguous segments of the selection + // mask, avoiding the need to materialize intermediate `RowSelector` ranges. + while !mask_cursor.is_empty() { + let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else { + return Ok(None); + }; + + if mask_chunk.initial_skip > 0 { + let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?; + if skipped != mask_chunk.initial_skip { + return Err(general_err!( + "failed to skip rows, expected {}, got {}", + mask_chunk.initial_skip, + skipped + )); + } + } + + if mask_chunk.chunk_rows == 0 { + if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 { + return Ok(None); + } + continue; + } + + let mask = mask_cursor.mask_values_for(&mask_chunk)?; + + let read = self.array_reader.read_records(mask_chunk.chunk_rows)?; + if read == 0 { + return Err(general_err!( + "reached end of column while expecting {} rows", + mask_chunk.chunk_rows + )); + } + if read != mask_chunk.chunk_rows { + return Err(general_err!( + "insufficient rows read from array reader - expected {}, got {}", + mask_chunk.chunk_rows, + read + )); + } + + let array = self.array_reader.consume_batch()?; + // The column reader exposes the projection as a struct array; convert this + // into a record batch before applying the boolean filter mask. 
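+                // (Illustrative numbers: for a mask chunk covering the slice
+                // [t, t, f, t], chunk_rows is 4, selected_rows is 3, and the
+                // filter below keeps exactly those 3 rows.)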
+ let struct_array = array.as_struct_opt().ok_or_else(|| { + ArrowError::ParquetError( + "Struct array reader should return struct array".to_string(), + ) + })?; + + let filtered_batch = + filter_record_batch(&RecordBatch::from(struct_array), &mask)?; + + if filtered_batch.num_rows() != mask_chunk.selected_rows { + return Err(general_err!( + "filtered rows mismatch selection - expected {}, got {}", + mask_chunk.selected_rows, + filtered_batch.num_rows() + )); + } + + if filtered_batch.num_rows() == 0 { + continue; + } + + return Ok(Some(filtered_batch)); + } + } + RowSelectionCursor::Selectors(selectors_cursor) => { + while read_records < batch_size && !selectors_cursor.is_empty() { + let front = selectors_cursor.next_selector(); if front.skip { let skipped = self.array_reader.skip_records(front.row_count)?; @@ -1087,7 +1176,7 @@ impl ParquetRecordBatchReader { Some(remaining) if remaining != 0 => { // if page row count less than batch_size we must set batch size to page row count. // add check avoid dead loop - selection.push_front(RowSelector::select(remaining)); + selectors_cursor.return_selector(RowSelector::select(remaining)); need_read } _ => front.row_count, @@ -1098,7 +1187,7 @@ impl ParquetRecordBatchReader { }; } } - None => { + RowSelectionCursor::All => { self.array_reader.read_records(batch_size)?; } }; @@ -1194,6 +1283,27 @@ mod tests { use std::path::PathBuf; use std::sync::Arc; + use crate::arrow::arrow_reader::{ + ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader, + ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelectionPolicy, RowSelector, + }; + use crate::arrow::schema::add_encoded_arrow_schema_to_metadata; + use crate::arrow::{ArrowWriter, ProjectionMask}; + use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType}; + use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE; + use crate::data_type::{ + BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType, + FloatType, Int32Type, Int64Type, Int96, Int96Type, + }; + use crate::errors::Result; + use crate::file::metadata::ParquetMetaData; + use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; + use crate::file::writer::SerializedFileWriter; + use crate::schema::parser::parse_message_type; + use crate::schema::types::{Type, TypePtr}; + use crate::util::test_common::rand_gen::RandGen; + use arrow::compute::kernels::cmp::eq; + use arrow::compute::or; use arrow_array::builder::*; use arrow_array::cast::AsArray; use arrow_array::types::{ @@ -1214,26 +1324,6 @@ mod tests { use rand::{Rng, RngCore, rng}; use tempfile::tempfile; - use crate::arrow::arrow_reader::{ - ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader, - ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector, - }; - use crate::arrow::schema::add_encoded_arrow_schema_to_metadata; - use crate::arrow::{ArrowWriter, ProjectionMask}; - use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType}; - use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE; - use crate::data_type::{ - BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType, - FloatType, Int32Type, Int64Type, Int96, Int96Type, - }; - use crate::errors::Result; - use crate::file::metadata::ParquetMetaData; - use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; - use crate::file::writer::SerializedFileWriter; - use 
crate::schema::parser::parse_message_type;
-    use crate::schema::types::{Type, TypePtr};
-    use crate::util::test_common::rand_gen::RandGen;
-
     #[test]
     fn test_arrow_reader_all_columns() {
         let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
@@ -4613,6 +4703,93 @@ mod tests {
         assert_eq!(out, batch.slice(2, 1));
     }
 
+    #[test]
+    fn test_row_selection_interleaved_skip() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "v",
+            ArrowDataType::Int32,
+            false,
+        )]));
+
+        let values = Int32Array::from(vec![0, 1, 2, 3, 4]);
+        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)]).unwrap();
+
+        let mut buffer = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
+        writer.write(&batch)?;
+        writer.close()?;
+
+        let selection = RowSelection::from(vec![
+            RowSelector::select(1),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+        ]);
+
+        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))?
+            .with_batch_size(4)
+            .with_row_selection(selection)
+            .build()?;
+
+        let out = reader.next().unwrap()?;
+        assert_eq!(out.num_rows(), 3);
+        let values = out
+            .column(0)
+            .as_primitive::<Int32Type>()
+            .values();
+        assert_eq!(values, &[0, 3, 4]);
+        assert!(reader.next().is_none());
+        Ok(())
+    }
+
+    #[test]
+    fn test_row_selection_mask_sparse_rows() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "v",
+            ArrowDataType::Int32,
+            false,
+        )]));
+
+        let values = Int32Array::from((0..30).collect::<Vec<i32>>());
+        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)])?;
+
+        let mut buffer = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)?;
+        writer.write(&batch)?;
+        writer.close()?;
+
+        let total_rows = batch.num_rows();
+        let ranges = (1..total_rows)
+            .step_by(2)
+            .map(|i| i..i + 1)
+            .collect::<Vec<_>>();
+        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), total_rows);
+
+        let selectors: Vec<RowSelector> = selection.clone().into();
+        assert!(total_rows < selectors.len() * 8);
+
+        let bytes = Bytes::from(buffer);
+
+        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())?
+            .with_batch_size(7)
+            .with_row_selection(selection)
+            .build()?;
+
+        let mut collected = Vec::new();
+        for batch in reader {
+            let batch = batch?;
+            collected.extend_from_slice(
+                batch
+                    .column(0)
+                    .as_primitive::<Int32Type>()
+                    .values(),
+            );
+        }
+
+        let expected: Vec<i32> = (1..total_rows).step_by(2).map(|i| i as i32).collect();
+        assert_eq!(collected, expected);
+        Ok(())
+    }
+
     fn test_decimal32_roundtrip() {
         let d = |values: Vec<i32>, p: u8| {
             let iter = values.into_iter();
@@ -4995,6 +5172,78 @@ mod tests {
         c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
     }
 
+    #[test]
+    fn test_row_filter_full_page_skip_is_handled() {
+        let first_value: i64 = 1111;
+        let last_value: i64 = 9999;
+        let num_rows: usize = 12;
+
+        // build data with row selection average length 4
+        // The result would be (1111 XXXX) ... (4 pages in the middle) ... (XXXX 9999)
+        // The Row Selection would be [1111, (skip 10), 9999]
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("key", arrow_schema::DataType::Int64, false),
+            Field::new("value", arrow_schema::DataType::Int64, false),
+        ]));
+
+        let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
+        int_values[0] = first_value;
+        int_values[num_rows - 1] = last_value;
+        let keys = Int64Array::from(int_values.clone());
+        let values = Int64Array::from(int_values.clone());
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
+        )
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_write_batch_size(2)
+            .set_data_page_row_count_limit(2)
+            .build();
+
+        let mut buffer = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+        let data = Bytes::from(buffer);
+
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        let builder =
+            ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap();
+        let schema = builder.parquet_schema().clone();
+        let filter_mask = ProjectionMask::leaves(&schema, [0]);
+
+        let make_predicate = |mask: ProjectionMask| {
+            ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
+                let column = batch.column(0);
+                let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
+                let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
+                or(&match_first, &match_second)
+            })
+        };
+
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        let predicate = make_predicate(filter_mask.clone());
+
+        // The batch size is set to 12 to read all rows in one go after filtering.
+        // If the reader chose the mask strategy for this filter it could panic,
+        // because the middle 4 pages are never decoded.
+        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options)
+            .unwrap()
+            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
+            .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
+            .with_batch_size(12)
+            .build()
+            .unwrap();
+
+        // Predicate pruning used to panic once mask-backed plans removed whole pages.
+        // Collecting into batches validates the plan now downgrades to selectors instead.
+        let schema = reader.schema().clone();
+        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
+        let result = concat_batches(&schema, &batches).unwrap();
+        assert_eq!(result.num_rows(), 2);
+    }
+
     #[test]
     fn test_get_row_group_column_bloom_filter_with_length() {
         // convert to new parquet file with bloom_filter_length
diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs
index 2210f47df2c1..3c17a358f084 100644
--- a/parquet/src/arrow/arrow_reader/read_plan.rs
+++ b/parquet/src/arrow/arrow_reader/read_plan.rs
@@ -19,8 +19,10 @@
 //!
from a Parquet file use crate::arrow::array_reader::ArrayReader; +use crate::arrow::arrow_reader::selection::RowSelectionPolicy; +use crate::arrow::arrow_reader::selection::RowSelectionStrategy; use crate::arrow::arrow_reader::{ - ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector, + ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector, }; use crate::errors::{ParquetError, Result}; use arrow_array::Array; @@ -31,8 +33,10 @@ use std::collections::VecDeque; #[derive(Clone, Debug)] pub struct ReadPlanBuilder { batch_size: usize, - /// Current to apply, includes all filters + /// Which rows to select. Includes the result of all filters applied so far selection: Option, + /// Policy to use when materializing the row selection + row_selection_policy: RowSelectionPolicy, } impl ReadPlanBuilder { @@ -41,6 +45,7 @@ impl ReadPlanBuilder { Self { batch_size, selection: None, + row_selection_policy: RowSelectionPolicy::default(), } } @@ -50,6 +55,19 @@ impl ReadPlanBuilder { self } + /// Configure the policy to use when materialising the [`RowSelection`] + /// + /// Defaults to [`RowSelectionPolicy::Auto`] + pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self { + self.row_selection_policy = policy; + self + } + + /// Returns the current row selection policy + pub fn row_selection_policy(&self) -> &RowSelectionPolicy { + &self.row_selection_policy + } + /// Returns the current selection, if any pub fn selection(&self) -> Option<&RowSelection> { self.selection.as_ref() @@ -79,6 +97,40 @@ impl ReadPlanBuilder { self.selection.as_ref().map(|s| s.row_count()) } + /// Returns the [`RowSelectionStrategy`] for this plan. + /// + /// Guarantees to return either `Selectors` or `Mask`, never `Auto`. + pub(crate) fn resolve_selection_strategy(&self) -> RowSelectionStrategy { + match self.row_selection_policy { + RowSelectionPolicy::Selectors => RowSelectionStrategy::Selectors, + RowSelectionPolicy::Mask => RowSelectionStrategy::Mask, + RowSelectionPolicy::Auto { threshold, .. 
} => {
+                let selection = match self.selection.as_ref() {
+                    Some(selection) => selection,
+                    None => return RowSelectionStrategy::Selectors,
+                };
+
+                let trimmed = selection.clone().trim();
+                let selectors: Vec<RowSelector> = trimmed.into();
+                if selectors.is_empty() {
+                    return RowSelectionStrategy::Mask;
+                }
+
+                let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
+                let selector_count = selectors.len();
+                if selector_count == 0 {
+                    return RowSelectionStrategy::Mask;
+                }
+
+                if total_rows < selector_count.saturating_mul(threshold) {
+                    RowSelectionStrategy::Mask
+                } else {
+                    RowSelectionStrategy::Selectors
+                }
+            }
+        }
+    }
+
     /// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
     ///
     /// If the current `selection` is `Some`, the resulting [`RowSelection`]
@@ -126,16 +178,34 @@ impl ReadPlanBuilder {
         if !self.selects_any() {
             self.selection = Some(RowSelection::from(vec![]));
         }
+
+        // Preferred strategy must not be Auto
+        let selection_strategy = self.resolve_selection_strategy();
+
         let Self {
             batch_size,
             selection,
+            row_selection_policy: _,
         } = self;
 
-        let selection = selection.map(|s| s.trim().into());
+        let selection = selection.map(|s| s.trim());
+
+        let row_selection_cursor = selection
+            .map(|s| {
+                let trimmed = s.trim();
+                let selectors: Vec<RowSelector> = trimmed.into();
+                match selection_strategy {
+                    RowSelectionStrategy::Mask => {
+                        RowSelectionCursor::new_mask_from_selectors(selectors)
+                    }
+                    RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
+                }
+            })
+            .unwrap_or(RowSelectionCursor::new_all());
 
         ReadPlan {
             batch_size,
-            selection,
+            row_selection_cursor,
         }
     }
 }
@@ -233,13 +303,23 @@ pub struct ReadPlan {
     /// The number of rows to read in each batch
     batch_size: usize,
     /// Row ranges to be selected from the data source
-    selection: Option<VecDeque<RowSelector>>,
+    row_selection_cursor: RowSelectionCursor,
 }
 
 impl ReadPlan {
-    /// Returns a mutable reference to the selection, if any
+    /// Returns a mutable reference to the selection selectors, if any
+    #[deprecated(since = "57.1.0", note = "Use `row_selection_cursor_mut` instead")]
     pub fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
-        self.selection.as_mut()
+        if let RowSelectionCursor::Selectors(selectors_cursor) = &mut self.row_selection_cursor {
+            Some(selectors_cursor.selectors_mut())
+        } else {
+            None
+        }
+    }
+
+    /// Returns a mutable reference to the row selection cursor
+    pub fn row_selection_cursor_mut(&mut self) -> &mut RowSelectionCursor {
+        &mut self.row_selection_cursor
     }
 
     /// Return the number of rows to read in each output batch
@@ -248,3 +328,33 @@ impl ReadPlan {
         self.batch_size
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn builder_with_selection(selection: RowSelection) -> ReadPlanBuilder {
+        ReadPlanBuilder::new(1024).with_selection(Some(selection))
+    }
+
+    #[test]
+    fn preferred_selection_strategy_prefers_mask_by_default() {
+        let selection = RowSelection::from(vec![RowSelector::select(8)]);
+        let builder = builder_with_selection(selection);
+        assert_eq!(
+            builder.resolve_selection_strategy(),
+            RowSelectionStrategy::Mask
+        );
+    }
+
+    #[test]
+    fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
+        let selection = RowSelection::from(vec![RowSelector::select(8)]);
+        let builder = builder_with_selection(selection)
+            .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 1 });
+        assert_eq!(
+            builder.resolve_selection_strategy(),
+            RowSelectionStrategy::Selectors
+        );
+    }
+}
diff --git a/parquet/src/arrow/arrow_reader/selection.rs
b/parquet/src/arrow/arrow_reader/selection.rs index 1eb7c85d1d88..2ddf812f9c39 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -15,13 +15,50 @@ // specific language governing permissions and limitations // under the License. +use crate::arrow::ProjectionMask; +use crate::errors::ParquetError; +use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; use arrow_array::{Array, BooleanArray}; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; use arrow_select::filter::SlicesIterator; use std::cmp::Ordering; use std::collections::VecDeque; use std::ops::Range; -use crate::file::page_index::offset_index::PageLocation; +/// Policy for picking a strategy to materialise [`RowSelection`] during execution. +/// +/// Note that this is a user-provided preference, and the actual strategy used +/// may differ based on safety considerations (e.g. page skipping). +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum RowSelectionPolicy { + /// Use a queue of [`RowSelector`] values + Selectors, + /// Use a boolean mask to materialise the selection + Mask, + /// Choose between [`Self::Mask`] and [`Self::Selectors`] based on selector density + Auto { + /// Average selector length below which masks are preferred + threshold: usize, + }, +} + +impl Default for RowSelectionPolicy { + fn default() -> Self { + Self::Auto { threshold: 32 } + } +} + +/// Fully resolved strategy for materializing [`RowSelection`] during execution. +/// +/// This is determined from a combination of user preference (via [`RowSelectionPolicy`]) +/// and safety considerations (e.g. page skipping). +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub(crate) enum RowSelectionStrategy { + /// Use a queue of [`RowSelector`] values + Selectors, + /// Use a boolean mask to materialise the selection + Mask, +} /// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when /// scanning a parquet file @@ -213,6 +250,39 @@ impl RowSelection { ranges } + /// Returns true if this selection would skip any data pages within the provided columns + fn selection_skips_any_page( + &self, + projection: &ProjectionMask, + columns: &[OffsetIndexMetaData], + ) -> bool { + columns.iter().enumerate().any(|(leaf_idx, column)| { + if !projection.leaf_included(leaf_idx) { + return false; + } + + let locations = column.page_locations(); + if locations.is_empty() { + return false; + } + + let ranges = self.scan_ranges(locations); + !ranges.is_empty() && ranges.len() < locations.len() + }) + } + + /// Returns true if selectors should be forced, preventing mask materialisation + pub(crate) fn should_force_selectors( + &self, + projection: &ProjectionMask, + offset_index: Option<&[OffsetIndexMetaData]>, + ) -> bool { + match offset_index { + Some(columns) => self.selection_skips_any_page(projection, columns), + None => false, + } + } + /// Splits off the first `row_count` from this [`RowSelection`] pub fn split_off(&mut self, row_count: usize) -> Self { let mut total_count = 0; @@ -691,6 +761,177 @@ fn union_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelec iter.collect() } +/// Cursor for iterating a mask-backed [`RowSelection`] +/// +/// This is best for dense selections where there are many small skips +/// or selections. For example, selecting every other row. 
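+///
+/// As an illustrative sketch, the selection
+/// `[skip(1), select(2), skip(1), select(1)]` is stored as the bitmask
+///
+/// ```text
+/// 0 1 1 0 1
+/// ```
+///
+/// which `next_mask_chunk` then walks in batch-sized steps.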
+#[derive(Debug)]
+pub struct MaskCursor {
+    mask: BooleanBuffer,
+    /// Current absolute offset into the selection
+    position: usize,
+}
+
+impl MaskCursor {
+    /// Returns `true` when no further rows remain
+    pub fn is_empty(&self) -> bool {
+        self.position >= self.mask.len()
+    }
+
+    /// Advance through the mask representation, producing the next chunk summary
+    pub fn next_mask_chunk(&mut self, batch_size: usize) -> Option<MaskChunk> {
+        let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = {
+            let mask = &self.mask;
+
+            if self.position >= mask.len() {
+                return None;
+            }
+
+            let start_position = self.position;
+            let mut cursor = start_position;
+            let mut initial_skip = 0;
+
+            while cursor < mask.len() && !mask.value(cursor) {
+                initial_skip += 1;
+                cursor += 1;
+            }
+
+            let mask_start = cursor;
+            let mut chunk_rows = 0;
+            let mut selected_rows = 0;
+
+            // Advance until enough rows have been selected to satisfy the batch size,
+            // or until the mask is exhausted. This mirrors the behaviour of the legacy
+            // `RowSelector` queue-based iteration.
+            while cursor < mask.len() && selected_rows < batch_size {
+                chunk_rows += 1;
+                if mask.value(cursor) {
+                    selected_rows += 1;
+                }
+                cursor += 1;
+            }
+
+            (initial_skip, chunk_rows, selected_rows, mask_start, cursor)
+        };
+
+        self.position = end_position;
+
+        Some(MaskChunk {
+            initial_skip,
+            chunk_rows,
+            selected_rows,
+            mask_start,
+        })
+    }
+
+    /// Materialise the boolean values for a mask-backed chunk
+    pub fn mask_values_for(&self, chunk: &MaskChunk) -> Result<BooleanArray, ParquetError> {
+        if chunk.mask_start.saturating_add(chunk.chunk_rows) > self.mask.len() {
+            return Err(ParquetError::General(
+                "Internal Error: MaskChunk exceeds mask length".to_string(),
+            ));
+        }
+        Ok(BooleanArray::from(
+            self.mask.slice(chunk.mask_start, chunk.chunk_rows),
+        ))
+    }
+}
+
+/// Cursor for iterating a selector-backed [`RowSelection`]
+///
+/// This is best for sparse selections where large contiguous
+/// blocks of rows are selected or skipped.
+#[derive(Debug)]
+pub struct SelectorsCursor {
+    selectors: VecDeque<RowSelector>,
+    /// Current absolute offset into the selection
+    position: usize,
+}
+
+impl SelectorsCursor {
+    /// Returns `true` when no further rows remain
+    pub fn is_empty(&self) -> bool {
+        self.selectors.is_empty()
+    }
+
+    pub(crate) fn selectors_mut(&mut self) -> &mut VecDeque<RowSelector> {
+        &mut self.selectors
+    }
+
+    /// Return the next [`RowSelector`]
+    pub(crate) fn next_selector(&mut self) -> RowSelector {
+        let selector = self.selectors.pop_front().unwrap();
+        self.position += selector.row_count;
+        selector
+    }
+
+    /// Return a selector to the front, rewinding the position
+    pub(crate) fn return_selector(&mut self, selector: RowSelector) {
+        self.position = self.position.saturating_sub(selector.row_count);
+        self.selectors.push_front(selector);
+    }
+}
+
+/// Result of computing the next chunk to read when using a [`MaskCursor`]
+#[derive(Debug)]
+pub struct MaskChunk {
+    /// Number of leading rows to skip before reaching selected rows
+    pub initial_skip: usize,
+    /// Total rows covered by this chunk (selected + skipped)
+    pub chunk_rows: usize,
+    /// Rows actually selected within the chunk
+    pub selected_rows: usize,
+    /// Starting offset within the mask where the chunk begins
+    pub mask_start: usize,
+}
+
+/// Cursor for iterating a [`RowSelection`] during execution within a
+/// [`ReadPlan`](crate::arrow::arrow_reader::ReadPlan).
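+///
+/// The variant is chosen when the [`ReadPlan`](crate::arrow::arrow_reader::ReadPlan)
+/// is built: `All` when no selection is present, otherwise `Mask` or `Selectors`
+/// according to the resolved strategy (see [`RowSelectionPolicy`]).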
+///
+/// This keeps per-reader state such as the current position and delegates the
+/// actual storage strategy to the cursor variant in use.
+#[derive(Debug)]
+pub enum RowSelectionCursor {
+    /// Reading all rows
+    All,
+    /// Use a bitmask to back the selection (dense selections)
+    Mask(MaskCursor),
+    /// Use a queue of selectors to back the selection (sparse selections)
+    Selectors(SelectorsCursor),
+}
+
+impl RowSelectionCursor {
+    /// Create a [`MaskCursor`] cursor backed by a bitmask, from an existing set of selectors
+    pub(crate) fn new_mask_from_selectors(selectors: Vec<RowSelector>) -> Self {
+        Self::Mask(MaskCursor {
+            mask: boolean_mask_from_selectors(&selectors),
+            position: 0,
+        })
+    }
+
+    /// Create a [`RowSelectionCursor::Selectors`] from the provided selectors
+    pub(crate) fn new_selectors(selectors: Vec<RowSelector>) -> Self {
+        Self::Selectors(SelectorsCursor {
+            selectors: selectors.into(),
+            position: 0,
+        })
+    }
+
+    /// Create a cursor that selects all rows
+    pub(crate) fn new_all() -> Self {
+        Self::All
+    }
+}
+
+fn boolean_mask_from_selectors(selectors: &[RowSelector]) -> BooleanBuffer {
+    let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
+    let mut builder = BooleanBufferBuilder::new(total_rows);
+    for selector in selectors {
+        builder.append_n(selector.row_count, !selector.skip);
+    }
+    builder.finish()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 44c5465202e7..61a84ea1e4bf 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -486,6 +486,7 @@ impl ParquetRecordBatchStreamBuilder {
             projection,
             filter,
             selection,
+            row_selection_policy: selection_strategy,
             limit,
             offset,
             metrics,
@@ -507,6 +508,7 @@ impl ParquetRecordBatchStreamBuilder {
             projection,
             filter,
             selection,
+            row_selection_policy: selection_strategy,
             batch_size,
             row_groups,
             limit,
@@ -761,6 +763,7 @@ where
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::arrow::arrow_reader::RowSelectionPolicy;
     use crate::arrow::arrow_reader::{
         ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
     };
@@ -769,15 +772,17 @@ mod tests {
     use crate::file::metadata::ParquetMetaDataReader;
     use crate::file::properties::WriterProperties;
     use arrow::compute::kernels::cmp::eq;
+    use arrow::compute::or;
     use arrow::error::Result as ArrowResult;
     use arrow_array::builder::{ListBuilder, StringBuilder};
     use arrow_array::cast::AsArray;
     use arrow_array::types::Int32Type;
     use arrow_array::{
-        Array, ArrayRef, Int8Array, Int32Array, RecordBatchReader, Scalar, StringArray,
+        Array, ArrayRef, Int8Array, Int32Array, Int64Array, RecordBatchReader, Scalar, StringArray,
         StructArray, UInt64Array,
     };
     use arrow_schema::{DataType, Field, Schema};
+    use arrow_select::concat::concat_batches;
     use futures::{StreamExt, TryStreamExt};
     use rand::{Rng, rng};
     use std::collections::HashMap;
@@ -1203,6 +1208,82 @@ mod tests {
         assert_eq!(actual_rows, expected_rows);
     }
 
+    #[tokio::test]
+    async fn test_row_filter_full_page_skip_is_handled_async() {
+        let first_value: i64 = 1111;
+        let last_value: i64 = 9999;
+        let num_rows: usize = 12;
+
+        // build data with row selection average length 4
+        // The result would be (1111 XXXX) ... (4 pages in the middle) ... (XXXX 9999)
+        // The Row Selection would be [1111, (skip 10), 9999]
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("key", DataType::Int64, false),
+            Field::new("value", DataType::Int64, false),
+        ]));
+
+        let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
+        int_values[0] = first_value;
+        int_values[num_rows - 1] = last_value;
+        let keys = Int64Array::from(int_values.clone());
+        let values = Int64Array::from(int_values.clone());
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
+        )
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_write_batch_size(2)
+            .set_data_page_row_count_limit(2)
+            .build();
+
+        let mut buffer = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+        let data = Bytes::from(buffer);
+
+        let builder = ParquetRecordBatchStreamBuilder::new_with_options(
+            TestReader::new(data.clone()),
+            ArrowReaderOptions::new().with_page_index(true),
+        )
+        .await
+        .unwrap();
+        let schema = builder.parquet_schema().clone();
+        let filter_mask = ProjectionMask::leaves(&schema, [0]);
+
+        let make_predicate = |mask: ProjectionMask| {
+            ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
+                let column = batch.column(0);
+                let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
+                let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
+                or(&match_first, &match_second)
+            })
+        };
+
+        let predicate = make_predicate(filter_mask.clone());
+
+        // The batch size is set to 12 to read all rows in one go after filtering.
+        // If the reader chose the mask strategy for this filter it could panic,
+        // because the middle 4 pages are never decoded.
+        let stream = ParquetRecordBatchStreamBuilder::new_with_options(
+            TestReader::new(data.clone()),
+            ArrowReaderOptions::new().with_page_index(true),
+        )
+        .await
+        .unwrap()
+        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
+        .with_batch_size(12)
+        .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
+        .build()
+        .unwrap();
+
+        let schema = stream.schema().clone();
+        let batches: Vec<_> = stream.try_collect().await.unwrap();
+        let result = concat_batches(&schema, &batches).unwrap();
+        assert_eq!(result.num_rows(), 2);
+    }
+
     #[tokio::test]
     async fn test_row_filter() {
         let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
diff --git a/parquet/src/arrow/in_memory_row_group.rs b/parquet/src/arrow/in_memory_row_group.rs
index 34e46cd34e91..7969f6f32321 100644
--- a/parquet/src/arrow/in_memory_row_group.rs
+++ b/parquet/src/arrow/in_memory_row_group.rs
@@ -258,7 +258,9 @@ impl ColumnChunkData {
                 .map(|idx| data[idx].1.clone())
                 .map_err(|_| {
                     ParquetError::General(format!(
-                        "Invalid offset in sparse column chunk data: {start}"
+                        "Invalid offset in sparse column chunk data: {start}, no matching page found. \
+                        If you are using `RowSelectionPolicy::Mask`, ensure that the OffsetIndex is provided when \
+                        creating the InMemoryRowGroup."
)) }), ColumnChunkData::Dense { offset, data } => { diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index b26a21132c4d..9b3893258ba5 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -175,6 +175,7 @@ impl ParquetPushDecoderBuilder { limit, offset, metrics, + row_selection_policy, max_predicate_cache_size, } = self; @@ -196,6 +197,7 @@ impl ParquetPushDecoderBuilder { metrics, max_predicate_cache_size, buffers, + row_selection_policy, ); // Initialize the decoder with the configured options diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index a0ced8aa8522..312b8af84535 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -22,8 +22,9 @@ use crate::DecodeResult; use crate::arrow::ProjectionMask; use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache}; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; +use crate::arrow::arrow_reader::selection::RowSelectionStrategy; use crate::arrow::arrow_reader::{ - ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, + ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy, }; use crate::arrow::in_memory_row_group::ColumnChunkData; use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder; @@ -31,6 +32,7 @@ use crate::arrow::push_decoder::reader_builder::filter::CacheInfo; use crate::arrow::schema::ParquetField; use crate::errors::ParquetError; use crate::file::metadata::ParquetMetaData; +use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::util::push_buffers::PushBuffers; use bytes::Bytes; use data::DataRequest; @@ -155,6 +157,9 @@ pub(crate) struct RowGroupReaderBuilder { /// The metrics collector metrics: ArrowReaderMetrics, + /// Strategy for materialising row selections + row_selection_policy: RowSelectionPolicy, + /// Current state of the decoder. /// /// It is taken when processing, and must be put back before returning @@ -179,6 +184,7 @@ impl RowGroupReaderBuilder { metrics: ArrowReaderMetrics, max_predicate_cache_size: usize, buffers: PushBuffers, + row_selection_policy: RowSelectionPolicy, ) -> Self { Self { batch_size, @@ -190,6 +196,7 @@ impl RowGroupReaderBuilder { offset, metrics, max_predicate_cache_size, + row_selection_policy, state: Some(RowGroupDecoderState::Finished), buffers, } @@ -233,7 +240,9 @@ impl RowGroupReaderBuilder { "Internal Error: next_row_group called while still reading a row group. 
Expected Finished state, got {state:?}"
            )));
        }
-        let plan_builder = ReadPlanBuilder::new(self.batch_size).with_selection(selection);
+        let plan_builder = ReadPlanBuilder::new(self.batch_size)
+            .with_selection(selection)
+            .with_row_selection_policy(self.row_selection_policy);
 
         let row_group_info = RowGroupInfo {
             row_group_idx,
@@ -484,7 +493,7 @@ impl RowGroupReaderBuilder {
         }
 
         // Apply any limit and offset
-        let plan_builder = plan_builder
+        let mut plan_builder = plan_builder
             .limited(row_count)
             .with_offset(self.offset)
             .with_limit(self.limit)
@@ -524,6 +533,14 @@ impl RowGroupReaderBuilder {
             // so don't call with_cache_projection here
             .build();
 
+        plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
+
+        plan_builder = override_selector_strategy_if_needed(
+            plan_builder,
+            &self.projection,
+            self.row_group_offset_index(row_group_idx),
+        );
+
         let row_group_info = RowGroupInfo {
             row_group_idx,
             row_count,
@@ -650,14 +667,75 @@ impl RowGroupReaderBuilder {
             Some(ProjectionMask::leaves(schema, included_leaves))
         }
     }
+
+    /// Get the offset index for the specified row group, if any
+    fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
+        self.metadata
+            .offset_index()
+            .filter(|index| !index.is_empty())
+            .and_then(|index| index.get(row_group_idx))
+            .map(|columns| columns.as_slice())
+    }
+}
+
+/// Override the selection strategy if needed.
+/// 
+/// Some pages can be skipped during row-group construction if they are not read
+/// by the selections. This means that the data pages for those rows are never
+/// loaded and definition/repetition levels are never read. With the `Selectors`
+/// strategy this works, because `skip_records()` handles this case and skips
+/// the page accordingly.
+///
+/// However, with the current mask design, all values must be read and decoded
+/// and then a mask filter is applied. Thus if any pages are skipped during
+/// row-group construction, the data pages are missing and cannot be decoded.
+///
+/// A simple example:
+/// * the page size is 2, the mask is 100001, so the row selection is read(1) skip(4) read(1)
+/// * the `ColumnChunkData` would be page1(10), page2(skipped), page3(01)
+///
+/// Using the row selection to skip(4), page2 won't be read at all, so in this
+/// case we can't decode all the rows and apply a mask. To correctly apply the
+/// bit mask, we need all 6 values to be read, but page2 is not in memory.
+fn override_selector_strategy_if_needed(
+    plan_builder: ReadPlanBuilder,
+    projection_mask: &ProjectionMask,
+    offset_index: Option<&[OffsetIndexMetaData]>,
+) -> ReadPlanBuilder {
+    // The override only applies to the Auto policy; if the policy is already
+    // Mask or Selectors, respect that.
+    let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else {
+        return plan_builder;
+    };
+
+    let preferred_strategy = plan_builder.resolve_selection_strategy();
+
+    let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask)
+        && plan_builder.selection().is_some_and(|selection| {
+            selection.should_force_selectors(projection_mask, offset_index)
+        });
+
+    let resolved_strategy = if force_selectors {
+        RowSelectionStrategy::Selectors
+    } else {
+        preferred_strategy
+    };
+
+    // override the plan builder strategy with the resolved one
+    let new_policy = match resolved_strategy {
+        RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
+        RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
+    };
+
+    plan_builder.with_row_selection_policy(new_policy)
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+
     #[test]
     // Verify that the size of RowGroupDecoderState does not grow too large
     fn test_structure_size() {
-        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 184);
+        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 200);
     }
 }