diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs
index f55ad77a8a7..57f8f12ec48 100644
--- a/zebra-rpc/src/methods.rs
+++ b/zebra-rpc/src/methods.rs
@@ -341,6 +341,24 @@ pub trait Rpc {
         address_strings: AddressStrings,
     ) -> Result<Vec<GetAddressUtxos>>;
 
+    /// Invalidates a block if it is not yet finalized, removing it from the non-finalized
+    /// state if it is present and rejecting it during contextual validation if it is resubmitted.
+    ///
+    /// # Parameters
+    ///
+    /// - `block_hash`: (hex-encoded block hash, required) The block hash to invalidate.
+    // TODO: Invalidate block hashes even if they're not present in the non-finalized state.
+    #[method(name = "invalidateblock")]
+    async fn invalidate_block(&self, block_hash: GetBlockHash) -> Result<()>;
+
+    /// Reconsiders a previously invalidated block if it exists in the cache of previously invalidated blocks.
+    ///
+    /// # Parameters
+    ///
+    /// - `block_hash`: (hex-encoded block hash, required) The block hash to reconsider.
+    #[method(name = "reconsiderblock")]
+    async fn reconsider_block(&self, block_hash: GetBlockHash) -> Result<()>;
+
     /// Stop the running zebrad process.
     ///
     /// # Notes
@@ -357,7 +375,7 @@ pub trait Rpc {
 
 /// RPC method implementations.
 #[derive(Clone)]
-pub struct RpcImpl<Mempool, State, Tip, AddressBook>
+pub struct RpcImpl<Mempool, State, ReadState, Tip, AddressBook>
 where
     Mempool: Service<
             mempool::Request,
@@ -369,6 +387,15 @@ where
         + 'static,
     Mempool::Future: Send,
     State: Service<
+            zebra_state::Request,
+            Response = zebra_state::Response,
+            Error = zebra_state::BoxError,
+        > + Clone
+        + Send
+        + Sync
+        + 'static,
+    State::Future: Send,
+    ReadState: Service<
             zebra_state::ReadRequest,
             Response = zebra_state::ReadResponse,
             Error = zebra_state::BoxError,
@@ -376,7 +403,7 @@ where
         + Send
         + Sync
         + 'static,
-    State::Future: Send,
+    ReadState::Future: Send,
     Tip: ChainTip + Clone + Send + Sync + 'static,
     AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
 {
@@ -407,6 +434,9 @@ where
     /// A handle to the state service.
     state: State,
 
+    /// A handle to the read state service.
+    read_state: ReadState,
+
     /// Allows efficient access to the best tip of the blockchain.
     latest_chain_tip: Tip,
 
@@ -425,7 +455,8 @@
 /// A type alias for the last event logged by the server.
pub type LoggedLastEvent = watch::Receiver)>>; -impl Debug for RpcImpl +impl Debug + for RpcImpl where Mempool: Service< mempool::Request, @@ -437,6 +468,15 @@ where + 'static, Mempool::Future: Send, State: Service< + zebra_state::Request, + Response = zebra_state::Response, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + State::Future: Send, + ReadState: Service< zebra_state::ReadRequest, Response = zebra_state::ReadResponse, Error = zebra_state::BoxError, @@ -444,7 +484,7 @@ where + Send + Sync + 'static, - State::Future: Send, + ReadState::Future: Send, Tip: ChainTip + Clone + Send + Sync + 'static, AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, { @@ -460,7 +500,8 @@ where } } -impl RpcImpl +impl + RpcImpl where Mempool: Service< mempool::Request, @@ -472,6 +513,15 @@ where + 'static, Mempool::Future: Send, State: Service< + zebra_state::Request, + Response = zebra_state::Response, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + State::Future: Send, + ReadState: Service< zebra_state::ReadRequest, Response = zebra_state::ReadResponse, Error = zebra_state::BoxError, @@ -479,7 +529,7 @@ where + Send + Sync + 'static, - State::Future: Send, + ReadState::Future: Send, Tip: ChainTip + Clone + Send + Sync + 'static, AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, { @@ -496,6 +546,7 @@ where debug_like_zcashd: bool, mempool: Mempool, state: State, + read_state: ReadState, latest_chain_tip: Tip, address_book: AddressBook, last_warn_error_log_rx: LoggedLastEvent, @@ -521,7 +572,8 @@ where debug_force_finished_sync, debug_like_zcashd, mempool: mempool.clone(), - state: state.clone(), + state, + read_state: read_state.clone(), latest_chain_tip: latest_chain_tip.clone(), queue_sender, address_book, @@ -531,7 +583,7 @@ where // run the process queue let rpc_tx_queue_task_handle = tokio::spawn( runner - .run(mempool, state, latest_chain_tip, network) + .run(mempool, read_state, latest_chain_tip, network) .in_current_span(), ); @@ -540,7 +592,8 @@ where } #[async_trait] -impl RpcServer for RpcImpl +impl RpcServer + for RpcImpl where Mempool: Service< mempool::Request, @@ -552,6 +605,15 @@ where + 'static, Mempool::Future: Send, State: Service< + zebra_state::Request, + Response = zebra_state::Response, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + State::Future: Send, + ReadState: Service< zebra_state::ReadRequest, Response = zebra_state::ReadResponse, Error = zebra_state::BoxError, @@ -559,7 +621,7 @@ where + Send + Sync + 'static, - State::Future: Send, + ReadState::Future: Send, Tip: ChainTip + Clone + Send + Sync + 'static, AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, { @@ -620,7 +682,7 @@ where let (usage_info_rsp, tip_pool_values_rsp, chain_tip_difficulty) = { use zebra_state::ReadRequest::*; - let state_call = |request| self.state.clone().oneshot(request); + let state_call = |request| self.read_state.clone().oneshot(request); tokio::join!( state_call(UsageInfo), state_call(TipPoolValues), @@ -741,7 +803,7 @@ where } async fn get_address_balance(&self, address_strings: AddressStrings) -> Result { - let state = self.state.clone(); + let state = self.read_state.clone(); let valid_addresses = address_strings.valid_addresses()?; @@ -824,7 +886,7 @@ where // - use `height_from_signed_int()` to handle negative heights // (this might be better in the state request, because it needs the state height) async fn get_block(&self, hash_or_height: String, verbosity: 
Option) -> Result { - let mut state = self.state.clone(); + let mut state = self.read_state.clone(); let verbosity = verbosity.unwrap_or(1); let network = self.network.clone(); let original_hash_or_height = hash_or_height.clone(); @@ -1008,7 +1070,7 @@ where hash_or_height: String, verbose: Option, ) -> Result { - let state = self.state.clone(); + let state = self.read_state.clone(); let verbose = verbose.unwrap_or(true); let network = self.network.clone(); @@ -1232,7 +1294,7 @@ where txid: String, verbose: Option, ) -> Result { - let mut state = self.state.clone(); + let mut state = self.read_state.clone(); let mut mempool = self.mempool.clone(); let verbose = verbose.unwrap_or(0) != 0; @@ -1305,7 +1367,7 @@ where // - use `height_from_signed_int()` to handle negative heights // (this might be better in the state request, because it needs the state height) async fn z_get_treestate(&self, hash_or_height: String) -> Result { - let mut state = self.state.clone(); + let mut state = self.read_state.clone(); let network = self.network.clone(); let hash_or_height = @@ -1394,7 +1456,7 @@ where start_index: NoteCommitmentSubtreeIndex, limit: Option, ) -> Result { - let mut state = self.state.clone(); + let mut state = self.read_state.clone(); const POOL_LIST: &[&str] = &["sapling", "orchard"]; @@ -1460,7 +1522,7 @@ where } async fn get_address_tx_ids(&self, request: GetAddressTxIdsRequest) -> Result> { - let mut state = self.state.clone(); + let mut state = self.read_state.clone(); let latest_chain_tip = self.latest_chain_tip.clone(); let height_range = build_height_range( @@ -1515,7 +1577,7 @@ where &self, address_strings: AddressStrings, ) -> Result> { - let mut state = self.state.clone(); + let mut state = self.read_state.clone(); let mut response_utxos = vec![]; let valid_addresses = address_strings.valid_addresses()?; @@ -1567,6 +1629,24 @@ where Ok(response_utxos) } + async fn invalidate_block(&self, GetBlockHash(block_hash): GetBlockHash) -> Result<()> { + self.state + .clone() + .oneshot(zebra_state::Request::InvalidateBlock(block_hash)) + .await + .map(|rsp| assert_eq!(rsp, zebra_state::Response::Committed(block_hash))) + .map_misc_error() + } + + async fn reconsider_block(&self, GetBlockHash(block_hash): GetBlockHash) -> Result<()> { + self.state + .clone() + .oneshot(zebra_state::Request::ReconsiderBlock(block_hash)) + .await + .map(|rsp| assert_eq!(rsp, zebra_state::Response::Committed(block_hash))) + .map_misc_error() + } + fn stop(&self) -> Result { #[cfg(not(target_os = "windows"))] if self.network.is_regtest() { diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index dbac40d3fb3..d5c0bf5bf7a 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -938,6 +938,14 @@ fn mock_services( zebra_node_services::mempool::Response, zebra_test::mock_service::PropTestAssertion, >, + tower::buffer::Buffer< + zebra_test::mock_service::MockService< + zebra_state::Request, + zebra_state::Response, + zebra_test::mock_service::PropTestAssertion, + >, + zebra_state::Request, + >, tower::buffer::Buffer< zebra_test::mock_service::MockService< zebra_state::ReadRequest, @@ -956,6 +964,7 @@ where { let mempool = MockService::build().for_prop_tests(); let state = MockService::build().for_prop_tests(); + let read_state = MockService::build().for_prop_tests(); let (_tx, rx) = tokio::sync::watch::channel(None); let (rpc, mempool_tx_queue) = RpcImpl::new( @@ -965,11 +974,12 @@ where false, true, mempool.clone(), - 
Buffer::new(state.clone(), 1), + Buffer::new(state, 1), + Buffer::new(read_state.clone(), 1), chain_tip, MockAddressBookPeers::new(vec![]), rx, ); - (mempool, state, rpc, mempool_tx_queue) + (mempool, read_state, rpc, mempool_tx_queue) } diff --git a/zebra-rpc/src/methods/tests/snapshot.rs b/zebra-rpc/src/methods/tests/snapshot.rs index ab32234fccd..87561740254 100644 --- a/zebra-rpc/src/methods/tests/snapshot.rs +++ b/zebra-rpc/src/methods/tests/snapshot.rs @@ -105,7 +105,7 @@ async fn test_z_get_treestate() { .map(|(_, block_bytes)| block_bytes.zcash_deserialize_into().unwrap()) .collect(); - let (_, state, tip, _) = zebra_state::populated_state(blocks.clone(), &testnet).await; + let (state, read_state, tip, _) = zebra_state::populated_state(blocks.clone(), &testnet).await; let (_tx, rx) = tokio::sync::watch::channel(None); let (rpc, _) = RpcImpl::new( @@ -116,6 +116,7 @@ async fn test_z_get_treestate() { true, Buffer::new(MockService::build().for_unit_tests::<_, _, BoxError>(), 1), state, + read_state, tip, MockAddressBookPeers::new(vec![]), rx, @@ -192,7 +193,7 @@ async fn test_rpc_response_data_for_network(network: &Network) { get_block_template_rpcs::test_responses( network, mempool.clone(), - state, + state.clone(), read_state.clone(), settings.clone(), ) @@ -207,6 +208,7 @@ async fn test_rpc_response_data_for_network(network: &Network) { false, true, Buffer::new(mempool.clone(), 1), + state, read_state, latest_chain_tip, MockAddressBookPeers::new(vec![]), @@ -527,7 +529,8 @@ async fn test_mocked_rpc_response_data_for_network(network: &Network) { settings.set_snapshot_suffix(network_string(network)); let (latest_chain_tip, _) = MockChainTip::new(); - let mut state = MockService::build().for_unit_tests(); + let state = MockService::build().for_unit_tests(); + let mut read_state = MockService::build().for_unit_tests(); let mempool = MockService::build().for_unit_tests(); let (_tx, rx) = tokio::sync::watch::channel(None); @@ -539,6 +542,7 @@ async fn test_mocked_rpc_response_data_for_network(network: &Network) { true, mempool, state.clone(), + read_state.clone(), latest_chain_tip, MockAddressBookPeers::new(vec![]), rx, @@ -556,7 +560,7 @@ async fn test_mocked_rpc_response_data_for_network(network: &Network) { } // Prepare the response. - let rsp = state + let rsp = read_state .expect_request_that(|req| matches!(req, ReadRequest::SaplingSubtrees { .. })) .map(|responder| responder.respond(ReadResponse::SaplingSubtrees(subtrees))); @@ -584,7 +588,7 @@ async fn test_mocked_rpc_response_data_for_network(network: &Network) { } // Prepare the response. - let rsp = state + let rsp = read_state .expect_request_that(|req| matches!(req, ReadRequest::OrchardSubtrees { .. 
})) .map(|responder| responder.respond(ReadResponse::OrchardSubtrees(subtrees))); diff --git a/zebra-rpc/src/methods/tests/vectors.rs b/zebra-rpc/src/methods/tests/vectors.rs index 057ff93c5f3..dccb8f06816 100644 --- a/zebra-rpc/src/methods/tests/vectors.rs +++ b/zebra-rpc/src/methods/tests/vectors.rs @@ -28,6 +28,7 @@ async fn rpc_getinfo() { let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); + let mut read_state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let (_tx, rx) = tokio::sync::watch::channel(None); let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( @@ -38,6 +39,7 @@ async fn rpc_getinfo() { true, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), + Buffer::new(read_state.clone(), 1), NoChainTip, MockAddressBookPeers::new(vec![]), rx, @@ -46,7 +48,7 @@ async fn rpc_getinfo() { let getinfo_future = tokio::spawn(async move { rpc.get_info().await }); // Make the mock service respond with - let response_handler = state + let response_handler = read_state .expect_request(zebra_state::ReadRequest::ChainInfo) .await; response_handler.respond(zebra_state::ReadResponse::ChainInfo( @@ -140,7 +142,7 @@ async fn rpc_getblock() { let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); // Create a populated state service - let (_state, read_state, latest_chain_tip, _chain_tip_change) = + let (state, read_state, latest_chain_tip, _chain_tip_change) = zebra_state::populated_state(blocks.clone(), &Mainnet).await; // Init RPC @@ -152,6 +154,7 @@ async fn rpc_getblock() { false, true, Buffer::new(mempool.clone(), 1), + state, read_state.clone(), latest_chain_tip, MockAddressBookPeers::new(vec![]), @@ -581,6 +584,7 @@ async fn rpc_getblock_parse_error() { let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); + let mut read_state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); // Init RPC let (_tx, rx) = tokio::sync::watch::channel(None); @@ -592,6 +596,7 @@ async fn rpc_getblock_parse_error() { true, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), + Buffer::new(read_state.clone(), 1), NoChainTip, MockAddressBookPeers::new(vec![]), rx, @@ -615,6 +620,7 @@ async fn rpc_getblock_parse_error() { mempool.expect_no_requests().await; state.expect_no_requests().await; + read_state.expect_no_requests().await; // The queue task should continue without errors or panics let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); @@ -627,6 +633,7 @@ async fn rpc_getblock_missing_error() { let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); + let mut read_state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); // Init RPC let (_tx, rx) = tokio::sync::watch::channel(None); @@ -638,6 +645,7 @@ async fn rpc_getblock_missing_error() { true, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), + Buffer::new(read_state.clone(), 1), NoChainTip, MockAddressBookPeers::new(vec![]), rx, @@ -648,7 +656,7 @@ async fn rpc_getblock_missing_error() { let block_future = tokio::spawn(async move { rpc.get_block("0".to_string(), Some(0u8)).await }); // Make the mock service respond with no block - let 
response_handler = state + let response_handler = read_state .expect_request(zebra_state::ReadRequest::Block(Height(0).into())) .await; response_handler.respond(zebra_state::ReadResponse::Block(None)); @@ -672,6 +680,7 @@ async fn rpc_getblock_missing_error() { mempool.expect_no_requests().await; state.expect_no_requests().await; + read_state.expect_no_requests().await; // The queue task should continue without errors or panics let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); @@ -690,7 +699,7 @@ async fn rpc_getblockheader() { let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); // Create a populated state service - let (_state, read_state, latest_chain_tip, _chain_tip_change) = + let (state, read_state, latest_chain_tip, _chain_tip_change) = zebra_state::populated_state(blocks.clone(), &Mainnet).await; // Init RPC @@ -702,6 +711,7 @@ async fn rpc_getblockheader() { false, true, Buffer::new(mempool.clone(), 1), + state, read_state.clone(), latest_chain_tip, MockAddressBookPeers::new(vec![]), @@ -819,7 +829,7 @@ async fn rpc_getbestblockhash() { // Get a mempool handle let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); // Create a populated state service, the tip will be in `NUMBER_OF_BLOCKS`. - let (_state, read_state, latest_chain_tip, _chain_tip_change) = + let (state, read_state, latest_chain_tip, _chain_tip_change) = zebra_state::populated_state(blocks.clone(), &Mainnet).await; // Init RPC @@ -831,6 +841,7 @@ async fn rpc_getbestblockhash() { false, true, Buffer::new(mempool.clone(), 1), + state, read_state, latest_chain_tip, MockAddressBookPeers::new(vec![]), @@ -865,7 +876,7 @@ async fn rpc_getrawtransaction() { let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); // Create a populated state service - let (_state, read_state, _latest_chain_tip, _chain_tip_change) = + let (state, read_state, _latest_chain_tip, _chain_tip_change) = zebra_state::populated_state(blocks.clone(), &Mainnet).await; let (latest_chain_tip, latest_chain_tip_sender) = MockChainTip::new(); @@ -880,6 +891,7 @@ async fn rpc_getrawtransaction() { false, true, Buffer::new(mempool.clone(), 1), + state, read_state.clone(), latest_chain_tip, MockAddressBookPeers::new(vec![]), @@ -1052,7 +1064,7 @@ async fn rpc_getaddresstxids_invalid_arguments() { .collect(); // Create a populated state service - let (_state, read_state, latest_chain_tip, _chain_tip_change) = + let (state, read_state, latest_chain_tip, _chain_tip_change) = zebra_state::populated_state(blocks.clone(), &Mainnet).await; let (_tx, rx) = tokio::sync::watch::channel(None); @@ -1063,6 +1075,7 @@ async fn rpc_getaddresstxids_invalid_arguments() { false, true, Buffer::new(mempool.clone(), 1), + state, Buffer::new(read_state.clone(), 1), latest_chain_tip, MockAddressBookPeers::new(vec![]), @@ -1239,6 +1252,7 @@ async fn rpc_getaddresstxids_response_with( expected_response_len: usize, ) { let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); + let state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let (_tx, rx) = tokio::sync::watch::channel(None); let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( @@ -1248,6 +1262,7 @@ async fn rpc_getaddresstxids_response_with( false, true, Buffer::new(mempool.clone(), 1), + state, Buffer::new(read_state.clone(), 1), latest_chain_tip.clone(), MockAddressBookPeers::new(vec![]), @@ -1294,6 +1309,7 @@ async fn 
rpc_getaddressutxos_invalid_arguments() { let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); + let mut read_state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let (_tx, rx) = tokio::sync::watch::channel(None); let rpc = RpcImpl::new( @@ -1304,6 +1320,7 @@ async fn rpc_getaddressutxos_invalid_arguments() { true, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), + Buffer::new(read_state.clone(), 1), NoChainTip, MockAddressBookPeers::new(vec![]), rx, @@ -1320,6 +1337,7 @@ async fn rpc_getaddressutxos_invalid_arguments() { mempool.expect_no_requests().await; state.expect_no_requests().await; + read_state.expect_no_requests().await; } #[tokio::test(flavor = "multi_thread")] @@ -1340,7 +1358,7 @@ async fn rpc_getaddressutxos_response() { let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); // Create a populated state service - let (_state, read_state, latest_chain_tip, _chain_tip_change) = + let (state, read_state, latest_chain_tip, _chain_tip_change) = zebra_state::populated_state(blocks.clone(), &Mainnet).await; let (_tx, rx) = tokio::sync::watch::channel(None); @@ -1351,6 +1369,7 @@ async fn rpc_getaddressutxos_response() { false, true, Buffer::new(mempool.clone(), 1), + state, Buffer::new(read_state.clone(), 1), latest_chain_tip, MockAddressBookPeers::new(vec![]), diff --git a/zebra-rpc/src/server.rs b/zebra-rpc/src/server.rs index 15245ed7c49..730e8ae8de4 100644 --- a/zebra-rpc/src/server.rs +++ b/zebra-rpc/src/server.rs @@ -99,6 +99,7 @@ impl RpcServer { UserAgentString, Mempool, State, + ReadState, Tip, BlockVerifierRouter, SyncStatus, @@ -110,6 +111,7 @@ impl RpcServer { user_agent: UserAgentString, mempool: Mempool, state: State, + read_state: ReadState, block_verifier_router: BlockVerifierRouter, sync_status: SyncStatus, address_book: AddressBook, @@ -131,6 +133,15 @@ impl RpcServer { + 'static, Mempool::Future: Send, State: Service< + zebra_state::Request, + Response = zebra_state::Response, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + State::Future: Send, + ReadState: Service< zebra_state::ReadRequest, Response = zebra_state::ReadResponse, Error = zebra_state::BoxError, @@ -138,7 +149,7 @@ impl RpcServer { + Send + Sync + 'static, - State::Future: Send, + ReadState::Future: Send, Tip: ChainTip + Clone + Send + Sync + 'static, BlockVerifierRouter: Service< zebra_consensus::Request, @@ -161,7 +172,7 @@ impl RpcServer { &network, mining_config.clone(), mempool.clone(), - state.clone(), + read_state.clone(), latest_chain_tip.clone(), block_verifier_router, sync_status, @@ -178,6 +189,7 @@ impl RpcServer { mining_config.debug_like_zcashd, mempool, state, + read_state, latest_chain_tip, address_book, last_event, diff --git a/zebra-rpc/src/server/tests/vectors.rs b/zebra-rpc/src/server/tests/vectors.rs index 183f54d23b9..5af5fd20b70 100644 --- a/zebra-rpc/src/server/tests/vectors.rs +++ b/zebra-rpc/src/server/tests/vectors.rs @@ -39,6 +39,7 @@ async fn rpc_server_spawn() { let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); + let mut read_state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let mut block_verifier_router: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); @@ -52,6 +53,7 @@ 
async fn rpc_server_spawn() { "RPC server test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), + Buffer::new(read_state.clone(), 1), Buffer::new(block_verifier_router.clone(), 1), MockSyncStatus::default(), MockAddressBookPeers::default(), @@ -65,6 +67,7 @@ async fn rpc_server_spawn() { mempool.expect_no_requests().await; state.expect_no_requests().await; + read_state.expect_no_requests().await; block_verifier_router.expect_no_requests().await; } @@ -101,6 +104,7 @@ async fn rpc_spawn_unallocated_port(do_shutdown: bool) { let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); + let mut read_state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let mut block_verifier_router: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); @@ -114,6 +118,7 @@ async fn rpc_spawn_unallocated_port(do_shutdown: bool) { "RPC server test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), + Buffer::new(read_state.clone(), 1), Buffer::new(block_verifier_router.clone(), 1), MockSyncStatus::default(), MockAddressBookPeers::default(), @@ -129,6 +134,7 @@ async fn rpc_spawn_unallocated_port(do_shutdown: bool) { mempool.expect_no_requests().await; state.expect_no_requests().await; + read_state.expect_no_requests().await; block_verifier_router.expect_no_requests().await; if do_shutdown { @@ -159,6 +165,7 @@ async fn rpc_server_spawn_port_conflict() { let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); + let mut read_state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let mut block_verifier_router: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); @@ -172,6 +179,7 @@ async fn rpc_server_spawn_port_conflict() { "RPC server 1 test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), + Buffer::new(read_state.clone(), 1), Buffer::new(block_verifier_router.clone(), 1), MockSyncStatus::default(), MockAddressBookPeers::default(), @@ -193,6 +201,7 @@ async fn rpc_server_spawn_port_conflict() { "RPC server 2 conflict test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), + Buffer::new(read_state.clone(), 1), Buffer::new(block_verifier_router.clone(), 1), MockSyncStatus::default(), MockAddressBookPeers::default(), @@ -207,5 +216,6 @@ async fn rpc_server_spawn_port_conflict() { mempool.expect_no_requests().await; state.expect_no_requests().await; + read_state.expect_no_requests().await; block_verifier_router.expect_no_requests().await; } diff --git a/zebra-state/src/request.rs b/zebra-state/src/request.rs index 25f865dff69..3a111edfb4a 100644 --- a/zebra-state/src/request.rs +++ b/zebra-state/src/request.rs @@ -860,6 +860,13 @@ pub enum Request { /// Returns [`Response::KnownBlock(None)`](Response::KnownBlock) otherwise. KnownBlock(block::Hash), + /// Invalidates a block in the non-finalized state with the provided hash if one is present, removing it and + /// its child blocks, and rejecting it during contextual validation if it's resubmitted to the state. + InvalidateBlock(block::Hash), + + /// Reconsiders a previously invalidated block in the non-finalized state with the provided hash if one is present. + ReconsiderBlock(block::Hash), + /// Performs contextual validation of the given block, but does not commit it to the state. 
/// /// Returns [`Response::ValidBlockProposal`] when successful. @@ -890,6 +897,8 @@ impl Request { Request::BestChainNextMedianTimePast => "best_chain_next_median_time_past", Request::BestChainBlockHash(_) => "best_chain_block_hash", Request::KnownBlock(_) => "known_block", + Request::InvalidateBlock(_) => "invalidate_block", + Request::ReconsiderBlock(_) => "reconsider_block", Request::CheckBlockProposalValidity(_) => "check_block_proposal_validity", } } @@ -1266,7 +1275,9 @@ impl TryFrom for ReadRequest { } Request::CommitSemanticallyVerifiedBlock(_) - | Request::CommitCheckpointVerifiedBlock(_) => Err("ReadService does not write blocks"), + | Request::CommitCheckpointVerifiedBlock(_) + | Request::InvalidateBlock(_) + | Request::ReconsiderBlock(_) => Err("ReadService does not write blocks"), Request::AwaitUtxo(_) => Err("ReadService does not track pending UTXOs. \ Manually convert the request to ReadRequest::AnyChainUtxo, \ diff --git a/zebra-state/src/response.rs b/zebra-state/src/response.rs index 48132dffa4f..212acafc306 100644 --- a/zebra-state/src/response.rs +++ b/zebra-state/src/response.rs @@ -28,7 +28,9 @@ use crate::{service::read::AddressUtxos, TransactionLocation}; /// A response to a [`StateService`](crate::service::StateService) [`Request`]. pub enum Response { /// Response to [`Request::CommitSemanticallyVerifiedBlock`] indicating that a block was - /// successfully committed to the state. + /// successfully committed to the state, or a response to [`Request::InvalidateBlock`] or + /// [`Request::ReconsiderBlock`] indicating that the state attempted to send a message to + /// the block write task to invalidate or reconsider the block hash. Committed(block::Hash), /// Response to [`Request::Depth`] with the depth of the specified block. diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 5130b6aca9a..2217f1e0ffc 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -32,6 +32,7 @@ use tracing::{instrument, Instrument, Span}; #[cfg(any(test, feature = "proptest-impl"))] use tower::buffer::Buffer; +use write::NonFinalizedWriteMessage; use zebra_chain::{ block::{self, CountedHeader, HeightDiff}, diagnostic::{task::WaitForPanics, CodeTimer}, @@ -125,18 +126,8 @@ pub(crate) struct StateService { /// Indexed by their parent block hash. finalized_state_queued_blocks: HashMap, - /// A channel to send blocks to the `block_write_task`, - /// so they can be written to the [`NonFinalizedState`]. - non_finalized_block_write_sender: - Option>, - - /// A channel to send blocks to the `block_write_task`, - /// so they can be written to the [`FinalizedState`]. - /// - /// This sender is dropped after the state has finished sending all the checkpointed blocks, - /// and the lowest semantically verified block arrives. - finalized_block_write_sender: - Option>, + /// Channels to send blocks to the block write task. + block_write_sender: write::BlockWriteSender, /// The [`block::Hash`] of the most recent block sent on /// `finalized_block_write_sender` or `non_finalized_block_write_sender`. @@ -236,8 +227,8 @@ impl Drop for StateService { // We want to do this here so we get any errors or panics from the block write task before it shuts down. 
self.invalid_block_write_reset_receiver.close(); - std::mem::drop(self.finalized_block_write_sender.take()); - std::mem::drop(self.non_finalized_block_write_sender.take()); + std::mem::drop(self.block_write_sender.finalized.take()); + std::mem::drop(self.block_write_sender.non_finalized.take()); self.clear_finalized_block_queue( "dropping the state: dropped unused finalized state queue block", @@ -334,35 +325,18 @@ impl StateService { let (non_finalized_state_sender, non_finalized_state_receiver) = watch::channel(NonFinalizedState::new(&finalized_state.network())); - // Security: The number of blocks in these channels is limited by - // the syncer and inbound lookahead limits. - let (non_finalized_block_write_sender, non_finalized_block_write_receiver) = - tokio::sync::mpsc::unbounded_channel(); - let (finalized_block_write_sender, finalized_block_write_receiver) = - tokio::sync::mpsc::unbounded_channel(); - let (invalid_block_reset_sender, invalid_block_write_reset_receiver) = - tokio::sync::mpsc::unbounded_channel(); - let finalized_state_for_writing = finalized_state.clone(); - let span = Span::current(); - let block_write_task = std::thread::spawn(move || { - span.in_scope(move || { - write::write_blocks_from_channels( - finalized_block_write_receiver, - non_finalized_block_write_receiver, - finalized_state_for_writing, - non_finalized_state, - invalid_block_reset_sender, - chain_tip_sender, - non_finalized_state_sender, - ) - }) - }); - let block_write_task = Arc::new(block_write_task); + let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) = + write::BlockWriteSender::spawn( + finalized_state_for_writing, + non_finalized_state, + chain_tip_sender, + non_finalized_state_sender, + ); let read_service = ReadStateService::new( &finalized_state, - Some(block_write_task), + block_write_task, non_finalized_state_receiver, ); @@ -381,8 +355,7 @@ impl StateService { full_verifier_utxo_lookahead, non_finalized_state_queued_blocks, finalized_state_queued_blocks: HashMap::new(), - non_finalized_block_write_sender: Some(non_finalized_block_write_sender), - finalized_block_write_sender: Some(finalized_block_write_sender), + block_write_sender, finalized_block_write_last_sent_hash, non_finalized_block_write_sent_hashes: SentHashes::default(), invalid_block_write_reset_receiver, @@ -458,7 +431,7 @@ impl StateService { let (rsp_tx, rsp_rx) = oneshot::channel(); let queued = (checkpoint_verified, rsp_tx); - if self.finalized_block_write_sender.is_some() { + if self.block_write_sender.finalized.is_some() { // We're still committing checkpoint verified blocks if let Some(duplicate_queued) = self .finalized_state_queued_blocks @@ -545,7 +518,7 @@ impl StateService { // If we've finished sending finalized blocks, ignore any repeated blocks. // (Blocks can be repeated after a syncer reset.) - if let Some(finalized_block_write_sender) = &self.finalized_block_write_sender { + if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized { let send_result = finalized_block_write_sender.send(queued_block); // If the receiver is closed, we can't send any more blocks. @@ -673,7 +646,7 @@ impl StateService { // that are a child of the last block we sent. // // TODO: configure the state with the last checkpoint hash instead? 
- if self.finalized_block_write_sender.is_some() + if self.block_write_sender.finalized.is_some() && self .non_finalized_state_queued_blocks .has_queued_children(self.finalized_block_write_last_sent_hash) @@ -682,7 +655,7 @@ impl StateService { { // Tell the block write task to stop committing checkpoint verified blocks to the finalized state, // and move on to committing semantically verified blocks to the non-finalized state. - std::mem::drop(self.finalized_block_write_sender.take()); + std::mem::drop(self.block_write_sender.finalized.take()); // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`. self.non_finalized_block_write_sent_hashes = SentHashes::default(); // Mark `SentHashes` as usable by the `can_fork_chain_at()` method. @@ -697,7 +670,7 @@ impl StateService { ); } else if !self.can_fork_chain_at(&parent_hash) { tracing::trace!("unready to verify, returning early"); - } else if self.finalized_block_write_sender.is_none() { + } else if self.block_write_sender.finalized.is_none() { // Wait until block commit task is ready to write non-finalized blocks before dequeuing them self.send_ready_non_finalized_queued(parent_hash); @@ -738,7 +711,7 @@ impl StateService { #[tracing::instrument(level = "debug", skip(self, new_parent))] fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) { use tokio::sync::mpsc::error::SendError; - if let Some(non_finalized_block_write_sender) = &self.non_finalized_block_write_sender { + if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized { let mut new_parents: Vec = vec![new_parent]; while let Some(parent_hash) = new_parents.pop() { @@ -751,9 +724,9 @@ impl StateService { self.non_finalized_block_write_sent_hashes .add(&queued_child.0); - let send_result = non_finalized_block_write_sender.send(queued_child); + let send_result = non_finalized_block_write_sender.send(queued_child.into()); - if let Err(SendError(queued)) = send_result { + if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result { // If Zebra is shutting down, drop blocks and return an error. Self::send_semantically_verified_block_error( queued, @@ -1084,6 +1057,24 @@ impl Service for StateService { .boxed() } + req @ (Request::InvalidateBlock(_) | Request::ReconsiderBlock(_)) => { + use NonFinalizedWriteMessage::*; + + let (msg, hash) = match req { + Request::InvalidateBlock(hash) => (Invalidate(hash), hash), + Request::ReconsiderBlock(hash) => (Reconsider(hash), hash), + _ => unreachable!("Request::InvalidateBlock and Request::ReconsiderBlock"), + }; + + if let Some(non_finalized_block_write_sender) = + &self.block_write_sender.non_finalized + { + let _ = non_finalized_block_write_sender.send(msg); + } + + async move { Ok(Response::Committed(hash)) }.boxed() + } + // Runs concurrently using the ReadStateService Request::Tip | Request::Depth(_) diff --git a/zebra-state/src/service/write.rs b/zebra-state/src/service/write.rs index acbc5c14ce1..17a1ed47642 100644 --- a/zebra-state/src/service/write.rs +++ b/zebra-state/src/service/write.rs @@ -1,11 +1,14 @@ //! Writing blocks to the finalized and non-finalized states. 
+use std::sync::Arc;
+
 use indexmap::IndexMap;
 use tokio::sync::{
     mpsc::{UnboundedReceiver, UnboundedSender},
     watch,
 };
+use tracing::Span;
 
 use zebra_chain::{
     block::{self, Height},
     transparent::EXTRA_ZEBRA_COINBASE_DATA,
 };
@@ -110,221 +113,343 @@ fn update_latest_chain_channels(
     tip_block_height
 }
 
-/// Reads blocks from the channels, writes them to the `finalized_state` or `non_finalized_state`,
-/// sends any errors on the `invalid_block_reset_sender`, then updates the `chain_tip_sender` and
-/// `non_finalized_state_sender`.
-// TODO: make the task an object
-#[allow(clippy::too_many_arguments)]
-#[instrument(
-    level = "debug",
-    skip(
-        finalized_block_write_receiver,
-        non_finalized_block_write_receiver,
-        finalized_state,
-        non_finalized_state,
-        invalid_block_reset_sender,
-        chain_tip_sender,
-        non_finalized_state_sender,
-    ),
-    fields(
-        network = %non_finalized_state.network
-    )
-)]
-pub fn write_blocks_from_channels(
-    mut finalized_block_write_receiver: UnboundedReceiver<QueuedCheckpointVerified>,
-    mut non_finalized_block_write_receiver: UnboundedReceiver<QueuedSemanticallyVerified>,
-    mut finalized_state: FinalizedState,
-    mut non_finalized_state: NonFinalizedState,
+/// A worker task that reads, validates, and writes blocks to the
+/// `finalized_state` or `non_finalized_state`.
+struct WriteBlockWorkerTask {
+    finalized_block_write_receiver: UnboundedReceiver<QueuedCheckpointVerified>,
+    non_finalized_block_write_receiver: UnboundedReceiver<NonFinalizedWriteMessage>,
+    finalized_state: FinalizedState,
+    non_finalized_state: NonFinalizedState,
     invalid_block_reset_sender: UnboundedSender<block::Hash>,
-    mut chain_tip_sender: ChainTipSender,
+    chain_tip_sender: ChainTipSender,
     non_finalized_state_sender: watch::Sender<NonFinalizedState>,
-) {
-    let mut last_zebra_mined_log_height = None;
-    let mut prev_finalized_note_commitment_trees = None;
+}
+
+/// The message type for the non-finalized block write task channel.
+pub enum NonFinalizedWriteMessage {
+    /// A newly downloaded and semantically verified block prepared for
+    /// contextual validation and insertion into the non-finalized state.
+    Commit(QueuedSemanticallyVerified),
+    /// The hash of a block that should be invalidated and removed from
+    /// the non-finalized state, if present.
+    Invalidate(block::Hash),
+    /// The hash of a block that was previously invalidated but should be
+    /// reconsidered and reinserted into the non-finalized state.
+    Reconsider(block::Hash),
+}
+
+impl From<QueuedSemanticallyVerified> for NonFinalizedWriteMessage {
+    fn from(block: QueuedSemanticallyVerified) -> Self {
+        NonFinalizedWriteMessage::Commit(block)
+    }
+}
+
+/// Channels for sending blocks to a worker task that reads, validates, and
+/// writes them to the `finalized_state` or `non_finalized_state`.
+#[derive(Clone, Debug)]
+pub(super) struct BlockWriteSender {
+    /// A channel to send blocks to the `block_write_task`,
+    /// so they can be written to the [`NonFinalizedState`].
+    pub non_finalized: Option<UnboundedSender<NonFinalizedWriteMessage>>,
+
+    /// A channel to send blocks to the `block_write_task`,
+    /// so they can be written to the [`FinalizedState`].
+    ///
+    /// This sender is dropped after the state has finished sending all the checkpointed blocks,
+    /// and the lowest semantically verified block arrives.
+    pub finalized: Option<UnboundedSender<QueuedCheckpointVerified>>,
+}
+
+impl BlockWriteSender {
+    /// Spawns a block write task with its channels, returning a [`BlockWriteSender`] for
+    /// sending it blocks, the invalid block reset receiver, and a handle to the spawned task.
+ #[instrument( + level = "debug", + skip_all, + fields( + network = %non_finalized_state.network + ) + )] + pub fn spawn( + finalized_state: FinalizedState, + non_finalized_state: NonFinalizedState, + chain_tip_sender: ChainTipSender, + non_finalized_state_sender: watch::Sender, + ) -> ( + Self, + tokio::sync::mpsc::UnboundedReceiver, + Option>>, + ) { + // Security: The number of blocks in these channels is limited by + // the syncer and inbound lookahead limits. + let (non_finalized_block_write_sender, non_finalized_block_write_receiver) = + tokio::sync::mpsc::unbounded_channel(); + let (finalized_block_write_sender, finalized_block_write_receiver) = + tokio::sync::mpsc::unbounded_channel(); + let (invalid_block_reset_sender, invalid_block_write_reset_receiver) = + tokio::sync::mpsc::unbounded_channel(); + + let span = Span::current(); + let task = std::thread::spawn(move || { + span.in_scope(|| { + WriteBlockWorkerTask { + finalized_block_write_receiver, + non_finalized_block_write_receiver, + finalized_state, + non_finalized_state, + invalid_block_reset_sender, + chain_tip_sender, + non_finalized_state_sender, + } + .run() + }) + }); + + ( + Self { + non_finalized: Some(non_finalized_block_write_sender), + finalized: Some(finalized_block_write_sender), + }, + invalid_block_write_reset_receiver, + Some(Arc::new(task)), + ) + } +} + +impl WriteBlockWorkerTask { + /// Reads blocks from the channels, writes them to the `finalized_state` or `non_finalized_state`, + /// sends any errors on the `invalid_block_reset_sender`, then updates the `chain_tip_sender` and + /// `non_finalized_state_sender`. + #[instrument( + level = "debug", + skip(self), + fields( + network = %self.non_finalized_state.network + ) + )] + pub fn run(mut self) { + let Self { + finalized_block_write_receiver, + non_finalized_block_write_receiver, + finalized_state, + non_finalized_state, + invalid_block_reset_sender, + chain_tip_sender, + non_finalized_state_sender, + } = &mut self; + + let mut last_zebra_mined_log_height = None; + let mut prev_finalized_note_commitment_trees = None; + + // Write all the finalized blocks sent by the state, + // until the state closes the finalized block channel's sender. + while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() { + // TODO: split these checks into separate functions + + if invalid_block_reset_sender.is_closed() { + info!("StateService closed the block reset channel. Is Zebra shutting down?"); + return; + } + + // Discard any children of invalid blocks in the channel + // + // `commit_finalized()` requires blocks in height order. + // So if there has been a block commit error, + // we need to drop all the descendants of that block, + // until we receive a block at the required next height. + let next_valid_height = finalized_state + .db + .finalized_tip_height() + .map(|height| (height + 1).expect("committed heights are valid")) + .unwrap_or(Height(0)); + + if ordered_block.0.height != next_valid_height { + debug!( + ?next_valid_height, + invalid_height = ?ordered_block.0.height, + invalid_hash = ?ordered_block.0.hash, + "got a block that was the wrong height. 
\ + Assuming a parent block failed, and dropping this block", + ); + + // We don't want to send a reset here, because it could overwrite a valid sent hash + std::mem::drop(ordered_block); + continue; + } + + // Try committing the block + match finalized_state + .commit_finalized(ordered_block, prev_finalized_note_commitment_trees.take()) + { + Ok((finalized, note_commitment_trees)) => { + let tip_block = ChainTipBlock::from(finalized); + prev_finalized_note_commitment_trees = Some(note_commitment_trees); - // Write all the finalized blocks sent by the state, - // until the state closes the finalized block channel's sender. - while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() { - // TODO: split these checks into separate functions + log_if_mined_by_zebra(&tip_block, &mut last_zebra_mined_log_height); + chain_tip_sender.set_finalized_tip(tip_block); + } + Err(error) => { + let finalized_tip = finalized_state.db.tip(); + + // The last block in the queue failed, so we can't commit the next block. + // Instead, we need to reset the state queue, + // and discard any children of the invalid block in the channel. + info!( + ?error, + last_valid_height = ?finalized_tip.map(|tip| tip.0), + last_valid_hash = ?finalized_tip.map(|tip| tip.1), + "committing a block to the finalized state failed, resetting state queue", + ); + + let send_result = + invalid_block_reset_sender.send(finalized_state.db.finalized_tip_hash()); + + if send_result.is_err() { + info!( + "StateService closed the block reset channel. Is Zebra shutting down?" + ); + return; + } + } + } + } + + // Do this check even if the channel got closed before any finalized blocks were sent. + // This can happen if we're past the finalized tip. if invalid_block_reset_sender.is_closed() { info!("StateService closed the block reset channel. Is Zebra shutting down?"); return; } - // Discard any children of invalid blocks in the channel - // - // `commit_finalized()` requires blocks in height order. - // So if there has been a block commit error, - // we need to drop all the descendants of that block, - // until we receive a block at the required next height. - let next_valid_height = finalized_state - .db - .finalized_tip_height() - .map(|height| (height + 1).expect("committed heights are valid")) - .unwrap_or(Height(0)); - - if ordered_block.0.height != next_valid_height { - debug!( - ?next_valid_height, - invalid_height = ?ordered_block.0.height, - invalid_hash = ?ordered_block.0.hash, - "got a block that was the wrong height. \ - Assuming a parent block failed, and dropping this block", - ); + // Save any errors to propagate down to queued child blocks + let mut parent_error_map: IndexMap = IndexMap::new(); - // We don't want to send a reset here, because it could overwrite a valid sent hash - std::mem::drop(ordered_block); - continue; - } + while let Some(msg) = non_finalized_block_write_receiver.blocking_recv() { + let queued_child_and_rsp_tx = match msg { + NonFinalizedWriteMessage::Commit(queued_child) => Some(queued_child), + NonFinalizedWriteMessage::Invalidate(hash) => { + non_finalized_state.invalidate_block(hash); + None + } + NonFinalizedWriteMessage::Reconsider(_hash) => { + // TODO: Uncomment after #9260 has merged. 
+ // non_finalized_state.reconsider_block(hash); + None + } + }; + + let Some((queued_child, rsp_tx)) = queued_child_and_rsp_tx else { + update_latest_chain_channels( + non_finalized_state, + chain_tip_sender, + non_finalized_state_sender, + &mut last_zebra_mined_log_height, + ); + continue; + }; - // Try committing the block - match finalized_state - .commit_finalized(ordered_block, prev_finalized_note_commitment_trees.take()) - { - Ok((finalized, note_commitment_trees)) => { - let tip_block = ChainTipBlock::from(finalized); - prev_finalized_note_commitment_trees = Some(note_commitment_trees); + let child_hash = queued_child.hash; + let parent_hash = queued_child.block.header.previous_block_hash; + let parent_error = parent_error_map.get(&parent_hash); - log_if_mined_by_zebra(&tip_block, &mut last_zebra_mined_log_height); + let result; - chain_tip_sender.set_finalized_tip(tip_block); - } - Err(error) => { - let finalized_tip = finalized_state.db.tip(); - - // The last block in the queue failed, so we can't commit the next block. - // Instead, we need to reset the state queue, - // and discard any children of the invalid block in the channel. - info!( - ?error, - last_valid_height = ?finalized_tip.map(|tip| tip.0), - last_valid_hash = ?finalized_tip.map(|tip| tip.1), - "committing a block to the finalized state failed, resetting state queue", + // If the parent block was marked as rejected, also reject all its children. + // + // At this point, we know that all the block's descendants + // are invalid, because we checked all the consensus rules before + // committing the failing ancestor block to the non-finalized state. + if let Some(parent_error) = parent_error { + tracing::trace!( + ?child_hash, + ?parent_error, + "rejecting queued child due to parent error" ); + result = Err(parent_error.clone()); + } else { + tracing::trace!(?child_hash, "validating queued child"); + result = validate_and_commit_non_finalized( + &finalized_state.db, + non_finalized_state, + queued_child, + ) + .map_err(CloneError::from); + } + + // TODO: fix the test timing bugs that require the result to be sent + // after `update_latest_chain_channels()`, + // and send the result on rsp_tx here - let send_result = - invalid_block_reset_sender.send(finalized_state.db.finalized_tip_hash()); + if let Err(ref error) = result { + // Update the caller with the error. + let _ = rsp_tx.send(result.clone().map(|()| child_hash).map_err(BoxError::from)); - if send_result.is_err() { - info!("StateService closed the block reset channel. Is Zebra shutting down?"); - return; + // If the block is invalid, mark any descendant blocks as rejected. + parent_error_map.insert(child_hash, error.clone()); + + // Make sure the error map doesn't get too big. + if parent_error_map.len() > PARENT_ERROR_MAP_LIMIT { + // We only add one hash at a time, so we only need to remove one extra here. + parent_error_map.shift_remove_index(0); } - } - } - } - // Do this check even if the channel got closed before any finalized blocks were sent. - // This can happen if we're past the finalized tip. - if invalid_block_reset_sender.is_closed() { - info!("StateService closed the block reset channel. 
Is Zebra shutting down?"); - return; - } + // Skip the things we only need to do for successfully committed blocks + continue; + } - // Save any errors to propagate down to queued child blocks - let mut parent_error_map: IndexMap = IndexMap::new(); - - while let Some((queued_child, rsp_tx)) = non_finalized_block_write_receiver.blocking_recv() { - let child_hash = queued_child.hash; - let parent_hash = queued_child.block.header.previous_block_hash; - let parent_error = parent_error_map.get(&parent_hash); - - let result; - - // If the parent block was marked as rejected, also reject all its children. - // - // At this point, we know that all the block's descendants - // are invalid, because we checked all the consensus rules before - // committing the failing ancestor block to the non-finalized state. - if let Some(parent_error) = parent_error { - tracing::trace!( - ?child_hash, - ?parent_error, - "rejecting queued child due to parent error" + // Committing blocks to the finalized state keeps the same chain, + // so we can update the chain seen by the rest of the application now. + // + // TODO: if this causes state request errors due to chain conflicts, + // fix the `service::read` bugs, + // or do the channel update after the finalized state commit + let tip_block_height = update_latest_chain_channels( + non_finalized_state, + chain_tip_sender, + non_finalized_state_sender, + &mut last_zebra_mined_log_height, ); - result = Err(parent_error.clone()); - } else { - tracing::trace!(?child_hash, "validating queued child"); - result = validate_and_commit_non_finalized( - &finalized_state.db, - &mut non_finalized_state, - queued_child, - ) - .map_err(CloneError::from); - } - - // TODO: fix the test timing bugs that require the result to be sent - // after `update_latest_chain_channels()`, - // and send the result on rsp_tx here - if let Err(ref error) = result { - // Update the caller with the error. + // Update the caller with the result. let _ = rsp_tx.send(result.clone().map(|()| child_hash).map_err(BoxError::from)); - // If the block is invalid, mark any descendant blocks as rejected. - parent_error_map.insert(child_hash, error.clone()); - - // Make sure the error map doesn't get too big. - if parent_error_map.len() > PARENT_ERROR_MAP_LIMIT { - // We only add one hash at a time, so we only need to remove one extra here. - parent_error_map.shift_remove_index(0); + while non_finalized_state + .best_chain_len() + .expect("just successfully inserted a non-finalized block above") + > MAX_BLOCK_REORG_HEIGHT + { + tracing::trace!("finalizing block past the reorg limit"); + let contextually_verified_with_trees = non_finalized_state.finalize(); + prev_finalized_note_commitment_trees = finalized_state + .commit_finalized_direct(contextually_verified_with_trees, prev_finalized_note_commitment_trees.take(), "commit contextually-verified request") + .expect( + "unexpected finalized block commit error: note commitment and history trees were already checked by the non-finalized state", + ).1.into(); } - // Skip the things we only need to do for successfully committed blocks - continue; - } - - // Committing blocks to the finalized state keeps the same chain, - // so we can update the chain seen by the rest of the application now. 
- // - // TODO: if this causes state request errors due to chain conflicts, - // fix the `service::read` bugs, - // or do the channel update after the finalized state commit - let tip_block_height = update_latest_chain_channels( - &non_finalized_state, - &mut chain_tip_sender, - &non_finalized_state_sender, - &mut last_zebra_mined_log_height, - ); - - // Update the caller with the result. - let _ = rsp_tx.send(result.clone().map(|()| child_hash).map_err(BoxError::from)); - - while non_finalized_state - .best_chain_len() - .expect("just successfully inserted a non-finalized block above") - > MAX_BLOCK_REORG_HEIGHT - { - tracing::trace!("finalizing block past the reorg limit"); - let contextually_verified_with_trees = non_finalized_state.finalize(); - prev_finalized_note_commitment_trees = finalized_state - .commit_finalized_direct(contextually_verified_with_trees, prev_finalized_note_commitment_trees.take(), "commit contextually-verified request") - .expect( - "unexpected finalized block commit error: note commitment and history trees were already checked by the non-finalized state", - ).1.into(); - } + // Update the metrics if semantic and contextual validation passes + // + // TODO: split this out into a function? + metrics::counter!("state.full_verifier.committed.block.count").increment(1); + metrics::counter!("zcash.chain.verified.block.total").increment(1); - // Update the metrics if semantic and contextual validation passes - // - // TODO: split this out into a function? - metrics::counter!("state.full_verifier.committed.block.count").increment(1); - metrics::counter!("zcash.chain.verified.block.total").increment(1); + metrics::gauge!("state.full_verifier.committed.block.height") + .set(tip_block_height.0 as f64); - metrics::gauge!("state.full_verifier.committed.block.height") - .set(tip_block_height.0 as f64); + // This height gauge is updated for both fully verified and checkpoint blocks. + // These updates can't conflict, because this block write task makes sure that blocks + // are committed in order. + metrics::gauge!("zcash.chain.verified.block.height").set(tip_block_height.0 as f64); - // This height gauge is updated for both fully verified and checkpoint blocks. - // These updates can't conflict, because this block write task makes sure that blocks - // are committed in order. - metrics::gauge!("zcash.chain.verified.block.height").set(tip_block_height.0 as f64); + tracing::trace!("finished processing queued block"); + } - tracing::trace!("finished processing queued block"); + // We're finished receiving non-finalized blocks from the state, and + // done writing to the finalized state, so we can force it to shut down. + finalized_state.db.shutdown(true); + std::mem::drop(self.finalized_state); } - - // We're finished receiving non-finalized blocks from the state, and - // done writing to the finalized state, so we can force it to shut down. - finalized_state.db.shutdown(true); - std::mem::drop(finalized_state); } /// Log a message if this block was mined by Zebra. 
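
Editor's note: the sketch below is illustrative and not part of the diff. Using simplified placeholder types (a `u32` standing in for `QueuedSemanticallyVerified` and a `String` for `block::Hash`), it shows the dispatch pattern the new `NonFinalizedWriteMessage` channel gives the write task above: commits, invalidations, and reconsiderations arrive over a single queue, so an `invalidateblock` request cannot overtake block writes that were queued before it.

use tokio::sync::mpsc;

#[derive(Debug)]
enum WriteMessage {
    Commit(u32),        // placeholder for QueuedSemanticallyVerified
    Invalidate(String), // placeholder for block::Hash
    Reconsider(String),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    // State service side: commits are converted into messages via `From`,
    // mirroring `non_finalized_block_write_sender.send(queued_child.into())` in the diff.
    tx.send(WriteMessage::Commit(1)).unwrap();
    tx.send(WriteMessage::Invalidate("00ab…".into())).unwrap();
    drop(tx); // dropping the sender ends the worker loop below

    // Worker side: one loop handles all three message kinds in arrival order.
    // (The real task uses `blocking_recv()` on a dedicated thread instead.)
    while let Some(msg) = rx.recv().await {
        match msg {
            WriteMessage::Commit(block) => println!("validate and commit block {block}"),
            WriteMessage::Invalidate(hash) => println!("invalidate {hash} and its children"),
            WriteMessage::Reconsider(hash) => println!("reconsider {hash} (pending #9260)"),
        }
    }
}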
diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 532c00e4778..825a5f523ca 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -253,6 +253,7 @@ impl StartCmd { build_version(), user_agent(), mempool.clone(), + state.clone(), read_only_state_service.clone(), block_verifier_router.clone(), sync_status.clone(), diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index d65ee68c5f9..f05d7118da2 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -2965,17 +2965,12 @@ async fn trusted_chain_sync_handles_forks_correctly() -> Result<()> { use common::regtest::MiningRpcMethods; use eyre::Error; use tokio::time::timeout; - use zebra_chain::{ - chain_tip::ChainTip, parameters::NetworkUpgrade, - primitives::byte_array::increment_big_endian, - }; - use zebra_rpc::methods::GetBlockHash; - use zebra_state::{ReadResponse, Response}; + use zebra_chain::parameters::testnet::REGTEST_NU5_ACTIVATION_HEIGHT; + use zebra_state::ReadResponse; let _init_guard = zebra_test::init(); let mut config = os_assigned_rpc_port_config(false, &Network::new_regtest(Default::default()))?; config.state.ephemeral = false; - let network = config.network.network.clone(); let test_dir = testdir()?.with_config(&mut config)?; @@ -3011,7 +3006,15 @@ async fn trusted_chain_sync_handles_forks_correctly() -> Result<()> { let rpc_client = RpcRequestClient::new(rpc_address); let mut blocks = Vec::new(); for _ in 0..10 { - let (block, height) = rpc_client.submit_block_from_template().await?; + let (block, height) = rpc_client + .block_from_template(Height(REGTEST_NU5_ACTIVATION_HEIGHT)) + .await?; + // TODO: Submit 50 blocks instead of 5, call reconsiderblock, and check that there's another chain tip reset. + // let header = Arc::make_mut(&mut block.header); + // while header.hash() < network.target_difficulty_limit() { + // increment_big_endian(header.nonce.as_mut()); + // } + rpc_client.submit_block(block.clone()).await?; blocks.push(block); let tip_action = timeout( Duration::from_secs(1), @@ -3057,73 +3060,22 @@ async fn trusted_chain_sync_handles_forks_correctly() -> Result<()> { ); } - tracing::info!("getting next block template"); - let (block_11, _) = rpc_client.block_from_template(Height(100)).await?; - let next_blocks: Vec<_> = blocks - .split_off(5) - .into_iter() - .chain(std::iter::once(block_11)) - .collect(); - - tracing::info!("creating populated state"); - let genesis_block = regtest_genesis_block(); - let (state2, read_state2, latest_chain_tip2, _chain_tip_change2) = - zebra_state::populated_state( - std::iter::once(genesis_block).chain(blocks.iter().cloned().map(Arc::new)), - &network, - ) - .await; - - tracing::info!("attempting to trigger a best chain change"); - for mut block in next_blocks { - let is_chain_history_activation_height = NetworkUpgrade::Heartwood - .activation_height(&network) - == Some(block.coinbase_height().unwrap()); - let header = Arc::make_mut(&mut block.header); - increment_big_endian(header.nonce.as_mut()); - let ReadResponse::ChainInfo(chain_info) = read_state2 - .clone() - .oneshot(zebra_state::ReadRequest::ChainInfo) - .await - .map_err(|err| eyre!(err))? 
-        else {
-            unreachable!("wrong response variant");
-        };
+    tracing::info!("invalidating blocks to trigger a best chain change");
 
-        header.previous_block_hash = chain_info.tip_hash;
-        header.commitment_bytes = chain_info
-            .chain_history_root
-            .or(is_chain_history_activation_height.then_some([0; 32].into()))
-            .expect("history tree can't be empty")
-            .bytes_in_serialized_order()
-            .into();
+    let block_6_hash = blocks.get(5).expect("should have 10 blocks").hash();
+    let _ = rpc_client
+        .call("invalidateblock", format!(r#"["{block_6_hash}"]"#))
+        .await
+        .map_err(|err| eyre!(err))?;
 
-        let Response::Committed(block_hash) = state2
-            .clone()
-            .oneshot(zebra_state::Request::CommitSemanticallyVerifiedBlock(
-                Arc::new(block.clone()).into(),
-            ))
-            .await
-            .map_err(|err| eyre!(err))?
-        else {
-            unreachable!("wrong response variant");
-        };
+    for _ in 0..10 {
+        let (block, _height) = rpc_client
+            .block_from_template(Height(REGTEST_NU5_ACTIVATION_HEIGHT))
+            .await?;
-        assert!(
-            chain_tip_change.last_tip_change().is_none(),
-            "there should be no tip change until the last block is submitted"
-        );
 
         rpc_client.submit_block(block.clone()).await?;
-        blocks.push(block);
-
-        let GetBlockHash(best_block_hash) = rpc_client
-            .json_result_from_call("getbestblockhash", "[]")
-            .await
-            .map_err(|err| eyre!(err))?;
-
-        if block_hash == best_block_hash {
-            break;
-        }
     }
 
     tracing::info!("newly submitted blocks are in the best chain, checking for reset");
@@ -3133,15 +3085,7 @@
         chain_tip_change.wait_for_tip_change(),
     )
     .await??;
-    let (expected_height, expected_hash) = latest_chain_tip2
-        .best_tip_height_and_hash()
-        .expect("should have a chain tip");
     assert!(tip_action.is_reset(), "tip action should be reset");
-    assert_eq!(
-        tip_action.best_tip_hash_and_height(),
-        (expected_hash, expected_height),
-        "tip action hashes and heights should match"
-    );
 
     tracing::info!("checking that read state has the new non-finalized best chain blocks");
     for expected_block in blocks {
diff --git a/zebrad/tests/common/regtest.rs b/zebrad/tests/common/regtest.rs
index 5134e79bc65..8e463421611 100644
--- a/zebrad/tests/common/regtest.rs
+++ b/zebrad/tests/common/regtest.rs
@@ -99,6 +99,7 @@ async fn submit_blocks(network: Network, rpc_address: SocketAddr) -> Result<()>
 pub trait MiningRpcMethods {
     async fn block_from_template(&self, nu5_activation_height: Height) -> Result<(Block, Height)>;
     async fn submit_block(&self, block: Block) -> Result<()>;
+    #[allow(dead_code)]
     async fn submit_block_from_template(&self) -> Result<(Block, Height)>;
     async fn get_block(&self, height: i32) -> Result<Option<Arc<Block>>, BoxError>;
 }
@@ -140,6 +141,7 @@ impl MiningRpcMethods for RpcRequestClient {
         }
     }
 
+    #[allow(dead_code)]
     async fn submit_block_from_template(&self) -> Result<(Block, Height)> {
         let (block, height) = self
             .block_from_template(Height(REGTEST_NU5_ACTIVATION_HEIGHT))
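
Editor's note: a minimal usage sketch, not part of the diff. It drives the two new RPC methods end-to-end with the `RpcRequestClient::call` helper already used in the acceptance test above; the `rpc_address` and `block_hash` bindings are assumed to come from surrounding test setup.

// Assumes a running regtest zebrad with the RPC server enabled, and `block_hash`
// naming a block currently in the non-finalized state.
let rpc_client = RpcRequestClient::new(rpc_address);

// Remove the block (and its descendants) from the non-finalized state.
rpc_client
    .call("invalidateblock", format!(r#"["{block_hash}"]"#))
    .await?;

// Ask the state to restore it from the cache of previously invalidated blocks.
// (Currently a no-op in the write task until the TODO referencing #9260 is resolved.)
rpc_client
    .call("reconsiderblock", format!(r#"["{block_hash}"]"#))
    .await?;

Both methods return `Result<()>`, so a successful JSON-RPC call simply yields `null`.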