module Hasura.Server.SchemaUpdate
  ( startSchemaSyncListenerThread,
    startSchemaSyncProcessorThread,
    SchemaSyncThreadType (..),
  )
where

import Control.Concurrent.Extended qualified as C
import Control.Concurrent.STM qualified as STM
import Control.Immortal qualified as Immortal
import Control.Monad.Loops qualified as L
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Trans.Managed (ManagedT)
import Data.Aeson
import Data.Aeson.Casing
import Data.HashMap.Strict qualified as HashMap
import Data.HashSet qualified as HS
import Data.Text qualified as T
import Database.PG.Query qualified as PG
import Hasura.App.State
import Hasura.Base.Error
import Hasura.Logging
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.Schema (runCacheRWT)
import Hasura.RQL.DDL.Schema.Cache.Config
import Hasura.RQL.DDL.Schema.Catalog
import Hasura.RQL.Types.BackendType (BackendType (..))
import Hasura.RQL.Types.SchemaCache
import Hasura.RQL.Types.SchemaCache.Build
import Hasura.RQL.Types.Source
import Hasura.SQL.BackendMap qualified as BackendMap
import Hasura.Server.AppStateRef
  ( AppStateRef,
    getAppContext,
    getRebuildableSchemaCacheWithVersion,
    withSchemaCacheUpdate,
  )
import Hasura.Server.Logging
import Hasura.Server.Types
import Hasura.Services
import Refined (NonNegative, Refined, unrefine)

-- | Errors that a schema-sync thread reports through the logger.
data ThreadError
  = -- | Carries a textual message. NOTE(review): judging by the name this is a
    -- payload-parse failure; it is not constructed in the code visible here.
    TEPayloadParse !Text
  | -- | A 'QErr' returned while querying the metadata database.
    TEQueryError !QErr
  deriving (Generic)

-- | Aeson options shared by 'toJSON' and 'toEncoding' for 'ThreadError', so the
-- two encoders cannot drift apart.
--
-- The constructor tag has its first two characters dropped and is then
-- snake-cased; the value is encoded as an object with the tag under @"type"@
-- and the contents under @"info"@.
threadErrorJSONOptions :: Options
threadErrorJSONOptions =
  defaultOptions
    { constructorTagModifier = snakeCase . drop 2,
      sumEncoding = TaggedObject "type" "info"
    }

instance ToJSON ThreadError where
  toJSON = genericToJSON threadErrorJSONOptions
  toEncoding = genericToEncoding threadErrorJSONOptions

-- | Emit an info-level @schema-sync@ startup log recording that the given
-- schema-sync thread has been started, together with the instance id and the
-- thread id.
logThreadStarted ::
  (MonadIO m) =>
  Logger Hasura ->
  InstanceId ->
  SchemaSyncThreadType ->
  Immortal.Thread ->
  m ()
logThreadStarted logger instanceId threadType thread =
  unLogger logger (StartupLog LevelInfo "schema-sync" payload)
  where
    payload :: Value
    payload =
      object
        [ "instance_id" .= getInstanceId instanceId,
          "thread_id" .= show (Immortal.threadId thread),
          "message" .= (tshow threadType <> " thread started")
        ]

{- Note [Schema Cache Sync]
~~~~~~~~~~~~~~~~~~~~~~~~~~~

When multiple graphql-engine instances are serving from the same metadata storage,
each instance should keep its schema cache in sync with the latest metadata. Somehow
all instances must communicate with each other whenever a request has modified metadata.

We track the metadata schema version in postgres and poll for this
value in a thread.  When the schema version has changed, the instance
will update its local metadata schema and remove any invalidated schema cache data.

The following steps take place when an API request made to update metadata:

1. After handling the request we insert the new metadata schema json
   into a postgres table along with a schema version.

2. On start up, before initialising schema cache, an async thread is
   invoked to continuously poll the Postgres notifications table for
   the latest metadata schema version. The schema version is pushed to
   a shared `TMVar`.

3. Before starting API server, another async thread is invoked to
   process events pushed by the listener thread via the `TMVar`. If
   the instance's schema version is not current with the freshly
   updated TMVar version then we update the local metadata.

Why do we need two threads when we could capture and reload the schema cache in a single thread?

If we want to implement schema sync in a single async thread we have to invoke it
after initialising the schema cache. We may lose events that are published after schema cache
init and before invoking the thread. In such a case, the schema cache is not in sync with metadata.
So we choose two threads, one of which starts listening before schema cache init and the
other after it.

What happens if listen connection to Postgres is lost?

The listener thread will keep trying to establish a connection to Postgres every second.
Once the connection is established, it pushes an @'SSEListenStart' event with the time. We aren't sure
about any metadata modify requests made in the meanwhile. So we reload the schema cache unconditionally
if listening started after the schema cache init start time.

-}

