
Commit e967350

W1thOut authored and committed

delete segment expect remaining number

Revise

1 parent d8f7df9 commit e967350
File tree

4 files changed: +193 -1 lines changed


integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala

+2

@@ -180,6 +180,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val REGISTER = carbonKeyWord("REGISTER")
   protected val PROPERTIES = carbonKeyWord("PROPERTIES")
   protected val REFRESH = carbonKeyWord("REFRESH")
+  protected val EXPECT = carbonKeyWord("EXPECT")
+  protected val REMAIN_NUMBER = carbonKeyWord("REMAIN_NUMBER")

   // For materialized view
   // Keywords used in this parser
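
For context, a sketch of the end-user statement these two new keywords enable. The Spark session wiring and the table name `sales` are illustrative assumptions, not part of this commit:

import org.apache.spark.sql.SparkSession

object RemainNumberSyntaxExample {
  def main(args: Array[String]): Unit = {
    // A CarbonData-enabled session; the extensions class name is an assumption
    // about the deployment, not something this commit introduces.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("remain-number-syntax")
      .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
      .getOrCreate()

    // Keep only the 2 newest segments of the (hypothetical) table `sales`;
    // all older segments are marked for delete.
    spark.sql("DELETE FROM TABLE sales EXPECT SEGMENT.REMAIN_NUMBER = 2")
    spark.sql("SHOW SEGMENTS ON sales").show()
  }
}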
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByRemainNumberCommand.scala

+79

@@ -0,0 +1,79 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command.management

import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}

import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, withEvents}

/**
 * A command that deletes segments until only a given number remains,
 * keeping the most recently loaded segments.
 *
 * @param remaining expected number of segments remaining after deletion
 */
case class CarbonDeleteLoadByRemainNumberCommand(
    remaining: String,
    databaseNameOp: Option[String],
    tableName: String)
  extends DataCommand {

  override def processData(sparkSession: SparkSession): Seq[Row] = {
    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
    setAuditTable(carbonTable)
    setAuditInfo(Map("remaining number" -> remaining))
    if (!carbonTable.getTableInfo.isTransactionalTable) {
      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
    }

    // if insert overwrite is in progress, do not allow delete segment
    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
    }

    val segments = CarbonStore.readSegments(carbonTable.getTablePath, showHistory = false, None)
    if (segments.length == remaining.toInt) {
      return Seq.empty
    }

    // Derive the ids to delete: every segment except the `remaining`
    // most recently loaded ones.
    val deleteSegmentIds = segments
      .filter(segment => segment.getSegmentStatus != SegmentStatus.MARKED_FOR_DELETE)
      .sortBy(_.getLoadStartTime)
      .map(_.getLoadName)
      .reverse
      .drop(remaining.toInt)

    withEvents(DeleteSegmentByIdPreEvent(carbonTable, deleteSegmentIds, sparkSession),
      DeleteSegmentByIdPostEvent(carbonTable, deleteSegmentIds, sparkSession)) {
      CarbonStore.deleteLoadById(
        deleteSegmentIds,
        CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
        tableName,
        carbonTable
      )
    }
    Seq.empty
  }

  override protected def opName: String = "DELETE SEGMENT BY REMAIN_NUMBER"
}
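
The core of the command is the keep-newest-N selection above. A minimal standalone sketch of that logic, modeling segments as plain (loadName, loadStartTime, markedForDelete) tuples rather than CarbonData's segment objects:

object RemainNumberSelectionSketch {
  // Returns the load names to delete so that only the `remaining`
  // most recently started loads survive.
  def idsToDelete(segments: Seq[(String, Long, Boolean)], remaining: Int): Seq[String] =
    segments
      .filterNot(_._3)      // skip segments already marked for delete
      .sortBy(_._2)         // oldest first, by load start time
      .map(_._1)            // keep only the load names
      .reverse              // newest first
      .drop(remaining)      // retain the newest `remaining`, delete the rest

  def main(args: Array[String]): Unit = {
    val segs = Seq(("0", 100L, false), ("1", 200L, false), ("2", 300L, false))
    println(idsToDelete(segs, 1)) // List(1, 0): segment "2" (newest) survives
  }
}

Because segments already marked for delete are filtered out first, running the command repeatedly with the same number converges to a no-op instead of deleting further segments.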

integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala

+9 -1

@@ -79,7 +79,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {

   protected lazy val segmentManagement: Parser[LogicalPlan] =
     deleteSegmentByID | deleteSegmentByLoadDate | deleteStage | cleanFiles | addSegment |
-    showSegments
+    showSegments | deleteSegmentByRemainNumber

   protected lazy val restructure: Parser[LogicalPlan] = alterTableDropColumn

@@ -508,6 +508,14 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     CarbonDeleteLoadByIdCommand(loadIds, dbName, tableName.toLowerCase())
   }

+  protected lazy val deleteSegmentByRemainNumber: Parser[LogicalPlan] =
+    DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~
+      (EXPECT ~> (SEGMENT ~ "." ~ REMAIN_NUMBER) ~> "=" ~> segmentId) <~
+      opt(";") ^^ {
+      case dbName ~ tableName ~ remaining =>
+        CarbonDeleteLoadByRemainNumberCommand(remaining, dbName, tableName.toLowerCase())
+    }
+
   protected lazy val deleteSegmentByLoadDate: Parser[LogicalPlan] =
     DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
       (WHERE ~> (SEGMENT ~ "." ~ STARTTIME ~> BEFORE) ~ stringLit) <~
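
The new rule follows the parser-combinator style of the surrounding grammar. A self-contained mimic of its shape, assuming the scala-parser-combinators library is on the classpath; the keyword helper and token regexes below are stand-ins, not CarbonDDLSqlParser's real definitions:

import scala.util.parsing.combinator.RegexParsers

object RemainNumberGrammarSketch extends RegexParsers {
  // Case-insensitive keyword stand-in for carbonKeyWord.
  private def kw(s: String): Parser[String] = ("(?i)" + s).r
  private val ident: Parser[String] = "[a-zA-Z_][a-zA-Z0-9_]*".r
  private val segmentId: Parser[String] = "\\d+".r

  // DELETE FROM TABLE [db.]tbl EXPECT SEGMENT.REMAIN_NUMBER = <n> [;]
  val deleteByRemainNumber: Parser[(Option[String], String, String)] =
    kw("DELETE") ~> kw("FROM") ~> kw("TABLE") ~> (ident <~ ".").? ~ ident ~
      (kw("EXPECT") ~> kw("SEGMENT") ~> "." ~> kw("REMAIN_NUMBER") ~> "=" ~> segmentId) <~
      opt(";") ^^ { case db ~ tbl ~ n => (db, tbl, n) }

  def main(args: Array[String]): Unit = {
    println(parseAll(deleteByRemainNumber,
      "delete from table db1.t1 expect segment.remain_number = 2"))
    // prints something like: [1.58] parsed: (Some(db1),t1,2)
  }
}

The optional `(ident <~ ".").?` prefix backtracks cleanly when no database qualifier is present, which is why the same rule accepts both `t1` and `db1.t1`.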
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/deletesegment/DeleteSegmentByRemainNumberTestCase.scala

+103

@@ -0,0 +1,103 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.spark.testsuite.deletesegment

import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

/**
 * Test cases for deleting segments while keeping an expected remaining number.
 */
class DeleteSegmentByRemainNumberTestCase extends QueryTest
  with BeforeAndAfterAll with BeforeAndAfterEach {
  val DELETED_STATUS = "Marked for Delete"

  val SUCCESSFUL_STATUS = "Success"

  override def beforeAll {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
  }

  override def beforeEach(): Unit = {
    initTestTable
  }

  test("delete segment, remain_number = 1") {
    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
    val rows = sql("show segments on deleteSegmentTable").collect()
    assertResult(SUCCESSFUL_STATUS)(rows(0).get(1))
    assertResult(DELETED_STATUS)(rows(1).get(1))
    assertResult(DELETED_STATUS)(rows(2).get(1))
  }

  test("delete segment, remain nothing") {
    sql("delete from table deleteSegmentTable expect segment.remain_number = 0")
    val rows = sql("show segments on deleteSegmentTable").collect()
    rows.foreach(row => assertResult(DELETED_STATUS)(row.get(1)))
  }

  test("delete segment, remain all") {
    sql("delete from table deleteSegmentTable expect segment.remain_number = 3")
    val rows = sql("show segments on deleteSegmentTable").collect()
    rows.foreach(row => assertResult(SUCCESSFUL_STATUS)(row.get(1)))
  }

  test("delete segment, remain_number = -1") {
    val ex = intercept[Exception] {
      sql("delete from table deleteSegmentTable expect segment.remain_number = -1")
    }
    assert(ex.getMessage.contains("not found"))
  }

  test("delete segment after update") {
    sql("update deleteSegmentTable d set (d.country) = ('fr') where d.country = 'aus'")
    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
    val rows = sql("select * from deleteSegmentTable").collect()
    rows.foreach(row => assertResult("fr")(row(2)))
  }

  test("delete segment after delete newest segment by segmentId") {
    sql("delete from table deleteSegmentTable where segment.id in (2)")
    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
    val rows = sql("show segments on deleteSegmentTable").collect()
    assertResult(DELETED_STATUS)(rows(0).get(1))
    assertResult(SUCCESSFUL_STATUS)(rows(1).get(1))
    assertResult(DELETED_STATUS)(rows(2).get(1))
  }

  private def initTestTable = {
    sql("drop table if exists deleteSegmentTable")
    sql(
      "CREATE table deleteSegmentTable (ID int, date String, country String, name " +
      "String, phonetype String, serialname String, salary String) STORED AS carbondata " +
      "PARTITIONED by(age int)"
    )
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention1.csv'
         | INTO TABLE deleteSegmentTable PARTITION (age='20') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention2.csv'
         | INTO TABLE deleteSegmentTable PARTITION (age='30') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention3.csv'
         | INTO TABLE deleteSegmentTable PARTITION (age='40') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
  }
}
