Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add instrumentation for Lambda Java interface HandleStreamRequest #13466

Merged
merged 1 commit into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@
package io.opentelemetry.javaagent.instrumentation.awslambdacore.v1_0;

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static java.util.Collections.singletonList;
import static net.bytebuddy.matcher.ElementMatchers.not;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.Arrays;
import java.util.List;
import net.bytebuddy.matcher.ElementMatcher;

@AutoService(InstrumentationModule.class)
public class AwsLambdaInstrumentationModule extends InstrumentationModule {

public AwsLambdaInstrumentationModule() {
super("aws-lambda-core", "aws-lambda-core-1.0", "aws-lambda");
}
Expand All @@ -34,6 +35,8 @@ public boolean isHelperClass(String className) {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new AwsLambdaRequestHandlerInstrumentation());
return Arrays.asList(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

usually we static import the asList method when it is used in instrumentation module.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some day I'm going to write an automation for this...

new AwsLambdaRequestHandlerInstrumentation(),
new AwsLambdaRequestStreamHandlerInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.awslambdacore.v1_0;

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
import static io.opentelemetry.javaagent.instrumentation.awslambdacore.v1_0.AwsLambdaInstrumentationHelper.flushTimeout;
import static io.opentelemetry.javaagent.instrumentation.awslambdacore.v1_0.AwsLambdaInstrumentationHelper.functionInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.amazonaws.services.lambda.runtime.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.awslambdacore.v1_0.AwsLambdaRequest;
import io.opentelemetry.javaagent.bootstrap.OpenTelemetrySdkAccess;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.io.InputStream;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class AwsLambdaRequestStreamHandlerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<ClassLoader> classLoaderOptimization() {
return hasClassesNamed("com.amazonaws.services.lambda.runtime.RequestStreamHandler");
}

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return implementsInterface(named("com.amazonaws.services.lambda.runtime.RequestStreamHandler"))
.and(not(nameStartsWith("com.amazonaws.services.lambda.runtime.api.client")))
// In Java 8 and Java 11 runtimes,
// AWS Lambda runtime is packaged under `lambdainternal` package.
// But it is `com.amazonaws.services.lambda.runtime.api.client`
// for new runtime likes Java 17 and Java 21.
.and(not(nameStartsWith("lambdainternal")));
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("handleRequest"))
.and(takesArgument(2, named("com.amazonaws.services.lambda.runtime.Context"))),
AwsLambdaRequestStreamHandlerInstrumentation.class.getName() + "$HandleRequestAdvice");
}

@SuppressWarnings("unused")
public static class HandleRequestAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) InputStream input,
@Advice.Argument(2) Context context,
@Advice.Local("otelInput") AwsLambdaRequest otelInput,
@Advice.Local("otelContext") io.opentelemetry.context.Context otelContext,
@Advice.Local("otelScope") Scope otelScope) {

otelInput = AwsLambdaRequest.create(context, input, Collections.emptyMap());
io.opentelemetry.context.Context parentContext = functionInstrumenter().extract(otelInput);

if (!functionInstrumenter().shouldStart(parentContext, otelInput)) {
return;
}

otelContext = functionInstrumenter().start(parentContext, otelInput);
otelScope = otelContext.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelInput") AwsLambdaRequest input,
@Advice.Local("otelContext") io.opentelemetry.context.Context functionContext,
@Advice.Local("otelScope") Scope functionScope) {

if (functionScope != null) {
functionScope.close();
functionInstrumenter().end(functionContext, input, null, throwable);
}

OpenTelemetrySdkAccess.forceFlush(flushTimeout().toNanos(), TimeUnit.NANOSECONDS);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.awslambdacore.v1_0;

import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.Mockito.when;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.incubating.FaasIncubatingAttributes;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class AwsLambdaStreamHandlerTest {

@RegisterExtension
public static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

@Mock private Context context;

@BeforeEach
void setUp() {
when(context.getFunctionName()).thenReturn("my_function");
when(context.getAwsRequestId()).thenReturn("1-22-333");
}

@AfterEach
void tearDown() {
assertThat(testing.forceFlushCalled()).isTrue();
}

@Test
void handlerTraced() throws Exception {
InputStream input = new ByteArrayInputStream("hello\n".getBytes(StandardCharsets.UTF_8));
OutputStream output = new ByteArrayOutputStream();
RequestStreamHandlerTestImpl handler = new RequestStreamHandlerTestImpl();
handler.handleRequest(input, output, context);

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("my_function")
.hasKind(SpanKind.SERVER)
.hasAttributesSatisfyingExactly(
equalTo(FaasIncubatingAttributes.FAAS_INVOCATION_ID, "1-22-333"))));
}

