Skip to content

Commit

Permalink
working writer, need to stop apm writes and complete metadata writes
Browse files Browse the repository at this point in the history
  • Loading branch information
gary-huang committed Feb 19, 2025
1 parent c437b9c commit 88cbf86
Show file tree
Hide file tree
Showing 16 changed files with 640 additions and 23 deletions.
1 change: 1 addition & 0 deletions communication/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ dependencies {
implementation libs.slf4j

api project(':remote-config:remote-config-api')
implementation project(':components:json')
implementation project(':remote-config:remote-config-core')
implementation project(':internal-api')
implementation project(':utils:container-utils')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package datadog.communication.serialization.json;

import datadog.communication.serialization.EncodingCache;
import datadog.communication.serialization.Mapper;
import datadog.communication.serialization.WritableFormatter;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import java.nio.ByteBuffer;
import java.util.Map;

public class JSONWritableFormatter implements WritableFormatter {

@Override
public <T> boolean format(T message, Mapper<T> mapper) {
return false;
}

@Override
public void flush() {

}

@Override
public void writeNull() {

}

@Override
public void writeBoolean(boolean value) {

}

@Override
public void writeObject(Object value, EncodingCache encodingCache) {

}

@Override
public void writeObjectString(Object value, EncodingCache encodingCache) {

}

@Override
public void writeMap(Map<? extends CharSequence, ?> map, EncodingCache encodingCache) {

}

@Override
public void writeString(CharSequence s, EncodingCache encodingCache) {

}

@Override
public void writeUTF8(byte[] string, int offset, int length) {

}

@Override
public void writeUTF8(byte[] string) {

}

@Override
public void writeUTF8(UTF8BytesString string) {

}

@Override
public void writeBinary(byte[] binary) {

}

@Override
public void writeBinary(byte[] binary, int offset, int length) {

}

@Override
public void startMap(int elementCount) {

}

@Override
public void startStruct(int elementCount) {

}

@Override
public void startArray(int elementCount) {

}

@Override
public void writeBinary(ByteBuffer buffer) {

}

@Override
public void writeInt(int value) {

}

@Override
public void writeSignedInt(int value) {

}

@Override
public void writeLong(long value) {

}

@Override
public void writeUnsignedLong(long value) {

}

@Override
public void writeSignedLong(long value) {

}

@Override
public void writeFloat(float value) {

}

@Override
public void writeDouble(double value) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DDLLMObsSpan implements LLMObsSpan {

Expand All @@ -20,6 +22,8 @@ public class DDLLMObsSpan implements LLMObsSpan {

private boolean finished = false;

private static final Logger LOGGER = LoggerFactory.getLogger(DDLLMObsSpan.class);

public DDLLMObsSpan(
@Nonnull String kind,
@Nonnull String spanName,
Expand Down Expand Up @@ -71,6 +75,7 @@ public void annotateIO(
if (finished) {
return;
}
LOGGER.warn("ANNOTATE IN {} OUT {}", inputData, outputData);
if (inputData != null && !inputData.isEmpty()) {
this.span.setTag(LLMObsTags.LLMOBS_TAG_PREFIX + LLMObsTags.INPUT, inputData);
}
Expand Down
9 changes: 5 additions & 4 deletions dd-java-agent/instrumentation/wildfly-9/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ muzzle {
pass {
group = 'org.wildfly'
module = 'wildfly-ee'
versions = '[9.0.0.Final,)'
versions = '[9.0.0.Final,35.0.0.Final]'
excludeDependency 'org.jboss.xnio:*' // not related and causes issues with missing jar in maven repo
}
}
Expand Down Expand Up @@ -75,9 +75,10 @@ dependencies {
wildflyLatestPoll group: 'org.wildfly', name: 'wildfly-dist', version: '+'

configurations.wildflyLatestPoll.resolve()
def latestWildflyVersion = configurations.wildflyLatestPoll.resolvedConfiguration.getResolvedArtifacts().find {
it.name == "wildfly-dist"
}.moduleVersion.id.version
// def latestWildflyVersion = configurations.wildflyLatestPoll.resolvedConfiguration.getResolvedArtifacts().find {
// it.name == "wildfly-dist"
// }.moduleVersion.id.version
def latestWildflyVersion = "35.0.0.Final"
wildflyLatestDepTest "wildfly:wildfly:$latestWildflyVersion@zip"
latestDepForkedTest {
configure {
Expand Down
5 changes: 5 additions & 0 deletions dd-trace-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ dependencies {
implementation project(':components:json')
implementation project(':utils:container-utils')
implementation project(':utils:socket-utils')

implementation group: 'org.msgpack', name: 'msgpack-core', version: '0.8.10'
implementation group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.10'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.10.0'

// for span exception debugging
compileOnly project(':dd-java-agent:agent-debugger:debugger-bootstrap')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DDIntakeWriter extends RemoteWriter {

Expand Down Expand Up @@ -40,6 +42,8 @@ public static class DDIntakeWriterBuilder {

private SingleSpanSampler singleSpanSampler;

private static final Logger log = LoggerFactory.getLogger(DDIntakeWriterBuilder.class);

public DDIntakeWriterBuilder addTrack(final TrackType trackType, final RemoteApi intakeApi) {
tracks.put(trackType, intakeApi);
return this;
Expand Down Expand Up @@ -98,6 +102,7 @@ public DDIntakeWriterBuilder singleSpanSampler(SingleSpanSampler singleSpanSampl
}

public DDIntakeWriter build() {
log.debug("DDINTAKEWRITER TRACKS {}", tracks);
if (tracks.isEmpty()) {
throw new IllegalArgumentException("At least one track needs to be configured");
}
Expand All @@ -112,6 +117,7 @@ public DDIntakeWriter build() {
.toArray(PayloadDispatcher[]::new);
dispatcher = new CompositePayloadDispatcher(dispatchers);
}
log.debug("DISPATCHER {}", dispatcher);

final TraceProcessingWorker traceProcessingWorker =
new TraceProcessingWorker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ public static Writer createWriter(
final Sampler sampler,
final SingleSpanSampler singleSpanSampler,
final HealthMetrics healthMetrics) {
return createWriter(
Writer w = createWriter(
config, commObjects, sampler, singleSpanSampler, healthMetrics, config.getWriterType());
return w;
}

public static Writer createWriter(
Expand All @@ -51,14 +52,20 @@ public static Writer createWriter(
final HealthMetrics healthMetrics,
String configuredType) {

log.debug("START CREATE WRITER {}", configuredType);

if (LOGGING_WRITER_TYPE.equals(configuredType)) {
log.debug("STARTED WRITER LOGGING");
return new LoggingWriter();
} else if (PRINTING_WRITER_TYPE.equals(configuredType)) {
log.debug("STARTED WRITER PRINTING");
return new PrintingWriter(System.out, true);
} else if (configuredType.startsWith(TRACE_STRUCTURE_WRITER_TYPE)) {
log.debug("STARTED WRITER TRACE STRCT");
return new TraceStructureWriter(
Strings.replace(configuredType, TRACE_STRUCTURE_WRITER_TYPE, ""));
} else if (configuredType.startsWith(MULTI_WRITER_TYPE)) {
log.debug("STARTED WRITER MULTI");
return new MultiWriter(
config, commObjects, sampler, singleSpanSampler, healthMetrics, configuredType);
}
Expand All @@ -82,8 +89,9 @@ public static Writer createWriter(

// The AgentWriter doesn't support the CI Visibility protocol. If CI Visibility is
// enabled, check if we can use the IntakeWriter instead.
if (DD_AGENT_WRITER_TYPE.equals(configuredType) && config.isCiVisibilityEnabled()) {
if (featuresDiscovery.supportsEvpProxy() || config.isCiVisibilityAgentlessEnabled()) {
if (DD_AGENT_WRITER_TYPE.equals(configuredType) && (config.isCiVisibilityEnabled() || config.isLlmObsEnabled())) {
log.info("SUPPORTS EVP PROXY {}", featuresDiscovery.supportsEvpProxy());
if (featuresDiscovery.supportsEvpProxy() || config.isCiVisibilityAgentlessEnabled() || config.isLlmObsAgentlessEnabled()) {
configuredType = DD_INTAKE_WRITER_TYPE;
} else {
log.info(
Expand Down Expand Up @@ -116,6 +124,13 @@ public static Writer createWriter(
builder.addTrack(TrackType.CITESTCOV, coverageApi);
}

log.debug("BEFORE ADDING LLM OBSERVER");
// if (config.isLlmObsEnabled() && config.isLlmObsAgentlessEnabled()) {
final RemoteApi llmobsApi = createDDIntakeRemoteApi(config, commObjects, featuresDiscovery, TrackType.LLMOBS);
builder.addTrack(TrackType.LLMOBS, llmobsApi);
log.debug("ADDED LLM OBSERVER");


remoteWriter = builder.build();

} else { // configuredType == DDAgentWriter
Expand Down Expand Up @@ -171,26 +186,34 @@ private static RemoteApi createDDIntakeRemoteApi(
SharedCommunicationObjects commObjects,
DDAgentFeaturesDiscovery featuresDiscovery,
TrackType trackType) {
if (featuresDiscovery.supportsEvpProxy() && !config.isCiVisibilityAgentlessEnabled()) {
// TODO make it so that it is agentless for the requested product and not both
if (featuresDiscovery.supportsEvpProxy() && !config.isCiVisibilityAgentlessEnabled() && !config.isLlmObsAgentlessEnabled()) {
return DDEvpProxyApi.builder()
.httpClient(commObjects.okHttpClient)
.agentUrl(commObjects.agentUrl)
.evpProxyEndpoint(featuresDiscovery.getEvpProxyEndpoint())
.trackType(trackType)
.compressionEnabled(featuresDiscovery.supportsContentEncodingHeadersWithEvpProxy())
.build();

} else {
HttpUrl hostUrl = null;
String llmObsAgentlessUrl = config.getLlMObsAgentlessUrl();
log.debug("LLMOBS URL {}", llmObsAgentlessUrl);

if (config.getCiVisibilityAgentlessUrl() != null) {
hostUrl = HttpUrl.get(config.getCiVisibilityAgentlessUrl());
log.info("Using host URL '{}' to report CI Visibility traces in Agentless mode.", hostUrl);
} else if (config.isLlmObsEnabled() && config.isLlmObsAgentlessEnabled() && llmObsAgentlessUrl != null && !llmObsAgentlessUrl.isEmpty()) {
hostUrl = HttpUrl.get(llmObsAgentlessUrl);
log.info("Using host URL '{}' to report LLM Obs traces in Agentless mode.", hostUrl);
}
return DDIntakeApi.builder()
RemoteApi ddintake = DDIntakeApi.builder()
.hostUrl(hostUrl)
.apiKey(config.getApiKey())
.trackType(trackType)
.build();
log.debug("CREATED DD INTAKE for track {} {}", trackType.name(), ddintake);
return ddintake;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ private DDEvpProxyApi(
public Response sendSerializedTraces(Payload payload) {
final int sizeInBytes = payload.sizeInBytes();

log.debug("SENDING PL TO TRACK {}", trackType);
Request.Builder builder =
new Request.Builder()
.url(proxiedApiUrl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.net.ConnectException;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
Expand Down Expand Up @@ -121,16 +122,32 @@ private DDIntakeApi(

@Override
public Response sendSerializedTraces(Payload payload) {
log.debug("DDINTAKE SENDING {} for track {}", payload, trackType);
final int sizeInBytes = payload.sizeInBytes();

final Request request =
new Request.Builder()
.url(intakeUrl)
.addHeader(DD_API_KEY_HEADER, apiKey)
.addHeader(CONTENT_ENCODING_HEADER, GZIP_CONTENT_TYPE)
.post(payload.toRequest())
.tag(OkHttpUtils.CustomListener.class, telemetryListener)
.build();
Request.Builder builder = new Request.Builder()
.url(intakeUrl)
.addHeader(DD_API_KEY_HEADER, apiKey)
.post(payload.toRequest())
.tag(OkHttpUtils.CustomListener.class, telemetryListener);

// if (!trackType.equals(TrackType.LLMOBS)) {
// builder.addHeader(CONTENT_ENCODING_HEADER, GZIP_CONTENT_TYPE);
// }
builder.addHeader(CONTENT_ENCODING_HEADER, GZIP_CONTENT_TYPE);
builder.addHeader("content-type", payload.toRequest().contentType().toString());

final Request request = builder.build();
Headers headers = request.headers();
log.warn("HEADER SIZE {}", headers.size());
for (int i = 0; i < headers.size(); i++) {
String name = headers.name(i);
String value = headers.value(i);
if (name != null && !name.equals(DD_API_KEY_HEADER)) {
log.warn("HEADER {} KEY {} VAL {}", i, name, value);
}
}

totalTraces += payload.traceCount();
receivedTraces += payload.traceCount();

Expand All @@ -143,6 +160,7 @@ public Response sendSerializedTraces(Payload payload) {
InstrumentationBridge.getMetricCollector()
.add(CiVisibilityCountMetric.ENDPOINT_PAYLOAD_DROPPED, 1, trackType.endpoint);
countAndLogFailedSend(payload.traceCount(), sizeInBytes, response, null);
log.error("FAILED TO SEND FOR TRACK {}", trackType);
return Response.failed(response.code());
}

Expand Down
Loading

0 comments on commit 88cbf86

Please sign in to comment.