Skip to content

Add cats-effect instrumentation #13576

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .fossa.yml
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,15 @@ targets:
- type: gradle
path: ./
target: ':instrumentation:cassandra:cassandra-4.4:library'
- type: gradle
path: ./
target: ':instrumentation:cats-effect:cats-effect-3.6:bootstrap'
- type: gradle
path: ./
target: ':instrumentation:cats-effect:cats-effect-3.6:javaagent'
- type: gradle
path: ./
target: ':instrumentation:cats-effect:cats-effect-common-3.6:javaagent'
- type: gradle
path: ./
target: ':instrumentation:couchbase:couchbase-2-common:javaagent'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
plugins {
id("otel.javaagent-bootstrap")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.bootstrap.catseffect.v3_6;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/** The helper stores a reference to the IOLocal#unsafeThreadLocal. */
public final class FiberLocalContextHelper {

private static final Logger logger = Logger.getLogger(FiberLocalContextHelper.class.getName());

private static final AtomicReference<ThreadLocal<Context>> fiberContextThreadLocal =
new AtomicReference<>();

private static final AtomicReference<Supplier<Boolean>> isUnderFiberContextSupplier =
new AtomicReference<>(() -> false);

public static void initialize(
ThreadLocal<Context> fiberThreadLocal, Supplier<Boolean> isUnderFiberContext) {
if (fiberContextThreadLocal.get() == null) {
fiberContextThreadLocal.set(fiberThreadLocal);
isUnderFiberContextSupplier.set(isUnderFiberContext);
logger.fine("The fiberThreadLocalContext is configured");
} else {
if (!fiberContextThreadLocal.get().equals(fiberThreadLocal)) {
logger.warning(
"The fiberThreadLocalContext is already configured. Ignoring subsequent calls.");
}
}
}

public static Boolean isUnderFiberContext() {
return isUnderFiberContextSupplier.get().get();
}

@Nullable
public static Context current() {
ThreadLocal<Context> local = getFiberThreadLocal();
return local != null ? local.get() : null;
}

public static Scope attach(Context toAttach) {
ThreadLocal<Context> local = fiberContextThreadLocal.get();
if (toAttach == null || local == null) {
return Scope.noop();
} else {
Context beforeAttach = current();
if (toAttach == beforeAttach) {
return Scope.noop();
} else {
local.set(toAttach);
return new ScopeImpl(beforeAttach, toAttach);
}
}
}

@Nullable
private static ThreadLocal<Context> getFiberThreadLocal() {
return fiberContextThreadLocal.get();
}

private static class ScopeImpl implements Scope {
@Nullable private final Context beforeAttach;
private final Context toAttach;
private boolean closed;

private ScopeImpl(@Nullable Context beforeAttach, Context toAttach) {
this.beforeAttach = beforeAttach;
this.toAttach = toAttach;
}

@Override
public void close() {
if (!this.closed && FiberLocalContextHelper.current() == this.toAttach) {
this.closed = true;
FiberLocalContextHelper.fiberContextThreadLocal.get().set(this.beforeAttach);
} else {
FiberLocalContextHelper.logger.log(
Level.FINE,
"Trying to close scope which does not represent current context. Ignoring the call.");
}
}
}

private FiberLocalContextHelper() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
plugins {
id("otel.javaagent-instrumentation")
id("otel.nullaway-conventions")
id("otel.scala-conventions")
}

val scalaVersion = "2.13"
val catsEffectVersion = "3.6.0"

muzzle {
pass {
group.set("org.typelevel")
module.set("cats-effect_2.12")
versions.set("[$catsEffectVersion,)")
assertInverse.set(true)
}
pass {
group.set("org.typelevel")
module.set("cats-effect_2.13")
versions.set("[$catsEffectVersion,)")
assertInverse.set(true)
}
pass {
group.set("org.typelevel")
module.set("cats-effect_3")
versions.set("[$catsEffectVersion,)")
assertInverse.set(true)
}
}

dependencies {
bootstrap(project(":instrumentation:cats-effect:cats-effect-3.6:bootstrap"))

// we need access to the "application.io.opentelemetry.context.Context"
// to properly bridge fiber and agent context storages
compileOnly(project(":opentelemetry-api-shaded-for-instrumenting", configuration = "shadow"))
compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
implementation(project(":instrumentation:opentelemetry-api:opentelemetry-api-1.0:javaagent"))

implementation(project(":instrumentation:cats-effect:cats-effect-common-3.6:javaagent"))

compileOnly("org.typelevel:cats-effect_$scalaVersion:$catsEffectVersion")

testImplementation("org.typelevel:cats-effect_$scalaVersion:$catsEffectVersion")

latestDepTestLibrary("org.typelevel:cats-effect_$scalaVersion:latest.release")
}

tasks {
withType<Test>().configureEach {
jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false")
jvmArgs("-Dcats.effect.trackFiberContext=true")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.catseffect.v3_6;

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import java.util.Arrays;
import java.util.List;
import net.bytebuddy.matcher.ElementMatcher;

@AutoService(InstrumentationModule.class)
public class CatsEffectInstrumentationModule extends InstrumentationModule
implements ExperimentalInstrumentationModule {

public CatsEffectInstrumentationModule() {
super("cats-effect", "cats-effect-3.6");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return Arrays.asList(new IoRuntimeInstrumentation(), new IoInstrumentation());
}

@Override
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
return hasClassesNamed("cats.effect.IO")
// missing before 3.6.0
.and(hasClassesNamed("cats.effect.unsafe.metrics.IORuntimeMetrics"));
}

@Override
public boolean defaultEnabled(ConfigProperties config) {
return super.defaultEnabled(config)
&& config.getBoolean("cats.effect.trackFiberContext", false);
}

// ensure it's the last one
@Override
public int order() {
return Integer.MAX_VALUE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.catseffect.v3_6;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextStorage;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.bootstrap.catseffect.v3_6.FiberLocalContextHelper;
import javax.annotation.Nullable;

public class FiberContextBridge implements ContextStorage {

private final ContextStorage agentContextStorage;

public FiberContextBridge(ContextStorage delegate) {
this.agentContextStorage = delegate;
}

@Override
public Scope attach(Context toAttach) {
if (FiberLocalContextHelper.isUnderFiberContext()) {
return FiberLocalContextHelper.attach(toAttach);
} else {
return agentContextStorage.attach(toAttach);
}
}

@Nullable
@Override
public Context current() {
if (FiberLocalContextHelper.isUnderFiberContext()) {
return FiberLocalContextHelper.current();
} else {
return agentContextStorage.current();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.catseffect.v3_6;

import com.google.auto.service.AutoService;
import io.opentelemetry.context.ContextStorage;
import io.opentelemetry.javaagent.tooling.BeforeAgentListener;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import java.util.logging.Logger;

/**
* A {@link BeforeAgentListener} that installs {@link FiberContextBridge} if `cats.effect.IO` is
* present in the classpath.
*/
@AutoService(BeforeAgentListener.class)
public class FiberContextBridgeInstaller implements BeforeAgentListener {

private static final Logger logger =
Logger.getLogger(FiberContextBridgeInstaller.class.getName());

@Override
public void beforeAgent(AutoConfiguredOpenTelemetrySdk autoConfiguredOpenTelemetrySdk) {
ContextStorage.addWrapper(FiberContextBridge::new);
logger.fine("Installed Cats Effect FiberContextBridge");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.catseffect.v3_6;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;

import application.io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.catseffect.common.v3_6.IoLocalContextSingleton;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class IoInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("cats.effect.IO");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(named("unsafeRunFiber")),
this.getClass().getName() + "$UnsafeRunFiberAdvice");
}

@SuppressWarnings("unused")
public static final class UnsafeRunFiberAdvice {
private UnsafeRunFiberAdvice() {}

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.This(readOnly = false) cats.effect.IO<?> io) {
io = IoLocalContextSingleton.ioLocal.asLocal().scope(io, Context.current());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.catseffect.v3_6;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;

import cats.effect.unsafe.IORuntime;
import io.opentelemetry.javaagent.bootstrap.catseffect.v3_6.FiberLocalContextHelper;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.catseffect.common.v3_6.IoLocalContextSingleton;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class IoRuntimeInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("cats.effect.unsafe.IORuntime");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor(), this.getClass().getName() + "$ConstructorAdvice");
}

@SuppressWarnings("unused")
public static final class ConstructorAdvice {
private ConstructorAdvice() {}

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter() {
FiberLocalContextHelper.initialize(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if you have deployed 2 wars on tomcat that use this library, won't this break? Messing with the context storage is unusual, my hunch is that this is not a good idea. Typically such instrumentations restore the otel context when fiber starts running on a thread and save the context when it stops using the thread.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a valid concern indeed.

deployed 2 wars on tomcat that use this library

If I understand correctly, each deployment (app) will have its own classloader, but the bootstrap will still be shared.
If that's the case, my implementation won't work, I'm afraid.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suppose I don't find a proper way to make the instrumentation work. Can I distribute the current implementation as a third-party extension? Can the extension have access to the bootstrap loader?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the extension have access to the bootstrap loader?

Not directly, but you could try using byte-buddy to define the class you need in boot loader or you could experiment with Instrumentation.appendToBootstrapClassLoaderSearch.

IoLocalContextSingleton.contextThreadLocal, IORuntime::isUnderFiberContext);
}
}
}
Loading
Loading