Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Non-Breaking

- Addapted tests to changes in the `Ouroboros.Network.TxSubmission.Mempool.Simple` API

Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node

txSubmissionInitiator
:: TxDecisionPolicy
-> Mempool m (Tx TxId)
-> Mempool m TxId (Tx TxId)
-> MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
txSubmissionInitiator txDecisionPolicy mempool =
MiniProtocolCb $
Expand All @@ -709,7 +709,7 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
(txSubmissionClientPeer client)

txSubmissionResponder
:: Mempool m (Tx TxId)
:: Mempool m TxId (Tx TxId)
-> TxChannelsVar m NtNAddr Int (Tx Int)
-> TxMempoolSem m
-> SharedTxStateVar m NtNAddr Int (Tx Int)
Expand Down
21 changes: 16 additions & 5 deletions dmq-node/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

module Main where

import Control.Exception (throwIO)
import Control.Monad (void, when)
import Control.Tracer (Tracer (..), nullTracer, traceWith)

Expand All @@ -22,6 +23,7 @@ import System.Exit (exitSuccess)
import System.Random (newStdGen, split)

import Cardano.Git.Rev (gitRev)
import Cardano.KESAgent.KES.Evolution qualified as KES
import Cardano.KESAgent.Protocols.StandardCrypto (StandardCrypto)

import DMQ.Configuration
Expand Down Expand Up @@ -68,6 +70,7 @@ runDMQ commandLineConfig = do
let dmqConfig@Configuration {
dmqcPrettyLog = I prettyLog,
dmqcTopologyFile = I topologyFile,
dmqcShelleyGenesisFile = I genesisFile,
dmqcHandshakeTracer = I handshakeTracer,
dmqcLocalHandshakeTracer = I localHandshakeTracer,
dmqcVersion = I version
Expand Down Expand Up @@ -95,25 +98,33 @@ runDMQ commandLineConfig = do
]
exitSuccess

res <- KES.evolutionConfigFromGenesisFile genesisFile
evolutionConfig <- case res of
Left err -> traceWith tracer (WithEventType "ShelleyGenesisFile" err)
>> throwIO (userError $ err)
Right ev -> return ev

traceWith tracer (WithEventType "Configuration" dmqConfig)
nt <- readTopologyFileOrError topologyFile
traceWith tracer (WithEventType "NetworkTopology" nt)

stdGen <- newStdGen
let (psRng, policyRng) = split stdGen

withNodeKernel @StandardCrypto tracer dmqConfig psRng $ \nodeKernel -> do
withNodeKernel @StandardCrypto
tracer
dmqConfig
evolutionConfig
psRng $ \nodeKernel -> do
dmqDiffusionConfiguration <- mkDiffusionConfiguration dmqConfig nt

