Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix timestamp conversion in opamp bridge #3582

Merged
merged 1 commit into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 55 additions & 10 deletions cmd/operator-opamp-bridge/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package agent
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -95,10 +96,18 @@ func (agent *Agent) getHealth() *protobufs.ComponentHealth {
LastError: err.Error(),
}
}
statusTime, err := agent.getCurrentTimeUnixNano()
if err != nil {
return &protobufs.ComponentHealth{
Healthy: false,
StartTimeUnixNano: agent.startTime,
LastError: err.Error(),
}
}
return &protobufs.ComponentHealth{
Healthy: true,
StartTimeUnixNano: agent.startTime,
StatusTimeUnixNano: uint64(agent.clock.Now().UnixNano()),
StatusTimeUnixNano: statusTime,
LastError: "",
ComponentHealthMap: healthMap,
}
Expand All @@ -124,9 +133,17 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone
for _, pod := range podMap {
isPoolHealthy = isPoolHealthy && pod.Healthy
}
podStartTime, err := timeToUnixNanoUnsigned(col.ObjectMeta.GetCreationTimestamp().Time)
if err != nil {
return nil, err
}
statusTime, err := agent.getCurrentTimeUnixNano()
if err != nil {
return nil, err
}
healthMap[key.String()] = &protobufs.ComponentHealth{
StartTimeUnixNano: uint64(col.ObjectMeta.GetCreationTimestamp().UnixNano()),
StatusTimeUnixNano: uint64(agent.clock.Now().UnixNano()),
StartTimeUnixNano: podStartTime,
StatusTimeUnixNano: statusTime,
Status: col.Status.Scale.StatusReplicas,
ComponentHealthMap: podMap,
Healthy: isPoolHealthy,
Expand Down Expand Up @@ -158,6 +175,10 @@ func (agent *Agent) getCollectorSelector(col v1beta1.OpenTelemetryCollector) map
}

