Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
384 changes: 384 additions & 0 deletions docs/memory-management.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,384 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "a2e6cc7b",
"metadata": {},
"source": [
"<!---\n",
" Licensed to the Apache Software Foundation (ASF) under one\n",
" or more contributor license agreements. See the NOTICE file\n",
" distributed with this work for additional information\n",
" regarding copyright ownership. The ASF licenses this file\n",
" to you under the Apache License, Version 2.0 (the\n",
" \"License\"); you may not use this file except in compliance\n",
" with the License. You may obtain a copy of the License at\n",
"\n",
" http://www.apache.org/licenses/LICENSE-2.0\n",
"\n",
" Unless required by applicable law or agreed to in writing,\n",
" software distributed under the License is distributed on an\n",
" \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
" KIND, either express or implied. See the License for the\n",
" specific language governing permissions and limitations\n",
" under the License.\n",
"-->\n",
"\n",
"# Memory Management and Spilling\n",
"\n",
"SedonaDB supports memory-limited execution with automatic spill-to-disk, allowing you to process datasets that are larger than available memory. When a memory limit is configured, operators that exceed their memory budget automatically spill intermediate data to temporary files on disk and read them back as needed."
]
},
{
"cell_type": "markdown",
"id": "e3a30ea9",
"metadata": {},
"source": [
"## Configuring Memory Limits\n",
"\n",
"Set `memory_limit` on the context options to cap the total memory available for query execution. The limit accepts an integer (bytes) or a human-readable string such as `\"4gb\"`, `\"512m\"`, or `\"1.5g\"`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d1f99fcf",
"metadata": {},
"outputs": [],
"source": [
"import sedona.db\n",
"\n",
"sd = sedona.db.connect()\n",
"sd.options.memory_limit = \"4gb\""
]
},
{
"cell_type": "markdown",
"id": "1fdc73aa",
"metadata": {},
"source": [
"Without a memory limit, SedonaDB uses an unbounded memory pool and operators can use as much memory as needed (until the process hits system limits). In this mode, operators typically won't spill to disk because there is no memory budget to enforce.\n",
"\n",
"> **Note:** All runtime options (`memory_limit`, `memory_pool_type`, `temp_dir`, `unspillable_reserve_ratio`) must be set before the internal context is initialized. The internal context is created on the first call to `sd.sql(...)` (including `SET` statements) or any read method (for example, `sd.read_parquet(...)`) -- not when you call `.execute()` on the returned DataFrame. Once the internal context is created, these runtime options become read-only."
]
},
{
"cell_type": "markdown",
"id": "c0f91c15",
"metadata": {},
"source": [
"## Memory Pool Types\n",
"\n",
"The `memory_pool_type` option controls how the memory budget is distributed among concurrent operators. Two pool types are available:\n",
"\n",
"- **`\"greedy\"`** -- Grants memory reservations on a first-come-first-served basis. This is the default when no pool type is specified. Simple, but can lead to memory reservation failures under pressure -- one consumer may exhaust the pool before others get a chance to reserve memory.\n",
"- **`\"fair\"` (recommended)** -- Distributes memory fairly among spillable consumers and reserves a fraction of the pool for unspillable consumers. More stable under memory pressure and significantly less likely to cause reservation failures, at the cost of slightly lower utilization of the total reserved memory.\n",
"\n",
"We recommend using `\"fair\"` whenever a memory limit is configured."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b1dff726",
"metadata": {},
"outputs": [],
"source": [
"import sedona.db\n",
"\n",
"sd = sedona.db.connect()\n",
"sd.options.memory_limit = \"4gb\"\n",
"sd.options.memory_pool_type = \"fair\""
]
},
{
"cell_type": "markdown",
"id": "bd4c0a76",
"metadata": {},
"source": [
"> **Note:** `memory_pool_type` only takes effect when `memory_limit` is set."
]
},
{
"cell_type": "markdown",
"id": "969f4fe0",
"metadata": {},
"source": [
"### Unspillable reserve ratio\n",
"\n",
"When using the `\"fair\"` pool, the `unspillable_reserve_ratio` option controls the fraction of the memory pool reserved for unspillable consumers (operators that cannot spill their memory to disk). It accepts a float between `0.0` and `1.0` and defaults to `0.2` (20%) when not explicitly set."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dc0718cf",
"metadata": {},
"outputs": [],
"source": [
"import sedona.db\n",
"\n",
"sd = sedona.db.connect()\n",
"sd.options.memory_limit = \"8gb\"\n",
"sd.options.memory_pool_type = \"fair\"\n",
"sd.options.unspillable_reserve_ratio = 0.3 # reserve 30% for unspillable consumers"
]
},
{
"cell_type": "markdown",
"id": "fb1c9e39",
"metadata": {},
"source": [
"## Temporary Directory for Spill Files\n",
"\n",
"By default, DataFusion uses the system temporary directory for spill files. You can override this with `temp_dir` to control where spill data is written -- for example, to point to a larger or faster disk."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c8d7a5c9",
"metadata": {},
"outputs": [],
"source": [
"import sedona.db\n",
"\n",
"sd = sedona.db.connect()\n",
"sd.options.memory_limit = \"4gb\"\n",
"sd.options.memory_pool_type = \"fair\"\n",
"sd.options.temp_dir = \"/mnt/fast-ssd/sedona-spill\""
]
},
{
"cell_type": "markdown",
"id": "5d318b8f",
"metadata": {},
"source": [
"## Example: Spatial Join with Limited Memory\n",
"\n",
"This example performs a spatial join between Natural Earth cities (points) and Natural Earth countries (polygons) using `ST_Contains`. Spatial joins are one of the most common workloads that benefit from memory limits and spill-to-disk."
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "1ed77d58",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"┌───────────────┬─────────────────────────────┐\n",
"│ city_name ┆ country_name │\n",
"│ utf8 ┆ utf8 │\n",
"╞═══════════════╪═════════════════════════════╡\n",
"│ Suva ┆ Fiji │\n",
"├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤\n",
"│ Dodoma ┆ United Republic of Tanzania │\n",
"├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤\n",
"│ Dar es Salaam ┆ United Republic of Tanzania │\n",
"├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤\n",
"│ Bir Lehlou ┆ Western Sahara │\n",
"├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤\n",
"│ Ottawa ┆ Canada │\n",
"├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤\n",
"│ Vancouver ┆ Canada │\n",
"├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤\n",
"│ Toronto ┆ Canada │\n",
"├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤\n",
"│ San Francisco ┆ United States of America │\n",
"├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤\n",
"│ Denver ┆ United States of America │\n",
"├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤\n",
"│ Houston ┆ United States of America │\n",
"└───────────────┴─────────────────────────────┘\n"
]
}
],
"source": [
"import sedona.db\n",
"\n",
"sd = sedona.db.connect()\n",
"\n",
"# Configure runtime options before any sd.sql(...) or sd.read_* call.\n",
"sd.options.memory_limit = \"4gb\"\n",
"sd.options.memory_pool_type = \"fair\"\n",
"sd.options.unspillable_reserve_ratio = 0.2\n",
"sd.options.temp_dir = \"/tmp/sedona-spill\"\n",
"\n",
"cities = sd.read_parquet(\n",
" \"https://raw.githubusercontent.com/geoarrow/geoarrow-data/v0.2.0/natural-earth/files/natural-earth_cities_geo.parquet\"\n",
")\n",
"cities.to_view(\"cities\", overwrite=True)\n",
"\n",
"countries = sd.read_parquet(\n",
" \"https://raw.githubusercontent.com/geoarrow/geoarrow-data/v0.2.0/natural-earth/files/natural-earth_countries_geo.parquet\"\n",
")\n",
"countries.to_view(\"countries\", overwrite=True)\n",
"\n",
"sd.sql(\n",
" \"\"\"\n",
" SELECT\n",
" cities.name city_name,\n",
" countries.name country_name\n",
" FROM cities\n",
" JOIN countries\n",
" ON ST_Contains(countries.geometry, cities.geometry)\n",
" \"\"\"\n",
").show(10)"
]
},
{
"cell_type": "markdown",
"id": "b97c0e8b",
"metadata": {},
"source": [
"## Operators Supporting Memory Limits\n",
"\n",
"When a memory limit is configured, the following operators automatically spill intermediate data to disk when they exceed their memory budget.\n",
"\n",
"In practice, this means memory limits and spilling can apply to both SedonaDB's spatial operators and DataFusion's general-purpose operators used by common SQL constructs.\n",
"\n",
"**SedonaDB:**\n",
"\n",
"- **Spatial joins** -- Both the build-side (index construction, partition collection) and probe-side (stream repartitioning) of SedonaDB's spatial joins support memory-pressure-driven spilling.\n",
"\n",
"**DataFusion (physical operators):**\n",
"\n",
"This list is not exhaustive. Many other DataFusion physical operators and execution strategies may allocate memory through the same runtime memory pool and may spill to disk when memory limits are enforced.\n",
"\n",
"- **`ORDER BY` / sorted Top-K** (`SortExec`) -- External sort that spills sorted runs to disk when memory is exhausted, then merges them.\n",
"- **Hash joins** (`HashJoinExec`) -- Hash join does not support spilling yet. The query will fail with a memory reservation error if the hash table exceeds the memory limit.\n",
"- **Sort-merge joins** (`SortMergeJoinExec`) -- Sort-merge join that spills buffered batches to disk when the memory limit is exceeded.\n",
"- **`GROUP BY` aggregations** (`AggregateExec`) -- Grouped aggregation that spills intermediate aggregation state to sorted spill files when memory is exhausted."
]
},
{
"cell_type": "markdown",
"id": "c56b821f",
"metadata": {},
"source": [
"## Advanced DataFusion Configurations\n",
"\n",
"DataFusion provides additional execution configurations that affect spill behavior. These can be set via SQL `SET` statements after connecting.\n",
"\n",
"> **Note:** Calling `sd.sql(...)` initializes the internal context immediately (including `sd.sql(\"SET ...\")`) and freezes runtime options immediately. Configure `sd.options.*` runtime options (like `memory_limit` and `temp_dir`) before calling any `sd.sql(...)`, including `SET` statements."
]
},
{
"cell_type": "markdown",
"id": "6dd9f5b7",
"metadata": {},
"source": [
"### Spill compression\n",
"\n",
"By default, data is written to spill files uncompressed. Enabling compression reduces the amount of disk I/O and disk space used at the cost of additional CPU work. This is beneficial when disk I/O throughput is low or when disk space is not large enough to hold uncompressed spill data."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "10cf38cb",
"metadata": {},
"outputs": [],
"source": [
"import sedona.db\n",
"\n",
"sd = sedona.db.connect()\n",
"sd.options.memory_limit = \"4gb\"\n",
"sd.options.memory_pool_type = \"fair\"\n",
"\n",
"# Enable LZ4 compression for spill files.\n",
"sd.sql(\"SET datafusion.execution.spill_compression = 'lz4_frame'\").execute()"
]
},
{
"cell_type": "markdown",
"id": "1a3ce16c",
"metadata": {},
"source": [
"### Maximum temporary directory size\n",
"\n",
"DataFusion limits the total size of temporary spill files to prevent unbounded disk usage. The default limit is **100 G**. If your workload needs to spill more data than this, increase the limit."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "eac829ad",
"metadata": {},
"outputs": [],
"source": [
"import sedona.db\n",
"\n",
"sd = sedona.db.connect()\n",
"sd.options.memory_limit = \"4gb\"\n",
"sd.options.memory_pool_type = \"fair\"\n",
"\n",
"# Increase the spill directory size limit to 500 GB.\n",
"sd.sql(\"SET datafusion.runtime.max_temp_directory_size = '500G'\").execute()"
]
},
{
"cell_type": "markdown",
"id": "dd8c2c23",
"metadata": {},
"source": [
"## System Configuration\n",
"\n",
"### Maximum number of open files\n",
"\n",
"Large workloads that spill heavily can create a large number of temporary files. During a spatial join, each parallel execution thread may create one spill file per spatial partition. The total number of open spill files can therefore reach **parallelism x number of spatial partitions**. For example, on an 8-CPU host running a spatial join that produces 500 spatial partitions, up to **8 x 500 = 4,000** spill files may be open simultaneously -- far exceeding the default per-process file descriptor limit.\n",
"\n",
"The operating system's per-process file descriptor limit must be high enough to accommodate this, otherwise queries will fail with \"too many open files\" errors.\n",
"\n",
"**Linux:**\n",
"\n",
"The default limit is typically 1024, which is easily exceeded by spill-heavy workloads like the example above.\n",
"\n",
"To raise the limit permanently, add the following to `/etc/security/limits.conf`:\n",
"\n",
"```\n",
"* soft nofile 65535\n",
"* hard nofile 65535\n",
"```\n",
"\n",
"Then log out and back in (or reboot) for the change to take effect. Verify with:\n",
"\n",
"```bash\n",
"ulimit -n\n",
"```\n",
"\n",
"**macOS:**\n",
"\n",
"```bash\n",
"ulimit -n 65535\n",
"```\n",
"\n",
"This affects the current shell session. Persistent/system-wide limits are OS and configuration dependent; consult your macOS configuration and documentation if you need to raise the hard limit."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "sedonadb",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.4"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Loading