-- | Fork the managed schema-sync listener thread and log that it has started.
--
-- The forked thread runs 'listener', which polls the metadata database through
-- the given pool at the given interval and publishes the fetched resource
-- version to the shared 'STM.TMVar'.
-- See Note [Schema Cache Sync]
startSchemaSyncListenerThread ::
  (C.ForkableMonadIO m) =>
  Logger Hasura ->
  PG.PGPool ->
  InstanceId ->
  Refined NonNegative Milliseconds ->
  STM.TMVar MetadataResourceVersion ->
  ManagedT m Immortal.Thread
startSchemaSyncListenerThread logger pool instanceId interval metaVersionRef = do
  let pollEvery = unrefine interval
  -- Start listener thread
  listenerThread <-
    C.forkManagedT
      "SchemeUpdate.listener"
      logger
      (listener logger pool metaVersionRef pollEvery)
  -- Log the start-up, then hand the thread handle back to the caller.
  listenerThread <$ logThreadStarted logger instanceId TTListener listenerThread

-- | Fork the managed schema-sync processor thread and log that it has started.
--
-- The logger, instance id and shared metadata-version 'STM.TMVar' are read from
-- the 'AppEnv'; the forked thread runs 'processor' against that 'STM.TMVar'.
-- See Note [Schema Cache Sync]
startSchemaSyncProcessorThread ::
  ( C.ForkableMonadIO m,
    HasAppEnv m,
    HasCacheStaticConfig m,
    MonadMetadataStorage m,
    MonadResolveSource m,
    ProvidesNetwork m
  ) =>
  AppStateRef impl ->
  STM.TVar Bool ->
  ManagedT m Immortal.Thread
startSchemaSyncProcessorThread appStateRef logTVar = do
  -- Only these three fields of the environment are needed here.
  AppEnv
    { appEnvLoggers = loggers,
      appEnvInstanceId = instanceId,
      appEnvMetadataVersionRef = metaVersionRef
    } <-
    lift askAppEnv
  let logger = _lsLogger loggers
  -- Start processor thread
  processorThread <-
    C.forkManagedT
      "SchemeUpdate.processor"
      logger
      (processor metaVersionRef appStateRef logTVar)
  -- Log the start-up, then hand the thread handle back to the caller.
  processorThread <$ logThreadStarted logger instanceId TTProcessor processorThread

-- TODO: This is also defined in multitenant, consider putting it in a library somewhere

-- | Store a value in the 'STM.TMVar' without blocking: any value already held
-- is discarded and replaced, all within a single STM transaction.
forcePut :: STM.TMVar a -> a -> IO ()
forcePut var newValue =
  STM.atomically $ do
    -- Empty the variable first (a no-op when it is already empty) so that the
    -- following put cannot block.
    _ <- STM.tryTakeTMVar var
    STM.putTMVar var newValue

-- | Fetch the metadata resource version from the catalog in a
-- repeatable-read transaction. On success the version is stored in the given
-- 'STM.TMVar' (replacing any value already there); on failure the 'QErr' is
-- returned and the 'STM.TMVar' is left untouched.
schemaVersionCheckHandler ::
  PG.PGPool -> STM.TMVar MetadataResourceVersion -> IO (Either QErr ())
schemaVersionCheckHandler pool metaVersionRef = do
  fetched <-
    runExceptT
      $ PG.runTx pool (PG.RepeatableRead, Nothing) fetchMetadataResourceVersionFromCatalog
  -- 'traverse' over 'Either' runs the action only for 'Right' and passes a
  -- 'Left' through unchanged.
  traverse (forcePut metaVersionRef) fetched

-- | What the 'listener' loop remembers about the most recent error it logged.
data ErrorState = ErrorState
  { -- | The last error that was logged, if any.
    _esLastErrorSeen :: !(Maybe QErr),
    -- | The metadata resource version associated with that error, if any.
    _esLastMetadataVersion :: !(Maybe MetadataResourceVersion)
  }
  deriving (Eq)

-- NOTE: The ErrorState type is to be used mainly for the `listener` method below.
--       This will help prevent logging the same error with the same MetadataResourceVersion
--       multiple times consecutively. When the `listener` is in ErrorState we don't log the
--       next error until the resource version has changed/updated.

-- | The state with nothing recorded: no error seen and no version remembered.
defaultErrorState :: ErrorState
defaultErrorState =
  ErrorState
    { _esLastErrorSeen = Nothing,
      _esLastMetadataVersion = Nothing
    }

-- | Record the given error and metadata resource version as the last ones seen.
--
-- NOTE: this can be updated to use lenses
updateErrorInState :: ErrorState -> QErr -> MetadataResourceVersion -> ErrorState
updateErrorInState previous err version =
  previous
    { _esLastMetadataVersion = Just version,
      _esLastErrorSeen = Just err
    }

