Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet-variant-compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ rust-version = { workspace = true }


[dependencies]
arrow = { workspace = true }
arrow = { workspace = true , features = ["canonical_extension_types"]}
arrow-schema = { workspace = true }
half = { version = "2.1", default-features = false }
indexmap = "2.10.0"
Expand Down
2 changes: 1 addition & 1 deletion parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ encryption = ["dep:ring"]
flate2-rust_backened = ["flate2/rust_backend"]
flate2-zlib-rs = ["flate2/zlib-rs"]
# Enable parquet variant support
variant_experimental = ["parquet-variant", "parquet-variant-json", "parquet-variant-compute"]
variant_experimental = ["arrow", "parquet-variant", "parquet-variant-json", "parquet-variant-compute"]


[[example]]
Expand Down
29 changes: 17 additions & 12 deletions parquet/src/arrow/schema/complex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::arrow::schema::extension::add_extension_type;
use crate::arrow::schema::primitive::convert_primitive;
use crate::arrow::{ProjectionMask, PARQUET_FIELD_ID_META_KEY};
use crate::basic::{ConvertedType, Repetition};
Expand Down Expand Up @@ -172,7 +173,7 @@ impl Visitor {

let parquet_fields = struct_type.get_fields();

// Extract the arrow fields
// Extract any arrow fields from the hints
let arrow_fields = match &context.data_type {
Some(DataType::Struct(fields)) => {
if fields.len() != parquet_fields.len() {
Expand Down Expand Up @@ -220,10 +221,10 @@ impl Visitor {
data_type,
};

if let Some(child) = self.dispatch(parquet_field, child_ctx)? {
if let Some(mut child) = self.dispatch(parquet_field, child_ctx)? {
// The child type returned may be different from what is encoded in the arrow
// schema in the event of a mismatch or a projection
child_fields.push(convert_field(parquet_field, &child, arrow_field));
child_fields.push(convert_field(parquet_field, &mut child, arrow_field));
children.push(child);
}
}
Expand Down Expand Up @@ -352,13 +353,13 @@ impl Visitor {

// Need both columns to be projected
match (maybe_key, maybe_value) {
(Some(key), Some(value)) => {
(Some(mut key), Some(mut value)) => {
let key_field = Arc::new(
convert_field(map_key, &key, arrow_key)
convert_field(map_key, &mut key, arrow_key)
// The key is always non-nullable (#5630)
.with_nullable(false),
);
let value_field = Arc::new(convert_field(map_value, &value, arrow_value));
let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is somewhat confusing to me in that the arrow type is encoded twice -- once on a ParquetField (which has an arrow Field in it) and once in this ValueField.

Any help simplifying this would be most appreciated

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't think I understand this part of the code well enough to suggest anything :(

let field_metadata = match arrow_map {
Some(field) => field.metadata().clone(),
_ => HashMap::default(),
Expand Down Expand Up @@ -495,8 +496,8 @@ impl Visitor {
};

match self.dispatch(item_type, new_context) {
Ok(Some(item)) => {
let item_field = Arc::new(convert_field(item_type, &item, arrow_field));
Ok(Some(mut item)) => {
let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field));

// Use arrow type as hint for index size
let arrow_type = match context.data_type {
Expand Down Expand Up @@ -540,11 +541,15 @@ impl Visitor {
}
}

/// Computes the [`Field`] for a child column
/// Computes the Arrow [`Field`] for a child column
///
/// The resulting [`Field`] will have the type dictated by `field`, a name
/// The resulting Arrow [`Field`] will have the type dictated by the Parquet `field`, a name
/// dictated by the `parquet_type`, and any metadata from `arrow_hint`
fn convert_field(parquet_type: &Type, field: &ParquetField, arrow_hint: Option<&Field>) -> Field {
fn convert_field(
parquet_type: &Type,
field: &mut ParquetField,
arrow_hint: Option<&Field>,
) -> Field {
let name = parquet_type.name();
let data_type = field.arrow_type.clone();
let nullable = field.nullable;
Expand Down Expand Up @@ -575,7 +580,7 @@ fn convert_field(parquet_type: &Type, field: &ParquetField, arrow_hint: Option<&
);
ret.set_metadata(meta);
}
ret
add_extension_type(ret, parquet_type)
}
}
}
Expand Down
68 changes: 68 additions & 0 deletions parquet/src/arrow/schema/extension.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Arrow Extension Type Support for Parquet
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to consolidate the rest of the extension type handling in this module, to try and improve the current situation where #cfg(..) is sprinkled over the type conversion logic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

//!
//! This module contains mapping code to map Parquet [`LogicalType`]s to/from
//! Arrow [`ExtensionType`]s.
//!
//! Extension types are represented using the metadata from Arrow [`Field`]s
//! with the key "ARROW:extension:name".
use crate::basic::LogicalType;
use crate::schema::types::Type;
use arrow_schema::extension::ExtensionType;
use arrow_schema::Field;

/// Adds extension type metadata, if necessary, based on the Parquet field's
/// [`LogicalType`]
///
/// Some Parquet logical types, such as Variant, do not map directly to an
/// Arrow DataType, and instead are represented by an Arrow ExtensionType.
/// Extension types are attached to Arrow Fields via metadata
/// (the `"ARROW:extension:name"` key).
pub(crate) fn add_extension_type(arrow_field: Field, parquet_type: &Type) -> Field {
    // Return the match expression directly rather than binding it to a
    // temporary (`let result = …; result`), per clippy::let_and_return.
    match parquet_type.get_basic_info().logical_type() {
        // Variant has no native Arrow DataType; tag the field with the
        // Variant extension type instead.
        #[cfg(feature = "variant_experimental")]
        Some(LogicalType::Variant) => {
            arrow_field.with_extension_type(parquet_variant_compute::VariantType)
        }
        // TODO add other LogicalTypes here
        _ => arrow_field,
    }
}

/// Return the Parquet logical type to use for the specified Arrow field, if any.
#[cfg(feature = "variant_experimental")]
pub(crate) fn logical_type_for_struct(field: &Field) -> Option<LogicalType> {
    use parquet_variant_compute::VariantType;
    // Cheap name comparison first: only attempt the full extension-type
    // lookup when the name matches, avoiding unnecessary String allocations
    // in ArrowError for non-Variant fields.
    let extension_name = field.extension_type_name()?;
    if extension_name != VariantType::NAME {
        return None;
    }
    // Given the name check above this should not fail, but if it does,
    // ignore the error and report no logical type.
    field
        .try_extension_type::<VariantType>()
        .ok()
        .map(|_| LogicalType::Variant)
}

/// Return the Parquet logical type to use for the specified Arrow field, if any.
///
/// Without the `variant_experimental` feature no extension types map to a
/// Parquet logical type, so this always returns `None`.
#[cfg(not(feature = "variant_experimental"))]
pub(crate) fn logical_type_for_struct(_field: &Field) -> Option<LogicalType> {
    // `_field` (not `field`): silences the unused-variable warning in this
    // feature-gated stub.
    None
}
12 changes: 9 additions & 3 deletions parquet/src/arrow/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ use crate::file::{metadata::KeyValue, properties::WriterProperties};
use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};

mod complex;
mod extension;
mod primitive;

use super::PARQUET_FIELD_ID_META_KEY;
use crate::arrow::schema::extension::logical_type_for_struct;
use crate::arrow::ProjectionMask;
pub(crate) use complex::{ParquetField, ParquetFieldType};

use super::PARQUET_FIELD_ID_META_KEY;

/// Convert Parquet schema to Arrow schema including optional metadata
///
/// Attempts to decode any existing Arrow schema metadata, falling back
Expand All @@ -63,7 +64,11 @@ pub fn parquet_to_arrow_schema_by_columns(
Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
}

/// Extracts the arrow metadata
/// Determines the Arrow Schema from a Parquet schema
///
/// Looks for an Arrow schema metadata "hint" (see
/// [`parquet_to_arrow_field_levels`]), and uses it if present to ensure
/// lossless round trips.
pub(crate) fn parquet_to_arrow_schema_and_fields(
parquet_schema: &SchemaDescriptor,
mask: ProjectionMask,
Expand Down Expand Up @@ -728,6 +733,7 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
.with_fields(fields)
.with_repetition(repetition)
.with_id(id)
.with_logical_type(logical_type_for_struct(field))
.build()
}
DataType::Map(field, _) => {
Expand Down
Loading
Loading