Skip to content

Commit cac14b6

Browse files
authored
Merge pull request #60 from getamis/feature/implement-round-change-protocol
consensus/pbft: implement round change protocol
2 parents c8c1247 + 320e727 commit cac14b6

24 files changed

+603
-301
lines changed

consensus/pbft/backend.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ type Backend interface {
4141
// Commit is to deliver a final result to write into blockchain
4242
Commit(Proposal) error
4343

44-
// ViewChanged is called when view change occurred
45-
ViewChanged(needNewProposal bool) error
44+
// RoundChanged is called when round change occurred
45+
RoundChanged(needNewProposal bool) error
4646

4747
// Verify is to verify the proposal request
4848
Verify(Proposal) error

consensus/pbft/backends/simple/backend.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (sb *simpleBackend) Broadcast(payload []byte) error {
121121

122122
// Commit implements pbft.Backend.Commit
123123
func (sb *simpleBackend) Commit(proposal pbft.Proposal) error {
124-
sb.logger.Info("Committed", "address", sb.Address().Hex(), "proposal", proposal)
124+
sb.logger.Info("Committed", "address", sb.Address().Hex(), "hash", proposal.Hash(), "number", proposal.Number().Uint64())
125125
// step1: update validator set from extra data of block
126126
// step2: insert chain
127127
block := &types.Block{}
@@ -145,8 +145,8 @@ func (sb *simpleBackend) Commit(proposal pbft.Proposal) error {
145145
return sb.inserter(block)
146146
}
147147

148-
// ViewChanged implements pbft.Backend.ViewChanged
149-
func (sb *simpleBackend) ViewChanged(needNewProposal bool) error {
148+
// RoundChanged implements pbft.Backend.RoundChanged
149+
func (sb *simpleBackend) RoundChanged(needNewProposal bool) error {
150150
if needNewProposal {
151151
// if I was or I'm a proposer now, fire a ChainHeadEvent to trigger next Seal()
152152
if !common.EmptyHash(sb.proposedBlockHash) || sb.IsProposer() {

consensus/pbft/backends/simple/engine_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ func TestSealRoundChange(t *testing.T) {
166166
if !ok {
167167
t.Errorf("unexpected event comes, got: %v, expected: pbft.RequestEvent", reflect.TypeOf(ev.Data))
168168
}
169-
engine.ViewChanged(false)
169+
engine.RoundChanged(true)
170170
}
171171
eventSub.Unsubscribe()
172172
}

consensus/pbft/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type Config struct {
3131
}
3232

3333
var DefaultConfig = &Config{
34-
RequestTimeoutMsec: 3000,
34+
RequestTimeoutMsec: 10000,
3535
BlockPeriod: 1,
3636
BlockPauseTime: 2,
3737
ProposerPolicy: RoundRobin,

consensus/pbft/core/backlog.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ var (
2626
StateAcceptRequest: msgPreprepare,
2727
StatePreprepared: msgPrepare,
2828
StatePrepared: msgCommit,
29-
StateCommitted: msgNewView,
29+
StateCommitted: msgAll,
3030
}
3131
)
3232

@@ -56,7 +56,7 @@ func (c *core) storeBacklog(msg *message, src pbft.Validator) {
5656
return
5757
}
5858

59-
logger.Debug("Store future message")
59+
logger.Trace("Store future message")
6060

6161
c.backlogsMu.Lock()
6262
defer c.backlogsMu.Unlock()
@@ -123,15 +123,15 @@ func (c *core) processBacklog() {
123123
}
124124
// Push back if it's a future message
125125
if c.isFutureMessage(msg.Code, view) {
126-
logger.Debug("Stop processing backlog", "msg", msg)
126+
logger.Trace("Stop processing backlog", "msg", msg)
127127
backlog.Push(msg, prio)
128128
isFuture = true
129129
break
130130
}
131131

132-
logger.Debug("Post backlog event", "msg", msg)
132+
logger.Trace("Post backlog event", "msg", msg)
133133

134-
go c.sendInternalEvent(backlogEvent{
134+
go c.sendEvent(backlogEvent{
135135
src: src,
136136
msg: msg,
137137
})

consensus/pbft/core/backlog_test.go

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@ import (
3333

3434
func TestIsFutureMessage(t *testing.T) {
3535
c := &core{
36-
state: StateAcceptRequest,
37-
sequence: big.NewInt(1),
38-
round: big.NewInt(0),
36+
state: StateAcceptRequest,
37+
current: newSnapshot(&pbft.View{
38+
Sequence: big.NewInt(1),
39+
Round: big.NewInt(0),
40+
}, newTestValidatorSet(4)),
3941
}
4042

4143
// invalid view format
@@ -147,7 +149,7 @@ func TestStoreBacklog(t *testing.T) {
147149
p := validator.New(common.StringToAddress("12345667890"))
148150
// push preprepare msg
149151
preprepare := &pbft.Preprepare{
150-
View: v,
152+
View: v,
151153
Proposal: makeBlock(1),
152154
}
153155
prepreparePayload, _ := Encode(preprepare)
@@ -192,14 +194,15 @@ func TestProcessFutureBacklog(t *testing.T) {
192194
events: new(event.TypeMux),
193195
}
194196
c := &core{
195-
logger: log.New("backend", "test", "id", 0),
196-
backlogs: make(map[pbft.Validator]*prque.Prque),
197-
backlogsMu: new(sync.Mutex),
198-
backend: backend,
199-
sequence: big.NewInt(1),
200-
round: big.NewInt(0),
201-
state: StateAcceptRequest,
202-
internalMux: new(event.TypeMux),
197+
logger: log.New("backend", "test", "id", 0),
198+
backlogs: make(map[pbft.Validator]*prque.Prque),
199+
backlogsMu: new(sync.Mutex),
200+
backend: backend,
201+
current: newSnapshot(&pbft.View{
202+
Sequence: big.NewInt(1),
203+
Round: big.NewInt(0),
204+
}, newTestValidatorSet(4)),
205+
state: StateAcceptRequest,
203206
}
204207
c.subscribeEvents()
205208
defer c.unsubscribeEvents()
@@ -225,7 +228,7 @@ func TestProcessFutureBacklog(t *testing.T) {
225228
const timeoutDura = 2 * time.Second
226229
timeout := time.NewTimer(timeoutDura)
227230
select {
228-
case <-c.internalEvents.Chan():
231+
case <-c.events.Chan():
229232
t.Errorf("Should not receive any events")
230233

231234
case <-timeout.C:
@@ -239,7 +242,7 @@ func TestProcessBacklog(t *testing.T) {
239242
Sequence: big.NewInt(1),
240243
}
241244
preprepare := &pbft.Preprepare{
242-
View: v,
245+
View: v,
243246
Proposal: makeBlock(1),
244247
}
245248
prepreparePayload, _ := Encode(preprepare)
@@ -276,14 +279,15 @@ func testProcessBacklog(t *testing.T, msg *message) {
276279
peers: vset,
277280
}
278281
c := &core{
279-
logger: log.New("backend", "test", "id", 0),
280-
backlogs: make(map[pbft.Validator]*prque.Prque),
281-
backlogsMu: new(sync.Mutex),
282-
backend: backend,
283-
internalMux: new(event.TypeMux),
284-
state: State(msg.Code),
285-
sequence: big.NewInt(1),
286-
round: big.NewInt(0),
282+
logger: log.New("backend", "test", "id", 0),
283+
backlogs: make(map[pbft.Validator]*prque.Prque),
284+
backlogsMu: new(sync.Mutex),
285+
backend: backend,
286+
state: State(msg.Code),
287+
current: newSnapshot(&pbft.View{
288+
Sequence: big.NewInt(1),
289+
Round: big.NewInt(0),
290+
}, newTestValidatorSet(4)),
287291
subject: &pbft.Subject{
288292
View: &pbft.View{
289293
Sequence: big.NewInt(1),
@@ -300,7 +304,7 @@ func testProcessBacklog(t *testing.T, msg *message) {
300304
const timeoutDura = 2 * time.Second
301305
timeout := time.NewTimer(timeoutDura)
302306
select {
303-
case ev := <-c.internalEvents.Chan():
307+
case ev := <-c.events.Chan():
304308
e, ok := ev.Data.(backlogEvent)
305309
if !ok {
306310
t.Fatalf("Unexpected event comes")

consensus/pbft/core/checkpoint.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424

2525
func (c *core) sendCheckpoint(cp *pbft.Subject) {
2626
logger := c.logger.New("state", c.state)
27-
logger.Debug("sendCheckpoint")
27+
logger.Trace("sendCheckpoint")
2828

2929
newCp, err := Encode(cp)
3030
if err != nil {
@@ -54,23 +54,27 @@ func (c *core) handleCheckpoint(msg *message, src pbft.Validator) error {
5454

5555
var snapshot *snapshot
5656

57-
logger.Debug("handleCheckpoint")
57+
logger.Trace("handleCheckpoint")
5858

5959
c.snapshotsMu.Lock()
6060
defer c.snapshotsMu.Unlock()
6161

6262
// Look for matching snapshot
63-
if cp.View.Sequence.Cmp(c.current.Sequence) == 0 { // current
63+
if cp.View.Sequence.Cmp(c.current.Sequence()) == 0 { // current
64+
// If we're waiting for RoundChange, ignore this
65+
if c.waitingForRoundChange {
66+
return pbft.ErrIgnored
67+
}
6468
snapshot = c.current
65-
} else if cp.View.Sequence.Cmp(c.current.Sequence) < 0 { // old checkpoint
69+
} else if cp.View.Sequence.Cmp(c.current.Sequence()) < 0 { // old checkpoint
6670
snapshotIndex := sort.Search(len(c.snapshots),
6771
func(i int) bool {
68-
return c.snapshots[i].Sequence.Cmp(cp.View.Sequence) >= 0
72+
return c.snapshots[i].Sequence().Cmp(cp.View.Sequence) >= 0
6973
},
7074
)
7175

7276
// If there is no such index, Search returns len(c.snapshots).
73-
if snapshotIndex < len(c.snapshots) && c.snapshots[snapshotIndex].Sequence.Cmp(cp.View.Sequence) == 0 {
77+
if snapshotIndex < len(c.snapshots) && c.snapshots[snapshotIndex].Sequence().Cmp(cp.View.Sequence) == 0 {
7478
snapshot = c.snapshots[snapshotIndex]
7579
} else {
7680
logger.Warn("Failed to find snapshot entry", "seq", cp.View.Sequence, "current", c.current.Sequence)
@@ -93,7 +97,7 @@ func (c *core) handleCheckpoint(msg *message, src pbft.Validator) error {
9397
func (c *core) buildStableCheckpoint() {
9498
var stableCheckpoint *snapshot
9599
stableCheckpointIndex := -1
96-
logger := c.logger.New("seq", c.sequence)
100+
logger := c.logger.New("seq", c.current.Sequence())
97101

98102
c.snapshotsMu.Lock()
99103
for i := len(c.snapshots) - 1; i >= 0; i-- {

consensus/pbft/core/checkpoint_test.go

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,16 @@ func TestHandleCheckpoint(t *testing.T) {
3434
}
3535
system := NewTestSystemWithBackend(N, F)
3636
c := system.backends[0].engine.(*core)
37-
c.current = newSnapshot(preprepare, system.backends[0].Validators())
38-
c.snapshots = append(c.snapshots, newSnapshot(&pbft.Preprepare{
39-
View: &pbft.View{
40-
Round: big.NewInt(0),
41-
Sequence: big.NewInt(1),
42-
},
43-
}, system.backends[0].Validators()), newSnapshot(&pbft.Preprepare{
44-
View: &pbft.View{
45-
Round: big.NewInt(0),
46-
Sequence: big.NewInt(2),
47-
},
37+
c.current = newSnapshot(&pbft.View{
38+
Sequence: big.NewInt(3),
39+
Round: big.NewInt(0),
40+
}, system.backends[0].Validators())
41+
c.snapshots = append(c.snapshots, newSnapshot(&pbft.View{
42+
Round: big.NewInt(0),
43+
Sequence: big.NewInt(1),
44+
}, system.backends[0].Validators()), newSnapshot(&pbft.View{
45+
Round: big.NewInt(0),
46+
Sequence: big.NewInt(2),
4847
}, system.backends[0].Validators()))
4948

5049
testCases := []struct {
@@ -55,7 +54,7 @@ func TestHandleCheckpoint(t *testing.T) {
5554
expectedErr error
5655
}{
5756
// empty subject
58-
{system, &pbft.Subject{}, system.backends[0].Validators().List()[0], nil, pbft.ErrInvalidMessage},
57+
{system, &pbft.Subject{View: &pbft.View{Sequence: big.NewInt(0), Round: big.NewInt(0)}}, system.backends[0].Validators().List()[0], nil, pbft.ErrInvalidMessage},
5958
// current sequence
6059
{system, &pbft.Subject{View: &pbft.View{Sequence: preprepare.View.Sequence}}, system.backends[0].Validators().List()[0], c.current, nil},
6160
// old sequence
@@ -102,20 +101,25 @@ func TestBuildStableCheckpoint(t *testing.T) {
102101
c := system.backends[0].engine.(*core)
103102
v := system.backends[0].Validators().List()[0]
104103
proposal := makeBlock(1)
105-
expectedStableSnapshot := newSnapshot(&pbft.Preprepare{
106-
View: &pbft.View{
107-
Round: big.NewInt(0),
108-
Sequence: big.NewInt(1),
109-
},
104+
view := &pbft.View{
105+
Round: big.NewInt(0),
106+
Sequence: big.NewInt(1),
107+
}
108+
expectedStableSnapshot := newSnapshot(view, system.backends[0].Validators())
109+
expectedStableSnapshot.Preprepare = &pbft.Preprepare{
110+
View: view,
110111
Proposal: proposal,
111-
}, system.backends[0].Validators())
112-
c.snapshots = append(c.snapshots, expectedStableSnapshot, newSnapshot(&pbft.Preprepare{
113-
View: &pbft.View{
114-
Round: big.NewInt(0),
115-
Sequence: big.NewInt(2),
116-
},
112+
}
113+
view = &pbft.View{
114+
Round: big.NewInt(0),
115+
Sequence: big.NewInt(2),
116+
}
117+
s := newSnapshot(view, system.backends[0].Validators())
118+
s.Preprepare = &pbft.Preprepare{
119+
View: view,
117120
Proposal: proposal,
118-
}, system.backends[0].Validators()))
121+
}
122+
c.snapshots = append(c.snapshots, expectedStableSnapshot, s)
119123

120124
sub := &pbft.Subject{View: expectedStableSnapshot.Preprepare.View}
121125
b, _ := Encode(sub)
@@ -133,7 +137,7 @@ func TestBuildStableCheckpoint(t *testing.T) {
133137
}
134138
if stableCheckpoint == nil {
135139
t.Errorf("cannot get the stable checkpoint")
136-
} else if stableCheckpoint.Sequence.Cmp(expectedStableSnapshot.Sequence) != 0 {
137-
t.Errorf("unexpected stable check point, got: %v, expected: %v", stableCheckpoint.Sequence, expectedStableSnapshot.Sequence)
140+
} else if stableCheckpoint.Sequence().Cmp(expectedStableSnapshot.Sequence()) != 0 {
141+
t.Errorf("unexpected stable check point, got: %v, expected: %v", stableCheckpoint.Sequence(), expectedStableSnapshot.Sequence())
138142
}
139143
}

consensus/pbft/core/commit.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424

2525
func (c *core) sendCommit() {
2626
logger := c.logger.New("state", c.state)
27-
logger.Debug("sendCommit")
27+
logger.Trace("sendCommit")
2828

2929
subject, err := Encode(c.subject)
3030
if err != nil {
@@ -39,7 +39,12 @@ func (c *core) sendCommit() {
3939

4040
func (c *core) handleCommit(msg *message, src pbft.Validator) error {
4141
logger := c.logger.New("from", src.Address().Hex(), "state", c.state)
42-
logger.Debug("handleCommit")
42+
logger.Trace("handleCommit")
43+
44+
if c.waitingForRoundChange {
45+
logger.Warn("Waiting for a RoundChange, ignore", "msg", msg)
46+
return pbft.ErrIgnored
47+
}
4348

4449
var commit *pbft.Subject
4550
err := msg.Decode(&commit)

0 commit comments

Comments
 (0)