Skip to content

Commit 6e99250

Browse files
authored
fix(priority): consider paused state when calling getCountsPerPriority (python) (#2609)
1 parent 16fb267 commit 6e99250

File tree

4 files changed

+40
-4
lines changed

4 files changed

+40
-4
lines changed

python/bullmq/scripts.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
3939
"cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-2.lua")),
4040
"extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")),
4141
"getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")),
42-
"getCountsPerPriority": self.redisClient.register_script(self.getScript("getCountsPerPriority-2.lua")),
42+
"getCountsPerPriority": self.redisClient.register_script(self.getScript("getCountsPerPriority-4.lua")),
4343
"getRanges": self.redisClient.register_script(self.getScript("getRanges-1.lua")),
4444
"getState": self.redisClient.register_script(self.getScript("getState-8.lua")),
4545
"getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")),
@@ -325,6 +325,8 @@ def getCounts(self, types):
325325

326326
def getCountsPerPriorityArgs(self, priorities):
327327
keys = [self.keys['wait'],
328+
self.keys['paused'],
329+
self.keys['meta'],
328330
self.keys['prioritized']]
329331

330332
args = priorities

src/classes/scripts.ts

+2
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,8 @@ export class Scripts {
617617
private getCountsPerPriorityArgs(priorities: number[]): (string | number)[] {
618618
const keys: (string | number)[] = [
619619
this.queue.keys.wait,
620+
this.queue.keys.paused,
621+
this.queue.keys.meta,
620622
this.queue.keys.prioritized,
621623
];
622624

src/commands/getCountsPerPriority-2.lua src/commands/getCountsPerPriority-4.lua

+10-3
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,26 @@
33
44
Input:
55
KEYS[1] wait key
6-
KEYS[2] prioritized key
6+
KEYS[2] paused key
7+
KEYS[3] meta key
8+
KEYS[4] prioritized key
79
810
ARGV[1...] priorities
911
]]
1012
local rcall = redis.call
1113
local results = {}
1214
local waitKey = KEYS[1]
13-
local prioritizedKey = KEYS[2]
15+
local pausedKey = KEYS[2]
16+
local prioritizedKey = KEYS[4]
17+
18+
-- Includes
19+
--- @include "includes/getTargetQueueList"
1420

1521
for i = 1, #ARGV do
1622
local priority = tonumber(ARGV[i])
1723
if priority == 0 then
18-
results[#results+1] = rcall("LLEN", waitKey)
24+
local target = getTargetQueueList(KEYS[3], waitKey, pausedKey)
25+
results[#results+1] = rcall("LLEN", target)
1926
else
2027
results[#results+1] = rcall("ZCOUNT", prioritizedKey,
2128
priority * 0x100000000, (priority + 1) * 0x100000000 - 1)

tests/test_getters.ts

+25
Original file line numberDiff line numberDiff line change
@@ -866,6 +866,31 @@ describe('Jobs getters', function () {
866866
'3': 10,
867867
});
868868
});
869+
870+
describe('when queue is paused', () => {
871+
it('returns job counts per priority', async () => {
872+
await queue.waitUntilReady();
873+
874+
await queue.pause();
875+
const jobs = Array.from(Array(42).keys()).map(index => ({
876+
name: 'test',
877+
data: {},
878+
opts: {
879+
priority: index % 4,
880+
},
881+
}));
882+
await queue.addBulk(jobs);
883+
884+
const counts = await queue.getCountsPerPriority([0, 1, 2, 3]);
885+
886+
expect(counts).to.be.eql({
887+
'0': 11,
888+
'1': 11,
889+
'2': 10,
890+
'3': 10,
891+
});
892+
});
893+
});
869894
});
870895

871896
describe('.getDependencies', () => {

0 commit comments

Comments
 (0)