Concurrency example with worker threads
This is a translation of the last snippet of code of the article Comparative Concurrency with Haskell.
The code is as close as possible to the original, but it follows the transient philosophy. The original is a canonical example of multi-threading created by @snoyberg to teach programmers coming from languages like Go or Elixir how to program with multi-threading and channels in Haskell.
The program spawns a number of worker threads which will each sleep for a random period of time, grab an integer off of a shared work queue, square it, and put the result back on a result queue. Meanwhile, a master thread will fill up the work queue with integers, and read and print results.
You will see some differences:
- There are no loops!
- Exception handling is explicit and in the hands of the programmer.
- Threading is first class: there are no special combinators for threading.
- The number of threads to assign to each worker is explicit and in the hands of the programmer.
- The changes are minimally intrusive, only in order to add functionality. No reinvention of the wheel.
- No result channel is necessary: it is replaced by monadic composition: `printResult` gets each result directly from each worker thread. This is the only major change necessary, since it is how a transient programmer would leverage the capability of sequencing computations in the presence of the multithreading effect.
Indeed, this last point also avoids a subtle problem introduced by laziness and an excessive thread compartmentalization, which sometimes results from the lack of composability of the IO monad when there are multiple threads. The problem is mentioned [here](https://www.reddit.com/r/haskell/comments/5e97sq/comparative_concurrency_with_haskell_fp_complete/daauk64/): in the original snippet the result of each worker is not evaluated, so `printResults` runs in a single thread, the one that performs all the work, destroying parallelism. On the contrary, since `printResult` in this snippet runs in each worker thread, this problem does not happen.
Oops: this creates another problem: printing under Linux is not atomic at the line level by default, so output from different threads can get mixed. To avoid mixing lines in the output, `LineBuffering` has been set.
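To make this concrete, here is a minimal sketch of the pattern, separate from the program below (it assumes the `SLast` constructor of `StreamData`, which delivers one last value and stops the internal loop of `parallel`). The code after the bind runs in the thread that produced the value, so no response channel is needed to get the result out:

```haskell
import Transient.Base (keep, threads, parallel, StreamData(..))
import Control.Monad.IO.Class (liftIO)

main = keep $ do
  -- `parallel` returns `empty` to the invoking thread and runs the IO action
  -- in its internal loop; the continuation below runs in a spawned thread
  r <- threads 1 $ parallel $ return (SLast (7 :: Int))
  case r of
    -- consume the result right here, in the producing thread,
    -- instead of writing it to a response channel
    SLast n -> liftIO $ print (n * n)
    _       -> return ()
```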
Moreover:
- Under `keep`, it is possible to compose this program with other parallel programs using monadic/applicative/alternative operators, since threading in transient is first class (see the sketch below).
- It is possible to convert the program into a distributed and/or web application with little effort (example pending), since distributed/web computing is also first class.
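As a small illustration of the first point, here is a hedged sketch (the `task` helper, its names and delays are invented for this example and are not part of the program below) that composes two independent parallel computations under a single `keep` with the alternative operator, in the same way that `runWorkers <|> fillRequests` is composed further down:

```haskell
import Transient.Base (keep, threads, parallel, StreamData(..))
import Control.Applicative ((<|>), empty)
import Control.Monad.IO.Class (liftIO)
import Control.Concurrent (threadDelay)

-- a toy parallel computation: wait a while, then yield its name once
task name delay = do
  r <- threads 1 $ parallel $ do
         threadDelay delay
         return (SLast name)   -- deliver one value and stop
  case r of
    SLast s -> return s
    _       -> empty

main = keep $ do
  -- both tasks run concurrently; each result continues past the <|>
  -- in the thread of the task that produced it
  s <- task "fast" 100000 <|> task "slow" 500000
  liftIO $ putStrLn ("result from " ++ s)
```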
Thanks to the wonderful work of the FP Complete people, the program is runnable from the command line if you have stack installed.
#!/usr/bin/env stack
{- stack --install-ghc --resolver lts-6.23 runghc
--package random --package transient --package stm-chans
-}
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Concurrent (threadDelay)
import Transient.Base(parallel,keep,threads,StreamData(..),killChilds)
import Transient.Backtrack(onFinish)
import Control.Exception
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (readTBMChan, writeTBMChan, newTBMChan, closeTBMChan)
import System.Random (randomRIO)
import System.IO (hSetBuffering, stdout, BufferMode(LineBuffering))
import Control.Applicative
import Control.Monad.IO.Class
workerCount = 250
workloadCount = 10000 :: Int
minDelay = 250000 -- in microseconds, == 0.25 seconds
maxDelay = 750000 -- == 0.75 seconds
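-- `worker` takes the shared request channel and a worker id; for each request
-- it reads, it yields a (workerId, input, input*input) triple in the Transient monad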
worker requestChan workerId = do
    -- set line buffering since `printResult` runs in each worker thread
    liftIO $ hSetBuffering stdout LineBuffering
    -- run a parallel thread for each worker:
    -- `parallel` returns `empty` to the invoking thread and executes its parameter
    -- in an internal loop, spawning threads when results are returned;
    -- since `threads` imposes a limit of 1, the worker will perform the work
    -- with one thread
    r <- threads 1 $ parallel $ do
        delay <- randomRIO (minDelay, maxDelay)
        threadDelay delay
        mint <- atomically $ readTBMChan requestChan
        case mint of
            -- stop the thread when the channel is closed
            Nothing  -> return SDone
            Just int -> return $ SMore (workerId, int, int * int)
    case r of
        SDone   -> empty -- no response, stop further actions
        SMore r -> return r
main = keep $ do -- run the Transient monad
    -- Create our communication channel. The request channel is bounded and
    -- closable; the response channel is no longer needed.
    requestChan <- liftIO . atomically $ newTBMChan (workerCount * 2)
    -- responseChan <- atomically $ newTBMChan (workerCount * 2) -- not needed

    -- We're going to have three main actions. Let's define them all
    -- here. Note that we're _defining_ an action to be run, not
    -- running it yet! We'll run them below.
    let
        -- runWorkers is going to run all of the worker threads
        runWorkers = do
            -- use the alternative operator to spawn all the workers;
            -- since each worker returns empty to the main thread,
            -- all the workers will be initiated
            foldr (<|>) empty $ map (worker requestChan) [1..workerCount]
            -- Workers are all done, so close the response channel
            -- atomically $ closeTBMChan responseChan -- not needed

        -- Fill up the request channel, exactly the same as before
        fillRequests = do
            liftIO $ mapM_ (atomically . writeTBMChan requestChan) [1..workloadCount]
            liftIO $ atomically $ closeTBMChan requestChan
            -- this task stops here
            empty

        -- Print one result, no loop
        printResult (workerId, int, square) = liftIO $ do
            -- Print it...
            putStrLn $ concat
                [ "Worker #"
                , show workerId
                , ": square of "
                , show int
                , " is "
                , show square
                ]

    -- If any thread dies with an exception, the other threads are killed
    -- and the event will also execute other finalization actions.
    -- `killChilds` will kill all the child threads created below this
    -- statement; it could also retry the process if `noFinish` is added
    onFinish (\(Just (e :: SomeException)) -> killChilds)

    -- Now that we've defined our actions, we can use the alternative operator to
    -- run all of them. It forks a thread for each action and waits for all
    -- threads to exit successfully
    r <- runWorkers <|> fillRequests
    printResult r
    return ()