{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}
module Hasura.Backends.Postgres.DDL.EventTrigger
( insertManualEvent,
redeliverEvent,
dropTriggerAndArchiveEvents,
createTableEventTrigger,
createMissingSQLTriggers,
dropTriggerQ,
dropDanglingSQLTrigger,
mkAllTriggersQ,
getMaintenanceModeVersion,
fetchUndeliveredEvents,
setRetry,
recordSuccess,
recordError,
recordError',
unlockEventsInSource,
updateColumnInEventTrigger,
checkIfTriggerExists,
)
where
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson
import Data.FileEmbed (makeRelativeToProject)
import Data.HashSet qualified as HashSet
import Data.Int (Int64)
import Data.Set.NonEmpty qualified as NE
import Data.Text.Lazy qualified as TL
import Data.Time.Clock qualified as Time
import Database.PG.Query qualified as Q
import Hasura.Backends.Postgres.Connection
import Hasura.Backends.Postgres.SQL.DML
import Hasura.Backends.Postgres.SQL.Types hiding (TableName)
import Hasura.Backends.Postgres.Translate.Column
import Hasura.Base.Error
import Hasura.Prelude
import Hasura.RQL.Types.Backend (Backend, SourceConfig, TableName)
import Hasura.RQL.Types.Column
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.Eventing
import Hasura.RQL.Types.Source
import Hasura.RQL.Types.Table (PrimaryKey)
import Hasura.SQL.Backend
import Hasura.SQL.Types
import Hasura.Server.Migrate.Internal
import Hasura.Server.Migrate.LatestVersion
import Hasura.Server.Migrate.Version
import Hasura.Server.Types
import Hasura.Session
import Hasura.Tracing qualified as Tracing
import Text.Shakespeare.Text qualified as ST
fetchUndeliveredEvents ::
(MonadIO m, MonadError QErr m) =>
SourceConfig ('Postgres pgKind) ->
SourceName ->
[TriggerName] ->
MaintenanceMode () ->
FetchBatchSize ->
m [Event ('Postgres pgKind)]
fetchUndeliveredEvents :: SourceConfig ('Postgres pgKind)
-> SourceName
-> [TriggerName]
-> MaintenanceMode ()
-> FetchBatchSize
-> m [Event ('Postgres pgKind)]
fetchUndeliveredEvents SourceConfig ('Postgres pgKind)
sourceConfig SourceName
sourceName [TriggerName]
triggerNames MaintenanceMode ()
maintenanceMode FetchBatchSize
fetchBatchSize = do
Either QErr (TxE QErr [Event ('Postgres pgKind)])
fetchEventsTxE <-
case MaintenanceMode ()
maintenanceMode of
MaintenanceModeEnabled () -> do
Either QErr MaintenanceModeVersion
maintenanceModeVersion <- IO (Either QErr MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion))
-> IO (Either QErr MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion)
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> TxET QErr IO MaintenanceModeVersion
-> IO (Either QErr MaintenanceModeVersion)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceReadTx PGSourceConfig
SourceConfig ('Postgres pgKind)
sourceConfig TxET QErr IO MaintenanceModeVersion
getMaintenanceModeVersionTx
Either QErr (TxE QErr [Event ('Postgres pgKind)])
-> m (Either QErr (TxE QErr [Event ('Postgres pgKind)]))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either QErr (TxE QErr [Event ('Postgres pgKind)])
-> m (Either QErr (TxE QErr [Event ('Postgres pgKind)])))
-> Either QErr (TxE QErr [Event ('Postgres pgKind)])
-> m (Either QErr (TxE QErr [Event ('Postgres pgKind)]))
forall a b. (a -> b) -> a -> b
$ (MaintenanceModeVersion -> TxE QErr [Event ('Postgres pgKind)])
-> Either QErr MaintenanceModeVersion
-> Either QErr (TxE QErr [Event ('Postgres pgKind)])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (SourceName
-> [TriggerName]
-> FetchBatchSize
-> MaintenanceModeVersion
-> TxE QErr [Event ('Postgres pgKind)]
forall (pgKind :: PostgresKind).
SourceName
-> [TriggerName]
-> FetchBatchSize
-> MaintenanceModeVersion
-> TxE QErr [Event ('Postgres pgKind)]
fetchEventsMaintenanceMode SourceName
sourceName [TriggerName]
triggerNames FetchBatchSize
fetchBatchSize) Either QErr MaintenanceModeVersion
maintenanceModeVersion
MaintenanceMode ()
MaintenanceModeDisabled -> Either QErr (TxE QErr [Event ('Postgres pgKind)])
-> m (Either QErr (TxE QErr [Event ('Postgres pgKind)]))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either QErr (TxE QErr [Event ('Postgres pgKind)])
-> m (Either QErr (TxE QErr [Event ('Postgres pgKind)])))
-> Either QErr (TxE QErr [Event ('Postgres pgKind)])
-> m (Either QErr (TxE QErr [Event ('Postgres pgKind)]))
forall a b. (a -> b) -> a -> b
$ TxE QErr [Event ('Postgres pgKind)]
-> Either QErr (TxE QErr [Event ('Postgres pgKind)])
forall a b. b -> Either a b
Right (TxE QErr [Event ('Postgres pgKind)]
-> Either QErr (TxE QErr [Event ('Postgres pgKind)]))
-> TxE QErr [Event ('Postgres pgKind)]
-> Either QErr (TxE QErr [Event ('Postgres pgKind)])
forall a b. (a -> b) -> a -> b
$ SourceName
-> [TriggerName]
-> FetchBatchSize
-> TxE QErr [Event ('Postgres pgKind)]
forall (pgKind :: PostgresKind).
SourceName
-> [TriggerName]
-> FetchBatchSize
-> TxE QErr [Event ('Postgres pgKind)]
fetchEvents SourceName
sourceName [TriggerName]
triggerNames FetchBatchSize
fetchBatchSize
case Either QErr (TxE QErr [Event ('Postgres pgKind)])
fetchEventsTxE of
Left QErr
err -> QErr -> m [Event ('Postgres pgKind)]
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (QErr -> m [Event ('Postgres pgKind)])
-> QErr -> m [Event ('Postgres pgKind)]
forall a b. (a -> b) -> a -> b
$ Text -> QErr -> QErr
prefixQErr Text
"something went wrong while fetching events: " QErr
err
Right TxE QErr [Event ('Postgres pgKind)]
fetchEventsTx ->
m (Either QErr [Event ('Postgres pgKind)])
-> m [Event ('Postgres pgKind)]
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr [Event ('Postgres pgKind)])
-> m [Event ('Postgres pgKind)])
-> m (Either QErr [Event ('Postgres pgKind)])
-> m [Event ('Postgres pgKind)]
forall a b. (a -> b) -> a -> b
$
IO (Either QErr [Event ('Postgres pgKind)])
-> m (Either QErr [Event ('Postgres pgKind)])
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr [Event ('Postgres pgKind)])
-> m (Either QErr [Event ('Postgres pgKind)]))
-> IO (Either QErr [Event ('Postgres pgKind)])
-> m (Either QErr [Event ('Postgres pgKind)])
forall a b. (a -> b) -> a -> b
$
PGSourceConfig
-> TxE QErr [Event ('Postgres pgKind)]
-> IO (Either QErr [Event ('Postgres pgKind)])
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
SourceConfig ('Postgres pgKind)
sourceConfig TxE QErr [Event ('Postgres pgKind)]
fetchEventsTx
setRetry ::
( MonadIO m,
MonadError QErr m
) =>
SourceConfig ('Postgres pgKind) ->
Event ('Postgres pgKind) ->
Time.UTCTime ->
MaintenanceMode MaintenanceModeVersion ->
m ()
setRetry :: SourceConfig ('Postgres pgKind)
-> Event ('Postgres pgKind)
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> m ()
setRetry SourceConfig ('Postgres pgKind)
sourceConfig Event ('Postgres pgKind)
event UTCTime
retryTime MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion =
m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ PGSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
SourceConfig ('Postgres pgKind)
sourceConfig (Event ('Postgres pgKind)
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> TxET QErr IO ()
forall (pgKind :: PostgresKind).
Event ('Postgres pgKind)
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> TxET QErr IO ()
setRetryTx Event ('Postgres pgKind)
event UTCTime
retryTime MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion)
insertManualEvent ::
(MonadIO m, MonadError QErr m) =>
SourceConfig ('Postgres pgKind) ->
TableName ('Postgres pgKind) ->
TriggerName ->
Value ->
UserInfo ->
Tracing.TraceContext ->
m EventId
insertManualEvent :: SourceConfig ('Postgres pgKind)
-> TableName ('Postgres pgKind)
-> TriggerName
-> Value
-> UserInfo
-> TraceContext
-> m EventId
insertManualEvent SourceConfig ('Postgres pgKind)
sourceConfig TableName ('Postgres pgKind)
tableName TriggerName
triggerName Value
payload UserInfo
userInfo TraceContext
traceCtx =
m (Either QErr EventId) -> m EventId
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr EventId) -> m EventId)
-> m (Either QErr EventId) -> m EventId
forall a b. (a -> b) -> a -> b
$
IO (Either QErr EventId) -> m (Either QErr EventId)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr EventId) -> m (Either QErr EventId))
-> IO (Either QErr EventId) -> m (Either QErr EventId)
forall a b. (a -> b) -> a -> b
$
PGSourceConfig -> TxET QErr IO EventId -> IO (Either QErr EventId)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
SourceConfig ('Postgres pgKind)
sourceConfig (TxET QErr IO EventId -> IO (Either QErr EventId))
-> TxET QErr IO EventId -> IO (Either QErr EventId)
forall a b. (a -> b) -> a -> b
$
SessionVariables -> TxET QErr IO ()
forall (m :: * -> *).
MonadIO m =>
SessionVariables -> TxET QErr m ()
setHeadersTx (UserInfo -> SessionVariables
_uiSession UserInfo
userInfo)
TxET QErr IO () -> TxET QErr IO () -> TxET QErr IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TraceContext -> TxET QErr IO ()
forall (m :: * -> *). MonadIO m => TraceContext -> TxET QErr m ()
setTraceContextInTx TraceContext
traceCtx
TxET QErr IO () -> TxET QErr IO EventId -> TxET QErr IO EventId
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> QualifiedTable -> TriggerName -> Value -> TxET QErr IO EventId
insertPGManualEvent TableName ('Postgres pgKind)
QualifiedTable
tableName TriggerName
triggerName Value
payload
getMaintenanceModeVersion ::
( MonadIO m,
MonadError QErr m
) =>
SourceConfig ('Postgres pgKind) ->
m MaintenanceModeVersion
getMaintenanceModeVersion :: SourceConfig ('Postgres pgKind) -> m MaintenanceModeVersion
getMaintenanceModeVersion SourceConfig ('Postgres pgKind)
sourceConfig =
m (Either QErr MaintenanceModeVersion) -> m MaintenanceModeVersion
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr MaintenanceModeVersion)
-> m MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion)
-> m MaintenanceModeVersion
forall a b. (a -> b) -> a -> b
$ IO (Either QErr MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion))
-> IO (Either QErr MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion)
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> TxET QErr IO MaintenanceModeVersion
-> IO (Either QErr MaintenanceModeVersion)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceReadTx PGSourceConfig
SourceConfig ('Postgres pgKind)
sourceConfig TxET QErr IO MaintenanceModeVersion
getMaintenanceModeVersionTx
recordSuccess ::
(MonadIO m) =>
SourceConfig ('Postgres pgKind) ->
Event ('Postgres pgKind) ->
Invocation 'EventType ->
MaintenanceMode MaintenanceModeVersion ->
m (Either QErr ())
recordSuccess :: SourceConfig ('Postgres pgKind)
-> Event ('Postgres pgKind)
-> Invocation 'EventType
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordSuccess SourceConfig ('Postgres pgKind)
sourceConfig Event ('Postgres pgKind)
event Invocation 'EventType
invocation MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion =
IO (Either QErr ()) -> m (Either QErr ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$
PGSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
SourceConfig ('Postgres pgKind)
sourceConfig (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
Invocation 'EventType -> TxET QErr IO ()
insertInvocation Invocation 'EventType
invocation
Event ('Postgres pgKind)
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
forall (pgKind :: PostgresKind).
Event ('Postgres pgKind)
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
setSuccessTx Event ('Postgres pgKind)
event MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion
recordError ::
(MonadIO m) =>
SourceConfig ('Postgres pgKind) ->
Event ('Postgres pgKind) ->
Invocation 'EventType ->
ProcessEventError ->
MaintenanceMode MaintenanceModeVersion ->
m (Either QErr ())
recordError :: SourceConfig ('Postgres pgKind)
-> Event ('Postgres pgKind)
-> Invocation 'EventType
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError SourceConfig ('Postgres pgKind)
sourceConfig Event ('Postgres pgKind)
event Invocation 'EventType
invocation ProcessEventError
processEventError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion =
SourceConfig ('Postgres pgKind)
-> Event ('Postgres pgKind)
-> Maybe (Invocation 'EventType)
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
forall (m :: * -> *) (pgKind :: PostgresKind).
MonadIO m =>
SourceConfig ('Postgres pgKind)
-> Event ('Postgres pgKind)
-> Maybe (Invocation 'EventType)
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError' SourceConfig ('Postgres pgKind)
sourceConfig Event ('Postgres pgKind)
event (Invocation 'EventType -> Maybe (Invocation 'EventType)
forall a. a -> Maybe a
Just Invocation 'EventType
invocation) ProcessEventError
processEventError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion
recordError' ::
(MonadIO m) =>
SourceConfig ('Postgres pgKind) ->
Event ('Postgres pgKind) ->
Maybe (Invocation 'EventType) ->
ProcessEventError ->
MaintenanceMode MaintenanceModeVersion ->
m (Either QErr ())
recordError' :: SourceConfig ('Postgres pgKind)
-> Event ('Postgres pgKind)
-> Maybe (Invocation 'EventType)
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError' SourceConfig ('Postgres pgKind)
sourceConfig Event ('Postgres pgKind)
event Maybe (Invocation 'EventType)
invocation ProcessEventError
processEventError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion =
IO (Either QErr ()) -> m (Either QErr ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$
PGSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
SourceConfig ('Postgres pgKind)
sourceConfig (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
Maybe (Invocation 'EventType)
-> (Invocation 'EventType -> TxET QErr IO ()) -> TxET QErr IO ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
onJust Maybe (Invocation 'EventType)
invocation Invocation 'EventType -> TxET QErr IO ()
insertInvocation
case ProcessEventError
processEventError of
PESetRetry UTCTime
retryTime -> Event ('Postgres pgKind)
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> TxET QErr IO ()
forall (pgKind :: PostgresKind).
Event ('Postgres pgKind)
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> TxET QErr IO ()
setRetryTx Event ('Postgres pgKind)
event UTCTime
retryTime MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion
ProcessEventError
PESetError -> Event ('Postgres pgKind)
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
forall (pgKind :: PostgresKind).
Event ('Postgres pgKind)
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
setErrorTx Event ('Postgres pgKind)
event MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion
redeliverEvent ::
(MonadIO m, MonadError QErr m) =>
SourceConfig ('Postgres pgKind) ->
EventId ->
m ()
redeliverEvent :: SourceConfig ('Postgres pgKind) -> EventId -> m ()
redeliverEvent SourceConfig ('Postgres pgKind)
sourceConfig EventId
eventId =
m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ PGSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
SourceConfig ('Postgres pgKind)
sourceConfig (EventId -> TxET QErr IO ()
redeliverEventTx EventId
eventId)
dropTriggerAndArchiveEvents ::
( MonadIO m,
MonadError QErr m
) =>
SourceConfig ('Postgres pgKind) ->
TriggerName ->
QualifiedTable ->
m ()
dropTriggerAndArchiveEvents :: SourceConfig ('Postgres pgKind)
-> TriggerName -> QualifiedTable -> m ()
dropTriggerAndArchiveEvents SourceConfig ('Postgres pgKind)
sourceConfig TriggerName
triggerName QualifiedTable
_table =
m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$
IO (Either QErr ()) -> m (Either QErr ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$
PGSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
SourceConfig ('Postgres pgKind)
sourceConfig (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
TriggerName -> TxET QErr IO ()
dropTriggerQ TriggerName
triggerName
TriggerName -> TxET QErr IO ()
archiveEvents TriggerName
triggerName
createMissingSQLTriggers ::
( MonadIO m,
MonadError QErr m,
MonadBaseControl IO m,
HasServerConfigCtx m,
Backend ('Postgres pgKind)
) =>
PGSourceConfig ->
TableName ('Postgres pgKind) ->
([(ColumnInfo ('Postgres pgKind))], Maybe (PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind)))) ->
TriggerName ->
TriggerOpsDef ('Postgres pgKind) ->
m ()
createMissingSQLTriggers :: PGSourceConfig
-> TableName ('Postgres pgKind)
-> ([ColumnInfo ('Postgres pgKind)],
Maybe
(PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind))))
-> TriggerName
-> TriggerOpsDef ('Postgres pgKind)
-> m ()
createMissingSQLTriggers PGSourceConfig
sourceConfig TableName ('Postgres pgKind)
table ([ColumnInfo ('Postgres pgKind)]
allCols, Maybe
(PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind)))
_) TriggerName
triggerName TriggerOpsDef ('Postgres pgKind)
opsDefinition = do
ServerConfigCtx
serverConfigCtx <- m ServerConfigCtx
forall (m :: * -> *). HasServerConfigCtx m => m ServerConfigCtx
askServerConfigCtx
m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$
PGSourceConfig -> TxET QErr m () -> m (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
sourceConfig (TxET QErr m () -> m (Either QErr ()))
-> TxET QErr m () -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
Maybe (SubscribeOpSpec ('Postgres pgKind))
-> (SubscribeOpSpec ('Postgres pgKind) -> TxET QErr m ())
-> TxET QErr m ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
onJust (TriggerOpsDef ('Postgres pgKind)
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdInsert TriggerOpsDef ('Postgres pgKind)
opsDefinition) (ServerConfigCtx
-> Ops -> SubscribeOpSpec ('Postgres pgKind) -> TxET QErr m ()
doesSQLTriggerExist ServerConfigCtx
serverConfigCtx Ops
INSERT)
Maybe (SubscribeOpSpec ('Postgres pgKind))
-> (SubscribeOpSpec ('Postgres pgKind) -> TxET QErr m ())
-> TxET QErr m ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
onJust (TriggerOpsDef ('Postgres pgKind)
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdUpdate TriggerOpsDef ('Postgres pgKind)
opsDefinition) (ServerConfigCtx
-> Ops -> SubscribeOpSpec ('Postgres pgKind) -> TxET QErr m ()
doesSQLTriggerExist ServerConfigCtx
serverConfigCtx Ops
UPDATE)
Maybe (SubscribeOpSpec ('Postgres pgKind))
-> (SubscribeOpSpec ('Postgres pgKind) -> TxET QErr m ())
-> TxET QErr m ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
onJust (TriggerOpsDef ('Postgres pgKind)
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdDelete TriggerOpsDef ('Postgres pgKind)
opsDefinition) (ServerConfigCtx
-> Ops -> SubscribeOpSpec ('Postgres pgKind) -> TxET QErr m ()
doesSQLTriggerExist ServerConfigCtx
serverConfigCtx Ops
DELETE)
where
doesSQLTriggerExist :: ServerConfigCtx
-> Ops -> SubscribeOpSpec ('Postgres pgKind) -> TxET QErr m ()
doesSQLTriggerExist ServerConfigCtx
serverConfigCtx Ops
op SubscribeOpSpec ('Postgres pgKind)
opSpec = do
let opTriggerName :: QualifiedTriggerName
opTriggerName = Ops -> TriggerName -> QualifiedTriggerName
pgTriggerName Ops
op TriggerName
triggerName
Bool
doesOpTriggerFunctionExist <-
Identity Bool -> Bool
forall a. Identity a -> a
runIdentity (Identity Bool -> Bool)
-> (SingleRow (Identity Bool) -> Identity Bool)
-> SingleRow (Identity Bool)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity Bool) -> Identity Bool
forall a. SingleRow a -> a
Q.getRow
(SingleRow (Identity Bool) -> Bool)
-> TxET QErr m (SingleRow (Identity Bool)) -> TxET QErr m Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> Identity QualifiedTriggerName
-> Bool
-> TxET QErr m (SingleRow (Identity Bool))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
Q.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
SELECT EXISTS
( SELECT 1
FROM pg_proc
WHERE proname = $1
)
|]
(QualifiedTriggerName -> Identity QualifiedTriggerName
forall a. a -> Identity a
Identity QualifiedTriggerName
opTriggerName)
Bool
True
Bool -> TxET QErr m () -> TxET QErr m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
doesOpTriggerFunctionExist (TxET QErr m () -> TxET QErr m ())
-> TxET QErr m () -> TxET QErr m ()
forall a b. (a -> b) -> a -> b
$
(ReaderT ServerConfigCtx (TxET QErr m) ()
-> ServerConfigCtx -> TxET QErr m ())
-> ServerConfigCtx
-> ReaderT ServerConfigCtx (TxET QErr m) ()
-> TxET QErr m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ReaderT ServerConfigCtx (TxET QErr m) ()
-> ServerConfigCtx -> TxET QErr m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT ServerConfigCtx
serverConfigCtx (ReaderT ServerConfigCtx (TxET QErr m) () -> TxET QErr m ())
-> ReaderT ServerConfigCtx (TxET QErr m) () -> TxET QErr m ()
forall a b. (a -> b) -> a -> b
$ TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> ReaderT ServerConfigCtx (TxET QErr m) ()
forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadTx m,
MonadReader ServerConfigCtx m) =>
TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
mkTrigger TriggerName
triggerName TableName ('Postgres pgKind)
QualifiedTable
table [ColumnInfo ('Postgres pgKind)]
allCols Ops
op SubscribeOpSpec ('Postgres pgKind)
opSpec
createTableEventTrigger ::
(Backend ('Postgres pgKind), MonadIO m, MonadBaseControl IO m) =>
ServerConfigCtx ->
PGSourceConfig ->
QualifiedTable ->
[ColumnInfo ('Postgres pgKind)] ->
TriggerName ->
TriggerOpsDef ('Postgres pgKind) ->
Maybe (PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind))) ->
m (Either QErr ())
createTableEventTrigger :: ServerConfigCtx
-> PGSourceConfig
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> TriggerName
-> TriggerOpsDef ('Postgres pgKind)
-> Maybe
(PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind)))
-> m (Either QErr ())
createTableEventTrigger ServerConfigCtx
serverConfigCtx PGSourceConfig
sourceConfig QualifiedTable
table [ColumnInfo ('Postgres pgKind)]
columns TriggerName
triggerName TriggerOpsDef ('Postgres pgKind)
opsDefinition Maybe
(PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind)))
_ = PGSourceConfig -> TxET QErr m () -> m (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
sourceConfig (TxET QErr m () -> m (Either QErr ()))
-> TxET QErr m () -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
(ReaderT ServerConfigCtx (TxET QErr m) ()
-> ServerConfigCtx -> TxET QErr m ())
-> ServerConfigCtx
-> ReaderT ServerConfigCtx (TxET QErr m) ()
-> TxET QErr m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ReaderT ServerConfigCtx (TxET QErr m) ()
-> ServerConfigCtx -> TxET QErr m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT ServerConfigCtx
serverConfigCtx (ReaderT ServerConfigCtx (TxET QErr m) () -> TxET QErr m ())
-> ReaderT ServerConfigCtx (TxET QErr m) () -> TxET QErr m ()
forall a b. (a -> b) -> a -> b
$
TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> TriggerOpsDef ('Postgres pgKind)
-> ReaderT ServerConfigCtx (TxET QErr m) ()
forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadTx m,
MonadReader ServerConfigCtx m) =>
TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> TriggerOpsDef ('Postgres pgKind)
-> m ()
mkAllTriggersQ TriggerName
triggerName QualifiedTable
table [ColumnInfo ('Postgres pgKind)]
columns TriggerOpsDef ('Postgres pgKind)
opsDefinition
dropDanglingSQLTrigger ::
( MonadIO m,
MonadError QErr m
) =>
SourceConfig ('Postgres pgKind) ->
TriggerName ->
QualifiedTable ->
HashSet Ops ->
m ()
dropDanglingSQLTrigger :: SourceConfig ('Postgres pgKind)
-> TriggerName -> QualifiedTable -> HashSet Ops -> m ()
dropDanglingSQLTrigger SourceConfig ('Postgres pgKind)
sourceConfig TriggerName
triggerName QualifiedTable
_ HashSet Ops
ops =
m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$
IO (Either QErr ()) -> m (Either QErr ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$
PGSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
SourceConfig ('Postgres pgKind)
sourceConfig (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$
(Ops -> TxET QErr IO ()) -> HashSet Ops -> TxET QErr IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (TriggerName -> Ops -> TxET QErr IO ()
dropTriggerOp TriggerName
triggerName) HashSet Ops
ops
updateColumnInEventTrigger ::
QualifiedTable ->
PGCol ->
PGCol ->
QualifiedTable ->
EventTriggerConf ('Postgres pgKind) ->
EventTriggerConf ('Postgres pgKind)
updateColumnInEventTrigger :: QualifiedTable
-> PGCol
-> PGCol
-> QualifiedTable
-> EventTriggerConf ('Postgres pgKind)
-> EventTriggerConf ('Postgres pgKind)
updateColumnInEventTrigger QualifiedTable
table PGCol
oCol PGCol
nCol QualifiedTable
refTable = EventTriggerConf ('Postgres pgKind)
-> EventTriggerConf ('Postgres pgKind)
rewriteEventTriggerConf
where
rewriteSubsCols :: SubscribeColumns ('Postgres pgKind)
-> SubscribeColumns ('Postgres pgKind)
rewriteSubsCols = \case
SubscribeColumns ('Postgres pgKind)
SubCStar -> SubscribeColumns ('Postgres pgKind)
forall (b :: BackendType). SubscribeColumns b
SubCStar
SubCArray [Column ('Postgres pgKind)]
cols -> [Column ('Postgres pgKind)] -> SubscribeColumns ('Postgres pgKind)
forall (b :: BackendType). [Column b] -> SubscribeColumns b
SubCArray ([Column ('Postgres pgKind)]
-> SubscribeColumns ('Postgres pgKind))
-> [Column ('Postgres pgKind)]
-> SubscribeColumns ('Postgres pgKind)
forall a b. (a -> b) -> a -> b
$ (PGCol -> PGCol) -> [PGCol] -> [PGCol]
forall a b. (a -> b) -> [a] -> [b]
map PGCol -> PGCol
getNewCol [Column ('Postgres pgKind)]
[PGCol]
cols
rewriteOpSpec :: SubscribeOpSpec ('Postgres pgKind)
-> SubscribeOpSpec ('Postgres pgKind)
rewriteOpSpec (SubscribeOpSpec SubscribeColumns ('Postgres pgKind)
listenColumns Maybe (SubscribeColumns ('Postgres pgKind))
deliveryColumns) =
SubscribeColumns ('Postgres pgKind)
-> Maybe (SubscribeColumns ('Postgres pgKind))
-> SubscribeOpSpec ('Postgres pgKind)
forall (b :: BackendType).
SubscribeColumns b
-> Maybe (SubscribeColumns b) -> SubscribeOpSpec b
SubscribeOpSpec
(SubscribeColumns ('Postgres pgKind)
-> SubscribeColumns ('Postgres pgKind)
rewriteSubsCols SubscribeColumns ('Postgres pgKind)
listenColumns)
(SubscribeColumns ('Postgres pgKind)
-> SubscribeColumns ('Postgres pgKind)
rewriteSubsCols (SubscribeColumns ('Postgres pgKind)
-> SubscribeColumns ('Postgres pgKind))
-> Maybe (SubscribeColumns ('Postgres pgKind))
-> Maybe (SubscribeColumns ('Postgres pgKind))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (SubscribeColumns ('Postgres pgKind))
deliveryColumns)
rewriteTrigOpsDef :: TriggerOpsDef ('Postgres pgKind)
-> TriggerOpsDef ('Postgres pgKind)
rewriteTrigOpsDef (TriggerOpsDef Maybe (SubscribeOpSpec ('Postgres pgKind))
ins Maybe (SubscribeOpSpec ('Postgres pgKind))
upd Maybe (SubscribeOpSpec ('Postgres pgKind))
del Maybe Bool
man) =
Maybe (SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
-> Maybe Bool
-> TriggerOpsDef ('Postgres pgKind)
forall (b :: BackendType).
Maybe (SubscribeOpSpec b)
-> Maybe (SubscribeOpSpec b)
-> Maybe (SubscribeOpSpec b)
-> Maybe Bool
-> TriggerOpsDef b
TriggerOpsDef
(SubscribeOpSpec ('Postgres pgKind)
-> SubscribeOpSpec ('Postgres pgKind)
rewriteOpSpec (SubscribeOpSpec ('Postgres pgKind)
-> SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (SubscribeOpSpec ('Postgres pgKind))
ins)
(SubscribeOpSpec ('Postgres pgKind)
-> SubscribeOpSpec ('Postgres pgKind)
rewriteOpSpec (SubscribeOpSpec ('Postgres pgKind)
-> SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (SubscribeOpSpec ('Postgres pgKind))
upd)
(SubscribeOpSpec ('Postgres pgKind)
-> SubscribeOpSpec ('Postgres pgKind)
rewriteOpSpec (SubscribeOpSpec ('Postgres pgKind)
-> SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (SubscribeOpSpec ('Postgres pgKind))
del)
Maybe Bool
man
rewriteEventTriggerConf :: EventTriggerConf ('Postgres pgKind)
-> EventTriggerConf ('Postgres pgKind)
rewriteEventTriggerConf EventTriggerConf ('Postgres pgKind)
etc =
EventTriggerConf ('Postgres pgKind)
etc
{ etcDefinition :: TriggerOpsDef ('Postgres pgKind)
etcDefinition =
TriggerOpsDef ('Postgres pgKind)
-> TriggerOpsDef ('Postgres pgKind)
rewriteTrigOpsDef (TriggerOpsDef ('Postgres pgKind)
-> TriggerOpsDef ('Postgres pgKind))
-> TriggerOpsDef ('Postgres pgKind)
-> TriggerOpsDef ('Postgres pgKind)
forall a b. (a -> b) -> a -> b
$ EventTriggerConf ('Postgres pgKind)
-> TriggerOpsDef ('Postgres pgKind)
forall (b :: BackendType). EventTriggerConf b -> TriggerOpsDef b
etcDefinition EventTriggerConf ('Postgres pgKind)
etc
}
getNewCol :: PGCol -> PGCol
getNewCol PGCol
col =
if QualifiedTable
table QualifiedTable -> QualifiedTable -> Bool
forall a. Eq a => a -> a -> Bool
== QualifiedTable
refTable Bool -> Bool -> Bool
&& PGCol
oCol PGCol -> PGCol -> Bool
forall a. Eq a => a -> a -> Bool
== PGCol
col then PGCol
nCol else PGCol
col
unlockEventsInSource ::
MonadIO m =>
SourceConfig ('Postgres pgKind) ->
NE.NESet EventId ->
m (Either QErr Int)
unlockEventsInSource :: SourceConfig ('Postgres pgKind)
-> NESet EventId -> m (Either QErr Int)
unlockEventsInSource SourceConfig ('Postgres pgKind)
sourceConfig NESet EventId
eventIds =
IO (Either QErr Int) -> m (Either QErr Int)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr Int) -> m (Either QErr Int))
-> IO (Either QErr Int) -> m (Either QErr Int)
forall a b. (a -> b) -> a -> b
$ PGSourceConfig -> TxET QErr IO Int -> IO (Either QErr Int)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
SourceConfig ('Postgres pgKind)
sourceConfig ([EventId] -> TxET QErr IO Int
unlockEventsTx ([EventId] -> TxET QErr IO Int) -> [EventId] -> TxET QErr IO Int
forall a b. (a -> b) -> a -> b
$ NESet EventId -> [EventId]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList NESet EventId
eventIds)
checkIfTriggerExists ::
(MonadIO m, MonadError QErr m) =>
PGSourceConfig ->
TriggerName ->
HashSet Ops ->
m Bool
checkIfTriggerExists :: PGSourceConfig -> TriggerName -> HashSet Ops -> m Bool
checkIfTriggerExists PGSourceConfig
sourceConfig TriggerName
triggerName HashSet Ops
ops = do
m (Either QErr Bool) -> m Bool
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr Bool) -> m Bool) -> m (Either QErr Bool) -> m Bool
forall a b. (a -> b) -> a -> b
$
IO (Either QErr Bool) -> m (Either QErr Bool)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr Bool) -> m (Either QErr Bool))
-> IO (Either QErr Bool) -> m (Either QErr Bool)
forall a b. (a -> b) -> a -> b
$
PGSourceConfig -> TxET QErr IO Bool -> IO (Either QErr Bool)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
sourceConfig (TxET QErr IO Bool -> IO (Either QErr Bool))
-> TxET QErr IO Bool -> IO (Either QErr Bool)
forall a b. (a -> b) -> a -> b
$
([Bool] -> Bool) -> TxET QErr IO [Bool] -> TxET QErr IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap [Bool] -> Bool
forall (t :: * -> *). Foldable t => t Bool -> Bool
or ((Ops -> TxET QErr IO Bool) -> [Ops] -> TxET QErr IO [Bool]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (TriggerName -> Ops -> TxET QErr IO Bool
checkIfFunctionExistsQ TriggerName
triggerName) (HashSet Ops -> [Ops]
forall a. HashSet a -> [a]
HashSet.toList HashSet Ops
ops))
insertInvocation :: Invocation 'EventType -> Q.TxE QErr ()
insertInvocation :: Invocation 'EventType -> TxET QErr IO ()
insertInvocation Invocation 'EventType
invo = do
(PGTxErr -> QErr)
-> Query
-> (EventId, Maybe Int64, AltJ Value, AltJ Value)
-> Bool
-> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
INSERT INTO hdb_catalog.event_invocation_logs (event_id, status, request, response)
VALUES ($1, $2, $3, $4)
|]
( Invocation 'EventType -> EventId
forall (a :: TriggerTypes). Invocation a -> EventId
iEventId Invocation 'EventType
invo,
Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Maybe Int -> Maybe Int64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Invocation 'EventType -> Maybe Int
forall (a :: TriggerTypes). Invocation a -> Maybe Int
iStatus Invocation 'EventType
invo :: Maybe Int64,
Value -> AltJ Value
forall a. a -> AltJ a
Q.AltJ (Value -> AltJ Value) -> Value -> AltJ Value
forall a b. (a -> b) -> a -> b
$ WebhookRequest -> Value
forall a. ToJSON a => a -> Value
toJSON (WebhookRequest -> Value) -> WebhookRequest -> Value
forall a b. (a -> b) -> a -> b
$ Invocation 'EventType -> WebhookRequest
forall (a :: TriggerTypes). Invocation a -> WebhookRequest
iRequest Invocation 'EventType
invo,
Value -> AltJ Value
forall a. a -> AltJ a
Q.AltJ (Value -> AltJ Value) -> Value -> AltJ Value
forall a b. (a -> b) -> a -> b
$ Response 'EventType -> Value
forall a. ToJSON a => a -> Value
toJSON (Response 'EventType -> Value) -> Response 'EventType -> Value
forall a b. (a -> b) -> a -> b
$ Invocation 'EventType -> Response 'EventType
forall (a :: TriggerTypes). Invocation a -> Response a
iResponse Invocation 'EventType
invo
)
Bool
True
(PGTxErr -> QErr)
-> Query -> Identity EventId -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.event_log
SET tries = tries + 1
WHERE id = $1
|]
(EventId -> Identity EventId
forall a. a -> Identity a
Identity (EventId -> Identity EventId) -> EventId -> Identity EventId
forall a b. (a -> b) -> a -> b
$ Invocation 'EventType -> EventId
forall (a :: TriggerTypes). Invocation a -> EventId
iEventId Invocation 'EventType
invo)
Bool
True
insertPGManualEvent ::
QualifiedTable ->
TriggerName ->
Value ->
Q.TxE QErr EventId
insertPGManualEvent :: QualifiedTable -> TriggerName -> Value -> TxET QErr IO EventId
insertPGManualEvent (QualifiedObject SchemaName
schemaName TableName
tableName) TriggerName
triggerName Value
rowData = do
Identity EventId -> EventId
forall a. Identity a -> a
runIdentity (Identity EventId -> EventId)
-> (SingleRow (Identity EventId) -> Identity EventId)
-> SingleRow (Identity EventId)
-> EventId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity EventId) -> Identity EventId
forall a. SingleRow a -> a
Q.getRow
(SingleRow (Identity EventId) -> EventId)
-> TxET QErr IO (SingleRow (Identity EventId))
-> TxET QErr IO EventId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> (SchemaName, TableName, TriggerName, Text, AltJ Value)
-> Bool
-> TxET QErr IO (SingleRow (Identity EventId))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
Q.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
SELECT hdb_catalog.insert_event_log($1, $2, $3, $4, $5)
|]
(SchemaName
schemaName, TableName
tableName, TriggerName
triggerName, (Ops -> Text
forall a. Show a => a -> Text
tshow Ops
MANUAL), Value -> AltJ Value
forall a. a -> AltJ a
Q.AltJ Value
rowData)
Bool
False
archiveEvents :: TriggerName -> Q.TxE QErr ()
archiveEvents :: TriggerName -> TxET QErr IO ()
archiveEvents TriggerName
trn =
(PGTxErr -> QErr)
-> Query -> Identity TriggerName -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.event_log
SET archived = 't'
WHERE trigger_name = $1
|]
(TriggerName -> Identity TriggerName
forall a. a -> Identity a
Identity TriggerName
trn)
Bool
False
getMaintenanceModeVersionTx :: Q.TxE QErr MaintenanceModeVersion
getMaintenanceModeVersionTx :: TxET QErr IO MaintenanceModeVersion
getMaintenanceModeVersionTx = TxET QErr IO MaintenanceModeVersion
-> TxET QErr IO MaintenanceModeVersion
forall (m :: * -> *) a. MonadTx m => TxE QErr a -> m a
liftTx (TxET QErr IO MaintenanceModeVersion
-> TxET QErr IO MaintenanceModeVersion)
-> TxET QErr IO MaintenanceModeVersion
-> TxET QErr IO MaintenanceModeVersion
forall a b. (a -> b) -> a -> b
$ do
MetadataCatalogVersion
catalogVersion <- TxE QErr MetadataCatalogVersion
getCatalogVersion
if
| MetadataCatalogVersion
catalogVersion MetadataCatalogVersion -> MetadataCatalogVersion -> Bool
forall a. Eq a => a -> a -> Bool
== Int -> MetadataCatalogVersion
MetadataCatalogVersion Int
40 -> MaintenanceModeVersion -> TxET QErr IO MaintenanceModeVersion
forall (f :: * -> *) a. Applicative f => a -> f a
pure MaintenanceModeVersion
PreviousMMVersion
| MetadataCatalogVersion
catalogVersion MetadataCatalogVersion -> MetadataCatalogVersion -> Bool
forall a. Eq a => a -> a -> Bool
== Int -> MetadataCatalogVersion
MetadataCatalogVersion Int
43 -> MaintenanceModeVersion -> TxET QErr IO MaintenanceModeVersion
forall (f :: * -> *) a. Applicative f => a -> f a
pure MaintenanceModeVersion
CurrentMMVersion
| MetadataCatalogVersion
catalogVersion MetadataCatalogVersion -> MetadataCatalogVersion -> Bool
forall a. Eq a => a -> a -> Bool
== MetadataCatalogVersion
latestCatalogVersion -> MaintenanceModeVersion -> TxET QErr IO MaintenanceModeVersion
forall (f :: * -> *) a. Applicative f => a -> f a
pure MaintenanceModeVersion
CurrentMMVersion
| Bool
otherwise ->
Text -> TxET QErr IO MaintenanceModeVersion
forall (m :: * -> *) a. QErrM m => Text -> m a
throw500 (Text -> TxET QErr IO MaintenanceModeVersion)
-> Text -> TxET QErr IO MaintenanceModeVersion
forall a b. (a -> b) -> a -> b
$
Text
"Maintenance mode is only supported with catalog versions: 40, 43 and "
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
forall a. Show a => a -> Text
tshow Text
latestCatalogVersionString
fetchEvents :: SourceName -> [TriggerName] -> FetchBatchSize -> Q.TxE QErr [Event ('Postgres pgKind)]
fetchEvents :: SourceName
-> [TriggerName]
-> FetchBatchSize
-> TxE QErr [Event ('Postgres pgKind)]
fetchEvents SourceName
source [TriggerName]
triggerNames (FetchBatchSize Int
fetchBatchSize) =
((EventId, SchemaName, TableName, TriggerName, AltJ Value, Int,
UTCTime)
-> Event ('Postgres pgKind))
-> [(EventId, SchemaName, TableName, TriggerName, AltJ Value, Int,
UTCTime)]
-> [Event ('Postgres pgKind)]
forall a b. (a -> b) -> [a] -> [b]
map (EventId, SchemaName, TableName, TriggerName, AltJ Value, Int,
UTCTime)
-> Event ('Postgres pgKind)
uncurryEvent
([(EventId, SchemaName, TableName, TriggerName, AltJ Value, Int,
UTCTime)]
-> [Event ('Postgres pgKind)])
-> TxET
QErr
IO
[(EventId, SchemaName, TableName, TriggerName, AltJ Value, Int,
UTCTime)]
-> TxE QErr [Event ('Postgres pgKind)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> (Word64, PGTextArray)
-> Bool
-> TxET
QErr
IO
[(EventId, SchemaName, TableName, TriggerName, AltJ Value, Int,
UTCTime)]
forall (m :: * -> *) a r e.
(MonadIO m, FromRow a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m [a]
Q.listQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.event_log
SET locked = NOW()
WHERE id IN ( SELECT l.id
FROM hdb_catalog.event_log l
WHERE l.delivered = 'f' and l.error = 'f'
and (l.locked IS NULL or l.locked < (NOW() - interval '30 minute'))
and (l.next_retry_at is NULL or l.next_retry_at <= now())
and l.archived = 'f'
and l.trigger_name = ANY($2)
/* NB: this ordering is important for our index `event_log_fetch_events` */
/* (see `init_pg_source.sql`) */
ORDER BY locked NULLS FIRST, next_retry_at NULLS FIRST, created_at
LIMIT $1
FOR UPDATE SKIP LOCKED )
RETURNING id, schema_name, table_name, trigger_name, payload::json, tries, created_at
|]
(Word64
limit, PGTextArray
triggerNamesTxt)
Bool
True
where
uncurryEvent :: (EventId, SchemaName, TableName, TriggerName, AltJ Value, Int,
UTCTime)
-> Event ('Postgres pgKind)
uncurryEvent (EventId
id', SchemaName
sourceName, TableName
tableName, TriggerName
triggerName, Q.AltJ Value
payload, Int
tries, UTCTime
created) =
Event :: forall (b :: BackendType).
EventId
-> SourceName
-> TableName b
-> TriggerMetadata
-> Value
-> Int
-> UTCTime
-> Event b
Event
{ eId :: EventId
eId = EventId
id',
eSource :: SourceName
eSource = SourceName
source,
eTable :: TableName ('Postgres pgKind)
eTable = SchemaName -> TableName -> QualifiedTable
forall a. SchemaName -> a -> QualifiedObject a
QualifiedObject SchemaName
sourceName TableName
tableName,
eTrigger :: TriggerMetadata
eTrigger = TriggerName -> TriggerMetadata
TriggerMetadata TriggerName
triggerName,
eEvent :: Value
eEvent = Value
payload,
eTries :: Int
eTries = Int
tries,
eCreatedAt :: UTCTime
eCreatedAt = UTCTime
created
}
limit :: Word64
limit = Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
fetchBatchSize :: Word64
triggerNamesTxt :: PGTextArray
triggerNamesTxt = [Text] -> PGTextArray
PGTextArray ([Text] -> PGTextArray) -> [Text] -> PGTextArray
forall a b. (a -> b) -> a -> b
$ TriggerName -> Text
triggerNameToTxt (TriggerName -> Text) -> [TriggerName] -> [Text]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [TriggerName]
triggerNames
fetchEventsMaintenanceMode :: SourceName -> [TriggerName] -> FetchBatchSize -> MaintenanceModeVersion -> Q.TxE QErr [Event ('Postgres pgKind)]
fetchEventsMaintenanceMode :: SourceName
-> [TriggerName]
-> FetchBatchSize
-> MaintenanceModeVersion
-> TxE QErr [Event ('Postgres pgKind)]
fetchEventsMaintenanceMode SourceName
sourceName [TriggerName]
triggerNames FetchBatchSize
fetchBatchSize = \case
MaintenanceModeVersion
PreviousMMVersion ->
((EventId, SchemaName, TableName, TriggerName, AltJ Value, Int,
UTCTime)
-> Event ('Postgres pgKind))
-> [(EventId, SchemaName, TableName, TriggerName, AltJ Value, Int,
UTCTime)]
-> [Event ('Postgres pgKind)]
forall a b. (a -> b) -> [a] -> [b]
map (EventId, SchemaName, TableName, TriggerName, AltJ Value, Int,
UTCTime)
-> Event ('Postgres pgKind)
forall (b :: BackendType) a.
(TableName b ~ QualifiedObject a) =>
(EventId, SchemaName, a, TriggerName, AltJ Value, Int, UTCTime)
-> Event b
uncurryEvent
([(EventId, SchemaName, TableName, TriggerName, AltJ Value, Int,
UTCTime)]
-> [Event ('Postgres pgKind)])
-> TxET
QErr
IO
[(EventId, SchemaName, TableName, TriggerName, AltJ Value, Int,
UTCTime)]
-> TxE QErr [Event ('Postgres pgKind)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> Identity Word64
-> Bool
-> TxET
QErr
IO
[(EventId, SchemaName, TableName, TriggerName, AltJ Value, Int,
UTCTime)]
forall (m :: * -> *) a r e.
(MonadIO m, FromRow a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m [a]
Q.listQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.event_log
SET locked = 't'
WHERE id IN ( SELECT l.id
FROM hdb_catalog.event_log l
WHERE l.delivered = 'f' and l.error = 'f' and l.locked = 'f'
and (l.next_retry_at is NULL or l.next_retry_at <= now())
and l.archived = 'f'
ORDER BY created_at
LIMIT $1
FOR UPDATE SKIP LOCKED )
RETURNING id, schema_name, table_name, trigger_name, payload::json, tries, created_at
|]
(Word64 -> Identity Word64
forall a. a -> Identity a
Identity Word64
limit)
Bool
True
where
uncurryEvent :: (EventId, SchemaName, a, TriggerName, AltJ Value, Int, UTCTime)
-> Event b
uncurryEvent (EventId
id', SchemaName
sn, a
tn, TriggerName
trn, Q.AltJ Value
payload, Int
tries, UTCTime
created) =
Event :: forall (b :: BackendType).
EventId
-> SourceName
-> TableName b
-> TriggerMetadata
-> Value
-> Int
-> UTCTime
-> Event b
Event
{ eId :: EventId
eId = EventId
id',
eSource :: SourceName
eSource = SourceName
SNDefault,
eTable :: TableName b
eTable = SchemaName -> a -> QualifiedObject a
forall a. SchemaName -> a -> QualifiedObject a
QualifiedObject SchemaName
sn a
tn,
eTrigger :: TriggerMetadata
eTrigger = TriggerName -> TriggerMetadata
TriggerMetadata TriggerName
trn,
eEvent :: Value
eEvent = Value
payload,
eTries :: Int
eTries = Int
tries,
eCreatedAt :: UTCTime
eCreatedAt = UTCTime
created
}
limit :: Word64
limit = Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (FetchBatchSize -> Int
_unFetchBatchSize FetchBatchSize
fetchBatchSize) :: Word64
MaintenanceModeVersion
CurrentMMVersion -> SourceName
-> [TriggerName]
-> FetchBatchSize
-> TxE QErr [Event ('Postgres pgKind)]
forall (pgKind :: PostgresKind).
SourceName
-> [TriggerName]
-> FetchBatchSize
-> TxE QErr [Event ('Postgres pgKind)]
fetchEvents SourceName
sourceName [TriggerName]
triggerNames FetchBatchSize
fetchBatchSize
setSuccessTx :: Event ('Postgres pgKind) -> MaintenanceMode MaintenanceModeVersion -> Q.TxE QErr ()
setSuccessTx :: Event ('Postgres pgKind)
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
setSuccessTx Event ('Postgres pgKind)
e = \case
(MaintenanceModeEnabled MaintenanceModeVersion
PreviousMMVersion) ->
(PGTxErr -> QErr)
-> Query -> Identity EventId -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.event_log
SET delivered = 't', next_retry_at = NULL, locked = 'f'
WHERE id = $1
|]
(EventId -> Identity EventId
forall a. a -> Identity a
Identity (EventId -> Identity EventId) -> EventId -> Identity EventId
forall a b. (a -> b) -> a -> b
$ Event ('Postgres pgKind) -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event ('Postgres pgKind)
e)
Bool
True
(MaintenanceModeEnabled MaintenanceModeVersion
CurrentMMVersion) -> TxET QErr IO ()
latestVersionSetSuccess
MaintenanceMode MaintenanceModeVersion
MaintenanceModeDisabled -> TxET QErr IO ()
latestVersionSetSuccess
where
latestVersionSetSuccess :: TxET QErr IO ()
latestVersionSetSuccess =
(PGTxErr -> QErr)
-> Query -> Identity EventId -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.event_log
SET delivered = 't', next_retry_at = NULL, locked = NULL
WHERE id = $1
|]
(EventId -> Identity EventId
forall a. a -> Identity a
Identity (EventId -> Identity EventId) -> EventId -> Identity EventId
forall a b. (a -> b) -> a -> b
$ Event ('Postgres pgKind) -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event ('Postgres pgKind)
e)
Bool
True
setErrorTx :: Event ('Postgres pgKind) -> MaintenanceMode MaintenanceModeVersion -> Q.TxE QErr ()
setErrorTx :: Event ('Postgres pgKind)
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
setErrorTx Event ('Postgres pgKind)
e = \case
(MaintenanceModeEnabled MaintenanceModeVersion
PreviousMMVersion) ->
(PGTxErr -> QErr)
-> Query -> Identity EventId -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.event_log
SET error = 't', next_retry_at = NULL, locked = 'f'
WHERE id = $1
|]
(EventId -> Identity EventId
forall a. a -> Identity a
Identity (EventId -> Identity EventId) -> EventId -> Identity EventId
forall a b. (a -> b) -> a -> b
$ Event ('Postgres pgKind) -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event ('Postgres pgKind)
e)
Bool
True
(MaintenanceModeEnabled MaintenanceModeVersion
CurrentMMVersion) -> TxET QErr IO ()
latestVersionSetError
MaintenanceMode MaintenanceModeVersion
MaintenanceModeDisabled -> TxET QErr IO ()
latestVersionSetError
where
latestVersionSetError :: TxET QErr IO ()
latestVersionSetError =
(PGTxErr -> QErr)
-> Query -> Identity EventId -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.event_log
SET error = 't', next_retry_at = NULL, locked = NULL
WHERE id = $1
|]
(EventId -> Identity EventId
forall a. a -> Identity a
Identity (EventId -> Identity EventId) -> EventId -> Identity EventId
forall a b. (a -> b) -> a -> b
$ Event ('Postgres pgKind) -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event ('Postgres pgKind)
e)
Bool
True
setRetryTx :: Event ('Postgres pgKind) -> Time.UTCTime -> MaintenanceMode MaintenanceModeVersion -> Q.TxE QErr ()
setRetryTx :: Event ('Postgres pgKind)
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> TxET QErr IO ()
setRetryTx Event ('Postgres pgKind)
e UTCTime
time = \case
(MaintenanceModeEnabled MaintenanceModeVersion
PreviousMMVersion) ->
(PGTxErr -> QErr)
-> Query -> (UTCTime, EventId) -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.event_log
SET next_retry_at = $1, locked = 'f'
WHERE id = $2
|]
(UTCTime
time, Event ('Postgres pgKind) -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event ('Postgres pgKind)
e)
Bool
True
(MaintenanceModeEnabled MaintenanceModeVersion
CurrentMMVersion) -> TxET QErr IO ()
latestVersionSetRetry
MaintenanceMode MaintenanceModeVersion
MaintenanceModeDisabled -> TxET QErr IO ()
latestVersionSetRetry
where
latestVersionSetRetry :: TxET QErr IO ()
latestVersionSetRetry =
(PGTxErr -> QErr)
-> Query -> (UTCTime, EventId) -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.event_log
SET next_retry_at = $1, locked = NULL
WHERE id = $2
|]
(UTCTime
time, Event ('Postgres pgKind) -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event ('Postgres pgKind)
e)
Bool
True
dropTriggerQ :: TriggerName -> Q.TxE QErr ()
dropTriggerQ :: TriggerName -> TxET QErr IO ()
dropTriggerQ TriggerName
trn =
(Ops -> TxET QErr IO ()) -> [Ops] -> TxET QErr IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (TriggerName -> Ops -> TxET QErr IO ()
dropTriggerOp TriggerName
trn) [Ops
INSERT, Ops
UPDATE, Ops
DELETE]
dropTriggerOp :: TriggerName -> Ops -> Q.TxE QErr ()
dropTriggerOp :: TriggerName -> Ops -> TxET QErr IO ()
dropTriggerOp TriggerName
triggerName Ops
triggerOp =
(PGTxErr -> QErr) -> Query -> () -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
(Text -> Query
Q.fromText (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ Ops -> Text
getDropFuncSql Ops
triggerOp)
()
Bool
False
where
getDropFuncSql :: Ops -> Text
getDropFuncSql :: Ops -> Text
getDropFuncSql Ops
op =
Text
"DROP FUNCTION IF EXISTS"
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" hdb_catalog."
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> QualifiedTriggerName -> Text
unQualifiedTriggerName (Ops -> TriggerName -> QualifiedTriggerName
pgIdenTrigger Ops
op TriggerName
triggerName)
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"()"
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" CASCADE"
checkEvent :: EventId -> Q.TxE QErr ()
checkEvent :: EventId -> TxET QErr IO ()
checkEvent EventId
eid = do
[Identity Bool]
events <-
(PGTxErr -> QErr)
-> Query
-> Identity EventId
-> Bool
-> TxET QErr IO [Identity Bool]
forall (m :: * -> *) a r e.
(MonadIO m, FromRow a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m [a]
Q.listQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
SELECT l.locked IS NOT NULL AND l.locked >= (NOW() - interval '30 minute')
FROM hdb_catalog.event_log l
WHERE l.id = $1
|]
(EventId -> Identity EventId
forall a. a -> Identity a
Identity EventId
eid)
Bool
True
Identity Bool
event <- [Identity Bool] -> TxET QErr IO (Identity Bool)
forall (m :: * -> *) a. MonadError QErr m => [a] -> m a
getEvent [Identity Bool]
events
Identity Bool -> TxET QErr IO ()
forall (f :: * -> *). MonadError QErr f => Identity Bool -> f ()
assertEventUnlocked Identity Bool
event
where
getEvent :: [a] -> m a
getEvent [] = Code -> Text -> m a
forall (m :: * -> *) a. QErrM m => Code -> Text -> m a
throw400 Code
NotExists Text
"event not found"
getEvent (a
x : [a]
_) = a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x
assertEventUnlocked :: Identity Bool -> f ()
assertEventUnlocked (Identity Bool
locked) =
Bool -> f () -> f ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
locked (f () -> f ()) -> f () -> f ()
forall a b. (a -> b) -> a -> b
$
Code -> Text -> f ()
forall (m :: * -> *) a. QErrM m => Code -> Text -> m a
throw400 Code
Busy Text
"event is already being processed"
markForDelivery :: EventId -> Q.TxE QErr ()
markForDelivery :: EventId -> TxET QErr IO ()
markForDelivery EventId
eid =
(PGTxErr -> QErr)
-> Query -> Identity EventId -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.event_log
SET
delivered = 'f',
error = 'f',
tries = 0
WHERE id = $1
|]
(EventId -> Identity EventId
forall a. a -> Identity a
Identity EventId
eid)
Bool
True
redeliverEventTx :: EventId -> Q.TxE QErr ()
redeliverEventTx :: EventId -> TxET QErr IO ()
redeliverEventTx EventId
eventId = do
EventId -> TxET QErr IO ()
checkEvent EventId
eventId
EventId -> TxET QErr IO ()
markForDelivery EventId
eventId
unlockEventsTx :: [EventId] -> Q.TxE QErr Int
unlockEventsTx :: [EventId] -> TxET QErr IO Int
unlockEventsTx [EventId]
eventIds =
Identity Int -> Int
forall a. Identity a -> a
runIdentity (Identity Int -> Int)
-> (SingleRow (Identity Int) -> Identity Int)
-> SingleRow (Identity Int)
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity Int) -> Identity Int
forall a. SingleRow a -> a
Q.getRow
(SingleRow (Identity Int) -> Int)
-> TxET QErr IO (SingleRow (Identity Int)) -> TxET QErr IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> Identity PGTextArray
-> Bool
-> TxET QErr IO (SingleRow (Identity Int))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
Q.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
WITH "cte" AS
(UPDATE hdb_catalog.event_log
SET locked = NULL
WHERE id = ANY($1::text[])
-- only unlock those events that have been locked, it's possible
-- that an event has been processed but not yet been removed from
-- the saved locked events, which will lead to a double send
AND locked IS NOT NULL
RETURNING *)
SELECT count(*) FROM "cte"
|]
(PGTextArray -> Identity PGTextArray
forall a. a -> Identity a
Identity (PGTextArray -> Identity PGTextArray)
-> PGTextArray -> Identity PGTextArray
forall a b. (a -> b) -> a -> b
$ [Text] -> PGTextArray
PGTextArray ([Text] -> PGTextArray) -> [Text] -> PGTextArray
forall a b. (a -> b) -> a -> b
$ (EventId -> Text) -> [EventId] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map EventId -> Text
unEventId [EventId]
eventIds)
Bool
True
newtype QualifiedTriggerName = QualifiedTriggerName {QualifiedTriggerName -> Text
unQualifiedTriggerName :: Text}
deriving (Int -> QualifiedTriggerName -> ShowS
[QualifiedTriggerName] -> ShowS
QualifiedTriggerName -> String
(Int -> QualifiedTriggerName -> ShowS)
-> (QualifiedTriggerName -> String)
-> ([QualifiedTriggerName] -> ShowS)
-> Show QualifiedTriggerName
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [QualifiedTriggerName] -> ShowS
$cshowList :: [QualifiedTriggerName] -> ShowS
show :: QualifiedTriggerName -> String
$cshow :: QualifiedTriggerName -> String
showsPrec :: Int -> QualifiedTriggerName -> ShowS
$cshowsPrec :: Int -> QualifiedTriggerName -> ShowS
Show, QualifiedTriggerName -> QualifiedTriggerName -> Bool
(QualifiedTriggerName -> QualifiedTriggerName -> Bool)
-> (QualifiedTriggerName -> QualifiedTriggerName -> Bool)
-> Eq QualifiedTriggerName
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: QualifiedTriggerName -> QualifiedTriggerName -> Bool
$c/= :: QualifiedTriggerName -> QualifiedTriggerName -> Bool
== :: QualifiedTriggerName -> QualifiedTriggerName -> Bool
$c== :: QualifiedTriggerName -> QualifiedTriggerName -> Bool
Eq, QualifiedTriggerName -> PrepArg
(QualifiedTriggerName -> PrepArg) -> ToPrepArg QualifiedTriggerName
forall a. (a -> PrepArg) -> ToPrepArg a
toPrepVal :: QualifiedTriggerName -> PrepArg
$ctoPrepVal :: QualifiedTriggerName -> PrepArg
Q.ToPrepArg)
pgTriggerName :: Ops -> TriggerName -> QualifiedTriggerName
pgTriggerName :: Ops -> TriggerName -> QualifiedTriggerName
pgTriggerName Ops
op TriggerName
trn = Ops -> Text -> QualifiedTriggerName
forall a. Show a => a -> Text -> QualifiedTriggerName
qualifyTriggerName Ops
op (Text -> QualifiedTriggerName) -> Text -> QualifiedTriggerName
forall a b. (a -> b) -> a -> b
$ TriggerName -> Text
triggerNameToTxt TriggerName
trn
where
qualifyTriggerName :: a -> Text -> QualifiedTriggerName
qualifyTriggerName a
op' Text
trn' =
Text -> QualifiedTriggerName
QualifiedTriggerName (Text -> QualifiedTriggerName) -> Text -> QualifiedTriggerName
forall a b. (a -> b) -> a -> b
$ Text
"notify_hasura_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trn' Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> a -> Text
forall a. Show a => a -> Text
tshow a
op'
pgIdenTrigger :: Ops -> TriggerName -> QualifiedTriggerName
pgIdenTrigger :: Ops -> TriggerName -> QualifiedTriggerName
pgIdenTrigger Ops
op = Text -> QualifiedTriggerName
QualifiedTriggerName (Text -> QualifiedTriggerName)
-> (TriggerName -> Text) -> TriggerName -> QualifiedTriggerName
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text
pgFmtIdentifier (Text -> Text) -> (TriggerName -> Text) -> TriggerName -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. QualifiedTriggerName -> Text
unQualifiedTriggerName (QualifiedTriggerName -> Text)
-> (TriggerName -> QualifiedTriggerName) -> TriggerName -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ops -> TriggerName -> QualifiedTriggerName
pgTriggerName Ops
op
mkTriggerFunctionQ ::
forall pgKind m.
(Backend ('Postgres pgKind), MonadTx m, MonadReader ServerConfigCtx m) =>
TriggerName ->
QualifiedTable ->
[ColumnInfo ('Postgres pgKind)] ->
Ops ->
SubscribeOpSpec ('Postgres pgKind) ->
m QualifiedTriggerName
mkTriggerFunctionQ :: TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m QualifiedTriggerName
mkTriggerFunctionQ TriggerName
triggerName (QualifiedObject SchemaName
schema TableName
table) [ColumnInfo ('Postgres pgKind)]
allCols Ops
op (SubscribeOpSpec SubscribeColumns ('Postgres pgKind)
listenColumns Maybe (SubscribeColumns ('Postgres pgKind))
deliveryColumns') = do
StringifyNumbers
strfyNum <- SQLGenCtx -> StringifyNumbers
stringifyNum (SQLGenCtx -> StringifyNumbers)
-> (ServerConfigCtx -> SQLGenCtx)
-> ServerConfigCtx
-> StringifyNumbers
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ServerConfigCtx -> SQLGenCtx
_sccSQLGenCtx (ServerConfigCtx -> StringifyNumbers)
-> m ServerConfigCtx -> m StringifyNumbers
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m ServerConfigCtx
forall r (m :: * -> *). MonadReader r m => m r
ask
let dbQualifiedTriggerName :: QualifiedTriggerName
dbQualifiedTriggerName = Ops -> TriggerName -> QualifiedTriggerName
pgIdenTrigger Ops
op TriggerName
triggerName
() <-
TxET QErr IO () -> m ()
forall (m :: * -> *) a. MonadTx m => TxE QErr a -> m a
liftTx (TxET QErr IO () -> m ()) -> TxET QErr IO () -> m ()
forall a b. (a -> b) -> a -> b
$
(PGTxErr -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) a e.
(MonadIO m, FromRes a) =>
(PGTxErr -> e) -> Query -> TxET e m a
Q.multiQE PGTxErr -> QErr
defaultTxErrorHandler (Query -> TxET QErr IO ()) -> Query -> TxET QErr IO ()
forall a b. (a -> b) -> a -> b
$
Text -> Query
Q.fromText (Text -> Query) -> (Text -> Text) -> Text -> Query
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text
TL.toStrict (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$
let
deliveryColumns :: SubscribeColumns ('Postgres pgKind)
deliveryColumns = SubscribeColumns ('Postgres pgKind)
-> Maybe (SubscribeColumns ('Postgres pgKind))
-> SubscribeColumns ('Postgres pgKind)
forall a. a -> Maybe a -> a
fromMaybe SubscribeColumns ('Postgres pgKind)
forall (b :: BackendType). SubscribeColumns b
SubCStar Maybe (SubscribeColumns ('Postgres pgKind))
deliveryColumns'
getApplicableColumns :: SubscribeColumns ('Postgres pgKind)
-> [ColumnInfo ('Postgres pgKind)]
getApplicableColumns = \case
SubscribeColumns ('Postgres pgKind)
SubCStar -> [ColumnInfo ('Postgres pgKind)]
allCols
SubCArray [Column ('Postgres pgKind)]
cols -> [Column ('Postgres pgKind)]
-> [ColumnInfo ('Postgres pgKind)]
-> [ColumnInfo ('Postgres pgKind)]
forall (b :: BackendType).
Backend b =>
[Column b] -> [ColumnInfo b] -> [ColumnInfo b]
getColInfos [Column ('Postgres pgKind)]
cols [ColumnInfo ('Postgres pgKind)]
allCols
applicableDeliveryCols :: [ColumnInfo ('Postgres pgKind)]
applicableDeliveryCols = SubscribeColumns ('Postgres pgKind)
-> [ColumnInfo ('Postgres pgKind)]
getApplicableColumns SubscribeColumns ('Postgres pgKind)
deliveryColumns
getRowExpression :: OpVar -> SQLExp
getRowExpression OpVar
opVar = SQLExp -> SQLExp
applyRowToJson' (SQLExp -> SQLExp) -> SQLExp -> SQLExp
forall a b. (a -> b) -> a -> b
$ OpVar
-> StringifyNumbers -> [ColumnInfo ('Postgres pgKind)] -> SQLExp
mkRowExpression OpVar
opVar StringifyNumbers
strfyNum [ColumnInfo ('Postgres pgKind)]
applicableDeliveryCols
applicableListenCols :: [ColumnInfo ('Postgres pgKind)]
applicableListenCols = SubscribeColumns ('Postgres pgKind)
-> [ColumnInfo ('Postgres pgKind)]
getApplicableColumns SubscribeColumns ('Postgres pgKind)
listenColumns
renderRow :: OpVar -> SQLExp
renderRow OpVar
opVar = SQLExp -> SQLExp
applyRow (SQLExp -> SQLExp) -> SQLExp -> SQLExp
forall a b. (a -> b) -> a -> b
$ OpVar
-> StringifyNumbers -> [ColumnInfo ('Postgres pgKind)] -> SQLExp
mkRowExpression OpVar
opVar StringifyNumbers
strfyNum [ColumnInfo ('Postgres pgKind)]
applicableListenCols
oldDataExp :: SQLExp
oldDataExp = case Ops
op of
Ops
INSERT -> SQLExp
SENull
Ops
UPDATE -> OpVar -> SQLExp
getRowExpression OpVar
OLD
Ops
DELETE -> OpVar -> SQLExp
getRowExpression OpVar
OLD
Ops
MANUAL -> SQLExp
SENull
newDataExp :: SQLExp
newDataExp = case Ops
op of
Ops
INSERT -> OpVar -> SQLExp
getRowExpression OpVar
NEW
Ops
UPDATE -> OpVar -> SQLExp
getRowExpression OpVar
NEW
Ops
DELETE -> SQLExp
SENull
Ops
MANUAL -> SQLExp
SENull
name :: Text
name = TriggerName -> Text
triggerNameToTxt TriggerName
triggerName
qualifiedTriggerName :: Text
qualifiedTriggerName = QualifiedTriggerName -> Text
unQualifiedTriggerName QualifiedTriggerName
dbQualifiedTriggerName
schemaName :: Text
schemaName = Text -> Text
pgFmtLit (Text -> Text) -> Text -> Text
forall a b. (a -> b) -> a -> b
$ SchemaName -> Text
getSchemaTxt SchemaName
schema
tableName :: Text
tableName = Text -> Text
pgFmtLit (Text -> Text) -> Text -> Text
forall a b. (a -> b) -> a -> b
$ TableName -> Text
getTableTxt TableName
table
oldRow :: Text
oldRow = SQLExp -> Text
forall a. ToSQL a => a -> Text
toSQLTxt (SQLExp -> Text) -> SQLExp -> Text
forall a b. (a -> b) -> a -> b
$ OpVar -> SQLExp
renderRow OpVar
OLD
newRow :: Text
newRow = SQLExp -> Text
forall a. ToSQL a => a -> Text
toSQLTxt (SQLExp -> Text) -> SQLExp -> Text
forall a b. (a -> b) -> a -> b
$ OpVar -> SQLExp
renderRow OpVar
NEW
oldPayloadExpression :: Text
oldPayloadExpression = SQLExp -> Text
forall a. ToSQL a => a -> Text
toSQLTxt SQLExp
oldDataExp
newPayloadExpression :: Text
newPayloadExpression = SQLExp -> Text
forall a. ToSQL a => a -> Text
toSQLTxt SQLExp
newDataExp
in $(makeRelativeToProject "src-rsr/trigger.sql.shakespeare" >>= ST.stextFile)
QualifiedTriggerName -> m QualifiedTriggerName
forall (f :: * -> *) a. Applicative f => a -> f a
pure QualifiedTriggerName
dbQualifiedTriggerName
where
applyRowToJson' :: SQLExp -> SQLExp
applyRowToJson' SQLExp
e = Text -> [SQLExp] -> Maybe OrderByExp -> SQLExp
SEFnApp Text
"row_to_json" [SQLExp
e] Maybe OrderByExp
forall a. Maybe a
Nothing
applyRow :: SQLExp -> SQLExp
applyRow SQLExp
e = Text -> [SQLExp] -> Maybe OrderByExp -> SQLExp
SEFnApp Text
"row" [SQLExp
e] Maybe OrderByExp
forall a. Maybe a
Nothing
opToQual :: OpVar -> Qual
opToQual = Text -> Qual
QualVar (Text -> Qual) -> (OpVar -> Text) -> OpVar -> Qual
forall b c a. (b -> c) -> (a -> b) -> a -> c
. OpVar -> Text
forall a. Show a => a -> Text
tshow
mkRowExpression :: OpVar
-> StringifyNumbers -> [ColumnInfo ('Postgres pgKind)] -> SQLExp
mkRowExpression OpVar
opVar StringifyNumbers
strfyNum [ColumnInfo ('Postgres pgKind)]
columns =
[Extractor] -> SQLExp
mkRowExp ([Extractor] -> SQLExp) -> [Extractor] -> SQLExp
forall a b. (a -> b) -> a -> b
$ (ColumnInfo ('Postgres pgKind) -> Extractor)
-> [ColumnInfo ('Postgres pgKind)] -> [Extractor]
forall a b. (a -> b) -> [a] -> [b]
map (\ColumnInfo ('Postgres pgKind)
col -> SQLExp -> ColumnInfo ('Postgres pgKind) -> Extractor
forall (b :: BackendType).
(Column b ~ PGCol, ScalarType b ~ PGScalarType) =>
SQLExp -> ColumnInfo b -> Extractor
toExtractor (OpVar
-> StringifyNumbers -> ColumnInfo ('Postgres pgKind) -> SQLExp
mkQId OpVar
opVar StringifyNumbers
strfyNum ColumnInfo ('Postgres pgKind)
col) ColumnInfo ('Postgres pgKind)
col) [ColumnInfo ('Postgres pgKind)]
columns
mkQId :: OpVar
-> StringifyNumbers -> ColumnInfo ('Postgres pgKind) -> SQLExp
mkQId OpVar
opVar StringifyNumbers
strfyNum ColumnInfo ('Postgres pgKind)
colInfo =
StringifyNumbers
-> ColumnType ('Postgres pgKind)
-> Bool
-> Maybe NamingCase
-> SQLExp
-> SQLExp
forall (pgKind :: PostgresKind).
StringifyNumbers
-> ColumnType ('Postgres pgKind)
-> Bool
-> Maybe NamingCase
-> SQLExp
-> SQLExp
toJSONableExp StringifyNumbers
strfyNum (ColumnInfo ('Postgres pgKind) -> ColumnType ('Postgres pgKind)
forall (b :: BackendType). ColumnInfo b -> ColumnType b
ciType ColumnInfo ('Postgres pgKind)
colInfo) Bool
False Maybe NamingCase
forall a. Maybe a
Nothing (SQLExp -> SQLExp) -> SQLExp -> SQLExp
forall a b. (a -> b) -> a -> b
$
QIdentifier -> SQLExp
SEQIdentifier (QIdentifier -> SQLExp) -> QIdentifier -> SQLExp
forall a b. (a -> b) -> a -> b
$ Qual -> Identifier -> QIdentifier
QIdentifier (OpVar -> Qual
opToQual OpVar
opVar) (Identifier -> QIdentifier) -> Identifier -> QIdentifier
forall a b. (a -> b) -> a -> b
$ PGCol -> Identifier
forall a. IsIdentifier a => a -> Identifier
toIdentifier (PGCol -> Identifier) -> PGCol -> Identifier
forall a b. (a -> b) -> a -> b
$ ColumnInfo ('Postgres pgKind) -> Column ('Postgres pgKind)
forall (b :: BackendType). ColumnInfo b -> Column b
ciColumn ColumnInfo ('Postgres pgKind)
colInfo
toExtractor :: SQLExp -> ColumnInfo b -> Extractor
toExtractor SQLExp
sqlExp ColumnInfo b
column
| (ScalarType b -> Bool) -> ColumnType b -> Bool
forall (b :: BackendType).
(ScalarType b -> Bool) -> ColumnType b -> Bool
isScalarColumnWhere ScalarType b -> Bool
PGScalarType -> Bool
isGeoType (ColumnInfo b -> ColumnType b
forall (b :: BackendType). ColumnInfo b -> ColumnType b
ciType ColumnInfo b
column) = SQLExp -> Maybe ColumnAlias -> Extractor
Extractor SQLExp
sqlExp (ColumnAlias -> Maybe ColumnAlias
forall a. a -> Maybe a
Just (ColumnAlias -> Maybe ColumnAlias)
-> ColumnAlias -> Maybe ColumnAlias
forall a b. (a -> b) -> a -> b
$ ColumnInfo b -> ColumnAlias
forall (b :: BackendType).
(Column b ~ PGCol) =>
ColumnInfo b -> ColumnAlias
getAlias ColumnInfo b
column)
| Bool
otherwise = SQLExp -> Maybe ColumnAlias -> Extractor
Extractor SQLExp
sqlExp Maybe ColumnAlias
forall a. Maybe a
Nothing
getAlias :: ColumnInfo b -> ColumnAlias
getAlias ColumnInfo b
col = Identifier -> ColumnAlias
forall a. IsIdentifier a => a -> ColumnAlias
toColumnAlias (Identifier -> ColumnAlias) -> Identifier -> ColumnAlias
forall a b. (a -> b) -> a -> b
$ Text -> Identifier
Identifier (Text -> Identifier) -> Text -> Identifier
forall a b. (a -> b) -> a -> b
$ PGCol -> Text
getPGColTxt (ColumnInfo b -> Column b
forall (b :: BackendType). ColumnInfo b -> Column b
ciColumn ColumnInfo b
col)
checkIfTriggerExistsForTableQ ::
QualifiedTriggerName ->
QualifiedTable ->
Q.TxE QErr Bool
checkIfTriggerExistsForTableQ :: QualifiedTriggerName -> QualifiedTable -> TxET QErr IO Bool
checkIfTriggerExistsForTableQ (QualifiedTriggerName Text
triggerName) (QualifiedObject SchemaName
schemaName TableName
tableName) =
(SingleRow (Identity Bool) -> Bool)
-> TxET QErr IO (SingleRow (Identity Bool)) -> TxET QErr IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Identity Bool -> Bool
forall a. Identity a -> a
runIdentity (Identity Bool -> Bool)
-> (SingleRow (Identity Bool) -> Identity Bool)
-> SingleRow (Identity Bool)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity Bool) -> Identity Bool
forall a. SingleRow a -> a
Q.getRow) (TxET QErr IO (SingleRow (Identity Bool)) -> TxET QErr IO Bool)
-> TxET QErr IO (SingleRow (Identity Bool)) -> TxET QErr IO Bool
forall a b. (a -> b) -> a -> b
$
(PGTxErr -> QErr)
-> Query
-> (Text, SchemaName, TableName)
-> Bool
-> TxET QErr IO (SingleRow (Identity Bool))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
Q.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
SELECT EXISTS (
SELECT 1
FROM pg_trigger
WHERE NOT tgisinternal
AND tgname = $1 AND tgrelid = (quote_ident($2) || '.' || quote_ident($3))::regclass
)
|]
(Text
triggerName, SchemaName
schemaName, TableName
tableName)
Bool
True
checkIfFunctionExistsQ ::
TriggerName ->
Ops ->
Q.TxE QErr Bool
checkIfFunctionExistsQ :: TriggerName -> Ops -> TxET QErr IO Bool
checkIfFunctionExistsQ TriggerName
triggerName Ops
op = do
let qualifiedTriggerName :: QualifiedTriggerName
qualifiedTriggerName = Ops -> TriggerName -> QualifiedTriggerName
pgTriggerName Ops
op TriggerName
triggerName
(SingleRow (Identity Bool) -> Bool)
-> TxET QErr IO (SingleRow (Identity Bool)) -> TxET QErr IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Identity Bool -> Bool
forall a. Identity a -> a
runIdentity (Identity Bool -> Bool)
-> (SingleRow (Identity Bool) -> Identity Bool)
-> SingleRow (Identity Bool)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity Bool) -> Identity Bool
forall a. SingleRow a -> a
Q.getRow) (TxET QErr IO (SingleRow (Identity Bool)) -> TxET QErr IO Bool)
-> TxET QErr IO (SingleRow (Identity Bool)) -> TxET QErr IO Bool
forall a b. (a -> b) -> a -> b
$
(PGTxErr -> QErr)
-> Query
-> Identity QualifiedTriggerName
-> Bool
-> TxET QErr IO (SingleRow (Identity Bool))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
Q.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
SELECT EXISTS (
SELECT 1
FROM pg_catalog.pg_proc
JOIN pg_namespace ON pg_catalog.pg_proc.pronamespace = pg_namespace.oid
WHERE proname = $1
AND pg_namespace.nspname = 'hdb_catalog'
)
|]
(QualifiedTriggerName -> Identity QualifiedTriggerName
forall a. a -> Identity a
Identity QualifiedTriggerName
qualifiedTriggerName)
Bool
True
mkTrigger ::
forall pgKind m.
(Backend ('Postgres pgKind), MonadTx m, MonadReader ServerConfigCtx m) =>
TriggerName ->
QualifiedTable ->
[ColumnInfo ('Postgres pgKind)] ->
Ops ->
SubscribeOpSpec ('Postgres pgKind) ->
m ()
mkTrigger :: TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
mkTrigger TriggerName
triggerName QualifiedTable
table [ColumnInfo ('Postgres pgKind)]
allCols Ops
op SubscribeOpSpec ('Postgres pgKind)
subOpSpec = do
QualifiedTriggerName
dbTriggerName <- TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m QualifiedTriggerName
forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadTx m,
MonadReader ServerConfigCtx m) =>
TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m QualifiedTriggerName
mkTriggerFunctionQ TriggerName
triggerName QualifiedTable
table [ColumnInfo ('Postgres pgKind)]
allCols Ops
op SubscribeOpSpec ('Postgres pgKind)
subOpSpec
Bool
doesTriggerExist <- TxET QErr IO Bool -> m Bool
forall (m :: * -> *) a. MonadTx m => TxE QErr a -> m a
liftTx (TxET QErr IO Bool -> m Bool) -> TxET QErr IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ QualifiedTriggerName -> QualifiedTable -> TxET QErr IO Bool
checkIfTriggerExistsForTableQ (Ops -> TriggerName -> QualifiedTriggerName
pgTriggerName Ops
op TriggerName
triggerName) QualifiedTable
table
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
doesTriggerExist (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
let sqlQuery :: Query
sqlQuery =
Text -> Query
Q.fromText (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ QualifiedTriggerName -> Text -> Text -> Text
forall a a.
(ToText a, ToText a) =>
QualifiedTriggerName -> a -> a -> Text
createTriggerSQL QualifiedTriggerName
dbTriggerName (QualifiedTable -> Text
forall a. ToSQL a => a -> Text
toSQLTxt QualifiedTable
table) (Ops -> Text
forall a. Show a => a -> Text
tshow Ops
op)
in TxET QErr IO () -> m ()
forall (m :: * -> *) a. MonadTx m => TxE QErr a -> m a
liftTx (TxET QErr IO () -> m ()) -> TxET QErr IO () -> m ()
forall a b. (a -> b) -> a -> b
$ (PGTxErr -> QErr) -> Query -> () -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE PGTxErr -> QErr
defaultTxErrorHandler Query
sqlQuery () Bool
False
where
createTriggerSQL :: QualifiedTriggerName -> a -> a -> Text
createTriggerSQL (QualifiedTriggerName Text
triggerNameTxt) a
tableName a
opText =
[ST.st|
CREATE TRIGGER #{triggerNameTxt} AFTER #{opText} ON #{tableName} FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.#{triggerNameTxt}()
|]
mkAllTriggersQ ::
forall pgKind m.
(Backend ('Postgres pgKind), MonadTx m, MonadReader ServerConfigCtx m) =>
TriggerName ->
QualifiedTable ->
[ColumnInfo ('Postgres pgKind)] ->
TriggerOpsDef ('Postgres pgKind) ->
m ()
mkAllTriggersQ :: TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> TriggerOpsDef ('Postgres pgKind)
-> m ()
mkAllTriggersQ TriggerName
triggerName QualifiedTable
table [ColumnInfo ('Postgres pgKind)]
allCols TriggerOpsDef ('Postgres pgKind)
fullspec = do
Maybe (SubscribeOpSpec ('Postgres pgKind))
-> (SubscribeOpSpec ('Postgres pgKind) -> m ()) -> m ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
onJust (TriggerOpsDef ('Postgres pgKind)
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdInsert TriggerOpsDef ('Postgres pgKind)
fullspec) (TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadTx m,
MonadReader ServerConfigCtx m) =>
TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
mkTrigger TriggerName
triggerName QualifiedTable
table [ColumnInfo ('Postgres pgKind)]
allCols Ops
INSERT)
Maybe (SubscribeOpSpec ('Postgres pgKind))
-> (SubscribeOpSpec ('Postgres pgKind) -> m ()) -> m ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
onJust (TriggerOpsDef ('Postgres pgKind)
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdUpdate TriggerOpsDef ('Postgres pgKind)
fullspec) (TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadTx m,
MonadReader ServerConfigCtx m) =>
TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
mkTrigger TriggerName
triggerName QualifiedTable
table [ColumnInfo ('Postgres pgKind)]
allCols Ops
UPDATE)
Maybe (SubscribeOpSpec ('Postgres pgKind))
-> (SubscribeOpSpec ('Postgres pgKind) -> m ()) -> m ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
onJust (TriggerOpsDef ('Postgres pgKind)
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdDelete TriggerOpsDef ('Postgres pgKind)
fullspec) (TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadTx m,
MonadReader ServerConfigCtx m) =>
TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
mkTrigger TriggerName
triggerName QualifiedTable
table [ColumnInfo ('Postgres pgKind)]
allCols Ops
DELETE)