diff --git a/docs/source/contributor-guide/benchmarking.md b/docs/source/contributor-guide/benchmarking.md
index e2e8a7946..456cacef1 100644
--- a/docs/source/contributor-guide/benchmarking.md
+++ b/docs/source/contributor-guide/benchmarking.md
@@ -57,13 +57,14 @@ $SPARK_HOME/bin/spark-submit \
     --conf spark.executor.memory=32G \
     --conf spark.executor.cores=8 \
     --conf spark.cores.max=8 \
+    --conf spark.memory.offHeap.enabled=true \
+    --conf spark.memory.offHeap.size=10g \
     --jars $COMET_JAR \
     --conf spark.driver.extraClassPath=$COMET_JAR \
     --conf spark.executor.extraClassPath=$COMET_JAR \
     --conf spark.plugins=org.apache.spark.CometPlugin \
     --conf spark.comet.cast.allowIncompatible=true \
     --conf spark.comet.exec.shuffle.enabled=true \
-    --conf spark.comet.exec.shuffle.mode=auto \
     --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
     tpcbench.py \
     --benchmark tpch \
diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md
index f1a928ff4..2baced092 100644
--- a/docs/source/user-guide/tuning.md
+++ b/docs/source/user-guide/tuning.md
@@ -21,45 +21,66 @@ under the License.

 Comet provides some tuning options to help you get the best performance from your queries.

-## Metrics
+## Memory Tuning

-Comet metrics are not directly comparable to Spark metrics in some cases.
+Comet provides two options for memory management:

-`CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to
-milliseconds *per batch* which can result in a large loss of precision. In one case we saw total scan time
-of 41 seconds reported as 23 seconds for example.
+- **Unified Memory Management** shares an off-heap memory pool between Spark and Comet. This is the recommended option.
+- **Native Memory Management** leverages DataFusion's memory management for the native plans and allocates memory independently of Spark.

-## Memory Tuning
+### Unified Memory Management
+
+This option is automatically enabled when `spark.memory.offHeap.enabled=true`.
+
+Each executor will have a single memory pool which will be shared by all native plans being executed within that
+process, and by Spark itself. The size of the pool is specified by `spark.memory.offHeap.size`.
+
+### Native Memory Management
+
+This option is automatically enabled when `spark.memory.offHeap.enabled=false`.
+
+Each native plan has a dedicated memory pool.
+
+By default, the size of each pool is `spark.comet.memory.overhead.factor * spark.executor.memory`. The default value
+for `spark.comet.memory.overhead.factor` is `0.2`.

-Comet currently doesn't share the memory allocation from Spark but owns its own memory allocation.
-That's said, Comet requires additional memory to be allocated. Comet provides some memory related configurations to help you tune the memory usage.
+It is important to take executor concurrency into account. The maximum number of concurrent plans in an executor can
+be calculated with `spark.executor.cores / spark.task.cpus`.

-By default, the amount of memory is `spark.comet.memory.overhead.factor` * `spark.executor.memory`.
-The default value for `spark.comet.memory.overhead.factor` is 0.2. You can increase the factor to require more
-memory for Comet to use, if you see OOM error. Generally, increasing memory overhead will improve the performance of your queries.
-For example, some operators like `SortMergeJoin` and `HashAggregate` may require more memory to run.
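+In other words, the total amount of native memory that can be allocated per executor under this mode can be
+sketched as follows (an illustrative pseudo-formula; `total_native_memory` is not an actual Comet setting):
+
+```
+total_native_memory = (spark.executor.cores / spark.task.cpus)
+                      * spark.comet.memory.overhead.factor
+                      * spark.executor.memory
+```
+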
-Once the memory is not enough, the operator will spill to disk, which will slow down the query.
+For example, if the executor can execute 4 plans concurrently, then the total amount of memory allocated will be
+`4 * spark.comet.memory.overhead.factor * spark.executor.memory`.

-Besides, you can also set the memory explicitly by setting `spark.comet.memoryOverhead` to the desired value.
-Comet will allocate at least `spark.comet.memory.overhead.min` memory.
+It is also possible to set `spark.comet.memoryOverhead` to the desired size for each pool, rather than calculating
+it based on `spark.comet.memory.overhead.factor`. If both `spark.comet.memoryOverhead` and `spark.comet.memory.overhead.factor` are set, the former will be used.

-## Memory Tuning using CometPlugin
-Configuring memory for Spark and Comet might be a tedious task as it requires to tune Spark executor overhead memory and Comet memory overhead configs. Comet provides a Spark plugin `CometPlugin` which can be set up to your Spark application to help memory settings.
+Comet will allocate at least `spark.comet.memory.overhead.min` memory per pool.

-For users running the Comet in clusters like Kubernetes or YARN, `CometPlugin` can also make the resource manager respect correctly Comet memory parameters `spark.comet.memory*`.
-it is needed to pass to the starting command line additional Spark configuration parameter `--conf spark.plugins=org.apache.spark.CometPlugin`
+### Determining How Much Memory to Allocate

-The resource managers respects Apache Spark memory configuration before starting the containers.
+Generally, increasing memory overhead will improve query performance, especially for queries containing joins and
+aggregates.

-The `CometPlugin` plugin overrides `spark.executor.memoryOverhead` adding up the Comet memory configuration.
+Once a memory pool is exhausted, the native plan will start spilling to disk, which will slow down the query.
+Insufficient memory allocation can also lead to out-of-memory (OOM) errors.
+
+## Configuring spark.executor.memoryOverhead
+
+In some environments, such as Kubernetes and YARN, it is important to correctly set `spark.executor.memoryOverhead` so
+that it is possible to allocate off-heap memory.
+
+Comet will automatically set `spark.executor.memoryOverhead` based on the `spark.comet.memory*` settings so that
+resource managers respect Apache Spark memory configuration before starting the containers.
+
+Note that there is currently a known issue where this will be inaccurate when using Native Memory Management because it
+does not take executor concurrency into account. The tracking issue for this is
+https://github.com/apache/datafusion-comet/issues/949.

 ## Shuffle

-Comet provides Comet shuffle features that can be used to improve the performance of your queries.
-The following sections describe the different shuffle options available in Comet.
+Comet provides accelerated shuffle implementations that can be used to improve the performance of your queries.

 To enable Comet shuffle, set the following configuration in your Spark configuration:

@@ -71,30 +92,37 @@ spark.comet.exec.shuffle.enabled=true
 `spark.shuffle.manager` is a Spark static configuration which cannot be changed at runtime.
 It must be set before the Spark context is created. You can enable or disable Comet shuffle
 at runtime by setting `spark.comet.exec.shuffle.enabled` to `true` or `false`.
-Once it is disabled, Comet will fallback to the default Spark shuffle manager.
-
-> **_NOTE:_** At the moment Comet Shuffle is not compatible with Spark AQE partition coalesce. To disable set `spark.sql.adaptive.coalescePartitions.enabled` to `false`.
+Once it is disabled, Comet will fall back to the default Spark shuffle manager.
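+
+For example, the following combination of settings (values illustrative) keeps the static shuffle manager in place
+while disabling Comet shuffle at runtime:
+
+```
+spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
+spark.comet.exec.shuffle.enabled=false
+```
+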
 ### Shuffle Mode

 Comet provides three shuffle modes: Columnar Shuffle, Native Shuffle and Auto Mode.

-#### Columnar Shuffle
+#### Auto Mode
+
+Setting `spark.comet.exec.shuffle.mode` to `auto` will let Comet choose the best shuffle mode based on the query plan.
+This is the default.

-By default, once `spark.comet.exec.shuffle.enabled` is enabled, Comet uses JVM-based columnar shuffle
-to improve the performance of shuffle operations. Columnar shuffle supports HashPartitioning,
-RoundRobinPartitioning, RangePartitioning and SinglePartitioning. This mode has the highest
-query coverage.
+#### Columnar (JVM) Shuffle

-Columnar shuffle can be enabled by setting `spark.comet.exec.shuffle.mode` to `jvm`.
+Comet Columnar shuffle is JVM-based and supports `HashPartitioning`, `RoundRobinPartitioning`, `RangePartitioning`, and
+`SinglePartitioning`. This mode has the highest query coverage.
+
+Columnar shuffle can be enabled by setting `spark.comet.exec.shuffle.mode` to `jvm`. If this mode is explicitly set,
+then any shuffle operations that cannot be supported in this mode will fall back to Spark.

 #### Native Shuffle

-Comet also provides a fully native shuffle implementation that can be used to improve the performance.
-To enable native shuffle, just set `spark.comet.exec.shuffle.mode` to `native`
+Comet also provides a fully native shuffle implementation, which generally provides the best performance. However,
+native shuffle currently only supports `HashPartitioning` and `SinglePartitioning`.
+
+To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If this mode is explicitly set,
+then any shuffle operations that cannot be supported in this mode will fall back to Spark.

-Native shuffle only supports HashPartitioning and SinglePartitioning.
+## Metrics

-### Auto Mode
+Comet metrics are not directly comparable to Spark metrics in some cases.

-`spark.comet.exec.shuffle.mode` to `auto` will let Comet choose the best shuffle mode based on the query plan.
+`CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to
+milliseconds _per batch_, which can result in a large loss of precision. In one case, we saw a total scan time
+of 41 seconds reported as 23 seconds.
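+
+As a sketch of how this loss can accumulate (the batch count and timings below are invented, and assume the
+per-batch conversion truncates to whole milliseconds):
+
+```
+per-batch scan time:  1.9 ms, reported as 1 ms after truncation
+number of batches:    20,000
+actual scan time:     20,000 * 1.9 ms = 38 s
+reported scan time:   20,000 * 1 ms   = 20 s
+```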