From d0e6f6f94fc08dea8e4c3df1ba21da7037d764fa Mon Sep 17 00:00:00 2001 From: Jia Yu Date: Sat, 14 Feb 2026 23:23:50 -0800 Subject: [PATCH] [GH-2651] Add _metadata hidden column support for GeoPackage DataSource V2 reader Implement SupportsMetadataColumns on GeoPackageTable so that reading GeoPackage files into a DataFrame exposes the standard _metadata hidden struct containing file_path, file_name, file_size, file_block_start, file_block_length, and file_modification_time. Changes across all four Spark version modules (3.4, 3.5, 4.0, 4.1): - GeoPackageTable: mix in SupportsMetadataColumns, define the _metadata MetadataColumn with the standard six-field struct type - GeoPackageScanBuilder: override pruneColumns() to capture the pruned metadata schema requested by Spark's column pruning optimizer - GeoPackageScan: accept metadataSchema, override readSchema() to append metadata fields, pass the schema to the partition reader factory - GeoPackagePartitionReaderFactory: construct metadata values from the PartitionedFile, wrap the base reader in PartitionReaderWithMetadata that joins each data row with the constant per-file metadata row - GeoPackageReaderTest: add tests that validate the _metadata values against the filesystem, filtering, and projection. --- .../GeoPackagePartitionReaderFactory.scala | 55 +++++++++++++- .../geopackage/GeoPackageScan.scala | 11 ++- .../geopackage/GeoPackageScanBuilder.scala | 13 ++++ .../geopackage/GeoPackageTable.scala | 26 ++++++- .../sedona/sql/GeoPackageReaderTest.scala | 74 ++++++++++++++++++- .../GeoPackagePartitionReaderFactory.scala | 55 +++++++++++++- .../geopackage/GeoPackageScan.scala | 11 ++- .../geopackage/GeoPackageScanBuilder.scala | 13 ++++ .../geopackage/GeoPackageTable.scala | 25 ++++++- .../sedona/sql/GeoPackageReaderTest.scala | 74 ++++++++++++++++++- .../GeoPackagePartitionReaderFactory.scala | 55 +++++++++++++- .../geopackage/GeoPackageScan.scala | 11 ++- .../geopackage/GeoPackageScanBuilder.scala | 13 ++++ .../geopackage/GeoPackageTable.scala | 25 ++++++- .../sedona/sql/GeoPackageReaderTest.scala | 74 ++++++++++++++++++- .../GeoPackagePartitionReaderFactory.scala | 55 +++++++++++++- .../geopackage/GeoPackageScan.scala | 11 ++- .../geopackage/GeoPackageScanBuilder.scala | 13 ++++ .../geopackage/GeoPackageTable.scala | 25 ++++++- .../sedona/sql/GeoPackageReaderTest.scala | 74 ++++++++++++++++++- 20 files changed, 689 insertions(+), 24 deletions(-) diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala index b1d38996b04..0f2e9a87b81 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala @@ -25,16 +25,20 @@ import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Ge import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.execution.datasources.FilePartition import 
org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.SerializableConfiguration case class GeoPackagePartitionReaderFactory( sparkSession: SparkSession, broadcastedConf: Broadcast[SerializableConfiguration], loadOptions: GeoPackageOptions, - dataSchema: StructType) + dataSchema: StructType, + metadataSchema: StructType) extends PartitionReaderFactory { override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { @@ -72,7 +76,7 @@ case class GeoPackagePartitionReaderFactory( case _ => None } - GeoPackagePartitionReader( + val baseReader = GeoPackagePartitionReader( rs = rs, options = GeoPackageReadOptions( tableName = loadOptions.tableName, @@ -84,5 +88,52 @@ case class GeoPackagePartitionReaderFactory( broadcastedConf = broadcastedConf, currentTempFile = tempFile, copying = copied) + + if (metadataSchema.nonEmpty) { + val gpkgFile = partitionFiles.head + val filePath = gpkgFile.filePath.toString + val fileName = new Path(filePath).getName + + val allMetadataValues: Map[String, Any] = Map( + "file_path" -> UTF8String.fromString(filePath), + "file_name" -> UTF8String.fromString(fileName), + "file_size" -> gpkgFile.fileSize, + "file_block_start" -> gpkgFile.start, + "file_block_length" -> gpkgFile.length, + "file_modification_time" -> (gpkgFile.modificationTime * 1000L)) + + val innerStructType = metadataSchema.fields.head.dataType.asInstanceOf[StructType] + val prunedValues = innerStructType.fields.map(f => allMetadataValues(f.name)) + val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq) + val metadataRow = InternalRow.fromSeq(Seq(metadataStruct)) + + new PartitionReaderWithMetadata(baseReader, dataSchema, metadataSchema, metadataRow) + } else { + baseReader + } } } + +private[geopackage] class PartitionReaderWithMetadata( + reader: PartitionReader[InternalRow], + baseSchema: StructType, + metadataSchema: StructType, + metadataValues: InternalRow) + extends PartitionReader[InternalRow] { + + private val joinedRow = new JoinedRow() + private val unsafeProjection = + GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map { case (f, i) => + BoundReference(i, f.dataType, f.nullable) + } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) => + BoundReference(baseSchema.length + i, f.dataType, f.nullable) + }) + + override def next(): Boolean = reader.next() + + override def get(): InternalRow = { + unsafeProjection(joinedRow(reader.get(), metadataValues)) + } + + override def close(): Unit = reader.close() +} diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala index 768afd79173..edca3d35ff1 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala @@ -36,10 +36,14 @@ case class GeoPackageScan( fileIndex: PartitioningAwareFileIndex, readDataSchema: StructType, readPartitionSchema: StructType, + metadataSchema: StructType, options: CaseInsensitiveStringMap, loadOptions: GeoPackageOptions) extends FileScan { + override def readSchema(): StructType = + StructType(readDataSchema.fields ++ readPartitionSchema.fields ++ metadataSchema.fields) + override def partitionFilters: Seq[Expression] = { Seq.empty } @@ -54,6 +58,11 @@ case class GeoPackageScan( val broadcastedConf = 
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, loadOptions, dataSchema) + GeoPackagePartitionReaderFactory( + sparkSession, + broadcastedConf, + loadOptions, + dataSchema, + metadataSchema) } } diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala index a9674395b41..d363406de19 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala @@ -36,6 +36,18 @@ class GeoPackageScanBuilder( userDefinedSchema: Option[StructType] = None) extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { + private var _requiredMetadataSchema: StructType = StructType(Seq.empty) + + override def pruneColumns(requiredSchema: StructType): Unit = { + val resolver = sparkSession.sessionState.conf.resolver + val metaFields = requiredSchema.fields.filter { field => + !dataSchema.fields.exists(df => resolver(df.name, field.name)) && + !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name, field.name)) + } + _requiredMetadataSchema = StructType(metaFields) + super.pruneColumns(requiredSchema) + } + override def build(): Scan = { val paths = fileIndex.allFiles().map(_.getPath.toString) @@ -54,6 +66,7 @@ class GeoPackageScanBuilder( fileIndexAdjusted, dataSchema, readPartitionSchema(), + _requiredMetadataSchema, options, loadOptions) } diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala index 999aa812801..498933de30b 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala @@ -22,11 +22,12 @@ import org.apache.hadoop.fs.FileStatus import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager} import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, MetadataSchema, TableType} import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns} import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileTable -import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.SerializableConfiguration @@ -40,7 +41,8 @@ case class GeoPackageTable( userSpecifiedSchema: Option[StructType], fallbackFileFormat: Class[_ <: FileFormat], loadOptions: GeoPackageOptions) - extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + extends FileTable(sparkSession, options, paths, userSpecifiedSchema) + with SupportsMetadataColumns { override def inferSchema(files: Seq[FileStatus]): 
Option[StructType] = { if (loadOptions.showMetadata) { @@ -74,6 +76,8 @@ case class GeoPackageTable( "GeoPackage" } + override def metadataColumns(): Array[MetadataColumn] = GeoPackageTable.fileMetadataColumns + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new GeoPackageScanBuilder( sparkSession, @@ -88,3 +92,21 @@ case class GeoPackageTable( null } } + +object GeoPackageTable { + + private val FILE_METADATA_STRUCT_TYPE: StructType = StructType( + Seq( + StructField("file_path", StringType, nullable = false), + StructField("file_name", StringType, nullable = false), + StructField("file_size", LongType, nullable = false), + StructField("file_block_start", LongType, nullable = false), + StructField("file_block_length", LongType, nullable = false), + StructField("file_modification_time", TimestampType, nullable = false))) + + private[geopackage] val fileMetadataColumns: Array[MetadataColumn] = Array(new MetadataColumn { + override def name: String = "_metadata" + override def dataType: DataType = FILE_METADATA_STRUCT_TYPE + override def isNullable: Boolean = false + }) +} diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala index 166d8c48dba..0443553a863 100644 --- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala +++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala @@ -19,7 +19,7 @@ package org.apache.sedona.sql import io.minio.{MakeBucketArgs, MinioClient} -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.functions.expr import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT import org.apache.spark.sql.types._ @@ -321,6 +321,78 @@ class GeoPackageReaderTest extends TestBaseScala with Matchers { } } + describe("_metadata hidden column support") { + it("should expose _metadata struct with all expected fields") { + val df = readFeatureData("point1") + val metaDf = df.select("_metadata") + val metaSchema = metaDf.schema.fields.head.dataType.asInstanceOf[StructType] + val fieldNames = metaSchema.fieldNames.toSet + fieldNames should contain("file_path") + fieldNames should contain("file_name") + fieldNames should contain("file_size") + fieldNames should contain("file_block_start") + fieldNames should contain("file_block_length") + fieldNames should contain("file_modification_time") + } + + it("should not include _metadata in select(*)") { + val df = readFeatureData("point1") + val starCols = df.select("*").columns.toSet + starCols should not contain "_metadata" + } + + it("should return correct file_path and file_name in _metadata") { + val df = readFeatureData("point1") + val row = df.select("_metadata.file_path", "_metadata.file_name").head() + val filePath = row.getString(0) + val fileName = row.getString(1) + filePath should endWith("example.gpkg") + fileName shouldEqual "example.gpkg" + } + + it("should return actual file_size matching the .gpkg file on disk") { + val df = readFeatureData("point1") + val metaFileSize = df.select("_metadata.file_size").head().getLong(0) + val actualFile = new java.io.File(path) + metaFileSize shouldEqual actualFile.length() + } + + it("should return file_block_start=0 and file_block_length=file_size") { + val df = readFeatureData("point1") + val row = df + .select( + "_metadata.file_block_start", + "_metadata.file_block_length", + "_metadata.file_size") + .head() + row.getLong(0) 
shouldEqual 0L + row.getLong(1) shouldEqual row.getLong(2) + } + + it("should return file_modification_time matching the .gpkg file on disk") { + val df = readFeatureData("point1") + val metaModTime = df.select("_metadata.file_modification_time").head().getTimestamp(0) + val actualFile = new java.io.File(path) + val expectedModTime = new java.sql.Timestamp(actualFile.lastModified()) + metaModTime shouldEqual expectedModTime + } + + it("should allow filtering on _metadata fields") { + val df = readFeatureData("point1") + val filtered = df.filter(df("_metadata.file_name") === "example.gpkg") + filtered.count() shouldEqual df.count() + val empty = df.filter(df("_metadata.file_name") === "nonexistent.gpkg") + empty.count() shouldEqual 0 + } + + it("should select _metadata along with data columns") { + val df = readFeatureData("point1") + val result = df.select("id", "_metadata.file_name").head() + result.getInt(0) shouldEqual 1 + result.getString(1) shouldEqual "example.gpkg" + } + } + private def readFeatureData(tableName: String): DataFrame = { sparkSession.read .format("geopackage") diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala index b1d38996b04..0f2e9a87b81 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala @@ -25,16 +25,20 @@ import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Ge import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.SerializableConfiguration case class GeoPackagePartitionReaderFactory( sparkSession: SparkSession, broadcastedConf: Broadcast[SerializableConfiguration], loadOptions: GeoPackageOptions, - dataSchema: StructType) + dataSchema: StructType, + metadataSchema: StructType) extends PartitionReaderFactory { override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { @@ -72,7 +76,7 @@ case class GeoPackagePartitionReaderFactory( case _ => None } - GeoPackagePartitionReader( + val baseReader = GeoPackagePartitionReader( rs = rs, options = GeoPackageReadOptions( tableName = loadOptions.tableName, @@ -84,5 +88,52 @@ case class GeoPackagePartitionReaderFactory( broadcastedConf = broadcastedConf, currentTempFile = tempFile, copying = copied) + + if (metadataSchema.nonEmpty) { + val gpkgFile = partitionFiles.head + val filePath = gpkgFile.filePath.toString + val fileName = new Path(filePath).getName + + val allMetadataValues: Map[String, Any] = Map( + "file_path" -> UTF8String.fromString(filePath), + "file_name" -> UTF8String.fromString(fileName), + "file_size" -> gpkgFile.fileSize, + "file_block_start" -> gpkgFile.start, + "file_block_length" -> gpkgFile.length, + "file_modification_time" -> 
(gpkgFile.modificationTime * 1000L)) + + val innerStructType = metadataSchema.fields.head.dataType.asInstanceOf[StructType] + val prunedValues = innerStructType.fields.map(f => allMetadataValues(f.name)) + val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq) + val metadataRow = InternalRow.fromSeq(Seq(metadataStruct)) + + new PartitionReaderWithMetadata(baseReader, dataSchema, metadataSchema, metadataRow) + } else { + baseReader + } } } + +private[geopackage] class PartitionReaderWithMetadata( + reader: PartitionReader[InternalRow], + baseSchema: StructType, + metadataSchema: StructType, + metadataValues: InternalRow) + extends PartitionReader[InternalRow] { + + private val joinedRow = new JoinedRow() + private val unsafeProjection = + GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map { case (f, i) => + BoundReference(i, f.dataType, f.nullable) + } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) => + BoundReference(baseSchema.length + i, f.dataType, f.nullable) + }) + + override def next(): Boolean = reader.next() + + override def get(): InternalRow = { + unsafeProjection(joinedRow(reader.get(), metadataValues)) + } + + override def close(): Unit = reader.close() +} diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala index 768afd79173..edca3d35ff1 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala @@ -36,10 +36,14 @@ case class GeoPackageScan( fileIndex: PartitioningAwareFileIndex, readDataSchema: StructType, readPartitionSchema: StructType, + metadataSchema: StructType, options: CaseInsensitiveStringMap, loadOptions: GeoPackageOptions) extends FileScan { + override def readSchema(): StructType = + StructType(readDataSchema.fields ++ readPartitionSchema.fields ++ metadataSchema.fields) + override def partitionFilters: Seq[Expression] = { Seq.empty } @@ -54,6 +58,11 @@ case class GeoPackageScan( val broadcastedConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, loadOptions, dataSchema) + GeoPackagePartitionReaderFactory( + sparkSession, + broadcastedConf, + loadOptions, + dataSchema, + metadataSchema) } } diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala index 829bd9c2201..7fdb716f29f 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala @@ -36,6 +36,18 @@ class GeoPackageScanBuilder( userDefinedSchema: Option[StructType] = None) extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { + private var _requiredMetadataSchema: StructType = StructType(Seq.empty) + + override def pruneColumns(requiredSchema: StructType): Unit = { + val resolver = sparkSession.sessionState.conf.resolver + val metaFields = requiredSchema.fields.filter { field => + !dataSchema.fields.exists(df => resolver(df.name, field.name)) && + !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name, field.name)) + } + 
_requiredMetadataSchema = StructType(metaFields) + super.pruneColumns(requiredSchema) + } + override def build(): Scan = { val fileIndexAdjusted = if (loadOptions.showMetadata) @@ -52,6 +64,7 @@ class GeoPackageScanBuilder( fileIndexAdjusted, dataSchema, readPartitionSchema(), + _requiredMetadataSchema, options, loadOptions) } diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala index 85dec8427e4..498933de30b 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala @@ -22,11 +22,12 @@ import org.apache.hadoop.fs.FileStatus import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager} import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, MetadataSchema, TableType} import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns} import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileTable -import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.SerializableConfiguration @@ -40,7 +41,8 @@ case class GeoPackageTable( userSpecifiedSchema: Option[StructType], fallbackFileFormat: Class[_ <: FileFormat], loadOptions: GeoPackageOptions) - extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + extends FileTable(sparkSession, options, paths, userSpecifiedSchema) + with SupportsMetadataColumns { override def inferSchema(files: Seq[FileStatus]): Option[StructType] = { if (loadOptions.showMetadata) { @@ -74,6 +76,8 @@ case class GeoPackageTable( "GeoPackage" } + override def metadataColumns(): Array[MetadataColumn] = GeoPackageTable.fileMetadataColumns + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new GeoPackageScanBuilder( sparkSession, @@ -87,5 +91,22 @@ case class GeoPackageTable( override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { null } +} + +object GeoPackageTable { + + private val FILE_METADATA_STRUCT_TYPE: StructType = StructType( + Seq( + StructField("file_path", StringType, nullable = false), + StructField("file_name", StringType, nullable = false), + StructField("file_size", LongType, nullable = false), + StructField("file_block_start", LongType, nullable = false), + StructField("file_block_length", LongType, nullable = false), + StructField("file_modification_time", TimestampType, nullable = false))) + private[geopackage] val fileMetadataColumns: Array[MetadataColumn] = Array(new MetadataColumn { + override def name: String = "_metadata" + override def dataType: DataType = FILE_METADATA_STRUCT_TYPE + override def isNullable: Boolean = false + }) } diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala 
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala index 66fa147bc58..f37298661fe 100644 --- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala +++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala @@ -19,7 +19,7 @@ package org.apache.sedona.sql import io.minio.{MakeBucketArgs, MinioClient, PutObjectArgs} -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.functions.expr import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT import org.apache.spark.sql.types.{BinaryType, BooleanType, DateType, DoubleType, IntegerType, StringType, StructField, StructType, TimestampType} @@ -321,6 +321,78 @@ class GeoPackageReaderTest extends TestBaseScala with Matchers { } } + describe("_metadata hidden column support") { + it("should expose _metadata struct with all expected fields") { + val df = readFeatureData("point1") + val metaDf = df.select("_metadata") + val metaSchema = metaDf.schema.fields.head.dataType.asInstanceOf[StructType] + val fieldNames = metaSchema.fieldNames.toSet + fieldNames should contain("file_path") + fieldNames should contain("file_name") + fieldNames should contain("file_size") + fieldNames should contain("file_block_start") + fieldNames should contain("file_block_length") + fieldNames should contain("file_modification_time") + } + + it("should not include _metadata in select(*)") { + val df = readFeatureData("point1") + val starCols = df.select("*").columns.toSet + starCols should not contain "_metadata" + } + + it("should return correct file_path and file_name in _metadata") { + val df = readFeatureData("point1") + val row = df.select("_metadata.file_path", "_metadata.file_name").head() + val filePath = row.getString(0) + val fileName = row.getString(1) + filePath should endWith("example.gpkg") + fileName shouldEqual "example.gpkg" + } + + it("should return actual file_size matching the .gpkg file on disk") { + val df = readFeatureData("point1") + val metaFileSize = df.select("_metadata.file_size").head().getLong(0) + val actualFile = new java.io.File(path) + metaFileSize shouldEqual actualFile.length() + } + + it("should return file_block_start=0 and file_block_length=file_size") { + val df = readFeatureData("point1") + val row = df + .select( + "_metadata.file_block_start", + "_metadata.file_block_length", + "_metadata.file_size") + .head() + row.getLong(0) shouldEqual 0L + row.getLong(1) shouldEqual row.getLong(2) + } + + it("should return file_modification_time matching the .gpkg file on disk") { + val df = readFeatureData("point1") + val metaModTime = df.select("_metadata.file_modification_time").head().getTimestamp(0) + val actualFile = new java.io.File(path) + val expectedModTime = new java.sql.Timestamp(actualFile.lastModified()) + metaModTime shouldEqual expectedModTime + } + + it("should allow filtering on _metadata fields") { + val df = readFeatureData("point1") + val filtered = df.filter(df("_metadata.file_name") === "example.gpkg") + filtered.count() shouldEqual df.count() + val empty = df.filter(df("_metadata.file_name") === "nonexistent.gpkg") + empty.count() shouldEqual 0 + } + + it("should select _metadata along with data columns") { + val df = readFeatureData("point1") + val result = df.select("id", "_metadata.file_name").head() + result.getInt(0) shouldEqual 1 + result.getString(1) shouldEqual "example.gpkg" + } + } + private def readFeatureData(tableName: String): DataFrame = { 
sparkSession.read .format("geopackage") diff --git a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala index b1d38996b04..0f2e9a87b81 100644 --- a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala +++ b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala @@ -25,16 +25,20 @@ import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Ge import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.SerializableConfiguration case class GeoPackagePartitionReaderFactory( sparkSession: SparkSession, broadcastedConf: Broadcast[SerializableConfiguration], loadOptions: GeoPackageOptions, - dataSchema: StructType) + dataSchema: StructType, + metadataSchema: StructType) extends PartitionReaderFactory { override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { @@ -72,7 +76,7 @@ case class GeoPackagePartitionReaderFactory( case _ => None } - GeoPackagePartitionReader( + val baseReader = GeoPackagePartitionReader( rs = rs, options = GeoPackageReadOptions( tableName = loadOptions.tableName, @@ -84,5 +88,52 @@ case class GeoPackagePartitionReaderFactory( broadcastedConf = broadcastedConf, currentTempFile = tempFile, copying = copied) + + if (metadataSchema.nonEmpty) { + val gpkgFile = partitionFiles.head + val filePath = gpkgFile.filePath.toString + val fileName = new Path(filePath).getName + + val allMetadataValues: Map[String, Any] = Map( + "file_path" -> UTF8String.fromString(filePath), + "file_name" -> UTF8String.fromString(fileName), + "file_size" -> gpkgFile.fileSize, + "file_block_start" -> gpkgFile.start, + "file_block_length" -> gpkgFile.length, + "file_modification_time" -> (gpkgFile.modificationTime * 1000L)) + + val innerStructType = metadataSchema.fields.head.dataType.asInstanceOf[StructType] + val prunedValues = innerStructType.fields.map(f => allMetadataValues(f.name)) + val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq) + val metadataRow = InternalRow.fromSeq(Seq(metadataStruct)) + + new PartitionReaderWithMetadata(baseReader, dataSchema, metadataSchema, metadataRow) + } else { + baseReader + } } } + +private[geopackage] class PartitionReaderWithMetadata( + reader: PartitionReader[InternalRow], + baseSchema: StructType, + metadataSchema: StructType, + metadataValues: InternalRow) + extends PartitionReader[InternalRow] { + + private val joinedRow = new JoinedRow() + private val unsafeProjection = + GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map { case (f, i) => + BoundReference(i, f.dataType, f.nullable) + } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) => + BoundReference(baseSchema.length + i, f.dataType, f.nullable) + }) + + override def next(): Boolean = reader.next() + + override def 
get(): InternalRow = { + unsafeProjection(joinedRow(reader.get(), metadataValues)) + } + + override def close(): Unit = reader.close() +} diff --git a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala index 768afd79173..edca3d35ff1 100644 --- a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala +++ b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala @@ -36,10 +36,14 @@ case class GeoPackageScan( fileIndex: PartitioningAwareFileIndex, readDataSchema: StructType, readPartitionSchema: StructType, + metadataSchema: StructType, options: CaseInsensitiveStringMap, loadOptions: GeoPackageOptions) extends FileScan { + override def readSchema(): StructType = + StructType(readDataSchema.fields ++ readPartitionSchema.fields ++ metadataSchema.fields) + override def partitionFilters: Seq[Expression] = { Seq.empty } @@ -54,6 +58,11 @@ case class GeoPackageScan( val broadcastedConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, loadOptions, dataSchema) + GeoPackagePartitionReaderFactory( + sparkSession, + broadcastedConf, + loadOptions, + dataSchema, + metadataSchema) } } diff --git a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala index 829bd9c2201..7fdb716f29f 100644 --- a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala +++ b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala @@ -36,6 +36,18 @@ class GeoPackageScanBuilder( userDefinedSchema: Option[StructType] = None) extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { + private var _requiredMetadataSchema: StructType = StructType(Seq.empty) + + override def pruneColumns(requiredSchema: StructType): Unit = { + val resolver = sparkSession.sessionState.conf.resolver + val metaFields = requiredSchema.fields.filter { field => + !dataSchema.fields.exists(df => resolver(df.name, field.name)) && + !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name, field.name)) + } + _requiredMetadataSchema = StructType(metaFields) + super.pruneColumns(requiredSchema) + } + override def build(): Scan = { val fileIndexAdjusted = if (loadOptions.showMetadata) @@ -52,6 +64,7 @@ class GeoPackageScanBuilder( fileIndexAdjusted, dataSchema, readPartitionSchema(), + _requiredMetadataSchema, options, loadOptions) } diff --git a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala index 85dec8427e4..498933de30b 100644 --- a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala +++ b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala @@ -22,11 +22,12 @@ import org.apache.hadoop.fs.FileStatus import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager} import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, MetadataSchema, TableType} import 
org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns} import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileTable -import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.SerializableConfiguration @@ -40,7 +41,8 @@ case class GeoPackageTable( userSpecifiedSchema: Option[StructType], fallbackFileFormat: Class[_ <: FileFormat], loadOptions: GeoPackageOptions) - extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + extends FileTable(sparkSession, options, paths, userSpecifiedSchema) + with SupportsMetadataColumns { override def inferSchema(files: Seq[FileStatus]): Option[StructType] = { if (loadOptions.showMetadata) { @@ -74,6 +76,8 @@ case class GeoPackageTable( "GeoPackage" } + override def metadataColumns(): Array[MetadataColumn] = GeoPackageTable.fileMetadataColumns + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new GeoPackageScanBuilder( sparkSession, @@ -87,5 +91,22 @@ case class GeoPackageTable( override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { null } +} + +object GeoPackageTable { + + private val FILE_METADATA_STRUCT_TYPE: StructType = StructType( + Seq( + StructField("file_path", StringType, nullable = false), + StructField("file_name", StringType, nullable = false), + StructField("file_size", LongType, nullable = false), + StructField("file_block_start", LongType, nullable = false), + StructField("file_block_length", LongType, nullable = false), + StructField("file_modification_time", TimestampType, nullable = false))) + private[geopackage] val fileMetadataColumns: Array[MetadataColumn] = Array(new MetadataColumn { + override def name: String = "_metadata" + override def dataType: DataType = FILE_METADATA_STRUCT_TYPE + override def isNullable: Boolean = false + }) } diff --git a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala index 66fa147bc58..f37298661fe 100644 --- a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala +++ b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala @@ -19,7 +19,7 @@ package org.apache.sedona.sql import io.minio.{MakeBucketArgs, MinioClient, PutObjectArgs} -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.functions.expr import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT import org.apache.spark.sql.types.{BinaryType, BooleanType, DateType, DoubleType, IntegerType, StringType, StructField, StructType, TimestampType} @@ -321,6 +321,78 @@ class GeoPackageReaderTest extends TestBaseScala with Matchers { } } + describe("_metadata hidden column support") { + it("should expose _metadata struct with all expected fields") { + val df = readFeatureData("point1") + val metaDf = df.select("_metadata") + val metaSchema = metaDf.schema.fields.head.dataType.asInstanceOf[StructType] + val fieldNames = 
metaSchema.fieldNames.toSet + fieldNames should contain("file_path") + fieldNames should contain("file_name") + fieldNames should contain("file_size") + fieldNames should contain("file_block_start") + fieldNames should contain("file_block_length") + fieldNames should contain("file_modification_time") + } + + it("should not include _metadata in select(*)") { + val df = readFeatureData("point1") + val starCols = df.select("*").columns.toSet + starCols should not contain "_metadata" + } + + it("should return correct file_path and file_name in _metadata") { + val df = readFeatureData("point1") + val row = df.select("_metadata.file_path", "_metadata.file_name").head() + val filePath = row.getString(0) + val fileName = row.getString(1) + filePath should endWith("example.gpkg") + fileName shouldEqual "example.gpkg" + } + + it("should return actual file_size matching the .gpkg file on disk") { + val df = readFeatureData("point1") + val metaFileSize = df.select("_metadata.file_size").head().getLong(0) + val actualFile = new java.io.File(path) + metaFileSize shouldEqual actualFile.length() + } + + it("should return file_block_start=0 and file_block_length=file_size") { + val df = readFeatureData("point1") + val row = df + .select( + "_metadata.file_block_start", + "_metadata.file_block_length", + "_metadata.file_size") + .head() + row.getLong(0) shouldEqual 0L + row.getLong(1) shouldEqual row.getLong(2) + } + + it("should return file_modification_time matching the .gpkg file on disk") { + val df = readFeatureData("point1") + val metaModTime = df.select("_metadata.file_modification_time").head().getTimestamp(0) + val actualFile = new java.io.File(path) + val expectedModTime = new java.sql.Timestamp(actualFile.lastModified()) + metaModTime shouldEqual expectedModTime + } + + it("should allow filtering on _metadata fields") { + val df = readFeatureData("point1") + val filtered = df.filter(df("_metadata.file_name") === "example.gpkg") + filtered.count() shouldEqual df.count() + val empty = df.filter(df("_metadata.file_name") === "nonexistent.gpkg") + empty.count() shouldEqual 0 + } + + it("should select _metadata along with data columns") { + val df = readFeatureData("point1") + val result = df.select("id", "_metadata.file_name").head() + result.getInt(0) shouldEqual 1 + result.getString(1) shouldEqual "example.gpkg" + } + } + private def readFeatureData(tableName: String): DataFrame = { sparkSession.read .format("geopackage") diff --git a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala index b1d38996b04..0f2e9a87b81 100644 --- a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala +++ b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala @@ -25,16 +25,20 @@ import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Ge import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.execution.datasources.FilePartition import 
org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.SerializableConfiguration case class GeoPackagePartitionReaderFactory( sparkSession: SparkSession, broadcastedConf: Broadcast[SerializableConfiguration], loadOptions: GeoPackageOptions, - dataSchema: StructType) + dataSchema: StructType, + metadataSchema: StructType) extends PartitionReaderFactory { override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { @@ -72,7 +76,7 @@ case class GeoPackagePartitionReaderFactory( case _ => None } - GeoPackagePartitionReader( + val baseReader = GeoPackagePartitionReader( rs = rs, options = GeoPackageReadOptions( tableName = loadOptions.tableName, @@ -84,5 +88,52 @@ case class GeoPackagePartitionReaderFactory( broadcastedConf = broadcastedConf, currentTempFile = tempFile, copying = copied) + + if (metadataSchema.nonEmpty) { + val gpkgFile = partitionFiles.head + val filePath = gpkgFile.filePath.toString + val fileName = new Path(filePath).getName + + val allMetadataValues: Map[String, Any] = Map( + "file_path" -> UTF8String.fromString(filePath), + "file_name" -> UTF8String.fromString(fileName), + "file_size" -> gpkgFile.fileSize, + "file_block_start" -> gpkgFile.start, + "file_block_length" -> gpkgFile.length, + "file_modification_time" -> (gpkgFile.modificationTime * 1000L)) + + val innerStructType = metadataSchema.fields.head.dataType.asInstanceOf[StructType] + val prunedValues = innerStructType.fields.map(f => allMetadataValues(f.name)) + val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq) + val metadataRow = InternalRow.fromSeq(Seq(metadataStruct)) + + new PartitionReaderWithMetadata(baseReader, dataSchema, metadataSchema, metadataRow) + } else { + baseReader + } } } + +private[geopackage] class PartitionReaderWithMetadata( + reader: PartitionReader[InternalRow], + baseSchema: StructType, + metadataSchema: StructType, + metadataValues: InternalRow) + extends PartitionReader[InternalRow] { + + private val joinedRow = new JoinedRow() + private val unsafeProjection = + GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map { case (f, i) => + BoundReference(i, f.dataType, f.nullable) + } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) => + BoundReference(baseSchema.length + i, f.dataType, f.nullable) + }) + + override def next(): Boolean = reader.next() + + override def get(): InternalRow = { + unsafeProjection(joinedRow(reader.get(), metadataValues)) + } + + override def close(): Unit = reader.close() +} diff --git a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala index 768afd79173..edca3d35ff1 100644 --- a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala +++ b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala @@ -36,10 +36,14 @@ case class GeoPackageScan( fileIndex: PartitioningAwareFileIndex, readDataSchema: StructType, readPartitionSchema: StructType, + metadataSchema: StructType, options: CaseInsensitiveStringMap, loadOptions: GeoPackageOptions) extends FileScan { + override def readSchema(): StructType = + StructType(readDataSchema.fields ++ readPartitionSchema.fields ++ metadataSchema.fields) + override def partitionFilters: Seq[Expression] = { Seq.empty } @@ -54,6 +58,11 @@ case class GeoPackageScan( val broadcastedConf = 
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, loadOptions, dataSchema) + GeoPackagePartitionReaderFactory( + sparkSession, + broadcastedConf, + loadOptions, + dataSchema, + metadataSchema) } } diff --git a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala index 829bd9c2201..7fdb716f29f 100644 --- a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala +++ b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala @@ -36,6 +36,18 @@ class GeoPackageScanBuilder( userDefinedSchema: Option[StructType] = None) extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { + private var _requiredMetadataSchema: StructType = StructType(Seq.empty) + + override def pruneColumns(requiredSchema: StructType): Unit = { + val resolver = sparkSession.sessionState.conf.resolver + val metaFields = requiredSchema.fields.filter { field => + !dataSchema.fields.exists(df => resolver(df.name, field.name)) && + !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name, field.name)) + } + _requiredMetadataSchema = StructType(metaFields) + super.pruneColumns(requiredSchema) + } + override def build(): Scan = { val fileIndexAdjusted = if (loadOptions.showMetadata) @@ -52,6 +64,7 @@ class GeoPackageScanBuilder( fileIndexAdjusted, dataSchema, readPartitionSchema(), + _requiredMetadataSchema, options, loadOptions) } diff --git a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala index 85dec8427e4..498933de30b 100644 --- a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala +++ b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala @@ -22,11 +22,12 @@ import org.apache.hadoop.fs.FileStatus import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager} import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, MetadataSchema, TableType} import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns} import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileTable -import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.SerializableConfiguration @@ -40,7 +41,8 @@ case class GeoPackageTable( userSpecifiedSchema: Option[StructType], fallbackFileFormat: Class[_ <: FileFormat], loadOptions: GeoPackageOptions) - extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + extends FileTable(sparkSession, options, paths, userSpecifiedSchema) + with SupportsMetadataColumns { override def inferSchema(files: Seq[FileStatus]): 
Option[StructType] = { if (loadOptions.showMetadata) { @@ -74,6 +76,8 @@ case class GeoPackageTable( "GeoPackage" } + override def metadataColumns(): Array[MetadataColumn] = GeoPackageTable.fileMetadataColumns + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new GeoPackageScanBuilder( sparkSession, @@ -87,5 +91,22 @@ case class GeoPackageTable( override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { null } +} + +object GeoPackageTable { + + private val FILE_METADATA_STRUCT_TYPE: StructType = StructType( + Seq( + StructField("file_path", StringType, nullable = false), + StructField("file_name", StringType, nullable = false), + StructField("file_size", LongType, nullable = false), + StructField("file_block_start", LongType, nullable = false), + StructField("file_block_length", LongType, nullable = false), + StructField("file_modification_time", TimestampType, nullable = false))) + private[geopackage] val fileMetadataColumns: Array[MetadataColumn] = Array(new MetadataColumn { + override def name: String = "_metadata" + override def dataType: DataType = FILE_METADATA_STRUCT_TYPE + override def isNullable: Boolean = false + }) } diff --git a/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala b/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala index 66fa147bc58..f37298661fe 100644 --- a/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala +++ b/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala @@ -19,7 +19,7 @@ package org.apache.sedona.sql import io.minio.{MakeBucketArgs, MinioClient, PutObjectArgs} -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.functions.expr import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT import org.apache.spark.sql.types.{BinaryType, BooleanType, DateType, DoubleType, IntegerType, StringType, StructField, StructType, TimestampType} @@ -321,6 +321,78 @@ class GeoPackageReaderTest extends TestBaseScala with Matchers { } } + describe("_metadata hidden column support") { + it("should expose _metadata struct with all expected fields") { + val df = readFeatureData("point1") + val metaDf = df.select("_metadata") + val metaSchema = metaDf.schema.fields.head.dataType.asInstanceOf[StructType] + val fieldNames = metaSchema.fieldNames.toSet + fieldNames should contain("file_path") + fieldNames should contain("file_name") + fieldNames should contain("file_size") + fieldNames should contain("file_block_start") + fieldNames should contain("file_block_length") + fieldNames should contain("file_modification_time") + } + + it("should not include _metadata in select(*)") { + val df = readFeatureData("point1") + val starCols = df.select("*").columns.toSet + starCols should not contain "_metadata" + } + + it("should return correct file_path and file_name in _metadata") { + val df = readFeatureData("point1") + val row = df.select("_metadata.file_path", "_metadata.file_name").head() + val filePath = row.getString(0) + val fileName = row.getString(1) + filePath should endWith("example.gpkg") + fileName shouldEqual "example.gpkg" + } + + it("should return actual file_size matching the .gpkg file on disk") { + val df = readFeatureData("point1") + val metaFileSize = df.select("_metadata.file_size").head().getLong(0) + val actualFile = new java.io.File(path) + metaFileSize shouldEqual actualFile.length() + } + + it("should return 
file_block_start=0 and file_block_length=file_size") { + val df = readFeatureData("point1") + val row = df + .select( + "_metadata.file_block_start", + "_metadata.file_block_length", + "_metadata.file_size") + .head() + row.getLong(0) shouldEqual 0L + row.getLong(1) shouldEqual row.getLong(2) + } + + it("should return file_modification_time matching the .gpkg file on disk") { + val df = readFeatureData("point1") + val metaModTime = df.select("_metadata.file_modification_time").head().getTimestamp(0) + val actualFile = new java.io.File(path) + val expectedModTime = new java.sql.Timestamp(actualFile.lastModified()) + metaModTime shouldEqual expectedModTime + } + + it("should allow filtering on _metadata fields") { + val df = readFeatureData("point1") + val filtered = df.filter(df("_metadata.file_name") === "example.gpkg") + filtered.count() shouldEqual df.count() + val empty = df.filter(df("_metadata.file_name") === "nonexistent.gpkg") + empty.count() shouldEqual 0 + } + + it("should select _metadata along with data columns") { + val df = readFeatureData("point1") + val result = df.select("id", "_metadata.file_name").head() + result.getInt(0) shouldEqual 1 + result.getString(1) shouldEqual "example.gpkg" + } + } + private def readFeatureData(tableName: String): DataFrame = { sparkSession.read .format("geopackage")