|
| 1 | +// Copyright The OpenTelemetry Authors |
| 2 | +// |
| 3 | +// Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +// you may not use this file except in compliance with the License. |
| 5 | +// You may obtain a copy of the License at |
| 6 | +// |
| 7 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +// |
| 9 | +// Unless required by applicable law or agreed to in writing, software |
| 10 | +// distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +// See the License for the specific language governing permissions and |
| 13 | +// limitations under the License. |
| 14 | + |
| 15 | +package agent |
| 16 | + |
| 17 | +import ( |
| 18 | + "bytes" |
| 19 | + "context" |
| 20 | + "time" |
| 21 | + |
| 22 | + "gopkg.in/yaml.v3" |
| 23 | + |
| 24 | + "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/metrics" |
| 25 | + "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/operator" |
| 26 | + |
| 27 | + "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/config" |
| 28 | + |
| 29 | + "github.com/oklog/ulid/v2" |
| 30 | + "go.uber.org/multierr" |
| 31 | + |
| 32 | + "github.com/open-telemetry/opamp-go/client" |
| 33 | + "github.com/open-telemetry/opamp-go/client/types" |
| 34 | + "github.com/open-telemetry/opamp-go/protobufs" |
| 35 | +) |
| 36 | + |
| 37 | +type Agent struct { |
| 38 | + logger types.Logger |
| 39 | + |
| 40 | + appliedKeys map[collectorKey]bool |
| 41 | + startTime uint64 |
| 42 | + lastHash []byte |
| 43 | + |
| 44 | + instanceId ulid.ULID |
| 45 | + agentDescription *protobufs.AgentDescription |
| 46 | + remoteConfigStatus *protobufs.RemoteConfigStatus |
| 47 | + |
| 48 | + opampClient client.OpAMPClient |
| 49 | + metricReporter *metrics.MetricReporter |
| 50 | + config config.Config |
| 51 | + applier operator.ConfigApplier |
| 52 | + remoteConfigEnabled bool |
| 53 | +} |
| 54 | + |
| 55 | +func NewAgent(logger types.Logger, applier operator.ConfigApplier, config config.Config, opampClient client.OpAMPClient) *Agent { |
| 56 | + agent := &Agent{ |
| 57 | + config: config, |
| 58 | + applier: applier, |
| 59 | + logger: logger, |
| 60 | + appliedKeys: map[collectorKey]bool{}, |
| 61 | + instanceId: config.GetNewInstanceId(), |
| 62 | + agentDescription: config.GetDescription(), |
| 63 | + remoteConfigEnabled: config.RemoteConfigEnabled(), |
| 64 | + opampClient: opampClient, |
| 65 | + } |
| 66 | + |
| 67 | + agent.logger.Debugf("Agent created, id=%v, type=%s, version=%s.", |
| 68 | + agent.instanceId.String(), config.GetAgentType(), config.GetAgentVersion()) |
| 69 | + |
| 70 | + return agent |
| 71 | +} |
| 72 | + |
| 73 | +// TODO: Something should run on a schedule to set the health of the OpAMP client. |
| 74 | +func (agent *Agent) getHealth() *protobufs.AgentHealth { |
| 75 | + return &protobufs.AgentHealth{ |
| 76 | + Healthy: true, |
| 77 | + StartTimeUnixNano: agent.startTime, |
| 78 | + LastError: "", |
| 79 | + } |
| 80 | +} |
| 81 | + |
| 82 | +// onConnect is called when an agent is successfully connected to a server. |
| 83 | +func (agent *Agent) onConnect() { |
| 84 | + agent.logger.Debugf("Connected to the server.") |
| 85 | +} |
| 86 | + |
| 87 | +// onConnectFailed is called when an agent was unable to connect to a server. |
| 88 | +func (agent *Agent) onConnectFailed(err error) { |
| 89 | + agent.logger.Errorf("Failed to connect to the server: %v", err) |
| 90 | +} |
| 91 | + |
| 92 | +// onError is called when an agent receives an error response from the server. |
| 93 | +func (agent *Agent) onError(err *protobufs.ServerErrorResponse) { |
| 94 | + agent.logger.Errorf("Server returned an error response: %v", err.ErrorMessage) |
| 95 | +} |
| 96 | + |
| 97 | +// saveRemoteConfigStatus receives a status from the server when the server sets a remote configuration. |
| 98 | +func (agent *Agent) saveRemoteConfigStatus(_ context.Context, status *protobufs.RemoteConfigStatus) { |
| 99 | + agent.remoteConfigStatus = status |
| 100 | +} |
| 101 | + |
| 102 | +// Start sets up the callbacks for the OpAMP client and begins the client's connection to the server. |
| 103 | +func (agent *Agent) Start() error { |
| 104 | + agent.startTime = uint64(time.Now().UnixNano()) |
| 105 | + settings := types.StartSettings{ |
| 106 | + OpAMPServerURL: agent.config.Endpoint, |
| 107 | + InstanceUid: agent.instanceId.String(), |
| 108 | + Callbacks: types.CallbacksStruct{ |
| 109 | + OnConnectFunc: agent.onConnect, |
| 110 | + OnConnectFailedFunc: agent.onConnectFailed, |
| 111 | + OnErrorFunc: agent.onError, |
| 112 | + SaveRemoteConfigStatusFunc: agent.saveRemoteConfigStatus, |
| 113 | + GetEffectiveConfigFunc: agent.getEffectiveConfig, |
| 114 | + OnMessageFunc: agent.onMessage, |
| 115 | + }, |
| 116 | + RemoteConfigStatus: agent.remoteConfigStatus, |
| 117 | + PackagesStateProvider: nil, |
| 118 | + Capabilities: agent.config.GetCapabilities(), |
| 119 | + } |
| 120 | + err := agent.opampClient.SetAgentDescription(agent.agentDescription) |
| 121 | + if err != nil { |
| 122 | + return err |
| 123 | + } |
| 124 | + err = agent.opampClient.SetHealth(agent.getHealth()) |
| 125 | + if err != nil { |
| 126 | + return err |
| 127 | + } |
| 128 | + |
| 129 | + agent.logger.Debugf("Starting OpAMP client...") |
| 130 | + |
| 131 | + err = agent.opampClient.Start(context.Background(), settings) |
| 132 | + if err != nil { |
| 133 | + return err |
| 134 | + } |
| 135 | + |
| 136 | + agent.logger.Debugf("OpAMP Client started.") |
| 137 | + |
| 138 | + return nil |
| 139 | +} |
| 140 | + |
| 141 | +// updateAgentIdentity receives a new instanced Id from the remote server and updates the agent's instanceID field. |
| 142 | +// The meter will be reinitialized by the onMessage function. |
| 143 | +func (agent *Agent) updateAgentIdentity(instanceId ulid.ULID) { |
| 144 | + agent.logger.Debugf("Agent identity is being changed from id=%v to id=%v", |
| 145 | + agent.instanceId.String(), |
| 146 | + instanceId.String()) |
| 147 | + agent.instanceId = instanceId |
| 148 | +} |
| 149 | + |
| 150 | +// getEffectiveConfig is called when a remote server needs to learn of the current effective configuration of each |
| 151 | +// collector the agent is managing. |
| 152 | +func (agent *Agent) getEffectiveConfig(ctx context.Context) (*protobufs.EffectiveConfig, error) { |
| 153 | + instances, err := agent.applier.ListInstances() |
| 154 | + if err != nil { |
| 155 | + agent.logger.Errorf("couldn't list instances", err) |
| 156 | + return nil, err |
| 157 | + } |
| 158 | + instanceMap := map[string]*protobufs.AgentConfigFile{} |
| 159 | + for _, instance := range instances { |
| 160 | + marshaled, err := yaml.Marshal(instance) |
| 161 | + if err != nil { |
| 162 | + agent.logger.Errorf("couldn't marshal collector configuration", err) |
| 163 | + return nil, err |
| 164 | + } |
| 165 | + mapKey := newCollectorKey(instance.GetName(), instance.GetNamespace()) |
| 166 | + instanceMap[mapKey.String()] = &protobufs.AgentConfigFile{ |
| 167 | + Body: marshaled, |
| 168 | + ContentType: "yaml", |
| 169 | + } |
| 170 | + } |
| 171 | + return &protobufs.EffectiveConfig{ |
| 172 | + ConfigMap: &protobufs.AgentConfigMap{ |
| 173 | + ConfigMap: instanceMap, |
| 174 | + }, |
| 175 | + }, nil |
| 176 | +} |
| 177 | + |
| 178 | +// initMeter initializes a metric reporter instance for the agent to report runtime metrics to the |
| 179 | +// configured destination. The settings received will be used to initialize a reporter, shutting down any previously |
| 180 | +// running metrics reporting instances. |
| 181 | +func (agent *Agent) initMeter(settings *protobufs.TelemetryConnectionSettings) { |
| 182 | + reporter, err := metrics.NewMetricReporter(agent.logger, settings, agent.config.GetAgentType(), agent.config.GetAgentVersion(), agent.instanceId) |
| 183 | + if err != nil { |
| 184 | + agent.logger.Errorf("Cannot collect metrics: %v", err) |
| 185 | + return |
| 186 | + } |
| 187 | + |
| 188 | + if agent.metricReporter != nil { |
| 189 | + agent.metricReporter.Shutdown() |
| 190 | + } |
| 191 | + agent.metricReporter = reporter |
| 192 | +} |
| 193 | + |
| 194 | +// applyRemoteConfig receives a remote configuration from a remote server of the following form: |
| 195 | +// |
| 196 | +// map[name/namespace] -> collector CRD spec |
| 197 | +// |
| 198 | +// For every key in the received remote configuration, the agent attempts to apply it to the connected |
| 199 | +// Kubernetes cluster. If an agent fails to apply a collector CRD, it will continue to the next entry. The agent will |
| 200 | +// store the received configuration hash regardless of application status as per the OpAMP spec. |
| 201 | +// |
| 202 | +// INVARIANT: The caller must verify that config isn't nil _and_ the configuration has changed between calls. |
| 203 | +func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (*protobufs.RemoteConfigStatus, error) { |
| 204 | + var multiErr error |
| 205 | + // Apply changes from the received config map |
| 206 | + for key, file := range config.Config.GetConfigMap() { |
| 207 | + if len(key) == 0 || len(file.Body) == 0 { |
| 208 | + continue |
| 209 | + } |
| 210 | + colKey, err := collectorKeyFromKey(key) |
| 211 | + if err != nil { |
| 212 | + multiErr = multierr.Append(multiErr, err) |
| 213 | + continue |
| 214 | + } |
| 215 | + err = agent.applier.Apply(colKey.name, colKey.namespace, file) |
| 216 | + if err != nil { |
| 217 | + multiErr = multierr.Append(multiErr, err) |
| 218 | + continue |
| 219 | + } |
| 220 | + agent.appliedKeys[colKey] = true |
| 221 | + } |
| 222 | + // Check if anything was deleted |
| 223 | + for collectorKey := range agent.appliedKeys { |
| 224 | + if _, ok := config.Config.GetConfigMap()[collectorKey.String()]; !ok { |
| 225 | + err := agent.applier.Delete(collectorKey.name, collectorKey.namespace) |
| 226 | + if err != nil { |
| 227 | + multiErr = multierr.Append(multiErr, err) |
| 228 | + } |
| 229 | + } |
| 230 | + } |
| 231 | + agent.lastHash = config.GetConfigHash() |
| 232 | + if multiErr != nil { |
| 233 | + return &protobufs.RemoteConfigStatus{ |
| 234 | + LastRemoteConfigHash: agent.lastHash, |
| 235 | + Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, |
| 236 | + ErrorMessage: multiErr.Error(), |
| 237 | + }, multiErr |
| 238 | + } |
| 239 | + return &protobufs.RemoteConfigStatus{ |
| 240 | + LastRemoteConfigHash: agent.lastHash, |
| 241 | + Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED, |
| 242 | + }, nil |
| 243 | +} |
| 244 | + |
| 245 | +// Shutdown will stop the OpAMP client gracefully. |
| 246 | +func (agent *Agent) Shutdown() { |
| 247 | + agent.logger.Debugf("Agent shutting down...") |
| 248 | + if agent.opampClient != nil { |
| 249 | + err := agent.opampClient.Stop(context.Background()) |
| 250 | + if err != nil { |
| 251 | + agent.logger.Errorf(err.Error()) |
| 252 | + } |
| 253 | + } |
| 254 | + if agent.metricReporter != nil { |
| 255 | + agent.metricReporter.Shutdown() |
| 256 | + } |
| 257 | +} |
| 258 | + |
| 259 | +// onMessage is called when the client receives a new message from the connected OpAMP server. The agent is responsible |
| 260 | +// for checking if it should apply a new remote configuration. The agent will also initialize metrics based on the |
| 261 | +// settings received from the server. The agent is also able to update its identifier if it needs to. |
| 262 | +func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) { |
| 263 | + // If we received remote configuration, and it's not the same as the previously applied one |
| 264 | + if agent.remoteConfigEnabled && msg.RemoteConfig != nil && !bytes.Equal(agent.lastHash, msg.RemoteConfig.GetConfigHash()) { |
| 265 | + var err error |
| 266 | + status, err := agent.applyRemoteConfig(msg.RemoteConfig) |
| 267 | + if err != nil { |
| 268 | + agent.logger.Errorf(err.Error()) |
| 269 | + } |
| 270 | + err = agent.opampClient.SetRemoteConfigStatus(status) |
| 271 | + if err != nil { |
| 272 | + agent.logger.Errorf(err.Error()) |
| 273 | + return |
| 274 | + } |
| 275 | + err = agent.opampClient.UpdateEffectiveConfig(ctx) |
| 276 | + if err != nil { |
| 277 | + agent.logger.Errorf(err.Error()) |
| 278 | + } |
| 279 | + } |
| 280 | + |
| 281 | + // The instance id is updated prior to the meter initialization so that the new meter will report using the updated |
| 282 | + // instanceId. |
| 283 | + if msg.AgentIdentification != nil { |
| 284 | + newInstanceId, err := ulid.Parse(msg.AgentIdentification.NewInstanceUid) |
| 285 | + if err != nil { |
| 286 | + agent.logger.Errorf(err.Error()) |
| 287 | + return |
| 288 | + } |
| 289 | + agent.updateAgentIdentity(newInstanceId) |
| 290 | + } |
| 291 | + |
| 292 | + if msg.OwnMetricsConnSettings != nil { |
| 293 | + agent.initMeter(msg.OwnMetricsConnSettings) |
| 294 | + } |
| 295 | +} |
0 commit comments