Commit 65bd13d
Coalesce batches inside FilterExec (#18604)
## Which issue does this PR close?

- Closes: #18606
- Relates to: #7001

## Rationale for this change

Move batch coalescing inside FilterExec. We can use `BatchCoalescer::push_batch_with_filter`, which should give a speedup compared to filtering individual batches and concatenating the results afterwards.

## What changes are included in this PR?

Changes FilterExec to coalesce batches internally. This PR does not remove CoalesceBatchesExec from the plans; I plan to file an issue and a follow-up PR for that once this is merged. For now CoalesceBatchesExec should be mostly a no-op with limited overhead, since its input batches are already well-sized.

## Are these changes tested?

## Are there any user-facing changes?
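For context, here is a minimal, hedged sketch (not code from this commit) contrasting the two strategies the rationale refers to. It assumes the boolean filter masks for each input batch have already been produced by evaluating the predicate; the function names are illustrative only, and only arrow APIs referenced in the diff below are used.

```rust
use arrow::array::BooleanArray;
use arrow::compute::{concat_batches, filter_record_batch, BatchCoalescer};
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Previous shape: filter each batch on its own, producing many small batches
/// that must be copied again (e.g. by CoalesceBatchesExec) to reach a useful size.
fn filter_then_concat(
    schema: &SchemaRef,
    batches: &[RecordBatch],
    masks: &[BooleanArray],
) -> Result<RecordBatch, ArrowError> {
    let filtered: Vec<RecordBatch> = batches
        .iter()
        .zip(masks)
        .map(|(batch, mask)| filter_record_batch(batch, mask))
        .collect::<Result<_, _>>()?;
    concat_batches(schema, filtered.iter())
}

/// New shape: selected rows are copied once, directly into output buffers of the
/// target size, so no intermediate small batches are materialized.
fn filter_with_coalescer(
    schema: SchemaRef,
    batches: Vec<RecordBatch>,
    masks: &[BooleanArray],
    target_batch_size: usize, // e.g. the session's batch_size (8192 by default)
) -> Result<Vec<RecordBatch>, ArrowError> {
    let mut coalescer = BatchCoalescer::new(schema, target_batch_size);
    for (batch, mask) in batches.into_iter().zip(masks) {
        // Appends only the rows selected by `mask` to the in-progress output batch.
        coalescer.push_batch_with_filter(batch, mask)?;
    }
    // Flush whatever is still buffered, then drain the completed batches.
    coalescer.finish_buffered_batch()?;
    let mut output = Vec::new();
    while let Some(done) = coalescer.next_completed_batch() {
        output.push(done);
    }
    Ok(output)
}
```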
1 parent cbf3f50 commit 65bd13d

File tree: 2 files changed (+77, -25 lines)


datafusion/core/src/physical_planner.rs

Lines changed: 2 additions & 0 deletions
```diff
@@ -853,6 +853,7 @@ impl DefaultPhysicalPlanner {
                 )? {
                     PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => {
                         FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)?
+                            .with_batch_size(session_state.config().batch_size())?
                     }
                     PlanAsyncExpr::Async(
                         async_map,
@@ -871,6 +872,7 @@ impl DefaultPhysicalPlanner {
                             .with_projection(Some(
                                 (0..input.schema().fields().len()).collect(),
                             ))?
+                            .with_batch_size(session_state.config().batch_size())?
                     }
                     _ => {
                         return internal_err!(
```

datafusion/physical-plan/src/filter.rs

Lines changed: 75 additions & 25 deletions
```diff
@@ -42,7 +42,7 @@ use crate::{
     DisplayFormatType, ExecutionPlan,
 };
 
-use arrow::compute::filter_record_batch;
+use arrow::compute::{filter_record_batch, BatchCoalescer};
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::cast::as_boolean_array;
@@ -67,6 +67,7 @@ use futures::stream::{Stream, StreamExt};
 use log::trace;
 
 const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
+const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192;
 
 /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
 /// include in its output batches.
@@ -84,6 +85,8 @@ pub struct FilterExec {
     cache: PlanProperties,
     /// The projection indices of the columns in the output schema of join
     projection: Option<Vec<usize>>,
+    /// Target batch size for output batches
+    batch_size: usize,
 }
 
 impl FilterExec {
@@ -108,6 +111,7 @@ impl FilterExec {
                     default_selectivity,
                     cache,
                     projection: None,
+                    batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
                 })
             }
             other => {
@@ -155,6 +159,19 @@ impl FilterExec {
             default_selectivity: self.default_selectivity,
             cache,
             projection,
+            batch_size: self.batch_size,
+        })
+    }
+
+    pub fn with_batch_size(&self, batch_size: usize) -> Result<Self> {
+        Ok(Self {
+            predicate: Arc::clone(&self.predicate),
+            input: Arc::clone(&self.input),
+            metrics: self.metrics.clone(),
+            default_selectivity: self.default_selectivity,
+            cache: self.cache.clone(),
+            projection: self.projection.clone(),
+            batch_size,
         })
     }
 
@@ -392,6 +409,8 @@ impl ExecutionPlan for FilterExec {
             input: self.input.execute(partition, context)?,
             metrics,
             projection: self.projection.clone(),
+            batch_coalescer: BatchCoalescer::new(self.schema(), self.batch_size)
+                .with_biggest_coalesce_batch_size(Some(self.batch_size / 2)),
         }))
     }
 
@@ -549,6 +568,7 @@ impl ExecutionPlan for FilterExec {
                 self.projection.as_ref(),
             )?,
             projection: None,
+            batch_size: self.batch_size,
         };
         Some(Arc::new(new) as _)
     };
@@ -627,6 +647,8 @@ struct FilterExecStream {
     metrics: FilterExecMetrics,
     /// The projection indices of the columns in the input schema
     projection: Option<Vec<usize>>,
+    /// Batch coalescer to combine small batches
+    batch_coalescer: BatchCoalescer,
 }
 
 /// The metrics for `FilterExec`
@@ -652,14 +674,13 @@ pub fn batch_filter(
     batch: &RecordBatch,
     predicate: &Arc<dyn PhysicalExpr>,
 ) -> Result<RecordBatch> {
-    filter_and_project(batch, predicate, None, &batch.schema())
+    filter_and_project(batch, predicate, None)
 }
 
 fn filter_and_project(
     batch: &RecordBatch,
     predicate: &Arc<dyn PhysicalExpr>,
     projection: Option<&Vec<usize>>,
-    output_schema: &SchemaRef,
 ) -> Result<RecordBatch> {
     predicate
         .evaluate(batch)
@@ -669,14 +690,7 @@ fn filter_and_project(
             // Apply filter array to record batch
             (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
             (Ok(filter_array), Some(projection)) => {
-                let projected_columns = projection
-                    .iter()
-                    .map(|i| Arc::clone(batch.column(*i)))
-                    .collect();
-                let projected_batch = RecordBatch::try_new(
-                    Arc::clone(output_schema),
-                    projected_columns,
-                )?;
+                let projected_batch = batch.project(projection)?;
                 filter_record_batch(&projected_batch, filter_array)?
             }
             (Err(_), _) => {
@@ -696,26 +710,62 @@ impl Stream for FilterExecStream {
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
         let poll;
+        let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
         loop {
             match ready!(self.input.poll_next_unpin(cx)) {
                 Some(Ok(batch)) => {
-                    let timer = self.metrics.baseline_metrics.elapsed_compute().timer();
-                    let filtered_batch = filter_and_project(
-                        &batch,
-                        &self.predicate,
-                        self.projection.as_ref(),
-                        &self.schema,
-                    )?;
-                    timer.done();
+                    let timer = elapsed_compute.timer();
+                    self.predicate.as_ref()
+                        .evaluate(&batch)
+                        .and_then(|v| v.into_array(batch.num_rows()))
+                        .and_then(|array| {
+                            Ok(match self.projection {
+                                Some(ref projection) => {
+                                    let projected_batch = batch.project(projection)?;
+                                    (array, projected_batch)
+                                },
+                                None => (array, batch)
+                            })
+                        }).and_then(|(array, batch)| {
+                            match as_boolean_array(&array) {
+                                Ok(filter_array) => {
+                                    self.metrics.selectivity.add_part(filter_array.true_count());
+                                    self.metrics.selectivity.add_total(batch.num_rows());
+
+                                    self.batch_coalescer.push_batch_with_filter(batch.clone(), filter_array)?;
+                                    Ok(())
+                                }
+                                Err(_) => {
+                                    internal_err!(
+                                        "Cannot create filter_array from non-boolean predicates"
+                                    )
+                                }
+                            }
+                        })?;
 
-                    self.metrics.selectivity.add_part(filtered_batch.num_rows());
-                    self.metrics.selectivity.add_total(batch.num_rows());
+                    timer.done();
 
-                    // Skip entirely filtered batches
-                    if filtered_batch.num_rows() == 0 {
-                        continue;
+                    if self.batch_coalescer.has_completed_batch() {
+                        poll = Poll::Ready(Some(Ok(self
+                            .batch_coalescer
+                            .next_completed_batch()
+                            .expect("has_completed_batch is true"))));
+                        break;
+                    }
+                    continue;
+                }
+                None => {
+                    // Flush any remaining buffered batch
+                    match self.batch_coalescer.finish_buffered_batch() {
+                        Ok(()) => {
+                            poll = Poll::Ready(
+                                self.batch_coalescer.next_completed_batch().map(Ok),
+                            );
+                        }
+                        Err(e) => {
+                            poll = Poll::Ready(Some(Err(e.into())));
+                        }
                     }
-                    poll = Poll::Ready(Some(Ok(filtered_batch)));
                     break;
                 }
                 value => {
```
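As orientation for the hunk above, the following is a hedged, synchronous restatement of the control flow the new `poll_next` implements: emit a completed batch as soon as the coalescer has one ready, otherwise pull and push more filtered input, and flush the buffered remainder once the input is exhausted. The iterator wrapper and its field names are illustrative stand-ins, not DataFusion's actual types; predicate evaluation, projection, and metrics are omitted and the filter masks are assumed to arrive precomputed with each batch.

```rust
use arrow::array::BooleanArray;
use arrow::compute::BatchCoalescer;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Illustrative stand-in for FilterExecStream: the real operator does the same
/// dance inside `Stream::poll_next`, driven by `poll_next_unpin` on its input.
struct CoalescingFilterIter<I> {
    input: I,
    coalescer: BatchCoalescer,
    input_done: bool,
}

impl<I> Iterator for CoalescingFilterIter<I>
where
    I: Iterator<Item = Result<(RecordBatch, BooleanArray), ArrowError>>,
{
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Hand out a full-sized batch as soon as the coalescer has one ready.
            if let Some(batch) = self.coalescer.next_completed_batch() {
                return Some(Ok(batch));
            }
            if self.input_done {
                return None;
            }
            match self.input.next() {
                Some(Ok((batch, mask))) => {
                    // Selected rows are appended straight into the output buffers.
                    if let Err(e) = self.coalescer.push_batch_with_filter(batch, &mask) {
                        return Some(Err(e));
                    }
                }
                Some(Err(e)) => return Some(Err(e)),
                None => {
                    // Input exhausted: flush whatever is still buffered.
                    self.input_done = true;
                    if let Err(e) = self.coalescer.finish_buffered_batch() {
                        return Some(Err(e));
                    }
                }
            }
        }
    }
}
```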
