Skip to content

Commit 76c2ec2

Browse files
authored
Cleanup segment assignment strategy interface (#17038)
1 parent 9c2948b commit 76c2ec2

File tree

8 files changed

+42
-66
lines changed

8 files changed

+42
-66
lines changed

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory;
3434
import org.apache.pinot.segment.local.utils.TableConfigUtils;
3535
import org.apache.pinot.spi.config.table.TableConfig;
36-
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
3736
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
3837
import org.slf4j.Logger;
3938
import org.slf4j.LoggerFactory;
@@ -94,8 +93,7 @@ public void init(HelixManager helixManager, TableConfig tableConfig, @Nullable C
9493
*/
9594
protected Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, String>>> rebalanceTiers(
9695
Map<String, Map<String, String>> currentAssignment, @Nullable List<Tier> sortedTiers,
97-
@Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, boolean bootstrap,
98-
InstancePartitionsType instancePartitionsType) {
96+
@Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, boolean bootstrap) {
9997
if (sortedTiers == null) {
10098
return Pair.of(null, currentAssignment);
10199
}
@@ -129,7 +127,7 @@ protected Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, S
129127
_logger.info("Rebalancing tier: {} for table: {} with bootstrap: {}, instance partitions: {}", tierName,
130128
_tableNameWithType, bootstrap, tierInstancePartitions);
131129
newTierAssignments.add(reassignSegments(tierName, tierCurrentAssignment, tierInstancePartitions, bootstrap,
132-
segmentAssignmentStrategy, instancePartitionsType));
130+
segmentAssignmentStrategy));
133131
}
134132

135133
return Pair.of(newTierAssignments, tierSegmentAssignment.getNonTierSegmentAssignment());
@@ -140,7 +138,7 @@ protected Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, S
140138
*/
141139
protected Map<String, Map<String, String>> reassignSegments(String instancePartitionType,
142140
Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions, boolean bootstrap,
143-
SegmentAssignmentStrategy segmentAssignmentStrategy, InstancePartitionsType instancePartitionsType) {
141+
SegmentAssignmentStrategy segmentAssignmentStrategy) {
144142
Map<String, Map<String, String>> newAssignment;
145143
if (bootstrap) {
146144
_logger.info("Bootstrapping segment assignment for {} segments of table: {}", instancePartitionType,
@@ -150,14 +148,14 @@ protected Map<String, Map<String, String>> reassignSegments(String instanceParti
150148
newAssignment = new TreeMap<>();
151149
for (String segment : currentAssignment.keySet()) {
152150
List<String> assignedInstances =
153-
segmentAssignmentStrategy.assignSegment(segment, newAssignment, instancePartitions, instancePartitionsType);
151+
segmentAssignmentStrategy.assignSegment(segment, newAssignment, instancePartitions);
154152
newAssignment
155153
.put(segment, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
156154
}
157155
} else {
158156
// Use segment assignment strategy
159157
newAssignment =
160-
segmentAssignmentStrategy.reassignSegments(currentAssignment, instancePartitions, instancePartitionsType);
158+
segmentAssignmentStrategy.reassignSegments(currentAssignment, instancePartitions);
161159
}
162160
return newAssignment;
163161
}

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ public List<String> assignSegment(String segmentName, Map<String, Map<String, St
5151
_logger.info("Assigning segment: {} with instance partitions: {} for table: {}", segmentName, instancePartitions,
5252
_tableNameWithType);
5353
List<String> instancesAssigned =
54-
segmentAssignmentStrategy.assignSegment(segmentName, currentAssignment, instancePartitions,
55-
InstancePartitionsType.OFFLINE);
54+
segmentAssignmentStrategy.assignSegment(segmentName, currentAssignment, instancePartitions);
5655
_logger.info("Assigned segment: {} to instances: {} for table: {}", segmentName, instancesAssigned,
5756
_tableNameWithType);
5857
return instancesAssigned;
@@ -74,22 +73,20 @@ public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, S
7473
// tierPartitionMap has single tier for Dim tables and remove below check
7574
// See https://github.com/apache/pinot/issues/9047
7675
if (segmentAssignmentStrategy instanceof AllServersSegmentAssignmentStrategy) {
77-
return segmentAssignmentStrategy.reassignSegments(currentAssignment, offlineInstancePartitions,
78-
InstancePartitionsType.OFFLINE);
76+
return segmentAssignmentStrategy.reassignSegments(currentAssignment, offlineInstancePartitions);
7977
}
8078
boolean bootstrap = config.isBootstrap();
8179
// Rebalance tiers first
8280
Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, String>>> pair =
83-
rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap,
84-
InstancePartitionsType.OFFLINE);
81+
rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap);
8582
List<Map<String, Map<String, String>>> newTierAssignments = pair.getLeft();
8683
Map<String, Map<String, String>> nonTierAssignment = pair.getRight();
8784

8885
_logger.info("Rebalancing table: {} with instance partitions: {}, bootstrap: {}", _tableNameWithType,
8986
offlineInstancePartitions, bootstrap);
9087
Map<String, Map<String, String>> newAssignment =
9188
reassignSegments(InstancePartitionsType.OFFLINE.toString(), nonTierAssignment, offlineInstancePartitions,
92-
bootstrap, segmentAssignmentStrategy, InstancePartitionsType.OFFLINE);
89+
bootstrap, segmentAssignmentStrategy);
9390

9491
// Add tier assignments, if available
9592
if (CollectionUtils.isNotEmpty(newTierAssignments)) {

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,7 @@ public List<String> assignSegment(String segmentName, Map<String, Map<String, St
9696
SegmentAssignmentStrategy segmentAssignmentStrategy =
9797
SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(_helixManager, _tableConfig,
9898
instancePartitionsType.toString(), instancePartitions);
99-
instancesAssigned = segmentAssignmentStrategy.assignSegment(segmentName, currentAssignment, instancePartitions,
100-
InstancePartitionsType.COMPLETED);
99+
instancesAssigned = segmentAssignmentStrategy.assignSegment(segmentName, currentAssignment, instancePartitions);
101100
} else {
102101
instancesAssigned = assignConsumingSegment(segmentName, instancePartitions);
103102
}
@@ -186,8 +185,7 @@ public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, S
186185
boolean bootstrap = config.isBootstrap();
187186
// Rebalance tiers first
188187
Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, String>>> pair =
189-
rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap,
190-
InstancePartitionsType.COMPLETED);
188+
rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap);
191189

