Skip to content
This repository was archived by the owner on Oct 19, 2024. It is now read-only.

Commit 5315abd

Browse files
authored
[grpc-server] Close bidirectional streaming requests correctly (#312)
Changes: - Don't cancel the handler when server stream ends, this will allow the handler to finish. - Use TMChan instead of TMVar for consuming the server stream, this allows the handler to indicate that it has finished and there will be no more output being produced.
1 parent c045fda commit 5315abd

File tree

1 file changed

+17
-12
lines changed

1 file changed

+17
-12
lines changed

grpc/server/src/Mu/GRpc/Server.hs

+17-12
Original file line numberDiff line numberDiff line change
@@ -492,29 +492,34 @@ instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
492492
-> m ( (), IncomingStream m (GRpcIWTy p vref v) ()
493493
, (), OutgoingStream m (GRpcOWTy p rref r) () )
494494
bdstream req = do
495-
-- Create a new TMChan and a new variable
496-
chan <- liftIO newTMChanIO :: m (TMChan v)
497-
let producer = sourceTMChan @m chan
498-
var <- liftIO newEmptyTMVarIO :: m (TMVar (Maybe r))
495+
-- Create a new TMChan for consuming the client stream, it will be
496+
-- the producer for the conduit.
497+
clientChan <- liftIO newTMChanIO :: m (TMChan v)
498+
let producer = sourceTMChan @m clientChan
499+
500+
-- Create a new TMChan for producing the server stream, it will be
501+
-- the consumer for the conduit.
502+
serverChan <- liftIO newTMChanIO :: m (TMChan r)
503+
let consumer = sinkTMChan @m serverChan
504+
499505
-- Start executing the handler
500-
promise <- liftIO $ async
501-
(raiseErrors $ f $ h req producer (toTMVarConduit var))
506+
handlerPromise <- liftIO $ async $ do
507+
raiseErrors $ f $ h req producer consumer
508+
atomically $ closeTMChan serverChan
509+
502510
-- Build the actual handler
503511
let cstreamHandler _ newInput
504512
= liftIO $ atomically $
505-
writeTMChan chan (unGRpcIWTy (Proxy @p) (Proxy @vref) newInput)
513+
writeTMChan clientChan (unGRpcIWTy (Proxy @p) (Proxy @vref) newInput)
506514
cstreamFinalizer _
507-
= liftIO $ atomically (closeTMChan chan) >> wait promise
515+
= liftIO $ atomically (closeTMChan clientChan) >> wait handlerPromise
508516
readNext _
509-
= do nextOutput <- liftIO $ atomically $ takeTMVar var
517+
= do nextOutput <- liftIO $ atomically $ readTMChan serverChan
510518
case nextOutput of
511519
Just o ->
512520
pure $ Just ((), buildGRpcOWTy (Proxy @p) (Proxy @rref) o)
513521
Nothing -> do
514-
liftIO $ cancel promise
515522
pure Nothing
516-
-- Nothing -> -- no new elements to output
517-
-- readNext ()
518523
pure ((), IncomingStream cstreamHandler cstreamFinalizer, (), OutgoingStream readNext)
519524

520525
-----

0 commit comments

Comments
 (0)