module Hasura.Server.SchemaUpdate
( startSchemaSyncListenerThread,
startSchemaSyncProcessorThread,
SchemaSyncThreadType (..),
)
where
import Control.Concurrent.Extended qualified as C
import Control.Concurrent.STM qualified as STM
import Control.Immortal qualified as Immortal
import Control.Monad.Loops qualified as L
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Trans.Managed (ManagedT)
import Data.Aeson
import Data.Aeson.Casing
import Data.HashMap.Strict qualified as HashMap
import Data.HashSet qualified as HS
import Data.Text qualified as T
import Database.PG.Query qualified as PG
import Hasura.App.State
import Hasura.Base.Error
import Hasura.Logging
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.Schema (runCacheRWT)
import Hasura.RQL.DDL.Schema.Cache.Config
import Hasura.RQL.DDL.Schema.Catalog
import Hasura.RQL.Types.BackendType (BackendType (..))
import Hasura.RQL.Types.SchemaCache
import Hasura.RQL.Types.SchemaCache.Build
import Hasura.RQL.Types.Source
import Hasura.SQL.BackendMap qualified as BackendMap
import Hasura.Server.AppStateRef
( AppStateRef,
getAppContext,
getRebuildableSchemaCacheWithVersion,
withSchemaCacheUpdate,
)
import Hasura.Server.Logging
import Hasura.Server.Types
import Hasura.Services
import Refined (NonNegative, Refined, unrefine)
-- | Errors that a schema-sync thread reports through the logger.
--
-- The JSON shape is defined by the 'ToJSON' instance for this type.
data ThreadError
  = -- | A textual error. NOTE(review): presumably a payload-parse failure
    -- message; it is not constructed in the visible part of this module.
    TEPayloadParse !Text
  | -- | A 'QErr'; the listener logs this when its resource-version check
    -- fails.
    TEQueryError !QErr
  deriving (Generic)
-- | Generic JSON encoding of 'ThreadError' as a tagged object: the
-- constructor tag is stored under @"type"@ and the payload under @"info"@.
-- Both methods share 'threadErrorJSONOptions', so 'toJSON' and 'toEncoding'
-- cannot drift apart.
instance ToJSON ThreadError where
  toJSON = genericToJSON threadErrorJSONOptions
  toEncoding = genericToEncoding threadErrorJSONOptions

-- | Aeson options used for 'ThreadError'.
--
-- The constructor tag is the constructor name with its first two characters
-- (the @TE@ prefix) dropped and the remainder passed through 'snakeCase'.
threadErrorJSONOptions :: Options
threadErrorJSONOptions =
  defaultOptions
    { constructorTagModifier = snakeCase . drop 2,
      sumEncoding = TaggedObject "type" "info"
    }
-- | Emit a startup log entry announcing that a schema-sync thread has been
-- started.
--
-- The entry is a 'StartupLog' at 'LevelInfo', labelled @"schema-sync"@, whose
-- JSON payload carries:
--
-- * @instance_id@: the text of the given 'InstanceId';
-- * @thread_id@: the 'show'n thread id of the given 'Immortal.Thread';
-- * @message@: the 'tshow'n thread type followed by @" thread started"@.
logThreadStarted ::
  (MonadIO m) =>
  Logger Hasura ->
  InstanceId ->
  SchemaSyncThreadType ->
  Immortal.Thread ->
  m ()
logThreadStarted logger instanceId threadType thread =
  unLogger logger $ StartupLog LevelInfo "schema-sync" payload
  where
    msg :: Text
    msg = tshow threadType <> " thread started"

    payload :: Value
    payload =
      object
        [ "instance_id" .= getInstanceId instanceId,
          "thread_id" .= show (Immortal.threadId thread),
          "message" .= msg
        ]
-- | Fork the schema-sync listener thread and return its handle.
--
-- The thread runs 'listener' with the given pool and version 'STM.TMVar',
-- polling at the unwrapped (non-negative) interval. Once the thread has been
-- forked, a "thread started" entry is logged via 'logThreadStarted' using the
-- supplied instance id.
startSchemaSyncListenerThread ::
  (C.ForkableMonadIO m) =>
  Logger Hasura ->
  PG.PGPool ->
  InstanceId ->
  Refined NonNegative Milliseconds ->
  STM.TMVar MetadataResourceVersion ->
  ManagedT m (Immortal.Thread)
startSchemaSyncListenerThread logger pool instanceId interval metaVersionRef = do
  -- The thread label is runtime-visible text; its spelling ("Scheme") is kept
  -- exactly as it was.
  listenerThread <-
    C.forkManagedT "SchemeUpdate.listener" logger $
      listener logger pool metaVersionRef (unrefine interval)
  logThreadStarted logger instanceId TTListener listenerThread
  pure listenerThread
