@@ -771,7 +771,7 @@ public synchronized int read() throws IOException {
771
771
* ChecksumFileSystem
772
772
*/
773
773
private synchronized int readBuffer (ReaderStrategy reader , int len ,
774
- CorruptedBlocks corruptedBlocks )
774
+ CorruptedBlocks corruptedBlocks , final Map < InetSocketAddress , List < IOException >> exceptionMap )
775
775
throws IOException {
776
776
IOException ioe ;
777
777
@@ -786,6 +786,7 @@ private synchronized int readBuffer(ReaderStrategy reader, int len,
786
786
while (true ) {
787
787
// retry as many times as seekToNewSource allows.
788
788
try {
789
+ DFSClientFaultInjector .get ().fetchFromDatanodeException ();
789
790
return reader .readFromBlock (blockReader , len );
790
791
} catch (ChecksumException ce ) {
791
792
DFSClient .LOG .warn ("Found Checksum error for "
@@ -796,11 +797,18 @@ private synchronized int readBuffer(ReaderStrategy reader, int len,
796
797
// we want to remember which block replicas we have tried
797
798
corruptedBlocks .addCorruptedBlock (getCurrentBlock (), currentNode );
798
799
} catch (IOException e ) {
799
- if (!retryCurrentNode ) {
800
- DFSClient .LOG .warn ("Exception while reading from "
801
- + getCurrentBlock () + " of " + src + " from "
802
- + currentNode , e );
800
+ String msg = String .format ("Failed to read block %s for file %s from datanode %s. "
801
+ + "Exception is %s. Retry with the current or next available datanode." ,
802
+ getCurrentBlock ().getBlockName (), src , currentNode .getXferAddr (), e );
803
+ DFSClient .LOG .warn (msg );
804
+
805
+ // Add the exception to exceptionMap for this datanode.
806
+ InetSocketAddress datanode = currentNode .getResolvedAddress ();
807
+ if (!exceptionMap .containsKey (datanode )) {
808
+ exceptionMap .put (datanode , new LinkedList <IOException >());
803
809
}
810
+ exceptionMap .get (datanode ).add (e );
811
+
804
812
ioe = e ;
805
813
}
806
814
boolean sourceFound ;
@@ -822,6 +830,29 @@ private synchronized int readBuffer(ReaderStrategy reader, int len,
822
830
}
823
831
}
824
832
833
+ /**
834
+ * Logs to DFSClient.LOG, at ERROR level, the IOExceptions recorded for each datanode when a
835
+ * read request fails: one summary message, then one message per recorded exception with that
836
+ * exception attached. Called from readWithStrategy() and pread() right before they rethrow.
837
+ * @param position offset in the file at which the read failed
838
+ * @param exceptionMap the IOExceptions collected for each datanode, keyed by datanode address
839
+ */
840
+ private void logDataNodeExceptionsOnReadError (long position , final Map <InetSocketAddress ,
841
+ List <IOException >> exceptionMap ) {
842
+ String msg = String .format ("Failed to read from all available datanodes for file %s "
843
+ + "at position=%d after retrying." , src , position );
844
+ DFSClient .LOG .error (msg );
845
+ for (Map .Entry <InetSocketAddress , List <IOException >> dataNodeExceptions :
846
+ exceptionMap .entrySet ()) {
847
+ List <IOException > exceptions = dataNodeExceptions .getValue ();
848
+ for (IOException ex : exceptions ) {
849
+ msg = String .format ("Exception when fetching file %s at position=%d at datanode %s:" , src ,
850
+ position , dataNodeExceptions .getKey ());
851
+ DFSClient .LOG .error (msg , ex );
852
+ }
853
+ }
854
+ }
855
+
825
856
protected synchronized int readWithStrategy (ReaderStrategy strategy )
826
857
throws IOException {
827
858
dfsClient .checkOpen ();
@@ -831,6 +862,9 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
831
862
832
863
int len = strategy .getTargetLength ();
833
864
CorruptedBlocks corruptedBlocks = new CorruptedBlocks ();
865
+ // A map to record IOExceptions when fetching from each datanode. Key is the socketAddress of
866
+ // a datanode.
867
+ Map <InetSocketAddress , List <IOException >> exceptionMap = new HashMap <>();
834
868
failures = 0 ;
835
869
836
870
maybeRegisterBlockRefresh ();
@@ -852,7 +886,7 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
852
886
}
853
887
}
854
888
long beginReadMS = Time .monotonicNow ();
855
- int result = readBuffer (strategy , realLen , corruptedBlocks );
889
+ int result = readBuffer (strategy , realLen , corruptedBlocks , exceptionMap );
856
890
long readTimeMS = Time .monotonicNow () - beginReadMS ;
857
891
if (result >= 0 ) {
858
892
pos += result ;
@@ -880,6 +914,8 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
880
914
dfsClient .addNodeToDeadNodeDetector (this , currentNode );
881
915
}
882
916
if (--retries == 0 ) {
917
+ // Fail the request and log all exceptions
918
+ logDataNodeExceptionsOnReadError (pos , exceptionMap );
883
919
throw e ;
884
920
}
885
921
} finally {
@@ -1122,16 +1158,16 @@ private static String getBestNodeDNAddrPairErrorString(
1122
1158
return errMsgr .toString ();
1123
1159
}
1124
1160
1125
- protected void fetchBlockByteRange (LocatedBlock block , long start , long end ,
1126
- ByteBuffer buf , CorruptedBlocks corruptedBlocks )
1161
+ protected void fetchBlockByteRange (LocatedBlock block , long start , long end , ByteBuffer buf ,
1162
+ CorruptedBlocks corruptedBlocks , final Map < InetSocketAddress , List < IOException >> exceptionMap )
1127
1163
throws IOException {
1128
1164
while (true ) {
1129
1165
DNAddrPair addressPair = chooseDataNode (block , null );
1130
1166
// Latest block, if refreshed internally
1131
1167
block = addressPair .block ;
1132
1168
try {
1133
1169
actualGetFromOneDataNode (addressPair , start , end , buf ,
1134
- corruptedBlocks );
1170
+ corruptedBlocks , exceptionMap );
1135
1171
return ;
1136
1172
} catch (IOException e ) {
1137
1173
checkInterrupted (e ); // check if the read has been interrupted
@@ -1142,15 +1178,15 @@ protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
1142
1178
}
1143
1179
1144
1180
private Callable <ByteBuffer > getFromOneDataNode (final DNAddrPair datanode ,
1145
- final LocatedBlock block , final long start , final long end ,
1181
+ final long start , final long end ,
1146
1182
final ByteBuffer bb ,
1147
1183
final CorruptedBlocks corruptedBlocks ,
1148
- final int hedgedReadId ) {
1184
+ final Map < InetSocketAddress , List < IOException >> exceptionMap ) {
1149
1185
return new Callable <ByteBuffer >() {
1150
1186
@ Override
1151
1187
public ByteBuffer call () throws Exception {
1152
1188
DFSClientFaultInjector .get ().sleepBeforeHedgedGet ();
1153
- actualGetFromOneDataNode (datanode , start , end , bb , corruptedBlocks );
1189
+ actualGetFromOneDataNode (datanode , start , end , bb , corruptedBlocks , exceptionMap );
1154
1190
return bb ;
1155
1191
}
1156
1192
};
@@ -1167,7 +1203,8 @@ public ByteBuffer call() throws Exception {
1167
1203
* block replica
1168
1204
*/
1169
1205
void actualGetFromOneDataNode (final DNAddrPair datanode , final long startInBlk ,
1170
- final long endInBlk , ByteBuffer buf , CorruptedBlocks corruptedBlocks )
1206
+ final long endInBlk , ByteBuffer buf , CorruptedBlocks corruptedBlocks ,
1207
+ final Map <InetSocketAddress , List <IOException >> exceptionMap )
1171
1208
throws IOException {
1172
1209
DFSClientFaultInjector .get ().startFetchFromDatanode ();
1173
1210
int refetchToken = 1 ; // only need to get a new access token once
@@ -1236,9 +1273,16 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
1236
1273
// ignore IOE, since we can retry it later in a loop
1237
1274
}
1238
1275
} else {
1239
- String msg = "Failed to connect to " + datanode .addr + " for file "
1240
- + src + " for block " + block .getBlock () + ":" + e ;
1241
- DFSClient .LOG .warn ("Connection failure: " + msg , e );
1276
+ String msg = String .format ("Failed to read block %s for file %s from datanode %s. "
1277
+ + "Exception is %s. Retry with the next available datanode." ,
1278
+ block .getBlock ().getBlockName (), src , datanode .addr , e );
1279
+ DFSClient .LOG .warn (msg );
1280
+
1281
+ // Add the exception to the exceptionMap
1282
+ if (!exceptionMap .containsKey (datanode .addr )) {
1283
+ exceptionMap .put (datanode .addr , new LinkedList <IOException >());
1284
+ }
1285
+ exceptionMap .get (datanode .addr ).add (e );
1242
1286
addToLocalDeadNodes (datanode .info );
1243
1287
dfsClient .addNodeToDeadNodeDetector (this , datanode .info );
1244
1288
throw new IOException (msg );
@@ -1270,17 +1314,16 @@ protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
1270
1314
* 'hedged' read if the first read is taking longer than configured amount of
1271
1315
* time. We then wait on which ever read returns first.
1272
1316
*/
1273
- private void hedgedFetchBlockByteRange (LocatedBlock block , long start ,
1274
- long end , ByteBuffer buf , CorruptedBlocks corruptedBlocks )
1275
- throws IOException {
1317
+ private void hedgedFetchBlockByteRange (LocatedBlock block , long start , long end , ByteBuffer buf ,
1318
+ CorruptedBlocks corruptedBlocks ,
1319
+ final Map < InetSocketAddress , List < IOException >> exceptionMap ) throws IOException {
1276
1320
final DfsClientConf conf = dfsClient .getConf ();
1277
1321
ArrayList <Future <ByteBuffer >> futures = new ArrayList <>();
1278
1322
CompletionService <ByteBuffer > hedgedService =
1279
1323
new ExecutorCompletionService <>(dfsClient .getHedgedReadsThreadPool ());
1280
1324
ArrayList <DatanodeInfo > ignored = new ArrayList <>();
1281
1325
ByteBuffer bb ;
1282
1326
int len = (int ) (end - start + 1 );
1283
- int hedgedReadId = 0 ;
1284
1327
while (true ) {
1285
1328
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
1286
1329
hedgedReadOpsLoopNumForTesting ++;
@@ -1293,9 +1336,8 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
1293
1336
// Latest block, if refreshed internally
1294
1337
block = chosenNode .block ;
1295
1338
bb = ByteBuffer .allocate (len );
1296
- Callable <ByteBuffer > getFromDataNodeCallable = getFromOneDataNode (
1297
- chosenNode , block , start , end , bb ,
1298
- corruptedBlocks , hedgedReadId ++);
1339
+ Callable <ByteBuffer > getFromDataNodeCallable =
1340
+ getFromOneDataNode (chosenNode , start , end , bb , corruptedBlocks , exceptionMap );
1299
1341
Future <ByteBuffer > firstRequest = hedgedService
1300
1342
.submit (getFromDataNodeCallable );
1301
1343
futures .add (firstRequest );
@@ -1335,8 +1377,7 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
1335
1377
block = chosenNode .block ;
1336
1378
bb = ByteBuffer .allocate (len );
1337
1379
Callable <ByteBuffer > getFromDataNodeCallable =
1338
- getFromOneDataNode (chosenNode , block , start , end , bb ,
1339
- corruptedBlocks , hedgedReadId ++);
1380
+ getFromOneDataNode (chosenNode , start , end , bb , corruptedBlocks , exceptionMap );
1340
1381
Future <ByteBuffer > oneMoreRequest =
1341
1382
hedgedService .submit (getFromDataNodeCallable );
1342
1383
futures .add (oneMoreRequest );
@@ -1486,6 +1527,11 @@ private int pread(long position, ByteBuffer buffer)
1486
1527
List <LocatedBlock > blockRange = getBlockRange (position , realLen );
1487
1528
int remaining = realLen ;
1488
1529
CorruptedBlocks corruptedBlocks = new CorruptedBlocks ();
1530
+ // A map to record all IOExceptions that happened at each datanode when fetching a block.
1531
+ // In HDFS-17332, we worked on populating this map only for DFSInputStream, but not for
1532
+ // DFSStripedInputStream. If you need the same function for DFSStripedInputStream, please
1533
+ // work on it yourself (fetchBlockByteRange() in DFSStripedInputStream).
1534
+ Map <InetSocketAddress , List <IOException >> exceptionMap = new HashMap <>();
1489
1535
for (LocatedBlock blk : blockRange ) {
1490
1536
long targetStart = position - blk .getStartOffset ();
1491
1537
int bytesToRead = (int ) Math .min (remaining ,
@@ -1494,11 +1540,17 @@ private int pread(long position, ByteBuffer buffer)
1494
1540
try {
1495
1541
if (dfsClient .isHedgedReadsEnabled () && !blk .isStriped ()) {
1496
1542
hedgedFetchBlockByteRange (blk , targetStart ,
1497
- targetEnd , buffer , corruptedBlocks );
1543
+ targetEnd , buffer , corruptedBlocks , exceptionMap );
1498
1544
} else {
1499
1545
fetchBlockByteRange (blk , targetStart , targetEnd ,
1500
- buffer , corruptedBlocks );
1546
+ buffer , corruptedBlocks , exceptionMap );
1501
1547
}
1548
+ } catch (IOException e ) {
1549
+ // When we reach here, it means we failed to fetch the current block from all available
1550
+ // datanodes. Send IOExceptions in exceptionMap to the log and rethrow the exception to
1551
+ // fail this request.
1552
+ logDataNodeExceptionsOnReadError (position , exceptionMap );
1553
+ throw e ;
1502
1554
} finally {
1503
1555
// Check and report if any block replicas are corrupted.
1504
1556
// BlockMissingException may be caught if all block replicas are
@@ -1507,6 +1559,8 @@ private int pread(long position, ByteBuffer buffer)
1507
1559
false );
1508
1560
}
1509
1561
1562
+ // Reset exceptionMap before fetching the next block.
1563
+ exceptionMap .clear ();
1510
1564
remaining -= bytesToRead ;
1511
1565
position += bytesToRead ;
1512
1566
}
0 commit comments