Skip to content

Commit 0d1a363

Browse files
committed
Introduce bracketObjectDiffusionInbound
1 parent fdfbf6b commit 0d1a363

File tree

4 files changed

+85
-13
lines changed
  • ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network
  • ouroboros-consensus
    • src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion
    • test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion

4 files changed

+85
-13
lines changed

ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
6969
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CsClient
7070
import Ouroboros.Consensus.MiniProtocol.ChainSync.Server
7171
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound (objectDiffusionInbound)
72+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
73+
( ObjectDiffusionInboundStateView
74+
, bracketObjectDiffusionInbound
75+
)
7276
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert
7377
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound (objectDiffusionOutbound)
7478
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert
@@ -212,6 +216,7 @@ data Handlers m addr blk = Handlers
212216
, hPerasCertDiffusionClient ::
213217
NodeToNodeVersion ->
214218
ControlMessageSTM m ->
219+
ObjectDiffusionInboundStateView m ->
215220
ConnectionId addr ->
216221
PerasCertDiffusionInboundPipelined blk m ()
217222
, hPerasCertDiffusionServer ::
@@ -314,7 +319,7 @@ mkHandlers
314319
(mapTxSubmissionMempoolReader txForgetValidated $ getMempoolReader getMempool)
315320
(getMempoolWriter getMempool)
316321
version
317-
, hPerasCertDiffusionClient = \version controlMessageSTM peer ->
322+
, hPerasCertDiffusionClient = \version controlMessageSTM state peer ->
318323
objectDiffusionInbound
319324
(contramap (TraceLabelPeer peer) (Node.perasCertDiffusionInboundTracer tracers))
320325
( perasCertDiffusionMaxFifoLength miniProtocolParameters
@@ -324,6 +329,7 @@ mkHandlers
324329
(makePerasCertPoolWriterFromChainDB $ getChainDB)
325330
version
326331
controlMessageSTM
332+
state
327333
, hPerasCertDiffusionServer = \version peer ->
328334
objectDiffusionOutbound
329335
(contramap (TraceLabelPeer peer) (Node.perasCertDiffusionOutboundTracer tracers))
@@ -862,17 +868,21 @@ mkApps kernel rng Tracers{..} mkCodecs ByteLimits{..} chainSyncTimeouts lopBucke
862868
}
863869
channel = do
864870
labelThisThread "PerasCertDiffusionClient"
865-
((), trailing) <-
866-
runPipelinedPeerWithLimits
867-
(TraceLabelPeer them `contramap` tPerasCertDiffusionTracer)
868-
(cPerasCertDiffusionCodec (mkCodecs version))
869-
blPerasCertDiffusion
870-
timeLimitsObjectDiffusion
871-
channel
872-
( objectDiffusionInboundPeerPipelined
873-
(hPerasCertDiffusionClient version controlMessageSTM them)
874-
)
875-
return (NoInitiatorResult, trailing)
871+
bracketObjectDiffusionInbound
872+
(getPerasCertDiffusionHandles kernel)
873+
them
874+
$ \state -> do
875+
((), trailing) <-
876+
runPipelinedPeerWithLimits
877+
(TraceLabelPeer them `contramap` tPerasCertDiffusionTracer)
878+
(cPerasCertDiffusionCodec (mkCodecs version))
879+
blPerasCertDiffusion
880+
timeLimitsObjectDiffusion
881+
channel
882+
( objectDiffusionInboundPeerPipelined
883+
(hPerasCertDiffusionClient version controlMessageSTM state them)
884+
)
885+
return (NoInitiatorResult, trailing)
876886