-- | Fork the schema-sync processor thread and return its handle.
--
-- The logger, the metadata-version 'STM.TMVar' and the instance id are read
-- from the 'AppEnv' obtained through 'askAppEnv'. The thread runs 'processor'
-- against the given 'AppStateRef' and logging flag; once it has been forked,
-- a "thread started" entry is logged via 'logThreadStarted'.
startSchemaSyncProcessorThread ::
  ( C.ForkableMonadIO m,
    HasAppEnv m,
    HasCacheStaticConfig m,
    MonadMetadataStorage m,
    MonadResolveSource m,
    ProvidesNetwork m
  ) =>
  AppStateRef impl ->
  STM.TVar Bool ->
  ManagedT m Immortal.Thread
startSchemaSyncProcessorThread appStateRef logTVar = do
  -- Only three fields of the environment are needed, so they are selected
  -- explicitly instead of bringing every 'AppEnv' field into scope.
  appEnv <- lift askAppEnv
  let logger = _lsLogger (appEnvLoggers appEnv)
  -- The thread label is runtime-visible text; its spelling ("Scheme") is kept
  -- exactly as it was.
  processorThread <-
    C.forkManagedT "SchemeUpdate.processor" logger $
      processor (appEnvMetadataVersionRef appEnv) appStateRef logTVar
  logThreadStarted logger (appEnvInstanceId appEnv) TTProcessor processorThread
  pure processorThread
-- | Store a value in a 'STM.TMVar', replacing whatever it currently holds.
--
-- Any existing content is taken out and discarded, then the new value is put,
-- all inside a single 'STM.atomically' transaction. The put therefore never
-- blocks on a full variable.
forcePut :: STM.TMVar a -> a -> IO ()
forcePut v a = STM.atomically $ STM.tryTakeTMVar v >> STM.putTMVar v a
-- | Fetch the current metadata resource version from the catalog and publish
-- it to the given 'STM.TMVar'.
--
-- The fetch runs as a transaction on the pool with 'PG.RepeatableRead'
-- isolation and no explicit access mode. On success the fetched version
-- overwrites the variable's content (see 'forcePut') and @Right ()@ is
-- returned. On failure the variable is not touched and the 'QErr' is returned
-- in 'Left'.
schemaVersionCheckHandler ::
  PG.PGPool -> STM.TMVar MetadataResourceVersion -> IO (Either QErr ())
schemaVersionCheckHandler pool metaVersionRef = runExceptT $ do
  version <-
    PG.runTx
      pool
      (PG.RepeatableRead, Nothing)
      fetchMetadataResourceVersionFromCatalog
  liftIO $ forcePut metaVersionRef version
-- | What the listener remembers about the failure it logged most recently.
--
-- 'listener' threads this value through its loop so that 'toLogError' can
-- decide whether a new failure should be logged again. Both fields are
-- 'Nothing' while no failure is recorded (see 'defaultErrorState').
data ErrorState = ErrorState
  { -- | The error recorded by the most recent 'updateErrorInState'.
    _esLastErrorSeen :: !(Maybe QErr),
    -- | The metadata resource version recorded alongside that error.
    _esLastMetadataVersion :: !(Maybe MetadataResourceVersion)
  }
  deriving (Eq)
-- | The state with no recorded failure: neither an error nor a resource
-- version has been seen.
defaultErrorState :: ErrorState
defaultErrorState = ErrorState Nothing Nothing
-- | Record the given error and resource version as the most recently seen
-- failure, replacing whatever the state held before.
updateErrorInState :: ErrorState -> QErr -> MetadataResourceVersion -> ErrorState
updateErrorInState es qerr mrv =
  es
    { _esLastErrorSeen = Just qerr,
      _esLastMetadataVersion = Just mrv
    }
-- | 'True' when the state holds both a recorded error and a recorded
-- resource version.
isInErrorState :: ErrorState -> Bool
isInErrorState es =
  isJust (_esLastErrorSeen es) && isJust (_esLastMetadataVersion es)
-- | Decide whether a failed version check should be logged.
--
-- Returns 'True' only when the error differs from the last recorded error
-- /and/ the resource version differs from the last recorded version. A field
-- that is still 'Nothing' counts as differing, so the first failure after
-- 'defaultErrorState' is always logged.
--
-- NOTE(review): a /different/ error at an /unchanged/ resource version is
-- therefore suppressed as well. Confirm this is intended.
toLogError :: ErrorState -> QErr -> MetadataResourceVersion -> Bool
toLogError es qerr mrv = not (sameErrorAsLast || sameVersionAsLast)
  where
    sameErrorAsLast :: Bool
    sameErrorAsLast = _esLastErrorSeen es == Just qerr

    sameVersionAsLast :: Bool
    sameVersionAsLast = _esLastMetadataVersion es == Just mrv
