diff --git a/build.sbt b/build.sbt index 384d58971..567c4925c 100644 --- a/build.sbt +++ b/build.sbt @@ -550,6 +550,7 @@ lazy val `kamon-redis` = (project in file("instrumentation/kamon-redis")) "redis.clients" % "jedis" % "3.6.0" % "provided", "io.lettuce" % "lettuce-core" % "6.1.2.RELEASE" % "provided", "com.github.etaty" %% "rediscala" % "1.9.0" % "provided", + "org.redisson" % "redisson" % "3.11.6" % "provided", scalatest % "test", logbackClassic % "test", diff --git a/instrumentation/kamon-redis/src/main/resources/reference.conf b/instrumentation/kamon-redis/src/main/resources/reference.conf index 6cd5ee97c..1ab3cb9bf 100644 --- a/instrumentation/kamon-redis/src/main/resources/reference.conf +++ b/instrumentation/kamon-redis/src/main/resources/reference.conf @@ -1,18 +1,20 @@ kanela.modules { redis { name = "Redis Instrumentation" - description = "Provides tracing for Jedis, Lettuce and Rediscala libraries" + description = "Provides tracing for Jedis, Lettuce, Rediscala and Redisson libraries" instrumentations = [ "kamon.instrumentation.jedis.JedisInstrumentation", "kamon.instrumentation.lettuce.LettuceInstrumentation", "kamon.instrumentation.rediscala.RediscalaInstrumentation", + "kamon.instrumentation.redisson.RedissonInstrumentation", ] within = [ "redis.clients.jedis..*", "io.lettuce.core..*", "redis..*", + "org.redisson..*", ] } } diff --git a/instrumentation/kamon-redis/src/main/scala/kamon/instrumentation/redisson/RedissonInstrumentation.scala b/instrumentation/kamon-redis/src/main/scala/kamon/instrumentation/redisson/RedissonInstrumentation.scala new file mode 100644 index 000000000..ef442b165 --- /dev/null +++ b/instrumentation/kamon-redis/src/main/scala/kamon/instrumentation/redisson/RedissonInstrumentation.scala @@ -0,0 +1,68 @@ +package kamon.instrumentation.redisson + +import io.netty.buffer.ByteBuf +import io.netty.channel.{ChannelFuture, ChannelFutureListener} +import kamon.Kamon +import kamon.trace.Span +import kanela.agent.api.instrumentation.InstrumentationBuilder +import kanela.agent.libs.net.bytebuddy.asm.Advice +import org.redisson.client.protocol.{CommandData, CommandsData} + +import scala.collection.JavaConverters._ + +class RedissonInstrumentation extends InstrumentationBuilder { + + onType("org.redisson.client.RedisConnection") + .advise(method("send").and(takesArguments(1)), classOf[RedisConnectionInstrumentation]) + +} + +class RedisConnectionInstrumentation + +object RedisConnectionInstrumentation { + @Advice.OnMethodEnter() + def enter(@Advice.Argument(0) command: Any): Span = { + + def parseCommand(command: CommandData[_, _]): String = { + command.getParams.map { + case _: ByteBuf => "" + case bytes => String.valueOf(bytes) + }.filter(_.nonEmpty).mkString(" ") + } + + val (commandName, statements) = command match { + case commands: CommandsData => + val spanName = "redis.command.batch_execute" + val statements = commands.getCommands.asScala.map(parseCommand).mkString(",") + (spanName, statements) + case command: CommandData[_, _] => + val spanName = s"redis.command.${command.getCommand.getName}" + val statement = parseCommand(command) + (spanName, statement) + } + + Kamon.clientSpanBuilder(commandName, "redisson") + .tag("db.statement", statements) + .tagMetrics("db.system", "redis") + .start() + } + + @Advice.OnMethodExit(onThrowable = classOf[Throwable], suppress = classOf[Throwable]) + def exit(@Advice.Enter span: Span, + @Advice.Thrown t: Throwable, + @Advice.Return future: ChannelFuture) = { + if (t != null) { + span.fail(t) + } + future.addListener(new ChannelFutureListener() { + override def operationComplete(future: ChannelFuture): Unit = { + if (future.isSuccess) { + span.finish() + } else { + span.fail(future.cause()) + span.finish() + } + } + }) + } +} \ No newline at end of file diff --git a/instrumentation/kamon-redis/src/test/scala/kamon/instrumentation/combined/RedisInstrumentationsSpec.scala b/instrumentation/kamon-redis/src/test/scala/kamon/instrumentation/combined/RedisInstrumentationsSpec.scala index 62e037875..d1845af1d 100644 --- a/instrumentation/kamon-redis/src/test/scala/kamon/instrumentation/combined/RedisInstrumentationsSpec.scala +++ b/instrumentation/kamon-redis/src/test/scala/kamon/instrumentation/combined/RedisInstrumentationsSpec.scala @@ -6,6 +6,9 @@ import kamon.tag.Lookups import kamon.tag.Lookups._ import kamon.testkit.{InitAndStopKamonAfterAll, MetricInspection, TestSpanReporter} import kamon.trace.Span.Kind +import org.redisson.Redisson +import org.redisson.api.RBloomFilter +import org.redisson.config.Config import org.scalatest.concurrent.{Eventually, ScalaFutures} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -129,6 +132,82 @@ class RedisInstrumentationsSpec extends AnyWordSpec } } + "the Redisson instrumentation" should { + "generate a client span for set operation" in { + val config: Config = new Config() + config.useSingleServer().setAddress(s"redis://${container.getHost}:${container.getFirstMappedPort}") + val redisson = Redisson.create(config) + + val mySet = redisson.getSet[String]("redisson:foo") + mySet.add("bar") + + eventually(timeout(30.seconds)) { + val span = testSpanReporter().nextSpan().get + span.kind shouldBe Kind.Client + span.operationName shouldBe "redis.command.SADD" + span.metricTags.get(plain("db.system")) shouldBe "redis" + testSpanReporter().spans() shouldBe empty + testSpanReporter().clear() + } + redisson.shutdown() + } + + "generate a client span for blocking queue operation" in { + val config: Config = new Config() + config.useSingleServer().setAddress(s"redis://${container.getHost}:${container.getFirstMappedPort}") + val redisson = Redisson.create(config) + + val queue = redisson.getBlockingQueue[String]("myQueue") + queue.add("1") + queue.add("2") + queue.add("3") + queue.add("4") + + queue.contains("1") + queue.peek() + queue.poll() + queue.element() + + eventually(timeout(30.seconds)) { + val span = testSpanReporter().nextSpan().get + span.kind shouldBe Kind.Client + span.metricTags.get(plain("db.system")) shouldBe "redis" + testSpanReporter().spans() shouldBe empty + testSpanReporter().clear() + } + redisson.shutdown() + } + + "generate a client span for bloom filter operation" in { + val config: Config = new Config() + config.useSingleServer().setAddress(s"redis://${container.getHost}:${container.getFirstMappedPort}") + val redisson = Redisson.create(config) + + val bloomFilter: RBloomFilter[String] = redisson.getBloomFilter("bloomFilter") + bloomFilter.tryInit(100000000, 0.03) + + bloomFilter.add("a") + bloomFilter.add("b") + bloomFilter.add("c") + bloomFilter.add("d") + + bloomFilter.getExpectedInsertions + bloomFilter.getFalseProbability + bloomFilter.getHashIterations + + bloomFilter.contains("a") + + eventually(timeout(30.seconds)) { + val span = testSpanReporter().nextSpan().get + span.kind shouldBe Kind.Client + span.metricTags.get(plain("db.system")) shouldBe "redis" + testSpanReporter().spans() shouldBe empty + testSpanReporter().clear() + } + redisson.shutdown() + } + } + "the Rediscala instrumentation" should { implicit val akkaSystem = akka.actor.ActorSystem() "generate only one client span for commands" in {