-- | 'True' exactly when both a last error and a last metadata version are
-- recorded in the state.
isInErrorState :: ErrorState -> Bool
isInErrorState es =
  case (_esLastErrorSeen es, _esLastMetadataVersion es) of
    (Just _, Just _) -> True
    _ -> False

-- | Decide whether an error should be logged, given what was logged last.
--
-- Returns 'True' only when the error differs from the last recorded error
-- /and/ the version differs from the last recorded version. A field that has
-- nothing recorded counts as "different".
--
-- NOTE(review): this suppresses more than "the same error with the same
-- version" (the wording of the NOTE on 'ErrorState'): a new error at an
-- unchanged version, or a repeated error at a new version, is also not logged.
-- Confirm this is the intended policy.
toLogError :: ErrorState -> QErr -> MetadataResourceVersion -> Bool
toLogError es qerr mrv = errorIsNew && versionIsNew
  where
    errorIsNew :: Bool
    errorIsNew = _esLastErrorSeen es /= Just qerr

    versionIsNew :: Bool
    versionIsNew = _esLastMetadataVersion es /= Just mrv

-- | Poll the metadata database for the current resource version, forever.
--
-- Each iteration takes whatever version is currently in the 'STM.TMVar', runs
-- 'schemaVersionCheckHandler' (which stores the freshly fetched version in the
-- 'STM.TMVar' on success), logs as needed, and then sleeps for the given
-- interval. An 'ErrorState' is threaded between iterations so that repeated
-- failures are not logged on every poll (see 'toLogError') and so that a
-- recovery can be reported once.
listener ::
  (MonadIO m) =>
  Logger Hasura ->
  PG.PGPool ->
  STM.TMVar MetadataResourceVersion ->
  Milliseconds ->
  m void
listener logger pool metaVersionRef interval =
  L.iterateM_ pollOnce defaultErrorState
  where
    -- One poll: returns the error state to carry into the next iteration.
    pollOnce previous = do
      lastSeen <- liftIO (STM.atomically (STM.tryTakeTMVar metaVersionRef))
      outcome <- liftIO (schemaVersionCheckHandler pool metaVersionRef)
      -- The version that was in the TMVar before this poll, falling back to
      -- the initial version when it was empty; used for logging decisions.
      let knownVersion = fromMaybe initialResourceVersion lastSeen
      next <- either (onFailure previous knownVersion) (const (onSuccess previous)) outcome
      liftIO (C.sleep (milliseconds interval))
      pure next

    -- The poll failed: log the error (and the version) unless suppressed.
    onFailure previous knownVersion err
      | toLogError previous err knownVersion = do
          logError logger TTListener (TEQueryError err)
          logInfo logger TTListener (object ["metadataResourceVersion" .= toJSON knownVersion])
          pure (updateErrorInState previous err knownVersion)
      | otherwise = pure previous

    -- The poll succeeded: report recovery if we were failing, then reset.
    onSuccess previous = do
      when (isInErrorState previous)
        $ logInfo logger TTListener (object ["message" .= ("SchemaSync Restored..." :: Text)])
      pure defaultErrorState

-- | Consume metadata resource versions from the shared 'STM.TMVar', forever.
--
-- Each iteration blocks until a version is available, takes it (leaving the
-- 'STM.TMVar' empty), and calls 'refreshSchemaCache' with it.
processor ::
  forall m void impl.
  ( C.ForkableMonadIO m,
    HasAppEnv m,
    HasCacheStaticConfig m,
    MonadMetadataStorage m,
    MonadResolveSource m,
    ProvidesNetwork m
  ) =>
  STM.TMVar MetadataResourceVersion ->
  AppStateRef impl ->
  STM.TVar Bool ->
  m void
processor metaVersionRef appStateRef logTVar =
  forever (awaitVersion >>= refresh)
  where
    -- Blocks until the listener has published a version.
    awaitVersion = liftIO (STM.atomically (STM.takeTMVar metaVersionRef))

    refresh metaVersion =
      refreshSchemaCache metaVersion appStateRef TTProcessor logTVar

refreshSchemaCache ::
  ( MonadIO m,
    MonadBaseControl IO m,
    HasAppEnv m,
    HasCacheStaticConfig m,
    MonadMetadataStorage m,
    MonadResolveSource m,
    ProvidesNetwork m
  ) =>
  MetadataResourceVersion ->
  AppStateRef impl ->
  SchemaSyncThreadType ->
  STM.TVar Bool ->
  m ()
refreshSchemaCache :: forall (m :: * -> *) impl.
(MonadIO m, MonadBaseControl IO m, HasAppEnv m,
 HasCacheStaticConfig m, MonadMetadataStorage m,
 MonadResolveSource m, ProvidesNetwork m) =>
