Skip to content

Commit fd47a3f

Browse files
committed
added a catalog API which can be extended by any external catalog
1 parent 41831ce commit fd47a3f

File tree

46 files changed

+575
-614
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+575
-614
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.carbondata.core.catalog;
19+
20+
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
21+
import org.apache.carbondata.format.TableInfo;
22+
23+
public interface BaseCatalog {
24+
25+
long getLastSchemaModificationTime(String schemaFilePath);
26+
27+
long saveSchema(AbsoluteTableIdentifier identifier, TableInfo thriftTableInfo);
28+
29+
TableInfo getSchema(String schemaFilePath);
30+
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.carbondata.core.catalog;
19+
20+
import org.apache.carbondata.common.logging.LogServiceFactory;
21+
import org.apache.carbondata.core.constants.CarbonCommonConstants;
22+
import org.apache.carbondata.core.util.CarbonProperties;
23+
24+
import org.apache.log4j.Logger;
25+
26+
public class CatalogFactory {
27+
28+
private static final Logger LOGGER =
29+
LogServiceFactory.getLogService(CatalogFactory.class.getName());
30+
31+
private static final CatalogFactory INSTANCE = new CatalogFactory();
32+
33+
private Object catalog;
34+
35+
private CatalogFactory() {
36+
String catalogClass = CarbonProperties.getInstance()
37+
.getProperty(CarbonCommonConstants.CARBON_CATALOG_IMPL,
38+
CarbonCommonConstants.CARBON_CATALOG_IMPL_DEFAULT);
39+
try {
40+
catalog = Class.forName(catalogClass).newInstance();
41+
} catch (Exception e) {
42+
LOGGER.error("Error while loading class: " + catalogClass);
43+
}
44+
}
45+
46+
public static CatalogFactory getInstance() {
47+
return INSTANCE;
48+
}
49+
50+
public BaseCatalog getCatalog() {
51+
return ((BaseCatalog) catalog);
52+
}
53+
54+
public <T extends BaseCatalog> T getCatalog(Class<T> type) {
55+
return type.cast(catalog);
56+
}
57+
}

core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java

+6
Original file line numberDiff line numberDiff line change
@@ -2876,4 +2876,10 @@ private CarbonCommonConstants() {
28762876

28772877
public static final String CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT = "true";
28782878

2879+
@CarbonProperty
2880+
public static final String CARBON_CATALOG_IMPL = "carbon.catalog.impl";
2881+
2882+
@CarbonProperty public static final String CARBON_CATALOG_IMPL_DEFAULT =
2883+
"org.apache.spark.sql.catalog.CarbonCatalogImpl";
2884+
28792885
}

core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ org.apache.carbondata.format.TableSchema fromWrapperToExternalTableSchema(
6161
* @param tableName
6262
* @return
6363
*/
64-
org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(TableInfo wrapperTableInfo,
65-
String dbName, String tableName);
64+
org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(TableInfo wrapperTableInfo);
6665

6766
/**
6867
* @param externalSchemaEvolutionEntry

core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ private org.apache.carbondata.format.BucketingInfo fromWrapperToExternalBucketin
306306
*/
307307
@Override
308308
public org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(
309-
TableInfo wrapperTableInfo, String dbName, String tableName) {
309+
TableInfo wrapperTableInfo) {
310310
org.apache.carbondata.format.TableSchema thriftFactTable =
311311
fromWrapperToExternalTableSchema(wrapperTableInfo.getFactTable());
312312
return new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<>());

core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import java.util.UUID;
5151

5252
import org.apache.carbondata.common.logging.LogServiceFactory;
53+
import org.apache.carbondata.core.catalog.CatalogFactory;
5354
import org.apache.carbondata.core.constants.CarbonCommonConstants;
5455
import org.apache.carbondata.core.datastore.TableSpec;
5556
import org.apache.carbondata.core.datastore.block.AbstractIndex;
@@ -101,7 +102,6 @@
101102
import org.apache.carbondata.core.mutate.UpdateVO;
102103
import org.apache.carbondata.core.reader.CarbonHeaderReader;
103104
import org.apache.carbondata.core.reader.CarbonIndexFileReader;
104-
import org.apache.carbondata.core.reader.ThriftReader;
105105
import org.apache.carbondata.core.reader.ThriftReader.TBaseCreator;
106106
import org.apache.carbondata.core.scan.model.ProjectionDimension;
107107
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
@@ -2015,13 +2015,7 @@ public static org.apache.carbondata.format.TableInfo readSchemaFile(String schem
20152015
public static org.apache.carbondata.format.TableInfo readSchemaFile(String schemaFilePath,
20162016
Configuration conf)
20172017
throws IOException {
2018-
TBaseCreator createTBase = org.apache.carbondata.format.TableInfo::new;
2019-
ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, conf);
2020-
thriftReader.open();
2021-
org.apache.carbondata.format.TableInfo tableInfo =
2022-
(org.apache.carbondata.format.TableInfo) thriftReader.read();
2023-
thriftReader.close();
2024-
return tableInfo;
2018+
return CatalogFactory.getInstance().getCatalog().getSchema(schemaFilePath);
20252019
}
20262020

20272021
public static ColumnSchema thriftColumnSchemaToWrapperColumnSchema(

core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1425,7 +1425,7 @@ public List<org.apache.carbondata.format.SchemaEvolutionEntry> getSchema_evoluti
14251425
org.apache.carbondata.format.TableSchema thriftFactTable =
14261426
new org.apache.carbondata.format.TableSchema("tableId", thriftColumnSchemas, schemaEvol);
14271427
org.apache.carbondata.format.TableInfo actualResult = thriftWrapperSchemaConverter
1428-
.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName);
1428+
.fromWrapperToExternalTableInfo(wrapperTableInfo);
14291429
org.apache.carbondata.format.TableInfo expectedResult =
14301430
new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<org.apache
14311431
.carbondata.format.TableSchema>());

core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,7 @@ private TableInfo getTableInfo(String databaseName, String tableName) {
9393
CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
9494
ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
9595
org.apache.carbondata.format.TableInfo thriftTable = schemaConverter
96-
.fromWrapperToExternalTableInfo(carbonTable.getTableInfo(), carbonTable.getDatabaseName(),
97-
carbonTable.getTableName());
96+
.fromWrapperToExternalTableInfo(carbonTable.getTableInfo());
9897
assertTrue(null != thriftTable);
9998
}
10099

@@ -135,8 +134,7 @@ private TableInfo getTableInfo(String databaseName, String tableName) {
135134
CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
136135
ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
137136
org.apache.carbondata.format.TableInfo thriftTable = schemaConverter
138-
.fromWrapperToExternalTableInfo(carbonTable.getTableInfo(), carbonTable.getDatabaseName(),
139-
carbonTable.getTableName());
137+
.fromWrapperToExternalTableInfo(carbonTable.getTableInfo());
140138
assertTrue(null != thriftTable);
141139
}
142140

examples/spark/pom.xml

+3-9
Original file line numberDiff line numberDiff line change
@@ -198,15 +198,6 @@
198198
<maven.test.skip>true</maven.test.skip>
199199
</properties>
200200
</profile>
201-
<profile>
202-
<id>spark-2.3</id>
203-
<activation>
204-
<activeByDefault>true</activeByDefault>
205-
</activation>
206-
<properties>
207-
<spark.binary.version>2.3</spark.binary.version>
208-
</properties>
209-
</profile>
210201
<profile>
211202
<id>spark-2.4</id>
212203
<properties>
@@ -215,6 +206,9 @@
215206
</profile>
216207
<profile>
217208
<id>spark-3.1</id>
209+
<activation>
210+
<activeByDefault>true</activeByDefault>
211+
</activation>
218212
<properties>
219213
<spark.binary.version>3.1</spark.binary.version>
220214
<dep.jackson.version>2.10.0</dep.jackson.version>

hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,7 @@ public CarbonTable createTable(
228228

229229
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
230230
org.apache.carbondata.format.TableInfo thriftTableInfo =
231-
schemaConverter.fromWrapperToExternalTableInfo(
232-
tableInfo,
233-
tableInfo.getDatabaseName(),
234-
tableInfo.getFactTable().getTableName());
231+
schemaConverter.fromWrapperToExternalTableInfo(tableInfo);
235232
org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
236233
new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime());
237234
thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history()

index/examples/pom.xml

-6
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,6 @@
7979
</build>
8080

8181
<profiles>
82-
<profile>
83-
<id>spark-2.3</id>
84-
<properties>
85-
<spark.binary.version>2.3</spark.binary.version>
86-
</properties>
87-
</profile>
8882
<profile>
8983
<id>spark-2.4</id>
9084
<properties>

index/secondary-index/pom.xml

+3-9
Original file line numberDiff line numberDiff line change
@@ -156,15 +156,6 @@
156156
<maven.test.skip>true</maven.test.skip>
157157
</properties>
158158
</profile>
159-
<profile>
160-
<id>spark-2.3</id>
161-
<activation>
162-
<activeByDefault>true</activeByDefault>
163-
</activation>
164-
<properties>
165-
<spark.binary.version>2.3</spark.binary.version>
166-
</properties>
167-
</profile>
168159
<profile>
169160
<id>spark-2.4</id>
170161
<properties>
@@ -173,6 +164,9 @@
173164
</profile>
174165
<profile>
175166
<id>spark-3.1</id>
167+
<activation>
168+
<activeByDefault>true</activeByDefault>
169+
</activation>
176170
<properties>
177171
<spark.binary.version>3.1</spark.binary.version>
178172
</properties>

integration/flink/pom.xml

+3-23
Original file line numberDiff line numberDiff line change
@@ -218,29 +218,6 @@
218218
</dependencies>
219219

220220
<profiles>
221-
<profile>
222-
<id>spark-2.3</id>
223-
<activation>
224-
<activeByDefault>true</activeByDefault>
225-
</activation>
226-
<properties>
227-
<spark.binary.version>2.3</spark.binary.version>
228-
</properties>
229-
<dependencies>
230-
<dependency>
231-
<groupId>org.apache.carbondata</groupId>
232-
<artifactId>carbondata-spark_${spark.binary.version}</artifactId>
233-
<version>${project.version}</version>
234-
<scope>test</scope>
235-
<exclusions>
236-
<exclusion>
237-
<groupId>org.apache.hive</groupId>
238-
<artifactId>hive-exec</artifactId>
239-
</exclusion>
240-
</exclusions>
241-
</dependency>
242-
</dependencies>
243-
</profile>
244221
<profile>
245222
<id>spark-2.4</id>
246223
<properties>
@@ -263,6 +240,9 @@
263240
</profile>
264241
<profile>
265242
<id>spark-3.1</id>
243+
<activation>
244+
<activeByDefault>true</activeByDefault>
245+
</activation>
266246
<properties>
267247
<spark.binary.version>3.1</spark.binary.version>
268248
</properties>

integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,7 @@ private static void writeSchemaFile(TableInfo tableInfo) throws IOException {
255255
ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
256256
thriftWriter.open(FileWriteOperation.OVERWRITE);
257257
thriftWriter.write(schemaConverter
258-
.fromWrapperToExternalTableInfo(tableInfo, tableInfo.getDatabaseName(),
259-
tableInfo.getFactTable().getTableName()));
258+
.fromWrapperToExternalTableInfo(tableInfo));
260259
thriftWriter.close();
261260
}
262261

integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java

+3-24
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@
2121
import java.util.ArrayList;
2222
import java.util.Arrays;
2323
import java.util.HashMap;
24-
import java.util.HashSet;
2524
import java.util.List;
2625
import java.util.Map;
2726
import java.util.Objects;
28-
import java.util.Set;
2927
import java.util.concurrent.ConcurrentHashMap;
3028
import java.util.concurrent.atomic.AtomicReference;
3129
import java.util.stream.Collectors;
3230

3331
import org.apache.carbondata.common.logging.LogServiceFactory;
32+
import org.apache.carbondata.core.catalog.BaseCatalog;
33+
import org.apache.carbondata.core.catalog.CatalogFactory;
3434
import org.apache.carbondata.core.constants.CarbonCommonConstants;
3535
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
3636
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -40,38 +40,30 @@
4040
import org.apache.carbondata.core.indexstore.PartitionSpec;
4141
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
4242
import org.apache.carbondata.core.metadata.CarbonMetadata;
43-
import org.apache.carbondata.core.metadata.SegmentFileStore;
4443
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
4544
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
4645
import org.apache.carbondata.core.metadata.index.IndexType;
4746
import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata;
4847
import org.apache.carbondata.core.metadata.schema.indextable.IndexTableInfo;
4948
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
5049
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
51-
import org.apache.carbondata.core.reader.ThriftReader;
5250
import org.apache.carbondata.core.scan.expression.Expression;
5351
import org.apache.carbondata.core.statusmanager.FileFormat;
54-
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
5552
import org.apache.carbondata.core.util.CarbonProperties;
5653
import org.apache.carbondata.core.util.CarbonUtil;
5754
import org.apache.carbondata.core.util.path.CarbonTablePath;
5855
import org.apache.carbondata.hadoop.CarbonInputSplit;
5956
import org.apache.carbondata.hadoop.api.CarbonInputFormat;
6057
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
61-
import org.apache.carbondata.presto.PrestoFilterUtil;
6258

