Skip to content

Commit 4d29adb

Browse files
committed
Extend node kernel peer state with object diffusion state
1 parent 10cb0aa commit 4d29adb

File tree

4 files changed

+55
-17
lines changed

4 files changed

+55
-17
lines changed

ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@ import qualified Ouroboros.Consensus.HardFork.History.Qry as Qry
5656
import qualified Ouroboros.Consensus.Ledger.Basics as L
5757
import Ouroboros.Consensus.Node.GsmState
5858
import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB)
59-
import Ouroboros.Consensus.Util.NormalForm.StrictTVar (StrictTVar)
60-
import qualified Ouroboros.Consensus.Util.NormalForm.StrictTVar as StrictSTM
6159
import System.FS.API
6260
( HasFS
6361
, createDirectoryIfMissing
@@ -123,7 +121,7 @@ data GsmView m upstreamPeer selection peerState = GsmView
123121
, equivalent :: selection -> selection -> Bool
124122
-- ^ Whether the two selections are equivalent for the purpose of the
125123
-- Genesis State Machine
126-
, getPeerStates :: STM m (Map.Map upstreamPeer (StrictTVar m peerState))
124+
, getPeerStates :: STM m (Map.Map upstreamPeer peerState)
127125
-- ^ The current peer state with the latest candidates from the upstream peers
128126
, getCurrentSelection :: STM m selection
129127
-- ^ The node's current selection
@@ -371,11 +369,10 @@ realGsmEntryPoints tracerArgs gsmView =
371369
-- STAGE 1: all peers are idle, which means that
372370
-- * all ChainSync clients report no subsequent headers, and
373371
-- * all PerasCertDiffusion clients report no subsequent certificates
374-
varsState <- getPeerStates
375-
states <- traverse StrictSTM.readTVar varsState
372+
peerStates <- getPeerStates
376373
check $
377-
not (Map.null states)
378-
&& all peerIsIdle states
374+
not (Map.null peerStates)
375+
&& all peerIsIdle peerStates
379376

380377
-- STAGE 2: no candidate is better than the node's current
381378
-- selection
@@ -388,16 +385,15 @@ realGsmEntryPoints tracerArgs gsmView =
388385
-- block; general Praos reasoning ensures that won't take particularly
389386
-- long.
390387
selection <- getCurrentSelection
391-
candidates <- traverse StrictSTM.readTVar varsState
392388
candidateOverSelection <- getCandidateOverSelection
393389
let ok candidate =
394390
WhetherCandidateIsBetter False
395391
== candidateOverSelection selection candidate
396-
check $ all ok candidates
392+
check $ all ok peerStates
397393

398394
pure $
399395
GsmEventEnterCaughtUp
400-
(Map.size states)
396+
(Map.size peerStates)
401397
(cnvSelection selection)
402398

403399
-- STAGE 3: the previous stages weren't so slow that the idler

ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ module Ouroboros.Consensus.NodeKernel
1717
MempoolCapacityBytesOverride (..)
1818
, NodeKernel (..)
1919
, NodeKernelArgs (..)
20+
, NodeKernelPeerState (..)
2021
, TraceForgeEvent (..)
2122
, getImmTipSlot
2223
, getMempoolReader
@@ -49,6 +50,7 @@ import Data.Functor ((<&>))
4950
import Data.Hashable (Hashable)
5051
import Data.List.NonEmpty (NonEmpty)
5152
import qualified Data.List.NonEmpty as NE
53+
import qualified Data.Map.Strict as Map
5254
import Data.Maybe (isJust, mapMaybe)
5355
import Data.Proxy
5456
import qualified Data.Text as Text
@@ -80,6 +82,16 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.HistoricityCheck
8082
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck
8183
( SomeHeaderInFutureCheck
8284
)
85+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
86+
( ObjectDiffusionInboundHandleCollection (..)
87+
, ObjectDiffusionInboundState (..)
88+
, newObjectDiffusionInboundHandleCollection
89+
, odihState
90+
)
91+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert
92+
( PerasCertDiffusionInboundHandleCollection
93+
, PerasCertDiffusionInboundState
94+
)
8395
import Ouroboros.Consensus.Node.GSM (GsmNodeKernelArgs (..))
8496
import qualified Ouroboros.Consensus.Node.GSM as GSM
8597
import Ouroboros.Consensus.Node.Genesis
@@ -173,6 +185,9 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel
173185
-- from it with 'GSM.gsmStateToLedgerJudgement'.
174186
, getChainSyncHandles :: ChainSyncClientHandleCollection (ConnectionId addrNTN) m blk
175187
-- ^ The kill handle and exposed state for each ChainSync client.
188+
, getObjectDiffusionInboundHandles ::
189+
ObjectDiffusionInboundHandleCollection (ConnectionId addrNTN) m blk
190+
-- ^ The exposed state for each ObjectDiffusion inbound client.
176191
, getPeerSharingRegistry :: PeerSharingRegistry addrNTN m
177192
-- ^ Read the current peer sharing registry, used for interacting with
178193
-- the PeerSharing protocol
@@ -217,6 +232,12 @@ data NodeKernelArgs m addrNTN addrNTC blk = NodeKernelArgs
217232
, getDiffusionPipeliningSupport :: DiffusionPipeliningSupport
218233
}
219234

235+
-- | State about peers we are connected to
236+
data NodeKernelPeerState blk = NodeKernelPeerState
237+
{ chainSyncState :: ChainSyncState blk
238+
, perasCertDiffusionInboundState :: PerasCertDiffusionInboundState blk
239+
}
240+
220241
initNodeKernel ::
221242
forall m addrNTN addrNTC blk.
222243
( IOLike m
@@ -254,6 +275,7 @@ initNodeKernel
254275
, mempool
255276
, peerSharingRegistry
256277
, varChainSyncHandles
278+
, varPerasCertDiffusionHandles
257279
, varGsmState
258280
} = st
259281

