Conversation
Implements Execute() which discovers the DAG, validates for cycles, and runs tasks concurrently respecting dependency order. Supports CancelDependents (skip downstream on failure) and CancelAll (cancel context) error strategies, plus MaxConcurrency semaphore limiting.
Adds integration tests mirroring the complete spec example (CreateRG → CreateVNet → CreateSubnet → CreateCluster → RunTests → Teardown), plus tests for transitive dependency access, mid-pipeline failure with CancelDependents, shared tasks across independent subgraphs, and empty graph. Removes unused dead code that triggered go vet copylock warning.
- Replace 9-param notifyDependents with runState struct methods
- Eliminate redundant `failed` map — derive from results directly
- Collapse 3 near-identical early-exit blocks into abort() helper
- Launch dependents outside the mutex (collect-then-launch)
- Remove spurious mu.Lock after wg.Wait (all goroutines done)
- Sort DAGError.Error() output for deterministic error messages
- Add ErrorStrategy.String() for consistency with TaskStatus
- Remove dead test code (unused shared var, dead twoDepTask type)
- Extract buildSpecDAG() helper to reduce copy-paste in tests
- Reduce pre-canceled context test timeout from 2s to 100ms
Add e2e/dag package — a lightweight, type-safe DAG executor using Go generics. Two verbs: Go (returns value) and Run (side-effect), with numbered variants (Go1-Go3, Run1-Run3) for typed dependency injection. Replace the sequential prepareCluster implementation with a DAG version that runs independent tasks (bastion, subnet, kube, identity, firewall, garbage collection, etc.) concurrently after cluster creation completes. Also fix pre-existing fmt.Sprintf %%w usage in config/config.go.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Replace the clusterSetup struct and its 12 one-liner methods with inline closures. Each closure is 1-2 lines binding the cluster local to the real function call. Everything reads top-to-bottom in one place.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Move cluster creation back into the DAG so all tasks use typed dependency injection. Where function signatures match exactly (getOrCreateBastion, getClusterKubeletIdentity, collectGarbageVMSS), pass them directly without closures.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Extract named helpers for conditional tasks (configureFirewall, configureNetworkIsolation, setupACR, ensureDebugDaemonsets) and update function signatures (getClusterSubnetID, getClusterKubeClient, extractClusterParameters) to accept *ManagedCluster directly so they can be passed to the DAG without closures. prepareCluster now reads as a pure declarative DAG with no inline anonymous functions.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- addFirewallRules/addNetworkIsolatedSettings: derive location from cluster instead of taking it as a param. Use conditional DAG registration instead of runtime checks.
- addPrivateAzureContainerRegistry: derive resourceGroupName from cluster.Location internally.
- ensureMaintenanceConfiguration: replaces getOrCreate wrapper, returns error only (value was never used).
- getClusterSubnetID/getClusterKubeClient: take *ManagedCluster instead of extracted strings.
- GoN/RunN: accept optional extra ...Dep barrier deps so typed dep tasks can also wait on untyped barriers (e.g. ACR effects).

