Fix KafkaJS instrumentation of orphaned promise #5270

Open. Wants to merge 1 commit into base: master

@mastermatt mastermatt commented Feb 13, 2025

What does this PR do?

Refactors how the KafkaJS instrumentation queries the cluster ID, so that unhandled rejections can't occur when the underlying client is unable to connect to the broker.

Motivation

Bug added in #4808, released under 5.25.0
Similar in effect to the bug which was resolved with #4112

The logic that added the clusterId to publish and consume spans created orphaned promises in the producer and consumer methods. Those promises are not awaited again until the send or run method is called, respectively. If the admin methods fail before those secondary methods are called, the rejection goes unhandled, which by default exits the process.
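
To make the failure mode concrete, here is a minimal sketch of the orphaned-promise pattern and one possible way to defuse it. It is not the diff in this PR and not the actual instrumentation code: `failingDescribeCluster`, `createProducerOrphaned`, and `createProducerSafe` are hypothetical names used only for illustration, and the admin call is replaced by a stub that always rejects.

```javascript
// Hypothetical stand-in for an admin call made while no broker is reachable.
const failingDescribeCluster = () => Promise.reject(new Error('broker unreachable'))

// Orphaned pattern: the promise is created in a synchronous factory and only
// awaited later, inside send(). If it rejects before send() is called, nothing
// is listening and Node reports an unhandled rejection.
function createProducerOrphaned (describeCluster) {
  const clusterIdPromise = describeCluster().then(res => res.clusterId)
  return {
    send: async () => ({ clusterId: await clusterIdPromise })
  }
}

// One possible alternative (an assumption, not necessarily what this PR does):
// turn a rejection into a fallback value when the promise is created, so the
// stored promise can never reject on its own.
function createProducerSafe (describeCluster) {
  const clusterIdPromise = describeCluster()
    .then(res => res.clusterId)
    .catch(() => undefined)
  return {
    send: async () => ({ clusterId: await clusterIdPromise })
  }
}

// Usage: the safe variant still resolves when the lookup fails;
// clusterId is simply undefined in the result.
createProducerSafe(failingDescribeCluster).send().then(result => {
  console.log('clusterId:', result.clusterId)
})
```

Calling `createProducerOrphaned(failingDescribeCluster)` without a later `send()` is enough to trigger the `unhandledRejection` event, which mirrors the stack trace from the synchronous `producer()` call in the reproduction below.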

Plugin Checklist

Additional Notes

Reproduction:

require('dd-trace/init')

const { Kafka } = require('kafkajs')
const { config } = require('@/config')

process.on('unhandledRejection', err => {
  console.error('unhandledRejection', err)
  throw err
})

const main = async () => {
  const producer = new Kafka({
    ...config.KAFKA,
    retry: { retries: 0 },
  }).producer()

  await producer.connect() // the error thrown here will be caught safely
  await producer.send({
    messages: [{ value: 'hello world' }],
    topic: 'my.topic',
  })
}

main().catch(err => {
  console.error('boom', err)
})

If the above file is run when no Kafka broker is running or accessible, the expected behavior is for connect() to time out and throw an error, which is then caught and logged at the bottom with a "boom" message.

The current behavior logs the expected error from the rejected connect(). However, there is also an unhandled rejection log with a stack trace that originates from the synchronous producer() call.

I'd like to call out that this file has exhibited the same bug twice in the last year, and there doesn't seem to be any test file for the KafkaJS instrumentation.
I'm willing to add tests to go along with this PR, but I'd like guidance on test strategy from the DD team.
The PR which added the bug only updated a test suite in datadog-plugin-kafkajs, and only covered happy paths. It's unclear to me whether any new tests should live in datadog-plugin-kafkajs or datadog-instrumentations/test.

cc: @wconti27 @bengl @juan-fernandez

@mastermatt mastermatt requested review from a team as code owners February 13, 2025 20:54