@@ -114,6 +114,13 @@ pub enum SignerEvent<T: SignerEventTrait> {
114
114
/// the time at which this event was received by the signer's event processor
115
115
received_time : SystemTime ,
116
116
} ,
117
+ /// A new processed Stacks block was received from the node with the given block hash
118
+ NewBlock {
119
+ /// The block header hash for the newly processed stacks block
120
+ block_hash : Sha512Trunc256Sum ,
121
+ /// The block height for the newly processed stacks block
122
+ block_height : u64 ,
123
+ } ,
117
124
}
118
125
119
126
/// Trait to implement a stop-signaler for the event receiver thread.
@@ -298,29 +305,25 @@ impl<T: SignerEventTrait> EventReceiver<T> for SignerEventReceiver<T> {
298
305
& request. method( ) ,
299
306
) ) ) ;
300
307
}
308
+ debug ! ( "Processing {} event" , request. url( ) ) ;
301
309
if request. url ( ) == "/stackerdb_chunks" {
302
- process_stackerdb_event ( event_receiver. local_addr , request)
303
- . map_err ( |e| {
304
- error ! ( "Error processing stackerdb_chunks message" ; "err" => ?e) ;
305
- e
306
- } )
310
+ process_event :: < T , StackerDBChunksEvent > ( request)
307
311
} else if request. url ( ) == "/proposal_response" {
308
- process_proposal_response ( request)
312
+ process_event :: < T , BlockValidateResponse > ( request)
309
313
} else if request. url ( ) == "/new_burn_block" {
310
- process_new_burn_block_event ( request)
314
+ process_event :: < T , BurnBlockEvent > ( request)
311
315
} else if request. url ( ) == "/shutdown" {
312
316
event_receiver. stop_signal . store ( true , Ordering :: SeqCst ) ;
313
- return Err ( EventError :: Terminated ) ;
317
+ Err ( EventError :: Terminated )
318
+ } else if request. url ( ) == "/new_block" {
319
+ process_event :: < T , BlockEvent > ( request)
314
320
} else {
315
321
let url = request. url ( ) . to_string ( ) ;
316
- // `/new_block` is expected, but not specifically handled. do not log.
317
- if & url != "/new_block" {
318
- debug ! (
319
- "[{:?}] next_event got request with unexpected url {}, return OK so other side doesn't keep sending this" ,
320
- event_receiver. local_addr,
321
- url
322
- ) ;
323
- }
322
+ debug ! (
323
+ "[{:?}] next_event got request with unexpected url {}, return OK so other side doesn't keep sending this" ,
324
+ event_receiver. local_addr,
325
+ url
326
+ ) ;
324
327
ack_dispatcher ( request) ;
325
328
Err ( EventError :: UnrecognizedEvent ( url) )
326
329
}
@@ -385,12 +388,13 @@ fn ack_dispatcher(request: HttpRequest) {
385
388
386
389
// TODO: add tests from mutation testing results #4835
387
390
#[ cfg_attr( test, mutants:: skip) ]
388
- /// Process a stackerdb event from the node
389
- fn process_stackerdb_event < T : SignerEventTrait > (
390
- local_addr : Option < SocketAddr > ,
391
- mut request : HttpRequest ,
392
- ) -> Result < SignerEvent < T > , EventError > {
391
+ fn process_event < T , E > ( mut request : HttpRequest ) -> Result < SignerEvent < T > , EventError >
392
+ where
393
+ T : SignerEventTrait ,
394
+ E : serde :: de :: DeserializeOwned + TryInto < SignerEvent < T > , Error = EventError > ,
395
+ {
393
396
let mut body = String :: new ( ) ;
397
+
394
398
if let Err ( e) = request. as_reader ( ) . read_to_string ( & mut body) {
395
399
error ! ( "Failed to read body: {:?}" , & e) ;
396
400
ack_dispatcher ( request) ;
@@ -399,27 +403,12 @@ fn process_stackerdb_event<T: SignerEventTrait>(
399
403
& e
400
404
) ) ) ;
401
405
}
402
-
403
- debug ! ( "Got stackerdb_chunks event" ; "chunks_event_body" => %body ) ;
404
- let event : StackerDBChunksEvent = serde_json:: from_slice ( body. as_bytes ( ) )
406
+ // Regardless of whether we successfully deserialize, we should ack the dispatcher so they don't keep resending it
407
+ ack_dispatcher ( request ) ;
408
+ let json_event : E = serde_json:: from_slice ( body. as_bytes ( ) )
405
409
. map_err ( |e| EventError :: Deserialize ( format ! ( "Could not decode body to JSON: {:?}" , & e) ) ) ?;
406
410
407
- let event_contract_id = event. contract_id . clone ( ) ;
408
-
409
- let signer_event = match SignerEvent :: try_from ( event) {
410
- Err ( e) => {
411
- info ! (
412
- "[{:?}] next_event got event from an unexpected contract id {}, return OK so other side doesn't keep sending this" ,
413
- local_addr,
414
- event_contract_id
415
- ) ;
416
- ack_dispatcher ( request) ;
417
- return Err ( e) ;
418
- }
419
- Ok ( x) => x,
420
- } ;
421
-
422
- ack_dispatcher ( request) ;
411
+ let signer_event: SignerEvent < T > = json_event. try_into ( ) ?;
423
412
424
413
Ok ( signer_event)
425
414
}
@@ -466,78 +455,69 @@ impl<T: SignerEventTrait> TryFrom<StackerDBChunksEvent> for SignerEvent<T> {
466
455
}
467
456
}
468
457
469
- /// Process a proposal response from the node
470
- fn process_proposal_response < T : SignerEventTrait > (
471
- mut request : HttpRequest ,
472
- ) -> Result < SignerEvent < T > , EventError > {
473
- debug ! ( "Got proposal_response event" ) ;
474
- let mut body = String :: new ( ) ;
475
- if let Err ( e) = request. as_reader ( ) . read_to_string ( & mut body) {
476
- error ! ( "Failed to read body: {:?}" , & e) ;
458
+ impl < T : SignerEventTrait > TryFrom < BlockValidateResponse > for SignerEvent < T > {
459
+ type Error = EventError ;
477
460
478
- if let Err ( e) = request. respond ( HttpResponse :: empty ( 200u16 ) ) {
479
- error ! ( "Failed to respond to request: {:?}" , & e) ;
480
- }
481
- return Err ( EventError :: MalformedRequest ( format ! (
482
- "Failed to read body: {:?}" ,
483
- & e
484
- ) ) ) ;
461
+ fn try_from ( block_validate_response : BlockValidateResponse ) -> Result < Self , Self :: Error > {
462
+ Ok ( SignerEvent :: BlockValidationResponse (
463
+ block_validate_response,
464
+ ) )
485
465
}
466
+ }
486
467
487
- let event: BlockValidateResponse = serde_json:: from_slice ( body. as_bytes ( ) )
488
- . map_err ( |e| EventError :: Deserialize ( format ! ( "Could not decode body to JSON: {:?}" , & e) ) ) ?;
468
+ #[ derive( Debug , Deserialize ) ]
469
+ struct BurnBlockEvent {
470
+ burn_block_hash : String ,
471
+ burn_block_height : u64 ,
472
+ reward_recipients : Vec < serde_json:: Value > ,
473
+ reward_slot_holders : Vec < String > ,
474
+ burn_amount : u64 ,
475
+ }
489
476
490
- if let Err ( e) = request. respond ( HttpResponse :: empty ( 200u16 ) ) {
491
- error ! ( "Failed to respond to request: {:?}" , & e) ;
477
+ impl < T : SignerEventTrait > TryFrom < BurnBlockEvent > for SignerEvent < T > {
478
+ type Error = EventError ;
479
+
480
+ fn try_from ( burn_block_event : BurnBlockEvent ) -> Result < Self , Self :: Error > {
481
+ let burn_header_hash = burn_block_event
482
+ . burn_block_hash
483
+ . get ( 2 ..)
484
+ . ok_or_else ( || EventError :: Deserialize ( "Hex string should be 0x prefixed" . into ( ) ) )
485
+ . and_then ( |hex| {
486
+ BurnchainHeaderHash :: from_hex ( hex)
487
+ . map_err ( |e| EventError :: Deserialize ( format ! ( "Invalid hex string: {e}" ) ) )
488
+ } ) ?;
489
+
490
+ Ok ( SignerEvent :: NewBurnBlock {
491
+ burn_height : burn_block_event. burn_block_height ,
492
+ received_time : SystemTime :: now ( ) ,
493
+ burn_header_hash,
494
+ } )
492
495
}
496
+ }
493
497
494
- Ok ( SignerEvent :: BlockValidationResponse ( event) )
498
+ #[ derive( Debug , Deserialize ) ]
499
+ struct BlockEvent {
500
+ block_hash : String ,
501
+ block_height : u64 ,
495
502
}
496
503
497
- /// Process a new burn block event from the node
498
- fn process_new_burn_block_event < T : SignerEventTrait > (
499
- mut request : HttpRequest ,
500
- ) -> Result < SignerEvent < T > , EventError > {
501
- debug ! ( "Got burn_block event" ) ;
502
- let mut body = String :: new ( ) ;
503
- if let Err ( e) = request. as_reader ( ) . read_to_string ( & mut body) {
504
- error ! ( "Failed to read body: {:?}" , & e) ;
504
+ impl < T : SignerEventTrait > TryFrom < BlockEvent > for SignerEvent < T > {
505
+ type Error = EventError ;
505
506
506
- if let Err ( e) = request. respond ( HttpResponse :: empty ( 200u16 ) ) {
507
- error ! ( "Failed to respond to request: {:?}" , & e) ;
508
- }
509
- return Err ( EventError :: MalformedRequest ( format ! (
510
- "Failed to read body: {:?}" ,
511
- & e
512
- ) ) ) ;
513
- }
514
- #[ derive( Debug , Deserialize ) ]
515
- struct TempBurnBlockEvent {
516
- burn_block_hash : String ,
517
- burn_block_height : u64 ,
518
- reward_recipients : Vec < serde_json:: Value > ,
519
- reward_slot_holders : Vec < String > ,
520
- burn_amount : u64 ,
521
- }
522
- let temp: TempBurnBlockEvent = serde_json:: from_slice ( body. as_bytes ( ) )
523
- . map_err ( |e| EventError :: Deserialize ( format ! ( "Could not decode body to JSON: {:?}" , & e) ) ) ?;
524
- let burn_header_hash = temp
525
- . burn_block_hash
526
- . get ( 2 ..)
527
- . ok_or_else ( || EventError :: Deserialize ( "Hex string should be 0x prefixed" . into ( ) ) )
528
- . and_then ( |hex| {
529
- BurnchainHeaderHash :: from_hex ( hex)
530
- . map_err ( |e| EventError :: Deserialize ( format ! ( "Invalid hex string: {e}" ) ) )
531
- } ) ?;
532
- let event = SignerEvent :: NewBurnBlock {
533
- burn_height : temp. burn_block_height ,
534
- received_time : SystemTime :: now ( ) ,
535
- burn_header_hash,
536
- } ;
537
- if let Err ( e) = request. respond ( HttpResponse :: empty ( 200u16 ) ) {
538
- error ! ( "Failed to respond to request: {:?}" , & e) ;
507
+ fn try_from ( block_event : BlockEvent ) -> Result < Self , Self :: Error > {
508
+ let block_hash: Sha512Trunc256Sum = block_event
509
+ . block_hash
510
+ . get ( 2 ..)
511
+ . ok_or_else ( || EventError :: Deserialize ( "Hex string should be 0x prefixed" . into ( ) ) )
512
+ . and_then ( |hex| {
513
+ Sha512Trunc256Sum :: from_hex ( hex)
514
+ . map_err ( |e| EventError :: Deserialize ( format ! ( "Invalid hex string: {e}" ) ) )
515
+ } ) ?;
516
+ Ok ( SignerEvent :: NewBlock {
517
+ block_hash,
518
+ block_height : block_event. block_height ,
519
+ } )
539
520
}
540
- Ok ( event)
541
521
}
542
522
543
523
pub fn get_signers_db_signer_set_message_id ( name : & str ) -> Option < ( u32 , u32 ) > {
0 commit comments