Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert fixes related to JENKINS-67351 and JENKINS-67164 #198

Merged
merged 7 commits into from
Jan 18, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import hudson.Extension;
import hudson.ExtensionList;
import hudson.XmlFile;
import hudson.init.InitMilestone;
import hudson.init.Terminator;
import hudson.model.listeners.ItemListener;
import hudson.remoting.SingleLaneExecutorService;
Expand All @@ -32,7 +31,6 @@
import java.util.logging.Logger;

import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.Beta;
import org.kohsuke.accmod.restrictions.DoNotUse;

/**
Expand All @@ -46,8 +44,6 @@ public class FlowExecutionList implements Iterable<FlowExecution> {
private final SingleLaneExecutorService executor = new SingleLaneExecutorService(Timer.get());
private XmlFile configFile;

private transient volatile boolean resumptionComplete;

/**
 * Creates the list and immediately invokes {@code load()}.
 *
 * NOTE(review): {@code load()} is defined outside this excerpt; presumably it restores previously persisted
 * state via {@code configFile} -- confirm against the full file.
 */
public FlowExecutionList() {
load();
}
Expand Down Expand Up @@ -164,15 +160,11 @@ public static FlowExecutionList get() {
}

/**
* Returns true if all executions that were present in this {@link FlowExecutionList} have been loaded and resumed.
*
* This takes place slightly after {@link InitMilestone#COMPLETED} is reached during Jenkins startup.
*
* Useful to avoid resuming Pipelines in contexts that may lead to deadlock.
* @deprecated Only exists for binary compatibility.
*/
@Restricted(Beta.class)
@Deprecated
public boolean isResumptionComplete() {
return resumptionComplete;
return false;
}

/**
Expand All @@ -187,8 +179,25 @@ public void onLoaded() {
for (final FlowExecution e : list) {
// The call to FlowExecutionOwner.get in the implementation of iterator() is sufficient to load the Pipeline.
LOGGER.log(Level.FINE, "Eagerly loaded {0}", e);
Futures.addCallback(e.getCurrentExecutions(false), new FutureCallback<List<StepExecution>>() {
@Override
public void onSuccess(@NonNull List<StepExecution> result) {
LOGGER.log(Level.FINE, "Will resume {0}", result);
for (StepExecution se : result) {
se.onResume();
}
}

@Override
public void onFailure(@NonNull Throwable t) {
if (t instanceof CancellationException) {
LOGGER.log(Level.FINE, "Cancelled load of " + e, t);
} else {
LOGGER.log(Level.WARNING, "Failed to load " + e, t);
}
}
}, MoreExecutors.directExecutor());
}
list.resumptionComplete = true;
}
}

Expand Down Expand Up @@ -243,57 +252,4 @@ public void onFailure(@NonNull Throwable t) {
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}

/**
 * Resumes every incomplete step of a {@link FlowExecution} each time a Pipeline resumes.
 *
 * Invoked by {@code WorkflowRun.onLoad}, which means it runs for any resumed Pipeline, whether or not that
 * Pipeline is present in {@link FlowExecutionList}.
 */