MetadataResourceVersion
-> AppStateRef impl -> SchemaSyncThreadType -> TVar Bool -> m ()
refreshSchemaCache
  MetadataResourceVersion
resourceVersion
  AppStateRef impl
appStateRef
  SchemaSyncThreadType
threadType
  TVar Bool
logTVar = do
    AppEnv {Int
Maybe Text
Maybe PGPool
Maybe (CredentialCache AgentLicenseKey)
SamplingPolicy
HostPreference
Manager
TxIsolation
ConnParams
PGPool
Refined NonNegative Seconds
TMVar MetadataResourceVersion
ConnectionOptions
CheckFeatureFlag
ServerMetrics
EventingMode
ReadOnlyMode
MaintenanceMode ()
InstanceId
PrometheusMetrics
ShutdownLatch
LoggingSettings
LockedEventsCtx
WSConnectionInitTimeout
KeepAliveDelay
OptionalInterval
Port
SubscriptionsState
Loggers
appEnvPort :: AppEnv -> Port
appEnvHost :: AppEnv -> HostPreference
appEnvMetadataDbPool :: AppEnv -> PGPool
appEnvIntrospectionDbPool :: AppEnv -> Maybe PGPool
appEnvManager :: AppEnv -> Manager
appEnvLoggers :: AppEnv -> Loggers
appEnvMetadataVersionRef :: AppEnv -> TMVar MetadataResourceVersion
appEnvInstanceId :: AppEnv -> InstanceId
appEnvEnableMaintenanceMode :: AppEnv -> MaintenanceMode ()
appEnvLoggingSettings :: AppEnv -> LoggingSettings
appEnvEventingMode :: AppEnv -> EventingMode
appEnvEnableReadOnlyMode :: AppEnv -> ReadOnlyMode
appEnvServerMetrics :: AppEnv -> ServerMetrics
appEnvShutdownLatch :: AppEnv -> ShutdownLatch
appEnvMetaVersionRef :: AppEnv -> TMVar MetadataResourceVersion
appEnvPrometheusMetrics :: AppEnv -> PrometheusMetrics
appEnvTraceSamplingPolicy :: AppEnv -> SamplingPolicy
appEnvSubscriptionState :: AppEnv -> SubscriptionsState
appEnvLockedEventsCtx :: AppEnv -> LockedEventsCtx
appEnvConnParams :: AppEnv -> ConnParams
appEnvTxIso :: AppEnv -> TxIsolation
appEnvConsoleAssetsDir :: AppEnv -> Maybe Text
appEnvConsoleSentryDsn :: AppEnv -> Maybe Text
appEnvConnectionOptions :: AppEnv -> ConnectionOptions
appEnvWebSocketKeepAlive :: AppEnv -> KeepAliveDelay
appEnvWebSocketConnectionInitTimeout :: AppEnv -> WSConnectionInitTimeout
appEnvGracefulShutdownTimeout :: AppEnv -> Refined NonNegative Seconds
appEnvSchemaPollInterval :: AppEnv -> OptionalInterval
appEnvCheckFeatureFlag :: AppEnv -> CheckFeatureFlag
appEnvLicenseKeyCache :: AppEnv -> Maybe (CredentialCache AgentLicenseKey)
appEnvMaxTotalHeaderLength :: AppEnv -> Int
appEnvPort :: Port
appEnvHost :: HostPreference
appEnvMetadataDbPool :: PGPool
appEnvIntrospectionDbPool :: Maybe PGPool
appEnvManager :: Manager
appEnvLoggers :: Loggers
appEnvMetadataVersionRef :: TMVar MetadataResourceVersion
appEnvInstanceId :: InstanceId
appEnvEnableMaintenanceMode :: MaintenanceMode ()
appEnvLoggingSettings :: LoggingSettings
appEnvEventingMode :: EventingMode
appEnvEnableReadOnlyMode :: ReadOnlyMode
appEnvServerMetrics :: ServerMetrics
appEnvShutdownLatch :: ShutdownLatch
appEnvMetaVersionRef :: TMVar MetadataResourceVersion
appEnvPrometheusMetrics :: PrometheusMetrics
appEnvTraceSamplingPolicy :: SamplingPolicy
appEnvSubscriptionState :: SubscriptionsState
appEnvLockedEventsCtx :: LockedEventsCtx
appEnvConnParams :: ConnParams
appEnvTxIso :: TxIsolation
appEnvConsoleAssetsDir :: Maybe Text
appEnvConsoleSentryDsn :: Maybe Text
appEnvConnectionOptions :: ConnectionOptions
appEnvWebSocketKeepAlive :: KeepAliveDelay
appEnvWebSocketConnectionInitTimeout :: WSConnectionInitTimeout
appEnvGracefulShutdownTimeout :: Refined NonNegative Seconds
appEnvSchemaPollInterval :: OptionalInterval
appEnvCheckFeatureFlag :: CheckFeatureFlag
appEnvLicenseKeyCache :: Maybe (CredentialCache AgentLicenseKey)
appEnvMaxTotalHeaderLength :: Int
..} <- m AppEnv
forall (m :: * -> *). HasAppEnv m => m AppEnv
askAppEnv
    let logger :: Logger Hasura