877887
aPerasCertDiffusionServer ::
878888
NodeToNodeVersion ->

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ import Data.Word (Word64)
3838
import GHC.Generics (Generic)
3939
import Network.TypedProtocol.Core (N (Z), Nat (..), natToInt)
4040
import NoThunks.Class (NoThunks (..), unsafeNoThunks)
41+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
42+
( ObjectDiffusionInboundStateView
43+
)
4144
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
4245
import Ouroboros.Network.ControlMessage
4346
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
@@ -131,13 +134,15 @@ objectDiffusionInbound ::
131134
ObjectPoolWriter objectId object m ->
132135
NodeToNodeVersion ->
133136
ControlMessageSTM m ->
137+
ObjectDiffusionInboundStateView m ->
134138
ObjectDiffusionInboundPipelined objectId object m ()
135139
objectDiffusionInbound
136140
tracer
137141
(maxFifoLength, maxNumIdsToReq, maxNumObjectsToReq)
138142
ObjectPoolWriter{..}
139143
_version
140-
controlMessageSTM =
144+
controlMessageSTM
145+
_state =
141146
ObjectDiffusionInboundPipelined $ do
142147
continueWithStateM (go Zero) initialInboundSt
143148
where

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/State.hs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,40 @@
22
{-# LANGUAGE DeriveGeneric #-}
33
{-# LANGUAGE DerivingStrategies #-}
44
{-# LANGUAGE FlexibleContexts #-}
5+
{-# LANGUAGE RankNTypes #-}
56
{-# LANGUAGE StandaloneDeriving #-}
67
{-# LANGUAGE UndecidableInstances #-}
78

89
module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
910
( ObjectDiffusionInboundState (..)
1011
, ObjectDiffusionInboundHandle (..)
1112
, ObjectDiffusionInboundHandleCollection (..)
13+
, ObjectDiffusionInboundStateView (..)
1214
, newObjectDiffusionInboundHandleCollection
15+
, bracketObjectDiffusionInbound
1316
)
1417
where
1518

19+
import Control.Monad.Class.MonadThrow (bracket)
1620
import Data.Map.Strict (Map)
1721
import qualified Data.Map.Strict as Map
1822
import GHC.Generics (Generic)
1923
import NoThunks.Class (NoThunks)
2024
import Ouroboros.Consensus.Block (BlockSupportsProtocol, HasHeader, Header)
25+
import Ouroboros.Consensus.MiniProtocol.Util.Idling (Idling (..))
2126
import Ouroboros.Consensus.Util.IOLike
2227
( IOLike (..)
2328
, MonadSTM (..)
2429
, StrictTVar
2530
, modifyTVar
2631
, newTVar
32+
, newTVarIO
2733
, readTVar
2834
)
2935

3036
-- | An ObjectDiffusion inbound client state that's used by other components.
37+
--
38+
-- NOTE: 'blk' is not needed for now, but we keep it for future use.
3139
data ObjectDiffusionInboundState blk = ObjectDiffusionInboundState
3240
{ odisIdling :: !Bool
3341
-- ^ Whether we have received all objects from a peer
@@ -82,3 +90,47 @@ newObjectDiffusionInboundHandleCollection = do
8290
, odihcRemoveAllHandles =
8391
modifyTVar handlesMap (const mempty)
8492
}
93+
94+
-- | Interface for the ObjectDiffusion client to its state allocated by
95+
-- 'bracketChainSyncClient'.
96+
data ObjectDiffusionInboundStateView m = ObjectDiffusionInboundStateView
97+
{ odisvIdling :: !(Idling m)
98+
}
99+
deriving stock Generic
100+
101+
bracketObjectDiffusionInbound ::
102+
forall m peer blk a.
103+
(IOLike m, HasHeader blk, NoThunks (Header blk)) =>
104+
ObjectDiffusionInboundHandleCollection peer m blk ->
105+
peer ->
106+
(ObjectDiffusionInboundStateView m -> m a) ->
107+
m a
108+
bracketObjectDiffusionInbound handles peer body =
109+
mkObjectDiffusionInboundState >>= \odiState ->
110+
bracket
111+
(acquireContext odiState)
112+
releaseContext
113+
body
114+
where
115+
acquireContext odiState = atomically $ do
116+
odihcAddHandle handles peer $
117+
ObjectDiffusionInboundHandle
118+
{ odihState = odiState
119+
}
120+
return
121+
ObjectDiffusionInboundStateView
122+
{ odisvIdling =
123+
Idling
124+
{ idlingStart = atomically $ modifyTVar odiState $ \s -> s{odisIdling = True}
125+
, idlingStop = atomically $ modifyTVar odiState $ \s -> s{odisIdling = False}
126+
}
127+
}
128+
129+
releaseContext _ = atomically $ do
130+
odihcRemoveHandle handles peer
131+
132+
mkObjectDiffusionInboundState =
133+
newTVarIO
134+
ObjectDiffusionInboundState
135+
{ odisIdling = False
136+
}

ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,15 @@ import NoThunks.Class (NoThunks)
3131
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound
3232
( objectDiffusionInbound
3333
)
34+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
35+
( ObjectDiffusionInboundStateView (..)
36+
)
3437
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
3538
( ObjectPoolReader (..)
3639
, ObjectPoolWriter (..)
3740
)
3841
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound (objectDiffusionOutbound)
42+
import qualified Ouroboros.Consensus.MiniProtocol.Util.Idling as Idling
3943
import Ouroboros.Consensus.Util.IOLike
4044
( IOLike
4145
, MonadDelay (..)
@@ -267,6 +271,7 @@ prop_smoke_object_diffusion
267271
inboundPoolWriter
268272
nodeToNodeVersion
269273
(readTVar controlMessage)
274+
ObjectDiffusionInboundStateView{odisvIdling = Idling.noIdling}
270275

271276
outbound =
272277
objectDiffusionOutbound

0 commit comments

Comments
 (0)