Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// Whether to enable datanode's stale state detection and usage for writes
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
// Slow disk cache rebuild interval. The default is a duration string
// (e.g. "30s") because the value is read via Configuration.getTimeDuration,
// which accepts a unit suffix, rather than a raw numeric constant.
public static final String DFS_NAMENODE_SLOW_DISK_CACHE_REBUILD_INTERVAL_KEY =
"dfs.namenode.slow.disk.cache.rebuild.interval";
public static final String DFS_NAMENODE_SLOW_DISK_CACHE_REBUILD_INTERVAL_DEFAULT = "30s";
// Whether to deprioritize slow disk datanodes when returning block locations
public static final String DFS_NAMENODE_DEPRIORITIZE_SLOW_DISK_DATANODE_FOR_READ_KEY =
"dfs.namenode.deprioritize.slow.disk.datanode.for.read";
public static final boolean DFS_NAMENODE_DEPRIORITIZE_SLOW_DISK_DATANODE_FOR_READ_DEFAULT = false;
// enable and disable logging datanode staleness. Disabled by default.
public static final String DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_KEY =
"dfs.namenode.enable.log.stale.datanode";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Map;
Expand Down Expand Up @@ -81,9 +83,12 @@ public class SlowDiskTracker {
// Cap on the number of slow disks included in the generated report.
private final int maxDisksToReport;
private static final String DATANODE_DISK_SEPARATOR = ":";
// Interval between regenerations of the slow disks report.
private final long reportGenerationIntervalMs;
// Minimum interval between rebuilds of the read-path slow disk cache.
private final long cacheRebuildIntervalMs;

// Monotonic timestamp of the last report regeneration trigger.
private volatile long lastUpdateTime;
// Monotonic timestamp of the last cache rebuild trigger.
private volatile long lastCacheRebuildTime;
// Single-flight guards for the async report/cache refresh threads.
// NOTE(review): neither guard is reassigned in the visible code; both could
// be declared final — confirm against the rest of the file.
private AtomicBoolean isUpdateInProgress = new AtomicBoolean(false);
private AtomicBoolean isCacheRebuildInProgress = new AtomicBoolean(false);

/**
* Information about disks that have been reported as being slow.
Expand All @@ -93,6 +98,17 @@ public class SlowDiskTracker {
*/
private final Map<String, DiskLatency> diskIDLatencyMap;

/**
 * Cached slow disk map for efficient read path lookup.
 *
 * <p>Key format: {@code IP:PORT:StorageID}.
 *
 * <p>Uses a copy-on-write strategy: heartbeat processing only updates
 * {@code diskIDLatencyMap}; an async thread periodically rebuilds this cache
 * and atomically swaps the reference.
 *
 * <p>Initialized to an immutable empty map so readers never observe null
 * before the first rebuild completes.
 */
private volatile Map<String, Double> cachedSlowDisksForRead = Collections.emptyMap();

/**
* Map of slow disk -> diskOperations it has been reported slow in.
*/
Expand All @@ -103,6 +119,7 @@ public class SlowDiskTracker {
public SlowDiskTracker(Configuration conf, Timer timer) {
this.timer = timer;
this.lastUpdateTime = timer.monotonicNow();
this.lastCacheRebuildTime = timer.monotonicNow();
this.diskIDLatencyMap = new ConcurrentHashMap<>();
this.reportGenerationIntervalMs = conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
Expand All @@ -112,6 +129,34 @@ public SlowDiskTracker(Configuration conf, Timer timer) {
DFSConfigKeys.DFS_DATANODE_MAX_DISKS_TO_REPORT_KEY,
DFSConfigKeys.DFS_DATANODE_MAX_DISKS_TO_REPORT_DEFAULT);
this.reportValidityMs = reportGenerationIntervalMs * 3;
this.cacheRebuildIntervalMs = conf.getTimeDuration(
DFSConfigKeys.DFS_NAMENODE_SLOW_DISK_CACHE_REBUILD_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_SLOW_DISK_CACHE_REBUILD_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);

}

/**
 * Get the Top-N slow disk latency map for WebUI and monitoring.
 *
 * <p>Keys are derived from each report entry via
 * {@link KeyExtractMode#LEGACY_KEY}. If two report entries collapse to the
 * same legacy key (e.g. the same volume path reported under different
 * storage IDs), the worst (maximum) latency is kept instead of letting the
 * last entry silently overwrite the first.
 *
 * @return slow disk latency map with key format "IP:PORT:volumeName"
 */
public Map<String, Double> getSlowDiskLatencyMap() {
  // Presize to avoid rehashing while copying the report.
  Map<String, Double> latencyMap = new HashMap<>(slowDisksReport.size() * 2);
  for (DiskLatency dl : slowDisksReport) {
    String legacyKey = extractDiskKey(dl.getSlowDiskID(), KeyExtractMode.LEGACY_KEY);
    // Keep the maximum latency on key collision.
    latencyMap.merge(legacyKey, dl.getMaxLatency(), Double::max);
  }
  return latencyMap;
}

/**
 * Get all valid slow disks for read path lookup.
 *
 * <p>Returns the current cache snapshot; the rebuild thread replaces the
 * reference atomically, so callers always observe a consistent map.
 * Callers must treat the returned map as read-only — it is the tracker's
 * internal cache, not a defensive copy.
 *
 * @return cached slow disk map with key format "IP:PORT:StorageID"
 */
public Map<String, Double> getAllValidSlowDisks() {
  return cachedSlowDisksForRead;
}

@VisibleForTesting
Expand Down Expand Up @@ -141,12 +186,61 @@ public void addSlowDiskReport(String dataNodeID,

}

/**
 * Extraction mode for slow disk key formatting.
 */
private enum KeyExtractMode {
  // Produces "IP:PORT:StorageID" keys for the read-path cache.
  CACHE_KEY,
  // Produces "IP:PORT:volumeName" keys for WebUI/JSON backward compatibility.
  LEGACY_KEY
}

/**
 * Extract a formatted key from a slow disk ID.
 *
 * <p>The slowDiskID format is "IP:PORT:volumeName|storageID".
 * CACHE_KEY mode yields "IP:PORT:StorageID" and signals a parse failure by
 * returning {@code null}; LEGACY_KEY mode yields "IP:PORT:volumeName" and
 * falls back to the unparsed input on failure.</p>
 */
private static String extractDiskKey(String slowDiskID, KeyExtractMode mode) {
  // What to return when the ID cannot be parsed, per mode contract.
  final String parseFailure =
      (mode == KeyExtractMode.CACHE_KEY) ? null : slowDiskID;

  if (slowDiskID == null || slowDiskID.isEmpty()) {
    return parseFailure;
  }

  // The last ':' separates "IP:PORT" from the disk info section.
  final int addrEnd = slowDiskID.lastIndexOf(':');
  if (addrEnd <= 0 || addrEnd >= slowDiskID.length() - 1) {
    return parseFailure;
  }

  final String datanodeAddr = slowDiskID.substring(0, addrEnd); // "IP:PORT"
  final String diskInfo = slowDiskID.substring(addrEnd + 1);    // "vol|sid"

  final int sep = diskInfo.indexOf('|');
  if (sep < 0) {
    return parseFailure;
  }

  if (mode == KeyExtractMode.LEGACY_KEY) {
    return datanodeAddr + ":" + diskInfo.substring(0, sep);
  }

  // CACHE_KEY: a non-empty storage ID must follow the '|'.
  if (sep >= diskInfo.length() - 1) {
    return null;
  }
  return datanodeAddr + ":" + diskInfo.substring(sep + 1);
}

/**
 * Kick off the async report regeneration and/or cache rebuild when their
 * respective intervals have elapsed. Safe to call on every heartbeat; the
 * async helpers are single-flight.
 */
public void checkAndUpdateReportIfNecessary() {
  final long now = timer.monotonicNow();
  final boolean reportDue = now - lastUpdateTime > reportGenerationIntervalMs;
  final boolean cacheDue = now - lastCacheRebuildTime > cacheRebuildIntervalMs;
  if (reportDue) {
    updateSlowDiskReportAsync(now);
  }
  if (cacheDue) {
    rebuildSlowDiskCacheAsync(now);
  }
}

@VisibleForTesting
Expand All @@ -167,6 +261,50 @@ public void run() {
}
}

/**
 * Asynchronously rebuild the slow disk cache.
 *
 * <p>Single-flight: if a rebuild is already in progress the call is a no-op.
 * On a successful claim, {@code lastCacheRebuildTime} is stamped before the
 * worker starts, and the in-progress flag is always cleared when the worker
 * finishes, even if the rebuild throws.
 *
 * @param now monotonic timestamp of this trigger
 */
private void rebuildSlowDiskCacheAsync(long now) {
  // Only one rebuild may run at a time; bail out if one is in flight.
  if (!isCacheRebuildInProgress.compareAndSet(false, true)) {
    return;
  }
  lastCacheRebuildTime = now;
  new SubjectInheritingThread(() -> {
    try {
      rebuildSlowDiskCache(now);
    } finally {
      isCacheRebuildInProgress.set(false);
    }
  }).start();
}

/**
 * Rebuild the slow disk cache in full (called in an async thread).
 *
 * <p>Walks {@code diskIDLatencyMap}, drops entries older than
 * {@code reportValidityMs}, converts the remaining keys to the
 * "IP:PORT:StorageID" cache format, and atomically publishes the result as
 * an unmodifiable snapshot so readers can never mutate shared state.
 *
 * @param now monotonic timestamp used for the staleness check
 */
private void rebuildSlowDiskCache(long now) {
  Map<String, Double> newCache = new HashMap<>();

  for (Map.Entry<String, DiskLatency> entry : diskIDLatencyMap.entrySet()) {
    DiskLatency diskLatency = entry.getValue();

    // Timeliness check: Only valid slow disks on the server are checked.
    if (now - diskLatency.timestamp >= reportValidityMs) {
      continue;
    }

    // Entries whose IDs cannot be parsed into a cache key are skipped.
    String cacheKey = extractDiskKey(entry.getKey(), KeyExtractMode.CACHE_KEY);
    if (cacheKey != null) {
      newCache.put(cacheKey, diskLatency.getMaxLatency());
    }
  }

  // Publish an immutable snapshot; the volatile write makes it visible to
  // all readers of getAllValidSlowDisks().
  cachedSlowDisksForRead = Collections.unmodifiableMap(newCache);

  LOG.debug("Rebuilt slow disk cache: {} valid slow disks", newCache.size());
}

/**
* This structure is a thin wrapper over disk latencies.
*/
Expand Down Expand Up @@ -267,7 +405,15 @@ public String getSlowDiskReportAsJsonString() {
if (slowDisksReport.isEmpty()) {
return null;
}
return WRITER.writeValueAsString(slowDisksReport);
// Transform slowDiskID to legacy format (IP:PORT:volumeName)
// for backward compatibility with existing JSON consumers.
ArrayList<DiskLatency> reportForJson = Lists.newArrayList();
for (DiskLatency dl : slowDisksReport) {
String legacyID = extractDiskKey(dl.getSlowDiskID(),
KeyExtractMode.LEGACY_KEY);
reportForJson.add(new DiskLatency(legacyID, dl.latencyMap));
}
return WRITER.writeValueAsString(reportForJson);
} catch (JsonProcessingException e) {
// Failed to serialize. Don't log the exception call stack.
LOG.debug("Failed to serialize statistics" + e);
Expand All @@ -286,17 +432,17 @@ private void cleanUpOldReports(long now) {
}

@VisibleForTesting
public ArrayList<DiskLatency> getSlowDisksReport() {
  // NOTE(review): returns the internal mutable list (no defensive copy) and
  // was widened to public; only test code should call or mutate this.
  return this.slowDisksReport;
}

@VisibleForTesting
public long getReportValidityMs() {
  // Window during which a slow disk report is considered fresh
  // (3x the report generation interval; see the constructor).
  return reportValidityMs;
}

@VisibleForTesting
public void setReportValidityMs(long reportValidityMs) {
  // Test hook: override the validity window to exercise staleness paths.
  this.reportValidityMs = reportValidityMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ public void run() {
while (volumeIterator.hasNext()) {
FsVolumeSpi volume = volumeIterator.next();
DataNodeVolumeMetrics metrics = volume.getMetrics();
String volumeName = volume.getBaseURI().getPath();
String storageID = volume.getStorageID();
String volumeName = volume.getBaseURI().getPath()+ "|" + storageID;

metadataOpStats.put(volumeName,
metrics.getMetadataOperationMean());
Expand Down Expand Up @@ -159,7 +160,13 @@ public void run() {
-> Double.compare(o2.getMaxLatency(), o1.getMaxLatency()));

slowDisksToExclude = diskLatencies.stream().limit(maxSlowDisksToExclude)
.map(DiskLatency::getSlowDisk).collect(Collectors.toList());
.map(dl -> {
// Extract pure volume path for FsVolumeList comparison.
// diskKey format: "volumePath|storageID"
String disk = dl.getSlowDisk();
int pipeIdx = disk.indexOf('|');
return pipeIdx >= 0 ? disk.substring(0, pipeIdx) : disk;
}).collect(Collectors.toList());
}
}

Expand Down
Loading
Loading