Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 88 additions & 2 deletions parquet-variant-compute/src/unshred_variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

//! Module for unshredding VariantArray by folding typed_value columns back into the value column.

use crate::arrow_to_variant::ListLikeArray;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the nice new interface, @liamzwbao !

use crate::{BorrowedShreddingState, VariantArray, VariantValueArrayBuilder};
use arrow::array::{
Array, AsArray as _, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, PrimitiveArray,
StringArray, StructArray,
Array, AsArray as _, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, FixedSizeListArray,
GenericListArray, GenericListViewArray, PrimitiveArray, StringArray, StructArray,
};
use arrow::buffer::NullBuffer;
use arrow::datatypes::{
Expand Down Expand Up @@ -99,6 +100,11 @@ enum UnshredVariantRowBuilder<'a> {
PrimitiveString(UnshredPrimitiveRowBuilder<'a, StringArray>),
PrimitiveBinaryView(UnshredPrimitiveRowBuilder<'a, BinaryViewArray>),
PrimitiveUuid(UnshredPrimitiveRowBuilder<'a, FixedSizeBinaryArray>),
List(ListUnshredVariantBuilder<'a, GenericListArray<i32>>),
LargeList(ListUnshredVariantBuilder<'a, GenericListArray<i64>>),
ListView(ListUnshredVariantBuilder<'a, GenericListViewArray<i32>>),
LargeListView(ListUnshredVariantBuilder<'a, GenericListViewArray<i64>>),
FixedSizeList(ListUnshredVariantBuilder<'a, FixedSizeListArray>),
Struct(StructUnshredVariantBuilder<'a>),
ValueOnly(ValueOnlyUnshredVariantBuilder<'a>),
Null(NullUnshredVariantBuilder<'a>),
Expand Down Expand Up @@ -132,6 +138,11 @@ impl<'a> UnshredVariantRowBuilder<'a> {
Self::PrimitiveString(b) => b.append_row(builder, metadata, index),
Self::PrimitiveBinaryView(b) => b.append_row(builder, metadata, index),
Self::PrimitiveUuid(b) => b.append_row(builder, metadata, index),
Self::List(b) => b.append_row(builder, metadata, index),
Self::LargeList(b) => b.append_row(builder, metadata, index),
Self::ListView(b) => b.append_row(builder, metadata, index),
Self::LargeListView(b) => b.append_row(builder, metadata, index),
Self::FixedSizeList(b) => b.append_row(builder, metadata, index),
Self::Struct(b) => b.append_row(builder, metadata, index),
Self::ValueOnly(b) => b.append_row(builder, metadata, index),
Self::Null(b) => b.append_row(builder, metadata, index),
Expand Down Expand Up @@ -208,6 +219,25 @@ impl<'a> UnshredVariantRowBuilder<'a> {
value,
typed_value.as_struct(),
)?),
DataType::List(_) => Self::List(ListUnshredVariantBuilder::try_new(
value,
typed_value.as_list(),
)?),
DataType::LargeList(_) => Self::LargeList(ListUnshredVariantBuilder::try_new(
value,
typed_value.as_list(),
)?),
DataType::ListView(_) => Self::ListView(ListUnshredVariantBuilder::try_new(
value,
typed_value.as_list_view(),
)?),
DataType::LargeListView(_) => Self::LargeListView(ListUnshredVariantBuilder::try_new(
value,
typed_value.as_list_view(),
)?),
DataType::FixedSizeList(_, _) => Self::FixedSizeList(
ListUnshredVariantBuilder::try_new(value, typed_value.as_fixed_size_list())?,
),
_ => {
return Err(ArrowError::NotYetImplemented(format!(
"Unshredding not yet supported for type: {}",
Expand Down Expand Up @@ -517,5 +547,61 @@ impl<'a> StructUnshredVariantBuilder<'a> {
}
}

/// Builder for unshredding list/array types with recursive element processing
struct ListUnshredVariantBuilder<'a, L: ListLikeArray> {
value: Option<&'a BinaryViewArray>,
typed_value: &'a L,
element_unshredder: Box<UnshredVariantRowBuilder<'a>>,
}

impl<'a, L: ListLikeArray> ListUnshredVariantBuilder<'a, L> {
fn try_new(value: Option<&'a BinaryViewArray>, typed_value: &'a L) -> Result<Self> {
// Create a recursive unshredder for the list elements
// The element type comes from the values array of the list
let element_values = typed_value.values();

// For shredded lists, each element would be a ShreddedVariantFieldArray (struct)
// Extract value/typed_value from the element struct
let Some(element_values) = element_values.as_struct_opt() else {
return Err(ArrowError::InvalidArgumentError(format!(
"Invalid shredded variant array element: expected Struct, got {}",
element_values.data_type()
)));
};

// Create recursive unshredder for elements
//
// NOTE: A None/None array element is technically invalid, but the shredding spec
// requires us to emit `Variant::Null` when a required value is missing.
let element_unshredder = UnshredVariantRowBuilder::try_new_opt(element_values.try_into()?)?
.unwrap_or_else(|| UnshredVariantRowBuilder::null(None));

Ok(Self {
value,
typed_value,
element_unshredder: Box::new(element_unshredder),
})
}

fn append_row(
&mut self,
builder: &mut impl VariantBuilderExt,
metadata: &VariantMetadata,
index: usize,
) -> Result<()> {
handle_unshredded_case!(self, builder, metadata, index, false);

// If we get here, typed_value is valid and value is NULL -- process the list elements
let mut list_builder = builder.try_new_list()?;
for element_index in self.typed_value.element_range(index) {
self.element_unshredder
.append_row(&mut list_builder, metadata, element_index)?;
}

list_builder.finish();
Ok(())
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder how we could test this reasonably 🤔 Maybe we can rework the tests in cast_to_variant to shred and then unshred an array and verify it survives round tripping 🤔


// TODO: This code is covered by tests in `parquet/tests/variant_integration.rs`. Does that suffice?
// Or do we also need targeted stand-alone unit tests for full coverage?
16 changes: 15 additions & 1 deletion parquet-variant/src/variant/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl VariantListHeader {
///
/// [valid]: VariantMetadata#Validation
/// [Variant spec]: https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#value-data-for-array-basic_type3
#[derive(Debug, Clone, PartialEq)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We forgot to fix this PartialEq when we fixed the one for VariantObject.

Manual implementation below.

#[derive(Debug, Clone)]
pub struct VariantList<'m, 'v> {
pub metadata: VariantMetadata<'m>,
pub value: &'v [u8],
Expand Down Expand Up @@ -302,6 +302,20 @@ impl<'m, 'v> VariantList<'m, 'v> {
}
}

// Custom implementation of PartialEq for variant arrays
//
// Instead of comparing the raw bytes of 2 variant lists, this implementation recursively
// checks whether their elements are equal.
impl<'m, 'v> PartialEq for VariantList<'m, 'v> {
fn eq(&self, other: &Self) -> bool {
if self.num_elements != other.num_elements {
return false;
}

self.iter().zip(other.iter()).all(|(a, b)| a == b)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
10 changes: 3 additions & 7 deletions parquet-variant/src/variant/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,13 +419,9 @@ impl<'m, 'v> PartialEq for VariantObject<'m, 'v> {
// IFF two objects are valid and logically equal, they will have the same
// field names in the same order, because the spec requires the object
// fields to be sorted lexicographically.
for ((name_a, value_a), (name_b, value_b)) in self.iter().zip(other.iter()) {
if name_a != name_b || value_a != value_b {
return false;
}
}

true
self.iter()
.zip(other.iter())
.all(|((name_a, value_a), (name_b, value_b))| name_a == name_b && value_a == value_b)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opportunistic simplification.

}
}

Expand Down
43 changes: 16 additions & 27 deletions parquet/tests/variant_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,16 @@ use std::{fs, path::PathBuf};

type Result<T> = std::result::Result<T, String>;

/// Creates a test function for a given case number
/// Creates a test function for a given case number.
///
/// If an error message is provided, generate an error test case that expects it.
///
/// Note the index is zero-based, while the case number is one-based
macro_rules! variant_test_case {
($case_num:literal) => {
paste::paste! {
#[test]
fn [<test_variant_integration_case_ $case_num>]() {
all_cases()[$case_num - 1].run()
}
}
};

// Generates an error test case, where the expected result is an error message
($case_num:literal, $expected_error:literal) => {
Comment on lines -41 to -51
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opportunistic simplification: Instead of duplicating the macro definition, just use $(...)? syntax to capture (and respond to) an optional error message.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL -- thanks @scovich

($case_num:literal $(, $expected_error:literal )? ) => {
paste::paste! {
#[test]
#[should_panic(expected = $expected_error)]
$( #[should_panic(expected = $expected_error)] )?
fn [<test_variant_integration_case_ $case_num>]() {
all_cases()[$case_num - 1].run()
}
Expand All @@ -65,8 +57,8 @@ macro_rules! variant_test_case {
// - cases 40, 42, 87, 127 and 128 are expected to fail always (they include invalid variants)
// - the remaining cases are expected to (eventually) pass

variant_test_case!(1, "Unshredding not yet supported for type: List(");
variant_test_case!(2, "Unshredding not yet supported for type: List(");
variant_test_case!(1);
variant_test_case!(2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I read this diff correctly that after this PR we handle all the test cases other than Decimal? If so, that is pretty rad 🤯

(btw I think the reference to https://github.com/apache/arrow-rs/issues/8329 above variant_test_case!(4); is old and can be removed )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I read this diff correctly that after this PR we handle all the test cases other than Decimal?

I believe so!

the reference to https://github.com/apache/arrow-rs/issues/8329 above variant_test_case!(4); is old and can be removed

Good catch! Fixed in #8481, since it has nothing to do with variant arrays.

// case 3 is empty in cases.json 🤷
// ```json
// {
Expand Down Expand Up @@ -130,16 +122,14 @@ variant_test_case!(37);
variant_test_case!(38);
variant_test_case!(39);
// Is an error case (should be failing as the expected error message indicates)
// TODO: Once we support lists: "both value and typed_value are non-null"
variant_test_case!(40, "Unshredding not yet supported for type: List(");
variant_test_case!(41, "Unshredding not yet supported for type: List(");
variant_test_case!(40, "both value and typed_value are non-null");
variant_test_case!(41);
// Is an error case (should be failing as the expected error message indicates)
variant_test_case!(42, "both value and typed_value are non-null");
// Is an error case (should be failing as the expected error message indicates)
variant_test_case!(43, "Field 'b' appears in both typed_value and value");
variant_test_case!(44);
// https://github.com/apache/arrow-rs/issues/8337
variant_test_case!(45, "Unshredding not yet supported for type: List(");
variant_test_case!(45);
variant_test_case!(46);
variant_test_case!(47);
variant_test_case!(48);
Expand Down Expand Up @@ -180,12 +170,11 @@ variant_test_case!(82);
variant_test_case!(83);
// Invalid case, implementations can choose to read the shredded value or error out
variant_test_case!(84);
// https://github.com/apache/arrow-rs/issues/8337
variant_test_case!(85, "Unshredding not yet supported for type: List(");
variant_test_case!(86, "Unshredding not yet supported for type: List(");
variant_test_case!(85);
variant_test_case!(86);
// Is an error case (should be failing as the expected error message indicates)
variant_test_case!(87, "Expected object in value field");
variant_test_case!(88, "Unshredding not yet supported for type: List(");
variant_test_case!(88);
variant_test_case!(89);
variant_test_case!(90);
variant_test_case!(91);
Expand Down Expand Up @@ -224,7 +213,7 @@ variant_test_case!(123);
variant_test_case!(124);
// Is an error case (should be failing as the expected error message indicates)
variant_test_case!(125, "Field 'b' appears in both typed_value and value");
variant_test_case!(126, "Unshredding not yet supported for type: List(");
variant_test_case!(126);
// Is an error case (should be failing as the expected error message indicates)
variant_test_case!(127, "Illegal shredded value type: UInt32");
// Is an error case (should be failing as the expected error message indicates)
Expand All @@ -235,8 +224,8 @@ variant_test_case!(131);
variant_test_case!(132);
variant_test_case!(133);
variant_test_case!(134);
variant_test_case!(135, "Unshredding not yet supported for type: List(");
variant_test_case!(136, "Unshredding not yet supported for type: List(");
variant_test_case!(135);
variant_test_case!(136);
// Is an error case (should be failing as the expected error message indicates)
variant_test_case!(137, "Illegal shredded value type: FixedSizeBinary(4)");
variant_test_case!(138);
Expand Down
Loading