Skip to content

Commit c8c1247

Browse files
authored
Merge pull request #62 from getamis/feature/refactor_seal
consensus/pbft: refactor committed, seal and viewChanged
2 parents a8d387b + 11f7d62 commit c8c1247

File tree

3 files changed

+46
-98
lines changed

3 files changed

+46
-98
lines changed

consensus/pbft/backends/simple/backend.go

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package simple
1818

1919
import (
2020
"crypto/ecdsa"
21+
"sync"
2122

2223
"github.com/ethereum/go-ethereum/common"
2324
"github.com/ethereum/go-ethereum/consensus"
@@ -48,6 +49,7 @@ func New(config *pbft.Config, eventMux *event.TypeMux, privateKey *ecdsa.Private
4849
address: crypto.PubkeyToAddress(privateKey.PublicKey),
4950
logger: log.New("backend", "simple"),
5051
db: db,
52+
commitCh: make(chan common.Hash, 1),
5153
}
5254
return backend
5355
}
@@ -70,9 +72,9 @@ type simpleBackend struct {
7072
inserter func(block *types.Block) error
7173

7274
// the channels for pbft engine notifications
73-
viewChange chan bool
74-
commit chan common.Hash
75-
commitErr chan error
75+
commitCh chan common.Hash
76+
proposedBlockHash common.Hash
77+
sealMu sync.Mutex
7678
}
7779

7880
// Address implements pbft.Backend.Address
@@ -128,33 +130,28 @@ func (sb *simpleBackend) Commit(proposal pbft.Proposal) error {
128130
sb.logger.Error("Failed to commit proposal since RequestContext cannot cast to *types.Block")
129131
return errCastingRequest
130132
}
131-
// it's a proposer
132-
if sb.commit != nil {
133-
sb.commitErr = make(chan error, 1)
134-
closeCommitErr := func() {
135-
close(sb.commitErr)
136-
}
137-
defer closeCommitErr()
133+
// - if the proposed and committed blocks are the same, send the proposed hash
134+
// to commit channel, which is being watched inside the engine.Seal() function.
135+
// - otherwise, we try to insert the block.
136+
// -- if success, the ChainHeadEvent event will be broadcasted and try to build
137+
// the next block. If it's a proposer, the previous Seal() will be stopped.
138+
// -- otherwise, a error will be returned and a view change event will be fired.
139+
if sb.proposedBlockHash == block.Hash() {
138140
// feed block hash to Seal() and wait the Seal() result
139-
sb.commit <- block.Hash()
141+
sb.commitCh <- block.Hash()
140142
// TODO: how do we check the block is inserted correctly?
141-
return <-sb.commitErr
142-
} else {
143-
return sb.inserter(block)
143+
return nil
144144
}
145+
return sb.inserter(block)
145146
}
146147

