Skip to content

Commit 346a1b7

Browse files
committed
added a catalog API which can be extended by any external catalog
1 parent 41831ce commit 346a1b7

File tree

41 files changed

+505
-356
lines changed

Some content is hidden

Large commits have some of their content hidden by default. Use the search box below to find content that may be hidden.

41 files changed

+505
-356
lines changed

core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java

+3
Original file line numberDiff line numberDiff line change
@@ -2876,4 +2876,7 @@ private CarbonCommonConstants() {
28762876

28772877
public static final String CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT = "true";
28782878

2879+
@CarbonProperty
2880+
public static final String CARBON_CATALOG_IMPL = "carbon.catalog.impl";
2881+
28792882
}

core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ org.apache.carbondata.format.TableSchema fromWrapperToExternalTableSchema(
6161
* @param tableName
6262
* @return
6363
*/
64-
org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(TableInfo wrapperTableInfo,
65-
String dbName, String tableName);
64+
org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(TableInfo wrapperTableInfo);
6665

6766
/**
6867
* @param externalSchemaEvolutionEntry

core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ private org.apache.carbondata.format.BucketingInfo fromWrapperToExternalBucketin
306306
*/
307307
@Override
308308
public org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(
309-
TableInfo wrapperTableInfo, String dbName, String tableName) {
309+
TableInfo wrapperTableInfo) {
310310
org.apache.carbondata.format.TableSchema thriftFactTable =
311311
fromWrapperToExternalTableSchema(wrapperTableInfo.getFactTable());
312312
return new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<>());

core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1425,7 +1425,7 @@ public List<org.apache.carbondata.format.SchemaEvolutionEntry> getSchema_evoluti
14251425
org.apache.carbondata.format.TableSchema thriftFactTable =
14261426
new org.apache.carbondata.format.TableSchema("tableId", thriftColumnSchemas, schemaEvol);
14271427
org.apache.carbondata.format.TableInfo actualResult = thriftWrapperSchemaConverter
1428-
.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName);
1428+
.fromWrapperToExternalTableInfo(wrapperTableInfo);
14291429
org.apache.carbondata.format.TableInfo expectedResult =
14301430
new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<org.apache
14311431
.carbondata.format.TableSchema>());

core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,7 @@ private TableInfo getTableInfo(String databaseName, String tableName) {
9393
CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
9494
ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
9595
org.apache.carbondata.format.TableInfo thriftTable = schemaConverter
96-
.fromWrapperToExternalTableInfo(carbonTable.getTableInfo(), carbonTable.getDatabaseName(),
97-
carbonTable.getTableName());
96+
.fromWrapperToExternalTableInfo(carbonTable.getTableInfo());
9897
assertTrue(null != thriftTable);
9998
}
10099

@@ -135,8 +134,7 @@ private TableInfo getTableInfo(String databaseName, String tableName) {
135134
CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
136135
ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
137136
org.apache.carbondata.format.TableInfo thriftTable = schemaConverter
138-
.fromWrapperToExternalTableInfo(carbonTable.getTableInfo(), carbonTable.getDatabaseName(),
139-
carbonTable.getTableName());
137+
.fromWrapperToExternalTableInfo(carbonTable.getTableInfo());
140138
assertTrue(null != thriftTable);
141139
}
142140

examples/spark/pom.xml

+3-3
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,6 @@
200200
</profile>
201201
<profile>
202202
<id>spark-2.3</id>
203-
<activation>
204-
<activeByDefault>true</activeByDefault>
205-
</activation>
206203
<properties>
207204
<spark.binary.version>2.3</spark.binary.version>
208205
</properties>
@@ -215,6 +212,9 @@
215212
</profile>
216213
<profile>
217214
<id>spark-3.1</id>
215+
<activation>
216+
<activeByDefault>true</activeByDefault>
217+
</activation>
218218
<properties>
219219
<spark.binary.version>3.1</spark.binary.version>
220220
<dep.jackson.version>2.10.0</dep.jackson.version>

hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,7 @@ public CarbonTable createTable(
228228

229229
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
230230
org.apache.carbondata.format.TableInfo thriftTableInfo =
231-
schemaConverter.fromWrapperToExternalTableInfo(
232-
tableInfo,
233-
tableInfo.getDatabaseName(),
234-
tableInfo.getFactTable().getTableName());
231+
schemaConverter.fromWrapperToExternalTableInfo(tableInfo);
235232
org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
236233
new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime());
237234
thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history()

index/secondary-index/pom.xml

+3-3
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,6 @@
158158
</profile>
159159
<profile>
160160
<id>spark-2.3</id>
161-
<activation>
162-
<activeByDefault>true</activeByDefault>
163-
</activation>
164161
<properties>
165162
<spark.binary.version>2.3</spark.binary.version>
166163
</properties>
@@ -173,6 +170,9 @@
173170
</profile>
174171
<profile>
175172
<id>spark-3.1</id>
173+
<activation>
174+
<activeByDefault>true</activeByDefault>
175+
</activation>
176176
<properties>
177177
<spark.binary.version>3.1</spark.binary.version>
178178
</properties>

