@@ -549,7 +549,16 @@ impl Simulation {
549
549
listener. clone ( ) ,
550
550
) ) ;
551
551
552
+ let result_logger = Arc :: new ( Mutex :: new ( PaymentResultLogger :: new ( ) ) ) ;
553
+
554
+ tasks. spawn ( run_results_logger (
555
+ listener. clone ( ) ,
556
+ result_logger. clone ( ) ,
557
+ Duration :: from_secs ( 60 ) ,
558
+ ) ) ;
559
+
552
560
tasks. spawn ( consume_simulation_results (
561
+ result_logger,
553
562
results_receiver,
554
563
listener,
555
564
self . print_batch_size ,
@@ -874,21 +883,25 @@ async fn produce_random_events<N: NetworkGenerator, A: PaymentGenerator + Displa
874
883
}
875
884
876
885
async fn consume_simulation_results (
886
+ logger : Arc < Mutex < PaymentResultLogger > > ,
877
887
receiver : Receiver < ( Payment , PaymentResult ) > ,
878
888
listener : Listener ,
879
889
print_batch_size : u32 ,
880
890
no_results : bool ,
881
891
) {
882
892
log:: debug!( "Simulation results consumer started." ) ;
883
893
884
- if let Err ( e) = write_payment_results ( receiver, listener, print_batch_size, no_results) . await {
894
+ if let Err ( e) =
895
+ write_payment_results ( logger, receiver, listener, print_batch_size, no_results) . await
896
+ {
885
897
log:: error!( "Error while reporting payment results: {:?}." , e) ;
886
898
}
887
899
888
900
log:: debug!( "Simulation results consumer exiting." ) ;
889
901
}
890
902
891
903
async fn write_payment_results (
904
+ logger : Arc < Mutex < PaymentResultLogger > > ,
892
905
mut receiver : Receiver < ( Payment , PaymentResult ) > ,
893
906
listener : Listener ,
894
907
print_batch_size : u32 ,
@@ -906,8 +919,7 @@ async fn write_payment_results(
906
919
None
907
920
} ;
908
921
909
- let mut result_logger = PaymentResultLogger :: new ( ) ;
910
- let mut counter = 0 ;
922
+ let mut counter = 1 ;
911
923
loop {
912
924
tokio:: select! {
913
925
biased;
@@ -918,7 +930,7 @@ async fn write_payment_results(
918
930
payment_report = receiver. recv( ) => {
919
931
match payment_report {
920
932
Some ( ( details, result) ) => {
921
- result_logger . report_result( & details, & result) ;
933
+ logger . lock ( ) . await . report_result( & details, & result) ;
922
934
log:: trace!( "Resolved dispatched payment: {} with: {}." , details, result) ;
923
935
924
936
if let Some ( ref mut w) = writer {
@@ -940,22 +952,17 @@ async fn write_payment_results(
940
952
}
941
953
}
942
954
943
- /// PaymentResultLogger is an aggregate logger that will report on a summary of the payments that have been reported
944
- /// to it at regular intervals (defined by the log_interval it is created with).
955
+ /// PaymentResultLogger is an aggregate logger that will report on a summary of the payments that have been reported.
945
956
#[ derive( Default ) ]
946
957
struct PaymentResultLogger {
947
958
success_payment : u64 ,
948
959
failed_payment : u64 ,
949
960
total_sent : u64 ,
950
- call_count : u8 ,
951
- log_interval : u8 ,
952
961
}
953
962
954
963
impl PaymentResultLogger {
955
964
fn new ( ) -> Self {
956
965
PaymentResultLogger {
957
- // TODO: set the interval at which we log based on the number of payment we're expecting to log.
958
- log_interval : 10 ,
959
966
..Default :: default ( )
960
967
}
961
968
}
@@ -967,18 +974,44 @@ impl PaymentResultLogger {
967
974
}
968
975
969
976
self . total_sent += details. amount_msat ;
970
- self . call_count += 1 ;
977
+ }
978
+ }
971
979
972
- if self . call_count % self . log_interval == 0 || self . call_count == 0 {
973
- let total_payments = self . success_payment + self . failed_payment ;
974
- log:: info!(
975
- "Processed {} payments sending {} msat total with {}% success rate." ,
976
- total_payments,
977
- self . total_sent,
978
- ( self . success_payment * 100 / total_payments)
979
- ) ;
980
+ impl Display for PaymentResultLogger {
981
+ fn fmt ( & self , f : & mut Formatter < ' _ > ) -> std:: fmt:: Result {
982
+ let total_payments = self . success_payment + self . failed_payment ;
983
+ write ! (
984
+ f,
985
+ "Processed {} payments sending {} msat total with {:.2}% success rate." ,
986
+ total_payments,
987
+ self . total_sent,
988
+ ( self . success_payment as f64 / total_payments as f64 ) * 100.0
989
+ )
990
+ }
991
+ }
992
+
993
+ async fn run_results_logger (
994
+ listener : Listener ,
995
+ logger : Arc < Mutex < PaymentResultLogger > > ,
996
+ interval : Duration ,
997
+ ) {
998
+ log:: debug!( "Results logger started." ) ;
999
+ log:: info!( "Summary of results will be reported every {:?}." , interval) ;
1000
+
1001
+ loop {
1002
+ select ! {
1003
+ biased;
1004
+ _ = listener. clone( ) => {
1005
+ break
1006
+ }
1007
+
1008
+ _ = time:: sleep( interval) => {
1009
+ log:: info!( "{}" , logger. lock( ) . await )
1010
+ }
980
1011
}
981
1012
}
1013
+
1014
+ log:: debug!( "Results logger stopped." )
982
1015
}
983
1016
984
1017
/// produce_results is responsible for receiving the outputs of events that the simulator has taken and
0 commit comments