---
description: Processing jobs in batches
---
# Batches
It is possible to configure workers so that, instead of processing one job at a time, they process up to a given number of jobs (a so-called _batch_) in one go. Workers using batches have slightly different semantics and behavior than normal workers, so carefully read the following examples to avoid pitfalls.
To enable batches, pass the `batch` option with a `size` property representing the maximum number of jobs per batch:
```typescript
const worker = new WorkerPro(
  'My Queue',
  async (job: JobPro) => {
    const batch = job.getBatch();
    for (let i = 0; i < batch.length; i++) {
      const batchedJob = batch[i];
      await doSomethingWithBatchedJob(batchedJob);
    }
  },
  {
    connection,
    batch: { size: 10 },
  },
);
```
{% hint style="info" %}
There is no strict maximum limit for the size of batches; however, keep in mind that larger batches introduce overhead proportional to their size, which could lead to performance issues. Typical batch sizes range between 10 and 50 jobs.
{% endhint %}
### New Batch Options: `minSize` and `timeout`
In addition to the `size` option, two new options, `minSize` and `timeout`, provide greater control over batch processing:
* `minSize`: Specifies the minimum number of jobs required before the worker processes a batch. The worker waits until at least `minSize` jobs are available before fetching and processing them, up to the `size` limit. If fewer than `minSize` jobs are available, the worker waits indefinitely unless a `timeout` is also set.
* `timeout`: Defines the maximum time (in milliseconds) the worker will wait for `minSize` jobs to accumulate. If the timeout expires before `minSize` is reached, the worker processes whatever jobs are available, up to the `size` limit. If `minSize` is not set, the `timeout` option is effectively ignored, as the worker batches only the jobs that are already available.
{% hint style="info" %}
Important: `minSize` and `timeout` are not compatible with groups. When groups are used, the worker ignores `minSize` and batches whatever jobs are available without waiting.
{% endhint %}
Here’s an example configuration using both `minSize` and `timeout`:
```typescript
const worker = new WorkerPro(
  'My Queue',
  async (job: JobPro) => {
    const batch = job.getBatch();
    for (let i = 0; i < batch.length; i++) {
      const batchedJob = batch[i];
      await doSomethingWithBatchedJob(batchedJob);
    }
  },
  {
    connection,
    batch: {
      size: 10, // Maximum jobs per batch
      minSize: 5, // Wait for at least 5 jobs
      timeout: 30_000, // Wait up to 30 seconds
    },
  },
);
```
In this example:
* The worker waits for at least 5 jobs to become available, up to a maximum of 10 jobs per batch.
67
+
* If 5 or more jobs are available within 30 seconds, it processes the batch (up to 10 jobs).
68
+
* If fewer than 5 jobs are available after 30 seconds, it processes whatever jobs are present, even if below `minSize`.
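The dispatch decision described above can be illustrated with a small standalone sketch. This is not BullMQ Pro's internal code, just the documented semantics of `size`, `minSize`, and `timeout` expressed as a function:

```typescript
// Illustrative sketch of the batch-dispatch decision described above.
// Not BullMQ Pro internals: just the documented semantics of the
// `size`, `minSize`, and `timeout` options.
function jobsToDispatch(
  available: number, // jobs currently waiting in the queue
  size: number, // maximum jobs per batch
  minSize: number | undefined, // minimum jobs before processing
  timedOut: boolean, // has `timeout` elapsed since waiting began?
): number {
  if (available === 0) return 0; // nothing to do yet
  if (minSize === undefined || available >= minSize || timedOut) {
    // Dispatch whatever is available, capped at the batch size.
    return Math.min(available, size);
  }
  return 0; // keep waiting for minSize jobs (or the timeout)
}

// With size: 10 and minSize: 5, as in the example above:
console.log(jobsToDispatch(3, 10, 5, false)); // 0  (still waiting for 5)
console.log(jobsToDispatch(3, 10, 5, true)); // 3  (timeout expired)
console.log(jobsToDispatch(7, 10, 5, false)); // 7
console.log(jobsToDispatch(25, 10, 5, false)); // 10 (capped at size)
```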
69
+
32
70
### Failing jobs
When using batches, the default is that if the processor throws an exception, **all jobs in the batch will fail.**
To fail specific jobs instead, use the `setAsFailed` method on individual jobs within the batch:
```typescript
const worker = new WorkerPro(
  'My Queue',
  async (job: JobPro) => {
    const batch = job.getBatch();
    for (let i = 0; i < batch.length; i++) {
      const batchedJob = batch[i];
      try {
        await doSomethingWithBatchedJob(batchedJob);
      } catch (err) {
        // Fail only this job; the rest of the batch is unaffected.
        batchedJob.setAsFailed(err);
      }
    }
  },
  {
    connection,
    batch: { size: 10 },
  },
);
```
Only jobs explicitly marked with `setAsFailed` will fail; the remaining jobs in the batch will complete successfully once the processor finishes.
### Handling events
Batches are managed by wrapping all jobs in a batch into a dummy job that holds the jobs in an internal array. This simplifies batch processing but affects event handling. For example, worker-level event listeners (e.g., `worker.on('completed', ...)`) report events for the dummy batch job, not the individual jobs within it.
To retrieve the jobs in a batch from an event handler, use the `getBatch` method:
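For instance, a global events listener can look up the completed wrapper job and unwrap its batch. The sketch below assumes the Pro classes mirror BullMQ's `QueueEvents` and `Job.fromId` APIs, and the `connection` settings are placeholders; `getBatch` itself is the documented method:

```typescript
import { JobPro, QueueEventsPro, QueuePro } from '@taskforcesh/bullmq-pro';

// Assumed connection settings for this sketch.
const connection = { host: 'localhost', port: 6379 };
const queue = new QueuePro('My Queue', { connection });
const queueEvents = new QueueEventsPro('My Queue', { connection });

queueEvents.on('completed', async ({ jobId }) => {
  // jobId refers to the wrapper batch job, so fetch it and unwrap it.
  const wrapper = await JobPro.fromId(queue, jobId);
  const jobs = wrapper?.getBatch() ?? [];
  for (const job of jobs) {
    console.log(`job ${job.id} completed as part of the batch`);
  }
});
```

Note that this snippet requires a running Redis instance and a worker processing batches before the `completed` event fires.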
# Local group rate limit
Sometimes it is required that different groups have different rate limits. This could be the case, for example, if a group represents a given user in the system and, depending on the user's quota or other factors, we would like a different rate limit for that user.
You can use a local group rate limit, which applies only to the specific group that has the rate limit set up. For example:
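A minimal sketch follows. The `setGroupRateLimit` method is documented in the API reference, but the `(groupId, max, duration)` argument order and the `connection` settings here are assumptions for illustration:

```typescript
import { QueuePro } from '@taskforcesh/bullmq-pro';

// Assumed connection settings for this sketch.
const connection = { host: 'localhost', port: 6379 };
const queue = new QueuePro('My Queue', { connection });

// Allow the group "my group" at most 100 jobs per second (1000 ms window).
await queue.setGroupRateLimit('my group', 100, 1000);
```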
This code would set a specific rate limit on the group "my group" of at most 100 jobs per second. Note that you can still have a ["default" rate limit](rate-limiting.md) for the rest of the groups; a call to `setGroupRateLimit` overrides that rate limit for the given group.
### Read more
* [Local Rate Limit Group API Reference](https://api.bullmq.pro/classes/v7.QueuePro.html#setGroupRateLimit)