logger = Loggers -> Logger Hasura
_lsLogger Loggers
appEnvLoggers
    Either QErr ()
respErr <- ExceptT QErr m () -> m (Either QErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT
      (ExceptT QErr m () -> m (Either QErr ()))
-> ExceptT QErr m () -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ AppStateRef impl
-> Logger Hasura
-> Maybe (TVar Bool)
-> ExceptT QErr m ((), RebuildableSchemaCache)
-> ExceptT QErr m ()
forall (m :: * -> *) impl a.
(MonadIO m, MonadBaseControl IO m, MonadError QErr m) =>
AppStateRef impl
-> Logger Hasura
-> Maybe (TVar Bool)
-> m (a, RebuildableSchemaCache)
-> m a
withSchemaCacheUpdate AppStateRef impl
appStateRef Logger Hasura
logger (TVar Bool -> Maybe (TVar Bool)
forall a. a -> Maybe a
Just TVar Bool
logTVar)
      (ExceptT QErr m ((), RebuildableSchemaCache) -> ExceptT QErr m ())
-> ExceptT QErr m ((), RebuildableSchemaCache) -> ExceptT QErr m ()
forall a b. (a -> b) -> a -> b
$ do
        RebuildableSchemaCache
rebuildableCache <- IO RebuildableSchemaCache -> ExceptT QErr m RebuildableSchemaCache
forall a. IO a -> ExceptT QErr m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO RebuildableSchemaCache
 -> ExceptT QErr m RebuildableSchemaCache)
-> IO RebuildableSchemaCache
-> ExceptT QErr m RebuildableSchemaCache
forall a b. (a -> b) -> a -> b
$ AppStateRef impl -> IO RebuildableSchemaCache
forall impl. AppStateRef impl -> IO RebuildableSchemaCache
getRebuildableSchemaCacheWithVersion AppStateRef impl
appStateRef
        AppContext
appContext <- IO AppContext -> ExceptT QErr m AppContext
forall a. IO a -> ExceptT QErr m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AppContext -> ExceptT QErr m AppContext)
-> IO AppContext -> ExceptT QErr m AppContext
forall a b. (a -> b) -> a -> b
$ AppStateRef impl -> IO AppContext
forall impl. AppStateRef impl -> IO AppContext
getAppContext AppStateRef impl
appStateRef
        let dynamicConfig :: CacheDynamicConfig
dynamicConfig = AppContext -> CacheDynamicConfig
buildCacheDynamicConfig AppContext
appContext
        -- the instance which triggered the schema sync event would have stored
        -- the source introspection, hence we can ignore it here
        (()
msg, RebuildableSchemaCache
cache, CacheInvalidations
_, SourcesIntrospectionStatus
_sourcesIntrospection, SchemaRegistryAction
_schemaRegistryAction) <-
          CacheDynamicConfig
-> RebuildableSchemaCache
-> CacheRWT (ExceptT QErr m) ()
-> ExceptT
     QErr
     m
     ((), RebuildableSchemaCache, CacheInvalidations,
      SourcesIntrospectionStatus, SchemaRegistryAction)
forall (m :: * -> *) a.
Monad m =>
CacheDynamicConfig
-> RebuildableSchemaCache
-> CacheRWT m a
-> m (a, RebuildableSchemaCache, CacheInvalidations,
      SourcesIntrospectionStatus, SchemaRegistryAction)
runCacheRWT CacheDynamicConfig
dynamicConfig RebuildableSchemaCache
rebuildableCache (CacheRWT (ExceptT QErr m) ()
 -> ExceptT
      QErr
      m
      ((), RebuildableSchemaCache, CacheInvalidations,
       SourcesIntrospectionStatus, SchemaRegistryAction))
-> CacheRWT (ExceptT QErr m) ()
-> ExceptT
     QErr
     m
     ((), RebuildableSchemaCache, CacheInvalidations,
      SourcesIntrospectionStatus, SchemaRegistryAction)
forall a b. (a -> b) -> a -> b
$ do
            SchemaCache
schemaCache <- CacheRWT (ExceptT QErr m) SchemaCache
forall (m :: * -> *). CacheRM m => m SchemaCache
askSchemaCache
            let engineResourceVersion :: MetadataResourceVersion
engineResourceVersion = SchemaCache -> MetadataResourceVersion
scMetadataResourceVersion SchemaCache
schemaCache
            Bool
