31
31
import org .slf4j .Logger ;
32
32
import org .slf4j .LoggerFactory ;
33
33
34
+ import static org .apache .hadoop .net .NetUtils .getHostname ;
35
+
34
36
/** An implementation of a round-robin scheme for disk allocation for creating
35
37
* files. It works by keeping track of which disk was last
36
38
* allocated for a file write. For the current request, the next disk from
65
67
@ InterfaceAudience .LimitedPrivate ({"HDFS" , "MapReduce" })
66
68
@ InterfaceStability .Unstable
67
69
public class LocalDirAllocator {
68
-
70
+
71
+ static final String E_NO_SPACE_AVAILABLE =
72
+ "No space available in any of the local directories" ;
73
+
69
74
//A Map from the config item names like "mapred.local.dir"
70
75
//to the instance of the AllocatorPerContext. This
71
76
//is a static object to make sure there exists exactly one instance per JVM
@@ -384,6 +389,24 @@ int getCurrentDirectoryIndex() {
384
389
return currentContext .get ().dirNumLastAccessed .get ();
385
390
}
386
391
392
+ /**
393
+ * Format a string, log at debug and append it to the history as a new line.
394
+ *
395
+ * @param history history to fill in
396
+ * @param fmt format string
397
+ * @param args varags
398
+ */
399
+ private void note (StringBuilder history , String fmt , Object ... args ) {
400
+ try {
401
+ final String s = String .format (fmt , args );
402
+ history .append (s ).append ("\n " );
403
+ LOG .debug (s );
404
+ } catch (Exception e ) {
405
+ // some resilience in case the format string is wrong
406
+ LOG .debug (fmt , e );
407
+ }
408
+ }
409
+
387
410
/** Get a path from the local FS. If size is known, we go
388
411
* round-robin over the set of disks (via the configured dirs) and return
389
412
* the first complete path which has enough space.
@@ -393,6 +416,12 @@ int getCurrentDirectoryIndex() {
393
416
*/
394
417
public Path getLocalPathForWrite (String pathStr , long size ,
395
418
Configuration conf , boolean checkWrite ) throws IOException {
419
+
420
+ // history is built up and logged at error if the allocation fails
421
+ StringBuilder history = new StringBuilder ();
422
+
423
+ note (history , "Searching for a directory for file \" %s\" , size = %,d; checkWrite=%s" ,
424
+ pathStr , size , checkWrite );
396
425
Context ctx = confChanged (conf );
397
426
int numDirs = ctx .localDirs .length ;
398
427
int numDirsSearched = 0 ;
@@ -406,27 +435,62 @@ public Path getLocalPathForWrite(String pathStr, long size,
406
435
pathStr = pathStr .substring (1 );
407
436
}
408
437
Path returnPath = null ;
409
-
410
- if (size == SIZE_UNKNOWN ) { //do roulette selection: pick dir with probability
411
- //proportional to available size
412
- long [] availableOnDisk = new long [ctx .dirDF .length ];
413
- long totalAvailable = 0 ;
414
-
415
- //build the "roulette wheel"
416
- for (int i =0 ; i < ctx .dirDF .length ; ++i ) {
417
- final DF target = ctx .dirDF [i ];
418
- // attempt to recreate the dir so that getAvailable() is valid
419
- // if it fails, getAvailable() will return 0, so the dir will
420
- // be declared unavailable.
421
- // return value is logged at debug to keep spotbugs quiet.
422
- final boolean b = new File (target .getDirPath ()).mkdirs ();
423
- LOG .debug ("mkdirs of {}={}" , target , b );
424
- availableOnDisk [i ] = target .getAvailable ();
438
+
439
+ final int dirCount = ctx .dirDF .length ;
440
+ long [] availableOnDisk = new long [dirCount ];
441
+ long totalAvailable = 0 ;
442
+
443
+ StringBuilder pathNames = new StringBuilder ();
444
+
445
+ //build the "roulette wheel"
446
+ for (int i =0 ; i < dirCount ; ++i ) {
447
+ final DF target = ctx .dirDF [i ];
448
+ // attempt to recreate the dir so that getAvailable() is valid
449
+ // if it fails, getAvailable() will return 0, so the dir will
450
+ // be declared unavailable.
451
+ // return value is logged at debug to keep spotbugs quiet.
452
+ final String name = target .getDirPath ();
453
+ pathNames .append (" " ).append (name );
454
+ final File dirPath = new File (name );
455
+
456
+ // existence probe with directory recreation
457
+ if (!dirPath .exists ()) {
458
+ LOG .debug ("Creating buffer dir {}" , name );
459
+ if (dirPath .mkdirs ()) {
460
+ note (history , "Created buffer dir %s" , name );
461
+ } else {
462
+ note (history , "Failed to create buffer dir %s" , name );
463
+ }
464
+ }
465
+
466
+ // path already existed or the mkdir call had an outcome
467
+ // make sure the path is present and a dir, and if so add its availability
468
+ if (dirPath .isDirectory ()) {
469
+ final long available = target .getAvailable ();
470
+ availableOnDisk [i ] = available ;
471
+ note (history , "%,d bytes available under path %s" , available , name );
425
472
totalAvailable += availableOnDisk [i ];
473
+ } else {
474
+ note (history , "%s does not exist/is not a directory" , name );
426
475
}
476
+ }
427
477
428
- if (totalAvailable == 0 ){
429
- throw new DiskErrorException ("No space available in any of the local directories." );
478
+ note (history , "Directory count is %d; total available capacity is %,d" ,
479
+ dirCount , totalAvailable );
480
+
481
+ if (size == SIZE_UNKNOWN ) {
482
+ //do roulette selection: pick dir with probability
483
+ // proportional to available size
484
+ note (history , "Size not specified, so picking directories at random." );
485
+
486
+ if (totalAvailable == 0 ) {
487
+ // log error and history
488
+ String newErrorText = E_NO_SPACE_AVAILABLE + pathNames
489
+ + " on host" + getHostname ();
490
+ LOG .error (newErrorText );
491
+ LOG .error (history .toString ());
492
+ // then raise the exception
493
+ throw new DiskErrorException (newErrorText );
430
494
}
431
495
432
496
// Keep rolling the wheel till we get a valid path
@@ -439,14 +503,20 @@ public Path getLocalPathForWrite(String pathStr, long size,
439
503
dir ++;
440
504
}
441
505
ctx .dirNumLastAccessed .set (dir );
442
- returnPath = createPath (ctx .localDirs [dir ], pathStr , checkWrite );
506
+ final Path localDir = ctx .localDirs [dir ];
507
+ returnPath = createPath (localDir , pathStr , checkWrite );
443
508
if (returnPath == null ) {
444
509
totalAvailable -= availableOnDisk [dir ];
445
510
availableOnDisk [dir ] = 0 ; // skip this disk
446
511
numDirsSearched ++;
512
+ note (history , "No capacity in %s" , localDir );
513
+ } else {
514
+ note (history , "Allocated file %s in %s" , returnPath , localDir );
447
515
}
448
516
}
449
517
} else {
518
+ note (history , "Requested file size is %,d; searching for a suitable directory" ,
519
+ size );
450
520
// Start linear search with random increment if possible
451
521
int randomInc = 1 ;
452
522
if (numDirs > 2 ) {
@@ -459,17 +529,22 @@ public Path getLocalPathForWrite(String pathStr, long size,
459
529
maxCapacity = capacity ;
460
530
}
461
531
if (capacity > size ) {
532
+ final Path localDir = ctx .localDirs [dirNum ];
462
533
try {
463
- returnPath = createPath (ctx .localDirs [dirNum ], pathStr ,
464
- checkWrite );
534
+ returnPath = createPath (localDir , pathStr , checkWrite );
465
535
} catch (IOException e ) {
466
536
errorText = e .getMessage ();
467
537
diskException = e ;
468
- LOG .debug ("DiskException caught for dir {}" , ctx .localDirs [dirNum ], e );
538
+ note (history , "Exception while creating path %s: %s" , localDir , errorText );
539
+ LOG .debug ("DiskException caught for dir {}" , localDir , e );
469
540
}
470
541
if (returnPath != null ) {
542
+ // success
471
543
ctx .getAndIncrDirNumLastAccessed (numDirsSearched );
544
+ note (history , "Allocated file %s in %s" , returnPath , localDir );
472
545
break ;
546
+ } else {
547
+ note (history , "No capacity in %s" , localDir );
473
548
}
474
549
}
475
550
dirNum ++;
@@ -482,12 +557,18 @@ public Path getLocalPathForWrite(String pathStr, long size,
482
557
}
483
558
484
559
//no path found
485
- String newErrorText = "Could not find any valid local directory for " +
486
- pathStr + " with requested size " + size +
487
- " as the max capacity in any directory is " + maxCapacity ;
560
+ String hostname = getHostname ();
561
+ String newErrorText = "Could not find any valid local directory for "
562
+ + pathStr + " with requested size " + size
563
+ + " on host " + hostname
564
+ + " as the max capacity in any directory"
565
+ + " (" + pathNames + " )"
566
+ + " is " + maxCapacity ;
488
567
if (errorText != null ) {
489
568
newErrorText = newErrorText + " due to " + errorText ;
490
569
}
570
+ LOG .error (newErrorText );
571
+ LOG .error (history .toString ());
491
572
throw new DiskErrorException (newErrorText , diskException );
492
573
}
493
574
0 commit comments