Skip to content

Commit 3ce6055

Browse files
committed
feat: initial sub-processor support
1 parent 0f09498 commit 3ce6055

File tree

15 files changed

+1445
-17
lines changed

15 files changed

+1445
-17
lines changed

phpunit.xml.dist

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
colors="true"
44
cacheDirectory=".phpunit.cache"
55
bootstrap="tests/bootstrap.php"
6-
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/10.1/phpunit.xsd">
6+
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/12.0/phpunit.xsd">
77
<testsuites>
88
<testsuite name="queue">
99
<directory>tests/TestCase</directory>
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
/**
5+
* CakePHP(tm) : Rapid Development Framework (https://cakephp.org)
6+
* Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org/)
7+
*
8+
* Licensed under The MIT License
9+
* For full copyright and license information, please see the LICENSE.txt
10+
* Redistributions of files must retain the above copyright notice.
11+
*
12+
* @copyright Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org/)
13+
* @link https://cakephp.org CakePHP(tm) Project
14+
* @since 0.1.0
15+
* @license https://opensource.org/licenses/MIT MIT License
16+
*/
17+
namespace Cake\Queue\Command;
18+
19+
use Cake\Command\Command;
20+
use Cake\Console\Arguments;
21+
use Cake\Console\ConsoleIo;
22+
use Cake\Core\ContainerInterface;
23+
use Cake\Queue\Job\Message;
24+
use Cake\Queue\Queue\Processor;
25+
use Enqueue\Null\NullConnectionFactory;
26+
use Enqueue\Null\NullMessage;
27+
use Interop\Queue\Processor as InteropProcessor;
28+
use Psr\Log\NullLogger;
29+
use Throwable;
30+
31+
/**
32+
* Subprocess job runner command.
33+
* Executes a single job in an isolated subprocess.
34+
*/
35+
class SubprocessJobRunnerCommand extends Command
36+
{
37+
/**
38+
* @param \Cake\Core\ContainerInterface|null $container DI container instance
39+
*/
40+
public function __construct(
41+
protected readonly ?ContainerInterface $container = null,
42+
) {
43+
}
44+
45+
/**
46+
* Get the command name.
47+
*
48+
* @return string
49+
*/
50+
public static function defaultName(): string
51+
{
52+
return 'queue subprocess-runner';
53+
}
54+
55+
/**
56+
* Execute a single job from STDIN and output result to STDOUT.
57+
*
58+
* @param \Cake\Console\Arguments $args Arguments
59+
* @param \Cake\Console\ConsoleIo $io ConsoleIo
60+
* @return int
61+
*/
62+
public function execute(Arguments $args, ConsoleIo $io): int
63+
{
64+
$input = $this->readInput($io);
65+
66+
if (empty($input)) {
67+
$this->outputResult($io, [
68+
'success' => false,
69+
'error' => 'No input received',
70+
]);
71+
72+
return self::CODE_ERROR;
73+
}
74+
75+
$data = json_decode($input, true);
76+
if (json_last_error() !== JSON_ERROR_NONE) {
77+
$this->outputResult($io, [
78+
'success' => false,
79+
'error' => 'Invalid JSON input: ' . json_last_error_msg(),
80+
]);
81+
82+
return self::CODE_ERROR;
83+
}
84+
85+
try {
86+
$result = $this->executeJob($data);
87+
$this->outputResult($io, [
88+
'success' => true,
89+
'result' => $result,
90+
]);
91+
92+
return self::CODE_SUCCESS;
93+
} catch (Throwable $throwable) {
94+
$this->outputResult($io, [
95+
'success' => false,
96+
'result' => InteropProcessor::REQUEUE,
97+
'exception' => [
98+
'class' => get_class($throwable),
99+
'message' => $throwable->getMessage(),
100+
'code' => $throwable->getCode(),
101+
'file' => $throwable->getFile(),
102+
'line' => $throwable->getLine(),
103+
'trace' => $throwable->getTraceAsString(),
104+
],
105+
]);
106+
107+
return self::CODE_SUCCESS;
108+
}
109+
}
110+
111+
/**
112+
* Read input from STDIN or ConsoleIo
113+
*
114+
* @param \Cake\Console\ConsoleIo $io ConsoleIo
115+
* @return string
116+
*/
117+
protected function readInput(ConsoleIo $io): string
118+
{
119+
$input = '';
120+
while (!feof(STDIN)) {
121+
$chunk = fread(STDIN, 8192);
122+
if ($chunk === false) {
123+
break;
124+
}
125+
126+
$input .= $chunk;
127+
if ($input !== '' && strlen($chunk) < 8192) {
128+
break;
129+
}
130+
}
131+
132+
return $input;
133+
}
134+
135+
/**
136+
* Execute the job with the provided data.
137+
*
138+
* @param array<string, mixed> $data Job data
139+
* @return string
140+
*/
141+
protected function executeJob(array $data): string
142+
{
143+
$connectionFactory = new NullConnectionFactory();
144+
$context = $connectionFactory->createContext();
145+
146+
$messageClass = $data['messageClass'] ?? NullMessage::class;
147+
$messageBody = json_encode($data['body']);
148+
149+
/** @var \Interop\Queue\Message $queueMessage */
150+
$queueMessage = new $messageClass($messageBody);
151+
152+
if (isset($data['properties']) && is_array($data['properties'])) {
153+
foreach ($data['properties'] as $key => $value) {
154+
$queueMessage->setProperty($key, $value);
155+
}
156+
}
157+
158+
$message = new Message($queueMessage, $context, $this->container);
159+
$processor = new Processor(new NullLogger(), $this->container);
160+
161+
$result = $processor->processMessage($message);
162+
163+
// Result is string|object (with __toString)
164+
/** @phpstan-ignore cast.string */
165+
return is_string($result) ? $result : (string)$result;
166+
}
167+
168+
/**
169+
* Output result as JSON to STDOUT.
170+
*
171+
* @param \Cake\Console\ConsoleIo $io ConsoleIo
172+
* @param array<string, mixed> $result Result data
173+
* @return void
174+
*/
175+
protected function outputResult(ConsoleIo $io, array $result): void
176+
{
177+
$json = json_encode($result);
178+
if ($json !== false) {
179+
$io->out($json);
180+
}
181+
}
182+
}

