Skip to content

Commit 2ecac8d

Browse files
authored
feat: Add implementation of new AvailableComponents message (#340)
Implements the specification added to opamp-spec in open-telemetry/opamp-spec#201
1 parent 88af57a commit 2ecac8d

10 files changed

+477
-10
lines changed

client/client.go

+19
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,23 @@ type OpAMPClient interface {
134134
// If no error is returned, the channel returned will be closed after the specified
135135
// message is sent.
136136
SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error)
137+
138+
// SetAvailableComponents modifies the set of components that are available for configuration
139+
// on the agent.
140+
// If called before Start(), initializes the client state that will be sent to the server upon
141+
// Start() if the ReportsAvailableComponents capability is set.
142+
// Must be called before Start() if the ReportsAvailableComponents capability is set.
143+
//
144+
// May be called any time after Start(), including from the OnMessage handler.
145+
// The new components will be sent with the next message to the server.
146+
//
147+
// When called after Start():
148+
// If components is nil, types.ErrAvailableComponentsMissing will be returned.
149+
// If components.Hash is nil or an empty []byte, types.ErrNoAvailableComponentHash will be returned.
150+
// If the ReportsAvailableComponents capability is not set in StartSettings.Capabilities during Start(),
151+
// types.ErrReportsAvailableComponentsNotSet will be returned.
152+
//
153+
// This method is subject to agent status compression - if components is not
154+
// different from the cached agent state, this method is a no-op.
155+
SetAvailableComponents(components *protobufs.AvailableComponents) error
137156
}

client/clientimpl_test.go

+123
Original file line numberDiff line numberDiff line change
@@ -2305,3 +2305,126 @@ func TestSetFlagsBeforeStart(t *testing.T) {
23052305
assert.NoError(t, err)
23062306
})
23072307
}
2308+
2309+
func TestSetAvailableComponents(t *testing.T) {
2310+
testCases := []struct {
2311+
desc string
2312+
capabilities protobufs.AgentCapabilities
2313+
testFunc func(t *testing.T, client OpAMPClient, srv *internal.MockServer)
2314+
}{
2315+
{
2316+
desc: "apply nil AvailableComponents",
2317+
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
2318+
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
2319+
require.ErrorIs(t, client.SetAvailableComponents(nil), types.ErrAvailableComponentsMissing)
2320+
},
2321+
},
2322+
{
2323+
desc: "apply AvailableComponents with empty hash",
2324+
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
2325+
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
2326+
require.ErrorIs(t, client.SetAvailableComponents(&protobufs.AvailableComponents{}), types.ErrNoAvailableComponentHash)
2327+
},
2328+
},
2329+
{
2330+
desc: "apply AvailableComponents without required capability",
2331+
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
2332+
require.ErrorIs(t, client.SetAvailableComponents(generateTestAvailableComponents()), types.ErrReportsAvailableComponentsNotSet)
2333+
},
2334+
},
2335+
{
2336+
desc: "apply AvailableComponents with cached AvailableComponents",
2337+
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
2338+
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
2339+
require.NoError(t, client.SetAvailableComponents(generateTestAvailableComponents()))
2340+
},
2341+
},
2342+
{
2343+
desc: "apply AvailableComponents with new AvailableComponents",
2344+
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
2345+
testFunc: func(t *testing.T, client OpAMPClient, srv *internal.MockServer) {
2346+
availableComponents := generateTestAvailableComponents()
2347+
availableComponents.Hash = []byte("different")
2348+
require.NoError(t, client.SetAvailableComponents(availableComponents))
2349+
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
2350+
assert.EqualValues(t, 1, msg.SequenceNum)
2351+
msgAvailableComponents := msg.GetAvailableComponents()
2352+
require.NotNil(t, msgAvailableComponents)
2353+
require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash())
2354+
require.Nil(t, msgAvailableComponents.GetComponents())
2355+
return nil
2356+
})
2357+
},
2358+
},
2359+
}
2360+
2361+
for _, tc := range testCases {
2362+
t.Run(tc.desc, func(t *testing.T) {
2363+
testClients(t, func(t *testing.T, client OpAMPClient) {
2364+
2365+
// Start a Server.
2366+
srv := internal.StartMockServer(t)
2367+
srv.EnableExpectMode()
2368+
2369+
availableComponents := generateTestAvailableComponents()
2370+
client.SetAvailableComponents(availableComponents)
2371+
2372+
// Start a client.
2373+
settings := types.StartSettings{
2374+
OpAMPServerURL: "ws://" + srv.Endpoint,
2375+
Callbacks: types.Callbacks{
2376+
OnMessage: func(ctx context.Context, msg *types.MessageData) {},
2377+
},
2378+
Capabilities: tc.capabilities,
2379+
}
2380+
prepareClient(t, &settings, client)
2381+
2382+
// Client --->
2383+
assert.NoError(t, client.Start(context.Background(), settings))
2384+
2385+
// ---> Server
2386+
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
2387+
assert.EqualValues(t, 0, msg.SequenceNum)
2388+
msgAvailableComponents := msg.GetAvailableComponents()
2389+
if tc.capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 {
2390+
require.NotNil(t, msgAvailableComponents)
2391+
require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash())
2392+
require.Nil(t, msgAvailableComponents.GetComponents())
2393+
} else {
2394+
require.Nil(t, msgAvailableComponents)
2395+
}
2396+
return nil
2397+
})
2398+
2399+
tc.testFunc(t, client, srv)
2400+
2401+
// Shutdown the Server.
2402+
srv.Close()
2403+
2404+
// Shutdown the client.
2405+
err := client.Stop(context.Background())
2406+
assert.NoError(t, err)
2407+
})
2408+
})
2409+
}
2410+
}
2411+
2412+
func generateTestAvailableComponents() *protobufs.AvailableComponents {
2413+
return &protobufs.AvailableComponents{
2414+
Hash: []byte("fake-hash"),
2415+
Components: map[string]*protobufs.ComponentDetails{
2416+
"receivers": {
2417+
Metadata: []*protobufs.KeyValue{
2418+
{
2419+
Key: "component",
2420+
Value: &protobufs.AnyValue{
2421+
Value: &protobufs.AnyValue_StringValue{
2422+
StringValue: "filereceiver",
2423+
},
2424+
},
2425+
},
2426+
},
2427+
},
2428+
},
2429+
}
2430+
}

