Skip to content

Commit 7037947

Browse files
committed
basic test for message dropping
1 parent be3e4a4 commit 7037947

File tree

1 file changed

+73
-40
lines changed

1 file changed

+73
-40
lines changed

mq/mq_test.go

+73-40
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,47 @@ var _ = Describe("MQ", func() {
4646
}
4747
}
4848

49+
insertRandomMessages := func(queue *mq.MessageQueue, sender id.Signatory) (process.Height, process.Height, int) {
50+
// at the most 20 heights and rounds in increasing order
51+
heights := make([]process.Height, 1+r.Intn(10))
52+
nextHeight := 1
53+
nextRound := 0
54+
for s := 0; s < cap(heights); s++ {
55+
nextHeight = nextHeight + r.Intn(10)
56+
heights[s] = process.Height(nextHeight)
57+
}
58+
rounds := make([]process.Round, 1+r.Intn(10))
59+
for t := 0; t < cap(rounds); t++ {
60+
nextRound = nextRound + r.Intn(10)
61+
rounds[t] = process.Round(nextRound)
62+
}
63+
64+
// append all messages and shuffle them
65+
msgsCount := cap(heights) * cap(rounds)
66+
msgs := make([]interface{}, 0, msgsCount)
67+
for s := range heights {
68+
for t := range rounds {
69+
msg := randomMsg(r, sender, heights[s], rounds[t])
70+
msgs = append(msgs, msg)
71+
}
72+
}
73+
r.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
74+
75+
// insert all msgs
76+
for _, msg := range msgs {
77+
switch msg := msg.(type) {
78+
case process.Propose:
79+
queue.InsertPropose(msg)
80+
case process.Prevote:
81+
queue.InsertPrevote(msg)
82+
case process.Precommit:
83+
queue.InsertPrecommit(msg)
84+
}
85+
}
86+
87+
return heights[0], heights[len(heights)-1], msgsCount
88+
}
89+
4990
Context("when we instantiate a new message queue", func() {
5091
It("should return an empty mq with the given options", func() {
5192
opts := mq.DefaultOptions()
@@ -261,42 +302,7 @@ var _ = Describe("MQ", func() {
261302

262303
loop := func() bool {
263304
sender := id.NewPrivKey().Signatory()
264-
// at the most 20 heights and rounds in increasing order
265-
heights := make([]process.Height, 1+r.Intn(10))
266-
nextHeight := 1
267-
nextRound := 0
268-
for s := 0; s < cap(heights); s++ {
269-
nextHeight = nextHeight + r.Intn(10)
270-
heights[s] = process.Height(nextHeight)
271-
}
272-
rounds := make([]process.Round, 1+r.Intn(10))
273-
for t := 0; t < cap(rounds); t++ {
274-
nextRound = nextRound + r.Intn(10)
275-
rounds[t] = process.Round(nextRound)
276-
}
277-
278-
// append all messages and shuffle them
279-
msgsCount := cap(heights) * cap(rounds)
280-
msgs := make([]interface{}, 0, msgsCount)
281-
for s := range heights {
282-
for t := range rounds {
283-
msg := randomMsg(r, sender, heights[s], rounds[t])
284-
msgs = append(msgs, msg)
285-
}
286-
}
287-
r.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
288-
289-
// insert all msgs
290-
for _, msg := range msgs {
291-
switch msg := msg.(type) {
292-
case process.Propose:
293-
queue.InsertPropose(msg)
294-
case process.Prevote:
295-
queue.InsertPrevote(msg)
296-
case process.Precommit:
297-
queue.InsertPrecommit(msg)
298-
}
299-
}
305+
minHeight, maxHeight, msgsCount := insertRandomMessages(&queue, sender)
300306

301307
// we should first consume msg1 and then msg2
302308
prevHeight := process.Height(-1)
@@ -362,14 +368,13 @@ var _ = Describe("MQ", func() {
362368
i++
363369
}
364370

365-
// cannot consume msgs of height less than lowerHeight
366-
lowerHeight := heights[0] - 1
367-
n := queue.Consume(lowerHeight, proposeCallback, prevoteCallback, precommitCallback)
371+
// cannot consume msgs of height less than the min height
372+
n := queue.Consume(minHeight - 1, proposeCallback, prevoteCallback, precommitCallback)
368373
Expect(n).To(Equal(0))
369374
Expect(i).To(Equal(0))
370375

371376
// consume all messages
372-
n = queue.Consume(heights[len(heights)-1], proposeCallback, prevoteCallback, precommitCallback)
377+
n = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback)
373378
Expect(n).To(Equal(msgsCount))
374379
Expect(i).To(Equal(msgsCount))
375380

@@ -380,6 +385,34 @@ var _ = Describe("MQ", func() {
380385
})
381386
})
382387

388+
Context("when dropping all messages below a certain height", func() {
389+
It("should remove all corresponding messages from the queues", func() {
390+
opts := mq.DefaultOptions()
391+
queue := mq.New(opts)
392+
393+
loop := func() bool {
394+
sender := id.NewPrivKey().Signatory()
395+
_, maxHeight, _ := insertRandomMessages(&queue, sender)
396+
thresholdHeight := process.Height(r.Intn(int(maxHeight)))
397+
queue.DropMessagesBelowHeight(thresholdHeight)
398+
399+
proposeCallback := func(propose process.Propose) {
400+
Expect(propose.Height >= thresholdHeight).To(BeTrue())
401+
}
402+
prevoteCallback := func(prevote process.Prevote) {
403+
Expect(prevote.Height >= thresholdHeight).To(BeTrue())
404+
}
405+
precommitCallback := func(precommit process.Precommit) {
406+
Expect(precommit.Height >= thresholdHeight).To(BeTrue())
407+
}
408+
409+
_ = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback)
410+
return true
411+
}
412+
Expect(quick.Check(loop, nil)).To(Succeed())
413+
})
414+
})
415+
383416
Context("when we have reached the queue's max capacity", func() {
384417
It("trivial case when max capacity is 1", func() {
385418
loop := func() bool {

0 commit comments

Comments
 (0)