Skip to content
10 changes: 6 additions & 4 deletions arrow-pyarrow-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ use arrow::array::new_empty_array;
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;

use arrow::array::{Array, ArrayData, ArrayRef, Int64Array, make_array};
use arrow::array::{make_array, Array, ArrayData, ArrayRef, Int64Array};
use arrow::compute::kernels;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ffi_stream::ArrowArrayStreamReader;
use arrow::pyarrow::{PyArrowConvert, PyArrowException, PyArrowType};
use arrow::pyarrow::{FromPyArrow, PyArrowException, PyArrowType, ToPyArrow};
use arrow::record_batch::RecordBatch;

fn to_py_err(err: ArrowError) -> PyErr {
Expand Down Expand Up @@ -88,7 +88,8 @@ fn substring(
let array = make_array(array.0);

// substring
let array = kernels::substring::substring(array.as_ref(), start, None).map_err(to_py_err)?;
let array =
kernels::substring::substring(array.as_ref(), start, None).map_err(to_py_err)?;

Ok(array.to_data().into())
}
Expand All @@ -99,7 +100,8 @@ fn concatenate(array: PyArrowType<ArrayData>, py: Python) -> PyResult<PyObject>
let array = make_array(array.0);

// concat
let array = kernels::concat::concat(&[array.as_ref(), array.as_ref()]).map_err(to_py_err)?;
let array =
kernels::concat::concat(&[array.as_ref(), array.as_ref()]).map_err(to_py_err)?;

array.to_data().to_pyarrow(py)
}
Expand Down
99 changes: 34 additions & 65 deletions arrow/src/ffi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,19 @@
//! let reader = Box::new(FileReader::try_new(file).unwrap());
//!
//! // export it
//! let stream = Box::new(FFI_ArrowArrayStream::empty());
//! let stream_ptr = Box::into_raw(stream) as *mut FFI_ArrowArrayStream;
//! unsafe { export_reader_into_raw(reader, stream_ptr) };
//! let mut stream = FFI_ArrowArrayStream::empty();
//! unsafe { export_reader_into_raw(reader, &mut stream) };
//!
//! // consumed and used by something else...
//!
//! // import it
//! let stream_reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr).unwrap() };
//! let stream_reader = unsafe { ArrowArrayStreamReader::from_raw(&mut stream).unwrap() };
//! let imported_schema = stream_reader.schema();
//!
//! let mut produced_batches = vec![];
//! for batch in stream_reader {
//! produced_batches.push(batch.unwrap());
//! }
//!
//! // (drop/release)
//! unsafe {
//! Box::from_raw(stream_ptr);
//! }
//! Ok(())
//! }
//! ```
Expand Down Expand Up @@ -105,6 +99,8 @@ pub struct FFI_ArrowArrayStream {
pub private_data: *mut c_void,
}

// SAFETY (NOTE(review)): the struct carries a raw `private_data: *mut c_void`
// pointer, so `Send` is not derived automatically and is asserted by hand here.
// Presumably this is sound only if the exporting side tolerates the stream being
// moved to another thread — confirm against the producer's contract.
unsafe impl Send for FFI_ArrowArrayStream {}

