diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 6a75485c6284..f10755a4594c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -853,6 +853,7 @@ impl DefaultPhysicalPlanner { )? { PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => { FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)? + .with_batch_size(session_state.config().batch_size())? } PlanAsyncExpr::Async( async_map, @@ -871,6 +872,7 @@ impl DefaultPhysicalPlanner { .with_projection(Some( (0..input.schema().fields().len()).collect(), ))? + .with_batch_size(session_state.config().batch_size())? } _ => { return internal_err!( diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 5ba508a8defe..0c583e1fb973 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -42,7 +42,7 @@ use crate::{ DisplayFormatType, ExecutionPlan, }; -use arrow::compute::filter_record_batch; +use arrow::compute::{filter_record_batch, BatchCoalescer}; use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; @@ -67,6 +67,7 @@ use futures::stream::{Stream, StreamExt}; use log::trace; const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20; +const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192; /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to /// include in its output batches. @@ -84,6 +85,8 @@ pub struct FilterExec { cache: PlanProperties, /// The projection indices of the columns in the output schema of join projection: Option>, + /// Target batch size for output batches + batch_size: usize, } impl FilterExec { @@ -108,6 +111,7 @@ impl FilterExec { default_selectivity, cache, projection: None, + batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE, }) } other => { @@ -155,6 +159,19 @@ impl FilterExec { default_selectivity: self.default_selectivity, cache, projection, + batch_size: self.batch_size, + }) + } + + pub fn with_batch_size(&self, batch_size: usize) -> Result { + Ok(Self { + predicate: Arc::clone(&self.predicate), + input: Arc::clone(&self.input), + metrics: self.metrics.clone(), + default_selectivity: self.default_selectivity, + cache: self.cache.clone(), + projection: self.projection.clone(), + batch_size, }) } @@ -392,6 +409,8 @@ impl ExecutionPlan for FilterExec { input: self.input.execute(partition, context)?, metrics, projection: self.projection.clone(), + batch_coalescer: BatchCoalescer::new(self.schema(), self.batch_size) + .with_biggest_coalesce_batch_size(Some(self.batch_size / 2)), })) } @@ -549,6 +568,7 @@ impl ExecutionPlan for FilterExec { self.projection.as_ref(), )?, projection: None, + batch_size: self.batch_size, }; Some(Arc::new(new) as _) }; @@ -627,6 +647,8 @@ struct FilterExecStream { metrics: FilterExecMetrics, /// The projection indices of the columns in the input schema projection: Option>, + /// Batch coalescer to combine small batches + batch_coalescer: BatchCoalescer, } /// The metrics for `FilterExec` @@ -652,14 +674,13 @@ pub fn batch_filter( batch: &RecordBatch, predicate: &Arc, ) -> Result { - filter_and_project(batch, predicate, None, &batch.schema()) + filter_and_project(batch, predicate, None) } fn filter_and_project( batch: &RecordBatch, predicate: &Arc, projection: Option<&Vec>, - output_schema: &SchemaRef, ) -> Result { predicate .evaluate(batch) @@ -669,14 +690,7 @@ fn filter_and_project( // Apply filter array to record batch (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?, (Ok(filter_array), Some(projection)) => { - let projected_columns = projection - .iter() - .map(|i| Arc::clone(batch.column(*i))) - .collect(); - let projected_batch = RecordBatch::try_new( - Arc::clone(output_schema), - projected_columns, - )?; + let projected_batch = batch.project(projection)?; filter_record_batch(&projected_batch, filter_array)? } (Err(_), _) => { @@ -696,26 +710,62 @@ impl Stream for FilterExecStream { cx: &mut Context<'_>, ) -> Poll> { let poll; + let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone(); loop { match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { - let timer = self.metrics.baseline_metrics.elapsed_compute().timer(); - let filtered_batch = filter_and_project( - &batch, - &self.predicate, - self.projection.as_ref(), - &self.schema, - )?; - timer.done(); + let timer = elapsed_compute.timer(); + self.predicate.as_ref() + .evaluate(&batch) + .and_then(|v| v.into_array(batch.num_rows())) + .and_then(|array| { + Ok(match self.projection { + Some(ref projection) => { + let projected_batch = batch.project(projection)?; + (array, projected_batch) + }, + None => (array, batch) + }) + }).and_then(|(array, batch)| { + match as_boolean_array(&array) { + Ok(filter_array) => { + self.metrics.selectivity.add_part(filter_array.true_count()); + self.metrics.selectivity.add_total(batch.num_rows()); + + self.batch_coalescer.push_batch_with_filter(batch.clone(), filter_array)?; + Ok(()) + } + Err(_) => { + internal_err!( + "Cannot create filter_array from non-boolean predicates" + ) + } + } + })?; - self.metrics.selectivity.add_part(filtered_batch.num_rows()); - self.metrics.selectivity.add_total(batch.num_rows()); + timer.done(); - // Skip entirely filtered batches - if filtered_batch.num_rows() == 0 { - continue; + if self.batch_coalescer.has_completed_batch() { + poll = Poll::Ready(Some(Ok(self + .batch_coalescer + .next_completed_batch() + .expect("has_completed_batch is true")))); + break; + } + continue; + } + None => { + // Flush any remaining buffered batch + match self.batch_coalescer.finish_buffered_batch() { + Ok(()) => { + poll = Poll::Ready( + self.batch_coalescer.next_completed_batch().map(Ok), + ); + } + Err(e) => { + poll = Poll::Ready(Some(Err(e.into()))); + } } - poll = Poll::Ready(Some(Ok(filtered_batch))); break; } value => {