@@ -18,8 +18,11 @@
*/
package org.apache.sedona.sql.datasources.shapefile

import org.apache.hadoop.fs.Path
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.connector.read.PartitionReaderFactory
@@ -28,14 +31,19 @@ import org.apache.spark.sql.execution.datasources.v2.PartitionReaderWithPartitionValues
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration

import java.util.Locale

case class ShapefilePartitionReaderFactory(
sqlConf: SQLConf,
broadcastedConf: Broadcast[SerializableConfiguration],
dataSchema: StructType,
readDataSchema: StructType,
partitionSchema: StructType,
/** The metadata fields requested by the query (e.g., fields from `_metadata`). */
metadataSchema: StructType,
options: ShapefileReadOptions,
filters: Seq[Filter])
extends PartitionReaderFactory {
@@ -48,11 +56,51 @@ case class ShapefilePartitionReaderFactory(
partitionedFiles,
readDataSchema,
options)
new PartitionReaderWithPartitionValues(
val withPartitionValues = new PartitionReaderWithPartitionValues(
fileReader,
readDataSchema,
partitionSchema,
partitionedFiles.head.partitionValues)

if (metadataSchema.nonEmpty) {
// Build metadata values from the .shp file's partition information.
// We use the .shp file because it is the primary shapefile component and its path
// is what users would expect to see in _metadata.file_path / _metadata.file_name.
val shpFile = partitionedFiles
.find(_.filePath.toPath.getName.toLowerCase(Locale.ROOT).endsWith(".shp"))
.getOrElse(partitionedFiles.head)
val filePath = shpFile.filePath.toString
val fileName = new Path(filePath).getName

// Complete map of all metadata field values keyed by field name.
// The modificationTime from PartitionedFile is in milliseconds but Spark's
// TimestampType uses microseconds, so we multiply by 1000.
val allMetadataValues: Map[String, Any] = Map(
"file_path" -> UTF8String.fromString(filePath),
"file_name" -> UTF8String.fromString(fileName),
"file_size" -> shpFile.fileSize,
"file_block_start" -> shpFile.start,
"file_block_length" -> shpFile.length,
"file_modification_time" -> (shpFile.modificationTime * 1000L))

// The metadataSchema may be pruned by Spark's column pruning (e.g., when the query
// only selects `_metadata.file_name`). We must construct the inner struct to match
// the pruned schema exactly, otherwise field ordinals will be misaligned.
val innerStructType = metadataSchema.fields.head.dataType.asInstanceOf[StructType]
val prunedValues = innerStructType.fields.map(f => allMetadataValues(f.name))
val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq)

// Wrap the struct in an outer row since _metadata is a single StructType column
val metadataRow = InternalRow.fromSeq(Seq(metadataStruct))
val baseSchema = StructType(readDataSchema.fields ++ partitionSchema.fields)
new PartitionReaderWithMetadata(
withPartitionValues,
baseSchema,
metadataSchema,
metadataRow)
} else {
withPartitionValues
}
}

override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
@@ -64,3 +112,43 @@ case class ShapefilePartitionReaderFactory(
}
}
}

/**
* Wraps a partition reader to append metadata column values to each row. This follows the same
* pattern as [[PartitionReaderWithPartitionValues]] but for metadata columns: it uses a
* [[JoinedRow]] to concatenate the base row (data + partition values) with the metadata row, then
* projects the combined row through an
* [[org.apache.spark.sql.catalyst.expressions.UnsafeProjection]] to produce a compact unsafe row.
*
* @param reader
* the underlying reader that produces data + partition value rows
* @param baseSchema
* the combined schema of data columns and partition columns
* @param metadataSchema
* the schema of the metadata columns being appended
* @param metadataValues
* the constant metadata values to append to every row
*/
private[shapefile] class PartitionReaderWithMetadata(
reader: PartitionReader[InternalRow],
baseSchema: StructType,
metadataSchema: StructType,
metadataValues: InternalRow)
extends PartitionReader[InternalRow] {

private val joinedRow = new JoinedRow()
private val unsafeProjection =
GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map { case (f, i) =>
BoundReference(i, f.dataType, f.nullable)
} ++ metadataSchema.fields.zipWithIndex.map { case (f, i) =>
BoundReference(baseSchema.length + i, f.dataType, f.nullable)
})

override def next(): Boolean = reader.next()

override def get(): InternalRow = {
unsafeProjection(joinedRow(reader.get(), metadataValues))
}

override def close(): Unit = reader.close()
}
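
To illustrate the pattern used by `PartitionReaderWithMetadata` above, here is a minimal, self-contained sketch (with hypothetical schemas and values) of how a `JoinedRow` concatenates a data row with a constant metadata row, and how a generated `UnsafeProjection` over `BoundReference` ordinals flattens the result into one compact row:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

object JoinedRowSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical base schema (data + partition columns) and metadata schema.
    val baseSchema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("name", StringType, nullable = true)))
    val metadataSchema = StructType(Seq(
      StructField("file_name", StringType, nullable = false)))

    // Bind ordinals over the concatenated (base ++ metadata) layout, as the reader
    // factory does: metadata ordinals start at baseSchema.length.
    val projection = GenerateUnsafeProjection.generate(
      baseSchema.fields.zipWithIndex.map { case (f, i) =>
        BoundReference(i, f.dataType, f.nullable)
      } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) =>
        BoundReference(baseSchema.length + i, f.dataType, f.nullable)
      })

    val dataRow = InternalRow.fromSeq(Seq(42L, UTF8String.fromString("foo")))
    val metadataRow = InternalRow.fromSeq(Seq(UTF8String.fromString("file.shp")))

    // JoinedRow presents both rows as one without copying; the projection produces
    // an UnsafeRow with columns [id, name, file_name].
    val unsafeRow = projection(new JoinedRow(dataRow, metadataRow))
    println(unsafeRow.getLong(0))        // 42
    println(unsafeRow.getUTF8String(2))  // file.shp
  }
}
```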
@@ -44,12 +44,22 @@ case class ShapefileScan(
dataSchema: StructType,
readDataSchema: StructType,
readPartitionSchema: StructType,
/** The metadata fields requested by the query (e.g., fields from `_metadata`). */
metadataSchema: StructType,
options: CaseInsensitiveStringMap,
pushedFilters: Array[Filter],
partitionFilters: Seq[Expression] = Seq.empty,
dataFilters: Seq[Expression] = Seq.empty)
extends FileScan {

/**
* Returns the complete read schema including data columns, partition columns, and any requested
* metadata columns. Metadata columns are appended last so the reader factory can construct a
* [[JoinedRow]] that appends metadata values after data and partition values.
*/
override def readSchema(): StructType =
StructType(readDataSchema.fields ++ readPartitionSchema.fields ++ metadataSchema.fields)

override def createReaderFactory(): PartitionReaderFactory = {
val caseSensitiveMap = options.asScala.toMap
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
Expand All @@ -61,6 +71,7 @@ case class ShapefileScan(
dataSchema,
readDataSchema,
readPartitionSchema,
metadataSchema,
ShapefileReadOptions.parse(options),
pushedFilters)
}
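As a quick illustration of that ordering contract (the pruned schemas below are hypothetical), the concatenation in `readSchema()` always places the `_metadata` struct last, which is the layout the reader factory's bound ordinals assume:

```scala
import org.apache.spark.sql.types._

// Hypothetical pruned schemas for one scan.
val readDataSchema = StructType(Seq(
  StructField("geometry", BinaryType),
  StructField("name", StringType)))
val readPartitionSchema = StructType(Seq(StructField("year", IntegerType)))
val metadataSchema = StructType(Seq(
  StructField("_metadata", StructType(Seq(
    StructField("file_name", StringType, nullable = false))), nullable = false)))

// Same concatenation as readSchema(): data columns, then partition columns, then _metadata.
val readSchema = StructType(
  readDataSchema.fields ++ readPartitionSchema.fields ++ metadataSchema.fields)

println(readSchema.fieldNames.mkString(", "))  // geometry, name, year, _metadata
```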
@@ -33,13 +33,36 @@ case class ShapefileScanBuilder(
options: CaseInsensitiveStringMap)
extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {

/**
* Tracks any metadata fields (e.g., from `_metadata`) requested in the query. Populated by
* [[pruneColumns]] when Spark pushes down column projections.
*/
private var _requiredMetadataSchema: StructType = StructType(Seq.empty)

/**
* Intercepts Spark's column pruning to separate metadata columns from data/partition columns.
* Fields in [[requiredSchema]] that do not belong to the data schema or partition schema are
* assumed to be metadata fields (e.g., `_metadata`). These are captured in
* [[_requiredMetadataSchema]] so the scan can include them in the output.
*/
override def pruneColumns(requiredSchema: StructType): Unit = {
val resolver = sparkSession.sessionState.conf.resolver
val metaFields = requiredSchema.fields.filter { field =>
!dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
!fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name, field.name))
}
_requiredMetadataSchema = StructType(metaFields)
super.pruneColumns(requiredSchema)
}

override def build(): Scan = {
ShapefileScan(
sparkSession,
fileIndex,
dataSchema,
readDataSchema(),
readPartitionSchema(),
_requiredMetadataSchema,
options,
pushedDataFilters,
partitionFilters,
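A small sketch (hypothetical schemas; the real resolver comes from `sparkSession.sessionState.conf.resolver`) of the classification `pruneColumns` performs: any required field that resolves to neither a data column nor a partition column is treated as a metadata field.

```scala
import org.apache.spark.sql.types._

// Case-insensitive resolver, standing in for the session's configured resolver.
val resolver: (String, String) => Boolean = _.equalsIgnoreCase(_)

// Hypothetical schemas.
val dataSchema = StructType(Seq(
  StructField("geometry", BinaryType),
  StructField("name", StringType)))
val partitionSchema = StructType(Seq(StructField("year", IntegerType)))
val requiredSchema = StructType(Seq(
  StructField("geometry", BinaryType),
  StructField("_metadata", StructType(Seq(StructField("file_name", StringType))))))

// Fields that belong to neither the data schema nor the partition schema are metadata fields.
val metaFields = requiredSchema.fields.filter { field =>
  !dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
  !partitionSchema.fields.exists(pf => resolver(pf.name, field.name))
}

println(metaFields.map(_.name).mkString(", "))  // _metadata
```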
@@ -21,27 +21,36 @@ package org.apache.sedona.sql.datasources.shapefile
import org.apache.hadoop.fs.FileStatus
import org.apache.sedona.core.formatMapper.shapefileParser.parseUtils.dbf.DbfParseUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.TableCapability
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, fieldDescriptorsToSchema, mergeSchemas}
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration

import java.util.Locale
import scala.collection.JavaConverters._

/**
* A Spark DataSource V2 table implementation for reading Shapefiles.
*
* Extends [[FileTable]] to leverage Spark's file-based scan infrastructure and implements
* [[SupportsMetadataColumns]] to expose hidden metadata columns (e.g., `_metadata`) that provide
* file-level information such as path, name, size, and modification time. These metadata columns
* are not part of the user-visible schema but can be explicitly selected in queries.
*/
case class ShapefileTable(
name: String,
sparkSession: SparkSession,
options: CaseInsensitiveStringMap,
paths: Seq[String],
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
with SupportsMetadataColumns {

override def formatName: String = "Shapefile"

@@ -95,9 +104,49 @@ case class ShapefileTable(
}
}

/** Returns the metadata columns that this table exposes as hidden columns. */
override def metadataColumns(): Array[MetadataColumn] = ShapefileTable.fileMetadataColumns

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
ShapefileScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
}

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = null
}

object ShapefileTable {

/**
* Schema of the `_metadata` struct column exposed by [[SupportsMetadataColumns]]. Each field
* provides file-level information about the source shapefile:
*
* - `file_path`: The fully qualified path of the `.shp` file (e.g.,
* `hdfs://host/data/file.shp`).
* - `file_name`: The name of the `.shp` file without directory components (e.g., `file.shp`).
* - `file_size`: The total size of the `.shp` file in bytes.
* - `file_block_start`: The byte offset within the file where this partition's data begins.
* For non-splittable formats this is typically 0.
* - `file_block_length`: The number of bytes in this partition's data block. For
* non-splittable formats this equals the file size.
* - `file_modification_time`: The last modification timestamp of the `.shp` file.
*/
private val FILE_METADATA_STRUCT_TYPE: StructType = StructType(
Seq(
StructField("file_path", StringType, nullable = false),
StructField("file_name", StringType, nullable = false),
StructField("file_size", LongType, nullable = false),
StructField("file_block_start", LongType, nullable = false),
StructField("file_block_length", LongType, nullable = false),
StructField("file_modification_time", TimestampType, nullable = false)))

/**
* The single metadata column `_metadata` exposed to Spark's catalog. This hidden column can be
* selected in queries (e.g., `SELECT _metadata.file_name FROM shapefile.`...``) but does not
* appear in `SELECT *`.
*/
private[shapefile] val fileMetadataColumns: Array[MetadataColumn] = Array(new MetadataColumn {
override def name: String = "_metadata"
override def dataType: DataType = FILE_METADATA_STRUCT_TYPE
override def isNullable: Boolean = false
})
}
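
To show how the hidden column surfaces to users, here is a usage sketch (the input path and the local SparkSession setup are assumptions for illustration, and the Sedona jar is assumed to be on the classpath): `_metadata` does not appear in `SELECT *` or `printSchema()`, but its fields can be selected explicitly once the table reports `SupportsMetadataColumns`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("shapefile-metadata-sketch")
  .getOrCreate()

// Hypothetical input directory containing .shp/.dbf/.shx components.
val df = spark.read.format("shapefile").load("/data/shapefiles")

// The hidden _metadata column is not listed by df.printSchema(), but its fields
// can be selected explicitly.
df.select(
    col("_metadata.file_path"),
    col("_metadata.file_name"),
    col("_metadata.file_size"),
    col("_metadata.file_modification_time"))
  .show(truncate = false)

// Equivalent SQL, mirroring the scaladoc above:
// SELECT _metadata.file_name FROM shapefile.`/data/shapefiles`

spark.stop()
```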