From 7608585f86036e326fed7f152d2b8e51ffdb9649 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 21 Mar 2025 11:16:02 +0900 Subject: [PATCH 1/3] feat(instrumentation-aws-sdk): add gen ai conventions for converse stream span --- .../src/aws-sdk.ts | 10 ++- .../src/services/ServiceExtension.ts | 7 ++- .../src/services/ServicesExtensions.ts | 2 +- .../src/services/bedrock-runtime.ts | 61 +++++++++++++++++-- .../test/bedrock-runtime.test.ts | 55 +++++++++++++++++ ...conversestream-adds-genai-conventions.json | 42 +++++++++++++ 6 files changed, 167 insertions(+), 10 deletions(-) create mode 100644 plugins/node/opentelemetry-instrumentation-aws-sdk/test/mock-responses/bedrock-conversestream-adds-genai-conventions.json diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/aws-sdk.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/aws-sdk.ts index f894e28458..63c5c635bd 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/aws-sdk.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/aws-sdk.ts @@ -400,12 +400,16 @@ export class AwsInstrumentation extends InstrumentationBase { - span.end(); + if (!requestMetadata.isStream) { + span.end(); + } }); promiseWithResponseLogic .then(res => { diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServiceExtension.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServiceExtension.ts index 95f89bdac5..836ac0ddc6 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServiceExtension.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServiceExtension.ts @@ -29,6 +29,10 @@ import { export interface RequestMetadata { // isIncoming - if true, then the operation callback / promise should be bind with the operation's span isIncoming: boolean; + // isStream - if true, then the response is a stream so the span should not be ended by the middleware. + // the ServiceExtension must end the span itself, generally by wrapping the stream and ending after it is + // consumed. + isStream?: boolean; spanAttributes?: SpanAttributes; spanKind?: SpanKind; spanName?: string; @@ -45,10 +49,11 @@ export interface ServiceExtension { // called before request is sent, and after span is started requestPostSpanHook?: (request: NormalizedRequest) => void; + // called after response is received. If value is returned, it replaces the response output. responseHook?: ( response: NormalizedResponse, span: Span, tracer: Tracer, config: AwsSdkInstrumentationConfig - ) => void; + ) => any | undefined; } diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServicesExtensions.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServicesExtensions.ts index edf18bdaea..23c1e2aa59 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServicesExtensions.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServicesExtensions.ts @@ -67,6 +67,6 @@ export class ServicesExtensions implements ServiceExtension { config: AwsSdkInstrumentationConfig ) { const serviceExtension = this.services.get(response.request.serviceName); - serviceExtension?.responseHook?.(response, span, tracer, config); + return serviceExtension?.responseHook?.(response, span, tracer, config); } } diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts index 3c3e88c813..cd1cd830a9 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts @@ -34,6 +34,10 @@ import { NormalizedRequest, NormalizedResponse, } from '../types'; +import { + ConverseStreamOutput, + TokenUsage, +} from '@aws-sdk/client-bedrock-runtime'; export class BedrockRuntimeServiceExtension implements ServiceExtension { requestPreSpanHook( @@ -43,7 +47,9 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { ): RequestMetadata { switch (request.commandName) { case 'Converse': - return this.requestPreSpanHookConverse(request, config, diag); + return this.requestPreSpanHookConverse(request, config, diag, false); + case 'ConverseStream': + return this.requestPreSpanHookConverse(request, config, diag, true); } return { @@ -54,7 +60,8 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { private requestPreSpanHookConverse( request: NormalizedRequest, config: AwsSdkInstrumentationConfig, - diag: DiagLogger + diag: DiagLogger, + isStream: boolean ): RequestMetadata { let spanName = GEN_AI_OPERATION_NAME_VALUE_CHAT; const spanAttributes: Attributes = { @@ -90,6 +97,7 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { return { spanName, isIncoming: false, + isStream, spanAttributes, }; } @@ -107,6 +115,8 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { switch (response.request.commandName) { case 'Converse': return this.responseHookConverse(response, span, tracer, config); + case 'ConverseStream': + return this.responseHookConverseStream(response, span, tracer, config); } } @@ -117,6 +127,49 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { config: AwsSdkInstrumentationConfig ) { const { stopReason, usage } = response.data; + + BedrockRuntimeServiceExtension.setStopReason(span, stopReason); + BedrockRuntimeServiceExtension.setUsage(span, usage); + } + + private responseHookConverseStream( + response: NormalizedResponse, + span: Span, + tracer: Tracer, + config: AwsSdkInstrumentationConfig + ) { + return { + ...response.data, + stream: this.wrapConverseStreamResponse(response.data.stream, span), + }; + } + + private async *wrapConverseStreamResponse( + response: AsyncIterable, + span: Span + ) { + try { + for await (const item of response) { + BedrockRuntimeServiceExtension.setStopReason( + span, + item.messageStop?.stopReason + ); + BedrockRuntimeServiceExtension.setUsage(span, item.metadata?.usage); + yield item; + } + } finally { + span.end(); + } + } + + private static setStopReason(span: Span, stopReason: string | undefined) { + if (stopReason !== undefined) { + console.log(stopReason); + span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stopReason]); + } + } + + private static setUsage(span: Span, usage: TokenUsage | undefined) { if (usage) { const { inputTokens, outputTokens } = usage; if (inputTokens !== undefined) { @@ -126,9 +179,5 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { span.setAttribute(ATTR_GEN_AI_USAGE_OUTPUT_TOKENS, outputTokens); } } - - if (stopReason !== undefined) { - span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stopReason]); - } } } diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/bedrock-runtime.test.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/bedrock-runtime.test.ts index 43bf1abd47..23271562b7 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/bedrock-runtime.test.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/bedrock-runtime.test.ts @@ -39,6 +39,7 @@ import { ConverseCommand, ConversationRole, InvokeModelCommand, + ConverseStreamCommand, } from '@aws-sdk/client-bedrock-runtime'; import { AwsCredentialIdentity } from '@aws-sdk/types'; import * as path from 'path'; @@ -154,6 +155,60 @@ describe('Bedrock', () => { }); }); + describe('ConverseStream', () => { + it('adds genai conventions', async () => { + const modelId = 'amazon.titan-text-lite-v1'; + const messages = [ + { + role: ConversationRole.USER, + content: [{ text: 'Say this is a test' }], + }, + ]; + const inferenceConfig = { + maxTokens: 10, + temperature: 0.8, + topP: 1, + stopSequences: ['|'], + }; + + const command = new ConverseStreamCommand({ + modelId, + messages, + inferenceConfig, + }); + + const response = await client.send(command); + const chunks: string[] = []; + for await (const item of response.stream!) { + const text = item.contentBlockDelta?.delta?.text; + if (text) { + chunks.push(text); + } + } + expect(chunks.join('')).toBe('Hi! How are you? How'); + + const testSpans: ReadableSpan[] = getTestSpans(); + const converseSpans: ReadableSpan[] = testSpans.filter( + (s: ReadableSpan) => { + return s.name === 'chat amazon.titan-text-lite-v1'; + } + ); + expect(converseSpans.length).toBe(1); + expect(converseSpans[0].attributes).toMatchObject({ + [ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK, + [ATTR_GEN_AI_OPERATION_NAME]: GEN_AI_OPERATION_NAME_VALUE_CHAT, + [ATTR_GEN_AI_REQUEST_MODEL]: modelId, + [ATTR_GEN_AI_REQUEST_MAX_TOKENS]: 10, + [ATTR_GEN_AI_REQUEST_TEMPERATURE]: 0.8, + [ATTR_GEN_AI_REQUEST_TOP_P]: 1, + [ATTR_GEN_AI_REQUEST_STOP_SEQUENCES]: ['|'], + [ATTR_GEN_AI_USAGE_INPUT_TOKENS]: 8, + [ATTR_GEN_AI_USAGE_OUTPUT_TOKENS]: 10, + [ATTR_GEN_AI_RESPONSE_FINISH_REASONS]: ['max_tokens'], + }); + }); + }); + // TODO: Instrument InvokeModel describe('InvokeModel', () => { it('does not currently add genai conventions', async () => { diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/mock-responses/bedrock-conversestream-adds-genai-conventions.json b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/mock-responses/bedrock-conversestream-adds-genai-conventions.json new file mode 100644 index 0000000000..2fd8efdee3 --- /dev/null +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/mock-responses/bedrock-conversestream-adds-genai-conventions.json @@ -0,0 +1,42 @@ +[ + { + "scope": "https://bedrock-runtime.us-east-1.amazonaws.com:443", + "method": "POST", + "path": "/model/amazon.titan-text-lite-v1/converse-stream", + "body": { + "inferenceConfig": { + "maxTokens": 10, + "stopSequences": [ + "|" + ], + "temperature": 0.8, + "topP": 1 + }, + "messages": [ + { + "content": [ + { + "text": "Say this is a test" + } + ], + "role": "user" + } + ] + }, + "status": 200, + "response": "00000081000000526cc176930b3a6576656e742d7479706507000c6d65737361676553746172740d3a636f6e74656e742d747970650700106170706c69636174696f6e2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b2270223a2261626364222c22726f6c65223a22617373697374616e74227df512a005000000c600000057f67806450b3a6576656e742d74797065070011636f6e74656e74426c6f636b44656c74610d3a636f6e74656e742d747970650700106170706c69636174696f6e2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b22636f6e74656e74426c6f636b496e646578223a302c2264656c7461223a7b2274657874223a2248692120486f772061726520796f753f20486f77227d2c2270223a226162636465666768696a6b6c6d6e6f70717273747576777879227da88f22a40000009500000056fecc83c80b3a6576656e742d74797065070010636f6e74656e74426c6f636b53746f700d3a636f6e74656e742d747970650700106170706c69636174696f6e2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b22636f6e74656e74426c6f636b496e646578223a302c2270223a226162636465666768696a6b6c6d6e6f7071227d8dc2956d00000097000000511a68450b0b3a6576656e742d7479706507000b6d65737361676553746f700d3a636f6e74656e742d747970650700106170706c69636174696f6e2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b2270223a226162636465666768696a6b6c6d6e6f7071727374222c2273746f70526561736f6e223a226d61785f746f6b656e73227ddb5bf387000000ce0000004ea263e5440b3a6576656e742d747970650700086d657461646174610d3a636f6e74656e742d747970650700106170706c69636174696f6e2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b226d657472696373223a7b226c6174656e63794d73223a3736357d2c2270223a226162636465666768696a6b6c6d6e6f222c227573616765223a7b22696e707574546f6b656e73223a382c226f7574707574546f6b656e73223a31302c22746f74616c546f6b656e73223a31387d7d98eada7f", + "rawHeaders": [ + "Date", + "Fri, 21 Mar 2025 02:04:20 GMT", + "Content-Type", + "application/vnd.amazon.eventstream", + "Transfer-Encoding", + "chunked", + "Connection", + "keep-alive", + "x-amzn-RequestId", + "c01898c3-00ef-43e3-b015-e7458e9afc84" + ], + "responseIsBinary": true + } +] \ No newline at end of file From 938e1c219a523180462b2fb8a2aaf133d4135879 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 21 Mar 2025 12:13:34 +0900 Subject: [PATCH 2/3] Non-automatic style fix --- .../test/bedrock-runtime.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/bedrock-runtime.test.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/bedrock-runtime.test.ts index 23271562b7..676303e992 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/bedrock-runtime.test.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/bedrock-runtime.test.ts @@ -37,9 +37,9 @@ registerInstrumentationTesting(new AwsInstrumentation()); import { BedrockRuntimeClient, ConverseCommand, + ConverseStreamCommand, ConversationRole, InvokeModelCommand, - ConverseStreamCommand, } from '@aws-sdk/client-bedrock-runtime'; import { AwsCredentialIdentity } from '@aws-sdk/types'; import * as path from 'path'; From 22ea86d2bfbe729a97f36e56880c2532935566bd Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Thu, 10 Apr 2025 11:24:30 +0900 Subject: [PATCH 3/3] import type --- .../src/services/bedrock-runtime.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts index cd1cd830a9..3b7fe95b6b 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts @@ -34,7 +34,7 @@ import { NormalizedRequest, NormalizedResponse, } from '../types'; -import { +import type { ConverseStreamOutput, TokenUsage, } from '@aws-sdk/client-bedrock-runtime';