Description
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
After the great work from @hhhizzz in #8733, we (finally) have the ability to use a Bitmask filter representation when applying filters during Parquet decode. 🎉
However, as @hhhizzz explains in #8733 (comment), using the bitmask filter approach will actually fetch unnecessary data pages in some cases. In ASCII art:
```
┏━━━━┓ ┌────────┐
- '1' means row is selected ┃ 1 ┃ │ Row 0 │
- '0' means row is filtered ┃ 0 ┃ │ Row 1 │
┃ 0 ┃ │ Row 2 │ Page 0
┃ 1 ┃ │ Row 3 │
┃ 0 ┃ │ Row 4 │
┃ ┃ └────────┘
┃ ┃ ┌────────┐
┃ 0 ┃ │ Row 5 │
┃ 0 ┃ │ Row 6 │
Filter only selects Row 0, ┃ 0 ┃ │ Row 7 │
Row 3, and Row 21. All other ┃ 0 ┃ │ Row 8 │
Rows are filtered out ┃ 0 ┃ │ Row 9 │ Filter selects no
┃ 0 ┃ │ Row 10 │ Page 1 rows from
┃ 0 ┃ │ Row 11 │ Page 1, but the
┃ 0 ┃ │ Row 12 │ current Filter mask
┃ 0 ┃ │ Row 13 │ strategy requires
┃ 0 ┃ │ Row 14 │
┃ 0 ┃ │ Row 15 │
┃ ┃ └────────┘
┃ ┃ ┌────────┐
┃ 0 ┃ │ Row 16 │
┃ 0 ┃ │ Row 17 │
┃ 0 ┃ │ Row 18 │ Page 2
┃ 0 ┃ │ Row 19 │
┃ 0 ┃ │ Row 20 │
┃ 1 ┃ │ Row 21 │
┃ 0 ┃ │ Row 22 │
┗━━━━┛ └────────┘
Data Pages
Filter
BitMask
```
When evaluating using `RowSelection`, pages with no selected rows are not loaded at all, because the existing `skip_records()` machinery handles this case and skips reading any data from those pages.
However, when evaluating with a mask, the mask may span a page that was entirely ruled out (has no matching rows), such as Page 1 above, and that page's data is never fetched.
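To make the diagram concrete, the sketch below computes, for each page, whether any row survives the filter; a page whose answer is `false` is one that a `RowSelection` can skip without fetching. It is a std-only illustration, not the parquet crate's API: `pages_with_selected_rows` is a hypothetical helper, and describing the page layout as a list of per-page row counts is an assumption standing in for the offset index.

```rust
/// Returns, for each page, whether at least one of its rows is selected.
///
/// `mask` holds one `bool` per row (`true` = row passes the filter) and
/// `page_row_counts` holds the number of rows in each page, in order.
/// Both are hypothetical inputs standing in for the real offset index.
fn pages_with_selected_rows(mask: &[bool], page_row_counts: &[usize]) -> Vec<bool> {
    let mut result = Vec::with_capacity(page_row_counts.len());
    let mut start = 0;
    for &count in page_row_counts {
        // Clamp so a page list longer than the mask cannot index out of bounds.
        let end = (start + count).min(mask.len());
        result.push(mask[start..end].iter().any(|&selected| selected));
        start = end;
    }
    result
}

fn main() {
    // Layout from the diagram: Page 0 = rows 0..=4, Page 1 = rows 5..=15,
    // Page 2 = rows 16..=22; only rows 0, 3 and 21 pass the filter.
    let mut mask = vec![false; 23];
    for row in [0usize, 3, 21] {
        mask[row] = true;
    }
    let pages = pages_with_selected_rows(&mask, &[5, 11, 7]);
    // Page 1 has no selected rows, so a selector-based reader can skip it.
    println!("{:?}", pages); // [true, false, true]
}
```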
A simple example:
* the page size is 2, the mask is 100001, and the row selection would be read(1) skip(4) read(1)
* the `ColumnChunkData` would be page1(10), page2(skipped), page3(01)

Using the row selection to skip(4), page2 won't be read at all. But using the bit mask, all 6 values need to be read, and page2 is not in memory, which is why I need to construct this synthetic page.
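The read/skip notation in the example is a run-length encoding of the mask. A minimal sketch of that conversion follows; the `Run` enum and `mask_to_runs` function are hypothetical names used only for illustration, and the parquet crate's own `RowSelector` type is not reproduced here.

```rust
/// A hypothetical read/skip run, mirroring the read(n)/skip(n) notation above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Run {
    Read(usize),
    Skip(usize),
}

/// Run-length encodes a row mask (`true` = selected) into alternating
/// `Read`/`Skip` runs.
fn mask_to_runs(mask: &[bool]) -> Vec<Run> {
    let mut runs: Vec<Run> = Vec::new();
    for &selected in mask {
        match runs.last_mut() {
            // Same kind as the previous row: extend the current run.
            Some(Run::Read(n)) if selected => *n += 1,
            Some(Run::Skip(n)) if !selected => *n += 1,
            // Kind changed (or first row): start a new run.
            _ => runs.push(if selected { Run::Read(1) } else { Run::Skip(1) }),
        }
    }
    runs
}

fn main() {
    // Mask 100001 from the example: only rows 0 and 5 are selected.
    let mask = [true, false, false, false, false, true];
    println!("{:?}", mask_to_runs(&mask)); // [Read(1), Skip(4), Read(1)]
}
```

With a page size of 2, page2 (rows 2 and 3) lies entirely inside the `Skip(4)` run, which is why a selector-based reader never needs its bytes while a mask-based reader does.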
The current code handles this case by falling back to a selector strategy when masks would straddle page boundaries:
`arrow-rs/parquet/src/arrow/push_decoder/reader_builder/mod.rs`, lines 681 to 730 in 911331a:
```rust
/// Override the selection strategy if needed.
///
/// Some pages can be skipped during row-group construction if they are not read
/// by the selections. This means that the data pages for those rows are never
/// loaded and definition/repetition levels are never read. When using
/// `RowSelections` selection works because `skip_records()` handles this
/// case and skips the page accordingly.
///
/// However, with the current mask design, all values must be read and decoded
/// and then a mask filter is applied. Thus if any pages are skipped during
/// row-group construction, the data pages are missing and cannot be decoded.
///
/// A simple example:
/// * the page size is 2, the mask is 100001, row selection should be read(1) skip(4) read(1)
/// * the `ColumnChunkData` would be page1(10), page2(skipped), page3(01)
///
/// Using the row selection to skip(4), page2 won't be read at all, so in this
/// case we can't decode all the rows and apply a mask. To correctly apply the
/// bit mask, we need all 6 values be read, but page2 is not in memory.
fn override_selector_strategy_if_needed(
    plan_builder: ReadPlanBuilder,
    projection_mask: &ProjectionMask,
    offset_index: Option<&[OffsetIndexMetaData]>,
) -> ReadPlanBuilder {
    // override only applies to Auto policy, If the policy is already Mask or Selectors, respect that
    let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else {
        return plan_builder;
    };
    let preferred_strategy = plan_builder.resolve_selection_strategy();
    let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask)
        && plan_builder.selection().is_some_and(|selection| {
            selection.should_force_selectors(projection_mask, offset_index)
        });
    let resolved_strategy = if force_selectors {
        RowSelectionStrategy::Selectors
    } else {
        preferred_strategy
    };
    // override the plan builder strategy with the resolved one
    let new_policy = match resolved_strategy {
        RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
        RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
    };
    plan_builder.with_row_selection_policy(new_policy)
}
```
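For a single column, the fallback in `override_selector_strategy_if_needed` can be approximated as "keep the mask only if every page contains at least one selected row". The sketch below encodes that simplified rule; `Strategy`, `resolve_strategy`, and the per-page row counts are hypothetical, and the real `should_force_selectors` also takes the projection mask and offset index into account, so treat this as an approximation rather than the actual logic.

```rust
/// Hypothetical stand-ins for the decoder's two selection strategies.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Strategy {
    Mask,
    Selectors,
}

/// Simplified single-column model of the fallback: a page with no selected
/// rows is assumed to be skipped during row-group construction, and a mask
/// needs every row decoded, so any such page forces the selectors strategy.
fn resolve_strategy(preferred: Strategy, mask: &[bool], page_row_counts: &[usize]) -> Strategy {
    if preferred != Strategy::Mask {
        // Only a preferred Mask strategy is ever overridden.
        return preferred;
    }
    let mut start = 0;
    for &count in page_row_counts {
        let end = (start + count).min(mask.len());
        if end > start && !mask[start..end].iter().any(|&selected| selected) {
            // This page would never be fetched, so the mask cannot be applied.
            return Strategy::Selectors;
        }
        start = end;
    }
    Strategy::Mask
}

fn main() {
    let mask = [true, false, false, false, false, true];
    // Page size 2: page2 (rows 2 and 3) has no selected rows.
    println!("{:?}", resolve_strategy(Strategy::Mask, &mask, &[2, 2, 2])); // Selectors
    // Page size 3: both pages contain a selected row, so the mask is kept.
    println!("{:?}", resolve_strategy(Strategy::Mask, &mask, &[3, 3])); // Mask
}
```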
Describe the solution you'd like
I would like the parquet decoder to be able to use the mask evaluation strategy AND skip pages.
Describe alternatives you've considered
One option (from @tustvold) is to make the mask iteration page-aware, so that we don't evaluate the predicate when we can rule out the entire page. Perhaps we can teach the bitmask iteration to be smarter in this case.
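A minimal sketch of page-aware mask iteration, assuming the page boundaries are known up front as per-page row counts: pages with no selected rows become skips, and every other page gets its own mask chunk, so the decoder never needs data from a ruled-out page. `Chunk` and `page_aware_chunks` are hypothetical names, not part of the parquet crate.

```rust
/// One unit of work for a hypothetical page-aware mask iterator.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Chunk {
    /// Skip this many rows without fetching or decoding them.
    Skip(usize),
    /// Decode these rows, then keep the ones marked `true`.
    Mask(Vec<bool>),
}

/// Splits `mask` at every page boundary: pages with no selected rows become
/// `Skip` chunks (consecutive ones are merged), and every other page becomes
/// its own `Mask` chunk, so no `Mask` chunk ever crosses a page boundary.
fn page_aware_chunks(mask: &[bool], page_row_counts: &[usize]) -> Vec<Chunk> {
    let mut chunks: Vec<Chunk> = Vec::new();
    let mut start = 0;
    for &count in page_row_counts {
        let end = (start + count).min(mask.len());
        let page = &mask[start..end];
        if page.iter().any(|&selected| selected) {
            chunks.push(Chunk::Mask(page.to_vec()));
        } else if let Some(Chunk::Skip(n)) = chunks.last_mut() {
            *n += page.len();
        } else if !page.is_empty() {
            chunks.push(Chunk::Skip(page.len()));
        }
        start = end;
    }
    chunks
}

fn main() {
    // Mask 100001 with a page size of 2.
    let mask = [true, false, false, false, false, true];
    println!("{:?}", page_aware_chunks(&mask, &[2, 2, 2]));
    // [Mask([true, false]), Skip(2), Mask([false, true])]
}
```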
Additional context
Thoughts from @tustvold #8733 (review):
By definition, the mask selection strategy requests rows that weren't part of the original selection; the problem is that this could result in requesting rows for pages that we know are irrelevant. In some cases this just results in wasted IO; however, when using prefetching IO systems (such as AsyncParquetReader), this results in errors.
I think a better solution would be to ensure we only construct `MaskChunk`s that don't cross page boundaries. Ideally this would be done on a per-leaf column basis, but tbh I suspect just doing it globally would probably work just fine.
Edit: If one was feeling fancy, one could ignore page boundaries where both pages were present in the original selection, although in practice I suspect this would not make a huge difference.
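The "fancy" variant can be sketched by splitting the mask only where a fully filtered page begins or ends, so that adjacent pages which both contain selected rows are coalesced into one mask chunk. `Chunk` and `coalesced_chunks` are hypothetical names, and per-page row counts are an assumed stand-in for the offset index.

```rust
/// One unit of work for a hypothetical mask iterator.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Chunk {
    /// Skip this many rows without fetching or decoding them.
    Skip(usize),
    /// Decode these rows, then keep the ones marked `true`.
    Mask(Vec<bool>),
}

/// Splits `mask` only where a fully filtered page begins or ends: neighbouring
/// pages that both contain selected rows share one `Mask` chunk, and
/// neighbouring fully filtered pages share one `Skip` chunk.
fn coalesced_chunks(mask: &[bool], page_row_counts: &[usize]) -> Vec<Chunk> {
    let mut chunks: Vec<Chunk> = Vec::new();
    let mut start = 0;
    for &count in page_row_counts {
        let end = (start + count).min(mask.len());
        let page = &mask[start..end];
        let has_selected = page.iter().any(|&selected| selected);
        match chunks.last_mut() {
            // Same kind as the previous page: extend the chunk across the boundary.
            Some(Chunk::Mask(bits)) if has_selected => bits.extend_from_slice(page),
            Some(Chunk::Skip(n)) if !has_selected => *n += page.len(),
            // Empty pages contribute nothing.
            _ if page.is_empty() => {}
            // Kind changed (or first page): start a new chunk.
            _ if has_selected => chunks.push(Chunk::Mask(page.to_vec())),
            _ => chunks.push(Chunk::Skip(page.len())),
        }
        start = end;
    }
    chunks
}

fn main() {
    // Pages of 2 rows: pages 0 and 1 both contain selected rows, page 2 does not.
    let mask = [true, false, false, true, false, false, true, false];
    println!("{:?}", coalesced_chunks(&mask, &[2, 2, 2, 2]));
    // [Mask([true, false, false, true]), Skip(2), Mask([true, false])]
}
```

Compared with splitting at every page boundary, this yields fewer, larger mask chunks while still never covering a page that would be skipped.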