From f57f9f7f61f08668208aff89673cc3685ee55d00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Wi=C4=85cek?= Date: Wed, 11 Jun 2025 17:43:57 +0200 Subject: [PATCH] add tracing and try to nest it correctly --- .../bloop/engine/tasks/CompileTask.scala | 614 ++++++++--------- .../tasks/compilation/CompileGatekeeper.scala | 348 ++++++---- .../tasks/compilation/CompileGraph.scala | 624 +++++++++--------- 3 files changed, 847 insertions(+), 739 deletions(-) diff --git a/frontend/src/main/scala/bloop/engine/tasks/CompileTask.scala b/frontend/src/main/scala/bloop/engine/tasks/CompileTask.scala index daeb365bb0..8c9754a1e4 100644 --- a/frontend/src/main/scala/bloop/engine/tasks/CompileTask.scala +++ b/frontend/src/main/scala/bloop/engine/tasks/CompileTask.scala @@ -47,6 +47,7 @@ import xsbti.compile.PreviousResult object CompileTask { private implicit val logContext: DebugFilter = DebugFilter.Compilation + def compile[UseSiteLogger <: Logger]( state: State, dag: Dag[Project], @@ -89,296 +90,305 @@ object CompileTask { "client" -> clientName ) - def compile( - graphInputs: CompileGraph.Inputs, - isBestEffort: Boolean, - isBestEffortDep: Boolean - ): Task[ResultBundle] = { - val bundle = graphInputs.bundle - val project = bundle.project - val logger = bundle.logger - val reporter = bundle.reporter - val previousResult = bundle.latestResult - val compileOut = bundle.out - val lastSuccessful = bundle.lastSuccessful - val compileProjectTracer = rootTracer.startNewChildTracer( - s"compile ${project.name}", - "compile.target" -> project.name - ) + rootTracer.trace("CompileTask.compile") { tracer => + def compile( + graphInputs: CompileGraph.Inputs, + isBestEffort: Boolean, + isBestEffortDep: Boolean, + tracer: BraveTracer + ): Task[ResultBundle] = tracer.trace("CompileTask.compile - inner") { tracer => + val bundle = graphInputs.bundle + val project = bundle.project + val logger = bundle.logger + val reporter = bundle.reporter + val previousResult = bundle.latestResult + val compileOut = bundle.out + val lastSuccessful = bundle.lastSuccessful + val compileProjectTracer = tracer.startNewChildTracer( + s"compile ${project.name}", + "compile.target" -> project.name + ) - bundle.prepareSourcesAndInstance match { - case Left(earlyResultBundle) => - compileProjectTracer.terminate() - Task.now(earlyResultBundle) - case Right(CompileSourcesAndInstance(sources, instance, _)) => - val readOnlyClassesDir = lastSuccessful.classesDir - val newClassesDir = compileOut.internalNewClassesDir - val classpath = bundle.dependenciesData.buildFullCompileClasspathFor( - project, - readOnlyClassesDir, - newClassesDir - ) + bundle.prepareSourcesAndInstance match { + case Left(earlyResultBundle) => + compileProjectTracer.terminate() + Task.now(earlyResultBundle) + case Right(CompileSourcesAndInstance(sources, instance, _)) => + val readOnlyClassesDir = lastSuccessful.classesDir + val newClassesDir = compileOut.internalNewClassesDir + val classpath = bundle.dependenciesData.buildFullCompileClasspathFor( + project, + readOnlyClassesDir, + newClassesDir + ) - // Warn user if detected missing dep, see https://github.com/scalacenter/bloop/issues/708 - state.build.hasMissingDependencies(project).foreach { missing => - Feedback - .detectMissingDependencies(project.name, missing) - .foreach(msg => logger.warn(msg)) - } + // Warn user if detected missing dep, see https://github.com/scalacenter/bloop/issues/708 + state.build.hasMissingDependencies(project).foreach { missing => + Feedback + .detectMissingDependencies(project.name, 
missing) + .foreach(msg => logger.warn(msg)) + } - val configuration = configureCompilation(project) - val newScalacOptions = { - CompilerPluginAllowlist - .enableCachingInScalacOptions( - instance.version, - configuration.scalacOptions, + val configuration = configureCompilation(project) + val newScalacOptions = { + CompilerPluginAllowlist + .enableCachingInScalacOptions( + instance.version, + configuration.scalacOptions, + logger, + compileProjectTracer, + 5 + ) + } + + val inputs = newScalacOptions.map { newScalacOptions => + CompileInputs( + instance, + state.compilerCache, + sources.toArray, + classpath, + bundle.uniqueInputs, + compileOut, + project.out, + newScalacOptions.toArray, + project.javacOptions.toArray, + project.compileJdkConfig.flatMap(_.javacBin), + project.compileOrder, + project.classpathOptions, + lastSuccessful.previous, + previousResult, + reporter, logger, + graphInputs.dependentResults, + cancelCompilation, compileProjectTracer, - 5 + ExecutionContext.ioScheduler, + ExecutionContext.ioExecutor, + bundle.dependenciesData.allInvalidatedClassFiles, + bundle.dependenciesData.allGeneratedClassFilePaths, + project.runtimeResources ) - } - - val inputs = newScalacOptions.map { newScalacOptions => - CompileInputs( - instance, - state.compilerCache, - sources.toArray, - classpath, - bundle.uniqueInputs, - compileOut, - project.out, - newScalacOptions.toArray, - project.javacOptions.toArray, - project.compileJdkConfig.flatMap(_.javacBin), - project.compileOrder, - project.classpathOptions, - lastSuccessful.previous, - previousResult, - reporter, - logger, - graphInputs.dependentResults, - cancelCompilation, - compileProjectTracer, - ExecutionContext.ioScheduler, - ExecutionContext.ioExecutor, - bundle.dependenciesData.allInvalidatedClassFiles, - bundle.dependenciesData.allGeneratedClassFilePaths, - project.runtimeResources - ) - } - - val waitOnReadClassesDir = { - compileProjectTracer.traceTaskVerbose("wait on populating products") { _ => - // This task is memoized and started by the compilation that created - // it, so this execution blocks until it's run or completes right away - lastSuccessful.populatingProducts } - } - // Block on the task associated with this result that sets up the read-only classes dir - waitOnReadClassesDir.flatMap { _ => - // Only when the task is finished, we kickstart the compilation - def compile(inputs: CompileInputs) = { - val firstResult = - Compiler.compile(inputs, isBestEffort, isBestEffortDep, firstCompilation = true) - firstResult.flatMap { - case result @ Compiler.Result.Failed( - _, - _, - _, - _, - Some(BestEffortProducts(_, _, recompile)) - ) if recompile => - // we restart the compilation, starting from scratch (without any previous artifacts) - inputs.reporter.reset() - val emptyResult = - PreviousResult.of(Optional.empty[CompileAnalysis], Optional.empty[MiniSetup]) - val nonIncrementalClasspath = - inputs.classpath.filter(_ != inputs.out.internalReadOnlyClassesDir) - val newInputs = inputs.copy( - sources = inputs.sources, - classpath = nonIncrementalClasspath, - previousCompilerResult = result, - previousResult = emptyResult - ) - Compiler.compile( - newInputs, - isBestEffort, - isBestEffortDep, - firstCompilation = false - ) - case result => Task(result) + val waitOnReadClassesDir = { + compileProjectTracer.traceTaskVerbose("wait on populating products") { _ => + // This task is memoized and started by the compilation that created + // it, so this execution blocks until it's run or completes right away + 
lastSuccessful.populatingProducts } } - inputs.flatMap(inputs => compile(inputs)).map { result => - def runPostCompilationTasks( - backgroundTasks: CompileBackgroundTasks - ): CancelableFuture[Unit] = { - // Post compilation tasks use tracer, so terminate right after they have - val postCompilationTasks = - backgroundTasks - .trigger( - bundle.clientClassesObserver, - reporter.underlying, - compileProjectTracer, - logger + + // Block on the task associated with this result that sets up the read-only classes dir + waitOnReadClassesDir.flatMap { _ => + // Only when the task is finished, we kickstart the compilation + def compile(inputs: CompileInputs) = { + val firstResult = + Compiler.compile(inputs, isBestEffort, isBestEffortDep, firstCompilation = true) + firstResult.flatMap { + case result @ Compiler.Result.Failed( + _, + _, + _, + _, + Some(BestEffortProducts(_, _, recompile)) + ) if recompile => + // we restart the compilation, starting from scratch (without any previous artifacts) + inputs.reporter.reset() + val emptyResult = + PreviousResult.of(Optional.empty[CompileAnalysis], Optional.empty[MiniSetup]) + val nonIncrementalClasspath = + inputs.classpath.filter(_ != inputs.out.internalReadOnlyClassesDir) + val newInputs = inputs.copy( + sources = inputs.sources, + classpath = nonIncrementalClasspath, + previousCompilerResult = result, + previousResult = emptyResult ) - .doOnFinish(_ => Task(compileProjectTracer.terminate())) - postCompilationTasks.runAsync(ExecutionContext.ioScheduler) + Compiler.compile( + newInputs, + isBestEffort, + isBestEffortDep, + firstCompilation = false + ) + case result => Task(result) + } } - // Populate the last successful result if result was success - result match { - case s: Compiler.Result.Success => - val runningTasks = runPostCompilationTasks(s.backgroundTasks) - val blockingOnRunningTasks = Task - .fromFuture(runningTasks) - .executeOn(ExecutionContext.ioScheduler) - val populatingTask = { - if (s.isNoOp) blockingOnRunningTasks // Task.unit - else { - for { - _ <- blockingOnRunningTasks - _ <- populateNewReadOnlyClassesDir(s.products, bgTracer, rawLogger) - .doOnFinish(_ => Task(bgTracer.terminate())) - } yield () + inputs.flatMap(inputs => compile(inputs)).map { result => + def runPostCompilationTasks( + backgroundTasks: CompileBackgroundTasks + ): CancelableFuture[Unit] = { + // Post compilation tasks use tracer, so terminate right after they have + val postCompilationTasks = + backgroundTasks + .trigger( + bundle.clientClassesObserver, + reporter.underlying, + compileProjectTracer, + logger + ) + .doOnFinish(_ => Task(compileProjectTracer.terminate())) + postCompilationTasks.runAsync(ExecutionContext.ioScheduler) + } + + // Populate the last successful result if result was success + result match { + case s: Compiler.Result.Success => + val runningTasks = runPostCompilationTasks(s.backgroundTasks) + val blockingOnRunningTasks = Task + .fromFuture(runningTasks) + .executeOn(ExecutionContext.ioScheduler) + val populatingTask = { + if (s.isNoOp) blockingOnRunningTasks // Task.unit + else { + for { + _ <- blockingOnRunningTasks + _ <- populateNewReadOnlyClassesDir(s.products, bgTracer, rawLogger) + .doOnFinish(_ => Task(bgTracer.terminate())) + } yield () + } } - } - - // Memoize so that no matter how many times it's run, it's executed only once - val newSuccessful = - LastSuccessfulResult(bundle.uniqueInputs, s.products, populatingTask.memoize) - ResultBundle(s, Some(newSuccessful), Some(lastSuccessful), runningTasks) - case f: Compiler.Result.Failed 
=> - val runningTasks = runPostCompilationTasks(f.backgroundTasks) - ResultBundle(result, None, Some(lastSuccessful), runningTasks) - case c: Compiler.Result.Cancelled => - val runningTasks = runPostCompilationTasks(c.backgroundTasks) - ResultBundle(result, None, Some(lastSuccessful), runningTasks) - case _: Compiler.Result.Blocked | Compiler.Result.Empty | - _: Compiler.Result.GlobalError => - ResultBundle(result, None, None, CancelableFuture.unit) + + // Memoize so that no matter how many times it's run, it's executed only once + val newSuccessful = + LastSuccessfulResult(bundle.uniqueInputs, s.products, populatingTask.memoize) + ResultBundle(s, Some(newSuccessful), Some(lastSuccessful), runningTasks) + case f: Compiler.Result.Failed => + val runningTasks = runPostCompilationTasks(f.backgroundTasks) + ResultBundle(result, None, Some(lastSuccessful), runningTasks) + case c: Compiler.Result.Cancelled => + val runningTasks = runPostCompilationTasks(c.backgroundTasks) + ResultBundle(result, None, Some(lastSuccessful), runningTasks) + case _: Compiler.Result.Blocked | Compiler.Result.Empty | + _: Compiler.Result.GlobalError => + ResultBundle(result, None, None, CancelableFuture.unit) + } } } - } - } - } - - def setup(inputs: CompileDefinitions.BundleInputs): Task[CompileBundle] = { - // Create a multicast observable stream to allow multiple mirrors of loggers - val (observer, obs) = { - Observable.multicast[Either[ReporterAction, LoggerAction]]( - MulticastStrategy.replay - )(ExecutionContext.ioScheduler) - } - - // Compute the previous and last successful results from the results cache - import inputs.project - val (prev, last) = { - if (pipeline) { - val emptySuccessful = LastSuccessfulResult.empty(project) - // Disable incremental compilation if pipelining is enabled - Compiler.Result.Empty -> emptySuccessful - } else { - // Use last successful from user cache, only useful if this is its first - // compilation, otherwise we use the last successful from [[CompileGraph]] - val latestResult = state.results.latestResult(project) - val lastSuccessful = state.results.lastSuccessfulResultOrEmpty(project) - latestResult -> lastSuccessful } } - val t = rootTracer - val o = state.commonOptions - val cancel = cancelCompilation - val logger = ObservedLogger(rawLogger, observer) - val clientClassesObserver = state.client.getClassesObserverFor(inputs.project) - val underlying = createReporter(ReporterInputs(inputs.project, cwd, rawLogger)) - val reporter = new ObservedReporter(logger, underlying) - val sourceGeneratorCache = state.sourceGeneratorCache - CompileBundle.computeFrom( - inputs, - sourceGeneratorCache, - clientClassesObserver, - reporter, - last, - prev, - cancel, - logger, - obs, - t, - o - ) - } + def setup( + inputs: CompileDefinitions.BundleInputs, + tracer: BraveTracer + ): Task[CompileBundle] = { + // Create a multicast observable stream to allow multiple mirrors of loggers + val (observer, obs) = { + Observable.multicast[Either[ReporterAction, LoggerAction]]( + MulticastStrategy.replay + )(ExecutionContext.ioScheduler) + } - val client = state.client - CompileGraph.traverse(dag, client, store, bestEffortAllowed, setup(_), compile).flatMap { - pdag => - val partialResults = Dag.dfs(pdag, mode = Dag.PreOrder) - val finalResults = partialResults.map(r => PartialCompileResult.toFinalResult(r)) - Task.gatherUnordered(finalResults).map(_.flatten).flatMap { results => - val cleanUpTasksToRunInBackground = - markUnusedClassesDirAndCollectCleanUpTasks(results, state, rawLogger) - - val failures 
= results.flatMap { - case FinalNormalCompileResult(p, results) => - results.fromCompiler match { - case Compiler.Result.NotOk(_) => List(p) - // Consider success with reported fatal warnings as error to simulate -Xfatal-warnings - case s: Compiler.Result.Success if s.reportedFatalWarnings => List(p) - case _ => Nil - } - case _ => Nil + // Compute the previous and last successful results from the results cache + import inputs.project + val (prev, last) = { + if (pipeline) { + val emptySuccessful = LastSuccessfulResult.empty(project) + // Disable incremental compilation if pipelining is enabled + Compiler.Result.Empty -> emptySuccessful + } else { + // Use last successful from user cache, only useful if this is its first + // compilation, otherwise we use the last successful from [[CompileGraph]] + val latestResult = state.results.latestResult(project) + val lastSuccessful = state.results.lastSuccessfulResultOrEmpty(project) + latestResult -> lastSuccessful } + } - val newState: State = { - val stateWithResults = state.copy(results = state.results.addFinalResults(results)) - if (failures.isEmpty) { - stateWithResults.copy(status = ExitStatus.Ok) - } else { - results.foreach { - case FinalNormalCompileResult.HasException(project, err) => - val errMsg = err.fold(identity, Logger.prettyPrintException) - rawLogger.error(s"Unexpected error when compiling ${project.name}: $errMsg") - case _ => () // Do nothing when the final compilation result is not an actual error - } + val t = tracer + val o = state.commonOptions + val cancel = cancelCompilation + val logger = ObservedLogger(rawLogger, observer) + val clientClassesObserver = state.client.getClassesObserverFor(inputs.project) + val underlying = createReporter(ReporterInputs(inputs.project, cwd, rawLogger)) + val reporter = new ObservedReporter(logger, underlying) + val sourceGeneratorCache = state.sourceGeneratorCache + CompileBundle.computeFrom( + inputs, + sourceGeneratorCache, + clientClassesObserver, + reporter, + last, + prev, + cancel, + logger, + obs, + t, + o + ) + } - client match { - case _: ClientInfo.CliClientInfo => - // Reverse list of failed projects to get ~correct order of failure - val projectsFailedToCompile = failures.map(p => s"'${p.name}'").reverse - val failureMessage = - if (failures.size <= 2) projectsFailedToCompile.mkString(",") - else { - s"${projectsFailedToCompile.take(2).mkString(", ")} and ${projectsFailedToCompile.size - 2} more projects" - } + val client = state.client + CompileGraph + .traverse(dag, client, store, bestEffortAllowed, setup, compile, tracer) + .flatMap { pdag => + val partialResults = Dag.dfs(pdag, mode = Dag.PreOrder) + val finalResults = partialResults.map(r => PartialCompileResult.toFinalResult(r)) + Task.gatherUnordered(finalResults).map(_.flatten).flatMap { results => + val cleanUpTasksToRunInBackground = + markUnusedClassesDirAndCollectCleanUpTasks(results, state, tracer, rawLogger) + + val failures = results.flatMap { + case FinalNormalCompileResult(p, results) => + results.fromCompiler match { + case Compiler.Result.NotOk(_) => List(p) + // Consider success with reported fatal warnings as error to simulate -Xfatal-warnings + case s: Compiler.Result.Success if s.reportedFatalWarnings => List(p) + case _ => Nil + } + case _ => Nil + } - rawLogger.error("Failed to compile " + failureMessage) - case _: ClientInfo.BspClientInfo => () // Don't report if bsp client + val newState: State = { + val stateWithResults = state.copy(results = state.results.addFinalResults(results)) + if 
(failures.isEmpty) { + stateWithResults.copy(status = ExitStatus.Ok) + } else { + results.foreach { + case FinalNormalCompileResult.HasException(project, err) => + val errMsg = err.fold(identity, Logger.prettyPrintException) + rawLogger.error(s"Unexpected error when compiling ${project.name}: $errMsg") + case _ => + () // Do nothing when the final compilation result is not an actual error + } + + client match { + case _: ClientInfo.CliClientInfo => + // Reverse list of failed projects to get ~correct order of failure + val projectsFailedToCompile = failures.map(p => s"'${p.name}'").reverse + val failureMessage = + if (failures.size <= 2) projectsFailedToCompile.mkString(",") + else { + s"${projectsFailedToCompile.take(2).mkString(", ")} and ${projectsFailedToCompile.size - 2} more projects" + } + + rawLogger.error("Failed to compile " + failureMessage) + case _: ClientInfo.BspClientInfo => () // Don't report if bsp client + } + + stateWithResults.copy(status = ExitStatus.CompilationError) } - - stateWithResults.copy(status = ExitStatus.CompilationError) } - } - // Schedule to run clean-up tasks in the background - runIOTasksInParallel(cleanUpTasksToRunInBackground) + // Schedule to run clean-up tasks in the background + runIOTasksInParallel(cleanUpTasksToRunInBackground) - val runningTasksRequiredForCorrectness = Task.sequence { - results.flatMap { - case FinalNormalCompileResult(_, result) => - val tasksAtEndOfBuildCompilation = - Task.fromFuture(result.runningBackgroundTasks) - List(tasksAtEndOfBuildCompilation) - case _ => Nil + val runningTasksRequiredForCorrectness = Task.sequence { + results.flatMap { + case FinalNormalCompileResult(_, result) => + val tasksAtEndOfBuildCompilation = + Task.fromFuture(result.runningBackgroundTasks) + List(tasksAtEndOfBuildCompilation) + case _ => Nil + } } - } - // Block on all background task that are running and are required for correctness - runningTasksRequiredForCorrectness - .executeOn(ExecutionContext.ioScheduler) - .map(_ => newState) - .doOnFinish(_ => Task(rootTracer.terminate())) + // Block on all background task that are running and are required for correctness + runningTasksRequiredForCorrectness + .executeOn(ExecutionContext.ioScheduler) + .map(_ => newState) + .doOnFinish(_ => Task(rootTracer.terminate())) + } } } } @@ -420,36 +430,44 @@ object CompileTask { private def markUnusedClassesDirAndCollectCleanUpTasks( results: List[FinalCompileResult], previousState: State, + tracer: BraveTracer, logger: Logger - ): List[Task[Unit]] = { - val cleanUpTasksToSpawnInBackground = mutable.ListBuffer[Task[Unit]]() - results.foreach { finalResult => - val resultBundle = finalResult.result - val newSuccessful = resultBundle.successful - val compilerResult = resultBundle.fromCompiler - val previousResult = - finalResult match { - case FinalNormalCompileResult(p, _) => - previousState.results.all.get(p) - case _ => None + ): List[Task[Unit]] = + tracer.trace("markUnusedClassesDirAndCollectCleanUpTasks") { _ => + val cleanUpTasksToSpawnInBackground = mutable.ListBuffer[Task[Unit]]() + results.foreach { finalResult => + val resultBundle = finalResult.result + val newSuccessful = resultBundle.successful + val compilerResult = resultBundle.fromCompiler + val previousResult = + finalResult match { + case FinalNormalCompileResult(p, _) => + previousState.results.all.get(p) + case _ => None + } + val populateNewProductsTask = newSuccessful.map(_.populatingProducts).getOrElse(Task.unit) + val cleanUpPreviousLastSuccessful = resultBundle.previous match { + 
case None => populateNewProductsTask + case Some(previousSuccessful) => + for { + _ <- previousSuccessful.populatingProducts + _ <- populateNewProductsTask + _ <- cleanUpPreviousResult( + previousSuccessful, + previousResult, + compilerResult, + tracer, + logger + ) + } yield () } - val populateNewProductsTask = newSuccessful.map(_.populatingProducts).getOrElse(Task.unit) - val cleanUpPreviousLastSuccessful = resultBundle.previous match { - case None => populateNewProductsTask - case Some(previousSuccessful) => - for { - _ <- previousSuccessful.populatingProducts - _ <- populateNewProductsTask - _ <- cleanUpPreviousResult(previousSuccessful, previousResult, compilerResult, logger) - } yield () + + cleanUpTasksToSpawnInBackground.+=(cleanUpPreviousLastSuccessful) } - cleanUpTasksToSpawnInBackground.+=(cleanUpPreviousLastSuccessful) + cleanUpTasksToSpawnInBackground.toList } - cleanUpTasksToSpawnInBackground.toList - } - def runIOTasksInParallel[T]( tasks: Traversable[Task[T]], parallelUnits: Int = Runtime.getRuntime().availableProcessors() @@ -482,8 +500,9 @@ object CompileTask { previousSuccessful: LastSuccessfulResult, previousResult: Option[Compiler.Result], compilerResult: Compiler.Result, + tracer: BraveTracer, logger: Logger - ): Task[Unit] = { + ): Task[Unit] = tracer.trace("clean up previous result") { implicit tracer => val previousClassesDir = previousSuccessful.classesDir val currentlyUsedCounter = previousSuccessful.counterForClassesDir.decrementAndGet(1) @@ -496,7 +515,9 @@ object CompileTask { logger.debug(s"Skipping delete of empty classes dir ${previousClassesDir}") None } else if (currentlyUsedCounter != 0) { - logger.debug(s"Skipping delete of $previousClassesDir, counter is $currentlyUsedCounter") + logger.debug( + s"Skipping delete of $previousClassesDir, counter is $currentlyUsedCounter" + ) None } else { val newClassesDir = products.newClassesDir @@ -527,15 +548,20 @@ object CompileTask { case _ => None } - def deleteOrphanDir(orphanDir: Option[AbsolutePath]): Task[Unit] = - orphanDir match { - case None => Task.unit - case Some(classesDir) => - Task.eval { - logger.debug(s"Deleting contents of orphan dir $classesDir") - BloopPaths.delete(classesDir) - }.asyncBoundary + def deleteOrphanDir(orphanDir: Option[AbsolutePath])(implicit + tracer: BraveTracer + ): Task[Unit] = { + tracer.trace("delete orphan dir") { _ => + orphanDir match { + case None => Task.unit + case Some(classesDir) => + Task.eval { + logger.debug(s"Deleting contents of orphan dir $classesDir") + BloopPaths.delete(classesDir) + }.asyncBoundary + } } + } Task .gatherUnordered( diff --git a/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileGatekeeper.scala b/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileGatekeeper.scala index 339135343c..ac682d6ba6 100644 --- a/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileGatekeeper.scala +++ b/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileGatekeeper.scala @@ -1,24 +1,19 @@ package bloop.engine.tasks.compilation -import java.util.concurrent.ConcurrentHashMap - -import bloop.Compiler -import bloop.UniqueCompileInputs -import bloop.data.ClientInfo -import bloop.data.Project +import bloop.{Compiler, UniqueCompileInputs} +import bloop.data.{ClientInfo, Project} import bloop.engine.Dag import bloop.engine.caches.LastSuccessfulResult import bloop.io.AbsolutePath -import bloop.logging.DebugFilter -import bloop.logging.Logger -import bloop.logging.LoggerAction +import bloop.logging.{DebugFilter, Logger, LoggerAction} 
import bloop.reporter.ReporterAction
 import bloop.task.Task
-
-import monix.execution.atomic.AtomicBoolean
-import monix.execution.atomic.AtomicInt
+import bloop.tracing.BraveTracer
+import monix.execution.atomic.{AtomicBoolean, AtomicInt}
 import monix.reactive.Observable
+import java.util.concurrent.ConcurrentHashMap
+
 object CompileGatekeeper {
 private implicit val filter: DebugFilter = DebugFilter.Compilation
 import bloop.engine.tasks.compilation.CompileDefinitions._
@@ -43,49 +38,87 @@ object CompileGatekeeper {
 inputs: BundleInputs,
 bundle: SuccessfulCompileBundle,
 client: ClientInfo,
- compile: SuccessfulCompileBundle => CompileTraversal
- ): (RunningCompilation, CanBeDeduplicated) = {
- var deduplicate = true
-
- val running = runningCompilations.compute(
- bundle.uniqueInputs,
- (_: UniqueCompileInputs, running: RunningCompilation) => {
- if (running == null) {
- deduplicate = false
- scheduleCompilation(inputs, bundle, client, compile)
- } else {
- val usedClassesDir = running.usedLastSuccessful.classesDir
- val usedClassesDirCounter = running.usedLastSuccessful.counterForClassesDir
-
- usedClassesDirCounter.getAndTransform { count =>
- if (count == 0) {
- // Abort deduplication, dir is scheduled to be deleted in background
+ compile: SuccessfulCompileBundle => CompileTraversal,
+ tracer: BraveTracer
+ ): (RunningCompilation, CanBeDeduplicated) =
+ tracer.trace("Finding compilation atomically") { tracer =>
+ import bundle.logger
+ var deduplicate = true
+
+ val running = runningCompilations.compute(
+ bundle.uniqueInputs,
+ (_: UniqueCompileInputs, running: RunningCompilation) => {
+ if (running == null) {
+ tracer.trace(
+ "No running compilation found, starting a new one",
+ ("uniqueInputs", bundle.uniqueInputs.toString)
+ ) { tracer =>
+ logger.debug(s"No running compilation found, starting a new one: ${bundle.uniqueInputs}")
 deduplicate = false
- // Remove from map of used classes dirs in case it hasn't already been
- currentlyUsedClassesDirs.remove(usedClassesDir, usedClassesDirCounter)
- // Return previous count, this counter will soon be deallocated
- count
- } else {
- // Increase count to prevent other compiles to schedule its deletion
- count + 1
+ scheduleCompilation(inputs, bundle, client, compile, tracer)
 }
- }
+ } else {
+ tracer.trace(
+ "Found matching compilation",
+ ("uniqueInputs", bundle.uniqueInputs.toString)
+ ) { tracer =>
+ val usedClassesDir = running.usedLastSuccessful.classesDir
+ val usedClassesDirCounter = running.usedLastSuccessful.counterForClassesDir
+
+ usedClassesDirCounter.getAndTransform { count =>
+ if (count == 0) {
+ tracer.trace(
+ "Aborting deduplication",
+ ("uniqueInputs", bundle.uniqueInputs.toString)
+ ) { tracer =>
+ logger.debug(
+ s"Aborting deduplication, dir is scheduled to be deleted in background: ${bundle.uniqueInputs}"
+ )
+ // Abort deduplication, dir is scheduled to be deleted in background
+ deduplicate = false
+ // Remove from map of used classes dirs in case it hasn't already been
+ currentlyUsedClassesDirs.remove(usedClassesDir, usedClassesDirCounter)
+ // Return previous count, this counter will soon be deallocated
+ count
+ }
+ } else {
+ tracer.trace(
+ "Increasing compilation counter",
+ ("uniqueInputs", bundle.uniqueInputs.toString)
+ ) { tracer =>
+ logger.debug(
+ s"Increasing count to prevent other compilations from scheduling its deletion: ${bundle.uniqueInputs}"
+ )
+ // Increase count to prevent other compilations from scheduling its deletion
+ count + 1
+ }
+ }
+ }

- if (deduplicate) running
- else scheduleCompilation(inputs, bundle, 
client, compile) + if (deduplicate) running + else scheduleCompilation(inputs, bundle, client, compile, tracer) + } + } } - } - ) + ) - (running, deduplicate) - } + (running, deduplicate) + } def disconnectDeduplicationFromRunning( inputs: UniqueCompileInputs, - runningCompilation: RunningCompilation + runningCompilation: RunningCompilation, + logger: Logger, + tracer: BraveTracer ): Unit = { - runningCompilation.isUnsubscribed.compareAndSet(false, true) - runningCompilations.remove(inputs, runningCompilation); () + tracer.trace( + "disconnectDeduplicationFromRunning", + ("uniqueInputs", inputs.toString) + ) { _ => + logger.debug(s"Disconnected deduplication from running compilation:${inputs}") + runningCompilation.isUnsubscribed.compareAndSet(false, true) + runningCompilations.remove(inputs, runningCompilation); () + } } /** @@ -95,125 +128,154 @@ object CompileGatekeeper { * inputs. The call-site ensures that only one compilation can exist for the * same inputs for a period of time. */ - def scheduleCompilation( + private def scheduleCompilation( inputs: BundleInputs, bundle: SuccessfulCompileBundle, client: ClientInfo, - compile: SuccessfulCompileBundle => CompileTraversal - ): RunningCompilation = { - import inputs.project - import bundle.logger - import logger.debug - - var counterForUsedClassesDir: AtomicInt = null - - def initializeLastSuccessful(previousOrNull: LastSuccessfulResult): LastSuccessfulResult = { - val result = Option(previousOrNull).getOrElse(bundle.lastSuccessful) - if (!result.classesDir.exists) { - debug(s"Ignoring analysis for ${project.name}, directory ${result.classesDir} is missing") - LastSuccessfulResult.empty(inputs.project) - } else if (bundle.latestResult == Compiler.Result.Empty) { - debug(s"Ignoring existing analysis for ${project.name}, last result was empty") - LastSuccessfulResult - .empty(inputs.project) - // Replace classes dir, counter and populating with values from previous for correctness - .copy( - classesDir = result.classesDir, - counterForClassesDir = result.counterForClassesDir, - populatingProducts = result.populatingProducts - ) - } else { - debug(s"Using successful result for ${project.name} associated with ${result.classesDir}") - result - } - } + compile: SuccessfulCompileBundle => CompileTraversal, + tracer: BraveTracer + ): RunningCompilation = + tracer.trace("schedule compilation") { _ => + import bundle.logger + import inputs.project + + var counterForUsedClassesDir: AtomicInt = null + + def initializeLastSuccessful(previousOrNull: LastSuccessfulResult): LastSuccessfulResult = + tracer.trace(s"initialize last successful") { _ => + val result = Option(previousOrNull).getOrElse(bundle.lastSuccessful) + if (!result.classesDir.exists) { + logger.debug( + s"Ignoring analysis for ${project.name}, directory ${result.classesDir} is missing" + ) + LastSuccessfulResult.empty(inputs.project) + } else if (bundle.latestResult == Compiler.Result.Empty) { + logger.debug(s"Ignoring existing analysis for ${project.name}, last result was empty") + LastSuccessfulResult + .empty(inputs.project) + // Replace classes dir, counter and populating with values from previous for correctness + .copy( + classesDir = result.classesDir, + counterForClassesDir = result.counterForClassesDir, + populatingProducts = result.populatingProducts + ) + } else { + logger.debug( + s"Using successful result for ${project.name} associated with ${result.classesDir}" + ) + result + } + } - def getMostRecentSuccessfulResultAtomically = { - lastSuccessfulResults.compute( - 
project.uniqueId, - (_: String, previousResultOrNull: LastSuccessfulResult) => { - // Return previous result or the initial last successful coming from the bundle - val previousResult = initializeLastSuccessful(previousResultOrNull) - - currentlyUsedClassesDirs.compute( - previousResult.classesDir, - (_: AbsolutePath, counter: AtomicInt) => { - // Set counter for used classes dir when init or incrementing - if (counter == null) { - val initialCounter = AtomicInt(1) - counterForUsedClassesDir = initialCounter - initialCounter - } else { - counterForUsedClassesDir = counter - val newCount = counter.incrementAndGet(1) - logger.debug(s"Increasing counter for ${previousResult.classesDir} to $newCount") - counter - } + def getMostRecentSuccessfulResultAtomically = + tracer.trace("get most recent successful result atomically") { _ => + lastSuccessfulResults.compute( + project.uniqueId, + (_: String, previousResultOrNull: LastSuccessfulResult) => { + logger.debug( + s"Return previous result or the initial last successful coming from the bundle:${project.uniqueId}" + ) + // Return previous result or the initial last successful coming from the bundle + val previousResult = initializeLastSuccessful(previousResultOrNull) + + currentlyUsedClassesDirs.compute( + previousResult.classesDir, + (_: AbsolutePath, counter: AtomicInt) => { + logger.debug( + s"Set counter for used classes dir when init or incrementing:${previousResult.classesDir}" + ) + // Set counter for used classes dir when init or incrementing + if (counter == null) { + logger.debug(s"Create new counter:${previousResult.classesDir}") + val initialCounter = AtomicInt(1) + counterForUsedClassesDir = initialCounter + initialCounter + } else { + counterForUsedClassesDir = counter + val newCount = counter.incrementAndGet(1) + logger.debug( + s"Increasing counter for ${previousResult.classesDir} to $newCount" + ) + counter + } + } + ) + + previousResult.copy(counterForClassesDir = counterForUsedClassesDir) } ) - - previousResult.copy(counterForClassesDir = counterForUsedClassesDir) } - ) - } - logger.debug(s"Scheduling compilation for ${project.name}...") - - // Replace client-specific last successful with the most recent result - val mostRecentSuccessful = getMostRecentSuccessfulResultAtomically - - val isUnsubscribed = AtomicBoolean(false) - val newBundle = bundle.copy(lastSuccessful = mostRecentSuccessful) - val compileAndUnsubscribe = { - compile(newBundle) - .doOnFinish(_ => Task(logger.observer.onComplete())) - .map { result => - // Unregister deduplication atomically and register last successful if any - processResultAtomically( - result, - project, - bundle.uniqueInputs, - isUnsubscribed, - logger - ) - } - .memoize // Without memoization, there is no deduplication - } + logger.debug(s"Scheduling compilation for ${project.name}...") + + // Replace client-specific last successful with the most recent result + val mostRecentSuccessful = getMostRecentSuccessfulResultAtomically + + val isUnsubscribed = AtomicBoolean(false) + val newBundle = bundle.copy(lastSuccessful = mostRecentSuccessful) + val compileAndUnsubscribe = tracer.trace("compile and unsubscribe") { _ => + compile(newBundle) + .doOnFinish(_ => Task(logger.observer.onComplete())) + .map { result => + // Unregister deduplication atomically and register last successful if any + tracer.trace("process result atomically") { _ => + processResultAtomically( + result, + project, + bundle.uniqueInputs, + isUnsubscribed, + logger, + tracer + ) + } // Without memoization, there is no 
deduplication + } + .memoize + } - RunningCompilation( - compileAndUnsubscribe, - mostRecentSuccessful, - isUnsubscribed, - bundle.mirror, - client - ) - } + RunningCompilation( + compileAndUnsubscribe, + mostRecentSuccessful, + isUnsubscribed, + bundle.mirror, + client + ) + } private def processResultAtomically( resultDag: Dag[PartialCompileResult], project: Project, oinputs: UniqueCompileInputs, isAlreadyUnsubscribed: AtomicBoolean, - logger: Logger + logger: Logger, + tracer: BraveTracer ): Dag[PartialCompileResult] = { - def cleanUpAfterCompilationError[T](result: T): T = { - if (!isAlreadyUnsubscribed.get) { - // Remove running compilation if host compilation hasn't unsubscribed (maybe it's blocked) - runningCompilations.remove(oinputs) - } + def cleanUpAfterCompilationError[T](result: T): T = + tracer.trace("cleaning after compilation error") { _ => + if (!isAlreadyUnsubscribed.get) { + // Remove running compilation if host compilation hasn't unsubscribed (maybe it's blocked) + logger.debug( + s"Remove running compilation if host compilation hasn't unsubscribed (maybe it's blocked):${oinputs}" + ) + runningCompilations.remove(oinputs) + } - result - } + result + } // Unregister deduplication atomically and register last successful if any PartialCompileResult.mapEveryResult(resultDag) { case s: PartialSuccess => val processedResult = s.result.map { (result: ResultBundle) => result.successful match { - case None => cleanUpAfterCompilationError(result) + case None => + tracer.trace("cleaning after compilation error") { _ => + cleanUpAfterCompilationError(result) + } case Some(res) => - unregisterDeduplicationAndRegisterSuccessful(project, oinputs, res, logger) + tracer.trace("unregister deduplication and register successful") { _ => + unregisterDeduplicationAndRegisterSuccessful(project, oinputs, res, logger) + } } result } @@ -225,7 +287,10 @@ object CompileGatekeeper { */ s.copy(result = processedResult.memoize) - case result => cleanUpAfterCompilationError(result) + case result => + tracer.trace("cleaning after compilation error") { _ => + cleanUpAfterCompilationError(result) + } } } @@ -244,6 +309,7 @@ object CompileGatekeeper { runningCompilations.compute( oracleInputs, (_: UniqueCompileInputs, _: RunningCompilation) => { + logger.debug("Unregister deduplication and registered successfully") lastSuccessfulResults.compute(project.uniqueId, (_, _) => successful) null } diff --git a/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileGraph.scala b/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileGraph.scala index 1aab0af1ef..86cc87f7f5 100644 --- a/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileGraph.scala +++ b/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileGraph.scala @@ -4,13 +4,10 @@ package bloop.engine.tasks.compilation import java.io.File import java.io.PrintWriter import java.io.StringWriter - import scala.util.Failure import scala.util.Success - import ch.epfl.scala.bsp.StatusCode import ch.epfl.scala.bsp.{StatusCode => BspStatusCode} - import bloop.CompileBackgroundTasks import bloop.CompileExceptions.BlockURI import bloop.CompileExceptions.FailedOrCancelledPromise @@ -22,15 +19,14 @@ import bloop.engine.Dag import bloop.engine.ExecutionContext import bloop.engine.Leaf import bloop.engine.Parent -import bloop.engine.tasks.compilation.CompileDefinitions.CompileTraversal import bloop.logging.DebugFilter import bloop.logging.LoggerAction import bloop.reporter.ReporterAction import bloop.task.Task +import 
bloop.tracing.BraveTracer import bloop.util.BestEffortUtils.BestEffortProducts import bloop.util.JavaCompat.EnrichOptional import bloop.util.SystemProperties - import xsbti.compile.PreviousResult object CompileGraph { @@ -94,10 +90,11 @@ object CompileGraph { def setupAndDeduplicate( client: ClientInfo, inputs: BundleInputs, - setup: BundleInputs => Task[CompileBundle] + setup: (BundleInputs, BraveTracer) => Task[CompileBundle], + tracer: BraveTracer )( compile: SuccessfulCompileBundle => CompileTraversal - ): CompileTraversal = { + ): CompileTraversal = tracer.trace("setupAndDeduplicate") { tracer => def partialFailure( errorMsg: String, err: Option[Throwable] @@ -110,8 +107,8 @@ object CompileGraph { } implicit val filter = DebugFilter.Compilation - def withBundle(f: SuccessfulCompileBundle => CompileTraversal): CompileTraversal = { - setup(inputs).materialize.flatMap { + def withBundle(f: SuccessfulCompileBundle => CompileTraversal): CompileTraversal = tracer.trace("withBundle") { tracer => + setup(inputs, tracer).materialize.flatMap { case Success(bundle: SuccessfulCompileBundle) => f(bundle).materialize.flatMap { case Success(result) => Task.now(result) @@ -134,220 +131,232 @@ object CompileGraph { withBundle { bundle0 => val logger = bundle0.logger val (runningCompilation, deduplicate) = - CompileGatekeeper.findRunningCompilationAtomically(inputs, bundle0, client, compile) + CompileGatekeeper.findRunningCompilationAtomically(inputs, bundle0, client, compile, tracer) val bundle = bundle0.copy(lastSuccessful = runningCompilation.usedLastSuccessful) if (!deduplicate) { - runningCompilation.traversal - } else { - val rawLogger = logger.underlying - rawLogger.info( - s"Deduplicating compilation of ${bundle.project.name} from ${runningCompilation.client}" - ) - val reporter = bundle.reporter.underlying - // Don't use `bundle.lastSuccessful`, it's not the final input to `compile` - val analysis = runningCompilation.usedLastSuccessful.previous.analysis().toOption - val previousSuccessfulProblems = - Compiler.previousProblemsFromSuccessfulCompilation(analysis) - val wasPreviousSuccessful = bundle.latestResult match { - case Compiler.Result.Ok(_) => true - case _ => false + tracer.trace("traversing compilation") { tracer => + runningCompilation.traversal } - val previousProblems = - Compiler.previousProblemsFromResult(bundle.latestResult, previousSuccessfulProblems) - - val clientClassesObserver = client.getClassesObserverFor(bundle.project) - - // Replay events asynchronously to waiting for the compilation result - import scala.concurrent.duration.FiniteDuration - import monix.execution.exceptions.UpstreamTimeoutException - val disconnectionTime = SystemProperties.getCompileDisconnectionTime(rawLogger) - val replayEventsTask = runningCompilation.mirror - .timeoutOnSlowUpstream(disconnectionTime) - .foreachL { - case Left(action) => - action match { - case ReporterAction.EnableFatalWarnings => - reporter.enableFatalWarnings() - case ReporterAction.ReportStartCompilation => - reporter.reportStartCompilation(previousProblems, wasPreviousSuccessful) - case a: ReporterAction.ReportStartIncrementalCycle => - reporter.reportStartIncrementalCycle(a.sources, a.outputDirs) - case a: ReporterAction.ReportProblem => reporter.log(a.problem) - case ReporterAction.PublishDiagnosticsSummary => - reporter.printSummary() - case a: ReporterAction.ReportNextPhase => - reporter.reportNextPhase(a.phase, a.sourceFile) - case a: ReporterAction.ReportCompilationProgress => - 
reporter.reportCompilationProgress(a.progress, a.total) - case a: ReporterAction.ReportEndIncrementalCycle => - reporter.reportEndIncrementalCycle(a.durationMs, a.result) - case ReporterAction.ReportCancelledCompilation => - reporter.reportCancelledCompilation() - case a: ReporterAction.ProcessEndCompilation => - a.code match { - case BspStatusCode.Cancelled | BspStatusCode.Error => - reporter.processEndCompilation(previousProblems, a.code, None, None) - reporter.reportEndCompilation() - case _ => - /* - * Only process the end, don't report it. It's only safe to - * report when all the client tasks have been run and the - * analysis/classes dirs are fully populated so that clients - * can use `taskFinish` notifications as a signal to process them. - */ - reporter.processEndCompilation( - previousProblems, - a.code, - Some(clientClassesObserver.classesDir), - Some(bundle.out.analysisOut) - ) - } - } - case Right(action) => - action match { - case LoggerAction.LogErrorMessage(msg) => rawLogger.error(msg) - case LoggerAction.LogWarnMessage(msg) => rawLogger.warn(msg) - case LoggerAction.LogInfoMessage(msg) => rawLogger.info(msg) - case LoggerAction.LogDebugMessage(msg) => - rawLogger.debug(msg) - case LoggerAction.LogTraceMessage(msg) => - rawLogger.debug(msg) - } - } - .materialize - .map { - case Success(_) => DeduplicationResult.Ok - case Failure(_: UpstreamTimeoutException) => - DeduplicationResult.DisconnectFromDeduplication - case Failure(t) => DeduplicationResult.DeduplicationError(t) + } else { + tracer.trace("deduplication required") { tracer => + val rawLogger = logger.underlying + rawLogger.info( + s"Deduplicating compilation of ${bundle.project.name} from ${runningCompilation.client}" + ) + val reporter = bundle.reporter.underlying + // Don't use `bundle.lastSuccessful`, it's not the final input to `compile` + val analysis = runningCompilation.usedLastSuccessful.previous.analysis().toOption + val previousSuccessfulProblems = + Compiler.previousProblemsFromSuccessfulCompilation(analysis) + val wasPreviousSuccessful = bundle.latestResult match { + case Compiler.Result.Ok(_) => true + case _ => false } + val previousProblems = + Compiler.previousProblemsFromResult(bundle.latestResult, previousSuccessfulProblems) + + val clientClassesObserver = client.getClassesObserverFor(bundle.project) + + // Replay events asynchronously to waiting for the compilation result + import scala.concurrent.duration.FiniteDuration + import monix.execution.exceptions.UpstreamTimeoutException + val disconnectionTime = SystemProperties.getCompileDisconnectionTime(rawLogger) + val replayEventsTask = runningCompilation.mirror + .timeoutOnSlowUpstream(disconnectionTime) + .foreachL { + case Left(action) => + action match { + case ReporterAction.EnableFatalWarnings => + reporter.enableFatalWarnings() + case ReporterAction.ReportStartCompilation => + reporter.reportStartCompilation(previousProblems, wasPreviousSuccessful) + case a: ReporterAction.ReportStartIncrementalCycle => + reporter.reportStartIncrementalCycle(a.sources, a.outputDirs) + case a: ReporterAction.ReportProblem => reporter.log(a.problem) + case ReporterAction.PublishDiagnosticsSummary => + reporter.printSummary() + case a: ReporterAction.ReportNextPhase => + reporter.reportNextPhase(a.phase, a.sourceFile) + case a: ReporterAction.ReportCompilationProgress => + reporter.reportCompilationProgress(a.progress, a.total) + case a: ReporterAction.ReportEndIncrementalCycle => + reporter.reportEndIncrementalCycle(a.durationMs, a.result) + case 
ReporterAction.ReportCancelledCompilation => + reporter.reportCancelledCompilation() + case a: ReporterAction.ProcessEndCompilation => + a.code match { + case BspStatusCode.Cancelled | BspStatusCode.Error => + reporter.processEndCompilation(previousProblems, a.code, None, None) + reporter.reportEndCompilation() + case _ => + /* + * Only process the end, don't report it. It's only safe to + * report when all the client tasks have been run and the + * analysis/classes dirs are fully populated so that clients + * can use `taskFinish` notifications as a signal to process them. + */ + reporter.processEndCompilation( + previousProblems, + a.code, + Some(clientClassesObserver.classesDir), + Some(bundle.out.analysisOut) + ) + } + } + case Right(action) => + action match { + case LoggerAction.LogErrorMessage(msg) => rawLogger.error(msg) + case LoggerAction.LogWarnMessage(msg) => rawLogger.warn(msg) + case LoggerAction.LogInfoMessage(msg) => rawLogger.info(msg) + case LoggerAction.LogDebugMessage(msg) => + rawLogger.debug(msg) + case LoggerAction.LogTraceMessage(msg) => + rawLogger.debug(msg) + } + } + .materialize + .map { + case Success(_) => DeduplicationResult.Ok + case Failure(_: UpstreamTimeoutException) => + DeduplicationResult.DisconnectFromDeduplication + case Failure(t) => DeduplicationResult.DeduplicationError(t) + } - /* The task set up by another process whose memoized result we're going to - * reuse. To prevent blocking compilations, we execute this task (which will - * block until its completion is done) in the IO thread pool, which is - * unbounded. This makes sure that the blocking threads *never* block - * the computation pool, which could produce a hang in the build server. - */ - val runningCompilationTask = - runningCompilation.traversal.executeOn(ExecutionContext.ioScheduler) - - val deduplicateStreamSideEffectsHandle = - replayEventsTask.runToFuture(ExecutionContext.ioScheduler) - - /** - * Deduplicate and change the implementation of the task returning the - * deduplicate compiler result to trigger a syncing process to keep the - * client external classes directory up-to-date with the new classes - * directory. This copying process blocks until the background IO work - * of the deduplicated compilation result has been finished. Note that - * this mechanism allows pipelined compilations to perform this IO only - * when the full compilation of a module is finished. - */ - val obtainResultFromDeduplication = runningCompilationTask.map { results => - PartialCompileResult.mapEveryResult(results) { - case s @ PartialSuccess(bundle, compilerResult) => - val newCompilerResult = compilerResult.flatMap { results => - results.fromCompiler match { - case s: Compiler.Result.Success => - // Wait on new classes to be populated for correctness - val runningBackgroundTasks = s.backgroundTasks - .trigger(clientClassesObserver, reporter, bundle.tracer, logger) - .runAsync(ExecutionContext.ioScheduler) - Task.now(results.copy(runningBackgroundTasks = runningBackgroundTasks)) - case _: Compiler.Result.Cancelled => - // Make sure to cancel the deduplicating task if compilation is cancelled - deduplicateStreamSideEffectsHandle.cancel() - Task.now(results) - case _ => Task.now(results) + /* The task set up by another process whose memoized result we're going to + * reuse. To prevent blocking compilations, we execute this task (which will + * block until its completion is done) in the IO thread pool, which is + * unbounded. 
This makes sure that the blocking threads *never* block + * the computation pool, which could produce a hang in the build server. + */ + val runningCompilationTask = + runningCompilation.traversal.executeOn(ExecutionContext.ioScheduler) + + val deduplicateStreamSideEffectsHandle = + replayEventsTask.runToFuture(ExecutionContext.ioScheduler) + + /** + * Deduplicate and change the implementation of the task returning the + * deduplicate compiler result to trigger a syncing process to keep the + * client external classes directory up-to-date with the new classes + * directory. This copying process blocks until the background IO work + * of the deduplicated compilation result has been finished. Note that + * this mechanism allows pipelined compilations to perform this IO only + * when the full compilation of a module is finished. + */ + val obtainResultFromDeduplication = runningCompilationTask.map { results => + PartialCompileResult.mapEveryResult(results) { + case s @ PartialSuccess(bundle, compilerResult) => + val newCompilerResult = compilerResult.flatMap { results => + results.fromCompiler match { + case s: Compiler.Result.Success => + // Wait on new classes to be populated for correctness + val runningBackgroundTasks = s.backgroundTasks + .trigger(clientClassesObserver, reporter, tracer, logger) + .runAsync(ExecutionContext.ioScheduler) + Task.now(results.copy(runningBackgroundTasks = runningBackgroundTasks)) + case _: Compiler.Result.Cancelled => + // Make sure to cancel the deduplicating task if compilation is cancelled + deduplicateStreamSideEffectsHandle.cancel() + Task.now(results) + case _ => Task.now(results) + } } - } - s.copy(result = newCompilerResult) - case result => result + s.copy(result = newCompilerResult) + case result => result + } } - } - val compileAndDeduplicate = Task - .chooseFirstOf( - obtainResultFromDeduplication, - Task.fromFuture(deduplicateStreamSideEffectsHandle) - ) - .executeOn(ExecutionContext.ioScheduler) - - val finalCompileTask = compileAndDeduplicate.flatMap { - case Left((result, deduplicationFuture)) => - Task.fromFuture(deduplicationFuture).map(_ => result) - case Right((compilationFuture, deduplicationResult)) => - deduplicationResult match { - case DeduplicationResult.Ok => Task.fromFuture(compilationFuture) - case DeduplicationResult.DeduplicationError(t) => - rawLogger.trace(t) - val failedDeduplicationResult = Compiler.Result.GlobalError( - s"Unexpected error while deduplicating compilation for ${inputs.project.name}: ${t.getMessage}", - Some(t) - ) - - /* - * When an error happens while replaying all events of the - * deduplicated compilation, we keep track of the error, wait - * until the deduplicated compilation finishes and then we - * replace the result by a failed result that informs the - * client compilation was not successfully deduplicated. 
- */ - Task.fromFuture(compilationFuture).map { results => - PartialCompileResult.mapEveryResult(results) { (p: PartialCompileResult) => - p match { - case s: PartialSuccess => - val failedBundle = ResultBundle(failedDeduplicationResult, None, None) - s.copy(result = s.result.map(_ => failedBundle)) - case result => result + val compileAndDeduplicate = Task + .chooseFirstOf( + tracer.trace("obtainResultFromDeduplication - obtainResultFromDeduplication") { _ => + obtainResultFromDeduplication + }, + tracer.trace("obtainResultFromDeduplication - deduplicateStreamSideEffectsHandle") { _ => + Task.fromFuture(deduplicateStreamSideEffectsHandle) + } + ) + .executeOn(ExecutionContext.ioScheduler) + + val finalCompileTask = compileAndDeduplicate.flatMap { + case Left((result, deduplicationFuture)) => + Task.fromFuture(deduplicationFuture).map(_ => result) + case Right((compilationFuture, deduplicationResult)) => + deduplicationResult match { + case DeduplicationResult.Ok => Task.fromFuture(compilationFuture) + case DeduplicationResult.DeduplicationError(t) => + rawLogger.trace(t) + val failedDeduplicationResult = Compiler.Result.GlobalError( + s"Unexpected error while deduplicating compilation for ${inputs.project.name}: ${t.getMessage}", + Some(t) + ) + + /* + * When an error happens while replaying all events of the + * deduplicated compilation, we keep track of the error, wait + * until the deduplicated compilation finishes and then we + * replace the result by a failed result that informs the + * client compilation was not successfully deduplicated. + */ + Task.fromFuture(compilationFuture).map { results => + PartialCompileResult.mapEveryResult(results) { (p: PartialCompileResult) => + p match { + case s: PartialSuccess => + val failedBundle = ResultBundle(failedDeduplicationResult, None, None) + s.copy(result = s.result.map(_ => failedBundle)) + case result => result + } } } - } - case DeduplicationResult.DisconnectFromDeduplication => - /* - * Deduplication timed out after no compilation updates were - * recorded. In theory, this could happen because a rogue - * compilation process has stalled or is blocked. To ensure - * deduplicated clients always make progress, we now proceed - * with: - * + case DeduplicationResult.DisconnectFromDeduplication => + /* + * Deduplication timed out after no compilation updates were + * recorded. In theory, this could happen because a rogue + * compilation process has stalled or is blocked. To ensure + * deduplicated clients always make progress, we now proceed + * with: + * * 1. Cancelling the dead-looking compilation, hoping that the - * process will wake up at some point and stop running. - * 2. Shutting down the deduplication and triggering a new - * compilation. If there are several clients deduplicating this - * compilation, they will compete to start the compilation again - * with new compile inputs, as they could have already changed. - * 3. Reporting the end of compilation in case it hasn't been - * reported. Clients must handle two end compilation notifications - * gracefully. - * 4. Display the user that the deduplication was cancelled and a - * new compilation was scheduled. 
- */ - - CompileGatekeeper.disconnectDeduplicationFromRunning( - bundle.uniqueInputs, - runningCompilation - ) - - compilationFuture.cancel() - reporter.processEndCompilation(Nil, StatusCode.Cancelled, None, None) - reporter.reportEndCompilation() - - logger.displayWarningToUser( - s"""Disconnecting from deduplication of ongoing compilation for '${inputs.project.name}' - |No progress update for ${(disconnectionTime: FiniteDuration) - .toString()} caused bloop to cancel compilation and schedule a new compile. + * process will wake up at some point and stop running. + * 2. Shutting down the deduplication and triggering a new + * compilation. If there are several clients deduplicating this + * compilation, they will compete to start the compilation again + * with new compile inputs, as they could have already changed. + * 3. Reporting the end of compilation in case it hasn't been + * reported. Clients must handle two end compilation notifications + * gracefully. + * 4. Display the user that the deduplication was cancelled and a + * new compilation was scheduled. + */ + + CompileGatekeeper.disconnectDeduplicationFromRunning( + bundle.uniqueInputs, + runningCompilation, + logger, + tracer + ) + + compilationFuture.cancel() + reporter.processEndCompilation(Nil, StatusCode.Cancelled, None, None) + reporter.reportEndCompilation() + + logger.displayWarningToUser( + s"""Disconnecting from deduplication of ongoing compilation for '${inputs.project.name}' + |No progress update for ${(disconnectionTime: FiniteDuration) + .toString()} caused bloop to cancel compilation and schedule a new compile. """.stripMargin - ) + ) - setupAndDeduplicate(client, inputs, setup)(compile) - } - } + tracer.trace("setupAndDeduplicate - disconnected from deduplication") { tracer => + setupAndDeduplicate(client, inputs, setup, tracer)(compile) + } + } + } - bundle.tracer.traceTask(s"deduplicating ${bundle.project.name}") { _ => - finalCompileTask.executeOn(ExecutionContext.ioScheduler) + tracer.traceTask(s"deduplicating ${bundle.project.name}") { _ => + finalCompileTask.executeOn(ExecutionContext.ioScheduler) + } } } } @@ -368,120 +377,127 @@ object CompileGraph { client: ClientInfo, store: CompileClientStore, bestEffortAllowed: Boolean, - computeBundle: BundleInputs => Task[CompileBundle], - compile: (Inputs, Boolean, Boolean) => Task[ResultBundle] - ): CompileTraversal = { - val tasks = new mutable.HashMap[Dag[Project], CompileTraversal]() - def register(k: Dag[Project], v: CompileTraversal): CompileTraversal = { - val toCache = store.findPreviousTraversalOrAddNew(k, v).getOrElse(v) - tasks.put(k, toCache) - toCache - } + computeBundle: (BundleInputs, BraveTracer) => Task[CompileBundle], + compile: (Inputs, Boolean, Boolean, BraveTracer) => Task[ResultBundle], + tracer: BraveTracer + ): CompileTraversal = + tracer.trace("traversing ") { _ => + val tasks = new mutable.HashMap[Dag[Project], CompileTraversal]() + def register(k: Dag[Project], v: CompileTraversal): CompileTraversal = { + val toCache = store.findPreviousTraversalOrAddNew(k, v).getOrElse(v) + tasks.put(k, toCache) + toCache + } - /* - * [[PartialCompileResult]] is our way to represent errors at the build graph - * so that we can block the compilation of downstream projects. As we have to - * abide by this contract because it's used by the pipeline traversal too, we - * turn an actual compiler failure into a partial failure with a dummy - * `FailPromise` exception that makes the partial result be recognized as error. 
-    def toPartialFailure(bundle: SuccessfulCompileBundle, results: ResultBundle): PartialFailure = {
-      PartialFailure(bundle.project, FailedOrCancelledPromise, Task.now(results))
-    }
+      /*
+       * [[PartialCompileResult]] is our way to represent errors at the build graph
+       * so that we can block the compilation of downstream projects. As we have to
+       * abide by this contract because it's used by the pipeline traversal too, we
+       * turn an actual compiler failure into a partial failure with a dummy
+       * `FailPromise` exception that makes the partial result be recognized as error.
+       */
+      def toPartialFailure(bundle: SuccessfulCompileBundle, results: ResultBundle): PartialFailure = {
+        PartialFailure(bundle.project, FailedOrCancelledPromise, Task.now(results))
+      }

-    def loop(dag: Dag[Project]): CompileTraversal = {
-      tasks.get(dag) match {
-        case Some(task) => task
-        case None =>
-          val task: Task[Dag[PartialCompileResult]] = dag match {
-            case Leaf(project) =>
-              val bundleInputs = BundleInputs(project, dag, Map.empty)
-              setupAndDeduplicate(client, bundleInputs, computeBundle) { bundle =>
-                val isBestEffortDep = false
-                compile(Inputs(bundle, Map.empty), bestEffortAllowed && project.isBestEffort, isBestEffortDep).map { results =>
-                  results.fromCompiler match {
-                    case Compiler.Result.Ok(_) => Leaf(partialSuccess(bundle, results))
-                    case _ => Leaf(toPartialFailure(bundle, results))
+      def loop(dag: Dag[Project]): CompileTraversal = {
+        tasks.get(dag) match {
+          case Some(task) => task
+          case None =>
+            val task: Task[Dag[PartialCompileResult]] = dag match {
+              case Leaf(project) =>
+                val bundleInputs = BundleInputs(project, dag, Map.empty)
+                tracer.trace("setupAndDeduplicate - leaf project") { tracer =>
+                  setupAndDeduplicate(client, bundleInputs, computeBundle, tracer) { bundle =>
+                    val isBestEffortDep = false
+                    compile(Inputs(bundle, Map.empty), bestEffortAllowed && project.isBestEffort, isBestEffortDep, tracer).map {
+                      results =>
+                        results.fromCompiler match {
+                          case Compiler.Result.Ok(_) => Leaf(partialSuccess(bundle, results))
+                          case _ => Leaf(toPartialFailure(bundle, results))
+                        }
+                    }
                   }
                 }
-                }
-              }
-
-            case Aggregate(dags) =>
-              val downstream = dags.map(loop(_))
-              Task.gatherUnordered(downstream).flatMap { dagResults =>
-                Task.now(Parent(PartialEmpty, dagResults))
-              }
-            case Parent(project, dependencies) =>
-              val downstream = dependencies.map(loop(_))
-              Task.gatherUnordered(downstream).flatMap { dagResults =>
-                val depsSupportBestEffort =
-                  dependencies.map(Dag.dfs(_, mode = Dag.PreOrder)).flatten.forall(_.isBestEffort)
-                val failed = dagResults.flatMap(dag => blockedBy(dag).toList)
-
-                val allResults = Task.gatherUnordered {
-                  val transitive = dagResults.flatMap(Dag.dfs(_, mode = Dag.PreOrder)).distinct
-                  transitive.flatMap {
-                    case PartialSuccess(bundle, result) => Some(result.map(r => bundle.project -> r))
-                    case PartialFailure(project, _, result) => Some(result.map(r => project -> r))
-                    case _ => None
-                  }
+              case Aggregate(dags) =>
+                val downstream = dags.map(loop(_))
+                Task.gatherUnordered(downstream).flatMap { dagResults =>
+                  Task.now(Parent(PartialEmpty, dagResults))
                 }
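
The loop/register pair above memoizes one traversal task per DAG node, so a project that several downstream projects depend on is only scheduled once per traversal. A condensed, runnable sketch of that memoization, using plain values instead of compile tasks and no bloop types:

import scala.collection.mutable

object DagTraversalSketch {
  sealed trait Dag
  final case class Leaf(name: String) extends Dag
  final case class Parent(name: String, deps: List[Dag]) extends Dag

  // Compute a "compilation" order, caching each node's result so that
  // nodes shared by several parents are only visited once.
  def traverse(root: Dag): List[String] = {
    val cache = mutable.HashMap.empty[Dag, List[String]]
    def loop(dag: Dag): List[String] =
      cache.get(dag) match {
        case Some(cached) => cached
        case None =>
          val result = dag match {
            case Leaf(name) => List(name)
            case Parent(name, deps) => deps.flatMap(loop).distinct :+ name
          }
          cache.put(dag, result)
          result
      }
    loop(root)
  }

  def main(args: Array[String]): Unit = {
    val core = Leaf("core")
    val app = Parent("app", List(Parent("backend", List(core)), Parent("frontend", List(core))))
    // "core" is a dependency of both parents but is computed and listed once.
    println(traverse(app)) // List(core, backend, frontend, app)
  }
}
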
-                allResults.flatMap { results =>
-                  val successfulBestEffort = !results.exists {
-                    case (_, ResultBundle(f: Compiler.Result.Failed, _, _, _)) => f.bestEffortProducts.isEmpty
-                    case _ => false
+              case Parent(project, dependencies) =>
+                val downstream = dependencies.map(loop(_))
+                Task.gatherUnordered(downstream).flatMap { dagResults =>
+                  val depsSupportBestEffort =
+                    dependencies.map(Dag.dfs(_, mode = Dag.PreOrder)).flatten.forall(_.isBestEffort)
+                  val failed = dagResults.flatMap(dag => blockedBy(dag).toList)
+
+                  val allResults = Task.gatherUnordered {
+                    val transitive = dagResults.flatMap(Dag.dfs(_, mode = Dag.PreOrder)).distinct
+                    transitive.flatMap {
+                      case PartialSuccess(bundle, result) => Some(result.map(r => bundle.project -> r))
+                      case PartialFailure(project, _, result) => Some(result.map(r => project -> r))
+                      case _ => None
+                    }
                   }
-                    }
-                    val continue = bestEffortAllowed && depsSupportBestEffort && successfulBestEffort || failed.isEmpty
-                    val dependsOnBestEffort = failed.nonEmpty && bestEffortAllowed && depsSupportBestEffort
-
-                    if (!continue) {
-                      // Register the name of the projects we're blocked on (intransitively)
-                      val blockedResult = Compiler.Result.Blocked(failed.map(_.name))
-                      val blocked = Task.now(ResultBundle(blockedResult, None, None))
-                      Task.now(Parent(PartialFailure(project, BlockURI, blocked), dagResults))
-                    } else {
-                      val dependentProducts = new mutable.ListBuffer[(Project, BundleProducts)]()
-                      val dependentResults = new mutable.ListBuffer[(File, PreviousResult)]()
-                      results.foreach {
-                        case (p, ResultBundle(s: Compiler.Result.Success, _, _, _)) =>
-                          val newProducts = s.products
-                          dependentProducts.+=(p -> Right(newProducts))
-                          val newResult = newProducts.resultForDependentCompilationsInSameRun
-                          dependentResults
-                            .+=(newProducts.newClassesDir.toFile -> newResult)
-                            .+=(newProducts.readOnlyClassesDir.toFile -> newResult)
-                        case (p, ResultBundle(f: Compiler.Result.Failed, _, _, _)) =>
-                          f.bestEffortProducts.foreach {
-                            case BestEffortProducts(products, _, _) =>
-                              dependentProducts += (p -> Right(products))
-                          }
-                        case _ => ()
+
+                  allResults.flatMap { results =>
+                    val successfulBestEffort = !results.exists {
+                      case (_, ResultBundle(f: Compiler.Result.Failed, _, _, _)) => f.bestEffortProducts.isEmpty
+                      case _ => false
                     }
+                    val continue = bestEffortAllowed && depsSupportBestEffort && successfulBestEffort || failed.isEmpty
+                    val dependsOnBestEffort = failed.nonEmpty && bestEffortAllowed && depsSupportBestEffort
+
+                    if (!continue) {
+                      // Register the name of the projects we're blocked on (intransitively)
+                      val blockedResult = Compiler.Result.Blocked(failed.map(_.name))
+                      val blocked = Task.now(ResultBundle(blockedResult, None, None))
+                      Task.now(Parent(PartialFailure(project, BlockURI, blocked), dagResults))
+                    } else {
+                      val dependentProducts = new mutable.ListBuffer[(Project, BundleProducts)]()
+                      val dependentResults = new mutable.ListBuffer[(File, PreviousResult)]()
+                      results.foreach {
+                        case (p, ResultBundle(s: Compiler.Result.Success, _, _, _)) =>
+                          val newProducts = s.products
+                          dependentProducts.+=(p -> Right(newProducts))
+                          val newResult = newProducts.resultForDependentCompilationsInSameRun
+                          dependentResults
+                            .+=(newProducts.newClassesDir.toFile -> newResult)
+                            .+=(newProducts.readOnlyClassesDir.toFile -> newResult)
+                        case (p, ResultBundle(f: Compiler.Result.Failed, _, _, _)) =>
+                          f.bestEffortProducts.foreach {
+                            case BestEffortProducts(products, _, _) =>
+                              dependentProducts += (p -> Right(products))
+                          }
+                        case _ => ()
+                      }
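
The continue/dependsOnBestEffort booleans above decide whether a parent project may still be compiled after upstream failures: either nothing failed, or best-effort compilation is allowed, every dependency supports it, and the failed dependencies still produced best-effort products. A small sketch of that decision, with illustrative names only:

object BestEffortGateSketch {
  // The same boolean gate as above, pulled out so the decision is easy to read.
  final case class Gate(
      bestEffortAllowed: Boolean,
      depsSupportBestEffort: Boolean,
      successfulBestEffort: Boolean,
      failed: List[String]
  ) {
    def continue: Boolean =
      (bestEffortAllowed && depsSupportBestEffort && successfulBestEffort) || failed.isEmpty
    def dependsOnBestEffort: Boolean =
      failed.nonEmpty && bestEffortAllowed && depsSupportBestEffort
  }

  def main(args: Array[String]): Unit = {
    val strict = Gate(bestEffortAllowed = false, depsSupportBestEffort = true, successfulBestEffort = false, failed = List("core"))
    val bestEffort = strict.copy(bestEffortAllowed = true, successfulBestEffort = true)
    println(s"strict build continues: ${strict.continue}") // false -> downstream becomes Blocked(core)
    println(s"best-effort build continues: ${bestEffort.continue}, reusing best-effort products: ${bestEffort.dependsOnBestEffort}") // true, true
  }
}
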
-                      val resultsMap = dependentResults.toMap
-                      val bundleInputs = BundleInputs(project, dag, dependentProducts.toMap)
-                      setupAndDeduplicate(client, bundleInputs, computeBundle) { bundle =>
-                        val inputs = Inputs(bundle, resultsMap)
-                        compile(inputs, bestEffortAllowed && project.isBestEffort, dependsOnBestEffort).map { results =>
-                          results.fromCompiler match {
-                            case Compiler.Result.Ok(_) if failed.isEmpty =>
-                              Parent(partialSuccess(bundle, results), dagResults)
-                            case _ => Parent(toPartialFailure(bundle, results), dagResults)
+                      val resultsMap = dependentResults.toMap
+                      val bundleInputs = BundleInputs(project, dag, dependentProducts.toMap)
+                      tracer.trace("setupAndDeduplicate - parent project") { tracer =>
+                        setupAndDeduplicate(client, bundleInputs, computeBundle, tracer) { bundle =>
+                          val inputs = Inputs(bundle, resultsMap)
+                          compile(inputs, bestEffortAllowed && project.isBestEffort, dependsOnBestEffort, tracer).map { results =>
+                            results.fromCompiler match {
+                              case Compiler.Result.Ok(_) if failed.isEmpty =>
+                                Parent(partialSuccess(bundle, results), dagResults)
+                              case _ => Parent(toPartialFailure(bundle, results), dagResults)
+                            }
+                          }
+                        }
                       }
                     }
                   }
                 }
               }
-          }
-      }
-      register(dag, task.memoize)
+              }
+              register(dag, task.memoize)
+            }
       }
-    }

-    loop(dag)
-  }
+      loop(dag)
+    }

   private def errorToString(err: Throwable): String = {
     val sw = new StringWriter()
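
errorToString above starts by allocating a StringWriter; the usual way to finish that pattern, shown here as a sketch rather than the exact body that follows in the original file, is to print the stack trace into a PrintWriter backed by it:

import java.io.{PrintWriter, StringWriter}

object ErrorToStringSketch {
  def errorToString(err: Throwable): String = {
    val sw = new StringWriter()
    val pw = new PrintWriter(sw)
    err.printStackTrace(pw)
    pw.flush()
    sw.toString
  }

  def main(args: Array[String]): Unit =
    // Print only the first line of the rendered stack trace.
    println(errorToString(new RuntimeException("boom")).split("\n").head)
}
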