Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::aggregates::group_values::multi_group_by::{
GroupColumn, Nulls, nulls_equal_to,
};
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
use arrow::array::{Array, ArrayRef, AsArray, ByteView, GenericByteViewArray, make_view};
use arrow::array::{Array, ArrayRef, AsArray, ByteView, GenericByteViewArray};
use arrow::buffer::{Buffer, ScalarBuffer};
use arrow::datatypes::ByteViewType;
use datafusion_common::Result;
Expand Down Expand Up @@ -115,7 +115,137 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {

// Not null row case
self.nulls.append(false);
self.do_append_val_inner(arr, row);
let input_view = unsafe { *arr.views().get_unchecked(row) };
let view = Self::copy_from_view(
input_view,
arr,
row,
&mut self.in_progress,
&mut self.completed,
self.max_block_size,
);
self.views.push(view);
}

/// Copy a non-null value from `arr[row]` into our buffers, reusing the
/// input view. For inline values (len <= 12) the view is returned as-is.
/// For non-inline values the data is copied and the view is updated with
/// our buffer_index/offset.
fn copy_from_view(
input_view: u128,
arr: &GenericByteViewArray<B>,
row: usize,
in_progress: &mut Vec<u8>,
completed: &mut Vec<Buffer>,
max_block_size: usize,
) -> u128 {
let len = input_view as u32;
if len <= 12 {
return input_view;
}
let value: &[u8] = unsafe { arr.value_unchecked(row).as_ref() };
let require_cap = in_progress.len() + value.len();
if require_cap > max_block_size {
let flushed_block = replace(in_progress, Vec::with_capacity(max_block_size));
completed.push(Buffer::from_vec(flushed_block));
}
let buffer_index = completed.len() as u32;
let offset = in_progress.len() as u32;
in_progress.extend_from_slice(value);
ByteView::from(input_view)
.with_buffer_index(buffer_index)
.with_offset(offset)
.as_u128()
}

    /// Compact non-inline data from reused input buffers into owned
    /// `in_progress`/`completed` buffers. Buffers that are >=90%
    /// referenced are kept as-is (already good locality); the rest
    /// are compacted by copying only referenced data.
    ///
    /// `views` are the views appended in the current batch. A view whose
    /// `buffer_index >= base` points into `reused_buffers`
    /// (at slice index `buffer_index - base`) and is remapped here; views
    /// below `base` already point at buffers we own and are left untouched.
    fn compact_new_views(
        views: &mut [u128],
        reused_buffers: &[Buffer],
        base: u32,
        in_progress: &mut Vec<u8>,
        completed: &mut Vec<Buffer>,
        max_block_size: usize,
    ) {
        // Count referenced bytes per reused buffer.
        // NOTE(review): views referencing overlapping byte ranges are each
        // counted in full, so this is a heuristic tally, not an exact
        // coverage measure — acceptable for the keep/compact decision below.
        let mut referenced = vec![0usize; reused_buffers.len()];
        for view in views.iter() {
            // Low 32 bits of a view are the value length; lengths <= 12
            // are stored inline and reference no data buffer.
            let len = *view as u32;
            if len <= 12 {
                continue;
            }
            let bv = ByteView::from(*view);
            if bv.buffer_index >= base {
                let idx = (bv.buffer_index - base) as usize;
                referenced[idx] += bv.length as usize;
            }
        }

        // Decide per buffer: keep (>=90% referenced) or compact.
        // For kept buffers, flush in_progress first so buffer indices
        // stay ordered, then add to completed.
        // `u32::MAX` is a sentinel meaning "not kept": its views are compacted.
        let mut reused_to_new = vec![u32::MAX; reused_buffers.len()];
        for (idx, buf) in reused_buffers.iter().enumerate() {
            let total = buf.len();
            // `referenced * 10 >= total * 9` is the integer form of ">= 90%".
            if total > 0 && referenced[idx] * 10 >= total * 9 {
                if !in_progress.is_empty() {
                    // Seal the in-progress block now so the kept buffer's
                    // position in `completed` matches the index we record.
                    let flushed =
                        replace(in_progress, Vec::with_capacity(max_block_size));
                    completed.push(Buffer::from_vec(flushed));
                }
                reused_to_new[idx] = completed.len() as u32;
                // Presumably a shallow, ref-counted clone of the buffer
                // handle rather than a byte copy — verify against arrow's
                // `Buffer::clone`.
                completed.push(buf.clone());
            }
        }

        // Remap/compact views
        for view in views.iter_mut() {
            let len = *view as u32;
            if len <= 12 {
                // Inline view: self-contained, nothing to remap.
                continue;
            }
            let bv = ByteView::from(*view);
            if bv.buffer_index < base {
                // Already points at a buffer we own.
                continue;
            }
            let reused_idx = (bv.buffer_index - base) as usize;

            if reused_to_new[reused_idx] != u32::MAX {
                // Buffer kept, just remap index
                *view = ByteView::from(*view)
                    .with_buffer_index(reused_to_new[reused_idx])
                    .as_u128();
            } else {
                // Compact: copy data into in_progress
                let offset = bv.offset as usize;
                let length = bv.length as usize;
                // SAFETY: assumes `reused_idx` is in bounds (the view's
                // buffer_index was derived from the input array) and that
                // `offset..offset + length` lies within that buffer, as any
                // valid input view must — TODO confirm callers only pass
                // views/buffers taken from the same input array.
                let value = unsafe {
                    reused_buffers
                        .get_unchecked(reused_idx)
                        .as_slice()
                        .get_unchecked(offset..offset + length)
                };

                // Seal the current block if this value would overflow it.
                let require_cap = in_progress.len() + length;
                if require_cap > max_block_size {
                    let flushed =
                        replace(in_progress, Vec::with_capacity(max_block_size));
                    completed.push(Buffer::from_vec(flushed));
                }

                let new_buffer_index = completed.len() as u32;
                let new_offset = in_progress.len() as u32;
                in_progress.extend_from_slice(value);

                // Preserve length/prefix; redirect to our owned storage.
                *view = ByteView::from(*view)
                    .with_buffer_index(new_buffer_index)
                    .with_offset(new_offset)
                    .as_u128();
            }
        }
    }

