{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}
module Control.Concurrent.Extended
( module Control.Concurrent,
sleep,
ForkableMonadIO,
forkImmortal,
forkManagedT,
forkManagedTWithGracefulShutdown,
forConcurrentlyEIO,
ImmortalThreadLog (..),
ThreadState (..),
ThreadShutdown (..),
Forever (..),
)
where
import Control.Concurrent hiding (threadDelay)
import Control.Concurrent qualified as Base
import Control.Concurrent.Async as A
import Control.Concurrent.Async.Lifted.Safe qualified as LA
import Control.Concurrent.STM qualified as STM
import Control.Exception
import Control.Immortal qualified as Immortal
import Control.Monad.Except
import Control.Monad.Loops (iterateM_)
import Control.Monad.Trans.Control qualified as MC
import Control.Monad.Trans.Managed (ManagedT (..), allocate)
import Data.Aeson
import Data.List.Split
import Data.Time.Clock.Units (DiffTime, Microseconds (..), seconds)
import Data.Traversable
import Data.Void
import Hasura.Logging
import Prelude
{-# HLINT ignore sleep #-}
-- | Suspend the calling thread for the given 'DiffTime'.
--
-- This is 'Base.threadDelay' with a typed duration: the 'DiffTime' is wrapped
-- in 'Microseconds' and rounded to a whole 'Int' before being handed to
-- 'Base.threadDelay'.
--
-- NOTE(review): because the argument is a 'DiffTime' rather than a raw
-- microsecond count, a numeric literal passed here presumably does not mean
-- the same thing as the same literal passed to 'Base.threadDelay' -- confirm
-- before mechanically replacing one with the other.
sleep :: DiffTime -> IO ()
sleep = Base.threadDelay . round . Microseconds
-- | Fork a labelled thread through "Control.Immortal" and log its lifecycle.
--
-- Every time the thread body is started, 'ImmortalThreadRestarted' is logged.
-- The body is run under 'Immortal.onUnexpectedFinish'; when it ends with an
-- exception, 'ImmortalThreadUnexpectedException' is logged and the thread
-- sleeps for one second before the handler returns.
forkImmortal ::
  ForkableMonadIO m =>
  -- | Label given to 'Immortal.createWithLabel'; also embedded in every log
  -- message emitted for this thread.
  String ->
  -- | Logger receiving the 'ImmortalThreadLog' events.
  Logger Hasura ->
  -- | The thread body. Its result type is 'Void', so it is not expected to
  -- return normally.
  m Void ->
  -- | Handle of the forked thread.
  m Immortal.Thread
forkImmortal label logger m =
  Immortal.createWithLabel label $ \this -> do
    -- Logged at the start of every run of the body, including the first one.
    liftIO $ unLogger logger (ImmortalThreadRestarted label)
    Immortal.onUnexpectedFinish this logAndPause (void m)
  where
    logAndPause = \case
      -- The body yields 'Void', so this branch only exists to make the
      -- handler total.
      Right _void -> pure ()
      Left e -> liftIO $ do
        unLogger logger (ImmortalThreadUnexpectedException label e)
        -- Back off before the body runs again so a persistently failing
        -- thread emits at most one error log per second.
        sleep (seconds 1)
-- | Shutdown handshake state shared between the thread forked by
-- 'forkManagedTWithGracefulShutdown' and its finalizer.
--
-- * 'ThreadForked': initial state; the loop keeps running its steps.
-- * 'ThreadShutdownInitiated': written by the finalizer to request shutdown.
-- * 'ThreadBlocking': written by the loop once it has observed
--   'ThreadShutdownInitiated'; the finalizer waits for this state before
--   stopping the thread.
data ThreadState = ThreadForked | ThreadBlocking | ThreadShutdownInitiated
  deriving (Show, Eq)
-- | An action to run when a thread's shutdown has been initiated.
--
-- 'forkManagedTWithGracefulShutdown' runs the wrapped action in its finalizer,
-- after marking the shutdown as initiated and before stopping the thread.
newtype ThreadShutdown m = ThreadShutdown {tsThreadShutdown :: m ()}
-- | 'forkImmortal' packaged as a managed resource.
--
-- The acquire step forks the immortal thread; the release step logs
-- 'ImmortalThreadStopping' and then calls 'Immortal.stop' on it.
forkManagedT ::
  ForkableMonadIO m =>
  -- | Label for the thread, also used in log messages.
  String ->
  -- | Logger receiving the 'ImmortalThreadLog' events.
  Logger Hasura ->
  -- | The thread body (see 'forkImmortal').
  m Void ->
  -- | Handle of the forked thread.
  ManagedT m Immortal.Thread
forkManagedT label logger m =
  allocate
    (forkImmortal label logger m)
    ( \thread -> do
        unLogger logger (ImmortalThreadStopping label)
        liftIO $ Immortal.stop thread
    )
-- | A never-ending monadic loop described by its starting argument and a step
-- function.
--
-- @Forever initial step@ pairs an initial value with a function that takes the
-- current value and produces the next one in @m@. The value type is
-- existentially quantified, so callers may thread any state they like.
-- 'forkManagedTWithGracefulShutdown' consumes a 'Forever' by iterating the
-- step function from the initial value indefinitely.
data Forever m = forall a. Forever a (a -> m a)
-- | Like 'forkManagedT', but the finalizer coordinates with the thread so that
-- it is only stopped between loop steps.
--
-- The forked thread runs @loopIteration@ to obtain a 'Forever' and then
-- applies its step function repeatedly. A 'ThreadState' 'STM.TVar' is shared
-- between the loop and the finalizer:
--
-- 1. The finalizer writes 'ThreadShutdownInitiated' and runs the
--    'ThreadShutdown' handler.
-- 2. Before each step the loop reads the state. On 'ThreadShutdownInitiated'
--    it writes 'ThreadBlocking' and does not start another step; on
--    'ThreadBlocking' it blocks in 'STM.retry'.
-- 3. The finalizer waits until the state is 'ThreadBlocking', logs
--    'ImmortalThreadStopping' and calls 'Immortal.stop'.
--
-- NOTE(review): the finalizer waits for 'ThreadBlocking' without a timeout, so
-- it blocks for as long as the loop does not reach its next state check (for
-- example while a step or @loopIteration@ is still running).
forkManagedTWithGracefulShutdown ::
  ForkableMonadIO m =>
  -- | Label for the thread, also used in log messages.
  String ->
  -- | Logger receiving the 'ImmortalThreadLog' events.
  Logger Hasura ->
  -- | Action run by the finalizer once shutdown has been initiated.
  ThreadShutdown m ->
  -- | Produces the loop (initial argument and step function) to run. It is
  -- executed each time the thread body starts.
  m (Forever m) ->
  -- | Handle of the forked thread.
  ManagedT m Immortal.Thread
forkManagedTWithGracefulShutdown label logger (ThreadShutdown threadShutdownHandler) loopIteration = do
  threadStateTVar <- liftIO $ STM.newTVarIO ThreadForked
  allocate
    ( Immortal.createWithLabel label $ \this -> do
        -- Logged at the start of every run of the body, including the first.
        liftIO $ unLogger logger (ImmortalThreadRestarted label)
        Immortal.onUnexpectedFinish this logAndPause $ do
          let mLoop (Forever loopFunctionInitArg loopFunction) =
                flip iterateM_ loopFunctionInitArg $ \args -> do
                  mayRunStep <-
                    liftIO $
                      STM.atomically $
                        STM.readTVar threadStateTVar >>= \case
                          ThreadForked -> pure True
                          ThreadShutdownInitiated -> do
                            -- Acknowledge the shutdown request. The state only
                            -- ever moves forked -> shutdown -> blocking.
                            STM.writeTVar threadStateTVar ThreadBlocking
                            pure False
                          ThreadBlocking -> STM.retry
                  -- Once 'ThreadBlocking' has been published the finalizer is
                  -- free to stop this thread, so no further step may start:
                  -- hand the argument back unchanged and let the next
                  -- iteration block in the 'ThreadBlocking' branch above.
                  if mayRunStep then loopFunction args else pure args
          -- Run the loop in a linked async and wait on it.
          t <- LA.async $ mLoop =<< loopIteration
          LA.link t
          void $ LA.wait t
    )
    ( \thread -> do
        -- Ask the loop to stop starting new steps.
        liftIO $ STM.atomically $ STM.writeTVar threadStateTVar ThreadShutdownInitiated
        threadShutdownHandler
        -- Wait until the loop has acknowledged the request.
        liftIO $
          STM.atomically $
            STM.readTVar threadStateTVar >>= STM.check . (== ThreadBlocking)
        unLogger logger (ImmortalThreadStopping label)
        liftIO $ Immortal.stop thread
    )
  where
    logAndPause = \case
      Right () -> pure ()
      Left e -> liftIO $ do
        unLogger logger (ImmortalThreadUnexpectedException label e)
        -- Back off before the body runs again so a persistently failing
        -- thread emits at most one error log per second.
        sleep (seconds 1)
-- | Lifecycle events logged for threads forked by this module. Each
-- constructor carries the thread's label.
data ImmortalThreadLog
  = -- | The thread body ended with the given exception; logged by the
    -- unexpected-finish handler.
    ImmortalThreadUnexpectedException String SomeException
  | -- | The thread is about to be stopped via 'Immortal.stop'.
    ImmortalThreadStopping String
  | -- | The thread body has been started; logged on every start.
    ImmortalThreadRestarted String
-- | Renders every 'ImmortalThreadLog' as an unstructured internal log entry
-- whose payload is a plain message string. Unexpected exceptions are logged at
-- error level, the other events at info level.
instance ToEngineLog ImmortalThreadLog Hasura where
  toEngineLog event = (level, ELTInternal ILTUnstructured, toJSON message)
    where
      level :: LogLevel
      message :: String
      (level, message) = case event of
        ImmortalThreadStopping label ->
          (LevelInfo, "Stopping immortal " <> label <> " thread")
        ImmortalThreadUnexpectedException label e ->
          ( LevelError,
            "Unexpected exception in immortal thread "
              <> label
              <> " (it will be restarted):\n"
              <> show e
          )
        ImmortalThreadRestarted label ->
          (LevelInfo, "Thread " <> label <> " (re)started")
-- | Constraint synonym for monads in which this module can fork threads:
-- 'MonadIO' for logging and STM, 'MC.MonadBaseControl' over 'IO' for
-- "Control.Immortal" and 'allocate', and @'LA.Forall' ('LA.Pure' m)@ as
-- required by the lifted 'LA.async' / 'LA.wait'.
type ForkableMonadIO m = (MonadIO m, MC.MonadBaseControl IO m, LA.Forall (LA.Pure m))
-- | Run an 'ExceptT'-over-'IO' action for every element of a list, processing
-- the list in concurrent chunks, and collect the results in input order.
--
-- The input is split into chunks of @chunkSize@ elements. The chunks run
-- concurrently with each other; the elements inside one chunk are processed
-- sequentially. Each action's 'Either' result is forced with 'evaluate' inside
-- its worker. After all chunks have finished, the first 'Left' in input order
-- (if any) is rethrown with 'throwError'; otherwise all results are returned.
--
-- A @chunkSize@ below 1 is treated as 1. 'chunksOf' returns an infinite list
-- of empty chunks for non-positive sizes, which would make this function
-- never terminate.
forConcurrentlyEIO :: (MonadIO m, MonadError e m) => Int -> [a] -> (a -> ExceptT e IO b) -> m [b]
forConcurrentlyEIO chunkSize xs f = do
  let fIO a = runExceptT (f a) >>= evaluate
      chunks = chunksOf (max 1 chunkSize) xs
  results <- liftIO $ concat <$> A.forConcurrently chunks (traverse fIO)
  for results (either throwError pure)