Skip to content

Commit d8afa42

Browse files
committed
controller: reuse error-state repair for non-pauseless; delete redundant reingest method
- Reuse repairSegmentsInErrorStateForPauselessConsumption() to repair errored segments for both pauseless and non-pauseless tables - Remove reingestCommittingSegmentsForPauselessDisabled() and route call sites to the shared repair flow - Keep a single reingestion entrypoint: reingestSegment(table, segment, instances) with simple predicates (hasOnlineInstance, maybeResetIfNotInProgress) - Update RealtimeSegmentValidationManager to always invoke the shared repair flow (and preserve optional auto-reset behavior) - Adjust RealtimeSegmentValidationManagerTest expectations accordingly Behavioral notes: - Only re-ingests COMMITTING LLC segments with start/end offsets and no download URL when all replicas are in ERROR and the segment is ONLINE in IdealState - Otherwise resets segments not in IN_PROGRESS to fetch from deep store/peer - Applies only to tables without dedup or partial upsert unless explicitly allowed via allowRepairOfErrorSegments()
1 parent a7bd7e9 commit d8afa42

File tree

3 files changed

+110
-35
lines changed

3 files changed

+110
-35
lines changed

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2401,9 +2401,8 @@ private Set<String> filterSegmentsToCommit(Set<String> allConsumingSegments,
24012401
Set<String> invalidSegments = partitionedByIsConsuming.get(false);
24022402
if (!invalidSegments.isEmpty()) {
24032403
LOGGER.warn("Cannot commit segments that are not in CONSUMING state. All consuming segments: {}, "
2404-
+ "provided segments to commit: {}. Ignoring all non-consuming segments, sampling 10: {}",
2405-
allConsumingSegments,
2406-
segmentsToCommitStr, invalidSegments.stream().limit(10).collect(Collectors.toSet()));
2404+
+ "provided segments to commit: {}. Ignoring all non-consuming segments, sampling 10: {}",
2405+
allConsumingSegments, segmentsToCommitStr, invalidSegments.stream().limit(10).collect(Collectors.toSet()));
24072406
}
24082407
return validSegmentsToCommit;
24092408
}
@@ -2607,6 +2606,7 @@ URI createSegmentPath(String rawTableName, String segmentName) {
26072606
}
26082607

26092608
/**
2609+
* Repair segments in error state:
26102610
* Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method
26112611
* will call the server reingestSegment API on one of the alive servers that are supposed to host that segment
26122612
* according to IdealState.
@@ -2615,18 +2615,17 @@ URI createSegmentPath(String rawTableName, String segmentName) {
26152615
* <p>
26162616
* If segment is in ERROR state in only few replicas but has download URL, we instead trigger a segment reset
26172617
*
2618-
* @param tableConfig The table config
2619-
* @param segmentAutoResetOnErrorAtValidation flag to determine whether to reset the error segments or not
2618+
* @param tableConfig The table config
26202619
*/
2621-
public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig tableConfig,
2622-
boolean repairErrorSegmentsForPartialUpsertOrDedup, boolean segmentAutoResetOnErrorAtValidation) {
2620+
public void repairSegmentsInErrorState(TableConfig tableConfig,
2621+
boolean repairErrorSegmentsForPartialUpsertOrDedup) {
2622+
boolean isPauselessTable = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
26232623
String realtimeTableName = tableConfig.getTableName();
26242624
// Fetch ideal state and external view
26252625
IdealState idealState = getIdealState(realtimeTableName);
26262626
ExternalView externalView = _helixResourceManager.getTableExternalView(realtimeTableName);
26272627
if (externalView == null) {
2628-
LOGGER.warn(
2629-
"External view not found for table: {}, skipping repairing segments in error state for pauseless consumption",
2628+
LOGGER.warn("External view not found for table: {}, skipping repairing segments in error state",
26302629
realtimeTableName);
26312630
return;
26322631
}
@@ -2673,9 +2672,14 @@ public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig tableC
26732672
}
26742673

26752674
if (segmentsInErrorStateInAtLeastOneReplica.isEmpty()) {
2676-
_controllerMetrics.setOrUpdateTableGauge(realtimeTableName, ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 0);
2677-
_controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
2678-
ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 0);
2675+
if (isPauselessTable) {
2676+
_controllerMetrics.setOrUpdateTableGauge(realtimeTableName, ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
2677+
0);
2678+
_controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
2679+
ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 0);
2680+
} else {
2681+
_controllerMetrics.setOrUpdateTableGauge(realtimeTableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE, 0);
2682+
}
26792683
return;
26802684
}
26812685

