Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 46 additions & 11 deletions Data/Pool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ import Control.Applicative ((<$>))
import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException, onException, mask_)
import Control.Monad (forM_, forever, join, liftM3, unless, when)
import Control.Monad (forM_, forever, join, liftM4, unless, when)
import Data.Hashable (hash)
import Data.IORef (IORef, newIORef, mkWeakIORef)
import Data.List (partition)
import Data.Pool.WaiterQueue (WaiterQueue, newQueueIO, push, pop)
import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
import Data.Typeable (Typeable)
import GHC.Conc.Sync (labelThread)
Expand Down Expand Up @@ -86,6 +87,8 @@ data LocalPool a = LocalPool {
-- ^ Count of open entries (both idle and in use).
, entries :: TVar [Entry a]
-- ^ Idle entries.
, waiters :: WaiterQueue (TMVar (Maybe (Entry a)))
-- ^ threads waiting for a resource
, lfin :: IORef ()
-- ^ empty value used to attach a finalizer to (internal)
} deriving (Typeable)
Expand Down Expand Up @@ -159,7 +162,7 @@ createPool create destroy numStripes idleTime maxResources = do
when (maxResources < 1) $
modError "pool " $ "invalid maximum resource count " ++ show maxResources
localPools <- V.replicateM numStripes $
liftM3 LocalPool (newTVarIO 0) (newTVarIO []) (newIORef ())
liftM4 LocalPool (newTVarIO 0) (newTVarIO []) newQueueIO (newIORef ())
reaperId <- forkIOLabeledWithUnmask "resource-pool: reaper" $ \unmask ->
unmask $ reaper destroy idleTime localPools
fin <- newIORef ()
Expand Down Expand Up @@ -281,10 +284,26 @@ takeResource pool@Pool{..} = do
(Entry{..}:es) -> writeTVar entries es >> return (return entry)
[] -> do
used <- readTVar inUse
when (used == maxResources) retry
writeTVar inUse $! used + 1
return $
create `onException` atomically (modifyTVar_ inUse (subtract 1))
case used == maxResources of
False -> do
writeTVar inUse $! used + 1
return $
create `onException` atomically (destroyResourceSTM local)
True -> do
var <- newEmptyTMVar
removeSelf <- push waiters var
let getResource x = case x of
Just y -> pure (entry y)
Nothing -> create `onException` atomically (destroyResourceSTM local)
let dequeue = do
maybeEntry <- atomically $ do
removeSelf
tryTakeTMVar var
atomically $ case maybeEntry of
Nothing -> pure ()
Just Nothing -> destroyResourceSTM local
Just (Just v) -> putResourceSTM local v
return (getResource =<< atomically (takeTMVar var) `onException` dequeue)
return (resource, local)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE takeResource #-}
Expand Down Expand Up @@ -332,7 +351,7 @@ tryTakeResource pool@Pool{..} = do
else do
writeTVar inUse $! used + 1
return $ Just <$>
create `onException` atomically (modifyTVar_ inUse (subtract 1))
create `onException` atomically (destroyResourceSTM local)
return $ (flip (,) local) <$> resource
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE tryTakeResource #-}
Expand All @@ -352,22 +371,38 @@ getLocalPool Pool{..} = do
-- | Destroy a resource. Note that this will ignore any exceptions in the
-- destroy function.
destroyResource :: Pool a -> LocalPool a -> a -> IO ()
destroyResource Pool{..} LocalPool{..} resource = do
destroyResource Pool{..} local resource = do
destroy resource `E.catch` \(_::SomeException) -> return ()
atomically (modifyTVar_ inUse (subtract 1))
atomically (destroyResourceSTM local)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE destroyResource #-}
#endif

-- | Return a resource to the given 'LocalPool'.
putResource :: LocalPool a -> a -> IO ()
putResource LocalPool{..} resource = do
putResource lp resource = do
now <- getCurrentTime
atomically $ modifyTVar_ entries (Entry resource now:)
atomically $ putResourceSTM lp (Entry resource now)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE putResource #-}
#endif

