Skip to content

Commit 5ed39de

Browse files
ravipesalajackylk
authored andcommitted
[CARBONDATA-1862][PARTITION] Support compaction for partition table
It supports compaction on partition table. There is a change in compaction during the block identification and grouping. As all blocks which are related same partition always needs to group to same set for compaction.So compactor needs to get the partition information from partition map file during compaction of partition table This closes apache#1675
1 parent 3ff55a2 commit 5ed39de

File tree

14 files changed

+353
-50
lines changed

14 files changed

+353
-50
lines changed

Diff for: core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java

+4
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,10 @@ public List<String> getPartitions(String indexFileName) {
276276
return partitionMap.get(indexFileName);
277277
}
278278

279+
public Map<String, List<String>> getPartitionMap() {
280+
return partitionMap;
281+
}
282+
279283
public static class PartitionMapper implements Serializable {
280284

281285
private static final long serialVersionUID = 3582245668420401089L;

Diff for: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,8 @@ public PartitionInfo getPartitionInfo(String tableName) {
571571
}
572572

573573
public boolean isPartitionTable() {
574-
return null != tablePartitionMap.get(getTableName());
574+
return null != tablePartitionMap.get(getTableName())
575+
&& tablePartitionMap.get(getTableName()).getPartitionType() != PartitionType.NATIVE_HIVE;
575576
}
576577

577578
public boolean isHivePartitionTable() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.carbondata.spark.testsuite.standardpartition
18+
19+
import org.apache.spark.sql.test.util.QueryTest
20+
import org.scalatest.BeforeAndAfterAll
21+
22+
import org.apache.carbondata.core.constants.CarbonCommonConstants
23+
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
24+
import org.apache.carbondata.core.datastore.impl.FileFactory
25+
import org.apache.carbondata.core.metadata.CarbonMetadata
26+
import org.apache.carbondata.core.util.CarbonProperties
27+
import org.apache.carbondata.core.util.path.CarbonTablePath
28+
29+
class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndAfterAll {
30+
31+
override def beforeAll {
32+
dropTable
33+
34+
CarbonProperties.getInstance()
35+
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
36+
sql(
37+
"""
38+
| CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp,
39+
| workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
40+
| projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
41+
| utilization int,salary int)
42+
| STORED BY 'org.apache.carbondata.format'
43+
""".stripMargin)
44+
45+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
46+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
47+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
48+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
49+
50+
}
51+
52+
def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Int): Unit = {
53+
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
54+
val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
55+
carbonTable.getTablePath)
56+
val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
57+
val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
58+
val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
59+
override def accept(file: CarbonFile): Boolean = {
60+
return file.getName.endsWith(".partitionmap")
61+
}
62+
})
63+
assert(dataFiles.length == partitions)
64+
}
65+
66+
test("data compaction for partition table for one partition column") {
67+
sql(
68+
"""
69+
| CREATE TABLE partitionone (empname String, designation String, doj Timestamp,
70+
| workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
71+
| projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
72+
| utilization int,salary int)
73+
| PARTITIONED BY (empno int)
74+
| STORED BY 'org.apache.carbondata.format'
75+
""".stripMargin)
76+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
77+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
78+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
79+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
80+
81+
sql("ALTER TABLE partitionone COMPACT 'MINOR'").collect()
82+
83+
validateDataFiles("default_partitionone", "0.1", 1)
84+
85+
checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionone where empno=11 order by empno"),
86+
sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where empno=11 order by empno"))
87+
88+
}
89+
90+
91+
test("data compaction for partition table for three partition column") {
92+
sql(
93+
"""
94+
| CREATE TABLE partitionthree (empno int, doj Timestamp,
95+
| workgroupcategoryname String, deptno int, deptname String,
96+
| projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
97+
| utilization int,salary int)
98+
| PARTITIONED BY (workgroupcategory int, empname String, designation String)
99+
| STORED BY 'org.apache.carbondata.format'
100+
""".stripMargin)
101+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
102+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
103+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
104+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
105+
sql("ALTER TABLE partitionthree COMPACT 'MINOR'").collect()
106+
107+
validateDataFiles("default_partitionthree", "0.1", 1)
108+
109+
checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionthree where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno"),
110+
sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno"))
111+
}
112+
113+
test("data major compaction for partition table") {
114+
sql(
115+
"""
116+
| CREATE TABLE partitionmajor (empno int, doj Timestamp,
117+
| workgroupcategoryname String, deptno int, deptname String,
118+
| projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
119+
| utilization int,salary int)
120+
| PARTITIONED BY (workgroupcategory int, empname String, designation String)
121+
| STORED BY 'org.apache.carbondata.format'
122+
""".stripMargin)
123+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
124+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
125+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
126+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
127+
sql("ALTER TABLE partitionmajor COMPACT 'MINOR'").collect()
128+
sql(s"""ALTER TABLE partitionmajor DROP PARTITION(workgroupcategory='1')""")
129+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
130+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
131+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
132+
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
133+
val rows = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionmajor where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno").collect()
134+
sql("ALTER TABLE partitionmajor COMPACT 'MAJOR'").collect()
135+
validateDataFiles("default_partitionmajor", "0.2", 1)
136+
checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionmajor where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno"),
137+
rows)
138+
}
139+
140+
test("data compaction for static partition table") {
141+
sql(
142+
"""
143+
| CREATE TABLE staticpartition (empno int, doj Timestamp,
144+
| workgroupcategoryname String, deptno int,
145+
| projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
146+
| utilization int,salary int,workgroupcategory int, empname String, designation String)
147+
| PARTITIONED BY (deptname String)
148+
| STORED BY 'org.apache.carbondata.format'
149+
""".stripMargin)
150+
sql(s"""insert into staticpartition PARTITION(deptname='software') select empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""")
151+
sql(s"""insert into staticpartition PARTITION(deptname='software') select empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""")
152+
sql(s"""insert into staticpartition PARTITION(deptname='finance') select empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""")
153+
sql(s"""insert into staticpartition PARTITION(deptname='finance') select empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""")
154+
val p1 = sql(s"""select count(*) from staticpartition where deptname='software'""").collect()
155+
val p2 = sql(s"""select count(*) from staticpartition where deptname='finance'""").collect()
156+
sql("ALTER TABLE staticpartition COMPACT 'MINOR'").collect()
157+
158+
validateDataFiles("default_staticpartition", "0.1", 1)
159+
160+
checkAnswer(sql(s"""select count(*) from staticpartition where deptname='software'"""), p1)
161+
checkAnswer(sql(s"""select count(*) from staticpartition where deptname='finance'"""), p2)
162+
}
163+
164+
override def afterAll = {
165+
dropTable
166+
}
167+
168+
def dropTable = {
169+
sql("drop table if exists originTable")
170+
sql("drop table if exists originMultiLoads")
171+
sql("drop table if exists partitionone")
172+
sql("drop table if exists partitiontwo")
173+
sql("drop table if exists partitionthree")
174+
sql("drop table if exists partitionmajor")
175+
sql("drop table if exists staticpartition")
176+
}
177+
178+
}

Diff for: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala

+1-3
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA
182182
"doj=2014-08-15 00:00:00",
183183
"projectenddate=2016-12-30"))
184184
checkAnswer(frame1,
185-
sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where doj>'2006-01-17 00:00:00'"))
185+
sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where doj>cast('2006-01-17 00:00:00' as Timestamp)"))
186186

187187
}
188188

@@ -209,8 +209,6 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA
209209
sql("drop table if exists partitionmany")
210210
sql("drop table if exists partitiondate")
211211
sql("drop table if exists partitiondateinsert")
212-
sql("drop table if exists staticpartitionone")
213-
sql("drop table if exists singlepasspartitionone")
214212
}
215213

216214
}

0 commit comments

Comments
 (0)