147148
// ViewChanged implements pbft.Backend.ViewChanged
148149
func (sb *simpleBackend) ViewChanged(needNewProposal bool) error {
149-
// step1: update proposer
150-
// step2: notify proposer and validator
151-
if sb.viewChange != nil {
152-
go func() {
153-
sb.viewChange <- needNewProposal
154-
}()
155-
}
156-
if sb.IsProposer() {
157-
go sb.eventMux.Post(core.ChainHeadEvent{})
150+
if needNewProposal {
151+
// if I was or I'm a proposer now, fire a ChainHeadEvent to trigger next Seal()
152+
if !common.EmptyHash(sb.proposedBlockHash) || sb.IsProposer() {
153+
go sb.eventMux.Post(core.ChainHeadEvent{})
154+
}
158155
}
159156
return nil
160157
}

consensus/pbft/backends/simple/engine.go

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,6 @@ var (
4545
errInvalidDifficulty = errors.New("invalid difficulty")
4646
// errNotProposer is returned when I'm not a proposer
4747
errNotProposer = errors.New("not a proposer")
48-
// errViewChanged is returned when we receive a view change event
49-
errViewChanged = errors.New("view changed")
50-
// errOtherBlockCommitted is returned when other block is committed.
51-
errOtherBlockCommitted = errors.New("other block is committed")
5248
// errInvalidPeer is returned when a message from invalid peer comes
5349
errInvalidPeer = errors.New("invalid peer")
5450
// errInvalidExtraDataFormat is returned when the extra data format is incorrect
@@ -270,23 +266,6 @@ func (sb *simpleBackend) Finalize(chain consensus.ChainReader, header *types.Hea
270266
return types.NewBlock(header, txs, nil, receipts), nil
271267
}
272268

273-
func (sb *simpleBackend) closeChannels() {
274-
if sb.viewChange != nil {
275-
close(sb.viewChange)
276-
sb.viewChange = nil
277-
}
278-
279-
if sb.commit != nil {
280-
close(sb.commit)
281-
sb.commit = nil
282-
}
283-
}
284-
285-
func (sb *simpleBackend) newChannels() {
286-
sb.viewChange = make(chan bool, 1)
287-
sb.commit = make(chan common.Hash, 1)
288-
}
289-
290269
// Seal generates a new block for the given input block with the local miner's
291270
// seal place on top.
292271
func (sb *simpleBackend) Seal(chain consensus.ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) {
@@ -314,27 +293,28 @@ func (sb *simpleBackend) Seal(chain consensus.ChainReader, block *types.Block, s
314293
return nil, errNotProposer
315294
}
316295

317-
sb.newChannels()
318-
defer sb.closeChannels()
296+
// get the proposed block hash and clear it if the seal() is completed.
297+
sb.sealMu.Lock()
298+
sb.proposedBlockHash = block.Hash()
299+
clear := func() {
300+
sb.proposedBlockHash = common.Hash{}
301+
sb.sealMu.Unlock()
302+
}
303+
defer clear()
304+
319305
// post block into PBFT engine
320306
go sb.EventMux().Post(pbft.RequestEvent{
321307
Proposal: block,
322308
})
323309

324310
for {
325311
select {
326-
case needNewProposal := <-sb.viewChange:
327-
if needNewProposal {
328-
return nil, errViewChanged
329-
}
330-
// if we don't need to change block, we keep waiting events.
331-
case hash := <-sb.commit:
312+
case hash := <-sb.commitCh:
313+
// if the block hash and the hash from channel are the same,
314+
// return the block. Otherwise, keep waiting the next hash.
332315
if block.Hash() == hash {
333-
sb.commitErr <- nil
334316
return block, nil
335317
}
336-
sb.commitErr <- errOtherBlockCommitted
337-
return nil, errOtherBlockCommitted
338318
case <-stop:
339319
return nil, nil
340320
}

consensus/pbft/backends/simple/engine_test.go

Lines changed: 14 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func TestSealStopChannel(t *testing.T) {
155155
}
156156
}
157157

158-
func TestSealViewChange(t *testing.T) {
158+
func TestSealRoundChange(t *testing.T) {
159159
chain, engine := newBlockChain(4)
160160
block := makeBlockWithoutSeal(chain, engine, chain.Genesis())
161161
eventSub := engine.EventMux().Subscribe(pbft.RequestEvent{})
@@ -166,7 +166,7 @@ func TestSealViewChange(t *testing.T) {
166166
if !ok {
167167
t.Errorf("unexpected event comes, got: %v, expected: pbft.RequestEvent", reflect.TypeOf(ev.Data))
168168
}
169-
engine.viewChange <- false
169+
engine.ViewChanged(false)
170170
}
171171
eventSub.Unsubscribe()
172172
}
@@ -186,35 +186,10 @@ func TestSealViewChange(t *testing.T) {
186186
}
187187
}
188188

189-
func TestSealViewChangeNeedNewProposal(t *testing.T) {
190-
chain, engine := newBlockChain(4)
191-
block := makeBlockWithoutSeal(chain, engine, chain.Genesis())
192-
eventSub := engine.EventMux().Subscribe(pbft.RequestEvent{})
193-
eventLoop := func() {
194-
select {
195-
case ev := <-eventSub.Chan():
196-
_, ok := ev.Data.(pbft.RequestEvent)
197-
if !ok {
198-
t.Errorf("unexpected event comes, got: %v, expected: pbft.RequestEvent", reflect.TypeOf(ev.Data))
199-
}
200-
engine.viewChange <- true
201-
}
202-
eventSub.Unsubscribe()
203-
}
204-
go eventLoop()
205-
206-
finalBlock, err := engine.Seal(chain, block, nil)
207-
if err != errViewChanged {
208-
t.Errorf("unexpected error comes, got: %v, expected: errViewChanged", err)
209-
}
210-
if finalBlock != nil {
211-
t.Errorf("block should be nil, but got: %v", finalBlock.Hash().Hex())
212-
}
213-
}
214-
215189
func TestSealCommittedOtherHash(t *testing.T) {
216190
chain, engine := newBlockChain(4)
217191
block := makeBlockWithoutSeal(chain, engine, chain.Genesis())
192+
otherBlock := makeBlockWithoutSeal(chain, engine, block)
218193
eventSub := engine.EventMux().Subscribe(pbft.RequestEvent{})
219194
eventLoop := func() {
220195
select {
@@ -223,26 +198,22 @@ func TestSealCommittedOtherHash(t *testing.T) {
223198
if !ok {
224199
t.Errorf("unexpected event comes, got: %v, expected: pbft.RequestEvent", reflect.TypeOf(ev.Data))
225200
}
226-
engine.commitErr = make(chan error, 1)
227-
closeCommitErr := func() {
228-
close(engine.commitErr)
229-
}
230-
defer closeCommitErr()
231-
engine.commit <- common.StringToHash("1234567890")
232-
err := <-engine.commitErr
233-
if err != errOtherBlockCommitted {
234-
t.Errorf("unexpected error comes, got: %v, expected: errOtherBlockCommitted", err)
235-
}
201+
engine.Commit(otherBlock)
236202
}
237203
eventSub.Unsubscribe()
238204
}
239205
go eventLoop()
240-
finalBlock, err := engine.Seal(chain, block, nil)
241-
if err != errOtherBlockCommitted {
242-
t.Errorf("unexpected error comes, got: %v, expected: errOtherBlockCommitted", err)
206+
seal := func() {
207+
engine.Seal(chain, block, nil)
208+
t.Errorf("should not be called")
243209
}
244-
if finalBlock != nil {
245-
t.Errorf("block should be nil, but got: %v", finalBlock)
210+
go seal()
211+
212+
const timeoutDura = 2 * time.Second
213+
timeout := time.NewTimer(timeoutDura)
214+
select {
215+
case <-timeout.C:
216+
// wait 2 seconds to ensure we cannot get any blocks from PBFT
246217
}
247218
}
248219

0 commit comments

Comments
 (0)