Skip to content

Commit 28e2113

Browse files
authored
feat: config option to extract sqs context from message payload (open-telemetry#737)
1 parent 82ebc49 commit 28e2113

6 files changed

Lines changed: 147 additions & 2 deletions

File tree

plugins/node/opentelemetry-instrumentation-aws-sdk/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ aws-sdk instrumentation has few options available to choose from. You can set th
4848
| `responseHook` | `AwsSdkResponseCustomAttributeFunction` | Hook for adding custom attributes when response is received from aws. |
4949
| `sqsProcessHook` | `AwsSdkSqsProcessCustomAttributeFunction` | Hook called after starting sqs `process` span (for each sqs received message), which allow to add custom attributes to it. |
5050
| `suppressInternalInstrumentation` | `boolean` | Most aws operation use http requests under the hood. Set this to `true` to hide all underlying http spans. |
51+
| `sqsExtractContextPropagationFromPayload` | `boolean` | Will parse and extract context propagation headers from SQS Payload, false by default. [When should it be used?](./doc/sns.md#integration-with-sqs)|
5152

5253
## Span Attributes
5354

plugins/node/opentelemetry-instrumentation-aws-sdk/doc/sns.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,15 @@ The following methods are automatically enhanced:
1313

1414
### Consumers
1515
There are many potential consumers: SQS, Lambda, HTTP/S, Email, SMS, mobile notifications. each one of them will received the propagated context in its own way.
16+
17+
18+
## Integration with SQS
19+
AWS provide two ways of integrating SNS and SQS, one sends the message "as is" and one being parsed, this is called raw message delivery.
20+
21+
When it is turn off (by default) message attributes (sent in SNS) will appear in the payload of SQS, if it turned on the payload will be parsed before sent to SQS and the SNS attributes will be mapped to SQS Message attribute which allow this instrumentation to have propagated context works out-of-the-box.
22+
23+
If raw message delivery is turned off, you can solve it by enabling `sqsExtractContextPropagationFromPayload`, it will extract the context from the payload. It does have some performance affect as the instrumentation will run `JSON.parse` to get the data.
24+
25+
More details about raw message deliver can be found in [AWS docs](https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html)
26+
27+
>If you see partial / broken traces when integrating SNS with SQS this might be the reason

plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/MessageAttributes.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
diag,
2222
} from '@opentelemetry/api';
2323
import type { SQS, SNS } from 'aws-sdk';
24+
import type { MessageBodyAttributeMap } from 'aws-sdk/clients/sqs';
2425

2526
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-quotas.html
2627
export const MAX_MESSAGE_ATTRIBUTES = 10;
@@ -73,3 +74,26 @@ export const injectPropagationContext = (
7374
}
7475
return attributes;
7576
};
77+
78+
export const extractPropagationContext = (
79+
message: SQS.Message,
80+
sqsExtractContextPropagationFromPayload: boolean | undefined
81+
): MessageBodyAttributeMap | undefined => {
82+
const propagationFields = propagation.fields();
83+
const hasPropagationFields = Object.keys(
84+
message.MessageAttributes || []
85+
).some(attr => propagationFields.includes(attr));
86+
if (hasPropagationFields) {
87+
return message.MessageAttributes;
88+
} else if (sqsExtractContextPropagationFromPayload && message.Body) {
89+
try {
90+
const payload = JSON.parse(message.Body);
91+
return payload.MessageAttributes;
92+
} catch {
93+
diag.debug(
94+
'failed to parse SQS payload to extract context propagation, trace might be incomplete.'
95+
);
96+
}
97+
}
98+
return undefined;
99+
};

plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sqs.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ import {
3434
MessagingDestinationKindValues,
3535
SemanticAttributes,
3636
} from '@opentelemetry/semantic-conventions';
37-
import { contextGetter, injectPropagationContext } from './MessageAttributes';
37+
import {
38+
contextGetter,
39+
extractPropagationContext,
40+
injectPropagationContext,
41+
} from './MessageAttributes';
3842

