Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/resetSignatories #101

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,31 @@ func New(opts Options) MessageQueue {
// have heights up to (and including) the given height. The appropriate callback
// will be called for every message that is consumed. All consumed messages will
// be dropped from the MessageQueue.
func (mq *MessageQueue) Consume(h process.Height, propose func(process.Propose), prevote func(process.Prevote), precommit func(process.Precommit)) (n int) {
func (mq *MessageQueue) Consume(h process.Height, propose func(process.Propose), prevote func(process.Prevote), precommit func(process.Precommit), procsAllowed map[id.Signatory]bool) (n int) {
for from, q := range mq.queuesByPid {
for len(q) > 0 {
if q[0] == nil || height(q[0]) > h {
break
}
switch msg := q[0].(type) {
case process.Propose:
propose(msg)
case process.Prevote:
prevote(msg)
case process.Precommit:
precommit(msg)
}
n++
q = q[1:]
func() {
defer func() {
n++
q = q[1:]
}()

if ok := procsAllowed[from]; !ok {
return
}

switch msg := q[0].(type) {
case process.Propose:
propose(msg)
case process.Prevote:
prevote(msg)
case process.Precommit:
precommit(msg)
}
}()
}
mq.queuesByPid[from] = q
}
Expand Down
37 changes: 26 additions & 11 deletions mq/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ var _ = Describe("MQ", func() {
proposeCallback,
prevoteCallback,
precommitCallback,
map[id.Signatory]bool{},
)

Expect(n).To(Equal(0))
Expand All @@ -124,6 +125,8 @@ var _ = Describe("MQ", func() {
sender := id.NewPrivKey().Signatory()
lowerHeight := process.Height(r.Int63())
higherHeight := lowerHeight + 1 + process.Height(r.Intn(100))
procsAllowed := map[id.Signatory]bool{}
procsAllowed[sender] = true

// send msg1
msg1 := randomMsg(r, sender, lowerHeight, processutil.RandomRound(r))
Expand Down Expand Up @@ -199,12 +202,12 @@ var _ = Describe("MQ", func() {

// cannot consume msgs of height less than lowerHeight
evenLowerHeight := lowerHeight - 1 - process.Height(r.Intn(100))
n := queue.Consume(evenLowerHeight, proposeCallback, prevoteCallback, precommitCallback)
n := queue.Consume(evenLowerHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
Expect(n).To(Equal(0))
Expect(i).To(Equal(0))

// consume all messages
n = queue.Consume(higherHeight, proposeCallback, prevoteCallback, precommitCallback)
n = queue.Consume(higherHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
Expect(n).To(Equal(2))
Expect(i).To(Equal(2))

Expand All @@ -222,6 +225,9 @@ var _ = Describe("MQ", func() {
loop := func() bool {
sender := id.NewPrivKey().Signatory()
height := process.Height(r.Int63())
procsAllowed := map[id.Signatory]bool{}
procsAllowed[sender] = true

// at the most 20 rounds
rounds := make([]process.Round, 1+r.Intn(20))
for t := 0; t < cap(rounds); t++ {
Expand Down Expand Up @@ -280,12 +286,12 @@ var _ = Describe("MQ", func() {

// cannot consume msgs of height less than lowerHeight
lowerHeight := height - 1 - process.Height(r.Intn(100))
n := queue.Consume(lowerHeight, proposeCallback, prevoteCallback, precommitCallback)
n := queue.Consume(lowerHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
Expect(n).To(Equal(0))
Expect(t).To(Equal(0))

// consume all messages
n = queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback)
n = queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
Expect(n).To(Equal(cap(rounds)))
Expect(t).To(Equal(cap(rounds)))

Expand All @@ -303,6 +309,8 @@ var _ = Describe("MQ", func() {
loop := func() bool {
sender := id.NewPrivKey().Signatory()
minHeight, maxHeight, msgsCount := insertRandomMessages(&queue, sender)
procsAllowed := map[id.Signatory]bool{}
procsAllowed[sender] = true

// we should first consume msg1 and then msg2
prevHeight := process.Height(-1)
Expand Down Expand Up @@ -369,12 +377,12 @@ var _ = Describe("MQ", func() {
}

// cannot consume msgs of height less than the min height
n := queue.Consume(minHeight-1, proposeCallback, prevoteCallback, precommitCallback)
n := queue.Consume(minHeight-1, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
Expect(n).To(Equal(0))
Expect(i).To(Equal(0))

// consume all messages
n = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback)
n = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
Expect(n).To(Equal(msgsCount))
Expect(i).To(Equal(msgsCount))

Expand All @@ -392,6 +400,8 @@ var _ = Describe("MQ", func() {

loop := func() bool {
sender := id.NewPrivKey().Signatory()
procsAllowed := map[id.Signatory]bool{}
procsAllowed[sender] = true
_, maxHeight, _ := insertRandomMessages(&queue, sender)
thresholdHeight := process.Height(r.Intn(int(maxHeight)))
queue.DropMessagesBelowHeight(thresholdHeight)
Expand All @@ -406,7 +416,7 @@ var _ = Describe("MQ", func() {
Expect(precommit.Height >= thresholdHeight).To(BeTrue())
}

_ = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback)
_ = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
return true
}
Expect(quick.Check(loop, nil)).To(Succeed())
Expand All @@ -418,13 +428,15 @@ var _ = Describe("MQ", func() {
loop := func() bool {
opts := mq.DefaultOptions().WithMaxCapacity(1)
queue := mq.New(opts)
procsAllowed := map[id.Signatory]bool{}

// insert a msg
originalSender := id.NewPrivKey().Signatory()
originalMsg := processutil.RandomPropose(r)
originalMsg.From = originalSender
originalMsg.Height = process.Height(1)
originalMsg.Round = process.Round(1)
procsAllowed[originalMsg.From] = true
queue.InsertPropose(originalMsg)

// any message in height > 1 or (height = 1 || round > 1) will be dropped
Expand All @@ -434,11 +446,12 @@ var _ = Describe("MQ", func() {
msg.From = id.NewPrivKey().Signatory()
msg.Height = process.Height(1)
msg.Round = process.Round(2)
procsAllowed[msg.From] = true
queue.InsertPropose(msg)

// so consuming will only return the first msg
proposeCallback := func(propose process.Propose) {}
n := queue.Consume(process.Height(1), proposeCallback, nil, nil)
n := queue.Consume(process.Height(1), proposeCallback, nil, nil, procsAllowed)
Expect(n).To(Equal(2))

// re-insert the original msg
Expand All @@ -458,7 +471,7 @@ var _ = Describe("MQ", func() {
Expect(propose.Round).To(Equal(originalMsg.Round))
Expect(propose.From).To(Equal(originalSender))
}
n = queue.Consume(process.Height(1), proposeCallback, nil, nil)
n = queue.Consume(process.Height(1), proposeCallback, nil, nil, procsAllowed)
Expect(n).To(Equal(1))

// re-insert the original msg
Expand All @@ -477,7 +490,7 @@ var _ = Describe("MQ", func() {
Expect(propose.Round).To(Equal(msg.Round))
Expect(propose.From).To(Equal(originalSender))
}
n = queue.Consume(process.Height(1), proposeCallback, nil, nil)
n = queue.Consume(process.Height(1), proposeCallback, nil, nil, procsAllowed)
Expect(n).To(Equal(1))

return true
Expand All @@ -497,6 +510,8 @@ var _ = Describe("MQ", func() {
// msgsCount > c
sender := id.NewPrivKey().Signatory()
height := process.Height(1)
procsAllowed := map[id.Signatory]bool{}
procsAllowed[sender] = true
msgsCount := c + 5 + r.Intn(20)
rounds := make([]process.Round, msgsCount)
msgs := make([]interface{}, msgsCount)
Expand Down Expand Up @@ -553,7 +568,7 @@ var _ = Describe("MQ", func() {
i++
}

n := queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback)
n := queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
Expect(n).To(Equal(c))
Expect(i).To(Equal(c))

Expand Down
6 changes: 6 additions & 0 deletions process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package process

import (
"fmt"
"sync/atomic"

"github.com/renproject/id"
"github.com/renproject/surge"
Expand Down Expand Up @@ -343,6 +344,11 @@ func (p *Process) StartRound(round Round) {
}
}

func (p *Process) ResetF(f uint64, scheduler Scheduler) {
atomic.StoreUint64(&p.f, f)
p.scheduler = scheduler
}

// OnTimeoutPropose is used to notify the Process that a timeout has been
// activated. It must only be called after the TimeoutPropose method in the
// Timer has been called.
Expand Down
37 changes: 22 additions & 15 deletions replica/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,29 +118,30 @@ func (replica *Replica) Run(ctx context.Context) {
if !replica.filterHeight(m.Height) {
return
}
if !replica.filterFrom(m.From) {
return
}
replica.mq.InsertPropose(m)
case process.Prevote:
if !replica.filterHeight(m.Height) {
return
}
if !replica.filterFrom(m.From) {
return
}
replica.mq.InsertPrevote(m)
case process.Precommit:
if !replica.filterHeight(m.Height) {
return
}
if !replica.filterFrom(m.From) {
return
}
replica.mq.InsertPrecommit(m)
case process.Height:
replica.proc.State = process.DefaultState().WithCurrentHeight(m)
replica.mq.DropMessagesBelowHeight(m)
case []id.Signatory:
procAllowed := map[id.Signatory]bool{}
for _, sig := range m {
procAllowed[sig] = true
}
replica.procsAllowed = procAllowed

scheduler := scheduler.NewRoundRobin(m)
f := len(procAllowed) / 3
replica.proc.ResetF(uint64(f), scheduler)
}
}

Expand Down Expand Up @@ -212,20 +213,29 @@ func (replica *Replica) TimeoutPrecommit(ctx context.Context, timeout timer.Time
}
}

// ResetHeight of the underlying process to a future height. This is should only
// ResetHeight of the underlying process to a future height. This should only
// be used when resynchronising the chain. If the given height is less than or
// equal to the current height, nothing happens.
//
// NOTE: All messages that are currently in the message queue for heights less
// than the given height will be dropped.
func (replica *Replica) ResetHeight(ctx context.Context, newHeight process.Height) {
func (replica *Replica) ResetHeight(ctx context.Context, newHeight process.Height, signatories []id.Signatory) {
if newHeight <= replica.proc.State.CurrentHeight {
return
}
select {
case <-ctx.Done():
case replica.mch <- newHeight:
}

if len(signatories) == 0 {
return
}

select {
case <-ctx.Done():
case replica.mch <- signatories:
}
}

// State returns the current height, round and step of the underlying process.
Expand All @@ -242,17 +252,14 @@ func (replica *Replica) filterHeight(height process.Height) bool {
return height >= replica.proc.CurrentHeight
}

func (replica *Replica) filterFrom(from id.Signatory) bool {
return replica.procsAllowed[from]
}

func (replica *Replica) flush() {
for {
n := replica.mq.Consume(
replica.proc.CurrentHeight,
replica.proc.Propose,
replica.proc.Prevote,
replica.proc.Precommit,
replica.procsAllowed,
)
if n == 0 {
return
Expand Down