diff --git a/src/DBFTPlugin/Consensus/ConsensusContext.Get.cs b/src/DBFTPlugin/Consensus/ConsensusContext.Get.cs index b54b6eca1..ef3e96f13 100644 --- a/src/DBFTPlugin/Consensus/ConsensusContext.Get.cs +++ b/src/DBFTPlugin/Consensus/ConsensusContext.Get.cs @@ -45,6 +45,18 @@ private ChangeViewPayloadCompact GetChangeViewPayloadCompact(ExtensiblePayload p }; } + private PreCommitPayloadCompact GetPreCommitPayloadCompact(ExtensiblePayload payload) + { + PreCommit preCommit = GetMessage(payload); + return new PreCommitPayloadCompact + { + ViewNumber = preCommit.ViewNumber, + ValidatorIndex = preCommit.ValidatorIndex, + PreparationHash = preCommit.PreparationHash, + InvocationScript = payload.Witness.InvocationScript, + }; + } + private CommitPayloadCompact GetCommitPayloadCompact(ExtensiblePayload payload) { Commit message = GetMessage(payload); @@ -67,12 +79,21 @@ private PreparationPayloadCompact GetPreparationPayloadCompact(ExtensiblePayload } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public byte GetPrimaryIndex(byte viewNumber) + public byte GetPriorityPrimaryIndex(byte viewNumber) { - int p = ((int)Block.Index - viewNumber) % Validators.Length; + int p = ((int)Block[0].Index - viewNumber) % Validators.Length; return p >= 0 ? (byte)p : (byte)(p + Validators.Length); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public byte GetFallbackPrimaryIndex(byte priorityPrimaryIndex) + { + if (Validators.Length <= 1) return priorityPrimaryIndex; + int p = ((int)Block[0].Index + 1) % (Validators.Length - 1); + p = p >= 0 ? (byte)p : (byte)(p + Validators.Length); + return p < priorityPrimaryIndex ? (byte)p : (byte)(p + 1); + } + public UInt160 GetSender(int index) { return Contract.CreateSignatureRedeemScript(Validators[index]).ToScriptHash(); @@ -81,18 +102,18 @@ public UInt160 GetSender(int index) /// /// Return the expected block size /// - public int GetExpectedBlockSize() + public int GetExpectedBlockSize(uint pId) { - return GetExpectedBlockSizeWithoutTransactions(Transactions.Count) + // Base size - Transactions.Values.Sum(u => u.Size); // Sum Txs + return GetExpectedBlockSizeWithoutTransactions(Transactions[pId].Count) + // Base size + Transactions[pId].Values.Sum(u => u.Size); // Sum Txs } /// /// Return the expected block system fee /// - public long GetExpectedBlockSystemFee() + public long GetExpectedBlockSystemFee(uint pId) { - return Transactions.Values.Sum(u => u.SystemFee); // Sum Txs + return Transactions[pId].Values.Sum(u => u.SystemFee); // Sum Txs } /// diff --git a/src/DBFTPlugin/Consensus/ConsensusContext.MakePayload.cs b/src/DBFTPlugin/Consensus/ConsensusContext.MakePayload.cs index a761cbafb..442084310 100644 --- a/src/DBFTPlugin/Consensus/ConsensusContext.MakePayload.cs +++ b/src/DBFTPlugin/Consensus/ConsensusContext.MakePayload.cs @@ -32,17 +32,18 @@ public ExtensiblePayload MakeChangeView(ChangeViewReason reason) }); } - public ExtensiblePayload MakeCommit() + public ExtensiblePayload MakeCommit(uint pId) { - return CommitPayloads[MyIndex] ?? (CommitPayloads[MyIndex] = MakeSignedPayload(new Commit + return CommitPayloads[pId][MyIndex] ?? (CommitPayloads[pId][MyIndex] = MakeSignedPayload(new Commit { - Signature = EnsureHeader().Sign(keyPair, neoSystem.Settings.Network) + Signature = EnsureHeader(pId).Sign(keyPair, neoSystem.Settings.Network), + PId = pId })); } private ExtensiblePayload MakeSignedPayload(ConsensusMessage message) { - message.BlockIndex = Block.Index; + message.BlockIndex = Block[0].Index; message.ValidatorIndex = (byte)MyIndex; message.ViewNumber = ViewNumber; ExtensiblePayload payload = CreatePayload(message, null); @@ -70,16 +71,16 @@ private void SignPayload(ExtensiblePayload payload) /// Prevent that block exceed the max size /// /// Ordered transactions - internal void EnsureMaxBlockLimitation(IEnumerable txs) + internal void EnsureMaxBlockLimitation(IEnumerable txs, uint pId) { uint maxTransactionsPerBlock = neoSystem.Settings.MaxTransactionsPerBlock; // Limit Speaker proposal to the limit `MaxTransactionsPerBlock` or all available transactions of the mempool txs = txs.Take((int)maxTransactionsPerBlock); - List hashes = new List(); - Transactions = new Dictionary(); - VerificationContext = new TransactionVerificationContext(); + List hashes = new(); + Transactions[pId] = new Dictionary(); + VerificationContext[pId] = new TransactionVerificationContext(); // Expected block size var blockSize = GetExpectedBlockSizeWithoutTransactions(txs.Count()); @@ -97,25 +98,38 @@ internal void EnsureMaxBlockLimitation(IEnumerable txs) if (blockSystemFee > dbftSettings.MaxBlockSystemFee) break; hashes.Add(tx.Hash); - Transactions.Add(tx.Hash, tx); - VerificationContext.AddTransaction(tx); + Transactions[pId].Add(tx.Hash, tx); + VerificationContext[pId].AddTransaction(tx); } - TransactionHashes = hashes.ToArray(); + TransactionHashes[pId] = hashes.ToArray(); } - public ExtensiblePayload MakePrepareRequest() + internal IEnumerable PickTransactions() { - EnsureMaxBlockLimitation(neoSystem.MemPool.GetSortedVerifiedTransactions()); - Block.Header.Timestamp = Math.Max(TimeProvider.Current.UtcNow.ToTimestampMS(), PrevHeader.Timestamp + 1); - Block.Header.Nonce = GetNonce(); - return PreparationPayloads[MyIndex] = MakeSignedPayload(new PrepareRequest + var verifiedTxes = neoSystem.MemPool.GetSortedVerifiedTransactions(); + if (ViewNumber > 0 && LastProposal.Length > 0) { - Version = Block.Version, - PrevHash = Block.PrevHash, - Timestamp = Block.Timestamp, - Nonce = Block.Nonce, - TransactionHashes = TransactionHashes + var txes = verifiedTxes.Where(p => LastProposal.Contains(p.Hash)); + if (txes.Count() > LastProposal.Length / 2) + return txes; + } + return verifiedTxes; + } + + public ExtensiblePayload MakePrepareRequest(uint pId) + { + EnsureMaxBlockLimitation(PickTransactions(), pId); + Block[pId].Header.Timestamp = Math.Max(TimeProvider.Current.UtcNow.ToTimestampMS(), PrevHeader.Timestamp + 1); + Block[pId].Header.Nonce = GetNonce(); + + return PreparationPayloads[pId][MyIndex] = MakeSignedPayload(new PrepareRequest + { + Version = Block[pId].Version, + PrevHash = Block[pId].PrevHash, + Timestamp = Block[pId].Timestamp, + Nonce = Block[pId].Nonce, + TransactionHashes = TransactionHashes[pId] }); } @@ -130,38 +144,51 @@ public ExtensiblePayload MakeRecoveryRequest() public ExtensiblePayload MakeRecoveryMessage() { PrepareRequest prepareRequestMessage = null; - if (TransactionHashes != null) + uint pId = TransactionHashes[0] != null ? 0u : (TransactionHashes[1] != null ? 1u : 0u); + if (TransactionHashes[pId] != null) { prepareRequestMessage = new PrepareRequest { - Version = Block.Version, - PrevHash = Block.PrevHash, + Version = Block[pId].Version, + PrevHash = Block[pId].PrevHash, ViewNumber = ViewNumber, - Timestamp = Block.Timestamp, - Nonce = Block.Nonce, - BlockIndex = Block.Index, - ValidatorIndex = Block.PrimaryIndex, - TransactionHashes = TransactionHashes + Timestamp = Block[pId].Timestamp, + Nonce = Block[pId].Nonce, + BlockIndex = Block[pId].Index, + ValidatorIndex = Block[pId].PrimaryIndex, + TransactionHashes = TransactionHashes[pId] }; } return MakeSignedPayload(new RecoveryMessage { + PId = pId, ChangeViewMessages = LastChangeViewPayloads.Where(p => p != null).Select(p => GetChangeViewPayloadCompact(p)).Take(M).ToDictionary(p => p.ValidatorIndex), PrepareRequestMessage = prepareRequestMessage, // We only need a PreparationHash set if we don't have the PrepareRequest information. - PreparationHash = TransactionHashes == null ? PreparationPayloads.Where(p => p != null).GroupBy(p => GetMessage(p).PreparationHash, (k, g) => new { Hash = k, Count = g.Count() }).OrderByDescending(p => p.Count).Select(p => p.Hash).FirstOrDefault() : null, - PreparationMessages = PreparationPayloads.Where(p => p != null).Select(p => GetPreparationPayloadCompact(p)).ToDictionary(p => p.ValidatorIndex), + PreparationHash = TransactionHashes[pId] == null ? PreparationPayloads[pId].Where(p => p != null).GroupBy(p => GetMessage(p).PreparationHash, (k, g) => new { Hash = k, Count = g.Count() }).OrderByDescending(p => p.Count).Select(p => p.Hash).FirstOrDefault() : null, + PreparationMessages = PreparationPayloads[pId].Where(p => p != null).Select(p => GetPreparationPayloadCompact(p)).ToDictionary(p => p.ValidatorIndex), + PreCommitMessages = PreCommitPayloads[pId].Where(p => p != null).Select(p => GetPreCommitPayloadCompact(p)).ToDictionary(p => p.ValidatorIndex), CommitMessages = CommitSent - ? CommitPayloads.Where(p => p != null).Select(p => GetCommitPayloadCompact(p)).ToDictionary(p => p.ValidatorIndex) + ? CommitPayloads[pId].Where(p => p != null).Select(p => GetCommitPayloadCompact(p)).ToDictionary(p => p.ValidatorIndex) : new Dictionary() }); } - public ExtensiblePayload MakePrepareResponse() + public ExtensiblePayload MakePrepareResponse(uint pId) + { + return PreparationPayloads[pId][MyIndex] = MakeSignedPayload(new PrepareResponse + { + PreparationHash = PreparationPayloads[pId][Block[pId].PrimaryIndex].Hash, + PId = pId + }); + } + + public ExtensiblePayload MakePreCommit(uint pId) { - return PreparationPayloads[MyIndex] = MakeSignedPayload(new PrepareResponse + return PreCommitPayloads[pId][MyIndex] = MakeSignedPayload(new PreCommit { - PreparationHash = PreparationPayloads[Block.PrimaryIndex].Hash + PreparationHash = PreparationPayloads[pId][Block[pId].PrimaryIndex].Hash, + PId = pId }); } diff --git a/src/DBFTPlugin/Consensus/ConsensusContext.cs b/src/DBFTPlugin/Consensus/ConsensusContext.cs index a224e31d2..167b007d7 100644 --- a/src/DBFTPlugin/Consensus/ConsensusContext.cs +++ b/src/DBFTPlugin/Consensus/ConsensusContext.cs @@ -33,16 +33,19 @@ public partial class ConsensusContext : IDisposable, ISerializable /// private static readonly byte[] ConsensusStateKey = { 0xf4 }; - public Block Block; + public Block[] Block = new Block[2]; public byte ViewNumber; public ECPoint[] Validators; public int MyIndex; - public UInt256[] TransactionHashes; - public Dictionary Transactions; - public ExtensiblePayload[] PreparationPayloads; - public ExtensiblePayload[] CommitPayloads; + public UInt256[][] TransactionHashes = new UInt256[2][]; + public Dictionary[] Transactions = new Dictionary[2]; + public ExtensiblePayload[][] PreparationPayloads = new ExtensiblePayload[2][]; + public ExtensiblePayload[][] PreCommitPayloads = new ExtensiblePayload[2][]; + public ExtensiblePayload[][] CommitPayloads = new ExtensiblePayload[2][]; public ExtensiblePayload[] ChangeViewPayloads; public ExtensiblePayload[] LastChangeViewPayloads; + public UInt256[] LastProposal; + // LastSeenMessage array stores the height of the last seen message, for each validator. // if this node never heard from validator i, LastSeenMessage[i] will be -1. public Dictionary LastSeenMessage { get; private set; } @@ -50,7 +53,7 @@ public partial class ConsensusContext : IDisposable, ISerializable /// /// Store all verified unsorted transactions' senders' fee currently in the consensus context. /// - public TransactionVerificationContext VerificationContext = new(); + public TransactionVerificationContext[] VerificationContext = new TransactionVerificationContext[2]; public SnapshotCache Snapshot { get; private set; } private KeyPair keyPair; @@ -63,17 +66,26 @@ public partial class ConsensusContext : IDisposable, ISerializable public int F => (Validators.Length - 1) / 3; public int M => Validators.Length - F; - public bool IsPrimary => MyIndex == Block.PrimaryIndex; - public bool IsBackup => MyIndex >= 0 && MyIndex != Block.PrimaryIndex; + + public bool IsPriorityPrimary => MyIndex == Block[0].PrimaryIndex; + public bool IsFallbackPrimary => ViewNumber == 0 && MyIndex == Block[1].PrimaryIndex; + + public bool IsAPrimary => IsPriorityPrimary || IsFallbackPrimary; + + //Modify to be 1 or 4/3 + public static float PrimaryTimerPriorityMultiplier => 1; + public static float PrimaryTimerFallBackMultiplier => (float)4 / 3; + public float PrimaryTimerMultiplier => IsPriorityPrimary ? PrimaryTimerPriorityMultiplier : PrimaryTimerFallBackMultiplier; + public bool IsBackup => MyIndex >= 0 && !IsPriorityPrimary && !IsFallbackPrimary; public bool WatchOnly => MyIndex < 0; - public Header PrevHeader => NativeContract.Ledger.GetHeader(Snapshot, Block.PrevHash); - public int CountCommitted => CommitPayloads.Count(p => p != null); + public Header PrevHeader => NativeContract.Ledger.GetHeader(Snapshot, Block[0].PrevHash); + public int CountCommitted => CommitPayloads[0].Count(p => p != null) + CommitPayloads[1].Count(p => p != null); public int CountFailed { get { if (LastSeenMessage == null) return 0; - return Validators.Count(p => !LastSeenMessage.TryGetValue(p, out var value) || value < (Block.Index - 1)); + return Validators.Count(p => !LastSeenMessage.TryGetValue(p, out var value) || value < (Block[0].Index - 1)); } } public bool ValidatorsChanged @@ -89,10 +101,11 @@ public bool ValidatorsChanged } #region Consensus States - public bool RequestSentOrReceived => PreparationPayloads[Block.PrimaryIndex] != null; - public bool ResponseSent => !WatchOnly && PreparationPayloads[MyIndex] != null; - public bool CommitSent => !WatchOnly && CommitPayloads[MyIndex] != null; - public bool BlockSent => Block.Transactions != null; + public bool RequestSentOrReceived => PreparationPayloads[0][Block[0].PrimaryIndex] != null || (ViewNumber == 0 && PreparationPayloads[1][Block[1].PrimaryIndex] != null); + public bool ResponseSent => !WatchOnly && (PreparationPayloads[0][MyIndex] != null || (ViewNumber == 0 && PreparationPayloads[1][MyIndex] != null)); + public bool PreCommitSent => !WatchOnly && (PreCommitPayloads[0][MyIndex] != null || (ViewNumber == 0 && PreCommitPayloads[1][MyIndex] != null)); + public bool CommitSent => !WatchOnly && (CommitPayloads[0][MyIndex] != null || (ViewNumber == 0 && CommitPayloads[1][MyIndex] != null)); + public bool BlockSent => Block[0].Transactions != null || Block[1]?.Transactions != null; public bool ViewChanging => !WatchOnly && GetMessage(ChangeViewPayloads[MyIndex])?.NewViewNumber > ViewNumber; // NotAcceptingPayloadsDueToViewChanging imposes nodes to not accept some payloads if View is Changing, // i.e: OnTransaction function will not process any transaction; OnPrepareRequestReceived will also return; @@ -118,20 +131,20 @@ public ConsensusContext(NeoSystem neoSystem, Settings settings, Wallet wallet) this.store = neoSystem.LoadStore(settings.RecoveryLogs); } - public Block CreateBlock() + public Block CreateBlock(uint pID) { - EnsureHeader(); + EnsureHeader(pID); Contract contract = Contract.CreateMultiSigContract(M, Validators); - ContractParametersContext sc = new ContractParametersContext(neoSystem.StoreView, Block.Header, dbftSettings.Network); + ContractParametersContext sc = new ContractParametersContext(neoSystem.StoreView, Block[pID].Header, dbftSettings.Network); for (int i = 0, j = 0; i < Validators.Length && j < M; i++) { - if (GetMessage(CommitPayloads[i])?.ViewNumber != ViewNumber) continue; - sc.AddSignature(contract, Validators[i], GetMessage(CommitPayloads[i]).Signature.ToArray()); + if (GetMessage(CommitPayloads[pID][i])?.ViewNumber != ViewNumber) continue; + sc.AddSignature(contract, Validators[i], GetMessage(CommitPayloads[pID][i]).Signature.ToArray()); j++; } - Block.Header.Witness = sc.GetWitnesses()[0]; - Block.Transactions = TransactionHashes.Select(p => Transactions[p]).ToArray(); - return Block; + Block[pID].Header.Witness = sc.GetWitnesses()[0]; + Block[pID].Transactions = TransactionHashes[pID].Select(p => Transactions[pID][p]).ToArray(); + return Block[pID]; } public ExtensiblePayload CreatePayload(ConsensusMessage message, ReadOnlyMemory invocationScript = default) @@ -158,11 +171,11 @@ public void Dispose() Snapshot?.Dispose(); } - public Block EnsureHeader() + public Block EnsureHeader(uint pID) { - if (TransactionHashes == null) return null; - Block.Header.MerkleRoot ??= MerkleTree.ComputeRoot(TransactionHashes); - return Block; + if (TransactionHashes[pID] == null) return null; + Block[pID].Header.MerkleRoot ??= MerkleTree.ComputeRoot(TransactionHashes[pID]); + return Block[pID]; } public bool Load() @@ -193,18 +206,21 @@ public void Reset(byte viewNumber) Snapshot?.Dispose(); Snapshot = neoSystem.GetSnapshot(); uint height = NativeContract.Ledger.CurrentIndex(Snapshot); - Block = new Block + for (uint i = 0; i <= 1; i++) { - Header = new Header + Block[i] = new Block { - PrevHash = NativeContract.Ledger.CurrentHash(Snapshot), - Index = height + 1, - NextConsensus = Contract.GetBFTAddress( - NeoToken.ShouldRefreshCommittee(height + 1, neoSystem.Settings.CommitteeMembersCount) ? - NativeContract.NEO.ComputeNextBlockValidators(Snapshot, neoSystem.Settings) : - NativeContract.NEO.GetNextBlockValidators(Snapshot, neoSystem.Settings.ValidatorsCount)) - } - }; + Header = new Header + { + PrevHash = NativeContract.Ledger.CurrentHash(Snapshot), + Index = height + 1, + NextConsensus = Contract.GetBFTAddress( + NeoToken.ShouldRefreshCommittee(height + 1, neoSystem.Settings.CommitteeMembersCount) ? + NativeContract.NEO.ComputeNextBlockValidators(Snapshot, neoSystem.Settings) : + NativeContract.NEO.GetNextBlockValidators(Snapshot, neoSystem.Settings.ValidatorsCount)) + } + }; + } var pv = Validators; Validators = NativeContract.NEO.GetNextBlockValidators(Snapshot, neoSystem.Settings.ValidatorsCount); if (_witnessSize == 0 || (pv != null && pv.Length != Validators.Length)) @@ -226,7 +242,6 @@ public void Reset(byte viewNumber) MyIndex = -1; ChangeViewPayloads = new ExtensiblePayload[Validators.Length]; LastChangeViewPayloads = new ExtensiblePayload[Validators.Length]; - CommitPayloads = new ExtensiblePayload[Validators.Length]; if (ValidatorsChanged || LastSeenMessage is null) { var previous_last_seen_message = LastSeenMessage; @@ -249,6 +264,21 @@ public void Reset(byte viewNumber) break; } cachedMessages = new Dictionary(); + LastProposal = Array.Empty(); + for (uint pID = 0; pID <= 1; pID++) + { + Block[pID].Header.MerkleRoot = null; + Block[pID].Header.Timestamp = 0; + Block[pID].Header.Nonce = 0; + Block[pID].Transactions = null; + TransactionHashes[pID] = null; + PreparationPayloads[pID] = new ExtensiblePayload[Validators.Length]; + PreCommitPayloads[pID] = new ExtensiblePayload[Validators.Length]; + CommitPayloads[pID] = new ExtensiblePayload[Validators.Length]; + if (MyIndex >= 0) LastSeenMessage[Validators[MyIndex]] = Block[pID].Index; + } + Block[0].Header.PrimaryIndex = GetPriorityPrimaryIndex(viewNumber); + Block[1].Header.PrimaryIndex = GetFallbackPrimaryIndex(Block[0].Header.PrimaryIndex); } else { @@ -257,16 +287,25 @@ public void Reset(byte viewNumber) LastChangeViewPayloads[i] = ChangeViewPayloads[i]; else LastChangeViewPayloads[i] = null; + + Block[0].Header.MerkleRoot = null; + Block[0].Header.Timestamp = 0; + Block[0].Header.Nonce = 0; + Block[0].Transactions = null; + TransactionHashes[0] = null; + PreparationPayloads[0] = new ExtensiblePayload[Validators.Length]; + PreCommitPayloads[0] = new ExtensiblePayload[Validators.Length]; + if (MyIndex >= 0) LastSeenMessage[Validators[MyIndex]] = Block[0].Index; + Block[0].Header.PrimaryIndex = GetPriorityPrimaryIndex(viewNumber); + + Block[1] = null; + TransactionHashes[1] = null; + Transactions[1] = null; + VerificationContext[1] = null; + PreparationPayloads[1] = null; + PreCommitPayloads[1] = null; } ViewNumber = viewNumber; - Block.Header.PrimaryIndex = GetPrimaryIndex(viewNumber); - Block.Header.MerkleRoot = null; - Block.Header.Timestamp = 0; - Block.Header.Nonce = 0; - Block.Transactions = null; - TransactionHashes = null; - PreparationPayloads = new ExtensiblePayload[Validators.Length]; - if (MyIndex >= 0) LastSeenMessage[Validators[MyIndex]] = Block.Index; } public void Save() @@ -277,47 +316,65 @@ public void Save() public void Deserialize(ref MemoryReader reader) { Reset(0); - if (reader.ReadUInt32() != Block.Version) throw new FormatException(); - if (reader.ReadUInt32() != Block.Index) throw new InvalidOperationException(); - Block.Header.Timestamp = reader.ReadUInt64(); - Block.Header.Nonce = reader.ReadUInt64(); - Block.Header.PrimaryIndex = reader.ReadByte(); - Block.Header.NextConsensus = reader.ReadSerializable(); - if (Block.NextConsensus.Equals(UInt160.Zero)) - Block.Header.NextConsensus = null; ViewNumber = reader.ReadByte(); - TransactionHashes = reader.ReadSerializableArray(ushort.MaxValue); - Transaction[] transactions = reader.ReadSerializableArray(ushort.MaxValue); - PreparationPayloads = reader.ReadNullableArray(neoSystem.Settings.ValidatorsCount); - CommitPayloads = reader.ReadNullableArray(neoSystem.Settings.ValidatorsCount); - ChangeViewPayloads = reader.ReadNullableArray(neoSystem.Settings.ValidatorsCount); - LastChangeViewPayloads = reader.ReadNullableArray(neoSystem.Settings.ValidatorsCount); - if (TransactionHashes.Length == 0 && !RequestSentOrReceived) - TransactionHashes = null; - Transactions = transactions.Length == 0 && !RequestSentOrReceived ? null : transactions.ToDictionary(p => p.Hash); - VerificationContext = new TransactionVerificationContext(); - if (Transactions != null) + for (uint pID = 0; pID <= 1; pID++) { - foreach (Transaction tx in Transactions.Values) - VerificationContext.AddTransaction(tx); + if (ViewNumber > 0 && pID > 0) break; + if (reader.ReadUInt32() != Block[pID].Version) throw new FormatException(); + if (reader.ReadUInt32() != Block[pID].Index) throw new InvalidOperationException(); + Block[pID].Header.Timestamp = reader.ReadUInt64(); + Block[pID].Header.Nonce = reader.ReadUInt64(); + Block[pID].Header.PrimaryIndex = reader.ReadByte(); + Block[pID].Header.NextConsensus = reader.ReadSerializable(); + if (Block[pID].NextConsensus.Equals(UInt160.Zero)) + Block[pID].Header.NextConsensus = null; + + TransactionHashes[pID] = reader.ReadSerializableArray(ushort.MaxValue); + Transaction[] transactions = reader.ReadSerializableArray(ushort.MaxValue); + PreparationPayloads[pID] = reader.ReadNullableArray(neoSystem.Settings.ValidatorsCount); + PreCommitPayloads[pID] = reader.ReadNullableArray(neoSystem.Settings.ValidatorsCount); + CommitPayloads[pID] = reader.ReadNullableArray(neoSystem.Settings.ValidatorsCount); + + if (TransactionHashes[pID].Length == 0 && !RequestSentOrReceived) + TransactionHashes[pID] = null; + Transactions[pID] = transactions.Length == 0 && !RequestSentOrReceived ? null : transactions.ToDictionary(p => p.Hash); + VerificationContext[pID] = new TransactionVerificationContext(); + if (Transactions[pID] != null) + { + foreach (Transaction tx in Transactions[pID].Values) + VerificationContext[pID].AddTransaction(tx); + } } + ChangeViewPayloads = reader.ReadNullableArray(neoSystem.Settings.ValidatorsCount); + LastChangeViewPayloads = reader.ReadNullableArray(neoSystem.Settings.ValidatorsCount); + } public void Serialize(BinaryWriter writer) { - writer.Write(Block.Version); - writer.Write(Block.Index); - writer.Write(Block.Timestamp); - writer.Write(Block.Nonce); - writer.Write(Block.PrimaryIndex); - writer.Write(Block.NextConsensus ?? UInt160.Zero); writer.Write(ViewNumber); - writer.Write(TransactionHashes ?? Array.Empty()); - writer.Write(Transactions?.Values.ToArray() ?? Array.Empty()); - writer.WriteNullableArray(PreparationPayloads); - writer.WriteNullableArray(CommitPayloads); + for (uint i = 0; i <= 1; i++) + { + if (ViewNumber > 0 && i > 0) break; + writer.Write(Block[i].Version); + writer.Write(Block[i].Index); + writer.Write(Block[i].Timestamp); + writer.Write(Block[i].Nonce); + writer.Write(Block[i].PrimaryIndex); + writer.Write(Block[i].NextConsensus ?? UInt160.Zero); + writer.Write(TransactionHashes[i] ?? Array.Empty()); + writer.Write(Transactions[i]?.Values.ToArray() ?? Array.Empty()); + writer.WriteNullableArray(PreparationPayloads[i]); + writer.WriteNullableArray(PreCommitPayloads[i]); + writer.WriteNullableArray(CommitPayloads[i]); + } writer.WriteNullableArray(ChangeViewPayloads); writer.WriteNullableArray(LastChangeViewPayloads); } + + private static void Log(string message, LogLevel level = LogLevel.Info) + { + Utility.Log(nameof(ConsensusService), level, message); + } } } diff --git a/src/DBFTPlugin/Consensus/ConsensusService.Check.cs b/src/DBFTPlugin/Consensus/ConsensusService.Check.cs index 3b15bcd8f..29bc97919 100644 --- a/src/DBFTPlugin/Consensus/ConsensusService.Check.cs +++ b/src/DBFTPlugin/Consensus/ConsensusService.Check.cs @@ -20,25 +20,25 @@ namespace Neo.Consensus { partial class ConsensusService { - private bool CheckPrepareResponse() + private bool CheckPrepareResponse(uint pId) { - if (context.TransactionHashes.Length == context.Transactions.Count) + if (context.TransactionHashes[pId].Length == context.Transactions[pId].Count) { // if we are the primary for this view, but acting as a backup because we recovered our own // previously sent prepare request, then we don't want to send a prepare response. - if (context.IsPrimary || context.WatchOnly) return true; + if ((pId == 0 && context.IsPriorityPrimary) || (pId == 1 && context.IsFallbackPrimary) || context.WatchOnly) return true; // Check maximum block size via Native Contract policy - if (context.GetExpectedBlockSize() > dbftSettings.MaxBlockSize) + if (context.GetExpectedBlockSize(pId) > dbftSettings.MaxBlockSize) { - Log($"Rejected block: {context.Block.Index} The size exceed the policy", LogLevel.Warning); + Log($"Rejected block: {context.Block[pId].Index} The size exceed the policy", LogLevel.Warning); RequestChangeView(ChangeViewReason.BlockRejectedByPolicy); return false; } // Check maximum block system fee via Native Contract policy - if (context.GetExpectedBlockSystemFee() > dbftSettings.MaxBlockSystemFee) + if (context.GetExpectedBlockSystemFee(pId) > dbftSettings.MaxBlockSystemFee) { - Log($"Rejected block: {context.Block.Index} The system fee exceed the policy", LogLevel.Warning); + Log($"Rejected block: {context.Block[pId].Index} The system fee exceed the policy", LogLevel.Warning); RequestChangeView(ChangeViewReason.BlockRejectedByPolicy); return false; } @@ -48,21 +48,36 @@ private bool CheckPrepareResponse() ExtendTimerByFactor(2); Log($"Sending {nameof(PrepareResponse)}"); - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakePrepareResponse() }); - CheckPreparations(); + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakePrepareResponse(pId) }); + CheckPreparations(pId); } return true; } - private void CheckCommits() + private void CheckPreCommits(uint pId, bool forced = false) { - if (context.CommitPayloads.Count(p => context.GetMessage(p)?.ViewNumber == context.ViewNumber) >= context.M && context.TransactionHashes.All(p => context.Transactions.ContainsKey(p))) + if (forced || context.PreCommitPayloads[pId].Count(p => p != null) >= context.M && context.TransactionHashes[pId].All(p => context.Transactions[pId].ContainsKey(p))) { - block_received_index = context.Block.Index; + ExtensiblePayload payload = context.MakeCommit(pId); + Log($"Sending {nameof(Commit)} to pId={pId}"); + context.Save(); + localNode.Tell(new LocalNode.SendDirectly { Inventory = payload }); + // Set timer, so we will resend the commit in case of a networking issue + ChangeTimer(TimeSpan.FromMilliseconds(neoSystem.Settings.MillisecondsPerBlock)); + CheckCommits(pId); + } + } + + private void CheckCommits(uint pId) + { + if (context.CommitPayloads[pId].Count(p => context.GetMessage(p)?.ViewNumber == context.ViewNumber) >= context.M && context.TransactionHashes[pId].All(p => context.Transactions[pId].ContainsKey(p))) + { + block_received_index = context.Block[pId].Index; block_received_time = TimeProvider.Current.UtcNow; - Block block = context.CreateBlock(); - Log($"Sending {nameof(Block)}: height={block.Index} hash={block.Hash} tx={block.Transactions.Length}"); + Block block = context.CreateBlock(pId); + Log($"Sending {nameof(Block)}: height={block.Index} hash={block.Hash} tx={block.Transactions.Length} Id={pId}"); blockchain.Tell(block); + return; } } @@ -85,17 +100,33 @@ private void CheckExpectedView(byte viewNumber) } } - private void CheckPreparations() + private void CheckPreparations(uint pId) { - if (context.PreparationPayloads.Count(p => p != null) >= context.M && context.TransactionHashes.All(p => context.Transactions.ContainsKey(p))) + if (context.TransactionHashes[pId].All(p => context.Transactions[pId].ContainsKey(p))) { - ExtensiblePayload payload = context.MakeCommit(); - Log($"Sending {nameof(Commit)}"); - context.Save(); - localNode.Tell(new LocalNode.SendDirectly { Inventory = payload }); - // Set timer, so we will resend the commit in case of a networking issue - ChangeTimer(TimeSpan.FromMilliseconds(neoSystem.Settings.MillisecondsPerBlock)); - CheckCommits(); + var preparationsCount = context.PreparationPayloads[pId].Count(p => p != null); + if (context.ViewNumber > 0) + { + if (preparationsCount >= context.M) + CheckPreCommits(0, true); + return; + } + if (!context.PreCommitSent + && ((pId == 0 && preparationsCount >= context.F + 1) + || (pId == 1 && preparationsCount >= context.M))) + { + ExtensiblePayload payload = context.MakePreCommit(pId); + Log($"Sending {nameof(PreCommit)} to pId={pId}"); + context.Save(); + localNode.Tell(new LocalNode.SendDirectly { Inventory = payload }); + // Set timer, so we will resend the commit in case of a networking issue + ChangeTimer(TimeSpan.FromMilliseconds(neoSystem.Settings.MillisecondsPerBlock)); + CheckPreCommits(pId); + } + if (context.ViewNumber == 0 && pId == 0 && preparationsCount >= context.M) + { + CheckPreCommits(0, true); + } } } } diff --git a/src/DBFTPlugin/Consensus/ConsensusService.OnMessage.cs b/src/DBFTPlugin/Consensus/ConsensusService.OnMessage.cs index ecc31f7ba..d19b44e45 100644 --- a/src/DBFTPlugin/Consensus/ConsensusService.OnMessage.cs +++ b/src/DBFTPlugin/Consensus/ConsensusService.OnMessage.cs @@ -41,11 +41,11 @@ private void OnConsensusPayload(ExtensiblePayload payload) } if (!message.Verify(neoSystem.Settings)) return; - if (message.BlockIndex != context.Block.Index) + if (message.BlockIndex != context.Block[0].Index) { - if (context.Block.Index < message.BlockIndex) + if (context.Block[0].Index < message.BlockIndex) { - Log($"Chain is behind: expected={message.BlockIndex} current={context.Block.Index - 1}", LogLevel.Warning); + Log($"Chain is behind: expected={message.BlockIndex} current={context.Block[0].Index - 1}", LogLevel.Warning); } return; } @@ -63,6 +63,9 @@ private void OnConsensusPayload(ExtensiblePayload payload) case ChangeView view: OnChangeViewReceived(payload, view); break; + case PreCommit precommit: + OnPreCommitReceived(payload, precommit); + break; case Commit commit: OnCommitReceived(payload, commit); break; @@ -78,10 +81,13 @@ private void OnConsensusPayload(ExtensiblePayload payload) private void OnPrepareRequestReceived(ExtensiblePayload payload, PrepareRequest message) { if (context.RequestSentOrReceived || context.NotAcceptingPayloadsDueToViewChanging) return; - if (message.ValidatorIndex != context.Block.PrimaryIndex || message.ViewNumber != context.ViewNumber) return; - if (message.Version != context.Block.Version || message.PrevHash != context.Block.PrevHash) return; + uint pId = context.ViewNumber > 0 || message.ValidatorIndex == context.GetPriorityPrimaryIndex(context.ViewNumber) ? 0u : 1u; + // Add verification for Fallback + if (message.ValidatorIndex != context.Block[pId].PrimaryIndex || message.ViewNumber != context.ViewNumber) return; + if (message.Version != context.Block[pId].Version || message.PrevHash != context.Block[pId].PrevHash) return; if (message.TransactionHashes.Length > neoSystem.Settings.MaxTransactionsPerBlock) return; - Log($"{nameof(OnPrepareRequestReceived)}: height={message.BlockIndex} view={message.ViewNumber} index={message.ValidatorIndex} tx={message.TransactionHashes.Length}"); + + Log($"{nameof(OnPrepareRequestReceived)}: height={message.BlockIndex} view={message.ViewNumber} index={message.ValidatorIndex} tx={message.TransactionHashes.Length} priority={message.ValidatorIndex == context.Block[0].PrimaryIndex} fallback={context.ViewNumber == 0 && message.ValidatorIndex == context.Block[1].PrimaryIndex}"); if (message.Timestamp <= context.PrevHeader.Timestamp || message.Timestamp > TimeProvider.Current.UtcNow.AddMilliseconds(8 * neoSystem.Settings.MillisecondsPerBlock).ToTimestampMS()) { Log($"Timestamp incorrect: {message.Timestamp}", LogLevel.Warning); @@ -98,33 +104,35 @@ private void OnPrepareRequestReceived(ExtensiblePayload payload, PrepareRequest // around 2*15/M=30.0/5 ~ 40% block time (for M=5) ExtendTimerByFactor(2); - context.Block.Header.Timestamp = message.Timestamp; - context.Block.Header.Nonce = message.Nonce; - context.TransactionHashes = message.TransactionHashes; + context.Block[pId].Header.Timestamp = message.Timestamp; + context.Block[pId].Header.Nonce = message.Nonce; + context.TransactionHashes[pId] = message.TransactionHashes; + context.LastProposal = message.TransactionHashes; - context.Transactions = new Dictionary(); - context.VerificationContext = new TransactionVerificationContext(); - for (int i = 0; i < context.PreparationPayloads.Length; i++) - if (context.PreparationPayloads[i] != null) - if (!context.GetMessage(context.PreparationPayloads[i]).PreparationHash.Equals(payload.Hash)) - context.PreparationPayloads[i] = null; - context.PreparationPayloads[message.ValidatorIndex] = payload; - byte[] hashData = context.EnsureHeader().GetSignData(neoSystem.Settings.Network); - for (int i = 0; i < context.CommitPayloads.Length; i++) - if (context.GetMessage(context.CommitPayloads[i])?.ViewNumber == context.ViewNumber) - if (!Crypto.VerifySignature(hashData, context.GetMessage(context.CommitPayloads[i]).Signature.Span, context.Validators[i])) - context.CommitPayloads[i] = null; + context.Transactions[pId] = new Dictionary(); + context.VerificationContext[pId] = new TransactionVerificationContext(); + for (int i = 0; i < context.PreparationPayloads[pId].Length; i++) + if (context.PreparationPayloads[pId][i] != null) + if (!context.GetMessage(context.PreparationPayloads[pId][i]).PreparationHash.Equals(payload.Hash)) + context.PreparationPayloads[pId][i] = null; + context.PreparationPayloads[pId][message.ValidatorIndex] = payload; + byte[] hashData = context.EnsureHeader(pId).GetSignData(neoSystem.Settings.Network); + for (int i = 0; i < context.CommitPayloads[pId].Length; i++) + if (context.GetMessage(context.CommitPayloads[pId][i])?.ViewNumber == context.ViewNumber) + if (!Crypto.VerifySignature(hashData, context.GetMessage(context.CommitPayloads[pId][i]).Signature.Span, context.Validators[i])) + context.CommitPayloads[pId][i] = null; - if (context.TransactionHashes.Length == 0) + if (context.TransactionHashes[pId].Length == 0) { // There are no tx so we should act like if all the transactions were filled - CheckPrepareResponse(); + CheckPrepareResponse(pId); return; } Dictionary mempoolVerified = neoSystem.MemPool.GetVerifiedTransactions().ToDictionary(p => p.Hash); List unverified = new List(); - foreach (UInt256 hash in context.TransactionHashes) + //Cash previous asked TX Hashes + foreach (UInt256 hash in context.TransactionHashes[pId]) { if (mempoolVerified.TryGetValue(hash, out Transaction tx)) { @@ -153,9 +161,9 @@ private void OnPrepareRequestReceived(ExtensiblePayload payload, PrepareRequest foreach (Transaction tx in unverified) if (!AddTransaction(tx, true)) return; - if (context.Transactions.Count < context.TransactionHashes.Length) + if (context.Transactions[pId].Count < context.TransactionHashes[pId].Length) { - UInt256[] hashes = context.TransactionHashes.Where(i => !context.Transactions.ContainsKey(i)).ToArray(); + UInt256[] hashes = context.TransactionHashes[pId].Where(i => !context.Transactions[pId].ContainsKey(i)).ToArray(); taskManager.Tell(new TaskManager.RestartTasks { Payload = InvPayload.Create(InventoryType.TX, hashes) @@ -166,19 +174,44 @@ private void OnPrepareRequestReceived(ExtensiblePayload payload, PrepareRequest private void OnPrepareResponseReceived(ExtensiblePayload payload, PrepareResponse message) { if (message.ViewNumber != context.ViewNumber) return; - if (context.PreparationPayloads[message.ValidatorIndex] != null || context.NotAcceptingPayloadsDueToViewChanging) return; - if (context.PreparationPayloads[context.Block.PrimaryIndex] != null && !message.PreparationHash.Equals(context.PreparationPayloads[context.Block.PrimaryIndex].Hash)) - return; + if (context.PreparationPayloads[message.PId][message.ValidatorIndex] != null || context.NotAcceptingPayloadsDueToViewChanging) return; + if (context.RequestSentOrReceived) + { + // Check if we have joined another consensus process + if (context.PreparationPayloads[message.PId][context.Block[message.PId].PrimaryIndex] == null) return; + if (!message.PreparationHash.Equals(context.PreparationPayloads[message.PId][context.Block[message.PId].PrimaryIndex].Hash)) + return; + } + // Timeout extension: prepare response has been received with success // around 2*15/M=30.0/5 ~ 40% block time (for M=5) ExtendTimerByFactor(2); - Log($"{nameof(OnPrepareResponseReceived)}: height={message.BlockIndex} view={message.ViewNumber} index={message.ValidatorIndex}"); - context.PreparationPayloads[message.ValidatorIndex] = payload; + Log($"{nameof(OnPrepareResponseReceived)}: height={message.BlockIndex} view={message.ViewNumber} index={message.ValidatorIndex} pId={message.PId}"); + context.PreparationPayloads[message.PId][message.ValidatorIndex] = payload; if (context.WatchOnly || context.CommitSent) return; if (context.RequestSentOrReceived) - CheckPreparations(); + CheckPreparations(message.PId); + } + + private void OnPreCommitReceived(ExtensiblePayload payload, PreCommit message) + { + if (message.ViewNumber != context.ViewNumber) return; + if (context.PreCommitPayloads[message.PId][message.ValidatorIndex] != null || context.NotAcceptingPayloadsDueToViewChanging) return; + if (context.RequestSentOrReceived) + { + // Check if we have joined another consensus process + if (context.PreparationPayloads[message.PId][context.Block[message.PId].PrimaryIndex] == null) return; + if (!message.PreparationHash.Equals(context.PreparationPayloads[message.PId][context.Block[message.PId].PrimaryIndex].Hash)) + return; + } + + Log($"{nameof(OnPreCommitReceived)}: height={message.BlockIndex} view={message.ViewNumber} index={message.ValidatorIndex} pId={message.PId}"); + context.PreCommitPayloads[message.PId][message.ValidatorIndex] = payload; + if (context.WatchOnly || context.CommitSent) return; + if (context.RequestSentOrReceived) + CheckPreCommits(message.PId); } private void OnChangeViewReceived(ExtensiblePayload payload, ChangeView message) @@ -199,11 +232,11 @@ private void OnChangeViewReceived(ExtensiblePayload payload, ChangeView message) private void OnCommitReceived(ExtensiblePayload payload, Commit commit) { - ref ExtensiblePayload existingCommitPayload = ref context.CommitPayloads[commit.ValidatorIndex]; + ref ExtensiblePayload existingCommitPayload = ref context.CommitPayloads[commit.PId][commit.ValidatorIndex]; if (existingCommitPayload != null) { if (existingCommitPayload.Hash != payload.Hash) - Log($"Rejected {nameof(Commit)}: height={commit.BlockIndex} index={commit.ValidatorIndex} view={commit.ViewNumber} existingView={context.GetMessage(existingCommitPayload).ViewNumber}", LogLevel.Warning); + Log($"Rejected {nameof(Commit)}: height={commit.BlockIndex} index={commit.ValidatorIndex} view={commit.ViewNumber} existingView={context.GetMessage(existingCommitPayload).ViewNumber} pId={commit.PId}", LogLevel.Warning); return; } @@ -215,7 +248,7 @@ private void OnCommitReceived(ExtensiblePayload payload, Commit commit) Log($"{nameof(OnCommitReceived)}: height={commit.BlockIndex} view={commit.ViewNumber} index={commit.ValidatorIndex} nc={context.CountCommitted} nf={context.CountFailed}"); - byte[] hashData = context.EnsureHeader()?.GetSignData(neoSystem.Settings.Network); + byte[] hashData = context.EnsureHeader(commit.PId)?.GetSignData(neoSystem.Settings.Network); if (hashData == null) { existingCommitPayload = payload; @@ -223,7 +256,7 @@ private void OnCommitReceived(ExtensiblePayload payload, Commit commit) else if (Crypto.VerifySignature(hashData, commit.Signature.Span, context.Validators[commit.ValidatorIndex])) { existingCommitPayload = payload; - CheckCommits(); + CheckCommits(commit.PId); } return; } @@ -240,8 +273,9 @@ private void OnRecoveryMessageReceived(RecoveryMessage message) isRecovering = true; int validChangeViews = 0, totalChangeViews = 0, validPrepReq = 0, totalPrepReq = 0; int validPrepResponses = 0, totalPrepResponses = 0, validCommits = 0, totalCommits = 0; + int validPreCommits = 0, totalPreCommits = 0; - Log($"{nameof(OnRecoveryMessageReceived)}: height={message.BlockIndex} view={message.ViewNumber} index={message.ValidatorIndex}"); + Log($"{nameof(OnRecoveryMessageReceived)}: height={message.BlockIndex} view={message.ViewNumber} index={message.ValidatorIndex} pId={message.PId}"); try { if (message.ViewNumber > context.ViewNumber) @@ -262,11 +296,20 @@ private void OnRecoveryMessageReceived(RecoveryMessage message) totalPrepReq = 1; if (ReverifyAndProcessPayload(prepareRequestPayload)) validPrepReq++; } + else if (context.IsAPrimary) + { + uint pId = Convert.ToUInt32(!context.IsPriorityPrimary); + SendPrepareRequest(pId); + } } ExtensiblePayload[] prepareResponsePayloads = message.GetPrepareResponsePayloads(context); totalPrepResponses = prepareResponsePayloads.Length; foreach (ExtensiblePayload prepareResponsePayload in prepareResponsePayloads) if (ReverifyAndProcessPayload(prepareResponsePayload)) validPrepResponses++; + ExtensiblePayload[] preCommitPayloads = message.GetPreCommitPayloads(context); + totalPreCommits = preCommitPayloads.Length; + foreach (ExtensiblePayload preCommitPayload in preCommitPayloads) + if (ReverifyAndProcessPayload(preCommitPayload)) validPreCommits++; } if (message.ViewNumber <= context.ViewNumber) { @@ -279,7 +322,7 @@ private void OnRecoveryMessageReceived(RecoveryMessage message) } finally { - Log($"Recovery finished: (valid/total) ChgView: {validChangeViews}/{totalChangeViews} PrepReq: {validPrepReq}/{totalPrepReq} PrepResp: {validPrepResponses}/{totalPrepResponses} Commits: {validCommits}/{totalCommits}"); + Log($"Recovery finished: (valid/total) ChgView: {validChangeViews}/{totalChangeViews} PrepReq: {validPrepReq}/{totalPrepReq} PrepResp: {validPrepResponses}/{totalPrepResponses} PreCommits: {validPreCommits}/{totalPreCommits} Commits: {validCommits}/{totalCommits}"); isRecovering = false; } } diff --git a/src/DBFTPlugin/Consensus/ConsensusService.cs b/src/DBFTPlugin/Consensus/ConsensusService.cs index 8a6d75e8b..d5f7e72c2 100644 --- a/src/DBFTPlugin/Consensus/ConsensusService.cs +++ b/src/DBFTPlugin/Consensus/ConsensusService.cs @@ -80,19 +80,21 @@ private void InitializeConsensus(byte viewNumber) { context.Reset(viewNumber); if (viewNumber > 0) - Log($"View changed: view={viewNumber} primary={context.Validators[context.GetPrimaryIndex((byte)(viewNumber - 1u))]}", LogLevel.Warning); - Log($"Initialize: height={context.Block.Index} view={viewNumber} index={context.MyIndex} role={(context.IsPrimary ? "Primary" : context.WatchOnly ? "WatchOnly" : "Backup")}"); + Log($"View changed: view={viewNumber} primary={context.Validators[context.GetPriorityPrimaryIndex((byte)(viewNumber - 1u))]}", LogLevel.Warning); + uint blockCurrentIndex = context.Block[0].Index; + Log($"Initialize: height={blockCurrentIndex} view={viewNumber} index={context.MyIndex} role={(context.IsPriorityPrimary ? (viewNumber > 0 ? "Primary" : "PrimaryP1") : (context.IsFallbackPrimary ? "PrimaryP2" : (context.WatchOnly ? "WatchOnly" : "Backup")))}"); if (context.WatchOnly) return; - if (context.IsPrimary) + if (context.IsAPrimary) { if (isRecovering) { - ChangeTimer(TimeSpan.FromMilliseconds(neoSystem.Settings.MillisecondsPerBlock << (viewNumber + 1))); + ChangeTimer(TimeSpan.FromMilliseconds(context.PrimaryTimerMultiplier * (neoSystem.Settings.MillisecondsPerBlock << (viewNumber + 1)))); } else { - TimeSpan span = neoSystem.Settings.TimePerBlock; - if (block_received_index + 1 == context.Block.Index) + // If both Primaries already expired move to Zero or take the difference + TimeSpan span = TimeSpan.FromMilliseconds(context.PrimaryTimerMultiplier * neoSystem.Settings.MillisecondsPerBlock); + if (block_received_index + 1 == blockCurrentIndex) { var diff = TimeProvider.Current.UtcNow - block_received_time; if (diff >= span) @@ -144,16 +146,20 @@ private void OnStart() started = true; if (!dbftSettings.IgnoreRecoveryLogs && context.Load()) { - if (context.Transactions != null) + // Check if any preparation was obtained and extract the primary ID + var pId = context.RequestSentOrReceived + ? (context.PreparationPayloads[0][context.Block[0].PrimaryIndex] != null ? 0u : 1u) + : 0u; + if (context.Transactions[pId] != null) { blockchain.Ask(new Blockchain.FillMemoryPool { - Transactions = context.Transactions.Values + Transactions = context.Transactions[pId].Values }).Wait(); } if (context.CommitSent) { - CheckPreparations(); + CheckPreparations(pId); return; } } @@ -166,12 +172,15 @@ private void OnStart() private void OnTimer(Timer timer) { if (context.WatchOnly || context.BlockSent) return; - if (timer.Height != context.Block.Index || timer.ViewNumber != context.ViewNumber) return; - if (context.IsPrimary && !context.RequestSentOrReceived) + if (timer.Height != context.Block[0].Index || timer.ViewNumber != context.ViewNumber) return; + if (context.IsAPrimary && !context.RequestSentOrReceived) { - SendPrepareRequest(); + if (context.IsPriorityPrimary) + SendPrepareRequest(0); + else + SendPrepareRequest(1); } - else if ((context.IsPrimary && context.RequestSentOrReceived) || context.IsBackup) + else if ((context.IsAPrimary && context.RequestSentOrReceived) || context.IsBackup) { if (context.CommitSent) { @@ -183,10 +192,13 @@ private void OnTimer(Timer timer) else { var reason = ChangeViewReason.Timeout; - - if (context.Block != null && context.TransactionHashes?.Length > context.Transactions?.Count) + if (context.RequestSentOrReceived) { - reason = ChangeViewReason.TxNotFound; + var pId = context.PreparationPayloads[0][context.Block[0].PrimaryIndex] != null ? 0u : 1u; + if (context.Block[pId] != null && context.TransactionHashes[pId]?.Length > context.Transactions[pId]?.Count) + { + reason = ChangeViewReason.TxNotFound; + } } RequestChangeView(reason); @@ -194,25 +206,25 @@ private void OnTimer(Timer timer) } } - private void SendPrepareRequest() + private void SendPrepareRequest(uint pId) { - Log($"Sending {nameof(PrepareRequest)}: height={context.Block.Index} view={context.ViewNumber}"); - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakePrepareRequest() }); + Log($"Sending {nameof(PrepareRequest)}: height={context.Block[pId].Index} view={context.ViewNumber} Id={pId}"); + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakePrepareRequest(pId) }); if (context.Validators.Length == 1) - CheckPreparations(); + CheckPreparations(pId); - if (context.TransactionHashes.Length > 0) + if (context.TransactionHashes[pId].Length > 0) { - foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, context.TransactionHashes)) + foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, context.TransactionHashes[pId])) localNode.Tell(Message.Create(MessageCommand.Inv, payload)); } - ChangeTimer(TimeSpan.FromMilliseconds((neoSystem.Settings.MillisecondsPerBlock << (context.ViewNumber + 1)) - (context.ViewNumber == 0 ? neoSystem.Settings.MillisecondsPerBlock : 0))); + ChangeTimer(TimeSpan.FromMilliseconds(context.PrimaryTimerMultiplier * ((neoSystem.Settings.MillisecondsPerBlock << (context.ViewNumber + 1)) - (context.ViewNumber == 0 ? neoSystem.Settings.MillisecondsPerBlock : 0)))); } private void RequestRecovery() { - Log($"Sending {nameof(RecoveryRequest)}: height={context.Block.Index} view={context.ViewNumber} nc={context.CountCommitted} nf={context.CountFailed}"); + Log($"Sending {nameof(RecoveryRequest)}: height={context.Block[0].Index} view={context.ViewNumber} nc={context.CountCommitted} nf={context.CountFailed}"); localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeRecoveryRequest() }); } @@ -231,7 +243,7 @@ private void RequestChangeView(ChangeViewReason reason) } else { - Log($"Sending {nameof(ChangeView)}: height={context.Block.Index} view={context.ViewNumber} nv={expectedView} nc={context.CountCommitted} nf={context.CountFailed} reason={reason}"); + Log($"Sending {nameof(ChangeView)}: height={context.Block[0].Index} view={context.ViewNumber} nv={expectedView} nc={context.CountCommitted} nf={context.CountFailed} reason={reason}"); localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeChangeView(reason) }); CheckExpectedView(expectedView); } @@ -249,57 +261,71 @@ private void OnTransaction(Transaction transaction) { if (!context.IsBackup || context.NotAcceptingPayloadsDueToViewChanging || !context.RequestSentOrReceived || context.ResponseSent || context.BlockSent) return; - if (context.Transactions.ContainsKey(transaction.Hash)) return; - if (!context.TransactionHashes.Contains(transaction.Hash)) return; + + for (uint i = 0; i <= 1; i++) + if (context.Transactions[i] is not null && context.Transactions[i].ContainsKey(transaction.Hash)) + return; + + bool hashNotRequestedByPrimary = context.TransactionHashes[0] is not null && !context.TransactionHashes[0].Contains(transaction.Hash); + bool hashNotRequestedByBackup = context.TransactionHashes[1] is not null && !context.TransactionHashes[1].Contains(transaction.Hash); + + if (hashNotRequestedByPrimary && hashNotRequestedByBackup) return; + AddTransaction(transaction, true); } private bool AddTransaction(Transaction tx, bool verify) { - if (verify) - { - // At this step we're sure that there's no on-chain transaction that conflicts with - // the provided tx because of the previous Blockchain's OnReceive check. Thus, we only - // need to check that current context doesn't contain conflicting transactions. - VerifyResult result; - - // Firstly, check whether tx has Conlicts attribute with the hash of one of the context's transactions. - foreach (var h in tx.GetAttributes().Select(attr => attr.Hash)) + bool returnValue = false; + for (uint i = 0; i <= 1; i++) + if (context.TransactionHashes[i] is not null && context.TransactionHashes[i].Contains(tx.Hash)) { - if (context.TransactionHashes.Contains(h)) + if (verify) { - result = VerifyResult.HasConflicts; - Log($"Rejected tx: {tx.Hash}, {result}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning); - RequestChangeView(ChangeViewReason.TxInvalid); - return false; - } - } - // After that, check whether context's transactions have Conflicts attribute with tx's hash. - foreach (var pooledTx in context.Transactions.Values) - { - if (pooledTx.GetAttributes().Select(attr => attr.Hash).Contains(tx.Hash)) - { - result = VerifyResult.HasConflicts; - Log($"Rejected tx: {tx.Hash}, {result}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning); - RequestChangeView(ChangeViewReason.TxInvalid); - return false; - } - } + // At this step we're sure that there's no on-chain transaction that conflicts with + // the provided tx because of the previous Blockchain's OnReceive check. Thus, we only + // need to check that current context doesn't contain conflicting transactions. + VerifyResult result; - // We've ensured that there's no conlicting transactions in the context, thus, can safely provide an empty conflicting list - // for futher verification. - var conflictingTxs = new List(); - result = tx.Verify(neoSystem.Settings, context.Snapshot, context.VerificationContext, conflictingTxs); - if (result != VerifyResult.Succeed) - { - Log($"Rejected tx: {tx.Hash}, {result}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning); - RequestChangeView(result == VerifyResult.PolicyFail ? ChangeViewReason.TxRejectedByPolicy : ChangeViewReason.TxInvalid); - return false; + // Firstly, check whether tx has Conlicts attribute with the hash of one of the context's transactions. + foreach (var h in tx.GetAttributes().Select(attr => attr.Hash)) + { + if (context.TransactionHashes[i].Contains(h)) + { + result = VerifyResult.HasConflicts; + Log($"Rejected tx: {tx.Hash}, {result}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning); + RequestChangeView(ChangeViewReason.TxInvalid); + return false; + } + } + // After that, check whether context's transactions have Conflicts attribute with tx's hash. + foreach (var pooledTx in context.Transactions[i].Values) + { + if (pooledTx.GetAttributes().Select(attr => attr.Hash).Contains(tx.Hash)) + { + result = VerifyResult.HasConflicts; + Log($"Rejected tx: {tx.Hash}, {result}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning); + RequestChangeView(ChangeViewReason.TxInvalid); + return false; + } + } + + // We've ensured that there's no conlicting transactions in the context, thus, can safely provide an empty conflicting list + // for futher verification. + var conflictingTxs = new List(); + result = tx.Verify(neoSystem.Settings, context.Snapshot, context.VerificationContext[i], conflictingTxs); + if (result != VerifyResult.Succeed) + { + Log($"Rejected tx: {tx.Hash}, {result}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning); + RequestChangeView(result == VerifyResult.PolicyFail ? ChangeViewReason.TxRejectedByPolicy : ChangeViewReason.TxInvalid); + return false; + } + } + context.Transactions[i][tx.Hash] = tx; + context.VerificationContext[i].AddTransaction(tx); + returnValue = returnValue || CheckPrepareResponse(i); } - } - context.Transactions[tx.Hash] = tx; - context.VerificationContext.AddTransaction(tx); - return CheckPrepareResponse(); + return returnValue; } private void ChangeTimer(TimeSpan delay) @@ -309,7 +335,7 @@ private void ChangeTimer(TimeSpan delay) timer_token.CancelIfNotNull(); timer_token = Context.System.Scheduler.ScheduleTellOnceCancelable(delay, Self, new Timer { - Height = context.Block.Index, + Height = context.Block[0].Index, ViewNumber = context.ViewNumber }, ActorRefs.NoSender); } diff --git a/src/DBFTPlugin/Messages/Commit.cs b/src/DBFTPlugin/Messages/Commit.cs index 6e8fe93d8..a0104f4ad 100644 --- a/src/DBFTPlugin/Messages/Commit.cs +++ b/src/DBFTPlugin/Messages/Commit.cs @@ -19,7 +19,10 @@ public class Commit : ConsensusMessage { public ReadOnlyMemory Signature; - public override int Size => base.Size + Signature.Length; + // priority or fallback + public uint PId; + + public override int Size => base.Size + Signature.Length + sizeof(uint); public Commit() : base(ConsensusMessageType.Commit) { } @@ -27,12 +30,14 @@ public override void Deserialize(ref MemoryReader reader) { base.Deserialize(ref reader); Signature = reader.ReadMemory(64); + PId = reader.ReadUInt32(); } public override void Serialize(BinaryWriter writer) { base.Serialize(writer); writer.Write(Signature.Span); + writer.Write(PId); } } } diff --git a/src/DBFTPlugin/Messages/PreCommit.cs b/src/DBFTPlugin/Messages/PreCommit.cs new file mode 100644 index 000000000..190c123d8 --- /dev/null +++ b/src/DBFTPlugin/Messages/PreCommit.cs @@ -0,0 +1,41 @@ +// Copyright (C) 2015-2024 The Neo Project. +// +// PreCommit.cs file belongs to the neo project and is free +// software distributed under the MIT software license, see the +// accompanying file LICENSE in the main directory of the +// repository or http://www.opensource.org/licenses/mit-license.php +// for more details. +// +// Redistribution and use in source and binary forms with or without +// modifications are permitted. + +using Neo.IO; +using System.IO; + +namespace Neo.Consensus +{ + public class PreCommit : ConsensusMessage + { + public UInt256 PreparationHash; + + // priority or fallback + public uint PId; + public override int Size => base.Size + PreparationHash.Size + sizeof(uint); + + public PreCommit() : base(ConsensusMessageType.PreCommit) { } + + public override void Deserialize(ref MemoryReader reader) + { + base.Deserialize(ref reader); + PreparationHash = reader.ReadSerializable(); + PId = reader.ReadUInt32(); + } + + public override void Serialize(BinaryWriter writer) + { + base.Serialize(writer); + writer.Write(PreparationHash); + writer.Write(PId); + } + } +} diff --git a/src/DBFTPlugin/Messages/PrepareResponse.cs b/src/DBFTPlugin/Messages/PrepareResponse.cs index 7510ff99b..a7410d8e7 100644 --- a/src/DBFTPlugin/Messages/PrepareResponse.cs +++ b/src/DBFTPlugin/Messages/PrepareResponse.cs @@ -10,6 +10,7 @@ // modifications are permitted. using Neo.IO; +using System; using System.IO; namespace Neo.Consensus @@ -18,7 +19,9 @@ public class PrepareResponse : ConsensusMessage { public UInt256 PreparationHash; - public override int Size => base.Size + PreparationHash.Size; + // priority or fallback + public uint PId; + public override int Size => base.Size + PreparationHash.Size + sizeof(uint); public PrepareResponse() : base(ConsensusMessageType.PrepareResponse) { } @@ -26,12 +29,14 @@ public override void Deserialize(ref MemoryReader reader) { base.Deserialize(ref reader); PreparationHash = reader.ReadSerializable(); + PId = reader.ReadUInt32(); } public override void Serialize(BinaryWriter writer) { base.Serialize(writer); writer.Write(PreparationHash); + writer.Write(PId); } } } diff --git a/src/DBFTPlugin/Messages/RecoveryMessage/RecoveryMessage.PreCommitPayloadCompact.cs b/src/DBFTPlugin/Messages/RecoveryMessage/RecoveryMessage.PreCommitPayloadCompact.cs new file mode 100644 index 000000000..65b90d61f --- /dev/null +++ b/src/DBFTPlugin/Messages/RecoveryMessage/RecoveryMessage.PreCommitPayloadCompact.cs @@ -0,0 +1,50 @@ +// Copyright (C) 2015-2024 The Neo Project. +// +// RecoveryMessage.PreCommitPayloadCompact.cs file belongs to the neo project and is free +// software distributed under the MIT software license, see the +// accompanying file LICENSE in the main directory of the +// repository or http://www.opensource.org/licenses/mit-license.php +// for more details. +// +// Redistribution and use in source and binary forms with or without +// modifications are permitted. + +using Neo.IO; +using System; +using System.IO; + +namespace Neo.Consensus +{ + partial class RecoveryMessage + { + public class PreCommitPayloadCompact : ISerializable + { + public byte ViewNumber; + public byte ValidatorIndex; + public UInt256 PreparationHash; + public ReadOnlyMemory InvocationScript; + + int ISerializable.Size => + sizeof(byte) + //ViewNumber + sizeof(byte) + //ValidatorIndex + UInt256.Length + //PreparationHash + InvocationScript.GetVarSize(); //InvocationScript + + void ISerializable.Deserialize(ref MemoryReader reader) + { + ViewNumber = reader.ReadByte(); + ValidatorIndex = reader.ReadByte(); + PreparationHash = reader.ReadSerializable(); + InvocationScript = reader.ReadVarMemory(1024); + } + + void ISerializable.Serialize(BinaryWriter writer) + { + writer.Write(ViewNumber); + writer.Write(ValidatorIndex); + writer.Write(PreparationHash); + writer.WriteVarBytes(InvocationScript.Span); + } + } + } +} diff --git a/src/DBFTPlugin/Messages/RecoveryMessage/RecoveryMessage.cs b/src/DBFTPlugin/Messages/RecoveryMessage/RecoveryMessage.cs index fc688d6a1..0c75866f8 100644 --- a/src/DBFTPlugin/Messages/RecoveryMessage/RecoveryMessage.cs +++ b/src/DBFTPlugin/Messages/RecoveryMessage/RecoveryMessage.cs @@ -20,12 +20,14 @@ namespace Neo.Consensus { public partial class RecoveryMessage : ConsensusMessage { + public uint PId; public Dictionary ChangeViewMessages; public PrepareRequest PrepareRequestMessage; /// The PreparationHash in case the PrepareRequest hasn't been received yet. /// This can be null if the PrepareRequest information is present, since it can be derived in that case. public UInt256 PreparationHash; public Dictionary PreparationMessages; + public Dictionary PreCommitMessages; public Dictionary CommitMessages; public override int Size => base.Size @@ -40,6 +42,7 @@ public RecoveryMessage() : base(ConsensusMessageType.RecoveryMessage) { } public override void Deserialize(ref MemoryReader reader) { base.Deserialize(ref reader); + PId = reader.ReadUInt32(); ChangeViewMessages = reader.ReadSerializableArray(byte.MaxValue).ToDictionary(p => p.ValidatorIndex); if (reader.ReadBoolean()) { @@ -53,6 +56,7 @@ public override void Deserialize(ref MemoryReader reader) } PreparationMessages = reader.ReadSerializableArray(byte.MaxValue).ToDictionary(p => p.ValidatorIndex); + PreCommitMessages = reader.ReadSerializableArray(byte.MaxValue).ToDictionary(p => p.ValidatorIndex); CommitMessages = reader.ReadSerializableArray(byte.MaxValue).ToDictionary(p => p.ValidatorIndex); } @@ -83,6 +87,7 @@ internal ExtensiblePayload[] GetCommitPayloadsFromRecoveryMessage(ConsensusConte BlockIndex = BlockIndex, ValidatorIndex = p.ValidatorIndex, ViewNumber = p.ViewNumber, + PId = PId, Signature = p.Signature }, p.InvocationScript)).ToArray(); } @@ -90,27 +95,41 @@ internal ExtensiblePayload[] GetCommitPayloadsFromRecoveryMessage(ConsensusConte internal ExtensiblePayload GetPrepareRequestPayload(ConsensusContext context) { if (PrepareRequestMessage == null) return null; - if (!PreparationMessages.TryGetValue(context.Block.PrimaryIndex, out PreparationPayloadCompact compact)) + if (!PreparationMessages.TryGetValue(context.Block[PId].PrimaryIndex, out PreparationPayloadCompact compact)) return null; return context.CreatePayload(PrepareRequestMessage, compact.InvocationScript); } internal ExtensiblePayload[] GetPrepareResponsePayloads(ConsensusContext context) { - UInt256 preparationHash = PreparationHash ?? context.PreparationPayloads[context.Block.PrimaryIndex]?.Hash; + UInt256 preparationHash = PreparationHash ?? context.PreparationPayloads[PId][context.Block[PId].PrimaryIndex]?.Hash; if (preparationHash is null) return Array.Empty(); - return PreparationMessages.Values.Where(p => p.ValidatorIndex != context.Block.PrimaryIndex).Select(p => context.CreatePayload(new PrepareResponse + return PreparationMessages.Values.Where(p => p.ValidatorIndex != context.Block[0].PrimaryIndex).Select(p => context.CreatePayload(new PrepareResponse { BlockIndex = BlockIndex, ValidatorIndex = p.ValidatorIndex, ViewNumber = ViewNumber, - PreparationHash = preparationHash + PId = PId, + PreparationHash = preparationHash, + }, p.InvocationScript)).ToArray(); + } + + internal ExtensiblePayload[] GetPreCommitPayloads(ConsensusContext context) + { + return PreCommitMessages.Values.Select(p => context.CreatePayload(new PreCommit + { + BlockIndex = BlockIndex, + ViewNumber = p.ViewNumber, + ValidatorIndex = p.ValidatorIndex, + PreparationHash = p.PreparationHash, + PId = PId, }, p.InvocationScript)).ToArray(); } public override void Serialize(BinaryWriter writer) { base.Serialize(writer); + writer.Write(PId); writer.Write(ChangeViewMessages.Values.ToArray()); bool hasPrepareRequestMessage = PrepareRequestMessage != null; writer.Write(hasPrepareRequestMessage); @@ -125,6 +144,7 @@ public override void Serialize(BinaryWriter writer) } writer.Write(PreparationMessages.Values.ToArray()); + writer.Write(PreCommitMessages.Values.ToArray()); writer.Write(CommitMessages.Values.ToArray()); } } diff --git a/src/DBFTPlugin/Types/ConsensusMessageType.cs b/src/DBFTPlugin/Types/ConsensusMessageType.cs index f325133f0..fedd9d8f8 100644 --- a/src/DBFTPlugin/Types/ConsensusMessageType.cs +++ b/src/DBFTPlugin/Types/ConsensusMessageType.cs @@ -18,6 +18,7 @@ public enum ConsensusMessageType : byte PrepareRequest = 0x20, PrepareResponse = 0x21, Commit = 0x30, + PreCommit = 0x31, RecoveryRequest = 0x40, RecoveryMessage = 0x41,