Skip to content

Commit 153f89a

Browse files
committed
added a catalog API which can be extended by any external catalog
1 parent 41831ce commit 153f89a

File tree

45 files changed

+542
-386
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

45 files changed

+542
-386
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.apache.carbondata.core.catalog;
2+
3+
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
4+
import org.apache.carbondata.format.TableInfo;
5+
6+
public interface BaseCatalog {
7+
8+
long getLastSchemaModificationTime(String schemaFilePath);
9+
10+
long saveSchema( AbsoluteTableIdentifier identifier, TableInfo thriftTableInfo);
11+
12+
TableInfo getSchema(String schemaFilePath);
13+
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package org.apache.carbondata.core.catalog;
2+
3+
import org.apache.carbondata.common.logging.LogServiceFactory;
4+
import org.apache.carbondata.core.constants.CarbonCommonConstants;
5+
import org.apache.carbondata.core.util.CarbonProperties;
6+
7+
import org.apache.log4j.Logger;
8+
9+
public class CatalogFactory {
10+
11+
private static final Logger LOGGER = LogServiceFactory.getLogService(CatalogFactory.class.getName());
12+
13+
private static final CatalogFactory INSTANCE = new CatalogFactory();
14+
15+
private Object catalog;
16+
17+
private CatalogFactory() {
18+
String catalogClass = CarbonProperties.getInstance()
19+
.getProperty(CarbonCommonConstants.CARBON_CATALOG_IMPL,
20+
CarbonCommonConstants.CARBON_CATALOG_IMPL_DEFAULT);
21+
try {
22+
catalog = Class.forName(catalogClass).newInstance();
23+
} catch (Exception e){
24+
LOGGER.error("Error while loading class: " + catalogClass);
25+
}
26+
}
27+
28+
public static CatalogFactory getInstance() {
29+
return INSTANCE;
30+
}
31+
32+
public BaseCatalog getCatalog() {
33+
return ((BaseCatalog) catalog);
34+
}
35+
36+
public <T extends BaseCatalog> T getCatalog(Class<T> type) {
37+
return type.cast(catalog);
38+
}
39+
}

core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java

+6
Original file line numberDiff line numberDiff line change
@@ -2876,4 +2876,10 @@ private CarbonCommonConstants() {
28762876

28772877
public static final String CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT = "true";
28782878

2879+
@CarbonProperty
2880+
public static final String CARBON_CATALOG_IMPL = "carbon.catalog.impl";
2881+
2882+
@CarbonProperty public static final String CARBON_CATALOG_IMPL_DEFAULT =
2883+
"org.apache.spark.sql.catalog.CarbonCatalogImpl";
2884+
28792885
}

core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ org.apache.carbondata.format.TableSchema fromWrapperToExternalTableSchema(
6161
* @param tableName
6262
* @return
6363
*/
64-
org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(TableInfo wrapperTableInfo,
65-
String dbName, String tableName);
64+
org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(TableInfo wrapperTableInfo);
6665

6766
/**
6867
* @param externalSchemaEvolutionEntry

core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ private org.apache.carbondata.format.BucketingInfo fromWrapperToExternalBucketin
306306
*/
307307
@Override
308308
public org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(
309-
TableInfo wrapperTableInfo, String dbName, String tableName) {
309+
TableInfo wrapperTableInfo) {
310310
org.apache.carbondata.format.TableSchema thriftFactTable =
311311
fromWrapperToExternalTableSchema(wrapperTableInfo.getFactTable());
312312
return new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<>());

core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
import java.util.UUID;
5151

5252
import org.apache.carbondata.common.logging.LogServiceFactory;
53+
import org.apache.carbondata.core.catalog.BaseCatalog;
54+
import org.apache.carbondata.core.catalog.CatalogFactory;
5355
import org.apache.carbondata.core.constants.CarbonCommonConstants;
5456
import org.apache.carbondata.core.datastore.TableSpec;
5557
import org.apache.carbondata.core.datastore.block.AbstractIndex;
@@ -2015,13 +2017,7 @@ public static org.apache.carbondata.format.TableInfo readSchemaFile(String schem
20152017
public static org.apache.carbondata.format.TableInfo readSchemaFile(String schemaFilePath,
20162018
Configuration conf)
20172019
throws IOException {
2018-
TBaseCreator createTBase = org.apache.carbondata.format.TableInfo::new;
2019-
ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, conf);
2020-
thriftReader.open();
2021-
org.apache.carbondata.format.TableInfo tableInfo =
2022-
(org.apache.carbondata.format.TableInfo) thriftReader.read();
2023-
thriftReader.close();
2024-
return tableInfo;
2020+
return CatalogFactory.getInstance().getCatalog().getSchema(schemaFilePath);
20252021
}
20262022

20272023
public static ColumnSchema thriftColumnSchemaToWrapperColumnSchema(

core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1425,7 +1425,7 @@ public List<org.apache.carbondata.format.SchemaEvolutionEntry> getSchema_evoluti
14251425
org.apache.carbondata.format.TableSchema thriftFactTable =
14261426
new org.apache.carbondata.format.TableSchema("tableId", thriftColumnSchemas, schemaEvol);
14271427
org.apache.carbondata.format.TableInfo actualResult = thriftWrapperSchemaConverter
1428-
.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName);
1428+
.fromWrapperToExternalTableInfo(wrapperTableInfo);
14291429
org.apache.carbondata.format.TableInfo expectedResult =
14301430
new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<org.apache
14311431
.carbondata.format.TableSchema>());

core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,7 @@ private TableInfo getTableInfo(String databaseName, String tableName) {
9393
CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
9494
ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
9595
org.apache.carbondata.format.TableInfo thriftTable = schemaConverter
96-
.fromWrapperToExternalTableInfo(carbonTable.getTableInfo(), carbonTable.getDatabaseName(),
97-
carbonTable.getTableName());
96+
.fromWrapperToExternalTableInfo(carbonTable.getTableInfo());
9897
assertTrue(null != thriftTable);
9998
}
10099

@@ -135,8 +134,7 @@ private TableInfo getTableInfo(String databaseName, String tableName) {
135134
CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
136135
ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
137136
org.apache.carbondata.format.TableInfo thriftTable = schemaConverter
138-
.fromWrapperToExternalTableInfo(carbonTable.getTableInfo(), carbonTable.getDatabaseName(),
139-
carbonTable.getTableName());
137+
.fromWrapperToExternalTableInfo(carbonTable.getTableInfo());
140138
assertTrue(null != thriftTable);
141139
}
142140

examples/spark/pom.xml

+3-3
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,6 @@
200200
</profile>
201201
<profile>
202202
<id>spark-2.3</id>
203-
<activation>
204-
<activeByDefault>true</activeByDefault>
205-
</activation>
206203
<properties>
207204
<spark.binary.version>2.3</spark.binary.version>
208205
</properties>
@@ -215,6 +212,9 @@
215212
</profile>
216213
<profile>
217214
<id>spark-3.1</id>
215+
<activation>
216+
<activeByDefault>true</activeByDefault>
217+
</activation>
218218
<properties>
219219
<spark.binary.version>3.1</spark.binary.version>
220220
<dep.jackson.version>2.10.0</dep.jackson.version>

hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,7 @@ public CarbonTable createTable(
228228

229229
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
230230
org.apache.carbondata.format.TableInfo thriftTableInfo =
231-
schemaConverter.fromWrapperToExternalTableInfo(
232-
tableInfo,
233-
tableInfo.getDatabaseName(),
234-
tableInfo.getFactTable().getTableName());
231+
schemaConverter.fromWrapperToExternalTableInfo(tableInfo);
235232
org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
236233
new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime());
237234
thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history()

index/secondary-index/pom.xml

+3-3
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,6 @@
158158
</profile>
159159
<profile>
160160
<id>spark-2.3</id>
161-
<activation>
162-
<activeByDefault>true</activeByDefault>
163-
</activation>
164161
<properties>
165162
<spark.binary.version>2.3</spark.binary.version>
166163
</properties>
@@ -173,6 +170,9 @@
173170
</profile>
174171
<profile>
175172
<id>spark-3.1</id>
173+
<activation>
174+
<activeByDefault>true</activeByDefault>
175+
</activation>
176176
<properties>
177177
<spark.binary.version>3.1</spark.binary.version>
178178
</properties>

integration/flink/pom.xml

+3-3
Original file line numberDiff line numberDiff line change
@@ -220,9 +220,6 @@
220220
<profiles>
221221
<profile>
222222
<id>spark-2.3</id>
223-
<activation>
224-
<activeByDefault>true</activeByDefault>
225-
</activation>
226223
<properties>
227224
<spark.binary.version>2.3</spark.binary.version>
228225
</properties>
@@ -263,6 +260,9 @@
263260
</profile>
264261
<profile>
265262
<id>spark-3.1</id>
263+
<activation>
264+
<activeByDefault>true</activeByDefault>
265+
</activation>
266266
<properties>
267267
<spark.binary.version>3.1</spark.binary.version>
268268
</properties>

integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,7 @@ private static void writeSchemaFile(TableInfo tableInfo) throws IOException {
255255
ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
256256
thriftWriter.open(FileWriteOperation.OVERWRITE);
257257
thriftWriter.write(schemaConverter
258-
.fromWrapperToExternalTableInfo(tableInfo, tableInfo.getDatabaseName(),
259-
tableInfo.getFactTable().getTableName()));
258+
.fromWrapperToExternalTableInfo(tableInfo));
260259
thriftWriter.close();
261260
}
262261

integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java

+3-24
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@
2121
import java.util.ArrayList;
2222
import java.util.Arrays;
2323
import java.util.HashMap;
24-
import java.util.HashSet;
2524
import java.util.List;
2625
import java.util.Map;
2726
import java.util.Objects;
28-
import java.util.Set;
2927
import java.util.concurrent.ConcurrentHashMap;
3028
import java.util.concurrent.atomic.AtomicReference;
3129
import java.util.stream.Collectors;
3230

3331
import org.apache.carbondata.common.logging.LogServiceFactory;
32+
import org.apache.carbondata.core.catalog.BaseCatalog;
33+
import org.apache.carbondata.core.catalog.CatalogFactory;
3434
import org.apache.carbondata.core.constants.CarbonCommonConstants;
3535
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
3636
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -40,38 +40,30 @@
4040
import org.apache.carbondata.core.indexstore.PartitionSpec;
4141
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
4242
import org.apache.carbondata.core.metadata.CarbonMetadata;
43-
import org.apache.carbondata.core.metadata.SegmentFileStore;
4443
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
4544
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
4645
import org.apache.carbondata.core.metadata.index.IndexType;
4746
import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata;
4847
import org.apache.carbondata.core.metadata.schema.indextable.IndexTableInfo;
4948
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
5049
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
51-
import org.apache.carbondata.core.reader.ThriftReader;
5250
import org.apache.carbondata.core.scan.expression.Expression;
5351
import org.apache.carbondata.core.statusmanager.FileFormat;
54-
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
5552
import org.apache.carbondata.core.util.CarbonProperties;
5653
import org.apache.carbondata.core.util.CarbonUtil;
5754
import org.apache.carbondata.core.util.path.CarbonTablePath;
5855
import org.apache.carbondata.hadoop.CarbonInputSplit;
5956
import org.apache.carbondata.hadoop.api.CarbonInputFormat;
6057
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
61-
import org.apache.carbondata.presto.PrestoFilterUtil;
6258

