
Commit 25d095b

Author: Hernan Gelaf-Romer (committed)
Log filtering in IncrementalBackupManager can lead to data loss
1 parent 1c2025c commit 25d095b

File tree: 3 files changed (+226, -19 lines)

3 files changed

+226
-19
lines changed
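
Context for the change (not part of the commit itself): before this patch, getLogFilesForNewBackup() in IncrementalBackupManager collected the newest WAL per host into newestLogs and stripped those files from the result, assuming the newest log is still in use. For a host that no longer reports a log-roll timestamp (a dead or decommissioned RegionServer), newestTimestamps.get(host) is null, so all of that host's archived WALs were treated as "newest" and filtered out, and their edits never reached an incremental backup. Below is a minimal standalone sketch of that old filtering; the host names, WAL names and timestamps are invented for illustration.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the pre-patch filtering in getLogFilesForNewBackup().
public class OldLogFilterSketch {
  public static void main(String[] args) {
    // Boundaries from the previous backup, and log-roll results from this backup.
    Map<String, Long> olderTimestamps = Map.of("rs1:16020", 100L, "deadrs:16020", 100L);
    Map<String, Long> newestTimestamps = Map.of("rs1:16020", 300L); // deadrs rolled nothing

    // WAL file -> owning host and creation time, as parsed from the file name.
    Map<String, String> walHost =
      Map.of("wal-rs1-200", "rs1:16020", "wal-deadrs-250", "deadrs:16020");
    Map<String, Long> walCreationTime = Map.of("wal-rs1-200", 200L, "wal-deadrs-250", 250L);

    List<String> resultLogFiles = new ArrayList<>();
    List<String> newestLogs = new ArrayList<>();
    for (String wal : walHost.keySet()) {
      String host = walHost.get(wal);
      long ts = walCreationTime.get(wal);
      if (ts > olderTimestamps.get(host)) {
        resultLogFiles.add(wal); // candidate for the incremental backup
      }
      Long newest = newestTimestamps.get(host);
      if (newest == null || ts > newest) {
        newestLogs.add(wal); // treated as "still in use"
      }
    }
    // The dead host has no entry in newestTimestamps, so its WAL lands in newestLogs
    // and is silently dropped here, losing its edits.
    resultLogFiles.removeAll(newestLogs);
    System.out.println(resultLogFiles); // prints [wal-rs1-200]
  }
}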

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java

Lines changed: 60 additions & 8 deletions
@@ -25,8 +25,14 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyJob;
 import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -38,7 +44,9 @@
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -144,21 +152,21 @@ public void execute() throws IOException {
         // logs while we do the backup.
         backupManager.writeBackupStartCode(0L);
       }
-      // We roll log here before we do the snapshot. It is possible there is duplicate data
-      // in the log that is already in the snapshot. But if we do it after the snapshot, we
-      // could have data loss.
-      // A better approach is to do the roll log on each RS in the same global procedure as
-      // the snapshot.
-      LOG.info("Execute roll log procedure for full backup ...");
 
       // Gather the bulk loads being tracked by the system, which can be deleted (since their data
       // will be part of the snapshot being taken). We gather this list before taking the actual
       // snapshots for the same reason as the log rolls.
       List<BulkLoad> bulkLoadsToDelete = backupManager.readBulkloadRows(tableList);
+      Map<String, Long> previousLogRollsByHost = backupManager.readRegionServerLastLogRollResult();
 
+      // We roll log here before we do the snapshot. It is possible there is duplicate data
+      // in the log that is already in the snapshot. But if we do it after the snapshot, we
+      // could have data loss.
+      // A better approach is to do the roll log on each RS in the same global procedure as
+      // the snapshot.
+      LOG.info("Execute roll log procedure for full backup ...");
       BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf);
-
-      newTimestamps = backupManager.readRegionServerLastLogRollResult();
+      Map<String, Long> latestLogRollsByHost = backupManager.readRegionServerLastLogRollResult();
 
       // SNAPSHOT_TABLES:
       backupInfo.setPhase(BackupPhase.SNAPSHOT);
@@ -181,6 +189,50 @@ public void execute() throws IOException {
       // set overall backup status: complete. Here we make sure to complete the backup.
       // After this checkpoint, even if entering cancel process, will let the backup finished
       backupInfo.setState(BackupState.COMPLETE);
+
+      // Scan oldlogs for dead/decommissioned hosts and add their max WAL timestamps
+      // to newTimestamps. This ensures subsequent incremental backups won't try to back up
+      // WALs that are already covered by this full backup's snapshot.
+      Path walRootDir = CommonFSUtils.getWALRootDir(conf);
+      Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
+      Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+      FileSystem fs = walRootDir.getFileSystem(conf);
+
+      List<FileStatus> allLogs = new ArrayList<>();
+      for (FileStatus hostLogDir : fs.listStatus(logDir)) {
+        String host = BackupUtils.parseHostNameFromLogFile(hostLogDir.getPath());
+        if (host == null) {
+          continue;
+        }
+        allLogs.addAll(Arrays.asList(fs.listStatus(hostLogDir.getPath())));
+      }
+      allLogs.addAll(Arrays.asList(fs.listStatus(oldLogDir)));
+
+      newTimestamps = new HashMap<>();
+
+      for (FileStatus log : allLogs) {
+        if (AbstractFSWALProvider.isMetaFile(log.getPath())) {
+          continue;
+        }
+        String host = BackupUtils.parseHostNameFromLogFile(log.getPath());
+        if (host == null) {
+          continue;
+        }
+        long timestamp = BackupUtils.getCreationTime(log.getPath());
+        Long previousLogRoll = previousLogRollsByHost.get(host);
+        Long latestLogRoll = latestLogRollsByHost.get(host);
+        boolean isInactive = latestLogRoll == null || latestLogRoll.equals(previousLogRoll);
+
+        if (isInactive) {
+          long currentTs = newTimestamps.getOrDefault(host, 0L);
+          if (timestamp > currentTs) {
+            newTimestamps.put(host, timestamp);
+          }
+        } else {
+          newTimestamps.put(host, latestLogRoll);
+        }
+      }
+
       // The table list in backupInfo is good for both full backup and incremental backup.
       // For incremental backup, it contains the incremental backup table set.
       backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
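
The block added above computes the per-host boundary written at the end of a full backup: a host that did not respond to the log-roll procedure (its latest roll result is missing or equal to the previous one) gets the creation time of its newest WAL on disk, while an active host keeps its latest log-roll timestamp. Below is a rough standalone sketch of that decision; the WalFile class, hosts and timestamps are invented for illustration, and the real code walks the WAL and oldWALs directories with FileSystem.listStatus().

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of the per-host boundary computation added to FullTableBackupClient.
public class FullBackupBoundarySketch {

  // Invented stand-in for a WAL file; the real code uses FileStatus and parses the path.
  static final class WalFile {
    final String host;
    final long creationTime;

    WalFile(String host, long creationTime) {
      this.host = host;
      this.creationTime = creationTime;
    }
  }

  static Map<String, Long> computeNewTimestamps(Map<String, Long> previousLogRollsByHost,
    Map<String, Long> latestLogRollsByHost, List<WalFile> allLogs) {
    Map<String, Long> newTimestamps = new HashMap<>();
    for (WalFile log : allLogs) {
      Long previousLogRoll = previousLogRollsByHost.get(log.host);
      Long latestLogRoll = latestLogRollsByHost.get(log.host);
      // Inactive = the host did not produce a new log-roll result for this backup.
      boolean isInactive = latestLogRoll == null || latestLogRoll.equals(previousLogRoll);
      if (isInactive) {
        // Boundary = newest WAL seen for the host; its data is covered by the snapshot.
        long currentTs = newTimestamps.getOrDefault(log.host, 0L);
        if (log.creationTime > currentTs) {
          newTimestamps.put(log.host, log.creationTime);
        }
      } else {
        newTimestamps.put(log.host, latestLogRoll);
      }
    }
    return newTimestamps;
  }

  public static void main(String[] args) {
    Map<String, Long> previous = Map.of("deadrs:16020", 100L);
    Map<String, Long> latest = Map.of("rs1:16020", 500L, "deadrs:16020", 100L);
    List<WalFile> logs =
      List.of(new WalFile("rs1:16020", 480L), new WalFile("deadrs:16020", 250L));
    // deadrs:16020 -> 250 (max WAL timestamp), rs1:16020 -> 500 (latest log roll)
    System.out.println(computeNewTimestamps(previous, latest, logs));
  }
}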

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java

Lines changed: 37 additions & 11 deletions
@@ -58,7 +58,6 @@ public IncrementalBackupManager(Connection conn, Configuration conf) throws IOEx
   */
  public Map<String, Long> getIncrBackupLogFileMap() throws IOException {
    List<String> logList;
-    Map<String, Long> newTimestamps;
    Map<String, Long> previousTimestampMins;
 
    String savedStartCode = readBackupStartCode();
@@ -83,12 +82,48 @@ public Map<String, Long> getIncrBackupLogFileMap() throws IOException {
    LOG.info("Execute roll log procedure for incremental backup ...");
    BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf);
 
-    newTimestamps = readRegionServerLastLogRollResult();
+    Map<String, Long> newTimestamps = readRegionServerLastLogRollResult();
+
+    Map<String, Long> latestLogRollByHost = readRegionServerLastLogRollResult();
+    for (Map.Entry<String, Long> entry : latestLogRollByHost.entrySet()) {
+      String host = entry.getKey();
+      long latestLogRoll = entry.getValue();
+      Long earliestTimestampToIncludeInBackup = previousTimestampMins.get(host);
+
+      boolean isInactive = earliestTimestampToIncludeInBackup != null
+        && earliestTimestampToIncludeInBackup > latestLogRoll;
+
+      long latestTimestampToIncludeInBackup;
+      if (isInactive) {
+        LOG.debug("Avoided resetting latest timestamp boundary for {} from {} to {}", host,
+          earliestTimestampToIncludeInBackup, latestLogRoll);
+        latestTimestampToIncludeInBackup = earliestTimestampToIncludeInBackup;
+      } else {
+        latestTimestampToIncludeInBackup = latestLogRoll;
+      }
+      newTimestamps.put(host, latestTimestampToIncludeInBackup);
+    }
 
    logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
    logList = excludeProcV2WALs(logList);
    backupInfo.setIncrBackupFileList(logList);
 
+    // Update boundaries based on WALs that will be backed up
+    for (String logFile : logList) {
+      Path logPath = new Path(logFile);
+      String logHost = BackupUtils.parseHostFromOldLog(logPath);
+      if (logHost == null) {
+        logHost = BackupUtils.parseHostNameFromLogFile(logPath.getParent());
+      }
+      if (logHost != null) {
+        long logTs = BackupUtils.getCreationTime(logPath);
+        Long latestTimestampToIncludeInBackup = newTimestamps.get(logHost);
+        if (latestTimestampToIncludeInBackup == null || logTs > latestTimestampToIncludeInBackup) {
+          LOG.info("Updating backup boundary for inactive host {}: timestamp={}", logHost, logTs);
+          newTimestamps.put(logHost, logTs);
+        }
+      }
+    }
    return newTimestamps;
  }
 
@@ -228,15 +263,6 @@ private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
        } else if (currentLogTS > oldTimeStamp) {
          resultLogFiles.add(currentLogFile);
        }
-
-        // It is possible that a host in .oldlogs is an obsolete region server
-        // so newestTimestamps.get(host) here can be null.
-        // Even if these logs belong to a obsolete region server, we still need
-        // to include they to avoid loss of edits for backup.
-        Long newTimestamp = newestTimestamps.get(host);
-        if (newTimestamp == null || currentLogTS > newTimestamp) {
-          newestLogs.add(currentLogFile);
-        }
      }
      // remove newest log per host because they are still in use
      resultLogFiles.removeAll(newestLogs);
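
The changes above make two boundary adjustments in getIncrBackupLogFileMap(): a host's boundary is never moved backwards when its log-roll result is stale (the inactive-host case), and once the WAL list for this backup is fixed, each host's boundary is raised to the newest WAL actually being copied so the next incremental does not re-include it. Below is a standalone sketch of both steps, with invented hosts and timestamps, assuming the same Map<String, Long> shape used in the patch.

import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the two boundary adjustments added to getIncrBackupLogFileMap().
public class IncrementalBoundarySketch {
  public static void main(String[] args) {
    // Boundaries recorded by the previous backup (earliest timestamps to include now).
    Map<String, Long> previousTimestampMins = Map.of("deadrs:16020", 400L);
    // This backup's log-roll results; a dead RS keeps a stale value from long ago.
    Map<String, Long> latestLogRollByHost = Map.of("rs1:16020", 900L, "deadrs:16020", 100L);

    // 1) Never move a host's boundary backwards: the stale roll result (100) must not
    //    override the boundary (400) that an earlier backup already advanced.
    Map<String, Long> newTimestamps = new HashMap<>();
    latestLogRollByHost.forEach((host, latestLogRoll) -> {
      Long earliest = previousTimestampMins.get(host);
      boolean isInactive = earliest != null && earliest > latestLogRoll;
      newTimestamps.put(host, isInactive ? earliest : latestLogRoll);
    });

    // 2) After the WAL list for this backup is selected, raise each host's boundary to the
    //    newest WAL actually included, so the next incremental does not re-copy it.
    Map<String, Long> newestBackedUpWalPerHost = Map.of("deadrs:16020", 450L);
    newestBackedUpWalPerHost.forEach((host, walTs) -> {
      Long boundary = newTimestamps.get(host);
      if (boundary == null || walTs > boundary) {
        newTimestamps.put(host, walTs);
      }
    });

    // deadrs:16020 -> 450, rs1:16020 -> 900 (HashMap iteration order may vary)
    System.out.println(newTimestamps);
  }
}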
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupOfflineRS.java

Lines changed: 129 additions & 0 deletions
@@ -0,0 +1,129 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.backup;
19+
20+
import static org.junit.Assert.assertFalse;
21+
import static org.junit.Assert.assertNotNull;
22+
import static org.junit.Assert.assertTrue;
23+
24+
import java.util.List;
25+
import java.util.Map;
26+
import org.apache.hadoop.hbase.HBaseClassTestRule;
27+
import org.apache.hadoop.hbase.HBaseTestingUtil;
28+
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
29+
import org.apache.hadoop.hbase.TableName;
30+
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
31+
import org.apache.hadoop.hbase.client.Connection;
32+
import org.apache.hadoop.hbase.client.ConnectionFactory;
33+
import org.apache.hadoop.hbase.regionserver.HRegionServer;
34+
import org.apache.hadoop.hbase.testclassification.LargeTests;
35+
import org.junit.BeforeClass;
36+
import org.junit.ClassRule;
37+
import org.junit.Test;
38+
import org.junit.experimental.categories.Category;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
41+
42+
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
43+
44+
/**
45+
* Tests that WAL files from offline/inactive RegionServers are handled correctly during backup.
46+
* Specifically verifies that WALs from an offline RS are:
47+
* <ol>
48+
* <li>Backed up once in the first backup after the RS goes offline</li>
49+
* <li>NOT re-backed up in subsequent backups</li>
50+
* </ol>
51+
*/
52+
@Category(LargeTests.class)
53+
public class TestBackupOfflineRS extends TestBackupBase {
54+
55+
@ClassRule
56+
public static final HBaseClassTestRule CLASS_RULE =
57+
HBaseClassTestRule.forClass(TestBackupOfflineRS.class);
58+
59+
private static final Logger LOG = LoggerFactory.getLogger(TestBackupOfflineRS.class);
60+
61+
@BeforeClass
62+
public static void setUp() throws Exception {
63+
TEST_UTIL = new HBaseTestingUtil();
64+
conf1 = TEST_UTIL.getConfiguration();
65+
conf1.setInt("hbase.regionserver.info.port", -1);
66+
autoRestoreOnFailure = true;
67+
useSecondCluster = false;
68+
setUpHelper();
69+
// Start an additional RS so we have at least 2
70+
TEST_UTIL.getMiniHBaseCluster().startRegionServer();
71+
TEST_UTIL.waitTableAvailable(table1);
72+
}
73+
74+
/**
75+
* Tests that when a full backup is taken while an RS is offline (with WALs in oldlogs), the
76+
* offline host's timestamps are recorded so subsequent incremental backups don't re-include those
77+
* WALs.
78+
*/
79+
@Test
80+
public void testBackupWithOfflineRS() throws Exception {
81+
LOG.info("Starting testFullBackupWithOfflineRS");
82+
83+
SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
84+
List<TableName> tables = Lists.newArrayList(table1);
85+
86+
if (cluster.getNumLiveRegionServers() < 2) {
87+
cluster.startRegionServer();
88+
Thread.sleep(2000);
89+
}
90+
91+
LOG.info("Inserting data to generate WAL entries");
92+
try (Connection conn = ConnectionFactory.createConnection(conf1)) {
93+
insertIntoTable(conn, table1, famName, 2, 100);
94+
}
95+
96+
int rsToStop = 0;
97+
HRegionServer rsBeforeStop = cluster.getRegionServer(rsToStop);
98+
String offlineHost =
99+
rsBeforeStop.getServerName().getHostname() + ":" + rsBeforeStop.getServerName().getPort();
100+
LOG.info("Stopping RS: {}", offlineHost);
101+
102+
cluster.stopRegionServer(rsToStop);
103+
// Wait for WALs to be moved to oldlogs
104+
Thread.sleep(5000);
105+
106+
LOG.info("Taking full backup (with offline RS WALs in oldlogs)");
107+
String fullBackupId = fullTableBackup(tables);
108+
assertTrue("Full backup should succeed", checkSucceeded(fullBackupId));
109+
110+
try (BackupSystemTable sysTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
111+
Map<TableName, Map<String, Long>> timestamps = sysTable.readLogTimestampMap(BACKUP_ROOT_DIR);
112+
Map<String, Long> rsTimestamps = timestamps.get(table1);
113+
LOG.info("RS timestamps after full backup: {}", rsTimestamps);
114+
115+
Long tsAfterFullBackup = rsTimestamps.get(offlineHost);
116+
assertNotNull("Offline host should have timestamp recorded in trslm after full backup",
117+
tsAfterFullBackup);
118+
119+
LOG.info("Taking incremental backup (should NOT include offline RS WALs)");
120+
String incrBackupId = incrementalTableBackup(tables);
121+
assertTrue("Incremental backup should succeed", checkSucceeded(incrBackupId));
122+
123+
timestamps = sysTable.readLogTimestampMap(BACKUP_ROOT_DIR);
124+
rsTimestamps = timestamps.get(table1);
125+
assertFalse("Offline host should not have a boundary ",
126+
rsTimestamps.containsKey(offlineHost));
127+
}
128+
}
129+
}
