|
| 1 | +{-# LANGUAGE DeriveAnyClass #-} |
| 2 | +{-# LANGUAGE DeriveGeneric #-} |
| 3 | +{-# LANGUAGE DerivingStrategies #-} |
| 4 | +{-# LANGUAGE FlexibleContexts #-} |
| 5 | +{-# LANGUAGE RankNTypes #-} |
| 6 | +{-# LANGUAGE StandaloneDeriving #-} |
| 7 | +{-# LANGUAGE UndecidableInstances #-} |
| 8 | + |
| 9 | +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State |
| 10 | + ( ObjectDiffusionInboundState (..) |
| 11 | + , initObjectDiffusionInboundState |
| 12 | + , ObjectDiffusionInboundHandle (..) |
| 13 | + , ObjectDiffusionInboundHandleCollection (..) |
| 14 | + , ObjectDiffusionInboundStateView (..) |
| 15 | + , newObjectDiffusionInboundHandleCollection |
| 16 | + , bracketObjectDiffusionInbound |
| 17 | + ) |
| 18 | +where |
| 19 | + |
| 20 | +import Control.Monad.Class.MonadThrow (bracket) |
| 21 | +import Data.Map.Strict (Map) |
| 22 | +import qualified Data.Map.Strict as Map |
| 23 | +import GHC.Generics (Generic) |
| 24 | +import NoThunks.Class (NoThunks) |
| 25 | +import Ouroboros.Consensus.Block (BlockSupportsProtocol, HasHeader, Header) |
| 26 | +import Ouroboros.Consensus.MiniProtocol.Util.Idling (Idling (..)) |
| 27 | +import Ouroboros.Consensus.Util.IOLike |
| 28 | + ( IOLike (..) |
| 29 | + , MonadSTM (..) |
| 30 | + , StrictTVar |
| 31 | + , modifyTVar |
| 32 | + , newTVar |
| 33 | + , newTVarIO |
| 34 | + , readTVar |
| 35 | + ) |
| 36 | + |
| 37 | +-- | An ObjectDiffusion inbound client state that's used by other components. |
| 38 | +-- |
| 39 | +-- NOTE: 'blk' is not needed for now, but we keep it for future use. |
| 40 | +data ObjectDiffusionInboundState blk = ObjectDiffusionInboundState |
| 41 | + { odisIdling :: !Bool |
| 42 | + -- ^ Whether we have received all objects from a peer |
| 43 | + } |
| 44 | + deriving stock Generic |
| 45 | + |
| 46 | +deriving anyclass instance |
| 47 | + ( HasHeader blk |
| 48 | + , NoThunks (Header blk) |
| 49 | + ) => |
| 50 | + NoThunks (ObjectDiffusionInboundState blk) |
| 51 | + |
| 52 | +initObjectDiffusionInboundState :: ObjectDiffusionInboundState blk |
| 53 | +initObjectDiffusionInboundState = ObjectDiffusionInboundState{odisIdling = True} |
| 54 | + |
| 55 | +-- | An interface to an ObjectDiffusion inbound client that's used by other components. |
| 56 | +data ObjectDiffusionInboundHandle m blk = ObjectDiffusionInboundHandle |
| 57 | + { odihState :: !(StrictTVar m (ObjectDiffusionInboundState blk)) |
| 58 | + -- ^ Data shared between the client and external components. |
| 59 | + } |
| 60 | + deriving stock Generic |
| 61 | + |
| 62 | +deriving anyclass instance |
| 63 | + ( IOLike m |
| 64 | + , HasHeader blk |
| 65 | + , NoThunks (Header blk) |
| 66 | + ) => |
| 67 | + NoThunks (ObjectDiffusionInboundHandle m blk) |
| 68 | + |
| 69 | +-- | A collection of ObjectDiffusion inbound client handles for the peers of this node. |
| 70 | +data ObjectDiffusionInboundHandleCollection peer m blk = ObjectDiffusionInboundHandleCollection |
| 71 | + { odihcMap :: !(STM m (Map peer (ObjectDiffusionInboundHandle m blk))) |
| 72 | + -- ^ A map containing the handles for the peers in the collection |
| 73 | + , odihcAddHandle :: !(peer -> ObjectDiffusionInboundHandle m blk -> STM m ()) |
| 74 | + -- ^ Add the handle for the given peer to the collection |
| 75 | + , odihcRemoveHandle :: !(peer -> STM m ()) |
| 76 | + -- ^ Remove the handle for the given peer from the collection |
| 77 | + } |
| 78 | + deriving stock Generic |
| 79 | + |
| 80 | +newObjectDiffusionInboundHandleCollection :: |
| 81 | + (Ord peer, IOLike m, NoThunks peer, BlockSupportsProtocol blk) => |
| 82 | + STM m (ObjectDiffusionInboundHandleCollection peer m blk) |
| 83 | +newObjectDiffusionInboundHandleCollection = do |
| 84 | + handlesMap <- newTVar mempty |
| 85 | + return |
| 86 | + ObjectDiffusionInboundHandleCollection |
| 87 | + { odihcMap = readTVar handlesMap |
| 88 | + , odihcAddHandle = \peer handle -> |
| 89 | + modifyTVar handlesMap (Map.insert peer handle) |
| 90 | + , odihcRemoveHandle = \peer -> |
| 91 | + modifyTVar handlesMap (Map.delete peer) |
| 92 | + } |
| 93 | + |
| 94 | +-- | Interface for the ObjectDiffusion client to its state allocated by |
| 95 | +-- 'bracketObjectDiffusionInbound'. |
| 96 | +data ObjectDiffusionInboundStateView m = ObjectDiffusionInboundStateView |
| 97 | + { odisvIdling :: !(Idling m) |
| 98 | + } |
| 99 | + deriving stock Generic |
| 100 | + |
| 101 | +bracketObjectDiffusionInbound :: |
| 102 | + forall m peer blk a. |
| 103 | + (IOLike m, HasHeader blk, NoThunks (Header blk)) => |
| 104 | + ObjectDiffusionInboundHandleCollection peer m blk -> |
| 105 | + peer -> |
| 106 | + (ObjectDiffusionInboundStateView m -> m a) -> |
| 107 | + m a |
| 108 | +bracketObjectDiffusionInbound handles peer body = do |
| 109 | + odiState <- newTVarIO initObjectDiffusionInboundState |
| 110 | + bracket (acquireContext odiState) releaseContext body |
| 111 | + where |
| 112 | + acquireContext odiState = atomically $ do |
| 113 | + odihcAddHandle handles peer $ |
| 114 | + ObjectDiffusionInboundHandle |
| 115 | + { odihState = odiState |
| 116 | + } |
| 117 | + return |
| 118 | + ObjectDiffusionInboundStateView |
| 119 | + { odisvIdling = |
| 120 | + Idling |
| 121 | + { idlingStart = atomically $ modifyTVar odiState $ \s -> s{odisIdling = True} |
| 122 | + , idlingStop = atomically $ modifyTVar odiState $ \s -> s{odisIdling = False} |
| 123 | + } |
| 124 | + } |
| 125 | + |
| 126 | + releaseContext _ = atomically $ do |
| 127 | + odihcRemoveHandle handles peer |
0 commit comments