Skip to content

Commit 59ee569

Browse files
authored
Merge pull request #8 from alamb/alamb/rework_selections
Rework RowSelectionCursor to use enums
2 parents 0e2895d + 93fc72b commit 59ee569

File tree

3 files changed

+206
-192
lines changed

3 files changed

+206
-192
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 64 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1072,83 +1072,80 @@ impl ParquetRecordBatchReader {
10721072
fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
10731073
let mut read_records = 0;
10741074
let batch_size = self.batch_size();
1075-
match self.read_plan.selection_mut() {
1076-
Some(selection_cursor) => {
1077-
if selection_cursor.is_mask_backed() {
1078-
// Stream the record batch reader using contiguous segments of the selection
1079-
// mask, avoiding the need to materialize intermediate `RowSelector` ranges.
1080-
while !selection_cursor.is_empty() {
1081-
let Some(mask_chunk) = selection_cursor.next_mask_chunk(batch_size) else {
1082-
return Ok(None);
1083-
};
1084-
1085-
if mask_chunk.initial_skip > 0 {
1086-
let skipped =
1087-
self.array_reader.skip_records(mask_chunk.initial_skip)?;
1088-
if skipped != mask_chunk.initial_skip {
1089-
return Err(general_err!(
1090-
"failed to skip rows, expected {}, got {}",
1091-
mask_chunk.initial_skip,
1092-
skipped
1093-
));
1094-
}
1095-
}
1096-
1097-
if mask_chunk.chunk_rows == 0 {
1098-
if selection_cursor.is_empty() && mask_chunk.selected_rows == 0 {
1099-
return Ok(None);
1100-
}
1101-
continue;
1102-
}
1103-
1104-
let mask = selection_cursor
1105-
.mask_values_for(&mask_chunk)
1106-
.ok_or_else(|| general_err!("row selection mask out of bounds"))?;
1075+
match self.read_plan.row_selection_cursor_mut() {
1076+
RowSelectionCursor::Mask(mask_cursor) => {
1077+
// Stream the record batch reader using contiguous segments of the selection
1078+
// mask, avoiding the need to materialize intermediate `RowSelector` ranges.
1079+
while !mask_cursor.is_empty() {
1080+
let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
1081+
return Ok(None);
1082+
};
11071083

1108-
let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
1109-
if read == 0 {
1084+
if mask_chunk.initial_skip > 0 {
1085+
let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
1086+
if skipped != mask_chunk.initial_skip {
11101087
return Err(general_err!(
1111-
"reached end of column while expecting {} rows",
1112-
mask_chunk.chunk_rows
1088+
"failed to skip rows, expected {}, got {}",
1089+
mask_chunk.initial_skip,
1090+
skipped
11131091
));
11141092
}
1115-
if read != mask_chunk.chunk_rows {
1116-
return Err(general_err!(
1117-
"insufficient rows read from array reader - expected {}, got {}",
1118-
mask_chunk.chunk_rows,
1119-
read
1120-
));
1093+
}
1094+
1095+
if mask_chunk.chunk_rows == 0 {
1096+
if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
1097+
return Ok(None);
11211098
}
1099+
continue;
1100+
}
11221101

1123-
let array = self.array_reader.consume_batch()?;
1124-
// The column reader exposes the projection as a struct array; convert this
1125-
// into a record batch before applying the boolean filter mask.
1126-
let struct_array = array.as_struct_opt().ok_or_else(|| {
1127-
ArrowError::ParquetError(
1128-
"Struct array reader should return struct array".to_string(),
1129-
)
1130-
})?;
1102+
let mask = mask_cursor.mask_values_for(&mask_chunk)?;
11311103

1132-
let filtered_batch =
1133-
filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
1104+
let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
1105+
if read == 0 {
1106+
return Err(general_err!(
1107+
"reached end of column while expecting {} rows",
1108+
mask_chunk.chunk_rows
1109+
));
1110+
}
1111+
if read != mask_chunk.chunk_rows {
1112+
return Err(general_err!(
1113+
"insufficient rows read from array reader - expected {}, got {}",
1114+
mask_chunk.chunk_rows,
1115+
read
1116+
));
1117+
}
11341118

1135-
if filtered_batch.num_rows() != mask_chunk.selected_rows {
1136-
return Err(general_err!(
1137-
"filtered rows mismatch selection - expected {}, got {}",
1138-
mask_chunk.selected_rows,
1139-
filtered_batch.num_rows()
1140-
));
1141-
}
1119+
let array = self.array_reader.consume_batch()?;
1120+
// The column reader exposes the projection as a struct array; convert this
1121+
// into a record batch before applying the boolean filter mask.
1122+
let struct_array = array.as_struct_opt().ok_or_else(|| {
1123+
ArrowError::ParquetError(
1124+
"Struct array reader should return struct array".to_string(),
1125+
)
1126+
})?;
11421127

1143-
if filtered_batch.num_rows() == 0 {
1144-
continue;
1145-
}
1128+
let filtered_batch =
1129+
filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
1130+
1131+
if filtered_batch.num_rows() != mask_chunk.selected_rows {
1132+
return Err(general_err!(
1133+
"filtered rows mismatch selection - expected {}, got {}",
1134+
mask_chunk.selected_rows,
1135+
filtered_batch.num_rows()
1136+
));
1137+
}
11461138

1147-
return Ok(Some(filtered_batch));
1139+
if filtered_batch.num_rows() == 0 {
1140+
continue;
11481141
}
1142+
1143+
return Ok(Some(filtered_batch));
11491144
}
1150-
while read_records < batch_size && !selection_cursor.is_empty() {
1151-
let front = selection_cursor.next_selector();
1145+
}
1146+
RowSelectionCursor::Selectors(selectors_cursor) => {
1147+
while read_records < batch_size && !selectors_cursor.is_empty() {
1148+
let front = selectors_cursor.next_selector();
11521149
if front.skip {
11531150
let skipped = self.array_reader.skip_records(front.row_count)?;
11541151

@@ -1174,7 +1171,7 @@ impl ParquetRecordBatchReader {
11741171
Some(remaining) if remaining != 0 => {
11751172
// If the page row count is less than batch_size, we must set the batch size to the page row count.
11761173
// Add a check to avoid an infinite loop.
1177-
selection_cursor.return_selector(RowSelector::select(remaining));
1174+
selectors_cursor.return_selector(RowSelector::select(remaining));
11781175
need_read
11791176
}
11801177
_ => front.row_count,
@@ -1185,7 +1182,7 @@ impl ParquetRecordBatchReader {
11851182
};
11861183
}
11871184
}
1188-
None => {
1185+
RowSelectionCursor::All => {
11891186
self.array_reader.read_records(batch_size)?;
11901187
}
11911188
};

parquet/src/arrow/arrow_reader/read_plan.rs

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::arrow::arrow_reader::{
2626
use crate::errors::{ParquetError, Result};
2727
use arrow_array::Array;
2828
use arrow_select::filter::prep_null_mask_filter;
29+
use std::collections::VecDeque;
2930

3031
/// A builder for [`ReadPlan`]
3132
#[derive(Clone, Debug)]
@@ -89,6 +90,8 @@ impl ReadPlanBuilder {
8990
}
9091

9192
/// Returns the preferred [`RowSelectionStrategy`] for materialising the current selection.
93+
///
94+
/// Guarantees to return either `Selectors` or `Mask`, never `Auto`.
9295
pub fn preferred_selection_strategy(&self) -> RowSelectionStrategy {
9396
match self.selection_strategy {
9497
RowSelectionStrategy::Selectors => RowSelectionStrategy::Selectors,
@@ -167,25 +170,35 @@ impl ReadPlanBuilder {
167170
if !self.selects_any() {
168171
self.selection = Some(RowSelection::from(vec![]));
169172
}
170-
let selection_strategy = match self.selection_strategy {
171-
RowSelectionStrategy::Auto { .. } => self.preferred_selection_strategy(),
172-
strategy => strategy,
173-
};
173+
174+
// Preferred strategy must not be Auto
175+
let selection_strategy = self.preferred_selection_strategy();
176+
174177
let Self {
175178
batch_size,
176179
selection,
177180
selection_strategy: _,
178181
} = self;
179182

180-
let selection = selection.map(|s| {
181-
let trimmed = s.trim();
182-
let selectors: Vec<RowSelector> = trimmed.into();
183-
RowSelectionCursor::new(selectors, selection_strategy)
184-
});
183+
let selection = selection.map(|s| s.trim());
184+
185+
let row_selection_cursor = selection
186+
.map(|s| {
187+
let trimmed = s.trim();
188+
let selectors: Vec<RowSelector> = trimmed.into();
189+
match selection_strategy {
190+
RowSelectionStrategy::Mask => {
191+
RowSelectionCursor::new_mask_from_selectors(selectors)
192+
}
193+
RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
194+
RowSelectionStrategy::Auto { .. } => unreachable!(),
195+
}
196+
})
197+
.unwrap_or(RowSelectionCursor::new_all());
185198

186199
ReadPlan {
187200
batch_size,
188-
selection,
201+
row_selection_cursor,
189202
}
190203
}
191204
}
@@ -283,13 +296,23 @@ pub struct ReadPlan {
283296
/// The number of rows to read in each batch
284297
batch_size: usize,
285298
/// Row ranges to be selected from the data source
286-
selection: Option<RowSelectionCursor>,
299+
row_selection_cursor: RowSelectionCursor,
287300
}
288301

289302
impl ReadPlan {
290-
/// Returns a mutable reference to the selection, if any
291-
pub fn selection_mut(&mut self) -> Option<&mut RowSelectionCursor> {
292-
self.selection.as_mut()
303+
/// Returns a mutable reference to the queue of row selectors, or `None` if the cursor is not selector-backed
304+
#[deprecated(since = "57.1.0", note = "Use `row_selection_cursor_mut` instead")]
305+
pub fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
306+
if let RowSelectionCursor::Selectors(selectors_cursor) = &mut self.row_selection_cursor {
307+
Some(selectors_cursor.selectors_mut())
308+
} else {
309+
None
310+
}
311+
}
312+
313+
/// Returns a mutable reference to the row selection cursor
314+
pub fn row_selection_cursor_mut(&mut self) -> &mut RowSelectionCursor {
315+
&mut self.row_selection_cursor
293316
}
294317

295318
/// Return the number of rows to read in each output batch

0 commit comments

Comments
 (0)