Skip to content

Commit 90d9992

Browse files
committed
[BugFix]check if BE node is active StarRocks#94
1. check if BE node is active 2. if not, don't select this
1 parent 6524633 commit 90d9992

File tree

1 file changed

+37
-0
lines changed

1 file changed

+37
-0
lines changed

src/main/java/com/starrocks/connector/spark/rest/RestService.java

+37
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151

5252
import java.io.IOException;
5353
import java.io.Serializable;
54+
import java.net.InetAddress;
55+
import java.net.Socket;
5456
import java.nio.charset.StandardCharsets;
5557
import java.util.ArrayList;
5658
import java.util.Arrays;
@@ -354,6 +356,32 @@ static QueryPlan getQueryPlan(String response, Logger logger) throws StarrocksEx
354356
return queryPlan;
355357
}
356358

359+
/**
360+
* check if BE node is active
361+
* @param candidate BE node, format: <be_address>:<brpc_port>
362+
* @return BE node status, active is true
363+
*/
364+
@VisibleForTesting
365+
static boolean isCandidateActive(String candidate) {
366+
String[] split = candidate.split(":");
367+
String host = split[0];
368+
int port = Integer.parseInt(split[1]);
369+
Socket socket = null;
370+
try {
371+
socket = new Socket(InetAddress.getByName(host), port);
372+
return true;
373+
} catch (Exception e) {
374+
return false;
375+
} finally {
376+
if (socket != null) {
377+
try {
378+
socket.close();
379+
} catch (IOException e) {
380+
}
381+
}
382+
}
383+
}
384+
357385
/**
358386
* select which StarRocks BE to get tablet data.
359387
*
@@ -364,6 +392,7 @@ static QueryPlan getQueryPlan(String response, Logger logger) throws StarrocksEx
364392
*/
365393
@VisibleForTesting
366394
static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws StarrocksException {
395+
Map<String, Boolean> candidatesStatus = new HashMap<>();
367396
Map<String, List<Long>> be2Tablets = new HashMap<>();
368397
for (Map.Entry<String, Tablet> part : queryPlan.getPartitions().entrySet()) {
369398
logger.debug("Parse tablet info: '{}'.", part);
@@ -378,6 +407,14 @@ static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger log
378407
String target = null;
379408
int tabletCount = Integer.MAX_VALUE;
380409
for (String candidate : part.getValue().getRoutings()) {
410+
// check if BE node is active and save the status
411+
if (!candidatesStatus.containsKey(candidate)) {
412+
candidatesStatus.put(candidate, isCandidateActive(candidate));
413+
}
414+
// check if BE node is active, if not, continue
415+
if (Boolean.FALSE.equals(candidatesStatus.get(candidate))) {
416+
continue;
417+
}
381418
logger.trace("Evaluate StarRocks BE '{}' to tablet '{}'.", candidate, tabletId);
382419
if (!be2Tablets.containsKey(candidate)) {
383420
logger.debug("Choice a new StarRocks BE '{}' for tablet '{}'.", candidate, tabletId);

0 commit comments

Comments
 (0)