Skip to content

Commit 2305643

Browse files
committed
Merge bitcoin/bitcoin#27257: refactor, net: End friendship of CNode, CConnman and ConnmanTestMsg
3566aa7 [net] Remove CNode friends (dergoegge) 3eac5e7 [net] Add CNode helper for send byte accounting (dergoegge) 60441a3 scripted-diff: [net] Rename CNode process queue members (dergoegge) 6693c49 [net] Make cs_vProcessMsg a non-recursive mutex (dergoegge) 23d9352 [net] Make CNode msg process queue members private (dergoegge) 897e342 [net] Encapsulate CNode message polling (dergoegge) cc5cdf8 [net] Deduplicate marking received message for processing (dergoegge) ad44aa5 [net] Add connection type getter to CNode (dergoegge) Pull request description: We should define clear interfaces between CNode, CConnman and PeerManager. This PR makes a small step in that direction by ending the friendship of CNode, CConnman and ConnmanTestMsg. CNode's message processing queue is made private in the process and its mutex is turned into a non-recursive mutex. ACKs for top commit: jnewbery: utACK 3566aa7 vasild: ACK 3566aa7 theStack: re-ACK 3566aa7 brunoerg: re-ACK 3566aa7 Tree-SHA512: 26b87da5054e32401b693b2904e9c5f40e35a53937c0b6cf44b8597034ad07bacf27d87cdffc54d3e7ccfebde4231ef30a38d326f88cc18133bbb34688ead567
2 parents 381593c + 3566aa7 commit 2305643

File tree

4 files changed

+73
-48
lines changed

4 files changed

+73
-48
lines changed

src/net.cpp

+36-16
Original file line numberDiff line numberDiff line change
@@ -917,7 +917,7 @@ bool CConnman::AttemptToEvictConnection()
917917
.m_is_local = node->addr.IsLocal(),
918918
.m_network = node->ConnectedThroughNetwork(),
919919
.m_noban = node->HasPermission(NetPermissionFlags::NoBan),
920-
.m_conn_type = node->m_conn_type,
920+
.m_conn_type = node->GetConnectionType(),
921921
};
922922
vEvictionCandidates.push_back(candidate);
923923
}
@@ -1092,7 +1092,7 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
10921092

10931093
// Count existing connections
10941094
int existing_connections = WITH_LOCK(m_nodes_mutex,
1095-
return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; }););
1095+
return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->GetConnectionType() == conn_type; }););
10961096

10971097
// Max connections of specified type already exist
10981098
if (max_connections != std::nullopt && existing_connections >= max_connections) return false;
@@ -1328,18 +1328,7 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
13281328
}
13291329
RecordBytesRecv(nBytes);
13301330
if (notify) {
1331-
size_t nSizeAdded = 0;
1332-
for (const auto& msg : pnode->vRecvMsg) {
1333-
// vRecvMsg contains only completed CNetMessage
1334-
// the single possible partially deserialized message are held by TransportDeserializer
1335-
nSizeAdded += msg.m_raw_message_size;
1336-
}
1337-
{
1338-
LOCK(pnode->cs_vProcessMsg);
1339-
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg);
1340-
pnode->nProcessQueueSize += nSizeAdded;
1341-
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
1342-
}
1331+
pnode->MarkReceivedMsgsForProcessing(nReceiveFloodSize);
13431332
WakeMessageHandler();
13441333
}
13451334
}
@@ -1722,7 +1711,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
17221711
if (pnode->IsBlockOnlyConn()) nOutboundBlockRelay++;
17231712