@@ -2690,6 +2694,22 @@ public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig tableC
26902694
boolean repairCommittingSegments =
26912695
allowRepairOfCommittingSegments(repairErrorSegmentsForPartialUpsertOrDedup, tableConfig);
26922696
int segmentsInUnRecoverableState = 0;
2697+
if (isPauselessTable && !repairCommittingSegments) {
2698+
// We do not run reingestion for dedup and partial upsert tables in pauseless as it can
2699+
// lead to data inconsistencies
2700+
_controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
2701+
ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, segmentsInErrorStateInAllReplicas.size());
2702+
return;
2703+
} else {
2704+
LOGGER.info("Repairing error segments in table: {}.", realtimeTableName);
2705+
if (isPauselessTable) {
2706+
_controllerMetrics.setOrUpdateTableGauge(realtimeTableName, ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
2707+
segmentsInErrorStateInAllReplicas.size());
2708+
} else {
2709+
_controllerMetrics.setOrUpdateTableGauge(realtimeTableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE,
2710+
segmentsInErrorStateInAllReplicas.size());
2711+
}
2712+
}
26932713

26942714
for (String segmentName : segmentsInErrorStateInAtLeastOneReplica) {
26952715
SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(realtimeTableName, segmentName);
@@ -2723,14 +2743,14 @@ public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig tableC
27232743
realtimeTableName);
27242744
continue;
27252745
}
2726-
27272746
try {
27282747
triggerReingestion(aliveServer, segmentName);
27292748
LOGGER.info("Successfully triggered re-ingestion for segment: {} on server: {}", segmentName, aliveServer);
27302749
} catch (Exception e) {
2731-
LOGGER.error("Failed to call reingestSegment for segment: {} on server: {}", segmentName, aliveServer, e);
2750+
LOGGER.error("Failed to trigger re-ingestion for segment: {} on server: {}", segmentName, aliveServer, e);
27322751
}
2733-
} else if (segmentAutoResetOnErrorAtValidation) {
2752+
} else if (segmentZKMetadata.getStatus() != Status.IN_PROGRESS) {
2753+
// Trigger reset for segment not in IN_PROGRESS state to download the segment from deep store or peer server
27342754
_helixResourceManager.resetSegment(realtimeTableName, segmentName, null);
27352755
}
27362756
}

pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -129,16 +129,10 @@ protected void processTable(String tableNameWithType, Context context) {
129129
LOGGER.info("Skipping segment-level validation for table: {}", tableConfig.getTableName());
130130
}
131131

132-
boolean isPauselessConsumptionEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
133-
if (isPauselessConsumptionEnabled) {
134-
// For pauseless tables without dedup or partial upsert, repair segments in error state
135-
_llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig,
136-
context._repairErrorSegmentsForPartialUpsertOrDedup, _segmentAutoResetOnErrorAtValidation);
137-
} else if (_segmentAutoResetOnErrorAtValidation) {
138-
// Reset for pauseless tables is already handled in repairSegmentsInErrorStateForPauselessConsumption method with
139-
// additional checks for pauseless consumption
140-
_pinotHelixResourceManager.resetSegments(tableConfig.getTableName(), null, true);
141-
}
132+
// For realtime tables without dedup or partial upsert, repair segments in error state
133+
// This used to only work for pauseless table, but now extends the usage to all tables
134+
_llcRealtimeSegmentManager.repairSegmentsInErrorState(tableConfig,
135+
context._repairErrorSegmentsForPartialUpsertOrDedup);
142136
}
143137

