Skip to content

Commit 241ead5

Browse files
committed
controller: reuse error-state repair for non-pauseless; delete redundant reingest method
- Reuse repairSegmentsInErrorStateForPauselessConsumption() to repair errored segments for both pauseless and non-pauseless tables
- Remove reingestCommittingSegmentsForPauselessDisabled() and route call sites to the shared repair flow
- Keep a single reingestion entrypoint: reingestSegment(table, segment, instances) with simple predicates (hasOnlineInstance, maybeResetIfNotInProgress)
- Update RealtimeSegmentValidationManager to always invoke the shared repair flow (and preserve optional auto-reset behavior)
- Adjust RealtimeSegmentValidationManagerTest expectations accordingly

Behavioral notes:
- Only re-ingests COMMITTING LLC segments with start/end offsets and no download URL when all replicas are in ERROR and the segment is ONLINE in IdealState
- Otherwise resets segments not in IN_PROGRESS to fetch from deep store/peer
- Applies only to tables without dedup or partial upsert unless explicitly allowed via allowRepairOfErrorSegments()
1 parent c897df7 commit 241ead5

File tree

3 files changed

+85
-27
lines changed

3 files changed

+85
-27
lines changed

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2372,9 +2372,8 @@ private Set<String> filterSegmentsToCommit(Set<String> allConsumingSegments,
23722372
Set<String> invalidSegments = partitionedByIsConsuming.get(false);
23732373
if (!invalidSegments.isEmpty()) {
23742374
LOGGER.warn("Cannot commit segments that are not in CONSUMING state. All consuming segments: {}, "
2375-
+ "provided segments to commit: {}. Ignoring all non-consuming segments, sampling 10: {}",
2376-
allConsumingSegments,
2377-
segmentsToCommitStr, invalidSegments.stream().limit(10).collect(Collectors.toSet()));
2375+
+ "provided segments to commit: {}. Ignoring all non-consuming segments, sampling 10: {}",
2376+
allConsumingSegments, segmentsToCommitStr, invalidSegments.stream().limit(10).collect(Collectors.toSet()));
23782377
}
23792378
return validSegmentsToCommit;
23802379
}
@@ -2691,16 +2690,15 @@ public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig tableC
26912690
assert idealStateMap != null;
26922691
String aliveServer = pickServerToReingest(idealStateMap.keySet());
26932692
if (aliveServer == null) {
2694-
LOGGER.warn("No alive server found to re-ingest segment: {} in table: {}, skipping re-ingestion", segmentName,
2695-
realtimeTableName);
2696-
continue;
2693+
LOGGER.warn("No alive server found to re-ingest segment: {} in table: {}", segmentName, realtimeTableName);
2694+
return;
26972695
}
2698-
26992696
try {
2697+
LOGGER.info("Triggering re-ingestion for segment: {} in table: {} on server: {}", segmentName, realtimeTableName,
2698+
aliveServer);
27002699
triggerReingestion(aliveServer, segmentName);
2701-
LOGGER.info("Successfully triggered re-ingestion for segment: {} on server: {}", segmentName, aliveServer);
27022700
} catch (Exception e) {
2703-
LOGGER.error("Failed to call reingestSegment for segment: {} on server: {}", segmentName, aliveServer, e);
2701+
LOGGER.error("Failed to trigger re-ingestion for segment: {} on server: {}", segmentName, aliveServer, e);
27042702
}
27052703
} else if (segmentZKMetadata.getStatus() != Status.IN_PROGRESS) {
27062704
// Trigger reset for segment not in IN_PROGRESS state to download the segment from deep store or peer server

pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -130,14 +130,13 @@ protected void processTable(String tableNameWithType, Context context) {
130130
LOGGER.info("Skipping segment-level validation for table: {}", tableConfig.getTableName());
131131
}
132132

133-
boolean isPauselessConsumptionEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
134-
if (isPauselessConsumptionEnabled) {
135-
// For pauseless tables without dedup or partial upsert, repair segments in error state
136-
_llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig,
137-
context._repairErrorSegmentsForPartialUpsertOrDedup);
138-
} else if (_segmentAutoResetOnErrorAtValidation) {
139-
// Reset for pauseless tables is already handled in repairSegmentsInErrorStateForPauselessConsumption method with
140-
// additional checks for pauseless consumption
133+
// For realtime tables without dedup or partial upsert, repair segments in error state
134+
// This used to apply only to pauseless tables, but now applies to all realtime tables
135+
_llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig,
136+
context._repairErrorSegmentsForPartialUpsertOrDedup);
137+
if (_segmentAutoResetOnErrorAtValidation) {
138+
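// Reset for pauseless tables is already handled in repairSegmentsInErrorStateForPauselessConsumption method
139+
// with additional checks for pauseless consumption. NOTE(review): this check is no longer an else-branch of the pauseless check, so the reset below now also runs for pauseless tables - confirm this is intended.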
141140
_pinotHelixResourceManager.resetSegments(tableConfig.getTableName(), null, true);
142141
}
143142
}
@@ -184,7 +183,7 @@ boolean shouldEnsureConsuming(String tableNameWithType) {
184183
// The table was previously paused due to exceeding resource utilization, but the current status cannot be
185184
// determined. To be safe, leave it as paused and once the status is available take the correct action
186185
LOGGER.warn("Resource utilization limit could not be determined for for table: {}, and it is paused, leave it as "
187-
+ "paused", tableNameWithType);
186+
+ "paused", tableNameWithType);
188187
return false;
189188
}
190189
_controllerMetrics.setOrUpdateTableGauge(tableNameWithType, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,

pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
2424
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
2525
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
26+
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
2627
import org.apache.pinot.spi.config.table.PauseState;
2728
import org.apache.pinot.spi.config.table.TableConfig;
29+
import org.apache.pinot.spi.config.table.TableType;
30+
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
2831
import org.mockito.Mock;
2932
import org.mockito.MockitoAnnotations;
3033
import org.testng.Assert;
@@ -64,6 +67,52 @@ public void setup() {
6467
_llcRealtimeSegmentManager, null, _controllerMetrics, _storageQuotaChecker, _resourceUtilizationManager);
6568
}
6669

70+
@Test
71+
public void testReingestionCalledWhenPauselessDisabled() {
72+
String rawTable = "testTable";
73+
String tableName = rawTable + "_REALTIME";
74+
TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(rawTable)
75+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
76+
77+
// Force shouldEnsureConsuming=false by simulating admin pause
78+
when(_pinotHelixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
79+
when(_llcRealtimeSegmentManager.getPauseStatusDetails(tableName))
80+
.thenReturn(new PauseStatusDetails(true, null, PauseState.ReasonCode.ADMINISTRATIVE, null, null));
81+
82+
_realtimeSegmentValidationManager.processTable(tableName, new RealtimeSegmentValidationManager.Context());
83+
84+
verify(_llcRealtimeSegmentManager, times(1))
85+
.repairSegmentsInErrorStateForPauselessConsumption(eq(tableConfig), anyBoolean());
86+
}
87+
88+
@Test
89+
public void testNoReingestionWhenPauselessEnabled() {
90+
String rawTable = "testTable";
91+
String tableName = rawTable + "_REALTIME";
92+
TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(rawTable)
93+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
94+
95+
// Enable pauseless on ingestion config
96+
org.apache.pinot.spi.config.table.ingestion.IngestionConfig ingestionConfig =
97+
new org.apache.pinot.spi.config.table.ingestion.IngestionConfig();
98+
org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig streamIngestionConfig =
99+
new org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig(
100+
java.util.List.of(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()));
101+
streamIngestionConfig.setPauselessConsumptionEnabled(true);
102+
ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
103+
tableConfig.setIngestionConfig(ingestionConfig);
104+
105+
// Force shouldEnsureConsuming=false by simulating admin pause (to avoid ensureAllPartitionsConsuming)
106+
when(_pinotHelixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
107+
when(_llcRealtimeSegmentManager.getPauseStatusDetails(tableName))
108+
.thenReturn(new PauseStatusDetails(true, null, PauseState.ReasonCode.ADMINISTRATIVE, null, null));
109+
110+
_realtimeSegmentValidationManager.processTable(tableName, new RealtimeSegmentValidationManager.Context());
111+
112+
verify(_llcRealtimeSegmentManager, times(1))
113+
.repairSegmentsInErrorStateForPauselessConsumption(eq(tableConfig), anyBoolean());
114+
}
115+
67116
@AfterMethod
68117
public void tearDown()
69118
throws Exception {
@@ -77,21 +126,32 @@ public Object[][] testCases() {
77126
{true, PauseState.ReasonCode.ADMINISTRATIVE, UtilizationChecker.CheckResult.PASS, false, false},
78127

79128
// Resource utilization exceeded and pause state is updated, should return false
80-
{false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, UtilizationChecker.CheckResult.FAIL, false,
81-
false},
129+
{
130+
false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, UtilizationChecker.CheckResult.FAIL,
131+
false,
132+
false
133+
},
82134

83135
// Resource utilization is within limits but was previously paused due to resource utilization,
84136
// should return true
85-
{true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, UtilizationChecker.CheckResult.PASS, false,
86-
true},
137+
{
138+
true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, UtilizationChecker.CheckResult.PASS, false,
139+
true
140+
},
87141

88142
// Resource utilization is STALE but was previously paused due to resource utilization, should return false
89-
{true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, UtilizationChecker.CheckResult.UNDETERMINED,
90-
false, false},
143+
{
144+
true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
145+
UtilizationChecker.CheckResult.UNDETERMINED,
146+
false, false
147+
},
91148

92149
// Resource utilization is STALE but was not previously paused due to resource utilization, should return true
93-
{false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, UtilizationChecker.CheckResult.UNDETERMINED,
94-
false, true},
150+
{
151+
false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
152+
UtilizationChecker.CheckResult.UNDETERMINED,
153+
false, true
154+
},
95155

96156
// Resource utilization is within limits but was previously paused due to storage quota exceeded,
97157
// should return false
@@ -101,7 +161,8 @@ public Object[][] testCases() {
101161
{false, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, UtilizationChecker.CheckResult.PASS, true, false},
102162

103163
// Storage quota within limits but was previously paused due to storage quota exceeded, should return true
104-
{true, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, UtilizationChecker.CheckResult.PASS, false, true}};
164+
{true, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, UtilizationChecker.CheckResult.PASS, false, true}
165+
};
105166
}
106167

107168
@Test(dataProvider = "testCases")

0 commit comments

Comments
 (0)