Skip to content

Commit 23b734e

Browse files
committed
remote transport interface
1 parent d9c0885 commit 23b734e

16 files changed

+343
-69
lines changed

phpunit.xml.dist

+6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616
<testsuite name="quartz">
1717
<directory>pkg/quartz/Tests</directory>
1818
</testsuite>
19+
<testsuite name="bridge">
20+
<directory>pkg/bridge/Tests</directory>
21+
</testsuite>
22+
<testsuite name="app">
23+
<directory>pkg/app/tests</directory>
24+
</testsuite>
1925
</testsuites>
2026

2127
<filter>

pkg/app/.env.dist

+3
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,7 @@ RABBITMQ_PORT=5672
1111
RABBITMQ_USER=guest
1212
RABBITMQ_PASS=guest
1313
RABBITMQ_VHOST=/
14+
MONGODB_HOST=localhost
15+
MONGODB_PORT=27017
16+
MONGODB_DB=quartz
1417
###< symfony/framework-bundle ###
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
namespace Quartz\App\Tests\Command;
3+
4+
use PHPUnit\Framework\TestCase;
5+
use Quartz\App\Command\ManagementCommand;
6+
use Quartz\Scheduler\StdScheduler;
7+
use Quartz\Scheduler\Store\YadmStore;
8+
use Symfony\Component\Console\Command\Command;
9+
use Symfony\Component\Console\Helper\HelperSet;
10+
use Symfony\Component\Console\Helper\QuestionHelper;
11+
use Symfony\Component\Console\Tester\CommandTester;
12+
13+
class ManagementCommandTest extends TestCase
14+
{
15+
public function testShouldExtendSymfonyCommand()
16+
{
17+
$this->assertInstanceOf(Command::class, new ManagementCommand($this->createSchedulerMock()));
18+
}
19+
20+
public function testShouldClearAll()
21+
{
22+
$scheduler = $this->createSchedulerMock();
23+
$scheduler
24+
->expects($this->once())
25+
->method('clear')
26+
;
27+
28+
$command = new ManagementCommand($scheduler);
29+
$command->setHelperSet(new HelperSet(['question' => new QuestionHelper()]));
30+
31+
$tester = new CommandTester($command);
32+
$tester->setInputs(['y']);
33+
$tester->execute([
34+
'--clear-all' => true,
35+
]);
36+
37+
$this->assertContains('You are just about to delete all storage data. Are you sure', $tester->getDisplay());
38+
}
39+
40+
public function testShouldCreateStoreIndexes()
41+
{
42+
$store = $this->createStoreMock();
43+
$store
44+
->expects($this->once())
45+
->method('createIndexes')
46+
;
47+
48+
$scheduler = $this->createSchedulerMock();
49+
$scheduler
50+
->expects($this->once())
51+
->method('getStore')
52+
->willReturn($store)
53+
;
54+
55+
$command = new ManagementCommand($scheduler);
56+
57+
$tester = new CommandTester($command);
58+
$tester->execute([
59+
'--create-indexes' => true,
60+
]);
61+
62+
$this->assertContains('Creating storage indexes', $tester->getDisplay());
63+
}
64+
65+
/**
66+
* @return \PHPUnit_Framework_MockObject_MockObject|StdScheduler
67+
*/
68+
private function createSchedulerMock()
69+
{
70+
return $this->createMock(StdScheduler::class);
71+
}
72+
73+
/**
74+
* @return \PHPUnit_Framework_MockObject_MockObject|YadmStore
75+
*/
76+
private function createStoreMock()
77+
{
78+
return $this->createMock(YadmStore::class);
79+
}
80+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
<?php
2+
namespace Quartz\App\Tests\Command;
3+
4+
use PHPUnit\Framework\TestCase;
5+
use Quartz\App\Command\SchedulerCommand;
6+
use Quartz\Bridge\LoggerSubscriber;
7+
use Quartz\Bridge\SignalSubscriber;
8+
use Quartz\Scheduler\StdScheduler;
9+
use Symfony\Component\Console\Command\Command;
10+
use Symfony\Component\Console\Tester\CommandTester;
11+
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
12+
13+
class SchedulerCommandTest extends TestCase
14+
{
15+
public function testShouldExtendSymfonyCommand()
16+
{
17+
$this->assertInstanceOf(Command::class, new SchedulerCommand($this->createSchedulerMock()));
18+
}
19+
20+
public function testShouldStartScheduler()
21+
{
22+
$dispatcher = $this->createEventDispatcherMock();
23+
24+
$scheduler = $this->createSchedulerMock();
25+
$scheduler
26+
->expects($this->any())
27+
->method('getEventDispatcher')
28+
->willReturn($dispatcher)
29+
;
30+
$scheduler
31+
->expects($this->once())
32+
->method('start')
33+
;
34+
35+
$command = new SchedulerCommand($scheduler);
36+
37+
$tester = new CommandTester($command);
38+
$tester->execute([]);
39+
40+
$this->assertEmpty($tester->getDisplay());
41+
}
42+
43+
public function testShouldAddLoggerSubscriber()
44+
{
45+
$dispatcher = $this->createEventDispatcherMock();
46+
$dispatcher
47+
->expects($this->at(0))
48+
->method('addSubscriber')
49+
->with($this->isInstanceOf(LoggerSubscriber::class))
50+
;
51+
52+
$scheduler = $this->createSchedulerMock();
53+
$scheduler
54+
->expects($this->any())
55+
->method('getEventDispatcher')
56+
->willReturn($dispatcher)
57+
;
58+
59+
$command = new SchedulerCommand($scheduler);
60+
61+
$tester = new CommandTester($command);
62+
$tester->execute([]);
63+
64+
$this->assertEmpty($tester->getDisplay());
65+
}
66+
67+
public function testShouldAddSignalSubscriber()
68+
{
69+
$dispatcher = $this->createEventDispatcherMock();
70+
$dispatcher
71+
->expects($this->at(1))
72+
->method('addSubscriber')
73+
->with($this->isInstanceOf(SignalSubscriber::class))
74+
;
75+
76+
$scheduler = $this->createSchedulerMock();
77+
$scheduler
78+
->expects($this->any())
79+
->method('getEventDispatcher')
80+
->willReturn($dispatcher)
81+
;
82+
83+
$command = new SchedulerCommand($scheduler);
84+
85+
$tester = new CommandTester($command);
86+
$tester->execute([]);
87+
88+
$this->assertEmpty($tester->getDisplay());
89+
}
90+
91+
/**
92+
* @return \PHPUnit_Framework_MockObject_MockObject|StdScheduler
93+
*/
94+
private function createSchedulerMock()
95+
{
96+
return $this->createMock(StdScheduler::class);
97+
}
98+
99+
/**
100+
* @return \PHPUnit_Framework_MockObject_MockObject|EventDispatcherInterface
101+
*/
102+
private function createEventDispatcherMock()
103+
{
104+
return $this->createMock(EventDispatcherInterface::class);
105+
}
106+
}

pkg/bridge/DI/QuartzExtension.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
<?php
22
namespace Quartz\Bridge\DI;
33

4+
use Quartz\Bridge\Enqueue\EnqueueRemoteTransportProcessor;
45
use Quartz\Bridge\Enqueue\EnqueueResponseJob;
56
use Quartz\Bridge\Scheduler\EnqueueJobRunShell;
67
use Quartz\Bridge\Scheduler\JobRunShellProcessor;
7-
use Quartz\Bridge\Scheduler\RemoteSchedulerProcessor;
88
use Quartz\Bridge\Scheduler\RpcProtocol;
99
use Quartz\Core\SimpleJobFactory;
1010
use Quartz\Scheduler\StdJobRunShell;
@@ -98,7 +98,7 @@ public function load(array $configs, ContainerBuilder $container)
9898
->setPublic(false)
9999
;
100100

101-
$container->register($this->format('remote_scheduler_processor'), RemoteSchedulerProcessor::class)
101+
$container->register($this->format('remote_transport_processor'), EnqueueRemoteTransportProcessor::class)
102102
->setArguments([
103103
new Reference($this->format('scheduler')),
104104
new Reference($this->format('rpc_protocol'))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
3+
namespace Quartz\Bridge\Enqueue;
4+
5+
use Enqueue\Client\ProducerV2Interface;
6+
use Enqueue\Util\JSON;
7+
use Quartz\Bridge\Scheduler\RemoteTransport;
8+
9+
class EnqueueRemoteTransport implements RemoteTransport
10+
{
11+
const COMMAND = 'quartz.rpc';
12+
13+
/**
14+
* @var ProducerV2Interface
15+
*/
16+
private $producer;
17+
18+
/**
19+
* @param ProducerV2Interface $producer
20+
*/
21+
public function __construct(ProducerV2Interface $producer)
22+
{
23+
$this->producer = $producer;
24+
}
25+
26+
/**
27+
* {@inheritdoc}
28+
*/
29+
public function request(array $parameters)
30+
{
31+
$responseMessage = $this->producer->sendCommand(self::COMMAND, $parameters, true)->receive();
32+
33+
return JSON::decode($responseMessage->getBody());
34+
}
35+
}

pkg/bridge/Scheduler/RemoteSchedulerProcessor.php renamed to pkg/bridge/Enqueue/EnqueueRemoteTransportProcessor.php

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
<?php
2-
namespace Quartz\Bridge\Scheduler;
2+
namespace Quartz\Bridge\Enqueue;
33

44
use Enqueue\Client\CommandSubscriberInterface;
55
use Enqueue\Consumption\Result;
66
use Enqueue\Psr\PsrContext;
77
use Enqueue\Psr\PsrMessage;
88
use Enqueue\Psr\PsrProcessor;
99
use Enqueue\Util\JSON;
10+
use Quartz\Bridge\Scheduler\RpcProtocol;
1011
use Quartz\Core\Scheduler;
1112

12-
class RemoteSchedulerProcessor implements PsrProcessor, CommandSubscriberInterface
13+
class EnqueueRemoteTransportProcessor implements PsrProcessor, CommandSubscriberInterface
1314
{
1415
/**
1516
* @var Scheduler
@@ -53,6 +54,6 @@ public function process(PsrMessage $message, PsrContext $context)
5354
*/
5455
public static function getSubscribedCommand()
5556
{
56-
return RemoteScheduler::COMMAND;
57+
return EnqueueRemoteTransport::COMMAND;
5758
}
5859
}

pkg/bridge/Scheduler/RemoteScheduler.php

+8-12
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
<?php
22
namespace Quartz\Bridge\Scheduler;
33

4-
use Enqueue\Client\ProducerV2Interface;
5-
use Enqueue\Util\JSON;
64
use Quartz\Core\Calendar;
75
use Quartz\Core\JobDetail;
86
use Quartz\Core\Key;
@@ -11,25 +9,23 @@
119

1210
class RemoteScheduler implements Scheduler
1311
{
14-
const COMMAND = 'quartz.rpc';
15-
1612
/**
17-
* @var ProducerV2Interface
13+
* @var RemoteTransport
1814
*/
19-
private $producer;
15+
private $transport;
2016

2117
/**
2218
* @var RpcProtocol
2319
*/
2420
private $rpcProtocol;
2521

2622
/**
27-
* @param ProducerV2Interface $producer
28-
* @param RpcProtocol $rpcProtocol
23+
* @param RemoteTransport $transport
24+
* @param RpcProtocol $rpcProtocol
2925
*/
30-
public function __construct(ProducerV2Interface $producer, RpcProtocol $rpcProtocol)
26+
public function __construct(RemoteTransport $transport, RpcProtocol $rpcProtocol)
3127
{
32-
$this->producer = $producer;
28+
$this->transport = $transport;
3329
$this->rpcProtocol = $rpcProtocol;
3430
}
3531

@@ -45,9 +41,9 @@ private function call($method, array $args)
4541
{
4642
$request = $this->rpcProtocol->encodeRequest($method, $args);
4743

48-
$responseMessage = $this->producer->sendCommand(self::COMMAND, $request, true)->receive();
44+
$response = $this->transport->request($request);
4945

50-
$response = $this->rpcProtocol->decodeValue(JSON::decode($responseMessage->getBody()));
46+
$response = $this->rpcProtocol->decodeValue($response);
5147

5248
if ($response instanceof \Exception) {
5349
throw $response;
+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?php
2+
namespace Quartz\Bridge\Scheduler;
3+
4+
5+
interface RemoteTransport
6+
{
7+
/**
8+
* @param array $parameters
9+
*
10+
* @return mixed
11+
*/
12+
public function request(array $parameters);
13+
}

pkg/bridge/Scheduler/SchedulerFactory.php

+7-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
namespace Quartz\Bridge\Scheduler;
33

44
use Enqueue\SimpleClient\SimpleClient;
5+
use Quartz\Bridge\Enqueue\EnqueueRemoteTransport;
6+
use Quartz\Bridge\Enqueue\EnqueueRemoteTransportProcessor;
57
use Quartz\Bridge\Enqueue\EnqueueResponseJob;
68
use Quartz\Core\Scheduler;
79
use Quartz\Core\SchedulerFactory as BaseSchedulerFactory;
@@ -75,7 +77,9 @@ public function getScheduler()
7577

7678
public function getRemoteScheduler()
7779
{
78-
return new RemoteScheduler($this->getEnqueue()->getProducerV2(), new RpcProtocol());
80+
$transport = new EnqueueRemoteTransport($this->getEnqueue()->getProducerV2());
81+
82+
return new RemoteScheduler($transport, new RpcProtocol());
7983
}
8084

8185
public function getJobRunShellFactory()
@@ -116,11 +120,11 @@ public function getJobRunShellProcessor()
116120
}
117121

118122
/**
119-
* @return RemoteSchedulerProcessor
123+
* @return EnqueueRemoteTransportProcessor
120124
*/
121125
public function getRemoteSchedulerProcessor()
122126
{
123-
return new RemoteSchedulerProcessor($this->getScheduler(), new RpcProtocol());
127+
return new EnqueueRemoteTransportProcessor($this->getScheduler(), new RpcProtocol());
124128
}
125129

126130
/**

0 commit comments

Comments
 (0)