Skip to content

Commit 81f08f1

Browse files
authored
Merge pull request #67 from getamis/feature/jump-state
consensus/pbft: jump consensus state if receive enough certifications
2 parents 970e6d5 + 6a42562 commit 81f08f1

File tree

12 files changed

+204
-55
lines changed

12 files changed

+204
-55
lines changed

consensus/pbft/core/backlog.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ import (
2222
)
2323

2424
var (
25-
waitFor = map[State]uint64{
26-
StateAcceptRequest: msgPreprepare,
27-
StatePreprepared: msgPrepare,
28-
StatePrepared: msgCommit,
29-
StateCommitted: msgAll,
25+
// msgPriority is defined for calculating processing priority to speedup consensus
26+
// msgPreprepare > msgCommit > msgPrepare
27+
msgPriority = map[uint64]int{
28+
msgPreprepare: 1,
29+
msgCommit: 2,
30+
msgPrepare: 3,
3031
}
3132
)
3233

@@ -37,15 +38,23 @@ func (c *core) isFutureMessage(msgCode uint64, view *pbft.View) bool {
3738
return false
3839
}
3940

40-
waitMsgCode, ok := waitFor[c.state]
41-
// don't check if not in pre-defined state
42-
if !ok {
41+
if view.Cmp(c.currentView()) > 0 {
42+
return true
43+
}
44+
45+
if view.Cmp(c.currentView()) < 0 {
4346
return false
4447
}
45-
priority := toPriority(waitMsgCode, c.currentView())
46-
newPriority := toPriority(msgCode, view)
4748

48-
return priority > newPriority
49+
// StateAcceptRequest only accepts msgPreprepare
50+
// other messages are future messages
51+
if c.state == StateAcceptRequest {
52+
return msgCode > msgPreprepare
53+
}
54+
55+
// For states(StatePreprepared, StatePrepared, StateCommitted),
56+
// can accept all message types if processing with same view
57+
return false
4958
}
5059

5160
func (c *core) storeBacklog(msg *message, src pbft.Validator) {
@@ -143,5 +152,5 @@ func toPriority(msgCode uint64, view *pbft.View) float32 {
143152
// FIXME: round will be reset as 0 while new sequence
144153
// 10 * Round limits the range of message code is from 0 to 9
145154
// 1000 * Sequence limits the range of round is from 0 to 99
146-
return -float32(view.Sequence.Uint64()*1000 + view.Round.Uint64()*10 + msgCode)
155+
return -float32(view.Sequence.Uint64()*1000 + view.Round.Uint64()*10 + uint64(msgPriority[msgCode]))
147156
}

consensus/pbft/core/backlog_test.go

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -99,29 +99,17 @@ func TestIsFutureMessage(t *testing.T) {
9999
c.state = StatePreprepared
100100
for i := 0; i < len(testCode); i++ {
101101
r = c.isFutureMessage(testCode[i], v)
102-
if testCode[i] <= msgPrepare {
103-
if r {
104-
t.Error("Should return false because we can execute it now")
105-
}
106-
} else {
107-
if !r {
108-
t.Error("Should return true because it's a future round")
109-
}
102+
if r {
103+
t.Error("Should return false because we can execute it now")
110104
}
111105
}
112106

113107
// current view, state = StatePrepared
114108
c.state = StatePrepared
115109
for i := 0; i < len(testCode); i++ {
116110
r = c.isFutureMessage(testCode[i], v)
117-
if testCode[i] <= msgCommit {
118-
if r {
119-
t.Error("Should return false because we can execute it now")
120-
}
121-
} else {
122-
if !r {
123-
t.Error("Should return true because it's a future round")
124-
}
111+
if r {
112+
t.Error("Should return false because we can execute it now")
125113
}
126114
}
127115

consensus/pbft/core/commit.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ func (c *core) handleCommit(msg *message, src pbft.Validator) error {
6262

6363
c.acceptCommit(msg, src)
6464

65-
if int64(c.current.Commits.Size()) > 2*c.F && c.state == StatePrepared {
65+
// if has proposal and receives enough commit messages,
66+
// it can chnage to StateCommitted directly to speed up the consensus process
67+
if int64(c.current.Commits.Size()) > 2*c.F && c.state.Cmp(StateCommitted) < 0 {
6668
c.commit()
6769
}
6870

@@ -73,7 +75,7 @@ func (c *core) verifyCommit(commit *pbft.Subject, src pbft.Validator) error {
7375
logger := c.logger.New("from", src.Address().Hex(), "state", c.state)
7476

7577
if !reflect.DeepEqual(commit, c.subject) {
76-
logger.Warn("Subjects do not match", "expected", c.subject, "got", commit)
78+
logger.Warn("Inconsistent subjects between commit and proposal", "expected", c.subject, "got", commit)
7779
return pbft.ErrSubjectNotMatched
7880
}
7981

consensus/pbft/core/commit_test.go

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func TestHandleCommit(t *testing.T) {
3737
Proposal: makeBlock(1),
3838
}
3939
c.acceptPreprepare(preprepare)
40+
c.state = StatePreprepared
4041
}
4142

4243
testCases := []struct {
@@ -83,6 +84,26 @@ func TestHandleCommit(t *testing.T) {
8384
}(),
8485
errFutureMessage,
8586
},
87+
{
88+
// future message because still at StateAcceptRequest
89+
func() *testSystem {
90+
sys := NewTestSystemWithBackend(N, F)
91+
92+
for i, backend := range sys.backends {
93+
c := backend.engine.(*core)
94+
95+
// replica0 is still waiting for preprepare message
96+
// other replicas are at StatePrepared
97+
if i != 0 {
98+
toPreprepare(c)
99+
// change to prepared
100+
c.state = StatePrepared
101+
}
102+
}
103+
return sys
104+
}(),
105+
errFutureMessage,
106+
},
86107
{
87108
// subject not match
88109
func() *testSystem {
@@ -124,6 +145,26 @@ func TestHandleCommit(t *testing.T) {
124145
}(),
125146
nil,
126147
},
148+
{
149+
// jump state
150+
func() *testSystem {
151+
sys := NewTestSystemWithBackend(N, F)
152+
153+
for i, backend := range sys.backends {
154+
c := backend.engine.(*core)
155+
// should have subject for each backend
156+
toPreprepare(c)
157+
158+
// only replica0 stays at StatePreprepared
159+
// other replicas are at StatePrepared
160+
if i != 0 {
161+
c.state = StatePrepared
162+
}
163+
}
164+
return sys
165+
}(),
166+
nil,
167+
},
127168
// TODO: double send message
128169
}
129170

@@ -135,17 +176,20 @@ OUTER:
135176
r0 := v0.engine.(*core)
136177

137178
for i, v := range test.system.backends {
138-
validator := v.Validators().GetByIndex(uint64(i))
139-
m, _ := Encode(v.engine.(*core).subject)
140-
if err := r0.handleCommit(&message{
141-
Code: msgCommit,
142-
Msg: m,
143-
Address: validator.Address(),
144-
}, validator); err != nil {
145-
if err != test.expectedErr {
146-
t.Error("unexpected error: ", err)
179+
// send commit message to replica0 if has subject
180+
if v.engine.(*core).subject != nil {
181+
validator := v.Validators().GetByIndex(uint64(i))
182+
m, _ := Encode(v.engine.(*core).subject)
183+
if err := r0.handleCommit(&message{
184+
Code: msgCommit,
185+
Msg: m,
186+
Address: validator.Address(),
187+
}, validator); err != nil {
188+
if err != test.expectedErr {
189+
t.Error("unexpected error: ", err)
190+
}
191+
continue OUTER
147192
}
148-
continue OUTER
149193
}
150194
}
151195

consensus/pbft/core/core_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ func TestNewRequest(t *testing.T) {
6868
t.Error("expected execution of requests should be 2, but got:", len(backend.commitMsgs))
6969
}
7070
if !reflect.DeepEqual(request1.Number(), backend.commitMsgs[0].Number()) {
71-
t.Error("payload is not the same (1)")
71+
t.Errorf("request number should be equal, expect:%v, but got:%v", request1.Number(), backend.commitMsgs[0].Number())
7272
}
7373
if !reflect.DeepEqual(request2.Number(), backend.commitMsgs[1].Number()) {
74-
t.Error("payload is not the same (2)")
74+
t.Errorf("request number should be equal, expect:%v, but got:%v", request2.Number(), backend.commitMsgs[1].Number())
7575
}
7676
}
7777
}

consensus/pbft/core/prepare.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,9 @@ func (c *core) handlePrepare(msg *message, src pbft.Validator) error {
6262

6363
c.acceptPrepare(msg, src)
6464

65-
// If 2f+1
66-
if int64(c.current.Prepares.Size()) > 2*c.F && c.state == StatePreprepared {
65+
// change to StatePrepared if receving enough prepare messages
66+
// and the current state is at previous state
67+
if int64(c.current.Prepares.Size()) > 2*c.F && c.state.Cmp(StatePrepared) < 0 {
6768
c.setState(StatePrepared)
6869
c.sendCommit()
6970
}
@@ -74,15 +75,8 @@ func (c *core) handlePrepare(msg *message, src pbft.Validator) error {
7475
func (c *core) verifyPrepare(prepare *pbft.Subject, src pbft.Validator) error {
7576
logger := c.logger.New("from", src.Address().Hex(), "state", c.state)
7677

77-
if prepare.View.Sequence != nil &&
78-
c.subject != nil &&
79-
prepare.View.Sequence.Cmp(c.subject.View.Sequence) < 0 {
80-
logger.Warn("Old message", "expected", c.subject, "got", prepare)
81-
return pbft.ErrOldMessage
82-
}
83-
8478
if !reflect.DeepEqual(prepare, c.subject) {
85-
logger.Warn("Subjects do not match", "expected", c.subject, "got", prepare)
79+
logger.Warn("Inconsistent subjects between prepare and proposal", "expected", c.subject, "got", prepare)
8680
return pbft.ErrSubjectNotMatched
8781
}
8882

consensus/pbft/core/prepare_test.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func TestVerifyPrepare(t *testing.T) {
229229
},
230230
{
231231
// old message
232-
expected: pbft.ErrOldMessage,
232+
expected: pbft.ErrSubjectNotMatched,
233233
prepare: &pbft.Subject{
234234
View: &pbft.View{Round: big.NewInt(0), Sequence: big.NewInt(0)},
235235
Digest: common.StringToHash("1234567890"),
@@ -239,6 +239,18 @@ func TestVerifyPrepare(t *testing.T) {
239239
Digest: common.StringToHash("1234567890"),
240240
},
241241
},
242+
{
243+
// different digest
244+
expected: pbft.ErrSubjectNotMatched,
245+
prepare: &pbft.Subject{
246+
View: &pbft.View{Round: big.NewInt(0), Sequence: big.NewInt(0)},
247+
Digest: common.StringToHash("1234567890"),
248+
},
249+
self: &pbft.Subject{
250+
View: &pbft.View{Round: big.NewInt(0), Sequence: big.NewInt(0)},
251+
Digest: common.StringToHash("1234567888"),
252+
},
253+
},
242254
{
243255
// malicious package(lack of sequence)
244256
expected: pbft.ErrSubjectNotMatched,

consensus/pbft/core/preprepare_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121
"reflect"
2222
"testing"
2323

24-
"github.com/ethereum/go-ethereum/consensus/pbft"
2524
"github.com/ethereum/go-ethereum/common"
25+
"github.com/ethereum/go-ethereum/consensus/pbft"
2626
)
2727

2828
func TestHandlePreprepare(t *testing.T) {
@@ -177,7 +177,7 @@ OUTER:
177177
Address: v0.Address(),
178178
}, val); err != nil {
179179
if err != test.expectedErr {
180-
t.Error("unexpected error: ", err)
180+
t.Errorf("unexpected error, expect:%v, but got:%v", test.expectedErr, err)
181181
}
182182
continue OUTER
183183
}

consensus/pbft/core/testbackend_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ func NewTestSystemWithBackend(n, f uint64) *testSystem {
206206
backend.address = vset.GetByIndex(i).Address()
207207

208208
core := New(backend, config).(*core)
209+
core.state = StateAcceptRequest
209210
core.current = newSnapshot(&pbft.View{
210211
Round: big.NewInt(0),
211212
Sequence: big.NewInt(1),

consensus/pbft/core/types.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,20 @@ func (s State) String() string {
6060
}
6161
}
6262

63+
// Cmp compares s and y and returns:
64+
// -1 if s is the previous state of y
65+
// 0 if s and y are the same state
66+
// +1 if s is the next state of y
67+
func (s State) Cmp(y State) int {
68+
if uint64(s) < uint64(y) {
69+
return -1
70+
}
71+
if uint64(s) > uint64(y) {
72+
return 1
73+
}
74+
return 0
75+
}
76+
6377
const (
6478
msgPreprepare uint64 = iota
6579
msgPrepare

0 commit comments

Comments
 (0)