Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/common/function/src/aggrs/aggr_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use arrow::array::StructArray;
use arrow_schema::{FieldRef, Fields};
use common_telemetry::debug;
use datafusion::functions_aggregate::all_default_aggregate_functions;
use datafusion::functions_aggregate::count::Count;
use datafusion::functions_aggregate::min_max::{Max, Min};
use datafusion::optimizer::AnalyzerRule;
use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
use datafusion::physical_planner::create_aggregate_expr_and_maybe_filter;
Expand Down Expand Up @@ -412,6 +414,51 @@ impl AggregateUDFImpl for StateWrapper {
fn coerce_types(&self, arg_types: &[DataType]) -> datafusion_common::Result<Vec<DataType>> {
self.inner.coerce_types(arg_types)
}

fn value_from_stats(
&self,
statistics_args: &datafusion_expr::StatisticsArgs,
) -> Option<ScalarValue> {
let inner = self.inner().inner().as_any();
// only count/min/max need special handling here, for getting result from statistics
// the result of count/min/max is also the result of count_state so can return directly
let can_use_stat = inner.is::<Count>() || inner.is::<Max>() || inner.is::<Min>();
if !can_use_stat {
return None;
}

// fix return type by extract the first field's data type from the struct type
let state_type = if let DataType::Struct(fields) = &statistics_args.return_type {
if fields.is_empty() {
return None;
}
fields[0].data_type().clone()
} else {
return None;
};

let fixed_args = datafusion_expr::StatisticsArgs {
statistics: statistics_args.statistics,
return_type: &state_type,
is_distinct: statistics_args.is_distinct,
exprs: statistics_args.exprs,
};

let ret = self.inner().value_from_stats(&fixed_args)?;

// wrap the result into struct scalar value
let fields = if let DataType::Struct(fields) = &statistics_args.return_type {
fields
} else {
return None;
};

let array = ret.to_array().ok()?;

let struct_array = StructArray::new(fields.clone(), vec![array], None);
let ret = ScalarValue::Struct(Arc::new(struct_array));
Some(ret)
}
}

/// The wrapper's input is the same as the original aggregate function's input,
Expand Down
5 changes: 2 additions & 3 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1106,9 +1106,8 @@ impl ScanInput {
rows
}

/// Returns table predicate of all exprs.
pub(crate) fn predicate(&self) -> Option<&Predicate> {
self.predicate.predicate()
pub(crate) fn predicate_group(&self) -> &PredicateGroup {
&self.predicate
}

/// Returns number of memtables to scan.
Expand Down
8 changes: 6 additions & 2 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,8 +632,12 @@ impl RegionScanner for SeqScan {
Ok(())
}

fn has_predicate(&self) -> bool {
let predicate = self.stream_ctx.input.predicate();
fn has_predicate_without_region(&self) -> bool {
let predicate = self
.stream_ctx
.input
.predicate_group()
.predicate_without_region();
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
}

Expand Down
8 changes: 6 additions & 2 deletions src/mito2/src/read/series_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,12 @@ impl RegionScanner for SeriesScan {
Ok(())
}

fn has_predicate(&self) -> bool {
let predicate = self.stream_ctx.input.predicate();
fn has_predicate_without_region(&self) -> bool {
let predicate = self
.stream_ctx
.input
.predicate_group()
.predicate_without_region();
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
}

Expand Down
10 changes: 8 additions & 2 deletions src/mito2/src/read/unordered_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,14 @@ impl RegionScanner for UnorderedScan {
.map_err(BoxedError::new)
}

fn has_predicate(&self) -> bool {
let predicate = self.stream_ctx.input.predicate();
/// If this scanner have predicate other than region partition exprs
fn has_predicate_without_region(&self) -> bool {
let predicate = self
.stream_ctx
.input
.predicate_group()
.predicate_without_region();

predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
}

Expand Down
6 changes: 3 additions & 3 deletions src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,8 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError>;

/// Check if there is any predicate that may be executed in this scanner.
fn has_predicate(&self) -> bool;
/// Check if there is any predicate exclude region partition exprs that may be executed in this scanner.
fn has_predicate_without_region(&self) -> bool;

/// Sets whether the scanner is reading a logical region.
fn set_logical_region(&mut self, logical_region: bool);
Expand Down Expand Up @@ -857,7 +857,7 @@ impl RegionScanner for SinglePartitionScanner {
Ok(result.unwrap())
}

fn has_predicate(&self) -> bool {
fn has_predicate_without_region(&self) -> bool {
false
}

Expand Down
39 changes: 20 additions & 19 deletions src/table/src/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,25 +335,26 @@ impl ExecutionPlan for RegionScanExec {
return Ok(Statistics::new_unknown(self.schema().as_ref()));
}

let statistics = if self.append_mode && !self.scanner.lock().unwrap().has_predicate() {
let column_statistics = self
.arrow_schema
.fields
.iter()
.map(|_| ColumnStatistics {
distinct_count: Precision::Exact(self.total_rows),
null_count: Precision::Exact(0), // all null rows are counted for append-only table
..Default::default()
})
.collect();
Statistics {
num_rows: Precision::Exact(self.total_rows),
total_byte_size: Default::default(),
column_statistics,
}
} else {
Statistics::new_unknown(&self.arrow_schema)
};
let statistics =
if self.append_mode && !self.scanner.lock().unwrap().has_predicate_without_region() {
let column_statistics = self
.arrow_schema
.fields
.iter()
.map(|_| ColumnStatistics {
distinct_count: Precision::Exact(self.total_rows),
null_count: Precision::Exact(0), // all null rows are counted for append-only table
..Default::default()
})
.collect();
Statistics {
num_rows: Precision::Exact(self.total_rows),
total_byte_size: Default::default(),
column_statistics,
}
} else {
Statistics::new_unknown(&self.arrow_schema)
};
Ok(statistics)
}

Expand Down
Loading