diff --git a/arrow-pyarrow-integration-testing/src/lib.rs b/arrow-pyarrow-integration-testing/src/lib.rs index af400868ffa9..730409b3777e 100644 --- a/arrow-pyarrow-integration-testing/src/lib.rs +++ b/arrow-pyarrow-integration-testing/src/lib.rs @@ -24,12 +24,12 @@ use arrow::array::new_empty_array; use pyo3::prelude::*; use pyo3::wrap_pyfunction; -use arrow::array::{Array, ArrayData, ArrayRef, Int64Array, make_array}; +use arrow::array::{make_array, Array, ArrayData, ArrayRef, Int64Array}; use arrow::compute::kernels; use arrow::datatypes::{DataType, Field, Schema}; use arrow::error::ArrowError; use arrow::ffi_stream::ArrowArrayStreamReader; -use arrow::pyarrow::{PyArrowConvert, PyArrowException, PyArrowType}; +use arrow::pyarrow::{FromPyArrow, PyArrowException, PyArrowType, ToPyArrow}; use arrow::record_batch::RecordBatch; fn to_py_err(err: ArrowError) -> PyErr { @@ -88,7 +88,8 @@ fn substring( let array = make_array(array.0); // substring - let array = kernels::substring::substring(array.as_ref(), start, None).map_err(to_py_err)?; + let array = + kernels::substring::substring(array.as_ref(), start, None).map_err(to_py_err)?; Ok(array.to_data().into()) } @@ -99,7 +100,8 @@ fn concatenate(array: PyArrowType, py: Python) -> PyResult let array = make_array(array.0); // concat - let array = kernels::concat::concat(&[array.as_ref(), array.as_ref()]).map_err(to_py_err)?; + let array = + kernels::concat::concat(&[array.as_ref(), array.as_ref()]).map_err(to_py_err)?; array.to_data().to_pyarrow(py) } diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 0e358c36a0dc..cfda4c88b4b9 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -37,25 +37,19 @@ //! let reader = Box::new(FileReader::try_new(file).unwrap()); //! //! // export it -//! let stream = Box::new(FFI_ArrowArrayStream::empty()); -//! let stream_ptr = Box::into_raw(stream) as *mut FFI_ArrowArrayStream; -//! unsafe { export_reader_into_raw(reader, stream_ptr) }; +//! let mut stream = FFI_ArrowArrayStream::empty(); +//! unsafe { export_reader_into_raw(reader, &mut stream) }; //! //! // consumed and used by something else... //! //! // import it -//! let stream_reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr).unwrap() }; +//! let stream_reader = unsafe { ArrowArrayStreamReader::from_raw(&mut stream).unwrap() }; //! let imported_schema = stream_reader.schema(); //! //! let mut produced_batches = vec![]; //! for batch in stream_reader { //! produced_batches.push(batch.unwrap()); //! } -//! -//! // (drop/release) -//! unsafe { -//! Box::from_raw(stream_ptr); -//! } //! Ok(()) //! } //! ``` @@ -105,6 +99,8 @@ pub struct FFI_ArrowArrayStream { pub private_data: *mut c_void, } +unsafe impl Send for FFI_ArrowArrayStream {} + // callback used to drop [FFI_ArrowArrayStream] when it is exported. unsafe extern "C" fn release_stream(stream: *mut FFI_ArrowArrayStream) { if stream.is_null() { @@ -231,8 +227,7 @@ impl ExportedArrayStream { let struct_array = StructArray::from(batch); let array = FFI_ArrowArray::new(&struct_array.to_data()); - unsafe { std::ptr::copy(addr_of!(array), out, 1) }; - std::mem::forget(array); + unsafe { std::ptr::write_unaligned(out, array) }; 0 } else { let err = &next_batch.unwrap_err(); @@ -261,24 +256,21 @@ fn get_error_code(err: &ArrowError) -> i32 { /// Struct used to fetch `RecordBatch` from the C Stream Interface. /// Its main responsibility is to expose `RecordBatchReader` functionality /// that requires [FFI_ArrowArrayStream]. 
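The exporter's `get_next` above now hands the produced `FFI_ArrowArray` back through the out-pointer with a single `write_unaligned` instead of the old `ptr::copy` + `mem::forget` pair. A generic sketch of that pattern (the helper name is made up; the real code writes the `FFI_ArrowArray` directly):

use std::ptr;

/// Hands `value` to the caller through an out-pointer, transferring ownership in
/// one step. The previous `ptr::copy` + `mem::forget` pair had the same effect,
/// but only because `forget` stopped the local copy from being dropped a second
/// time; `write_unaligned` makes the move explicit and also tolerates an
/// unaligned pointer coming from foreign code.
unsafe fn move_into_out_ptr<T>(out: *mut T, value: T) {
    ptr::write_unaligned(out, value);
}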
-#[derive(Debug, Clone)] +#[derive(Debug)] pub struct ArrowArrayStreamReader { - stream: Arc, + stream: FFI_ArrowArrayStream, schema: SchemaRef, } /// Gets schema from a raw pointer of `FFI_ArrowArrayStream`. This is used when constructing /// `ArrowArrayStreamReader` to cache schema. fn get_stream_schema(stream_ptr: *mut FFI_ArrowArrayStream) -> Result { - let empty_schema = Arc::new(FFI_ArrowSchema::empty()); - let schema_ptr = Arc::into_raw(empty_schema) as *mut FFI_ArrowSchema; + let mut schema = FFI_ArrowSchema::empty(); - let ret_code = unsafe { (*stream_ptr).get_schema.unwrap()(stream_ptr, schema_ptr) }; - - let ffi_schema = unsafe { Arc::from_raw(schema_ptr) }; + let ret_code = unsafe { (*stream_ptr).get_schema.unwrap()(stream_ptr, &mut schema) }; if ret_code == 0 { - let schema = Schema::try_from(ffi_schema.as_ref()).unwrap(); + let schema = Schema::try_from(&schema).unwrap(); Ok(Arc::new(schema)) } else { Err(ArrowError::CDataInterface(format!( @@ -291,21 +283,16 @@ impl ArrowArrayStreamReader { /// Creates a new `ArrowArrayStreamReader` from a `FFI_ArrowArrayStream`. /// This is used to import from the C Stream Interface. #[allow(dead_code)] - pub fn try_new(stream: FFI_ArrowArrayStream) -> Result { + pub fn try_new(mut stream: FFI_ArrowArrayStream) -> Result { if stream.release.is_none() { return Err(ArrowError::CDataInterface( "input stream is already released".to_string(), )); } - let stream_ptr = Arc::into_raw(Arc::new(stream)) as *mut FFI_ArrowArrayStream; - - let schema = get_stream_schema(stream_ptr)?; + let schema = get_stream_schema(&mut stream)?; - Ok(Self { - stream: unsafe { Arc::from_raw(stream_ptr) }, - schema, - }) + Ok(Self { stream, schema }) } /// Creates a new `ArrowArrayStreamReader` from a raw pointer of `FFI_ArrowArrayStream`. 
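With the `Arc` indirection removed, `ArrowArrayStreamReader` owns its `FFI_ArrowArrayStream` and `try_new` takes the stream by value. Together with the new `unsafe impl Send for FFI_ArrowArrayStream`, this should let the reader be handed to another thread; a rough sketch, assuming the reader's remaining fields stay `Send`:

use arrow::error::ArrowError;
use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
use arrow::record_batch::RecordBatchReader;

fn import_and_consume(reader: Box<dyn RecordBatchReader + Send>) -> Result<(), ArrowError> {
    // Wrap an arrow-rs reader in its C Stream Interface representation...
    let stream = FFI_ArrowArrayStream::new(reader);
    // ...and import it back. No Arc::into_raw / Arc::from_raw round trip is needed.
    let stream_reader = ArrowArrayStreamReader::try_new(stream)?;

    // FFI_ArrowArrayStream is now (unsafely) marked Send, so the owning reader can
    // move across threads.
    std::thread::spawn(move || {
        for batch in stream_reader {
            let _ = batch;
        }
    })
    .join()
    .expect("consumer thread panicked");

    Ok(())
}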
@@ -324,13 +311,12 @@ impl ArrowArrayStreamReader { } /// Get the last error from `ArrowArrayStreamReader` - fn get_stream_last_error(&self) -> Option { + fn get_stream_last_error(&mut self) -> Option { self.stream.get_last_error?; - let stream_ptr = Arc::as_ptr(&self.stream) as *mut FFI_ArrowArrayStream; - let error_str = unsafe { - let c_str = self.stream.get_last_error.unwrap()(stream_ptr) as *mut c_char; + let c_str = + self.stream.get_last_error.unwrap()(&mut self.stream) as *mut c_char; CString::from_raw(c_str).into_string() }; @@ -346,18 +332,14 @@ impl Iterator for ArrowArrayStreamReader { type Item = Result; fn next(&mut self) -> Option { - let stream_ptr = Arc::as_ptr(&self.stream) as *mut FFI_ArrowArrayStream; - - let empty_array = Arc::new(FFI_ArrowArray::empty()); - let array_ptr = Arc::into_raw(empty_array) as *mut FFI_ArrowArray; + let mut array = FFI_ArrowArray::empty(); - let ret_code = unsafe { self.stream.get_next.unwrap()(stream_ptr, array_ptr) }; + let ret_code = + unsafe { self.stream.get_next.unwrap()(&mut self.stream, &mut array) }; if ret_code == 0 { - let ffi_array = unsafe { Arc::from_raw(array_ptr) }; - // The end of stream has been reached - if ffi_array.is_released() { + if array.is_released() { return None; } @@ -365,7 +347,7 @@ impl Iterator for ArrowArrayStreamReader { let schema = FFI_ArrowSchema::try_from(schema_ref.as_ref()).ok()?; let data = ArrowArray { - array: ffi_array, + array: Arc::new(array), schema: Arc::new(schema), } .to_data() @@ -375,8 +357,6 @@ impl Iterator for ArrowArrayStreamReader { Some(Ok(record_batch)) } else { - unsafe { Arc::from_raw(array_ptr) }; - let last_error = self.get_stream_last_error(); let err = ArrowError::CDataInterface(last_error.unwrap()); Some(Err(err)) @@ -451,40 +431,33 @@ mod tests { let reader = TestRecordBatchReader::new(schema.clone(), iter); // Export a `RecordBatchReader` through `FFI_ArrowArrayStream` - let stream = Arc::new(FFI_ArrowArrayStream::empty()); - let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream; - - unsafe { export_reader_into_raw(reader, stream_ptr) }; - - let empty_schema = Arc::new(FFI_ArrowSchema::empty()); - let schema_ptr = Arc::into_raw(empty_schema) as *mut FFI_ArrowSchema; + let mut ffi_stream = FFI_ArrowArrayStream::empty(); + unsafe { export_reader_into_raw(reader, &mut ffi_stream) }; // Get schema from `FFI_ArrowArrayStream` - let ret_code = unsafe { get_schema(stream_ptr, schema_ptr) }; + let mut ffi_schema = FFI_ArrowSchema::empty(); + let ret_code = unsafe { get_schema(&mut ffi_stream, &mut ffi_schema) }; assert_eq!(ret_code, 0); - let ffi_schema = unsafe { Arc::from_raw(schema_ptr) }; - - let exported_schema = Schema::try_from(ffi_schema.as_ref()).unwrap(); + let exported_schema = Schema::try_from(&ffi_schema).unwrap(); assert_eq!(&exported_schema, schema.as_ref()); + let ffi_schema = Arc::new(ffi_schema); + // Get array from `FFI_ArrowArrayStream` let mut produced_batches = vec![]; loop { - let empty_array = Arc::new(FFI_ArrowArray::empty()); - let array_ptr = Arc::into_raw(empty_array.clone()) as *mut FFI_ArrowArray; - - let ret_code = unsafe { get_next(stream_ptr, array_ptr) }; + let mut ffi_array = FFI_ArrowArray::empty(); + let ret_code = unsafe { get_next(&mut ffi_stream, &mut ffi_array) }; assert_eq!(ret_code, 0); // The end of stream has been reached - let ffi_array = unsafe { Arc::from_raw(array_ptr) }; if ffi_array.is_released() { break; } let array = ArrowArray { - array: ffi_array, + array: Arc::new(ffi_array), schema: ffi_schema.clone(), } 
.to_data() @@ -496,7 +469,6 @@ mod tests { assert_eq!(produced_batches, vec![batch.clone(), batch]); - unsafe { Arc::from_raw(stream_ptr) }; Ok(()) } @@ -512,10 +484,8 @@ mod tests { let reader = TestRecordBatchReader::new(schema.clone(), iter); // Import through `FFI_ArrowArrayStream` as `ArrowArrayStreamReader` - let stream = Arc::new(FFI_ArrowArrayStream::new(reader)); - let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream; - let stream_reader = - unsafe { ArrowArrayStreamReader::from_raw(stream_ptr).unwrap() }; + let stream = FFI_ArrowArrayStream::new(reader); + let stream_reader = ArrowArrayStreamReader::try_new(stream).unwrap(); let imported_schema = stream_reader.schema(); assert_eq!(imported_schema, schema); @@ -527,7 +497,6 @@ mod tests { assert_eq!(produced_batches, vec![batch.clone(), batch]); - unsafe { Arc::from_raw(stream_ptr) }; Ok(()) } diff --git a/arrow/src/pyarrow.rs b/arrow/src/pyarrow.rs index 081cc8063366..ba8d606f2e1f 100644 --- a/arrow/src/pyarrow.rs +++ b/arrow/src/pyarrow.rs @@ -15,13 +15,16 @@ // specific language governing permissions and limitations // under the License. -//! This module demonstrates a minimal usage of Rust's C data interface to pass -//! arrays from and to Python. +//! Pass Arrow objects from and to Python, using Arrow's +//! [C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) +//! and [pyo3](https://docs.rs/pyo3/latest/pyo3/). +//! For underlying implementation, see the [ffi] module. use std::convert::{From, TryFrom}; use std::ptr::{addr_of, addr_of_mut}; use std::sync::Arc; +use pyo3::exceptions::PyValueError; use pyo3::ffi::Py_uintptr_t; use pyo3::import_exception; use pyo3::prelude::*; @@ -44,12 +47,27 @@ fn to_py_err(err: ArrowError) -> PyErr { PyArrowException::new_err(err.to_string()) } -pub trait PyArrowConvert: Sized { +pub trait FromPyArrow: Sized { fn from_pyarrow(value: &PyAny) -> PyResult; +} + +/// Create a new PyArrow object from a arrow-rs type. +pub trait ToPyArrow { fn to_pyarrow(&self, py: Python) -> PyResult; } -impl PyArrowConvert for DataType { +/// Convert an arrow-rs type into a PyArrow object. 
+pub trait IntoPyArrow { + fn into_pyarrow(self, py: Python) -> PyResult; +} + +impl IntoPyArrow for T { + fn into_pyarrow(self, py: Python) -> PyResult { + self.to_pyarrow(py) + } +} + +impl FromPyArrow for DataType { fn from_pyarrow(value: &PyAny) -> PyResult { let c_schema = FFI_ArrowSchema::empty(); let c_schema_ptr = &c_schema as *const FFI_ArrowSchema; @@ -57,7 +75,9 @@ impl PyArrowConvert for DataType { let dtype = DataType::try_from(&c_schema).map_err(to_py_err)?; Ok(dtype) } +} +impl ToPyArrow for DataType { fn to_pyarrow(&self, py: Python) -> PyResult { let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?; let c_schema_ptr = &c_schema as *const FFI_ArrowSchema; @@ -69,7 +89,7 @@ impl PyArrowConvert for DataType { } } -impl PyArrowConvert for Field { +impl FromPyArrow for Field { fn from_pyarrow(value: &PyAny) -> PyResult { let c_schema = FFI_ArrowSchema::empty(); let c_schema_ptr = &c_schema as *const FFI_ArrowSchema; @@ -77,7 +97,9 @@ impl PyArrowConvert for Field { let field = Field::try_from(&c_schema).map_err(to_py_err)?; Ok(field) } +} +impl ToPyArrow for Field { fn to_pyarrow(&self, py: Python) -> PyResult { let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?; let c_schema_ptr = &c_schema as *const FFI_ArrowSchema; @@ -89,7 +111,7 @@ impl PyArrowConvert for Field { } } -impl PyArrowConvert for Schema { +impl FromPyArrow for Schema { fn from_pyarrow(value: &PyAny) -> PyResult { let c_schema = FFI_ArrowSchema::empty(); let c_schema_ptr = &c_schema as *const FFI_ArrowSchema; @@ -97,7 +119,9 @@ impl PyArrowConvert for Schema { let schema = Schema::try_from(&c_schema).map_err(to_py_err)?; Ok(schema) } +} +impl ToPyArrow for Schema { fn to_pyarrow(&self, py: Python) -> PyResult { let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?; let c_schema_ptr = &c_schema as *const FFI_ArrowSchema; @@ -109,7 +133,7 @@ impl PyArrowConvert for Schema { } } -impl PyArrowConvert for ArrayData { +impl FromPyArrow for ArrayData { fn from_pyarrow(value: &PyAny) -> PyResult { // prepare a pointer to receive the Array struct let mut array = FFI_ArrowArray::empty(); @@ -131,7 +155,9 @@ impl PyArrowConvert for ArrayData { Ok(data) } +} +impl ToPyArrow for ArrayData { fn to_pyarrow(&self, py: Python) -> PyResult { let array = FFI_ArrowArray::new(self); let schema = FFI_ArrowSchema::try_from(self.data_type()).map_err(to_py_err)?; @@ -149,12 +175,14 @@ impl PyArrowConvert for ArrayData { } } -impl PyArrowConvert for Vec { +impl FromPyArrow for Vec { fn from_pyarrow(value: &PyAny) -> PyResult { let list = value.downcast::()?; - list.iter().map(|x| T::from_pyarrow(&x)).collect() + list.iter().map(|x| T::from_pyarrow(x)).collect() } +} +impl ToPyArrow for Vec { fn to_pyarrow(&self, py: Python) -> PyResult { let values = self .iter() @@ -164,7 +192,7 @@ impl PyArrowConvert for Vec { } } -impl PyArrowConvert for RecordBatch { +impl FromPyArrow for RecordBatch { fn from_pyarrow(value: &PyAny) -> PyResult { // TODO(kszucs): implement the FFI conversions in arrow-rs for RecordBatches let schema = value.getattr("schema")?; @@ -179,7 +207,9 @@ impl PyArrowConvert for RecordBatch { let batch = RecordBatch::try_new(schema, arrays).map_err(to_py_err)?; Ok(batch) } +} +impl ToPyArrow for RecordBatch { fn to_pyarrow(&self, py: Python) -> PyResult { let mut py_arrays = vec![]; @@ -203,38 +233,36 @@ impl PyArrowConvert for RecordBatch { } } -impl PyArrowConvert for ArrowArrayStreamReader { +impl FromPyArrow for ArrowArrayStreamReader { fn from_pyarrow(value: &PyAny) -> PyResult 
{ // prepare a pointer to receive the stream struct - let stream = Box::new(FFI_ArrowArrayStream::empty()); - let stream_ptr = Box::into_raw(stream) as *mut FFI_ArrowArrayStream; + let mut stream = FFI_ArrowArrayStream::empty(); + let stream_ptr = &mut stream as *mut FFI_ArrowArrayStream; // make the conversion through PyArrow's private API // this changes the pointer's memory and is thus unsafe. // In particular, `_export_to_c` can go out of bounds - let args = PyTuple::new(value.py(), &[stream_ptr as Py_uintptr_t]); + let args = PyTuple::new(value.py(), [stream_ptr as Py_uintptr_t]); value.call_method1("_export_to_c", args)?; - let stream_reader = - unsafe { ArrowArrayStreamReader::from_raw(stream_ptr).unwrap() }; - - unsafe { - drop(Box::from_raw(stream_ptr)); - } + let stream_reader = ArrowArrayStreamReader::try_new(stream) + .map_err(|err| PyValueError::new_err(err.to_string()))?; Ok(stream_reader) } +} - fn to_pyarrow(&self, py: Python) -> PyResult { - let stream = Box::new(FFI_ArrowArrayStream::empty()); - let stream_ptr = Box::into_raw(stream) as *mut FFI_ArrowArrayStream; - - unsafe { export_reader_into_raw(Box::new(self.clone()), stream_ptr) }; +impl IntoPyArrow for ArrowArrayStreamReader { + fn into_pyarrow(self, py: Python) -> PyResult { + let mut stream = FFI_ArrowArrayStream::empty(); + unsafe { export_reader_into_raw(Box::new(self), &mut stream) }; + let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream; let module = py.import("pyarrow")?; let class = module.getattr("RecordBatchReader")?; - let args = PyTuple::new(py, &[stream_ptr as Py_uintptr_t]); + let args = PyTuple::new(py, [stream_ptr as Py_uintptr_t]); let reader = class.call_method1("_import_from_c", args)?; + Ok(PyObject::from(reader)) } } @@ -242,21 +270,24 @@ impl PyArrowConvert for ArrowArrayStreamReader { /// A newtype wrapper around a `T: PyArrowConvert` that implements /// [`FromPyObject`] and [`IntoPy`] allowing usage with pyo3 macros #[derive(Debug)] -pub struct PyArrowType(pub T); +pub struct PyArrowType(pub T); -impl<'source, T: PyArrowConvert> FromPyObject<'source> for PyArrowType { +impl<'source, T: FromPyArrow + IntoPyArrow> FromPyObject<'source> for PyArrowType { fn extract(value: &'source PyAny) -> PyResult { Ok(Self(T::from_pyarrow(value)?)) } } -impl<'a, T: PyArrowConvert> IntoPy for PyArrowType { +impl IntoPy for PyArrowType { fn into_py(self, py: Python) -> PyObject { - self.0.to_pyarrow(py).unwrap() + match self.0.into_pyarrow(py) { + Ok(obj) => obj, + Err(err) => err.to_object(py), + } } } -impl From for PyArrowType { +impl From for PyArrowType { fn from(s: T) -> Self { Self(s) } diff --git a/arrow/tests/pyarrow.rs b/arrow/tests/pyarrow.rs index 4b1226c738f5..4b6991da0063 100644 --- a/arrow/tests/pyarrow.rs +++ b/arrow/tests/pyarrow.rs @@ -16,7 +16,7 @@ // under the License. 
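With `PyArrowConvert` split into `FromPyArrow`, `ToPyArrow`, and the blanket `IntoPyArrow` (which consumes `self`, and is what the no-longer-`Clone` `ArrowArrayStreamReader` implements), a pyo3 function keeps working through `PyArrowType` much as before. A minimal sketch mirroring the `substring`/`concatenate` functions in the integration-testing crate (the function name is illustrative):

use arrow::array::{make_array, Array, ArrayData};
use arrow::pyarrow::{PyArrowType, ToPyArrow};
use pyo3::prelude::*;

/// Round-trips an array: pyarrow -> arrow-rs -> pyarrow.
/// `PyArrowType<T>`'s `FromPyObject` impl goes through `FromPyArrow`, while its
/// `IntoPy<PyObject>` impl now calls `IntoPyArrow` instead of unwrapping.
#[pyfunction]
fn round_trip(array: PyArrowType<ArrayData>, py: Python) -> PyResult<PyObject> {
    let array = make_array(array.0);
    array.to_data().to_pyarrow(py)
}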
use arrow::array::{ArrayRef, Int32Array, StringArray}; -use arrow::pyarrow::PyArrowConvert; +use arrow::pyarrow::{FromPyArrow, ToPyArrow}; use arrow::record_batch::RecordBatch; use pyo3::Python; use std::sync::Arc; diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 241a5efe078a..5e0d05e8953c 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -23,8 +23,8 @@ use crate::arrow::array_reader::empty_array::make_empty_array_reader; use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader; use crate::arrow::array_reader::{ make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader, - ListArrayReader, MapArrayReader, NullArrayReader, PrimitiveArrayReader, - RowGroupCollection, StructArrayReader, + FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader, + PrimitiveArrayReader, RowGroupCollection, StructArrayReader, }; use crate::arrow::schema::{ParquetField, ParquetFieldType}; use crate::arrow::ProjectionMask; @@ -63,6 +63,9 @@ fn build_reader( DataType::Struct(_) => build_struct_reader(field, mask, row_groups), DataType::List(_) => build_list_reader(field, mask, false, row_groups), DataType::LargeList(_) => build_list_reader(field, mask, true, row_groups), + DataType::FixedSizeList(_, _) => { + build_fixed_size_list_reader(field, mask, row_groups) + } d => unimplemented!("reading group type {} not implemented", d), }, } @@ -166,6 +169,43 @@ fn build_list_reader( Ok(reader) } +/// Build array reader for fixed-size list type. +fn build_fixed_size_list_reader( + field: &ParquetField, + mask: &ProjectionMask, + row_groups: &dyn RowGroupCollection, +) -> Result>> { + let children = field.children().unwrap(); + assert_eq!(children.len(), 1); + + let reader = match build_reader(&children[0], mask, row_groups)? { + Some(item_reader) => { + let item_type = item_reader.get_data_type().clone(); + let reader = match &field.arrow_type { + &DataType::FixedSizeList(ref f, size) => { + let data_type = DataType::FixedSizeList( + Arc::new(f.as_ref().clone().with_data_type(item_type)), + size, + ); + + Box::new(FixedSizeListArrayReader::new( + item_reader, + size as usize, + data_type, + field.def_level, + field.rep_level, + field.nullable, + )) as _ + } + _ => unimplemented!(), + }; + Some(reader) + } + None => None, + }; + Ok(reader) +} + /// Creates primitive array reader for each primitive type. fn build_primitive_reader( field: &ParquetField, diff --git a/parquet/src/arrow/array_reader/fixed_size_list_array.rs b/parquet/src/arrow/array_reader/fixed_size_list_array.rs new file mode 100644 index 000000000000..4cf68a06601c --- /dev/null +++ b/parquet/src/arrow/array_reader/fixed_size_list_array.rs @@ -0,0 +1,688 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cmp::Ordering; +use std::sync::Arc; + +use crate::arrow::array_reader::ArrayReader; +use crate::errors::ParquetError; +use crate::errors::Result; +use arrow_array::FixedSizeListArray; +use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef}; +use arrow_data::{transform::MutableArrayData, ArrayData}; +use arrow_schema::DataType as ArrowType; + +/// Implementation of fixed-size list array reader. +pub struct FixedSizeListArrayReader { + item_reader: Box, + /// The number of child items in each row of the list array + fixed_size: usize, + data_type: ArrowType, + /// The definition level at which this list is not null + def_level: i16, + /// The repetition level that corresponds to a new value in this array + rep_level: i16, + /// If the list is nullable + nullable: bool, +} + +impl FixedSizeListArrayReader { + /// Construct fixed-size list array reader. + pub fn new( + item_reader: Box, + fixed_size: usize, + data_type: ArrowType, + def_level: i16, + rep_level: i16, + nullable: bool, + ) -> Self { + Self { + item_reader, + fixed_size, + data_type, + def_level, + rep_level, + nullable, + } + } +} + +impl ArrayReader for FixedSizeListArrayReader { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn read_records(&mut self, batch_size: usize) -> Result { + let size = self.item_reader.read_records(batch_size)?; + Ok(size) + } + + fn consume_batch(&mut self) -> Result { + let next_batch_array = self.item_reader.consume_batch()?; + if next_batch_array.len() == 0 { + return Ok(new_empty_array(&self.data_type)); + } + + let def_levels = self + .get_def_levels() + .ok_or_else(|| general_err!("item_reader def levels are None"))?; + let rep_levels = self + .get_rep_levels() + .ok_or_else(|| general_err!("item_reader rep levels are None"))?; + + if !rep_levels.is_empty() && rep_levels[0] != 0 { + // This implies either the source data was invalid, or the leaf column + // reader did not correctly delimit semantic records + return Err(general_err!("first repetition level of batch must be 0")); + } + + let mut validity = self + .nullable + .then(|| BooleanBufferBuilder::new(next_batch_array.len())); + + let data = next_batch_array.to_data(); + let mut child_data_builder = + MutableArrayData::new(vec![&data], true, next_batch_array.len()); + + // The current index into the child array entries + let mut child_idx = 0; + // The total number of rows (valid and invalid) in the list array + let mut list_len = 0; + // Start of the current run of valid values + let mut start_idx = None; + let mut row_len = 0; + + def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| { + match r.cmp(&self.rep_level) { + Ordering::Greater => { + // Repetition level greater than current => already handled by inner array + if *d < self.def_level { + return Err(general_err!( + "Encountered repetition level too large for definition level" + )); + } + } + Ordering::Equal => { + // Item inside of the current list + child_idx += 1; + row_len += 1; + } + Ordering::Less => { + // Start of new list row + list_len += 1; + + // Length of the previous row should be equal to: + // - the list's fixed size (valid entries) + // - zero (null entries, start of array) + // Any other length indicates invalid data + if start_idx.is_some() && row_len != self.fixed_size { + return Err(general_err!( + "Encountered misaligned row with length 
{} (expected length {})", + row_len, + self.fixed_size + )) + } + row_len = 0; + + if *d >= self.def_level { + row_len += 1; + + // Valid list entry + if let Some(validity) = validity.as_mut() { + validity.append(true); + } + // Start a run of valid rows if not already inside of one + start_idx.get_or_insert(child_idx); + } else { + // Null list entry + + if let Some(start) = start_idx.take() { + // Flush pending child items + child_data_builder.extend(0, start, child_idx); + } + // Pad list with nulls + child_data_builder.extend_nulls(self.fixed_size); + + if let Some(validity) = validity.as_mut() { + // Valid if empty list + validity.append(*d + 1 == self.def_level); + } + } + child_idx += 1; + } + } + Ok(()) + })?; + + let child_data = match start_idx { + Some(0) => { + // No null entries - can reuse original array + next_batch_array.to_data() + } + Some(start) => { + // Flush pending child items + child_data_builder.extend(0, start, child_idx); + child_data_builder.freeze() + } + None => child_data_builder.freeze(), + }; + + // Verify total number of elements is aligned with fixed list size + if list_len * self.fixed_size != child_data.len() { + return Err(general_err!( + "fixed-size list length must be a multiple of {} but array contains {} elements", + self.fixed_size, + child_data.len() + )); + } + + let mut list_builder = ArrayData::builder(self.get_data_type().clone()) + .len(list_len) + .add_child_data(child_data); + + if let Some(builder) = validity { + list_builder = list_builder.null_bit_buffer(Some(builder.into())); + } + + let list_data = unsafe { list_builder.build_unchecked() }; + + let result_array = FixedSizeListArray::from(list_data); + Ok(Arc::new(result_array)) + } + + fn skip_records(&mut self, num_records: usize) -> Result { + self.item_reader.skip_records(num_records) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.item_reader.get_def_levels() + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.item_reader.get_rep_levels() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::{ + array_reader::{test_util::InMemoryArrayReader, ListArrayReader}, + arrow_reader::{ + ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader, + }, + ArrowWriter, + }; + use arrow::datatypes::{Field, Int32Type}; + use arrow_array::{ + builder::{FixedSizeListBuilder, Int32Builder, ListBuilder}, + cast::AsArray, + FixedSizeListArray, ListArray, PrimitiveArray, RecordBatch, + }; + use arrow_buffer::Buffer; + use arrow_data::ArrayDataBuilder; + use arrow_schema::Schema; + use bytes::Bytes; + + #[test] + fn test_nullable_list() { + // [null, [1, null, 2], null, [3, 4, 5], [null, null, null]] + let expected = FixedSizeListArray::from_iter_primitive::( + vec![ + None, + Some([Some(1), None, Some(2)]), + None, + Some([Some(3), Some(4), Some(5)]), + Some([None, None, None]), + ], + 3, + ); + + let array = Arc::new(PrimitiveArray::::from(vec![ + None, + Some(1), + None, + Some(2), + None, + Some(3), + Some(4), + Some(5), + None, + None, + None, + ])); + let item_array_reader = InMemoryArrayReader::new( + ArrowType::Int32, + array, + Some(vec![0, 3, 2, 3, 0, 3, 3, 3, 2, 2, 2]), + Some(vec![0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1]), + ); + + let mut list_array_reader = FixedSizeListArrayReader::new( + Box::new(item_array_reader), + 3, + ArrowType::FixedSizeList( + Arc::new(Field::new("item", ArrowType::Int32, true)), + 3, + ), + 2, + 1, + true, + ); + let actual = list_array_reader.next_batch(1024).unwrap(); + let actual = actual + .as_any() + .downcast_ref::() + 
.unwrap(); + assert_eq!(&expected, actual) + } + + #[test] + fn test_required_list() { + // [[1, null], [2, 3], [null, null], [4, 5]] + let expected = FixedSizeListArray::from_iter_primitive::( + vec![ + Some([Some(1), None]), + Some([Some(2), Some(3)]), + Some([None, None]), + Some([Some(4), Some(5)]), + ], + 2, + ); + + let array = Arc::new(PrimitiveArray::::from(vec![ + Some(1), + None, + Some(2), + Some(3), + None, + None, + Some(4), + Some(5), + ])); + let item_array_reader = InMemoryArrayReader::new( + ArrowType::Int32, + array, + Some(vec![2, 1, 2, 2, 1, 1, 2, 2]), + Some(vec![0, 1, 0, 1, 0, 1, 0, 1]), + ); + + let mut list_array_reader = FixedSizeListArrayReader::new( + Box::new(item_array_reader), + 2, + ArrowType::FixedSizeList( + Arc::new(Field::new("item", ArrowType::Int32, true)), + 2, + ), + 1, + 1, + false, + ); + let actual = list_array_reader.next_batch(1024).unwrap(); + let actual = actual + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(&expected, actual) + } + + #[test] + fn test_nested_list() { + // [ + // null, + // [[1, 2]], + // [[null, 3]], + // null, + // [[4, 5]], + // [[null, null]], + // ] + let l2_type = ArrowType::FixedSizeList( + Arc::new(Field::new("item", ArrowType::Int32, true)), + 2, + ); + let l1_type = ArrowType::FixedSizeList( + Arc::new(Field::new("item", l2_type.clone(), false)), + 1, + ); + + let array = PrimitiveArray::::from(vec![ + None, + None, + Some(1), + Some(2), + None, + Some(3), + None, + None, + Some(4), + Some(5), + None, + None, + ]); + + let l2 = ArrayDataBuilder::new(l2_type.clone()) + .len(6) + .add_child_data(array.into_data()) + .build() + .unwrap(); + + let l1 = ArrayDataBuilder::new(l1_type.clone()) + .len(6) + .add_child_data(l2) + .null_bit_buffer(Some(Buffer::from([0b110110]))) + .build() + .unwrap(); + + let expected = FixedSizeListArray::from(l1); + + let values = Arc::new(PrimitiveArray::::from(vec![ + None, + Some(1), + Some(2), + None, + Some(3), + None, + Some(4), + Some(5), + None, + None, + ])); + + let item_array_reader = InMemoryArrayReader::new( + ArrowType::Int32, + values, + Some(vec![0, 5, 5, 4, 5, 0, 5, 5, 4, 4]), + Some(vec![0, 0, 2, 0, 2, 0, 0, 2, 0, 2]), + ); + + let l2 = FixedSizeListArrayReader::new( + Box::new(item_array_reader), + 2, + l2_type, + 4, + 2, + false, + ); + let mut l1 = FixedSizeListArrayReader::new(Box::new(l2), 1, l1_type, 3, 1, true); + + let expected_1 = expected.slice(0, 2); + let expected_2 = expected.slice(2, 4); + + let actual = l1.next_batch(2).unwrap(); + assert_eq!(actual.as_ref(), &expected_1); + + let actual = l1.next_batch(1024).unwrap(); + assert_eq!(actual.as_ref(), &expected_2); + } + + #[test] + fn test_empty_list() { + // [null, [], null, []] + let expected = FixedSizeListArray::from_iter_primitive::( + vec![None, Some([]), None, Some([])], + 0, + ); + + let array = Arc::new(PrimitiveArray::::from(vec![ + None, None, None, None, + ])); + let item_array_reader = InMemoryArrayReader::new( + ArrowType::Int32, + array, + Some(vec![0, 1, 0, 1]), + Some(vec![0, 0, 0, 0]), + ); + + let mut list_array_reader = FixedSizeListArrayReader::new( + Box::new(item_array_reader), + 0, + ArrowType::FixedSizeList( + Arc::new(Field::new("item", ArrowType::Int32, true)), + 0, + ), + 2, + 1, + true, + ); + let actual = list_array_reader.next_batch(1024).unwrap(); + let actual = actual + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(&expected, actual) + } + + #[test] + fn test_nested_var_list() { + // [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null] + let mut 
builder = + FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2); + builder.values().append_value([Some(1), None, Some(3)]); + builder.values().append_null(); + builder.append(true); + builder.values().append_value([Some(4)]); + builder.values().append_value([]); + builder.append(true); + builder.values().append_value([Some(5), Some(6)]); + builder.values().append_value([None, None]); + builder.append(true); + builder.values().append_null(); + builder.values().append_null(); + builder.append(false); + let expected = builder.finish(); + + let array = Arc::new(PrimitiveArray::::from(vec![ + Some(1), + None, + Some(3), + None, + Some(4), + None, + Some(5), + Some(6), + None, + None, + None, + ])); + + let inner_type = + ArrowType::List(Arc::new(Field::new("item", ArrowType::Int32, true))); + let list_type = ArrowType::FixedSizeList( + Arc::new(Field::new("item", inner_type.clone(), true)), + 2, + ); + + let item_array_reader = InMemoryArrayReader::new( + ArrowType::Int32, + array, + Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]), + Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]), + ); + + let inner_array_reader = ListArrayReader::::new( + Box::new(item_array_reader), + inner_type, + 4, + 2, + true, + ); + + let mut list_array_reader = FixedSizeListArrayReader::new( + Box::new(inner_array_reader), + 2, + list_type, + 2, + 1, + true, + ); + let actual = list_array_reader.next_batch(1024).unwrap(); + let actual = actual + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(&expected, actual) + } + + #[test] + fn test_read_list_column() { + // This test writes a Parquet file containing a fixed-length array column and a primitive column, + // then reads the columns back from the file. + + // [ + // [1, 2, 3, null], + // [5, 6, 7, 8], + // null, + // [9, null, 11, 12], + // ] + let list = FixedSizeListArray::from_iter_primitive::( + vec![ + Some(vec![Some(1), Some(2), Some(3), None]), + Some(vec![Some(5), Some(6), Some(7), Some(8)]), + None, + Some(vec![Some(9), None, Some(11), Some(12)]), + Some(vec![None, None, None, None]), + ], + 4, + ); + + // [null, 2, 3, null, 5] + let primitive = PrimitiveArray::::from_iter(vec![ + None, + Some(2), + Some(3), + None, + Some(5), + ]); + + let schema = Arc::new(Schema::new(vec![ + Field::new( + "list", + ArrowType::FixedSizeList( + Arc::new(Field::new("item", ArrowType::Int32, true)), + 4, + ), + true, + ), + Field::new("primitive", ArrowType::Int32, true), + ])); + + // Create record batch with a fixed-length array column and a primitive column + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(list.clone()), Arc::new(primitive.clone())], + ) + .expect("unable to create record batch"); + + // Write record batch to Parquet + let mut buffer = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None) + .expect("unable to create parquet writer"); + writer.write(&batch).expect("unable to write record batch"); + writer.close().expect("unable to close parquet writer"); + + // Read record batch from Parquet + let reader = Bytes::from(buffer); + let mut batch_reader = ParquetRecordBatchReader::try_new(reader, 1024) + .expect("unable to create parquet reader"); + let actual = batch_reader + .next() + .expect("missing record batch") + .expect("unable to read record batch"); + + // Verify values of both read columns match + assert_eq!(schema, actual.schema()); + let actual_list = actual + .column(0) + .as_any() + .downcast_ref::() + .expect("unable to cast array to FixedSizeListArray"); + let 
actual_primitive = actual.column(1).as_primitive::(); + assert_eq!(actual_list, &list); + assert_eq!(actual_primitive, &primitive); + } + + #[test] + fn test_read_as_dyn_list() { + // This test verifies that fixed-size list arrays can be read from Parquet + // as variable-length list arrays. + + // [ + // [1, 2, 3, null], + // [5, 6, 7, 8], + // null, + // [9, null, 11, 12], + // ] + let list = FixedSizeListArray::from_iter_primitive::( + vec![ + Some(vec![Some(1), Some(2), Some(3), None]), + Some(vec![Some(5), Some(6), Some(7), Some(8)]), + None, + Some(vec![Some(9), None, Some(11), Some(12)]), + Some(vec![None, None, None, None]), + ], + 4, + ); + + let schema = Arc::new(Schema::new(vec![Field::new( + "list", + ArrowType::FixedSizeList( + Arc::new(Field::new("item", ArrowType::Int32, true)), + 4, + ), + true, + )])); + + // Create record batch with a single fixed-length array column + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list)]).unwrap(); + + // Write record batch to Parquet + let mut buffer = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut buffer, schema, None) + .expect("unable to create parquet writer"); + writer.write(&batch).expect("unable to write record batch"); + writer.close().expect("unable to close parquet writer"); + + // Read record batch from Parquet - ignoring arrow metadata + let reader = Bytes::from(buffer); + let mut batch_reader = ArrowReaderBuilder::try_new_with_options( + reader, + ArrowReaderOptions::new().with_skip_arrow_metadata(true), + ) + .expect("unable to create reader builder") + .build() + .expect("unable to create parquet reader"); + let actual = batch_reader + .next() + .expect("missing record batch") + .expect("unable to read record batch"); + + // Verify the read column is a variable length list with values that match the input + let col = actual.column(0).as_list::(); + let expected = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2), Some(3), None]), + Some(vec![Some(5), Some(6), Some(7), Some(8)]), + None, + Some(vec![Some(9), None, Some(11), Some(12)]), + Some(vec![None, None, None, None]), + ]); + assert_eq!(col, &expected); + } +} diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index f46f6073a714..823084b43207 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -35,6 +35,7 @@ mod byte_array; mod byte_array_dictionary; mod empty_array; mod fixed_len_byte_array; +mod fixed_size_list_array; mod list_array; mod map_array; mod null_array; @@ -48,6 +49,7 @@ pub use builder::build_array_reader; pub use byte_array::make_byte_array_reader; pub use byte_array_dictionary::make_byte_array_dictionary_reader; pub use fixed_len_byte_array::make_fixed_len_byte_array_reader; +pub use fixed_size_list_array::FixedSizeListArrayReader; pub use list_array::ListArrayReader; pub use map_array::MapArrayReader; pub use null_array::NullArrayReader; diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index c69fa420d564..4b14a54c531b 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1746,11 +1746,10 @@ mod tests { { // Write using low-level parquet API (#1167) - let writer_props = Arc::new(WriterProperties::builder().build()); let mut writer = SerializedFileWriter::new( file.try_clone().unwrap(), schema, - writer_props, + Default::default(), ) .unwrap(); @@ -2288,7 +2287,7 @@ mod tests { } "; let schema = 
Arc::new(parse_message_type(MESSAGE_TYPE).unwrap()); - let props = Arc::new(WriterProperties::builder().build()); + let props = Default::default(); let mut buf = Vec::with_capacity(1024); let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap(); diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 24dae4f20d64..6dbc83dd05c4 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -15,25 +15,21 @@ // specific language governing permissions and limitations // under the License. -use crate::arrow::arrow_writer::levels::LevelInfo; use crate::basic::Encoding; use crate::bloom_filter::Sbbf; -use crate::column::page::PageWriter; use crate::column::writer::encoder::{ ColumnValueEncoder, DataPageValues, DictionaryPage, }; -use crate::column::writer::GenericColumnWriter; use crate::data_type::{AsBytes, ByteArray, Int32Type}; use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder}; use crate::encodings::rle::RleEncoder; use crate::errors::{ParquetError, Result}; -use crate::file::properties::{WriterProperties, WriterPropertiesPtr, WriterVersion}; -use crate::file::writer::OnCloseColumnChunk; +use crate::file::properties::{WriterProperties, WriterVersion}; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::num_required_bits; use crate::util::interner::{Interner, Storage}; use arrow_array::{ - Array, ArrayAccessor, ArrayRef, BinaryArray, DictionaryArray, LargeBinaryArray, + Array, ArrayAccessor, BinaryArray, DictionaryArray, LargeBinaryArray, LargeStringArray, StringArray, }; use arrow_schema::DataType; @@ -94,49 +90,6 @@ macro_rules! downcast_op { }; } -/// A writer for byte array types -pub(super) struct ByteArrayWriter<'a> { - writer: GenericColumnWriter<'a, ByteArrayEncoder>, - on_close: Option>, -} - -impl<'a> ByteArrayWriter<'a> { - /// Returns a new [`ByteArrayWriter`] - pub fn new( - descr: ColumnDescPtr, - props: &'a WriterPropertiesPtr, - page_writer: Box, - on_close: OnCloseColumnChunk<'a>, - ) -> Result { - Ok(Self { - writer: GenericColumnWriter::new(descr, props.clone(), page_writer), - on_close: Some(on_close), - }) - } - - pub fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> { - self.writer.write_batch_internal( - array, - Some(levels.non_null_indices()), - levels.def_levels(), - levels.rep_levels(), - None, - None, - None, - )?; - Ok(()) - } - - pub fn close(self) -> Result<()> { - let r = self.writer.close()?; - - if let Some(on_close) = self.on_close { - on_close(r)?; - } - Ok(()) - } -} - /// A fallback encoder, i.e. 
non-dictionary, for [`ByteArray`] struct FallbackEncoder { encoder: FallbackEncoderImpl, @@ -427,7 +380,7 @@ impl DictEncoder { } } -struct ByteArrayEncoder { +pub struct ByteArrayEncoder { fallback: FallbackEncoder, dict_encoder: Option, min_value: Option, @@ -437,11 +390,11 @@ struct ByteArrayEncoder { impl ColumnValueEncoder for ByteArrayEncoder { type T = ByteArray; - type Values = ArrayRef; + type Values = dyn Array; fn min_max( &self, - values: &ArrayRef, + values: &dyn Array, value_indices: Option<&[usize]>, ) -> Option<(Self::T, Self::T)> { match value_indices { diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs index fc5b9460322a..21b3e7dff88d 100644 --- a/parquet/src/arrow/arrow_writer/levels.rs +++ b/parquet/src/arrow/arrow_writer/levels.rs @@ -42,7 +42,7 @@ use crate::errors::{ParquetError, Result}; use arrow_array::cast::AsArray; -use arrow_array::{Array, ArrayRef, OffsetSizeTrait, StructArray}; +use arrow_array::{Array, ArrayRef, FixedSizeListArray, OffsetSizeTrait, StructArray}; use arrow_buffer::NullBuffer; use arrow_schema::{DataType, Field}; use std::ops::Range; @@ -144,7 +144,8 @@ impl LevelInfoBuilder { } DataType::List(child) | DataType::LargeList(child) - | DataType::Map(child, _) => { + | DataType::Map(child, _) + | DataType::FixedSizeList(child, _) => { let def_level = match field.is_nullable() { true => parent_ctx.def_level + 2, false => parent_ctx.def_level + 1, @@ -214,6 +215,19 @@ impl LevelInfoBuilder { range, ) } + &DataType::FixedSizeList(_, size) => { + let array = array + .as_any() + .downcast_ref::() + .expect("unable to get fixed-size list array"); + + self.write_fixed_size_list( + size as usize, + array.nulls(), + array.values(), + range, + ) + } _ => unreachable!(), } } @@ -371,6 +385,100 @@ impl LevelInfoBuilder { } } + /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`. + fn write_fixed_size_list( + &mut self, + fixed_size: usize, + nulls: Option<&NullBuffer>, + values: &dyn Array, + range: Range, + ) { + let (child, ctx) = match self { + Self::List(child, ctx) => (child, ctx), + _ => unreachable!(), + }; + + let write_non_null = + |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| { + let values_start = start_idx * fixed_size; + let values_end = end_idx * fixed_size; + child.write(values, values_start..values_end); + + child.visit_leaves(|leaf| { + let rep_levels = leaf.rep_levels.as_mut().unwrap(); + + let row_indices = (0..fixed_size) + .rev() + .cycle() + .take(values_end - values_start); + + // Step backward over the child rep levels and mark the start of each list + rep_levels + .iter_mut() + .rev() + // Filter out reps from nested children + .filter(|&&mut r| r == ctx.rep_level) + .zip(row_indices) + .for_each(|(r, idx)| { + if idx == 0 { + *r = ctx.rep_level - 1; + } + }); + }) + }; + + // If list size is 0, ignore values and just write rep/def levels. 
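// Worked example (see `test_fixed_size_list` in the tests below): writing rows 1..4 of
// [[1, 2], null, null, [7, 8], null] with fixed_size = 2 and a nullable Int32 item
// yields def_levels = [0, 0, 3, 3] and rep_levels = [0, 0, 0, 1]. Each null row
// contributes a single (def, rep) pair, while the valid row [7, 8] contributes one
// pair per child value, with rep = 0 marking the start of the row and rep = 1 its
// continuation.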
+ let write_empty = + |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| { + let len = end_idx - start_idx; + child.visit_leaves(|leaf| { + let rep_levels = leaf.rep_levels.as_mut().unwrap(); + rep_levels.extend(std::iter::repeat(ctx.rep_level - 1).take(len)); + let def_levels = leaf.def_levels.as_mut().unwrap(); + def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len)); + }) + }; + + let write_rows = + |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| { + if fixed_size > 0 { + write_non_null(child, start_idx, end_idx) + } else { + write_empty(child, start_idx, end_idx) + } + }; + + match nulls { + Some(nulls) => { + let mut start_idx = None; + for idx in range.clone() { + if nulls.is_valid(idx) { + // Start a run of valid rows if not already inside of one + start_idx.get_or_insert(idx); + } else { + // Write out any pending valid rows + if let Some(start) = start_idx.take() { + write_rows(child, start, idx); + } + // Add null row + child.visit_leaves(|leaf| { + let rep_levels = leaf.rep_levels.as_mut().unwrap(); + rep_levels.push(ctx.rep_level - 1); + let def_levels = leaf.def_levels.as_mut().unwrap(); + def_levels.push(ctx.def_level - 2); + }) + } + } + // Write out any remaining valid rows + if let Some(start) = start_idx.take() { + write_rows(child, start, range.end); + } + } + // If all rows are valid then write the whole array + None => write_rows(child, range.start, range.end), + } + } + /// Write a primitive array, as defined by [`is_leaf`] fn write_leaf(&mut self, array: &dyn Array, range: Range) { let info = match self { @@ -1397,4 +1505,260 @@ mod tests { assert_eq!(&levels[1], &expected_level); } + + #[test] + fn test_fixed_size_list() { + // [[1, 2], null, null, [7, 8], null] + let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2); + builder.values().append_slice(&[1, 2]); + builder.append(true); + builder.values().append_slice(&[3, 4]); + builder.append(false); + builder.values().append_slice(&[5, 6]); + builder.append(false); + builder.values().append_slice(&[7, 8]); + builder.append(true); + builder.values().append_slice(&[9, 10]); + builder.append(false); + let a = builder.finish(); + + let item_field = Field::new("item", a.data_type().clone(), true); + let mut builder = + LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap(); + builder.write(&a, 1..4); + let levels = builder.finish(); + + assert_eq!(levels.len(), 1); + + let list_level = levels.get(0).unwrap(); + + let expected_level = LevelInfo { + def_levels: Some(vec![0, 0, 3, 3]), + rep_levels: Some(vec![0, 0, 0, 1]), + non_null_indices: vec![6, 7], + max_def_level: 3, + max_rep_level: 1, + }; + assert_eq!(list_level, &expected_level); + } + + #[test] + fn test_fixed_size_list_of_struct() { + // define schema + let field_a = Field::new("a", DataType::Int32, true); + let field_b = Field::new("b", DataType::Int64, false); + let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]); + let item_field = Field::new("item", DataType::Struct(fields.clone()), true); + let list_field = Field::new( + "list", + DataType::FixedSizeList(Arc::new(item_field), 2), + true, + ); + + let builder_a = Int32Builder::with_capacity(10); + let builder_b = Int64Builder::with_capacity(10); + let struct_builder = + StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]); + let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2); + + // [ + // [{a: 1, b: 2}, null], + // null, + // [null, null], + // [{a: null, b: 3}, {a: 2, b: 4}] + // ] + + 
// [{a: 1, b: 2}, null] + let values = list_builder.values(); + // {a: 1, b: 2} + values + .field_builder::(0) + .unwrap() + .append_value(1); + values + .field_builder::(1) + .unwrap() + .append_value(2); + values.append(true); + // null + values + .field_builder::(0) + .unwrap() + .append_null(); + values + .field_builder::(1) + .unwrap() + .append_value(0); + values.append(false); + list_builder.append(true); + + // null + let values = list_builder.values(); + // null + values + .field_builder::(0) + .unwrap() + .append_null(); + values + .field_builder::(1) + .unwrap() + .append_value(0); + values.append(false); + // null + values + .field_builder::(0) + .unwrap() + .append_null(); + values + .field_builder::(1) + .unwrap() + .append_value(0); + values.append(false); + list_builder.append(false); + + // [null, null] + let values = list_builder.values(); + // null + values + .field_builder::(0) + .unwrap() + .append_null(); + values + .field_builder::(1) + .unwrap() + .append_value(0); + values.append(false); + // null + values + .field_builder::(0) + .unwrap() + .append_null(); + values + .field_builder::(1) + .unwrap() + .append_value(0); + values.append(false); + list_builder.append(true); + + // [{a: null, b: 3}, {a: 2, b: 4}] + let values = list_builder.values(); + // {a: null, b: 3} + values + .field_builder::(0) + .unwrap() + .append_null(); + values + .field_builder::(1) + .unwrap() + .append_value(3); + values.append(true); + // {a: 2, b: 4} + values + .field_builder::(0) + .unwrap() + .append_value(2); + values + .field_builder::(1) + .unwrap() + .append_value(4); + values.append(true); + list_builder.append(true); + + let array = Arc::new(list_builder.finish()); + + assert_eq!(array.values().len(), 8); + assert_eq!(array.len(), 4); + + let schema = Arc::new(Schema::new(vec![list_field])); + let rb = RecordBatch::try_new(schema, vec![array]).unwrap(); + + let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap(); + let a_levels = &levels[0]; + let b_levels = &levels[1]; + + // [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]] + let expected_a = LevelInfo { + def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]), + rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]), + non_null_indices: vec![0, 7], + max_def_level: 4, + max_rep_level: 1, + }; + // [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]] + let expected_b = LevelInfo { + def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]), + rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]), + non_null_indices: vec![0, 6, 7], + max_def_level: 3, + max_rep_level: 1, + }; + + assert_eq!(a_levels, &expected_a); + assert_eq!(b_levels, &expected_b); + } + + #[test] + fn test_fixed_size_list_empty() { + let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 0); + builder.append(true); + builder.append(false); + builder.append(true); + let a = builder.finish(); + + let item_field = Field::new("item", a.data_type().clone(), true); + let mut builder = + LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap(); + builder.write(&a, 0..3); + let levels = builder.finish(); + + assert_eq!(levels.len(), 1); + + let list_level = levels.get(0).unwrap(); + + let expected_level = LevelInfo { + def_levels: Some(vec![1, 0, 1]), + rep_levels: Some(vec![0, 0, 0]), + non_null_indices: vec![], + max_def_level: 3, + max_rep_level: 1, + }; + assert_eq!(list_level, &expected_level); + } + + #[test] + fn test_fixed_size_list_of_var_lists() { + // [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null] + let mut builder = + 
FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2); + builder.values().append_value([Some(1), None, Some(3)]); + builder.values().append_null(); + builder.append(true); + builder.values().append_value([Some(4)]); + builder.values().append_value([]); + builder.append(true); + builder.values().append_value([Some(5), Some(6)]); + builder.values().append_value([None, None]); + builder.append(true); + builder.values().append_null(); + builder.values().append_null(); + builder.append(false); + let a = builder.finish(); + + let item_field = Field::new("item", a.data_type().clone(), true); + let mut builder = + LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap(); + builder.write(&a, 0..4); + let levels = builder.finish(); + + let list_level = levels.get(0).unwrap(); + let expected_level = LevelInfo { + def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]), + rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]), + non_null_indices: vec![0, 2, 3, 4, 5], + max_def_level: 5, + max_rep_level: 2, + }; + + assert_eq!(list_level, &expected_level); + } } diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index af820218255d..bde21ae856d0 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -17,13 +17,21 @@ //! Contains writer which writes arrow data into parquet data. -use std::collections::VecDeque; -use std::io::Write; -use std::sync::Arc; +use bytes::Bytes; +use std::fmt::Debug; +use std::io::{Read, Write}; +use std::iter::Peekable; +use std::slice::Iter; +use std::sync::{Arc, Mutex}; +use std::vec::IntoIter; +use thrift::protocol::{TCompactOutputProtocol, TSerializable}; use arrow_array::cast::AsArray; -use arrow_array::types::{Decimal128Type, Int32Type, Int64Type, UInt32Type, UInt64Type}; -use arrow_array::{types, Array, ArrayRef, RecordBatch, RecordBatchWriter}; +use arrow_array::types::{ + Decimal128Type, Float32Type, Float64Type, Int32Type, Int64Type, UInt32Type, + UInt64Type, +}; +use arrow_array::{Array, FixedSizeListArray, RecordBatch, RecordBatchWriter}; use arrow_schema::{ArrowError, DataType as ArrowDataType, IntervalUnit, SchemaRef}; use super::schema::{ @@ -31,14 +39,19 @@ use super::schema::{ decimal_length_from_precision, }; -use crate::arrow::arrow_writer::byte_array::ByteArrayWriter; -use crate::column::writer::{ColumnWriter, ColumnWriterImpl}; -use crate::data_type::{ByteArray, DataType, FixedLenByteArray}; +use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder; +use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter}; +use crate::column::writer::encoder::ColumnValueEncoder; +use crate::column::writer::{ + get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter, +}; +use crate::data_type::{ByteArray, FixedLenByteArray}; use crate::errors::{ParquetError, Result}; -use crate::file::metadata::{KeyValue, RowGroupMetaDataPtr}; -use crate::file::properties::WriterProperties; +use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr}; +use crate::file::properties::{WriterProperties, WriterPropertiesPtr}; +use crate::file::reader::{ChunkReader, Length}; use crate::file::writer::SerializedFileWriter; -use crate::file::writer::SerializedRowGroupWriter; +use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; use levels::{calculate_array_levels, LevelInfo}; mod byte_array; @@ -46,8 +59,8 @@ mod levels; /// Arrow writer /// -/// Writes Arrow `RecordBatch`es to a Parquet writer, buffering up `RecordBatch` in order 
-/// to produce row groups with `max_row_group_size` rows. Any remaining rows will be +/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`] will be encoded +/// to the same row group, up to `max_row_group_size` rows. Any remaining rows will be /// flushed on close, leading the final row group in the output file to potentially /// contain fewer than `max_row_group_size` rows /// @@ -75,11 +88,8 @@ pub struct ArrowWriter { /// Underlying Parquet writer writer: SerializedFileWriter, - /// For each column, maintain an ordered queue of arrays to write - buffer: Vec>, - - /// The total number of rows currently buffered - buffered_rows: usize, + /// The in-progress row group if any + in_progress: Option, /// A copy of the Arrow schema. /// @@ -90,7 +100,20 @@ pub struct ArrowWriter { max_row_group_size: usize, } -impl ArrowWriter { +impl Debug for ArrowWriter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let buffered_memory = self.in_progress_size(); + f.debug_struct("ArrowWriter") + .field("writer", &self.writer) + .field("in_progress_size", &format_args!("{buffered_memory} bytes")) + .field("in_progress_rows", &self.in_progress_rows()) + .field("arrow_schema", &self.arrow_schema) + .field("max_row_group_size", &self.max_row_group_size) + .finish() + } +} + +impl ArrowWriter { /// Try to create a new Arrow writer /// /// The writer will fail if: @@ -103,7 +126,7 @@ impl ArrowWriter { ) -> Result { let schema = arrow_to_parquet_schema(&arrow_schema)?; // add serialized arrow schema - let mut props = props.unwrap_or_else(|| WriterProperties::builder().build()); + let mut props = props.unwrap_or_default(); add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props); let max_row_group_size = props.max_row_group_size(); @@ -113,8 +136,7 @@ impl ArrowWriter { Ok(Self { writer: file_writer, - buffer: vec![Default::default(); arrow_schema.fields().len()], - buffered_rows: 0, + in_progress: None, arrow_schema, max_row_group_size, }) @@ -125,43 +147,75 @@ impl ArrowWriter { self.writer.flushed_row_groups() } - /// Enqueues the provided `RecordBatch` to be written + /// Returns the estimated length in bytes of the current in progress row group + pub fn in_progress_size(&self) -> usize { + match &self.in_progress { + Some(in_progress) => in_progress + .writers + .iter() + .map(|(_, x)| x.get_estimated_total_bytes() as usize) + .sum(), + None => 0, + } + } + + /// Returns the number of rows buffered in the in progress row group + pub fn in_progress_rows(&self) -> usize { + self.in_progress + .as_ref() + .map(|x| x.buffered_rows) + .unwrap_or_default() + } + + /// Encodes the provided [`RecordBatch`] /// - /// If following this there are more than `max_row_group_size` rows buffered, - /// this will flush out one or more row groups with `max_row_group_size` rows, - /// and drop any fully written `RecordBatch` + /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`] + /// rows, the contents of `batch` will be written to one or more row groups such that all but + /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { - // validate batch schema against writer's supplied schema - let batch_schema = batch.schema(); - if !(Arc::ptr_eq(&self.arrow_schema, &batch_schema) - || self.arrow_schema.contains(&batch_schema)) - { - return Err(ParquetError::ArrowError( - "Record batch schema does not match writer schema".to_string(), 
- )); + if batch.num_rows() == 0 { + return Ok(()); } - for (buffer, column) in self.buffer.iter_mut().zip(batch.columns()) { - buffer.push_back(column.clone()) - } + let in_progress = match &mut self.in_progress { + Some(in_progress) => in_progress, + x => x.insert(ArrowRowGroupWriter::new( + self.writer.schema_descr(), + self.writer.properties(), + &self.arrow_schema, + )?), + }; - self.buffered_rows += batch.num_rows(); - self.flush_completed()?; + // If would exceed max_row_group_size, split batch + if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size { + let to_write = self.max_row_group_size - in_progress.buffered_rows; + let a = batch.slice(0, to_write); + let b = batch.slice(to_write, batch.num_rows() - to_write); + self.write(&a)?; + return self.write(&b); + } - Ok(()) - } + in_progress.write(batch)?; - /// Flushes buffered data until there are less than `max_row_group_size` rows buffered - fn flush_completed(&mut self) -> Result<()> { - while self.buffered_rows >= self.max_row_group_size { - self.flush_rows(self.max_row_group_size)?; + if in_progress.buffered_rows >= self.max_row_group_size { + self.flush()? } Ok(()) } /// Flushes all buffered rows into a new row group pub fn flush(&mut self) -> Result<()> { - self.flush_rows(self.buffered_rows) + let in_progress = match self.in_progress.take() { + Some(in_progress) => in_progress, + None => return Ok(()), + }; + + let mut row_group_writer = self.writer.next_row_group()?; + for (chunk, close) in in_progress.close()? { + row_group_writer.append_column(&chunk, close)?; + } + row_group_writer.close()?; + Ok(()) } /// Additional [`KeyValue`] metadata to be written in addition to those from [`WriterProperties`] @@ -171,68 +225,6 @@ impl ArrowWriter { self.writer.append_key_value_metadata(kv_metadata) } - /// Flushes `num_rows` from the buffer into a new row group - fn flush_rows(&mut self, num_rows: usize) -> Result<()> { - if num_rows == 0 { - return Ok(()); - } - - assert!( - num_rows <= self.buffered_rows, - "cannot flush {} rows only have {}", - num_rows, - self.buffered_rows - ); - - assert!( - num_rows <= self.max_row_group_size, - "cannot flush {} rows would exceed max row group size of {}", - num_rows, - self.max_row_group_size - ); - - let mut row_group_writer = self.writer.next_row_group()?; - - for (col_buffer, field) in self.buffer.iter_mut().zip(self.arrow_schema.fields()) - { - // Collect the number of arrays to append - let mut remaining = num_rows; - let mut arrays = Vec::with_capacity(col_buffer.len()); - while remaining != 0 { - match col_buffer.pop_front() { - Some(next) if next.len() > remaining => { - col_buffer - .push_front(next.slice(remaining, next.len() - remaining)); - arrays.push(next.slice(0, remaining)); - remaining = 0; - } - Some(next) => { - remaining -= next.len(); - arrays.push(next); - } - _ => break, - } - } - - let mut levels = arrays - .iter() - .map(|array| { - let mut levels = calculate_array_levels(array, field)?; - // Reverse levels as we pop() them when writing arrays - levels.reverse(); - Ok(levels) - }) - .collect::>>()?; - - write_leaves(&mut row_group_writer, &arrays, &mut levels)?; - } - - row_group_writer.close()?; - self.buffered_rows -= num_rows; - - Ok(()) - } - /// Flushes any outstanding data and returns the underlying writer. 
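The rewritten `write` / `flush` path above keeps at most one in-progress row group and splits incoming batches on `max_row_group_size`. A rough usage sketch, not part of this patch, assuming the `arrow_array`/`arrow_schema` crates and an in-memory `Vec<u8>` sink:

```rust
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::errors::Result;
use parquet::file::properties::WriterProperties;

fn write_example() -> Result<Vec<u8>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
    )
    .unwrap();

    // Deliberately tiny row groups so that one batch spans several of them
    let props = WriterProperties::builder().set_max_row_group_size(2).build();
    let mut writer = ArrowWriter::try_new(vec![], schema, Some(props))?;

    // write() slices the batch internally; full row groups are flushed eagerly
    writer.write(&batch)?;

    // Only the remainder (here a single row) stays buffered until flush()/close()
    assert!(writer.in_progress_rows() < 2);

    writer.into_inner()
}
```

Because `write` does the slicing, callers no longer need to size their batches to the row-group limit themselves.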
pub fn into_inner(mut self) -> Result { self.flush()?; @@ -246,7 +238,7 @@ impl ArrowWriter { } } -impl RecordBatchWriter for ArrowWriter { +impl RecordBatchWriter for ArrowWriter { fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> { self.write(batch).map_err(|e| e.into()) } @@ -257,146 +249,284 @@ impl RecordBatchWriter for ArrowWriter { } } -fn write_leaves( - row_group_writer: &mut SerializedRowGroupWriter<'_, W>, - arrays: &[ArrayRef], - levels: &mut [Vec], -) -> Result<()> { - assert_eq!(arrays.len(), levels.len()); - assert!(!arrays.is_empty()); - - let data_type = arrays.first().unwrap().data_type().clone(); - assert!(arrays.iter().all(|a| a.data_type() == &data_type)); - - match &data_type { - ArrowDataType::Null - | ArrowDataType::Boolean - | ArrowDataType::Int8 - | ArrowDataType::Int16 - | ArrowDataType::Int32 - | ArrowDataType::Int64 - | ArrowDataType::UInt8 - | ArrowDataType::UInt16 - | ArrowDataType::UInt32 - | ArrowDataType::UInt64 - | ArrowDataType::Float32 - | ArrowDataType::Float64 - | ArrowDataType::Timestamp(_, _) - | ArrowDataType::Date32 - | ArrowDataType::Date64 - | ArrowDataType::Time32(_) - | ArrowDataType::Time64(_) - | ArrowDataType::Duration(_) - | ArrowDataType::Interval(_) - | ArrowDataType::Decimal128(_, _) - | ArrowDataType::Decimal256(_, _) - | ArrowDataType::FixedSizeBinary(_) => { - let mut col_writer = row_group_writer.next_column()?.unwrap(); - for (array, levels) in arrays.iter().zip(levels.iter_mut()) { - write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?; - } - col_writer.close() - } - ArrowDataType::LargeBinary - | ArrowDataType::Binary - | ArrowDataType::Utf8 - | ArrowDataType::LargeUtf8 => { - let mut col_writer = row_group_writer.next_column_with_factory(ByteArrayWriter::new)?.unwrap(); - for (array, levels) in arrays.iter().zip(levels.iter_mut()) { - col_writer.write(array, levels.pop().expect("Levels exhausted"))?; +/// A list of [`Bytes`] comprising a single column chunk +#[derive(Default)] +struct ArrowColumnChunk { + length: usize, + data: Vec, +} + +impl Length for ArrowColumnChunk { + fn len(&self) -> u64 { + self.length as _ + } +} + +impl ChunkReader for ArrowColumnChunk { + type T = ChainReader; + + fn get_read(&self, start: u64) -> Result { + assert_eq!(start, 0); // Assume append_column writes all data in one-shot + Ok(ChainReader(self.data.clone().into_iter().peekable())) + } + + fn get_bytes(&self, _start: u64, _length: usize) -> Result { + unimplemented!() + } +} + +/// A [`Read`] for an iterator of [`Bytes`] +struct ChainReader(Peekable>); + +impl Read for ChainReader { + fn read(&mut self, out: &mut [u8]) -> std::io::Result { + let buffer = loop { + match self.0.peek_mut() { + Some(b) if b.is_empty() => { + self.0.next(); + continue; + } + Some(b) => break b, + None => return Ok(0), } - col_writer.close() + }; + + let len = buffer.len().min(out.len()); + let b = buffer.split_to(len); + out[..len].copy_from_slice(&b); + Ok(len) + } +} + +/// A shared [`ArrowColumnChunk`] +/// +/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via +/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows +type SharedColumnChunk = Arc>; + +#[derive(Default)] +struct ArrowPageWriter { + buffer: SharedColumnChunk, +} + +impl PageWriter for ArrowPageWriter { + fn write_page(&mut self, page: CompressedPage) -> Result { + let mut buf = self.buffer.try_lock().unwrap(); + let page_header = page.to_thrift_header(); + let header = { + let mut header = 
Vec::with_capacity(1024); + let mut protocol = TCompactOutputProtocol::new(&mut header); + page_header.write_to_out_protocol(&mut protocol)?; + Bytes::from(header) + }; + + let data = page.compressed_page().buffer().clone(); + let compressed_size = data.len() + header.len(); + + let mut spec = PageWriteSpec::new(); + spec.page_type = page.page_type(); + spec.num_values = page.num_values(); + spec.uncompressed_size = page.uncompressed_size() + header.len(); + spec.offset = buf.length as u64; + spec.compressed_size = compressed_size; + spec.bytes_written = compressed_size as u64; + + buf.length += compressed_size; + buf.data.push(header); + buf.data.push(data.into()); + + Ok(spec) + } + + fn write_metadata(&mut self, _metadata: &ColumnChunkMetaData) -> Result<()> { + // Skip writing metadata as won't be copied anyway + Ok(()) + } + + fn close(&mut self) -> Result<()> { + Ok(()) + } +} + +/// Encodes a leaf column to [`ArrowPageWriter`] +enum ArrowColumnWriter { + ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>), + Column(ColumnWriter<'static>), +} + +impl ArrowColumnWriter { + /// Returns the estimated total bytes for this column writer + fn get_estimated_total_bytes(&self) -> u64 { + match self { + ArrowColumnWriter::ByteArray(c) => c.get_estimated_total_bytes(), + ArrowColumnWriter::Column(c) => c.get_estimated_total_bytes(), } - ArrowDataType::List(_) => { - let arrays: Vec<_> = arrays.iter().map(|array|{ - array.as_list::().values().clone() - }).collect(); + } +} + +/// Encodes [`RecordBatch`] to a parquet row group +struct ArrowRowGroupWriter { + writers: Vec<(SharedColumnChunk, ArrowColumnWriter)>, + schema: SchemaRef, + buffered_rows: usize, +} - write_leaves(row_group_writer, &arrays, levels)?; - Ok(()) +impl ArrowRowGroupWriter { + fn new( + parquet: &SchemaDescriptor, + props: &WriterPropertiesPtr, + arrow: &SchemaRef, + ) -> Result { + let mut writers = Vec::with_capacity(arrow.fields.len()); + let mut leaves = parquet.columns().iter(); + for field in &arrow.fields { + get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut writers)?; } - ArrowDataType::LargeList(_) => { - let arrays: Vec<_> = arrays.iter().map(|array|{ - array.as_list::().values().clone() - }).collect(); - write_leaves(row_group_writer, &arrays, levels)?; - Ok(()) + Ok(Self { + writers, + schema: arrow.clone(), + buffered_rows: 0, + }) + } + + fn write(&mut self, batch: &RecordBatch) -> Result<()> { + self.buffered_rows += batch.num_rows(); + let mut writers = self.writers.iter_mut().map(|(_, x)| x); + for (array, field) in batch.columns().iter().zip(&self.schema.fields) { + let mut levels = calculate_array_levels(array, field)?.into_iter(); + write_leaves(&mut writers, &mut levels, array.as_ref())?; } - ArrowDataType::Struct(fields) => { - // Groups child arrays by field - let mut field_arrays = vec![Vec::with_capacity(arrays.len()); fields.len()]; + Ok(()) + } - for array in arrays { - let struct_array: &arrow_array::StructArray = array - .as_any() - .downcast_ref::() - .expect("Unable to get struct array"); + fn close(self) -> Result> { + self.writers + .into_iter() + .map(|(chunk, writer)| { + let close_result = match writer { + ArrowColumnWriter::ByteArray(c) => c.close()?, + ArrowColumnWriter::Column(c) => c.close()?, + }; + + let chunk = Arc::try_unwrap(chunk).ok().unwrap().into_inner().unwrap(); + Ok((chunk, close_result)) + }) + .collect() + } +} - assert_eq!(struct_array.columns().len(), fields.len()); +/// Get an [`ArrowColumnWriter`] along with a reference to its 
[`SharedColumnChunk`] +fn get_arrow_column_writer( + data_type: &ArrowDataType, + props: &WriterPropertiesPtr, + leaves: &mut Iter<'_, ColumnDescPtr>, + out: &mut Vec<(SharedColumnChunk, ArrowColumnWriter)>, +) -> Result<()> { + let col = |desc: &ColumnDescPtr| { + let page_writer = Box::::default(); + let chunk = page_writer.buffer.clone(); + let writer = get_column_writer(desc.clone(), props.clone(), page_writer); + (chunk, ArrowColumnWriter::Column(writer)) + }; - for (child_array, field) in field_arrays.iter_mut().zip(struct_array.columns()) { - child_array.push(field.clone()) - } - } + let bytes = |desc: &ColumnDescPtr| { + let page_writer = Box::::default(); + let chunk = page_writer.buffer.clone(); + let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer); + (chunk, ArrowColumnWriter::ByteArray(writer)) + }; - for field in field_arrays { - write_leaves(row_group_writer, &field, levels)?; + match data_type { + _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())), + ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => out.push(col(leaves.next().unwrap())), + ArrowDataType::LargeBinary + | ArrowDataType::Binary + | ArrowDataType::Utf8 + | ArrowDataType::LargeUtf8 => { + out.push(bytes(leaves.next().unwrap())) + } + ArrowDataType::List(f) + | ArrowDataType::LargeList(f) + | ArrowDataType::FixedSizeList(f, _) => { + get_arrow_column_writer(f.data_type(), props, leaves, out)? + } + ArrowDataType::Struct(fields) => { + for field in fields { + get_arrow_column_writer(field.data_type(), props, leaves, out)? } - - Ok(()) } - ArrowDataType::Map(_, _) => { - let mut keys = Vec::with_capacity(arrays.len()); - let mut values = Vec::with_capacity(arrays.len()); - for array in arrays { - let map_array: &arrow_array::MapArray = array - .as_any() - .downcast_ref::() - .expect("Unable to get map array"); - keys.push(map_array.keys().clone()); - values.push(map_array.values().clone()); + ArrowDataType::Map(f, _) => match f.data_type() { + ArrowDataType::Struct(f) => { + get_arrow_column_writer(f[0].data_type(), props, leaves, out)?; + get_arrow_column_writer(f[1].data_type(), props, leaves, out)? 
} - - write_leaves(row_group_writer, &keys, levels)?; - write_leaves(row_group_writer, &values, levels)?; - Ok(()) + _ => unreachable!("invalid map type"), } ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() { ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => { - let mut col_writer = row_group_writer.next_column_with_factory(ByteArrayWriter::new)?.unwrap(); - for (array, levels) in arrays.iter().zip(levels.iter_mut()) { - col_writer.write(array, levels.pop().expect("Levels exhausted"))?; - } - col_writer.close() + out.push(bytes(leaves.next().unwrap())) } _ => { - let mut col_writer = row_group_writer.next_column()?.unwrap(); - for (array, levels) in arrays.iter().zip(levels.iter_mut()) { - write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?; - } - col_writer.close() + out.push(col(leaves.next().unwrap())) + } + } + _ => return Err(ParquetError::NYI( + format!( + "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented" + ) + )) + } + Ok(()) +} + +/// Write the leaves of `array` in depth-first order to `writers` with `levels` +fn write_leaves<'a, W>( + writers: &mut W, + levels: &mut IntoIter, + array: &(dyn Array + 'static), +) -> Result<()> +where + W: Iterator, +{ + match array.data_type() { + ArrowDataType::List(_) => { + write_leaves(writers, levels, array.as_list::().values().as_ref())? + } + ArrowDataType::LargeList(_) => { + write_leaves(writers, levels, array.as_list::().values().as_ref())? + } + ArrowDataType::FixedSizeList(_, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + write_leaves(writers, levels, array.values().as_ref())? + } + ArrowDataType::Struct(_) => { + for column in array.as_struct().columns() { + write_leaves(writers, levels, column.as_ref())? } } - ArrowDataType::Float16 => Err(ParquetError::ArrowError( - "Float16 arrays not supported".to_string(), - )), - ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_, _) | ArrowDataType::RunEndEncoded(_, _) => { - Err(ParquetError::NYI( - format!( - "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented" - ) - )) + ArrowDataType::Map(_, _) => { + let map = array.as_map(); + write_leaves(writers, levels, map.keys().as_ref())?; + write_leaves(writers, levels, map.values().as_ref())? + } + _ => { + let levels = levels.next().unwrap(); + match writers.next().unwrap() { + ArrowColumnWriter::Column(c) => write_leaf(c, array, levels)?, + ArrowColumnWriter::ByteArray(c) => write_primitive(c, array, levels)?, + }; } } + Ok(()) } fn write_leaf( writer: &mut ColumnWriter<'_>, - column: &ArrayRef, + column: &dyn Array, levels: LevelInfo, -) -> Result { +) -> Result { let indices = levels.non_null_indices(); - let written = match writer { + match writer { ColumnWriter::Int32ColumnWriter(ref mut typed) => { match column.data_type() { ArrowDataType::Date64 => { @@ -405,26 +535,26 @@ fn write_leaf( let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?; let array = array.as_primitive::(); - write_primitive(typed, array.values(), levels)? + write_primitive(typed, array.values(), levels) } ArrowDataType::UInt32 => { let values = column.as_primitive::().values(); // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0` let array = values.inner().typed_data::(); - write_primitive(typed, array, levels)? 
+ write_primitive(typed, array, levels) } ArrowDataType::Decimal128(_, _) => { // use the int32 to represent the decimal with low precision let array = column .as_primitive::() - .unary::<_, types::Int32Type>(|v| v as i32); - write_primitive(typed, array.values(), levels)? + .unary::<_, Int32Type>(|v| v as i32); + write_primitive(typed, array.values(), levels) } _ => { let array = arrow_cast::cast(column, &ArrowDataType::Int32)?; let array = array.as_primitive::(); - write_primitive(typed, array.values(), levels)? + write_primitive(typed, array.values(), levels) } } } @@ -434,32 +564,32 @@ fn write_leaf( get_bool_array_slice(array, indices).as_slice(), levels.def_levels(), levels.rep_levels(), - )? + ) } ColumnWriter::Int64ColumnWriter(ref mut typed) => { match column.data_type() { ArrowDataType::Int64 => { let array = column.as_primitive::(); - write_primitive(typed, array.values(), levels)? + write_primitive(typed, array.values(), levels) } ArrowDataType::UInt64 => { let values = column.as_primitive::().values(); // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0` let array = values.inner().typed_data::(); - write_primitive(typed, array, levels)? + write_primitive(typed, array, levels) } ArrowDataType::Decimal128(_, _) => { // use the int64 to represent the decimal with low precision let array = column .as_primitive::() - .unary::<_, types::Int64Type>(|v| v as i64); - write_primitive(typed, array.values(), levels)? + .unary::<_, Int64Type>(|v| v as i64); + write_primitive(typed, array.values(), levels) } _ => { let array = arrow_cast::cast(column, &ArrowDataType::Int64)?; let array = array.as_primitive::(); - write_primitive(typed, array.values(), levels)? + write_primitive(typed, array.values(), levels) } } } @@ -467,18 +597,12 @@ fn write_leaf( unreachable!("Currently unreachable because data type not supported") } ColumnWriter::FloatColumnWriter(ref mut typed) => { - let array = column - .as_any() - .downcast_ref::() - .expect("Unable to get Float32 array"); - write_primitive(typed, array.values(), levels)? + let array = column.as_primitive::(); + write_primitive(typed, array.values(), levels) } ColumnWriter::DoubleColumnWriter(ref mut typed) => { - let array = column - .as_any() - .downcast_ref::() - .expect("Unable to get Float64 array"); - write_primitive(typed, array.values(), levels)? + let array = column.as_primitive::(); + write_primitive(typed, array.values(), levels) } ColumnWriter::ByteArrayColumnWriter(_) => { unreachable!("should use ByteArrayWriter") @@ -516,10 +640,7 @@ fn write_leaf( get_fsb_array_slice(array, indices) } ArrowDataType::Decimal128(_, _) => { - let array = column - .as_any() - .downcast_ref::() - .unwrap(); + let array = column.as_primitive::(); get_decimal_array_slice(array, indices) } _ => { @@ -529,19 +650,14 @@ fn write_leaf( )); } }; - typed.write_batch( - bytes.as_slice(), - levels.def_levels(), - levels.rep_levels(), - )? 
+ typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels()) } - }; - Ok(written as i64) + } } -fn write_primitive( - writer: &mut ColumnWriterImpl<'_, T>, - values: &[T::T], +fn write_primitive( + writer: &mut GenericColumnWriter, + values: &E::Values, levels: LevelInfo, ) -> Result { writer.write_batch_internal( @@ -2425,4 +2541,40 @@ mod tests { assert_ne!(back.schema(), batch.schema()); assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref()); } + + #[test] + fn in_progress_accounting() { + // define schema + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + + // create some data + let a = Int32Array::from(vec![1, 2, 3, 4, 5]); + + // build a record batch + let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); + + let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap(); + + // starts empty + assert_eq!(writer.in_progress_size(), 0); + assert_eq!(writer.in_progress_rows(), 0); + writer.write(&batch).unwrap(); + + // updated on write + let initial_size = writer.in_progress_size(); + assert!(initial_size > 0); + assert_eq!(writer.in_progress_rows(), 5); + + // updated on second write + writer.write(&batch).unwrap(); + assert!(writer.in_progress_size() > initial_size); + assert_eq!(writer.in_progress_rows(), 10); + + // cleared on flush + writer.flush().unwrap(); + assert_eq!(writer.in_progress_size(), 0); + assert_eq!(writer.in_progress_rows(), 0); + + writer.close().unwrap(); + } } diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index da7e850c3d60..e5211ec23931 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -40,10 +40,7 @@ //! //! let file = File::create("data.parquet").unwrap(); //! -//! // Default writer properties -//! let props = WriterProperties::builder().build(); -//! -//! let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap(); +//! let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap(); //! //! writer.write(&batch).expect("Writing batch"); //! diff --git a/parquet/src/column/mod.rs b/parquet/src/column/mod.rs index cb0c035dd6e2..a68127a4ef05 100644 --- a/parquet/src/column/mod.rs +++ b/parquet/src/column/mod.rs @@ -63,9 +63,8 @@ //! } //! "; //! let schema = Arc::new(parse_message_type(message_type).unwrap()); -//! let props = Arc::new(WriterProperties::builder().build()); //! let file = fs::File::create(path).unwrap(); -//! let mut writer = SerializedFileWriter::new(file, schema, props).unwrap(); +//! let mut writer = SerializedFileWriter::new(file, schema, Default::default()).unwrap(); //! //! let mut row_group_writer = writer.next_row_group().unwrap(); //! 
while let Some(mut col_writer) = row_group_writer.next_column().unwrap() { diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs index bd3568d13cee..57a0278e23c4 100644 --- a/parquet/src/column/page.rs +++ b/parquet/src/column/page.rs @@ -162,6 +162,75 @@ impl CompressedPage { pub fn data(&self) -> &[u8] { self.compressed_page.buffer().data() } + + /// Returns the thrift page header + pub(crate) fn to_thrift_header(&self) -> PageHeader { + let uncompressed_size = self.uncompressed_size(); + let compressed_size = self.compressed_size(); + let num_values = self.num_values(); + let encoding = self.encoding(); + let page_type = self.page_type(); + + let mut page_header = PageHeader { + type_: page_type.into(), + uncompressed_page_size: uncompressed_size as i32, + compressed_page_size: compressed_size as i32, + // TODO: Add support for crc checksum + crc: None, + data_page_header: None, + index_page_header: None, + dictionary_page_header: None, + data_page_header_v2: None, + }; + + match self.compressed_page { + Page::DataPage { + def_level_encoding, + rep_level_encoding, + ref statistics, + .. + } => { + let data_page_header = crate::format::DataPageHeader { + num_values: num_values as i32, + encoding: encoding.into(), + definition_level_encoding: def_level_encoding.into(), + repetition_level_encoding: rep_level_encoding.into(), + statistics: crate::file::statistics::to_thrift(statistics.as_ref()), + }; + page_header.data_page_header = Some(data_page_header); + } + Page::DataPageV2 { + num_nulls, + num_rows, + def_levels_byte_len, + rep_levels_byte_len, + is_compressed, + ref statistics, + .. + } => { + let data_page_header_v2 = crate::format::DataPageHeaderV2 { + num_values: num_values as i32, + num_nulls: num_nulls as i32, + num_rows: num_rows as i32, + encoding: encoding.into(), + definition_levels_byte_length: def_levels_byte_len as i32, + repetition_levels_byte_length: rep_levels_byte_len as i32, + is_compressed: Some(is_compressed), + statistics: crate::file::statistics::to_thrift(statistics.as_ref()), + }; + page_header.data_page_header_v2 = Some(data_page_header_v2); + } + Page::DictionaryPage { is_sorted, .. } => { + let dictionary_page_header = crate::format::DictionaryPageHeader { + num_values: num_values as i32, + encoding: encoding.into(), + is_sorted: Some(is_sorted), + }; + page_header.dictionary_page_header = Some(dictionary_page_header); + } + } + page_header + } } /// Contains page write metrics. @@ -248,7 +317,7 @@ pub trait PageReader: Iterator> + Send { /// /// It is reasonable to assume that all pages will be written in the correct order, e.g. /// dictionary page followed by data pages, or a set of data pages, etc. -pub trait PageWriter { +pub trait PageWriter: Send { /// Writes a page into the output stream/sink. /// Returns `PageWriteSpec` that contains information about written page metrics, /// including number of bytes, size, number of values, offset, etc. 
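With `CompressedPage::to_thrift_header` factored out and `PageWriter` now requiring `Send`, an in-memory page sink only has to fill in a `PageWriteSpec`, much as `ArrowPageWriter` does above. A minimal, hypothetical in-crate sketch; `CountingPageWriter` and its discard-the-bytes behaviour are illustrative only:

```rust
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::errors::Result;
use crate::file::metadata::ColumnChunkMetaData;

/// Hypothetical sink that discards page bytes but records how many were "written"
#[derive(Default)]
struct CountingPageWriter {
    bytes: usize,
}

impl PageWriter for CountingPageWriter {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let compressed = page.compressed_size();

        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size();
        spec.compressed_size = compressed;
        spec.offset = self.bytes as u64;
        spec.bytes_written = compressed as u64;

        self.bytes += compressed;
        Ok(spec)
    }

    fn write_metadata(&mut self, _metadata: &ColumnChunkMetaData) -> Result<()> {
        Ok(())
    }

    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}
```

As with `ArrowPageWriter`, the `Send` requirement is satisfied automatically here because the struct holds no non-`Send` state.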
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index c343f1d6c824..fb5889b785a8 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -36,7 +36,7 @@ pub trait ColumnValues { } #[cfg(feature = "arrow")] -impl ColumnValues for T { +impl ColumnValues for dyn arrow_array::Array { fn len(&self) -> usize { arrow_array::Array::len(self) } diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 51e2614993e1..5e623d281157 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -43,6 +43,21 @@ use crate::util::memory::ByteBufferPtr; pub(crate) mod encoder; +macro_rules! downcast_writer { + ($e:expr, $i:ident, $b:expr) => { + match $e { + Self::BoolColumnWriter($i) => $b, + Self::Int32ColumnWriter($i) => $b, + Self::Int64ColumnWriter($i) => $b, + Self::Int96ColumnWriter($i) => $b, + Self::FloatColumnWriter($i) => $b, + Self::DoubleColumnWriter($i) => $b, + Self::ByteArrayColumnWriter($i) => $b, + Self::FixedLenByteArrayColumnWriter($i) => $b, + } + }; +} + /// Column writer for a Parquet type. pub enum ColumnWriter<'a> { BoolColumnWriter(ColumnWriterImpl<'a, BoolType>), @@ -55,6 +70,19 @@ pub enum ColumnWriter<'a> { FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>), } +impl<'a> ColumnWriter<'a> { + /// Returns the estimated total bytes for this column writer + #[cfg(feature = "arrow")] + pub(crate) fn get_estimated_total_bytes(&self) -> u64 { + downcast_writer!(self, typed, typed.get_estimated_total_bytes()) + } + + /// Close this [`ColumnWriter`] + pub fn close(self) -> Result { + downcast_writer!(self, typed, typed.close()) + } +} + pub enum Level { Page, Column, @@ -421,10 +449,24 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { /// Returns total number of bytes written by this column writer so far. /// This value is also returned when column writer is closed. + /// + /// Note: this value does not include any buffered data that has not + /// yet been flushed to a page. pub fn get_total_bytes_written(&self) -> u64 { self.column_metrics.total_bytes_written } + /// Returns the estimated total bytes for this column writer + /// + /// Unlike [`Self::get_total_bytes_written`] this includes an estimate + /// of any data that has not yet been flushed to a page + #[cfg(feature = "arrow")] + pub(crate) fn get_estimated_total_bytes(&self) -> u64 { + self.column_metrics.total_bytes_written + + self.encoder.estimated_data_page_size() as u64 + + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64 + } + /// Returns total number of rows written by this column writer so far. /// This value is also returned when column writer is closed. 
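`get_estimated_total_bytes` is what `ArrowWriter::in_progress_size` sums across its column writers, so callers can bound writer memory by flushing early. A small sketch, not from this patch; the byte `limit` and the batch source are placeholders:

```rust
use std::io::Write;

use arrow_array::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::errors::Result;

/// Flush whenever the buffered row group grows past `limit` bytes,
/// trading smaller row groups for bounded memory usage
fn write_bounded<W: Write + Send>(
    writer: &mut ArrowWriter<W>,
    batches: impl IntoIterator<Item = RecordBatch>,
    limit: usize,
) -> Result<()> {
    for batch in batches {
        writer.write(&batch)?;
        if writer.in_progress_size() > limit {
            writer.flush()?;
        }
    }
    Ok(())
}
```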
pub fn get_total_rows_written(&self) -> u64 { @@ -915,11 +957,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) { self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64; self.column_metrics.total_compressed_size += page_spec.compressed_size as u64; - self.column_metrics.total_num_values += page_spec.num_values as u64; self.column_metrics.total_bytes_written += page_spec.bytes_written; match page_spec.page_type { PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => { + self.column_metrics.total_num_values += page_spec.num_values as u64; if self.column_metrics.data_page_offset.is_none() { self.column_metrics.data_page_offset = Some(page_spec.offset); } @@ -1131,7 +1173,7 @@ mod tests { #[test] fn test_column_writer_inconsistent_def_rep_length() { let page_writer = get_test_page_writer(); - let props = Arc::new(WriterProperties::builder().build()); + let props = Default::default(); let mut writer = get_test_column_writer::(page_writer, 1, 1, props); let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0])); assert!(res.is_err()); @@ -1146,7 +1188,7 @@ mod tests { #[test] fn test_column_writer_invalid_def_levels() { let page_writer = get_test_page_writer(); - let props = Arc::new(WriterProperties::builder().build()); + let props = Default::default(); let mut writer = get_test_column_writer::(page_writer, 1, 0, props); let res = writer.write_batch(&[1, 2, 3, 4], None, None); assert!(res.is_err()); @@ -1161,7 +1203,7 @@ mod tests { #[test] fn test_column_writer_invalid_rep_levels() { let page_writer = get_test_page_writer(); - let props = Arc::new(WriterProperties::builder().build()); + let props = Default::default(); let mut writer = get_test_column_writer::(page_writer, 0, 1, props); let res = writer.write_batch(&[1, 2, 3, 4], None, None); assert!(res.is_err()); @@ -1176,7 +1218,7 @@ mod tests { #[test] fn test_column_writer_not_enough_values_to_write() { let page_writer = get_test_page_writer(); - let props = Arc::new(WriterProperties::builder().build()); + let props = Default::default(); let mut writer = get_test_column_writer::(page_writer, 1, 0, props); let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None); assert!(res.is_err()); @@ -1191,7 +1233,7 @@ mod tests { #[test] fn test_column_writer_write_only_one_dictionary_page() { let page_writer = get_test_page_writer(); - let props = Arc::new(WriterProperties::builder().build()); + let props = Default::default(); let mut writer = get_test_column_writer::(page_writer, 0, 0, props); writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); // First page should be correctly written. 
@@ -1499,7 +1541,7 @@ mod tests { #[test] fn test_column_writer_check_metadata() { let page_writer = get_test_page_writer(); - let props = Arc::new(WriterProperties::builder().build()); + let props = Default::default(); let mut writer = get_test_column_writer::(page_writer, 0, 0, props); writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); @@ -1512,7 +1554,7 @@ mod tests { metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY] ); - assert_eq!(metadata.num_values(), 8); // dictionary + value indexes + assert_eq!(metadata.num_values(), 4); assert_eq!(metadata.compressed_size(), 20); assert_eq!(metadata.uncompressed_size(), 20); assert_eq!(metadata.data_page_offset(), 0); @@ -1535,7 +1577,7 @@ mod tests { #[test] fn test_column_writer_check_byte_array_min_max() { let page_writer = get_test_page_writer(); - let props = Arc::new(WriterProperties::builder().build()); + let props = Default::default(); let mut writer = get_test_decimals_column_writer::(page_writer, 0, 0, props); writer @@ -1591,7 +1633,7 @@ mod tests { #[test] fn test_column_writer_uint32_converted_type_min_max() { let page_writer = get_test_page_writer(); - let props = Arc::new(WriterProperties::builder().build()); + let props = Default::default(); let mut writer = get_test_unsigned_int_given_as_converted_column_writer::< Int32Type, >(page_writer, 0, 0, props); @@ -1639,7 +1681,7 @@ mod tests { metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY] ); - assert_eq!(metadata.num_values(), 8); // dictionary + value indexes + assert_eq!(metadata.num_values(), 4); assert_eq!(metadata.compressed_size(), 20); assert_eq!(metadata.uncompressed_size(), 20); assert_eq!(metadata.data_page_offset(), 0); @@ -1664,7 +1706,7 @@ mod tests { let mut buf = Vec::with_capacity(100); let mut write = TrackedWrite::new(&mut buf); let page_writer = Box::new(SerializedPageWriter::new(&mut write)); - let props = Arc::new(WriterProperties::builder().build()); + let props = Default::default(); let mut writer = get_test_column_writer::(page_writer, 0, 0, props); writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); @@ -1772,25 +1814,25 @@ mod tests { #[test] fn test_column_writer_empty_column_roundtrip() { - let props = WriterProperties::builder().build(); + let props = Default::default(); column_roundtrip::(props, &[], None, None); } #[test] fn test_column_writer_non_nullable_values_roundtrip() { - let props = WriterProperties::builder().build(); + let props = Default::default(); column_roundtrip_random::(props, 1024, i32::MIN, i32::MAX, 0, 0); } #[test] fn test_column_writer_nullable_non_repeated_values_roundtrip() { - let props = WriterProperties::builder().build(); + let props = Default::default(); column_roundtrip_random::(props, 1024, i32::MIN, i32::MAX, 10, 0); } #[test] fn test_column_writer_nullable_repeated_values_roundtrip() { - let props = WriterProperties::builder().build(); + let props = Default::default(); column_roundtrip_random::(props, 1024, i32::MIN, i32::MAX, 10, 10); } @@ -2121,7 +2163,7 @@ mod tests { // write data // and check the offset index and column index let page_writer = get_test_page_writer(); - let props = Arc::new(WriterProperties::builder().build()); + let props = Default::default(); let mut writer = get_test_column_writer::(page_writer, 0, 0, props); writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); // first page @@ -2174,6 +2216,12 @@ mod tests { ); } + #[test] + fn test_send() { + fn test() {} + test::>(); + } + /// Performs write-read roundtrip with 
randomly generated values and levels. /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write /// for a column. @@ -2433,7 +2481,7 @@ mod tests { /// Write data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`] and returns generated statistics. fn statistics_roundtrip(values: &[::T]) -> Statistics { let page_writer = get_test_page_writer(); - let props = Arc::new(WriterProperties::builder().build()); + let props = Default::default(); let mut writer = get_test_column_writer::(page_writer, 0, 0, props); writer.write_batch(values, None, None).unwrap(); diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs index b7e30c4ecf08..3088f332183b 100644 --- a/parquet/src/encodings/encoding/mod.rs +++ b/parquet/src/encodings/encoding/mod.rs @@ -40,7 +40,7 @@ mod dict_encoder; /// /// Currently this allocates internal buffers for the encoded values. After done putting /// values, caller should call `flush_buffer()` to get an immutable buffer pointer. -pub trait Encoder { +pub trait Encoder: Send { /// Encodes data from `values`. fn put(&mut self, values: &[T::T]) -> Result<()>; diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index c2961aa76d06..40f6cf3123c7 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -365,78 +365,69 @@ impl RowGroupMetaData { ordinal: None, } } + + /// Converts this [`RowGroupMetaData`] into a [`RowGroupMetaDataBuilder`] + pub fn into_builder(self) -> RowGroupMetaDataBuilder { + RowGroupMetaDataBuilder(self) + } } /// Builder for row group metadata. -pub struct RowGroupMetaDataBuilder { - columns: Vec, - schema_descr: SchemaDescPtr, - num_rows: i64, - sorting_columns: Option>, - total_byte_size: i64, -} +pub struct RowGroupMetaDataBuilder(RowGroupMetaData); impl RowGroupMetaDataBuilder { /// Creates new builder from schema descriptor. fn new(schema_descr: SchemaDescPtr) -> Self { - Self { + Self(RowGroupMetaData { columns: Vec::with_capacity(schema_descr.num_columns()), schema_descr, num_rows: 0, sorting_columns: None, total_byte_size: 0, - } + }) } /// Sets number of rows in this row group. pub fn set_num_rows(mut self, value: i64) -> Self { - self.num_rows = value; + self.0.num_rows = value; self } /// Sets the sorting order for columns pub fn set_sorting_columns(mut self, value: Option>) -> Self { - self.sorting_columns = value; + self.0.sorting_columns = value; self } /// Sets total size in bytes for this row group. pub fn set_total_byte_size(mut self, value: i64) -> Self { - self.total_byte_size = value; + self.0.total_byte_size = value; self } /// Sets column metadata for this row group. pub fn set_column_metadata(mut self, value: Vec) -> Self { - self.columns = value; + self.0.columns = value; self } /// Builds row group metadata. pub fn build(self) -> Result { - if self.schema_descr.num_columns() != self.columns.len() { + if self.0.schema_descr.num_columns() != self.0.columns.len() { return Err(general_err!( "Column length mismatch: {} != {}", - self.schema_descr.num_columns(), - self.columns.len() + self.0.schema_descr.num_columns(), + self.0.columns.len() )); } - Ok(RowGroupMetaData { - columns: self.columns, - num_rows: self.num_rows, - sorting_columns: self.sorting_columns, - total_byte_size: self.total_byte_size, - schema_descr: self.schema_descr, - }) + Ok(self.0) } } /// Metadata for a column chunk. 
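Because `RowGroupMetaDataBuilder` is now a thin wrapper over `RowGroupMetaData`, the new `into_builder` lets existing metadata be adjusted without reconstructing every field. An illustrative sketch; which field gets rewritten is arbitrary:

```rust
use parquet::errors::Result;
use parquet::file::metadata::RowGroupMetaData;

/// Rewrite the recorded total byte size of an existing row group,
/// reusing its columns and schema via into_builder()
fn with_total_byte_size(rg: RowGroupMetaData, total: i64) -> Result<RowGroupMetaData> {
    rg.into_builder().set_total_byte_size(total).build()
}
```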
#[derive(Debug, Clone, PartialEq)] pub struct ColumnChunkMetaData { - column_type: Type, - column_path: ColumnPath, column_descr: ColumnDescPtr, encodings: Vec, file_path: Option, @@ -479,12 +470,12 @@ impl ColumnChunkMetaData { /// Type of this column. Must be primitive. pub fn column_type(&self) -> Type { - self.column_type + self.column_descr.physical_type() } /// Path (or identifier) of this column. pub fn column_path(&self) -> &ColumnPath { - &self.column_path + self.column_descr.path() } /// Descriptor for this column. @@ -609,7 +600,6 @@ impl ColumnChunkMetaData { } let mut col_metadata: ColumnMetaData = cc.meta_data.unwrap(); let column_type = Type::try_from(col_metadata.type_)?; - let column_path = ColumnPath::new(col_metadata.path_in_schema); let encodings = col_metadata .encodings .drain(0..) @@ -641,8 +631,6 @@ impl ColumnChunkMetaData { let column_index_length = cc.column_index_length; let result = ColumnChunkMetaData { - column_type, - column_path, column_descr, encodings, file_path, @@ -685,9 +673,9 @@ impl ColumnChunkMetaData { /// Method to convert to Thrift `ColumnMetaData` pub fn to_column_metadata_thrift(&self) -> ColumnMetaData { ColumnMetaData { - type_: self.column_type.into(), + type_: self.column_type().into(), encodings: self.encodings().iter().map(|&v| v.into()).collect(), - path_in_schema: Vec::from(self.column_path.as_ref()), + path_in_schema: self.column_path().as_ref().to_vec(), codec: self.compression.into(), num_values: self.num_values, total_uncompressed_size: self.total_uncompressed_size, @@ -704,34 +692,20 @@ impl ColumnChunkMetaData { bloom_filter_offset: self.bloom_filter_offset, } } + + /// Converts this [`ColumnChunkMetaData`] into a [`ColumnChunkMetaDataBuilder`] + pub fn into_builder(self) -> ColumnChunkMetaDataBuilder { + ColumnChunkMetaDataBuilder(self) + } } /// Builder for column chunk metadata. -pub struct ColumnChunkMetaDataBuilder { - column_descr: ColumnDescPtr, - encodings: Vec, - file_path: Option, - file_offset: i64, - num_values: i64, - compression: Compression, - total_compressed_size: i64, - total_uncompressed_size: i64, - data_page_offset: i64, - index_page_offset: Option, - dictionary_page_offset: Option, - statistics: Option, - encoding_stats: Option>, - bloom_filter_offset: Option, - offset_index_offset: Option, - offset_index_length: Option, - column_index_offset: Option, - column_index_length: Option, -} +pub struct ColumnChunkMetaDataBuilder(ColumnChunkMetaData); impl ColumnChunkMetaDataBuilder { /// Creates new column chunk metadata builder. fn new(column_descr: ColumnDescPtr) -> Self { - Self { + Self(ColumnChunkMetaData { column_descr, encodings: Vec::new(), file_path: None, @@ -750,135 +724,114 @@ impl ColumnChunkMetaDataBuilder { offset_index_length: None, column_index_offset: None, column_index_length: None, - } + }) } /// Sets list of encodings for this column chunk. pub fn set_encodings(mut self, encodings: Vec) -> Self { - self.encodings = encodings; + self.0.encodings = encodings; self } /// Sets optional file path for this column chunk. pub fn set_file_path(mut self, value: String) -> Self { - self.file_path = Some(value); + self.0.file_path = Some(value); self } /// Sets file offset in bytes. pub fn set_file_offset(mut self, value: i64) -> Self { - self.file_offset = value; + self.0.file_offset = value; self } /// Sets number of values. pub fn set_num_values(mut self, value: i64) -> Self { - self.num_values = value; + self.0.num_values = value; self } /// Sets compression. 
pub fn set_compression(mut self, value: Compression) -> Self { - self.compression = value; + self.0.compression = value; self } /// Sets total compressed size in bytes. pub fn set_total_compressed_size(mut self, value: i64) -> Self { - self.total_compressed_size = value; + self.0.total_compressed_size = value; self } /// Sets total uncompressed size in bytes. pub fn set_total_uncompressed_size(mut self, value: i64) -> Self { - self.total_uncompressed_size = value; + self.0.total_uncompressed_size = value; self } /// Sets data page offset in bytes. pub fn set_data_page_offset(mut self, value: i64) -> Self { - self.data_page_offset = value; + self.0.data_page_offset = value; self } /// Sets optional dictionary page ofset in bytes. pub fn set_dictionary_page_offset(mut self, value: Option) -> Self { - self.dictionary_page_offset = value; + self.0.dictionary_page_offset = value; self } /// Sets optional index page offset in bytes. pub fn set_index_page_offset(mut self, value: Option) -> Self { - self.index_page_offset = value; + self.0.index_page_offset = value; self } /// Sets statistics for this column chunk. pub fn set_statistics(mut self, value: Statistics) -> Self { - self.statistics = Some(value); + self.0.statistics = Some(value); self } /// Sets page encoding stats for this column chunk. pub fn set_page_encoding_stats(mut self, value: Vec) -> Self { - self.encoding_stats = Some(value); + self.0.encoding_stats = Some(value); self } /// Sets optional bloom filter offset in bytes. pub fn set_bloom_filter_offset(mut self, value: Option) -> Self { - self.bloom_filter_offset = value; + self.0.bloom_filter_offset = value; self } /// Sets optional offset index offset in bytes. pub fn set_offset_index_offset(mut self, value: Option) -> Self { - self.offset_index_offset = value; + self.0.offset_index_offset = value; self } /// Sets optional offset index length in bytes. pub fn set_offset_index_length(mut self, value: Option) -> Self { - self.offset_index_length = value; + self.0.offset_index_length = value; self } /// Sets optional column index offset in bytes. pub fn set_column_index_offset(mut self, value: Option) -> Self { - self.column_index_offset = value; + self.0.column_index_offset = value; self } /// Sets optional column index length in bytes. pub fn set_column_index_length(mut self, value: Option) -> Self { - self.column_index_length = value; + self.0.column_index_length = value; self } /// Builds column chunk metadata. 
pub fn build(self) -> Result { - Ok(ColumnChunkMetaData { - column_type: self.column_descr.physical_type(), - column_path: self.column_descr.path().clone(), - column_descr: self.column_descr, - encodings: self.encodings, - file_path: self.file_path, - file_offset: self.file_offset, - num_values: self.num_values, - compression: self.compression, - total_compressed_size: self.total_compressed_size, - total_uncompressed_size: self.total_uncompressed_size, - data_page_offset: self.data_page_offset, - index_page_offset: self.index_page_offset, - dictionary_page_offset: self.dictionary_page_offset, - statistics: self.statistics, - encoding_stats: self.encoding_stats, - bloom_filter_offset: self.bloom_filter_offset, - offset_index_offset: self.offset_index_offset, - offset_index_length: self.offset_index_length, - column_index_offset: self.column_index_offset, - column_index_length: self.column_index_length, - }) + Ok(self.0) } } diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs index 66d8ce48e0a7..fffe383c57ae 100644 --- a/parquet/src/file/mod.rs +++ b/parquet/src/file/mod.rs @@ -45,9 +45,8 @@ //! } //! "; //! let schema = Arc::new(parse_message_type(message_type).unwrap()); -//! let props = Arc::new(WriterProperties::builder().build()); //! let file = fs::File::create(&path).unwrap(); -//! let mut writer = SerializedFileWriter::new(file, schema, props).unwrap(); +//! let mut writer = SerializedFileWriter::new(file, schema, Default::default()).unwrap(); //! let mut row_group_writer = writer.next_row_group().unwrap(); //! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() { //! // ... write values to a column writer diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 1d6f38dcd3c4..c09503987a00 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -27,7 +27,7 @@ //! }; //! //! // Create properties with default configuration. -//! let props = WriterProperties::builder().build(); +//! let props = WriterProperties::default(); //! //! // Use properties builder to set certain options and assemble the configuration. //! let props = WriterProperties::builder() @@ -130,7 +130,20 @@ pub struct WriterProperties { sorting_columns: Option>, } +impl Default for WriterProperties { + fn default() -> Self { + Self::builder().build() + } +} + impl WriterProperties { + /// Create a new [`WriterProperties`] with the default settings + /// + /// See [`WriterProperties::builder`] for customising settings + pub fn new() -> Self { + Self::default() + } + /// Returns builder for writer properties with default values. 
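With `Default` implemented for `WriterProperties` (and therefore for `Arc<WriterProperties>`, the `WriterPropertiesPtr` that the `Default::default()` call sites in the doc examples above rely on), stock settings no longer require the builder. A quick sketch of the equivalent forms:

```rust
use std::sync::Arc;

use parquet::file::properties::{WriterProperties, WriterVersion};

fn properties_examples() {
    // All of these produce the stock configuration
    let _a = WriterProperties::default();
    let _b = WriterProperties::new();
    let _c: Arc<WriterProperties> = Default::default();

    // The builder remains the way to customise settings
    let _custom = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_max_row_group_size(64 * 1024)
        .build();
}
```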
pub fn builder() -> WriterPropertiesBuilder { WriterPropertiesBuilder::with_defaults() @@ -836,7 +849,7 @@ mod tests { #[test] fn test_writer_properties_default_settings() { - let props = WriterProperties::builder().build(); + let props = WriterProperties::default(); assert_eq!(props.data_pagesize_limit(), DEFAULT_PAGE_SIZE); assert_eq!( props.dictionary_pagesize_limit(), diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index bf843562ed02..782394942df4 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -785,7 +785,6 @@ mod tests { use crate::file::page_index::index_reader::{ read_columns_indexes, read_pages_locations, }; - use crate::file::properties::WriterProperties; use crate::file::writer::SerializedFileWriter; use crate::record::RowAccessor; use crate::schema::parser::parse_message_type; @@ -1716,12 +1715,9 @@ mod tests { let schema = parse_message_type(message_type).unwrap(); let mut out = Vec::with_capacity(1024); - let mut writer = SerializedFileWriter::new( - &mut out, - Arc::new(schema), - Arc::new(WriterProperties::builder().build()), - ) - .unwrap(); + let mut writer = + SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()) + .unwrap(); let mut r = writer.next_row_group().unwrap(); let mut c = r.next_column().unwrap().unwrap(); diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 9923970bedde..defdaad321d8 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -21,24 +21,22 @@ use crate::bloom_filter::Sbbf; use crate::format as parquet; use crate::format::{ColumnIndex, OffsetIndex, RowGroup}; -use std::io::{BufWriter, IoSlice}; +use std::fmt::Debug; +use std::io::{BufWriter, IoSlice, Read}; use std::{io::Write, sync::Arc}; use thrift::protocol::{TCompactOutputProtocol, TSerializable}; -use crate::basic::PageType; use crate::column::writer::{ get_typed_column_writer_mut, ColumnCloseResult, ColumnWriterImpl, }; use crate::column::{ - page::{CompressedPage, Page, PageWriteSpec, PageWriter}, + page::{CompressedPage, PageWriteSpec, PageWriter}, writer::{get_column_writer, ColumnWriter}, }; use crate::data_type::DataType; use crate::errors::{ParquetError, Result}; -use crate::file::{ - metadata::*, properties::WriterPropertiesPtr, - statistics::to_thrift as statistics_to_thrift, PARQUET_MAGIC, -}; +use crate::file::reader::ChunkReader; +use crate::file::{metadata::*, properties::WriterPropertiesPtr, PARQUET_MAGIC}; use crate::schema::types::{ self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr, }; @@ -146,7 +144,19 @@ pub struct SerializedFileWriter { kv_metadatas: Vec, } -impl SerializedFileWriter { +impl Debug for SerializedFileWriter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // implement Debug so this can be used with #[derive(Debug)] + // in client code rather than actually listing all the fields + f.debug_struct("SerializedFileWriter") + .field("descr", &self.descr) + .field("row_group_index", &self.row_group_index) + .field("kv_metadatas", &self.kv_metadatas) + .finish_non_exhaustive() + } +} + +impl SerializedFileWriter { /// Creates new file writer. pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result { let mut buf = TrackedWrite::new(buf); @@ -357,6 +367,16 @@ impl SerializedFileWriter { self.kv_metadatas.push(kv_metadata); } + /// Returns a reference to schema descriptor. 
+ pub fn schema_descr(&self) -> &SchemaDescriptor { + &self.descr + } + + /// Returns a reference to the writer properties + pub fn properties(&self) -> &WriterPropertiesPtr { + &self.props + } + /// Writes the file footer and returns the underlying writer. pub fn into_inner(mut self) -> Result { self.assert_previous_writer_closed()?; @@ -392,7 +412,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> { on_close: Option>, } -impl<'a, W: Write> SerializedRowGroupWriter<'a, W> { +impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { /// Creates a new `SerializedRowGroupWriter` with: /// /// - `schema_descr` - the schema to write @@ -423,27 +443,15 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> { } } - /// Returns the next column writer, if available, using the factory function; - /// otherwise returns `None`. - pub(crate) fn next_column_with_factory<'b, F, C>( - &'b mut self, - factory: F, - ) -> Result> - where - F: FnOnce( - ColumnDescPtr, - &'b WriterPropertiesPtr, - Box, - OnCloseColumnChunk<'b>, - ) -> Result, - { - self.assert_previous_writer_closed()?; - - if self.column_index >= self.descr.num_columns() { - return Ok(None); - } - let page_writer = Box::new(SerializedPageWriter::new(self.buf)); + /// Advance `self.column_index` returning the next [`ColumnDescPtr`] if any + fn next_column_desc(&mut self) -> Option { + let ret = self.descr.columns().get(self.column_index)?.clone(); + self.column_index += 1; + Some(ret) + } + /// Returns [`OnCloseColumnChunk`] for the next writer + fn get_on_close(&mut self) -> (&mut TrackedWrite, OnCloseColumnChunk<'_>) { let total_bytes_written = &mut self.total_bytes_written; let total_uncompressed_bytes = &mut self.total_uncompressed_bytes; let total_rows_written = &mut self.total_rows_written; @@ -475,16 +483,33 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> { Ok(()) }; + (self.buf, Box::new(on_close)) + } - let column = self.descr.column(self.column_index); - self.column_index += 1; - - Ok(Some(factory( - column, - &self.props, - page_writer, - Box::new(on_close), - )?)) + /// Returns the next column writer, if available, using the factory function; + /// otherwise returns `None`. + pub(crate) fn next_column_with_factory<'b, F, C>( + &'b mut self, + factory: F, + ) -> Result> + where + F: FnOnce( + ColumnDescPtr, + WriterPropertiesPtr, + Box, + OnCloseColumnChunk<'b>, + ) -> Result, + { + self.assert_previous_writer_closed()?; + Ok(match self.next_column_desc() { + Some(column) => { + let props = self.props.clone(); + let (buf, on_close) = self.get_on_close(); + let page_writer = Box::new(SerializedPageWriter::new(buf)); + Some(factory(column, props, page_writer, Box::new(on_close))?) + } + None => None, + }) } /// Returns the next column writer, if available; otherwise returns `None`. @@ -492,11 +517,81 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> { /// closed returns `Err`. 
     pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
         self.next_column_with_factory(|descr, props, page_writer, on_close| {
-            let column_writer = get_column_writer(descr, props.clone(), page_writer);
+            let column_writer = get_column_writer(descr, props, page_writer);
             Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
         })
     }
 
+    /// Append an encoded column chunk from another source without decoding it
+    ///
+    /// This can be used for efficiently concatenating or projecting parquet data,
+    /// or encoding parquet data to temporary in-memory buffers
+    ///
+    /// See [`Self::next_column`] for writing data that isn't already encoded
+    pub fn append_column<R: ChunkReader>(
+        &mut self,
+        reader: &R,
+        mut close: ColumnCloseResult,
+    ) -> Result<()> {
+        self.assert_previous_writer_closed()?;
+        let desc = self.next_column_desc().ok_or_else(|| {
+            general_err!("exhausted columns in SerializedRowGroupWriter")
+        })?;
+
+        let metadata = close.metadata;
+
+        if metadata.column_descr() != desc.as_ref() {
+            return Err(general_err!(
+                "column descriptor mismatch, expected {:?} got {:?}",
+                desc,
+                metadata.column_descr()
+            ));
+        }
+
+        let src_dictionary_offset = metadata.dictionary_page_offset();
+        let src_data_offset = metadata.data_page_offset();
+        let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
+        let src_length = metadata.compressed_size();
+
+        let write_offset = self.buf.bytes_written();
+        let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
+        let write_length = std::io::copy(&mut read, &mut self.buf)?;
+
+        if src_length as u64 != write_length {
+            return Err(general_err!(
+                "Failed to splice column data, expected {src_length} got {write_length}"
+            ));
+        }
+
+        let file_offset = self.buf.bytes_written() as i64;
+
+        let map_offset = |x| x - src_offset + write_offset as i64;
+        let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
+            .set_compression(metadata.compression())
+            .set_encodings(metadata.encodings().clone())
+            .set_file_offset(file_offset)
+            .set_total_compressed_size(metadata.compressed_size())
+            .set_total_uncompressed_size(metadata.uncompressed_size())
+            .set_num_values(metadata.num_values())
+            .set_data_page_offset(map_offset(src_data_offset))
+            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset));
+
+        if let Some(statistics) = metadata.statistics() {
+            builder = builder.set_statistics(statistics.clone())
+        }
+        close.metadata = builder.build()?;
+
+        if let Some(offsets) = close.offset_index.as_mut() {
+            for location in &mut offsets.page_locations {
+                location.offset = map_offset(location.offset)
+            }
+        }
+
+        SerializedPageWriter::new(self.buf).write_metadata(&metadata)?;
+        let (_, on_close) = self.get_on_close();
+        on_close(close)
+    }
+
     /// Closes this row group writer and returns row group metadata.
     pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
         if self.row_group_metadata.is_none() {
@@ -516,9 +611,9 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             if let Some(on_close) = self.on_close.take() {
                 on_close(
                     metadata,
-                    self.bloom_filters.clone(),
-                    self.column_indexes.clone(),
-                    self.offset_indexes.clone(),
+                    self.bloom_filters,
+                    self.column_indexes,
+                    self.offset_indexes,
                 )?
} } @@ -565,17 +660,7 @@ impl<'a> SerializedColumnWriter<'a> { /// Close this [`SerializedColumnWriter`] pub fn close(mut self) -> Result<()> { - let r = match self.inner { - ColumnWriter::BoolColumnWriter(typed) => typed.close()?, - ColumnWriter::Int32ColumnWriter(typed) => typed.close()?, - ColumnWriter::Int64ColumnWriter(typed) => typed.close()?, - ColumnWriter::Int96ColumnWriter(typed) => typed.close()?, - ColumnWriter::FloatColumnWriter(typed) => typed.close()?, - ColumnWriter::DoubleColumnWriter(typed) => typed.close()?, - ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?, - ColumnWriter::FixedLenByteArrayColumnWriter(typed) => typed.close()?, - }; - + let r = self.inner.close()?; if let Some(on_close) = self.on_close.take() { on_close(r)? } @@ -611,88 +696,22 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> { } } -impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> { +impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> { fn write_page(&mut self, page: CompressedPage) -> Result { - let uncompressed_size = page.uncompressed_size(); - let compressed_size = page.compressed_size(); - let num_values = page.num_values(); - let encoding = page.encoding(); let page_type = page.page_type(); - - let mut page_header = parquet::PageHeader { - type_: page_type.into(), - uncompressed_page_size: uncompressed_size as i32, - compressed_page_size: compressed_size as i32, - // TODO: Add support for crc checksum - crc: None, - data_page_header: None, - index_page_header: None, - dictionary_page_header: None, - data_page_header_v2: None, - }; - - match *page.compressed_page() { - Page::DataPage { - def_level_encoding, - rep_level_encoding, - ref statistics, - .. - } => { - let data_page_header = parquet::DataPageHeader { - num_values: num_values as i32, - encoding: encoding.into(), - definition_level_encoding: def_level_encoding.into(), - repetition_level_encoding: rep_level_encoding.into(), - statistics: statistics_to_thrift(statistics.as_ref()), - }; - page_header.data_page_header = Some(data_page_header); - } - Page::DataPageV2 { - num_nulls, - num_rows, - def_levels_byte_len, - rep_levels_byte_len, - is_compressed, - ref statistics, - .. - } => { - let data_page_header_v2 = parquet::DataPageHeaderV2 { - num_values: num_values as i32, - num_nulls: num_nulls as i32, - num_rows: num_rows as i32, - encoding: encoding.into(), - definition_levels_byte_length: def_levels_byte_len as i32, - repetition_levels_byte_length: rep_levels_byte_len as i32, - is_compressed: Some(is_compressed), - statistics: statistics_to_thrift(statistics.as_ref()), - }; - page_header.data_page_header_v2 = Some(data_page_header_v2); - } - Page::DictionaryPage { is_sorted, .. 
-                let dictionary_page_header = parquet::DictionaryPageHeader {
-                    num_values: num_values as i32,
-                    encoding: encoding.into(),
-                    is_sorted: Some(is_sorted),
-                };
-                page_header.dictionary_page_header = Some(dictionary_page_header);
-            }
-        }
-
         let start_pos = self.sink.bytes_written() as u64;
+        let page_header = page.to_thrift_header();
         let header_size = self.serialize_page_header(page_header)?;
         self.sink.write_all(page.data())?;
 
         let mut spec = PageWriteSpec::new();
         spec.page_type = page_type;
-        spec.uncompressed_size = uncompressed_size + header_size;
-        spec.compressed_size = compressed_size + header_size;
+        spec.uncompressed_size = page.uncompressed_size() + header_size;
+        spec.compressed_size = page.compressed_size() + header_size;
         spec.offset = start_pos;
         spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
-        // Number of values is incremented for data pages only
-        if page_type == PageType::DATA_PAGE || page_type == PageType::DATA_PAGE_V2 {
-            spec.num_values = num_values;
-        }
+        spec.num_values = page.num_values();
         Ok(spec)
     }
@@ -719,10 +738,12 @@ mod tests {
     use std::fs::File;
 
     use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type};
-    use crate::column::page::PageReader;
+    use crate::column::page::{Page, PageReader};
+    use crate::column::reader::get_typed_column_reader;
     use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
     use crate::data_type::{BoolType, Int32Type};
     use crate::file::reader::ChunkReader;
+    use crate::file::serialized_reader::ReadOptionsBuilder;
     use crate::file::{
         properties::{ReaderProperties, WriterProperties, WriterVersion},
         reader::{FileReader, SerializedFileReader, SerializedPageReader},
@@ -747,7 +768,7 @@ mod tests {
             .build()
             .unwrap(),
         );
-        let props = Arc::new(WriterProperties::builder().build());
+        let props = Default::default();
         let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
         let row_group_writer = writer.next_row_group().unwrap();
         let res = row_group_writer.close();
@@ -782,7 +803,7 @@ mod tests {
             .build()
             .unwrap(),
        );
-        let props = Arc::new(WriterProperties::builder().build());
+        let props = Default::default();
         let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
         let mut row_group_writer = writer.next_row_group().unwrap();
 
@@ -820,7 +841,7 @@ mod tests {
             .build()
             .unwrap(),
         );
-        let props = Arc::new(WriterProperties::builder().build());
+        let props = Default::default();
         let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
         writer.close().unwrap();
 
@@ -1245,7 +1266,7 @@ mod tests {
         compression: Compression,
     ) -> crate::format::FileMetaData
     where
-        W: Write,
+        W: Write + Send,
         R: ChunkReader + From<W> + 'static,
     {
         test_roundtrip::<W, R, Int32Type, _>(
@@ -1265,7 +1286,7 @@ mod tests {
         compression: Compression,
    ) -> crate::format::FileMetaData
     where
-        W: Write,
+        W: Write + Send,
         R: ChunkReader + From<W> + 'static,
         D: DataType,
         F: Fn(Row) -> D::T,
@@ -1497,7 +1518,7 @@ mod tests {
        ";
 
         let schema = Arc::new(parse_message_type(message_type).unwrap());
-        let props = Arc::new(WriterProperties::builder().build());
+        let props = Default::default();
         let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
         let mut row_group_writer = writer.next_row_group().unwrap();
@@ -1540,4 +1561,91 @@ mod tests {
         assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
         assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
     }
+
+    #[test]
+    fn test_spliced_write() {
+        let message_type = "
+            message test_schema {
+                REQUIRED INT32 i32 (INTEGER(32,true));
+                REQUIRED INT32 u32 (INTEGER(32,false));
+            }
+        ";
+        let schema = Arc::new(parse_message_type(message_type).unwrap());
+        let props = Arc::new(WriterProperties::builder().build());
+
+        let mut file = Vec::with_capacity(1024);
+        let mut file_writer =
+            SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();
+
+        let columns = file_writer.descr.columns();
+        let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
+            .iter()
+            .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
+            .collect();
+
+        let mut column_state_slice = column_state.as_mut_slice();
+        let mut column_writers = Vec::with_capacity(columns.len());
+        for c in columns {
+            let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
+            column_state_slice = tail;
+
+            let page_writer = Box::new(SerializedPageWriter::new(buf));
+            let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
+            column_writers.push(SerializedColumnWriter::new(
+                col_writer,
+                Some(Box::new(|on_close| {
+                    *out = Some(on_close);
+                    Ok(())
+                })),
+            ));
+        }
+
+        let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];
+
+        // Interleaved writing to the column writers
+        for (writer, batch) in column_writers.iter_mut().zip(column_data) {
+            let writer = writer.typed::<Int32Type>();
+            writer.write_batch(&batch, None, None).unwrap();
+        }
+
+        // Close the column writers
+        for writer in column_writers {
+            writer.close().unwrap()
+        }
+
+        // Splice column data into a row group
+        let mut row_group_writer = file_writer.next_row_group().unwrap();
+        for (write, close) in column_state {
+            let buf = Bytes::from(write.into_inner().unwrap());
+            row_group_writer
+                .append_column(&buf, close.unwrap())
+                .unwrap();
+        }
+        row_group_writer.close().unwrap();
+        file_writer.close().unwrap();
+
+        // Check data was written correctly
+        let file = Bytes::from(file);
+        let test_read = |reader: SerializedFileReader<Bytes>| {
+            let row_group = reader.get_row_group(0).unwrap();
+
+            let mut out = [0; 4];
+            let c1 = row_group.get_column_reader(0).unwrap();
+            let mut c1 = get_typed_column_reader::<Int32Type>(c1);
+            c1.read_batch(4, None, None, &mut out).unwrap();
+            assert_eq!(out, column_data[0]);
+
+            let c2 = row_group.get_column_reader(1).unwrap();
+            let mut c2 = get_typed_column_reader::<Int32Type>(c2);
+            c2.read_batch(4, None, None, &mut out).unwrap();
+            assert_eq!(out, column_data[1]);
+        };
+
+        let reader = SerializedFileReader::new(file.clone()).unwrap();
+        test_read(reader);
+
+        let options = ReadOptionsBuilder::new().with_page_index().build();
+        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
+        test_read(reader);
+    }
 }
diff --git a/parquet/src/record/record_writer.rs b/parquet/src/record/record_writer.rs
index fe803a7ff4ef..62099051f513 100644
--- a/parquet/src/record/record_writer.rs
+++ b/parquet/src/record/record_writer.rs
@@ -21,7 +21,7 @@ use super::super::errors::ParquetError;
 use super::super::file::writer::SerializedRowGroupWriter;
 
 pub trait RecordWriter<T> {
-    fn write_to_row_group<W: std::io::Write>(
+    fn write_to_row_group<W: std::io::Write + Send>(
         &self,
         row_group_writer: &mut SerializedRowGroupWriter<W>,
     ) -> Result<(), ParquetError>;
diff --git a/parquet/src/util/memory.rs b/parquet/src/util/memory.rs
index 909878a6d538..25d15dd4ff73 100644
--- a/parquet/src/util/memory.rs
+++ b/parquet/src/util/memory.rs
@@ -114,6 +114,12 @@ impl From<Vec<u8>> for ByteBufferPtr {
     }
 }
 
+impl From<ByteBufferPtr> for Bytes {
+    fn from(value: ByteBufferPtr) -> Self {
+        value.data
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
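Editor's note (illustrative sketch, not part of the patch): the `RecordWriter` trait shown in the `record_writer.rs` hunk above is the trait that `#[derive(ParquetRecordWriter)]` implements automatically. A hand-written implementation against the post-change signature (note the added `Send` bound on `W`) might look roughly like the following; the `Sample` type and the one-column schema are hypothetical.

use std::sync::Arc;

use parquet::data_type::Int64Type;
use parquet::errors::ParquetError;
use parquet::file::writer::SerializedRowGroupWriter;
use parquet::record::RecordWriter;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::TypePtr;

struct Sample {
    id: i64,
}

impl RecordWriter<Sample> for &[Sample] {
    fn write_to_row_group<W: std::io::Write + Send>(
        &self,
        row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
    ) -> Result<(), ParquetError> {
        // Buffer the column values, then hand them to the next column writer.
        let ids: Vec<i64> = self.iter().map(|s| s.id).collect();
        let mut column = row_group_writer.next_column()?.expect("schema has one column");
        column.typed::<Int64Type>().write_batch(&ids, None, None)?;
        column.close()
    }

    fn schema(&self) -> Result<TypePtr, ParquetError> {
        Ok(Arc::new(parse_message_type(
            "message sample { REQUIRED INT64 id; }",
        )?))
    }
}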
diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs
index 6525513cbaa1..0f875401f0e9 100644
--- a/parquet_derive/src/lib.rs
+++ b/parquet_derive/src/lib.rs
@@ -65,8 +65,7 @@ mod parquet_field;
 ///
 /// let schema = samples.as_slice().schema();
 ///
-/// let props = Arc::new(WriterProperties::builder().build());
-/// let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
+/// let mut writer = SerializedFileWriter::new(file, schema, Default::default()).unwrap();
 ///
 /// let mut row_group = writer.next_row_group().unwrap();
 /// samples.as_slice().write_to_row_group(&mut row_group).unwrap();
@@ -97,7 +96,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
     (quote! {
         impl #generics ::parquet::record::RecordWriter<#derived_for #generics> for &[#derived_for #generics] {
-            fn write_to_row_group<W: ::std::io::Write>(
+            fn write_to_row_group<W: ::std::io::Write + Send>(
                 &self,
                 row_group_writer: &mut ::parquet::file::writer::SerializedRowGroupWriter<'_, W>
             ) -> Result<(), ::parquet::errors::ParquetError> {
diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index 2aa174974aba..f4f8be1e0d8c 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -56,8 +56,7 @@ mod tests {
     use std::{env, fs, io::Write, sync::Arc};
 
     use parquet::{
-        file::{properties::WriterProperties, writer::SerializedFileWriter},
-        record::RecordWriter,
+        file::writer::SerializedFileWriter, record::RecordWriter,
         schema::parser::parse_message_type,
     };
 
@@ -139,7 +138,7 @@ mod tests {
 
         assert_eq!(&schema, &generated_schema);
 
-        let props = Arc::new(WriterProperties::builder().build());
+        let props = Default::default();
         let mut writer = SerializedFileWriter::new(file, generated_schema, props).unwrap();
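Editor's note (illustrative sketch, not part of the patch): with `WriterProperties` now usable through `Default`, the derive-macro doc example above reduces to the following end-to-end flow. The `Sample` struct, its fields, and the output path are made up for illustration.

use std::fs::File;

use parquet::file::writer::SerializedFileWriter;
use parquet::record::RecordWriter;
use parquet_derive::ParquetRecordWriter;

#[derive(ParquetRecordWriter)]
struct Sample {
    a: i64,
    b: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let samples = vec![
        Sample { a: 1, b: "foo".to_string() },
        Sample { a: 2, b: "bar".to_string() },
    ];

    let file = File::create("/tmp/sample.parquet")?;
    let schema = samples.as_slice().schema()?;

    // `Default::default()` stands in for the former
    // `Arc::new(WriterProperties::builder().build())`.
    let mut writer = SerializedFileWriter::new(file, schema, Default::default())?;
    let mut row_group = writer.next_row_group()?;
    samples.as_slice().write_to_row_group(&mut row_group)?;
    row_group.close()?;
    writer.close()?;
    Ok(())
}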