Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ source-repository-package
source-repository-package
type: git
location: https://github.com/IntersectMBO/ouroboros-network
tag: 3c4433d05ec012af6d1a26e6b5e86665627c08c4
--sha256: sha256-Jemp6PlzISA+l1wdXV6MrIxaBpAxdrLLAlbkB7ZqF2Y=
-- from coot/dmq-related-changes
tag: adc2fa928d1856a8f0069190b7f698d2218f2110
--sha256: sha256-z1ftTOd3/EO6kjbIqLuRBtaGtkB1kNrpPOrF/q6oigM=
subdir:
acts-generic
cardano-diffusion
Expand Down
3 changes: 2 additions & 1 deletion dmq-node/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ runDMQ commandLineConfig = do

stdGen <- newStdGen
let (psRng, policyRng) = split stdGen
policyRngVar <- newTVarIO policyRng

-- TODO: this might not work, since `ouroboros-network` creates its own IO Completion Port.
withIOManager \iocp -> do
Expand Down Expand Up @@ -212,7 +213,7 @@ runDMQ commandLineConfig = do
dmqLimitsAndTimeouts
dmqNtNApps
dmqNtCApps
(policy policyRng)
(policy policyRngVar)

Diffusion.run dmqDiffusionArguments
(dmqDiffusionTracers dmqConfig tracer)
Expand Down
140 changes: 109 additions & 31 deletions dmq-node/src/DMQ/Diffusion/PeerSelection.hs
Original file line number Diff line number Diff line change
@@ -1,40 +1,118 @@
module DMQ.Diffusion.PeerSelection where

import Data.Set (Set)
import Control.Concurrent.Class.MonadSTM.Strict
import Data.List (sortOn, unfoldr)
import Data.Map.Strict qualified as Map
import Data.Set qualified as Set
import Network.Socket (SockAddr)
import Ouroboros.Network.PeerSelection.Governor.Types
import System.Random (Random (..), StdGen)
import Data.Word (Word32)
import Ouroboros.Network.PeerSelection
import System.Random (Random (..), StdGen, split)

-- | Trivial peer selection policy used as dummy value
--
policy :: StdGen -> PeerSelectionPolicy SockAddr IO
policy gen =
policy :: forall peerAddr m.
( MonadSTM m
, Ord peerAddr
)
=> StrictTVar m StdGen
-> PeerSelectionPolicy peerAddr m
policy rngVar =
PeerSelectionPolicy {
policyPickKnownPeersForPeerShare = \_ _ _ -> pickTrivially
, policyPickColdPeersToForget = \_ _ _ -> pickTrivially
, policyPickColdPeersToPromote = \_ _ _ -> pickTrivially
, policyPickWarmPeersToPromote = \_ _ _ -> pickTrivially
, policyPickHotPeersToDemote = \_ _ _ -> pickTrivially
, policyPickWarmPeersToDemote = \_ _ _ -> pickTrivially
, policyPickInboundPeers = \_ _ _ -> pickTrivially
, policyFindPublicRootTimeout = 5
, policyMaxInProgressPeerShareReqs = 0
, policyPeerShareRetryTime = 0 -- seconds
, policyPeerShareBatchWaitTime = 0 -- seconds
, policyPeerShareOverallTimeout = 0 -- seconds
, policyPeerShareActivationDelay = 2 -- seconds
policyPickKnownPeersForPeerShare = simplePromotionPolicy,
policyPickColdPeersToPromote = simplePromotionPolicy,
policyPickWarmPeersToPromote = simplePromotionPolicy,
policyPickInboundPeers = simplePromotionPolicy,

policyPickHotPeersToDemote = hotDemotionPolicy,
policyPickWarmPeersToDemote = warmDemotionPolicy,
policyPickColdPeersToForget = coldForgetPolicy,

policyFindPublicRootTimeout = 5,
policyMaxInProgressPeerShareReqs = 0,
policyPeerShareRetryTime = 0, -- seconds
policyPeerShareBatchWaitTime = 0, -- seconds
policyPeerShareOverallTimeout = 0, -- seconds
policyPeerShareActivationDelay = 2 -- seconds
}
where
pickTrivially :: Applicative m => Set SockAddr -> Int -> m (Set SockAddr)
pickTrivially set n = pure
. fst
$ go gen (Set.toList set) n []
where
go g _ 0 acc = (Set.fromList acc, g)
go g [] _ acc = (Set.fromList acc, g)
go g xs k acc =
let (idx, g') = randomR (0, length xs - 1) g
picked = xs !! idx
xs' = take idx xs ++ drop (idx + 1) xs
in go g' xs' (k - 1) (picked : acc)
hotDemotionPolicy :: PickPolicy peerAddr (STM m)
hotDemotionPolicy _ _ _ available pickNum = do
available' <- addRand rngVar available (,)
return $ Set.fromList
. map fst
. take pickNum
. sortOn snd
. Map.assocs
$ available'

-- Randomly pick peers to demote, peers with knownPeerTepid set are twice
-- as likely to be demoted.
warmDemotionPolicy :: PickPolicy peerAddr (STM m)
warmDemotionPolicy _ _ isTepid available pickNum = do
available' <- addRand rngVar available (tepidWeight isTepid)
return $ Set.fromList
. map fst
. take pickNum
. sortOn snd
. Map.assocs
$ available'

simplePromotionPolicy :: PickPolicy peerAddr (STM m)
simplePromotionPolicy _ _ _ available pickNum = do
available' <- addRand rngVar available (,)
return $ Set.fromList
. map fst
. take pickNum
. sortOn snd
. Map.assocs
$ available'

-- Randomly pick peers to forget, peers with failures are more likely to
-- be forgotten.
coldForgetPolicy :: PickPolicy peerAddr (STM m)
coldForgetPolicy source failCnt _ available pickNum = do
available' <- addRand rngVar available (failWeight failCnt)
return $ Set.fromList
. map fst
. take pickNum
. sortOn snd
. Map.assocs
-- avoid demoting local root peers
. Map.filterWithKey (\peer _ -> source peer /= PeerSourceLocalRoot)
$ available'

-- Failures lowers r
failWeight :: (peerAddr -> Int)
-> peerAddr
-> Word32
-> (peerAddr, Word32)
failWeight failCnt peer r =
(peer, r `div` fromIntegral (failCnt peer + 1))

-- Tepid flag cuts r in half
tepidWeight :: (peerAddr -> Bool)
-> peerAddr
-> Word32
-> (peerAddr, Word32)
tepidWeight isTepid peer r =
if isTepid peer then (peer, r `div` 2)
else (peer, r)


-- Add scaled random number in order to prevent ordering based on SockAddr
addRand :: ( MonadSTM m
, Ord peerAddr
)
=> StrictTVar m StdGen
-> Set.Set peerAddr
-> (peerAddr -> Word32 -> (peerAddr, Word32))
-> STM m (Map.Map peerAddr Word32)
addRand rngVar available scaleFn = do
inRng <- readTVar rngVar

let (rng, rng') = split inRng
rns = take (Set.size available) $ unfoldr (Just . random) rng :: [Word32]
available' = Map.fromList $ zipWith scaleFn (Set.toList available) rns
writeTVar rngVar rng'
return available'

Loading