Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.exception;

/**
 * Unchecked exception reporting a segment ingestion failure.
 */
public class SegmentIngestionFailureException extends RuntimeException {

  /**
   * Creates the exception with a detail message and no cause.
   *
   * @param message description of the failure
   */
  public SegmentIngestionFailureException(String message) {
    super(message);
  }

  /**
   * Creates the exception with a detail message and the underlying cause, so the triggering exception is not lost.
   *
   * @param message description of the failure
   * @param cause the exception that triggered this failure; may be {@code null}
   */
  public SegmentIngestionFailureException(String message, Throwable cause) {
    super(message, cause);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ private void processNewSegment(TableConfig tableConfig, SegmentMetadata segmentM
finalSegmentLocationURI, enableParallelPushProtection, segmentUploadStartTime);

try {
_pinotHelixResourceManager.assignSegment(tableConfig, segmentZKMetadata);
_pinotHelixResourceManager.assignSegmentWithRetry(tableConfig, segmentZKMetadata);
} catch (Exception e) {
// assignSegmentWithRetry removes the segment ZK metadata on failure.
// Call deleteSegment to remove the segment from permanent location if needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
Expand All @@ -97,6 +98,7 @@
import org.apache.pinot.common.exception.SchemaAlreadyExistsException;
import org.apache.pinot.common.exception.SchemaBackwardIncompatibleException;
import org.apache.pinot.common.exception.SchemaNotFoundException;
import org.apache.pinot.common.exception.SegmentIngestionFailureException;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.lineage.LineageEntry;
import org.apache.pinot.common.lineage.LineageEntryState;
Expand Down Expand Up @@ -2540,7 +2542,7 @@ public void addNewSegment(String tableNameWithType, SegmentMetadata segmentMetad
"Failed to create ZK metadata for table: " + tableNameWithType + ", segment: " + segmentName);
LOGGER.info("Added segment: {} of table: {} to property store", segmentName, tableNameWithType);

assignSegment(tableConfig, segmentZKMetadata);
assignSegmentWithRetry(tableConfig, segmentZKMetadata);
}

public boolean needTieredSegmentAssignment(TableConfig tableConfig) {
Expand All @@ -2552,51 +2554,117 @@ public List<Tier> getSortedTiers(TableConfig tableConfig) {
TierFactory.PINOT_SERVER_STORAGE_TYPE);
}

public void assignSegment(TableConfig tableConfig, SegmentZKMetadata segmentZKMetadata) {
public void assignSegmentWithRetry(TableConfig tableConfig, SegmentZKMetadata segmentZKMetadata) {
String tableNameWithType = tableConfig.getTableName();
String segmentName = segmentZKMetadata.getSegmentName();

// Assign instances for the segment and add it into IdealState
try {
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap;
// TODO: Support direct tier assignment for UPLOADED real-time segments
if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
instancePartitionsMap = getInstacePartitionsMap(tableConfig, segmentZKMetadata.getTier());
assignSegmentInternal(tableConfig, segmentZKMetadata);
} catch (Exception e) {
if (containsException(e, ZkInterruptedException.class)) {
LOGGER.warn("Encountered ZkInterruptedException while assigning segment: {} to table: {}. Retrying once...",
segmentName, tableNameWithType);

try {
assignSegmentInternal(tableConfig, segmentZKMetadata);
LOGGER.info("Successfully assigned segment: {} to table: {} on retry", segmentName, tableNameWithType);
return;
} catch (Exception retryException) {
LOGGER.error("Retry failed for assigning segment: {} to table {}. Proceeding with cleanup.",
segmentName, tableNameWithType, retryException);
}
}

handleAssignmentFailure(tableNameWithType, segmentName, e);
}
}

private void handleAssignmentFailure(String tableNameWithType, String segmentName, Exception originalException) {
LOGGER.error(
"Caught exception while adding segment: {} to IdealState for table: {}, deleting segment ZK metadata",
segmentName, tableNameWithType, originalException);
if (removeSegmentZKMetadata(tableNameWithType, segmentName)) {
LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType);
} else {
LOGGER.error("Failed to delete segment ZK metadata for segment: {} of table: {}", segmentName,
tableNameWithType);
}

if (containsException(originalException, ZkInterruptedException.class)) {
LOGGER.warn("Encountered ZkInterruptedException while assigning segment: {} to table: {}. "
+ "Deleting segment to prevent inconsistent state.",
segmentName, tableNameWithType);

PinotResourceManagerResponse response = deleteSegment(tableNameWithType, segmentName);
String errorMessage;
if (!response.isSuccessful()) {
errorMessage =
String.format("Failed to delete segment: %s of table: %s after ZkInterruptedException. Response: %s",
segmentName, tableNameWithType, response.getMessage());
} else {
instancePartitionsMap = fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
errorMessage = String.format(
"Failed to assign segment: %s to table: %s due to ZkInterruptedException. "
+ "Segment deleted successfully.",
segmentName, tableNameWithType);
}
LOGGER.error(errorMessage);
throw new SegmentIngestionFailureException(errorMessage);
}

SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics);
HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
assert idealState != null;
Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
if (currentAssignment.containsKey(segmentName)) {
LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
tableNameWithType);
} else {
List<String> assignedInstances =
segmentAssignment.assignSegment(segmentName, currentAssignment, instancePartitionsMap);
LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
tableNameWithType);
currentAssignment.put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
}
return idealState;
});
LOGGER.info("Added segment: {} to IdealState for table: {}", segmentName, tableNameWithType);
} catch (Exception e) {
LOGGER.error(
"Caught exception while adding segment: {} to IdealState for table: {}, deleting segment ZK metadata",
segmentName, tableNameWithType, e);
if (removeSegmentZKMetadata(tableNameWithType, segmentName)) {
LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType);
throw new RuntimeException(originalException);
}