let dmqNtNApps =
ntnApps tracer
dmqConfig
nodeKernel
(dmqCodecs
-- TODO: `maxBound :: Cardano.Network.NodeToNode.NodeToNodeVersion`
-- is unsafe here!
(encodeRemoteAddress (maxBound :: NodeToNodeVersion))
(decodeRemoteAddress (maxBound :: NodeToNodeVersion)))
(encodeRemoteAddress (maxBound @NodeToNodeVersion))
(decodeRemoteAddress (maxBound @NodeToNodeVersion)))
dmqLimitsAndTimeouts
defaultSigDecisionPolicy
dmqNtCApps =
Expand Down
2 changes: 1 addition & 1 deletion dmq-node/cddl/specs/sig.cddl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ messagePayload = [

messageId = bstr
messageBody = bstr
kesSignature = bstr
kesSignature = bstr .size 448
kesPeriod = word64
operationalCertificate = [ bstr .size 32, word64, word64, bstr .size 64 ]
coldVerificationKey = bstr .size 32
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<!--
A new scriv changelog fragment.

Uncomment the section that is right (remove the HTML comment wrapper).
For top level release notes, leave all the headers commented out.
-->

### Breaking

- Using `KESPeriod` from `Cardano.Crypto.KES` instead of `SigKESPeriod`
newtype. `KESPeriod` is used by `SigRaw` data type.
- `SigKESSignature` holds `SigKES (KES crypto)` instead of a `ByteString`.
- `SigColdKey` holds `VerKeyDSIGN` instead of a `ByteString`.
- `ntnApps` constraints changed in order to use `sigValidate` function.

### Non-Breaking

- `Sig` codec decodes KES signatures, and the cold key.
- Added `DMQ.SigSubmission.Type.validateSig` and `SigValidationError`.

10 changes: 9 additions & 1 deletion dmq-node/dmq-node.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@ extra-doc-files: CHANGELOG.md

flag cddl
description: Enable CDDL based tests of the CBOR encoding
manual: True
-- These tests need the cddl and the cbor-diag Ruby-package
default: True

flag standardcrypto-tests
description: Enable StandardCrypto tests
-- these tests are flaky on GH Windows instances
manual: True
default: True

common extensions
default-extensions:
BangPatterns
Expand Down Expand Up @@ -186,6 +191,9 @@ test-suite dmq-tests
-T
-RTS

if flag(standardcrypto-tests)
cpp-options: -DSTANDARDCRYPTO_TESTS

test-suite dmq-cddl
import:
warnings,
Expand Down
8 changes: 8 additions & 0 deletions dmq-node/src/DMQ/Configuration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ data Configuration' f =
dmqcPortNumber :: f PortNumber,
dmqcConfigFile :: f FilePath,
dmqcTopologyFile :: f FilePath,
dmqcShelleyGenesisFile :: f FilePath,
-- ^ shelley genesis file, e.g.
-- `/configuration/cardano/mainnet-shelley-genesis.json` in `cardano-node`
-- repo.
dmqcAcceptedConnectionsLimit :: f AcceptedConnectionsLimit,
dmqcDiffusionMode :: f DiffusionMode,
dmqcTargetOfRootPeers :: f Int,
Expand Down Expand Up @@ -210,6 +214,7 @@ defaultConfiguration = Configuration {
dmqcPortNumber = I 3_141,
dmqcConfigFile = I "dmq.configuration.yaml",
dmqcTopologyFile = I "dmq.topology.json",
dmqcShelleyGenesisFile = I "mainnet-shelley-genesis.json",
dmqcAcceptedConnectionsLimit = I defaultAcceptedConnectionsLimit,
dmqcDiffusionMode = I InitiatorAndResponderDiffusionMode,
dmqcTargetOfRootPeers = I targetNumberOfRootPeers,
Expand Down Expand Up @@ -300,6 +305,8 @@ instance FromJSON PartialConfig where
dmqcDiffusionMode <- Last <$> v .:? "DiffusionMode"
dmqcPeerSharing <- Last <$> v .:? "PeerSharing"

dmqcShelleyGenesisFile <- Last <$> v .:? "ShelleyGenesisFile"

dmqcTargetOfRootPeers <- Last <$> v .:? "TargetNumberOfRootPeers"
dmqcTargetOfKnownPeers <- Last <$> v .:? "TargetNumberOfKnownPeers"
dmqcTargetOfEstablishedPeers <- Last <$> v .:? "TargetNumberOfEstablishedPeers"
Expand Down Expand Up @@ -376,6 +383,7 @@ instance ToJSON Configuration where
, "LocalAddress" .= unI dmqcLocalAddress
, "ConfigFile" .= unI dmqcConfigFile
, "TopologyFile" .= unI dmqcTopologyFile
, "ShelleyGenesisFile" .= unI dmqcShelleyGenesisFile
, "AcceptedConnectionsLimit" .= unI dmqcAcceptedConnectionsLimit
, "DiffusionMode" .= unI dmqcDiffusionMode
, "TargetOfRootPeers" .= unI dmqcTargetOfRootPeers
Expand Down
56 changes: 40 additions & 16 deletions dmq-node/src/DMQ/Diffusion/NodeKernel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ import Data.Aeson qualified as Aeson
import Data.Function (on)
import Data.Functor.Contravariant ((>$<))
import Data.Hashable
import Data.Sequence (Seq)
import Data.Sequence qualified as Seq
import Data.Set (Set)
import Data.Set qualified as Set
import Data.Time.Clock.POSIX (POSIXTime)
import Data.Time.Clock.POSIX qualified as Time
import Data.Void (Void)
import System.Random (StdGen)
import System.Random qualified as Random

import Cardano.KESAgent.KES.Crypto (Crypto (..))
import Cardano.KESAgent.KES.Evolution qualified as KES

import Ouroboros.Network.BlockFetch (FetchClientRegistry,
newFetchClientRegistry)
Expand All @@ -37,11 +41,12 @@ import Ouroboros.Network.PeerSharing (PeerSharingAPI, PeerSharingRegistry,
newPeerSharingAPI, newPeerSharingRegistry,
ps_POLICY_PEER_SHARE_MAX_PEERS, ps_POLICY_PEER_SHARE_STICKY_TIME)
import Ouroboros.Network.TxSubmission.Inbound.V2
import Ouroboros.Network.TxSubmission.Mempool.Simple (Mempool (..))
import Ouroboros.Network.TxSubmission.Mempool.Simple (Mempool (..),
MempoolSeq (..))
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool

import DMQ.Configuration
import DMQ.Protocol.SigSubmission.Type (Sig (sigExpiresAt), SigId)
import DMQ.Protocol.SigSubmission.Type (Sig (sigExpiresAt, sigId), SigId)
import DMQ.Tracer


Expand All @@ -54,7 +59,8 @@ data NodeKernel crypto ntnAddr m =
-- the PeerSharing protocol
, peerSharingRegistry :: !(PeerSharingRegistry ntnAddr m)
, peerSharingAPI :: !(PeerSharingAPI ntnAddr StdGen m)
, mempool :: !(Mempool m (Sig crypto))
, mempool :: !(Mempool m SigId (Sig crypto))
, evolutionConfig :: !(KES.EvolutionConfig)
, sigChannelVar :: !(TxChannelsVar m ntnAddr SigId (Sig crypto))
, sigMempoolSem :: !(TxMempoolSem m)
, sigSharedTxStateVar :: !(SharedTxStateVar m ntnAddr SigId (Sig crypto))
Expand All @@ -64,9 +70,10 @@ newNodeKernel :: ( MonadLabelledSTM m
, MonadMVar m
, Ord ntnAddr
)
=> StdGen
=> KES.EvolutionConfig
-> StdGen
-> m (NodeKernel crypto ntnAddr m)
newNodeKernel rng = do
newNodeKernel evolutionConfig rng = do
publicPeerSelectionStateVar <- makePublicPeerSelectionStateVar

fetchClientRegistry <- newFetchClientRegistry
Expand All @@ -89,6 +96,7 @@ newNodeKernel rng = do
, peerSharingRegistry
, peerSharingAPI
, mempool
, evolutionConfig
, sigChannelVar
, sigMempoolSem
, sigSharedTxStateVar
Expand All @@ -110,6 +118,7 @@ withNodeKernel :: forall crypto ntnAddr m a.
)
=> (forall ev. Aeson.ToJSON ev => Tracer m (WithEventType ev))
-> Configuration
-> KES.EvolutionConfig
-> StdGen
-> (NodeKernel crypto ntnAddr m -> m a)
-- ^ as soon as the callback exits the `mempoolWorker` and all
Expand All @@ -119,12 +128,13 @@ withNodeKernel tracer
Configuration {
dmqcSigSubmissionLogicTracer = I sigSubmissionLogicTracer
}
evolutionConfig
rng k = do
nodeKernel@NodeKernel { mempool,
sigChannelVar,
sigSharedTxStateVar
}
<- newNodeKernel rng
<- newNodeKernel evolutionConfig rng
withAsync (mempoolWorker mempool)
$ \mempoolThread ->
withAsync (decisionLogicThreads
Expand All @@ -146,22 +156,36 @@ mempoolWorker :: forall crypto m.
, MonadSTM m
, MonadTime m
)
=> Mempool m (Sig crypto)
=> Mempool m SigId (Sig crypto)
-> m Void
mempoolWorker (Mempool v) = loop
where
loop = do
now <- getCurrentPOSIXTime
rt <- atomically $ do
(sigs :: Seq.Seq (Sig crypto)) <- readTVar v
let sigs' :: Seq.Seq (Sig crypto)
(resumeTime, sigs') =
foldr (\a (rt, as) -> if sigExpiresAt a <= now
then (rt, as)
else (rt `min` sigExpiresAt a, a Seq.<| as))
(now, Seq.empty)
sigs
writeTVar v sigs'
MempoolSeq { mempoolSeq, mempoolSet } <- readTVar v
let mempoolSeq' :: Seq (Sig crypto)
mempoolSet', expiredSet' :: Set SigId

(resumeTime, expiredSet', mempoolSeq') =
foldr (\sig (rt, expiredSet, sigs) ->
if sigExpiresAt sig <= now
then ( rt
, sigId sig `Set.insert` expiredSet
, sigs
)
else ( rt `min` sigExpiresAt sig
, expiredSet
, sig Seq.<| sigs
)
)
(now, Set.empty, Seq.empty)
mempoolSeq

mempoolSet' = mempoolSet `Set.difference` expiredSet'

writeTVar v MempoolSeq { mempoolSet = mempoolSet',
mempoolSeq = mempoolSeq' }
return resumeTime

now' <- getCurrentPOSIXTime
Expand Down
14 changes: 12 additions & 2 deletions dmq-node/src/DMQ/NodeToNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}

module DMQ.NodeToNode
( RemoteAddress
Expand Down Expand Up @@ -40,6 +41,7 @@ import Codec.CBOR.Encoding qualified as CBOR
import Codec.CBOR.Read qualified as CBOR
import Codec.CBOR.Term qualified as CBOR
import Data.Aeson qualified as Aeson
import Data.ByteString qualified as BS
import Data.ByteString.Lazy qualified as BL
import Data.Functor.Contravariant ((>$<))
import Data.Hashable (Hashable)
Expand All @@ -52,7 +54,10 @@ import Network.Mux.Types (Mode (..))
import Network.Mux.Types qualified as Mx
import Network.TypedProtocol.Codec (AnnotatedCodec, Codec)

import Cardano.Crypto.DSIGN.Class qualified as DSIGN
import Cardano.Crypto.KES.Class qualified as KES
import Cardano.KESAgent.KES.Crypto (Crypto (..))
import Cardano.KESAgent.KES.OCert (OCertSignable)

import DMQ.Configuration (Configuration, Configuration' (..), I (..))
import DMQ.Diffusion.NodeKernel (NodeKernel (..))
Expand Down Expand Up @@ -147,6 +152,10 @@ data Apps addr m a b =
ntnApps
:: forall crypto m addr .
( Crypto crypto
, DSIGN.ContextDSIGN (DSIGN crypto) ~ ()
, DSIGN.Signable (DSIGN crypto) (OCertSignable crypto)
, KES.ContextKES (KES crypto) ~ ()
, KES.Signable (KES crypto) BS.ByteString
, Typeable crypto
, Alternative (STM m)
, MonadAsync m
Expand Down Expand Up @@ -187,6 +196,7 @@ ntnApps
, peerSharingRegistry
, peerSharingAPI
, mempool
, evolutionConfig
, sigChannelVar
, sigMempoolSem
, sigSharedTxStateVar
Expand Down Expand Up @@ -224,8 +234,8 @@ ntnApps
-- connection if we receive one, rather than validate them in the
-- mempool.
mempoolWriter = Mempool.getWriter sigId
(pure ())
(\_ _ -> Right () :: Either Void ())
(pure ()) -- TODO not needed
(\_ -> validateSig evolutionConfig)
(\_ -> True)
mempool

Expand Down
Loading