Skip to content

Commit 564e87c

Browse files
authored
Merge pull request #413 from devrck/batch-consumer
[Feature] Bulk Consumer
2 parents c28c4e3 + a39a2a0 commit 564e87c

9 files changed

+902
-6
lines changed

Command/BatchConsumerCommand.php

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Command;
4+
5+
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumer;
6+
use PhpAmqpLib\Exception\AMQPTimeoutException;
7+
use Symfony\Component\Console\Input\InputArgument;
8+
use Symfony\Component\Console\Input\InputInterface;
9+
use Symfony\Component\Console\Input\InputOption;
10+
use Symfony\Component\Console\Output\OutputInterface;
11+
12+
final class BatchConsumerCommand extends BaseRabbitMqCommand
13+
{
14+
/**
15+
* @var BatchConsumer
16+
*/
17+
protected $consumer;
18+
19+
public function stopConsumer()
20+
{
21+
if ($this->consumer instanceof BatchConsumer) {
22+
// Process current message, then halt consumer
23+
$this->consumer->forceStopConsumer();
24+
25+
// Halt consumer if waiting for a new message from the queue
26+
try {
27+
$this->consumer->stopConsuming();
28+
} catch (AMQPTimeoutException $e) {}
29+
}
30+
}
31+
32+
protected function configure()
33+
{
34+
parent::configure();
35+
36+
$this
37+
->setName('rabbitmq:batch:consumer')
38+
->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
39+
->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
40+
->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null)
41+
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
42+
->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
43+
->setDescription('Executes a Batch Consumer');
44+
;
45+
}
46+
47+
/**
48+
* Executes the current command.
49+
*
50+
* @param InputInterface $input An InputInterface instance
51+
* @param OutputInterface $output An OutputInterface instance
52+
*
53+
* @return integer 0 if everything went fine, or an error code
54+
*
55+
* @throws \InvalidArgumentException When the number of messages to consume is less than 0
56+
* @throws \BadFunctionCallException When the pcntl is not installed and option -s is true
57+
*/
58+
protected function execute(InputInterface $input, OutputInterface $output)
59+
{
60+
if (defined('AMQP_WITHOUT_SIGNALS') === false) {
61+
define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals'));
62+
}
63+
64+
if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) {
65+
if (!function_exists('pcntl_signal')) {
66+
throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called.");
67+
}
68+
69+
pcntl_signal(SIGTERM, array(&$this, 'stopConsumer'));
70+
pcntl_signal(SIGINT, array(&$this, 'stopConsumer'));
71+
}
72+
73+
if (defined('AMQP_DEBUG') === false) {
74+
define('AMQP_DEBUG', (bool) $input->getOption('debug'));
75+
}
76+
77+
$this->initConsumer($input);
78+
79+
return $this->consumer->consume();
80+
}
81+
82+
/**
83+
* @param InputInterface $input
84+
*/
85+
protected function initConsumer(InputInterface $input)
86+
{
87+
$this->consumer = $this->getContainer()
88+
->get(sprintf($this->getConsumerService(), $input->getArgument('name')));
89+
90+
if (null !== $input->getOption('memory-limit') &&
91+
ctype_digit((string) $input->getOption('memory-limit')) &&
92+
$input->getOption('memory-limit') > 0
93+
) {
94+
$this->consumer->setMemoryLimit($input->getOption('memory-limit'));
95+
}
96+
$this->consumer->setRoutingKey($input->getOption('route'));
97+
}
98+
99+
/**
100+
* @return string
101+
*/
102+
protected function getConsumerService()
103+
{
104+
return 'old_sound_rabbit_mq.%s_batch';
105+
}
106+
}

DependencyInjection/Configuration.php

