Skip to content

Commit 33851f0

Browse files
authored
Merge pull request #97 from renproject/fix/reset-height-race
Reset height inside event loop
2 parents c8ef7e0 + 30c2474 commit 33851f0

File tree

3 files changed

+101
-42
lines changed

3 files changed

+101
-42
lines changed

mq/mq.go

+17
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,23 @@ func (mq *MessageQueue) Consume(h process.Height, propose func(process.Propose),
5555
return
5656
}
5757

58+
// DropMessagesBelowHeight removes all messages from the internal message
59+
// queues that have height less than the given height.
60+
func (mq *MessageQueue) DropMessagesBelowHeight(h process.Height) {
61+
for from, q := range mq.queuesByPid {
62+
lastIndexBelowHeight := 0
63+
for _, m := range q {
64+
if m == nil {
65+
break
66+
}
67+
if height(m) < h {
68+
lastIndexBelowHeight++
69+
}
70+
}
71+
mq.queuesByPid[from] = q[lastIndexBelowHeight:]
72+
}
73+
}
74+
5875
// InsertPropose message into the MessageQueue. This method assumes that the
5976
// sender has already been authenticated and filtered.
6077
func (mq *MessageQueue) InsertPropose(propose process.Propose) {

mq/mq_test.go

+73-40
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,47 @@ var _ = Describe("MQ", func() {
4646
}
4747
}
4848

49+
insertRandomMessages := func(queue *mq.MessageQueue, sender id.Signatory) (process.Height, process.Height, int) {
50+
// at the most 20 heights and rounds in increasing order
51+
heights := make([]process.Height, 1+r.Intn(10))
52+
nextHeight := 1
53+
nextRound := 0
54+
for s := 0; s < cap(heights); s++ {
55+
nextHeight = nextHeight + r.Intn(10)
56+
heights[s] = process.Height(nextHeight)
57+
}
58+
rounds := make([]process.Round, 1+r.Intn(10))
59+
for t := 0; t < cap(rounds); t++ {
60+
nextRound = nextRound + r.Intn(10)
61+
rounds[t] = process.Round(nextRound)
62+
}
63+
64+
// append all messages and shuffle them
65+
msgsCount := cap(heights) * cap(rounds)
66+
msgs := make([]interface{}, 0, msgsCount)
67+
for s := range heights {
68+
for t := range rounds {
69+
msg := randomMsg(r, sender, heights[s], rounds[t])
70+
msgs = append(msgs, msg)
71+
}
72+
}
73+
r.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
74+
75+
// insert all msgs
76+
for _, msg := range msgs {
77+
switch msg := msg.(type) {
78+
case process.Propose:
79+
queue.InsertPropose(msg)
80+
case process.Prevote:
81+
queue.InsertPrevote(msg)
82+
case process.Precommit:
83+
queue.InsertPrecommit(msg)
84+
}
85+
}
86+
87+
return heights[0], heights[len(heights)-1], msgsCount
88+
}
89+
4990
Context("when we instantiate a new message queue", func() {
5091
It("should return an empty mq with the given options", func() {
5192
opts := mq.DefaultOptions()
@@ -261,42 +302,7 @@ var _ = Describe("MQ", func() {
261302

262303
loop := func() bool {
263304
sender := id.NewPrivKey().Signatory()
264-
// at the most 20 heights and rounds in increasing order
265-
heights := make([]process.Height, 1+r.Intn(10))
266-
nextHeight := 1
267-
nextRound := 0
268-
for s := 0; s < cap(heights); s++ {
269-
nextHeight = nextHeight + r.Intn(10)
270-
heights[s] = process.Height(nextHeight)
271-
}
272-
rounds := make([]process.Round, 1+r.Intn(10))
273-
for t := 0; t < cap(rounds); t++ {
274-
nextRound = nextRound + r.Intn(10)
275-
rounds[t] = process.Round(nextRound)
276-
}
277-
278-
// append all messages and shuffle them
279-
msgsCount := cap(heights) * cap(rounds)
280-
msgs := make([]interface{}, 0, msgsCount)
281-
for s := range heights {
282-
for t := range rounds {
283-
msg := randomMsg(r, sender, heights[s], rounds[t])
284-
msgs = append(msgs, msg)
285-
}
286-
}
287-
r.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
288-
289-
// insert all msgs
290-
for _, msg := range msgs {
291-
switch msg := msg.(type) {
292-
case process.Propose:
293-
queue.InsertPropose(msg)
294-
case process.Prevote:
295-
queue.InsertPrevote(msg)
296-
case process.Precommit:
297-
queue.InsertPrecommit(msg)
298-
}
299-
}
305+
minHeight, maxHeight, msgsCount := insertRandomMessages(&queue, sender)
300306

301307
// we should first consume msg1 and then msg2
302308
prevHeight := process.Height(-1)
@@ -362,14 +368,13 @@ var _ = Describe("MQ", func() {
362368
i++
363369
}
364370

365-
// cannot consume msgs of height less than lowerHeight
366-
lowerHeight := heights[0] - 1
367-
n := queue.Consume(lowerHeight, proposeCallback, prevoteCallback, precommitCallback)
371+
// cannot consume msgs of height less than the min height
372+
n := queue.Consume(minHeight-1, proposeCallback, prevoteCallback, precommitCallback)
368373
Expect(n).To(Equal(0))
369374
Expect(i).To(Equal(0))
370375

371376
// consume all messages
372-
n = queue.Consume(heights[len(heights)-1], proposeCallback, prevoteCallback, precommitCallback)
377+
n = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback)
373378
Expect(n).To(Equal(msgsCount))
374379
Expect(i).To(Equal(msgsCount))
375380

@@ -380,6 +385,34 @@ var _ = Describe("MQ", func() {
380385
})
381386
})
382387

388+
Context("when dropping all messages below a certain height", func() {
389+
It("should remove all corresponding messages from the queues", func() {
390+
opts := mq.DefaultOptions()
391+
queue := mq.New(opts)
392+
393+
loop := func() bool {
394+
sender := id.NewPrivKey().Signatory()
395+
_, maxHeight, _ := insertRandomMessages(&queue, sender)
396+
thresholdHeight := process.Height(r.Intn(int(maxHeight)))
397+
queue.DropMessagesBelowHeight(thresholdHeight)
398+
399+
proposeCallback := func(propose process.Propose) {
400+
Expect(propose.Height >= thresholdHeight).To(BeTrue())
401+
}
402+
prevoteCallback := func(prevote process.Prevote) {
403+
Expect(prevote.Height >= thresholdHeight).To(BeTrue())
404+
}
405+
precommitCallback := func(precommit process.Precommit) {
406+
Expect(precommit.Height >= thresholdHeight).To(BeTrue())
407+
}
408+
409+
_ = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback)
410+
return true
411+
}
412+
Expect(quick.Check(loop, nil)).To(Succeed())
413+
})
414+
})
415+
383416
Context("when we have reached the queue's max capacity", func() {
384417
It("trivial case when max capacity is 1", func() {
385418
loop := func() bool {

replica/replica.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ func (replica *Replica) Run(ctx context.Context) {
137137
return
138138
}
139139
replica.mq.InsertPrecommit(m)
140+
case process.Height:
141+
replica.proc.State = process.DefaultState().WithCurrentHeight(m)
142+
replica.mq.DropMessagesBelowHeight(m)
140143
}
141144
}
142145

@@ -211,11 +214,17 @@ func (replica *Replica) TimeoutPrecommit(ctx context.Context, timeout timer.Time
211214
// ResetHeight of the underlying process to a future height. This is should only
212215
// be used when resynchronising the chain. If the given height is less than or
213216
// equal to the current height, nothing happens.
214-
func (replica *Replica) ResetHeight(newHeight process.Height) {
217+
//
218+
// NOTE: All messages that are currently in the message queue for heights less
219+
// than the given height will be dropped.
220+
func (replica *Replica) ResetHeight(ctx context.Context, newHeight process.Height) {
215221
if newHeight <= replica.proc.State.CurrentHeight {
216222
return
217223
}
218-
replica.proc.State = process.DefaultState().WithCurrentHeight(newHeight)
224+
select {
225+
case <-ctx.Done():
226+
case replica.mch <- newHeight:
227+
}
219228
}
220229

221230
// State returns the current height, round and step of the underlying process.

0 commit comments

Comments
 (0)