@@ -273,23 +295,38 @@ initNodeKernel
273295
{ GSM.antiThunderingHerd = Just gsmAntiThunderingHerd
274296
, GSM.getCandidateOverSelection = do
275297
weights <- ChainDB.getPerasWeightSnapshot chainDB
276-
pure $ \(headers, _lst) state ->
277-
case AF.intersectionPoint headers (csCandidate state) of
298+
pure $ \(headers, _lst) peerState -> do
299+
let candidate = csCandidate (chainSyncState peerState)
300+
case AF.intersectionPoint headers candidate of
278301
Nothing -> GSM.CandidateDoesNotIntersect
279302
Just{} ->
280303
GSM.WhetherCandidateIsBetter $ -- precondition requires intersection
281304
preferAnchoredCandidate
282305
(configBlock cfg)
283306
(forgetFingerprint weights)
284307
headers
285-
(csCandidate state)
286-
, GSM.peerIsIdle = csIdling
308+
candidate
309+
, GSM.peerIsIdle = \peerState -> do
310+
csIdling (chainSyncState peerState)
311+
&& odisIdling (perasCertDiffusionInboundState peerState)
287312
, GSM.durationUntilTooOld =
288313
gsmDurationUntilTooOld
289314
<&> \wd (_headers, lst) ->
290315
GSM.getDurationUntilTooOld wd (getTipSlot lst)
291316
, GSM.equivalent = (==) `on` (AF.headPoint . fst)
292-
, GSM.getPeerStates = fmap cschState <$> cschcMap varChainSyncHandles
317+
, GSM.getPeerStates = do
318+
chainSyncPeers <- cschcMap varChainSyncHandles
319+
perasCertDiffusionPeers <- odihcMap varPerasCertDiffusionHandles
320+
let commonPeers = Map.intersectionWith (,) chainSyncPeers perasCertDiffusionPeers
321+
-- TODO understand whether map intersection provides the right semantics here
322+
forM commonPeers $ \(cscHandle, odiHandle) -> do
323+
chainSyncState <- readTVar (cschState cscHandle)
324+
perasCertDiffusionState <- readTVar (odihState odiHandle)
325+
pure
326+
NodeKernelPeerState
327+
{ chainSyncState = chainSyncState
328+
, perasCertDiffusionInboundState = perasCertDiffusionState
329+
}
293330
, GSM.getCurrentSelection = do
294331
headers <- ChainDB.getCurrentChainWithTime chainDB
295332
extLedgerState <- ChainDB.getCurrentLedger chainDB
@@ -366,6 +403,7 @@ initNodeKernel
366403
, getFetchMode = readFetchMode blockFetchInterface
367404
, getGsmState = readTVar varGsmState
368405
, getChainSyncHandles = varChainSyncHandles
406+
, getObjectDiffusionInboundHandles = varPerasCertDiffusionHandles
369407
, getPeerSharingRegistry = peerSharingRegistry
370408
, getTracers = tracers
371409
, setBlockForging = \a -> atomically . LazySTM.putTMVar blockForgingVar $! a
@@ -416,6 +454,8 @@ data InternalState m addrNTN addrNTC blk = IS
416454
BlockFetchConsensusInterface (ConnectionId addrNTN) (HeaderWithTime blk) blk m
417455
, fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (HeaderWithTime blk) blk m
418456
, varChainSyncHandles :: ChainSyncClientHandleCollection (ConnectionId addrNTN) m blk
457+
, varPerasCertDiffusionHandles ::
458+
PerasCertDiffusionInboundHandleCollection (ConnectionId addrNTN) m blk
419459
, varGsmState :: StrictTVar m GSM.GsmState
420460
, mempool :: Mempool m blk
421461
, peerSharingRegistry :: PeerSharingRegistry addrNTN m
@@ -454,6 +494,8 @@ initInternalState
454494
newTVarIO gsmState
455495

456496
varChainSyncHandles <- atomically newChainSyncClientHandleCollection
497+
varPerasCertDiffusionHandles <- atomically newObjectDiffusionInboundHandleCollection
498+
457499
mempool <-
458500
openMempool
459501
registry

ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/GSM.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ setupGsm isHaaSatisfied vars = do
142142
, GSM.peerIsIdle = isIdling
143143
, GSM.durationUntilTooOld = Just durationUntilTooOld
144144
, GSM.equivalent = (==) -- unsound, but harmless in this test
145-
, GSM.getPeerStates = readTVar varStates
145+
, GSM.getPeerStates = traverse readTVar =<< readTVar varStates
146146
, GSM.getCurrentSelection = readTVar varSelection
147147
, GSM.minCaughtUpDuration = thrashLimit
148148
, GSM.setCaughtUpPersistentMark = \b ->

ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Tests/LoE/CaughtUp.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ mkGsmEntryPoints varChainSyncHandles chainDB writeGsmState =
283283
{ GSM.getCandidateOverSelection = pure candidateOverSelection
284284
, GSM.peerIsIdle = csIdling
285285
, GSM.equivalent = (==) `on` AF.headPoint
286-
, GSM.getPeerStates = fmap cschState <$> cschcMap varChainSyncHandles
286+
, GSM.getPeerStates = traverse readTVar =<< fmap cschState <$> cschcMap varChainSyncHandles
287287
, GSM.getCurrentSelection = ChainDB.getCurrentChain chainDB
288288
, -- Make sure that we stay in CaughtUp for the duration of the test once we
289289
-- have entered it.

0 commit comments

Comments
 (0)