diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go index d56de6a1b..c78600196 100644 --- a/pkg/sync/sync_context.go +++ b/pkg/sync/sync_context.go @@ -629,87 +629,20 @@ func (sc *syncContext) containsResource(resource reconciledResource) bool { // generates the list of sync tasks we will be performing during this sync. func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) { - resourceTasks := syncTasks{} - successful = true - - for k, resource := range sc.resources { - if !sc.containsResource(resource) { - sc.log.WithValues("group", k.Group, "kind", k.Kind, "name", k.Name).V(1).Info("Skipping") - continue - } - - obj := obj(resource.Target, resource.Live) - - // this creates garbage tasks - if hook.IsHook(obj) { - sc.log.WithValues("group", obj.GroupVersionKind().Group, "kind", obj.GetKind(), "namespace", obj.GetNamespace(), "name", obj.GetName()).V(1).Info("Skipping hook") - continue - } - - for _, phase := range syncPhases(obj) { - resourceTasks = append(resourceTasks, &syncTask{phase: phase, targetObj: resource.Target, liveObj: resource.Live}) - } - } - - sc.log.WithValues("resourceTasks", resourceTasks).V(1).Info("Tasks from managed resources") - - hookTasks := syncTasks{} - if !sc.skipHooks { - for _, obj := range sc.hooks { - for _, phase := range syncPhases(obj) { - // Hook resources names are deterministic, whether they are defined by the user (metadata.name), - // or formulated at the time of the operation (metadata.generateName). If user specifies - // metadata.generateName, then we will generate a formulated metadata.name before submission. - targetObj := obj.DeepCopy() - if targetObj.GetName() == "" { - var syncRevision string - if len(sc.revision) >= 8 { - syncRevision = sc.revision[0:7] - } else { - syncRevision = sc.revision - } - postfix := strings.ToLower(fmt.Sprintf("%s-%s-%d", syncRevision, phase, sc.startedAt.UTC().Unix())) - generateName := obj.GetGenerateName() - targetObj.SetName(fmt.Sprintf("%s%s", generateName, postfix)) - } - hookTasks = append(hookTasks, &syncTask{phase: phase, targetObj: targetObj}) - } - } - } - - sc.log.WithValues("hookTasks", hookTasks).V(1).Info("tasks from hooks") + successful = true + resourceTasks := sc.createResourceTasks() + hookTasks := sc.createHookTasks() tasks := resourceTasks tasks = append(tasks, hookTasks...) // enrich target objects with the namespace - for _, task := range tasks { - if task.targetObj == nil { - continue - } - - if task.targetObj.GetNamespace() == "" { - // If target object's namespace is empty, we set namespace in the object. We do - // this even though it might be a cluster-scoped resource. This prevents any - // possibility of the resource from unintentionally becoming created in the - // namespace during the `kubectl apply` - task.targetObj = task.targetObj.DeepCopy() - task.targetObj.SetNamespace(sc.namespace) - } - } - - if sc.syncNamespace != nil && sc.namespace != "" { - tasks = sc.autoCreateNamespace(tasks) - } + sc.enrichTargetObjectsWithNamespace(tasks) + tasks = sc.addAutoCreateNamespaceTask(tasks) // enrich task with live obj - for _, task := range tasks { - if task.targetObj == nil || task.liveObj != nil { - continue - } - task.liveObj = sc.liveObj(task.targetObj) - } + sc.enrichTasksWithLiveObjects(tasks) isRetryable := func(err error) bool { return apierr.IsUnauthorized(err) @@ -759,23 +692,61 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) { } // for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order) - pruneTasks := make(map[int][]*syncTask) + pruneTasksByWave := sc.groupPruneTasksIntoWaves(tasks) + + // reorder waves for pruning tasks using symmetric swap on prune waves + // waves to swap + sc.reorderPruneTaskWaves(pruneTasksByWave) + + // for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1 + // to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave + sc.reorderPruneLastTaskWaves(tasks) + tasks.Sort() + sc.enrichTasksWithResult(tasks) + + return tasks, successful +} + +func (sc *syncContext) enrichTasksWithResult(tasks syncTasks) { for _, task := range tasks { - if task.isPrune() { - pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task) + result, ok := sc.syncRes[task.resultKey()] + if ok { + task.syncStatus = result.Status + task.operationState = result.HookPhase + task.message = result.Message + } + } +} + +func (sc *syncContext) reorderPruneLastTaskWaves(tasks syncTasks) { + syncPhaseLastWave := 0 + for _, task := range tasks { + if task.phase == common.SyncPhaseSync { + if task.wave() > syncPhaseLastWave { + syncPhaseLastWave = task.wave() + } } } + syncPhaseLastWave = syncPhaseLastWave + 1 + for _, task := range tasks { + if task.isPrune() && + (sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) { + task.waveOverride = &syncPhaseLastWave + } + } +} + +func (*syncContext) reorderPruneTaskWaves(pruneTasks map[int][]*syncTask) { var uniquePruneWaves []int for k := range pruneTasks { uniquePruneWaves = append(uniquePruneWaves, k) } sort.Ints(uniquePruneWaves) - // reorder waves for pruning tasks using symmetric swap on prune waves n := len(uniquePruneWaves) for i := 0; i < n/2; i++ { - // waves to swap + startWave := uniquePruneWaves[i] endWave := uniquePruneWaves[n-1-i] @@ -787,39 +758,103 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) { task.waveOverride = &startWave } } +} - // for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1 - // to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave - syncPhaseLastWave := 0 +func (*syncContext) groupPruneTasksIntoWaves(tasks syncTasks) map[int][]*syncTask { + pruneTasks := make(map[int][]*syncTask) for _, task := range tasks { - if task.phase == common.SyncPhaseSync { - if task.wave() > syncPhaseLastWave { - syncPhaseLastWave = task.wave() - } + if task.isPrune() { + pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task) } } - syncPhaseLastWave = syncPhaseLastWave + 1 + return pruneTasks +} +func (sc *syncContext) enrichTasksWithLiveObjects(tasks syncTasks) { for _, task := range tasks { - if task.isPrune() && - (sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) { - task.waveOverride = &syncPhaseLastWave + if task.targetObj == nil || task.liveObj != nil { + continue } + task.liveObj = sc.liveObj(task.targetObj) } +} - tasks.Sort() +func (sc *syncContext) addAutoCreateNamespaceTask(tasks syncTasks) syncTasks { + if sc.syncNamespace != nil && sc.namespace != "" { + tasks = sc.autoCreateNamespace(tasks) + } + return tasks +} - // finally enrich tasks with the result +func (sc *syncContext) enrichTargetObjectsWithNamespace(tasks syncTasks) { for _, task := range tasks { - result, ok := sc.syncRes[task.resultKey()] - if ok { - task.syncStatus = result.Status - task.operationState = result.HookPhase - task.message = result.Message + if task.targetObj == nil { + continue + } + + if task.targetObj.GetNamespace() == "" { + // If target object's namespace is empty, we set namespace in the object. We do + // this even though it might be a cluster-scoped resource. This prevents any + // possibility of the resource from unintentionally becoming created in the + // namespace during the `kubectl apply` + task.targetObj = task.targetObj.DeepCopy() + task.targetObj.SetNamespace(sc.namespace) } } +} - return tasks, successful +func (sc *syncContext) createHookTasks() syncTasks { + hookTasks := syncTasks{} + if !sc.skipHooks { + for _, obj := range sc.hooks { + for _, phase := range syncPhases(obj) { + // Hook resources names are deterministic, whether they are defined by the user (metadata.name), + // or formulated at the time of the operation (metadata.generateName). If user specifies + // metadata.generateName, then we will generate a formulated metadata.name before submission. + targetObj := obj.DeepCopy() + if targetObj.GetName() == "" { + var syncRevision string + if len(sc.revision) >= 8 { + syncRevision = sc.revision[0:7] + } else { + syncRevision = sc.revision + } + postfix := strings.ToLower(fmt.Sprintf("%s-%s-%d", syncRevision, phase, sc.startedAt.UTC().Unix())) + generateName := obj.GetGenerateName() + targetObj.SetName(fmt.Sprintf("%s%s", generateName, postfix)) + } + + hookTasks = append(hookTasks, &syncTask{phase: phase, targetObj: targetObj}) + } + } + } + + sc.log.WithValues("hookTasks", hookTasks).V(1).Info("tasks from hooks") + return hookTasks +} + +func (sc *syncContext) createResourceTasks() syncTasks { + resourceTasks := syncTasks{} + for k, resource := range sc.resources { + if !sc.containsResource(resource) { + sc.log.WithValues("group", k.Group, "kind", k.Kind, "name", k.Name).V(1).Info("Skipping") + continue + } + + obj := obj(resource.Target, resource.Live) + + // this creates garbage tasks + if hook.IsHook(obj) { + sc.log.WithValues("group", obj.GroupVersionKind().Group, "kind", obj.GetKind(), "namespace", obj.GetNamespace(), "name", obj.GetName()).V(1).Info("Skipping hook") + continue + } + + for _, phase := range syncPhases(obj) { + resourceTasks = append(resourceTasks, &syncTask{phase: phase, targetObj: resource.Target, liveObj: resource.Live}) + } + } + sc.log.WithValues("resourceTasks", resourceTasks).V(1).Info("Tasks from managed resources") + return resourceTasks } func (sc *syncContext) autoCreateNamespace(tasks syncTasks) syncTasks {