Skip to content

Commit e8a5173

Browse files
committed
[Variant] Support typed access for timestamp(micro&nano)
1 parent 4da423f commit e8a5173

File tree

3 files changed

+263
-52
lines changed

3 files changed

+263
-52
lines changed

parquet-variant-compute/src/variant_array.rs

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,18 @@
1717

1818
//! [`VariantArray`] implementation
1919
20-
use crate::type_conversion::primitive_conversion_single_value;
20+
use crate::type_conversion::{generic_conversion_single_value, primitive_conversion_single_value};
2121
use arrow::array::{Array, ArrayRef, AsArray, BinaryViewArray, StructArray};
2222
use arrow::buffer::NullBuffer;
2323
use arrow::compute::cast;
2424
use arrow::datatypes::{
2525
Date32Type, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
26-
UInt16Type, UInt32Type, UInt64Type, UInt8Type,
26+
TimestampMicrosecondType, TimestampNanosecondType, UInt16Type, UInt32Type, UInt64Type,
27+
UInt8Type,
2728
};
2829
use arrow_schema::extension::ExtensionType;
29-
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields};
30+
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields, TimeUnit};
31+
use chrono::DateTime;
3032
use parquet_variant::Uuid;
3133
use parquet_variant::Variant;
3234
use std::sync::Arc;
@@ -864,6 +866,51 @@ fn typed_value_to_variant(typed_value: &ArrayRef, index: usize) -> Variant<'_, '
864866
DataType::Float64 => {
865867
primitive_conversion_single_value!(Float64Type, typed_value, index)
866868
}
869+
DataType::Timestamp(timeunit, tz) => {
870+
match (timeunit, tz) {
871+
(TimeUnit::Microsecond, Some(_)) => {
872+
generic_conversion_single_value!(
873+
TimestampMicrosecondType,
874+
as_primitive,
875+
|v| DateTime::from_timestamp_micros(v).unwrap(),
876+
typed_value,
877+
index
878+
)
879+
}
880+
(TimeUnit::Microsecond, None) => {
881+
generic_conversion_single_value!(
882+
TimestampMicrosecondType,
883+
as_primitive,
884+
|v| DateTime::from_timestamp_micros(v).unwrap().naive_utc(),
885+
typed_value,
886+
index
887+
)
888+
}
889+
(TimeUnit::Nanosecond, Some(_)) => {
890+
generic_conversion_single_value!(
891+
TimestampNanosecondType,
892+
as_primitive,
893+
DateTime::from_timestamp_nanos,
894+
typed_value,
895+
index
896+
)
897+
}
898+
(TimeUnit::Nanosecond, None) => {
899+
generic_conversion_single_value!(
900+
TimestampNanosecondType,
901+
as_primitive,
902+
|v| DateTime::from_timestamp_nanos(v).naive_utc(),
903+
typed_value,
904+
index
905+
)
906+
}
907+
// Variant timestamp only support time unit with microsecond or nanosecond precision
908+
_ => panic!(
909+
"Variant only support timestamp with microsecond or nanosecond precision"
910+
),
911+
}
912+
}
913+
867914
// todo other types here (note this is very similar to cast_to_variant.rs)
868915
// so it would be great to figure out how to share this code
869916
_ => {

parquet-variant-compute/src/variant_get.rs

Lines changed: 205 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub(crate) enum ShreddedPathStep {
3434
/// Path step succeeded, return the new shredding state
3535
Success(ShreddingState),
3636
/// The path element is not present in the `typed_value` column and there is no `value` column,
37-
/// so we we know it does not exist. It, and all paths under it, are all-NULL.
37+
/// so we know it does not exist. It, and all paths under it, are all-NULL.
3838
Missing,
3939
/// The path element is not present in the `typed_value` column and must be retrieved from the `value`
4040
/// column instead. The caller should be prepared to handle any value, including the requested
@@ -294,24 +294,18 @@ impl<'a> GetOptions<'a> {
294294

295295
#[cfg(test)]
296296
mod test {
297-
use std::sync::Arc;
298-
299-
use arrow::array::{
300-
Array, ArrayRef, AsArray, BinaryViewArray, Date32Array,
301-
Float16Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
302-
StringArray, StructArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
303-
};
297+
use super::{variant_get, GetOptions};
298+
use crate::json_to_variant;
299+
use crate::variant_array::{ShreddedVariantFieldArray, StructArrayBuilder};
300+
use crate::VariantArray;
301+
use arrow::array::{Array, ArrayRef, AsArray, BinaryViewArray, Date32Array, Float16Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, StringArray, StructArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array};
304302
use arrow::buffer::NullBuffer;
305303
use arrow::compute::CastOptions;
306304
use arrow::datatypes::DataType::{Int16, Int32, Int64, UInt16, UInt32, UInt64, UInt8};
307305
use arrow_schema::{DataType, Field, FieldRef, Fields};
306+
use chrono::DateTime;
308307
use parquet_variant::{Variant, VariantPath, EMPTY_VARIANT_METADATA_BYTES};
309-
310-
use crate::json_to_variant;
311-
use crate::variant_array::{ShreddedVariantFieldArray, StructArrayBuilder};
312-
use crate::VariantArray;
313-
314-
use super::{variant_get, GetOptions};
308+
use std::sync::Arc;
315309

316310
fn single_variant_get_test(input_json: &str, path: VariantPath, expected_json: &str) {
317311
// Create input array from JSON string
@@ -923,6 +917,156 @@ mod test {
923917
f64
924918
);
925919

920+
macro_rules! assert_variant_get_as_variant_array_with_default_option {
921+
($variant_array: expr, $array_expected: expr) => {{
922+
let options = GetOptions::new();
923+
let array = $variant_array;
924+
let result = variant_get(&array, options).unwrap();
925+
926+
// expect the result is a VariantArray
927+
let result = VariantArray::try_new(&result).unwrap();
928+
929+
assert_eq!(result.len(), $array_expected.len());
930+
931+
for (idx, item) in $array_expected.into_iter().enumerate() {
932+
match item {
933+
Some(item) => assert_eq!(result.value(idx), item),
934+
None => assert!(result.is_null(idx)),
935+
}
936+
}
937+
}};
938+
}
939+
940+
partially_shredded_variant_array_gen!(
941+
partially_shredded_timestamp_micro_ntz_variant_array,
942+
|| {
943+
arrow::array::TimestampMicrosecondArray::from(vec![
944+
Some(-456000),
945+
None,
946+
None,
947+
Some(1758602096000000),
948+
])
949+
}
950+
);
951+
952+
#[test]
953+
fn get_variant_partial_shredded_timestamp_micro_ntz_as_variant() {
954+
let array = partially_shredded_timestamp_micro_ntz_variant_array();
955+
assert_variant_get_as_variant_array_with_default_option!(
956+
array,
957+
vec![
958+
Some(Variant::from(
959+
DateTime::from_timestamp_micros(-456000i64)
960+
.unwrap()
961+
.naive_utc(),
962+
)),
963+
None,
964+
Some(Variant::from("n/a")),
965+
Some(Variant::from(
966+
DateTime::parse_from_rfc3339("2025-09-23T12:34:56+08:00")
967+
.unwrap()
968+
.naive_utc(),
969+
)),
970+
]
971+
)
972+
}
973+
974+
partially_shredded_variant_array_gen!(partially_shredded_timestamp_micro_variant_array, || {
975+
arrow::array::TimestampMicrosecondArray::from(vec![
976+
Some(-456000),
977+
None,
978+
None,
979+
Some(1758602096000000),
980+
])
981+
.with_timezone("+00:00")
982+
});
983+
984+
#[test]
985+
fn get_variant_partial_shredded_timestamp_micro_as_variant() {
986+
let array = partially_shredded_timestamp_micro_variant_array();
987+
assert_variant_get_as_variant_array_with_default_option!(
988+
array,
989+
vec![
990+
Some(Variant::from(
991+
DateTime::from_timestamp_micros(-456000i64)
992+
.unwrap()
993+
.to_utc(),
994+
)),
995+
None,
996+
Some(Variant::from("n/a")),
997+
Some(Variant::from(
998+
DateTime::parse_from_rfc3339("2025-09-23T12:34:56+08:00")
999+
.unwrap()
1000+
.to_utc(),
1001+
)),
1002+
]
1003+
)
1004+
}
1005+
1006+
partially_shredded_variant_array_gen!(
1007+
partially_shredded_timestamp_nano_ntz_variant_array,
1008+
|| {
1009+
arrow::array::TimestampNanosecondArray::from(vec![
1010+
Some(-4999999561),
1011+
None,
1012+
None,
1013+
Some(1758602096000000000),
1014+
])
1015+
}
1016+
);
1017+
1018+
#[test]
1019+
fn get_variant_partial_shredded_timestamp_nano_ntz_as_variant() {
1020+
let array = partially_shredded_timestamp_nano_ntz_variant_array();
1021+
1022+
assert_variant_get_as_variant_array_with_default_option!(
1023+
array,
1024+
vec![
1025+
Some(Variant::from(
1026+
DateTime::from_timestamp(-5, 439).unwrap().naive_utc()
1027+
)),
1028+
None,
1029+
Some(Variant::from("n/a")),
1030+
Some(Variant::from(
1031+
DateTime::parse_from_rfc3339("2025-09-23T12:34:56+08:00")
1032+
.unwrap()
1033+
.naive_utc()
1034+
)),
1035+
]
1036+
)
1037+
}
1038+
1039+
partially_shredded_variant_array_gen!(partially_shredded_timestamp_nano_variant_array, || {
1040+
arrow::array::TimestampNanosecondArray::from(vec![
1041+
Some(-4999999561),
1042+
None,
1043+
None,
1044+
Some(1758602096000000000),
1045+
])
1046+
.with_timezone("+00:00")
1047+
});
1048+
1049+
#[test]
1050+
fn get_variant_partial_shredded_timestamp_nano_as_variant() {
1051+
let array = partially_shredded_timestamp_nano_variant_array();
1052+
1053+
assert_variant_get_as_variant_array_with_default_option!(
1054+
array,
1055+
vec![
1056+
Some(Variant::from(
1057+
DateTime::from_timestamp(-5, 439).unwrap().to_utc()
1058+
)),
1059+
None,
1060+
Some(Variant::from("n/a")),
1061+
Some(Variant::from(
1062+
DateTime::parse_from_rfc3339("2025-09-23T12:34:56+08:00")
1063+
.unwrap()
1064+
.to_utc()
1065+
)),
1066+
]
1067+
)
1068+
}
1069+
9261070
/// Return a VariantArray that represents a normal "shredded" variant
9271071
/// for the following example
9281072
///
@@ -957,6 +1101,52 @@ mod test {
9571101
}
9581102
}
9591103

1104+
macro_rules! partially_shredded_variant_array_gen {
1105+
($func:ident, $typed_array_gen: expr) => {
1106+
fn $func() -> ArrayRef {
1107+
// At the time of writing, the `VariantArrayBuilder` does not support shredding.
1108+
// so we must construct the array manually. see https://github.com/apache/arrow-rs/issues/7895
1109+
let (metadata, string_value) = {
1110+
let mut builder = parquet_variant::VariantBuilder::new();
1111+
builder.append_value("n/a");
1112+
builder.finish()
1113+
};
1114+
1115+
let nulls = NullBuffer::from(vec![
1116+
true, // row 0 non null
1117+
false, // row 1 is null
1118+
true, // row 2 non null
1119+
true, // row 3 non null
1120+
]);
1121+
1122+
// metadata is the same for all rows
1123+
let metadata = BinaryViewArray::from_iter_values(std::iter::repeat_n(&metadata, 4));
1124+
1125+
// See https://docs.google.com/document/d/1pw0AWoMQY3SjD7R4LgbPvMjG_xSCtXp3rZHkVp9jpZ4/edit?disco=AAABml8WQrY
1126+
// about why row1 is an empty but non null, value.
1127+
let values = BinaryViewArray::from(vec![
1128+
None, // row 0 is shredded, so no value
1129+
Some(b"" as &[u8]), // row 1 is null, so empty value (why?)
1130+
Some(&string_value), // copy the string value "N/A"
1131+
None, // row 3 is shredded, so no value
1132+
]);
1133+
1134+
let typed_value = $typed_array_gen();
1135+
1136+
let struct_array = StructArrayBuilder::new()
1137+
.with_field("metadata", Arc::new(metadata), false)
1138+
.with_field("typed_value", Arc::new(typed_value), true)
1139+
.with_field("value", Arc::new(values), true)
1140+
.with_nulls(nulls)
1141+
.build();
1142+
1143+
ArrayRef::from(
1144+
VariantArray::try_new(&struct_array).expect("should create variant array"),
1145+
)
1146+
}
1147+
};
1148+
}
1149+
9601150
numeric_partially_shredded_variant_array_fn!(
9611151
partially_shredded_int8_variant_array,
9621152
Int8Array,
@@ -1054,7 +1244,7 @@ mod test {
10541244
None, // row 2 is a string
10551245
Some("world"), // row 3 is shredded
10561246
])
1057-
);
1247+
);
10581248

10591249
/// Return a VariantArray that represents a partially "shredded" variant for Date32
10601250
fn partially_shredded_date32_variant_array() -> ArrayRef {

parquet/tests/variant_integration.rs

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -91,23 +91,10 @@ variant_test_case!(16);
9191
variant_test_case!(17);
9292
variant_test_case!(18);
9393
variant_test_case!(19);
94-
// https://github.com/apache/arrow-rs/issues/8331
95-
variant_test_case!(
96-
20,
97-
"Unsupported typed_value type: Timestamp(Microsecond, Some(\"UTC\"))"
98-
);
99-
variant_test_case!(
100-
21,
101-
"Unsupported typed_value type: Timestamp(Microsecond, Some(\"UTC\"))"
102-
);
103-
variant_test_case!(
104-
22,
105-
"Unsupported typed_value type: Timestamp(Microsecond, None)"
106-
);
107-
variant_test_case!(
108-
23,
109-
"Unsupported typed_value type: Timestamp(Microsecond, None)"
110-
);
94+
variant_test_case!(20);
95+
variant_test_case!(21);
96+
variant_test_case!(22);
97+
variant_test_case!(23);
11198
// https://github.com/apache/arrow-rs/issues/8332
11299
variant_test_case!(24, "Unsupported typed_value type: Decimal128(9, 4)");
113100
variant_test_case!(25, "Unsupported typed_value type: Decimal128(9, 4)");
@@ -119,23 +106,10 @@ variant_test_case!(30);
119106
variant_test_case!(31);
120107
// https://github.com/apache/arrow-rs/issues/8334
121108
variant_test_case!(32, "Unsupported typed_value type: Time64(Microsecond)");
122-
// https://github.com/apache/arrow-rs/issues/8331
123-
variant_test_case!(
124-
33,
125-
"Unsupported typed_value type: Timestamp(Nanosecond, Some(\"UTC\"))"
126-
);
127-
variant_test_case!(
128-
34,
129-
"Unsupported typed_value type: Timestamp(Nanosecond, Some(\"UTC\"))"
130-
);
131-
variant_test_case!(
132-
35,
133-
"Unsupported typed_value type: Timestamp(Nanosecond, None)"
134-
);
135-
variant_test_case!(
136-
36,
137-
"Unsupported typed_value type: Timestamp(Nanosecond, None)"
138-
);
109+
variant_test_case!(33);
110+
variant_test_case!(34);
111+
variant_test_case!(35);
112+
variant_test_case!(36);
139113
variant_test_case!(37);
140114
// https://github.com/apache/arrow-rs/issues/8336
141115
variant_test_case!(38, "Unsupported typed_value type: Struct(");

0 commit comments

Comments
 (0)