Skip to content

revisit timeout handling #254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Mar 25, 2025
2 changes: 1 addition & 1 deletion clm/cmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func newApplyCmd() *cobra.Command {

for {
release.State = component.StateProcessing
ok, err := reconciler.Apply(context.TODO(), &release.Inventory, objects, namespace, ownerId, release.Revision)
ok, err := reconciler.Apply(context.TODO(), &release.Inventory, objects, namespace, ownerId, fmt.Sprintf("%d", release.Revision))
if err != nil {
if !isEphmeralError(err) || errCount >= maxErrCount {
return err
Expand Down
17 changes: 8 additions & 9 deletions internal/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,15 @@ type Backoff struct {
func NewBackoff(maxDelay time.Duration) *Backoff {
return &Backoff{
activities: make(map[any]any),
// resulting per-item backoff is the maximum of a 200-times-50ms-then-maxDelay per-item limiter,
// and an overall 5-per-second-burst-20 bucket limiter;
// as a consequence, we have up to
// - up to 20 almost immediate retries
// - then a phase of 5 guaranteed retries per second (could be more if burst capacity is refilled
// because of the duration of the reconcile logic execution itself)
// - finally (after 200 iterations) slow retries at the rate given by maxDelay
// resulting per-item backoff is the maximum of a 120-times-100ms-then-maxDelay per-item limiter,
// and an overall 1-per-second-burst-50 bucket limiter;
// as a consequence, we have
// - a phase of 10 retries per second for the first 5 seconds
// - then a phase of 1 retry per second for the next 60 seconds
// - finally slow retries at the rate given by maxDelay
limiter: workqueue.NewMaxOfRateLimiter(
workqueue.NewItemFastSlowRateLimiter(50*time.Millisecond, maxDelay, 200),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(5), 20)},
workqueue.NewItemFastSlowRateLimiter(100*time.Millisecond, maxDelay, 120),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1), 50)},
),
}
}
Expand Down
147 changes: 92 additions & 55 deletions pkg/component/reconciler.go

Large diffs are not rendered by default.

37 changes: 32 additions & 5 deletions pkg/component/reference.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type ConfigMapReference struct {
}

