9
9
10
10
final class BatchConsumer extends BaseAmqp implements DequeuerInterface
11
11
{
12
- /**
13
- * @var int
14
- */
15
- private $ consumed = 0 ;
16
-
17
12
/**
18
13
* @var \Closure|callable
19
14
*/
@@ -71,67 +66,36 @@ public function setCallback($callback)
71
66
return $ this ;
72
67
}
73
68
74
- public function start ()
69
+ public function consume ()
75
70
{
76
71
$ this ->setupConsumer ();
77
72
78
73
while (count ($ this ->getChannel ()->callbacks )) {
79
- $ this ->getChannel ()->wait ();
80
- }
81
- }
82
-
83
- public function execute (AMQPMessage $ msg )
84
- {
85
- $ this ->addMessage ($ msg );
86
-
87
- $ this ->maybeStopConsumer ();
74
+ if ($ this ->isCompleteBatch ()) {
75
+ $ this ->batchConsume ();
76
+ }
88
77
89
- if (null !== $ this ->getMemoryLimit () && $ this ->isRamAlmostOverloaded ()) {
90
- $ this ->stopConsuming ();
91
- }
92
- }
78
+ $ this ->maybeStopConsumer ();
93
79
94
- public function consume ()
95
- {
96
- $ this ->setupConsumer ();
80
+ $ timeout = $ this ->isEmptyBatch () ? $ this ->getIdleTimeout () : $ this ->getTimeoutWait ();
97
81
98
- $ isConsuming = false ;
99
- $ timeoutWanted = $ this ->getTimeoutWait ();
100
- while (count ($ this ->getChannel ()->callbacks )) {
101
- $ this ->maybeStopConsumer ();
102
- if (!$ this ->forceStop ) {
103
- try {
104
- $ this ->getChannel ()->wait (null , false , $ timeoutWanted );
105
- $ isConsuming = true ;
106
- } catch (AMQPTimeoutException $ e ) {
82
+ try {
83
+ $ this ->getChannel ()->wait (null , false , $ timeout );
84
+ } catch (AMQPTimeoutException $ e ) {
85
+ if (!$ this ->isEmptyBatch ()) {
107
86
$ this ->batchConsume ();
108
- if ($ isConsuming ) {
109
- $ isConsuming = false ;
110
- } elseif (null !== $ this ->getIdleTimeoutExitCode ()) {
111
- return $ this ->getIdleTimeoutExitCode ();
112
- } else {
113
- throw $ e ;
114
- }
87
+ } elseif (null !== $ this ->getIdleTimeoutExitCode ()) {
88
+ return $ this ->getIdleTimeoutExitCode ();
89
+ } else {
90
+ throw $ e ;
115
91
}
116
- } else {
117
- $ this ->batchConsume ();
118
92
}
119
-
120
- if ($ this ->isCompleteBatch ($ isConsuming )) {
121
- $ this ->batchConsume ();
122
- }
123
-
124
- $ timeoutWanted = $ isConsuming ? $ this ->getTimeoutWait () : $ this ->getIdleTimeout ();
125
93
}
126
94
}
127
95
128
- public function batchConsume ()
96
+ private function batchConsume ()
129
97
{
130
- if ($ this ->batchCounter === 0 ) {
131
- return ;
132
- }
133
-
134
- try {
98
+ try {
135
99
$ processFlags = call_user_func ($ this ->callback , $ this ->messages );
136
100
$ this ->handleProcessMessages ($ processFlags );
137
101
$ this ->logger ->debug ('Queue message processed ' , array (
@@ -149,6 +113,7 @@ public function batchConsume()
149
113
'stacktrace ' => $ e ->getTraceAsString ()
150
114
)
151
115
));
116
+ $ this ->resetBatch ();
152
117
$ this ->stopConsuming ();
153
118
} catch (\Exception $ e ) {
154
119
$ this ->logger ->error ($ e ->getMessage (), array (
@@ -186,13 +151,6 @@ protected function handleProcessMessages($processFlags = null)
186
151
foreach ($ processFlags as $ deliveryTag => $ processFlag ) {
187
152
$ this ->handleProcessFlag ($ deliveryTag , $ processFlag );
188
153
}
189
-
190
- $ this ->consumed ++;
191
- $ this ->maybeStopConsumer ();
192
-
193
- if (null !== $ this ->getMemoryLimit () && $ this ->isRamAlmostOverloaded ()) {
194
- $ this ->stopConsuming ();
195
- }
196
154
}
197
155
198
156
/**
@@ -201,7 +159,7 @@ protected function handleProcessMessages($processFlags = null)
201
159
*
202
160
* @return void
203
161
*/
204
- private function handleProcessFlag ($ deliveryTag , $ processFlag )
162
+ private function handleProcessFlag ($ deliveryTag , $ processFlag )
205
163
{
206
164
if ($ processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $ processFlag ) {
207
165
// Reject and requeue message to RabbitMQ
@@ -219,13 +177,19 @@ private function handleProcessFlag ($deliveryTag, $processFlag)
219
177
}
220
178
221
179
/**
222
- * @param bool $isConsuming
223
- *
224
180
* @return bool
225
181
*/
226
- protected function isCompleteBatch ($ isConsuming )
182
+ protected function isCompleteBatch ()
227
183
{
228
- return $ isConsuming && $ this ->batchCounter === $ this ->prefetchCount ;
184
+ return $ this ->batchCounter === $ this ->prefetchCount ;
185
+ }
186
+
187
+ /**
188
+ * @return bool
189
+ */
190
+ protected function isEmptyBatch ()
191
+ {
192
+ return $ this ->batchCounter === 0 ;
229
193
}
230
194
231
195
/**
@@ -238,40 +202,9 @@ protected function isCompleteBatch($isConsuming)
238
202
*/
239
203
public function processMessage (AMQPMessage $ msg )
240
204
{
241
- try {
242
- call_user_func (array ($ this , 'execute ' ), $ msg );
243
- } catch (Exception \StopConsumerException $ e ) {
244
- $ this ->logger ->info ('Consumer requested restart ' , array (
245
- 'amqp ' => array (
246
- 'queue ' => $ this ->queueOptions ['name ' ],
247
- 'message ' => $ msg ,
248
- 'stacktrace ' => $ e ->getTraceAsString ()
249
- )
250
- ));
251
- $ this ->stopConsuming ();
252
- } catch (\Exception $ e ) {
253
- $ this ->logger ->error ($ e ->getMessage (), array (
254
- 'amqp ' => array (
255
- 'queue ' => $ this ->queueOptions ['name ' ],
256
- 'message ' => $ msg ,
257
- 'stacktrace ' => $ e ->getTraceAsString ()
258
- )
259
- ));
260
- $ this ->batchConsume ();
261
-
262
- throw $ e ;
263
- } catch (\Error $ e ) {
264
- $ this ->logger ->error ($ e ->getMessage (), array (
265
- 'amqp ' => array (
266
- 'queue ' => $ this ->queueOptions ['name ' ],
267
- 'message ' => $ msg ,
268
- 'stacktrace ' => $ e ->getTraceAsString ()
269
- )
270
- ));
271
- $ this ->batchConsume ();
205
+ $ this ->addMessage ($ msg );
272
206
273
- throw $ e ;
274
- }
207
+ $ this ->maybeStopConsumer ();
275
208
}
276
209
277
210
/**
@@ -355,7 +288,9 @@ private function getMessageChannel($deliveryTag)
355
288
*/
356
289
public function stopConsuming ()
357
290
{
358
- $ this ->batchConsume ();
291
+ if (!$ this ->isEmptyBatch ()) {
292
+ $ this ->batchConsume ();
293
+ }
359
294
360
295
$ this ->getChannel ()->basic_cancel ($ this ->getConsumerTag ());
361
296
}
@@ -390,6 +325,10 @@ protected function maybeStopConsumer()
390
325
if ($ this ->forceStop ) {
391
326
$ this ->stopConsuming ();
392
327
}
328
+
329
+ if (null !== $ this ->getMemoryLimit () && $ this ->isRamAlmostOverloaded ()) {
330
+ $ this ->stopConsuming ();
331
+ }
393
332
}
394
333
395
334
/**
0 commit comments