+54-1
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,26 @@
1313
*/
1414
class Configuration implements ConfigurationInterface
1515
{
16+
/**
17+
* @var string
18+
*/
19+
protected $name;
20+
21+
/**
22+
* Configuration constructor.
23+
*
24+
* @param string $name
25+
*/
26+
public function __construct($name)
27+
{
28+
$this->name = $name;
29+
}
30+
1631
public function getConfigTreeBuilder()
1732
{
1833
$tree = new TreeBuilder();
1934

20-
$rootNode = $tree->root('old_sound_rabbit_mq');
35+
$rootNode = $tree->root($this->name);
2136

2237
$rootNode
2338
->children()
@@ -33,6 +48,7 @@ public function getConfigTreeBuilder()
3348
$this->addConsumers($rootNode);
3449
$this->addMultipleConsumers($rootNode);
3550
$this->addDynamicConsumers($rootNode);
51+
$this->addBatchConsumers($rootNode);
3652
$this->addAnonConsumers($rootNode);
3753
$this->addRpcClients($rootNode);
3854
$this->addRpcServers($rootNode);
@@ -228,6 +244,43 @@ protected function addDynamicConsumers(ArrayNodeDefinition $node)
228244
;
229245
}
230246

247+
/**
248+
* @param ArrayNodeDefinition $node
249+
*
250+
* @return void
251+
*/
252+
protected function addBatchConsumers(ArrayNodeDefinition $node)
253+
{
254+
$node
255+
->children()
256+
->arrayNode('batch_consumers')
257+
->canBeUnset()
258+
->useAttributeAsKey('key')
259+
->prototype('array')
260+
->append($this->getExchangeConfiguration())
261+
->append($this->getQueueConfiguration())
262+
->children()
263+
->scalarNode('connection')->defaultValue('default')->end()
264+
->scalarNode('callback')->isRequired()->end()
265+
->scalarNode('idle_timeout')->end()
266+
->scalarNode('timeout_wait')->defaultValue(3)->end()
267+
->scalarNode('idle_timeout_exit_code')->end()
268+
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
269+
->arrayNode('qos_options')
270+
->children()
271+
->scalarNode('prefetch_size')->defaultValue(0)->end()
272+
->scalarNode('prefetch_count')->defaultValue(2)->end()
273+
->booleanNode('global')->defaultFalse()->end()
274+
->end()
275+
->end()
276+
->scalarNode('enable_logger')->defaultFalse()->end()
277+
->end()
278+
->end()
279+
->end()
280+
->end()
281+
;
282+
}
283+
231284
protected function addAnonConsumers(ArrayNodeDefinition $node)
232285
{
233286
$node

DependencyInjection/OldSoundRabbitMqExtension.php

+51-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public function load(array $configs, ContainerBuilder $container)
4040
$loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
4141
$loader->load('rabbitmq.xml');
4242

43-
$configuration = new Configuration();
43+
$configuration = new Configuration($this->getAlias());
4444
$this->config = $this->processConfiguration($configuration, $configs);
4545

4646
$this->collectorEnabled = $this->config['enable_collector'];
@@ -51,6 +51,7 @@ public function load(array $configs, ContainerBuilder $container)
5151
$this->loadConsumers();
5252
$this->loadMultipleConsumers();
5353
$this->loadDynamicConsumers();
54+
$this->loadBatchConsumers();
5455
$this->loadAnonConsumers();
5556
$this->loadRpcClients();
5657
$this->loadRpcServers();
@@ -362,6 +363,55 @@ protected function loadDynamicConsumers()
362363
}
363364
}
364365