6359
import com.google.gson.Gson;
6460
import com.google.inject.Inject;
65-
import io.prestosql.plugin.hive.HiveColumnHandle;
6661
import io.prestosql.spi.connector.SchemaTableName;
67-
import io.prestosql.spi.predicate.TupleDomain;
68-
import org.apache.commons.collections.CollectionUtils;
6962
import org.apache.hadoop.conf.Configuration;
7063
import org.apache.hadoop.mapred.JobConf;
7164
import org.apache.hadoop.mapreduce.InputSplit;
7265
import org.apache.hadoop.mapreduce.Job;
7366
import org.apache.log4j.Logger;
74-
import org.apache.thrift.TBase;
7567

7668
import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
7769
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
@@ -192,20 +184,7 @@ private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String
192184
org.apache.carbondata.format.TableInfo tableInfo;
193185
long modifiedTime = System.currentTimeMillis();
194186
if (isTransactionalTable) {
195-
//Step 2: read the metadata (tableInfo) of the table.
196-
ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
197-
// TBase is used to read and write thrift objects.
198-
// TableInfo is a kind of TBase used to read and write table information.
199-
// TableInfo is generated by thrift,
200-
// see schema.thrift under format/src/main/thrift for details.
201-
public TBase create() {
202-
return new org.apache.carbondata.format.TableInfo();
203-
}
204-
};
205-
ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
206-
thriftReader.open();
207-
tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
208-
thriftReader.close();
187+
tableInfo = CatalogFactory.getInstance().getCatalog().getSchema(schemaFilePath);
209188
modifiedTime = schemaFile.getLastModifiedTime();
210189
} else {
211190
tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);

integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala

+1-4
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,7 @@ object CarbonDataStoreCreator {
153153
val schemaConverter: SchemaConverter =
154154
new ThriftWrapperSchemaConverterImpl()
155155
val thriftTableInfo: TableInfo =
156-
schemaConverter.fromWrapperToExternalTableInfo(
157-
tableInfo,
158-
tableInfo.getDatabaseName,
159-
tableInfo.getFactTable.getTableName)
156+
schemaConverter.fromWrapperToExternalTableInfo(tableInfo)
160157
val schemaEvolutionEntry: SchemaEvolutionEntry =
161158
new org.apache.carbondata.format.SchemaEvolutionEntry(
162159
tableInfo.getLastUpdatedTime)

integration/spark/pom.xml

+3-3
Original file line numberDiff line numberDiff line change
@@ -608,9 +608,6 @@
608608
</profile>
609609
<profile>
610610
<id>spark-2.3</id>
611-
<activation>
612-
<activeByDefault>true</activeByDefault>
613-
</activation>
614611
<properties>
615612
<spark.binary.version>2.3</spark.binary.version>
616613
</properties>
@@ -693,6 +690,9 @@
693690
</profile>
694691
<profile>
695692
<id>spark-3.1</id>
693+
<activation>
694+
<activeByDefault>true</activeByDefault>
695+
</activation>
696696
<properties>
697697
<spark.binary.version>3.1</spark.binary.version>
698698
</properties>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}

import org.apache.carbondata.core.catalog.BaseCatalog

/**
 * Spark-facing catalog contract for carbon tables. Extends the core
 * [[BaseCatalog]] schema operations with table/database lifecycle and
 * lookup operations, each taking the active [[SparkSession]] as a
 * curried parameter.
 */
trait CarbonCatalog extends BaseCatalog {

  /**
   * Creates the table described by `tableDefinition` in the catalog.
   *
   * @param tableDefinition  Spark catalog definition of the table to create
   * @param ignoreIfExists   when true, do not fail if the table already exists
   * @param validateLocation when true, validate the table location before creating
   */
  def createTable(tableDefinition: CatalogTable,
      ignoreIfExists: Boolean,
      validateLocation: Boolean = true)(sparkSession: SparkSession): Unit

  /** Returns true if the given table exists in the catalog. */
  def tableExists(name: TableIdentifier)(sparkSession: SparkSession): Boolean

  /** Returns the catalog metadata for the named database. */
  def getDatabaseMetadata(name: String)(sparkSession: SparkSession): CatalogDatabase

  /** Returns the catalog metadata for the identified table. */
  def getTableMetadata(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): CatalogTable

  /** Lists the names of all databases known to the catalog. */
  def listDatabases()(sparkSession: SparkSession): Seq[String]

  /** Lists the identifiers of all tables in the given database. */
  def listTables(dbName: String)(sparkSession: SparkSession): Seq[TableIdentifier]

  /**
   * Drops the identified table from the catalog.
   *
   * @param ignoreIfNotExists when true, do not fail if the table is absent
   * @param purge             when true, skip the trash and delete data permanently
   */
  def dropTable(name: TableIdentifier,
      ignoreIfNotExists: Boolean,
      purge: Boolean)(sparkSession: SparkSession): Unit

  // NOTE(review): the scaladoc below ("write the schema thrift file ... and
  // load table metadata") describes the commented-out alterTable method, which
  // has no implementation yet — either restore the method or remove both.
  /**
   * This method will write the schema thrift file in carbon store and load table metadata
   */


  // def alterTable(wrapperTableInfo: TableInfo)(sparkSession: SparkSession): Unit
}

0 commit comments

Comments
 (0)