Skip to content

transport: log network reconnects with same peer process #128415

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.common.ReferenceDocs;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
Expand Down Expand Up @@ -188,6 +192,7 @@ public String toString() {

@Override
protected void doStart() {
transportService.addConnectionListener(new ConnectionChangeListener());
final ConnectionChecker connectionChecker = new ConnectionChecker();
this.connectionChecker = connectionChecker;
connectionChecker.scheduleNextCheck();
Expand All @@ -209,12 +214,41 @@ public void reconnectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletio
});
}

/**
 * Looks up the disconnection record currently stored for {@code node}. Exposed for testing.
 *
 * @param node the node whose connection target should be inspected
 * @return the stored {@link DisconnectionHistory}, or {@code null} if the node has no connection target
 *         or its target holds no disconnection record
 */
protected DisconnectionHistory disconnectionHistoryForNode(DiscoveryNode node) {
    // the disconnectionHistory field is guarded by the service mutex, so read it while holding the lock
    synchronized (mutex) {
        final ConnectionTarget target = targetsByNode.get(node);
        return target == null ? null : target.disconnectionHistory;
    }
}

/**
 * Record of a node's most recent disconnection.
 *
 * @param disconnectTimeMillis time of the disconnect in absolute time ({@link ThreadPool#absoluteTimeInMillis()})
 * @param disconnectCause      the disconnect-causing exception, or {@code null} if there was none
 */
record DisconnectionHistory(long disconnectTimeMillis, @Nullable Exception disconnectCause) {
    /** Bean-style alias for the {@code disconnectTimeMillis} component. */
    public long getDisconnectTimeMillis() {
        return disconnectTimeMillis();
    }

    /** Bean-style alias for the {@code disconnectCause} component; may return {@code null}. */
    public Exception getDisconnectCause() {
        return disconnectCause();
    }
}

private class ConnectionTarget {
private final DiscoveryNode discoveryNode;

private final AtomicInteger consecutiveFailureCount = new AtomicInteger();
private final AtomicReference<Releasable> connectionRef = new AtomicReference<>();

// access is synchronized by the service mutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@Nullable // null when node is connected or initialized; non-null in between disconnects and connects
private DisconnectionHistory disconnectionHistory = null;

// all access to these fields is synchronized
private List<Releasable> pendingRefs;
private boolean connectionInProgress;
Expand Down Expand Up @@ -345,4 +379,70 @@ public String toString() {
return "ConnectionTarget{discoveryNode=" + discoveryNode + '}';
}
}

/**
* Receives connection/disconnection events from the transport, and records them in per-node DisconnectionHistory
* structures for logging network issues. DisconnectionHistory records are stored in their node's ConnectionTarget.
*
* Network issues (that this listener monitors for) occur whenever a reconnection to a node succeeds,
* and it has the same ephemeral ID as it did during the last connection; this happens when a connection event
* occurs, and its ConnectionTarget entry has a previous DisconnectionHistory stored.
*/
private class ConnectionChangeListener implements TransportConnectionListener {
/**
 * Called when a connection to {@code node} is opened. If the node's ConnectionTarget still holds a
 * DisconnectionHistory, the record is cleared and a warning is logged describing how long ago the
 * disconnect happened and whether it was graceful or exceptional.
 */
@Override
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
    // Take (read and clear) any disconnection record for this node while holding the service mutex.
    final DisconnectionHistory previousDisconnect;
    synchronized (mutex) {
        final ConnectionTarget target = targetsByNode.get(node);
        if (target == null) {
            previousDisconnect = null;
        } else {
            previousDisconnect = target.disconnectionHistory;
            target.disconnectionHistory = null;
        }
    }

    // No recorded disconnect for this node: nothing to report.
    if (previousDisconnect == null) {
        return;
    }

    // NOTE(review): the elapsed time could be minutes/hours/days too, not just seconds; a reviewer suggested
    // converting it to a TimeValue and using its toString() instead of the [s/ms] pair.
    final long elapsedMillis = threadPool.absoluteTimeInMillis() - previousDisconnect.disconnectTimeMillis;
    final long elapsedSeconds = elapsedMillis / 1000;
    final Exception cause = previousDisconnect.disconnectCause;

