diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a17202af5..fa6c79ccf4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ Yii2 Queue Extension Change Log ## 2.0.1 under development +- Enh #102: Stoppable behavior (zhuravljov) - Enh: Rename cli\Verbose behavior to cli\VerboseBehavior (zhuravljov) - Enh #147: Igbinary job serializer (xutl) - Enh: Rename serializers\Serializer interface to serializers\SerializerInterface (zhuravljov) diff --git a/src/stoppable/BaseBehavior.php b/src/stoppable/BaseBehavior.php new file mode 100644 index 0000000000..e0f10da87a --- /dev/null +++ b/src/stoppable/BaseBehavior.php @@ -0,0 +1,81 @@ + + * @since 2.0.1 + */ +abstract class BaseBehavior extends \yii\base\Behavior +{ + /** + * @var bool option allows to turn status checking off in case a driver does not support it. + */ + public $checkWaiting = true; + /** + * @var Queue + * @inheritdoc + */ + public $owner; + + /** + * @inheritdoc + */ + public function events() + { + return [ + Queue::EVENT_BEFORE_EXEC => 'beforeExec', + ]; + } + + /** + * @param ExecEvent $event + */ + public function beforeExec(ExecEvent $event) + { + $event->handled = $this->isStopped($event->id); + } + + /** + * Sets stop flag. + * + * @param string $id of a job + * @return bool + */ + public function stop($id) + { + if (!$this->checkWaiting || $this->owner->isWaiting($id)) { + $this->markAsStopped($id); + return true; + } + + return false; + } + + /** + * @param string $id of a job + * @return bool + */ + abstract protected function markAsStopped($id); + + /** + * @param string $id of a job + * @return bool + */ + abstract protected function isStopped($id); +} diff --git a/src/stoppable/Behavior.php b/src/stoppable/Behavior.php new file mode 100644 index 0000000000..049001a010 --- /dev/null +++ b/src/stoppable/Behavior.php @@ -0,0 +1,56 @@ + + * @since 2.0.1 + */ +class Behavior extends BaseBehavior +{ + /** + * @var Cache|array|string the cache instance used to store stopped status. + */ + public $cache = 'cache'; + + /** + * @inheritdoc + */ + public function init() + { + parent::init(); + $this->cache = Instance::ensure($this->cache, Cache::class); + } + /** + * @param string $id of a job + * @return bool + */ + protected function markAsStopped($id) + { + $this->cache->set(__CLASS__ . $id, true); + } + + /** + * @param string $id of a job + * @return bool + */ + protected function isStopped($id) + { + if ($this->cache->exists(__CLASS__ . $id)) { + $this->cache->delete(__CLASS__ . $id); + return true; + } + + return false; + } +} diff --git a/tests/stoppable/StoppableTest.php b/tests/stoppable/StoppableTest.php new file mode 100644 index 0000000000..f9841424ac --- /dev/null +++ b/tests/stoppable/StoppableTest.php @@ -0,0 +1,70 @@ + + */ +class StoppableTest extends TestCase +{ + public function testStop() + { + $job = new SimpleJob(['uid' => uniqid()]); + $id = $this->getQueue()->push($job); + $this->getQueue()->stop($id); + $this->getQueue()->run(); + $this->assertFileNotExists($job->getFileName()); + } + + public function testNotStop() + { + $job = new SimpleJob(['uid' => uniqid()]); + $this->getQueue()->push($job); + $this->getQueue()->run(); + $this->assertFileExists($job->getFileName()); + } + + /** + * @return SyncQueue|Stoppable + */ + protected function getQueue() + { + if (!$this->_queue) { + $this->_queue = new SyncQueue([ + 'handle' => false, + 'as stoppable' => [ + 'class' => Stoppable::class, + 'cache' => ArrayCache::class, + ], + ]); + } + return $this->_queue; + } + + private $_queue; + + /** + * @inheritdoc + */ + protected function tearDown() + { + foreach (glob(Yii::getAlias("@runtime/job-*.lock")) as $fileName) { + unlink($fileName); + } + parent::tearDown(); + } +} \ No newline at end of file