Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,11 @@ required-features = ["test_utils"]
harness = false
name = "aggregate_vectorized"
required-features = ["test_utils"]

# Criterion benchmark comparing single-column group-by implementations;
# harness = false lets criterion provide its own main().
[[bench]]
name = "single_column_aggr"
harness = false

# NOTE(review): Cargo only honors [profile.*] sections in the workspace root
# manifest; a profile defined in a member crate is ignored (with a warning).
# Confirm this belongs in the root Cargo.toml instead.
[profile.profiling]
inherits = "release"
debug = true
252 changes: 252 additions & 0 deletions datafusion/physical-plan/benches/single_column_aggr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
use arrow::array::{ArrayRef, StringDictionaryBuilder};
use arrow::datatypes::{DataType, Field, Schema, UInt8Type};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion_expr::EmitTo;
use datafusion_physical_plan::aggregates::group_values::row::GroupValuesRows;
use datafusion_physical_plan::aggregates::group_values::single_group_by::dictionary::GroupValuesDictionary;
use datafusion_physical_plan::aggregates::group_values::{GroupValues, new_group_values};
use datafusion_physical_plan::aggregates::order::GroupOrdering;
use std::sync::Arc;
// Number of distinct group-key values produced by `create_string_values`.
#[derive(Debug)]
enum Cardinality {
    Xsmall, // 3 (see `create_string_values`; previous comment said 1)
    Small,  // 10
    Medium, // 50
    Large,  // 200
}
// Number of rows in each generated input batch (see `create_batch`).
#[derive(Debug)]
enum BatchSize {
    Small,  // 8192 rows
    Medium, // 32768 rows
    Large,  // 65536 rows
}
// Fraction of values replaced with nulls (see `introduce_nulls`).
#[derive(Debug)]
enum NullRate {
    Zero,   // 0%
    Low,    // 1%
    Medium, // 5%
    High,   // 20%
}
// Which `GroupValues` implementation a benchmark variant exercises.
#[derive(Debug, Clone)]
enum GroupType {
    // Specialized single-column dictionary implementation.
    Dictionary,
    // Generic row-format implementation used as the fallback path.
    GroupValueRows,
}
/// Returns the pool of distinct group-key strings for the given cardinality.
///
/// Note that `Cardinality::Xsmall` yields 3 distinct values.
fn create_string_values(cardinality: &Cardinality) -> Vec<String> {
    let num_values = match cardinality {
        Cardinality::Xsmall => 3,
        Cardinality::Small => 10,
        Cardinality::Medium => 50,
        Cardinality::Large => 200,
    };
    let mut values = Vec::with_capacity(num_values);
    for i in 0..num_values {
        values.push(format!("group_value_{:06}", i));
    }
    values
}
/// Builds a batch of `batch_size` strings by cycling through the distinct
/// values for `cardinality`, so every group key appears roughly equally often.
fn create_batch(batch_size: &BatchSize, cardinality: &Cardinality) -> Vec<String> {
    let size = match batch_size {
        BatchSize::Small => 8192,
        BatchSize::Medium => 32768,
        BatchSize::Large => 65536,
    };
    let unique_strings = create_string_values(cardinality);
    if unique_strings.is_empty() {
        return Vec::new();
    }

    let mut batch = Vec::with_capacity(size);
    let mut next = 0;
    while batch.len() < size {
        batch.push(unique_strings[next].clone());
        next = (next + 1) % unique_strings.len();
    }
    batch
}
/// Converts optional strings into a dictionary-encoded (`UInt8` keys) string
/// array, preserving nulls. The cardinalities used here stay well under the
/// 256 distinct values a `UInt8` key can address.
fn strings_to_dict_array(values: Vec<Option<String>>) -> ArrayRef {
    let mut builder = StringDictionaryBuilder::<UInt8Type>::new();
    for value in values {
        if let Some(s) = value {
            builder.append_value(s);
        } else {
            builder.append_null();
        }
    }
    Arc::new(builder.finish())
}
/// Randomly replaces values with `None` at the rate implied by `null_rate`.
///
/// NOTE(review): uses an unseeded RNG, so the null pattern differs between
/// runs — consider a seeded RNG for reproducible benchmark comparisons.
fn introduce_nulls(values: Vec<String>, null_rate: &NullRate) -> Vec<Option<String>> {
    let rate = match null_rate {
        NullRate::Zero => 0.0,
        NullRate::Low => 0.01,
        NullRate::Medium => 0.05,
        NullRate::High => 0.20,
    };
    let mut out = Vec::with_capacity(values.len());
    for value in values {
        let keep = rand::random::<f64>() >= rate;
        out.push(if keep { Some(value) } else { None });
    }
    out
}

