From 2c3fa5776908ae69ce53c7c6e0b1b93bd04854fd Mon Sep 17 00:00:00 2001 From: Luke Zhang Date: Thu, 6 Mar 2025 11:13:26 -0800 Subject: [PATCH] feat: Add instrumentation for Lambda Java interface HandleStreamRequest Problem: AWS Lambda for Java provides two handler interfaces: com.amazonaws.services.lambda.runtime.RequestHandler com.amazonaws.services.lambda.runtime.RequestStreamHandler However, instrumentation for RequestStreamHandler is missing in the OpenTelemetry Java agent. Fix: Added instrumentation support for RequestStreamHandler. Test: Pass with green: ./gradlew spotlessCheck ./gradlew clean assemble ./gradlew instrumentation:test ./gradlew :smoke-tests:test Manual end-to-end tests pass: - Deployed Lambda functions with Spring Boot 3 and Amazon Serverless Java Container. - Enabled application signals, observed broken traces. - Disabled application signals and added a private build of the Java layer for Lambda with this change. - Verified traces and spans are now correct. Backward Compatibility: No risk of breaking existing functionality. The change only adds instrumentation for RequestStreamHandler without modifying existing behavior for RequestHandler. Existing users not using RequestStreamHandler remain unaffected. --- .../v1_0/AwsLambdaInstrumentationModule.java | 7 +- ...daRequestStreamHandlerInstrumentation.java | 97 +++++++++++++++ .../v1_0/AwsLambdaStreamHandlerTest.java | 113 ++++++++++++++++++ .../v2_2/AwsLambdaInstrumentationModule.java | 6 +- ...daRequestStreamHandlerInstrumentation.java | 92 ++++++++++++++ .../v2_2/AwsLambdaStreamHandlerTest.java | 113 ++++++++++++++++++ 6 files changed, 424 insertions(+), 4 deletions(-) create mode 100644 instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaRequestStreamHandlerInstrumentation.java create mode 100644 instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaStreamHandlerTest.java create mode 100644 instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaRequestStreamHandlerInstrumentation.java create mode 100644 instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaStreamHandlerTest.java diff --git a/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaInstrumentationModule.java b/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaInstrumentationModule.java index 35d6b70ed68b..b6a305178e00 100644 --- a/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaInstrumentationModule.java +++ b/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaInstrumentationModule.java @@ -6,17 +6,18 @@ package io.opentelemetry.javaagent.instrumentation.awslambdacore.v1_0; import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; -import static java.util.Collections.singletonList; import static net.bytebuddy.matcher.ElementMatchers.not; import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.Arrays; import java.util.List; import net.bytebuddy.matcher.ElementMatcher; @AutoService(InstrumentationModule.class) public class AwsLambdaInstrumentationModule extends InstrumentationModule { + public AwsLambdaInstrumentationModule() { super("aws-lambda-core", "aws-lambda-core-1.0", "aws-lambda"); } @@ -34,6 +35,8 @@ public boolean isHelperClass(String className) { @Override public List typeInstrumentations() { - return singletonList(new AwsLambdaRequestHandlerInstrumentation()); + return Arrays.asList( + new AwsLambdaRequestHandlerInstrumentation(), + new AwsLambdaRequestStreamHandlerInstrumentation()); } } diff --git a/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaRequestStreamHandlerInstrumentation.java b/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaRequestStreamHandlerInstrumentation.java new file mode 100644 index 000000000000..a6b89d253d89 --- /dev/null +++ b/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaRequestStreamHandlerInstrumentation.java @@ -0,0 +1,97 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.awslambdacore.v1_0; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; +import static io.opentelemetry.javaagent.instrumentation.awslambdacore.v1_0.AwsLambdaInstrumentationHelper.flushTimeout; +import static io.opentelemetry.javaagent.instrumentation.awslambdacore.v1_0.AwsLambdaInstrumentationHelper.functionInstrumenter; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.not; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.amazonaws.services.lambda.runtime.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.awslambdacore.v1_0.AwsLambdaRequest; +import io.opentelemetry.javaagent.bootstrap.OpenTelemetrySdkAccess; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import java.io.InputStream; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class AwsLambdaRequestStreamHandlerInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher classLoaderOptimization() { + return hasClassesNamed("com.amazonaws.services.lambda.runtime.RequestStreamHandler"); + } + + @Override + public ElementMatcher typeMatcher() { + return implementsInterface(named("com.amazonaws.services.lambda.runtime.RequestStreamHandler")) + .and(not(nameStartsWith("com.amazonaws.services.lambda.runtime.api.client"))) + // In Java 8 and Java 11 runtimes, + // AWS Lambda runtime is packaged under `lambdainternal` package. + // But it is `com.amazonaws.services.lambda.runtime.api.client` + // for new runtime likes Java 17 and Java 21. + .and(not(nameStartsWith("lambdainternal"))); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod() + .and(isPublic()) + .and(named("handleRequest")) + .and(takesArgument(2, named("com.amazonaws.services.lambda.runtime.Context"))), + AwsLambdaRequestStreamHandlerInstrumentation.class.getName() + "$HandleRequestAdvice"); + } + + @SuppressWarnings("unused") + public static class HandleRequestAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(0) InputStream input, + @Advice.Argument(2) Context context, + @Advice.Local("otelInput") AwsLambdaRequest otelInput, + @Advice.Local("otelContext") io.opentelemetry.context.Context otelContext, + @Advice.Local("otelScope") Scope otelScope) { + + otelInput = AwsLambdaRequest.create(context, input, Collections.emptyMap()); + io.opentelemetry.context.Context parentContext = functionInstrumenter().extract(otelInput); + + if (!functionInstrumenter().shouldStart(parentContext, otelInput)) { + return; + } + + otelContext = functionInstrumenter().start(parentContext, otelInput); + otelScope = otelContext.makeCurrent(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan( + @Advice.Thrown Throwable throwable, + @Advice.Local("otelInput") AwsLambdaRequest input, + @Advice.Local("otelContext") io.opentelemetry.context.Context functionContext, + @Advice.Local("otelScope") Scope functionScope) { + + if (functionScope != null) { + functionScope.close(); + functionInstrumenter().end(functionContext, input, null, throwable); + } + + OpenTelemetrySdkAccess.forceFlush(flushTimeout().toNanos(), TimeUnit.NANOSECONDS); + } + } +} diff --git a/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaStreamHandlerTest.java b/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaStreamHandlerTest.java new file mode 100644 index 000000000000..500abd684344 --- /dev/null +++ b/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaStreamHandlerTest.java @@ -0,0 +1,113 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.awslambdacore.v1_0; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; +import static org.mockito.Mockito.when; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestStreamHandler; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.incubating.FaasIncubatingAttributes; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class AwsLambdaStreamHandlerTest { + + @RegisterExtension + public static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Mock private Context context; + + @BeforeEach + void setUp() { + when(context.getFunctionName()).thenReturn("my_function"); + when(context.getAwsRequestId()).thenReturn("1-22-333"); + } + + @AfterEach + void tearDown() { + assertThat(testing.forceFlushCalled()).isTrue(); + } + + @Test + void handlerTraced() throws Exception { + InputStream input = new ByteArrayInputStream("hello\n".getBytes(StandardCharsets.UTF_8)); + OutputStream output = new ByteArrayOutputStream(); + RequestStreamHandlerTestImpl handler = new RequestStreamHandlerTestImpl(); + handler.handleRequest(input, output, context); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("my_function") + .hasKind(SpanKind.SERVER) + .hasAttributesSatisfyingExactly( + equalTo(FaasIncubatingAttributes.FAAS_INVOCATION_ID, "1-22-333")))); + } + + @Test + void handlerTracedWithException() { + InputStream input = new ByteArrayInputStream("bye\n".getBytes(StandardCharsets.UTF_8)); + OutputStream output = new ByteArrayOutputStream(); + RequestStreamHandlerTestImpl handler = new RequestStreamHandlerTestImpl(); + + Throwable thrown = catchThrowable(() -> handler.handleRequest(input, output, context)); + assertThat(thrown).isInstanceOf(IllegalArgumentException.class); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("my_function") + .hasKind(SpanKind.SERVER) + .hasStatus(StatusData.error()) + .hasException(thrown) + .hasAttributesSatisfyingExactly( + equalTo(FaasIncubatingAttributes.FAAS_INVOCATION_ID, "1-22-333")))); + } + + static final class RequestStreamHandlerTestImpl implements RequestStreamHandler { + @Override + public void handleRequest(InputStream input, OutputStream output, Context context) + throws IOException { + BufferedReader reader = + new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)); + BufferedWriter writer = + new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8)); + String line = reader.readLine(); + if (line.equals("hello")) { + writer.write("world"); + writer.flush(); + writer.close(); + } else { + throw new IllegalArgumentException("bad argument"); + } + } + } +} diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaInstrumentationModule.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaInstrumentationModule.java index 9e0e3722417a..2dd6051c23f5 100644 --- a/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaInstrumentationModule.java +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaInstrumentationModule.java @@ -6,11 +6,11 @@ package io.opentelemetry.javaagent.instrumentation.awslambdaevents.v2_2; import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; -import static java.util.Collections.singletonList; import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.Arrays; import java.util.List; import net.bytebuddy.matcher.ElementMatcher; @@ -32,6 +32,8 @@ public boolean isHelperClass(String className) { @Override public List typeInstrumentations() { - return singletonList(new AwsLambdaRequestHandlerInstrumentation()); + return Arrays.asList( + new AwsLambdaRequestHandlerInstrumentation(), + new AwsLambdaRequestStreamHandlerInstrumentation()); } } diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaRequestStreamHandlerInstrumentation.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaRequestStreamHandlerInstrumentation.java new file mode 100644 index 000000000000..fb5971016a5d --- /dev/null +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaRequestStreamHandlerInstrumentation.java @@ -0,0 +1,92 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.awslambdaevents.v2_2; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; +import static io.opentelemetry.javaagent.instrumentation.awslambdaevents.v2_2.AwsLambdaInstrumentationHelper.flushTimeout; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.amazonaws.services.lambda.runtime.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.awslambdacore.v1_0.AwsLambdaRequest; +import io.opentelemetry.javaagent.bootstrap.OpenTelemetrySdkAccess; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import java.io.InputStream; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class AwsLambdaRequestStreamHandlerInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher classLoaderOptimization() { + return hasClassesNamed("com.amazonaws.services.lambda.runtime.RequestStreamHandler"); + } + + @Override + public ElementMatcher typeMatcher() { + return implementsInterface(named("com.amazonaws.services.lambda.runtime.RequestStreamHandler")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod() + .and(isPublic()) + .and(named("handleRequest")) + .and(takesArgument(2, named("com.amazonaws.services.lambda.runtime.Context"))), + AwsLambdaRequestStreamHandlerInstrumentation.class.getName() + "$HandleRequestAdvice"); + } + + @SuppressWarnings("unused") + public static class HandleRequestAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(0) InputStream input, + @Advice.Argument(2) Context context, + @Advice.Local("otelInput") AwsLambdaRequest otelInput, + @Advice.Local("otelContext") io.opentelemetry.context.Context otelContext, + @Advice.Local("otelScope") Scope otelScope) { + Map headers = Collections.emptyMap(); + otelInput = AwsLambdaRequest.create(context, input, headers); + io.opentelemetry.context.Context parentContext = + AwsLambdaInstrumentationHelper.functionInstrumenter().extract(otelInput); + + if (!AwsLambdaInstrumentationHelper.functionInstrumenter() + .shouldStart(parentContext, otelInput)) { + return; + } + + otelContext = + AwsLambdaInstrumentationHelper.functionInstrumenter().start(parentContext, otelInput); + otelScope = otelContext.makeCurrent(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan( + @Advice.Thrown Throwable throwable, + @Advice.Local("otelInput") AwsLambdaRequest input, + @Advice.Local("otelContext") io.opentelemetry.context.Context functionContext, + @Advice.Local("otelScope") Scope functionScope) { + if (functionScope != null) { + functionScope.close(); + AwsLambdaInstrumentationHelper.functionInstrumenter() + .end(functionContext, input, null, throwable); + } + + OpenTelemetrySdkAccess.forceFlush(flushTimeout().toNanos(), TimeUnit.NANOSECONDS); + } + } +} diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaStreamHandlerTest.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaStreamHandlerTest.java new file mode 100644 index 000000000000..f6dbd04086ec --- /dev/null +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaStreamHandlerTest.java @@ -0,0 +1,113 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.awslambdaevents.v2_2; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; +import static org.mockito.Mockito.when; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestStreamHandler; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.incubating.FaasIncubatingAttributes; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class AwsLambdaStreamHandlerTest { + + @RegisterExtension + public static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Mock private Context context; + + @BeforeEach + void setUp() { + when(context.getFunctionName()).thenReturn("my_function"); + when(context.getAwsRequestId()).thenReturn("1-22-333"); + } + + @AfterEach + void tearDown() { + assertThat(testing.forceFlushCalled()).isTrue(); + } + + @Test + void handlerTraced() throws Exception { + InputStream input = new ByteArrayInputStream("hello\n".getBytes(StandardCharsets.UTF_8)); + OutputStream output = new ByteArrayOutputStream(); + RequestStreamHandlerTestImpl handler = new RequestStreamHandlerTestImpl(); + handler.handleRequest(input, output, context); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("my_function") + .hasKind(SpanKind.SERVER) + .hasAttributesSatisfyingExactly( + equalTo(FaasIncubatingAttributes.FAAS_INVOCATION_ID, "1-22-333")))); + } + + @Test + void handlerTracedWithException() { + InputStream input = new ByteArrayInputStream("bye\n".getBytes(StandardCharsets.UTF_8)); + OutputStream output = new ByteArrayOutputStream(); + RequestStreamHandlerTestImpl handler = new RequestStreamHandlerTestImpl(); + + Throwable thrown = catchThrowable(() -> handler.handleRequest(input, output, context)); + assertThat(thrown).isInstanceOf(IllegalArgumentException.class); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("my_function") + .hasKind(SpanKind.SERVER) + .hasStatus(StatusData.error()) + .hasException(thrown) + .hasAttributesSatisfyingExactly( + equalTo(FaasIncubatingAttributes.FAAS_INVOCATION_ID, "1-22-333")))); + } + + static final class RequestStreamHandlerTestImpl implements RequestStreamHandler { + @Override + public void handleRequest(InputStream input, OutputStream output, Context context) + throws IOException { + BufferedReader reader = + new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)); + BufferedWriter writer = + new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8)); + String line = reader.readLine(); + if (line.equals("hello")) { + writer.write("world"); + writer.flush(); + writer.close(); + } else { + throw new IllegalArgumentException("bad argument"); + } + } + } +}