192190
List<Map<String, Map<String, String>>> newTierAssignments = pair.getLeft();
193191
Map<String, Map<String, String>> nonTierAssignment = pair.getRight();
@@ -220,7 +218,7 @@ public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, S
220218
_logger.info("Reassigning COMPLETED segments with COMPLETED instance partitions for table: {}",
221219
_tableNameWithType);
222220
newAssignment = reassignSegments(InstancePartitionsType.COMPLETED.toString(), completedSegmentAssignment,
223-
completedInstancePartitions, bootstrap, segmentAssignmentStrategy, InstancePartitionsType.COMPLETED);
221+
completedInstancePartitions, bootstrap, segmentAssignmentStrategy);
224222
} else {
225223
// When COMPLETED instance partitions are not provided, reassign COMPLETED segments the same way as CONSUMING
226224
// segments with CONSUMING instance partitions (ensure COMPLETED segments are served by the correct instances when

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/AllServersSegmentAssignmentStrategy.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
3232
import org.apache.pinot.spi.config.table.TableConfig;
3333
import org.apache.pinot.spi.config.table.TenantConfig;
34-
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
3534
import org.apache.pinot.spi.utils.CommonConstants;
3635
import org.slf4j.Logger;
3736
import org.slf4j.LoggerFactory;
@@ -41,34 +40,30 @@
4140
* Segment assignment for an offline dimension table.
4241
* <ul>
4342
* <li>
44-
* <p>This segment assignment strategy is used when {@link TableConfig#IS_DIM_TABLE_KEY}is
45-
* set to "true".</p>
43+
* This segment assignment strategy is used when {@link TableConfig#IS_DIM_TABLE_KEY} is set to "true".
4644
* </li>
4745
* <li>
48-
* <p>For a dimension table we assign the segment to all the hosts. Thus for this assignment
49-
* strategy we simply return all the hosts under a given tag as the assigned hosts for
50-
* a given segment.</p>
46+
* For a dimension table we assign all segments to all the servers. Thus, for this assignment strategy, we simply
47+
* return all the instances under a given tag as the assigned instances for any given segment.
5148
* </li>
5249
* </ul>
5350
*/
5451
public class AllServersSegmentAssignmentStrategy implements SegmentAssignmentStrategy {
5552
private static final Logger LOGGER = LoggerFactory.getLogger(AllServersSegmentAssignmentStrategy.class);
5653

5754
private HelixManager _helixManager;
58-
private String _offlineTableName;
5955
private TenantConfig _tenantConfig;
6056

6157
@Override
6258
public void init(HelixManager helixManager, TableConfig tableConfig) {
6359
_helixManager = helixManager;
64-
_offlineTableName = tableConfig.getTableName();
6560
_tenantConfig = tableConfig.getTenantConfig();
66-
LOGGER.info("Initialized AllServersSegmentAssignmentStrategy for table: {}", _offlineTableName);
61+
LOGGER.info("Initialized AllServersSegmentAssignmentStrategy for table: {}", tableConfig.getTableName());
6762
}
6863

6964
@Override
7065
public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
71-
InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType) {
66+
InstancePartitions instancePartitions) {
7267
String serverTag = _tenantConfig.getServer();
7368
Set<String> instances = HelixHelper.getServerInstancesForTenant(_helixManager, serverTag);
7469
int numInstances = instances.size();
@@ -79,7 +74,7 @@ public List<String> assignSegment(String segmentName, Map<String, Map<String, St
7974

8075
@Override
8176
public Map<String, Map<String, String>> reassignSegments(Map<String, Map<String, String>> currentAssignment,
82-
InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType) {
77+
InstancePartitions instancePartitions) {
8378

8479
String serverTag = _tenantConfig.getServer();
8580
Set<String> instances = HelixHelper.getServerInstancesForTenant(_helixManager, serverTag);

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,15 @@
2626
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
2727
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
2828
import org.apache.pinot.spi.config.table.TableConfig;
29-
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
3029
import org.slf4j.Logger;
3130
import org.slf4j.LoggerFactory;
3231

3332

3433
/**
35-
* Balance num Segment assignment strategy class for offline segment assignment
36-
* <ul>
37-
* <li>
38-
* <p>This segment assignment strategy is used when table replication/ num_replica_groups = 1.</p>
39-
* </li>
40-
* </ul>
34+
* Balanced segment assignment strategy class where segments are distributed to instances such that each instance has
35+
* approximately the same number of segments. This is the default segment assignment strategy.
36+
* <p>
37+
* This segment assignment strategy is used when table replication/ num_replica_groups = 1.
4138
*/
4239
public class BalancedNumSegmentAssignmentStrategy implements SegmentAssignmentStrategy {
4340
private static final Logger LOGGER = LoggerFactory.getLogger(BalancedNumSegmentAssignmentStrategy.class);
@@ -47,22 +44,22 @@ public class BalancedNumSegmentAssignmentStrategy implements SegmentAssignmentSt
4744
@Override
4845
public void init(HelixManager helixManager, TableConfig tableConfig) {
4946
SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig();
50-
Preconditions.checkState(validationAndRetentionConfig != null, "Validation Config is null");
47+
Preconditions.checkState(validationAndRetentionConfig != null, "segmentsConfig is null");
5148
_replication = tableConfig.getReplication();
5249
LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table: {} with replication: {}",
5350
tableConfig.getTableName(), _replication);
5451
}
5552

5653
@Override
5754
public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
58-
InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType) {
55+
InstancePartitions instancePartitions) {
5956
validateSegmentAssignmentStrategy(instancePartitions);
6057
return SegmentAssignmentUtils.assignSegmentWithoutReplicaGroup(currentAssignment, instancePartitions, _replication);
6158
}
6259

6360
@Override
6461
public Map<String, Map<String, String>> reassignSegments(Map<String, Map<String, String>> currentAssignment,
65-
InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType) {
62+
InstancePartitions instancePartitions) {
6663
validateSegmentAssignmentStrategy(instancePartitions);
6764
List<String> instances =
6865
SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, _replication);
@@ -74,8 +71,8 @@ private void validateSegmentAssignmentStrategy(InstancePartitions instancePartit
7471
int numPartitions = instancePartitions.getNumPartitions();
7572
// Non-replica-group based assignment should have numReplicaGroups and numPartitions = 1
7673
Preconditions.checkState(numReplicaGroups == 1,
77-
"Replica groups should be 1 in order to use BalanceNumSegmentAssignmentStrategy");
74+
"Number of replica groups should be 1 in order to use BalancedNumSegmentAssignmentStrategy");
7875
Preconditions.checkState(numPartitions == 1,
79-
"Replica groups should be 1 in order to use BalanceNumSegmentAssignmentStrategy");
76+
"Number of instance partitions should be 1 in order to use BalancedNumSegmentAssignmentStrategy");
8077
}
8178
}

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.pinot.segment.local.utils.TableConfigUtils;
3434
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
3535
import org.apache.pinot.spi.config.table.TableConfig;
36-
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
3736
import org.slf4j.Logger;
3837
import org.slf4j.LoggerFactory;
3938

@@ -45,17 +44,15 @@ class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentStrategy
4544
private String _tableName;
4645
private String _partitionColumn;
4746
private int _replication;
48-
private TableConfig _tableConfig;
4947

5048
@Override
5149
public void init(HelixManager helixManager, TableConfig tableConfig) {
5250
_helixManager = helixManager;
53-
_tableConfig = tableConfig;
5451
_tableName = tableConfig.getTableName();
5552
SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig();
56-
Preconditions.checkState(validationAndRetentionConfig != null, "Validation Config is null");
53+
Preconditions.checkState(validationAndRetentionConfig != null, "segmentsConfig is null");
5754
_replication = tableConfig.getReplication();
58-
_partitionColumn = TableConfigUtils.getPartitionColumn(_tableConfig);
55+
_partitionColumn = TableConfigUtils.getPartitionColumn(tableConfig);
5956
if (_partitionColumn == null) {
6057
LOGGER.info("Initialized ReplicaGroupSegmentAssignmentStrategy "
6158
+ "with replication: {} without partition column for table: {} ", _replication, _tableName);
@@ -70,7 +67,7 @@ public void init(HelixManager helixManager, TableConfig tableConfig) {
7067
*/
7168
@Override
7269
public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
73-
InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType) {
70+
InstancePartitions instancePartitions) {
7471
checkReplication(instancePartitions, _replication, _tableName);
7572
int numPartitions = instancePartitions.getNumPartitions();
7673
int partitionId;
@@ -86,7 +83,7 @@ public List<String> assignSegment(String segmentName, Map<String, Map<String, St
8683

8784
@Override
8885
public Map<String, Map<String, String>> reassignSegments(Map<String, Map<String, String>> currentAssignment,
89-
InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType) {
86+
InstancePartitions instancePartitions) {
9087
Map<String, Map<String, String>> newAssignment;
9188
int numPartitions = instancePartitions.getNumPartitions();
9289

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategy.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.helix.HelixManager;
2424
import org.apache.pinot.common.assignment.InstancePartitions;
2525
import org.apache.pinot.spi.config.table.TableConfig;
26-
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
2726

2827

2928
/**
@@ -40,28 +39,27 @@ public interface SegmentAssignmentStrategy {
4039
void init(HelixManager helixManager, TableConfig tableConfig);
4140

4241
/**
43-
* Assigns segment to instances. The assignment strategy will be configured in
44-
* OfflineSegmentAssignment and RealtimeSegmentAssignment classes and depending on type of
45-
* assignment strategy, this function will be called to assign a new segment
42+
* Assigns a segment to instances. The assignment strategy will be configured in the OfflineSegmentAssignment and
43+
* RealtimeSegmentAssignment classes, and depending on the type of assignment strategy, this method will be called
44+
* to assign a new segment.
4645
*
4746
* @param segmentName Name of the segment to be assigned
4847
* @param currentAssignment Current segment assignment of the table (map from segment name to instance state map)
4948
* @param instancePartitions Instance partitions
5049
* @return List of instances to assign the segment to
5150
*/
5251
List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
53-
InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType);
52+
InstancePartitions instancePartitions);
5453

5554
/**
56-
* Re-assigns segment to instances. The assignment strategy will be configured in
57-
* OfflineSegmentAssignment and RealtimeSegmentAssignment classes and depending on type of
58-
* assignment strategy, this function will be called to re-assign a segment
59-
* when the InstancePartitions has been changed.
55+
* Re-assigns segments to instances. The assignment strategy will be configured in the OfflineSegmentAssignment and
56+
* RealtimeSegmentAssignment classes, and depending on the type of assignment strategy, this method will be called
57+
* to re-assign segments when the InstancePartitions has been changed.
6058
*
6159
* @param currentAssignment Current segment assignment of the table (map from segment name to instance state map)
6260
* @param instancePartitions Instance partitions
63-
* @return Rebalanced assignment for the segments per assignment strategy
61+
* @return Rebalanced assignment for the segments as per the assignment strategy
6462
*/
6563
Map<String, Map<String, String>> reassignSegments(Map<String, Map<String, String>> currentAssignment,
66-
InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType);
64+
InstancePartitions instancePartitions);
6765
}

0 commit comments

Comments
 (0)