diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java index d8ab16f41d3ac..ba7a5418e617d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java @@ -65,7 +65,10 @@ @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) @InterfaceStability.Unstable public class LocalDirAllocator { - + + static final String E_NO_SPACE_AVAILABLE = + "No space available in any of the local directories"; + //A Map from the config item names like "mapred.local.dir" //to the instance of the AllocatorPerContext. This //is a static object to make sure there exists exactly one instance per JVM @@ -384,6 +387,24 @@ int getCurrentDirectoryIndex() { return currentContext.get().dirNumLastAccessed.get(); } + /** + * Format a string, log at debug and append it to the history as a new line. + * + * @param history history to fill in + * @param fmt format string + * @param args varargs + */ + private void note(StringBuilder history, String fmt, Object... args) { + try { + final String s = String.format(fmt, args); + history.append(s).append("\n"); + LOG.debug(s); + } catch (Exception e) { + // some resilience in case the format string is wrong + LOG.debug(fmt, e); + } + } + /** Get a path from the local FS. If size is known, we go * round-robin over the set of disks (via the configured dirs) and return * the first complete path which has enough space. 
@@ -393,6 +414,12 @@ int getCurrentDirectoryIndex() { */ public Path getLocalPathForWrite(String pathStr, long size, Configuration conf, boolean checkWrite) throws IOException { + + // history is built up and logged at error if the allocation fails + StringBuilder history = new StringBuilder(); + + note(history, "Searching for a directory for file at %s, size = %,d; checkWrite=%s", + pathStr, size, checkWrite); Context ctx = confChanged(conf); int numDirs = ctx.localDirs.length; int numDirsSearched = 0; @@ -406,27 +433,56 @@ public Path getLocalPathForWrite(String pathStr, long size, pathStr = pathStr.substring(1); } Path returnPath = null; - - if(size == SIZE_UNKNOWN) { //do roulette selection: pick dir with probability - //proportional to available size - long[] availableOnDisk = new long[ctx.dirDF.length]; - long totalAvailable = 0; - - //build the "roulette wheel" - for(int i =0; i < ctx.dirDF.length; ++i) { - final DF target = ctx.dirDF[i]; - // attempt to recreate the dir so that getAvailable() is valid - // if it fails, getAvailable() will return 0, so the dir will - // be declared unavailable. - // return value is logged at debug to keep spotbugs quiet. - final boolean b = new File(target.getDirPath()).mkdirs(); - LOG.debug("mkdirs of {}={}", target, b); - availableOnDisk[i] = target.getAvailable(); + + final int dirCount = ctx.dirDF.length; + long[] availableOnDisk = new long[dirCount]; + long totalAvailable = 0; + + StringBuilder pathNames = new StringBuilder(); + + //build the "roulette wheel" + for (int i =0; i < dirCount; ++i) { + final DF target = ctx.dirDF[i]; + // attempt to recreate the dir so that getAvailable() is valid + // if it fails, getAvailable() will return 0, so the dir will + // be declared unavailable. + // return value is logged at debug to keep spotbugs quiet. 
+ final String name = target.getDirPath(); + pathNames.append(" ").append(name); + final File dirPath = new File(name); + + // existence probe + if (!dirPath.exists()) { + LOG.debug("creating buffer dir {}", name); + if (dirPath.mkdirs()) { + note(history, "Created buffer dir %s", name); + } else { + note(history, "Failed to create buffer dir %s", name); + } + } + + // path already existed or the mkdir call had an outcome + // make sure the path is present and a dir, and if so add its availability + if (dirPath.isDirectory()) { + final long available = target.getAvailable(); + availableOnDisk[i] = available; + note(history, "%,d bytes available under path %s", available, name); totalAvailable += availableOnDisk[i]; + } else { + note(history, "%s does not exist/is not a directory", name); } + } + + note(history, "Directory count is %d; total available capacity is %,d", + dirCount, totalAvailable); - if (totalAvailable == 0){ - throw new DiskErrorException("No space available in any of the local directories."); + if (size == SIZE_UNKNOWN) { + //do roulette selection: pick dir with probability + // proportional to available size + note(history, "Size not specified, so picking at random."); + + if (totalAvailable == 0) { + throw new DiskErrorException(E_NO_SPACE_AVAILABLE + pathNames + "; history=" + history); } // Keep rolling the wheel till we get a valid path @@ -439,14 +495,19 @@ public Path getLocalPathForWrite(String pathStr, long size, dir++; } ctx.dirNumLastAccessed.set(dir); - returnPath = createPath(ctx.localDirs[dir], pathStr, checkWrite); + final Path localDir = ctx.localDirs[dir]; + returnPath = createPath(localDir, pathStr, checkWrite); if (returnPath == null) { totalAvailable -= availableOnDisk[dir]; availableOnDisk[dir] = 0; // skip this disk numDirsSearched++; + note(history, "No capacity in %s", localDir); + } else { + note(history, "Allocated file %s in %s", returnPath, localDir); } } } else { + note(history, "Size is %,d; searching", size); // 
Start linear search with random increment if possible int randomInc = 1; if (numDirs > 2) { @@ -459,17 +520,22 @@ public Path getLocalPathForWrite(String pathStr, long size, maxCapacity = capacity; } if (capacity > size) { + final Path localDir = ctx.localDirs[dirNum]; try { - returnPath = createPath(ctx.localDirs[dirNum], pathStr, - checkWrite); + returnPath = createPath(localDir, pathStr, checkWrite); } catch (IOException e) { errorText = e.getMessage(); diskException = e; - LOG.debug("DiskException caught for dir {}", ctx.localDirs[dirNum], e); + note(history, "Exception while creating path %s: %s", localDir, errorText); + LOG.debug("DiskException caught for dir {}", localDir, e); } if (returnPath != null) { + // success ctx.getAndIncrDirNumLastAccessed(numDirsSearched); + note(history, "Allocated file %s in %s", returnPath, localDir); break; + } else { + note(history, "No capacity in %s", localDir); } } dirNum++; @@ -484,10 +550,14 @@ public Path getLocalPathForWrite(String pathStr, long size, //no path found String newErrorText = "Could not find any valid local directory for " + pathStr + " with requested size " + size + - " as the max capacity in any directory is " + maxCapacity; + " as the max capacity in any directory" + + " (" + pathNames + " )" + + " is " + maxCapacity; if (errorText != null) { newErrorText = newErrorText + " due to " + errorText; } + LOG.error(newErrorText); + LOG.error(history.toString()); throw new DiskErrorException(newErrorText, diskException); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java index eb6d251add0c5..0e89e29dace65 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java @@ -26,14 +26,16 @@ import 
java.util.NoSuchElementException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.Shell; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import static org.apache.hadoop.fs.LocalDirAllocator.E_NO_SPACE_AVAILABLE; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -564,13 +566,8 @@ public void testGetLocalPathForWriteForInvalidPaths(String paramRoot, String par throws Exception { initTestLocalDirAllocator(paramRoot, paramPrefix); conf.set(CONTEXT, " "); - try { - dirAllocator.getLocalPathForWrite("/test", conf); - fail("not throwing the exception"); - } catch (IOException e) { - assertEquals("No space available in any of the local directories.", - e.getMessage(), "Incorrect exception message"); - } + intercept(IOException.class, E_NO_SPACE_AVAILABLE, () -> + dirAllocator.getLocalPathForWrite("/test", conf)); } /** @@ -587,10 +584,13 @@ public void testGetLocalPathForWriteForLessSpace(String paramRoot, String paramP String dir0 = buildBufferDir(root, 0); String dir1 = buildBufferDir(root, 1); conf.set(CONTEXT, dir0 + "," + dir1); - LambdaTestUtils.intercept(DiskErrorException.class, + final DiskErrorException ex = intercept(DiskErrorException.class, String.format("Could not find any valid local directory for %s with requested size %s", "p1/x", Long.MAX_VALUE - 1), "Expect a DiskErrorException.", () -> dirAllocator.getLocalPathForWrite("p1/x", Long.MAX_VALUE - 1, conf)); + Assertions.assertThat(ex.getMessage()) + .contains(new File(dir0).getName()) + .contains(new File(dir1).getName()); } 
/** @@ -614,5 +614,31 @@ public void testDirectoryRecovery(String paramRoot, String paramPrefix) throws T // and expect to get a new file back dirAllocator.getLocalPathForWrite("file2", -1, conf); } + + + /** + * Test for HADOOP-19554. LocalDirAllocator still doesn't always recover + * from directory tree deletion. + */ + @Timeout(value = 30) + @MethodSource("params") + @ParameterizedTest + public void testDirectoryRecoveryKnownSize(String paramRoot, String paramPrefix) throws Throwable { + initTestLocalDirAllocator(paramRoot, paramPrefix); + String dir0 = buildBufferDir(root, 0); + String subdir = dir0 + "/subdir1/subdir2"; + + conf.set(CONTEXT, subdir); + // get local path and an ancestor + final Path pathForWrite = dirAllocator.getLocalPathForWrite("file", 512, conf); + final Path ancestor = pathForWrite.getParent().getParent(); + + // delete that ancestor + localFs.delete(ancestor, true); + // and expect to get a new file back + dirAllocator.getLocalPathForWrite("file2", 512, conf); + } + + }