Skip to content

Commit aa87aee

Browse files
committed
refactor: subprocessProcessor now extends processor
1 parent 487b954 commit aa87aee

File tree

2 files changed

+20
-71
lines changed

2 files changed

+20
-71
lines changed

src/Queue/Processor.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public function process(QueueMessage $message, Context $context): string|object
7171
$this->dispatchEvent('Processor.message.start', ['message' => $jobMessage]);
7272

7373
try {
74-
$response = $this->processMessage($jobMessage);
74+
$response = $this->executeJob($jobMessage, $message);
7575
} catch (Throwable $throwable) {
7676
$message->setProperty('jobException', $throwable);
7777

@@ -116,6 +116,18 @@ public function process(QueueMessage $message, Context $context): string|object
116116
return InteropProcessor::REQUEUE;
117117
}
118118

119+
/**
120+
* Execute the job and return the response.
121+
*
122+
* @param \Cake\Queue\Job\Message $jobMessage Job message wrapper
123+
* @param \Interop\Queue\Message $queueMessage Original queue message
124+
* @return object|string with __toString method implemented
125+
*/
126+
protected function executeJob(Message $jobMessage, QueueMessage $queueMessage): string|object
127+
{
128+
return $this->processMessage($jobMessage);
129+
}
130+
119131
/**
120132
* @param \Cake\Queue\Job\Message $message Message.
121133
* @return object|string with __toString method implemented

src/Queue/SubprocessProcessor.php

Lines changed: 7 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,9 @@
1818

1919
use Cake\Core\ContainerInterface;
2020
use Cake\Queue\Job\Message;
21-
use Enqueue\Consumption\Result;
22-
use Error;
23-
use Interop\Queue\Context;
2421
use Interop\Queue\Message as QueueMessage;
25-
use Interop\Queue\Processor as InteropProcessor;
2622
use Psr\Log\LoggerInterface;
2723
use RuntimeException;
28-
use Throwable;
2924

3025
/**
3126
* Subprocess processor that executes jobs in isolated PHP processes.
@@ -70,76 +65,18 @@ public function __construct(
7065
}
7166

7267
/**
73-
* Process a message in a subprocess.
74-
* Overrides parent to execute in subprocess, but reuses parent's event dispatching.
68+
* Execute the job in a subprocess.
7569
*
76-
* @param \Interop\Queue\Message $message Message.
77-
* @param \Interop\Queue\Context $context Context.
70+
* @param \Cake\Queue\Job\Message $jobMessage Job message wrapper
71+
* @param \Interop\Queue\Message $queueMessage Original queue message
7872
* @return object|string with __toString method implemented
7973
*/
80-
public function process(QueueMessage $message, Context $context): string|object
74+
protected function executeJob(Message $jobMessage, QueueMessage $queueMessage): string|object
8175
{
82-
$this->dispatchEvent('Processor.message.seen', ['queueMessage' => $message]);
76+
$jobData = $this->prepareJobData($queueMessage);
77+
$subprocessResult = $this->executeInSubprocess($jobData);
8378

84-
$jobMessage = new Message($message, $context, $this->container);
85-
try {
86-
$jobMessage->getCallable();
87-
} catch (RuntimeException | Error $e) {
88-
$this->logger->debug('Invalid callable for message. Rejecting message from queue.');
89-
$this->dispatchEvent('Processor.message.invalid', ['message' => $jobMessage]);
90-
91-
return InteropProcessor::REJECT;
92-
}
93-
94-
$startTime = microtime(true) * 1000;
95-
$this->dispatchEvent('Processor.message.start', ['message' => $jobMessage]);
96-
97-
try {
98-
$jobData = $this->prepareJobData($message);
99-
$subprocessResult = $this->executeInSubprocess($jobData);
100-
$response = $this->handleSubprocessResult($subprocessResult, $message);
101-
} catch (Throwable $throwable) {
102-
$message->setProperty('jobException', $throwable);
103-
104-
$this->logger->debug(sprintf('Message encountered exception: %s', $throwable->getMessage()));
105-
$this->dispatchEvent('Processor.message.exception', [
106-
'message' => $jobMessage,
107-
'exception' => $throwable,
108-
'duration' => (int)((microtime(true) * 1000) - $startTime),
109-
]);
110-
111-
return Result::requeue('Exception occurred while processing message');
112-
}
113-
114-
$duration = (int)((microtime(true) * 1000) - $startTime);
115-
116-
if ($response === InteropProcessor::ACK) {
117-
$this->logger->debug('Message processed successfully');
118-
$this->dispatchEvent('Processor.message.success', [
119-
'message' => $jobMessage,
120-
'duration' => $duration,
121-
]);
122-
123-
return InteropProcessor::ACK;
124-
}
125-
126-
if ($response === InteropProcessor::REJECT) {
127-
$this->logger->debug('Message processed with rejection');
128-
$this->dispatchEvent('Processor.message.reject', [
129-
'message' => $jobMessage,
130-
'duration' => $duration,
131-
]);
132-
133-
return InteropProcessor::REJECT;
134-
}
135-
136-
$this->logger->debug('Message processed with failure, requeuing');
137-
$this->dispatchEvent('Processor.message.failure', [
138-
'message' => $jobMessage,
139-
'duration' => $duration,
140-
]);
141-
142-
return InteropProcessor::REQUEUE;
79+
return $this->handleSubprocessResult($subprocessResult, $queueMessage);
14380
}
14481

14582
/**

0 commit comments

Comments
 (0)