7
7
use OpenTelemetry \API \Globals ;
8
8
use OpenTelemetry \API \Instrumentation \CachedInstrumentation ;
9
9
use OpenTelemetry \API \Trace \Span ;
10
+ use OpenTelemetry \API \Trace \SpanBuilderInterface ;
10
11
use OpenTelemetry \API \Trace \SpanKind ;
11
12
use OpenTelemetry \API \Trace \StatusCode ;
12
13
use OpenTelemetry \Context \Context ;
26
27
use Symfony \Component \Messenger \Transport \Receiver \ReceiverInterface ;
27
28
use Symfony \Component \Messenger \Transport \Sender \SenderInterface ;
28
29
use Symfony \Component \Messenger \Worker ;
29
- use OpenTelemetry \API \Trace \SpanBuilderInterface ;
30
30
31
31
// Add Amazon SQS stamp class if available
32
32
if (\class_exists ('Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp ' )) {
@@ -42,10 +42,6 @@ class_alias(
42
42
*/
43
43
final class MessengerInstrumentation
44
44
{
45
- const ATTRIBUTE_MESSAGING_SYSTEM = 'messaging.system ' ;
46
- const ATTRIBUTE_MESSAGING_OPERATION = 'messaging.operation ' ;
47
- const ATTRIBUTE_MESSAGING_DESTINATION = 'messaging.destination ' ;
48
- const ATTRIBUTE_MESSAGING_MESSAGE_ID = 'messaging.message_id ' ;
49
45
const ATTRIBUTE_MESSAGING_MESSAGE = 'messaging.message ' ;
50
46
const ATTRIBUTE_MESSAGING_BUS = 'messaging.symfony.bus ' ;
51
47
const ATTRIBUTE_MESSAGING_HANDLER = 'messaging.symfony.handler ' ;
@@ -54,10 +50,11 @@ final class MessengerInstrumentation
54
50
const ATTRIBUTE_MESSAGING_DELAY = 'messaging.symfony.delay ' ;
55
51
const ATTRIBUTE_MESSAGING_RETRY_COUNT = 'messaging.symfony.retry_count ' ;
56
52
const ATTRIBUTE_MESSAGING_STAMPS = 'messaging.symfony.stamps ' ;
53
+ const ATTRIBUTE_MESSAGING_MIDDLEWARE = 'symfony.messenger.middleware ' ;
54
+ const ATTRIBUTE_MESSAGING_CONSUMED_BY_WORKER = 'messaging.symfony.consumed_by_worker ' ;
57
55
58
56
// Constants used in tests
59
57
const ATTRIBUTE_MESSENGER_BUS = self ::ATTRIBUTE_MESSAGING_BUS ;
60
- const ATTRIBUTE_MESSENGER_TRANSPORT = self ::ATTRIBUTE_MESSAGING_DESTINATION ;
61
58
const ATTRIBUTE_MESSENGER_MESSAGE = self ::ATTRIBUTE_MESSAGING_MESSAGE ;
62
59
63
60
/** @psalm-suppress PossiblyUnusedMethod */
@@ -96,8 +93,8 @@ public static function register(): void
96
93
->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
97
94
->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
98
95
->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
99
- ->setAttribute (self :: ATTRIBUTE_MESSAGING_SYSTEM , 'symfony ' )
100
- ->setAttribute (self :: ATTRIBUTE_MESSAGING_OPERATION , 'dispatch ' )
96
+ ->setAttribute (TraceAttributes:: MESSAGING_SYSTEM , 'symfony ' )
97
+ ->setAttribute (TraceAttributes:: MESSAGING_OPERATION_TYPE , 'dispatch ' )
101
98
->setAttribute (self ::ATTRIBUTE_MESSAGING_MESSAGE , $ messageClass )
102
99
->setAttribute (self ::ATTRIBUTE_MESSAGING_BUS , $ class )
103
100
;
@@ -174,10 +171,10 @@ public static function register(): void
174
171
->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
175
172
->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
176
173
->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
177
- ->setAttribute (self :: ATTRIBUTE_MESSAGING_SYSTEM , 'symfony ' )
178
- ->setAttribute (self :: ATTRIBUTE_MESSAGING_OPERATION , 'send ' )
174
+ ->setAttribute (TraceAttributes:: MESSAGING_SYSTEM , 'symfony ' )
175
+ ->setAttribute (TraceAttributes:: MESSAGING_OPERATION_TYPE , 'send ' )
179
176
->setAttribute (self ::ATTRIBUTE_MESSAGING_MESSAGE , $ messageClass )
180
- ->setAttribute (self :: ATTRIBUTE_MESSAGING_DESTINATION , $ class )
177
+ ->setAttribute (TraceAttributes:: MESSAGING_DESTINATION_NAME , $ class )
181
178
;
182
179
183
180
self ::addMessageStampsToSpan ($ builder , $ envelope );
@@ -257,9 +254,9 @@ public static function register(): void
257
254
->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
258
255
->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
259
256
->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
260
- ->setAttribute (self :: ATTRIBUTE_MESSAGING_SYSTEM , 'symfony ' )
261
- ->setAttribute (self :: ATTRIBUTE_MESSAGING_OPERATION , 'receive ' )
262
- ->setAttribute (self :: ATTRIBUTE_MESSAGING_DESTINATION , $ transportName )
257
+ ->setAttribute (TraceAttributes:: MESSAGING_SYSTEM , 'symfony ' )
258
+ ->setAttribute (TraceAttributes:: MESSAGING_OPERATION_TYPE , 'receive ' )
259
+ ->setAttribute (TraceAttributes:: MESSAGING_DESTINATION_NAME , $ transportName )
263
260
;
264
261
265
262
self ::addMessageStampsToSpan ($ builder , $ envelope );
@@ -324,8 +321,8 @@ public static function register(): void
324
321
->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
325
322
->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
326
323
->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
327
- ->setAttribute (self :: ATTRIBUTE_MESSAGING_SYSTEM , 'symfony ' )
328
- ->setAttribute (self :: ATTRIBUTE_MESSAGING_OPERATION , 'process ' )
324
+ ->setAttribute (TraceAttributes:: MESSAGING_SYSTEM , 'symfony ' )
325
+ ->setAttribute (TraceAttributes:: MESSAGING_OPERATION_TYPE , 'process ' )
329
326
->setAttribute (self ::ATTRIBUTE_MESSAGING_MESSAGE , $ messageClass )
330
327
;
331
328
@@ -391,8 +388,8 @@ public static function register(): void
391
388
->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
392
389
->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
393
390
->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
394
- ->setAttribute (self :: ATTRIBUTE_MESSAGING_SYSTEM , 'symfony ' )
395
- ->setAttribute (self :: ATTRIBUTE_MESSAGING_OPERATION , 'process ' )
391
+ ->setAttribute (TraceAttributes:: MESSAGING_SYSTEM , 'symfony ' )
392
+ ->setAttribute (TraceAttributes:: MESSAGING_OPERATION_TYPE , 'process ' )
396
393
->setAttribute (self ::ATTRIBUTE_MESSAGING_MESSAGE , $ messageClass )
397
394
->setAttribute (self ::ATTRIBUTE_MESSAGING_HANDLER , $ handlerClass )
398
395
;
@@ -460,10 +457,10 @@ public static function register(): void
460
457
->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
461
458
->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
462
459
->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
463
- ->setAttribute (self :: ATTRIBUTE_MESSAGING_SYSTEM , 'symfony ' )
464
- ->setAttribute (self :: ATTRIBUTE_MESSAGING_OPERATION , 'middleware ' )
460
+ ->setAttribute (TraceAttributes:: MESSAGING_SYSTEM , 'symfony ' )
461
+ ->setAttribute (TraceAttributes:: MESSAGING_OPERATION_TYPE , 'middleware ' )
465
462
->setAttribute (self ::ATTRIBUTE_MESSAGING_MESSAGE , $ messageClass )
466
- ->setAttribute (' messaging.symfony.middleware ' , $ middlewareClass )
463
+ ->setAttribute (self :: ATTRIBUTE_MESSAGING_MIDDLEWARE , $ middlewareClass )
467
464
;
468
465
469
466
self ::addMessageStampsToSpan ($ builder , $ envelope );
@@ -514,14 +511,14 @@ private static function addMessageStampsToSpan(SpanBuilderInterface $builder, En
514
511
$ sentStamp = $ envelope ->last (SentStamp::class);
515
512
$ transportMessageIdStamp = $ envelope ->last (TransportMessageIdStamp::class);
516
513
517
- $ messageClass = \get_class ($ envelope ->getMessage ());
518
- $ transportName = null ;
519
- $ operation = null ;
520
-
521
514
if ($ busStamp ) {
522
515
$ builder ->setAttribute (self ::ATTRIBUTE_MESSAGING_BUS , $ busStamp ->getBusName ());
523
516
}
524
517
518
+ if ($ consumedByWorkerStamp ) {
519
+ $ builder ->setAttribute (self ::ATTRIBUTE_MESSAGING_CONSUMED_BY_WORKER , true );
520
+ }
521
+
525
522
if ($ handledStamp ) {
526
523
$ builder ->setAttribute (self ::ATTRIBUTE_MESSAGING_HANDLER , $ handledStamp ->getHandlerName ());
527
524
}
@@ -533,17 +530,13 @@ private static function addMessageStampsToSpan(SpanBuilderInterface $builder, En
533
530
534
531
if ($ sentStamp ) {
535
532
$ builder ->setAttribute (self ::ATTRIBUTE_MESSAGING_SENDER , $ sentStamp ->getSenderClass ());
536
- $ builder ->setAttribute (self ::ATTRIBUTE_MESSAGING_DESTINATION , $ sentStamp ->getSenderAlias ());
537
- $ transportName = $ sentStamp ->getSenderAlias ();
538
- $ operation = 'send ' ;
533
+ $ builder ->setAttribute (TraceAttributes::MESSAGING_DESTINATION_NAME , $ sentStamp ->getSenderAlias ());
539
534
} elseif ($ receivedStamp ) {
540
- $ builder ->setAttribute (self ::ATTRIBUTE_MESSAGING_DESTINATION , $ receivedStamp ->getTransportName ());
541
- $ transportName = $ receivedStamp ->getTransportName ();
542
- $ operation = 'receive ' ;
535
+ $ builder ->setAttribute (TraceAttributes::MESSAGING_DESTINATION_NAME , $ receivedStamp ->getTransportName ());
543
536
}
544
537
545
538
if ($ transportMessageIdStamp ) {
546
- $ builder ->setAttribute (self :: ATTRIBUTE_MESSAGING_MESSAGE_ID , $ transportMessageIdStamp ->getId ());
539
+ $ builder ->setAttribute (TraceAttributes:: MESSAGING_MESSAGE_ID , $ transportMessageIdStamp ->getId ());
547
540
}
548
541
549
542
if ($ delayStamp ) {
@@ -564,7 +557,7 @@ private static function addMessageStampsToSpan(SpanBuilderInterface $builder, En
564
557
/** @var \Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp|null $amazonSqsReceivedStamp */
565
558
$ amazonSqsReceivedStamp = $ envelope ->last ('Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp ' );
566
559
if ($ amazonSqsReceivedStamp && !$ transportMessageIdStamp && method_exists ($ amazonSqsReceivedStamp , 'getId ' )) {
567
- $ builder ->setAttribute (self :: ATTRIBUTE_MESSAGING_MESSAGE_ID , $ amazonSqsReceivedStamp ->getId ());
560
+ $ builder ->setAttribute (TraceAttributes:: MESSAGING_MESSAGE_ID , $ amazonSqsReceivedStamp ->getId ());
568
561
}
569
562
}
570
563
}
0 commit comments