Skip to content

Add deltatocumulativeprocessor for AMP destinations #1624

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ require (

require (
github.com/aws/aws-sdk-go-v2 v1.32.6
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.115.0
go.opentelemetry.io/collector/component/componenttest v0.115.0
go.opentelemetry.io/collector/config/configtelemetry v0.115.0
go.opentelemetry.io/collector/confmap/converter/expandconverter v0.113.0
Expand Down Expand Up @@ -425,6 +426,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/xray v0.115.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.115.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.115.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.115.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.115.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.115.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.115.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics v
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics v0.115.0/go.mod h1:G56rS4nL0VypkD7a94UaQmIjO5t0kffVcjbhpvSogww=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.115.0 h1:vRQQFD4YpasQFUAdF030UWtaflSYFXK542bfWMGhOK0=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.115.0/go.mod h1:BZ7DT+0VkKR7P3I9PGEDfVa0GdB0ty41eEcejIUXF9A=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.115.0 h1:tFUm48xxdtuk3AgY5AY90DJ6UnxRW5k/HBpA24blCAo=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.115.0/go.mod h1:EI5GXHQVRNLx78DSyqSU8ZzIxQayUN7KlaeVChk5rJc=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.115.0 h1:h6zEsBtuZalQu7lKYf6ZCcj8fTocT+zxdmuOou9515Q=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.115.0/go.mod h1:6QU/K0dGCGYorkOvJmhbDFCspy4RPxRkFjf9I64y6I0=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest v0.115.0 h1:vXDJE8YHfAoYIAlPRtODchlqb6lWnGhJxPaT2ljvN7I=
Expand Down Expand Up @@ -1240,6 +1242,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributespr
github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.115.0/go.mod h1:z8XdvlhXSYVboxS3TPGembE9kfxLAYH2PxPLMvf8wTk=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor v0.115.0 h1:t3BGnPpmeuxW51vISSu51PrAs49ACBCa1Yl1NfZGE5Y=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor v0.115.0/go.mod h1:jQLYyroEYEV1kWJApmGBgVuGUd73v+Q6EUJ6Wy7N508=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.115.0 h1:a0LC50FmNTyWI/vIhKWSyDRw8GLlq2XFIY7GezlK3/w=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.115.0/go.mod h1:hNQwpth9RZ+nS9oCDDKRo56XwNc2TGhZGcAKc4CF1AI=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatorateprocessor v0.115.0 h1:X6rEs7IxDpcDDBOCmkA3xHmc373UxHchH7BykK3Ao+o=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatorateprocessor v0.115.0/go.mod h1:fmLLh7jL0uK/t8op9TieOz7pwxItl4hdFo2fX7U0Etg=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.115.0 h1:ficXJmB6l6kfiu+R6CmggtnlQWMHUNzu2csDYA4CFSs=
Expand Down
2 changes: 2 additions & 0 deletions service/defaultcomponents/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatorateprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor"
Expand Down Expand Up @@ -96,6 +97,7 @@ func Factories() (otelcol.Factories, error) {
awsentity.NewFactory(),
batchprocessor.NewFactory(),
cumulativetodeltaprocessor.NewFactory(),
deltatocumulativeprocessor.NewFactory(),
deltatorateprocessor.NewFactory(),
ec2tagger.NewFactory(),
filterprocessor.NewFactory(),
Expand Down
1 change: 1 addition & 0 deletions service/defaultcomponents/components_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestComponents(t *testing.T) {
"attributes",
"batch",
"cumulativetodelta",
"deltatocumulative",
"deltatorate",
"ec2tagger",
"metricsgeneration",
Expand Down
4 changes: 4 additions & 0 deletions translator/tocwconfig/sampleConfig/amp_config_linux.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ processors:
send_batch_max_size: 0
send_batch_size: 8192
timeout: 1m0s
deltatocumulative/host/amp:
max_stale: 336h0m0s
max_streams: 9223372036854775807
ec2tagger:
ec2_instance_tag_keys:
- AutoScalingGroupName
Expand Down Expand Up @@ -152,6 +155,7 @@ service:
- transform
- rollup
- batch/host/amp
- deltatocumulative/host/amp
receivers:
- telegraf_cpu
metrics/host/cloudwatch:
Expand Down
8 changes: 8 additions & 0 deletions translator/tocwconfig/sampleConfig/jmx_config_linux.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ processors:
match_type: ""
initial_value: 2
max_staleness: 0s
deltatocumulative/host/amp:
max_stale: 336h0m0s
max_streams: 9223372036854775807
deltatocumulative/jmx/amp:
max_stale: 336h0m0s
max_streams: 9223372036854775807
filter/jmx:
error_mode: propagate
logs: {}
Expand Down Expand Up @@ -187,6 +193,7 @@ service:
processors:
- transform
- batch/host/amp
- deltatocumulative/host/amp
receivers:
- telegraf_cpu
- telegraf_disk
Expand All @@ -207,6 +214,7 @@ service:
- resource/jmx
- transform/jmx
- batch/jmx/amp
- deltatocumulative/jmx/amp
receivers:
- jmx
metrics/jmx/cloudwatch:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ processors:
match_type: ""
initial_value: 2
max_staleness: 0s
deltatocumulative/jmx/amp/0:
max_stale: 336h0m0s
max_streams: 9223372036854775807
deltatocumulative/jmx/amp/1:
max_stale: 336h0m0s
max_streams: 9223372036854775807
filter/jmx/0:
error_mode: propagate
logs: {}
Expand Down Expand Up @@ -240,6 +246,7 @@ service:
- transform/jmx/drop
- transform/jmx/0
- batch/jmx/amp/0
- deltatocumulative/jmx/amp/0
receivers:
- otlp/jmx
metrics/jmx/amp/1:
Expand All @@ -252,6 +259,7 @@ service:
- transform/jmx/drop
- transform/jmx/1
- batch/jmx/amp/1
- deltatocumulative/jmx/amp/1
receivers:
- otlp/jmx
metrics/jmx/cloudwatch/0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ processors:
send_batch_max_size: 0
send_batch_size: 8192
timeout: 30s
deltatocumulative/prometheus/amp:
max_stale: 336h0m0s
max_streams: 9223372036854775807
receivers:
prometheus:
config:
Expand Down Expand Up @@ -202,6 +205,7 @@ service:
- prometheusremotewrite/amp
processors:
- batch/prometheus/amp
- deltatocumulative/prometheus/amp
receivers:
- prometheus
metrics/prometheus/cloudwatchlogs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ processors:
send_batch_max_size: 0
send_batch_size: 8192
timeout: 1m0s
deltatocumulative/prometheus/amp:
max_stale: 336h0m0s
max_streams: 9223372036854775807
receivers:
prometheus:
config:
Expand Down Expand Up @@ -107,6 +110,7 @@ service:
- prometheusremotewrite/amp
processors:
- batch/prometheus/amp
- deltatocumulative/prometheus/amp
receivers:
- prometheus
telemetry:
Expand Down
3 changes: 3 additions & 0 deletions translator/translate/otel/pipeline/host/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/awsentity"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/batchprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/cumulativetodeltaprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/deltatocumulativeprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/ec2taggerprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/metricsdecorator"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/rollupprocessor"
Expand Down Expand Up @@ -136,6 +137,8 @@ func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators,
translators.Processors.Set(rollupprocessor.NewTranslator())
}
translators.Processors.Set(batchprocessor.NewTranslatorWithNameAndSection(t.name, common.MetricsKey))
// prometheusremotewrite doesn't support delta metrics so convert them to cumulative metrics
translators.Processors.Set(deltatocumulativeprocessor.NewTranslator(common.WithName(t.name)))
translators.Exporters.Set(prometheusremotewrite.NewTranslatorWithName(common.AMPKey))
translators.Extensions.Set(sigv4auth.NewTranslator())
case common.CloudWatchLogsKey:
Expand Down
4 changes: 2 additions & 2 deletions translator/translate/otel/pipeline/host/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func TestTranslator(t *testing.T) {
want: &want{
pipelineID: "metrics/host/amp",
receivers: []string{"nop", "other"},
processors: []string{"rollup", "batch/host/amp"},
processors: []string{"rollup", "batch/host/amp", "deltatocumulative/host/amp"},
exporters: []string{"prometheusremotewrite/amp"},
extensions: []string{"sigv4auth"},
},
Expand All @@ -322,7 +322,7 @@ func TestTranslator(t *testing.T) {
want: &want{
pipelineID: "metrics/host/amp",
receivers: []string{"nop", "other"},
processors: []string{"batch/host/amp"},
processors: []string{"batch/host/amp", "deltatocumulative/host/amp"},
exporters: []string{"prometheusremotewrite/amp"},
extensions: []string{"sigv4auth"},
},
Expand Down
3 changes: 3 additions & 0 deletions translator/translate/otel/pipeline/jmx/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/sigv4auth"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/batchprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/cumulativetodeltaprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/deltatocumulativeprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/ec2taggerprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/filterprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/metricsdecorator"
Expand Down Expand Up @@ -120,6 +121,8 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators
if conf.IsSet(common.MetricsAggregationDimensionsKey) {
translators.Processors.Set(rollupprocessor.NewTranslator())
}
// prometheusremotewrite doesn't support delta metrics so convert them to cumulative metrics
translators.Processors.Set(deltatocumulativeprocessor.NewTranslator(common.WithName(t.name)))
translators.Exporters.Set(prometheusremotewrite.NewTranslatorWithName(common.AMPKey))
translators.Extensions.Set(sigv4auth.NewTranslator())
default:
Expand Down
6 changes: 3 additions & 3 deletions translator/translate/otel/pipeline/jmx/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func TestTranslator(t *testing.T) {
want: &want{
pipelineID: "metrics/jmx/amp",
receivers: []string{"jmx"},
processors: []string{"filter/jmx", "resource/jmx", "batch/jmx/amp"},
processors: []string{"filter/jmx", "resource/jmx", "batch/jmx/amp", "deltatocumulative/jmx/amp"},
exporters: []string{"prometheusremotewrite/amp"},
extensions: []string{"sigv4auth"},
},
Expand Down Expand Up @@ -238,7 +238,7 @@ func TestTranslator(t *testing.T) {
want: &want{
pipelineID: "metrics/jmx/amp",
receivers: []string{"otlp/jmx"},
processors: []string{"filter/jmx", "metricstransform/jmx", "transform/jmx/drop", "batch/jmx/amp"},
processors: []string{"filter/jmx", "metricstransform/jmx", "transform/jmx/drop", "batch/jmx/amp", "deltatocumulative/jmx/amp"},
exporters: []string{"prometheusremotewrite/amp"},
extensions: []string{"sigv4auth"},
},
Expand Down Expand Up @@ -338,7 +338,7 @@ func TestTranslator(t *testing.T) {
want: &want{
pipelineID: "metrics/jmx/amp/0",
receivers: []string{"otlp/jmx"},
processors: []string{"filter/jmx/0", "metricstransform/jmx", "transform/jmx/drop", "transform/jmx/0", "batch/jmx/amp/0"},
processors: []string{"filter/jmx/0", "metricstransform/jmx", "transform/jmx/drop", "transform/jmx/0", "batch/jmx/amp/0", "deltatocumulative/jmx/amp/0"},
exporters: []string{"prometheusremotewrite/amp"},
extensions: []string{"sigv4auth"},
},
Expand Down
9 changes: 7 additions & 2 deletions translator/translate/otel/pipeline/prometheus/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/sigv4auth"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/batchprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/deltatocumulativeprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/rollupprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/receiver/adapter"
otelprom "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/receiver/prometheus"
Expand Down Expand Up @@ -79,8 +80,12 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators
return nil, fmt.Errorf("pipeline (%s) is missing prometheus configuration under metrics section with destination (%s)", t.name, t.Destination())
}
translators := &common.ComponentTranslators{
Receivers: common.NewTranslatorMap(otelprom.NewTranslator()),
Processors: common.NewTranslatorMap(batchprocessor.NewTranslatorWithNameAndSection(t.name, common.MetricsKey)),
Receivers: common.NewTranslatorMap(otelprom.NewTranslator()),
Processors: common.NewTranslatorMap(
batchprocessor.NewTranslatorWithNameAndSection(t.name, common.MetricsKey),
// prometheusremotewrite doesn't support delta metrics so convert them to cumulative metrics
deltatocumulativeprocessor.NewTranslator(common.WithName(t.name)),
),
Exporters: common.NewTranslatorMap(prometheusremotewrite.NewTranslatorWithName(common.AMPKey)),
Extensions: common.NewTranslatorMap(sigv4auth.NewTranslator()),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestTranslator(t *testing.T) {
want: &want{
pipelineID: "metrics/prometheus/amp",
receivers: []string{"prometheus"},
processors: []string{"batch/prometheus/amp"},
processors: []string{"batch/prometheus/amp", "deltatocumulative/prometheus/amp"},
exporters: []string{"prometheusremotewrite/amp"},
extensions: []string{"sigv4auth"},
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package deltatocumulativeprocessor

import (
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/processor"

"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common"
)

type translator struct {
factory processor.Factory
common.NameProvider
}

var _ common.ComponentTranslator = (*translator)(nil)
var _ common.NameSetter = (*translator)(nil)

func NewTranslator(opts ...common.TranslatorOption) common.ComponentTranslator {
t := &translator{factory: deltatocumulativeprocessor.NewFactory()}
for _, opt := range opts {
opt(t)
}
return t
}

func (t *translator) ID() component.ID {
return component.NewIDWithName(t.factory.Type(), t.Name())
}

// Translate creates a processor config
func (t *translator) Translate(_ *confmap.Conf) (component.Config, error) {
cfg := t.factory.CreateDefaultConfig().(*deltatocumulativeprocessor.Config)
cfg.MaxStale = 14 * 24 * time.Hour // two weeks
return cfg, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package deltatocumulativeprocessor

import (
"math"
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/confmap"

"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common"
)

func TestTranslator(t *testing.T) {
dcpTranslator := NewTranslator(common.WithName("test"))
require.EqualValues(t, "deltatocumulative/test", dcpTranslator.ID().String())
testCases := map[string]struct {
input map[string]any
want map[string]any
wantErr error
}{
"EmptyConfig": {
input: map[string]any{},
want: map[string]any{
"max_stale": 1209600000000000, // 2 weeks, in minutes
"max_streams": math.MaxInt64,
},
},
}
factory := deltatocumulativeprocessor.NewFactory()
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
conf := confmap.NewFromStringMap(testCase.input)
got, err := dcpTranslator.Translate(conf)
require.Equal(t, testCase.wantErr, err)
if err == nil {
require.NotNil(t, got)
gotCfg, ok := got.(*deltatocumulativeprocessor.Config)
require.True(t, ok)
wantCfg := factory.CreateDefaultConfig()
wantConf := confmap.NewFromStringMap(testCase.want)
require.NoError(t, wantConf.Unmarshal(&wantCfg))
assert.Equal(t, wantCfg, gotCfg)
}
})
}
}