    if (cause == null) {
        // graceful disconnect: no exception to attach to the log line
        logger.warn(
            """
                reopened transport connection to node [{}] \
                which disconnected gracefully [{}s/{}ms] ago but did not \
                restart, so the disconnection is unexpected; \
                see [{}] for troubleshooting guidance""",
            node.descriptionWithoutAttributes(),
            elapsedSeconds,
            elapsedMillis,
            ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
        );
        return;
    }

    // exceptional disconnect: include the recorded exception alongside the message
    logger.warn(
        () -> format(
            """
                reopened transport connection to node [%s] \
                which disconnected exceptionally [%ds/%dms] ago but did not \
                restart, so the disconnection is unexpected; \
                see [%s] for troubleshooting guidance""",
            node.descriptionWithoutAttributes(),
            elapsedSeconds,
            elapsedMillis,
            ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
        ),
        cause
    );
}

/**
 * Called when the connection to {@code node} is closed. Records the disconnect time and cause on the
 * node's ConnectionTarget, if one exists; otherwise the event is ignored.
 */
@Override
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
    // capture the timestamp before taking the lock
    final DisconnectionHistory record = new DisconnectionHistory(threadPool.absoluteTimeInMillis(), closeException);
    synchronized (mutex) {
        final ConnectionTarget target = targetsByNode.get(node);
        if (target == null) {
            return;
        }
        target.disconnectionHistory = record;
    }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.NodeConnectionsService.DisconnectionHistory;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportStats;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;

Expand All @@ -75,6 +77,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
private ThreadPool threadPool;
private TransportService transportService;
private Map<DiscoveryNode, CheckedRunnable<Exception>> nodeConnectionBlocks;
private Map<DiscoveryNode, Exception> nodeCloseExceptions;

private List<DiscoveryNode> generateNodes() {
List<DiscoveryNode> nodes = new ArrayList<>();
Expand Down Expand Up @@ -246,6 +249,110 @@ public String toString() {
assertConnectedExactlyToNodes(transportService, targetNodes);
}

/**
 * Checks the lifecycle of the per-node DisconnectionHistory: it is null after the initial connection, populated
 * (with or without a cause) after a disconnect, and cleared again once the node is reconnected, at which point
 * a WARN message describing the unexpected disconnection is expected in the log.
 */
public void testDisconnectionHistory() {
// use a deterministic task queue so that time only advances when the test says so
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue();
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool();
// reconnect interval as configured by empty settings; used both to advance time and in the expected log text
final long reconnectIntervalMillis = CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(Settings.EMPTY).millis();
final long reconnectIntervalSeconds = reconnectIntervalMillis / 1000;

// note: these locals shadow the test class's threadPool/transportService fields
MockTransport transport = new MockTransport(threadPool);
TestTransportService transportService = new TestTransportService(transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();

final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
service.start();

// three nodes: one that stays connected, one closed without an exception, one closed with an exception
final DiscoveryNode noClose = DiscoveryNodeUtils.create("noClose");
final DiscoveryNode gracefulClose = DiscoveryNodeUtils.create("gracefulClose");
final DiscoveryNode exceptionalClose = DiscoveryNodeUtils.create("exceptionalClose");

// register a close exception for this node; presumably the mock connection's close() reports it to its close listener
nodeCloseExceptions.put(exceptionalClose, new RuntimeException());

final AtomicBoolean connectionCompleted = new AtomicBoolean();
DiscoveryNodes nodes = DiscoveryNodes.builder().add(noClose).add(gracefulClose).add(exceptionalClose).build();

// connect to all three nodes and run the queued tasks until the completion callback has fired
service.connectToNodes(nodes, () -> connectionCompleted.set(true));
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(connectionCompleted.get());

// freshly connected nodes have no disconnection history
assertNullDisconnectionHistory(service, noClose);
assertNullDisconnectionHistory(service, gracefulClose);
assertNullDisconnectionHistory(service, exceptionalClose);

transportService.disconnectFromNode(gracefulClose);
transportService.disconnectFromNode(exceptionalClose);

// check disconnection history set after close
assertNullDisconnectionHistory(service, noClose);
assertDisconnectionHistoryDetails(service, threadPool, gracefulClose, null);
assertDisconnectionHistoryDetails(service, threadPool, exceptionalClose, RuntimeException.class);

// expect one WARN per reconnected node; the elapsed time in the message is exactly one reconnect interval
try (var mockLog = MockLog.capture(NodeConnectionsService.class)) {
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"reconnect after graceful close",
NodeConnectionsService.class.getCanonicalName(),
Level.WARN,
"reopened transport connection to node ["
+ gracefulClose.descriptionWithoutAttributes()
+ "] which disconnected gracefully ["
+ reconnectIntervalSeconds
+ "s/"
+ reconnectIntervalMillis
+ "ms] ago "
+ "but did not restart, so the disconnection is unexpected; "
+ "see [https://www.elastic.co/docs/*] for troubleshooting guidance"
)
);
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"reconnect after exceptional close",
NodeConnectionsService.class.getCanonicalName(),
Level.WARN,
"reopened transport connection to node ["
+ exceptionalClose.descriptionWithoutAttributes()
+ "] which disconnected exceptionally ["
+ reconnectIntervalSeconds
+ "s/"
+ reconnectIntervalMillis
+ "ms] ago "
+ "but did not restart, so the disconnection is unexpected; "
+ "see [https://www.elastic.co/docs/*] for troubleshooting guidance"
)
);
// advance deterministic time by one reconnect interval; presumably this triggers the service's periodic reconnection
runTasksUntil(deterministicTaskQueue, deterministicTaskQueue.getCurrentTimeMillis() + reconnectIntervalMillis);
mockLog.assertAllExpectationsMatched();
}

