
Commit 47d2aa5

HBASE-29131 Introduce the option for post-compaction validation of HFiles (#6700)
Introduces the option for an HStore to fully read the file it just wrote after a flush or compaction. To enable this feature, set `hbase.hstore.validate.read_fully=true`. This is an HStore configuration feature, so it can be enabled in hbase-site.xml, in the TableDescriptor, or in the ColumnFamilyDescriptor.

Signed-off-by: Peter Somogyi <[email protected]>
1 parent 53e3aa9 commit 47d2aa5
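Per the commit message, the validation is opt-in via `hbase.hstore.validate.read_fully`. A cluster-wide way to enable it would be an entry in hbase-site.xml (the property name comes from the commit; the snippet itself is illustrative):

```xml
<property>
  <name>hbase.hstore.validate.read_fully</name>
  <value>true</value>
</property>
```

The same key can also be set per-table or per-column-family through the TableDescriptor or ColumnFamilyDescriptor configuration, as the commit message notes.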

File tree (5 files changed: +228, −60 lines)


hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (+84, −21)
@@ -2332,37 +2332,100 @@ private boolean shouldForbidMajorCompaction() {
   }

   /**
+   * <p>
    * We are trying to remove / relax the region read lock for compaction. Let's see what are the
    * potential race conditions among the operations (user scan, region split, region close and
-   * region bulk load). user scan ---> region read lock region split --> region close first -->
-   * region write lock region close --> region write lock region bulk load --> region write lock
+   * region bulk load).
+   * </p>
+   *
+   * <pre>
+   * user scan ---> region read lock
+   * region split --> region close first --> region write lock
+   * region close --> region write lock
+   * region bulk load --> region write lock
+   * </pre>
+   * <p>
    * read lock is compatible with read lock. ---> no problem with user scan/read region bulk load
    * does not cause problem for compaction (no consistency problem, store lock will help the store
-   * file accounting). They can run almost concurrently at the region level. The only remaining race
-   * condition is between the region close and compaction. So we will evaluate, below, how region
-   * close intervenes with compaction if compaction does not acquire region read lock. Here are the
-   * steps for compaction: 1. obtain list of StoreFile's 2. create StoreFileScanner's based on list
-   * from #1 3. perform compaction and save resulting files under tmp dir 4. swap in compacted files
+   * file accounting). They can run almost concurrently at the region level.
+   * </p>
+   * <p>
+   * The only remaining race condition is between the region close and compaction. So we will
+   * evaluate, below, how region close intervenes with compaction if compaction does not acquire
+   * region read lock.
+   * </p>
+   * <p>
+   * Here are the steps for compaction:
+   * <ol>
+   * <li>obtain list of StoreFile's</li>
+   * <li>create StoreFileScanner's based on list from #1</li>
+   * <li>perform compaction and save resulting files under tmp dir</li>
+   * <li>swap in compacted files</li>
+   * </ol>
+   * </p>
+   * <p>
    * #1 is guarded by store lock. This patch does not change this --> no worse or better For #2, we
    * obtain smallest read point (for region) across all the Scanners (for both default compactor and
    * stripe compactor). The read points are for user scans. Region keeps the read points for all
    * currently open user scanners. Compaction needs to know the smallest read point so that during
    * re-write of the hfiles, it can remove the mvcc points for the cells if their mvccs are older
    * than the smallest since they are not needed anymore. This will not conflict with compaction.
-   * For #3, it can be performed in parallel to other operations. For #4 bulk load and compaction
-   * don't conflict with each other on the region level (for multi-family atomicy). Region close and
-   * compaction are guarded pretty well by the 'writestate'. In HRegion#doClose(), we have :
-   * synchronized (writestate) { // Disable compacting and flushing by background threads for this
-   * // region. canFlush = !writestate.readOnly; writestate.writesEnabled = false;
-   * LOG.debug("Closing " + this + ": disabling compactions & flushes");
-   * waitForFlushesAndCompactions(); } waitForFlushesAndCompactions() would wait for
-   * writestate.compacting to come down to 0. and in HRegion.compact() try { synchronized
-   * (writestate) { if (writestate.writesEnabled) { wasStateSet = true; ++writestate.compacting; }
-   * else { String msg = "NOT compacting region " + this + ". Writes disabled."; LOG.info(msg);
-   * status.abort(msg); return false; } } Also in compactor.performCompaction(): check periodically
-   * to see if a system stop is requested if (closeChecker != null &&
-   * closeChecker.isTimeLimit(store, now)) { progress.cancel(); return false; } if (closeChecker !=
-   * null && closeChecker.isSizeLimit(store, len)) { progress.cancel(); return false; }
+   * </p>
+   * <p>
+   * For #3, it can be performed in parallel to other operations.
+   * </p>
+   * <p>
+   * For #4 bulk load and compaction don't conflict with each other on the region level (for
+   * multi-family atomicy).
+   * </p>
+   * <p>
+   * Region close and compaction are guarded pretty well by the 'writestate'. In HRegion#doClose(),
+   * we have :
+   *
+   * <pre>
+   * synchronized (writestate) {
+   *   // Disable compacting and flushing by background threads for this
+   *   // region.
+   *   canFlush = !writestate.readOnly;
+   *   writestate.writesEnabled = false;
+   *   LOG.debug("Closing " + this + ": disabling compactions & flushes");
+   *   waitForFlushesAndCompactions();
+   * }
+   * </pre>
+   *
+   * {@code waitForFlushesAndCompactions()} would wait for {@code writestate.compacting} to come
+   * down to 0. and in {@code HRegion.compact()}
+   *
+   * <pre>
+   * try {
+   *   synchronized (writestate) {
+   *     if (writestate.writesEnabled) {
+   *       wasStateSet = true;
+   *       ++writestate.compacting;
+   *     } else {
+   *       String msg = "NOT compacting region " + this + ". Writes disabled.";
+   *       LOG.info(msg);
+   *       status.abort(msg);
+   *       return false;
+   *     }
+   *   }
+   * }
+   * </pre>
+   *
+   * Also in {@code compactor.performCompaction()}: check periodically to see if a system stop is
+   * requested
+   *
+   * <pre>
+   * if (closeChecker != null && closeChecker.isTimeLimit(store, now)) {
+   *   progress.cancel();
+   *   return false;
+   * }
+   * if (closeChecker != null && closeChecker.isSizeLimit(store, len)) {
+   *   progress.cancel();
+   *   return false;
+   * }
+   * </pre>
+   * </p>
    */
   public boolean compact(CompactionContext compaction, HStore store,
     ThroughputController throughputController, User user) throws IOException {
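The close/compaction handshake that this Javadoc documents can be sketched in isolation. `WriteState` and `RegionGuard` below are illustrative stand-ins, not HBase classes; only the synchronization pattern (disable writes, then wait for the compacting counter to drain) mirrors the documented one:

```java
// Minimal sketch of the 'writestate' guard described above; names are hypothetical.
final class WriteState {
  boolean writesEnabled = true;
  int compacting = 0;
}

final class RegionGuard {
  private final WriteState writestate = new WriteState();

  // Mirrors HRegion.compact(): only start if writes are still enabled.
  boolean tryBeginCompaction() {
    synchronized (writestate) {
      if (!writestate.writesEnabled) {
        return false; // region is closing; do not compact
      }
      ++writestate.compacting;
      return true;
    }
  }

  void endCompaction() {
    synchronized (writestate) {
      if (--writestate.compacting == 0) {
        writestate.notifyAll();
      }
    }
  }

  // Mirrors HRegion.doClose(): disable writes, then wait for compactions to finish.
  void close() {
    synchronized (writestate) {
      writestate.writesEnabled = false;
      boolean interrupted = false;
      while (writestate.compacting > 0) {
        try {
          writestate.wait();
        } catch (InterruptedException e) {
          interrupted = true;
        }
      }
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
```

Once `close()` has flipped `writesEnabled`, any later `tryBeginCompaction()` refuses, which is exactly the race resolution the Javadoc argues for.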

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (+26, −18)
@@ -835,7 +835,7 @@ protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot sna
     try {
       for (Path pathName : pathNames) {
         lastPathName = pathName;
-        storeEngine.validateStoreFile(pathName);
+        storeEngine.validateStoreFile(pathName, false);
       }
       return pathNames;
     } catch (Exception e) {
@@ -1121,7 +1121,7 @@ public void deleteChangedReaderObserver(ChangedReadersObserver o) {
    * block for long periods.
    * <p>
    * During this time, the Store can work as usual, getting values from StoreFiles and writing new
-   * StoreFiles from the memstore. Existing StoreFiles are not destroyed until the new compacted
+   * StoreFiles from the MemStore. Existing StoreFiles are not destroyed until the new compacted
    * StoreFile is completely written-out to disk.
    * <p>
    * The compactLock prevents multiple simultaneous compactions. The structureLock prevents us from
@@ -1132,21 +1132,29 @@ public void deleteChangedReaderObserver(ChangedReadersObserver o) {
    * <p>
    * Compaction event should be idempotent, since there is no IO Fencing for the region directory in
    * hdfs. A region server might still try to complete the compaction after it lost the region. That
-   * is why the following events are carefully ordered for a compaction: 1. Compaction writes new
-   * files under region/.tmp directory (compaction output) 2. Compaction atomically moves the
-   * temporary file under region directory 3. Compaction appends a WAL edit containing the
-   * compaction input and output files. Forces sync on WAL. 4. Compaction deletes the input files
-   * from the region directory. Failure conditions are handled like this: - If RS fails before 2,
-   * compaction wont complete. Even if RS lives on and finishes the compaction later, it will only
-   * write the new data file to the region directory. Since we already have this data, this will be
-   * idempotent but we will have a redundant copy of the data. - If RS fails between 2 and 3, the
-   * region will have a redundant copy of the data. The RS that failed won't be able to finish
-   * sync() for WAL because of lease recovery in WAL. - If RS fails after 3, the region region
-   * server who opens the region will pick up the the compaction marker from the WAL and replay it
-   * by removing the compaction input files. Failed RS can also attempt to delete those files, but
-   * the operation will be idempotent See HBASE-2231 for details.
+   * is why the following events are carefully ordered for a compaction:
+   * <ol>
+   * <li>Compaction writes new files under region/.tmp directory (compaction output)</li>
+   * <li>Compaction atomically moves the temporary file under region directory</li>
+   * <li>Compaction appends a WAL edit containing the compaction input and output files. Forces sync
+   * on WAL.</li>
+   * <li>Compaction deletes the input files from the region directory.</li>
+   * </ol>
+   * Failure conditions are handled like this:
+   * <ul>
+   * <li>If RS fails before 2, compaction won't complete. Even if RS lives on and finishes the
+   * compaction later, it will only write the new data file to the region directory. Since we
+   * already have this data, this will be idempotent, but we will have a redundant copy of the
+   * data.</li>
+   * <li>If RS fails between 2 and 3, the region will have a redundant copy of the data. The RS that
+   * failed won't be able to finish sync() for WAL because of lease recovery in WAL.</li>
+   * <li>If RS fails after 3, the region server who opens the region will pick up the compaction
+   * marker from the WAL and replay it by removing the compaction input files. Failed RS can also
+   * attempt to delete those files, but the operation will be idempotent</li>
+   * </ul>
+   * See HBASE-2231 for details.
    * @param compaction compaction details obtained from requestCompaction()
-   * @return Storefile we compacted into or null if we failed or opted out early.
+   * @return The storefiles that we compacted into or null if we failed or opted out early.
    */
   public List<HStoreFile> compact(CompactionContext compaction,
     ThroughputController throughputController, User user) throws IOException {
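The idempotency argument in the Javadoc above can be illustrated with a toy model of the marker-replay step. `replayMarker` and the `Set<String>` file model are hypothetical simplifications, not HBase types:

```java
import java.util.HashSet;
import java.util.Set;

final class CompactionReplay {
  // Replaying the WAL compaction marker removes the compaction input files
  // from the region directory. Inputs that were already deleted are simply
  // absent from the set, so replaying the marker again (e.g. by both the
  // failed RS and the RS that reopens the region) leaves the same state.
  static Set<String> replayMarker(Set<String> regionFiles, Set<String> inputs) {
    Set<String> after = new HashSet<>(regionFiles);
    after.removeAll(inputs); // no-op for inputs already gone
    return after;
  }
}
```

Running the replay twice produces the same set of surviving files, which is the property the careful 1-4 ordering is designed to preserve.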
@@ -1189,7 +1197,7 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
     throws IOException {
     // Do the steps necessary to complete the compaction.
     setStoragePolicyFromFileName(newFiles);
-    List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true);
+    List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true, true);
     if (this.getCoprocessorHost() != null) {
       for (HStoreFile sf : sfs) {
         getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
@@ -1983,7 +1991,7 @@ public boolean commit(MonitoredTask status) throws IOException {
         return false;
       }
       status.setStatus("Flushing " + this + ": reopening flushed file");
-      List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
+      List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false, false);
       for (HStoreFile sf : storeFiles) {
         StoreFileReader r = sf.getReader();
         if (LOG.isInfoEnabled()) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java (+45, −19)
@@ -37,6 +37,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.hfile.BloomFilterMetrics;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -96,6 +98,9 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction

   private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);

+  private static final String READ_FULLY_ON_VALIDATE_KEY = "hbase.hstore.validate.read_fully";
+  private static final boolean DEFAULT_READ_FULLY_ON_VALIDATE = false;
+
   protected SF storeFlusher;
   protected CP compactionPolicy;
   protected C compactor;
@@ -163,7 +168,7 @@ public StoreFileManager getStoreFileManager() {
   }

   /** Returns Store flusher to use. */
-  public StoreFlusher getStoreFlusher() {
+  StoreFlusher getStoreFlusher() {
     return this.storeFlusher;
   }

@@ -202,7 +207,7 @@ protected final void createComponentsOnce(Configuration conf, HStore store,
     this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
     this.storeFileTracker = createStoreFileTracker(conf, store);
     assert compactor != null && compactionPolicy != null && storeFileManager != null
-      && storeFlusher != null && storeFileTracker != null;
+      && storeFlusher != null;
   }

   /**
@@ -229,12 +234,34 @@ public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOExceptio
   /**
    * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
    * operation.
-   * @param path the path to the store file
+   * @param path         the path to the store file
+   * @param isCompaction whether this is called from the context of a compaction
    */
-  public void validateStoreFile(Path path) throws IOException {
+  public void validateStoreFile(Path path, boolean isCompaction) throws IOException {
     HStoreFile storeFile = null;
     try {
       storeFile = createStoreFileAndReader(path);
+      if (conf.getBoolean(READ_FULLY_ON_VALIDATE_KEY, DEFAULT_READ_FULLY_ON_VALIDATE)) {
+        if (storeFile.getFirstKey().isEmpty()) {
+          LOG.debug("'{}=true' but storefile does not contain any data. skipping validation.",
+            READ_FULLY_ON_VALIDATE_KEY);
+          return;
+        }
+        LOG.debug("Validating the store file by reading the first cell from each block : {}", path);
+        StoreFileReader reader = storeFile.getReader();
+        try (StoreFileScanner scanner =
+          reader.getStoreFileScanner(false, false, isCompaction, Long.MAX_VALUE, 0, false)) {
+          boolean hasNext = scanner.seek(KeyValue.LOWESTKEY);
+          assert hasNext : "StoreFile contains no data";
+          for (ExtendedCell cell = scanner.next(); cell != null; cell = scanner.next()) {
+            ExtendedCell nextIndexedKey = scanner.getNextIndexedKey();
+            if (nextIndexedKey == null) {
+              break;
+            }
+            scanner.seek(nextIndexedKey);
+          }
+        }
+      }
     } catch (IOException e) {
       LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e);
       throw e;
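The loop added to `validateStoreFile` walks the block index rather than every cell: it reads a cell, asks for the next indexed key (the first key of the next block), and seeks there, so each block is decoded roughly once. A toy model of that traversal, with plain lists standing in for blocks (none of these names are HBase API):

```java
import java.util.ArrayList;
import java.util.List;

final class ReadFullyWalk {
  // Visit the first key of each "block", mirroring the seek-to-next-indexed-key
  // loop above: one decode per block instead of one per cell.
  static List<String> firstKeys(List<List<String>> blocks) {
    List<String> visited = new ArrayList<>();
    for (int i = 0; i < blocks.size(); i++) {
      List<String> block = blocks.get(i);
      if (block.isEmpty()) {
        // A real reader would surface an IOException here for a corrupt block.
        throw new IllegalStateException("unreadable block " + i);
      }
      visited.add(block.get(0));
    }
    return visited;
  }
}
```

This is why the commit message can describe the feature as "fully read the file" while keeping the cost closer to the number of blocks than the number of cells.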
@@ -294,8 +321,7 @@ private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean
       }
       if (ioe != null) {
         // close StoreFile readers
-        boolean evictOnClose =
-          ctx.getCacheConf() != null ? ctx.getCacheConf().shouldEvictOnClose() : true;
+        boolean evictOnClose = ctx.getCacheConf() == null || ctx.getCacheConf().shouldEvictOnClose();
         for (HStoreFile file : results) {
           try {
             if (file != null) {
@@ -315,10 +341,8 @@ private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean
     for (HStoreFile storeFile : results) {
       if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
         LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
-        storeFile.getReader()
-          .close(storeFile.getCacheConf() != null
-            ? storeFile.getCacheConf().shouldEvictOnClose()
-            : true);
+        storeFile.getReader().close(
+          storeFile.getCacheConf() == null || storeFile.getCacheConf().shouldEvictOnClose());
         filesToRemove.add(storeFile);
       }
     }
@@ -380,7 +404,7 @@ private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throw
       compactedFilesSet.put(sf.getFileInfo(), sf);
     }

-    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
+    Set<StoreFileInfo> newFilesSet = new HashSet<>(newFiles);
     // Exclude the files that have already been compacted
     newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
     Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
@@ -390,8 +414,8 @@ private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throw
       return;
     }

-    LOG.info("Refreshing store files for " + this + " files to add: " + toBeAddedFiles
-      + " files to remove: " + toBeRemovedFiles);
+    LOG.info("Refreshing store files for {} files to add: {} files to remove: {}", this,
+      toBeAddedFiles, toBeRemovedFiles);

     Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
     for (StoreFileInfo sfi : toBeRemovedFiles) {
@@ -401,7 +425,7 @@ private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throw
     // try to open the files
     List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);

-    // propogate the file changes to the underlying store file manager
+    // propagate the file changes to the underlying store file manager
     replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
     }, () -> {
     }); // won't throw an exception
@@ -411,25 +435,27 @@ private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throw
    * Commit the given {@code files}.
    * <p/>
    * We will move the file into data directory, and open it.
-   * @param files the files want to commit
-   * @param validate whether to validate the store files
+   * @param files        the files want to commit
+   * @param isCompaction whether this is called from the context of a compaction
+   * @param validate     whether to validate the store files
    * @return the committed store files
    */
-  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
+  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean isCompaction, boolean validate)
+    throws IOException {
     List<HStoreFile> committedFiles = new ArrayList<>(files.size());
     HRegionFileSystem hfs = ctx.getRegionFileSystem();
     String familyName = ctx.getFamily().getNameAsString();
     Path storeDir = hfs.getStoreDir(familyName);
     for (Path file : files) {
       try {
         if (validate) {
-          validateStoreFile(file);
+          validateStoreFile(file, isCompaction);
         }
         Path committedPath;
         // As we want to support writing to data directory directly, here we need to check whether
         // the store file is already in the right place
         if (file.getParent() != null && file.getParent().equals(storeDir)) {
-          // already in the right place, skip renmaing
+          // already in the right place, skip renaming
           committedPath = file;
         } else {
           // Write-out finished successfully, move into the right spot