diff --git a/docs/tutorial/files/geoparquet-sedona-spark.md b/docs/tutorial/files/geoparquet-sedona-spark.md
index bbe8273b3e4..af04dfc701d 100644
--- a/docs/tutorial/files/geoparquet-sedona-spark.md
+++ b/docs/tutorial/files/geoparquet-sedona-spark.md
@@ -244,6 +244,67 @@ ORDER BY geohash
 
 Let's look closer at how Sedona uses the GeoParquet bbox metadata to optimize queries.
 
+## Spatial Partitioning for GeoParquet
+
+When building a GeoParquet data lake, spatial partitioning can dramatically improve query performance by co-locating spatially nearby records in the same partitions. This makes GeoParquet bbox-based file skipping much more effective because each file's bounding box covers a compact spatial region instead of spanning the entire dataset.
+
+Sedona provides a one-step API, `StructuredAdapter.repartitionBySpatialKey`, that handles spatial partitioning directly on DataFrames. Under the hood, it converts the DataFrame to a SpatialRDD, applies a partitioning scheme such as KDB-Tree, and converts the result back to a DataFrame, all in a single call.
+
+=== "Python"
+
+    ```python
+    from sedona.spark import StructuredAdapter
+    from sedona.spark.core.enums.grid_type import GridType
+
+    df = sedona.read.format("geoparquet").load("/path/to/input")
+
+    # Repartition with an explicit geometry column and partition count
+    partitioned_df = StructuredAdapter.repartitionBySpatialKey(
+        df, GridType.KDBTREE, "geometry", 16
+    )
+
+    # Or auto-detect the geometry column
+    partitioned_df = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE)
+
+    partitioned_df.write.format("geoparquet").save("/path/to/output")
+    ```
+
+=== "Scala"
+
+    ```scala
+    import org.apache.sedona.core.enums.GridType
+    import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter
+
+    val df = sedona.read.format("geoparquet").load("/path/to/input")
+
+    // Repartition with an explicit geometry column and partition count
+    val partitionedDf = StructuredAdapter.repartitionBySpatialKey(df, "geometry", GridType.KDBTREE, 16)
+
+    // Or auto-detect the geometry column
+    // val partitionedDf = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE)
+
+    partitionedDf.write.format("geoparquet").save("/path/to/output")
+    ```
+
+=== "Java"
+
+    ```java
+    import org.apache.sedona.core.enums.GridType;
+    import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter;
+
+    Dataset<Row> df = sedona.read().format("geoparquet").load("/path/to/input");
+
+    // Repartition with an explicit geometry column and partition count
+    Dataset<Row> partitionedDf = StructuredAdapter.repartitionBySpatialKey(df, "geometry", GridType.KDBTREE, 16);
+
+    // Or auto-detect the geometry column
+    // Dataset<Row> partitionedDf = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE);
+
+    partitionedDf.write().format("geoparquet").save("/path/to/output");
+    ```
+
+This approach is more effective than sorting by GeoHash because it uses a KDB-Tree to create balanced spatial partitions that respect the actual data distribution. Each output file covers a compact spatial region, maximizing the effectiveness of GeoParquet's bbox-based file skipping.
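+
+To see the benefit, run a spatial range query against the partitioned output. With compact per-file bounding boxes, Sedona can skip every file whose bbox does not intersect the query window. A minimal sketch in Python (the output path and query window are illustrative):
+
+```python
+df = sedona.read.format("geoparquet").load("/path/to/output")
+df.createOrReplaceTempView("partitioned")
+
+# Only files whose bbox intersects this envelope need to be scanned
+result = sedona.sql("""
+    SELECT *
+    FROM partitioned
+    WHERE ST_Within(geometry, ST_PolygonFromEnvelope(-122.5, 37.7, -122.3, 37.9))
+""")
+```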
+
 ## How Sedona uses GeoParquet bounding box (bbox) metadata with Spark
 
 The bounding box metadata specifies the area covered by geometric shapes in a given file. Suppose you query points in a region not covered by the bounding box for a given file. The engine can skip that entire file when executing the query because it’s known that it does not cover any relevant data.
diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md
index 5031af398d7..94fa5014189 100644
--- a/docs/tutorial/sql.md
+++ b/docs/tutorial/sql.md
@@ -1383,37 +1383,61 @@
 are introduced on purpose to ensure correctness when performing a spatial join;
 however, when using Sedona to prepare a dataset for distribution this is not
 typically desired.
 
-You can use `StructuredAdapter` and the `spatialRDD.spatialPartitioningWithoutDuplicates` function to obtain a Sedona DataFrame that is spatially partitioned without duplicates. This is especially useful for generating balanced GeoParquet files while preserving spatial proximity within files, which is crucial for optimizing filter pushdown performance in GeoParquet files.
+You can use `StructuredAdapter.repartitionBySpatialKey` to spatially partition a DataFrame in one step, without duplicates. This is especially useful for generating balanced GeoParquet files while preserving spatial proximity within each file, which is crucial for effective filter pushdown.
 
 === "Scala"
 
     ```scala
-    spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE)
-    // Specify the desired number of partitions as 10, though the actual number may vary
-    // spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE, 10)
-    var spatialDf = StructuredAdapter.toSpatialPartitionedDf(spatialRDD, sedona)
+    import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter
+    import org.apache.sedona.core.enums.GridType
+
+    // Repartition using KDB-Tree with an auto-detected geometry column
+    var spatialDf = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE)
+
+    // Or specify a geometry column and the desired number of partitions
+    // spatialDf = StructuredAdapter.repartitionBySpatialKey(df, "geometry", GridType.KDBTREE, 10)
+
+    // Write to GeoParquet
+    spatialDf.write.format("geoparquet").save("/path/to/output")
     ```
 
 === "Java"
 
    ```java
-    spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE)
-    // Specify the desired number of partitions as 10, though the actual number may vary
-    // spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE, 10)
-    Dataset spatialDf = StructuredAdapter.toSpatialPartitionedDf(spatialRDD, sedona)
+    import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter;
+    import org.apache.sedona.core.enums.GridType;
+
+    // Repartition using KDB-Tree with an auto-detected geometry column
+    Dataset<Row> spatialDf = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE);
+
+    // Or specify a geometry column and the desired number of partitions
+    // spatialDf = StructuredAdapter.repartitionBySpatialKey(df, "geometry", GridType.KDBTREE, 10);
+
+    // Write to GeoParquet
+    spatialDf.write().format("geoparquet").save("/path/to/output");
    ```
 
 === "Python"
 
    ```python
    from sedona.spark import StructuredAdapter
+    from sedona.spark.core.enums.grid_type import GridType
+
+    # Repartition using KDB-Tree with an auto-detected geometry column
+    spatial_df = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE)
+
+    # Or specify a geometry column and the desired number of partitions
+    spatial_df = StructuredAdapter.repartitionBySpatialKey(
+        df, GridType.KDBTREE, geometryFieldName="geometry", numPartitions=10
+    )
 
-    spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE)
-    # Specify the desired number of partitions as 10, though the actual number may vary
-    # spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE, 10)
-    spatialDf = StructuredAdapter.toSpatialPartitionedDf(spatialRDD, sedona)
+    # Write to GeoParquet
+    spatial_df.write.format("geoparquet").save("/path/to/output")
    ```
+
+!!!note
+    You can also achieve spatial partitioning manually using the lower-level API: see the `spatialPartitioningWithoutDuplicates` method on `SpatialRDD` and `StructuredAdapter.toSpatialPartitionedDf` for the step-by-step approach, sketched below.
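+
+A minimal sketch of that lower-level flow in Python (assuming, as in the examples above, that `sedona` is the SparkSession and the geometry column is named `geometry`; the partition count is illustrative):
+
+```python
+# Convert to a SpatialRDD, partition without duplicates, and convert back
+spatial_rdd = StructuredAdapter.toSpatialRdd(df, "geometry")
+spatial_rdd.analyze()
+spatial_rdd.spatialPartitioningWithoutDuplicates(GridType.KDBTREE, 10)
+spatial_df = StructuredAdapter.toSpatialPartitionedDf(spatial_rdd, sedona)
+```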
 
 ### SpatialPairRDD to DataFrame
 
 PairRDD is the result of a spatial join query or distance join query. SedonaSQL DataFrame-RDD Adapter can convert the result to a DataFrame. But you need to provide the schema of the left and right RDDs.
diff --git a/python/sedona/spark/utils/structured_adapter.py b/python/sedona/spark/utils/structured_adapter.py
index d1d5e9bd292..633ef76398a 100644
--- a/python/sedona/spark/utils/structured_adapter.py
+++ b/python/sedona/spark/utils/structured_adapter.py
@@ -18,6 +18,7 @@
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.types import StructType
 
+from sedona.spark.core.enums.grid_type import GridType
 from sedona.spark.core.SpatialRDD.spatial_rdd import SpatialRDD
 from sedona.spark.core.spatialOperator.rdd import SedonaPairRDD
 
@@ -124,3 +125,50 @@ def pairRddToDf(
         )
         df = StructuredAdapter._create_dataframe(jdf, sparkSession)
         return df
+
+    @classmethod
+    def repartitionBySpatialKey(
+        cls,
+        dataFrame: DataFrame,
+        gridType: GridType = GridType.KDBTREE,
+        geometryFieldName: str = None,
+        numPartitions: int = 0,
+    ) -> DataFrame:
+        """
+        Repartition a DataFrame using a spatial partitioning scheme (e.g., KDB-Tree).
+        This is a convenience method that wraps the multi-step process of converting a
+        DataFrame to a SpatialRDD, applying spatial partitioning without duplicates,
+        and converting back to a DataFrame.
+
+        Example usage::
+
+            partitioned_df = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE, "geometry", 16)
+            partitioned_df.write.format("geoparquet").save("/path/to/output")
+
+        Args:
+            dataFrame: The input DataFrame containing a geometry column.
+            gridType: The spatial partitioning grid type (default: GridType.KDBTREE).
+            geometryFieldName: The name of the geometry column. If None, auto-detects it.
+            numPartitions: The target number of partitions. If 0, uses the current number.
+
+        Returns:
+            A spatially partitioned DataFrame.
+ """ + sc = dataFrame._sc + jvm = sc._jvm + sparkSession = dataFrame.sparkSession + + jgrid_type = jvm.org.apache.sedona.core.enums.GridType.getGridType( + gridType.value + ) + + if geometryFieldName is not None: + jdf = jvm.StructuredAdapter.repartitionBySpatialKey( + dataFrame._jdf, geometryFieldName, jgrid_type, numPartitions + ) + else: + jdf = jvm.StructuredAdapter.repartitionBySpatialKey( + dataFrame._jdf, jgrid_type, numPartitions + ) + + return StructuredAdapter._create_dataframe(jdf, sparkSession) diff --git a/python/tests/sql/test_structured_adapter.py b/python/tests/sql/test_structured_adapter.py index 640540ca344..76a922dfc6b 100644 --- a/python/tests/sql/test_structured_adapter.py +++ b/python/tests/sql/test_structured_adapter.py @@ -210,6 +210,25 @@ def test_build_index_and_range_query_with_mixed_geometries(self): result_count = query_result.count() assert result_count > 0, f"Expected at least one result, got {result_count}" + def test_repartition_by_spatial_key(self): + xys = [(i, i // 100, i % 100) for i in range(1_000)] + df = self.spark.createDataFrame(xys, ["id", "x", "y"]).selectExpr( + "id", "ST_Point(x, y) AS geom" + ) + partitioned_df = StructuredAdapter.repartitionBySpatialKey( + df, GridType.KDBTREE, "geom", 4 + ) + assert partitioned_df.count() == 1_000 + assert partitioned_df.rdd.getNumPartitions() >= 4 + + def test_repartition_by_spatial_key_auto_detect(self): + xys = [(i, i // 100, i % 100) for i in range(1_000)] + df = self.spark.createDataFrame(xys, ["id", "x", "y"]).selectExpr( + "id", "ST_Point(x, y) AS geom" + ) + partitioned_df = StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE) + assert partitioned_df.count() == 1_000 + def test_toDf_preserves_columns_with_proper_types(self): # Create a spatial DataFrame with various columns and types data = [ diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala index 4046f38fe97..cebca2e076e 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala @@ -18,6 +18,7 @@ */ package org.apache.spark.sql.sedona_sql.adapters +import org.apache.sedona.core.enums.GridType import org.apache.sedona.core.spatialPartitioning.GenericUniquePartitioner import org.apache.sedona.core.spatialRDD.SpatialRDD import org.apache.sedona.sql.utils.GeometrySerializer @@ -245,4 +246,80 @@ object StructuredAdapter { originalRightSpatialRdd.schema, sparkSession) } + + /** + * Repartition a DataFrame using a spatial partitioning scheme (e.g., KDB-Tree). This is a + * convenience method that wraps the multi-step process of converting a DataFrame to a + * SpatialRDD, applying spatial partitioning without duplicates, and converting back to a + * DataFrame. + * + * Example usage: + * {{{ + * val partitionedDf = StructuredAdapter.repartitionBySpatialKey(df, "geometry", GridType.KDBTREE, 16) + * partitionedDf.write.format("geoparquet").save("/path/to/output") + * }}} + * + * @param dataFrame + * The input DataFrame containing a geometry column. + * @param geometryFieldName + * The name of the geometry column. + * @param gridType + * The spatial partitioning grid type (e.g., GridType.KDBTREE). + * @param numPartitions + * The target number of partitions. If 0, defaults to the current number of partitions. 
+   * @return
+   *   A spatially partitioned DataFrame.
+   */
+  def repartitionBySpatialKey(
+      dataFrame: DataFrame,
+      geometryFieldName: String,
+      gridType: GridType,
+      numPartitions: Int = 0): DataFrame = {
+    val spatialRDD = toSpatialRdd(dataFrame, geometryFieldName)
+    spatialRDD.analyze()
+    val partCount =
+      if (numPartitions > 0) numPartitions
+      else dataFrame.rdd.getNumPartitions
+    spatialRDD.spatialPartitioningWithoutDuplicates(gridType, partCount)
+    toSpatialPartitionedDf(spatialRDD, dataFrame.sparkSession)
+  }
+
+  /**
+   * Repartition a DataFrame using a spatial partitioning scheme (e.g., KDB-Tree). Auto-detects
+   * the geometry column.
+   *
+   * @param dataFrame
+   *   The input DataFrame containing a geometry column.
+   * @param gridType
+   *   The spatial partitioning grid type (e.g., GridType.KDBTREE).
+   * @param numPartitions
+   *   The target number of partitions. If 0, defaults to the current number of partitions.
+   * @return
+   *   A spatially partitioned DataFrame.
+   */
+  def repartitionBySpatialKey(
+      dataFrame: DataFrame,
+      gridType: GridType,
+      numPartitions: Int): DataFrame = {
+    repartitionBySpatialKey(
+      dataFrame,
+      DfUtils.getGeometryColumnName(dataFrame.schema),
+      gridType,
+      numPartitions)
+  }
+
+  /**
+   * Repartition a DataFrame using a spatial partitioning scheme (e.g., KDB-Tree). Auto-detects
+   * the geometry column and uses the current number of partitions.
+   *
+   * @param dataFrame
+   *   The input DataFrame containing a geometry column.
+   * @param gridType
+   *   The spatial partitioning grid type (e.g., GridType.KDBTREE).
+   * @return
+   *   A spatially partitioned DataFrame.
+   */
+  def repartitionBySpatialKey(dataFrame: DataFrame, gridType: GridType): DataFrame = {
+    repartitionBySpatialKey(dataFrame, gridType, 0)
+  }
 }
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala b/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
index d258ce3b403..54c76e7d44b 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
@@ -126,5 +126,22 @@
       val dfPartitions: Long = partitionedDF.select(spark_partition_id).distinct().count()
       assert(dfPartitions == numSpatialPartitions)
     }
+
+    it("Should repartition by spatial key in one step") {
+      val seq = generateTestData()
+      val dfOrigin = sparkSession.createDataFrame(seq)
+      val partitionedDf =
+        StructuredAdapter.repartitionBySpatialKey(dfOrigin, "_3", GridType.KDBTREE, 4)
+      assertEquals(seq.size, partitionedDf.count())
+      assert(partitionedDf.rdd.getNumPartitions >= 4)
+    }
+
+    it("Should repartition by spatial key with auto-detected geometry column") {
+      val seq = generateTestData()
+      val dfOrigin = sparkSession.createDataFrame(seq)
+      val partitionedDf =
+        StructuredAdapter.repartitionBySpatialKey(dfOrigin, GridType.KDBTREE)
+      assertEquals(seq.size, partitionedDf.count())
+    }
   }
 }