Skip to content

Commit b930014

Browse files
Implement Kubernetes Metadata Extension (#1605)
Co-authored-by: POOJA REDDY NATHALA <[email protected]>
1 parent 9a655fc commit b930014

11 files changed

+1089
-0
lines changed

extension/k8smetadata/README.md

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Kubernetes Metadata
2+
3+
The Kubernetes Metadata utilizes a Kubernetes client to start an informer, which queries the Kubernetes API for EndpointSlices. The EndpointSlices are transformed to reduce storage and periodically updated.
4+
5+
> Kubernetes' EndpointSlice API provides a way to track network endpoints within a Kubernetes cluster. (https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/)
6+
7+
These network endpoints expose relevant Kubernetes metadata for service-exposed applications.
8+
9+
Pod IP → {Workload, Namespace, Node} mappings are stored.
10+
- Workload: This is the application's name.
11+
- Namespace: This is the Kubernetes namespace the application is in.
12+
- Node: This is the Kubernetes node the application is in.
13+

extension/k8smetadata/config.go

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package k8smetadata
5+
6+
import (
7+
"go.opentelemetry.io/collector/component"
8+
)
9+
10+
type Config struct{}
11+
12+
var _ component.Config = (*Config)(nil)

extension/k8smetadata/config_test.go

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package k8smetadata
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
"go.opentelemetry.io/collector/confmap"
11+
)
12+
13+
func TestUnmarshalDefaultConfig(t *testing.T) {
14+
factory := NewFactory()
15+
cfg := factory.CreateDefaultConfig()
16+
assert.NoError(t, confmap.New().Unmarshal(cfg))
17+
assert.Equal(t, factory.CreateDefaultConfig(), cfg)
18+
}

extension/k8smetadata/extension.go

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package k8smetadata
5+
6+
import (
7+
"context"
8+
"math/rand"
9+
"time"
10+
11+
"go.opentelemetry.io/collector/component"
12+
"go.opentelemetry.io/collector/extension"
13+
"go.uber.org/atomic"
14+
"go.uber.org/zap"
15+
"k8s.io/client-go/informers"
16+
"k8s.io/client-go/kubernetes"
17+
"k8s.io/client-go/tools/clientcmd"
18+
19+
"github.com/aws/amazon-cloudwatch-agent/internal/k8sCommon/k8sclient"
20+
)
21+
22+
const (
23+
deletionDelay = 2 * time.Minute
24+
jitterKubernetesAPISeconds = 10
25+
)
26+
27+
type KubernetesMetadata struct {
28+
logger *zap.Logger
29+
config *Config
30+
ready atomic.Bool
31+
safeStopCh *k8sclient.SafeChannel
32+
endpointSliceWatcher *k8sclient.EndpointSliceWatcher
33+
}
34+
35+
var _ extension.Extension = (*KubernetesMetadata)(nil)
36+
37+
func jitterSleep(seconds int) {
38+
jitter := time.Duration(rand.Intn(seconds)) * time.Second // nolint:gosec
39+
time.Sleep(jitter)
40+
}
41+
42+
func (e *KubernetesMetadata) Start(_ context.Context, _ component.Host) error {
43+
e.logger.Debug("Starting k8smetadata extension...")
44+
45+
config, err := clientcmd.BuildConfigFromFlags("", "")
46+
if err != nil {
47+
e.logger.Error("Failed to create config", zap.Error(err))
48+
}
49+
50+
clientset, err := kubernetes.NewForConfig(config)
51+
if err != nil {
52+
e.logger.Error("Failed to create kubernetes client", zap.Error(err))
53+
}
54+
55+
// jitter calls to the kubernetes api (a precaution to prevent overloading api server)
56+
jitterSleep(jitterKubernetesAPISeconds)
57+
58+
timedDeleter := &k8sclient.TimedDeleter{Delay: deletionDelay}
59+
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
60+
61+
e.endpointSliceWatcher = k8sclient.NewEndpointSliceWatcher(e.logger, sharedInformerFactory, timedDeleter)
62+
e.safeStopCh = &k8sclient.SafeChannel{Ch: make(chan struct{}), Closed: false}
63+
64+
e.endpointSliceWatcher.Run(e.safeStopCh.Ch)
65+
66+
e.endpointSliceWatcher.WaitForCacheSync(e.safeStopCh.Ch)
67+
68+
e.logger.Debug("EndpointSlice cache synced, extension fully started")
69+
e.ready.Store(true)
70+
71+
return nil
72+
}
73+
74+
func (e *KubernetesMetadata) Shutdown(_ context.Context) error {
75+
if e.safeStopCh != nil {
76+
e.safeStopCh.Close()
77+
}
78+
return nil
79+
}
80+
81+
func (e *KubernetesMetadata) GetPodMetadata(ip string) k8sclient.PodMetadata {
82+
if ip == "" {
83+
e.logger.Debug("GetPodMetadata: no IP provided")
84+
return k8sclient.PodMetadata{}
85+
}
86+
pm, ok := e.endpointSliceWatcher.IPToPodMetadata.Load(ip)
87+
if !ok {
88+
e.logger.Debug("GetPodMetadata: no mapping found for IP", zap.String("ip", ip))
89+
return k8sclient.PodMetadata{}
90+
}
91+
metadata := pm.(k8sclient.PodMetadata)
92+
e.logger.Debug("GetPodMetadata: found metadata",
93+
zap.String("ip", ip),
94+
zap.String("workload", metadata.Workload),
95+
zap.String("namespace", metadata.Namespace),
96+
zap.String("node", metadata.Node),
97+
)
98+
return metadata
99+
}
+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package k8smetadata
5+
6+
import (
7+
"sync"
8+
"testing"
9+
10+
"github.com/stretchr/testify/assert"
11+
"go.uber.org/zap"
12+
13+
"github.com/aws/amazon-cloudwatch-agent/internal/k8sCommon/k8sclient"
14+
)
15+
16+
func TestKubernetesMetadata_GetPodMetadata(t *testing.T) {
17+
esw := &k8sclient.EndpointSliceWatcher{
18+
IPToPodMetadata: &sync.Map{},
19+
}
20+
21+
const testIP = "1.2.3.4"
22+
expected := k8sclient.PodMetadata{
23+
Workload: "my-workload",
24+
Namespace: "my-namespace",
25+
Node: "my-node",
26+
}
27+
esw.IPToPodMetadata.Store(testIP, expected)
28+
29+
kMeta := &KubernetesMetadata{
30+
logger: zap.NewNop(),
31+
endpointSliceWatcher: esw,
32+
}
33+
34+
got := kMeta.GetPodMetadata(testIP)
35+
assert.Equal(t, expected, got, "GetPodMetadata should return the stored PodMetadata for %s", testIP)
36+
37+
unknown := kMeta.GetPodMetadata("9.9.9.9")
38+
assert.Equal(t, k8sclient.PodMetadata{}, unknown, "GetPodMetadata should return empty if the IP is not present")
39+
40+
unknown = kMeta.GetPodMetadata("")
41+
assert.Equal(t, k8sclient.PodMetadata{}, unknown, "GetPodMetadata should return empty if the IP is empty")
42+
}
43+
44+
func TestKubernetesMetadata_GetPodMetadata_Incomplete(t *testing.T) {
45+
esw := &k8sclient.EndpointSliceWatcher{
46+
IPToPodMetadata: &sync.Map{},
47+
}
48+
49+
const testIP = "2.2.2.2"
50+
expected := k8sclient.PodMetadata{
51+
Workload: "incomplete-workload",
52+
Namespace: "",
53+
Node: "",
54+
}
55+
esw.IPToPodMetadata.Store(testIP, expected)
56+
57+
kMeta := &KubernetesMetadata{
58+
logger: zap.NewNop(),
59+
endpointSliceWatcher: esw,
60+
}
61+
62+
got := kMeta.GetPodMetadata(testIP)
63+
assert.Equal(t, expected, got, "GetPodMetadata should return the stored incomplete PodMetadata for IP %s", testIP)
64+
}