@Extension
public static class ResumeStepExecutionListener extends FlowExecutionListener {
    /**
     * Asynchronously looks up the current step executions of {@code e} and calls
     * {@link StepExecution#onResume} on each of them.
     *
     * @param e the execution that has just been resumed
     */
    @Override
    public void onResumed(@NonNull FlowExecution e) {
        Futures.addCallback(e.getCurrentExecutions(false), new FutureCallback<List<StepExecution>>() {
            @Override
            public void onSuccess(@NonNull List<StepExecution> result) {
                // WorkflowRun.onLoad does not fire onResumed when the serialized execution was already complete
                // at load time. However (at least for workflow-cps) the execution currently resumes
                // asynchronously before WorkflowRun.onLoad finishes, so it may already have completed by the
                // time onResumed is called.
                // Nothing stops the execution from completing immediately after this isComplete check either;
                // closing that gap would require delaying the actual resumption until WorkflowRun.onLoad
                // completes, or adding some form of synchronization.
                if (e.isComplete()) {
                    return;
                }

                final FlowExecutionList executions = FlowExecutionList.get();
                final FlowExecutionOwner executionOwner = e.getOwner();
                if (!executions.runningTasks.contains(executionOwner)) {
                    // The execution is resuming without being tracked; log that and start tracking it.
                    final Object[] logParams = {executionOwner, executions.runningTasks.getView()};
                    LOGGER.log(Level.WARNING, "Resuming {0}, which is missing from FlowExecutionList ({1}), so registering it now.",
                            logParams);
                    executions.register(executionOwner);
                }

                LOGGER.log(Level.FINE, "Will resume {0}", result);
                for (StepExecution step : result) {
                    try {
                        step.onResume();
                    } catch (Throwable failure) {
                        // Report the problem to the step's own context rather than aborting the remaining steps.
                        step.getContext().onFailure(failure);
                    }
                }
            }

            @Override
            public void onFailure(@NonNull Throwable t) {
                // Cancellation is logged at FINE; every other failure is logged at WARNING.
                final boolean cancelled = t instanceof CancellationException;
                final Level level = cancelled ? Level.FINE : Level.WARNING;
                final String prefix = cancelled ? "Cancelled load of " : "Failed to load ";
                LOGGER.log(level, prefix + e, t);
            }

        }, Timer.get()); // RunMap and WorkflowRun locks are always held here, so steps are resumed on a different thread to avoid potential deadlocks. See JENKINS-67351.
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,44 +24,24 @@

package org.jenkinsci.plugins.workflow.flow;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertNotNull;

import hudson.AbortException;
import hudson.model.ParametersAction;
import hudson.model.ParametersDefinitionProperty;
import hudson.model.Result;
import hudson.model.StringParameterDefinition;
import hudson.model.StringParameterValue;
import hudson.model.TaskListener;
import hudson.model.queue.QueueTaskFuture;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Set;
import java.util.function.Supplier;
import java.util.logging.Level;
import org.hamcrest.Matcher;
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import org.jenkinsci.plugins.workflow.steps.Step;
import org.jenkinsci.plugins.workflow.steps.StepContext;
import org.jenkinsci.plugins.workflow.steps.StepDescriptor;
import org.jenkinsci.plugins.workflow.steps.StepExecution;
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.Rule;
import org.jvnet.hudson.test.BuildWatcher;
import org.jvnet.hudson.test.Issue;
import org.jvnet.hudson.test.LoggerRule;
import org.jvnet.hudson.test.JenkinsSessionRule;
import org.jvnet.hudson.test.TestExtension;
import org.kohsuke.stapler.DataBoundConstructor;

public class FlowExecutionListTest {

Expand Down Expand Up @@ -99,113 +79,4 @@ public class FlowExecutionListTest {
});
}

@Test public void forceLoadRunningExecutionsAfterRestart() throws Throwable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or just @Ignore

logging.capture(50);
sessions.then(r -> {
WorkflowJob p = r.jenkins.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("semaphore('wait')", true));
WorkflowRun b = p.scheduleBuild2(0).waitForStart();
SemaphoreStep.waitForStart("wait/1", b);
});
sessions.then(r -> {
/*
Make sure that the build gets loaded automatically by FlowExecutionList$ItemListenerImpl before we load it explicitly.
Expected call stack for resuming a Pipeline and its steps:
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ResumeStepExecutionListener$1.onSuccess(FlowExecutionList.java:250)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ResumeStepExecutionListener$1.onSuccess(FlowExecutionList.java:247)
at com.google.common.util.concurrent.Futures$6.run(Futures.java:975)
at org.jenkinsci.plugins.workflow.flow.DirectExecutor.execute(DirectExecutor.java:33)
... Guava Futures API internals ...
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:985)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ResumeStepExecutionListener.onResumed(FlowExecutionList.java:247)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionListener.fireResumed(FlowExecutionListener.java:84)
at org.jenkinsci.plugins.workflow.job.WorkflowRun.onLoad(WorkflowRun.java:528)
at hudson.model.RunMap.retrieve(RunMap.java:225)
... RunMap internals ...
at hudson.model.RunMap.getById(RunMap.java:205)
at org.jenkinsci.plugins.workflow.job.WorkflowRun$Owner.run(WorkflowRun.java:937)
at org.jenkinsci.plugins.workflow.job.WorkflowRun$Owner.get(WorkflowRun.java:948)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$1.computeNext(FlowExecutionList.java:65)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$1.computeNext(FlowExecutionList.java:57)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ItemListenerImpl.onLoaded(FlowExecutionList.java:175)
at jenkins.model.Jenkins.<init>(Jenkins.java:1019)
*/
waitFor(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep")));
WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class);
SemaphoreStep.success("wait/1", null);
WorkflowRun b = p.getBuildByNumber(1);
r.waitForCompletion(b);
r.assertBuildStatus(Result.SUCCESS, b);
});
}

