{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}

-- | Postgres DDL EventTrigger
--
-- Used for creating event triggers for metadata changes.
--
-- See 'Hasura.RQL.DDL.Schema.Cache' and 'Hasura.RQL.Types.Eventing.Backend'.
module Hasura.Backends.Postgres.DDL.EventTrigger
  ( insertManualEvent,
    redeliverEvent,
    dropTriggerAndArchiveEvents,
    createTableEventTrigger,
    createMissingSQLTriggers,
    dropTriggerQ,
    dropDanglingSQLTrigger,
    mkAllTriggersQ,
    getMaintenanceModeVersion,
    fetchUndeliveredEvents,
    setRetry,
    recordSuccess,
    recordError,
    recordError',
    unlockEventsInSource,
    updateColumnInEventTrigger,
    checkIfTriggerExists,
  )
where

import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson
import Data.FileEmbed (makeRelativeToProject)
import Data.HashSet qualified as HashSet
import Data.Int (Int64)
import Data.Set.NonEmpty qualified as NE
import Data.Text.Lazy qualified as TL
import Data.Time.Clock qualified as Time
import Database.PG.Query qualified as Q
import Hasura.Backends.Postgres.Connection
import Hasura.Backends.Postgres.SQL.DML
import Hasura.Backends.Postgres.SQL.Types hiding (TableName)
import Hasura.Backends.Postgres.Translate.Column
import Hasura.Base.Error
import Hasura.Prelude
import Hasura.RQL.Types.Backend (Backend, SourceConfig, TableName)
import Hasura.RQL.Types.Column
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.Eventing
import Hasura.RQL.Types.Source
import Hasura.RQL.Types.Table (PrimaryKey)
import Hasura.SQL.Backend
import Hasura.SQL.Types
import Hasura.Server.Migrate.Internal
import Hasura.Server.Migrate.LatestVersion
import Hasura.Server.Migrate.Version
import Hasura.Server.Types
import Hasura.Session
import Hasura.Tracing qualified as Tracing
import Text.Shakespeare.Text qualified as ST

