@@ -390,17 +390,18 @@ private void startPersistenceRetriever(int[] bucketSizes, long capacity) {
     try {
       retrieveFromFile(bucketSizes);
       LOG.info("Persistent bucket cache recovery from {} is complete.", persistencePath);
-    } catch (IOException ioex) {
-      LOG.error("Can't restore from file[{}] because of ", persistencePath, ioex);
+    } catch (Throwable ex) {
+      LOG.warn("Can't restore from file[{}]. The bucket cache will be reset and rebuilt."
+        + " Exception seen: ", persistencePath, ex);
       backingMap.clear();
       fullyCachedFiles.clear();
       backingMapValidated.set(true);
+      regionCachedSize.clear();
       try {
         bucketAllocator = new BucketAllocator(capacity, bucketSizes);
-      } catch (BucketAllocatorException ex) {
-        LOG.error("Exception during Bucket Allocation", ex);
+      } catch (BucketAllocatorException allocatorException) {
+        LOG.error("Exception during Bucket Allocation", allocatorException);
       }
-      regionCachedSize.clear();
     } finally {
       this.cacheState = CacheState.ENABLED;
       startWriterThreads();
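Reviewer note: this first hunk widens the recovery catch from IOException to Throwable, so a corrupt or incompatible persistence file can never abort the retriever mid-initialization; the index is reset and the cache comes up cold instead. Below is a minimal runnable sketch of that recover-or-reset pattern; `recover()` and `resetCache()` are hypothetical stand-ins for `retrieveFromFile()` and the `clear()` calls above.

```java
import java.io.IOException;

public class RecoverOrReset {
  // Hypothetical stand-ins for retrieveFromFile() and the
  // backingMap / fullyCachedFiles / regionCachedSize clear() calls.
  static void recover() throws IOException { /* read the persisted index */ }
  static void resetCache() { /* drop all index state, start cold */ }

  static volatile boolean cacheEnabled = false;

  public static void main(String[] args) {
    try {
      recover();
    } catch (Throwable t) {
      // Catch Throwable, not just IOException: a truncated or corrupt file
      // can surface as runtime errors, and a failed recovery must leave a
      // clean empty cache rather than a half-built one.
      System.err.println("Recovery failed, resetting cache: " + t);
      resetCache();
    } finally {
      cacheEnabled = true; // the cache becomes usable either way
    }
  }
}
```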
@@ -951,7 +952,8 @@ public void logStats() {
         : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
       + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
       + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction()
-      + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount());
+      + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount() + ", blocksCount="
+      + backingMap.size());
     cacheStats.reset();

     bucketAllocator.logDebugStatistics();
@@ -1496,7 +1498,7 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException {
       } else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) {
         // The new persistence format of chunked persistence.
         LOG.info("Reading new chunked format of persistence.");
-        retrieveChunkedBackingMap(in, bucketSizes);
+        retrieveChunkedBackingMap(in);
       } else {
         // In 3.0 we have enough flexibility to dump the old cache data.
         // TODO: In 2.x line, this might need to be filled in to support reading the old format
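For context, the branch patched here is selected by sniffing a magic marker at the head of the persistence file: the old marker selects the single-message layout, PB_MAGIC_V2 the chunked one. A minimal sketch of that format-dispatch idiom follows, with made-up marker values; only their role matches the real constants in BucketProtoUtils.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class FormatDispatch {
  // Made-up 4-byte markers standing in for BucketProtoUtils.PB_MAGIC
  // and BucketProtoUtils.PB_MAGIC_V2.
  static final byte[] MAGIC_V1 = "PBV1".getBytes(StandardCharsets.US_ASCII);
  static final byte[] MAGIC_V2 = "PBV2".getBytes(StandardCharsets.US_ASCII);

  static void retrieve(InputStream in) throws IOException {
    byte[] pbuf = new byte[MAGIC_V1.length];
    if (in.read(pbuf) != pbuf.length) {
      throw new IOException("Persistence file too short for a magic header");
    }
    if (Arrays.equals(pbuf, MAGIC_V2)) {
      // chunked layout: a delimited header message followed by N chunks
    } else if (Arrays.equals(pbuf, MAGIC_V1)) {
      // legacy layout: one protobuf message for the whole backing map
    } else {
      throw new IOException("Unknown persistence format: " + Arrays.toString(pbuf));
    }
  }
}
```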
@@ -1590,17 +1592,7 @@ private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) {
     }
   }

-  private void parseFirstChunk(BucketCacheProtos.BucketCacheEntry firstChunk) throws IOException {
-    fullyCachedFiles.clear();
-    Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
-      BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), firstChunk.getBackingMap(),
-        this::createRecycler);
-    backingMap.putAll(pair.getFirst());
-    blocksByHFile.addAll(pair.getSecond());
-    fullyCachedFiles.putAll(BucketProtoUtils.fromPB(firstChunk.getCachedFilesMap()));
-  }
-
-  private void parseChunkPB(BucketCacheProtos.BackingMap chunk,
+  private void updateCacheIndex(BucketCacheProtos.BackingMap chunk,
     java.util.Map<java.lang.Integer, java.lang.String> deserializer) throws IOException {
     Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 =
       BucketProtoUtils.fromPB(deserializer, chunk, this::createRecycler);
@@ -1626,55 +1618,42 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
   }

   private void persistChunkedBackingMap(FileOutputStream fos) throws IOException {
-    long numChunks = backingMap.size() / persistenceChunkSize;
-    if (backingMap.size() % persistenceChunkSize != 0) {
-      numChunks += 1;
-    }
-
     LOG.debug(
       "persistToFile: before persisting backing map size: {}, "
-        + "fullycachedFiles size: {}, chunkSize: {}, numberofChunks: {} ",
-      backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize, numChunks);
+        + "fullycachedFiles size: {}, chunkSize: {}",
+      backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize);

-    BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize, numChunks);
+    BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize);

     LOG.debug(
-      "persistToFile: after persisting backing map size: {}, "
-        + "fullycachedFiles size: {}, numChunksPersisteed: {}",
-      backingMap.size(), fullyCachedFiles.size(), numChunks);
+      "persistToFile: after persisting backing map size: {}, " + "fullycachedFiles size: {}",
+      backingMap.size(), fullyCachedFiles.size());
   }

-  private void retrieveChunkedBackingMap(FileInputStream in, int[] bucketSizes) throws IOException {
-    byte[] bytes = new byte[Long.BYTES];
-    int readSize = in.read(bytes);
-    if (readSize != Long.BYTES) {
-      throw new IOException("Invalid size of chunk-size read from persistence: " + readSize);
-    }
-    long batchSize = Bytes.toLong(bytes, 0);
-
-    readSize = in.read(bytes);
-    if (readSize != Long.BYTES) {
-      throw new IOException("Invalid size for number of chunks read from persistence: " + readSize);
-    }
-    long numChunks = Bytes.toLong(bytes, 0);
-
-    LOG.info("Number of chunks: {}, chunk size: {}", numChunks, batchSize);
+  private void retrieveChunkedBackingMap(FileInputStream in) throws IOException {

     // Read the first chunk that has all the details.
-    BucketCacheProtos.BucketCacheEntry firstChunk =
+    BucketCacheProtos.BucketCacheEntry cacheEntry =
       BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);
-    parseFirstChunk(firstChunk);
-
-    // Subsequent chunks have the backingMap entries.
-    for (int i = 1; i < numChunks; i++) {
-      LOG.info("Reading chunk no: {}", i + 1);
-      parseChunkPB(BucketCacheProtos.BackingMap.parseDelimitedFrom(in),
-        firstChunk.getDeserializersMap());
-      LOG.info("Retrieved chunk: {}", i + 1);
-    }
-    verifyFileIntegrity(firstChunk);
-    verifyCapacityAndClasses(firstChunk.getCacheCapacity(), firstChunk.getIoClass(),
-      firstChunk.getMapClass());
+
+    fullyCachedFiles.clear();
+    fullyCachedFiles.putAll(BucketProtoUtils.fromPB(cacheEntry.getCachedFilesMap()));
+
+    backingMap.clear();
+    blocksByHFile.clear();
+
+    // Read the backing map entries in batches.
+    int numChunks = 0;
+    while (in.available() > 0) {
+      updateCacheIndex(BucketCacheProtos.BackingMap.parseDelimitedFrom(in),
+        cacheEntry.getDeserializersMap());
+      numChunks++;
+    }
+
+    LOG.info("Retrieved {} chunks with blockCount = {}.", numChunks, backingMap.size());
+    verifyFileIntegrity(cacheEntry);
+    verifyCapacityAndClasses(cacheEntry.getCacheCapacity(), cacheEntry.getIoClass(),
+      cacheEntry.getMapClass());
     updateRegionSizeMapWhileRetrievingFromFile();
   }
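Reviewer note: this last hunk is the heart of the change. The writer no longer records a chunk count or chunk size in a raw header; because every chunk is written with protobuf's length-delimited framing, the reader simply consumes delimited messages until the stream is exhausted (the PR tests `in.available() > 0`, which is reliable for a plain FileInputStream). Below is a runnable sketch of that framing using the protobuf well-known type `Int64Value` as a stand-in for the PR's `BucketCacheProtos.BackingMap` chunks; `writeDelimitedTo` and `parseDelimitedFrom` are the real protobuf-java APIs (requires protobuf-java on the classpath).

```java
import com.google.protobuf.Int64Value;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class DelimitedChunks {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Writer side: each message is length-prefixed, so no chunk count
    // needs to be persisted up front.
    for (long i = 0; i < 5; i++) {
      Int64Value.of(i).writeDelimitedTo(out);
    }

    // Reader side: parseDelimitedFrom returns null on clean EOF, so the
    // reader never needs to know how many chunks were written.
    InputStream in = new ByteArrayInputStream(out.toByteArray());
    int numChunks = 0;
    while (Int64Value.parseDelimitedFrom(in) != null) {
      numChunks++;
    }
    System.out.println("Retrieved " + numChunks + " chunks");
  }
}
```

Either end-of-stream test works; the null return from `parseDelimitedFrom` is the stream-agnostic variant, while `available() > 0` is fine in the PR because the input is a local file.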