diff --git a/parquet-variant-compute/Cargo.toml b/parquet-variant-compute/Cargo.toml index 819a131f9c42..94d75a38c6d9 100644 --- a/parquet-variant-compute/Cargo.toml +++ b/parquet-variant-compute/Cargo.toml @@ -31,7 +31,7 @@ rust-version = { workspace = true } [dependencies] -arrow = { workspace = true } +arrow = { workspace = true , features = ["canonical_extension_types"]} arrow-schema = { workspace = true } half = { version = "2.1", default-features = false } parquet-variant = { workspace = true } diff --git a/parquet-variant-compute/benches/variant_kernels.rs b/parquet-variant-compute/benches/variant_kernels.rs index 5e97f948b231..3cdb28229b8a 100644 --- a/parquet-variant-compute/benches/variant_kernels.rs +++ b/parquet-variant-compute/benches/variant_kernels.rs @@ -84,7 +84,7 @@ fn benchmark_batch_json_string_to_variant(c: &mut Criterion) { pub fn variant_get_bench(c: &mut Criterion) { let variant_array = create_primitive_variant_array(8192); - let input: ArrayRef = Arc::new(variant_array); + let input = ArrayRef::from(variant_array); let options = GetOptions { path: vec![].into(), diff --git a/parquet-variant-compute/src/lib.rs b/parquet-variant-compute/src/lib.rs index 70fcbdb66f95..26f7d6ca1636 100644 --- a/parquet-variant-compute/src/lib.rs +++ b/parquet-variant-compute/src/lib.rs @@ -45,7 +45,7 @@ mod variant_array_builder; pub mod variant_get; mod variant_to_arrow; -pub use variant_array::{ShreddingState, VariantArray}; +pub use variant_array::{ShreddingState, VariantArray, VariantType}; pub use variant_array_builder::{VariantArrayBuilder, VariantValueArrayBuilder}; pub use cast_to_variant::{cast_to_variant, cast_to_variant_with_options}; diff --git a/parquet-variant-compute/src/variant_array.rs b/parquet-variant-compute/src/variant_array.rs index 4abffa65c23f..3dea8e48b0e4 100644 --- a/parquet-variant-compute/src/variant_array.rs +++ b/parquet-variant-compute/src/variant_array.rs @@ -18,36 +18,194 @@ //! [`VariantArray`] implementation use crate::type_conversion::primitive_conversion_single_value; -use arrow::array::{Array, ArrayData, ArrayRef, AsArray, BinaryViewArray, StructArray}; +use arrow::array::{Array, ArrayRef, AsArray, BinaryViewArray, StructArray}; use arrow::buffer::NullBuffer; +use arrow::compute::cast; use arrow::datatypes::{ Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; +use arrow_schema::extension::ExtensionType; use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields}; use parquet_variant::Uuid; use parquet_variant::Variant; -use std::any::Any; use std::sync::Arc; +/// Arrow Variant [`ExtensionType`]. +/// +/// Represents the canonical Arrow Extension Type for storing variants. +/// See [`VariantArray`] for more examples of using this extension type. 
+pub struct VariantType; + +impl ExtensionType for VariantType { + const NAME: &'static str = "parquet.variant"; + + // Variants have no extension metadata + type Metadata = (); + + fn metadata(&self) -> &Self::Metadata { + &() + } + + fn serialize_metadata(&self) -> Option<String> { + None + } + + fn deserialize_metadata(_metadata: Option<&str>) -> Result<Self::Metadata, ArrowError> { + Ok(()) + } + + fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { + // Note: don't check for metadata/value fields here because they may be + // absent in shredded variants + if matches!(data_type, DataType::Struct(_)) { + Ok(()) + } else { + Err(ArrowError::InvalidArgumentError(format!( + "VariantType only supports StructArray, got {}", + data_type + ))) + } + } + + fn try_new(data_type: &DataType, _metadata: Self::Metadata) -> Result<Self, ArrowError> { + let new_self = Self; + new_self.supports_data_type(data_type)?; + Ok(new_self) + } +} + /// An array of Parquet [`Variant`] values /// /// A [`VariantArray`] wraps an Arrow [`StructArray`] that stores the underlying /// `metadata` and `value` fields, and adds convenience methods to access -/// the `Variant`s +/// the [`Variant`]s. +/// +/// See [`VariantArrayBuilder`] for constructing a `VariantArray` row by row. /// -/// See [`VariantArrayBuilder`] for constructing a `VariantArray`. +/// See the examples below for converting between `VariantArray` and +/// `StructArray`. /// /// [`VariantArrayBuilder`]: crate::VariantArrayBuilder /// -/// # Specification +/// # Documentation /// -/// 1. This code follows the conventions for storing variants in Arrow `StructArray` -/// defined by [Extension Type for Parquet Variant arrow] and this [document]. -/// At the time of this writing, this is not yet a standardized Arrow extension type. +/// At the time of this writing, Variant has been accepted as an official +/// extension type but has not yet been published to the [official list of extension +/// types] on the Apache Arrow website. See the [Extension Type for Parquet +/// Variant arrow] ticket for more details. /// /// [Extension Type for Parquet Variant arrow]: https://github.com/apache/arrow/issues/46908 -/// [document]: https://docs.google.com/document/d/1pw0AWoMQY3SjD7R4LgbPvMjG_xSCtXp3rZHkVp9jpZ4/edit?usp=sharing +/// [official list of extension types]: https://arrow.apache.org/docs/format/CanonicalExtensions.html +/// +/// # Example: Check if a [`StructArray`] has the [`VariantType`] extension +/// +/// Arrow Arrays only provide [`DataType`], but the extension type information +/// is stored on a [`Field`]. Thus, you must have access to the [`Schema`] or +/// [`Field`] to check for the extension type. 
+/// +/// [`Schema`]: arrow_schema::Schema +/// ``` +/// # use arrow::array::StructArray; +/// # use arrow_schema::{Schema, Field, DataType}; +/// # use parquet_variant::Variant; +/// # use parquet_variant_compute::{VariantArrayBuilder, VariantArray, VariantType}; +/// # fn get_variant_array() -> VariantArray { +/// # let mut builder = VariantArrayBuilder::new(10); +/// # builder.append_variant(Variant::from("such wow")); +/// # builder.build() +/// # } +/// # fn get_schema() -> Schema { +/// # Schema::new(vec![ +/// # Field::new("id", DataType::Int32, false), +/// # get_variant_array().field("var"), +/// # ]) +/// # } +/// let schema = get_schema(); +/// assert_eq!(schema.fields().len(), 2); +/// // first field is not a Variant +/// assert!(schema.field(0).try_extension_type::<VariantType>().is_err()); +/// // second field is a Variant +/// assert!(schema.field(1).try_extension_type::<VariantType>().is_ok()); +/// ``` +/// +/// # Example: Constructing the correct [`Field`] for a [`VariantArray`] +/// +/// You can construct the correct [`Field`] for a [`VariantArray`] using the +/// [`VariantArray::field`] method. +/// +/// ``` +/// # use arrow_schema::{Schema, Field, DataType}; +/// # use parquet_variant::Variant; +/// # use parquet_variant_compute::{VariantArrayBuilder, VariantArray, VariantType}; +/// # fn get_variant_array() -> VariantArray { +/// # let mut builder = VariantArrayBuilder::new(10); +/// # builder.append_variant(Variant::from("such wow")); +/// # builder.build() +/// # } +/// let variant_array = get_variant_array(); +/// // First field is an integer id, second field is a variant +/// let schema = Schema::new(vec![ +/// Field::new("id", DataType::Int32, false), +/// // call VariantArray::field to get the correct Field +/// variant_array.field("var"), +/// ]); +/// ``` +/// +/// You can also construct the [`Field`] using [`VariantType`] directly +/// +/// ``` +/// # use arrow_schema::{Schema, Field, DataType}; +/// # use parquet_variant::Variant; +/// # use parquet_variant_compute::{VariantArrayBuilder, VariantArray, VariantType}; +/// # fn get_variant_array() -> VariantArray { +/// # let mut builder = VariantArrayBuilder::new(10); +/// # builder.append_variant(Variant::from("such wow")); +/// # builder.build() +/// # } +/// # let variant_array = get_variant_array(); +/// // The DataType of a VariantArray varies depending on how it is shredded +/// let data_type = variant_array.data_type().clone(); +/// // First field is an integer id, second field is a variant +/// let schema = Schema::new(vec![ +/// Field::new("id", DataType::Int32, false), +/// Field::new("var", data_type, false) +/// // Add extension metadata to the field using `VariantType` +/// .with_extension_type(VariantType), +/// ]); +/// ``` +/// +/// # Example: Converting a [`VariantArray`] to a [`StructArray`] +/// +/// ``` +/// # use arrow::array::StructArray; +/// # use parquet_variant::Variant; +/// # use parquet_variant_compute::VariantArrayBuilder; +/// // Create Variant Array +/// let mut builder = VariantArrayBuilder::new(10); +/// builder.append_variant(Variant::from("such wow")); +/// let variant_array = builder.build(); +/// // convert to StructArray +/// let struct_array: StructArray = variant_array.into(); +/// ``` +/// +/// # Example: Converting a [`StructArray`] to a [`VariantArray`] +/// +/// ``` +/// # use arrow::array::StructArray; +/// # use parquet_variant::Variant; +/// # use parquet_variant_compute::{VariantArrayBuilder, VariantArray}; +/// # fn get_struct_array() -> StructArray { +/// # let mut builder = 
VariantArrayBuilder::new(10); /// # builder.append_variant(Variant::from("such wow")); /// # builder.build().into() /// # } /// let struct_array: StructArray = get_struct_array(); /// // try and create a VariantArray from it /// let variant_array = VariantArray::try_new(&struct_array).unwrap(); /// assert_eq!(variant_array.value(0), Variant::from("such wow")); /// ``` /// #[derive(Debug)] pub struct VariantArray { /// Reference to the underlying StructArray @@ -88,7 +246,11 @@ impl VariantArray { /// int8. /// /// Currently, only [`BinaryViewArray`] are supported. - pub fn try_new(inner: ArrayRef) -> Result<Self, ArrowError> { + pub fn try_new(inner: &dyn Array) -> Result<Self, ArrowError> { + // Workaround lack of support for Binary + // https://github.com/apache/arrow-rs/issues/8387 + let inner = cast_to_binary_view_arrays(inner)?; + let Some(inner) = inner.as_struct_opt() else { return Err(ArrowError::InvalidArgumentError( "Invalid VariantArray: requires StructArray as input".to_string(), @@ -246,6 +408,67 @@ impl VariantArray { pub fn typed_value_field(&self) -> Option<&ArrayRef> { self.shredding_state.typed_value_field() } + + /// Return a field to represent this VariantArray in a `Schema` with + /// a particular name + pub fn field(&self, name: impl Into<String>) -> Field { + Field::new( + name.into(), + self.data_type().clone(), + self.inner.is_nullable(), + ) + .with_extension_type(VariantType) + } + + /// Returns the DataType of this VariantArray's inner [`StructArray`] + pub fn data_type(&self) -> &DataType { + self.inner.data_type() + } + + /// Returns a slice of this [`VariantArray`] with the given offset and length + pub fn slice(&self, offset: usize, length: usize) -> Self { + let inner = self.inner.slice(offset, length); + let metadata = self.metadata.slice(offset, length); + let shredding_state = self.shredding_state.slice(offset, length); + Self { + inner, + metadata, + shredding_state, + } + } + + /// Returns the number of rows in this array + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Returns true if this array contains no rows + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Returns the null buffer of this array, if any + pub fn nulls(&self) -> Option<&NullBuffer> { + self.inner.nulls() + } + + /// Is the element at index null? + pub fn is_null(&self, index: usize) -> bool { + self.nulls().map(|n| n.is_null(index)).unwrap_or_default() + } + + /// Is the element at index valid (not null)? + pub fn is_valid(&self, index: usize) -> bool { + !self.is_null(index) + } +} + +impl From<VariantArray> for StructArray { + fn from(variant_array: VariantArray) -> Self { + variant_array.into_inner() + } +} + +impl From<VariantArray> for ArrayRef { + fn from(variant_array: VariantArray) -> Self { + Arc::new(variant_array.into_inner()) + } } /// One shredded field of a partially or perfectly shredded variant. For example, suppose the @@ -318,18 +541,25 @@ impl ShreddedVariantFieldArray { )); }; + let shredding_state = Self::shredding_state_from_struct_array(inner_struct)?; + Ok(Self { + inner: inner_struct.clone(), + shredding_state, + }) + } + + /// Creates a new `ShreddingState` from a [`StructArray`] representing a potentially + /// shredded variant. 
+ pub(crate) fn shredding_state_from_struct_array( + inner_struct: &StructArray, + ) -> Result<ShreddingState, ArrowError> { // Extract value and typed_value fields (metadata is not expected in ShreddedVariantFieldArray) let value = inner_struct .column_by_name("value") .and_then(|col| col.as_binary_view_opt().cloned()); let typed_value = inner_struct.column_by_name("typed_value").cloned(); - // Note this clone is cheap, it just bumps the ref count - let inner = inner_struct.clone(); - Ok(Self { - inner: inner.clone(), - shredding_state: ShreddingState::try_new(value, typed_value)?, - }) + ShreddingState::try_new(value, typed_value) } /// Return the shredding state of this `VariantArray` @@ -351,59 +581,45 @@ impl ShreddedVariantFieldArray { pub fn inner(&self) -> &StructArray { &self.inner } -} - -impl Array for ShreddedVariantFieldArray { - fn as_any(&self) -> &dyn Any { - self - } - - fn to_data(&self) -> ArrayData { - self.inner.to_data() - } - fn into_data(self) -> ArrayData { - self.inner.into_data() + /// Returns the inner [`StructArray`], consuming self + pub fn into_inner(self) -> StructArray { + self.inner } - fn data_type(&self) -> &DataType { + pub fn data_type(&self) -> &DataType { self.inner.data_type() } - fn slice(&self, offset: usize, length: usize) -> ArrayRef { - let inner = self.inner.slice(offset, length); - let shredding_state = self.shredding_state.slice(offset, length); - Arc::new(Self { - inner, - shredding_state, - }) - } - - fn len(&self) -> usize { + pub fn len(&self) -> usize { self.inner.len() } - fn is_empty(&self) -> bool { + pub fn is_empty(&self) -> bool { self.inner.is_empty() } - fn offset(&self) -> usize { + pub fn offset(&self) -> usize { self.inner.offset() } - fn nulls(&self) -> Option<&NullBuffer> { + pub fn nulls(&self) -> Option<&NullBuffer> { // According to the shredding spec, ShreddedVariantFieldArray should be // physically non-nullable - SQL NULL is inferred by both value and // typed_value being physically NULL None } +} - fn get_buffer_memory_size(&self) -> usize { - self.inner.get_buffer_memory_size() +impl From<ShreddedVariantFieldArray> for ArrayRef { + fn from(array: ShreddedVariantFieldArray) -> Self { + Arc::new(array.into_inner()) } +} - fn get_array_memory_size(&self) -> usize { - self.inner.get_array_memory_size() +impl From<ShreddedVariantFieldArray> for StructArray { + fn from(array: ShreddedVariantFieldArray) -> Self { + array.into_inner() } } @@ -425,7 +641,7 @@ /// | non-null | non-null | The value is present and is a partially shredded object | /// /// [Parquet Variant Shredding Spec]: https://github.com/apache/parquet-format/blob/master/VariantShredding.md#value-shredding -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum ShreddingState { /// This variant has no typed_value field Unshredded { value: BinaryViewArray }, @@ -627,70 +843,57 @@ fn typed_value_to_variant(typed_value: &ArrayRef, index: usize) -> Variant<'_, ' } } -impl Array for VariantArray { - fn as_any(&self) -> &dyn Any { - self - } - - fn to_data(&self) -> ArrayData { - self.inner.to_data() - } - - fn into_data(self) -> ArrayData { - self.inner.into_data() - } - - fn data_type(&self) -> &DataType { - self.inner.data_type() - } - - fn slice(&self, offset: usize, length: usize) -> ArrayRef { - let inner = self.inner.slice(offset, length); - let metadata = self.metadata.slice(offset, length); - let shredding_state = self.shredding_state.slice(offset, length); - Arc::new(Self { - inner, - metadata, - shredding_state, - }) - } - - fn len(&self) -> usize { - self.inner.len() - } - - fn is_empty(&self) 
-> bool { - self.inner.is_empty() - } - - fn offset(&self) -> usize { - self.inner.offset() - } - - fn nulls(&self) -> Option<&NullBuffer> { - self.inner.nulls() - } - - fn get_buffer_memory_size(&self) -> usize { - self.inner.get_buffer_memory_size() - } +/// Workaround for lack of direct support for BinaryArray +/// <https://github.com/apache/arrow-rs/issues/8387> +/// +/// The values are read as +/// * `StructArray<metadata: Binary, value: Binary>` +/// +/// but VariantArray needs them as +/// * `StructArray<metadata: BinaryView, value: BinaryView>` +/// +/// So cast them to get the right type. +fn cast_to_binary_view_arrays(array: &dyn Array) -> Result<ArrayRef, ArrowError> { + let new_type = map_type(array.data_type()); + cast(array, &new_type) +} - fn get_array_memory_size(&self) -> usize { - self.inner.get_array_memory_size() +/// replaces all instances of Binary with BinaryView in a DataType +fn map_type(data_type: &DataType) -> DataType { + match data_type { + DataType::Binary => DataType::BinaryView, + DataType::List(field) => { + let new_field = field + .as_ref() + .clone() + .with_data_type(map_type(field.data_type())); + DataType::List(Arc::new(new_field)) + } + DataType::Struct(fields) => { + let new_fields: Fields = fields + .iter() + .map(|f| { + let new_field = f.as_ref().clone().with_data_type(map_type(f.data_type())); + Arc::new(new_field) + }) + .collect(); + DataType::Struct(new_fields) + } + _ => data_type.clone(), } } #[cfg(test)] mod test { use super::*; - use arrow::array::{BinaryArray, BinaryViewArray}; + use arrow::array::{BinaryViewArray, Int32Array}; use arrow_schema::{Field, Fields}; #[test] fn invalid_not_a_struct_array() { let array = make_binary_view_array(); // Should fail because the input is not a StructArray - let err = VariantArray::try_new(array); + let err = VariantArray::try_new(&array); assert_eq!( err.unwrap_err().to_string(), "Invalid argument error: Invalid VariantArray: requires StructArray as input" @@ -702,7 +905,7 @@ mod test { let fields = Fields::from(vec![Field::new("value", DataType::BinaryView, true)]); let array = StructArray::new(fields, vec![make_binary_view_array()], None); // Should fail because the StructArray does not contain a 'metadata' field - let err = VariantArray::try_new(Arc::new(array)); + let err = VariantArray::try_new(&array); assert_eq!( err.unwrap_err().to_string(), "Invalid argument error: Invalid VariantArray: StructArray must contain a 'metadata' field" @@ -717,7 +920,7 @@ mod test { // NOTE: By strict spec interpretation, this case (top-level variant with null/null) // should be invalid, but we currently allow it and treat it as Variant::Null. // This is a pragmatic decision to handle missing data gracefully. 
- let variant_array = VariantArray::try_new(Arc::new(array)).unwrap(); + let variant_array = VariantArray::try_new(&array).unwrap(); // Verify the shredding state is AllNull assert!(matches!( @@ -736,18 +939,18 @@ mod test { #[test] fn invalid_metadata_field_type() { let fields = Fields::from(vec![ - Field::new("metadata", DataType::Binary, true), // Not yet supported + Field::new("metadata", DataType::Int32, true), // not supported Field::new("value", DataType::BinaryView, true), ]); let array = StructArray::new( fields, - vec![make_binary_array(), make_binary_view_array()], + vec![make_int32_array(), make_binary_view_array()], None, ); - let err = VariantArray::try_new(Arc::new(array)); + let err = VariantArray::try_new(&array); assert_eq!( err.unwrap_err().to_string(), - "Not yet implemented: VariantArray 'metadata' field must be BinaryView, got Binary" + "Not yet implemented: VariantArray 'metadata' field must be BinaryView, got Int32" ); } @@ -755,17 +958,17 @@ mod test { fn invalid_value_field_type() { let fields = Fields::from(vec![ Field::new("metadata", DataType::BinaryView, true), - Field::new("value", DataType::Binary, true), // Not yet supported + Field::new("value", DataType::Int32, true), // Not yet supported ]); let array = StructArray::new( fields, - vec![make_binary_view_array(), make_binary_array()], + vec![make_binary_view_array(), make_int32_array()], None, ); - let err = VariantArray::try_new(Arc::new(array)); + let err = VariantArray::try_new(&array); assert_eq!( err.unwrap_err().to_string(), - "Not yet implemented: VariantArray 'value' field must be BinaryView, got Binary" + "Not yet implemented: VariantArray 'value' field must be BinaryView, got Int32" ); } @@ -773,8 +976,8 @@ mod test { Arc::new(BinaryViewArray::from(vec![b"test" as &[u8]])) } - fn make_binary_array() -> ArrayRef { - Arc::new(BinaryArray::from(vec![b"test" as &[u8]])) + fn make_int32_array() -> ArrayRef { + Arc::new(Int32Array::from(vec![1])) } #[test] @@ -793,7 +996,7 @@ mod test { let fields = Fields::from(vec![Field::new("metadata", DataType::BinaryView, false)]); let struct_array = StructArray::new(fields, vec![Arc::new(metadata)], Some(nulls)); - let variant_array = VariantArray::try_new(Arc::new(struct_array)).unwrap(); + let variant_array = VariantArray::try_new(&struct_array).unwrap(); // Verify the shredding state is AllNull assert!(matches!( @@ -843,7 +1046,7 @@ mod test { None, // struct itself is not null, just the value field is all null ); - let variant_array = VariantArray::try_new(Arc::new(struct_array)).unwrap(); + let variant_array = VariantArray::try_new(&struct_array).unwrap(); // This should be Unshredded, not AllNull, because value field exists in schema assert!(matches!( diff --git a/parquet-variant-compute/src/variant_array_builder.rs b/parquet-variant-compute/src/variant_array_builder.rs index 6451e3565802..68c1fd6b5492 100644 --- a/parquet-variant-compute/src/variant_array_builder.rs +++ b/parquet-variant-compute/src/variant_array_builder.rs @@ -133,7 +133,7 @@ impl VariantArrayBuilder { ); // TODO add arrow extension type metadata - VariantArray::try_new(Arc::new(inner)).expect("valid VariantArray by construction") + VariantArray::try_new(&inner).expect("valid VariantArray by construction") } /// Appends a null row to the builder. 
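The `variant_array.rs` and `variant_array_builder.rs` changes above are easiest to see end to end. The following sketch is illustrative only (it is not part of the patch); it exercises the APIs this diff introduces, assuming they behave as documented above:

```rust
// Illustrative sketch of the reworked VariantArray API from this diff:
// build, describe with a Field, erase to ArrayRef, recover by reference.
use arrow::array::ArrayRef;
use arrow_schema::{ArrowError, Schema};
use parquet_variant::Variant;
use parquet_variant_compute::{VariantArray, VariantArrayBuilder};

fn roundtrip_example() -> Result<(), ArrowError> {
    // Build a VariantArray row by row
    let mut builder = VariantArrayBuilder::new(1);
    builder.append_variant(Variant::from("such wow"));
    let variant_array = builder.build();

    // `field` attaches the VariantType extension metadata for use in a Schema
    let schema = Schema::new(vec![variant_array.field("var")]);
    assert_eq!(schema.fields().len(), 1);

    // Convert to a plain ArrayRef via the new From impl...
    let array = ArrayRef::from(variant_array);

    // ...and recover a VariantArray by borrowing, per the new
    // `try_new(&dyn Array)` signature
    let roundtrip = VariantArray::try_new(array.as_ref())?;
    assert_eq!(roundtrip.value(0), Variant::from("such wow"));
    Ok(())
}
```

Note how `VariantArray` no longer implements `Array` itself; the `From` impls and borrowing `try_new` replace the old `Arc::new(variant_array)` / downcast pattern that the test changes below remove.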
diff --git a/parquet-variant-compute/src/variant_get.rs b/parquet-variant-compute/src/variant_get.rs index 0e111685169b..5c3d4c155dcc 100644 --- a/parquet-variant-compute/src/variant_get.rs +++ b/parquet-variant-compute/src/variant_get.rs @@ -27,11 +27,12 @@ use crate::variant_array::{ShreddedVariantFieldArray, ShreddingState}; use crate::variant_to_arrow::make_variant_to_arrow_row_builder; use crate::VariantArray; +use arrow::array::AsArray; use std::sync::Arc; -pub(crate) enum ShreddedPathStep<'a> { +pub(crate) enum ShreddedPathStep { /// Path step succeeded, return the new shredding state - Success(&'a ShreddingState), + Success(ShreddingState), /// The path element is not present in the `typed_value` column and there is no `value` column, /// so we know it does not exist. It, and all paths under it, are all-NULL. Missing, @@ -46,11 +47,11 @@ /// level, or if `typed_value` is not a struct, or if the requested field name does not exist. /// /// TODO: Support `VariantPathElement::Index`? It wouldn't be easy, and maybe not even possible. -pub(crate) fn follow_shredded_path_element<'a>( - shredding_state: &'a ShreddingState, +pub(crate) fn follow_shredded_path_element( + shredding_state: &ShreddingState, path_element: &VariantPathElement<'_>, cast_options: &CastOptions, -) -> Result<ShreddedPathStep<'a>> { +) -> Result<ShreddedPathStep> { // If the requested path element is not present in `typed_value`, and `value` is missing, then // we know it does not exist; it, and all paths under it, are all-NULL. let missing_path_step = || { @@ -87,20 +88,20 @@ return Ok(missing_path_step()); }; - let field = field - .as_any() - .downcast_ref::<ShreddedVariantFieldArray>() - .ok_or_else(|| { - // TODO: Should we blow up? Or just end the traversal and let the normal - // variant pathing code sort out the mess that it must anyway be - // prepared to handle? - ArrowError::InvalidArgumentError(format!( - "Expected a ShreddedVariantFieldArray, got {:?} instead", - field.data_type(), - )) - })?; - - Ok(ShreddedPathStep::Success(field.shredding_state())) + let struct_array = field.as_struct_opt().ok_or_else(|| { + // TODO: Should we blow up? Or just end the traversal and let the normal + // variant pathing code sort out the mess that it must anyway be + // prepared to handle? + ArrowError::InvalidArgumentError(format!( + "Expected Struct array while following path, got {}", + field.data_type(), + )) + })?; + + let shredding_state = + ShreddedVariantFieldArray::shredding_state_from_struct_array(struct_array)?; + + Ok(ShreddedPathStep::Success(shredding_state)) } VariantPathElement::Index { .. } => { // TODO: Support array indexing. Among other things, it will require slicing not @@ -154,11 +155,11 @@ fn shredded_get_path( // Peel away the prefix of path elements that traverses the shredded parts of this variant // column. Shredding will traverse the rest of the path on a per-row basis. - let mut shredding_state = input.shredding_state(); + let mut shredding_state = input.shredding_state().clone(); let mut accumulated_nulls = input.inner().nulls().cloned(); let mut path_index = 0; for path_element in path { - match follow_shredded_path_element(shredding_state, path_element, cast_options)? { + match follow_shredded_path_element(&shredding_state, path_element, cast_options)? 
{ ShreddedPathStep::Success(state) => { // Union nulls from the typed_value we just accessed if let Some(typed_value) = shredding_state.typed_value_field() { @@ -199,7 +200,7 @@ fn shredded_get_path( // If our caller did not request any specific type, we can just return whatever we landed on. let Some(as_field) = as_field else { - return Ok(Arc::new(target)); + return Ok(ArrayRef::from(target)); }; // Structs are special. Recurse into each field separately, hoping to follow the shredding even @@ -242,11 +243,7 @@ /// quickly become annoying (and inefficient) to call `variant_get` for each leaf value in a struct or /// list and then try to assemble the results. pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result<ArrayRef> { - let variant_array: &VariantArray = input.as_any().downcast_ref().ok_or_else(|| { - ArrowError::InvalidArgumentError( - "expected a VariantArray as the input for variant_get".to_owned(), - ) - })?; + let variant_array = VariantArray::try_new(input)?; let GetOptions { as_type, @@ -254,7 +251,7 @@ pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result<ArrayRef> { cast_options, } = options; - shredded_get_path(variant_array, &path, as_type.as_deref(), &cast_options) + shredded_get_path(&variant_array, &path, as_type.as_deref(), &cast_options) } /// Controls the action of the variant_get kernel. @@ -303,9 +300,9 @@ mod test { use std::sync::Arc; use arrow::array::{ - Array, ArrayRef, BinaryViewArray, Float16Array, Float32Array, Float64Array, Int16Array, - Int32Array, Int64Array, Int8Array, StringArray, StructArray, UInt16Array, UInt32Array, - UInt64Array, UInt8Array, + Array, ArrayRef, AsArray, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, + Float16Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, + StringArray, StructArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }; use arrow::buffer::NullBuffer; use arrow::compute::CastOptions; @@ -322,8 +319,7 @@ mod test { fn single_variant_get_test(input_json: &str, path: VariantPath, expected_json: &str) { // Create input array from JSON string let input_array_ref: ArrayRef = Arc::new(StringArray::from(vec![Some(input_json)])); - let input_variant_array_ref: ArrayRef = - Arc::new(json_to_variant(&input_array_ref).unwrap()); + let input_variant_array_ref = ArrayRef::from(json_to_variant(&input_array_ref).unwrap()); let result = variant_get(&input_variant_array_ref, GetOptions::new_with_path(path)).unwrap(); @@ -332,7 +328,7 @@ mod test { let expected_array_ref: ArrayRef = Arc::new(StringArray::from(vec![Some(expected_json)])); let expected_variant_array = json_to_variant(&expected_array_ref).unwrap(); - let result_array: &VariantArray = result.as_any().downcast_ref().unwrap(); + let result_array = VariantArray::try_new(&result).unwrap(); assert_eq!( result_array.len(), 1, @@ -408,7 +404,7 @@ mod test { let result = variant_get(&array, options).unwrap(); // expect the result is a VariantArray - let result: &VariantArray = result.as_any().downcast_ref().unwrap(); + let result = VariantArray::try_new(&result).unwrap(); assert_eq!(result.len(), 4); // Expect the values are the same as the original values @@ -487,7 +483,7 @@ mod test { let result = variant_get(&array, options).unwrap(); // expect the result is a VariantArray - let result: &VariantArray = result.as_any().downcast_ref().unwrap(); + let result = VariantArray::try_new(&result).unwrap(); assert_eq!(result.len(), 4); // Expect the values are the same as the original values @@ -504,7 +500,7 @@ mod 
test { let result = variant_get(&array, options).unwrap(); // expect the result is a VariantArray - let result: &VariantArray = result.as_any().downcast_ref().unwrap(); + let result = VariantArray::try_new(&result).unwrap(); assert_eq!(result.len(), 4); // Expect the values are the same as the original values @@ -521,7 +517,7 @@ mod test { let result = variant_get(&array, options).unwrap(); // expect the result is a VariantArray - let result: &VariantArray = result.as_any().downcast_ref().unwrap(); + let result = VariantArray::try_new(&result).unwrap(); assert_eq!(result.len(), 4); // Expect the values are the same as the original values @@ -538,7 +534,7 @@ mod test { let result = variant_get(&array, options).unwrap(); // expect the result is a VariantArray - let result: &VariantArray = result.as_any().downcast_ref().unwrap(); + let result = VariantArray::try_new(&result).unwrap(); assert_eq!(result.len(), 4); // Expect the values are the same as the original values @@ -593,7 +589,7 @@ mod test { let result = variant_get(&array, options).unwrap(); // expect the result is a VariantArray - let result: &VariantArray = result.as_any().downcast_ref().unwrap(); + let result = VariantArray::try_new(&result).unwrap(); assert_eq!(result.len(), 3); // Expect the values are the same as the original values @@ -675,7 +671,7 @@ mod test { let result = variant_get(&array, options).unwrap(); // expect the result is a VariantArray - let result: &VariantArray = result.as_any().downcast_ref().unwrap(); + let result = VariantArray::try_new(&result).unwrap(); assert_eq!(result.len(), 3); // All values should be null @@ -795,10 +791,9 @@ mod test { .with_field("typed_value", Arc::new(typed_value), true) .build(); - Arc::new( - VariantArray::try_new(Arc::new(struct_array)) - .expect("should create variant array"), - ) + VariantArray::try_new(&struct_array) + .expect("should create variant array") + .into() } }; } @@ -926,10 +921,7 @@ mod test { .with_nulls(nulls) .build(); - Arc::new( - VariantArray::try_new(Arc::new(struct_array)) - .expect("should create variant array"), - ) + Arc::new(struct_array) } }; } @@ -1017,7 +1009,7 @@ mod test { None, // row 3 is shredded, so no value ]); - let typed_value = arrow::array::BooleanArray::from(vec![ + let typed_value = BooleanArray::from(vec![ Some(true), // row 0 is shredded, so it has a value None, // row 1 is null, so no value None, // row 2 is a string, so no typed value @@ -1031,9 +1023,7 @@ mod test { .with_nulls(nulls) .build(); - Arc::new( - VariantArray::try_new(Arc::new(struct_array)).expect("should create variant array"), - ) + Arc::new(struct_array) } /// Return a VariantArray that represents a partially "shredded" variant for fixed size binary @@ -1077,7 +1067,7 @@ mod test { false, // row 2 is string true, // row 3 has value ]); - let typed_value = arrow::array::FixedSizeBinaryArray::try_new( + let typed_value = FixedSizeBinaryArray::try_new( 3, // byte width arrow::buffer::Buffer::from(data), Some(typed_value_nulls), @@ -1091,9 +1081,7 @@ mod test { .with_nulls(nulls) .build(); - Arc::new( - VariantArray::try_new(Arc::new(struct_array)).expect("should create variant array"), - ) + Arc::new(struct_array) } /// Return a VariantArray that represents a partially "shredded" variant for UTF8 @@ -1138,9 +1126,7 @@ mod test { .with_nulls(nulls) .build(); - Arc::new( - VariantArray::try_new(Arc::new(struct_array)).expect("should create variant array"), - ) + Arc::new(struct_array) } /// Return a VariantArray that represents a partially "shredded" variant for 
BinaryView @@ -1185,9 +1171,7 @@ mod test { .with_nulls(nulls) .build(); - Arc::new( - VariantArray::try_new(Arc::new(struct_array)).expect("should create variant array"), - ) + Arc::new(struct_array) } /// Return a VariantArray that represents an "all null" variant @@ -1222,9 +1206,7 @@ mod test { .with_nulls(nulls) .build(); - Arc::new( - VariantArray::try_new(Arc::new(struct_array)).expect("should create variant array"), - ) + Arc::new(struct_array) } /// This test manually constructs a shredded variant array representing objects /// like {"x": 1, "y": "foo"} and {"x": 42} and tests extracting the "x" field @@ -1237,7 +1219,7 @@ mod test { let options = GetOptions::new_with_path(VariantPath::from("x")); let result = variant_get(&array, options).unwrap(); - let result_variant: &VariantArray = result.as_any().downcast_ref().unwrap(); + let result_variant = VariantArray::try_new(&result).unwrap(); assert_eq!(result_variant.len(), 2); // Row 0: expect x=1 @@ -1325,7 +1307,7 @@ mod test { )]); let typed_value_struct = StructArray::try_new( typed_value_fields, - vec![Arc::new(x_field_shredded)], + vec![ArrayRef::from(x_field_shredded)], None, // No nulls - both rows have the object structure ) .unwrap(); @@ -1337,7 +1319,7 @@ mod test { .with_field("typed_value", Arc::new(typed_value_struct), true) .build(); - Arc::new(VariantArray::try_new(Arc::new(main_struct)).expect("should create variant array")) + Arc::new(main_struct) } /// Simple test to check if nested paths are supported by current implementation @@ -1580,7 +1562,7 @@ mod test { } } - Arc::new(builder.build()) + ArrayRef::from(builder.build()) } /// Create test data for depth 1 (single nested field) @@ -1610,7 +1592,7 @@ mod test { } } - Arc::new(builder.build()) + ArrayRef::from(builder.build()) } /// Create test data for depth 2 (double nested field) @@ -1651,7 +1633,7 @@ mod test { } } - Arc::new(builder.build()) + ArrayRef::from(builder.build()) } /// Create simple shredded test data for depth 0 using a simplified working pattern @@ -1702,9 +1684,12 @@ mod test { x_field_shredded.data_type().clone(), true, )]); - let typed_value_struct = - StructArray::try_new(typed_value_fields, vec![Arc::new(x_field_shredded)], None) - .unwrap(); + let typed_value_struct = StructArray::try_new( + typed_value_fields, + vec![ArrayRef::from(x_field_shredded)], + None, + ) + .unwrap(); // Build final VariantArray let struct_array = StructArrayBuilder::new() @@ -1713,7 +1698,7 @@ mod test { .with_field("typed_value", Arc::new(typed_value_struct), true) .build(); - Arc::new(VariantArray::try_new(Arc::new(struct_array)).expect("should create VariantArray")) + Arc::new(struct_array) } /// Create working depth 1 shredded test data based on the existing working pattern @@ -1799,8 +1784,12 @@ mod test { .with_field( "typed_value", Arc::new( - StructArray::try_new(a_inner_fields, vec![Arc::new(x_field_shredded)], None) - .unwrap(), + StructArray::try_new( + a_inner_fields, + vec![ArrayRef::from(x_field_shredded)], + None, + ) + .unwrap(), ), true, ) @@ -1815,9 +1804,12 @@ mod test { a_field_shredded.data_type().clone(), true, )]); - let typed_value_struct = - StructArray::try_new(typed_value_fields, vec![Arc::new(a_field_shredded)], None) - .unwrap(); + let typed_value_struct = StructArray::try_new( + typed_value_fields, + vec![ArrayRef::from(a_field_shredded)], + None, + ) + .unwrap(); // Build final VariantArray let struct_array = StructArrayBuilder::new() @@ -1826,7 +1818,7 @@ mod test { .with_field("typed_value", Arc::new(typed_value_struct), true) 
.build(); - Arc::new(VariantArray::try_new(Arc::new(struct_array)).expect("should create VariantArray")) + Arc::new(struct_array) } /// Create working depth 2 shredded test data for "a.b.x" paths @@ -1903,8 +1895,12 @@ mod test { .with_field( "typed_value", Arc::new( - StructArray::try_new(b_inner_fields, vec![Arc::new(x_field_shredded)], None) - .unwrap(), + StructArray::try_new( + b_inner_fields, + vec![ArrayRef::from(x_field_shredded)], + None, + ) + .unwrap(), ), true, ) @@ -1936,8 +1932,12 @@ mod test { .with_field( "typed_value", Arc::new( - StructArray::try_new(a_inner_fields, vec![Arc::new(b_field_shredded)], None) - .unwrap(), + StructArray::try_new( + a_inner_fields, + vec![ArrayRef::from(b_field_shredded)], + None, + ) + .unwrap(), ), true, ) @@ -1952,9 +1952,12 @@ mod test { a_field_shredded.data_type().clone(), true, )]); - let typed_value_struct = - StructArray::try_new(typed_value_fields, vec![Arc::new(a_field_shredded)], None) - .unwrap(); + let typed_value_struct = StructArray::try_new( + typed_value_fields, + vec![ArrayRef::from(a_field_shredded)], + None, + ) + .unwrap(); // Build final VariantArray let struct_array = StructArrayBuilder::new() @@ -1963,7 +1966,7 @@ mod test { .with_field("typed_value", Arc::new(typed_value_struct), true) .build(); - Arc::new(VariantArray::try_new(Arc::new(struct_array)).expect("should create VariantArray")) + Arc::new(struct_array) } #[test] @@ -1984,7 +1987,7 @@ mod test { cast_options: CastOptions::default(), // safe = true }; - let variant_array_ref: Arc<VariantArray> = variant_array.clone(); + let variant_array_ref: Arc<dyn Array> = variant_array.clone(); let result = variant_get(&variant_array_ref, safe_options); // Should succeed and return NULLs (safe behavior) assert!(result.is_ok()); @@ -2041,7 +2044,7 @@ mod test { cast_options: CastOptions::default(), }; - let variant_array_ref: Arc<VariantArray> = variant_array.clone(); + let variant_array_ref: Arc<dyn Array> = variant_array.clone(); let result = variant_get(&variant_array_ref, options).unwrap(); // Verify the result length matches input @@ -2057,10 +2060,7 @@ mod test { ); // Verify the actual values - let int32_result = result - .as_any() - .downcast_ref::<arrow::array::Int32Array>() - .unwrap(); + let int32_result = result.as_any().downcast_ref::<Int32Array>().unwrap(); assert_eq!(int32_result.value(0), 55); // The valid Int32 value } @@ -2100,26 +2100,23 @@ mod test { cast_options: CastOptions::default(), }; - let variant_array_ref: Arc<dyn Array> = Arc::new(variant_array); + let variant_array_ref = ArrayRef::from(variant_array); let result = variant_get(&variant_array_ref, options).unwrap(); // Verify the result is a StructArray - let struct_result = result - .as_any() - .downcast_ref::<arrow::array::StructArray>() - .unwrap(); + let struct_result = result.as_struct(); assert_eq!(struct_result.len(), 3); // Get the individual field arrays let field_a = struct_result .column(0) .as_any() - .downcast_ref::<arrow::array::Int32Array>() + .downcast_ref::<Int32Array>() .unwrap(); let field_b = struct_result .column(1) .as_any() - .downcast_ref::<arrow::array::Int32Array>() + .downcast_ref::<Int32Array>() .unwrap(); // Verify field values and nulls @@ -2181,13 +2178,13 @@ mod test { cast_options: CastOptions::default(), }; - let variant_array_ref: Arc<dyn Array> = Arc::new(variant_array); + let variant_array_ref = ArrayRef::from(variant_array); let result_nullable = variant_get(&variant_array_ref, options_nullable).unwrap(); // Verify we get an Int32Array with nulls for cast failures let int32_result = result_nullable .as_any() - .downcast_ref::<arrow::array::Int32Array>() + .downcast_ref::<Int32Array>() .unwrap(); assert_eq!(int32_result.len(), 9); @@ -2236,11 +2233,11 @@ mod test { // Create variant array again since we 
moved it let variant_array_2 = json_to_variant(&string_array).unwrap(); - let variant_array_ref_2: Arc<dyn Array> = Arc::new(variant_array_2); + let variant_array_ref_2 = ArrayRef::from(variant_array_2); let result_non_nullable = variant_get(&variant_array_ref_2, options_non_nullable).unwrap(); let int32_result_2 = result_non_nullable .as_any() - .downcast_ref::<arrow::array::Int32Array>() + .downcast_ref::<Int32Array>() .unwrap(); // Even with a non-nullable field, safe casting should still produce nulls for failures @@ -2553,7 +2550,7 @@ mod test { cast_options: CastOptions::default(), }; - let variant_array_ref: Arc<dyn Array> = Arc::new(variant_array); + let variant_array_ref = ArrayRef::from(variant_array); let result = variant_get(&variant_array_ref, options); // Should fail with NotYetImplemented when the row builder tries to handle struct type @@ -2617,9 +2614,9 @@ mod test { let typed_value_struct = StructArray::try_new( typed_value_fields, vec![ - Arc::new(a_field_shredded), - Arc::new(b_field_shredded), - Arc::new(c_field_shredded), + ArrayRef::from(a_field_shredded), + ArrayRef::from(b_field_shredded), + ArrayRef::from(c_field_shredded), ], None, ) @@ -2632,7 +2629,7 @@ mod test { .with_nulls(nulls) .build(); - Arc::new(VariantArray::try_new(Arc::new(struct_array)).expect("should create VariantArray")) + Arc::new(struct_array) } /// Create comprehensive nested shredded variant with diverse null patterns @@ -2655,7 +2652,7 @@ mod test { false, // row 3: top-level NULL ]); let outer_typed_value = StructArrayBuilder::new() - .with_field("inner", Arc::new(inner), false) + .with_field("inner", ArrayRef::from(inner), false) .with_nulls(outer_typed_value_nulls) .build(); @@ -2671,7 +2668,7 @@ mod test { false, // row 3: top-level NULL ]); let typed_value = StructArrayBuilder::new() - .with_field("outer", Arc::new(outer), false) + .with_field("outer", ArrayRef::from(outer), false) .with_nulls(typed_value_nulls) .build(); @@ -2690,7 +2687,7 @@ mod test { .with_nulls(nulls) .build(); - Arc::new(VariantArray::try_new(Arc::new(struct_array)).expect("should create VariantArray")) + Arc::new(struct_array) } /// Create variant with mixed shredding (spec-compliant) including null scenarios @@ -2748,7 +2745,7 @@ mod test { // Create main typed_value struct (only contains shredded fields) let typed_value_struct = StructArrayBuilder::new() - .with_field("x", Arc::new(x_field_shredded), false) + .with_field("x", ArrayRef::from(x_field_shredded), false) .build(); // Build VariantArray with both value and typed_value (PartiallyShredded) @@ -2761,6 +2758,6 @@ mod test { .with_nulls(variant_nulls) .build(); - Arc::new(VariantArray::try_new(Arc::new(struct_array)).expect("should create VariantArray")) + Arc::new(struct_array) } } diff --git a/parquet-variant-compute/src/variant_to_arrow.rs b/parquet-variant-compute/src/variant_to_arrow.rs index df9677edfb44..c32fc9b981ea 100644 --- a/parquet-variant-compute/src/variant_to_arrow.rs +++ b/parquet-variant-compute/src/variant_to_arrow.rs @@ -310,11 +310,13 @@ impl VariantToBinaryVariantArrowRowBuilder { } fn finish(mut self) -> Result<ArrayRef> { - Ok(Arc::new(VariantArray::from_parts( + let variant_array = VariantArray::from_parts( self.metadata, Some(self.builder.build()?), None, // no typed_value column self.nulls.finish(), - ))) + ); + + Ok(ArrayRef::from(variant_array)) } } diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 5dbd4b5b39dd..b34928bc1f25 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -129,7 +129,7 @@ encryption = ["dep:ring"] flate2-rust_backened = ["flate2/rust_backend"] flate2-zlib-rs 
= ["flate2/zlib-rs"] # Enable parquet variant support -variant_experimental = ["parquet-variant", "parquet-variant-json", "parquet-variant-compute"] +variant_experimental = ["arrow", "parquet-variant", "parquet-variant-json", "parquet-variant-compute"] [[example]] diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 81765a800edd..fcb4b63fe7c0 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -2295,7 +2295,7 @@ mod tests { let batch = record_reader.next().unwrap().unwrap(); assert_eq!(batch.num_rows(), 1); - let expected_schema = Schema::new(Fields::from(vec![Field::new( + let expected_schema = Schema::new(vec![Field::new( "my_map", ArrowDataType::Map( Arc::new(Field::new( @@ -2309,7 +2309,7 @@ mod tests { false, ), true, - )])); + )]); assert_eq!(batch.schema().as_ref(), &expected_schema); assert_eq!(batch.num_rows(), 1); @@ -3106,11 +3106,11 @@ mod tests { let reader = builder.with_projection(mask).build().unwrap(); - let expected_schema = Schema::new(Fields::from(vec![Field::new( + let expected_schema = Schema::new(vec![Field::new( "group", ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()), true, - )])); + )]); let batch = reader.into_iter().next().unwrap().unwrap(); assert_eq!(batch.schema().as_ref(), &expected_schema); diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs index 16d46bd852dc..ecc80a65904a 100644 --- a/parquet/src/arrow/schema/complex.rs +++ b/parquet/src/arrow/schema/complex.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use std::sync::Arc; +use crate::arrow::schema::extension::add_extension_type; use crate::arrow::schema::primitive::convert_primitive; use crate::arrow::{ProjectionMask, PARQUET_FIELD_ID_META_KEY}; use crate::basic::{ConvertedType, Repetition}; @@ -172,7 +173,7 @@ impl Visitor { let parquet_fields = struct_type.get_fields(); - // Extract the arrow fields + // Extract any arrow fields from the hints let arrow_fields = match &context.data_type { Some(DataType::Struct(fields)) => { if fields.len() != parquet_fields.len() { @@ -220,10 +221,10 @@ impl Visitor { data_type, }; - if let Some(child) = self.dispatch(parquet_field, child_ctx)? { + if let Some(mut child) = self.dispatch(parquet_field, child_ctx)? 
{ // The child type returned may be different from what is encoded in the arrow // schema in the event of a mismatch or a projection - child_fields.push(convert_field(parquet_field, &child, arrow_field)); + child_fields.push(convert_field(parquet_field, &mut child, arrow_field)); children.push(child); } } @@ -352,13 +353,13 @@ impl Visitor { // Need both columns to be projected match (maybe_key, maybe_value) { - (Some(key), Some(value)) => { + (Some(mut key), Some(mut value)) => { let key_field = Arc::new( - convert_field(map_key, &key, arrow_key) + convert_field(map_key, &mut key, arrow_key) // The key is always non-nullable (#5630) .with_nullable(false), ); - let value_field = Arc::new(convert_field(map_value, &value, arrow_value)); + let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value)); let field_metadata = match arrow_map { Some(field) => field.metadata().clone(), _ => HashMap::default(), @@ -495,8 +496,8 @@ impl Visitor { }; match self.dispatch(item_type, new_context) { - Ok(Some(item)) => { - let item_field = Arc::new(convert_field(item_type, &item, arrow_field)); + Ok(Some(mut item)) => { + let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field)); // Use arrow type as hint for index size let arrow_type = match context.data_type { @@ -540,11 +541,15 @@ impl Visitor { } } -/// Computes the [`Field`] for a child column +/// Computes the Arrow [`Field`] for a child column /// -/// The resulting [`Field`] will have the type dictated by `field`, a name +/// The resulting Arrow [`Field`] will have the type dictated by the Parquet `field`, a name /// dictated by the `parquet_type`, and any metadata from `arrow_hint` -fn convert_field(parquet_type: &Type, field: &ParquetField, arrow_hint: Option<&Field>) -> Field { +fn convert_field( + parquet_type: &Type, + field: &mut ParquetField, + arrow_hint: Option<&Field>, +) -> Field { let name = parquet_type.name(); let data_type = field.arrow_type.clone(); let nullable = field.nullable; @@ -575,7 +580,7 @@ fn convert_field(parquet_type: &Type, field: &ParquetField, arrow_hint: Option<& ); ret.set_metadata(meta); } - ret + add_extension_type(ret, parquet_type) } } } diff --git a/parquet/src/arrow/schema/extension.rs b/parquet/src/arrow/schema/extension.rs new file mode 100644 index 000000000000..de70923392ce --- /dev/null +++ b/parquet/src/arrow/schema/extension.rs @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Arrow Extension Type Support for Parquet +//! +//! This module contains mapping code to map Parquet [`LogicalType`]s to/from +//! Arrow [`ExtensionType`]s. +//! +//! Extension types are represented using the metadata from Arrow [`Field`]s +//! with the key "ARROW:extension:name". 
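+//! +//! For example (an illustrative note, not an exhaustive mapping): a column +//! written with the Parquet `Variant` logical type surfaces in Arrow as a +//! [`Field`] whose metadata contains the entry +//! `"ARROW:extension:name" -> "parquet.variant"`, which is exactly what +//! `Field::try_extension_type::<VariantType>` matches on.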
+ +use crate::basic::LogicalType; +use crate::schema::types::Type; +use arrow_schema::extension::ExtensionType; +use arrow_schema::Field; + +/// Adds extension type metadata, if necessary, based on the Parquet field's +/// [`LogicalType`] +/// +/// Some Parquet logical types, such as Variant, do not map directly to an +/// Arrow DataType, and instead are represented by an Arrow ExtensionType. +/// Extension types are attached to Arrow Fields via metadata. +pub(crate) fn add_extension_type(arrow_field: Field, parquet_type: &Type) -> Field { + let result = match parquet_type.get_basic_info().logical_type() { + #[cfg(feature = "variant_experimental")] + Some(LogicalType::Variant) => { + arrow_field.with_extension_type(parquet_variant_compute::VariantType) + } + // TODO add other LogicalTypes here + _ => arrow_field, + }; + result +} + +/// Return the Parquet logical type to use for the specified Arrow field, if any. +#[cfg(feature = "variant_experimental")] +pub(crate) fn logical_type_for_struct(field: &Field) -> Option<LogicalType> { + use parquet_variant_compute::VariantType; + if field.extension_type_name()? == VariantType::NAME { + return Some(LogicalType::Variant); + }; + None +} + +#[cfg(not(feature = "variant_experimental"))] +pub(crate) fn logical_type_for_struct(_field: &Field) -> Option<LogicalType> { + None +} diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 5b079b66276a..9d1098d86ca6 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -35,13 +35,14 @@ use crate::file::{metadata::KeyValue, properties::WriterProperties}; use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type}; mod complex; +mod extension; mod primitive; +use super::PARQUET_FIELD_ID_META_KEY; +use crate::arrow::schema::extension::logical_type_for_struct; use crate::arrow::ProjectionMask; pub(crate) use complex::{ParquetField, ParquetFieldType}; -use super::PARQUET_FIELD_ID_META_KEY; - /// Convert Parquet schema to Arrow schema including optional metadata /// /// Attempts to decode any existing Arrow schema metadata, falling back @@ -63,7 +64,11 @@ pub fn parquet_to_arrow_schema_by_columns( Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0) } -/// Extracts the arrow metadata +/// Determines the Arrow Schema from a Parquet schema +/// +/// Looks for an Arrow schema metadata "hint" (see +/// [`parquet_to_arrow_field_levels`]), and uses it if present to ensure +/// lossless round trips. pub(crate) fn parquet_to_arrow_schema_and_fields( parquet_schema: &SchemaDescriptor, mask: ProjectionMask, @@ -728,6 +733,7 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> { .with_fields(fields) .with_repetition(repetition) .with_id(id) + .with_logical_type(logical_type_for_struct(field)) .build() } DataType::Map(field, _) => { diff --git a/parquet/src/variant.rs b/parquet/src/variant.rs index b5902c02ed8e..5197bf7cf42b 100644 --- a/parquet/src/variant.rs +++ b/parquet/src/variant.rs @@ -25,38 +25,36 @@ //! * [`Variant`] represents variant value, which can be an object, list, or primitive. //! * [`VariantBuilder`] for building `Variant` values. //! * [`VariantArray`] for representing a column of Variant values. -//! * [`compute`] module with functions for manipulating Variants, such as +//! * [`json_to_variant`] and [`variant_to_json`] for converting to/from JSON. +//! * [`cast_to_variant()`] for casting other Arrow arrays to `VariantArray`. +//! * [`VariantType`] Arrow ExtensionType for Parquet Variant logical type. //! * 
[`variant_get`] for extracting a value by path, and functions to convert //! between `Variant` and JSON. //! -//! [Variant Logical Type]: Variant -//! [`VariantArray`]: compute::VariantArray -//! [`variant_get`]: compute::variant_get -//! //! # Example: Writing a Parquet file with Variant column //! ```rust -//! # use parquet::variant::compute::{VariantArray, VariantArrayBuilder}; -//! # use parquet::variant::VariantBuilderExt; +//! # use parquet::variant::{VariantArray, VariantType, VariantArrayBuilder, VariantBuilderExt}; //! # use std::sync::Arc; -//! # use arrow_array::{ArrayRef, RecordBatch}; +//! # use arrow_array::{Array, ArrayRef, RecordBatch}; +//! # use arrow_schema::{DataType, Field, Schema}; //! # use parquet::arrow::ArrowWriter; //! # fn main() -> Result<(), parquet::errors::ParquetError> { //! // Use the VariantArrayBuilder to build a VariantArray //! let mut builder = VariantArrayBuilder::new(3); -//! // row 1: {"name": "Alice"} -//! builder.new_object().with_field("name", "Alice").finish(); +//! builder.new_object().with_field("name", "Alice").finish(); // row 1: {"name": "Alice"} +//! builder.append_value("such wow"); // row 2: "such wow" (a string) //! let array = builder.build(); //! -//! // TODO support writing VariantArray directly -//! // at the moment it panics when trying to downcast to a struct array -//! // https://github.com/apache/arrow-rs/issues/8296 -//! // let array: ArrayRef = Arc::new(array); -//! let array: ArrayRef = Arc::new(array.into_inner()); -//! +//! // Since VariantArray is an ExtensionType, it needs to be converted +//! // to an ArrayRef and Field with the appropriate metadata +//! // before it can be written to a Parquet file +//! let field = array.field("data"); +//! let array = ArrayRef::from(array); //! // create a RecordBatch with the VariantArray -//! let batch = RecordBatch::try_from_iter(vec![("data", array)])?; +//! let schema = Schema::new(vec![field]); +//! let batch = RecordBatch::try_new(Arc::new(schema), vec![array])?; //! -//! // write the RecordBatch to a Parquet file +//! // Now you can write the RecordBatch to the Parquet file, as normal //! let file = std::fs::File::create("variant.parquet")?; //! let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?; //! writer.write(&batch)?; @@ -67,37 +65,29 @@ //! # } //! ``` //! -//! # Example: Writing JSON with a Parquet file with Variant column +//! # Example: Writing JSON into a Parquet file with Variant column //! ```rust //! # use std::sync::Arc; //! # use arrow_array::{ArrayRef, RecordBatch, StringArray}; -//! # use parquet::variant::compute::json_to_variant; -//! # use parquet::variant::compute::VariantArray; +//! # use arrow_schema::Schema; +//! # use parquet::variant::{json_to_variant, VariantArray}; //! # use parquet::arrow::ArrowWriter; //! # fn main() -> Result<(), parquet::errors::ParquetError> { //! // Create an array of JSON strings, simulating a column of JSON data -//! // TODO use StringViewArray when available -//! let input_array = StringArray::from(vec![ +//! let input_array: ArrayRef = Arc::new(StringArray::from(vec![ //! Some(r#"{"name": "Alice", "age": 30}"#), //! Some(r#"{"name": "Bob", "age": 25, "address": {"city": "New York"}}"#), //! None, //! Some("{}"), -//! ]); -//! let input_array: ArrayRef = Arc::new(input_array); +//! ])); //! //! // Convert the JSON strings to a VariantArray //! let array: VariantArray = json_to_variant(&input_array)?; -//! -//! // TODO support writing VariantArray directly -//! 
// at the moment it panics when trying to downcast to a struct array -//! // https://github.com/apache/arrow-rs/issues/8296 -//! // let array: ArrayRef = Arc::new(array); -//! let array: ArrayRef = Arc::new(array.into_inner()); -//! //! // create a RecordBatch with the VariantArray -//! let batch = RecordBatch::try_from_iter(vec![("data", array)])?; +//! let schema = Schema::new(vec![array.field("data")]); +//! let batch = RecordBatch::try_new(Arc::new(schema), vec![ArrayRef::from(array)])?; //! -//! // write the RecordBatch to a Parquet file +//! // write the RecordBatch to a Parquet file as normal //! let file = std::fs::File::create("variant-json.parquet")?; //! let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?; //! writer.write(&batch)?; @@ -108,6 +98,203 @@ //! ``` //! //! # Example: Reading a Parquet file with Variant column -//! (TODO: add example) +//! +//! Use the [`VariantType`] extension type to find the Variant column: +//! +//! ``` +//! # use std::sync::Arc; +//! # use std::path::PathBuf; +//! # use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader}; +//! # use parquet::variant::{Variant, VariantArray, VariantType}; +//! # use parquet::arrow::arrow_reader::ArrowReaderBuilder; +//! # fn main() -> Result<(), parquet::errors::ParquetError> { +//! # use arrow_array::StructArray; +//! # fn file_path() -> PathBuf { // return a testing file path +//! # PathBuf::from(arrow::util::test_util::parquet_test_data()) +//! # .join("..") +//! # .join("shredded_variant") +//! # .join("case-075.parquet") +//! # } +//! // Read the Parquet file using standard Arrow Parquet reader +//! let file = std::fs::File::open(file_path())?; +//! let mut reader = ArrowReaderBuilder::try_new(file)?.build()?; +//! +//! // You can check if a column contains a Variant using +//! // the VariantType extension type +//! let schema = reader.schema(); +//! let field = schema.field_with_name("var")?; +//! assert!(field.try_extension_type::<VariantType>().is_ok()); +//! +//! // The reader will yield RecordBatches with a StructArray +//! // to convert them to VariantArray, use VariantArray::try_new +//! let batch = reader.next().unwrap().unwrap(); +//! let col = batch.column_by_name("var").unwrap(); +//! let var_array = VariantArray::try_new(col)?; +//! assert_eq!(var_array.len(), 1); +//! let var_value: Variant = var_array.value(0); +//! assert_eq!(var_value, Variant::from("iceberg")); // the value in case-075.parquet +//! # Ok(()) +//! # } +//! 
 pub use parquet_variant::*;
-pub use parquet_variant_compute as compute;
+pub use parquet_variant_compute::*;
+
+#[cfg(test)]
+mod tests {
+    use crate::arrow::arrow_reader::ArrowReaderBuilder;
+    use crate::arrow::ArrowWriter;
+    use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
+    use crate::file::reader::ChunkReader;
+    use arrow::util::test_util::parquet_test_data;
+    use arrow_array::{ArrayRef, RecordBatch};
+    use arrow_schema::Schema;
+    use bytes::Bytes;
+    use parquet_variant::{Variant, VariantBuilderExt};
+    use parquet_variant_compute::{VariantArray, VariantArrayBuilder, VariantType};
+    use std::path::PathBuf;
+    use std::sync::Arc;
+
+    #[test]
+    fn roundtrip_basic() {
+        roundtrip(variant_array());
+    }
+
+    /// Ensure a file with Variant LogicalType, written by another writer in
+    /// parquet-testing, can be read as a VariantArray
+    #[test]
+    fn read_logical_type() {
+        // Note: case-075 has 2 columns ("id", "var")
+        // The variant looks like this:
+        // "Variant(metadata=VariantMetadata(dict={}), value=Variant(type=STRING, value=iceberg))"
+        let batch = read_shredded_variant_test_case("case-075.parquet");
+
+        assert_variant_metadata(&batch, "var");
+        let var_column = batch.column_by_name("var").expect("expected var column");
+        let var_array =
+            VariantArray::try_new(&var_column).expect("expected var column to be a VariantArray");
+
+        // verify the value
+        assert_eq!(var_array.len(), 1);
+        assert!(var_array.is_valid(0));
+        let var_value = var_array.value(0);
+        assert_eq!(var_value, Variant::from("iceberg"));
+    }
+
+    /// Writes a variant to a parquet file and ensures the parquet logical type
+    /// annotation is correct
+    #[test]
+    fn write_logical_type() {
+        let array = variant_array();
+        let batch = variant_array_to_batch(array);
+        let buffer = write_to_buffer(&batch);
+
+        // read the parquet file's metadata and verify the logical type
+        let metadata = read_metadata(&Bytes::from(buffer));
+        let schema = metadata.file_metadata().schema_descr();
+        let fields = schema.root_schema().get_fields();
+        assert_eq!(fields.len(), 1);
+        let field = &fields[0];
+        assert_eq!(field.name(), "data");
+        // data should have been written with the Variant logical type
+        assert_eq!(
+            field.get_basic_info().logical_type(),
+            Some(crate::basic::LogicalType::Variant)
+        );
+    }
+
+    /// Return a VariantArray with 3 rows:
+    ///
+    /// 1. `{"name": "Alice"}`
+    /// 2. `"such wow"` (a string)
+    /// 3. `null`
+    fn variant_array() -> VariantArray {
+        let mut builder = VariantArrayBuilder::new(3);
+        // row 1: {"name": "Alice"}
+        builder.new_object().with_field("name", "Alice").finish();
+        // row 2: "such wow" (a string)
+        builder.append_value("such wow");
+        // row 3: null
+        builder.append_null();
+        builder.build()
+    }
+
+    /// Writes a VariantArray to a parquet file and reads it back, verifying that
+    /// the data is the same
+    fn roundtrip(array: VariantArray) {
+        let source_batch = variant_array_to_batch(array);
+        assert_variant_metadata(&source_batch, "data");
+
+        let buffer = write_to_buffer(&source_batch);
+        let result_batch = read_to_batch(Bytes::from(buffer));
+        assert_variant_metadata(&result_batch, "data");
+        assert_eq!(result_batch, source_batch); // NB this also checks the schemas
+    }
+
+    /// Creates a RecordBatch with a single column "data" from a VariantArray
+    fn variant_array_to_batch(array: VariantArray) -> RecordBatch {
+        let field = array.field("data");
+        let schema = Schema::new(vec![field]);
+        RecordBatch::try_new(Arc::new(schema), vec![ArrayRef::from(array)]).unwrap()
+    }
+
+    /// Writes a RecordBatch to an in-memory buffer and returns the buffer
+    fn write_to_buffer(batch: &RecordBatch) -> Vec<u8> {
+        let mut buffer = vec![];
+        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
+        writer.write(batch).unwrap();
+        writer.close().unwrap();
+        buffer
+    }
+
+    /// Reads the Parquet metadata
+    fn read_metadata<T: ChunkReader>(input: &T) -> ParquetMetaData {
+        let mut reader = ParquetMetaDataReader::new();
+        reader.try_parse(input).unwrap();
+        reader.finish().unwrap()
+    }
+
+    /// Reads a RecordBatch from a reader (e.g. Bytes or File)
+    fn read_to_batch<T: ChunkReader + 'static>(reader: T) -> RecordBatch {
+        let reader = ArrowReaderBuilder::try_new(reader)
+            .unwrap()
+            .build()
+            .unwrap();
+        let mut batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>().unwrap();
+        assert_eq!(batches.len(), 1);
+        batches.swap_remove(0)
+    }
+
+    /// Verifies the variant metadata is present in the schema for the specified
+    /// field name.
+    fn assert_variant_metadata(batch: &RecordBatch, field_name: &str) {
+        let schema = batch.schema();
+        let field = schema
+            .field_with_name(field_name)
+            .expect("could not find expected field");
+
+        // explicitly check the metadata so it is clear in the tests what the
+        // names are
+        let metadata_value = field
+            .metadata()
+            .get("ARROW:extension:name")
+            .expect("metadata does not exist");
+
+        assert_eq!(metadata_value, "parquet.variant");
+
+        // verify that `VariantType` also correctly finds the metadata
+        field
+            .try_extension_type::<VariantType>()
+            .expect("VariantType extension should be readable");
+    }
+
+    /// Read the specified test case filename from parquet-testing
+    /// See parquet-testing/shredded_variant/cases.json for more details
+    fn read_shredded_variant_test_case(name: &str) -> RecordBatch {
+        let case_file = PathBuf::from(parquet_test_data())
+            .join("..") // go up from data/ to parquet-testing/
+            .join("shredded_variant")
+            .join(name);
+        let case_file = std::fs::File::open(case_file).unwrap();
+        read_to_batch(case_file)
+    }
+}
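For reference, here is a condensed, user-level version of the write-then-read roundtrip these tests exercise, using an in-memory buffer instead of a file. It is a sketch assuming the APIs shown in this diff, including that `Bytes` implements `ChunkReader`, as `read_to_batch` above relies on:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::Schema;
use bytes::Bytes;
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
use parquet::arrow::ArrowWriter;
use parquet::variant::{Variant, VariantArray, VariantArrayBuilder, VariantBuilderExt};

fn main() -> Result<(), parquet::errors::ParquetError> {
    // Build a one-row VariantArray
    let mut builder = VariantArrayBuilder::new(1);
    builder.append_value("such wow");
    let array = builder.build();

    // Wrap it in a RecordBatch, keeping the extension Field
    let schema = Schema::new(vec![array.field("data")]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![ArrayRef::from(array)])?;

    // Write to an in-memory buffer instead of a file
    let mut buffer = vec![];
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Read it back and recover the variants
    let mut reader = ArrowReaderBuilder::try_new(Bytes::from(buffer))?.build()?;
    let batch = reader.next().unwrap()?;
    let variants = VariantArray::try_new(batch.column(0))?;
    assert_eq!(variants.value(0), Variant::from("such wow"));
    Ok(())
}
```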
diff --git a/parquet/tests/variant_integration.rs b/parquet/tests/variant_integration.rs
index 97fb6b880108..a2ca20cea7af 100644
--- a/parquet/tests/variant_integration.rs
+++ b/parquet/tests/variant_integration.rs
@@ -24,15 +24,12 @@
 //! Inspired by the arrow-go implementation:
 use arrow::util::test_util::parquet_test_data;
-use arrow_array::{Array, ArrayRef};
-use arrow_cast::cast;
-use arrow_schema::{DataType, Fields};
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use parquet_variant::{Variant, VariantMetadata};
 use parquet_variant_compute::VariantArray;
 use serde::Deserialize;
 use std::path::Path;
-use std::sync::{Arc, LazyLock};
+use std::sync::LazyLock;
 use std::{fs, path::PathBuf};
 
 type Result = std::result::Result;
@@ -399,57 +396,12 @@ impl VariantTestCase {
             .column_by_name("var")
             .unwrap_or_else(|| panic!("No 'var' column found in parquet file {path:?}"));
 
-        // the values are read as
-        //  * StructArray<metadata: Binary, value: Binary>
-        // but VariantArray needs them as
-        //  * StructArray<metadata: BinaryView, value: BinaryView>
-        //
-        // So cast them to get the right type. Hack Alert: the parquet reader
-        // should read them directly as BinaryView
-        let var = cast_to_binary_view_arrays(var);
-
         VariantArray::try_new(var).unwrap_or_else(|e| {
             panic!("Error converting StructArray to VariantArray for {path:?}: {e}")
         })
     }
 }
 
-fn cast_to_binary_view_arrays(array: &ArrayRef) -> ArrayRef {
-    let new_type = map_type(array.data_type());
-    cast(array, &new_type).unwrap_or_else(|e| {
-        panic!(
-            "Error casting array from {:?} to {:?}: {e}",
-            array.data_type(),
-            new_type
-        )
-    })
-}
-
-/// replaces all instances of Binary with BinaryView in a DataType
-fn map_type(data_type: &DataType) -> DataType {
-    match data_type {
-        DataType::Binary => DataType::BinaryView,
-        DataType::List(field) => {
-            let new_field = field
-                .as_ref()
-                .clone()
-                .with_data_type(map_type(field.data_type()));
-            DataType::List(Arc::new(new_field))
-        }
-        DataType::Struct(fields) => {
-            let new_fields: Fields = fields
-                .iter()
-                .map(|f| {
-                    let new_field = f.as_ref().clone().with_data_type(map_type(f.data_type()));
-                    Arc::new(new_field)
-                })
-                .collect();
-            DataType::Struct(new_fields)
-        }
-        _ => data_type.clone(),
-    }
-}
-
 /// Variant value loaded from .variant.bin file
 #[derive(Debug, Clone)]
 struct ExpectedVariant {