22// SPDX-License-Identifier: Apache-2.0
33
44use crate :: {
5- account:: AccountState , base_types:: * , committee:: Committee , error:: FastPayError , messages:: * ,
5+ account:: AccountState , base_types:: * , committee:: Committee , consensus:: ConsensusState ,
6+ error:: FastPayError , messages:: * ,
67} ;
8+ use either:: Either ;
79use std:: collections:: { BTreeMap , HashSet } ;
810
911#[ cfg( test) ]
@@ -20,6 +22,8 @@ pub struct AuthorityState {
2022 pub key_pair : KeyPair ,
2123 /// States of FastPay accounts.
2224 pub accounts : BTreeMap < AccountId , AccountState > ,
25+ /// States of consensus instances.
26+ pub instances : BTreeMap < AccountId , ConsensusState > ,
2327 /// The latest transaction index of the blockchain that the authority has seen.
2428 pub last_transaction_index : SequenceNumber ,
2529 /// The sharding ID of this authority shard. 0 if one shard.
@@ -60,6 +64,12 @@ pub trait Authority {
6064 order : CoinCreationOrder ,
6165 ) -> Result < ( Vec < Vote > , Vec < CrossShardContinuation > ) , FastPayError > ;
6266
67+ /// Process a message meant for a consensus instance.
68+ fn handle_consensus_order (
69+ & mut self ,
70+ order : ConsensusOrder ,
71+ ) -> Result < Either < Vote , Vec < CrossShardContinuation > > , FastPayError > ;
72+
6373 /// Force synchronization to finalize requests from Primary to FastPay.
6474 fn handle_primary_synchronization_order (
6575 & mut self ,
@@ -148,14 +158,22 @@ impl AuthorityState {
148158 operation : Operation ,
149159 certificate : Certificate ,
150160 ) -> Result < ( ) , FastPayError > {
151- let recipient = operation
152- . recipient ( )
153- . ok_or ( FastPayError :: InvalidCrossShardRequest ) ?;
154- // Verify sharding.
155- fp_ensure ! ( self . in_shard( recipient) , FastPayError :: WrongShard ) ;
156- // Execute the recipient's side of the operation.
157- let account = self . accounts . entry ( recipient. clone ( ) ) . or_default ( ) ;
158- account. apply_operation_as_recipient ( & operation, certificate) ?;
161+ if let Some ( recipient) = operation. recipient ( ) {
162+ fp_ensure ! ( self . in_shard( recipient) , FastPayError :: WrongShard ) ;
163+ // Execute the recipient's side of the operation.
164+ let account = self . accounts . entry ( recipient. clone ( ) ) . or_default ( ) ;
165+ account. apply_operation_as_recipient ( & operation, certificate) ?;
166+ } else if let Operation :: StartConsensusInstance {
167+ new_id,
168+ accounts,
169+ functionality,
170+ } = & operation
171+ {
172+ fp_ensure ! ( self . in_shard( new_id) , FastPayError :: WrongShard ) ;
173+ assert ! ( !self . instances. contains_key( new_id) ) ; // guaranteed under BFT assumptions.
174+ let instance = ConsensusState :: new ( * functionality, accounts. clone ( ) ) ;
175+ self . instances . insert ( new_id. clone ( ) , instance) ;
176+ }
159177 // This concludes the confirmation of `certificate`.
160178 Ok ( ( ) )
161179 }
@@ -332,6 +350,167 @@ impl Authority for AuthorityState {
332350 Ok ( ( votes, continuations) )
333351 }
334352
353+ /// Process a message meant for a consensus instance.
354+ fn handle_consensus_order (
355+ & mut self ,
356+ order : ConsensusOrder ,
357+ ) -> Result < Either < Vote , Vec < CrossShardContinuation > > , FastPayError > {
358+ match order {
359+ ConsensusOrder :: Propose {
360+ proposal,
361+ owner,
362+ signature,
363+ locks,
364+ } => {
365+ let instance = self
366+ . instances
367+ . get_mut ( & proposal. instance_id )
368+ . ok_or_else ( || {
369+ FastPayError :: UnknownConsensusInstance ( proposal. instance_id . clone ( ) )
370+ } ) ?;
371+ // Process lock certificates.
372+ for lock in locks {
373+ lock. check ( & self . committee ) ?;
374+ match lock. value {
375+ Value :: Lock ( Request {
376+ account_id,
377+ operation : Operation :: LockInto { instance_id, owner } ,
378+ sequence_number,
379+ } ) if instance_id == proposal. instance_id
380+ && Some ( & sequence_number)
381+ == instance. sequence_numbers . get ( & account_id) =>
382+ {
383+ // Update locking status for `account_id`.
384+ instance. locked_accounts . insert ( account_id, owner) ;
385+ instance. participants . insert ( owner) ;
386+ }
387+ _ => fp_bail ! ( FastPayError :: InvalidConsensusOrder ) ,
388+ }
389+ }
390+ // Verify the signature and that the author of the signature is authorized.
391+ fp_ensure ! (
392+ instance. participants. contains( & owner) ,
393+ FastPayError :: InvalidConsensusOrder
394+ ) ;
395+ signature. check ( & proposal, owner) ?;
396+ // Check validity of the proposal and obtain the corresponding requests.
397+ let requests = instance. make_requests ( proposal. decision ) ?;
398+ // TODO: verify that `proposal.round` is "available".
399+ // Verify safety.
400+ if let Some ( proposed) = & instance. proposed {
401+ fp_ensure ! (
402+ ( proposed. round == proposal. round
403+ && proposed. decision == proposal. decision)
404+ || proposed. round < proposal. round,
405+ FastPayError :: UnsafeConsensusProposal
406+ ) ;
407+ }
408+ if let Some ( locked) = & instance. locked {
409+ fp_ensure ! (
410+ locked. round < proposal. round && locked. decision == proposal. decision,
411+ FastPayError :: UnsafeConsensusProposal
412+ ) ;
413+ }
414+ // Update proposed decision.
415+ instance. proposed = Some ( proposal. clone ( ) ) ;
416+ // Vote in favor of pre-commit (aka lock).
417+ let value = Value :: PreCommit { proposal, requests } ;
418+ let vote = Vote :: new ( value, & self . key_pair ) ;
419+ Ok ( Either :: Left ( vote) )
420+ }
421+ ConsensusOrder :: HandlePreCommit { certificate } => {
422+ certificate. check ( & self . committee ) ?;
423+ let ( proposal, requests) = match certificate. value {
424+ Value :: PreCommit { proposal, requests } => ( proposal, requests) ,
425+ _ => fp_bail ! ( FastPayError :: InvalidConsensusOrder ) ,
426+ } ;
427+ let instance = self
428+ . instances
429+ . get_mut ( & proposal. instance_id )
430+ . ok_or_else ( || {
431+ FastPayError :: UnknownConsensusInstance ( proposal. instance_id . clone ( ) )
432+ } ) ?;
433+ // Verify safety.
434+ if let Some ( proposed) = & instance. proposed {
435+ fp_ensure ! (
436+ proposed. round <= proposal. round,
437+ FastPayError :: UnsafeConsensusPreCommit
438+ ) ;
439+ }
440+ if let Some ( locked) = & instance. locked {
441+ fp_ensure ! (
442+ locked. round <= proposal. round,
443+ FastPayError :: UnsafeConsensusPreCommit
444+ ) ;
445+ }
446+ // Update locked decision.
447+ instance. locked = Some ( proposal. clone ( ) ) ;
448+ // Vote in favor of commit.
449+ let value = Value :: Commit { proposal, requests } ;
450+ let vote = Vote :: new ( value, & self . key_pair ) ;
451+ Ok ( Either :: Left ( vote) )
452+ }
453+ ConsensusOrder :: HandleCommit { certificate, locks } => {
454+ certificate. check ( & self . committee ) ?;
455+ let ( proposal, requests) = match & certificate. value {
456+ Value :: Commit { proposal, requests } => ( proposal, requests) ,
457+ _ => fp_bail ! ( FastPayError :: InvalidConsensusOrder ) ,
458+ } ;
459+ // Success.
460+ // Only execute the requests in the commit once.
461+ let mut requests = {
462+ if self . instances . contains_key ( & proposal. instance_id ) {
463+ requests. clone ( )
464+ } else {
465+ Vec :: new ( )
466+ }
467+ } ;
468+ // Process lock certificates to add skip requests if needed.
469+ if let ConsensusDecision :: Abort = & proposal. decision {
470+ for lock in locks {
471+ lock. check ( & self . committee ) ?;
472+ match lock. value {
473+ Value :: Lock ( Request {
474+ account_id,
475+ operation :
476+ Operation :: LockInto {
477+ instance_id,
478+ owner : _,
479+ } ,
480+ sequence_number,
481+ } ) if instance_id == proposal. instance_id => {
482+ requests. push ( Request {
483+ account_id : account_id. clone ( ) ,
484+ operation : Operation :: Skip ,
485+ sequence_number,
486+ } ) ;
487+ }
488+ _ => fp_bail ! ( FastPayError :: InvalidConsensusOrder ) ,
489+ }
490+ }
491+ }
492+ // Remove the consensus instance if needed.
493+ self . instances . remove ( & proposal. instance_id ) ;
494+ // Prepate cross-requests.
495+ let continuations = requests
496+ . iter ( )
497+ . map ( |request| {
498+ let shard_id = self . which_shard ( & request. account_id ) ;
499+ CrossShardContinuation :: Request {
500+ shard_id,
501+ request : Box :: new ( CrossShardRequest :: ProcessConfirmedRequest {
502+ request : request. clone ( ) ,
503+ certificate : certificate. clone ( ) ,
504+ } ) ,
505+ }
506+ } )
507+ . collect ( ) ;
508+ Ok ( Either :: Right ( continuations) )
509+ }
510+ }
511+ }
512+
513+ /// Finalize a request from Primary.
335514 fn handle_primary_synchronization_order (
336515 & mut self ,
337516 order : PrimarySynchronizationOrder ,
@@ -374,6 +553,13 @@ impl Authority for AuthorityState {
374553 self . accounts . remove ( & account_id) ;
375554 Ok ( ( ) )
376555 }
556+ CrossShardRequest :: ProcessConfirmedRequest {
557+ request,
558+ certificate,
559+ } => {
560+ self . process_confirmed_request ( request, certificate) ?; // TODO: process continuations
561+ Ok ( ( ) )
562+ }
377563 }
378564 }
379565
@@ -405,6 +591,7 @@ impl AuthorityState {
405591 name,
406592 key_pair,
407593 accounts : BTreeMap :: new ( ) ,
594+ instances : BTreeMap :: new ( ) ,
408595 last_transaction_index : SequenceNumber :: new ( ) ,
409596 shard_id : 0 ,
410597 number_of_shards : 1 ,
@@ -422,6 +609,7 @@ impl AuthorityState {
422609 name : key_pair. public ( ) ,
423610 key_pair,
424611 accounts : BTreeMap :: new ( ) ,
612+ instances : BTreeMap :: new ( ) ,
425613 last_transaction_index : SequenceNumber :: new ( ) ,
426614 shard_id,
427615 number_of_shards,
0 commit comments