
Commit 93b9c7a

W1thOut authored and committed

delete segment expect remaining number

Revise code; add SI test; add a no-partition-table test; fix checkstyle.

1 parent d8f7df9 commit 93b9c7a

File tree

4 files changed (+263, -1)


integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala

+4
@@ -180,6 +180,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val REGISTER = carbonKeyWord("REGISTER")
   protected val PROPERTIES = carbonKeyWord("PROPERTIES")
   protected val REFRESH = carbonKeyWord("REFRESH")
+  protected val EXPECT = carbonKeyWord("EXPECT")
+  protected val REMAIN_NUMBER = carbonKeyWord("REMAIN_NUMBER")
 
   // For materialized view
   // Keywords used in this parser
@@ -353,4 +355,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     p.getClass.getSimpleName.equals("FloatLit") ||
     p.getClass.getSimpleName.equals("DecimalLit")
   }) ^^ (_.chars)
+
+  protected lazy val number: Parser[Int] = numericLit ^^ (_.toInt)
 }
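The new number rule is plain scala-parser-combinators usage: map the numeric-literal token to an Int. Below is a minimal, self-contained sketch of the same idea, assuming the scala-parser-combinators library is on the classpath; the object and rule names are illustrative, not CarbonData code.

import scala.util.parsing.combinator.syntactical.StandardTokenParsers

// Illustrative mini-parser, not CarbonData code.
object NumberParserSketch extends StandardTokenParsers {
  lexical.reserved += "REMAIN_NUMBER"
  lexical.delimiters += "="

  // Same shape as CarbonDDLSqlParser.number: numericLit yields the literal
  // as a String, and ^^ maps it to an Int.
  lazy val number: Parser[Int] = numericLit ^^ (_.toInt)

  // Accepts input like "REMAIN_NUMBER = 2" and keeps only the Int;
  // ~> discards the keyword and the "=" delimiter.
  lazy val remain: Parser[Int] = "REMAIN_NUMBER" ~> "=" ~> number

  def main(args: Array[String]): Unit = {
    val result = remain(new lexical.Scanner("REMAIN_NUMBER = 2"))
    println(result) // a successful ParseResult carrying 2
  }
}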
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByRemainNumberCommand.scala

+83

@@ -0,0 +1,83 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command.management

import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}

import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.events.{withEvents, DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent}

/**
 * A command that deletes segments so that only the expected number of the
 * most recent segments remains.
 *
 * @param remaining expected number of segments remaining after deletion
 */
case class CarbonDeleteLoadByRemainNumberCommand(
    remaining: Int,
    databaseNameOp: Option[String],
    tableName: String)
  extends DataCommand {

  override def processData(sparkSession: SparkSession): Seq[Row] = {
    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
    setAuditTable(carbonTable)
    setAuditInfo(Map("remaining number" -> remaining.toString))
    if (!carbonTable.getTableInfo.isTransactionalTable) {
      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
    }

    // if insert overwrite is in progress, do not allow segment deletion
    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
    }

    val segments = CarbonStore.readSegments(carbonTable.getTablePath, showHistory = false, None)

    // derive the segment ids to delete: keep the newest `remaining` loads
    val deleteSegmentIds = segments.filter(segment =>
      segment.getSegmentStatus == SegmentStatus.SUCCESS ||
        segment.getSegmentStatus == SegmentStatus.COMPACTED)
      .sortBy(_.getLoadStartTime)
      .map(_.getLoadName)
      .reverse
      .drop(remaining)

    if (deleteSegmentIds.length == 0) {
      return Seq.empty
    }

    withEvents(DeleteSegmentByIdPreEvent(carbonTable, deleteSegmentIds, sparkSession),
      DeleteSegmentByIdPostEvent(carbonTable, deleteSegmentIds, sparkSession)) {
      CarbonStore.deleteLoadById(
        deleteSegmentIds,
        CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
        tableName,
        carbonTable
      )
    }
    Seq.empty
  }

  override protected def opName: String = "DELETE SEGMENT BY REMAIN_NUMBER"
}
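The heart of processData is the selection step: among SUCCESS and COMPACTED segments, sort by load start time, reverse so the newest come first, skip the first `remaining` entries, and delete everything left. A self-contained sketch of that logic on stand-in data follows; the Seg type and its values are made up for illustration.

// Illustrative stand-in for a segment's load metadata; the real command
// reads LoadMetadataDetails via CarbonStore.readSegments.
case class Seg(name: String, startTime: Long, status: String)

object RemainNumberSketch {
  // Mirrors the command's selection: filter deletable statuses, sort by load
  // start time, reverse so the newest come first, keep the first `remaining`
  // names, and return everything else as the ids to delete.
  def idsToDelete(segments: Seq[Seg], remaining: Int): Seq[String] =
    segments
      .filter(s => s.status == "Success" || s.status == "Compacted")
      .sortBy(_.startTime)
      .map(_.name)
      .reverse
      .drop(remaining)

  def main(args: Array[String]): Unit = {
    val segs = Seq(Seg("0", 100L, "Success"), Seg("1", 200L, "Success"),
      Seg("2", 300L, "Compacted"))
    println(idsToDelete(segs, remaining = 1)) // List(1, 0): newest segment "2" survives
  }
}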

integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala

+9-1
@@ -79,7 +79,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
   protected lazy val segmentManagement: Parser[LogicalPlan] =
     deleteSegmentByID | deleteSegmentByLoadDate | deleteStage | cleanFiles | addSegment |
-    showSegments
+    showSegments | deleteSegmentByRemainNumber
 
   protected lazy val restructure: Parser[LogicalPlan] = alterTableDropColumn
 
@@ -508,6 +508,14 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
       CarbonDeleteLoadByIdCommand(loadIds, dbName, tableName.toLowerCase())
     }
 
+  protected lazy val deleteSegmentByRemainNumber: Parser[LogicalPlan] =
+    DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~
+    (EXPECT ~> (SEGMENT ~ "." ~ REMAIN_NUMBER) ~> "=" ~> number) <~
+    opt(";") ^^ {
+      case dbName ~ tableName ~ remaining =>
+        CarbonDeleteLoadByRemainNumberCommand(remaining, dbName, tableName.toLowerCase())
+    }
+
   protected lazy val deleteSegmentByLoadDate: Parser[LogicalPlan] =
     DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
       (WHERE ~> (SEGMENT ~ "." ~ STARTTIME ~> BEFORE) ~ stringLit) <~
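Put together with the keywords added in CarbonDDLSqlParser, the rule accepts an optionally database-qualified table name, the EXPECT SEGMENT.REMAIN_NUMBER clause, and an integer. A hedged usage sketch follows; the spark session setup and the mydb database name are hypothetical, while the statement text itself matches the test cases below.

import org.apache.spark.sql.SparkSession

object DeleteByRemainNumberExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical session; assumes the CarbonData extensions are configured.
    val spark = SparkSession.builder()
      .appName("delete-by-remain-number")
      .getOrCreate()

    // Keep only the newest segment; older SUCCESS/COMPACTED segments are
    // marked for delete.
    spark.sql("delete from table deleteSegmentTable expect segment.remain_number = 1")

    // The database qualifier and trailing semicolon are optional:
    spark.sql("delete from table mydb.deleteSegmentTable expect segment.remain_number = 2;")
  }
}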
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/deletesegment/DeleteSegmentByRemainNumberTestCase.scala

+167

@@ -0,0 +1,167 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.spark.testsuite.deletesegment

import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

/**
 * Test cases for deleting segments by expected remaining number.
 */
class DeleteSegmentByRemainNumberTestCase extends QueryTest with BeforeAndAfterAll
  with BeforeAndAfterEach {
  val DELETED_STATUS = "Marked for Delete"

  val SUCCESSFUL_STATUS = "Success"

  override def beforeAll {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
  }

  override def beforeEach(): Unit = {
    sql("drop table if exists deleteSegmentPartitionTable")
    sql("drop table if exists deleteSegmentTable")
    sql("drop table if exists indexTable")
    sql(
      "CREATE table deleteSegmentPartitionTable (ID int, date String, country String, name " +
      "String, phonetype String, serialname String, salary String) STORED AS carbondata " +
      "PARTITIONED by(age int)"
    )
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention1.csv'
         | INTO TABLE deleteSegmentPartitionTable PARTITION (age='20')
         | OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention2.csv'
         | INTO TABLE deleteSegmentPartitionTable PARTITION (age='30')
         | OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention3.csv'
         | INTO TABLE deleteSegmentPartitionTable PARTITION (age='40')
         | OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)

    sql(
      "CREATE table deleteSegmentTable (ID int, date String, country String, name " +
      "String, phonetype String, serialname String, salary String) STORED AS carbondata"
    )
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention1.csv'
         | INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention2.csv'
         | INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention3.csv'
         | INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)

    sql("create index indexTable on table deleteSegmentTable(country) as 'carbondata' " +
      "properties('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
  }

  override def afterAll(): Unit = {
    sql("drop table if exists deleteSegmentTable")
    sql("drop table if exists deleteSegmentPartitionTable")
    sql("drop table if exists indexTable")
  }

  test("delete segment, remain_number = 1") {
    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
    val segments1 = sql("show segments on deleteSegmentTable").collect()
    assertResult(SUCCESSFUL_STATUS)(segments1(0).get(1))
    assertResult(DELETED_STATUS)(segments1(1).get(1))
    assertResult(DELETED_STATUS)(segments1(2).get(1))
    assertResult(sql("select * from indexTable").count())(10)

    sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 1")
    val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
    assertResult(SUCCESSFUL_STATUS)(segments2(0).get(1))
    assertResult(DELETED_STATUS)(segments2(1).get(1))
    assertResult(DELETED_STATUS)(segments2(2).get(1))
  }

  test("delete segment, remain nothing") {
    sql("delete from table deleteSegmentTable expect segment.remain_number = 0")
    val segments1 = sql("show segments on deleteSegmentTable").collect()
    segments1.foreach(row => assertResult(DELETED_STATUS)(row.get(1)))
    assertResult(sql("select * from indexTable").count())(0)

    sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 0")
    val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
    segments2.foreach(row => assertResult(DELETED_STATUS)(row.get(1)))
  }

  test("delete segment, remain all") {
    sql("delete from table deleteSegmentTable expect segment.remain_number = 3")
    val segments1 = sql("show segments on deleteSegmentTable").collect()
    segments1.foreach(row => assertResult(SUCCESSFUL_STATUS)(row.get(1)))
    assertResult(sql("select * from indexTable").count())(30)

    sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 3")
    val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
    segments2.foreach(row => assertResult(SUCCESSFUL_STATUS)(row.get(1)))
  }

  test("delete segment, remain_number is invalid") {
    val ex1 = intercept[Exception] {
      sql("delete from table deleteSegmentTable expect segment.remain_number = -1")
    }
    assert(ex1.getMessage.contains("not found"))
    val ex2 = intercept[Exception] {
      sql("delete from table deleteSegmentTable expect segment.remain_number = 2147483648")
    }
    assert(ex2.getMessage.contains("SqlParse"))
    assertResult(sql("select * from indexTable").count())(30)

    val ex3 = intercept[Exception] {
      sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = -1")
    }
    assert(ex3.getMessage.contains("not found"))
    val ex4 = intercept[Exception] {
      sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 2147483648")
    }
    assert(ex4.getMessage.contains("SqlParse"))
  }

  test("delete segment after update") {
    sql("update deleteSegmentPartitionTable d set (d.country) = ('fr') where d.country = 'aus'")
    sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 1")
    val segments2 = sql("select * from deleteSegmentPartitionTable").collect()
    segments2.foreach(row => assertResult("fr")(row(2)))
  }

  test("delete segment after delete newest segment by segmentId") {
    sql("delete from table deleteSegmentTable where segment.id in (2)")
    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
    val segments1 = sql("show segments on deleteSegmentTable").collect()
    assertResult(DELETED_STATUS)(segments1(0).get(1))
    assertResult(SUCCESSFUL_STATUS)(segments1(1).get(1))
    assertResult(DELETED_STATUS)(segments1(2).get(1))
    assertResult(sql("select * from indexTable").count())(10)

    sql("delete from table deleteSegmentPartitionTable where segment.id in (2)")
    sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 1")
    val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
    assertResult(DELETED_STATUS)(segments2(0).get(1))
    assertResult(SUCCESSFUL_STATUS)(segments2(1).get(1))
    assertResult(DELETED_STATUS)(segments2(2).get(1))
  }
}
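A note on why the two invalid inputs in the "remain_number is invalid" test fail differently, as a hedged reading of the test expectations: the number rule has no unary minus, so "-1" never matches numericLit and the whole statement fails to parse (the "not found" message), while "2147483648" does match numericLit but overflows _.toInt, which surfaces as a SqlParse-style failure. The overflow half can be verified in plain Scala with no CarbonData dependency:

object RemainNumberOverflowSketch {
  def main(args: Array[String]): Unit = {
    // The same conversion the number parser rule applies to the literal:
    println(scala.util.Try("2147483648".toInt))
    // Failure(java.lang.NumberFormatException: For input string: "2147483648")
    println(scala.util.Try("2".toInt)) // Success(2)
  }
}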
