diff --git a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java index d621d84c..3fab2131 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java @@ -10,7 +10,6 @@ import hudson.Extension; import hudson.ExtensionList; import hudson.XmlFile; -import hudson.init.InitMilestone; import hudson.init.Terminator; import hudson.model.listeners.ItemListener; import hudson.remoting.SingleLaneExecutorService; @@ -32,7 +31,6 @@ import java.util.logging.Logger; import org.kohsuke.accmod.Restricted; -import org.kohsuke.accmod.restrictions.Beta; import org.kohsuke.accmod.restrictions.DoNotUse; /** @@ -46,8 +44,6 @@ public class FlowExecutionList implements Iterable { private final SingleLaneExecutorService executor = new SingleLaneExecutorService(Timer.get()); private XmlFile configFile; - private transient volatile boolean resumptionComplete; - public FlowExecutionList() { load(); } @@ -164,15 +160,11 @@ public static FlowExecutionList get() { } /** - * Returns true if all executions that were present in this {@link FlowExecutionList} have been loaded and resumed. - * - * This takes place slightly after {@link InitMilestone#COMPLETED} is reached during Jenkins startup. - * - * Useful to avoid resuming Pipelines in contexts that may lead to deadlock. + * @deprecated Only exists for binary compatibility. */ - @Restricted(Beta.class) + @Deprecated public boolean isResumptionComplete() { - return resumptionComplete; + return false; } /** @@ -187,8 +179,25 @@ public void onLoaded() { for (final FlowExecution e : list) { // The call to FlowExecutionOwner.get in the implementation of iterator() is sufficent to load the Pipeline. LOGGER.log(Level.FINE, "Eagerly loaded {0}", e); + Futures.addCallback(e.getCurrentExecutions(false), new FutureCallback>() { + @Override + public void onSuccess(@NonNull List result) { + LOGGER.log(Level.FINE, "Will resume {0}", result); + for (StepExecution se : result) { + se.onResume(); + } + } + + @Override + public void onFailure(@NonNull Throwable t) { + if (t instanceof CancellationException) { + LOGGER.log(Level.FINE, "Cancelled load of " + e, t); + } else { + LOGGER.log(Level.WARNING, "Failed to load " + e, t); + } + } + }, MoreExecutors.directExecutor()); } - list.resumptionComplete = true; } } @@ -243,57 +252,4 @@ public void onFailure(@NonNull Throwable t) { executor.shutdown(); executor.awaitTermination(1, TimeUnit.MINUTES); } - - /** - * Whenever a Pipeline resumes, resume all incomplete steps in its {@link FlowExecution}. - * - * Called by {@code WorkflowRun.onLoad}, so guaranteed to run if a Pipeline resumes regardless of its presence in - * {@link FlowExecutionList}. - */ - @Extension - public static class ResumeStepExecutionListener extends FlowExecutionListener { - @Override - public void onResumed(@NonNull FlowExecution e) { - Futures.addCallback(e.getCurrentExecutions(false), new FutureCallback>() { - @Override - public void onSuccess(@NonNull List result) { - if (e.isComplete()) { - // WorkflowRun.onLoad will not fire onResumed if the serialized execution was already - // complete when loaded, but right now (at least for workflow-cps), the execution resumes - // asynchronously before WorkflowRun.onLoad completes, so it is possible that the execution - // finishes before onResumed gets called. - // That said, there is nothing to prevent the execution from completing right after we check - // isComplete. If we want to fully prevent that, we would need to delay actual execution - // resumption until WorkflowRun.onLoad completes or add some form of synchronization. - return; - } - FlowExecutionList list = FlowExecutionList.get(); - FlowExecutionOwner owner = e.getOwner(); - if (!list.runningTasks.contains(owner)) { - LOGGER.log(Level.WARNING, "Resuming {0}, which is missing from FlowExecutionList ({1}), so registering it now.", - new Object[] {owner, list.runningTasks.getView()}); - list.register(owner); - } - LOGGER.log(Level.FINE, "Will resume {0}", result); - for (StepExecution se : result) { - try { - se.onResume(); - } catch (Throwable x) { - se.getContext().onFailure(x); - } - } - } - - @Override - public void onFailure(@NonNull Throwable t) { - if (t instanceof CancellationException) { - LOGGER.log(Level.FINE, "Cancelled load of " + e, t); - } else { - LOGGER.log(Level.WARNING, "Failed to load " + e, t); - } - } - - }, Timer.get()); // We always hold RunMap and WorkflowRun locks here, so we resume steps on a different thread to avoid potential deadlocks. See JENKINS-67351. - } - } } diff --git a/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java b/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java index 8cf59f30..733a3859 100644 --- a/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java +++ b/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java @@ -24,35 +24,17 @@ package org.jenkinsci.plugins.workflow.flow; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertNotNull; -import hudson.AbortException; import hudson.model.ParametersAction; import hudson.model.ParametersDefinitionProperty; -import hudson.model.Result; import hudson.model.StringParameterDefinition; import hudson.model.StringParameterValue; -import hudson.model.TaskListener; import hudson.model.queue.QueueTaskFuture; -import java.io.Serializable; -import java.time.Duration; -import java.time.Instant; -import java.util.Collections; -import java.util.Set; -import java.util.function.Supplier; import java.util.logging.Level; -import org.hamcrest.Matcher; import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition; import org.jenkinsci.plugins.workflow.job.WorkflowJob; import org.jenkinsci.plugins.workflow.job.WorkflowRun; -import org.jenkinsci.plugins.workflow.steps.Step; -import org.jenkinsci.plugins.workflow.steps.StepContext; -import org.jenkinsci.plugins.workflow.steps.StepDescriptor; -import org.jenkinsci.plugins.workflow.steps.StepExecution; -import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep; import org.junit.ClassRule; import org.junit.Test; import org.junit.Rule; @@ -60,8 +42,6 @@ import org.jvnet.hudson.test.Issue; import org.jvnet.hudson.test.LoggerRule; import org.jvnet.hudson.test.JenkinsSessionRule; -import org.jvnet.hudson.test.TestExtension; -import org.kohsuke.stapler.DataBoundConstructor; public class FlowExecutionListTest { @@ -99,113 +79,4 @@ public class FlowExecutionListTest { }); } - @Test public void forceLoadRunningExecutionsAfterRestart() throws Throwable { - logging.capture(50); - sessions.then(r -> { - WorkflowJob p = r.jenkins.createProject(WorkflowJob.class, "p"); - p.setDefinition(new CpsFlowDefinition("semaphore('wait')", true)); - WorkflowRun b = p.scheduleBuild2(0).waitForStart(); - SemaphoreStep.waitForStart("wait/1", b); - }); - sessions.then(r -> { - /* - Make sure that the build gets loaded automatically by FlowExecutionList$ItemListenerImpl before we load it explictly. - Expected call stack for resuming a Pipelines and its steps: - at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ResumeStepExecutionListener$1.onSuccess(FlowExecutionList.java:250) - at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ResumeStepExecutionListener$1.onSuccess(FlowExecutionList.java:247) - at com.google.common.util.concurrent.Futures$6.run(Futures.java:975) - at org.jenkinsci.plugins.workflow.flow.DirectExecutor.execute(DirectExecutor.java:33) - ... Guava Futures API internals ... - at com.google.common.util.concurrent.Futures.addCallback(Futures.java:985) - at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ResumeStepExecutionListener.onResumed(FlowExecutionList.java:247) - at org.jenkinsci.plugins.workflow.flow.FlowExecutionListener.fireResumed(FlowExecutionListener.java:84) - at org.jenkinsci.plugins.workflow.job.WorkflowRun.onLoad(WorkflowRun.java:528) - at hudson.model.RunMap.retrieve(RunMap.java:225) - ... RunMap internals ... - at hudson.model.RunMap.getById(RunMap.java:205) - at org.jenkinsci.plugins.workflow.job.WorkflowRun$Owner.run(WorkflowRun.java:937) - at org.jenkinsci.plugins.workflow.job.WorkflowRun$Owner.get(WorkflowRun.java:948) - at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$1.computeNext(FlowExecutionList.java:65) - at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$1.computeNext(FlowExecutionList.java:57) - at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143) - at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138) - at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ItemListenerImpl.onLoaded(FlowExecutionList.java:175) - at jenkins.model.Jenkins.(Jenkins.java:1019) - */ - waitFor(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep"))); - WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class); - SemaphoreStep.success("wait/1", null); - WorkflowRun b = p.getBuildByNumber(1); - r.waitForCompletion(b); - r.assertBuildStatus(Result.SUCCESS, b); - }); - } - - @Issue("JENKINS-67164") - @Test public void resumeStepExecutions() throws Throwable { - sessions.then(r -> { - WorkflowJob p = r.jenkins.createProject(WorkflowJob.class, "p"); - p.setDefinition(new CpsFlowDefinition("noResume()", true)); - WorkflowRun b = p.scheduleBuild2(0).waitForStart(); - r.waitForMessage("Starting non-resumable step", b); - // TODO: Unclear how this might happen in practice. - FlowExecutionList.get().unregister(b.asFlowExecutionOwner()); - }); - sessions.then(r -> { - WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class); - WorkflowRun b = p.getBuildByNumber(1); - r.waitForCompletion(b); - r.assertBuildStatus(Result.FAILURE, b); - r.assertLogContains("Unable to resume NonResumableStep", b); - }); - } - - public static class NonResumableStep extends Step implements Serializable { - public static final long serialVersionUID = 1L; - @DataBoundConstructor - public NonResumableStep() { } - @Override - public StepExecution start(StepContext sc) { - return new ExecutionImpl(sc); - } - - private static class ExecutionImpl extends StepExecution implements Serializable { - public static final long serialVersionUID = 1L; - private ExecutionImpl(StepContext sc) { - super(sc); - } - @Override - public boolean start() throws Exception { - getContext().get(TaskListener.class).getLogger().println("Starting non-resumable step"); - return false; - } - @Override - public void onResume() { - getContext().onFailure(new AbortException("Unable to resume NonResumableStep")); - } - } - - @TestExtension public static class DescriptorImpl extends StepDescriptor { - @Override - public Set> getRequiredContext() { - return Collections.singleton(TaskListener.class); - } - @Override - public String getFunctionName() { - return "noResume"; - } - } - } - - /** - * Wait up to 5 seconds for the given supplier to return a matching value. - */ - private static void waitFor(Supplier valueSupplier, Matcher matcher) throws InterruptedException { - Instant end = Instant.now().plus(Duration.ofSeconds(5)); - while (!matcher.matches(valueSupplier.get()) && Instant.now().isBefore(end)) { - Thread.sleep(100L); - } - assertThat("Matcher should have matched after 5s", valueSupplier.get(), matcher); - } - }