9
9
package scala .concurrent .impl
10
10
11
11
import java .util .concurrent .{ ForkJoinPool , ForkJoinWorkerThread , ForkJoinTask , Callable , Executor , ExecutorService , ThreadFactory , TimeUnit }
12
- import java .util .concurrent .atomic .AtomicInteger
12
+ import java .util .concurrent .atomic .{ AtomicInteger , AtomicReference }
13
13
import java .util .Collection
14
14
import scala .concurrent .{ BlockContext , ExecutionContext , CanAwait , ExecutionContextExecutor , ExecutionContextExecutorService }
15
15
import scala .annotation .tailrec
@@ -24,26 +24,25 @@ private[scala] class ExecutionContextImpl private[impl] (val executor: Executor,
24
24
25
25
private [concurrent] object ExecutionContextImpl {
26
26
27
- // Implement BlockContext on FJP threads
28
27
final class DefaultThreadFactory (
29
28
daemonic : Boolean ,
30
- maxThreads : Int ,
29
+ maxBlockers : Int ,
31
30
prefix : String ,
32
31
uncaught : Thread .UncaughtExceptionHandler ) extends ThreadFactory with ForkJoinPool .ForkJoinWorkerThreadFactory {
33
32
34
33
require(prefix ne null , " DefaultThreadFactory.prefix must be non null" )
35
- require(maxThreads > 0 , " DefaultThreadFactory.maxThreads must be greater than 0" )
34
+ require(maxBlockers >= 0 , " DefaultThreadFactory.maxBlockers must be greater-or-equal-to 0" )
36
35
37
- private final val currentNumberOfThreads = new AtomicInteger (0 )
36
+ private final val currentNumberOfBlockers = new AtomicInteger (0 )
38
37
39
- @ tailrec private final def reserveThread (): Boolean = currentNumberOfThreads .get() match {
40
- case `maxThreads ` | Int .`MaxValue` => false
41
- case other => currentNumberOfThreads .compareAndSet(other, other + 1 ) || reserveThread ()
38
+ @ tailrec private final def newBlocker (): Boolean = currentNumberOfBlockers .get() match {
39
+ case `maxBlockers ` | Int .`MaxValue` => false
40
+ case other => currentNumberOfBlockers .compareAndSet(other, other + 1 ) || newBlocker ()
42
41
}
43
42
44
- @ tailrec private final def deregisterThread (): Boolean = currentNumberOfThreads .get() match {
43
+ @ tailrec private final def freeBlocker (): Boolean = currentNumberOfBlockers .get() match {
45
44
case 0 => false
46
- case other => currentNumberOfThreads .compareAndSet(other, other - 1 ) || deregisterThread ()
45
+ case other => currentNumberOfBlockers .compareAndSet(other, other - 1 ) || freeBlocker ()
47
46
}
48
47
49
48
def wire [T <: Thread ](thread : T ): T = {
@@ -53,39 +52,42 @@ private[concurrent] object ExecutionContextImpl {
53
52
thread
54
53
}
55
54
56
- // As per ThreadFactory contract newThread should return `null` if cannot create new thread.
57
- def newThread (runnable : Runnable ): Thread =
58
- if (reserveThread())
59
- wire(new Thread (new Runnable {
60
- // We have to decrement the current thread count when the thread exits
61
- override def run () = try runnable.run() finally deregisterThread()
62
- })) else null
55
+ def newThread (runnable : Runnable ): Thread = wire(new Thread (runnable))
63
56
64
57
def newThread (fjp : ForkJoinPool ): ForkJoinWorkerThread =
65
- if (reserveThread()) {
66
- wire(new ForkJoinWorkerThread (fjp) with BlockContext {
67
- // We have to decrement the current thread count when the thread exits
68
- final override def onTermination (exception : Throwable ): Unit = deregisterThread()
69
- final override def blockOn [T ](thunk : => T )(implicit permission : CanAwait ): T = {
70
- var result : T = null .asInstanceOf [T ]
71
- ForkJoinPool .managedBlock(new ForkJoinPool .ManagedBlocker {
72
- @ volatile var isdone = false
73
- override def block (): Boolean = {
74
- result = try {
75
- // When we block, switch out the BlockContext temporarily so that nested blocking does not created N new Threads
76
- BlockContext .withBlockContext(BlockContext .defaultBlockContext) { thunk }
77
- } finally {
78
- isdone = true
58
+ wire(new ForkJoinWorkerThread (fjp) with BlockContext {
59
+ private [this ] var isBlocked : Boolean = false // This is only ever read & written if this thread is the current thread
60
+ final override def blockOn [T ](thunk : => T )(implicit permission : CanAwait ): T =
61
+ if ((Thread .currentThread eq this ) && ! isBlocked && newBlocker()) {
62
+ try {
63
+ isBlocked = true
64
+ val b : ForkJoinPool .ManagedBlocker with (() => T ) =
65
+ new ForkJoinPool .ManagedBlocker with (() => T ) {
66
+ private [this ] var result : T = null .asInstanceOf [T ]
67
+ private [this ] var done : Boolean = false
68
+ final override def block (): Boolean = {
69
+ try {
70
+ if (! done)
71
+ result = thunk
72
+ } finally {
73
+ done = true
74
+ }
75
+
76
+ true
79
77
}
80
78
81
- true
82
- }
83
- override def isReleasable = isdone
84
- })
85
- result
86
- }
87
- })
88
- } else null
79
+ final override def isReleasable = done
80
+
81
+ final override def apply (): T = result
82
+ }
83
+ ForkJoinPool .managedBlock(b)
84
+ b()
85
+ } finally {
86
+ isBlocked = false
87
+ freeBlocker()
88
+ }
89
+ } else thunk // Unmanaged blocking
90
+ })
89
91
}
90
92
91
93
def createDefaultExecutorService (reporter : Throwable => Unit ): ExecutorService = {
@@ -99,8 +101,6 @@ private[concurrent] object ExecutionContextImpl {
99
101
def range (floor : Int , desired : Int , ceiling : Int ) = scala.math.min(scala.math.max(floor, desired), ceiling)
100
102
val numThreads = getInt(" scala.concurrent.context.numThreads" , " x1" )
101
103
// The hard limit on the number of active threads that the thread factory will produce
102
- // scala/bug#8955 Deadlocks can happen if maxNoOfThreads is too low, although we're currently not sure
103
- // about what the exact threshold is. numThreads + 256 is conservatively high.
104
104
val maxNoOfThreads = getInt(" scala.concurrent.context.maxThreads" , " x1" )
105
105
106
106
val desiredParallelism = range(
@@ -116,7 +116,7 @@ private[concurrent] object ExecutionContextImpl {
116
116
}
117
117
118
118
val threadFactory = new ExecutionContextImpl .DefaultThreadFactory (daemonic = true ,
119
- maxThreads = maxNoOfThreads + maxExtraThreads,
119
+ maxBlockers = maxExtraThreads,
120
120
prefix = " scala-execution-context-global" ,
121
121
uncaught = uncaughtExceptionHandler)
122
122
0 commit comments