integration/flink/pom.xml

+3-3
Original file line numberDiff line numberDiff line change
@@ -220,9 +220,6 @@
220220
<profiles>
221221
<profile>
222222
<id>spark-2.3</id>
223-
<activation>
224-
<activeByDefault>true</activeByDefault>
225-
</activation>
226223
<properties>
227224
<spark.binary.version>2.3</spark.binary.version>
228225
</properties>
@@ -263,6 +260,9 @@
263260
</profile>
264261
<profile>
265262
<id>spark-3.1</id>
263+
<activation>
264+
<activeByDefault>true</activeByDefault>
265+
</activation>
266266
<properties>
267267
<spark.binary.version>3.1</spark.binary.version>
268268
</properties>

integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,7 @@ private static void writeSchemaFile(TableInfo tableInfo) throws IOException {
255255
ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
256256
thriftWriter.open(FileWriteOperation.OVERWRITE);
257257
thriftWriter.write(schemaConverter
258-
.fromWrapperToExternalTableInfo(tableInfo, tableInfo.getDatabaseName(),
259-
tableInfo.getFactTable().getTableName()));
258+
.fromWrapperToExternalTableInfo(tableInfo));
260259
thriftWriter.close();
261260
}
262261

integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala

+1-4
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,7 @@ object CarbonDataStoreCreator {
153153
val schemaConverter: SchemaConverter =
154154
new ThriftWrapperSchemaConverterImpl()
155155
val thriftTableInfo: TableInfo =
156-
schemaConverter.fromWrapperToExternalTableInfo(
157-
tableInfo,
158-
tableInfo.getDatabaseName,
159-
tableInfo.getFactTable.getTableName)
156+
schemaConverter.fromWrapperToExternalTableInfo(tableInfo)
160157
val schemaEvolutionEntry: SchemaEvolutionEntry =
161158
new org.apache.carbondata.format.SchemaEvolutionEntry(
162159
tableInfo.getLastUpdatedTime)

integration/spark/pom.xml

+3-3
Original file line numberDiff line numberDiff line change
@@ -608,9 +608,6 @@
608608
</profile>
609609
<profile>
610610
<id>spark-2.3</id>
611-
<activation>
612-
<activeByDefault>true</activeByDefault>
613-
</activation>
614611
<properties>
615612
<spark.binary.version>2.3</spark.binary.version>
616613
</properties>
@@ -693,6 +690,9 @@
693690
</profile>
694691
<profile>
695692
<id>spark-3.1</id>
693+
<activation>
694+
<activeByDefault>true</activeByDefault>
695+
</activation>
696696
<properties>
697697
<spark.binary.version>3.1</spark.binary.version>
698698
</properties>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalog
19+
20+
import org.apache.spark.sql.SparkSession
21+
import org.apache.spark.sql.catalyst.TableIdentifier
22+
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
23+
24+
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
25+
import org.apache.carbondata.core.metadata.schema.table.TableInfo
26+
import org.apache.carbondata.format.{TableInfo => ExternalTableInfo}
27+
28+
trait CarbonCatalog {
29+
30+
def createTable(tableDefinition: CatalogTable,
31+
ignoreIfExists: Boolean,
32+
validateLocation: Boolean = true)(sparkSession: SparkSession): Unit
33+
34+
def tableExists(name: TableIdentifier)(sparkSession: SparkSession): Boolean
35+
36+
def getDatabaseMetadata(name: String)(sparkSession: SparkSession): CatalogDatabase
37+
38+
def getTableMetadata(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): CatalogTable
39+
40+
def listDatabases()(sparkSession: SparkSession): Seq[String]
41+
42+
def listTables(dbName: String)(sparkSession: SparkSession): Seq[TableIdentifier]
43+
44+
def dropTable(name: TableIdentifier,
45+
ignoreIfNotExists: Boolean,
46+
purge: Boolean)(sparkSession: SparkSession): Unit
47+
48+
def getSchema(dbName: String, tableName: String, tablePath: String): Option[TableInfo]
49+
50+
def getLastSchemaModificationTime(schemaFilePath: String): Long
51+
52+
/**
53+
* This method will write the schema thrift file in carbon store and load table metadata
54+
*/
55+
def saveSchema(identifier: AbsoluteTableIdentifier, thriftTableInfo: ExternalTableInfo): Long
56+
57+
// def alterTable(wrapperTableInfo: TableInfo)(sparkSession: SparkSession): Unit
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalog
19+
20+
import java.io.IOException
21+
22+
import org.apache.spark.sql.SparkSession
23+
import org.apache.spark.sql.catalyst.TableIdentifier
24+
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
25+
26+
import org.apache.carbondata.core.datastore.impl.FileFactory
27+
import org.apache.carbondata.core.fileoperations.FileWriteOperation
28+
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
29+
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
30+
import org.apache.carbondata.core.metadata.schema.table.TableInfo
31+
import org.apache.carbondata.core.util.CarbonUtil
32+
import org.apache.carbondata.core.util.path.CarbonTablePath
33+
import org.apache.carbondata.core.writer.ThriftWriter
34+
import org.apache.carbondata.format.{TableInfo => ExternalTableInfo}
35+
36+
37+
/**
 * Default [[CarbonCatalog]] implementation that delegates session-scoped
 * operations to Spark's `SessionCatalog` and persists Carbon schemas as
 * thrift files under the table path.
 */
private[catalog] object CarbonCatalogImpl extends CarbonCatalog {

  override def createTable(tableDefinition: CatalogTable,
      ignoreIfExists: Boolean,
      validateLocation: Boolean = true)(sparkSession: SparkSession): Unit = {
    sparkSession.sessionState.catalog.createTable(tableDefinition, ignoreIfExists, validateLocation)
  }

  override def tableExists(name: TableIdentifier)(sparkSession: SparkSession): Boolean = {
    sparkSession.sessionState.catalog.tableExists(name)
  }

  override def getDatabaseMetadata(name: String)(sparkSession: SparkSession): CatalogDatabase = {
    sparkSession.sessionState.catalog.getDatabaseMetadata(name)
  }

  override def listDatabases()(sparkSession: SparkSession): Seq[String] = {
    sparkSession.sessionState.catalog.listDatabases()
  }

  override def listTables(dbName: String)(sparkSession: SparkSession): Seq[TableIdentifier] = {
    sparkSession.sessionState.catalog.listTables(dbName)
  }

  override def getTableMetadata(tableIdentifier: TableIdentifier)
    (sparkSession: SparkSession): CatalogTable = {
    sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
  }

  override def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean,
      purge: Boolean)(sparkSession: SparkSession): Unit = {
    // BUG FIX: the previous implementation hard-coded ignoreIfNotExists = true and
    // purge = false, silently discarding both caller-supplied flags. Pass them through
    // so a strict drop fails on a missing table and a purge drop actually purges.
    sparkSession.sessionState.catalog.dropTable(name, ignoreIfNotExists, purge)
  }

  /**
   * Reads the thrift schema file under `tablePath` (if present) and converts it to the
   * wrapper [[TableInfo]]; returns None when no schema file exists.
   */
  override def getSchema(dbName: String, tableName: String,
      tablePath: String): Option[TableInfo] = {
    val schemaConverter = new ThriftWrapperSchemaConverterImpl
    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
    if (FileFactory.isFileExist(tableMetadataFile)) {
      val tableInfo: ExternalTableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
      val wrapperTableInfo =
        schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
      Some(wrapperTableInfo)
    } else {
      None
    }
  }

  /** Returns the schema file's last-modified time, or -1 when the file does not exist. */
  override def getLastSchemaModificationTime(schemaFilePath: String): Long = {
    val schemaFile = FileFactory.getCarbonFile(schemaFilePath)
    if (schemaFile.exists()) {
      schemaFile.getLastModifiedTime
    } else {
      -1L
    }
  }

  /**
   * This method will write the schema thrift file in carbon store and load table metadata
   *
   * @return the modification timestamp stamped onto the newly written schema file
   * @throws IOException when the metadata directory cannot be created
   */
  override def saveSchema(identifier: AbsoluteTableIdentifier,
      thriftTableInfo: ExternalTableInfo): Long = {
    val schemaMetadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath)
    if (!FileFactory.isFileExist(schemaMetadataPath)) {
      val isDirCreated = FileFactory
        .mkdirs(schemaMetadataPath, SparkSession.getActiveSession.get.sessionState.newHadoopConf())
      if (!isDirCreated) {
        throw new IOException(s"Failed to create the metadata directory $schemaMetadataPath")
      }
    }
    val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
    val thriftWriter = new ThriftWriter(schemaFilePath, false)
    thriftWriter.open(FileWriteOperation.OVERWRITE)
    thriftWriter.write(thriftTableInfo)
    thriftWriter.close()
    // Stamp a consistent timestamp on the file and report it to the caller so
    // schema-refresh checks compare against the same value that was written.
    val modifiedTime = System.currentTimeMillis()
    FileFactory.getCarbonFile(schemaFilePath).setLastModifiedTime(modifiedTime)
    modifiedTime
  }

  // def alterTable(wrapperTableInfo: TableInfo)(sparkSession: SparkSession): Unit = {
  //    val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
  //    val hiveClient = sparkSession
  //      .sessionState
  //      .catalog
  //      .externalCatalog.asInstanceOf[HiveExternalCatalog]
  //      .client
  //    hiveClient.runSqlHive(s"ALTER TABLE " +
  //      s"`${wrapperTableInfo.getDatabaseName}`.`${wrapperTableInfo.getFactTable.getTableName}`
  //      " +
  //      s"SET SERDEPROPERTIES($schemaParts)")
  // }
}

0 commit comments

Comments
 (0)