{-# LANGUAGE CPP #-}
{-# LANGUAGE TemplateHaskell #-}
module Hasura.Server.SchemaUpdate
( startSchemaSyncListenerThread,
startSchemaSyncProcessorThread,
ThreadType (..),
ThreadError (..),
)
where
import Control.Concurrent.Extended qualified as C
import Control.Concurrent.STM qualified as STM
import Control.Immortal qualified as Immortal
import Control.Monad.Loops qualified as L
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Trans.Managed (ManagedT)
import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.TH
import Data.HashMap.Strict qualified as HM
import Data.HashSet qualified as HS
import Database.PG.Query qualified as Q
import Hasura.Base.Error
import Hasura.Logging
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.Schema (runCacheRWT)
import Hasura.RQL.DDL.Schema.Catalog
import Hasura.RQL.Types.Numeric (NonNegative)
import Hasura.RQL.Types.Numeric qualified as Numeric
import Hasura.RQL.Types.Run
import Hasura.RQL.Types.SchemaCache
import Hasura.RQL.Types.SchemaCache.Build
import Hasura.RQL.Types.Source
import Hasura.Server.Logging
import Hasura.Server.SchemaCacheRef
( SchemaCacheRef,
readSchemaCacheRef,
withSchemaCacheUpdate,
)
import Hasura.Server.Types
import Hasura.Session
import Network.HTTP.Client qualified as HTTP
-- | Identifies which of the two schema-sync threads a startup message or log
-- entry refers to.
data ThreadType
  = -- | The thread that runs 'listener'.
    TTListener
  | -- | The thread that runs 'processor'.
    TTProcessor
  deriving (Eq)

-- | Hand-written (rather than derived) so that the constructors render as the
-- lowercase names @"listener"@ and @"processor"@; these strings end up in the
-- @thread_type@ field of 'SchemaSyncThreadLog' and in the startup message.
instance Show ThreadType where
  show threadType = case threadType of
    TTListener -> "listener"
    TTProcessor -> "processor"
-- | A log entry produced by one of the schema-sync threads.
--
-- The field order is significant: the constructor may be applied positionally.
data SchemaSyncThreadLog = SchemaSyncThreadLog
  { -- | Severity handed to the logger by the 'ToEngineLog' instance. It is not
    -- included in the JSON body produced by the 'ToJSON' instance.
    suelLogLevel :: !LogLevel,
    -- | The thread that produced the entry.
    suelThreadType :: !ThreadType,
    -- | Free-form JSON payload of the entry.
    suelInfo :: !Value
  }
  deriving (Show, Eq)
-- | Serialises the entry as an object with the keys @thread_type@ (the 'show'n
-- 'ThreadType') and @info@ (the payload). The log level is deliberately left
-- out of the body.
instance ToJSON SchemaSyncThreadLog where
  toJSON threadLog =
    object
      [ "thread_type" .= show (suelThreadType threadLog),
        "info" .= suelInfo threadLog
      ]
-- | Reports the entry under the internal @ILTSchemaSyncThread@ log type, using
-- the entry's own log level and its 'ToJSON' encoding as the body.
instance ToEngineLog SchemaSyncThreadLog Hasura where
  toEngineLog threadLog = (level, logType, body)
    where
      level = suelLogLevel threadLog
      logType = ELTInternal ILTSchemaSyncThread
      body = toJSON threadLog
-- | Errors reported by the schema-sync threads.
data ThreadError
  = -- | Carries a textual description.
    -- NOTE(review): presumably a payload-decoding failure; this constructor is
    -- not used in the visible part of the module -- confirm against callers.
    TEPayloadParse !Text
  | -- | A query error, e.g. from fetching the metadata resource version or
    -- from refreshing the schema cache.
    TEQueryError !QErr

-- The JSON encoding is a tagged object: the tag lives under the @type@ key and
-- the constructor's contents under @info@. The tag is the constructor name
-- with its two-character @TE@ prefix dropped and the remainder snake-cased.
$( deriveToJSON
     defaultOptions
       { constructorTagModifier = snakeCase . drop 2,
         sumEncoding = TaggedObject "type" "info"
       }
     ''ThreadError
 )
-- | Emits an info-level 'StartupLog' of kind @"schema-sync"@ announcing that a
-- schema-sync thread has started.
--
-- The payload carries the @instance_id@, the 'show'n @thread_id@ of the
-- immortal thread, and a @message@ of the form @"<thread type> thread started"@.
logThreadStarted ::
  (MonadIO m) =>
  Logger Hasura ->
  InstanceId ->
  ThreadType ->
  Immortal.Thread ->
  m ()
logThreadStarted logger instanceId threadType thread =
  unLogger logger $ StartupLog LevelInfo "schema-sync" payload
  where
    startedMessage = tshow threadType <> " thread started"
    payload =
      object
        [ "instance_id" .= getInstanceId instanceId,
          "thread_id" .= show (Immortal.threadId thread),
          "message" .= startedMessage
        ]
-- | Forks the managed thread that runs 'listener', logs that it has started,
-- and returns its handle.
--
-- The polling interval is unwrapped from its 'NonNegative' wrapper before being
-- handed to 'listener'; the 'STM.TMVar' is the slot into which fetched metadata
-- resource versions are written.
startSchemaSyncListenerThread ::
  C.ForkableMonadIO m =>
  Logger Hasura ->
  Q.PGPool ->
  InstanceId ->
  NonNegative Milliseconds ->
  STM.TMVar MetadataResourceVersion ->
  ManagedT m Immortal.Thread
startSchemaSyncListenerThread logger pool instanceId interval metaVersionRef = do
  let pollInterval = Numeric.getNonNegative interval
  -- NOTE(review): the label is spelled "SchemeUpdate" (sic). It is kept
  -- byte-for-byte because it may already be matched in logs or tooling.
  listenerThread <-
    C.forkManagedT "SchemeUpdate.listener" logger $
      listener logger pool metaVersionRef pollInterval
  logThreadStarted logger instanceId TTListener listenerThread
  pure listenerThread
-- | Forks the managed thread that runs 'processor', logs that it has started,
-- and returns its handle.
--
-- All arguments are passed through to 'processor' unchanged; the 'STM.TMVar' is
-- the slot from which the processor takes metadata resource versions.
startSchemaSyncProcessorThread ::
  ( C.ForkableMonadIO m,
    MonadMetadataStorage (MetadataStorageT m),
    MonadResolveSource m
  ) =>
  Logger Hasura ->
  HTTP.Manager ->
  STM.TMVar MetadataResourceVersion ->
  SchemaCacheRef ->
  InstanceId ->
  ServerConfigCtx ->
  STM.TVar Bool ->
  ManagedT m Immortal.Thread
startSchemaSyncProcessorThread
  logger
  httpMgr
  schemaSyncEventRef
  cacheRef
  instanceId
  serverConfigCtx
  logTVar = do
    -- NOTE(review): the label is spelled "SchemeUpdate" (sic). It is kept
    -- byte-for-byte because it may already be matched in logs or tooling.
    processorThread <-
      C.forkManagedT "SchemeUpdate.processor" logger $
        processor
          logger
          httpMgr
          schemaSyncEventRef
          cacheRef
          instanceId
          serverConfigCtx
          logTVar
    logThreadStarted logger instanceId TTProcessor processorThread
    pure processorThread
-- | Stores a value in a 'STM.TMVar' without ever blocking: whatever the
-- variable currently holds (if anything) is discarded and replaced.
--
-- The take and the put run in a single STM transaction, so no other thread can
-- observe the variable in the intermediate empty state.
forcePut :: STM.TMVar a -> a -> IO ()
forcePut v a = STM.atomically $ do
  -- Empty the variable first so that the following put cannot retry.
  _ <- STM.tryTakeTMVar v
  STM.putTMVar v a
-- | Fetches the current metadata resource version from the catalog and, on
-- success, publishes it into the given 'STM.TMVar' with 'forcePut'.
--
-- The fetch runs in a repeatable-read transaction on the given pool. A failed
-- fetch is returned as 'Left' and the 'STM.TMVar' is left untouched.
schemaVersionCheckHandler ::
  Q.PGPool -> STM.TMVar MetadataResourceVersion -> IO (Either QErr ())
schemaVersionCheckHandler pool metaVersionRef = do
  fetchedVersion <-
    runExceptT $
      Q.runTx pool (Q.RepeatableRead, Nothing) fetchMetadataResourceVersionFromCatalog
  -- 'traverse' over 'Either' passes a 'Left' through unchanged and runs the
  -- action only on a 'Right'.
  traverse (forcePut metaVersionRef) fetchedVersion
-- | What the listener remembers about the most recent failure it logged, so
-- that it can avoid logging the same failure on every poll.
data ErrorState = ErrorState
  { -- | The error recorded by the last call to 'updateErrorInState', if any.
    _esLastErrorSeen :: !(Maybe QErr),
    -- | The metadata resource version recorded alongside that error, if any.
    _esLastMetadataVersion :: !(Maybe MetadataResourceVersion)
  }
  deriving (Eq)
-- | The state with no recorded error and no recorded version; used as the
-- listener's initial state and after every successful poll.
defaultErrorState :: ErrorState
defaultErrorState =
  ErrorState
    { _esLastErrorSeen = Nothing,
      _esLastMetadataVersion = Nothing
    }
-- | Records the given error and metadata resource version as the most recently
-- seen ones, replacing whatever the state held before.
updateErrorInState :: ErrorState -> QErr -> MetadataResourceVersion -> ErrorState
updateErrorInState previous latestErr latestVersion =
  previous
    { _esLastErrorSeen = Just latestErr,
      _esLastMetadataVersion = Just latestVersion
    }
-- | 'True' exactly when the state holds both a recorded error and a recorded
-- metadata resource version.
isInErrorState :: ErrorState -> Bool
isInErrorState (ErrorState (Just _) (Just _)) = True
isInErrorState _ = False
-- | Decides whether a failure should be logged.
--
-- Returns 'True' only when the error differs from the last recorded error
-- /and/ the metadata resource version differs from the last recorded version.
-- If either one matches what is already recorded, the failure is treated as a
-- repeat and is not logged. An empty slot in the state never matches.
toLogError :: ErrorState -> QErr -> MetadataResourceVersion -> Bool
toLogError es qerr mrv = isNewError && isNewVersion
  where
    isNewError = _esLastErrorSeen es /= Just qerr
    isNewVersion = _esLastMetadataVersion es /= Just mrv
-- | The listener loop: polls the catalog for the metadata resource version
-- forever, sleeping for the given interval after each poll.
--
-- Each iteration:
--
-- 1. takes whatever version is currently sitting in the 'STM.TMVar' (if any);
-- 2. runs 'schemaVersionCheckHandler', which on success writes the freshly
--    fetched version into the 'STM.TMVar';
-- 3. on failure, logs the error (subject to 'toLogError') together with the
--    version obtained in step 1, or 'initialResourceVersion' if the variable
--    was empty;
-- 4. on success, logs a "restored" message if the previous iteration had left
--    the loop in an error state, and resets the state.
--
-- NOTE(review): when the fetch fails, the value taken in step 1 is not put
-- back into the 'STM.TMVar' -- confirm this is intended.
listener ::
  MonadIO m =>
  Logger Hasura ->
  Q.PGPool ->
  STM.TMVar MetadataResourceVersion ->
  Milliseconds ->
  m void
listener logger pool metaVersionRef interval =
  L.iterateM_ pollOnce defaultErrorState
  where
    -- One poll; the returned state is the input of the next poll.
    pollOnce errorState = do
      pendingVersion <- liftIO $ STM.atomically $ STM.tryTakeTMVar metaVersionRef
      checkResult <- liftIO $ schemaVersionCheckHandler pool metaVersionRef
      let metadataVersion = fromMaybe initialResourceVersion pendingVersion
      nextState <- case checkResult of
        Left respErr -> onFailure errorState metadataVersion respErr
        Right _ -> onSuccess errorState
      liftIO $ C.sleep $ milliseconds interval
      pure nextState

    -- Log the failure and remember it, unless 'toLogError' says it is a repeat.
    onFailure errorState metadataVersion respErr
      | toLogError errorState respErr metadataVersion = do
          logError logger TTListener $ TEQueryError respErr
          logInfo logger TTListener $ object ["metadataResourceVersion" .= toJSON metadataVersion]
          pure $ updateErrorInState errorState respErr metadataVersion
      | otherwise = pure errorState

    -- Announce recovery if the loop was in an error state, then reset the state.
    onSuccess errorState = do
      when (isInErrorState errorState) $
        logInfo logger TTListener $ object ["message" .= ("SchemaSync Restored..." :: Text)]
      pure defaultErrorState
-- | The processor loop: forever waits for a metadata resource version to
-- appear in the 'STM.TMVar', takes it, and refreshes the schema cache for it.
--
-- 'STM.takeTMVar' blocks while the variable is empty, so the loop is idle until
-- a version is published. A 'Left' result from the refresh is logged as a
-- 'TEQueryError' and the loop carries on with the next version.
processor ::
  forall m void.
  ( C.ForkableMonadIO m,
    MonadMetadataStorage (MetadataStorageT m),
    MonadResolveSource m
  ) =>
  Logger Hasura ->
  HTTP.Manager ->
  STM.TMVar MetadataResourceVersion ->
  SchemaCacheRef ->
  InstanceId ->
  ServerConfigCtx ->
  STM.TVar Bool ->
  m void
processor
  logger
  httpMgr
  metaVersionRef
  cacheRef
  instanceId
  serverConfigCtx
  logTVar = forever $ do
    metaVersion <- liftIO $ STM.atomically $ STM.takeTMVar metaVersionRef
    refreshResult <-
      runMetadataStorageT $
        refreshSchemaCache
          metaVersion
          instanceId
          logger
          httpMgr
          cacheRef
          TTProcessor
          serverConfigCtx
          logTVar
    either (logError logger TTProcessor . TEQueryError) pure refreshResult
refreshSchemaCache ::
( MonadIO m,
MonadBaseControl IO m,
MonadMetadataStorage m,
MonadResolveSource m
) =>
MetadataResourceVersion ->
InstanceId ->
Logger Hasura ->
HTTP.Manager ->
SchemaCacheRef ->
ThreadType ->
ServerConfigCtx ->
STM.TVar Bool ->
m ()
refreshSchemaCache :: MetadataResourceVersion
-> InstanceId
-> Logger Hasura
-> Manager
-> SchemaCacheRef
-> ThreadType
-> ServerConfigCtx
-> TVar Bool
-> m ()
refreshSchemaCache
MetadataResourceVersion
resourceVersion
InstanceId
instanceId
Logger Hasura
logger
Manager
httpManager
SchemaCacheRef
cacheRef
ThreadType
threadType
ServerConfigCtx
serverConfigCtx
TVar Bool
logTVar = do
Either QErr ()
respErr <- ExceptT QErr m () -> m (Either QErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT QErr m () -> m (Either QErr ()))
-> ExceptT QErr m () -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$
SchemaCacheRef
-> Logger Hasura
-> Maybe (TVar Bool)
-> ExceptT QErr m ((), RebuildableSchemaCache)
-> ExceptT QErr m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
SchemaCacheRef
-> Logger Hasura
-> Maybe (TVar Bool)
-> m (a, RebuildableSchemaCache)
-> m a
withSchemaCacheUpdate SchemaCacheRef
cacheRef Logger Hasura
logger (TVar Bool -> Maybe (TVar Bool)
forall a. a -> Maybe a
Just TVar Bool
logTVar) (ExceptT QErr m ((), RebuildableSchemaCache) -> ExceptT QErr m ())
-> ExceptT QErr m ((), RebuildableSchemaCache) -> ExceptT QErr m ()
forall a b. (a -> b) -> a -> b
$ do
RebuildableSchemaCache
rebuildableCache <- IO RebuildableSchemaCache -> ExceptT QErr m RebuildableSchemaCache
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO RebuildableSchemaCache
-> ExceptT QErr m RebuildableSchemaCache)
-> IO RebuildableSchemaCache
-> ExceptT QErr m RebuildableSchemaCache
forall a b. (a -> b) -> a -> b
$ (RebuildableSchemaCache, SchemaCacheVer) -> RebuildableSchemaCache
forall a b. (a, b) -> a
fst ((RebuildableSchemaCache, SchemaCacheVer)
-> RebuildableSchemaCache)
-> IO (RebuildableSchemaCache, SchemaCacheVer)
-> IO RebuildableSchemaCache
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SchemaCacheRef -> IO (RebuildableSchemaCache, SchemaCacheVer)
readSchemaCacheRef SchemaCacheRef
cacheRef
(()
msg, RebuildableSchemaCache
cache, CacheInvalidations
_) <- RunCtx
-> RunT m ((), RebuildableSchemaCache, CacheInvalidations)
-> ExceptT QErr m ((), RebuildableSchemaCache, CacheInvalidations)
forall (m :: * -> *) a. RunCtx -> RunT m a -> ExceptT QErr m a
peelRun RunCtx
runCtx (RunT m ((), RebuildableSchemaCache, CacheInvalidations)
-> ExceptT QErr m ((), RebuildableSchemaCache, CacheInvalidations))
-> RunT m ((), RebuildableSchemaCache, CacheInvalidations)
-> ExceptT QErr m ((), RebuildableSchemaCache, CacheInvalidations)
forall a b. (a -> b) -> a -> b
$
RebuildableSchemaCache
-> CacheRWT (RunT m) ()
-> RunT m ((), RebuildableSchemaCache, CacheInvalidations)
forall (m :: * -> *) a.
Functor m =>
RebuildableSchemaCache
-> CacheRWT m a
-> m (a, RebuildableSchemaCache, CacheInvalidations)
runCacheRWT RebuildableSchemaCache
rebuildableCache (CacheRWT (RunT m) ()
-> RunT m ((), RebuildableSchemaCache, CacheInvalidations))
-> CacheRWT (RunT m) ()
-> RunT m ((), RebuildableSchemaCache, CacheInvalidations)
forall a b. (a -> b) -> a -> b
$ do
SchemaCache
schemaCache <- CacheRWT (RunT m) SchemaCache
forall (m :: * -> *). CacheRM m => m SchemaCache
askSchemaCache
case SchemaCache -> Maybe MetadataResourceVersion
scMetadataResourceVersion SchemaCache
schemaCache of
Maybe MetadataResourceVersion
Nothing -> MetadataResourceVersion -> CacheRWT (RunT m) ()
forall (m :: * -> *). CacheRWM m => MetadataResourceVersion -> m ()
setMetadataResourceVersionInSchemaCache MetadataResourceVersion
resourceVersion
Just MetadataResourceVersion
engineResourceVersion ->
Bool -> CacheRWT (RunT m) () -> CacheRWT (RunT m) ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (MetadataResourceVersion
engineResourceVersion MetadataResourceVersion -> MetadataResourceVersion -> Bool
forall a. Eq a => a -> a -> Bool
== MetadataResourceVersion
resourceVersion) (CacheRWT (RunT m) () -> CacheRWT (RunT m) ())
-> CacheRWT (RunT m) () -> CacheRWT (RunT m) ()
forall a b. (a -> b) -> a -> b
$ do
(Metadata
metadata, MetadataResourceVersion
latestResourceVersion) <- CacheRWT (RunT m) (Metadata, MetadataResourceVersion)
forall (m :: * -> *).
MonadMetadataStorage m =>
m (Metadata, MetadataResourceVersion)
fetchMetadata
[(MetadataResourceVersion, CacheInvalidations)]
notifications <- MetadataResourceVersion
-> InstanceId
-> CacheRWT
(RunT m) [(MetadataResourceVersion, CacheInvalidations)]
forall (m :: * -> *).
MonadMetadataStorage m =>
MetadataResourceVersion
-> InstanceId -> m [(MetadataResourceVersion, CacheInvalidations)]
fetchMetadataNotifications MetadataResourceVersion
engineResourceVersion InstanceId
instanceId
Logger Hasura -> ThreadType -> String -> CacheRWT (RunT m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> ThreadType -> String -> m ()
logDebug Logger Hasura
logger ThreadType
threadType (String -> CacheRWT (RunT m) ()) -> String -> CacheRWT (RunT m) ()
forall a b. (a -> b) -> a -> b
$ String
"DEBUG: refreshSchemaCache Called: engineResourceVersion: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> MetadataResourceVersion -> String
forall a. Show a => a -> String
show MetadataResourceVersion
engineResourceVersion String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
", fresh resource version: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> MetadataResourceVersion -> String
forall a. Show a => a -> String
show MetadataResourceVersion
latestResourceVersion
case [(MetadataResourceVersion, CacheInvalidations)]
notifications of
[] -> do
Logger Hasura -> ThreadType -> Value -> CacheRWT (RunT m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> ThreadType -> Value -> m ()
logInfo Logger Hasura
logger ThreadType
threadType (Value -> CacheRWT (RunT m) ()) -> Value -> CacheRWT (RunT m) ()
forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [Key
"message" Key -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= (Text
"Schema Version changed with no notifications" :: Text)]
MetadataResourceVersion -> CacheRWT (RunT m) ()
forall (m :: * -> *). CacheRWM m => MetadataResourceVersion -> m ()
setMetadataResourceVersionInSchemaCache MetadataResourceVersion
latestResourceVersion
[(MetadataResourceVersion, CacheInvalidations)]
_ -> do
let cacheInvalidations :: CacheInvalidations
cacheInvalidations =
if ((MetadataResourceVersion, CacheInvalidations) -> Bool)
-> [(MetadataResourceVersion, CacheInvalidations)] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any ((MetadataResourceVersion -> MetadataResourceVersion -> Bool
forall a. Eq a => a -> a -> Bool
== (MetadataResourceVersion
engineResourceVersion MetadataResourceVersion
-> MetadataResourceVersion -> MetadataResourceVersion
forall a. Num a => a -> a -> a
+ MetadataResourceVersion
1)) (MetadataResourceVersion -> Bool)
-> ((MetadataResourceVersion, CacheInvalidations)
-> MetadataResourceVersion)
-> (MetadataResourceVersion, CacheInvalidations)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (MetadataResourceVersion, CacheInvalidations)
-> MetadataResourceVersion
forall a b. (a, b) -> a
fst) [(MetadataResourceVersion, CacheInvalidations)]
notifications
then
[CacheInvalidations] -> CacheInvalidations
forall a. Monoid a => [a] -> a
mconcat ([CacheInvalidations] -> CacheInvalidations)
-> [CacheInvalidations] -> CacheInvalidations
forall a b. (a -> b) -> a -> b
$ (MetadataResourceVersion, CacheInvalidations) -> CacheInvalidations
forall a b. (a, b) -> b
snd ((MetadataResourceVersion, CacheInvalidations)
-> CacheInvalidations)
-> [(MetadataResourceVersion, CacheInvalidations)]
-> [CacheInvalidations]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(MetadataResourceVersion, CacheInvalidations)]
notifications
else
CacheInvalidations :: Bool
-> HashSet RemoteSchemaName
-> HashSet SourceName
-> CacheInvalidations
CacheInvalidations
{ ciMetadata :: Bool
ciMetadata = Bool
True,
ciRemoteSchemas :: HashSet RemoteSchemaName
ciRemoteSchemas = [RemoteSchemaName] -> HashSet RemoteSchemaName
forall a. (Eq a, Hashable a) => [a] -> HashSet a
HS.fromList ([RemoteSchemaName] -> HashSet RemoteSchemaName)
-> [RemoteSchemaName] -> HashSet RemoteSchemaName
forall a b. (a -> b) -> a -> b
$ SchemaCache -> [RemoteSchemaName]
getAllRemoteSchemas SchemaCache
schemaCache,
ciSources :: HashSet SourceName
ciSources = [SourceName] -> HashSet SourceName
forall a. (Eq a, Hashable a) => [a] -> HashSet a
HS.fromList ([SourceName] -> HashSet SourceName)
-> [SourceName] -> HashSet SourceName
forall a b. (a -> b) -> a -> b
$ HashMap SourceName BackendSourceInfo -> [SourceName]
forall k v. HashMap k v -> [k]
HM.keys (HashMap SourceName BackendSourceInfo -> [SourceName])
-> HashMap SourceName BackendSourceInfo -> [SourceName]
forall a b. (a -> b) -> a -> b
$ SchemaCache -> HashMap SourceName BackendSourceInfo
scSources SchemaCache
schemaCache
}
Logger Hasura -> ThreadType -> Value -> CacheRWT (RunT m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> ThreadType -> Value -> m ()
logInfo Logger Hasura
logger ThreadType
threadType (Value -> CacheRWT (RunT m) ()) -> Value -> CacheRWT (RunT m) ()
forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [Key
"currentVersion" Key -> MetadataResourceVersion -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= MetadataResourceVersion
engineResourceVersion, Key
"latestResourceVersion" Key -> MetadataResourceVersion -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= MetadataResourceVersion
latestResourceVersion]
BuildReason
-> CacheInvalidations -> Metadata -> CacheRWT (RunT m) ()
forall (m :: * -> *).
CacheRWM m =>
BuildReason -> CacheInvalidations -> Metadata -> m ()
buildSchemaCacheWithOptions BuildReason
CatalogSync CacheInvalidations
cacheInvalidations Metadata
metadata
MetadataResourceVersion -> CacheRWT (RunT m) ()
forall (m :: * -> *). CacheRWM m => MetadataResourceVersion -> m ()
setMetadataResourceVersionInSchemaCache MetadataResourceVersion
latestResourceVersion
Logger Hasura -> ThreadType -> Value -> CacheRWT (RunT m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> ThreadType -> Value -> m ()
logInfo Logger Hasura
logger ThreadType
threadType (Value -> CacheRWT (RunT m) ()) -> Value -> CacheRWT (RunT m) ()
forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [Key
"message" Key -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= (Text
"Schema Version changed with notifications" :: Text)]
((), RebuildableSchemaCache)
-> ExceptT QErr m ((), RebuildableSchemaCache)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (()
msg, RebuildableSchemaCache
cache)
Either QErr () -> (QErr -> m ()) -> m ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft Either QErr ()
respErr (Logger Hasura -> ThreadType -> ThreadError -> m ()
forall (m :: * -> *) a.
(MonadIO m, ToJSON a) =>
Logger Hasura -> ThreadType -> a -> m ()
logError Logger Hasura
logger ThreadType
threadType (ThreadError -> m ()) -> (QErr -> ThreadError) -> QErr -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. QErr -> ThreadError
TEQueryError)
where
runCtx :: RunCtx
runCtx = UserInfo -> Manager -> ServerConfigCtx -> RunCtx
RunCtx UserInfo
adminUserInfo Manager
httpManager ServerConfigCtx
serverConfigCtx
-- | Emit a schema-sync thread log entry at 'LevelInfo'.
--
-- The supplied JSON 'Value' is used unchanged as the payload of the
-- 'SchemaSyncThreadLog', tagged with the given 'ThreadType', and handed to
-- the function wrapped by the 'Logger'.
logInfo :: (MonadIO m) => Logger Hasura -> ThreadType -> Value -> m ()
logInfo logger threadType val =
  unLogger logger $
    SchemaSyncThreadLog LevelInfo threadType val
-- | Emit a schema-sync thread log entry at 'LevelError'.
--
-- The error value is JSON-encoded and placed under the @"error"@ key of the
-- payload object, tagged with the given 'ThreadType'.
logError :: (MonadIO m, ToJSON a) => Logger Hasura -> ThreadType -> a -> m ()
logError logger threadType err =
  unLogger logger $
    SchemaSyncThreadLog LevelError threadType $
      -- '.=' already encodes its right-hand side via 'ToJSON', so no explicit
      -- 'toJSON' call is needed here.
      object ["error" .= err]
-- | Emit a schema-sync thread log entry at 'LevelDebug'.
--
-- The message string is placed under the @"message"@ key of the payload
-- object, tagged with the given 'ThreadType'.
logDebug :: (MonadIO m) => Logger Hasura -> ThreadType -> String -> m ()
logDebug logger threadType msg =
  unLogger logger $
    SchemaSyncThreadLog LevelDebug threadType $
      object ["message" .= msg]