-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathConcurrency.hs
269 lines (224 loc) · 11.4 KB
/
Concurrency.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
{-# LANGUAGE RecordWildCards, LambdaCase, OverloadedStrings, PatternSynonyms #-}
module Stg.Interpreter.PrimOp.Concurrency where
import Control.Monad
import Control.Monad.State
import qualified Data.ByteString.Char8 as BS8
import qualified Data.IntMap as IntMap
import Foreign.Ptr
import Stg.Syntax
import Stg.Interpreter.Base
-- | Shorthand pattern synonym for 'IntAtom'. It is implicitly bidirectional,
-- so it can be used both to match an integer atom and to build one.
pattern IntV n = IntAtom n
-- | Evaluator for the concurrency primops handled by this module.
--
-- The primop name and its argument list are matched together. Any combination
-- that is not recognised below is handed to @fallback@ together with all of
-- the original arguments.
evalPrimOp :: PrimOpEval -> Name -> [Atom] -> Type -> Maybe TyCon -> M [Atom]
evalPrimOp fallback op args t tc = case (op, args) of

  -- fork# :: (State# RealWorld -> (# State# RealWorld, t0 #)) -> State# RealWorld -> (# State# RealWorld, ThreadId# #)
  ("fork#", [ioAction, _s]) ->
    spawnThread ioAction id

  -- forkOn# :: Int# -> (State# RealWorld -> (# State# RealWorld, t0 #)) -> State# RealWorld -> (# State# RealWorld, ThreadId# #)
  ("forkOn#", [IntV capabilityNo, ioAction, _s]) ->
    -- NOTE: capability related; HINT: a locked thread must not be moved across capabilities
    spawnThread ioAction $ \ts -> ts
      { tsLocked = True
      , tsCapability = capabilityNo
      }

  -- killThread# :: ThreadId# -> a -> State# RealWorld -> State# RealWorld
  ("killThread#", [ThreadId tidTarget, exception, _s]) -> do
    selfTId <- gets ssCurrentThreadId
    if selfTId == tidTarget
      then do
        -- The thread throws to itself. It might still survive: a catch frame
        -- on its stack can pick the exception up and handle it.
        removeFromQueues tidTarget
        -- HINT: the empty list is the result of the killThread# primop itself
        raiseAsyncEx [] tidTarget exception
        -- TODO: remove this below, problem: raiseAsyncEx may kill the thread ; model kill thread as return to scheduler operation with a descriptive reason
        -- hand back whatever result raiseAsyncEx left in the thread state
        fmap tsCurrentResult getCurrentThreadState
      else do
        -- The exception is aimed at some other thread.
        targetTS <- getThreadState tidTarget
        let masked = tsBlockExceptions targetTS
            uninterruptible = not (tsInterruptible targetTS)

            -- Park the calling thread until the target can receive the exception.
            queueAndBlock = do
              -- register the caller in the target's blocked exceptions queue
              updateThreadState tidTarget
                (targetTS {tsBlockedExceptions = selfTId : tsBlockedExceptions targetTS})
              -- mark the caller as blocked on this throw
              callerTS <- getCurrentThreadState
              updateThreadState selfTId
                (callerTS {tsStatus = ThreadBlocked $ BlockedOnThrowAsyncEx tidTarget exception})
              -- push reschedule continuation, reason: block
              stackPush $ RunScheduler SR_ThreadBlocked
              pure []

            -- Deliver the exception to the target right away.
            deliverNow = do
              removeFromQueues tidTarget
              raiseAsyncEx (tsCurrentResult targetTS) tidTarget exception
              pure []

            raiseUnlessMasked =
              if masked then queueAndBlock else deliverNow

            raiseUnlessMaskedUninterruptibly =
              if masked && uninterruptible then queueAndBlock else deliverNow

        case tsStatus targetTS of
          ThreadFinished -> pure [] -- NOTE: nothing to do
          ThreadDied -> pure [] -- NOTE: nothing to do
          ThreadRunning -> raiseUnlessMasked
          ThreadBlocked BlockedOnForeignCall{} -> queueAndBlock
          ThreadBlocked BlockedOnBlackHole{} -> raiseUnlessMasked
          ThreadBlocked BlockedOnThrowAsyncEx{} -> raiseUnlessMaskedUninterruptibly
          ThreadBlocked BlockedOnSTM{} -> raiseUnlessMaskedUninterruptibly
          ThreadBlocked BlockedOnMVar{} -> raiseUnlessMaskedUninterruptibly
          ThreadBlocked BlockedOnMVarRead{} -> raiseUnlessMaskedUninterruptibly
          ThreadBlocked BlockedOnRead{} -> raiseUnlessMaskedUninterruptibly
          ThreadBlocked BlockedOnWrite{} -> raiseUnlessMaskedUninterruptibly
          ThreadBlocked BlockedOnDelay{} -> raiseUnlessMaskedUninterruptibly

  -- yield# :: State# RealWorld -> State# RealWorld
  ("yield#", [_s]) ->
    stackPush (RunScheduler SR_ThreadYield) >> pure []

  -- myThreadId# :: State# RealWorld -> (# State# RealWorld, ThreadId# #)
  ("myThreadId#", [_s]) ->
    gets ssCurrentThreadId >>= \selfTId -> pure [ThreadId selfTId]

  -- labelThread# :: ThreadId# -> Addr# -> State# RealWorld -> State# RealWorld
  ("labelThread#", [ThreadId tid, PtrAtom _ p, _s]) -> do
    -- copy the C string behind the pointer and store it as the thread's label
    label <- liftIO (BS8.packCString (castPtr p))
    let withLabel ts = ts {tsLabel = Just label}
    modify' $ \st -> st {ssThreads = IntMap.adjust withLabel tid (ssThreads st)}
    pure []

  -- isCurrentThreadBound# :: State# RealWorld -> (# State# RealWorld, Int# #)
  ("isCurrentThreadBound#", [_s]) -> do
    ts <- getCurrentThreadState
    pure [IntV (boolFlag (tsBound ts))]

  -- noDuplicate# :: State# s -> State# s
  ("noDuplicate#", [_s]) ->
    -- NOTE: the stg interpreter is not parallel, so this is a no-op
    pure []

  -- threadStatus# :: ThreadId# -> State# RealWorld -> (# State# RealWorld, Int#, Int#, Int# #)
  ("threadStatus#", [ThreadId tid, _s]) -> do
    ts <- getThreadState tid
    pure
      [ IntV (statusCodeOf (tsStatus ts))
      , IntV (tsCapability ts)
      , IntV (boolFlag (tsLocked ts))
      ]

  _ -> fallback op args t tc
  where
    -- Shared body of fork# and forkOn#: create a thread that will run
    -- @ioAction@, append it to the schedule and ask for a context switch.
    -- @customise@ gets the last word on the new thread's initial state.
    spawnThread ioAction customise = do
      promptM $ print (op, args)
      parentTS <- getCurrentThreadState
      (childTId, childTS) <- createThread
      let initialTS = childTS
            { tsCurrentResult = [ioAction]
            , tsStack = [Apply [Void], RunScheduler SR_ThreadFinished]
              -- NOTE: start blocked if the current thread is blocked
            , tsBlockExceptions = tsBlockExceptions parentTS
            , tsInterruptible = tsInterruptible parentTS
            }
      updateThreadState childTId (customise initialTS)
      scheduleToTheEnd childTId
      -- NOTE: context switch soon, but not immediately: we don't want every forkIO to force a context-switch.
      requestContextSwitch -- TODO: push continuation reschedule, reason request context switch
      pure [ThreadId childTId]

    -- Encode a Bool as 1 (True) or 0 (False).
    boolFlag b = if b then 1 else 0

    -- Numeric thread status reported by threadStatus#.
    -- HINT: includes/rts/Constants.h
    --       base:GHC.Conc.Sync.threadStatus
    statusCodeOf status = case status of
      ThreadRunning -> 0
      ThreadFinished -> 16
      ThreadDied -> 17
      ThreadBlocked BlockedOnMVar{} -> 1
      ThreadBlocked BlockedOnMVarRead{} -> 14
      ThreadBlocked BlockedOnBlackHole{} -> 2
      ThreadBlocked BlockedOnSTM{} -> 6
      ThreadBlocked BlockedOnForeignCall{} -> 10
      ThreadBlocked BlockedOnRead{} -> 3
      ThreadBlocked BlockedOnWrite{} -> 4
      ThreadBlocked BlockedOnDelay{} -> 5
      ThreadBlocked BlockedOnThrowAsyncEx{} -> 12
-- | Raise the asynchronous @exception@ in thread @tid@.
--
-- The stack stored in the target thread's state is unwound frame by frame,
-- with @lastResult@ as the initial result value:
--
--   * a 'Catch' frame stops the unwinding: the thread is set to
--     'ThreadRunning' and will apply the handler to the exception, with
--     async exceptions blocked;
--   * if the stack runs out without a 'Catch' frame, the thread is marked
--     'ThreadDied' and its stack is emptied;
--   * each 'Update' frame passed on the way gets an 'ApStack' object, built
--     from the frames collected so far, stored at its address;
--   * STM frames ('Atomically', 'CatchSTM', 'CatchRetry') abort the
--     transaction log state of the thread.
--
-- Every piece of thread state read or written here belongs to @tid@, which is
-- not necessarily the currently running thread (e.g. @killThread#@ aimed at
-- another thread).
raiseAsyncEx :: [Atom] -> Int -> Atom -> M ()
raiseAsyncEx lastResult tid exception = do
  ts <- getThreadState tid
  unwindStack lastResult [] (tsStack ts)
  where
    -- @result@ is the value the unwound computation would return to the next
    -- frame; @stackPiece@ holds the frames passed so far, newest last-pushed
    -- first, i.e. in reverse stack order.
    unwindStack result stackPiece = \case
      -- no Catch stack frame is found, kill thread
      [] -> do
        ts <- getThreadState tid
        updateThreadState tid (ts {tsStack = [], tsStatus = ThreadDied})
        -- TODO: reschedule continuation??
        --stackPush $ RunScheduler SR_ThreadBlocked

      -- the thread continues with the exception handler, also wakes up the thread if necessary
      exStack@(Catch exHandler _bEx iEx : _) -> do
        ts <- getThreadState tid
        updateThreadState tid $ ts
          { tsCurrentResult = [exHandler]
            -- TODO: restore the catch frame's exception mask; sync exceptions do it, and according to the async exceptions paper it should be done here also
          , tsStack = Apply [exception, Void] : exStack
            -- HINT: whatever blocked this thread, that operation got cancelled by the async exception
          , tsStatus = ThreadRunning
            -- NOTE: Ensure that async exceptions are blocked now, so we don't get a surprise exception before we get around to executing the handler.
          , tsBlockExceptions = True
          , tsInterruptible = iEx
          }

      -- replace Update with ApStack
      Update addr : stackTail -> do
        let apStack = ApStack
              { hoResult = result
              , hoStack = reverse stackPiece
              }
        store addr apStack
        -- continue with a fresh stack piece whose result is the new heap object
        unwindStack [HeapPtr addr] [Apply []] stackTail

      Atomically stmAction : stackTail -> do
        -- NOTE: the transaction belongs to the target thread, not to the thread that throws
        ts <- getThreadState tid
        -- extra validation (optional)
        when (tsTLogStack ts /= []) $ error "internal error: non-empty tsTLogStack without tsActiveTLog"
        let tlog = activeTLogOf ts
        -- abandon transaction
        updateThreadState tid $ ts {tsActiveTLog = Nothing}
        unsubscribeTVarWaitQueues tid tlog
        unwindStack result (AtomicallyOp stmAction : stackPiece) stackTail

      stackHead@(CatchSTM{}) : stackTail -> do
        -- FIXME: IMO this is semantically incorrect, but this is how it's done in the native RTS
        --        this stack frame should not persist in ApStack only AtomicallyOp, it will restart the STM transaction anyway
        ts <- getThreadState tid
        let tlog = activeTLogOf ts
            (tlogStackTop, tlogStackTail) = splitTLogStack ts
        -- HINT: abort transaction
        unsubscribeTVarWaitQueues tid tlog
        updateThreadState tid $ ts
          { tsActiveTLog = Just tlogStackTop
          , tsTLogStack = tlogStackTail
          }
        unwindStack result (stackHead : stackPiece) stackTail

      stackHead@(CatchRetry{}) : stackTail -> do
        -- FIXME: IMO this is semantically incorrect, but this is how it's done in the native RTS
        --        this stack frame should not persist in ApStack only AtomicallyOp, it will restart the STM transaction anyway
        -- HINT: abort transaction
        ts <- getThreadState tid
        let tlog = activeTLogOf ts
        unsubscribeTVarWaitQueues tid tlog
        -- NOTE(review): unlike the CatchSTM case the active log is left in place here - confirm this is intended
        updateThreadState tid $ ts {tsTLogStack = snd (splitTLogStack ts)}
        unwindStack result (stackHead : stackPiece) stackTail

      -- collect stack frames for ApStack
      stackHead : stackTail ->
        unwindStack result (stackHead : stackPiece) stackTail

    -- The active transaction log of a thread that is unwinding an STM frame.
    activeTLogOf ts = case tsActiveTLog ts of
      Just tlog -> tlog
      Nothing -> error "internal error: raiseAsyncEx - STM stack frame without tsActiveTLog"

    -- Top and remainder of the thread's saved transaction log stack.
    splitTLogStack ts = case tsTLogStack ts of
      top : rest -> (top, rest)
      [] -> error "internal error: raiseAsyncEx - nested STM stack frame with empty tsTLogStack"
-- | Detach thread @tid@ from the wait queue of the MVar it is blocked on.
-- Threads whose status is anything other than blocked on an MVar (put/take
-- or read) are left untouched.
removeFromQueues :: Int -> M ()
removeFromQueues tid = do
  status <- tsStatus <$> getThreadState tid
  case status of
    ThreadBlocked (BlockedOnMVar mvarId _) -> removeFromMVarQueue tid mvarId
    ThreadBlocked (BlockedOnMVarRead mvarId) -> removeFromMVarQueue tid mvarId
    _ -> pure ()
-- | Drop every occurrence of thread @tid@ from the queue of the MVar stored
-- under key @m@. Nothing changes when the interpreter state has no such MVar.
removeFromMVarQueue :: Int -> Int -> M ()
removeFromMVarQueue tid m =
  modify' $ \st -> st {ssMVars = IntMap.adjust withoutThread m (ssMVars st)}
  where
    -- keep only the queue entries that are not @tid@
    withoutThread mvd =
      mvd {mvdQueue = [waiter | waiter <- mvdQueue mvd, tid /= waiter]}