diff --git a/parquet-variant-compute/Cargo.toml b/parquet-variant-compute/Cargo.toml index 819a131f9c42..feb8172a9407 100644 --- a/parquet-variant-compute/Cargo.toml +++ b/parquet-variant-compute/Cargo.toml @@ -34,6 +34,7 @@ rust-version = { workspace = true } arrow = { workspace = true } arrow-schema = { workspace = true } half = { version = "2.1", default-features = false } +indexmap = "2.10.0" parquet-variant = { workspace = true } parquet-variant-json = { workspace = true } chrono = { workspace = true } diff --git a/parquet-variant-compute/src/lib.rs b/parquet-variant-compute/src/lib.rs index 70fcbdb66f95..b0d4c5ac3d3f 100644 --- a/parquet-variant-compute/src/lib.rs +++ b/parquet-variant-compute/src/lib.rs @@ -38,6 +38,7 @@ mod arrow_to_variant; pub mod cast_to_variant; mod from_json; +mod shred_variant; mod to_json; mod type_conversion; mod variant_array; @@ -50,5 +51,6 @@ pub use variant_array_builder::{VariantArrayBuilder, VariantValueArrayBuilder}; pub use cast_to_variant::{cast_to_variant, cast_to_variant_with_options}; pub use from_json::json_to_variant; +pub use shred_variant::shred_variant; pub use to_json::variant_to_json; pub use type_conversion::CastOptions; diff --git a/parquet-variant-compute/src/shred_variant.rs b/parquet-variant-compute/src/shred_variant.rs new file mode 100644 index 000000000000..9b517c034646 --- /dev/null +++ b/parquet-variant-compute/src/shred_variant.rs @@ -0,0 +1,916 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Module for shredding VariantArray with a given schema. + +use crate::variant_array::{ShreddedVariantFieldArray, StructArrayBuilder}; +use crate::variant_to_arrow::{ + make_primitive_variant_to_arrow_row_builder, PrimitiveVariantToArrowRowBuilder, +}; +use crate::{VariantArray, VariantValueArrayBuilder}; +use arrow::array::{Array as _, ArrayRef, BinaryViewArray, NullBufferBuilder}; +use arrow::buffer::NullBuffer; +use arrow::compute::CastOptions; +use arrow::datatypes::{DataType, Fields}; +use arrow::error::{ArrowError, Result}; +use parquet_variant::{ObjectBuilder, ReadOnlyMetadataBuilder, Variant}; + +use indexmap::IndexMap; +use std::sync::Arc; + +/// Shreds the input binary variant using a target shredding schema derived from the requested data type. +/// +/// For example, requesting `DataType::Int64` would produce an output variant array with the schema: +/// +/// ```text +/// { +/// metadata: BINARY, +/// value: BINARY, +/// typed_value: LONG, +/// } +/// ``` +/// +/// Similarly, requesting `DataType::Struct` with two integer fields `a` and `b` would produce an +/// output variant array with the schema: +/// +/// ```text +/// { +/// metadata: BINARY, +/// value: BINARY, +/// typed_value: { +/// a: { +/// value: BINARY, +/// typed_value: INT, +/// }, +/// b: { +/// value: BINARY, +/// typed_value: INT, +/// }, +/// } +/// } +/// ``` +pub fn shred_variant(array: &VariantArray, as_type: &DataType) -> Result { + if array.typed_value_field().is_some() { + return Err(ArrowError::InvalidArgumentError( + "Input is already shredded".to_string(), + )); + } + + if array.value_field().is_none() { + // all-null case -- nothing to do. + return Ok(array.clone()); + }; + + let cast_options = CastOptions::default(); + let mut builder = + make_variant_to_shredded_variant_arrow_row_builder(as_type, &cast_options, array.len())?; + for i in 0..array.len() { + if array.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array.value(i))?; + } + } + let (value, typed_value, nulls) = builder.finish()?; + Ok(VariantArray::from_parts( + array.metadata_field().clone(), + Some(value), + Some(typed_value), + nulls, + )) +} + +pub(crate) fn make_variant_to_shredded_variant_arrow_row_builder<'a>( + data_type: &'a DataType, + cast_options: &'a CastOptions, + capacity: usize, +) -> Result> { + let builder = match data_type { + DataType::Struct(fields) => { + let typed_value_builder = + VariantToShreddedObjectVariantRowBuilder::try_new(fields, cast_options, capacity)?; + VariantToShreddedVariantRowBuilder::Object(typed_value_builder) + } + DataType::List(_) + | DataType::LargeList(_) + | DataType::ListView(_) + | DataType::LargeListView(_) + | DataType::FixedSizeList(..) => { + return Err(ArrowError::NotYetImplemented( + "Shredding variant array values as arrow lists".to_string(), + )); + } + _ => { + let builder = + make_primitive_variant_to_arrow_row_builder(data_type, cast_options, capacity)?; + let typed_value_builder = + VariantToShreddedPrimitiveVariantRowBuilder::new(builder, capacity); + VariantToShreddedVariantRowBuilder::Primitive(typed_value_builder) + } + }; + Ok(builder) +} + +pub(crate) enum VariantToShreddedVariantRowBuilder<'a> { + Primitive(VariantToShreddedPrimitiveVariantRowBuilder<'a>), + Object(VariantToShreddedObjectVariantRowBuilder<'a>), +} +impl<'a> VariantToShreddedVariantRowBuilder<'a> { + pub fn append_null(&mut self) -> Result<()> { + use VariantToShreddedVariantRowBuilder::*; + match self { + Primitive(b) => b.append_null(), + Object(b) => b.append_null(), + } + } + + pub fn append_value(&mut self, value: Variant<'_, '_>) -> Result { + use VariantToShreddedVariantRowBuilder::*; + match self { + Primitive(b) => b.append_value(value), + Object(b) => b.append_value(value), + } + } + + pub fn finish(self) -> Result<(BinaryViewArray, ArrayRef, Option)> { + use VariantToShreddedVariantRowBuilder::*; + match self { + Primitive(b) => b.finish(), + Object(b) => b.finish(), + } + } +} + +/// A top-level variant shredder -- appending NULL produces typed_value=NULL and value=Variant::Null +pub(crate) struct VariantToShreddedPrimitiveVariantRowBuilder<'a> { + value_builder: VariantValueArrayBuilder, + typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>, + nulls: NullBufferBuilder, +} + +impl<'a> VariantToShreddedPrimitiveVariantRowBuilder<'a> { + pub(crate) fn new( + typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>, + capacity: usize, + ) -> Self { + Self { + value_builder: VariantValueArrayBuilder::new(capacity), + typed_value_builder, + nulls: NullBufferBuilder::new(capacity), + } + } + fn append_null(&mut self) -> Result<()> { + self.nulls.append_null(); + self.value_builder.append_null(); + self.typed_value_builder.append_null() + } + fn append_value(&mut self, value: Variant<'_, '_>) -> Result { + self.nulls.append_non_null(); + if self.typed_value_builder.append_value(&value)? { + self.value_builder.append_null(); + } else { + self.value_builder.append_value(value); + } + Ok(true) + } + fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option)> { + Ok(( + self.value_builder.build()?, + self.typed_value_builder.finish()?, + self.nulls.finish(), + )) + } +} + +pub(crate) struct VariantToShreddedObjectVariantRowBuilder<'a> { + value_builder: VariantValueArrayBuilder, + typed_value_builders: IndexMap<&'a str, VariantToShreddedVariantRowBuilder<'a>>, + typed_value_nulls: NullBufferBuilder, + nulls: NullBufferBuilder, +} + +impl<'a> VariantToShreddedObjectVariantRowBuilder<'a> { + fn try_new(fields: &'a Fields, cast_options: &'a CastOptions, capacity: usize) -> Result { + let typed_value_builders = fields.iter().map(|field| { + let builder = make_variant_to_shredded_variant_arrow_row_builder( + field.data_type(), + cast_options, + capacity, + )?; + Ok((field.name().as_str(), builder)) + }); + Ok(Self { + value_builder: VariantValueArrayBuilder::new(capacity), + typed_value_builders: typed_value_builders.collect::>()?, + typed_value_nulls: NullBufferBuilder::new(capacity), + nulls: NullBufferBuilder::new(capacity), + }) + } + + fn append_null(&mut self) -> Result<()> { + self.nulls.append_null(); + self.value_builder.append_null(); + self.typed_value_nulls.append_null(); + for (_, typed_value_builder) in &mut self.typed_value_builders { + typed_value_builder.append_null()?; + } + Ok(()) + } + fn append_value(&mut self, value: Variant<'_, '_>) -> Result { + let Variant::Object(ref obj) = value else { + // Not an object => fall back + self.nulls.append_non_null(); + self.value_builder.append_value(value); + self.typed_value_nulls.append_null(); + for (_, typed_value_builder) in &mut self.typed_value_builders { + typed_value_builder.append_null()?; + } + return Ok(false); + }; + + // Route the object's fields by name as either shredded or unshredded + let mut metadata_builder = ReadOnlyMetadataBuilder::new(value.metadata().clone()); + let state = self.value_builder.parent_state(&mut metadata_builder); + let mut object_builder = ObjectBuilder::new(state, false); + let mut seen = std::collections::HashSet::new(); + let mut partially_shredded = false; + for (field_name, value) in obj.iter() { + match self.typed_value_builders.get_mut(field_name) { + Some(typed_value_builder) => { + typed_value_builder.append_value(value)?; + seen.insert(field_name); + } + None => { + object_builder.insert_bytes(field_name, value); + partially_shredded = true; + } + } + } + + // Handle missing fields + for (field_name, typed_value_builder) in &mut self.typed_value_builders { + if !seen.contains(field_name) { + typed_value_builder.append_null()?; + } + } + + // Only emit the value if it captured any unshredded object fields + if partially_shredded { + object_builder.finish(); + } else { + drop(object_builder); + self.value_builder.append_null(); + } + + self.typed_value_nulls.append_non_null(); + self.nulls.append_non_null(); + Ok(true) + } + fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option)> { + let mut builder = StructArrayBuilder::new(); + for (field_name, typed_value_builder) in self.typed_value_builders { + let (value, typed_value, nulls) = typed_value_builder.finish()?; + let array = + ShreddedVariantFieldArray::from_parts(Some(value), Some(typed_value), nulls); + builder = builder.with_field(field_name, Arc::new(array), false); + } + if let Some(nulls) = self.typed_value_nulls.finish() { + builder = builder.with_nulls(nulls); + } + Ok(( + self.value_builder.build()?, + Arc::new(builder.build()), + self.nulls.finish(), + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::VariantArrayBuilder; + use arrow::array::{Float64Array, Int64Array}; + use arrow::datatypes::{DataType, Field, Fields}; + use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt as _}; + use std::sync::Arc; + + fn create_test_variant_array(values: Vec>>) -> VariantArray { + let mut builder = VariantArrayBuilder::new(values.len()); + for value in values { + match value { + Some(v) => builder.append_variant(v), + None => builder.append_null(), + } + } + builder.build() + } + + #[test] + fn test_already_shredded_input_error() { + // Create a VariantArray that already has typed_value_field + // First create a valid VariantArray, then extract its parts to construct a shredded one + let temp_array = create_test_variant_array(vec![Some(Variant::from("test"))]); + let metadata = temp_array.metadata_field().clone(); + let value = temp_array.value_field().unwrap().clone(); + let typed_value = Arc::new(Int64Array::from(vec![42])) as ArrayRef; + + let shredded_array = + VariantArray::from_parts(metadata, Some(value), Some(typed_value), None); + + let result = shred_variant(&shredded_array, &DataType::Int64); + assert!(matches!( + result.unwrap_err(), + ArrowError::InvalidArgumentError(_) + )); + } + + #[test] + fn test_all_null_input() { + // Create VariantArray with no value field (all null case) + let metadata = BinaryViewArray::from_iter_values([&[1u8, 0u8]]); // minimal valid metadata + let all_null_array = VariantArray::from_parts(metadata, None, None, None); + let result = shred_variant(&all_null_array, &DataType::Int64).unwrap(); + + // Should return array with no value/typed_value fields + assert!(result.value_field().is_none()); + assert!(result.typed_value_field().is_none()); + } + + #[test] + fn test_unsupported_list_schema() { + let input = create_test_variant_array(vec![Some(Variant::from(42))]); + let list_schema = DataType::List(Arc::new(Field::new("item", DataType::Int64, true))); + shred_variant(&input, &list_schema).expect_err("unsupported"); + } + + #[test] + fn test_primitive_shredding_comprehensive() { + // Test mixed scenarios in a single array + let input = create_test_variant_array(vec![ + Some(Variant::from(42i64)), // successful shred + Some(Variant::from("hello")), // failed shred (string) + Some(Variant::from(100i64)), // successful shred + None, // array-level null + Some(Variant::Null), // variant null + Some(Variant::from(3i8)), // successful shred (int8->int64 conversion) + ]); + + let result = shred_variant(&input, &DataType::Int64).unwrap(); + + // Verify structure + let metadata_field = result.metadata_field(); + let value_field = result.value_field().unwrap(); + let typed_value_field = result + .typed_value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + // Check specific outcomes for each row + assert_eq!(result.len(), 6); + + // Row 0: 42 -> should shred successfully + assert!(!result.is_null(0)); + assert!(value_field.is_null(0)); // value should be null when shredded + assert!(!typed_value_field.is_null(0)); + assert_eq!(typed_value_field.value(0), 42); + + // Row 1: "hello" -> should fail to shred + assert!(!result.is_null(1)); + assert!(!value_field.is_null(1)); // value should contain original + assert!(typed_value_field.is_null(1)); // typed_value should be null + assert_eq!( + Variant::new(metadata_field.value(1), value_field.value(1)), + Variant::from("hello") + ); + + // Row 2: 100 -> should shred successfully + assert!(!result.is_null(2)); + assert!(value_field.is_null(2)); + assert_eq!(typed_value_field.value(2), 100); + + // Row 3: array null -> should be null in result + assert!(result.is_null(3)); + + // Row 4: Variant::Null -> should not shred (it's a null variant, not an integer) + assert!(!result.is_null(4)); + assert!(!value_field.is_null(4)); // should contain Variant::Null + assert_eq!( + Variant::new(metadata_field.value(4), value_field.value(4)), + Variant::Null + ); + assert!(typed_value_field.is_null(4)); + + // Row 5: 3i8 -> should shred successfully (int8->int64 conversion) + assert!(!result.is_null(5)); + assert!(value_field.is_null(5)); // value should be null when shredded + assert!(!typed_value_field.is_null(5)); + assert_eq!(typed_value_field.value(5), 3); + } + + #[test] + fn test_primitive_different_target_types() { + let input = create_test_variant_array(vec![ + Some(Variant::from(42i32)), + Some(Variant::from(3.15f64)), + Some(Variant::from("not_a_number")), + ]); + + // Test Int32 target + let result_int32 = shred_variant(&input, &DataType::Int32).unwrap(); + let typed_value_int32 = result_int32 + .typed_value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(typed_value_int32.value(0), 42); + assert!(typed_value_int32.is_null(1)); // float doesn't convert to int32 + assert!(typed_value_int32.is_null(2)); // string doesn't convert to int32 + + // Test Float64 target + let result_float64 = shred_variant(&input, &DataType::Float64).unwrap(); + let typed_value_float64 = result_float64 + .typed_value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(typed_value_float64.value(0), 42.0); // int converts to float + assert_eq!(typed_value_float64.value(1), 3.15); + assert!(typed_value_float64.is_null(2)); // string doesn't convert + } + + #[test] + fn test_object_shredding_comprehensive() { + let mut builder = VariantArrayBuilder::new(7); + + // Row 0: Fully shredded object + builder + .new_object() + .with_field("score", 95.5f64) + .with_field("age", 30i64) + .finish(); + + // Row 1: Partially shredded object (extra email field) + builder + .new_object() + .with_field("score", 87.2f64) + .with_field("age", 25i64) + .with_field("email", "bob@example.com") + .finish(); + + // Row 2: Missing field (no score) + builder.new_object().with_field("age", 35i64).finish(); + + // Row 3: Type mismatch (score is string, age is string) + builder + .new_object() + .with_field("score", "ninety-five") + .with_field("age", "thirty") + .finish(); + + // Row 4: Non-object + builder.append_variant(Variant::from("not an object")); + + // Row 5: Empty object + builder.new_object().finish(); + + // Row 6: Null + builder.append_null(); + + // Row 7: Object with only "wrong" fields + builder.new_object().with_field("foo", 10).finish(); + + // Row 8: Object with one "right" and one "wrong" field + builder + .new_object() + .with_field("score", 66.67f64) + .with_field("foo", 10) + .finish(); + + let input = builder.build(); + + // Create target schema: struct + // Both types are supported for shredding + let fields = Fields::from(vec![ + Field::new("score", DataType::Float64, true), + Field::new("age", DataType::Int64, true), + ]); + let target_schema = DataType::Struct(fields); + + let result = shred_variant(&input, &target_schema).unwrap(); + + // Verify structure + assert!(result.value_field().is_some()); + assert!(result.typed_value_field().is_some()); + assert_eq!(result.len(), 9); + + let metadata = result.metadata_field(); + + let value = result.value_field().unwrap(); + let typed_value = result + .typed_value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + // Extract score and age fields from typed_value struct + let score_field = typed_value + .column_by_name("score") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let age_field = typed_value + .column_by_name("age") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + let score_value = score_field + .value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let score_typed_value = score_field + .typed_value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let age_value = age_field + .value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let age_typed_value = age_field + .typed_value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + // Set up exhaustive checking of all shredded columns and their nulls/values + struct ShreddedValue<'m, 'v, T> { + value: Option>, + typed_value: Option, + } + struct ShreddedStruct<'m, 'v> { + score: ShreddedValue<'m, 'v, f64>, + age: ShreddedValue<'m, 'v, i64>, + } + fn get_value<'m, 'v>( + i: usize, + metadata: &'m BinaryViewArray, + value: &'v BinaryViewArray, + ) -> Variant<'m, 'v> { + Variant::new(metadata.value(i), value.value(i)) + } + let expect = |i, expected_result: Option>| { + match expected_result { + Some(ShreddedValue { + value: expected_value, + typed_value: expected_typed_value, + }) => { + assert!(result.is_valid(i)); + match expected_value { + Some(expected_value) => { + assert!(value.is_valid(i)); + assert_eq!(expected_value, get_value(i, metadata, value)); + } + None => { + assert!(value.is_null(i)); + } + } + match expected_typed_value { + Some(ShreddedStruct { + score: expected_score, + age: expected_age, + }) => { + assert!(typed_value.is_valid(i)); + assert!(score_field.is_valid(i)); // non-nullable + assert!(age_field.is_valid(i)); // non-nullable + match expected_score.value { + Some(expected_score_value) => { + assert!(score_value.is_valid(i)); + assert_eq!( + expected_score_value, + get_value(i, metadata, score_value) + ); + } + None => { + assert!(score_value.is_null(i)); + } + } + match expected_score.typed_value { + Some(expected_score) => { + assert!(score_typed_value.is_valid(i)); + assert_eq!(expected_score, score_typed_value.value(i)); + } + None => { + assert!(score_typed_value.is_null(i)); + } + } + match expected_age.value { + Some(expected_age_value) => { + assert!(age_value.is_valid(i)); + assert_eq!( + expected_age_value, + get_value(i, metadata, age_value) + ); + } + None => { + assert!(age_value.is_null(i)); + } + } + match expected_age.typed_value { + Some(expected_age) => { + assert!(age_typed_value.is_valid(i)); + assert_eq!(expected_age, age_typed_value.value(i)); + } + None => { + assert!(age_typed_value.is_null(i)); + } + } + } + None => { + assert!(typed_value.is_null(i)); + } + } + } + None => { + assert!(result.is_null(i)); + } + }; + }; + + // Row 0: Fully shredded - both fields shred successfully + expect( + 0, + Some(ShreddedValue { + value: None, + typed_value: Some(ShreddedStruct { + score: ShreddedValue { + value: None, + typed_value: Some(95.5), + }, + age: ShreddedValue { + value: None, + typed_value: Some(30), + }, + }), + }), + ); + + // Row 1: Partially shredded - value contains extra email field + let mut builder = VariantBuilder::new(); + builder + .new_object() + .with_field("email", "bob@example.com") + .finish(); + let (m, v) = builder.finish(); + let expected_value = Variant::new(&m, &v); + + expect( + 1, + Some(ShreddedValue { + value: Some(expected_value), + typed_value: Some(ShreddedStruct { + score: ShreddedValue { + value: None, + typed_value: Some(87.2), + }, + age: ShreddedValue { + value: None, + typed_value: Some(25), + }, + }), + }), + ); + + // Row 2: Fully shredded -- missing score field + expect( + 2, + Some(ShreddedValue { + value: None, + typed_value: Some(ShreddedStruct { + score: ShreddedValue { + value: None, + typed_value: None, + }, + age: ShreddedValue { + value: None, + typed_value: Some(35), + }, + }), + }), + ); + + // Row 3: Type mismatches - both score and age are strings + expect( + 3, + Some(ShreddedValue { + value: None, + typed_value: Some(ShreddedStruct { + score: ShreddedValue { + value: Some(Variant::from("ninety-five")), + typed_value: None, + }, + age: ShreddedValue { + value: Some(Variant::from("thirty")), + typed_value: None, + }, + }), + }), + ); + + // Row 4: Non-object - falls back to value field + expect( + 4, + Some(ShreddedValue { + value: Some(Variant::from("not an object")), + typed_value: None, + }), + ); + + // Row 5: Empty object + expect( + 5, + Some(ShreddedValue { + value: None, + typed_value: Some(ShreddedStruct { + score: ShreddedValue { + value: None, + typed_value: None, + }, + age: ShreddedValue { + value: None, + typed_value: None, + }, + }), + }), + ); + + // Row 6: Null + expect(6, None); + + // Helper to correctly create a variant object using a row's existing metadata + let object_with_foo_field = |i| { + use parquet_variant::{ParentState, ValueBuilder, VariantMetadata}; + let metadata = VariantMetadata::new(metadata.value(i)); + let mut metadata_builder = ReadOnlyMetadataBuilder::new(metadata.clone()); + let mut value_builder = ValueBuilder::new(); + let state = ParentState::variant(&mut value_builder, &mut metadata_builder); + ObjectBuilder::new(state, false) + .with_field("foo", 10) + .finish(); + (metadata, value_builder.into_inner()) + }; + + // Row 7: Object with only a "wrong" field + let (m, v) = object_with_foo_field(7); + expect( + 7, + Some(ShreddedValue { + value: Some(Variant::new_with_metadata(m, &v)), + typed_value: Some(ShreddedStruct { + score: ShreddedValue { + value: None, + typed_value: None, + }, + age: ShreddedValue { + value: None, + typed_value: None, + }, + }), + }), + ); + + // Row 8: Object with one "wrong" and one "right" field + let (m, v) = object_with_foo_field(8); + expect( + 8, + Some(ShreddedValue { + value: Some(Variant::new_with_metadata(m, &v)), + typed_value: Some(ShreddedStruct { + score: ShreddedValue { + value: None, + typed_value: Some(66.67), + }, + age: ShreddedValue { + value: None, + typed_value: None, + }, + }), + }), + ); + } + + #[test] + fn test_object_different_schemas() { + // Create object with multiple fields + let mut builder = VariantArrayBuilder::new(1); + builder + .new_object() + .with_field("id", 123i32) + .with_field("age", 25i64) + .with_field("score", 95.5f64) + .finish(); + let input = builder.build(); + + // Test with schema containing only id field + let schema1 = DataType::Struct(Fields::from(vec![Field::new("id", DataType::Int32, true)])); + let result1 = shred_variant(&input, &schema1).unwrap(); + let value_field1 = result1.value_field().unwrap(); + assert!(!value_field1.is_null(0)); // should contain {"age": 25, "score": 95.5} + + // Test with schema containing id and age fields + let schema2 = DataType::Struct(Fields::from(vec![ + Field::new("id", DataType::Int32, true), + Field::new("age", DataType::Int64, true), + ])); + let result2 = shred_variant(&input, &schema2).unwrap(); + let value_field2 = result2.value_field().unwrap(); + assert!(!value_field2.is_null(0)); // should contain {"score": 95.5} + + // Test with schema containing all fields + let schema3 = DataType::Struct(Fields::from(vec![ + Field::new("id", DataType::Int32, true), + Field::new("age", DataType::Int64, true), + Field::new("score", DataType::Float64, true), + ])); + let result3 = shred_variant(&input, &schema3).unwrap(); + let value_field3 = result3.value_field().unwrap(); + assert!(value_field3.is_null(0)); // fully shredded, no remaining fields + } + + #[test] + fn test_spec_compliance() { + let input = create_test_variant_array(vec![ + Some(Variant::from(42i64)), + Some(Variant::from("hello")), + ]); + + let result = shred_variant(&input, &DataType::Int64).unwrap(); + + // Test field access by name (not position) + let inner_struct = result.inner(); + assert!(inner_struct.column_by_name("metadata").is_some()); + assert!(inner_struct.column_by_name("value").is_some()); + assert!(inner_struct.column_by_name("typed_value").is_some()); + + // Test metadata preservation + assert_eq!(result.metadata_field().len(), input.metadata_field().len()); + // The metadata should be the same reference (cheap clone) + // Note: BinaryViewArray doesn't have a .values() method, so we compare the arrays directly + assert_eq!(result.metadata_field().len(), input.metadata_field().len()); + + // Test output structure correctness + assert_eq!(result.len(), input.len()); + assert!(result.value_field().is_some()); + assert!(result.typed_value_field().is_some()); + + // For primitive shredding, verify that value and typed_value are never both non-null + // (This rule applies to primitives; for objects, both can be non-null for partial shredding) + let value_field = result.value_field().unwrap(); + let typed_value_field = result + .typed_value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + for i in 0..result.len() { + if !result.is_null(i) { + let value_is_null = value_field.is_null(i); + let typed_value_is_null = typed_value_field.is_null(i); + // For primitive shredding, at least one should be null + assert!( + value_is_null || typed_value_is_null, + "Row {}: both value and typed_value are non-null for primitive shredding", + i + ); + } + } + } +} diff --git a/parquet-variant-compute/src/variant_array.rs b/parquet-variant-compute/src/variant_array.rs index 4abffa65c23f..c42ac810f159 100644 --- a/parquet-variant-compute/src/variant_array.rs +++ b/parquet-variant-compute/src/variant_array.rs @@ -48,7 +48,7 @@ use std::sync::Arc; /// /// [Extension Type for Parquet Variant arrow]: https://github.com/apache/arrow/issues/46908 /// [document]: https://docs.google.com/document/d/1pw0AWoMQY3SjD7R4LgbPvMjG_xSCtXp3rZHkVp9jpZ4/edit?usp=sharing -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct VariantArray { /// Reference to the underlying StructArray inner: StructArray, @@ -129,7 +129,7 @@ impl VariantArray { Ok(Self { inner: inner.clone(), metadata: metadata.clone(), - shredding_state: ShreddingState::try_new(value, typed_value)?, + shredding_state: ShreddingState::new(value, typed_value), }) } @@ -151,14 +151,10 @@ impl VariantArray { builder = builder.with_nulls(nulls); } - // This would be a lot simpler if ShreddingState were just a pair of Option... we already - // have everything we need. - let inner = builder.build(); - let shredding_state = ShreddingState::try_new(value, typed_value).unwrap(); // valid by construction Self { - inner, + inner: builder.build(), metadata, - shredding_state, + shredding_state: ShreddingState::new(value, typed_value), } } @@ -325,10 +321,9 @@ impl ShreddedVariantFieldArray { let typed_value = inner_struct.column_by_name("typed_value").cloned(); // Note this clone is cheap, it just bumps the ref count - let inner = inner_struct.clone(); Ok(Self { - inner: inner.clone(), - shredding_state: ShreddingState::try_new(value, typed_value)?, + inner: inner_struct.clone(), + shredding_state: ShreddingState::new(value, typed_value), }) } @@ -351,6 +346,28 @@ impl ShreddedVariantFieldArray { pub fn inner(&self) -> &StructArray { &self.inner } + + pub(crate) fn from_parts( + value: Option, + typed_value: Option, + nulls: Option, + ) -> Self { + let mut builder = StructArrayBuilder::new(); + if let Some(value) = value.clone() { + builder = builder.with_field("value", Arc::new(value), true); + } + if let Some(typed_value) = typed_value.clone() { + builder = builder.with_field("typed_value", typed_value, true); + } + if let Some(nulls) = nulls { + builder = builder.with_nulls(nulls); + } + + Self { + inner: builder.build(), + shredding_state: ShreddingState::new(value, typed_value), + } + } } impl Array for ShreddedVariantFieldArray { @@ -425,7 +442,7 @@ impl Array for ShreddedVariantFieldArray { /// | non-null | non-null | The value is present and is a partially shredded object | /// /// [Parquet Variant Shredding Spec]: https://github.com/apache/parquet-format/blob/master/VariantShredding.md#value-shredding -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum ShreddingState { /// This variant has no typed_value field Unshredded { value: BinaryViewArray }, @@ -456,16 +473,13 @@ pub enum ShreddingState { } impl ShreddingState { - /// try to create a new `ShreddingState` from the given fields - pub fn try_new( - value: Option, - typed_value: Option, - ) -> Result { + /// Create a new `ShreddingState` from the given fields + pub fn new(value: Option, typed_value: Option) -> Self { match (value, typed_value) { - (Some(value), Some(typed_value)) => Ok(Self::PartiallyShredded { value, typed_value }), - (Some(value), None) => Ok(Self::Unshredded { value }), - (None, Some(typed_value)) => Ok(Self::Typed { typed_value }), - (None, None) => Ok(Self::AllNull), + (Some(value), Some(typed_value)) => Self::PartiallyShredded { value, typed_value }, + (Some(value), None) => Self::Unshredded { value }, + (None, Some(typed_value)) => Self::Typed { typed_value }, + (None, None) => Self::AllNull, } } @@ -779,10 +793,11 @@ mod test { #[test] fn all_null_shredding_state() { - let shredding_state = ShreddingState::try_new(None, None).unwrap(); - // Verify the shredding state is AllNull - assert!(matches!(shredding_state, ShreddingState::AllNull)); + assert!(matches!( + ShreddingState::new(None, None), + ShreddingState::AllNull + )); } #[test] diff --git a/parquet-variant-compute/src/variant_get.rs b/parquet-variant-compute/src/variant_get.rs index 0e111685169b..a6ff46e29024 100644 --- a/parquet-variant-compute/src/variant_get.rs +++ b/parquet-variant-compute/src/variant_get.rs @@ -146,7 +146,7 @@ fn shredded_get_path( if target.is_null(i) { builder.append_null()?; } else { - builder.append_value(&target.value(i))?; + builder.append_value(target.value(i))?; } } builder.finish() @@ -1388,7 +1388,7 @@ mod test { } Err(e) => { println!("Nested path 'a.x' error: {}", e); - if e.to_string().contains("not yet implemented") + if e.to_string().contains("Not yet implemented") || e.to_string().contains("NotYetImplemented") { println!("This is expected - nested paths are not implemented"); @@ -2559,7 +2559,7 @@ mod test { // Should fail with NotYetImplemented when the row builder tries to handle struct type assert!(result.is_err()); let error = result.unwrap_err(); - assert!(error.to_string().contains("not yet implemented")); + assert!(error.to_string().contains("Not yet implemented")); } /// Create comprehensive shredded variant with diverse null patterns and empty objects diff --git a/parquet-variant-compute/src/variant_to_arrow.rs b/parquet-variant-compute/src/variant_to_arrow.rs index df9677edfb44..12be4f0748e3 100644 --- a/parquet-variant-compute/src/variant_to_arrow.rs +++ b/parquet-variant-compute/src/variant_to_arrow.rs @@ -26,32 +26,38 @@ use crate::{VariantArray, VariantValueArrayBuilder}; use std::sync::Arc; -/// Builder for converting variant values into strongly typed Arrow arrays. -/// -/// Useful for variant_get kernels that need to extract specific paths from variant values, possibly -/// with casting of leaf values to specific types. -pub(crate) enum VariantToArrowRowBuilder<'a> { - // Direct builders (no path extraction) +/// Builder for converting variant values to primitive Arrow arrays. It is used by both +/// `VariantToArrowRowBuilder` (below) and `VariantToShreddedPrimitiveVariantRowBuilder` (in +/// `shred_variant.rs`). +pub(crate) enum PrimitiveVariantToArrowRowBuilder<'a> { Int8(VariantToPrimitiveArrowRowBuilder<'a, datatypes::Int8Type>), Int16(VariantToPrimitiveArrowRowBuilder<'a, datatypes::Int16Type>), Int32(VariantToPrimitiveArrowRowBuilder<'a, datatypes::Int32Type>), Int64(VariantToPrimitiveArrowRowBuilder<'a, datatypes::Int64Type>), - Float16(VariantToPrimitiveArrowRowBuilder<'a, datatypes::Float16Type>), - Float32(VariantToPrimitiveArrowRowBuilder<'a, datatypes::Float32Type>), - Float64(VariantToPrimitiveArrowRowBuilder<'a, datatypes::Float64Type>), UInt8(VariantToPrimitiveArrowRowBuilder<'a, datatypes::UInt8Type>), UInt16(VariantToPrimitiveArrowRowBuilder<'a, datatypes::UInt16Type>), UInt32(VariantToPrimitiveArrowRowBuilder<'a, datatypes::UInt32Type>), UInt64(VariantToPrimitiveArrowRowBuilder<'a, datatypes::UInt64Type>), + Float16(VariantToPrimitiveArrowRowBuilder<'a, datatypes::Float16Type>), + Float32(VariantToPrimitiveArrowRowBuilder<'a, datatypes::Float32Type>), + Float64(VariantToPrimitiveArrowRowBuilder<'a, datatypes::Float64Type>), +} + +/// Builder for converting variant values into strongly typed Arrow arrays. +/// +/// Useful for variant_get kernels that need to extract specific paths from variant values, possibly +/// with casting of leaf values to specific types. +pub(crate) enum VariantToArrowRowBuilder<'a> { + Primitive(PrimitiveVariantToArrowRowBuilder<'a>), BinaryVariant(VariantToBinaryVariantArrowRowBuilder), // Path extraction wrapper - contains a boxed enum for any of the above WithPath(VariantPathRowBuilder<'a>), } -impl<'a> VariantToArrowRowBuilder<'a> { +impl<'a> PrimitiveVariantToArrowRowBuilder<'a> { pub fn append_null(&mut self) -> Result<()> { - use VariantToArrowRowBuilder::*; + use PrimitiveVariantToArrowRowBuilder::*; match self { Int8(b) => b.append_null(), Int16(b) => b.append_null(), @@ -64,13 +70,11 @@ impl<'a> VariantToArrowRowBuilder<'a> { Float16(b) => b.append_null(), Float32(b) => b.append_null(), Float64(b) => b.append_null(), - BinaryVariant(b) => b.append_null(), - WithPath(path_builder) => path_builder.append_null(), } } pub fn append_value(&mut self, value: &Variant<'_, '_>) -> Result { - use VariantToArrowRowBuilder::*; + use PrimitiveVariantToArrowRowBuilder::*; match self { Int8(b) => b.append_value(value), Int16(b) => b.append_value(value), @@ -83,13 +87,11 @@ impl<'a> VariantToArrowRowBuilder<'a> { Float16(b) => b.append_value(value), Float32(b) => b.append_value(value), Float64(b) => b.append_value(value), - BinaryVariant(b) => b.append_value(value), - WithPath(path_builder) => path_builder.append_value(value), } } pub fn finish(self) -> Result { - use VariantToArrowRowBuilder::*; + use PrimitiveVariantToArrowRowBuilder::*; match self { Int8(b) => b.finish(), Int16(b) => b.finish(), @@ -102,77 +104,142 @@ impl<'a> VariantToArrowRowBuilder<'a> { Float16(b) => b.finish(), Float32(b) => b.finish(), Float64(b) => b.finish(), + } + } +} + +impl<'a> VariantToArrowRowBuilder<'a> { + pub fn append_null(&mut self) -> Result<()> { + use VariantToArrowRowBuilder::*; + match self { + Primitive(b) => b.append_null(), + BinaryVariant(b) => b.append_null(), + WithPath(path_builder) => path_builder.append_null(), + } + } + + pub fn append_value(&mut self, value: Variant<'_, '_>) -> Result { + use VariantToArrowRowBuilder::*; + match self { + Primitive(b) => b.append_value(&value), + BinaryVariant(b) => b.append_value(value), + WithPath(path_builder) => path_builder.append_value(value), + } + } + + pub fn finish(self) -> Result { + use VariantToArrowRowBuilder::*; + match self { + Primitive(b) => b.finish(), BinaryVariant(b) => b.finish(), WithPath(path_builder) => path_builder.finish(), } } } -pub(crate) fn make_variant_to_arrow_row_builder<'a>( - metadata: &BinaryViewArray, - path: VariantPath<'a>, - data_type: Option<&'a DataType>, +/// Creates a primitive row builder, returning Err if the requested data type is not primitive. +pub(crate) fn make_primitive_variant_to_arrow_row_builder<'a>( + data_type: &'a DataType, cast_options: &'a CastOptions, capacity: usize, -) -> Result> { - use VariantToArrowRowBuilder::*; +) -> Result> { + use PrimitiveVariantToArrowRowBuilder::*; - let mut builder = match data_type { - // If no data type was requested, build an unshredded VariantArray. - None => BinaryVariant(VariantToBinaryVariantArrowRowBuilder::new( - metadata.clone(), - capacity, - )), - Some(DataType::Int8) => Int8(VariantToPrimitiveArrowRowBuilder::new( + let builder = match data_type { + DataType::Int8 => Int8(VariantToPrimitiveArrowRowBuilder::new( cast_options, capacity, )), - Some(DataType::Int16) => Int16(VariantToPrimitiveArrowRowBuilder::new( + DataType::Int16 => Int16(VariantToPrimitiveArrowRowBuilder::new( cast_options, capacity, )), - Some(DataType::Int32) => Int32(VariantToPrimitiveArrowRowBuilder::new( + DataType::Int32 => Int32(VariantToPrimitiveArrowRowBuilder::new( cast_options, capacity, )), - Some(DataType::Int64) => Int64(VariantToPrimitiveArrowRowBuilder::new( + DataType::Int64 => Int64(VariantToPrimitiveArrowRowBuilder::new( cast_options, capacity, )), - Some(DataType::Float16) => Float16(VariantToPrimitiveArrowRowBuilder::new( + DataType::UInt8 => UInt8(VariantToPrimitiveArrowRowBuilder::new( cast_options, capacity, )), - Some(DataType::Float32) => Float32(VariantToPrimitiveArrowRowBuilder::new( + DataType::UInt16 => UInt16(VariantToPrimitiveArrowRowBuilder::new( cast_options, capacity, )), - Some(DataType::Float64) => Float64(VariantToPrimitiveArrowRowBuilder::new( + DataType::UInt32 => UInt32(VariantToPrimitiveArrowRowBuilder::new( cast_options, capacity, )), - Some(DataType::UInt8) => UInt8(VariantToPrimitiveArrowRowBuilder::new( + DataType::UInt64 => UInt64(VariantToPrimitiveArrowRowBuilder::new( cast_options, capacity, )), - Some(DataType::UInt16) => UInt16(VariantToPrimitiveArrowRowBuilder::new( + DataType::Float16 => Float16(VariantToPrimitiveArrowRowBuilder::new( cast_options, capacity, )), - Some(DataType::UInt32) => UInt32(VariantToPrimitiveArrowRowBuilder::new( + DataType::Float32 => Float32(VariantToPrimitiveArrowRowBuilder::new( cast_options, capacity, )), - Some(DataType::UInt64) => UInt64(VariantToPrimitiveArrowRowBuilder::new( + DataType::Float64 => Float64(VariantToPrimitiveArrowRowBuilder::new( cast_options, capacity, )), - _ => { + _ if data_type.is_primitive() => { return Err(ArrowError::NotYetImplemented(format!( - "variant_get with path={:?} and data_type={:?} not yet implemented", - path, data_type + "Primitive data_type {data_type:?} not yet implemented" ))); } + _ => { + return Err(ArrowError::InvalidArgumentError(format!( + "Not a primitive type: {data_type:?}" + ))); + } + }; + Ok(builder) +} + +pub(crate) fn make_variant_to_arrow_row_builder<'a>( + metadata: &BinaryViewArray, + path: VariantPath<'a>, + data_type: Option<&'a DataType>, + cast_options: &'a CastOptions, + capacity: usize, +) -> Result> { + use VariantToArrowRowBuilder::*; + + let mut builder = match data_type { + // If no data type was requested, build an unshredded VariantArray. + None => BinaryVariant(VariantToBinaryVariantArrowRowBuilder::new( + metadata.clone(), + capacity, + )), + Some(DataType::Struct(_)) => { + return Err(ArrowError::NotYetImplemented( + "Converting unshredded variant objects to arrow structs".to_string(), + )); + } + Some( + DataType::List(_) + | DataType::LargeList(_) + | DataType::ListView(_) + | DataType::LargeListView(_) + | DataType::FixedSizeList(..), + ) => { + return Err(ArrowError::NotYetImplemented( + "Converting unshredded variant arrays to arrow lists".to_string(), + )); + } + Some(data_type) => { + let builder = + make_primitive_variant_to_arrow_row_builder(data_type, cast_options, capacity)?; + Primitive(builder) + } }; // Wrap with path extraction if needed @@ -198,9 +265,9 @@ impl<'a> VariantPathRowBuilder<'a> { self.builder.append_null() } - fn append_value(&mut self, value: &Variant<'_, '_>) -> Result { + fn append_value(&mut self, value: Variant<'_, '_>) -> Result { if let Some(v) = value.get_path(&self.path) { - self.builder.append_value(&v) + self.builder.append_value(v) } else { self.builder.append_null()?; Ok(false) @@ -303,8 +370,8 @@ impl VariantToBinaryVariantArrowRowBuilder { Ok(()) } - fn append_value(&mut self, value: &Variant<'_, '_>) -> Result { - self.builder.append_value(value.clone()); + fn append_value(&mut self, value: Variant<'_, '_>) -> Result { + self.builder.append_value(value); self.nulls.append_non_null(); Ok(true) } diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 1480d6400db1..95a30c206d59 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -3441,7 +3441,7 @@ mod tests { let mut metadata = ReadOnlyMetadataBuilder::new(metadata); let mut builder2 = ValueBuilder::new(); let state = ParentState::variant(&mut builder2, &mut metadata); - ValueBuilder::append_variant_bytes(state, variant1.clone()); + ValueBuilder::append_variant_bytes(state, variant1); let value2 = builder2.into_inner(); // The bytes should be identical, we merely copied them across.