diff --git a/phpunit.xml.dist b/phpunit.xml.dist index fa7442a..dfeea09 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -3,7 +3,7 @@ colors="true" cacheDirectory=".phpunit.cache" bootstrap="tests/bootstrap.php" - xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/10.1/phpunit.xsd"> + xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/12.0/phpunit.xsd"> tests/TestCase diff --git a/src/Command/SubprocessJobRunnerCommand.php b/src/Command/SubprocessJobRunnerCommand.php new file mode 100644 index 0000000..53e2814 --- /dev/null +++ b/src/Command/SubprocessJobRunnerCommand.php @@ -0,0 +1,222 @@ +readInput($io); + + if ($input === '') { + $this->outputResult($io, [ + 'success' => false, + 'error' => 'No input received', + ]); + + return self::CODE_ERROR; + } + + try { + $data = json_decode($input, true, 512, JSON_THROW_ON_ERROR); + } catch (JsonException $jsonException) { + $this->outputResult($io, [ + 'success' => false, + 'error' => 'Invalid JSON input: ' . $jsonException->getMessage(), + ]); + + return self::CODE_ERROR; + } + + try { + $result = $this->executeJob($data); + $this->outputResult($io, [ + 'success' => true, + 'result' => $result, + ]); + + return self::CODE_SUCCESS; + } catch (Throwable $throwable) { + $this->outputResult($io, [ + 'success' => false, + 'result' => InteropProcessor::REQUEUE, + 'exception' => [ + 'class' => get_class($throwable), + 'message' => $throwable->getMessage(), + 'code' => $throwable->getCode(), + 'file' => $throwable->getFile(), + 'line' => $throwable->getLine(), + 'trace' => $throwable->getTraceAsString(), + ], + ]); + + return self::CODE_SUCCESS; + } + } + + /** + * Read input from STDIN or ConsoleIo + * + * @param \Cake\Console\ConsoleIo $io ConsoleIo + * @return string + */ + protected function readInput(ConsoleIo $io): string + { + $input = ''; + while (!feof(STDIN)) { + $chunk = fread(STDIN, 8192); + if ($chunk === false) { + break; + } + + $input .= $chunk; + } + + return $input; + } + + /** + * Execute the job with the provided data. + * + * @param array $data Job data + * @return string + */ + protected function executeJob(array $data): string + { + $connectionFactory = new NullConnectionFactory(); + $context = $connectionFactory->createContext(); + + $messageClass = $data['messageClass'] ?? NullMessage::class; + + // Validate message class for security + if (!class_exists($messageClass) || !is_subclass_of($messageClass, QueueMessage::class)) { + throw new RuntimeException(sprintf('Invalid message class: %s', $messageClass)); + } + + $messageBody = json_encode($data['body']); + + /** @var \Interop\Queue\Message $queueMessage */ + $queueMessage = new $messageClass($messageBody); + + if (isset($data['properties']) && is_array($data['properties'])) { + foreach ($data['properties'] as $key => $value) { + $queueMessage->setProperty($key, $value); + } + } + + $logger = $this->configureLogging($data); + + $message = new Message($queueMessage, $context, $this->container); + $processor = new Processor($logger, $this->container); + + $result = $processor->processMessage($message); + + // Result is string|object (with __toString) + /** @phpstan-ignore cast.string */ + return is_string($result) ? $result : (string)$result; + } + + /** + * Configure logging to use STDERR to prevent job logs from contaminating STDOUT. + * Reconfigures all CakePHP loggers to write to STDERR with no additional formatting. + * + * @param array $data Job data + * @return \Psr\Log\LoggerInterface + */ + protected function configureLogging(array $data): LoggerInterface + { + // Drop all existing loggers to prevent duplicate logging + foreach (Log::configured() as $loggerName) { + Log::drop($loggerName); + } + + // Configure a single stderr logger + Log::setConfig('default', [ + 'className' => ConsoleLog::class, + 'stream' => 'php://stderr', + ]); + + $logger = Log::engine('default'); + if (!$logger instanceof LoggerInterface) { + $logger = new NullLogger(); + } + + return $logger; + } + + /** + * Output result as JSON to STDOUT. + * + * @param \Cake\Console\ConsoleIo $io ConsoleIo + * @param array $result Result data + * @return void + */ + protected function outputResult(ConsoleIo $io, array $result): void + { + $json = json_encode($result); + if ($json !== false) { + $io->out($json); + } + } +} diff --git a/src/Command/WorkerCommand.php b/src/Command/WorkerCommand.php index 44605d1..44ec0c1 100644 --- a/src/Command/WorkerCommand.php +++ b/src/Command/WorkerCommand.php @@ -28,6 +28,7 @@ use Cake\Queue\Consumption\RemoveUniqueJobIdFromCacheExtension; use Cake\Queue\Listener\FailedJobsListener; use Cake\Queue\Queue\Processor; +use Cake\Queue\Queue\SubprocessProcessor; use Cake\Queue\QueueManager; use DateTime; use Enqueue\Consumption\ChainExtension; @@ -105,6 +106,11 @@ public function getOptionParser(): ConsoleOptionParser 'default' => null, 'short' => 'a', ]); + $parser->addOption('subprocess', [ + 'help' => 'Execute jobs in a subprocess. Useful for development to reload code for each job.', + 'boolean' => true, + 'default' => false, + ]); $parser->setDescription( 'Runs a queue worker that consumes from the named queue.', ); @@ -154,12 +160,13 @@ protected function getQueueExtension(Arguments $args, LoggerInterface $logger): * Creates and returns a LoggerInterface object * * @param \Cake\Console\Arguments $args Arguments + * @param bool $forceLogger Force logger creation even without verbose flag * @return \Psr\Log\LoggerInterface */ - protected function getLogger(Arguments $args): LoggerInterface + protected function getLogger(Arguments $args, bool $forceLogger = false): LoggerInterface { $logger = null; - if (!empty($args->getOption('verbose'))) { + if ($forceLogger || !empty($args->getOption('verbose'))) { $logger = Log::engine((string)$args->getOption('logger')); } @@ -191,7 +198,28 @@ protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface $this->abort(); } - return new $processorClass($logger, $this->container); + // Check subprocess mode before instantiating processor + if ($args->getOption('subprocess') || ($config['subprocess']['enabled'] ?? false)) { + if ($processorClass !== Processor::class && !is_subclass_of($processorClass, Processor::class)) { + $io->error('Subprocess mode is only supported with the default Processor class'); + $this->abort(); + } + + $subprocessConfig = array_merge( + $config['subprocess'] ?? [], + [ + 'enabled' => true, + 'logger' => $config['logger'] ?? (string)$args->getOption('logger'), + ], + ); + + $subprocessLogger = $this->getLogger($args, forceLogger: true); + $processor = new SubprocessProcessor($subprocessLogger, $subprocessConfig, $this->container); + } else { + $processor = new $processorClass($logger, $this->container); + } + + return $processor; } /** diff --git a/src/Queue/Processor.php b/src/Queue/Processor.php index bff6628..e1fed4c 100644 --- a/src/Queue/Processor.php +++ b/src/Queue/Processor.php @@ -71,7 +71,7 @@ public function process(QueueMessage $message, Context $context): string|object $this->dispatchEvent('Processor.message.start', ['message' => $jobMessage]); try { - $response = $this->processMessage($jobMessage); + $response = $this->executeJob($jobMessage, $message); } catch (Throwable $throwable) { $message->setProperty('jobException', $throwable); @@ -116,6 +116,18 @@ public function process(QueueMessage $message, Context $context): string|object return InteropProcessor::REQUEUE; } + /** + * Execute the job and return the response. + * + * @param \Cake\Queue\Job\Message $jobMessage Job message wrapper + * @param \Interop\Queue\Message $queueMessage Original queue message + * @return object|string with __toString method implemented + */ + protected function executeJob(Message $jobMessage, QueueMessage $queueMessage): string|object + { + return $this->processMessage($jobMessage); + } + /** * @param \Cake\Queue\Job\Message $message Message. * @return object|string with __toString method implemented diff --git a/src/Queue/SubprocessProcessor.php b/src/Queue/SubprocessProcessor.php new file mode 100644 index 0000000..494fc50 --- /dev/null +++ b/src/Queue/SubprocessProcessor.php @@ -0,0 +1,283 @@ + [ + * 'default' => [ + * 'subprocess' => [ + * 'command' => 'php bin/cake.php queue subprocess_runner', + * 'timeout' => 60, + * 'maxOutputSize' => 2097152, // 2MB + * ], + * ], + * ], + * ``` + * + * Extends Processor to reuse event handling and processing logic (DRY principle). + */ +class SubprocessProcessor extends Processor +{ + /** + * @param \Psr\Log\LoggerInterface $logger Logger instance + * @param array $config Subprocess configuration options + * @param \Cake\Core\ContainerInterface|null $container DI container instance + */ + public function __construct( + LoggerInterface $logger, + protected readonly array $config = [], + ?ContainerInterface $container = null, + ) { + parent::__construct($logger, $container); + } + + /** + * Execute the job in a subprocess. + * + * @param \Cake\Queue\Job\Message $jobMessage Job message wrapper + * @param \Interop\Queue\Message $queueMessage Original queue message + * @return object|string with __toString method implemented + */ + protected function executeJob(Message $jobMessage, QueueMessage $queueMessage): string|object + { + $jobData = $this->prepareJobData($queueMessage); + $subprocessResult = $this->executeInSubprocess($jobData); + + return $this->handleSubprocessResult($subprocessResult, $queueMessage); + } + + /** + * Handle subprocess result and return appropriate response. + * + * @param array $result Subprocess result + * @param \Interop\Queue\Message $message Original message + * @return string + * @throws \RuntimeException + */ + protected function handleSubprocessResult(array $result, QueueMessage $message): string + { + if ($result['success']) { + return $result['result']; + } + + if (isset($result['exception'])) { + $exception = $this->reconstructException($result['exception']); + $message->setProperty('jobException', $exception); + + throw $exception; + } + + throw new RuntimeException($result['error'] ?? 'Subprocess execution failed'); + } + + /** + * Prepare job data for subprocess execution. + * + * @param \Interop\Queue\Message $message Message + * @return array + */ + protected function prepareJobData(QueueMessage $message): array + { + $body = json_decode($message->getBody(), true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException('Invalid JSON in message body'); + } + + $properties = $message->getProperties(); + + return [ + 'messageClass' => get_class($message), + 'body' => $body, + 'properties' => $properties, + 'logger' => $this->config['logger'] ?? 'stderr', + ]; + } + + /** + * Execute job in subprocess. + * + * @param array $jobData Job data + * @return array + */ + protected function executeInSubprocess(array $jobData): array + { + $command = $this->config['command'] ?? 'php bin/cake.php queue subprocess_runner'; + $timeout = $this->config['timeout'] ?? 300; + + $descriptors = [ + 0 => ['pipe', 'r'], + 1 => ['pipe', 'w'], + 2 => ['pipe', 'w'], + ]; + + $process = proc_open($command, $descriptors, $pipes); + + if (!is_resource($process)) { + throw new RuntimeException('Failed to create subprocess'); + } + + try { + $jobDataJson = json_encode($jobData); + if ($jobDataJson !== false) { + fwrite($pipes[0], $jobDataJson); + } + + fclose($pipes[0]); + + $output = ''; + $errorOutput = ''; + $startTime = time(); + $maxOutputSize = $this->config['maxOutputSize'] ?? 1048576; // 1MB default + + stream_set_blocking($pipes[1], false); + stream_set_blocking($pipes[2], false); + + while (true) { + if ($timeout > 0 && (time() - $startTime) > $timeout) { + proc_terminate($process, 9); + + return [ + 'success' => false, + 'error' => sprintf('Subprocess execution timeout after %d seconds', $timeout), + ]; + } + + $read = [$pipes[1], $pipes[2]]; + $write = null; + $except = null; + $selectResult = stream_select($read, $write, $except, 1); + + if ($selectResult === false) { + return [ + 'success' => false, + 'error' => 'Stream select failed', + ]; + } + + if (in_array($pipes[1], $read)) { + $chunk = fread($pipes[1], 8192); + if ($chunk !== false) { + if (strlen($output) + strlen($chunk) > $maxOutputSize) { + proc_terminate($process, 9); + + return [ + 'success' => false, + 'error' => sprintf('Subprocess output exceeded maximum size of %d bytes', $maxOutputSize), + ]; + } + + $output .= $chunk; + } + } + + if (in_array($pipes[2], $read)) { + $chunk = fread($pipes[2], 8192); + if ($chunk !== false) { + if (strlen($errorOutput) + strlen($chunk) > $maxOutputSize) { + proc_terminate($process, 9); + + return [ + 'success' => false, + 'error' => sprintf( + 'Subprocess error output exceeded maximum size of %d bytes', + $maxOutputSize, + ), + ]; + } + + $errorOutput .= $chunk; + // Stream subprocess logs to parent's stderr in real-time + // Skip in PHPUnit test context to avoid test framework issues + if (!defined('PHPUNIT_COMPOSER_INSTALL') && !defined('__PHPUNIT_PHAR__')) { + fwrite(STDERR, $chunk); + } + } + } + + if (feof($pipes[1]) && feof($pipes[2])) { + break; + } + } + } finally { + // Always cleanup resources + if (is_resource($pipes[1])) { + fclose($pipes[1]); + } + + if (is_resource($pipes[2])) { + fclose($pipes[2]); + } + } + + $exitCode = proc_close($process); + + if ($exitCode !== 0 && empty($output)) { + return [ + 'success' => false, + 'error' => sprintf('Subprocess exited with code %d. Error: %s', $exitCode, $errorOutput), + ]; + } + + $result = json_decode($output, true); + if (json_last_error() !== JSON_ERROR_NONE) { + return [ + 'success' => false, + 'error' => 'Invalid JSON output from subprocess: ' . $output, + ]; + } + + return $result; + } + + /** + * Reconstruct exception from array data. + * + * @param array $exceptionData Exception data + * @return \RuntimeException + */ + protected function reconstructException(array $exceptionData): RuntimeException + { + $message = sprintf( + '%s: %s in %s:%d', + $exceptionData['class'] ?? 'Exception', + $exceptionData['message'] ?? 'Unknown error', + $exceptionData['file'] ?? 'unknown', + $exceptionData['line'] ?? 0, + ); + + return new RuntimeException($message, (int)($exceptionData['code'] ?? 0)); + } +} diff --git a/src/QueuePlugin.php b/src/QueuePlugin.php index 9f7202b..874e38a 100644 --- a/src/QueuePlugin.php +++ b/src/QueuePlugin.php @@ -25,6 +25,7 @@ use Cake\Queue\Command\JobCommand; use Cake\Queue\Command\PurgeFailedCommand; use Cake\Queue\Command\RequeueCommand; +use Cake\Queue\Command\SubprocessJobRunnerCommand; use Cake\Queue\Command\WorkerCommand; use InvalidArgumentException; @@ -80,6 +81,7 @@ public function console(CommandCollection $commands): CommandCollection return $commands ->add('queue worker', WorkerCommand::class) ->add('worker', WorkerCommand::class) + ->add('queue subprocess_runner', SubprocessJobRunnerCommand::class) ->add('queue requeue', RequeueCommand::class) ->add('queue purge_failed', PurgeFailedCommand::class); } @@ -96,5 +98,8 @@ public function services(ContainerInterface $container): void $container ->add(WorkerCommand::class) ->addArgument(ContainerInterface::class); + $container + ->add(SubprocessJobRunnerCommand::class) + ->addArgument(ContainerInterface::class); } } diff --git a/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php b/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php new file mode 100644 index 0000000..7ecb5c2 --- /dev/null +++ b/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php @@ -0,0 +1,496 @@ + NullMessage::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $result = $method->invoke($command, $jobData); + + $this->assertSame(Processor::ACK, $result); + } + + /** + * Test that executeJob handles REJECT response + */ + public function testExecuteJobReturnsReject(): void + { + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnReject'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $result = $method->invoke($command, $jobData); + + $this->assertSame(Processor::REJECT, $result); + } + + /** + * Test that executeJob handles REQUEUE response + */ + public function testExecuteJobReturnsRequeue(): void + { + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnRequeue'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $result = $method->invoke($command, $jobData); + + $this->assertSame(Processor::REQUEUE, $result); + } + + /** + * Test that executeJob handles job returning null (defaults to ACK) + */ + public function testExecuteJobReturnsNull(): void + { + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnNull'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $result = $method->invoke($command, $jobData); + + $this->assertSame(Processor::ACK, $result); + } + + /** + * Test that executeJob handles properties correctly + */ + public function testExecuteJobWithProperties(): void + { + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ], + 'properties' => [ + 'attempts' => 1, + 'custom_property' => 'test_value', + ], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $result = $method->invoke($command, $jobData); + + $this->assertSame(Processor::ACK, $result); + } + + /** + * Test execute with invalid JSON input + */ + public function testExecuteWithInvalidJson(): void + { + $command = $this->getMockBuilder(SubprocessJobRunnerCommand::class) + ->onlyMethods(['readInput']) + ->getMock(); + + $command->expects($this->once()) + ->method('readInput') + ->willReturn('invalid json {]'); + + $args = $this->createStub(Arguments::class); + $io = $this->createMock(ConsoleIo::class); + + $io->expects($this->once()) + ->method('out') + ->with($this->stringContains('Invalid JSON input')); + + $result = $command->execute($args, $io); + + $this->assertSame(SubprocessJobRunnerCommand::CODE_ERROR, $result); + } + + /** + * Test execute with empty input + */ + public function testExecuteWithEmptyInput(): void + { + $command = $this->getMockBuilder(SubprocessJobRunnerCommand::class) + ->onlyMethods(['readInput']) + ->getMock(); + + $command->expects($this->once()) + ->method('readInput') + ->willReturn(''); + + $args = $this->createStub(Arguments::class); + $io = $this->createMock(ConsoleIo::class); + + $io->expects($this->once()) + ->method('out') + ->with($this->stringContains('No input received')); + + $result = $command->execute($args, $io); + + $this->assertSame(SubprocessJobRunnerCommand::CODE_ERROR, $result); + } + + /** + * Test execute with job that throws exception + */ + public function testExecuteWithJobException(): void + { + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processAndThrowException'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = $this->getMockBuilder(SubprocessJobRunnerCommand::class) + ->onlyMethods(['readInput']) + ->getMock(); + + $command->expects($this->once()) + ->method('readInput') + ->willReturn(json_encode($jobData)); + + $args = $this->createStub(Arguments::class); + $io = $this->createMock(ConsoleIo::class); + + $io->expects($this->once()) + ->method('out') + ->with($this->callback(function ($output) { + $result = json_decode($output, true); + + return $result['success'] === false && + isset($result['exception']) && + in_array($result['exception']['class'], ['RuntimeException', 'Exception']); + })); + + $result = $command->execute($args, $io); + + $this->assertSame(SubprocessJobRunnerCommand::CODE_SUCCESS, $result); + } + + /** + * Test outputResult method + */ + public function testOutputResult(): void + { + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('outputResult'); + + $io = $this->createMock(ConsoleIo::class); + $io->expects($this->once()) + ->method('out') + ->with('{"success":true,"result":"ack"}'); + + $method->invoke($command, $io, ['success' => true, 'result' => 'ack']); + } + + /** + * Test that subprocess jobs with multiple log lines properly separate logs from JSON output + */ + public function testLogsRedirectedToStderr(): void + { + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => [ + 'class' => [MultilineLogJob::class, 'execute'], + 'args' => [], + ], + 'properties' => [], + 'logger' => 'debug', + ]; + + $command = 'php ' . ROOT . 'bin/cake.php queue subprocess_runner'; + + $descriptors = [ + 0 => ['pipe', 'r'], + 1 => ['pipe', 'w'], + 2 => ['pipe', 'w'], + ]; + + $process = proc_open($command, $descriptors, $pipes); + $this->assertIsResource($process); + + // Write job data to STDIN + $jobDataJson = json_encode($jobData); + if ($jobDataJson !== false) { + fwrite($pipes[0], $jobDataJson); + } + + fclose($pipes[0]); + + // Read STDOUT and STDERR + $stdout = stream_get_contents($pipes[1]); + fclose($pipes[1]); + + $stderr = stream_get_contents($pipes[2]); + fclose($pipes[2]); + + proc_close($process); + + // STDOUT should be valid JSON without any log messages + $result = json_decode($stdout, true); + $this->assertIsArray($result, 'STDOUT should contain valid JSON: ' . $stdout); + $this->assertArrayHasKey('success', $result); + $this->assertTrue($result['success']); + $this->assertSame(Processor::ACK, $result['result']); + + // STDOUT should not contain any job log messages + $this->assertStringNotContainsString('Job execution started', $stdout); + $this->assertStringNotContainsString('Processing step', $stdout); + $this->assertStringNotContainsString('Job execution finished', $stdout); + + // All log messages should be in STDERR + $this->assertStringContainsString('Job execution started', $stderr); + $this->assertStringContainsString('Processing step 1 completed', $stderr); + $this->assertStringContainsString('Processing step 2 completed', $stderr); + $this->assertStringContainsString('Processing step 3 completed', $stderr); + $this->assertStringContainsString('Job execution finished', $stderr); + } + + /** + * Test defaultName method + */ + public function testDefaultName(): void + { + $this->assertSame('queue subprocess_runner', SubprocessJobRunnerCommand::defaultName()); + } + + /** + * Test executeJob with invalid message class (non-existent) + */ + public function testExecuteJobWithInvalidMessageClass(): void + { + $jobData = [ + 'messageClass' => 'NonExistentClass', + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Invalid message class'); + + $method->invoke($command, $jobData); + } + + /** + * Test executeJob with non-QueueMessage class + */ + public function testExecuteJobWithNonQueueMessageClass(): void + { + $jobData = [ + 'messageClass' => stdClass::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Invalid message class'); + + $method->invoke($command, $jobData); + } + + /** + * Test configureLogging with fallback to NullLogger + */ + public function testConfigureLoggingConfiguresStderrLogger(): void + { + $jobData = ['logger' => 'stderr']; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('configureLogging'); + + $logger = $method->invoke($command, $jobData); + + $this->assertInstanceOf(LoggerInterface::class, $logger); + } + + /** + * Test outputResult with valid JSON + */ + public function testOutputResultWithValidData(): void + { + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('outputResult'); + + $io = $this->createMock(ConsoleIo::class); + $io->expects($this->once()) + ->method('out') + ->with('{"success":true,"result":"ack"}'); + + $method->invoke($command, $io, ['success' => true, 'result' => 'ack']); + } + + /** + * Test outputResult with data that cannot be JSON encoded + */ + public function testOutputResultWithInvalidJsonData(): void + { + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('outputResult'); + + $io = $this->createMock(ConsoleIo::class); + $io->expects($this->never()) + ->method('out'); + + // Create data with a resource which cannot be JSON encoded + $resource = fopen('php://memory', 'r'); + $this->assertIsResource($resource); + $method->invoke($command, $io, ['resource' => $resource]); + if (is_resource($resource)) { + fclose($resource); + } + } + + /** + * Test readInput with multiple chunks + */ + public function testReadInputWithLargeData(): void + { + // We can't easily mock STDIN, so we'll verify the method exists and is protected + // Large data reading is already covered by the integration test (testLogsRedirectedToStderr) + $reflection = new ReflectionClass(SubprocessJobRunnerCommand::class); + $this->assertTrue($reflection->hasMethod('readInput')); + + $method = $reflection->getMethod('readInput'); + $this->assertTrue($method->isProtected()); + } + + /** + * Test constructor with container + */ + public function testConstructorWithContainer(): void + { + $container = $this->createStub(ContainerInterface::class); + $command = new SubprocessJobRunnerCommand($container); + + $this->assertInstanceOf(SubprocessJobRunnerCommand::class, $command); + } + + /** + * Test constructor without container + */ + public function testConstructorWithoutContainer(): void + { + $command = new SubprocessJobRunnerCommand(); + + $this->assertInstanceOf(SubprocessJobRunnerCommand::class, $command); + } + + /** + * Test executeJob when message body json_encode fails + */ + public function testExecuteJobWithJsonEncodeFailure(): void + { + // PHP's json_encode can fail with certain data (like invalid UTF-8) + // However, in this code path json_encode is called on $data['body'] which is already decoded + // So this edge case is hard to trigger. We'll test normal flow is covered. + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $result = $method->invoke($command, $jobData); + + // Verify it successfully encodes and processes + $this->assertIsString($result); + } +} diff --git a/tests/TestCase/Command/WorkerCommandTest.php b/tests/TestCase/Command/WorkerCommandTest.php index f77552d..9a29e82 100644 --- a/tests/TestCase/Command/WorkerCommandTest.php +++ b/tests/TestCase/Command/WorkerCommandTest.php @@ -438,4 +438,70 @@ public function testCustomProcessorWithListener() $this->assertDebugLogContains('Debug job was run'); $this->assertDebugLogContains('TestCustomProcessor processing message'); } + + /** + * Test that queue processes job with subprocess flag + */ + #[RunInSeparateProcess] + public function testQueueProcessesJobWithSubprocessFlag() + { + $config = [ + 'queue' => 'default', + 'url' => 'file:///' . TMP . DS . 'queue', + 'receiveTimeout' => 100, + 'subprocess' => [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', + ], + ]; + Configure::write('Queue', ['default' => $config]); + + Log::setConfig('debug', [ + 'className' => 'Array', + 'levels' => ['notice', 'info', 'debug'], + ]); + + QueueManager::setConfig('default', $config); + QueueManager::push(LogToDebugJob::class); + QueueManager::drop('default'); + + $this->exec('queue worker --max-jobs=1 --subprocess --logger=debug --verbose'); + + // In subprocess mode, logs from the job itself are isolated to the subprocess + // We can only verify that the parent process logged successful processing + $this->assertDebugLogContains('Message processed successfully'); + } + + /** + * Test that queue processes job with subprocess enabled in config + */ + #[RunInSeparateProcess] + public function testQueueProcessesJobWithSubprocessConfig() + { + $config = [ + 'queue' => 'default', + 'url' => 'file:///' . TMP . DS . 'queue', + 'receiveTimeout' => 100, + 'subprocess' => [ + 'enabled' => true, + 'timeout' => 30, + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', + ], + ]; + Configure::write('Queue', ['default' => $config]); + + Log::setConfig('debug', [ + 'className' => 'Array', + 'levels' => ['notice', 'info', 'debug'], + ]); + + QueueManager::setConfig('default', $config); + QueueManager::push(LogToDebugJob::class); + QueueManager::drop('default'); + + $this->exec('queue worker --max-jobs=1 --logger=debug --verbose'); + + // In subprocess mode, logs from the job itself are isolated to the subprocess + // We can only verify that the parent process logged successful processing + $this->assertDebugLogContains('Message processed successfully'); + } } diff --git a/tests/TestCase/Mailer/Transport/QueueTransportTest.php b/tests/TestCase/Mailer/Transport/QueueTransportTest.php index 7da8041..761c6b3 100644 --- a/tests/TestCase/Mailer/Transport/QueueTransportTest.php +++ b/tests/TestCase/Mailer/Transport/QueueTransportTest.php @@ -27,7 +27,7 @@ class QueueTransportTest extends TestCase { use QueueTestTrait; - private $fsQueuePath = TMP . DS . 'queue'; + private string $fsQueuePath = TMP . DS . 'queue'; private function getFsQueueUrl(): string { @@ -41,10 +41,8 @@ private function getFsQueueFile(): string /** * Test send - * - * @return void */ - public function testSend() + public function testSend(): void { QueueManager::setConfig('default', [ 'queue' => 'default', @@ -87,10 +85,8 @@ public function testSend() /** * Test send custom transport - * - * @return void */ - public function testSendCustomTransport() + public function testSendCustomTransport(): void { QueueManager::setConfig('default', [ 'queue' => 'default', @@ -118,10 +114,8 @@ public function testSendCustomTransport() /** * Test send backwards compatibility transport config - * - * @return void */ - public function testSendBcTransport() + public function testSendBcTransport(): void { QueueManager::setConfig('default', [ 'queue' => 'default', diff --git a/tests/TestCase/PluginTest.php b/tests/TestCase/PluginTest.php index d695069..16cb70b 100644 --- a/tests/TestCase/PluginTest.php +++ b/tests/TestCase/PluginTest.php @@ -17,13 +17,13 @@ class PluginTest extends TestCase * * @return void */ - public function testBootstrapNoConfig() + public function testBootstrapNoConfig(): void { $this->expectException(InvalidArgumentException::class); $this->expectExceptionMessage('Missing `Queue` configuration key, please check the CakePHP Queue documentation to complete the plugin setup'); Configure::delete('Queue'); $plugin = new QueuePlugin(); - $app = $this->getMockBuilder(Application::class)->disableOriginalConstructor()->getMock(); + $app = $this->createStub(Application::class); $plugin->bootstrap($app); } @@ -32,7 +32,7 @@ public function testBootstrapNoConfig() * * @return void */ - public function testBootstrapWithConfig() + public function testBootstrapWithConfig(): void { $queueConfig = [ 'url' => 'null:', @@ -41,7 +41,7 @@ public function testBootstrapWithConfig() ]; Configure::write('Queue', ['default' => $queueConfig]); $plugin = new QueuePlugin(); - $app = $this->getMockBuilder(Application::class)->disableOriginalConstructor()->getMock(); + $app = $this->createStub(Application::class); $plugin->bootstrap($app); $queueConfig['url'] = [ 'transport' => 'null:', diff --git a/tests/TestCase/Queue/SubprocessProcessorTest.php b/tests/TestCase/Queue/SubprocessProcessorTest.php new file mode 100644 index 0000000..3f9540f --- /dev/null +++ b/tests/TestCase/Queue/SubprocessProcessorTest.php @@ -0,0 +1,621 @@ + [TestProcessor::class, 'processReturnAck'], + 'args' => ['test' => 'data'], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + $queueMessage->setProperty('attempts', 1); + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('prepareJobData'); + + $jobData = $method->invoke($processor, $queueMessage); + + $this->assertArrayHasKey('messageClass', $jobData); + $this->assertArrayHasKey('body', $jobData); + $this->assertArrayHasKey('properties', $jobData); + $this->assertSame($messageBody, $jobData['body']); + $this->assertArrayHasKey('attempts', $jobData['properties']); + $this->assertSame(1, $jobData['properties']['attempts']); + } + + /** + * Test reconstructException method + */ + public function testReconstructException(): void + { + $exceptionData = [ + 'class' => 'RuntimeException', + 'message' => 'Test error', + 'code' => 500, + 'file' => '/path/to/file.php', + 'line' => 42, + ]; + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('reconstructException'); + + $exception = $method->invoke($processor, $exceptionData); + + $this->assertInstanceOf(RuntimeException::class, $exception); + $this->assertStringContainsString('Test error', $exception->getMessage()); + $this->assertStringContainsString('RuntimeException', $exception->getMessage()); + $this->assertSame(500, $exception->getCode()); + } + + /** + * Test handleSubprocessResult with success response + */ + public function testHandleSubprocessResultSuccess(): void + { + $result = [ + 'success' => true, + 'result' => InteropProcessor::ACK, + ]; + + $messageBody = ['class' => [TestProcessor::class, 'processReturnAck'], 'args' => []]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('handleSubprocessResult'); + + $actual = $method->invoke($processor, $result, $queueMessage); + + $this->assertSame(InteropProcessor::ACK, $actual); + } + + /** + * Test handleSubprocessResult with reject response + */ + public function testHandleSubprocessResultReject(): void + { + $result = [ + 'success' => true, + 'result' => InteropProcessor::REJECT, + ]; + + $messageBody = ['class' => [TestProcessor::class, 'processReturnReject'], 'args' => []]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('handleSubprocessResult'); + + $actual = $method->invoke($processor, $result, $queueMessage); + + $this->assertSame(InteropProcessor::REJECT, $actual); + } + + /** + * Test handleSubprocessResult with requeue response + */ + public function testHandleSubprocessResultRequeue(): void + { + $result = [ + 'success' => true, + 'result' => InteropProcessor::REQUEUE, + ]; + + $messageBody = ['class' => [TestProcessor::class, 'processReturnRequeue'], 'args' => []]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('handleSubprocessResult'); + + $actual = $method->invoke($processor, $result, $queueMessage); + + $this->assertSame(InteropProcessor::REQUEUE, $actual); + } + + /** + * Test handleSubprocessResult with exception + */ + public function testHandleSubprocessResultWithException(): void + { + $result = [ + 'success' => false, + 'exception' => [ + 'class' => 'RuntimeException', + 'message' => 'Test error', + 'code' => 500, + 'file' => '/path/to/file.php', + 'line' => 42, + ], + ]; + + $messageBody = ['class' => [TestProcessor::class, 'processAndThrowException'], 'args' => []]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('handleSubprocessResult'); + + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Test error'); + + $method->invoke($processor, $result, $queueMessage); + } + + /** + * Test handleSubprocessResult with error message + */ + public function testHandleSubprocessResultWithError(): void + { + $result = [ + 'success' => false, + 'error' => 'Subprocess execution failed', + ]; + + $messageBody = ['class' => [TestProcessor::class, 'processReturnAck'], 'args' => []]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('handleSubprocessResult'); + + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Subprocess execution failed'); + + $method->invoke($processor, $result, $queueMessage); + } + + /** + * Test real subprocess execution with ACK result + */ + public function testRealSubprocessExecutionAck(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + $prepareMethod = $reflection->getMethod('prepareJobData'); + + $jobData = $prepareMethod->invoke($processor, $queueMessage); + $result = $method->invoke($processor, $jobData); + + $this->assertTrue($result['success']); + $this->assertSame(InteropProcessor::ACK, $result['result']); + } + + /** + * Test real subprocess execution with REJECT result + */ + public function testRealSubprocessExecutionReject(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processReturnReject'], + 'args' => [], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + $prepareMethod = $reflection->getMethod('prepareJobData'); + + $jobData = $prepareMethod->invoke($processor, $queueMessage); + $result = $method->invoke($processor, $jobData); + + $this->assertTrue($result['success']); + $this->assertSame(InteropProcessor::REJECT, $result['result']); + } + + /** + * Test real subprocess execution with exception + */ + public function testRealSubprocessExecutionWithException(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processAndThrowException'], + 'args' => [], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + $prepareMethod = $reflection->getMethod('prepareJobData'); + + $jobData = $prepareMethod->invoke($processor, $queueMessage); + $result = $method->invoke($processor, $jobData); + + $this->assertFalse($result['success']); + $this->assertArrayHasKey('exception', $result); + $this->assertContains($result['exception']['class'], ['RuntimeException', 'Exception']); + } + + /** + * Test subprocess timeout handling + */ + public function testSubprocessTimeout(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ]; + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', + 'timeout' => 1, + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + + // Create job data that simulates a long-running process + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => $messageBody, + 'properties' => [], + ]; + + $result = $method->invoke($processor, $jobData); + + // Should complete successfully (fast job) or timeout + $this->assertIsArray($result); + $this->assertArrayHasKey('success', $result); + } + + /** + * Test subprocess with invalid command (non-existent binary) + */ + public function testSubprocessWithInvalidCommand(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ]; + $message = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => '/nonexistent/binary queue subprocess_runner', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + $prepareMethod = $reflection->getMethod('prepareJobData'); + + $jobData = $prepareMethod->invoke($processor, $message); + $result = $method->invoke($processor, $jobData); + + // Invalid command will fail with an error result + $this->assertFalse($result['success']); + $this->assertArrayHasKey('error', $result); + } + + /** + * Test subprocess with invalid JSON in message body + */ + public function testPrepareJobDataWithInvalidJson(): void + { + $queueMessage = new NullMessage('invalid json {]'); + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('prepareJobData'); + + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Invalid JSON in message body'); + + $method->invoke($processor, $queueMessage); + } + + /** + * Test executeInSubprocess returns error when subprocess returns invalid JSON + */ + public function testExecuteInSubprocessWithInvalidJsonOutput(): void + { + $logger = new ArrayLog(); + $config = [ + 'command' => 'echo invalid-json-output', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => ['class' => [TestProcessor::class, 'processReturnAck'], 'args' => []], + 'properties' => [], + ]; + + $result = $method->invoke($processor, $jobData); + + $this->assertFalse($result['success']); + $this->assertArrayHasKey('error', $result); + $this->assertStringContainsString('Invalid JSON output from subprocess', $result['error']); + } + + /** + * Test executeInSubprocess handles non-zero exit code + */ + public function testExecuteInSubprocessWithNonZeroExitCode(): void + { + $logger = new ArrayLog(); + $config = [ + 'command' => 'php -r "exit(1);"', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => ['class' => [TestProcessor::class, 'processReturnAck'], 'args' => []], + 'properties' => [], + ]; + + $result = $method->invoke($processor, $jobData); + + $this->assertFalse($result['success']); + $this->assertArrayHasKey('error', $result); + } + + /** + * Test full process() method with real subprocess execution + */ + public function testProcessJobInSubprocess(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $context = (new NullConnectionFactory())->createContext(); + $result = $processor->process($queueMessage, $context); + + $this->assertSame(InteropProcessor::ACK, $result); + } + + /** + * Test full process() with job that rejects + */ + public function testProcessJobInSubprocessReject(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processReturnReject'], + 'args' => [], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $context = (new NullConnectionFactory())->createContext(); + $result = $processor->process($queueMessage, $context); + + $this->assertSame(InteropProcessor::REJECT, $result); + } + + /** + * Test full process() with job that throws exception + */ + public function testProcessJobInSubprocessWithException(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processAndThrowException'], + 'args' => [], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $context = (new NullConnectionFactory())->createContext(); + $result = $processor->process($queueMessage, $context); + + // Should requeue on exception - result can be Result object or string + /** @phpstan-ignore cast.string */ + $this->assertStringContainsString('requeue', (string)$result); + } + + /** + * Test subprocess with maxOutputSize limit exceeded + */ + public function testSubprocessMaxOutputSizeExceeded(): void + { + $logger = new ArrayLog(); + $config = [ + 'command' => 'php -r "echo str_repeat(\'a\', 10000);"', + 'maxOutputSize' => 100, // Very small limit + 'timeout' => 5, + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => ['class' => [TestProcessor::class, 'processReturnAck'], 'args' => []], + 'properties' => [], + ]; + + $result = $method->invoke($processor, $jobData); + + $this->assertFalse($result['success']); + $this->assertArrayHasKey('error', $result); + $this->assertStringContainsString('output exceeded maximum size', $result['error']); + } + + /** + * Test subprocess with maxOutputSize limit on stderr + */ + public function testSubprocessMaxErrorOutputSizeExceeded(): void + { + $logger = new ArrayLog(); + $config = [ + 'command' => 'php -r "fwrite(STDERR, str_repeat(\'e\', 10000));"', + 'maxOutputSize' => 100, // Very small limit + 'timeout' => 5, + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => ['class' => [TestProcessor::class, 'processReturnAck'], 'args' => []], + 'properties' => [], + ]; + + $result = $method->invoke($processor, $jobData); + + $this->assertFalse($result['success']); + $this->assertArrayHasKey('error', $result); + $this->assertStringContainsString('error output exceeded maximum size', $result['error']); + } + + /** + * Test subprocess handles normal sized output correctly + */ + public function testSubprocessWithNormalOutputSize(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', + 'maxOutputSize' => 1048576, // 1MB - normal size + 'timeout' => 30, + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + $prepareMethod = $reflection->getMethod('prepareJobData'); + + $jobData = $prepareMethod->invoke($processor, $queueMessage); + $result = $method->invoke($processor, $jobData); + + $this->assertTrue($result['success']); + $this->assertSame(InteropProcessor::ACK, $result['result']); + } + + /** + * Test executeInSubprocess with very short timeout + */ + public function testSubprocessWithVeryShortTimeout(): void + { + $logger = new ArrayLog(); + $config = [ + 'command' => 'php -r "sleep(5);"', + 'timeout' => 1, // 1 second timeout + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => ['class' => [TestProcessor::class, 'processReturnAck'], 'args' => []], + 'properties' => [], + ]; + + $result = $method->invoke($processor, $jobData); + + $this->assertFalse($result['success']); + $this->assertArrayHasKey('error', $result); + $this->assertStringContainsString('timeout', $result['error']); + } +} diff --git a/tests/TestCase/QueueManagerTest.php b/tests/TestCase/QueueManagerTest.php index 04576a0..85c8474 100644 --- a/tests/TestCase/QueueManagerTest.php +++ b/tests/TestCase/QueueManagerTest.php @@ -99,7 +99,7 @@ public function testSetMultipleConfigs() public function testSetConfigWithInvalidConfigValue() { $this->expectException(LogicException::class); - QueueManager::setConfig('test', null); + QueueManager::setConfig('test'); } public function testSetConfigInvalidKeyValue() diff --git a/tests/bootstrap.php b/tests/bootstrap.php index e2f7ee2..a146dea 100644 --- a/tests/bootstrap.php +++ b/tests/bootstrap.php @@ -93,6 +93,13 @@ // The name of a configured logger, default: null 'logger' => 'stdout', + + // Subprocess configuration for development + 'subprocess' => [ + 'enabled' => false, + 'timeout' => 30, + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', + ], ], ]); diff --git a/tests/test_app/bin/cake b/tests/test_app/bin/cake new file mode 100755 index 0000000..4b696c8 --- /dev/null +++ b/tests/test_app/bin/cake @@ -0,0 +1,75 @@ +#!/usr/bin/env sh +################################################################################ +# +# Cake is a shell script for invoking CakePHP shell commands +# +# CakePHP(tm) : Rapid Development Framework (https://cakephp.org) +# Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org) +# +# Licensed under The MIT License +# For full copyright and license information, please see the LICENSE.txt +# Redistributions of files must retain the above copyright notice. +# +# @copyright Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org) +# @link https://cakephp.org CakePHP(tm) Project +# @since 1.2.0 +# @license https://opensource.org/licenses/mit-license.php MIT License +# +################################################################################ + +# Canonicalize by following every symlink of the given name recursively +canonicalize() { + NAME="$1" + if [ -f "$NAME" ] + then + DIR=$(dirname -- "$NAME") + NAME=$(cd -P "$DIR" > /dev/null && pwd -P)/$(basename -- "$NAME") + fi + while [ -h "$NAME" ]; do + DIR=$(dirname -- "$NAME") + SYM=$(readlink "$NAME") + NAME=$(cd "$DIR" > /dev/null && cd "$(dirname -- "$SYM")" > /dev/null && pwd)/$(basename -- "$SYM") + done + echo "$NAME" +} + +# Find a CLI version of PHP +findCliPhp() { + for TESTEXEC in php php-cli /usr/local/bin/php + do + SAPI=$(echo "" | $TESTEXEC 2>/dev/null) + if [ "$SAPI" = "cli" ] + then + echo $TESTEXEC + return + fi + done + echo "Failed to find a CLI version of PHP; falling back to system standard php executable" >&2 + echo "php"; +} + +# If current path is a symlink, resolve to real path +realname="$0" +if [ -L "$realname" ] +then + realname=$(readlink -f "$0") +fi + +CONSOLE=$(dirname -- "$(canonicalize "$realname")") +APP=$(dirname "$CONSOLE") + +# If your CLI PHP is somewhere that this doesn't find, you can define a PHP environment +# variable with the correct path in it. +if [ -z "$PHP" ] +then + PHP=$(findCliPhp) +fi + +if [ "$(basename "$realname")" != 'cake' ] +then + exec "$PHP" "$CONSOLE"/cake.php "$(basename "$realname")" "$@" +else + exec "$PHP" "$CONSOLE"/cake.php "$@" +fi + +exit diff --git a/tests/test_app/bin/cake.php b/tests/test_app/bin/cake.php new file mode 100644 index 0000000..05dd85a --- /dev/null +++ b/tests/test_app/bin/cake.php @@ -0,0 +1,13 @@ +#!/usr/bin/php -q +run($argv)); diff --git a/tests/test_app/src/Application.php b/tests/test_app/src/Application.php index e8119b2..30484d3 100644 --- a/tests/test_app/src/Application.php +++ b/tests/test_app/src/Application.php @@ -3,6 +3,7 @@ namespace TestApp; +use Cake\Core\Configure; use Cake\Core\ContainerInterface; use Cake\Http\BaseApplication; use Cake\Http\MiddlewareQueue; @@ -24,6 +25,23 @@ public function bootstrap(): void { $this->addPlugin('Cake/Queue'); $this->addPlugin('Bake'); + + // Only set default Queue configuration if no Queue config exists at all + // This allows tests to fully control configuration + if (!Configure::check('Queue')) { + Configure::write('Queue', [ + 'default' => [ + 'url' => 'null:', + 'queue' => 'default', + 'logger' => 'stdout', + 'subprocess' => [ + 'enabled' => false, + 'timeout' => 30, + 'command' => 'php ' . dirname(__DIR__, 2) . '/bin/cake.php queue subprocess_runner', + ], + ], + ]); + } } public function services(ContainerInterface $container): void diff --git a/tests/test_app/src/Job/MultilineLogJob.php b/tests/test_app/src/Job/MultilineLogJob.php new file mode 100644 index 0000000..b5624b5 --- /dev/null +++ b/tests/test_app/src/Job/MultilineLogJob.php @@ -0,0 +1,26 @@ +log('Job execution started', LogLevel::INFO); + $this->log('Processing step 1 completed', LogLevel::INFO); + $this->log('Processing step 2 completed', LogLevel::INFO); + $this->log('Processing step 3 completed', LogLevel::INFO); + $this->log('Job execution finished', LogLevel::INFO); + + return Processor::ACK; + } +}