Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
colors="true"
cacheDirectory=".phpunit.cache"
bootstrap="tests/bootstrap.php"
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/10.1/phpunit.xsd">
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/12.0/phpunit.xsd">
<testsuites>
<testsuite name="queue">
<directory>tests/TestCase</directory>
Expand Down
222 changes: 222 additions & 0 deletions src/Command/SubprocessJobRunnerCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
<?php
declare(strict_types=1);

/**
* CakePHP(tm) : Rapid Development Framework (https://cakephp.org)
* Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org/)
*
* Licensed under The MIT License
* For full copyright and license information, please see the LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @copyright Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org/)
* @link https://cakephp.org CakePHP(tm) Project
* @since 2.2.0
* @license https://opensource.org/licenses/MIT MIT License
*/
namespace Cake\Queue\Command;

use Cake\Command\Command;
use Cake\Console\Arguments;
use Cake\Console\ConsoleIo;
use Cake\Core\ContainerInterface;
use Cake\Log\Engine\ConsoleLog;
use Cake\Log\Log;
use Cake\Queue\Job\Message;
use Cake\Queue\Queue\Processor;
use Enqueue\Null\NullConnectionFactory;
use Enqueue\Null\NullMessage;
use Interop\Queue\Message as QueueMessage;
use Interop\Queue\Processor as InteropProcessor;
use JsonException;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use RuntimeException;
use Throwable;

