Kotlin extension: Collecting ChannelFlow can result in mismatching Contexts #7194

Closed
Zincfox opened this issue Mar 13, 2025 · 2 comments
Labels
Bug Something isn't working

Zincfox commented Mar 13, 2025

Describe the bug
When collecting a ChannelFlow on which first map and then flowOn(span.asContextElement()) were called, Context.current() still contains the span used earlier after the collect { } call completes, while coroutineContext.getOpenTelemetryContext() returns an empty context.

Given that no direct calls to makeCurrent() or close() were made, this feels like a bug in the kotlin-extension.

Steps to reproduce
The following test fails:

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.extension.kotlin.getOpenTelemetryContext
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals


class CollectChannelFlowSpanContextTest {

    val tracer by lazy {
        GlobalOpenTelemetry.getTracer("CollectChannelFlowSpanContextTest")
    }

    @Test
    fun collectChannelFlowWithSpanDoesNotLeakContext() = runTest {
        val constructedFlow = channelFlow<Nothing?> { }

        val transformedFlow = constructedFlow.map { it } // .map prevents fusion of flowOn into channelFlow

        val flowExecutionSpan = tracer.spanBuilder("flowSpan").startSpan()
        transformedFlow.flowOn(flowExecutionSpan.asContextElement()).collect { }
        flowExecutionSpan.end()

        assertEquals(coroutineContext.getOpenTelemetryContext(), Context.current())
    }
}

Both .map { it } (or a similar flowOn-fusion-blocking operator) and channelFlow are required for this to occur. Removing the call to map or replacing the channelFlow with a normal flow both result in the test passing / the correct context being returned.
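
For comparison, the two variants described above as passing can be written as control tests next to the failing one. This is a sketch based only on the description in this report; the class and test names are made up for illustration, and the outcome was not re-verified against other library versions.

```kotlin
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.extension.kotlin.getOpenTelemetryContext
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

// Hypothetical class name; mirrors the failing test with one ingredient removed each time.
class CollectFlowSpanContextControlTest {

    private val tracer by lazy {
        GlobalOpenTelemetry.getTracer("CollectFlowSpanContextControlTest")
    }

    // Variant 1: channelFlow without map, so flowOn is applied directly to the channelFlow.
    @Test
    fun channelFlowWithoutMap() = runTest {
        val span = tracer.spanBuilder("flowSpan").startSpan()
        channelFlow<Nothing?> { }.flowOn(span.asContextElement()).collect { }
        span.end()

        assertEquals(coroutineContext.getOpenTelemetryContext(), Context.current())
    }

    // Variant 2: a plain flow with map, so no channelFlow is involved.
    @Test
    fun plainFlowWithMap() = runTest {
        val span = tracer.spanBuilder("flowSpan").startSpan()
        flow<Nothing?> { }.map { it }.flowOn(span.asContextElement()).collect { }
        span.end()

        assertEquals(coroutineContext.getOpenTelemetryContext(), Context.current())
    }
}
```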

What did you expect to see?
I expected Context.current() to return the empty root context, just like coroutineContext.getOpenTelemetryContext().

What did you see instead?
The context returned by Context.current() was not empty:
org.opentest4j.AssertionFailedError: expected: <{}> but was: <{opentelemetry-trace-span-key=PropagatedSpan{ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}}}>

What version and what artifacts are you using?
Artifacts: opentelemetry-api, opentelemetry-context, opentelemetry-extension-kotlin
Version: 1.48.0
How did you reference these artifacts?

plugins {
    kotlin("jvm") version "2.1.10"
}
//[...]
dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")

    implementation(platform("io.opentelemetry:opentelemetry-bom:1.48.0"))
    implementation("io.opentelemetry:opentelemetry-api:1.48.0")
    implementation("io.opentelemetry:opentelemetry-context:1.48.0")
    implementation("io.opentelemetry:opentelemetry-extension-kotlin:1.48.0")

    testImplementation(kotlin("test"))
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1")
}
//[...]

Environment
Java-Compiler: Oracle OpenJDK 17 (javac)
Kotlin-Compiler: 2.1.10, K2
OS: Windows 10
Gradle: 8.10
IDE: IntelliJ IDEA 2024.3.4.1 (Ultimate Edition)

Additional context
The original symptom of the bug was an incorrectly attached span.
After the Flow::collect call (Flow::toSet() in my case), the next span was attached as a child of the flow's span rather than of the "parent" span covering the whole function.
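
The shape of the affected code, together with one possible way to sidestep the symptom, might look like the sketch below. It is not a confirmed fix. It assumes that the coroutine context still carries the expected OpenTelemetry context after collection (as observed in the test above) and sets the parent of the follow-up span explicitly instead of relying on Context.current(). The function name collectThenTrace and the span names are hypothetical.

```kotlin
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.extension.kotlin.getOpenTelemetryContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toSet
import kotlinx.coroutines.withContext

// Hypothetical helper: collects a flow under its own span, then starts a follow-up span.
suspend fun <T> collectThenTrace(tracer: Tracer, source: Flow<T>): Set<T> {
    val parentSpan = tracer.spanBuilder("parent").startSpan()
    try {
        return withContext(parentSpan.asContextElement()) {
            // Inside this block, coroutineContext carries the context that holds parentSpan.
            val flowSpan = tracer.spanBuilder("flowSpan")
                .setParent(coroutineContext.getOpenTelemetryContext())
                .startSpan()
            val result = try {
                source.flowOn(flowSpan.asContextElement()).toSet()
            } finally {
                flowSpan.end()
            }

            // Assumption: taking the parent from the coroutine context avoids picking up
            // a stale Context.current() left behind by the flow collection.
            val followUpSpan = tracer.spanBuilder("afterFlow")
                .setParent(coroutineContext.getOpenTelemetryContext())
                .startSpan()
            followUpSpan.end()

            result
        }
    } finally {
        parentSpan.end()
    }
}
```

The explicit setParent call trades convenience for predictability: every span started after the collection has to name its parent, but it no longer depends on the thread-local state.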

The code that led to this was part of a custom Kotlin HTTP request library to which I wanted to add first-party span data. I then simplified the code until I was left with the test above.

As part of a different project I am working on (a Kotlin Multiplatform OpenTelemetry facade), I also have a separate Kotlin implementation of the ContinuationInterceptor targeting JavaScript, which does not appear to have this problem. I will see if I can share it tomorrow, although I have not checked whether it passes your existing tests here.

@Zincfox Zincfox added the Bug Something isn't working label Mar 13, 2025
Zincfox commented Mar 13, 2025

The multiplatform project I mentioned: opentelemetry-kotlin-delegate

Zincfox commented Apr 2, 2025

Closing for now, as it seems to me that this might actually be a problem in kotlinx.coroutines itself.

@Zincfox Zincfox closed this as completed Apr 2, 2025