{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}

-- | Postgres DDL EventTrigger
--
-- Used for creating event triggers for metadata changes.
--
-- See 'Hasura.Eventing.Backend'.
module Hasura.Backends.Postgres.DDL.EventTrigger
  ( insertManualEvent,
    redeliverEvent,
    dropTriggerAndArchiveEvents,
    createTableEventTrigger,
    createMissingSQLTriggers,
    dropTriggerQ,
    dropDanglingSQLTrigger,
    mkAllTriggersQ,
    getMaintenanceModeVersion,
    fetchUndeliveredEvents,
    setRetry,
    recordSuccess,
    recordError,
    recordError',
    unlockEventsInSource,
    updateColumnInEventTrigger,
    checkIfTriggerExists,
    addCleanupSchedules,
    deleteAllScheduledCleanups,
    getCleanupEventsForDeletion,
    updateCleanupEventStatusToDead,
    updateCleanupEventStatusToPaused,
    updateCleanupEventStatusToCompleted,
    deleteEventTriggerLogs,
    fetchEventLogs,
    fetchEventInvocationLogs,
    fetchEventById,
  )
where

import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson
import Data.FileEmbed (makeRelativeToProject)
import Data.HashMap.Strict qualified as HashMap
import Data.HashSet qualified as HashSet
import Data.Int (Int64)
import Data.Set.NonEmpty qualified as NE
import Data.Text.Lazy qualified as TL
import Data.Time (UTCTime)
import Data.Time.Clock qualified as Time
import Database.PG.Query qualified as PG
import Hasura.Backends.Postgres.Connection
import Hasura.Backends.Postgres.SQL.DML
import Hasura.Backends.Postgres.SQL.DML qualified as S
import Hasura.Backends.Postgres.SQL.Types hiding (TableName)
import Hasura.Backends.Postgres.Translate.Column
import Hasura.Base.Error
import Hasura.Eventing.Common
import Hasura.Prelude
import Hasura.RQL.Types.Backend (Backend, SourceConfig, TableName)
import Hasura.RQL.Types.BackendType
import Hasura.RQL.Types.Column
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.Eventing
import Hasura.RQL.Types.ScheduledTrigger (formatTime')
import Hasura.RQL.Types.Source
import Hasura.SQL.Types
import Hasura.Server.Migrate.Internal
import Hasura.Server.Migrate.LatestVersion
import Hasura.Server.Migrate.Version
import Hasura.Server.Types
import Hasura.Session
import Hasura.Table.Cache (PrimaryKey)
import Hasura.Tracing qualified as Tracing
import Text.Builder qualified as TB
import Text.Shakespeare.Text qualified as ST

-- | Fetch a batch of undelivered events for the given triggers of a source.
--
-- The transaction used to fetch the events depends on the maintenance mode:
--
-- * When maintenance mode is enabled, the maintenance mode version is first
--   read from the source (in a read transaction) and the fetch is delegated to
--   'fetchEventsMaintenanceMode' with that version.
-- * Otherwise 'fetchEvents' is used directly.
--
-- A failure while reading the maintenance mode version is rethrown with the
-- prefix @"something went wrong while fetching events: "@. A failure of the
-- fetch transaction itself is rethrown unchanged.
fetchUndeliveredEvents ::
  (MonadIO m, MonadError QErr m) =>
  SourceConfig ('Postgres pgKind) ->
  SourceName ->
  [TriggerName] ->
  MaintenanceMode () ->
  FetchBatchSize ->
  m [Event ('Postgres pgKind)]
fetchUndeliveredEvents sourceConfig sourceName triggerNames maintenanceMode fetchBatchSize = do
  -- First decide *which* transaction to run; this step can itself fail when
  -- the maintenance mode version cannot be determined.
  fetchEventsTxE <-
    case maintenanceMode of
      MaintenanceModeEnabled () -> do
        maintenanceModeVersion <- liftIO $ runPgSourceReadTx sourceConfig getMaintenanceModeVersionTx
        pure $ fmap (fetchEventsMaintenanceMode sourceName triggerNames fetchBatchSize) maintenanceModeVersion
      MaintenanceModeDisabled -> pure $ Right $ fetchEvents sourceName triggerNames fetchBatchSize
  case fetchEventsTxE of
    Left err -> throwError $ prefixQErr "something went wrong while fetching events: " err
    Right fetchEventsTx ->
      -- NOTE(review): the fetch runs in a write transaction, presumably because
      -- fetching also marks the events as picked up -- confirm against
      -- 'fetchEvents'.
      liftEitherM
        $ liftIO
        $ runPgSourceWriteTx sourceConfig InternalRawQuery fetchEventsTx

-- | Run 'setRetryTx' for the given event and retry time in a write transaction
-- on the source, throwing any resulting 'QErr' in @m@.
setRetry ::
  ( MonadIO m,
    MonadError QErr m
  ) =>
  SourceConfig ('Postgres pgKind) ->
  Event ('Postgres pgKind) ->
  Time.UTCTime ->
  MaintenanceMode MaintenanceModeVersion ->
  m ()
setRetry sourceConfig event retryTime maintenanceModeVersion =
  liftEitherM
    $ liftIO
    $ runPgSourceWriteTx sourceConfig InternalRawQuery (setRetryTx event retryTime maintenanceModeVersion)

-- | Insert a manually invoked event for the given table and trigger, returning
-- the id of the new event.
--
-- Within a single write transaction this sets the session variables of the
-- invoking user, then the trace context, and finally inserts the event via
-- 'insertPGManualEvent'. Any 'QErr' from the transaction is thrown in @m@.
insertManualEvent ::
  (MonadIO m, MonadError QErr m) =>
  SourceConfig ('Postgres pgKind) ->
  TableName ('Postgres pgKind) ->
  TriggerName ->
  Value ->
  UserInfo ->
  Maybe Tracing.TraceContext ->
  m EventId
insertManualEvent sourceConfig tableName triggerName payload userInfo traceCtx =
  -- NOTE: The methods `setTraceContextInTx` and `setHeadersTx` are being used
  -- to ensure that the trace context and user info are set with valid values
  -- while being used in the PG function `insert_event_log`.
  -- See Issue(#7087) for more details on a bug that was being caused
  -- in the absence of these methods.
  liftEitherM
    $ liftIO
    $ runPgSourceWriteTx sourceConfig InternalRawQuery
    $ setHeadersTx (_uiSession userInfo)
    >> setTraceContextInTx traceCtx
    >> insertPGManualEvent tableName triggerName payload

-- | Read the maintenance mode version of the source by running
-- 'getMaintenanceModeVersionTx' in a read transaction, throwing any resulting
-- 'QErr' in @m@.
getMaintenanceModeVersion ::
  ( MonadIO m,
    MonadError QErr m
  ) =>
  SourceConfig ('Postgres pgKind) ->
  m MaintenanceModeVersion
getMaintenanceModeVersion sourceConfig =
  liftEitherM
    $ liftIO
    $ runPgSourceReadTx sourceConfig getMaintenanceModeVersionTx

-- | Record a successful delivery of an event.
--
-- In one write transaction this inserts the invocation (keyed by the name of
-- the event's trigger) and then runs 'setSuccessTx' for the event. Errors are
-- returned as a 'Left' rather than thrown.
recordSuccess ::
  (MonadIO m) =>
  SourceConfig ('Postgres pgKind) ->
  Event ('Postgres pgKind) ->
  Invocation 'EventType ->
  MaintenanceMode MaintenanceModeVersion ->
  m (Either QErr ())
recordSuccess sourceConfig event invocation maintenanceModeVersion =
  liftIO
    $ runPgSourceWriteTx sourceConfig InternalRawQuery
    $ do
      insertInvocation (tmName (eTrigger event)) invocation
      setSuccessTx event maintenanceModeVersion

-- | Record a failed delivery of an event together with its invocation.
--
-- This is 'recordError'' specialised to the case where an invocation is always
-- available.
recordError ::
  (MonadIO m) =>
  SourceConfig ('Postgres pgKind) ->
  Event ('Postgres pgKind) ->
  Invocation 'EventType ->
  ProcessEventError ->
  MaintenanceMode MaintenanceModeVersion ->
  m (Either QErr ())
recordError sourceConfig event invocation processEventError maintenanceModeVersion =
  recordError' sourceConfig event (Just invocation) processEventError maintenanceModeVersion

-- | Record a failed delivery of an event, optionally with an invocation.
--
-- In one write transaction:
--
-- 1. If an invocation is supplied, it is inserted (keyed by the name of the
--    event's trigger); with 'Nothing' this step is skipped.
-- 2. Depending on the 'ProcessEventError', the event is either scheduled for a
--    retry at the given time ('PESetRetry') or marked as errored
--    ('PESetError').
--
-- Errors are returned as a 'Left' rather than thrown.
recordError' ::
  (MonadIO m) =>
  SourceConfig ('Postgres pgKind) ->
  Event ('Postgres pgKind) ->
  Maybe (Invocation 'EventType) ->
  ProcessEventError ->
  MaintenanceMode MaintenanceModeVersion ->
  m (Either QErr ())
recordError' sourceConfig event invocation processEventError maintenanceModeVersion =
  liftIO
    $ runPgSourceWriteTx sourceConfig InternalRawQuery
    $ do
      for_ invocation $ insertInvocation (tmName (eTrigger event))
      case processEventError of
        PESetRetry retryTime -> setRetryTx event retryTime maintenanceModeVersion
        PESetError -> setErrorTx event maintenanceModeVersion

-- | Run 'redeliverEventTx' for the given event id in a write transaction on
-- the source, throwing any resulting 'QErr' in @m@.
redeliverEvent ::
  (MonadIO m, MonadError QErr m) =>
  SourceConfig ('Postgres pgKind) ->
  EventId ->
  m ()
redeliverEvent sourceConfig eventId =
  liftEitherM
    $ liftIO
    $ runPgSourceWriteTx sourceConfig InternalRawQuery (redeliverEventTx eventId)

-- | Drop the SQL trigger with the given name and archive its events, both in
-- the same write transaction. Any 'QErr' from the transaction is thrown in
-- @m@.
--
-- The table argument is not used by this implementation.
dropTriggerAndArchiveEvents ::
  ( MonadIO m,
    MonadError QErr m
  ) =>
  SourceConfig ('Postgres pgKind) ->
  TriggerName ->
  QualifiedTable ->
  m ()
dropTriggerAndArchiveEvents sourceConfig triggerName _table =
  liftEitherM
    $ liftIO
    $ runPgSourceWriteTx sourceConfig InternalRawQuery
    $ do
      dropTriggerQ triggerName
      archiveEvents triggerName

-- | Create the SQL triggers of an event trigger that are missing in the
-- database.
--
-- For each operation (insert, update, delete) that has a spec in the
-- 'TriggerOpsDef', a single write transaction checks whether a function named
-- after the operation's trigger exists and, if it does not, creates the
-- trigger via 'mkTrigger'. Operations without a spec are skipped.
--
-- The primary key component of the columns tuple is ignored. Any 'QErr' from
-- the transaction is thrown in @m@.
createMissingSQLTriggers ::
  ( MonadIO m,
    MonadError QErr m,
    MonadBaseControl IO m,
    Backend ('Postgres pgKind)
  ) =>
  SQLGenCtx ->
  PGSourceConfig ->
  TableName ('Postgres pgKind) ->
  ([(ColumnInfo ('Postgres pgKind))], Maybe (PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind)))) ->
  TriggerName ->
  TriggerOnReplication ->
  TriggerOpsDef ('Postgres pgKind) ->
  m ()
createMissingSQLTriggers serverConfigCtx sourceConfig table (allCols, _) triggerName triggerOnReplication opsDefinition = do
  liftEitherM
    $ runPgSourceWriteTx sourceConfig InternalRawQuery
    $ do
      for_ (tdInsert opsDefinition) (createTriggerIfMissing INSERT)
      for_ (tdUpdate opsDefinition) (createTriggerIfMissing UPDATE)
      for_ (tdDelete opsDefinition) (createTriggerIfMissing DELETE)
  where
    -- Creates the SQL trigger for one operation unless its trigger function is
    -- already present. No local type signature is given: the type variables of
    -- the enclosing signature are not in scope here without an explicit
    -- @forall@.
    createTriggerIfMissing op opSpec = do
      let opTriggerName = pgTriggerName op triggerName
      -- The existence check matches on the function name only (@proname@); it
      -- does not restrict the lookup to a particular schema.
      doesOpTriggerFunctionExist <-
        runIdentity
          . PG.getRow
          <$> PG.withQE
            defaultTxErrorHandler
            [PG.sql|
                 SELECT EXISTS
                   ( SELECT 1
                     FROM pg_proc
                     WHERE proname = $1
                   )
              |]
            (Identity opTriggerName)
            True
      unless doesOpTriggerFunctionExist
        $ flip runReaderT serverConfigCtx
        $ mkTrigger triggerName table triggerOnReplication allCols op opSpec

-- | Create all SQL triggers of an event trigger on a table.
--
-- Runs 'mkAllTriggersQ' (with the given 'SQLGenCtx' as its reader environment)
-- in a write transaction. The primary key argument is ignored. Errors are
-- returned as a 'Left' rather than thrown.
createTableEventTrigger ::
  (Backend ('Postgres pgKind), MonadIO m, MonadBaseControl IO m) =>
  SQLGenCtx ->
  PGSourceConfig ->
  QualifiedTable ->
  [ColumnInfo ('Postgres pgKind)] ->
  TriggerName ->
  TriggerOnReplication ->
  TriggerOpsDef ('Postgres pgKind) ->
  Maybe (PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind))) ->
  m (Either QErr ())
createTableEventTrigger serverConfigCtx sourceConfig table columns triggerName triggerOnReplication opsDefinition _ = runPgSourceWriteTx sourceConfig InternalRawQuery $ do
  -- Create the given triggers
  flip runReaderT serverConfigCtx
    $ mkAllTriggersQ triggerName table triggerOnReplication columns opsDefinition

-- | Drop the SQL triggers of the given event trigger for each operation in the
-- supplied set, all within one write transaction. Any 'QErr' from the
-- transaction is thrown in @m@.
--
-- The table argument is not used by this implementation.
dropDanglingSQLTrigger ::
  ( MonadIO m,
    MonadError QErr m
  ) =>
  SourceConfig ('Postgres pgKind) ->
  TriggerName ->
  QualifiedTable ->
  HashSet Ops ->
  m ()
dropDanglingSQLTrigger sourceConfig triggerName _ ops =
  liftEitherM
    $ liftIO
    $ runPgSourceWriteTx sourceConfig InternalRawQuery
    $ traverse_ (dropTriggerOp triggerName) ops

-- | Rewrite an event trigger configuration after a column rename.
--
-- Arguments, in order: the table the trigger is defined on, the old column
-- name, the new column name, and the table on which the rename happened.
--
-- Every occurrence of the old column in the listen and delivery columns of
-- the insert, update and delete specs is replaced by the new column, but
-- only when both tables are equal. Otherwise the configuration is returned
-- with an unchanged definition. The manual-trigger flag is never touched.
updateColumnInEventTrigger ::
  QualifiedTable ->
  PGCol ->
  PGCol ->
  QualifiedTable ->
  EventTriggerConf ('Postgres pgKind) ->
  EventTriggerConf ('Postgres pgKind)
updateColumnInEventTrigger table oCol nCol refTable = rewriteEventTriggerConf
  where
    -- @SubCStar@ names no column explicitly, so there is nothing to rename.
    rewriteSubsCols = \case
      SubCStar -> SubCStar
      SubCArray cols -> SubCArray $ map getNewCol cols
    rewriteOpSpec (SubscribeOpSpec listenColumns deliveryColumns) =
      SubscribeOpSpec
        (rewriteSubsCols listenColumns)
        (rewriteSubsCols <$> deliveryColumns)
    rewriteTrigOpsDef (TriggerOpsDef ins upd del man) =
      TriggerOpsDef
        (rewriteOpSpec <$> ins)
        (rewriteOpSpec <$> upd)
        (rewriteOpSpec <$> del)
        man
    rewriteEventTriggerConf etc =
      etc
        { etcDefinition =
            rewriteTrigOpsDef $ etcDefinition etc
        }
    -- Only rename when the rename happened on the trigger's own table.
    getNewCol col
      | table == refTable && oCol == col = nCol
      | otherwise = col

-- | Run 'unlockEventsTx' for the given non-empty set of event ids in a write
-- transaction on the source.
--
-- The outcome is returned as an 'Either' rather than thrown. The 'Int' is
-- whatever 'unlockEventsTx' yields (presumably the number of events that
-- were unlocked; confirm against its definition).
unlockEventsInSource ::
  (MonadIO m) =>
  SourceConfig ('Postgres pgKind) ->
  NE.NESet EventId ->
  m (Either QErr Int)
unlockEventsInSource sourceConfig eventIds =
  liftIO
    $ runPgSourceWriteTx sourceConfig InternalRawQuery
    $ unlockEventsTx
    $ toList eventIds

-- | Check whether a trigger function named after the given trigger exists
-- for any of the given operations.
--
-- Returns 'True' as soon as 'checkIfFunctionExistsQ' reports a function for
-- at least one operation; an empty set of operations yields 'False'.
-- Transaction errors are rethrown in @m@.
checkIfTriggerExists ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  TriggerName ->
  HashSet Ops ->
  m Bool
checkIfTriggerExists sourceConfig triggerName ops =
  liftEitherM
    $ liftIO
    $ runPgSourceWriteTx sourceConfig InternalRawQuery
    $
    -- We want to avoid creating event triggers with the same name since this
    -- will cause undesired behaviour. Note that only SQL functions associated
    -- with SQL triggers are dropped when "replace = true" is set in the event
    -- trigger configuration. Hence, the best way to check if we should allow
    -- the creation of a trigger with the name 'triggerName' is to check if any
    -- function with such a name exists in the hdb_catalog.
    --
    -- For example: if a create_event_trigger request comes with trigger name
    -- "triggerName" and there is already a trigger with "triggerName" in the
    -- metadata, then
    --    1. When "replace = false", the function with name 'triggerName' exists
    --       so the creation is not allowed
    --    2. When "replace = true", the function with name 'triggerName' is first
    --       dropped, hence we are allowed to create the trigger with name
    --       'triggerName'
    fmap or (traverse (checkIfFunctionExistsQ triggerName) (HashSet.toList ops))

---- DATABASE QUERIES ---------------------
--
--   The API for our in-database work queue:
-------------------------------------------

-- | Record one delivery attempt of an event.
--
-- Two statements are run, in this order:
--
-- 1. a row is inserted into @hdb_catalog.event_invocation_logs@ carrying the
--    event id, the trigger name, the optional status and the JSON-encoded
--    request and response of the invocation;
-- 2. the @tries@ counter of the corresponding @hdb_catalog.event_log@ row is
--    incremented by one.
insertInvocation :: TriggerName -> Invocation 'EventType -> PG.TxE QErr ()
insertInvocation tName invo = do
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
          INSERT INTO hdb_catalog.event_invocation_logs (event_id, trigger_name, status, request, response)
          VALUES ($1, $2, $3, $4, $5)
          |]
    ( iEventId invo,
      triggerNameToTxt tName,
      -- the status is widened from 'Int' to 'Int64' before being sent
      fromIntegral <$> iStatus invo :: Maybe Int64,
      PG.ViaJSON $ toJSON $ iRequest invo,
      PG.ViaJSON $ toJSON $ iResponse invo
    )
    True
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
          UPDATE hdb_catalog.event_log

          SET tries = tries + 1
          WHERE id = $1
          |]
    (Identity $ iEventId invo)
    True

-- | Insert a manually invoked event for the given table and trigger by
-- calling the SQL function @hdb_catalog.insert_event_log@.
--
-- The operation passed to the SQL function is the textual form of 'MANUAL'
-- and the supplied 'Value' is sent as the JSON row data. The function is
-- expected to return exactly one row holding the id of the new event, which
-- is the result of this transaction.
insertPGManualEvent ::
  QualifiedTable ->
  TriggerName ->
  Value ->
  PG.TxE QErr EventId
insertPGManualEvent (QualifiedObject schemaName tableName) triggerName rowData =
  runIdentity
    . PG.getRow
    <$> PG.withQE
      defaultTxErrorHandler
      [PG.sql|
    SELECT hdb_catalog.insert_event_log($1, $2, $3, $4, $5)
  |]
      (schemaName, tableName, triggerName, tshow MANUAL, PG.ViaJSON rowData)
      False

-- | Mark every row of @hdb_catalog.event_log@ that belongs to the given
-- trigger as archived.
archiveEvents :: TriggerName -> PG.TxE QErr ()
archiveEvents trn =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
           UPDATE hdb_catalog.event_log
           SET archived = 't'
           WHERE trigger_name = $1
                |]
    (Identity trn)
    False

-- | Decide which maintenance-mode code path applies to the source, based on
-- the catalog version read from the database.
--
-- * catalog version 40 maps to 'PreviousMMVersion';
-- * catalog version 43 and 'latestCatalogVersion' map to 'CurrentMMVersion';
-- * any other version is rejected with a 500 error.
getMaintenanceModeVersionTx :: PG.TxE QErr MaintenanceModeVersion
getMaintenanceModeVersionTx = liftTx $ do
  catalogVersion <- getCatalogVersion -- From the user's DB
  -- the previous version and the current version will change depending
  -- upon between which versions we need to support maintenance mode
  if
    | catalogVersion == MetadataCatalogVersion 40 -> pure PreviousMMVersion
    -- The catalog is migrated to the 43rd version for a source
    -- which was initialised by a v1 graphql-engine instance (See @initSource@).
    | catalogVersion == MetadataCatalogVersion 43 -> pure CurrentMMVersion
    | catalogVersion == latestCatalogVersion -> pure CurrentMMVersion
    | otherwise ->
        throw500
          $ "Maintenance mode is only supported with catalog versions: 40, 43 and "
          -- 'latestCatalogVersionString' is already 'Text'; running it through
          -- 'tshow' would wrap the version in escaped double quotes.
          <> latestCatalogVersionString

-- | Lock and return events not yet being processed or completed, up to some
-- limit. Process events approximately in created_at order, but we make no
-- ordering guarantees; events can and will race. Nevertheless we want to
-- ensure newer change events don't starve older ones.
--
-- Only events of the given triggers are considered, and at most
-- the given batch size of them are locked per call. An event is eligible
-- when it is neither delivered, errored nor archived, its retry time (if any)
-- has passed, and it is either unlocked or was locked more than 30 minutes
-- ago. The supplied 'SourceName' is not part of the query; it is only stamped
-- onto the returned events.
fetchEvents :: SourceName -> [TriggerName] -> FetchBatchSize -> PG.TxE QErr [Event ('Postgres pgKind)]
fetchEvents source triggerNames (FetchBatchSize fetchBatchSize) =
  map uncurryEvent
    <$> PG.withQE
      defaultTxErrorHandler
      [PG.sql|
      UPDATE hdb_catalog.event_log
      SET locked = NOW()
      WHERE id IN ( SELECT l.id
                    FROM hdb_catalog.event_log l
                    WHERE l.delivered = 'f' and l.error = 'f'
                          and (l.locked IS NULL or l.locked < (NOW() - interval '30 minute'))
                          and (l.next_retry_at is NULL or l.next_retry_at <= now())
                          and l.archived = 'f'
                          and l.trigger_name = ANY($2)
                    /* NB: this ordering is important for our index `event_log_fetch_events` */
                    /* (see `init_pg_source.sql`) */
                    ORDER BY locked NULLS FIRST, next_retry_at NULLS FIRST, created_at
                    LIMIT $1
                    FOR UPDATE SKIP LOCKED )
      RETURNING id, schema_name, table_name, trigger_name, payload::json, tries, created_at, next_retry_at, 
      -- We need the UTC values of `created_at` and `next_retry_at` for metrics
      -- calculation.
      --
      -- Only `TIMESTAMPZ` (time with timezone offset) values can be used for proper  
      -- conversions between timezones.
      --
      -- Since `created_at` and `next_retry_at` are `TIMESTAMP` values (time without
      -- timezone offset), we first convert them to TIMESTAMPZ` values by appending
      -- the timezone offset of the database. And then convert the `TIMESTAMPZ`
      -- values to UTC.
      (select (select created_at at time zone (select current_setting ('TIMEZONE'))) at time zone 'UTC'), 
      (select (select next_retry_at at time zone (select current_setting ('TIMEZONE'))) at time zone 'UTC');
      |]
      (limit, triggerNamesTxt)
      True
  where
    -- Turn one returned row into an 'Event'. The tuple order mirrors the
    -- RETURNING clause of the query above.
    uncurryEvent
      ( id',
        schemaName,
        tableName,
        triggerName,
        PG.ViaJSON payload,
        tries,
        created,
        retryAt,
        createdAtUTC,
        retryAtUTC :: Maybe UTCTime
        ) =
        Event
          { eId = id',
            eSource = source,
            eTable = QualifiedObject schemaName tableName,
            eTrigger = TriggerMetadata triggerName,
            eEvent = payload,
            eTries = tries,
            eCreatedAt = created,
            eRetryAt = retryAt,
            -- eCreatedAtUTC and eRetryAtUTC are used for calculating metrics only
            eCreatedAtUTC = createdAtUTC,
            eRetryAtUTC = retryAtUTC
          }
    limit = fromIntegral fetchBatchSize :: Word64

    triggerNamesTxt = PGTextArray $ triggerNameToTxt <$> triggerNames

-- | Variant of 'fetchEvents' that picks the query according to the
-- maintenance-mode version of the source.
--
-- * 'PreviousMMVersion': runs a query that treats @locked@ as a boolean flag
--   (@'t'@ / @'f'@). This branch does not filter by trigger name and ignores
--   the supplied 'SourceName'; every returned event is tagged with
--   'SNDefault'.
-- * 'CurrentMMVersion': defers to 'fetchEvents' with the same arguments.
fetchEventsMaintenanceMode :: SourceName -> [TriggerName] -> FetchBatchSize -> MaintenanceModeVersion -> PG.TxE QErr [Event ('Postgres pgKind)]
fetchEventsMaintenanceMode sourceName triggerNames fetchBatchSize = \case
  PreviousMMVersion ->
    map uncurryEvent
      <$> PG.withQE
        defaultTxErrorHandler
        [PG.sql|
        UPDATE hdb_catalog.event_log
        SET locked = 't'
        WHERE id IN ( SELECT l.id
                      FROM hdb_catalog.event_log l
                      WHERE l.delivered = 'f' and l.error = 'f' and l.locked = 'f'
                            and (l.next_retry_at is NULL or l.next_retry_at <= now())
                            and l.archived = 'f'
                      ORDER BY created_at
                      LIMIT $1
                      FOR UPDATE SKIP LOCKED )
        RETURNING id, schema_name, table_name, trigger_name, payload::json, tries, created_at, next_retry_at,
        (select (select created_at at time zone (select current_setting ('TIMEZONE'))) at time zone 'UTC'), 
        (select (select next_retry_at at time zone (select current_setting ('TIMEZONE'))) at time zone 'UTC');
        |]
        (Identity limit)
        True
    where
      -- Turn one returned row into an 'Event'. The tuple order mirrors the
      -- RETURNING clause of the query above.
      uncurryEvent (id', sn, tn, trn, PG.ViaJSON payload, tries, created, retryAt, createdAtUTC, retryAtUTC) =
        Event
          { eId = id',
            eSource = SNDefault, -- in v1, there'll only be the default source
            eTable = QualifiedObject sn tn,
            eTrigger = TriggerMetadata trn,
            eEvent = payload,
            eTries = tries,
            eCreatedAt = created,
            eRetryAt = retryAt,
            -- eCreatedAtUTC and eRetryAtUTC are used for calculating metrics only
            eCreatedAtUTC = createdAtUTC,
            eRetryAtUTC = retryAtUTC
          }
      limit = fromIntegral (_unFetchBatchSize fetchBatchSize) :: Word64
  CurrentMMVersion -> fetchEvents sourceName triggerNames fetchBatchSize

-- | Mark an event as successfully delivered: sets @delivered@, clears
-- @next_retry_at@ and releases the lock on its @hdb_catalog.event_log@ row.
--
-- Under @MaintenanceModeEnabled PreviousMMVersion@ the lock is released by
-- writing @locked = 'f'@; in every other mode by writing @locked = NULL@
-- (presumably because the older catalog stores @locked@ as a boolean; confirm
-- against the catalog migrations).
setSuccessTx :: Event ('Postgres pgKind) -> MaintenanceMode MaintenanceModeVersion -> PG.TxE QErr ()
setSuccessTx e = \case
  (MaintenanceModeEnabled PreviousMMVersion) ->
    PG.unitQE
      defaultTxErrorHandler
      [PG.sql|
    UPDATE hdb_catalog.event_log
    SET delivered = 't', next_retry_at = NULL, locked = 'f'
    WHERE id = $1
    |]
      (Identity $ eId e)
      True
  (MaintenanceModeEnabled CurrentMMVersion) -> latestVersionSetSuccess
  MaintenanceModeDisabled -> latestVersionSetSuccess
  where
    latestVersionSetSuccess =
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
      UPDATE hdb_catalog.event_log
      SET delivered = 't', next_retry_at = NULL, locked = NULL
      WHERE id = $1
      |]
        (Identity $ eId e)
        True

-- | Mark an event as errored: sets @error@, clears @next_retry_at@ and
-- releases the lock on its @hdb_catalog.event_log@ row.
--
-- Under @MaintenanceModeEnabled PreviousMMVersion@ the lock is released by
-- writing @locked = 'f'@; in every other mode by writing @locked = NULL@
-- (presumably because the older catalog stores @locked@ as a boolean; confirm
-- against the catalog migrations).
setErrorTx :: Event ('Postgres pgKind) -> MaintenanceMode MaintenanceModeVersion -> PG.TxE QErr ()
setErrorTx e = \case
  (MaintenanceModeEnabled PreviousMMVersion) ->
    PG.unitQE
      defaultTxErrorHandler
      [PG.sql|
    UPDATE hdb_catalog.event_log
    SET error = 't', next_retry_at = NULL, locked = 'f'
    WHERE id = $1
    |]
      (Identity $ eId e)
      True
  (MaintenanceModeEnabled CurrentMMVersion) -> latestVersionSetError
  MaintenanceModeDisabled -> latestVersionSetError
  where
    latestVersionSetError =
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
      UPDATE hdb_catalog.event_log
      SET error = 't', next_retry_at = NULL, locked = NULL
      WHERE id = $1
      |]
        (Identity $ eId e)
        True

-- | Schedule an event for another delivery attempt.
--
-- Sets @next_retry_at@ of the event's row in @hdb_catalog.event_log@ to the
-- given time and releases its lock. As in 'setErrorTx', the value written to
-- @locked@ is @'f'@ under 'MaintenanceModeEnabled' 'PreviousMMVersion' and
-- @NULL@ otherwise.
setRetryTx :: Event ('Postgres pgKind) -> Time.UTCTime -> MaintenanceMode MaintenanceModeVersion -> PG.TxE QErr ()
setRetryTx e time = \case
  (MaintenanceModeEnabled PreviousMMVersion) ->
    PG.unitQE
      defaultTxErrorHandler
      [PG.sql|
    UPDATE hdb_catalog.event_log
    SET next_retry_at = $1, locked = 'f'
    WHERE id = $2
    |]
      (time, eId e)
      True
  (MaintenanceModeEnabled CurrentMMVersion) -> latestVersionSetRetry
  MaintenanceModeDisabled -> latestVersionSetRetry
  where
    -- Shared by the current-version maintenance mode and normal operation.
    latestVersionSetRetry :: PG.TxE QErr ()
    latestVersionSetRetry =
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
              UPDATE hdb_catalog.event_log
              SET next_retry_at = $1, locked = NULL
              WHERE id = $2
              |]
        (time, eId e)
        True

-- | Drop the trigger functions of an event trigger for the 'INSERT',
-- 'UPDATE' and 'DELETE' operations, one 'dropTriggerOp' call per operation.
dropTriggerQ :: TriggerName -> PG.TxE QErr ()
dropTriggerQ trn =
  mapM_ (dropTriggerOp trn) [INSERT, UPDATE, DELETE]

-- | Drop the @hdb_catalog@ trigger function belonging to the given event
-- trigger and operation.
--
-- The statement is @DROP FUNCTION IF EXISTS ... CASCADE@, so it is a no-op
-- when the function is absent, and objects depending on the function are
-- dropped along with it.
dropTriggerOp :: TriggerName -> Ops -> PG.TxE QErr ()
dropTriggerOp triggerName triggerOp =
  PG.unitQE
    defaultTxErrorHandler
    (PG.fromText $ getDropFuncSql triggerOp)
    ()
    False
  where
    -- The function name comes from 'pgIdenTrigger', i.e. it has already been
    -- passed through 'pgFmtIdentifier' before being spliced into the SQL text.
    getDropFuncSql :: Ops -> Text
    getDropFuncSql op =
      "DROP FUNCTION IF EXISTS"
        <> " hdb_catalog."
        <> unQualifiedTriggerName (pgIdenTrigger op triggerName)
        <> "()"
        <> " CASCADE"

-- | Check that an event exists and is not currently being processed.
--
-- An event counts as locked when its @locked@ column is non-NULL and no
-- older than 30 minutes.
--
-- Throws a 400 'NotExists' error when no row has the given id, and a 400
-- 'Busy' error when the event is locked.
checkEvent :: EventId -> PG.TxE QErr ()
checkEvent eid = do
  events <-
    PG.withQE
      defaultTxErrorHandler
      [PG.sql|
              SELECT l.locked IS NOT NULL AND l.locked >= (NOW() - interval '30 minute')
              FROM hdb_catalog.event_log l
              WHERE l.id = $1
              |]
      (Identity eid)
      True
  event <- getEvent events
  assertEventUnlocked event
  where
    -- Take the first returned row; an empty result means the id is unknown.
    getEvent :: [a] -> PG.TxE QErr a
    getEvent [] = throw400 NotExists "event not found"
    getEvent (x : _) = pure x

    assertEventUnlocked :: Identity Bool -> PG.TxE QErr ()
    assertEventUnlocked (Identity locked) =
      when locked
        $ throw400 Busy "event is already being processed"

-- | Reset an event's delivery state so that it is picked up again:
-- @delivered@ and @error@ are set to @'f'@ and @tries@ to @0@.
markForDelivery :: EventId -> PG.TxE QErr ()
markForDelivery eid =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
          UPDATE hdb_catalog.event_log
          SET
          delivered = 'f',
          error = 'f',
          tries = 0
          WHERE id = $1
          |]
    (Identity eid)
    True

-- | Queue an existing event for redelivery.
--
-- First runs 'checkEvent' (which fails if the event is missing or currently
-- locked) and only then resets its delivery state via 'markForDelivery'.
redeliverEventTx :: EventId -> PG.TxE QErr ()
redeliverEventTx eventId = do
  checkEvent eventId
  markForDelivery eventId

-- | 'unlockEventsTx' takes a list of 'EventId's and unlocks them. This
--   function is called when a graceful shutdown is initiated.
--
--   Only rows whose @locked@ column is non-NULL are touched; the result is
--   the number of rows that were actually unlocked.
unlockEventsTx :: [EventId] -> PG.TxE QErr Int
unlockEventsTx eventIds =
  runIdentity
    . PG.getRow
    <$> PG.withQE
      defaultTxErrorHandler
      [PG.sql|
     WITH "cte" AS
     (UPDATE hdb_catalog.event_log
     SET locked = NULL
     WHERE id = ANY($1::text[])
     -- only unlock those events that have been locked, it's possible
     -- that an event has been processed but not yet been removed from
     -- the saved locked events, which will lead to a double send
     AND locked IS NOT NULL
     RETURNING *)
     SELECT count(*) FROM "cte"
   |]
      (Identity $ PGTextArray $ map unEventId eventIds)
      True

---- Postgres event trigger utility functions ---------------------

-- | 'QualifiedTriggerName' stores the name of a SQL trigger.
--   An example of it is `"notify_hasura_users_all_INSERT"`, where `users_all`
--   is the name of the event trigger.
newtype QualifiedTriggerName = QualifiedTriggerName {unQualifiedTriggerName :: Text}
  deriving (Show, Eq, PG.ToPrepArg)

-- | Build the name of the SQL trigger for an event trigger and operation:
-- @notify_hasura_\<trigger name\>_\<operation\>@, where the operation is
-- rendered with 'tshow'.
pgTriggerName :: Ops -> TriggerName -> QualifiedTriggerName
pgTriggerName op trn = qualifyTriggerName op $ triggerNameToTxt trn
  where
    qualifyTriggerName op' trn' =
      QualifiedTriggerName $ "notify_hasura_" <> trn' <> "_" <> tshow op'

-- | 'pgIdenTrigger' constructs the name of the pg function used for event
-- triggers, which are present in the @hdb_catalog@ schema.
--
-- It is the 'pgTriggerName' passed through 'pgFmtIdentifier', so the result
-- is meant to be spliced into SQL text as an identifier.
pgIdenTrigger :: Ops -> TriggerName -> QualifiedTriggerName
pgIdenTrigger op = QualifiedTriggerName . pgFmtIdentifier . unQualifiedTriggerName . pgTriggerName op

-- | Define the pgSQL trigger functions on database events.
--
-- Renders the @src-rsr/trigger.sql.shakespeare@ template for the given event
-- trigger, table and operation, runs the resulting SQL, and returns the name
-- of the function as produced by 'pgIdenTrigger'.
--
-- The listen columns of the 'SubscribeOpSpec' decide which columns make up the
-- @OLD@/@NEW@ row expressions, and the delivery columns decide which columns
-- go into the JSON payload. When no delivery columns are given, all columns
-- are delivered.
mkTriggerFunctionQ ::
  forall pgKind m.
  (Backend ('Postgres pgKind), MonadTx m, MonadReader SQLGenCtx m) =>
  TriggerName ->
  QualifiedTable ->
  [ColumnInfo ('Postgres pgKind)] ->
  Ops ->
  SubscribeOpSpec ('Postgres pgKind) ->
  m QualifiedTriggerName
mkTriggerFunctionQ triggerName (QualifiedObject schema table) allCols op (SubscribeOpSpec listenColumns deliveryColumns') = do
  strfyNum <- asks stringifyNum
  let dbQualifiedTriggerName = pgIdenTrigger op triggerName
  () <-
    liftTx
      $ PG.multiQE defaultTxErrorHandler
      $ PG.fromText
      . TL.toStrict
      $ let
            -- If there are no specific delivery columns selected by user then all the columns will be delivered
            -- in payload hence 'SubCStar'.
            deliveryColumns = fromMaybe SubCStar deliveryColumns'
            getApplicableColumns = \case
              SubCStar -> allCols
              SubCArray cols -> getColInfos cols allCols

            -- Columns that should be present in the payload. By default, all columns are present.
            applicableDeliveryCols = getApplicableColumns deliveryColumns
            getRowExpression opVar = applyRowToJson' $ mkRowExpression opVar strfyNum applicableDeliveryCols

            -- Columns that user subscribed to listen for changes. By default, we listen on all columns.
            applicableListenCols = getApplicableColumns listenColumns
            renderRow opVar = applyRow $ mkRowExpression opVar strfyNum applicableListenCols

            -- There is no OLD row for an insert and no NEW row for a delete,
            -- hence SQL NULL in those positions (and for MANUAL in both).
            oldDataExp = case op of
              INSERT -> SENull
              UPDATE -> getRowExpression OLD
              DELETE -> getRowExpression OLD
              MANUAL -> SENull
            newDataExp = case op of
              INSERT -> getRowExpression NEW
              UPDATE -> getRowExpression NEW
              DELETE -> SENull
              MANUAL -> SENull

            -- NOTE(review): the bindings from 'name' down to
            -- 'newPayloadExpression' are not referenced by any Haskell
            -- expression in this function; they appear to exist solely to be
            -- picked up by name by the template splice below. Do not rename
            -- them without checking the template file.
            name = triggerNameToTxt triggerName
            qualifiedTriggerName = unQualifiedTriggerName dbQualifiedTriggerName
            schemaName = pgFmtLit $ getSchemaTxt schema
            tableName = pgFmtLit $ getTableTxt table

            oldRow = toSQLTxt $ renderRow OLD
            newRow = toSQLTxt $ renderRow NEW
            oldPayloadExpression = toSQLTxt oldDataExp
            newPayloadExpression = toSQLTxt newDataExp
         in $(makeRelativeToProject "src-rsr/trigger.sql.shakespeare" >>= ST.stextFile)
  pure dbQualifiedTriggerName
  where
    applyRowToJson' e = SEFnApp "row_to_json" [e] Nothing
    applyRow e = SEFnApp "row" [e] Nothing
    -- The qualifier is the textual form of the 'OpVar' (via 'tshow').
    opToQual = QualVar . tshow

    -- Row expression over the given columns, each read from the OLD/NEW
    -- variable selected by @opVar@.
    mkRowExpression opVar strfyNum columns =
      mkRowExp $ map (\col -> toExtractor (mkQId opVar strfyNum col) col) columns

    -- Qualified column reference (e.g. qualified by OLD or NEW), wrapped by
    -- 'toJSONableExp' according to the column type.
    mkQId opVar strfyNum colInfo =
      toJSONableExp strfyNum (ciType colInfo) False Nothing
        $ SEQIdentifier
        $ QIdentifier (opToQual opVar)
        $ toIdentifier
        $ ciColumn colInfo

    -- Generate the SQL expression
    toExtractor sqlExp column
      -- If the column type is either 'Geography' or 'Geometry', then after applying the 'ST_AsGeoJSON' function
      -- to the column, alias the value of the expression with the column name else it uses `st_asgeojson` as
      -- the column name.
      | isScalarColumnWhere isGeoType (ciType column) = Extractor sqlExp (Just $ getAlias column)
      | otherwise = Extractor sqlExp Nothing
    getAlias col = toColumnAlias $ Identifier $ getPGColTxt (ciColumn col)

-- | Check whether a non-internal trigger with the given name exists on the
-- given table, by looking it up in @pg_trigger@.
checkIfTriggerExistsForTableQ ::
  QualifiedTriggerName ->
  QualifiedTable ->
  PG.TxE QErr Bool
checkIfTriggerExistsForTableQ (QualifiedTriggerName triggerName) (QualifiedObject schemaName tableName) =
  fmap (runIdentity . PG.getRow)
    $ PG.withQE
      defaultTxErrorHandler
      -- 'regclass' converts non-quoted strings to lowercase, but identifiers
      -- such as table names are case-sensitive, so we quote the schema and
      -- table names using 'quote_ident'.
      -- Ref: https://www.postgresql.org/message-id/3896142.1620136761%40sss.pgh.pa.us
      [PG.sql|
      SELECT EXISTS (
        SELECT 1
        FROM pg_trigger
        WHERE NOT tgisinternal
        AND tgname = $1 AND tgrelid = (quote_ident($2) || '.' || quote_ident($3))::regclass
        )
     |]
      (triggerName, schemaName, tableName)
      True

-- | Check whether the trigger function for the given event trigger and
-- operation exists in the @hdb_catalog@ schema.
--
-- The lookup compares @pg_proc.proname@ against the 'pgTriggerName' (not the
-- 'pgIdenTrigger' form), passed as a query parameter.
checkIfFunctionExistsQ ::
  TriggerName ->
  Ops ->
  PG.TxE QErr Bool
checkIfFunctionExistsQ triggerName op = do
  let qualifiedTriggerName = pgTriggerName op triggerName
  fmap (runIdentity . PG.getRow)
    $ PG.withQE
      defaultTxErrorHandler
      [PG.sql|
      SELECT EXISTS (
        SELECT 1
        FROM pg_catalog.pg_proc
        JOIN pg_namespace ON pg_catalog.pg_proc.pronamespace = pg_namespace.oid
        WHERE proname = $1
        AND pg_namespace.nspname = 'hdb_catalog'
        )
     |]
      (Identity qualifiedTriggerName)
      True

-- | Create the SQL trigger (and its trigger function) for one operation of an
-- event trigger.
--
-- The trigger function is always created or replaced through
-- 'mkTriggerFunctionQ'. The SQL trigger itself is created only when
-- 'checkIfTriggerExistsForTableQ' reports that it does not exist on the table
-- yet. When the trigger is created and @triggerOnReplication@ equals
-- 'TOREnableTrigger', it is additionally switched to @ENABLE ALWAYS@.
mkTrigger ::
  forall pgKind m.
  (Backend ('Postgres pgKind), MonadTx m, MonadReader SQLGenCtx m) =>
  TriggerName ->
  QualifiedTable ->
  TriggerOnReplication ->
  [ColumnInfo ('Postgres pgKind)] ->
  Ops ->
  SubscribeOpSpec ('Postgres pgKind) ->
  m ()
mkTrigger triggerName table triggerOnReplication allCols op subOpSpec = do
  -- create/replace the trigger function
  QualifiedTriggerName dbTriggerNameTxt <- mkTriggerFunctionQ triggerName table allCols op subOpSpec
  -- check if the SQL trigger exists and only if the SQL trigger doesn't exist
  -- we create the SQL trigger.
  doesTriggerExist <- liftTx $ checkIfTriggerExistsForTableQ (pgTriggerName op triggerName) table
  unless doesTriggerExist
    $ let createTriggerSqlQuery =
            PG.fromText $ createTriggerSQL dbTriggerNameTxt (toSQLTxt table) (tshow op)
       in liftTx $ do
            PG.unitQE defaultTxErrorHandler createTriggerSqlQuery () False
            when (triggerOnReplication == TOREnableTrigger)
              $ PG.unitQE defaultTxErrorHandler (alwaysEnableTriggerQuery dbTriggerNameTxt (toSQLTxt table)) () False
  where
    -- The trigger and the function it executes share the same name.
    createTriggerSQL triggerNameTxt tableName opText =
      [ST.st|
         CREATE TRIGGER #{triggerNameTxt} AFTER #{opText} ON #{tableName} FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.#{triggerNameTxt}()
      |]

    alwaysEnableTriggerQuery triggerNameTxt tableTxt =
      PG.fromText
        $ [ST.st|
        ALTER TABLE #{tableTxt} ENABLE ALWAYS TRIGGER #{triggerNameTxt};
      |]

-- | Create the SQL triggers for every operation the given trigger definition
-- subscribes to.
--
-- For each of the insert, update and delete specs that is present in the
-- 'TriggerOpsDef', 'mkTrigger' is invoked with the matching operation
-- ('INSERT', 'UPDATE' or 'DELETE'). Operations without a spec are skipped.
mkAllTriggersQ ::
  forall pgKind m.
  (Backend ('Postgres pgKind), MonadTx m, MonadReader SQLGenCtx m) =>
  TriggerName ->
  QualifiedTable ->
  TriggerOnReplication ->
  [ColumnInfo ('Postgres pgKind)] ->
  TriggerOpsDef ('Postgres pgKind) ->
  m ()
mkAllTriggersQ triggerName table triggerOnReplication allCols fullspec = do
  for_ (tdInsert fullspec) (mkTrigger triggerName table triggerOnReplication allCols INSERT)
  for_ (tdUpdate fullspec) (mkTrigger triggerName table triggerOnReplication allCols UPDATE)
  for_ (tdDelete fullspec) (mkTrigger triggerName table triggerOnReplication allCols DELETE)

-- | Add cleanup logs for the given trigger names and cleanup configs. This
-- performs the following steps:
--
--   1. Fetch, per trigger, the number of pending (@scheduled@) cleanup logs
--      and the latest scheduled timestamp.
--   2. For a trigger with no pending cleanup logs, generate new schedules
--      starting from the current time.
--   3. For a trigger with fewer than 5 pending cleanup logs, generate new
--      schedules starting from its latest scheduled timestamp.
--   4. Triggers that already have 5 or more pending cleanup logs are skipped.
--
-- Nothing is read or written when the input list is empty, and nothing is
-- written when no trigger needs new schedules.
addCleanupSchedules ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  [(TriggerName, AutoTriggerLogCleanupConfig)] ->
  m ()
addCleanupSchedules sourceConfig triggersWithcleanupConfig =
  unless (null triggersWithcleanupConfig) $ do
    let triggerNames = map fst triggersWithcleanupConfig
    countAndLastSchedules <-
      liftEitherM
        $ liftIO
        $ runPgSourceReadTx sourceConfig
        $ selectLastCleanupScheduledTimestamp triggerNames
    currTime <- liftIO Time.getCurrentTime
    let -- trigger name -> (number of pending cleanup logs, latest scheduled time)
        triggerMap =
          HashMap.fromList
            $ map (\(triggerName, count, lastTime) -> (triggerName, (count, lastTime))) countAndLastSchedules
        scheduledTriggersAndTimestamps =
          mapMaybe
            ( \(triggerName, cleanupConfig) ->
                let lastScheduledTime = case HashMap.lookup triggerName triggerMap of
                      -- No pending cleanup logs for this trigger: start from now.
                      Nothing -> Just currTime
                      Just (count, lastTime)
                        -- Running low on pending logs: continue after the last one.
                        | count < 5 -> Just lastTime
                        -- Enough pending logs already: nothing to add.
                        | otherwise -> Nothing
                 in fmap
                      ( \lastScheduledTimestamp ->
                          ( triggerName,
                            generateScheduleTimes
                              lastScheduledTimestamp
                              cleanupSchedulesToBeGenerated
                              (_atlccSchedule cleanupConfig)
                          )
                      )
                      lastScheduledTime
            )
            triggersWithcleanupConfig
    unless (null scheduledTriggersAndTimestamps)
      $ liftEitherM
      $ liftIO
      $ runPgSourceWriteTx sourceConfig InternalRawQuery
      $ insertEventTriggerCleanupLogsTx scheduledTriggersAndTimestamps

-- | Insert the cleanup logs for the given trigger names and schedules.
--
-- One row with status @scheduled@ is inserted into
-- @hdb_catalog.hdb_event_log_cleanups@ for every (trigger name, timestamp)
-- pair. Conflicting rows are ignored (@ON CONFLICT DO NOTHING@).
--
-- When there is no (trigger name, timestamp) pair at all, no statement is
-- issued: an @INSERT ... VALUES@ without any row is not valid SQL.
insertEventTriggerCleanupLogsTx :: [(TriggerName, [Time.UTCTime])] -> PG.TxET QErr IO ()
insertEventTriggerCleanupLogsTx triggersWithschedules =
  unless (null cleanupLogRows)
    $ PG.unitQE defaultTxErrorHandler (PG.fromText insertCleanupEventsSql) () False
  where
    cleanupLogTable = QualifiedObject "hdb_catalog" "hdb_event_log_cleanups"

    -- One tuple per (trigger, scheduled timestamp) pair.
    cleanupLogRows = concatMap genArr triggersWithschedules

    insertCleanupEventsSql =
      TB.run
        $ toSQL
          S.SQLInsert
            { siTable = cleanupLogTable,
              siCols = map unsafePGCol ["trigger_name", "scheduled_at", "status"],
              siValues = S.ValuesExp cleanupLogRows,
              siConflict = Just $ S.DoNothing Nothing,
              siRet = Nothing
            }

    genArr (t, schedules) =
      map (\s -> toTupleExp [triggerNameToTxt t, formatTime' s, "scheduled"]) schedules

    -- Every column value is rendered as a SQL literal.
    toTupleExp = S.TupleExp . map S.SELit

-- | For each of the given event trigger names, get the number of cleanup logs
-- in the @scheduled@ state together with the latest @scheduled_at@ timestamp
-- among them.
--
-- Triggers without any @scheduled@ cleanup log do not appear in the result.
selectLastCleanupScheduledTimestamp :: [TriggerName] -> PG.TxET QErr IO [(TriggerName, Int, Time.UTCTime)]
selectLastCleanupScheduledTimestamp triggerNames =
  PG.withQE
    defaultTxErrorHandler
    [PG.sql|
      SELECT trigger_name, count(1), max(scheduled_at)
      FROM hdb_catalog.hdb_event_log_cleanups
      WHERE status='scheduled' AND trigger_name = ANY($1::text[])
      GROUP BY trigger_name
    |]
    (Identity $ PGTextArray $ map triggerNameToTxt triggerNames)
    True

-- | Delete every cleanup log of the given event trigger that is still in the
-- @scheduled@ state. Cleanup logs in any other state are left untouched.
deleteAllScheduledCleanupsTx :: TriggerName -> PG.TxE QErr ()
deleteAllScheduledCleanupsTx triggerName =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
      DELETE from hdb_catalog.hdb_event_log_cleanups
      WHERE (status = 'scheduled') AND (trigger_name = $1)
    |]
    (Identity (triggerNameToTxt triggerName))
    True

-- | @deleteAllScheduledCleanups@ deletes all scheduled cleanup logs for a given event trigger.
--
-- Runs 'deleteAllScheduledCleanupsTx' in a write transaction against the
-- given source; a failure is rethrown as a 'QErr' in @m@.
deleteAllScheduledCleanups ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  TriggerName ->
  m ()
deleteAllScheduledCleanups sourceConfig triggerName =
  liftEitherM
    $ liftIO
    $ runPgSourceWriteTx sourceConfig InternalRawQuery
    $ deleteAllScheduledCleanupsTx triggerName

-- | Fetch the cleanup logs that are due, as @(cleanup log id, trigger name)@ pairs.
--
-- The query works on the cleanup logs that are in the @scheduled@ state and
-- whose @scheduled_at@ lies before the current UTC time:
--
-- * for every trigger, only the log(s) with the latest such @scheduled_at@
--   are returned;
-- * all older @scheduled@ logs of the same trigger are marked as @dead@ by the
--   data-modifying CTE @mark_events_as_dead@, which is part of the same
--   statement.
getCleanupEventsForDeletionTx :: PG.TxE QErr [(Text, TriggerName)]
getCleanupEventsForDeletionTx =
  PG.withQE
    defaultTxErrorHandler
    [PG.sql|
          WITH latest_events as (
            SELECT * from hdb_catalog.hdb_event_log_cleanups WHERE status = 'scheduled' AND scheduled_at < (now() at time zone 'utc')
          ),
            grouped_events as (
              SELECT trigger_name, max(scheduled_at) as scheduled_at
                from latest_events
              group by trigger_name
            ),
            mark_events_as_dead as (
              UPDATE hdb_catalog.hdb_event_log_cleanups l
              SET status = 'dead'
              FROM grouped_events AS g
              WHERE l.trigger_name = g.trigger_name AND l.scheduled_at < g.scheduled_at AND l.status = 'scheduled'
            )
          SELECT l.id, l.trigger_name
            FROM latest_events l
                JOIN grouped_events g ON l.trigger_name = g.trigger_name
                WHERE l.scheduled_at = g.scheduled_at;
      |]
    ()
    False

-- | @getCleanupEventsForDeletion@ returns the cleanup logs that are to be deleted.
-- This will perform the following steps:
--
-- 1. Get the scheduled cleanup events that were scheduled before current time.
-- 2. If there are multiple entries for the same trigger name with different scheduled time,
--    then fetch the latest entry and mark others as dead.
--
-- Runs 'getCleanupEventsForDeletionTx' in a write transaction against the
-- given source; a failure is rethrown as a 'QErr' in @m@.
getCleanupEventsForDeletion ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  m [(Text, TriggerName)]
getCleanupEventsForDeletion sourceConfig =
  liftEitherM
    $ liftIO
    $ runPgSourceWriteTx sourceConfig InternalRawQuery
    $ getCleanupEventsForDeletionTx

-- | Set the status of the cleanup logs with the given ids to @dead@.
--
-- No statement is issued when the list of ids is empty.
markCleanupEventsAsDeadTx :: [Text] -> PG.TxE QErr ()
markCleanupEventsAsDeadTx toDeadEvents =
  unless (null toDeadEvents)
    $ PG.unitQE
      defaultTxErrorHandler
      [PG.sql|
        UPDATE hdb_catalog.hdb_event_log_cleanups l
        SET status = 'dead'
        WHERE id = ANY($1::text[])
      |]
      (Identity $ PGTextArray toDeadEvents)
      True

-- unitQueryE HGE.defaultMSSQLTxErrorHandler $
--   rawUnescapedText . LT.toStrict $
--     $(makeRelativeToProject "src-rsr/mssql/event_logs_cleanup_sqls/mssql_update_events_to_dead.sql.shakespeare" >>= ST.stextFile)

-- | @updateCleanupEventStatusToDead@ marks the cleanup logs with the given ids as @dead@.
--
-- Runs 'markCleanupEventsAsDeadTx' in a write transaction against the given
-- source; a failure is rethrown as a 'QErr' in @m@.
updateCleanupEventStatusToDead ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  [Text] ->
  m ()
updateCleanupEventStatusToDead sourceConfig toDeadEvents =
  liftEitherM
    $ liftIO
    $ runPgSourceWriteTx sourceConfig InternalRawQuery
    $ markCleanupEventsAsDeadTx toDeadEvents

-- | Set the status of the cleanup log with the given id to @paused@.
updateCleanupEventStatusToPausedTx :: Text -> PG.TxE QErr ()
updateCleanupEventStatusToPausedTx cleanupLogId =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
          UPDATE hdb_catalog.hdb_event_log_cleanups
          SET status = 'paused'
          WHERE id = $1
          |]
    (Identity cleanupLogId)
    True

-- | @updateCleanupEventStatusToPaused@ updates the cleanup log status to `paused` if the event trigger configuration is paused.
--
-- Runs 'updateCleanupEventStatusToPausedTx' in a write transaction against
-- the given source; a failure is rethrown as a 'QErr' in @m@.
updateCleanupEventStatusToPaused ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  Text ->
  m ()
updateCleanupEventStatusToPaused sourceConfig cleanupLogId =
  liftEitherM
    $ liftIO
    $ runPgSourceWriteTx sourceConfig InternalRawQuery
    $ updateCleanupEventStatusToPausedTx cleanupLogId

-- | Set the status of the cleanup log with the given id to @completed@ and
-- record how many event logs and event invocation logs were deleted.
updateCleanupEventStatusToCompletedTx :: Text -> DeletedEventLogStats -> PG.TxE QErr ()
updateCleanupEventStatusToCompletedTx cleanupLogId (DeletedEventLogStats numEventLogs numInvocationLogs) =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
          UPDATE hdb_catalog.hdb_event_log_cleanups
          SET status = 'completed', deleted_event_logs = $2 , deleted_event_invocation_logs = $3
          WHERE id = $1
          |]
    (cleanupLogId, delLogs, delInvLogs)
    True
  where
    -- The counts are passed to the query as 'Int64' values.
    delLogs = fromIntegral numEventLogs :: Int64
    delInvLogs = fromIntegral numInvocationLogs :: Int64

-- | @updateCleanupEventStatusToCompleted@ updates the cleanup log status after the event logs are deleted.
-- This will perform the following steps:
--
-- 1. Updates the cleanup config status to `completed`.
-- 2. Updates the number of event logs and event invocation logs that were deleted for a trigger name
--
-- Runs 'updateCleanupEventStatusToCompletedTx' in a write transaction against
-- the given source; a failure is rethrown as a 'QErr' in @m@.
updateCleanupEventStatusToCompleted ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  Text ->
  DeletedEventLogStats ->
  m ()
updateCleanupEventStatusToCompleted sourceConfig cleanupLogId delStats =
  liftEitherM
    $ liftIO
    $ runPgSourceWriteTx sourceConfig InternalRawQuery
    $ updateCleanupEventStatusToCompletedTx cleanupLogId delStats

deleteEventTriggerLogsTx :: TriggerLogCleanupConfig -> PG.TxE QErr DeletedEventLogStats
deleteEventTriggerLogsTx :: TriggerLogCleanupConfig -> TxE QErr DeletedEventLogStats
deleteEventTriggerLogsTx TriggerLogCleanupConfig {Bool
Int
SourceName
TriggerName
tlccEventTriggerName :: TriggerName
tlccSourceName :: SourceName
tlccBatchSize :: Int
tlccClearOlderThan :: Int
tlccTimeout :: Int
tlccCleanInvocationLogs :: Bool
tlccEventTriggerName :: TriggerLogCleanupConfig -> TriggerName
tlccSourceName :: TriggerLogCleanupConfig -> SourceName
tlccBatchSize :: TriggerLogCleanupConfig -> Int
tlccClearOlderThan :: TriggerLogCleanupConfig -> Int
tlccTimeout :: TriggerLogCleanupConfig -> Int
tlccCleanInvocationLogs :: TriggerLogCleanupConfig -> Bool
..} = do
  -- Setting the timeout
  (PGTxErr -> QErr) -> Query -> () -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE PGTxErr -> QErr
defaultTxErrorHandler (Text -> Query
PG.fromText (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ Text
"SET statement_timeout = " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (Int64 -> Text
forall a. Show a => a -> Text
tshow Int64
qTimeout)) () Bool
True
  -- Select all the dead events based on criteria set in the cleanup config.
  [EventId]
deadEventIDs <-
    (Identity EventId -> EventId) -> [Identity EventId] -> [EventId]
forall a b. (a -> b) -> [a] -> [b]
map Identity EventId -> EventId
forall a. Identity a -> a
runIdentity
      ([Identity EventId] -> [EventId])
-> TxET QErr IO [Identity EventId] -> TxET QErr IO [EventId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> (Text, Int64)
-> Bool
-> TxET QErr IO [Identity EventId]
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
        PGTxErr -> QErr
defaultTxErrorHandler
        ( Text -> Query
PG.fromText
            [ST.st|
          SELECT id FROM hdb_catalog.event_log
          WHERE ((delivered = true OR error = true) AND trigger_name = $1)
          AND created_at < now() - interval '#{qRetentionPeriod}'
          AND locked IS NULL
          LIMIT $2
        |]
        )
        (Text
qTriggerName, Int64
qBatchSize)
        Bool
True
  --  Lock the events in the database so that other HGE instances don't pick them up for deletion.
  (PGTxErr -> QErr)
-> Query -> Identity PGTextArray -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
    PGTxErr -> QErr
defaultTxErrorHandler
    [PG.sql|
      UPDATE hdb_catalog.event_log
      SET locked = now()
      WHERE id = ANY($1::text[]);
    |]
    (PGTextArray -> Identity PGTextArray
forall a. a -> Identity a
Identity (PGTextArray -> Identity PGTextArray)
-> PGTextArray -> Identity PGTextArray
forall a b. (a -> b) -> a -> b
$ [Text] -> PGTextArray
PGTextArray ([Text] -> PGTextArray) -> [Text] -> PGTextArray
forall a b. (a -> b) -> a -> b
$ (EventId -> Text) -> [EventId] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map EventId -> Text
unEventId [EventId]
deadEventIDs)
    Bool
True
  --  Based on the config either delete the corresponding invocation logs or set trigger_name
  --  to appropriate value. Please note that the event_id won't exist anymore in the event_log
  --  table, but we are still retaining it for debugging purpose.
  Int
deletedInvocationLogs <-
    if Bool
tlccCleanInvocationLogs
      then
        Identity Int -> Int
forall a. Identity a -> a
runIdentity
          (Identity Int -> Int)
-> (SingleRow (Identity Int) -> Identity Int)
-> SingleRow (Identity Int)
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity Int) -> Identity Int
forall a. SingleRow a -> a
PG.getRow
          (SingleRow (Identity Int) -> Int)
-> TxET QErr IO (SingleRow (Identity Int)) -> TxET QErr IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> Identity PGTextArray
-> Bool
-> TxET QErr IO (SingleRow (Identity Int))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
            PGTxErr -> QErr
defaultTxErrorHandler
            [PG.sql|
              WITH deletedInvocations AS (
                DELETE FROM hdb_catalog.event_invocation_logs
                WHERE event_id = ANY($1::text[])
                RETURNING 1
              )
              SELECT count(*) FROM deletedInvocations;
            |]
            (PGTextArray -> Identity PGTextArray
forall a. a -> Identity a
Identity (PGTextArray -> Identity PGTextArray)
-> PGTextArray -> Identity PGTextArray
forall a b. (a -> b) -> a -> b
$ [Text] -> PGTextArray
PGTextArray ([Text] -> PGTextArray) -> [Text] -> PGTextArray
forall a b. (a -> b) -> a -> b
$ (EventId -> Text) -> [EventId] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map EventId -> Text
unEventId [EventId]
deadEventIDs)
            Bool
True
      else do
        (PGTxErr -> QErr)
-> Query -> (PGTextArray, Text) -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
          PGTxErr -> QErr
defaultTxErrorHandler
          [PG.sql|
            UPDATE hdb_catalog.event_invocation_logs
            SET trigger_name = $2
            WHERE event_id = ANY($1::text[])
          |]
          ([Text] -> PGTextArray
PGTextArray ([Text] -> PGTextArray) -> [Text] -> PGTextArray
forall a b. (a -> b) -> a -> b
$ (EventId -> Text) -> [EventId] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map EventId -> Text
unEventId [EventId]
deadEventIDs, Text
qTriggerName)
          Bool
True
        Int -> TxET QErr IO Int
forall a. a -> TxET QErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int
0
  --  Finally delete the event logs.
  Int
deletedEventLogs <-
    Identity Int -> Int
forall a. Identity a -> a
runIdentity
      (Identity Int -> Int)
-> (SingleRow (Identity Int) -> Identity Int)
-> SingleRow (Identity Int)
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity Int) -> Identity Int
forall a. SingleRow a -> a
PG.getRow
      (SingleRow (Identity Int) -> Int)
-> TxET QErr IO (SingleRow (Identity Int)) -> TxET QErr IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> Identity PGTextArray
-> Bool
-> TxET QErr IO (SingleRow (Identity Int))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
PG.withQE
        PGTxErr -> QErr
defaultTxErrorHandler
        [PG.sql|
          WITH deletedEvents AS (
            DELETE FROM hdb_catalog.event_log
            WHERE id = ANY($1::text[])
            RETURNING 1
          )
          SELECT count(*) FROM deletedEvents;
        |]
        (PGTextArray -> Identity PGTextArray
forall a. a -> Identity a
Identity (PGTextArray -> Identity PGTextArray)
-> PGTextArray -> Identity PGTextArray
forall a b. (a -> b) -> a -> b
$ [Text] -> PGTextArray
PGTextArray ([Text] -> PGTextArray) -> [Text] -> PGTextArray
forall a b. (a -> b) -> a -> b
$ (EventId -> Text) -> [EventId] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map EventId -> Text
unEventId [EventId]
deadEventIDs)
        Bool
True
  -- Resetting the timeout to default value (0)
  (PGTxErr -> QErr) -> Query -> () -> Bool -> TxET QErr IO ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
    PGTxErr -> QErr
defaultTxErrorHandler
    [PG.sql|
      SET statement_timeout = 0;
    |]
    ()
    Bool
False
  DeletedEventLogStats -> TxE QErr DeletedEventLogStats
forall a. a -> TxET QErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure DeletedEventLogStats {Int
deletedInvocationLogs :: Int
deletedEventLogs :: Int
deletedEventLogs :: Int
deletedInvocationLogs :: Int
..}
  where
    qTimeout :: Int64
qTimeout = (Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Int -> Int64
forall a b. (a -> b) -> a -> b
$ Int
tlccTimeout Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000) :: Int64
    qTriggerName :: Text
qTriggerName = TriggerName -> Text
triggerNameToTxt TriggerName
tlccEventTriggerName
    qRetentionPeriod :: Text
qRetentionPeriod = Int -> Text
forall a. Show a => a -> Text
tshow Int
tlccClearOlderThan Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" hours"
    qBatchSize :: Int64
qBatchSize = (Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
tlccBatchSize) :: Int64

-- | @deleteEventTriggerLogs@ deletes the event logs (and event invocation logs) based on the cleanup configuration given.
-- This will perform the following steps:
--
-- 1. Select all the dead events based on criteria set in the cleanup config.
-- 2. Lock the events in the database so that other HGE instances don't pick them up for deletion.
-- 3. Based on the config, perform the delete action.
--
-- The work is delegated to 'deleteEventTriggerLogsInBatchesWith', which receives
-- the action that fetches the latest cleanup config, the previously known
-- config (@oldCleanupConfig@), and a callback that runs
-- 'deleteEventTriggerLogsTx' for a given config in a write transaction on the
-- supplied source.
deleteEventTriggerLogs ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  TriggerLogCleanupConfig ->
  IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)) ->
  m DeletedEventLogStats
deleteEventTriggerLogs sourceConfig oldCleanupConfig getLatestCleanupConfig =
  deleteEventTriggerLogsInBatchesWith getLatestCleanupConfig oldCleanupConfig runCleanupTx
  where
    -- Runs a single cleanup transaction for the config handed over by the
    -- batching driver, returning either the error or the deletion stats.
    runCleanupTx :: TriggerLogCleanupConfig -> IO (Either QErr DeletedEventLogStats)
    runCleanupTx cleanupConfig =
      runPgSourceWriteTx sourceConfig InternalRawQuery
        $ deleteEventTriggerLogsTx cleanupConfig

-- | Fetch the event logs selected by the given 'GetEventLogs' request.
--
-- Runs 'fetchEventLogsTxE' in a read transaction on the source. If the
-- transaction fails, the error is rethrown with its message prefixed by
-- @"unexpected error while fetching event logs: "@.
fetchEventLogs ::
  (MonadError QErr m, MonadIO m) =>
  PGSourceConfig ->
  GetEventLogs b ->
  m [EventLog]
fetchEventLogs sourceConfig getEventLogs =
  liftIO (runPgSourceReadTx sourceConfig $ fetchEventLogsTxE getEventLogs)
    `onLeftM` (throwError . prefixQErr "unexpected error while fetching event logs: ")

-- | Transaction that reads rows of @hdb_catalog.event_log@ for one trigger.
--
-- The rows are filtered by the requested 'EventLogStatus':
--
-- * 'Pending': @delivered=false AND error=false AND archived=false@
-- * 'Processed': @(delivered=true OR error=true) AND archived=false@
-- * 'All': no filter besides the trigger name
--
-- In every case the rows are ordered by @created_at DESC@ and paginated with
-- the request's limit and offset.
fetchEventLogsTxE :: GetEventLogs b -> PG.TxE QErr [EventLog]
fetchEventLogsTxE GetEventLogs {..} =
  case _gelStatus of
    Pending ->
      fetchWith
        [PG.sql|
            SELECT id, schema_name, table_name, trigger_name, payload, delivered, error, tries, created_at, locked, next_retry_at, archived
              FROM hdb_catalog.event_log
              WHERE trigger_name = $1 
              AND delivered=false AND error=false AND archived=false ORDER BY created_at DESC LIMIT $2 OFFSET $3;
            |]
    Processed ->
      fetchWith
        [PG.sql|
            SELECT id, schema_name, table_name, trigger_name, payload, delivered, error, tries, created_at, locked, next_retry_at, archived
              FROM hdb_catalog.event_log
              WHERE trigger_name = $1 
              AND (delivered=true OR error=true) AND archived=false ORDER BY created_at DESC LIMIT $2 OFFSET $3;
            |]
    All ->
      fetchWith
        [PG.sql|
            SELECT id, schema_name, table_name, trigger_name, payload, delivered, error, tries, created_at, locked, next_retry_at, archived
              FROM hdb_catalog.event_log
              WHERE trigger_name = $1 
              ORDER BY created_at DESC LIMIT $2 OFFSET $3;
            |]
  where
    -- Every branch binds the same three parameters ($1 trigger name, $2 limit,
    -- $3 offset) and decodes the same column list, so only the query differs.
    fetchWith :: PG.Query -> PG.TxE QErr [EventLog]
    fetchWith query =
      map uncurryEventLog
        <$> PG.withQE
          defaultTxErrorHandler
          query
          (triggerName, limit, offset)
          True

    triggerName :: Text
    triggerName = triggerNameToTxt _gelName

    limit, offset :: Int64
    limit = fromIntegral _gelLimit
    offset = fromIntegral _gelOffset

-- | Fetch the invocation logs selected by the given 'GetEventInvocations'
-- request.
--
-- Runs 'fetchEventInvocationLogsTxE' in a read transaction on the source. If
-- the transaction fails, the error is rethrown with its message prefixed by
-- @"unexpected error while fetching invocation logs: "@.
fetchEventInvocationLogs ::
  (MonadError QErr m, MonadIO m) =>
  PGSourceConfig ->
  GetEventInvocations b ->
  m [EventInvocationLog]
fetchEventInvocationLogs sourceConfig getEventInvocationLogs =
  liftIO (runPgSourceReadTx sourceConfig $ fetchEventInvocationLogsTxE getEventInvocationLogs)
    `onLeftM` (throwError . prefixQErr "unexpected error while fetching invocation logs: ")

-- | Transaction that reads rows of @hdb_catalog.event_invocation_logs@ for one
-- trigger, ordered by @created_at DESC@ and paginated with the request's limit
-- and offset.
fetchEventInvocationLogsTxE :: GetEventInvocations b -> PG.TxE QErr [EventInvocationLog]
fetchEventInvocationLogsTxE GetEventInvocations {..} =
  map uncurryEventInvocationLog
    <$> PG.withQE
      defaultTxErrorHandler
      [PG.sql|
        SELECT id, trigger_name, event_id, status, request, response, created_at
          FROM hdb_catalog.event_invocation_logs
          WHERE trigger_name = $1 
          ORDER BY created_at DESC LIMIT $2 OFFSET $3;
        |]
      (triggerName, limit, offset)
      True
  where
    triggerName :: Text
    triggerName = triggerNameToTxt _geiName

    -- The query parameters are bound as 64-bit integers.
    limit, offset :: Int64
    limit = fromIntegral _geiLimit
    offset = fromIntegral _geiOffset

-- | Fetch a single event, together with its invocation logs, by event id.
--
-- Runs 'fetchEventByIdTxE' in a read transaction on the source.
--
-- * If the transaction fails, the error is rethrown with its message prefixed
--   by @"unexpected error while fetching event with id <id>: "@.
-- * If the transaction succeeds but carries no event, a 400 'NotExists' error
--   is thrown.
fetchEventById ::
  (MonadError QErr m, MonadIO m) =>
  PGSourceConfig ->
  GetEventById b ->
  m EventLogWithInvocations
fetchEventById sourceConfig getEventById = do
  txResult <- liftIO $ runPgSourceReadTx sourceConfig $ fetchEventByIdTxE getEventById
  case txResult of
    Left err ->
      throwError
        $ prefixQErr ("unexpected error while fetching event with id " <> eventId <> ": ") err
    Right eventLogWithInvocations ->
      -- The transaction signals "no such event" with an empty 'elwiEvent'
      -- rather than an error, so it is turned into a 400 here.
      case elwiEvent eventLogWithInvocations of
        Nothing -> throw400 NotExists errMsg
        Just _ -> pure eventLogWithInvocations
  where
    eventId :: Text
    eventId = unEventId $ _gebiEventId getEventById

    errMsg :: Text
    errMsg = "event id " <> eventId <> " does not exist"

-- | Transaction that looks up an event in @hdb_catalog.event_log@ by id and,
-- when exactly one row matches, also reads its invocation logs.
--
-- * No matching row: returns an 'EventLogWithInvocations' with no event and no
--   invocations.
-- * Exactly one row: returns the event plus its rows from
--   @hdb_catalog.event_invocation_logs@, ordered by @created_at DESC@ and
--   paginated with the request's invocation-log limit and offset.
-- * More than one row: throws a 500 error.
fetchEventByIdTxE :: GetEventById b -> PG.TxE QErr EventLogWithInvocations
fetchEventByIdTxE GetEventById {..} = do
  events <-
    map uncurryEventLog
      <$> PG.withQE
        defaultTxErrorHandler
        [PG.sql|
          SELECT id, schema_name, table_name, trigger_name, payload, delivered, error, tries, created_at, locked, next_retry_at, archived
            FROM hdb_catalog.event_log
            WHERE id = $1;
          |]
        (Identity eventId)
        True
  case events of
    [] -> pure $ EventLogWithInvocations Nothing []
    [event] -> do
      invocations <-
        map uncurryEventInvocationLog
          <$> PG.withQE
            defaultTxErrorHandler
            [PG.sql|
              SELECT id, trigger_name, event_id, status, request, response, created_at
                FROM hdb_catalog.event_invocation_logs
                WHERE event_id = $1
                ORDER BY created_at DESC LIMIT $2 OFFSET $3;
              |]
            (eventId, limit, offset)
            True
      pure $ EventLogWithInvocations (Just event) invocations
    _ -> throw500 $ "Unexpected error: Multiple events present with event id " <> eventId
  where
    eventId :: Text
    eventId = unEventId _gebiEventId

    -- The query parameters are bound as 64-bit integers.
    limit, offset :: Int64
    limit = fromIntegral _gebiInvocationLogLimit
    offset = fromIntegral _gebiInvocationLogOffset

-- | Build an 'EventLog' from a result row. The tuple positions match the
-- column order selected by the @hdb_catalog.event_log@ queries in this module
-- (@id, schema_name, table_name, trigger_name, payload, delivered, error,
-- tries, created_at, locked, next_retry_at, archived@); the payload is
-- unwrapped from its 'PG.ViaJSON' wrapper.
uncurryEventLog ::
  (EventId, Text, Text, TriggerName, PG.ViaJSON Value, Bool, Bool, Int, Time.UTCTime, Maybe Time.UTCTime, Maybe Time.UTCTime, Bool) ->
  EventLog
uncurryEventLog (eventId, schemaName, tableName, triggerName, PG.ViaJSON payload, delivered, isError, tries, createdAt, locked, nextRetryAt, archived) =
  EventLog
    { elId = eventId,
      elSchemaName = schemaName,
      elTableName = tableName,
      elTriggerName = triggerName,
      elPayload = payload,
      elDelivered = delivered,
      elError = isError,
      elTries = tries,
      elCreatedAt = createdAt,
      elLocked = locked,
      elNextRetryAt = nextRetryAt,
      elArchived = archived
    }

-- | Build an 'EventInvocationLog' from a result row. The tuple positions match
-- the column order selected by the @hdb_catalog.event_invocation_logs@ queries
-- in this module (@id, trigger_name, event_id, status, request, response,
-- created_at@); request and response are unwrapped from their 'PG.ViaJSON'
-- wrappers.
uncurryEventInvocationLog ::
  (Text, TriggerName, EventId, Maybe Int, PG.ViaJSON Value, PG.ViaJSON Value, Time.UTCTime) ->
  EventInvocationLog
uncurryEventInvocationLog (invocationId, triggerName, eventId, status, PG.ViaJSON request, PG.ViaJSON response, createdAt) =
  EventInvocationLog
    { eilId = invocationId,
      eilTriggerName = triggerName,
      eilEventId = eventId,
      eilHttpStatus = status,
      eilRequest = request,
      eilResponse = response,
      eilCreatedAt = createdAt
    }