Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::aggregates::group_values::multi_group_by::{
GroupColumn, Nulls, nulls_equal_to,
};
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
use arrow::array::{Array, ArrayRef, AsArray, ByteView, GenericByteViewArray, make_view};
use arrow::array::{Array, ArrayRef, AsArray, ByteView, GenericByteViewArray};
use arrow::buffer::{Buffer, ScalarBuffer};
use arrow::datatypes::ByteViewType;
use datafusion_common::Result;
Expand All @@ -28,7 +28,8 @@ use std::marker::PhantomData;
use std::mem::{replace, size_of};
use std::sync::Arc;

const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024;
const BYTE_VIEW_INITIAL_BLOCK_SIZE: usize = 2 * 1024 * 1024;
const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 32 * 1024 * 1024;

/// An implementation of [`GroupColumn`] for binary view and utf8 view types.
///
Expand Down Expand Up @@ -85,7 +86,7 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
views: Vec::new(),
in_progress: Vec::new(),
completed: Vec::new(),
max_block_size: BYTE_VIEW_MAX_BLOCK_SIZE,
max_block_size: BYTE_VIEW_INITIAL_BLOCK_SIZE,
nulls: MaybeNullBufferBuilder::new(),
_phantom: PhantomData {},
}
Expand Down Expand Up @@ -115,7 +116,67 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {

// Not null row case
self.nulls.append(false);
self.do_append_val_inner(arr, row);
let input_view = unsafe { *arr.views().get_unchecked(row) };
let view = Self::copy_view(
input_view,
arr,
row,
&mut self.in_progress,
&mut self.completed,
&mut self.max_block_size,
);
self.views.push(view);
}

/// Copy a non-null value from `arr[row]` into our buffers, reusing the
/// input view's length and prefix.
///
/// For inline values (len <= 12 bytes) the whole value lives inside the
/// view itself, so the input view is returned unchanged. For longer
/// values the bytes are appended to `in_progress` (growing or flushing
/// it as needed) and the view is rewritten with our `buffer_index` and
/// `offset`.
///
/// Returns the (possibly rewritten) 128-bit view to push into `views`.
fn copy_view(
    input_view: u128,
    arr: &GenericByteViewArray<B>,
    row: usize,
    in_progress: &mut Vec<u8>,
    completed: &mut Vec<Buffer>,
    max_block_size: &mut usize,
) -> u128 {
    debug_assert!(row < arr.len(), "row index out of bounds for input array");
    // The low 32 bits of a byte view encode the value length.
    let len = input_view as u32;
    if len <= 12 {
        // Inline view: carries no buffer reference, safe to reuse verbatim.
        return input_view;
    }
    // SAFETY: `row` is a valid index into `arr` — callers only pass row
    // indices obtained from iterating the same array (also checked by the
    // `debug_assert!` above).
    let value: &[u8] = unsafe { arr.value_unchecked(row).as_ref() };
    let require_cap = in_progress.len() + value.len();
    if require_cap > *max_block_size {
        // Try to grow `in_progress` in place (doubling) if possible,
        // avoiding a flush. Only grow when at or above the initial
        // default size to avoid changing behavior for tests with
        // tiny block sizes.
        let new_block_size = if *max_block_size >= BYTE_VIEW_INITIAL_BLOCK_SIZE {
            (*max_block_size * 2)
                .max(require_cap)
                .min(BYTE_VIEW_MAX_BLOCK_SIZE)
        } else {
            *max_block_size
        };
        if require_cap <= new_block_size {
            // Can grow in place — just reserve the missing capacity.
            in_progress.reserve(new_block_size - in_progress.len());
        } else {
            // Must flush the current block and start a new one. Size the
            // new block to hold at least this value so the
            // `extend_from_slice` below never reallocates immediately
            // when a single value exceeds `new_block_size`.
            let flushed_block = replace(
                in_progress,
                Vec::with_capacity(new_block_size.max(value.len())),
            );
            completed.push(Buffer::from_vec(flushed_block));
        }
        *max_block_size = new_block_size;
    }
    // Record where the bytes land, then rewrite the view's location while
    // keeping the input view's length and 4-byte prefix intact.
    let buffer_index = completed.len() as u32;
    let offset = in_progress.len() as u32;
    in_progress.extend_from_slice(value);
    ByteView::from(input_view)
        .with_buffer_index(buffer_index)
        .with_offset(offset)
        .as_u128()
}

// Don't inline to keep the code small and give LLVM the best chance of
Expand Down Expand Up @@ -157,68 +218,57 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
Nulls::Some
};

let input_views = arr.views();

match all_null_or_non_null {
Nulls::Some => {
for &row in rows {
self.append_val_inner(array, row);
}
let Self {
views,
in_progress,
completed,
nulls,
max_block_size,
..
} = self;
views.extend(rows.iter().map(|&row| {
if arr.is_null(row) {
nulls.append(true);
0u128
} else {
nulls.append(false);
let v = unsafe { *input_views.get_unchecked(row) };
Self::copy_view(
v,
arr,
row,
in_progress,
completed,
max_block_size,
)
}
}));
}

Nulls::None => {
self.nulls.append_n(rows.len(), false);
for &row in rows {
self.do_append_val_inner(arr, row);
}
let Self {
views,
in_progress,
completed,
max_block_size,
..
} = self;
views.extend(rows.iter().map(|&row| {
let v = unsafe { *input_views.get_unchecked(row) };
Self::copy_view(v, arr, row, in_progress, completed, max_block_size)
}));
}

Nulls::All => {
self.nulls.append_n(rows.len(), true);
let new_len = self.views.len() + rows.len();
self.views.resize(new_len, 0);
self.views.resize(self.views.len() + rows.len(), 0);
}
}
}

fn do_append_val_inner(&mut self, array: &GenericByteViewArray<B>, row: usize)
where
B: ByteViewType,
{
let value: &[u8] = array.value(row).as_ref();

let value_len = value.len();
let view = if value_len <= 12 {
make_view(value, 0, 0)
} else {
// Ensure big enough block to hold the value firstly
self.ensure_in_progress_big_enough(value_len);

// Append value
let buffer_index = self.completed.len();
let offset = self.in_progress.len();
self.in_progress.extend_from_slice(value);

make_view(value, buffer_index as u32, offset as u32)
};

// Append view
self.views.push(view);
}

/// Ensure `in_progress` has room for `value_len` more bytes without
/// exceeding `max_block_size`, flushing the current block to
/// `completed` when it does not.
///
/// Only called for non-inline values (`value_len > 12`).
fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
    debug_assert!(value_len > 12);
    let require_cap = self.in_progress.len() + value_len;

    // If current block isn't big enough, flush it and create a new in progress block
    if require_cap > self.max_block_size {
        // Size the new block to hold at least this one value so the
        // caller's `extend_from_slice` never reallocates immediately
        // when a single value exceeds `max_block_size`.
        let flushed_block = replace(
            &mut self.in_progress,
            Vec::with_capacity(self.max_block_size.max(value_len)),
        );
        let buffer = Buffer::from_vec(flushed_block);
        self.completed.push(buffer);
    }
}

/// Compare the value at `lhs_row` in this builder with
/// the value at `rhs_row` in input `array`
///
Expand Down
Loading