366+
protected function loadBatchConsumers()
367+
{
368+
foreach ($this->config['batch_consumers'] as $key => $consumer) {
369+
$definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
370+
371+
if (!isset($consumer['exchange_options'])) {
372+
$consumer['exchange_options'] = $this->getDefaultExchangeOptions();
373+
}
374+
375+
$definition
376+
->addTag('old_sound_rabbit_mq.base_amqp')
377+
->addTag('old_sound_rabbit_mq.batch_consumer')
378+
->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']))
379+
->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count']))
380+
->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute')))
381+
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
382+
->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
383+
->addMethodCall('setQosOptions', array(
384+
$consumer['qos_options']['prefetch_size'],
385+
$consumer['qos_options']['prefetch_count'],
386+
$consumer['qos_options']['global']
387+
))
388+
;
389+
390+
if (isset($consumer['idle_timeout_exit_code'])) {
391+
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
392+
}
393+
394+
if (isset($consumer['idle_timeout'])) {
395+
$definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
396+
}
397+
398+
if (!$consumer['auto_setup_fabric']) {
399+
$definition->addMethodCall('disableAutoSetupFabric');
400+
}
401+
402+
$this->injectConnection($definition, $consumer['connection']);
403+
if ($this->collectorEnabled) {
404+
$this->injectLoggedChannel($definition, $key, $consumer['connection']);
405+
}
406+
407+
if ($consumer['enable_logger']) {
408+
$this->injectLogger($definition);
409+
}
410+
411+
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition);
412+
}
413+
}
414+
365415
protected function loadAnonConsumers()
366416
{
367417
foreach ($this->config['anon_consumers'] as $key => $anon) {

README.md

+95
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,101 @@ $ ./app/console_dev rabbitmq:anon-consumer -m 5 -r '#.error' logs_watcher
791791

792792
The only new option compared to the commands that we have seen before is the one that specifies the __routing key__: `-r '#.error'`.
793793

794+
### Batch Consumers ###
795+
796+
In some cases you will want to get a batch of messages and then do some processing on all of them. Batch consumers will allow you to define logic for this type of processing.
797+
798+
e.g: Imagine that you have a queue where you receive a message for inserting some information in the database, and you realize that if you do a batch insert is much better then by inserting one by one.
799+
800+
Define a callback service that implements `BatchConsumerInterface` and add the definition of the consumer to your configuration.
801+
802+
```yaml
803+
batch_consumers:
804+
batch_basic_consumer:
805+
connection: default
806+
exchange_options: {name: 'batch', type: fanout}
807+
queue_options: {name: 'batch'}
808+
callback: batch.basic
809+
qos_options: {prefetch_size: 0, prefetch_count: 2, global: false}
810+
timeout_wait: 5
811+
auto_setup_fabric: false
812+
idle_timeout_exit_code: -2
813+
```
814+
815+
You can implement a batch consumer that will acknowledge all messages in one return or you can have control on what message to acknoledge.
816+
817+
```php
818+
namespace AppBundle\Service;
819+
820+
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
821+
use PhpAmqpLib\Message\AMQPMessage;
822+
823+
class DevckBasicConsumer implements BatchConsumerInterface
824+
{
825+
/**
826+
* @inheritDoc
827+
*/
828+
public function batchExecute(array $messages)
829+
{
830+
echo sprintf('Doing batch execution%s', PHP_EOL);
831+
foreach ($messages as $message) {
832+
$this->executeSomeLogicPerMessage($message);
833+
}
834+
835+
// you ack all messages got in batch
836+
return true;
837+
}
838+
}
839+
```
840+
841+
```php
842+
namespace AppBundle\Service;
843+
844+
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
845+
use PhpAmqpLib\Message\AMQPMessage;
846+
847+
class DevckBasicConsumer implements BatchConsumerInterface
848+
{
849+
/**
850+
* @inheritDoc
851+
*/
852+
public function batchExecute(array $messages)
853+
{
854+
echo sprintf('Doing batch execution%s', PHP_EOL);
855+
$result = [];
856+
/** @var AMQPMessage $message */
857+
foreach ($messages as $message) {
858+
$result[(int)$message->delivery_info['delivery_tag']] = $this->executeSomeLogicPerMessage($message);
859+
}
860+
861+
// you ack only some messages that have return true
862+
// e.g:
863+
// $return = [
864+
// 1 => true,
865+
// 2 => true,
866+
// 3 => false,
867+
// 4 => true,
868+
// 5 => -1,
869+
// 6 => 2,
870+
// ];
871+
// The following will happen:
872+
// * ack: 1,2,4
873+
// * reject and requeq: 3
874+
// * nack and requeue: 6
875+
// * reject and drop: 5
876+
return $result;
877+
}
878+
}
879+
```
880+
881+
How to run the following batch consumer:
882+
883+
```bash
884+
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
885+
```
886+
887+
Important: BatchConsumers will not have the -m|messages option available
888+
794889
### STDIN Producer ###
795890

796891
There's a Command that reads data from STDIN and publishes it to a RabbitMQ queue. To use it first you have to configure a `producer` service in your configuration file like this:

RabbitMq/BaseConsumer.php

-2
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,6 @@ protected function maybeStopConsumer()
9090

9191
if ($this->forceStop || ($this->consumed == $this->target && $this->target > 0)) {
9292
$this->stopConsuming();
93-
} else {
94-
return;
9593
}
9694
}
9795

0 commit comments

Comments
 (0)