func (r *ConfigMapReference) load(ctx context.Context, clnt client.Client, namespace string, ignoreNotFound bool) error {
// TODO: shouldn't we panic if already loaded?
configMap := &corev1.ConfigMap{}
if err := clnt.Get(ctx, apitypes.NamespacedName{Namespace: namespace, Name: r.Name}, configMap); err != nil {
if apierrors.IsNotFound(err) {
Expand All @@ -67,7 +68,7 @@ func (r *ConfigMapReference) load(ctx context.Context, clnt client.Client, names

func (r *ConfigMapReference) digest() string {
if !r.loaded {
// TODO: shouldn't we panic here?
// note: we can't panic here because this might be called in case of not-found situations
return ""
}
return calculateDigest(r.data)
Expand Down Expand Up @@ -97,6 +98,7 @@ type ConfigMapKeyReference struct {
}

func (r *ConfigMapKeyReference) load(ctx context.Context, clnt client.Client, namespace string, ignoreNotFound bool, fallbackKeys ...string) error {
// TODO: shouldn't we panic if already loaded?
configMap := &corev1.ConfigMap{}
if err := clnt.Get(ctx, apitypes.NamespacedName{Namespace: namespace, Name: r.Name}, configMap); err != nil {
if apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -130,7 +132,7 @@ func (r *ConfigMapKeyReference) load(ctx context.Context, clnt client.Client, na

func (r *ConfigMapKeyReference) digest() string {
if !r.loaded {
// TODO: shouldn't we panic here?
// note: we can't panic here because this might be called in case of not-found situations
return ""
}
return sha256hex([]byte(r.value))
Expand All @@ -157,6 +159,7 @@ type SecretReference struct {
}

func (r *SecretReference) load(ctx context.Context, clnt client.Client, namespace string, ignoreNotFound bool) error {
// TODO: shouldn't we panic if already loaded?
secret := &corev1.Secret{}
if err := clnt.Get(ctx, apitypes.NamespacedName{Namespace: namespace, Name: r.Name}, secret); err != nil {
if apierrors.IsNotFound(err) {
Expand All @@ -175,7 +178,7 @@ func (r *SecretReference) load(ctx context.Context, clnt client.Client, namespac

func (r *SecretReference) digest() string {
if !r.loaded {
// TODO: shouldn't we panic here?
// note: we can't panic here because this might be called in case of not-found situations
return ""
}
return calculateDigest(r.data)
Expand Down Expand Up @@ -205,6 +208,7 @@ type SecretKeyReference struct {
}

func (r *SecretKeyReference) load(ctx context.Context, clnt client.Client, namespace string, ignoreNotFound bool, fallbackKeys ...string) error {
// TODO: shouldn't we panic if already loaded?
secret := &corev1.Secret{}
if err := clnt.Get(ctx, apitypes.NamespacedName{Namespace: namespace, Name: r.Name}, secret); err != nil {
if apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -238,7 +242,7 @@ func (r *SecretKeyReference) load(ctx context.Context, clnt client.Client, names

func (r *SecretKeyReference) digest() string {
if !r.loaded {
// TODO: shouldn't we panic here?
// note: we can't panic here because this might be called in case of not-found situations
return ""
}
return sha256hex(r.value)
Expand All @@ -253,16 +257,31 @@ func (r *SecretKeyReference) Value() []byte {
return r.value
}

// Reference is a generic reference. All occurrences in the component's spec of types implementing this interface
// are automatically resolved by the framework during reconcile by calling the Load() method. The digests returned
// by the Digest() methods are incorporated into the component's digest. Nil references (including typed nil
// pointers) are skipped by the framework, and an error returned by Load() is propagated to the caller.
type Reference[T Component] interface {
// Load loads the referenced content. The framework calls this at most once, so it is ok if the implementation
// errors out or even panics if invoked more than once. The implementation may skip loading in certain cases,
// for example if deletion is ongoing.
Load(ctx context.Context, clnt client.Client, component T) error
// Digest returns a digest of the referenced content. This digest is incorporated into the component digest which
// is passed to generators and hooks (per context) and which decides when the processing timer is reset,
// and therefore influences the timeout behavior of the component. In case the reference is not loaded,
// the implementation should return the empty string.
Digest() string
}

func resolveReferences[T Component](ctx context.Context, clnt client.Client, component T) (string, error) {
digestData := make(map[string]any)
spec := getSpec(component)
digestData["generation"] = component.GetGeneration()
digestData["annotations"] = component.GetAnnotations()
// TODO: including spec into the digest is actually not required (since generation is included)
digestData["spec"] = spec
if err := walk.Walk(spec, func(x any, path []string, tag reflect.StructTag) error {
// note: this must() is ok because marshalling []string should always work
rawPath := must(json.Marshal(path))
// TODO: allow arbitrary loadable types (with an interface LoadableReference or similar)
switch r := x.(type) {
case *ConfigMapReference:
if r == nil {
Expand Down Expand Up @@ -308,6 +327,14 @@ func resolveReferences[T Component](ctx context.Context, clnt client.Client, com
return err
}
digestData["refs:"+string(rawPath)] = r.digest()
case Reference[T]:
if v := reflect.ValueOf(r); r == nil || v.Kind() == reflect.Pointer && v.IsNil() {
return nil
}
if err := r.Load(ctx, clnt, component); err != nil {
return err
}
digestData["refs:"+string(rawPath)] = r.Digest()
}
return nil
}); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/component/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newReconcileTarget[T Component](reconcilerName string, reconcilerId string,
}
}

func (t *reconcileTarget[T]) Apply(ctx context.Context, component T, componentDigest string) (bool, string, error) {
func (t *reconcileTarget[T]) Apply(ctx context.Context, component T, componentDigest string) (bool, error) {
//log := log.FromContext(ctx)
namespace := ""
name := ""
Expand Down Expand Up @@ -63,12 +63,12 @@ func (t *reconcileTarget[T]) Apply(ctx context.Context, component T, componentDi
WithComponentDigest(componentDigest)
objects, err := t.resourceGenerator.Generate(generateCtx, namespace, name, component.GetSpec())
if err != nil {
return false, "", errors.Wrap(err, "error rendering manifests")
return false, errors.Wrap(err, "error rendering manifests")
}

ok, err := t.reconciler.Apply(ctx, &status.Inventory, objects, namespace, ownerId, component.GetGeneration())
ok, err := t.reconciler.Apply(ctx, &status.Inventory, objects, namespace, ownerId, componentDigest)

return ok, calculateDigest(componentDigest, objects), err
return ok, err
}

func (t *reconcileTarget[T]) Delete(ctx context.Context, component T) (bool, error) {
Expand Down
10 changes: 6 additions & 4 deletions pkg/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func NewReconciler(name string, clnt cluster.Client, options ReconcilerOptions)
// Objects which are instances of namespaced types will be placed into the namespace passed to Apply(), if they have no namespace defined in their manifest.
// An update of an existing object will be performed if it is considered to be out of sync; that means:
// - the object's manifest has changed, and the effective reconcile policy is ReconcilePolicyOnObjectChange or ReconcilePolicyOnObjectOrComponentChange or
// - the specified component revision has changed and the effective reconcile policy is ReconcilePolicyOnObjectOrComponentChange or
// - the specified component has changed and the effective reconcile policy is ReconcilePolicyOnObjectOrComponentChange or
// - periodically after forceReapplyPeriod.
//
// The update itself will be done as follows:
Expand All @@ -242,10 +242,12 @@ func NewReconciler(name string, clnt cluster.Client, options ReconcilerOptions)
// This method will change the passed inventory (add or remove elements, change elements). If Apply() returns true, then all objects are successfully reconciled;
// otherwise, if it returns false, the caller should re-call it periodically, until it returns true. In any case, the passed inventory should match the state of the
// inventory after the previous invocation of Apply(); usually, the caller saves the inventory after calling Apply(), and loads it before calling Apply().
// The namespace and ownerId arguments should not be changed across subsequent invocations of Apply(); the componentRevision should be incremented only.
// The namespace and ownerId arguments should not be changed across subsequent invocations of Apply(); the supplied componentDigest is included into the
// digest of dependent objects if the effective reconcile policy is ReconcilePolicyOnObjectOrComponentChange (such that in this case, a change of componentDigest
// triggers an immediate reconciliation of all dependent objects).
//
// Also note: it is absolutely crucial that this method returns (true, nil) immediately (on the first call) if everything is already in the right state.
func (r *Reconciler) Apply(ctx context.Context, inventory *[]*InventoryItem, objects []client.Object, namespace string, ownerId string, componentRevision int64) (bool, error) {
func (r *Reconciler) Apply(ctx context.Context, inventory *[]*InventoryItem, objects []client.Object, namespace string, ownerId string, componentDigest string) (bool, error) {
var err error
log := log.FromContext(ctx)

Expand Down Expand Up @@ -417,7 +419,7 @@ func (r *Reconciler) Apply(ctx context.Context, inventory *[]*InventoryItem, obj
// calculate object digest
// note: if the effective reconcile policy of an object changes, it will always be reconciled at least one more time;
// this is in particular the case if the policy changes from or to ReconcilePolicyOnce.
digest, err := calculateObjectDigest(object, componentRevision, getReconcilePolicy(object))
digest, err := calculateObjectDigest(object, componentDigest, getReconcilePolicy(object))
if err != nil {
return false, errors.Wrapf(err, "error calculating digest for object %s", types.ObjectKeyToString(object))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const (
// Reconcile the dependent object if its manifest, as produced by the generator, changes.
ReconcilePolicyOnObjectChange ReconcilePolicy = "OnObjectChange"
// Reconcile the dependent object if its manifest, as produced by the generator, changes, or if the owning
// component changes (identified by a change of its metadata.generation).
// component changes (identified by a change of its digest, including references).
ReconcilePolicyOnObjectOrComponentChange ReconcilePolicy = "OnObjectOrComponentChange"
// Reconcile the dependent object only once; afterwards it will never be touched again by the reconciler.
ReconcilePolicyOnce ReconcilePolicy = "Once"
Expand Down
5 changes: 3 additions & 2 deletions pkg/reconciler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func checkRange(x int, min int, max int) error {
return nil
}

func calculateObjectDigest(obj client.Object, revision int64, reconcilePolicy ReconcilePolicy) (string, error) {
func calculateObjectDigest(obj client.Object, componentDigest string, reconcilePolicy ReconcilePolicy) (string, error) {
if reconcilePolicy == ReconcilePolicyOnce {
return "__once__", nil
}
Expand All @@ -79,7 +79,8 @@ func calculateObjectDigest(obj client.Object, revision int64, reconcilePolicy Re
digest := sha256hex(raw)

if reconcilePolicy == ReconcilePolicyOnObjectOrComponentChange {
digest = fmt.Sprintf("%s@%d", digest, revision)
// TODO: this becomes rather long; should we hash it once more?
digest = fmt.Sprintf("%s@%s", digest, componentDigest)
}

return digest, nil
Expand Down