/**
* Subprocess job runner command.
* Executes a single job in an isolated subprocess.
*/
class SubprocessJobRunnerCommand extends Command
{
/**
* @param \Cake\Core\ContainerInterface|null $container DI container instance
*/
public function __construct(
protected readonly ?ContainerInterface $container = null,
) {
}

/**
* Get the command name.
*
* @return string
*/
public static function defaultName(): string
{
return 'queue subprocess_runner';
}

/**
* Execute a single job from STDIN and output result to STDOUT.
*
* @param \Cake\Console\Arguments $args Arguments
* @param \Cake\Console\ConsoleIo $io ConsoleIo
* @return int
*/
public function execute(Arguments $args, ConsoleIo $io): int
{
$input = $this->readInput($io);

if ($input === '') {
$this->outputResult($io, [
'success' => false,
'error' => 'No input received',
]);

return self::CODE_ERROR;
}

try {
$data = json_decode($input, true, 512, JSON_THROW_ON_ERROR);
} catch (JsonException $jsonException) {
$this->outputResult($io, [
'success' => false,
'error' => 'Invalid JSON input: ' . $jsonException->getMessage(),
]);

return self::CODE_ERROR;
}

try {
$result = $this->executeJob($data);
$this->outputResult($io, [
'success' => true,
'result' => $result,
]);

return self::CODE_SUCCESS;
} catch (Throwable $throwable) {
$this->outputResult($io, [
'success' => false,
'result' => InteropProcessor::REQUEUE,
'exception' => [
'class' => get_class($throwable),
'message' => $throwable->getMessage(),
'code' => $throwable->getCode(),
'file' => $throwable->getFile(),
'line' => $throwable->getLine(),
'trace' => $throwable->getTraceAsString(),
],
]);

return self::CODE_SUCCESS;
}
}

/**
* Read input from STDIN or ConsoleIo
*
* @param \Cake\Console\ConsoleIo $io ConsoleIo
* @return string
*/
protected function readInput(ConsoleIo $io): string
{
$input = '';
while (!feof(STDIN)) {
$chunk = fread(STDIN, 8192);
if ($chunk === false) {
break;
}

$input .= $chunk;
}

return $input;
}

/**
* Execute the job with the provided data.
*
* @param array<string, mixed> $data Job data
* @return string
*/
protected function executeJob(array $data): string
{
$connectionFactory = new NullConnectionFactory();
$context = $connectionFactory->createContext();

$messageClass = $data['messageClass'] ?? NullMessage::class;

// Validate message class for security
if (!class_exists($messageClass) || !is_subclass_of($messageClass, QueueMessage::class)) {
throw new RuntimeException(sprintf('Invalid message class: %s', $messageClass));
}

$messageBody = json_encode($data['body']);

/** @var \Interop\Queue\Message $queueMessage */
$queueMessage = new $messageClass($messageBody);

if (isset($data['properties']) && is_array($data['properties'])) {
foreach ($data['properties'] as $key => $value) {
$queueMessage->setProperty($key, $value);
}
}

$logger = $this->configureLogging($data);

$message = new Message($queueMessage, $context, $this->container);
$processor = new Processor($logger, $this->container);

$result = $processor->processMessage($message);

// Result is string|object (with __toString)
/** @phpstan-ignore cast.string */
return is_string($result) ? $result : (string)$result;
}

/**
* Configure logging to use STDERR to prevent job logs from contaminating STDOUT.
* Reconfigures all CakePHP loggers to write to STDERR with no additional formatting.
*
* @param array<string, mixed> $data Job data
* @return \Psr\Log\LoggerInterface
*/
protected function configureLogging(array $data): LoggerInterface
{
// Drop all existing loggers to prevent duplicate logging
foreach (Log::configured() as $loggerName) {
Log::drop($loggerName);
}

// Configure a single stderr logger
Log::setConfig('default', [
'className' => ConsoleLog::class,
'stream' => 'php://stderr',
]);

$logger = Log::engine('default');
if (!$logger instanceof LoggerInterface) {
$logger = new NullLogger();
}

return $logger;
}

/**
* Output result as JSON to STDOUT.
*
* @param \Cake\Console\ConsoleIo $io ConsoleIo
* @param array<string, mixed> $result Result data
* @return void
*/
protected function outputResult(ConsoleIo $io, array $result): void
{
$json = json_encode($result);
if ($json !== false) {
$io->out($json);
}
}
}
34 changes: 31 additions & 3 deletions src/Command/WorkerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
use Cake\Queue\Consumption\RemoveUniqueJobIdFromCacheExtension;
use Cake\Queue\Listener\FailedJobsListener;
use Cake\Queue\Queue\Processor;
use Cake\Queue\Queue\SubprocessProcessor;
use Cake\Queue\QueueManager;
use DateTime;
use Enqueue\Consumption\ChainExtension;
Expand Down Expand Up @@ -105,6 +106,11 @@ public function getOptionParser(): ConsoleOptionParser
'default' => null,
'short' => 'a',
]);
$parser->addOption('subprocess', [
'help' => 'Execute jobs in a subprocess. Useful for development to reload code for each job.',
'boolean' => true,
'default' => false,
]);
$parser->setDescription(
'Runs a queue worker that consumes from the named queue.',
);
Expand Down Expand Up @@ -154,12 +160,13 @@ protected function getQueueExtension(Arguments $args, LoggerInterface $logger):
* Creates and returns a LoggerInterface object
*
* @param \Cake\Console\Arguments $args Arguments
* @param bool $forceLogger Force logger creation even without verbose flag
* @return \Psr\Log\LoggerInterface
*/
protected function getLogger(Arguments $args): LoggerInterface
protected function getLogger(Arguments $args, bool $forceLogger = false): LoggerInterface
{
$logger = null;
if (!empty($args->getOption('verbose'))) {
if ($forceLogger || !empty($args->getOption('verbose'))) {
$logger = Log::engine((string)$args->getOption('logger'));
}

Expand Down Expand Up @@ -191,7 +198,28 @@ protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface
$this->abort();
}

return new $processorClass($logger, $this->container);
// Check subprocess mode before instantiating processor
if ($args->getOption('subprocess') || ($config['subprocess']['enabled'] ?? false)) {
if ($processorClass !== Processor::class && !is_subclass_of($processorClass, Processor::class)) {
$io->error('Subprocess mode is only supported with the default Processor class');
$this->abort();
}

$subprocessConfig = array_merge(
$config['subprocess'] ?? [],
[
'enabled' => true,
'logger' => $config['logger'] ?? (string)$args->getOption('logger'),
],
);

$subprocessLogger = $this->getLogger($args, forceLogger: true);
$processor = new SubprocessProcessor($subprocessLogger, $subprocessConfig, $this->container);
} else {
$processor = new $processorClass($logger, $this->container);
}

return $processor;
}

/**
Expand Down
14 changes: 13 additions & 1 deletion src/Queue/Processor.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public function process(QueueMessage $message, Context $context): string|object
$this->dispatchEvent('Processor.message.start', ['message' => $jobMessage]);

try {
$response = $this->processMessage($jobMessage);
$response = $this->executeJob($jobMessage, $message);
} catch (Throwable $throwable) {
$message->setProperty('jobException', $throwable);

Expand Down Expand Up @@ -116,6 +116,18 @@ public function process(QueueMessage $message, Context $context): string|object
return InteropProcessor::REQUEUE;
}

/**
* Execute the job and return the response.
*
* @param \Cake\Queue\Job\Message $jobMessage Job message wrapper
* @param \Interop\Queue\Message $queueMessage Original queue message
* @return object|string with __toString method implemented
*/
protected function executeJob(Message $jobMessage, QueueMessage $queueMessage): string|object
{
return $this->processMessage($jobMessage);
}

/**
* @param \Cake\Queue\Job\Message $message Message.
* @return object|string with __toString method implemented
Expand Down
Loading