diff --git a/mq/mq.go b/mq/mq.go index 9b0a970..7ccf48f 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -33,22 +33,31 @@ func New(opts Options) MessageQueue { // have heights up to (and including) the given height. The appropriate callback // will be called for every message that is consumed. All consumed messages will // be dropped from the MessageQueue. -func (mq *MessageQueue) Consume(h process.Height, propose func(process.Propose), prevote func(process.Prevote), precommit func(process.Precommit)) (n int) { +func (mq *MessageQueue) Consume(h process.Height, propose func(process.Propose), prevote func(process.Prevote), precommit func(process.Precommit), procsAllowed map[id.Signatory]bool) (n int) { for from, q := range mq.queuesByPid { for len(q) > 0 { if q[0] == nil || height(q[0]) > h { break } - switch msg := q[0].(type) { - case process.Propose: - propose(msg) - case process.Prevote: - prevote(msg) - case process.Precommit: - precommit(msg) - } - n++ - q = q[1:] + func() { + defer func() { + n++ + q = q[1:] + }() + + if ok := procsAllowed[from]; !ok { + return + } + + switch msg := q[0].(type) { + case process.Propose: + propose(msg) + case process.Prevote: + prevote(msg) + case process.Precommit: + precommit(msg) + } + }() } mq.queuesByPid[from] = q } diff --git a/mq/mq_test.go b/mq/mq_test.go index 4fdd9e5..73fd571 100644 --- a/mq/mq_test.go +++ b/mq/mq_test.go @@ -108,6 +108,7 @@ var _ = Describe("MQ", func() { proposeCallback, prevoteCallback, precommitCallback, + map[id.Signatory]bool{}, ) Expect(n).To(Equal(0)) @@ -124,6 +125,8 @@ var _ = Describe("MQ", func() { sender := id.NewPrivKey().Signatory() lowerHeight := process.Height(r.Int63()) higherHeight := lowerHeight + 1 + process.Height(r.Intn(100)) + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true // send msg1 msg1 := randomMsg(r, sender, lowerHeight, processutil.RandomRound(r)) @@ -199,12 +202,12 @@ var _ = Describe("MQ", func() { // cannot consume msgs of height less than lowerHeight evenLowerHeight := lowerHeight - 1 - process.Height(r.Intn(100)) - n := queue.Consume(evenLowerHeight, proposeCallback, prevoteCallback, precommitCallback) + n := queue.Consume(evenLowerHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(0)) Expect(i).To(Equal(0)) // consume all messages - n = queue.Consume(higherHeight, proposeCallback, prevoteCallback, precommitCallback) + n = queue.Consume(higherHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(2)) Expect(i).To(Equal(2)) @@ -222,6 +225,9 @@ var _ = Describe("MQ", func() { loop := func() bool { sender := id.NewPrivKey().Signatory() height := process.Height(r.Int63()) + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true + // at the most 20 rounds rounds := make([]process.Round, 1+r.Intn(20)) for t := 0; t < cap(rounds); t++ { @@ -280,12 +286,12 @@ var _ = Describe("MQ", func() { // cannot consume msgs of height less than lowerHeight lowerHeight := height - 1 - process.Height(r.Intn(100)) - n := queue.Consume(lowerHeight, proposeCallback, prevoteCallback, precommitCallback) + n := queue.Consume(lowerHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(0)) Expect(t).To(Equal(0)) // consume all messages - n = queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback) + n = queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(cap(rounds))) Expect(t).To(Equal(cap(rounds))) @@ -303,6 +309,8 @@ var _ = Describe("MQ", func() { loop := func() bool { sender := id.NewPrivKey().Signatory() minHeight, maxHeight, msgsCount := insertRandomMessages(&queue, sender) + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true // we should first consume msg1 and then msg2 prevHeight := process.Height(-1) @@ -369,12 +377,12 @@ var _ = Describe("MQ", func() { } // cannot consume msgs of height less than the min height - n := queue.Consume(minHeight-1, proposeCallback, prevoteCallback, precommitCallback) + n := queue.Consume(minHeight-1, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(0)) Expect(i).To(Equal(0)) // consume all messages - n = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback) + n = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(msgsCount)) Expect(i).To(Equal(msgsCount)) @@ -392,6 +400,8 @@ var _ = Describe("MQ", func() { loop := func() bool { sender := id.NewPrivKey().Signatory() + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true _, maxHeight, _ := insertRandomMessages(&queue, sender) thresholdHeight := process.Height(r.Intn(int(maxHeight))) queue.DropMessagesBelowHeight(thresholdHeight) @@ -406,7 +416,7 @@ var _ = Describe("MQ", func() { Expect(precommit.Height >= thresholdHeight).To(BeTrue()) } - _ = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback) + _ = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) return true } Expect(quick.Check(loop, nil)).To(Succeed()) @@ -418,6 +428,7 @@ var _ = Describe("MQ", func() { loop := func() bool { opts := mq.DefaultOptions().WithMaxCapacity(1) queue := mq.New(opts) + procsAllowed := map[id.Signatory]bool{} // insert a msg originalSender := id.NewPrivKey().Signatory() @@ -425,6 +436,7 @@ var _ = Describe("MQ", func() { originalMsg.From = originalSender originalMsg.Height = process.Height(1) originalMsg.Round = process.Round(1) + procsAllowed[originalMsg.From] = true queue.InsertPropose(originalMsg) // any message in height > 1 or (height = 1 || round > 1) will be dropped @@ -434,11 +446,12 @@ var _ = Describe("MQ", func() { msg.From = id.NewPrivKey().Signatory() msg.Height = process.Height(1) msg.Round = process.Round(2) + procsAllowed[msg.From] = true queue.InsertPropose(msg) // so consuming will only return the first msg proposeCallback := func(propose process.Propose) {} - n := queue.Consume(process.Height(1), proposeCallback, nil, nil) + n := queue.Consume(process.Height(1), proposeCallback, nil, nil, procsAllowed) Expect(n).To(Equal(2)) // re-insert the original msg @@ -458,7 +471,7 @@ var _ = Describe("MQ", func() { Expect(propose.Round).To(Equal(originalMsg.Round)) Expect(propose.From).To(Equal(originalSender)) } - n = queue.Consume(process.Height(1), proposeCallback, nil, nil) + n = queue.Consume(process.Height(1), proposeCallback, nil, nil, procsAllowed) Expect(n).To(Equal(1)) // re-insert the original msg @@ -477,7 +490,7 @@ var _ = Describe("MQ", func() { Expect(propose.Round).To(Equal(msg.Round)) Expect(propose.From).To(Equal(originalSender)) } - n = queue.Consume(process.Height(1), proposeCallback, nil, nil) + n = queue.Consume(process.Height(1), proposeCallback, nil, nil, procsAllowed) Expect(n).To(Equal(1)) return true @@ -497,6 +510,8 @@ var _ = Describe("MQ", func() { // msgsCount > c sender := id.NewPrivKey().Signatory() height := process.Height(1) + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true msgsCount := c + 5 + r.Intn(20) rounds := make([]process.Round, msgsCount) msgs := make([]interface{}, msgsCount) @@ -553,7 +568,7 @@ var _ = Describe("MQ", func() { i++ } - n := queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback) + n := queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(c)) Expect(i).To(Equal(c)) diff --git a/process/process.go b/process/process.go index b448de4..7c0630f 100644 --- a/process/process.go +++ b/process/process.go @@ -8,6 +8,7 @@ package process import ( "fmt" + "sync/atomic" "github.com/renproject/id" "github.com/renproject/surge" @@ -343,6 +344,11 @@ func (p *Process) StartRound(round Round) { } } +func (p *Process) ResetF(f uint64, scheduler Scheduler) { + atomic.StoreUint64(&p.f, f) + p.scheduler = scheduler +} + // OnTimeoutPropose is used to notify the Process that a timeout has been // activated. It must only be called after the TimeoutPropose method in the // Timer has been called. diff --git a/replica/replica.go b/replica/replica.go index ab3a5c1..4513576 100644 --- a/replica/replica.go +++ b/replica/replica.go @@ -118,29 +118,30 @@ func (replica *Replica) Run(ctx context.Context) { if !replica.filterHeight(m.Height) { return } - if !replica.filterFrom(m.From) { - return - } replica.mq.InsertPropose(m) case process.Prevote: if !replica.filterHeight(m.Height) { return } - if !replica.filterFrom(m.From) { - return - } replica.mq.InsertPrevote(m) case process.Precommit: if !replica.filterHeight(m.Height) { return } - if !replica.filterFrom(m.From) { - return - } replica.mq.InsertPrecommit(m) case process.Height: replica.proc.State = process.DefaultState().WithCurrentHeight(m) replica.mq.DropMessagesBelowHeight(m) + case []id.Signatory: + procAllowed := map[id.Signatory]bool{} + for _, sig := range m { + procAllowed[sig] = true + } + replica.procsAllowed = procAllowed + + scheduler := scheduler.NewRoundRobin(m) + f := len(procAllowed) / 3 + replica.proc.ResetF(uint64(f), scheduler) } } @@ -212,13 +213,13 @@ func (replica *Replica) TimeoutPrecommit(ctx context.Context, timeout timer.Time } } -// ResetHeight of the underlying process to a future height. This is should only +// ResetHeight of the underlying process to a future height. This should only // be used when resynchronising the chain. If the given height is less than or // equal to the current height, nothing happens. // // NOTE: All messages that are currently in the message queue for heights less // than the given height will be dropped. -func (replica *Replica) ResetHeight(ctx context.Context, newHeight process.Height) { +func (replica *Replica) ResetHeight(ctx context.Context, newHeight process.Height, signatories []id.Signatory) { if newHeight <= replica.proc.State.CurrentHeight { return } @@ -226,6 +227,15 @@ func (replica *Replica) ResetHeight(ctx context.Context, newHeight process.Heigh case <-ctx.Done(): case replica.mch <- newHeight: } + + if len(signatories) == 0 { + return + } + + select { + case <-ctx.Done(): + case replica.mch <- signatories: + } } // State returns the current height, round and step of the underlying process. @@ -242,10 +252,6 @@ func (replica *Replica) filterHeight(height process.Height) bool { return height >= replica.proc.CurrentHeight } -func (replica *Replica) filterFrom(from id.Signatory) bool { - return replica.procsAllowed[from] -} - func (replica *Replica) flush() { for { n := replica.mq.Consume( @@ -253,6 +259,7 @@ func (replica *Replica) flush() { replica.proc.Propose, replica.proc.Prevote, replica.proc.Precommit, + replica.procsAllowed, ) if n == 0 { return