// check on reconnect -- disconnection history is reset
assertNullDisconnectionHistory(service, noClose);
assertNullDisconnectionHistory(service, gracefulClose);
assertNullDisconnectionHistory(service, exceptionalClose);
}

/** Asserts that {@code service} currently reports no disconnection history for {@code node}. */
private void assertNullDisconnectionHistory(NodeConnectionsService service, DiscoveryNode node) {
    assertNull(service.disconnectionHistoryForNode(node));
}

/**
 * Asserts that {@code service} holds a disconnection history for {@code node} whose timestamp lies between
 * 0 and 200 milliseconds before the thread pool's current absolute time, and whose cause matches the expectation.
 *
 * @param disconnectCauseClass expected type of the recorded cause, or {@code null} if no cause should be recorded
 */
private void assertDisconnectionHistoryDetails(
    NodeConnectionsService service,
    ThreadPool threadPool,
    DiscoveryNode node,
    @Nullable Class<?> disconnectCauseClass
) {
    final DisconnectionHistory history = service.disconnectionHistoryForNode(node);
    assertNotNull(history);

    // the recorded time must not be in the future and must be at most 200ms old
    final long disconnectedAtMillis = history.getDisconnectTimeMillis();
    assertTrue(threadPool.absoluteTimeInMillis() - disconnectedAtMillis >= 0);
    assertTrue(threadPool.absoluteTimeInMillis() - disconnectedAtMillis <= 200);

    if (disconnectCauseClass == null) {
        assertNull(history.getDisconnectCause());
    } else {
        assertThat(history.getDisconnectCause(), Matchers.isA(disconnectCauseClass));
    }
}

public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);

Expand Down Expand Up @@ -526,6 +633,7 @@ public void setUp() throws Exception {
ThreadPool threadPool = new TestThreadPool(getClass().getName());
this.threadPool = threadPool;
nodeConnectionBlocks = newConcurrentMap();
nodeCloseExceptions = newConcurrentMap();
transportService = new TestTransportService(new MockTransport(threadPool), threadPool);
transportService.start();
transportService.acceptIncomingRequests();
Expand Down Expand Up @@ -644,7 +752,12 @@ public void addCloseListener(ActionListener<Void> listener1) {

/**
 * Closes this mock connection by completing its close listener exactly once: with a failure if a close
 * exception has been registered for this connection's node in {@code nodeCloseExceptions}, otherwise normally.
 */
@Override
public void close() {
    // The listener must not be completed unconditionally before this check: doing so would complete it twice
    // and (presumably, for a notify-once listener) pre-empt the failure path below.
    final Exception closeException = nodeCloseExceptions.get(node);
    if (closeException != null) {
        closeListener.onFailure(closeException);
    } else {
        closeListener.onResponse(null);
    }
}

@Override
Expand Down