Skip to content

Commit 0e8e090

Browse files
authored
Merge pull request #105 from renproject/merge/master
Merge latest changes from master
2 parents b7b72b2 + a07629c commit 0e8e090

File tree

7 files changed

+542
-55
lines changed

7 files changed

+542
-55
lines changed

mq/mq.go

+21-11
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,32 @@ func New(opts Options) MessageQueue {
3333
// have heights up to (and including) the given height. The appropriate callback
3434
// will be called for every message that is consumed. All consumed messages will
3535
// be dropped from the MessageQueue.
36-
func (mq *MessageQueue) Consume(h process.Height, propose func(process.Propose), prevote func(process.Prevote), precommit func(process.Precommit)) (n int) {
36+
func (mq *MessageQueue) Consume(h process.Height, propose func(process.Propose), prevote func(process.Prevote), precommit func(process.Precommit), procsAllowed map[id.Signatory]bool) (n int) {
3737
for from, q := range mq.queuesByPid {
3838
for len(q) > 0 {
3939
if q[0] == nil || height(q[0]) > h {
4040
break
4141
}
42-
switch msg := q[0].(type) {
43-
case process.Propose:
44-
propose(msg)
45-
case process.Prevote:
46-
prevote(msg)
47-
case process.Precommit:
48-
precommit(msg)
49-
}
50-
n++
51-
q = q[1:]
42+
43+
func() {
44+
defer func() {
45+
n++
46+
q = q[1:]
47+
}()
48+
49+
if ok := procsAllowed[from]; !ok {
50+
return
51+
}
52+
53+
switch msg := q[0].(type) {
54+
case process.Propose:
55+
propose(msg)
56+
case process.Prevote:
57+
prevote(msg)
58+
case process.Precommit:
59+
precommit(msg)
60+
}
61+
}()
5262
}
5363
mq.queuesByPid[from] = q
5464
}

0 commit comments

Comments
 (0)