6359
import com.google.gson.Gson;
6460
import com.google.inject.Inject;
65-
import io.prestosql.plugin.hive.HiveColumnHandle;
6661
import io.prestosql.spi.connector.SchemaTableName;
67-
import io.prestosql.spi.predicate.TupleDomain;
68-
import org.apache.commons.collections.CollectionUtils;
6962
import org.apache.hadoop.conf.Configuration;
7063
import org.apache.hadoop.mapred.JobConf;
7164
import org.apache.hadoop.mapreduce.InputSplit;
7265
import org.apache.hadoop.mapreduce.Job;
7366
import org.apache.log4j.Logger;
74-
import org.apache.thrift.TBase;
7567

7668
import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
7769
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
@@ -192,20 +184,7 @@ private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String
192184
org.apache.carbondata.format.TableInfo tableInfo;
193185
long modifiedTime = System.currentTimeMillis();
194186
if (isTransactionalTable) {
195-
//Step 2: read the metadata (tableInfo) of the table.
196-
ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
197-
// TBase is used to read and write thrift objects.
198-
// TableInfo is a kind of TBase used to read and write table information.
199-
// TableInfo is generated by thrift,
200-
// see schema.thrift under format/src/main/thrift for details.
201-
public TBase create() {
202-
return new org.apache.carbondata.format.TableInfo();
203-
}
204-
};
205-
ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
206-
thriftReader.open();
207-
tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
208-
thriftReader.close();
187+
tableInfo = CatalogFactory.getInstance().getCatalog().getSchema(schemaFilePath);
209188
modifiedTime = schemaFile.getLastModifiedTime();
210189
} else {
211190
tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);

integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala

+1-4
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,7 @@ object CarbonDataStoreCreator {
153153
val schemaConverter: SchemaConverter =
154154
new ThriftWrapperSchemaConverterImpl()
155155
val thriftTableInfo: TableInfo =
156-
schemaConverter.fromWrapperToExternalTableInfo(
157-
tableInfo,
158-
tableInfo.getDatabaseName,
159-
tableInfo.getFactTable.getTableName)
156+
schemaConverter.fromWrapperToExternalTableInfo(tableInfo)
160157
val schemaEvolutionEntry: SchemaEvolutionEntry =
161158
new org.apache.carbondata.format.SchemaEvolutionEntry(
162159
tableInfo.getLastUpdatedTime)

0 commit comments

Comments
 (0)