Skip to content

Commit 68247fc

Browse files
authored
fix: count_state use stat to eval&predicate w/out region (#7116)
* fix: count_state use stat to eval
  Signed-off-by: discord9 <[email protected]>
* cleanup
  Signed-off-by: discord9 <[email protected]>
* fix: use predicate without region
  Signed-off-by: discord9 <[email protected]>
* test: diverge standalone/dist impl
  Signed-off-by: discord9 <[email protected]>
---------
Signed-off-by: discord9 <[email protected]>
1 parent e386a36 commit 68247fc

File tree

13 files changed

+1221
-177
lines changed

13 files changed

+1221
-177
lines changed

src/common/function/src/aggrs/aggr_wrapper.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ use arrow::array::StructArray;
2929
use arrow_schema::{FieldRef, Fields};
3030
use common_telemetry::debug;
3131
use datafusion::functions_aggregate::all_default_aggregate_functions;
32+
use datafusion::functions_aggregate::count::Count;
33+
use datafusion::functions_aggregate::min_max::{Max, Min};
3234
use datafusion::optimizer::AnalyzerRule;
3335
use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
3436
use datafusion::physical_planner::create_aggregate_expr_and_maybe_filter;
@@ -413,6 +415,51 @@ impl AggregateUDFImpl for StateWrapper {
413415
fn coerce_types(&self, arg_types: &[DataType]) -> datafusion_common::Result<Vec<DataType>> {
414416
self.inner.coerce_types(arg_types)
415417
}
418+
419+
fn value_from_stats(
420+
&self,
421+
statistics_args: &datafusion_expr::StatisticsArgs,
422+
) -> Option<ScalarValue> {
423+
let inner = self.inner().inner().as_any();
424+
// Only count/min/max need special handling here, so that their result can be obtained from statistics.
425+
// The result of count/min/max is also the result of the corresponding state function (e.g. count_state), so it can be returned directly.
426+
let can_use_stat = inner.is::<Count>() || inner.is::<Max>() || inner.is::<Min>();
427+
if !can_use_stat {
428+
return None;
429+
}
430+
431+
// Fix the return type by extracting the first field's data type from the struct type.
432+
let state_type = if let DataType::Struct(fields) = &statistics_args.return_type {
433+
if fields.is_empty() {
434+
return None;
435+
}
436+
fields[0].data_type().clone()
437+
} else {
438+
return None;
439+
};
440+
441+
let fixed_args = datafusion_expr::StatisticsArgs {
442+
statistics: statistics_args.statistics,
443+
return_type: &state_type,
444+
is_distinct: statistics_args.is_distinct,
445+
exprs: statistics_args.exprs,
446+
};
447+
448+
let ret = self.inner().value_from_stats(&fixed_args)?;
449+
450+
// Wrap the result into a struct scalar value.
451+
let fields = if let DataType::Struct(fields) = &statistics_args.return_type {
452+
fields
453+
} else {
454+
return None;
455+
};
456+
457+
let array = ret.to_array().ok()?;
458+
459+
let struct_array = StructArray::new(fields.clone(), vec![array], None);
460+
let ret = ScalarValue::Struct(Arc::new(struct_array));
461+
Some(ret)
462+
}
416463
}
417464

418465
/// The wrapper's input is the same as the original aggregate function's input,

src/mito2/src/read/scan_region.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1106,9 +1106,8 @@ impl ScanInput {
11061106
rows
11071107
}
11081108

1109-
/// Returns table predicate of all exprs.
1110-
pub(crate) fn predicate(&self) -> Option<&Predicate> {
1111-
self.predicate.predicate()
1109+
pub(crate) fn predicate_group(&self) -> &PredicateGroup {
1110+
&self.predicate
11121111
}
11131112

11141113
/// Returns number of memtables to scan.

src/mito2/src/read/seq_scan.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -632,8 +632,12 @@ impl RegionScanner for SeqScan {
632632
Ok(())
633633
}
634634

635-
fn has_predicate(&self) -> bool {
636-
let predicate = self.stream_ctx.input.predicate();
635+
fn has_predicate_without_region(&self) -> bool {
636+
let predicate = self
637+
.stream_ctx
638+
.input
639+
.predicate_group()
640+
.predicate_without_region();
637641
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
638642
}
639643

src/mito2/src/read/series_scan.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,12 @@ impl RegionScanner for SeriesScan {
314314
Ok(())
315315
}
316316

317-
fn has_predicate(&self) -> bool {
318-
let predicate = self.stream_ctx.input.predicate();
317+
fn has_predicate_without_region(&self) -> bool {
318+
let predicate = self
319+
.stream_ctx
320+
.input
321+
.predicate_group()
322+
.predicate_without_region();
319323
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
320324
}
321325

src/mito2/src/read/unordered_scan.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -427,8 +427,14 @@ impl RegionScanner for UnorderedScan {
427427
.map_err(BoxedError::new)
428428
}
429429

430-
fn has_predicate(&self) -> bool {
431-
let predicate = self.stream_ctx.input.predicate();
430+
/// Returns whether this scanner has any predicate other than region partition exprs.
431+
fn has_predicate_without_region(&self) -> bool {
432+
let predicate = self
433+
.stream_ctx
434+
.input
435+
.predicate_group()
436+
.predicate_without_region();
437+
432438
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
433439
}
434440

src/store-api/src/region_engine.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -444,8 +444,8 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
444444
partition: usize,
445445
) -> Result<SendableRecordBatchStream, BoxedError>;
446446

447-
/// Check if there is any predicate that may be executed in this scanner.
448-
fn has_predicate(&self) -> bool;
447+
/// Checks whether there is any predicate, excluding region partition exprs, that may be executed in this scanner.
448+
fn has_predicate_without_region(&self) -> bool;
449449

450450
/// Sets whether the scanner is reading a logical region.
451451
fn set_logical_region(&mut self, logical_region: bool);
@@ -857,7 +857,7 @@ impl RegionScanner for SinglePartitionScanner {
857857
Ok(result.unwrap())
858858
}
859859

860-
fn has_predicate(&self) -> bool {
860+
fn has_predicate_without_region(&self) -> bool {
861861
false
862862
}
863863

src/table/src/table/scan.rs

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -335,25 +335,26 @@ impl ExecutionPlan for RegionScanExec {
335335
return Ok(Statistics::new_unknown(self.schema().as_ref()));
336336
}
337337

338-
let statistics = if self.append_mode && !self.scanner.lock().unwrap().has_predicate() {
339-
let column_statistics = self
340-
.arrow_schema
341-
.fields
342-
.iter()
343-
.map(|_| ColumnStatistics {
344-
distinct_count: Precision::Exact(self.total_rows),
345-
null_count: Precision::Exact(0), // all null rows are counted for append-only table
346-
..Default::default()
347-
})
348-
.collect();
349-
Statistics {
350-
num_rows: Precision::Exact(self.total_rows),
351-
total_byte_size: Default::default(),
352-
column_statistics,
353-
}
354-
} else {
355-
Statistics::new_unknown(&self.arrow_schema)
356-
};
338+
let statistics =
339+
if self.append_mode && !self.scanner.lock().unwrap().has_predicate_without_region() {
340+
let column_statistics = self
341+
.arrow_schema
342+
.fields
343+
.iter()
344+
.map(|_| ColumnStatistics {
345+
distinct_count: Precision::Exact(self.total_rows),
346+
null_count: Precision::Exact(0), // all null rows are counted for append-only table
347+
..Default::default()
348+
})
349+
.collect();
350+
Statistics {
351+
num_rows: Precision::Exact(self.total_rows),
352+
total_byte_size: Default::default(),
353+
column_statistics,
354+
}
355+
} else {
356+
Statistics::new_unknown(&self.arrow_schema)
357+
};
357358
Ok(statistics)
358359
}
359360

0 commit comments

Comments
 (0)