Skip to content

Commit 06fee4c

Browse files
committed
project
1 parent b7c91bd commit 06fee4c

18 files changed

+1436
-0
lines changed

CheckMasterProcessSubscriber.php

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
namespace Quartz\Bridge;
3+
4+
use Quartz\Core\SchedulerException;
5+
use Quartz\Events\Event;
6+
use Quartz\Events\TickEvent;
7+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
8+
9+
class CheckMasterProcessSubscriber implements EventSubscriberInterface
10+
{
11+
/**
12+
* @param TickEvent $event
13+
*
14+
* @throws SchedulerException
15+
*/
16+
public function checkMasterProcessor(TickEvent $event)
17+
{
18+
if (false == $mPid = getenv('MASTER_PROCESS_PID')) {
19+
throw new SchedulerException('The extension rely on MASTER_PROCESS_PID env var set but it is not set.');
20+
}
21+
22+
if (false == posix_kill($mPid, 0)) {
23+
$event->setInterrupted(true);
24+
}
25+
}
26+
27+
/**
28+
* {@inheritdoc}
29+
*/
30+
public static function getSubscribedEvents()
31+
{
32+
return [
33+
Event::SCHEDULER_TICK => 'checkMasterProcessor',
34+
];
35+
}
36+
}

Console/ManagementCommand.php

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
namespace Quartz\App\Console;
3+
4+
use Quartz\App\SchedulerFactory;
5+
use Symfony\Component\Console\Command\Command;
6+
use Symfony\Component\Console\Input\InputInterface;
7+
use Symfony\Component\Console\Input\InputOption;
8+
use Symfony\Component\Console\Output\OutputInterface;
9+
use Symfony\Component\Console\Question\ConfirmationQuestion;
10+
11+
class ManagementCommand extends Command
12+
{
13+
/**
14+
* @var SchedulerFactory
15+
*/
16+
private $factory;
17+
18+
/**
19+
* {@inheritdoc}
20+
*/
21+
public function __construct(SchedulerFactory $factory)
22+
{
23+
parent::__construct('manage');
24+
25+
$this->factory = $factory;
26+
}
27+
28+
/**
29+
* {@inheritdoc}
30+
*/
31+
protected function configure()
32+
{
33+
$this
34+
->addOption('clear-all', null, InputOption::VALUE_NONE, 'Clears (deletes!) all scheduling data - all Jobs, Triggers, Calendars.')
35+
->addOption('create-indexes', null, InputOption::VALUE_NONE, 'Creates all required storage indexes')
36+
->addOption('create-queues', null, InputOption::VALUE_NONE, 'Creates all required queues')
37+
;
38+
}
39+
40+
/**
41+
* {@inheritdoc}
42+
*/
43+
protected function execute(InputInterface $input, OutputInterface $output)
44+
{
45+
$scheduler = $this->factory->getScheduler();
46+
47+
if ($input->getOption('clear-all')) {
48+
$helper = $this->getHelper('question');
49+
$question = new ConfirmationQuestion('You are just about to delete all storage data. Are you sure? ', false, '/^(y|j)/i');
50+
51+
if ($helper->ask($input, $output, $question)) {
52+
$scheduler->clear();
53+
}
54+
}
55+
56+
if ($input->getOption('create-indexes')) {
57+
$output->writeln('Creating storage indexes');
58+
$scheduler->getStore()->createIndexes(); // TODO: is not part of interface :(
59+
}
60+
61+
if ($input->getOption('create-queues')) {
62+
$output->writeln('Creating enqueue queues');
63+
$this->factory->getEnqueue()->setupBroker();
64+
}
65+
}
66+
}

