Skip to content

Commit 6f4df8f

Browse files
apache#2963 Fix NPE during task rebalancing
In case _helixManager in one of the known controllers is _null_, finding the "leader controller" can lead to an NPE. When run as an asynchronous task, the NPE may not even be logged, and the task finishes without any trace; hence the addition of the ExecutorTaskUtil class, which wraps callables/runnables so that such an exception at least gets logged.
1 parent f37f465 commit 6f4df8f

File tree

10 files changed

+91
-17
lines changed

10 files changed

+91
-17
lines changed

Diff for: helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,9 @@ public static GenericHelixController getLeaderController(String clusterName) {
235235
if (clusterName != null) {
236236
ImmutableSet<GenericHelixController> controllers = _helixControllerFactory.get(clusterName);
237237
if (controllers != null) {
238-
return controllers.stream().filter(controller -> controller._helixManager.isLeader())
238+
return controllers.stream()
239+
.filter(controller -> controller._helixManager != null)
240+
.filter(controller -> controller._helixManager.isLeader())
239241
.findAny().orElse(null);
240242
}
241243
}

Diff for: helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.helix.common.DedupEventProcessor;
2828
import org.apache.helix.controller.stages.AttributeName;
2929
import org.apache.helix.controller.stages.ClusterEvent;
30+
import org.apache.helix.util.ExecutorTaskUtil;
3031

3132
public class AbstractBaseStage implements Stage {
3233
protected String _eventId;
@@ -64,9 +65,9 @@ public String getStageName() {
6465
return className;
6566
}
6667

67-
public static <T> Future asyncExecute(ExecutorService service, Callable<T> task) {
68+
public static <T> Future<T> asyncExecute(ExecutorService service, Callable<T> task) {
6869
if (service != null) {
69-
return service.submit(task);
70+
return service.submit(ExecutorTaskUtil.wrap(task));
7071
}
7172
return null;
7273
}

Diff for: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,13 @@
3535
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
3636
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
3737
import org.apache.helix.controller.stages.CurrentStateOutput;
38-
import org.apache.helix.model.ClusterTopologyConfig;
39-
import org.apache.helix.model.Partition;
4038
import org.apache.helix.model.Resource;
4139
import org.apache.helix.model.ResourceAssignment;
4240
import org.apache.helix.monitoring.metrics.MetricCollector;
4341
import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
4442
import org.apache.helix.monitoring.metrics.model.CountMetric;
4543
import org.apache.helix.monitoring.metrics.model.LatencyMetric;
44+
import org.apache.helix.util.ExecutorTaskUtil;
4645
import org.apache.helix.util.RebalanceUtil;
4746
import org.slf4j.Logger;
4847
import org.slf4j.LoggerFactory;
@@ -117,7 +116,7 @@ public void globalRebalance(ResourceControllerDataProvider clusterData, Map<Stri
117116
if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
118117
final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
119118
// Calculate the Baseline assignment for global rebalance.
120-
Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
119+
Future<Boolean> result = _baselineCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
121120
try {
122121
// If the synchronous thread does not wait for the baseline to be calculated, the synchronous thread should
123122
// be triggered again after baseline is finished.
@@ -132,7 +131,7 @@ public void globalRebalance(ResourceControllerDataProvider clusterData, Map<Stri
132131
return false;
133132
}
134133
return true;
135-
});
134+
}));
136135
if (waitForGlobalRebalance) {
137136
try {
138137
if (!result.get()) {

Diff for: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
4040
import org.apache.helix.monitoring.metrics.model.CountMetric;
4141
import org.apache.helix.monitoring.metrics.model.LatencyMetric;
42+
import org.apache.helix.util.ExecutorTaskUtil;
4243
import org.apache.helix.util.RebalanceUtil;
4344
import org.slf4j.Logger;
4445
import org.slf4j.LoggerFactory;
@@ -99,7 +100,7 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map<Str
99100
return;
100101
}
101102

102-
_asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(() -> {
103+
_asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
103104
try {
104105
doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
105106
currentStateOutput);
@@ -111,7 +112,7 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map<Str
111112
return false;
112113
}
113114
return true;
114-
});
115+
}));
115116
if (!_asyncPartialRebalanceEnabled) {
116117
try {
117118
if (!_asyncPartialRebalanceResult.get()) {

Diff for: helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.helix.model.Resource;
4949
import org.apache.helix.model.ResourceConfig;
5050
import org.apache.helix.model.StateModelDefinition;
51+
import org.apache.helix.util.ExecutorTaskUtil;
5152
import org.apache.helix.util.HelixUtil;
5253
import org.apache.helix.util.MessageUtil;
5354
import org.slf4j.Logger;
@@ -401,7 +402,7 @@ private void addGeneratedMessageToMap(final Message message,
401402
private void schedulePendingMessageCleanUp(
402403
final Map<String, Map<String, Message>> pendingMessagesToPurge, ExecutorService workerPool,
403404
final HelixDataAccessor accessor) {
404-
workerPool.submit(new Callable<Object>() {
405+
workerPool.submit(ExecutorTaskUtil.wrap(new Callable<Object>() {
405406
@Override
406407
public Object call() {
407408
for (Map.Entry<String, Map<String, Message>> entry : pendingMessagesToPurge.entrySet()) {
@@ -415,7 +416,7 @@ public Object call() {
415416
}
416417
return null;
417418
}
418-
});
419+
}));
419420
}
420421

421422
private boolean shouldCleanUpPendingMessage(Message pendingMsg, Map<String, String> sessionIdMap,

Diff for: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.helix.HelixManager;
2626
import org.apache.helix.NotificationContext;
2727
import org.apache.helix.common.DedupEventBlockingQueue;
28+
import org.apache.helix.util.ExecutorTaskUtil;
2829
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
@@ -89,7 +90,7 @@ public void submitEventToExecutor(NotificationContext.Type eventType, Notificati
8990
}
9091
if (_futureCallBackProcessEvent == null || _futureCallBackProcessEvent.isDone()) {
9192
_futureCallBackProcessEvent =
92-
_threadPoolExecutor.submit(new CallbackProcessor(handler, event));
93+
_threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
9394
} else {
9495
_callBackEventQueue.put(eventType, event);
9596
}
@@ -102,7 +103,7 @@ private void submitPendingHandleCallBackEventToManagerThreadPool(CallbackHandler
102103
try {
103104
NotificationContext event = _callBackEventQueue.take();
104105
_futureCallBackProcessEvent =
105-
_threadPoolExecutor.submit(new CallbackProcessor(handler, event));
106+
_threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
106107
} catch (InterruptedException e) {
107108
logger
108109
.error("Error when submitting pending HandleCallBackEvent to manager thread pool", e);

Diff for: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import org.apache.helix.participant.statemachine.StateModel;
7777
import org.apache.helix.participant.statemachine.StateModelFactory;
7878
import org.apache.helix.task.TaskConstants;
79+
import org.apache.helix.util.ExecutorTaskUtil;
7980
import org.apache.helix.util.HelixUtil;
8081
import org.apache.helix.util.StatusUpdateUtil;
8182
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -465,7 +466,7 @@ public boolean scheduleTask(MessageTask task) {
465466
}
466467

467468
LOG.info("Submit task: " + taskId + " to pool: " + exeSvc);
468-
Future<HelixTaskResult> future = exeSvc.submit(task);
469+
Future<HelixTaskResult> future = exeSvc.submit(ExecutorTaskUtil.wrap(task));
469470

470471
_messageTaskMap
471472
.putIfAbsent(getMessageTarget(message.getResourceName(), message.getPartitionName()),

Diff for: helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.apache.helix.model.InstanceConfig;
6565
import org.apache.helix.model.LiveInstance;
6666
import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor;
67+
import org.apache.helix.util.ExecutorTaskUtil;
6768
import org.slf4j.Logger;
6869
import org.slf4j.LoggerFactory;
6970

@@ -971,7 +972,7 @@ private void recordPropagationLatency(final long currentTime,
971972
// restrict running report task count to be 1.
972973
// Any parallel tasks will be skipped. So the reporting metric data is sampled.
973974
if (_reportingTask == null || _reportingTask.isDone()) {
974-
_reportingTask = _reportExecutor.submit(new Callable<Object>() {
975+
_reportingTask = _reportExecutor.submit(ExecutorTaskUtil.wrap(new Callable<Object>() {
975976
@Override
976977
public Object call() {
977978
// getNewCurrentStateEndTimes() needs to iterate all current states. Make it async to
@@ -1000,7 +1001,7 @@ public Object call() {
10001001
}
10011002
return null;
10021003
}
1003-
});
1004+
}));
10041005
}
10051006
}
10061007

Diff for: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.helix.participant.statemachine.StateModel;
3232
import org.apache.helix.participant.statemachine.StateModelInfo;
3333
import org.apache.helix.participant.statemachine.Transition;
34+
import org.apache.helix.util.ExecutorTaskUtil;
3435
import org.slf4j.Logger;
3536
import org.slf4j.LoggerFactory;
3637

@@ -329,7 +330,7 @@ private void startTask(Message msg, String taskPartition) {
329330
_taskRunner =
330331
new TaskRunner(task, msg.getResourceName(), taskPartition, msg.getTgtName(), _manager,
331332
msg.getTgtSessionId(), this);
332-
_taskExecutor.submit(_taskRunner);
333+
_taskExecutor.submit(ExecutorTaskUtil.wrap(_taskRunner));
333334
_taskRunner.waitTillStarted();
334335

335336
// Set up a timer to cancel the task when its time out expires.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package org.apache.helix.util;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import java.util.concurrent.Callable;
23+
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
public class ExecutorTaskUtil {
28+
29+
private static final Logger LOG = LoggerFactory.getLogger(ExecutorTaskUtil.class);
30+
31+
/**
32+
* Wrap a callable so that any raised exception is logged
33+
* (can be interesting in case the callable is used as a completely asynchronous task
34+
* fed to an {@link java.util.concurrent.ExecutorService}), for which we are never
35+
* calling any of the {@link java.util.concurrent.Future#get()} or {@link java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)}
36+
* methods.
37+
*/
38+
public static <T> Callable<T> wrap(Callable<T> callable) {
39+
return () -> {
40+
try {
41+
return callable.call();
42+
} catch (Throwable t) {
43+
LOG.error("Callable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t);
44+
throw t;
45+
}
46+
};
47+
}
48+
49+
/**
50+
* Wrap a runnable so that any raised exception is logged
51+
* (can be interesting in case the callable is used as a completely asynchronous task
52+
* fed to an {@link java.util.concurrent.ExecutorService}), for which we are never
53+
* calling any of the {@link java.util.concurrent.Future#get()} or {@link java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)}
54+
* methods.
55+
*/
56+
public static Runnable wrap(Runnable runnable) {
57+
return () -> {
58+
try {
59+
runnable.run();
60+
} catch (Throwable t) {
61+
LOG.error("Runnable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t);
62+
throw t;
63+
}
64+
};
65+
}
66+
}

0 commit comments

Comments
 (0)