// callback used to drop [FFI_ArrowArrayStream] when it is exported.
unsafe extern "C" fn release_stream(stream: *mut FFI_ArrowArrayStream) {
if stream.is_null() {
Expand Down Expand Up @@ -231,8 +227,7 @@ impl ExportedArrayStream {
let struct_array = StructArray::from(batch);
let array = FFI_ArrowArray::new(&struct_array.to_data());

unsafe { std::ptr::copy(addr_of!(array), out, 1) };
std::mem::forget(array);
unsafe { std::ptr::write_unaligned(out, array) };
0
} else {
let err = &next_batch.unwrap_err();
Expand Down Expand Up @@ -261,24 +256,21 @@ fn get_error_code(err: &ArrowError) -> i32 {
/// Struct used to fetch `RecordBatch` from the C Stream Interface.
/// Its main responsibility is to expose `RecordBatchReader` functionality
/// that requires [FFI_ArrowArrayStream].
// NOTE(review): this listing looks like a diff rendered without +/- markers, so
// the two `derive` attributes and the two `stream` fields below are presumably
// the removed/added variants of a single line — confirm against the real file.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct ArrowArrayStreamReader {
// The FFI stream whose callbacks are invoked to pull batches.
stream: Arc<FFI_ArrowArrayStream>,
stream: FFI_ArrowArrayStream,
// Schema obtained from the stream when the reader is constructed, then cached.
schema: SchemaRef,
}

/// Gets schema from a raw pointer of `FFI_ArrowArrayStream`. This is used when constructing
/// `ArrowArrayStreamReader` to cache schema.
fn get_stream_schema(stream_ptr: *mut FFI_ArrowArrayStream) -> Result<SchemaRef> {
let empty_schema = Arc::new(FFI_ArrowSchema::empty());
let schema_ptr = Arc::into_raw(empty_schema) as *mut FFI_ArrowSchema;
let mut schema = FFI_ArrowSchema::empty();

let ret_code = unsafe { (*stream_ptr).get_schema.unwrap()(stream_ptr, schema_ptr) };

let ffi_schema = unsafe { Arc::from_raw(schema_ptr) };
let ret_code = unsafe { (*stream_ptr).get_schema.unwrap()(stream_ptr, &mut schema) };

if ret_code == 0 {
let schema = Schema::try_from(ffi_schema.as_ref()).unwrap();
let schema = Schema::try_from(&schema).unwrap();
Ok(Arc::new(schema))
} else {
Err(ArrowError::CDataInterface(format!(
Expand All @@ -291,21 +283,16 @@ impl ArrowArrayStreamReader {
/// Creates a new `ArrowArrayStreamReader` from a `FFI_ArrowArrayStream`.
/// This is used to import from the C Stream Interface.
///
/// The schema is requested from the stream once, here, and stored on the reader.
///
/// # Errors
///
/// Returns `ArrowError::CDataInterface` with the message
/// "input stream is already released" when `stream.release` is `None`, and
/// propagates any error returned by `get_stream_schema`.
// NOTE(review): this listing looks like a diff rendered without +/- markers, so
// the paired signature lines and the paired schema/constructor statements below
// are presumably the removed/added variants — confirm against the real file.
#[allow(dead_code)]
pub fn try_new(stream: FFI_ArrowArrayStream) -> Result<Self> {
pub fn try_new(mut stream: FFI_ArrowArrayStream) -> Result<Self> {
// A missing `release` callback is treated as "already released": reject it
// before any other callback on the stream is invoked.
if stream.release.is_none() {
return Err(ArrowError::CDataInterface(
"input stream is already released".to_string(),
));
}

let stream_ptr = Arc::into_raw(Arc::new(stream)) as *mut FFI_ArrowArrayStream;

// Fetch the schema up front so it can be cached on the reader.
let schema = get_stream_schema(stream_ptr)?;
let schema = get_stream_schema(&mut stream)?;

Ok(Self {
stream: unsafe { Arc::from_raw(stream_ptr) },
schema,
})
Ok(Self { stream, schema })
}

/// Creates a new `ArrowArrayStreamReader` from a raw pointer of `FFI_ArrowArrayStream`.
Expand All @@ -324,13 +311,12 @@ impl ArrowArrayStreamReader {
}

/// Get the last error from `ArrowArrayStreamReader`
fn get_stream_last_error(&self) -> Option<String> {
fn get_stream_last_error(&mut self) -> Option<String> {
self.stream.get_last_error?;

let stream_ptr = Arc::as_ptr(&self.stream) as *mut FFI_ArrowArrayStream;

let error_str = unsafe {
let c_str = self.stream.get_last_error.unwrap()(stream_ptr) as *mut c_char;
let c_str =
self.stream.get_last_error.unwrap()(&mut self.stream) as *mut c_char;
CString::from_raw(c_str).into_string()
};

Expand All @@ -346,26 +332,22 @@ impl Iterator for ArrowArrayStreamReader {
type Item = Result<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {
let stream_ptr = Arc::as_ptr(&self.stream) as *mut FFI_ArrowArrayStream;

let empty_array = Arc::new(FFI_ArrowArray::empty());
let array_ptr = Arc::into_raw(empty_array) as *mut FFI_ArrowArray;
let mut array = FFI_ArrowArray::empty();

let ret_code = unsafe { self.stream.get_next.unwrap()(stream_ptr, array_ptr) };
let ret_code =
unsafe { self.stream.get_next.unwrap()(&mut self.stream, &mut array) };

if ret_code == 0 {
let ffi_array = unsafe { Arc::from_raw(array_ptr) };

// The end of stream has been reached
if ffi_array.is_released() {
if array.is_released() {
return None;
}

let schema_ref = self.schema();
let schema = FFI_ArrowSchema::try_from(schema_ref.as_ref()).ok()?;

let data = ArrowArray {
array: ffi_array,
array: Arc::new(array),
schema: Arc::new(schema),
}
.to_data()
Expand All @@ -375,8 +357,6 @@ impl Iterator for ArrowArrayStreamReader {

Some(Ok(record_batch))
} else {
unsafe { Arc::from_raw(array_ptr) };

let last_error = self.get_stream_last_error();
let err = ArrowError::CDataInterface(last_error.unwrap());
Some(Err(err))
Expand Down Expand Up @@ -451,40 +431,33 @@ mod tests {
let reader = TestRecordBatchReader::new(schema.clone(), iter);

// Export a `RecordBatchReader` through `FFI_ArrowArrayStream`
let stream = Arc::new(FFI_ArrowArrayStream::empty());
let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream;

unsafe { export_reader_into_raw(reader, stream_ptr) };

let empty_schema = Arc::new(FFI_ArrowSchema::empty());
let schema_ptr = Arc::into_raw(empty_schema) as *mut FFI_ArrowSchema;
let mut ffi_stream = FFI_ArrowArrayStream::empty();
unsafe { export_reader_into_raw(reader, &mut ffi_stream) };

// Get schema from `FFI_ArrowArrayStream`
let ret_code = unsafe { get_schema(stream_ptr, schema_ptr) };
let mut ffi_schema = FFI_ArrowSchema::empty();
let ret_code = unsafe { get_schema(&mut ffi_stream, &mut ffi_schema) };
assert_eq!(ret_code, 0);

let ffi_schema = unsafe { Arc::from_raw(schema_ptr) };

let exported_schema = Schema::try_from(ffi_schema.as_ref()).unwrap();
let exported_schema = Schema::try_from(&ffi_schema).unwrap();
assert_eq!(&exported_schema, schema.as_ref());

let ffi_schema = Arc::new(ffi_schema);

// Get array from `FFI_ArrowArrayStream`
let mut produced_batches = vec![];
loop {
let empty_array = Arc::new(FFI_ArrowArray::empty());
let array_ptr = Arc::into_raw(empty_array.clone()) as *mut FFI_ArrowArray;

let ret_code = unsafe { get_next(stream_ptr, array_ptr) };
let mut ffi_array = FFI_ArrowArray::empty();
let ret_code = unsafe { get_next(&mut ffi_stream, &mut ffi_array) };
assert_eq!(ret_code, 0);

// The end of stream has been reached
let ffi_array = unsafe { Arc::from_raw(array_ptr) };
if ffi_array.is_released() {
break;
}

let array = ArrowArray {
array: ffi_array,
array: Arc::new(ffi_array),
schema: ffi_schema.clone(),
}
.to_data()
Expand All @@ -496,7 +469,6 @@ mod tests {

assert_eq!(produced_batches, vec![batch.clone(), batch]);

unsafe { Arc::from_raw(stream_ptr) };
Ok(())
}

Expand All @@ -512,10 +484,8 @@ mod tests {
let reader = TestRecordBatchReader::new(schema.clone(), iter);

// Import through `FFI_ArrowArrayStream` as `ArrowArrayStreamReader`
let stream = Arc::new(FFI_ArrowArrayStream::new(reader));
let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream;
let stream_reader =
unsafe { ArrowArrayStreamReader::from_raw(stream_ptr).unwrap() };
let stream = FFI_ArrowArrayStream::new(reader);
let stream_reader = ArrowArrayStreamReader::try_new(stream).unwrap();

let imported_schema = stream_reader.schema();
assert_eq!(imported_schema, schema);
Expand All @@ -527,7 +497,6 @@ mod tests {

assert_eq!(produced_batches, vec![batch.clone(), batch]);

unsafe { Arc::from_raw(stream_ptr) };
Ok(())
}

Expand Down
Loading