Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ use std::sync::Arc;

const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024;

#[derive(Clone, Copy)]
struct PendingByteViewCopy {
dest_index: usize,
source: ByteView,
}

/// An implementation of [`GroupColumn`] for binary view and utf8 view types.
///
/// Stores a collection of binary view or utf8 view group values in a buffer
Expand Down Expand Up @@ -159,16 +165,12 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {

match all_null_or_non_null {
Nulls::Some => {
for &row in rows {
self.append_val_inner(array, row);
}
self.vectorized_append_views_with_nulls(arr, rows);
}

Nulls::None => {
self.nulls.append_n(rows.len(), false);
for &row in rows {
self.do_append_val_inner(arr, row);
}
self.vectorized_append_non_null_views(arr, rows);
}

Nulls::All => {
Expand All @@ -179,6 +181,133 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
}
}

fn vectorized_append_non_null_views(
&mut self,
array: &GenericByteViewArray<B>,
rows: &[usize],
) {
let source_views = array.views();
self.views.reserve(rows.len());

if array.data_buffers().is_empty() {
self.views.extend(rows.iter().map(|&row| source_views[row]));
return;
}

let start_idx = self.views.len();
let mut pending = Vec::with_capacity(rows.len().saturating_add(1) / 2);
for (idx, &row) in rows.iter().enumerate() {
let view = source_views[row];
self.views.push(view);
if (view as u32) > 12 {
pending.push(PendingByteViewCopy {
dest_index: start_idx + idx,
source: ByteView::from(view),
});
}
}

self.batch_copy_long_views(array.data_buffers(), &pending);
}

fn vectorized_append_views_with_nulls(
&mut self,
array: &GenericByteViewArray<B>,
rows: &[usize],
) {
let source_views = array.views();
let mut pending = Vec::with_capacity(rows.len().saturating_add(1) / 2);
self.views.reserve(rows.len());

for &row in rows {
if array.is_null(row) {
self.nulls.append(true);
self.views.push(0);
continue;
}

self.nulls.append(false);

let view = source_views[row];
let dest_index = self.views.len();
self.views.push(view);

if (view as u32) > 12 {
pending.push(PendingByteViewCopy {
dest_index,
source: ByteView::from(view),
});
}
}

self.batch_copy_long_views(array.data_buffers(), &pending);
}

fn batch_copy_long_views(
&mut self,
source_buffers: &[Buffer],
pending: &[PendingByteViewCopy],
) {
let mut batch_start = 0;
while batch_start < pending.len() {
let first = pending[batch_start].source;
let first_len = first.length as usize;

if self.in_progress.len() + first_len > self.max_block_size
&& !self.in_progress.is_empty()
{
self.flush_in_progress();
}

let max_batch_len = if self.in_progress.is_empty() {
self.max_block_size.max(first_len)
} else {
self.max_block_size - self.in_progress.len()
};

let source_buffer_index = first.buffer_index as usize;
let batch_source_start = first.offset as usize;
let mut batch_source_end = batch_source_start + first_len;
let mut batch_end = batch_start + 1;

while batch_end < pending.len() {
let next = pending[batch_end].source;
let next_start = next.offset as usize;
let next_end = next_start + next.length as usize;

if next.buffer_index as usize != source_buffer_index
|| next_start != batch_source_end
|| next_end - batch_source_start > max_batch_len
{
break;
}

batch_source_end = next_end;
batch_end += 1;
}

let buffer_index = self.completed.len();
let dest_batch_start = self.in_progress.len();
let batch_len = batch_source_end - batch_source_start;
self.in_progress.reserve(batch_len);

let source_buffer = &source_buffers[source_buffer_index];
self.in_progress.extend_from_slice(
&source_buffer.as_slice()[batch_source_start..batch_source_end],
);

for pending_copy in &pending[batch_start..batch_end] {
let mut view = pending_copy.source;
view.buffer_index = buffer_index as u32;
view.offset = (dest_batch_start + pending_copy.source.offset as usize
- batch_source_start) as u32;
self.views[pending_copy.dest_index] = view.as_u128();
}

batch_start = batch_end;
}
}

fn do_append_val_inner(&mut self, array: &GenericByteViewArray<B>, row: usize)
where
B: ByteViewType,
Expand Down Expand Up @@ -584,7 +713,7 @@ mod tests {
use std::sync::Arc;

use crate::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder;
use arrow::array::{ArrayRef, AsArray, NullBufferBuilder, StringViewArray};
use arrow::array::{ArrayRef, AsArray, ByteView, NullBufferBuilder, StringViewArray};
use arrow::datatypes::StringViewType;

use super::GroupColumn;
Expand Down Expand Up @@ -724,6 +853,169 @@ mod tests {
assert!(equal_to_results[4]);
}

#[test]
fn test_byte_view_vectorized_append_subset_and_repeated_rows() {
let mut builder =
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(256);

let input_array = Arc::new(StringViewArray::from(vec![
Some("this string is definitely long 0"),
Some("tiny"),
Some("this string is definitely long 1"),
None,
Some("this string is definitely long 2"),
Some("bar"),
Some("this string is definitely long 3"),
])) as ArrayRef;

let rows = [0, 1, 4, 3, 2, 4, 6];
builder.vectorized_append(&input_array, &rows).unwrap();

let output = Box::new(builder).build();
let expected = Arc::new(StringViewArray::from(vec![
Some("this string is definitely long 0"),
Some("tiny"),
Some("this string is definitely long 2"),
None,
Some("this string is definitely long 1"),
Some("this string is definitely long 2"),
Some("this string is definitely long 3"),
])) as ArrayRef;

assert_eq!(&output, &expected);
}

#[test]
fn test_byte_view_take_n_after_vectorized_append() {
let long0 = "a".repeat(24);
let long1 = "b".repeat(24);
let long2 = "c".repeat(24);
let long3 = "d".repeat(24);

let mut builder =
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(50);
let input_array = Arc::new(StringViewArray::from(vec![
None,
Some("foo"),
Some(long0.as_str()),
Some(long1.as_str()),
None,
Some(long2.as_str()),
Some(long3.as_str()),
Some("bar"),
])) as ArrayRef;
let rows = (0..input_array.len()).collect::<Vec<_>>();

builder.vectorized_append(&input_array, &rows).unwrap();

assert_eq!(builder.completed.len(), 1);
assert_eq!(builder.in_progress.len(), long2.len() + long3.len());

let taken_array = builder.take_n(4);
assert_eq!(&taken_array, &input_array.slice(0, 4));

let taken_array = builder.take_n(4);
assert_eq!(&taken_array, &input_array.slice(4, 4));

assert!(builder.completed.is_empty());
assert!(builder.in_progress.is_empty());
assert!(builder.views.is_empty());
}

#[test]
fn test_byte_view_vectorized_append_multiple_long_batches() {
let long0 = "a".repeat(24);
let long1 = "b".repeat(24);
let long2 = "c".repeat(24);
let long3 = "d".repeat(24);

let mut builder =
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(50);

let first_batch = Arc::new(StringViewArray::from(vec![
Some(long0.as_str()),
Some(long1.as_str()),
])) as ArrayRef;
builder.vectorized_append(&first_batch, &[0, 1]).unwrap();

assert!(builder.completed.is_empty());
assert_eq!(ByteView::from(builder.views[0]).buffer_index, 0);
assert_eq!(ByteView::from(builder.views[1]).buffer_index, 0);

let second_batch = Arc::new(StringViewArray::from(vec![
Some(long2.as_str()),
Some(long3.as_str()),
])) as ArrayRef;
builder.vectorized_append(&second_batch, &[0, 1]).unwrap();

assert_eq!(builder.completed.len(), 1);
assert_eq!(ByteView::from(builder.views[0]).buffer_index, 0);
assert_eq!(ByteView::from(builder.views[1]).buffer_index, 0);
assert_eq!(ByteView::from(builder.views[2]).buffer_index, 1);
assert_eq!(ByteView::from(builder.views[3]).buffer_index, 1);

let output = Box::new(builder).build();
let expected = Arc::new(StringViewArray::from(vec![
Some(long0.as_str()),
Some(long1.as_str()),
Some(long2.as_str()),
Some(long3.as_str()),
])) as ArrayRef;
assert_eq!(&output, &expected);
}

#[test]
fn test_byte_view_vectorized_append_flushes_mid_batch() {
let long0 = "a".repeat(24);
let long1 = "b".repeat(24);
let long2 = "c".repeat(24);
let long3 = "d".repeat(24);

let mut builder =
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(50);
let input_array = Arc::new(StringViewArray::from(vec![
Some(long0.as_str()),
Some(long1.as_str()),
Some(long2.as_str()),
Some(long3.as_str()),
])) as ArrayRef;

builder
.vectorized_append(&input_array, &[0, 1, 2, 3])
.unwrap();

assert_eq!(builder.completed.len(), 1);
assert_eq!(builder.completed[0].len(), long0.len() + long1.len());
assert_eq!(builder.in_progress.len(), long2.len() + long3.len());
assert_eq!(ByteView::from(builder.views[0]).buffer_index, 0);
assert_eq!(ByteView::from(builder.views[1]).buffer_index, 0);
assert_eq!(ByteView::from(builder.views[2]).buffer_index, 1);
assert_eq!(ByteView::from(builder.views[3]).buffer_index, 1);

let output = Box::new(builder).build();
assert_eq!(&output, &input_array);
}

#[test]
fn test_byte_view_vectorized_append_single_oversized_value() {
let oversized = "z".repeat(32);

let mut builder =
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(16);
let input_array =
Arc::new(StringViewArray::from(vec![Some(oversized.as_str())])) as ArrayRef;

builder.vectorized_append(&input_array, &[0]).unwrap();

assert!(builder.completed.is_empty());
assert_eq!(builder.in_progress.len(), oversized.len());
assert_eq!(ByteView::from(builder.views[0]).buffer_index, 0);
assert_eq!(ByteView::from(builder.views[0]).offset, 0);

let output = Box::new(builder).build();
assert_eq!(&output, &input_array);
}

fn test_byte_view_equal_to_internal<A, E>(mut append: A, mut equal_to: E)
where
A: FnMut(&mut ByteViewGroupValueBuilder<StringViewType>, &ArrayRef, &[usize]),
Expand Down
Loading