3943
export class SqsServiceExtension implements ServiceExtension {
4044
requestPreSpanHook(request: NormalizedRequest): RequestMetadata {
@@ -128,7 +132,10 @@ export class SqsServiceExtension implements ServiceExtension {
128132
name: queueName ?? 'unknown',
129133
parentContext: propagation.extract(
130134
ROOT_CONTEXT,
131-
message.MessageAttributes,
135+
extractPropagationContext(
136+
message,
137+
config.sqsExtractContextPropagationFromPayload
138+
),
132139
contextGetter
133140
),
134141
attributes: {

plugins/node/opentelemetry-instrumentation-aws-sdk/src/types.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,15 @@ export interface AwsSdkInstrumentationConfig extends InstrumentationConfig {
8080
* effectively causing those http spans to be non-recordable.
8181
*/
8282
suppressInternalInstrumentation?: boolean;
83+
84+
/**
85+
* In some cases the context propagation headers may be found in the message payload
86+
* rather than the message attribute.
87+
* When this field is turned on the instrumentation will parse the payload and extract the
88+
* context from there.
89+
* Even if the field is on and MessageAttribute contains context propagation field are present,
90+
* the MessageAttribute will get priority.
91+
* By default it is off.
92+
*/
93+
sqsExtractContextPropagationFromPayload?: boolean;
8394
}

plugins/node/opentelemetry-instrumentation-aws-sdk/test/sqs.test.ts

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,19 @@ import { ReadableSpan } from '@opentelemetry/sdk-trace-base';
3636
import { mockV2AwsSend } from './testing-utils';
3737
import { Message } from 'aws-sdk/clients/sqs';
3838
import * as expect from 'expect';
39+
import * as sinon from 'sinon';
40+
import * as messageAttributes from '../src/services/MessageAttributes';
3941

4042
const responseMockSuccess = {
4143
requestId: '0000000000000',
4244
error: null,
4345
};
4446

47+
const extractContextSpy = sinon.spy(
48+
messageAttributes,
49+
'extractPropagationContext'
50+
);
51+
4552
describe('SQS', () => {
4653
before(() => {
4754
AWS.config.credentials = {
@@ -429,4 +436,87 @@ describe('SQS', () => {
429436
expect(processSpans[1].status.code).toStrictEqual(SpanStatusCode.UNSET);
430437
});
431438
});
439+
440+
describe('extract payload', () => {
441+
beforeEach(() => {
442+
extractContextSpy.resetHistory();
443+
});
444+
it('should not extract from payload even if set', async () => {
445+
mockV2AwsSend(responseMockSuccess, {
446+
Messages: [{ Body: JSON.stringify({ traceparent: 1 }) }],
447+
} as AWS.SQS.Types.ReceiveMessageResult);
448+
449+
const sqs = new AWS.SQS();
450+
await sqs
451+
.receiveMessage({
452+
QueueUrl: 'queue/url/for/unittests1',
453+
})
454+
.promise();
455+
expect(extractContextSpy.returnValues[0]?.traceparent).toBeUndefined();
456+
});
457+
458+
it('should extract from payload', async () => {
459+
const traceparent = {
460+
traceparent: {
461+
StringValue:
462+
'00-a1d050b7c8ad93c405e7a0d94cda5b03-23a485dc98b24027-01',
463+
DataType: 'String',
464+
},
465+
};
466+
instrumentation.setConfig({
467+
sqsExtractContextPropagationFromPayload: true,
468+
});
469+
mockV2AwsSend(responseMockSuccess, {
470+
Messages: [
471+
{ Body: JSON.stringify({ MessageAttributes: { traceparent } }) },
472+
],
473+
} as AWS.SQS.Types.ReceiveMessageResult);
474+
475+
const sqs = new AWS.SQS();
476+
await sqs
477+
.receiveMessage({
478+
QueueUrl: 'queue/url/for/unittests',
479+
})
480+
.promise();
481+
482+
expect(extractContextSpy.returnValues[0]?.traceparent).toStrictEqual(
483+
traceparent
484+
);
485+
});
486+
487+
it('should not extract from payload but from attributes', async () => {
488+
const traceparentInPayload = 'some-trace-parent-value';
489+
const traceparentInMessageAttributes = {
490+
traceparent: {
491+
StringValue:
492+
'00-a1d050b7c8ad93c405e7a0d94cda5b03-23a485dc98b24027-01',
493+
DataType: 'String',
494+
},
495+
};
496+
instrumentation.setConfig({
497+
sqsExtractContextPropagationFromPayload: false,
498+
});
499+
mockV2AwsSend(responseMockSuccess, {
500+
Messages: [
501+
{
502+
MessageAttributes: traceparentInMessageAttributes,
503+
Body: JSON.stringify({
504+
MessageAttributes: { traceparentInPayload },
505+
}),
506+
},
507+
],
508+
} as AWS.SQS.Types.ReceiveMessageResult);
509+
510+
const sqs = new AWS.SQS();
511+
await sqs
512+
.receiveMessage({
513+
QueueUrl: 'queue/url/for/unittests',
514+
})
515+
.promise();
516+
517+
expect(extractContextSpy.returnValues[0]).toBe(
518+
traceparentInMessageAttributes
519+
);
520+
});
521+
});
432522
});

0 commit comments

Comments
 (0)