1
+ use self :: clock:: { Clock , SystemClock } ;
1
2
use async_trait:: async_trait;
2
3
use bitcoin:: secp256k1:: PublicKey ;
3
4
use bitcoin:: Network ;
@@ -491,6 +492,8 @@ pub struct Simulation {
491
492
/// High level triggers used to manage simulation tasks and shutdown.
492
493
shutdown_trigger : Trigger ,
493
494
shutdown_listener : Listener ,
495
+ /// Clock for the simulation.
496
+ clock : Arc < dyn Clock > ,
494
497
}
495
498
496
499
#[ derive( Clone ) ]
@@ -527,6 +530,7 @@ impl Simulation {
527
530
tasks,
528
531
shutdown_trigger,
529
532
shutdown_listener,
533
+ clock : Arc :: new ( SystemClock { } ) ,
530
534
}
531
535
}
532
536
@@ -701,6 +705,7 @@ impl Simulation {
701
705
if let Some ( total_time) = self . cfg . total_time {
702
706
let shutdown = self . shutdown_trigger . clone ( ) ;
703
707
let listener = self . shutdown_listener . clone ( ) ;
708
+ let clock = self . clock . clone ( ) ;
704
709
705
710
self . tasks . spawn ( async move {
706
711
select ! {
@@ -709,7 +714,7 @@ impl Simulation {
709
714
log:: debug!( "Timeout task exited on listener signal" ) ;
710
715
}
711
716
712
- _ = time :: sleep( total_time) => {
717
+ _ = clock . sleep( total_time) => {
713
718
log:: info!(
714
719
"Simulation run for {}s. Shutting down." ,
715
720
total_time. as_secs( )
@@ -776,23 +781,27 @@ impl Simulation {
776
781
777
782
let result_logger_clone = result_logger. clone ( ) ;
778
783
let result_logger_listener = listener. clone ( ) ;
784
+ let clock = self . clock . clone ( ) ;
779
785
tasks. spawn ( async move {
780
786
log:: debug!( "Starting results logger." ) ;
781
787
run_results_logger (
782
788
result_logger_listener,
783
789
result_logger_clone,
784
790
Duration :: from_secs ( 60 ) ,
791
+ clock,
785
792
)
786
793
. await ;
787
794
log:: debug!( "Exiting results logger." ) ;
788
795
} ) ;
789
796
790
797
// csr: consume simulation results
791
798
let csr_write_results = self . cfg . write_results . clone ( ) ;
799
+ let clock = self . clock . clone ( ) ;
792
800
tasks. spawn ( async move {
793
801
log:: debug!( "Starting simulation results consumer." ) ;
794
802
if let Err ( e) = consume_simulation_results (
795
803
result_logger,
804
+ clock,
796
805
results_receiver,
797
806
listener,
798
807
csr_write_results,
@@ -930,11 +939,12 @@ impl Simulation {
930
939
let ce_shutdown = self . shutdown_trigger . clone ( ) ;
931
940
let ce_output_sender = output_sender. clone ( ) ;
932
941
let ce_node = node. clone ( ) ;
942
+ let clock = self . clock . clone ( ) ;
933
943
tasks. spawn ( async move {
934
944
let node_info = ce_node. lock ( ) . await . get_info ( ) . clone ( ) ;
935
945
log:: debug!( "Starting events consumer for {}." , node_info) ;
936
946
if let Err ( e) =
937
- consume_events ( ce_node, receiver, ce_output_sender, ce_listener) . await
947
+ consume_events ( ce_node, clock , receiver, ce_output_sender, ce_listener) . await
938
948
{
939
949
ce_shutdown. trigger ( ) ;
940
950
log:: error!( "Event consumer for node {node_info} exited with error: {e:?}." ) ;
@@ -967,6 +977,8 @@ impl Simulation {
967
977
let pe_shutdown = self . shutdown_trigger . clone ( ) ;
968
978
let pe_listener = self . shutdown_listener . clone ( ) ;
969
979
let pe_sender = sender. clone ( ) ;
980
+ let clock = self . clock . clone ( ) ;
981
+
970
982
tasks. spawn ( async move {
971
983
let source = executor. source_info . clone ( ) ;
972
984
@@ -980,6 +992,7 @@ impl Simulation {
980
992
executor. source_info ,
981
993
executor. network_generator ,
982
994
executor. payment_generator ,
995
+ clock,
983
996
pe_sender,
984
997
pe_listener,
985
998
)
@@ -1001,6 +1014,7 @@ impl Simulation {
1001
1014
/// event being executed is piped into a channel to handle the result of the event.
1002
1015
async fn consume_events (
1003
1016
node : Arc < Mutex < dyn LightningNode > > ,
1017
+ clock : Arc < dyn Clock > ,
1004
1018
mut receiver : Receiver < SimulationEvent > ,
1005
1019
sender : Sender < SimulationOutput > ,
1006
1020
listener : Listener ,
@@ -1022,7 +1036,7 @@ async fn consume_events(
1022
1036
hash: None ,
1023
1037
amount_msat: amt_msat,
1024
1038
destination: dest. pubkey,
1025
- dispatch_time: SystemTime :: now( ) ,
1039
+ dispatch_time: clock . now( ) ,
1026
1040
} ;
1027
1041
1028
1042
let outcome = match node. send_payment( dest. pubkey, amt_msat) . await {
@@ -1084,6 +1098,7 @@ async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator +
1084
1098
source : NodeInfo ,
1085
1099
network_generator : Arc < Mutex < N > > ,
1086
1100
node_generator : Box < A > ,
1101
+ clock : Arc < dyn Clock > ,
1087
1102
sender : Sender < SimulationEvent > ,
1088
1103
listener : Listener ,
1089
1104
) -> Result < ( ) , SimulationError > {
@@ -1107,7 +1122,7 @@ async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator +
1107
1122
} ,
1108
1123
// Wait until our time to next payment has elapsed then execute a random amount payment to a random
1109
1124
// destination.
1110
- _ = time :: sleep( wait) => {
1125
+ _ = clock . sleep( wait) => {
1111
1126
let ( destination, capacity) = network_generator. lock( ) . await . choose_destination( source. pubkey) . map_err( SimulationError :: DestinationGenerationError ) ?;
1112
1127
1113
1128
// Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get
@@ -1183,13 +1198,14 @@ fn get_payment_delay<A: PaymentGenerator + ?Sized>(
1183
1198
1184
1199
async fn consume_simulation_results (
1185
1200
logger : Arc < Mutex < PaymentResultLogger > > ,
1201
+ clock : Arc < dyn Clock > ,
1186
1202
mut receiver : Receiver < ( Payment , PaymentResult ) > ,
1187
1203
listener : Listener ,
1188
1204
write_results : Option < WriteResults > ,
1189
1205
) -> Result < ( ) , SimulationError > {
1190
1206
let mut writer = match write_results {
1191
1207
Some ( res) => {
1192
- let duration = SystemTime :: now ( ) . duration_since ( SystemTime :: UNIX_EPOCH ) ?;
1208
+ let duration = clock . now ( ) . duration_since ( SystemTime :: UNIX_EPOCH ) ?;
1193
1209
let file = res
1194
1210
. results_dir
1195
1211
. join ( format ! ( "simulation_{:?}.csv" , duration) ) ;
@@ -1288,6 +1304,7 @@ async fn run_results_logger(
1288
1304
listener : Listener ,
1289
1305
logger : Arc < Mutex < PaymentResultLogger > > ,
1290
1306
interval : Duration ,
1307
+ clock : Arc < dyn Clock > ,
1291
1308
) {
1292
1309
log:: info!( "Summary of results will be reported every {:?}." , interval) ;
1293
1310
@@ -1298,7 +1315,7 @@ async fn run_results_logger(
1298
1315
break
1299
1316
}
1300
1317
1301
- _ = time :: sleep( interval) => {
1318
+ _ = clock . sleep( interval) => {
1302
1319
log:: info!( "{}" , logger. lock( ) . await )
1303
1320
}
1304
1321
}
0 commit comments