[WIP] Upgrade to Spark 3.3.0 #4283

Open. Wants to merge 4 commits into base: master.
core/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -112,7 +112,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.17.Final</version>
<version>4.1.74.Final</version>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
examples/flink/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -54,7 +54,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.17.Final</version>
<version>4.1.74.Final</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
examples/spark/pom.xml (14 changes: 7 additions & 7 deletions)
@@ -63,7 +63,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.17.Final</version>
<version>4.1.74.Final</version>
</dependency>
<dependency>
<groupId>org.alluxio</groupId>
@@ -200,9 +200,6 @@
</profile>
<profile>
<id>spark-2.3</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.binary.version>2.3</spark.binary.version>
</properties>
@@ -214,10 +211,13 @@
</properties>
</profile>
<profile>
<id>spark-3.1</id>
<id>spark-3.3</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.binary.version>3.1</spark.binary.version>
<dep.jackson.version>2.10.0</dep.jackson.version>
<spark.binary.version>3.3</spark.binary.version>
<dep.jackson.version>2.13.3</dep.jackson.version>
</properties>
</profile>
</profiles>
index/examples/pom.xml (4 changes: 2 additions & 2 deletions)
@@ -92,9 +92,9 @@
</properties>
</profile>
<profile>
<id>spark-3.1</id>
<id>spark-3.3</id>
<properties>
<spark.binary.version>3.1</spark.binary.version>
<spark.binary.version>3.3</spark.binary.version>
</properties>
</profile>
</profiles>
index/secondary-index/pom.xml (10 changes: 5 additions & 5 deletions)
@@ -158,9 +158,6 @@
</profile>
<profile>
<id>spark-2.3</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.binary.version>2.3</spark.binary.version>
</properties>
@@ -172,9 +169,12 @@
</properties>
</profile>
<profile>
<id>spark-3.1</id>
<id>spark-3.3</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.binary.version>3.1</spark.binary.version>
<spark.binary.version>3.3</spark.binary.version>
</properties>
</profile>
</profiles>
integration/flink/pom.xml (12 changes: 6 additions & 6 deletions)
@@ -212,17 +212,14 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.17.Final</version>
<version>4.1.74.Final</version>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
<profile>
<id>spark-2.3</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.binary.version>2.3</spark.binary.version>
</properties>
@@ -262,9 +259,12 @@
</dependencies>
</profile>
<profile>
<id>spark-3.1</id>
<id>spark-3.3</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.binary.version>3.1</spark.binary.version>
<spark.binary.version>3.3</spark.binary.version>
</properties>
<dependencies>
<dependency>
integration/presto/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -34,7 +34,7 @@
<httpcore.version>4.4.9</httpcore.version>
<dev.path>${basedir}/../../dev</dev.path>
<jacoco.append>true</jacoco.append>
<jackson.core.version>2.10.0</jackson.core.version>
<jackson.core.version>2.13.3</jackson.core.version>
<airlift.bootstrap.version>0.193</airlift.bootstrap.version>
</properties>

integration/spark-common-cluster-test/pom.xml (4 changes: 2 additions & 2 deletions)
@@ -65,13 +65,13 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.9.9.Final</version>
<version>4.1.74.Final</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.17.Final</version>
<version>4.1.74.Final</version>
<scope>test</scope>
</dependency>
<dependency>
integration/spark/pom.xml (22 changes: 11 additions & 11 deletions)
@@ -608,9 +608,6 @@
</profile>
<profile>
<id>spark-2.3</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.binary.version>2.3</spark.binary.version>
</properties>
@@ -621,9 +618,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/main/spark3.1</exclude>
<exclude>src/main/spark3.3</exclude>
<exclude>src/main/spark2.4</exclude>
<exclude>src/main/common2.4and3.1</exclude>
<exclude>src/main/common2.4and3.3</exclude>
</excludes>
</configuration>
</plugin>
@@ -662,7 +659,7 @@
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/main/spark3.1</exclude>
<exclude>src/main/spark3.3</exclude>
<exclude>src/main/spark2.3</exclude>
</excludes>
</configuration>
@@ -682,7 +679,7 @@
<sources>
<source>src/main/spark2.4</source>
<source>src/main/common2.3and2.4</source>
<source>src/main/common2.4and3.1</source>
<source>src/main/common2.4and3.3</source>
</sources>
</configuration>
</execution>
@@ -692,9 +689,12 @@
</build>
</profile>
<profile>
<id>spark-3.1</id>
<id>spark-3.3</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.binary.version>3.1</spark.binary.version>
<spark.binary.version>3.3</spark.binary.version>
</properties>
<build>
<plugins>
@@ -722,8 +722,8 @@
</goals>
<configuration>
<sources>
<source>src/main/spark3.1</source>
<source>src/main/common2.4and3.1</source>
<source>src/main/spark3.3</source>
<source>src/main/common2.4and3.3</source>
</sources>
</configuration>
</execution>
@@ -18,35 +18,39 @@
package org.apache.spark.sql

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.antlr.v4.runtime.tree.TerminalNode
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener
import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSeq, Expression, InterpretedPredicate, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSeq, Expression, InterpretedPredicate, NamedExpression, SortOrder, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, QueryContext, SkewSpecContext, TablePropertyListContext}
import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan, OneRowRelation}
import org.apache.spark.sql.catalyst.plans.logical.{Command, InsertIntoTable, Join, LogicalPlan, OneRowRelation}
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.{QueryExecution, ShuffledRowRDD, SparkPlan, SQLExecution, UnaryExecNode}
import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand, DataWritingCommand, ExplainCommand, Field, MetadataCommand, PartitionerField, RunnableCommand, ShowPartitionsCommand, TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy, RefreshTable}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceStrategy, FilePartition, FileScanRDD, OutputWriter, PartitionedFile, RefreshTable}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
import org.apache.spark.sql.internal.{SessionState, SharedState}
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil.{checkIfDuplicateColumnExists, convertDbNameToLowerCase, validateStreamingProperty}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{DataType, StructField}
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.TaskCompletionListener

import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -57,6 +61,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.SchemaReader
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.mv.plans.modular.ModularPlan
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.util.CarbonScalaUtil

@@ -451,6 +456,56 @@ trait SparkVersionAdapter {
case others => others
}
}

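  // Rebuilds Spark's ShowPartitionsCommand, carrying over the table name from the parsed
  // command and applying the optional partition spec.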
def showPartitionsCommand(spec: Option[TablePartitionSpec],
showPartitionsCommand: ShowPartitionsCommand): ShowPartitionsCommand = {
ShowPartitionsCommand(showPartitionsCommand.tableName, spec)
}

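  // Delegates a CTAS-style write to DataSource.writeAndRead. Several parameters
  // (dataFrame, data, session) are unused here; they appear to exist only so the
  // adapter keeps the same signature across the per-Spark-version source trees.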
def invokeWriteAndReadMethod(dataSourceObj: DataSource,
dataFrame: DataFrame,
data: LogicalPlan,
session: SparkSession,
mode: SaveMode,
query: LogicalPlan,
physicalPlan: SparkPlan): BaseRelation = {
dataSourceObj.writeAndRead(mode, query, query.output.map(_.name), physicalPlan)
}

/**
* Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
*
* @param ctx Instance of TablePropertyListContext defining the parser rule for the
*            table properties.
* @param props Map of table properties parsed from the property list.
* @return Map of table properties with keys normalised to lower case.
*/
def visitPropertyKeyValues(ctx: TablePropertyListContext,
props: Map[String, String]): Map[String, String] = {
val badKeys = props.filter { case (_, v) => v == null }.keys
if (badKeys.nonEmpty) {
operationNotAllowed(
s"Values must be specified for key(s): ${ badKeys.mkString("[", ",", "]") }", ctx)
}
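    // Carbon treats table property keys case-insensitively, so normalise them to lower case.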
props.map { case (key, value) =>
(key.toLowerCase, value)
}
}

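  // Builds a FileScanRDD from the given read function and partitions; kept in the adapter
  // so that differences in the FileScanRDD constructor between Spark versions stay out of
  // the common code.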
def getFileScanRDD(spark: SparkSession, readFunction: PartitionedFile => Iterator[InternalRow],
partitions: ArrayBuffer[FilePartition]) : FileScanRDD = {
new FileScanRDD(spark, readFunction, partitions)
}

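  // Reflectively reads TaskContext's private onCompleteCallbacks field to check whether a
  // CarbonLoadTaskCompletionListener is already registered on the running task.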
def check(context: TaskContext): Boolean = {
val onCompleteCallbacksField =
context.getClass.getDeclaredField("onCompleteCallbacks")
onCompleteCallbacksField.setAccessible(true)
val listeners = onCompleteCallbacksField.get(context)
.asInstanceOf[ArrayBuffer[TaskCompletionListener]]
listeners.exists(p => p.isInstanceOf[CarbonLoadTaskCompletionListener])
}

}

case class CarbonBuildSide(buildSide: BuildSide) {
@@ -468,3 +523,43 @@ abstract class CarbonTakeOrderedAndProjectExecHelper(sortOrder: Seq[SortOrder],
s"skipMapOrder=$skipMapOrder, readFromHead=$readFromHead, output=$outputString)"
}
}

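// Marker traits and base classes that bind Carbon's command, plan and expression
// abstractions to the corresponding Spark 3.3 types; the other sparkX.Y source trees
// presumably provide matching definitions so the common code compiles unchanged.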
trait CarbonMergeIntoSQLCommandCarbon extends AtomicRunnableCommand {
}

trait MvPlanWrapperCarbon extends ModularPlan {
}

trait CarbonProjectForUpdate extends LogicalPlan {
}

trait CarbonUpdateTable extends LogicalPlan {
}

trait CarbonDeleteRecords extends LogicalPlan {
}

trait CarbonInsertIntoCarbonTable extends Command {
}

trait CarbonCustomDeterministicExpression extends Expression {
}

abstract class OutputWriterCarbon(paths: String) extends OutputWriter {
}

trait CarbonCommands extends MetadataCommand{
}

trait CarbonAtomicRunnableCommands extends AtomicRunnableCommand{
}

trait CarbonDataCommands extends DataCommand {
}

trait CarbonDataWritingCommand extends DataWritingCommand {
}

trait CarbonUnaryExpression extends UnaryExpression {
}

@@ -86,7 +86,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
*/
override def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
val props = visitTablePropertyList(ctx)
CarbonSparkSqlParserUtil.visitPropertyKeyValues(ctx, props)
CarbonToSparkAdapter.visitPropertyKeyValues(ctx, props)
}

def getPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String]
@@ -18,15 +18,14 @@
package org.apache.spark.sql.execution

import org.apache.log4j.Logger
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.{AnalysisException, CarbonRunnableCommand, Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.util.CreateTableCommonUtil.getCatalogTable

import org.apache.carbondata.common.logging.LogServiceFactory

case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
extends RunnableCommand {
extends CarbonRunnableCommand {

val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