-- | Fetch a batch of undelivered events for the given triggers of a source.
--
-- The transaction used to fetch the events depends on the maintenance mode:
--
-- * when maintenance mode is enabled, the maintenance mode version is first
--   read in a separate read transaction and handed to
--   'fetchEventsMaintenanceMode';
-- * otherwise 'fetchEvents' is used directly.
--
-- The selected transaction is then run with 'runPgSourceWriteTx'.
--
-- A failure while reading the maintenance mode version is rethrown with the
-- prefix @"something went wrong while fetching events: "@; a failure of the
-- fetch transaction itself is rethrown as-is.
fetchUndeliveredEvents ::
  (MonadIO m, MonadError QErr m) =>
  SourceConfig ('Postgres pgKind) ->
  SourceName ->
  [TriggerName] ->
  MaintenanceMode () ->
  FetchBatchSize ->
  m [Event ('Postgres pgKind)]
fetchUndeliveredEvents sourceConfig sourceName triggerNames maintenanceMode fetchBatchSize = do
  -- Either the error from the version lookup, or the transaction to run.
  fetchEventsTxE <-
    case maintenanceMode of
      MaintenanceModeEnabled () -> do
        maintenanceModeVersion <- liftIO $ runPgSourceReadTx sourceConfig getMaintenanceModeVersionTx
        pure $ fmap (fetchEventsMaintenanceMode sourceName triggerNames fetchBatchSize) maintenanceModeVersion
      MaintenanceModeDisabled -> pure $ Right $ fetchEvents sourceName triggerNames fetchBatchSize
  case fetchEventsTxE of
    Left err -> throwError $ prefixQErr "something went wrong while fetching events: " err
    Right fetchEventsTx ->
      liftEitherM $
        liftIO $
          runPgSourceWriteTx sourceConfig fetchEventsTx

-- | Run 'setRetryTx' for the given event and retry time in a write
-- transaction on the source, rethrowing any resulting 'QErr' in @m@.
setRetry ::
  ( MonadIO m,
    MonadError QErr m
  ) =>
  SourceConfig ('Postgres pgKind) ->
  Event ('Postgres pgKind) ->
  Time.UTCTime ->
  MaintenanceMode MaintenanceModeVersion ->
  m ()
setRetry sourceConfig event retryTime maintenanceModeVersion =
  liftEitherM $ liftIO $ runPgSourceWriteTx sourceConfig (setRetryTx event retryTime maintenanceModeVersion)

-- | Insert a manually invoked event for the given table and trigger and
-- return the new event's id.
--
-- Within a single write transaction this sets the session variables of the
-- invoking user, sets the trace context, and then calls
-- 'insertPGManualEvent' with the payload. Any 'QErr' is rethrown in @m@.
insertManualEvent ::
  (MonadIO m, MonadError QErr m) =>
  SourceConfig ('Postgres pgKind) ->
  TableName ('Postgres pgKind) ->
  TriggerName ->
  Value ->
  UserInfo ->
  Tracing.TraceContext ->
  m EventId
insertManualEvent sourceConfig tableName triggerName payload userInfo traceCtx =
  -- NOTE: The methods `setTraceContextInTx` and `setHeadersTx` are being used
  -- to ensure that the trace context and user info are set with valid values
  -- while being used in the PG function `insert_event_log`.
  -- See Issue(#7087) for more details on a bug that was being caused
  -- in the absence of these methods.
  liftEitherM $
    liftIO $
      runPgSourceWriteTx sourceConfig $
        setHeadersTx (_uiSession userInfo)
          >> setTraceContextInTx traceCtx
          >> insertPGManualEvent tableName triggerName payload

-- | Read the maintenance mode version of the source by running
-- 'getMaintenanceModeVersionTx' in a read transaction, rethrowing any
-- resulting 'QErr' in @m@.
getMaintenanceModeVersion ::
  ( MonadIO m,
    MonadError QErr m
  ) =>
  SourceConfig ('Postgres pgKind) ->
  m MaintenanceModeVersion
getMaintenanceModeVersion sourceConfig =
  liftEitherM $ liftIO $ runPgSourceReadTx sourceConfig getMaintenanceModeVersionTx

-- | Record a successful delivery: in one write transaction, insert the
-- invocation and then run 'setSuccessTx' for the event.
--
-- Unlike most functions in this module the error is not thrown; it is
-- returned to the caller as @Left@.
recordSuccess ::
  (MonadIO m) =>
  SourceConfig ('Postgres pgKind) ->
  Event ('Postgres pgKind) ->
  Invocation 'EventType ->
  MaintenanceMode MaintenanceModeVersion ->
  m (Either QErr ())
recordSuccess sourceConfig event invocation maintenanceModeVersion =
  liftIO $
    runPgSourceWriteTx sourceConfig $ do
      insertInvocation invocation
      setSuccessTx event maintenanceModeVersion

-- | Record a failed delivery for which an invocation is available.
--
-- This is 'recordError'' with the invocation wrapped in 'Just'; the error,
-- if any, is returned as @Left@ rather than thrown.
recordError ::
  (MonadIO m) =>
  SourceConfig ('Postgres pgKind) ->
  Event ('Postgres pgKind) ->
  Invocation 'EventType ->
  ProcessEventError ->
  MaintenanceMode MaintenanceModeVersion ->
  m (Either QErr ())
recordError sourceConfig event invocation processEventError maintenanceModeVersion =
  recordError' sourceConfig event (Just invocation) processEventError maintenanceModeVersion

-- | Record a failed delivery, with or without an invocation.
--
-- In one write transaction: insert the invocation when one is given, then
-- either run 'setRetryTx' with the retry time ('PESetRetry') or run
-- 'setErrorTx' ('PESetError'). The error, if any, is returned as @Left@
-- rather than thrown.
recordError' ::
  (MonadIO m) =>
  SourceConfig ('Postgres pgKind) ->
  Event ('Postgres pgKind) ->
  Maybe (Invocation 'EventType) ->
  ProcessEventError ->
  MaintenanceMode MaintenanceModeVersion ->
  m (Either QErr ())
recordError' sourceConfig event invocation processEventError maintenanceModeVersion =
  liftIO $
    runPgSourceWriteTx sourceConfig $ do
      onJust invocation insertInvocation
      case processEventError of
        PESetRetry retryTime -> setRetryTx event retryTime maintenanceModeVersion
        PESetError -> setErrorTx event maintenanceModeVersion

-- | Run 'redeliverEventTx' for the given event id in a write transaction on
-- the source, rethrowing any resulting 'QErr' in @m@.
redeliverEvent ::
  (MonadIO m, MonadError QErr m) =>
  SourceConfig ('Postgres pgKind) ->
  EventId ->
  m ()
redeliverEvent sourceConfig eventId =
  liftEitherM $ liftIO $ runPgSourceWriteTx sourceConfig (redeliverEventTx eventId)

-- | Drop the SQL trigger(s) of the named event trigger and archive its
-- events, both in a single write transaction.
--
-- The table argument is accepted but not used by this implementation. Any
-- 'QErr' is rethrown in @m@.
dropTriggerAndArchiveEvents ::
  ( MonadIO m,
    MonadError QErr m
  ) =>
  SourceConfig ('Postgres pgKind) ->
  TriggerName ->
  QualifiedTable ->
  m ()
dropTriggerAndArchiveEvents sourceConfig triggerName _table =
  liftEitherM $
    liftIO $
      runPgSourceWriteTx sourceConfig $ do
        dropTriggerQ triggerName
        archiveEvents triggerName

-- | Create the SQL trigger for every operation (insert, update, delete) that
-- the trigger definition subscribes to and whose trigger function is not yet
-- present in the database.
--
-- All checks and creations run in one write transaction. The primary key
-- component of the column tuple is ignored. Any 'QErr' is rethrown in @m@.
createMissingSQLTriggers ::
  ( MonadIO m,
    MonadError QErr m,
    MonadBaseControl IO m,
    HasServerConfigCtx m,
    Backend ('Postgres pgKind)
  ) =>
  PGSourceConfig ->
  TableName ('Postgres pgKind) ->
  ([(ColumnInfo ('Postgres pgKind))], Maybe (PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind)))) ->
  TriggerName ->
  TriggerOpsDef ('Postgres pgKind) ->
  m ()
createMissingSQLTriggers sourceConfig table (allCols, _) triggerName opsDefinition = do
  serverConfigCtx <- askServerConfigCtx
  liftEitherM $
    runPgSourceWriteTx sourceConfig $ do
      onJust (tdInsert opsDefinition) (doesSQLTriggerExist serverConfigCtx INSERT)
      onJust (tdUpdate opsDefinition) (doesSQLTriggerExist serverConfigCtx UPDATE)
      onJust (tdDelete opsDefinition) (doesSQLTriggerExist serverConfigCtx DELETE)
  where
    -- Despite its name this helper does more than check: it looks up a
    -- function named after the per-operation trigger in @pg_proc@ and, when
    -- none is found, creates the trigger with 'mkTrigger'.
    -- NOTE(review): the lookup matches on @proname@ only (no schema filter);
    -- confirm that is intended.
    doesSQLTriggerExist serverConfigCtx op opSpec = do
      let opTriggerName = pgTriggerName op triggerName
      doesOpTriggerFunctionExist <-
        runIdentity . Q.getRow
          <$> Q.withQE
            defaultTxErrorHandler
            [Q.sql|
                 SELECT EXISTS
                   ( SELECT 1
                     FROM pg_proc
                     WHERE proname = $1
                   )
              |]
            (Identity opTriggerName)
            True
      unless doesOpTriggerFunctionExist $
        flip runReaderT serverConfigCtx $ mkTrigger triggerName table allCols op opSpec

-- | Create all SQL triggers of an event trigger on a table by running
-- 'mkAllTriggersQ' in a write transaction, with the given 'ServerConfigCtx'
-- supplied as the reader environment.
--
-- The primary key argument is ignored. The error, if any, is returned as
-- @Left@ rather than thrown.
createTableEventTrigger ::
  (Backend ('Postgres pgKind), MonadIO m, MonadBaseControl IO m) =>
  ServerConfigCtx ->
  PGSourceConfig ->
  QualifiedTable ->
  [ColumnInfo ('Postgres pgKind)] ->
  TriggerName ->
  TriggerOpsDef ('Postgres pgKind) ->
  Maybe (PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind))) ->
  m (Either QErr ())
createTableEventTrigger serverConfigCtx sourceConfig table columns triggerName opsDefinition _ = runPgSourceWriteTx sourceConfig $ do
  -- Create the given triggers
  flip runReaderT serverConfigCtx $
    mkAllTriggersQ triggerName table columns opsDefinition

-- | Drop the SQL trigger of the named event trigger for each of the given
-- operations, by running 'dropTriggerOp' over the set in a single write
-- transaction.
--
-- The table argument is ignored. Any 'QErr' is rethrown in @m@.
dropDanglingSQLTrigger ::
  ( MonadIO m,
    MonadError QErr m
  ) =>
  SourceConfig ('Postgres pgKind) ->
  TriggerName ->
  QualifiedTable ->
  HashSet Ops ->
  m ()
dropDanglingSQLTrigger sourceConfig triggerName _ ops =
  liftEitherM $
    liftIO $
      runPgSourceWriteTx sourceConfig $
        traverse_ (dropTriggerOp triggerName) ops

updateColumnInEventTrigger ::
  QualifiedTable ->
  PGCol ->
  PGCol ->
  QualifiedTable ->
  EventTriggerConf ('Postgres pgKind) ->
  EventTriggerConf ('Postgres pgKind)
-- Rewrites every column reference inside the trigger's operation definitions:
-- occurrences of @oCol@ become @nCol@, but only when @table@ and @refTable@
-- are equal. All other parts of the configuration are returned untouched.
updateColumnInEventTrigger table oCol nCol refTable triggerConf =
  triggerConf {etcDefinition = renameInOpsDef (etcDefinition triggerConf)}
  where
    -- Replace a single column if it is the one being renamed.
    renameColumn :: PGCol -> PGCol
    renameColumn col
      | table == refTable && oCol == col = nCol
      | otherwise = col

    -- @SubCStar@ names no columns explicitly, so there is nothing to rewrite.
    renameInColumns SubCStar = SubCStar
    renameInColumns (SubCArray cols) = SubCArray (map renameColumn cols)

    -- Both the listen columns and the (optional) delivery columns are rewritten.
    renameInOpSpec (SubscribeOpSpec listenColumns deliveryColumns) =
      SubscribeOpSpec
        (renameInColumns listenColumns)
        (fmap renameInColumns deliveryColumns)

    -- Apply the rename to each of the three optional op specs; the fourth
    -- field is passed through unchanged.
    renameInOpsDef (TriggerOpsDef ins upd del man) =
      TriggerOpsDef
        (fmap renameInOpSpec ins)
        (fmap renameInOpSpec upd)
        (fmap renameInOpSpec del)
        man

-- | Run 'unlockEventsTx' for the given non-empty set of event ids in a write
-- transaction against the source, returning either the transaction error or
-- the 'Int' produced by 'unlockEventsTx'.
unlockEventsInSource ::
  MonadIO m =>
  SourceConfig ('Postgres pgKind) ->
  NE.NESet EventId ->
  m (Either QErr Int)
unlockEventsInSource sourceConfig eventIds =
  liftIO (runPgSourceWriteTx sourceConfig unlockTx)
  where
    unlockTx = unlockEventsTx (toList eventIds)

-- Check whether a trigger function named after 'triggerName' exists for at
-- least one of the given operations.
--
-- We want to avoid creating event triggers with the same name since this will
-- cause undesired behaviour. Note that only SQL functions associated with
-- SQL triggers are dropped when "replace = true" is set in the event trigger
-- configuration. Hence, the best way to check if we should allow the
-- creation of a trigger with the name 'triggerName' is to check if any
-- function with such a name exists in the hdb_catalog.
--
-- For eg: If a create_event_trigger request comes with trigger name as
-- "triggerName" and there is already a trigger with "triggerName" in the
-- metadata, then
--    1. When "replace = false", the function with name 'triggerName' exists
--       so the creation is not allowed
--    2. When "replace = true", the function with name 'triggerName' is first
--       dropped, hence we are allowed to create the trigger with name
--       'triggerName'
checkIfTriggerExists ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  TriggerName ->
  HashSet Ops ->
  m Bool
checkIfTriggerExists sourceConfig triggerName ops =
  liftEitherM . liftIO $ runPgSourceWriteTx sourceConfig anyFunctionExists
  where
    -- One existence query per operation; the result is True as soon as any
    -- of them reports an existing function.
    anyFunctionExists =
      or <$> traverse (checkIfFunctionExistsQ triggerName) (HashSet.toList ops)

---- DATABASE QUERIES ---------------------
--
--   The API for our in-database work queue:
-------------------------------------------

-- | Record one invocation of an event: insert a row into
-- @hdb_catalog.event_invocation_logs@ and then increment the @tries@ counter
-- of the corresponding row in @hdb_catalog.event_log@.
insertInvocation :: Invocation 'EventType -> Q.TxE QErr ()
insertInvocation invo = do
  Q.unitQE defaultTxErrorHandler insertLogSql invocationRow True
  Q.unitQE defaultTxErrorHandler bumpTriesSql (Identity invokedEventId) True
  where
    invokedEventId = iEventId invo

    -- The status column is bound as a (nullable) 64-bit integer.
    invocationStatus :: Maybe Int64
    invocationStatus = fromIntegral <$> iStatus invo

    -- Request and response are serialised to JSON before being bound.
    invocationRow =
      ( invokedEventId,
        invocationStatus,
        Q.AltJ (toJSON (iRequest invo)),
        Q.AltJ (toJSON (iResponse invo))
      )

    insertLogSql =
      [Q.sql|
          INSERT INTO hdb_catalog.event_invocation_logs (event_id, status, request, response)
          VALUES ($1, $2, $3, $4)
          |]

    bumpTriesSql =
      [Q.sql|
          UPDATE hdb_catalog.event_log

          SET tries = tries + 1
          WHERE id = $1
          |]

-- | Insert an event with operation 'MANUAL' for the given table and trigger by
-- calling @hdb_catalog.insert_event_log@, and return the 'EventId' that the
-- SQL function yields.
insertPGManualEvent ::
  QualifiedTable ->
  TriggerName ->
  Value ->
  Q.TxE QErr EventId
insertPGManualEvent (QualifiedObject schemaName tableName) triggerName rowData = do
  insertedRow <-
    Q.withQE
      defaultTxErrorHandler
      [Q.sql|
    SELECT hdb_catalog.insert_event_log($1, $2, $3, $4, $5)
  |]
      insertArgs
      False
  pure (runIdentity (Q.getRow insertedRow))
  where
    -- The operation is passed as its 'Show' rendering; the row data as JSON.
    insertArgs = (schemaName, tableName, triggerName, tshow MANUAL, Q.AltJ rowData)

-- | Set @archived = 't'@ on every row of @hdb_catalog.event_log@ whose
-- @trigger_name@ matches the given trigger.
archiveEvents :: TriggerName -> Q.TxE QErr ()
archiveEvents trn =
  Q.unitQE defaultTxErrorHandler archiveSql (Identity trn) False
  where
    archiveSql =
      [Q.sql|
           UPDATE hdb_catalog.event_log
           SET archived = 't'
           WHERE trigger_name = $1
                |]

-- | Decide which maintenance-mode code path applies, based on the catalog
-- version returned by 'getCatalogVersion':
--
--   * version 40 maps to 'PreviousMMVersion';
--   * version 43 and 'latestCatalogVersion' map to 'CurrentMMVersion';
--   * any other version fails with a 500 error.
getMaintenanceModeVersionTx :: Q.TxE QErr MaintenanceModeVersion
getMaintenanceModeVersionTx = liftTx $ do
  catalogVersion <- getCatalogVersion -- From the user's DB
  -- the previous version and the current version will change depending
  -- upon between which versions we need to support maintenance mode
  if
      | catalogVersion == MetadataCatalogVersion 40 -> pure PreviousMMVersion
      -- The catalog is migrated to the 43rd version for a source
      -- which was initialised by a v1 graphql-engine instance (See @initSource@).
      | catalogVersion == MetadataCatalogVersion 43 -> pure CurrentMMVersion
      | catalogVersion == latestCatalogVersion -> pure CurrentMMVersion
      | otherwise ->
        throw500 $
          -- 'latestCatalogVersionString' is already 'Text'; running it through
          -- 'tshow' would wrap the version in literal double quotes in the
          -- message shown to the user, so it is appended as-is.
          "Maintenance mode is only supported with catalog versions: 40, 43 and "
            <> latestCatalogVersionString

-- | Lock and return events that are neither being processed nor completed, up
-- to the given batch size and restricted to the given trigger names. Events
-- are processed approximately in @created_at@ order, but no ordering
-- guarantees are made; events can and will race. Nevertheless we want to
-- ensure newer change events don't starve older ones.
--
-- An event counts as available when its lock is absent or older than 30
-- minutes, and its @next_retry_at@ is absent or already due.
fetchEvents :: SourceName -> [TriggerName] -> FetchBatchSize -> Q.TxE QErr [Event ('Postgres pgKind)]
fetchEvents source triggerNames (FetchBatchSize fetchBatchSize) = do
  fetchedRows <-
    Q.listQE
      defaultTxErrorHandler
      [Q.sql|
      UPDATE hdb_catalog.event_log
      SET locked = NOW()
      WHERE id IN ( SELECT l.id
                    FROM hdb_catalog.event_log l
                    WHERE l.delivered = 'f' and l.error = 'f'
                          and (l.locked IS NULL or l.locked < (NOW() - interval '30 minute'))
                          and (l.next_retry_at is NULL or l.next_retry_at <= now())
                          and l.archived = 'f'
                          and l.trigger_name = ANY($2)
                    /* NB: this ordering is important for our index `event_log_fetch_events` */
                    /* (see `init_pg_source.sql`) */
                    ORDER BY locked NULLS FIRST, next_retry_at NULLS FIRST, created_at
                    LIMIT $1
                    FOR UPDATE SKIP LOCKED )
      RETURNING id, schema_name, table_name, trigger_name, payload::json, tries, created_at
      |]
      (batchLimit, triggerNamesArray)
      True
  pure (map rowToEvent fetchedRows)
  where
    -- $1: maximum number of events to lock in this batch.
    batchLimit :: Word64
    batchLimit = fromIntegral fetchBatchSize

    -- $2: trigger names, bound as a Postgres text array.
    triggerNamesArray = PGTextArray (map triggerNameToTxt triggerNames)

    -- Build an 'Event' from one returned row; the source is the one passed
    -- in by the caller rather than anything read from the row.
    rowToEvent (id', schemaName, tableName, triggerName, Q.AltJ payload, tries, created) =
      Event
        { eId = id',
          eSource = source,
          eTable = QualifiedObject schemaName tableName,
          eTrigger = TriggerMetadata triggerName,
          eEvent = payload,
          eTries = tries,
          eCreatedAt = created
        }

-- | Maintenance-mode variant of 'fetchEvents'.
--
-- For 'CurrentMMVersion' this simply delegates to 'fetchEvents'. For
-- 'PreviousMMVersion' it runs a query in which @locked@ is a boolean flag;
-- that query does not filter on trigger names, and every returned event is
-- attributed to 'SNDefault'.
fetchEventsMaintenanceMode :: SourceName -> [TriggerName] -> FetchBatchSize -> MaintenanceModeVersion -> Q.TxE QErr [Event ('Postgres pgKind)]
fetchEventsMaintenanceMode sourceName triggerNames fetchBatchSize mmVersion =
  case mmVersion of
    CurrentMMVersion -> fetchEvents sourceName triggerNames fetchBatchSize
    PreviousMMVersion -> do
      fetchedRows <-
        Q.listQE
          defaultTxErrorHandler
          [Q.sql|
        UPDATE hdb_catalog.event_log
        SET locked = 't'
        WHERE id IN ( SELECT l.id
                      FROM hdb_catalog.event_log l
                      WHERE l.delivered = 'f' and l.error = 'f' and l.locked = 'f'
                            and (l.next_retry_at is NULL or l.next_retry_at <= now())
                            and l.archived = 'f'
                      ORDER BY created_at
                      LIMIT $1
                      FOR UPDATE SKIP LOCKED )
        RETURNING id, schema_name, table_name, trigger_name, payload::json, tries, created_at
        |]
          (Identity batchLimit)
          True
      pure (map rowToEvent fetchedRows)
  where
    -- $1: maximum number of events to lock in this batch.
    batchLimit :: Word64
    batchLimit = fromIntegral (_unFetchBatchSize fetchBatchSize)

    rowToEvent (id', sn, tn, trn, Q.AltJ payload, tries, created) =
      Event
        { eId = id',
          eSource = SNDefault, -- in v1, there'll only be the default source
          eTable = QualifiedObject sn tn,
          eTrigger = TriggerMetadata trn,
          eEvent = payload,
          eTries = tries,
          eCreatedAt = created
        }

-- | Mark the event as delivered and clear its @next_retry_at@. The lock is
-- released by writing @'f'@ under 'PreviousMMVersion' maintenance mode and
-- @NULL@ in every other case.
setSuccessTx :: Event ('Postgres pgKind) -> MaintenanceMode MaintenanceModeVersion -> Q.TxE QErr ()
setSuccessTx e maintenanceMode =
  Q.unitQE defaultTxErrorHandler markDeliveredSql (Identity (eId e)) True
  where
    -- Only the statement text depends on the maintenance mode; the bound
    -- argument is always the event id.
    markDeliveredSql = case maintenanceMode of
      MaintenanceModeEnabled PreviousMMVersion -> previousVersionSql
      MaintenanceModeEnabled CurrentMMVersion -> latestVersionSql
      MaintenanceModeDisabled -> latestVersionSql

    previousVersionSql =
      [Q.sql|
    UPDATE hdb_catalog.event_log
    SET delivered = 't', next_retry_at = NULL, locked = 'f'
    WHERE id = $1
    |]

    latestVersionSql =
      [Q.sql|
      UPDATE hdb_catalog.event_log
      SET delivered = 't', next_retry_at = NULL, locked = NULL
      WHERE id = $1
      |]

-- | Flag the event as errored and clear its @next_retry_at@. The lock is
-- released by writing @'f'@ under 'PreviousMMVersion' maintenance mode and
-- @NULL@ in every other case.
setErrorTx :: Event ('Postgres pgKind) -> MaintenanceMode MaintenanceModeVersion -> Q.TxE QErr ()
setErrorTx e maintenanceMode =
  Q.unitQE defaultTxErrorHandler markErroredSql (Identity (eId e)) True
  where
    -- Only the statement text depends on the maintenance mode; the bound
    -- argument is always the event id.
    markErroredSql = case maintenanceMode of
      MaintenanceModeEnabled PreviousMMVersion -> previousVersionSql
      MaintenanceModeEnabled CurrentMMVersion -> latestVersionSql
      MaintenanceModeDisabled -> latestVersionSql

    previousVersionSql =
      [Q.sql|
    UPDATE hdb_catalog.event_log
    SET error = 't', next_retry_at = NULL, locked = 'f'
    WHERE id = $1
    |]

    latestVersionSql =
      [Q.sql|
      UPDATE hdb_catalog.event_log
      SET error = 't', next_retry_at = NULL, locked = NULL
      WHERE id = $1
      |]

-- | Schedule the event for another attempt by storing the given time in
-- @next_retry_at@. The lock is released by writing @'f'@ under
-- 'PreviousMMVersion' maintenance mode and @NULL@ in every other case.
setRetryTx :: Event ('Postgres pgKind) -> Time.UTCTime -> MaintenanceMode MaintenanceModeVersion -> Q.TxE QErr ()
setRetryTx e time maintenanceMode =
  Q.unitQE defaultTxErrorHandler scheduleRetrySql (time, eId e) True
  where
    -- Only the statement text depends on the maintenance mode; the bound
    -- arguments are always the retry time ($1) and the event id ($2).
    scheduleRetrySql = case maintenanceMode of
      MaintenanceModeEnabled PreviousMMVersion -> previousVersionSql
      MaintenanceModeEnabled CurrentMMVersion -> latestVersionSql
      MaintenanceModeDisabled -> latestVersionSql

    previousVersionSql =
      [Q.sql|
    UPDATE hdb_catalog.event_log
    SET next_retry_at = $1, locked = 'f'
    WHERE id = $2
    |]

    latestVersionSql =
      [Q.sql|
              UPDATE hdb_catalog.event_log
              SET next_retry_at = $1, locked = NULL
              WHERE id = $2
              |]

-- | Run 'dropTriggerOp' for the given trigger once per operation, in the
-- order 'INSERT', 'UPDATE', 'DELETE'.
dropTriggerQ :: TriggerName -> Q.TxE QErr ()
dropTriggerQ trn =
  sequence_ [dropTriggerOp trn triggerOp | triggerOp <- [INSERT, UPDATE, DELETE]]

-- | Drop the @hdb_catalog@ function that 'pgIdenTrigger' names for the given
-- trigger and operation. The statement uses @IF EXISTS@ and @CASCADE@, and is
-- executed unprepared with no bound arguments.
dropTriggerOp :: TriggerName -> Ops -> Q.TxE QErr ()
dropTriggerOp triggerName triggerOp =
  Q.unitQE defaultTxErrorHandler (Q.fromText dropFuncSql) () False
  where
    dropFuncSql :: Text
    dropFuncSql =
      mconcat
        [ "DROP FUNCTION IF EXISTS",
          " hdb_catalog.",
          unQualifiedTriggerName (pgIdenTrigger triggerOp triggerName),
          "()",
          " CASCADE"
        ]

-- | Verify that the event exists and is not currently being processed.
--
-- Throws a 400 'NotExists' error when no row has the given id, and a 400
-- 'Busy' error when the event's lock is set and is at most 30 minutes old.
checkEvent :: EventId -> Q.TxE QErr ()
checkEvent eid = do
  lockStates <-
    Q.listQE
      defaultTxErrorHandler
      [Q.sql|
              SELECT l.locked IS NOT NULL AND l.locked >= (NOW() - interval '30 minute')
              FROM hdb_catalog.event_log l
              WHERE l.id = $1
              |]
      (Identity eid)
      True
  -- Only the first returned row is inspected.
  case lockStates of
    [] -> throw400 NotExists "event not found"
    Identity isLocked : _ ->
      when isLocked (throw400 Busy "event is already being processed")

-- | Reset an event so that it is eligible for delivery again: clear its
-- @delivered@ and @error@ flags and set @tries@ back to zero.
markForDelivery :: EventId -> Q.TxE QErr ()
markForDelivery eid =
  Q.unitQE defaultTxErrorHandler resetEventSql (Identity eid) True
  where
    resetEventSql =
      [Q.sql|
          UPDATE hdb_catalog.event_log
          SET
          delivered = 'f',
          error = 'f',
          tries = 0
          WHERE id = $1
          |]

-- | Queue an event for redelivery: first run 'checkEvent' (which aborts the
-- transaction if the event is missing or busy), then 'markForDelivery'.
redeliverEventTx :: EventId -> Q.TxE QErr ()
redeliverEventTx eventId =
  checkEvent eventId >> markForDelivery eventId

-- | Clear the lock on each of the given events that is currently locked and
-- return how many rows were updated. This function is called when a graceful
-- shutdown is initiated.
unlockEventsTx :: [EventId] -> Q.TxE QErr Int
unlockEventsTx eventIds = do
  countRow <-
    Q.withQE
      defaultTxErrorHandler
      [Q.sql|
     WITH "cte" AS
     (UPDATE hdb_catalog.event_log
     SET locked = NULL
     WHERE id = ANY($1::text[])
     -- only unlock those events that have been locked, it's possible
     -- that an event has been processed but not yet been removed from
     -- the saved locked events, which will lead to a double send
     AND locked IS NOT NULL
     RETURNING *)
     SELECT count(*) FROM "cte"
   |]
      (Identity eventIdArray)
      True
  pure (runIdentity (Q.getRow countRow))
  where
    -- The ids are bound as a single Postgres text array ($1).
    eventIdArray = PGTextArray (map unEventId eventIds)

---- Postgres event trigger utility functions ---------------------

-- | 'QualifiedTriggerName' is a type to store the name of the SQL trigger.
--   An example of it is @"notify_hasura_users_all_INSERT"@ where @users_all@
--   is the name of the event trigger.
--
--   The wrapped 'Text' can be retrieved with 'unQualifiedTriggerName'. The
--   'Q.ToPrepArg' instance lets a value be passed directly as a query
--   parameter.
newtype QualifiedTriggerName = QualifiedTriggerName {unQualifiedTriggerName :: Text}
  deriving (Show, Eq, Q.ToPrepArg)

-- | Build the (unquoted) name of the SQL trigger for an event trigger and an
--   operation.
--
--   The result has the shape @notify_hasura_\<trigger name\>_\<op\>@, where the
--   trigger name comes from 'triggerNameToTxt' and the operation is rendered
--   with 'tshow'.
pgTriggerName :: Ops -> TriggerName -> QualifiedTriggerName
pgTriggerName op trn = qualifyTriggerName op $ triggerNameToTxt trn
  where
    qualifyTriggerName op' trn' =
      QualifiedTriggerName $ "notify_hasura_" <> trn' <> "_" <> tshow op'

-- | pgIdenTrigger is a method used to construct the name of the pg function
-- used for event triggers which are present in the hdb_catalog schema.
--
-- It takes the name produced by 'pgTriggerName' and passes it through
-- 'pgFmtIdentifier' before wrapping it in 'QualifiedTriggerName' again.
pgIdenTrigger :: Ops -> TriggerName -> QualifiedTriggerName
pgIdenTrigger op =
  QualifiedTriggerName
    . pgFmtIdentifier
    . unQualifiedTriggerName
    . pgTriggerName op

-- | Define the pgSQL trigger functions on database events.
--
-- Renders the @src-rsr/trigger.sql.shakespeare@ template for the given event
-- trigger, table and operation, runs the resulting SQL inside the transaction,
-- and returns the name computed by 'pgIdenTrigger' for this operation and
-- trigger.
--
-- The 'SubscribeOpSpec' supplies the columns to listen on and, optionally, the
-- columns to deliver in the payload; when no delivery columns are given, all
-- columns ('SubCStar') are delivered.
mkTriggerFunctionQ ::
  forall pgKind m.
  (Backend ('Postgres pgKind), MonadTx m, MonadReader ServerConfigCtx m) =>
  TriggerName ->
  QualifiedTable ->
  [ColumnInfo ('Postgres pgKind)] ->
  Ops ->
  SubscribeOpSpec ('Postgres pgKind) ->
  m QualifiedTriggerName
mkTriggerFunctionQ triggerName (QualifiedObject schema table) allCols op (SubscribeOpSpec listenColumns deliveryColumns') = do
  strfyNum <- stringifyNum . _sccSQLGenCtx <$> ask
  let dbQualifiedTriggerName = pgIdenTrigger op triggerName
  -- The explicit '()' pattern fixes the result type of 'Q.multiQE'.
  () <-
    liftTx $
      Q.multiQE defaultTxErrorHandler $
        Q.fromText . TL.toStrict $
          -- NOTE(review): the template spliced in the @in@ clause below
          -- presumably interpolates these let-bound names, so do not rename
          -- them without checking the template file.
          let -- If there are no specific delivery columns selected by user then all the columns will be delivered
              -- in payload hence 'SubCStar'.
              deliveryColumns = fromMaybe SubCStar deliveryColumns'
              getApplicableColumns = \case
                SubCStar -> allCols
                SubCArray cols -> getColInfos cols allCols

              -- Columns that should be present in the payload. By default, all columns are present.
              applicableDeliveryCols = getApplicableColumns deliveryColumns
              getRowExpression opVar = applyRowToJson' $ mkRowExpression opVar strfyNum applicableDeliveryCols

              -- Columns that user subscribed to listen for changes. By default, we listen on all columns.
              applicableListenCols = getApplicableColumns listenColumns
              renderRow opVar = applyRow $ mkRowExpression opVar strfyNum applicableListenCols

              -- Payload for the old row: only UPDATE and DELETE carry one.
              oldDataExp = case op of
                INSERT -> SENull
                UPDATE -> getRowExpression OLD
                DELETE -> getRowExpression OLD
                MANUAL -> SENull
              -- Payload for the new row: only INSERT and UPDATE carry one.
              newDataExp = case op of
                INSERT -> getRowExpression NEW
                UPDATE -> getRowExpression NEW
                DELETE -> SENull
                MANUAL -> SENull

              name = triggerNameToTxt triggerName
              qualifiedTriggerName = unQualifiedTriggerName dbQualifiedTriggerName
              schemaName = pgFmtLit $ getSchemaTxt schema
              tableName = pgFmtLit $ getTableTxt table

              oldRow = toSQLTxt $ renderRow OLD
              newRow = toSQLTxt $ renderRow NEW
              oldPayloadExpression = toSQLTxt oldDataExp
              newPayloadExpression = toSQLTxt newDataExp
           in $(makeRelativeToProject "src-rsr/trigger.sql.shakespeare" >>= ST.stextFile)
  pure dbQualifiedTriggerName
  where
    -- Wrap an expression in a call to the SQL function @row_to_json@.
    applyRowToJson' e = SEFnApp "row_to_json" [e] Nothing
    -- Wrap an expression in a call to the SQL function @row@.
    applyRow e = SEFnApp "row" [e] Nothing
    -- Use the rendered 'OpVar' as the qualifier for column references.
    opToQual = QualVar . tshow

    -- Build a row expression with one extractor per column, each column
    -- qualified by the given 'OpVar'.
    mkRowExpression opVar strfyNum columns =
      mkRowExp $ map (\col -> toExtractor (mkQId opVar strfyNum col) col) columns

    -- Qualified reference to a single column, passed through 'toJSONableExp'.
    mkQId opVar strfyNum colInfo =
      toJSONableExp strfyNum (ciType colInfo) False Nothing $
        SEQIdentifier $
          QIdentifier (opToQual opVar) $
            toIdentifier $
              ciColumn colInfo

    -- Generate the SQL expression
    toExtractor sqlExp column
      -- If the column type is either 'Geography' or 'Geometry', then after applying the 'ST_AsGeoJSON' function
      -- to the column, alias the value of the expression with the column name; otherwise `st_asgeojson` would be
      -- used as the column name.
      | isScalarColumnWhere isGeoType (ciType column) = Extractor sqlExp (Just $ getAlias column)
      | otherwise = Extractor sqlExp Nothing
    -- Alias carrying the column's own name.
    getAlias col = toColumnAlias $ Identifier $ getPGColTxt (ciColumn col)

-- | Check whether a non-internal SQL trigger with the given name exists on
--   the given table.
--
--   Looks up @pg_trigger@ for a row whose @tgname@ equals the trigger name and
--   whose @tgrelid@ is the given schema-qualified table.
checkIfTriggerExistsForTableQ ::
  QualifiedTriggerName ->
  QualifiedTable ->
  Q.TxE QErr Bool
checkIfTriggerExistsForTableQ (QualifiedTriggerName triggerName) (QualifiedObject schemaName tableName) =
  runIdentity . Q.getRow
    <$> Q.withQE
      defaultTxErrorHandler
      -- 'regclass' converts non-quoted strings to lowercase, but identifiers
      -- such as table names are case-sensitive, so we quote the schema and
      -- table names using 'quote_ident'.
      -- Ref: https://www.postgresql.org/message-id/3896142.1620136761%40sss.pgh.pa.us
      [Q.sql|
      SELECT EXISTS (
        SELECT 1
        FROM pg_trigger
        WHERE NOT tgisinternal
        AND tgname = $1 AND tgrelid = (quote_ident($2) || '.' || quote_ident($3))::regclass
        )
     |]
      (triggerName, schemaName, tableName)
      True

-- | Check whether the trigger function for the given event trigger and
--   operation exists in the @hdb_catalog@ schema.
--
--   The function name looked up in @pg_catalog.pg_proc@ is the one produced by
--   'pgTriggerName'.
checkIfFunctionExistsQ ::
  TriggerName ->
  Ops ->
  Q.TxE QErr Bool
checkIfFunctionExistsQ triggerName op =
  runIdentity . Q.getRow
    <$> Q.withQE
      defaultTxErrorHandler
      [Q.sql|
      SELECT EXISTS (
        SELECT 1
        FROM pg_catalog.pg_proc
        JOIN pg_namespace ON pg_catalog.pg_proc.pronamespace = pg_namespace.oid
        WHERE proname = $1
        AND pg_namespace.nspname = 'hdb_catalog'
        )
     |]
      (Identity qualifiedTriggerName)
      True
  where
    qualifiedTriggerName = pgTriggerName op triggerName

-- | Create the SQL trigger function and, if it is missing, the SQL trigger
--   for one operation of an event trigger.
--
--   The trigger function is always (re)defined through 'mkTriggerFunctionQ'.
--   The @CREATE TRIGGER@ statement is only issued when
--   'checkIfTriggerExistsForTableQ' reports that no trigger with the name from
--   'pgTriggerName' exists on the table yet.
mkTrigger ::
  forall pgKind m.
  (Backend ('Postgres pgKind), MonadTx m, MonadReader ServerConfigCtx m) =>
  TriggerName ->
  QualifiedTable ->
  [ColumnInfo ('Postgres pgKind)] ->
  Ops ->
  SubscribeOpSpec ('Postgres pgKind) ->
  m ()
mkTrigger triggerName table allCols op subOpSpec = do
  -- create/replace the trigger function
  dbTriggerName <- mkTriggerFunctionQ triggerName table allCols op subOpSpec
  -- check if the SQL trigger exists and only if the SQL trigger doesn't exist
  -- we create the SQL trigger.
  doesTriggerExist <- liftTx $ checkIfTriggerExistsForTableQ (pgTriggerName op triggerName) table
  unless doesTriggerExist $
    let sqlQuery =
          Q.fromText $ createTriggerSQL dbTriggerName (toSQLTxt table) (tshow op)
     in liftTx $ Q.unitQE defaultTxErrorHandler sqlQuery () False
  where
    -- Render the @CREATE TRIGGER@ statement. The same name is used for the
    -- trigger and for the function it executes in @hdb_catalog@.
    createTriggerSQL (QualifiedTriggerName triggerNameTxt) tableName opText =
      [ST.st|
         CREATE TRIGGER #{triggerNameTxt} AFTER #{opText} ON #{tableName} FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.#{triggerNameTxt}()
      |]

-- | Create the SQL triggers for every operation configured on an event
--   trigger.
--
--   For each of the insert, update and delete specifications present in the
--   'TriggerOpsDef', 'mkTrigger' is called with the corresponding 'Ops' value
--   ('INSERT', 'UPDATE', 'DELETE'). Operations without a specification are
--   skipped.
mkAllTriggersQ ::
  forall pgKind m.
  (Backend ('Postgres pgKind), MonadTx m, MonadReader ServerConfigCtx m) =>
  TriggerName ->
  QualifiedTable ->
  [ColumnInfo ('Postgres pgKind)] ->
  TriggerOpsDef ('Postgres pgKind) ->
  m ()
mkAllTriggersQ triggerName table allCols fullspec = do
  onJust (tdInsert fullspec) (mkTrigger triggerName table allCols INSERT)
  onJust (tdUpdate fullspec) (mkTrigger triggerName table allCols UPDATE)
  onJust (tdDelete fullspec) (mkTrigger triggerName table allCols DELETE)