diff --git a/core/pom.xml b/core/pom.xml
index a5f2e756da0..93159995bac 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -112,7 +112,7 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
-      <version>4.1.17.Final</version>
+      <version>4.1.74.Final</version>
     </dependency>
     <dependency>
       <groupId>org.lz4</groupId>
diff --git a/examples/flink/pom.xml b/examples/flink/pom.xml
index 360fcff72b6..904b84d9f15 100644
--- a/examples/flink/pom.xml
+++ b/examples/flink/pom.xml
@@ -54,7 +54,7 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
-      <version>4.1.17.Final</version>
+      <version>4.1.74.Final</version>
     </dependency>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
diff --git a/examples/spark/pom.xml b/examples/spark/pom.xml
index 64f0d9f20c4..fc5ce57ce67 100644
--- a/examples/spark/pom.xml
+++ b/examples/spark/pom.xml
@@ -63,7 +63,7 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
-      <version>4.1.17.Final</version>
+      <version>4.1.74.Final</version>
     </dependency>
     <dependency>
       <groupId>org.alluxio</groupId>
@@ -200,9 +200,6 @@
       <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
       <properties>
         <spark.binary.version>2.3</spark.binary.version>
@@ -214,10 +211,13 @@
-      <id>spark-3.1</id>
+      <id>spark-3.3</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
-        <spark.binary.version>3.1</spark.binary.version>
-        <dep.jackson.version>2.10.0</dep.jackson.version>
+        <spark.binary.version>3.3</spark.binary.version>
+        <dep.jackson.version>2.13.3</dep.jackson.version>
diff --git a/index/examples/pom.xml b/index/examples/pom.xml
index bccbae1bc39..b84aa468044 100644
--- a/index/examples/pom.xml
+++ b/index/examples/pom.xml
@@ -92,9 +92,9 @@
-      <id>spark-3.1</id>
+      <id>spark-3.3</id>
       <properties>
-        <spark.binary.version>3.1</spark.binary.version>
+        <spark.binary.version>3.3</spark.binary.version>
diff --git a/index/secondary-index/pom.xml b/index/secondary-index/pom.xml
index b1a2414ffc3..b404ef36aa1 100644
--- a/index/secondary-index/pom.xml
+++ b/index/secondary-index/pom.xml
@@ -158,9 +158,6 @@
       <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
       <properties>
         <spark.binary.version>2.3</spark.binary.version>
@@ -172,9 +169,12 @@
-      <id>spark-3.1</id>
+      <id>spark-3.3</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
-        <spark.binary.version>3.1</spark.binary.version>
+        <spark.binary.version>3.3</spark.binary.version>
diff --git a/integration/flink/pom.xml b/integration/flink/pom.xml
index 5eca6fbbdc0..a420c2e4dfd 100644
--- a/integration/flink/pom.xml
+++ b/integration/flink/pom.xml
@@ -212,7 +212,7 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
-      <version>4.1.17.Final</version>
+      <version>4.1.74.Final</version>
       <scope>test</scope>
@@ -220,9 +220,6 @@
       <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
       <properties>
         <spark.binary.version>2.3</spark.binary.version>
@@ -262,9 +259,12 @@
-      <id>spark-3.1</id>
+      <id>spark-3.3</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
-        <spark.binary.version>3.1</spark.binary.version>
+        <spark.binary.version>3.3</spark.binary.version>
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index b317060f5f5..693ba8f4711 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -34,7 +34,7 @@
     <httpcore.version>4.4.9</httpcore.version>
     <dev.path>${basedir}/../../dev</dev.path>
     <jacoco.append>true</jacoco.append>
-    <dep.jackson.version>2.10.0</dep.jackson.version>
+    <dep.jackson.version>2.13.3</dep.jackson.version>
     <presto.version>0.193</presto.version>
diff --git a/integration/spark-common-cluster-test/pom.xml b/integration/spark-common-cluster-test/pom.xml
index c520ef73dde..9811518fd77 100644
--- a/integration/spark-common-cluster-test/pom.xml
+++ b/integration/spark-common-cluster-test/pom.xml
@@ -65,13 +65,13 @@
       <groupId>io.netty</groupId>
       <artifactId>netty</artifactId>
-      <version>3.9.9.Final</version>
+      <version>4.1.74.Final</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
-      <version>4.1.17.Final</version>
+      <version>4.1.74.Final</version>
       <scope>test</scope>
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
index 6d278fcec12..c9bdfea4dba 100644
--- a/integration/spark/pom.xml
+++ b/integration/spark/pom.xml
@@ -608,9 +608,6 @@
       <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
       <properties>
         <spark.binary.version>2.3</spark.binary.version>
@@ -621,9 +618,9 @@
            <artifactId>maven-compiler-plugin</artifactId>
-                <exclude>src/main/spark3.1</exclude>
+                <exclude>src/main/spark3.3</exclude>
                 <exclude>src/main/spark2.4</exclude>
-                <exclude>src/main/common2.4and3.1</exclude>
+                <exclude>src/main/common2.4and3.3</exclude>
@@ -662,7 +659,7 @@
            <artifactId>maven-compiler-plugin</artifactId>
-                <exclude>src/main/spark3.1</exclude>
+                <exclude>src/main/spark3.3</exclude>
                 <exclude>src/main/spark2.3</exclude>
@@ -682,7 +679,7 @@
                <source>src/main/spark2.4</source>
                <source>src/main/common2.3and2.4</source>
-                <source>src/main/common2.4and3.1</source>
+                <source>src/main/common2.4and3.3</source>
@@ -692,9 +689,12 @@
-      <id>spark-3.1</id>
+      <id>spark-3.3</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
-        <spark.binary.version>3.1</spark.binary.version>
+        <spark.binary.version>3.3</spark.binary.version>
@@ -722,8 +722,8 @@
-                <source>src/main/spark3.1</source>
-                <source>src/main/common2.4and3.1</source>
+                <source>src/main/spark3.3</source>
+                <source>src/main/common2.4and3.3</source>
diff --git a/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala b/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala
index e5cb5e0b13b..61e9a556b53 100644
--- a/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala
+++ b/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala
@@ -18,35 +18,39 @@
package org.apache.spark.sql
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import org.antlr.v4.runtime.tree.TerminalNode
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener
import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSeq, Expression, InterpretedPredicate, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSeq, Expression, InterpretedPredicate, NamedExpression, SortOrder, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, QueryContext, SkewSpecContext, TablePropertyListContext}
import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, InsertIntoTable, Join, LogicalPlan, OneRowRelation}
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.{QueryExecution, ShuffledRowRDD, SparkPlan, SQLExecution, UnaryExecNode}
-import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand, DataWritingCommand, ExplainCommand, Field, MetadataCommand, PartitionerField, RunnableCommand, ShowPartitionsCommand, TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy, RefreshTable}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceStrategy, FilePartition, FileScanRDD, OutputWriter, PartitionedFile, RefreshTable}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
import org.apache.spark.sql.internal.{SessionState, SharedState}
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil.{checkIfDuplicateColumnExists, convertDbNameToLowerCase, validateStreamingProperty}
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{DataType, StructField}
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.TaskCompletionListener
import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -57,6 +61,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.SchemaReader
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.mv.plans.modular.ModularPlan
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.util.CarbonScalaUtil
@@ -451,6 +456,56 @@ trait SparkVersionAdapter {
case others => others
}
}
+
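+  // Version shim: rebuilds Spark's ShowPartitionsCommand with the resolved partition
+  // spec while keeping the original command's table name.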
+ def showPartitionsCommand(spec: Option[TablePartitionSpec],
+ showPartitionsCommand: ShowPartitionsCommand): ShowPartitionsCommand = {
+ ShowPartitionsCommand(showPartitionsCommand.tableName, spec)
+ }
+
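+  // Version shim around DataSource.writeAndRead; several parameters are unused here and
+  // appear only to keep the signature aligned with the Spark 3.x adapter (an assumption,
+  // based on the unused arguments).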
+ def invokeWriteAndReadMethod(dataSourceObj: DataSource,
+ dataFrame: DataFrame,
+ data: LogicalPlan,
+ session: SparkSession,
+ mode: SaveMode,
+ query: LogicalPlan,
+ physicalPlan: SparkPlan): BaseRelation = {
+ dataSourceObj.writeAndRead(mode, query, query.output.map(_.name), physicalPlan)
+ }
+
+ /**
+ * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
+ *
+ * @param ctx Instance of TablePropertyListContext defining parser rule for the table
+ * properties.
+ * @param props
diff --git a/mv/plan/pom.xml b/mv/plan/pom.xml
-      <id>spark-3.1</id>
+      <id>spark-3.3</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
-        <spark.binary.version>3.1</spark.binary.version>
+        <spark.binary.version>3.3</spark.binary.version>
@@ -240,7 +240,7 @@
                 <exclude>src/main/spark2.3</exclude>
                 <exclude>src/main/spark2.4</exclude>
-                <exclude>src/main/common2.3and2.4</exclude>
+                <exclude>src/main/common2.3and2.4</exclude>
@@ -257,7 +257,7 @@
-                <source>src/main/spark3.1</source>
+                <source>src/main/spark3.3</source>
diff --git a/mv/plan/src/main/common2.3and2.4/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala b/mv/plan/src/main/common2.3and2.4/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala
index 78002f1198a..3cd8974a505 100644
--- a/mv/plan/src/main/common2.3and2.4/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala
+++ b/mv/plan/src/main/common2.3and2.4/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala
@@ -17,7 +17,7 @@
package org.apache.carbondata.mv.plans.modular
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, Expression, ExprId, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, Exists, Expression, ExprId, ListQuery, NamedExpression, Predicate, SubqueryExpression, Unevaluable}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, CollapseProject, CollapseRepartition, CollapseWindow, ColumnPruning, CombineFilters, CombineLimits, CombineUnions, ConstantFolding, EliminateOuterJoin, EliminateSerialization, EliminateSorts, FoldablePropagation, NullPropagation, PushDownPredicate, PushPredicateThroughJoin, PushProjectionThroughUnion, RemoveDispensableExpressions, RemoveRedundantAliases, RemoveRedundantProject, ReorderAssociativeOperator, ReorderJoin, RewriteCorrelatedScalarSubquery, SimplifyBinaryComparison, SimplifyCaseConversionExpressions, SimplifyCasts, SimplifyConditionals}
import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan}
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, LogicalPla
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.{DataType, Metadata}
+import org.apache.carbondata.mv.expressions.modular.ModularSubquery
import org.apache.carbondata.mv.plans.util.BirdcageOptimizer
@@ -114,6 +115,15 @@ trait UnionModularPlan extends ModularPlan {
trait OneRowTableLeafNode extends LeafNode {
}
+trait ModularRelationHarmonizedRelation extends GetVerboseString {
+}
+
+trait MVModularRelation extends GetVerboseString {
+}
+
+trait MVModularizeLater extends LeafNode {
+}
+
object MatchJoin {
def unapply(plan : LogicalPlan): Option[(LogicalPlan, LogicalPlan, JoinType, Option[Expression],
Option[Any])] = {
@@ -135,3 +145,101 @@ object MatchAggregateExpression {
}
}
}
+
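+// The modular subquery expressions below were moved out of
+// mv/expressions/modular/subquery.scala so that each Spark version tree carries its own
+// copy; the Spark 3.3 variants must additionally override withNewChildrenInternal.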
+/**
+ * The [[Exists]] expression checks if a row exists in a subquery given some correlated condition.
+ *
+ * For example (SQL):
+ * {{{
+ * SELECT *
+ * FROM a
+ * WHERE EXISTS (SELECT *
+ * FROM b
+ * WHERE b.id = a.id)
+ * }}}
+ */
+case class ModularExists(
+ plan: ModularPlan,
+ children: Seq[Expression] = Seq.empty,
+ exprId: ExprId = NamedExpression.newExprId)
+ extends ModularSubquery(plan, children, exprId) with Predicate with Unevaluable {
+ override def nullable: Boolean = false
+
+ override def withNewPlan(plan: ModularPlan): ModularExists = copy(plan = plan)
+
+ override def toString: String = s"modular-exists#${exprId.id} $conditionString"
+
+ override lazy val canonicalized: Expression = {
+ ModularExists(
+ plan.canonicalizedDef,
+ children.map(_.canonicalized),
+ ExprId(0))
+ }
+}
+
+/**
+ * A [[ListQuery]] expression defines the query which we want to search in an IN subquery
+ * expression. It should and can only be used in conjunction with an IN expression.
+ *
+ * For example (SQL):
+ * {{{
+ * SELECT *
+ * FROM a
+ * WHERE a.id IN (SELECT id
+ * FROM b)
+ * }}}
+ */
+case class ModularListQuery(
+ plan: ModularPlan,
+ children: Seq[Expression] = Seq.empty,
+ exprId: ExprId = NamedExpression.newExprId)
+ extends ModularSubquery(plan, children, exprId) with Unevaluable {
+ override def dataType: DataType = plan.schema.fields.head.dataType
+
+ override def nullable: Boolean = false
+
+ override def withNewPlan(plan: ModularPlan): ModularListQuery = copy(plan = plan)
+
+ override def toString: String = s"modular-list#${exprId.id} $conditionString"
+
+ override lazy val canonicalized: Expression = {
+ ModularListQuery(
+ plan.canonicalizedDef,
+ children.map(_.canonicalized),
+ ExprId(0))
+ }
+}
+
+/**
+ * A subquery that will return only one row and one column. This will be converted into a physical
+ * scalar subquery during planning.
+ *
+ * Note: `exprId` is used to have a unique name in explain string output.
+ */
+case class ScalarModularSubquery(
+ plan: ModularPlan,
+ children: Seq[Expression] = Seq.empty,
+ exprId: ExprId = NamedExpression.newExprId)
+ extends ModularSubquery(plan, children, exprId) with Unevaluable {
+ override def dataType: DataType = plan.schema.fields.head.dataType
+
+ override def nullable: Boolean = true
+
+ override def withNewPlan(plan: ModularPlan): ScalarModularSubquery = copy(plan = plan)
+
+ override def toString: String = s"scalar-modular-subquery#${ exprId.id } $conditionString"
+
+ override lazy val canonicalized: Expression = {
+ ScalarModularSubquery(
+ plan.canonicalizedDef,
+ children.map(_.canonicalized),
+ ExprId(0))
+ }
+
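+  // Moved from the former ScalarModularSubquery companion object (removed from
+  // subquery.scala in this change); detects a correlated scalar subquery in `e`.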
+ def hasCorrelatedScalarSubquery(e: Expression): Boolean = {
+ e.find {
+ case s: ScalarModularSubquery => s.children.nonEmpty
+ case _ => false
+ }.isDefined
+ }
+}
diff --git a/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala b/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala
index 43551bba3c2..a04308dd349 100644
--- a/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala
+++ b/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala
@@ -39,16 +39,6 @@ abstract class ModularSubquery(
override def withNewPlan(plan: ModularPlan): ModularSubquery
- override def semanticEquals(o: Expression): Boolean = {
- o match {
- case p: ModularSubquery =>
- this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
- children.length == p.children.length &&
- children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
- case _ => false
- }
- }
-
def canonicalize(attrs: AttributeSeq): ModularSubquery = {
// Normalize the outer references in the subquery plan.
val normalizedPlan = plan.transformAllExpressions {
@@ -59,105 +49,6 @@ abstract class ModularSubquery(
}
}
-/**
- * A subquery that will return only one row and one column. This will be converted into a physical
- * scalar subquery during planning.
- *
- * Note: `exprId` is used to have a unique name in explain string output.
- */
-case class ScalarModularSubquery(
- plan: ModularPlan,
- children: Seq[Expression] = Seq.empty,
- exprId: ExprId = NamedExpression.newExprId)
- extends ModularSubquery(plan, children, exprId) with Unevaluable {
- override def dataType: DataType = plan.schema.fields.head.dataType
-
- override def nullable: Boolean = true
-
- override def withNewPlan(plan: ModularPlan): ScalarModularSubquery = copy(plan = plan)
-
- override def toString: String = s"scalar-modular-subquery#${ exprId.id } $conditionString"
-
- override lazy val canonicalized: Expression = {
- ScalarModularSubquery(
- plan.canonicalizedDef,
- children.map(_.canonicalized),
- ExprId(0))
- }
-}
-
-object ScalarModularSubquery {
- def hasCorrelatedScalarSubquery(e: Expression): Boolean = {
- e.find {
- case s: ScalarModularSubquery => s.children.nonEmpty
- case _ => false
- }.isDefined
- }
-}
-
-/**
- * A [[ListQuery]] expression defines the query which we want to search in an IN subquery
- * expression. It should and can only be used in conjunction with an IN expression.
- *
- * For example (SQL):
- * {{{
- * SELECT *
- * FROM a
- * WHERE a.id IN (SELECT id
- * FROM b)
- * }}}
- */
-case class ModularListQuery(
- plan: ModularPlan,
- children: Seq[Expression] = Seq.empty,
- exprId: ExprId = NamedExpression.newExprId)
- extends ModularSubquery(plan, children, exprId) with Unevaluable {
- override def dataType: DataType = plan.schema.fields.head.dataType
-
- override def nullable: Boolean = false
-
- override def withNewPlan(plan: ModularPlan): ModularListQuery = copy(plan = plan)
-
- override def toString: String = s"modular-list#${ exprId.id } $conditionString"
-
- override lazy val canonicalized: Expression = {
- ModularListQuery(
- plan.canonicalizedDef,
- children.map(_.canonicalized),
- ExprId(0))
- }
-}
-
-/**
- * The [[Exists]] expression checks if a row exists in a subquery given some correlated condition.
- *
- * For example (SQL):
- * {{{
- * SELECT *
- * FROM a
- * WHERE EXISTS (SELECT *
- * FROM b
- * WHERE b.id = a.id)
- * }}}
- */
-case class ModularExists(
- plan: ModularPlan,
- children: Seq[Expression] = Seq.empty,
- exprId: ExprId = NamedExpression.newExprId)
- extends ModularSubquery(plan, children, exprId) with Predicate with Unevaluable {
- override def nullable: Boolean = false
-
- override def withNewPlan(plan: ModularPlan): ModularExists = copy(plan = plan)
-
- override def toString: String = s"modular-exists#${ exprId.id } $conditionString"
-
- override lazy val canonicalized: Expression = {
- ModularExists(
- plan.canonicalizedDef,
- children.map(_.canonicalized),
- ExprId(0))
- }
-}
/**
* A place holder for generated SQL for subquery expression.
diff --git a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
index cbcd4248466..1cd8da6c4d3 100644
--- a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
+++ b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
@@ -128,7 +128,9 @@ trait AggregatePushDown { // self: ModularPlan =>
} else {
Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
}
-      case sum@MatchAggregateExpression(Sum(cast@MatchCast(expr, dataType)), _, false, _, _) =>
+      // Spark 3.3's Sum takes an additional constructor argument, matched here with `_`.
+      case sum@MatchAggregateExpression(
+          Sum(cast@MatchCast(expr, dataType), _), _, false, _, _) =>
val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
.asInstanceOf[Attribute]
if (fact.outputSet.contains(tAttr)) {
@@ -190,7 +192,8 @@ trait AggregatePushDown { // self: ModularPlan =>
} else {
Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
}
-      case avg@MatchAggregateExpression(Average(cast@MatchCast(expr, dataType)), _, false, _, _) =>
+      case avg@MatchAggregateExpression(
+          Average(cast@MatchCast(expr, dataType), _), _, false, _, _) =>
val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
.asInstanceOf[Attribute]
if (fact.outputSet.contains(tAttr)) {
diff --git a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
index 5fbfb9b6c7d..72a138b2d0f 100644
--- a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
+++ b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
@@ -71,7 +71,7 @@ abstract class ModularPattern extends GenericPattern[ModularPlan] {
override protected def modularizeLater(plan: LogicalPlan): ModularPlan = ModularizeLater(plan)
}
-case class ModularizeLater(plan: LogicalPlan) extends LeafNode {
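+// MVModularizeLater is a per-version trait: the Spark 3.3 copy adds the
+// withNewChildrenInternal override required since Spark 3.2.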
+case class ModularizeLater(plan: LogicalPlan) extends MVModularizeLater {
override def output: Seq[Attribute] = plan.output
}
diff --git a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
index ba9eb4ba3ae..6df266acd10 100644
--- a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
+++ b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
@@ -41,7 +41,7 @@ case class ModularRelation(
tableName: String,
outputList: Seq[NamedExpression],
flags: FlagSet,
- rest: Seq[Seq[Any]]) extends GetVerboseString {
+ rest: Seq[Seq[Any]]) extends MVModularRelation {
protected override def computeStats(spark: SparkSession): Statistics = {
val plan = spark.table(s"${ databaseName }.${ tableName }").queryExecution.optimizedPlan
val stats = plan.stats
@@ -125,7 +125,7 @@ object HarmonizedRelation {
}
// support harmonization for dimension table
-case class HarmonizedRelation(source: ModularPlan) extends GetVerboseString {
+case class HarmonizedRelation(source: ModularPlan) extends ModularRelationHarmonizedRelation {
require(HarmonizedRelation.canHarmonize(source), "invalid plan for harmonized relation")
lazy val tableName = source.asInstanceOf[GroupBy].child.children(0).asInstanceOf[ModularRelation]
.tableName
diff --git a/mv/plan/src/main/spark3.1/org/apache/carbondata/mv/plans/modular/ExpressionHelper.scala b/mv/plan/src/main/spark3.3/org/apache/carbondata/mv/plans/modular/ExpressionHelper.scala
similarity index 100%
rename from mv/plan/src/main/spark3.1/org/apache/carbondata/mv/plans/modular/ExpressionHelper.scala
rename to mv/plan/src/main/spark3.3/org/apache/carbondata/mv/plans/modular/ExpressionHelper.scala
diff --git a/mv/plan/src/main/spark3.1/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala b/mv/plan/src/main/spark3.3/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala
similarity index 57%
rename from mv/plan/src/main/spark3.1/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala
rename to mv/plan/src/main/spark3.3/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala
index e52efc9d688..8d0b9a44d8d 100644
--- a/mv/plan/src/main/spark3.1/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala
+++ b/mv/plan/src/main/spark3.3/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala
@@ -19,13 +19,15 @@ package org.apache.carbondata.mv.plans.modular
import scala.reflect.ClassTag
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeSeq, Expression, ExprId, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeSeq, Exists, Expression, ExprId, ListQuery, NamedExpression, Predicate, SubqueryExpression, Unevaluable}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, AggregateMode}
import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, CollapseProject, CollapseRepartition, CollapseWindow, ColumnPruning, CombineFilters, CombineUnions, ConstantFolding, EliminateLimits, EliminateOuterJoin, EliminateSerialization, EliminateSorts, FoldablePropagation, NullPropagation, PushDownPredicates, PushPredicateThroughJoin, PushProjectionThroughUnion, RemoveDispensableExpressions, RemoveRedundantAliases, ReorderAssociativeOperator, ReorderJoin, RewriteCorrelatedScalarSubquery, SimplifyBinaryComparison, SimplifyCaseConversionExpressions, SimplifyCasts, SimplifyConditionals}
import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, LogicalPlan, Statistics, Subquery}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.DataType
+import org.apache.carbondata.mv.expressions.modular.ModularSubquery
import org.apache.carbondata.mv.plans.util.BirdcageOptimizer
object SparkVersionHelper {
@@ -116,6 +118,9 @@ trait GroupByUnaryNode extends UnaryNode {
}
groupBy
}
+
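+  // Required since Spark 3.2: every concrete TreeNode must implement
+  // withNewChildrenInternal; these shims return the single replacement child.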
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[ModularPlan]): ModularPlan = newChildren.head
}
trait SelectModularPlan extends ModularPlan {
@@ -132,14 +137,38 @@ trait SelectModularPlan extends ModularPlan {
}
select
}
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[ModularPlan]): ModularPlan = newChildren.head
}
trait UnionModularPlan extends ModularPlan {
override def verboseString(maxFields: Int): String = super.verboseString(maxFields)
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[ModularPlan]): ModularPlan = newChildren.head
}
trait OneRowTableLeafNode extends LeafNode {
override def verboseString(maxFields: Int): String = super.verboseString(maxFields)
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[ModularPlan]): ModularPlan = newChildren.head
+}
+
+trait ModularRelationHarmonizedRelation extends GetVerboseString {
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[ModularPlan]): ModularPlan = newChildren.head
+}
+
+trait MVModularRelation extends GetVerboseString {
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[ModularPlan]): ModularPlan = newChildren.head
+}
+
+trait MVModularizeLater extends LeafNode {
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[ModularPlan]): ModularPlan = newChildren.head
}
object MatchJoin {
@@ -164,3 +193,110 @@ object MatchAggregateExpression {
}
}
}
+
+/**
+ * The [[Exists]] expression checks if a row exists in a subquery given some correlated condition.
+ *
+ * For example (SQL):
+ * {{{
+ * SELECT *
+ * FROM a
+ * WHERE EXISTS (SELECT *
+ * FROM b
+ * WHERE b.id = a.id)
+ * }}}
+ */
+case class ModularExists(
+ plan: ModularPlan,
+ children: Seq[Expression] = Seq.empty,
+ exprId: ExprId = NamedExpression.newExprId)
+ extends ModularSubquery(plan, children, exprId) with Predicate with Unevaluable {
+ override def nullable: Boolean = false
+
+ override def withNewPlan(plan: ModularPlan): ModularExists = copy(plan = plan)
+
+ override def toString: String = s"modular-exists#${ exprId.id } $conditionString"
+
+ override lazy val canonicalized: Expression = {
+ ModularExists(
+ plan.canonicalizedDef,
+ children.map(_.canonicalized),
+ ExprId(0))
+ }
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): Expression = newChildren.head
+}
+
+/**
+ * A [[ListQuery]] expression defines the query which we want to search in an IN subquery
+ * expression. It should and can only be used in conjunction with an IN expression.
+ *
+ * For example (SQL):
+ * {{{
+ * SELECT *
+ * FROM a
+ * WHERE a.id IN (SELECT id
+ * FROM b)
+ * }}}
+ */
+case class ModularListQuery(
+ plan: ModularPlan,
+ children: Seq[Expression] = Seq.empty,
+ exprId: ExprId = NamedExpression.newExprId)
+ extends ModularSubquery(plan, children, exprId) with Unevaluable {
+ override def dataType: DataType = plan.schema.fields.head.dataType
+
+ override def nullable: Boolean = false
+
+ override def withNewPlan(plan: ModularPlan): ModularListQuery = copy(plan = plan)
+
+ override def toString: String = s"modular-list#${ exprId.id } $conditionString"
+
+ override lazy val canonicalized: Expression = {
+ ModularListQuery(
+ plan.canonicalizedDef,
+ children.map(_.canonicalized),
+ ExprId(0))
+ }
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): Expression = newChildren.head
+}
+
+/**
+ * A subquery that will return only one row and one column. This will be converted into a physical
+ * scalar subquery during planning.
+ *
+ * Note: `exprId` is used to have a unique name in explain string output.
+ */
+case class ScalarModularSubquery(
+ plan: ModularPlan,
+ children: Seq[Expression] = Seq.empty,
+ exprId: ExprId = NamedExpression.newExprId)
+ extends ModularSubquery(plan, children, exprId) with Unevaluable {
+ override def dataType: DataType = plan.schema.fields.head.dataType
+
+ override def nullable: Boolean = true
+
+ override def withNewPlan(plan: ModularPlan): ScalarModularSubquery = copy(plan = plan)
+
+ override def toString: String = s"scalar-modular-subquery#${ exprId.id } $conditionString"
+
+ override lazy val canonicalized: Expression = {
+ ScalarModularSubquery(
+ plan.canonicalizedDef,
+ children.map(_.canonicalized),
+ ExprId(0))
+ }
+
+ def hasCorrelatedScalarSubquery(e: Expression): Boolean = {
+ e.find {
+ case s: ScalarModularSubquery => s.children.nonEmpty
+ case _ => false
+ }.isDefined
+ }
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): Expression = newChildren.head
+}
diff --git a/pom.xml b/pom.xml
index 69dca97237d..57fcbde6f30 100644
--- a/pom.xml
+++ b/pom.xml
@@ -126,11 +126,11 @@
     <hadoop.version>2.7.2</hadoop.version>
     <httpclient.version>4.3.4</httpclient.version>
     <httpcore.version>4.3-alpha1</httpcore.version>
-    <scala.binary.version>2.11</scala.binary.version>
-    <scala.version>2.11.8</scala.version>
+    <scala.binary.version>2.12</scala.binary.version>
+    <scala.version>2.12.8</scala.version>
     <hadoop.deps.scope>compile</hadoop.deps.scope>
-    <spark.version>2.3.4</spark.version>
-    <spark.binary.version>2.3</spark.binary.version>
+    <spark.version>3.3.0</spark.version>
+    <spark.binary.version>3.3</spark.binary.version>
     <antlr4.version>4.8</antlr4.version>
     <spark.deps.scope>compile</spark.deps.scope>
     <scala.deps.scope>compile</scala.deps.scope>
@@ -454,7 +454,7 @@
            <directory>${basedir}/src/main/scala</directory>
            <directory>${basedir}/src/main/spark${spark.binary.version}</directory>
            <directory>${basedir}/src/main/common2.3and2.4</directory>
-           <directory>${basedir}/src/main/common2.4and3.1</directory>
+           <directory>${basedir}/src/main/common2.4and3.3</directory>
            <directory>${basedir}/src/test/scala</directory>
           <configLocation>scalastyle-config.xml</configLocation>
@@ -574,9 +574,6 @@
       <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
       <properties>
         <spark.binary.version>2.3</spark.binary.version>
         <spark.version>2.3.4</spark.version>
@@ -705,7 +702,7 @@
            <directory>${basedir}/integration/spark/src/main/scala</directory>
            <directory>${basedir}/integration/spark/src/main/spark2.4</directory>
            <directory>${basedir}/integration/spark/src/main/common2.3and2.4</directory>
-           <directory>${basedir}/integration/spark/src/main/common2.4and3.1</directory>
+           <directory>${basedir}/integration/spark/src/main/common2.4and3.3</directory>
            <directory>${basedir}/integration/spark/src/main/java</directory>
            <directory>${basedir}/integration/hive/src/main/scala</directory>
            <directory>${basedir}/integration/hive/src/main/java</directory>
@@ -723,13 +720,16 @@
-      <id>spark-3.1</id>
+      <id>spark-3.3</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
-        <spark.binary.version>3.1</spark.binary.version>
-        <spark.version>3.1.1</spark.version>
+        <spark.binary.version>3.3</spark.binary.version>
+        <spark.version>3.3.0</spark.version>
         <scala.binary.version>2.12</scala.binary.version>
         <scala.version>2.12.8</scala.version>
-        <dep.jackson.version>2.10.0</dep.jackson.version>
+        <dep.jackson.version>2.13.3</dep.jackson.version>
@@ -778,8 +778,8 @@
            <directory>${basedir}/processing/src/main/java</directory>
            <directory>${basedir}/hadoop/src/main/java</directory>
            <directory>${basedir}/integration/spark/src/main/scala</directory>
-           <directory>${basedir}/integration/spark/src/main/spark3.1</directory>
-           <directory>${basedir}/integration/spark/src/main/common2.4and3.1</directory>
+           <directory>${basedir}/integration/spark/src/main/spark3.3</directory>
+           <directory>${basedir}/integration/spark/src/main/common2.4and3.3</directory>
            <directory>${basedir}/integration/spark/src/main/java</directory>
            <directory>${basedir}/integration/hive/src/main/scala</directory>
            <directory>${basedir}/integration/hive/src/main/java</directory>
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 3889bf1b80e..a7885ca9c6f 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -134,9 +134,6 @@
       <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
       <properties>
         <spark.binary.version>2.3</spark.binary.version>
@@ -147,7 +144,7 @@
            <artifactId>maven-compiler-plugin</artifactId>
-                <exclude>src/main/spark3.1</exclude>
+                <exclude>src/main/spark3.3</exclude>
@@ -185,7 +182,7 @@
            <artifactId>maven-compiler-plugin</artifactId>
-                <exclude>src/main/spark3.1</exclude>
+                <exclude>src/main/spark3.3</exclude>
@@ -212,10 +209,13 @@
-      <id>spark-3.1</id>
+      <id>spark-3.3</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
-        <spark.version>3.1.1</spark.version>
-        <spark.binary.version>3.1</spark.binary.version>
+        <spark.version>3.3.0</spark.version>
+        <spark.binary.version>3.3</spark.binary.version>
@@ -241,7 +241,7 @@
-                <source>src/main/spark3.1</source>
+                <source>src/main/spark3.3</source>
diff --git a/streaming/src/main/spark3.1/org/apache/carbondata/util/SparkStreamingUtil.scala b/streaming/src/main/spark3.3/org/apache/carbondata/util/SparkStreamingUtil.scala
similarity index 100%
rename from streaming/src/main/spark3.1/org/apache/carbondata/util/SparkStreamingUtil.scala
rename to streaming/src/main/spark3.3/org/apache/carbondata/util/SparkStreamingUtil.scala