From a8d5e92b45b2cef7a0e49b3b35746aac680de1af Mon Sep 17 00:00:00 2001 From: Dustin Long Date: Fri, 17 Oct 2025 13:28:58 -0400 Subject: [PATCH] Diagnose command only sends to softinv endpoint when it is enabled (cherry picked from commit 8367c752144b9dd65468f2496991336c5faf90b7) --- .../eventplatformimpl/epforwarder.go | 402 +++++++++--------- 1 file changed, 205 insertions(+), 197 deletions(-) diff --git a/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go b/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go index 421580a85c2fc4..6503f19cf8f79d 100644 --- a/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go +++ b/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go @@ -53,201 +53,209 @@ const ( eventTypeDBMHealth = "dbm-health" ) -var passthroughPipelineDescs = []passthroughPipelineDesc{ - { - eventType: eventTypeDBMSamples, - category: "DBM", - contentType: logshttp.JSONContentType, - endpointsConfigPrefix: "database_monitoring.samples.", - hostnameEndpointPrefix: "dbm-metrics-intake.", - intakeTrackType: "databasequery", - // raise the default batch_max_concurrent_send from 0 to 10 to ensure this pipeline is able to handle 4k events/s - defaultBatchMaxConcurrentSend: 10, - defaultBatchMaxContentSize: 10e6, - defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, - // High input chan size is needed to handle high number of DBM events being flushed by DBM integrations - defaultInputChanSize: 500, - }, - { - eventType: eventTypeDBMMetrics, - category: "DBM", - contentType: logshttp.JSONContentType, - endpointsConfigPrefix: "database_monitoring.metrics.", - hostnameEndpointPrefix: "dbm-metrics-intake.", - intakeTrackType: "dbmmetrics", - // raise the default batch_max_concurrent_send from 0 to 10 to ensure this pipeline is able to handle 4k events/s - defaultBatchMaxConcurrentSend: 10, - defaultBatchMaxContentSize: 20e6, - defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, - // High input chan size is needed to handle high number of DBM events being flushed by DBM integrations - defaultInputChanSize: 500, - }, - { - eventType: eventTypeDBMMetadata, - contentType: logshttp.JSONContentType, - // set the endpoint config to "metrics" since metadata will hit the same endpoint - // as metrics, so there is no need to add an extra config endpoint. - // As a follow-on PR, we should clean this up to have a single config for each track type since - // all of our data now flows through the same intake - endpointsConfigPrefix: "database_monitoring.metrics.", - hostnameEndpointPrefix: "dbm-metrics-intake.", - intakeTrackType: "dbmmetadata", - // raise the default batch_max_concurrent_send from 0 to 10 to ensure this pipeline is able to handle 4k events/s - defaultBatchMaxConcurrentSend: 10, - defaultBatchMaxContentSize: 20e6, - defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, - // High input chan size is needed to handle high number of DBM events being flushed by DBM integrations - defaultInputChanSize: 500, - }, - { - eventType: eventTypeDBMActivity, - category: "DBM", - contentType: logshttp.JSONContentType, - endpointsConfigPrefix: "database_monitoring.activity.", - hostnameEndpointPrefix: "dbm-metrics-intake.", - intakeTrackType: "dbmactivity", - // raise the default batch_max_concurrent_send from 0 to 10 to ensure this pipeline is able to handle 4k events/s - defaultBatchMaxConcurrentSend: 10, - defaultBatchMaxContentSize: 20e6, - defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, - // High input chan size is needed to handle high number of DBM events being flushed by DBM integrations - defaultInputChanSize: 500, - }, - { - eventType: eventTypeDBMHealth, - contentType: logshttp.JSONContentType, - // set the endpoint config to "metrics" since health will hit the same endpoint - // as metrics, so there is no need to add an extra config endpoint. - endpointsConfigPrefix: "database_monitoring.metrics.", - hostnameEndpointPrefix: "dbm-metrics-intake.", - intakeTrackType: "dbmhealth", - // raise the default batch_max_concurrent_send from 0 to 10 to ensure this pipeline is able to handle 4k events/s - defaultBatchMaxConcurrentSend: 10, - defaultBatchMaxContentSize: 20e6, - defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, - // High input chan size is needed to handle high number of DBM events being flushed by DBM integrations - defaultInputChanSize: 500, - }, { - eventType: eventplatform.EventTypeNetworkDevicesMetadata, - category: "NDM", - contentType: logshttp.JSONContentType, - endpointsConfigPrefix: "network_devices.metadata.", - hostnameEndpointPrefix: "ndm-intake.", - intakeTrackType: "ndm", - defaultBatchMaxConcurrentSend: 10, - defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, - defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, - defaultInputChanSize: pkgconfigsetup.DefaultInputChanSize, - }, - { - eventType: eventplatform.EventTypeSnmpTraps, - category: "NDM", - contentType: logshttp.JSONContentType, - endpointsConfigPrefix: "network_devices.snmp_traps.forwarder.", - hostnameEndpointPrefix: "snmp-traps-intake.", - intakeTrackType: "ndmtraps", - defaultBatchMaxConcurrentSend: 10, - defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, - defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, - defaultInputChanSize: pkgconfigsetup.DefaultInputChanSize, - }, - { - eventType: eventplatform.EventTypeNetworkDevicesNetFlow, - category: "NDM", - contentType: logshttp.JSONContentType, - endpointsConfigPrefix: "network_devices.netflow.forwarder.", - hostnameEndpointPrefix: "ndmflow-intake.", - intakeTrackType: "ndmflow", - defaultBatchMaxConcurrentSend: 10, - defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, - - // Each NetFlow flow is about 500 bytes - // 10k BatchMaxSize is about 5Mo of content size - defaultBatchMaxSize: 10000, - // High input chan is needed to handle high number of flows being flushed by NetFlow Server every 10s - // Customers might need to set `network_devices.forwarder.input_chan_size` to higher value if flows are dropped - // due to input channel being full. - // TODO: A possible better solution is to make SendEventPlatformEvent blocking when input chan is full and avoid - // dropping events. This can't be done right now due to SendEventPlatformEvent being called by - // aggregator loop, making SendEventPlatformEvent blocking might slow down other type of data handled - // by aggregator. - defaultInputChanSize: 10000, - }, - { - eventType: eventplatform.EventTypeNetworkPath, - category: "Network Path", - contentType: logshttp.JSONContentType, - endpointsConfigPrefix: "network_path.forwarder.", - hostnameEndpointPrefix: "netpath-intake.", - intakeTrackType: "netpath", - defaultBatchMaxConcurrentSend: 10, - defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, - defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, - defaultInputChanSize: pkgconfigsetup.DefaultInputChanSize, - }, - { - eventType: eventplatform.EventTypeContainerLifecycle, - category: "Container", - contentType: logshttp.ProtobufContentType, - endpointsConfigPrefix: "container_lifecycle.", - hostnameEndpointPrefix: "contlcycle-intake.", - intakeTrackType: "contlcycle", - defaultBatchMaxConcurrentSend: 10, - defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, - defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, - defaultInputChanSize: pkgconfigsetup.DefaultInputChanSize, - }, - { - eventType: eventplatform.EventTypeContainerImages, - category: "Container", - contentType: logshttp.ProtobufContentType, - endpointsConfigPrefix: "container_image.", - hostnameEndpointPrefix: "contimage-intake.", - intakeTrackType: "contimage", - defaultBatchMaxConcurrentSend: 10, - defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, - defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, - defaultInputChanSize: pkgconfigsetup.DefaultInputChanSize, - }, - { - eventType: eventplatform.EventTypeContainerSBOM, - category: "SBOM", - contentType: logshttp.ProtobufContentType, - endpointsConfigPrefix: "sbom.", - hostnameEndpointPrefix: "sbom-intake.", - intakeTrackType: "sbom", - defaultBatchMaxConcurrentSend: 10, - defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, - defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, - // on every periodic refresh, we re-send all the SBOMs for all the - // container images in the workloadmeta store. This can be a lot of - // payloads at once, so we need a large input channel size to avoid dropping - defaultInputChanSize: 1000, - }, - { - eventType: eventplatform.EventTypeSoftwareInventory, - category: "EUDM", - contentType: logshttp.JSONContentType, - endpointsConfigPrefix: "software_inventory.forwarder.", - hostnameEndpointPrefix: "event-platform-intake.", - intakeTrackType: "softinv", - defaultBatchMaxConcurrentSend: pkgconfigsetup.DefaultBatchMaxConcurrentSend, - defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, - defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, - defaultInputChanSize: pkgconfigsetup.DefaultInputChanSize, - }, - { - eventType: eventplatform.EventTypeSynthetics, - category: "Synthetics", - contentType: logshttp.JSONContentType, - endpointsConfigPrefix: "synthetics.forwarder.", - hostnameEndpointPrefix: "http-synthetics.", - intakeTrackType: "synthetics", - defaultBatchMaxConcurrentSend: 10, - defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, - defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, - defaultInputChanSize: pkgconfigsetup.DefaultInputChanSize, - }, +func getPassthroughPipelines() []passthroughPipelineDesc { + var passthroughPipelineDescs = []passthroughPipelineDesc{ + { + eventType: eventTypeDBMSamples, + category: "DBM", + contentType: logshttp.JSONContentType, + endpointsConfigPrefix: "database_monitoring.samples.", + hostnameEndpointPrefix: "dbm-metrics-intake.", + intakeTrackType: "databasequery", + // raise the default batch_max_concurrent_send from 0 to 10 to ensure this pipeline is able to handle 4k events/s + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: 10e6, + defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, + // High input chan size is needed to handle high number of DBM events being flushed by DBM integrations + defaultInputChanSize: 500, + }, + { + eventType: eventTypeDBMMetrics, + category: "DBM", + contentType: logshttp.JSONContentType, + endpointsConfigPrefix: "database_monitoring.metrics.", + hostnameEndpointPrefix: "dbm-metrics-intake.", + intakeTrackType: "dbmmetrics", + // raise the default batch_max_concurrent_send from 0 to 10 to ensure this pipeline is able to handle 4k events/s + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: 20e6, + defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, + // High input chan size is needed to handle high number of DBM events being flushed by DBM integrations + defaultInputChanSize: 500, + }, + { + eventType: eventTypeDBMMetadata, + contentType: logshttp.JSONContentType, + // set the endpoint config to "metrics" since metadata will hit the same endpoint + // as metrics, so there is no need to add an extra config endpoint. + // As a follow-on PR, we should clean this up to have a single config for each track type since + // all of our data now flows through the same intake + endpointsConfigPrefix: "database_monitoring.metrics.", + hostnameEndpointPrefix: "dbm-metrics-intake.", + intakeTrackType: "dbmmetadata", + // raise the default batch_max_concurrent_send from 0 to 10 to ensure this pipeline is able to handle 4k events/s + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: 20e6, + defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, + // High input chan size is needed to handle high number of DBM events being flushed by DBM integrations + defaultInputChanSize: 500, + }, + { + eventType: eventTypeDBMActivity, + category: "DBM", + contentType: logshttp.JSONContentType, + endpointsConfigPrefix: "database_monitoring.activity.", + hostnameEndpointPrefix: "dbm-metrics-intake.", + intakeTrackType: "dbmactivity", + // raise the default batch_max_concurrent_send from 0 to 10 to ensure this pipeline is able to handle 4k events/s + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: 20e6, + defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, + // High input chan size is needed to handle high number of DBM events being flushed by DBM integrations + defaultInputChanSize: 500, + }, + { + eventType: eventTypeDBMHealth, + contentType: logshttp.JSONContentType, + // set the endpoint config to "metrics" since health will hit the same endpoint + // as metrics, so there is no need to add an extra config endpoint. + endpointsConfigPrefix: "database_monitoring.metrics.", + hostnameEndpointPrefix: "dbm-metrics-intake.", + intakeTrackType: "dbmhealth", + // raise the default batch_max_concurrent_send from 0 to 10 to ensure this pipeline is able to handle 4k events/s + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: 20e6, + defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, + // High input chan size is needed to handle high number of DBM events being flushed by DBM integrations + defaultInputChanSize: 500, + }, { + eventType: eventplatform.EventTypeNetworkDevicesMetadata, + category: "NDM", + contentType: logshttp.JSONContentType, + endpointsConfigPrefix: "network_devices.metadata.", + hostnameEndpointPrefix: "ndm-intake.", + intakeTrackType: "ndm", + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, + defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, + defaultInputChanSize: pkgconfigsetup.DefaultInputChanSize, + }, + { + eventType: eventplatform.EventTypeSnmpTraps, + category: "NDM", + contentType: logshttp.JSONContentType, + endpointsConfigPrefix: "network_devices.snmp_traps.forwarder.", + hostnameEndpointPrefix: "snmp-traps-intake.", + intakeTrackType: "ndmtraps", + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, + defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, + defaultInputChanSize: pkgconfigsetup.DefaultInputChanSize, + }, + { + eventType: eventplatform.EventTypeNetworkDevicesNetFlow, + category: "NDM", + contentType: logshttp.JSONContentType, + endpointsConfigPrefix: "network_devices.netflow.forwarder.", + hostnameEndpointPrefix: "ndmflow-intake.", + intakeTrackType: "ndmflow", + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, + + // Each NetFlow flow is about 500 bytes + // 10k BatchMaxSize is about 5Mo of content size + defaultBatchMaxSize: 10000, + // High input chan is needed to handle high number of flows being flushed by NetFlow Server every 10s + // Customers might need to set `network_devices.forwarder.input_chan_size` to higher value if flows are dropped + // due to input channel being full. + // TODO: A possible better solution is to make SendEventPlatformEvent blocking when input chan is full and avoid + // dropping events. This can't be done right now due to SendEventPlatformEvent being called by + // aggregator loop, making SendEventPlatformEvent blocking might slow down other type of data handled + // by aggregator. + defaultInputChanSize: 10000, + }, + { + eventType: eventplatform.EventTypeNetworkPath, + category: "Network Path", + contentType: logshttp.JSONContentType, + endpointsConfigPrefix: "network_path.forwarder.", + hostnameEndpointPrefix: "netpath-intake.", + intakeTrackType: "netpath", + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, + defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, + defaultInputChanSize: pkgconfigsetup.DefaultInputChanSize, + }, + { + eventType: eventplatform.EventTypeContainerLifecycle, + category: "Container", + contentType: logshttp.ProtobufContentType, + endpointsConfigPrefix: "container_lifecycle.", + hostnameEndpointPrefix: "contlcycle-intake.", + intakeTrackType: "contlcycle", + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, + defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, + defaultInputChanSize: pkgconfigsetup.DefaultInputChanSize, + }, + { + eventType: eventplatform.EventTypeContainerImages, + category: "Container", + contentType: logshttp.ProtobufContentType, + endpointsConfigPrefix: "container_image.", + hostnameEndpointPrefix: "contimage-intake.", + intakeTrackType: "contimage", + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, + defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, + defaultInputChanSize: pkgconfigsetup.DefaultInputChanSize, + }, + { + eventType: eventplatform.EventTypeContainerSBOM, + category: "SBOM", + contentType: logshttp.ProtobufContentType, + endpointsConfigPrefix: "sbom.", + hostnameEndpointPrefix: "sbom-intake.", + intakeTrackType: "sbom", + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, + defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, + // on every periodic refresh, we re-send all the SBOMs for all the + // container images in the workloadmeta store. This can be a lot of + // payloads at once, so we need a large input channel size to avoid dropping + defaultInputChanSize: 1000, + }, + { + eventType: eventplatform.EventTypeSynthetics, + category: "Synthetics", + contentType: logshttp.JSONContentType, + endpointsConfigPrefix: "synthetics.forwarder.", + hostnameEndpointPrefix: "http-synthetics.", + intakeTrackType: "synthetics", + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, + defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, + defaultInputChanSize: pkgconfigsetup.DefaultInputChanSize, + }, + } + + if pkgconfigsetup.Datadog().GetBool("software_inventory.enabled") { + softinvPipeline := passthroughPipelineDesc{ + eventType: eventplatform.EventTypeSoftwareInventory, + category: "EUDM", + contentType: logshttp.JSONContentType, + endpointsConfigPrefix: "software_inventory.forwarder.", + hostnameEndpointPrefix: "event-platform-intake.", + intakeTrackType: "softinv", + defaultBatchMaxConcurrentSend: pkgconfigsetup.DefaultBatchMaxConcurrentSend, + defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize, + defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize, + defaultInputChanSize: pkgconfigsetup.DefaultInputChanSize, + } + passthroughPipelineDescs = append(passthroughPipelineDescs, softinvPipeline) + } + + return passthroughPipelineDescs } type defaultEventPlatformForwarder struct { @@ -279,7 +287,7 @@ func (s *defaultEventPlatformForwarder) SendEventPlatformEvent(e *message.Messag func Diagnose() []diagnose.Diagnosis { var diagnoses []diagnose.Diagnosis - for _, desc := range passthroughPipelineDescs { + for _, desc := range getPassthroughPipelines() { configKeys := config.NewLogsConfigKeys(desc.endpointsConfigPrefix, pkgconfigsetup.Datadog()) endpoints, err := config.BuildHTTPEndpointsWithConfig(pkgconfigsetup.Datadog(), configKeys, desc.hostnameEndpointPrefix, desc.intakeTrackType, config.DefaultIntakeProtocol, config.DefaultIntakeOrigin) if err != nil { @@ -538,7 +546,7 @@ func newDefaultEventPlatformForwarder(config model.Reader, eventPlatformReceiver destinationsCtx := client.NewDestinationsContext() destinationsCtx.Start() pipelines := make(map[string]*passthroughPipeline) - for i, desc := range passthroughPipelineDescs { + for i, desc := range getPassthroughPipelines() { p, err := newHTTPPassthroughPipeline(config, eventPlatformReceiver, compression, desc, destinationsCtx, i) if err != nil { log.Errorf("Failed to initialize event platform forwarder pipeline. eventType=%s, error=%s", desc.eventType, err.Error())