{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}

module Hasura.Backends.MSSQL.DDL.EventTrigger
  ( createTableEventTrigger,
    fetchUndeliveredEvents,
    setRetry,
    recordSuccess,
    recordError,
    recordError',
    dropTriggerQ,
    dropTriggerAndArchiveEvents,
    dropDanglingSQLTrigger,
    redeliverEvent,
    insertManualEvent,
    unlockEventsInSource,
    getMaintenanceModeVersion,
    qualifyTableName,
    createMissingSQLTriggers,
    checkIfTriggerExists,
    addCleanupSchedules,
    deleteAllScheduledCleanups,
    getCleanupEventsForDeletion,
    updateCleanupEventStatusToDead,
    updateCleanupEventStatusToPaused,
    updateCleanupEventStatusToCompleted,
    deleteEventTriggerLogs,
    fetchEventLogs,
    fetchEventInvocationLogs,
    fetchEventById,
  )
where

import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson qualified as J
import Data.ByteString qualified as B
import Data.ByteString.Lazy (fromStrict)
import Data.FileEmbed (makeRelativeToProject)
import Data.HashMap.Strict qualified as HashMap
import Data.HashSet qualified as HashSet
import Data.Set.NonEmpty qualified as NE
import Data.Text qualified as T
import Data.Text.Encoding qualified as TE
import Data.Text.Extended (ToTxt, commaSeparated, toTxt)
import Data.Text.Lazy qualified as LT
import Data.Text.NonEmpty (mkNonEmptyTextUnsafe)
import Data.Time
import Data.Time.Format.ISO8601 (iso8601Show)
import Database.MSSQL.Transaction (TxE, TxET, multiRowQueryE, singleRowQueryE, unitQueryE)
import Database.ODBC.SQLServer (Datetime2 (..), Datetimeoffset (..), rawUnescapedText, toSql)
import Database.ODBC.TH qualified as ODBC
import Hasura.Backends.MSSQL.Connection
import Hasura.Backends.MSSQL.DDL.Source.Version
import Hasura.Backends.MSSQL.SQL.Error qualified as HGE
import Hasura.Backends.MSSQL.ToQuery (fromTableName, toQueryFlat)
import Hasura.Backends.MSSQL.Types (SchemaName (..), TableName (..))
import Hasura.Backends.MSSQL.Types.Internal (columnNameText, geoTypes)
import Hasura.Base.Error
import Hasura.Eventing.Common
import Hasura.Prelude
import Hasura.RQL.Types.BackendType
import Hasura.RQL.Types.Column
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.Eventing (EventId (..), OpVar (..))
import Hasura.RQL.Types.Source
import Hasura.SQL.Types
import Hasura.Server.Types
import Hasura.Session
import Hasura.Table.Cache (PrimaryKey (..))
import Hasura.Tracing qualified as Tracing
import Text.Builder qualified as TB
import Text.Shakespeare.Text qualified as ST

-- | creates a SQL Values list from haskell list  (('123-abc'), ('456-vgh'), ('234-asd'))
generateSQLValuesFromList :: (ToTxt a) => [a] -> Text
generateSQLValuesFromList :: forall a. ToTxt a => [a] -> Text
generateSQLValuesFromList = (a -> Text) -> [a] -> Text
forall a. (a -> Text) -> [a] -> Text
generateSQLValuesFromListWith (\a
t -> Text
"'" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> a -> Text
forall a. ToTxt a => a -> Text
toTxt a
t Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"'")

generateSQLValuesFromListWith :: (a -> Text) -> [a] -> Text
generateSQLValuesFromListWith :: forall a. (a -> Text) -> [a] -> Text
generateSQLValuesFromListWith a -> Text
f [a]
events = [Text] -> Text
forall t (f :: * -> *). (ToTxt t, Foldable f) => f t -> Text
commaSeparated [Text]
values
  where
    values :: [Text]
values = (a -> Text) -> [a] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (\a
e -> Text
"(" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> a -> Text
f a
e Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
")") [a]
events