Remaining helpers (newClusterTask, addACRTask, addDebugDaemonsets) exist because they genuinely need parameter binding that can't be absorbed into the underlying functions.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Pull cluster creation out of the DAG. Use generic bind/bindRun helpers to pass functions directly for tasks that only need cluster. Remaining factory helpers (addACR, ensureDebugDaemonsets, extractClusterParams) bind cluster to functions that also receive DAG-provided values (kube, identity) — these can't be eliminated without putting cluster back in the DAG.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
e2e/cluster.go
```diff
 if errors.As(err, &azErr) && azErr.StatusCode == 404 {
-	return createNewMaintenanceConfiguration(ctx, cluster)
+	_, err = createNewMaintenanceConfiguration(ctx, cluster)
+	return err
```
In the 404 path, ensureMaintenanceConfiguration returns the raw error from createNewMaintenanceConfiguration without adding cluster context, while the non-404 path wraps errors with the cluster name. Wrap the 404/create error similarly so failures are attributable to the cluster being prepared.
```diff
-	return err
+	if err != nil {
+		return fmt.Errorf("failed to create maintenance configuration 'default' for cluster %q: %w", *cluster.Name, err)
+	}
+	return nil
```
```go
func (g *Group) recordError(err error) {
	g.mu.Lock()
	g.errs = append(g.errs, err)
	g.mu.Unlock()
	g.cancel()
}
```
Group.recordError records every non-nil error, including context.Canceled/context.DeadlineExceeded from tasks that are only exiting because another task failed and canceled the group. In failure cases this can flood DAGError with cancellation noise and obscure the root cause. Consider suppressing context cancellation errors once the group has already observed a “real” failure (or only recording the first error).
```go
// launch runs fn in a new goroutine after all deps complete.
// If any dep failed or ctx is cancelled, onSkip is called instead of fn.
func (g *Group) launch(deps []Dep, fn func(), onSkip func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()

		for _, d := range deps {
			d.wait()
		}

		for _, d := range deps {
			if d.failed() {
				onSkip()
				return
			}
		}

		if g.ctx.Err() != nil {
			onSkip()
			return
		}

		fn()
	}()
```
The executor blocks on Dep.wait() with no cycle detection. If a cycle is accidentally introduced (e.g., via wiring mistakes), Wait() will hang indefinitely. Consider adding a lightweight cycle check (even just in debug/tests), or documenting this sharp edge prominently so future uses don’t create non-obvious deadlocks.
```diff
 )

-func getClusterKubeClient(ctx context.Context, resourceGroupName, clusterName string) (*Kubeclient, error) {
+func getClusterKubeClient(ctx context.Context, cluster *armcontainerservice.ManagedCluster) (*Kubeclient, error) {
```
getClusterKubeClient now dereferences cluster.Location and cluster.Name without validating the input. Since the signature changed from (resourceGroupName, clusterName string) to (*ManagedCluster), it would be safer to add nil checks for cluster, cluster.Location, and cluster.Name and return a descriptive error instead of panicking.
```go
func getClusterKubeClient(ctx context.Context, cluster *armcontainerservice.ManagedCluster) (*Kubeclient, error) {
	if cluster == nil {
		return nil, fmt.Errorf("managed cluster is nil")
	}
	if cluster.Location == nil || *cluster.Location == "" {
		return nil, fmt.Errorf("managed cluster location is nil or empty")
	}
	if cluster.Name == nil || *cluster.Name == "" {
		return nil, fmt.Errorf("managed cluster name is nil or empty")
	}
```
```diff
 }

-func getClusterSubnetID(ctx context.Context, mcResourceGroupName string) (string, error) {
+func getClusterSubnetID(ctx context.Context, cluster *armcontainerservice.ManagedCluster) (string, error) {
```
getClusterSubnetID now relies on cluster.Properties.NodeResourceGroup and dereferences it directly. With the new *ManagedCluster signature, add guards for cluster == nil, cluster.Properties == nil, and cluster.Properties.NodeResourceGroup == nil to avoid panics and return a clearer error when cluster metadata is incomplete.
```go
func getClusterSubnetID(ctx context.Context, cluster *armcontainerservice.ManagedCluster) (string, error) {
	if cluster == nil {
		return "", fmt.Errorf("managed cluster is nil")
	}
	if cluster.Properties == nil {
		return "", fmt.Errorf("managed cluster properties are nil")
	}
	if cluster.Properties.NodeResourceGroup == nil || *cluster.Properties.NodeResourceGroup == "" {
		return "", fmt.Errorf("managed cluster NodeResourceGroup is nil or empty")
	}
```
```diff
+if !isNetworkIsolated {
+	dag.Run(g, func(ctx context.Context) error { return addFirewallRules(ctx, cluster) })
+}
 if isNetworkIsolated {
-	if err := addNetworkIsolatedSettings(ctx, cluster, *cluster.Location); err != nil {
-		return nil, fmt.Errorf("add network isolated settings: %w", err)
-	}
-}
-if !isNetworkIsolated { // network isolated cluster blocks all egress via NSG
-	if err := addFirewallRules(ctx, cluster, *cluster.Location); err != nil {
-		return nil, fmt.Errorf("add firewall rules: %w", err)
-	}
+	dag.Run(g, func(ctx context.Context) error { return addNetworkIsolatedSettings(ctx, cluster) })
 }
+needACR := isNetworkIsolated || attachPrivateAcr
+acrNonAnon := dag.Run2(g, kube, identity, addACR(cluster, needACR, true))
+acrAnon := dag.Run2(g, kube, identity, addACR(cluster, needACR, false))
+dag.Run1(g, kube, ensureDebugDaemonsets(cluster, isNetworkIsolated), acrNonAnon, acrAnon)
+extract := dag.Go1(g, kube, extractClusterParams(cluster))
```
prepareCluster no longer enforces the previous ordering where network changes (firewall rules / network-isolated subnet+NSG updates) completed before EnsureDebugDaemonsets runs. Because the daemonset creation triggers pod scheduling/image pulls, running it concurrently with route table/NSG updates can introduce e2e flakiness. Consider capturing the firewall/NSG task(s) as Effect values and adding them as explicit dependencies to the Run1(... ensureDebugDaemonsets ...) task (and possibly any other k8s API tasks that assume steady node egress).
- Fix package doc to reference correct API names (Go/Run not Spawn/Do)
- Fix section comments in tests to match current API names
- Wait() now surfaces ctx.Err() when parent context is cancelled
- TestParentContextCancelled asserts error is context.Canceled
- ensureMaintenanceConfiguration 404 path wraps error with cluster context
- Firewall/NSG tasks are now dependencies of ensureDebugDaemonsets
Add comprehensive test coverage including:

- Go1-Go3/Run1-Run3 success and skip-on-dep-failure paths
- Diamond topology, transitive skip, cancellation noise
- Parent context cancellation and deadline exceeded
- DAGError formatting (single and multiple errors)
- Effect as dependency, empty group, Result.Get/MustGet
- Cycle behavior documentation (typed API compile-time safety, untyped API deadlock, self-dependency deadlock)

Clean up test comments to keep tests concise.
```go
func getClusterSubnetID(ctx context.Context, cluster *armcontainerservice.ManagedCluster) (string, error) {
	mcResourceGroupName := *cluster.Properties.NodeResourceGroup
	pager := config.Azure.VNet.NewListPager(mcResourceGroupName, nil)
```
With the new *ManagedCluster signature, this function dereferences cluster.Properties.NodeResourceGroup without checking cluster/Properties/NodeResourceGroup for nil. This can panic; add nil checks and return an error when required fields are missing.
e2e/dag/dag.go
```go
// - Inside Then/ThenDo callbacks (the scheduler guarantees deps succeeded)
// - Inside Spawn/Do callbacks when the Result is listed as a dep
// - After [Group.Wait] returned nil
```
The comment on Result.MustGet references APIs that don't exist in this package ("Then/ThenDo", "Spawn/Do"). This is misleading for users of dag. Update the comment to reference the actual APIs here (Go/Run/Go1–Go3/Run1–Run3) and when MustGet is safe.
```diff
-// - Inside Then/ThenDo callbacks (the scheduler guarantees deps succeeded)
-// - Inside Spawn/Do callbacks when the Result is listed as a dep
-// - After [Group.Wait] returned nil
+// - Inside task functions started via [Go] or [Run] when this Result is
+//   listed as a dependency (the scheduler only starts the task after all
+//   deps have completed successfully).
+// - Inside task functions started via [Go1], [Go2], [Go3], [Run1], [Run2],
+//   or [Run3]; these functions are never invoked if their dependencies fail.
+// - After [Group.Wait] has returned nil (all tasks succeeded).
```
```go
// enforces correct wiring. When untyped dependencies are used (Go/Run
// with variadic deps), values are accessed via [Result.MustGet] inside
// the closure.
//
```
Package docs state tasks form a directed acyclic graph, but the untyped Go/Run API can create cycles (or never-completing deps) that will deadlock Wait() (also covered by tests). Please document this limitation prominently in the package comment and/or add runtime cycle detection for the untyped API.
```go
//
// Note: The executor conceptually treats tasks as forming a DAG, but the
// untyped Go/Run variants do not perform any static or runtime cycle
// detection. It is the caller's responsibility to ensure that dependency
// lists are acyclic and that all dependencies eventually complete. If a
// cycle is introduced, or a dependency never finishes, the affected tasks
// will never start or complete and [Group.Wait] may block indefinitely.
//
```
```diff
@@ -152,7 +146,7 @@ func getClusterKubeletIdentity(cluster *armcontainerservice.ManagedCluster) (*ar
 	return kubeletIdentity, nil
```
getClusterKubeletIdentity takes a ctx parameter but does not use it. Go compiles fine with unused function parameters (unlike unused locals or imports), but linters will flag this and it is misleading to callers. Either remove the ctx parameter from the signature or use it (e.g., for logging/telemetry) consistently across callers.
- Add panic recovery in launch() so panics in task functions are captured as errors instead of crashing the process.
- Unify onSkip/onPanic into a single onFail(error) callback.
- Update MustGet docs to reference actual API names.
- Document cycle-deadlock limitation in package comment.
- Add tests for panic in Go, Run, and downstream skip after panic.
What this PR does / why we need it:

Adds a new `e2e/dag` package that provides a lightweight, type-safe DAG execution library using closures and generics, then refactors `prepareCluster` to use it for concurrent cluster setup.