{-# LANGUAGE FlexibleContexts #-}

-----------------------------------------------------------------------------

-----------------------------------------------------------------------------

-- |
-- Module      :  Distribution.Client.JobControl
-- Copyright   :  (c) Duncan Coutts 2012
-- License     :  BSD-like
--
-- Maintainer  :  cabal-devel@haskell.org
-- Stability   :  provisional
-- Portability :  portable
--
-- A job control concurrency abstraction
module Distribution.Client.JobControl
  ( JobControl
  , newSerialJobControl
  , newParallelJobControl
  , newSemaphoreJobControl
  , spawnJob
  , collectJob
  , remainingJobs
  , cancelJobs
  , cleanupJobControl
  , jobControlSemaphore
  , JobLimit
  , newJobLimit
  , withJobLimit
  , Lock
  , newLock
  , criticalSection
  ) where

import Distribution.Client.Compat.Prelude
import Prelude ()

import Control.Concurrent (forkIO, forkIOWithUnmask, threadDelay)
import Control.Concurrent.MVar
import Control.Concurrent.STM (STM, TVar, atomically, modifyTVar', newTVarIO, readTVar)
import Control.Concurrent.STM.TChan
import Control.Exception (bracket_, mask_, try)
import Control.Monad (forever, replicateM_)
import Distribution.Client.Compat.Semaphore
import Distribution.Compat.Stack
import Distribution.Simple.Utils
import System.Semaphore

-- | A simple concurrency abstraction. Jobs can be spawned and can complete
-- in any order. This allows both serial and parallel implementations.
--
-- The record is a dictionary of operations in some monad @m@ over jobs that
-- produce results of type @a@; each constructor function in this module
-- supplies a different implementation of these operations.
data JobControl m a = JobControl
  { spawnJob :: m a -> m ()
  -- ^ Add a new job to the pool of jobs
  , collectJob :: m a
  -- ^ Wait until one job is complete and return its result
  , remainingJobs :: m Bool
  -- ^ Returns True if there are any outstanding jobs
  -- (ie spawned but yet to be collected)
  , cancelJobs :: m ()
  -- ^ Try to cancel any outstanding but not-yet-started jobs.
  -- Call 'remainingJobs' after this to find out if any jobs are left
  -- (ie could not be cancelled).
  , cleanupJobControl :: m ()
  -- ^ Clean up any resources created by the 'JobControl', intended to be used
  -- as the finaliser for @bracket@.
  , jobControlSemaphore :: Maybe SemaphoreName
  -- ^ Name of the semaphore which can be used to control parallelism, if one
  -- is available for that job control type.
  }

-- | Make a 'JobControl' that executes all jobs serially and in order.
-- It only executes jobs on demand when they are collected, not eagerly.
--
-- Spawned jobs are simply queued in a 'TChan'; 'collectJob' dequeues the
-- oldest job and runs it in the calling thread, so any exception thrown by
-- the job propagates directly to the collector.
--
-- Cancelling will cancel /all/ jobs that have not been collected yet.
newSerialJobControl :: IO (JobControl IO a)
newSerialJobControl = do
  qVar <- newTChanIO
  return
    JobControl
      { spawnJob = spawn qVar
      , collectJob = collect qVar
      , remainingJobs = remaining qVar
      , cancelJobs = cancel qVar
      , cleanupJobControl = return ()
      , jobControlSemaphore = Nothing
      }
  where
    -- Enqueue the job without running it.
    spawn :: TChan (IO a) -> IO a -> IO ()
    spawn qVar job = atomically $ writeTChan qVar job

    -- Dequeue the next job (blocking while the queue is empty) and run it.
    collect :: TChan (IO a) -> IO a
    collect qVar =
      join $ atomically $ readTChan qVar

    -- Jobs remain exactly when the queue is non-empty.
    remaining :: TChan (IO a) -> IO Bool
    remaining qVar = fmap not $ atomically $ isEmptyTChan qVar

    -- Drain the queue, discarding every job that has not been collected.
    cancel :: TChan (IO a) -> IO ()
    cancel qVar = do
      _ <- atomically $ readAllTChan qVar
      return ()

-- | Make a 'JobControl' that eagerly executes jobs in parallel, with a given
-- maximum degree of parallelism.
--
-- The requested number of worker threads is forked up front. Each worker
-- repeatedly takes a job from the input queue, runs it, and posts the result
-- (or the exception it threw) to the output queue. 'collectJob' re-throws a
-- job's exception in the collecting thread.
--
-- Calls 'error' if the requested number of jobs is less than 1 or greater
-- than 1000.
--
-- Cancelling will cancel jobs that have not yet begun executing, but jobs
-- that have already been executed or are currently executing cannot be
-- cancelled.
newParallelJobControl :: WithCallStack (Int -> IO (JobControl IO a))
newParallelJobControl n
  | n < 1 || n > 1000 =
      error $ "newParallelJobControl: not a sensible number of jobs: " ++ show n
newParallelJobControl maxJobLimit = do
  inqVar <- newTChanIO
  outqVar <- newTChanIO
  -- Number of jobs that have been spawned but not yet collected or cancelled.
  countVar <- newTVarIO 0
  replicateM_ maxJobLimit $
    forkIO $
      worker inqVar outqVar
  return
    JobControl
      { spawnJob = spawn inqVar countVar
      , collectJob = collect outqVar countVar
      , remainingJobs = remaining countVar
      , cancelJobs = cancel inqVar countVar
      , cleanupJobControl = return ()
      , jobControlSemaphore = Nothing
      }
  where
    -- Worker loop: take a job, run it capturing any exception, post the result.
    worker :: TChan (IO a) -> TChan (Either SomeException a) -> IO ()
    worker inqVar outqVar =
      forever $ do
        job <- atomically $ readTChan inqVar
        res <- try job
        atomically $ writeTChan outqVar res

    -- Count the job as outstanding and enqueue it, in a single transaction.
    spawn :: TChan (IO a) -> TVar Int -> IO a -> IO ()
    spawn inqVar countVar job =
      atomically $ do
        modifyTVar' countVar (+ 1)
        writeTChan inqVar job

    -- Wait for a result. The decrement shares a transaction with the read, so
    -- the count only drops once a result has actually been taken.
    collect :: TChan (Either SomeException a) -> TVar Int -> IO a
    collect outqVar countVar = do
      res <- atomically $ do
        modifyTVar' countVar (subtract 1)
        readTChan outqVar
      either throwIO return res

    -- Jobs remain while the outstanding count is non-zero.
    remaining :: TVar Int -> IO Bool
    remaining countVar = fmap (/= 0) $ atomically $ readTVar countVar

    -- Drop every job still waiting in the input queue and deduct them from the
    -- outstanding count; jobs already taken by a worker are unaffected.
    cancel :: TChan (IO a) -> TVar Int -> IO ()
    cancel inqVar countVar =
      atomically $ do
        xs <- readAllTChan inqVar
        modifyTVar' countVar (subtract (length xs))

-- | Remove and return every item currently in the channel, oldest first.
--
-- Never blocks: an empty channel yields an empty list. Because this runs in
-- 'STM', the whole drain is a single atomic step of the enclosing transaction.
readAllTChan :: TChan a -> STM [a]
readAllTChan qvar = go []
  where
    -- Items are accumulated newest-first and reversed once at the end.
    -- (No local type signature: without ScopedTypeVariables it could not
    -- mention the element type of 'qvar'.)
    go xs = do
      mx <- tryReadTChan qvar
      case mx of
        Nothing -> return (reverse xs)
        Just x -> go (x : xs)

-- | Make a 'JobControl' where the parallelism is controlled by a semaphore.
--
-- This uses the GHC -jsem option to allow GHC to take additional semaphore slots
-- if we are not using them all.
--
-- A fresh semaphore with the requested number of slots is created and its name
-- is exposed through 'jobControlSemaphore'. A single dispatcher thread takes
-- jobs from the input queue; for each one it waits for a semaphore slot and
-- then forks a thread that runs the job, releases the slot and posts the
-- result (or the exception the job threw) to the output queue.
--
-- Calls 'error' if the requested number of jobs is less than 1 or greater
-- than 1000. 'cleanupJobControl' destroys the semaphore.
newSemaphoreJobControl :: WithCallStack (Verbosity -> Int -> IO (JobControl IO a))
newSemaphoreJobControl _ n
  | n < 1 || n > 1000 =
      error $ "newSemaphoreJobControl: not a sensible number of jobs: " ++ show n
newSemaphoreJobControl verbosity maxJobLimit = do
  sem <- freshSemaphore "cabal_semaphore" maxJobLimit
  notice verbosity $
    "Created semaphore called "
      ++ getSemaphoreName (semaphoreName sem)
      ++ " with "
      ++ show maxJobLimit
      ++ " slots."
  outqVar <- newTChanIO
  inqVar <- newTChanIO
  -- Number of jobs that have been spawned but not yet collected or cancelled.
  countVar <- newTVarIO 0
  void (forkIO (worker sem inqVar outqVar))
  return
    JobControl
      { spawnJob = spawn inqVar countVar
      , collectJob = collect outqVar countVar
      , remainingJobs = remaining countVar
      , cancelJobs = cancel inqVar countVar
      , cleanupJobControl = destroySemaphore sem
      , jobControlSemaphore = Just (semaphoreName sem)
      }
  where
    -- Dispatcher loop: one job at a time is taken from the queue and handed to
    -- a freshly forked thread once a semaphore slot has been acquired.
    worker :: Semaphore -> TChan (IO a) -> TChan (Either SomeException a) -> IO ()
    worker sem inqVar outqVar =
      forever $ do
        job <- atomically $ readTChan inqVar
        -- mask here, as we need to ensure that the thread which contains the
        -- release action is spawned. Otherwise, there is the chance that an
        -- async exception is thrown between the semaphore being taken and the
        -- thread being spawned.
        mask_ $ do
          waitOnSemaphore sem
          void $ forkIOWithUnmask $ \unmask -> do
            -- Only the job itself runs unmasked; releasing the slot and
            -- posting the result stay under the inherited mask.
            res <- try (unmask job)
            releaseSemaphore sem 1
            atomically $ writeTChan outqVar res
        -- Try to give GHC enough time to compute the module graph and then
        -- request some additional capabilities if it can make use of them. The
        -- ideal situation is that we have 1 GHC process running which has taken
        -- all the capabilities in the semaphore, as this will reduce memory usage.
        --
        -- 0.25s is chosen by discussion between MP and SD on Mar 17th 2023 as a number
        -- which isn't too big and not too small but also, not scientifically.
        threadDelay 250000

    -- Count the job as outstanding and enqueue it, in a single transaction.
    spawn :: TChan (IO a) -> TVar Int -> IO a -> IO ()
    spawn inqVar countVar job =
      atomically $ do
        modifyTVar' countVar (+ 1)
        writeTChan inqVar job

    -- Wait for a result. The decrement shares a transaction with the read, so
    -- the count only drops once a result has actually been taken.
    collect :: TChan (Either SomeException a) -> TVar Int -> IO a
    collect outqVar countVar = do
      res <- atomically $ do
        modifyTVar' countVar (subtract 1)
        readTChan outqVar
      either throwIO return res

    -- Jobs remain while the outstanding count is non-zero.
    remaining :: TVar Int -> IO Bool
    remaining countVar = fmap (/= 0) $ atomically $ readTVar countVar

    -- Drop every job still waiting in the input queue and deduct them from the
    -- outstanding count; jobs already dispatched are unaffected.
    cancel :: TChan (IO a) -> TVar Int -> IO ()
    cancel inqVar countVar =
      atomically $ do
        xs <- readAllTChan inqVar
        modifyTVar' countVar (subtract (length xs))

-------------------------
-- Job limits and locks
--

-- | A bound on how many actions may run concurrently under 'withJobLimit'.
-- Internally a quantity semaphore holding the free slots.
newtype JobLimit = JobLimit QSem

-- | Create a 'JobLimit' whose semaphore starts with the given number of
-- slots.
newJobLimit :: Int -> IO JobLimit
newJobLimit n =
  fmap JobLimit (newQSem n)

-- | Run an action while holding one slot of the 'JobLimit'.
--
-- Waits on the semaphore before the action starts and signals it again
-- afterwards; 'bracket_' ensures the slot is given back even if the action
-- throws.
withJobLimit :: JobLimit -> IO a -> IO a
withJobLimit (JobLimit sem) =
  bracket_ (waitQSem sem) (signalQSem sem)

-- | A mutual-exclusion lock, represented by an 'MVar' that is full while the
-- lock is free and empty while it is held.
newtype Lock = Lock (MVar ())

-- | Create a new lock in the free state.
newLock :: IO Lock
newLock = fmap Lock $ newMVar ()

-- | Run an action while holding the lock.
--
-- Takes the 'MVar' before the action and puts the unit value back
-- afterwards; 'bracket_' ensures the lock is released even if the action
-- throws.
criticalSection :: Lock -> IO a -> IO a
criticalSection (Lock lck) act = bracket_ (takeMVar lck) (putMVar lck ()) act