Skip to content
This repository was archived by the owner on Oct 19, 2024. It is now read-only.

Commit 0f4942b

Browse files
authored
[grpc-client] Handle bidirectional streams correctly (#314)
This is a breaking change: The handler for bidirectional streams is returns two conduits now, instead of one. This enables the client to correctly tackle the concurrent nature of the client to server stream and the server to client stream. Each response in the server-to-client stream is no longer wrapped in GRpcReply, any error during parsing the stream is thrown in IO. Other connection related errors are returned in the result value of the conduit corresponding to the server-to-client Conduit. Note: The client didn't and still doesn't handle any errors that the server might indicate using headers or trailers, e.g. grpc-status or the HTTP status code. This commit also adds TODO comments to handle these.
1 parent 5315abd commit 0f4942b

File tree

1 file changed

+39
-35
lines changed

1 file changed

+39
-35
lines changed

grpc/client/src/Mu/GRpc/Client/Internal.hs

+39-35
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import Control.Concurrent.Async
2121
import Control.Concurrent.STM (atomically)
2222
import Control.Concurrent.STM.TMChan
2323
import Control.Concurrent.STM.TMVar
24+
import Control.Exception (throwIO)
2425
import Control.Monad.IO.Class
2526
import Data.Avro
2627
import qualified Data.ByteString.Char8 as BS
@@ -38,7 +39,7 @@ import Network.GRPC.Client.Helpers
3839
import Network.GRPC.HTTP2.Encoding (GRPCInput, GRPCOutput)
3940
import Network.HTTP2 (ErrorCode)
4041
import Network.HTTP2.Client (ClientError, ClientIO, TooMuchConcurrency,
41-
runExceptT)
42+
runExceptT, ExceptT)
4243

4344
import Mu.Adapter.ProtoBuf.Via
4445
import Mu.GRpc.Avro
@@ -304,47 +305,50 @@ conduitFromChannel chan promise = go
304305

305306
instance ( KnownName name
306307
, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
307-
, handler ~ (CompressMode -> IO (ConduitT v (GRpcReply r) IO ())) )
308+
, handler ~ (CompressMode -> IO (ConduitT v Void IO (), ConduitT () r IO (GRpcReply ()))))
308309
=> GRpcMethodCall p ('Method name '[ 'ArgStream aname vref ]
309310
('RetStream rref)) handler where
310311
gRpcMethodCall rpc _ client compress
311-
= do -- Create a new TMChan
312-
inchan <- newTMChanIO :: IO (TMChan (GRpcReply r))
313-
outchan <- newTMChanIO :: IO (TMChan v)
314-
var <- newEmptyTMVarIO -- if full, this means an error
312+
= do serverChan <- newTMChanIO :: IO (TMChan r)
313+
clientChan <- newTMChanIO :: IO (TMChan v)
314+
finalReply <- newEmptyTMVarIO :: IO (TMVar (GRpcReply ()))
315315
-- Start executing the client in another thread
316+
-- TODO: Is there anything that makes sure that this thread doesn't keep running forever?
316317
_ <- async $ do
317318
v <- simplifyResponse $
318319
buildGRpcReply3 <$>
319320
rawGeneralStream
320321
@_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
321322
rpc client
322-
() (\_ ievent -> do -- on the first iteration, say that everything is OK
323-
_ <- liftIO $ atomically $ tryPutTMVar var (GRpcOk ())
324-
case ievent of
325-
RecvMessage o -> liftIO $ atomically $ writeTMChan inchan (GRpcOk $ unGRpcOWTy(Proxy @p) (Proxy @rref) o)
326-
Invalid e -> liftIO $ atomically $ writeTMChan inchan (GRpcErrorString (show e))
327-
_ -> pure () )
328-
() (\_ -> do
329-
nextVal <- liftIO $ atomically $ readTMChan outchan
330-
case nextVal of
331-
Nothing -> pure ((), Finalize)
332-
Just v -> pure ((), SendMessage compress (buildGRpcIWTy (Proxy @p) (Proxy @vref) v)))
333-
case v of
334-
GRpcOk () -> liftIO $ atomically $ closeTMChan inchan
335-
_ -> liftIO $ atomically $ putTMVar var v
336-
-- This conduit feeds information to the other thread
337-
let go = do err <- liftIO $ atomically $ takeTMVar var
338-
case err of
339-
GRpcOk _ -> go2
340-
e -> yield $ (\_ -> error "this should never happen") <$> e
341-
go2 = do nextOut <- await
342-
case nextOut of
343-
Just v -> do liftIO $ atomically $ writeTMChan outchan v
344-
go2
345-
Nothing -> do r <- liftIO $ atomically $ tryReadTMChan inchan
346-
case r of
347-
Nothing -> pure () -- both are empty, end
348-
Just Nothing -> go2
349-
Just (Just nextIn) -> yield nextIn >> go2
350-
pure go
323+
() (incomingEventConsumer serverChan)
324+
() (outgoingEventProducer clientChan)
325+
liftIO $ atomically $ putTMVar finalReply v
326+
let clientConduit = do
327+
sinkTMChan clientChan
328+
liftIO . atomically . closeTMChan $ clientChan
329+
serverConduit = do
330+
sourceTMChan serverChan
331+
liftIO . atomically . readTMVar $ finalReply
332+
pure (clientConduit, serverConduit)
333+
where
334+
incomingEventConsumer :: TMChan r -> () -> IncomingEvent (GRpcOWTy p rref r) () -> ExceptT ClientError IO ()
335+
incomingEventConsumer serverChan _ ievent =
336+
case ievent of
337+
RecvMessage o -> do
338+
liftIO $ atomically $ writeTMChan serverChan (unGRpcOWTy (Proxy @p) (Proxy @rref) o)
339+
Invalid e -> liftIO $ do
340+
atomically $ closeTMChan serverChan
341+
throwIO e
342+
Trailers _ ->
343+
-- TODO: Read the trailers and use them to make the 'finalReply'
344+
liftIO $ atomically $ closeTMChan serverChan
345+
Headers _ ->
346+
-- TODO: Read the headers and use them to make the 'finalReply'
347+
pure ()
348+
349+
outgoingEventProducer :: TMChan v -> () -> ExceptT ClientError IO ((), OutgoingEvent (GRpcIWTy p vref v) ())
350+
outgoingEventProducer clientChan _ = do
351+
nextVal <- liftIO $ atomically $ readTMChan clientChan
352+
case nextVal of
353+
Nothing -> pure ((), Finalize)
354+
Just v -> pure ((), SendMessage compress (buildGRpcIWTy (Proxy @p) (Proxy @vref) v))

0 commit comments

Comments
 (0)