144138
/**
@@ -183,7 +177,7 @@ boolean shouldEnsureConsuming(String tableNameWithType) {
183177
// The table was previously paused due to exceeding resource utilization, but the current status cannot be
184178
// determined. To be safe, leave it as paused and once the status is available take the correct action
185179
LOGGER.warn("Resource utilization limit could not be determined for for table: {}, and it is paused, leave it as "
186-
+ "paused", tableNameWithType);
180+
+ "paused", tableNameWithType);
187181
return false;
188182
}
189183
_controllerMetrics.setOrUpdateTableGauge(tableNameWithType, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,

pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
2424
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
2525
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
26+
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
2627
import org.apache.pinot.spi.config.table.PauseState;
2728
import org.apache.pinot.spi.config.table.TableConfig;
29+
import org.apache.pinot.spi.config.table.TableType;
30+
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
2831
import org.mockito.Mock;
2932
import org.mockito.MockitoAnnotations;
3033
import org.testng.Assert;
@@ -64,6 +67,52 @@ public void setup() {
6467
_llcRealtimeSegmentManager, null, _controllerMetrics, _storageQuotaChecker, _resourceUtilizationManager);
6568
}
6669

70+
@Test
71+
public void testReingestionCalledWhenPauselessDisabled() {
72+
String rawTable = "testTable";
73+
String tableName = rawTable + "_REALTIME";
74+
TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(rawTable)
75+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
76+
77+
// Force shouldEnsureConsuming=false by simulating admin pause
78+
when(_pinotHelixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
79+
when(_llcRealtimeSegmentManager.getPauseStatusDetails(tableName))
80+
.thenReturn(new PauseStatusDetails(true, null, PauseState.ReasonCode.ADMINISTRATIVE, null, null));
81+
82+
_realtimeSegmentValidationManager.processTable(tableName, new RealtimeSegmentValidationManager.Context());
83+
84+
verify(_llcRealtimeSegmentManager, times(1))
85+
.repairSegmentsInErrorState(eq(tableConfig), anyBoolean());
86+
}
87+
88+
@Test
89+
public void testNoReingestionWhenPauselessEnabled() {
90+
String rawTable = "testTable";
91+
String tableName = rawTable + "_REALTIME";
92+
TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(rawTable)
93+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
94+
95+
// Enable pauseless on ingestion config
96+
org.apache.pinot.spi.config.table.ingestion.IngestionConfig ingestionConfig =
97+
new org.apache.pinot.spi.config.table.ingestion.IngestionConfig();
98+
org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig streamIngestionConfig =
99+
new org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig(
100+
java.util.List.of(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()));
101+
streamIngestionConfig.setPauselessConsumptionEnabled(true);
102+
ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
103+
tableConfig.setIngestionConfig(ingestionConfig);
104+
105+
// Force shouldEnsureConsuming=false by simulating admin pause (to avoid ensureAllPartitionsConsuming)
106+
when(_pinotHelixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
107+
when(_llcRealtimeSegmentManager.getPauseStatusDetails(tableName))
108+
.thenReturn(new PauseStatusDetails(true, null, PauseState.ReasonCode.ADMINISTRATIVE, null, null));
109+
110+
_realtimeSegmentValidationManager.processTable(tableName, new RealtimeSegmentValidationManager.Context());
111+
112+
verify(_llcRealtimeSegmentManager, times(1))
113+
.repairSegmentsInErrorState(eq(tableConfig), anyBoolean());
114+
}
115+
67116
@AfterMethod
68117
public void tearDown()
69118
throws Exception {
@@ -77,21 +126,32 @@ public Object[][] testCases() {
77126
{true, PauseState.ReasonCode.ADMINISTRATIVE, UtilizationChecker.CheckResult.PASS, false, false},
78127

79128
// Resource utilization exceeded and pause state is updated, should return false
80-
{false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, UtilizationChecker.CheckResult.FAIL, false,
81-
false},
129+
{
130+
false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, UtilizationChecker.CheckResult.FAIL,
131+
false,
132+
false
133+
},
82134

83135
// Resource utilization is within limits but was previously paused due to resource utilization,
84136
// should return true
85-
{true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, UtilizationChecker.CheckResult.PASS, false,
86-
true},
137+
{
138+
true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, UtilizationChecker.CheckResult.PASS, false,
139+
true
140+
},
87141

88142
// Resource utilization is STALE but was previously paused due to resource utilization, should return false
89-
{true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, UtilizationChecker.CheckResult.UNDETERMINED,
90-
false, false},
143+
{
144+
true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
145+
UtilizationChecker.CheckResult.UNDETERMINED,
146+
false, false
147+
},
91148

92149
// Resource utilization is STALE but was not previously paused due to resource utilization, should return true
93-
{false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, UtilizationChecker.CheckResult.UNDETERMINED,
94-
false, true},
150+
{
151+
false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
152+
UtilizationChecker.CheckResult.UNDETERMINED,
153+
false, true
154+
},
95155

96156
// Resource utilization is within limits but was previously paused due to storage quota exceeded,
97157
// should return false
@@ -101,7 +161,8 @@ public Object[][] testCases() {
101161
{false, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, UtilizationChecker.CheckResult.PASS, true, false},
102162

103163
// Storage quota within limits but was previously paused due to storage quota exceeded, should return true
104-
{true, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, UtilizationChecker.CheckResult.PASS, false, true}};
164+
{true, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, UtilizationChecker.CheckResult.PASS, false, true}
165+
};
105166
}
106167

107168
@Test(dataProvider = "testCases")

0 commit comments

Comments
 (0)