Skip to content

Commit 50a87fc

Browse files
authored
fix(worker): setname on worker blocking connection (#291)
1 parent 4ef1cbc commit 50a87fc

File tree

4 files changed

+466
-413
lines changed

4 files changed

+466
-413
lines changed

.github/workflows/coverage.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ jobs:
1818

1919
strategy:
2020
matrix:
21-
node-version: [12]
21+
node-version: [14]
2222

2323
steps:
2424
- uses: actions/checkout@v2

src/classes/worker.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ export class Worker<T = any> extends QueueBase {
8585
this.on('error', err => console.error(err));
8686
}
8787

88+
async waitUntilReady() {
89+
await super.waitUntilReady();
90+
return this.blockingConnection.client;
91+
}
92+
8893
get repeat() {
8994
return new Promise<Repeat>(async resolve => {
9095
if (!this._repeat) {
@@ -99,7 +104,7 @@ export class Worker<T = any> extends QueueBase {
99104
}
100105

101106
private async run() {
102-
const client = await this.client;
107+
const client = await this.blockingConnection.client;
103108

104109
if (this.closing) {
105110
return;

src/test/test_worker.ts

+51
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,57 @@ describe('workers', function() {
930930
await queueScheduler.close();
931931
});
932932

933+
it('continue processing after a worker has stalled', async function() {
934+
let first = true;
935+
this.timeout(10000);
936+
937+
const worker = new Worker(
938+
queueName,
939+
async job => {
940+
if (first) {
941+
first = false;
942+
return delay(2000);
943+
}
944+
},
945+
{
946+
lockDuration: 1000,
947+
lockRenewTime: 3000, // The lock will not be updated
948+
},
949+
);
950+
await worker.waitUntilReady();
951+
952+
const queueScheduler = new QueueScheduler(queueName, {
953+
stalledInterval: 100,
954+
});
955+
await queueScheduler.waitUntilReady();
956+
957+
const job = await queue.add('test', { bar: 'baz' });
958+
959+
const completed = new Promise(resolve => {
960+
worker.on('completed', resolve);
961+
});
962+
963+
await completed;
964+
965+
await worker.close();
966+
await queueScheduler.close();
967+
});
968+
969+
it('stalled interval cannot be zero', function(done) {
970+
this.timeout(10000);
971+
let queueScheduler;
972+
973+
try {
974+
queueScheduler = new QueueScheduler(queueName, {
975+
stalledInterval: 0,
976+
});
977+
// Fail test if we reach here.
978+
done(new Error('Should throw an exception'));
979+
} catch (err) {
980+
done();
981+
}
982+
});
983+
933984
describe('Concurrency process', () => {
934985
it('should run job in sequence if I specify a concurrency of 1', async () => {
935986
let processing = false;

0 commit comments

Comments
 (0)