Skip to content

Commit 57b107b

Browse files
authored
Merge pull request #148 from getamis/feature/round_change_backlog
consensus/istanbul: add backlog for roundChange messages
2 parents 170e938 + 07d450c commit 57b107b

File tree

8 files changed

+82
-111
lines changed

8 files changed

+82
-111
lines changed

consensus/istanbul/core/backlog.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,15 @@ func (c *core) checkMessage(msgCode uint64, view *istanbul.View) error {
4040
return errInvalidMessage
4141
}
4242

43+
if msgCode == msgRoundChange {
44+
if view.Sequence.Cmp(c.currentView().Sequence) > 0 {
45+
return errFutureMessage
46+
} else if view.Cmp(c.currentView()) < 0 {
47+
return errOldMessage
48+
}
49+
return nil
50+
}
51+
4352
if view.Cmp(c.currentView()) > 0 {
4453
return errFutureMessage
4554
}
@@ -90,7 +99,7 @@ func (c *core) storeBacklog(msg *message, src istanbul.Validator) {
9099
if err == nil {
91100
backlog.Push(msg, toPriority(msg.Code, p.View))
92101
}
93-
// for istanbul.MsgPrepare and istanbul.MsgCommit cases
102+
// for msgRoundChange, msgPrepare and msgCommit cases
94103
default:
95104
var p *istanbul.Subject
96105
err := msg.Decode(&p)
@@ -127,7 +136,7 @@ func (c *core) processBacklog() {
127136
if err == nil {
128137
view = m.View
129138
}
130-
// for istanbul.MsgPrepare and istanbul.MsgCommit cases
139+
// for msgRoundChange, msgPrepare and msgCommit cases
131140
default:
132141
var sub *istanbul.Subject
133142
err := msg.Decode(&sub)
@@ -162,6 +171,10 @@ func (c *core) processBacklog() {
162171
}
163172

164173
func toPriority(msgCode uint64, view *istanbul.View) float32 {
174+
if msgCode == msgRoundChange {
175+
// For msgRoundChange, set the message priority based on its sequence
176+
return -float32(view.Sequence.Uint64() * 1000)
177+
}
165178
// FIXME: round will be reset as 0 while new sequence
166179
// 10 * Round limits the range of message code is from 0 to 9
167180
// 1000 * Sequence limits the range of round is from 0 to 99

consensus/istanbul/core/backlog_test.go

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func TestCheckMessage(t *testing.T) {
4747
}
4848

4949
testStates := []State{StateAcceptRequest, StatePreprepared, StatePrepared, StateCommitted}
50-
testCode := []uint64{msgPreprepare, msgPrepare, msgCommit}
50+
testCode := []uint64{msgPreprepare, msgPrepare, msgCommit, msgRoundChange}
5151

5252
// future sequence
5353
v := &istanbul.View{
@@ -73,7 +73,11 @@ func TestCheckMessage(t *testing.T) {
7373
c.state = testStates[i]
7474
for j := 0; j < len(testCode); j++ {
7575
err := c.checkMessage(testCode[j], v)
76-
if err != errFutureMessage {
76+
if testCode[j] == msgRoundChange {
77+
if err != nil {
78+
t.Errorf("error mismatch: have %v, want nil", err)
79+
}
80+
} else if err != errFutureMessage {
7781
t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage)
7882
}
7983
}
@@ -89,7 +93,11 @@ func TestCheckMessage(t *testing.T) {
8993
c.state = testStates[i]
9094
for j := 0; j < len(testCode); j++ {
9195
err := c.checkMessage(testCode[j], v)
92-
if err != errFutureMessage {
96+
if testCode[j] == msgRoundChange {
97+
if err != nil {
98+
t.Errorf("error mismatch: have %v, want nil", err)
99+
}
100+
} else if err != errFutureMessage {
93101
t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage)
94102
}
95103
}
@@ -101,7 +109,11 @@ func TestCheckMessage(t *testing.T) {
101109
c.state = StateAcceptRequest
102110
for i := 0; i < len(testCode); i++ {
103111
err = c.checkMessage(testCode[i], v)
104-
if testCode[i] == msgPreprepare {
112+
if testCode[i] == msgRoundChange {
113+
if err != nil {
114+
t.Errorf("error mismatch: have %v, want nil", err)
115+
}
116+
} else if testCode[i] == msgPreprepare {
105117
if err != nil {
106118
t.Errorf("error mismatch: have %v, want nil", err)
107119
}
@@ -116,7 +128,11 @@ func TestCheckMessage(t *testing.T) {
116128
c.state = StatePreprepared
117129
for i := 0; i < len(testCode); i++ {
118130
err = c.checkMessage(testCode[i], v)
119-
if err != nil {
131+
if testCode[i] == msgRoundChange {
132+
if err != nil {
133+
t.Errorf("error mismatch: have %v, want nil", err)
134+
}
135+
} else if err != nil {
120136
t.Errorf("error mismatch: have %v, want nil", err)
121137
}
122138
}
@@ -125,7 +141,11 @@ func TestCheckMessage(t *testing.T) {
125141
c.state = StatePrepared
126142
for i := 0; i < len(testCode); i++ {
127143
err = c.checkMessage(testCode[i], v)
128-
if err != nil {
144+
if testCode[i] == msgRoundChange {
145+
if err != nil {
146+
t.Errorf("error mismatch: have %v, want nil", err)
147+
}
148+
} else if err != nil {
129149
t.Errorf("error mismatch: have %v, want nil", err)
130150
}
131151
}
@@ -134,7 +154,11 @@ func TestCheckMessage(t *testing.T) {
134154
c.state = StateCommitted
135155
for i := 0; i < len(testCode); i++ {
136156
err = c.checkMessage(testCode[i], v)
137-
if err != nil {
157+
if testCode[i] == msgRoundChange {
158+
if err != nil {
159+
t.Errorf("error mismatch: have %v, want nil", err)
160+
}
161+
} else if err != nil {
138162
t.Errorf("error mismatch: have %v, want nil", err)
139163
}
140164
}
@@ -195,6 +219,17 @@ func TestStoreBacklog(t *testing.T) {
195219
if !reflect.DeepEqual(msg, m) {
196220
t.Errorf("message mismatch: have %v, want %v", msg, m)
197221
}
222+
223+
// push roundChange msg
224+
m = &message{
225+
Code: msgRoundChange,
226+
Msg: subjectPayload,
227+
}
228+
c.storeBacklog(m, p)
229+
msg = c.backlogs[p].PopItem()
230+
if !reflect.DeepEqual(msg, m) {
231+
t.Errorf("message mismatch: have %v, want %v", msg, m)
232+
}
198233
}
199234

200235
func TestProcessFutureBacklog(t *testing.T) {
@@ -276,6 +311,10 @@ func TestProcessBacklog(t *testing.T) {
276311
Code: msgCommit,
277312
Msg: subjectPayload,
278313
},
314+
&message{
315+
Code: msgRoundChange,
316+
Msg: subjectPayload,
317+
},
279318
}
280319
for i := 0; i < len(msgs); i++ {
281320
testProcessBacklog(t, msgs[i])

consensus/istanbul/core/handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func (c *core) handleCheckedMsg(msg *message, src istanbul.Validator) error {
172172
case msgCommit:
173173
return testBacklog(c.handleCommit(msg, src))
174174
case msgRoundChange:
175-
return c.handleRoundChange(msg, src)
175+
return testBacklog(c.handleRoundChange(msg, src))
176176
default:
177177
logger.Error("Invalid message", "msg", msg)
178178
}

consensus/istanbul/core/prepare.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (c *core) handlePrepare(msg *message, src istanbul.Validator) error {
5959

6060
// Change to Prepared state if we've received enough PREPARE messages or it is locked
6161
// and we are in earlier state before Prepared state.
62-
if ((c.current.IsHashLocked() && prepare.Digest == c.current.GetLockedHash()) || c.current.Prepares.Size() > 2*c.valSet.F()) &&
62+
if ((c.current.IsHashLocked() && prepare.Digest == c.current.GetLockedHash()) || c.current.GetPrepareOrCommitSize() > 2*c.valSet.F()) &&
6363
c.state.Cmp(StatePrepared) < 0 {
6464
c.current.LockHash()
6565
c.setState(StatePrepared)

consensus/istanbul/core/roundchange.go

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,9 @@ func (c *core) sendRoundChange(round *big.Int) {
4848

4949
// Now we have the new round number and sequence number
5050
cv = c.currentView()
51-
rc := &roundChange{
52-
Round: new(big.Int).Set(cv.Round),
53-
Sequence: new(big.Int).Set(cv.Sequence),
54-
Digest: common.Hash{},
51+
rc := &istanbul.Subject{
52+
View: cv,
53+
Digest: common.Hash{},
5554
}
5655

5756
payload, err := Encode(rc)
@@ -70,29 +69,22 @@ func (c *core) handleRoundChange(msg *message, src istanbul.Validator) error {
7069
logger := c.logger.New("state", c.state, "from", src.Address().Hex())
7170

7271
// Decode ROUND CHANGE message
73-
var rc *roundChange
72+
var rc *istanbul.Subject
7473
if err := msg.Decode(&rc); err != nil {
7574
logger.Error("Failed to decode ROUND CHANGE", "err", err)
7675
return errInvalidMessage
7776
}
7877

79-
cv := c.currentView()
80-
81-
// We never accept ROUND CHANGE message with different sequence number
82-
if rc.Sequence.Cmp(cv.Sequence) != 0 {
83-
logger.Warn("Inconsistent sequence number", "expected", cv.Sequence, "got", rc.Sequence)
84-
return errInvalidMessage
78+
if err := c.checkMessage(msgRoundChange, rc.View); err != nil {
79+
return err
8580
}
8681

87-
// We never accept ROUND CHANGE message with smaller round number
88-
if rc.Round.Cmp(cv.Round) < 0 {
89-
logger.Warn("Old round change", "from", src, "expected", cv.Round, "got", rc.Round)
90-
return errOldMessage
91-
}
82+
cv := c.currentView()
83+
roundView := rc.View
9284

9385
// Add the ROUND CHANGE message to its message set and return how many
9486
// messages we've got with the same round number and sequence number.
95-
num, err := c.roundChangeSet.Add(rc.Round, msg)
87+
num, err := c.roundChangeSet.Add(roundView.Round, msg)
9688
if err != nil {
9789
logger.Warn("Failed to add round change message", "from", src, "msg", msg, "err", err)
9890
return err
@@ -102,18 +94,17 @@ func (c *core) handleRoundChange(msg *message, src istanbul.Validator) error {
10294
// If our round number is smaller than the certificate's round number, we would
10395
// try to catch up the round number.
10496
if c.waitingForRoundChange && num == int(c.valSet.F()+1) {
105-
if cv.Round.Cmp(rc.Round) < 0 {
106-
c.sendRoundChange(rc.Round)
97+
if cv.Round.Cmp(roundView.Round) < 0 {
98+
c.sendRoundChange(roundView.Round)
10799
}
108100
return nil
109-
} else if num == int(2*c.valSet.F()+1) && (c.waitingForRoundChange || cv.Round.Cmp(rc.Round) < 0) {
101+
} else if num == int(2*c.valSet.F()+1) && (c.waitingForRoundChange || cv.Round.Cmp(roundView.Round) < 0) {
110102
// We've received 2f+1 ROUND CHANGE messages, start a new round immediately.
111-
c.startNewRound(rc.Round)
103+
c.startNewRound(roundView.Round)
112104
return nil
113-
} else if cv.Round.Cmp(rc.Round) < 0 {
114-
// We consider the message with larger round as future messages and not
115-
// gossip it to other validators.
116-
return errFutureMessage
105+
} else if cv.Round.Cmp(roundView.Round) < 0 {
106+
// Only gossip the message with current round to other validators.
107+
return errIgnored
117108
}
118109
return nil
119110
}

consensus/istanbul/core/roundchange_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,9 @@ func TestRoundChangeSet(t *testing.T) {
3333
Sequence: big.NewInt(1),
3434
Round: big.NewInt(1),
3535
}
36-
r := &roundChange{
37-
Round: view.Round,
38-
Sequence: view.Sequence,
39-
Digest: common.Hash{},
36+
r := &istanbul.Subject{
37+
View: view,
38+
Digest: common.Hash{},
4039
}
4140
m, _ := Encode(r)
4241

consensus/istanbul/core/types.go

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package core
1919
import (
2020
"fmt"
2121
"io"
22-
"math/big"
2322

2423
"github.com/ethereum/go-ethereum/common"
2524
"github.com/ethereum/go-ethereum/rlp"
@@ -163,34 +162,3 @@ func (m *message) String() string {
163162
func Encode(val interface{}) ([]byte, error) {
164163
return rlp.EncodeToBytes(val)
165164
}
166-
167-
// ----------------------------------------------------------------------------
168-
169-
type roundChange struct {
170-
Round *big.Int
171-
Sequence *big.Int
172-
Digest common.Hash
173-
}
174-
175-
// EncodeRLP serializes rc into the Ethereum RLP format.
176-
func (rc *roundChange) EncodeRLP(w io.Writer) error {
177-
return rlp.Encode(w, []interface{}{
178-
rc.Round,
179-
rc.Sequence,
180-
rc.Digest,
181-
})
182-
}
183-
184-
// DecodeRLP implements rlp.Decoder, and load the consensus fields from a RLP stream.
185-
func (rc *roundChange) DecodeRLP(s *rlp.Stream) error {
186-
var rawRoundChange struct {
187-
Round *big.Int
188-
Sequence *big.Int
189-
Digest common.Hash
190-
}
191-
if err := s.Decode(&rawRoundChange); err != nil {
192-
return err
193-
}
194-
rc.Round, rc.Sequence, rc.Digest = rawRoundChange.Round, rawRoundChange.Sequence, rawRoundChange.Digest
195-
return nil
196-
}

consensus/istanbul/core/types_test.go

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -172,47 +172,8 @@ func testSubjectWithSignature(t *testing.T) {
172172
}
173173
}
174174

175-
func testRoundChange(t *testing.T) {
176-
rc := &roundChange{
177-
Round: big.NewInt(1),
178-
Sequence: big.NewInt(2),
179-
Digest: common.StringToHash("1234567890"),
180-
}
181-
RoundChangePayload, _ := Encode(rc)
182-
183-
m := &message{
184-
Code: msgRoundChange,
185-
Msg: RoundChangePayload,
186-
Address: common.HexToAddress("0x1234567890"),
187-
}
188-
189-
msgPayload, err := m.Payload()
190-
if err != nil {
191-
t.Errorf("error mismatch: have %v, want nil", err)
192-
}
193-
194-
decodedMsg := new(message)
195-
err = decodedMsg.FromPayload(msgPayload, nil)
196-
if err != nil {
197-
t.Errorf("error mismatch: have %v, want nil", err)
198-
}
199-
200-
var decodedRC *roundChange
201-
err = decodedMsg.Decode(&decodedRC)
202-
if err != nil {
203-
t.Errorf("error mismatch: have %v, want nil", err)
204-
}
205-
206-
// if block is encoded/decoded by rlp, we cannot to compare interface data type using reflect.DeepEqual. (like istanbul.Proposal)
207-
// so individual comparison here.
208-
if !reflect.DeepEqual(rc, decodedRC) {
209-
t.Errorf("round change mismatch: have %v, want %v", decodedRC, rc)
210-
}
211-
}
212-
213175
func TestMessageEncodeDecode(t *testing.T) {
214176
testPreprepare(t)
215177
testSubject(t)
216178
testSubjectWithSignature(t)
217-
testRoundChange(t)
218179
}

0 commit comments

Comments
 (0)