{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}

{-# HLINT ignore "Use withAsync" #-}

module Control.Concurrent.Extended
  ( module Control.Concurrent,
    sleep,
    ForkableMonadIO,

    -- * Robust forking
    forkImmortal,
    forkManagedT,
    forkManagedTWithGracefulShutdown,

    -- * Concurrency in MonadError
    forConcurrentlyEIO,
    concurrentlyEIO,

    -- * Deprecated
    ImmortalThreadLog (..),
    ThreadState (..),
    ThreadShutdown (..),
    Forever (..),
  )
where

import Control.Concurrent hiding (threadDelay)
import Control.Concurrent qualified as Base
import Control.Concurrent.Async as A
import Control.Concurrent.Async.Lifted.Safe qualified as LA
import Control.Concurrent.STM qualified as STM
import Control.Exception
import Control.Immortal qualified as Immortal
import Control.Monad.Except
import Control.Monad.Loops (iterateM_)
import Control.Monad.Trans.Control qualified as MC
import Control.Monad.Trans.Managed (ManagedT (..), allocate)
import Data.Aeson
import Data.List.Split
import Data.Traversable
import Data.Void
-- For forkImmortal. We could also have it take a cumbersome continuation if we
-- want to break this dependency. Probably best to move Hasura.Logging into a
-- separate lib with this if we do the override thing.
import Hasura.Logging
import Hasura.Prelude

{-# HLINT ignore sleep #-}

-- | Like 'Base.threadDelay', but takes a 'DiffTime' instead of an 'Int' microseconds.
--
-- The duration is wrapped in 'Microseconds' and 'round'ed to an 'Int' before
-- being passed to 'Base.threadDelay'.
--
-- NOTE: you cannot simply replace e.g. @threadDelay 1000@ with @sleep 1000@ since those literals
-- have different meanings!
sleep :: DiffTime -> IO ()
sleep = Base.threadDelay . round . Microseconds

-- | Run an action in a labelled "Control.Immortal" thread.
--
-- Each time the thread body starts, 'ImmortalThreadRestarted' is logged. If the
-- action finishes with an exception, 'ImmortalThreadUnexpectedException' is
-- logged and the handler sleeps for one second before returning control to
-- "Control.Immortal".
--
-- Note: Please consider using 'forkManagedT' instead to ensure reliable
-- resource cleanup.
forkImmortal ::
  (ForkableMonadIO m) =>
  -- | A label describing this thread's function (see 'labelThread').
  String ->
  Logger Hasura ->
  -- | An IO action we expect never to return normally. This will have the type
  -- signature ':: m a' (see e.g. the type of 'forever').
  m Void ->
  -- | A handle for the forked thread. See "Control.Immortal".
  m Immortal.Thread
forkImmortal label logger m =
  Immortal.createWithLabel label $ \this -> do
    -- Log that the thread has started
    liftIO $ unLogger logger (ImmortalThreadRestarted label)
    -- In this case, we are handling unexpected exceptions.
    -- i.e This does not catch the asynchronous exception which stops the thread.
    Immortal.onUnexpectedFinish this logAndPause (void m)
  where
    logAndPause = \case
      Right _void -> pure () -- absurd _void (i.e. unreachable)
      Left e -> liftIO $ do
        unLogger logger (ImmortalThreadUnexpectedException label e)
        -- pause before restarting some arbitrary amount of time. The idea is not to flood
        -- logs or cause other cascading failures.
        sleep (seconds 1)

-- | Shutdown handshake state shared between the loop forked by
-- 'forkManagedTWithGracefulShutdown' and that function's finalizer.
data ThreadState
  = -- | Initial state; the loop proceeds with its next iteration.
    ThreadForked
  | -- | Written by the loop once it has observed 'ThreadShutdownInitiated';
    -- the finalizer waits for this state before stopping the thread.
    ThreadBlocking
  | -- | Written by the finalizer to request shutdown.
    ThreadShutdownInitiated
  deriving (Show, Eq)

-- | @ThreadShutdown@ is a newtype wrapper over an action which is intended
--   to execute when a thread's shutdown is initiated, before the thread is killed.
newtype ThreadShutdown m = ThreadShutdown {tsThreadShutdown :: m ()}

-- | This function pairs a call to 'forkImmortal' with a finalizer which stops
-- the immortal thread. The finalizer logs 'ImmortalThreadStopping' before
-- calling 'Immortal.stop'.
--
-- Note, the thread object can leave its scope if this function is incorrectly
-- used. Generally, the result should only be used later in the same ManagedT
-- scope.
forkManagedT ::
  (ForkableMonadIO m) =>
  String ->
  Logger Hasura ->
  m Void ->
  ManagedT m Immortal.Thread
forkManagedT label logger m =
  allocate
    (forkImmortal label logger m)
    ( \thread -> do
        unLogger logger (ImmortalThreadStopping label)
        liftIO $ Immortal.stop thread
    )

-- | The @Forever@ type defines an infinitely looping monadic action (like @m Void@), but allows the
-- caller to control the recursion or insert code before each iteration. The @a@ is the initial argument,
-- and each subsequent iteration is fed the value returned by the previous one. The argument type is
-- existentially quantified, so it is not visible in the type of a @Forever m@ value.
--
-- See 'forkManagedTWithGracefulShutdown' for how it is used.
data Forever m = forall a. Forever a (a -> m a)

-- | @forkManagedTWithGracefulShutdown@ is an extension of the @forkManagedT@
--   function; this function also attempts to gracefully shut down the thread. This function
--   accepts a `m (Forever m)` argument. The @Forever@ type contains a function and an argument
--   to the function. The function supplied will be run repeatedly until shutdown is initiated. The
--   response of the function will be the argument to the next iteration.
--
--   The loop and the finalizer coordinate through a 'ThreadState' variable:
--   the finalizer sets 'ThreadShutdownInitiated', runs the supplied
--   'ThreadShutdown' action, waits until the loop has set 'ThreadBlocking',
--   and only then stops the immortal thread.
--
--   For reference, this function is used to run the async actions processor. Check
--   `asyncActionsProcessor`
forkManagedTWithGracefulShutdown ::
  (ForkableMonadIO m) =>
  String ->
  Logger Hasura ->
  ThreadShutdown m ->
  m (Forever m) ->
  ManagedT m Immortal.Thread
forkManagedTWithGracefulShutdown label logger (ThreadShutdown threadShutdownHandler) loopIteration = do
  threadStateTVar <- liftIO $ STM.newTVarIO ThreadForked
  allocate
    ( Immortal.createWithLabel label $ \this -> do
        -- Log that the thread has started
        liftIO $ unLogger logger (ImmortalThreadRestarted label)
        -- In this case, we are handling unexpected exceptions.
        -- i.e This does not catch the asynchronous exception which stops the thread.
        Immortal.onUnexpectedFinish this logAndPause $ do
          let mLoop (Forever loopFunctionInitArg loopFunction) =
                flip iterateM_ loopFunctionInitArg $ \args -> do
                  -- Consult the shared state before every iteration.
                  liftIO
                    $ STM.atomically
                    $ STM.readTVar threadStateTVar
                    >>= \case
                      ThreadShutdownInitiated ->
                        -- signal to the finalizer that we are now blocking
                        -- and blocking forever since this
                        -- var moves monotonically from forked -> shutdown -> blocking
                        --
                        -- NOTE(review): this transaction commits and control falls
                        -- through to @loopFunction args@ below, so one more iteration
                        -- runs after 'ThreadBlocking' is published; the loop only
                        -- actually blocks on the following iteration. Confirm that
                        -- this is intended.
                        STM.writeTVar threadStateTVar ThreadBlocking
                      ThreadBlocking -> STM.retry
                      ThreadForked -> pure ()
                  loopFunction args
          -- The loop runs in its own async which is linked to, and awaited by,
          -- the immortal thread.
          t <- LA.async $ mLoop =<< loopIteration
          LA.link t
          void $ LA.wait t
    )
    ( \thread -> do
        liftIO
          $ STM.atomically
          $ STM.modifyTVar' threadStateTVar (const ThreadShutdownInitiated)
        -- the threadShutdownHandler here will wait for any in-flight events
        -- to finish processing
        {-
            There is a conundrum here about whether the @threadShutdownHandler@
            should be before or after the @ThreadBlocking@ check call, this is because
            there are problems with both the cases:

            1. @threadShutdownHandler@ before the @ThreadBlocking@ check
            ------------------------------------------------------------

            Let's say we're just about to start processing a new iteration of the
            loop function and before the processing actually starts the shutdown is
            initiated, there will be no in-flight events (because the batch hasn't started processing yet) so
            @threadShutdownHandler@ will return immediately and the new batch will start processing
            which were fetched earlier. This is a race condition and may kill the thread with some
            of the events still processing.

            2. @threadShutdownHandler@ after the @ThreadBlocking@ check
            -----------------------------------------------------------

            This will solve the above race condition but will cause a new problem. The
            graphql-engine accepts a config called `--graceful-shutdown-timeout` which is a timeout
            for any in-flight processing events that are running in the graphql-engine to complete
            processing within this time.

            Let's say we are going to start iterating over the next iteration of `processEventQueue`
            and without loss of generality let's say this batch takes 100 seconds to finish processing
            and the graceful shutdown timeout is 10 seconds and shutdown is initiated in the midst of processing
            this batch, this will have no effect and the thread will be shutdown after the batch completes (after
            100 seconds) which is wrong because it doesn't respect the graceful shutdown timeout

            TODO: figure out a way which solves both the problems

            At the time of writing this PR, we decided to go with 1 because the worst thing
            that will happen is that some events might get processed more than once but this
            is a better solution than what we had earlier where we were shutting down all the in-flight
            processing events without the graceful shutdown timeout.
        -}
        threadShutdownHandler
        -- Block until the loop has acknowledged the shutdown request.
        liftIO
          $ STM.atomically
          $ STM.readTVar threadStateTVar
          >>= STM.check
          . (== ThreadBlocking)
        unLogger logger (ImmortalThreadStopping label)
        liftIO $ Immortal.stop thread
    )
  where
    logAndPause = \case
      Right () -> pure ()
      Left e -> liftIO $ do
        unLogger logger (ImmortalThreadUnexpectedException label e)
        -- pause before restarting some arbitrary amount of time. The idea is not to flood
        -- logs or cause other cascading failures.
        sleep (seconds 1)

-- | Log events emitted for threads forked by 'forkImmortal', 'forkManagedT' and
-- 'forkManagedTWithGracefulShutdown'. The 'String' in every constructor is the
-- thread's label.
data ImmortalThreadLog
  = -- | Synchronous Exception
    ImmortalThreadUnexpectedException String SomeException
  | -- | Asynchronous Exception about to be sent
    ImmortalThreadStopping String
  | -- | Logged each time the thread's body starts running
    ImmortalThreadRestarted String

-- | Every 'ImmortalThreadLog' is rendered as an unstructured internal log
-- entry whose payload is a plain message string. Unexpected exceptions are
-- logged at 'LevelError'; the other events at 'LevelInfo'.
instance ToEngineLog ImmortalThreadLog Hasura where
  toEngineLog = \case
    ImmortalThreadStopping label ->
      unstructured LevelInfo $ "Stopping immortal " <> label <> " thread"
    ImmortalThreadUnexpectedException label e ->
      unstructured LevelError
        $ "Unexpected exception in immortal thread "
        <> label
        <> " (it will be restarted):\n"
        <> show e
    ImmortalThreadRestarted label ->
      unstructured LevelInfo $ "Thread " <> label <> " (re)started"
    where
      -- Shared shape of all three entries: level, internal/unstructured type,
      -- and the message encoded as a JSON string.
      unstructured :: LogLevel -> String -> (LogLevel, EngineLogType Hasura, Value)
      unstructured level msg = (level, ELTInternal ILTUnstructured, toJSON msg)

-- TODO
--   - maybe use this everywhere, but also:
--     - consider unifying with: src-lib/Control/Monad/Stateless.hs  ?
--   - nice TypeError:  https://kodimensional.dev/type-errors
--

-- | Like 'MonadIO' but constrained to stacks in which forking a new thread is reasonable/safe.
-- In particular 'StateT' causes problems.
--
-- This is the constraint you can use for functions that call 'LA.async', or 'immortal'.
-- It bundles 'MonadIO', 'MC.MonadBaseControl' over 'IO', and the
-- @'LA.Forall' ('LA.Pure' m)@ constraint that 'LA.async' demands.
type ForkableMonadIO m = (MonadIO m, MC.MonadBaseControl IO m, LA.Forall (LA.Pure m))

-- TODO consider deprecating async.
--        export something with polymorphic return type, which makes "fork and forget" difficult
--        this could automatically link in one variant
--        another variant might return ThreadId that self destructs w/ finalizer (mkWeakThreadId)
--          and note: "Holding a normal ThreadId reference will prevent the delivery of BlockedIndefinitely exceptions because the reference could be used as the target of throwTo at any time, which would unblock the thread."

-- | A somewhat wonky function for parallelizing @for xs f@ where @f@ is
-- @(MonadIO m, MonadError e m)@. This is equivalent to @for xs f@ modulo the
-- IO effects (i.e. when the IO has no real side effects we care about).
--
-- This also takes a @chunkSize@ argument so you can manipulate the amount of
-- work given to each thread: the input is split into chunks of that many
-- elements, chunks are processed concurrently, and the elements within a chunk
-- sequentially. A @chunkSize@ below 1 is treated as 1.
--
-- Each result is forced to weak head normal form on the worker thread. Errors
-- are only re-thrown into @m@ after all chunks have finished, in input order.
forConcurrentlyEIO :: (MonadIO m, MonadError e m) => Int -> [a] -> (a -> ExceptT e IO b) -> m [b]
forConcurrentlyEIO chunkSize xs f = do
  let fIO a = runExceptT (f a) >>= evaluate
      -- 'chunksOf' yields an infinite list of empty chunks for sizes <= 0,
      -- which would make this function diverge; clamp to one element per chunk.
      chunks = chunksOf (max 1 chunkSize) xs
  xs' <- liftIO $ concat <$> A.forConcurrently chunks (traverse fIO)
  for xs' (either throwError pure)

-- | Run two @ExceptT e IO@ actions concurrently (via 'A.concurrently') and
-- return both results.
--
-- Each action's 'Either' result is forced to weak head normal form on its own
-- thread. Once both have finished, a failure of the left action is re-thrown
-- into @m@ first; otherwise a failure of the right action is.
concurrentlyEIO :: (MonadIO m, MonadError e m) => ExceptT e IO a -> ExceptT e IO b -> m (a, b)
concurrentlyEIO left right = do
  (leftE, rightE) <-
    liftIO
      $ A.concurrently
        (runExceptT left >>= evaluate)
        (runExceptT right >>= evaluate)
  x <- leftE `onLeft` throwError
  y <- rightE `onLeft` throwError
  pure (x, y)