diff --git a/datafusion/physical-plan/benches/aggregate_vectorized.rs b/datafusion/physical-plan/benches/aggregate_vectorized.rs index 66e7a28a28b42..703ad6e484b74 100644 --- a/datafusion/physical-plan/benches/aggregate_vectorized.rs +++ b/datafusion/physical-plan/benches/aggregate_vectorized.rs @@ -16,6 +16,7 @@ // under the License. use arrow::array::ArrayRef; +use arrow::buffer::BooleanBuffer; use arrow::datatypes::{Int32Type, StringViewType}; use arrow::util::bench_util::{ create_primitive_array, create_string_view_array_with_len, @@ -29,7 +30,7 @@ use criterion::{ }; use datafusion_physical_plan::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder; use datafusion_physical_plan::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder; -use datafusion_physical_plan::aggregates::group_values::multi_group_by::GroupColumn; +use datafusion_physical_plan::aggregates::group_values::multi_group_by::{FixedBitPackedMutableBuffer, GroupColumn}; use rand::distr::{Bernoulli, Distribution}; use std::hint::black_box; use std::sync::Arc; @@ -118,7 +119,7 @@ fn bytes_bench( rows, input, "all_true", - vec![true; size], + &BooleanBuffer::new_set(size), ); vectorized_equal_to( group, @@ -127,10 +128,10 @@ fn bytes_bench( rows, input, "0.75 true", - { + &{ let mut rng = seedable_rng(); let d = Bernoulli::new(0.75).unwrap(); - (0..size).map(|_| d.sample(&mut rng)).collect::>() + BooleanBuffer::from_iter((0..size).map(|_| d.sample(&mut rng))) }, ); vectorized_equal_to( @@ -140,10 +141,10 @@ fn bytes_bench( rows, input, "0.5 true", - { + &{ let mut rng = seedable_rng(); let d = Bernoulli::new(0.5).unwrap(); - (0..size).map(|_| d.sample(&mut rng)).collect::>() + BooleanBuffer::from_iter((0..size).map(|_| d.sample(&mut rng))) }, ); vectorized_equal_to( @@ -153,10 +154,10 @@ fn bytes_bench( rows, input, "0.25 true", - { + &{ let mut rng = seedable_rng(); let d = Bernoulli::new(0.25).unwrap(); - (0..size).map(|_| d.sample(&mut rng)).collect::>() + BooleanBuffer::from_iter((0..size).map(|_| d.sample(&mut rng))) }, ); // Not adding 0 true case here as if we optimize for 0 true cases the caller should avoid calling this method at all @@ -226,7 +227,7 @@ fn bench_single_primitive( rows, &input, "all_true", - vec![true; size], + &BooleanBuffer::new_set(size), ); vectorized_equal_to( group, @@ -235,10 +236,10 @@ fn bench_single_primitive( rows, &input, "0.75 true", - { + &{ let mut rng = seedable_rng(); let d = Bernoulli::new(0.75).unwrap(); - (0..size).map(|_| d.sample(&mut rng)).collect::>() + BooleanBuffer::from_iter((0..size).map(|_| d.sample(&mut rng))) }, ); vectorized_equal_to( @@ -248,10 +249,10 @@ fn bench_single_primitive( rows, &input, "0.5 true", - { + &{ let mut rng = seedable_rng(); let d = Bernoulli::new(0.5).unwrap(); - (0..size).map(|_| d.sample(&mut rng)).collect::>() + BooleanBuffer::from_iter((0..size).map(|_| d.sample(&mut rng))) }, ); vectorized_equal_to( @@ -261,10 +262,10 @@ fn bench_single_primitive( rows, &input, "0.25 true", - { + &{ let mut rng = seedable_rng(); let d = Bernoulli::new(0.25).unwrap(); - (0..size).map(|_| d.sample(&mut rng)).collect::>() + BooleanBuffer::from_iter((0..size).map(|_| d.sample(&mut rng))) }, ); // Not adding 0 true case here as if we optimize for 0 true cases the caller should avoid calling this method at all @@ -279,7 +280,7 @@ fn vectorized_equal_to( rows: &[usize], input: &ArrayRef, equal_to_result_description: &str, - equal_to_results: Vec, + equal_to_results: &BooleanBuffer, ) { let id = BenchmarkId::new( function_name, @@ -291,7 +292,7 @@ fn vectorized_equal_to( b.iter(|| { // Cloning is a must as `vectorized_equal_to` will modify the input vec // and without cloning all benchmarks after the first one won't be meaningful - let mut equal_to_results = equal_to_results.clone(); + let mut equal_to_results = FixedBitPackedMutableBuffer::from(equal_to_results); builder.vectorized_equal_to(rows, input, rows, &mut equal_to_results); // Make sure that the compiler does not optimize away the call diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs index 03e26446f5751..a21f7c7d831f6 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use crate::aggregates::group_values::multi_group_by::Nulls; +use crate::aggregates::group_values::multi_group_by::{FixedBitPackedMutableBuffer, Nulls}; use crate::aggregates::group_values::multi_group_by::{nulls_equal_to, GroupColumn}; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow::array::{Array as _, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder}; @@ -81,19 +81,18 @@ impl GroupColumn for BooleanGroupValueBuilder { lhs_rows: &[usize], array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut FixedBitPackedMutableBuffer, ) { let array = array.as_boolean(); let iter = izip!( lhs_rows.iter(), rhs_rows.iter(), - equal_to_results.iter_mut(), ); - for (&lhs_row, &rhs_row, equal_to_result) in iter { + for (index, (&lhs_row, &rhs_row)) in iter.enumerate() { // Has found not equal to in previous column, don't need to check - if !*equal_to_result { + if !equal_to_results.0.get_bit(index) { continue; } @@ -101,12 +100,12 @@ impl GroupColumn for BooleanGroupValueBuilder { let exist_null = self.nulls.is_null(lhs_row); let input_null = array.is_null(rhs_row); if let Some(result) = nulls_equal_to(exist_null, input_null) { - *equal_to_result = result; + equal_to_results.0.set_bit(index, result); continue; } } - *equal_to_result = self.buffer.get_bit(lhs_row) == array.value(rhs_row); + equal_to_results.0.set_bit(index, self.buffer.get_bit(lhs_row) == array.value(rhs_row)); } } @@ -213,10 +212,10 @@ mod tests { lhs_rows: &[usize], input_array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut Vec| { + equal_to_results: &mut FixedBitPackedMutableBuffer| { let iter = lhs_rows.iter().zip(rhs_rows.iter()); for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { - equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); + equal_to_results.set_bit(idx, builder.equal_to(lhs_row, input_array, rhs_row)); } }; @@ -237,7 +236,7 @@ mod tests { lhs_rows: &[usize], input_array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut Vec| { + equal_to_results: &mut FixedBitPackedMutableBuffer| { builder.vectorized_equal_to( lhs_rows, input_array, @@ -257,7 +256,7 @@ mod tests { &[usize], &ArrayRef, &[usize], - &mut Vec, + &mut FixedBitPackedMutableBuffer, ), { // Will cover such cases: @@ -302,7 +301,7 @@ mod tests { let input_array = Arc::new(BooleanArray::new(values, nulls.finish())) as ArrayRef; // Check - let mut equal_to_results = vec![true; builder.len()]; + let mut equal_to_results = FixedBitPackedMutableBuffer::new_set(builder.len()); equal_to( &builder, &[0, 1, 2, 3, 4, 5], @@ -310,6 +309,7 @@ mod tests { &[0, 1, 2, 3, 4, 5], &mut equal_to_results, ); + let equal_to_results: Vec = equal_to_results.into(); assert!(!equal_to_results[0]); assert!(equal_to_results[1]); @@ -333,10 +333,10 @@ mod tests { lhs_rows: &[usize], input_array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut Vec| { + equal_to_results: &mut FixedBitPackedMutableBuffer| { let iter = lhs_rows.iter().zip(rhs_rows.iter()); for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { - equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); + equal_to_results.set_bit(idx, builder.equal_to(lhs_row, input_array, rhs_row)); } }; @@ -357,7 +357,7 @@ mod tests { lhs_rows: &[usize], input_array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut Vec| { + equal_to_results: &mut FixedBitPackedMutableBuffer| { builder.vectorized_equal_to( lhs_rows, input_array, @@ -377,7 +377,7 @@ mod tests { &[usize], &ArrayRef, &[usize], - &mut Vec, + &mut FixedBitPackedMutableBuffer, ), { // Will cover such cases: @@ -403,7 +403,7 @@ mod tests { ])) as ArrayRef; // Check - let mut equal_to_results = vec![true; builder.len()]; + let mut equal_to_results = FixedBitPackedMutableBuffer::new_set(builder.len()); equal_to( &builder, &[0, 1, 2, 3], @@ -411,6 +411,7 @@ mod tests { &[0, 1, 2, 3], &mut equal_to_results, ); + let equal_to_results: Vec = equal_to_results.into(); assert!(equal_to_results[0]); assert!(!equal_to_results[1]); @@ -432,13 +433,14 @@ mod tests { .vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_nulls_input_array.len()]; + let mut equal_to_results = FixedBitPackedMutableBuffer::new_set(all_nulls_input_array.len()); builder.vectorized_equal_to( &[0, 1, 2, 3, 4], &all_nulls_input_array, &[0, 1, 2, 3, 4], &mut equal_to_results, ); + let equal_to_results: Vec = equal_to_results.into(); assert!(equal_to_results[0]); assert!(equal_to_results[1]); @@ -458,13 +460,14 @@ mod tests { .vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_not_nulls_input_array.len()]; + let mut equal_to_results = FixedBitPackedMutableBuffer::new_set(all_not_nulls_input_array.len()); builder.vectorized_equal_to( &[5, 6, 7, 8, 9], &all_not_nulls_input_array, &[0, 1, 2, 3, 4], &mut equal_to_results, ); + let equal_to_results: Vec = equal_to_results.into(); assert!(equal_to_results[0]); assert!(equal_to_results[1]); diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs index d52721c2ee6c3..f4290bf2b84f9 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs @@ -15,9 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::aggregates::group_values::multi_group_by::{ - nulls_equal_to, GroupColumn, Nulls, -}; +use crate::aggregates::group_values::multi_group_by::{nulls_equal_to, FixedBitPackedMutableBuffer, GroupColumn, Nulls}; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow::array::{ types::GenericStringType, Array, ArrayRef, AsArray, BufferBuilder, @@ -32,6 +30,8 @@ use itertools::izip; use std::mem::size_of; use std::sync::Arc; use std::vec; +use arrow::util::bit_util::apply_bitwise_unary_op; +use crate::aggregates::group_values::multi_group_by::helper::CollectBool; /// An implementation of [`GroupColumn`] for binary and utf8 types. /// @@ -106,26 +106,52 @@ where lhs_rows: &[usize], array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut FixedBitPackedMutableBuffer, ) where B: ByteArrayType, { let array = array.as_bytes::(); - let iter = izip!( - lhs_rows.iter(), - rhs_rows.iter(), - equal_to_results.iter_mut(), - ); - for (&lhs_row, &rhs_row, equal_to_result) in iter { - // Has found not equal to, don't need to check - if !*equal_to_result { - continue; - } + assert_eq!(lhs_rows.len(), rhs_rows.len()); + assert_eq!(lhs_rows.len(), equal_to_results.len()); + + // TODO - skip to the first true bit in equal_to_results to avoid unnecessary work + // in iterating over unnecessary bits oe even get a slice of starting from first true bit to the last true bit + + // TODO - do not assume for byte aligned, added here just for POC + let mut index = 0; + let num_rows = lhs_rows.len(); + apply_bitwise_unary_op( + equal_to_results.0.as_slice_mut(), + 0, + lhs_rows.len(), + |eq| { + // If already false, skip 64 items + if eq == 0 { + index += 64; + return 0; + } - *equal_to_result = self.do_equal_to_inner(lhs_row, array, rhs_row); - } + let result = u64::collect_bool::< + // Using rest true as we don't wanna change bits beyond num_rows + true, + _ + >( + num_rows - index, + |bit_idx| { + if !eq.get_bit(bit_idx) { + return false; + } + let current_index = index + bit_idx; + self.do_equal_to_inner(lhs_rows[current_index], array, rhs_rows[current_index]) + } + ); + + index += 64; + eq & result + } + ); } fn vectorized_append_inner( @@ -275,7 +301,7 @@ where lhs_rows: &[usize], array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut FixedBitPackedMutableBuffer, ) { // Sanity array type match self.output_type { @@ -433,10 +459,11 @@ mod tests { use std::sync::Arc; use crate::aggregates::group_values::multi_group_by::bytes::ByteGroupValueBuilder; - use arrow::array::{ArrayRef, NullBufferBuilder, StringArray}; + use arrow::array::{ArrayRef, BooleanBufferBuilder, NullBufferBuilder, StringArray}; + use itertools::Itertools; use datafusion_common::DataFusionError; use datafusion_physical_expr::binary_map::OutputType; - + use crate::aggregates::group_values::multi_group_by::FixedBitPackedMutableBuffer; use super::GroupColumn; #[test] @@ -520,10 +547,10 @@ mod tests { lhs_rows: &[usize], input_array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut Vec| { + equal_to_results: &mut FixedBitPackedMutableBuffer| { let iter = lhs_rows.iter().zip(rhs_rows.iter()); for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { - equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); + equal_to_results.set_bit(idx, builder.equal_to(lhs_row, input_array, rhs_row)); } }; @@ -544,7 +571,7 @@ mod tests { lhs_rows: &[usize], input_array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut Vec| { + equal_to_results: &mut FixedBitPackedMutableBuffer| { builder.vectorized_equal_to( lhs_rows, input_array, @@ -575,13 +602,19 @@ mod tests { .vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_nulls_input_array.len()]; + + let mut equal_to_results = FixedBitPackedMutableBuffer({ + let mut b = BooleanBufferBuilder::new(all_nulls_input_array.len()); + b.append_n(all_nulls_input_array.len(), true); + b + }); builder.vectorized_equal_to( &[0, 1, 2, 3, 4], &all_nulls_input_array, &[0, 1, 2, 3, 4], &mut equal_to_results, ); + let equal_to_results: Vec = equal_to_results.into(); assert!(equal_to_results[0]); assert!(equal_to_results[1]); @@ -601,13 +634,14 @@ mod tests { .vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_not_nulls_input_array.len()]; + let mut equal_to_results = FixedBitPackedMutableBuffer::new_set(all_not_nulls_input_array.len()); builder.vectorized_equal_to( &[5, 6, 7, 8, 9], &all_not_nulls_input_array, &[0, 1, 2, 3, 4], &mut equal_to_results, ); + let equal_to_results: Vec = equal_to_results.into(); assert!(equal_to_results[0]); assert!(equal_to_results[1]); @@ -624,7 +658,7 @@ mod tests { &[usize], &ArrayRef, &[usize], - &mut Vec, + &mut FixedBitPackedMutableBuffer, ), { // Will cover such cases: @@ -670,7 +704,11 @@ mod tests { Arc::new(StringArray::new(offsets, buffer, nulls.finish())) as ArrayRef; // Check - let mut equal_to_results = vec![true; builder.len()]; + let mut equal_to_results = FixedBitPackedMutableBuffer({ + let mut b = BooleanBufferBuilder::new(builder.len()); + b.append_n(builder.len(), true); + b + }); equal_to( &builder, &[0, 1, 2, 3, 4, 5], @@ -679,6 +717,8 @@ mod tests { &mut equal_to_results, ); + let equal_to_results = equal_to_results.0.finish().iter().collect_vec(); + assert!(!equal_to_results[0]); assert!(equal_to_results[1]); assert!(equal_to_results[2]); diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs index fde477c2cf7b5..a23341bc6b6bd 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs @@ -15,9 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::aggregates::group_values::multi_group_by::{ - nulls_equal_to, GroupColumn, Nulls, -}; +use crate::aggregates::group_values::multi_group_by::{nulls_equal_to, FixedBitPackedMutableBuffer, GroupColumn, Nulls}; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow::array::{make_view, Array, ArrayRef, AsArray, ByteView, GenericByteViewArray}; use arrow::buffer::{Buffer, ScalarBuffer}; @@ -27,6 +25,8 @@ use itertools::izip; use std::marker::PhantomData; use std::mem::{replace, size_of}; use std::sync::Arc; +use arrow::util::bit_util::apply_bitwise_unary_op; +use crate::aggregates::group_values::multi_group_by::helper::CollectBool; const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024; @@ -122,24 +122,49 @@ impl ByteViewGroupValueBuilder { lhs_rows: &[usize], array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut FixedBitPackedMutableBuffer, ) { let array = array.as_byte_view::(); - let iter = izip!( - lhs_rows.iter(), - rhs_rows.iter(), - equal_to_results.iter_mut(), - ); + assert_eq!(lhs_rows.len(), rhs_rows.len()); + assert_eq!(lhs_rows.len(), equal_to_results.len()); + + // TODO - skip to the first true bit in equal_to_results to avoid unnecessary work + // in iterating over unnecessary bits oe even get a slice of starting from first true bit to the last true bit + + // TODO - do not assume for byte aligned, added here just for POC + let mut index = 0; + let num_rows = lhs_rows.len(); + apply_bitwise_unary_op( + equal_to_results.0.as_slice_mut(), + 0, + lhs_rows.len(), + |eq| { + // If already false, skip 64 items + if eq == 0 { + index += 64; + return 0; + } - for (&lhs_row, &rhs_row, equal_to_result) in iter { - // Has found not equal to, don't need to check - if !*equal_to_result { - continue; + let result = u64::collect_bool::< + // Using rest true as we don't wanna change bits beyond num_rows + true, + _ + >( + num_rows - index, + |bit_idx| { + if !eq.get_bit(bit_idx) { + return false; + } + let current_index = index + bit_idx; + self.do_equal_to_inner(lhs_rows[current_index], array, rhs_rows[current_index]) + } + ); + + index += 64; + eq & result } - - *equal_to_result = self.do_equal_to_inner(lhs_row, array, rhs_row); - } + ); } fn vectorized_append_inner(&mut self, array: &ArrayRef, rows: &[usize]) { @@ -505,7 +530,7 @@ impl GroupColumn for ByteViewGroupValueBuilder { group_indices: &[usize], array: &ArrayRef, rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut FixedBitPackedMutableBuffer, ) { self.vectorized_equal_to_inner(group_indices, array, rows, equal_to_results); } @@ -549,7 +574,7 @@ mod tests { use crate::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder; use arrow::array::{ArrayRef, AsArray, NullBufferBuilder, StringViewArray}; use arrow::datatypes::StringViewType; - + use crate::aggregates::group_values::multi_group_by::FixedBitPackedMutableBuffer; use super::GroupColumn; #[test] @@ -590,10 +615,13 @@ mod tests { lhs_rows: &[usize], input_array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut Vec| { + equal_to_results: &mut FixedBitPackedMutableBuffer| { let iter = lhs_rows.iter().zip(rhs_rows.iter()); for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { - equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); + equal_to_results.set_bit( + idx, + builder.equal_to(lhs_row, input_array, rhs_row), + ) } }; @@ -614,7 +642,7 @@ mod tests { lhs_rows: &[usize], input_array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut Vec| { + equal_to_results: &mut FixedBitPackedMutableBuffer| { builder.vectorized_equal_to( lhs_rows, input_array, @@ -646,13 +674,14 @@ mod tests { .vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_nulls_input_array.len()]; + let mut equal_to_results = FixedBitPackedMutableBuffer::new_set(all_nulls_input_array.len()); builder.vectorized_equal_to( &[0, 1, 2, 3, 4], &all_nulls_input_array, &[0, 1, 2, 3, 4], &mut equal_to_results, ); + let equal_to_results: Vec = equal_to_results.into(); assert!(equal_to_results[0]); assert!(equal_to_results[1]); @@ -672,13 +701,15 @@ mod tests { .vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_not_nulls_input_array.len()]; + let mut equal_to_results = FixedBitPackedMutableBuffer::new_set(all_not_nulls_input_array.len()); builder.vectorized_equal_to( &[5, 6, 7, 8, 9], &all_not_nulls_input_array, &[0, 1, 2, 3, 4], &mut equal_to_results, ); + + let equal_to_results: Vec = equal_to_results.into(); assert!(equal_to_results[0]); assert!(equal_to_results[1]); @@ -695,7 +726,7 @@ mod tests { &[usize], &ArrayRef, &[usize], - &mut Vec, + &mut FixedBitPackedMutableBuffer, ), { // Will cover such cases: @@ -774,7 +805,7 @@ mod tests { Arc::new(StringViewArray::new(views, buffer, nulls.finish())) as ArrayRef; // Check - let mut equal_to_results = vec![true; input_array.len()]; + let mut equal_to_results = FixedBitPackedMutableBuffer::new_set(input_array.len()); equal_to( &builder, &[0, 1, 2, 3, 4, 5, 6, 7, 7, 7, 8, 8], @@ -782,6 +813,8 @@ mod tests { &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], &mut equal_to_results, ); + + let equal_to_results: Vec = equal_to_results.into(); assert!(!equal_to_results[0]); assert!(equal_to_results[1]); diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/helper.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/helper.rs new file mode 100644 index 0000000000000..7164cce590bc4 --- /dev/null +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/helper.rs @@ -0,0 +1,422 @@ +use arrow::{array::ArrayRef, buffer::NullBuffer, util::bit_util::{self, apply_bitwise_unary_op}}; + +use crate::aggregates::group_values::{multi_group_by::FixedBitPackedMutableBuffer, null_builder::MaybeNullBufferBuilder}; + + + +pub trait CollectBool { + + fn collect_bool bool>(length: usize, f: F) -> Self; + fn collect_bool_full bool>(f: F) -> Self; + + /// Collect booleans for partial length, filling the rest with `rest` value + fn collect_bool_partial bool>(length: usize, f: F) -> Self; + + fn get_bit(self, index: usize) -> bool; +} + +impl CollectBool for u64 { + #[inline] + fn collect_bool bool>(length: usize, f: F) -> Self { + if length >= 64 { + Self::collect_bool_full(f) + } else { + Self::collect_bool_partial::(length, f) + } + } + + #[inline] + fn collect_bool_full bool>(mut f: F) -> Self { + let mut packed = 0; + for bit_idx in 0..64 { + packed |= (f(bit_idx) as u64) << bit_idx; + } + + packed + } + + #[inline] + fn collect_bool_partial bool>(length: usize, f: F) -> Self { + let mut packed = 0; + let mut f = f; + for bit_idx in 0..length { + packed |= (f(bit_idx) as u64) << bit_idx; + } + + if REST { + packed |= u64::MAX << length + } + + packed + } + + #[inline] + fn get_bit(self, index: usize) -> bool { + ((self >> index) & 1) != 0 + } +} + + +/// Return (bit packed equal nullability, bit packed for both non-nulls) +/// +/// the value in bits after length should not be used and is not gurrentee to any value +/// +/// so if comparing both: +/// (F - null, T - valid) +/// ```text +/// [F, F, T, T, F] +/// [F, T, T, F, F] +/// ``` +/// +/// it will return bit packed for this: +/// (F - unset, T - set) +/// ```text +/// [T, F, T, F, T] for equal nullability +/// [F, F, T, F, F] for both non nulls +/// ``` +pub(crate) fn compare_nulls_to_packed( + length: usize, + offset: usize, + lhs_rows: &[usize], + lhs_nulls: &MaybeNullBufferBuilder, + rhs_rows: &[usize], + rhs_nulls: Option<&NullBuffer>, +) -> (u64, u64) { + let selected_self_nulls_packed = if lhs_nulls.might_have_nulls() { + u64::collect_bool::< + // rest here doesn't matter as it should not be used + false, + _ + >( + length, + |bit_idx| { + let current_index = offset + bit_idx; + let lhs_row = if cfg!(debug_assertions) { + lhs_rows[current_index] + } else { + // SAFETY: indices are guaranteed to be in bounds + unsafe {*lhs_rows.get_unchecked(current_index)} + }; + + lhs_nulls.is_valid(lhs_row) + }, + ) + } else { + u64::MAX + }; + + let selected_array_nulls_packed = if let Some(nulls) = rhs_nulls { + u64::collect_bool::< + // rest here doesn't matter as it should not be used + false, + _ + >( + length, + |bit_idx| { + let current_index = offset + bit_idx; + let rhs_row = if cfg!(debug_assertions) { + rhs_rows[current_index] + } else { + // SAFETY: indices are guaranteed to be in bounds + unsafe {*rhs_rows.get_unchecked(current_index)} + }; + + // TODO - should use here unchecked as well? + nulls.is_valid(rhs_row) + }, + ) + } else { + // all valid + u64::MAX + }; + + ( + // Equal nullability if both false or true, than this is true + !(selected_self_nulls_packed ^ selected_array_nulls_packed), + // For both valid, + selected_self_nulls_packed & selected_array_nulls_packed + ) +} + + +/// Return (bit packed equal nullability, bit packed for both non-nulls) +/// +/// the value in bits after length should not be used and is not gurrentee to any value +/// +/// so if comparing both: +/// (F - null, T - valid) +/// ```text +/// [F, F, T, T, F] +/// [F, T, T, F, F] +/// ``` +/// +/// it will return bit packed for this: +/// (F - unset, T - set) +/// ```text +/// [T, F, T, F, T] for equal nullability +/// [F, F, T, F, F] for both non nulls +/// ``` +pub(crate) fn compare_fixed_nulls_to_packed( + length: usize, + lhs_rows: &[usize; 64], + lhs_nulls: Option<&[u8]>, + rhs_rows: &[usize; 64], + rhs_nulls: Option<&NullBuffer>, +) -> (u64, u64) { + let selected_self_nulls_packed = if let Some(lhs_nulls) = lhs_nulls { + let lhs_nulls_ptr =lhs_nulls.as_ptr(); + u64::collect_bool::< + // rest here doesn't matter as it should not be used + false, + _ + >( + length, + |bit_idx| { + let lhs_row = if cfg!(debug_assertions) { + lhs_rows[bit_idx] + } else { + // SAFETY: indices are guaranteed to be in bounds + unsafe {*lhs_rows.get_unchecked(bit_idx)} + }; + + unsafe { bit_util::get_bit_raw(lhs_nulls_ptr, lhs_row) } + }, + ) + } else { + u64::MAX + }; + + let selected_array_nulls_packed = if let Some(nulls) = rhs_nulls { + let rhs_nulls_values = nulls.inner().values(); + let offset = nulls.offset(); + let rhs_nulls_ptr = rhs_nulls_values.as_ptr(); + + u64::collect_bool::< + // rest here doesn't matter as it should not be used + false, + _ + >( + length, + |bit_idx| { + let rhs_row = if cfg!(debug_assertions) { + rhs_rows[bit_idx] + } else { + // SAFETY: indices are guaranteed to be in bounds + unsafe {*rhs_rows.get_unchecked(bit_idx)} + }; + + unsafe { bit_util::get_bit_raw(rhs_nulls_ptr, offset + rhs_row) } + }, + ) + } else { + // all valid + u64::MAX + }; + + ( + // Equal nullability if both false or true, than this is true + !(selected_self_nulls_packed ^ selected_array_nulls_packed), + // For both valid, + selected_self_nulls_packed & selected_array_nulls_packed + ) +} + + +/// Return (bit packed equal nullability, bit packed for both non-nulls) +/// +/// the value in bits after length should not be used and is not gurrentee to any value +/// +/// so if comparing both: +/// (F - null, T - valid) +/// ```text +/// [F, F, T, T, F] +/// [F, T, T, F, F] +/// ``` +/// +/// it will return bit packed for this: +/// (F - unset, T - set) +/// ```text +/// [T, F, T, F, T] for equal nullability +/// [F, F, T, F, F] for both non nulls +/// ``` +pub(crate) fn compare_fixed_raw_nulls_to_packed( + length: usize, + lhs_rows: &[usize; 64], + lhs_nulls: Option<&[u8]>, + rhs_rows: &[usize; 64], + rhs_nulls: Option<( + // offset + usize, + // buffer + &[u8] + )>, +) -> (u64, u64) { + let selected_self_nulls_packed = if let Some(lhs_nulls) = lhs_nulls { + let lhs_nulls_ptr =lhs_nulls.as_ptr(); + u64::collect_bool::< + // rest here doesn't matter as it should not be used + false, + _ + >( + length, + |bit_idx| { + let lhs_row = if cfg!(debug_assertions) { + lhs_rows[bit_idx] + } else { + // SAFETY: indices are guaranteed to be in bounds + unsafe {*lhs_rows.get_unchecked(bit_idx)} + }; + + unsafe { bit_util::get_bit_raw(lhs_nulls_ptr, lhs_row) } + }, + ) + } else { + u64::MAX + }; + + let selected_array_nulls_packed = if let Some((offset, nulls)) = rhs_nulls { + let rhs_nulls_ptr = nulls.as_ptr(); + + u64::collect_bool::< + // rest here doesn't matter as it should not be used + false, + _ + >( + length, + |bit_idx| { + let rhs_row = if cfg!(debug_assertions) { + rhs_rows[bit_idx] + } else { + // SAFETY: indices are guaranteed to be in bounds + unsafe {*rhs_rows.get_unchecked(bit_idx)} + }; + + unsafe { bit_util::get_bit_raw(rhs_nulls_ptr, offset + rhs_row) } + }, + ) + } else { + // all valid + u64::MAX + }; + + ( + // Equal nullability if both false or true, than this is true + !(selected_self_nulls_packed ^ selected_array_nulls_packed), + // For both valid, + selected_self_nulls_packed & selected_array_nulls_packed + ) +} + + + +/// Return u64 bit packed where a bit is set if (both are nulls) || (both are valid && values eq) +/// +/// Equal map +/// +/// | nulls |val | | +/// | lhs | rhs | eq | result | +/// | --------- | -- | ------ | +/// | F | F | F | T | +/// | F | F | T | T | +/// | F | T | F | F | +/// | F | T | T | F | +/// | T | F | F | F | +/// | T | F | T | F | +/// | T | T | F | F | +/// | T | T | T | T | +#[inline] +pub(crate) fn combine_nullability_and_value_equal_bit_packed_u64( + both_valid: u64, + nullability_eq: u64, + values_eq: u64 +) -> u64 { + (both_valid & values_eq) | (nullability_eq & !both_valid) +} + + +// pub fn vectorized_equal_to_helper( +// lhs_rows: &[usize], +// rhs_rows: &[usize], +// rhs_array: &ArrayRef, +// equal_to_results: &mut FixedBitPackedMutableBuffer, +// ) { +// if !CHECK_NULLABILITY { +// assert!( +// (rhs_array.null_count() == 0 && !self.nulls.might_have_nulls()), +// "CHECK_NULLABILITY is false for nullable called with nullable input" +// ); +// } + +// assert_eq!(lhs_rows.len(), rhs_rows.len()); +// assert_eq!(lhs_rows.len(), equal_to_results.len()); + +// // TODO - skip to the first true bit in equal_to_results to avoid unnecessary work +// // in iterating over unnecessary bits oe even get a slice of starting from first true bit to the last true bit + +// // TODO - do not assume for byte aligned, added here just for POC +// let mut index = 0; +// let num_rows = lhs_rows.len(); +// apply_bitwise_unary_op( +// equal_to_results.0.as_slice_mut(), +// 0, +// lhs_rows.len(), +// |eq| { +// // If already false, skip 64 items +// if eq == 0 { +// index += 64; +// return 0; +// } + +// let length = num_rows - index; + +// let (nullability_eq, both_valid) = if CHECK_NULLABILITY { +// compare_nulls_to_packed( +// length, +// index, +// lhs_rows, +// &self.nulls, +// rhs_rows, +// array.nulls() +// ) +// } else { +// ( +// // nullability equal +// u64::MAX, +// // both valid +// u64::MAX +// ) +// }; + +// if nullability_eq == 0 { +// index += 64; +// return 0 +// } + +// // if all nullability match and they both nulls than its the same value +// if nullability_eq == u64::MAX && both_valid == 0 { +// index += 64; +// return eq; +// } + +// // TODO - we can maybe get only from the first set bit until the last set bit +// // and then update those gaps with false +// // TODO - make sure not to override bits after `length` +// let values_eq = self.get_bit_packed_u64_for_eq_values( +// length, +// index, +// lhs_rows, +// rhs_rows, +// array_values, +// ); + +// let result = combine_nullability_and_value_equal_bit_packed_u64( +// both_valid, +// nullability_eq, +// values_eq +// ); + +// index += 64; +// eq & result +// }, +// ); +// } \ No newline at end of file diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index 9adf028eca7f6..f1ebd528c231f 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -21,16 +21,18 @@ mod boolean; mod bytes; pub mod bytes_view; pub mod primitive; +mod helper; use std::mem::{self, size_of}; - +use std::ops::Index; use crate::aggregates::group_values::multi_group_by::{ boolean::BooleanGroupValueBuilder, bytes::ByteGroupValueBuilder, bytes_view::ByteViewGroupValueBuilder, primitive::PrimitiveGroupValueBuilder, }; use crate::aggregates::group_values::GroupValues; use ahash::RandomState; -use arrow::array::{Array, ArrayRef, RecordBatch}; +use arrow::array::{Array, ArrayRef, BooleanBufferBuilder, RecordBatch}; +use arrow::buffer::BooleanBuffer; use arrow::compute::cast; use arrow::datatypes::{ BinaryViewType, DataType, Date32Type, Date64Type, Decimal128Type, Float32Type, @@ -40,6 +42,7 @@ use arrow::datatypes::{ TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; +use arrow::util::bit_iterator::BitIterator; use datafusion_common::hash_utils::create_hashes; use datafusion_common::{internal_datafusion_err, not_impl_err, Result}; use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt}; @@ -51,6 +54,48 @@ use hashbrown::hash_table::HashTable; const NON_INLINED_FLAG: u64 = 0x8000000000000000; const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF; +#[derive(Debug)] +pub struct FixedBitPackedMutableBuffer(BooleanBufferBuilder); + +impl FixedBitPackedMutableBuffer { + pub(crate) fn new_set(size: usize) -> Self { + let mut builder = BooleanBufferBuilder::new(size); + builder.append_n(size, true); + Self(builder) + } + + pub(crate) fn len(&self) -> usize { + self.0.len() + } + + pub(crate) fn set_bit(&mut self, index: usize, value: bool) { + self.0.set_bit(index, value); + } + + pub(crate) fn iter(&self) -> BitIterator<'_> { + BitIterator::new(self.0.as_slice(), 0, self.0.len()) + } +} + +impl From<&BooleanBuffer> for FixedBitPackedMutableBuffer { + fn from(value: &BooleanBuffer) -> Self { + FixedBitPackedMutableBuffer({ + let mut mutable = BooleanBufferBuilder::new(0); + mutable.append_buffer(value); + + mutable + }) + } +} + + +#[cfg(test)] +impl From for Vec { + fn from(mut value: FixedBitPackedMutableBuffer) -> Self { + value.0.finish().into_iter().collect() + } +} + /// Trait for storing a single column of group values in [`GroupValuesColumn`] /// /// Implementations of this trait store an in-progress collection of group values @@ -82,7 +127,7 @@ pub trait GroupColumn: Send + Sync { lhs_rows: &[usize], array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut FixedBitPackedMutableBuffer, ); /// The vectorized version `append_val` @@ -224,7 +269,6 @@ pub struct GroupValuesColumn { /// Buffers to store intermediate results in `vectorized_append` /// and `vectorized_equal_to`, for reducing memory allocation -#[derive(Default)] struct VectorizedOperationBuffers { /// The `vectorized append` row indices buffer append_row_indices: Vec, @@ -236,7 +280,7 @@ struct VectorizedOperationBuffers { equal_to_group_indices: Vec, /// The `vectorized_equal_to` result buffer - equal_to_results: Vec, + equal_to_results: BooleanBufferBuilder, /// The buffer for storing row indices found not equal to /// exist groups in `group_values` in `vectorized_equal_to`. @@ -249,11 +293,23 @@ impl VectorizedOperationBuffers { self.append_row_indices.clear(); self.equal_to_row_indices.clear(); self.equal_to_group_indices.clear(); - self.equal_to_results.clear(); + self.equal_to_results.truncate(0); self.remaining_row_indices.clear(); } } +impl Default for VectorizedOperationBuffers { + fn default() -> Self { + Self { + append_row_indices: Vec::default(), + equal_to_row_indices: Vec::default(), + equal_to_group_indices: Vec::default(), + equal_to_results: BooleanBufferBuilder::new(0), + remaining_row_indices: Vec::default(), + } + } +} + impl GroupValuesColumn { // ======================================================================== // Initialization functions @@ -617,34 +673,37 @@ impl GroupValuesColumn { // 1. Perform `vectorized_equal_to` for `rows` in `vectorized_equal_to_group_indices` // and `group_indices` in `vectorized_equal_to_group_indices` let mut equal_to_results = - mem::take(&mut self.vectorized_operation_buffers.equal_to_results); - equal_to_results.clear(); - equal_to_results.resize( + mem::replace(&mut self.vectorized_operation_buffers.equal_to_results, BooleanBufferBuilder::new(0)); + equal_to_results.truncate(0); + equal_to_results.append_n( self.vectorized_operation_buffers .equal_to_group_indices .len(), true, ); + let mut fixed_bit_packed_equal_to_results = + FixedBitPackedMutableBuffer(equal_to_results); for (col_idx, group_col) in self.group_values.iter().enumerate() { group_col.vectorized_equal_to( &self.vectorized_operation_buffers.equal_to_group_indices, &cols[col_idx], &self.vectorized_operation_buffers.equal_to_row_indices, - &mut equal_to_results, + &mut fixed_bit_packed_equal_to_results, ); } // 2. Check `equal_to_results`, if found not equal to `row`s, just add them // to `scalarized_indices`, and perform `scalarized_intern` for them after. let mut current_row_equal_to_result = false; - for (idx, &row) in self + for (idx, (&row, equal_to_result)) in self .vectorized_operation_buffers .equal_to_row_indices .iter() + .zip(fixed_bit_packed_equal_to_results.iter()) .enumerate() { - let equal_to_result = equal_to_results[idx]; + // let equal_to_result = fixed_bit_packed_equal_to_results[idx]; // Equal to case, set the `group_indices` to `rows` in `groups` if equal_to_result { @@ -675,7 +734,7 @@ impl GroupValuesColumn { } } - self.vectorized_operation_buffers.equal_to_results = equal_to_results; + self.vectorized_operation_buffers.equal_to_results = fixed_bit_packed_equal_to_results.0; } /// It is possible that some `input rows` have the same diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs index df2cf4bdecce5..6e96e80dc696f 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs @@ -15,19 +15,24 @@ // specific language governing permissions and limitations // under the License. +use crate::aggregates::group_values::multi_group_by::helper::{ + combine_nullability_and_value_equal_bit_packed_u64, compare_fixed_nulls_to_packed, + compare_fixed_raw_nulls_to_packed, compare_nulls_to_packed, CollectBool, +}; use crate::aggregates::group_values::multi_group_by::{ - nulls_equal_to, GroupColumn, Nulls, + nulls_equal_to, FixedBitPackedMutableBuffer, GroupColumn, Nulls, }; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; -use arrow::array::ArrowNativeTypeOp; use arrow::array::{cast::AsArray, Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray}; -use arrow::buffer::ScalarBuffer; +use arrow::array::{ArrowNativeTypeOp, BooleanArray}; +use arrow::buffer::{NullBuffer, ScalarBuffer}; use arrow::datatypes::DataType; +use arrow::util::bit_util::{apply_bitwise_binary_op, apply_bitwise_unary_op}; use datafusion_common::Result; use datafusion_execution::memory_pool::proxy::VecAllocExt; use itertools::izip; -use std::iter; use std::sync::Arc; +use std::{iter, u64}; /// An implementation of [`GroupColumn`] for primitive values /// @@ -57,83 +62,185 @@ where } } - fn vectorized_equal_to_non_nullable( + fn get_fixed_bit_packed_u64_for_eq_values( &self, - lhs_rows: &[usize], - array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut [bool], - ) { - assert!( - !NULLABLE || (array.null_count() == 0 && !self.nulls.might_have_nulls()), - "called with nullable input" - ); - let array_values = array.as_primitive::().values(); - - let iter = izip!( - lhs_rows.iter(), - rhs_rows.iter(), - equal_to_results.iter_mut(), - ); - - for (&lhs_row, &rhs_row, equal_to_result) in iter { - let result = { - // Getting unchecked not only for bound checks but because the bound checks are - // what prevents auto-vectorization - let left = if cfg!(debug_assertions) { - self.group_values[lhs_row] - } else { - // SAFETY: indices are guaranteed to be in bounds - unsafe { *self.group_values.get_unchecked(lhs_row) } - }; - let right = if cfg!(debug_assertions) { - array_values[rhs_row] - } else { - // SAFETY: indices are guaranteed to be in bounds - unsafe { *array_values.get_unchecked(rhs_row) } - }; + length: usize, + lhs_rows: &[usize; 64], + rhs_rows: &[usize; 64], + array_values: &ScalarBuffer, + ) -> u64 { + u64::collect_bool::< + // Using rest true as we don't wanna change bits beyond num_rows + true, + _, + >(length, |bit_idx| { + let (lhs_row, rhs_row) = if cfg!(debug_assertions) { + (lhs_rows[bit_idx], rhs_rows[bit_idx]) + } else { + // SAFETY: indices are guaranteed to be in bounds + unsafe { + ( + *lhs_rows.get_unchecked(bit_idx), + *rhs_rows.get_unchecked(bit_idx), + ) + } + }; - // Always evaluate, to allow for auto-vectorization - left.is_eq(right) + // Getting unchecked not only for bound checks but because the bound checks are + // what prevents auto-vectorization + let left = if cfg!(debug_assertions) { + self.group_values[lhs_row] + } else { + // SAFETY: indices are guaranteed to be in bounds + unsafe { *self.group_values.get_unchecked(lhs_row) } + }; + let right = if cfg!(debug_assertions) { + array_values[rhs_row] + } else { + // SAFETY: indices are guaranteed to be in bounds + unsafe { *array_values.get_unchecked(rhs_row) } }; - *equal_to_result = result && *equal_to_result; - } + // Always evaluate, to allow for auto-vectorization + left.is_eq(right) + }) } - pub fn vectorized_equal_nullable( + // TODO - extract this function for other datatype impl + pub fn inner_vectorized_equal( &self, lhs_rows: &[usize], array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut FixedBitPackedMutableBuffer, ) { - assert!(NULLABLE, "called with non-nullable input"); + if !CHECK_NULLABILITY { + assert!( + (array.null_count() == 0 && !self.nulls.might_have_nulls()), + "CHECK_NULLABILITY is false for nullable called with nullable input" + ); + } + let array = array.as_primitive::(); + let array_values = array.values(); - let iter = izip!( - lhs_rows.iter(), - rhs_rows.iter(), - equal_to_results.iter_mut(), - ); + assert_eq!(lhs_rows.len(), rhs_rows.len()); + assert_eq!(lhs_rows.len(), equal_to_results.len()); - for (&lhs_row, &rhs_row, equal_to_result) in iter { - // Has found not equal to in previous column, don't need to check - if !*equal_to_result { - continue; - } + // TODO - skip to the first true bit in equal_to_results to avoid unnecessary work + // in iterating over unnecessary bits oe even get a slice of starting from first true bit to the last true bit - // Perf: skip null check (by short circuit) if input is not nullable - let exist_null = self.nulls.is_null(lhs_row); - let input_null = array.is_null(rhs_row); - if let Some(result) = nulls_equal_to(exist_null, input_null) { - *equal_to_result = result; - continue; - } + // TODO - do not assume for byte aligned, added here just for POC + let mut index = 0; + let num_rows = lhs_rows.len(); - // Otherwise, we need to check their values - *equal_to_result = self.group_values[lhs_row].is_eq(array.value(rhs_row)); - } + let self_nulls_slice = if CHECK_NULLABILITY { + self.nulls.maybe_as_slice() + } else { + None + }; + let array_nulls = if CHECK_NULLABILITY { + array + .nulls() + .map(|nulls| (nulls.offset(), nulls.inner().values())) + } else { + None + }; + + let mut scrach_left_64: [usize; 64] = [0; 64]; + let mut scrach_right_64: [usize; 64] = [0; 64]; + apply_bitwise_unary_op( + equal_to_results.0.as_slice_mut(), + 0, + lhs_rows.len(), + |eq| { + // If already false, skip 64 items + if eq == 0 { + index += 64; + return 0; + } + + let length = num_rows - index; + + // Creating an array of size 64 to allow for optimization when building u64 bit packed from this + let (lhs_rows_fixed, rhs_rows_fixed) = if length >= 64 { + ( + lhs_rows[index..index + 64].try_into().unwrap(), + rhs_rows[index..index + 64].try_into().unwrap(), + ) + } else { + scrach_left_64[..length].copy_from_slice(&lhs_rows[index..]); + scrach_right_64[..length].copy_from_slice(&rhs_rows[index..]); + + (&scrach_left_64, &scrach_right_64) + }; + + let (nullability_eq, both_valid) = if CHECK_NULLABILITY { + // TODO - rest here should be + compare_fixed_raw_nulls_to_packed( + length, + lhs_rows_fixed, + self_nulls_slice, + rhs_rows_fixed, + array_nulls, + ) + } else { + ( + // nullability equal + u64::MAX, + // both valid + u64::MAX, + ) + }; + + // If given `nullability_eq` and `both_valid` we can have all the data we need: + // eq | nullability_eq | both_valid | result + // T | T | T | F + // T | T | F | T + // T | F | T | + // T | F | F | T + // F | T | T | T + // F | T | F | T + // F | F | T | + // F | F | F | T + if !(eq & nullability_eq & both_valid) == u64::MAX { + // eq | nullability_eq | both_valid | result + // T | T | T | + // T | T | F | T + // T | F | T | + // T | F | F | F + // F | T | T | F + // F | T | F | F + // F | F | T | + // F | F | F | F + index += 64; + return eq & nullability_eq & !both_valid; + } + + // TODO - we can maybe get only from the first set bit until the last set bit + // and then update those gaps with false + // TODO - make sure not to override bits after `length` + let values_eq = self.get_fixed_bit_packed_u64_for_eq_values( + length, + lhs_rows_fixed, + rhs_rows_fixed, + array_values, + ); + + let result = if CHECK_NULLABILITY { + combine_nullability_and_value_equal_bit_packed_u64( + both_valid, + nullability_eq, + values_eq, + ) + } else { + values_eq + }; + + index += 64; + eq & result + }, + ); } } @@ -176,17 +283,22 @@ impl GroupColumn lhs_rows: &[usize], array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut FixedBitPackedMutableBuffer, ) { if !NULLABLE || (array.null_count() == 0 && !self.nulls.might_have_nulls()) { - self.vectorized_equal_to_non_nullable( + self.inner_vectorized_equal::( lhs_rows, array, rhs_rows, equal_to_results, ); } else { - self.vectorized_equal_nullable(lhs_rows, array, rhs_rows, equal_to_results); + self.inner_vectorized_equal::( + lhs_rows, + array, + rhs_rows, + equal_to_results, + ); } } @@ -280,11 +392,12 @@ impl GroupColumn mod tests { use std::sync::Arc; + use super::GroupColumn; use crate::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder; + use crate::aggregates::group_values::multi_group_by::FixedBitPackedMutableBuffer; use arrow::array::{ArrayRef, Float32Array, Int64Array, NullBufferBuilder}; use arrow::datatypes::{DataType, Float32Type, Int64Type}; - - use super::GroupColumn; + use itertools::Itertools; #[test] fn test_nullable_primitive_equal_to() { @@ -296,16 +409,18 @@ mod tests { } }; - let equal_to = |builder: &PrimitiveGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - let iter = lhs_rows.iter().zip(rhs_rows.iter()); - for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { - equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); - } - }; + let equal_to = + |builder: &PrimitiveGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut FixedBitPackedMutableBuffer| { + let iter = lhs_rows.iter().zip(rhs_rows.iter()); + for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { + equal_to_results + .set_bit(idx, builder.equal_to(lhs_row, input_array, rhs_row)); + } + }; test_nullable_primitive_equal_to_internal(append, equal_to); } @@ -320,18 +435,19 @@ mod tests { .unwrap(); }; - let equal_to = |builder: &PrimitiveGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - builder.vectorized_equal_to( - lhs_rows, - input_array, - rhs_rows, - equal_to_results, - ); - }; + let equal_to = + |builder: &PrimitiveGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut FixedBitPackedMutableBuffer| { + builder.vectorized_equal_to( + lhs_rows, + input_array, + rhs_rows, + equal_to_results, + ); + }; test_nullable_primitive_equal_to_internal(append, equal_to); } @@ -344,7 +460,7 @@ mod tests { &[usize], &ArrayRef, &[usize], - &mut Vec, + &mut FixedBitPackedMutableBuffer, ), { // Will cover such cases: @@ -393,7 +509,8 @@ mod tests { let input_array = Arc::new(Float32Array::new(values, nulls.finish())) as ArrayRef; // Check - let mut equal_to_results = vec![true; builder.len()]; + let mut equal_to_results = FixedBitPackedMutableBuffer::new_set(builder.len()); + equal_to( &builder, &[0, 1, 2, 3, 4, 5, 6], @@ -402,6 +519,7 @@ mod tests { &mut equal_to_results, ); + let equal_to_results: Vec = equal_to_results.into(); assert!(!equal_to_results[0]); assert!(equal_to_results[1]); assert!(equal_to_results[2]); @@ -421,16 +539,18 @@ mod tests { } }; - let equal_to = |builder: &PrimitiveGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - let iter = lhs_rows.iter().zip(rhs_rows.iter()); - for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { - equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); - } - }; + let equal_to = + |builder: &PrimitiveGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut FixedBitPackedMutableBuffer| { + let iter = lhs_rows.iter().zip(rhs_rows.iter()); + for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { + equal_to_results + .set_bit(idx, builder.equal_to(lhs_row, input_array, rhs_row)); + } + }; test_not_nullable_primitive_equal_to_internal(append, equal_to); } @@ -445,18 +565,19 @@ mod tests { .unwrap(); }; - let equal_to = |builder: &PrimitiveGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - builder.vectorized_equal_to( - lhs_rows, - input_array, - rhs_rows, - equal_to_results, - ); - }; + let equal_to = + |builder: &PrimitiveGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut FixedBitPackedMutableBuffer| { + builder.vectorized_equal_to( + lhs_rows, + input_array, + rhs_rows, + equal_to_results, + ); + }; test_not_nullable_primitive_equal_to_internal(append, equal_to); } @@ -469,7 +590,7 @@ mod tests { &[usize], &ArrayRef, &[usize], - &mut Vec, + &mut FixedBitPackedMutableBuffer, ), { // Will cover such cases: @@ -487,7 +608,7 @@ mod tests { let input_array = Arc::new(Int64Array::from(vec![Some(0), Some(2)])) as ArrayRef; // Check - let mut equal_to_results = vec![true; builder.len()]; + let mut equal_to_results = FixedBitPackedMutableBuffer::new_set(builder.len()); equal_to( &builder, &[0, 1], @@ -496,6 +617,7 @@ mod tests { &mut equal_to_results, ); + let equal_to_results: Vec = equal_to_results.into(); assert!(equal_to_results[0]); assert!(!equal_to_results[1]); } @@ -520,7 +642,8 @@ mod tests { .vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_nulls_input_array.len()]; + let mut equal_to_results = + FixedBitPackedMutableBuffer::new_set(all_nulls_input_array.len()); builder.vectorized_equal_to( &[0, 1, 2, 3, 4], &all_nulls_input_array, @@ -528,6 +651,7 @@ mod tests { &mut equal_to_results, ); + let equal_to_results: Vec = equal_to_results.into(); assert!(equal_to_results[0]); assert!(equal_to_results[1]); assert!(equal_to_results[2]); @@ -546,7 +670,7 @@ mod tests { .vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_not_nulls_input_array.len()]; + let mut equal_to_results = FixedBitPackedMutableBuffer::new_set(all_not_nulls_input_array.len()); builder.vectorized_equal_to( &[5, 6, 7, 8, 9], &all_not_nulls_input_array, @@ -554,6 +678,8 @@ mod tests { &mut equal_to_results, ); + let equal_to_results: Vec = equal_to_results.into(); + assert!(equal_to_results[0]); assert!(equal_to_results[1]); assert!(equal_to_results[2]); diff --git a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs index 6a84d685b6c79..1e2aced04b3f6 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs @@ -35,15 +35,22 @@ impl MaybeNullBufferBuilder { nulls: NullBufferBuilder::new(0), } } - + /// Return true if the row at index `row` is null - pub fn is_null(&self, row: usize) -> bool { + #[inline] + pub fn is_valid(&self, row: usize) -> bool { match self.nulls.as_slice() { // validity mask means a unset bit is NULL - Some(_) => !self.nulls.is_valid(row), - None => false, + Some(_) => self.nulls.is_valid(row), + None => true, } } + + /// Return true if the row at index `row` is null + #[inline] + pub fn is_null(&self, row: usize) -> bool { + !self.is_valid(row) + } /// Set the nullness of the next row to `is_null` /// @@ -89,6 +96,10 @@ impl MaybeNullBufferBuilder { new_builder.truncate(n); new_builder.finish() } + + pub(crate) fn maybe_as_slice(&self) -> Option<&[u8]> { + self.nulls.as_slice() + } /// Returns true if this builder might have any nulls /// diff --git a/testing b/testing index 0d60ccae40d0e..98fceecd024dc 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 0d60ccae40d0e8f2d22c15fafb01c5d4be8c63a6 +Subproject commit 98fceecd024dccd2f8a00e32fc144975f218acf4