-> CacheRWT (ExceptT QErr m) () -> CacheRWT (ExceptT QErr m) ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (MetadataResourceVersion
engineResourceVersion MetadataResourceVersion -> MetadataResourceVersion -> Bool
forall a. Eq a => a -> a -> Bool
== MetadataResourceVersion
resourceVersion) (CacheRWT (ExceptT QErr m) () -> CacheRWT (ExceptT QErr m) ())
-> CacheRWT (ExceptT QErr m) () -> CacheRWT (ExceptT QErr m) ()
forall a b. (a -> b) -> a -> b
$ do
              Logger Hasura
-> SchemaSyncThreadType -> Value -> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> SchemaSyncThreadType -> Value -> m ()
logInfo Logger Hasura
logger SchemaSyncThreadType
threadType
                (Value -> CacheRWT (ExceptT QErr m) ())
-> Value -> CacheRWT (ExceptT QErr m) ()
forall a b. (a -> b) -> a -> b
$ Text -> Value
String
                (Text -> Value) -> Text -> Value
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.unwords
                  [ Text
"Received metadata resource version:",
                    MetadataResourceVersion -> Text
showMetadataResourceVersion MetadataResourceVersion
resourceVersion Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
",",
                    Text
"different from the current engine resource version:",
                    MetadataResourceVersion -> Text
showMetadataResourceVersion MetadataResourceVersion
engineResourceVersion Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
".",
                    Text
"Trying to update the schema cache."
                  ]

              MetadataWithResourceVersion Metadata
metadata MetadataResourceVersion
latestResourceVersion <- CacheRWT (ExceptT QErr m) (Either QErr MetadataWithResourceVersion)
-> CacheRWT (ExceptT QErr m) MetadataWithResourceVersion
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM CacheRWT (ExceptT QErr m) (Either QErr MetadataWithResourceVersion)
forall (m :: * -> *).
MonadMetadataStorage m =>
m (Either QErr MetadataWithResourceVersion)
fetchMetadata

              Logger Hasura
-> SchemaSyncThreadType -> Value -> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> SchemaSyncThreadType -> Value -> m ()
logInfo Logger Hasura
logger SchemaSyncThreadType
threadType
                (Value -> CacheRWT (ExceptT QErr m) ())
-> Value -> CacheRWT (ExceptT QErr m) ()
forall a b. (a -> b) -> a -> b
$ Text -> Value
String
                (Text -> Value) -> Text -> Value
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.unwords
                  [ Text
"Fetched metadata with resource version:",
                    MetadataResourceVersion -> Text
showMetadataResourceVersion MetadataResourceVersion
latestResourceVersion
                  ]

              [(MetadataResourceVersion, CacheInvalidations)]
notifications <- CacheRWT
  (ExceptT QErr m)
  (Either QErr [(MetadataResourceVersion, CacheInvalidations)])
-> CacheRWT
     (ExceptT QErr m) [(MetadataResourceVersion, CacheInvalidations)]
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (CacheRWT
   (ExceptT QErr m)
   (Either QErr [(MetadataResourceVersion, CacheInvalidations)])
 -> CacheRWT
      (ExceptT QErr m) [(MetadataResourceVersion, CacheInvalidations)])
-> CacheRWT
     (ExceptT QErr m)
     (Either QErr [(MetadataResourceVersion, CacheInvalidations)])
-> CacheRWT
     (ExceptT QErr m) [(MetadataResourceVersion, CacheInvalidations)]
forall a b. (a -> b) -> a -> b
$ MetadataResourceVersion
-> InstanceId
-> CacheRWT
     (ExceptT QErr m)
     (Either QErr [(MetadataResourceVersion, CacheInvalidations)])
forall (m :: * -> *).
MonadMetadataStorage m =>
MetadataResourceVersion
-> InstanceId
-> m (Either QErr [(MetadataResourceVersion, CacheInvalidations)])
fetchMetadataNotifications MetadataResourceVersion
engineResourceVersion InstanceId
appEnvInstanceId

              case [(MetadataResourceVersion, CacheInvalidations)]
notifications of
                [] -> do
                  Logger Hasura
-> SchemaSyncThreadType -> Value -> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> SchemaSyncThreadType -> Value -> m ()
logInfo Logger Hasura
logger SchemaSyncThreadType
threadType
                    (Value -> CacheRWT (ExceptT QErr m) ())
-> Value -> CacheRWT (ExceptT QErr m) ()
forall a b. (a -> b) -> a -> b
$ Text -> Value
String
                    (Text -> Value) -> Text -> Value
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.unwords
                      [ Text
"Fetched metadata notifications and received no notifications. Not updating the schema cache.",
                        Text
"Only setting resource version:",
                        MetadataResourceVersion -> Text
showMetadataResourceVersion MetadataResourceVersion
latestResourceVersion,
                        Text
"in schema cache"
                      ]
                  MetadataResourceVersion -> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *). CacheRWM m => MetadataResourceVersion -> m ()