client/httpclient.go

+5
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messag
120120
return c.common.SendCustomMessage(message)
121121
}
122122

123+
// SetAvailableComponents implements OpAMPClient.SetAvailableComponents
124+
func (c *httpClient) SetAvailableComponents(components *protobufs.AvailableComponents) error {
125+
return c.common.SetAvailableComponents(components)
126+
}
127+
123128
func (c *httpClient) runUntilStopped(ctx context.Context) {
124129
// Start the HTTP sender. This will make request/responses with retries for
125130
// failures and will wait with configured polling interval if there is nothing

client/httpclient_test.go

+122
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/stretchr/testify/assert"
1616
"github.com/stretchr/testify/mock"
17+
"github.com/stretchr/testify/require"
1718
"google.golang.org/protobuf/proto"
1819

1920
"github.com/open-telemetry/opamp-go/client/internal"
@@ -311,3 +312,124 @@ func TestRedirectHTTP(t *testing.T) {
311312
})
312313
}
313314
}
315+
316+
func TestHTTPReportsAvailableComponents(t *testing.T) {
317+
testCases := []struct {
318+
desc string
319+
capabilities protobufs.AgentCapabilities
320+
availableComponents *protobufs.AvailableComponents
321+
startErr error
322+
}{
323+
{
324+
desc: "Does not report AvailableComponents",
325+
availableComponents: nil,
326+
},
327+
{
328+
desc: "Reports AvailableComponents",
329+
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
330+
availableComponents: generateTestAvailableComponents(),
331+
},
332+
{
333+
desc: "No AvailableComponents on Start() despite capability",
334+
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
335+
startErr: internal.ErrAvailableComponentsMissing,
336+
},
337+
}
338+
339+
for _, tc := range testCases {
340+
t.Run(tc.desc, func(t *testing.T) {
341+
// Start a Server.
342+
srv := internal.StartMockServer(t)
343+
var rcvCounter atomic.Uint64
344+
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
345+
assert.EqualValues(t, rcvCounter.Load(), msg.SequenceNum)
346+
rcvCounter.Add(1)
347+
time.Sleep(50 * time.Millisecond)
348+
if rcvCounter.Load() == 1 {
349+
resp := &protobufs.ServerToAgent{
350+
InstanceUid: msg.InstanceUid,
351+
}
352+
353+
if tc.availableComponents != nil {
354+
// the first message received should contain just the available component hash
355+
availableComponents := msg.GetAvailableComponents()
356+
require.NotNil(t, availableComponents)
357+
require.Nil(t, availableComponents.GetComponents())
358+
require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash())
359+
360+
// add the flag asking for the full available component state to the response
361+
resp.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
362+
} else {
363+
require.Nil(t, msg.GetAvailableComponents())
364+
}
365+
366+
return resp
367+
}
368+
369+
if rcvCounter.Load() == 2 {
370+
if tc.availableComponents != nil {
371+
// the second message received should contain the full component state
372+
availableComponents := msg.GetAvailableComponents()
373+
require.NotNil(t, availableComponents)
374+
require.Equal(t, tc.availableComponents.GetComponents(), availableComponents.GetComponents())
375+
require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash())
376+
} else {
377+
require.Nil(t, msg.GetAvailableComponents())
378+
}
379+
380+
return nil
381+
}
382+
383+
// all subsequent messages should not have any available components
384+
require.Nil(t, msg.GetAvailableComponents())
385+
return nil
386+
}
387+
388+
// Start a client.
389+
settings := types.StartSettings{}
390+
settings.OpAMPServerURL = "http://" + srv.Endpoint
391+
settings.Capabilities = tc.capabilities
392+
393+
client := NewHTTP(nil)
394+
client.SetAvailableComponents(tc.availableComponents)
395+
prepareClient(t, &settings, client)
396+
397+
startErr := client.Start(context.Background(), settings)
398+
if tc.startErr == nil {
399+
assert.NoError(t, startErr)
400+
} else {
401+
assert.ErrorIs(t, startErr, tc.startErr)
402+
return
403+
}
404+
405+
// Verify that status report is delivered.
406+
eventually(t, func() bool {
407+
return rcvCounter.Load() == 1
408+
})
409+
410+
if tc.availableComponents != nil {
411+
// Verify that status report is delivered again. Polling should ensure this.
412+
eventually(t, func() bool {
413+
return rcvCounter.Load() == 2
414+
})
415+
} else {
416+
// Verify that no second status report is delivered (polling is too infrequent for this to happen in 50ms)
417+
assert.Never(t, func() bool {
418+
return rcvCounter.Load() == 2
419+
}, 50*time.Millisecond, 10*time.Millisecond)
420+
}
421+
422+
// Verify that no third status report is delivered (polling is too infrequent for this to happen in 50ms)
423+
assert.Never(t, func() bool {
424+
return rcvCounter.Load() == 3
425+
}, 50*time.Millisecond, 10*time.Millisecond)
426+
427+
// Shutdown the Server.
428+
srv.Close()
429+
430+
// Shutdown the client.
431+
err := client.Stop(context.Background())
432+
assert.NoError(t, err)
433+
})
434+
}
435+
}

