fix: fix various unit test failures in native_datafusion and native_iceberg_compat readers (#1415)

Major changes:
- allow UInt64 to Decimal and FixedWidthBinary to Binary conversions in complex readers
- do not enable prefetch reads in tests if the complex reader is enabled (sketched below)
- fix more incompatible checks in uint_8/uint_16 tests
- skip datetime rebase tests for complex readers (not supported)

There may be conflicts between this and #1413 (@mbutrovich), which removes the `cast_supported` method, but they can be reconciled afterwards.

Without #1413, the failure counts are:
```
native_datafusion: Tests: succeeded 658, failed 19, canceled 4, ignored 54, pending 0
native_iceberg_compat: Tests: succeeded 662, failed 15, canceled 4, ignored 54, pending 0
```
parthchandra authored Feb 20, 2025
1 parent d61bcfc commit 55ae543
Showing 8 changed files with 154 additions and 77 deletions.
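
For context, here is a condensed Scala sketch of the prefetch gating that the diffs below add to `CometParquetPartitionReaderFactory` and `CometScanExec`. The names mirror the diff, but this is illustrative only, not the exact committed code: prefetch stays enabled only when the DataFusion-backed `native_iceberg_compat` reader is not in use.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.comet.CometConf

// Illustrative sketch: prefetch is considered enabled only when the
// DataFusion-backed (native_iceberg_compat) reader is NOT selected.
def prefetchEnabled(conf: Configuration): Boolean = {
  val usingDataFusionReader =
    conf.getBoolean(
      CometConf.COMET_NATIVE_SCAN_ENABLED.key,
      CometConf.COMET_NATIVE_SCAN_ENABLED.defaultValue.get) &&
      conf
        .get(
          CometConf.COMET_NATIVE_SCAN_IMPL.key,
          CometConf.COMET_NATIVE_SCAN_IMPL.defaultValueString)
        .equalsIgnoreCase(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)

  conf.getBoolean(
    CometConf.COMET_SCAN_PREFETCH_ENABLED.key,
    CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get) &&
  !usingDataFusionReader
}
```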
@@ -260,7 +260,8 @@ public void init() throws URISyntaxException, IOException {
} ////// End get requested schema

String timeZoneId = conf.get("spark.sql.session.timeZone");
Schema arrowSchema = Utils$.MODULE$.toArrowSchema(sparkSchema, timeZoneId);
// Native code uses "UTC" always as the timeZoneId when converting from spark to arrow schema.
Schema arrowSchema = Utils$.MODULE$.toArrowSchema(sparkSchema, "UTC");
ByteArrayOutputStream out = new ByteArrayOutputStream();
WriteChannel writeChannel = new WriteChannel(Channels.newChannel(out));
MessageSerializer.serialize(writeChannel, arrowSchema);
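
For reference, a minimal Scala sketch of the same serialization path shown above, assuming the Arrow schema has already been built with the hard-coded "UTC" timezone. The point of the change is that the timezone baked into the serialized schema must match the "UTC" that the native readers assume.

```scala
import java.io.ByteArrayOutputStream
import java.nio.channels.Channels

import org.apache.arrow.vector.ipc.WriteChannel
import org.apache.arrow.vector.ipc.message.MessageSerializer
import org.apache.arrow.vector.types.pojo.Schema

// Serialize an Arrow schema to IPC bytes for handoff to the native reader.
// The schema should be produced with toArrowSchema(sparkSchema, "UTC") so
// both sides agree on the timezone.
def serializeArrowSchema(arrowSchema: Schema): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  val writeChannel = new WriteChannel(Channels.newChannel(out))
  MessageSerializer.serialize(writeChannel, arrowSchema)
  out.toByteArray
}
```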
@@ -100,7 +100,7 @@ class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with

// Comet specific configurations
val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
val nativeRecordBatchReaderEnabled =
val nativeIcebergCompat =
CometConf.COMET_NATIVE_SCAN_IMPL.get(sqlConf).equals(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)

(file: PartitionedFile) => {
@@ -137,7 +137,7 @@ class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with
pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))

val recordBatchReader =
if (nativeRecordBatchReaderEnabled) {
if (nativeIcebergCompat) {
val batchReader = new NativeBatchReader(
sharedConf,
file,
@@ -71,14 +71,26 @@ case class CometParquetPartitionReaderFactory(
// Comet specific configurations
private val batchSize = CometConf.COMET_BATCH_SIZE.get(sqlConf)

@transient private lazy val usingDataFusionReader: Boolean = {
val conf = broadcastedConf.value.value
conf.getBoolean(
CometConf.COMET_NATIVE_SCAN_ENABLED.key,
CometConf.COMET_NATIVE_SCAN_ENABLED.defaultValue.get) &&
conf
.get(
CometConf.COMET_NATIVE_SCAN_IMPL.key,
CometConf.COMET_NATIVE_SCAN_IMPL.defaultValueString)
.equalsIgnoreCase(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
}
// This is only called at executor on a Broadcast variable, so we don't want it to be
// materialized at driver.
@transient private lazy val preFetchEnabled = {
val conf = broadcastedConf.value.value

conf.getBoolean(
CometConf.COMET_SCAN_PREFETCH_ENABLED.key,
CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get)
CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get) &&
!usingDataFusionReader // Turn off prefetch if native_iceberg_compat is enabled
}

private var cometReaders: Iterator[BatchReader] = _
@@ -413,9 +413,20 @@ case class CometScanExec(
readFile: (PartitionedFile) => Iterator[InternalRow],
partitions: Seq[FilePartition]): RDD[InternalRow] = {
val hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
val usingDataFusionReader: Boolean = {
hadoopConf.getBoolean(
CometConf.COMET_NATIVE_SCAN_ENABLED.key,
CometConf.COMET_NATIVE_SCAN_ENABLED.defaultValue.get) &&
hadoopConf
.get(
CometConf.COMET_NATIVE_SCAN_IMPL.key,
CometConf.COMET_NATIVE_SCAN_IMPL.defaultValueString)
.equalsIgnoreCase(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
}
val prefetchEnabled = hadoopConf.getBoolean(
CometConf.COMET_SCAN_PREFETCH_ENABLED.key,
CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get)
CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get) &&
!usingDataFusionReader

val sqlConf = fsRelation.sparkSession.sessionState.conf
if (prefetchEnabled) {
spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala: 69 changes (38 additions, 31 deletions)
@@ -117,7 +117,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
}
withParquetTable(path.toString, "tbl") {
checkSparkAnswerAndOperator("select * FROM tbl WHERE _2 > 100")
}
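
The tests in this file lean on the `withSQLConf` helper to scope a config change to a block. Below is a minimal sketch of that pattern, assuming a `SparkSession` in scope; the real helper is provided by Spark's SQL test utilities, so this is illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: set the given SQL confs, run the block, then restore the
// previous values (or unset keys that had no previous value).
def withSQLConf[T](pairs: (String, String)*)(body: => T)(implicit spark: SparkSession): T = {
  val conf = spark.conf
  val previous = pairs.map { case (k, _) => k -> conf.getOption(k) }
  pairs.foreach { case (k, v) => conf.set(k, v) }
  try body
  finally previous.foreach {
    case (k, Some(v)) => conf.set(k, v)
    case (k, None) => conf.unset(k)
  }
}
```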
@@ -1273,32 +1275,36 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
-128,
128,
randomSize = 100)
withParquetTable(path.toString, "tbl") {
for (s <- Seq(-5, -1, 0, 1, 5, -1000, 1000, -323, -308, 308, -15, 15, -16, 16, null)) {
// array tests
// TODO: enable test for floats (_6, _7, _8, _13)
for (c <- Seq(2, 3, 4, 5, 9, 10, 11, 12, 15, 16, 17)) {
checkSparkAnswerAndOperator(s"select _${c}, round(_${c}, ${s}) FROM tbl")
}
// scalar tests
// Exclude the constant folding optimizer in order to actually execute the native round
// operations for scalar (literal) values.
// TODO: comment in the tests for float once supported
withSQLConf(
"spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {
for (n <- Seq("0.0", "-0.0", "0.5", "-0.5", "1.2", "-1.2")) {
checkSparkAnswerAndOperator(s"select round(cast(${n} as tinyint), ${s}) FROM tbl")
// checkSparkAnswerAndCometOperators(s"select round(cast(${n} as float), ${s}) FROM tbl")
checkSparkAnswerAndOperator(
s"select round(cast(${n} as decimal(38, 18)), ${s}) FROM tbl")
checkSparkAnswerAndOperator(
s"select round(cast(${n} as decimal(20, 0)), ${s}) FROM tbl")
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
withParquetTable(path.toString, "tbl") {
for (s <- Seq(-5, -1, 0, 1, 5, -1000, 1000, -323, -308, 308, -15, 15, -16, 16,
null)) {
// array tests
// TODO: enable test for floats (_6, _7, _8, _13)
for (c <- Seq(2, 3, 4, 5, 9, 10, 11, 12, 15, 16, 17)) {
checkSparkAnswerAndOperator(s"select _${c}, round(_${c}, ${s}) FROM tbl")
}
// scalar tests
// Exclude the constant folding optimizer in order to actually execute the native round
// operations for scalar (literal) values.
// TODO: comment in the tests for float once supported
withSQLConf(
"spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {
for (n <- Seq("0.0", "-0.0", "0.5", "-0.5", "1.2", "-1.2")) {
checkSparkAnswerAndOperator(
s"select round(cast(${n} as tinyint), ${s}) FROM tbl")
// checkSparkAnswerAndCometOperators(s"select round(cast(${n} as float), ${s}) FROM tbl")
checkSparkAnswerAndOperator(
s"select round(cast(${n} as decimal(38, 18)), ${s}) FROM tbl")
checkSparkAnswerAndOperator(
s"select round(cast(${n} as decimal(20, 0)), ${s}) FROM tbl")
}
// checkSparkAnswer(s"select round(double('infinity'), ${s}) FROM tbl")
// checkSparkAnswer(s"select round(double('-infinity'), ${s}) FROM tbl")
// checkSparkAnswer(s"select round(double('NaN'), ${s}) FROM tbl")
// checkSparkAnswer(
// s"select round(double('0.000000000000000000000000000000000001'), ${s}) FROM tbl")
}
// checkSparkAnswer(s"select round(double('infinity'), ${s}) FROM tbl")
// checkSparkAnswer(s"select round(double('-infinity'), ${s}) FROM tbl")
// checkSparkAnswer(s"select round(double('NaN'), ${s}) FROM tbl")
// checkSparkAnswer(
// s"select round(double('0.000000000000000000000000000000000001'), ${s}) FROM tbl")
}
}
}
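
As the comment in the hunk above notes, constant folding would evaluate `round(<literal>, s)` at planning time, so the test disables that rule to force the expression through the native path. A standalone illustration of scoping that optimizer exclusion, assuming a `SparkSession` named `spark` (hypothetical snippet, not part of the commit):

```scala
// Exclude ConstantFolding so literal expressions are actually executed
// at runtime rather than folded away during optimization.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.ConstantFolding")
try {
  spark.sql("SELECT round(cast(1.2 as decimal(38, 18)), 5)").show()
} finally {
  spark.conf.unset("spark.sql.optimizer.excludedRules")
}
```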
@@ -1491,11 +1497,12 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "hex.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)

withParquetTable(path.toString, "tbl") {
checkSparkAnswerAndOperator(
"SELECT hex(_1), hex(_2), hex(_3), hex(_4), hex(_5), hex(_6), hex(_7), hex(_8), hex(_9), hex(_10), hex(_11), hex(_12), hex(_13), hex(_14), hex(_15), hex(_16), hex(_17), hex(_18), hex(_19), hex(_20) FROM tbl")
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
withParquetTable(path.toString, "tbl") {
checkSparkAnswerAndOperator(
"SELECT hex(_1), hex(_2), hex(_3), hex(_4), hex(_5), hex(_6), hex(_7), hex(_8), hex(_9), hex(_10), hex(_11), hex(_12), hex(_13), hex(_14), hex(_15), hex(_16), hex(_17), hex(_18), hex(_19), hex(_20) FROM tbl")
}
}
}
}
@@ -35,7 +35,7 @@ import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.comet.CometConf
import org.apache.comet.{CometConf, CometSparkSessionExtensions}

abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper {
protected val adaptiveExecutionEnabled: Boolean
@@ -740,28 +740,50 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, false, 10000, 10010)

Seq(
$"_1",
$"_2",
$"_3",
$"_4",
$"_5",
$"_6",
$"_7",
$"_8",
$"_9",
$"_10",
$"_11",
$"_12",
$"_13",
$"_14",
$"_15",
$"_16",
$"_17",
$"_18",
$"_19",
$"_20").foreach { col =>
// TODO: revisit this when we have resolution of https://github.com/apache/arrow-rs/issues/7040
// and https://github.com/apache/arrow-rs/issues/7097
val fieldsToTest = if (CometSparkSessionExtensions.usingDataFusionParquetExec(conf)) {
Seq(
$"_1",
$"_4",
$"_5",
$"_6",
$"_7",
$"_8",
$"_11",
$"_12",
$"_13",
$"_14",
$"_15",
$"_16",
$"_17",
$"_18",
$"_19",
$"_20")
} else {
Seq(
$"_1",
$"_2",
$"_3",
$"_4",
$"_5",
$"_6",
$"_7",
$"_8",
$"_9",
$"_10",
$"_11",
$"_12",
$"_13",
$"_14",
$"_15",
$"_16",
$"_17",
$"_18",
$"_19",
$"_20")
}
fieldsToTest.foreach { col =>
readParquetFile(path.toString) { df =>
val shuffled = df.select(col).repartition(numPartitions, col)
checkShuffleAnswer(shuffled, 1)
@@ -46,7 +46,7 @@ import org.apache.spark.unsafe.types.UTF8String
import com.google.common.primitives.UnsignedLong

import org.apache.comet.{CometConf, CometSparkSessionExtensions}
import org.apache.comet.CometSparkSessionExtensions.{isSpark34Plus, isSpark40Plus}
import org.apache.comet.CometSparkSessionExtensions.{isSpark34Plus, isSpark40Plus, usingDataFusionParquetExec}

abstract class ParquetReadSuite extends CometTestBase {
import testImplicits._
@@ -82,6 +82,11 @@ abstract class ParquetReadSuite extends CometTestBase {
}

test("unsupported Spark types") {
// for native iceberg compat, CometScanExec supports some types that native_comet does not.
// note that native_datafusion does not use CometScanExec so we need not include that in
// the check
val usingNativeIcebergCompat =
(CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
Seq(
NullType -> false,
BooleanType -> true,
@@ -97,13 +102,19 @@ abstract class ParquetReadSuite extends CometTestBase {
StructType(
Seq(
StructField("f1", DecimalType.SYSTEM_DEFAULT),
StructField("f2", StringType))) -> false,
StructField("f2", StringType))) -> usingNativeIcebergCompat,
MapType(keyType = LongType, valueType = DateType) -> false,
StructType(Seq(StructField("f1", ByteType), StructField("f2", StringType))) -> false,
StructType(
Seq(
StructField("f1", ByteType),
StructField("f2", StringType))) -> usingNativeIcebergCompat,
MapType(keyType = IntegerType, valueType = BinaryType) -> false).foreach {
case (dt, expected) =>
assert(CometScanExec.isTypeSupported(dt) == expected)
assert(CometBatchScanExec.isTypeSupported(dt) == expected)
// usingDataFusionParquetExec does not support CometBatchScanExec yet
if (!usingDataFusionParquetExec(conf)) {
assert(CometBatchScanExec.isTypeSupported(dt) == expected)
}
}
}

@@ -1001,7 +1012,8 @@ abstract class ParquetReadSuite extends CometTestBase {
Seq(StructField("_1", LongType, false), StructField("_2", DoubleType, false)))

withParquetDataFrame(data, schema = Some(readSchema)) { df =>
if (enableSchemaEvolution) {
// TODO: validate with Spark 3.x and 'usingDataFusionParquetExec=true'
if (enableSchemaEvolution || usingDataFusionParquetExec(conf)) {
checkAnswer(df, data.map(Row.fromTuple))
} else {
assertThrows[SparkException](df.collect())
@@ -1162,8 +1174,7 @@ abstract class ParquetReadSuite extends CometTestBase {
test("row group skipping doesn't overflow when reading into larger type") {
// Spark 4.0 no longer fails for widening types SPARK-40876
// https://github.com/apache/spark/commit/3361f25dc0ff6e5233903c26ee105711b79ba967
assume(isSpark34Plus && !isSpark40Plus)

assume(isSpark34Plus && !isSpark40Plus && !usingDataFusionParquetExec(conf))
withTempPath { path =>
Seq(0).toDF("a").write.parquet(path.toString)
// Reading integer 'a' as a long isn't supported. Check that an exception is raised instead
@@ -1460,9 +1471,12 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
super.test(testName, testTags: _*)(withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
testFun
})(pos)
super.test(testName, testTags: _*)(
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> "",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
testFun
})(pos)
}

override def checkParquetScan[T <: Product: ClassTag: TypeTag](