
transport: log network reconnects with same peer process #128415


Open. Wants to merge 4 commits into base: main. Changes from 3 commits.

@@ -15,6 +15,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.ClusterConnectionManager;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportLogger;

@@ -27,7 +28,7 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {

public void setUp() throws Exception {
super.setUp();
mockLog = MockLog.capture(ESLoggingHandler.class, TransportLogger.class, TcpTransport.class);
mockLog = MockLog.capture(ESLoggingHandler.class, TransportLogger.class, TcpTransport.class, ClusterConnectionManager.class);
}

public void tearDown() throws Exception {
@@ -17,15 +17,20 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.common.ReferenceDocs;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
Expand All @@ -35,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

@@ -79,12 +85,14 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {

private final TimeValue reconnectInterval;
private volatile ConnectionChecker connectionChecker;
private final ConnectionHistory connectionHistory;

@Inject
public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) {
this.threadPool = threadPool;
this.transportService = transportService;
this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
this.connectionHistory = new ConnectionHistory();
}

/**
@@ -101,6 +109,7 @@ public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion)
final List<Runnable> runnables = new ArrayList<>(discoveryNodes.getSize());
try (var refs = new RefCountingRunnable(onCompletion)) {
synchronized (mutex) {
connectionHistory.reserveConnectionHistoryForNodes(discoveryNodes);
// Ugly hack: when https://github.com/elastic/elasticsearch/issues/94946 is fixed, just iterate over discoveryNodes here
for (final Iterator<DiscoveryNode> iterator = discoveryNodes.mastersFirstStream().iterator(); iterator.hasNext();) {
final DiscoveryNode discoveryNode = iterator.next();
@@ -137,6 +146,7 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
nodesToDisconnect.remove(discoveryNode);
}

connectionHistory.removeConnectionHistoryForNodes(nodesToDisconnect);
for (final DiscoveryNode discoveryNode : nodesToDisconnect) {
runnables.add(targetsByNode.remove(discoveryNode)::disconnect);
}
@@ -347,4 +357,113 @@ public String toString() {
}
}
}

private class ConnectionHistory {
Reviewer comment:
Yeah I like the look of this. Maybe ConnectionHistory implements TransportConnectionListener rather than having another layer of indirection?

Also this needs to be covered in NodeConnectionsServiceTests.
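
A minimal sketch of the shape that suggestion could take, assuming ConnectionHistory registers itself as the listener; the method bodies here are placeholders, not the PR's implementation:

    private class ConnectionHistory implements TransportConnectionListener {

        ConnectionHistory() {
            // register this object directly, removing the anonymous-listener indirection
            NodeConnectionsService.this.transportService.addConnectionListener(this);
        }

        @Override
        public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
            // same reconnect-detection logic as the anonymous listener below
        }

        @Override
        public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
            // same close-listener registration as the anonymous listener below
        }
    }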

record NodeConnectionHistory(String ephemeralId, long disconnectTime, Exception disconnectCause) {}

/**
* Maps each DiscoveryNode's nodeId to its connection history record.
*
* Entries for each node are reserved during NodeConnectionsService.connectToNodes, by placing a (nodeId, dummy) entry
* for each node in the cluster. On node disconnect, this entry is updated with its NodeConnectionHistory. On node
* connect, this entry is reset to the dummy value. On NodeConnectionsService.disconnectFromNodesExcept, node entries
* are removed.
*
* Each node in the cluster always has a nodeHistory entry that is either the dummy value or a connection history record. This
* allows node disconnect callbacks to discard their entry if the disconnect occurred because of a change in cluster state.
*/
private final NodeConnectionHistory dummy = new NodeConnectionHistory("", 0, null);
Reviewer comment:
Can be static I think, it's a global constant. We tend to name global constants in SHOUTY_SNAKE_CASE reflecting their meaning, so here I'd suggest CONNECTED or CONNECTED_MARKER or something like that. This way you get to say nodeConnectionHistory != CONNECTED_MARKER below which makes it clearer to the reader what this predicate means.
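
For illustration, a sketch of that suggestion (assuming the constant can be hoisted to a static context), with the predicate rewritten against it:

    // static sentinel meaning "currently connected, no disconnect recorded"
    private static final NodeConnectionHistory CONNECTED_MARKER = new NodeConnectionHistory("", 0, null);

    // the check in onNodeConnected then reads:
    if (nodeConnectionHistory != null
        && nodeConnectionHistory != CONNECTED_MARKER
        && nodeConnectionHistory.ephemeralId().equals(node.getEphemeralId())) {
        // ... log the reconnect-without-restart warning ...
    }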

Reviewer comment:
nit: also looks like the javadoc is for the nodeHistory field

private final ConcurrentMap<String, NodeConnectionHistory> nodeHistory = ConcurrentCollections.newConcurrentMap();

ConnectionHistory() {
NodeConnectionsService.this.transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
// log case where the remote node has same ephemeralId as its previous connection
// (the network was disrupted, but not the remote process)
NodeConnectionHistory nodeConnectionHistory = nodeHistory.get(node.getId());
if (nodeConnectionHistory != null) {
nodeHistory.replace(node.getId(), nodeConnectionHistory, dummy);
}
Reviewer comment:
This looks a little racy, although in practice I think it's fine because ClusterConnectionManager protects against opening multiple connections to the same node concurrently. Still, if we did all this (including the logging) within a nodeHistory.compute(node.getId(), ...) then there'd obviously be no races.
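
A rough sketch of that compute-based variant, with the check, the logging, and the reset folded into one atomic update. CONNECTED_MARKER is the sentinel suggested above, and logReconnectWithoutRestart is a hypothetical helper wrapping the warning below:

    nodeHistory.compute(node.getId(), (nodeId, previous) -> {
        if (previous != null
            && previous != CONNECTED_MARKER
            && previous.ephemeralId().equals(node.getEphemeralId())) {
            // same ephemeral ID as before the disconnect: the remote process did not restart,
            // so emit the warning inside the atomic remapping function
            logReconnectWithoutRestart(node, previous); // hypothetical helper
        }
        // absent stays absent (node no longer tracked); otherwise mark as connected again
        return previous == null ? null : CONNECTED_MARKER;
    });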


if (nodeConnectionHistory != null
&& nodeConnectionHistory != dummy
&& nodeConnectionHistory.ephemeralId.equals(node.getEphemeralId())) {
if (nodeConnectionHistory.disconnectCause != null) {
logger.warn(
() -> format(
"reopened transport connection to node [%s] "
+ "which disconnected exceptionally [%dms] ago but did not "
+ "restart, so the disconnection is unexpected; "
+ "if unexpected, see [{}] for troubleshooting guidance",
Reviewer comment:
No need for if unexpected here, I think the point is that this situation is always unexpected.

node.descriptionWithoutAttributes(),
nodeConnectionHistory.disconnectTime,
Reviewer comment:
This'll show the absolute disconnect time in milliseconds (i.e. since 1970) whereas I think we want to see the duration between the disconnect and the current time.
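
A small sketch of reporting the elapsed duration instead, using the same thread pool clock the disconnect path already records:

    // milliseconds since the recorded disconnect, rather than the epoch timestamp itself
    final long millisSinceDisconnect = threadPool.absoluteTimeInMillis() - nodeConnectionHistory.disconnectTime;

The [%dms] argument would then be millisSinceDisconnect rather than nodeConnectionHistory.disconnectTime.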

ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
),
nodeConnectionHistory.disconnectCause
);
} else {
logger.warn(
"""
reopened transport connection to node [{}] \
which disconnected gracefully [{}ms] ago but did not \
restart, so the disconnection is unexpected; \
if unexpected, see [{}] for troubleshooting guidance""",
node.descriptionWithoutAttributes(),
nodeConnectionHistory.disconnectTime,
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
);
}
}
}

@Override
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
Reviewer comment:
I just spotted we're already executing this in a close-listener, but one that runs under ActionListener.running(...) so it drops the exception. I think it'd be nicer to adjust this callback to take a @Nullable Exception e parameter rather than having to add a second close listener just to pick up the exception as done here.
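
For illustration, if that suggestion were taken and the disconnect callback carried the close exception directly (a hypothetical signature change, not the current interface), the handler could collapse to something like:

    @Override
    public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection, @Nullable Exception closeException) {
        // closeException is null for a graceful close; no second close listener is needed
        nodeHistory.computeIfPresent(
            node.getId(),
            (nodeId, previous) -> new NodeConnectionHistory(node.getEphemeralId(), threadPool.absoluteTimeInMillis(), closeException)
        );
    }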

connection.addCloseListener(new ActionListener<Void>() {
@Override
public void onResponse(Void ignored) {
insertNodeConnectionHistory(null);
}

@Override
public void onFailure(Exception e) {
insertNodeConnectionHistory(e);
}

private void insertNodeConnectionHistory(@Nullable Exception e) {
final long disconnectTime = threadPool.absoluteTimeInMillis();
final NodeConnectionHistory nodeConnectionHistory = new NodeConnectionHistory(
node.getEphemeralId(),
disconnectTime,
e
);
final String nodeId = node.getId();
NodeConnectionHistory previousConnectionHistory = nodeHistory.get(nodeId);
if (previousConnectionHistory != null) {
nodeHistory.replace(nodeId, previousConnectionHistory, nodeConnectionHistory);
}
}
});
}
});
}

void reserveConnectionHistoryForNodes(DiscoveryNodes nodes) {
Reviewer comment (@nicktindall, May 30, 2025):
nit: I wonder if this should be called something like startTrackingConnectionHistory (and the other method stop...), the "reserving" language seems like an implementation detail leaking?

I do like the implementation though, nice approach to fixing the race.

for (DiscoveryNode node : nodes) {
nodeHistory.put(node.getId(), dummy);
Reviewer comment:
This might need to be putIfAbsent so we don't over-write any actual current NodeConnectionHistory entries right?

Author reply:
I'm not sure. My read was these two calls would come from cluster state changing to add or remove nodes from this table. Inclusion is controlled by these calls, which unconditionally add or remove entries. The close callback has to be careful to check if it has an entry that's valid: this protects against long-running callbacks inserting garbage into the table.

Reviewer comment:
The DiscoveryNodes passed to connectToNodes contains all the nodes in the cluster, including any existing ones, so if there's a node which already exists in the cluster, and is currently disconnected, then it will have an entry in nodeHistory which isn't dummy that this line will overwrite on any cluster state update. So yeah I think putIfAbsent is what we want here.

Author reply:
I get it now -- for whatever reason, I thought it was passing in the deltas, but it's obvious from connectToNodes that the node connections service is doing that calculation.
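
Concretely, the suggested change to the reservation is a one-line swap (sketch):

    // keep any NodeConnectionHistory already recorded for a node that is still in the cluster;
    // only seed the connected marker for nodes that are not tracked yet
    nodeHistory.putIfAbsent(node.getId(), dummy);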

}
}

void removeConnectionHistoryForNodes(Set<DiscoveryNode> nodes) {
final int startSize = nodeHistory.size();
for (DiscoveryNode node : nodes) {
nodeHistory.remove(node.getId());
Reviewer comment:
There's kind of an implicit invariant here that org.elasticsearch.cluster.NodeConnectionsService.ConnectionHistory#nodeHistory and org.elasticsearch.cluster.NodeConnectionsService#targetsByNode have the same keys. At the very least we should be able to assert this. I also wonder if we should be calling nodeHistory.retainAll() to make it super-clear that we are keeping these keysets aligned.

But then that got me thinking, maybe we should be tracking the connection history of each target node in ConnectionTarget rather than trying to maintain two parallel maps. Could that work?

Author reply:
This is a great idea... ConnectionTarget has exactly the lifecycle needed. I think because I moved it from elsewhere and am having a rough week over here, this didn't occur to me.
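
If the two parallel maps are kept rather than folding the history into ConnectionTarget, the keyset invariant mentioned above could be asserted roughly as follows (a sketch; assumes java.util.stream.Collectors is imported and both maps are read under the same mutex):

    // nodeHistory and targetsByNode should always track exactly the same set of node IDs
    assert nodeHistory.keySet()
        .equals(targetsByNode.keySet().stream().map(DiscoveryNode::getId).collect(Collectors.toSet()))
        : "connection history out of sync with connection targets";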

}
logger.trace("Connection history garbage-collected from {} to {} entries", startSize, nodeHistory.size());
}

int connectionHistorySize() {
return nodeHistory.size();
}
}
}
@@ -235,25 +235,48 @@ private void connectToNodeOrRetry(
managerRefs.decRef();
}));

conn.addCloseListener(ActionListener.running(() -> {
if (connectingRefCounter.hasReferences() == false) {
logger.trace("connection manager shut down, closing transport connection to [{}]", node);
} else if (conn.hasReferences()) {
logger.info(
"""
transport connection to [{}] closed by remote; \
if unexpected, see [{}] for troubleshooting guidance""",
node.descriptionWithoutAttributes(),
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
);
// In production code we only close connections via ref-counting, so this message confirms that a
// 'node-left ... reason: disconnected' event was caused by external factors. Put differently, if a
// node leaves the cluster with "reason: disconnected" but without this message being logged then
// that's a bug.
} else {
logger.debug("closing unused transport connection to [{}]", node);
conn.addCloseListener(new ActionListener<Void>() {
@Override
public void onResponse(Void ignored) {
if (connectingRefCounter.hasReferences() == false) {
logger.trace("connection manager shut down, closing transport connection to [{}]", node);
} else if (conn.hasReferences()) {
logger.info(
"""
transport connection to [{}] closed by remote; \
if unexpected, see [{}] for troubleshooting guidance""",
node.descriptionWithoutAttributes(),
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
);
// In production code we only close connections via ref-counting, so this message confirms that
// a 'node-left ... reason: disconnected' event was caused by external factors. Put
// differently, if a node leaves the cluster with "reason: disconnected" but without this
// message being logged then that's a bug.
} else {
logger.debug("closing unused transport connection to [{}]", node);
}
}
}));

@Override
public void onFailure(Exception e) {
if (conn.hasReferences()) {
logger.warn(
"""
transport connection to [{}] closed by remote with exception [{}]; \
if unexpected, see [{}] for troubleshooting guidance""",
Reviewer comment:
I think this isn't guaranteed to be a WARN worthy event - if the node shut down then we might get a Connection reset or similar but that's not something that needs action, and we do log those exceptions elsewhere. On reflection I'd rather leave the logging in ClusterConnectionManager alone in this PR and just look at the new logs from the NodeConnectionsService.

node.descriptionWithoutAttributes(),
e,
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
);
} else {
logger.debug(
"closing unused transport connection to [{}], exception [{}]",
node.descriptionWithoutAttributes(),
e
);
Reviewer comment:
It looks like previously we would only have logged at debug level in this scenario? unless I'm reading it wrong. I'm not sure how interesting this case is (as we were disconnecting from the node anyway)?

}
}
});
}
}
} finally {
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.transport;

import org.apache.logging.log4j.Level;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;

@ESIntegTestCase.ClusterScope(numDataNodes = 2, scope = ESIntegTestCase.Scope.TEST)
public class ClusterConnectionManagerIntegTests extends ESIntegTestCase {
Reviewer comment:
nit: ESIntegTestCase tests should have names ending in IT and be in the internalClusterTest source set. But as mentioned in my previous comment we probably don't want to change this here.

private MockLog mockLog;

public void setUp() throws Exception {
super.setUp();
mockLog = MockLog.capture(ClusterConnectionManager.class);
}

public void tearDown() throws Exception {
mockLog.close();
super.tearDown();
}

@TestLogging(
value = "org.elasticsearch.transport.ClusterConnectionManager:WARN",
reason = "to ensure we log cluster manager disconnect events on WARN level"
)
public void testExceptionalDisconnectLoggingInClusterConnectionManager() throws Exception {
mockLog.addExpectation(
new MockLog.PatternSeenEventExpectation(
"cluster connection manager exceptional disconnect log",
ClusterConnectionManager.class.getCanonicalName(),
Level.WARN,
"transport connection to \\[.*\\] closed (by remote )?with exception .*"
)
);

final String nodeName = internalCluster().startNode();
internalCluster().restartNode(nodeName);

mockLog.assertAllExpectationsMatched();
}
}