@@ -15,12 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.io.hfile;
+package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
 import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
 import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -30,6 +32,7 @@
 import java.io.IOException;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
@@ -48,8 +51,19 @@
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
-import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
-import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
+import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
@@ -202,40 +216,48 @@ public void testPrefetchInterruptOnCapacity() throws Exception {
     conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
     conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
     conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
-    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
-    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
+    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
+    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
+    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
     blockCache = BlockCacheFactory.createBlockCache(conf);
     cacheConf = new CacheConfig(conf, blockCache);
     Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
     // Prefetches the file blocks
     LOG.debug("First read should prefetch the blocks.");
     createReaderAndWaitForPrefetchInterruption(storeFile);
+    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
+      () -> PrefetchExecutor.isCompleted(storeFile));
     BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
-    long evictionsFirstPrefetch = bc.getStats().getEvictionCount();
-    LOG.debug("evictions after first prefetch: {}", bc.getStats().getEvictionCount());
+    long evictedFirstPrefetch = bc.getStats().getEvictedCount();
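+    // Opening a second reader prefetches again, but the cache is already at capacity, so the
+    // interrupted prefetch should not evict anything further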
     HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile);
-    LOG.debug("evictions after second prefetch: {}", bc.getStats().getEvictionCount());
-    assertTrue((bc.getStats().getEvictionCount() - evictionsFirstPrefetch) < 10);
+    assertEquals(evictedFirstPrefetch, bc.getStats().getEvictedCount());
     HFileScanner scanner = reader.getScanner(conf, true, true);
     scanner.seekTo();
     while (scanner.next()) {
       // do a full scan to force some evictions
       LOG.trace("Iterating the full scan to evict some blocks");
     }
     scanner.close();
-    LOG.debug("evictions after scanner: {}", bc.getStats().getEvictionCount());
+    Waiter.waitFor(conf, 5000, () -> {
+      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
+        if (!queue.isEmpty()) {
+          return false;
+        }
+      }
+      return true;
+    });
     // The scanner should have triggered at least 3x evictions from the prefetch,
     // as we try to cache each block without interruption.
-    assertTrue(bc.getStats().getEvictionCount() > evictionsFirstPrefetch);
+    assertTrue(bc.getStats().getEvictedCount() > evictedFirstPrefetch);
   }
 
   @Test
   public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception {
     conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
     conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
     conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
-    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
-    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
+    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
+    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
     blockCache = BlockCacheFactory.createBlockCache(conf);
     ColumnFamilyDescriptor family =
       ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build();
@@ -245,7 +267,73 @@ public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception {
     LOG.debug("First read should prefetch the blocks.");
     createReaderAndWaitForPrefetchInterruption(storeFile);
     BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
-    assertTrue(bc.getStats().getEvictedCount() > 200);
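+    // Wait for the prefetch to complete before sampling the eviction count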
+    Waiter.waitFor(conf, 1000, () -> PrefetchExecutor.isCompleted(storeFile));
+    long evictions = bc.getStats().getEvictedCount();
+    LOG.debug("Total evicted at this point: {}", evictions);
+    // Creates another reader: now that the cache is full, no block would fit and prefetch
+    // should not trigger any new evictions
+    createReaderAndWaitForPrefetchInterruption(storeFile);
+    assertEquals(evictions, bc.getStats().getEvictedCount());
+  }
+
+  @Test
+  public void testPrefetchRunNoEvictions() throws Exception {
+    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
+    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
+    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
+    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
+    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
+    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
+    blockCache = BlockCacheFactory.createBlockCache(conf);
+    cacheConf = new CacheConfig(conf, blockCache);
+    Path storeFile = writeStoreFile("testPrefetchRunNoEvictions", 10000);
+    // Prefetches the file blocks
+    createReaderAndWaitForPrefetchInterruption(storeFile);
+    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
+      () -> PrefetchExecutor.isCompleted(storeFile));
+    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
+    // Wait until all cache writer queues are empty
+    Waiter.waitFor(conf, 5000, () -> {
+      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
+        if (!queue.isEmpty()) {
+          return false;
+        }
+      }
+      return true;
+    });
+    // With the wait time configuration, prefetch should trigger no evictions once it reaches
+    // cache capacity
+    assertEquals(0, bc.getStats().getEvictedCount());
+  }
+
+  @Test
+  public void testPrefetchRunTriggersEvictions() throws Exception {
+    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
+    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
+    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
+    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
+    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
+    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 0);
+    blockCache = BlockCacheFactory.createBlockCache(conf);
+    cacheConf = new CacheConfig(conf, blockCache);
+    Path storeFile = writeStoreFile("testPrefetchRunTriggersEvictions", 10000);
+    // Prefetches the file blocks
+    createReaderAndWaitForPrefetchInterruption(storeFile);
+    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
+      () -> PrefetchExecutor.isCompleted(storeFile));
+    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
+    // Wait until all cache writer queues are empty
+    Waiter.waitFor(conf, 5000, () -> {
+      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
+        if (!queue.isEmpty()) {
+          return false;
+        }
+      }
+      return true;
+    });
+    // Without the wait time configuration, prefetch should trigger evictions once it reaches
+    // cache capacity
+    assertNotEquals(0, bc.getStats().getEvictedCount());
   }
 
   @Test