14 changes: 7 additions & 7 deletions pom.xml
@@ -738,14 +738,14 @@ under the License.
<artifactId>scala-maven-plugin</artifactId>
<configuration>
<args>
<arg>-deprecation</arg>
<arg>-unchecked</arg>
<arg>-feature</arg>
<arg>-Xlint:_</arg>
<arg>-Ywarn-dead-code</arg>
<!-- <arg>-deprecation</arg>-->
Contributor: Are these intended changes down here?

<!-- <arg>-unchecked</arg>-->
<!-- <arg>-feature</arg>-->
<!-- <arg>-Xlint:_</arg>-->
<!-- <arg>-Ywarn-dead-code</arg>-->
<arg>-Ywarn-numeric-widen</arg>
<arg>-Ywarn-value-discard</arg>
<arg>-Ywarn-unused:imports,patvars,privates,locals,params,-implicits</arg>
<!-- <arg>-Ywarn-value-discard</arg>-->
<!-- <arg>-Ywarn-unused:imports,patvars,privates,locals,params,-implicits</arg>-->
<arg>-Xfatal-warnings</arg>
</args>
</configuration>
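A brief aside on why the rest of this diff is full of explicit `.toInt`/`.toLong` calls: with `-Ywarn-numeric-widen` enabled and `-Xfatal-warnings` promoting warnings to errors, any place where the compiler would silently widen a numeric type (for example an `Int` passed where a `Long` is expected) no longer compiles. A minimal sketch, using a made-up `Metric` class rather than Spark's `SQLMetric`:

```scala
object NumericWideningSketch {
  // Stand-in for an accumulator whose += takes a Long, like a SQL metric.
  final class Metric {
    private var total = 0L
    def +=(v: Long): Unit = total += v
    def value: Long = total
  }

  def main(args: Array[String]): Unit = {
    val numOutputRows = new Metric
    val batchRows: Int = 42

    // numOutputRows += batchRows
    //   ^ under -Ywarn-numeric-widen this implicit Int -> Long widening is
    //     reported, and -Xfatal-warnings turns it into a compile error.

    numOutputRows += batchRows.toLong // the pattern applied throughout this PR
    println(numOutputRows.value)
  }
}
```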
@@ -80,8 +80,8 @@ object SourceFilterSerde extends Logging {
// refer to org.apache.spark.sql.catalyst.CatalystTypeConverters.CatalystTypeConverter#toScala
dataType match {
case _: BooleanType => exprBuilder.setBoolVal(value.asInstanceOf[Boolean])
case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte])
case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short])
case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte].toInt)
case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short].toInt)
case _: IntegerType => exprBuilder.setIntVal(value.asInstanceOf[Int])
case _: LongType => exprBuilder.setLongVal(value.asInstanceOf[Long])
case _: FloatType => exprBuilder.setFloatVal(value.asInstanceOf[Float])
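The `.toInt` calls above are needed because, as far as I can tell, the generated builder setters take an `Int`: protobuf has no 8- or 16-bit scalar types, so the byte/short fields would be `int32`, and the old code relied on the implicit `Byte`/`Short` to `Int` widening that `-Ywarn-numeric-widen` now flags. A hedged sketch with a hypothetical builder (the real setter comes from Comet's generated protobuf code):

```scala
object SetterWideningSketch extends App {
  // Hypothetical stand-in for the protobuf-generated literal builder.
  final class LiteralBuilder {
    private var byteVal: Int = 0
    def setByteVal(v: Int): LiteralBuilder = { byteVal = v; this }
  }

  val exprBuilder = new LiteralBuilder
  val value: Any = 7.toByte

  // Before: setByteVal(value.asInstanceOf[Byte]) compiled only via implicit
  // Byte -> Int widening, which -Ywarn-numeric-widen now reports.
  // After: the conversion is explicit, so the build stays warning-free.
  exprBuilder.setByteVal(value.asInstanceOf[Byte].toInt)
}
```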
4 changes: 2 additions & 2 deletions spark/src/main/scala/org/apache/comet/serde/literals.scala
@@ -72,8 +72,8 @@ object CometLiteral extends CometExpressionSerde[Literal] with Logging {
exprBuilder.setIsNull(false)
dataType match {
case _: BooleanType => exprBuilder.setBoolVal(value.asInstanceOf[Boolean])
case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte])
case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short])
case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte].toInt)
case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short].toInt)
case _: IntegerType | _: DateType => exprBuilder.setIntVal(value.asInstanceOf[Int])
case _: LongType | _: TimestampType | _: TimestampNTZType =>
exprBuilder.setLongVal(value.asInstanceOf[Long])
@@ -66,7 +66,7 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres

override def next(): ColumnarBatch = {
val batch = batches.next()
numOutputRows += batch.numRows()
numOutputRows += batch.numRows().toLong
batch
}
}
@@ -146,7 +146,7 @@ case class CometBroadcastExchangeExec(
longMetric("numOutputRows") += numRows
if (numRows >= maxBroadcastRows) {
throw QueryExecutionErrors.cannotBroadcastTableOverMaxTableRowsError(
maxBroadcastRows,
maxBroadcastRows.toLong,
numRows)
}

@@ -88,7 +88,7 @@ case class CometCollectLimitExec(
outputPartitioning,
serializer,
metrics)
metrics("numPartitions").set(dep.partitioner.numPartitions)
metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)

new CometShuffledBatchRDD(dep, readMetrics)
}
@@ -82,7 +82,7 @@ case class CometColumnarToRowExec(child: SparkPlan)
val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
batches.flatMap { batch =>
numInputBatches += 1
numOutputRows += batch.numRows()
numOutputRows += batch.numRows().toLong
batch.rowIterator().asScala.map(toUnsafe)
}
}
@@ -120,7 +120,7 @@ case class CometColumnarToRowExec(child: SparkPlan)
.flatMap(CometUtils.decodeBatches(_, this.getClass.getSimpleName))
.flatMap { batch =>
numInputBatches += 1
numOutputRows += batch.numRows()
numOutputRows += batch.numRows().toLong
batch.rowIterator().asScala.map(toUnsafe)
}

@@ -242,7 +242,7 @@ case class CometScanExec(
driverMetrics("staticFilesSize") = filesSize
}
if (relation.partitionSchema.nonEmpty) {
driverMetrics("numPartitions") = partitions.length
driverMetrics("numPartitions") = partitions.length.toLong
}
}

@@ -284,7 +284,7 @@

override def next(): ColumnarBatch = {
val batch = batches.next()
numOutputRows += batch.numRows()
numOutputRows += batch.numRows().toLong
batch
}
}
@@ -83,7 +83,7 @@ case class CometSparkToColumnarExec(child: SparkPlan)
val startNs = System.nanoTime()
val batch = iter.next()
conversionTime += System.nanoTime() - startNs
numInputRows += batch.numRows()
numInputRows += batch.numRows().toLong
numOutputBatches += 1
batch
}
@@ -123,7 +123,7 @@ case class CometSparkToColumnarExec(child: SparkPlan)
CometArrowConverters.rowToArrowBatchIter(
sparkBatches,
schema,
maxRecordsPerBatch,
maxRecordsPerBatch.toLong,
timeZoneId,
context)
createTimingIter(arrowBatches, numInputRows, numOutputBatches, conversionTime)
@@ -96,7 +96,7 @@ case class CometTakeOrderedAndProjectExec(
outputPartitioning,
serializer,
metrics)
metrics("numPartitions").set(dep.partitioner.numPartitions)
metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)

new CometShuffledBatchRDD(dep, readMetrics)
}
@@ -111,7 +111,7 @@ class CometBlockStoreShuffleReader[K, C](
// Update the context task metrics for each record read.
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
recordIter.map { record =>
readMetrics.incRecordsRead(record._2.numRows())
readMetrics.incRecordsRead(record._2.numRows().toLong)
record
},
context.taskMetrics().mergeShuffleReadMetrics())
@@ -137,7 +137,7 @@ case class CometShuffleExchangeExec(
outputPartitioning,
serializer,
metrics)
metrics("numPartitions").set(dep.partitioner.numPartitions)
metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(
sparkContext,
@@ -151,7 +151,7 @@ case class CometShuffleExchangeExec(
outputPartitioning,
serializer,
metrics)
metrics("numPartitions").set(dep.partitioner.numPartitions)
metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(
sparkContext,
@@ -385,7 +385,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
// end up being almost the same regardless of the index. substantially scrambling the
// seed by hashing will help. Refer to SPARK-21782 for more details.
val partitionId = TaskContext.get().partitionId()
var position = new XORShiftRandom(partitionId).nextInt(numPartitions)
var position = new XORShiftRandom(partitionId.toLong).nextInt(numPartitions)
(_: InternalRow) => {
// The HashPartitioner will handle the `mod` by the number of partitions
position += 1
@@ -432,7 +432,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
row: InternalRow): UnsafeExternalRowSorter.PrefixComputer.Prefix = {
// The hashcode generated from the binary form of a [[UnsafeRow]] should not be null.
result.isNull = false
result.value = row.hashCode()
result.value = row.hashCode().toLong
result
}
}
@@ -1162,7 +1162,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = {
val div = if (useDictionary) 5 else num // narrow the space to make it dictionary encoded
spark
.range(num)
.range(num.toLong)
.map(_ % div)
// Parquet doesn't allow column names with spaces, have to add an alias here.
// Minus 500 here so that negative decimals are also tested.
@@ -1801,7 +1801,7 @@ class CometExecSuite extends CometTestBase {
withTable("t1") {
val numRows = 10
spark
.range(numRows)
.range(numRows.toLong)
.selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b")
.repartition(3) // Force repartition to test data will come to single partition
.write
@@ -1838,7 +1838,7 @@ class CometExecSuite extends CometTestBase {
withTable("t1") {
val numRows = 10
spark
.range(numRows)
.range(numRows.toLong)
.selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b")
.repartition(3) // Force repartition to test data will come to single partition
.write
@@ -1869,7 +1869,7 @@ class CometExecSuite extends CometTestBase {
withTable("t1") {
val numRows = 10
spark
.range(numRows)
.range(numRows.toLong)
.selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b")
.repartition(3) // Force repartition to test data will come to single partition
.write
@@ -416,15 +416,15 @@ abstract class ParquetReadSuite extends CometTestBase {
opt match {
case Some(i) =>
record.add(0, i % 2 == 0)
record.add(1, i.toByte)
record.add(2, i.toShort)
record.add(1, i.toByte.toInt)
Contributor: Can this be cast directly to Int instead?

Contributor: I believe the overflow behaviour would be different and lead to unexpected and incorrect results.

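To illustrate the point in that exchange (a standalone sketch, not code from the PR): `.toByte.toInt` first wraps the value into the signed byte range and then widens it back, while passing the `Int` directly would skip the wrap-around, so the generated test data would differ.

```scala
object ByteWrapAroundSketch extends App {
  val i = 300 // hypothetical value outside the Byte range

  // What the test keeps doing: truncate to a Byte (wraps modulo 256, signed),
  // then widen back to Int for the Int-typed Group.add overload.
  val truncatedThenWidened: Int = i.toByte.toInt // 44

  // Using the Int directly would skip the wrap-around and change the data.
  val direct: Int = i // 300

  println(s"$truncatedThenWidened vs $direct")
}
```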
record.add(2, i.toShort.toInt)
record.add(3, i)
record.add(4, i.toLong)
record.add(5, i.toFloat)
record.add(6, i.toDouble)
record.add(7, i.toString * 48)
record.add(8, (-i).toByte)
record.add(9, (-i).toShort)
record.add(8, (-i).toByte.toInt)
record.add(9, (-i).toShort.toInt)
record.add(10, -i)
record.add(11, (-i).toLong)
record.add(12, i.toString)
@@ -639,8 +639,8 @@ abstract class ParquetReadSuite extends CometTestBase {
opt match {
case Some(i) =>
record.add(0, i % 2 == 0)
record.add(1, i.toByte)
record.add(2, i.toShort)
record.add(1, i.toByte.toInt)
record.add(2, i.toShort.toInt)
record.add(3, i)
record.add(4, i.toLong)
record.add(5, i.toFloat)
@@ -1575,15 +1575,15 @@ abstract class ParquetReadSuite extends CometTestBase {
opt match {
case Some(i) =>
record.add(0, i % 2 == 0)
record.add(1, i.toByte)
record.add(2, i.toShort)
record.add(1, i.toByte.toInt)
record.add(2, i.toShort.toInt)
record.add(3, i)
record.add(4, i.toLong)
record.add(5, i.toFloat)
record.add(6, i.toDouble)
record.add(7, i.toString * 48)
record.add(8, (-i).toByte)
record.add(9, (-i).toShort)
record.add(8, (-i).toByte.toInt)
record.add(9, (-i).toShort.toInt)
record.add(10, -i)
record.add(11, (-i).toLong)
record.add(12, i.toString)
@@ -1672,7 +1672,7 @@ abstract class ParquetReadSuite extends CometTestBase {
val record = new SimpleGroup(schema)
opt match {
case Some(i) =>
record.add(0, i.toShort)
record.add(0, i.toShort.toInt)
record.add(1, i)
record.add(2, i.toLong)
case _ =>
@@ -1765,7 +1765,7 @@ abstract class ParquetReadSuite extends CometTestBase {
}

private def withId(id: Int) =
new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id.toLong).build()

// Based on Spark ParquetIOSuite.test("vectorized reader: array of nested struct")
test("array of nested struct with and without field id") {
31 changes: 16 additions & 15 deletions spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql

import java.util.concurrent.atomic.AtomicInteger

import scala.annotation.nowarn
import scala.concurrent.duration._
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
@@ -425,6 +426,7 @@ abstract class CometTestBase
case None => f(spark.read.format("parquet").load(path))
}

@nowarn("cat=deprecation")
protected def createParquetWriter(
schema: MessageType,
path: Path,
@@ -434,7 +436,6 @@
pageRowCountLimit: Int = ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT,
rowGroupSize: Long = 1024 * 1024L): ParquetWriter[Group] = {
val hadoopConf = spark.sessionState.newHadoopConf()

ExampleParquetWriter
.builder(path)
.withDictionaryEncoding(dictionaryEnabled)
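For readers unfamiliar with the annotation added above: `@nowarn("cat=deprecation")` suppresses deprecation warnings for just this method, which matters now that `-Xfatal-warnings` would otherwise fail the build (presumably the Parquet writer-builder API used here is deprecated). A minimal sketch of the mechanism, with made-up names:

```scala
import scala.annotation.nowarn

object NowarnSketch {
  @deprecated("use newWriter instead", since = "0.1")
  def oldWriter(): Unit = ()

  // Only deprecation warnings inside this method are suppressed; any other
  // warning in the file still becomes an error under -Xfatal-warnings.
  @nowarn("cat=deprecation")
  def createWriter(): Unit = oldWriter()
}
```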
@@ -557,15 +558,15 @@ abstract class CometTestBase
opt match {
case Some(i) =>
record.add(0, i % 2 == 0)
record.add(1, i.toByte)
record.add(2, i.toShort)
record.add(1, i.toByte.toInt)
Contributor: Same here; it can probably be converted directly.

record.add(2, i.toShort.toInt)
record.add(3, i)
record.add(4, i.toLong)
record.add(5, i.toFloat)
record.add(6, i.toDouble)
record.add(7, i.toString * 48)
record.add(8, (-i).toByte)
record.add(9, (-i).toShort)
record.add(8, (-i).toByte.toInt)
record.add(9, (-i).toShort.toInt)
record.add(10, -i)
record.add(11, (-i).toLong)
record.add(12, i.toString)
@@ -586,15 +587,15 @@ abstract class CometTestBase
val i = rand.nextLong()
val record = new SimpleGroup(schema)
record.add(0, i % 2 == 0)
record.add(1, i.toByte)
record.add(2, i.toShort)
record.add(1, i.toByte.toInt)
record.add(2, i.toShort.toInt)
record.add(3, i.toInt)
record.add(4, i)
record.add(5, java.lang.Float.intBitsToFloat(i.toInt))
record.add(6, java.lang.Double.longBitsToDouble(i))
record.add(7, i.toString * 24)
record.add(8, (-i).toByte)
record.add(9, (-i).toShort)
record.add(8, (-i).toByte.toInt)
record.add(9, (-i).toShort.toInt)
record.add(10, (-i).toInt)
record.add(11, -i)
record.add(12, i.toString)
@@ -643,7 +644,7 @@ abstract class CometTestBase
if (rand.nextBoolean()) {
None
} else {
Some(getValue(i, div))
Some(getValue(i.toLong, div.toLong))
}
}
expected.foreach { opt =>
@@ -697,7 +698,7 @@ abstract class CometTestBase
if (rand.nextBoolean()) {
None
} else {
Some(getValue(i, div))
Some(getValue(i.toLong, div.toLong))
}
}
expected.foreach { opt =>
@@ -875,7 +876,7 @@ abstract class CometTestBase
val div = if (dictionaryEnabled) 10 else n // maps value to a small range for dict to kick in

val expected = (0 until n).map { i =>
Some(getValue(i, div))
Some(getValue(i.toLong, div.toLong))
}
expected.foreach { opt =>
val timestampFormats = List(
@@ -923,7 +924,7 @@ abstract class CometTestBase
def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = {
val div = if (useDictionary) 5 else num // narrow the space to make it dictionary encoded
spark
.range(num)
.range(num.toLong)
.map(_ % div)
// Parquet doesn't allow column names with spaces, have to add an alias here.
// Minus 500 here so that negative decimals are also tested.
@@ -1103,8 +1104,8 @@ abstract class CometTestBase
val record = new SimpleGroup(schema)
opt match {
case Some(i) =>
record.add(0, i.toByte)
record.add(1, i.toShort)
record.add(0, i.toByte.toInt)
Contributor: Same.

record.add(1, i.toShort.toInt)
record.add(2, i)
record.add(3, i.toLong)
record.add(4, rand.nextFloat())