@@ -81,39 +81,64 @@ public void init(Map<String, Object> params) {
81
81
}
82
82
}
83
83
84
- private Map <Address , Long > getServerToNewestBackupTs (List <BackupInfo > backups )
84
+ /**
85
+ * Calculates the timestamp boundary up to which all backup roots have already included the WAL.
86
+ * I.e. WALs with a lower (= older) or equal timestamp are no longer needed for future incremental
87
+ * backups.
88
+ */
89
+ private Map <Address , Long > serverToPreservationBoundaryTs (List <BackupInfo > backups )
85
90
throws IOException {
86
91
if (LOG .isDebugEnabled ()) {
87
92
LOG .debug (
88
- "Cleaning WALs if they are older than the newest backups . "
93
+ "Cleaning WALs if they are older than the WAL cleanup time-boundary . "
89
94
+ "Checking WALs against {} backups: {}" ,
90
95
backups .size (),
91
96
backups .stream ().map (BackupInfo ::getBackupId ).sorted ().collect (Collectors .joining (", " )));
92
97
}
93
- Map <Address , Long > serverAddressToNewestBackupMap = new HashMap <>();
94
-
95
- Map <TableName , Long > tableNameBackupInfoMap = new HashMap <>();
96
- for (BackupInfo backupInfo : backups ) {
97
- for (TableName table : backupInfo .getTables ()) {
98
- tableNameBackupInfoMap .putIfAbsent (table , backupInfo .getStartTs ());
99
- if (tableNameBackupInfoMap .get (table ) <= backupInfo .getStartTs ()) {
100
- tableNameBackupInfoMap .put (table , backupInfo .getStartTs ());
101
- for (Map .Entry <String , Long > entry : backupInfo .getTableSetTimestampMap ().get (table )
102
- .entrySet ()) {
103
- serverAddressToNewestBackupMap .put (Address .fromString (entry .getKey ()),
104
- entry .getValue ());
98
+
99
+ // This map tracks, for every backup root, the most recent created backup (= highest timestamp)
100
+ Map <String , BackupInfo > newestBackupPerRootDir = new HashMap <>();
101
+ for (BackupInfo backup : backups ) {
102
+ BackupInfo existingEntry = newestBackupPerRootDir .get (backup .getBackupRootDir ());
103
+ if (existingEntry == null || existingEntry .getStartTs () < backup .getStartTs ()) {
104
+ newestBackupPerRootDir .put (backup .getBackupRootDir (), backup );
105
+ }
106
+ }
107
+
108
+ if (LOG .isDebugEnabled ()) {
109
+ LOG .debug ("WAL cleanup time-boundary using info from: {}. " ,
110
+ newestBackupPerRootDir .entrySet ().stream ()
111
+ .map (e -> "Backup root " + e .getKey () + ": " + e .getValue ().getBackupId ()).sorted ()
112
+ .collect (Collectors .joining (", " )));
113
+ }
114
+
115
+ // This map tracks, for every RegionServer, the least recent (= oldest / lowest timestamp)
116
+ // inclusion in any backup. In other words, it is the timestamp boundary up to which all backup
117
+ // roots have included the WAL in their backup.
118
+ Map <Address , Long > boundaries = new HashMap <>();
119
+ for (BackupInfo backupInfo : newestBackupPerRootDir .values ()) {
120
+ // Iterate over all tables in the timestamp map, which contains all tables covered in the
121
+ // backup root, not just the tables included in that specific backup (which could be a subset)
122
+ for (TableName table : backupInfo .getTableSetTimestampMap ().keySet ()) {
123
+ for (Map .Entry <String , Long > entry : backupInfo .getTableSetTimestampMap ().get (table )
124
+ .entrySet ()) {
125
+ Address address = Address .fromString (entry .getKey ());
126
+ Long storedTs = boundaries .get (address );
127
+ if (storedTs == null || entry .getValue () < storedTs ) {
128
+ boundaries .put (address , entry .getValue ());
105
129
}
106
130
}
107
131
}
108
132
}
109
133
110
134
if (LOG .isDebugEnabled ()) {
111
- for (Map .Entry <Address , Long > entry : serverAddressToNewestBackupMap .entrySet ()) {
112
- LOG .debug ("Server: {}, Newest Backup: {}" , entry .getKey ().getHostName (), entry .getValue ());
135
+ for (Map .Entry <Address , Long > entry : boundaries .entrySet ()) {
136
+ LOG .debug ("Server: {}, WAL cleanup boundary: {}" , entry .getKey ().getHostName (),
137
+ entry .getValue ());
113
138
}
114
139
}
115
140
116
- return serverAddressToNewestBackupMap ;
141
+ return boundaries ;
117
142
}
118
143
119
144
@ Override
@@ -128,18 +153,19 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
128
153
return files ;
129
154
}
130
155
131
- Map <Address , Long > addressToNewestBackupMap ;
156
+ Map <Address , Long > serverToPreservationBoundaryTs ;
132
157
try {
133
158
try (BackupManager backupManager = new BackupManager (conn , getConf ())) {
134
- addressToNewestBackupMap = getServerToNewestBackupTs (backupManager .getBackupHistory (true ));
159
+ serverToPreservationBoundaryTs =
160
+ serverToPreservationBoundaryTs (backupManager .getBackupHistory (true ));
135
161
}
136
162
} catch (IOException ex ) {
137
163
LOG .error ("Failed to analyse backup history with exception: {}. Retaining all logs" ,
138
164
ex .getMessage (), ex );
139
165
return Collections .emptyList ();
140
166
}
141
167
for (FileStatus file : files ) {
142
- if (canDeleteFile (addressToNewestBackupMap , file .getPath ())) {
168
+ if (canDeleteFile (serverToPreservationBoundaryTs , file .getPath ())) {
143
169
filteredFiles .add (file );
144
170
}
145
171
}
@@ -174,7 +200,7 @@ public boolean isStopped() {
174
200
return this .stopped ;
175
201
}
176
202
177
- protected static boolean canDeleteFile (Map <Address , Long > addressToNewestBackupMap , Path path ) {
203
+ protected static boolean canDeleteFile (Map <Address , Long > addressToBoundaryTs , Path path ) {
178
204
if (isHMasterWAL (path )) {
179
205
return true ;
180
206
}
@@ -190,28 +216,27 @@ protected static boolean canDeleteFile(Map<Address, Long> addressToNewestBackupM
190
216
Address walServerAddress = Address .fromString (hostname );
191
217
long walTimestamp = AbstractFSWALProvider .getTimestamp (path .getName ());
192
218
193
- if (!addressToNewestBackupMap .containsKey (walServerAddress )) {
219
+ if (!addressToBoundaryTs .containsKey (walServerAddress )) {
194
220
if (LOG .isDebugEnabled ()) {
195
- LOG .debug ("No backup found for server: {}. Deleting file: {}" ,
221
+ LOG .debug ("No cleanup WAL time-boundary found for server: {}. Ok to delete file: {}" ,
196
222
walServerAddress .getHostName (), path );
197
223
}
198
224
return true ;
199
225
}
200
226
201
- Long lastBackupTs = addressToNewestBackupMap .get (walServerAddress );
202
- if (lastBackupTs >= walTimestamp ) {
227
+ Long backupBoundary = addressToBoundaryTs .get (walServerAddress );
228
+ if (backupBoundary >= walTimestamp ) {
203
229
if (LOG .isDebugEnabled ()) {
204
230
LOG .debug (
205
- "Backup found for server: {}. Backup from {} is newer than file, so deleting : {}" ,
206
- walServerAddress .getHostName (), lastBackupTs , path );
231
+ "WAL cleanup time-boundary found for server {}: {}. Ok to delete older file : {}" ,
232
+ walServerAddress .getHostName (), backupBoundary , path );
207
233
}
208
234
return true ;
209
235
}
210
236
211
237
if (LOG .isDebugEnabled ()) {
212
- LOG .debug (
213
- "Backup found for server: {}. Backup from {} is older than the file, so keeping: {}" ,
214
- walServerAddress .getHostName (), lastBackupTs , path );
238
+ LOG .debug ("WAL cleanup time-boundary found for server {}: {}. Keeping younger file: {}" ,
239
+ walServerAddress .getHostName (), backupBoundary , path );
215
240
}
216
241
} catch (Exception ex ) {
217
242
LOG .warn ("Error occurred while filtering file: {}. Ignoring cleanup of this log" , path , ex );
0 commit comments