Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument jdbc batch queries #12797

Merged
merged 12 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package io.opentelemetry.instrumentation.api.incubator.semconv.db;

import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.internal.SemconvStability;
import java.util.Collection;

public abstract class DbClientSpanNameExtractor<REQUEST> implements SpanNameExtractor<REQUEST> {

Expand Down Expand Up @@ -94,9 +96,40 @@ private SqlClientSpanNameExtractor(SqlClientAttributesGetter<REQUEST> getter) {
@Override
public String extract(REQUEST request) {
String namespace = getter.getDbNamespace(request);
SqlStatementInfo sanitizedStatement = sanitizer.sanitize(getter.getRawQueryText(request));
Collection<String> rawQueryTexts = getter.getRawQueryTexts(request);

if (rawQueryTexts.isEmpty()) {
return computeSpanName(namespace, null, null);
}

if (!SemconvStability.emitStableDatabaseSemconv()) {
if (rawQueryTexts.size() > 1) { // for backcompat(?)
return computeSpanName(namespace, null, null);
}
SqlStatementInfo sanitizedStatement = sanitizer.sanitize(rawQueryTexts.iterator().next());
return computeSpanName(
namespace, sanitizedStatement.getOperation(), sanitizedStatement.getMainIdentifier());
}

if (rawQueryTexts.size() == 1) {
SqlStatementInfo sanitizedStatement = sanitizer.sanitize(rawQueryTexts.iterator().next());
String operation = sanitizedStatement.getOperation();
if (isBatch(request)) {
operation = "BATCH " + operation;
}
return computeSpanName(namespace, operation, sanitizedStatement.getMainIdentifier());
}

MultiQuery multiQuery = MultiQuery.analyze(rawQueryTexts, false);
return computeSpanName(
namespace, sanitizedStatement.getOperation(), sanitizedStatement.getMainIdentifier());
namespace,
multiQuery.getOperation() != null ? "BATCH " + multiQuery.getOperation() : "BATCH",
multiQuery.getMainIdentifier());
}

private boolean isBatch(REQUEST request) {
Long batchSize = getter.getBatchSize(request);
return batchSize != null && batchSize > 1;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.incubator.semconv.db;

import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

class MultiQuery {
private static final SqlStatementSanitizer sanitizer = SqlStatementSanitizer.create(true);

private final String mainIdentifier;
private final String operation;
private final Set<String> statements;

private MultiQuery(String mainIdentifier, String operation, Set<String> statements) {
this.mainIdentifier = mainIdentifier;
this.operation = operation;
this.statements = statements;
}

static MultiQuery analyze(
Collection<String> rawQueryTexts, boolean statementSanitizationEnabled) {
UniqueValue uniqueMainIdentifier = new UniqueValue();
UniqueValue uniqueOperation = new UniqueValue();
Set<String> uniqueStatements = new LinkedHashSet<>();
for (String rawQueryText : rawQueryTexts) {
SqlStatementInfo sanitizedStatement = sanitizer.sanitize(rawQueryText);
String mainIdentifier = sanitizedStatement.getMainIdentifier();
uniqueMainIdentifier.set(mainIdentifier);
String operation = sanitizedStatement.getOperation();
uniqueOperation.set(operation);
uniqueStatements.add(
statementSanitizationEnabled ? sanitizedStatement.getFullStatement() : rawQueryText);
}

return new MultiQuery(
uniqueMainIdentifier.getValue(), uniqueOperation.getValue(), uniqueStatements);
}

public String getMainIdentifier() {
return mainIdentifier;
}

public String getOperation() {
return operation;
}

public Set<String> getStatements() {
return statements;
}

private static class UniqueValue {
private String value;
private boolean valid = true;

void set(String value) {
if (!valid) {
return;
}
if (this.value == null) {
this.value = value;
} else if (!this.value.equals(value)) {
valid = false;
}
}

String getValue() {
return valid ? value : null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.internal.SemconvStability;
import java.util.Collection;

/**
* Extractor of <a
Expand All @@ -35,6 +36,8 @@ public final class SqlClientAttributesExtractor<REQUEST, RESPONSE>
private static final AttributeKey<String> DB_QUERY_TEXT = AttributeKey.stringKey("db.query.text");
static final AttributeKey<String> DB_COLLECTION_NAME =
AttributeKey.stringKey("db.collection.name");
private static final AttributeKey<Long> DB_OPERATION_BATCH_SIZE =
AttributeKey.longKey("db.operation.batch.size");

/** Creates the SQL client attributes extractor with default configuration. */
public static <REQUEST, RESPONSE> AttributesExtractor<REQUEST, RESPONSE> create(
Expand All @@ -52,7 +55,7 @@ public static <REQUEST, RESPONSE> SqlClientAttributesExtractorBuilder<REQUEST, R
}

private static final String SQL_CALL = "CALL";
// sanitizer is also used to extract operation and table name, so we have it always enable here
// sanitizer is also used to extract operation and table name, so we have it always enabled here
private static final SqlStatementSanitizer sanitizer = SqlStatementSanitizer.create(true);

private final AttributeKey<String> oldSemconvTableAttribute;
Expand All @@ -71,30 +74,72 @@ public static <REQUEST, RESPONSE> SqlClientAttributesExtractorBuilder<REQUEST, R
public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST request) {
super.onStart(attributes, parentContext, request);

String rawQueryText = getter.getRawQueryText(request);
SqlStatementInfo sanitizedStatement = sanitizer.sanitize(rawQueryText);
String operation = sanitizedStatement.getOperation();
if (SemconvStability.emitStableDatabaseSemconv()) {
internalSet(
attributes,
DB_QUERY_TEXT,
statementSanitizationEnabled ? sanitizedStatement.getFullStatement() : rawQueryText);
internalSet(attributes, DB_OPERATION_NAME, operation);
Collection<String> rawQueryTexts = getter.getRawQueryTexts(request);

if (rawQueryTexts.isEmpty()) {
return;
}

if (SemconvStability.emitOldDatabaseSemconv()) {
internalSet(
attributes,
DB_STATEMENT,
statementSanitizationEnabled ? sanitizedStatement.getFullStatement() : rawQueryText);
internalSet(attributes, DB_OPERATION, operation);
if (rawQueryTexts.size() == 1) { // for backcompat(?)
String rawQueryText = rawQueryTexts.iterator().next();
SqlStatementInfo sanitizedStatement = sanitizer.sanitize(rawQueryText);
String operation = sanitizedStatement.getOperation();
internalSet(
attributes,
DB_STATEMENT,
statementSanitizationEnabled ? sanitizedStatement.getFullStatement() : rawQueryText);
internalSet(attributes, DB_OPERATION, operation);
if (!SQL_CALL.equals(operation)) {
internalSet(attributes, oldSemconvTableAttribute, sanitizedStatement.getMainIdentifier());
}
}
}
if (!SQL_CALL.equals(operation)) {
if (SemconvStability.emitStableDatabaseSemconv()) {
internalSet(attributes, DB_COLLECTION_NAME, sanitizedStatement.getMainIdentifier());

if (SemconvStability.emitStableDatabaseSemconv()) {
Long batchSize = getter.getBatchSize(request);
boolean isBatch = batchSize != null && batchSize > 1;
if (isBatch) {
internalSet(attributes, DB_OPERATION_BATCH_SIZE, batchSize);
}
if (SemconvStability.emitOldDatabaseSemconv()) {
internalSet(attributes, oldSemconvTableAttribute, sanitizedStatement.getMainIdentifier());
if (rawQueryTexts.size() == 1) {
String rawQueryText = rawQueryTexts.iterator().next();
SqlStatementInfo sanitizedStatement = sanitizer.sanitize(rawQueryText);
String operation = sanitizedStatement.getOperation();
internalSet(
attributes,
DB_QUERY_TEXT,
statementSanitizationEnabled ? sanitizedStatement.getFullStatement() : rawQueryText);
internalSet(attributes, DB_OPERATION_NAME, isBatch ? "BATCH " + operation : operation);
if (!SQL_CALL.equals(operation)) {
internalSet(attributes, DB_COLLECTION_NAME, sanitizedStatement.getMainIdentifier());
}
} else {
MultiQuery multiQuery =
MultiQuery.analyze(getter.getRawQueryTexts(request), statementSanitizationEnabled);
internalSet(attributes, DB_QUERY_TEXT, join("; ", multiQuery.getStatements()));

String operation =
multiQuery.getOperation() != null ? "BATCH " + multiQuery.getOperation() : "BATCH";
internalSet(attributes, DB_OPERATION_NAME, operation);

if (multiQuery.getMainIdentifier() != null
&& (multiQuery.getOperation() == null || !SQL_CALL.equals(multiQuery.getOperation()))) {
internalSet(attributes, DB_COLLECTION_NAME, multiQuery.getMainIdentifier());
}
}
}
}

// String.join is not available on android
private static String join(String delimiter, Collection<String> collection) {
StringBuilder builder = new StringBuilder();
for (String string : collection) {
if (builder.length() != 0) {
builder.append(delimiter);
}
builder.append(string);
}
return builder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

package io.opentelemetry.instrumentation.api.incubator.semconv.db;

import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;

import java.util.Collection;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -33,9 +37,33 @@ default String getRawStatement(REQUEST request) {
return null;
}

// TODO: make this required to implement
/**
* Get the raw SQL query text. The value returned by this method is later sanitized by the {@link
* SqlClientAttributesExtractor} before being set as span attribute.
*
* @deprecated Use {@link #getRawQueryTexts(Object)} instead.
*/
@Deprecated
@Nullable
default String getRawQueryText(REQUEST request) {
return getRawStatement(request);
}

/**
* Get the raw SQL query texts. The values returned by this method is later sanitized by the
* {@link SqlClientAttributesExtractor} before being set as span attribute.
*
* <p>If {@code request} is not a batch query, then this method should return a collection with a
* single element.
*/
// TODO: make this required to implement
default Collection<String> getRawQueryTexts(REQUEST request) {
String rawQueryText = getRawQueryText(request);
return rawQueryText == null ? emptySet() : singleton(rawQueryText);
}

// TODO: make this required to implement
default Long getBatchSize(REQUEST request) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

package io.opentelemetry.instrumentation.api.incubator.semconv.db;

import static java.util.Collections.singleton;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;

import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.internal.SemconvStability;
import java.util.Arrays;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand All @@ -24,7 +27,8 @@ void shouldExtractFullSpanName() {
// given
DbRequest dbRequest = new DbRequest();

when(sqlAttributesGetter.getRawQueryText(dbRequest)).thenReturn("SELECT * from table");
when(sqlAttributesGetter.getRawQueryTexts(dbRequest))
.thenReturn(singleton("SELECT * from table"));
when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");

SpanNameExtractor<DbRequest> underTest = DbClientSpanNameExtractor.create(sqlAttributesGetter);
Expand All @@ -41,7 +45,8 @@ void shouldSkipDbNameIfTableAlreadyHasDbNamePrefix() {
// given
DbRequest dbRequest = new DbRequest();

when(sqlAttributesGetter.getRawQueryText(dbRequest)).thenReturn("SELECT * from another.table");
when(sqlAttributesGetter.getRawQueryTexts(dbRequest))
.thenReturn(singleton("SELECT * from another.table"));
when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");

SpanNameExtractor<DbRequest> underTest = DbClientSpanNameExtractor.create(sqlAttributesGetter);
Expand All @@ -58,7 +63,8 @@ void shouldExtractOperationAndTable() {
// given
DbRequest dbRequest = new DbRequest();

when(sqlAttributesGetter.getRawQueryText(dbRequest)).thenReturn("SELECT * from table");
when(sqlAttributesGetter.getRawQueryTexts(dbRequest))
.thenReturn(singleton("SELECT * from table"));

SpanNameExtractor<DbRequest> underTest = DbClientSpanNameExtractor.create(sqlAttributesGetter);

Expand Down Expand Up @@ -132,5 +138,50 @@ void shouldFallBackToDefaultSpanName() {
assertEquals("DB Query", spanName);
}

@Test
void shouldExtractFullSpanNameForBatch() {
// given
DbRequest dbRequest = new DbRequest();

when(sqlAttributesGetter.getRawQueryTexts(dbRequest))
.thenReturn(Arrays.asList("INSERT INTO table VALUES(1)", "INSERT INTO table VALUES(2)"));
when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");

SpanNameExtractor<DbRequest> underTest = DbClientSpanNameExtractor.create(sqlAttributesGetter);

// when
String spanName = underTest.extract(dbRequest);

// then
assertEquals(
SemconvStability.emitStableDatabaseSemconv() ? "BATCH INSERT database.table" : "database",
spanName);
}

@Test
void shouldExtractFullSpanNameForSingleQueryBatch() {
// given
DbRequest dbRequest = new DbRequest();

when(sqlAttributesGetter.getRawQueryTexts(dbRequest))
.thenReturn(singleton("INSERT INTO table VALUES(?)"));
when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");
if (SemconvStability.emitStableDatabaseSemconv()) {
when(sqlAttributesGetter.getBatchSize(dbRequest)).thenReturn(2L);
}

SpanNameExtractor<DbRequest> underTest = DbClientSpanNameExtractor.create(sqlAttributesGetter);

// when
String spanName = underTest.extract(dbRequest);

// then
assertEquals(
SemconvStability.emitStableDatabaseSemconv()
? "BATCH INSERT database.table"
: "INSERT database.table",
spanName);
}

static class DbRequest {}
}
Loading
Loading