-- | Poll the catalog for the metadata resource version, forever.
--
-- Each iteration:
--
-- 1. takes whatever version is currently stored in the 'STM.TMVar' (leaving
--    it empty), falling back to 'initialResourceVersion' when it was empty;
-- 2. runs 'schemaVersionCheckHandler', which on success stores the freshly
--    fetched version in the variable;
-- 3. on failure, logs the error and the version from step 1, but only when
--    'toLogError' allows it, and records both in the 'ErrorState';
-- 4. on success, logs a "restored" message if the previous state was an error
--    state, and resets the 'ErrorState';
-- 5. sleeps for the given interval.
--
-- NOTE(review): because step 1 empties the variable and a failed check does
-- not write to it, a version that had not yet been consumed is dropped when
-- the check fails. Confirm this is intended.
listener ::
  (MonadIO m) =>
  Logger Hasura ->
  PG.PGPool ->
  STM.TMVar MetadataResourceVersion ->
  Milliseconds ->
  m void
listener logger pool metaVersionRef interval =
  L.iterateM_ listenerLoop defaultErrorState
  where
    -- One polling iteration; returns the error state for the next iteration.
    listenerLoop errorState = do
      mrv <- liftIO $ STM.atomically $ STM.tryTakeTMVar metaVersionRef
      resp <- liftIO $ schemaVersionCheckHandler pool metaVersionRef
      let metadataVersion = fromMaybe initialResourceVersion mrv
      nextErr <- case resp of
        Left respErr
          | toLogError errorState respErr metadataVersion -> do
              logError logger TTListener $ TEQueryError respErr
              logInfo logger TTListener $
                object ["metadataResourceVersion" .= toJSON metadataVersion]
              pure $ updateErrorInState errorState respErr metadataVersion
          -- Suppressed failure: keep the recorded state unchanged.
          | otherwise -> pure errorState
        Right _ -> do
          when (isInErrorState errorState) $
            logInfo logger TTListener $
              object ["message" .= ("SchemaSync Restored..." :: Text)]
          pure defaultErrorState
      liftIO $ C.sleep $ milliseconds interval
      pure nextErr
-- | Consume metadata resource versions from the 'STM.TMVar', forever.
--
-- Each iteration blocks until the variable holds a version, takes it (leaving
-- the variable empty), and passes it to 'refreshSchemaCache' as the
-- 'TTProcessor' thread.
processor ::
  forall m void impl.
  ( C.ForkableMonadIO m,
    HasAppEnv m,
    HasCacheStaticConfig m,
    MonadMetadataStorage m,
    MonadResolveSource m,
    ProvidesNetwork m
  ) =>
  STM.TMVar MetadataResourceVersion ->
  AppStateRef impl ->
  STM.TVar Bool ->
  m void
processor metaVersionRef appStateRef logTVar = forever $ do
  metaVersion <- liftIO $ STM.atomically $ STM.takeTMVar metaVersionRef
  refreshSchemaCache metaVersion appStateRef TTProcessor logTVar
