Skip to content

Commit 0bb11ed

Browse files
authored
Merge pull request #415 from tiborb/multiple_consumer_context
Added optional context argument to multiple consumer
2 parents df75641 + 0d35a82 commit 0bb11ed

File tree

5 files changed

+119
-4
lines changed

5 files changed

+119
-4
lines changed

Command/MultipleConsumerCommand.php

+13-3
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,28 @@
22

33
namespace OldSound\RabbitMqBundle\Command;
44

5+
use Symfony\Component\Console\Input\InputArgument;
6+
57
class MultipleConsumerCommand extends BaseConsumerCommand
68
{
79
protected function configure()
810
{
911
parent::configure();
1012

11-
$this->setDescription('Executes a consumer that uses multiple queues');
12-
$this->setName('rabbitmq:multiple-consumer');
13+
$this->setDescription('Executes a consumer that uses multiple queues')
14+
->setName('rabbitmq:multiple-consumer')
15+
->addArgument('context', InputArgument::OPTIONAL, 'Context the consumer runs in')
16+
;
1317
}
1418

1519
protected function getConsumerService()
1620
{
1721
return 'old_sound_rabbit_mq.%s_multiple';
1822
}
19-
}
23+
24+
protected function initConsumer($input)
25+
{
26+
parent::initConsumer($input);
27+
$this->consumer->setContext($input->getArgument('context'));
28+
}
29+
}

Provider/QueuesProviderInterface.php

+15
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,22 @@ interface QueuesProviderInterface
1212
/**
1313
* Return array of queues
1414
*
15+
* Example:
16+
* array(
17+
* 'queue_name' => array(
18+
* 'durable' => false,
19+
* 'exclusive' => false,
20+
* 'passive' => false,
21+
* 'nowait' => false,
22+
* 'auto_delete' => false,
23+
* 'routing_keys' => array('key.1', 'key.2'),
24+
* 'arguments' => array(),
25+
* 'ticket' => '',
26+
* 'callback' => array($callback, 'execute')
27+
* )
28+
* );
1529
* @return array
30+
*
1631
*/
1732
public function getQueues();
1833
}

RabbitMq/MultipleConsumer.php

+13-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ class MultipleConsumer extends Consumer
1616
* @var QueuesProviderInterface
1717
*/
1818
protected $queuesProvider = null;
19+
20+
/**
21+
* Context the consumer runs in
22+
*
23+
* @var string
24+
*/
25+
protected $context = null;
1926

2027
/**
2128
* QueuesProvider setter
@@ -39,6 +46,11 @@ public function setQueues(array $queues)
3946
{
4047
$this->queues = $queues;
4148
}
49+
50+
public function setContext($context)
51+
{
52+
$this->context = $context;
53+
}
4254

4355
protected function setupConsumer()
4456
{
@@ -102,7 +114,7 @@ protected function mergeQueues()
102114
if ($this->queuesProvider) {
103115
$this->queues = array_merge(
104116
$this->queues,
105-
$this->queuesProvider->getQueues()
117+
$this->queuesProvider->getQueues($this->context)
106118
);
107119
}
108120
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Tests\Command;
4+
5+
use OldSound\RabbitMqBundle\Command\MultipleConsumerCommand;
6+
7+
use Symfony\Component\Console\Input\InputOption;
8+
9+
class MultipleConsumerCommandTest extends BaseCommandTest{
10+
11+
protected function setUp()
12+
{
13+
parent::setUp();
14+
$this->definition->expects($this->any())
15+
->method('getOptions')
16+
->will($this->returnValue(array(
17+
new InputOption('--verbose', '-v', InputOption::VALUE_NONE, 'Increase verbosity of messages.'),
18+
new InputOption('--env', '-e', InputOption::VALUE_REQUIRED, 'The Environment name.', 'dev'),
19+
new InputOption('--no-debug', null, InputOption::VALUE_NONE, 'Switches off debug mode.'),
20+
)));
21+
$this->application->expects($this->once())
22+
->method('getHelperSet')
23+
->will($this->returnValue($this->helperSet));
24+
25+
$this->command = new MultipleConsumerCommand();
26+
$this->command->setApplication($this->application);
27+
}
28+
29+
/**
30+
* testInputsDefinitionCommand
31+
*/
32+
public function testInputsDefinitionCommand()
33+
{
34+
// check argument
35+
$definition = $this->command->getDefinition();
36+
$this->assertTrue($definition->hasArgument('name'));
37+
$this->assertTrue($definition->getArgument('name')->isRequired()); // Name is required to find the service
38+
39+
$this->assertTrue($definition->hasArgument('context'));
40+
$this->assertFalse($definition->getArgument('context')->isRequired()); // Context is required for the queue options provider
41+
42+
//check options
43+
$this->assertTrue($definition->hasOption('messages'));
44+
$this->assertTrue($definition->getOption('messages')->isValueOptional()); // It should accept value
45+
46+
$this->assertTrue($definition->hasOption('route'));
47+
$this->assertTrue($definition->getOption('route')->isValueOptional()); // It should accept value
48+
49+
$this->assertTrue($definition->hasOption('without-signals'));
50+
$this->assertFalse($definition->getOption('without-signals')->acceptValue()); // It shouldn't accept value because it is a true/false input
51+
52+
$this->assertTrue($definition->hasOption('debug'));
53+
$this->assertFalse($definition->getOption('debug')->acceptValue()); // It shouldn't accept value because it is a true/false input
54+
}
55+
}

Tests/RabbitMq/MultipleConsumerTest.php

+23
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,29 @@ public function testQueuesProvider($processFlag, $expectedMethod, $expectedReque
111111
$this->multipleConsumer->processQueueMessage('test-1', $amqpMessage);
112112
$this->multipleConsumer->processQueueMessage('test-2', $amqpMessage);
113113
}
114+
115+
public function testQueuesPrivider()
116+
{
117+
$amqpConnection = $this->prepareAMQPConnection();
118+
$amqpChannel = $this->prepareAMQPChannel();
119+
$this->multipleConsumer->setContext('foo');
120+
121+
$queuesProvider = $this->prepareQueuesProvider();
122+
$queuesProvider->expects($this->once())
123+
->method('getQueues')
124+
->will($this->returnValue(
125+
array(
126+
'queue_foo' => array()
127+
)
128+
));
129+
130+
$this->multipleConsumer->setQueuesProvider($queuesProvider);
131+
132+
$reflectionClass = new \ReflectionClass(get_class($this->multipleConsumer));
133+
$reflectionMethod = $reflectionClass->getMethod('mergeQueues');
134+
$reflectionMethod->setAccessible(true);
135+
$reflectionMethod->invoke($this->multipleConsumer);
136+
}
114137

115138
/**
116139
* Check queues provider works well with static queues together

0 commit comments

Comments
 (0)