private void assignSegmentInternal(TableConfig tableConfig, SegmentZKMetadata segmentZKMetadata) {
String tableNameWithType = tableConfig.getTableName();
String segmentName = segmentZKMetadata.getSegmentName();

// Assign instances for the segment and add it into IdealState
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap;
// TODO: Support direct tier assignment for UPLOADED real-time segments
if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
instancePartitionsMap = getInstacePartitionsMap(tableConfig, segmentZKMetadata.getTier());
} else {
instancePartitionsMap = fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
}

SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics);
HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
assert idealState != null;
Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
if (currentAssignment.containsKey(segmentName)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also note: by retrying, we are also checking if the segment exists automatically here which is nice

LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
tableNameWithType);
} else {
LOGGER.error("Failed to deleted segment ZK metadata for segment: {} of table: {}", segmentName,
List<String> assignedInstances =
segmentAssignment.assignSegment(segmentName, currentAssignment, instancePartitionsMap);
LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
tableNameWithType);
currentAssignment.put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
}
throw e;
return idealState;
});
LOGGER.info("Added segment: {} to IdealState for table: {}", segmentName, tableNameWithType);
}

/**
 * Checks if the given exception or any exception in its causal chain
 * is an instance of the specified exception type.
 *
 * <p>The walk stops at the first throwable it has already visited, so a cyclic cause chain terminates instead of
 * looping forever.
 *
 * @param exception the exception to check; {@code null} yields {@code false}
 * @param exceptionType the exception type to look for
 * @return true if the exception type is found in the chain, false otherwise
 */
public static boolean containsException(Throwable exception, Class<? extends Throwable> exceptionType) {
  // Identity-based visited set: Throwable#initCause only rejects direct self-causation, so longer cycles
  // (a -> b -> a) can exist. Fully-qualified names keep this method free of new file-level imports.
  java.util.Set<Throwable> visited = java.util.Collections.newSetFromMap(new java.util.IdentityHashMap<>());
  for (Throwable current = exception; current != null && visited.add(current); current = current.getCause()) {
    if (exceptionType.isInstance(current)) {
      return true;
    }
  }
  return false;
}

private Map<InstancePartitionsType, InstancePartitions> getInstacePartitionsMap(TableConfig tableConfig,
Expand Down
Loading
Loading