diff --git a/cabal.project b/cabal.project index caee395..6c28a7b 100644 --- a/cabal.project +++ b/cabal.project @@ -58,8 +58,9 @@ source-repository-package source-repository-package type: git location: https://github.com/IntersectMBO/ouroboros-network - tag: 3c4433d05ec012af6d1a26e6b5e86665627c08c4 - --sha256: sha256-Jemp6PlzISA+l1wdXV6MrIxaBpAxdrLLAlbkB7ZqF2Y= + -- from coot/dmq-related-changes + tag: adc2fa928d1856a8f0069190b7f698d2218f2110 + --sha256: sha256-z1ftTOd3/EO6kjbIqLuRBtaGtkB1kNrpPOrF/q6oigM= subdir: acts-generic cardano-diffusion diff --git a/dmq-node/app/Main.hs b/dmq-node/app/Main.hs index c3d322f..4bab9cf 100644 --- a/dmq-node/app/Main.hs +++ b/dmq-node/app/Main.hs @@ -119,6 +119,7 @@ runDMQ commandLineConfig = do stdGen <- newStdGen let (psRng, policyRng) = split stdGen + policyRngVar <- newTVarIO policyRng -- TODO: this might not work, since `ouroboros-network` creates its own IO Completion Port. withIOManager \iocp -> do @@ -212,7 +213,7 @@ runDMQ commandLineConfig = do dmqLimitsAndTimeouts dmqNtNApps dmqNtCApps - (policy policyRng) + (policy policyRngVar) Diffusion.run dmqDiffusionArguments (dmqDiffusionTracers dmqConfig tracer) diff --git a/dmq-node/src/DMQ/Diffusion/PeerSelection.hs b/dmq-node/src/DMQ/Diffusion/PeerSelection.hs index 13add1d..b17bbaa 100644 --- a/dmq-node/src/DMQ/Diffusion/PeerSelection.hs +++ b/dmq-node/src/DMQ/Diffusion/PeerSelection.hs @@ -1,40 +1,118 @@ module DMQ.Diffusion.PeerSelection where -import Data.Set (Set) +import Control.Concurrent.Class.MonadSTM.Strict +import Data.List (sortOn, unfoldr) +import Data.Map.Strict qualified as Map import Data.Set qualified as Set -import Network.Socket (SockAddr) -import Ouroboros.Network.PeerSelection.Governor.Types -import System.Random (Random (..), StdGen) +import Data.Word (Word32) +import Ouroboros.Network.PeerSelection +import System.Random (Random (..), StdGen, split) -- | Trivial peer selection policy used as dummy value -- -policy :: StdGen -> PeerSelectionPolicy SockAddr IO -policy gen = +policy :: forall peerAddr m. + ( MonadSTM m + , Ord peerAddr + ) + => StrictTVar m StdGen + -> PeerSelectionPolicy peerAddr m +policy rngVar = PeerSelectionPolicy { - policyPickKnownPeersForPeerShare = \_ _ _ -> pickTrivially - , policyPickColdPeersToForget = \_ _ _ -> pickTrivially - , policyPickColdPeersToPromote = \_ _ _ -> pickTrivially - , policyPickWarmPeersToPromote = \_ _ _ -> pickTrivially - , policyPickHotPeersToDemote = \_ _ _ -> pickTrivially - , policyPickWarmPeersToDemote = \_ _ _ -> pickTrivially - , policyPickInboundPeers = \_ _ _ -> pickTrivially - , policyFindPublicRootTimeout = 5 - , policyMaxInProgressPeerShareReqs = 0 - , policyPeerShareRetryTime = 0 -- seconds - , policyPeerShareBatchWaitTime = 0 -- seconds - , policyPeerShareOverallTimeout = 0 -- seconds - , policyPeerShareActivationDelay = 2 -- seconds + policyPickKnownPeersForPeerShare = simplePromotionPolicy, + policyPickColdPeersToPromote = simplePromotionPolicy, + policyPickWarmPeersToPromote = simplePromotionPolicy, + policyPickInboundPeers = simplePromotionPolicy, + + policyPickHotPeersToDemote = hotDemotionPolicy, + policyPickWarmPeersToDemote = warmDemotionPolicy, + policyPickColdPeersToForget = coldForgetPolicy, + + policyFindPublicRootTimeout = 5, + policyMaxInProgressPeerShareReqs = 0, + policyPeerShareRetryTime = 0, -- seconds + policyPeerShareBatchWaitTime = 0, -- seconds + policyPeerShareOverallTimeout = 0, -- seconds + policyPeerShareActivationDelay = 2 -- seconds } where - pickTrivially :: Applicative m => Set SockAddr -> Int -> m (Set SockAddr) - pickTrivially set n = pure - . fst - $ go gen (Set.toList set) n [] - where - go g _ 0 acc = (Set.fromList acc, g) - go g [] _ acc = (Set.fromList acc, g) - go g xs k acc = - let (idx, g') = randomR (0, length xs - 1) g - picked = xs !! idx - xs' = take idx xs ++ drop (idx + 1) xs - in go g' xs' (k - 1) (picked : acc) + hotDemotionPolicy :: PickPolicy peerAddr (STM m) + hotDemotionPolicy _ _ _ available pickNum = do + available' <- addRand rngVar available (,) + return $ Set.fromList + . map fst + . take pickNum + . sortOn snd + . Map.assocs + $ available' + + -- Randomly pick peers to demote, peers with knownPeerTepid set are twice + -- as likely to be demoted. + warmDemotionPolicy :: PickPolicy peerAddr (STM m) + warmDemotionPolicy _ _ isTepid available pickNum = do + available' <- addRand rngVar available (tepidWeight isTepid) + return $ Set.fromList + . map fst + . take pickNum + . sortOn snd + . Map.assocs + $ available' + + simplePromotionPolicy :: PickPolicy peerAddr (STM m) + simplePromotionPolicy _ _ _ available pickNum = do + available' <- addRand rngVar available (,) + return $ Set.fromList + . map fst + . take pickNum + . sortOn snd + . Map.assocs + $ available' + + -- Randomly pick peers to forget, peers with failures are more likely to + -- be forgotten. + coldForgetPolicy :: PickPolicy peerAddr (STM m) + coldForgetPolicy source failCnt _ available pickNum = do + available' <- addRand rngVar available (failWeight failCnt) + return $ Set.fromList + . map fst + . take pickNum + . sortOn snd + . Map.assocs + -- avoid demoting local root peers + . Map.filterWithKey (\peer _ -> source peer /= PeerSourceLocalRoot) + $ available' + + -- Failures lowers r + failWeight :: (peerAddr -> Int) + -> peerAddr + -> Word32 + -> (peerAddr, Word32) + failWeight failCnt peer r = + (peer, r `div` fromIntegral (failCnt peer + 1)) + + -- Tepid flag cuts r in half + tepidWeight :: (peerAddr -> Bool) + -> peerAddr + -> Word32 + -> (peerAddr, Word32) + tepidWeight isTepid peer r = + if isTepid peer then (peer, r `div` 2) + else (peer, r) + + + -- Add scaled random number in order to prevent ordering based on SockAddr +addRand :: ( MonadSTM m + , Ord peerAddr + ) + => StrictTVar m StdGen + -> Set.Set peerAddr + -> (peerAddr -> Word32 -> (peerAddr, Word32)) + -> STM m (Map.Map peerAddr Word32) +addRand rngVar available scaleFn = do + inRng <- readTVar rngVar + + let (rng, rng') = split inRng + rns = take (Set.size available) $ unfoldr (Just . random) rng :: [Word32] + available' = Map.fromList $ zipWith scaleFn (Set.toList available) rns + writeTVar rngVar rng' + return available' +