diff --git a/Data/Pool.hs b/Data/Pool.hs index 6764e8b..58750ec 100644 --- a/Data/Pool.hs +++ b/Data/Pool.hs @@ -45,10 +45,11 @@ import Control.Applicative ((<$>)) import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay) import Control.Concurrent.STM import Control.Exception (SomeException, onException, mask_) -import Control.Monad (forM_, forever, join, liftM3, unless, when) +import Control.Monad (forM_, forever, join, liftM4, unless, when) import Data.Hashable (hash) import Data.IORef (IORef, newIORef, mkWeakIORef) import Data.List (partition) +import Data.Pool.WaiterQueue (WaiterQueue, newQueueIO, push, pop) import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime) import Data.Typeable (Typeable) import GHC.Conc.Sync (labelThread) @@ -86,6 +87,8 @@ data LocalPool a = LocalPool { -- ^ Count of open entries (both idle and in use). , entries :: TVar [Entry a] -- ^ Idle entries. + , waiters :: WaiterQueue (TMVar (Maybe (Entry a))) + -- ^ threads waiting for a resource , lfin :: IORef () -- ^ empty value used to attach a finalizer to (internal) } deriving (Typeable) @@ -159,7 +162,7 @@ createPool create destroy numStripes idleTime maxResources = do when (maxResources < 1) $ modError "pool " $ "invalid maximum resource count " ++ show maxResources localPools <- V.replicateM numStripes $ - liftM3 LocalPool (newTVarIO 0) (newTVarIO []) (newIORef ()) + liftM4 LocalPool (newTVarIO 0) (newTVarIO []) newQueueIO (newIORef ()) reaperId <- forkIOLabeledWithUnmask "resource-pool: reaper" $ \unmask -> unmask $ reaper destroy idleTime localPools fin <- newIORef () @@ -281,10 +284,26 @@ takeResource pool@Pool{..} = do (Entry{..}:es) -> writeTVar entries es >> return (return entry) [] -> do used <- readTVar inUse - when (used == maxResources) retry - writeTVar inUse $! used + 1 - return $ - create `onException` atomically (modifyTVar_ inUse (subtract 1)) + case used == maxResources of + False -> do + writeTVar inUse $! used + 1 + return $ + create `onException` atomically (destroyResourceSTM local) + True -> do + var <- newEmptyTMVar + removeSelf <- push waiters var + let getResource x = case x of + Just y -> pure (entry y) + Nothing -> create `onException` atomically (destroyResourceSTM local) + let dequeue = do + maybeEntry <- atomically $ do + removeSelf + tryTakeTMVar var + atomically $ case maybeEntry of + Nothing -> pure () + Just Nothing -> destroyResourceSTM local + Just (Just v) -> putResourceSTM local v + return (getResource =<< atomically (takeTMVar var) `onException` dequeue) return (resource, local) #if __GLASGOW_HASKELL__ >= 700 {-# INLINABLE takeResource #-} @@ -332,7 +351,7 @@ tryTakeResource pool@Pool{..} = do else do writeTVar inUse $! used + 1 return $ Just <$> - create `onException` atomically (modifyTVar_ inUse (subtract 1)) + create `onException` atomically (destroyResourceSTM local) return $ (flip (,) local) <$> resource #if __GLASGOW_HASKELL__ >= 700 {-# INLINABLE tryTakeResource #-} @@ -352,22 +371,38 @@ getLocalPool Pool{..} = do -- | Destroy a resource. Note that this will ignore any exceptions in the -- destroy function. destroyResource :: Pool a -> LocalPool a -> a -> IO () -destroyResource Pool{..} LocalPool{..} resource = do +destroyResource Pool{..} local resource = do destroy resource `E.catch` \(_::SomeException) -> return () - atomically (modifyTVar_ inUse (subtract 1)) + atomically (destroyResourceSTM local) #if __GLASGOW_HASKELL__ >= 700 {-# INLINABLE destroyResource #-} #endif -- | Return a resource to the given 'LocalPool'. putResource :: LocalPool a -> a -> IO () -putResource LocalPool{..} resource = do +putResource lp resource = do now <- getCurrentTime - atomically $ modifyTVar_ entries (Entry resource now:) + atomically $ putResourceSTM lp (Entry resource now) #if __GLASGOW_HASKELL__ >= 700 {-# INLINABLE putResource #-} #endif +putResourceSTM :: LocalPool a -> Entry a -> STM () +putResourceSTM LocalPool{..} resourceEntry = do + mWaiters <- pop waiters + case mWaiters of + Nothing -> modifyTVar_ entries (resourceEntry:) + Just w -> putTMVar w (Just resourceEntry) +{-# INLINE putResourceSTM #-} + +destroyResourceSTM :: LocalPool a -> STM () +destroyResourceSTM LocalPool{..} = do + mwaiter <- pop waiters + case mwaiter of + Nothing -> modifyTVar_ inUse (subtract 1) + Just w -> putTMVar w Nothing +{-# INLINE destroyResourceSTM #-} + -- | Destroy all resources in all stripes in the pool. Note that this -- will ignore any exceptions in the destroy function. -- diff --git a/Data/Pool/WaiterQueue.hs b/Data/Pool/WaiterQueue.hs new file mode 100644 index 0000000..5cdee1d --- /dev/null +++ b/Data/Pool/WaiterQueue.hs @@ -0,0 +1,87 @@ +module Data.Pool.WaiterQueue + ( WaiterQueue, + newQueueIO, + push, + pop, + ) +where + +import Control.Concurrent.STM + +-- | A FIFO queue that supports removing any element from the queue. +-- +-- We have a pointer to the head of the list, and a pointer to the +-- final foward pointer in the list. +data WaiterQueue a + = WaiterQueue + (TVar (TDList a)) + (TVar (TVar (TDList a))) + +-- | Each element has a pointer to the previous element's forward +-- pointer where "previous element" can be a 'TDList' cons cell or the +-- 'WaiterQueue' head pointer. +data TDList a + = TCons + (TVar (TVar (TDList a))) + a + (TVar (TDList a)) + | TNil + +newQueueIO :: IO (WaiterQueue a) +newQueueIO = do + emptyVarL <- newTVarIO TNil + emptyVarR <- newTVarIO emptyVarL + pure (WaiterQueue emptyVarL emptyVarR) + +removeSelf :: + -- | 'WaiterQueue's final foward pointer pointer + TVar (TVar (TDList a)) -> + -- | Our back pointer + TVar (TVar (TDList a)) -> + -- | Our forward pointter + TVar (TDList a) -> + STM () +removeSelf tv prevPP nextP = do + prevP <- readTVar prevPP + -- If our back pointer points to our forward pointer then we have + -- already been removed from the queue + case prevP == nextP of + True -> pure () + False -> do + next <- readTVar nextP + writeTVar prevP next + case next of + TNil -> writeTVar tv prevP + TCons bp _ _ -> writeTVar bp prevP + writeTVar prevPP nextP +{-# INLINE removeSelf #-} + +-- | Returns an STM action that removes the pushed element from the +-- queue +push :: WaiterQueue a -> a -> STM (STM ()) +push (WaiterQueue _ tv) a = do + fwdPointer <- readTVar tv + backPointer <- newTVar fwdPointer + emptyVar <- newTVar TNil + let cell = TCons backPointer a emptyVar + writeTVar fwdPointer cell + writeTVar tv emptyVar + pure (removeSelf tv backPointer emptyVar) +{-# INLINE push #-} + +pop :: WaiterQueue a -> STM (Maybe a) +pop (WaiterQueue hv tv) = do + firstElem <- readTVar hv + case firstElem of + TNil -> pure Nothing + TCons bp a fp -> do + f <- readTVar fp + writeTVar hv f + case f of + TNil -> writeTVar tv hv + TCons fbp _ _ -> writeTVar fbp hv + -- point the back pointer to the forward pointer as a sign that + -- the cell has been popped (referenced in removeSelf) + writeTVar bp fp + pure (Just a) +{-# INLINE pop #-} diff --git a/resource-pool.cabal b/resource-pool.cabal index 6a9bc09..2a64ac4 100644 --- a/resource-pool.cabal +++ b/resource-pool.cabal @@ -28,6 +28,7 @@ flag developer library exposed-modules: Data.Pool + other-modules: Data.Pool.WaiterQueue build-depends: base >= 4.4 && < 5,