extension/k8smetadata/factory.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package k8smetadata
5+
6+
import (
7+
"context"
8+
"sync"
9+
10+
"go.opentelemetry.io/collector/component"
11+
"go.opentelemetry.io/collector/extension"
12+
)
13+
14+
var (
15+
TypeStr, _ = component.NewType("k8smetadata")
16+
kubernetesMetadata *KubernetesMetadata
17+
mutex sync.RWMutex
18+
)
19+
20+
func GetKubernetesMetadata() *KubernetesMetadata {
21+
mutex.RLock()
22+
defer mutex.RUnlock()
23+
if kubernetesMetadata != nil && kubernetesMetadata.ready.Load() {
24+
return kubernetesMetadata
25+
}
26+
return nil
27+
}
28+
29+
func NewFactory() extension.Factory {
30+
return extension.NewFactory(
31+
TypeStr,
32+
createDefaultConfig,
33+
createExtension,
34+
component.StabilityLevelAlpha,
35+
)
36+
}
37+
38+
func createDefaultConfig() component.Config {
39+
return &Config{}
40+
}
41+
42+
func createExtension(_ context.Context, settings extension.Settings, cfg component.Config) (extension.Extension, error) {
43+
mutex.Lock()
44+
defer mutex.Unlock()
45+
kubernetesMetadata = &KubernetesMetadata{
46+
logger: settings.Logger,
47+
config: cfg.(*Config),
48+
}
49+
return kubernetesMetadata, nil
50+
}

extension/k8smetadata/factory_test.go

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package k8smetadata
5+
6+
import (
7+
"context"
8+
"testing"
9+
10+
"github.com/stretchr/testify/assert"
11+
"go.opentelemetry.io/collector/component/componenttest"
12+
"go.opentelemetry.io/collector/extension/extensiontest"
13+
)
14+
15+
func TestCreateDefaultConfig(t *testing.T) {
16+
cfg := NewFactory().CreateDefaultConfig()
17+
assert.Equal(t, &Config{}, cfg)
18+
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
19+
}
20+
21+
func TestCreate(t *testing.T) {
22+
cfg := &Config{}
23+
got, err := NewFactory().Create(context.Background(), extensiontest.NewNopSettings(), cfg)
24+
assert.NoError(t, err)
25+
assert.NotNil(t, got)
26+
}

0 commit comments

Comments
 (0)