17241713
// Make sure our persistent outbound slots belong to different netgroups.
1725-
switch (pnode->m_conn_type) {
1714+
switch (pnode->GetConnectionType()) {
17261715
// We currently don't take inbound connections into account. Since they are
17271716
// free to make, an attacker could make them to prevent us from connecting to
17281717
// certain peers.
@@ -2806,6 +2795,37 @@ CNode::CNode(NodeId idIn,
28062795
}
28072796
}
28082797

2798+
void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
2799+
{
2800+
AssertLockNotHeld(m_msg_process_queue_mutex);
2801+
2802+
size_t nSizeAdded = 0;
2803+
for (const auto& msg : vRecvMsg) {
2804+
// vRecvMsg contains only completed CNetMessage
2805+
// the single possible partially deserialized message are held by TransportDeserializer
2806+
nSizeAdded += msg.m_raw_message_size;
2807+
}
2808+
2809+
LOCK(m_msg_process_queue_mutex);
2810+
m_msg_process_queue.splice(m_msg_process_queue.end(), vRecvMsg);
2811+
m_msg_process_queue_size += nSizeAdded;
2812+
fPauseRecv = m_msg_process_queue_size > recv_flood_size;
2813+
}
2814+
2815+
std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage(size_t recv_flood_size)
2816+
{
2817+
LOCK(m_msg_process_queue_mutex);
2818+
if (m_msg_process_queue.empty()) return std::nullopt;
2819+
2820+
std::list<CNetMessage> msgs;
2821+
// Just take one message
2822+
msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin());
2823+
m_msg_process_queue_size -= msgs.front().m_raw_message_size;
2824+
fPauseRecv = m_msg_process_queue_size > recv_flood_size;
2825+
2826+
return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty());
2827+
}
2828+
28092829
bool CConnman::NodeFullyConnected(const CNode* pnode)
28102830
{
28112831
return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
@@ -2840,7 +2860,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
28402860
bool optimisticSend(pnode->vSendMsg.empty());
28412861

28422862
//log total amount of bytes per message type
2843-
pnode->mapSendBytesPerMsgType[msg.m_type] += nTotalSize;
2863+
pnode->AccountForSentBytes(msg.m_type, nTotalSize);
28442864
pnode->nSendSize += nTotalSize;
28452865

28462866
if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true;

src/net.h

+28-7
Original file line numberDiff line numberDiff line change
@@ -347,9 +347,6 @@ struct CNodeOptions
347347
/** Information about a peer */
348348
class CNode
349349
{
350-
friend class CConnman;
351-
friend struct ConnmanTestMsg;
352-
353350
public:
354351
const std::unique_ptr<TransportDeserializer> m_deserializer; // Used only by SocketHandler thread
355352
const std::unique_ptr<const TransportSerializer> m_serializer;
@@ -376,10 +373,6 @@ class CNode
376373
Mutex m_sock_mutex;
377374
Mutex cs_vRecv;
378375

379-
RecursiveMutex cs_vProcessMsg;
380-
std::list<CNetMessage> vProcessMsg GUARDED_BY(cs_vProcessMsg);
381-
size_t nProcessQueueSize GUARDED_BY(cs_vProcessMsg){0};
382-
383376
uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
384377

385378
std::atomic<std::chrono::seconds> m_last_send{0s};
@@ -417,6 +410,30 @@ class CNode
417410
std::atomic_bool fPauseRecv{false};
418411
std::atomic_bool fPauseSend{false};
419412

413+
const ConnectionType& GetConnectionType() const
414+
{
415+
return m_conn_type;
416+
}
417+
418+
/** Move all messages from the received queue to the processing queue. */
419+
void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
420+
EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
421+
422+
/** Poll the next message from the processing queue of this connection.
423+
*
424+
* Returns std::nullopt if the processing queue is empty, or a pair
425+
* consisting of the message and a bool that indicates if the processing
426+
* queue has more entries. */
427+
std::optional<std::pair<CNetMessage, bool>> PollMessage(size_t recv_flood_size)
428+
EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
429+
430+
/** Account for the total size of a sent message in the per msg type connection stats. */
431+
void AccountForSentBytes(const std::string& msg_type, size_t sent_bytes)
432+
EXCLUSIVE_LOCKS_REQUIRED(cs_vSend)
433+
{
434+
mapSendBytesPerMsgType[msg_type] += sent_bytes;
435+
}
436+
420437
bool IsOutboundOrBlockRelayConn() const {
421438
switch (m_conn_type) {
422439
case ConnectionType::OUTBOUND_FULL_RELAY:
@@ -602,6 +619,10 @@ class CNode
602619

603620
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
604621

622+
Mutex m_msg_process_queue_mutex;
623+
std::list<CNetMessage> m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex);
624+
size_t m_msg_process_queue_size GUARDED_BY(m_msg_process_queue_mutex){0};
625+
605626
// Our address, as reported by the peer
606627
CService addrLocal GUARDED_BY(m_addr_local_mutex);
607628
mutable Mutex m_addr_local_mutex;

src/net_processing.cpp

+8-13
Original file line numberDiff line numberDiff line change
@@ -4860,8 +4860,6 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
48604860
{
48614861
AssertLockHeld(g_msgproc_mutex);
48624862

4863-
bool fMoreWork = false;
4864-
48654863
PeerRef peer = GetPeerRef(pfrom->GetId());
48664864
if (peer == nullptr) return false;
48674865

@@ -4889,17 +4887,14 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
48894887
// Don't bother if send buffer is too full to respond anyway
48904888
if (pfrom->fPauseSend) return false;
48914889

4892-
std::list<CNetMessage> msgs;
4893-
{
4894-
LOCK(pfrom->cs_vProcessMsg);
4895-
if (pfrom->vProcessMsg.empty()) return false;
4896-
// Just take one message
4897-
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
4898-
pfrom->nProcessQueueSize -= msgs.front().m_raw_message_size;
4899-
pfrom->fPauseRecv = pfrom->nProcessQueueSize > m_connman.GetReceiveFloodSize();
4900-
fMoreWork = !pfrom->vProcessMsg.empty();
4901-
}
4902-
CNetMessage& msg(msgs.front());
4890+
auto poll_result{pfrom->PollMessage(m_connman.GetReceiveFloodSize())};
4891+
if (!poll_result) {
4892+
// No message to process
4893+
return false;
4894+
}
4895+
4896+
CNetMessage& msg{poll_result->first};
4897+
bool fMoreWork = poll_result->second;
49034898

49044899
TRACE6(net, inbound_message,
49054900
pfrom->GetId(),

src/test/util/net.cpp

+1-12
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,7 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by
6666
{
6767
assert(node.ReceiveMsgBytes(msg_bytes, complete));
6868
if (complete) {
69-
size_t nSizeAdded = 0;
70-
for (const auto& msg : node.vRecvMsg) {
71-
// vRecvMsg contains only completed CNetMessage
72-
// the single possible partially deserialized message are held by TransportDeserializer
73-
nSizeAdded += msg.m_raw_message_size;
74-
}
75-
{
76-
LOCK(node.cs_vProcessMsg);
77-
node.vProcessMsg.splice(node.vProcessMsg.end(), node.vRecvMsg);
78-
node.nProcessQueueSize += nSizeAdded;
79-
node.fPauseRecv = node.nProcessQueueSize > nReceiveFloodSize;
80-
}
69+
node.MarkReceivedMsgsForProcessing(nReceiveFloodSize);
8170
}
8271
}
8372

0 commit comments

Comments
 (0)