Skip to content
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,7 @@ impl DefaultPhysicalPlanner {
)? {
PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => {
FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)?
.with_batch_size(session_state.config().batch_size())?
}
PlanAsyncExpr::Async(
async_map,
Expand All @@ -871,6 +872,7 @@ impl DefaultPhysicalPlanner {
.with_projection(Some(
(0..input.schema().fields().len()).collect(),
))?
.with_batch_size(session_state.config().batch_size())?
}
_ => {
return internal_err!(
Expand Down
100 changes: 75 additions & 25 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::{
DisplayFormatType, ExecutionPlan,
};

use arrow::compute::filter_record_batch;
use arrow::compute::{filter_record_batch, BatchCoalescer};
use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_boolean_array;
Expand All @@ -67,6 +67,7 @@ use futures::stream::{Stream, StreamExt};
use log::trace;

const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192;

/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
/// include in its output batches.
Expand All @@ -84,6 +85,8 @@ pub struct FilterExec {
cache: PlanProperties,
/// The projection indices of the columns in the output schema of join
projection: Option<Vec<usize>>,
/// Target batch size for output batches
batch_size: usize,
}

impl FilterExec {
Expand All @@ -108,6 +111,7 @@ impl FilterExec {
default_selectivity,
cache,
projection: None,
batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
})
}
other => {
Expand Down Expand Up @@ -155,6 +159,19 @@ impl FilterExec {
default_selectivity: self.default_selectivity,
cache,
projection,
batch_size: self.batch_size,
})
}

/// Returns a copy of this `FilterExec` configured with the given target
/// output batch size.
///
/// The batch size is later handed to the stream's `BatchCoalescer`, which
/// accumulates filtered rows until roughly `batch_size` rows are available
/// before emitting an output batch. All other fields are carried over
/// unchanged from `self`.
pub fn with_batch_size(&self, batch_size: usize) -> Result<Self> {
    // Rebuild the node field-by-field; `Arc::clone` keeps the shared
    // predicate/input cheap to copy (refcount bump only).
    let resized = Self {
        batch_size,
        predicate: Arc::clone(&self.predicate),
        input: Arc::clone(&self.input),
        metrics: self.metrics.clone(),
        default_selectivity: self.default_selectivity,
        cache: self.cache.clone(),
        projection: self.projection.clone(),
    };
    Ok(resized)
}

Expand Down Expand Up @@ -392,6 +409,8 @@ impl ExecutionPlan for FilterExec {
input: self.input.execute(partition, context)?,
metrics,
projection: self.projection.clone(),
batch_coalescer: BatchCoalescer::new(self.schema(), self.batch_size)
.with_biggest_coalesce_batch_size(Some(self.batch_size / 2)),
}))
}

Expand Down Expand Up @@ -549,6 +568,7 @@ impl ExecutionPlan for FilterExec {
self.projection.as_ref(),
)?,
projection: None,
batch_size: self.batch_size,
};
Some(Arc::new(new) as _)
};
Expand Down Expand Up @@ -627,6 +647,8 @@ struct FilterExecStream {
metrics: FilterExecMetrics,
/// The projection indices of the columns in the input schema
projection: Option<Vec<usize>>,
/// Batch coalescer to combine small batches
batch_coalescer: BatchCoalescer,
}

/// The metrics for `FilterExec`
Expand All @@ -652,14 +674,13 @@ pub fn batch_filter(
batch: &RecordBatch,
predicate: &Arc<dyn PhysicalExpr>,
) -> Result<RecordBatch> {
filter_and_project(batch, predicate, None, &batch.schema())
filter_and_project(batch, predicate, None)
}

fn filter_and_project(
batch: &RecordBatch,
predicate: &Arc<dyn PhysicalExpr>,
projection: Option<&Vec<usize>>,
output_schema: &SchemaRef,
) -> Result<RecordBatch> {
predicate
.evaluate(batch)
Expand All @@ -669,14 +690,7 @@ fn filter_and_project(
// Apply filter array to record batch
(Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
(Ok(filter_array), Some(projection)) => {
let projected_columns = projection
.iter()
.map(|i| Arc::clone(batch.column(*i)))
.collect();
let projected_batch = RecordBatch::try_new(
Arc::clone(output_schema),
projected_columns,
)?;
let projected_batch = batch.project(projection)?;
filter_record_batch(&projected_batch, filter_array)?
}
(Err(_), _) => {
Expand All @@ -696,26 +710,62 @@ impl Stream for FilterExecStream {
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let poll;
let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
loop {
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let timer = self.metrics.baseline_metrics.elapsed_compute().timer();
let filtered_batch = filter_and_project(
&batch,
&self.predicate,
self.projection.as_ref(),
&self.schema,
)?;
timer.done();
let timer = elapsed_compute.timer();
self.predicate.as_ref()
.evaluate(&batch)
.and_then(|v| v.into_array(batch.num_rows()))
.and_then(|array| {
Ok(match self.projection {
Some(ref projection) => {
let projected_batch = batch.project(projection)?;
(array, projected_batch)
},
None => (array, batch)
})
}).and_then(|(array, batch)| {
match as_boolean_array(&array) {
Ok(filter_array) => {
self.metrics.selectivity.add_part(filter_array.true_count());
self.metrics.selectivity.add_total(batch.num_rows());

self.batch_coalescer.push_batch_with_filter(batch.clone(), filter_array)?;
Ok(())
}
Err(_) => {
internal_err!(
"Cannot create filter_array from non-boolean predicates"
)
}
}
})?;

self.metrics.selectivity.add_part(filtered_batch.num_rows());
self.metrics.selectivity.add_total(batch.num_rows());
timer.done();

// Skip entirely filtered batches
if filtered_batch.num_rows() == 0 {
continue;
if self.batch_coalescer.has_completed_batch() {
poll = Poll::Ready(Some(Ok(self
.batch_coalescer
.next_completed_batch()
.expect("has_completed_batch is true"))));
break;
}
continue;
}
None => {
// Flush any remaining buffered batch
match self.batch_coalescer.finish_buffered_batch() {
Ok(()) => {
poll = Poll::Ready(
self.batch_coalescer.next_completed_batch().map(Ok),
);
}
Err(e) => {
poll = Poll::Ready(Some(Err(e.into())));
}
}
poll = Poll::Ready(Some(Ok(filtered_batch)));
break;
}
value => {
Expand Down