Skip to content

Commit db43c8e

Browse files
committed
Add new API to propagate context and deprecate olds
Signed-off-by: sivchari <[email protected]>
1 parent d65e9d9 commit db43c8e

File tree

8 files changed

+141
-36
lines changed

8 files changed

+141
-36
lines changed

pkg/cache/settings.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ func (f *noopSettings) IsExcludedResource(_, _, _ string) bool {
3030
// Settings caching customizations
3131
type Settings struct {
3232
// ResourceHealthOverride contains health assessment overrides
33+
// Deprecated: use ResourceHealthOverrideContext insttead.
3334
ResourceHealthOverride health.HealthOverride
35+
// ResourceHealthOverrideContext contains health assessment overrides
36+
ResourceHealthOverrideContext health.HealthOverrideContext
3437
// ResourcesFilter holds filter that excludes resources
3538
ResourcesFilter kube.ResourceFilter
3639
}
@@ -54,7 +57,7 @@ func SetPopulateResourceInfoHandler(handler OnPopulateResourceInfoHandler) Updat
5457
// SetSettings updates caching settings
5558
func SetSettings(settings Settings) UpdateSettingsFunc {
5659
return func(cache *clusterCache) {
57-
cache.settings = Settings{settings.ResourceHealthOverride, settings.ResourcesFilter}
60+
cache.settings = Settings{settings.ResourceHealthOverride, settings.ResourceHealthOverrideContext, settings.ResourcesFilter}
5861
}
5962
}
6063

pkg/diff/diff.go

+6
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,15 @@ func (n *noopNormalizer) Normalize(_ *unstructured.Unstructured) error {
6161
return nil
6262
}
6363

64+
func (n *noopNormalizer) NormalizeContext(_ context.Context, _ *unstructured.Unstructured) error {
65+
return nil
66+
}
67+
6468
// Normalizer updates resource before comparing it
6569
type Normalizer interface {
70+
// Deprecated: use NormalizeContext instead
6671
Normalize(un *unstructured.Unstructured) error
72+
NormalizeContext(ctx context.Context, un *unstructured.Unstructured) error
6773
}
6874

6975
// GetNoopNormalizer returns normalizer that does not apply any resource modifications

pkg/health/health.go

+29-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package health
22

33
import (
4+
"context"
5+
46
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
57
"k8s.io/apimachinery/pkg/runtime/schema"
68

@@ -33,6 +35,13 @@ type HealthOverride interface {
3335
GetResourceHealth(obj *unstructured.Unstructured) (*HealthStatus, error)
3436
}
3537

38+
// Implements custom health assessment that overrides built-in assessment
39+
type HealthOverrideContext interface {
40+
GetResourceHealth(ctx context.Context, obj *unstructured.Unstructured) (*HealthStatus, error)
41+
}
42+
43+
type HealthOverrideFuncContext func(ctx context.Context, obj *unstructured.Unstructured) (*HealthStatus, error)
44+
3645
// Holds health assessment results
3746
type HealthStatus struct {
3847
Status HealthStatusCode `json:"status,omitempty"`
@@ -66,6 +75,25 @@ func IsWorse(current, new HealthStatusCode) bool {
6675

6776
// GetResourceHealth returns the health of a k8s resource
6877
func GetResourceHealth(obj *unstructured.Unstructured, healthOverride HealthOverride) (health *HealthStatus, err error) {
78+
healthOverrideContext := func(_ context.Context, obj *unstructured.Unstructured) (*HealthStatus, error) {
79+
return healthOverride.GetResourceHealth(obj)
80+
}
81+
return getResourceHealth(context.Background(), obj, healthOverrideContext)
82+
}
83+
84+
// GetResourceHealth returns the health of a k8s resource
85+
func GetResourceHealthContext(ctx context.Context, obj *unstructured.Unstructured, healthOverride HealthOverrideContext) (health *HealthStatus, err error) {
86+
var healthOverrideFunc HealthOverrideFuncContext
87+
if healthOverride != nil {
88+
healthOverrideFunc = func(ctx context.Context, obj *unstructured.Unstructured) (*HealthStatus, error) {
89+
return healthOverride.GetResourceHealth(ctx, obj)
90+
}
91+
}
92+
return getResourceHealth(ctx, obj, healthOverrideFunc)
93+
}
94+
95+
// GetResourceHealth returns the health of a k8s resource
96+
func getResourceHealth(ctx context.Context, obj *unstructured.Unstructured, healthOverride HealthOverrideFuncContext) (health *HealthStatus, err error) {
6997
if obj.GetDeletionTimestamp() != nil && !hook.HasHookFinalizer(obj) {
7098
return &HealthStatus{
7199
Status: HealthStatusProgressing,
@@ -74,7 +102,7 @@ func GetResourceHealth(obj *unstructured.Unstructured, healthOverride HealthOver
74102
}
75103

76104
if healthOverride != nil {
77-
health, err := healthOverride.GetResourceHealth(obj)
105+
health, err := healthOverride(ctx, obj)
78106
if err != nil {
79107
health = &HealthStatus{
80108
Status: HealthStatusUnknown,

pkg/health/health_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Package provides functionality that allows assessing the health state of a Kuber
55
package health
66

77
import (
8+
"context"
89
"os"
910
"testing"
1011

@@ -28,7 +29,7 @@ func getHealthStatus(t *testing.T, yamlPath string) *HealthStatus {
2829
var obj unstructured.Unstructured
2930
err = yaml.Unmarshal(yamlBytes, &obj)
3031
require.NoError(t, err)
31-
health, err := GetResourceHealth(&obj, nil)
32+
health, err := GetResourceHealthContext(context.Background(), &obj, nil)
3233
require.NoError(t, err)
3334
return health
3435
}

pkg/sync/sync_context.go

+61-18
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,16 @@ func (r *reconciledResource) key() kubeutil.ResourceKey {
5050
type SyncContext interface {
5151
// Terminate terminates sync operation. The method is asynchronous: it starts deletion is related K8S resources
5252
// such as in-flight resource hooks, updates operation status, and exists without waiting for resource completion.
53+
// Deprecated: use TerminateContext instead
5354
Terminate()
55+
// TerminateContext terminates sync operation. The method is asynchronous: it starts deletion is related K8S resources
56+
// such as in-flight resource hooks, updates operation status, and exists without waiting for resource completion.
57+
TerminateContext(ctx context.Context)
5458
// Executes next synchronization step and updates operation status.
59+
// Deprecated: use SyncContext instead
5560
Sync()
61+
// Executes next synchronization step and updates operation status.
62+
SyncContext(ctx context.Context)
5663
// Returns current sync operation state and information about resources synchronized so far.
5764
GetState() (common.OperationPhase, string, []common.ResourceSyncResult)
5865
}
@@ -75,12 +82,20 @@ func WithPermissionValidator(validator common.PermissionValidator) SyncOpt {
7582
}
7683

7784
// WithHealthOverride sets specified health override
85+
// Deprecated: use WithHealthOverrideContext instead
7886
func WithHealthOverride(override health.HealthOverride) SyncOpt {
7987
return func(ctx *syncContext) {
8088
ctx.healthOverride = override
8189
}
8290
}
8391

92+
// WithHealthOverrideContext sets specified health override
93+
func WithHealthOverrideContext(override health.HealthOverrideContext) SyncOpt {
94+
return func(ctx *syncContext) {
95+
ctx.healthOverrideContext = override
96+
}
97+
}
98+
8499
// WithInitialState sets sync operation initial state
85100
func WithInitialState(phase common.OperationPhase, message string, results []common.ResourceSyncResult, startedAt metav1.Time) SyncOpt {
86101
return func(ctx *syncContext) {
@@ -308,11 +323,18 @@ const (
308323
)
309324

310325
// getOperationPhase returns a health status from a _live_ unstructured object
311-
func (sc *syncContext) getOperationPhase(obj *unstructured.Unstructured) (common.OperationPhase, string, error) {
326+
func (sc *syncContext) getOperationPhase(ctx context.Context, obj *unstructured.Unstructured) (common.OperationPhase, string, error) {
312327
phase := common.OperationSucceeded
313328
message := obj.GetName() + " created"
314329

315-
resHealth, err := health.GetResourceHealth(obj, sc.healthOverride)
330+
var resHealth *health.HealthStatus
331+
var err error
332+
333+
if sc.healthOverrideContext != nil {
334+
resHealth, err = health.GetResourceHealthContext(ctx, obj, sc.healthOverrideContext)
335+
} else if sc.healthOverride != nil {
336+
resHealth, err = health.GetResourceHealth(obj, sc.healthOverride)
337+
}
316338
if err != nil {
317339
return "", "", err
318340
}
@@ -333,18 +355,19 @@ func (sc *syncContext) getOperationPhase(obj *unstructured.Unstructured) (common
333355
}
334356

335357
type syncContext struct {
336-
healthOverride health.HealthOverride
337-
permissionValidator common.PermissionValidator
338-
resources map[kubeutil.ResourceKey]reconciledResource
339-
hooks []*unstructured.Unstructured
340-
config *rest.Config
341-
rawConfig *rest.Config
342-
dynamicIf dynamic.Interface
343-
disco discovery.DiscoveryInterface
344-
extensionsclientset *clientset.Clientset
345-
kubectl kubeutil.Kubectl
346-
resourceOps kubeutil.ResourceOperations
347-
namespace string
358+
healthOverride health.HealthOverride
359+
healthOverrideContext health.HealthOverrideContext
360+
permissionValidator common.PermissionValidator
361+
resources map[kubeutil.ResourceKey]reconciledResource
362+
hooks []*unstructured.Unstructured
363+
config *rest.Config
364+
rawConfig *rest.Config
365+
dynamicIf dynamic.Interface
366+
disco discovery.DiscoveryInterface
367+
extensionsclientset *clientset.Clientset
368+
kubectl kubeutil.Kubectl
369+
resourceOps kubeutil.ResourceOperations
370+
namespace string
348371

349372
dryRun bool
350373
skipDryRun bool
@@ -403,8 +426,19 @@ func (sc *syncContext) setRunningPhase(tasks []*syncTask, isPendingDeletion bool
403426
}
404427
}
405428

406-
// sync has performs the actual apply or hook based sync
429+
// Sync has performs the actual apply or hook based sync
430+
// Deprecated: use SyncContext instead
407431
func (sc *syncContext) Sync() {
432+
sc.SyncContext(context.Background())
433+
}
434+
435+
// SyncContext has performs the actual apply or hook based sync
436+
func (sc *syncContext) SyncContext(ctx context.Context) {
437+
sc.sync(ctx)
438+
}
439+
440+
// sync has performs the actual apply or hook based sync
441+
func (sc *syncContext) sync(ctx context.Context) {
408442
sc.log.WithValues("skipHooks", sc.skipHooks, "started", sc.started()).Info("Syncing")
409443
tasks, ok := sc.getSyncTasks()
410444
if !ok {
@@ -441,7 +475,7 @@ func (sc *syncContext) Sync() {
441475
}) {
442476
if task.isHook() {
443477
// update the hook's result
444-
operationState, message, err := sc.getOperationPhase(task.liveObj)
478+
operationState, message, err := sc.getOperationPhase(ctx, task.liveObj)
445479
if err != nil {
446480
sc.setResourceResult(task, "", common.OperationError, fmt.Sprintf("failed to get resource health: %v", err))
447481
} else {
@@ -1176,8 +1210,17 @@ func (sc *syncContext) hasCRDOfGroupKind(group string, kind string) bool {
11761210
return false
11771211
}
11781212

1179-
// terminate looks for any running jobs/workflow hooks and deletes the resource
1213+
// Deprecated: use TerminateContext instead
11801214
func (sc *syncContext) Terminate() {
1215+
sc.TerminateContext(context.Background())
1216+
}
1217+
1218+
func (sc *syncContext) TerminateContext(ctx context.Context) {
1219+
sc.terminate(ctx)
1220+
}
1221+
1222+
// terminate looks for any running jobs/workflow hooks and deletes the resource
1223+
func (sc *syncContext) terminate(ctx context.Context) {
11811224
terminateSuccessful := true
11821225
sc.log.V(1).Info("terminating")
11831226
tasks, _ := sc.getSyncTasks()
@@ -1190,7 +1233,7 @@ func (sc *syncContext) Terminate() {
11901233
terminateSuccessful = false
11911234
continue
11921235
}
1193-
phase, msg, err := sc.getOperationPhase(task.liveObj)
1236+
phase, msg, err := sc.getOperationPhase(ctx, task.liveObj)
11941237
if err != nil {
11951238
sc.setOperationPhase(common.OperationError, fmt.Sprintf("Failed to get hook health: %v", err))
11961239
return

pkg/utils/kube/ctl.go

+26-5
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ type CleanupFunc func()
2828

2929
type OnKubectlRunFunc func(command string) (CleanupFunc, error)
3030

31+
type OnKubectlRunFuncContext func(ctx context.Context, command string) (CleanupFunc, error)
32+
3133
type Kubectl interface {
3234
ManageResources(config *rest.Config, openAPISchema openapi.Resources) (ResourceOperations, func(), error)
3335
LoadOpenAPISchema(config *rest.Config) (openapi.Resources, *managedfields.GvkParser, error)
@@ -39,13 +41,16 @@ type Kubectl interface {
3941
GetAPIResources(config *rest.Config, preferred bool, resourceFilter ResourceFilter) ([]APIResourceInfo, error)
4042
GetServerVersion(config *rest.Config) (string, error)
4143
NewDynamicClient(config *rest.Config) (dynamic.Interface, error)
44+
// Deprecated: use SetOnKubectlRunContext instead.
4245
SetOnKubectlRun(onKubectlRun OnKubectlRunFunc)
46+
SetOnKubectlRunContext(onKubectlRun OnKubectlRunFuncContext)
4347
}
4448

4549
type KubectlCmd struct {
46-
Log logr.Logger
47-
Tracer tracing.Tracer
48-
OnKubectlRun OnKubectlRunFunc
50+
Log logr.Logger
51+
Tracer tracing.Tracer
52+
OnKubectlRun OnKubectlRunFunc
53+
OnKubectlRunContext OnKubectlRunFuncContext
4954
}
5055

5156
type APIResourceInfo struct {
@@ -292,11 +297,23 @@ func (k *KubectlCmd) ManageResources(config *rest.Config, openAPISchema openapi.
292297
openAPISchema: openAPISchema,
293298
tracer: k.Tracer,
294299
log: k.Log,
295-
onKubectlRun: k.OnKubectlRun,
300+
onKubectlRun: k.OnKubectlRunContext,
296301
}, cleanup, nil
297302
}
298303

304+
// Deprecated: use ManageServerSideDiffDryRunsContext instead.
299305
func ManageServerSideDiffDryRuns(config *rest.Config, openAPISchema openapi.Resources, tracer tracing.Tracer, log logr.Logger, onKubectlRun OnKubectlRunFunc) (diff.KubeApplier, func(), error) {
306+
onKubectlRunContext := func(_ context.Context, command string) (CleanupFunc, error) {
307+
return onKubectlRun(command)
308+
}
309+
return manageServerSideDiffDryRunsContext(config, openAPISchema, tracer, log, onKubectlRunContext)
310+
}
311+
312+
func ManageServerSideDiffDryRunsContext(config *rest.Config, openAPISchema openapi.Resources, tracer tracing.Tracer, log logr.Logger, onKubectlRunContext OnKubectlRunFuncContext) (diff.KubeApplier, func(), error) {
313+
return manageServerSideDiffDryRunsContext(config, openAPISchema, tracer, log, onKubectlRunContext)
314+
}
315+
316+
func manageServerSideDiffDryRunsContext(config *rest.Config, openAPISchema openapi.Resources, tracer tracing.Tracer, log logr.Logger, onKubectlRunContext OnKubectlRunFuncContext) (diff.KubeApplier, func(), error) {
300317
f, err := os.CreateTemp(utils.TempDir, "")
301318
if err != nil {
302319
return nil, nil, fmt.Errorf("failed to generate temp file for kubeconfig: %w", err)
@@ -317,7 +334,7 @@ func ManageServerSideDiffDryRuns(config *rest.Config, openAPISchema openapi.Reso
317334
openAPISchema: openAPISchema,
318335
tracer: tracer,
319336
log: log,
320-
onKubectlRun: onKubectlRun,
337+
onKubectlRun: onKubectlRunContext,
321338
}, cleanup, nil
322339
}
323340

@@ -356,6 +373,10 @@ func (k *KubectlCmd) SetOnKubectlRun(onKubectlRun OnKubectlRunFunc) {
356373
k.OnKubectlRun = onKubectlRun
357374
}
358375

376+
func (k *KubectlCmd) SetOnKubectlRunContext(onKubectlRunContext OnKubectlRunFuncContext) {
377+
k.OnKubectlRunContext = onKubectlRunContext
378+
}
379+
359380
func RunAllAsync(count int, action func(i int) error) error {
360381
g, ctx := errgroup.WithContext(context.Background())
361382
loop:

pkg/utils/kube/kubetest/mock.go

+3
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ func (k *MockKubectlCmd) LoadOpenAPISchema(_ *rest.Config) (openapi.Resources, *
9696
func (k *MockKubectlCmd) SetOnKubectlRun(_ kube.OnKubectlRunFunc) {
9797
}
9898

99+
func (k *MockKubectlCmd) SetOnKubectlRunContext(_ kube.OnKubectlRunFuncContext) {
100+
}
101+
99102
func (k *MockKubectlCmd) ManageResources(_ *rest.Config, _ openapi.Resources) (kube.ResourceOperations, func(), error) {
100103
return &MockResourceOps{}, func() {
101104
}, nil

0 commit comments

Comments
 (0)