@Test
void handlerTracedWithException() {
InputStream input = new ByteArrayInputStream("bye\n".getBytes(StandardCharsets.UTF_8));
OutputStream output = new ByteArrayOutputStream();
RequestStreamHandlerTestImpl handler = new RequestStreamHandlerTestImpl();

Throwable thrown = catchThrowable(() -> handler.handleRequest(input, output, context));
assertThat(thrown).isInstanceOf(IllegalArgumentException.class);

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("my_function")
.hasKind(SpanKind.SERVER)
.hasStatus(StatusData.error())
.hasException(thrown)
.hasAttributesSatisfyingExactly(
equalTo(FaasIncubatingAttributes.FAAS_INVOCATION_ID, "1-22-333"))));
}

static final class RequestStreamHandlerTestImpl implements RequestStreamHandler {
@Override
public void handleRequest(InputStream input, OutputStream output, Context context)
throws IOException {
BufferedReader reader =
new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
BufferedWriter writer =
new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8));
String line = reader.readLine();
if (line.equals("hello")) {
writer.write("world");
writer.flush();
writer.close();
} else {
throw new IllegalArgumentException("bad argument");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
package io.opentelemetry.javaagent.instrumentation.awslambdaevents.v2_2;

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static java.util.Collections.singletonList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.Arrays;
import java.util.List;
import net.bytebuddy.matcher.ElementMatcher;

Expand All @@ -32,6 +32,8 @@ public boolean isHelperClass(String className) {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new AwsLambdaRequestHandlerInstrumentation());
return Arrays.asList(
new AwsLambdaRequestHandlerInstrumentation(),
new AwsLambdaRequestStreamHandlerInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.awslambdaevents.v2_2;

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
import static io.opentelemetry.javaagent.instrumentation.awslambdaevents.v2_2.AwsLambdaInstrumentationHelper.flushTimeout;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.amazonaws.services.lambda.runtime.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.awslambdacore.v1_0.AwsLambdaRequest;
import io.opentelemetry.javaagent.bootstrap.OpenTelemetrySdkAccess;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.io.InputStream;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class AwsLambdaRequestStreamHandlerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<ClassLoader> classLoaderOptimization() {
return hasClassesNamed("com.amazonaws.services.lambda.runtime.RequestStreamHandler");
}

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return implementsInterface(named("com.amazonaws.services.lambda.runtime.RequestStreamHandler"));
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("handleRequest"))
.and(takesArgument(2, named("com.amazonaws.services.lambda.runtime.Context"))),
AwsLambdaRequestStreamHandlerInstrumentation.class.getName() + "$HandleRequestAdvice");
}

@SuppressWarnings("unused")
public static class HandleRequestAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) InputStream input,
@Advice.Argument(2) Context context,
@Advice.Local("otelInput") AwsLambdaRequest otelInput,
@Advice.Local("otelContext") io.opentelemetry.context.Context otelContext,
@Advice.Local("otelScope") Scope otelScope) {
Map<String, String> headers = Collections.emptyMap();
otelInput = AwsLambdaRequest.create(context, input, headers);
io.opentelemetry.context.Context parentContext =
AwsLambdaInstrumentationHelper.functionInstrumenter().extract(otelInput);

if (!AwsLambdaInstrumentationHelper.functionInstrumenter()
.shouldStart(parentContext, otelInput)) {
return;
}

otelContext =
AwsLambdaInstrumentationHelper.functionInstrumenter().start(parentContext, otelInput);
otelScope = otelContext.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelInput") AwsLambdaRequest input,
@Advice.Local("otelContext") io.opentelemetry.context.Context functionContext,
@Advice.Local("otelScope") Scope functionScope) {
if (functionScope != null) {
functionScope.close();
AwsLambdaInstrumentationHelper.functionInstrumenter()
.end(functionContext, input, null, throwable);
}

OpenTelemetrySdkAccess.forceFlush(flushTimeout().toNanos(), TimeUnit.NANOSECONDS);
}
}
}
Loading
Loading