Skip to content

Commit a4999eb

Browse files
[receiver/k8sobjects] Add logic to properly handle 410 responses (open-telemetry#26098)
**Description:** Updates the receiver to properly handle 410 response code. The expectations for what clients should do when a 410 is received can be found here: https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses. I originally implemented this feature directly in `startWatch`, rebuilding the watcher and res channel within the for loop, but I grew concerned about making sure everything stopped correctly. I took a look at the retry watcher's implementation and reused its concepts for this implementation. If it is overcomplicated we can go back to my original idea. **Link to tracking Issue:** <Issue number if applicable> Closes open-telemetry#24903 **Testing:** <Describe what testing was performed and which tests were added.> Tested locally. Unit tests proved to be extremely challenging since I couldn't figure out how to get the mock to produce a 410. We really need e2e tests (open-telemetry#18395). --------- Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com>
1 parent ca8c1ad commit a4999eb

3 files changed

Lines changed: 69 additions & 11 deletions

File tree

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: k8sobjectsreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Adds logic to properly handle 410 response codes when watching. This improves the reliability of the receiver.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [26098]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

receiver/k8sobjectsreceiver/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ Use the below commands to create a `ClusterRole` with required permissions and a
122122
Following config will work for collecting pods and events only. You need to add
123123
appropriate rule for collecting other objects.
124124

125-
When using watch mode without specifying a `resource_version` you must also specify `list` verb so that the receiver has permission to do its initial list.
125+
When using watch mode you must also specify `list` verb so that the receiver has permission to do its initial list if no
126+
`resource_version` was supplied or a list to recover from [410 Gone scenarios](https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses).
126127

127128
```bash
128129
<<EOF | kubectl apply -f -

receiver/k8sobjectsreceiver/receiver.go

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package k8sobjectsreceiver // import "github.com/open-telemetry/opentelemetry-co
66
import (
77
"context"
88
"fmt"
9+
"net/http"
910
"sync"
1011
"time"
1112

@@ -14,7 +15,9 @@ import (
1415
"go.opentelemetry.io/collector/obsreport"
1516
"go.opentelemetry.io/collector/receiver"
1617
"go.uber.org/zap"
18+
apierrors "k8s.io/apimachinery/pkg/api/errors"
1719
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20+
"k8s.io/apimachinery/pkg/util/wait"
1821
apiWatch "k8s.io/apimachinery/pkg/watch"
1922
"k8s.io/client-go/dynamic"
2023
"k8s.io/client-go/tools/cache"
@@ -148,36 +151,64 @@ func (kr *k8sobjectsreceiver) startPull(ctx context.Context, config *K8sObjectsC
148151
}
149152

150153
func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) {
151-
152154
stopperChan := make(chan struct{})
153155
kr.mu.Lock()
154156
kr.stopperChanList = append(kr.stopperChanList, stopperChan)
155157
kr.mu.Unlock()
156158

157-
resourceVersion, err := getResourceVersion(ctx, config, resource)
158-
if err != nil {
159-
kr.setting.Logger.Error("could not retrieve an initial resourceVersion", zap.String("resource", config.gvr.String()), zap.Error(err))
160-
return
161-
}
162159
watchFunc := func(options metav1.ListOptions) (apiWatch.Interface, error) {
163160
options.FieldSelector = config.FieldSelector
164161
options.LabelSelector = config.LabelSelector
165162
return resource.Watch(ctx, options)
166163
}
167164

165+
cancelCtx, cancel := context.WithCancel(ctx)
166+
cfgCopy := *config
167+
wait.UntilWithContext(cancelCtx, func(newCtx context.Context) {
168+
resourceVersion, err := getResourceVersion(newCtx, &cfgCopy, resource)
169+
if err != nil {
170+
kr.setting.Logger.Error("could not retrieve a resourceVersion", zap.String("resource", cfgCopy.gvr.String()), zap.Error(err))
171+
cancel()
172+
return
173+
}
174+
175+
done := kr.doWatch(newCtx, &cfgCopy, resourceVersion, watchFunc, stopperChan)
176+
if done {
177+
cancel()
178+
return
179+
}
180+
181+
// need to restart with a fresh resource version
182+
cfgCopy.ResourceVersion = ""
183+
}, 0)
184+
}
185+
186+
// doWatch returns true when watching is done, false when watching should be restarted.
187+
func (kr *k8sobjectsreceiver) doWatch(ctx context.Context, config *K8sObjectsConfig, resourceVersion string, watchFunc func(options metav1.ListOptions) (apiWatch.Interface, error), stopperChan chan struct{}) bool {
168188
watcher, err := watch.NewRetryWatcher(resourceVersion, &cache.ListWatch{WatchFunc: watchFunc})
169189
if err != nil {
170190
kr.setting.Logger.Error("error in watching object", zap.String("resource", config.gvr.String()), zap.Error(err))
171-
return
191+
return true
172192
}
173193

194+
defer watcher.Stop()
174195
res := watcher.ResultChan()
175196
for {
176197
select {
177198
case data, ok := <-res:
199+
if data.Type == apiWatch.Error {
200+
errObject := apierrors.FromObject(data.Object)
201+
// nolint:errorlint
202+
if errObject.(*apierrors.StatusError).ErrStatus.Code == http.StatusGone {
203+
kr.setting.Logger.Info("received a 410, grabbing new resource version", zap.Any("data", data))
204+
// we received a 410 so we need to restart
205+
return false
206+
}
207+
}
208+
178209
if !ok {
179210
kr.setting.Logger.Warn("Watch channel closed unexpectedly", zap.String("resource", config.gvr.String()))
180-
return
211+
return true
181212
}
182213

183214
if config.exclude[data.Type] {
@@ -195,10 +226,9 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
195226
}
196227
case <-stopperChan:
197228
watcher.Stop()
198-
return
229+
return true
199230
}
200231
}
201-
202232
}
203233

204234
func getResourceVersion(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) (string, error) {

0 commit comments

Comments
 (0)