Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 156 additions & 5 deletions docs/en/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ The following configuration should be present in the config array of your **conf
// The name of an event listener class to associate with the worker
'listener' => \App\Listener\WorkerListener::class,

// (optional) The processor class to use for processing messages.
// Must implement Interop\Queue\Processor. Defaults to Cake\Queue\Queue\Processor
'processor' => \App\Queue\CustomProcessor::class,

// The amount of time in milliseconds to sleep if no jobs are currently available. default: 10000
'receiveTimeout' => 10000,

Expand Down Expand Up @@ -119,14 +123,14 @@ A simple job that logs received messages would look like::

/**
* The maximum number of times the job may be attempted. (optional property)
*
*
* @var int|null
*/
public static $maxAttempts = 3;

/**
* Whether there should be only one instance of a job on the queue at a time. (optional property)
*
*
* @var bool
*/
public static $shouldBeUnique = false;
Expand Down Expand Up @@ -299,7 +303,7 @@ queue jobs, you can use the ``QueueTransport``. In your application's

return [
// ... other configuration
'EmailTransport' => [
'EmailTransport' => [
'default' => [
'className' => MailTransport::class,
// Configuration for MailTransport.
Expand All @@ -323,6 +327,153 @@ With this configuration in place, any time you send an email with the ``default`
email profile CakePHP will generate a queue message. Once that queue message is
processed the default ``MailTransport`` will be used to deliver the email messages.

Custom Processors
================

You can customize how messages are processed by specifying a custom processor class
in your queue configuration. Custom processors must implement the ``Interop\Queue\Processor``
interface.

Example custom processor that extends the main Processor::

<?php
declare(strict_types=1);

namespace App\Queue;

use Cake\Core\ContainerInterface;
use Cake\Queue\Job\Message;
use Cake\Queue\Queue\Processor;
use Enqueue\Consumption\Result;
use Error;
use Interop\Queue\Context;
use Interop\Queue\Message as QueueMessage;
use Interop\Queue\Processor as InteropProcessor;
use Psr\Log\LoggerInterface;
use RuntimeException;
use Throwable;

/**
* Timed Processor
*
* Extends the original Processor to add timing metrics to all events.
*/
class TimedProcessor extends Processor
{
/**
* Constructor
*
* @param \Psr\Log\LoggerInterface|null $logger Logger instance
* @param \Cake\Core\ContainerInterface|null $container DI container instance
*/
public function __construct(?LoggerInterface $logger = null, ?ContainerInterface $container = null)
{
parent::__construct($logger, $container);
}

/**
* Process message with timing
*
* @param \Interop\Queue\Message $message Message
* @param \Interop\Queue\Context $context Context
* @return object|string
*/
public function process(QueueMessage $message, Context $context): string|object
{
$this->dispatchEvent('Processor.message.seen', ['queueMessage' => $message]);

$jobMessage = new Message($message, $context, $this->container);
try {
$jobMessage->getCallable();
} catch (RuntimeException | Error $e) {
$this->logger->debug('Invalid callable for message. Rejecting message from queue.');
$this->dispatchEvent('Processor.message.invalid', ['message' => $jobMessage]);

return InteropProcessor::REJECT;
}

$startTime = microtime(true) * 1000;
$this->dispatchEvent('Processor.message.start', ['message' => $jobMessage]);

try {
$response = $this->processMessage($jobMessage);
} catch (Throwable $e) {
$message->setProperty('jobException', $e);

$this->logger->debug(sprintf('Message encountered exception: %s', $e->getMessage()));
$this->dispatchEvent('Processor.message.exception', [
'message' => $jobMessage,
'exception' => $e,
'duration' => (int)((microtime(true) * 1000) - $startTime),
]);

return Result::requeue('Exception occurred while processing message');
}

$duration = (int)((microtime(true) * 1000) - $startTime);

if ($response === InteropProcessor::ACK) {
$this->logger->debug('Message processed successfully');
$this->dispatchEvent('Processor.message.success', [
'message' => $jobMessage,
'duration' => $duration,
]);

return InteropProcessor::ACK;
}

if ($response === InteropProcessor::REJECT) {
$this->logger->debug('Message processed with rejection');
$this->dispatchEvent('Processor.message.reject', [
'message' => $jobMessage,
'duration' => $duration,
]);

return InteropProcessor::REJECT;
}

$this->logger->debug('Message processed with failure, requeuing');
$this->dispatchEvent('Processor.message.failure', [
'message' => $jobMessage,
'duration' => $duration,
]);

return InteropProcessor::REQUEUE;
}
}

Configuration example::

'Queue' => [
'default' => [
'url' => 'redis://localhost:6379',
'queue' => 'default',
// No processor specified - uses default Processor class
],
'timed' => [
'url' => 'redis://localhost:6379',
'queue' => 'timed',
'processor' => \App\Queue\TimedProcessor::class, // Custom processor with timing
],
],

**Note**: If no processor is specified in the configuration, the default
``Cake\Queue\Queue\Processor`` class will be used. Custom processors are useful
for adding custom logging, metrics collection, or specialized message handling.

**Important**: The `--processor` command line option is different from the `processor` configuration option:

- **Configuration `processor`**: Specifies the processor class to use for processing messages
- **Command line `--processor`**: Specifies the processor name for Enqueue topic binding (used in `bindTopic()`)
Comment on lines +464 to +467
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename the CLI flag to processor-name (can be done separately)? The current overlap is awkward.


Example usage::

# Use custom processor class from config
bin/cake queue worker --config=timed

# Use custom processor class AND specify topic binding name
bin/cake queue worker --config=timed --processor=my-topic-processor

Run the worker
==============

Expand All @@ -336,7 +487,7 @@ This shell can take a few different options:

- ``--config`` (default: default): Name of a queue config to use
- ``--queue`` (default: default): Name of queue to bind to
- ``--processor`` (default: ``null``): Name of processor to bind to
- ``--processor`` (default: ``null``): Name of processor to bind to (for Enqueue topic binding, not the processor class)
- ``--logger`` (default: ``stdout``): Name of a configured logger
- ``--max-jobs`` (default: ``null``): Maximum number of jobs to process. Worker will exit after limit is reached.
- ``--max-runtime`` (default: ``null``): Maximum number of seconds to run. Worker will exit after limit is reached.
Expand Down Expand Up @@ -370,7 +521,7 @@ Requeue Failed Jobs

Push jobs back onto the queue and remove them from the ``queue_failed_jobs``
table. If a job fails to requeue it is not guaranteed that the job was not run.

.. code-block:: bash

bin/cake queue requeue
Expand Down
36 changes: 34 additions & 2 deletions src/Command/WorkerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
use Enqueue\Consumption\Extension\LoggerExtension;
use Enqueue\Consumption\ExtensionInterface;
use Interop\Queue\Processor as InteropProcessor;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

Expand Down Expand Up @@ -170,6 +171,34 @@ protected function getLogger(Arguments $args): LoggerInterface
return $logger ?? new NullLogger();
}

/**
* Creates and returns a Processor object
*
* @param \Cake\Console\Arguments $args Arguments
* @param \Cake\Console\ConsoleIo $io ConsoleIo
* @param \Psr\Log\LoggerInterface $logger Logger instance
* @return \Interop\Queue\Processor
*/
protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface $logger): InteropProcessor
{
$configKey = (string)$args->getOption('config');
$config = QueueManager::getConfig($configKey);

$processorClass = $config['processor'] ?? Processor::class;

if (!class_exists($processorClass)) {
$io->error(sprintf(sprintf('Processor class %s not found', $processorClass)));
$this->abort();
}

if (!is_subclass_of($processorClass, InteropProcessor::class)) {
$io->error(sprintf(sprintf('Processor class %s must implement Interop\Queue\Processor', $processorClass)));
$this->abort();
}

return new $processorClass($logger, $this->container);
}

/**
* @param \Cake\Console\Arguments $args Arguments
* @param \Cake\Console\ConsoleIo $io ConsoleIo
Expand All @@ -184,7 +213,7 @@ public function execute(Arguments $args, ConsoleIo $io): int
}

$logger = $this->getLogger($args);
$processor = new Processor($logger, $this->container);
$processor = $this->getProcessor($args, $io, $logger);
$extension = $this->getQueueExtension($args, $logger);

$hasListener = Configure::check(sprintf('Queue.%s.listener', $config));
Expand All @@ -197,7 +226,10 @@ public function execute(Arguments $args, ConsoleIo $io): int

/** @var \Cake\Event\EventListenerInterface $listener */
$listener = new $listenerClassName();
$processor->getEventManager()->on($listener);

if ($processor instanceof Processor) {
$processor->getEventManager()->on($listener);
}
}
$client = QueueManager::engine($config);
$queue = $args->getOption('queue')
Expand Down
Loading
Loading