diff --git a/plugins/net_plugin/include/sysio/net_plugin/protocol.hpp b/plugins/net_plugin/include/sysio/net_plugin/protocol.hpp index 24ef3f6535..c2283b98fd 100644 --- a/plugins/net_plugin/include/sysio/net_plugin/protocol.hpp +++ b/plugins/net_plugin/include/sysio/net_plugin/protocol.hpp @@ -22,15 +22,8 @@ namespace sysio { return enc.result(); } - struct chain_size_message { - uint32_t last_irreversible_block_num = 0; - block_id_type last_irreversible_block_id; - uint32_t head_num = 0; - block_id_type head_id; - }; - struct handshake_message { - uint16_t network_version = 0; ///< incremental value above a computed base + uint16_t network_version = 0; ///< network protocol version chain_id_type chain_id; ///< used to identify chain fc::sha256 node_id; ///< used to identify peers and prevent self-connect chain::public_key_type key; ///< authentication key; may be a producer or peer key, or empty @@ -55,8 +48,6 @@ namespace sysio { wrong_chain, ///< the peer's chain id doesn't match wrong_version, ///< the peer's network version doesn't match forked, ///< the peer's irreversible blocks are different - unlinkable, ///< the peer sent a block we couldn't use - bad_transaction, ///< the peer sent a transaction that failed verification validation, ///< the peer sent a block that failed validation benign_other, ///< reasons such as a timeout. not fatal but warrant resetting fatal_other, ///< a catch-all for errors we don't have discriminated @@ -71,8 +62,6 @@ namespace sysio { case go_away_reason::wrong_chain : return "wrong chain"; case go_away_reason::wrong_version : return "wrong version"; case go_away_reason::forked : return "chain is forked"; - case go_away_reason::unlinkable : return "unlinkable block received"; - case go_away_reason::bad_transaction : return "bad transaction"; case go_away_reason::validation : return "invalid block"; case go_away_reason::authentication : return "authentication failure"; case go_away_reason::fatal_other : return "some other failure"; @@ -97,46 +86,20 @@ namespace sysio { return fmt::format("time_message(org={}, rec={}, xmt={}, dst={})", tm.org, tm.rec, tm.xmt, tm.dst); } - enum id_list_modes { - none, - catch_up, - last_irr_catch_up, - normal - }; - - constexpr auto modes_str( id_list_modes m ) { - switch( m ) { - case none : return "none"; - case catch_up : return "catch up"; - case last_irr_catch_up : return "last irreversible"; - case normal : return "normal"; - default: return "undefined mode"; - } - } - - template - struct select_ids { - select_ids() : mode(none),pending(0),ids() {} - id_list_modes mode{none}; - uint32_t pending{0}; - vector ids; - bool empty () const { return (mode == none || ids.empty()); } - bool operator==(const select_ids&) const noexcept = default; + struct peer_status_notice { + bool lib_sync{false}; + block_id_type fork_db_root_id; + block_id_type fork_db_head_id; + uint32_t earliest_available_block_num{0}; }; - using ordered_txn_ids = select_ids; - using ordered_blk_ids = select_ids; - - struct notice_message { - notice_message() : known_trx(), known_blocks() {} - ordered_txn_ids known_trx; - ordered_blk_ids known_blocks; + struct block_request_message { + block_id_type my_head_id; }; - struct request_message { - request_message() : req_trx(), req_blocks() {} - ordered_txn_ids req_trx; - ordered_blk_ids req_blocks; + struct block_nack_request_message { + block_id_type target_id; + block_id_type my_head_id; }; struct sync_request_message { @@ -194,34 +157,34 @@ namespace sysio { }; using net_message = std::variant; // see protocol net_message enum class msg_type_t { - handshake_message = fc::get_index(), - chain_size_message = fc::get_index(), - go_away_message = fc::get_index(), - time_message = fc::get_index(), - notice_message = fc::get_index(), - request_message = fc::get_index(), - sync_request_message = fc::get_index(), - signed_block = fc::get_index(), - transaction_message = fc::get_index(), - vote_message = fc::get_index(), - block_nack_message = fc::get_index(), - block_notice_message = fc::get_index(), + handshake_message = fc::get_index(), + go_away_message = fc::get_index(), + time_message = fc::get_index(), + peer_status_notice = fc::get_index(), + block_request_message = fc::get_index(), + sync_request_message = fc::get_index(), + signed_block = fc::get_index(), + transaction_message = fc::get_index(), + vote_message = fc::get_index(), + block_nack_message = fc::get_index(), + block_nack_request_message = fc::get_index(), + block_notice_message = fc::get_index(), gossip_bp_peers_message = fc::get_index(), transaction_notice_message = fc::get_index(), unknown @@ -240,10 +203,6 @@ namespace sysio { } // namespace sysio -FC_REFLECT( sysio::select_ids, (mode)(pending)(ids) ) -FC_REFLECT( sysio::chain_size_message, - (last_irreversible_block_num)(last_irreversible_block_id) - (head_num)(head_id)) FC_REFLECT( sysio::handshake_message, (network_version)(chain_id)(node_id)(key) (time)(token)(sig)(p2p_address) @@ -252,8 +211,9 @@ FC_REFLECT( sysio::handshake_message, (os)(agent)(generation) ) FC_REFLECT( sysio::go_away_message, (reason)(node_id) ) FC_REFLECT( sysio::time_message, (org)(rec)(xmt)(dst) ) -FC_REFLECT( sysio::notice_message, (known_trx)(known_blocks) ) -FC_REFLECT( sysio::request_message, (req_trx)(req_blocks) ) +FC_REFLECT( sysio::peer_status_notice, (lib_sync)(fork_db_root_id)(fork_db_head_id)(earliest_available_block_num) ) +FC_REFLECT( sysio::block_request_message, (my_head_id) ) +FC_REFLECT( sysio::block_nack_request_message, (target_id)(my_head_id) ) FC_REFLECT( sysio::sync_request_message, (start_block)(end_block) ) FC_REFLECT( sysio::block_nack_message, (id) ) FC_REFLECT( sysio::block_notice_message, (previous)(id) ) diff --git a/plugins/net_plugin/src/net_plugin.cpp b/plugins/net_plugin/src/net_plugin.cpp index 3b57c23dc6..7a8674bdcb 100644 --- a/plugins/net_plugin/src/net_plugin.cpp +++ b/plugins/net_plugin/src/net_plugin.cpp @@ -69,8 +69,8 @@ namespace sysio { struct peer_sync_state { enum class sync_t { peer_sync, // sync_request_message, syncing - peer_catchup, // head catchup, syncing request_message:catch_up - block_nack // sync due to block nack (block_notice_message) request_message:normal + peer_catchup, // head catchup, syncing block_request_message + block_nack // sync due to block nack (block_notice_message) block_nack_request_message }; peer_sync_state(uint32_t start, uint32_t end, uint32_t last_acted, sync_t sync_type) :start_block( start ), end_block( end ), last( last_acted ), sync_type( sync_type ) @@ -226,7 +226,7 @@ namespace sysio { void sync_recv_block( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, const fc::microseconds& blk_latency ); void recv_handshake( const connection_ptr& c, const handshake_message& msg, uint32_t nblk_combined_latency ); - void sync_recv_notice( const connection_ptr& c, const notice_message& msg ); + void sync_recv_status( const connection_ptr& c, const peer_status_notice& msg ); void send_handshakes_if_synced(const fc::microseconds& blk_latency); }; @@ -271,7 +271,6 @@ namespace sysio { void bcast_block( const signed_block_ptr& b, const block_id_type& id ); void expire_blocks( uint32_t fork_db_root_num ); - void recv_notice(const connection_ptr& conn, const notice_message& msg, bool generated); bool add_peer_block( const block_id_type& blkid, connection_id_t connection_id ); bool peer_has_block(const block_id_type& blkid, connection_id_t connection_id) const; @@ -296,40 +295,16 @@ namespace sysio { }; /** - * For a while, network version was a 16 bit value equal to the second set of 16 bits - * of the current build's git commit id. We are now replacing that with an integer protocol - * identifier. Based on historical analysis of all git commit identifiers, the larges gap - * between ajacent commit id values is shown below. - * these numbers were found with the following commands on the master branch: - * - * git log | grep "^commit" | awk '{print substr($2,5,4)}' | sort -u > sorted.txt - * rm -f gap.txt; prev=0; for a in $(cat sorted.txt); do echo $prev $((0x$a - 0x$prev)) $a >> gap.txt; prev=$a; done; sort -k2 -n gap.txt | tail - * - * DO NOT EDIT net_version_base OR net_version_range! - */ - constexpr uint16_t net_version_base = 0x04b5; - constexpr uint16_t net_version_range = 106; - /** - * If there is a change to network protocol or behavior, increment net version to identify - * the need for compatibility hooks + * Wire protocol version. Version 1 is the Wire genesis protocol. + * Increment for future protocol changes that need compatibility hooks. + * The version value is sent directly as handshake_message::network_version. */ enum class proto_version_t : uint16_t { - base = 0, - explicit_sync = 1, // version at time of sysio 1.0 - block_id_notify = 2, // reserved. feature was removed. next net_version should be 3 - pruned_types = 3, // sysio 2.1: supports new signed_block & packed_transaction types - heartbeat_interval = 4, // sysio 2.1: supports configurable heartbeat interval - dup_goaway_resolution = 5, // sysio 2.1: support peer address based duplicate connection resolution - dup_node_id_goaway = 6, // sysio 2.1: support peer node_id based duplicate connection resolution - leap_initial = 7, // leap client, needed because none of the 2.1 versions are supported - block_range = 8, // include block range in notice_message - savanna = 9, // savanna, adds vote_message - block_nack = 10, // adds block_nack_message & block_notice_message - gossip_bp_peers = 11, // adds gossip_bp_peers_message - trx_notice = 12 // adds transaction_notice_message + base = 1 // Wire genesis protocol (Savanna, block_nack, gossip_bp_peers, trx_notice, etc.) }; - constexpr proto_version_t net_version_max = proto_version_t::trx_notice; + constexpr proto_version_t net_version_min = proto_version_t::base; + constexpr proto_version_t net_version_max = proto_version_t::base; /** * default value initializers @@ -910,7 +885,7 @@ namespace sysio { alignas(hardware_destructive_interference_sz) std::atomic peer_syncing_from_us{false}; - std::atomic protocol_version = proto_version_t::base; + std::atomic protocol_version{static_cast(0)}; proto_version_t net_version = net_version_max; std::atomic consecutive_immediate_connection_close = 0; // bp_config = p2p-auto-bp-peer, bp_gossip = validated gossip connection, @@ -1067,7 +1042,6 @@ namespace sysio { bool is_valid( const handshake_message& msg ) const; void handle_message( const handshake_message& msg ); - void handle_message( const chain_size_message& msg ); void handle_message( const go_away_message& msg ); /** \name Peer Timestamps * Time message handling @@ -1084,8 +1058,9 @@ namespace sysio { */ void handle_message( const time_message& msg ); /** @} */ - void handle_message( const notice_message& msg ); - void handle_message( const request_message& msg ); + void handle_message( const peer_status_notice& msg ); + void handle_message( const block_request_message& msg ); + void handle_message( const block_nack_request_message& msg ); void handle_message( const sync_request_message& msg ); void handle_message( const signed_block& msg ) = delete; // signed_block_ptr overload used instead void handle_message( const block_id_type& id, signed_block_ptr ptr ); @@ -1168,12 +1143,6 @@ namespace sysio { c->handle_message( msg ); } - void operator()( const chain_size_message& msg ) const { - // continue call to handle_message on connection strand - peer_dlog( p2p_msg_log, c, "handle chain_size_message" ); - c->handle_message( msg ); - } - void operator()( const go_away_message& msg ) const { // continue call to handle_message on connection strand peer_dlog( p2p_msg_log, c, "handle go_away_message" ); @@ -1186,15 +1155,21 @@ namespace sysio { c->handle_message( msg ); } - void operator()( const notice_message& msg ) const { + void operator()( const peer_status_notice& msg ) const { + // continue call to handle_message on connection strand + peer_dlog( p2p_msg_log, c, "handle peer_status_notice" ); + c->handle_message( msg ); + } + + void operator()( const block_request_message& msg ) const { // continue call to handle_message on connection strand - peer_dlog( p2p_msg_log, c, "handle notice_message" ); + peer_dlog( p2p_msg_log, c, "handle block_request_message" ); c->handle_message( msg ); } - void operator()( const request_message& msg ) const { + void operator()( const block_nack_request_message& msg ) const { // continue call to handle_message on connection strand - peer_dlog( p2p_msg_log, c, "handle request_message" ); + peer_dlog( p2p_msg_log, c, "handle block_nack_request_message" ); c->handle_message( msg ); } @@ -1566,9 +1541,7 @@ namespace sysio { peer_dlog(p2p_blk_log, this, "head_num = {}", head_num); if(head_num == 0) { - notice_message note; - note.known_blocks.mode = normal; - note.known_blocks.pending = 0; + peer_status_notice note; enqueue(note); return; } @@ -2348,12 +2321,6 @@ namespace sysio { } } - inline block_id_type make_block_id( uint32_t block_num ) { - chain::block_id_type block_id; - block_id._hash[0] = fc::endian_reverse_u32(block_num); - return block_id; - } - // called from c's connection strand void sync_manager::recv_handshake( const connection_ptr& c, const handshake_message& msg, uint32_t nblk_combined_latency ) { @@ -2369,10 +2336,10 @@ namespace sysio { // // 0. my head block id == peer head id means we are all caught up block wise // 1. my head block num < peer froot then start sync locally by sending handshake - // 2. my froot > peer head num + nblk_combined_latency then send last_irr_catch_up notice if not the first generation + // 2. my froot > peer head num + nblk_combined_latency then send last_irr_catch_up notice // // 3 my head block num + nblk_combined_latency < peer head block num then update sync state and send a catchup request - // 4 my head block num >= peer block num + nblk_combined_latency send a notice catchup if this is not the first generation + // 4 my head block num >= peer block num + nblk_combined_latency send a notice catchup // 4.1 if peer appears to be on a different fork ( our_id_for( msg.head_num ) != msg.head_id ) // then request peer's blocks // @@ -2399,21 +2366,14 @@ namespace sysio { peer_dlog( p2p_blk_log, c, "handshake msg.froot {}, msg.fhead {}, msg.id {}.. sync 2, fhead {}, froot {}", msg.fork_db_root_num, msg.fork_db_head_num, msg.fork_db_head_id.str().substr(8,16), chain_info.fork_db_head_num, chain_info.fork_db_root_num); - if (msg.generation > 1 || c->protocol_version > proto_version_t::base) { - controller& cc = my_impl->chain_plug->chain(); - notice_message note; - note.known_trx.pending = chain_info.fork_db_root_num; - note.known_trx.mode = last_irr_catch_up; - note.known_blocks.mode = last_irr_catch_up; - note.known_blocks.pending = chain_info.fork_db_head_num; - note.known_blocks.ids.push_back(chain_info.fork_db_head_id); - if (c->protocol_version >= proto_version_t::block_range) { - // begin, more efficient to encode a block num instead of retrieving actual block id - note.known_blocks.ids.push_back(make_block_id(cc.earliest_available_block_num())); - } - c->enqueue( note ); - c->peer_syncing_from_us = true; - } + controller& cc = my_impl->chain_plug->chain(); + peer_status_notice note; + note.lib_sync = true; + note.fork_db_root_id = chain_info.fork_db_root_id; + note.fork_db_head_id = chain_info.fork_db_head_id; + note.earliest_available_block_num = cc.earliest_available_block_num(); + c->enqueue( note ); + c->peer_syncing_from_us = true; return; } @@ -2428,27 +2388,18 @@ namespace sysio { peer_dlog( p2p_blk_log, c, "handshake msg.froot {}, msg.fhead {}, msg.id {}.. sync 4, fhead {}, froot {}", msg.fork_db_root_num, msg.fork_db_head_num, msg.fork_db_head_id.str().substr(8,16), chain_info.fork_db_head_num, chain_info.fork_db_root_num); - if (msg.generation > 1 || c->protocol_version > proto_version_t::base) { - controller& cc = my_impl->chain_plug->chain(); - notice_message note; - note.known_trx.mode = none; - note.known_blocks.mode = catch_up; - note.known_blocks.pending = chain_info.fork_db_head_num; - note.known_blocks.ids.push_back(chain_info.fork_db_head_id); - if (c->protocol_version >= proto_version_t::block_range) { - // begin, more efficient to encode a block num instead of retrieving actual block id - note.known_blocks.ids.push_back(make_block_id(cc.earliest_available_block_num())); - } - c->enqueue( note ); - } + controller& cc = my_impl->chain_plug->chain(); + peer_status_notice note; + note.fork_db_root_id = chain_info.fork_db_root_id; + note.fork_db_head_id = chain_info.fork_db_head_id; + note.earliest_available_block_num = cc.earliest_available_block_num(); + c->enqueue( note ); c->peer_syncing_from_us = false; try { auto [on_fork, unknown_block] = block_on_fork(msg.fork_db_head_id); // thread safe if (on_fork) { // possible for fork_db_root to move and fork_db_head_num not be found if running with no block-log - peer_dlog(p2p_msg_log, c, "Sending catch_up request_message sync 4, msg.fhead {} on fork", msg.fork_db_head_id); - request_message req; - req.req_blocks.mode = catch_up; - req.req_trx.mode = none; + peer_dlog(p2p_msg_log, c, "Sending block_request_message sync 4, msg.fhead {} on fork", msg.fork_db_head_id); + block_request_message req; // my_head_id zero = send from root c->enqueue( req ); } } catch( ... ) {} @@ -2461,20 +2412,17 @@ namespace sysio { // called from c's connection strand bool sync_manager::verify_catchup(const connection_ptr& c, uint32_t num, const block_id_type& id) { - request_message req; - req.req_blocks.mode = catch_up; - auto is_fork_db_head_greater = [num, &id, &req]( const auto& cc ) { + bool already_have = false; + auto is_fork_db_head_greater = [num, &id, &already_have]( const auto& cc ) { fc::lock_guard g_conn( cc->conn_mtx ); if( cc->conn_fork_db_head_num > num || cc->conn_fork_db_head == id ) { - req.req_blocks.mode = none; + already_have = true; return true; } return false; }; - if (my_impl->connections.any_of_block_connections(is_fork_db_head_greater)) { - req.req_blocks.mode = none; - } - if( req.req_blocks.mode == catch_up ) { + my_impl->connections.any_of_block_connections(is_fork_db_head_greater); + if( !already_have ) { { fc::lock_guard g( sync_mtx ); peer_ilog( p2p_blk_log, c, "catch_up while in {}, fhead = {} " @@ -2494,47 +2442,48 @@ namespace sysio { c->conn_fork_db_head_num = num; } - req.req_blocks.ids.emplace_back( chain_info.fork_db_head_id ); + block_request_message req; + req.my_head_id = chain_info.fork_db_head_id; + c->enqueue( req ); } else { - peer_ilog( p2p_blk_log, c, "none notice while in {}, fhead = {}, id {}...", + peer_ilog( p2p_blk_log, c, "already have block while in {}, fhead = {}, id {}...", stage_str( sync_state ), num, id.str().substr(8,16) ); - fc::lock_guard g_conn( c->conn_mtx ); - c->conn_fork_db_head = block_id_type(); - c->conn_fork_db_head_num = 0; + { + fc::lock_guard g_conn( c->conn_mtx ); + c->conn_fork_db_head = block_id_type(); + c->conn_fork_db_head_num = 0; + } + // Send handshake so remote peer re-evaluates sync state and clears peer_syncing_from_us. + // Previously the old request_message with mode=none served this purpose. + c->send_handshake(); } - req.req_trx.mode = none; - c->enqueue( req ); return true; } // called from c's connection strand - void sync_manager::sync_recv_notice( const connection_ptr& c, const notice_message& msg) { - peer_dlog( p2p_blk_log, c, "sync_manager got {} block notice", modes_str( msg.known_blocks.mode ) ); - SYS_ASSERT( msg.known_blocks.mode == catch_up || msg.known_blocks.mode == last_irr_catch_up, plugin_exception, - "sync_recv_notice only called on catch_up" ); - if (msg.known_blocks.mode == catch_up) { - if (msg.known_blocks.ids.empty()) { - peer_wlog( p2p_blk_log, c, "got a catch up with ids size = 0" ); - } else { - const block_id_type& id = msg.known_blocks.ids.front(); - peer_ilog( p2p_blk_log, c, "notice_message, pending {}, blk_num {}, id {}...", - msg.known_blocks.pending, block_header::num_from_id(id), id.str().substr(8,16) ); - if( !my_impl->dispatcher.have_block( id ) ) { - verify_catchup( c, msg.known_blocks.pending, id ); - } else { - // we already have the block, so update peer with our view of the world - peer_dlog(p2p_blk_log, c, "Already have block, sending handshake"); - c->send_handshake(); - } - } - } else if (msg.known_blocks.mode == last_irr_catch_up) { + void sync_manager::sync_recv_status( const connection_ptr& c, const peer_status_notice& msg) { + if (msg.lib_sync) { + uint32_t root_num = block_header::num_from_id(msg.fork_db_root_id); + peer_dlog( p2p_blk_log, c, "sync_manager got lib_sync peer_status_notice" ); + c->peer_fork_db_root_num.store( root_num, std::memory_order_relaxed ); { - c->peer_fork_db_root_num.store( msg.known_trx.pending, std::memory_order_relaxed ); fc::lock_guard g_conn( c->conn_mtx ); - c->last_handshake_recv.fork_db_root_num = msg.known_trx.pending; + c->last_handshake_recv.fork_db_root_num = root_num; } sync_reset_fork_db_root_num(c, false); - start_sync(c, msg.known_trx.pending); + start_sync(c, root_num); + } else { + uint32_t head_num = block_header::num_from_id(msg.fork_db_head_id); + peer_dlog( p2p_blk_log, c, "sync_manager got catch_up peer_status_notice" ); + peer_ilog( p2p_blk_log, c, "peer_status_notice, head_num {}, id {}...", + head_num, msg.fork_db_head_id.str().substr(8,16) ); + if( !my_impl->dispatcher.have_block( msg.fork_db_head_id ) ) { + verify_catchup( c, head_num, msg.fork_db_head_id ); + } else { + // we already have the block, so update peer with our view of the world + peer_dlog(p2p_blk_log, c, "Already have block, sending handshake"); + c->send_handshake(); + } } } @@ -2888,7 +2837,7 @@ namespace sysio { return; } - if (cp->protocol_version >= proto_version_t::block_nack && !my_impl->p2p_disable_block_nack) { + if (!my_impl->p2p_disable_block_nack) { if (cp->consecutive_blocks_nacks > connection::consecutive_block_nacks_threshold) { // only send block_notice if we didn't produce the block, otherwise broadcast the block below if (!my_impl->producer_plug->producer_accounts().contains(b->producer)) { @@ -2920,7 +2869,6 @@ namespace sysio { my_impl->connections.for_each_block_connection( [exclude_peer, &msg]( auto& cp ) { if( !cp->current() ) return true; if( cp->connection_id == exclude_peer ) return true; - if (cp->protocol_version < proto_version_t::savanna) return true; fc_dlog(vote_logger, "sending vote msg, connection - {}", cp->connection_id); cp->queue_write_mt( msg_type_t::vote_message, queued_buffer::queue_t::general, msg, go_away_reason::no_reason ); return true; @@ -2935,7 +2883,7 @@ namespace sysio { if( !cp->is_transactions_connection() || !cp->current() ) { return; } - if( std::find(trx_connections.begin(), trx_connections.end(), cp->connection_id) != trx_connections.end() ) { + if( std::binary_search(trx_connections.begin(), trx_connections.end(), cp->connection_id) ) { return; } @@ -2949,7 +2897,7 @@ namespace sysio { void dispatch_manager::bcast_transaction_notify(const packed_transaction_ptr& trx) { trx_buffer_factory buff_factory; my_impl->connections.for_each_connection( [&]( const connection_ptr& cp ) { - if( cp->protocol_version < proto_version_t::trx_notice || !cp->is_transactions_connection() || !cp->current() ) { + if( !cp->is_transactions_connection() || !cp->current() ) { return; } @@ -2965,21 +2913,6 @@ namespace sysio { // keep rejected transaction around for awhile so we don't broadcast it, don't remove from local_txns } - // called from c's connection strand - void dispatch_manager::recv_notice(const connection_ptr& c, const notice_message& msg, bool generated) { - if (msg.known_trx.mode == normal) { - } else if (msg.known_trx.mode != none) { - peer_wlog( p2p_msg_log, c, "passed a notice_message with something other than a normal on none known_trx" ); - return; - } - if (msg.known_blocks.mode == normal) { - return; - } else if (msg.known_blocks.mode != none) { - peer_wlog( p2p_msg_log, c, "passed a notice_message with something other than a normal on none known_blocks" ); - return; - } - } - //------------------------------------------------------------------------ // called from connection strand @@ -3326,25 +3259,25 @@ namespace sysio { } // Early dedup: check if we already have this transaction — zero heap allocations on the duplicate path. - // Peek the transaction ID (first field after variant which) for zero-allocation dedup. + // Consume which + trx_id from buffer // Wire format: [which (varint)][transaction_id (32 bytes)][packed_transaction ...] - auto peek_ds = pending_message_buffer.create_peek_datastream(); + const auto bytes_before = pending_message_buffer.bytes_to_read(); + auto ds = pending_message_buffer.create_datastream(); unsigned_int which{}; - fc::raw::unpack( peek_ds, which ); + fc::raw::unpack( ds, which ); transaction_id_type trx_id; - fc::raw::unpack( peek_ds, trx_id ); + fc::raw::unpack( ds, trx_id ); + const auto header_bytes = bytes_before - pending_message_buffer.bytes_to_read(); + if( my_impl->dispatcher.have_peer_txn( trx_id, *this ) ) { peer_dlog( p2p_trx_log, this, "got a duplicate transaction - dropping {}", trx_id ); - pending_message_buffer.advance_read_ptr( message_length ); + pending_message_buffer.advance_read_ptr( message_length - header_bytes ); return true; } const uint32_t trx_in_progress_sz = this->trx_in_progress_size.load(); auto now = fc::time_point::now(); - auto ds = pending_message_buffer.create_datastream(); - fc::raw::unpack( ds, which ); // consume which - fc::raw::unpack( ds, trx_id ); // consume trx_id // shared_ptr needed here because packed_transaction_ptr is shared_ptr std::shared_ptr ptr = std::make_shared(); fc::raw::unpack( ds, *ptr ); @@ -3435,32 +3368,29 @@ namespace sysio { return true; } - // Early dedup: peek the vote_id (first field after variant which) for zero-allocation dedup. + // Early dedup: consume which + vote_id from buffer // Wire format: [which (varint)][vote_id (32 bytes)][vote_message ...] - auto peek_ds = pending_message_buffer.create_peek_datastream(); + const auto bytes_before = pending_message_buffer.bytes_to_read(); + auto ds = pending_message_buffer.create_datastream(); unsigned_int which{}; - fc::raw::unpack( peek_ds, which ); + fc::raw::unpack( ds, which ); assert(to_msg_type_t(which) == msg_type_t::vote_message); // verified by caller vote_id_type vote_id; - fc::raw::unpack( peek_ds, vote_id ); + fc::raw::unpack( ds, vote_id ); + const auto header_bytes = bytes_before - pending_message_buffer.bytes_to_read(); if( my_impl->dispatcher.have_vote( vote_id ) ) { peer_dlog( vote_logger, this, "duplicate vote - dropping {}", vote_id ); - pending_message_buffer.advance_read_ptr( message_length ); + pending_message_buffer.advance_read_ptr( message_length - header_bytes ); return true; } - - // Full deserialization path: consume which + vote_id, then unpack vote_message. - auto ds = pending_message_buffer.create_datastream(); - fc::raw::unpack( ds, which ); // consume which - vote_id_type consumed_id; - fc::raw::unpack( ds, consumed_id ); // consume vote_id vote_message_ptr ptr = std::make_shared(); fc::raw::unpack( ds, *ptr ); // Validate that the wire vote_id matches the actual computed vote_id. - if( compute_vote_id(*ptr) != vote_id ) { - peer_wlog( vote_logger, this, "vote_message ID mismatch: wire={} actual={}", vote_id, compute_vote_id(*ptr) ); + auto computed_vote_id = compute_vote_id(*ptr); + if( computed_vote_id != vote_id ) { + peer_wlog( vote_logger, this, "vote_message ID mismatch: wire={} actual={}", vote_id, computed_vote_id ); close(); return true; } @@ -3471,7 +3401,7 @@ namespace sysio { // called from connection strand void connection::send_block_nack(const block_id_type& block_id) { - if (protocol_version < proto_version_t::block_nack || my_impl->p2p_disable_block_nack) + if (my_impl->p2p_disable_block_nack) return; if (my_impl->sync_master->syncing_from_peer()) @@ -3590,10 +3520,6 @@ namespace sysio { return valid; } - void connection::handle_message( const chain_size_message& msg ) { - peer_dlog(p2p_msg_log, this, "received chain_size_message"); - } - // called from connection strand void connection::handle_message( const handshake_message& msg ) { if( !is_valid( msg ) ) { @@ -3613,6 +3539,13 @@ namespace sysio { set_state(connection_state::connected); if (msg.generation == 1) { + if( msg.chain_id != my_impl->chain_id ) { + peer_ilog( p2p_conn_log, this, "Peer on a different chain. Closing connection" ); + no_retry = go_away_reason::wrong_chain; + enqueue( go_away_message(go_away_reason::wrong_chain) ); + return; + } + if( msg.node_id == my_impl->node_id) { peer_ilog( p2p_conn_log, this, "Self connection detected node_id {}. Closing connection", msg.node_id); no_retry = go_away_reason::self; @@ -3675,13 +3608,14 @@ namespace sysio { return; } - if( msg.chain_id != my_impl->chain_id ) { - peer_ilog( p2p_conn_log, this, "Peer on a different chain. Closing connection" ); - no_retry = go_away_reason::wrong_chain; - enqueue( go_away_message(go_away_reason::wrong_chain) ); + protocol_version = net_plugin_impl::to_protocol_version(msg.network_version); + if( protocol_version < net_version_min ) { + peer_ilog( p2p_conn_log, this, "Peer protocol version {} below minimum {}, disconnecting", + static_cast(protocol_version.load()), static_cast(net_version_min) ); + no_retry = go_away_reason::wrong_version; + enqueue( go_away_message( go_away_reason::wrong_version ) ); return; } - protocol_version = net_plugin_impl::to_protocol_version(msg.network_version); if( protocol_version != net_version ) { peer_ilog( p2p_conn_log, this, "Local network version different: {} Remote version: {}", static_cast(net_version), static_cast(protocol_version.load()) ); @@ -3720,14 +3654,6 @@ namespace sysio { } } - // we don't support the 2.1 packed_transaction & signed_block, so tell 2.1 clients we are 2.0 - if( protocol_version >= proto_version_t::pruned_types && protocol_version < proto_version_t::leap_initial ) { - sent_handshake_count = 0; - net_version = proto_version_t::explicit_sync; - send_handshake(); - return; - } - if( sent_handshake_count == 0 ) { send_handshake(); } @@ -3776,22 +3702,6 @@ namespace sysio { close( retry ); // reconnect if wrong_version } - // some clients before leap 5.0 provided microsecond epoch instead of nanosecond epoch - std::chrono::nanoseconds normalize_epoch_to_ns(int64_t x) { - // 1686211688888 milliseconds - 2023-06-08T08:08:08.888, 5yrs from SYS genesis 2018-06-08T08:08:08.888 - // 1686211688888000 microseconds - // 1686211688888000000 nanoseconds - if (x >= 1686211688888000000) // nanoseconds - return std::chrono::nanoseconds{x}; - if (x >= 1686211688888000) // microseconds - return std::chrono::nanoseconds{x*1000}; - if (x >= 1686211688888) // milliseconds - return std::chrono::nanoseconds{x*1000*1000}; - if (x >= 1686211688) // seconds - return std::chrono::nanoseconds{x*1000*1000*1000}; - return std::chrono::nanoseconds{0}; // unknown or is zero - } - void connection::handle_message( const time_message& msg ) { peer_dlog( p2p_msg_log, this, "received time_message: {}, org: {}", msg, org.count() ); @@ -3813,7 +3723,7 @@ namespace sysio { } } - auto msg_xmt = normalize_epoch_to_ns(msg.xmt); + std::chrono::nanoseconds msg_xmt{msg.xmt}; if (msg_xmt == xmt) return; // duplicate packet @@ -3825,7 +3735,7 @@ namespace sysio { } if (org != std::chrono::nanoseconds{0}) { - auto rec = normalize_epoch_to_ns(msg.rec); + std::chrono::nanoseconds rec{msg.rec}; int64_t offset = (double((rec - org).count()) + double(msg_xmt.count() - msg.dst)) / 2.0; if (std::abs(offset) > block_interval_ns) { @@ -3847,127 +3757,33 @@ namespace sysio { } } - void connection::handle_message( const notice_message& msg ) { - // peer tells us about one or more blocks or txns. When done syncing, forward on - // notices of previously unknown blocks or txns, - // + void connection::handle_message( const peer_status_notice& msg ) { set_state(connection_state::connected); - if( msg.known_blocks.ids.size() > 2 ) { - peer_wlog( p2p_msg_log, this, "Invalid notice_message, known_blocks.ids.size {}, closing connection", - msg.known_blocks.ids.size() ); - close( false ); - return; - } - if( msg.known_trx.mode != none ) { - if( p2p_msg_log.is_enabled( fc::log_level::debug ) ) { - const block_id_type& blkid = msg.known_blocks.ids.empty() ? block_id_type{} : msg.known_blocks.ids.front(); - peer_dlog( p2p_msg_log, this, "this is a {} notice with {} pending blocks: {} {}...", - modes_str(msg.known_blocks.mode), msg.known_blocks.pending, - block_header::num_from_id( blkid ), blkid.str().substr( 8, 16 ) ); - } - } - switch (msg.known_trx.mode) { - case none: - case last_irr_catch_up: { - fc::unique_lock g_conn( conn_mtx ); - last_handshake_recv.fork_db_head_num = std::max(msg.known_blocks.pending, last_handshake_recv.fork_db_head_num); - g_conn.unlock(); - break; - } - case catch_up: - break; - case normal: { - my_impl->dispatcher.recv_notice( shared_from_this(), msg, false ); - } - } - - if( msg.known_blocks.mode != none ) { - peer_dlog( p2p_msg_log, this, "this is a {} notice with {} blocks", - modes_str( msg.known_blocks.mode ), msg.known_blocks.pending ); - } - switch (msg.known_blocks.mode) { - case none : { - break; - } - case last_irr_catch_up: - case catch_up: { - if (msg.known_blocks.ids.size() > 1) { - peer_start_block_num = block_header::num_from_id(msg.known_blocks.ids[1]); - } - if (msg.known_blocks.ids.size() > 0) { - peer_fork_db_head_block_num = block_header::num_from_id(msg.known_blocks.ids[0]); - } - my_impl->sync_master->sync_recv_notice( shared_from_this(), msg ); - break; - } - case normal : { - my_impl->dispatcher.recv_notice( shared_from_this(), msg, false ); - break; - } - default: { - peer_wlog( p2p_msg_log, this, "bad notice_message : invalid known_blocks.mode {}", - static_cast(msg.known_blocks.mode) ); - } + uint32_t head_num = block_header::num_from_id(msg.fork_db_head_id); + fc::unique_lock g_conn(conn_mtx); + last_handshake_recv.fork_db_head_num = std::max(head_num, last_handshake_recv.fork_db_head_num); + g_conn.unlock(); + if (head_num > 0) { + peer_start_block_num = msg.earliest_available_block_num; + peer_fork_db_head_block_num = head_num; + my_impl->sync_master->sync_recv_status(shared_from_this(), msg); } } - void connection::handle_message( const request_message& msg ) { - if( msg.req_blocks.ids.size() > 2 ) { - peer_wlog( p2p_blk_log, this, "Invalid request_message, req_blocks.ids.size {}, closing", - msg.req_blocks.ids.size() ); - close(); - return; - } - - switch (msg.req_blocks.mode) { - case catch_up : { - const block_id_type& id = msg.req_blocks.ids.empty() ? block_id_type() : msg.req_blocks.ids.back(); - peer_dlog( p2p_blk_log, this, "{} request_message:catch_up #{}:{}", - is_blocks_connection() ? "received" : "ignoring", block_header::num_from_id(id), id ); - if (!is_blocks_connection()) - return; - blk_send_branch( id ); - return; - } - case normal : { - if (protocol_version >= proto_version_t::block_nack) { - if (msg.req_blocks.ids.size() == 2 && msg.req_trx.ids.empty()) { - const block_id_type& req_id = msg.req_blocks.ids[0]; // 0 - req_id, 1 - peer_head_id - peer_dlog( p2p_blk_log, this, "{} request_message:normal #{}:{}", - is_blocks_connection() ? "received" : "ignoring", block_header::num_from_id(req_id), req_id ); - if (!is_blocks_connection()) - return; - const block_id_type& peer_head_id = msg.req_blocks.ids[1]; - blk_send_branch_from_nack_request(req_id, peer_head_id); - return; - } - } - peer_wlog( p2p_blk_log, this, "Invalid request_message, req_blocks.mode = normal" ); - close(); - return; - } - default:; - } - + void connection::handle_message( const block_request_message& msg ) { + peer_dlog(p2p_blk_log, this, "{} block_request_message #{}:{}", + is_blocks_connection() ? "received" : "ignoring", + block_header::num_from_id(msg.my_head_id), msg.my_head_id); + if (!is_blocks_connection()) return; + blk_send_branch(msg.my_head_id); + } - switch (msg.req_trx.mode) { - case catch_up : - break; - case none : - if( msg.req_blocks.mode == none ) { - peer_syncing_from_us = false; - } - if( !msg.req_trx.ids.empty() ) { - peer_wlog( p2p_msg_log, this, "Invalid request_message, req_trx.mode=none, req_trx.ids.size {}", msg.req_trx.ids.size() ); - close(); - } - break; - case normal : - peer_wlog( p2p_msg_log, this, "Invalid request_message, req_trx.mode=normal" ); - close(); - break; - default:; - } + void connection::handle_message( const block_nack_request_message& msg ) { + peer_dlog(p2p_blk_log, this, "{} block_nack_request_message #{}:{}", + is_blocks_connection() ? "received" : "ignoring", + block_header::num_from_id(msg.target_id), msg.target_id); + if (!is_blocks_connection()) return; + blk_send_branch_from_nack_request(msg.target_id, msg.my_head_id); } void connection::handle_message( const sync_request_message& msg ) { @@ -4051,22 +3867,21 @@ namespace sysio { my_impl->dispatcher.add_peer_block(msg.id, connection_id); } else if (!my_impl->dispatcher.have_block(msg.previous)) { // still don't have previous block peer_dlog(p2p_blk_log, this, "Received unknown block notice, checking already requested"); - request_message req; - req.req_blocks.mode = normal; - req.req_blocks.ids.push_back(msg.previous); - bool already_requested = my_impl->connections.any_of_block_connections([&req](const auto& c) { + const block_id_type& target = msg.previous; + bool already_requested = my_impl->connections.any_of_block_connections([&target](const auto& c) { fc::lock_guard g_conn(c->conn_mtx); - return c->last_block_nack_request_message_id == req.req_blocks.ids[0]; + return c->last_block_nack_request_message_id == target; }); if (!already_requested) { peer_ilog(p2p_blk_log, this, "Received unknown block notice, requesting blocks from {}", block_header::num_from_id(msg.previous)); - block_id_type head_id = my_impl->get_chain_info().head_id; - req.req_blocks.ids.push_back(head_id); + block_nack_request_message req; + req.target_id = msg.previous; + req.my_head_id = my_impl->get_chain_info().head_id; send_block_nack({}); { fc::lock_guard g_conn(conn_mtx); - last_block_nack_request_message_id = req.req_blocks.ids[0]; + last_block_nack_request_message_id = target; } enqueue(req); } @@ -4127,7 +3942,7 @@ namespace sysio { // called from connection strand void connection::send_gossip_bp_peers_initial_message() { - if (protocol_version < proto_version_t::gossip_bp_peers || !my_impl->bp_gossip_enabled()) + if (!my_impl->bp_gossip_enabled()) return; peer_dlog(p2p_msg_log, this, "sending initial gossip_bp_peers_message"); const auto& sb = my_impl->get_gossip_bp_initial_send_buffer(); @@ -4151,7 +3966,7 @@ namespace sysio { assert(my_impl->bp_gossip_enabled()); my_impl->connections.for_each_connection([](const connection_ptr& c) { gossip_buffer_factory factory; - if (c->protocol_version >= proto_version_t::gossip_bp_peers && c->socket_is_open()) { + if (c->socket_is_open()) { if (c->bp_connection == bp_connection_type::bp_gossip) { const send_buffer_type& sb = my_impl->get_gossip_bp_send_buffer(factory); boost::asio::post(c->strand, [sb, c]() { @@ -4208,8 +4023,6 @@ namespace sysio { return; controller& cc = my_impl->chain_plug->chain(); - // Wire starts in Savanna at genesis, so all blocks are Savanna blocks. - std::optional obh; bool exception = false; fork_db_add_t fork_db_add_result = fork_db_add_t::failure; @@ -4503,8 +4316,7 @@ namespace sysio { chain::public_key_type peer_key; try { peer_key = crypto::public_key::recover(msg.sig, msg.token); - } - catch (const std::exception& /*e*/) { + } catch (...) { fc_wlog( p2p_conn_log, "Peer {} sent a handshake with an unrecoverable key.", msg.p2p_address); return false; } @@ -4545,7 +4357,7 @@ namespace sysio { // nothing as changed since last handshake and one was sent recently, so skip sending if (chain_info.fork_db_head_id == hello.fork_db_head_id && (hello.time + hs_delay > now)) return false; - hello.network_version = net_version_base + static_cast(net_version); + hello.network_version = static_cast(net_version); hello.fork_db_root_num = chain_info.fork_db_root_num; hello.fork_db_root_id = chain_info.fork_db_root_id; hello.fork_db_head_num = chain_info.fork_db_head_num; @@ -5000,11 +4812,9 @@ namespace sysio { } constexpr proto_version_t net_plugin_impl::to_protocol_version(uint16_t v) { - if (v >= net_version_base) { - v -= net_version_base; - return (v > net_version_range) ? proto_version_t::base : static_cast(v); - } - return proto_version_t::base; + if (v > static_cast(net_version_max)) + return static_cast(0); + return static_cast(v); } bool net_plugin_impl::is_lib_catchup() const { diff --git a/tests/trx_generator/CMakeLists.txt b/tests/trx_generator/CMakeLists.txt index 77da92d191..127ec53bc2 100644 --- a/tests/trx_generator/CMakeLists.txt +++ b/tests/trx_generator/CMakeLists.txt @@ -1,10 +1,12 @@ add_executable(trx_generator main.cpp trx_generator.cpp trx_provider.cpp) -target_include_directories(trx_generator PUBLIC ${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}) +target_include_directories(trx_generator PUBLIC ${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/plugins/net_plugin/include) target_link_libraries(trx_generator PRIVATE sysio_chain fc Boost::program_options ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS}) add_executable(trx_generator_tests trx_generator_tests.cpp trx_provider.cpp trx_generator.cpp) target_link_libraries(trx_generator_tests PRIVATE sysio_chain fc Boost::program_options ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS}) -target_include_directories(trx_generator_tests PUBLIC ${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}) +target_include_directories(trx_generator_tests PUBLIC ${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/plugins/net_plugin/include) add_np_test(NAME trx_generator_tests COMMAND trx_generator_tests) diff --git a/tests/trx_generator/trx_provider.cpp b/tests/trx_generator/trx_provider.cpp index 03abeccdef..0b91f07aa3 100644 --- a/tests/trx_generator/trx_provider.cpp +++ b/tests/trx_generator/trx_provider.cpp @@ -1,20 +1,23 @@ #include #include +#include +#include + #include #include #include #include -#include namespace sysio::testing { using namespace boost::asio; using namespace std::literals::string_literals; - using ip::tcp; + using boost::asio::ip::tcp; constexpr auto message_header_size = sizeof(uint32_t); - constexpr uint32_t trx_msg_which = 8; // transaction_message in net_message variant + constexpr uint32_t trx_msg_which = sysio::to_index(sysio::msg_type_t::transaction_message); + static_assert(trx_msg_which == fc::get_index()); static send_buffer_type create_send_buffer( const chain::packed_transaction& m ) { // Build wire bytes matching transaction_message: [size][which][trx_id][packed_transaction]