@@ -932,7 +932,7 @@ mod tests {
932
932
use lightning:: chain:: channelmonitor:: ANTI_REORG_DELAY ;
933
933
use lightning:: sign:: { InMemorySigner , KeysManager , ChangeDestinationSource } ;
934
934
use lightning:: chain:: transaction:: OutPoint ;
935
- use lightning:: events:: { Event , PathFailure , MessageSendEventsProvider , MessageSendEvent } ;
935
+ use lightning:: events:: { Event , PathFailure , MessageSendEventsProvider , MessageSendEvent , ReplayEvent } ;
936
936
use lightning:: { get_event_msg, get_event} ;
937
937
use lightning:: ln:: types:: { PaymentHash , ChannelId } ;
938
938
use lightning:: ln:: channelmanager;
@@ -954,6 +954,7 @@ mod tests {
954
954
SCORER_PERSISTENCE_PRIMARY_NAMESPACE , SCORER_PERSISTENCE_SECONDARY_NAMESPACE , SCORER_PERSISTENCE_KEY } ;
955
955
use lightning:: util:: sweep:: { OutputSweeper , OutputSpendStatus } ;
956
956
use lightning_persister:: fs_store:: FilesystemStore ;
957
+ use core:: sync:: atomic:: { AtomicBool , Ordering } ;
957
958
use std:: collections:: VecDeque ;
958
959
use std:: { fs, env} ;
959
960
use std:: path:: PathBuf ;
@@ -1774,6 +1775,40 @@ mod tests {
1774
1775
}
1775
1776
}
1776
1777
1778
+ #[ test]
1779
+ fn test_event_handling_failures_are_replayed ( ) {
1780
+ let ( _, nodes) = create_nodes ( 2 , "test_event_handling_failures_are_replayed" ) ;
1781
+ let channel_value = 100000 ;
1782
+ let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
1783
+ let persister = Arc :: new ( Persister :: new ( data_dir. clone ( ) ) ) ;
1784
+
1785
+ let ( first_event_send, first_event_recv) = std:: sync:: mpsc:: sync_channel ( 1 ) ;
1786
+ let ( second_event_send, second_event_recv) = std:: sync:: mpsc:: sync_channel ( 1 ) ;
1787
+ let should_fail_event_handling = Arc :: new ( AtomicBool :: new ( true ) ) ;
1788
+ let event_handler = move |event : Event | {
1789
+ if let Ok ( true ) = should_fail_event_handling. compare_exchange ( true , false , Ordering :: Acquire , Ordering :: Relaxed ) {
1790
+ first_event_send. send ( event) . unwrap ( ) ;
1791
+ return Err ( ReplayEvent ( ) ) ;
1792
+ }
1793
+
1794
+ second_event_send. send ( event. clone ( ) ) . unwrap ( ) ;
1795
+ Ok ( ( ) )
1796
+ } ;
1797
+
1798
+ let bg_processor = BackgroundProcessor :: start (
1799
+ persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) ,
1800
+ Some ( nodes[ 0 ] . messenger . clone ( ) ) , nodes[ 0 ] . no_gossip_sync ( ) ,
1801
+ nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes[ 0 ] . scorer . clone ( ) )
1802
+ ) ;
1803
+
1804
+ begin_open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , channel_value) ;
1805
+ assert_eq ! ( first_event_recv. recv_timeout( Duration :: from_secs( EVENT_DEADLINE ) ) , second_event_recv. recv_timeout( Duration :: from_secs( EVENT_DEADLINE ) ) ) ;
1806
+
1807
+ if !std:: thread:: panicking ( ) {
1808
+ bg_processor. stop ( ) . unwrap ( ) ;
1809
+ }
1810
+ }
1811
+
1777
1812
#[ test]
1778
1813
fn test_scorer_persistence ( ) {
1779
1814
let ( _, nodes) = create_nodes ( 2 , "test_scorer_persistence" ) ;
0 commit comments