{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}
module Hasura.Backends.Postgres.DDL.EventTrigger
( insertManualEvent,
redeliverEvent,
dropTriggerAndArchiveEvents,
createTableEventTrigger,
createMissingSQLTriggers,
dropTriggerQ,
dropDanglingSQLTrigger,
mkAllTriggersQ,
getMaintenanceModeVersion,
fetchUndeliveredEvents,
setRetry,
recordSuccess,
recordError,
recordError',
unlockEventsInSource,
updateColumnInEventTrigger,
checkIfTriggerExists,
addCleanupSchedules,
deleteAllScheduledCleanups,
getCleanupEventsForDeletion,
updateCleanupEventStatusToDead,
updateCleanupEventStatusToPaused,
updateCleanupEventStatusToCompleted,
deleteEventTriggerLogs,
fetchEventLogs,
fetchEventInvocationLogs,
fetchEventById,
)
where
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson
import Data.FileEmbed (makeRelativeToProject)
import Data.HashMap.Strict qualified as HashMap
import Data.HashSet qualified as HashSet
import Data.Int (Int64)
import Data.Set.NonEmpty qualified as NE
import Data.Text.Lazy qualified as TL
import Data.Time (UTCTime)
import Data.Time.Clock qualified as Time
import Database.PG.Query qualified as PG
import Hasura.Backends.Postgres.Connection
import Hasura.Backends.Postgres.SQL.DML
import Hasura.Backends.Postgres.SQL.DML qualified as S
import Hasura.Backends.Postgres.SQL.Types hiding (TableName)
import Hasura.Backends.Postgres.Translate.Column
import Hasura.Base.Error
import Hasura.Eventing.Common
import Hasura.Prelude
import Hasura.RQL.Types.Backend (Backend, SourceConfig, TableName)
import Hasura.RQL.Types.BackendType
import Hasura.RQL.Types.Column
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.Eventing
import Hasura.RQL.Types.ScheduledTrigger (formatTime')
import Hasura.RQL.Types.Source
import Hasura.SQL.Types
import Hasura.Server.Migrate.Internal
import Hasura.Server.Migrate.LatestVersion
import Hasura.Server.Migrate.Version
import Hasura.Server.Types
import Hasura.Session
import Hasura.Table.Cache (PrimaryKey)
import Hasura.Tracing qualified as Tracing
import Text.Builder qualified as TB
import Text.Shakespeare.Text qualified as ST
fetchUndeliveredEvents ::
(MonadIO m, MonadError QErr m) =>
SourceConfig ('Postgres pgKind) ->
SourceName ->
[TriggerName] ->
MaintenanceMode () ->
FetchBatchSize ->
m [Event ('Postgres pgKind)]
fetchUndeliveredEvents :: forall (m :: * -> *) (pgKind :: PostgresKind).
(MonadIO m, MonadError QErr m) =>
SourceConfig ('Postgres pgKind)
-> SourceName
-> [TriggerName]
-> MaintenanceMode ()
-> FetchBatchSize
-> m [Event ('Postgres pgKind)]
fetchUndeliveredEvents SourceConfig ('Postgres pgKind)
sourceConfig SourceName
sourceName [TriggerName]
triggerNames MaintenanceMode ()
maintenanceMode FetchBatchSize
fetchBatchSize = do
Either QErr (TxE QErr [Event ('Postgres pgKind)])
fetchEventsTxE <-
case MaintenanceMode ()
maintenanceMode of
MaintenanceModeEnabled () -> do
Either QErr MaintenanceModeVersion
maintenanceModeVersion <- IO (Either QErr MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion))
-> IO (Either QErr MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion)
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> TxET QErr IO MaintenanceModeVersion
-> IO (Either QErr MaintenanceModeVersion)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceReadTx SourceConfig ('Postgres pgKind)
PGSourceConfig
sourceConfig TxET QErr IO MaintenanceModeVersion
getMaintenanceModeVersionTx
Either QErr (TxE QErr [Event ('Postgres pgKind)])
-> m (Either QErr (TxE QErr [Event ('Postgres pgKind)]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either QErr (TxE QErr [Event ('Postgres pgKind)])
-> m (Either QErr (TxE QErr [Event ('Postgres pgKind)])))
-> Either QErr (TxE QErr [Event ('Postgres pgKind)])
-> m (Either QErr (TxE QErr [Event ('Postgres pgKind)]))
forall a b. (a -> b) -> a -> b
$ (MaintenanceModeVersion -> TxE QErr [Event ('Postgres pgKind)])
-> Either QErr MaintenanceModeVersion
-> Either QErr (TxE QErr [Event ('Postgres pgKind)])
forall a b. (a -> b) -> Either QErr a -> Either QErr b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (SourceName
-> [TriggerName]
-> FetchBatchSize
-> MaintenanceModeVersion
-> TxE QErr [Event ('Postgres pgKind)]
forall (pgKind :: PostgresKind).
SourceName
-> [TriggerName]
-> FetchBatchSize
-> MaintenanceModeVersion
-> TxE QErr [Event ('Postgres pgKind)]
fetchEventsMaintenanceMode SourceName
sourceName [TriggerName]
triggerNames FetchBatchSize
fetchBatchSize) Either QErr MaintenanceModeVersion
maintenanceModeVersion
MaintenanceMode ()
MaintenanceModeDisabled -> Either QErr (TxE QErr [Event ('Postgres pgKind)])
-> m (Either QErr (TxE QErr [Event ('Postgres pgKind)]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either QErr (TxE QErr [Event ('Postgres pgKind)])
-> m (Either QErr (TxE QErr [Event ('Postgres pgKind)])))
-> Either QErr (TxE QErr [Event ('Postgres pgKind)])
-> m (Either QErr (TxE QErr [Event ('Postgres pgKind)]))
forall a b. (a -> b) -> a -> b
$ TxE QErr [Event ('Postgres pgKind)]
-> Either QErr (TxE QErr [Event ('Postgres pgKind)])
forall a b. b -> Either a b
Right (TxE QErr [Event ('Postgres pgKind)]
-> Either QErr (TxE QErr [Event ('Postgres pgKind)]))
-> TxE QErr [Event ('Postgres pgKind)]
-> Either QErr (TxE QErr [Event ('Postgres pgKind)])
forall a b. (a -> b) -> a -> b
$ SourceName
-> [TriggerName]
-> FetchBatchSize
-> TxE QErr [Event ('Postgres pgKind)]
forall (pgKind :: PostgresKind).
SourceName
-> [TriggerName]
-> FetchBatchSize
-> TxE QErr [Event ('Postgres pgKind)]
fetchEvents SourceName
sourceName [TriggerName]
triggerNames FetchBatchSize
fetchBatchSize
case Either QErr (TxE QErr [Event ('Postgres pgKind)])
fetchEventsTxE of
Left QErr
err -> QErr -> m [Event ('Postgres pgKind)]
forall a. QErr -> m a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (QErr -> m [Event ('Postgres pgKind)])
-> QErr -> m [Event ('Postgres pgKind)]
forall a b. (a -> b) -> a -> b
$ Text -> QErr -> QErr
prefixQErr Text
"something went wrong while fetching events: " QErr
err
Right TxE QErr [Event ('Postgres pgKind)]
fetchEventsTx ->
m (Either QErr [Event ('Postgres pgKind)])
-> m [Event ('Postgres pgKind)]
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
(m (Either QErr [Event ('Postgres pgKind)])
-> m [Event ('Postgres pgKind)])
-> m (Either QErr [Event ('Postgres pgKind)])
-> m [Event ('Postgres pgKind)]
forall a b. (a -> b) -> a -> b
$ IO (Either QErr [Event ('Postgres pgKind)])
-> m (Either QErr [Event ('Postgres pgKind)])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO (Either QErr [Event ('Postgres pgKind)])
-> m (Either QErr [Event ('Postgres pgKind)]))
-> IO (Either QErr [Event ('Postgres pgKind)])
-> m (Either QErr [Event ('Postgres pgKind)])
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom
-> TxE QErr [Event ('Postgres pgKind)]
-> IO (Either QErr [Event ('Postgres pgKind)])
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx SourceConfig ('Postgres pgKind)
PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery TxE QErr [Event ('Postgres pgKind)]
fetchEventsTx
setRetry ::
( MonadIO m,
MonadError QErr m
) =>
SourceConfig ('Postgres pgKind) ->
Event ('Postgres pgKind) ->
Time.UTCTime ->
MaintenanceMode MaintenanceModeVersion ->
m ()
setRetry :: forall (m :: * -> *) (pgKind :: PostgresKind).
(MonadIO m, MonadError QErr m) =>
SourceConfig ('Postgres pgKind)
-> Event ('Postgres pgKind)
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> m ()
setRetry SourceConfig ('Postgres pgKind)
sourceConfig Event ('Postgres pgKind)
event UTCTime
retryTime MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion =
m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx SourceConfig ('Postgres pgKind)
PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery (Event ('Postgres pgKind)
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> TxET QErr IO ()
forall (pgKind :: PostgresKind).
Event ('Postgres pgKind)
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> TxET QErr IO ()
setRetryTx Event ('Postgres pgKind)
event UTCTime
retryTime MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion)
insertManualEvent ::
(MonadIO m, MonadError QErr m) =>
SourceConfig ('Postgres pgKind) ->
TableName ('Postgres pgKind) ->
TriggerName ->
Value ->
UserInfo ->
Maybe Tracing.TraceContext ->
m EventId
insertManualEvent :: forall (m :: * -> *) (pgKind :: PostgresKind).
(MonadIO m, MonadError QErr m) =>
SourceConfig ('Postgres pgKind)
-> TableName ('Postgres pgKind)
-> TriggerName
-> Value
-> UserInfo
-> Maybe TraceContext
-> m EventId
insertManualEvent SourceConfig ('Postgres pgKind)
sourceConfig TableName ('Postgres pgKind)
tableName TriggerName
triggerName Value
payload UserInfo
userInfo Maybe TraceContext
traceCtx =
m (Either QErr EventId) -> m EventId
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
(m (Either QErr EventId) -> m EventId)
-> m (Either QErr EventId) -> m EventId
forall a b. (a -> b) -> a -> b
$ IO (Either QErr EventId) -> m (Either QErr EventId)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO (Either QErr EventId) -> m (Either QErr EventId))
-> IO (Either QErr EventId) -> m (Either QErr EventId)
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom -> TxET QErr IO EventId -> IO (Either QErr EventId)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx SourceConfig ('Postgres pgKind)
PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery
(TxET QErr IO EventId -> IO (Either QErr EventId))
-> TxET QErr IO EventId -> IO (Either QErr EventId)
forall a b. (a -> b) -> a -> b
$ SessionVariables -> TxET QErr IO ()
forall (m :: * -> *).
MonadIO m =>
SessionVariables -> TxET QErr m ()
setHeadersTx (UserInfo -> SessionVariables
_uiSession UserInfo
userInfo)
TxET QErr IO () -> TxET QErr IO () -> TxET QErr IO ()
forall a b. TxET QErr IO a -> TxET QErr IO b -> TxET QErr IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe TraceContext -> TxET QErr IO ()
forall (m :: * -> *).
MonadIO m =>
Maybe TraceContext -> TxET QErr m ()
setTraceContextInTx Maybe TraceContext
traceCtx
TxET QErr IO () -> TxET QErr IO EventId -> TxET QErr IO EventId
forall a b. TxET QErr IO a -> TxET QErr IO b -> TxET QErr IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> QualifiedTable -> TriggerName -> Value -> TxET QErr IO EventId
insertPGManualEvent TableName ('Postgres pgKind)
QualifiedTable
tableName TriggerName
triggerName Value
payload
getMaintenanceModeVersion ::
( MonadIO m,
MonadError QErr m
) =>
SourceConfig ('Postgres pgKind) ->
m MaintenanceModeVersion
getMaintenanceModeVersion :: forall (m :: * -> *) (pgKind :: PostgresKind).
(MonadIO m, MonadError QErr m) =>
SourceConfig ('Postgres pgKind) -> m MaintenanceModeVersion
getMaintenanceModeVersion SourceConfig ('Postgres pgKind)
sourceConfig =
m (Either QErr MaintenanceModeVersion) -> m MaintenanceModeVersion
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr MaintenanceModeVersion)
-> m MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion)
-> m MaintenanceModeVersion
forall a b. (a -> b) -> a -> b
$ IO (Either QErr MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion))
-> IO (Either QErr MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion)
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> TxET QErr IO MaintenanceModeVersion
-> IO (Either QErr MaintenanceModeVersion)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceReadTx SourceConfig ('Postgres pgKind)
PGSourceConfig
sourceConfig TxET QErr IO MaintenanceModeVersion
getMaintenanceModeVersionTx
recordSuccess ::
(MonadIO m) =>
SourceConfig ('Postgres pgKind) ->
Event ('Postgres pgKind) ->
Invocation 'EventType ->
MaintenanceMode MaintenanceModeVersion ->
m (Either QErr ())
recordSuccess :: forall (m :: * -> *) (pgKind :: PostgresKind).
MonadIO m =>
SourceConfig ('Postgres pgKind)
-> Event ('Postgres pgKind)
-> Invocation 'EventType
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordSuccess SourceConfig ('Postgres pgKind)
sourceConfig Event ('Postgres pgKind)
event Invocation 'EventType
invocation MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion =
IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx SourceConfig ('Postgres pgKind)
PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery
(TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
TriggerName -> Invocation 'EventType -> TxET QErr IO ()
insertInvocation (TriggerMetadata -> TriggerName
tmName (Event ('Postgres pgKind) -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event ('Postgres pgKind)
event)) Invocation 'EventType
invocation
Event ('Postgres pgKind)
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
forall (pgKind :: PostgresKind).
Event ('Postgres pgKind)
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
setSuccessTx Event ('Postgres pgKind)
event MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion
recordError ::
(MonadIO m) =>
SourceConfig ('Postgres pgKind) ->
Event ('Postgres pgKind) ->
Invocation 'EventType ->
ProcessEventError ->
MaintenanceMode MaintenanceModeVersion ->
m (Either QErr ())
recordError :: forall (m :: * -> *) (pgKind :: PostgresKind).
MonadIO m =>
SourceConfig ('Postgres pgKind)
-> Event ('Postgres pgKind)
-> Invocation 'EventType
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError SourceConfig ('Postgres pgKind)
sourceConfig Event ('Postgres pgKind)
event Invocation 'EventType
invocation ProcessEventError
processEventError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion =
SourceConfig ('Postgres pgKind)
-> Event ('Postgres pgKind)
-> Maybe (Invocation 'EventType)
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
forall (m :: * -> *) (pgKind :: PostgresKind).
MonadIO m =>
SourceConfig ('Postgres pgKind)
-> Event ('Postgres pgKind)
-> Maybe (Invocation 'EventType)
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError' SourceConfig ('Postgres pgKind)
sourceConfig Event ('Postgres pgKind)
event (Invocation 'EventType -> Maybe (Invocation 'EventType)
forall a. a -> Maybe a
Just Invocation 'EventType
invocation) ProcessEventError
processEventError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion
recordError' ::
(MonadIO m) =>
SourceConfig ('Postgres pgKind) ->
Event ('Postgres pgKind) ->
Maybe (Invocation 'EventType) ->
ProcessEventError ->
MaintenanceMode MaintenanceModeVersion ->
m (Either QErr ())
recordError' :: forall (m :: * -> *) (pgKind :: PostgresKind).
MonadIO m =>
SourceConfig ('Postgres pgKind)
-> Event ('Postgres pgKind)
-> Maybe (Invocation 'EventType)
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError' SourceConfig ('Postgres pgKind)
sourceConfig Event ('Postgres pgKind)
event Maybe (Invocation 'EventType)
invocation ProcessEventError
processEventError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion =
IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx SourceConfig ('Postgres pgKind)
PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery
(TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
Maybe (Invocation 'EventType)
-> (Invocation 'EventType -> TxET QErr IO ()) -> TxET QErr IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe (Invocation 'EventType)
invocation ((Invocation 'EventType -> TxET QErr IO ()) -> TxET QErr IO ())
-> (Invocation 'EventType -> TxET QErr IO ()) -> TxET QErr IO ()
forall a b. (a -> b) -> a -> b
$ TriggerName -> Invocation 'EventType -> TxET QErr IO ()
insertInvocation (TriggerMetadata -> TriggerName
tmName (Event ('Postgres pgKind) -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event ('Postgres pgKind)
event))
case ProcessEventError
processEventError of
PESetRetry UTCTime
retryTime -> Event ('Postgres pgKind)
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> TxET QErr IO ()
forall (pgKind :: PostgresKind).
Event ('Postgres pgKind)
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> TxET QErr IO ()
setRetryTx Event ('Postgres pgKind)
event UTCTime
retryTime MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion
ProcessEventError
PESetError -> Event ('Postgres pgKind)
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
forall (pgKind :: PostgresKind).
Event ('Postgres pgKind)
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
setErrorTx Event ('Postgres pgKind)
event MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion
redeliverEvent ::
(MonadIO m, MonadError QErr m) =>
SourceConfig ('Postgres pgKind) ->
EventId ->
m ()
redeliverEvent :: forall (m :: * -> *) (pgKind :: PostgresKind).
(MonadIO m, MonadError QErr m) =>
SourceConfig ('Postgres pgKind) -> EventId -> m ()
redeliverEvent SourceConfig ('Postgres pgKind)
sourceConfig EventId
eventId =
m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx SourceConfig ('Postgres pgKind)
PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery (EventId -> TxET QErr IO ()
redeliverEventTx EventId
eventId)
dropTriggerAndArchiveEvents ::
( MonadIO m,
MonadError QErr m
) =>
SourceConfig ('Postgres pgKind) ->
TriggerName ->
QualifiedTable ->
m ()
dropTriggerAndArchiveEvents :: forall (m :: * -> *) (pgKind :: PostgresKind).
(MonadIO m, MonadError QErr m) =>
SourceConfig ('Postgres pgKind)
-> TriggerName -> QualifiedTable -> m ()
dropTriggerAndArchiveEvents SourceConfig ('Postgres pgKind)
sourceConfig TriggerName
triggerName QualifiedTable
_table =
m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
(m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx SourceConfig ('Postgres pgKind)
PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery
(TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
TriggerName -> TxET QErr IO ()
dropTriggerQ TriggerName
triggerName
TriggerName -> TxET QErr IO ()
archiveEvents TriggerName
triggerName
createMissingSQLTriggers ::
( MonadIO m,
MonadError QErr m,
MonadBaseControl IO m,
Backend ('Postgres pgKind)
) =>
SQLGenCtx ->
PGSourceConfig ->
TableName ('Postgres pgKind) ->
([(ColumnInfo ('Postgres pgKind))], Maybe (PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind)))) ->
TriggerName ->
TriggerOnReplication ->
TriggerOpsDef ('Postgres pgKind) ->
m ()
createMissingSQLTriggers :: forall (m :: * -> *) (pgKind :: PostgresKind).
(MonadIO m, MonadError QErr m, MonadBaseControl IO m,
Backend ('Postgres pgKind)) =>
SQLGenCtx
-> PGSourceConfig
-> TableName ('Postgres pgKind)
-> ([ColumnInfo ('Postgres pgKind)],
Maybe
(PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind))))
-> TriggerName
-> TriggerOnReplication
-> TriggerOpsDef ('Postgres pgKind)
-> m ()
createMissingSQLTriggers SQLGenCtx
serverConfigCtx PGSourceConfig
sourceConfig TableName ('Postgres pgKind)
table ([ColumnInfo ('Postgres pgKind)]
allCols, Maybe
(PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind)))
_) TriggerName
triggerName TriggerOnReplication
triggerOnReplication TriggerOpsDef ('Postgres pgKind)
opsDefinition = do
m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
(m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom -> TxET QErr m () -> m (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery
(TxET QErr m () -> m (Either QErr ()))
-> TxET QErr m () -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
Maybe (SubscribeOpSpec ('Postgres pgKind))
-> (SubscribeOpSpec ('Postgres pgKind) -> TxET QErr m ())
-> TxET QErr m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (TriggerOpsDef ('Postgres pgKind)
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdInsert TriggerOpsDef ('Postgres pgKind)
opsDefinition) (Ops -> SubscribeOpSpec ('Postgres pgKind) -> TxET QErr m ()
doesSQLTriggerExist Ops
INSERT)
Maybe (SubscribeOpSpec ('Postgres pgKind))
-> (SubscribeOpSpec ('Postgres pgKind) -> TxET QErr m ())
-> TxET QErr m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (TriggerOpsDef ('Postgres pgKind)
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdUpdate TriggerOpsDef ('Postgres pgKind)
opsDefinition) (Ops -> SubscribeOpSpec ('Postgres pgKind) -> TxET QErr m ()
doesSQLTriggerExist Ops
UPDATE)
Maybe (SubscribeOpSpec ('Postgres pgKind))
-> (SubscribeOpSpec ('Postgres pgKind) -> TxET QErr m ())
-> TxET QErr m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (TriggerOpsDef ('Postgres pgKind)
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdDelete TriggerOpsDef ('Postgres pgKind)
opsDefinition) (Ops -> SubscribeOpSpec ('Postgres pgKind) -> TxET QErr m ()
doesSQLTriggerExist Ops
DELETE)
where
doesSQLTriggerExist :: Ops -> SubscribeOpSpec ('Postgres pgKind) -> TxET QErr m ()
doesSQLTriggerExist Ops
op SubscribeOpSpec ('Postgres pgKind)
opSpec = do
let opTriggerName :: QualifiedTriggerName
opTriggerName = Ops -> TriggerName -> QualifiedTriggerName
pgTriggerName Ops
op TriggerName
triggerName
Bool
doesOpTriggerFunctionExist <-
Identity Bool -> Bool
forall a. Identity a -> a
runIdentity
(Identity Bool -> Bool)
-> (SingleRow (Identity Bool) -> Identity Bool)
-> SingleRow (Identity Bool)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity Bool) -> Identity Bool
forall a. SingleRow a -> a
PG.getRow
(SingleRow (Identity Bool) -> Bool)
-> TxET QErr m (SingleRow (Identity Bool)) -> TxET QErr m Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> Identity QualifiedTriggerName
-> Bool
-> TxET QErr m (SingleRow (Identity Bool))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
SELECT EXISTS
( SELECT 1
FROM pg_proc
WHERE proname = $1
)
|]
(QualifiedTriggerName -> Identity QualifiedTriggerName
forall a. a -> Identity a
Identity QualifiedTriggerName
opTriggerName)
Bool
True
Bool -> TxET QErr m () -> TxET QErr m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
doesOpTriggerFunctionExist
(TxET QErr m () -> TxET QErr m ())
-> TxET QErr m () -> TxET QErr m ()
forall a b. (a -> b) -> a -> b
$ (ReaderT SQLGenCtx (TxET QErr m) () -> SQLGenCtx -> TxET QErr m ())
-> SQLGenCtx
-> ReaderT SQLGenCtx (TxET QErr m) ()
-> TxET QErr m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ReaderT SQLGenCtx (TxET QErr m) () -> SQLGenCtx -> TxET QErr m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT SQLGenCtx
serverConfigCtx
(ReaderT SQLGenCtx (TxET QErr m) () -> TxET QErr m ())
-> ReaderT SQLGenCtx (TxET QErr m) () -> TxET QErr m ()
forall a b. (a -> b) -> a -> b
$ TriggerName
-> QualifiedTable
-> TriggerOnReplication
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> ReaderT SQLGenCtx (TxET QErr m) ()
forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadTx m, MonadReader SQLGenCtx m) =>
TriggerName
-> QualifiedTable
-> TriggerOnReplication
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
mkTrigger TriggerName
triggerName TableName ('Postgres pgKind)
QualifiedTable
table TriggerOnReplication
triggerOnReplication [ColumnInfo ('Postgres pgKind)]
allCols Ops
op SubscribeOpSpec ('Postgres pgKind)
opSpec
createTableEventTrigger ::
(Backend ('Postgres pgKind), MonadIO m, MonadBaseControl IO m) =>
SQLGenCtx ->
PGSourceConfig ->
QualifiedTable ->
[ColumnInfo ('Postgres pgKind)] ->
TriggerName ->
TriggerOnReplication ->
TriggerOpsDef ('Postgres pgKind) ->
Maybe (PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind))) ->
m (Either QErr ())
createTableEventTrigger :: forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadIO m, MonadBaseControl IO m) =>
SQLGenCtx
-> PGSourceConfig
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> TriggerName
-> TriggerOnReplication
-> TriggerOpsDef ('Postgres pgKind)
-> Maybe
(PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind)))
-> m (Either QErr ())
createTableEventTrigger SQLGenCtx
serverConfigCtx PGSourceConfig
sourceConfig QualifiedTable
table [ColumnInfo ('Postgres pgKind)]
columns TriggerName
triggerName TriggerOnReplication
triggerOnReplication TriggerOpsDef ('Postgres pgKind)
opsDefinition Maybe
(PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind)))
_ = PGSourceConfig
-> PGExecFrom -> TxET QErr m () -> m (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery (TxET QErr m () -> m (Either QErr ()))
-> TxET QErr m () -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
(ReaderT SQLGenCtx (TxET QErr m) () -> SQLGenCtx -> TxET QErr m ())
-> SQLGenCtx
-> ReaderT SQLGenCtx (TxET QErr m) ()
-> TxET QErr m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ReaderT SQLGenCtx (TxET QErr m) () -> SQLGenCtx -> TxET QErr m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT SQLGenCtx
serverConfigCtx
(ReaderT SQLGenCtx (TxET QErr m) () -> TxET QErr m ())
-> ReaderT SQLGenCtx (TxET QErr m) () -> TxET QErr m ()
forall a b. (a -> b) -> a -> b
$ TriggerName
-> QualifiedTable
-> TriggerOnReplication
-> [ColumnInfo ('Postgres pgKind)]
-> TriggerOpsDef ('Postgres pgKind)
-> ReaderT SQLGenCtx (TxET QErr m) ()
forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadTx m, MonadReader SQLGenCtx m) =>
TriggerName
-> QualifiedTable
-> TriggerOnReplication
-> [ColumnInfo ('Postgres pgKind)]
-> TriggerOpsDef ('Postgres pgKind)
-> m ()
mkAllTriggersQ TriggerName
triggerName QualifiedTable
table TriggerOnReplication
triggerOnReplication [ColumnInfo ('Postgres pgKind)]
columns TriggerOpsDef ('Postgres pgKind)
opsDefinition
dropDanglingSQLTrigger ::
( MonadIO m,
MonadError QErr m
) =>
SourceConfig ('Postgres pgKind) ->
TriggerName ->
QualifiedTable ->
HashSet Ops ->
m ()
dropDanglingSQLTrigger :: forall (m :: * -> *) (pgKind :: PostgresKind).
(MonadIO m, MonadError QErr m) =>
SourceConfig ('Postgres pgKind)
-> TriggerName -> QualifiedTable -> HashSet Ops -> m ()
dropDanglingSQLTrigger SourceConfig ('Postgres pgKind)
sourceConfig TriggerName
triggerName QualifiedTable
_ HashSet Ops
ops =
m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
(m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx SourceConfig ('Postgres pgKind)
PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery
(TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ (Ops -> TxET QErr IO ()) -> HashSet Ops -> TxET QErr IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (TriggerName -> Ops -> TxET QErr IO ()
dropTriggerOp TriggerName
triggerName) HashSet Ops
ops
updateColumnInEventTrigger ::
QualifiedTable ->
PGCol ->
PGCol ->
QualifiedTable ->
EventTriggerConf ('Postgres pgKind) ->
EventTriggerConf ('Postgres pgKind)
updateColumnInEventTrigger :: forall (pgKind :: PostgresKind).
QualifiedTable
-> PGCol
-> PGCol
-> QualifiedTable
-> EventTriggerConf ('Postgres pgKind)
-> EventTriggerConf ('Postgres pgKind)
updateColumnInEventTrigger QualifiedTable
table PGCol
oCol PGCol
nCol QualifiedTable
refTable = EventTriggerConf ('Postgres pgKind)
-> EventTriggerConf ('Postgres pgKind)
rewriteEventTriggerConf
where
rewriteSubsCols :: SubscribeColumns ('Postgres pgKind)
-> SubscribeColumns ('Postgres pgKind)
rewriteSubsCols = \case
SubscribeColumns ('Postgres pgKind)
SubCStar -> SubscribeColumns ('Postgres pgKind)
forall (b :: BackendType). SubscribeColumns b
SubCStar
SubCArray [Column ('Postgres pgKind)]
cols -> [Column ('Postgres pgKind)] -> SubscribeColumns ('Postgres pgKind)
forall (b :: BackendType). [Column b] -> SubscribeColumns b
SubCArray ([Column ('Postgres pgKind)]
-> SubscribeColumns ('Postgres pgKind))
-> [Column ('Postgres pgKind)]
-> SubscribeColumns ('Postgres pgKind)
forall a b. (a -> b) -> a -> b
$ (PGCol -> PGCol) -> [PGCol] -> [PGCol]
forall a b. (a -> b) -> [a] -> [b]
map PGCol -> PGCol
getNewCol [Column ('Postgres pgKind)]
[PGCol]
cols
rewriteOpSpec :: SubscribeOpSpec ('Postgres pgKind)
-> SubscribeOpSpec ('Postgres pgKind)
rewriteOpSpec (SubscribeOpSpec SubscribeColumns ('Postgres pgKind)
listenColumns Maybe (SubscribeColumns ('Postgres pgKind))
deliveryColumns) =
SubscribeColumns ('Postgres pgKind)
-> Maybe (SubscribeColumns ('Postgres pgKind))
-> SubscribeOpSpec ('Postgres pgKind)
forall (b :: BackendType).
SubscribeColumns b
-> Maybe (SubscribeColumns b) -> SubscribeOpSpec b
SubscribeOpSpec
(SubscribeColumns ('Postgres pgKind)
-> SubscribeColumns ('Postgres pgKind)
rewriteSubsCols SubscribeColumns ('Postgres pgKind)
listenColumns)
(SubscribeColumns ('Postgres pgKind)
-> SubscribeColumns ('Postgres pgKind)
rewriteSubsCols (SubscribeColumns ('Postgres pgKind)
-> SubscribeColumns ('Postgres pgKind))
-> Maybe (SubscribeColumns ('Postgres pgKind))
-> Maybe (SubscribeColumns ('Postgres pgKind))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (SubscribeColumns ('Postgres pgKind))
deliveryColumns)
rewriteTrigOpsDef :: TriggerOpsDef ('Postgres pgKind)
-> TriggerOpsDef ('Postgres pgKind)
rewriteTrigOpsDef (TriggerOpsDef Maybe (SubscribeOpSpec ('Postgres pgKind))
ins Maybe (SubscribeOpSpec ('Postgres pgKind))
upd Maybe (SubscribeOpSpec ('Postgres pgKind))
del Maybe Bool
man) =
Maybe (SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
-> Maybe Bool
-> TriggerOpsDef ('Postgres pgKind)
forall (b :: BackendType).
Maybe (SubscribeOpSpec b)
-> Maybe (SubscribeOpSpec b)
-> Maybe (SubscribeOpSpec b)
-> Maybe Bool
-> TriggerOpsDef b
TriggerOpsDef
(SubscribeOpSpec ('Postgres pgKind)
-> SubscribeOpSpec ('Postgres pgKind)
rewriteOpSpec (SubscribeOpSpec ('Postgres pgKind)
-> SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (SubscribeOpSpec ('Postgres pgKind))
ins)
(SubscribeOpSpec ('Postgres pgKind)
-> SubscribeOpSpec ('Postgres pgKind)
rewriteOpSpec (SubscribeOpSpec ('Postgres pgKind)
-> SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (SubscribeOpSpec ('Postgres pgKind))
upd)
(SubscribeOpSpec ('Postgres pgKind)
-> SubscribeOpSpec ('Postgres pgKind)
rewriteOpSpec (SubscribeOpSpec ('Postgres pgKind)
-> SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (SubscribeOpSpec ('Postgres pgKind))
del)
Maybe Bool
man
rewriteEventTriggerConf :: EventTriggerConf ('Postgres pgKind)
-> EventTriggerConf ('Postgres pgKind)
rewriteEventTriggerConf EventTriggerConf ('Postgres pgKind)
etc =
EventTriggerConf ('Postgres pgKind)
etc
{ etcDefinition :: TriggerOpsDef ('Postgres pgKind)
etcDefinition =
TriggerOpsDef ('Postgres pgKind)
-> TriggerOpsDef ('Postgres pgKind)
rewriteTrigOpsDef (TriggerOpsDef ('Postgres pgKind)
-> TriggerOpsDef ('Postgres pgKind))
-> TriggerOpsDef ('Postgres pgKind)
-> TriggerOpsDef ('Postgres pgKind)
forall a b. (a -> b) -> a -> b
$ EventTriggerConf ('Postgres pgKind)
-> TriggerOpsDef ('Postgres pgKind)
forall (b :: BackendType). EventTriggerConf b -> TriggerOpsDef b
etcDefinition EventTriggerConf ('Postgres pgKind)
etc
}
getNewCol :: PGCol -> PGCol
getNewCol PGCol
col =
if QualifiedTable
table QualifiedTable -> QualifiedTable -> Bool
forall a. Eq a => a -> a -> Bool
== QualifiedTable
refTable Bool -> Bool -> Bool
&& PGCol
oCol PGCol -> PGCol -> Bool
forall a. Eq a => a -> a -> Bool
== PGCol
col then PGCol
nCol else PGCol
col
unlockEventsInSource ::
(MonadIO m) =>
SourceConfig ('Postgres pgKind) ->
NE.NESet EventId ->
m (Either QErr Int)
unlockEventsInSource :: forall (m :: * -> *) (pgKind :: PostgresKind).
MonadIO m =>
SourceConfig ('Postgres pgKind)
-> NESet EventId -> m (Either QErr Int)
unlockEventsInSource SourceConfig ('Postgres pgKind)
sourceConfig NESet EventId
eventIds =
IO (Either QErr Int) -> m (Either QErr Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr Int) -> m (Either QErr Int))
-> IO (Either QErr Int) -> m (Either QErr Int)
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom -> TxET QErr IO Int -> IO (Either QErr Int)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx SourceConfig ('Postgres pgKind)
PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery ([EventId] -> TxET QErr IO Int
unlockEventsTx ([EventId] -> TxET QErr IO Int) -> [EventId] -> TxET QErr IO Int
forall a b. (a -> b) -> a -> b
$ NESet EventId -> [EventId]
forall a. NESet a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList NESet EventId
eventIds)
checkIfTriggerExists ::
(MonadIO m, MonadError QErr m) =>
PGSourceConfig ->
TriggerName ->
HashSet Ops ->
m Bool
checkIfTriggerExists :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
PGSourceConfig -> TriggerName -> HashSet Ops -> m Bool
checkIfTriggerExists PGSourceConfig
sourceConfig TriggerName
triggerName HashSet Ops
ops = do
m (Either QErr Bool) -> m Bool
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
(m (Either QErr Bool) -> m Bool) -> m (Either QErr Bool) -> m Bool
forall a b. (a -> b) -> a -> b
$ IO (Either QErr Bool) -> m (Either QErr Bool)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO (Either QErr Bool) -> m (Either QErr Bool))
-> IO (Either QErr Bool) -> m (Either QErr Bool)
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom -> TxET QErr IO Bool -> IO (Either QErr Bool)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery
(TxET QErr IO Bool -> IO (Either QErr Bool))
-> TxET QErr IO Bool -> IO (Either QErr Bool)
forall a b. (a -> b) -> a -> b
$
([Bool] -> Bool) -> TxET QErr IO [Bool] -> TxET QErr IO Bool
forall a b. (a -> b) -> TxET QErr IO a -> TxET QErr IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap [Bool] -> Bool
forall (t :: * -> *). Foldable t => t Bool -> Bool
or ((Ops -> TxET QErr IO Bool) -> [Ops] -> TxET QErr IO [Bool]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse (TriggerName -> Ops -> TxET QErr IO Bool
checkIfFunctionExistsQ TriggerName
triggerName) (HashSet Ops -> [Ops]
forall a. HashSet a -> [a]
HashSet.toList HashSet Ops
ops))
insertInvocation :: TriggerName -> Invocation 'EventType -> PG.TxE QErr ()
insertInvocation :: TriggerName -> Invocation 'EventType -> TxET QErr IO ()
insertInvocation TriggerName
tName Invocation 'EventType
invo = do
(PGTxErr -> QErr)
-> Query
-> (EventId, Text, Maybe Int64, ViaJSON Value, ViaJSON Value)
-> Bool
-> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
INSERT INTO hdb_catalog.event_invocation_logs (event_id, trigger_name, status, request, response)
VALUES ($1, $2, $3, $4, $5)
|]
( Invocation 'EventType -> EventId
forall (a :: TriggerTypes). Invocation a -> EventId
iEventId Invocation 'EventType
invo,
(TriggerName -> Text
triggerNameToTxt TriggerName
tName),
Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Maybe Int -> Maybe Int64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Invocation 'EventType -> Maybe Int
forall (a :: TriggerTypes). Invocation a -> Maybe Int
iStatus Invocation 'EventType
invo :: Maybe Int64,
Value -> ViaJSON Value
forall a. a -> ViaJSON a
PG.ViaJSON (Value -> ViaJSON Value) -> Value -> ViaJSON Value
forall a b. (a -> b) -> a -> b
$ WebhookRequest -> Value
forall a. ToJSON a => a -> Value
toJSON (WebhookRequest -> Value) -> WebhookRequest -> Value
forall a b. (a -> b) -> a -> b
$ Invocation 'EventType -> WebhookRequest
forall (a :: TriggerTypes). Invocation a -> WebhookRequest
iRequest Invocation 'EventType
invo,
Value -> ViaJSON Value
forall a. a -> ViaJSON a
PG.ViaJSON (Value -> ViaJSON Value) -> Value -> ViaJSON Value
forall a b. (a -> b) -> a -> b
$ Response 'EventType -> Value
forall a. ToJSON a => a -> Value
toJSON (Response 'EventType -> Value) -> Response 'EventType -> Value
forall a b. (a -> b) -> a -> b
$ Invocation 'EventType -> Response 'EventType
forall (a :: TriggerTypes). Invocation a -> Response a
iResponse Invocation 'EventType
invo
)
Bool
True
(PGTxErr -> QErr)
-> Query -> Identity EventId -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.event_log
SET tries = tries + 1
WHERE id = $1
|]
(EventId -> Identity EventId
forall a. a -> Identity a
Identity (EventId -> Identity EventId) -> EventId -> Identity EventId
forall a b. (a -> b) -> a -> b
$ Invocation 'EventType -> EventId
forall (a :: TriggerTypes). Invocation a -> EventId
iEventId Invocation 'EventType
invo)
Bool
True
insertPGManualEvent ::
QualifiedTable ->
TriggerName ->
Value ->
PG.TxE QErr EventId
insertPGManualEvent :: QualifiedTable -> TriggerName -> Value -> TxET QErr IO EventId
insertPGManualEvent (QualifiedObject SchemaName
schemaName TableName
tableName) TriggerName
triggerName Value
rowData = do
Identity EventId -> EventId
forall a. Identity a -> a
runIdentity
(Identity EventId -> EventId)
-> (SingleRow (Identity EventId) -> Identity EventId)
-> SingleRow (Identity EventId)
-> EventId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity EventId) -> Identity EventId
forall a. SingleRow a -> a
PG.getRow
(SingleRow (Identity EventId) -> EventId)
-> TxET QErr IO (SingleRow (Identity EventId))
-> TxET QErr IO EventId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> (SchemaName, TableName, TriggerName, Text, ViaJSON Value)
-> Bool
-> TxET QErr IO (SingleRow (Identity EventId))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
SELECT hdb_catalog.insert_event_log($1, $2, $3, $4, $5)
|]
(SchemaName
schemaName, TableName
tableName, TriggerName
triggerName, (Ops -> Text
forall a. Show a => a -> Text
tshow Ops
MANUAL), Value -> ViaJSON Value
forall a. a -> ViaJSON a
PG.ViaJSON Value
rowData)
Bool
False
archiveEvents :: TriggerName -> PG.TxE QErr ()
archiveEvents :: TriggerName -> TxET QErr IO ()
archiveEvents TriggerName
trn =
(PGTxErr -> QErr)
-> Query -> Identity TriggerName -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.event_log
SET archived = 't'
WHERE trigger_name = $1
|]
(TriggerName -> Identity TriggerName
forall a. a -> Identity a
Identity TriggerName
trn)
Bool
False
getMaintenanceModeVersionTx :: PG.TxE QErr MaintenanceModeVersion
getMaintenanceModeVersionTx :: TxET QErr IO MaintenanceModeVersion
getMaintenanceModeVersionTx = TxET QErr IO MaintenanceModeVersion
-> TxET QErr IO MaintenanceModeVersion
forall a. TxE QErr a -> TxE QErr a
forall (m :: * -> *) a. MonadTx m => TxE QErr a -> m a
liftTx (TxET QErr IO MaintenanceModeVersion
-> TxET QErr IO MaintenanceModeVersion)
-> TxET QErr IO MaintenanceModeVersion
-> TxET QErr IO MaintenanceModeVersion
forall a b. (a -> b) -> a -> b
$ do
MetadataCatalogVersion
catalogVersion <- TxE QErr MetadataCatalogVersion
getCatalogVersion
if
| MetadataCatalogVersion
catalogVersion MetadataCatalogVersion -> MetadataCatalogVersion -> Bool
forall a. Eq a => a -> a -> Bool
== Int -> MetadataCatalogVersion
MetadataCatalogVersion Int
40 -> MaintenanceModeVersion -> TxET QErr IO MaintenanceModeVersion
forall a. a -> TxET QErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure MaintenanceModeVersion
PreviousMMVersion
| MetadataCatalogVersion
catalogVersion MetadataCatalogVersion -> MetadataCatalogVersion -> Bool
forall a. Eq a => a -> a -> Bool
== Int -> MetadataCatalogVersion
MetadataCatalogVersion Int
43 -> MaintenanceModeVersion -> TxET QErr IO MaintenanceModeVersion
forall a. a -> TxET QErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure MaintenanceModeVersion
CurrentMMVersion
| MetadataCatalogVersion
catalogVersion MetadataCatalogVersion -> MetadataCatalogVersion -> Bool
forall a. Eq a => a -> a -> Bool
== MetadataCatalogVersion
latestCatalogVersion -> MaintenanceModeVersion -> TxET QErr IO MaintenanceModeVersion
forall a. a -> TxET QErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure MaintenanceModeVersion
CurrentMMVersion
| Bool
otherwise ->
Text -> TxET QErr IO MaintenanceModeVersion
forall (m :: * -> *) a. QErrM m => Text -> m a
throw500
(Text -> TxET QErr IO MaintenanceModeVersion)
-> Text -> TxET QErr IO MaintenanceModeVersion
forall a b. (a -> b) -> a -> b
$ Text
"Maintenance mode is only supported with catalog versions: 40, 43 and "
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
forall a. Show a => a -> Text
tshow Text
latestCatalogVersionString
fetchEvents :: SourceName -> [TriggerName] -> FetchBatchSize -> PG.TxE QErr [Event ('Postgres pgKind)]
fetchEvents :: forall (pgKind :: PostgresKind).
SourceName
-> [TriggerName]
-> FetchBatchSize
-> TxE QErr [Event ('Postgres pgKind)]
fetchEvents SourceName
source [TriggerName]
triggerNames (FetchBatchSize Int
fetchBatchSize) =
((EventId, SchemaName, TableName, TriggerName, ViaJSON Value, Int,
LocalTime, Maybe UTCTime, UTCTime, Maybe UTCTime)
-> Event ('Postgres pgKind))
-> [(EventId, SchemaName, TableName, TriggerName, ViaJSON Value,
Int, LocalTime, Maybe UTCTime, UTCTime, Maybe UTCTime)]
-> [Event ('Postgres pgKind)]
forall a b. (a -> b) -> [a] -> [b]
map (EventId, SchemaName, TableName, TriggerName, ViaJSON Value, Int,
LocalTime, Maybe UTCTime, UTCTime, Maybe UTCTime)
-> Event ('Postgres pgKind)
uncurryEvent
([(EventId, SchemaName, TableName, TriggerName, ViaJSON Value, Int,
LocalTime, Maybe UTCTime, UTCTime, Maybe UTCTime)]
-> [Event ('Postgres pgKind)])
-> TxET
QErr
IO
[(EventId, SchemaName, TableName, TriggerName, ViaJSON Value, Int,
LocalTime, Maybe UTCTime, UTCTime, Maybe UTCTime)]
-> TxET QErr IO [Event ('Postgres pgKind)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> (Word64, PGTextArray)
-> Bool
-> TxET
QErr
IO
[(EventId, SchemaName, TableName, TriggerName, ViaJSON Value, Int,
LocalTime, Maybe UTCTime, UTCTime, Maybe UTCTime)]
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.event_log
SET locked = NOW()
WHERE id IN ( SELECT l.id
FROM hdb_catalog.event_log l
WHERE l.delivered = 'f' and l.error = 'f'
and (l.locked IS NULL or l.locked < (NOW() - interval '30 minute'))
and (l.next_retry_at is NULL or l.next_retry_at <= now())
and l.archived = 'f'
and l.trigger_name = ANY($2)
/* NB: this ordering is important for our index `event_log_fetch_events` */
/* (see `init_pg_source.sql`) */
ORDER BY locked NULLS FIRST, next_retry_at NULLS FIRST, created_at
LIMIT $1
FOR UPDATE SKIP LOCKED )
RETURNING id, schema_name, table_name, trigger_name, payload::json, tries, created_at, next_retry_at,
-- We need the UTC values of `created_at` and `next_retry_at` for metrics
-- calculation.
--
-- Only `TIMESTAMPZ` (time with timezone offset) values can be used for proper
-- conversions between timezones.
--
-- Since `created_at` and `next_retry_at` are `TIMESTAMP` values (time without
-- timezone offset), we first convert them to TIMESTAMPZ` values by appending
-- the timezone offset of the database. And then convert the `TIMESTAMPZ`
-- values to UTC.
(select (select created_at at time zone (select current_setting ('TIMEZONE'))) at time zone 'UTC'),
(select (select next_retry_at at time zone (select current_setting ('TIMEZONE'))) at time zone 'UTC');
|]
(Word64
limit, PGTextArray
triggerNamesTxt)
Bool
True
where
uncurryEvent :: (EventId, SchemaName, TableName, TriggerName, ViaJSON Value, Int,
LocalTime, Maybe UTCTime, UTCTime, Maybe UTCTime)
-> Event ('Postgres pgKind)
uncurryEvent (EventId
id', SchemaName
sourceName, TableName
tableName, TriggerName
triggerName, PG.ViaJSON Value
payload, Int
tries, LocalTime
created, Maybe UTCTime
retryAt, UTCTime
createdAtUTC, Maybe UTCTime
retryAtUTC :: Maybe UTCTime) =
Event
{ eId :: EventId
eId = EventId
id',
eSource :: SourceName
eSource = SourceName
source,
eTable :: TableName ('Postgres pgKind)
eTable = SchemaName -> TableName -> QualifiedTable
forall a. SchemaName -> a -> QualifiedObject a
QualifiedObject SchemaName
sourceName TableName
tableName,
eTrigger :: TriggerMetadata
eTrigger = TriggerName -> TriggerMetadata
TriggerMetadata TriggerName
triggerName,
eEvent :: Value
eEvent = Value
payload,
eTries :: Int
eTries = Int
tries,
eCreatedAt :: LocalTime
eCreatedAt = LocalTime
created,
eRetryAt :: Maybe UTCTime
eRetryAt = Maybe UTCTime
retryAt,
eCreatedAtUTC :: UTCTime
eCreatedAtUTC = UTCTime
createdAtUTC,
eRetryAtUTC :: Maybe UTCTime
eRetryAtUTC = Maybe UTCTime
retryAtUTC
}
limit :: Word64
limit = Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
fetchBatchSize :: Word64
triggerNamesTxt :: PGTextArray
triggerNamesTxt = [Text] -> PGTextArray
PGTextArray ([Text] -> PGTextArray) -> [Text] -> PGTextArray
forall a b. (a -> b) -> a -> b
$ TriggerName -> Text
triggerNameToTxt (TriggerName -> Text) -> [TriggerName] -> [Text]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [TriggerName]
triggerNames
fetchEventsMaintenanceMode :: SourceName -> [TriggerName] -> FetchBatchSize -> MaintenanceModeVersion -> PG.TxE QErr [Event ('Postgres pgKind)]
fetchEventsMaintenanceMode :: forall (pgKind :: PostgresKind).
SourceName
-> [TriggerName]
-> FetchBatchSize
-> MaintenanceModeVersion
-> TxE QErr [Event ('Postgres pgKind)]
fetchEventsMaintenanceMode SourceName
sourceName [TriggerName]
triggerNames FetchBatchSize
fetchBatchSize = \case
MaintenanceModeVersion
PreviousMMVersion ->
((EventId, SchemaName, TableName, TriggerName, ViaJSON Value, Int,
LocalTime, Maybe UTCTime, UTCTime, Maybe UTCTime)
-> Event ('Postgres pgKind))
-> [(EventId, SchemaName, TableName, TriggerName, ViaJSON Value,
Int, LocalTime, Maybe UTCTime, UTCTime, Maybe UTCTime)]
-> [Event ('Postgres pgKind)]
forall a b. (a -> b) -> [a] -> [b]
map (EventId, SchemaName, TableName, TriggerName, ViaJSON Value, Int,
LocalTime, Maybe UTCTime, UTCTime, Maybe UTCTime)
-> Event ('Postgres pgKind)
forall {b :: BackendType} {a}.
(TableName b ~ QualifiedObject a) =>
(EventId, SchemaName, a, TriggerName, ViaJSON Value, Int,
LocalTime, Maybe UTCTime, UTCTime, Maybe UTCTime)
-> Event b
uncurryEvent
([(EventId, SchemaName, TableName, TriggerName, ViaJSON Value, Int,
LocalTime, Maybe UTCTime, UTCTime, Maybe UTCTime)]
-> [Event ('Postgres pgKind)])
-> TxET
QErr
IO
[(EventId, SchemaName, TableName, TriggerName, ViaJSON Value, Int,
LocalTime, Maybe UTCTime, UTCTime, Maybe UTCTime)]
-> TxE QErr [Event ('Postgres pgKind)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> Identity Word64
-> Bool
-> TxET
QErr
IO
[(EventId, SchemaName, TableName, TriggerName, ViaJSON Value, Int,
LocalTime, Maybe UTCTime, UTCTime, Maybe UTCTime)]
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.event_log
SET locked = 't'
WHERE id IN ( SELECT l.id
FROM hdb_catalog.event_log l
WHERE l.delivered = 'f' and l.error = 'f' and l.locked = 'f'
and (l.next_retry_at is NULL or l.next_retry_at <= now())
and l.archived = 'f'
ORDER BY created_at
LIMIT $1
FOR UPDATE SKIP LOCKED )
RETURNING id, schema_name, table_name, trigger_name, payload::json, tries, created_at, next_retry_at,
(select (select created_at at time zone (select current_setting ('TIMEZONE'))) at time zone 'UTC'),
(select (select next_retry_at at time zone (select current_setting ('TIMEZONE'))) at time zone 'UTC');
|]
(Word64 -> Identity Word64
forall a. a -> Identity a
Identity Word64
limit)
Bool
True
where
uncurryEvent :: (EventId, SchemaName, a, TriggerName, ViaJSON Value, Int,
LocalTime, Maybe UTCTime, UTCTime, Maybe UTCTime)
-> Event b
uncurryEvent (EventId
id', SchemaName
sn, a
tn, TriggerName
trn, PG.ViaJSON Value
payload, Int
tries, LocalTime
created, Maybe UTCTime
retryAt, UTCTime
createdAtUTC, Maybe UTCTime
retryAtUTC) =
Event
{ eId :: EventId
eId = EventId
id',
eSource :: SourceName
eSource = SourceName
SNDefault,
eTable :: TableName b
eTable = SchemaName -> a -> QualifiedObject a
forall a. SchemaName -> a -> QualifiedObject a
QualifiedObject SchemaName
sn a
tn,
eTrigger :: TriggerMetadata
eTrigger = TriggerName -> TriggerMetadata
TriggerMetadata TriggerName
trn,
eEvent :: Value
eEvent = Value
payload,
eTries :: Int
eTries = Int
tries,
eCreatedAt :: LocalTime
eCreatedAt = LocalTime
created,
eRetryAt :: Maybe UTCTime
eRetryAt = Maybe UTCTime
retryAt,
eCreatedAtUTC :: UTCTime
eCreatedAtUTC = UTCTime
createdAtUTC,
eRetryAtUTC :: Maybe UTCTime
eRetryAtUTC = Maybe UTCTime
retryAtUTC
}
limit :: Word64
limit = Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (FetchBatchSize -> Int
_unFetchBatchSize FetchBatchSize
fetchBatchSize) :: Word64
MaintenanceModeVersion
CurrentMMVersion -> SourceName
-> [TriggerName]
-> FetchBatchSize
-> TxE QErr [Event ('Postgres pgKind)]
forall (pgKind :: PostgresKind).
SourceName
-> [TriggerName]
-> FetchBatchSize
-> TxE QErr [Event ('Postgres pgKind)]
fetchEvents SourceName
sourceName [TriggerName]
triggerNames FetchBatchSize
fetchBatchSize
setSuccessTx :: Event ('Postgres pgKind) -> MaintenanceMode MaintenanceModeVersion -> PG.TxE QErr ()
setSuccessTx :: forall (pgKind :: PostgresKind).
Event ('Postgres pgKind)
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
setSuccessTx Event ('Postgres pgKind)
e = \case
(MaintenanceModeEnabled MaintenanceModeVersion
PreviousMMVersion) ->
(PGTxErr -> QErr)
-> Query -> Identity EventId -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.event_log
SET delivered = 't', next_retry_at = NULL, locked = 'f'
WHERE id = $1
|]
(EventId -> Identity EventId
forall a. a -> Identity a
Identity (EventId -> Identity EventId) -> EventId -> Identity EventId
forall a b. (a -> b) -> a -> b
$ Event ('Postgres pgKind) -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event ('Postgres pgKind)
e)
Bool
True
(MaintenanceModeEnabled MaintenanceModeVersion
CurrentMMVersion) -> TxET QErr IO ()
latestVersionSetSuccess
MaintenanceMode MaintenanceModeVersion
MaintenanceModeDisabled -> TxET QErr IO ()
latestVersionSetSuccess
where
latestVersionSetSuccess :: TxET QErr IO ()
latestVersionSetSuccess =
(PGTxErr -> QErr)
-> Query -> Identity EventId -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.event_log
SET delivered = 't', next_retry_at = NULL, locked = NULL
WHERE id = $1
|]
(EventId -> Identity EventId
forall a. a -> Identity a
Identity (EventId -> Identity EventId) -> EventId -> Identity EventId
forall a b. (a -> b) -> a -> b
$ Event ('Postgres pgKind) -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event ('Postgres pgKind)
e)
Bool
True
setErrorTx :: Event ('Postgres pgKind) -> MaintenanceMode MaintenanceModeVersion -> PG.TxE QErr ()
setErrorTx :: forall (pgKind :: PostgresKind).
Event ('Postgres pgKind)
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
setErrorTx Event ('Postgres pgKind)
e = \case
(MaintenanceModeEnabled MaintenanceModeVersion
PreviousMMVersion) ->
(PGTxErr -> QErr)
-> Query -> Identity EventId -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.event_log
SET error = 't', next_retry_at = NULL, locked = 'f'
WHERE id = $1
|]
(EventId -> Identity EventId
forall a. a -> Identity a
Identity (EventId -> Identity EventId) -> EventId -> Identity EventId
forall a b. (a -> b) -> a -> b
$ Event ('Postgres pgKind) -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event ('Postgres pgKind)
e)
Bool
True
(MaintenanceModeEnabled MaintenanceModeVersion
CurrentMMVersion) -> TxET QErr IO ()
latestVersionSetError
MaintenanceMode MaintenanceModeVersion
MaintenanceModeDisabled -> TxET QErr IO ()
latestVersionSetError
where
latestVersionSetError :: TxET QErr IO ()
latestVersionSetError =
(PGTxErr -> QErr)
-> Query -> Identity EventId -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.event_log
SET error = 't', next_retry_at = NULL, locked = NULL
WHERE id = $1
|]
(EventId -> Identity EventId
forall a. a -> Identity a
Identity (EventId -> Identity EventId) -> EventId -> Identity EventId
forall a b. (a -> b) -> a -> b
$ Event ('Postgres pgKind) -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event ('Postgres pgKind)
e)
Bool
True
setRetryTx :: Event ('Postgres pgKind) -> Time.UTCTime -> MaintenanceMode MaintenanceModeVersion -> PG.TxE QErr ()
setRetryTx :: forall (pgKind :: PostgresKind).
Event ('Postgres pgKind)
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> TxET QErr IO ()
setRetryTx Event ('Postgres pgKind)
e UTCTime
time = \case
(MaintenanceModeEnabled MaintenanceModeVersion
PreviousMMVersion) ->
(PGTxErr -> QErr)
-> Query -> (UTCTime, EventId) -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.event_log
SET next_retry_at = $1, locked = 'f'
WHERE id = $2
|]
(UTCTime
time, Event ('Postgres pgKind) -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event ('Postgres pgKind)
e)
Bool
True
(MaintenanceModeEnabled MaintenanceModeVersion
CurrentMMVersion) -> TxET QErr IO ()
latestVersionSetRetry
MaintenanceMode MaintenanceModeVersion
MaintenanceModeDisabled -> TxET QErr IO ()
latestVersionSetRetry
where
latestVersionSetRetry :: TxET QErr IO ()
latestVersionSetRetry =
(PGTxErr -> QErr)
-> Query -> (UTCTime, EventId) -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.event_log
SET next_retry_at = $1, locked = NULL
WHERE id = $2
|]
(UTCTime
time, Event ('Postgres pgKind) -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event ('Postgres pgKind)
e)
Bool
True
dropTriggerQ :: TriggerName -> PG.TxE QErr ()
dropTriggerQ :: TriggerName -> TxET QErr IO ()
dropTriggerQ TriggerName
trn =
(Ops -> TxET QErr IO ()) -> [Ops] -> TxET QErr IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (TriggerName -> Ops -> TxET QErr IO ()
dropTriggerOp TriggerName
trn) [Ops
INSERT, Ops
UPDATE, Ops
DELETE]
dropTriggerOp :: TriggerName -> Ops -> PG.TxE QErr ()
dropTriggerOp :: TriggerName -> Ops -> TxET QErr IO ()
dropTriggerOp TriggerName
triggerName Ops
triggerOp =
(PGTxErr -> QErr) -> Query -> () -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
(Text -> Query
PG.fromText (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ Ops -> Text
getDropFuncSql Ops
triggerOp)
()
Bool
False
where
getDropFuncSql :: Ops -> Text
getDropFuncSql :: Ops -> Text
getDropFuncSql Ops
op =
Text
"DROP FUNCTION IF EXISTS"
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" hdb_catalog."
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> QualifiedTriggerName -> Text
unQualifiedTriggerName (Ops -> TriggerName -> QualifiedTriggerName
pgIdenTrigger Ops
op TriggerName
triggerName)
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"()"
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" CASCADE"
checkEvent :: EventId -> PG.TxE QErr ()
checkEvent :: EventId -> TxET QErr IO ()
checkEvent EventId
eid = do
[Identity Bool]
events <-
(PGTxErr -> QErr)
-> Query
-> Identity EventId
-> Bool
-> TxET QErr IO [Identity Bool]
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
SELECT l.locked IS NOT NULL AND l.locked >= (NOW() - interval '30 minute')
FROM hdb_catalog.event_log l
WHERE l.id = $1
|]
(EventId -> Identity EventId
forall a. a -> Identity a
Identity EventId
eid)
Bool
True
Identity Bool
event <- [Identity Bool] -> TxET QErr IO (Identity Bool)
forall {m :: * -> *} {a}. MonadError QErr m => [a] -> m a
getEvent [Identity Bool]
events
Identity Bool -> TxET QErr IO ()
forall {f :: * -> *}. MonadError QErr f => Identity Bool -> f ()
assertEventUnlocked Identity Bool
event
where
getEvent :: [a] -> m a
getEvent [] = Code -> Text -> m a
forall (m :: * -> *) a. QErrM m => Code -> Text -> m a
throw400 Code
NotExists Text
"event not found"
getEvent (a
x : [a]
_) = a -> m a
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x
assertEventUnlocked :: Identity Bool -> f ()
assertEventUnlocked (Identity Bool
locked) =
Bool -> f () -> f ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
locked
(f () -> f ()) -> f () -> f ()
forall a b. (a -> b) -> a -> b
$ Code -> Text -> f ()
forall (m :: * -> *) a. QErrM m => Code -> Text -> m a
throw400 Code
Busy Text
"event is already being processed"
markForDelivery :: EventId -> PG.TxE QErr ()
markForDelivery :: EventId -> TxET QErr IO ()
markForDelivery EventId
eid =
(PGTxErr -> QErr)
-> Query -> Identity EventId -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.event_log
SET
delivered = 'f',
error = 'f',
tries = 0
WHERE id = $1
|]
(EventId -> Identity EventId
forall a. a -> Identity a
Identity EventId
eid)
Bool
True
redeliverEventTx :: EventId -> PG.TxE QErr ()
redeliverEventTx :: EventId -> TxET QErr IO ()
redeliverEventTx EventId
eventId = do
EventId -> TxET QErr IO ()
checkEvent EventId
eventId
EventId -> TxET QErr IO ()
markForDelivery EventId
eventId
unlockEventsTx :: [EventId] -> PG.TxE QErr Int
unlockEventsTx :: [EventId] -> TxET QErr IO Int
unlockEventsTx [EventId]
eventIds =
Identity Int -> Int
forall a. Identity a -> a
runIdentity
(Identity Int -> Int)
-> (SingleRow (Identity Int) -> Identity Int)
-> SingleRow (Identity Int)
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity Int) -> Identity Int
forall a. SingleRow a -> a
PG.getRow
(SingleRow (Identity Int) -> Int)
-> TxET QErr IO (SingleRow (Identity Int)) -> TxET QErr IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> Identity PGTextArray
-> Bool
-> TxET QErr IO (SingleRow (Identity Int))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
WITH "cte" AS
(UPDATE hdb_catalog.event_log
SET locked = NULL
WHERE id = ANY($1::text[])
-- only unlock those events that have been locked, it's possible
-- that an event has been processed but not yet been removed from
-- the saved locked events, which will lead to a double send
AND locked IS NOT NULL
RETURNING *)
SELECT count(*) FROM "cte"
|]
(PGTextArray -> Identity PGTextArray
forall a. a -> Identity a
Identity (PGTextArray -> Identity PGTextArray)
-> PGTextArray -> Identity PGTextArray
forall a b. (a -> b) -> a -> b
$ [Text] -> PGTextArray
PGTextArray ([Text] -> PGTextArray) -> [Text] -> PGTextArray
forall a b. (a -> b) -> a -> b
$ (EventId -> Text) -> [EventId] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map EventId -> Text
unEventId [EventId]
eventIds)
Bool
True
newtype QualifiedTriggerName = QualifiedTriggerName {QualifiedTriggerName -> Text
unQualifiedTriggerName :: Text}
deriving (Int -> QualifiedTriggerName -> ShowS
[QualifiedTriggerName] -> ShowS
QualifiedTriggerName -> String
(Int -> QualifiedTriggerName -> ShowS)
-> (QualifiedTriggerName -> String)
-> ([QualifiedTriggerName] -> ShowS)
-> Show QualifiedTriggerName
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> QualifiedTriggerName -> ShowS
showsPrec :: Int -> QualifiedTriggerName -> ShowS
$cshow :: QualifiedTriggerName -> String
show :: QualifiedTriggerName -> String
$cshowList :: [QualifiedTriggerName] -> ShowS
showList :: [QualifiedTriggerName] -> ShowS
Show, QualifiedTriggerName -> QualifiedTriggerName -> Bool
(QualifiedTriggerName -> QualifiedTriggerName -> Bool)
-> (QualifiedTriggerName -> QualifiedTriggerName -> Bool)
-> Eq QualifiedTriggerName
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: QualifiedTriggerName -> QualifiedTriggerName -> Bool
== :: QualifiedTriggerName -> QualifiedTriggerName -> Bool
$c/= :: QualifiedTriggerName -> QualifiedTriggerName -> Bool
/= :: QualifiedTriggerName -> QualifiedTriggerName -> Bool
Eq, QualifiedTriggerName -> PrepArg
(QualifiedTriggerName -> PrepArg) -> ToPrepArg QualifiedTriggerName
forall a. (a -> PrepArg) -> ToPrepArg a
$ctoPrepVal :: QualifiedTriggerName -> PrepArg
toPrepVal :: QualifiedTriggerName -> PrepArg
PG.ToPrepArg)
pgTriggerName :: Ops -> TriggerName -> QualifiedTriggerName
pgTriggerName :: Ops -> TriggerName -> QualifiedTriggerName
pgTriggerName Ops
op TriggerName
trn = Ops -> Text -> QualifiedTriggerName
forall {a}. Show a => a -> Text -> QualifiedTriggerName
qualifyTriggerName Ops
op (Text -> QualifiedTriggerName) -> Text -> QualifiedTriggerName
forall a b. (a -> b) -> a -> b
$ TriggerName -> Text
triggerNameToTxt TriggerName
trn
where
qualifyTriggerName :: a -> Text -> QualifiedTriggerName
qualifyTriggerName a
op' Text
trn' =
Text -> QualifiedTriggerName
QualifiedTriggerName (Text -> QualifiedTriggerName) -> Text -> QualifiedTriggerName
forall a b. (a -> b) -> a -> b
$ Text
"notify_hasura_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trn' Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> a -> Text
forall a. Show a => a -> Text
tshow a
op'
pgIdenTrigger :: Ops -> TriggerName -> QualifiedTriggerName
pgIdenTrigger :: Ops -> TriggerName -> QualifiedTriggerName
pgIdenTrigger Ops
op = Text -> QualifiedTriggerName
QualifiedTriggerName (Text -> QualifiedTriggerName)
-> (TriggerName -> Text) -> TriggerName -> QualifiedTriggerName
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text
pgFmtIdentifier (Text -> Text) -> (TriggerName -> Text) -> TriggerName -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. QualifiedTriggerName -> Text
unQualifiedTriggerName (QualifiedTriggerName -> Text)
-> (TriggerName -> QualifiedTriggerName) -> TriggerName -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ops -> TriggerName -> QualifiedTriggerName
pgTriggerName Ops
op
mkTriggerFunctionQ ::
forall pgKind m.
(Backend ('Postgres pgKind), MonadTx m, MonadReader SQLGenCtx m) =>
TriggerName ->
QualifiedTable ->
[ColumnInfo ('Postgres pgKind)] ->
Ops ->
SubscribeOpSpec ('Postgres pgKind) ->
m QualifiedTriggerName
mkTriggerFunctionQ :: forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadTx m, MonadReader SQLGenCtx m) =>
TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m QualifiedTriggerName
mkTriggerFunctionQ TriggerName
triggerName (QualifiedObject SchemaName
schema TableName
table) [ColumnInfo ('Postgres pgKind)]
allCols Ops
op (SubscribeOpSpec SubscribeColumns ('Postgres pgKind)
listenColumns Maybe (SubscribeColumns ('Postgres pgKind))
deliveryColumns') = do
StringifyNumbers
strfyNum <- (SQLGenCtx -> StringifyNumbers) -> m StringifyNumbers
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks SQLGenCtx -> StringifyNumbers
stringifyNum
let dbQualifiedTriggerName :: QualifiedTriggerName
dbQualifiedTriggerName = Ops -> TriggerName -> QualifiedTriggerName
pgIdenTrigger Ops
op TriggerName
triggerName
() <-
TxET QErr IO () -> m ()
forall a. TxE QErr a -> m a
forall (m :: * -> *) a. MonadTx m => TxE QErr a -> m a
liftTx
(TxET QErr IO () -> m ()) -> TxET QErr IO () -> m ()
forall a b. (a -> b) -> a -> b
$ (PGTxErr -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) a e.
(MonadIO m, FromRes a) =>
(PGTxErr -> e) -> Query -> TxET e m a
PG.multiQE PGTxErr -> QErr
defaultTxErrorHandler
(Query -> TxET QErr IO ()) -> Query -> TxET QErr IO ()
forall a b. (a -> b) -> a -> b
$ Text -> Query
PG.fromText
(Text -> Query) -> (Text -> Text) -> Text -> Query
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text
TL.toStrict
(Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ let
deliveryColumns :: SubscribeColumns ('Postgres pgKind)
deliveryColumns = SubscribeColumns ('Postgres pgKind)
-> Maybe (SubscribeColumns ('Postgres pgKind))
-> SubscribeColumns ('Postgres pgKind)
forall a. a -> Maybe a -> a
fromMaybe SubscribeColumns ('Postgres pgKind)
forall (b :: BackendType). SubscribeColumns b
SubCStar Maybe (SubscribeColumns ('Postgres pgKind))
deliveryColumns'
getApplicableColumns :: SubscribeColumns ('Postgres pgKind)
-> [ColumnInfo ('Postgres pgKind)]
getApplicableColumns = \case
SubscribeColumns ('Postgres pgKind)
SubCStar -> [ColumnInfo ('Postgres pgKind)]
allCols
SubCArray [Column ('Postgres pgKind)]
cols -> [Column ('Postgres pgKind)]
-> [ColumnInfo ('Postgres pgKind)]
-> [ColumnInfo ('Postgres pgKind)]
forall (b :: BackendType).
Backend b =>
[Column b] -> [ColumnInfo b] -> [ColumnInfo b]
getColInfos [Column ('Postgres pgKind)]
cols [ColumnInfo ('Postgres pgKind)]
allCols
applicableDeliveryCols :: [ColumnInfo ('Postgres pgKind)]
applicableDeliveryCols = SubscribeColumns ('Postgres pgKind)
-> [ColumnInfo ('Postgres pgKind)]
getApplicableColumns SubscribeColumns ('Postgres pgKind)
deliveryColumns
getRowExpression :: OpVar -> SQLExp
getRowExpression OpVar
opVar = SQLExp -> SQLExp
applyRowToJson' (SQLExp -> SQLExp) -> SQLExp -> SQLExp
forall a b. (a -> b) -> a -> b
$ OpVar
-> StringifyNumbers -> [ColumnInfo ('Postgres pgKind)] -> SQLExp
mkRowExpression OpVar
opVar StringifyNumbers
strfyNum [ColumnInfo ('Postgres pgKind)]
applicableDeliveryCols
applicableListenCols :: [ColumnInfo ('Postgres pgKind)]
applicableListenCols = SubscribeColumns ('Postgres pgKind)
-> [ColumnInfo ('Postgres pgKind)]
getApplicableColumns SubscribeColumns ('Postgres pgKind)
listenColumns
renderRow :: OpVar -> SQLExp
renderRow OpVar
opVar = SQLExp -> SQLExp
applyRow (SQLExp -> SQLExp) -> SQLExp -> SQLExp
forall a b. (a -> b) -> a -> b
$ OpVar
-> StringifyNumbers -> [ColumnInfo ('Postgres pgKind)] -> SQLExp
mkRowExpression OpVar
opVar StringifyNumbers
strfyNum [ColumnInfo ('Postgres pgKind)]
applicableListenCols
oldDataExp :: SQLExp
oldDataExp = case Ops
op of
Ops
INSERT -> SQLExp
SENull
Ops
UPDATE -> OpVar -> SQLExp
getRowExpression OpVar
OLD
Ops
DELETE -> OpVar -> SQLExp
getRowExpression OpVar
OLD
Ops
MANUAL -> SQLExp
SENull
newDataExp :: SQLExp
newDataExp = case Ops
op of
Ops
INSERT -> OpVar -> SQLExp
getRowExpression OpVar
NEW
Ops
UPDATE -> OpVar -> SQLExp
getRowExpression OpVar
NEW
Ops
DELETE -> SQLExp
SENull
Ops
MANUAL -> SQLExp
SENull
name :: Text
name = TriggerName -> Text
triggerNameToTxt TriggerName
triggerName
qualifiedTriggerName :: Text
qualifiedTriggerName = QualifiedTriggerName -> Text
unQualifiedTriggerName QualifiedTriggerName
dbQualifiedTriggerName
schemaName :: Text
schemaName = Text -> Text
pgFmtLit (Text -> Text) -> Text -> Text
forall a b. (a -> b) -> a -> b
$ SchemaName -> Text
getSchemaTxt SchemaName
schema
tableName :: Text
tableName = Text -> Text
pgFmtLit (Text -> Text) -> Text -> Text
forall a b. (a -> b) -> a -> b
$ TableName -> Text
getTableTxt TableName
table
oldRow :: Text
oldRow = SQLExp -> Text
forall a. ToSQL a => a -> Text
toSQLTxt (SQLExp -> Text) -> SQLExp -> Text
forall a b. (a -> b) -> a -> b
$ OpVar -> SQLExp
renderRow OpVar
OLD
newRow :: Text
newRow = SQLExp -> Text
forall a. ToSQL a => a -> Text
toSQLTxt (SQLExp -> Text) -> SQLExp -> Text
forall a b. (a -> b) -> a -> b
$ OpVar -> SQLExp
renderRow OpVar
NEW
oldPayloadExpression :: Text
oldPayloadExpression = SQLExp -> Text
forall a. ToSQL a => a -> Text
toSQLTxt SQLExp
oldDataExp
newPayloadExpression :: Text
newPayloadExpression = SQLExp -> Text
forall a. ToSQL a => a -> Text
toSQLTxt SQLExp
newDataExp
in $(makeRelativeToProject "src-rsr/trigger.sql.shakespeare" >>= ST.stextFile)
QualifiedTriggerName -> m QualifiedTriggerName
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure QualifiedTriggerName
dbQualifiedTriggerName
where
applyRowToJson' :: SQLExp -> SQLExp
applyRowToJson' SQLExp
e = Text -> [SQLExp] -> Maybe OrderByExp -> SQLExp
SEFnApp Text
"row_to_json" [SQLExp
e] Maybe OrderByExp
forall a. Maybe a
Nothing
applyRow :: SQLExp -> SQLExp
applyRow SQLExp
e = Text -> [SQLExp] -> Maybe OrderByExp -> SQLExp
SEFnApp Text
"row" [SQLExp
e] Maybe OrderByExp
forall a. Maybe a
Nothing
opToQual :: OpVar -> Qual
opToQual = Text -> Qual
QualVar (Text -> Qual) -> (OpVar -> Text) -> OpVar -> Qual
forall b c a. (b -> c) -> (a -> b) -> a -> c
. OpVar -> Text
forall a. Show a => a -> Text
tshow
mkRowExpression :: OpVar
-> StringifyNumbers -> [ColumnInfo ('Postgres pgKind)] -> SQLExp
mkRowExpression OpVar
opVar StringifyNumbers
strfyNum [ColumnInfo ('Postgres pgKind)]
columns =
[Extractor] -> SQLExp
mkRowExp ([Extractor] -> SQLExp) -> [Extractor] -> SQLExp
forall a b. (a -> b) -> a -> b
$ (ColumnInfo ('Postgres pgKind) -> Extractor)
-> [ColumnInfo ('Postgres pgKind)] -> [Extractor]
forall a b. (a -> b) -> [a] -> [b]
map (\ColumnInfo ('Postgres pgKind)
col -> SQLExp -> ColumnInfo ('Postgres pgKind) -> Extractor
forall {b :: BackendType}.
(ScalarType b ~ PGScalarType, Column b ~ PGCol) =>
SQLExp -> ColumnInfo b -> Extractor
toExtractor (OpVar
-> StringifyNumbers -> ColumnInfo ('Postgres pgKind) -> SQLExp
mkQId OpVar
opVar StringifyNumbers
strfyNum ColumnInfo ('Postgres pgKind)
col) ColumnInfo ('Postgres pgKind)
col) [ColumnInfo ('Postgres pgKind)]
columns
mkQId :: OpVar
-> StringifyNumbers -> ColumnInfo ('Postgres pgKind) -> SQLExp
mkQId OpVar
opVar StringifyNumbers
strfyNum ColumnInfo ('Postgres pgKind)
colInfo =
StringifyNumbers
-> ColumnType ('Postgres pgKind)
-> Bool
-> Maybe NamingCase
-> SQLExp
-> SQLExp
forall (pgKind :: PostgresKind).
StringifyNumbers
-> ColumnType ('Postgres pgKind)
-> Bool
-> Maybe NamingCase
-> SQLExp
-> SQLExp
toJSONableExp StringifyNumbers
strfyNum (ColumnInfo ('Postgres pgKind) -> ColumnType ('Postgres pgKind)
forall (b :: BackendType). ColumnInfo b -> ColumnType b
ciType ColumnInfo ('Postgres pgKind)
colInfo) Bool
False Maybe NamingCase
forall a. Maybe a
Nothing
(SQLExp -> SQLExp) -> SQLExp -> SQLExp
forall a b. (a -> b) -> a -> b
$ QIdentifier -> SQLExp
SEQIdentifier
(QIdentifier -> SQLExp) -> QIdentifier -> SQLExp
forall a b. (a -> b) -> a -> b
$ Qual -> Identifier -> QIdentifier
QIdentifier (OpVar -> Qual
opToQual OpVar
opVar)
(Identifier -> QIdentifier) -> Identifier -> QIdentifier
forall a b. (a -> b) -> a -> b
$ PGCol -> Identifier
forall a. IsIdentifier a => a -> Identifier
toIdentifier
(PGCol -> Identifier) -> PGCol -> Identifier
forall a b. (a -> b) -> a -> b
$ ColumnInfo ('Postgres pgKind) -> Column ('Postgres pgKind)
forall (b :: BackendType). ColumnInfo b -> Column b
ciColumn ColumnInfo ('Postgres pgKind)
colInfo
toExtractor :: SQLExp -> ColumnInfo b -> Extractor
toExtractor SQLExp
sqlExp ColumnInfo b
column
| (ScalarType b -> Bool) -> ColumnType b -> Bool
forall (b :: BackendType).
(ScalarType b -> Bool) -> ColumnType b -> Bool
isScalarColumnWhere ScalarType b -> Bool
PGScalarType -> Bool
isGeoType (ColumnInfo b -> ColumnType b
forall (b :: BackendType). ColumnInfo b -> ColumnType b
ciType ColumnInfo b
column) = SQLExp -> Maybe ColumnAlias -> Extractor
Extractor SQLExp
sqlExp (ColumnAlias -> Maybe ColumnAlias
forall a. a -> Maybe a
Just (ColumnAlias -> Maybe ColumnAlias)
-> ColumnAlias -> Maybe ColumnAlias
forall a b. (a -> b) -> a -> b
$ ColumnInfo b -> ColumnAlias
forall {b :: BackendType}.
(Column b ~ PGCol) =>
ColumnInfo b -> ColumnAlias
getAlias ColumnInfo b
column)
| Bool
otherwise = SQLExp -> Maybe ColumnAlias -> Extractor
Extractor SQLExp
sqlExp Maybe ColumnAlias
forall a. Maybe a
Nothing
getAlias :: ColumnInfo b -> ColumnAlias
getAlias ColumnInfo b
col = Identifier -> ColumnAlias
forall a. IsIdentifier a => a -> ColumnAlias
toColumnAlias (Identifier -> ColumnAlias) -> Identifier -> ColumnAlias
forall a b. (a -> b) -> a -> b
$ Text -> Identifier
Identifier (Text -> Identifier) -> Text -> Identifier
forall a b. (a -> b) -> a -> b
$ PGCol -> Text
getPGColTxt (ColumnInfo b -> Column b
forall (b :: BackendType). ColumnInfo b -> Column b
ciColumn ColumnInfo b
col)
checkIfTriggerExistsForTableQ ::
QualifiedTriggerName ->
QualifiedTable ->
PG.TxE QErr Bool
checkIfTriggerExistsForTableQ :: QualifiedTriggerName -> QualifiedTable -> TxET QErr IO Bool
checkIfTriggerExistsForTableQ (QualifiedTriggerName Text
triggerName) (QualifiedObject SchemaName
schemaName TableName
tableName) =
(SingleRow (Identity Bool) -> Bool)
-> TxET QErr IO (SingleRow (Identity Bool)) -> TxET QErr IO Bool
forall a b. (a -> b) -> TxET QErr IO a -> TxET QErr IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Identity Bool -> Bool
forall a. Identity a -> a
runIdentity (Identity Bool -> Bool)
-> (SingleRow (Identity Bool) -> Identity Bool)
-> SingleRow (Identity Bool)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity Bool) -> Identity Bool
forall a. SingleRow a -> a
PG.getRow)
(TxET QErr IO (SingleRow (Identity Bool)) -> TxET QErr IO Bool)
-> TxET QErr IO (SingleRow (Identity Bool)) -> TxET QErr IO Bool
forall a b. (a -> b) -> a -> b
$ (PGTxErr -> QErr)
-> Query
-> (Text, SchemaName, TableName)
-> Bool
-> TxET QErr IO (SingleRow (Identity Bool))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
SELECT EXISTS (
SELECT 1
FROM pg_trigger
WHERE NOT tgisinternal
AND tgname = $1 AND tgrelid = (quote_ident($2) || '.' || quote_ident($3))::regclass
)
|]
(Text
triggerName, SchemaName
schemaName, TableName
tableName)
Bool
True
checkIfFunctionExistsQ ::
TriggerName ->
Ops ->
PG.TxE QErr Bool
checkIfFunctionExistsQ :: TriggerName -> Ops -> TxET QErr IO Bool
checkIfFunctionExistsQ TriggerName
triggerName Ops
op = do
let qualifiedTriggerName :: QualifiedTriggerName
qualifiedTriggerName = Ops -> TriggerName -> QualifiedTriggerName
pgTriggerName Ops
op TriggerName
triggerName
(SingleRow (Identity Bool) -> Bool)
-> TxET QErr IO (SingleRow (Identity Bool)) -> TxET QErr IO Bool
forall a b. (a -> b) -> TxET QErr IO a -> TxET QErr IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Identity Bool -> Bool
forall a. Identity a -> a
runIdentity (Identity Bool -> Bool)
-> (SingleRow (Identity Bool) -> Identity Bool)
-> SingleRow (Identity Bool)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity Bool) -> Identity Bool
forall a. SingleRow a -> a
PG.getRow)
(TxET QErr IO (SingleRow (Identity Bool)) -> TxET QErr IO Bool)
-> TxET QErr IO (SingleRow (Identity Bool)) -> TxET QErr IO Bool
forall a b. (a -> b) -> a -> b
$ (PGTxErr -> QErr)
-> Query
-> Identity QualifiedTriggerName
-> Bool
-> TxET QErr IO (SingleRow (Identity Bool))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
SELECT EXISTS (
SELECT 1
FROM pg_catalog.pg_proc
JOIN pg_namespace ON pg_catalog.pg_proc.pronamespace = pg_namespace.oid
WHERE proname = $1
AND pg_namespace.nspname = 'hdb_catalog'
)
|]
(QualifiedTriggerName -> Identity QualifiedTriggerName
forall a. a -> Identity a
Identity QualifiedTriggerName
qualifiedTriggerName)
Bool
True
mkTrigger ::
forall pgKind m.
(Backend ('Postgres pgKind), MonadTx m, MonadReader SQLGenCtx m) =>
TriggerName ->
QualifiedTable ->
TriggerOnReplication ->
[ColumnInfo ('Postgres pgKind)] ->
Ops ->
SubscribeOpSpec ('Postgres pgKind) ->
m ()
mkTrigger :: forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadTx m, MonadReader SQLGenCtx m) =>
TriggerName
-> QualifiedTable
-> TriggerOnReplication
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
mkTrigger TriggerName
triggerName QualifiedTable
table TriggerOnReplication
triggerOnReplication [ColumnInfo ('Postgres pgKind)]
allCols Ops
op SubscribeOpSpec ('Postgres pgKind)
subOpSpec = do
QualifiedTriggerName Text
dbTriggerNameTxt <- TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m QualifiedTriggerName
forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadTx m, MonadReader SQLGenCtx m) =>
TriggerName
-> QualifiedTable
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m QualifiedTriggerName
mkTriggerFunctionQ TriggerName
triggerName QualifiedTable
table [ColumnInfo ('Postgres pgKind)]
allCols Ops
op SubscribeOpSpec ('Postgres pgKind)
subOpSpec
Bool
doesTriggerExist <- TxET QErr IO Bool -> m Bool
forall a. TxE QErr a -> m a
forall (m :: * -> *) a. MonadTx m => TxE QErr a -> m a
liftTx (TxET QErr IO Bool -> m Bool) -> TxET QErr IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ QualifiedTriggerName -> QualifiedTable -> TxET QErr IO Bool
checkIfTriggerExistsForTableQ (Ops -> TriggerName -> QualifiedTriggerName
pgTriggerName Ops
op TriggerName
triggerName) QualifiedTable
table
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
doesTriggerExist
(m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ let createTriggerSqlQuery :: Query
createTriggerSqlQuery =
Text -> Query
PG.fromText (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text -> Text
forall {a} {a} {a}.
(ToText a, ToText a, ToText a) =>
a -> a -> a -> Text
createTriggerSQL Text
dbTriggerNameTxt (QualifiedTable -> Text
forall a. ToSQL a => a -> Text
toSQLTxt QualifiedTable
table) (Ops -> Text
forall a. Show a => a -> Text
tshow Ops
op)
in TxET QErr IO () -> m ()
forall a. TxE QErr a -> m a
forall (m :: * -> *) a. MonadTx m => TxE QErr a -> m a
liftTx (TxET QErr IO () -> m ()) -> TxET QErr IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
(PGTxErr -> QErr) -> Query -> () -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE PGTxErr -> QErr
defaultTxErrorHandler Query
createTriggerSqlQuery () Bool
False
Bool -> TxET QErr IO () -> TxET QErr IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (TriggerOnReplication
triggerOnReplication TriggerOnReplication -> TriggerOnReplication -> Bool
forall a. Eq a => a -> a -> Bool
== TriggerOnReplication
TOREnableTrigger)
(TxET QErr IO () -> TxET QErr IO ())
-> TxET QErr IO () -> TxET QErr IO ()
forall a b. (a -> b) -> a -> b
$ (PGTxErr -> QErr) -> Query -> () -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE PGTxErr -> QErr
defaultTxErrorHandler (Text -> Text -> Query
forall {a} {a}. (ToText a, ToText a) => a -> a -> Query
alwaysEnableTriggerQuery Text
dbTriggerNameTxt (QualifiedTable -> Text
forall a. ToSQL a => a -> Text
toSQLTxt QualifiedTable
table)) () Bool
False
where
createTriggerSQL :: a -> a -> a -> Text
createTriggerSQL a
triggerNameTxt a
tableName a
opText =
[ST.st|
CREATE TRIGGER #{triggerNameTxt} AFTER #{opText} ON #{tableName} FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.#{triggerNameTxt}()
|]
alwaysEnableTriggerQuery :: a -> a -> Query
alwaysEnableTriggerQuery a
triggerNameTxt a
tableTxt =
Text -> Query
PG.fromText
(Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ [ST.st|
ALTER TABLE #{tableTxt} ENABLE ALWAYS TRIGGER #{triggerNameTxt};
|]
mkAllTriggersQ ::
forall pgKind m.
(Backend ('Postgres pgKind), MonadTx m, MonadReader SQLGenCtx m) =>
TriggerName ->
QualifiedTable ->
TriggerOnReplication ->
[ColumnInfo ('Postgres pgKind)] ->
TriggerOpsDef ('Postgres pgKind) ->
m ()
mkAllTriggersQ :: forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadTx m, MonadReader SQLGenCtx m) =>
TriggerName
-> QualifiedTable
-> TriggerOnReplication
-> [ColumnInfo ('Postgres pgKind)]
-> TriggerOpsDef ('Postgres pgKind)
-> m ()
mkAllTriggersQ TriggerName
triggerName QualifiedTable
table TriggerOnReplication
triggerOnReplication [ColumnInfo ('Postgres pgKind)]
allCols TriggerOpsDef ('Postgres pgKind)
fullspec = do
Maybe (SubscribeOpSpec ('Postgres pgKind))
-> (SubscribeOpSpec ('Postgres pgKind) -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (TriggerOpsDef ('Postgres pgKind)
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdInsert TriggerOpsDef ('Postgres pgKind)
fullspec) (TriggerName
-> QualifiedTable
-> TriggerOnReplication
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadTx m, MonadReader SQLGenCtx m) =>
TriggerName
-> QualifiedTable
-> TriggerOnReplication
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
mkTrigger TriggerName
triggerName QualifiedTable
table TriggerOnReplication
triggerOnReplication [ColumnInfo ('Postgres pgKind)]
allCols Ops
INSERT)
Maybe (SubscribeOpSpec ('Postgres pgKind))
-> (SubscribeOpSpec ('Postgres pgKind) -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (TriggerOpsDef ('Postgres pgKind)
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdUpdate TriggerOpsDef ('Postgres pgKind)
fullspec) (TriggerName
-> QualifiedTable
-> TriggerOnReplication
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadTx m, MonadReader SQLGenCtx m) =>
TriggerName
-> QualifiedTable
-> TriggerOnReplication
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
mkTrigger TriggerName
triggerName QualifiedTable
table TriggerOnReplication
triggerOnReplication [ColumnInfo ('Postgres pgKind)]
allCols Ops
UPDATE)
Maybe (SubscribeOpSpec ('Postgres pgKind))
-> (SubscribeOpSpec ('Postgres pgKind) -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (TriggerOpsDef ('Postgres pgKind)
-> Maybe (SubscribeOpSpec ('Postgres pgKind))
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdDelete TriggerOpsDef ('Postgres pgKind)
fullspec) (TriggerName
-> QualifiedTable
-> TriggerOnReplication
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
forall (pgKind :: PostgresKind) (m :: * -> *).
(Backend ('Postgres pgKind), MonadTx m, MonadReader SQLGenCtx m) =>
TriggerName
-> QualifiedTable
-> TriggerOnReplication
-> [ColumnInfo ('Postgres pgKind)]
-> Ops
-> SubscribeOpSpec ('Postgres pgKind)
-> m ()
mkTrigger TriggerName
triggerName QualifiedTable
table TriggerOnReplication
triggerOnReplication [ColumnInfo ('Postgres pgKind)]
allCols Ops
DELETE)
addCleanupSchedules ::
(MonadIO m, MonadError QErr m) =>
PGSourceConfig ->
[(TriggerName, AutoTriggerLogCleanupConfig)] ->
m ()
addCleanupSchedules :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
PGSourceConfig
-> [(TriggerName, AutoTriggerLogCleanupConfig)] -> m ()
addCleanupSchedules PGSourceConfig
sourceConfig [(TriggerName, AutoTriggerLogCleanupConfig)]
triggersWithcleanupConfig =
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([(TriggerName, AutoTriggerLogCleanupConfig)] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(TriggerName, AutoTriggerLogCleanupConfig)]
triggersWithcleanupConfig) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
let triggerNames :: [TriggerName]
triggerNames = ((TriggerName, AutoTriggerLogCleanupConfig) -> TriggerName)
-> [(TriggerName, AutoTriggerLogCleanupConfig)] -> [TriggerName]
forall a b. (a -> b) -> [a] -> [b]
map (TriggerName, AutoTriggerLogCleanupConfig) -> TriggerName
forall a b. (a, b) -> a
fst [(TriggerName, AutoTriggerLogCleanupConfig)]
triggersWithcleanupConfig
[(TriggerName, Int, UTCTime)]
countAndLastSchedules <- m (Either QErr [(TriggerName, Int, UTCTime)])
-> m [(TriggerName, Int, UTCTime)]
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr [(TriggerName, Int, UTCTime)])
-> m [(TriggerName, Int, UTCTime)])
-> m (Either QErr [(TriggerName, Int, UTCTime)])
-> m [(TriggerName, Int, UTCTime)]
forall a b. (a -> b) -> a -> b
$ IO (Either QErr [(TriggerName, Int, UTCTime)])
-> m (Either QErr [(TriggerName, Int, UTCTime)])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr [(TriggerName, Int, UTCTime)])
-> m (Either QErr [(TriggerName, Int, UTCTime)]))
-> IO (Either QErr [(TriggerName, Int, UTCTime)])
-> m (Either QErr [(TriggerName, Int, UTCTime)])
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> TxET QErr IO [(TriggerName, Int, UTCTime)]
-> IO (Either QErr [(TriggerName, Int, UTCTime)])
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceReadTx PGSourceConfig
sourceConfig (TxET QErr IO [(TriggerName, Int, UTCTime)]
-> IO (Either QErr [(TriggerName, Int, UTCTime)]))
-> TxET QErr IO [(TriggerName, Int, UTCTime)]
-> IO (Either QErr [(TriggerName, Int, UTCTime)])
forall a b. (a -> b) -> a -> b
$ [TriggerName] -> TxET QErr IO [(TriggerName, Int, UTCTime)]
selectLastCleanupScheduledTimestamp [TriggerName]
triggerNames
UTCTime
currTime <- IO UTCTime -> m UTCTime
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO UTCTime -> m UTCTime) -> IO UTCTime -> m UTCTime
forall a b. (a -> b) -> a -> b
$ IO UTCTime
Time.getCurrentTime
let triggerMap :: HashMap TriggerName (Int, UTCTime)
triggerMap = [(TriggerName, (Int, UTCTime))]
-> HashMap TriggerName (Int, UTCTime)
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HashMap.fromList ([(TriggerName, (Int, UTCTime))]
-> HashMap TriggerName (Int, UTCTime))
-> [(TriggerName, (Int, UTCTime))]
-> HashMap TriggerName (Int, UTCTime)
forall a b. (a -> b) -> a -> b
$ ((TriggerName, Int, UTCTime) -> (TriggerName, (Int, UTCTime)))
-> [(TriggerName, Int, UTCTime)] -> [(TriggerName, (Int, UTCTime))]
forall a b. (a -> b) -> [a] -> [b]
map (\(TriggerName
triggerName, Int
count, UTCTime
lastTime) -> (TriggerName
triggerName, (Int
count, UTCTime
lastTime))) [(TriggerName, Int, UTCTime)]
countAndLastSchedules
scheduledTriggersAndTimestamps :: [(TriggerName, [UTCTime])]
scheduledTriggersAndTimestamps =
((TriggerName, AutoTriggerLogCleanupConfig)
-> Maybe (TriggerName, [UTCTime]))
-> [(TriggerName, AutoTriggerLogCleanupConfig)]
-> [(TriggerName, [UTCTime])]
forall a b. (a -> Maybe b) -> [a] -> [b]
forall (f :: * -> *) a b.
Filterable f =>
(a -> Maybe b) -> f a -> f b
mapMaybe
( \(TriggerName
triggerName, AutoTriggerLogCleanupConfig
cleanupConfig) ->
let lastScheduledTime :: Maybe UTCTime
lastScheduledTime = case TriggerName
-> HashMap TriggerName (Int, UTCTime) -> Maybe (Int, UTCTime)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup TriggerName
triggerName HashMap TriggerName (Int, UTCTime)
triggerMap of
Maybe (Int, UTCTime)
Nothing -> UTCTime -> Maybe UTCTime
forall a. a -> Maybe a
Just UTCTime
currTime
Just (Int
count, UTCTime
lastTime) -> if Int
count Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
5 then (UTCTime -> Maybe UTCTime
forall a. a -> Maybe a
Just UTCTime
lastTime) else Maybe UTCTime
forall a. Maybe a
Nothing
in (UTCTime -> (TriggerName, [UTCTime]))
-> Maybe UTCTime -> Maybe (TriggerName, [UTCTime])
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap
( \UTCTime
lastScheduledTimestamp ->
(TriggerName
triggerName, UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes UTCTime
lastScheduledTimestamp Int
cleanupSchedulesToBeGenerated (AutoTriggerLogCleanupConfig -> CronSchedule
_atlccSchedule AutoTriggerLogCleanupConfig
cleanupConfig))
)
Maybe UTCTime
lastScheduledTime
)
[(TriggerName, AutoTriggerLogCleanupConfig)]
triggersWithcleanupConfig
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([(TriggerName, [UTCTime])] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(TriggerName, [UTCTime])]
scheduledTriggersAndTimestamps)
(m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
(m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery
(TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ [(TriggerName, [UTCTime])] -> TxET QErr IO ()
insertEventTriggerCleanupLogsTx [(TriggerName, [UTCTime])]
scheduledTriggersAndTimestamps
insertEventTriggerCleanupLogsTx :: [(TriggerName, [Time.UTCTime])] -> PG.TxET QErr IO ()
insertEventTriggerCleanupLogsTx :: [(TriggerName, [UTCTime])] -> TxET QErr IO ()
insertEventTriggerCleanupLogsTx [(TriggerName, [UTCTime])]
triggersWithschedules = do
let insertCleanupEventsSql :: Text
insertCleanupEventsSql =
Builder -> Text
TB.run
(Builder -> Text) -> Builder -> Text
forall a b. (a -> b) -> a -> b
$ SQLInsert -> Builder
forall a. ToSQL a => a -> Builder
toSQL
S.SQLInsert
{ siTable :: QualifiedTable
siTable = QualifiedTable
cleanupLogTable,
siCols :: [PGCol]
siCols = (Text -> PGCol) -> [Text] -> [PGCol]
forall a b. (a -> b) -> [a] -> [b]
map Text -> PGCol
unsafePGCol [Text
"trigger_name", Text
"scheduled_at", Text
"status"],
siValues :: ValuesExp
siValues = [TupleExp] -> ValuesExp
S.ValuesExp ([TupleExp] -> ValuesExp) -> [TupleExp] -> ValuesExp
forall a b. (a -> b) -> a -> b
$ ((TriggerName, [UTCTime]) -> [TupleExp])
-> [(TriggerName, [UTCTime])] -> [TupleExp]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap (TriggerName, [UTCTime]) -> [TupleExp]
genArr [(TriggerName, [UTCTime])]
triggersWithschedules,
siConflict :: Maybe SQLConflict
siConflict = SQLConflict -> Maybe SQLConflict
forall a. a -> Maybe a
Just (SQLConflict -> Maybe SQLConflict)
-> SQLConflict -> Maybe SQLConflict
forall a b. (a -> b) -> a -> b
$ Maybe SQLConflictTarget -> SQLConflict
S.DoNothing Maybe SQLConflictTarget
forall a. Maybe a
Nothing,
siRet :: Maybe RetExp
siRet = Maybe RetExp
forall a. Maybe a
Nothing
}
(PGTxErr -> QErr) -> Query -> () -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE PGTxErr -> QErr
defaultTxErrorHandler (Text -> Query
PG.fromText Text
insertCleanupEventsSql) () Bool
False
where
cleanupLogTable :: QualifiedTable
cleanupLogTable = SchemaName -> TableName -> QualifiedTable
forall a. SchemaName -> a -> QualifiedObject a
QualifiedObject SchemaName
"hdb_catalog" TableName
"hdb_event_log_cleanups"
genArr :: (TriggerName, [UTCTime]) -> [TupleExp]
genArr (TriggerName
t, [UTCTime]
schedules) = (UTCTime -> TupleExp) -> [UTCTime] -> [TupleExp]
forall a b. (a -> b) -> [a] -> [b]
map ([Text] -> TupleExp
toTupleExp ([Text] -> TupleExp) -> (UTCTime -> [Text]) -> UTCTime -> TupleExp
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (\UTCTime
s -> [(TriggerName -> Text
triggerNameToTxt TriggerName
t), (UTCTime -> Text
formatTime' UTCTime
s), Text
"scheduled"])) [UTCTime]
schedules
toTupleExp :: [Text] -> TupleExp
toTupleExp = [SQLExp] -> TupleExp
S.TupleExp ([SQLExp] -> TupleExp)
-> ([Text] -> [SQLExp]) -> [Text] -> TupleExp
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Text -> SQLExp) -> [Text] -> [SQLExp]
forall a b. (a -> b) -> [a] -> [b]
map Text -> SQLExp
S.SELit
selectLastCleanupScheduledTimestamp :: [TriggerName] -> PG.TxET QErr IO [(TriggerName, Int, Time.UTCTime)]
selectLastCleanupScheduledTimestamp :: [TriggerName] -> TxET QErr IO [(TriggerName, Int, UTCTime)]
selectLastCleanupScheduledTimestamp [TriggerName]
triggerNames =
(PGTxErr -> QErr)
-> Query
-> Identity PGTextArray
-> Bool
-> TxET QErr IO [(TriggerName, Int, UTCTime)]
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
SELECT trigger_name, count(1), max(scheduled_at)
FROM hdb_catalog.hdb_event_log_cleanups
WHERE status='scheduled' AND trigger_name = ANY($1::text[])
GROUP BY trigger_name
|]
(PGTextArray -> Identity PGTextArray
forall a. a -> Identity a
Identity (PGTextArray -> Identity PGTextArray)
-> PGTextArray -> Identity PGTextArray
forall a b. (a -> b) -> a -> b
$ [Text] -> PGTextArray
PGTextArray ([Text] -> PGTextArray) -> [Text] -> PGTextArray
forall a b. (a -> b) -> a -> b
$ (TriggerName -> Text) -> [TriggerName] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map TriggerName -> Text
triggerNameToTxt [TriggerName]
triggerNames)
Bool
True
deleteAllScheduledCleanupsTx :: TriggerName -> PG.TxE QErr ()
deleteAllScheduledCleanupsTx :: TriggerName -> TxET QErr IO ()
deleteAllScheduledCleanupsTx TriggerName
triggerName = do
(PGTxErr -> QErr)
-> Query -> Identity Text -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
DELETE from hdb_catalog.hdb_event_log_cleanups
WHERE (status = 'scheduled') AND (trigger_name = $1)
|]
(Text -> Identity Text
forall a. a -> Identity a
Identity (TriggerName -> Text
triggerNameToTxt TriggerName
triggerName))
Bool
True
deleteAllScheduledCleanups ::
(MonadIO m, MonadError QErr m) =>
PGSourceConfig ->
TriggerName ->
m ()
deleteAllScheduledCleanups :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
PGSourceConfig -> TriggerName -> m ()
deleteAllScheduledCleanups PGSourceConfig
sourceConfig TriggerName
triggerName =
m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ TriggerName -> TxET QErr IO ()
deleteAllScheduledCleanupsTx TriggerName
triggerName
getCleanupEventsForDeletionTx :: PG.TxE QErr ([(Text, TriggerName)])
getCleanupEventsForDeletionTx :: TxE QErr [(Text, TriggerName)]
getCleanupEventsForDeletionTx =
(PGTxErr -> QErr)
-> Query -> () -> Bool -> TxE QErr [(Text, TriggerName)]
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
WITH latest_events as (
SELECT * from hdb_catalog.hdb_event_log_cleanups WHERE status = 'scheduled' AND scheduled_at < (now() at time zone 'utc')
),
grouped_events as (
SELECT trigger_name, max(scheduled_at) as scheduled_at
from latest_events
group by trigger_name
),
mark_events_as_dead as (
UPDATE hdb_catalog.hdb_event_log_cleanups l
SET status = 'dead'
FROM grouped_events AS g
WHERE l.trigger_name = g.trigger_name AND l.scheduled_at < g.scheduled_at AND l.status = 'scheduled'
)
SELECT l.id, l.trigger_name
FROM latest_events l
JOIN grouped_events g ON l.trigger_name = g.trigger_name
WHERE l.scheduled_at = g.scheduled_at;
|]
()
Bool
False
getCleanupEventsForDeletion ::
(MonadIO m, MonadError QErr m) =>
PGSourceConfig ->
m [(Text, TriggerName)]
getCleanupEventsForDeletion :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
PGSourceConfig -> m [(Text, TriggerName)]
getCleanupEventsForDeletion PGSourceConfig
sourceConfig =
m (Either QErr [(Text, TriggerName)]) -> m [(Text, TriggerName)]
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr [(Text, TriggerName)]) -> m [(Text, TriggerName)])
-> m (Either QErr [(Text, TriggerName)]) -> m [(Text, TriggerName)]
forall a b. (a -> b) -> a -> b
$ IO (Either QErr [(Text, TriggerName)])
-> m (Either QErr [(Text, TriggerName)])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr [(Text, TriggerName)])
-> m (Either QErr [(Text, TriggerName)]))
-> IO (Either QErr [(Text, TriggerName)])
-> m (Either QErr [(Text, TriggerName)])
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom
-> TxE QErr [(Text, TriggerName)]
-> IO (Either QErr [(Text, TriggerName)])
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery (TxE QErr [(Text, TriggerName)]
-> IO (Either QErr [(Text, TriggerName)]))
-> TxE QErr [(Text, TriggerName)]
-> IO (Either QErr [(Text, TriggerName)])
forall a b. (a -> b) -> a -> b
$ TxE QErr [(Text, TriggerName)]
getCleanupEventsForDeletionTx
markCleanupEventsAsDeadTx :: [Text] -> PG.TxE QErr ()
markCleanupEventsAsDeadTx :: [Text] -> TxET QErr IO ()
markCleanupEventsAsDeadTx [Text]
toDeadEvents = do
Bool -> TxET QErr IO () -> TxET QErr IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([Text] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Text]
toDeadEvents)
(TxET QErr IO () -> TxET QErr IO ())
-> TxET QErr IO () -> TxET QErr IO ()
forall a b. (a -> b) -> a -> b
$ (PGTxErr -> QErr)
-> Query -> Identity PGTextArray -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.hdb_event_log_cleanups l
SET status = 'dead'
WHERE id = ANY($1::text[])
|]
(PGTextArray -> Identity PGTextArray
forall a. a -> Identity a
Identity (PGTextArray -> Identity PGTextArray)
-> PGTextArray -> Identity PGTextArray
forall a b. (a -> b) -> a -> b
$ [Text] -> PGTextArray
PGTextArray [Text]
toDeadEvents)
Bool
True
updateCleanupEventStatusToDead ::
(MonadIO m, MonadError QErr m) =>
PGSourceConfig ->
[Text] ->
m ()
updateCleanupEventStatusToDead :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
PGSourceConfig -> [Text] -> m ()
updateCleanupEventStatusToDead PGSourceConfig
sourceConfig [Text]
toDeadEvents =
m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ [Text] -> TxET QErr IO ()
markCleanupEventsAsDeadTx [Text]
toDeadEvents
updateCleanupEventStatusToPausedTx :: Text -> PG.TxE QErr ()
updateCleanupEventStatusToPausedTx :: Text -> TxET QErr IO ()
updateCleanupEventStatusToPausedTx Text
cleanupLogId =
(PGTxErr -> QErr)
-> Query -> Identity Text -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.hdb_event_log_cleanups
SET status = 'paused'
WHERE id = $1
|]
(Text -> Identity Text
forall a. a -> Identity a
Identity Text
cleanupLogId)
Bool
True
updateCleanupEventStatusToPaused ::
(MonadIO m, MonadError QErr m) =>
PGSourceConfig ->
Text ->
m ()
updateCleanupEventStatusToPaused :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
PGSourceConfig -> Text -> m ()
updateCleanupEventStatusToPaused PGSourceConfig
sourceConfig Text
cleanupLogId =
m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ Text -> TxET QErr IO ()
updateCleanupEventStatusToPausedTx Text
cleanupLogId
updateCleanupEventStatusToCompletedTx :: Text -> DeletedEventLogStats -> PG.TxE QErr ()
updateCleanupEventStatusToCompletedTx :: Text -> DeletedEventLogStats -> TxET QErr IO ()
updateCleanupEventStatusToCompletedTx Text
cleanupLogId (DeletedEventLogStats Int
numEventLogs Int
numInvocationLogs) =
(PGTxErr -> QErr)
-> Query -> (Text, Int64, Int64) -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.hdb_event_log_cleanups
SET status = 'completed', deleted_event_logs = $2 , deleted_event_invocation_logs = $3
WHERE id = $1
|]
(Text
cleanupLogId, Int64
delLogs, Int64
delInvLogs)
Bool
True
where
delLogs :: Int64
delLogs = (Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Int -> Int64
forall a b. (a -> b) -> a -> b
$ Int
numEventLogs) :: Int64
delInvLogs :: Int64
delInvLogs = (Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Int -> Int64
forall a b. (a -> b) -> a -> b
$ Int
numInvocationLogs) :: Int64
updateCleanupEventStatusToCompleted ::
(MonadIO m, MonadError QErr m) =>
PGSourceConfig ->
Text ->
DeletedEventLogStats ->
m ()
updateCleanupEventStatusToCompleted :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
PGSourceConfig -> Text -> DeletedEventLogStats -> m ()
updateCleanupEventStatusToCompleted PGSourceConfig
sourceConfig Text
cleanupLogId DeletedEventLogStats
delStats =
m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> PGExecFrom -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ Text -> DeletedEventLogStats -> TxET QErr IO ()
updateCleanupEventStatusToCompletedTx Text
cleanupLogId DeletedEventLogStats
delStats
deleteEventTriggerLogsTx :: TriggerLogCleanupConfig -> PG.TxE QErr DeletedEventLogStats
deleteEventTriggerLogsTx :: TriggerLogCleanupConfig -> TxE QErr DeletedEventLogStats
deleteEventTriggerLogsTx TriggerLogCleanupConfig {Bool
Int
SourceName
TriggerName
tlccEventTriggerName :: TriggerName
tlccSourceName :: SourceName
tlccBatchSize :: Int
tlccClearOlderThan :: Int
tlccTimeout :: Int
tlccCleanInvocationLogs :: Bool
tlccEventTriggerName :: TriggerLogCleanupConfig -> TriggerName
tlccSourceName :: TriggerLogCleanupConfig -> SourceName
tlccBatchSize :: TriggerLogCleanupConfig -> Int
tlccClearOlderThan :: TriggerLogCleanupConfig -> Int
tlccTimeout :: TriggerLogCleanupConfig -> Int
tlccCleanInvocationLogs :: TriggerLogCleanupConfig -> Bool
..} = do
(PGTxErr -> QErr) -> Query -> () -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE PGTxErr -> QErr
defaultTxErrorHandler (Text -> Query
PG.fromText (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ Text
"SET statement_timeout = " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (Int64 -> Text
forall a. Show a => a -> Text
tshow Int64
qTimeout)) () Bool
True
[EventId]
deadEventIDs <-
(Identity EventId -> EventId) -> [Identity EventId] -> [EventId]
forall a b. (a -> b) -> [a] -> [b]
map Identity EventId -> EventId
forall a. Identity a -> a
runIdentity
([Identity EventId] -> [EventId])
-> TxET QErr IO [Identity EventId] -> TxET QErr IO [EventId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> (Text, Int64)
-> Bool
-> TxET QErr IO [Identity EventId]
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
( Text -> Query
PG.fromText
[ST.st|
SELECT id FROM hdb_catalog.event_log
WHERE ((delivered = true OR error = true) AND trigger_name = $1)
AND created_at < now() - interval '#{qRetentionPeriod}'
AND locked IS NULL
LIMIT $2
|]
)
(Text
qTriggerName, Int64
qBatchSize)
Bool
True
(PGTxErr -> QErr)
-> Query -> Identity PGTextArray -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.event_log
SET locked = now()
WHERE id = ANY($1::text[]);
|]
(PGTextArray -> Identity PGTextArray
forall a. a -> Identity a
Identity (PGTextArray -> Identity PGTextArray)
-> PGTextArray -> Identity PGTextArray
forall a b. (a -> b) -> a -> b
$ [Text] -> PGTextArray
PGTextArray ([Text] -> PGTextArray) -> [Text] -> PGTextArray
forall a b. (a -> b) -> a -> b
$ (EventId -> Text) -> [EventId] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map EventId -> Text
unEventId [EventId]
deadEventIDs)
Bool
True
Int
deletedInvocationLogs <-
if Bool
tlccCleanInvocationLogs
then
Identity Int -> Int
forall a. Identity a -> a
runIdentity
(Identity Int -> Int)
-> (SingleRow (Identity Int) -> Identity Int)
-> SingleRow (Identity Int)
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity Int) -> Identity Int
forall a. SingleRow a -> a
PG.getRow
(SingleRow (Identity Int) -> Int)
-> TxET QErr IO (SingleRow (Identity Int)) -> TxET QErr IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> Identity PGTextArray
-> Bool
-> TxET QErr IO (SingleRow (Identity Int))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
WITH deletedInvocations AS (
DELETE FROM hdb_catalog.event_invocation_logs
WHERE event_id = ANY($1::text[])
RETURNING 1
)
SELECT count(*) FROM deletedInvocations;
|]
(PGTextArray -> Identity PGTextArray
forall a. a -> Identity a
Identity (PGTextArray -> Identity PGTextArray)
-> PGTextArray -> Identity PGTextArray
forall a b. (a -> b) -> a -> b
$ [Text] -> PGTextArray
PGTextArray ([Text] -> PGTextArray) -> [Text] -> PGTextArray
forall a b. (a -> b) -> a -> b
$ (EventId -> Text) -> [EventId] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map EventId -> Text
unEventId [EventId]
deadEventIDs)
Bool
True
else do
(PGTxErr -> QErr)
-> Query -> (PGTextArray, Text) -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.event_invocation_logs
SET trigger_name = $2
WHERE event_id = ANY($1::text[])
|]
([Text] -> PGTextArray
PGTextArray ([Text] -> PGTextArray) -> [Text] -> PGTextArray
forall a b. (a -> b) -> a -> b
$ (EventId -> Text) -> [EventId] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map EventId -> Text
unEventId [EventId]
deadEventIDs, Text
qTriggerName)
Bool
True
Int -> TxET QErr IO Int
forall a. a -> TxET QErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int
0
Int
deletedEventLogs <-
Identity Int -> Int
forall a. Identity a -> a
runIdentity
(Identity Int -> Int)
-> (SingleRow (Identity Int) -> Identity Int)
-> SingleRow (Identity Int)
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity Int) -> Identity Int
forall a. SingleRow a -> a
PG.getRow
(SingleRow (Identity Int) -> Int)
-> TxET QErr IO (SingleRow (Identity Int)) -> TxET QErr IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> Identity PGTextArray
-> Bool
-> TxET QErr IO (SingleRow (Identity Int))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
WITH deletedEvents AS (
DELETE FROM hdb_catalog.event_log
WHERE id = ANY($1::text[])
RETURNING 1
)
SELECT count(*) FROM deletedEvents;
|]
(PGTextArray -> Identity PGTextArray
forall a. a -> Identity a
Identity (PGTextArray -> Identity PGTextArray)
-> PGTextArray -> Identity PGTextArray
forall a b. (a -> b) -> a -> b
$ [Text] -> PGTextArray
PGTextArray ([Text] -> PGTextArray) -> [Text] -> PGTextArray
forall a b. (a -> b) -> a -> b
$ (EventId -> Text) -> [EventId] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map EventId -> Text
unEventId [EventId]
deadEventIDs)
Bool
True
(PGTxErr -> QErr) -> Query -> () -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
SET statement_timeout = 0;
|]
()
Bool
False
DeletedEventLogStats -> TxE QErr DeletedEventLogStats
forall a. a -> TxET QErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure DeletedEventLogStats {Int
deletedInvocationLogs :: Int
deletedEventLogs :: Int
deletedEventLogs :: Int
deletedInvocationLogs :: Int
..}
where
qTimeout :: Int64
qTimeout = (Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Int -> Int64
forall a b. (a -> b) -> a -> b
$ Int
tlccTimeout Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000) :: Int64
qTriggerName :: Text
qTriggerName = TriggerName -> Text
triggerNameToTxt TriggerName
tlccEventTriggerName
qRetentionPeriod :: Text
qRetentionPeriod = Int -> Text
forall a. Show a => a -> Text
tshow Int
tlccClearOlderThan Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" hours"
qBatchSize :: Int64
qBatchSize = (Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
tlccBatchSize) :: Int64
deleteEventTriggerLogs ::
(MonadIO m, MonadError QErr m) =>
PGSourceConfig ->
TriggerLogCleanupConfig ->
IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)) ->
m DeletedEventLogStats
deleteEventTriggerLogs :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
PGSourceConfig
-> TriggerLogCleanupConfig
-> IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
-> m DeletedEventLogStats
deleteEventTriggerLogs PGSourceConfig
sourceConfig TriggerLogCleanupConfig
oldCleanupConfig IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
getLatestCleanupConfig = do
IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
-> TriggerLogCleanupConfig
-> (TriggerLogCleanupConfig
-> IO (Either QErr DeletedEventLogStats))
-> m DeletedEventLogStats
forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
-> TriggerLogCleanupConfig
-> (TriggerLogCleanupConfig
-> IO (Either QErr DeletedEventLogStats))
-> m DeletedEventLogStats
deleteEventTriggerLogsInBatchesWith IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
getLatestCleanupConfig TriggerLogCleanupConfig
oldCleanupConfig ((TriggerLogCleanupConfig -> IO (Either QErr DeletedEventLogStats))
-> m DeletedEventLogStats)
-> (TriggerLogCleanupConfig
-> IO (Either QErr DeletedEventLogStats))
-> m DeletedEventLogStats
forall a b. (a -> b) -> a -> b
$ \TriggerLogCleanupConfig
cleanupConfig -> do
PGSourceConfig
-> PGExecFrom
-> TxE QErr DeletedEventLogStats
-> IO (Either QErr DeletedEventLogStats)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> PGExecFrom -> TxET QErr m a -> m (Either QErr a)
runPgSourceWriteTx PGSourceConfig
sourceConfig PGExecFrom
InternalRawQuery (TxE QErr DeletedEventLogStats
-> IO (Either QErr DeletedEventLogStats))
-> TxE QErr DeletedEventLogStats
-> IO (Either QErr DeletedEventLogStats)
forall a b. (a -> b) -> a -> b
$ TriggerLogCleanupConfig -> TxE QErr DeletedEventLogStats
deleteEventTriggerLogsTx TriggerLogCleanupConfig
cleanupConfig
fetchEventLogs ::
(MonadError QErr m, MonadIO m) =>
PGSourceConfig ->
GetEventLogs b ->
m [EventLog]
fetchEventLogs :: forall (m :: * -> *) (b :: BackendType).
(MonadError QErr m, MonadIO m) =>
PGSourceConfig -> GetEventLogs b -> m [EventLog]
fetchEventLogs PGSourceConfig
sourceConfig GetEventLogs b
getEventLogs = do
IO (Either QErr [EventLog]) -> m (Either QErr [EventLog])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (PGSourceConfig
-> TxET QErr IO [EventLog] -> IO (Either QErr [EventLog])
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceReadTx PGSourceConfig
sourceConfig (TxET QErr IO [EventLog] -> IO (Either QErr [EventLog]))
-> TxET QErr IO [EventLog] -> IO (Either QErr [EventLog])
forall a b. (a -> b) -> a -> b
$ GetEventLogs b -> TxET QErr IO [EventLog]
forall (b :: BackendType).
GetEventLogs b -> TxET QErr IO [EventLog]
fetchEventLogsTxE GetEventLogs b
getEventLogs)
m (Either QErr [EventLog])
-> (QErr -> m [EventLog]) -> m [EventLog]
forall (m :: * -> *) e a.
Monad m =>
m (Either e a) -> (e -> m a) -> m a
`onLeftM` (QErr -> m [EventLog]
forall a. QErr -> m a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (QErr -> m [EventLog]) -> (QErr -> QErr) -> QErr -> m [EventLog]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> QErr -> QErr
prefixQErr Text
"unexpected error while fetching event logs: ")
fetchEventLogsTxE :: GetEventLogs b -> PG.TxE QErr [EventLog]
fetchEventLogsTxE :: forall (b :: BackendType).
GetEventLogs b -> TxET QErr IO [EventLog]
fetchEventLogsTxE GetEventLogs {Int
SourceName
EventLogStatus
TriggerName
_gelName :: TriggerName
_gelSourceName :: SourceName
_gelLimit :: Int
_gelOffset :: Int
_gelStatus :: EventLogStatus
_gelName :: forall (b :: BackendType). GetEventLogs b -> TriggerName
_gelSourceName :: forall (b :: BackendType). GetEventLogs b -> SourceName
_gelLimit :: forall (b :: BackendType). GetEventLogs b -> Int
_gelOffset :: forall (b :: BackendType). GetEventLogs b -> Int
_gelStatus :: forall (b :: BackendType). GetEventLogs b -> EventLogStatus
..} = do
case EventLogStatus
status of
EventLogStatus
Pending -> do
((EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)
-> EventLog)
-> [(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool,
Int, UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
-> [EventLog]
forall a b. (a -> b) -> [a] -> [b]
map (EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)
-> EventLog
uncurryEventLog
([(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool,
Int, UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
-> [EventLog])
-> TxET
QErr
IO
[(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
-> TxET QErr IO [EventLog]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> (Text, Int64, Int64)
-> Bool
-> TxET
QErr
IO
[(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
SELECT id, schema_name, table_name, trigger_name, payload, delivered, error, tries, created_at, locked, next_retry_at, archived
FROM hdb_catalog.event_log
WHERE trigger_name = $1
AND delivered=false AND error=false AND archived=false ORDER BY created_at DESC LIMIT $2 OFFSET $3;
|]
(Text
triggerName, Int64
limit, Int64
offset)
Bool
True
EventLogStatus
Processed -> do
((EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)
-> EventLog)
-> [(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool,
Int, UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
-> [EventLog]
forall a b. (a -> b) -> [a] -> [b]
map (EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)
-> EventLog
uncurryEventLog
([(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool,
Int, UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
-> [EventLog])
-> TxET
QErr
IO
[(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
-> TxET QErr IO [EventLog]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> (Text, Int64, Int64)
-> Bool
-> TxET
QErr
IO
[(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
SELECT id, schema_name, table_name, trigger_name, payload, delivered, error, tries, created_at, locked, next_retry_at, archived
FROM hdb_catalog.event_log
WHERE trigger_name = $1
AND (delivered=true OR error=true) AND archived=false ORDER BY created_at DESC LIMIT $2 OFFSET $3;
|]
(Text
triggerName, Int64
limit, Int64
offset)
Bool
True
EventLogStatus
All -> do
((EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)
-> EventLog)
-> [(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool,
Int, UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
-> [EventLog]
forall a b. (a -> b) -> [a] -> [b]
map (EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)
-> EventLog
uncurryEventLog
([(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool,
Int, UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
-> [EventLog])
-> TxET
QErr
IO
[(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
-> TxET QErr IO [EventLog]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> (Text, Int64, Int64)
-> Bool
-> TxET
QErr
IO
[(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
SELECT id, schema_name, table_name, trigger_name, payload, delivered, error, tries, created_at, locked, next_retry_at, archived
FROM hdb_catalog.event_log
WHERE trigger_name = $1
ORDER BY created_at DESC LIMIT $2 OFFSET $3;
|]
(Text
triggerName, Int64
limit, Int64
offset)
Bool
True
where
triggerName :: Text
triggerName = TriggerName -> Text
triggerNameToTxt TriggerName
_gelName
status :: EventLogStatus
status = EventLogStatus
_gelStatus
Int64
limit :: Int64 = Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Int -> Int64
forall a b. (a -> b) -> a -> b
$ Int
_gelLimit
Int64
offset :: Int64 = Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Int -> Int64
forall a b. (a -> b) -> a -> b
$ Int
_gelOffset
fetchEventInvocationLogs ::
(MonadError QErr m, MonadIO m) =>
PGSourceConfig ->
GetEventInvocations b ->
m [EventInvocationLog]
fetchEventInvocationLogs :: forall (m :: * -> *) (b :: BackendType).
(MonadError QErr m, MonadIO m) =>
PGSourceConfig -> GetEventInvocations b -> m [EventInvocationLog]
fetchEventInvocationLogs PGSourceConfig
sourceConfig GetEventInvocations b
getEventInvocationLogs = do
IO (Either QErr [EventInvocationLog])
-> m (Either QErr [EventInvocationLog])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (PGSourceConfig
-> TxET QErr IO [EventInvocationLog]
-> IO (Either QErr [EventInvocationLog])
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceReadTx PGSourceConfig
sourceConfig (TxET QErr IO [EventInvocationLog]
-> IO (Either QErr [EventInvocationLog]))
-> TxET QErr IO [EventInvocationLog]
-> IO (Either QErr [EventInvocationLog])
forall a b. (a -> b) -> a -> b
$ GetEventInvocations b -> TxET QErr IO [EventInvocationLog]
forall (b :: BackendType).
GetEventInvocations b -> TxET QErr IO [EventInvocationLog]
fetchEventInvocationLogsTxE GetEventInvocations b
getEventInvocationLogs)
m (Either QErr [EventInvocationLog])
-> (QErr -> m [EventInvocationLog]) -> m [EventInvocationLog]
forall (m :: * -> *) e a.
Monad m =>
m (Either e a) -> (e -> m a) -> m a
`onLeftM` (QErr -> m [EventInvocationLog]
forall a. QErr -> m a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (QErr -> m [EventInvocationLog])
-> (QErr -> QErr) -> QErr -> m [EventInvocationLog]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> QErr -> QErr
prefixQErr Text
"unexpected error while fetching invocation logs: ")
fetchEventInvocationLogsTxE :: GetEventInvocations b -> PG.TxE QErr [EventInvocationLog]
fetchEventInvocationLogsTxE :: forall (b :: BackendType).
GetEventInvocations b -> TxET QErr IO [EventInvocationLog]
fetchEventInvocationLogsTxE GetEventInvocations {Int
SourceName
TriggerName
_geiName :: TriggerName
_geiSourceName :: SourceName
_geiLimit :: Int
_geiOffset :: Int
_geiName :: forall (b :: BackendType). GetEventInvocations b -> TriggerName
_geiSourceName :: forall (b :: BackendType). GetEventInvocations b -> SourceName
_geiLimit :: forall (b :: BackendType). GetEventInvocations b -> Int
_geiOffset :: forall (b :: BackendType). GetEventInvocations b -> Int
..} = do
((Text, TriggerName, EventId, Maybe Int, ViaJSON Value,
ViaJSON Value, UTCTime)
-> EventInvocationLog)
-> [(Text, TriggerName, EventId, Maybe Int, ViaJSON Value,
ViaJSON Value, UTCTime)]
-> [EventInvocationLog]
forall a b. (a -> b) -> [a] -> [b]
map (Text, TriggerName, EventId, Maybe Int, ViaJSON Value,
ViaJSON Value, UTCTime)
-> EventInvocationLog
uncurryEventInvocationLog
([(Text, TriggerName, EventId, Maybe Int, ViaJSON Value,
ViaJSON Value, UTCTime)]
-> [EventInvocationLog])
-> TxET
QErr
IO
[(Text, TriggerName, EventId, Maybe Int, ViaJSON Value,
ViaJSON Value, UTCTime)]
-> TxET QErr IO [EventInvocationLog]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> (Text, Int64, Int64)
-> Bool
-> TxET
QErr
IO
[(Text, TriggerName, EventId, Maybe Int, ViaJSON Value,
ViaJSON Value, UTCTime)]
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
SELECT id, trigger_name, event_id, status, request, response, created_at
FROM hdb_catalog.event_invocation_logs
WHERE trigger_name = $1
ORDER BY created_at DESC LIMIT $2 OFFSET $3;
|]
(Text
triggerName, Int64
limit, Int64
offset)
Bool
True
where
triggerName :: Text
triggerName = TriggerName -> Text
triggerNameToTxt TriggerName
_geiName
Int64
limit :: Int64 = Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Int -> Int64
forall a b. (a -> b) -> a -> b
$ Int
_geiLimit
Int64
offset :: Int64 = Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Int -> Int64
forall a b. (a -> b) -> a -> b
$ Int
_geiOffset
fetchEventById ::
(MonadError QErr m, MonadIO m) =>
PGSourceConfig ->
GetEventById b ->
m (EventLogWithInvocations)
fetchEventById :: forall (m :: * -> *) (b :: BackendType).
(MonadError QErr m, MonadIO m) =>
PGSourceConfig -> GetEventById b -> m EventLogWithInvocations
fetchEventById PGSourceConfig
sourceConfig GetEventById b
getEventById = do
Either QErr EventLogWithInvocations
fetchEventByIdTxE' <- IO (Either QErr EventLogWithInvocations)
-> m (Either QErr EventLogWithInvocations)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr EventLogWithInvocations)
-> m (Either QErr EventLogWithInvocations))
-> IO (Either QErr EventLogWithInvocations)
-> m (Either QErr EventLogWithInvocations)
forall a b. (a -> b) -> a -> b
$ PGSourceConfig
-> TxET QErr IO EventLogWithInvocations
-> IO (Either QErr EventLogWithInvocations)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
PGSourceConfig -> TxET QErr m a -> m (Either QErr a)
runPgSourceReadTx PGSourceConfig
sourceConfig (TxET QErr IO EventLogWithInvocations
-> IO (Either QErr EventLogWithInvocations))
-> TxET QErr IO EventLogWithInvocations
-> IO (Either QErr EventLogWithInvocations)
forall a b. (a -> b) -> a -> b
$ GetEventById b -> TxET QErr IO EventLogWithInvocations
forall (b :: BackendType).
GetEventById b -> TxET QErr IO EventLogWithInvocations
fetchEventByIdTxE GetEventById b
getEventById
case Either QErr EventLogWithInvocations
fetchEventByIdTxE' of
Left QErr
err ->
QErr -> m EventLogWithInvocations
forall a. QErr -> m a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError
(QErr -> m EventLogWithInvocations)
-> QErr -> m EventLogWithInvocations
forall a b. (a -> b) -> a -> b
$ Text -> QErr -> QErr
prefixQErr (Text
"unexpected error while fetching event with id " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
eventId Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
": ") QErr
err
Right EventLogWithInvocations
eventLogWithInvocations -> do
if Maybe EventLog -> Bool
forall a. Maybe a -> Bool
isNothing (EventLogWithInvocations -> Maybe EventLog
elwiEvent EventLogWithInvocations
eventLogWithInvocations)
then Code -> Text -> m EventLogWithInvocations
forall (m :: * -> *) a. QErrM m => Code -> Text -> m a
throw400 Code
NotExists Text
errMsg
else EventLogWithInvocations -> m EventLogWithInvocations
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return EventLogWithInvocations
eventLogWithInvocations
where
eventId :: Text
eventId = EventId -> Text
unEventId (EventId -> Text) -> EventId -> Text
forall a b. (a -> b) -> a -> b
$ GetEventById b -> EventId
forall (b :: BackendType). GetEventById b -> EventId
_gebiEventId GetEventById b
getEventById
errMsg :: Text
errMsg = Text
"event id " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
eventId Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" does not exist"
fetchEventByIdTxE :: GetEventById b -> PG.TxE QErr (EventLogWithInvocations)
fetchEventByIdTxE :: forall (b :: BackendType).
GetEventById b -> TxET QErr IO EventLogWithInvocations
fetchEventByIdTxE GetEventById {Int
EventId
SourceName
_gebiEventId :: forall (b :: BackendType). GetEventById b -> EventId
_gebiSourceName :: SourceName
_gebiEventId :: EventId
_gebiInvocationLogLimit :: Int
_gebiInvocationLogOffset :: Int
_gebiSourceName :: forall (b :: BackendType). GetEventById b -> SourceName
_gebiInvocationLogLimit :: forall (b :: BackendType). GetEventById b -> Int
_gebiInvocationLogOffset :: forall (b :: BackendType). GetEventById b -> Int
..} = do
[EventLog]
events <-
((EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)
-> EventLog)
-> [(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool,
Int, UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
-> [EventLog]
forall a b. (a -> b) -> [a] -> [b]
map (EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)
-> EventLog
uncurryEventLog
([(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool,
Int, UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
-> [EventLog])
-> TxET
QErr
IO
[(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
-> TxET QErr IO [EventLog]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> Identity Text
-> Bool
-> TxET
QErr
IO
[(EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)]
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
SELECT id, schema_name, table_name, trigger_name, payload, delivered, error, tries, created_at, locked, next_retry_at, archived
FROM hdb_catalog.event_log
WHERE id = $1;
|]
(Text -> Identity Text
forall a. a -> Identity a
Identity Text
eventId)
Bool
True
case [EventLog]
events of
[] -> EventLogWithInvocations -> TxET QErr IO EventLogWithInvocations
forall a. a -> TxET QErr IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (EventLogWithInvocations -> TxET QErr IO EventLogWithInvocations)
-> EventLogWithInvocations -> TxET QErr IO EventLogWithInvocations
forall a b. (a -> b) -> a -> b
$ Maybe EventLog -> [EventInvocationLog] -> EventLogWithInvocations
EventLogWithInvocations Maybe EventLog
forall a. Maybe a
Nothing []
[EventLog
event] -> do
[EventInvocationLog]
invocations <-
((Text, TriggerName, EventId, Maybe Int, ViaJSON Value,
ViaJSON Value, UTCTime)
-> EventInvocationLog)
-> [(Text, TriggerName, EventId, Maybe Int, ViaJSON Value,
ViaJSON Value, UTCTime)]
-> [EventInvocationLog]
forall a b. (a -> b) -> [a] -> [b]
map (Text, TriggerName, EventId, Maybe Int, ViaJSON Value,
ViaJSON Value, UTCTime)
-> EventInvocationLog
uncurryEventInvocationLog
([(Text, TriggerName, EventId, Maybe Int, ViaJSON Value,
ViaJSON Value, UTCTime)]
-> [EventInvocationLog])
-> TxET
QErr
IO
[(Text, TriggerName, EventId, Maybe Int, ViaJSON Value,
ViaJSON Value, UTCTime)]
-> TxET QErr IO [EventInvocationLog]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> (Text, Int64, Int64)
-> Bool
-> TxET
QErr
IO
[(Text, TriggerName, EventId, Maybe Int, ViaJSON Value,
ViaJSON Value, UTCTime)]
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[PG.sql|
SELECT id, trigger_name, event_id, status, request, response, created_at
FROM hdb_catalog.event_invocation_logs
WHERE event_id = $1
ORDER BY created_at DESC LIMIT $2 OFFSET $3;
|]
(Text
eventId, Int64
limit, Int64
offset)
Bool
True
EventLogWithInvocations -> TxET QErr IO EventLogWithInvocations
forall a. a -> TxET QErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (EventLogWithInvocations -> TxET QErr IO EventLogWithInvocations)
-> EventLogWithInvocations -> TxET QErr IO EventLogWithInvocations
forall a b. (a -> b) -> a -> b
$ Maybe EventLog -> [EventInvocationLog] -> EventLogWithInvocations
EventLogWithInvocations (EventLog -> Maybe EventLog
forall a. a -> Maybe a
Just EventLog
event) [EventInvocationLog]
invocations
[EventLog]
_ -> Text -> TxET QErr IO EventLogWithInvocations
forall (m :: * -> *) a. QErrM m => Text -> m a
throw500 (Text -> TxET QErr IO EventLogWithInvocations)
-> Text -> TxET QErr IO EventLogWithInvocations
forall a b. (a -> b) -> a -> b
$ Text
"Unexpected error: Multiple events present with event id " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
eventId
where
eventId :: Text
eventId = EventId -> Text
unEventId EventId
_gebiEventId
Int64
limit :: Int64 = Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Int -> Int64
forall a b. (a -> b) -> a -> b
$ Int
_gebiInvocationLogLimit
Int64
offset :: Int64 = Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Int -> Int64
forall a b. (a -> b) -> a -> b
$ Int
_gebiInvocationLogOffset
uncurryEventLog ::
(EventId, Text, Text, TriggerName, PG.ViaJSON Value, Bool, Bool, Int, Time.UTCTime, Maybe Time.UTCTime, Maybe Time.UTCTime, Bool) ->
EventLog
uncurryEventLog :: (EventId, Text, Text, TriggerName, ViaJSON Value, Bool, Bool, Int,
UTCTime, Maybe UTCTime, Maybe UTCTime, Bool)
-> EventLog
uncurryEventLog (EventId
eventId, Text
schemaName, Text
tableName, TriggerName
triggerName, PG.ViaJSON Value
payload, Bool
delivered, Bool
isError, Int
tries, UTCTime
createdAt, Maybe UTCTime
locked, Maybe UTCTime
nextRetryAt, Bool
archived) =
EventLog
{ elId :: EventId
elId = EventId
eventId,
elSchemaName :: Text
elSchemaName = Text
schemaName,
elTableName :: Text
elTableName = Text
tableName,
elTriggerName :: TriggerName
elTriggerName = TriggerName
triggerName,
elPayload :: Value
elPayload = Value
payload,
elDelivered :: Bool
elDelivered = Bool
delivered,
elError :: Bool
elError = Bool
isError,
elTries :: Int
elTries = Int
tries,
elCreatedAt :: UTCTime
elCreatedAt = UTCTime
createdAt,
elLocked :: Maybe UTCTime
elLocked = Maybe UTCTime
locked,
elNextRetryAt :: Maybe UTCTime
elNextRetryAt = Maybe UTCTime
nextRetryAt,
elArchived :: Bool
elArchived = Bool
archived
}
uncurryEventInvocationLog ::
(Text, TriggerName, EventId, Maybe Int, PG.ViaJSON Value, PG.ViaJSON Value, Time.UTCTime) ->
EventInvocationLog
uncurryEventInvocationLog :: (Text, TriggerName, EventId, Maybe Int, ViaJSON Value,
ViaJSON Value, UTCTime)
-> EventInvocationLog
uncurryEventInvocationLog (Text
invocationId, TriggerName
triggerName, EventId
eventId, Maybe Int
status, PG.ViaJSON Value
request, PG.ViaJSON Value
response, UTCTime
createdAt) =
EventInvocationLog
{ eilId :: Text
eilId = Text
invocationId,
eilTriggerName :: TriggerName
eilTriggerName = TriggerName
triggerName,
eilEventId :: EventId
eilEventId = EventId
eventId,
eilHttpStatus :: Maybe Int
eilHttpStatus = Maybe Int
status,
eilRequest :: Value
eilRequest = Value
request,
eilResponse :: Value
eilResponse = Value
response,
eilCreatedAt :: UTCTime
eilCreatedAt = UTCTime
createdAt
}