refreshSchemaCache ::
( MonadIO m,
MonadBaseControl IO m,
HasAppEnv m,
HasCacheStaticConfig m,
MonadMetadataStorage m,
MonadResolveSource m,
ProvidesNetwork m
) =>
MetadataResourceVersion ->
AppStateRef impl ->
SchemaSyncThreadType ->
STM.TVar Bool ->
m ()
refreshSchemaCache :: forall (m :: * -> *) impl.
(MonadIO m, MonadBaseControl IO m, HasAppEnv m,
HasCacheStaticConfig m, MonadMetadataStorage m,
MonadResolveSource m, ProvidesNetwork m) =>
MetadataResourceVersion
-> AppStateRef impl -> SchemaSyncThreadType -> TVar Bool -> m ()
refreshSchemaCache
MetadataResourceVersion
resourceVersion
AppStateRef impl
appStateRef
SchemaSyncThreadType
threadType
TVar Bool
logTVar = do
AppEnv {Int
Maybe Text
Maybe PGPool
Maybe (CredentialCache AgentLicenseKey)
SamplingPolicy
HostPreference
Manager
TxIsolation
ConnParams
PGPool
Refined NonNegative Seconds
TMVar MetadataResourceVersion
ConnectionOptions
CheckFeatureFlag
ServerMetrics
EventingMode
ReadOnlyMode
MaintenanceMode ()
InstanceId
PrometheusMetrics
ShutdownLatch
LoggingSettings
LockedEventsCtx
WSConnectionInitTimeout
KeepAliveDelay
OptionalInterval
Port
SubscriptionsState
Loggers
appEnvPort :: AppEnv -> Port
appEnvHost :: AppEnv -> HostPreference
appEnvMetadataDbPool :: AppEnv -> PGPool
appEnvIntrospectionDbPool :: AppEnv -> Maybe PGPool
appEnvManager :: AppEnv -> Manager
appEnvLoggers :: AppEnv -> Loggers
appEnvMetadataVersionRef :: AppEnv -> TMVar MetadataResourceVersion
appEnvInstanceId :: AppEnv -> InstanceId
appEnvEnableMaintenanceMode :: AppEnv -> MaintenanceMode ()
appEnvLoggingSettings :: AppEnv -> LoggingSettings
appEnvEventingMode :: AppEnv -> EventingMode
appEnvEnableReadOnlyMode :: AppEnv -> ReadOnlyMode
appEnvServerMetrics :: AppEnv -> ServerMetrics
appEnvShutdownLatch :: AppEnv -> ShutdownLatch
appEnvMetaVersionRef :: AppEnv -> TMVar MetadataResourceVersion
appEnvPrometheusMetrics :: AppEnv -> PrometheusMetrics
appEnvTraceSamplingPolicy :: AppEnv -> SamplingPolicy
appEnvSubscriptionState :: AppEnv -> SubscriptionsState
appEnvLockedEventsCtx :: AppEnv -> LockedEventsCtx
appEnvConnParams :: AppEnv -> ConnParams
appEnvTxIso :: AppEnv -> TxIsolation
appEnvConsoleAssetsDir :: AppEnv -> Maybe Text
appEnvConsoleSentryDsn :: AppEnv -> Maybe Text
appEnvConnectionOptions :: AppEnv -> ConnectionOptions
appEnvWebSocketKeepAlive :: AppEnv -> KeepAliveDelay
appEnvWebSocketConnectionInitTimeout :: AppEnv -> WSConnectionInitTimeout
appEnvGracefulShutdownTimeout :: AppEnv -> Refined NonNegative Seconds
appEnvSchemaPollInterval :: AppEnv -> OptionalInterval
appEnvCheckFeatureFlag :: AppEnv -> CheckFeatureFlag
appEnvLicenseKeyCache :: AppEnv -> Maybe (CredentialCache AgentLicenseKey)
appEnvMaxTotalHeaderLength :: AppEnv -> Int
appEnvPort :: Port
appEnvHost :: HostPreference
appEnvMetadataDbPool :: PGPool
appEnvIntrospectionDbPool :: Maybe PGPool
appEnvManager :: Manager
appEnvLoggers :: Loggers
appEnvMetadataVersionRef :: TMVar MetadataResourceVersion
appEnvInstanceId :: InstanceId
appEnvEnableMaintenanceMode :: MaintenanceMode ()
appEnvLoggingSettings :: LoggingSettings
appEnvEventingMode :: EventingMode
appEnvEnableReadOnlyMode :: ReadOnlyMode
appEnvServerMetrics :: ServerMetrics
appEnvShutdownLatch :: ShutdownLatch
appEnvMetaVersionRef :: TMVar MetadataResourceVersion
appEnvPrometheusMetrics :: PrometheusMetrics
appEnvTraceSamplingPolicy :: SamplingPolicy
appEnvSubscriptionState :: SubscriptionsState
appEnvLockedEventsCtx :: LockedEventsCtx
appEnvConnParams :: ConnParams
appEnvTxIso :: TxIsolation
appEnvConsoleAssetsDir :: Maybe Text
appEnvConsoleSentryDsn :: Maybe Text
appEnvConnectionOptions :: ConnectionOptions
appEnvWebSocketKeepAlive :: KeepAliveDelay
appEnvWebSocketConnectionInitTimeout :: WSConnectionInitTimeout
appEnvGracefulShutdownTimeout :: Refined NonNegative Seconds
appEnvSchemaPollInterval :: OptionalInterval
appEnvCheckFeatureFlag :: CheckFeatureFlag
appEnvLicenseKeyCache :: Maybe (CredentialCache AgentLicenseKey)
appEnvMaxTotalHeaderLength :: Int
..} <- m AppEnv
forall (m :: * -> *). HasAppEnv m => m AppEnv
askAppEnv
let logger :: Logger Hasura
logger = Loggers -> Logger Hasura
_lsLogger Loggers
appEnvLoggers
Either QErr ()
respErr <- ExceptT QErr m () -> m (Either QErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT
(ExceptT QErr m () -> m (Either QErr ()))
-> ExceptT QErr m () -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ AppStateRef impl
-> Logger Hasura
-> Maybe (TVar Bool)
-> ExceptT QErr m ((), RebuildableSchemaCache)
-> ExceptT QErr m ()
forall (m :: * -> *) impl a.
(MonadIO m, MonadBaseControl IO m, MonadError QErr m) =>
AppStateRef impl
-> Logger Hasura
-> Maybe (TVar Bool)
-> m (a, RebuildableSchemaCache)
-> m a
withSchemaCacheUpdate AppStateRef impl
appStateRef Logger Hasura
logger (TVar Bool -> Maybe (TVar Bool)
forall a. a -> Maybe a
Just TVar Bool
logTVar)
(ExceptT QErr m ((), RebuildableSchemaCache) -> ExceptT QErr m ())
-> ExceptT QErr m ((), RebuildableSchemaCache) -> ExceptT QErr m ()
forall a b. (a -> b) -> a -> b
$ do
RebuildableSchemaCache
rebuildableCache <- IO RebuildableSchemaCache -> ExceptT QErr m RebuildableSchemaCache
forall a. IO a -> ExceptT QErr m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO RebuildableSchemaCache
-> ExceptT QErr m RebuildableSchemaCache)
-> IO RebuildableSchemaCache
-> ExceptT QErr m RebuildableSchemaCache
forall a b. (a -> b) -> a -> b
$ AppStateRef impl -> IO RebuildableSchemaCache
forall impl. AppStateRef impl -> IO RebuildableSchemaCache
getRebuildableSchemaCacheWithVersion AppStateRef impl
appStateRef
AppContext
appContext <- IO AppContext -> ExceptT QErr m AppContext
forall a. IO a -> ExceptT QErr m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AppContext -> ExceptT QErr m AppContext)
-> IO AppContext -> ExceptT QErr m AppContext
forall a b. (a -> b) -> a -> b
$ AppStateRef impl -> IO AppContext
forall impl. AppStateRef impl -> IO AppContext
getAppContext AppStateRef impl
appStateRef
let dynamicConfig :: CacheDynamicConfig
dynamicConfig = AppContext -> CacheDynamicConfig
buildCacheDynamicConfig AppContext
appContext
(()
msg, RebuildableSchemaCache
cache, CacheInvalidations
_, SourcesIntrospectionStatus
_sourcesIntrospection, SchemaRegistryAction
_schemaRegistryAction) <-
CacheDynamicConfig
-> RebuildableSchemaCache
-> CacheRWT (ExceptT QErr m) ()
-> ExceptT
QErr
m
((), RebuildableSchemaCache, CacheInvalidations,
SourcesIntrospectionStatus, SchemaRegistryAction)
forall (m :: * -> *) a.
Monad m =>
CacheDynamicConfig
-> RebuildableSchemaCache
-> CacheRWT m a
-> m (a, RebuildableSchemaCache, CacheInvalidations,
SourcesIntrospectionStatus, SchemaRegistryAction)
runCacheRWT CacheDynamicConfig
dynamicConfig RebuildableSchemaCache
rebuildableCache (CacheRWT (ExceptT QErr m) ()
-> ExceptT
QErr
m
((), RebuildableSchemaCache, CacheInvalidations,
SourcesIntrospectionStatus, SchemaRegistryAction))
-> CacheRWT (ExceptT QErr m) ()
-> ExceptT
QErr
m
((), RebuildableSchemaCache, CacheInvalidations,
SourcesIntrospectionStatus, SchemaRegistryAction)
forall a b. (a -> b) -> a -> b
$ do
SchemaCache
schemaCache <- CacheRWT (ExceptT QErr m) SchemaCache
forall (m :: * -> *). CacheRM m => m SchemaCache
askSchemaCache
let engineResourceVersion :: MetadataResourceVersion
engineResourceVersion = SchemaCache -> MetadataResourceVersion
scMetadataResourceVersion SchemaCache
schemaCache
Bool
-> CacheRWT (ExceptT QErr m) () -> CacheRWT (ExceptT QErr m) ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (MetadataResourceVersion
engineResourceVersion MetadataResourceVersion -> MetadataResourceVersion -> Bool
forall a. Eq a => a -> a -> Bool
== MetadataResourceVersion
resourceVersion) (CacheRWT (ExceptT QErr m) () -> CacheRWT (ExceptT QErr m) ())
-> CacheRWT (ExceptT QErr m) () -> CacheRWT (ExceptT QErr m) ()
forall a b. (a -> b) -> a -> b
$ do
Logger Hasura
-> SchemaSyncThreadType -> Value -> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> SchemaSyncThreadType -> Value -> m ()
logInfo Logger Hasura
logger SchemaSyncThreadType
threadType
(Value -> CacheRWT (ExceptT QErr m) ())
-> Value -> CacheRWT (ExceptT QErr m) ()
forall a b. (a -> b) -> a -> b
$ Text -> Value
String
(Text -> Value) -> Text -> Value
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.unwords
[ Text
"Received metadata resource version:",
MetadataResourceVersion -> Text
showMetadataResourceVersion MetadataResourceVersion
resourceVersion Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
",",
Text
"different from the current engine resource version:",
MetadataResourceVersion -> Text
showMetadataResourceVersion MetadataResourceVersion
engineResourceVersion Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
".",
Text
"Trying to update the schema cache."
]
MetadataWithResourceVersion Metadata
metadata MetadataResourceVersion
latestResourceVersion <- CacheRWT (ExceptT QErr m) (Either QErr MetadataWithResourceVersion)
-> CacheRWT (ExceptT QErr m) MetadataWithResourceVersion
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM CacheRWT (ExceptT QErr m) (Either QErr MetadataWithResourceVersion)
forall (m :: * -> *).
MonadMetadataStorage m =>
m (Either QErr MetadataWithResourceVersion)
fetchMetadata
Logger Hasura
-> SchemaSyncThreadType -> Value -> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> SchemaSyncThreadType -> Value -> m ()
logInfo Logger Hasura
logger SchemaSyncThreadType
threadType
(Value -> CacheRWT (ExceptT QErr m) ())
-> Value -> CacheRWT (ExceptT QErr m) ()
forall a b. (a -> b) -> a -> b
$ Text -> Value
String
(Text -> Value) -> Text -> Value
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.unwords
[ Text
"Fetched metadata with resource version:",
MetadataResourceVersion -> Text
showMetadataResourceVersion MetadataResourceVersion
latestResourceVersion
]
[(MetadataResourceVersion, CacheInvalidations)]
notifications <- CacheRWT
(ExceptT QErr m)
(Either QErr [(MetadataResourceVersion, CacheInvalidations)])
-> CacheRWT
(ExceptT QErr m) [(MetadataResourceVersion, CacheInvalidations)]
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (CacheRWT
(ExceptT QErr m)
(Either QErr [(MetadataResourceVersion, CacheInvalidations)])
-> CacheRWT
(ExceptT QErr m) [(MetadataResourceVersion, CacheInvalidations)])
-> CacheRWT
(ExceptT QErr m)
(Either QErr [(MetadataResourceVersion, CacheInvalidations)])
-> CacheRWT
(ExceptT QErr m) [(MetadataResourceVersion, CacheInvalidations)]
forall a b. (a -> b) -> a -> b
$ MetadataResourceVersion
-> InstanceId
-> CacheRWT
(ExceptT QErr m)
(Either QErr [(MetadataResourceVersion, CacheInvalidations)])
forall (m :: * -> *).
MonadMetadataStorage m =>
MetadataResourceVersion
-> InstanceId
-> m (Either QErr [(MetadataResourceVersion, CacheInvalidations)])
fetchMetadataNotifications MetadataResourceVersion
engineResourceVersion InstanceId
appEnvInstanceId
case [(MetadataResourceVersion, CacheInvalidations)]
notifications of
[] -> do
Logger Hasura
-> SchemaSyncThreadType -> Value -> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> SchemaSyncThreadType -> Value -> m ()
logInfo Logger Hasura
logger SchemaSyncThreadType
threadType
(Value -> CacheRWT (ExceptT QErr m) ())
-> Value -> CacheRWT (ExceptT QErr m) ()
forall a b. (a -> b) -> a -> b
$ Text -> Value
String
(Text -> Value) -> Text -> Value
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.unwords
[ Text
"Fetched metadata notifications and received no notifications. Not updating the schema cache.",
Text
"Only setting resource version:",
MetadataResourceVersion -> Text
showMetadataResourceVersion MetadataResourceVersion
latestResourceVersion,
Text
"in schema cache"
]
MetadataResourceVersion -> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *). CacheRWM m => MetadataResourceVersion -> m ()
setMetadataResourceVersionInSchemaCache MetadataResourceVersion
latestResourceVersion
[(MetadataResourceVersion, CacheInvalidations)]
_ -> do
Logger Hasura
-> SchemaSyncThreadType -> Value -> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> SchemaSyncThreadType -> Value -> m ()
logInfo Logger Hasura
logger SchemaSyncThreadType
threadType
(Value -> CacheRWT (ExceptT QErr m) ())
-> Value -> CacheRWT (ExceptT QErr m) ()
forall a b. (a -> b) -> a -> b
$ Text -> Value
String Text
"Fetched metadata notifications and received some notifications. Updating the schema cache."
let cacheInvalidations :: CacheInvalidations
cacheInvalidations =
if ((MetadataResourceVersion, CacheInvalidations) -> Bool)
-> [(MetadataResourceVersion, CacheInvalidations)] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any ((MetadataResourceVersion -> MetadataResourceVersion -> Bool
forall a. Eq a => a -> a -> Bool
== (MetadataResourceVersion
engineResourceVersion MetadataResourceVersion
-> MetadataResourceVersion -> MetadataResourceVersion
forall a. Num a => a -> a -> a
+ MetadataResourceVersion
1)) (MetadataResourceVersion -> Bool)
-> ((MetadataResourceVersion, CacheInvalidations)
-> MetadataResourceVersion)
-> (MetadataResourceVersion, CacheInvalidations)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (MetadataResourceVersion, CacheInvalidations)
-> MetadataResourceVersion
forall a b. (a, b) -> a
fst) [(MetadataResourceVersion, CacheInvalidations)]
notifications
then
[CacheInvalidations] -> CacheInvalidations
forall a. Monoid a => [a] -> a
mconcat ([CacheInvalidations] -> CacheInvalidations)
-> [CacheInvalidations] -> CacheInvalidations
forall a b. (a -> b) -> a -> b
$ (MetadataResourceVersion, CacheInvalidations) -> CacheInvalidations
forall a b. (a, b) -> b
snd ((MetadataResourceVersion, CacheInvalidations)
-> CacheInvalidations)
-> [(MetadataResourceVersion, CacheInvalidations)]
-> [CacheInvalidations]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(MetadataResourceVersion, CacheInvalidations)]
notifications
else
CacheInvalidations
{ ciMetadata :: Bool
ciMetadata = Bool
True,
ciRemoteSchemas :: HashSet RemoteSchemaName
ciRemoteSchemas = [RemoteSchemaName] -> HashSet RemoteSchemaName
forall a. (Eq a, Hashable a) => [a] -> HashSet a
HS.fromList ([RemoteSchemaName] -> HashSet RemoteSchemaName)
-> [RemoteSchemaName] -> HashSet RemoteSchemaName
forall a b. (a -> b) -> a -> b
$ SchemaCache -> [RemoteSchemaName]
getAllRemoteSchemas SchemaCache
schemaCache,
ciSources :: HashSet SourceName
ciSources = [SourceName] -> HashSet SourceName
forall a. (Eq a, Hashable a) => [a] -> HashSet a
HS.fromList ([SourceName] -> HashSet SourceName)
-> [SourceName] -> HashSet SourceName
forall a b. (a -> b) -> a -> b
$ HashMap SourceName BackendSourceInfo -> [SourceName]
forall k v. HashMap k v -> [k]
HashMap.keys (HashMap SourceName BackendSourceInfo -> [SourceName])
-> HashMap SourceName BackendSourceInfo -> [SourceName]
forall a b. (a -> b) -> a -> b
$ SchemaCache -> HashMap SourceName BackendSourceInfo
scSources SchemaCache
schemaCache,
ciDataConnectors :: HashSet DataConnectorName
ciDataConnectors =
HashSet DataConnectorName
-> (BackendInfoWrapper 'DataConnector -> HashSet DataConnectorName)
-> Maybe (BackendInfoWrapper 'DataConnector)
-> HashSet DataConnectorName
forall b a. b -> (a -> b) -> Maybe a -> b
maybe HashSet DataConnectorName
forall a. Monoid a => a
mempty ([DataConnectorName] -> HashSet DataConnectorName
forall a. (Eq a, Hashable a) => [a] -> HashSet a
HS.fromList ([DataConnectorName] -> HashSet DataConnectorName)
-> (BackendInfoWrapper 'DataConnector -> [DataConnectorName])
-> BackendInfoWrapper 'DataConnector
-> HashSet DataConnectorName
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HashMap DataConnectorName DataConnectorInfo -> [DataConnectorName]
forall k v. HashMap k v -> [k]
HashMap.keys (HashMap DataConnectorName DataConnectorInfo
-> [DataConnectorName])
-> (BackendInfoWrapper 'DataConnector
-> HashMap DataConnectorName DataConnectorInfo)
-> BackendInfoWrapper 'DataConnector
-> [DataConnectorName]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BackendInfoWrapper 'DataConnector
-> HashMap DataConnectorName DataConnectorInfo
BackendInfoWrapper 'DataConnector -> BackendInfo 'DataConnector
forall (b :: BackendType). BackendInfoWrapper b -> BackendInfo b
unBackendInfoWrapper)
(Maybe (BackendInfoWrapper 'DataConnector)
-> HashSet DataConnectorName)
-> Maybe (BackendInfoWrapper 'DataConnector)
-> HashSet DataConnectorName
forall a b. (a -> b) -> a -> b
$ forall (b :: BackendType) (i :: BackendType -> *).
HasTag b =>
BackendMap i -> Maybe (i b)
BackendMap.lookup @'DataConnector
(BackendMap BackendInfoWrapper
-> Maybe (BackendInfoWrapper 'DataConnector))
-> BackendMap BackendInfoWrapper
-> Maybe (BackendInfoWrapper 'DataConnector)
forall a b. (a -> b) -> a -> b
$ SchemaCache -> BackendMap BackendInfoWrapper
scBackendCache SchemaCache
schemaCache
}
BuildReason
-> CacheInvalidations
-> Metadata
-> Maybe MetadataResourceVersion
-> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *).
CacheRWM m =>
BuildReason
-> CacheInvalidations
-> Metadata
-> Maybe MetadataResourceVersion
-> m ()
buildSchemaCacheWithOptions BuildReason
CatalogSync CacheInvalidations
cacheInvalidations Metadata
metadata (MetadataResourceVersion -> Maybe MetadataResourceVersion
forall a. a -> Maybe a
Just MetadataResourceVersion
latestResourceVersion)
MetadataResourceVersion -> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *). CacheRWM m => MetadataResourceVersion -> m ()
setMetadataResourceVersionInSchemaCache MetadataResourceVersion
latestResourceVersion
Logger Hasura
-> SchemaSyncThreadType -> Value -> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> SchemaSyncThreadType -> Value -> m ()
logInfo Logger Hasura
logger SchemaSyncThreadType
threadType
(Value -> CacheRWT (ExceptT QErr m) ())
-> Value -> CacheRWT (ExceptT QErr m) ()
forall a b. (a -> b) -> a -> b
$ Text -> Value
String
(Text -> Value) -> Text -> Value
forall a b. (a -> b) -> a -> b
$ Text
"Schema cache updated with resource version: "
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> MetadataResourceVersion -> Text
showMetadataResourceVersion MetadataResourceVersion
latestResourceVersion
((), RebuildableSchemaCache)
-> ExceptT QErr m ((), RebuildableSchemaCache)
forall a. a -> ExceptT QErr m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (()
msg, RebuildableSchemaCache
cache)
Either QErr () -> (QErr -> m ()) -> m ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft Either QErr ()
respErr (Logger Hasura -> SchemaSyncThreadType -> ThreadError -> m ()
forall (m :: * -> *) a.
(MonadIO m, ToJSON a) =>
Logger Hasura -> SchemaSyncThreadType -> a -> m ()
logError Logger Hasura
logger SchemaSyncThreadType
threadType (ThreadError -> m ()) -> (QErr -> ThreadError) -> QErr -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. QErr -> ThreadError
TEQueryError)
-- | Emit a schema-sync log entry at 'LevelInfo'.
--
-- The given JSON payload is wrapped, together with the originating thread
-- type, in a 'SchemaSyncLog' and passed to the supplied logger.
logInfo :: (MonadIO m) => Logger Hasura -> SchemaSyncThreadType -> Value -> m ()
logInfo hasuraLogger syncThread payload = unLogger hasuraLogger entry
  where
    -- The log record: level, originating thread type, and the caller's payload as-is.
    entry = SchemaSyncLog LevelInfo syncThread payload
-- | Emit a schema-sync log entry at 'LevelError'.
--
-- The error value is JSON-encoded and placed under the @"error"@ key of a
-- JSON object, which is then wrapped, together with the originating thread
-- type, in a 'SchemaSyncLog' and passed to the supplied logger.
logError :: (MonadIO m, ToJSON a) => Logger Hasura -> SchemaSyncThreadType -> a -> m ()
logError hasuraLogger syncThread failure = unLogger hasuraLogger entry
  where
    -- '.=' encodes its right-hand side via 'ToJSON', so no explicit
    -- conversion of the error value is needed here.
    errorPayload = object ["error" .= failure]
    entry = SchemaSyncLog LevelError syncThread errorPayload