HBASE-29288 Avoid adding new blocks during prefetch if usage is greater than accept factor #6965

Merged (1 commit) on May 12, 2025
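In short, this change makes prefetch a best-effort citizen of the bucket cache: when a block is being cached on behalf of prefetch and the allocator is already past the acceptable-size watermark, the block is dropped instead of triggering a freeSpace() run and evicting resident blocks. To get there, the queued RAMQueueEntry carries a new isPrefetch flag (populated from the existing wait argument of cacheBlockWithWaitInternal), writeToCache receives the acceptable size so it can give back an allocation it no longer wants, and the writer thread skips prefetch entries on CacheFullException. The sketch below summarizes only the admission rule; the class and method names are invented for illustration and are not part of the patch.

final class PrefetchAdmissionSketch {
  // Decide whether a block may be written into the bucket cache.
  static boolean shouldWrite(boolean isPrefetch, long usedBytes, long acceptableBytes) {
    if (!isPrefetch) {
      return true; // demand reads keep the old behaviour: write, and evict when the cache is full
    }
    // Prefetch is best-effort: never evict resident (possibly hot) blocks to make room for it.
    return usedBytes <= acceptableBytes;
  }

  public static void main(String[] args) {
    System.out.println(shouldWrite(true, 980, 900));  // false -> prefetch block is skipped
    System.out.println(shouldWrite(false, 980, 900)); // true  -> normal write proceeds
  }
}

This matters most when the persistent bucket cache is enabled: the recovered blocks are likely the most accessed ones, and a prefetch of a cold file should not push them out.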
@@ -285,6 +285,7 @@ public static HFileBlock getBlockForCaching(CacheConfig cacheConf, HFileBlock bl
.withPrevBlockOffset(block.getPrevBlockOffset()).withByteBuff(buff)
.withFillHeader(FILL_HEADER).withOffset(block.getOffset()).withNextBlockOnDiskSize(-1)
.withOnDiskDataSizeWithHeader(block.getOnDiskDataSizeWithHeader() + numBytes)
.withNextBlockOnDiskSize(block.getNextBlockOnDiskSize())
.withHFileContext(cloneContext(block.getHFileContext()))
.withByteBuffAllocator(cacheConf.getByteBuffAllocator()).withShared(!buff.hasArray()).build();
}
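Note on the hunk above: the earlier .withNextBlockOnDiskSize(-1) call is still present; assuming the builder setters simply overwrite the field, the added .withNextBlockOnDiskSize(block.getNextBlockOnDiskSize()) takes effect, so blocks rebuilt for caching now keep the real next-block-on-disk size instead of -1.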
@@ -288,8 +288,8 @@ protected enum CacheState {
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

public static final String QUEUE_ADDITION_WAIT_TIME = "hbase.bucketcache.queue.addition.waittime";
static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
static final String QUEUE_ADDITION_WAIT_TIME = "hbase.bucketcache.queue.addition.waittime";
private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
private long queueAdditionWaitTime;
/**
* Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file
@@ -592,7 +592,7 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach
LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
// Stuff the entry into the RAM cache so it can get drained to the persistent store
RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(),
inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine);
inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine, wait);
/**
* Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
* key in ramCache, the heap size of bucket cache need to update if replacing entry from
@@ -1396,8 +1396,8 @@ void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws Inte
// transferred with our current IOEngines. Should take care, when we have new kinds of
// IOEngine in the future.
metaBuff.clear();
BucketEntry bucketEntry =
re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff);
BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
this::createRecycler, metaBuff, acceptableSize());
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
bucketEntries[index] = bucketEntry;
if (ioErrorStartTime > 0) {
@@ -1418,8 +1418,11 @@ void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws Inte
index++;
} catch (CacheFullException cfe) {
// Cache full when we tried to add. Try freeing space and then retrying (don't up index)
if (!freeInProgress) {
if (!freeInProgress && !re.isPrefetch()) {
freeSpace("Full!");
} else if (re.isPrefetch()) {
bucketEntries[index] = null;
index++;
} else {
Thread.sleep(50);
}
@@ -1473,13 +1476,13 @@ void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws Inte
return null;
});
}
long used = bucketAllocator.getUsedSize();
if (!entries.get(i).isPrefetch() && used > acceptableSize()) {
LOG.debug("Calling freeSpace for block: {}", entries.get(i).getKey());
freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
}
}

long used = bucketAllocator.getUsedSize();
if (used > acceptableSize()) {
freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
}
return;
}
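The catch (CacheFullException) branch above now distinguishes prefetch entries; a paraphrase with invented names (CacheFullDecisionSketch, onCacheFull, Action — not the actual BucketCache API):

final class CacheFullDecisionSketch {
  enum Action { FREE_SPACE, DROP_ENTRY, RETRY_LATER }

  // Mirrors the decision taken by the writer thread when a write hits a full cache.
  static Action onCacheFull(boolean freeInProgress, boolean isPrefetch) {
    if (!freeInProgress && !isPrefetch) {
      return Action.FREE_SPACE;  // demand write: evict, then retry the same entry
    } else if (isPrefetch) {
      return Action.DROP_ENTRY;  // prefetch write: null the slot and advance the index
    } else {
      return Action.RETRY_LATER; // an eviction run is already in progress: sleep(50) and retry
    }
  }
}

Likewise, the usage check after a successful write now happens per entry and skips prefetch entries, replacing the post-loop check that used to run regardless of where the blocks came from, so only demand traffic can push the cache over its watermark and force evictions.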

/**
@@ -1957,13 +1960,16 @@ static class RAMQueueEntry {
private boolean inMemory;
private boolean isCachePersistent;

private boolean isPrefetch;

RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory,
boolean isCachePersistent) {
boolean isCachePersistent, boolean isPrefetch) {
this.key = bck;
this.data = data;
this.accessCounter = accessCounter;
this.inMemory = inMemory;
this.isCachePersistent = isCachePersistent;
this.isPrefetch = isPrefetch;
}

public Cacheable getData() {
@@ -1974,6 +1980,10 @@ public BlockCacheKey getKey() {
return key;
}

public boolean isPrefetch() {
return isPrefetch;
}

public void access(long accessCounter) {
this.accessCounter = accessCounter;
}
@@ -1987,7 +1997,7 @@ private ByteBuffAllocator getByteBuffAllocator() {

public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
ByteBuffer metaBuff) throws IOException {
ByteBuffer metaBuff, final Long acceptableSize) throws IOException {
int len = data.getSerializedLength();
// This cacheable thing can't be serialized
if (len == 0) {
@@ -1998,6 +2008,14 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a
// recovery
}
long offset = alloc.allocateBlock(len);
// In the case of prefetch, we want to avoid freeSpace runs when the cache is full.
// This makes the cache allocation more predictable, and is particularly important
// when persistent cache is enabled, as it won't trigger evictions of the recovered blocks,
// which are likely the most accessed and relevant blocks in the cache.
if (isPrefetch() && alloc.getUsedSize() > acceptableSize) {
alloc.freeBlock(offset, len);
return null;
}
boolean succ = false;
BucketEntry bucketEntry = null;
try {
@@ -22,6 +22,8 @@
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -30,6 +32,7 @@
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
@@ -114,11 +117,17 @@ public void testPrefetchPersistence() throws Exception {

// Default interval for cache persistence is 1000ms. So after 1000ms, both the persistence files
// should exist.

HRegionServer regionServingRS = cluster.getRegionServer(0);

Admin admin = TEST_UTIL.getAdmin();
List<String> cachedFilesList = admin.getCachedFilesList(regionServingRS.getServerName());
List<String> cachedFilesList = new ArrayList<>();
Waiter.waitFor(conf, 5000, () -> {
try {
cachedFilesList.addAll(admin.getCachedFilesList(regionServingRS.getServerName()));
} catch (IOException e) {
// let the test try again
}
return cachedFilesList.size() > 0;
});
assertEquals(1, cachedFilesList.size());
for (HStoreFile h : regionServingRS.getRegions().get(0).getStores().get(0).getStorefiles()) {
assertTrue(cachedFilesList.contains(h.getPath().getName()));
@@ -748,8 +748,8 @@ public void testRAMCache() {
HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false);
RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false);
RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false, false);
RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false, false);

assertFalse(cache.containsKey(key1));
assertNull(cache.putIfAbsent(key1, re1));
@@ -796,12 +796,12 @@ public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
BucketAllocator allocator = new BucketAllocator(availableSpace, null);

BlockCacheKey key = new BlockCacheKey("dummy", 1L);
RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false);
RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false, false);

Assert.assertEquals(0, allocator.getUsedSize());
try {
re.writeToCache(ioEngine, allocator, null, null,
ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE), Long.MAX_VALUE);
Assert.fail();
} catch (Exception e) {
}
@@ -138,7 +138,7 @@ public void testIOE() throws IOException, InterruptedException {
RAMQueueEntry rqe = q.remove();
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).writeToCache(Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
assertTrue(bc.blocksByHFile.isEmpty());
@@ -158,7 +158,7 @@ public void testCacheFullException() throws IOException, InterruptedException {
final CacheFullException cfe = new CacheFullException(0, 0);
BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
Mockito.doThrow(cfe).doReturn(mockedBucketEntry).when(spiedRqe).writeToCache(Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
}
@@ -15,12 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -30,6 +32,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
@@ -48,8 +51,19 @@
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
@@ -202,40 +216,48 @@ public void testPrefetchInterruptOnCapacity() throws Exception {
conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
conf.setDouble("hbase.bucketcache.minfactor", 0.95);
conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
conf.setDouble("hbase.bucketcache.minfactor", 0.98);
conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
blockCache = BlockCacheFactory.createBlockCache(conf);
cacheConf = new CacheConfig(conf, blockCache);
Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
// Prefetches the file blocks
LOG.debug("First read should prefetch the blocks.");
createReaderAndWaitForPrefetchInterruption(storeFile);
Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
() -> PrefetchExecutor.isCompleted(storeFile));
BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
long evictionsFirstPrefetch = bc.getStats().getEvictionCount();
LOG.debug("evictions after first prefetch: {}", bc.getStats().getEvictionCount());
long evictedFirstPrefetch = bc.getStats().getEvictedCount();
HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile);
LOG.debug("evictions after second prefetch: {}", bc.getStats().getEvictionCount());
assertTrue((bc.getStats().getEvictionCount() - evictionsFirstPrefetch) < 10);
assertEquals(evictedFirstPrefetch, bc.getStats().getEvictedCount());
HFileScanner scanner = reader.getScanner(conf, true, true);
scanner.seekTo();
while (scanner.next()) {
// do a full scan to force some evictions
LOG.trace("Iterating the full scan to evict some blocks");
}
scanner.close();
LOG.debug("evictions after scanner: {}", bc.getStats().getEvictionCount());
Waiter.waitFor(conf, 5000, () -> {
for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
if (!queue.isEmpty()) {
return false;
}
}
return true;
});
// The scanner should have triggered at least 3x the evictions of the prefetch,
// as we try to cache each block without interruption.
assertTrue(bc.getStats().getEvictionCount() > evictionsFirstPrefetch);
assertTrue(bc.getStats().getEvictedCount() > evictedFirstPrefetch);
}
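What the reworked test asserts: repeating the prefetch against an already-full cache adds no evictions on top of the first pass, while the full scan that follows still drives the cache past its watermark and evicts — demand reads keep paying the eviction cost, prefetch no longer does.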

@Test
public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception {
conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
conf.setDouble("hbase.bucketcache.minfactor", 0.95);
conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
conf.setDouble("hbase.bucketcache.minfactor", 0.98);
conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
blockCache = BlockCacheFactory.createBlockCache(conf);
ColumnFamilyDescriptor family =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build();
@@ -245,7 +267,73 @@ public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception {
LOG.debug("First read should prefetch the blocks.");
createReaderAndWaitForPrefetchInterruption(storeFile);
BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
assertTrue(bc.getStats().getEvictedCount() > 200);
Waiter.waitFor(conf, 1000, () -> PrefetchExecutor.isCompleted(storeFile));
long evictions = bc.getStats().getEvictedCount();
LOG.debug("Total evicted at this point: {}", evictions);
// Creates another reader; now that the cache is full, no block would fit and prefetch should
// not trigger any new evictions
createReaderAndWaitForPrefetchInterruption(storeFile);
assertEquals(evictions, bc.getStats().getEvictedCount());
}

@Test
public void testPrefetchRunNoEvictions() throws Exception {
conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
conf.setDouble("hbase.bucketcache.minfactor", 0.98);
conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
blockCache = BlockCacheFactory.createBlockCache(conf);
cacheConf = new CacheConfig(conf, blockCache);
Path storeFile = writeStoreFile("testPrefetchRunNoEvictions", 10000);
// Prefetches the file blocks
createReaderAndWaitForPrefetchInterruption(storeFile);
Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
() -> PrefetchExecutor.isCompleted(storeFile));
BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
// Wait until all cache writer queues are empty
Waiter.waitFor(conf, 5000, () -> {
for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
if (!queue.isEmpty()) {
return false;
}
}
return true;
});
// With a non-zero wait time configured, prefetch should trigger no evictions once it reaches
// cache capacity
assertEquals(0, bc.getStats().getEvictedCount());
}

@Test
public void testPrefetchRunTriggersEvictions() throws Exception {
conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
conf.setDouble("hbase.bucketcache.minfactor", 0.98);
conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
conf.setLong(QUEUE_ADDITION_WAIT_TIME, 0);
blockCache = BlockCacheFactory.createBlockCache(conf);
cacheConf = new CacheConfig(conf, blockCache);
Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
// Prefetches the file blocks
createReaderAndWaitForPrefetchInterruption(storeFile);
Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
() -> PrefetchExecutor.isCompleted(storeFile));
BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
// Wait until all cache writer queues are empty
Waiter.waitFor(conf, 5000, () -> {
for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
if (!queue.isEmpty()) {
return false;
}
}
return true;
});
// With no wait time configured, prefetch entries are not flagged as such, so filling the
// cache to capacity should trigger evictions
assertNotEquals(0, bc.getStats().getEvictedCount());
}
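Taken together, the last two tests suggest the new behaviour is gated on hbase.bucketcache.queue.addition.waittime: with a non-zero wait time the prefetch writes are flagged and stay within the watermark without evicting, while a wait time of zero keeps the old evicting behaviour. A hedged configuration sketch — the keys are the ones exercised in this diff, the values are only illustrative:

import org.apache.hadoop.conf.Configuration;

public final class PrefetchFriendlyCacheConfig {
  // Apply the settings the tests above use to keep prefetch from evicting resident blocks.
  public static Configuration apply(Configuration conf) {
    conf.setLong("hbase.bucketcache.queue.addition.waittime", 100); // > 0 marks prefetch writes
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);         // admission watermark
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    return conf;
  }
}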

@Test
@@ -90,7 +90,7 @@ public void testAtomicRAMCache() throws Exception {
MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, false);
RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, false, false);

Assert.assertNull(cache.putIfAbsent(key, re));
Assert.assertEquals(cache.putIfAbsent(key, re), re);
Expand Down