Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.controller.services.PinotTableReloadService;
import org.apache.pinot.controller.services.PinotTableReloadStatusReporter;
import org.apache.pinot.controller.tuner.TableConfigTunerRegistry;
import org.apache.pinot.controller.util.BrokerServiceHelper;
import org.apache.pinot.controller.util.TableSizeReader;
Expand Down Expand Up @@ -683,6 +684,7 @@ protected void configure() {
bind(controllerStartTime).named(ControllerAdminApiApplication.START_TIME);

bindAsContract(PinotTableReloadService.class).in(Singleton.class);
bindAsContract(PinotTableReloadStatusReporter.class).in(Singleton.class);

String loggerRootDir = _config.getProperty(CommonConstants.Controller.CONFIG_OF_LOGGER_ROOT_DIR);
if (loggerRootDir != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
import org.apache.pinot.controller.services.PinotTableReloadService;
import org.apache.pinot.controller.services.PinotTableReloadStatusReporter;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
Expand Down Expand Up @@ -87,11 +88,14 @@
public class PinotTableReloadResource {
private static final Logger LOG = LoggerFactory.getLogger(PinotTableReloadResource.class);

private final PinotTableReloadService _pinotTableReloadService;
private final PinotTableReloadService _service;
private final PinotTableReloadStatusReporter _statusReporter;

@Inject
public PinotTableReloadResource(PinotTableReloadService pinotTableReloadService) {
_pinotTableReloadService = pinotTableReloadService;
public PinotTableReloadResource(PinotTableReloadService service,
PinotTableReloadStatusReporter statusReporter) {
_service = service;
_statusReporter = statusReporter;
}

@POST
Expand All @@ -114,7 +118,7 @@ public SuccessResponse reloadSegment(
@QueryParam("forceDownload") @DefaultValue("false") boolean forceDownload,
@ApiParam(value = "Target specific server instance") @QueryParam("targetInstance") @Nullable
String targetInstance, @Context HttpHeaders headers) {
return _pinotTableReloadService.reloadSegment(tableName, segmentName, forceDownload, targetInstance, headers);
return _service.reloadSegment(tableName, segmentName, forceDownload, targetInstance, headers);
}

@POST
Expand All @@ -140,7 +144,7 @@ public SuccessResponse reloadAllSegments(
@ApiParam(value = "JSON map of instance to segment lists (overrides targetInstance)")
@QueryParam("instanceToSegmentsMap") @Nullable String instanceToSegmentsMapInJson, @Context HttpHeaders headers)
throws IOException {
return _pinotTableReloadService.reloadAllSegments(tableName, tableTypeStr, forceDownload, targetInstance,
return _service.reloadAllSegments(tableName, tableTypeStr, forceDownload, targetInstance,
instanceToSegmentsMapInJson, headers);
}

Expand All @@ -157,7 +161,7 @@ public SuccessResponse reloadAllSegments(
public ServerReloadControllerJobStatusResponse getReloadJobStatus(
@ApiParam(value = "Reload job ID returned from reload endpoint", required = true) @PathParam("jobId")
String reloadJobId) throws Exception {
return _pinotTableReloadService.getReloadJobStatus(reloadJobId);
return _statusReporter.getReloadJobStatus(reloadJobId);
}

@GET
Expand All @@ -170,11 +174,11 @@ public ServerReloadControllerJobStatusResponse getReloadJobStatus(
@ApiResponse(code = 200, message = "Reload check completed successfully"),
@ApiResponse(code = 400, message = "Invalid table configuration")
})
public String getTableReloadMetadata(
public String needReload(
@ApiParam(value = "Table name with type suffix", required = true, example = "myTable_REALTIME")
@PathParam("tableNameWithType") String tableNameWithType,
@ApiParam(value = "Include detailed server responses", defaultValue = "false") @QueryParam("verbose")
@DefaultValue("false") boolean verbose, @Context HttpHeaders headers) {
return _pinotTableReloadService.getTableReloadMetadata(tableNameWithType, verbose, headers);
return _service.needReload(tableNameWithType, verbose, headers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,55 +33,64 @@ public int getTotalSegmentCount() {
return _totalSegmentCount;
}

public void setTotalSegmentCount(int totalSegmentCount) {
public ServerReloadControllerJobStatusResponse setTotalSegmentCount(int totalSegmentCount) {
_totalSegmentCount = totalSegmentCount;
return this;
}

public int getSuccessCount() {
return _successCount;
}

public void setSuccessCount(int successCount) {
public ServerReloadControllerJobStatusResponse setSuccessCount(int successCount) {
_successCount = successCount;
return this;
}

public double getEstimatedTimeRemainingInMinutes() {
return _estimatedTimeRemainingInMinutes;
}

public void setEstimatedTimeRemainingInMinutes(double estimatedTimeRemainingInMillis) {
_estimatedTimeRemainingInMinutes = estimatedTimeRemainingInMillis;
public ServerReloadControllerJobStatusResponse setEstimatedTimeRemainingInMinutes(
double estimatedTimeRemainingInMinutes) {
_estimatedTimeRemainingInMinutes = estimatedTimeRemainingInMinutes;
return this;
}

public double getTimeElapsedInMinutes() {
return _timeElapsedInMinutes;
}

public void setTimeElapsedInMinutes(double timeElapsedInMinutes) {
public ServerReloadControllerJobStatusResponse setTimeElapsedInMinutes(double timeElapsedInMinutes) {
_timeElapsedInMinutes = timeElapsedInMinutes;
return this;
}


public int getTotalServersQueried() {
return _totalServersQueried;
}

public void setTotalServersQueried(int totalServersQueried) {
public ServerReloadControllerJobStatusResponse setTotalServersQueried(int totalServersQueried) {
_totalServersQueried = totalServersQueried;
return this;
}

public int getTotalServerCallsFailed() {
return _totalServerCallsFailed;
}

public void setTotalServerCallsFailed(int totalServerCallsFailed) {
public ServerReloadControllerJobStatusResponse setTotalServerCallsFailed(int totalServerCallsFailed) {
_totalServerCallsFailed = totalServerCallsFailed;
return this;
}

public Map<String, String> getMetadata() {
return _metadata;
}

public void setMetadata(Map<String, String> metadata) {
public ServerReloadControllerJobStatusResponse setMetadata(Map<String, String> metadata) {
_metadata = metadata;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,12 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.ws.rs.core.HttpHeaders;
Expand All @@ -52,15 +44,11 @@
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.api.resources.Constants;
import org.apache.pinot.controller.api.resources.ResourceUtils;
import org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
import org.apache.pinot.controller.api.resources.SuccessResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.controller.util.TableMetadataReader;
import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -181,101 +169,8 @@ public SuccessResponse reloadAllSegments(String tableName, String tableTypeStr,
return new SuccessResponse(JsonUtils.objectToString(perTableMsgData));
}

public ServerReloadControllerJobStatusResponse getReloadJobStatus(String reloadJobId)
throws InvalidConfigException {
Map<String, String> controllerJobZKMetadata =
_pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, ControllerJobTypes.RELOAD_SEGMENT);
if (controllerJobZKMetadata == null) {
throw new ControllerApplicationException(LOG, "Failed to find controller job id: " + reloadJobId,
Response.Status.NOT_FOUND);
}

String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
String segmentNames = controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
String instanceName = controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
Map<String, List<String>> serverToSegments = getServerToSegments(tableNameWithType, segmentNames, instanceName);

BiMap<String, String> serverEndPoints =
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
CompletionServiceHelper completionServiceHelper =
new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);

List<String> serverUrls = new ArrayList<>();
for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
String server = entry.getKey();
String endpoint = entry.getValue();
String reloadTaskStatusEndpoint =
endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + "?reloadJobTimestamp="
+ controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
if (segmentNames != null) {
List<String> segmentsForServer = serverToSegments.get(server);
StringBuilder encodedSegmentsBuilder = new StringBuilder();
if (!segmentsForServer.isEmpty()) {
Iterator<String> segmentIterator = segmentsForServer.iterator();
// Append first segment without a leading separator
encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
// Append remaining segments, each prefixed by the separator
while (segmentIterator.hasNext()) {
encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
.append(URIUtils.encode(segmentIterator.next()));
}
}
reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
}
serverUrls.add(reloadTaskStatusEndpoint);
}

CompletionServiceHelper.CompletionServiceResponse serviceResponse =
completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000);

ServerReloadControllerJobStatusResponse serverReloadControllerJobStatusResponse =
new ServerReloadControllerJobStatusResponse();
serverReloadControllerJobStatusResponse.setSuccessCount(0);

int totalSegments = 0;
for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) {
totalSegments += entry.getValue().size();
}
serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments);
serverReloadControllerJobStatusResponse.setTotalServersQueried(serverUrls.size());
serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(serviceResponse._failedResponseCount);

for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
String responseString = streamResponse.getValue();
try {
ServerReloadControllerJobStatusResponse response =
JsonUtils.stringToObject(responseString, ServerReloadControllerJobStatusResponse.class);
serverReloadControllerJobStatusResponse.setSuccessCount(
serverReloadControllerJobStatusResponse.getSuccessCount() + response.getSuccessCount());
} catch (Exception e) {
serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(
serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1);
}
}

// Add ZK fields
serverReloadControllerJobStatusResponse.setMetadata(controllerJobZKMetadata);

// Add derived fields
long submissionTime = Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
double timeElapsedInMinutes = ((double) System.currentTimeMillis() - (double) submissionTime) / (1000.0 * 60.0);
int remainingSegments = serverReloadControllerJobStatusResponse.getTotalSegmentCount()
- serverReloadControllerJobStatusResponse.getSuccessCount();

double estimatedRemainingTimeInMinutes = -1;
if (serverReloadControllerJobStatusResponse.getSuccessCount() > 0) {
estimatedRemainingTimeInMinutes =
((double) remainingSegments / (double) serverReloadControllerJobStatusResponse.getSuccessCount())
* timeElapsedInMinutes;
}

serverReloadControllerJobStatusResponse.setTimeElapsedInMinutes(timeElapsedInMinutes);
serverReloadControllerJobStatusResponse.setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);

return serverReloadControllerJobStatusResponse;
}

public String getTableReloadMetadata(String tableNameWithType, boolean verbose, HttpHeaders headers) {
public String needReload(String tableNameWithType, boolean verbose, HttpHeaders headers) {
tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers);
LOG.info("Received a request to check reload for all servers hosting segments for table {}", tableNameWithType);
try {
Expand Down Expand Up @@ -307,30 +202,6 @@ public String getTableReloadMetadata(String tableNameWithType, boolean verbose,
}
}

@VisibleForTesting
Map<String, List<String>> getServerToSegments(String tableNameWithType, @Nullable String segmentNames,
@Nullable String instanceName) {
if (segmentNames == null) {
// instanceName can be null or not null, and this method below can handle both cases.
return _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType, instanceName, true);
}
// Skip servers and segments not involved in the segment reloading job.
List<String> segmnetNameList = new ArrayList<>();
Collections.addAll(segmnetNameList, StringUtils.split(segmentNames, SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
if (instanceName != null) {
return Map.of(instanceName, segmnetNameList);
}
// If instance is null, then either one or all segments are being reloaded via current segment reload restful APIs.
// And the if-check at the beginning of this method has handled the case of reloading all segments. So here we
// expect only one segment name.
Preconditions.checkState(segmnetNameList.size() == 1, "Only one segment is expected but got: %s", segmnetNameList);
Map<String, List<String>> serverToSegments = new HashMap<>();
Set<String> servers = _pinotHelixResourceManager.getServers(tableNameWithType, segmentNames);
for (String server : servers) {
serverToSegments.put(server, Collections.singletonList(segmentNames));
}
return serverToSegments;
}

/**
* Helper method to find the existing table based on the given table name (with or without type suffix) and segment
Expand Down
Loading
Loading