
Commit d799780

Impl gather_filters_for_pushdown for CoalescePartitionsExec (apache#18046)
* Impl gather_filters_for_pushdown for CoalescePartitionsExec
* add tests
1 parent: 7b53d3e
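
In short, the commit overrides ExecutionPlan::gather_filters_for_pushdown on CoalescePartitionsExec so that filters arriving from a parent operator (for example, a TopK dynamic filter) are forwarded to the child instead of stopping at the coalesce node. Below is a condensed Rust sketch of the new hook, assembled from the diff that follows; it shows only the added imports and the override, not the full impl, so it is not compilable on its own:

use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_physical_expr::PhysicalExpr;

use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};

impl ExecutionPlan for CoalescePartitionsExec {
    // ...all existing trait methods are unchanged...

    // Forward every parent filter to the single child. As the comment in the
    // new test notes, FilterDescription::from_children lets filters pass
    // through, whereas an "all unsupported" description would stop pushdown
    // at this node.
    fn gather_filters_for_pushdown(
        &self,
        _phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        FilterDescription::from_children(parent_filters, &self.children())
    }
}

The snapshot tests added to mod.rs then verify that the TopK's DynamicFilter predicate reaches the DataSourceExec beneath both CoalescePartitionsExec and CoalesceBatchesExec.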

3 files changed: +146, -2 lines

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 133 additions & 1 deletion
@@ -33,7 +33,11 @@ use datafusion::{
     prelude::{ParquetReadOptions, SessionConfig, SessionContext},
     scalar::ScalarValue,
 };
+use datafusion_catalog::memory::DataSourceExec;
 use datafusion_common::config::ConfigOptions;
+use datafusion_datasource::{
+    file_groups::FileGroup, file_scan_config::FileScanConfigBuilder, PartitionedFile,
+};
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_expr::ScalarUDF;
 use datafusion_functions::math::random::RandomFunc;
@@ -60,6 +64,8 @@ use futures::StreamExt;
 use object_store::{memory::InMemory, ObjectStore};
 use util::{format_plan_for_test, OptimizationTest, TestNode, TestScanBuilder};
 
+use crate::physical_optimizer::filter_pushdown::util::TestSource;
+
 mod util;
 
 #[test]
@@ -834,6 +840,132 @@ async fn test_topk_dynamic_filter_pushdown_multi_column_sort() {
     assert!(stream.next().await.is_none());
 }
 
+#[tokio::test]
+async fn test_topk_filter_passes_through_coalesce_partitions() {
+    // Create multiple batches for different partitions
+    let batches = vec![
+        record_batch!(
+            ("a", Utf8, ["aa", "ab"]),
+            ("b", Utf8, ["bd", "bc"]),
+            ("c", Float64, [1.0, 2.0])
+        )
+        .unwrap(),
+        record_batch!(
+            ("a", Utf8, ["ac", "ad"]),
+            ("b", Utf8, ["bb", "ba"]),
+            ("c", Float64, [2.0, 1.0])
+        )
+        .unwrap(),
+    ];
+
+    // Create a source that supports all batches
+    let source = Arc::new(TestSource::new(true, batches));
+
+    let base_config = FileScanConfigBuilder::new(
+        ObjectStoreUrl::parse("test://").unwrap(),
+        Arc::clone(&schema()),
+        source,
+    )
+    .with_file_groups(vec![
+        // Partition 0
+        FileGroup::new(vec![PartitionedFile::new("test1.parquet", 123)]),
+        // Partition 1
+        FileGroup::new(vec![PartitionedFile::new("test2.parquet", 123)]),
+    ])
+    .build();
+
+    let scan = DataSourceExec::from_data_source(base_config);
+
+    // Add CoalescePartitionsExec to merge the two partitions
+    let coalesce = Arc::new(CoalescePartitionsExec::new(scan)) as Arc<dyn ExecutionPlan>;
+
+    // Add SortExec with TopK
+    let plan = Arc::new(
+        SortExec::new(
+            LexOrdering::new(vec![PhysicalSortExpr::new(
+                col("b", &schema()).unwrap(),
+                SortOptions::new(true, false),
+            )])
+            .unwrap(),
+            coalesce,
+        )
+        .with_fetch(Some(1)),
+    ) as Arc<dyn ExecutionPlan>;
+
+    // Test optimization - the filter SHOULD pass through CoalescePartitionsExec
+    // if it properly implements from_children (not all_unsupported)
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false]
+        -   CoalescePartitionsExec
+        -     DataSourceExec: file_groups={2 groups: [[test1.parquet], [test2.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false]
+          -   CoalescePartitionsExec
+          -     DataSourceExec: file_groups={2 groups: [[test1.parquet], [test2.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
+    "
+    );
+}
+
+#[tokio::test]
+async fn test_topk_filter_passes_through_coalesce_batches() {
+    let batches = vec![
+        record_batch!(
+            ("a", Utf8, ["aa", "ab"]),
+            ("b", Utf8, ["bd", "bc"]),
+            ("c", Float64, [1.0, 2.0])
+        )
+        .unwrap(),
+        record_batch!(
+            ("a", Utf8, ["ac", "ad"]),
+            ("b", Utf8, ["bb", "ba"]),
+            ("c", Float64, [2.0, 1.0])
+        )
+        .unwrap(),
+    ];
+
+    let scan = TestScanBuilder::new(schema())
+        .with_support(true)
+        .with_batches(batches)
+        .build();
+
+    let coalesce_batches =
+        Arc::new(CoalesceBatchesExec::new(scan, 1024)) as Arc<dyn ExecutionPlan>;
+
+    // Add SortExec with TopK
+    let plan = Arc::new(
+        SortExec::new(
+            LexOrdering::new(vec![PhysicalSortExpr::new(
+                col("b", &schema()).unwrap(),
+                SortOptions::new(true, false),
+            )])
+            .unwrap(),
+            coalesce_batches,
+        )
+        .with_fetch(Some(1)),
+    ) as Arc<dyn ExecutionPlan>;
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false]
+        -   CoalesceBatchesExec: target_batch_size=1024
+        -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false]
+          -   CoalesceBatchesExec: target_batch_size=1024
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
+    "
+    );
+}
+
 #[tokio::test]
 async fn test_hashjoin_dynamic_filter_pushdown() {
     use datafusion_common::JoinType;
@@ -1478,7 +1610,7 @@ async fn test_topk_dynamic_filter_pushdown_integration() {
     ctx.sql(
         r"
         COPY (
-            SELECT 1372708800 + value AS t
+            SELECT 1372708800 + value AS t
             FROM generate_series(0, 99999)
             ORDER BY t
         ) TO 'memory:///1.parquet'

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 1 addition & 1 deletion
@@ -111,7 +111,7 @@ pub struct TestSource {
 }
 
 impl TestSource {
-    fn new(support: bool, batches: Vec<RecordBatch>) -> Self {
+    pub fn new(support: bool, batches: Vec<RecordBatch>) -> Self {
         Self {
             support,
             metrics: ExecutionPlanMetricsSet::new(),

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 12 additions & 0 deletions
@@ -28,11 +28,14 @@ use super::{
     Statistics,
 };
 use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
+use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
 use crate::projection::{make_with_child, ProjectionExec};
 use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
 
+use datafusion_common::config::ConfigOptions;
 use datafusion_common::{internal_err, Result};
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalExpr;
 
 /// Merge execution plan executes partitions in parallel and combines them into a single
 /// partition. No guarantees are made about the order of the resulting partition.
@@ -270,6 +273,15 @@ impl ExecutionPlan for CoalescePartitionsExec {
             cache: self.cache.clone(),
         }))
     }
+
+    fn gather_filters_for_pushdown(
+        &self,
+        _phase: FilterPushdownPhase,
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+        _config: &ConfigOptions,
+    ) -> Result<FilterDescription> {
+        FilterDescription::from_children(parent_filters, &self.children())
+    }
 }
 
 #[cfg(test)]
