From 8b342c5952f130354b75b06e3f78bc24da355d54 Mon Sep 17 00:00:00 2001 From: Liang Geng Date: Tue, 17 Mar 2026 14:51:51 +0800 Subject: [PATCH 1/4] Integrate libgpuspatial into sedona-db --- rust/sedona-common/src/option.rs | 31 + rust/sedona-spatial-join/Cargo.toml | 3 + rust/sedona-spatial-join/src/index.rs | 2 + .../src/index/gpu_spatial_index.rs | 550 ++++++++++++++++++ .../src/index/gpu_spatial_index_builder.rs | 327 +++++++++++ .../src/index/partitioned_index_provider.rs | 74 ++- .../src/operand_evaluator.rs | 70 ++- rust/sedona-spatial-join/src/prepare.rs | 1 + rust/sedona/Cargo.toml | 2 + rust/sedona/src/context.rs | 16 +- 10 files changed, 1049 insertions(+), 27 deletions(-) create mode 100644 rust/sedona-spatial-join/src/index/gpu_spatial_index.rs create mode 100644 rust/sedona-spatial-join/src/index/gpu_spatial_index_builder.rs diff --git a/rust/sedona-common/src/option.rs b/rust/sedona-common/src/option.rs index 21b228d4e..c147b1f49 100644 --- a/rust/sedona-common/src/option.rs +++ b/rust/sedona-common/src/option.rs @@ -109,6 +109,9 @@ config_namespace! { /// Options for debugging or testing spatial join pub debug : SpatialJoinDebugOptions, default = SpatialJoinDebugOptions::default() + + /// GPU acceleration options + pub gpu: GpuOptions, default = GpuOptions::default() } } @@ -129,6 +132,34 @@ config_namespace! { } } +config_namespace! { + /// Configuration options for GPU-accelerated spatial joins + pub struct GpuOptions { + /// Enable GPU-accelerated spatial joins (requires CUDA and GPU feature flag) + pub enable: bool, default = false + + // Concatenate all geometries on the build-side into a single buffer for GPU processing + pub concat_build: bool, default = true + + /// GPU device ID to use (0 = first GPU, 1 = second, etc.) + pub device_id: usize, default = 0 + + /// Fall back to CPU if GPU initialization or execution fails + pub fallback_to_cpu: bool, default = true + /// Use CUDA memory pool for GPU memory management + pub use_memory_pool: bool, default = true + /// Percentage of total GPU memory to initialize CUDA memory pool (between 0% and 100%) + pub memory_pool_init_percentage: usize, default = 50 + + /// Overlapping parsing and refinement by pipelining multiple batches; 1 means no pipelining + pub pipeline_batches: usize, default = 1 + + + /// Compress BVH to reduce memory usage for processing larger datasets at the cost of some performance + pub compress_bvh: bool, default = false + } +} + #[derive(Debug, PartialEq, Clone, Copy)] pub enum NumSpatialPartitionsConfig { /// Automatically determine the number of spatial partitions diff --git a/rust/sedona-spatial-join/Cargo.toml b/rust/sedona-spatial-join/Cargo.toml index e9d830cf8..95a882005 100644 --- a/rust/sedona-spatial-join/Cargo.toml +++ b/rust/sedona-spatial-join/Cargo.toml @@ -32,6 +32,8 @@ result_large_err = "allow" [features] backtrace = ["datafusion-common/backtrace"] +# Enable GPU acceleration (requires CUDA toolkit and sedona-libgpuspatial with gpu feature) +gpu = ["sedona-libgpuspatial/gpu"] [dependencies] async-trait = { workspace = true } @@ -63,6 +65,7 @@ sedona-geometry = { workspace = true } sedona-schema = { workspace = true } sedona-tg = { workspace = true } sedona-geos = { workspace = true } +sedona-libgpuspatial = { workspace = true } wkb = { workspace = true } geo-index = { workspace = true } geos = { workspace = true } diff --git a/rust/sedona-spatial-join/src/index.rs b/rust/sedona-spatial-join/src/index.rs index cd5f38d2f..332369b88 100644 --- a/rust/sedona-spatial-join/src/index.rs +++ b/rust/sedona-spatial-join/src/index.rs @@ -18,6 +18,8 @@ pub(crate) mod build_side_collector; pub(crate) mod default_spatial_index; pub(crate) mod default_spatial_index_builder; +pub(crate) mod gpu_spatial_index; +mod gpu_spatial_index_builder; mod knn_adapter; pub(crate) mod memory_plan; pub(crate) mod partitioned_index_provider; diff --git a/rust/sedona-spatial-join/src/index/gpu_spatial_index.rs b/rust/sedona-spatial-join/src/index/gpu_spatial_index.rs new file mode 100644 index 000000000..1edf3dfdb --- /dev/null +++ b/rust/sedona-spatial-join/src/index/gpu_spatial_index.rs @@ -0,0 +1,550 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::evaluated_batch::EvaluatedBatch; +use crate::index::spatial_index::SpatialIndex; +use crate::index::QueryResultMetrics; +use crate::operand_evaluator::OperandEvaluator; +use crate::spatial_predicate::SpatialRelationType; +use crate::{operand_evaluator::create_operand_evaluator, spatial_predicate::SpatialPredicate}; +use arrow::array::BooleanBufferBuilder; +use arrow_array::{ArrayRef, RecordBatch}; +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use datafusion_common::{DataFusionError, Result}; +use geo_types::{coord, Rect}; +use parking_lot::Mutex; +use sedona_common::{ExecutionMode, SpatialJoinOptions}; +use sedona_expr::statistics::GeoStatistics; +use sedona_libgpuspatial::{ + GpuSpatialIndex, GpuSpatialOptions, GpuSpatialRefiner, GpuSpatialRelationPredicate, +}; +use std::ops::Range; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use wkb::reader::Wkb; + +pub struct GPUSpatialIndex { + pub(crate) schema: SchemaRef, + pub(crate) _options: SpatialJoinOptions, + /// The spatial predicate evaluator for the spatial predicate. + #[allow(dead_code)] // reserved for GPU-based distance evaluation + pub(crate) evaluator: Arc, + /// GPU spatial index for performing GPU-accelerated filtering + pub(crate) index: Arc, + /// GPU spatial refiner for performing GPU-accelerated refinement + pub(crate) refiner: Arc, + pub(crate) spatial_predicate: SpatialPredicate, + /// Indexed batches containing evaluated geometry arrays. It contains the original record + /// batches and geometry arrays obtained by evaluating the geometry expression on the build side. + pub(crate) indexed_batches: Vec, + /// An array for translating data index to geometry batch index and row index + pub(crate) data_id_to_batch_pos: Vec<(i32, i32)>, + /// Shared bitmap builders for visited left indices, one per batch + pub(crate) visited_build_side: Option>>, + /// Counter of running probe-threads, potentially able to update `bitmap`. + /// Each time a probe thread finished probing the index, it will decrement the counter. + /// The last finished probe thread will produce the extra output batches for unmatched + /// build side when running left-outer joins. See also [`report_probe_completed`]. + pub(crate) probe_threads_counter: AtomicUsize, +} +impl GPUSpatialIndex { + pub fn empty( + spatial_predicate: SpatialPredicate, + schema: SchemaRef, + options: SpatialJoinOptions, + probe_threads_counter: AtomicUsize, + ) -> Result { + let gpu_options = GpuSpatialOptions { + cuda_use_memory_pool: options.gpu.use_memory_pool, + cuda_memory_pool_init_percent: options.gpu.memory_pool_init_percentage as i32, + concurrency: 1, + device_id: options.gpu.device_id as i32, + compress_bvh: options.gpu.compress_bvh, + pipeline_batches: options.gpu.pipeline_batches as u32, + }; + + let evaluator = create_operand_evaluator(&spatial_predicate, options.clone()); + + Ok(Self { + schema, + _options: options, + evaluator, + spatial_predicate, + index: Arc::new( + GpuSpatialIndex::try_new(&gpu_options) + .map_err(|e| DataFusionError::Execution(e.to_string()))?, + ), + refiner: Arc::new( + GpuSpatialRefiner::try_new(&gpu_options) + .map_err(|e| DataFusionError::Execution(e.to_string()))?, + ), + indexed_batches: vec![], + data_id_to_batch_pos: vec![], + visited_build_side: None, + probe_threads_counter, + }) + } + + #[allow(clippy::too_many_arguments)] + pub fn new( + spatial_predicate: SpatialPredicate, + schema: SchemaRef, + options: SpatialJoinOptions, + evaluator: Arc, + index: Arc, + refiner: Arc, + indexed_batches: Vec, + data_id_to_batch_pos: Vec<(i32, i32)>, + visited_build_side: Option>>, + probe_threads_counter: AtomicUsize, + ) -> Result { + Ok(Self { + schema, + _options: options, + evaluator, + spatial_predicate, + index, + refiner, + indexed_batches, + data_id_to_batch_pos, + visited_build_side, + probe_threads_counter, + }) + } + + fn refine( + &self, + probe_geoms: &ArrayRef, + predicate: &SpatialPredicate, + build_indices: &mut Vec, + probe_indices: &mut Vec, + ) -> Result<()> { + match predicate { + SpatialPredicate::Relation(rel_p) => { + self.refiner + .refine( + probe_geoms, + Self::convert_relation_type(&rel_p.relation_type)?, + build_indices, + probe_indices, + ) + .map_err(|e| { + DataFusionError::Execution(format!( + "GPU spatial refinement failed: {:?}", + e + )) + })?; + Ok(()) + } + _ => Err(DataFusionError::NotImplemented( + "Only Relation predicate is supported for GPU spatial query".to_string(), + )), + } + } + // Translate Sedona SpatialRelationType to GpuSpatialRelationPredicate + fn convert_relation_type(t: &SpatialRelationType) -> Result { + match t { + SpatialRelationType::Equals => Ok(GpuSpatialRelationPredicate::Equals), + SpatialRelationType::Touches => Ok(GpuSpatialRelationPredicate::Touches), + SpatialRelationType::Contains => Ok(GpuSpatialRelationPredicate::Contains), + SpatialRelationType::Covers => Ok(GpuSpatialRelationPredicate::Covers), + SpatialRelationType::Intersects => Ok(GpuSpatialRelationPredicate::Intersects), + SpatialRelationType::Within => Ok(GpuSpatialRelationPredicate::Within), + SpatialRelationType::CoveredBy => Ok(GpuSpatialRelationPredicate::CoveredBy), + _ => { + // This should not happen as we check for supported predicates earlier + Err(DataFusionError::Execution(format!( + "Unsupported spatial relation type for GPU: {:?}", + t + ))) + } + } + } +} + +#[async_trait] +impl SpatialIndex for GPUSpatialIndex { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + #[cfg(test)] + fn num_indexed_batches(&self) -> usize { + self.indexed_batches.len() + } + fn get_indexed_batch(&self, batch_idx: usize) -> &RecordBatch { + &self.indexed_batches[batch_idx].batch + } + + /// This method implements [`SpatialIndex::query_batch`] with GPU-accelerated spatial filtering + /// and refinement. It takes a batch of probe geometries and a range of row indices to process, + /// performs a spatial query on the GPU to find candidate matches, + /// refines the candidates using the GPU refiner, + /// and returns the matching build batch positions and probe indices. + async fn query_batch( + &self, + evaluated_batch: &Arc, + range: Range, + _max_result_size: usize, + build_batch_positions: &mut Vec<(i32, i32)>, + probe_indices: &mut Vec, + ) -> Result<(QueryResultMetrics, usize)> { + if range.is_empty() { + return Ok(( + QueryResultMetrics { + count: 0, + candidate_count: 0, + }, + range.start, + )); + } + let index = &self.index.as_ref(); + + let empty_rect = Rect::new( + coord!(x: f32::NAN, y: f32::NAN), + coord!(x: f32::NAN, y: f32::NAN), + ); + let rects: Vec<_> = range + .clone() + .map(|row_idx| evaluated_batch.geom_array.rects[row_idx].unwrap_or(empty_rect)) + .collect(); + + let (mut gpu_build_indices, mut gpu_probe_indices) = + index.probe(rects.as_ref()).map_err(|e| { + DataFusionError::Execution(format!("GPU spatial query failed: {:?}", e)) + })?; + + assert_eq!(gpu_build_indices.len(), gpu_probe_indices.len()); + + let candidate_count = gpu_build_indices.len(); + + self.refine( + &evaluated_batch.geom_array.geometry_array, + &self.spatial_predicate, + &mut gpu_build_indices, + &mut gpu_probe_indices, + )?; + + assert_eq!(gpu_build_indices.len(), gpu_probe_indices.len()); + + let total_count = gpu_build_indices.len(); + + for (build_idx, probe_idx) in gpu_build_indices.iter().zip(gpu_probe_indices.iter()) { + let data_id = *build_idx as usize; + let (batch_idx, row_idx) = self.data_id_to_batch_pos[data_id]; + build_batch_positions.push((batch_idx, row_idx)); + probe_indices.push(range.start as u32 + probe_idx); + } + Ok(( + QueryResultMetrics { + count: total_count, + candidate_count, + }, + range.end, + )) + } + fn need_more_probe_stats(&self) -> bool { + false + } + + fn merge_probe_stats(&self, stats: GeoStatistics) { + let _ = stats; + } + + fn visited_build_side(&self) -> Option<&Mutex>> { + self.visited_build_side.as_ref() + } + + fn report_probe_completed(&self) -> bool { + self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1 + } + + fn get_refiner_mem_usage(&self) -> usize { + 0 + } + + fn get_actual_execution_mode(&self) -> ExecutionMode { + ExecutionMode::PrepareBuild // GPU-based spatial index is always on PrepareBuild mode + } + #[allow(unused)] + fn query( + &self, + probe_wkb: &Wkb, + probe_rect: &Rect, + distance: &Option, + build_batch_positions: &mut Vec<(i32, i32)>, + ) -> Result { + let _ = (probe_wkb, probe_rect, distance, build_batch_positions); + Err(DataFusionError::NotImplemented( + "Serial query is not implemented for GPU spatial index".to_string(), + )) + } + + fn query_knn( + &self, + _probe_wkb: &Wkb, + _k: u32, + _use_spheroid: bool, + _include_tie_breakers: bool, + _build_batch_positions: &mut Vec<(i32, i32)>, + _distances: Option<&mut Vec>, + ) -> Result { + Err(DataFusionError::NotImplemented( + "KNN query is not implemented for GPU spatial index".to_string(), + )) + } +} + +#[cfg(test)] +#[cfg(feature = "gpu")] +mod tests { + use crate::evaluated_batch::evaluated_batch_stream::{ + EvaluatedBatchStream, SendableEvaluatedBatchStream, + }; + use crate::evaluated_batch::EvaluatedBatch; + use crate::index::gpu_spatial_index_builder::GPUSpatialIndexBuilder; + use crate::index::spatial_index::SpatialIndexRef; + use crate::index::spatial_index_builder::{SpatialIndexBuilder, SpatialJoinBuildMetrics}; + use crate::operand_evaluator::EvaluatedGeometryArray; + use crate::spatial_predicate::{RelationPredicate, SpatialRelationType}; + use crate::SpatialPredicate; + use arrow_array::RecordBatch; + use arrow_schema::{DataType, Field, SchemaRef}; + use datafusion_common::JoinType; + use datafusion_physical_expr::expressions::Column; + use futures::Stream; + use sedona_common::{ExecutionMode, SpatialJoinOptions}; + use sedona_expr::statistics::GeoStatistics; + use sedona_schema::datatypes::WKB_GEOMETRY; + use sedona_testing::create::create_array; + use std::pin::Pin; + use std::sync::Arc; + use std::task::{Context, Poll}; + + pub struct SingleBatchStream { + // We use an Option so we can `take()` it on the first poll, + // leaving `None` for subsequent polls to signal the end of the stream. + batch: Option, + schema: SchemaRef, + } + + impl SingleBatchStream { + pub fn new(batch: EvaluatedBatch, schema: SchemaRef) -> Self { + Self { + batch: Some(batch), + schema, + } + } + } + + impl Stream for SingleBatchStream { + type Item = datafusion_common::Result; // Or Result + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + // `take()` removes the value from the Option, leaving `None` in its place. + // If there is a batch, it maps it to `Some(Ok(batch))`. + // If it's already empty, it returns `None`. + Poll::Ready(self.batch.take().map(Ok)) + } + } + + impl EvaluatedBatchStream for SingleBatchStream { + fn is_external(&self) -> bool { + false + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + } + + async fn build_index( + mut builder: GPUSpatialIndexBuilder, + indexed_batch: EvaluatedBatch, + schema: SchemaRef, + ) -> SpatialIndexRef { + let single_batch_stream = SingleBatchStream::new(indexed_batch, schema); + let sendable_stream: SendableEvaluatedBatchStream = Box::pin(single_batch_stream); + let stats = GeoStatistics::empty(); + builder.add_stream(sendable_stream, stats).await.unwrap(); + builder.finish().unwrap() + } + #[test] + fn test_spatial_index_builder_empty() { + let options = SpatialJoinOptions { + execution_mode: ExecutionMode::PrepareBuild, + ..Default::default() + }; + let metrics = SpatialJoinBuildMetrics::default(); + let schema = Arc::new(arrow_schema::Schema::empty()); + let spatial_predicate = SpatialPredicate::Relation(RelationPredicate::new( + Arc::new(Column::new("geom", 0)), + Arc::new(Column::new("geom", 1)), + SpatialRelationType::Intersects, + )); + + let builder = GPUSpatialIndexBuilder::new( + schema.clone(), + spatial_predicate, + options, + JoinType::Inner, + 4, + metrics, + ); + + // Test finishing with empty data + let index = builder.finish().unwrap(); + assert_eq!(index.schema(), schema); + assert_eq!(index.num_indexed_batches(), 0); + } + + #[tokio::test] + async fn test_spatial_index_builder_add_batch() { + let options = SpatialJoinOptions { + ..Default::default() + }; + let metrics = SpatialJoinBuildMetrics::default(); + + let spatial_predicate = SpatialPredicate::Relation(RelationPredicate::new( + Arc::new(Column::new("geom", 0)), + Arc::new(Column::new("geom", 1)), + SpatialRelationType::Intersects, + )); + + // Create a simple test geometry batch + let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( + "geom", + DataType::Binary, + true, + )])); + + let builder = GPUSpatialIndexBuilder::new( + schema.clone(), + spatial_predicate, + options, + JoinType::Inner, + 4, + metrics, + ); + + let batch = RecordBatch::new_empty(schema.clone()); + let geom_batch = create_array( + &[ + Some("POINT (0.25 0.25)"), + Some("POINT (10 10)"), + None, + Some("POINT (0.25 0.25)"), + ], + &WKB_GEOMETRY, + ); + let indexed_batch = EvaluatedBatch { + batch, + geom_array: EvaluatedGeometryArray::try_new(geom_batch, &WKB_GEOMETRY).unwrap(), + }; + let index = build_index(builder, indexed_batch, schema.clone()).await; + + assert_eq!(index.schema(), schema); + assert_eq!(index.num_indexed_batches(), 1); + } + + async fn setup_index_for_batch_test( + build_geoms: &[Option<&str>], + options: SpatialJoinOptions, + ) -> SpatialIndexRef { + let metrics = SpatialJoinBuildMetrics::default(); + let spatial_predicate = SpatialPredicate::Relation(RelationPredicate::new( + Arc::new(Column::new("left", 0)), + Arc::new(Column::new("right", 0)), + SpatialRelationType::Intersects, + )); + let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( + "geom", + DataType::Binary, + true, + )])); + + let builder = GPUSpatialIndexBuilder::new( + schema.clone(), + spatial_predicate, + options, + JoinType::Inner, + 1, + metrics, + ); + + let geom_array = create_array(build_geoms, &WKB_GEOMETRY); + let batch = RecordBatch::try_new( + Arc::new(arrow_schema::Schema::new(vec![Field::new( + "geom", + DataType::Binary, + true, + )])), + vec![Arc::new(geom_array.clone())], + ) + .unwrap(); + let evaluated_batch = EvaluatedBatch { + batch, + geom_array: EvaluatedGeometryArray::try_new(geom_array, &WKB_GEOMETRY).unwrap(), + }; + build_index(builder, evaluated_batch, schema).await + } + + fn create_probe_batch(probe_geoms: &[Option<&str>]) -> Arc { + let geom_array = create_array(probe_geoms, &WKB_GEOMETRY); + let batch = RecordBatch::try_new( + Arc::new(arrow_schema::Schema::new(vec![Field::new( + "geom", + DataType::Binary, + true, + )])), + vec![Arc::new(geom_array.clone())], + ) + .unwrap(); + Arc::new(EvaluatedBatch { + batch, + geom_array: EvaluatedGeometryArray::try_new(geom_array, &WKB_GEOMETRY).unwrap(), + }) + } + #[tokio::test] + async fn test_query_batch_empty_results() { + let build_geoms = &[Some("POINT (0 0)"), Some("POINT (1 1)")]; + let index = setup_index_for_batch_test(build_geoms, SpatialJoinOptions::default()).await; + + // Probe with geometries that don't intersect + let probe_geoms = &[Some("POINT (10 10)"), Some("POINT (20 20)")]; + let probe_batch = create_probe_batch(probe_geoms); + + let mut build_batch_positions = Vec::new(); + let mut probe_indices = Vec::new(); + let (metrics, next_idx) = index + .query_batch( + &probe_batch, + 0..2, + usize::MAX, + &mut build_batch_positions, + &mut probe_indices, + ) + .await + .unwrap(); + + assert_eq!(metrics.count, 0); + assert_eq!(build_batch_positions.len(), 0); + assert_eq!(probe_indices.len(), 0); + assert_eq!(next_idx, 2); + } +} diff --git a/rust/sedona-spatial-join/src/index/gpu_spatial_index_builder.rs b/rust/sedona-spatial-join/src/index/gpu_spatial_index_builder.rs new file mode 100644 index 000000000..b499abf2f --- /dev/null +++ b/rust/sedona-spatial-join/src/index/gpu_spatial_index_builder.rs @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream; +use crate::index::gpu_spatial_index::GPUSpatialIndex; +use crate::index::spatial_index::SpatialIndexRef; +use crate::index::spatial_index_builder::{SpatialIndexBuilder, SpatialJoinBuildMetrics}; +use crate::operand_evaluator::EvaluatedGeometryArray; +use crate::spatial_predicate::SpatialRelationType; +use crate::utils::join_utils::need_produce_result_in_final; +use crate::{ + evaluated_batch::EvaluatedBatch, operand_evaluator::create_operand_evaluator, + spatial_predicate::SpatialPredicate, +}; +use arrow::array::BooleanBufferBuilder; +use arrow::compute::concat; +use arrow_array::RecordBatch; +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use datafusion_common::Result; +use datafusion_common::{DataFusionError, JoinType}; +use futures::StreamExt; +use geo_types::{coord, Rect}; +use parking_lot::Mutex; +use sedona_common::SpatialJoinOptions; +use sedona_expr::statistics::GeoStatistics; +use sedona_libgpuspatial::{GpuSpatialIndex, GpuSpatialOptions, GpuSpatialRefiner}; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +pub struct GPUSpatialIndexBuilder { + schema: SchemaRef, + spatial_predicate: SpatialPredicate, + options: SpatialJoinOptions, + join_type: JoinType, + probe_threads_count: usize, + metrics: SpatialJoinBuildMetrics, + /// Batches to be indexed + indexed_batches: Vec, + /// Statistics for indexed geometries + stats: GeoStatistics, + /// Memory used by the spatial index + memory_used: usize, +} + +impl GPUSpatialIndexBuilder { + pub(crate) fn is_using_gpu( + spatial_predicate: &SpatialPredicate, + join_opts: &SpatialJoinOptions, + ) -> Result { + if join_opts.gpu.enable { + if Self::is_spatial_predicate_supported_on_gpu(spatial_predicate) { + return Ok(true); + } else if join_opts.gpu.fallback_to_cpu { + log::warn!("Falling back to CPU spatial join as the spatial predicate is not supported on GPU"); + return Ok(false); + } else { + return Err(DataFusionError::Execution("GPU spatial join is enabled, but the spatial predicate is not supported on GPU".into())); + } + } + Ok(false) + } + + fn is_spatial_predicate_supported_on_gpu(spatial_predicate: &SpatialPredicate) -> bool { + match spatial_predicate { + SpatialPredicate::Relation(rel) => match rel.relation_type { + SpatialRelationType::Intersects => true, + SpatialRelationType::Contains => true, + SpatialRelationType::Within => true, + SpatialRelationType::Covers => true, + SpatialRelationType::CoveredBy => true, + SpatialRelationType::Touches => true, + SpatialRelationType::Crosses => false, + SpatialRelationType::Overlaps => false, + SpatialRelationType::Equals => true, + }, + SpatialPredicate::Distance(_) => false, + SpatialPredicate::KNearestNeighbors(_) => false, + } + } + pub fn new( + schema: SchemaRef, + spatial_predicate: SpatialPredicate, + options: SpatialJoinOptions, + join_type: JoinType, + probe_threads_count: usize, + metrics: SpatialJoinBuildMetrics, + ) -> Self { + Self { + schema, + spatial_predicate, + options, + join_type, + probe_threads_count, + metrics, + indexed_batches: vec![], + stats: GeoStatistics::empty(), + memory_used: 0, + } + } + + fn build_visited_bitmaps(&mut self) -> Result>>> { + if !need_produce_result_in_final(self.join_type) { + return Ok(None); + } + + let mut bitmaps = Vec::with_capacity(self.indexed_batches.len()); + let mut total_buffer_size = 0; + + for batch in &self.indexed_batches { + let batch_rows = batch.batch.num_rows(); + let buffer_size = batch_rows.div_ceil(8); + total_buffer_size += buffer_size; + + let mut bitmap = BooleanBufferBuilder::new(batch_rows); + bitmap.append_n(batch_rows, false); + bitmaps.push(bitmap); + } + + self.record_memory_usage(total_buffer_size); + + Ok(Some(Mutex::new(bitmaps))) + } + + fn record_memory_usage(&mut self, bytes: usize) { + self.memory_used += bytes; + self.metrics.build_mem_used.set_max(self.memory_used); + } + /// Add a geometry batch to be indexed. + /// + /// This method accumulates geometry batches that will be used to build the spatial index. + /// Each batch contains processed geometry data along with memory usage information. + fn add_batch(&mut self, indexed_batch: EvaluatedBatch) -> Result<()> { + let in_mem_size = indexed_batch.in_mem_size()?; + self.indexed_batches.push(indexed_batch); + self.record_memory_usage(in_mem_size); + Ok(()) + } + fn merge_stats(&mut self, stats: GeoStatistics) -> &mut Self { + self.stats.merge(&stats); + self + } +} + +#[async_trait] +impl SpatialIndexBuilder for GPUSpatialIndexBuilder { + fn estimate_extra_memory_usage( + &self, + geo_stats: &GeoStatistics, + _spatial_predicate: &SpatialPredicate, + _options: &SpatialJoinOptions, + ) -> usize { + let num_geoms = geo_stats.total_geometries().unwrap_or(0) as usize; + // Each geometry requires 4 f32 values to store the bounding rectangle (min_x, min_y, max_x, max_y) + num_geoms * (4 * 4) + } + /// Finish building and return the completed SpatialIndex. + fn finish(mut self) -> Result { + if self.indexed_batches.is_empty() { + return Ok(Arc::new(GPUSpatialIndex::empty( + self.spatial_predicate, + self.schema, + self.options, + AtomicUsize::new(self.probe_threads_count), + )?)); + } + let build_timer = self.metrics.build_time.timer(); + let gpu_options = GpuSpatialOptions { + cuda_use_memory_pool: self.options.gpu.use_memory_pool, + cuda_memory_pool_init_percent: self.options.gpu.memory_pool_init_percentage as i32, + concurrency: self.probe_threads_count as u32, + device_id: self.options.gpu.device_id as i32, + compress_bvh: self.options.gpu.compress_bvh, + pipeline_batches: self.options.gpu.pipeline_batches as u32, + }; + + let mut index = GpuSpatialIndex::try_new(&gpu_options).map_err(|e| { + DataFusionError::Execution(format!("Failed to initialize GPU context {e:?}")) + })?; + let mut refiner = GpuSpatialRefiner::try_new(&gpu_options).map_err(|e| { + DataFusionError::Execution(format!("Failed to initialize GPU context {e:?}")) + })?; + + // Concat indexed batches into a single batch to reduce build time + let total_rows: usize = self + .indexed_batches + .iter() + .map(|batch| batch.batch.num_rows()) + .sum(); + + let sedona_type = self.indexed_batches[0].geom_array.sedona_type.clone(); + + if self.options.gpu.concat_build { + let all_record_batches: Vec<&RecordBatch> = self + .indexed_batches + .iter() + .map(|batch| &batch.batch) + .collect(); + let schema = all_record_batches[0].schema(); + let batch = + arrow::compute::concat_batches(&schema, all_record_batches).map_err(|e| { + DataFusionError::Execution(format!("Failed to concatenate left batches: {}", e)) + })?; + let references: Vec<&dyn arrow::array::Array> = self + .indexed_batches + .iter() + .map(|batch| batch.geom_array.geometry_array.as_ref()) + .collect(); + let concat_array = concat(&references)?; + + // rects have been computed by EvaluatedGeometryArray::try_new, we just need to concatenate them + let rects = self + .indexed_batches + .iter() + .flat_map(|batch| batch.geom_array.rects.iter().cloned()) + .collect(); + let eval_batch = EvaluatedBatch { + batch, + geom_array: EvaluatedGeometryArray { + sedona_type: sedona_type.clone(), + geometry_array: Arc::new(concat_array), + rects, + distance: None, + wkbs: vec![], + }, + }; + self.indexed_batches.clear(); + self.indexed_batches.push(eval_batch); + } + let mut data_id_to_batch_pos: Vec<(i32, i32)> = Vec::with_capacity( + self.indexed_batches + .iter() + .map(|x| x.batch.num_rows()) + .sum(), + ); + let empty_rect = Rect::new( + coord!(x: f32::NAN, y: f32::NAN), + coord!(x: f32::NAN, y: f32::NAN), + ); + + refiner + .init_build_schema(sedona_type.storage_type()) + .map_err(|e| { + DataFusionError::Execution(format!("Failed to init schema for refiner {e:?}")) + })?; + + let mut native_rects = Vec::with_capacity(total_rows); + + for (batch_idx, batch) in self.indexed_batches.iter().enumerate() { + let rects = batch.rects(); + + for (idx, rect_opt) in rects.iter().enumerate() { + if let Some(rect) = rect_opt { + native_rects.push(*rect); + } else { + native_rects.push(empty_rect); + } + data_id_to_batch_pos.push((batch_idx as i32, idx as i32)); + } + refiner + .push_build(&batch.geom_array.geometry_array) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to add geometries to GPU refiner {e:?}" + )) + })?; + } + + refiner.finish_building().map_err(|e| { + DataFusionError::Execution(format!("Failed to build spatial refiner on GPU {e:?}")) + })?; + + // Add rectangles from build side to the spatial index + index.push_build(&native_rects).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to push rectangles to GPU spatial index {e:?}" + )) + })?; + + index.finish_building().map_err(|e| { + DataFusionError::Execution(format!("Failed to build spatial index on GPU {e:?}")) + })?; + build_timer.done(); + let visited_build_side = self.build_visited_bitmaps()?; + let evaluator = create_operand_evaluator(&self.spatial_predicate, self.options.clone()); + // Build index for rectangle queries + Ok(Arc::new(GPUSpatialIndex::new( + self.spatial_predicate, + self.schema, + self.options, + evaluator, + Arc::new(index), + Arc::new(refiner), + self.indexed_batches, + data_id_to_batch_pos, + visited_build_side, + AtomicUsize::new(self.probe_threads_count), + )?)) + } + + async fn add_stream( + &mut self, + mut stream: SendableEvaluatedBatchStream, + geo_statistics: GeoStatistics, + ) -> Result<()> { + while let Some(batch) = stream.next().await { + let indexed_batch = batch?; + self.add_batch(indexed_batch)?; + } + self.merge_stats(geo_statistics); + Ok(()) + } +} diff --git a/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs b/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs index c56e99306..d23143a00 100644 --- a/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs +++ b/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs @@ -20,6 +20,7 @@ use crate::evaluated_batch::evaluated_batch_stream::{ EvaluatedBatchStream, SendableEvaluatedBatchStream, }; use crate::evaluated_batch::EvaluatedBatch; +use crate::index::gpu_spatial_index_builder::GPUSpatialIndexBuilder; use crate::index::spatial_index::SpatialIndexRef; use crate::index::spatial_index_builder::{SpatialIndexBuilder, SpatialJoinBuildMetrics}; use crate::index::{BuildPartition, DefaultSpatialIndexBuilder}; @@ -34,6 +35,7 @@ use datafusion_expr::JoinType; use futures::{Stream, StreamExt}; use parking_lot::Mutex; use sedona_common::{sedona_internal_err, SpatialJoinOptions}; +use sedona_expr::statistics::GeoStatistics; use std::ops::DerefMut; use std::pin::Pin; use std::sync::Arc; @@ -270,18 +272,35 @@ impl PartitionedIndexProvider { .count() } + fn create_index_builder(&self) -> Result { + if GPUSpatialIndexBuilder::is_using_gpu(&self.spatial_predicate, &self.options)? { + let builder = GPUSpatialIndexBuilder::new( + Arc::clone(&self.schema), + self.spatial_predicate.clone(), + self.options.clone(), + self.join_type, + self.probe_threads_count, + self.metrics.clone(), + ); + Ok(SpatialIndexBuilderVariant::Gpu(builder)) + } else { + let builder = DefaultSpatialIndexBuilder::new( + Arc::clone(&self.schema), + self.spatial_predicate.clone(), + self.options.clone(), + self.join_type, + self.probe_threads_count, + self.metrics.clone(), + )?; + Ok(SpatialIndexBuilderVariant::Default(builder)) + } + } + async fn build_index_for_single_partition( &self, build_partitions: Vec, ) -> Result { - let mut builder = DefaultSpatialIndexBuilder::new( - Arc::clone(&self.schema), - self.spatial_predicate.clone(), - self.options.clone(), - self.join_type, - self.probe_threads_count, - self.metrics.clone(), - )?; + let mut builder = self.create_index_builder()?; for build_partition in build_partitions { let stream = build_partition.build_side_batch_stream; @@ -296,14 +315,7 @@ impl PartitionedIndexProvider { &self, spilled_partition: SpilledPartition, ) -> Result { - let mut builder = DefaultSpatialIndexBuilder::new( - Arc::clone(&self.schema), - self.spatial_predicate.clone(), - self.options.clone(), - self.join_type, - self.probe_threads_count, - self.metrics.clone(), - )?; + let mut builder = self.create_index_builder()?; // Spawn tasks to load indexed batches from spilled files concurrently let (spill_files, geo_statistics, _) = spilled_partition.into_inner(); @@ -392,6 +404,36 @@ impl EvaluatedBatchStream for ReceiverBatchStream { } } +enum SpatialIndexBuilderVariant { + Default(DefaultSpatialIndexBuilder), + Gpu(GPUSpatialIndexBuilder), +} + +impl SpatialIndexBuilderVariant { + // Delegate the mutable method + pub async fn add_stream( + &mut self, + stream: SendableEvaluatedBatchStream, + geo_statistics: GeoStatistics, + ) -> Result<()> { + match self { + SpatialIndexBuilderVariant::Default(builder) => { + builder.add_stream(stream, geo_statistics).await + } + SpatialIndexBuilderVariant::Gpu(builder) => { + builder.add_stream(stream, geo_statistics).await + } + } + } + + pub fn finish(self) -> Result { + match self { + SpatialIndexBuilderVariant::Default(builder) => builder.finish(), + SpatialIndexBuilderVariant::Gpu(builder) => builder.finish(), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/sedona-spatial-join/src/operand_evaluator.rs b/rust/sedona-spatial-join/src/operand_evaluator.rs index efcf2a436..3cbeaaf1e 100644 --- a/rust/sedona-spatial-join/src/operand_evaluator.rs +++ b/rust/sedona-spatial-join/src/operand_evaluator.rs @@ -14,20 +14,18 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -use core::fmt; -use std::{mem::transmute, sync::Arc}; - use arrow_array::{Array, ArrayRef, Float64Array, RecordBatch}; use arrow_schema::DataType; +use core::fmt; use datafusion_common::{utils::proxy::VecAllocExt, JoinSide, Result, ScalarValue}; use datafusion_expr::ColumnarValue; use datafusion_physical_expr::PhysicalExpr; use float_next_after::NextAfter; -use geo_index::rtree::util::f64_box_to_f32; use geo_types::{coord, Rect}; use sedona_functions::executor::IterGeo; use sedona_geo_generic_alg::BoundingRect; use sedona_schema::datatypes::SedonaType; +use std::{mem::transmute, sync::Arc}; use wkb::reader::Wkb; use sedona_common::option::SpatialJoinOptions; @@ -103,10 +101,38 @@ pub struct EvaluatedGeometryArray { /// but we'll only allow accessing Wkb<'a> where 'a is the lifetime of the GeometryBatchResult to make /// the interfaces safe. The buffers in `geometry_array` are allocated on the heap and won't be moved when /// the GeometryBatchResult is moved, so we don't need to worry about pinning. - wkbs: Vec>>, + pub wkbs: Vec>>, } impl EvaluatedGeometryArray { + #[cfg(feature = "gpu")] + /// Expand the box by two ULPs to ensure the resulting f32 box covers a f64 point that + /// is covered by the original f64 box. + fn make_conservative_box( + min_x: f64, + min_y: f64, + max_x: f64, + max_y: f64, + ) -> (f32, f32, f32, f32) { + let mut new_min_x = min_x as f32; + let mut new_min_y = min_y as f32; + let mut new_max_x = max_x as f32; + let mut new_max_y = max_y as f32; + + for _ in 0..2 { + new_min_x = new_min_x.next_after(f32::NEG_INFINITY); + new_min_y = new_min_y.next_after(f32::NEG_INFINITY); + new_max_x = new_max_x.next_after(f32::INFINITY); + new_max_y = new_max_y.next_after(f32::INFINITY); + } + + debug_assert!((new_min_x as f64) <= min_x); + debug_assert!((new_min_y as f64) <= min_y); + debug_assert!((new_max_x as f64) >= max_x); + debug_assert!((new_max_y as f64) >= max_y); + + (new_min_x, new_min_y, new_max_x, new_max_y) + } pub fn try_new(geometry_array: ArrayRef, sedona_type: &SedonaType) -> Result { let num_rows = geometry_array.len(); let mut rect_vec = Vec::with_capacity(num_rows); @@ -116,10 +142,35 @@ impl EvaluatedGeometryArray { if let Some(rect) = wkb.bounding_rect() { let min = rect.min(); let max = rect.max(); - // f64_box_to_f32 will ensure the resulting `f32` box is no smaller than the `f64` box. - let (min_x, min_y, max_x, max_y) = f64_box_to_f32(min.x, min.y, max.x, max.y); - let rect = Rect::new(coord!(x: min_x, y: min_y), coord!(x: max_x, y: max_y)); - Some(rect) + #[cfg(feature = "gpu")] + { + use wkb::reader::GeometryType; + // For point geometries, we can directly cast f64 to f32 without expanding the box. + // This enables libgpuspatial to treat the Rect as point for faster processing. + if wkb.geometry_type() == GeometryType::Point { + Some(Rect::new( + coord!(x: min.x as f32, y: min.y as f32), + coord!(x: max.x as f32, y: max.y as f32), + )) + } else { + let (min_x, min_y, max_x, max_y) = + Self::make_conservative_box(min.x, min.y, max.x, max.y); + Some(Rect::new( + coord!(x: min_x, y: min_y), + coord!(x: max_x, y: max_y), + )) + } + } + #[cfg(not(feature = "gpu"))] + { + use geo_index::rtree::util::f64_box_to_f32; + // f64_box_to_f32 will ensure the resulting `f32` box is no smaller than the `f64` box. + let (min_x, min_y, max_x, max_y) = + f64_box_to_f32(min.x, min.y, max.x, max.y); + let rect = + Rect::new(coord!(x: min_x, y: min_y), coord!(x: max_x, y: max_y)); + Some(rect) + } } else { None } @@ -147,7 +198,6 @@ impl EvaluatedGeometryArray { wkbs, }) } - /// Get the WKBs of the geometries in the geometry array. pub fn wkbs(&self) -> &Vec>> { // The returned WKBs are guaranteed to be valid for the lifetime of the GeometryBatchResult, diff --git a/rust/sedona-spatial-join/src/prepare.rs b/rust/sedona-spatial-join/src/prepare.rs index b0ee38dbc..02a68d2a8 100644 --- a/rust/sedona-spatial-join/src/prepare.rs +++ b/rust/sedona-spatial-join/src/prepare.rs @@ -217,6 +217,7 @@ impl SpatialJoinComponentsBuilder { collect_metrics_vec.push(CollectBuildSideMetrics::new(k, &self.metrics)); } let join_metrics = SpatialJoinBuildMetrics::new(0, &self.metrics); + // Passing a SpatialIndexBuilder for estimating memory usage during collection let builder = Arc::new(DefaultSpatialIndexBuilder::new( self.build_schema.clone(), self.spatial_predicate.clone(), diff --git a/rust/sedona/Cargo.toml b/rust/sedona/Cargo.toml index 051c7841b..bf9ae6769 100644 --- a/rust/sedona/Cargo.toml +++ b/rust/sedona/Cargo.toml @@ -44,6 +44,7 @@ proj = ["sedona-proj/proj-sys"] gdal = ["sedona-gdal/gdal-sys"] spatial-join = ["dep:sedona-spatial-join"] s2geography = ["dep:sedona-s2geography"] +gpu = ["sedona-spatial-join/gpu"] [dev-dependencies] tempfile = { workspace = true } @@ -85,6 +86,7 @@ sedona-spatial-join = { workspace = true, optional = true } sedona-s2geography = { workspace = true, optional = true } sedona-testing = { workspace = true } sedona-tg = { workspace = true, optional = true } +sedona-libgpuspatial = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } sysinfo = { workspace = true } diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs index ecf5362ba..b5ee6732b 100644 --- a/rust/sedona/src/context.rs +++ b/rust/sedona/src/context.rs @@ -135,7 +135,21 @@ impl SedonaContext { .with_geometry_encoding(GeometryEncoding::Wkb) .with_las_extra_bytes(LasExtraBytes::Typed), ); - + // Auto-enable GPU when built with gpu feature + // The optimizer will check actual GPU availability at runtime + #[cfg(feature = "gpu")] + let session_config = { + use sedona_common::option::SedonaOptions; + let mut session_config = session_config; + if let Some(sedona_opts) = session_config + .options_mut() + .extensions + .get_mut::() + { + sedona_opts.spatial_join.gpu.enable = true; + } + session_config + }; #[allow(unused_mut)] let mut state_builder = SessionStateBuilder::new() .with_default_features() From 41b97c8ff75b70fce3ff662dc3a341eaad57dee3 Mon Sep 17 00:00:00 2001 From: Liang Geng Date: Thu, 19 Mar 2026 07:58:35 -0400 Subject: [PATCH 2/4] Fix codereview issues and add tests --- Cargo.lock | 2 + Cargo.toml | 1 + .../src/index/gpu_spatial_index.rs | 237 +++++++++++++++++- .../src/index/gpu_spatial_index_builder.rs | 15 +- .../src/index/partitioned_index_provider.rs | 1 + 5 files changed, 242 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 57b5ebad8..2555ef430 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5142,6 +5142,7 @@ dependencies = [ "sedona-geometry", "sedona-geoparquet", "sedona-geos", + "sedona-libgpuspatial", "sedona-pointcloud", "sedona-proj", "sedona-raster-functions", @@ -5658,6 +5659,7 @@ dependencies = [ "sedona-geo-traits-ext", "sedona-geometry", "sedona-geos", + "sedona-libgpuspatial", "sedona-schema", "sedona-testing", "sedona-tg", diff --git a/Cargo.toml b/Cargo.toml index 2e579ce20..170f2f9cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -156,3 +156,4 @@ sedona-gdal = { version = "0.4.0", path = "c/sedona-gdal", default-features = fa sedona-proj = { version = "0.4.0", path = "c/sedona-proj", default-features = false } sedona-s2geography = { version = "0.4.0", path = "c/sedona-s2geography" } sedona-tg = { version = "0.4.0", path = "c/sedona-tg" } +sedona-libgpuspatial = { version = "0.4.0", path = "c/sedona-libgpuspatial" } diff --git a/rust/sedona-spatial-join/src/index/gpu_spatial_index.rs b/rust/sedona-spatial-join/src/index/gpu_spatial_index.rs index 1edf3dfdb..5b896744c 100644 --- a/rust/sedona-spatial-join/src/index/gpu_spatial_index.rs +++ b/rust/sedona-spatial-join/src/index/gpu_spatial_index.rs @@ -328,13 +328,14 @@ mod tests { use datafusion_common::JoinType; use datafusion_physical_expr::expressions::Column; use futures::Stream; - use sedona_common::{ExecutionMode, SpatialJoinOptions}; + use sedona_common::{ExecutionMode, GpuOptions, SpatialJoinOptions}; use sedona_expr::statistics::GeoStatistics; use sedona_schema::datatypes::WKB_GEOMETRY; use sedona_testing::create::create_array; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; + use std::vec::IntoIter; pub struct SingleBatchStream { // We use an Option so we can `take()` it on the first poll, @@ -384,6 +385,58 @@ mod tests { builder.add_stream(sendable_stream, stats).await.unwrap(); builder.finish().unwrap() } + + // 1. Create a new stream struct for multiple batches + pub struct VecBatchStream { + // IntoIter allows us to cleanly pop items off the vector one by one + batches: IntoIter, + schema: SchemaRef, + } + + impl VecBatchStream { + pub fn new(batches: Vec, schema: SchemaRef) -> Self { + Self { + batches: batches.into_iter(), + schema, + } + } + } + + impl Stream for VecBatchStream { + type Item = datafusion_common::Result; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + // `next()` on IntoIter returns Option + // We map it to Option> to match the stream's Item type + Poll::Ready(self.batches.next().map(Ok)) + } + } + + impl EvaluatedBatchStream for VecBatchStream { + fn is_external(&self) -> bool { + false + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + } + + // 2. Write the new build_index function that accepts the Vec + async fn build_index_from_vec( + mut builder: GPUSpatialIndexBuilder, + indexed_batches: Vec, + schema: SchemaRef, + ) -> SpatialIndexRef { + let vec_batch_stream = VecBatchStream::new(indexed_batches, schema); + let sendable_stream: SendableEvaluatedBatchStream = Box::pin(vec_batch_stream); + + let stats = GeoStatistics::empty(); + + // Add the stream of multiple batches to the builder + builder.add_stream(sendable_stream, stats).await.unwrap(); + builder.finish().unwrap() + } #[test] fn test_spatial_index_builder_empty() { let options = SpatialJoinOptions { @@ -462,6 +515,86 @@ mod tests { assert_eq!(index.num_indexed_batches(), 1); } + #[tokio::test] + async fn test_spatial_index_builder_add_multiple_batches() { + let gpu_ops = GpuOptions { + concat_build: false, + ..Default::default() + }; + let options = SpatialJoinOptions { + gpu: gpu_ops, + ..Default::default() + }; + let metrics = SpatialJoinBuildMetrics::default(); + + let spatial_predicate = SpatialPredicate::Relation(RelationPredicate::new( + Arc::new(Column::new("geom", 0)), + Arc::new(Column::new("geom", 1)), + SpatialRelationType::Intersects, + )); + + // Create the schema + let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( + "geom", + DataType::Binary, + true, + )])); + + // Initialize the builder + let builder = GPUSpatialIndexBuilder::new( + schema.clone(), + spatial_predicate, + options, + JoinType::Inner, + 4, + metrics, + ); + + // --- Create Batch 1 --- + let batch1 = RecordBatch::new_empty(schema.clone()); + let geom_batch1 = create_array( + &[ + Some("POINT (0.25 0.25)"), + Some("POINT (10 10)"), + None, + Some("POINT (0.25 0.25)"), + ], + &WKB_GEOMETRY, + ); + let evaluated_batch1 = EvaluatedBatch { + batch: batch1, + geom_array: EvaluatedGeometryArray::try_new(geom_batch1, &WKB_GEOMETRY).unwrap(), + }; + + // --- Create Batch 2 --- + let batch2 = RecordBatch::new_empty(schema.clone()); + let geom_batch2 = create_array( + &[ + Some("POINT (1 1)"), + Some("POINT (5 5)"), + Some("POINT (20 20)"), + ], + &WKB_GEOMETRY, + ); + let evaluated_batch2 = EvaluatedBatch { + batch: batch2, + geom_array: EvaluatedGeometryArray::try_new(geom_batch2, &WKB_GEOMETRY).unwrap(), + }; + + // --- Build the Index --- + // Combine them into a Vec and use the multi-batch builder function + let indexed_batches = vec![evaluated_batch1, evaluated_batch2]; + + // Note: This relies on the `build_index_from_vec` function we created earlier + let index = build_index_from_vec(builder, indexed_batches, schema.clone()).await; + + // --- Assertions --- + assert_eq!(index.schema(), schema); + + // Assert that exactly 2 batches were indexed + assert_eq!(index.num_indexed_batches(), 2); + } + async fn setup_index_for_batch_test( build_geoms: &[Option<&str>], options: SpatialJoinOptions, @@ -547,4 +680,106 @@ mod tests { assert_eq!(probe_indices.len(), 0); assert_eq!(next_idx, 2); } + + #[tokio::test] + async fn test_query_batch_non_empty_results_multiple_build_batches() { + let gpu_ops = GpuOptions { + concat_build: false, + ..Default::default() + }; + let options = SpatialJoinOptions { + gpu: gpu_ops, + ..Default::default() + }; + let metrics = SpatialJoinBuildMetrics::default(); + + let spatial_predicate = SpatialPredicate::Relation(RelationPredicate::new( + Arc::new(Column::new("geom", 0)), + Arc::new(Column::new("geom", 1)), + SpatialRelationType::Intersects, + )); + + let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( + "geom", + DataType::Binary, + true, + )])); + + let builder = GPUSpatialIndexBuilder::new( + schema.clone(), + spatial_predicate, + options, + JoinType::Inner, + 4, + metrics, + ); + + // --- Build Side: Multiple Batches of Polygons --- + // Batch 0 + let build_batch0 = RecordBatch::new_empty(schema.clone()); + let build_geom_batch0 = create_array( + &[ + Some("POLYGON ((0 0, 0 10, 10 10, 10 0, 0 0))"), + Some("POLYGON ((20 20, 20 30, 30 30, 30 20, 20 20))"), + ], + &WKB_GEOMETRY, + ); + let evaluated_build0 = EvaluatedBatch { + batch: build_batch0, + geom_array: EvaluatedGeometryArray::try_new(build_geom_batch0, &WKB_GEOMETRY).unwrap(), + }; + + // Batch 1 + let build_batch1 = RecordBatch::new_empty(schema.clone()); + let build_geom_batch1 = create_array( + &[Some("POLYGON ((40 40, 40 50, 50 50, 50 40, 40 40))")], + &WKB_GEOMETRY, + ); + let evaluated_build1 = EvaluatedBatch { + batch: build_batch1, + geom_array: EvaluatedGeometryArray::try_new(build_geom_batch1, &WKB_GEOMETRY).unwrap(), + }; + + // Build the multi-batch index using the helper function we created previously + let indexed_batches = vec![evaluated_build0, evaluated_build1]; + let index = build_index_from_vec(builder, indexed_batches, schema.clone()).await; + + // --- Probe Side: One Batch of Points --- + let probe_batch = RecordBatch::new_empty(schema.clone()); + let probe_geoms = &[ + Some("POINT (5 5)"), // Matches Batch 0, Row 0 + Some("POINT (100 100)"), // No match + Some("POINT (25 25)"), // Matches Batch 0, Row 1 + Some("POINT (45 45)"), // Matches Batch 1, Row 0 + ]; + let probe_batch = create_probe_batch(probe_geoms); + + // --- Execute Query --- + let mut build_batch_positions = Vec::new(); + let mut probe_indices = Vec::new(); + + // Probing all 4 points + let (query_metrics, next_idx) = index + .query_batch( + &probe_batch, + 0..4, + usize::MAX, + &mut build_batch_positions, + &mut probe_indices, + ) + .await + .unwrap(); + + // --- Assertions --- + // We expect exactly 3 matches out of the 4 probe points + assert_eq!(query_metrics.count, 3); + assert_eq!(build_batch_positions.len(), 3); + assert_eq!(probe_indices.len(), 3); + + // The probe indices should match the rows in our probe batch that successfully intersected + assert_eq!(probe_indices, vec![0, 2, 3]); + + // The query processed all 4 probe points + assert_eq!(next_idx, 4); + } } diff --git a/rust/sedona-spatial-join/src/index/gpu_spatial_index_builder.rs b/rust/sedona-spatial-join/src/index/gpu_spatial_index_builder.rs index b499abf2f..e4f225567 100644 --- a/rust/sedona-spatial-join/src/index/gpu_spatial_index_builder.rs +++ b/rust/sedona-spatial-join/src/index/gpu_spatial_index_builder.rs @@ -52,8 +52,6 @@ pub struct GPUSpatialIndexBuilder { /// Batches to be indexed indexed_batches: Vec, /// Statistics for indexed geometries - stats: GeoStatistics, - /// Memory used by the spatial index memory_used: usize, } @@ -108,7 +106,6 @@ impl GPUSpatialIndexBuilder { probe_threads_count, metrics, indexed_batches: vec![], - stats: GeoStatistics::empty(), memory_used: 0, } } @@ -150,10 +147,6 @@ impl GPUSpatialIndexBuilder { self.record_memory_usage(in_mem_size); Ok(()) } - fn merge_stats(&mut self, stats: GeoStatistics) -> &mut Self { - self.stats.merge(&stats); - self - } } #[async_trait] @@ -211,10 +204,7 @@ impl SpatialIndexBuilder for GPUSpatialIndexBuilder { .map(|batch| &batch.batch) .collect(); let schema = all_record_batches[0].schema(); - let batch = - arrow::compute::concat_batches(&schema, all_record_batches).map_err(|e| { - DataFusionError::Execution(format!("Failed to concatenate left batches: {}", e)) - })?; + let batch = arrow::compute::concat_batches(&schema, all_record_batches)?; let references: Vec<&dyn arrow::array::Array> = self .indexed_batches .iter() @@ -315,13 +305,12 @@ impl SpatialIndexBuilder for GPUSpatialIndexBuilder { async fn add_stream( &mut self, mut stream: SendableEvaluatedBatchStream, - geo_statistics: GeoStatistics, + _geo_statistics: GeoStatistics, ) -> Result<()> { while let Some(batch) = stream.next().await { let indexed_batch = batch?; self.add_batch(indexed_batch)?; } - self.merge_stats(geo_statistics); Ok(()) } } diff --git a/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs b/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs index d23143a00..c707f65b5 100644 --- a/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs +++ b/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs @@ -404,6 +404,7 @@ impl EvaluatedBatchStream for ReceiverBatchStream { } } +#[allow(clippy::large_enum_variant)] enum SpatialIndexBuilderVariant { Default(DefaultSpatialIndexBuilder), Gpu(GPUSpatialIndexBuilder), From 8e5a9a4e4936b6d5f3d1cc78f5343be9d43901a3 Mon Sep 17 00:00:00 2001 From: Liang Geng Date: Fri, 20 Mar 2026 07:44:34 -0400 Subject: [PATCH 3/4] Fix CI --- .github/workflows/rust.yml | 2 +- rust/sedona-spatial-join/src/index/gpu_spatial_index.rs | 1 - rust/sedona/Cargo.toml | 1 + 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index d79d7a23a..7b076264c 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -162,7 +162,7 @@ jobs: - name: Test if: matrix.name == 'test' run: | - cargo test --workspace --all-targets --all-features + cargo test --workspace --all-targets --features all_except_gpu # Clean up intermediate build artifacts to free disk space aggressively cargo clean -p sedona-s2geography rm -rf target/debug/deps diff --git a/rust/sedona-spatial-join/src/index/gpu_spatial_index.rs b/rust/sedona-spatial-join/src/index/gpu_spatial_index.rs index 5b896744c..16737c63e 100644 --- a/rust/sedona-spatial-join/src/index/gpu_spatial_index.rs +++ b/rust/sedona-spatial-join/src/index/gpu_spatial_index.rs @@ -745,7 +745,6 @@ mod tests { let index = build_index_from_vec(builder, indexed_batches, schema.clone()).await; // --- Probe Side: One Batch of Points --- - let probe_batch = RecordBatch::new_empty(schema.clone()); let probe_geoms = &[ Some("POINT (5 5)"), // Matches Batch 0, Row 0 Some("POINT (100 100)"), // No match diff --git a/rust/sedona/Cargo.toml b/rust/sedona/Cargo.toml index bf9ae6769..386942920 100644 --- a/rust/sedona/Cargo.toml +++ b/rust/sedona/Cargo.toml @@ -32,6 +32,7 @@ result_large_err = "allow" [features] default = ["aws", "azure", "gcp", "http", "geo", "geos", "tg", "spatial-join"] +all-except-gpu = ["aws", "azure", "gcp", "geo", "geos", "tg", "http", "pointcloud", "proj", "gdal", "spatial-join", "s2geography"] aws = ["dep:aws-config", "dep:aws-credential-types", "object_store/aws"] azure = ["object_store/azure"] gcp = ["object_store/gcp"] From 78a3d2031da2ee2f1fa6fb2b768a534a5edb30d8 Mon Sep 17 00:00:00 2001 From: Liang Geng Date: Fri, 20 Mar 2026 08:03:27 -0400 Subject: [PATCH 4/4] Fix wrong feature name --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 7b076264c..9ea56b6d7 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -162,7 +162,7 @@ jobs: - name: Test if: matrix.name == 'test' run: | - cargo test --workspace --all-targets --features all_except_gpu + cargo test --workspace --all-targets --features all-except-gpu # Clean up intermediate build artifacts to free disk space aggressively cargo clean -p sedona-s2geography rm -rf target/debug/deps