putResourceSTM :: LocalPool a -> Entry a -> STM ()
putResourceSTM LocalPool{..} resourceEntry = do
mWaiters <- pop waiters
case mWaiters of
Nothing -> modifyTVar_ entries (resourceEntry:)
Just w -> putTMVar w (Just resourceEntry)
{-# INLINE putResourceSTM #-}

destroyResourceSTM :: LocalPool a -> STM ()
destroyResourceSTM LocalPool{..} = do
mwaiter <- pop waiters
case mwaiter of
Nothing -> modifyTVar_ inUse (subtract 1)
Just w -> putTMVar w Nothing
{-# INLINE destroyResourceSTM #-}

-- | Destroy all resources in all stripes in the pool. Note that this
-- will ignore any exceptions in the destroy function.
--
Expand Down
87 changes: 87 additions & 0 deletions Data/Pool/WaiterQueue.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
module Data.Pool.WaiterQueue
( WaiterQueue,
newQueueIO,
push,
pop,
)
where

import Control.Concurrent.STM

-- | A FIFO queue that supports removing any element from the queue.
--
-- We have a pointer to the head of the list, and a pointer to the
-- final foward pointer in the list.
data WaiterQueue a
= WaiterQueue
(TVar (TDList a))
(TVar (TVar (TDList a)))

-- | Each element has a pointer to the previous element's forward
-- pointer where "previous element" can be a 'TDList' cons cell or the
-- 'WaiterQueue' head pointer.
data TDList a
= TCons
(TVar (TVar (TDList a)))
a
(TVar (TDList a))
| TNil

newQueueIO :: IO (WaiterQueue a)
newQueueIO = do
emptyVarL <- newTVarIO TNil
emptyVarR <- newTVarIO emptyVarL
pure (WaiterQueue emptyVarL emptyVarR)

removeSelf ::
-- | 'WaiterQueue's final foward pointer pointer
TVar (TVar (TDList a)) ->
-- | Our back pointer
TVar (TVar (TDList a)) ->
-- | Our forward pointter
TVar (TDList a) ->
STM ()
removeSelf tv prevPP nextP = do
prevP <- readTVar prevPP
-- If our back pointer points to our forward pointer then we have
-- already been removed from the queue
case prevP == nextP of
True -> pure ()
False -> do
next <- readTVar nextP
writeTVar prevP next
case next of
TNil -> writeTVar tv prevP
TCons bp _ _ -> writeTVar bp prevP
writeTVar prevPP nextP
{-# INLINE removeSelf #-}

-- | Returns an STM action that removes the pushed element from the
-- queue
push :: WaiterQueue a -> a -> STM (STM ())
push (WaiterQueue _ tv) a = do
fwdPointer <- readTVar tv
backPointer <- newTVar fwdPointer
emptyVar <- newTVar TNil
let cell = TCons backPointer a emptyVar
writeTVar fwdPointer cell
writeTVar tv emptyVar
pure (removeSelf tv backPointer emptyVar)
{-# INLINE push #-}

pop :: WaiterQueue a -> STM (Maybe a)
pop (WaiterQueue hv tv) = do
firstElem <- readTVar hv
case firstElem of
TNil -> pure Nothing
TCons bp a fp -> do
f <- readTVar fp
writeTVar hv f
case f of
TNil -> writeTVar tv hv
TCons fbp _ _ -> writeTVar fbp hv
-- point the back pointer to the forward pointer as a sign that
-- the cell has been popped (referenced in removeSelf)
writeTVar bp fp
pure (Just a)
{-# INLINE pop #-}
1 change: 1 addition & 0 deletions resource-pool.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ flag developer
library
exposed-modules:
Data.Pool
other-modules: Data.Pool.WaiterQueue

build-depends:
base >= 4.4 && < 5,
Expand Down