func (agent *Agent) generateCollectorHealth(selectorLabels map[string]string, namespace string) (map[string]*protobufs.ComponentHealth, error) {
statusTime, err := agent.getCurrentTimeUnixNano()
if err != nil {
return nil, err
}
pods, err := agent.applier.GetCollectorPods(selectorLabels, namespace)
if err != nil {
return nil, err
Expand All @@ -169,15 +190,18 @@ func (agent *Agent) generateCollectorHealth(selectorLabels map[string]string, na
if item.Status.Phase != "Running" {
healthy = false
}
var startTime int64
var startTime uint64
if item.Status.StartTime != nil {
startTime = item.Status.StartTime.UnixNano()
startTime, err = timeToUnixNanoUnsigned(item.Status.StartTime.Time)
if err != nil {
return nil, err
}
} else {
healthy = false
}
healthMap[key.String()] = &protobufs.ComponentHealth{
StartTimeUnixNano: uint64(startTime),
StatusTimeUnixNano: uint64(agent.clock.Now().UnixNano()),
StartTimeUnixNano: startTime,
StatusTimeUnixNano: statusTime,
Status: string(item.Status.Phase),
Healthy: healthy,
}
Expand All @@ -197,7 +221,7 @@ func (agent *Agent) onConnectFailed(ctx context.Context, err error) {

// onError is called when an agent receives an error response from the server.
func (agent *Agent) onError(ctx context.Context, err *protobufs.ServerErrorResponse) {
agent.logger.Error(fmt.Errorf(err.GetErrorMessage()), "server returned an error response")
agent.logger.Error(errors.New(err.GetErrorMessage()), "server returned an error response")
}

// saveRemoteConfigStatus receives a status from the server when the server sets a remote configuration.
Expand All @@ -207,7 +231,11 @@ func (agent *Agent) saveRemoteConfigStatus(_ context.Context, status *protobufs.

// Start sets up the callbacks for the OpAMP client and begins the client's connection to the server.
func (agent *Agent) Start() error {
agent.startTime = uint64(agent.clock.Now().UnixNano())
startTime, err := agent.getCurrentTimeUnixNano()
if err != nil {
return err
}
agent.startTime = startTime
settings := types.StartSettings{
OpAMPServerURL: agent.config.Endpoint,
Header: agent.config.Headers.ToHTTPHeader(),
Expand All @@ -224,7 +252,7 @@ func (agent *Agent) Start() error {
PackagesStateProvider: nil,
Capabilities: agent.config.GetCapabilities(),
}
err := agent.opampClient.SetAgentDescription(agent.agentDescription)
err = agent.opampClient.SetAgentDescription(agent.agentDescription)
if err != nil {
return err
}
Expand Down Expand Up @@ -429,3 +457,20 @@ func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) {
agent.initMeter(msg.OwnMetricsConnSettings)
}
}

// getCurrentTimeUnixNano returns the current time as a uint64, which the protocol expects.
func (agent *Agent) getCurrentTimeUnixNano() (uint64, error) {
// technically this could be negative if the system time is set to before 1970-01-1
// the proto demands this to be a nonnegative number, so in that case, just return 0
return timeToUnixNanoUnsigned(agent.clock.Now())
}

// timeToUnixNanoUnsigned returns the number of nanoseconds elapsed from 1970-01-01 to the given time, but returns an
// error if the value is negative. OpAMP expects these values to be non-negative.
func timeToUnixNanoUnsigned(t time.Time) (uint64, error) {
signedUnixNano := t.UnixNano()
if signedUnixNano < 0 {
return 0, fmt.Errorf("invalid system time, must be after 01-01-1970 due to OpAMP requirements: %v", t)
}
return uint64(signedUnixNano), nil
}
50 changes: 26 additions & 24 deletions cmd/operator-opamp-bridge/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@ const (
agentTestFileBatchNotAllowedName = "testdata/agentbatchnotallowed.yaml"
agentTestFileNoProcessorsAllowedName = "testdata/agentnoprocessorsallowed.yaml"

// collectorStartTime is set to the result of a zero'd out creation timestamp
// read more here https://github.com/open-telemetry/opentelemetry-go/issues/4268
// we could attempt to hack the creation timestamp, but this is a constant and far easier.
collectorStartTime = uint64(11651379494838206464)
collectorStartTime = uint64(0)
)

var (
Expand All @@ -78,8 +75,9 @@ var (
updatedYamlConfigHash = getConfigHash(testCollectorKey, collectorUpdatedFile)
otherUpdatedYamlConfigHash = getConfigHash(otherCollectorKey, collectorUpdatedFile)

podTime = metav1.NewTime(time.UnixMicro(1704748549000000))
mockPodList = &v1.PodList{
podTime = metav1.NewTime(time.Unix(0, 0))
podTimeUnsigned, _ = timeToUnixNanoUnsigned(podTime.Time)
mockPodList = &v1.PodList{
TypeMeta: metav1.TypeMeta{
Kind: "PodList",
APIVersion: "v1",
Expand All @@ -95,6 +93,7 @@ var (
"app.kubernetes.io/part-of": "opentelemetry",
"app.kubernetes.io/component": "opentelemetry-collector",
},
CreationTimestamp: podTime,
},
Spec: v1.PodSpec{},
Status: v1.PodStatus{
Expand All @@ -119,6 +118,7 @@ var (
"app.kubernetes.io/part-of": "opentelemetry",
"app.kubernetes.io/component": "opentelemetry-collector",
},
CreationTimestamp: podTime,
},
Spec: v1.PodSpec{},
Status: v1.PodStatus{
Expand Down Expand Up @@ -215,6 +215,8 @@ func getFakeApplier(t *testing.T, conf *config.Config, lists ...runtimeClient.Ob

func TestAgent_getHealth(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now())
startTime, err := timeToUnixNanoUnsigned(fakeClock.Now())
require.NoError(t, err)
type fields struct {
configFile string
}
Expand Down Expand Up @@ -244,10 +246,10 @@ func TestAgent_getHealth(t *testing.T) {
want: []*protobufs.ComponentHealth{
{
Healthy: true,
StartTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StartTimeUnixNano: startTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{},
},
},
Expand All @@ -269,15 +271,15 @@ func TestAgent_getHealth(t *testing.T) {
want: []*protobufs.ComponentHealth{
{
Healthy: true,
StartTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StartTimeUnixNano: startTime,
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
"testnamespace/collector": {
Healthy: true,
StartTimeUnixNano: collectorStartTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{},
},
},
Expand All @@ -302,23 +304,23 @@ func TestAgent_getHealth(t *testing.T) {
want: []*protobufs.ComponentHealth{
{
Healthy: true,
StartTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StartTimeUnixNano: startTime,
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
"testnamespace/collector": {
Healthy: true,
StartTimeUnixNano: collectorStartTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{},
},
"testnamespace/other": {
Healthy: true,
StartTimeUnixNano: collectorStartTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{},
},
},
Expand All @@ -342,21 +344,21 @@ func TestAgent_getHealth(t *testing.T) {
want: []*protobufs.ComponentHealth{
{
Healthy: true,
StartTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StartTimeUnixNano: startTime,
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
"other/third": {
Healthy: true,
StartTimeUnixNano: collectorStartTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
otherCollectorName + "/" + thirdCollectorName + "-1": {
Healthy: true,
Status: "Running",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StartTimeUnixNano: uint64(podTime.UnixNano()),
StatusTimeUnixNano: startTime,
StartTimeUnixNano: podTimeUnsigned,
},
},
},
Expand All @@ -381,20 +383,20 @@ func TestAgent_getHealth(t *testing.T) {
want: []*protobufs.ComponentHealth{
{
Healthy: true,
StartTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StartTimeUnixNano: startTime,
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
"other/third": {
Healthy: false, // we're working with mocks so the status will never be reconciled.
StartTimeUnixNano: collectorStartTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
otherCollectorName + "/" + thirdCollectorName + "-1": {
Healthy: false,
Status: "Running",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: startTime,
StartTimeUnixNano: uint64(0),
},
},
Expand Down
1 change: 1 addition & 0 deletions cmd/operator-opamp-bridge/agent/testdata/basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ metadata:
name: simplest
labels:
"opentelemetry.io/opamp-managed": "true"
creationTimestamp: "1970-01-01T00:00:00Z"
spec:
config:
receivers:
Expand Down
1 change: 1 addition & 0 deletions cmd/operator-opamp-bridge/agent/testdata/updated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ metadata:
name: simplest
labels:
"opentelemetry.io/opamp-managed": "test-bridge"
creationTimestamp: "1970-01-01T00:00:00Z"
spec:
config:
receivers:
Expand Down
Loading