@@ -1038,15 +1038,15 @@ impl ArrowColumnWriterFactory {
10381038
10391039 match data_type {
10401040 _ if data_type. is_primitive ( ) => out. push ( col ( leaves. next ( ) . unwrap ( ) ) ?) ,
1041- ArrowDataType :: FixedSizeBinary ( _) | ArrowDataType :: Boolean | ArrowDataType :: Null => out. push ( col ( leaves. next ( ) . unwrap ( ) ) ?) ,
1041+ ArrowDataType :: FixedSizeBinary ( _) | ArrowDataType :: Boolean | ArrowDataType :: Null => {
1042+ out. push ( col ( leaves. next ( ) . unwrap ( ) ) ?)
1043+ }
10421044 ArrowDataType :: LargeBinary
10431045 | ArrowDataType :: Binary
10441046 | ArrowDataType :: Utf8
10451047 | ArrowDataType :: LargeUtf8
10461048 | ArrowDataType :: BinaryView
1047- | ArrowDataType :: Utf8View => {
1048- out. push ( bytes ( leaves. next ( ) . unwrap ( ) ) ?)
1049- }
1049+ | ArrowDataType :: Utf8View => out. push ( bytes ( leaves. next ( ) . unwrap ( ) ) ?) ,
10501050 ArrowDataType :: List ( f)
10511051 | ArrowDataType :: LargeList ( f)
10521052 | ArrowDataType :: FixedSizeList ( f, _) => {
@@ -1063,21 +1063,27 @@ impl ArrowColumnWriterFactory {
10631063 self . get_arrow_column_writer ( f[ 1 ] . data_type ( ) , props, leaves, out) ?
10641064 }
10651065 _ => unreachable ! ( "invalid map type" ) ,
1066- }
1066+ } ,
10671067 ArrowDataType :: Dictionary ( _, value_type) => match value_type. as_ref ( ) {
1068- ArrowDataType :: Utf8 | ArrowDataType :: LargeUtf8 | ArrowDataType :: Binary | ArrowDataType :: LargeBinary => {
1069- out. push ( bytes ( leaves. next ( ) . unwrap ( ) ) ?)
1070- }
1068+ ArrowDataType :: Utf8
1069+ | ArrowDataType :: LargeUtf8
1070+ | ArrowDataType :: Binary
1071+ | ArrowDataType :: LargeBinary => out. push ( bytes ( leaves. next ( ) . unwrap ( ) ) ?) ,
10711072 ArrowDataType :: Utf8View | ArrowDataType :: BinaryView => {
10721073 out. push ( bytes ( leaves. next ( ) . unwrap ( ) ) ?)
10731074 }
1074- ArrowDataType :: FixedSizeBinary ( _) => {
1075- out. push ( bytes ( leaves. next ( ) . unwrap ( ) ) ?)
1076- }
1077- _ => {
1078- out. push ( col ( leaves. next ( ) . unwrap ( ) ) ?)
1079- }
1080- }
1075+ ArrowDataType :: FixedSizeBinary ( _) => out. push ( bytes ( leaves. next ( ) . unwrap ( ) ) ?) ,
1076+ _ => out. push ( col ( leaves. next ( ) . unwrap ( ) ) ?) ,
1077+ } ,
1078+ ArrowDataType :: RunEndEncoded ( _, value_type) => match value_type. data_type ( ) {
1079+ ArrowDataType :: Utf8
1080+ | ArrowDataType :: LargeUtf8
1081+ | ArrowDataType :: Binary
1082+ | ArrowDataType :: LargeBinary => out. push ( bytes ( leaves. next ( ) . unwrap ( ) ) ?) ,
1083+ ArrowDataType :: Utf8View | ArrowDataType :: BinaryView => out. push ( bytes ( leaves. next ( ) . unwrap ( ) ) ?) ,
1084+ ArrowDataType :: FixedSizeBinary ( _) => out. push ( bytes ( leaves. next ( ) . unwrap ( ) ) ?) ,
1085+ _ => out. push ( col ( leaves. next ( ) . unwrap ( ) ) ?) ,
1086+ } ,
10811087 _ => return Err ( ParquetError :: NYI (
10821088 format ! (
10831089 "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
@@ -1171,6 +1177,41 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
11711177 write_primitive ( typed, array. values ( ) , levels)
11721178 }
11731179 } ,
1180+ ArrowDataType :: RunEndEncoded ( _, value_type) => match value_type. data_type ( ) {
1181+ ArrowDataType :: Decimal32 ( _, _) => {
1182+ let array = arrow_cast:: cast ( column, value_type. data_type ( ) ) ?;
1183+ let array = array
1184+ . as_primitive :: < Decimal32Type > ( )
1185+ . unary :: < _ , Int32Type > ( |v| v) ;
1186+ write_primitive ( typed, array. values ( ) , levels)
1187+ }
1188+ ArrowDataType :: Decimal64 ( _, _) => {
1189+ let array = arrow_cast:: cast ( column, value_type. data_type ( ) ) ?;
1190+ let array = array
1191+ . as_primitive :: < Decimal64Type > ( )
1192+ . unary :: < _ , Int32Type > ( |v| v as i32 ) ;
1193+ write_primitive ( typed, array. values ( ) , levels)
1194+ }
1195+ ArrowDataType :: Decimal128 ( _, _) => {
1196+ let array = arrow_cast:: cast ( column, value_type. data_type ( ) ) ?;
1197+ let array = array
1198+ . as_primitive :: < Decimal128Type > ( )
1199+ . unary :: < _ , Int32Type > ( |v| v as i32 ) ;
1200+ write_primitive ( typed, array. values ( ) , levels)
1201+ }
1202+ ArrowDataType :: Decimal256 ( _, _) => {
1203+ let array = arrow_cast:: cast ( column, value_type. data_type ( ) ) ?;
1204+ let array = array
1205+ . as_primitive :: < Decimal256Type > ( )
1206+ . unary :: < _ , Int32Type > ( |v| v. as_i128 ( ) as i32 ) ;
1207+ write_primitive ( typed, array. values ( ) , levels)
1208+ }
1209+ _ => {
1210+ let array = arrow_cast:: cast ( column, & ArrowDataType :: Int32 ) ?;
1211+ let array = array. as_primitive :: < Int32Type > ( ) ;
1212+ write_primitive ( typed, array. values ( ) , levels)
1213+ }
1214+ } ,
11741215 _ => {
11751216 let array = arrow_cast:: cast ( column, & ArrowDataType :: Int32 ) ?;
11761217 let array = array. as_primitive :: < Int32Type > ( ) ;
@@ -1253,6 +1294,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
12531294 write_primitive ( typed, array. values ( ) , levels)
12541295 }
12551296 } ,
1297+ ArrowDataType :: RunEndEncoded ( _run_ends, _values) => {
1298+ return Err ( ParquetError :: NYI (
1299+ "Int64ColumnWriter: Attempting to write an Arrow REE type that is not yet implemented"
1300+ . to_string ( ) ,
1301+ ) ) ;
1302+ }
12561303 _ => {
12571304 let array = arrow_cast:: cast ( column, & ArrowDataType :: Int64 ) ?;
12581305 let array = array. as_primitive :: < Int64Type > ( ) ;
@@ -1329,6 +1376,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
13291376 let array = column. as_primitive :: < Float16Type > ( ) ;
13301377 get_float_16_array_slice ( array, indices)
13311378 }
1379+ ArrowDataType :: RunEndEncoded ( _run_ends, _values) => {
1380+ return Err ( ParquetError :: NYI (
1381+ "FixedLenByteArrayColumnWriter: Attempting to write an Arrow REE type that is not yet implemented"
1382+ . to_string ( ) ,
1383+ ) ) ;
1384+ }
13321385 _ => {
13331386 return Err ( ParquetError :: NYI (
13341387 "Attempting to write an Arrow type that is not yet implemented" . to_string ( ) ,
@@ -4380,4 +4433,68 @@ mod tests {
43804433 assert_eq ! ( get_dict_page_size( col0_meta) , 1024 * 1024 ) ;
43814434 assert_eq ! ( get_dict_page_size( col1_meta) , 1024 * 1024 * 4 ) ;
43824435 }
4436+
#[test]
fn arrow_writer_run_end_encoded_string() {
    // Build a run-end encoded (REE) string column made of two long runs:
    // 100_000 copies of "alpha" followed by 100_000 copies of "beta".
    let mut builder = StringRunBuilder::<Int32Type>::new();
    builder.extend(
        std::iter::repeat(Some("alpha"))
            .take(100000)
            .chain(std::iter::repeat(Some("beta")).take(100000)),
    );
    let run_array: RunArray<Int32Type> = builder.finish();

    let schema = Arc::new(Schema::new(vec![Field::new(
        "ree",
        run_array.data_type().clone(),
        run_array.is_nullable(),
    )]));

    // Round-trip the batch through the Arrow parquet writer into memory.
    let mut parquet_bytes: Vec<u8> = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), None).unwrap();
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let bytes = Bytes::from(parquet_bytes);
    let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();

    // Inspect the column chunk metadata up front: a run array should have
    // been written with dictionary encoding, which implies both the
    // RLE_DICTIONARY encoding flag and a dictionary page offset.
    let metadata = reader.metadata();
    let first_row_group = &metadata.row_groups()[0];
    let column_meta = &first_row_group.columns()[0];
    let has_dict_encoding = column_meta.encodings().contains(&Encoding::RLE_DICTIONARY);
    let has_dict_page = column_meta.dictionary_page_offset().is_some();

    // The reader should surface the column with its original REE schema
    // (Int32 run ends, nullable Utf8 values).
    let expected_schema = Arc::new(Schema::new(vec![Field::new(
        "ree",
        DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", arrow_schema::DataType::Int32, false)),
            Arc::new(Field::new("values", arrow_schema::DataType::Utf8, true)),
        ),
        false,
    )]));
    assert_eq!(&expected_schema, reader.schema());

    // Decode every batch and confirm the row count survives the round trip.
    // 200_000 rows at the default 1024-row batch size yields 196 batches.
    let batches: Vec<_> = reader
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();
    assert_eq!(batches.len(), 196);
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 200000);

    assert!(has_dict_encoding, "RunArray should be dictionary encoded");
    assert!(has_dict_page, "RunArray should have dictionary page");
}
43834500}
0 commit comments