setMetadataResourceVersionInSchemaCache MetadataResourceVersion
latestResourceVersion
                [(MetadataResourceVersion, CacheInvalidations)]
_ -> do
                  Logger Hasura
-> SchemaSyncThreadType -> Value -> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> SchemaSyncThreadType -> Value -> m ()
logInfo Logger Hasura
logger SchemaSyncThreadType
threadType
                    (Value -> CacheRWT (ExceptT QErr m) ())
-> Value -> CacheRWT (ExceptT QErr m) ()
forall a b. (a -> b) -> a -> b
$ Text -> Value
String Text
"Fetched metadata notifications and received some notifications. Updating the schema cache."
                  let cacheInvalidations :: CacheInvalidations
cacheInvalidations =
                        if ((MetadataResourceVersion, CacheInvalidations) -> Bool)
-> [(MetadataResourceVersion, CacheInvalidations)] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any ((MetadataResourceVersion -> MetadataResourceVersion -> Bool
forall a. Eq a => a -> a -> Bool
== (MetadataResourceVersion
engineResourceVersion MetadataResourceVersion
-> MetadataResourceVersion -> MetadataResourceVersion
forall a. Num a => a -> a -> a
+ MetadataResourceVersion
1)) (MetadataResourceVersion -> Bool)
-> ((MetadataResourceVersion, CacheInvalidations)
    -> MetadataResourceVersion)
-> (MetadataResourceVersion, CacheInvalidations)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (MetadataResourceVersion, CacheInvalidations)
-> MetadataResourceVersion
forall a b. (a, b) -> a
fst) [(MetadataResourceVersion, CacheInvalidations)]
notifications
                          then -- If (engineResourceVersion + 1) is in the list of notifications then
                          -- we know that we haven't missed any.
                            [CacheInvalidations] -> CacheInvalidations
forall a. Monoid a => [a] -> a
mconcat ([CacheInvalidations] -> CacheInvalidations)
-> [CacheInvalidations] -> CacheInvalidations
forall a b. (a -> b) -> a -> b
$ (MetadataResourceVersion, CacheInvalidations) -> CacheInvalidations
forall a b. (a, b) -> b
snd ((MetadataResourceVersion, CacheInvalidations)
 -> CacheInvalidations)