/**
 * Checks that a running build which has been removed from {@link FlowExecutionList} still has its step
 * executions resumed in the next session: the {@code noResume} step fails from its {@code onResume}, so the
 * build is expected to end in {@link Result#FAILURE}.
 */
@Issue("JENKINS-67164")
@Test
public void resumeStepExecutions() throws Throwable {
    // Session 1: start a build that blocks in the non-resumable step, then drop it from the list.
    sessions.then(rule -> {
        WorkflowJob job = rule.jenkins.createProject(WorkflowJob.class, "p");
        CpsFlowDefinition definition = new CpsFlowDefinition("noResume()", true);
        job.setDefinition(definition);
        WorkflowRun build = job.scheduleBuild2(0).waitForStart();
        rule.waitForMessage("Starting non-resumable step", build);
        // TODO: Unclear how this might happen in practice.
        FlowExecutionList.get().unregister(build.asFlowExecutionOwner());
    });
    // Session 2: the build is expected to finish as a failure carrying the step's resume error.
    sessions.then(rule -> {
        WorkflowJob job = rule.jenkins.getItemByFullName("p", WorkflowJob.class);
        WorkflowRun build = job.getBuildByNumber(1);
        rule.waitForCompletion(build);
        rule.assertBuildStatus(Result.FAILURE, build);
        rule.assertLogContains("Unable to resume NonResumableStep", build);
    });
}

/**
 * Test-only step, exposed to Pipeline scripts as {@code noResume}, whose execution reports a failure to its
 * context whenever it is asked to resume.
 */
public static class NonResumableStep extends Step implements Serializable {
    public static final long serialVersionUID = 1L;

    /** Creates the step; it takes no configuration. */
    @DataBoundConstructor
    public NonResumableStep() {
    }

    /**
     * Creates the execution for this step.
     *
     * @param sc the context the new execution is bound to
     * @return a new {@link ExecutionImpl} for {@code sc}
     */
    @Override
    public StepExecution start(StepContext sc) {
        final StepExecution execution = new ExecutionImpl(sc);
        return execution;
    }

    /** Execution that prints a marker line when started and fails its context when resumed. */
    private static class ExecutionImpl extends StepExecution implements Serializable {
        public static final long serialVersionUID = 1L;

        private ExecutionImpl(StepContext sc) {
            super(sc);
        }

        /**
         * Writes {@code "Starting non-resumable step"} to the build's {@link TaskListener} logger.
         *
         * @return always {@code false}
         */
        @Override
        public boolean start() throws Exception {
            final TaskListener listener = getContext().get(TaskListener.class);
            listener.getLogger().println("Starting non-resumable step");
            return false;
        }

        /** Reports an {@link AbortException} to the step context instead of resuming. */
        @Override
        public void onResume() {
            final AbortException cause = new AbortException("Unable to resume NonResumableStep");
            getContext().onFailure(cause);
        }
    }

    /** Descriptor registering the step under the function name {@code noResume}. */
    @TestExtension public static class DescriptorImpl extends StepDescriptor {
        @Override
        public String getFunctionName() {
            return "noResume";
        }

        /** Declares {@link TaskListener} as the only required context type. */
        @Override
        public Set<? extends Class<?>> getRequiredContext() {
            return Collections.singleton(TaskListener.class);
        }
    }
}

/**
 * Polls {@code valueSupplier} until its value satisfies {@code matcher}, giving up after roughly 5 seconds.
 *
 * The supplier is re-evaluated about every 100 ms. Once polling stops, whether because of a match or because the
 * deadline has passed, the supplier is evaluated one final time and asserted against the matcher, so a timeout
 * surfaces as an assertion failure.
 *
 * @param valueSupplier source of the value to inspect; invoked on every poll
 * @param matcher condition the supplied value must eventually satisfy
 * @throws InterruptedException if the thread is interrupted while sleeping between polls
 */
private static <T> void waitFor(Supplier<T> valueSupplier, Matcher<T> matcher) throws InterruptedException {
    final Instant deadline = Instant.now().plus(Duration.ofSeconds(5));
    for (;;) {
        if (matcher.matches(valueSupplier.get())) {
            break;
        }
        if (!Instant.now().isBefore(deadline)) {
            break;
        }
        Thread.sleep(100L);
    }
    assertThat("Matcher should have matched after 5s", valueSupplier.get(), matcher);
}

}