fn generate_group_values(kind: GroupType) -> Box<dyn GroupValues> {
match kind {
GroupType::GroupValueRows => {
// we know this is going to hit the fallback path I.E GroupValueRows, but for the sake of avoiding making private items public call the public api
let schema = Arc::new(Schema::new(vec![Field::new(
"group_col",
DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
false,
)]));
new_group_values(schema, &GroupOrdering::None).unwrap()
}
GroupType::Dictionary => {
// call custom path directly
Box::new(GroupValuesDictionary::<UInt8Type>::new(&DataType::Utf8))
}
}
}

/// Benchmarks `GroupValues::intern` on a single dictionary-encoded column
/// across a grid of cardinality, batch size, and null rate, comparing the
/// generic row-format implementation against the specialized dictionary path.
fn bench_single_column_group_values(c: &mut Criterion) {
    let group_types = [GroupType::GroupValueRows, GroupType::Dictionary];
    let cardinalities = [
        Cardinality::Xsmall,
        Cardinality::Small,
        Cardinality::Medium,
        Cardinality::Large,
    ];
    let batch_sizes = [BatchSize::Small, BatchSize::Medium, BatchSize::Large];
    let null_rates = [
        NullRate::Zero,
        NullRate::Low,
        NullRate::Medium,
        NullRate::High,
    ];

    for cardinality in &cardinalities {
        for batch_size in &batch_sizes {
            for null_rate in &null_rates {
                // Build the input column ONCE per (cardinality, batch size,
                // null rate) combination so every group-by implementation is
                // measured against identical data. The null pattern is
                // random, so regenerating it per implementation (as before)
                // made the comparison unfair.
                let string_vec = create_batch(batch_size, cardinality);
                let nullable_values = introduce_nulls(string_vec, null_rate);
                let col_ref = strings_to_dict_array(nullable_values);

                for group_type in &group_types {
                    let group_name = format!(
                        "t1_{:?}_cardinality_{:?}_batch_{:?}_null_rate_{:?}",
                        group_type, cardinality, batch_size, null_rate
                    );
                    c.bench_function(&group_name, |b| {
                        b.iter_batched(
                            || {
                                // Fresh group values for each iteration; the
                                // column clone is a cheap Arc refcount bump.
                                let gv = generate_group_values(group_type.clone());
                                (gv, col_ref.clone())
                            },
                            |(mut group_values, col)| {
                                let mut groups = Vec::new();
                                group_values.intern(&[col], &mut groups).unwrap();
                            },
                            criterion::BatchSize::SmallInput,
                        );
                    });

                    // Second benchmark that alternates between intern and
                    // emit to simulate the realistic flow where one
                    // `GroupValues` instance processes several record batches
                    // of the same grouping column before emitting.
                    let multi_batch_name = format!(
                        "multi_batch/{:?}_cardinality_{:?}_batch_{:?}_null_rate_{:?}",
                        group_type, cardinality, batch_size, null_rate
                    );
                    c.bench_function(&multi_batch_name, |b| {
                        b.iter_batched(
                            || {
                                // Three clones of the same column stand in
                                // for three incoming record batches.
                                let gv = generate_group_values(group_type.clone());
                                (gv, col_ref.clone(), col_ref.clone(), col_ref.clone())
                            },
                            |(mut group_values, batch1, batch2, batch3)| {
                                // Multiple intern calls (one per record
                                // batch) followed by a single emit, matching
                                // the real aggregation flow.
                                let mut groups = Vec::new();

                                group_values.intern(&[batch1], &mut groups).unwrap();
                                groups.clear();
                                group_values.intern(&[batch2], &mut groups).unwrap();
                                groups.clear();
                                group_values.intern(&[batch3], &mut groups).unwrap();

                                group_values.emit(EmitTo::All).unwrap();
                            },
                            criterion::BatchSize::SmallInput,
                        );
                    });
                }
            }
        }
    }
}

