Skip to content

Commit 04b2e37

Browse files
committed
Support reading/writing VariantArray to parquet with Variant LogicalType
1 parent 010d0e7 commit 04b2e37

File tree

6 files changed

+313
-53
lines changed

6 files changed

+313
-53
lines changed

parquet-variant-compute/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ rust-version = { workspace = true }
3131

3232

3333
[dependencies]
34-
arrow = { workspace = true }
34+
arrow = { workspace = true , features = ["canonical_extension_types"]}
3535
arrow-schema = { workspace = true }
3636
half = { version = "2.1", default-features = false }
3737
indexmap = "2.10.0"

parquet/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ encryption = ["dep:ring"]
129129
flate2-rust_backened = ["flate2/rust_backend"]
130130
flate2-zlib-rs = ["flate2/zlib-rs"]
131131
# Enable parquet variant support
132-
variant_experimental = ["parquet-variant", "parquet-variant-json", "parquet-variant-compute"]
132+
variant_experimental = ["arrow", "parquet-variant", "parquet-variant-json", "parquet-variant-compute"]
133133

134134

135135
[[example]]

parquet/src/arrow/schema/complex.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use std::collections::HashMap;
1919
use std::sync::Arc;
2020

21+
use crate::arrow::schema::extension::add_extension_type;
2122
use crate::arrow::schema::primitive::convert_primitive;
2223
use crate::arrow::{ProjectionMask, PARQUET_FIELD_ID_META_KEY};
2324
use crate::basic::{ConvertedType, Repetition};
@@ -172,7 +173,7 @@ impl Visitor {
172173

173174
let parquet_fields = struct_type.get_fields();
174175

175-
// Extract the arrow fields
176+
// Extract any arrow fields from the hints
176177
let arrow_fields = match &context.data_type {
177178
Some(DataType::Struct(fields)) => {
178179
if fields.len() != parquet_fields.len() {
@@ -220,10 +221,10 @@ impl Visitor {
220221
data_type,
221222
};
222223

223-
if let Some(child) = self.dispatch(parquet_field, child_ctx)? {
224+
if let Some(mut child) = self.dispatch(parquet_field, child_ctx)? {
224225
// The child type returned may be different from what is encoded in the arrow
225226
// schema in the event of a mismatch or a projection
226-
child_fields.push(convert_field(parquet_field, &child, arrow_field));
227+
child_fields.push(convert_field(parquet_field, &mut child, arrow_field));
227228
children.push(child);
228229
}
229230
}
@@ -352,13 +353,13 @@ impl Visitor {
352353

353354
// Need both columns to be projected
354355
match (maybe_key, maybe_value) {
355-
(Some(key), Some(value)) => {
356+
(Some(mut key), Some(mut value)) => {
356357
let key_field = Arc::new(
357-
convert_field(map_key, &key, arrow_key)
358+
convert_field(map_key, &mut key, arrow_key)
358359
// The key is always non-nullable (#5630)
359360
.with_nullable(false),
360361
);
361-
let value_field = Arc::new(convert_field(map_value, &value, arrow_value));
362+
let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value));
362363
let field_metadata = match arrow_map {
363364
Some(field) => field.metadata().clone(),
364365
_ => HashMap::default(),
@@ -495,8 +496,8 @@ impl Visitor {
495496
};
496497

497498
match self.dispatch(item_type, new_context) {
498-
Ok(Some(item)) => {
499-
let item_field = Arc::new(convert_field(item_type, &item, arrow_field));
499+
Ok(Some(mut item)) => {
500+
let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field));
500501

501502
// Use arrow type as hint for index size
502503
let arrow_type = match context.data_type {
@@ -540,11 +541,15 @@ impl Visitor {
540541
}
541542
}
542543

543-
/// Computes the [`Field`] for a child column
544+
/// Computes the Arrow [`Field`] for a child column
544545
///
545-
/// The resulting [`Field`] will have the type dictated by `field`, a name
546+
/// The resulting Arrow [`Field`] will have the type dictated by the Parquet `field`, a name
546547
/// dictated by the `parquet_type`, and any metadata from `arrow_hint`
547-
fn convert_field(parquet_type: &Type, field: &ParquetField, arrow_hint: Option<&Field>) -> Field {
548+
fn convert_field(
549+
parquet_type: &Type,
550+
field: &mut ParquetField,
551+
arrow_hint: Option<&Field>,
552+
) -> Field {
548553
let name = parquet_type.name();
549554
let data_type = field.arrow_type.clone();
550555
let nullable = field.nullable;
@@ -575,7 +580,7 @@ fn convert_field(parquet_type: &Type, field: &ParquetField, arrow_hint: Option<&
575580
);
576581
ret.set_metadata(meta);
577582
}
578-
ret
583+
add_extension_type(ret, parquet_type)
579584
}
580585
}
581586
}
parquet/src/arrow/schema/extension.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Arrow Extension Type Support for Parquet
19+
//!
20+
//! This module contains mapping code to map Parquet [`LogicalType`]s to/from
21+
//! Arrow [`ExtensionType`]s.
22+
//!
23+
//! Extension types are represented using the metadata from Arrow [`Field`]s
24+
//! with the key "ARROW:extension:name".
25+
26+
use crate::basic::LogicalType;
27+
use crate::schema::types::Type;
28+
use arrow_schema::extension::ExtensionType;
29+
use arrow_schema::Field;
30+
31+
/// Adds extension type metadata, if necessary, based on the Parquet field's
32+
/// [`LogicalType`]
33+
///
34+
/// Some Parquet logical types, such as Variant, do not map directly to an
35+
/// Arrow DataType, and instead are represented by an Arrow ExtensionType.
36+
/// Extension types are attached to Arrow Fields via metadata.
37+
pub(crate) fn add_extension_type(arrow_field: Field, parquet_type: &Type) -> Field {
38+
let result = match parquet_type.get_basic_info().logical_type() {
39+
#[cfg(feature = "variant_experimental")]
40+
Some(LogicalType::Variant) => {
41+
arrow_field.with_extension_type(parquet_variant_compute::VariantType)
42+
}
43+
// TODO add other LogicalTypes here
44+
_ => arrow_field,
45+
};
46+
result
47+
}
48+
49+
/// Return the Parquet logical type to use for the specified Arrow field, if any.
50+
#[cfg(feature = "variant_experimental")]
51+
pub(crate) fn logical_type_for_struct(field: &Field) -> Option<LogicalType> {
52+
use parquet_variant_compute::VariantType;
53+
if field.extension_type_name()? == VariantType::NAME {
54+
return Some(LogicalType::Variant);
55+
};
56+
None
57+
}
58+
59+
#[cfg(not(feature = "variant_experimental"))]
60+
pub(crate) fn logical_type_for_struct(field: &Field) -> Option<LogicalType> {
61+
None
62+
}

parquet/src/arrow/schema/mod.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,14 @@ use crate::file::{metadata::KeyValue, properties::WriterProperties};
3535
use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
3636

3737
mod complex;
38+
mod extension;
3839
mod primitive;
3940

41+
use super::PARQUET_FIELD_ID_META_KEY;
42+
use crate::arrow::schema::extension::logical_type_for_struct;
4043
use crate::arrow::ProjectionMask;
4144
pub(crate) use complex::{ParquetField, ParquetFieldType};
4245

43-
use super::PARQUET_FIELD_ID_META_KEY;
44-
4546
/// Convert Parquet schema to Arrow schema including optional metadata
4647
///
4748
/// Attempts to decode any existing Arrow schema metadata, falling back
@@ -63,7 +64,11 @@ pub fn parquet_to_arrow_schema_by_columns(
6364
Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
6465
}
6566

66-
/// Extracts the arrow metadata
67+
/// Determines the Arrow Schema from a Parquet schema
68+
///
69+
/// Looks for an Arrow schema metadata "hint" (see
70+
/// [`parquet_to_arrow_field_levels`]), and uses it if present to ensure
71+
/// lossless round trips.
6772
pub(crate) fn parquet_to_arrow_schema_and_fields(
6873
parquet_schema: &SchemaDescriptor,
6974
mask: ProjectionMask,
@@ -728,6 +733,7 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
728733
.with_fields(fields)
729734
.with_repetition(repetition)
730735
.with_id(id)
736+
.with_logical_type(logical_type_for_struct(field))
731737
.build()
732738
}
733739
DataType::Map(field, _) => {

0 commit comments

Comments
 (0)