Skip to content

Commit 3abe071

Browse files
committed
Fix race condition in ScalaCompile by selectively interrupting futures with Thread.interrupt
Prior to this change there was a bug when ScalaCompile actions were used with dynamic execution. The bug would cause builds to fail with the error message "compiler mirror not found". It is my understanding this message typically indicates a problem loading the Scala Library jar during compilation. The error would always occur in the local multiplex worker's logs and not the remote logs. This commit fixes this error by not using Thread.interrupt when cancelling the future running the ScalaCompile action. My hypothesis on why this error occurs is as follows: imagine one of the classloaders in the shared ScalaInstance is in use on Thread A. That thread gets cancelled and interrupted via Thread.interrupt. This causes some kind of persistent error inside either the classloader or another part of Zinc or the Scala compiler. We correctly ignore the error on Thread A because the request it was handling was cancelled. Another thread is working a non-cancelled request, it uses that same ScalaInstance, hit that persistent error, and fails. By not using Thread.interrupt we don't trigger the persistent error and thus avoid the bug.
1 parent da27b46 commit 3abe071

File tree

16 files changed

+181
-49
lines changed

16 files changed

+181
-49
lines changed

rules/scala_proto/private/ScalaProtoWorker.scala

+11-4
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,16 @@ object ScalaProtoWorker extends WorkerMain[Unit] {
7272

7373
override def init(args: Option[Array[String]]): Unit = ()
7474

75-
protected def work(ctx: Unit, args: Array[String], out: PrintStream, workDir: Path, verbosity: Int): Unit = {
75+
protected def work(
76+
ctx: Unit,
77+
args: Array[String],
78+
out: PrintStream,
79+
workDir: Path,
80+
verbosity: Int,
81+
isCancelled: Function0[Boolean],
82+
): Unit = {
7683
val workRequest = ScalaProtoRequest(workDir, ArgsUtil.parseArgsOrFailSafe(args, argParser, out))
77-
InterruptUtil.throwIfInterrupted()
84+
InterruptUtil.throwIfInterrupted(isCancelled)
7885

7986
val scalaOut = workRequest.outputDir
8087
Files.createDirectories(scalaOut)
@@ -93,7 +100,7 @@ object ScalaProtoWorker extends WorkerMain[Unit] {
93100
}
94101
}
95102

96-
InterruptUtil.throwIfInterrupted()
103+
InterruptUtil.throwIfInterrupted(isCancelled)
97104
val exitCode = ProtocBridge.runWithGenerators(
98105
new MyProtocRunner,
99106
namedGenerators = List("scala" -> ScalaPbCodeGenerator),
@@ -102,7 +109,7 @@ object ScalaProtoWorker extends WorkerMain[Unit] {
102109
if (exitCode != 0) {
103110
throw new AnnexWorkerError(exitCode)
104111
}
105-
InterruptUtil.throwIfInterrupted()
112+
InterruptUtil.throwIfInterrupted(isCancelled)
106113
}
107114

108115
}

rules/scalafmt/scalafmt/ScalafmtRunner.scala

+11-4
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,23 @@ object ScalafmtRunner extends WorkerMain[Unit] {
4545

4646
protected def init(args: Option[Array[String]]): Unit = {}
4747

48-
protected def work(worker: Unit, args: Array[String], out: PrintStream, workDir: Path, verbosity: Int): Unit = {
48+
protected def work(
49+
worker: Unit,
50+
args: Array[String],
51+
out: PrintStream,
52+
workDir: Path,
53+
verbosity: Int,
54+
isCancelled: Function0[Boolean],
55+
): Unit = {
4956
val workRequest = ScalafmtRequest(workDir, ArgsUtil.parseArgsOrFailSafe(args, argParser, out))
50-
InterruptUtil.throwIfInterrupted()
57+
InterruptUtil.throwIfInterrupted(isCancelled)
5158

5259
val source = FileOps.readFile(workRequest.inputFile)(Codec.UTF8)
5360

5461
val config = ScalafmtConfig.fromHoconFile(workRequest.configFile).get
5562
@tailrec
5663
def format(code: String): String = {
57-
InterruptUtil.throwIfInterrupted()
64+
InterruptUtil.throwIfInterrupted(isCancelled)
5865
val formatted = Scalafmt.format(code, config).get
5966
if (code == formatted) code else format(formatted)
6067
}
@@ -76,7 +83,7 @@ object ScalafmtRunner extends WorkerMain[Unit] {
7683
}
7784

7885
Files.write(workRequest.outputFile, output.getBytes)
79-
InterruptUtil.throwIfInterrupted()
86+
InterruptUtil.throwIfInterrupted(isCancelled)
8087
}
8188

8289
}
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package higherkindness.rules_scala
22
package common.interrupt
33

4+
import java.util.concurrent.CancellationException
5+
46
object InterruptUtil {
5-
def throwIfInterrupted(): Unit = {
7+
def throwIfInterrupted(isCancelled: Function0[Boolean]): Unit = {
68
if (Thread.interrupted()) {
7-
throw new InterruptedException("WorkRequest was cancelled.")
9+
throw new InterruptedException("WorkRequest was cancelled via Thread interruption.")
10+
} else if (isCancelled()) {
11+
throw new CancellationException("WorkRequest was cancelled via FutureTask cancellation.")
812
}
913
}
1014
}

src/main/scala/higherkindness/rules_scala/common/worker/CancellableTask.scala

+8-2
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ import scala.util.Try
1616
* Heavily inspired by the following: https://github.com/NthPortal/cancellable-task/tree/master
1717
* https://stackoverflow.com/a/39986418/6442597
1818
*/
19-
class CancellableTask[S] private (fn: => S) {
19+
class CancellableTask[S] private (fn: Function1[Function0[Boolean], S]) {
2020
private val promise = Promise[S]()
2121
val future: Future[S] = promise.future
2222

2323
private val fnCallable = new Callable[S]() {
24-
def call(): S = fn
24+
def call(): S = fn(isCancelled)
2525
}
2626

2727
private val task = new FutureTaskWaitOnCancel[S](fnCallable) {
@@ -39,10 +39,16 @@ class CancellableTask[S] private (fn: => S) {
3939
def cancel(mayInterruptIfRunning: Boolean): Boolean = task.cancel(mayInterruptIfRunning)
4040

4141
def execute(executionContext: ExecutionContext): Unit = executionContext.execute(task)
42+
43+
def isCancelled(): Boolean = task.isCancelled()
4244
}
4345

4446
object CancellableTask {
4547
def apply[S](fn: => S): CancellableTask[S] = {
48+
new CancellableTask((_: Function0[Boolean]) => fn)
49+
}
50+
51+
def apply[S](fn: Function1[Function0[Boolean], S]): CancellableTask[S] = {
4652
new CancellableTask(fn)
4753
}
4854
}

src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala

+36-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package common.worker
44
import common.error.{AnnexDuplicateActiveRequestException, AnnexWorkerError}
55
import com.google.devtools.build.lib.worker.WorkerProtocol
66
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream, PrintStream}
7+
import java.nio.channels.ClosedByInterruptException
78
import java.nio.file.{Path, Paths}
89
import java.util.concurrent.{Callable, CancellationException, ConcurrentHashMap, ForkJoinPool, FutureTask}
910
import scala.annotation.tailrec
@@ -15,10 +16,34 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
1516

1617
protected def init(args: Option[Array[String]]): S
1718

18-
protected def work(ctx: S, args: Array[String], out: PrintStream, workDir: Path, verbosity: Int): Unit
19-
19+
/**
20+
* isCancelled is used to determine whether the FutureTask that is executing this work request has been cancelled. If
21+
* it is safe to Thread.interrupt a process, then that is done and can be checked. Not all workers can be
22+
* Thread.interrupted safely.
23+
*
24+
* TODO(James): document the rest of this function
25+
*/
26+
protected def work(
27+
ctx: S,
28+
args: Array[String],
29+
out: PrintStream,
30+
workDir: Path,
31+
verbosity: Int,
32+
isCancelled: Function0[Boolean],
33+
): Unit
34+
35+
/**
36+
* Indicates whether this program is being executed as a worker or as a regular process. It is a var because we won't
37+
* know until runtime which one it is.
38+
*/
2039
protected var isWorker = false
2140

41+
/**
42+
* Used to determine whether to interrupt the FutureTasks being executed by this worker using Thread.interrupt or not.
43+
* It's safe to interrupt many things with Thread.interrupt, but not all things.
44+
*/
45+
protected val mayInterruptWorkerTasks = true
46+
2247
final def main(args: Array[String]): Unit = {
2348
args.toList match {
2449
case "--persistent_worker" :: args =>
@@ -113,7 +138,9 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
113138
Option(activeRequests.get(requestId)).foreach { activeRequest =>
114139
// Cancel will wait for the thread to complete or be interrupted, so we do it in a future
115140
// to prevent blocking the worker from processing more requests
116-
Future(activeRequest.cancel(mayInterruptIfRunning = true))(scala.concurrent.ExecutionContext.global)
141+
Future(activeRequest.cancel(mayInterruptIfRunning = mayInterruptWorkerTasks))(
142+
scala.concurrent.ExecutionContext.global,
143+
)
117144
}
118145
} else {
119146
val args = request.getArgumentsList.toArray(Array.empty[String])
@@ -131,13 +158,13 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
131158
maybeOut.map(_.flush())
132159
}
133160

134-
val workTask = CancellableTask {
161+
def doWork(isCancelled: Function0[Boolean]) = {
135162
val outStream = new ByteArrayOutputStream()
136163
val out = new PrintStream(outStream)
137164
maybeOutStream = Some(outStream)
138165
maybeOut = Some(out)
139166
try {
140-
work(ctx, args, out, sandboxDir, verbosity)
167+
work(ctx, args, out, sandboxDir, verbosity, isCancelled)
141168
0
142169
} catch {
143170
case e @ AnnexWorkerError(code, _, _) =>
@@ -146,6 +173,8 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
146173
}
147174
}
148175

176+
val workTask = CancellableTask(doWork)
177+
149178
workTask.future
150179
.andThen {
151180
// Work task succeeded or failed in an expected way
@@ -178,7 +207,7 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
178207
}
179208

180209
// Task successfully cancelled
181-
case Failure(e: CancellationException) =>
210+
case Failure(e @ (_: CancellationException | _: ClosedByInterruptException)) =>
182211
flushOut()
183212
writeResponse(requestId, None, None, wasCancelled = true)
184213
logVerbose(
@@ -240,6 +269,7 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
240269
out,
241270
workDir = Path.of(""),
242271
verbosity = 0,
272+
isCancelled = () => false,
243273
)
244274

245275
0

src/main/scala/higherkindness/rules_scala/workers/bloop/compile/BloopRunner.scala

+8-1
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,12 @@ import java.nio.file.Path
99

1010
object BloopRunner extends WorkerMain[Unit] {
1111
override def init(args: Option[Array[String]]): Unit = ()
12-
override def work(ctx: Unit, args: Array[String], out: PrintStream, workDir: Path, verbosity: Int): Unit = Bloop
12+
override def work(
13+
ctx: Unit,
14+
args: Array[String],
15+
out: PrintStream,
16+
workDir: Path,
17+
verbosity: Int,
18+
isCancelled: Function0[Boolean],
19+
): Unit = Bloop
1320
}

src/main/scala/higherkindness/rules_scala/workers/deps/DepsRunner.scala

+12-5
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,16 @@ object DepsRunner extends WorkerMain[Unit] {
111111

112112
override def init(args: Option[Array[String]]): Unit = ()
113113

114-
override def work(ctx: Unit, args: Array[String], out: PrintStream, workDir: Path, verbosity: Int): Unit = {
114+
override def work(
115+
ctx: Unit,
116+
args: Array[String],
117+
out: PrintStream,
118+
workDir: Path,
119+
verbosity: Int,
120+
isCancelled: Function0[Boolean],
121+
): Unit = {
115122
val workRequest = DepsRunnerRequest(workDir, ArgsUtil.parseArgsOrFailSafe(args, argParser, out))
116-
InterruptUtil.throwIfInterrupted()
123+
InterruptUtil.throwIfInterrupted(isCancelled)
117124

118125
val groupLabelToJarPaths = workRequest.groups.map { group =>
119126
group.label -> group.jars
@@ -136,7 +143,7 @@ object DepsRunner extends WorkerMain[Unit] {
136143
val readWriteMappers = AnnexMapper.mappers(workDir, isIncremental = false)
137144
val readMapper = readWriteMappers.getReadMapper()
138145

139-
InterruptUtil.throwIfInterrupted()
146+
InterruptUtil.throwIfInterrupted(isCancelled)
140147
val usedPaths = Files
141148
.readAllLines(workRequest.usedDepsFile)
142149
.asScala
@@ -165,7 +172,7 @@ object DepsRunner extends WorkerMain[Unit] {
165172
Nil
166173
}
167174

168-
InterruptUtil.throwIfInterrupted()
175+
InterruptUtil.throwIfInterrupted(isCancelled)
169176
val labelsToAdd = if (workRequest.checkDirect) {
170177
(usedPaths -- (workRequest.directDepLabels :++ workRequest.unusedDepWhitelist).flatMap(pathsForLabel))
171178
.flatMap { path =>
@@ -198,6 +205,6 @@ object DepsRunner extends WorkerMain[Unit] {
198205
throw new AnnexWorkerError(1, errorMessage.result())
199206
}
200207

201-
InterruptUtil.throwIfInterrupted()
208+
InterruptUtil.throwIfInterrupted(isCancelled)
202209
}
203210
}

src/main/scala/higherkindness/rules_scala/workers/jacoco/instrumenter/JacocoInstrumenter.scala

+10-3
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,21 @@ object JacocoInstrumenter extends WorkerMain[Unit] {
6868

6969
override def init(args: Option[Array[String]]): Unit = ()
7070

71-
override def work(ctx: Unit, args: Array[String], out: PrintStream, workDir: Path, verbosity: Int): Unit = {
71+
override def work(
72+
ctx: Unit,
73+
args: Array[String],
74+
out: PrintStream,
75+
workDir: Path,
76+
verbosity: Int,
77+
isCancelled: Function0[Boolean],
78+
): Unit = {
7279
val workRequest = JacocoRequest(workDir, ArgsUtil.parseArgsOrFailSafe(args, argParser, out))
7380

7481
val jacoco = new Instrumenter(new OfflineInstrumentationAccessGenerator)
7582

7683
workRequest.jars.foreach { case (inPath, outPath) =>
7784
Using.Manager { use =>
78-
InterruptUtil.throwIfInterrupted()
85+
InterruptUtil.throwIfInterrupted(isCancelled)
7986

8087
val inFS = use(FileSystems.newFileSystem(inPath, null: ClassLoader))
8188
val outFS =
@@ -107,6 +114,6 @@ object JacocoInstrumenter extends WorkerMain[Unit] {
107114
}.get
108115
}
109116

110-
InterruptUtil.throwIfInterrupted()
117+
InterruptUtil.throwIfInterrupted(isCancelled)
111118
}
112119
}

src/main/scala/higherkindness/rules_scala/workers/zinc/compile/ZincRunner.scala

+27-13
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ object ZincRunnerWorkerConfig {
7676
*/
7777
object ZincRunner extends WorkerMain[ZincRunnerWorkerConfig] {
7878

79+
// Interrupting concurrent Zinc/Scala compilation with a shared ScalaInstance (and thus shared classloaders)
80+
// can cause strange concurrency errors.
81+
override protected val mayInterruptWorkerTasks = false
82+
7983
private val classloaderCache = new ClassLoaderCache(new URLClassLoader(Array()))
8084

8185
private val compilerCache = CompilerCache.fresh
@@ -107,9 +111,10 @@ object ZincRunner extends WorkerMain[ZincRunnerWorkerConfig] {
107111
out: PrintStream,
108112
workDir: Path,
109113
verbosity: Int,
114+
isCancelled: Function0[Boolean],
110115
): Unit = {
111116
val workRequest = CommonArguments(ArgsUtil.parseArgsOrFailSafe(args, parser, out), workDir)
112-
InterruptUtil.throwIfInterrupted()
117+
InterruptUtil.throwIfInterrupted(isCancelled)
113118

114119
// These two paths must only be used when persistence is enabled because they escape the sandbox.
115120
// Sandboxing is disabled if persistence is enabled.
@@ -156,7 +161,7 @@ object ZincRunner extends WorkerMain[ZincRunnerWorkerConfig] {
156161
}
157162
Dep.create(extractedFileCache, workRequest.classpath, analyses)
158163
}
159-
InterruptUtil.throwIfInterrupted()
164+
InterruptUtil.throwIfInterrupted(isCancelled)
160165

161166
val debug = workRequest.debug
162167
val analysisStorePath = workRequest.outputAnalysisStore
@@ -282,28 +287,37 @@ object ZincRunner extends WorkerMain[ZincRunnerWorkerConfig] {
282287

283288
val inputs = Inputs.of(compilers, compileOptions, setup, previousResult)
284289

285-
InterruptUtil.throwIfInterrupted()
290+
InterruptUtil.throwIfInterrupted(isCancelled)
286291

287292
// compile
288293
val incrementalCompiler = new IncrementalCompilerImpl()
289294
val compileResult =
290-
try incrementalCompiler.compile(inputs, logger)
291-
catch {
292-
case _: CompileFailed => throw new AnnexWorkerError(-1)
295+
try {
296+
incrementalCompiler.compile(inputs, logger)
297+
} catch {
298+
// The thread running this may have been interrupted during compilation due to a cancel request.
299+
// It's possible that the interruption contribute to the error. We should check if we were
300+
// interrupted, so we can respond with a cancellation rather than erroring and failing the build.
301+
case _: CompileFailed =>
302+
InterruptUtil.throwIfInterrupted(isCancelled)
303+
throw new AnnexWorkerError(-1)
293304
case e: ClassFormatError =>
294-
throw new Exception("You may be missing a `macro = True` attribute.", e)
295-
throw new AnnexWorkerError(1)
296-
case e: StackOverflowError => {
305+
InterruptUtil.throwIfInterrupted(isCancelled)
306+
throw new AnnexWorkerError(1, "You may be missing a `macro = True` attribute.", e)
307+
case e: StackOverflowError =>
297308
// Downgrade to NonFatal error.
298309
// The JVM is not guaranteed to free shared resources correctly when unwinding the stack to catch a StackOverflowError,
299310
// but since we don't share resources between work threads, this should be mostly safe for us for now.
300311
// If Bazel could better handle the worker shutting down suddenly, we could allow this to be caught by
301312
// the UncaughtExceptionHandler in WorkerMain, and exit the entire process to be safe.
302-
throw new Error("StackOverflowError", e)
303-
}
313+
InterruptUtil.throwIfInterrupted(isCancelled)
314+
throw new AnnexWorkerError(1, "StackOverflowError", e)
315+
case NonFatal(e) =>
316+
InterruptUtil.throwIfInterrupted(isCancelled)
317+
throw e
304318
}
305319

306-
InterruptUtil.throwIfInterrupted()
320+
InterruptUtil.throwIfInterrupted(isCancelled)
307321

308322
// create analyses
309323
val pathString = analysisStorePath.toAbsolutePath().normalize().toString()
@@ -395,7 +409,7 @@ object ZincRunner extends WorkerMain[ZincRunnerWorkerConfig] {
395409
FileUtil.delete(tmpDir)
396410
Files.createDirectory(tmpDir)
397411

398-
InterruptUtil.throwIfInterrupted()
412+
InterruptUtil.throwIfInterrupted(isCancelled)
399413
}
400414
}
401415

0 commit comments

Comments
 (0)