From d1e7dba1f46a827fa412321f58ecc6742abcd83a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 5 Feb 2025 08:58:16 -0800 Subject: [PATCH 1/9] add failing test --- .../apache/comet/testing/ParquetGenerator.scala | 10 +++++----- .../apache/comet/exec/CometAggregateSuite.scala | 15 +++++++++++++++ 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala index f209cc4c9..acb565ec0 100644 --- a/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala +++ b/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala @@ -212,8 +212,8 @@ object ParquetGenerator { } case class DataGenOptions( - allowNull: Boolean, - generateNegativeZero: Boolean, - generateArray: Boolean, - generateStruct: Boolean, - generateMap: Boolean) + allowNull: Boolean = true, + generateNegativeZero: Boolean = true, + generateArray: Boolean = false, + generateStruct: Boolean = false, + generateMap: Boolean = false) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 6795f3d55..3ad5f6d21 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus +import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} /** * Test suite dedicated to Comet native aggregate operator @@ -39,6 +40,20 @@ import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { import testImplicits._ + test("avg decimal") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + ParquetGenerator.makeParquetFile(random, spark, filename, 10000, DataGenOptions()) + } + val table = spark.read.parquet(filename).coalesce(1) + table.createOrReplaceTempView("t1") + checkSparkAnswer("SELECT c1, avg(c7) FROM t1 GROUP BY c1 ORDER BY c1") + } + } + test("stddev_pop should return NaN for some cases") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", From d7979e1c012bffe3065169e3e0ec69bed2558036 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 6 Feb 2025 06:53:32 -0800 Subject: [PATCH 2/9] Mark cast from float/double to decimal as incompat --- .../scala/org/apache/comet/expressions/CometCast.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 6b0b10d80..9aa7f7c8b 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -267,7 +267,9 @@ object CometCast { case DataTypes.BooleanType | DataTypes.DoubleType | DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType => Compatible() - case _: DecimalType => Compatible() + case _: DecimalType => + // https://github.com/apache/datafusion-comet/issues/1371 + Incompatible(Some("There can be rounding differences")) case _ => Unsupported } @@ -275,7 +277,9 @@ object 
CometCast { case DataTypes.BooleanType | DataTypes.FloatType | DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType => Compatible() - case _: DecimalType => Compatible() + case _: DecimalType => + // https://github.com/apache/datafusion-comet/issues/1371 + Incompatible(Some("There can be rounding differences")) case _ => Unsupported } From 66d25c3b20571acd98b11888ce271de4f2465e87 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 6 Feb 2025 06:55:57 -0800 Subject: [PATCH 3/9] update docs --- docs/source/user-guide/compatibility.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md index 893326e19..d603c0b99 100644 --- a/docs/source/user-guide/compatibility.md +++ b/docs/source/user-guide/compatibility.md @@ -117,7 +117,6 @@ The following cast operations are generally compatible with Spark except for the | float | integer | | | float | long | | | float | double | | -| float | decimal | | | float | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 | | double | boolean | | | double | byte | | @@ -125,7 +124,6 @@ The following cast operations are generally compatible with Spark except for the | double | integer | | | double | long | | | double | float | | -| double | decimal | | | double | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 | | decimal | byte | | | decimal | short | | @@ -154,6 +152,8 @@ The following cast operations are not compatible with Spark for all inputs and a |-|-|-| | integer | decimal | No overflow check | | long | decimal | No overflow check | +| float | decimal | There can be rounding differences | +| double | decimal | There can be rounding differences | | string | float | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. | | string | double | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. | | string | decimal | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. 
Returns 0.0 instead of null if input contains no digits | From 2e61c79044c9a565384a82dc37d738055f23da7c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 6 Feb 2025 07:00:40 -0800 Subject: [PATCH 4/9] update cast tests --- .../scala/org/apache/comet/CometCastSuite.scala | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 27d8e2357..3b75da9a0 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -348,10 +348,16 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(generateFloats(), DataTypes.DoubleType) } - test("cast FloatType to DecimalType(10,2)") { + ignore("cast FloatType to DecimalType(10,2)") { castTest(generateFloats(), DataTypes.createDecimalType(10, 2)) } + test("cast FloatType to DecimalType(10,2) - allow incompat") { + withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + castTest(generateFloats(), DataTypes.createDecimalType(10, 2)) + } + } + test("cast FloatType to StringType") { // https://github.com/apache/datafusion-comet/issues/312 val r = new Random(0) @@ -401,10 +407,16 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(generateDoubles(), DataTypes.FloatType) } - test("cast DoubleType to DecimalType(10,2)") { + ignore("cast DoubleType to DecimalType(10,2)") { castTest(generateDoubles(), DataTypes.createDecimalType(10, 2)) } + test("cast DoubleType to DecimalType(10,2) - allow incompat") { + withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + castTest(generateDoubles(), DataTypes.createDecimalType(10, 2)) + } + } + test("cast DoubleType to StringType") { // https://github.com/apache/datafusion-comet/issues/312 val r = new Random(0) From 0c5fdcda2adf57376be7f6aa190378653c08fac5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 6 Feb 2025 07:02:13 -0800 Subject: [PATCH 5/9] link to issue --- spark/src/test/scala/org/apache/comet/CometCastSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 3b75da9a0..e2b2ed55a 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -349,6 +349,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } ignore("cast FloatType to DecimalType(10,2)") { + // // https://github.com/apache/datafusion-comet/issues/1371 castTest(generateFloats(), DataTypes.createDecimalType(10, 2)) } @@ -408,6 +409,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } ignore("cast DoubleType to DecimalType(10,2)") { + // https://github.com/apache/datafusion-comet/issues/1371 castTest(generateDoubles(), DataTypes.createDecimalType(10, 2)) } From 1fc6abe9e4d341997c415a6db6605afa9c713da3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 6 Feb 2025 08:43:03 -0800 Subject: [PATCH 6/9] fix regressions --- .../test/scala/org/apache/comet/exec/CometAggregateSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 3ad5f6d21..d625d8b64 100644 --- 
a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -882,6 +882,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "native") { Seq(true, false).foreach { dictionaryEnabled => withSQLConf("parquet.enable.dictionary" -> dictionaryEnabled.toString) { @@ -929,6 +930,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("avg null handling") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "native") { val table = "t1" withTable(table) { From 1f8a351961b3f78c59f020ac5b91236d65bc7027 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 6 Feb 2025 15:03:54 -0800 Subject: [PATCH 7/9] use unique table name in test --- .../org/apache/comet/exec/CometAggregateSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index d625d8b64..206c0c1a0 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -886,7 +886,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { CometConf.COMET_SHUFFLE_MODE.key -> "native") { Seq(true, false).foreach { dictionaryEnabled => withSQLConf("parquet.enable.dictionary" -> dictionaryEnabled.toString) { - val table = "t1" + val table = s"final_decimal_avg_$dictionaryEnabled" withTable(table) { sql(s"create table $table(a decimal(38, 37), b INT) using parquet") sql(s"insert into $table values(-0.0000000000000000000000000000000000002, 1)") @@ -900,13 +900,13 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { sql(s"insert into $table values(0.13344406545919155429936259114971302408, 5)") sql(s"insert into $table values(0.13344406545919155429936259114971302408, 5)") - checkSparkAnswerAndNumOfAggregates("SELECT b , AVG(a) FROM t1 GROUP BY b", 2) - checkSparkAnswerAndNumOfAggregates("SELECT AVG(a) FROM t1", 2) + checkSparkAnswerAndNumOfAggregates(s"SELECT b , AVG(a) FROM $table GROUP BY b", 2) + checkSparkAnswerAndNumOfAggregates(s"SELECT AVG(a) FROM $table", 2) checkSparkAnswerAndNumOfAggregates( - "SELECT b, MIN(a), MAX(a), COUNT(a), SUM(a), AVG(a) FROM t1 GROUP BY b", + s"SELECT b, MIN(a), MAX(a), COUNT(a), SUM(a), AVG(a) FROM $table GROUP BY b", 2) checkSparkAnswerAndNumOfAggregates( - "SELECT MIN(a), MAX(a), COUNT(a), SUM(a), AVG(a) FROM t1", + s"SELECT MIN(a), MAX(a), COUNT(a), SUM(a), AVG(a) FROM $table", 2) } } From 206efe0ea23ab55a6d3f3b8a56e4b8d60141d3f0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 6 Feb 2025 16:28:06 -0800 Subject: [PATCH 8/9] use withTable --- .../org/apache/comet/exec/CometAggregateSuite.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 206c0c1a0..8e0ef777a 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ 
-48,9 +48,12 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf(CometConf.COMET_ENABLED.key -> "false") { ParquetGenerator.makeParquetFile(random, spark, filename, 10000, DataGenOptions()) } - val table = spark.read.parquet(filename).coalesce(1) - table.createOrReplaceTempView("t1") - checkSparkAnswer("SELECT c1, avg(c7) FROM t1 GROUP BY c1 ORDER BY c1") + val tableName = "avg_decimal" + withTable(tableName) { + val table = spark.read.parquet(filename).coalesce(1) + table.createOrReplaceTempView(tableName) + checkSparkAnswer(s"SELECT c1, avg(c7) FROM $tableName GROUP BY c1 ORDER BY c1") + } } } @@ -930,9 +933,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("avg null handling") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "native") { - val table = "t1" + val table = "avg_null_handling" withTable(table) { sql(s"create table $table(a double, b double) using parquet") sql(s"insert into $table values(1, 1.0)") From 880328b2487a3e96a894cc5ca53ba0a0d007027b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 7 Feb 2025 10:45:33 -0800 Subject: [PATCH 9/9] address feedback --- .../scala/org/apache/comet/exec/CometAggregateSuite.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 8e0ef777a..3215a984e 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -52,6 +52,10 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withTable(tableName) { val table = spark.read.parquet(filename).coalesce(1) table.createOrReplaceTempView(tableName) + // we fall back to Spark for avg on decimal due to the following issue + // https://github.com/apache/datafusion-comet/issues/1371 + // once this is fixed, we should change this test to + // checkSparkAnswerAndNumOfAggregates checkSparkAnswer(s"SELECT c1, avg(c7) FROM $tableName GROUP BY c1 ORDER BY c1") } }
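
As background on the "There can be rounding differences" note introduced in PATCH 2/9: the divergence typically comes from how a float is widened before being quantized to the target decimal scale. The standalone Scala sketch below is illustrative only; it does not reproduce the actual Spark or DataFusion cast code paths, and the object name FloatToDecimalRoundingSketch is made up for this example. It shows how the same float can yield different decimals depending on whether the conversion goes through the widened double value or through the float's shortest string form.

import java.math.{BigDecimal => JBigDecimal, RoundingMode}

object FloatToDecimalRoundingSketch {
  def main(args: Array[String]): Unit = {
    val f = 1.1f
    // Widening first: 1.1f as a double is 1.10000002384185791015625, so the
    // extra binary digits survive into the decimal before rounding is applied.
    val viaDouble = new JBigDecimal(f.toDouble).setScale(8, RoundingMode.HALF_UP)
    // Going through the float's shortest decimal string ("1.1") instead.
    val viaString = new JBigDecimal(f.toString).setScale(8, RoundingMode.HALF_UP)
    println(s"via double: $viaDouble") // prints 1.10000002
    println(s"via string: $viaString") // prints 1.10000000
  }
}

Until https://github.com/apache/datafusion-comet/issues/1371 is resolved, float/double to decimal casts are only executed natively when CometConf.COMET_CAST_ALLOW_INCOMPATIBLE is enabled (as done via withSQLConf in the cast and aggregate tests above); otherwise the expression is expected to fall back to Spark, which is what the comment added to the avg-decimal test in PATCH 9/9 refers to.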