65
65
@ InterfaceAudience .LimitedPrivate ({"HDFS" , "MapReduce" })
66
66
@ InterfaceStability .Unstable
67
67
public class LocalDirAllocator {
68
-
68
+
69
+ static final String E_NO_SPACE_AVAILABLE =
70
+ "No space available in any of the local directories" ;
71
+
69
72
//A Map from the config item names like "mapred.local.dir"
70
73
//to the instance of the AllocatorPerContext. This
71
74
//is a static object to make sure there exists exactly one instance per JVM
@@ -384,6 +387,24 @@ int getCurrentDirectoryIndex() {
384
387
return currentContext .get ().dirNumLastAccessed .get ();
385
388
}
386
389
390
+ /**
391
+ * Format a string, log at debug and append it to the history as a new line.
392
+ *
393
+ * @param history history to fill in
394
+ * @param fmt format string
395
+ * @param args varags
396
+ */
397
+ private void note (StringBuilder history , String fmt , Object ... args ) {
398
+ try {
399
+ final String s = String .format (fmt , args );
400
+ history .append (s ).append ("\n " );
401
+ LOG .debug (s );
402
+ } catch (Exception e ) {
403
+ // some resilience in case the format string is wrong
404
+ LOG .debug (fmt , e );
405
+ }
406
+ }
407
+
387
408
/** Get a path from the local FS. If size is known, we go
388
409
* round-robin over the set of disks (via the configured dirs) and return
389
410
* the first complete path which has enough space.
@@ -393,6 +414,12 @@ int getCurrentDirectoryIndex() {
393
414
*/
394
415
public Path getLocalPathForWrite (String pathStr , long size ,
395
416
Configuration conf , boolean checkWrite ) throws IOException {
417
+
418
+ // history is built up and logged at error if the alloc
419
+ StringBuilder history = new StringBuilder ();
420
+
421
+ note (history , "Searchng for a directory for file at %s, size = %,d; checkWrite=%s" ,
422
+ pathStr , size , checkWrite );
396
423
Context ctx = confChanged (conf );
397
424
int numDirs = ctx .localDirs .length ;
398
425
int numDirsSearched = 0 ;
@@ -406,27 +433,56 @@ public Path getLocalPathForWrite(String pathStr, long size,
406
433
pathStr = pathStr .substring (1 );
407
434
}
408
435
Path returnPath = null ;
409
-
410
- if (size == SIZE_UNKNOWN ) { //do roulette selection: pick dir with probability
411
- //proportional to available size
412
- long [] availableOnDisk = new long [ctx .dirDF .length ];
413
- long totalAvailable = 0 ;
414
-
415
- //build the "roulette wheel"
416
- for (int i =0 ; i < ctx .dirDF .length ; ++i ) {
417
- final DF target = ctx .dirDF [i ];
418
- // attempt to recreate the dir so that getAvailable() is valid
419
- // if it fails, getAvailable() will return 0, so the dir will
420
- // be declared unavailable.
421
- // return value is logged at debug to keep spotbugs quiet.
422
- final boolean b = new File (target .getDirPath ()).mkdirs ();
423
- LOG .debug ("mkdirs of {}={}" , target , b );
424
- availableOnDisk [i ] = target .getAvailable ();
436
+
437
+ final int dirCount = ctx .dirDF .length ;
438
+ long [] availableOnDisk = new long [dirCount ];
439
+ long totalAvailable = 0 ;
440
+
441
+ StringBuilder pathNames = new StringBuilder ();
442
+
443
+ //build the "roulette wheel"
444
+ for (int i =0 ; i < dirCount ; ++i ) {
445
+ final DF target = ctx .dirDF [i ];
446
+ // attempt to recreate the dir so that getAvailable() is valid
447
+ // if it fails, getAvailable() will return 0, so the dir will
448
+ // be declared unavailable.
449
+ // return value is logged at debug to keep spotbugs quiet.
450
+ final String name = target .getDirPath ();
451
+ pathNames .append (" " ).append (name );
452
+ final File dirPath = new File (name );
453
+
454
+ // existence probe
455
+ if (!dirPath .exists ()) {
456
+ LOG .debug ("creating buffer dir {}" , name );
457
+ if (dirPath .mkdirs ()) {
458
+ note (history , "Created buffer dir %s" , name );
459
+ } else {
460
+ note (history , "Failed to create buffer dir %s" , name );
461
+ }
462
+ }
463
+
464
+ // path already existed or the mkdir call had an outcome
465
+ // make sure the path is present and a dir, and if so add its availability
466
+ if (dirPath .isDirectory ()) {
467
+ final long available = target .getAvailable ();
468
+ availableOnDisk [i ] = available ;
469
+ note (history , "%s available under path %s" , pathStr , available );
425
470
totalAvailable += availableOnDisk [i ];
471
+ } else {
472
+ note (history , "%s does not exist/is not a directory" , pathStr );
426
473
}
474
+ }
475
+
476
+ note (history , "Directory count is %d; total available capacity is %,d{}" ,
477
+ dirCount , totalAvailable );
427
478
428
- if (totalAvailable == 0 ){
429
- throw new DiskErrorException ("No space available in any of the local directories." );
479
+ if (size == SIZE_UNKNOWN ) {
480
+ //do roulette selection: pick dir with probability
481
+ // proportional to available size
482
+ note (history , "Size not specified, so picking at random." );
483
+
484
+ if (totalAvailable == 0 ) {
485
+ throw new DiskErrorException (E_NO_SPACE_AVAILABLE + pathNames + "; history=" + history );
430
486
}
431
487
432
488
// Keep rolling the wheel till we get a valid path
@@ -439,14 +495,19 @@ public Path getLocalPathForWrite(String pathStr, long size,
439
495
dir ++;
440
496
}
441
497
ctx .dirNumLastAccessed .set (dir );
442
- returnPath = createPath (ctx .localDirs [dir ], pathStr , checkWrite );
498
+ final Path localDir = ctx .localDirs [dir ];
499
+ returnPath = createPath (localDir , pathStr , checkWrite );
443
500
if (returnPath == null ) {
444
501
totalAvailable -= availableOnDisk [dir ];
445
502
availableOnDisk [dir ] = 0 ; // skip this disk
446
503
numDirsSearched ++;
504
+ note (history , "No capacity in %s" , localDir );
505
+ } else {
506
+ note (history , "Allocated file %s in %s" , returnPath , localDir );
447
507
}
448
508
}
449
509
} else {
510
+ note (history , "Size is %,d; searching" , size );
450
511
// Start linear search with random increment if possible
451
512
int randomInc = 1 ;
452
513
if (numDirs > 2 ) {
@@ -459,17 +520,22 @@ public Path getLocalPathForWrite(String pathStr, long size,
459
520
maxCapacity = capacity ;
460
521
}
461
522
if (capacity > size ) {
523
+ final Path localDir = ctx .localDirs [dirNum ];
462
524
try {
463
- returnPath = createPath (ctx .localDirs [dirNum ], pathStr ,
464
- checkWrite );
525
+ returnPath = createPath (localDir , pathStr , checkWrite );
465
526
} catch (IOException e ) {
466
527
errorText = e .getMessage ();
467
528
diskException = e ;
468
- LOG .debug ("DiskException caught for dir {}" , ctx .localDirs [dirNum ], e );
529
+ note (history , "Exception while creating path %s: %s" , localDir , errorText );
530
+ LOG .debug ("DiskException caught for dir {}" , localDir , e );
469
531
}
470
532
if (returnPath != null ) {
533
+ // success
471
534
ctx .getAndIncrDirNumLastAccessed (numDirsSearched );
535
+ note (history , "Allocated file %s in %s" , returnPath , localDir );
472
536
break ;
537
+ } else {
538
+ note (history , "No capacity in %s" , localDir );
473
539
}
474
540
}
475
541
dirNum ++;
@@ -484,10 +550,14 @@ public Path getLocalPathForWrite(String pathStr, long size,
484
550
//no path found
485
551
String newErrorText = "Could not find any valid local directory for " +
486
552
pathStr + " with requested size " + size +
487
- " as the max capacity in any directory is " + maxCapacity ;
553
+ " as the max capacity in any directory"
554
+ + " (" + pathNames + " )"
555
+ + " is " + maxCapacity ;
488
556
if (errorText != null ) {
489
557
newErrorText = newErrorText + " due to " + errorText ;
490
558
}
559
+ LOG .error (newErrorText );
560
+ LOG .error (history .toString ());
491
561
throw new DiskErrorException (newErrorText , diskException );
492
562
}
493
563
0 commit comments