Console/SchedulerCommand.php

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
namespace Quartz\App\Console;
3+
4+
use Quartz\App\LoggerSubscriber;
5+
use Quartz\App\SchedulerFactory;
6+
use Quartz\App\SignalSubscriber;
7+
use Symfony\Component\Console\Command\Command;
8+
use Symfony\Component\Console\Input\InputInterface;
9+
use Symfony\Component\Console\Input\InputOption;
10+
use Symfony\Component\Console\Logger\ConsoleLogger;
11+
use Symfony\Component\Console\Output\OutputInterface;
12+
13+
class SchedulerCommand extends Command
14+
{
15+
/**
16+
* SchedulerFactory
17+
*/
18+
private $factory;
19+
20+
/**
21+
* @param SchedulerFactory $factory
22+
*/
23+
public function __construct(SchedulerFactory $factory)
24+
{
25+
parent::__construct('scheduler');
26+
27+
$this->factory = $factory;
28+
}
29+
30+
protected function configure()
31+
{
32+
$this->addOption('setup', null, InputOption::VALUE_NONE);
33+
}
34+
35+
/**
36+
* {@inheritdoc}
37+
*/
38+
protected function execute(InputInterface $input, OutputInterface $output)
39+
{
40+
if ($input->getOption('setup')) {
41+
$this->factory->getEnqueue()->setupBroker();
42+
// TODO: create store index
43+
}
44+
45+
$scheduler = $this->factory->getScheduler();
46+
$logger = new LoggerSubscriber(new ConsoleLogger($output));
47+
$scheduler->getEventDispatcher()->addSubscriber($logger);
48+
$scheduler->getEventDispatcher()->addSubscriber(new SignalSubscriber());
49+
50+
$scheduler->start();
51+
}
52+
}

Console/WorkerCommand.php

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?php
2+
namespace Quartz\App\Console;
3+
4+
use Enqueue\Consumption\ChainExtension;
5+
use Enqueue\Consumption\Extension\LoggerExtension;
6+
use Enqueue\Consumption\Extension\ReplyExtension;
7+
use Quartz\App\LoggerSubscriber;
8+
use Quartz\App\RemoteScheduler;
9+
use Quartz\App\SchedulerFactory;
10+
use Quartz\App\Async\EnqueueJobRunShell;
11+
use Symfony\Component\Console\Command\Command;
12+
use Symfony\Component\Console\Input\InputInterface;
13+
use Symfony\Component\Console\Logger\ConsoleLogger;
14+
use Symfony\Component\Console\Output\OutputInterface;
15+
16+
class WorkerCommand extends Command
17+
{
18+
/**
19+
* SchedulerFactory
20+
*/
21+
private $factory;
22+
23+
/**
24+
* @param SchedulerFactory $scheduler
25+
*/
26+
public function __construct(SchedulerFactory $scheduler)
27+
{
28+
parent::__construct('worker');
29+
30+
$this->factory = $scheduler;
31+
}
32+
33+
/**
34+
* {@inheritdoc}
35+
*/
36+
protected function execute(InputInterface $input, OutputInterface $output)
37+
{
38+
$enqueue = $this->factory->getEnqueue();
39+
40+
$scheduler = $this->factory->getScheduler();
41+
$logger = new LoggerSubscriber(new ConsoleLogger($output));
42+
$scheduler->getEventDispatcher()->addSubscriber($logger);
43+
44+
$jobRunShell = $this->factory->getJobRunShellProcessor();
45+
$enqueue->bind(EnqueueJobRunShell::COMMAND, EnqueueJobRunShell::COMMAND, function($message, $context) use ($jobRunShell) {
46+
return $jobRunShell->process($message, $context);
47+
});
48+
49+
$remoteScheduler = $this->factory->getRemoteSchedulerProcessor();
50+
$enqueue->bind(RemoteScheduler::COMMAND, RemoteScheduler::COMMAND, function($message, $context) use ($remoteScheduler) {
51+
return $remoteScheduler->process($message, $context);
52+
});
53+
54+
$extensions = new ChainExtension([new ReplyExtension(), new LoggerExtension(new ConsoleLogger($output))]);
55+
56+
$enqueue->consume($extensions);
57+
}
58+
}

DI/QuartzConfiguration.php

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
namespace Quartz\Bridge\DI;
4+
5+
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
6+
use Symfony\Component\Config\Definition\ConfigurationInterface;
7+
8+
class QuartzConfiguration implements ConfigurationInterface
9+
{
10+
/**
11+
* {@inheritdoc}
12+
*/
13+
public function getConfigTreeBuilder()
14+
{
15+
$tb = new TreeBuilder();
16+
$rootNode = $tb->root('quartz');
17+
18+
$rootNode->children()
19+
->arrayNode('store')->addDefaultsIfNotSet()->children()
20+
->scalarNode('uri')->defaultValue('mongodb://localhost:27017')->end()
21+
->end()->end()
22+
->integerNode('misfire_threshold')->min(10)->defaultValue(60)->end()
23+
;
24+
25+
return $tb;
26+
}
27+
}

DI/QuartzExtension.php

