diff --git a/composer.json b/composer.json index 6e52ced..ebabda3 100644 --- a/composer.json +++ b/composer.json @@ -6,15 +6,13 @@ "license": "MIT", "require": { "php": ">=7.1", - "illuminate/queue": "^6.0|^7.0|^8.0", + "illuminate/queue": "^9.0", "queue-interop/amqp-interop": "^0.8", "queue-interop/queue-interop": "^0.7|^0.8", - "enqueue/enqueue": "^0.10", - "enqueue/dsn": "^0.10" + "enqueue/enqueue-dev": "dev-master" }, "require-dev": { "phpunit/phpunit": "~5.5", - "enqueue/enqueue": "^0.10", "enqueue/null": "^0.10@dev", "enqueue/test": "^0.10@dev", "enqueue/simple-client": "^0.10@dev" @@ -25,6 +23,12 @@ "/Tests/" ] }, + "repositories": [ + { + "type": "vcs", + "url": "https://github.com/weconnectdata/enqueue-dev" + } + ], "suggest": { "enqueue/simple-client": "If you want to use enqueue client and cli commands" }, diff --git a/src/Job.php b/src/Job.php index c148953..1e87547 100644 --- a/src/Job.php +++ b/src/Job.php @@ -9,6 +9,7 @@ use Interop\Queue\Context; use Interop\Queue\Exception\DeliveryDelayNotSupportedException; use Interop\Queue\Message; +use ReflectionClass; class Job extends BaseJob implements JobContract { @@ -51,6 +52,42 @@ public function delete() $this->consumer->acknowledge($this->message); } + /** + * {@inheritdoc} + */ + public function fire() + { + $handlerClass = config('queue.connections.' . $this->getConnectionName() . '.handler'); + $timeout = config('queue.connections.' . $this->getConnectionName() . '.timeout'); + + if (! empty($handlerClass)) { + return (new $handlerClass($this->consumer->receive($timeout)))->handle(); + } else { + return parent::fire(); + } + } + + /** + * In case the payload is not a serialised PHP message, provide a default fall back + * + * @return array + * @throws \ReflectionException + */ + public function payload() + { + if (empty(parent::payload())) { + $handlerClass = config('queue.connections.' . $this->getConnectionName() . '.handler'); + + if (! empty($handlerClass)) { + return [ + 'job' => $handlerClass + ]; + } + } + + return parent::payload(); + } + /** * {@inheritdoc} */ @@ -60,7 +97,7 @@ public function release($delay = 0) $requeueMessage = clone $this->message; $requeueMessage->setProperty('x-attempts', $this->attempts() + 1); - + $producer = $this->context->createProducer(); try { diff --git a/src/Worker.php b/src/Worker.php index 55b6210..a4eb99d 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -144,7 +144,7 @@ public function onPostMessageReceived(PostMessageReceived $context): void } } - public function stop($status = 0) + public function stop($status = 0, $options = null) { if ($this->interop) { $this->stopped = true; @@ -152,7 +152,7 @@ public function stop($status = 0) return; } - parent::stop($status); + parent::stop($status, $options); } }