Skip to content

Commit 4ae8472

Browse files
nh2snoyberg
authored andcommitted
Make streamingProcess work with the non-threaded runtime. Fixes #40
1 parent 2987381 commit 4ae8472

File tree

1 file changed

+23
-2
lines changed

1 file changed

+23
-2
lines changed

Data/Streaming/Process.hs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ module Data.Streaming.Process
3232
) where
3333

3434
import Control.Applicative as A ((<$>), (<*>))
35-
import Control.Concurrent (forkIOWithUnmask)
35+
import Control.Concurrent (forkIOWithUnmask, threadWaitRead)
3636
import Control.Concurrent.STM (STM, TMVar, atomically,
3737
newEmptyTMVar, putTMVar,
3838
readTMVar)
@@ -44,6 +44,7 @@ import Data.Streaming.Process.Internal
4444
import Data.Typeable (Typeable)
4545
import System.Exit (ExitCode (ExitSuccess))
4646
import System.IO (hClose)
47+
import System.Posix.IO (handleToFd)
4748
import System.Process
4849

4950
#if MIN_VERSION_process(1,2,0)
@@ -148,6 +149,19 @@ streamingProcess cp = liftIO $ do
148149
(getStdout, stdoutStream) = osStdStream
149150
(getStderr, stderrStream) = osStdStream
150151

152+
-- We use a pipe to the child process to determine when it's dead.
153+
-- In Unix, when there is a Unix pipe between two processes, then
154+
-- "When the child process terminates, its end of the pipe will be closed"
155+
-- (see https://stackoverflow.com/questions/8976004/using-waitpid-or-sigaction/8976461#8976461)
156+
-- See also http://tldp.org/LDP/lpg/node11.html about Unix pipes.
157+
-- Making this decision based on a pipe FD is better than `waitpid()` because
158+
-- we can use GHC IO manager's `threadWaitRead` function to wait in a
159+
-- non-blocking, non-polling way.
160+
-- TODO: Use `createPipeFd` instead of `createPipe` once this package
161+
-- requries process >= 1.4.2.0; then we don't have to use
162+
-- `handleToFd` below.
163+
(readHandle, writeHandle) <- createPipe
164+
151165
#if MIN_VERSION_process(1,2,0)
152166
(stdinH, stdoutH, stderrH, ph) <- PI.createProcess_ "streamingProcess" cp
153167
#else
@@ -158,12 +172,19 @@ streamingProcess cp = liftIO $ do
158172
, std_err = fromMaybe (std_err cp) stderrStream
159173
}
160174

175+
-- Close pipe write end from parent process (we don't need it).
176+
hClose writeHandle
177+
-- When the child process closes its write end (e.g. by terminating),
178+
-- we'll read EOF on our read end, and we wait for that to happen with
179+
-- the `threadWaitRead readFd` below.
180+
readFd <- handleToFd readHandle
181+
161182
ec <- atomically newEmptyTMVar
162183
-- Apparently waitForProcess can throw an exception itself when
163184
-- delegate_ctlc is True, so to avoid this TMVar from being left empty, we
164185
-- capture any exceptions and store them as an impure exception in the
165186
-- TMVar
166-
_ <- forkIOWithUnmask $ \_unmask -> try (waitForProcess ph)
187+
_ <- forkIOWithUnmask $ \_unmask -> try (threadWaitRead readFd >> waitForProcess ph)
167188
>>= atomically
168189
. putTMVar ec
169190
. either

0 commit comments

Comments
 (0)