+119
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
<?php
2+
namespace Quartz\Bridge\DI;
3+
4+
use Quartz\Bridge\Enqueue\EnqueueResponseJob;
5+
use Quartz\Bridge\Scheduler\EnqueueJobRunShell;
6+
use Quartz\Bridge\Scheduler\JobRunShellProcessor;
7+
use Quartz\Bridge\Scheduler\RemoteSchedulerProcessor;
8+
use Quartz\Bridge\Scheduler\RpcProtocol;
9+
use Quartz\Core\SimpleJobFactory;
10+
use Quartz\Scheduler\StdJobRunShell;
11+
use Quartz\Scheduler\StdJobRunShellFactory;
12+
use Quartz\Scheduler\StdScheduler;
13+
use Quartz\Scheduler\Store\YadmStore;
14+
use Quartz\Scheduler\Store\YadmStoreResource;
15+
use Symfony\Component\DependencyInjection\ContainerBuilder;
16+
use Symfony\Component\DependencyInjection\Extension\Extension;
17+
use Symfony\Component\DependencyInjection\Reference;
18+
19+
class QuartzExtension extends Extension
20+
{
21+
/**
22+
* @var string
23+
*/
24+
private $alias;
25+
26+
/**
27+
* @param string $alias
28+
*/
29+
public function __construct($alias = 'quartz')
30+
{
31+
$this->alias = $alias;
32+
}
33+
34+
/**
35+
* {@inheritdoc}
36+
*/
37+
public function getAlias()
38+
{
39+
return $this->alias;
40+
}
41+
42+
/**
43+
* {@inheritdoc}
44+
*/
45+
public function load(array $configs, ContainerBuilder $container)
46+
{
47+
$config = $this->processConfiguration(new QuartzConfiguration(), $configs);
48+
49+
$container->register($this->format('store_resource'), YadmStoreResource::class)
50+
->setArguments([$config['store']])
51+
;
52+
53+
$container->register($this->format('store'), YadmStore::class)
54+
->setArguments([new Reference($this->format('store_resource'))])
55+
->addMethodCall('setMisfireThreshold', [$config['misfire_threshold']])
56+
;
57+
58+
$container->register($this->format('enqueue_job_run_shell'), EnqueueJobRunShell::class)
59+
->setArguments([new Reference('enqueue.client.producer_v2')])
60+
;
61+
62+
$container->register($this->format('job_run_shell_factory'), StdJobRunShellFactory::class)
63+
->setArguments([new Reference($this->format('enqueue_job_run_shell'))])
64+
;
65+
66+
$container->register($this->format('job_factory'), SimpleJobFactory::class)
67+
->setArguments([[]])
68+
;
69+
70+
// TODO: add config option where can enable/disable this job
71+
$container->register($this->format('job.enqueue_response'), EnqueueResponseJob::class)
72+
->setArguments([new Reference('enqueue.client.producer_v2')])
73+
->addTag($this->format('job'), ['alias' => 'enqueue_response'])
74+
;
75+
76+
$container->register($this->format('scheduler'), StdScheduler::class)
77+
->setArguments([
78+
new Reference($this->format('store')),
79+
new Reference($this->format('job_run_shell_factory')),
80+
new Reference($this->format('job_factory')),
81+
new Reference($this->format('event_dispatcher'))
82+
])
83+
;
84+
85+
$container->register($this->format('std_job_run_shell'), StdJobRunShell::class)
86+
->addMethodCall('initialize', [new Reference($this->format('scheduler'))])
87+
;
88+
89+
$container->register($this->format('job_run_shell_processor'), JobRunShellProcessor::class)
90+
->setArguments([
91+
new Reference($this->format('store')),
92+
new Reference($this->format('std_job_run_shell'))
93+
])
94+
->addTag('enqueue.client.processor')
95+
;
96+
97+
$container->register($this->format('rpc_protocol'), RpcProtocol::class)
98+
->setPublic(false)
99+
;
100+
101+
$container->register($this->format('remote_scheduler_processor'), RemoteSchedulerProcessor::class)
102+
->setArguments([
103+
new Reference($this->format('scheduler')),
104+
new Reference($this->format('rpc_protocol'))
105+
])
106+
->addTag('enqueue.client.processor')
107+
;
108+
}
109+
110+
/**
111+
* @param string $service
112+
*
113+
* @return string
114+
*/
115+
private function format($service)
116+
{
117+
return $this->alias.'.'.$service;
118+
}
119+
}

0 commit comments

Comments
 (0)