feat: Require offHeap memory to be enabled (always use unified memory) (#1062)

* Require offHeap memory

* remove unused import

* use off heap memory in stability tests

* reorder imports
andygrove authored Nov 14, 2024
1 parent f3da844 commit 2c832b4
Showing 5 changed files with 16 additions and 61 deletions.
32 changes: 2 additions & 30 deletions docs/source/user-guide/tuning.md
@@ -23,40 +23,12 @@ Comet provides some tuning options to help you get the best performance from you

## Memory Tuning

Comet provides two options for memory management:

- **Unified Memory Management** shares an off-heap memory pool between Spark and Comet. This is the recommended option.
- **Native Memory Management** leverages DataFusion's memory management for the native plans and allocates memory independently of Spark.

### Unified Memory Management

This option is automatically enabled when `spark.memory.offHeap.enabled=true`.
Comet shares an off-heap memory pool between Spark and Comet. This requires setting `spark.memory.offHeap.enabled=true`.
If this setting is not enabled, Comet will not accelerate queries and will fall back to Spark.

Each executor will have a single memory pool which will be shared by all native plans being executed within that
process, and by Spark itself. The size of the pool is specified by `spark.memory.offHeap.size`.
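
With this change, enabling off-heap memory is a hard requirement for Comet. A minimal sketch of the relevant session settings follows; the sizes are placeholders rather than recommendations, and the `spark.comet.*` keys mirror those set in the plan-stability test diff further down.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: sizes are illustrative, not tuned recommendations.
val spark = SparkSession
  .builder()
  .config("spark.memory.offHeap.enabled", "true") // required; otherwise Comet falls back to Spark
  .config("spark.memory.offHeap.size", "4g")      // single pool shared by Spark and all native plans
  .config("spark.comet.enabled", "true")
  .config("spark.comet.exec.enabled", "true")
  .getOrCreate()
```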

### Native Memory Management

This option is automatically enabled when `spark.memory.offHeap.enabled=false`.

Each native plan has a dedicated memory pool.

By default, the size of each pool is `spark.comet.memory.overhead.factor * spark.executor.memory`. The default value
for `spark.comet.memory.overhead.factor` is `0.2`.

It is important to take executor concurrency into account. The maximum number of concurrent plans in an executor can
be calculated with `spark.executor.cores / spark.task.cpus`.

For example, if the executor can execute 4 plans concurrently, then the total amount of memory allocated will be
`4 * spark.comet.memory.overhead.factor * spark.executor.memory`.

It is also possible to set `spark.comet.memoryOverhead` to the desired size for each pool, rather than calculating
it based on `spark.comet.memory.overhead.factor`.

If both `spark.comet.memoryOverhead` and `spark.comet.memory.overhead.factor` are set, the former will be used.

Comet will allocate at least `spark.comet.memory.overhead.min` memory per pool.
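
The arithmetic behind this now-removed scheme can be made concrete with a short sketch; all figures below are illustrative assumptions, not defaults taken from this commit.

```scala
// Per-executor memory claimed by the (removed) native memory management scheme.
val executorMemoryMb = 16 * 1024 // spark.executor.memory = 16g
val overheadFactor   = 0.2       // spark.comet.memory.overhead.factor (default)
val executorCores    = 8         // spark.executor.cores
val taskCpus         = 2         // spark.task.cpus

val concurrentPlans = executorCores / taskCpus                   // 4 native plans at once
val perPoolMb       = (executorMemoryMb * overheadFactor).toInt  // 3276 MB per plan
val totalMb         = perPoolMb * concurrentPlans                // 13104 MB (~12.8 GB) per executor

println(s"$concurrentPlans plans x $perPoolMb MB = $totalMb MB")
```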

### Determining How Much Memory to Allocate

Generally, increasing memory overhead will improve query performance, especially for queries containing joins and
24 changes: 3 additions & 21 deletions native/core/src/execution/jni_api.rs
@@ -202,27 +202,9 @@ fn prepare_datafusion_session_context(

let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);

// Check if we are using unified memory manager integrated with Spark. Default to false if not
// set.
let use_unified_memory_manager = parse_bool(conf, "use_unified_memory_manager")?;

if use_unified_memory_manager {
// Set Comet memory pool for native
let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
} else {
// Use the memory pool from DF
if conf.contains_key("memory_limit") {
let memory_limit = conf.get("memory_limit").unwrap().parse::<usize>()?;
let memory_fraction = conf
.get("memory_fraction")
.ok_or(CometError::Internal(
"Config 'memory_fraction' is not specified from Comet JVM side".to_string(),
))?
.parse::<f64>()?;
rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction)
}
}
// Set Comet memory pool for native
let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));

// Get Datafusion configuration from Spark Execution context
// can be configured in Comet Spark JVM using Spark --conf parameters
11 changes: 1 addition & 10 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
import org.apache.spark.sql.comet.CometMetricNode
import org.apache.spark.sql.vectorized._

import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
import org.apache.comet.vector.NativeUtil

/**
@@ -75,15 +75,6 @@ class CometExecIterator(
val result = new java.util.HashMap[String, String]()
val conf = SparkEnv.get.conf

val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
// Only enable unified memory manager when off-heap mode is enabled. Otherwise,
// we'll use the built-in memory pool from DF, and initializes with `memory_limit`
// and `memory_fraction` below.
result.put(
"use_unified_memory_manager",
String.valueOf(conf.get("spark.memory.offHeap.enabled", "false")))
result.put("memory_limit", String.valueOf(maxMemory))
result.put("memory_fraction", String.valueOf(COMET_EXEC_MEMORY_FRACTION.get()))
result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get()))
result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get()))
result.put("explain_native", String.valueOf(COMET_EXPLAIN_NATIVE_ENABLED.get()))
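
With the memory-related entries gone, the JVM side now forwards only execution and debugging settings to the native engine; memory always comes from the unified Comet pool backed by Spark's off-heap task memory manager. A rough sketch of the remaining map, with placeholder values rather than the defaults read from `CometConf`:

```scala
// Hypothetical contents of the JVM -> native configuration map after this change.
// memory_limit, memory_fraction and use_unified_memory_manager are no longer passed.
val nativeConf = new java.util.HashMap[String, String]()
nativeConf.put("batch_size", "8192")
nativeConf.put("debug_native", "false")
nativeConf.put("explain_native", "false")
```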
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -915,6 +915,13 @@ class CometSparkSessionExtensions
}

override def apply(plan: SparkPlan): SparkPlan = {

// Comet requires off-heap memory to be enabled
if ("true" != conf.getConfString("spark.memory.offHeap.enabled", "false")) {
logInfo("Comet extension disabled because spark.memory.offHeap.enabled=false")
return plan
}
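
This early return is what enforces the new requirement at the extension level: with off-heap disabled, Comet logs a message and hands the plan back to Spark untouched. A hedged way to observe the fallback (registration via `spark.sql.extensions` and the local-mode settings are assumptions of this sketch, not taken from the diff):

```scala
import org.apache.spark.sql.SparkSession

// Assumes the Comet jars are on the classpath.
val spark = SparkSession
  .builder()
  .master("local[1]")
  .config("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
  .config("spark.comet.enabled", "true")
  .config("spark.memory.offHeap.enabled", "false") // Comet disables itself and logs why
  .getOrCreate()

val plan = spark.range(10).selectExpr("id * 2").queryExecution.executedPlan
assert(!plan.toString.contains("Comet")) // plain Spark operators only
```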

// DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is
// enabled.
if (isANSIEnabled(conf)) {
@@ -26,6 +26,7 @@ import scala.collection.mutable

import org.apache.commons.io.FileUtils
import org.apache.spark.SparkContext
import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE}
import org.apache.spark.sql.TPCDSBase
import org.apache.spark.sql.catalyst.expressions.AttributeSet
import org.apache.spark.sql.catalyst.util.resourceToString
@@ -286,6 +287,8 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBase
conf.set(
"spark.shuffle.manager",
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
conf.set(CometConf.COMET_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")
