61 changes: 61 additions & 0 deletions docs/tutorial/files/geoparquet-sedona-spark.md
@@ -244,6 +244,67 @@ ORDER BY geohash

Let's look closer at how Sedona uses the GeoParquet bbox metadata to optimize queries.

## Spatial Partitioning for GeoParquet

When building a GeoParquet data lake, spatial partitioning can dramatically improve query performance by co-locating spatially nearby records into the same partitions. This makes GeoParquet bbox-based file skipping much more effective because each file's bounding box covers a compact spatial region instead of spanning the entire dataset.

Sedona provides a one-step API, `StructuredAdapter.repartitionBySpatialKey`, that handles spatial partitioning directly on DataFrames. Under the hood it converts the DataFrame to a SpatialRDD, applies a partitioning scheme such as KDB-Tree, and converts the result back to a DataFrame, all in a single call.

=== "Python"

```python
from sedona.spark import StructuredAdapter
from sedona.spark.core.enums.grid_type import GridType

df = sedona.read.format("geoparquet").load("/path/to/input")

# Repartition with explicit geometry column and partition count
partitioned_df = StructuredAdapter.repartitionBySpatialKey(
df, GridType.KDBTREE, "geometry", 16
)

# Or auto-detect geometry column
partitioned_df = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE)

partitioned_df.write.format("geoparquet").save("/path/to/output")
```

=== "Scala"

```scala
import org.apache.sedona.core.enums.GridType
import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter

val df = sedona.read.format("geoparquet").load("/path/to/input")

// Repartition with explicit geometry column and partition count
val partitionedDf = StructuredAdapter.repartitionBySpatialKey(df, "geometry", GridType.KDBTREE, 16)

// Or auto-detect the geometry column
val autoPartitionedDf = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE)

partitionedDf.write.format("geoparquet").save("/path/to/output")
```

=== "Java"

```java
import org.apache.sedona.core.enums.GridType;
import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter;

Dataset<Row> df = sedona.read().format("geoparquet").load("/path/to/input");

// Repartition with explicit geometry column and partition count
Dataset<Row> partitionedDf = StructuredAdapter.repartitionBySpatialKey(df, "geometry", GridType.KDBTREE, 16);

// Or auto-detect the geometry column
Dataset<Row> autoPartitionedDf = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE);

partitionedDf.write().format("geoparquet").save("/path/to/output");
```

This approach is more effective than sorting by GeoHash because it uses a KDB-Tree to create balanced spatial partitions that respect the actual data distribution. Each output file will cover a compact spatial region, maximizing the effectiveness of GeoParquet's bbox-based file skipping.
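
A quick way to sanity-check the result (a minimal sketch, assuming `partitioned_df` from the Python example above) is to count rows per Spark partition and confirm they are reasonably balanced:

```python
from pyspark.sql.functions import spark_partition_id

# Each spatial partition becomes one Spark partition; counts should be
# roughly even when the KDB-Tree matches the data distribution.
partitioned_df.groupBy(spark_partition_id().alias("partition_id")).count().show()
```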

## How Sedona uses GeoParquet bounding box (bbox) metadata with Spark

The bounding box metadata specifies the area covered by the geometric shapes in a given file. Suppose you query points in a region not covered by the bounding box of a given file. The engine can skip that entire file when executing the query, because the metadata already shows that the file contains no relevant data.
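
For example, a spatial range query over the partitioned output can take advantage of this skipping. The snippet below is a minimal sketch (the path and query window are illustrative; `sedona` is assumed to be a Sedona-enabled SparkSession):

```python
df = sedona.read.format("geoparquet").load("/path/to/output")

# Files whose bbox metadata does not intersect the query window are
# skipped entirely; only overlapping files are scanned.
result = df.where(
    "ST_Intersects(geometry, ST_PolygonFromEnvelope(-122.5, 37.7, -122.3, 37.9))"
)
print(result.count())
```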
50 changes: 37 additions & 13 deletions docs/tutorial/sql.md
@@ -1383,37 +1383,61 @@ are introduced on purpose to ensure correctness when performing a spatial join;
however, when using Sedona to prepare a dataset for distribution this is not typically
desired.

You can use `StructuredAdapter.repartitionBySpatialKey` to spatially partition a DataFrame in one step, without duplicates. This is especially useful for generating balanced GeoParquet files while preserving spatial proximity within files, which is crucial for optimizing filter pushdown performance in GeoParquet files.

=== "Scala"

```scala
import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter
import org.apache.sedona.core.enums.GridType

// Repartition using KDB-Tree with auto-detected geometry column
var spatialDf = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE)

// Specify a geometry column and the desired number of partitions
var spatialDf = StructuredAdapter.repartitionBySpatialKey(df, "geometry", GridType.KDBTREE, 10)

// Write to GeoParquet
spatialDf.write.format("geoparquet").save("/path/to/output")
```

=== "Java"

```java
import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter;
import org.apache.sedona.core.enums.GridType;

// Repartition using KDB-Tree with auto-detected geometry column
Dataset<Row> spatialDf = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE);

// Specify a geometry column and the desired number of partitions
Dataset<Row> spatialDf = StructuredAdapter.repartitionBySpatialKey(df, "geometry", GridType.KDBTREE, 10);

// Write to GeoParquet
spatialDf.write().format("geoparquet").save("/path/to/output");
```

=== "Python"

```python
from sedona.spark import StructuredAdapter
from sedona.spark.core.enums.grid_type import GridType

# Repartition using KDB-Tree with auto-detected geometry column
spatial_df = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE)

# Specify a geometry column and the desired number of partitions
spatial_df = StructuredAdapter.repartitionBySpatialKey(
df, GridType.KDBTREE, geometryFieldName="geometry", numPartitions=10
)

# Write to GeoParquet
spatial_df.write.format("geoparquet").save("/path/to/output")
```

!!!note
You can also achieve spatial partitioning manually using the lower-level API, as sketched below. See the `spatialPartitioningWithoutDuplicates` method on `SpatialRDD` and `StructuredAdapter.toSpatialPartitionedDf` for the step-by-step approach.
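
For illustration, here is a minimal sketch of that lower-level path in Python, assuming a Sedona-enabled SparkSession `sedona` and a DataFrame `df` whose geometry column is named `geometry` (`toSpatialRdd` mirrors the Scala adapter method of the same name):

```python
from sedona.spark import StructuredAdapter
from sedona.spark.core.enums.grid_type import GridType

# Convert the DataFrame to a SpatialRDD keyed on its geometry column
spatial_rdd = StructuredAdapter.toSpatialRdd(df, "geometry")
spatial_rdd.analyze()  # compute the dataset envelope the partitioner needs

# Spatially partition without duplicating records that straddle boundaries
spatial_rdd.spatialPartitioningWithoutDuplicates(GridType.KDBTREE, 10)

# Convert back to a DataFrame that preserves the spatial partitioning
spatial_df = StructuredAdapter.toSpatialPartitionedDf(spatial_rdd, sedona)
```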

### SpatialPairRDD to DataFrame

PairRDD is the result of a spatial join query or distance join query. SedonaSQL DataFrame-RDD Adapter can convert the result to a DataFrame. But you need to provide the schema of the left and right RDDs.
48 changes: 48 additions & 0 deletions python/sedona/spark/utils/structured_adapter.py
@@ -18,6 +18,7 @@
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType

from sedona.spark.core.enums.grid_type import GridType
from sedona.spark.core.SpatialRDD.spatial_rdd import SpatialRDD
from sedona.spark.core.spatialOperator.rdd import SedonaPairRDD

@@ -124,3 +124,50 @@ def pairRddToDf(
)
df = StructuredAdapter._create_dataframe(jdf, sparkSession)
return df

@classmethod
def repartitionBySpatialKey(
cls,
dataFrame: DataFrame,
gridType: GridType = GridType.KDBTREE,
geometryFieldName: str = None,
numPartitions: int = 0,
) -> DataFrame:
"""
Repartition a DataFrame using a spatial partitioning scheme (e.g., KDB-Tree).
This is a convenience method that wraps the multi-step process of converting a
DataFrame to a SpatialRDD, applying spatial partitioning without duplicates,
and converting back to a DataFrame.

Example usage::

partitioned_df = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE, "geometry", 16)
partitioned_df.write.format("geoparquet").save("/path/to/output")

Args:
dataFrame: The input DataFrame containing a geometry column.
gridType: The spatial partitioning grid type (default: GridType.KDBTREE).
geometryFieldName: The name of the geometry column. If None, auto-detects.
numPartitions: The target number of partitions. If 0, uses the current number.

Returns:
A spatially partitioned DataFrame.
"""
sparkSession = dataFrame.sparkSession
jvm = sparkSession._jvm

jgrid_type = jvm.org.apache.sedona.core.enums.GridType.getGridType(
gridType.value
)

adapter = jvm.org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter
if geometryFieldName is not None:
jdf = adapter.repartitionBySpatialKey(
dataFrame._jdf, geometryFieldName, jgrid_type, numPartitions
)
else:
jdf = adapter.repartitionBySpatialKey(
dataFrame._jdf, jgrid_type, numPartitions
)

return StructuredAdapter._create_dataframe(jdf, sparkSession)
19 changes: 19 additions & 0 deletions python/tests/sql/test_structured_adapter.py
@@ -210,6 +210,25 @@ def test_build_index_and_range_query_with_mixed_geometries(self):
result_count = query_result.count()
assert result_count > 0, f"Expected at least one result, got {result_count}"

def test_repartition_by_spatial_key(self):
xys = [(i, i // 100, i % 100) for i in range(1_000)]
df = self.spark.createDataFrame(xys, ["id", "x", "y"]).selectExpr(
"id", "ST_Point(x, y) AS geom"
)
partitioned_df = StructuredAdapter.repartitionBySpatialKey(
df, GridType.KDBTREE, "geom", 4
)
assert partitioned_df.count() == 1_000
assert partitioned_df.rdd.getNumPartitions() >= 4

def test_repartition_by_spatial_key_auto_detect(self):
xys = [(i, i // 100, i % 100) for i in range(1_000)]
df = self.spark.createDataFrame(xys, ["id", "x", "y"]).selectExpr(
"id", "ST_Point(x, y) AS geom"
)
partitioned_df = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE)
assert partitioned_df.count() == 1_000

def test_toDf_preserves_columns_with_proper_types(self):
# Create a spatial DataFrame with various columns and types
data = [
@@ -18,6 +18,7 @@
*/
package org.apache.spark.sql.sedona_sql.adapters

import org.apache.sedona.core.enums.GridType
import org.apache.sedona.core.spatialPartitioning.GenericUniquePartitioner
import org.apache.sedona.core.spatialRDD.SpatialRDD
import org.apache.sedona.sql.utils.GeometrySerializer
@@ -245,4 +246,80 @@ object StructuredAdapter {
originalRightSpatialRdd.schema,
sparkSession)
}

/**
* Repartition a DataFrame using a spatial partitioning scheme (e.g., KDB-Tree). This is a
* convenience method that wraps the multi-step process of converting a DataFrame to a
* SpatialRDD, applying spatial partitioning without duplicates, and converting back to a
* DataFrame.
*
* Example usage:
* {{{
* val partitionedDf = StructuredAdapter.repartitionBySpatialKey(df, "geometry", GridType.KDBTREE, 16)
* partitionedDf.write.format("geoparquet").save("/path/to/output")
* }}}
*
* @param dataFrame
* The input DataFrame containing a geometry column.
* @param geometryFieldName
* The name of the geometry column.
* @param gridType
* The spatial partitioning grid type (e.g., GridType.KDBTREE).
* @param numPartitions
* The target number of partitions. If 0, defaults to the current number of partitions.
* @return
* A spatially partitioned DataFrame.
*/
def repartitionBySpatialKey(
dataFrame: DataFrame,
geometryFieldName: String,
gridType: GridType,
numPartitions: Int = 0): DataFrame = {
val spatialRDD = toSpatialRdd(dataFrame, geometryFieldName)
spatialRDD.analyze()
val partCount =
if (numPartitions > 0) numPartitions
else dataFrame.rdd.getNumPartitions
spatialRDD.spatialPartitioningWithoutDuplicates(gridType, partCount)
toSpatialPartitionedDf(spatialRDD, dataFrame.sparkSession)
}

/**
* Repartition a DataFrame using a spatial partitioning scheme (e.g., KDB-Tree). Auto-detects
* the geometry column.
*
* @param dataFrame
* The input DataFrame containing a geometry column.
* @param gridType
* The spatial partitioning grid type (e.g., GridType.KDBTREE).
* @param numPartitions
* The target number of partitions. If 0, defaults to the current number of partitions.
* @return
* A spatially partitioned DataFrame.
*/
def repartitionBySpatialKey(
dataFrame: DataFrame,
gridType: GridType,
numPartitions: Int): DataFrame = {
repartitionBySpatialKey(
dataFrame,
DfUtils.getGeometryColumnName(dataFrame.schema),
gridType,
numPartitions)
}

/**
* Repartition a DataFrame using a spatial partitioning scheme (e.g., KDB-Tree). Auto-detects
* the geometry column and uses the current number of partitions.
*
* @param dataFrame
* The input DataFrame containing a geometry column.
* @param gridType
* The spatial partitioning grid type (e.g., GridType.KDBTREE).
* @return
* A spatially partitioned DataFrame.
*/
def repartitionBySpatialKey(dataFrame: DataFrame, gridType: GridType): DataFrame = {
repartitionBySpatialKey(dataFrame, gridType, 0)
}
}
@@ -126,5 +126,22 @@ class structuredAdapterTestScala extends TestBaseScala with GivenWhenThen {
val dfPartitions: Long = partitionedDF.select(spark_partition_id).distinct().count()
assert(dfPartitions == numSpatialPartitions)
}

it("Should repartition by spatial key in one step") {
val seq = generateTestData()
val dfOrigin = sparkSession.createDataFrame(seq)
val partitionedDf =
StructuredAdapter.repartitionBySpatialKey(dfOrigin, "_3", GridType.KDBTREE, 4)
assertEquals(seq.size, partitionedDf.count())
assert(partitionedDf.rdd.getNumPartitions >= 4)
}

it("Should repartition by spatial key with auto-detected geometry column") {
val seq = generateTestData()
val dfOrigin = sparkSession.createDataFrame(seq)
val partitionedDf =
StructuredAdapter.repartitionBySpatialKey(dfOrigin, GridType.KDBTREE)
assertEquals(seq.size, partitionedDf.count())
}
}
}