
Concurrency example with worker threads

Alberto edited this page Nov 22, 2016 · 34 revisions

This is a translation of the last snippet of code from the article Comparative Concurrency with Haskell.

The code stays as close as possible to the original but follows the transient philosophy. The original is a canonical example of multi-threading created by @snoyberg to teach Haskell newcomers coming from languages like Go or Elixir how to program with multi-threading and channels in Haskell.

The program spawns a number of worker threads, each of which sleeps for a random period of time, grabs an integer off a shared work queue, squares it, and puts the result back on a result queue. Meanwhile, a master thread fills up the work queue with integers and reads and prints results.

You will see some differences:

  • There are no loops!
  • Exception handling is explicit and in the hands of the programmer.
  • Threading is first class: there are no special combinators for threading.
  • The number of threads assigned to each worker is explicit and in the hands of the programmer.
  • The changes are minimally intrusive, made only to add functionality. No reinvention of the wheel.
  • No result channel is necessary: it is replaced by monadic composition: printResult gets each result directly from each worker thread. This is the only major change, since it is how a transient programmer would leverage the capability of sequencing computations in the presence of the multithreading effect.
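The monadic-composition point can be previewed outside transient with the list monad, where `>>=` runs the continuation once per produced value, much as transient runs the rest of the computation once per result arriving from any worker thread. This is a simplified analogy, not transient code, and the names here are made up for illustration:

```haskell
import Control.Monad (forM_)

-- Each element plays the role of one worker result; bind feeds
-- every result, one at a time, to the rest of the computation.
squares :: [(Int, Int)]
squares = do
    n <- [1 .. 5]
    return (n, n * n)

main :: IO ()
main = forM_ squares $ \(n, sq) ->
    putStrLn $ "square of " ++ show n ++ " is " ++ show sq
```

In transient the same bind shape applies, except each "element" arrives from a separate thread.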

Moreover:

  • Under keep, it is possible to compose this program with other parallel programs using monadic/applicative/alternative operators, since threading in transient is first class.
  • It is possible to convert the program into a distributed and/or web application with little effort (example pending) since distributed/web computing is also first class.
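The worker-spawning line in `runWorkers` below uses the `foldr (<|>) empty` pattern. This is plain `Alternative` folding (the library spelling is `asum`); with lists it merely concatenates the branches, while in transient the same fold starts each branch in its own thread. A sketch with lists, not concurrent code:

```haskell
import Control.Applicative (Alternative, empty, (<|>))

-- Fold any number of alternatives into one; for lists this
-- concatenates, for transient it launches each branch.
spawnAll :: Alternative f => [f a] -> f a
spawnAll = foldr (<|>) empty

main :: IO ()
main = print (spawnAll [[1], [2, 3], []] :: [Int])
```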

Thanks to the wonderful work of the FP Complete people, the program can be run from the command line if you have stack installed.

#!/usr/bin/env stack
{- stack --install-ghc --resolver lts-6.23 runghc
    --package random --package transient --package stm-chans
-}
{-#LANGUAGE ScopedTypeVariables #-}
import Control.Concurrent (threadDelay)
import Transient.Base(parallel,keep,threads,StreamData(..),killChilds)
import Transient.Backtrack(onFinish)
import Control.Exception
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (readTBMChan, writeTBMChan, newTBMChan, closeTBMChan)
import System.Random (randomRIO)
import Control.Applicative
import Control.Monad.IO.Class

workerCount = 250
workloadCount = 10000 :: Int
minDelay = 250000 -- in microseconds, == 0.25 seconds
maxDelay = 750000 --                  == 0.75 seconds


worker requestChan workerId = do
      -- run a parallel thread for each worker
      -- `parallel` returns `empty` to the invoking thread and spawns working threads
      -- since `threads` imposes a limit of 1, parallel will perform the work with one thread
      r <- threads 1 $ parallel $ do
            delay <- randomRIO (minDelay, maxDelay)
            threadDelay delay
            mint <- atomically $ readTBMChan requestChan
            case mint of
                -- stop the thread when the channel is closed
                Nothing -> return SDone
                Just int -> return $ SMore (workerId, int, int * int)
      case r of
         SDone -> empty  -- no response, stop further actions
         SMore r -> return r


main =  keep $ do   -- run the Transient monad
    -- Create our communication channels. Now the response channel is
    -- also bounded and closable.
    requestChan <- liftIO . atomically $ newTBMChan (workerCount * 2)
--    responseChan <- atomically $ newTBMChan (workerCount * 2)      -- not needed

    -- We're going to have three main threads. Let's define them all
    -- here. Note that we're _defining_ an action to be run, not
    -- running it yet! We'll run them below.
    let
        -- runWorkers is going to run all of the worker threads
        runWorkers = do
            -- use the alternative operator to spawn all the workers
            -- since each worker returns empty to the main thread
            -- all the workers will be initiated
            foldr (<|>) empty $ map (worker requestChan) [1..workerCount]
            -- Workers are all done, so close the response channel
--            atomically $ closeTBMChan responseChan

        -- Fill up the request channel, exactly the same as before
        fillRequests = do
            liftIO $ mapM_ (atomically . writeTBMChan requestChan) [1..workloadCount]
            liftIO $ atomically $ closeTBMChan requestChan
            -- this task stops here
            empty

        -- Print one result, no loop
        printResult  (workerId, int, square) = liftIO $ do
                    -- Print it...
                    putStrLn $ concat
                        [ "Worker #"
                        , show workerId
                        , ": square of "
                        , show int
                        , " is "
                        , show square
                        ]

    -- If any thread dies with an exception, the other threads are killed
    -- and any other finalization actions will also run.
    -- `killChilds` kills all the child threads created below this
    -- statement; it could also restart the process if `noFinish` is added
    onFinish (\(Just (e :: SomeException)) -> killChilds)
    
    -- Now that we've defined our actions, we can use the alternative operator to
    -- run all of them: it forks a thread for each action and waits for all
    -- threads to exit successfully
    r <- runWorkers <|> fillRequests 
    printResult r


    return ()
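The worker shutdown above hinges on `readTBMChan` returning `Nothing` once the channel has been closed and drained. That `Maybe`-read protocol can be sketched with plain `stm` primitives; this is an unbounded, simplified stand-in for `TBMChan`, for illustration only:

```haskell
import Control.Concurrent.STM

-- A buffer plus a closed flag; reads yield Nothing once the
-- channel is closed and empty, like readTBMChan.
data CloseableChan a = CloseableChan (TVar [a]) (TVar Bool)

newCChan :: STM (CloseableChan a)
newCChan = CloseableChan <$> newTVar [] <*> newTVar False

writeCChan :: CloseableChan a -> a -> STM ()
writeCChan (CloseableChan buf _) x = modifyTVar' buf (++ [x])

closeCChan :: CloseableChan a -> STM ()
closeCChan (CloseableChan _ closed) = writeTVar closed True

readCChan :: CloseableChan a -> STM (Maybe a)
readCChan (CloseableChan buf closed) = do
    xs <- readTVar buf
    case xs of
        (x : rest) -> writeTVar buf rest >> return (Just x)
        [] -> do
            c <- readTVar closed
            if c then return Nothing else retry  -- block while still open

main :: IO ()
main = do
    (a, b) <- atomically $ do
        ch <- newCChan
        writeCChan ch (7 :: Int)
        closeCChan ch
        x <- readCChan ch
        y <- readCChan ch
        return (x, y)
    print (a, b)
```

The real `TBMChan` adds a capacity bound on top of this, which is why `writeTBMChan` can also block in the program above.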