
Commit 7a80583

yuqi1129 authored and kgeisz committed
HBASE-25282 Remove processingServers in DeadServer as we can get this… (#2657)
Signed-off-by: Guanghao Zhang <[email protected]>
Change-Id: I804f1fa313a6ec8c0dc87fecc78046443c35a22a
1 parent c04cccb commit 7a80583

File tree

9 files changed, +141 -75 lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java

Lines changed: 0 additions & 48 deletions

@@ -33,8 +33,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-
 /**
  * Class to hold dead servers list and utility querying dead server list. Servers are added when
  * they expire or when we find them in filesystem on startup. When a server crash procedure is
@@ -56,12 +54,6 @@ public class DeadServer {
    */
   private final Map<ServerName, Long> deadServers = new HashMap<>();
 
-  /**
-   * Set of dead servers currently being processed by a SCP. Added to this list at the start of SCP
-   * and removed after it is done processing the crash.
-   */
-  private final Set<ServerName> processingServers = new HashSet<>();
-
   /**
    * @param serverName server name.
    * @return true if this server is on the dead servers list false otherwise
@@ -70,15 +62,6 @@ public synchronized boolean isDeadServer(final ServerName serverName) {
     return deadServers.containsKey(serverName);
   }
 
-  /**
-   * Checks if there are currently any dead servers being processed by the master. Returns true if
-   * at least one region server is currently being processed as dead.
-   * @return true if any RS are being processed as dead
-   */
-  synchronized boolean areDeadServersInProgress() {
-    return !processingServers.isEmpty();
-  }
-
   public synchronized Set<ServerName> copyServerNames() {
     Set<ServerName> clone = new HashSet<>(deadServers.size());
     clone.addAll(deadServers.keySet());
@@ -90,29 +73,6 @@ public synchronized Set<ServerName> copyServerNames() {
    */
   synchronized void putIfAbsent(ServerName sn) {
     this.deadServers.putIfAbsent(sn, EnvironmentEdgeManager.currentTime());
-    processing(sn);
-  }
-
-  /**
-   * Add <code>sn<</code> to set of processing deadservers.
-   * @see #finish(ServerName)
-   */
-  public synchronized void processing(ServerName sn) {
-    if (processingServers.add(sn)) {
-      // Only log on add.
-      LOG.debug("Processing {}; numProcessing={}", sn, processingServers.size());
-    }
-  }
-
-  /**
-   * Complete processing for this dead server.
-   * @param sn ServerName for the dead server.
-   * @see #processing(ServerName)
-   */
-  public synchronized void finish(ServerName sn) {
-    if (processingServers.remove(sn)) {
-      LOG.debug("Removed {} from processing; numProcessing={}", sn, processingServers.size());
-    }
   }
 
   public synchronized int size() {
@@ -171,17 +131,12 @@ public synchronized String toString() {
     // Display unified set of servers from both maps
     Set<ServerName> servers = new HashSet<>();
     servers.addAll(deadServers.keySet());
-    servers.addAll(processingServers);
     StringBuilder sb = new StringBuilder();
     for (ServerName sn : servers) {
       if (sb.length() > 0) {
         sb.append(", ");
       }
       sb.append(sn.toString());
-      // Star entries that are being processed
-      if (processingServers.contains(sn)) {
-        sb.append("*");
-      }
     }
     return sb.toString();
   }
@@ -220,9 +175,6 @@ public synchronized Date getTimeOfDeath(final ServerName deadServerName) {
    * @return true if this server was removed
    */
  public synchronized boolean removeDeadServer(final ServerName deadServerName) {
-    Preconditions.checkState(!processingServers.contains(deadServerName),
-      "Asked to remove server still in processingServers set " + deadServerName + " (numProcessing="
-        + processingServers.size() + ")");
     return this.deadServers.remove(deadServerName) != null;
   }
 }

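With processingServers gone, the question "is this dead server still being processed?" is no longer answered by DeadServer itself but by the set of ServerCrashProcedures the master is currently running. A minimal sketch of that pattern, assuming access to MasterServices; the helper class and method name are illustrative, not part of this patch:

  import java.io.IOException;
  import org.apache.hadoop.hbase.ServerName;
  import org.apache.hadoop.hbase.master.MasterServices;
  import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;

  // Illustrative helper: reports whether a dead server still has a
  // ServerCrashProcedure in flight by scanning the master's procedure list,
  // which is the same information the processingServers set used to carry.
  final class DeadServerProcessingCheck {
    static boolean isBeingProcessed(MasterServices master, ServerName sn) throws IOException {
      return master.getProcedures().stream()
        .anyMatch(p -> p instanceof ServerCrashProcedure
          && ((ServerCrashProcedure) p).getServerName().equals(sn));
    }
  }
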
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java

Lines changed: 11 additions & 4 deletions

@@ -2528,10 +2528,17 @@ public ClearDeadServersResponse clearDeadServers(RpcController controller,
         response.addAllServerName(request.getServerNameList());
       } else {
         for (HBaseProtos.ServerName pbServer : request.getServerNameList()) {
-          if (
-            !master.getServerManager().getDeadServers()
-              .removeDeadServer(ProtobufUtil.toServerName(pbServer))
-          ) {
+          ServerName server = ProtobufUtil.toServerName(pbServer);
+
+          final boolean deadInProcess = master.getProcedures().stream().anyMatch(
+            p -> (p instanceof ServerCrashProcedure)
+              && ((ServerCrashProcedure) p).getServerName().equals(server));
+          if (deadInProcess) {
+            throw new ServiceException(
+              String.format("Dead server '%s' is not 'dead' in fact...", server));
+          }
+
+          if (!deadServer.removeDeadServer(server)) {
             response.addServerName(pbServer);
           }
         }

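On the client side, the new guard surfaces as a failed admin call: asking the master to clear a dead server whose ServerCrashProcedure is still running is rejected rather than racing with the procedure. A hedged sketch of what a caller might observe, assuming a standard Admin connection; the class, host name, and start code below are made up for illustration:

  import java.io.IOException;
  import java.util.Collections;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.ServerName;
  import org.apache.hadoop.hbase.client.Admin;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;

  public class ClearDeadServerExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Hypothetical dead server; the third argument is the start code.
      ServerName dead = ServerName.valueOf("rs1.example.com", 16020, 1600000000000L);
      try (Connection conn = ConnectionFactory.createConnection(conf);
          Admin admin = conn.getAdmin()) {
        try {
          // Servers that could not be cleared come back in the returned list.
          admin.clearDeadServers(Collections.singletonList(dead));
        } catch (IOException e) {
          // Expected while an SCP for 'dead' is still in flight on the master;
          // the server-side ServiceException reaches the client as an IOException.
        }
      }
    }
  }
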
hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java

Lines changed: 4 additions & 2 deletions

@@ -50,6 +50,8 @@
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -547,8 +549,8 @@ public DeadServer getDeadServers() {
    * Checks if any dead servers are currently in progress.
    * @return true if any RS are being processed as dead, false if not
    */
-  public boolean areDeadServersInProgress() {
-    return this.deadservers.areDeadServersInProgress();
+  public boolean areDeadServersInProgress() throws IOException {
+    return master.getProcedures().stream().anyMatch(p -> p instanceof ServerCrashProcedure);
   }
 
   void letRegionServersShutdown() {

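Because areDeadServersInProgress() now consults the procedure list, it throws the checked IOException, so every caller has to declare or handle it; the test diffs that follow adjust their signatures accordingly. A small illustrative wait loop in that style (the wrapper class and method are not part of the patch):

  import java.io.IOException;
  import org.apache.hadoop.hbase.master.ServerManager;
  import org.apache.hadoop.hbase.util.Threads;

  // Illustrative caller: polls until no ServerCrashProcedure is running,
  // propagating the IOException the new implementation can throw.
  final class CrashProcessingWaiter {
    static void waitOnCrashProcessing(ServerManager sm) throws IOException {
      while (sm.areDeadServersInProgress()) {
        Threads.sleep(1000);
      }
    }
  }
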
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java

Lines changed: 0 additions & 2 deletions

@@ -140,7 +140,6 @@ protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
     // This adds server to the DeadServer processing list but not to the DeadServers list.
     // Server gets removed from processing list below on procedure successful finish.
     if (!notifiedDeadServer) {
-      services.getServerManager().getDeadServers().processing(serverName);
       notifiedDeadServer = true;
     }
 
@@ -255,7 +254,6 @@ protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
       case SERVER_CRASH_FINISH:
         LOG.info("removed crashed server {} after splitting done", serverName);
         services.getAssignmentManager().getRegionStates().removeServer(serverName);
-        services.getServerManager().getDeadServers().finish(serverName);
         updateProgress(true);
         return Flow.NO_MORE_STATE;
       default:

hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java

Lines changed: 1 addition & 1 deletion

@@ -183,7 +183,7 @@ public void testRebalanceOnRegionServerNumberChange() throws IOException, Interr
   /**
    * Wait on crash processing. Balancer won't run if processing a crashed server.
    */
-  private void waitOnCrashProcessing() {
+  private void waitOnCrashProcessing() throws IOException {
     while (UTIL.getHBaseCluster().getMaster().getServerManager().areDeadServersInProgress()) {
       LOG.info("Waiting on processing of crashed server before proceeding...");
       Threads.sleep(1000);

hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java

Lines changed: 3 additions & 17 deletions

@@ -20,6 +20,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -69,22 +70,10 @@ public static void tearDownAfterClass() throws Exception {
   public void testIsDead() {
     DeadServer ds = new DeadServer();
     ds.putIfAbsent(hostname123);
-    ds.processing(hostname123);
-    assertTrue(ds.areDeadServersInProgress());
-    ds.finish(hostname123);
-    assertFalse(ds.areDeadServersInProgress());
 
     ds.putIfAbsent(hostname1234);
-    ds.processing(hostname1234);
-    assertTrue(ds.areDeadServersInProgress());
-    ds.finish(hostname1234);
-    assertFalse(ds.areDeadServersInProgress());
 
     ds.putIfAbsent(hostname12345);
-    ds.processing(hostname12345);
-    assertTrue(ds.areDeadServersInProgress());
-    ds.finish(hostname12345);
-    assertFalse(ds.areDeadServersInProgress());
 
     // Already dead = 127.0.0.1,9090,112321
     // Coming back alive = 127.0.0.1,9090,223341
@@ -104,15 +93,15 @@ public void testIsDead() {
   }
 
   @Test
-  public void testCrashProcedureReplay() {
+  public void testCrashProcedureReplay() throws IOException {
     HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
     final ProcedureExecutor<MasterProcedureEnv> pExecutor = master.getMasterProcedureExecutor();
     ServerCrashProcedure proc =
       new ServerCrashProcedure(pExecutor.getEnvironment(), hostname123, false, false);
 
     ProcedureTestingUtility.submitAndWait(pExecutor, proc);
 
-    assertFalse(master.getServerManager().getDeadServers().areDeadServersInProgress());
+    assertTrue(master.getServerManager().areDeadServersInProgress());
   }
 
   @Test
@@ -163,17 +152,14 @@ public void testClearDeadServer() {
     d.putIfAbsent(hostname1234);
     Assert.assertEquals(2, d.size());
 
-    d.finish(hostname123);
     d.removeDeadServer(hostname123);
     Assert.assertEquals(1, d.size());
-    d.finish(hostname1234);
     d.removeDeadServer(hostname1234);
     Assert.assertTrue(d.isEmpty());
 
     d.putIfAbsent(hostname1234);
     Assert.assertFalse(d.removeDeadServer(hostname123_2));
     Assert.assertEquals(1, d.size());
-    d.finish(hostname1234);
     Assert.assertTrue(d.removeDeadServer(hostname1234));
     Assert.assertTrue(d.isEmpty());
   }

hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java

Lines changed: 2 additions & 0 deletions

@@ -230,6 +230,8 @@ private boolean isDeadServerSCPExecuted(ServerName serverName) throws IOExceptio
 
   private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster, ServerName serverName)
     throws InterruptedException, IOException {
+  private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster,
+      ServerName serverName) throws InterruptedException, IOException {
     ServerManager sm = activeMaster.getMaster().getServerManager();
     // First wait for it to be in dead list
     while (!sm.getDeadServers().isDeadServer(serverName)) {

hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestHBCKSCP.java

Lines changed: 0 additions & 1 deletion

@@ -145,7 +145,6 @@ public void test() throws Exception {
     cluster.killRegionServer(rsServerName);
 
     master.getServerManager().moveFromOnlineToDeadServers(rsServerName);
-    master.getServerManager().getDeadServers().finish(rsServerName);
     master.getServerManager().getDeadServers().removeDeadServer(rsServerName);
     master.getAssignmentManager().getRegionStates().removeServer(rsServerName);
     // Kill the server. Nothing should happen since an 'Unknown Server' as far
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java

Lines changed: 120 additions & 0 deletions

@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationStandbyKillRS extends SyncReplicationTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSyncReplicationStandbyKillRS.class);
+
+  private final long SLEEP_TIME = 1000;
+
+  private final int COUNT = 1000;
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSyncReplicationStandbyKillRS.class);
+
+  @Test
+  public void testStandbyKillRegionServer() throws Exception {
+    MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
+    Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
+    assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+        SyncReplicationState.STANDBY);
+    assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+        SyncReplicationState.ACTIVE);
+
+    // Disable async replication and write data, then shutdown
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+    write(UTIL1, 0, COUNT);
+    UTIL1.shutdownMiniCluster();
+
+    JVMClusterUtil.MasterThread activeMaster = UTIL2.getMiniHBaseCluster().getMasterThread();
+    Thread t = new Thread(() -> {
+      try {
+        List<JVMClusterUtil.RegionServerThread> regionServers =
+            UTIL2.getMiniHBaseCluster().getLiveRegionServerThreads();
+        for (JVMClusterUtil.RegionServerThread rst : regionServers) {
+          ServerName serverName = rst.getRegionServer().getServerName();
+          rst.getRegionServer().stop("Stop RS for test");
+          waitForRSShutdownToStartAndFinish(activeMaster, serverName);
+          JVMClusterUtil.RegionServerThread restarted =
+              UTIL2.getMiniHBaseCluster().startRegionServer();
+          restarted.waitForServerOnline();
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to kill RS", e);
+      }
+    });
+    t.start();
+
+    // Transit standby to DA to replay logs
+    try {
+      UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+          SyncReplicationState.DOWNGRADE_ACTIVE);
+    } catch (Exception e) {
+      LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE, e);
+    }
+
+    while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID)
+        != SyncReplicationState.DOWNGRADE_ACTIVE) {
+      Thread.sleep(SLEEP_TIME);
+    }
+    verify(UTIL2, 0, COUNT);
+  }
+
+  private void waitForRSShutdownToStartAndFinish(JVMClusterUtil.MasterThread activeMaster,
+      ServerName serverName) throws InterruptedException, IOException {
+    ServerManager sm = activeMaster.getMaster().getServerManager();
+    // First wait for it to be in dead list
+    while (!sm.getDeadServers().isDeadServer(serverName)) {
+      LOG.debug("Waiting for [" + serverName + "] to be listed as dead in master");
+      Thread.sleep(SLEEP_TIME);
+    }
+    LOG.debug("Server [" + serverName + "] marked as dead, waiting for it to " +
+        "finish dead processing");
+    while (sm.areDeadServersInProgress()) {
+      LOG.debug("Server [" + serverName + "] still being processed, waiting");
+      Thread.sleep(SLEEP_TIME);
+    }
+    LOG.debug("Server [" + serverName + "] done with server shutdown processing");
+  }
+}
