Skip to content

Commit add3d9a

Browse files
committed
feat(node): include a delay in block broadcasting
1 parent 3b3ded6 commit add3d9a

File tree

6 files changed

+73
-18
lines changed

6 files changed

+73
-18
lines changed

node/src/actors/chain_manager/handlers.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -645,10 +645,10 @@ fn log_sync_progress(
645645
impl Handler<AddCandidates> for ChainManager {
646646
type Result = SessionUnitResult;
647647

648-
fn handle(&mut self, msg: AddCandidates, _ctx: &mut Context<Self>) {
648+
fn handle(&mut self, msg: AddCandidates, ctx: &mut Context<Self>) {
649649
// AddCandidates is needed in all states
650-
for block in msg.blocks {
651-
self.process_candidate(block);
650+
for (block, ts) in msg.blocks {
651+
self.process_candidate(ctx, block, ts);
652652
}
653653
}
654654
}

node/src/actors/chain_manager/mining.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use witnet_data_structures::{
4040
};
4141
use witnet_futures_utils::TryFutureExt2;
4242
use witnet_rad::{error::RadError, types::serial_iter_decode};
43-
use witnet_util::timestamp::get_timestamp;
43+
use witnet_util::timestamp::{get_timestamp, get_timestamp_nanos};
4444
use witnet_validations::validations::{
4545
block_reward, calculate_liars_and_errors_count_from_tally, calculate_mining_probability,
4646
calculate_randpoe_threshold, calculate_reppoe_threshold, dr_transaction_fee, merkle_tree_root,
@@ -250,7 +250,7 @@ impl ChainManager {
250250
beacon,
251251
epoch_constants,
252252
)
253-
.map_ok(|_diff, act, _ctx| {
253+
.map_ok(|_diff, act, ctx| {
254254
// Send AddCandidates message to self
255255
// This will run all the validations again
256256

@@ -263,7 +263,7 @@ impl ChainManager {
263263
Yellow.bold().paint(block_hash.to_string())
264264
);
265265

266-
act.process_candidate(block);
266+
act.process_candidate(ctx, block, get_timestamp_nanos());
267267
})
268268
.map_err(|e, _, _| log::error!("Error trying to mine a block: {}", e))
269269
})

node/src/actors/chain_manager/mod.rs

+59-8
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ use witnet_data_structures::{
6868
};
6969

7070
use witnet_rad::types::RadonTypes;
71-
use witnet_util::timestamp::seconds_to_human_string;
71+
use witnet_util::timestamp::{duration_between_timestamps, seconds_to_human_string};
7272
use witnet_validations::validations::{
7373
compare_block_candidates, validate_block, validate_block_transactions,
7474
validate_new_transaction, validate_rad_request, verify_signatures, VrfSlots,
@@ -515,7 +515,7 @@ impl ChainManager {
515515
}
516516

517517
#[allow(clippy::map_entry)]
518-
fn process_candidate(&mut self, block: Block) {
518+
fn process_candidate(&mut self, ctx: &mut Context<Self>, block: Block, ts: (i64, u32)) {
519519
if let (Some(current_epoch), Some(chain_info), Some(rep_engine), Some(vrf_ctx)) = (
520520
self.current_epoch,
521521
self.chain_state.chain_info.as_ref(),
@@ -582,7 +582,16 @@ impl ChainManager {
582582
// In order to do not block possible validate candidates in AlmostSynced
583583
// state, we would broadcast the errors too
584584
if self.sm_state == StateMachine::AlmostSynced {
585-
self.broadcast_item(InventoryItem::Block(block));
585+
let delay = calculate_delay_from_mining_timestamp(
586+
chain_info.consensus_constants.checkpoint_zero_timestamp,
587+
chain_info.consensus_constants.checkpoints_period,
588+
current_epoch,
589+
ts,
590+
);
591+
592+
ctx.run_later(delay_function(delay), |act, _ctx| {
593+
act.broadcast_item(InventoryItem::Block(block))
594+
});
586595
}
587596

588597
return;
@@ -646,7 +655,16 @@ impl ChainManager {
646655
vrf_proof,
647656
});
648657

649-
self.broadcast_item(InventoryItem::Block(block));
658+
let delay = calculate_delay_from_mining_timestamp(
659+
chain_info.consensus_constants.checkpoint_zero_timestamp,
660+
chain_info.consensus_constants.checkpoints_period,
661+
current_epoch,
662+
ts,
663+
);
664+
665+
ctx.run_later(delay_function(delay), |act, _ctx| {
666+
act.broadcast_item(InventoryItem::Block(block))
667+
});
650668
}
651669
Err(e) => {
652670
log::warn!(
@@ -658,7 +676,16 @@ impl ChainManager {
658676
// In order to do not block possible validate candidates in AlmostSynced
659677
// state, we would broadcast the errors too
660678
if self.sm_state == StateMachine::AlmostSynced {
661-
self.broadcast_item(InventoryItem::Block(block));
679+
let delay = calculate_delay_from_mining_timestamp(
680+
chain_info.consensus_constants.checkpoint_zero_timestamp,
681+
chain_info.consensus_constants.checkpoints_period,
682+
current_epoch,
683+
ts,
684+
);
685+
686+
ctx.run_later(delay_function(delay), |act, _ctx| {
687+
act.broadcast_item(InventoryItem::Block(block))
688+
});
662689
}
663690
}
664691
}
@@ -2207,6 +2234,30 @@ impl ChainManager {
22072234
}
22082235
}
22092236

2237+
// Calculate delay between mining block timestamp and another timestamp
2238+
fn calculate_delay_from_mining_timestamp(
2239+
checkpoint_zero_timestamp: i64,
2240+
checkpoints_period: u16,
2241+
current_epoch: Epoch,
2242+
ts: (i64, u32),
2243+
) -> Duration {
2244+
let epoch_constants = EpochConstants {
2245+
checkpoint_zero_timestamp,
2246+
checkpoints_period,
2247+
};
2248+
let timestamp_mining = epoch_constants
2249+
.block_mining_timestamp(current_epoch)
2250+
.unwrap();
2251+
2252+
duration_between_timestamps((timestamp_mining, 0), ts).unwrap_or_else(|| Duration::from_secs(0))
2253+
}
2254+
2255+
fn delay_function(initial_delay: Duration) -> Duration {
2256+
// TODO: Apply a right delay function
2257+
// Direct delay
2258+
initial_delay
2259+
}
2260+
22102261
/// Helper struct used to persist an old copy of the `ChainState` to the storage
22112262
#[derive(Debug, Default)]
22122263
struct ChainStateSnapshot {
@@ -3528,19 +3579,19 @@ mod tests {
35283579
assert_ne!(block_1, block_mal_1);
35293580

35303581
// Process the modified candidate first
3531-
chain_manager.process_candidate(block_mal_1);
3582+
chain_manager.process_candidate(&mut Context::new(), block_mal_1, (0, 0));
35323583
// The best candidate should be None because this block is invalid
35333584
let best_cand = chain_manager.best_candidate.as_ref().map(|bc| &bc.block);
35343585
assert_eq!(best_cand, None);
35353586

35363587
// Process candidate with the same hash, but this one is valid
3537-
chain_manager.process_candidate(block_1.clone());
3588+
chain_manager.process_candidate(&mut Context::new(), block_1.clone(), (0, 0));
35383589
// The best candidate should be block_1
35393590
let best_cand = chain_manager.best_candidate.as_ref().map(|bc| &bc.block);
35403591
assert_eq!(best_cand, Some(&block_1));
35413592

35423593
// Process another valid candidate, but worse than the other one
3543-
chain_manager.process_candidate(block_2);
3594+
chain_manager.process_candidate(&mut Context::new(), block_2, (0, 0));
35443595
// The best candidate should still be block_1
35453596
let best_cand = chain_manager.best_candidate.as_ref().map(|bc| &bc.block);
35463597
assert_eq!(best_cand, Some(&block_1));

node/src/actors/json_rpc/json_rpc_methods.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use crate::actors::messages::GetSupplyInfo;
4949
use futures::FutureExt;
5050
use futures_util::compat::Compat;
5151
use std::future::Future;
52+
use witnet_util::timestamp::get_timestamp_nanos;
5253

5354
type JsonRpcResult = Result<Value, jsonrpc_core::Error>;
5455

@@ -433,9 +434,10 @@ pub async fn inventory(params: Result<InventoryItem, jsonrpc_core::Error>) -> Js
433434
log::debug!("Got block from JSON-RPC. Sending AnnounceItems message.");
434435

435436
let chain_manager_addr = ChainManager::from_registry();
437+
let now = get_timestamp_nanos();
436438
let res = chain_manager_addr
437439
.send(AddCandidates {
438-
blocks: vec![block],
440+
blocks: vec![(block, now)],
439441
})
440442
.await;
441443

node/src/actors/messages.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ impl Message for AddBlocks {
8484
/// Add a new candidate
8585
pub struct AddCandidates {
8686
/// Candidates
87-
pub blocks: Vec<Block>,
87+
pub blocks: Vec<(Block, (i64, u32))>,
8888
}
8989

9090
impl Message for AddCandidates {

node/src/actors/session/handlers.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::actors::{
3939
sessions_manager::SessionsManager,
4040
};
4141

42-
use witnet_util::timestamp::get_timestamp;
42+
use witnet_util::timestamp::{get_timestamp, get_timestamp_nanos};
4343

4444
#[derive(Debug, Eq, Fail, PartialEq)]
4545
enum HandshakeError {
@@ -710,10 +710,12 @@ fn inventory_process_block(session: &mut Session, _ctx: &mut Context<Session>, b
710710
// requested_block_hashes is cleared by using drain(..) above
711711
}
712712
} else {
713+
let ts = get_timestamp_nanos();
714+
713715
// If this is not a requested block, assume it is a candidate
714716
// Send a message to the ChainManager to try to add a new candidate
715717
chain_manager_addr.do_send(AddCandidates {
716-
blocks: vec![block],
718+
blocks: vec![(block, ts)],
717719
});
718720
}
719721
}

0 commit comments

Comments
 (0)