{-# LANGUAGE CPP #-}
{-# LANGUAGE TemplateHaskell #-}

module Hasura.Server.SchemaUpdate
  ( startSchemaSyncListenerThread,
    startSchemaSyncProcessorThread,
    ThreadType (..),
    ThreadError (..),
  )
where

import Control.Concurrent.Extended qualified as C
import Control.Concurrent.STM qualified as STM
import Control.Immortal qualified as Immortal
import Control.Monad.Loops qualified as L
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Trans.Managed (ManagedT)
import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.TH
import Data.HashMap.Strict qualified as HM
import Data.HashSet qualified as HS
import Database.PG.Query qualified as Q
import Hasura.Base.Error
import Hasura.Logging
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.Schema (runCacheRWT)
import Hasura.RQL.DDL.Schema.Catalog
import Hasura.RQL.Types.Numeric (NonNegative)
import Hasura.RQL.Types.Numeric qualified as Numeric
import Hasura.RQL.Types.Run
import Hasura.RQL.Types.SchemaCache
import Hasura.RQL.Types.SchemaCache.Build
import Hasura.RQL.Types.Source
import Hasura.Server.Logging
import Hasura.Server.SchemaCacheRef
  ( SchemaCacheRef,
    readSchemaCacheRef,
    withSchemaCacheUpdate,
  )
import Hasura.Server.Types
import Hasura.Session
import Network.HTTP.Client qualified as HTTP

-- | Identifies which of the two schema-sync threads a log entry or error
-- belongs to.
data ThreadType
  = TTListener
  | TTProcessor
  deriving (Eq)

-- | Hand-written so that the rendered name is the lower-case word used in log
-- output (@"listener"@ / @"processor"@) rather than the constructor name.
instance Show ThreadType where
  show TTListener = "listener"
  show TTProcessor = "processor"

-- | A log entry produced by one of the schema-sync threads.
data SchemaSyncThreadLog = SchemaSyncThreadLog
  { -- | Level the entry is logged at.
    suelLogLevel :: !LogLevel,
    -- | Thread the entry originates from.
    suelThreadType :: !ThreadType,
    -- | Arbitrary JSON payload of the entry.
    suelInfo :: !Value
  }
  deriving (Show, Eq)

-- | Renders the thread type (via its 'Show' instance) and the payload. The log
-- level is deliberately not part of the JSON body.
instance ToJSON SchemaSyncThreadLog where
  toJSON (SchemaSyncThreadLog _ threadType info) =
    object
      [ "thread_type" .= show threadType,
        "info" .= info
      ]

-- | Logged as an internal 'ILTSchemaSyncThread' entry at the level stored in
-- the record, with the record's 'ToJSON' rendering as the body.
instance ToEngineLog SchemaSyncThreadLog Hasura where
  toEngineLog threadLog =
    ( suelLogLevel threadLog,
      ELTInternal ILTSchemaSyncThread,
      toJSON threadLog
    )

-- | Errors reported by the schema-sync threads.
data ThreadError
  = -- | Carries a textual description of a payload that could not be parsed.
    TEPayloadParse !Text
  | -- | Wraps a 'QErr' raised while querying or refreshing.
    TEQueryError !QErr

-- 'ToJSON' for 'ThreadError' is derived as a tagged object: the tag goes under
-- @"type"@ and the constructor's contents under @"info"@. The tag is the
-- constructor name with its first two characters (the @TE@ prefix) dropped and
-- the remainder snake-cased.
$( deriveToJSON
     defaultOptions
       { constructorTagModifier = snakeCase . drop 2,
         sumEncoding = TaggedObject "type" "info"
       }
     ''ThreadError
 )

-- | Emit a startup log entry announcing that a schema-sync thread is running.
--
-- The entry is a 'StartupLog' at 'LevelInfo' tagged @"schema-sync"@. Its
-- payload carries the instance id, the thread id of the given immortal thread
-- and a message of the form @"<thread type> thread started"@.
logThreadStarted ::
  (MonadIO m) =>
  Logger Hasura ->
  InstanceId ->
  ThreadType ->
  Immortal.Thread ->
  m ()
logThreadStarted logger instanceId threadType thread =
  unLogger logger $ StartupLog LevelInfo "schema-sync" payload
  where
    msg :: Text
    msg = tshow threadType <> " thread started"

    payload :: Value
    payload =
      object
        [ "instance_id" .= getInstanceId instanceId,
          "thread_id" .= show (Immortal.threadId thread),
          "message" .= msg
        ]

{- Note [Schema Cache Sync]
~~~~~~~~~~~~~~~~~~~~~~~~~~~

When multiple graphql-engine instances are served from the same metadata storage,
each instance should keep its schema cache in sync with the latest metadata. The
instances therefore need a way to learn when any request has modified the metadata.

We track the metadata schema version in postgres and poll for this
value in a thread.  When the schema version has changed, the instance
will update its local metadata schema and remove any invalidated schema cache data.

The following steps take place when an API request made to update metadata:

1. After handling the request we insert the new metadata schema json
   into a postgres table along with a schema version.

2. On start up, before initialising schema cache, an async thread is
   invoked to continuously poll the Postgres notifications table for
   the latest metadata schema version. The schema version is pushed to
   a shared `TMVar`.

3. Before starting API server, another async thread is invoked to
   process events pushed by the listener thread via the `TMVar`. If
   the instance's schema version is not current with the freshly
   updated TMVar version then we update the local metadata.

Why do we need two threads if we could capture and reload the schema cache in a single thread?

If we implemented schema sync in a single async thread, we would have to start it
after initialising the schema cache. We might then lose events that are published after schema cache
init and before the thread is started, leaving the schema cache out of sync with the metadata.
So we use two threads: one starts listening before schema cache init and the
other starts after it.

What happens if the listen connection to Postgres is lost?

The listener thread will keep trying to re-establish the connection to Postgres every second.
Once the connection is established, it pushes an @'SSEListenStart' event with the time. We can't be sure
whether any metadata-modifying requests were made in the meantime, so we reload the schema cache unconditionally
if listening started after the schema cache init start time.

-}

-- | Fork the managed thread that runs 'listener' (which repeatedly fetches the
-- metadata resource version and publishes it on the given 'STM.TMVar'), log
-- that it has started, and return its handle.
-- See Note [Schema Cache Sync]
startSchemaSyncListenerThread ::
  C.ForkableMonadIO m =>
  Logger Hasura ->
  Q.PGPool ->
  InstanceId ->
  NonNegative Milliseconds ->
  STM.TMVar MetadataResourceVersion ->
  ManagedT m (Immortal.Thread)
startSchemaSyncListenerThread logger pool instanceId interval metaVersionRef = do
  -- Start listener thread; the wait between iterations is the unwrapped interval.
  listenerThread <-
    C.forkManagedT "SchemeUpdate.listener" logger $
      listener logger pool metaVersionRef (Numeric.getNonNegative interval)
  logThreadStarted logger instanceId TTListener listenerThread
  pure listenerThread

-- | Fork the managed thread that runs 'processor' (which consumes the
-- versions published on the given 'STM.TMVar'), log that it has started, and
-- return its handle.
-- See Note [Schema Cache Sync]
startSchemaSyncProcessorThread ::
  ( C.ForkableMonadIO m,
    MonadMetadataStorage (MetadataStorageT m),
    MonadResolveSource m
  ) =>
  Logger Hasura ->
  HTTP.Manager ->
  STM.TMVar MetadataResourceVersion ->
  SchemaCacheRef ->
  InstanceId ->
  ServerConfigCtx ->
  STM.TVar Bool ->
  ManagedT m Immortal.Thread
startSchemaSyncProcessorThread
  logger
  httpMgr
  schemaSyncEventRef
  cacheRef
  instanceId
  serverConfigCtx
  logTVar = do
    -- Start processor thread
    processorThread <-
      C.forkManagedT "SchemeUpdate.processor" logger $
        processor
          logger
          httpMgr
          schemaSyncEventRef
          cacheRef
          instanceId
          serverConfigCtx
          logTVar
    logThreadStarted logger instanceId TTProcessor processorThread
    pure processorThread

-- | Store a value in a 'STM.TMVar', discarding whatever it currently holds.
--
-- The take and the put run in one STM transaction, so the put always finds the
-- variable empty and other threads never observe the intermediate empty state.
--
-- TODO: This is also defined in multitenant, consider putting it in a library somewhere
forcePut :: STM.TMVar a -> a -> IO ()
forcePut v a = STM.atomically $ do
  _ <- STM.tryTakeTMVar v
  STM.putTMVar v a

-- | Fetch the metadata resource version from the catalog and publish it.
--
-- The fetch runs as a 'Q.RepeatableRead' transaction on the given pool. On
-- success the version is written to the 'STM.TMVar' with 'forcePut' (replacing
-- any value already there) and @Right ()@ is returned. On failure the
-- 'STM.TMVar' is left untouched and the error is returned as @Left@.
schemaVersionCheckHandler ::
  Q.PGPool -> STM.TMVar MetadataResourceVersion -> IO (Either QErr ())
schemaVersionCheckHandler pool metaVersionRef = do
  fetched <-
    runExceptT $
      Q.runTx pool (Q.RepeatableRead, Nothing) fetchMetadataResourceVersionFromCatalog
  -- 'traverse' over 'Either' runs the publish step only for a 'Right' and
  -- passes a 'Left' through unchanged.
  traverse (forcePut metaVersionRef) fetched

-- | What the 'listener' loop remembers about the most recent failure it logged.
data ErrorState = ErrorState
  { -- | The last error that was logged, if any.
    _esLastErrorSeen :: !(Maybe QErr),
    -- | The metadata resource version recorded alongside that error, if any.
    _esLastMetadataVersion :: !(Maybe MetadataResourceVersion)
  }
  deriving (Eq)

-- NOTE: The ErrorState type is to be used mainly for the `listener` method below.
--       This will help prevent logging the same error with the same MetadataResourceVersion
--       multiple times consecutively. When the `listener` is in ErrorState we don't log the
--       next error until the resource version has changed/updated.

-- | The state with neither an error nor a resource version recorded.
defaultErrorState :: ErrorState
defaultErrorState =
  ErrorState
    { _esLastErrorSeen = Nothing,
      _esLastMetadataVersion = Nothing
    }

-- | Record the given error and resource version as the most recent ones seen,
-- overwriting whatever the state held before.
--
-- NOTE: this can be updated to use lenses
updateErrorInState :: ErrorState -> QErr -> MetadataResourceVersion -> ErrorState
updateErrorInState es qerr mrv =
  es
    { _esLastErrorSeen = Just qerr,
      _esLastMetadataVersion = Just mrv
    }

-- | 'True' when both an error and a resource version have been recorded.
isInErrorState :: ErrorState -> Bool
isInErrorState es =
  isJust (_esLastErrorSeen es) && isJust (_esLastMetadataVersion es)

-- | Decide whether a failure observed by the 'listener' should be logged.
--
-- Returns 'True' only when the error differs from the last recorded error
-- /and/ the resource version differs from the last recorded version. A field
-- that has not been recorded yet ('Nothing') counts as different. In other
-- words, logging is suppressed if either the error or the version repeats.
--
-- NOTE(review): the NOTE on 'ErrorState' speaks of suppressing "the same error
-- with the same MetadataResourceVersion", which reads like the pair should
-- have to repeat. The code is stricter (either repeating suppresses); confirm
-- which is intended before changing it.
toLogError :: ErrorState -> QErr -> MetadataResourceVersion -> Bool
toLogError es qerr mrv =
  _esLastErrorSeen es /= Just qerr && _esLastMetadataVersion es /= Just mrv

-- | Poll for the metadata resource version, in a loop forever.
--
-- Each iteration asks 'schemaVersionCheckHandler' to fetch the current version
-- and publish it on @metaVersionRef@, then sleeps for @interval@.
--
-- The 'ErrorState' threaded through the loop is consulted via 'toLogError' so
-- that a failing fetch is not logged on every iteration. A successful fetch
-- resets the state, logging a "SchemaSync Restored..." message first if the
-- loop was in an error state.
listener ::
  MonadIO m =>
  Logger Hasura ->
  Q.PGPool ->
  STM.TMVar MetadataResourceVersion ->
  Milliseconds ->
  m void
listener logger pool metaVersionRef interval = L.iterateM_ listenerLoop defaultErrorState
  where
    listenerLoop errorState = do
      -- Peek at the version currently waiting for the processor. It is only
      -- used for error reporting below, so it is read without being removed:
      -- taking it here would withhold it from the processor whenever the fetch
      -- fails, because nothing is put back on the failure path.
      pendingVersion <- liftIO $ STM.atomically $ STM.tryReadTMVar metaVersionRef
      fetchResult <- liftIO $ schemaVersionCheckHandler pool metaVersionRef
      let metadataVersion = fromMaybe initialResourceVersion pendingVersion
      nextState <- case fetchResult of
        Left respErr
          | toLogError errorState respErr metadataVersion -> do
              logError logger TTListener $ TEQueryError respErr
              logInfo logger TTListener $ object ["metadataResourceVersion" .= toJSON metadataVersion]
              pure $ updateErrorInState errorState respErr metadataVersion
          -- Already reported: keep the state as it is and stay quiet.
          | otherwise -> pure errorState
        Right () -> do
          when (isInErrorState errorState) $
            logInfo logger TTListener $ object ["message" .= ("SchemaSync Restored..." :: Text)]
          pure defaultErrorState
      liftIO $ C.sleep $ milliseconds interval
      pure nextState

-- | Consume metadata resource versions from the shared 'STM.TMVar', in a loop forever.
--
-- Each iteration blocks until a version is available, takes it, and runs
-- 'refreshSchemaCache' for it under 'runMetadataStorageT'. If that yields an
-- error, it is logged as a 'TEQueryError' for the processor thread and the
-- loop carries on with the next version.
processor ::
  forall m void.
  ( C.ForkableMonadIO m,
    MonadMetadataStorage (MetadataStorageT m),
    MonadResolveSource m
  ) =>
  Logger Hasura ->
  HTTP.Manager ->
  STM.TMVar MetadataResourceVersion ->
  SchemaCacheRef ->
  InstanceId ->
  ServerConfigCtx ->
  STM.TVar Bool ->
  m void
processor
  logger
  httpMgr
  metaVersionRef
  cacheRef
  instanceId
  serverConfigCtx
  logTVar = forever $ do
    metaVersion <- liftIO $ STM.atomically $ STM.takeTMVar metaVersionRef
    refreshResult <-
      runMetadataStorageT $
        refreshSchemaCache
          metaVersion
          instanceId
          logger
          httpMgr
          cacheRef
          TTProcessor
          serverConfigCtx
          logTVar
    onLeft refreshResult (logError logger TTProcessor . TEQueryError)

refreshSchemaCache ::
  ( MonadIO m,
    MonadBaseControl IO m,
    MonadMetadataStorage m,
    MonadResolveSource m
  ) =>
  MetadataResourceVersion ->
  InstanceId ->
  Logger Hasura ->
  HTTP.Manager ->
  SchemaCacheRef ->
  ThreadType ->
  ServerConfigCtx ->
  STM.TVar Bool ->
  m ()
refreshSchemaCache :: MetadataResourceVersion
-> InstanceId
-> Logger Hasura
-> Manager
-> SchemaCacheRef
-> ThreadType
-> ServerConfigCtx
-> TVar Bool
-> m ()
refreshSchemaCache
  MetadataResourceVersion
resourceVersion
  InstanceId
instanceId
  Logger Hasura
logger
  Manager
httpManager
  SchemaCacheRef
cacheRef
  ThreadType
threadType
  ServerConfigCtx
serverConfigCtx
  TVar Bool
logTVar = do
    Either QErr ()
respErr <- ExceptT QErr m () -> m (Either QErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT QErr m () -> m (Either QErr ()))
-> ExceptT QErr m () -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$
      SchemaCacheRef
-> Logger Hasura
-> Maybe (TVar Bool)
-> ExceptT QErr m ((), RebuildableSchemaCache)
-> ExceptT QErr m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
SchemaCacheRef
-> Logger Hasura
-> Maybe (TVar Bool)
-> m (a, RebuildableSchemaCache)
-> m a
withSchemaCacheUpdate SchemaCacheRef
cacheRef Logger Hasura
logger (TVar Bool -> Maybe (TVar Bool)
forall a. a -> Maybe a
Just TVar Bool
logTVar) (ExceptT QErr m ((), RebuildableSchemaCache) -> ExceptT QErr m ())
-> ExceptT QErr m ((), RebuildableSchemaCache) -> ExceptT QErr m ()
forall a b. (a -> b) -> a -> b
$ do
        RebuildableSchemaCache
rebuildableCache <- IO RebuildableSchemaCache -> ExceptT QErr m RebuildableSchemaCache
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO RebuildableSchemaCache
 -> ExceptT QErr m RebuildableSchemaCache)
-> IO RebuildableSchemaCache
-> ExceptT QErr m RebuildableSchemaCache
forall a b. (a -> b) -> a -> b
$ (RebuildableSchemaCache, SchemaCacheVer) -> RebuildableSchemaCache
forall a b. (a, b) -> a
fst ((RebuildableSchemaCache, SchemaCacheVer)
 -> RebuildableSchemaCache)
-> IO (RebuildableSchemaCache, SchemaCacheVer)
-> IO RebuildableSchemaCache
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SchemaCacheRef -> IO (RebuildableSchemaCache, SchemaCacheVer)
readSchemaCacheRef SchemaCacheRef
cacheRef
        (()
msg, RebuildableSchemaCache
cache, CacheInvalidations
_) <- RunCtx
-> RunT m ((), RebuildableSchemaCache, CacheInvalidations)
-> ExceptT QErr m ((), RebuildableSchemaCache, CacheInvalidations)
forall (m :: * -> *) a. RunCtx -> RunT m a -> ExceptT QErr m a
peelRun RunCtx
runCtx (RunT m ((), RebuildableSchemaCache, CacheInvalidations)
 -> ExceptT QErr m ((), RebuildableSchemaCache, CacheInvalidations))
-> RunT m ((), RebuildableSchemaCache, CacheInvalidations)
-> ExceptT QErr m ((), RebuildableSchemaCache, CacheInvalidations)
forall a b. (a -> b) -> a -> b
$
          RebuildableSchemaCache
-> CacheRWT (RunT m) ()
-> RunT m ((), RebuildableSchemaCache, CacheInvalidations)
forall (m :: * -> *) a.
Functor m =>
RebuildableSchemaCache
-> CacheRWT m a
-> m (a, RebuildableSchemaCache, CacheInvalidations)
runCacheRWT RebuildableSchemaCache
rebuildableCache (CacheRWT (RunT m) ()
 -> RunT m ((), RebuildableSchemaCache, CacheInvalidations))
-> CacheRWT (RunT m) ()
-> RunT m ((), RebuildableSchemaCache, CacheInvalidations)
forall a b. (a -> b) -> a -> b
$ do
            SchemaCache
schemaCache <- CacheRWT (RunT m) SchemaCache
forall (m :: * -> *). CacheRM m => m SchemaCache
askSchemaCache
            case SchemaCache -> Maybe MetadataResourceVersion
scMetadataResourceVersion SchemaCache
schemaCache of
              -- While starting up, the metadata resource version is set to nothing, so we want to set the version
              -- without fetching the database metadata (as we have already fetched it during the startup, so, we
              -- skip fetching it twice)
              Maybe MetadataResourceVersion
Nothing -> MetadataResourceVersion -> CacheRWT (RunT m) ()
forall (m :: * -> *). CacheRWM m => MetadataResourceVersion -> m ()
setMetadataResourceVersionInSchemaCache MetadataResourceVersion
resourceVersion
              Just MetadataResourceVersion
engineResourceVersion ->
                Bool -> CacheRWT (RunT m) () -> CacheRWT (RunT m) ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (MetadataResourceVersion
engineResourceVersion MetadataResourceVersion -> MetadataResourceVersion -> Bool
forall a. Eq a => a -> a -> Bool
== MetadataResourceVersion
resourceVersion) (CacheRWT (RunT m) () -> CacheRWT (RunT m) ())
-> CacheRWT (RunT m) () -> CacheRWT (RunT m) ()
forall a b. (a -> b) -> a -> b
$ do
                  (Metadata
metadata, MetadataResourceVersion
latestResourceVersion) <- CacheRWT (RunT m) (Metadata, MetadataResourceVersion)
forall (m :: * -> *).
MonadMetadataStorage m =>
m (Metadata, MetadataResourceVersion)
fetchMetadata
                  [(MetadataResourceVersion, CacheInvalidations)]
notifications <- MetadataResourceVersion
-> InstanceId
-> CacheRWT
     (RunT m) [(MetadataResourceVersion, CacheInvalidations)]
forall (m :: * -> *).
MonadMetadataStorage m =>
MetadataResourceVersion
-> InstanceId -> m [(MetadataResourceVersion, CacheInvalidations)]
fetchMetadataNotifications MetadataResourceVersion
engineResourceVersion InstanceId
instanceId

                  Logger Hasura -> ThreadType -> String -> CacheRWT (RunT m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> ThreadType -> String -> m ()
logDebug Logger Hasura
logger ThreadType
threadType (String -> CacheRWT (RunT m) ()) -> String -> CacheRWT (RunT m) ()
forall a b. (a -> b) -> a -> b
$ String
"DEBUG: refreshSchemaCache Called: engineResourceVersion: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> MetadataResourceVersion -> String
forall a. Show a => a -> String
show MetadataResourceVersion
engineResourceVersion String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
", fresh resource version: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> MetadataResourceVersion -> String
forall a. Show a => a -> String
show MetadataResourceVersion
latestResourceVersion
                  case [(MetadataResourceVersion, CacheInvalidations)]
notifications of
                    [] -> do
                      Logger Hasura -> ThreadType -> Value -> CacheRWT (RunT m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> ThreadType -> Value -> m ()
logInfo Logger Hasura
logger ThreadType
threadType (Value -> CacheRWT (RunT m) ()) -> Value -> CacheRWT (RunT m) ()
forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [Key
"message" Key -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= (Text
"Schema Version changed with no notifications" :: Text)]
                      MetadataResourceVersion -> CacheRWT (RunT m) ()
forall (m :: * -> *). CacheRWM m => MetadataResourceVersion -> m ()
setMetadataResourceVersionInSchemaCache MetadataResourceVersion
latestResourceVersion
                    [(MetadataResourceVersion, CacheInvalidations)]
_ -> do
                      let cacheInvalidations :: CacheInvalidations
cacheInvalidations =
                            if ((MetadataResourceVersion, CacheInvalidations) -> Bool)
-> [(MetadataResourceVersion, CacheInvalidations)] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any ((MetadataResourceVersion -> MetadataResourceVersion -> Bool
forall a. Eq a => a -> a -> Bool
== (MetadataResourceVersion
engineResourceVersion MetadataResourceVersion
-> MetadataResourceVersion -> MetadataResourceVersion
forall a. Num a => a -> a -> a
+ MetadataResourceVersion
1)) (MetadataResourceVersion -> Bool)
-> ((MetadataResourceVersion, CacheInvalidations)
    -> MetadataResourceVersion)
-> (MetadataResourceVersion, CacheInvalidations)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (MetadataResourceVersion, CacheInvalidations)
-> MetadataResourceVersion
forall a b. (a, b) -> a
fst) [(MetadataResourceVersion, CacheInvalidations)]
notifications
                              then -- If (engineResourceVersion + 1) is in the list of notifications then
                              -- we know that we haven't missed any.
                                [CacheInvalidations] -> CacheInvalidations
forall a. Monoid a => [a] -> a
mconcat ([CacheInvalidations] -> CacheInvalidations)
-> [CacheInvalidations] -> CacheInvalidations
forall a b. (a -> b) -> a -> b
$ (MetadataResourceVersion, CacheInvalidations) -> CacheInvalidations
forall a b. (a, b) -> b
snd ((MetadataResourceVersion, CacheInvalidations)
 -> CacheInvalidations)
-> [(MetadataResourceVersion, CacheInvalidations)]
-> [CacheInvalidations]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(MetadataResourceVersion, CacheInvalidations)]
notifications
                              else -- Otherwise we may have missed some notifications so we need to invalidate the
                              -- whole cache.

                                CacheInvalidations :: Bool
-> HashSet RemoteSchemaName
-> HashSet SourceName
-> CacheInvalidations
CacheInvalidations
                                  { ciMetadata :: Bool
ciMetadata = Bool
True,
                                    ciRemoteSchemas :: HashSet RemoteSchemaName
ciRemoteSchemas = [RemoteSchemaName] -> HashSet RemoteSchemaName
forall a. (Eq a, Hashable a) => [a] -> HashSet a
HS.fromList ([RemoteSchemaName] -> HashSet RemoteSchemaName)
-> [RemoteSchemaName] -> HashSet RemoteSchemaName
forall a b. (a -> b) -> a -> b
$ SchemaCache -> [RemoteSchemaName]
getAllRemoteSchemas SchemaCache
schemaCache,
                                    ciSources :: HashSet SourceName
ciSources = [SourceName] -> HashSet SourceName
forall a. (Eq a, Hashable a) => [a] -> HashSet a
HS.fromList ([SourceName] -> HashSet SourceName)
-> [SourceName] -> HashSet SourceName
forall a b. (a -> b) -> a -> b
$ HashMap SourceName BackendSourceInfo -> [SourceName]
forall k v. HashMap k v -> [k]
HM.keys (HashMap SourceName BackendSourceInfo -> [SourceName])
-> HashMap SourceName BackendSourceInfo -> [SourceName]
forall a b. (a -> b) -> a -> b
$ SchemaCache -> HashMap SourceName BackendSourceInfo
scSources SchemaCache
schemaCache
                                  }
                      Logger Hasura -> ThreadType -> Value -> CacheRWT (RunT m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> ThreadType -> Value -> m ()
logInfo Logger Hasura
logger ThreadType
threadType (Value -> CacheRWT (RunT m) ()) -> Value -> CacheRWT (RunT m) ()
forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [Key
"currentVersion" Key -> MetadataResourceVersion -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= MetadataResourceVersion
engineResourceVersion, Key
"latestResourceVersion" Key -> MetadataResourceVersion -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= MetadataResourceVersion
latestResourceVersion]
                      BuildReason
-> CacheInvalidations -> Metadata -> CacheRWT (RunT m) ()
forall (m :: * -> *).
CacheRWM m =>
BuildReason -> CacheInvalidations -> Metadata -> m ()
buildSchemaCacheWithOptions BuildReason
CatalogSync CacheInvalidations
cacheInvalidations Metadata
metadata
                      MetadataResourceVersion -> CacheRWT (RunT m) ()
forall (m :: * -> *). CacheRWM m => MetadataResourceVersion -> m ()
setMetadataResourceVersionInSchemaCache MetadataResourceVersion
latestResourceVersion
                      Logger Hasura -> ThreadType -> Value -> CacheRWT (RunT m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> ThreadType -> Value -> m ()
logInfo Logger Hasura
logger ThreadType
threadType (Value -> CacheRWT (RunT m) ()) -> Value -> CacheRWT (RunT m) ()
forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [Key
"message" Key -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= (Text
"Schema Version changed with notifications" :: Text)]
        ((), RebuildableSchemaCache)
-> ExceptT QErr m ((), RebuildableSchemaCache)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (()
msg, RebuildableSchemaCache
cache)
    Either QErr () -> (QErr -> m ()) -> m ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft Either QErr ()
respErr (Logger Hasura -> ThreadType -> ThreadError -> m ()
forall (m :: * -> *) a.
(MonadIO m, ToJSON a) =>
Logger Hasura -> ThreadType -> a -> m ()
logError Logger Hasura
logger ThreadType
threadType (ThreadError -> m ()) -> (QErr -> ThreadError) -> QErr -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. QErr -> ThreadError
TEQueryError)
    where
      runCtx :: RunCtx
runCtx = UserInfo -> Manager -> ServerConfigCtx -> RunCtx
RunCtx UserInfo
adminUserInfo Manager
httpManager ServerConfigCtx
serverConfigCtx

-- | Emit an info-level schema-sync log entry.
--
-- The given JSON 'Value' is wrapped, together with the originating
-- 'ThreadType', in a 'SchemaSyncThreadLog' tagged 'LevelInfo' and handed to
-- the supplied logger.
logInfo :: (MonadIO m) => Logger Hasura -> ThreadType -> Value -> m ()
logInfo logger threadType val =
  unLogger logger $
    SchemaSyncThreadLog LevelInfo threadType val

-- | Emit an error-level schema-sync log entry.
--
-- The error value is serialised through its 'ToJSON' instance and placed
-- under the @"error"@ key of a JSON object, which is then wrapped in a
-- 'SchemaSyncThreadLog' tagged 'LevelError' for the given 'ThreadType'.
logError :: (MonadIO m, ToJSON a) => Logger Hasura -> ThreadType -> a -> m ()
logError logger threadType err =
  unLogger logger $
    SchemaSyncThreadLog LevelError threadType $
      -- '.=' already encodes its right-hand side via 'ToJSON', so no explicit
      -- 'toJSON' call is needed here.
      object ["error" .= err]

-- | Emit a debug-level schema-sync log entry.
--
-- The message is placed under the @"message"@ key of a JSON object, which is
-- then wrapped in a 'SchemaSyncThreadLog' tagged 'LevelDebug' for the given
-- 'ThreadType' and handed to the supplied logger.
logDebug :: (MonadIO m) => Logger Hasura -> ThreadType -> String -> m ()
logDebug logger threadType msg =
  unLogger logger $
    SchemaSyncThreadLog LevelDebug threadType $
      object ["message" .= msg]