Skip to content

Commit 5c9bcbd

Browse files
vinayakphegdeanmolnar
authored andcommitted
HBASE-29025: Enhance the full backup command to support Continuous Backup (#6710)
* HBASE-29025: Enhance the full backup command to support continuous backup * add new check for full backup command regards to continuous backup flag * minor fixes
1 parent 77eeb71 commit 5c9bcbd

File tree

12 files changed

+713
-69
lines changed

12 files changed

+713
-69
lines changed

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717
*/
1818
package org.apache.hadoop.hbase.backup;
1919

20+
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.LONG_OPTION_ENABLE_CONTINUOUS_BACKUP;
2021
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BACKUP_LIST_DESC;
2122
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH;
2223
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH_DESC;
2324
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
2425
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
26+
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
27+
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP_DESC;
2528
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM;
2629
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM_DESC;
2730
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_KEEP;
@@ -159,7 +162,8 @@ protected void addOptions() {
159162
addOptWithArg(OPTION_PATH, OPTION_PATH_DESC);
160163
addOptWithArg(OPTION_KEEP, OPTION_KEEP_DESC);
161164
addOptWithArg(OPTION_YARN_QUEUE_NAME, OPTION_YARN_QUEUE_NAME_DESC);
162-
165+
addOptNoArg(OPTION_ENABLE_CONTINUOUS_BACKUP, LONG_OPTION_ENABLE_CONTINUOUS_BACKUP,
166+
OPTION_ENABLE_CONTINUOUS_BACKUP_DESC);
163167
}
164168

165169
@Override

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java

+12
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public enum BackupState {
7171
*/
7272
public enum BackupPhase {
7373
REQUEST,
74+
SETUP_WAL_REPLICATION,
7475
SNAPSHOT,
7576
PREPARE_INCREMENTAL,
7677
SNAPSHOTCOPY,
@@ -170,6 +171,8 @@ public enum BackupPhase {
170171
*/
171172
private boolean noChecksumVerify;
172173

174+
private boolean continuousBackupEnabled;
175+
173176
public BackupInfo() {
174177
backupTableInfoMap = new HashMap<>();
175178
}
@@ -185,6 +188,7 @@ public BackupInfo(String backupId, BackupType type, TableName[] tables, String t
185188
}
186189
this.startTs = 0;
187190
this.completeTs = 0;
191+
this.continuousBackupEnabled = false;
188192
}
189193

190194
public int getWorkers() {
@@ -592,4 +596,12 @@ public int compareTo(BackupInfo o) {
592596
Long otherTS = Long.valueOf(o.getBackupId().substring(o.getBackupId().lastIndexOf("_") + 1));
593597
return thisTS.compareTo(otherTS);
594598
}
599+
600+
public void setContinuousBackupEnabled(boolean continuousBackupEnabled) {
601+
this.continuousBackupEnabled = continuousBackupEnabled;
602+
}
603+
604+
public boolean isContinuousBackupEnabled() {
605+
return this.continuousBackupEnabled;
606+
}
595607
}

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRequest.java

+14
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ public Builder withYarnPoolName(String name) {
7575
return this;
7676
}
7777

78+
public Builder withContinuousBackupEnabled(boolean continuousBackupEnabled) {
79+
request.setContinuousBackupEnabled(continuousBackupEnabled);
80+
return this;
81+
}
82+
7883
public BackupRequest build() {
7984
return request;
8085
}
@@ -89,6 +94,7 @@ public BackupRequest build() {
8994
private boolean noChecksumVerify = false;
9095
private String backupSetName;
9196
private String yarnPoolName;
97+
private boolean continuousBackupEnabled;
9298

9399
private BackupRequest() {
94100
}
@@ -163,4 +169,12 @@ public String getYarnPoolName() {
163169
public void setYarnPoolName(String yarnPoolName) {
164170
this.yarnPoolName = yarnPoolName;
165171
}
172+
173+
private void setContinuousBackupEnabled(boolean continuousBackupEnabled) {
174+
this.continuousBackupEnabled = continuousBackupEnabled;
175+
}
176+
177+
public boolean isContinuousBackupEnabled() {
178+
return this.continuousBackupEnabled;
179+
}
166180
}

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java

+12
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ public interface BackupRestoreConstants {
9696
String OPTION_YARN_QUEUE_NAME_DESC = "Yarn queue name to run backup create command on";
9797
String OPTION_YARN_QUEUE_NAME_RESTORE_DESC = "Yarn queue name to run backup restore command on";
9898

99+
String OPTION_ENABLE_CONTINUOUS_BACKUP = "cb";
100+
String LONG_OPTION_ENABLE_CONTINUOUS_BACKUP = "continuous-backup-enabled";
101+
String OPTION_ENABLE_CONTINUOUS_BACKUP_DESC =
102+
"Flag indicating that the full backup is part of a continuous backup process.";
103+
99104
String JOB_NAME_CONF_KEY = "mapreduce.job.name";
100105

101106
String BACKUP_CONFIG_STRING =
@@ -122,6 +127,13 @@ public interface BackupRestoreConstants {
122127

123128
String BACKUPID_PREFIX = "backup_";
124129

130+
String CONTINUOUS_BACKUP_REPLICATION_PEER = "continuous_backup_replication_peer";
131+
132+
String DEFAULT_CONTINUOUS_BACKUP_REPLICATION_ENDPOINT =
133+
"org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint";
134+
135+
String CONF_CONTINUOUS_BACKUP_WAL_DIR = "hbase.backup.continuous.wal.dir";
136+
125137
enum BackupCommand {
126138
CREATE,
127139
CANCEL,

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,8 @@ public String backupTables(BackupRequest request) throws IOException {
581581
request = builder.withBackupType(request.getBackupType()).withTableList(tableList)
582582
.withTargetRootDir(request.getTargetRootDir()).withBackupSetName(request.getBackupSetName())
583583
.withTotalTasks(request.getTotalTasks()).withBandwidthPerTasks((int) request.getBandwidth())
584-
.withNoChecksumVerify(request.getNoChecksumVerify()).build();
584+
.withNoChecksumVerify(request.getNoChecksumVerify())
585+
.withContinuousBackupEnabled(request.isContinuousBackupEnabled()).build();
585586

586587
TableBackupClient client;
587588
try {

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java

+59-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH_DESC;
2323
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
2424
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
25+
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
26+
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP_DESC;
2527
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM;
2628
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM_DESC;
2729
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_KEEP;
@@ -45,6 +47,7 @@
4547
import java.io.IOException;
4648
import java.net.URI;
4749
import java.util.List;
50+
import java.util.Set;
4851
import org.apache.commons.lang3.StringUtils;
4952
import org.apache.hadoop.conf.Configuration;
5053
import org.apache.hadoop.conf.Configured;
@@ -339,14 +342,64 @@ public void execute() throws IOException {
339342

340343
boolean ignoreChecksum = cmdline.hasOption(OPTION_IGNORECHECKSUM);
341344

345+
BackupType backupType = BackupType.valueOf(args[1].toUpperCase());
346+
List<TableName> tableNameList = null;
347+
if (tables != null) {
348+
tableNameList = Lists.newArrayList(BackupUtils.parseTableNames(tables));
349+
}
350+
boolean continuousBackup = cmdline.hasOption(OPTION_ENABLE_CONTINUOUS_BACKUP);
351+
if (continuousBackup && !BackupType.FULL.equals(backupType)) {
352+
System.out.println("ERROR: Continuous backup can Only be specified for Full Backup");
353+
printUsage();
354+
throw new IOException(INCORRECT_USAGE);
355+
}
356+
357+
/*
358+
* The `continuousBackup` flag is specified only during the first full backup to initiate
359+
* continuous WAL replication. After that, it is redundant because the tables are already set
360+
* up for continuous backup. If the `continuousBackup` flag is not explicitly enabled, we need
361+
* to determine the backup mode based on the current state of the specified tables: - If all
362+
* the specified tables are already part of continuous backup, we treat the request as a
363+
* continuous backup request and proceed accordingly (since these tables are already
364+
* continuously backed up, no additional setup is needed). - If none of the specified tables
365+
* are part of continuous backup, we treat the request as a normal full backup without
366+
* continuous backup. - If the request includes a mix of tables—some with continuous backup
367+
* enabled and others without—we cannot determine a clear backup strategy. In this case, we
368+
* throw an error. If all tables are already in continuous backup mode, we explicitly set the
369+
* `continuousBackup` flag to `true` so that the request is processed using the continuous
370+
* backup approach rather than the normal full backup flow.
371+
*/
372+
if (!continuousBackup && tableNameList != null && !tableNameList.isEmpty()) {
373+
try (BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) {
374+
Set<TableName> continuousBackupTableSet =
375+
backupSystemTable.getContinuousBackupTableSet().keySet();
376+
377+
boolean allTablesInContinuousBackup = continuousBackupTableSet.containsAll(tableNameList);
378+
boolean noTablesInContinuousBackup =
379+
tableNameList.stream().noneMatch(continuousBackupTableSet::contains);
380+
381+
// Ensure that all tables are either fully in continuous backup or not at all
382+
if (!allTablesInContinuousBackup && !noTablesInContinuousBackup) {
383+
System.err
384+
.println("ERROR: Some tables are already in continuous backup, while others are not. "
385+
+ "Cannot mix both in a single request.");
386+
printUsage();
387+
throw new IOException(INCORRECT_USAGE);
388+
}
389+
390+
// If all tables are already in continuous backup, enable the flag
391+
if (allTablesInContinuousBackup) {
392+
continuousBackup = true;
393+
}
394+
}
395+
}
396+
342397
try (BackupAdminImpl admin = new BackupAdminImpl(conn)) {
343398
BackupRequest.Builder builder = new BackupRequest.Builder();
344-
BackupRequest request = builder.withBackupType(BackupType.valueOf(args[1].toUpperCase()))
345-
.withTableList(
346-
tables != null ? Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null)
399+
BackupRequest request = builder.withBackupType(backupType).withTableList(tableNameList)
347400
.withTargetRootDir(targetBackupDir).withTotalTasks(workers)
348401
.withBandwidthPerTasks(bandwidth).withNoChecksumVerify(ignoreChecksum)
349-
.withBackupSetName(setName).build();
402+
.withBackupSetName(setName).withContinuousBackupEnabled(continuousBackup).build();
350403
String backupId = admin.backupTables(request);
351404
System.out.println("Backup session " + backupId + " finished. Status: SUCCESS");
352405
} catch (IOException e) {
@@ -400,6 +453,8 @@ protected void printUsage() {
400453
options.addOption(OPTION_YARN_QUEUE_NAME, true, OPTION_YARN_QUEUE_NAME_DESC);
401454
options.addOption(OPTION_DEBUG, false, OPTION_DEBUG_DESC);
402455
options.addOption(OPTION_IGNORECHECKSUM, false, OPTION_IGNORECHECKSUM_DESC);
456+
options.addOption(OPTION_ENABLE_CONTINUOUS_BACKUP, false,
457+
OPTION_ENABLE_CONTINUOUS_BACKUP_DESC);
403458

404459
HelpFormatter helpFormatter = new HelpFormatter();
405460
helpFormatter.setLeftPadding(2);

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,8 @@ public void close() {
192192
* @throws BackupException exception
193193
*/
194194
public BackupInfo createBackupInfo(String backupId, BackupType type, List<TableName> tableList,
195-
String targetRootDir, int workers, long bandwidth, boolean noChecksumVerify)
196-
throws BackupException {
195+
String targetRootDir, int workers, long bandwidth, boolean noChecksumVerify,
196+
boolean continuousBackupEnabled) throws BackupException {
197197
if (targetRootDir == null) {
198198
throw new BackupException("Wrong backup request parameter: target backup root directory");
199199
}
@@ -231,6 +231,7 @@ public BackupInfo createBackupInfo(String backupId, BackupType type, List<TableN
231231
backupInfo.setBandwidth(bandwidth);
232232
backupInfo.setWorkers(workers);
233233
backupInfo.setNoChecksumVerify(noChecksumVerify);
234+
backupInfo.setContinuousBackupEnabled(continuousBackupEnabled);
234235
return backupInfo;
235236
}
236237

@@ -420,4 +421,17 @@ public void addIncrementalBackupTableSet(Set<TableName> tables) throws IOExcepti
420421
public Connection getConnection() {
421422
return conn;
422423
}
424+
425+
/**
426+
* Adds a set of tables to the global continuous backup set. Only tables that do not already have
427+
* continuous backup enabled will be updated.
428+
* @param tables set of tables to add to continuous backup
429+
* @param startTimestamp timestamp indicating when continuous backup started for newly added
430+
* tables
431+
* @throws IOException if an error occurs while updating the backup system table
432+
*/
433+
public void addContinuousBackupTableSet(Set<TableName> tables, long startTimestamp)
434+
throws IOException {
435+
systemTable.addContinuousBackupTableSet(tables, startTimestamp);
436+
}
423437
}

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java

+94
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ public String toString() {
167167
private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no");
168168

169169
private final static String INCR_BACKUP_SET = "incrbackupset:";
170+
private final static String CONTINUOUS_BACKUP_SET = "continuousbackupset";
170171
private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
171172
private final static String RS_LOG_TS_PREFIX = "rslogts:";
172173

@@ -969,6 +970,37 @@ public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOE
969970
}
970971
}
971972

973+
/**
974+
* Retrieves the current set of tables covered by continuous backup along with the timestamp
975+
* indicating when continuous backup started for each table.
976+
* @return a map where the key is the table name and the value is the timestamp representing the
977+
* start time of continuous backup for that table.
978+
* @throws IOException if an I/O error occurs while accessing the backup system table.
979+
*/
980+
public Map<TableName, Long> getContinuousBackupTableSet() throws IOException {
981+
LOG.trace("Retrieving continuous backup table set from the backup system table.");
982+
Map<TableName, Long> tableMap = new TreeMap<>();
983+
984+
try (Table systemTable = connection.getTable(tableName)) {
985+
Get getOperation = createGetForContinuousBackupTableSet();
986+
Result result = systemTable.get(getOperation);
987+
988+
if (result.isEmpty()) {
989+
return tableMap;
990+
}
991+
992+
// Extract table names and timestamps from the result cells
993+
List<Cell> cells = result.listCells();
994+
for (Cell cell : cells) {
995+
TableName tableName = TableName.valueOf(CellUtil.cloneQualifier(cell));
996+
long timestamp = Bytes.toLong(CellUtil.cloneValue(cell));
997+
tableMap.put(tableName, timestamp);
998+
}
999+
}
1000+
1001+
return tableMap;
1002+
}
1003+
9721004
/**
9731005
* Add tables to global incremental backup set
9741006
* @param tables set of tables
@@ -990,6 +1022,34 @@ public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoo
9901022
}
9911023
}
9921024

1025+
/**
1026+
* Add tables to the global continuous backup set. Only updates tables that are not already in the
1027+
* continuous backup set.
1028+
* @param tables set of tables to add
1029+
* @param startTimestamp timestamp indicating when continuous backup started
1030+
* @throws IOException if an error occurs while updating the backup system table
1031+
*/
1032+
public void addContinuousBackupTableSet(Set<TableName> tables, long startTimestamp)
1033+
throws IOException {
1034+
if (LOG.isTraceEnabled()) {
1035+
LOG.trace("Add continuous backup table set to backup system table. tables ["
1036+
+ StringUtils.join(tables, " ") + "]");
1037+
}
1038+
if (LOG.isDebugEnabled()) {
1039+
tables.forEach(table -> LOG.debug(Objects.toString(table)));
1040+
}
1041+
1042+
// Get existing continuous backup tables
1043+
Map<TableName, Long> existingTables = getContinuousBackupTableSet();
1044+
1045+
try (Table table = connection.getTable(tableName)) {
1046+
Put put = createPutForContinuousBackupTableSet(tables, existingTables, startTimestamp);
1047+
if (!put.isEmpty()) {
1048+
table.put(put);
1049+
}
1050+
}
1051+
}
1052+
9931053
/**
9941054
* Deletes incremental backup set for a backup destination
9951055
* @param backupRoot backup root
@@ -1318,6 +1378,18 @@ private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException
13181378
return get;
13191379
}
13201380

1381+
/**
1382+
* Creates a Get operation to retrieve the continuous backup table set from the backup system
1383+
* table.
1384+
* @return a Get operation for retrieving the table set
1385+
*/
1386+
private Get createGetForContinuousBackupTableSet() throws IOException {
1387+
Get get = new Get(rowkey(CONTINUOUS_BACKUP_SET));
1388+
get.addFamily(BackupSystemTable.META_FAMILY);
1389+
get.readVersions(1);
1390+
return get;
1391+
}
1392+
13211393
/**
13221394
* Creates Put to store incremental backup table set
13231395
* @param tables tables
@@ -1332,6 +1404,28 @@ private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupR
13321404
return put;
13331405
}
13341406

1407+
/**
1408+
* Creates a Put operation to store the continuous backup table set. Only includes tables that are
1409+
* not already in the set.
1410+
* @param tables tables to add
1411+
* @param existingTables tables that already have continuous backup enabled
1412+
* @param startTimestamp timestamp indicating when continuous backup started
1413+
* @return put operation
1414+
*/
1415+
private Put createPutForContinuousBackupTableSet(Set<TableName> tables,
1416+
Map<TableName, Long> existingTables, long startTimestamp) {
1417+
Put put = new Put(rowkey(CONTINUOUS_BACKUP_SET));
1418+
1419+
for (TableName table : tables) {
1420+
if (!existingTables.containsKey(table)) {
1421+
put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()),
1422+
Bytes.toBytes(startTimestamp));
1423+
}
1424+
}
1425+
1426+
return put;
1427+
}
1428+
13351429
/**
13361430
* Creates Delete for incremental backup table set
13371431
* @param backupRoot backup root

0 commit comments

Comments
 (0)