diff --git a/go.mod b/go.mod index ffa9147651..c8cb2d9d2f 100644 --- a/go.mod +++ b/go.mod @@ -181,6 +181,7 @@ require ( go.opentelemetry.io/collector/exporter/nopexporter v0.115.0 go.opentelemetry.io/collector/extension v0.115.0 go.opentelemetry.io/collector/extension/zpagesextension v0.115.0 + go.opentelemetry.io/collector/filter v0.115.0 go.opentelemetry.io/collector/otelcol v0.115.0 go.opentelemetry.io/collector/processor v0.115.0 go.opentelemetry.io/collector/processor/batchprocessor v0.115.0 @@ -224,6 +225,7 @@ require ( go.opentelemetry.io/collector/processor/processortest v0.115.0 go.opentelemetry.io/collector/receiver/receivertest v0.115.0 go.opentelemetry.io/collector/scraper v0.115.0 + go.uber.org/goleak v1.3.0 ) require ( diff --git a/go.sum b/go.sum index 48320727a8..3d8f0ecb60 100644 --- a/go.sum +++ b/go.sum @@ -1681,6 +1681,8 @@ go.opentelemetry.io/collector/extension/zpagesextension v0.115.0 h1:zYrZZocc7n0Z go.opentelemetry.io/collector/extension/zpagesextension v0.115.0/go.mod h1:OaXwNHF3MAcInBzCXrhXbTNHfIi9b7YGhXjtCFZqxNY= go.opentelemetry.io/collector/featuregate v1.22.0 h1:1TUcdqA5VpEsX1Lrr6GG15CptZxDXxiu5AXgwpeNSR4= go.opentelemetry.io/collector/featuregate v1.22.0/go.mod h1:3GaXqflNDVwWndNGBJ1+XJFy3Fv/XrFgjMN60N3z7yg= +go.opentelemetry.io/collector/filter v0.115.0 h1:pYnHUFDSHSjEIFZit+CU09itVkDXgV+WcV2HOkjvQcE= +go.opentelemetry.io/collector/filter v0.115.0/go.mod h1:aewQ+jmvpH88gPVWpNXiWSm+wwJVxTK4f23ex2NMd2c= go.opentelemetry.io/collector/internal/fanoutconsumer v0.115.0 h1:6DRiSECeApFq6Jj5ug77rG53R6FzJEZBfygkyMEXdpg= go.opentelemetry.io/collector/internal/fanoutconsumer v0.115.0/go.mod h1:vgQf5HQdmLQqpDHpDq2S3nTRoUuKtRcZpRTsy+UiwYw= go.opentelemetry.io/collector/internal/memorylimiter v0.115.0 h1:U07IJxyHZXM6eLn8cOq/Lycx6DhQZhpDOuYtIRw/d6I= diff --git a/receiver/awsebsnvmereceiver/config.go b/receiver/awsebsnvmereceiver/config.go new file mode 100644 index 0000000000..ad906a506d --- /dev/null +++ b/receiver/awsebsnvmereceiver/config.go @@ -0,0 +1,19 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package awsebsnvmereceiver + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/receiver/scraperhelper" + + "github.com/aws/amazon-cloudwatch-agent/receiver/awsebsnvmereceiver/internal/metadata" +) + +type Config struct { + scraperhelper.ControllerConfig `mapstructure:",squash"` + metadata.MetricsBuilderConfig `mapstructure:",squash"` + Devices []string `mapstructure:"devices,omitempty"` +} + +var _ component.Config = (*Config)(nil) diff --git a/receiver/awsebsnvmereceiver/config_test.go b/receiver/awsebsnvmereceiver/config_test.go new file mode 100644 index 0000000000..55193c26ed --- /dev/null +++ b/receiver/awsebsnvmereceiver/config_test.go @@ -0,0 +1,51 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT
+
+package awsebsnvmereceiver
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestConfigValidate(t *testing.T) {
+	c := Config{}
+	err := c.Validate()
+	require.NotNil(t, err)
+}
+
+func TestConfigWithDevices(t *testing.T) {
+	testCases := []struct {
+		name    string
+		devices []string
+	}{
+		{
+			name:    "empty devices",
+			devices: []string{},
+		},
+		{
+			name:    "single device",
+			devices: []string{"nvme0n1"},
+		},
+		{
+			name:    "multiple devices",
+			devices: []string{"nvme0n1", "nvme1n1"},
+		},
+		{
+			name:    "wildcard",
+			devices: []string{"*"},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			cfg := createDefaultConfig().(*Config)
+			cfg.Devices = tc.devices
+
+			// Just verify we can set the devices field
+			assert.Equal(t, tc.devices, cfg.Devices)
+		})
+	}
+}
diff --git a/receiver/awsebsnvmereceiver/documentation.md b/receiver/awsebsnvmereceiver/documentation.md
new file mode 100644
index 0000000000..c60f78eb0d
--- /dev/null
+++ b/receiver/awsebsnvmereceiver/documentation.md
@@ -0,0 +1,117 @@
+[comment]: <> (Code generated by mdatagen. DO NOT EDIT.)
+
+# awsebsnvmereceiver
+
+## Default Metrics
+
+The following metrics are emitted by default. Each of them can be disabled by applying the following configuration:
+
+```yaml
+metrics:
+  <metric_name>:
+    enabled: false
+```
+
+### diskio_ebs_total_read_ops
+
+The total number of completed read operations
+
+| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
+| ---- | ----------- | ---------- | ----------------------- | --------- |
+| 1 | Sum | Int | Cumulative | true |
+
+## Optional Metrics
+
+The following metrics are not emitted by default. Each of them can be enabled by applying the following configuration:
+
+```yaml
+metrics:
+  <metric_name>:
+    enabled: true
+```
+
+### diskio_ebs_ec2_instance_performance_exceeded_iops
+
+The total time, in microseconds, that the EBS volume exceeded the attached Amazon EC2 instance's maximum IOPS performance
+
+| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
+| ---- | ----------- | ---------- | ----------------------- | --------- |
+| us | Sum | Int | Cumulative | true |
+
+### diskio_ebs_ec2_instance_performance_exceeded_tp
+
+The total time, in microseconds, that the EBS volume exceeded the attached Amazon EC2 instance's maximum throughput performance
+
+| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
+| ---- | ----------- | ---------- | ----------------------- | --------- |
+| us | Sum | Int | Cumulative | true |
+
+### diskio_ebs_total_read_bytes
+
+The total number of read bytes transferred
+
+| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
+| ---- | ----------- | ---------- | ----------------------- | --------- |
+| By | Sum | Int | Cumulative | true |
+
+### diskio_ebs_total_read_time
+
+The total time spent, in microseconds, by all completed read operations
+
+| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
+| ---- | ----------- | ---------- | ----------------------- | --------- |
+| us | Sum | Int | Cumulative | true |
+
+### diskio_ebs_total_write_bytes
+
+The total number of write bytes transferred
+
+| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
+| ---- | ----------- | ---------- | ----------------------- | --------- |
+| By | Sum | Int | Cumulative | true |
+
+### diskio_ebs_total_write_ops
+
+The total number of completed write operations
+
+| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| 1 | Sum | Int | Cumulative | true | + +### diskio_ebs_total_write_time + +The total time spent, in microseconds, by all completed write operations + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| us | Sum | Int | Cumulative | true | + +### diskio_ebs_volume_performance_exceeded_iops + +The total time, in microseconds, that IOPS demand exceeded the volume's provisioned IOPS performance + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| us | Sum | Int | Cumulative | true | + +### diskio_ebs_volume_performance_exceeded_tp + +The total time, in microseconds, that throughput demand exceeded the volume's provisioned throughput performance + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| us | Sum | Int | Cumulative | true | + +### diskio_ebs_volume_queue_length + +The number of read and write operations waiting to be completed + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +## Resource Attributes + +| Name | Description | Values | Enabled | +| ---- | ----------- | ------ | ------- | +| VolumeId | Unique identifier to the EBS volume | Any Str | true | diff --git a/receiver/awsebsnvmereceiver/factory.go b/receiver/awsebsnvmereceiver/factory.go new file mode 100644 index 0000000000..66372ced47 --- /dev/null +++ b/receiver/awsebsnvmereceiver/factory.go @@ -0,0 +1,51 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT + +package awsebsnvmereceiver + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/scraperhelper" + otelscraper "go.opentelemetry.io/collector/scraper" + + "github.com/aws/amazon-cloudwatch-agent/internal/util/collections" + "github.com/aws/amazon-cloudwatch-agent/receiver/awsebsnvmereceiver/internal/metadata" + "github.com/aws/amazon-cloudwatch-agent/receiver/awsebsnvmereceiver/internal/nvme" +) + +func NewFactory() receiver.Factory { + return receiver.NewFactory(metadata.Type, + createDefaultConfig, + receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability)) +} + +func createDefaultConfig() component.Config { + return &Config{ + ControllerConfig: scraperhelper.NewDefaultControllerConfig(), + MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), + Devices: []string{}, + } +} + +func createMetricsReceiver( + _ context.Context, + settings receiver.Settings, + baseCfg component.Config, + consumer consumer.Metrics, +) (receiver.Metrics, error) { + cfg := baseCfg.(*Config) + nvmeScraper := newScraper(cfg, settings, &nvme.Util{}, collections.NewSet[string](cfg.Devices...)) + scraper, err := otelscraper.NewMetrics(nvmeScraper.scrape, otelscraper.WithStart(nvmeScraper.start), otelscraper.WithShutdown(nvmeScraper.shutdown)) + if err != nil { + return nil, err + } + + return scraperhelper.NewScraperControllerReceiver( + &cfg.ControllerConfig, settings, consumer, + scraperhelper.AddScraper(metadata.Type, scraper), + ) +} diff --git a/receiver/awsebsnvmereceiver/factory_test.go b/receiver/awsebsnvmereceiver/factory_test.go new file mode 100644 index 0000000000..93d056bf99 --- /dev/null +++ b/receiver/awsebsnvmereceiver/factory_test.go @@ -0,0 +1,57 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package awsebsnvmereceiver + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +func TestCreateDefaultConfig(t *testing.T) { + config := createDefaultConfig().(*Config) + assert.NotNil(t, config) + assert.Empty(t, config.Devices) +} + +func TestCreateMetricsReceiver(t *testing.T) { + testCases := []struct { + name string + devices []string + }{ + { + name: "no devices", + devices: []string{}, + }, + { + name: "with devices", + devices: []string{"nvme0n1", "nvme1n1"}, + }, + { + name: "with wildcard", + devices: []string{"*"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Devices = tc.devices + + receiver, err := createMetricsReceiver( + context.Background(), + receivertest.NewNopSettings(), + cfg, + consumertest.NewNop(), + ) + + require.NoError(t, err) + require.NotNil(t, receiver) + }) + } +} diff --git a/receiver/awsebsnvmereceiver/generated_component_test.go b/receiver/awsebsnvmereceiver/generated_component_test.go new file mode 100644 index 0000000000..1922c32b4d --- /dev/null +++ b/receiver/awsebsnvmereceiver/generated_component_test.go @@ -0,0 +1,69 @@ +// Code generated by mdatagen. DO NOT EDIT. 
+ +package awsebsnvmereceiver + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +func TestComponentFactoryType(t *testing.T) { + require.Equal(t, "awsebsnvmereceiver", NewFactory().Type().String()) +} + +func TestComponentConfigStruct(t *testing.T) { + require.NoError(t, componenttest.CheckConfigStruct(NewFactory().CreateDefaultConfig())) +} + +func TestComponentLifecycle(t *testing.T) { + factory := NewFactory() + + tests := []struct { + name string + createFn func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) + }{ + + { + name: "metrics", + createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { + return factory.CreateMetrics(ctx, set, cfg, consumertest.NewNop()) + }, + }, + } + + cm, err := confmaptest.LoadConf("metadata.yaml") + require.NoError(t, err) + cfg := factory.CreateDefaultConfig() + sub, err := cm.Sub("tests::config") + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(&cfg)) + + for _, tt := range tests { + t.Run(tt.name+"-shutdown", func(t *testing.T) { + c, err := tt.createFn(context.Background(), receivertest.NewNopSettings(), cfg) + require.NoError(t, err) + err = c.Shutdown(context.Background()) + require.NoError(t, err) + }) + t.Run(tt.name+"-lifecycle", func(t *testing.T) { + firstRcvr, err := tt.createFn(context.Background(), receivertest.NewNopSettings(), cfg) + require.NoError(t, err) + host := componenttest.NewNopHost() + require.NoError(t, err) + require.NoError(t, firstRcvr.Start(context.Background(), host)) + require.NoError(t, firstRcvr.Shutdown(context.Background())) + secondRcvr, err := tt.createFn(context.Background(), receivertest.NewNopSettings(), cfg) + require.NoError(t, err) + require.NoError(t, secondRcvr.Start(context.Background(), host)) + require.NoError(t, secondRcvr.Shutdown(context.Background())) + }) + } +} diff --git a/receiver/awsebsnvmereceiver/generated_package_test.go b/receiver/awsebsnvmereceiver/generated_package_test.go new file mode 100644 index 0000000000..9c41aa5b29 --- /dev/null +++ b/receiver/awsebsnvmereceiver/generated_package_test.go @@ -0,0 +1,13 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package awsebsnvmereceiver + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/receiver/awsebsnvmereceiver/internal/metadata/generated_config.go b/receiver/awsebsnvmereceiver/internal/metadata/generated_config.go new file mode 100644 index 0000000000..53f17d52ea --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/metadata/generated_config.go @@ -0,0 +1,132 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/filter" +) + +// MetricConfig provides common config for a particular metric. 
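+// The unexported enabledSetByUser field records whether "enabled" was explicitly set in user config; Unmarshal populates it via parser.IsSet("enabled").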
+type MetricConfig struct { + Enabled bool `mapstructure:"enabled"` + + enabledSetByUser bool +} + +func (ms *MetricConfig) Unmarshal(parser *confmap.Conf) error { + if parser == nil { + return nil + } + err := parser.Unmarshal(ms) + if err != nil { + return err + } + ms.enabledSetByUser = parser.IsSet("enabled") + return nil +} + +// MetricsConfig provides config for awsebsnvmereceiver metrics. +type MetricsConfig struct { + DiskioEbsEc2InstancePerformanceExceededIops MetricConfig `mapstructure:"diskio_ebs_ec2_instance_performance_exceeded_iops"` + DiskioEbsEc2InstancePerformanceExceededTp MetricConfig `mapstructure:"diskio_ebs_ec2_instance_performance_exceeded_tp"` + DiskioEbsTotalReadBytes MetricConfig `mapstructure:"diskio_ebs_total_read_bytes"` + DiskioEbsTotalReadOps MetricConfig `mapstructure:"diskio_ebs_total_read_ops"` + DiskioEbsTotalReadTime MetricConfig `mapstructure:"diskio_ebs_total_read_time"` + DiskioEbsTotalWriteBytes MetricConfig `mapstructure:"diskio_ebs_total_write_bytes"` + DiskioEbsTotalWriteOps MetricConfig `mapstructure:"diskio_ebs_total_write_ops"` + DiskioEbsTotalWriteTime MetricConfig `mapstructure:"diskio_ebs_total_write_time"` + DiskioEbsVolumePerformanceExceededIops MetricConfig `mapstructure:"diskio_ebs_volume_performance_exceeded_iops"` + DiskioEbsVolumePerformanceExceededTp MetricConfig `mapstructure:"diskio_ebs_volume_performance_exceeded_tp"` + DiskioEbsVolumeQueueLength MetricConfig `mapstructure:"diskio_ebs_volume_queue_length"` +} + +func DefaultMetricsConfig() MetricsConfig { + return MetricsConfig{ + DiskioEbsEc2InstancePerformanceExceededIops: MetricConfig{ + Enabled: false, + }, + DiskioEbsEc2InstancePerformanceExceededTp: MetricConfig{ + Enabled: false, + }, + DiskioEbsTotalReadBytes: MetricConfig{ + Enabled: false, + }, + DiskioEbsTotalReadOps: MetricConfig{ + Enabled: true, + }, + DiskioEbsTotalReadTime: MetricConfig{ + Enabled: false, + }, + DiskioEbsTotalWriteBytes: MetricConfig{ + Enabled: false, + }, + DiskioEbsTotalWriteOps: MetricConfig{ + Enabled: false, + }, + DiskioEbsTotalWriteTime: MetricConfig{ + Enabled: false, + }, + DiskioEbsVolumePerformanceExceededIops: MetricConfig{ + Enabled: false, + }, + DiskioEbsVolumePerformanceExceededTp: MetricConfig{ + Enabled: false, + }, + DiskioEbsVolumeQueueLength: MetricConfig{ + Enabled: false, + }, + } +} + +// ResourceAttributeConfig provides common config for a particular resource attribute. +type ResourceAttributeConfig struct { + Enabled bool `mapstructure:"enabled"` + // Experimental: MetricsInclude defines a list of filters for attribute values. + // If the list is not empty, only metrics with matching resource attribute values will be emitted. + MetricsInclude []filter.Config `mapstructure:"metrics_include"` + // Experimental: MetricsExclude defines a list of filters for attribute values. + // If the list is not empty, metrics with matching resource attribute values will not be emitted. + // MetricsInclude has higher priority than MetricsExclude. + MetricsExclude []filter.Config `mapstructure:"metrics_exclude"` + + enabledSetByUser bool +} + +func (rac *ResourceAttributeConfig) Unmarshal(parser *confmap.Conf) error { + if parser == nil { + return nil + } + err := parser.Unmarshal(rac) + if err != nil { + return err + } + rac.enabledSetByUser = parser.IsSet("enabled") + return nil +} + +// ResourceAttributesConfig provides config for awsebsnvmereceiver resource attributes. 
+type ResourceAttributesConfig struct { + VolumeID ResourceAttributeConfig `mapstructure:"VolumeId"` +} + +func DefaultResourceAttributesConfig() ResourceAttributesConfig { + return ResourceAttributesConfig{ + VolumeID: ResourceAttributeConfig{ + Enabled: true, + }, + } +} + +// MetricsBuilderConfig is a configuration for awsebsnvmereceiver metrics builder. +type MetricsBuilderConfig struct { + Metrics MetricsConfig `mapstructure:"metrics"` + ResourceAttributes ResourceAttributesConfig `mapstructure:"resource_attributes"` +} + +func DefaultMetricsBuilderConfig() MetricsBuilderConfig { + return MetricsBuilderConfig{ + Metrics: DefaultMetricsConfig(), + ResourceAttributes: DefaultResourceAttributesConfig(), + } +} diff --git a/receiver/awsebsnvmereceiver/internal/metadata/generated_config_test.go b/receiver/awsebsnvmereceiver/internal/metadata/generated_config_test.go new file mode 100644 index 0000000000..d13ea947b6 --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/metadata/generated_config_test.go @@ -0,0 +1,127 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "path/filepath" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap/confmaptest" +) + +func TestMetricsBuilderConfig(t *testing.T) { + tests := []struct { + name string + want MetricsBuilderConfig + }{ + { + name: "default", + want: DefaultMetricsBuilderConfig(), + }, + { + name: "all_set", + want: MetricsBuilderConfig{ + Metrics: MetricsConfig{ + DiskioEbsEc2InstancePerformanceExceededIops: MetricConfig{Enabled: true}, + DiskioEbsEc2InstancePerformanceExceededTp: MetricConfig{Enabled: true}, + DiskioEbsTotalReadBytes: MetricConfig{Enabled: true}, + DiskioEbsTotalReadOps: MetricConfig{Enabled: true}, + DiskioEbsTotalReadTime: MetricConfig{Enabled: true}, + DiskioEbsTotalWriteBytes: MetricConfig{Enabled: true}, + DiskioEbsTotalWriteOps: MetricConfig{Enabled: true}, + DiskioEbsTotalWriteTime: MetricConfig{Enabled: true}, + DiskioEbsVolumePerformanceExceededIops: MetricConfig{Enabled: true}, + DiskioEbsVolumePerformanceExceededTp: MetricConfig{Enabled: true}, + DiskioEbsVolumeQueueLength: MetricConfig{Enabled: true}, + }, + ResourceAttributes: ResourceAttributesConfig{ + VolumeID: ResourceAttributeConfig{Enabled: true}, + }, + }, + }, + { + name: "none_set", + want: MetricsBuilderConfig{ + Metrics: MetricsConfig{ + DiskioEbsEc2InstancePerformanceExceededIops: MetricConfig{Enabled: false}, + DiskioEbsEc2InstancePerformanceExceededTp: MetricConfig{Enabled: false}, + DiskioEbsTotalReadBytes: MetricConfig{Enabled: false}, + DiskioEbsTotalReadOps: MetricConfig{Enabled: false}, + DiskioEbsTotalReadTime: MetricConfig{Enabled: false}, + DiskioEbsTotalWriteBytes: MetricConfig{Enabled: false}, + DiskioEbsTotalWriteOps: MetricConfig{Enabled: false}, + DiskioEbsTotalWriteTime: MetricConfig{Enabled: false}, + DiskioEbsVolumePerformanceExceededIops: MetricConfig{Enabled: false}, + DiskioEbsVolumePerformanceExceededTp: MetricConfig{Enabled: false}, + DiskioEbsVolumeQueueLength: MetricConfig{Enabled: false}, + }, + ResourceAttributes: ResourceAttributesConfig{ + VolumeID: ResourceAttributeConfig{Enabled: false}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := loadMetricsBuilderConfig(t, tt.name) + diff := cmp.Diff(tt.want, cfg, cmpopts.IgnoreUnexported(MetricConfig{}, ResourceAttributeConfig{})) + require.Emptyf(t, diff, "Config mismatch (-expected 
+actual):\n%s", diff) + }) + } +} + +func loadMetricsBuilderConfig(t *testing.T, name string) MetricsBuilderConfig { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + sub, err := cm.Sub(name) + require.NoError(t, err) + cfg := DefaultMetricsBuilderConfig() + require.NoError(t, sub.Unmarshal(&cfg)) + return cfg +} + +func TestResourceAttributesConfig(t *testing.T) { + tests := []struct { + name string + want ResourceAttributesConfig + }{ + { + name: "default", + want: DefaultResourceAttributesConfig(), + }, + { + name: "all_set", + want: ResourceAttributesConfig{ + VolumeID: ResourceAttributeConfig{Enabled: true}, + }, + }, + { + name: "none_set", + want: ResourceAttributesConfig{ + VolumeID: ResourceAttributeConfig{Enabled: false}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := loadResourceAttributesConfig(t, tt.name) + diff := cmp.Diff(tt.want, cfg, cmpopts.IgnoreUnexported(ResourceAttributeConfig{})) + require.Emptyf(t, diff, "Config mismatch (-expected +actual):\n%s", diff) + }) + } +} + +func loadResourceAttributesConfig(t *testing.T, name string) ResourceAttributesConfig { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + sub, err := cm.Sub(name) + require.NoError(t, err) + sub, err = sub.Sub("resource_attributes") + require.NoError(t, err) + cfg := DefaultResourceAttributesConfig() + require.NoError(t, sub.Unmarshal(&cfg)) + return cfg +} diff --git a/receiver/awsebsnvmereceiver/internal/metadata/generated_metrics.go b/receiver/awsebsnvmereceiver/internal/metadata/generated_metrics.go new file mode 100644 index 0000000000..7379bdb92b --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/metadata/generated_metrics.go @@ -0,0 +1,814 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/filter" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" +) + +type metricDiskioEbsEc2InstancePerformanceExceededIops struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills diskio_ebs_ec2_instance_performance_exceeded_iops metric with initial data. +func (m *metricDiskioEbsEc2InstancePerformanceExceededIops) init() { + m.data.SetName("diskio_ebs_ec2_instance_performance_exceeded_iops") + m.data.SetDescription("The total time, in microseconds, that the EBS volume exceeded the attached Amazon EC2 instance's maximum IOPS performance") + m.data.SetUnit("us") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricDiskioEbsEc2InstancePerformanceExceededIops) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. 
+func (m *metricDiskioEbsEc2InstancePerformanceExceededIops) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricDiskioEbsEc2InstancePerformanceExceededIops) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricDiskioEbsEc2InstancePerformanceExceededIops(cfg MetricConfig) metricDiskioEbsEc2InstancePerformanceExceededIops { + m := metricDiskioEbsEc2InstancePerformanceExceededIops{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricDiskioEbsEc2InstancePerformanceExceededTp struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills diskio_ebs_ec2_instance_performance_exceeded_tp metric with initial data. +func (m *metricDiskioEbsEc2InstancePerformanceExceededTp) init() { + m.data.SetName("diskio_ebs_ec2_instance_performance_exceeded_tp") + m.data.SetDescription("The total time, in microseconds, that the EBS volume exceeded the attached Amazon EC2 instance's maximum throughput performance") + m.data.SetUnit("us") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricDiskioEbsEc2InstancePerformanceExceededTp) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricDiskioEbsEc2InstancePerformanceExceededTp) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricDiskioEbsEc2InstancePerformanceExceededTp) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricDiskioEbsEc2InstancePerformanceExceededTp(cfg MetricConfig) metricDiskioEbsEc2InstancePerformanceExceededTp { + m := metricDiskioEbsEc2InstancePerformanceExceededTp{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricDiskioEbsTotalReadBytes struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills diskio_ebs_total_read_bytes metric with initial data. 
+func (m *metricDiskioEbsTotalReadBytes) init() { + m.data.SetName("diskio_ebs_total_read_bytes") + m.data.SetDescription("The total number of read bytes transferred") + m.data.SetUnit("By") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricDiskioEbsTotalReadBytes) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricDiskioEbsTotalReadBytes) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricDiskioEbsTotalReadBytes) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricDiskioEbsTotalReadBytes(cfg MetricConfig) metricDiskioEbsTotalReadBytes { + m := metricDiskioEbsTotalReadBytes{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricDiskioEbsTotalReadOps struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills diskio_ebs_total_read_ops metric with initial data. +func (m *metricDiskioEbsTotalReadOps) init() { + m.data.SetName("diskio_ebs_total_read_ops") + m.data.SetDescription("The total number of completed read operations") + m.data.SetUnit("1") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricDiskioEbsTotalReadOps) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricDiskioEbsTotalReadOps) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricDiskioEbsTotalReadOps) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricDiskioEbsTotalReadOps(cfg MetricConfig) metricDiskioEbsTotalReadOps { + m := metricDiskioEbsTotalReadOps{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricDiskioEbsTotalReadTime struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills diskio_ebs_total_read_time metric with initial data. 
+func (m *metricDiskioEbsTotalReadTime) init() { + m.data.SetName("diskio_ebs_total_read_time") + m.data.SetDescription("The total time spent, in microseconds, by all completed read operations") + m.data.SetUnit("us") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricDiskioEbsTotalReadTime) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricDiskioEbsTotalReadTime) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricDiskioEbsTotalReadTime) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricDiskioEbsTotalReadTime(cfg MetricConfig) metricDiskioEbsTotalReadTime { + m := metricDiskioEbsTotalReadTime{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricDiskioEbsTotalWriteBytes struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills diskio_ebs_total_write_bytes metric with initial data. +func (m *metricDiskioEbsTotalWriteBytes) init() { + m.data.SetName("diskio_ebs_total_write_bytes") + m.data.SetDescription("The total number of write bytes transferred") + m.data.SetUnit("By") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricDiskioEbsTotalWriteBytes) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricDiskioEbsTotalWriteBytes) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricDiskioEbsTotalWriteBytes) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricDiskioEbsTotalWriteBytes(cfg MetricConfig) metricDiskioEbsTotalWriteBytes { + m := metricDiskioEbsTotalWriteBytes{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricDiskioEbsTotalWriteOps struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills diskio_ebs_total_write_ops metric with initial data. 
+func (m *metricDiskioEbsTotalWriteOps) init() { + m.data.SetName("diskio_ebs_total_write_ops") + m.data.SetDescription("The total number of completed write operations") + m.data.SetUnit("1") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricDiskioEbsTotalWriteOps) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricDiskioEbsTotalWriteOps) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricDiskioEbsTotalWriteOps) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricDiskioEbsTotalWriteOps(cfg MetricConfig) metricDiskioEbsTotalWriteOps { + m := metricDiskioEbsTotalWriteOps{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricDiskioEbsTotalWriteTime struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills diskio_ebs_total_write_time metric with initial data. +func (m *metricDiskioEbsTotalWriteTime) init() { + m.data.SetName("diskio_ebs_total_write_time") + m.data.SetDescription("The total time spent, in microseconds, by all completed write operations") + m.data.SetUnit("us") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricDiskioEbsTotalWriteTime) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricDiskioEbsTotalWriteTime) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricDiskioEbsTotalWriteTime) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricDiskioEbsTotalWriteTime(cfg MetricConfig) metricDiskioEbsTotalWriteTime { + m := metricDiskioEbsTotalWriteTime{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricDiskioEbsVolumePerformanceExceededIops struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. 
+} + +// init fills diskio_ebs_volume_performance_exceeded_iops metric with initial data. +func (m *metricDiskioEbsVolumePerformanceExceededIops) init() { + m.data.SetName("diskio_ebs_volume_performance_exceeded_iops") + m.data.SetDescription("The total time, in microseconds, that IOPS demand exceeded the volume's provisioned IOPS performance") + m.data.SetUnit("us") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricDiskioEbsVolumePerformanceExceededIops) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricDiskioEbsVolumePerformanceExceededIops) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricDiskioEbsVolumePerformanceExceededIops) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricDiskioEbsVolumePerformanceExceededIops(cfg MetricConfig) metricDiskioEbsVolumePerformanceExceededIops { + m := metricDiskioEbsVolumePerformanceExceededIops{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricDiskioEbsVolumePerformanceExceededTp struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills diskio_ebs_volume_performance_exceeded_tp metric with initial data. +func (m *metricDiskioEbsVolumePerformanceExceededTp) init() { + m.data.SetName("diskio_ebs_volume_performance_exceeded_tp") + m.data.SetDescription("The total time, in microseconds, that throughput demand exceeded the volume's provisioned throughput performance") + m.data.SetUnit("us") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricDiskioEbsVolumePerformanceExceededTp) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricDiskioEbsVolumePerformanceExceededTp) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. 
+func (m *metricDiskioEbsVolumePerformanceExceededTp) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricDiskioEbsVolumePerformanceExceededTp(cfg MetricConfig) metricDiskioEbsVolumePerformanceExceededTp { + m := metricDiskioEbsVolumePerformanceExceededTp{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricDiskioEbsVolumeQueueLength struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills diskio_ebs_volume_queue_length metric with initial data. +func (m *metricDiskioEbsVolumeQueueLength) init() { + m.data.SetName("diskio_ebs_volume_queue_length") + m.data.SetDescription("The number of read and write operations waiting to be completed") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricDiskioEbsVolumeQueueLength) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricDiskioEbsVolumeQueueLength) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricDiskioEbsVolumeQueueLength) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricDiskioEbsVolumeQueueLength(cfg MetricConfig) metricDiskioEbsVolumeQueueLength { + m := metricDiskioEbsVolumeQueueLength{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +// MetricsBuilder provides an interface for scrapers to report metrics while taking care of all the transformations +// required to produce metric representation defined in metadata and user config. +type MetricsBuilder struct { + config MetricsBuilderConfig // config of the metrics builder. + startTime pcommon.Timestamp // start time that will be applied to all recorded data points. + metricsCapacity int // maximum observed number of metrics per resource. + metricsBuffer pmetric.Metrics // accumulates metrics data before emitting. + buildInfo component.BuildInfo // contains version information. 
+ resourceAttributeIncludeFilter map[string]filter.Filter + resourceAttributeExcludeFilter map[string]filter.Filter + metricDiskioEbsEc2InstancePerformanceExceededIops metricDiskioEbsEc2InstancePerformanceExceededIops + metricDiskioEbsEc2InstancePerformanceExceededTp metricDiskioEbsEc2InstancePerformanceExceededTp + metricDiskioEbsTotalReadBytes metricDiskioEbsTotalReadBytes + metricDiskioEbsTotalReadOps metricDiskioEbsTotalReadOps + metricDiskioEbsTotalReadTime metricDiskioEbsTotalReadTime + metricDiskioEbsTotalWriteBytes metricDiskioEbsTotalWriteBytes + metricDiskioEbsTotalWriteOps metricDiskioEbsTotalWriteOps + metricDiskioEbsTotalWriteTime metricDiskioEbsTotalWriteTime + metricDiskioEbsVolumePerformanceExceededIops metricDiskioEbsVolumePerformanceExceededIops + metricDiskioEbsVolumePerformanceExceededTp metricDiskioEbsVolumePerformanceExceededTp + metricDiskioEbsVolumeQueueLength metricDiskioEbsVolumeQueueLength +} + +// MetricBuilderOption applies changes to default metrics builder. +type MetricBuilderOption interface { + apply(*MetricsBuilder) +} + +type metricBuilderOptionFunc func(mb *MetricsBuilder) + +func (mbof metricBuilderOptionFunc) apply(mb *MetricsBuilder) { + mbof(mb) +} + +// WithStartTime sets startTime on the metrics builder. +func WithStartTime(startTime pcommon.Timestamp) MetricBuilderOption { + return metricBuilderOptionFunc(func(mb *MetricsBuilder) { + mb.startTime = startTime + }) +} + +func NewMetricsBuilder(mbc MetricsBuilderConfig, settings receiver.Settings, options ...MetricBuilderOption) *MetricsBuilder { + mb := &MetricsBuilder{ + config: mbc, + startTime: pcommon.NewTimestampFromTime(time.Now()), + metricsBuffer: pmetric.NewMetrics(), + buildInfo: settings.BuildInfo, + metricDiskioEbsEc2InstancePerformanceExceededIops: newMetricDiskioEbsEc2InstancePerformanceExceededIops(mbc.Metrics.DiskioEbsEc2InstancePerformanceExceededIops), + metricDiskioEbsEc2InstancePerformanceExceededTp: newMetricDiskioEbsEc2InstancePerformanceExceededTp(mbc.Metrics.DiskioEbsEc2InstancePerformanceExceededTp), + metricDiskioEbsTotalReadBytes: newMetricDiskioEbsTotalReadBytes(mbc.Metrics.DiskioEbsTotalReadBytes), + metricDiskioEbsTotalReadOps: newMetricDiskioEbsTotalReadOps(mbc.Metrics.DiskioEbsTotalReadOps), + metricDiskioEbsTotalReadTime: newMetricDiskioEbsTotalReadTime(mbc.Metrics.DiskioEbsTotalReadTime), + metricDiskioEbsTotalWriteBytes: newMetricDiskioEbsTotalWriteBytes(mbc.Metrics.DiskioEbsTotalWriteBytes), + metricDiskioEbsTotalWriteOps: newMetricDiskioEbsTotalWriteOps(mbc.Metrics.DiskioEbsTotalWriteOps), + metricDiskioEbsTotalWriteTime: newMetricDiskioEbsTotalWriteTime(mbc.Metrics.DiskioEbsTotalWriteTime), + metricDiskioEbsVolumePerformanceExceededIops: newMetricDiskioEbsVolumePerformanceExceededIops(mbc.Metrics.DiskioEbsVolumePerformanceExceededIops), + metricDiskioEbsVolumePerformanceExceededTp: newMetricDiskioEbsVolumePerformanceExceededTp(mbc.Metrics.DiskioEbsVolumePerformanceExceededTp), + metricDiskioEbsVolumeQueueLength: newMetricDiskioEbsVolumeQueueLength(mbc.Metrics.DiskioEbsVolumeQueueLength), + resourceAttributeIncludeFilter: make(map[string]filter.Filter), + resourceAttributeExcludeFilter: make(map[string]filter.Filter), + } + if mbc.ResourceAttributes.VolumeID.MetricsInclude != nil { + mb.resourceAttributeIncludeFilter["VolumeId"] = filter.CreateFilter(mbc.ResourceAttributes.VolumeID.MetricsInclude) + } + if mbc.ResourceAttributes.VolumeID.MetricsExclude != nil { + mb.resourceAttributeExcludeFilter["VolumeId"] = 
filter.CreateFilter(mbc.ResourceAttributes.VolumeID.MetricsExclude)
+	}
+
+	for _, op := range options {
+		op.apply(mb)
+	}
+	return mb
+}
+
+// NewResourceBuilder returns a new resource builder that should be used to build a resource associated with the emitted metrics.
+func (mb *MetricsBuilder) NewResourceBuilder() *ResourceBuilder {
+	return NewResourceBuilder(mb.config.ResourceAttributes)
+}
+
+// updateCapacity updates max length of metrics and resource attributes that will be used for the slice capacity.
+func (mb *MetricsBuilder) updateCapacity(rm pmetric.ResourceMetrics) {
+	if mb.metricsCapacity < rm.ScopeMetrics().At(0).Metrics().Len() {
+		mb.metricsCapacity = rm.ScopeMetrics().At(0).Metrics().Len()
+	}
+}
+
+// ResourceMetricsOption applies changes to provided resource metrics.
+type ResourceMetricsOption interface {
+	apply(pmetric.ResourceMetrics)
+}
+
+type resourceMetricsOptionFunc func(pmetric.ResourceMetrics)
+
+func (rmof resourceMetricsOptionFunc) apply(rm pmetric.ResourceMetrics) {
+	rmof(rm)
+}
+
+// WithResource sets the provided resource on the emitted ResourceMetrics.
+// It's recommended to use ResourceBuilder to create the resource.
+func WithResource(res pcommon.Resource) ResourceMetricsOption {
+	return resourceMetricsOptionFunc(func(rm pmetric.ResourceMetrics) {
+		res.CopyTo(rm.Resource())
+	})
+}
+
+// WithStartTimeOverride overrides start time for all the resource metrics data points.
+// This option should be only used if different start time has to be set on metrics coming from different resources.
+func WithStartTimeOverride(start pcommon.Timestamp) ResourceMetricsOption {
+	return resourceMetricsOptionFunc(func(rm pmetric.ResourceMetrics) {
+		var dps pmetric.NumberDataPointSlice
+		metrics := rm.ScopeMetrics().At(0).Metrics()
+		for i := 0; i < metrics.Len(); i++ {
+			switch metrics.At(i).Type() {
+			case pmetric.MetricTypeGauge:
+				dps = metrics.At(i).Gauge().DataPoints()
+			case pmetric.MetricTypeSum:
+				dps = metrics.At(i).Sum().DataPoints()
+			}
+			for j := 0; j < dps.Len(); j++ {
+				dps.At(j).SetStartTimestamp(start)
+			}
+		}
+	})
+}
+
+// EmitForResource saves all the generated metrics under a new resource and updates the internal state to be ready for
+// recording another set of data points as part of another resource. This function can be helpful when one scraper
+// needs to emit metrics from several resources. Otherwise calling this function is not required;
+// just `Emit` can be called instead.
+// Resource attributes should be provided as ResourceMetricsOption arguments.
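+// If the resource fails a configured metrics_include filter, or matches a metrics_exclude filter, the data points recorded for it are dropped.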
+func (mb *MetricsBuilder) EmitForResource(options ...ResourceMetricsOption) { + rm := pmetric.NewResourceMetrics() + ils := rm.ScopeMetrics().AppendEmpty() + ils.Scope().SetName("github.com/aws/amazon-cloudwatch-agent/receiver/awsebsnvmereceiver") + ils.Scope().SetVersion(mb.buildInfo.Version) + ils.Metrics().EnsureCapacity(mb.metricsCapacity) + mb.metricDiskioEbsEc2InstancePerformanceExceededIops.emit(ils.Metrics()) + mb.metricDiskioEbsEc2InstancePerformanceExceededTp.emit(ils.Metrics()) + mb.metricDiskioEbsTotalReadBytes.emit(ils.Metrics()) + mb.metricDiskioEbsTotalReadOps.emit(ils.Metrics()) + mb.metricDiskioEbsTotalReadTime.emit(ils.Metrics()) + mb.metricDiskioEbsTotalWriteBytes.emit(ils.Metrics()) + mb.metricDiskioEbsTotalWriteOps.emit(ils.Metrics()) + mb.metricDiskioEbsTotalWriteTime.emit(ils.Metrics()) + mb.metricDiskioEbsVolumePerformanceExceededIops.emit(ils.Metrics()) + mb.metricDiskioEbsVolumePerformanceExceededTp.emit(ils.Metrics()) + mb.metricDiskioEbsVolumeQueueLength.emit(ils.Metrics()) + + for _, op := range options { + op.apply(rm) + } + for attr, filter := range mb.resourceAttributeIncludeFilter { + if val, ok := rm.Resource().Attributes().Get(attr); ok && !filter.Matches(val.AsString()) { + return + } + } + for attr, filter := range mb.resourceAttributeExcludeFilter { + if val, ok := rm.Resource().Attributes().Get(attr); ok && filter.Matches(val.AsString()) { + return + } + } + + if ils.Metrics().Len() > 0 { + mb.updateCapacity(rm) + rm.MoveTo(mb.metricsBuffer.ResourceMetrics().AppendEmpty()) + } +} + +// Emit returns all the metrics accumulated by the metrics builder and updates the internal state to be ready for +// recording another set of metrics. This function will be responsible for applying all the transformations required to +// produce metric representation defined in metadata and user config, e.g. delta or cumulative. +func (mb *MetricsBuilder) Emit(options ...ResourceMetricsOption) pmetric.Metrics { + mb.EmitForResource(options...) + metrics := mb.metricsBuffer + mb.metricsBuffer = pmetric.NewMetrics() + return metrics +} + +// RecordDiskioEbsEc2InstancePerformanceExceededIopsDataPoint adds a data point to diskio_ebs_ec2_instance_performance_exceeded_iops metric. +func (mb *MetricsBuilder) RecordDiskioEbsEc2InstancePerformanceExceededIopsDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricDiskioEbsEc2InstancePerformanceExceededIops.recordDataPoint(mb.startTime, ts, val) +} + +// RecordDiskioEbsEc2InstancePerformanceExceededTpDataPoint adds a data point to diskio_ebs_ec2_instance_performance_exceeded_tp metric. +func (mb *MetricsBuilder) RecordDiskioEbsEc2InstancePerformanceExceededTpDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricDiskioEbsEc2InstancePerformanceExceededTp.recordDataPoint(mb.startTime, ts, val) +} + +// RecordDiskioEbsTotalReadBytesDataPoint adds a data point to diskio_ebs_total_read_bytes metric. +func (mb *MetricsBuilder) RecordDiskioEbsTotalReadBytesDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricDiskioEbsTotalReadBytes.recordDataPoint(mb.startTime, ts, val) +} + +// RecordDiskioEbsTotalReadOpsDataPoint adds a data point to diskio_ebs_total_read_ops metric. +func (mb *MetricsBuilder) RecordDiskioEbsTotalReadOpsDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricDiskioEbsTotalReadOps.recordDataPoint(mb.startTime, ts, val) +} + +// RecordDiskioEbsTotalReadTimeDataPoint adds a data point to diskio_ebs_total_read_time metric. 
+func (mb *MetricsBuilder) RecordDiskioEbsTotalReadTimeDataPoint(ts pcommon.Timestamp, val int64) {
+	mb.metricDiskioEbsTotalReadTime.recordDataPoint(mb.startTime, ts, val)
+}
+
+// RecordDiskioEbsTotalWriteBytesDataPoint adds a data point to diskio_ebs_total_write_bytes metric.
+func (mb *MetricsBuilder) RecordDiskioEbsTotalWriteBytesDataPoint(ts pcommon.Timestamp, val int64) {
+	mb.metricDiskioEbsTotalWriteBytes.recordDataPoint(mb.startTime, ts, val)
+}
+
+// RecordDiskioEbsTotalWriteOpsDataPoint adds a data point to diskio_ebs_total_write_ops metric.
+func (mb *MetricsBuilder) RecordDiskioEbsTotalWriteOpsDataPoint(ts pcommon.Timestamp, val int64) {
+	mb.metricDiskioEbsTotalWriteOps.recordDataPoint(mb.startTime, ts, val)
+}
+
+// RecordDiskioEbsTotalWriteTimeDataPoint adds a data point to diskio_ebs_total_write_time metric.
+func (mb *MetricsBuilder) RecordDiskioEbsTotalWriteTimeDataPoint(ts pcommon.Timestamp, val int64) {
+	mb.metricDiskioEbsTotalWriteTime.recordDataPoint(mb.startTime, ts, val)
+}
+
+// RecordDiskioEbsVolumePerformanceExceededIopsDataPoint adds a data point to diskio_ebs_volume_performance_exceeded_iops metric.
+func (mb *MetricsBuilder) RecordDiskioEbsVolumePerformanceExceededIopsDataPoint(ts pcommon.Timestamp, val int64) {
+	mb.metricDiskioEbsVolumePerformanceExceededIops.recordDataPoint(mb.startTime, ts, val)
+}
+
+// RecordDiskioEbsVolumePerformanceExceededTpDataPoint adds a data point to diskio_ebs_volume_performance_exceeded_tp metric.
+func (mb *MetricsBuilder) RecordDiskioEbsVolumePerformanceExceededTpDataPoint(ts pcommon.Timestamp, val int64) {
+	mb.metricDiskioEbsVolumePerformanceExceededTp.recordDataPoint(mb.startTime, ts, val)
+}
+
+// RecordDiskioEbsVolumeQueueLengthDataPoint adds a data point to diskio_ebs_volume_queue_length metric.
+func (mb *MetricsBuilder) RecordDiskioEbsVolumeQueueLengthDataPoint(ts pcommon.Timestamp, val int64) {
+	mb.metricDiskioEbsVolumeQueueLength.recordDataPoint(mb.startTime, ts, val)
+}
+
+// Reset resets metrics builder to its initial state. It should be used when external metrics source is restarted,
+// and metrics builder should update its startTime and reset its internal state accordingly.
+func (mb *MetricsBuilder) Reset(options ...MetricBuilderOption) {
+	mb.startTime = pcommon.NewTimestampFromTime(time.Now())
+	for _, op := range options {
+		op.apply(mb)
+	}
+}
diff --git a/receiver/awsebsnvmereceiver/internal/metadata/generated_metrics_test.go b/receiver/awsebsnvmereceiver/internal/metadata/generated_metrics_test.go
new file mode 100644
index 0000000000..b9192b1ae4
--- /dev/null
+++ b/receiver/awsebsnvmereceiver/internal/metadata/generated_metrics_test.go
@@ -0,0 +1,285 @@
+// Code generated by mdatagen. DO NOT EDIT.
+ +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +type testDataSet int + +const ( + testDataSetDefault testDataSet = iota + testDataSetAll + testDataSetNone +) + +func TestMetricsBuilder(t *testing.T) { + tests := []struct { + name string + metricsSet testDataSet + resAttrsSet testDataSet + expectEmpty bool + }{ + { + name: "default", + }, + { + name: "all_set", + metricsSet: testDataSetAll, + resAttrsSet: testDataSetAll, + }, + { + name: "none_set", + metricsSet: testDataSetNone, + resAttrsSet: testDataSetNone, + expectEmpty: true, + }, + { + name: "filter_set_include", + resAttrsSet: testDataSetAll, + }, + { + name: "filter_set_exclude", + resAttrsSet: testDataSetAll, + expectEmpty: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + start := pcommon.Timestamp(1_000_000_000) + ts := pcommon.Timestamp(1_000_001_000) + observedZapCore, observedLogs := observer.New(zap.WarnLevel) + settings := receivertest.NewNopSettings() + settings.Logger = zap.New(observedZapCore) + mb := NewMetricsBuilder(loadMetricsBuilderConfig(t, tt.name), settings, WithStartTime(start)) + + expectedWarnings := 0 + + assert.Equal(t, expectedWarnings, observedLogs.Len()) + + defaultMetricsCount := 0 + allMetricsCount := 0 + + allMetricsCount++ + mb.RecordDiskioEbsEc2InstancePerformanceExceededIopsDataPoint(ts, 1) + + allMetricsCount++ + mb.RecordDiskioEbsEc2InstancePerformanceExceededTpDataPoint(ts, 1) + + allMetricsCount++ + mb.RecordDiskioEbsTotalReadBytesDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordDiskioEbsTotalReadOpsDataPoint(ts, 1) + + allMetricsCount++ + mb.RecordDiskioEbsTotalReadTimeDataPoint(ts, 1) + + allMetricsCount++ + mb.RecordDiskioEbsTotalWriteBytesDataPoint(ts, 1) + + allMetricsCount++ + mb.RecordDiskioEbsTotalWriteOpsDataPoint(ts, 1) + + allMetricsCount++ + mb.RecordDiskioEbsTotalWriteTimeDataPoint(ts, 1) + + allMetricsCount++ + mb.RecordDiskioEbsVolumePerformanceExceededIopsDataPoint(ts, 1) + + allMetricsCount++ + mb.RecordDiskioEbsVolumePerformanceExceededTpDataPoint(ts, 1) + + allMetricsCount++ + mb.RecordDiskioEbsVolumeQueueLengthDataPoint(ts, 1) + + rb := mb.NewResourceBuilder() + rb.SetVolumeID("VolumeId-val") + res := rb.Emit() + metrics := mb.Emit(WithResource(res)) + + if tt.expectEmpty { + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) + return + } + + assert.Equal(t, 1, metrics.ResourceMetrics().Len()) + rm := metrics.ResourceMetrics().At(0) + assert.Equal(t, res, rm.Resource()) + assert.Equal(t, 1, rm.ScopeMetrics().Len()) + ms := rm.ScopeMetrics().At(0).Metrics() + if tt.metricsSet == testDataSetDefault { + assert.Equal(t, defaultMetricsCount, ms.Len()) + } + if tt.metricsSet == testDataSetAll { + assert.Equal(t, allMetricsCount, ms.Len()) + } + validatedMetrics := make(map[string]bool) + for i := 0; i < ms.Len(); i++ { + switch ms.At(i).Name() { + case "diskio_ebs_ec2_instance_performance_exceeded_iops": + assert.False(t, validatedMetrics["diskio_ebs_ec2_instance_performance_exceeded_iops"], "Found a duplicate in the metrics slice: diskio_ebs_ec2_instance_performance_exceeded_iops") + validatedMetrics["diskio_ebs_ec2_instance_performance_exceeded_iops"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + 
assert.Equal(t, "The total time, in microseconds, that the EBS volume exceeded the attached Amazon EC2 instance's maximum IOPS performance", ms.At(i).Description()) + assert.Equal(t, "us", ms.At(i).Unit()) + assert.True(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "diskio_ebs_ec2_instance_performance_exceeded_tp": + assert.False(t, validatedMetrics["diskio_ebs_ec2_instance_performance_exceeded_tp"], "Found a duplicate in the metrics slice: diskio_ebs_ec2_instance_performance_exceeded_tp") + validatedMetrics["diskio_ebs_ec2_instance_performance_exceeded_tp"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "The total time, in microseconds, that the EBS volume exceeded the attached Amazon EC2 instance's maximum throughput performance", ms.At(i).Description()) + assert.Equal(t, "us", ms.At(i).Unit()) + assert.True(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "diskio_ebs_total_read_bytes": + assert.False(t, validatedMetrics["diskio_ebs_total_read_bytes"], "Found a duplicate in the metrics slice: diskio_ebs_total_read_bytes") + validatedMetrics["diskio_ebs_total_read_bytes"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "The total number of read bytes transferred", ms.At(i).Description()) + assert.Equal(t, "By", ms.At(i).Unit()) + assert.True(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "diskio_ebs_total_read_ops": + assert.False(t, validatedMetrics["diskio_ebs_total_read_ops"], "Found a duplicate in the metrics slice: diskio_ebs_total_read_ops") + validatedMetrics["diskio_ebs_total_read_ops"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "The total number of completed read operations", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + assert.True(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "diskio_ebs_total_read_time": + assert.False(t, validatedMetrics["diskio_ebs_total_read_time"], "Found a duplicate in the metrics slice: diskio_ebs_total_read_time") + 
validatedMetrics["diskio_ebs_total_read_time"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "The total time spent, in microseconds, by all completed read operations", ms.At(i).Description()) + assert.Equal(t, "us", ms.At(i).Unit()) + assert.True(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "diskio_ebs_total_write_bytes": + assert.False(t, validatedMetrics["diskio_ebs_total_write_bytes"], "Found a duplicate in the metrics slice: diskio_ebs_total_write_bytes") + validatedMetrics["diskio_ebs_total_write_bytes"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "The total number of write bytes transferred", ms.At(i).Description()) + assert.Equal(t, "By", ms.At(i).Unit()) + assert.True(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "diskio_ebs_total_write_ops": + assert.False(t, validatedMetrics["diskio_ebs_total_write_ops"], "Found a duplicate in the metrics slice: diskio_ebs_total_write_ops") + validatedMetrics["diskio_ebs_total_write_ops"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "The total number of completed write operations", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + assert.True(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "diskio_ebs_total_write_time": + assert.False(t, validatedMetrics["diskio_ebs_total_write_time"], "Found a duplicate in the metrics slice: diskio_ebs_total_write_time") + validatedMetrics["diskio_ebs_total_write_time"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "The total time spent, in microseconds, by all completed write operations", ms.At(i).Description()) + assert.Equal(t, "us", ms.At(i).Unit()) + assert.True(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "diskio_ebs_volume_performance_exceeded_iops": + assert.False(t, validatedMetrics["diskio_ebs_volume_performance_exceeded_iops"], "Found a duplicate in the metrics slice: 
diskio_ebs_volume_performance_exceeded_iops") + validatedMetrics["diskio_ebs_volume_performance_exceeded_iops"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "The total time, in microseconds, that IOPS demand exceeded the volume's provisioned IOPS performance", ms.At(i).Description()) + assert.Equal(t, "us", ms.At(i).Unit()) + assert.True(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "diskio_ebs_volume_performance_exceeded_tp": + assert.False(t, validatedMetrics["diskio_ebs_volume_performance_exceeded_tp"], "Found a duplicate in the metrics slice: diskio_ebs_volume_performance_exceeded_tp") + validatedMetrics["diskio_ebs_volume_performance_exceeded_tp"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "The total time, in microseconds, that throughput demand exceeded the volume's provisioned throughput performance", ms.At(i).Description()) + assert.Equal(t, "us", ms.At(i).Unit()) + assert.True(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "diskio_ebs_volume_queue_length": + assert.False(t, validatedMetrics["diskio_ebs_volume_queue_length"], "Found a duplicate in the metrics slice: diskio_ebs_volume_queue_length") + validatedMetrics["diskio_ebs_volume_queue_length"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "The number of read and write operations waiting to be completed", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + } + } + }) + } +} diff --git a/receiver/awsebsnvmereceiver/internal/metadata/generated_resource.go b/receiver/awsebsnvmereceiver/internal/metadata/generated_resource.go new file mode 100644 index 0000000000..149c54117e --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/metadata/generated_resource.go @@ -0,0 +1,36 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" +) + +// ResourceBuilder is a helper struct to build resources predefined in metadata.yaml. +// The ResourceBuilder is not thread-safe and must not to be used in multiple goroutines. +type ResourceBuilder struct { + config ResourceAttributesConfig + res pcommon.Resource +} + +// NewResourceBuilder creates a new ResourceBuilder. This method should be called on the start of the application. 
+func NewResourceBuilder(rac ResourceAttributesConfig) *ResourceBuilder { + return &ResourceBuilder{ + config: rac, + res: pcommon.NewResource(), + } +} + +// SetVolumeID sets provided value as "VolumeId" attribute. +func (rb *ResourceBuilder) SetVolumeID(val string) { + if rb.config.VolumeID.Enabled { + rb.res.Attributes().PutStr("VolumeId", val) + } +} + +// Emit returns the built resource and resets the internal builder state. +func (rb *ResourceBuilder) Emit() pcommon.Resource { + r := rb.res + rb.res = pcommon.NewResource() + return r +} diff --git a/receiver/awsebsnvmereceiver/internal/metadata/generated_resource_test.go b/receiver/awsebsnvmereceiver/internal/metadata/generated_resource_test.go new file mode 100644 index 0000000000..e95e2bf837 --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/metadata/generated_resource_test.go @@ -0,0 +1,40 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestResourceBuilder(t *testing.T) { + for _, tt := range []string{"default", "all_set", "none_set"} { + t.Run(tt, func(t *testing.T) { + cfg := loadResourceAttributesConfig(t, tt) + rb := NewResourceBuilder(cfg) + rb.SetVolumeID("VolumeId-val") + + res := rb.Emit() + assert.Equal(t, 0, rb.Emit().Attributes().Len()) // Second call should return empty Resource + + switch tt { + case "default": + assert.Equal(t, 1, res.Attributes().Len()) + case "all_set": + assert.Equal(t, 1, res.Attributes().Len()) + case "none_set": + assert.Equal(t, 0, res.Attributes().Len()) + return + default: + assert.Failf(t, "unexpected test case: %s", tt) + } + + val, ok := res.Attributes().Get("VolumeId") + assert.True(t, ok) + if ok { + assert.EqualValues(t, "VolumeId-val", val.Str()) + } + }) + } +} diff --git a/receiver/awsebsnvmereceiver/internal/metadata/generated_status.go b/receiver/awsebsnvmereceiver/internal/metadata/generated_status.go new file mode 100644 index 0000000000..8386451026 --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/metadata/generated_status.go @@ -0,0 +1,16 @@ +// Code generated by mdatagen. DO NOT EDIT. 
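
Two behaviors of the resource builder above are easy to miss: attributes are only written when they are enabled in config, and `Emit` resets the internal resource. A small sketch of both, assuming the generated `DefaultResourceAttributesConfig` helper (standard mdatagen output, not shown in this excerpt); the volume ID is illustrative.

```go
package main

import (
	"fmt"

	"github.com/aws/amazon-cloudwatch-agent/receiver/awsebsnvmereceiver/internal/metadata"
)

func main() {
	// With the defaults, VolumeId is enabled, so SetVolumeID materializes
	// the attribute on the emitted resource.
	rb := metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig())
	rb.SetVolumeID("vol-0123456789abcdef0") // illustrative
	res := rb.Emit()
	fmt.Println(res.Attributes().Len()) // 1

	// Emit reset the builder, so a second call yields an empty resource.
	fmt.Println(rb.Emit().Attributes().Len()) // 0
}
```
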
+ +package metadata + +import ( + "go.opentelemetry.io/collector/component" +) + +var ( + Type = component.MustNewType("awsebsnvmereceiver") + ScopeName = "github.com/aws/amazon-cloudwatch-agent/receiver/awsebsnvmereceiver" +) + +const ( + MetricsStability = component.StabilityLevelBeta +) diff --git a/receiver/awsebsnvmereceiver/internal/metadata/testdata/config.yaml b/receiver/awsebsnvmereceiver/internal/metadata/testdata/config.yaml new file mode 100644 index 0000000000..5a7cb5817c --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/metadata/testdata/config.yaml @@ -0,0 +1,67 @@ +default: +all_set: + metrics: + diskio_ebs_ec2_instance_performance_exceeded_iops: + enabled: true + diskio_ebs_ec2_instance_performance_exceeded_tp: + enabled: true + diskio_ebs_total_read_bytes: + enabled: true + diskio_ebs_total_read_ops: + enabled: true + diskio_ebs_total_read_time: + enabled: true + diskio_ebs_total_write_bytes: + enabled: true + diskio_ebs_total_write_ops: + enabled: true + diskio_ebs_total_write_time: + enabled: true + diskio_ebs_volume_performance_exceeded_iops: + enabled: true + diskio_ebs_volume_performance_exceeded_tp: + enabled: true + diskio_ebs_volume_queue_length: + enabled: true + resource_attributes: + VolumeId: + enabled: true +none_set: + metrics: + diskio_ebs_ec2_instance_performance_exceeded_iops: + enabled: false + diskio_ebs_ec2_instance_performance_exceeded_tp: + enabled: false + diskio_ebs_total_read_bytes: + enabled: false + diskio_ebs_total_read_ops: + enabled: false + diskio_ebs_total_read_time: + enabled: false + diskio_ebs_total_write_bytes: + enabled: false + diskio_ebs_total_write_ops: + enabled: false + diskio_ebs_total_write_time: + enabled: false + diskio_ebs_volume_performance_exceeded_iops: + enabled: false + diskio_ebs_volume_performance_exceeded_tp: + enabled: false + diskio_ebs_volume_queue_length: + enabled: false + resource_attributes: + VolumeId: + enabled: false +filter_set_include: + resource_attributes: + VolumeId: + enabled: true + metrics_include: + - regexp: ".*" +filter_set_exclude: + resource_attributes: + VolumeId: + enabled: true + metrics_exclude: + - strict: "VolumeId-val" diff --git a/receiver/awsebsnvmereceiver/internal/nvme/constants.go b/receiver/awsebsnvmereceiver/internal/nvme/constants.go new file mode 100644 index 0000000000..b112ce29a5 --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/nvme/constants.go @@ -0,0 +1,13 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package nvme + +const ( + devDirectoryPath = "/dev" + + nvmeDevicePrefix = "nvme" + nvmeSysDirectoryPath = "/sys/class/nvme" + + ebsNvmeModelName = "Amazon Elastic Block Store" +) diff --git a/receiver/awsebsnvmereceiver/internal/nvme/device_file_attributes.go b/receiver/awsebsnvmereceiver/internal/nvme/device_file_attributes.go new file mode 100644 index 0000000000..7a7439c929 --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/nvme/device_file_attributes.go @@ -0,0 +1,59 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
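
The testdata above flips every metric and resource-attribute toggle through YAML; in code the same switches are plain booleans on the generated config. A sketch, assuming the usual mdatagen `generated_config.go` shape, where per-metric `Enabled` fields are named after the metric identifiers:

```go
package main

import (
	"fmt"

	"github.com/aws/amazon-cloudwatch-agent/receiver/awsebsnvmereceiver/internal/metadata"
)

func main() {
	// Start from the defaults (only diskio_ebs_total_read_ops is enabled per
	// metadata.yaml) and opt in to one optional metric, mirroring what the
	// "all_set" style YAML above does.
	cfg := metadata.DefaultMetricsBuilderConfig() // assumed generated helper
	cfg.Metrics.DiskioEbsVolumeQueueLength.Enabled = true

	fmt.Println(cfg.Metrics.DiskioEbsTotalReadOps.Enabled)      // true by default
	fmt.Println(cfg.Metrics.DiskioEbsVolumeQueueLength.Enabled) // true after opt-in
}
```
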
+// SPDX-License-Identifier: MIT + +package nvme + +import ( + "errors" + "fmt" +) + +type DeviceFileAttributes struct { + controller int + namespace int + partition int + deviceName string +} + +func ParseNvmeDeviceFileName(device string) (DeviceFileAttributes, error) { + controller := -1 + namespace := -1 + partition := -1 + + _, _ = fmt.Sscanf(device, "nvme%dn%dp%d", &controller, &namespace, &partition) + + if controller == -1 { + return DeviceFileAttributes{deviceName: device}, errors.New("unable to parse device name") + } + + return DeviceFileAttributes{ + controller: controller, + namespace: namespace, + partition: partition, + deviceName: device, + }, nil +} + +func (n *DeviceFileAttributes) Controller() int { + return n.controller +} + +func (n *DeviceFileAttributes) Namespace() int { + return n.namespace +} + +func (n *DeviceFileAttributes) Partition() int { + return n.partition +} + +func (n *DeviceFileAttributes) BaseDeviceName() (string, error) { + if n.Controller() == -1 { + return "", errors.New("unable to re-create device name due to missing controller id") + } + + return fmt.Sprintf("nvme%d", n.Controller()), nil +} + +func (n *DeviceFileAttributes) DeviceName() string { + return n.deviceName +} diff --git a/receiver/awsebsnvmereceiver/internal/nvme/device_file_attributes_test.go b/receiver/awsebsnvmereceiver/internal/nvme/device_file_attributes_test.go new file mode 100644 index 0000000000..2ee7ff7211 --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/nvme/device_file_attributes_test.go @@ -0,0 +1,124 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package nvme + +import ( + "fmt" + "testing" +) + +func TestParseNvmeDeviceFileName(t *testing.T) { + tests := []struct { + name string + device string + wantController int + wantNamespace int + wantPartition int + wantErr bool + }{ + { + name: "Valid controller only", + device: "nvme0", + wantController: 0, + wantNamespace: -1, + wantPartition: -1, + wantErr: false, + }, + { + name: "Valid controller and namespace", + device: "nvme0n1", + wantController: 0, + wantNamespace: 1, + wantPartition: -1, + wantErr: false, + }, + { + name: "Valid controller, namespace and partition", + device: "nvme0n1p2", + wantController: 0, + wantNamespace: 1, + wantPartition: 2, + wantErr: false, + }, + { + name: "Invalid prefix", + device: "abcd", + wantErr: true, + }, + { + name: "Invalid format nvmeanbp", + device: "nvmeanbp", + wantController: -1, + wantNamespace: -1, + wantPartition: -1, + wantErr: true, + }, + { + name: "Multiple digit controller", + device: "nvme12n1p2", + wantController: 12, + wantNamespace: 1, + wantPartition: 2, + wantErr: false, + }, + { + name: "Multiple digit namespace", + device: "nvme0n123", + wantController: 0, + wantNamespace: 123, + wantPartition: -1, + wantErr: false, + }, + { + name: "Non-numeric controller", + device: "nvmean1p2", + wantController: -1, + wantNamespace: 1, + wantPartition: 2, + wantErr: true, + }, + { + name: "Wrong order", + device: "nvmep1n1", + wantController: -1, + wantNamespace: -1, + wantPartition: -1, + wantErr: true, + }, + { + name: "Empty string", + device: "", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParseNvmeDeviceFileName(tt.device) + if (err != nil) != tt.wantErr { + t.Errorf("ParseNvmeDeviceFileName() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr { + if got.Controller() != tt.wantController { + t.Errorf("Controller() = 
%v, want %v", got.Controller(), tt.wantController) + } + if got.Namespace() != tt.wantNamespace { + t.Errorf("Namespace() = %v, want %v", got.Namespace(), tt.wantNamespace) + } + if got.Partition() != tt.wantPartition { + t.Errorf("Partition() = %v, want %v", got.Partition(), tt.wantPartition) + } + if got.DeviceName() != tt.device { + t.Errorf("DeviceName() = %v, want %v", got.DeviceName(), tt.device) + } + baseName, err := got.BaseDeviceName() + expectedBaseName := fmt.Sprintf("nvme%d", got.Controller()) + if err != nil || baseName != expectedBaseName { + t.Errorf("BaseDeviceName() = %v, want %v", baseName, expectedBaseName) + } + } + }) + } +} diff --git a/receiver/awsebsnvmereceiver/internal/nvme/ebs_metrics.go b/receiver/awsebsnvmereceiver/internal/nvme/ebs_metrics.go new file mode 100644 index 0000000000..c82cea51d5 --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/nvme/ebs_metrics.go @@ -0,0 +1,50 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +// The following code is based on https://github.com/kubernetes-sigs/aws-ebs-csi-driver/blob/master/pkg/metrics/nvme.go + +// Copyright 2024 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the 'License'); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nvme + +// EBSMetrics represents the parsed metrics from the NVMe log page. +type EBSMetrics struct { + EBSMagic uint64 + ReadOps uint64 + WriteOps uint64 + ReadBytes uint64 + WriteBytes uint64 + TotalReadTime uint64 + TotalWriteTime uint64 + EBSIOPSExceeded uint64 + EBSThroughputExceeded uint64 + EC2IOPSExceeded uint64 + EC2ThroughputExceeded uint64 + QueueLength uint64 + ReservedArea [416]byte + ReadLatency Histogram + WriteLatency Histogram +} + +type Histogram struct { + BinCount uint64 + Bins [64]HistogramBin +} + +type HistogramBin struct { + Lower uint64 + Upper uint64 + Count uint64 +} diff --git a/receiver/awsebsnvmereceiver/internal/nvme/ebs_metrics_notunix.go b/receiver/awsebsnvmereceiver/internal/nvme/ebs_metrics_notunix.go new file mode 100644 index 0000000000..5df5158919 --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/nvme/ebs_metrics_notunix.go @@ -0,0 +1,12 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +//go:build !linux + +package nvme + +import "errors" + +func GetMetrics(devicePath string) (EBSMetrics, error) { + return EBSMetrics{}, errors.New("ebs metrics not supported") +} diff --git a/receiver/awsebsnvmereceiver/internal/nvme/ebs_metrics_unix.go b/receiver/awsebsnvmereceiver/internal/nvme/ebs_metrics_unix.go new file mode 100644 index 0000000000..9a51411352 --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/nvme/ebs_metrics_unix.go @@ -0,0 +1,128 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +// The following code is based on https://github.com/kubernetes-sigs/aws-ebs-csi-driver/blob/master/pkg/metrics/nvme.go + +// Copyright 2024 The Kubernetes Authors. 
+// +// Licensed under the Apache License, Version 2.0 (the 'License'); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux + +package nvme + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "math" + "os" + "syscall" + "unsafe" +) + +// As defined in . +type nvmePassthruCommand struct { + opcode uint8 + flags uint8 + rsvd1 uint16 + nsid uint32 + cdw2 uint32 + cdw3 uint32 + metadata uint64 + addr uint64 + metadataLen uint32 + dataLen uint32 + cdw10 uint32 + cdw11 uint32 + cdw12 uint32 + cdw13 uint32 + cdw14 uint32 + cdw15 uint32 + timeoutMs uint32 + result uint32 +} + +var ( + ErrInvalidEBSMagic = errors.New("invalid EBS magic number") + ErrParseLogPage = errors.New("failed to parse log page") +) + +func GetMetrics(devicePath string) (EBSMetrics, error) { + data, err := getNVMEMetrics(devicePath) + if err != nil { + return EBSMetrics{}, err + } + + return parseLogPage(data) +} + +// getNVMEMetrics retrieves NVMe metrics by reading the log page from the NVMe device at the given path. +func getNVMEMetrics(devicePath string) ([]byte, error) { + f, err := os.OpenFile(devicePath, os.O_RDWR, 0) + if err != nil { + return nil, fmt.Errorf("getNVMEMetrics: error opening device: %w", err) + } + defer f.Close() + + data, err := nvmeReadLogPage(f.Fd(), 0xD0) + if err != nil { + return nil, fmt.Errorf("getNVMEMetrics: error reading log page %w", err) + } + + return data, nil +} + +// nvmeReadLogPage reads an NVMe log page via an ioctl system call. +func nvmeReadLogPage(fd uintptr, logID uint8) ([]byte, error) { + data := make([]byte, 4096) // 4096 bytes is the length of the log page. + bufferLen := len(data) + + if bufferLen > math.MaxUint32 { + return nil, errors.New("nvmeReadLogPage: bufferLen exceeds MaxUint32") + } + + cmd := nvmePassthruCommand{ + opcode: 0x02, + addr: uint64(uintptr(unsafe.Pointer(&data[0]))), + nsid: 1, + dataLen: uint32(bufferLen), + cdw10: uint32(logID) | (1024 << 16), + } + + status, _, errno := syscall.Syscall(syscall.SYS_IOCTL, fd, 0xC0484E41, uintptr(unsafe.Pointer(&cmd))) + if errno != 0 { + return nil, fmt.Errorf("nvmeReadLogPage: ioctl error %w", errno) + } + if status != 0 { + return nil, fmt.Errorf("nvmeReadLogPage: ioctl command failed with status %d", status) + } + return data, nil +} + +// parseLogPage parses the binary data from an EBS log page into EBSMetrics. +func parseLogPage(data []byte) (EBSMetrics, error) { + var metrics EBSMetrics + reader := bytes.NewReader(data) + + if err := binary.Read(reader, binary.LittleEndian, &metrics); err != nil { + return EBSMetrics{}, fmt.Errorf("%w: %w", ErrParseLogPage, err) + } + + if metrics.EBSMagic != 0x3C23B510 { + return EBSMetrics{}, fmt.Errorf("%w: %x", ErrInvalidEBSMagic, metrics.EBSMagic) + } + + return metrics, nil +} diff --git a/receiver/awsebsnvmereceiver/internal/nvme/ebs_metrics_unix_test.go b/receiver/awsebsnvmereceiver/internal/nvme/ebs_metrics_unix_test.go new file mode 100644 index 0000000000..aa08463e4e --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/nvme/ebs_metrics_unix_test.go @@ -0,0 +1,132 @@ +// Copyright Amazon.com, Inc. 
or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +// The following code is based on https://github.com/kubernetes-sigs/aws-ebs-csi-driver/blob/master/pkg/metrics/nvme.go + +// Copyright 2024 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the 'License'); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux + +package nvme + +import ( + "bytes" + "encoding/binary" + "strings" + "testing" +) + +func TestParseLogPage(t *testing.T) { + tests := []struct { + name string + input []byte + want EBSMetrics + wantErr string + }{ + { + name: "valid log page", + input: func() []byte { + metrics := EBSMetrics{ + EBSMagic: 0x3C23B510, + ReadOps: 100, + WriteOps: 200, + ReadBytes: 1024, + WriteBytes: 2048, + TotalReadTime: 5000, + TotalWriteTime: 6000, + EBSIOPSExceeded: 10, + EBSThroughputExceeded: 20, + } + buf := new(bytes.Buffer) + if err := binary.Write(buf, binary.LittleEndian, metrics); err != nil { + t.Fatalf("failed to create test data: %v", err) + } + return buf.Bytes() + }(), + want: EBSMetrics{ + EBSMagic: 0x3C23B510, + ReadOps: 100, + WriteOps: 200, + ReadBytes: 1024, + WriteBytes: 2048, + TotalReadTime: 5000, + TotalWriteTime: 6000, + EBSIOPSExceeded: 10, + EBSThroughputExceeded: 20, + }, + wantErr: "", + }, + { + name: "invalid magic number", + input: func() []byte { + metrics := EBSMetrics{ + EBSMagic: 0x12345678, + } + buf := new(bytes.Buffer) + if err := binary.Write(buf, binary.LittleEndian, metrics); err != nil { + t.Fatalf("failed to create test data: %v", err) + } + return buf.Bytes() + }(), + want: EBSMetrics{}, + wantErr: ErrInvalidEBSMagic.Error(), + }, + { + name: "empty data", + input: []byte{}, + want: EBSMetrics{}, + wantErr: ErrParseLogPage.Error(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseLogPage(tt.input) + + if tt.wantErr != "" { + if err == nil { + t.Errorf("parseLogPage() error = nil, wantErr %v", tt.wantErr) + return + } + if !strings.Contains(err.Error(), tt.wantErr) { + t.Errorf("parseLogPage() error = %v, wantErr %v", err, tt.wantErr) + return + } + return + } + + if err != nil { + t.Errorf("parseLogPage() unexpected error = %v", err) + return + } + + if got.EBSMagic != tt.want.EBSMagic { + t.Errorf("parseLogPage() magic number = %x, want %x", got.EBSMagic, tt.want.EBSMagic) + } + if got.ReadOps != tt.want.ReadOps { + t.Errorf("parseLogPage() ReadOps = %v, want %v", got.ReadOps, tt.want.ReadOps) + } + if got.WriteOps != tt.want.WriteOps { + t.Errorf("parseLogPage() WriteOps = %v, want %v", got.WriteOps, tt.want.WriteOps) + } + if got.ReadBytes != tt.want.ReadBytes { + t.Errorf("parseLogPage() ReadBytes = %v, want %v", got.ReadBytes, tt.want.ReadBytes) + } + if got.WriteBytes != tt.want.WriteBytes { + t.Errorf("parseLogPage() WriteBytes = %v, want %v", got.WriteBytes, tt.want.WriteBytes) + } + }) + } +} diff --git a/receiver/awsebsnvmereceiver/internal/nvme/util.go b/receiver/awsebsnvmereceiver/internal/nvme/util.go new file mode 100644 index 0000000000..e7f3ae6aea --- /dev/null +++ 
b/receiver/awsebsnvmereceiver/internal/nvme/util.go @@ -0,0 +1,15 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package nvme + +type DeviceInfoProvider interface { + GetAllDevices() ([]DeviceFileAttributes, error) + GetDeviceSerial(*DeviceFileAttributes) (string, error) + GetDeviceModel(*DeviceFileAttributes) (string, error) + IsEbsDevice(*DeviceFileAttributes) (bool, error) + DevicePath(string) (string, error) +} + +type Util struct { +} diff --git a/receiver/awsebsnvmereceiver/internal/nvme/util_notunix.go b/receiver/awsebsnvmereceiver/internal/nvme/util_notunix.go new file mode 100644 index 0000000000..40e5a2011b --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/nvme/util_notunix.go @@ -0,0 +1,28 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +//go:build !linux + +package nvme + +import "errors" + +func (u *Util) GetAllDevices() ([]DeviceFileAttributes, error) { + return nil, errors.New("nvme not supported") +} + +func (u *Util) GetDeviceSerial(device *DeviceFileAttributes) (string, error) { + return "", errors.New("nvme not supported") +} + +func (u *Util) GetDeviceModel(device *DeviceFileAttributes) (string, error) { + return "", errors.New("nvme not supported") +} + +func (u *Util) IsEbsDevice(device *DeviceFileAttributes) (bool, error) { + return false, errors.New("nvme not supported") +} + +func (u *Util) DevicePath(device string) (string, error) { + return "", errors.New("nvme not supported") +} diff --git a/receiver/awsebsnvmereceiver/internal/nvme/util_unix.go b/receiver/awsebsnvmereceiver/internal/nvme/util_unix.go new file mode 100644 index 0000000000..a91217a11f --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/nvme/util_unix.go @@ -0,0 +1,72 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
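
`DeviceInfoProvider` is the seam that keeps the scraper testable and lets the non-Linux stubs above satisfy the same contract as the Linux implementation that follows. A sketch of the discovery flow a caller could run against the real `Util` on Linux (device names and output are illustrative; on other platforms every call returns an error):

```go
package main

import (
	"fmt"

	"github.com/aws/amazon-cloudwatch-agent/receiver/awsebsnvmereceiver/internal/nvme"
)

func main() {
	var provider nvme.DeviceInfoProvider = &nvme.Util{}

	// Enumerate /dev/nvme* entries, keep EBS-backed ones, and resolve the
	// device path that a metrics read would later open.
	devices, err := provider.GetAllDevices()
	if err != nil {
		fmt.Println("enumeration failed:", err)
		return
	}
	for i := range devices {
		device := devices[i]
		if isEbs, err := provider.IsEbsDevice(&device); err != nil || !isEbs {
			continue // not an EBS volume, or the model could not be read
		}
		path, _ := provider.DevicePath(device.DeviceName())
		serial, _ := provider.GetDeviceSerial(&device)
		fmt.Printf("%s -> serial %s\n", path, serial)
	}
}
```
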
+// SPDX-License-Identifier: MIT + +//go:build linux + +package nvme + +import ( + "fmt" + "os" + "path/filepath" + "strings" +) + +// For unit testing +var osReadFile = os.ReadFile +var osReadDir = os.ReadDir + +func (u *Util) GetAllDevices() ([]DeviceFileAttributes, error) { + entries, err := osReadDir(devDirectoryPath) + if err != nil { + return nil, err + } + + devices := []DeviceFileAttributes{} + for _, entry := range entries { + if !entry.IsDir() && strings.HasPrefix(entry.Name(), nvmeDevicePrefix) { + device, err := ParseNvmeDeviceFileName(entry.Name()) + if err == nil { + devices = append(devices, device) + } + } + } + + return devices, nil +} + +func (u *Util) GetDeviceSerial(device *DeviceFileAttributes) (string, error) { + deviceName, err := device.BaseDeviceName() + if err != nil { + return "", err + } + data, err := osReadFile(fmt.Sprintf("%s/%s/serial", nvmeSysDirectoryPath, deviceName)) + if err != nil { + return "", err + } + return strings.TrimSpace(string(data)), nil +} + +func (u *Util) GetDeviceModel(device *DeviceFileAttributes) (string, error) { + deviceName, err := device.BaseDeviceName() + if err != nil { + return "", err + } + data, err := osReadFile(fmt.Sprintf("%s/%s/model", nvmeSysDirectoryPath, deviceName)) + if err != nil { + return "", err + } + return strings.TrimSpace(string(data)), nil +} + +func (u *Util) IsEbsDevice(device *DeviceFileAttributes) (bool, error) { + model, err := u.GetDeviceModel(device) + if err != nil { + return false, err + } + return model == ebsNvmeModelName, nil +} + +func (u *Util) DevicePath(device string) (string, error) { + return filepath.Join(devDirectoryPath, device), nil +} diff --git a/receiver/awsebsnvmereceiver/internal/nvme/util_unix_test.go b/receiver/awsebsnvmereceiver/internal/nvme/util_unix_test.go new file mode 100644 index 0000000000..29fd45672e --- /dev/null +++ b/receiver/awsebsnvmereceiver/internal/nvme/util_unix_test.go @@ -0,0 +1,300 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
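
The sysfs helpers above trim the padding that `/sys/class/nvme/<controller>/serial` and `model` carry; later in this diff the scraper turns a `vol...` serial into a canonical `vol-...` volume ID. A standalone sketch of that normalization under the same rules (the helper name and serial value are illustrative, not part of the diff):

```go
package main

import (
	"fmt"
	"strings"
)

// volumeIDFromSerial mirrors the scraper's normalization: the serial must
// start with "vol" and carry content after the prefix; the canonical EBS
// form inserts a dash after it.
func volumeIDFromSerial(serial string) (string, bool) {
	serial = strings.TrimSpace(serial) // sysfs values arrive space/newline padded
	if !strings.HasPrefix(serial, "vol") || len(serial) < 4 {
		return "", false
	}
	return fmt.Sprintf("vol-%s", serial[3:]), true
}

func main() {
	id, ok := volumeIDFromSerial(" vol0123456789abcdef \n")
	fmt.Println(id, ok) // vol-0123456789abcdef true
}
```
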
+// SPDX-License-Identifier: MIT + +//go:build linux + +package nvme + +import ( + "errors" + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetAllDevices(t *testing.T) { + tests := []struct { + name string + mockDirEntries []os.DirEntry + mockError error + expected []DeviceFileAttributes + expectedError error + }{ + { + name: "successful read with multiple devices", + mockDirEntries: []os.DirEntry{ + mockDirEntry{name: "nvme0n1", isDir: false}, + mockDirEntry{name: "nvme1n1", isDir: false}, + mockDirEntry{name: "other-device", isDir: false}, // Should be ignored + mockDirEntry{name: "nvme2", isDir: true}, // Should be ignored because it's a directory + }, + expected: []DeviceFileAttributes{ + {controller: 0, namespace: 1, partition: -1, deviceName: "nvme0n1"}, + {controller: 1, namespace: 1, partition: -1, deviceName: "nvme1n1"}, + }, + }, + { + name: "directory read error", + mockError: errors.New("read error"), + expectedError: errors.New("read error"), + }, + { + name: "invalid device name format", + mockDirEntries: []os.DirEntry{ + mockDirEntry{name: "nvmeinvalid", isDir: false}, + mockDirEntry{name: "nvme0n1", isDir: false}, + }, + expected: []DeviceFileAttributes{ + {controller: 0, namespace: 1, partition: -1, deviceName: "nvme0n1"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Cleanup(func() { + osReadDir = os.ReadDir + }) + + osReadDir = func(_ string) ([]os.DirEntry, error) { + if tt.mockError != nil { + return nil, tt.mockError + } + return tt.mockDirEntries, nil + } + + util := &Util{} + devices, err := util.GetAllDevices() + + if tt.expectedError != nil { + assert.Error(t, err) + assert.Equal(t, tt.expectedError.Error(), err.Error()) + return + } + + assert.NoError(t, err) + assert.Equal(t, tt.expected, devices) + }) + } +} + +func TestGetDeviceSerial(t *testing.T) { + tests := []struct { + name string + device DeviceFileAttributes + mockData string + mockError error + expected string + expectedError error + }{ + { + name: "successful read", + device: DeviceFileAttributes{controller: 0, namespace: 1, partition: -1}, + mockData: "vol0123456789\n", + expected: "vol0123456789", + }, + { + name: "read error", + device: DeviceFileAttributes{controller: 0, namespace: 1, partition: -1}, + mockError: errors.New("read error"), + expectedError: errors.New("read error"), + }, + { + name: "padded serial number", + device: DeviceFileAttributes{controller: 0, namespace: 1, partition: -1}, + mockData: " vol0123456789 \n", + expected: "vol0123456789", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Cleanup(func() { + osReadFile = os.ReadFile + }) + + osReadFile = func(_ string) ([]byte, error) { + if tt.mockError != nil { + return nil, tt.mockError + } + return []byte(tt.mockData), nil + } + + util := &Util{} + serial, err := util.GetDeviceSerial(&tt.device) + + if tt.expectedError != nil { + assert.Error(t, err) + assert.Equal(t, tt.expectedError.Error(), err.Error()) + return + } + + assert.NoError(t, err) + assert.Equal(t, tt.expected, serial) + }) + } +} + +func TestGetDeviceModel(t *testing.T) { + tests := []struct { + name string + device DeviceFileAttributes + mockData string + mockError error + expected string + expectedError error + }{ + { + name: "successful read", + device: DeviceFileAttributes{controller: 0, namespace: 1, partition: -1}, + mockData: "Amazon Elastic Block Store\n", + expected: "Amazon Elastic Block Store", + }, + { + name: "read error", + device: 
DeviceFileAttributes{controller: 0, namespace: 1, partition: -1}, + mockError: errors.New("read error"), + expectedError: errors.New("read error"), + }, + { + name: "padded model name", + device: DeviceFileAttributes{controller: 0, namespace: 1, partition: -1}, + mockData: " Amazon Elastic Block Store \n", + expected: "Amazon Elastic Block Store", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Cleanup(func() { + osReadFile = os.ReadFile + }) + + osReadFile = func(_ string) ([]byte, error) { + if tt.mockError != nil { + return nil, tt.mockError + } + return []byte(tt.mockData), nil + } + + util := &Util{} + model, err := util.GetDeviceModel(&tt.device) + + if tt.expectedError != nil { + assert.Error(t, err) + assert.Equal(t, tt.expectedError.Error(), err.Error()) + return + } + + assert.NoError(t, err) + assert.Equal(t, tt.expected, model) + }) + } +} + +func TestIsEbsDevice(t *testing.T) { + tests := []struct { + name string + device DeviceFileAttributes + mockData string + mockError error + expected bool + expectedError error + }{ + { + name: "is EBS device", + device: DeviceFileAttributes{controller: 0, namespace: 1, partition: -1}, + mockData: "Amazon Elastic Block Store\n", + expected: true, + }, + { + name: "not EBS device", + device: DeviceFileAttributes{controller: 0, namespace: 1, partition: -1}, + mockData: "Other Storage Device\n", + expected: false, + }, + { + name: "read error", + device: DeviceFileAttributes{controller: 0, namespace: 1, partition: -1}, + mockError: errors.New("read error"), + expectedError: errors.New("read error"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Cleanup(func() { + osReadFile = os.ReadFile + }) + + osReadFile = func(_ string) ([]byte, error) { + if tt.mockError != nil { + return nil, tt.mockError + } + return []byte(tt.mockData), nil + } + + util := &Util{} + isEbs, err := util.IsEbsDevice(&tt.device) + + if tt.expectedError != nil { + assert.Error(t, err) + assert.Equal(t, tt.expectedError.Error(), err.Error()) + return + } + + assert.NoError(t, err) + assert.Equal(t, tt.expected, isEbs) + }) + } +} + +// Mock DirEntry implementation +type mockDirEntry struct { + name string + isDir bool +} + +func (m mockDirEntry) Name() string { + return m.name +} + +func (m mockDirEntry) IsDir() bool { + return m.isDir +} + +func (m mockDirEntry) Type() os.FileMode { + return 0 +} + +func (m mockDirEntry) Info() (os.FileInfo, error) { + return nil, nil +} +func TestDevicePath(t *testing.T) { + tests := []struct { + name string + device string + expected string + }{ + { + name: "valid device", + device: "nvme0n1", + expected: "/dev/nvme0n1", + }, + { + name: "empty device", + device: "", + expected: "/dev", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + util := &Util{} + path, err := util.DevicePath(tt.device) + assert.NoError(t, err) + assert.Equal(t, tt.expected, path) + }) + } +} diff --git a/receiver/awsebsnvmereceiver/metadata.yaml b/receiver/awsebsnvmereceiver/metadata.yaml new file mode 100644 index 0000000000..900b1063a7 --- /dev/null +++ b/receiver/awsebsnvmereceiver/metadata.yaml @@ -0,0 +1,103 @@ +type: awsebsnvmereceiver + +status: + class: receiver + stability: + beta: [metrics] + distributions: [] + codeowners: + active: [duhminick] + +resource_attributes: + VolumeId: + enabled: true + description: Unique identifier to the EBS volume + type: string + +metrics: + diskio_ebs_total_read_ops: + description: The total number of completed read 
operations + enabled: true + sum: + monotonic: true + aggregation_temporality: cumulative + value_type: int + unit: "1" + diskio_ebs_total_write_ops: + description: The total number of completed write operations + enabled: false + sum: + monotonic: true + aggregation_temporality: cumulative + value_type: int + unit: "1" + diskio_ebs_total_read_bytes: + description: The total number of read bytes transferred + enabled: false + sum: + monotonic: true + aggregation_temporality: cumulative + value_type: int + unit: "By" + diskio_ebs_total_write_bytes: + description: The total number of write bytes transferred + enabled: false + sum: + monotonic: true + aggregation_temporality: cumulative + value_type: int + unit: "By" + diskio_ebs_total_read_time: + description: The total time spent, in microseconds, by all completed read operations + enabled: false + sum: + monotonic: true + aggregation_temporality: cumulative + value_type: int + unit: "us" + diskio_ebs_total_write_time: + description: The total time spent, in microseconds, by all completed write operations + enabled: false + sum: + monotonic: true + aggregation_temporality: cumulative + value_type: int + unit: "us" + diskio_ebs_volume_performance_exceeded_iops: + description: The total time, in microseconds, that IOPS demand exceeded the volume's provisioned IOPS performance + enabled: false + sum: + monotonic: true + aggregation_temporality: cumulative + value_type: int + unit: "us" + diskio_ebs_volume_performance_exceeded_tp: + description: The total time, in microseconds, that throughput demand exceeded the volume's provisioned throughput performance + enabled: false + sum: + monotonic: true + aggregation_temporality: cumulative + value_type: int + unit: "us" + diskio_ebs_ec2_instance_performance_exceeded_iops: + description: The total time, in microseconds, that the EBS volume exceeded the attached Amazon EC2 instance's maximum IOPS performance + enabled: false + sum: + monotonic: true + aggregation_temporality: cumulative + value_type: int + unit: "us" + diskio_ebs_ec2_instance_performance_exceeded_tp: + description: The total time, in microseconds, that the EBS volume exceeded the attached Amazon EC2 instance's maximum throughput performance + enabled: false + sum: + monotonic: true + aggregation_temporality: cumulative + value_type: int + unit: "us" + diskio_ebs_volume_queue_length: + description: The number of read and write operations waiting to be completed + enabled: false + gauge: + value_type: int + unit: "1" diff --git a/receiver/awsebsnvmereceiver/scraper.go b/receiver/awsebsnvmereceiver/scraper.go new file mode 100644 index 0000000000..3bcc0536ec --- /dev/null +++ b/receiver/awsebsnvmereceiver/scraper.go @@ -0,0 +1,200 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
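
The scraper below groups devices by NVMe controller ID so each EBS volume is read once, even when namespaces and partitions (for example `nvme0n1` and `nvme0n1p1`) expose the same per-volume statistics. A toy illustration of that grouping using the parser from this diff (device names are illustrative):

```go
package main

import (
	"fmt"

	"github.com/aws/amazon-cloudwatch-agent/receiver/awsebsnvmereceiver/internal/nvme"
)

func main() {
	names := []string{"nvme0n1", "nvme0n1p1", "nvme1n1"}

	// nvme0n1 and nvme0n1p1 share controller 0, so they land in one bucket
	// and the volume behind them is only scraped once.
	byController := make(map[int][]string)
	for _, name := range names {
		device, err := nvme.ParseNvmeDeviceFileName(name)
		if err != nil {
			continue
		}
		byController[device.Controller()] = append(byController[device.Controller()], name)
	}
	fmt.Println(byController) // map[0:[nvme0n1 nvme0n1p1] 1:[nvme1n1]]
}
```
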
+// SPDX-License-Identifier: MIT + +package awsebsnvmereceiver + +import ( + "context" + "fmt" + "math" + "strings" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" + + "github.com/aws/amazon-cloudwatch-agent/internal/util/collections" + "github.com/aws/amazon-cloudwatch-agent/receiver/awsebsnvmereceiver/internal/metadata" + "github.com/aws/amazon-cloudwatch-agent/receiver/awsebsnvmereceiver/internal/nvme" +) + +type nvmeScraper struct { + logger *zap.Logger + mb *metadata.MetricsBuilder + nvme nvme.DeviceInfoProvider + + allowedDevices collections.Set[string] +} + +type ebsDevices struct { + volumeID string + deviceNames []string +} + +type recordDataMetricFunc func(pcommon.Timestamp, int64) + +// For unit testing +var getMetrics = nvme.GetMetrics + +func (s *nvmeScraper) start(_ context.Context, _ component.Host) error { + s.logger.Debug("Starting NVMe scraper", zap.String("receiver", metadata.Type.String())) + return nil +} + +func (s *nvmeScraper) shutdown(_ context.Context) error { + s.logger.Debug("Shutting down NVMe scraper", zap.String("receiver", metadata.Type.String())) + return nil +} + +func (s *nvmeScraper) scrape(_ context.Context) (pmetric.Metrics, error) { + s.logger.Debug("Began scraping for NVMe metrics") + + ebsDevicesByController, err := s.getEbsDevicesByController() + if err != nil { + return pmetric.NewMetrics(), err + } + + now := pcommon.NewTimestampFromTime(time.Now()) + + for id, ebsDevices := range ebsDevicesByController { + // Some devices are owned by root:root, root:disk, etc, so the agent will attempt to + // retrieve the metric for a device (grouped by controller ID) until the first + // success + foundWorkingDevice := false + + for _, device := range ebsDevices.deviceNames { + if foundWorkingDevice { + break + } + + devicePath, err := s.nvme.DevicePath(device) + if err != nil { + s.logger.Debug("unable to get device path", zap.String("device", device), zap.Error(err)) + continue + } + metrics, err := getMetrics(devicePath) + if err != nil { + s.logger.Debug("unable to get metrics for device", zap.String("device", device), zap.Error(err)) + continue + } + + foundWorkingDevice = true + + rb := s.mb.NewResourceBuilder() + rb.SetVolumeID(ebsDevices.volumeID) + + s.recordMetric(s.mb.RecordDiskioEbsTotalReadOpsDataPoint, now, metrics.ReadOps) + s.recordMetric(s.mb.RecordDiskioEbsTotalWriteOpsDataPoint, now, metrics.WriteOps) + s.recordMetric(s.mb.RecordDiskioEbsTotalReadBytesDataPoint, now, metrics.ReadBytes) + s.recordMetric(s.mb.RecordDiskioEbsTotalWriteBytesDataPoint, now, metrics.WriteBytes) + s.recordMetric(s.mb.RecordDiskioEbsTotalReadTimeDataPoint, now, metrics.TotalReadTime) + s.recordMetric(s.mb.RecordDiskioEbsTotalWriteTimeDataPoint, now, metrics.TotalWriteTime) + s.recordMetric(s.mb.RecordDiskioEbsVolumePerformanceExceededIopsDataPoint, now, metrics.EBSIOPSExceeded) + s.recordMetric(s.mb.RecordDiskioEbsVolumePerformanceExceededTpDataPoint, now, metrics.EBSThroughputExceeded) + s.recordMetric(s.mb.RecordDiskioEbsEc2InstancePerformanceExceededIopsDataPoint, now, metrics.EC2IOPSExceeded) + s.recordMetric(s.mb.RecordDiskioEbsEc2InstancePerformanceExceededTpDataPoint, now, metrics.EC2ThroughputExceeded) + s.recordMetric(s.mb.RecordDiskioEbsVolumeQueueLengthDataPoint, now, metrics.QueueLength) + + s.mb.EmitForResource(metadata.WithResource(rb.Emit())) + } + + if foundWorkingDevice { + 
s.logger.Debug("emitted metrics for nvme device with controller id", zap.Int("controllerID", id), zap.String("volumeID", ebsDevices.volumeID)) + } else { + s.logger.Debug("unable to get metrics for nvme device with controller id", zap.Int("controllerID", id), zap.String("volumeID", ebsDevices.volumeID)) + } + } + + return s.mb.Emit(), nil +} + +// nvme0, nvme1, ... nvme{n} can have multiple devices with the same controller ID. +// For example nvme0n1, nvme0n1p1 are all under the controller ID 0. The metrics +// are the same based on the controller ID. We also do not want to duplicate metrics +// so we group the devices by the controller ID. +func (s *nvmeScraper) getEbsDevicesByController() (map[int]*ebsDevices, error) { + allNvmeDevices, err := s.nvme.GetAllDevices() + if err != nil { + return nil, err + } + + devices := make(map[int]*ebsDevices) + + for _, device := range allNvmeDevices { + deviceName := device.DeviceName() + + // Check if all devices should be collected. Otherwise check if defined by user + hasAsterisk := s.allowedDevices.Contains("*") + if !hasAsterisk { + if isAllowed := s.allowedDevices.Contains(deviceName); !isAllowed { + s.logger.Debug("skipping un-allowed device", zap.String("device", deviceName)) + continue + } + } + + // NVMe device with the same controller ID was already seen. We do not need to repeat the work of + // retrieving the volume ID and validating if it's an EBS device + if entry, seenController := devices[device.Controller()]; seenController { + entry.deviceNames = append(entry.deviceNames, deviceName) + s.logger.Debug("skipping unnecessary device validation steps", zap.String("device", deviceName)) + continue + } + + isEbs, err := s.nvme.IsEbsDevice(&device) + if err != nil || !isEbs { + s.logger.Debug("skipping non-ebs nvme device", zap.String("device", deviceName), zap.Error(err)) + continue + } + + serial, err := s.nvme.GetDeviceSerial(&device) + if err != nil { + s.logger.Debug("unable to get serial number of device", zap.String("device", deviceName), zap.Error(err)) + continue + } + + // The serial should begin with vol and have content after the vol prefix + if !strings.HasPrefix(serial, "vol") || len(serial) < 4 { + s.logger.Debug("device serial is not a valid volume id", zap.String("device", deviceName), zap.String("serial", serial)) + continue + } + + devices[device.Controller()] = &ebsDevices{ + deviceNames: []string{deviceName}, + volumeID: fmt.Sprintf("vol-%s", serial[3:]), + } + } + + return devices, nil +} + +func newScraper(cfg *Config, + settings receiver.Settings, + nvme nvme.DeviceInfoProvider, + allowedDevices collections.Set[string], +) *nvmeScraper { + return &nvmeScraper{ + logger: settings.TelemetrySettings.Logger, + mb: metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, settings), + nvme: nvme, + allowedDevices: allowedDevices, + } +} + +func (s *nvmeScraper) recordMetric(recordFn recordDataMetricFunc, ts pcommon.Timestamp, val uint64) { + converted, err := safeUint64ToInt64(val) + if err != nil { + s.logger.Debug("skipping metric due to potential integer overflow") + return + } + recordFn(ts, converted) +} + +func safeUint64ToInt64(value uint64) (int64, error) { + if value > math.MaxInt64 { + return 0, fmt.Errorf("value %d is too large for int64", value) + } + return int64(value), nil +} diff --git a/receiver/awsebsnvmereceiver/scraper_test.go b/receiver/awsebsnvmereceiver/scraper_test.go new file mode 100644 index 0000000000..9e4bf7ec9c --- /dev/null +++ b/receiver/awsebsnvmereceiver/scraper_test.go @@ -0,0 +1,477 @@ +// 
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package awsebsnvmereceiver + +import ( + "context" + "errors" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" + + "github.com/aws/amazon-cloudwatch-agent/internal/util/collections" + "github.com/aws/amazon-cloudwatch-agent/receiver/awsebsnvmereceiver/internal/nvme" +) + +// mockNvmeUtil is a mock implementation of the DeviceInfoProvider +type mockNvmeUtil struct { + mock.Mock +} + +func (m *mockNvmeUtil) GetAllDevices() ([]nvme.DeviceFileAttributes, error) { + args := m.Called() + return args.Get(0).([]nvme.DeviceFileAttributes), args.Error(1) +} + +func (m *mockNvmeUtil) GetDeviceSerial(device *nvme.DeviceFileAttributes) (string, error) { + args := m.Called(device) + return args.String(0), args.Error(1) +} + +func (m *mockNvmeUtil) GetDeviceModel(device *nvme.DeviceFileAttributes) (string, error) { + args := m.Called(device) + return args.String(0), args.Error(1) +} + +func (m *mockNvmeUtil) IsEbsDevice(device *nvme.DeviceFileAttributes) (bool, error) { + args := m.Called(device) + return args.Bool(0), args.Error(1) +} + +func (m *mockNvmeUtil) DevicePath(device string) (string, error) { + args := m.Called(device) + return args.String(0), args.Error(1) +} + +// mockGetMetrics is a mock function for nvme.GetMetrics +func mockGetMetrics(_ string) (nvme.EBSMetrics, error) { + return nvme.EBSMetrics{ + EBSMagic: 0x3C23B510, + ReadOps: 100, + WriteOps: 200, + ReadBytes: 1024, + WriteBytes: 2048, + TotalReadTime: 500, + TotalWriteTime: 600, + EBSIOPSExceeded: 1, + EBSThroughputExceeded: 2, + EC2IOPSExceeded: 3, + EC2ThroughputExceeded: 4, + QueueLength: 5, + }, nil +} + +// mockGetMetricsError is a mock function that always returns an error +func mockGetMetricsError(_ string) (nvme.EBSMetrics, error) { + return nvme.EBSMetrics{}, errors.New("failed to get metrics") +} + +func TestScraper_Start(t *testing.T) { + mockUtil := new(mockNvmeUtil) + scraper := newScraper(createTestReceiverConfig(), receivertest.NewNopSettings(), mockUtil, collections.NewSet[string]()) + + err := scraper.start(context.Background(), componenttest.NewNopHost()) + assert.NoError(t, err) +} + +func TestScraper_Shutdown(t *testing.T) { + mockUtil := new(mockNvmeUtil) + scraper := newScraper(createTestReceiverConfig(), receivertest.NewNopSettings(), mockUtil, collections.NewSet[string]()) + + err := scraper.shutdown(context.Background()) + assert.NoError(t, err) +} + +func TestScraper_Scrape_NoDevices(t *testing.T) { + mockUtil := new(mockNvmeUtil) + mockUtil.On("GetAllDevices").Return([]nvme.DeviceFileAttributes{}, nil) + + scraper := newScraper(createTestReceiverConfig(), receivertest.NewNopSettings(), mockUtil, collections.NewSet[string]()) + + metrics, err := scraper.scrape(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) + + mockUtil.AssertExpectations(t) +} + +func TestScraper_Scrape_GetAllDevicesError(t *testing.T) { + mockUtil := new(mockNvmeUtil) + mockUtil.On("GetAllDevices").Return([]nvme.DeviceFileAttributes{}, errors.New("failed to get devices")) + + scraper := newScraper(createTestReceiverConfig(), 
receivertest.NewNopSettings(), mockUtil, collections.NewSet[string]())
+
+	_, err := scraper.scrape(context.Background())
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "failed to get devices")
+
+	mockUtil.AssertExpectations(t)
+}
+
+func TestScraper_Scrape_Success(t *testing.T) {
+	t.Cleanup(func() {
+		getMetrics = nvme.GetMetrics
+	})
+	getMetrics = mockGetMetrics
+
+	device1, err := nvme.ParseNvmeDeviceFileName("nvme0n1")
+	require.NoError(t, err)
+
+	mockUtil := new(mockNvmeUtil)
+	mockUtil.On("GetAllDevices").Return([]nvme.DeviceFileAttributes{device1}, nil)
+	mockUtil.On("IsEbsDevice", &device1).Return(true, nil)
+	mockUtil.On("GetDeviceSerial", &device1).Return("vol1234567890abcdef", nil)
+	mockUtil.On("DevicePath", "nvme0n1").Return("/dev/nvme0n1", nil)
+
+	// Allow all devices via the wildcard entry
+	scraper := newScraper(createTestReceiverConfig(), receivertest.NewNopSettings(), mockUtil, collections.NewSet[string]("*"))
+
+	metrics, err := scraper.scrape(context.Background())
+	assert.NoError(t, err)
+
+	// Verify metrics
+	assert.Equal(t, 1, metrics.ResourceMetrics().Len())
+
+	rm := metrics.ResourceMetrics().At(0)
+	assert.Equal(t, "vol-1234567890abcdef", rm.Resource().Attributes().AsRaw()["VolumeId"])
+
+	// Check metric values
+	ilm := rm.ScopeMetrics().At(0).Metrics()
+	assert.Equal(t, 11, ilm.Len()) // We expect 11 metrics based on the scraper implementation
+
+	// Verify specific metrics
+	verifySumMetric(t, ilm, "diskio_ebs_total_read_ops", 100)
+	verifySumMetric(t, ilm, "diskio_ebs_total_write_ops", 200)
+	verifySumMetric(t, ilm, "diskio_ebs_total_read_bytes", 1024)
+	verifySumMetric(t, ilm, "diskio_ebs_total_write_bytes", 2048)
+	verifySumMetric(t, ilm, "diskio_ebs_total_read_time", 500)
+	verifySumMetric(t, ilm, "diskio_ebs_total_write_time", 600)
+	verifySumMetric(t, ilm, "diskio_ebs_volume_performance_exceeded_iops", 1)
+	verifySumMetric(t, ilm, "diskio_ebs_volume_performance_exceeded_tp", 2)
+	verifySumMetric(t, ilm, "diskio_ebs_ec2_instance_performance_exceeded_iops", 3)
+	verifySumMetric(t, ilm, "diskio_ebs_ec2_instance_performance_exceeded_tp", 4)
+	verifyGaugeMetric(t, ilm, "diskio_ebs_volume_queue_length", 5)
+
+	mockUtil.AssertExpectations(t)
+}
+
+func TestScraper_Scrape_NonEbsDevice(t *testing.T) {
+	device1, err := nvme.ParseNvmeDeviceFileName("nvme0n1")
+	require.NoError(t, err)
+
+	mockUtil := new(mockNvmeUtil)
+	mockUtil.On("GetAllDevices").Return([]nvme.DeviceFileAttributes{device1}, nil)
+	mockUtil.On("IsEbsDevice", &device1).Return(false, nil)
+
+	scraper := newScraper(createTestReceiverConfig(), receivertest.NewNopSettings(), mockUtil, collections.NewSet[string]("*"))
+
+	metrics, err := scraper.scrape(context.Background())
+	assert.NoError(t, err)
+	assert.Equal(t, 0, metrics.ResourceMetrics().Len())
+
+	mockUtil.AssertExpectations(t)
+}
+
+func TestScraper_Scrape_IsEbsDeviceError(t *testing.T) {
+	device1, err := nvme.ParseNvmeDeviceFileName("nvme0n1")
+	require.NoError(t, err)
+
+	mockUtil := new(mockNvmeUtil)
+	mockUtil.On("GetAllDevices").Return([]nvme.DeviceFileAttributes{device1}, nil)
+	mockUtil.On("IsEbsDevice", &device1).Return(false, errors.New("failed to check device"))
+
+	scraper := newScraper(createTestReceiverConfig(), receivertest.NewNopSettings(), mockUtil, collections.NewSet[string]("*"))
+
+	metrics, err := scraper.scrape(context.Background())
+	assert.NoError(t, err)
+	assert.Equal(t, 0, metrics.ResourceMetrics().Len())
+
+	mockUtil.AssertExpectations(t)
+}
+
+func TestScraper_Scrape_GetDeviceSerialError(t *testing.T) {
device1, err := nvme.ParseNvmeDeviceFileName("nvme0n1") + require.NoError(t, err) + + mockUtil := new(mockNvmeUtil) + mockUtil.On("GetAllDevices").Return([]nvme.DeviceFileAttributes{device1}, nil) + mockUtil.On("IsEbsDevice", &device1).Return(true, nil) + mockUtil.On("GetDeviceSerial", &device1).Return("", errors.New("failed to get serial")) + + scraper := newScraper(createTestReceiverConfig(), receivertest.NewNopSettings(), mockUtil, collections.NewSet[string]("*")) + + metrics, err := scraper.scrape(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) + + mockUtil.AssertExpectations(t) +} + +func TestScraper_Scrape_InvalidSerialPrefix(t *testing.T) { + device1, err := nvme.ParseNvmeDeviceFileName("nvme0n1") + require.NoError(t, err) + + mockUtil := new(mockNvmeUtil) + mockUtil.On("GetAllDevices").Return([]nvme.DeviceFileAttributes{device1}, nil) + mockUtil.On("IsEbsDevice", &device1).Return(true, nil) + mockUtil.On("GetDeviceSerial", &device1).Return("invalid-serial", nil) + + scraper := newScraper(createTestReceiverConfig(), receivertest.NewNopSettings(), mockUtil, collections.NewSet[string]("*")) + + metrics, err := scraper.scrape(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) + + mockUtil.AssertExpectations(t) +} + +func TestScraper_Scrape_GetMetricsError(t *testing.T) { + t.Cleanup(func() { + getMetrics = nvme.GetMetrics + }) + getMetrics = mockGetMetricsError + + device1, err := nvme.ParseNvmeDeviceFileName("nvme0n1") + require.NoError(t, err) + + mockUtil := new(mockNvmeUtil) + mockUtil.On("GetAllDevices").Return([]nvme.DeviceFileAttributes{device1}, nil) + mockUtil.On("IsEbsDevice", &device1).Return(true, nil) + mockUtil.On("GetDeviceSerial", &device1).Return("vol1234567890abcdef", nil) + mockUtil.On("DevicePath", "nvme0n1").Return("/dev/nvme0n1", nil) + + // Create a test logger to capture log messages + core, observedLogs := observer.New(zapcore.DebugLevel) + logger := zap.New(core) + + settings := receivertest.NewNopSettings() + settings.TelemetrySettings.Logger = logger + + scraper := newScraper(createTestReceiverConfig(), settings, mockUtil, collections.NewSet[string]("*")) + + metrics, err := scraper.scrape(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) + + foundLogMessage := false + for _, log := range observedLogs.All() { + if log.Message == "unable to get metrics for device" { + foundLogMessage = true + break + } + } + assert.True(t, foundLogMessage, "Expected to find log about unable to get metrics") + + mockUtil.AssertExpectations(t) +} + +func TestScraper_Scrape_MultipleDevices(t *testing.T) { + t.Cleanup(func() { + getMetrics = nvme.GetMetrics + }) + getMetrics = mockGetMetrics + + device1, err := nvme.ParseNvmeDeviceFileName("nvme0n1") + require.NoError(t, err) + + device2, err := nvme.ParseNvmeDeviceFileName("nvme1n1") + require.NoError(t, err) + + mockUtil := new(mockNvmeUtil) + mockUtil.On("GetAllDevices").Return([]nvme.DeviceFileAttributes{device1, device2}, nil) + mockUtil.On("IsEbsDevice", &device1).Return(true, nil) + mockUtil.On("GetDeviceSerial", &device1).Return("vol1234567890abcdef", nil) + mockUtil.On("DevicePath", "nvme0n1").Return("/dev/nvme0n1", nil) + mockUtil.On("IsEbsDevice", &device2).Return(true, nil) + mockUtil.On("GetDeviceSerial", &device2).Return("vol0987654321fedcba", nil) + mockUtil.On("DevicePath", "nvme1n1").Return("/dev/nvme1n1", nil) + + scraper := 
newScraper(createTestReceiverConfig(), receivertest.NewNopSettings(), mockUtil, collections.NewSet[string]("*"))
+
+	metrics, err := scraper.scrape(context.Background())
+	assert.NoError(t, err)
+
+	assert.Equal(t, 2, metrics.ResourceMetrics().Len())
+
+	mockUtil.AssertExpectations(t)
+}
+
+func TestScraper_Scrape_FilteredDevices(t *testing.T) {
+	t.Cleanup(func() {
+		getMetrics = nvme.GetMetrics
+	})
+	getMetrics = mockGetMetrics
+
+	device1, err := nvme.ParseNvmeDeviceFileName("nvme0n1")
+	require.NoError(t, err)
+
+	device2, err := nvme.ParseNvmeDeviceFileName("nvme1n1")
+	require.NoError(t, err)
+
+	mockUtil := new(mockNvmeUtil)
+	mockUtil.On("GetAllDevices").Return([]nvme.DeviceFileAttributes{device1, device2}, nil)
+
+	mockUtil.On("IsEbsDevice", &device1).Return(true, nil)
+	mockUtil.On("GetDeviceSerial", &device1).Return("vol0987654321fedcba", nil)
+	mockUtil.On("DevicePath", "nvme0n1").Return("/dev/nvme0n1", nil)
+
+	core, observedLogs := observer.New(zapcore.DebugLevel)
+	logger := zap.New(core)
+
+	settings := receivertest.NewNopSettings()
+	settings.TelemetrySettings.Logger = logger
+
+	// Only allow nvme0n1
+	scraper := newScraper(createTestReceiverConfig(), settings, mockUtil, collections.NewSet[string]("nvme0n1"))
+
+	metrics, err := scraper.scrape(context.Background())
+	assert.NoError(t, err)
+
+	// Only nvme0n1 is allowed, so we should get exactly one set of metrics
+	assert.Equal(t, 1, metrics.ResourceMetrics().Len())
+
+	// Verify that we logged about skipping nvme1n1
+	foundSkipLog := false
+	for _, log := range observedLogs.All() {
+		if log.Message == "skipping un-allowed device" && log.ContextMap()["device"] == "nvme1n1" {
+			foundSkipLog = true
+			break
+		}
+	}
+	assert.True(t, foundSkipLog, "Expected to find log about skipping un-allowed device")
+
+	mockUtil.AssertExpectations(t)
+}
+
+func TestScraper_Scrape_MultipleDevicesSameController(t *testing.T) {
+	t.Cleanup(func() {
+		getMetrics = nvme.GetMetrics
+	})
+	getMetrics = mockGetMetrics
+
+	device1, err := nvme.ParseNvmeDeviceFileName("nvme0n1")
+	require.NoError(t, err)
+
+	device2, err := nvme.ParseNvmeDeviceFileName("nvme0n1p1")
+	require.NoError(t, err)
+
+	mockUtil := new(mockNvmeUtil)
+	mockUtil.On("GetAllDevices").Return([]nvme.DeviceFileAttributes{device1, device2}, nil)
+	mockUtil.On("IsEbsDevice", &device1).Return(true, nil)
+	mockUtil.On("GetDeviceSerial", &device1).Return("vol1234567890abcdef", nil)
+	mockUtil.On("DevicePath", "nvme0n1").Return("/dev/nvme0n1", nil)
+
+	core, observedLogs := observer.New(zapcore.DebugLevel)
+	logger := zap.New(core)
+
+	settings := receivertest.NewNopSettings()
+	settings.TelemetrySettings.Logger = logger
+
+	scraper := newScraper(createTestReceiverConfig(), settings, mockUtil, collections.NewSet[string]("*"))
+
+	metrics, err := scraper.scrape(context.Background())
+	assert.NoError(t, err)
+
+	// Should only get one set of metrics for the controller
+	assert.Equal(t, 1, metrics.ResourceMetrics().Len())
+
+	// Verify that we logged about skipping unnecessary validation for the second device
+	foundSkipLog := false
+	for _, log := range observedLogs.All() {
+		if log.Message == "skipping unnecessary device validation steps" && log.ContextMap()["device"] == "nvme0n1p1" {
+			foundSkipLog = true
+			break
+		}
+	}
+	assert.True(t, foundSkipLog, "Expected to find log about skipping unnecessary device validation steps")
+
+	mockUtil.AssertExpectations(t)
+}
+
+func verifySumMetric(t *testing.T, metrics pmetric.MetricSlice, name string, expectedValue int64) {
+	for i := 0; i < metrics.Len(); i++ {
+ metric := metrics.At(i) + if metric.Name() == name { + assert.Equal(t, expectedValue, metric.Sum().DataPoints().At(0).IntValue()) + return + } + } + t.Errorf("Metric %s not found", name) +} + +func verifyGaugeMetric(t *testing.T, metrics pmetric.MetricSlice, name string, expectedValue int64) { + for i := 0; i < metrics.Len(); i++ { + metric := metrics.At(i) + if metric.Name() == name { + assert.Equal(t, expectedValue, metric.Gauge().DataPoints().At(0).IntValue()) + return + } + } + t.Errorf("Metric %s not found", name) +} + +// Test for the device path error case +func TestScraper_Scrape_DevicePathError(t *testing.T) { + device1, err := nvme.ParseNvmeDeviceFileName("nvme0n1") + require.NoError(t, err) + + mockUtil := new(mockNvmeUtil) + mockUtil.On("GetAllDevices").Return([]nvme.DeviceFileAttributes{device1}, nil) + mockUtil.On("IsEbsDevice", &device1).Return(true, nil) + mockUtil.On("GetDeviceSerial", &device1).Return("vol1234567890abcdef", nil) + mockUtil.On("DevicePath", "nvme0n1").Return("", errors.New("device path error")) + + core, observedLogs := observer.New(zapcore.DebugLevel) + logger := zap.New(core) + + settings := receivertest.NewNopSettings() + settings.TelemetrySettings.Logger = logger + + scraper := newScraper(createTestReceiverConfig(), settings, mockUtil, collections.NewSet[string]("*")) + + metrics, err := scraper.scrape(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) + + // Verify log message about device path error + foundLogMessage := false + for _, log := range observedLogs.All() { + if log.Message == "unable to get device path" { + foundLogMessage = true + break + } + } + assert.True(t, foundLogMessage, "Expected to find log about unable to get device path") + + mockUtil.AssertExpectations(t) +} + +func createTestReceiverConfig() *Config { + cfg := createDefaultConfig().(*Config) + + // Use reflection to enable all metrics + v := reflect.ValueOf(&cfg.MetricsBuilderConfig.Metrics).Elem() + + for i := 0; i < v.NumField(); i++ { + field := v.Field(i) + + if field.Kind() == reflect.Struct { + enabledField := field.FieldByName("Enabled") + if enabledField.IsValid() && enabledField.CanSet() { + enabledField.SetBool(true) + } + } + } + + return cfg +}
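The serial-to-volume-ID check and the uint64 overflow guard that these tests exercise can be reproduced in isolation. Below is a minimal standalone sketch, not part of the diff above; `volumeIDFromSerial` is a hypothetical name introduced here only to mirror the logic in `getEbsDevicesByController` and `safeUint64ToInt64`:

```go
package main

import (
	"fmt"
	"math"
	"strings"
)

// volumeIDFromSerial mirrors the scraper's serial validation (hypothetical
// helper, for illustration only): the NVMe serial must start with "vol" and
// carry content after that prefix; the reported volume ID re-inserts the
// dash, e.g. "vol123" -> "vol-123".
func volumeIDFromSerial(serial string) (string, bool) {
	if !strings.HasPrefix(serial, "vol") || len(serial) < 4 {
		return "", false
	}
	return fmt.Sprintf("vol-%s", serial[3:]), true
}

// safeUint64ToInt64 mirrors the scraper's overflow guard: NVMe counters are
// uint64, but pdata sum/gauge points hold int64, so values above MaxInt64
// are rejected rather than silently wrapped.
func safeUint64ToInt64(value uint64) (int64, error) {
	if value > math.MaxInt64 {
		return 0, fmt.Errorf("value %d is too large for int64", value)
	}
	return int64(value), nil
}

func main() {
	if id, ok := volumeIDFromSerial("vol1234567890abcdef"); ok {
		fmt.Println(id) // vol-1234567890abcdef
	}
	if _, err := safeUint64ToInt64(math.MaxUint64); err != nil {
		fmt.Println(err) // value 18446744073709551615 is too large for int64
	}
}
```

This matches the expectations encoded in the tests above: a serial like "vol1234567890abcdef" yields the "vol-1234567890abcdef" resource attribute, while "invalid-serial" (or a bare "vol") produces no metrics for that device.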