diff --git a/docs/en/index.rst b/docs/en/index.rst index 3de9203..9ca35e5 100644 --- a/docs/en/index.rst +++ b/docs/en/index.rst @@ -54,6 +54,10 @@ The following configuration should be present in the config array of your **conf // The name of an event listener class to associate with the worker 'listener' => \App\Listener\WorkerListener::class, + // (optional) The processor class to use for processing messages. + // Must implement Interop\Queue\Processor. Defaults to Cake\Queue\Queue\Processor + 'processor' => \App\Queue\CustomProcessor::class, + // The amount of time in milliseconds to sleep if no jobs are currently available. default: 10000 'receiveTimeout' => 10000, @@ -119,14 +123,14 @@ A simple job that logs received messages would look like:: /** * The maximum number of times the job may be attempted. (optional property) - * + * * @var int|null */ public static $maxAttempts = 3; /** * Whether there should be only one instance of a job on the queue at a time. (optional property) - * + * * @var bool */ public static $shouldBeUnique = false; @@ -299,7 +303,7 @@ queue jobs, you can use the ``QueueTransport``. In your application's return [ // ... other configuration - 'EmailTransport' => [ + 'EmailTransport' => [ 'default' => [ 'className' => MailTransport::class, // Configuration for MailTransport. @@ -323,6 +327,153 @@ With this configuration in place, any time you send an email with the ``default` email profile CakePHP will generate a queue message. Once that queue message is processed the default ``MailTransport`` will be used to deliver the email messages. +Custom Processors +================ + +You can customize how messages are processed by specifying a custom processor class +in your queue configuration. Custom processors must implement the ``Interop\Queue\Processor`` +interface. + +Example custom processor that extends the main Processor:: + + dispatchEvent('Processor.message.seen', ['queueMessage' => $message]); + + $jobMessage = new Message($message, $context, $this->container); + try { + $jobMessage->getCallable(); + } catch (RuntimeException | Error $e) { + $this->logger->debug('Invalid callable for message. Rejecting message from queue.'); + $this->dispatchEvent('Processor.message.invalid', ['message' => $jobMessage]); + + return InteropProcessor::REJECT; + } + + $startTime = microtime(true) * 1000; + $this->dispatchEvent('Processor.message.start', ['message' => $jobMessage]); + + try { + $response = $this->processMessage($jobMessage); + } catch (Throwable $e) { + $message->setProperty('jobException', $e); + + $this->logger->debug(sprintf('Message encountered exception: %s', $e->getMessage())); + $this->dispatchEvent('Processor.message.exception', [ + 'message' => $jobMessage, + 'exception' => $e, + 'duration' => (int)((microtime(true) * 1000) - $startTime), + ]); + + return Result::requeue('Exception occurred while processing message'); + } + + $duration = (int)((microtime(true) * 1000) - $startTime); + + if ($response === InteropProcessor::ACK) { + $this->logger->debug('Message processed successfully'); + $this->dispatchEvent('Processor.message.success', [ + 'message' => $jobMessage, + 'duration' => $duration, + ]); + + return InteropProcessor::ACK; + } + + if ($response === InteropProcessor::REJECT) { + $this->logger->debug('Message processed with rejection'); + $this->dispatchEvent('Processor.message.reject', [ + 'message' => $jobMessage, + 'duration' => $duration, + ]); + + return InteropProcessor::REJECT; + } + + $this->logger->debug('Message processed with failure, requeuing'); + $this->dispatchEvent('Processor.message.failure', [ + 'message' => $jobMessage, + 'duration' => $duration, + ]); + + return InteropProcessor::REQUEUE; + } + } + +Configuration example:: + + 'Queue' => [ + 'default' => [ + 'url' => 'redis://localhost:6379', + 'queue' => 'default', + // No processor specified - uses default Processor class + ], + 'timed' => [ + 'url' => 'redis://localhost:6379', + 'queue' => 'timed', + 'processor' => \App\Queue\TimedProcessor::class, // Custom processor with timing + ], + ], + +**Note**: If no processor is specified in the configuration, the default +``Cake\Queue\Queue\Processor`` class will be used. Custom processors are useful +for adding custom logging, metrics collection, or specialized message handling. + +**Important**: The `--processor` command line option is different from the `processor` configuration option: + +- **Configuration `processor`**: Specifies the processor class to use for processing messages +- **Command line `--processor`**: Specifies the processor name for Enqueue topic binding (used in `bindTopic()`) + +Example usage:: + + # Use custom processor class from config + bin/cake queue worker --config=timed + + # Use custom processor class AND specify topic binding name + bin/cake queue worker --config=timed --processor=my-topic-processor + Run the worker ============== @@ -336,7 +487,7 @@ This shell can take a few different options: - ``--config`` (default: default): Name of a queue config to use - ``--queue`` (default: default): Name of queue to bind to -- ``--processor`` (default: ``null``): Name of processor to bind to +- ``--processor`` (default: ``null``): Name of processor to bind to (for Enqueue topic binding, not the processor class) - ``--logger`` (default: ``stdout``): Name of a configured logger - ``--max-jobs`` (default: ``null``): Maximum number of jobs to process. Worker will exit after limit is reached. - ``--max-runtime`` (default: ``null``): Maximum number of seconds to run. Worker will exit after limit is reached. @@ -370,7 +521,7 @@ Requeue Failed Jobs Push jobs back onto the queue and remove them from the ``queue_failed_jobs`` table. If a job fails to requeue it is not guaranteed that the job was not run. - + .. code-block:: bash bin/cake queue requeue diff --git a/src/Command/WorkerCommand.php b/src/Command/WorkerCommand.php index 6541f19..e755b34 100644 --- a/src/Command/WorkerCommand.php +++ b/src/Command/WorkerCommand.php @@ -34,6 +34,7 @@ use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension; use Enqueue\Consumption\Extension\LoggerExtension; use Enqueue\Consumption\ExtensionInterface; +use Interop\Queue\Processor as InteropProcessor; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -170,6 +171,34 @@ protected function getLogger(Arguments $args): LoggerInterface return $logger ?? new NullLogger(); } + /** + * Creates and returns a Processor object + * + * @param \Cake\Console\Arguments $args Arguments + * @param \Cake\Console\ConsoleIo $io ConsoleIo + * @param \Psr\Log\LoggerInterface $logger Logger instance + * @return \Interop\Queue\Processor + */ + protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface $logger): InteropProcessor + { + $configKey = (string)$args->getOption('config'); + $config = QueueManager::getConfig($configKey); + + $processorClass = $config['processor'] ?? Processor::class; + + if (!class_exists($processorClass)) { + $io->error(sprintf(sprintf('Processor class %s not found', $processorClass))); + $this->abort(); + } + + if (!is_subclass_of($processorClass, InteropProcessor::class)) { + $io->error(sprintf(sprintf('Processor class %s must implement Interop\Queue\Processor', $processorClass))); + $this->abort(); + } + + return new $processorClass($logger, $this->container); + } + /** * @param \Cake\Console\Arguments $args Arguments * @param \Cake\Console\ConsoleIo $io ConsoleIo @@ -184,7 +213,7 @@ public function execute(Arguments $args, ConsoleIo $io): int } $logger = $this->getLogger($args); - $processor = new Processor($logger, $this->container); + $processor = $this->getProcessor($args, $io, $logger); $extension = $this->getQueueExtension($args, $logger); $hasListener = Configure::check(sprintf('Queue.%s.listener', $config)); @@ -197,7 +226,10 @@ public function execute(Arguments $args, ConsoleIo $io): int /** @var \Cake\Event\EventListenerInterface $listener */ $listener = new $listenerClassName(); - $processor->getEventManager()->on($listener); + + if ($processor instanceof Processor) { + $processor->getEventManager()->on($listener); + } } $client = QueueManager::engine($config); $queue = $args->getOption('queue') diff --git a/tests/TestCase/Command/WorkerCommandTest.php b/tests/TestCase/Command/WorkerCommandTest.php index 38f458a..5eb1623 100644 --- a/tests/TestCase/Command/WorkerCommandTest.php +++ b/tests/TestCase/Command/WorkerCommandTest.php @@ -21,6 +21,7 @@ use Cake\Log\Log; use Cake\Queue\QueueManager; use Cake\Queue\Test\test_app\src\Job\LogToDebugWithServiceJob; +use Cake\Queue\Test\test_app\src\Queue\TestCustomProcessor; use Cake\Queue\Test\TestCase\DebugLogTrait; use Cake\TestSuite\TestCase; use TestApp\Job\LogToDebugJob; @@ -320,4 +321,134 @@ public function testQueueProcessesWithUniqueCacheConfigured() $this->assertDebugLogContains('Consumption has started'); } + + /** + * Test that queue uses default processor when no processor is specified. + * + * @runInSeparateProcess + */ + public function testQueueUsesDefaultProcessor() + { + $config = [ + 'queue' => 'default', + 'url' => 'file:///' . TMP . DS . 'queue', + 'receiveTimeout' => 100, + ]; + Configure::write('Queue', ['default' => $config]); + + Log::setConfig('debug', [ + 'className' => 'Array', + 'levels' => ['notice', 'info', 'debug'], + ]); + + QueueManager::setConfig('default', $config); + QueueManager::push(LogToDebugJob::class); + QueueManager::drop('default'); + + $this->exec('queue worker --max-jobs=1 --logger=debug --verbose'); + + $this->assertDebugLogContains('Debug job was run'); + } + + /** + * Test that queue uses custom processor when specified in configuration. + * + * @runInSeparateProcess + */ + public function testQueueUsesCustomProcessor() + { + $config = [ + 'queue' => 'default', + 'url' => 'file:///' . TMP . DS . 'queue', + 'receiveTimeout' => 100, + 'processor' => TestCustomProcessor::class, + ]; + Configure::write('Queue', ['default' => $config]); + + Log::setConfig('debug', [ + 'className' => 'Array', + 'levels' => ['notice', 'info', 'debug'], + ]); + + QueueManager::setConfig('default', $config); + QueueManager::push(LogToDebugJob::class); + QueueManager::drop('default'); + + $this->exec('queue worker --max-jobs=1 --logger=debug --verbose'); + + $this->assertDebugLogContains('Debug job was run'); + $this->assertDebugLogContains('TestCustomProcessor processing message'); + $this->assertDebugLogContains('Message processed successfully by TestCustomProcessor'); + } + + /** + * Test that queue aborts when custom processor class does not exist. + * + * @runInSeparateProcess + */ + public function testQueueAbortsWithNonExistentProcessor() + { + $config = [ + 'queue' => 'default', + 'url' => 'file:///' . TMP . DS . 'queue', + 'receiveTimeout' => 100, + 'processor' => 'NonExistentProcessor', + ]; + Configure::write('Queue', ['default' => $config]); + + $this->exec('queue worker --max-runtime=0'); + + $this->assertErrorContains('Processor class NonExistentProcessor not found'); + } + + /** + * Test that queue aborts when custom processor does not implement Interop\Queue\Processor. + * + * @runInSeparateProcess + */ + public function testQueueAbortsWithInvalidProcessor() + { + $config = [ + 'queue' => 'default', + 'url' => 'file:///' . TMP . DS . 'queue', + 'receiveTimeout' => 100, + 'processor' => 'stdClass', + ]; + Configure::write('Queue', ['default' => $config]); + + $this->exec('queue worker --max-runtime=0'); + + $this->assertErrorContains('Processor class stdClass must implement Interop\Queue\Processor'); + } + + /** + * Test that custom processor works with listener configuration. + * + * @runInSeparateProcess + */ + public function testCustomProcessorWithListener() + { + $config = [ + 'queue' => 'default', + 'url' => 'file:///' . TMP . DS . 'queue', + 'receiveTimeout' => 100, + 'processor' => TestCustomProcessor::class, + 'listener' => WelcomeMailerListener::class, + ]; + Configure::write('Queue', ['default' => $config]); + + Log::setConfig('debug', [ + 'className' => 'Array', + 'levels' => ['notice', 'info', 'debug'], + ]); + + QueueManager::setConfig('default', $config); + QueueManager::push(LogToDebugJob::class); + QueueManager::drop('default'); + + $this->exec('queue worker --max-jobs=1 --logger=debug --verbose'); + + $this->assertDebugLogContains('Debug job was run'); + $this->assertDebugLogContains('TestCustomProcessor processing message'); + } } diff --git a/tests/test_app/src/Queue/TestCustomProcessor.php b/tests/test_app/src/Queue/TestCustomProcessor.php new file mode 100644 index 0000000..56f72eb --- /dev/null +++ b/tests/test_app/src/Queue/TestCustomProcessor.php @@ -0,0 +1,124 @@ +logger = $logger ?: new NullLogger(); + $this->container = $container; + } + + /** + * Process a message from the queue + * + * @param \Interop\Queue\Message $message The queue message + * @param \Interop\Queue\Context $context The queue context + * @return string|object Processing result + */ + public function process(QueueMessage $message, Context $context): string|object + { + $this->logger->debug('TestCustomProcessor processing message'); + + $jobMessage = new Message($message, $context, $this->container); + try { + $jobMessage->getCallable(); + } catch (RuntimeException | Error $e) { + $this->logger->debug('Invalid callable for message. Rejecting message from queue.'); + $this->dispatchEvent('Processor.message.invalid', ['message' => $jobMessage]); + + return InteropProcessor::REJECT; + } + + $this->dispatchEvent('Processor.message.start', ['message' => $jobMessage]); + + try { + $response = $this->processMessage($jobMessage); + } catch (Throwable $e) { + $message->setProperty('jobException', $e); + + $this->logger->debug(sprintf('Message encountered exception: %s', $e->getMessage())); + $this->dispatchEvent('Processor.message.exception', [ + 'message' => $jobMessage, + 'exception' => $e, + ]); + + return Result::requeue('Exception occurred while processing message'); + } + + if ($response === InteropProcessor::ACK) { + $this->logger->debug('Message processed successfully by TestCustomProcessor'); + $this->dispatchEvent('Processor.message.success', ['message' => $jobMessage]); + + return InteropProcessor::ACK; + } + + if ($response === InteropProcessor::REJECT) { + $this->logger->debug('Message processed with rejection by TestCustomProcessor'); + $this->dispatchEvent('Processor.message.reject', ['message' => $jobMessage]); + + return InteropProcessor::REJECT; + } + + $this->logger->debug('Message processed with failure, requeuing by TestCustomProcessor'); + $this->dispatchEvent('Processor.message.failure', ['message' => $jobMessage]); + + return InteropProcessor::REQUEUE; + } + + /** + * Process the job message and return the result + * + * @param \Cake\Queue\Job\Message $message The job message + * @return string|object Processing result + */ + public function processMessage(Message $message): string|object + { + $callable = $message->getCallable(); + $response = $callable($message); + if ($response === null) { + $response = InteropProcessor::ACK; + } + + return $response; + } +}