From dfb71e1cfc054a96cd0f037e92fe12130b5db439 Mon Sep 17 00:00:00 2001 From: Travis Staton Date: Fri, 15 Oct 2021 14:56:41 -0400 Subject: [PATCH 1/2] Add FIFO queue of waiters --- Data/Pool.hs | 38 ++++++++++++++---- Data/Pool/WaiterQueue.hs | 87 ++++++++++++++++++++++++++++++++++++++++ resource-pool.cabal | 1 + 3 files changed, 118 insertions(+), 8 deletions(-) create mode 100644 Data/Pool/WaiterQueue.hs diff --git a/Data/Pool.hs b/Data/Pool.hs index 6764e8b..761497a 100644 --- a/Data/Pool.hs +++ b/Data/Pool.hs @@ -45,10 +45,11 @@ import Control.Applicative ((<$>)) import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay) import Control.Concurrent.STM import Control.Exception (SomeException, onException, mask_) -import Control.Monad (forM_, forever, join, liftM3, unless, when) +import Control.Monad (forM_, forever, join, liftM4, unless, when) import Data.Hashable (hash) import Data.IORef (IORef, newIORef, mkWeakIORef) import Data.List (partition) +import Data.Pool.WaiterQueue (WaiterQueue, newQueueIO, push, pop) import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime) import Data.Typeable (Typeable) import GHC.Conc.Sync (labelThread) @@ -86,6 +87,8 @@ data LocalPool a = LocalPool { -- ^ Count of open entries (both idle and in use). , entries :: TVar [Entry a] -- ^ Idle entries. + , waiters :: WaiterQueue (TMVar (Entry a)) + -- ^ threads waiting for a resource , lfin :: IORef () -- ^ empty value used to attach a finalizer to (internal) } deriving (Typeable) @@ -159,7 +162,7 @@ createPool create destroy numStripes idleTime maxResources = do when (maxResources < 1) $ modError "pool " $ "invalid maximum resource count " ++ show maxResources localPools <- V.replicateM numStripes $ - liftM3 LocalPool (newTVarIO 0) (newTVarIO []) (newIORef ()) + liftM4 LocalPool (newTVarIO 0) (newTVarIO []) newQueueIO (newIORef ()) reaperId <- forkIOLabeledWithUnmask "resource-pool: reaper" $ \unmask -> unmask $ reaper destroy idleTime localPools fin <- newIORef () @@ -281,10 +284,21 @@ takeResource pool@Pool{..} = do (Entry{..}:es) -> writeTVar entries es >> return (return entry) [] -> do used <- readTVar inUse - when (used == maxResources) retry - writeTVar inUse $! used + 1 - return $ - create `onException` atomically (modifyTVar_ inUse (subtract 1)) + case used == maxResources of + False -> do + writeTVar inUse $! used + 1 + return $ + create `onException` atomically (modifyTVar_ inUse (subtract 1)) + True -> do + var <- newEmptyTMVar + removeSelf <- push waiters var + let dequeue = atomically $ do + removeSelf + maybeEntry <- tryTakeTMVar var + case maybeEntry of + Nothing -> pure () + Just v -> putResourceSTM local v + return (entry <$> atomically (takeTMVar var) `onException` dequeue) return (resource, local) #if __GLASGOW_HASKELL__ >= 700 {-# INLINABLE takeResource #-} @@ -361,13 +375,21 @@ destroyResource Pool{..} LocalPool{..} resource = do -- | Return a resource to the given 'LocalPool'. putResource :: LocalPool a -> a -> IO () -putResource LocalPool{..} resource = do +putResource lp resource = do now <- getCurrentTime - atomically $ modifyTVar_ entries (Entry resource now:) + atomically $ putResourceSTM lp (Entry resource now) #if __GLASGOW_HASKELL__ >= 700 {-# INLINABLE putResource #-} #endif +putResourceSTM :: LocalPool a -> Entry a -> STM () +putResourceSTM LocalPool{..} resourceEntry = do + mWaiters <- pop waiters + case mWaiters of + Nothing -> modifyTVar_ entries (resourceEntry:) + Just w -> putTMVar w resourceEntry +{-# INLINE putResourceSTM #-} + -- | Destroy all resources in all stripes in the pool. Note that this -- will ignore any exceptions in the destroy function. -- diff --git a/Data/Pool/WaiterQueue.hs b/Data/Pool/WaiterQueue.hs new file mode 100644 index 0000000..5cdee1d --- /dev/null +++ b/Data/Pool/WaiterQueue.hs @@ -0,0 +1,87 @@ +module Data.Pool.WaiterQueue + ( WaiterQueue, + newQueueIO, + push, + pop, + ) +where + +import Control.Concurrent.STM + +-- | A FIFO queue that supports removing any element from the queue. +-- +-- We have a pointer to the head of the list, and a pointer to the +-- final foward pointer in the list. +data WaiterQueue a + = WaiterQueue + (TVar (TDList a)) + (TVar (TVar (TDList a))) + +-- | Each element has a pointer to the previous element's forward +-- pointer where "previous element" can be a 'TDList' cons cell or the +-- 'WaiterQueue' head pointer. +data TDList a + = TCons + (TVar (TVar (TDList a))) + a + (TVar (TDList a)) + | TNil + +newQueueIO :: IO (WaiterQueue a) +newQueueIO = do + emptyVarL <- newTVarIO TNil + emptyVarR <- newTVarIO emptyVarL + pure (WaiterQueue emptyVarL emptyVarR) + +removeSelf :: + -- | 'WaiterQueue's final foward pointer pointer + TVar (TVar (TDList a)) -> + -- | Our back pointer + TVar (TVar (TDList a)) -> + -- | Our forward pointter + TVar (TDList a) -> + STM () +removeSelf tv prevPP nextP = do + prevP <- readTVar prevPP + -- If our back pointer points to our forward pointer then we have + -- already been removed from the queue + case prevP == nextP of + True -> pure () + False -> do + next <- readTVar nextP + writeTVar prevP next + case next of + TNil -> writeTVar tv prevP + TCons bp _ _ -> writeTVar bp prevP + writeTVar prevPP nextP +{-# INLINE removeSelf #-} + +-- | Returns an STM action that removes the pushed element from the +-- queue +push :: WaiterQueue a -> a -> STM (STM ()) +push (WaiterQueue _ tv) a = do + fwdPointer <- readTVar tv + backPointer <- newTVar fwdPointer + emptyVar <- newTVar TNil + let cell = TCons backPointer a emptyVar + writeTVar fwdPointer cell + writeTVar tv emptyVar + pure (removeSelf tv backPointer emptyVar) +{-# INLINE push #-} + +pop :: WaiterQueue a -> STM (Maybe a) +pop (WaiterQueue hv tv) = do + firstElem <- readTVar hv + case firstElem of + TNil -> pure Nothing + TCons bp a fp -> do + f <- readTVar fp + writeTVar hv f + case f of + TNil -> writeTVar tv hv + TCons fbp _ _ -> writeTVar fbp hv + -- point the back pointer to the forward pointer as a sign that + -- the cell has been popped (referenced in removeSelf) + writeTVar bp fp + pure (Just a) +{-# INLINE pop #-} diff --git a/resource-pool.cabal b/resource-pool.cabal index 6a9bc09..2a64ac4 100644 --- a/resource-pool.cabal +++ b/resource-pool.cabal @@ -28,6 +28,7 @@ flag developer library exposed-modules: Data.Pool + other-modules: Data.Pool.WaiterQueue build-depends: base >= 4.4 && < 5, From 243a5b365b26775ea68ec138e18172662135d036 Mon Sep 17 00:00:00 2001 From: Travis Staton Date: Tue, 19 Oct 2021 22:21:36 -0400 Subject: [PATCH 2/2] Notify waiters on destroy --- Data/Pool.hs | 37 +++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/Data/Pool.hs b/Data/Pool.hs index 761497a..58750ec 100644 --- a/Data/Pool.hs +++ b/Data/Pool.hs @@ -87,7 +87,7 @@ data LocalPool a = LocalPool { -- ^ Count of open entries (both idle and in use). , entries :: TVar [Entry a] -- ^ Idle entries. - , waiters :: WaiterQueue (TMVar (Entry a)) + , waiters :: WaiterQueue (TMVar (Maybe (Entry a))) -- ^ threads waiting for a resource , lfin :: IORef () -- ^ empty value used to attach a finalizer to (internal) @@ -288,17 +288,22 @@ takeResource pool@Pool{..} = do False -> do writeTVar inUse $! used + 1 return $ - create `onException` atomically (modifyTVar_ inUse (subtract 1)) + create `onException` atomically (destroyResourceSTM local) True -> do var <- newEmptyTMVar removeSelf <- push waiters var - let dequeue = atomically $ do - removeSelf - maybeEntry <- tryTakeTMVar var - case maybeEntry of + let getResource x = case x of + Just y -> pure (entry y) + Nothing -> create `onException` atomically (destroyResourceSTM local) + let dequeue = do + maybeEntry <- atomically $ do + removeSelf + tryTakeTMVar var + atomically $ case maybeEntry of Nothing -> pure () - Just v -> putResourceSTM local v - return (entry <$> atomically (takeTMVar var) `onException` dequeue) + Just Nothing -> destroyResourceSTM local + Just (Just v) -> putResourceSTM local v + return (getResource =<< atomically (takeTMVar var) `onException` dequeue) return (resource, local) #if __GLASGOW_HASKELL__ >= 700 {-# INLINABLE takeResource #-} @@ -346,7 +351,7 @@ tryTakeResource pool@Pool{..} = do else do writeTVar inUse $! used + 1 return $ Just <$> - create `onException` atomically (modifyTVar_ inUse (subtract 1)) + create `onException` atomically (destroyResourceSTM local) return $ (flip (,) local) <$> resource #if __GLASGOW_HASKELL__ >= 700 {-# INLINABLE tryTakeResource #-} @@ -366,9 +371,9 @@ getLocalPool Pool{..} = do -- | Destroy a resource. Note that this will ignore any exceptions in the -- destroy function. destroyResource :: Pool a -> LocalPool a -> a -> IO () -destroyResource Pool{..} LocalPool{..} resource = do +destroyResource Pool{..} local resource = do destroy resource `E.catch` \(_::SomeException) -> return () - atomically (modifyTVar_ inUse (subtract 1)) + atomically (destroyResourceSTM local) #if __GLASGOW_HASKELL__ >= 700 {-# INLINABLE destroyResource #-} #endif @@ -387,9 +392,17 @@ putResourceSTM LocalPool{..} resourceEntry = do mWaiters <- pop waiters case mWaiters of Nothing -> modifyTVar_ entries (resourceEntry:) - Just w -> putTMVar w resourceEntry + Just w -> putTMVar w (Just resourceEntry) {-# INLINE putResourceSTM #-} +destroyResourceSTM :: LocalPool a -> STM () +destroyResourceSTM LocalPool{..} = do + mwaiter <- pop waiters + case mwaiter of + Nothing -> modifyTVar_ inUse (subtract 1) + Just w -> putTMVar w Nothing +{-# INLINE destroyResourceSTM #-} + -- | Destroy all resources in all stripes in the pool. Note that this -- will ignore any exceptions in the destroy function. --