client/internal/clientcommon.go

+59
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ var (
2020
ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set")
2121
ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set")
2222
ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set")
23+
ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil")
2324

2425
errAlreadyStarted = errors.New("already started")
2526
errCannotStopNotStarted = errors.New("cannot stop because not started")
@@ -88,6 +89,10 @@ func (c *ClientCommon) PrepareStart(
8889
return ErrHealthMissing
8990
}
9091

92+
if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 && c.ClientSyncedState.AvailableComponents() == nil {
93+
return ErrAvailableComponentsMissing
94+
}
95+
9196
// Prepare remote config status.
9297
if settings.RemoteConfigStatus == nil {
9398
// RemoteConfigStatus is not provided. Start with empty.
@@ -212,6 +217,15 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error {
212217
return err
213218
}
214219

220+
// initially, do not send the full component state - just send the hash.
221+
// full state is available on request from the server using the corresponding ServerToAgent flag
222+
var availableComponents *protobufs.AvailableComponents
223+
if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 {
224+
availableComponents = &protobufs.AvailableComponents{
225+
Hash: c.ClientSyncedState.AvailableComponents().GetHash(),
226+
}
227+
}
228+
215229
c.sender.NextMessage().Update(
216230
func(msg *protobufs.AgentToServer) {
217231
msg.AgentDescription = c.ClientSyncedState.AgentDescription()
@@ -221,6 +235,7 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error {
221235
msg.Capabilities = uint64(c.Capabilities)
222236
msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities()
223237
msg.Flags = c.ClientSyncedState.Flags()
238+
msg.AvailableComponents = availableComponents
224239
},
225240
)
226241
return nil
@@ -433,3 +448,47 @@ func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (mess
433448

434449
return sendingChan, nil
435450
}
451+
452+
// SetAvailableComponents sends a message to the server with the available components for the agent
453+
func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error {
454+
if !c.isStarted {
455+
return c.ClientSyncedState.SetAvailableComponents(components)
456+
}
457+
458+
if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 {
459+
return types.ErrReportsAvailableComponentsNotSet
460+
}
461+
462+
if components == nil {
463+
return types.ErrAvailableComponentsMissing
464+
}
465+
466+
if len(components.Hash) == 0 {
467+
return types.ErrNoAvailableComponentHash
468+
}
469+
470+
// implement agent status compression, don't send the message if it hasn't changed from the previous message
471+
availableComponentsChanged := !proto.Equal(c.ClientSyncedState.AvailableComponents(), components)
472+
473+
if availableComponentsChanged {
474+
if err := c.ClientSyncedState.SetAvailableComponents(components); err != nil {
475+
return err
476+
}
477+
478+
// initially, do not send the full component state - just send the hash.
479+
// full state is available on request from the server using the corresponding ServerToAgent flag
480+
availableComponents := &protobufs.AvailableComponents{
481+
Hash: c.ClientSyncedState.AvailableComponents().GetHash(),
482+
}
483+
484+
c.sender.NextMessage().Update(
485+
func(msg *protobufs.AgentToServer) {
486+
msg.AvailableComponents = availableComponents
487+
},
488+
)
489+
490+
c.sender.ScheduleSend()
491+
}
492+
493+
return nil
494+
}

0 commit comments

Comments
 (0)