src/Command/WorkerCommand.php

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
use Cake\Queue\Consumption\RemoveUniqueJobIdFromCacheExtension;
2929
use Cake\Queue\Listener\FailedJobsListener;
3030
use Cake\Queue\Queue\Processor;
31+
use Cake\Queue\Queue\SubprocessProcessor;
3132
use Cake\Queue\QueueManager;
3233
use DateTime;
3334
use Enqueue\Consumption\ChainExtension;
@@ -105,6 +106,11 @@ public function getOptionParser(): ConsoleOptionParser
105106
'default' => null,
106107
'short' => 'a',
107108
]);
109+
$parser->addOption('subprocess', [
110+
'help' => 'Execute jobs in a subprocess. Useful for development to reload code for each job.',
111+
'boolean' => true,
112+
'default' => false,
113+
]);
108114
$parser->setDescription(
109115
'Runs a queue worker that consumes from the named queue.',
110116
);
@@ -191,7 +197,23 @@ protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface
191197
$this->abort();
192198
}
193199

194-
return new $processorClass($logger, $this->container);
200+
$processor = new $processorClass($logger, $this->container);
201+
202+
if ($args->getOption('subprocess') || ($config['subprocess']['enabled'] ?? false)) {
203+
$subprocessConfig = array_merge(
204+
$config['subprocess'] ?? [],
205+
['enabled' => true],
206+
);
207+
208+
if (!($processor instanceof Processor)) {
209+
$io->error('Subprocess mode is only supported with the default Processor class');
210+
$this->abort();
211+
}
212+
213+
$processor = new SubprocessProcessor($logger, $subprocessConfig, $this->container);
214+
}
215+
216+
return $processor;
195217
}
196218

197219
/**

0 commit comments

Comments
 (0)