Commit d5b35d8

Revert "[CARBONDATA-3514] Support Spark 2.4.4 integration"

This reverts commit ba35a02.

1 parent: 1a03b2d

File tree: 82 files changed, +2548 -858 lines


README.md (+2 -2)

@@ -28,8 +28,8 @@ Visit count: [![HitCount](http://hits.dwyl.io/jackylk/apache/carbondata.svg)](ht


 ## Status
-Spark2.3:
-[![Build Status](https://builds.apache.org/buildStatus/icon?job=carbondata-master-spark-2.3)](https://builds.apache.org/view/A-D/view/CarbonData/job/carbondata-master-spark-2.2/lastBuild/testReport)
+Spark2.2:
+[![Build Status](https://builds.apache.org/buildStatus/icon?job=carbondata-master-spark-2.2)](https://builds.apache.org/view/A-D/view/CarbonData/job/carbondata-master-spark-2.2/lastBuild/testReport)
 [![Coverage Status](https://coveralls.io/repos/github/apache/carbondata/badge.svg?branch=master)](https://coveralls.io/github/apache/carbondata?branch=master)
 <a href="https://scan.coverity.com/projects/carbondata">
 <img alt="Coverity Scan Build Status"

build/README.md (+5 -3)

@@ -25,9 +25,11 @@
 * [Apache Thrift 0.9.3](http://archive.apache.org/dist/thrift/0.9.3/)

 ## Build command
-Build with different supported versions of Spark, by default using Spark 2.4.4
+Build with different supported versions of Spark, by default using Spark 2.2.1 to build
 ```
-mvn -DskipTests -Pspark-2.4 clean package
+mvn -DskipTests -Pspark-2.1 -Dspark.version=2.1.0 clean package
+mvn -DskipTests -Pspark-2.2 -Dspark.version=2.2.1 clean package
+mvn -DskipTests -Pspark-2.3 -Dspark.version=2.3.2 clean package
 ```

 Note:
@@ -37,5 +39,5 @@ Note:
 ## For contributors : To build the format code after any changes, please follow the below command.
 Note:Need install Apache Thrift 0.9.3
 ```
-mvn clean -DskipTests -Pbuild-with-format -Pspark-2.4 package
+mvn clean -DskipTests -Pbuild-with-format -Pspark-2.2 package
 ```

core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java (+8 -9)

@@ -27,11 +27,11 @@
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;

 import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
-import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -42,15 +42,12 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.log4j.Logger;

 /**
  * Stores datamap schema in disk as json format
  */
 public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStorageProvider {

-  private Logger LOG = LogServiceFactory.getLogService(this.getClass().getCanonicalName());
-
   private String storePath;

   private String mdtFilePath;
@@ -174,15 +171,17 @@ public void dropSchema(String dataMapName)
     if (!FileFactory.isFileExist(schemaPath)) {
       throw new IOException("DataMap with name " + dataMapName + " does not exists in storage");
     }
-
-    LOG.info(String.format("Trying to delete DataMap %s schema", dataMapName));
-
-    dataMapSchemas.removeIf(schema -> schema.getDataMapName().equalsIgnoreCase(dataMapName));
+    Iterator<DataMapSchema> iterator = dataMapSchemas.iterator();
+    while (iterator.hasNext()) {
+      DataMapSchema schema = iterator.next();
+      if (schema.getDataMapName().equalsIgnoreCase(dataMapName)) {
+        iterator.remove();
+      }
+    }
     touchMDTFile();
     if (!FileFactory.deleteFile(schemaPath)) {
       throw new IOException("DataMap with name " + dataMapName + " cannot be deleted");
     }
-    LOG.info(String.format("DataMap %s schema is deleted", dataMapName));
   }

   private void checkAndReloadDataMapSchemas(boolean touchFile) throws IOException {
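
Note: the dropSchema hunk above swaps Java 8's Collection.removeIf for an explicit Iterator loop. A minimal, self-contained sketch of that removal idiom, written in Scala against the java.util collections; the List[String] element type and the dropByName name are stand-ins for illustration, not CarbonData code:

```scala
import java.util

object IteratorRemoveSketch {
  // Remove every entry matching `name`, case-insensitively, while iterating.
  // Iterator.remove() is the safe way to mutate the backing list mid-iteration.
  def dropByName(schemas: util.List[String], name: String): Unit = {
    val iterator = schemas.iterator()
    while (iterator.hasNext) {
      if (iterator.next().equalsIgnoreCase(name)) {
        iterator.remove()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val schemas = new util.ArrayList[String](util.Arrays.asList("dm1", "DM1", "dm2"))
    dropByName(schemas, "dm1")
    println(schemas) // prints [dm2]
  }
}
```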

datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala (+14 -15)

@@ -373,8 +373,7 @@ object MVHelper {

   def updateColumnName(attr: Attribute, counter: Int): String = {
     val name = getUpdatedName(attr.name, counter)
-    val value = attr.qualifier.map(qualifier => qualifier + "_" + name)
-    if (value.nonEmpty) value.head else name
+    attr.qualifier.map(qualifier => qualifier + "_" + name).getOrElse(name)
   }

   def getTables(logicalPlan: LogicalPlan): Seq[CatalogTable] = {
@@ -474,7 +473,7 @@ object MVHelper {
   }

   def createAttrReference(ref: NamedExpression, name: String): Alias = {
-    CarbonToSparkAdapter.createAliasRef(ref, name, exprId = ref.exprId)
+    Alias(ref, name)(exprId = ref.exprId, qualifier = None)
   }

   case class AttributeKey(exp: Expression) {
@@ -538,13 +537,13 @@ object MVHelper {
       case attr: AttributeReference =>
         val uattr = attrMap.get(AttributeKey(attr)).map{a =>
           if (keepAlias) {
-            CarbonToSparkAdapter.createAttributeReference(
-              name = a.name,
-              dataType = a.dataType,
-              nullable = a.nullable,
-              metadata = a.metadata,
-              exprId = a.exprId,
-              qualifier = attr.qualifier)
+            CarbonToSparkAdapter.createAttributeReference(a.name,
+              a.dataType,
+              a.nullable,
+              a.metadata,
+              a.exprId,
+              attr.qualifier,
+              a)
           } else {
             a
           }
@@ -576,9 +575,9 @@ object MVHelper {
       outputSel.zip(subsumerOutputList).map{ case (l, r) =>
         l match {
           case attr: AttributeReference =>
-            CarbonToSparkAdapter.createAliasRef(attr, r.name, r.exprId)
+            Alias(attr, r.name)(r.exprId, None)
           case a@Alias(attr: AttributeReference, name) =>
-            CarbonToSparkAdapter.createAliasRef(attr, r.name, r.exprId)
+            Alias(attr, r.name)(r.exprId, None)
           case other => other
         }
       }
@@ -595,13 +594,13 @@ object MVHelper {
       val uattr = attrMap.get(AttributeKey(attr)).map{a =>
         if (keepAlias) {
           CarbonToSparkAdapter
-            .createAttributeReference(
-              a.name,
+            .createAttributeReference(a.name,
               a.dataType,
               a.nullable,
               a.metadata,
               a.exprId,
-              attr.qualifier)
+              attr.qualifier,
+              a)
         } else {
           a
         }

datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala (+8 -10)

@@ -125,18 +125,17 @@ class MVUtil {
         arrayBuffer += relation
       }
       var qualifier: Option[String] = None
-      if (attr.qualifier.nonEmpty) {
-        qualifier = if (attr.qualifier.headOption.get.startsWith("gen_sub")) {
+      if (attr.qualifier.isDefined) {
+        qualifier = if (attr.qualifier.get.startsWith("gen_sub")) {
           Some(carbonTable.getTableName)
         } else {
-          attr.qualifier.headOption
+          attr.qualifier
         }
       }
       fieldToDataMapFieldMap +=
-        getFieldToDataMapFields(
-          attr.name,
+        getFieldToDataMapFields(attr.name,
           attr.dataType,
-          qualifier.headOption,
+          qualifier,
           "",
           arrayBuffer,
           carbonTable.getTableName)
@@ -249,8 +248,7 @@ class MVUtil {
   /**
    * Below method will be used to get the fields object for mv table
    */
-  private def getFieldToDataMapFields(
-      name: String,
+  private def getFieldToDataMapFields(name: String,
       dataType: DataType,
       qualifier: Option[String],
       aggregateType: String,
@@ -315,7 +313,7 @@ class MVUtil {
     val updatedOutList = outputList.map { col =>
       val duplicateColumn = duplicateNameCols
         .find(a => a.semanticEquals(col))
-      val qualifiedName = col.qualifier.headOption.getOrElse(s"${ col.exprId.id }") + "_" + col.name
+      val qualifiedName = col.qualifier.getOrElse(s"${ col.exprId.id }") + "_" + col.name
       if (duplicateColumn.isDefined) {
         val attributesOfDuplicateCol = duplicateColumn.get.collect {
           case a: AttributeReference => a
@@ -331,7 +329,7 @@ class MVUtil {
           attributeOfCol.exists(a => a.semanticEquals(expr)))
         if (!isStrictDuplicate) {
           Alias(col, qualifiedName)(exprId = col.exprId)
-        } else if (col.qualifier.nonEmpty) {
+        } else if (col.qualifier.isDefined) {
           Alias(col, qualifiedName)(exprId = col.exprId)
           // this check is added in scenario where the column is direct Attribute reference and
           // since duplicate columns select is allowed, we should just put alias for those columns
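
Note: the .headOption reversions in this file (and in MVHelper.scala above) all trace back to one API difference: Catalyst's attribute qualifier is a Seq[String] in Spark 2.4 but an Option[String] in Spark 2.2 and earlier. A standalone sketch of the two shapes; the values below are hypothetical stand-ins for attr.qualifier, not Spark code:

```scala
object QualifierShapeSketch {
  def main(args: Array[String]): Unit = {
    val qualifier24: Seq[String] = Seq("tbl")     // Spark 2.4 shape
    val qualifier22: Option[String] = Some("tbl") // Spark <= 2.2 shape

    // Against a Seq, code must take the head defensively...
    val prefix24 = qualifier24.headOption.map(_ + ".").getOrElse("")
    // ...against an Option it can call Option methods directly,
    // which is exactly what the reverted lines go back to.
    val prefix22 = qualifier22.map(_ + ".").getOrElse("")

    assert(prefix24 == prefix22) // both are "tbl."
  }
}
```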

datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala (+3 -7)

@@ -18,12 +18,11 @@

 package org.apache.carbondata.mv.rewrite

-import org.apache.spark.sql.CarbonToSparkAdapter
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter}
-import org.apache.spark.sql.types.{DataType, Metadata}
+import org.apache.spark.sql.types.DataType

 import org.apache.carbondata.mv.datamap.MVHelper
 import org.apache.carbondata.mv.plans.modular.{JoinEdge, Matchable, ModularPlan, _}
@@ -96,12 +95,9 @@ abstract class DefaultMatchPattern extends MatchPattern[ModularPlan] {
     // Replace all compensation1 attributes with refrences of subsumer attributeset
     val compensationFinal = compensation1.transformExpressions {
       case ref: Attribute if subqueryAttributeSet.contains(ref) =>
-        CarbonToSparkAdapter.createAttributeReference(
-          ref.name, ref.dataType, nullable = true, metadata = Metadata.empty,
-          exprId = ref.exprId, qualifier = subsumerName)
+        AttributeReference(ref.name, ref.dataType)(exprId = ref.exprId, qualifier = subsumerName)
       case alias: Alias if subqueryAttributeSet.contains(alias.toAttribute) =>
-        CarbonToSparkAdapter.createAliasRef(
-          alias.child, alias.name, alias.exprId, subsumerName)
+        Alias(alias.child, alias.name)(exprId = alias.exprId, qualifier = subsumerName)
     }
     compensationFinal
   } else {
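
Note: the restored AttributeReference(...)(...) and Alias(...)(...) calls rely on Scala's multiple parameter lists; in Spark 2.2's Catalyst, exprId and qualifier sit in a second, mostly-defaulted argument list, so the CarbonToSparkAdapter indirection is unnecessary on that branch. A self-contained sketch of the construction pattern; RefSketch is a stand-in shape for illustration, not the real Catalyst class:

```scala
// Mimics the two-parameter-list shape of Catalyst's AttributeReference:
// identity-like fields live in the second list, which case-class equality
// ignores (only the first parameter list participates in equals/hashCode).
case class RefSketch(name: String, dataType: String)(
    val exprId: Long = -1L,
    val qualifier: Option[String] = None)

object RefSketchDemo {
  def main(args: Array[String]): Unit = {
    // Mirrors AttributeReference(ref.name, ref.dataType)(exprId = ..., qualifier = ...)
    val ref = RefSketch("price", "decimal")(exprId = 7L, qualifier = Some("subsumer"))
    println(s"${ref.qualifier.map(_ + ".").getOrElse("")}${ref.name}") // subsumer.price
  }
}
```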

datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala (+13 -14)

@@ -17,11 +17,10 @@

 package org.apache.carbondata.mv.plans.modular

-import org.apache.spark.sql.{CarbonToSparkAdapter, SQLConf}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.types.Metadata
+import org.apache.spark.sql.SQLConf

 import org.apache.carbondata.mv.plans
 import org.apache.carbondata.mv.plans._
@@ -199,28 +198,28 @@ object HarmonizeFactTable extends Rule[ModularPlan] with PredicateHelper with Ag
       .isInstanceOf[Attribute]))
     val aggOutputList = aggTransMap.values.flatMap(t => t._2)
       .map { ref =>
-        CarbonToSparkAdapter.createAttributeReference(
-          ref.name, ref.dataType, nullable = true, Metadata.empty,
-          ref.exprId, Some(hFactName))
+        AttributeReference(ref.name, ref.dataType)(
+          exprId = ref.exprId,
+          qualifier = Some(hFactName))
       }
     val hFactOutputSet = hFact.outputSet
     // Update the outputlist qualifier
     val hOutputList = (attrOutputList ++ aggOutputList).map {attr =>
       attr.transform {
         case ref: Attribute if hFactOutputSet.contains(ref) =>
-          CarbonToSparkAdapter.createAttributeReference(
-            ref.name, ref.dataType, nullable = true, Metadata.empty,
-            ref.exprId, Some(hFactName))
+          AttributeReference(ref.name, ref.dataType)(
+            exprId = ref.exprId,
+            qualifier = Some(hFactName))
       }
     }.asInstanceOf[Seq[NamedExpression]]

     // Update the predicate qualifier
     val hPredList = s.predicateList.map{ pred =>
       pred.transform {
         case ref: Attribute if hFactOutputSet.contains(ref) =>
-          CarbonToSparkAdapter.createAttributeReference(
-            ref.name, ref.dataType, nullable = true, Metadata.empty,
-            ref.exprId, Some(hFactName))
+          AttributeReference(ref.name, ref.dataType)(
+            exprId = ref.exprId,
+            qualifier = Some(hFactName))
       }
     }
     val hSel = s.copy(
@@ -242,9 +241,9 @@ object HarmonizeFactTable extends Rule[ModularPlan] with PredicateHelper with Ag
     val wip = g.copy(outputList = gOutputList, inputList = hInputList, child = hSel)
     wip.transformExpressions {
       case ref: Attribute if hFactOutputSet.contains(ref) =>
-        CarbonToSparkAdapter.createAttributeReference(
-          ref.name, ref.dataType, nullable = true, Metadata.empty,
-          ref.exprId, Some(hFactName))
+        AttributeReference(ref.name, ref.dataType)(
+          exprId = ref.exprId,
+          qualifier = Some(hFactName))
     }
   }
 }

datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala (+2 -4)

@@ -52,11 +52,9 @@ object SimpleModularizer extends ModularPatterns {
     plan transform {
       case g@GroupBy(_, _, _, _, s@Select(_, _, _, aliasmap, _, children, _, _, _, _), _, _, _) =>
         val aq = AttributeSet(g.outputList).filter(_.qualifier.nonEmpty)
-        val makeupmap: Map[Int, String] = children.zipWithIndex.flatMap {
+        val makeupmap = children.zipWithIndex.flatMap {
           case (child, i) =>
-            aq.find(child.outputSet.contains(_))
-              .flatMap(_.qualifier.headOption)
-              .map((i, _))
+            aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
         }.toMap
         g.copy(child = s.copy(aliasMap = makeupmap ++ aliasmap))
     }

datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala (+4 -1)

@@ -110,7 +110,10 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
       RewriteCorrelatedScalarSubquery,
       EliminateSerialization,
       SparkSQLUtil.getRemoveRedundantAliasesObj(),
-      RemoveRedundantProject) ++
+      RemoveRedundantProject,
+      SimplifyCreateStructOps,
+      SimplifyCreateArrayOps,
+      SimplifyCreateMapOps) ++
       extendedOperatorOptimizationRules: _*) ::
     Batch(
       "Check Cartesian Products", Once,

datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala (+18 -5)

@@ -167,9 +167,7 @@ object ExtractSelectModule extends PredicateHelper {
     val aq = attributeSet.filter(_.qualifier.nonEmpty)
     children.zipWithIndex.flatMap {
       case (child, i) =>
-        aq.find(child.outputSet.contains(_))
-          .flatMap(_.qualifier.headOption)
-          .map((i, _))
+        aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
     }.toMap
   }

@@ -355,13 +353,28 @@ object ExtractTableModule extends PredicateHelper {
         Seq.empty)
       case l: LogicalRelation =>
         val tableIdentifier = l.catalogTable.map(_.identifier)
-        val database = tableIdentifier.flatMap(_.database).orNull
-        val table = tableIdentifier.map(_.table).orNull
+        val database = tableIdentifier.map(_.database).flatten.getOrElse(null)
+        val table = tableIdentifier.map(_.table).getOrElse(null)
         Some(database, table, l.output, Nil, NoFlags, Seq.empty)
       case l: LocalRelation => // used for unit test
         Some(null, null, l.output, Nil, NoFlags, Seq.empty)
       case _ =>
+        // this check is added as we get MetastoreRelation in spark2.1,
+        // this is removed in later spark version
+        // TODO: this check can be removed once 2.1 support is removed from carbon
+        if (SparkUtil.isSparkVersionEqualTo("2.1") &&
+            plan.getClass.getName.equals("org.apache.spark.sql.hive.MetastoreRelation")) {
+          val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable("catalogTable", plan)
+            .asInstanceOf[CatalogTable]
+          Some(catalogTable.database,
+            catalogTable.identifier.table,
+            plan.output,
+            Nil,
+            NoFlags,
+            Seq.empty)
+        } else {
         None
+        }
     }
   }
 }
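
Note: the restored Spark 2.1 branch matches MetastoreRelation by its runtime class name and reads its catalogTable field reflectively, since that class does not exist on newer Spark compile classpaths. A standalone sketch of field-by-name reflection; RelationStandIn is hypothetical, and CarbonReflectionUtils.getFieldOfCatalogTable presumably does something close to this:

```scala
object ReflectionFieldSketch {
  // Stand-in for a relation class that is only known at runtime.
  final class RelationStandIn(val catalogTable: String)

  // Read a (possibly private) field by name via plain Java reflection.
  def getFieldByName(name: String, obj: AnyRef): AnyRef = {
    val field = obj.getClass.getDeclaredField(name)
    field.setAccessible(true)
    field.get(obj)
  }

  def main(args: Array[String]): Unit = {
    val plan: AnyRef = new RelationStandIn("default.fact_table")
    // Match on the runtime class name, then reach into the field,
    // mirroring the spark2.1 branch restored above.
    if (plan.getClass.getName.endsWith("RelationStandIn")) {
      println(getFieldByName("catalogTable", plan)) // default.fact_table
    }
  }
}
```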

datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala (+2 -2)

@@ -204,7 +204,7 @@ trait Printers {
         s.child match {
           case a: Alias =>
             val qualifierPrefix = a.qualifier
-              .map(_ + ".").headOption.getOrElse("")
+              .map(_ + ".").getOrElse("")
             s"$qualifierPrefix${
               quoteIdentifier(a
                 .name)
@@ -221,7 +221,7 @@ trait Printers {
         s.child match {
           case a: Alias =>
             val qualifierPrefix = a.qualifier.map(_ + ".")
-              .headOption.getOrElse("")
+              .getOrElse("")
             s"$qualifierPrefix${ quoteIdentifier(a.name) }"

           case other => other.sql
