Skip to content

Commit 874764f

Browse files
lionelcaojackylk
authored and committed
[CARBONDATA-940] alter table add/split partition for spark 2.1
add/split partition function. This closes apache#1192
1 parent 3efd330 commit 874764f

File tree

48 files changed

+2360
-109
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

48 files changed

+2360
-109
lines changed

conf/carbon.properties.template

+3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ carbon.enableXXHash=true
4242
#carbon.max.level.cache.size=-1
4343
#enable prefetch of data during merge sort while reading data from sort temp files in data loading
4444
#carbon.merge.sort.prefetch=true
45+
######## Alter Partition Configuration ########
46+
#Number of cores to be used while alter partition
47+
carbon.number.of.cores.while.alterPartition=2
4548
######## Compaction Configuration ########
4649
#Number of cores to be used while compacting
4750
carbon.number.of.cores.while.compacting=2

core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java

+9
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,10 @@ public final class CarbonCommonConstants {
305305
*/
306306
@CarbonProperty
307307
public static final String NUM_CORES_COMPACTING = "carbon.number.of.cores.while.compacting";
308+
/**
309+
* Number of cores to be used while alter partition
310+
*/
311+
public static final String NUM_CORES_ALT_PARTITION = "carbon.number.of.cores.while.altPartition";
308312
/**
309313
* Number of cores to be used for block sort
310314
*/
@@ -966,6 +970,11 @@ public final class CarbonCommonConstants {
966970
*/
967971
public static final String COMPACTION_KEY_WORD = "COMPACTION";
968972

973+
/**
974+
* Indicates alter partition
975+
*/
976+
public static String ALTER_PARTITION_KEY_WORD = "ALTER_PARTITION";
977+
969978
/**
970979
* hdfs temporary directory key
971980
*/

core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class LockUsage {
2626
public static final String METADATA_LOCK = "meta.lock";
2727
public static final String COMPACTION_LOCK = "compaction.lock";
2828
public static final String SYSTEMLEVEL_COMPACTION_LOCK = "system_level_compaction.lock";
29+
public static final String ALTER_PARTITION_LOCK = "alter_partition.lock";
2930
public static final String TABLE_STATUS_LOCK = "tablestatus.lock";
3031
public static final String TABLE_UPDATE_STATUS_LOCK = "tableupdatestatus.lock";
3132
public static final String DELETE_SEGMENT_LOCK = "delete_segment.lock";

core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java

+25
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,31 @@ public PartitionInfo(List<ColumnSchema> columnSchemaList, PartitionType partitio
6767
this.partitionIds = new ArrayList<>();
6868
}
6969

70+
/**
71+
* add partition means split default partition, add in last directly
72+
*/
73+
public void addPartition(int addPartitionCount) {
74+
for (int i = 0; i < addPartitionCount; i++) {
75+
partitionIds.add(++MAX_PARTITION);
76+
numPartitions++;
77+
}
78+
}
79+
80+
/**
81+
* e.g. original partition[0,1,2,3,4,5]
82+
* split partition 2 to partition 6,7,8 (will not reuse 2)
83+
* then sourcePartitionId is 2, newPartitionNumbers is 3
84+
* @param sourcePartitionIndex
85+
* @param newPartitionNumbers
86+
*/
87+
public void splitPartition(int sourcePartitionIndex, int newPartitionNumbers) {
88+
partitionIds.remove(sourcePartitionIndex);
89+
for (int i = 0; i < newPartitionNumbers; i++) {
90+
partitionIds.add(sourcePartitionIndex + i, ++MAX_PARTITION);
91+
}
92+
numPartitions = numPartitions - 1 + newPartitionNumbers;
93+
}
94+
7095
public List<ColumnSchema> getColumnSchemaList() {
7196
return columnSchemaList;
7297
}

core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public static BitSet getPartitionMapForRangeFilter(PartitionInfo partitionInfo,
107107
}
108108
}
109109
} else {
110-
// LessThanEqualTo(<)
110+
// LessThan(<)
111111
outer4:
112112
for (int i = 0; i < partitions; i++) {
113113
for (String value : listInfo.get(i)) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.carbondata.core.scan.result.iterator;
18+
19+
import org.apache.carbondata.common.CarbonIterator;
20+
import org.apache.carbondata.common.logging.LogService;
21+
import org.apache.carbondata.common.logging.LogServiceFactory;
22+
import org.apache.carbondata.core.scan.result.BatchResult;
23+
24+
public class PartitionSpliterRawResultIterator extends CarbonIterator<Object[]> {
25+
26+
private CarbonIterator<BatchResult> iterator;
27+
private BatchResult batch;
28+
private int counter;
29+
30+
private static final LogService LOGGER =
31+
LogServiceFactory.getLogService(PartitionSpliterRawResultIterator.class.getName());
32+
33+
public PartitionSpliterRawResultIterator(CarbonIterator<BatchResult> iterator) {
34+
this.iterator = iterator;
35+
}
36+
37+
@Override public boolean hasNext() {
38+
if (null == batch || checkBatchEnd(batch)) {
39+
if (iterator.hasNext()) {
40+
batch = iterator.next();
41+
counter = 0;
42+
} else {
43+
return false;
44+
}
45+
}
46+
47+
return !checkBatchEnd(batch);
48+
}
49+
50+
@Override public Object[] next() {
51+
if (batch == null) {
52+
batch = iterator.next();
53+
}
54+
if (!checkBatchEnd(batch)) {
55+
return batch.getRawRow(counter++);
56+
} else {
57+
batch = iterator.next();
58+
counter = 0;
59+
}
60+
return batch.getRawRow(counter++);
61+
}
62+
63+
/**
64+
* To check if the batch is processed completely
65+
* @param batch
66+
* @return
67+
*/
68+
private boolean checkBatchEnd(BatchResult batch) {
69+
return !(counter < batch.getSize());
70+
}
71+
72+
}

core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
*/
1717
package org.apache.carbondata.core.scan.wrappers;
1818

19+
import java.io.Serializable;
20+
1921
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
2022

2123
/**
2224
* This class will store the dimension column data when query is executed
2325
* This can be used as a key for aggregation
2426
*/
25-
public class ByteArrayWrapper implements Comparable<ByteArrayWrapper> {
27+
public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializable {
2628

2729
/**
2830
* to store key which is generated using

core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java

+18
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,20 @@ public String getCarbonIndexFilePath(String taskId, String partitionId, String s
303303
}
304304
}
305305

306+
public String getCarbonIndexFilePath(String taskId, String partitionId, String segmentId,
307+
int batchNo, String bucketNumber, String timeStamp,
308+
ColumnarFormatVersion columnarFormatVersion) {
309+
switch (columnarFormatVersion) {
310+
case V1:
311+
case V2:
312+
return getCarbonIndexFilePath(taskId, partitionId, segmentId, bucketNumber);
313+
default:
314+
String segmentDir = getSegmentDir(partitionId, segmentId);
315+
return segmentDir + File.separator + getCarbonIndexFileName(Integer.parseInt(taskId),
316+
Integer.parseInt(bucketNumber), batchNo, timeStamp);
317+
}
318+
}
319+
306320
private static String getCarbonIndexFileName(String taskNo, int bucketNumber,
307321
String factUpdatedtimeStamp) {
308322
return taskNo + "-" + bucketNumber + "-" + factUpdatedtimeStamp + INDEX_FILE_EXT;
@@ -537,6 +551,10 @@ public static int getTaskIdFromTaskNo(String taskNo) {
537551
return Integer.parseInt(taskNo.split(BATCH_PREFIX)[0]);
538552
}
539553

554+
public static int getBatchNoFromTaskNo(String taskNo) {
555+
return Integer.parseInt(taskNo.split(BATCH_PREFIX)[1]);
556+
}
557+
540558
/**
541559
* Gets the file name from file path
542560
*/

examples/spark2/src/main/resources/partition_data.csv

-25
This file was deleted.

examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala

+62-13
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ object CarbonPartitionExample {
3535
val storeLocation = s"$rootPath/examples/spark2/target/store"
3636
val warehouse = s"$rootPath/examples/spark2/target/warehouse"
3737
val metastoredb = s"$rootPath/examples/spark2/target"
38-
val testData = s"$rootPath/examples/spark2/src/main/resources/partition_data.csv"
38+
val testData = s"$rootPath/integration/spark-common-test/src/test/resources/partition_data.csv"
3939

4040
CarbonProperties.getInstance()
4141
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
@@ -51,45 +51,70 @@ object CarbonPartitionExample {
5151

5252
spark.sparkContext.setLogLevel("WARN")
5353

54-
// none partition table
54+
// range partition with bucket defined
5555
spark.sql("DROP TABLE IF EXISTS t0")
5656
spark.sql("""
5757
| CREATE TABLE IF NOT EXISTS t0
5858
| (
59+
| id Int,
5960
| vin String,
60-
| logdate Timestamp,
6161
| phonenumber Long,
6262
| country String,
63-
| area String
63+
| area String,
64+
| salary Int
6465
| )
66+
| PARTITIONED BY (logdate Timestamp)
6567
| STORED BY 'carbondata'
68+
| TBLPROPERTIES('PARTITION_TYPE'='RANGE',
69+
| 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01',
70+
| 'BUCKETNUMBER'='3',
71+
| 'BUCKETCOLUMNS'='vin')
6672
""".stripMargin)
6773

68-
// range partition
74+
// none partition table
6975
spark.sql("DROP TABLE IF EXISTS t1")
7076
spark.sql("""
7177
| CREATE TABLE IF NOT EXISTS t1
7278
| (
79+
| id Int,
7380
| vin String,
81+
| logdate Timestamp,
7482
| phonenumber Long,
7583
| country String,
7684
| area String
7785
| )
78-
| PARTITIONED BY (logdate Timestamp)
7986
| STORED BY 'carbondata'
80-
| TBLPROPERTIES('PARTITION_TYPE'='RANGE',
81-
| 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01')
87+
""".stripMargin)
88+
89+
// list partition
90+
spark.sql("DROP TABLE IF EXISTS t2")
91+
spark.sql("""
92+
| CREATE TABLE IF NOT EXISTS t2
93+
| (
94+
| id Int,
95+
| vin String,
96+
| logdate Timestamp,
97+
| phonenumber Long,
98+
| country String,
99+
| salary Int
100+
| )
101+
| PARTITIONED BY (area String)
102+
| STORED BY 'carbondata'
103+
| TBLPROPERTIES('PARTITION_TYPE'='LIST',
104+
| 'LIST_INFO'='Asia, America, Europe', 'DICTIONARY_EXCLUDE' ='area')
82105
""".stripMargin)
83106

84107
// hash partition
85108
spark.sql("DROP TABLE IF EXISTS t3")
86109
spark.sql("""
87110
| CREATE TABLE IF NOT EXISTS t3
88111
| (
112+
| id Int,
89113
| logdate Timestamp,
90114
| phonenumber Long,
91115
| country String,
92-
| area String
116+
| area String,
117+
| salary Int
93118
| )
94119
| PARTITIONED BY (vin String)
95120
| STORED BY 'carbondata'
@@ -101,17 +126,40 @@ object CarbonPartitionExample {
101126
spark.sql("""
102127
| CREATE TABLE IF NOT EXISTS t5
103128
| (
129+
| id Int,
104130
| vin String,
105131
| logdate Timestamp,
106132
| phonenumber Long,
107-
| area String
133+
| area String,
134+
| salary Int
108135
|)
109136
| PARTITIONED BY (country String)
110137
| STORED BY 'carbondata'
111138
| TBLPROPERTIES('PARTITION_TYPE'='LIST',
112-
| 'LIST_INFO'='(China,United States),UK ,japan,(Canada,Russia), South Korea ')
139+
| 'LIST_INFO'='(China, US),UK ,Japan,(Canada,Russia, Good, NotGood), Korea ')
113140
""".stripMargin)
114141

142+
// load data into partition table
143+
spark.sql(s"""
144+
LOAD DATA LOCAL INPATH '$testData' into table t0 options('BAD_RECORDS_ACTION'='FORCE')
145+
""")
146+
spark.sql(s"""
147+
LOAD DATA LOCAL INPATH '$testData' into table t5 options('BAD_RECORDS_ACTION'='FORCE')
148+
""")
149+
150+
// alter list partition table t5 to add a partition
151+
spark.sql(s"""Alter table t5 add partition ('OutSpace')""".stripMargin)
152+
// alter list partition table t5 to split partition 4 into 3 independent partition
153+
spark.sql(
154+
s"""
155+
Alter table t5 split partition(4) into ('Canada', 'Russia', '(Good, NotGood)')
156+
""".stripMargin)
157+
158+
spark.sql("""select * from t5 where country = 'Good' """).show(100, false)
159+
160+
spark.sql("select * from t0 order by salary ").show(100, false)
161+
spark.sql("select * from t5 order by salary ").show(100, false)
162+
115163
// hive partition table
116164
spark.sql("DROP TABLE IF EXISTS t7")
117165
spark.sql("""
@@ -130,6 +178,7 @@ object CarbonPartitionExample {
130178
spark.sql(s"CREATE DATABASE partitionDB")
131179
spark.sql(s"""
132180
| CREATE TABLE IF NOT EXISTS partitionDB.t9(
181+
| id Int,
133182
| logdate Timestamp,
134183
| phonenumber Int,
135184
| country String,
@@ -145,11 +194,11 @@ object CarbonPartitionExample {
145194

146195
// show partitions
147196
try {
148-
spark.sql("""SHOW PARTITIONS t0""").show(100, false)
197+
spark.sql("""SHOW PARTITIONS t1""").show(100, false)
149198
} catch {
150199
case ex: AnalysisException => LOGGER.error(ex.getMessage())
151200
}
152-
spark.sql("""SHOW PARTITIONS t1""").show(100, false)
201+
spark.sql("""SHOW PARTITIONS t0""").show(100, false)
153202
spark.sql("""SHOW PARTITIONS t3""").show(100, false)
154203
spark.sql("""SHOW PARTITIONS t5""").show(100, false)
155204
spark.sql("""SHOW PARTITIONS t7""").show(100, false)

0 commit comments

Comments
 (0)