// Don't inline to keep the code small and give LLVM the best chance of
Expand Down Expand Up @@ -157,18 +287,53 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
Nulls::Some
};

let input_views = arr.views();
let data_buffers = arr.data_buffers();

// Phase 1: Fast extend — temporarily reference input buffers so
// the extend loop only touches views (no per-row memcpy).
let base = self.completed.len() as u32;
let views_start = self.views.len();

match all_null_or_non_null {
Nulls::Some => {
for &row in rows {
self.append_val_inner(array, row);
}
let Self { views, nulls, .. } = self;
views.extend(rows.iter().map(|&row| {
if arr.is_null(row) {
nulls.append(true);
0u128
} else {
nulls.append(false);
let input_view = unsafe { *input_views.get_unchecked(row) };
let len = input_view as u32;
if len <= 12 {
input_view
} else {
ByteView::from(input_view)
.with_buffer_index(
ByteView::from(input_view).buffer_index + base,
)
.as_u128()
}
}
}));
}

Nulls::None => {
self.nulls.append_n(rows.len(), false);
for &row in rows {
self.do_append_val_inner(arr, row);
}
self.views.extend(rows.iter().map(|&row| {
let input_view = unsafe { *input_views.get_unchecked(row) };
let len = input_view as u32;
if len <= 12 {
input_view
} else {
ByteView::from(input_view)
.with_buffer_index(
ByteView::from(input_view).buffer_index + base,
)
.as_u128()
}
}));
}

Nulls::All => {
Expand All @@ -177,45 +342,19 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
self.views.resize(new_len, 0);
}
}
}

fn do_append_val_inner(&mut self, array: &GenericByteViewArray<B>, row: usize)
where
B: ByteViewType,
{
let value: &[u8] = array.value(row).as_ref();

let value_len = value.len();
let view = if value_len <= 12 {
make_view(value, 0, 0)
} else {
// Ensure big enough block to hold the value firstly
self.ensure_in_progress_big_enough(value_len);

// Append value
let buffer_index = self.completed.len();
let offset = self.in_progress.len();
self.in_progress.extend_from_slice(value);

make_view(value, buffer_index as u32, offset as u32)
};

// Append view
self.views.push(view);
}

fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
debug_assert!(value_len > 12);
let require_cap = self.in_progress.len() + value_len;

// If current block isn't big enough, flush it and create a new in progress block
if require_cap > self.max_block_size {
let flushed_block = replace(
// Phase 2: Compact — copy non-inline data from the input's
// buffers into our own in_progress/completed for perfect locality.
// Only touches the newly added views (views_start..).
if !data_buffers.is_empty() {
Self::compact_new_views(
&mut self.views[views_start..],
data_buffers,
base,
&mut self.in_progress,
Vec::with_capacity(self.max_block_size),
&mut self.completed,
self.max_block_size,
);
let buffer = Buffer::from_vec(flushed_block);
self.completed.push(buffer);
}
}

Expand Down
Loading