-> [(MetadataResourceVersion, CacheInvalidations)]
-> [CacheInvalidations]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(MetadataResourceVersion, CacheInvalidations)]
notifications
                          else -- Otherwise we may have missed some notifications so we need to invalidate the
                          -- whole cache.

                            CacheInvalidations
                              { ciMetadata :: Bool
ciMetadata = Bool
True,
                                ciRemoteSchemas :: HashSet RemoteSchemaName
ciRemoteSchemas = [RemoteSchemaName] -> HashSet RemoteSchemaName
forall a. (Eq a, Hashable a) => [a] -> HashSet a
HS.fromList ([RemoteSchemaName] -> HashSet RemoteSchemaName)
-> [RemoteSchemaName] -> HashSet RemoteSchemaName
forall a b. (a -> b) -> a -> b
$ SchemaCache -> [RemoteSchemaName]
getAllRemoteSchemas SchemaCache
schemaCache,
                                ciSources :: HashSet SourceName
ciSources = [SourceName] -> HashSet SourceName
forall a. (Eq a, Hashable a) => [a] -> HashSet a
HS.fromList ([SourceName] -> HashSet SourceName)
-> [SourceName] -> HashSet SourceName
forall a b. (a -> b) -> a -> b
$ HashMap SourceName BackendSourceInfo -> [SourceName]
forall k v. HashMap k v -> [k]
HashMap.keys (HashMap SourceName BackendSourceInfo -> [SourceName])
-> HashMap SourceName BackendSourceInfo -> [SourceName]
forall a b. (a -> b) -> a -> b
$ SchemaCache -> HashMap SourceName BackendSourceInfo
scSources SchemaCache
schemaCache,
                                ciDataConnectors :: HashSet DataConnectorName
ciDataConnectors =
                                  HashSet DataConnectorName
-> (BackendInfoWrapper 'DataConnector -> HashSet DataConnectorName)
-> Maybe (BackendInfoWrapper 'DataConnector)
-> HashSet DataConnectorName
forall b a. b -> (a -> b) -> Maybe a -> b
maybe HashSet DataConnectorName
forall a. Monoid a => a
mempty ([DataConnectorName] -> HashSet DataConnectorName
forall a. (Eq a, Hashable a) => [a] -> HashSet a
HS.fromList ([DataConnectorName] -> HashSet DataConnectorName)
-> (BackendInfoWrapper 'DataConnector -> [DataConnectorName])
-> BackendInfoWrapper 'DataConnector
-> HashSet DataConnectorName
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HashMap DataConnectorName DataConnectorInfo -> [DataConnectorName]
forall k v. HashMap k v -> [k]
HashMap.keys (HashMap DataConnectorName DataConnectorInfo
 -> [DataConnectorName])
-> (BackendInfoWrapper 'DataConnector
    -> HashMap DataConnectorName DataConnectorInfo)
-> BackendInfoWrapper 'DataConnector
-> [DataConnectorName]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BackendInfoWrapper 'DataConnector
-> HashMap DataConnectorName DataConnectorInfo
BackendInfoWrapper 'DataConnector -> BackendInfo 'DataConnector
forall (b :: BackendType). BackendInfoWrapper b -> BackendInfo b
unBackendInfoWrapper)
                                    (Maybe (BackendInfoWrapper 'DataConnector)
 -> HashSet DataConnectorName)
-> Maybe (BackendInfoWrapper 'DataConnector)
-> HashSet DataConnectorName
forall a b. (a -> b) -> a -> b
$ forall (b :: BackendType) (i :: BackendType -> *).
HasTag b =>
BackendMap i -> Maybe (i b)
BackendMap.lookup @'DataConnector
                                    (BackendMap BackendInfoWrapper
 -> Maybe (BackendInfoWrapper 'DataConnector))
-> BackendMap BackendInfoWrapper
-> Maybe (BackendInfoWrapper 'DataConnector)
forall a b. (a -> b) -> a -> b
$ SchemaCache -> BackendMap BackendInfoWrapper
scBackendCache SchemaCache
schemaCache
                              }
                  BuildReason
-> CacheInvalidations
-> Metadata
-> Maybe MetadataResourceVersion
-> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *).
CacheRWM m =>
BuildReason
-> CacheInvalidations
-> Metadata
-> Maybe MetadataResourceVersion
-> m ()
buildSchemaCacheWithOptions BuildReason
CatalogSync CacheInvalidations
cacheInvalidations Metadata
metadata (MetadataResourceVersion -> Maybe MetadataResourceVersion
forall a. a -> Maybe a
Just MetadataResourceVersion
latestResourceVersion)
                  MetadataResourceVersion -> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *). CacheRWM m => MetadataResourceVersion -> m ()
setMetadataResourceVersionInSchemaCache MetadataResourceVersion
latestResourceVersion
                  Logger Hasura
-> SchemaSyncThreadType -> Value -> CacheRWT (ExceptT QErr m) ()
forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> SchemaSyncThreadType -> Value -> m ()
logInfo Logger Hasura
logger SchemaSyncThreadType
threadType
                    (Value -> CacheRWT (ExceptT QErr m) ())
-> Value -> CacheRWT (ExceptT QErr m) ()
forall a b. (a -> b) -> a -> b
$ Text -> Value
String
                    (Text -> Value) -> Text -> Value
forall a b. (a -> b) -> a -> b
$ Text
"Schema cache updated with resource version: "
                    Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> MetadataResourceVersion -> Text
showMetadataResourceVersion MetadataResourceVersion
latestResourceVersion
        ((), RebuildableSchemaCache)
-> ExceptT QErr m ((), RebuildableSchemaCache)
forall a. a -> ExceptT QErr m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (()
msg, RebuildableSchemaCache
cache)
    Either QErr () -> (QErr -> m ()) -> m ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft Either QErr ()
respErr (Logger Hasura -> SchemaSyncThreadType -> ThreadError -> m ()
forall (m :: * -> *) a.
(MonadIO m, ToJSON a) =>
Logger Hasura -> SchemaSyncThreadType -> a -> m ()
logError Logger Hasura
logger SchemaSyncThreadType
threadType (ThreadError -> m ()) -> (QErr -> ThreadError) -> QErr -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. QErr -> ThreadError
TEQueryError)

-- | Emit an info-level schema-sync log entry.
--
-- Wraps the given JSON payload in a 'SchemaSyncLog' tagged with 'LevelInfo'
-- and the supplied thread type, and hands it to the logger.
logInfo :: (MonadIO m) => Logger Hasura -> SchemaSyncThreadType -> Value -> m ()
logInfo logger threadType val =
  unLogger logger
    $ SchemaSyncLog LevelInfo threadType val

-- | Emit an error-level schema-sync log entry.
--
-- The error value is serialised with its 'ToJSON' instance and placed under
-- the @"error"@ key of a JSON object, which is then wrapped in a
-- 'SchemaSyncLog' tagged with 'LevelError' and the supplied thread type.
logError :: (MonadIO m, ToJSON a) => Logger Hasura -> SchemaSyncThreadType -> a -> m ()
logError logger threadType err =
  unLogger logger
    $ SchemaSyncLog LevelError threadType
    -- '.=' already encodes its right-hand side via 'toJSON', so no explicit
    -- conversion of @err@ is needed here.
    $ object ["error" .= err]