/// Benchmarks four back-to-back `intern` calls against a column built once
/// up front, isolating intern cost from input construction.
fn bench_repeated_intern_prefab_cols(c: &mut Criterion) {
    let cardinality = Cardinality::Small;
    let batch_size = BatchSize::Large;
    let null_rate = NullRate::Low;
    let group_types = [GroupType::GroupValueRows, GroupType::Dictionary];

    for group_type in &group_types {
        let group_type = group_type.clone();
        // Build the column once outside the benchmark loop; it is shared by
        // reference across all intern calls (previously four redundant Arc
        // clones were taken up front).
        let string_vec = create_batch(&batch_size, &cardinality);
        let nullable_values = introduce_nulls(string_vec, &null_rate);
        let col_ref = strings_to_dict_array(nullable_values);

        let group_name = format!(
            "repeated_intern/{:?}_cardinality_{:?}_batch_{:?}_null_rate_{:?}",
            group_type, cardinality, batch_size, null_rate
        );
        c.bench_function(&group_name, |b| {
            b.iter_batched(
                || generate_group_values(group_type.clone()),
                |mut group_values| {
                    let mut groups = Vec::new();
                    // Four intern calls over the same prebuilt column,
                    // clearing the scratch vector between calls.
                    for _ in 0..4 {
                        group_values
                            .intern(std::slice::from_ref(&col_ref), &mut groups)
                            .unwrap();
                        groups.clear();
                    }
                },
                criterion::BatchSize::SmallInput,
            );
        });
    }
}

// Register both benchmark groups and let criterion supply main()
// (harness = false in Cargo.toml).
criterion_group!(
    benches,
    bench_single_column_group_values,
    bench_repeated_intern_prefab_cols
);
criterion_main!(benches);
Binary file added datafusion/physical-plan/profile.json.gz
Binary file not shown.
78 changes: 75 additions & 3 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use datafusion_expr::EmitTo;

pub mod multi_group_by;

mod row;
mod single_group_by;
pub mod row;
pub mod single_group_by;
use datafusion_physical_expr::binary_map::OutputType;
use multi_group_by::GroupValuesColumn;
use row::GroupValuesRows;
Expand All @@ -41,7 +41,8 @@ pub(crate) use single_group_by::primitive::HashValue;
use crate::aggregates::{
group_values::single_group_by::{
boolean::GroupValuesBoolean, bytes::GroupValuesBytes,
bytes_view::GroupValuesBytesView, primitive::GroupValuesPrimitive,
bytes_view::GroupValuesBytesView, dictionary::GroupValuesDictionary,
primitive::GroupValuesPrimitive,
},
order::GroupOrdering,
};
Expand Down Expand Up @@ -196,6 +197,56 @@ pub fn new_group_values(
DataType::Boolean => {
return Ok(Box::new(GroupValuesBoolean::new()));
}
DataType::Dictionary(key_type, value_type) => {
if supported_single_dictionary_value(value_type) {
return match key_type.as_ref() {
// TODO: turn this into a macro
DataType::Int8 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::Int8Type,
>::new(value_type)))
}
DataType::Int16 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::Int16Type,
>::new(value_type)))
}
DataType::Int32 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::Int32Type,
>::new(value_type)))
}
DataType::Int64 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::Int64Type,
>::new(value_type)))
}
DataType::UInt8 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::UInt8Type,
>::new(value_type)))
}
DataType::UInt16 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::UInt16Type,
>::new(value_type)))
}
DataType::UInt32 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::UInt32Type,
>::new(value_type)))
}
DataType::UInt64 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::UInt64Type,
>::new(value_type)))
}
_ => Err(datafusion_common::DataFusionError::NotImplemented(
format!("Unsupported dictionary key type: {key_type:?}",),
)),
};
}
}
_ => {}
}
}
Expand All @@ -207,6 +258,27 @@ pub fn new_group_values(
Ok(Box::new(GroupValuesColumn::<true>::try_new(schema)?))
}
} else {
// TODO: add specialized implementation for dictionary encoding columns for 2+ group by columns case
Ok(Box::new(GroupValuesRows::try_new(schema)?))
}
}

/// Returns true if `t` is a dictionary *value* type handled by the
/// specialized single-column dictionary `GroupValues` implementation.
fn supported_single_dictionary_value(t: &DataType) -> bool {
    match t {
        // String and binary value types.
        DataType::Utf8
        | DataType::LargeUtf8
        | DataType::Utf8View
        | DataType::Binary
        | DataType::LargeBinary
        | DataType::BinaryView => true,
        // Integer value types.
        DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64 => true,
        _ => false,
    }
}
Loading
Loading