From 983b4bf78727cb9ddd874accb8c61a3eaf18d54a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 10 Nov 2025 16:21:28 +0100 Subject: [PATCH 01/12] wip --- datafusion/physical-plan/src/filter.rs | 81 +++++++++++++++++--------- 1 file changed, 54 insertions(+), 27 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 5ba508a8defe..b594ce2eb19f 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -42,7 +42,7 @@ use crate::{ DisplayFormatType, ExecutionPlan, }; -use arrow::compute::filter_record_batch; +use arrow::compute::{filter_record_batch, BatchCoalescer}; use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; @@ -392,6 +392,8 @@ impl ExecutionPlan for FilterExec { input: self.input.execute(partition, context)?, metrics, projection: self.projection.clone(), + batch_coalescer: BatchCoalescer::new(self.schema(), 8192) + .with_biggest_coalesce_batch_size(Some(4096)), })) } @@ -627,6 +629,8 @@ struct FilterExecStream { metrics: FilterExecMetrics, /// The projection indices of the columns in the input schema projection: Option>, + /// Batch coalescer to combine small batches + batch_coalescer: BatchCoalescer, } /// The metrics for `FilterExec` @@ -652,14 +656,13 @@ pub fn batch_filter( batch: &RecordBatch, predicate: &Arc, ) -> Result { - filter_and_project(batch, predicate, None, &batch.schema()) + filter_and_project(batch, predicate, None) } fn filter_and_project( batch: &RecordBatch, predicate: &Arc, projection: Option<&Vec>, - output_schema: &SchemaRef, ) -> Result { predicate .evaluate(batch) @@ -669,14 +672,7 @@ fn filter_and_project( // Apply filter array to record batch (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?, (Ok(filter_array), Some(projection)) => { - let projected_columns = projection - .iter() - .map(|i| Arc::clone(batch.column(*i))) - .collect(); - let projected_batch = RecordBatch::try_new( - Arc::clone(output_schema), - projected_columns, - )?; + let projected_batch = batch.project(projection)?; filter_record_batch(&projected_batch, filter_array)? } (Err(_), _) => { @@ -699,23 +695,54 @@ impl Stream for FilterExecStream { loop { match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { - let timer = self.metrics.baseline_metrics.elapsed_compute().timer(); - let filtered_batch = filter_and_project( - &batch, - &self.predicate, - self.projection.as_ref(), - &self.schema, - )?; - timer.done(); - - self.metrics.selectivity.add_part(filtered_batch.num_rows()); - self.metrics.selectivity.add_total(batch.num_rows()); - - // Skip entirely filtered batches - if filtered_batch.num_rows() == 0 { - continue; + // let timer = &self.metrics.baseline_metrics.elapsed_compute().timer(); + let _ = self.predicate.as_ref() + .evaluate(&batch) + .and_then(|v| v.into_array(batch.num_rows())) + .and_then(|array| { + Ok(match (as_boolean_array(&array), self.projection.as_ref()) { + // Apply filter array to record batch + (Ok(filter_array), None) => { + self.metrics.selectivity.add_part(filter_array.true_count()); + self.metrics.selectivity.add_total(batch.num_rows()); + + self.batch_coalescer.push_batch_with_filter(batch.clone(), filter_array)?; + + } + (Ok(filter_array), Some(ref projection)) => { + let projected_batch = batch.project(projection)?; + self.metrics.selectivity.add_part(filter_array.true_count()); + self.metrics.selectivity.add_total(projected_batch.num_rows()); + + self.batch_coalescer.push_batch_with_filter(projected_batch, filter_array)?; + + } + (Err(_), _) => { + return internal_err!( + "Cannot create filter_array from non-boolean predicates" + ); + } + }) + }); + + //timer.done(); + + if self.batch_coalescer.has_completed_batch() { + poll = Poll::Ready(Some(Ok(self + .batch_coalescer + .next_completed_batch() + .expect("has_completed_batch is true")))); + break; + } + continue; + } + None => { + self.batch_coalescer.finish_buffered_batch().unwrap(); + if let Some(batch) = self.batch_coalescer.next_completed_batch() { + poll = Poll::Ready(Some(Ok(batch))); + } else { + poll = Poll::Ready(None); } - poll = Poll::Ready(Some(Ok(filtered_batch))); break; } value => { From 9028e341294b5d837473087705fa4ebbc2a722a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 11 Nov 2025 08:19:34 +0100 Subject: [PATCH 02/12] Config batch size --- datafusion/core/src/physical_planner.rs | 2 ++ datafusion/physical-plan/src/filter.rs | 22 ++++++++++++++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 6a75485c6284..d5bc37bd6533 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -853,6 +853,7 @@ impl DefaultPhysicalPlanner { )? { PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => { FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)? + .with_target_batch_size(session_state.config().batch_size())? } PlanAsyncExpr::Async( async_map, @@ -871,6 +872,7 @@ impl DefaultPhysicalPlanner { .with_projection(Some( (0..input.schema().fields().len()).collect(), ))? + .with_target_batch_size(session_state.config().batch_size())? } _ => { return internal_err!( diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index b594ce2eb19f..844b9573b655 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -67,6 +67,7 @@ use futures::stream::{Stream, StreamExt}; use log::trace; const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20; +const FILTER_EXEC_DEFAULT_TARGET_BATCH_SIZE: usize = 8192; /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to /// include in its output batches. @@ -84,6 +85,8 @@ pub struct FilterExec { cache: PlanProperties, /// The projection indices of the columns in the output schema of join projection: Option>, + /// Target batch size for output batches + target_batch_size: usize, } impl FilterExec { @@ -108,6 +111,7 @@ impl FilterExec { default_selectivity, cache, projection: None, + target_batch_size: FILTER_EXEC_DEFAULT_TARGET_BATCH_SIZE, }) } other => { @@ -155,6 +159,19 @@ impl FilterExec { default_selectivity: self.default_selectivity, cache, projection, + target_batch_size: self.target_batch_size, + }) + } + + pub fn with_target_batch_size(&self, target_batch_size: usize) -> Result { + Ok(Self { + predicate: Arc::clone(&self.predicate), + input: self.input.clone(), + metrics: self.metrics.clone(), + default_selectivity: self.default_selectivity, + cache: self.cache.clone(), + projection: self.projection.clone(), + target_batch_size, }) } @@ -392,8 +409,8 @@ impl ExecutionPlan for FilterExec { input: self.input.execute(partition, context)?, metrics, projection: self.projection.clone(), - batch_coalescer: BatchCoalescer::new(self.schema(), 8192) - .with_biggest_coalesce_batch_size(Some(4096)), + batch_coalescer: BatchCoalescer::new(self.schema(), self.target_batch_size) + .with_biggest_coalesce_batch_size(Some(self.target_batch_size / 2)), })) } @@ -551,6 +568,7 @@ impl ExecutionPlan for FilterExec { self.projection.as_ref(), )?, projection: None, + target_batch_size: self.target_batch_size, }; Some(Arc::new(new) as _) }; From eb286882f5da93a6b0fe84d38394260bcebe23bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 11 Nov 2025 08:31:43 +0100 Subject: [PATCH 03/12] fmt --- datafusion/core/src/physical_planner.rs | 4 ++-- datafusion/physical-plan/src/filter.rs | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index d5bc37bd6533..f10755a4594c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -853,7 +853,7 @@ impl DefaultPhysicalPlanner { )? { PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => { FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)? - .with_target_batch_size(session_state.config().batch_size())? + .with_batch_size(session_state.config().batch_size())? } PlanAsyncExpr::Async( async_map, @@ -872,7 +872,7 @@ impl DefaultPhysicalPlanner { .with_projection(Some( (0..input.schema().fields().len()).collect(), ))? - .with_target_batch_size(session_state.config().batch_size())? + .with_batch_size(session_state.config().batch_size())? } _ => { return internal_err!( diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 844b9573b655..efdade7412de 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -67,7 +67,7 @@ use futures::stream::{Stream, StreamExt}; use log::trace; const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20; -const FILTER_EXEC_DEFAULT_TARGET_BATCH_SIZE: usize = 8192; +const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192; /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to /// include in its output batches. @@ -86,7 +86,7 @@ pub struct FilterExec { /// The projection indices of the columns in the output schema of join projection: Option>, /// Target batch size for output batches - target_batch_size: usize, + batch_size: usize, } impl FilterExec { @@ -111,7 +111,7 @@ impl FilterExec { default_selectivity, cache, projection: None, - target_batch_size: FILTER_EXEC_DEFAULT_TARGET_BATCH_SIZE, + batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE, }) } other => { @@ -159,11 +159,11 @@ impl FilterExec { default_selectivity: self.default_selectivity, cache, projection, - target_batch_size: self.target_batch_size, + batch_size: self.batch_size, }) } - pub fn with_target_batch_size(&self, target_batch_size: usize) -> Result { + pub fn with_batch_size(&self, batch_size: usize) -> Result { Ok(Self { predicate: Arc::clone(&self.predicate), input: self.input.clone(), @@ -171,7 +171,7 @@ impl FilterExec { default_selectivity: self.default_selectivity, cache: self.cache.clone(), projection: self.projection.clone(), - target_batch_size, + batch_size, }) } @@ -409,8 +409,8 @@ impl ExecutionPlan for FilterExec { input: self.input.execute(partition, context)?, metrics, projection: self.projection.clone(), - batch_coalescer: BatchCoalescer::new(self.schema(), self.target_batch_size) - .with_biggest_coalesce_batch_size(Some(self.target_batch_size / 2)), + batch_coalescer: BatchCoalescer::new(self.schema(), self.batch_size) + .with_biggest_coalesce_batch_size(Some(self.batch_size / 2)), })) } @@ -568,7 +568,7 @@ impl ExecutionPlan for FilterExec { self.projection.as_ref(), )?, projection: None, - target_batch_size: self.target_batch_size, + batch_size: self.batch_size, }; Some(Arc::new(new) as _) }; From ca4ebf2b8e88cd0f630d42b90be5c04a02c12e44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 11 Nov 2025 08:58:24 +0100 Subject: [PATCH 04/12] Bring back timer --- datafusion/physical-plan/src/filter.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index efdade7412de..de75aa16c78a 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -710,15 +710,16 @@ impl Stream for FilterExecStream { cx: &mut Context<'_>, ) -> Poll> { let poll; + let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone(); loop { match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { - // let timer = &self.metrics.baseline_metrics.elapsed_compute().timer(); + let timer = elapsed_compute.timer(); let _ = self.predicate.as_ref() .evaluate(&batch) .and_then(|v| v.into_array(batch.num_rows())) .and_then(|array| { - Ok(match (as_boolean_array(&array), self.projection.as_ref()) { + Ok(match (as_boolean_array(&array), &self.projection) { // Apply filter array to record batch (Ok(filter_array), None) => { self.metrics.selectivity.add_part(filter_array.true_count()); @@ -727,7 +728,7 @@ impl Stream for FilterExecStream { self.batch_coalescer.push_batch_with_filter(batch.clone(), filter_array)?; } - (Ok(filter_array), Some(ref projection)) => { + (Ok(filter_array), Some(projection)) => { let projected_batch = batch.project(projection)?; self.metrics.selectivity.add_part(filter_array.true_count()); self.metrics.selectivity.add_total(projected_batch.num_rows()); @@ -743,7 +744,7 @@ impl Stream for FilterExecStream { }) }); - //timer.done(); + timer.done(); if self.batch_coalescer.has_completed_batch() { poll = Poll::Ready(Some(Ok(self From 0bff7175577ca607dc6e256fbf794e99d9f428ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 11 Nov 2025 09:00:22 +0100 Subject: [PATCH 05/12] fix --- datafusion/physical-plan/src/filter.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index de75aa16c78a..a4e04f340b52 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -715,7 +715,7 @@ impl Stream for FilterExecStream { match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { let timer = elapsed_compute.timer(); - let _ = self.predicate.as_ref() + self.predicate.as_ref() .evaluate(&batch) .and_then(|v| v.into_array(batch.num_rows())) .and_then(|array| { @@ -737,12 +737,12 @@ impl Stream for FilterExecStream { } (Err(_), _) => { - return internal_err!( + internal_err!( "Cannot create filter_array from non-boolean predicates" - ); + ) } }) - }); + })?; timer.done(); From f1b3d245c736dc12ab816050206ed0d405bc0937 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 11 Nov 2025 09:12:03 +0100 Subject: [PATCH 06/12] Refactor --- datafusion/physical-plan/src/filter.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index a4e04f340b52..90eb7b365cbd 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -20,6 +20,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; +use arrow::array; use itertools::Itertools; use super::{ @@ -719,24 +720,24 @@ impl Stream for FilterExecStream { .evaluate(&batch) .and_then(|v| v.into_array(batch.num_rows())) .and_then(|array| { - Ok(match (as_boolean_array(&array), &self.projection) { + Ok(match self.projection { + Some(ref projection) => { + let projected_batch = batch.project(projection)?; + (array, projected_batch) + }, + None => (array, batch) + }) + }).and_then(|(array, batch)| { + Ok(match as_boolean_array(&array) { // Apply filter array to record batch - (Ok(filter_array), None) => { + Ok(filter_array) => { self.metrics.selectivity.add_part(filter_array.true_count()); self.metrics.selectivity.add_total(batch.num_rows()); self.batch_coalescer.push_batch_with_filter(batch.clone(), filter_array)?; } - (Ok(filter_array), Some(projection)) => { - let projected_batch = batch.project(projection)?; - self.metrics.selectivity.add_part(filter_array.true_count()); - self.metrics.selectivity.add_total(projected_batch.num_rows()); - - self.batch_coalescer.push_batch_with_filter(projected_batch, filter_array)?; - - } - (Err(_), _) => { + Err(_) => { internal_err!( "Cannot create filter_array from non-boolean predicates" ) From f71d1db6df8ee8c0218223c9e3b5c2453e0948f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 11 Nov 2025 09:35:04 +0100 Subject: [PATCH 07/12] Refactor --- datafusion/physical-plan/src/filter.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 90eb7b365cbd..8f69e85edd1a 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -20,7 +20,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; -use arrow::array; use itertools::Itertools; use super::{ From 2a3b4ef31d1348928f46af1cce66fa3b12cb7316 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 11 Nov 2025 09:36:20 +0100 Subject: [PATCH 08/12] Refactor --- datafusion/physical-plan/src/filter.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 8f69e85edd1a..37e53bf3ac6a 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -727,21 +727,21 @@ impl Stream for FilterExecStream { None => (array, batch) }) }).and_then(|(array, batch)| { - Ok(match as_boolean_array(&array) { + match as_boolean_array(&array) { // Apply filter array to record batch Ok(filter_array) => { self.metrics.selectivity.add_part(filter_array.true_count()); self.metrics.selectivity.add_total(batch.num_rows()); self.batch_coalescer.push_batch_with_filter(batch.clone(), filter_array)?; - + Ok(()) } Err(_) => { internal_err!( "Cannot create filter_array from non-boolean predicates" ) } - }) + } })?; timer.done(); From 4386c6877a288893f418d16b669af18ea359086a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 11 Nov 2025 10:39:56 +0100 Subject: [PATCH 09/12] clippy --- datafusion/physical-plan/src/filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 37e53bf3ac6a..9e35b059ac86 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -166,7 +166,7 @@ impl FilterExec { pub fn with_batch_size(&self, batch_size: usize) -> Result { Ok(Self { predicate: Arc::clone(&self.predicate), - input: self.input.clone(), + input: Arc::clone(&self.input), metrics: self.metrics.clone(), default_selectivity: self.default_selectivity, cache: self.cache.clone(), From 02e7582fd5bf745d8108042d43dca33145e54665 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 11 Nov 2025 12:29:02 +0100 Subject: [PATCH 10/12] Handle error --- datafusion/physical-plan/src/filter.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 9e35b059ac86..cd193a6d28f1 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -756,11 +756,19 @@ impl Stream for FilterExecStream { continue; } None => { - self.batch_coalescer.finish_buffered_batch().unwrap(); - if let Some(batch) = self.batch_coalescer.next_completed_batch() { - poll = Poll::Ready(Some(Ok(batch))); - } else { - poll = Poll::Ready(None); + match self.batch_coalescer.finish_buffered_batch() { + Ok(()) => { + if let Some(batch) = + self.batch_coalescer.next_completed_batch() + { + poll = Poll::Ready(Some(Ok(batch))); + } else { + poll = Poll::Ready(None); + } + } + Err(e) => { + poll = Poll::Ready(Some(Err(e.into()))); + } } break; } From a64e8ef3d845e726df19b815d1a6bc894e7811b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 11 Nov 2025 12:54:05 +0100 Subject: [PATCH 11/12] Improve comments --- datafusion/physical-plan/src/filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index cd193a6d28f1..a17e96b97ad8 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -728,7 +728,6 @@ impl Stream for FilterExecStream { }) }).and_then(|(array, batch)| { match as_boolean_array(&array) { - // Apply filter array to record batch Ok(filter_array) => { self.metrics.selectivity.add_part(filter_array.true_count()); self.metrics.selectivity.add_total(batch.num_rows()); @@ -756,6 +755,7 @@ impl Stream for FilterExecStream { continue; } None => { + // Flush any remaining buffered batch match self.batch_coalescer.finish_buffered_batch() { Ok(()) => { if let Some(batch) = From c11583d325c67d03cd7d6e43de76f42cfbc88be1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 11 Nov 2025 12:56:34 +0100 Subject: [PATCH 12/12] Simplify --- datafusion/physical-plan/src/filter.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index a17e96b97ad8..0c583e1fb973 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -758,13 +758,9 @@ impl Stream for FilterExecStream { // Flush any remaining buffered batch match self.batch_coalescer.finish_buffered_batch() { Ok(()) => { - if let Some(batch) = - self.batch_coalescer.next_completed_batch() - { - poll = Poll::Ready(Some(Ok(batch))); - } else { - poll = Poll::Ready(None); - } + poll = Poll::Ready( + self.batch_coalescer.next_completed_batch().map(Ok), + ); } Err(e) => { poll = Poll::Ready(Some(Err(e.into())));