diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs
index a91dd3115d879..38f8c793afea3 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs
@@ -19,7 +19,7 @@ use crate::aggregates::group_values::multi_group_by::{
     GroupColumn, Nulls, nulls_equal_to,
 };
 use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
-use arrow::array::{Array, ArrayRef, AsArray, ByteView, GenericByteViewArray, make_view};
+use arrow::array::{Array, ArrayRef, AsArray, ByteView, GenericByteViewArray};
 use arrow::buffer::{Buffer, ScalarBuffer};
 use arrow::datatypes::ByteViewType;
 use datafusion_common::Result;
@@ -115,7 +115,137 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
 
         // Not null row case
         self.nulls.append(false);
-        self.do_append_val_inner(arr, row);
+        let input_view = unsafe { *arr.views().get_unchecked(row) };
+        let view = Self::copy_from_view(
+            input_view,
+            arr,
+            row,
+            &mut self.in_progress,
+            &mut self.completed,
+            self.max_block_size,
+        );
+        self.views.push(view);
+    }
+
+    /// Copy a non-null value from `arr[row]` into our buffers, reusing the
+    /// input view. For inline values (len <= 12) the view is returned as-is.
+    /// For non-inline values the data is copied and the view is updated with
+    /// our buffer_index/offset.
+    fn copy_from_view(
+        input_view: u128,
+        arr: &GenericByteViewArray<B>,
+        row: usize,
+        in_progress: &mut Vec<u8>,
+        completed: &mut Vec<Buffer>,
+        max_block_size: usize,
+    ) -> u128 {
+        let len = input_view as u32;
+        if len <= 12 {
+            return input_view;
+        }
+        let value: &[u8] = unsafe { arr.value_unchecked(row).as_ref() };
+        let require_cap = in_progress.len() + value.len();
+        if require_cap > max_block_size {
+            let flushed_block = replace(in_progress, Vec::with_capacity(max_block_size));
+            completed.push(Buffer::from_vec(flushed_block));
+        }
+        let buffer_index = completed.len() as u32;
+        let offset = in_progress.len() as u32;
+        in_progress.extend_from_slice(value);
+        ByteView::from(input_view)
+            .with_buffer_index(buffer_index)
+            .with_offset(offset)
+            .as_u128()
+    }
+
+    /// Compact non-inline data from reused input buffers into owned
+    /// `in_progress`/`completed` buffers. Buffers that are >=90%
+    /// referenced are kept as-is (already good locality); the rest
+    /// are compacted by copying only referenced data.
+    fn compact_new_views(
+        views: &mut [u128],
+        reused_buffers: &[Buffer],
+        base: u32,
+        in_progress: &mut Vec<u8>,
+        completed: &mut Vec<Buffer>,
+        max_block_size: usize,
+    ) {
+        // Count referenced bytes per reused buffer
+        let mut referenced = vec![0usize; reused_buffers.len()];
+        for view in views.iter() {
+            let len = *view as u32;
+            if len <= 12 {
+                continue;
+            }
+            let bv = ByteView::from(*view);
+            if bv.buffer_index >= base {
+                let idx = (bv.buffer_index - base) as usize;
+                referenced[idx] += bv.length as usize;
+            }
+        }
+
+        // Decide per buffer: keep (>=90% referenced) or compact.
+        // For kept buffers, flush in_progress first so buffer indices
+        // stay ordered, then add to completed.
+        let mut reused_to_new = vec![u32::MAX; reused_buffers.len()];
+        for (idx, buf) in reused_buffers.iter().enumerate() {
+            let total = buf.len();
+            if total > 0 && referenced[idx] * 10 >= total * 9 {
+                if !in_progress.is_empty() {
+                    let flushed =
+                        replace(in_progress, Vec::with_capacity(max_block_size));
+                    completed.push(Buffer::from_vec(flushed));
+                }
+                reused_to_new[idx] = completed.len() as u32;
+                completed.push(buf.clone());
+            }
+        }
+
+        // Remap/compact views
+        for view in views.iter_mut() {
+            let len = *view as u32;
+            if len <= 12 {
+                continue;
+            }
+            let bv = ByteView::from(*view);
+            if bv.buffer_index < base {
+                continue;
+            }
+            let reused_idx = (bv.buffer_index - base) as usize;
+
+            if reused_to_new[reused_idx] != u32::MAX {
+                // Buffer kept, just remap index
+                *view = ByteView::from(*view)
+                    .with_buffer_index(reused_to_new[reused_idx])
+                    .as_u128();
+            } else {
+                // Compact: copy data into in_progress
+                let offset = bv.offset as usize;
+                let length = bv.length as usize;
+                let value = unsafe {
+                    reused_buffers
+                        .get_unchecked(reused_idx)
+                        .as_slice()
+                        .get_unchecked(offset..offset + length)
+                };
+
+                let require_cap = in_progress.len() + length;
+                if require_cap > max_block_size {
+                    let flushed =
+                        replace(in_progress, Vec::with_capacity(max_block_size));
+                    completed.push(Buffer::from_vec(flushed));
+                }
+
+                let new_buffer_index = completed.len() as u32;
+                let new_offset = in_progress.len() as u32;
+                in_progress.extend_from_slice(value);
+
+                *view = ByteView::from(*view)
+                    .with_buffer_index(new_buffer_index)
+                    .with_offset(new_offset)
+                    .as_u128();
+            }
+        }
     }
 
     // Don't inline to keep the code small and give LLVM the best chance of
@@ -157,18 +287,53 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
             Nulls::Some
         };
 
+        let input_views = arr.views();
+        let data_buffers = arr.data_buffers();
+
+        // Phase 1: Fast extend — temporarily reference input buffers so
+        // the extend loop only touches views (no per-row memcpy).
+        let base = self.completed.len() as u32;
+        let views_start = self.views.len();
+
         match all_null_or_non_null {
             Nulls::Some => {
-                for &row in rows {
-                    self.append_val_inner(array, row);
-                }
+                let Self { views, nulls, .. } = self;
+                views.extend(rows.iter().map(|&row| {
+                    if arr.is_null(row) {
+                        nulls.append(true);
+                        0u128
+                    } else {
+                        nulls.append(false);
+                        let input_view = unsafe { *input_views.get_unchecked(row) };
+                        let len = input_view as u32;
+                        if len <= 12 {
+                            input_view
+                        } else {
+                            ByteView::from(input_view)
+                                .with_buffer_index(
+                                    ByteView::from(input_view).buffer_index + base,
+                                )
+                                .as_u128()
+                        }
+                    }
+                }));
             }
 
             Nulls::None => {
                 self.nulls.append_n(rows.len(), false);
-                for &row in rows {
-                    self.do_append_val_inner(arr, row);
-                }
+                self.views.extend(rows.iter().map(|&row| {
+                    let input_view = unsafe { *input_views.get_unchecked(row) };
+                    let len = input_view as u32;
+                    if len <= 12 {
+                        input_view
+                    } else {
+                        ByteView::from(input_view)
+                            .with_buffer_index(
+                                ByteView::from(input_view).buffer_index + base,
+                            )
+                            .as_u128()
+                    }
+                }));
             }
 
             Nulls::All => {
@@ -177,45 +342,19 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
                 self.views.resize(new_len, 0);
             }
         }
-    }
-
-    fn do_append_val_inner(&mut self, array: &GenericByteViewArray<B>, row: usize)
-    where
-        B: ByteViewType,
-    {
-        let value: &[u8] = array.value(row).as_ref();
-
-        let value_len = value.len();
-        let view = if value_len <= 12 {
-            make_view(value, 0, 0)
-        } else {
-            // Ensure big enough block to hold the value firstly
-            self.ensure_in_progress_big_enough(value_len);
-
-            // Append value
-            let buffer_index = self.completed.len();
-            let offset = self.in_progress.len();
-            self.in_progress.extend_from_slice(value);
-
-            make_view(value, buffer_index as u32, offset as u32)
-        };
-
-        // Append view
-        self.views.push(view);
-    }
-
-    fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
-        debug_assert!(value_len > 12);
-        let require_cap = self.in_progress.len() + value_len;
-        // If current block isn't big enough, flush it and create a new in progress block
-        if require_cap > self.max_block_size {
-            let flushed_block = replace(
+
+        // Phase 2: Compact — copy non-inline data from the input's
+        // buffers into our own in_progress/completed for perfect locality.
+        // Only touches the newly added views (views_start..).
+        if !data_buffers.is_empty() {
+            Self::compact_new_views(
+                &mut self.views[views_start..],
+                data_buffers,
+                base,
                 &mut self.in_progress,
-                Vec::with_capacity(self.max_block_size),
+                &mut self.completed,
+                self.max_block_size,
             );
-            let buffer = Buffer::from_vec(flushed_block);
-            self.completed.push(buffer);
         }
     }
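
Note on the view layout the patch relies on, with a small sketch; the sketch is
illustrative only and not part of the patch. `describe_view` is a hypothetical
helper, while the `ByteView` fields and the `From<u128>` conversion are arrow's
public API, already imported by this file. The low 32 bits of a view are always
the value length, which is why `input_view as u32` is a valid length read and
why `len <= 12` identifies inline values that can be appended with no copy:

    use arrow::array::ByteView;

    /// Hypothetical helper: decode a raw `u128` view the way the patch does.
    fn describe_view(view: u128) {
        let len = view as u32; // low 32 bits are the length for every view
        if len <= 12 {
            // Inline value: the bytes live in the upper 96 bits of the view
            // itself, so copying the u128 copies the value and the view can
            // be reused verbatim across arrays.
            println!("inline, len = {len}");
        } else {
            // Non-inline value: a 4-byte prefix plus (buffer_index, offset)
            // into one of the array's data buffers. This is the pair that
            // the fast-extend phase offsets by `base` and that
            // compact_new_views later rewrites.
            let bv = ByteView::from(view);
            println!(
                "len = {}, buffer_index = {}, offset = {}",
                bv.length, bv.buffer_index, bv.offset
            );
        }
    }

The keep-or-compact threshold in compact_new_views is plain integer math: a
1000-byte input buffer is kept when at least 900 of its bytes are referenced
by the new views (900 * 10 >= 1000 * 9); below that, only the referenced bytes
are copied into `in_progress`, so a handful of views cannot pin a large,
mostly-dead input buffer.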