Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion rust/sedona/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use crate::{
};
use arrow_array::RecordBatch;
use async_trait::async_trait;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::file_format::format_as_file_type;
use datafusion::{
common::plan_err,
Expand All @@ -41,6 +40,7 @@ use datafusion::{
prelude::{DataFrame, SessionConfig, SessionContext},
sql::parser::{DFParser, Statement},
};
use datafusion::{dataframe::DataFrameWriteOptions, execution::memory_pool::MemoryLimit};
use datafusion_common::not_impl_err;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::sqlparser::dialect::{dialect_from_str, Dialect};
Expand Down Expand Up @@ -104,6 +104,7 @@ impl SedonaContext {
// variables.
let session_config = SessionConfig::from_env()?.with_information_schema(true);
let mut session_config = add_sedona_option_extension(session_config);
let target_partitions = session_config.target_partitions();

// Always register the PROJ CrsProvider by default (if PROJ is not configured
// before it is used an error will be raised).
Expand All @@ -115,6 +116,19 @@ impl SedonaContext {
opts.crs_provider =
CrsProviderOption::new(Arc::new(sedona_proj::provider::ProjCrsProvider::default()));

// Set the spilled batch in-memory size threshold to 5% of the per-partition memory limit,
// with a minimum of 10MB. Batches larger than this threshold will be broken into smaller batches
// before writing to spill files. This is to avoid overshooting memory limit when reading super
// large spilled batches.
const SPILLED_BATCH_THRESHOLD_PERCENT_DIVISOR: usize = 20; // 5% == 1 / 20
const MIN_SPILLED_BATCH_IN_MEMORY_THRESHOLD_BYTES: usize = 10 * 1024 * 1024; // 10MB
if let MemoryLimit::Finite(memory_limit) = runtime_env.memory_pool.memory_limit() {
let per_partition_memory_limit = memory_limit.div_ceil(target_partitions);
opts.spatial_join.spilled_batch_in_memory_size_threshold = per_partition_memory_limit
.div_ceil(SPILLED_BATCH_THRESHOLD_PERCENT_DIVISOR)
.max(MIN_SPILLED_BATCH_IN_MEMORY_THRESHOLD_BYTES);
}

#[cfg(feature = "pointcloud")]
let session_config = session_config.with_option_extension(
LasOptions::default()
Expand Down Expand Up @@ -835,4 +849,36 @@ mod tests {
SedonaType::WkbView(Edges::Planar, deserialize_crs("EPSG:3857").unwrap())
);
}

#[tokio::test]
async fn test_auto_configure_spilled_batch_threshold() {
use crate::context_builder::SedonaContextBuilder;
use sedona_common::option::SedonaOptions;

// Specify a memory limit (10GB), spilled batch threshold will also be limited,
// but no lower than 10MB due to the minimum floor.
let memory_limit: usize = 10 * 1024 * 1024 * 1024;
let ctx = SedonaContextBuilder::new()
.with_memory_limit(memory_limit)
.build()
.await
.unwrap();
let state = ctx.ctx.state();
let opts = state
.config_options()
.extensions
.get::<SedonaOptions>()
.expect("SedonaOptions not found");
assert!(opts.spatial_join.spilled_batch_in_memory_size_threshold >= 10 * 1024 * 1024);

// Specify no memory limit, spilled batch threshold should be unlimited (0 is for unlimited)
let ctx = SedonaContextBuilder::new().build().await.unwrap();
let state = ctx.ctx.state();
let opts = state
.config_options()
.extensions
.get::<SedonaOptions>()
.expect("SedonaOptions not found");
assert_eq!(opts.spatial_join.spilled_batch_in_memory_size_threshold, 0);
}
}