fetchUndeliveredEvents ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  SourceName ->
  [TriggerName] ->
  MaintenanceMode () ->
  FetchBatchSize ->
  m [Event 'MSSQL]
fetchUndeliveredEvents :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig
-> SourceName
-> [TriggerName]
-> MaintenanceMode ()
-> FetchBatchSize
-> m [Event 'MSSQL]
fetchUndeliveredEvents MSSQLSourceConfig
sourceConfig SourceName
sourceName [TriggerName]
triggerNames MaintenanceMode ()
_ FetchBatchSize
fetchBatchSize = do
  m (Either QErr [Event 'MSSQL]) -> m [Event 'MSSQL]
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
    (m (Either QErr [Event 'MSSQL]) -> m [Event 'MSSQL])
-> m (Either QErr [Event 'MSSQL]) -> m [Event 'MSSQL]
forall a b. (a -> b) -> a -> b
$ IO (Either QErr [Event 'MSSQL]) -> m (Either QErr [Event 'MSSQL])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO (Either QErr [Event 'MSSQL]) -> m (Either QErr [Event 'MSSQL]))
-> IO (Either QErr [Event 'MSSQL])
-> m (Either QErr [Event 'MSSQL])
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig
-> TxET QErr IO [Event 'MSSQL] -> IO (Either QErr [Event 'MSSQL])
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig
    (TxET QErr IO [Event 'MSSQL] -> IO (Either QErr [Event 'MSSQL]))
-> TxET QErr IO [Event 'MSSQL] -> IO (Either QErr [Event 'MSSQL])
forall a b. (a -> b) -> a -> b
$ SourceName
-> [TriggerName] -> FetchBatchSize -> TxET QErr IO [Event 'MSSQL]
fetchEvents SourceName
sourceName [TriggerName]
triggerNames FetchBatchSize
fetchBatchSize

setRetry ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  Event 'MSSQL ->
  UTCTime ->
  MaintenanceMode MaintenanceModeVersion ->
  m ()
setRetry :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig
-> Event 'MSSQL
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> m ()
setRetry MSSQLSourceConfig
sourceConfig Event 'MSSQL
event UTCTime
retryTime MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion = do
  m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
    (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig
    (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ Event 'MSSQL
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> TxET QErr IO ()
setRetryTx Event 'MSSQL
event UTCTime
retryTime MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion

insertManualEvent ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  TableName ->
  TriggerName ->
  J.Value ->
  UserInfo ->
  Maybe Tracing.TraceContext ->
  m EventId
insertManualEvent :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig
-> TableName
-> TriggerName
-> Value
-> UserInfo
-> Maybe TraceContext
-> m EventId
insertManualEvent MSSQLSourceConfig
sourceConfig TableName
tableName TriggerName
triggerName Value
payload UserInfo
_userInfo Maybe TraceContext
_traceCtx =
  m (Either QErr EventId) -> m EventId
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
    (m (Either QErr EventId) -> m EventId)
-> m (Either QErr EventId) -> m EventId
forall a b. (a -> b) -> a -> b
$ IO (Either QErr EventId) -> m (Either QErr EventId)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO (Either QErr EventId) -> m (Either QErr EventId))
-> IO (Either QErr EventId) -> m (Either QErr EventId)
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig
-> TxET QErr IO EventId -> IO (Either QErr EventId)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig
    (TxET QErr IO EventId -> IO (Either QErr EventId))
-> TxET QErr IO EventId -> IO (Either QErr EventId)
forall a b. (a -> b) -> a -> b
$
    -- TODO: Include TraceContext in payload
    TableName -> TriggerName -> Value -> TxET QErr IO EventId
insertMSSQLManualEventTx TableName
tableName TriggerName
triggerName Value
payload

getMaintenanceModeVersion ::
  ( MonadIO m,
    MonadError QErr m
  ) =>
  MSSQLSourceConfig ->
  m MaintenanceModeVersion
getMaintenanceModeVersion :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig -> m MaintenanceModeVersion
getMaintenanceModeVersion MSSQLSourceConfig
sourceConfig =
  m (Either QErr MaintenanceModeVersion) -> m MaintenanceModeVersion
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
    (m (Either QErr MaintenanceModeVersion)
 -> m MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion)
-> m MaintenanceModeVersion
forall a b. (a -> b) -> a -> b
$ IO (Either QErr MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO (Either QErr MaintenanceModeVersion)
 -> m (Either QErr MaintenanceModeVersion))
-> IO (Either QErr MaintenanceModeVersion)
-> m (Either QErr MaintenanceModeVersion)
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig
-> TxET QErr IO MaintenanceModeVersion
-> IO (Either QErr MaintenanceModeVersion)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceReadTx MSSQLSourceConfig
sourceConfig
    (TxET QErr IO MaintenanceModeVersion
 -> IO (Either QErr MaintenanceModeVersion))
-> TxET QErr IO MaintenanceModeVersion
-> IO (Either QErr MaintenanceModeVersion)
forall a b. (a -> b) -> a -> b
$ TxET QErr IO MaintenanceModeVersion
getMaintenanceModeVersionTx

recordSuccess ::
  (MonadIO m) =>
  MSSQLSourceConfig ->
  Event 'MSSQL ->
  Invocation 'EventType ->
  MaintenanceMode MaintenanceModeVersion ->
  m (Either QErr ())
recordSuccess :: forall (m :: * -> *).
MonadIO m =>
MSSQLSourceConfig
-> Event 'MSSQL
-> Invocation 'EventType
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordSuccess MSSQLSourceConfig
sourceConfig Event 'MSSQL
event Invocation 'EventType
invocation MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion =
  IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig
    (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
      TriggerName -> Invocation 'EventType -> TxET QErr IO ()
insertInvocation (TriggerMetadata -> TriggerName
tmName (Event 'MSSQL -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event 'MSSQL
event)) Invocation 'EventType
invocation
      Event 'MSSQL
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
setSuccessTx Event 'MSSQL
event MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion

recordError ::
  (MonadIO m) =>
  MSSQLSourceConfig ->
  Event 'MSSQL ->
  Invocation 'EventType ->
  ProcessEventError ->
  MaintenanceMode MaintenanceModeVersion ->
  m (Either QErr ())
recordError :: forall (m :: * -> *).
MonadIO m =>
MSSQLSourceConfig
-> Event 'MSSQL
-> Invocation 'EventType
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError MSSQLSourceConfig
sourceConfig Event 'MSSQL
event Invocation 'EventType
invocation ProcessEventError
processEventError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion =
  MSSQLSourceConfig
-> Event 'MSSQL
-> Maybe (Invocation 'EventType)
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
forall (m :: * -> *).
MonadIO m =>
MSSQLSourceConfig
-> Event 'MSSQL
-> Maybe (Invocation 'EventType)
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError' MSSQLSourceConfig
sourceConfig Event 'MSSQL
event (Invocation 'EventType -> Maybe (Invocation 'EventType)
forall a. a -> Maybe a
Just Invocation 'EventType
invocation) ProcessEventError
processEventError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion

recordError' ::
  (MonadIO m) =>
  MSSQLSourceConfig ->
  Event 'MSSQL ->
  Maybe (Invocation 'EventType) ->
  ProcessEventError ->
  MaintenanceMode MaintenanceModeVersion ->
  m (Either QErr ())
recordError' :: forall (m :: * -> *).
MonadIO m =>
MSSQLSourceConfig
-> Event 'MSSQL
-> Maybe (Invocation 'EventType)
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError' MSSQLSourceConfig
sourceConfig Event 'MSSQL
event Maybe (Invocation 'EventType)
invocation ProcessEventError
processEventError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion =
  IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig
    (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
      Maybe (Invocation 'EventType)
-> (Invocation 'EventType -> TxET QErr IO ()) -> TxET QErr IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe (Invocation 'EventType)
invocation ((Invocation 'EventType -> TxET QErr IO ()) -> TxET QErr IO ())
-> (Invocation 'EventType -> TxET QErr IO ()) -> TxET QErr IO ()
forall a b. (a -> b) -> a -> b
$ TriggerName -> Invocation 'EventType -> TxET QErr IO ()
insertInvocation (TriggerMetadata -> TriggerName
tmName (Event 'MSSQL -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event 'MSSQL
event))
      case ProcessEventError
processEventError of
        PESetRetry UTCTime
retryTime -> do
          Event 'MSSQL
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> TxET QErr IO ()
setRetryTx Event 'MSSQL
event UTCTime
retryTime MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion
        ProcessEventError
PESetError -> Event 'MSSQL
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
setErrorTx Event 'MSSQL
event MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion

redeliverEvent ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  EventId ->
  m ()
redeliverEvent :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig -> EventId -> m ()
redeliverEvent MSSQLSourceConfig
sourceConfig EventId
eventId =
  m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
    (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig
    (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
      EventId -> TxET QErr IO ()
checkEventTx EventId
eventId
      EventId -> TxET QErr IO ()
markForDeliveryTx EventId
eventId

dropTriggerAndArchiveEvents ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  TriggerName ->
  TableName ->
  m ()
dropTriggerAndArchiveEvents :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig -> TriggerName -> TableName -> m ()
dropTriggerAndArchiveEvents MSSQLSourceConfig
sourceConfig TriggerName
triggerName TableName
table =
  m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
    (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig
    (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
      TriggerName -> SchemaName -> TxET QErr IO ()
dropTriggerQ TriggerName
triggerName (TableName -> SchemaName
tableSchema TableName
table)
      TriggerName -> TxET QErr IO ()
archiveEvents TriggerName
triggerName

dropDanglingSQLTrigger ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  TriggerName ->
  TableName ->
  HashSet Ops ->
  m ()
dropDanglingSQLTrigger :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig
-> TriggerName -> TableName -> HashSet Ops -> m ()
dropDanglingSQLTrigger MSSQLSourceConfig
sourceConfig TriggerName
triggerName TableName
table HashSet Ops
ops =
  m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
    (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig
    (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
      (Ops -> TxET QErr IO ()) -> HashSet Ops -> TxET QErr IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (TriggerName -> SchemaName -> Ops -> TxET QErr IO ()
dropTriggerOp TriggerName
triggerName (TableName -> SchemaName
tableSchema TableName
table)) HashSet Ops
ops

createTableEventTrigger ::
  (MonadIO m) =>
  SQLGenCtx ->
  MSSQLSourceConfig ->
  TableName ->
  [ColumnInfo 'MSSQL] ->
  TriggerName ->
  TriggerOnReplication ->
  TriggerOpsDef 'MSSQL ->
  Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)) ->
  m (Either QErr ())
createTableEventTrigger :: forall (m :: * -> *).
MonadIO m =>
SQLGenCtx
-> MSSQLSourceConfig
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerName
-> TriggerOnReplication
-> TriggerOpsDef 'MSSQL
-> Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
-> m (Either QErr ())
createTableEventTrigger SQLGenCtx
_sqlGen MSSQLSourceConfig
sourceConfig TableName
table [ColumnInfo 'MSSQL]
columns TriggerName
triggerName TriggerOnReplication
triggerOnReplication TriggerOpsDef 'MSSQL
opsDefinition Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
primaryKeyMaybe = do
  IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig
    (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
      TriggerName
-> TableName
-> TriggerOnReplication
-> [ColumnInfo 'MSSQL]
-> TriggerOpsDef 'MSSQL
-> Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
-> TxET QErr IO ()
forall (m :: * -> *).
MonadMSSQLTx m =>
TriggerName
-> TableName
-> TriggerOnReplication
-> [ColumnInfo 'MSSQL]
-> TriggerOpsDef 'MSSQL
-> Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
-> m ()
mkAllTriggersQ TriggerName
triggerName TableName
table TriggerOnReplication
triggerOnReplication [ColumnInfo 'MSSQL]
columns TriggerOpsDef 'MSSQL
opsDefinition Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
primaryKeyMaybe

createMissingSQLTriggers ::
  ( MonadIO m,
    MonadError QErr m,
    MonadBaseControl IO m
  ) =>
  SQLGenCtx ->
  MSSQLSourceConfig ->
  TableName ->
  ([ColumnInfo 'MSSQL], Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))) ->
  TriggerName ->
  TriggerOnReplication ->
  TriggerOpsDef 'MSSQL ->
  m ()
createMissingSQLTriggers :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m, MonadBaseControl IO m) =>
SQLGenCtx
-> MSSQLSourceConfig
-> TableName
-> ([ColumnInfo 'MSSQL],
    Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)))
-> TriggerName
-> TriggerOnReplication
-> TriggerOpsDef 'MSSQL
-> m ()
createMissingSQLTriggers
  SQLGenCtx
_serverConfigCtx
  MSSQLSourceConfig
sourceConfig
  table :: TableName
table@(TableName Text
tableNameText (SchemaName Text
schemaText))
  ([ColumnInfo 'MSSQL]
allCols, Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
primaryKeyMaybe)
  TriggerName
triggerName
  TriggerOnReplication
triggerOnReplication
  TriggerOpsDef 'MSSQL
opsDefinition = do
    m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
      (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig -> TxET QErr m () -> m (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig
      (TxET QErr m () -> m (Either QErr ()))
-> TxET QErr m () -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
        Maybe (SubscribeOpSpec 'MSSQL)
-> (SubscribeOpSpec 'MSSQL -> TxET QErr m ()) -> TxET QErr m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (TriggerOpsDef 'MSSQL -> Maybe (SubscribeOpSpec 'MSSQL)
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdInsert TriggerOpsDef 'MSSQL
opsDefinition) (Ops -> SubscribeOpSpec 'MSSQL -> TxET QErr m ()
doesSQLTriggerExist Ops
INSERT)
        Maybe (SubscribeOpSpec 'MSSQL)
-> (SubscribeOpSpec 'MSSQL -> TxET QErr m ()) -> TxET QErr m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (TriggerOpsDef 'MSSQL -> Maybe (SubscribeOpSpec 'MSSQL)
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdUpdate TriggerOpsDef 'MSSQL
opsDefinition) (Ops -> SubscribeOpSpec 'MSSQL -> TxET QErr m ()
doesSQLTriggerExist Ops
UPDATE)
        Maybe (SubscribeOpSpec 'MSSQL)
-> (SubscribeOpSpec 'MSSQL -> TxET QErr m ()) -> TxET QErr m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (TriggerOpsDef 'MSSQL -> Maybe (SubscribeOpSpec 'MSSQL)
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdDelete TriggerOpsDef 'MSSQL
opsDefinition) (Ops -> SubscribeOpSpec 'MSSQL -> TxET QErr m ()
doesSQLTriggerExist Ops
DELETE)
    where
      doesSQLTriggerExist :: Ops -> SubscribeOpSpec 'MSSQL -> TxET QErr m ()
doesSQLTriggerExist Ops
op SubscribeOpSpec 'MSSQL
opSpec = do
        let triggerNameWithOp :: Text
triggerNameWithOp = Text
"notify_hasura_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TriggerName -> Text
triggerNameToTxt TriggerName
triggerName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Ops -> Text
forall a. Show a => a -> Text
tshow Ops
op
        Bool
doesOpTriggerExist <-
          TxE QErr Bool -> TxET QErr m Bool
forall a. TxE QErr a -> TxET QErr m a
forall (m :: * -> *) a. MonadMSSQLTx m => TxE QErr a -> m a
liftMSSQLTx
            (TxE QErr Bool -> TxET QErr m Bool)
-> TxE QErr Bool -> TxET QErr m Bool
forall a b. (a -> b) -> a -> b
$ (MSSQLTxError -> QErr) -> Query -> TxE QErr Bool
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m a
singleRowQueryE
              MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
              [ODBC.sql|
               SELECT CASE WHEN EXISTS
                 ( SELECT 1
                   FROM sys.triggers tr
                   INNER join sys.tables tb on tr.parent_id = tb.object_id
                   INNER join sys.schemas s on tb.schema_id = s.schema_id
                   WHERE tb.name = $tableNameText AND tr.name = $triggerNameWithOp AND s.name = $schemaText
                 )
               THEN CAST(1 AS BIT)
               ELSE CAST(0 AS BIT)
               END;
             |]
        Bool -> TxET QErr m () -> TxET QErr m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
doesOpTriggerExist (TxET QErr m () -> TxET QErr m ())
-> TxET QErr m () -> TxET QErr m ()
forall a b. (a -> b) -> a -> b
$ do
          case Ops
op of
            Ops
INSERT -> TriggerName
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> SubscribeOpSpec 'MSSQL
-> TxET QErr m ()
forall (m :: * -> *).
MonadMSSQLTx m =>
TriggerName
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> SubscribeOpSpec 'MSSQL
-> m ()
mkInsertTriggerQ TriggerName
triggerName TableName
table [ColumnInfo 'MSSQL]
allCols TriggerOnReplication
triggerOnReplication SubscribeOpSpec 'MSSQL
opSpec
            Ops
UPDATE -> TriggerName
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
-> SubscribeOpSpec 'MSSQL
-> TxET QErr m ()
forall (m :: * -> *).
MonadMSSQLTx m =>
TriggerName
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
-> SubscribeOpSpec 'MSSQL
-> m ()
mkUpdateTriggerQ TriggerName
triggerName TableName
table [ColumnInfo 'MSSQL]
allCols TriggerOnReplication
triggerOnReplication Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
primaryKeyMaybe SubscribeOpSpec 'MSSQL
opSpec
            Ops
DELETE -> TriggerName
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> SubscribeOpSpec 'MSSQL
-> TxET QErr m ()
forall (m :: * -> *).
MonadMSSQLTx m =>
TriggerName
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> SubscribeOpSpec 'MSSQL
-> m ()
mkDeleteTriggerQ TriggerName
triggerName TableName
table [ColumnInfo 'MSSQL]
allCols TriggerOnReplication
triggerOnReplication SubscribeOpSpec 'MSSQL
opSpec
            Ops
MANUAL -> () -> TxET QErr m ()
forall a. a -> TxET QErr m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

unlockEventsInSource ::
  (MonadIO m) =>
  MSSQLSourceConfig ->
  NE.NESet EventId ->
  m (Either QErr Int)
unlockEventsInSource :: forall (m :: * -> *).
MonadIO m =>
MSSQLSourceConfig -> NESet EventId -> m (Either QErr Int)
unlockEventsInSource MSSQLSourceConfig
sourceConfig NESet EventId
eventIds =
  IO (Either QErr Int) -> m (Either QErr Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO (Either QErr Int) -> m (Either QErr Int))
-> IO (Either QErr Int) -> m (Either QErr Int)
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig -> TxET QErr IO Int -> IO (Either QErr Int)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig
    (TxET QErr IO Int -> IO (Either QErr Int))
-> TxET QErr IO Int -> IO (Either QErr Int)
forall a b. (a -> b) -> a -> b
$ do
      [EventId] -> TxET QErr IO Int
unlockEventsTx ([EventId] -> TxET QErr IO Int) -> [EventId] -> TxET QErr IO Int
forall a b. (a -> b) -> a -> b
$ NESet EventId -> [EventId]
forall a. NESet a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList NESet EventId
eventIds

-- Check if any trigger for any of the operation exists with the 'triggerName'
checkIfTriggerExists ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  TriggerName ->
  HashSet Ops ->
  m Bool
checkIfTriggerExists :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig -> TriggerName -> HashSet Ops -> m Bool
checkIfTriggerExists MSSQLSourceConfig
sourceConfig TriggerName
triggerName HashSet Ops
ops = do
  m (Either QErr Bool) -> m Bool
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
    (m (Either QErr Bool) -> m Bool) -> m (Either QErr Bool) -> m Bool
forall a b. (a -> b) -> a -> b
$ IO (Either QErr Bool) -> m (Either QErr Bool)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO (Either QErr Bool) -> m (Either QErr Bool))
-> IO (Either QErr Bool) -> m (Either QErr Bool)
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig -> TxE QErr Bool -> IO (Either QErr Bool)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig
    (TxE QErr Bool -> IO (Either QErr Bool))
-> TxE QErr Bool -> IO (Either QErr Bool)
forall a b. (a -> b) -> a -> b
$ ([Bool] -> Bool) -> TxET QErr IO [Bool] -> TxE QErr Bool
forall a b. (a -> b) -> TxET QErr IO a -> TxET QErr IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap [Bool] -> Bool
forall (t :: * -> *). Foldable t => t Bool -> Bool
or ((Ops -> TxE QErr Bool) -> [Ops] -> TxET QErr IO [Bool]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse (TriggerName -> Ops -> TxE QErr Bool
checkIfTriggerExistsQ TriggerName
triggerName) (HashSet Ops -> [Ops]
forall a. HashSet a -> [a]
HashSet.toList HashSet Ops
ops))

---- DATABASE QUERIES ---------------------
--
--   The API for our in-database work queue:
-------------------------------------------

insertInvocation :: TriggerName -> Invocation 'EventType -> TxE QErr ()
insertInvocation :: TriggerName -> Invocation 'EventType -> TxET QErr IO ()
insertInvocation TriggerName
tName Invocation 'EventType
invo = do
  (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE
    MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
    [ODBC.sql|
      INSERT INTO hdb_catalog.event_invocation_logs (event_id, trigger_name, status, request, response)
          VALUES ($invoEventId, $invoTriggerName, $invoStatus, $invoRequest, $invoResponse)
    |]

  (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE
    MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
    [ODBC.sql|
      UPDATE hdb_catalog.event_log

      SET tries = tries + 1
      WHERE id = $invoEventId
    |]
  where
    invoEventId :: Text
invoEventId = EventId -> Text
unEventId (EventId -> Text) -> EventId -> Text
forall a b. (a -> b) -> a -> b
$ Invocation 'EventType -> EventId
forall (a :: TriggerTypes). Invocation a -> EventId
iEventId Invocation 'EventType
invo
    invoStatus :: Maybe Int
invoStatus = Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int) -> Maybe Int -> Maybe Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Invocation 'EventType -> Maybe Int
forall (a :: TriggerTypes). Invocation a -> Maybe Int
iStatus Invocation 'EventType
invo :: Maybe Int
    invoRequest :: ByteString
invoRequest = Value -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode (Value -> ByteString) -> Value -> ByteString
forall a b. (a -> b) -> a -> b
$ WebhookRequest -> Value
forall a. ToJSON a => a -> Value
J.toJSON (WebhookRequest -> Value) -> WebhookRequest -> Value
forall a b. (a -> b) -> a -> b
$ Invocation 'EventType -> WebhookRequest
forall (a :: TriggerTypes). Invocation a -> WebhookRequest
iRequest Invocation 'EventType
invo
    invoResponse :: ByteString
invoResponse = Value -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode (Value -> ByteString) -> Value -> ByteString
forall a b. (a -> b) -> a -> b
$ Response 'EventType -> Value
forall a. ToJSON a => a -> Value
J.toJSON (Response 'EventType -> Value) -> Response 'EventType -> Value
forall a b. (a -> b) -> a -> b
$ Invocation 'EventType -> Response 'EventType
forall (a :: TriggerTypes). Invocation a -> Response a
iResponse Invocation 'EventType
invo
    invoTriggerName :: Text
invoTriggerName = TriggerName -> Text
triggerNameToTxt TriggerName
tName

insertMSSQLManualEventTx ::
  TableName ->
  TriggerName ->
  J.Value ->
  TxE QErr EventId
insertMSSQLManualEventTx :: TableName -> TriggerName -> Value -> TxET QErr IO EventId
insertMSSQLManualEventTx (TableName Text
tableName (SchemaName Text
schemaName)) TriggerName
triggerName Value
rowData = do
  ByteString
eventId <-
    (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ByteString
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m a
singleRowQueryE
      MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
      [ODBC.sql|
          INSERT INTO hdb_catalog.event_log (schema_name, table_name, trigger_name, payload)
          OUTPUT CONVERT(varchar(MAX), inserted.id)
          VALUES
          ($schemaName, $tableName, $triggerNameTxt, $payload)
        |]
  EventId -> TxET QErr IO EventId
forall a. a -> TxET QErr IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Text -> EventId
EventId (ByteString -> Text
bsToTxt ByteString
eventId))
  where
    triggerNameTxt :: Text
triggerNameTxt = TriggerName -> Text
triggerNameToTxt TriggerName
triggerName
    payload :: ByteString
payload = Value -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode Value
rowData

setSuccessTx :: Event 'MSSQL -> MaintenanceMode MaintenanceModeVersion -> TxE QErr ()
setSuccessTx :: Event 'MSSQL
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
setSuccessTx Event 'MSSQL
event = \case
  (MaintenanceModeEnabled MaintenanceModeVersion
PreviousMMVersion) -> Text -> TxET QErr IO ()
forall (m :: * -> *) a. QErrM m => Text -> m a
throw500 Text
"unexpected: no previous maintenance mode version found for MSSQL source"
  (MaintenanceModeEnabled MaintenanceModeVersion
CurrentMMVersion) -> TxET QErr IO ()
latestVersionSetSuccess
  MaintenanceMode MaintenanceModeVersion
MaintenanceModeDisabled -> TxET QErr IO ()
latestVersionSetSuccess
  where
    eventId :: Text
eventId = EventId -> Text
unEventId (EventId -> Text) -> EventId -> Text
forall a b. (a -> b) -> a -> b
$ Event 'MSSQL -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event 'MSSQL
event

    latestVersionSetSuccess :: TxET QErr IO ()
latestVersionSetSuccess =
      (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE
        MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
        [ODBC.sql|
          UPDATE hdb_catalog.event_log
          SET delivered = 1 , next_retry_at = NULL, locked = NULL
          WHERE id = $eventId
        |]

setErrorTx :: Event 'MSSQL -> MaintenanceMode MaintenanceModeVersion -> TxE QErr ()
setErrorTx :: Event 'MSSQL
-> MaintenanceMode MaintenanceModeVersion -> TxET QErr IO ()
setErrorTx Event 'MSSQL
event = \case
  (MaintenanceModeEnabled MaintenanceModeVersion
PreviousMMVersion) -> Text -> TxET QErr IO ()
forall (m :: * -> *) a. QErrM m => Text -> m a
throw500 Text
"unexpected: there is no previous maintenance mode version supported for MSSQL event triggers"
  (MaintenanceModeEnabled MaintenanceModeVersion
CurrentMMVersion) -> TxET QErr IO ()
latestVersionSetSuccess
  MaintenanceMode MaintenanceModeVersion
MaintenanceModeDisabled -> TxET QErr IO ()
latestVersionSetSuccess
  where
    eventId :: Text
eventId = EventId -> Text
unEventId (EventId -> Text) -> EventId -> Text
forall a b. (a -> b) -> a -> b
$ Event 'MSSQL -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event 'MSSQL
event

    latestVersionSetSuccess :: TxET QErr IO ()
latestVersionSetSuccess =
      (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE
        MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
        [ODBC.sql|
          UPDATE hdb_catalog.event_log
          SET error = 1 , next_retry_at = NULL, locked = NULL
          WHERE id = $eventId
        |]

setRetryTx :: Event 'MSSQL -> UTCTime -> MaintenanceMode MaintenanceModeVersion -> TxE QErr ()
setRetryTx :: Event 'MSSQL
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> TxET QErr IO ()
setRetryTx Event 'MSSQL
event UTCTime
utcTime MaintenanceMode MaintenanceModeVersion
maintenanceMode = do
  -- since `convertUTCToDatetime2` uses utc as timezone, it will not affect the value
  Datetime2
time <- UTCTime -> TxET QErr IO Datetime2
forall (m :: * -> *). MonadIO m => UTCTime -> m Datetime2
convertUTCToDatetime2 UTCTime
utcTime
  case MaintenanceMode MaintenanceModeVersion
maintenanceMode of
    (MaintenanceModeEnabled MaintenanceModeVersion
PreviousMMVersion) -> Text -> TxET QErr IO ()
forall (m :: * -> *) a. QErrM m => Text -> m a
throw500 Text
"unexpected: there is no previous maintenance mode version supported for MSSQL event triggers"
    (MaintenanceModeEnabled MaintenanceModeVersion
CurrentMMVersion) -> Datetime2 -> TxET QErr IO ()
latestVersionSetRetry Datetime2
time
    MaintenanceMode MaintenanceModeVersion
MaintenanceModeDisabled -> Datetime2 -> TxET QErr IO ()
latestVersionSetRetry Datetime2
time
  where
    eventId :: Text
eventId = EventId -> Text
unEventId (EventId -> Text) -> EventId -> Text
forall a b. (a -> b) -> a -> b
$ Event 'MSSQL -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event 'MSSQL
event
    -- NOTE: Naveen: The following method to convert from Datetime to Datetimeoffset  was
    -- taken from https://stackoverflow.com/questions/17866311/how-to-cast-datetime-to-datetimeoffset
    latestVersionSetRetry :: Datetime2 -> TxET QErr IO ()
latestVersionSetRetry Datetime2
time =
      -- `time` is in UTC (without the timezone offset). The function TODATETIMEOFFSET adds the offset 00:00 (UTC) to
      -- `time`, which collectively represents the value present in next_retry_at
      (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE
        MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
        [ODBC.sql|
          UPDATE hdb_catalog.event_log
          SET next_retry_at = TODATETIMEOFFSET ($time, 0), locked = NULL
          WHERE id = $eventId
        |]

-- | Lock and return events not yet being processed or completed, up to some
-- limit. Process events approximately in created_at order, but we make no
-- ordering guarentees; events can and will race. Nevertheless we want to
-- ensure newer change events don't starve older ones.
fetchEvents :: SourceName -> [TriggerName] -> FetchBatchSize -> TxE QErr [Event 'MSSQL]
fetchEvents :: SourceName
-> [TriggerName] -> FetchBatchSize -> TxET QErr IO [Event 'MSSQL]
fetchEvents SourceName
source [TriggerName]
triggerNames (FetchBatchSize Int
fetchBatchSize) = do
  -- The reason we do not inline the SQL but rather  create a template string is due
  -- to the problem with `ODBC.sql` variable substitution. When you use a variable
  -- whose value is a text and it contains a single quote `'`, the `ODBC.sql`
  -- function escapes that single quote. i.e it converts `'` to `''`
  --
  -- Note: A single quote in MSSQL is escaped by doubling it up (`''`)
  --
  -- This is problematic, since we use a list of list of trigger names to fetch and
  -- lock the events. A list of trigger names in MSSQL looks like Eg:
  -- ('insert_test_books', 'et_test_bigint')
  --
  -- We use this list of trigger names in the `IN` operator to fetch only those
  -- events.
  --
  -- If we were to use the `ODBC.sql` function it would convert the list into
  -- something which is not a valid list.
  -- Eg: ('insert_test_books', 'et_test_bigint') -> (''insert_test_books'', ''et_test_bigint'')
  --
  -- Due to the problematic variable substitution of `ODBC.sql` it is imperative that
  -- we resort to template strings, since that does not do any changes to the string.
  [(ByteString, Text, Text, Text, Text, Int, ByteString,
  Maybe ByteString)]
events <-
    (MSSQLTxError -> QErr)
-> Query
-> TxET
     QErr
     IO
     [(ByteString, Text, Text, Text, Text, Int, ByteString,
       Maybe ByteString)]
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m [a]
multiRowQueryE MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
      (Query
 -> TxET
      QErr
      IO
      [(ByteString, Text, Text, Text, Text, Int, ByteString,
        Maybe ByteString)])
-> Query
-> TxET
     QErr
     IO
     [(ByteString, Text, Text, Text, Text, Int, ByteString,
       Maybe ByteString)]
forall a b. (a -> b) -> a -> b
$ Text -> Query
rawUnescapedText
      (Text -> Query) -> (Text -> Text) -> Text -> Query
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text
LT.toStrict
      (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ $(makeRelativeToProject "src-rsr/mssql/mssql_fetch_events.sql.shakespeare" >>= ST.stextFile)
  ((ByteString, Text, Text, Text, Text, Int, ByteString,
  Maybe ByteString)
 -> TxET QErr IO (Event 'MSSQL))
-> [(ByteString, Text, Text, Text, Text, Int, ByteString,
     Maybe ByteString)]
-> TxET QErr IO [Event 'MSSQL]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (ByteString, Text, Text, Text, Text, Int, ByteString,
 Maybe ByteString)
-> TxET QErr IO (Event 'MSSQL)
uncurryEvent [(ByteString, Text, Text, Text, Text, Int, ByteString,
  Maybe ByteString)]
events
  where
    -- Creates a list of trigger names to be used for 'IN' operator
    -- Eg: ('insert_test_books', 'et_test_bigint')
    --
    -- We cannot use 'commaseperated()' because it creates the Text as
    -- 'insert_test_books, et_test_bigint' which is not useful to compare values in
    -- 'IN' MSSQL operator.
    triggerNamesTxt :: Text
triggerNamesTxt = Text
"(" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> [Text] -> Text
forall t (f :: * -> *). (ToTxt t, Foldable f) => f t -> Text
commaSeparated ((TriggerName -> Text) -> [TriggerName] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (\TriggerName
t -> Text
"'" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TriggerName -> Text
forall a. ToTxt a => a -> Text
toTxt TriggerName
t Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"'") [TriggerName]
triggerNames) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
")"

    uncurryEvent :: (ByteString, Text, Text, Text, Text, Int, ByteString,
 Maybe ByteString)
-> TxET QErr IO (Event 'MSSQL)
uncurryEvent (ByteString
eventId, Text
sn, Text
tn, Text
trn, Text
payload :: Text, Int
tries, ByteString
createdAt :: B.ByteString, Maybe ByteString
nextRetryAt :: Maybe B.ByteString) = do
      -- see Note [Encode Event Trigger Payload to JSON in SQL Server]
      Value
payload' <- Text -> String -> TxET QErr IO Value
forall a (m :: * -> *).
(FromJSON a, QErrM m) =>
Text -> String -> m a
encodeJSON Text
payload String
"payload decode failed while fetching MSSQL events"
      UTCTime
createdAt' <- ByteString -> String -> TxET QErr IO UTCTime
forall (m :: * -> *).
MonadError QErr m =>
ByteString -> String -> m UTCTime
bsToUTCTime ByteString
createdAt String
"conversion of created_at to UTCTime failed while fetching MSSQL events"
      Maybe UTCTime
retryAt <- (ByteString -> TxET QErr IO UTCTime)
-> Maybe ByteString -> TxET QErr IO (Maybe UTCTime)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse (ByteString -> String -> TxET QErr IO UTCTime
forall (m :: * -> *).
MonadError QErr m =>
ByteString -> String -> m UTCTime
`bsToUTCTime` String
"conversion of next_retry_at to UTCTime failed while fetching MSSQL events") Maybe ByteString
nextRetryAt

      Event 'MSSQL -> TxET QErr IO (Event 'MSSQL)
forall a. a -> TxET QErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
        (Event 'MSSQL -> TxET QErr IO (Event 'MSSQL))
-> Event 'MSSQL -> TxET QErr IO (Event 'MSSQL)
forall a b. (a -> b) -> a -> b
$ Event
          { eId :: EventId
eId = Text -> EventId
EventId (ByteString -> Text
bsToTxt ByteString
eventId),
            eSource :: SourceName
eSource = SourceName
source,
            eTable :: TableName 'MSSQL
eTable = (Text -> SchemaName -> TableName
TableName Text
tn (Text -> SchemaName
SchemaName Text
sn)),
            eTrigger :: TriggerMetadata
eTrigger = TriggerName -> TriggerMetadata
TriggerMetadata (NonEmptyText -> TriggerName
TriggerName (NonEmptyText -> TriggerName) -> NonEmptyText -> TriggerName
forall a b. (a -> b) -> a -> b
$ Text -> NonEmptyText
mkNonEmptyTextUnsafe Text
trn),
            eEvent :: Value
eEvent = Value
payload',
            eTries :: Int
eTries = Int
tries,
            eCreatedAt :: LocalTime
eCreatedAt = TimeZone -> UTCTime -> LocalTime
utcToLocalTime TimeZone
utc UTCTime
createdAt',
            eRetryAt :: Maybe UTCTime
eRetryAt = Maybe UTCTime
retryAt,
            eCreatedAtUTC :: UTCTime
eCreatedAtUTC = UTCTime
createdAt',
            eRetryAtUTC :: Maybe UTCTime
eRetryAtUTC = Maybe UTCTime
retryAt
          }

dropTriggerQ :: TriggerName -> SchemaName -> TxE QErr ()
dropTriggerQ :: TriggerName -> SchemaName -> TxET QErr IO ()
dropTriggerQ TriggerName
triggerName SchemaName
schemaName =
  (Ops -> TxET QErr IO ()) -> [Ops] -> TxET QErr IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (TriggerName -> SchemaName -> Ops -> TxET QErr IO ()
dropTriggerOp TriggerName
triggerName SchemaName
schemaName) [Ops
INSERT, Ops
UPDATE, Ops
DELETE]

dropTriggerOp :: TriggerName -> SchemaName -> Ops -> TxE QErr ()
dropTriggerOp :: TriggerName -> SchemaName -> Ops -> TxET QErr IO ()
dropTriggerOp TriggerName
triggerName SchemaName
schemaName Ops
triggerOp =
  (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE
    MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
    (Text -> Query
rawUnescapedText (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ Ops -> Text
getDropTriggerSQL Ops
triggerOp)
  where
    getDropTriggerSQL :: Ops -> Text
    getDropTriggerSQL :: Ops -> Text
getDropTriggerSQL Ops
op =
      Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> QualifiedTriggerName -> Text
qualifiedTriggerNameToText (SchemaName -> SQLTriggerName -> QualifiedTriggerName
QualifiedTriggerName SchemaName
schemaName (TriggerName -> Ops -> SQLTriggerName
mkSQLTriggerName TriggerName
triggerName Ops
op))

archiveEvents :: TriggerName -> TxE QErr ()
archiveEvents :: TriggerName -> TxET QErr IO ()
archiveEvents TriggerName
triggerName =
  (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE
    MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
    [ODBC.sql|
      UPDATE hdb_catalog.event_log
      SET archived = 1
      WHERE trigger_name = $triggerNameTxt
    |]
  where
    triggerNameTxt :: Text
triggerNameTxt = TriggerName -> Text
triggerNameToTxt TriggerName
triggerName

checkEventTx :: EventId -> TxE QErr ()
checkEventTx :: EventId -> TxET QErr IO ()
checkEventTx EventId
eventId = do
  -- If an event got locked within the last 30 minutes then it means that the event
  -- got picked up by during the last fetch-event poll and is being processed. Hence
  -- we do not allow the redelivery of such an event.
  ([Bool]
events :: [Bool]) <-
    (MSSQLTxError -> QErr) -> Query -> TxET QErr IO [Bool]
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m [a]
multiRowQueryE
      MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
      [ODBC.sql|
        SELECT
          CAST(CASE
                  WHEN (l.locked IS NOT NULL AND l.locked >= DATEADD(MINUTE, -30, SYSDATETIMEOFFSET())) THEN 1 ELSE 0
              END
          AS bit)
        FROM hdb_catalog.event_log l
        WHERE l.id = $eId
      |]

  Bool
event <- [Bool] -> TxE QErr Bool
forall {m :: * -> *} {a}. MonadError QErr m => [a] -> m a
getEvent [Bool]
events
  Bool -> TxET QErr IO ()
forall {f :: * -> *}. MonadError QErr f => Bool -> f ()
assertEventUnlocked Bool
event
  where
    eId :: Text
eId = EventId -> Text
unEventId (EventId -> Text) -> EventId -> Text
forall a b. (a -> b) -> a -> b
$ EventId
eventId

    getEvent :: [a] -> m a
getEvent [] = Code -> Text -> m a
forall (m :: * -> *) a. QErrM m => Code -> Text -> m a
throw400 Code
NotExists Text
"event not found"
    getEvent (a
x : [a]
_) = a -> m a
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x

    assertEventUnlocked :: Bool -> f ()
assertEventUnlocked Bool
locked =
      Bool -> f () -> f ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
locked
        (f () -> f ()) -> f () -> f ()
forall a b. (a -> b) -> a -> b
$ Code -> Text -> f ()
forall (m :: * -> *) a. QErrM m => Code -> Text -> m a
throw400 Code
Busy Text
"event is already being processed"

markForDeliveryTx :: EventId -> TxE QErr ()
markForDeliveryTx :: EventId -> TxET QErr IO ()
markForDeliveryTx EventId
eventId = do
  (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE
    MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
    [ODBC.sql|
      UPDATE hdb_catalog.event_log
      SET delivered = 0, error = 0, tries = 0
      WHERE id = $eId
    |]
  where
    eId :: Text
eId = EventId -> Text
unEventId (EventId -> Text) -> EventId -> Text
forall a b. (a -> b) -> a -> b
$ EventId
eventId

unlockEventsTx :: [EventId] -> TxE QErr Int
unlockEventsTx :: [EventId] -> TxET QErr IO Int
unlockEventsTx [EventId]
eventIds = do
  Int
numEvents <-
    (MSSQLTxError -> QErr) -> Query -> TxET QErr IO Int
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m a
singleRowQueryE MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
      (Query -> TxET QErr IO Int) -> Query -> TxET QErr IO Int
forall a b. (a -> b) -> a -> b
$ Text -> Query
rawUnescapedText
      (Text -> Query) -> (Text -> Text) -> Text -> Query
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text
LT.toStrict
      (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$
      -- EventIds as list of VALUES (Eg: ('123-abc'), ('456-vgh'), ('234-asd'))
      let eventIdsValues :: Text
eventIdsValues = [EventId] -> Text
generateValuesFromEvents [EventId]
eventIds
       in $(makeRelativeToProject "src-rsr/mssql/mssql_unlock_events.sql.shakespeare" >>= ST.stextFile)
  Int -> TxET QErr IO Int
forall a. a -> TxET QErr IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Int
numEvents
  where
    generateValuesFromEvents :: [EventId] -> Text
    -- creates a list of event id's  (('123-abc'), ('456-vgh'), ('234-asd'))
    generateValuesFromEvents :: [EventId] -> Text
generateValuesFromEvents [EventId]
events = [Text] -> Text
forall t (f :: * -> *). (ToTxt t, Foldable f) => f t -> Text
commaSeparated [Text]
values
      where
        values :: [Text]
values = (EventId -> Text) -> [EventId] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (\EventId
e -> Text
"(" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> EventId -> Text
forall a. ToTxt a => a -> Text
toTxt EventId
e Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
")") [EventId]
events

getMaintenanceModeVersionTx :: TxE QErr MaintenanceModeVersion
getMaintenanceModeVersionTx :: TxET QErr IO MaintenanceModeVersion
getMaintenanceModeVersionTx = do
  SourceCatalogVersion
catalogVersion <- TxET QErr IO SourceCatalogVersion
forall (m :: * -> *). MonadMSSQLTx m => m SourceCatalogVersion
getSourceCatalogVersion
  if
    | SourceCatalogVersion
catalogVersion SourceCatalogVersion -> SourceCatalogVersion -> Bool
forall a. Eq a => a -> a -> Bool
== SourceCatalogVersion
latestSourceCatalogVersion -> MaintenanceModeVersion -> TxET QErr IO MaintenanceModeVersion
forall a. a -> TxET QErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure MaintenanceModeVersion
CurrentMMVersion
    | Bool
otherwise ->
        Text -> TxET QErr IO MaintenanceModeVersion
forall (m :: * -> *) a. QErrM m => Text -> m a
throw500
          (Text -> TxET QErr IO MaintenanceModeVersion)
-> Text -> TxET QErr IO MaintenanceModeVersion
forall a b. (a -> b) -> a -> b
$ Text
"Maintenance mode is only supported with catalog versions: "
          Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> SourceCatalogVersion -> Text
forall a. Show a => a -> Text
tshow SourceCatalogVersion
latestSourceCatalogVersion
          Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" but received "
          Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> SourceCatalogVersion -> Text
forall a. Show a => a -> Text
tshow SourceCatalogVersion
catalogVersion

convertUTCToDatetime2 :: (MonadIO m) => UTCTime -> m Datetime2
convertUTCToDatetime2 :: forall (m :: * -> *). MonadIO m => UTCTime -> m Datetime2
convertUTCToDatetime2 UTCTime
utcTime = do
  let localTime :: LocalTime
localTime = TimeZone -> UTCTime -> LocalTime
utcToLocalTime TimeZone
utc UTCTime
utcTime
  Datetime2 -> m Datetime2
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Datetime2 -> m Datetime2) -> Datetime2 -> m Datetime2
forall a b. (a -> b) -> a -> b
$ LocalTime -> Datetime2
Datetime2 LocalTime
localTime

checkIfTriggerExistsQ ::
  TriggerName ->
  Ops ->
  TxE QErr Bool
checkIfTriggerExistsQ :: TriggerName -> Ops -> TxE QErr Bool
checkIfTriggerExistsQ TriggerName
triggerName Ops
op = do
  let triggerNameWithOp :: Text
triggerNameWithOp = Text
"notify_hasura_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TriggerName -> Text
triggerNameToTxt TriggerName
triggerName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Ops -> Text
forall a. Show a => a -> Text
tshow Ops
op
  TxE QErr Bool -> TxE QErr Bool
forall a. TxE QErr a -> TxE QErr a
forall (m :: * -> *) a. MonadMSSQLTx m => TxE QErr a -> m a
liftMSSQLTx
    (TxE QErr Bool -> TxE QErr Bool) -> TxE QErr Bool -> TxE QErr Bool
forall a b. (a -> b) -> a -> b
$ (MSSQLTxError -> QErr) -> Query -> TxE QErr Bool
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m a
singleRowQueryE
      MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
      -- We check the existence of trigger across the entire database irrespective of
      -- the schema of the table
      [ODBC.sql|
          SELECT CASE WHEN EXISTS
            ( SELECT 1
              FROM sys.triggers WHERE name = $triggerNameWithOp
            )
          THEN CAST(1 AS BIT)
          ELSE CAST(0 AS BIT)
          END;
        |]

---- MSSQL event trigger utility functions -----------------

-- | This will quote the object name (similar to the @QUOTENAME@ function in SQL
-- server), i.e.
--
-- >>> mssqlFmtIdentifier "object_name" "[object_name]"
--
-- >>> mssqlFmtIdentifier "o]bject_nam[e" "[o]]bject_nam[e]"
--
-- TODO: Use some external tool for quoting, we should not quote the names by
-- ourselves.
mssqlFmtIdentifier :: Text -> Text
mssqlFmtIdentifier :: Text -> Text
mssqlFmtIdentifier Text
x =
  Text
"[" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> HasCallStack => Text -> Text -> Text -> Text
Text -> Text -> Text -> Text
T.replace Text
"]" Text
"]]" Text
x Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"]"

-- | A Representation of SQL Trigger name for an event trigger in MSSQL.
newtype SQLTriggerName = SQLTriggerName {SQLTriggerName -> Text
getSQLTriggerName :: Text}

instance ToSQL SQLTriggerName where
  toSQL :: SQLTriggerName -> Builder
toSQL = Text -> Builder
TB.text (Text -> Builder)
-> (SQLTriggerName -> Text) -> SQLTriggerName -> Builder
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text
mssqlFmtIdentifier (Text -> Text)
-> (SQLTriggerName -> Text) -> SQLTriggerName -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SQLTriggerName -> Text
getSQLTriggerName

mkSQLTriggerName :: TriggerName -> Ops -> SQLTriggerName
mkSQLTriggerName :: TriggerName -> Ops -> SQLTriggerName
mkSQLTriggerName TriggerName
triggerName Ops
op = Text -> SQLTriggerName
SQLTriggerName (Text -> SQLTriggerName) -> Text -> SQLTriggerName
forall a b. (a -> b) -> a -> b
$ Text
"notify_hasura_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (TriggerName -> Text
triggerNameToTxt TriggerName
triggerName) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Ops -> Text
forall a. Show a => a -> Text
tshow Ops
op

-- | A Representation of qualified SQL trigger object (`schema_name.SQL_trigger_name`).
data QualifiedTriggerName = QualifiedTriggerName
  { QualifiedTriggerName -> SchemaName
_qtnSchemaName :: SchemaName,
    QualifiedTriggerName -> SQLTriggerName
_qtnTriggerName :: SQLTriggerName
  }

instance ToSQL QualifiedTriggerName where
  toSQL :: QualifiedTriggerName -> Builder
toSQL (QualifiedTriggerName (SchemaName Text
schemaName) SQLTriggerName
triggerName) =
    Text -> Builder
TB.text (Text -> Text
mssqlFmtIdentifier Text
schemaName) Builder -> Builder -> Builder
forall a. Semigroup a => a -> a -> a
<> Builder
"." Builder -> Builder -> Builder
forall a. Semigroup a => a -> a -> a
<> SQLTriggerName -> Builder
forall a. ToSQL a => a -> Builder
toSQL SQLTriggerName
triggerName

qualifiedTriggerNameToText :: QualifiedTriggerName -> Text
qualifiedTriggerNameToText :: QualifiedTriggerName -> Text
qualifiedTriggerNameToText = Builder -> Text
TB.run (Builder -> Text)
-> (QualifiedTriggerName -> Builder)
-> QualifiedTriggerName
-> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. QualifiedTriggerName -> Builder
forall a. ToSQL a => a -> Builder
toSQL

-- | Store a fragment of SQL expression
newtype SQLFragment = SQLFragment {SQLFragment -> Text
unSQLFragment :: Text}

mkAllTriggersQ ::
  (MonadMSSQLTx m) =>
  TriggerName ->
  TableName ->
  TriggerOnReplication ->
  [ColumnInfo 'MSSQL] ->
  TriggerOpsDef 'MSSQL ->
  Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)) ->
  m ()
mkAllTriggersQ :: forall (m :: * -> *).
MonadMSSQLTx m =>
TriggerName
-> TableName
-> TriggerOnReplication
-> [ColumnInfo 'MSSQL]
-> TriggerOpsDef 'MSSQL
-> Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
-> m ()
mkAllTriggersQ TriggerName
triggerName TableName
tableName TriggerOnReplication
triggerOnReplication [ColumnInfo 'MSSQL]
allCols TriggerOpsDef 'MSSQL
fullSpec Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
primaryKey = do
  Maybe (SubscribeOpSpec 'MSSQL)
-> (SubscribeOpSpec 'MSSQL -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (TriggerOpsDef 'MSSQL -> Maybe (SubscribeOpSpec 'MSSQL)
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdInsert TriggerOpsDef 'MSSQL
fullSpec) (TriggerName
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> SubscribeOpSpec 'MSSQL
-> m ()
forall (m :: * -> *).
MonadMSSQLTx m =>
TriggerName
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> SubscribeOpSpec 'MSSQL
-> m ()
mkInsertTriggerQ TriggerName
triggerName TableName
tableName [ColumnInfo 'MSSQL]
allCols TriggerOnReplication
triggerOnReplication)
  Maybe (SubscribeOpSpec 'MSSQL)
-> (SubscribeOpSpec 'MSSQL -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (TriggerOpsDef 'MSSQL -> Maybe (SubscribeOpSpec 'MSSQL)
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdDelete TriggerOpsDef 'MSSQL
fullSpec) (TriggerName
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> SubscribeOpSpec 'MSSQL
-> m ()
forall (m :: * -> *).
MonadMSSQLTx m =>
TriggerName
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> SubscribeOpSpec 'MSSQL
-> m ()
mkDeleteTriggerQ TriggerName
triggerName TableName
tableName [ColumnInfo 'MSSQL]
allCols TriggerOnReplication
triggerOnReplication)
  Maybe (SubscribeOpSpec 'MSSQL)
-> (SubscribeOpSpec 'MSSQL -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (TriggerOpsDef 'MSSQL -> Maybe (SubscribeOpSpec 'MSSQL)
forall (b :: BackendType).
TriggerOpsDef b -> Maybe (SubscribeOpSpec b)
tdUpdate TriggerOpsDef 'MSSQL
fullSpec) (TriggerName
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
-> SubscribeOpSpec 'MSSQL
-> m ()
forall (m :: * -> *).
MonadMSSQLTx m =>
TriggerName
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
-> SubscribeOpSpec 'MSSQL
-> m ()
mkUpdateTriggerQ TriggerName
triggerName TableName
tableName [ColumnInfo 'MSSQL]
allCols TriggerOnReplication
triggerOnReplication Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
primaryKey)

getApplicableColumns :: [ColumnInfo 'MSSQL] -> SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL]
getApplicableColumns :: [ColumnInfo 'MSSQL]
-> SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL]
getApplicableColumns [ColumnInfo 'MSSQL]
allColumnInfos = \case
  SubscribeColumns 'MSSQL
SubCStar -> [ColumnInfo 'MSSQL]
allColumnInfos
  SubCArray [Column 'MSSQL]
cols -> [Column 'MSSQL] -> [ColumnInfo 'MSSQL] -> [ColumnInfo 'MSSQL]
forall (b :: BackendType).
Backend b =>
[Column b] -> [ColumnInfo b] -> [ColumnInfo b]
getColInfos [Column 'MSSQL]
cols [ColumnInfo 'MSSQL]
allColumnInfos

-- | Currently we do not support Event Triggers on columns of Spatial data types.
-- We do this because, currently the graphQL API for these types is broken
-- for MSSQL sources. Ref: https://github.com/hasura/graphql-engine-mono/issues/787
checkSpatialDataTypeColumns ::
  (MonadMSSQLTx m) =>
  [ColumnInfo 'MSSQL] ->
  SubscribeOpSpec 'MSSQL ->
  m ()
checkSpatialDataTypeColumns :: forall (m :: * -> *).
MonadMSSQLTx m =>
[ColumnInfo 'MSSQL] -> SubscribeOpSpec 'MSSQL -> m ()
checkSpatialDataTypeColumns [ColumnInfo 'MSSQL]
allCols (SubscribeOpSpec SubscribeColumns 'MSSQL
listenCols Maybe (SubscribeColumns 'MSSQL)
deliveryCols) = do
  let listenColumns :: [ColumnInfo 'MSSQL]
listenColumns = [ColumnInfo 'MSSQL]
-> SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL]
getApplicableColumns [ColumnInfo 'MSSQL]
allCols SubscribeColumns 'MSSQL
listenCols
      deliveryColumns :: [ColumnInfo 'MSSQL]
deliveryColumns = [ColumnInfo 'MSSQL]
-> SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL]
getApplicableColumns [ColumnInfo 'MSSQL]
allCols (SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL])
-> SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL]
forall a b. (a -> b) -> a -> b
$ SubscribeColumns 'MSSQL
-> Maybe (SubscribeColumns 'MSSQL) -> SubscribeColumns 'MSSQL
forall a. a -> Maybe a -> a
fromMaybe SubscribeColumns 'MSSQL
forall (b :: BackendType). SubscribeColumns b
SubCStar Maybe (SubscribeColumns 'MSSQL)
deliveryCols
      isGeoTypesInListenCols :: Bool
isGeoTypesInListenCols = (ColumnInfo 'MSSQL -> Bool) -> [ColumnInfo 'MSSQL] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any ((ScalarType 'MSSQL -> Bool) -> ColumnType 'MSSQL -> Bool
forall (b :: BackendType).
(ScalarType b -> Bool) -> ColumnType b -> Bool
isScalarColumnWhere ScalarType 'MSSQL -> Bool
ScalarType -> Bool
isGeoType (ColumnType 'MSSQL -> Bool)
-> (ColumnInfo 'MSSQL -> ColumnType 'MSSQL)
-> ColumnInfo 'MSSQL
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ColumnInfo 'MSSQL -> ColumnType 'MSSQL
forall (b :: BackendType). ColumnInfo b -> ColumnType b
ciType) [ColumnInfo 'MSSQL]
listenColumns
      isGeoTypesInDeliversCols :: Bool
isGeoTypesInDeliversCols = (ColumnInfo 'MSSQL -> Bool) -> [ColumnInfo 'MSSQL] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any ((ScalarType 'MSSQL -> Bool) -> ColumnType 'MSSQL -> Bool
forall (b :: BackendType).
(ScalarType b -> Bool) -> ColumnType b -> Bool
isScalarColumnWhere ScalarType 'MSSQL -> Bool
ScalarType -> Bool
isGeoType (ColumnType 'MSSQL -> Bool)
-> (ColumnInfo 'MSSQL -> ColumnType 'MSSQL)
-> ColumnInfo 'MSSQL
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ColumnInfo 'MSSQL -> ColumnType 'MSSQL
forall (b :: BackendType). ColumnInfo b -> ColumnType b
ciType) [ColumnInfo 'MSSQL]
deliveryColumns
  Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool
isGeoTypesInListenCols Bool -> Bool -> Bool
|| Bool
isGeoTypesInDeliversCols)
    (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Code -> Text -> m ()
forall (m :: * -> *) a. QErrM m => Code -> Text -> m a
throw400 Code
NotSupported Text
"Event triggers for MS-SQL sources are not supported on tables having Geometry or Geography column types"
  where
    isGeoType :: ScalarType -> Bool
isGeoType = (ScalarType -> [ScalarType] -> Bool
forall a. Eq a => a -> [a] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [ScalarType]
geoTypes)

mkInsertTriggerQ ::
  (MonadMSSQLTx m) =>
  TriggerName ->
  TableName ->
  [ColumnInfo 'MSSQL] ->
  TriggerOnReplication ->
  SubscribeOpSpec 'MSSQL ->
  m ()
mkInsertTriggerQ :: forall (m :: * -> *).
MonadMSSQLTx m =>
TriggerName
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> SubscribeOpSpec 'MSSQL
-> m ()
mkInsertTriggerQ TriggerName
triggerName TableName
table [ColumnInfo 'MSSQL]
allCols TriggerOnReplication
triggerOnReplication subOpSpec :: SubscribeOpSpec 'MSSQL
subOpSpec@(SubscribeOpSpec SubscribeColumns 'MSSQL
_listenCols Maybe (SubscribeColumns 'MSSQL)
deliveryCols) = do
  [ColumnInfo 'MSSQL] -> SubscribeOpSpec 'MSSQL -> m ()
forall (m :: * -> *).
MonadMSSQLTx m =>
[ColumnInfo 'MSSQL] -> SubscribeOpSpec 'MSSQL -> m ()
checkSpatialDataTypeColumns [ColumnInfo 'MSSQL]
allCols SubscribeOpSpec 'MSSQL
subOpSpec
  TxET QErr IO () -> m ()
forall a. TxE QErr a -> m a
forall (m :: * -> *) a. MonadMSSQLTx m => TxE QErr a -> m a
liftMSSQLTx (TxET QErr IO () -> m ()) -> TxET QErr IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
      (Query -> TxET QErr IO ()) -> Query -> TxET QErr IO ()
forall a b. (a -> b) -> a -> b
$ Text -> Query
rawUnescapedText
      (Text -> Query) -> (Text -> Text) -> Text -> Query
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text
LT.toStrict
      (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ do
        let deliveryColumns :: [ColumnInfo 'MSSQL]
deliveryColumns = [ColumnInfo 'MSSQL]
-> SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL]
getApplicableColumns [ColumnInfo 'MSSQL]
allCols (SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL])
-> SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL]
forall a b. (a -> b) -> a -> b
$ SubscribeColumns 'MSSQL
-> Maybe (SubscribeColumns 'MSSQL) -> SubscribeColumns 'MSSQL
forall a. a -> Maybe a -> a
fromMaybe SubscribeColumns 'MSSQL
forall (b :: BackendType). SubscribeColumns b
SubCStar Maybe (SubscribeColumns 'MSSQL)
deliveryCols
        TableName
-> TriggerName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> Text
mkInsertTriggerQuery TableName
table TriggerName
triggerName [ColumnInfo 'MSSQL]
deliveryColumns TriggerOnReplication
triggerOnReplication

mkDeleteTriggerQ ::
  (MonadMSSQLTx m) =>
  TriggerName ->
  TableName ->
  [ColumnInfo 'MSSQL] ->
  TriggerOnReplication ->
  SubscribeOpSpec 'MSSQL ->
  m ()
mkDeleteTriggerQ :: forall (m :: * -> *).
MonadMSSQLTx m =>
TriggerName
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> SubscribeOpSpec 'MSSQL
-> m ()
mkDeleteTriggerQ TriggerName
triggerName TableName
table [ColumnInfo 'MSSQL]
allCols TriggerOnReplication
triggerOnReplication subOpSpec :: SubscribeOpSpec 'MSSQL
subOpSpec@(SubscribeOpSpec SubscribeColumns 'MSSQL
_listenCols Maybe (SubscribeColumns 'MSSQL)
deliveryCols) = do
  [ColumnInfo 'MSSQL] -> SubscribeOpSpec 'MSSQL -> m ()
forall (m :: * -> *).
MonadMSSQLTx m =>
[ColumnInfo 'MSSQL] -> SubscribeOpSpec 'MSSQL -> m ()
checkSpatialDataTypeColumns [ColumnInfo 'MSSQL]
allCols SubscribeOpSpec 'MSSQL
subOpSpec
  TxET QErr IO () -> m ()
forall a. TxE QErr a -> m a
forall (m :: * -> *) a. MonadMSSQLTx m => TxE QErr a -> m a
liftMSSQLTx (TxET QErr IO () -> m ()) -> TxET QErr IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
      (Query -> TxET QErr IO ()) -> Query -> TxET QErr IO ()
forall a b. (a -> b) -> a -> b
$ Text -> Query
rawUnescapedText
      (Text -> Query) -> (Text -> Text) -> Text -> Query
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text
LT.toStrict
      (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ do
        let deliveryColumns :: [ColumnInfo 'MSSQL]
deliveryColumns = [ColumnInfo 'MSSQL]
-> SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL]
getApplicableColumns [ColumnInfo 'MSSQL]
allCols (SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL])
-> SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL]
forall a b. (a -> b) -> a -> b
$ SubscribeColumns 'MSSQL
-> Maybe (SubscribeColumns 'MSSQL) -> SubscribeColumns 'MSSQL
forall a. a -> Maybe a -> a
fromMaybe SubscribeColumns 'MSSQL
forall (b :: BackendType). SubscribeColumns b
SubCStar Maybe (SubscribeColumns 'MSSQL)
deliveryCols
        TableName
-> TriggerName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> Text
mkDeleteTriggerQuery TableName
table TriggerName
triggerName [ColumnInfo 'MSSQL]
deliveryColumns TriggerOnReplication
triggerOnReplication

mkUpdateTriggerQ ::
  (MonadMSSQLTx m) =>
  TriggerName ->
  TableName ->
  [ColumnInfo 'MSSQL] ->
  TriggerOnReplication ->
  Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)) ->
  SubscribeOpSpec 'MSSQL ->
  m ()
mkUpdateTriggerQ :: forall (m :: * -> *).
MonadMSSQLTx m =>
TriggerName
-> TableName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
-> SubscribeOpSpec 'MSSQL
-> m ()
mkUpdateTriggerQ TriggerName
triggerName TableName
table [ColumnInfo 'MSSQL]
allCols TriggerOnReplication
triggerOnReplication Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
primaryKeyMaybe subOpSpec :: SubscribeOpSpec 'MSSQL
subOpSpec@(SubscribeOpSpec SubscribeColumns 'MSSQL
listenCols Maybe (SubscribeColumns 'MSSQL)
deliveryCols) = do
  [ColumnInfo 'MSSQL] -> SubscribeOpSpec 'MSSQL -> m ()
forall (m :: * -> *).
MonadMSSQLTx m =>
[ColumnInfo 'MSSQL] -> SubscribeOpSpec 'MSSQL -> m ()
checkSpatialDataTypeColumns [ColumnInfo 'MSSQL]
allCols SubscribeOpSpec 'MSSQL
subOpSpec
  TxET QErr IO () -> m ()
forall a. TxE QErr a -> m a
forall (m :: * -> *) a. MonadMSSQLTx m => TxE QErr a -> m a
liftMSSQLTx (TxET QErr IO () -> m ()) -> TxET QErr IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)
primaryKey <- Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
-> TxET QErr IO (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
-> TxET QErr IO (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
forall (m :: * -> *) a. Applicative m => Maybe a -> m a -> m a
onNothing Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
primaryKeyMaybe (Code
-> Text -> TxET QErr IO (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))
forall (m :: * -> *) a. QErrM m => Code -> Text -> m a
throw400 Code
NotSupported Text
"Update event triggers for MS-SQL sources are only supported on tables with primary keys")
    let deliveryColumns :: [ColumnInfo 'MSSQL]
deliveryColumns = [ColumnInfo 'MSSQL]
-> SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL]
getApplicableColumns [ColumnInfo 'MSSQL]
allCols (SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL])
-> SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL]
forall a b. (a -> b) -> a -> b
$ SubscribeColumns 'MSSQL
-> Maybe (SubscribeColumns 'MSSQL) -> SubscribeColumns 'MSSQL
forall a. a -> Maybe a -> a
fromMaybe SubscribeColumns 'MSSQL
forall (b :: BackendType). SubscribeColumns b
SubCStar Maybe (SubscribeColumns 'MSSQL)
deliveryCols
        listenColumns :: [ColumnInfo 'MSSQL]
listenColumns = [ColumnInfo 'MSSQL]
-> SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL]
getApplicableColumns [ColumnInfo 'MSSQL]
allCols SubscribeColumns 'MSSQL
listenCols
    (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
      (Query -> TxET QErr IO ()) -> Query -> TxET QErr IO ()
forall a b. (a -> b) -> a -> b
$ Text -> Query
rawUnescapedText
      (Text -> Query) -> (Text -> Text) -> Text -> Query
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text
LT.toStrict
      (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ TableName
-> TriggerName
-> [ColumnInfo 'MSSQL]
-> [ColumnInfo 'MSSQL]
-> PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)
-> TriggerOnReplication
-> Text
mkUpdateTriggerQuery TableName
table TriggerName
triggerName [ColumnInfo 'MSSQL]
listenColumns [ColumnInfo 'MSSQL]
deliveryColumns PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)
primaryKey TriggerOnReplication
triggerOnReplication

-- Create alias for columns
-- eg: If colPrefixMaybe is defined then 'inserted.id as payload.data.old.id'
--     else 'id as payload.data.old.id'
-- We create such an alias for the columns of the table because it helps in
-- structuring the JSON payload.
generateColumnTriggerAlias :: OpVar -> Maybe Text -> ColumnInfo 'MSSQL -> SQLFragment
generateColumnTriggerAlias :: OpVar -> Maybe Text -> ColumnInfo 'MSSQL -> SQLFragment
generateColumnTriggerAlias OpVar
op Maybe Text
colPrefixMaybe ColumnInfo 'MSSQL
colInfo =
  let opText :: Text
opText =
        case OpVar
op of
          OpVar
OLD -> Text
"old"
          OpVar
NEW -> Text
"new"
      -- Let's say we have a column 'id', dbColNameText returns the column name as
      -- text. i.e 'id'
      dbColNameText :: Text
dbColNameText = ColumnName -> Text
columnNameText (ColumnName -> Text) -> ColumnName -> Text
forall a b. (a -> b) -> a -> b
$ ColumnInfo 'MSSQL -> Column 'MSSQL
forall (b :: BackendType). ColumnInfo b -> Column b
ciColumn ColumnInfo 'MSSQL
colInfo
      joinPrefixedDbColNameText :: Text
joinPrefixedDbColNameText =
        case Maybe Text
colPrefixMaybe of
          -- prefix with the joining table's name
          -- `id` -> `inserted.id` (prefix = 'inserted')
          Just Text
colPrefix -> Text
colPrefix Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
dbColNameText
          -- do not prefix anthing to the column name
          Maybe Text
Nothing -> Text
dbColNameText
      -- create the alias for the column
      -- `payload.data.old.id` (opText = old) (dbColNameText = id)
      dbColAlias :: Text
dbColAlias = Text
"payload.data" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
opText Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
dbColNameText
   in -- create the SQL alias using the `as` keyword
      -- If colPrefixMaybe existed then alias will be `inserted.id as payload.data.old.id`
      -- If no colPrefixMaybe was Nothing then alias will be 'id as payload.data.old.id`
      Text -> SQLFragment
SQLFragment (Text -> SQLFragment) -> Text -> SQLFragment
forall a b. (a -> b) -> a -> b
$ Text -> Text
LT.toStrict (Text -> Text) -> Text -> Text
forall a b. (a -> b) -> a -> b
$ [ST.stext| #{joinPrefixedDbColNameText} as [#{dbColAlias}]|]

-- Converts tables name to the format [SCHEMA].[TABLENAME]
-- eg: [dbo].[author], [hge].[books]
qualifyTableName :: TableName -> Text
qualifyTableName :: TableName -> Text
qualifyTableName = Query -> Text
forall a. ToTxt a => a -> Text
toTxt (Query -> Text) -> (TableName -> Query) -> TableName -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Printer -> Query
toQueryFlat (Printer -> Query) -> (TableName -> Printer) -> TableName -> Query
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableName -> Printer
fromTableName

mkInsertTriggerQuery :: TableName -> TriggerName -> [ColumnInfo 'MSSQL] -> TriggerOnReplication -> LT.Text
mkInsertTriggerQuery :: TableName
-> TriggerName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> Text
mkInsertTriggerQuery table :: TableName
table@(TableName Text
tableName schema :: SchemaName
schema@(SchemaName Text
schemaName)) TriggerName
triggerName [ColumnInfo 'MSSQL]
columns TriggerOnReplication
triggerOnReplication =
  let qualifiedTriggerName :: Text
qualifiedTriggerName = QualifiedTriggerName -> Text
qualifiedTriggerNameToText (QualifiedTriggerName -> Text) -> QualifiedTriggerName -> Text
forall a b. (a -> b) -> a -> b
$ SchemaName -> SQLTriggerName -> QualifiedTriggerName
QualifiedTriggerName SchemaName
schema (SQLTriggerName -> QualifiedTriggerName)
-> SQLTriggerName -> QualifiedTriggerName
forall a b. (a -> b) -> a -> b
$ TriggerName -> Ops -> SQLTriggerName
mkSQLTriggerName TriggerName
triggerName Ops
INSERT
      triggerNameText :: Text
triggerNameText = TriggerName -> Text
triggerNameToTxt TriggerName
triggerName
      qualifiedTableName :: Text
qualifiedTableName = TableName -> Text
qualifyTableName TableName
table
      operation :: Text
operation = Ops -> Text
forall a. Show a => a -> Text
tshow Ops
INSERT
      String
replicationClause :: String = if TriggerOnReplication
triggerOnReplication TriggerOnReplication -> TriggerOnReplication -> Bool
forall a. Eq a => a -> a -> Bool
/= TriggerOnReplication
TOREnableTrigger then String
"NOT FOR REPLICATION" else String
""
      Text
deliveryColsSQLExpression :: Text =
        [Text] -> Text
forall t (f :: * -> *). (ToTxt t, Foldable f) => f t -> Text
commaSeparated ([Text] -> Text) -> [Text] -> Text
forall a b. (a -> b) -> a -> b
$ (ColumnInfo 'MSSQL -> Text) -> [ColumnInfo 'MSSQL] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (SQLFragment -> Text
unSQLFragment (SQLFragment -> Text)
-> (ColumnInfo 'MSSQL -> SQLFragment) -> ColumnInfo 'MSSQL -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. OpVar -> Maybe Text -> ColumnInfo 'MSSQL -> SQLFragment
generateColumnTriggerAlias OpVar
NEW Maybe Text
forall a. Maybe a
Nothing) [ColumnInfo 'MSSQL]
columns
   in $(makeRelativeToProject "src-rsr/mssql/mssql_insert_trigger.sql.shakespeare" >>= ST.stextFile)

mkDeleteTriggerQuery :: TableName -> TriggerName -> [ColumnInfo 'MSSQL] -> TriggerOnReplication -> LT.Text
mkDeleteTriggerQuery :: TableName
-> TriggerName
-> [ColumnInfo 'MSSQL]
-> TriggerOnReplication
-> Text
mkDeleteTriggerQuery table :: TableName
table@(TableName Text
tableName schema :: SchemaName
schema@(SchemaName Text
schemaName)) TriggerName
triggerName [ColumnInfo 'MSSQL]
columns TriggerOnReplication
triggerOnReplication =
  let qualifiedTriggerName :: Text
qualifiedTriggerName = QualifiedTriggerName -> Text
qualifiedTriggerNameToText (QualifiedTriggerName -> Text) -> QualifiedTriggerName -> Text
forall a b. (a -> b) -> a -> b
$ SchemaName -> SQLTriggerName -> QualifiedTriggerName
QualifiedTriggerName SchemaName
schema (SQLTriggerName -> QualifiedTriggerName)
-> SQLTriggerName -> QualifiedTriggerName
forall a b. (a -> b) -> a -> b
$ TriggerName -> Ops -> SQLTriggerName
mkSQLTriggerName TriggerName
triggerName Ops
DELETE
      triggerNameText :: Text
triggerNameText = TriggerName -> Text
triggerNameToTxt TriggerName
triggerName
      qualifiedTableName :: Text
qualifiedTableName = TableName -> Text
qualifyTableName TableName
table
      operation :: Text
operation = Ops -> Text
forall a. Show a => a -> Text
tshow Ops
DELETE
      String
replicationClause :: String = if TriggerOnReplication
triggerOnReplication TriggerOnReplication -> TriggerOnReplication -> Bool
forall a. Eq a => a -> a -> Bool
/= TriggerOnReplication
TOREnableTrigger then String
"NOT FOR REPLICATION" else String
""
      Text
deliveryColsSQLExpression :: Text = [Text] -> Text
forall t (f :: * -> *). (ToTxt t, Foldable f) => f t -> Text
commaSeparated ([Text] -> Text) -> [Text] -> Text
forall a b. (a -> b) -> a -> b
$ (ColumnInfo 'MSSQL -> Text) -> [ColumnInfo 'MSSQL] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (SQLFragment -> Text
unSQLFragment (SQLFragment -> Text)
-> (ColumnInfo 'MSSQL -> SQLFragment) -> ColumnInfo 'MSSQL -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. OpVar -> Maybe Text -> ColumnInfo 'MSSQL -> SQLFragment
generateColumnTriggerAlias OpVar
OLD Maybe Text
forall a. Maybe a
Nothing) [ColumnInfo 'MSSQL]
columns
   in $(makeRelativeToProject "src-rsr/mssql/mssql_delete_trigger.sql.shakespeare" >>= ST.stextFile)

-- Creates Primary Join key expression for UPDATE SQL Trigger
-- eg: 'INSERTED.id = DELETED.id AND INSERTED.emp_code = DELETED.emp_code'
mkPrimaryKeyJoinExp :: Text -> Text -> [ColumnInfo 'MSSQL] -> SQLFragment
mkPrimaryKeyJoinExp :: Text -> Text -> [ColumnInfo 'MSSQL] -> SQLFragment
mkPrimaryKeyJoinExp Text
lhsPrefix Text
rhsPrefix [ColumnInfo 'MSSQL]
columns =
  Text -> SQLFragment
SQLFragment (Text -> SQLFragment) -> Text -> SQLFragment
forall a b. (a -> b) -> a -> b
$ Text -> [Text] -> Text
T.intercalate Text
" AND " ([Text] -> Text) -> [Text] -> Text
forall a b. (a -> b) -> a -> b
$ ColumnInfo 'MSSQL -> Text
singleColExp (ColumnInfo 'MSSQL -> Text) -> [ColumnInfo 'MSSQL] -> [Text]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [ColumnInfo 'MSSQL]
columns
  where
    singleColExp :: ColumnInfo 'MSSQL -> Text
singleColExp ColumnInfo 'MSSQL
colInfo =
      let dbColNameText :: Text
dbColNameText = ColumnName -> Text
columnNameText (ColumnName -> Text) -> ColumnName -> Text
forall a b. (a -> b) -> a -> b
$ ColumnInfo 'MSSQL -> Column 'MSSQL
forall (b :: BackendType). ColumnInfo b -> Column b
ciColumn ColumnInfo 'MSSQL
colInfo
       in Text -> Text
LT.toStrict (Text -> Text) -> Text -> Text
forall a b. (a -> b) -> a -> b
$ [ST.stext| #{lhsPrefix}.#{dbColNameText} = #{rhsPrefix}.#{dbColNameText} |]

-- Creates the WHERE clause for UPDATE SQL Trigger
-- eg: If no listenColumns are defined then the where clause is an empty text
--     else, 'WHERE INSERTED.id != DELETED.id OR INSERTED.name != DELTED.name'
mkListenColumnsExp :: Text -> Text -> [ColumnInfo 'MSSQL] -> SQLFragment
mkListenColumnsExp :: Text -> Text -> [ColumnInfo 'MSSQL] -> SQLFragment
mkListenColumnsExp Text
_ Text
_ [] = Text -> SQLFragment
SQLFragment Text
""
mkListenColumnsExp Text
lhsPrefix Text
rhsPrefix [ColumnInfo 'MSSQL]
columns =
  Text -> SQLFragment
SQLFragment (Text -> SQLFragment) -> Text -> SQLFragment
forall a b. (a -> b) -> a -> b
$ Text
"where " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> [Text] -> Text
T.intercalate Text
" OR " (ColumnInfo 'MSSQL -> Text
singleColExp (ColumnInfo 'MSSQL -> Text) -> [ColumnInfo 'MSSQL] -> [Text]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [ColumnInfo 'MSSQL]
columns)
  where
    singleColExp :: ColumnInfo 'MSSQL -> Text
singleColExp ColumnInfo 'MSSQL
colInfo =
      let dbColNameText :: Text
dbColNameText = ColumnName -> Text
columnNameText (ColumnName -> Text) -> ColumnName -> Text
forall a b. (a -> b) -> a -> b
$ ColumnInfo 'MSSQL -> Column 'MSSQL
forall (b :: BackendType). ColumnInfo b -> Column b
ciColumn ColumnInfo 'MSSQL
colInfo
       in Text -> Text
LT.toStrict (Text -> Text) -> Text -> Text
forall a b. (a -> b) -> a -> b
$ [ST.stext| #{lhsPrefix}.#{dbColNameText} != #{rhsPrefix}.#{dbColNameText} |]

-- | Check if primary key is present in listen columns
-- We use this in update event trigger, to check if the primary key has been updated
-- and construct the payload accordingly.
isPrimaryKeyInListenColumns :: [ColumnInfo 'MSSQL] -> PrimaryKey 'MSSQL (ColumnInfo 'MSSQL) -> SQLFragment
isPrimaryKeyInListenColumns :: [ColumnInfo 'MSSQL]
-> PrimaryKey 'MSSQL (ColumnInfo 'MSSQL) -> SQLFragment
isPrimaryKeyInListenColumns [ColumnInfo 'MSSQL]
listenCols PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)
primaryKey =
  case (NESeq (ColumnInfo 'MSSQL) -> [ColumnInfo 'MSSQL]
forall a. NESeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL) -> NESeq (ColumnInfo 'MSSQL)
forall (b :: BackendType) a. PrimaryKey b a -> NESeq a
_pkColumns PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)
primaryKey) [ColumnInfo 'MSSQL] -> [ColumnInfo 'MSSQL] -> [ColumnInfo 'MSSQL]
forall a. Eq a => [a] -> [a] -> [a]
`intersect` [ColumnInfo 'MSSQL]
listenCols) of
    -- Do not fire Event Trigger if primary key not in listen column
    [] -> Text -> SQLFragment
SQLFragment (Text -> SQLFragment) -> Text -> SQLFragment
forall a b. (a -> b) -> a -> b
$ Text
"1 != 1"
    -- Fire Event Trigger, since primary key is in listen column
    [ColumnInfo 'MSSQL]
_ -> Text -> SQLFragment
SQLFragment (Text -> SQLFragment) -> Text -> SQLFragment
forall a b. (a -> b) -> a -> b
$ Text
"1 = 1"

{- Note [Update Event Trigger MSSQL Spec]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
An MS-SQL trigger is different from a postgres trigger in some ways
  * MS-SQL doesn't support triggers which trigger for each row, so in case of
    mutations which affect multiple rows, there'll only be a single trigger
    fired which will contain the data of all the rows that were affected.
  * MS-SQL maintains two logical tables, namely, [`inserted` and `deleted`](https://docs.microsoft.com/en-us/sql/relational-databases/triggers/use-the-inserted-and-deleted-tables?view=sql-server-ver15).
    The rows in the `inserted` table are copies of the new rows in the trigger
    table and similarly the `deleted` table contains the copies of the rows
    that were deleted from the trigger table.
  * When there's an update transaction, the old data (before the update) will
    be copied to the `deleted` table and the new data will be copied to the
    `inserted` table.

Since we deliver the 'old' and 'new' data in the event trigger payload, we would need
a way to correlate the values from `inserted` and `deleted` tables. And this is why,
It is mandatory for a MSSQL Update trigger table to have a primary key. We use this
primary key to correlate between `inserted` and `deleted`

MSSQL UPDATE trigger's join clause depends on the fact that the primary key is never
updated. But when the primary key is updated, you cannot join the 'INSERTED' and
the 'DELETED' tables. Hence for those cases, we consider the old payload as NULL and
the new payload will contain the updated row changes.

To figure out if a primary key has been updated, we do the following:
For each row present in the INSERTED table, we check if there are any rows in DELETED
tabled that has the same primary key as that of the row in INSERTED table. If such a
row does not exists, then it means that the primary key has been updated. The sample
SQL which does this looks like:
  SELECT * FROM INSERTED
  WHERE NOT EXISTS (SELECT * FROM DELETED WHERE  INSERTED.id = DELETED.id )

The spec for MSSQL UPDATE Event Trigger is as follows:
1. UPDATE Event Trigger can only be created on tables with a primary key.
2. When Primary Key is not updated during a UPDATE transaction then both 'data.new'
   and 'data.old' fields in payload will be constructed.
3. When Primary Key is updated during a UPDATE transaction then there are two cases:
    a. If the updated Primary key is equal to one of the already present primary key in
       the table then, we construct both the 'data.old' and 'data.new'
    b. If the updated primary key is not equal to any of the already present primary key
       in the table then, 'data.old' is NULL and only 'data.new' is constructed.
-}
mkUpdateTriggerQuery :: TableName -> TriggerName -> [ColumnInfo 'MSSQL] -> [ColumnInfo 'MSSQL] -> PrimaryKey 'MSSQL (ColumnInfo 'MSSQL) -> TriggerOnReplication -> LT.Text
mkUpdateTriggerQuery :: TableName
-> TriggerName
-> [ColumnInfo 'MSSQL]
-> [ColumnInfo 'MSSQL]
-> PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)
-> TriggerOnReplication
-> Text
mkUpdateTriggerQuery
  table :: TableName
table@(TableName Text
tableName schema :: SchemaName
schema@(SchemaName Text
schemaName))
  TriggerName
triggerName
  [ColumnInfo 'MSSQL]
listenColumns
  [ColumnInfo 'MSSQL]
deliveryColumns
  PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)
primaryKey
  TriggerOnReplication
triggerOnReplication =
    let qualifiedTriggerName :: Text
qualifiedTriggerName = QualifiedTriggerName -> Text
qualifiedTriggerNameToText (QualifiedTriggerName -> Text) -> QualifiedTriggerName -> Text
forall a b. (a -> b) -> a -> b
$ SchemaName -> SQLTriggerName -> QualifiedTriggerName
QualifiedTriggerName SchemaName
schema (SQLTriggerName -> QualifiedTriggerName)
-> SQLTriggerName -> QualifiedTriggerName
forall a b. (a -> b) -> a -> b
$ TriggerName -> Ops -> SQLTriggerName
mkSQLTriggerName TriggerName
triggerName Ops
UPDATE
        triggerNameText :: Text
triggerNameText = TriggerName -> Text
triggerNameToTxt TriggerName
triggerName
        qualifiedTableName :: Text
qualifiedTableName = TableName -> Text
qualifyTableName TableName
table
        operation :: Text
operation = Ops -> Text
forall a. Show a => a -> Text
tshow Ops
UPDATE
        String
replicationClause :: String = if TriggerOnReplication
triggerOnReplication TriggerOnReplication -> TriggerOnReplication -> Bool
forall a. Eq a => a -> a -> Bool
/= TriggerOnReplication
TOREnableTrigger then String
"NOT FOR REPLICATION" else String
""

        Text
oldDeliveryColsSQLExp :: Text = [Text] -> Text
forall t (f :: * -> *). (ToTxt t, Foldable f) => f t -> Text
commaSeparated ([Text] -> Text) -> [Text] -> Text
forall a b. (a -> b) -> a -> b
$ (ColumnInfo 'MSSQL -> Text) -> [ColumnInfo 'MSSQL] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (SQLFragment -> Text
unSQLFragment (SQLFragment -> Text)
-> (ColumnInfo 'MSSQL -> SQLFragment) -> ColumnInfo 'MSSQL -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. OpVar -> Maybe Text -> ColumnInfo 'MSSQL -> SQLFragment
generateColumnTriggerAlias OpVar
OLD (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
"DELETED")) [ColumnInfo 'MSSQL]
deliveryColumns
        Text
newDeliveryColsSQLExp :: Text = [Text] -> Text
forall t (f :: * -> *). (ToTxt t, Foldable f) => f t -> Text
commaSeparated ([Text] -> Text) -> [Text] -> Text
forall a b. (a -> b) -> a -> b
$ (ColumnInfo 'MSSQL -> Text) -> [ColumnInfo 'MSSQL] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (SQLFragment -> Text
unSQLFragment (SQLFragment -> Text)
-> (ColumnInfo 'MSSQL -> SQLFragment) -> ColumnInfo 'MSSQL -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. OpVar -> Maybe Text -> ColumnInfo 'MSSQL -> SQLFragment
generateColumnTriggerAlias OpVar
NEW (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
"INSERTED")) [ColumnInfo 'MSSQL]
deliveryColumns

        -- When Primary key is updated, then 'data.old' would be NULL
        -- See Note [Update Event Trigger MSSQL Spec]
        Text
oldDeliveryColsSQLExpWhenPrimaryKeyUpdated :: Text =
          Text
"NULL as [payload.data.old]"
        Text
newDeliveryColsSQLExpWhenPrimaryKeyUpdated :: Text =
          [Text] -> Text
forall t (f :: * -> *). (ToTxt t, Foldable f) => f t -> Text
commaSeparated ([Text] -> Text) -> [Text] -> Text
forall a b. (a -> b) -> a -> b
$ (ColumnInfo 'MSSQL -> Text) -> [ColumnInfo 'MSSQL] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (SQLFragment -> Text
unSQLFragment (SQLFragment -> Text)
-> (ColumnInfo 'MSSQL -> SQLFragment) -> ColumnInfo 'MSSQL -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. OpVar -> Maybe Text -> ColumnInfo 'MSSQL -> SQLFragment
generateColumnTriggerAlias OpVar
NEW (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
"INSERTED")) [ColumnInfo 'MSSQL]
deliveryColumns

        primaryKeyJoinExp :: Text
primaryKeyJoinExp = SQLFragment -> Text
unSQLFragment (SQLFragment -> Text) -> SQLFragment -> Text
forall a b. (a -> b) -> a -> b
$ Text -> Text -> [ColumnInfo 'MSSQL] -> SQLFragment
mkPrimaryKeyJoinExp Text
"INSERTED" Text
"DELETED" (NESeq (ColumnInfo 'MSSQL) -> [ColumnInfo 'MSSQL]
forall a. NESeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL) -> NESeq (ColumnInfo 'MSSQL)
forall (b :: BackendType) a. PrimaryKey b a -> NESeq a
_pkColumns PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)
primaryKey))
        listenColumnExp :: Text
listenColumnExp = SQLFragment -> Text
unSQLFragment (SQLFragment -> Text) -> SQLFragment -> Text
forall a b. (a -> b) -> a -> b
$ Text -> Text -> [ColumnInfo 'MSSQL] -> SQLFragment
mkListenColumnsExp Text
"INSERTED" Text
"DELETED" [ColumnInfo 'MSSQL]
listenColumns
        isPrimaryKeyInListenColumnsExp :: Text
isPrimaryKeyInListenColumnsExp = SQLFragment -> Text
unSQLFragment (SQLFragment -> Text) -> SQLFragment -> Text
forall a b. (a -> b) -> a -> b
$ [ColumnInfo 'MSSQL]
-> PrimaryKey 'MSSQL (ColumnInfo 'MSSQL) -> SQLFragment
isPrimaryKeyInListenColumns [ColumnInfo 'MSSQL]
listenColumns PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)
primaryKey
     in $(makeRelativeToProject "src-rsr/mssql/mssql_update_trigger.sql.shakespeare" >>= ST.stextFile)

-- | Add cleanup logs for given trigger names and cleanup configs. This will perform the following steps:
--
--   1. Get last scheduled cleanup event and count.
--   2. If count is less than 5, then add add more cleanup logs, else do nothing
addCleanupSchedules ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  [(TriggerName, AutoTriggerLogCleanupConfig)] ->
  m ()
addCleanupSchedules :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig
-> [(TriggerName, AutoTriggerLogCleanupConfig)] -> m ()
addCleanupSchedules MSSQLSourceConfig
sourceConfig [(TriggerName, AutoTriggerLogCleanupConfig)]
triggersWithcleanupConfig =
  Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([(TriggerName, AutoTriggerLogCleanupConfig)] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(TriggerName, AutoTriggerLogCleanupConfig)]
triggersWithcleanupConfig) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    UTCTime
currTimeUTC <- IO UTCTime -> m UTCTime
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
    let currTime :: ZonedTime
currTime = TimeZone -> UTCTime -> ZonedTime
utcToZonedTime TimeZone
utc UTCTime
currTimeUTC
        triggerNames :: [TriggerName]
triggerNames = ((TriggerName, AutoTriggerLogCleanupConfig) -> TriggerName)
-> [(TriggerName, AutoTriggerLogCleanupConfig)] -> [TriggerName]
forall a b. (a -> b) -> [a] -> [b]
map (TriggerName, AutoTriggerLogCleanupConfig) -> TriggerName
forall a b. (a, b) -> a
fst [(TriggerName, AutoTriggerLogCleanupConfig)]
triggersWithcleanupConfig
    [(TriggerName, (Int, ZonedTime))]
allScheduledCleanupsInDB <- m (Either QErr [(TriggerName, (Int, ZonedTime))])
-> m [(TriggerName, (Int, ZonedTime))]
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr [(TriggerName, (Int, ZonedTime))])
 -> m [(TriggerName, (Int, ZonedTime))])
-> m (Either QErr [(TriggerName, (Int, ZonedTime))])
-> m [(TriggerName, (Int, ZonedTime))]
forall a b. (a -> b) -> a -> b
$ IO (Either QErr [(TriggerName, (Int, ZonedTime))])
-> m (Either QErr [(TriggerName, (Int, ZonedTime))])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr [(TriggerName, (Int, ZonedTime))])
 -> m (Either QErr [(TriggerName, (Int, ZonedTime))]))
-> IO (Either QErr [(TriggerName, (Int, ZonedTime))])
-> m (Either QErr [(TriggerName, (Int, ZonedTime))])
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig
-> TxET QErr IO [(TriggerName, (Int, ZonedTime))]
-> IO (Either QErr [(TriggerName, (Int, ZonedTime))])
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig (TxET QErr IO [(TriggerName, (Int, ZonedTime))]
 -> IO (Either QErr [(TriggerName, (Int, ZonedTime))]))
-> TxET QErr IO [(TriggerName, (Int, ZonedTime))]
-> IO (Either QErr [(TriggerName, (Int, ZonedTime))])
forall a b. (a -> b) -> a -> b
$ [TriggerName] -> TxET QErr IO [(TriggerName, (Int, ZonedTime))]
selectLastCleanupScheduledTimestamp [TriggerName]
triggerNames
    let triggerMap :: HashMap TriggerName (Int, ZonedTime)
triggerMap = [(TriggerName, (Int, ZonedTime))]
-> HashMap TriggerName (Int, ZonedTime)
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HashMap.fromList ([(TriggerName, (Int, ZonedTime))]
 -> HashMap TriggerName (Int, ZonedTime))
-> [(TriggerName, (Int, ZonedTime))]
-> HashMap TriggerName (Int, ZonedTime)
forall a b. (a -> b) -> a -> b
$ [(TriggerName, (Int, ZonedTime))]
allScheduledCleanupsInDB
        scheduledTriggersAndTimestamps :: [(TriggerName, [Datetimeoffset])]
scheduledTriggersAndTimestamps =
          ((TriggerName, AutoTriggerLogCleanupConfig)
 -> Maybe (TriggerName, [Datetimeoffset]))
-> [(TriggerName, AutoTriggerLogCleanupConfig)]
-> [(TriggerName, [Datetimeoffset])]
forall a b. (a -> Maybe b) -> [a] -> [b]
forall (f :: * -> *) a b.
Filterable f =>
(a -> Maybe b) -> f a -> f b
mapMaybe
            ( \(TriggerName
tName, AutoTriggerLogCleanupConfig
cConfig) ->
                let lastScheduledTime :: Maybe ZonedTime
lastScheduledTime = case TriggerName
-> HashMap TriggerName (Int, ZonedTime) -> Maybe (Int, ZonedTime)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup TriggerName
tName HashMap TriggerName (Int, ZonedTime)
triggerMap of
                      Maybe (Int, ZonedTime)
Nothing -> ZonedTime -> Maybe ZonedTime
forall a. a -> Maybe a
Just ZonedTime
currTime
                      Just (Int
count, ZonedTime
lastTime) -> if Int
count Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
5 then (ZonedTime -> Maybe ZonedTime
forall a. a -> Maybe a
Just ZonedTime
lastTime) else Maybe ZonedTime
forall a. Maybe a
Nothing
                 in (ZonedTime -> (TriggerName, [Datetimeoffset]))
-> Maybe ZonedTime -> Maybe (TriggerName, [Datetimeoffset])
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap
                      ( \ZonedTime
lastScheduledTimestamp ->
                          (TriggerName
tName, (UTCTime -> Datetimeoffset) -> [UTCTime] -> [Datetimeoffset]
forall a b. (a -> b) -> [a] -> [b]
map (ZonedTime -> Datetimeoffset
Datetimeoffset (ZonedTime -> Datetimeoffset)
-> (UTCTime -> ZonedTime) -> UTCTime -> Datetimeoffset
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TimeZone -> UTCTime -> ZonedTime
utcToZonedTime TimeZone
utc) ([UTCTime] -> [Datetimeoffset]) -> [UTCTime] -> [Datetimeoffset]
forall a b. (a -> b) -> a -> b
$ UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes (ZonedTime -> UTCTime
zonedTimeToUTC ZonedTime
lastScheduledTimestamp) Int
cleanupSchedulesToBeGenerated (AutoTriggerLogCleanupConfig -> CronSchedule
_atlccSchedule AutoTriggerLogCleanupConfig
cConfig))
                      )
                      Maybe ZonedTime
lastScheduledTime
            )
            [(TriggerName, AutoTriggerLogCleanupConfig)]
triggersWithcleanupConfig
    Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([(TriggerName, [Datetimeoffset])] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(TriggerName, [Datetimeoffset])]
scheduledTriggersAndTimestamps)
      (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM
      (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
      (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig
      (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ [(TriggerName, [Datetimeoffset])] -> TxET QErr IO ()
insertEventTriggerCleanupLogsTx [(TriggerName, [Datetimeoffset])]
scheduledTriggersAndTimestamps

-- | Insert the cleanup logs for the given trigger name and schedules
insertEventTriggerCleanupLogsTx :: [(TriggerName, [Datetimeoffset])] -> TxET QErr IO ()
insertEventTriggerCleanupLogsTx :: [(TriggerName, [Datetimeoffset])] -> TxET QErr IO ()
insertEventTriggerCleanupLogsTx [(TriggerName, [Datetimeoffset])]
triggerNameWithSchedules =
  (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE
    MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
    ( Text -> Query
rawUnescapedText
        [ST.st|
      INSERT INTO hdb_catalog.hdb_event_log_cleanups(trigger_name, scheduled_at, status)
      VALUES #{sqlValues};
      |]
    )
  where
    sqlValues :: Text
sqlValues =
      [Text] -> Text
forall t (f :: * -> *). (ToTxt t, Foldable f) => f t -> Text
commaSeparated
        ([Text] -> Text) -> [Text] -> Text
forall a b. (a -> b) -> a -> b
$ ((TriggerName, [Datetimeoffset]) -> Text)
-> [(TriggerName, [Datetimeoffset])] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map
          ( \(TriggerName
triggerName, [Datetimeoffset]
schedules) ->
              (Datetimeoffset -> Text) -> [Datetimeoffset] -> Text
forall a. (a -> Text) -> [a] -> Text
generateSQLValuesFromListWith
                ( \Datetimeoffset
schedule ->
                    Text
"'" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TriggerName -> Text
triggerNameToTxt TriggerName
triggerName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"', '" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (String -> Text
T.pack (String -> Text)
-> (Datetimeoffset -> String) -> Datetimeoffset -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ZonedTime -> String
forall t. ISO8601 t => t -> String
iso8601Show (ZonedTime -> String)
-> (Datetimeoffset -> ZonedTime) -> Datetimeoffset -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Datetimeoffset -> ZonedTime
unDatetimeoffset) Datetimeoffset
schedule Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"', 'scheduled'"
                )
                [Datetimeoffset]
schedules
          )
          [(TriggerName, [Datetimeoffset])]
triggerNameWithSchedules

-- | Get the last scheduled timestamp for a given event trigger name
selectLastCleanupScheduledTimestamp :: [TriggerName] -> TxET QErr IO [(TriggerName, (Int, ZonedTime))]
selectLastCleanupScheduledTimestamp :: [TriggerName] -> TxET QErr IO [(TriggerName, (Int, ZonedTime))]
selectLastCleanupScheduledTimestamp [TriggerName]
triggerNames =
  ((Text, Int, ZonedTime) -> (TriggerName, (Int, ZonedTime)))
-> [(Text, Int, ZonedTime)] -> [(TriggerName, (Int, ZonedTime))]
forall a b. (a -> b) -> [a] -> [b]
map
    ( \(Text
triggerName, Int
count, ZonedTime
lastScheduledTime) ->
        (NonEmptyText -> TriggerName
TriggerName (Text -> NonEmptyText
mkNonEmptyTextUnsafe Text
triggerName), (Int
count, ZonedTime
lastScheduledTime))
    )
    ([(Text, Int, ZonedTime)] -> [(TriggerName, (Int, ZonedTime))])
-> TxET QErr IO [(Text, Int, ZonedTime)]
-> TxET QErr IO [(TriggerName, (Int, ZonedTime))]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (MSSQLTxError -> QErr)
-> Query -> TxET QErr IO [(Text, Int, ZonedTime)]
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m [a]
multiRowQueryE
      MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
      ( Text -> Query
rawUnescapedText
          [ST.st|
          SELECT trigger_name, count(1), max(scheduled_at)
          FROM hdb_catalog.hdb_event_log_cleanups
          WHERE status='scheduled' AND trigger_name =
            ANY(SELECT n from  (VALUES #{triggerNamesValues}) AS X(n))
          GROUP BY trigger_name;
        |]
      )
  where
    triggerNamesValues :: Text
triggerNamesValues = [Text] -> Text
forall a. ToTxt a => [a] -> Text
generateSQLValuesFromList ([Text] -> Text) -> [Text] -> Text
forall a b. (a -> b) -> a -> b
$ (TriggerName -> Text) -> [TriggerName] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map TriggerName -> Text
triggerNameToTxt [TriggerName]
triggerNames

deleteAllScheduledCleanupsTx :: TriggerName -> TxE QErr ()
deleteAllScheduledCleanupsTx :: TriggerName -> TxET QErr IO ()
deleteAllScheduledCleanupsTx TriggerName
triggerName = do
  let triggerNameText :: Text
triggerNameText = TriggerName -> Text
triggerNameToTxt TriggerName
triggerName
  (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE
    MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
    [ODBC.sql|
      DELETE from hdb_catalog.hdb_event_log_cleanups
      WHERE status = 'scheduled' AND trigger_name = $triggerNameText
    |]

-- | @deleteAllScheduledCleanups@ deletes all scheduled cleanup logs for a given event trigger
deleteAllScheduledCleanups ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  TriggerName ->
  m ()
deleteAllScheduledCleanups :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig -> TriggerName -> m ()
deleteAllScheduledCleanups MSSQLSourceConfig
sourceConfig TriggerName
triggerName =
  m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ TriggerName -> TxET QErr IO ()
deleteAllScheduledCleanupsTx TriggerName
triggerName

getCleanupEventsForDeletionTx :: TxE QErr ([(Text, TriggerName)])
getCleanupEventsForDeletionTx :: TxE QErr [(Text, TriggerName)]
getCleanupEventsForDeletionTx = do
  [(Text, TriggerName)]
latestEvents :: [(Text, TriggerName)] <-
    ((Text, Text) -> (Text, TriggerName))
-> [(Text, Text)] -> [(Text, TriggerName)]
forall a b. (a -> b) -> [a] -> [b]
map ((Text -> TriggerName) -> (Text, Text) -> (Text, TriggerName)
forall b c d. (b -> c) -> (d, b) -> (d, c)
forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (d, b) (d, c)
second (NonEmptyText -> TriggerName
TriggerName (NonEmptyText -> TriggerName)
-> (Text -> NonEmptyText) -> Text -> TriggerName
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> NonEmptyText
mkNonEmptyTextUnsafe))
      ([(Text, Text)] -> [(Text, TriggerName)])
-> TxET QErr IO [(Text, Text)] -> TxE QErr [(Text, TriggerName)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (MSSQLTxError -> QErr) -> Query -> TxET QErr IO [(Text, Text)]
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m [a]
multiRowQueryE
        MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
        Query
[ODBC.sql|
          select CAST(id AS nvarchar(36)), trigger_name
          from(
              SELECT id, trigger_name, ROW_NUMBER()
              OVER(PARTITION BY trigger_name ORDER BY scheduled_at DESC) AS rn
              FROM hdb_catalog.hdb_event_log_cleanups
              WHERE status = 'scheduled' AND scheduled_at < SYSDATETIMEOFFSET() AT TIME ZONE 'UTC'
          ) AS a
          WHERE rn = 1
        |]
  let cleanupIDs :: [Text]
cleanupIDs = ((Text, TriggerName) -> Text) -> [(Text, TriggerName)] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (Text, TriggerName) -> Text
forall a b. (a, b) -> a
fst [(Text, TriggerName)]
latestEvents
      cleanupIDsSQLValue :: Text
cleanupIDsSQLValue = [Text] -> Text
forall a. ToTxt a => [a] -> Text
generateSQLValuesFromList [Text]
cleanupIDs
  Bool -> TxET QErr IO () -> TxET QErr IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([Text] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Text]
cleanupIDs) (TxET QErr IO () -> TxET QErr IO ())
-> TxET QErr IO () -> TxET QErr IO ()
forall a b. (a -> b) -> a -> b
$ do
    [Text]
toDeadEvents <-
      (MSSQLTxError -> QErr) -> Query -> TxET QErr IO [Text]
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m [a]
multiRowQueryE
        MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
        ( Text -> Query
rawUnescapedText
            [ST.st|
            SELECT CAST(id AS nvarchar(36)) FROM hdb_catalog.hdb_event_log_cleanups
            WHERE status = 'scheduled' AND scheduled_at < SYSDATETIMEOFFSET() AT TIME ZONE 'UTC' AND id NOT IN
              (SELECT n from  (VALUES #{cleanupIDsSQLValue}) AS X(n));
          |]
        )
    [Text] -> TxET QErr IO ()
markCleanupEventsAsDeadTx [Text]
toDeadEvents

  [(Text, TriggerName)] -> TxE QErr [(Text, TriggerName)]
forall a. a -> TxET QErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [(Text, TriggerName)]
latestEvents

-- | @getCleanupEventsForDeletion@ returns the cleanup logs that are to be deleted.
-- This will perform the following steps:
--
-- 1. Get the scheduled cleanup events that were scheduled before current time.
-- 2. If there are multiple entries for the same trigger name with different scheduled time,
--    then fetch the latest entry and mark others as dead.
getCleanupEventsForDeletion ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  m [(Text, TriggerName)]
getCleanupEventsForDeletion :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig -> m [(Text, TriggerName)]
getCleanupEventsForDeletion MSSQLSourceConfig
sourceConfig =
  m (Either QErr [(Text, TriggerName)]) -> m [(Text, TriggerName)]
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr [(Text, TriggerName)]) -> m [(Text, TriggerName)])
-> m (Either QErr [(Text, TriggerName)]) -> m [(Text, TriggerName)]
forall a b. (a -> b) -> a -> b
$ IO (Either QErr [(Text, TriggerName)])
-> m (Either QErr [(Text, TriggerName)])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr [(Text, TriggerName)])
 -> m (Either QErr [(Text, TriggerName)]))
-> IO (Either QErr [(Text, TriggerName)])
-> m (Either QErr [(Text, TriggerName)])
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig
-> TxE QErr [(Text, TriggerName)]
-> IO (Either QErr [(Text, TriggerName)])
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig (TxE QErr [(Text, TriggerName)]
 -> IO (Either QErr [(Text, TriggerName)]))
-> TxE QErr [(Text, TriggerName)]
-> IO (Either QErr [(Text, TriggerName)])
forall a b. (a -> b) -> a -> b
$ TxE QErr [(Text, TriggerName)]
getCleanupEventsForDeletionTx

markCleanupEventsAsDeadTx :: [Text] -> TxE QErr ()
markCleanupEventsAsDeadTx :: [Text] -> TxET QErr IO ()
markCleanupEventsAsDeadTx [Text]
toDeadEvents = do
  let deadEventsValues :: Text
deadEventsValues = [Text] -> Text
forall a. ToTxt a => [a] -> Text
generateSQLValuesFromList [Text]
toDeadEvents
  Bool -> TxET QErr IO () -> TxET QErr IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([Text] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Text]
toDeadEvents)
    (TxET QErr IO () -> TxET QErr IO ())
-> TxET QErr IO () -> TxET QErr IO ()
forall a b. (a -> b) -> a -> b
$ (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
    (Query -> TxET QErr IO ()) -> Query -> TxET QErr IO ()
forall a b. (a -> b) -> a -> b
$ Text -> Query
rawUnescapedText
    (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ [ST.st|
        UPDATE hdb_catalog.hdb_event_log_cleanups
        SET status = 'dead'
        WHERE id = ANY ( SELECT id from  (VALUES #{deadEventsValues}) AS X(id));
        |]

updateCleanupEventStatusToDead ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  [Text] ->
  m ()
updateCleanupEventStatusToDead :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig -> [Text] -> m ()
updateCleanupEventStatusToDead MSSQLSourceConfig
sourceConfig [Text]
toDeadEvents =
  m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ [Text] -> TxET QErr IO ()
markCleanupEventsAsDeadTx [Text]
toDeadEvents

updateCleanupEventStatusToPausedTx :: Text -> TxE QErr ()
updateCleanupEventStatusToPausedTx :: Text -> TxET QErr IO ()
updateCleanupEventStatusToPausedTx Text
cleanupLogId =
  (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE
    MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
    [ODBC.sql|
          UPDATE hdb_catalog.hdb_event_log_cleanups
          SET status = 'paused'
          WHERE id = $cleanupLogId
          |]

-- | @updateCleanupEventStatusToPaused@ updates the cleanup log status to `paused` if the event trigger configuration is paused.
updateCleanupEventStatusToPaused ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  Text ->
  m ()
updateCleanupEventStatusToPaused :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig -> Text -> m ()
updateCleanupEventStatusToPaused MSSQLSourceConfig
sourceConfig Text
cleanupLogId =
  m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ Text -> TxET QErr IO ()
updateCleanupEventStatusToPausedTx Text
cleanupLogId

updateCleanupEventStatusToCompletedTx :: Text -> DeletedEventLogStats -> TxE QErr ()
updateCleanupEventStatusToCompletedTx :: Text -> DeletedEventLogStats -> TxET QErr IO ()
updateCleanupEventStatusToCompletedTx Text
cleanupLogId (DeletedEventLogStats Int
numEventLogs Int
numInvocationLogs) =
  (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE
    MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
    [ODBC.sql|
          UPDATE hdb_catalog.hdb_event_log_cleanups
          SET status = 'completed', deleted_event_logs = $numEventLogs, deleted_event_invocation_logs = $numInvocationLogs
          WHERE id = $cleanupLogId
          |]

-- | @updateCleanupEventStatusToCompleted@ updates the cleanup log status after the event logs are deleted.
-- This will perform the following steps:
--
-- 1. Updates the cleanup config status to `completed`.
-- 2. Updates the number of event logs and event invocation logs that were deleted for a trigger name
updateCleanupEventStatusToCompleted ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  Text ->
  DeletedEventLogStats ->
  m ()
updateCleanupEventStatusToCompleted :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig -> Text -> DeletedEventLogStats -> m ()
updateCleanupEventStatusToCompleted MSSQLSourceConfig
sourceConfig Text
cleanupLogId DeletedEventLogStats
delStats =
  m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (Either QErr ()) -> m (Either QErr ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr ()) -> m (Either QErr ()))
-> IO (Either QErr ()) -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig -> TxET QErr IO () -> IO (Either QErr ())
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig (TxET QErr IO () -> IO (Either QErr ()))
-> TxET QErr IO () -> IO (Either QErr ())
forall a b. (a -> b) -> a -> b
$ Text -> DeletedEventLogStats -> TxET QErr IO ()
updateCleanupEventStatusToCompletedTx Text
cleanupLogId DeletedEventLogStats
delStats

deleteEventTriggerLogsTx :: TriggerLogCleanupConfig -> TxE QErr DeletedEventLogStats
deleteEventTriggerLogsTx :: TriggerLogCleanupConfig -> TxE QErr DeletedEventLogStats
deleteEventTriggerLogsTx TriggerLogCleanupConfig {Bool
Int
SourceName
TriggerName
tlccEventTriggerName :: TriggerName
tlccSourceName :: SourceName
tlccBatchSize :: Int
tlccClearOlderThan :: Int
tlccTimeout :: Int
tlccCleanInvocationLogs :: Bool
tlccEventTriggerName :: TriggerLogCleanupConfig -> TriggerName
tlccSourceName :: TriggerLogCleanupConfig -> SourceName
tlccBatchSize :: TriggerLogCleanupConfig -> Int
tlccClearOlderThan :: TriggerLogCleanupConfig -> Int
tlccTimeout :: TriggerLogCleanupConfig -> Int
tlccCleanInvocationLogs :: TriggerLogCleanupConfig -> Bool
..} = do
  -- Setting the timeout
  (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE
    MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
    [ODBC.sql|
          SET LOCK_TIMEOUT $qTimeout;
        |]
  --  Select all the dead events based on criteria set in the cleanup config.
  [EventId]
deadEventIDs :: [EventId] <-
    (Text -> EventId) -> [Text] -> [EventId]
forall a b. (a -> b) -> [a] -> [b]
map Text -> EventId
EventId
      ([Text] -> [EventId])
-> TxET QErr IO [Text] -> TxET QErr IO [EventId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (MSSQLTxError -> QErr) -> Query -> TxET QErr IO [Text]
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m [a]
multiRowQueryE
        MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
        [ODBC.sql|
          SELECT TOP ($qBatchSize) CAST(id AS nvarchar(36)) FROM hdb_catalog.event_log WITH (UPDLOCK, READPAST)
          WHERE ((delivered = 1 OR error = 1) AND trigger_name = $qTriggerName  )
          AND created_at < DATEADD(HOUR, - $qRetentionPeriod, SYSDATETIMEOFFSET() AT TIME ZONE 'UTC')
          AND locked IS NULL
        |]
  if [EventId] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [EventId]
deadEventIDs
    then DeletedEventLogStats -> TxE QErr DeletedEventLogStats
forall a. a -> TxET QErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DeletedEventLogStats -> TxE QErr DeletedEventLogStats)
-> DeletedEventLogStats -> TxE QErr DeletedEventLogStats
forall a b. (a -> b) -> a -> b
$ Int -> Int -> DeletedEventLogStats
DeletedEventLogStats Int
0 Int
0
    else do
      let eventIdsValues :: Text
eventIdsValues = [EventId] -> Text
forall a. ToTxt a => [a] -> Text
generateSQLValuesFromList [EventId]
deadEventIDs
      --  Lock the events in the database so that other HGE instances don't pick them up for deletion.
      (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
        (Query -> TxET QErr IO ()) -> Query -> TxET QErr IO ()
forall a b. (a -> b) -> a -> b
$ Text -> Query
rawUnescapedText
        (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ [ST.st|
          UPDATE hdb_catalog.event_log
          SET locked = SYSDATETIMEOFFSET() AT TIME ZONE 'UTC'
          WHERE id = ANY ( SELECT id from  (VALUES #{eventIdsValues}) AS X(id))
              AND locked IS NULL
          |]
      --  Based on the config either delete the corresponding invocation logs or set trigger_name
      --  to appropriate value. Please note that the event_id won't exist anymore in the event_log
      --  table, but we are still retaining it for debugging purpose.
      [Int]
deletedInvocationLogs :: [Int] <- -- This will be an array of 1 and is only used to count the number of deleted rows.
        (MSSQLTxError -> QErr) -> Query -> TxET QErr IO [Int]
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m [a]
multiRowQueryE MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
          (Query -> TxET QErr IO [Int]) -> Query -> TxET QErr IO [Int]
forall a b. (a -> b) -> a -> b
$ Text -> Query
rawUnescapedText
          (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ if Bool
tlccCleanInvocationLogs
            then
              [ST.st|
                DELETE FROM hdb_catalog.event_invocation_logs
                OUTPUT 1
                WHERE event_id = ANY ( SELECT id from  (VALUES #{eventIdsValues}) AS X(id));
                |]
            else
              [ST.st|
                UPDATE hdb_catalog.event_invocation_logs
                SET trigger_name = '#{qTriggerName}'
                WHERE event_id = ANY ( SELECT id from  (VALUES #{eventIdsValues}) AS X(id));
                |]
      --  Finally delete the event logs.
      [Int]
deletedEventLogs :: [Int] <- -- This will be an array of 1 and is only used to count the number of deleted rows.
        (MSSQLTxError -> QErr) -> Query -> TxET QErr IO [Int]
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m [a]
multiRowQueryE MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
          (Query -> TxET QErr IO [Int]) -> Query -> TxET QErr IO [Int]
forall a b. (a -> b) -> a -> b
$ Text -> Query
rawUnescapedText
          (Text -> Query) -> Text -> Query
forall a b. (a -> b) -> a -> b
$ [ST.st|
            DELETE FROM hdb_catalog.event_log
            OUTPUT 1
            WHERE id = ANY ( SELECT id from  (VALUES #{eventIdsValues}) AS X(id));
            |]
      -- Removing the timeout (-1 is the default timeout)
      (MSSQLTxError -> QErr) -> Query -> TxET QErr IO ()
forall (m :: * -> *) e.
MonadIO m =>
(MSSQLTxError -> e) -> Query -> TxET e m ()
unitQueryE
        MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
        Query
[ODBC.sql|
              SET LOCK_TIMEOUT -1;
            |]
      DeletedEventLogStats -> TxE QErr DeletedEventLogStats
forall a. a -> TxET QErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DeletedEventLogStats -> TxE QErr DeletedEventLogStats)
-> DeletedEventLogStats -> TxE QErr DeletedEventLogStats
forall a b. (a -> b) -> a -> b
$ Int -> Int -> DeletedEventLogStats
DeletedEventLogStats ([Int] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int]
deletedEventLogs) ([Int] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int]
deletedInvocationLogs)
  where
    qTimeout :: Int
qTimeout = Int
tlccTimeout Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000
    qTriggerName :: Text
qTriggerName = TriggerName -> Text
triggerNameToTxt TriggerName
tlccEventTriggerName
    qRetentionPeriod :: Int
qRetentionPeriod = Int
tlccClearOlderThan
    qBatchSize :: Int
qBatchSize = Int
tlccBatchSize

-- | @deleteEventTriggerLogs@ deletes the event logs (and event invocation logs) based on the cleanup configuration given
-- This will perform the following steps:
--
-- 1. Select all the dead events based on criteria set in the cleanup config.
-- 2. Lock the events in the database so that other HGE instances don't pick them up for deletion.
-- 3. Based on the config, perform the delete action.
deleteEventTriggerLogs ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  TriggerLogCleanupConfig ->
  IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)) ->
  m DeletedEventLogStats
deleteEventTriggerLogs :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig
-> TriggerLogCleanupConfig
-> IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
-> m DeletedEventLogStats
deleteEventTriggerLogs MSSQLSourceConfig
sourceConfig TriggerLogCleanupConfig
oldCleanupConfig IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
getLatestCleanupConfig = do
  IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
-> TriggerLogCleanupConfig
-> (TriggerLogCleanupConfig
    -> IO (Either QErr DeletedEventLogStats))
-> m DeletedEventLogStats
forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
-> TriggerLogCleanupConfig
-> (TriggerLogCleanupConfig
    -> IO (Either QErr DeletedEventLogStats))
-> m DeletedEventLogStats
deleteEventTriggerLogsInBatchesWith IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
getLatestCleanupConfig TriggerLogCleanupConfig
oldCleanupConfig ((TriggerLogCleanupConfig -> IO (Either QErr DeletedEventLogStats))
 -> m DeletedEventLogStats)
-> (TriggerLogCleanupConfig
    -> IO (Either QErr DeletedEventLogStats))
-> m DeletedEventLogStats
forall a b. (a -> b) -> a -> b
$ \TriggerLogCleanupConfig
cleanupConfig -> do
    MSSQLSourceConfig
-> TxE QErr DeletedEventLogStats
-> IO (Either QErr DeletedEventLogStats)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceWriteTx MSSQLSourceConfig
sourceConfig (TxE QErr DeletedEventLogStats
 -> IO (Either QErr DeletedEventLogStats))
-> TxE QErr DeletedEventLogStats
-> IO (Either QErr DeletedEventLogStats)
forall a b. (a -> b) -> a -> b
$ TriggerLogCleanupConfig -> TxE QErr DeletedEventLogStats
deleteEventTriggerLogsTx TriggerLogCleanupConfig
cleanupConfig

fetchEventLogs ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  GetEventLogs b ->
  m [EventLog]
fetchEventLogs :: forall (m :: * -> *) (b :: BackendType).
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig -> GetEventLogs b -> m [EventLog]
fetchEventLogs MSSQLSourceConfig
sourceConfig GetEventLogs b
getEventLogs = do
  IO (Either QErr [EventLog]) -> m (Either QErr [EventLog])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (MSSQLSourceConfig
-> TxET QErr IO [EventLog] -> IO (Either QErr [EventLog])
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceReadTx MSSQLSourceConfig
sourceConfig (TxET QErr IO [EventLog] -> IO (Either QErr [EventLog]))
-> TxET QErr IO [EventLog] -> IO (Either QErr [EventLog])
forall a b. (a -> b) -> a -> b
$ GetEventLogs b -> TxET QErr IO [EventLog]
forall (b :: BackendType).
GetEventLogs b -> TxET QErr IO [EventLog]
fetchEventLogsTxE GetEventLogs b
getEventLogs)
    m (Either QErr [EventLog])
-> (QErr -> m [EventLog]) -> m [EventLog]
forall (m :: * -> *) e a.
Monad m =>
m (Either e a) -> (e -> m a) -> m a
`onLeftM` (QErr -> m [EventLog]
forall a. QErr -> m a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (QErr -> m [EventLog]) -> (QErr -> QErr) -> QErr -> m [EventLog]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> QErr -> QErr
prefixQErr Text
"unexpected error while fetching event logs: ")

fetchEventLogsTxE :: GetEventLogs b -> TxE QErr [EventLog]
fetchEventLogsTxE :: forall (b :: BackendType).
GetEventLogs b -> TxET QErr IO [EventLog]
fetchEventLogsTxE GetEventLogs {Int
SourceName
EventLogStatus
TriggerName
_gelName :: TriggerName
_gelSourceName :: SourceName
_gelLimit :: Int
_gelOffset :: Int
_gelStatus :: EventLogStatus
_gelName :: forall (b :: BackendType). GetEventLogs b -> TriggerName
_gelSourceName :: forall (b :: BackendType). GetEventLogs b -> SourceName
_gelLimit :: forall (b :: BackendType). GetEventLogs b -> Int
_gelOffset :: forall (b :: BackendType). GetEventLogs b -> Int
_gelStatus :: forall (b :: BackendType). GetEventLogs b -> EventLogStatus
..} = do
  case EventLogStatus
status of
    EventLogStatus
Pending -> do
      [(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
  Maybe ByteString, Maybe ByteString, Bool)]
events <-
        (MSSQLTxError -> QErr)
-> Query
-> TxET
     QErr
     IO
     [(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
       Maybe ByteString, Maybe ByteString, Bool)]
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m [a]
multiRowQueryE
          MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
          [ODBC.sql|
            SELECT CONVERT(varchar(MAX), id), schema_name, table_name, trigger_name, payload, delivered, error, tries,
            CONVERT(varchar(MAX), created_at), CONVERT(varchar(MAX), locked), CONVERT(varchar(MAX), next_retry_at), archived
            FROM hdb_catalog.event_log
            WHERE trigger_name = $triggerName
            AND delivered=0 AND error=0 AND archived=0
            ORDER BY created_at DESC OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY;
          |]
      ((ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
  Maybe ByteString, Maybe ByteString, Bool)
 -> TxET QErr IO EventLog)
-> [(ByteString, Text, Text, Text, Text, Bool, Bool, Int,
     ByteString, Maybe ByteString, Maybe ByteString, Bool)]
-> TxET QErr IO [EventLog]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
 Maybe ByteString, Maybe ByteString, Bool)
-> TxET QErr IO EventLog
forall (m :: * -> *).
MonadError QErr m =>
(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
 Maybe ByteString, Maybe ByteString, Bool)
-> m EventLog
uncurryEventLog [(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
  Maybe ByteString, Maybe ByteString, Bool)]
events
    EventLogStatus
Processed -> do
      [(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
  Maybe ByteString, Maybe ByteString, Bool)]
events <-
        (MSSQLTxError -> QErr)
-> Query
-> TxET
     QErr
     IO
     [(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
       Maybe ByteString, Maybe ByteString, Bool)]
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m [a]
multiRowQueryE
          MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
          [ODBC.sql|
            SELECT CONVERT(varchar(MAX), id), schema_name, table_name, trigger_name, payload, delivered, error, tries,
            CONVERT(varchar(MAX), created_at), CONVERT(varchar(MAX), locked), CONVERT(varchar(MAX), next_retry_at), archived
            FROM hdb_catalog.event_log
            WHERE trigger_name = $triggerName
            AND (delivered=1 OR error=1) AND archived=0
            ORDER BY created_at DESC OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY;
          |]
      ((ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
  Maybe ByteString, Maybe ByteString, Bool)
 -> TxET QErr IO EventLog)
-> [(ByteString, Text, Text, Text, Text, Bool, Bool, Int,
     ByteString, Maybe ByteString, Maybe ByteString, Bool)]
-> TxET QErr IO [EventLog]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
 Maybe ByteString, Maybe ByteString, Bool)
-> TxET QErr IO EventLog
forall (m :: * -> *).
MonadError QErr m =>
(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
 Maybe ByteString, Maybe ByteString, Bool)
-> m EventLog
uncurryEventLog [(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
  Maybe ByteString, Maybe ByteString, Bool)]
events
    EventLogStatus
All -> do
      [(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
  Maybe ByteString, Maybe ByteString, Bool)]
events <-
        (MSSQLTxError -> QErr)
-> Query
-> TxET
     QErr
     IO
     [(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
       Maybe ByteString, Maybe ByteString, Bool)]
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m [a]
multiRowQueryE
          MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
          [ODBC.sql|
            SELECT CONVERT(varchar(MAX), id), schema_name, table_name, trigger_name, payload, delivered, error, tries,
            CONVERT(varchar(MAX), created_at), CONVERT(varchar(MAX), locked), CONVERT(varchar(MAX), next_retry_at), archived
            FROM hdb_catalog.event_log
            WHERE trigger_name = $triggerName
            ORDER BY created_at DESC OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY;
          |]
      ((ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
  Maybe ByteString, Maybe ByteString, Bool)
 -> TxET QErr IO EventLog)
-> [(ByteString, Text, Text, Text, Text, Bool, Bool, Int,
     ByteString, Maybe ByteString, Maybe ByteString, Bool)]
-> TxET QErr IO [EventLog]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
 Maybe ByteString, Maybe ByteString, Bool)
-> TxET QErr IO EventLog
forall (m :: * -> *).
MonadError QErr m =>
(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
 Maybe ByteString, Maybe ByteString, Bool)
-> m EventLog
uncurryEventLog [(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
  Maybe ByteString, Maybe ByteString, Bool)]
events
  where
    triggerName :: Text
triggerName = TriggerName -> Text
triggerNameToTxt TriggerName
_gelName
    limit :: Int
limit = Int
_gelLimit
    offset :: Int
offset = Int
_gelOffset
    status :: EventLogStatus
status = EventLogStatus
_gelStatus

fetchEventInvocationLogs ::
  (MonadError QErr m, MonadIO m) =>
  MSSQLSourceConfig ->
  GetEventInvocations b ->
  m [EventInvocationLog]
fetchEventInvocationLogs :: forall (m :: * -> *) (b :: BackendType).
(MonadError QErr m, MonadIO m) =>
MSSQLSourceConfig
-> GetEventInvocations b -> m [EventInvocationLog]
fetchEventInvocationLogs MSSQLSourceConfig
sourceConfig GetEventInvocations b
getEventInvocationLogs = do
  IO (Either QErr [EventInvocationLog])
-> m (Either QErr [EventInvocationLog])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (MSSQLSourceConfig
-> TxET QErr IO [EventInvocationLog]
-> IO (Either QErr [EventInvocationLog])
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceReadTx MSSQLSourceConfig
sourceConfig (TxET QErr IO [EventInvocationLog]
 -> IO (Either QErr [EventInvocationLog]))
-> TxET QErr IO [EventInvocationLog]
-> IO (Either QErr [EventInvocationLog])
forall a b. (a -> b) -> a -> b
$ GetEventInvocations b -> TxET QErr IO [EventInvocationLog]
forall (b :: BackendType).
GetEventInvocations b -> TxET QErr IO [EventInvocationLog]
fetchEventInvocationLogsTxE GetEventInvocations b
getEventInvocationLogs)
    m (Either QErr [EventInvocationLog])
-> (QErr -> m [EventInvocationLog]) -> m [EventInvocationLog]
forall (m :: * -> *) e a.
Monad m =>
m (Either e a) -> (e -> m a) -> m a
`onLeftM` (QErr -> m [EventInvocationLog]
forall a. QErr -> m a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (QErr -> m [EventInvocationLog])
-> (QErr -> QErr) -> QErr -> m [EventInvocationLog]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> QErr -> QErr
prefixQErr Text
"unexpected error while fetching invocation logs: ")

fetchEventInvocationLogsTxE :: GetEventInvocations b -> TxE QErr [EventInvocationLog]
fetchEventInvocationLogsTxE :: forall (b :: BackendType).
GetEventInvocations b -> TxET QErr IO [EventInvocationLog]
fetchEventInvocationLogsTxE GetEventInvocations {Int
SourceName
TriggerName
_geiName :: TriggerName
_geiSourceName :: SourceName
_geiLimit :: Int
_geiOffset :: Int
_geiName :: forall (b :: BackendType). GetEventInvocations b -> TriggerName
_geiSourceName :: forall (b :: BackendType). GetEventInvocations b -> SourceName
_geiLimit :: forall (b :: BackendType). GetEventInvocations b -> Int
_geiOffset :: forall (b :: BackendType). GetEventInvocations b -> Int
..} = do
  [(ByteString, Text, ByteString, Maybe Int, Text, Text, ByteString)]
invocations <-
    (MSSQLTxError -> QErr)
-> Query
-> TxET
     QErr
     IO
     [(ByteString, Text, ByteString, Maybe Int, Text, Text, ByteString)]
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m [a]
multiRowQueryE
      MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
      [ODBC.sql|
        SELECT CONVERT(varchar(MAX), id), trigger_name, CONVERT(varchar(MAX), event_id),
        status, request, response, CONVERT(varchar(MAX), created_at)
        FROM hdb_catalog.event_invocation_logs
        WHERE trigger_name = $triggerName
        ORDER BY created_at DESC OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY;
      |]
  ((ByteString, Text, ByteString, Maybe Int, Text, Text, ByteString)
 -> TxET QErr IO EventInvocationLog)
-> [(ByteString, Text, ByteString, Maybe Int, Text, Text,
     ByteString)]
-> TxET QErr IO [EventInvocationLog]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (ByteString, Text, ByteString, Maybe Int, Text, Text, ByteString)
-> TxET QErr IO EventInvocationLog
forall (m :: * -> *).
MonadError QErr m =>
(ByteString, Text, ByteString, Maybe Int, Text, Text, ByteString)
-> m EventInvocationLog
uncurryEventInvocationLog [(ByteString, Text, ByteString, Maybe Int, Text, Text, ByteString)]
invocations
  where
    triggerName :: Text
triggerName = TriggerName -> Text
triggerNameToTxt TriggerName
_geiName
    limit :: Int
limit = Int
_geiLimit
    offset :: Int
offset = Int
_geiOffset

fetchEventById ::
  (MonadError QErr m, MonadIO m) =>
  MSSQLSourceConfig ->
  GetEventById b ->
  m (EventLogWithInvocations)
fetchEventById :: forall (m :: * -> *) (b :: BackendType).
(MonadError QErr m, MonadIO m) =>
MSSQLSourceConfig -> GetEventById b -> m EventLogWithInvocations
fetchEventById MSSQLSourceConfig
sourceConfig GetEventById b
getEventById = do
  Either QErr EventLogWithInvocations
fetchEventByIdTxE' <- IO (Either QErr EventLogWithInvocations)
-> m (Either QErr EventLogWithInvocations)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr EventLogWithInvocations)
 -> m (Either QErr EventLogWithInvocations))
-> IO (Either QErr EventLogWithInvocations)
-> m (Either QErr EventLogWithInvocations)
forall a b. (a -> b) -> a -> b
$ MSSQLSourceConfig
-> TxET QErr IO EventLogWithInvocations
-> IO (Either QErr EventLogWithInvocations)
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig -> TxET QErr m a -> m (Either QErr a)
runMSSQLSourceReadTx MSSQLSourceConfig
sourceConfig (TxET QErr IO EventLogWithInvocations
 -> IO (Either QErr EventLogWithInvocations))
-> TxET QErr IO EventLogWithInvocations
-> IO (Either QErr EventLogWithInvocations)
forall a b. (a -> b) -> a -> b
$ GetEventById b -> TxET QErr IO EventLogWithInvocations
forall (b :: BackendType).
GetEventById b -> TxET QErr IO EventLogWithInvocations
fetchEventByIdTxE GetEventById b
getEventById
  case Either QErr EventLogWithInvocations
fetchEventByIdTxE' of
    Left QErr
err ->
      QErr -> m EventLogWithInvocations
forall a. QErr -> m a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError
        (QErr -> m EventLogWithInvocations)
-> QErr -> m EventLogWithInvocations
forall a b. (a -> b) -> a -> b
$ Text -> QErr -> QErr
prefixQErr (Text
"unexpected error while fetching event with id " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
eventId Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
": ") QErr
err
    Right EventLogWithInvocations
eventLogWithInvocations -> do
      if Maybe EventLog -> Bool
forall a. Maybe a -> Bool
isNothing (EventLogWithInvocations -> Maybe EventLog
elwiEvent EventLogWithInvocations
eventLogWithInvocations)
        then Code -> Text -> m EventLogWithInvocations
forall (m :: * -> *) a. QErrM m => Code -> Text -> m a
throw400 Code
NotExists Text
errMsg
        else EventLogWithInvocations -> m EventLogWithInvocations
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return EventLogWithInvocations
eventLogWithInvocations
  where
    eventId :: Text
eventId = EventId -> Text
unEventId (EventId -> Text) -> EventId -> Text
forall a b. (a -> b) -> a -> b
$ GetEventById b -> EventId
forall (b :: BackendType). GetEventById b -> EventId
_gebiEventId GetEventById b
getEventById
    errMsg :: Text
errMsg = Text
"event id " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
eventId Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" does not exist"

fetchEventByIdTxE :: GetEventById b -> TxE QErr (EventLogWithInvocations)
fetchEventByIdTxE :: forall (b :: BackendType).
GetEventById b -> TxET QErr IO EventLogWithInvocations
fetchEventByIdTxE GetEventById {Int
EventId
SourceName
_gebiEventId :: forall (b :: BackendType). GetEventById b -> EventId
_gebiSourceName :: SourceName
_gebiEventId :: EventId
_gebiInvocationLogLimit :: Int
_gebiInvocationLogOffset :: Int
_gebiSourceName :: forall (b :: BackendType). GetEventById b -> SourceName
_gebiInvocationLogLimit :: forall (b :: BackendType). GetEventById b -> Int
_gebiInvocationLogOffset :: forall (b :: BackendType). GetEventById b -> Int
..} = do
  [(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
  Maybe ByteString, Maybe ByteString, Bool)]
eventsQuery <-
    (MSSQLTxError -> QErr)
-> Query
-> TxET
     QErr
     IO
     [(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
       Maybe ByteString, Maybe ByteString, Bool)]
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m [a]
multiRowQueryE
      MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
      [ODBC.sql|
        SELECT CONVERT(varchar(MAX), id), schema_name, table_name, trigger_name, payload, delivered, error, tries,
        CONVERT(varchar(MAX), created_at), CONVERT(varchar(MAX), locked), CONVERT(varchar(MAX), next_retry_at), archived
        FROM hdb_catalog.event_log
        WHERE id = $eventId;
      |]
  [EventLog]
events <- ((ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
  Maybe ByteString, Maybe ByteString, Bool)
 -> TxET QErr IO EventLog)
-> [(ByteString, Text, Text, Text, Text, Bool, Bool, Int,
     ByteString, Maybe ByteString, Maybe ByteString, Bool)]
-> TxET QErr IO [EventLog]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
 Maybe ByteString, Maybe ByteString, Bool)
-> TxET QErr IO EventLog
forall (m :: * -> *).
MonadError QErr m =>
(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
 Maybe ByteString, Maybe ByteString, Bool)
-> m EventLog
uncurryEventLog [(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
  Maybe ByteString, Maybe ByteString, Bool)]
eventsQuery
  case [EventLog]
events of
    [] -> EventLogWithInvocations -> TxET QErr IO EventLogWithInvocations
forall a. a -> TxET QErr IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (EventLogWithInvocations -> TxET QErr IO EventLogWithInvocations)
-> EventLogWithInvocations -> TxET QErr IO EventLogWithInvocations
forall a b. (a -> b) -> a -> b
$ Maybe EventLog -> [EventInvocationLog] -> EventLogWithInvocations
EventLogWithInvocations Maybe EventLog
forall a. Maybe a
Nothing []
    [EventLog
event] -> do
      [(ByteString, Text, ByteString, Maybe Int, Text, Text, ByteString)]
invocationsQuery <-
        (MSSQLTxError -> QErr)
-> Query
-> TxET
     QErr
     IO
     [(ByteString, Text, ByteString, Maybe Int, Text, Text, ByteString)]
forall (m :: * -> *) a e.
(MonadIO m, FromRow a) =>
(MSSQLTxError -> e) -> Query -> TxET e m [a]
multiRowQueryE
          MSSQLTxError -> QErr
HGE.defaultMSSQLTxErrorHandler
          [ODBC.sql|
            SELECT CONVERT(varchar(MAX), id), trigger_name, CONVERT(varchar(MAX), event_id),
            status, request, response, CONVERT(varchar(MAX), created_at)
            FROM hdb_catalog.event_invocation_logs
            WHERE event_id = $eventId
            ORDER BY created_at DESC OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY;
          |]
      [EventInvocationLog]
invocations <- ((ByteString, Text, ByteString, Maybe Int, Text, Text, ByteString)
 -> TxET QErr IO EventInvocationLog)
-> [(ByteString, Text, ByteString, Maybe Int, Text, Text,
     ByteString)]
-> TxET QErr IO [EventInvocationLog]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (ByteString, Text, ByteString, Maybe Int, Text, Text, ByteString)
-> TxET QErr IO EventInvocationLog
forall (m :: * -> *).
MonadError QErr m =>
(ByteString, Text, ByteString, Maybe Int, Text, Text, ByteString)
-> m EventInvocationLog
uncurryEventInvocationLog [(ByteString, Text, ByteString, Maybe Int, Text, Text, ByteString)]
invocationsQuery
      EventLogWithInvocations -> TxET QErr IO EventLogWithInvocations
forall a. a -> TxET QErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (EventLogWithInvocations -> TxET QErr IO EventLogWithInvocations)
-> EventLogWithInvocations -> TxET QErr IO EventLogWithInvocations
forall a b. (a -> b) -> a -> b
$ Maybe EventLog -> [EventInvocationLog] -> EventLogWithInvocations
EventLogWithInvocations (EventLog -> Maybe EventLog
forall a. a -> Maybe a
Just EventLog
event) [EventInvocationLog]
invocations
    [EventLog]
_ -> Text -> TxET QErr IO EventLogWithInvocations
forall (m :: * -> *) a. QErrM m => Text -> m a
throw500 (Text -> TxET QErr IO EventLogWithInvocations)
-> Text -> TxET QErr IO EventLogWithInvocations
forall a b. (a -> b) -> a -> b
$ Text
"Unexpected error: Multiple events present with event id " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
eventId
  where
    eventId :: Text
eventId = EventId -> Text
unEventId EventId
_gebiEventId
    limit :: Int
limit = Int
_gebiInvocationLogLimit
    offset :: Int
offset = Int
_gebiInvocationLogOffset

uncurryEventLog ::
  (MonadError QErr m) =>
  (B.ByteString, Text, Text, Text, Text, Bool, Bool, Int, B.ByteString, Maybe B.ByteString, Maybe B.ByteString, Bool) ->
  m EventLog
uncurryEventLog :: forall (m :: * -> *).
MonadError QErr m =>
(ByteString, Text, Text, Text, Text, Bool, Bool, Int, ByteString,
 Maybe ByteString, Maybe ByteString, Bool)
-> m EventLog
uncurryEventLog (ByteString
eventId, Text
schemaName, Text
tableName, Text
triggerName, Text
payload, Bool
delivered, Bool
isError, Int
tries, ByteString
createdAt, Maybe ByteString
locked, Maybe ByteString
nextRetryAt, Bool
archived) = do
  -- see Note [Encode Event Trigger Payload to JSON in SQL Server]
  Value
payload' <- Text -> String -> m Value
forall a (m :: * -> *).
(FromJSON a, QErrM m) =>
Text -> String -> m a
encodeJSON Text
payload String
"payload decode failed while fetching MSSQL events"
  UTCTime
createdAt' <- ByteString -> String -> m UTCTime
forall (m :: * -> *).
MonadError QErr m =>
ByteString -> String -> m UTCTime
bsToUTCTime ByteString
createdAt String
"conversion of created_at to UTCTime failed while fetching MSSQL events"
  Maybe UTCTime
locked' <- (ByteString -> m UTCTime) -> Maybe ByteString -> m (Maybe UTCTime)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse (ByteString -> String -> m UTCTime
forall (m :: * -> *).
MonadError QErr m =>
ByteString -> String -> m UTCTime
`bsToUTCTime` String
"conversion of locked to UTCTime failed while fetching MSSQL events") Maybe ByteString
locked
  Maybe UTCTime
nextRetryAt' <- (ByteString -> m UTCTime) -> Maybe ByteString -> m (Maybe UTCTime)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse (ByteString -> String -> m UTCTime
forall (m :: * -> *).
MonadError QErr m =>
ByteString -> String -> m UTCTime
`bsToUTCTime` String
"conversion of next_retry_at to UTCTime failed while fetching MSSQL events") Maybe ByteString
nextRetryAt
  EventLog -> m EventLog
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
    EventLog
      { elId :: EventId
elId = Text -> EventId
EventId (ByteString -> Text
bsToTxt ByteString
eventId),
        elSchemaName :: Text
elSchemaName = Text
schemaName,
        elTableName :: Text
elTableName = Text
tableName,
        elTriggerName :: TriggerName
elTriggerName = NonEmptyText -> TriggerName
TriggerName (Text -> NonEmptyText
mkNonEmptyTextUnsafe Text
triggerName),
        elPayload :: Value
elPayload = Value
payload',
        elDelivered :: Bool
elDelivered = Bool
delivered,
        elError :: Bool
elError = Bool
isError,
        elTries :: Int
elTries = Int
tries,
        elCreatedAt :: UTCTime
elCreatedAt = UTCTime
createdAt',
        elLocked :: Maybe UTCTime
elLocked = Maybe UTCTime
locked',
        elNextRetryAt :: Maybe UTCTime
elNextRetryAt = Maybe UTCTime
nextRetryAt',
        elArchived :: Bool
elArchived = Bool
archived
      }

uncurryEventInvocationLog ::
  (MonadError QErr m) =>
  (B.ByteString, Text, B.ByteString, Maybe Int, Text, Text, B.ByteString) ->
  m EventInvocationLog
uncurryEventInvocationLog :: forall (m :: * -> *).
MonadError QErr m =>
(ByteString, Text, ByteString, Maybe Int, Text, Text, ByteString)
-> m EventInvocationLog
uncurryEventInvocationLog (ByteString
invocationId, Text
triggerName, ByteString
eventId, Maybe Int
status, Text
request, Text
response, ByteString
createdAt) = do
  Value
request' <- Text -> String -> m Value
forall a (m :: * -> *).
(FromJSON a, QErrM m) =>
Text -> String -> m a
encodeJSON Text
request String
"request decode failed while fetching MSSQL event invocations"
  Value
response' <- Text -> String -> m Value
forall a (m :: * -> *).
(FromJSON a, QErrM m) =>
Text -> String -> m a
encodeJSON Text
response String
"response decode failed while fetching MSSQL event invocations"
  UTCTime
createdAt' <- ByteString -> String -> m UTCTime
forall (m :: * -> *).
MonadError QErr m =>
ByteString -> String -> m UTCTime
bsToUTCTime ByteString
createdAt String
"conversion of created_at to UTCTime failed while fetching MSSQL event invocations"
  EventInvocationLog -> m EventInvocationLog
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
    EventInvocationLog
      { eilId :: Text
eilId = ByteString -> Text
bsToTxt ByteString
invocationId,
        eilTriggerName :: TriggerName
eilTriggerName = NonEmptyText -> TriggerName
TriggerName (Text -> NonEmptyText
mkNonEmptyTextUnsafe Text
triggerName),
        eilEventId :: EventId
eilEventId = Text -> EventId
EventId (ByteString -> Text
bsToTxt ByteString
eventId),
        eilHttpStatus :: Maybe Int
eilHttpStatus = Maybe Int
status,
        eilRequest :: Value
eilRequest = Value
request',
        eilResponse :: Value
eilResponse = Value
response',
        eilCreatedAt :: UTCTime
eilCreatedAt = UTCTime
createdAt'
      }

{- Note [Encode Event Trigger Payload to JSON in SQL Server]

We do not have JSON datatype in SQL Server. But since in 'mkAllTriggersQ' we
ensure that all the values in the payload column of hdb_catalog.event_log is
always a JSON. We can directly decode the payload value and not worry that the
decoding will fail.

We ensure that the values in 'hd_catalog.event_log' is always a JSON is by using
the 'FOR JSON PATH' MSSQL operand when inserting value into the
'hdb_catalog.event_log' table.

-}
encodeJSON :: (J.FromJSON a, QErrM m) => Text -> String -> m a
encodeJSON :: forall a (m :: * -> *).
(FromJSON a, QErrM m) =>
Text -> String -> m a
encodeJSON Text
json String
err =
  Either String a -> (String -> m a) -> m a
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft
    -- The NVARCHAR column has UTF-16 or UCS-2 encoding. Ref:
    -- https://learn.microsoft.com/en-us/sql/t-sql/data-types/nchar-and-nvarchar-transact-sql?view=sql-server-ver16#nvarchar---n--max--
    -- But JSON strings are expected to have UTF-8 encoding as per spec. Ref:
    -- https://www.rfc-editor.org/rfc/rfc8259#section-8.1 Hence it's important
    -- to encode the json into UTF-8 else the decoding of text to JSON will
    -- fail.
    (ByteString -> Either String a
forall a. FromJSON a => ByteString -> Either String a
J.eitherDecode (ByteString -> Either String a) -> ByteString -> Either String a
forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
fromStrict (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ Text -> ByteString
TE.encodeUtf8 Text
json)
    (\String
_ -> Text -> m a
forall (m :: * -> *) a. QErrM m => Text -> m a
throw500 (Text -> m a) -> Text -> m a
forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack String
err)

-- | UTCTime type is used to store all the time related information pertaining
-- to  event triggers (i.e `created_at`, `locked` and `next_retry_at`).  The ODBC
-- server does not have a FromJSON instance of UTCTime datatype. This mean the
-- direct conversion of the "time related data" which ODBC server fetches to
-- UTCTime is not possible.
--
-- As a workaround, we cast the data from ODBC server to Bytestring and then use
-- the `readEither` to parse that bytestring to UTCTime.
--
-- We make sure that the parse will never fail, by ensuring that values present
-- in the `created_at`, `locked` and `next_retry_at` columns are always in UTC
-- Time.
bsToUTCTime :: (MonadError QErr m) => B.ByteString -> String -> m UTCTime
bsToUTCTime :: forall (m :: * -> *).
MonadError QErr m =>
ByteString -> String -> m UTCTime
bsToUTCTime ByteString
timeInByteString String
err =
  Either String UTCTime -> (String -> m UTCTime) -> m UTCTime
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft
    (String -> Either String UTCTime
forall a. Read a => String -> Either String a
readEither (Text -> String
T.unpack (Text -> String) -> Text -> String
forall a b. (a -> b) -> a -> b
$ ByteString -> Text
bsToTxt ByteString
timeInByteString) :: Either String UTCTime)
    (\String
_ -> Text -> m UTCTime
forall (m :: * -> *) a. QErrM m => Text -> m a
throw500 (Text -> m UTCTime) -> Text -> m UTCTime
forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack String
err)