Skip to content

Commit 63b6359

Browse files
[Enhancement] Delete shards meta from starmgr after recycling partition for cloud-native cluster (backport #56691) (#57282)
Co-authored-by: Drake Wang <[email protected]>
1 parent 5de0d5f commit 63b6359

File tree

5 files changed

+95
-0
lines changed

5 files changed

+95
-0
lines changed

Diff for: fe/fe-core/src/main/java/com/starrocks/lake/LakeTableHelper.java

+24
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,12 @@
4343
import org.apache.logging.log4j.LogManager;
4444
import org.apache.logging.log4j.Logger;
4545

46+
import java.util.ArrayList;
47+
import java.util.Collection;
48+
import java.util.HashSet;
4649
import java.util.List;
4750
import java.util.Optional;
51+
import java.util.Set;
4852

4953
public class LakeTableHelper {
5054
private static final Logger LOG = LogManager.getLogger(LakeTableHelper.class);
@@ -160,6 +164,26 @@ static boolean removePartitionDirectory(Partition partition, long warehouseId) t
160164
return ret;
161165
}
162166

167+
/**
168+
* delete `partition`'s all shard group meta (shards meta included) from starmanager
169+
*/
170+
static void deleteShardGroupMeta(Partition partition) {
171+
// use Set to avoid duplicate shard group id
172+
StarOSAgent starOSAgent = GlobalStateMgr.getCurrentState().getStarOSAgent();
173+
Collection<PhysicalPartition> subPartitions = partition.getSubPartitions();
174+
Set<Long> needRemoveShardGroupIdSet = new HashSet<>();
175+
for (PhysicalPartition subPartition : subPartitions) {
176+
for (MaterializedIndex index : subPartition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
177+
needRemoveShardGroupIdSet.add(index.getShardGroupId());
178+
}
179+
}
180+
if (!needRemoveShardGroupIdSet.isEmpty()) {
181+
starOSAgent.deleteShardGroup(new ArrayList<>(needRemoveShardGroupIdSet));
182+
LOG.debug("Deleted shard group related to partition {}, group ids: {}", partition.getId(),
183+
needRemoveShardGroupIdSet);
184+
}
185+
}
186+
163187
public static boolean isSharedPartitionDirectory(PhysicalPartition physicalPartition, long warehouseId)
164188
throws StarClientException {
165189
ShardInfo shardInfo = getAssociatedShardInfo(physicalPartition, warehouseId).orElse(null);

Diff for: fe/fe-core/src/main/java/com/starrocks/lake/RecycleLakeListPartitionInfo.java

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public boolean delete() {
4040
Warehouse warehouse = manager.getBackgroundWarehouse();
4141
if (LakeTableHelper.removePartitionDirectory(partition, warehouse.getId())) {
4242
GlobalStateMgr.getCurrentState().getLocalMetastore().onErasePartition(partition);
43+
LakeTableHelper.deleteShardGroupMeta(partition);
4344
return true;
4445
} else {
4546
return false;

Diff for: fe/fe-core/src/main/java/com/starrocks/lake/RecycleLakeRangePartitionInfo.java

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public boolean delete() {
4343
Warehouse warehouse = manager.getBackgroundWarehouse();
4444
if (LakeTableHelper.removePartitionDirectory(partition, warehouse.getId())) {
4545
GlobalStateMgr.getCurrentState().getLocalMetastore().onErasePartition(partition);
46+
LakeTableHelper.deleteShardGroupMeta(partition);
4647
return true;
4748
} else {
4849
return false;

Diff for: fe/fe-core/src/main/java/com/starrocks/lake/RecycleLakeUnPartitionInfo.java

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public boolean delete() {
4242
Warehouse warehouse = manager.getBackgroundWarehouse();
4343
if (LakeTableHelper.removePartitionDirectory(partition, warehouse.getId())) {
4444
GlobalStateMgr.getCurrentState().getLocalMetastore().onErasePartition(partition);
45+
LakeTableHelper.deleteShardGroupMeta(partition);
4546
return true;
4647
} else {
4748
return false;

Diff for: fe/fe-core/src/test/java/com/starrocks/lake/LakeTableHelperTest.java

+68
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,16 @@
1414

1515
package com.starrocks.lake;
1616

17+
import com.google.common.collect.Lists;
18+
import com.staros.proto.ShardGroupInfo;
1719
import com.starrocks.catalog.Database;
20+
import com.starrocks.catalog.DistributionInfo;
21+
import com.starrocks.catalog.HashDistributionInfo;
22+
import com.starrocks.catalog.MaterializedIndex;
23+
import com.starrocks.catalog.Partition;
24+
import com.starrocks.catalog.PartitionInfo;
25+
import com.starrocks.catalog.PhysicalPartition;
26+
import com.starrocks.catalog.SinglePartitionInfo;
1827
import com.starrocks.catalog.Table;
1928
import com.starrocks.common.Config;
2029
import com.starrocks.qe.ConnectContext;
@@ -24,11 +33,20 @@
2433
import com.starrocks.sql.ast.CreateTableStmt;
2534
import com.starrocks.transaction.TransactionState;
2635
import com.starrocks.utframe.UtFrameUtils;
36+
import mockit.Mock;
37+
import mockit.MockUp;
38+
import mockit.Mocked;
2739
import org.junit.AfterClass;
2840
import org.junit.Assert;
2941
import org.junit.BeforeClass;
3042
import org.junit.Test;
3143

44+
import java.util.ArrayList;
45+
import java.util.Collection;
46+
import java.util.List;
47+
import java.util.stream.Collectors;
48+
import java.util.stream.Stream;
49+
3250
public class LakeTableHelperTest {
3351
private static ConnectContext connectContext;
3452
private static final String DB_NAME = "test_lake_table_helper";
@@ -74,4 +92,54 @@ public void testSupportCombinedTxnLog() throws Exception {
7492
Config.lake_use_combined_txn_log = false;
7593
Assert.assertFalse(LakeTableHelper.supportCombinedTxnLog(TransactionState.LoadJobSourceType.BACKEND_STREAMING));
7694
}
95+
96+
@Test
97+
public void testDeleteShardGroupMeta(@Mocked StarOSAgent starOSAgent) {
98+
99+
new MockUp<GlobalStateMgr>() {
100+
@Mock
101+
public StarOSAgent getStarOSAgent() {
102+
return starOSAgent;
103+
}
104+
};
105+
106+
long tableId = 1001L;
107+
long partitionId = 1000L;
108+
long physicalPartitionId = 1002L;
109+
long groupIdToClear = 5100L;
110+
111+
DistributionInfo distributionInfo = new HashDistributionInfo(10, Lists.newArrayList());
112+
PartitionInfo partitionInfo = new SinglePartitionInfo();
113+
partitionInfo.setReplicationNum(1000L, (short) 3);
114+
Partition partition =
115+
new Partition(partitionId, physicalPartitionId, "p1", new MaterializedIndex(), distributionInfo);
116+
Collection<PhysicalPartition> subPartitions = partition.getSubPartitions();
117+
subPartitions.forEach(physicalPartition -> {
118+
MaterializedIndex materializedIndex =
119+
physicalPartition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).get(0);
120+
materializedIndex.setShardGroupId(groupIdToClear);
121+
});
122+
123+
// build shardGroupInfos
124+
List<Long> allShardIds = Stream.of(1000L, 1001L, 1002L, 1003L).collect(Collectors.toList());
125+
List<ShardGroupInfo> shardGroupInfos = new ArrayList<>();
126+
ShardGroupInfo info = ShardGroupInfo.newBuilder()
127+
.setGroupId(groupIdToClear)
128+
.putLabels("tableId", String.valueOf(tableId))
129+
.putProperties("createTime", String.valueOf(System.currentTimeMillis() - 86400 * 1000))
130+
.addAllShardIds(allShardIds)
131+
.build();
132+
shardGroupInfos.add(info);
133+
new MockUp<StarOSAgent>() {
134+
@Mock
135+
public void deleteShardGroup(List<Long> groupIds) {
136+
for (long groupId : groupIds) {
137+
shardGroupInfos.removeIf(item -> item.getGroupId() == groupId);
138+
}
139+
}
140+
};
141+
142+
LakeTableHelper.deleteShardGroupMeta(partition);
143+
Assert.assertEquals(0, shardGroupInfos.size());
144+
}
77145
}

0 commit comments

Comments
 (0)