diff --git a/src/Queue/Processor.php b/src/Queue/Processor.php index 515cf1c..b90601d 100644 --- a/src/Queue/Processor.php +++ b/src/Queue/Processor.php @@ -79,6 +79,7 @@ public function process(QueueMessage $message, Context $context): string|object return InteropProcessor::REJECT; } + $startTime = microtime(true) * 1000; $this->dispatchEvent('Processor.message.start', ['message' => $jobMessage]); try { @@ -90,27 +91,39 @@ public function process(QueueMessage $message, Context $context): string|object $this->dispatchEvent('Processor.message.exception', [ 'message' => $jobMessage, 'exception' => $e, + 'duration' => (int)((microtime(true) * 1000) - $startTime), ]); return Result::requeue('Exception occurred while processing message'); } + $duration = (int)((microtime(true) * 1000) - $startTime); + if ($response === InteropProcessor::ACK) { $this->logger->debug('Message processed successfully'); - $this->dispatchEvent('Processor.message.success', ['message' => $jobMessage]); + $this->dispatchEvent('Processor.message.success', [ + 'message' => $jobMessage, + 'duration' => $duration, + ]); return InteropProcessor::ACK; } if ($response === InteropProcessor::REJECT) { $this->logger->debug('Message processed with rejection'); - $this->dispatchEvent('Processor.message.reject', ['message' => $jobMessage]); + $this->dispatchEvent('Processor.message.reject', [ + 'message' => $jobMessage, + 'duration' => $duration, + ]); return InteropProcessor::REJECT; } $this->logger->debug('Message processed with failure, requeuing'); - $this->dispatchEvent('Processor.message.failure', ['message' => $jobMessage]); + $this->dispatchEvent('Processor.message.failure', [ + 'message' => $jobMessage, + 'duration' => $duration, + ]); return InteropProcessor::REQUEUE; } diff --git a/tests/TestCase/Queue/ProcessorTest.php b/tests/TestCase/Queue/ProcessorTest.php index ec8bbab..0da6b9c 100644 --- a/tests/TestCase/Queue/ProcessorTest.php +++ b/tests/TestCase/Queue/ProcessorTest.php @@ -95,6 +95,11 @@ public function testProcess($jobMethod, $expected, $logMessage, $dispatchedEvent $data = $events[2]->getData(); $this->assertArrayHasKey('message', $data); $this->assertSame($message->jsonSerialize(), $data['message']->jsonSerialize()); + + // Verify timing information is present in completion events + $this->assertArrayHasKey('duration', $data); + $this->assertIsInt($data['duration']); + $this->assertGreaterThanOrEqual(0, $data['duration']); } /** @@ -154,6 +159,14 @@ public function testProcessWillRequeueOnException() $result = $processor->process($queueMessage, $context); $this->assertEquals(InteropProcessor::REQUEUE, $result); + + // Verify timing information is present in exception event + $this->assertSame(3, $events->count()); + $this->assertSame('Processor.message.exception', $events[2]->getName()); + $data = $events[2]->getData(); + $this->assertArrayHasKey('duration', $data); + $this->assertIsInt($data['duration']); + $this->assertGreaterThanOrEqual(0, $data['duration']); } /**