{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE QuasiQuotes #-}

-- |
-- = Scheduled Triggers
--
-- This module implements the functionality of invoking webhooks during specified
-- time events aka scheduled events. The scheduled events are the events generated
-- by the graphql-engine using the cron triggers or/and a scheduled event can
-- be created by the user at a specified time with the payload, webhook, headers
-- and the retry configuration. Scheduled events are modeled using rows in Postgres
-- with a @timestamp@ column.
--
-- This module implements scheduling and delivery of scheduled
-- events:
--
-- 1. Scheduling a cron event involves creating new cron events. New
-- cron events are created based on the cron schedule and the number of
-- scheduled events that are already present in the scheduled events buffer.
-- The graphql-engine computes the new scheduled events and writes them to
-- the database. (Generator)
--
-- 2. Delivering a scheduled event involves reading undelivered scheduled events
-- from the database and delivering them to the webhook server. (Processor)
--
-- The rationale behind separating the event scheduling and event delivery
-- mechanism into two different threads is that the scheduling and delivering of
-- the scheduled events are not directly dependent on each other. The generator
-- will almost always try to create scheduled events which are supposed to be
-- delivered in the future (timestamp > current_timestamp) and the processor
-- will fetch scheduled events of the past (timestamp < current_timestamp). So,
-- the set of the scheduled events generated by the generator and the processor
-- will never be the same. The point here is that they're not correlated to each
-- other. They can be split into different threads for a better performance.
--
-- == Implementation
--
-- The scheduled triggers eventing is implemented in the metadata storage.
-- All functions that interact with the storage system are abstracted in
-- the 'MonadMetadataStorage' class.
--
-- During the startup, two threads are started:
--
-- 1. Generator: Fetches the list of scheduled triggers from cache and generates
--    the scheduled events.
--
--     - Additional events will be generated only if there are fewer than 100
--       scheduled events.
--
--     - The upcoming events timestamp will be generated using:
--
--         - cron schedule of the scheduled trigger
--
--         - max timestamp of the scheduled events that already exist or
--           current_timestamp(when no scheduled events exist)
--
--         - The timestamp of the scheduled events is stored with timezone because
--           `SELECT NOW()` returns timestamp with timezone, so it's good to
--           compare two things of the same type.
--
--     This effectively corresponds to doing an INSERT with values containing
--     specific timestamp.
--
-- 2. Processor: Fetches the undelivered cron events and scheduled events
--    from the database whose timestamp is earlier than the current
--    timestamp, and then processes them.
--
-- TODO
-- - Consider and document ordering guarantees
--   - do we have any in the presence of multiple hasura instances?
-- - If we have nothing useful to say about ordering, then consider processing
--   events asynchronously, so that a slow webhook doesn't cause everything
--   subsequent to be delayed
module Hasura.Eventing.ScheduledTrigger
  ( runCronEventsGenerator,
    processScheduledTriggers,
    generateScheduleTimes,
    CronEventSeed (..),
    LockedEventsCtx (..),

    -- * Database interactions

    -- Following function names are similar to those present in
    -- 'MonadMetadataStorage' type class. To avoid duplication,
    -- 'Tx' is suffixed to identify as database transactions
    getDeprivedCronTriggerStatsTx,
    getScheduledEventsForDeliveryTx,
    insertInvocationTx,
    setScheduledEventOpTx,
    unlockScheduledEventsTx,
    unlockAllLockedScheduledEventsTx,
    insertCronEventsTx,
    insertOneOffScheduledEventTx,
    dropFutureCronEventsTx,
    getOneOffScheduledEventsTx,
    getCronEventsTx,
    deleteScheduledEventTx,
    getInvocationsTx,
    getInvocationsQuery,
    getInvocationsQueryNoPagination,

    -- * Export utility functions which are useful to build

    -- SQLs for fetching data from metadata storage
    mkScheduledEventStatusFilter,
    scheduledTimeOrderBy,
    mkPaginationSelectExp,
    withCount,
    invocationFieldExtractors,
    mkEventIdBoolExp,
    EventTables (..),
  )
where

import Control.Arrow.Extended (dup)
import Control.Concurrent.Extended (Forever (..), sleep)
import Control.Concurrent.STM
import Control.Lens (view)
import Data.Aeson qualified as J
import Data.Environment qualified as Env
import Data.Has
import Data.HashMap.Strict qualified as Map
import Data.Int (Int64)
import Data.List (unfoldr)
import Data.List.NonEmpty qualified as NE
import Data.SerializableBlob qualified as SB
import Data.Set qualified as Set
import Data.Time.Clock
import Data.URL.Template (printURLTemplate)
import Database.PG.Query qualified as Q
import Hasura.Backends.Postgres.Execute.Types
import Hasura.Backends.Postgres.SQL.DML qualified as S
import Hasura.Backends.Postgres.SQL.Types
import Hasura.Base.Error
import Hasura.Eventing.Common
import Hasura.Eventing.HTTP
import Hasura.Eventing.ScheduledTrigger.Types
import Hasura.HTTP (getHTTPExceptionStatus)
import Hasura.Logging qualified as L
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.EventTrigger (getHeaderInfosFromConf)
import Hasura.RQL.DDL.Headers
import Hasura.RQL.DDL.Webhook.Transform
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.Eventing
import Hasura.RQL.Types.Numeric qualified as Numeric
import Hasura.RQL.Types.ScheduledTrigger
import Hasura.RQL.Types.SchemaCache
import Hasura.SQL.Types
import Hasura.Tracing qualified as Tracing
import Network.HTTP.Client.Transformable qualified as HTTP
import System.Cron
import Text.Builder qualified as TB

-- | runCronEventsGenerator makes sure that all the cron triggers
--   have an adequate buffer of cron events.
--
--   The action loops forever (hence the @m void@ result). On every iteration it:
--
--   1. reads the current 'SchemaCache' through the supplied @IO@ action and
--      takes its cron triggers ('scCronTriggers');
--
--   2. if there is at least one cron trigger, asks the metadata storage for the
--      stats of the triggers that need more events
--      ('getDeprivedCronTriggerStats'), pairs each stat with its trigger from
--      the cache and inserts the newly generated cron events
--      ('insertCronEventsFor');
--
--   3. logs any 'QErr' produced by the storage interaction as a
--      'ScheduledTriggerInternalErr' instead of aborting the loop;
--
--   4. sleeps for one minute before the next iteration.
runCronEventsGenerator ::
  ( MonadIO m,
    MonadMetadataStorage (MetadataStorageT m)
  ) =>
  L.Logger L.Hasura ->
  IO SchemaCache ->
  m void
runCronEventsGenerator logger getSC = do
  forever $ do
    sc <- liftIO getSC
    -- get cron triggers from cache
    let cronTriggersCache = scCronTriggers sc

    unless (Map.null cronTriggersCache) $ do
      -- Poll the DB only when there's at-least one cron trigger present
      -- in the schema cache
      -- get cron trigger stats from db
      -- When shutdown is initiated, we stop generating new cron events
      eitherRes <- runMetadataStorageT $ do
        deprivedCronTriggerStats <- getDeprivedCronTriggerStats $ Map.keys cronTriggersCache
        -- join stats with cron triggers and produce @[(CronTriggerInfo, CronTriggerStats)]@
        cronTriggersForHydrationWithStats <-
          catMaybes
            <$> mapM (withCronTrigger cronTriggersCache) deprivedCronTriggerStats
        insertCronEventsFor cronTriggersForHydrationWithStats

      -- a storage failure is only logged; the generator keeps running
      onLeft eitherRes $ L.unLogger logger . ScheduledTriggerInternalErr

    -- See discussion: https://github.com/hasura/graphql-engine-mono/issues/1001
    liftIO $ sleep (minutes 1)
  where
    -- Pair a trigger's stats with the trigger definition found in the cache.
    -- A stat whose trigger name is missing from the cache is logged as an
    -- internal error and dropped (yields 'Nothing').
    withCronTrigger cronTriggerCache cronTriggerStat = do
      case Map.lookup (_ctsName cronTriggerStat) cronTriggerCache of
        Nothing -> do
          L.unLogger logger $
            ScheduledTriggerInternalErr $
              err500 Unexpected "could not find scheduled trigger in the schema cache"
          pure Nothing
        Just cronTrigger ->
          pure $
            Just (cronTrigger, cronTriggerStat)

-- | Generates the upcoming cron events for every given cron trigger and
-- writes them to the metadata storage in a single 'insertCronEvents' call.
--
-- For each @(trigger, stats)@ pair the new events are generated starting
-- from the stats' '_ctsMaxScheduledTime'. When nothing is generated at all
-- (for example, the input list is empty) the storage is not touched.
insertCronEventsFor ::
  (MonadMetadataStorage m) =>
  [(CronTriggerInfo, CronTriggerStats)] ->
  m ()
insertCronEventsFor cronTriggersWithStats = do
  let scheduledEvents = flip concatMap cronTriggersWithStats $ \(cti, stats) ->
        generateCronEventsFrom (_ctsMaxScheduledTime stats) cti
  case scheduledEvents of
    -- avoid issuing an insert with no rows
    [] -> pure ()
    events -> insertCronEvents events

-- | Builds the seeds of the next cron events of a cron trigger.
--
-- The schedule times are computed by 'generateScheduleTimes' from the given
-- start time and the trigger's 'ctiSchedule'; each time is tagged with the
-- trigger's name ('ctiName') to form a 'CronEventSeed'. At most 100 seeds
-- are produced.
generateCronEventsFrom :: UTCTime -> CronTriggerInfo -> [CronEventSeed]
generateCronEventsFrom startTime CronTriggerInfo {..} =
  map (CronEventSeed ctiName) $
    -- generate next 100 events; see getDeprivedCronTriggerStatsTx:
    generateScheduleTimes startTime 100 ctiSchedule

-- | Generates the next @n@ schedule times starting at @from@ according to
-- the given 'CronSchedule'.
--
-- The times are produced by repeatedly applying 'nextMatch': every match is
-- emitted and also used as the starting point for the following one. Fewer
-- than @n@ times are returned if 'nextMatch' yields 'Nothing' before @n@
-- matches have been found.
--
-- NOTE(review): whether @from@ itself can appear in the result depends on
-- 'nextMatch'; the cron package documents its result as strictly later than
-- the given time -- confirm against the pinned version.
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes from n cron = take n $ go from
  where
    -- lazily unfold the (possibly infinite) stream of matches; 'dup' turns a
    -- match into the (emitted value, next seed) pair expected by 'unfoldr'
    go = unfoldr (fmap dup . nextMatch cron)

-- | Delivers the given (already fetched) cron events, one after another.
--
-- The ids of all the events are first recorded in the locked-events 'TVar'.
-- Then, for every event, the cron trigger it belongs to is looked up by name
-- in the schema cache:
--
-- * when the trigger is found, the webhook payload and retry context are
--   built from the trigger's configuration and the event is handed to
--   'processScheduledEvent'; afterwards the event id is removed from the
--   locked-events set and any 'QErr' raised while processing is logged;
--
-- * when the trigger is not found, an internal error is logged.
--
-- NOTE(review): in the "trigger not found" branch the event id is not removed
-- from the locked-events set -- confirm that this is intended.
processCronEvents ::
  ( MonadIO m,
    Tracing.HasReporter m,
    MonadMetadataStorage (MetadataStorageT m)
  ) =>
  L.Logger L.Hasura ->
  HTTP.Manager ->
  [CronEvent] ->
  IO SchemaCache ->
  TVar (Set.Set CronEventId) ->
  m ()
processCronEvents logger httpMgr cronEvents getSC lockedCronEvents = do
  cronTriggersInfo <- scCronTriggers <$> liftIO getSC
  -- save the locked cron events that have been fetched from the
  -- database, the events stored here will be unlocked in case a
  -- graceful shutdown is initiated in midst of processing these events
  saveLockedEvents (map _ceId cronEvents) lockedCronEvents
  -- The `createdAt` of a cron event is the `created_at` of the cron trigger
  for_ cronEvents $ \(CronEvent id' name st _ tries _ _) -> do
    case Map.lookup name cronTriggersInfo of
      Nothing ->
        logInternalError $
          err500 Unexpected "could not find cron trigger in cache"
      Just CronTriggerInfo {..} -> do
        let payload =
              ScheduledEventWebhookPayload
                id'
                (Just name)
                st
                (fromMaybe J.Null ctiPayload)
                ctiComment
                Nothing
                ctiRequestTransform
                ctiResponseTransform
            retryCtx = RetryContext tries ctiRetryConf
        finally <-
          runMetadataStorageT $
            flip runReaderT (logger, httpMgr) $
              processScheduledEvent
                id'
                ctiHeaders
                retryCtx
                payload
                ctiWebhookInfo
                Cron
        -- the event has been handled (successfully or not), so it no longer
        -- needs to be tracked for unlocking on shutdown
        removeEventFromLockedEvents id' lockedCronEvents
        onLeft finally logInternalError
  where
    -- log a 'QErr' as a scheduled-trigger internal error
    logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err

-- | Delivers the given (already fetched) one-off scheduled events, one after
-- another.
--
-- The ids of all the events are first recorded in the locked-events 'TVar'.
-- Then, for every event, the webhook ('resolveWebhook') and the headers
-- ('getHeaderInfosFromConf') are resolved against the given environment, the
-- webhook payload and retry context are built from the event itself, and the
-- event is handed to 'processScheduledEvent'. Finally the event id is
-- removed from the locked-events set.
--
-- All of the per-event steps run inside a single 'runMetadataStorageT'
-- block; a 'QErr' coming out of that block is logged as an internal error.
--
-- NOTE(review): because 'removeEventFromLockedEvents' is the last step of
-- that block, it is presumably skipped when an earlier step fails with a
-- 'QErr' -- confirm that leaving the id in the locked set is intended.
processOneOffScheduledEvents ::
  ( MonadIO m,
    Tracing.HasReporter m,
    MonadMetadataStorage (MetadataStorageT m)
  ) =>
  Env.Environment ->
  L.Logger L.Hasura ->
  HTTP.Manager ->
  [OneOffScheduledEvent] ->
  TVar (Set.Set OneOffScheduledEventId) ->
  m ()
processOneOffScheduledEvents
  env
  logger
  httpMgr
  oneOffEvents
  lockedOneOffScheduledEvents = do
    -- save the locked one-off events that have been fetched from the
    -- database, the events stored here will be unlocked in case a
    -- graceful shutdown is initiated in midst of processing these events
    saveLockedEvents (map _ooseId oneOffEvents) lockedOneOffScheduledEvents
    for_ oneOffEvents $ \OneOffScheduledEvent {..} -> do
      (either logInternalError pure) =<< runMetadataStorageT do
        webhookInfo <- resolveWebhook env _ooseWebhookConf
        headerInfo <- getHeaderInfosFromConf env _ooseHeaderConf

        let payload =
              ScheduledEventWebhookPayload
                _ooseId
                Nothing
                _ooseScheduledTime
                (fromMaybe J.Null _oosePayload)
                _ooseComment
                (Just _ooseCreatedAt)
                _ooseRequestTransform
                _ooseResponseTransform
            retryCtx = RetryContext _ooseTries _ooseRetryConf
            -- keep the unresolved URL template next to the resolved webhook
            webhookEnvRecord = EnvRecord (getTemplateFromUrl _ooseWebhookConf) webhookInfo
        flip runReaderT (logger, httpMgr) $
          processScheduledEvent _ooseId headerInfo retryCtx payload webhookEnvRecord OneOff
        removeEventFromLockedEvents _ooseId lockedOneOffScheduledEvents
    where
      -- log a 'QErr' as a scheduled-trigger internal error
      logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
      -- render the webhook's URL template as text
      getTemplateFromUrl url = printURLTemplate $ unInputWebhook url

-- | Build the 'Forever' loop that delivers scheduled events.
--
-- Each iteration:
--
-- 1. fetches the cron and one-off events that are due via
--    'getScheduledEventsForDelivery' (run through 'runMetadataStorageT');
-- 2. on failure, logs the error as a 'ScheduledTriggerInternalErr';
-- 3. on success, hands the cron events to 'processCronEvents' and the one-off
--    events to 'processOneOffScheduledEvents', each with its own locked-events
--    'TVar' taken from the 'LockedEventsCtx';
-- 4. sleeps for ten seconds.
processScheduledTriggers ::
  ( MonadIO m,
    Tracing.HasReporter m,
    MonadMetadataStorage (MetadataStorageT m)
  ) =>
  Env.Environment ->
  L.Logger L.Hasura ->
  HTTP.Manager ->
  IO SchemaCache ->
  LockedEventsCtx ->
  m (Forever m)
processScheduledTriggers env logger httpMgr getSC LockedEventsCtx {..} =
  return $
    Forever () $
      const $ do
        result <- runMetadataStorageT getScheduledEventsForDelivery
        case result of
          Left e -> logInternalError e
          Right (cronEvents, oneOffEvents) -> do
            processCronEvents logger httpMgr cronEvents getSC leCronEvents
            processOneOffScheduledEvents env logger httpMgr oneOffEvents leOneOffEvents
        -- NOTE: cron events are scheduled at times with minute resolution (as on
        -- unix), while one-off events can be set for arbitrary times. The sleep
        -- time here determines how overdue a scheduled event (cron or one-off)
        -- might be before we begin processing:
        liftIO $ sleep (seconds 10)
  where
    -- Report a metadata-storage failure through the engine logger.
    logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err

-- | Deliver a single scheduled event to its webhook inside a trace.
--
-- * If the event is overdue by more than the tolerance configured in its
--   retry configuration ('strcToleranceSeconds'), it is handed to
--   'processDead' and no request is made.
-- * Otherwise the request is built with 'mkRequest' and sent with
--   'invokeRequest', using the configured timeout ('strcTimeoutSeconds').
--   The outcome is dispatched as follows:
--
--     * success: 'processSuccess';
--     * 'HTTPError': 'processError' (which decides between retry and error);
--     * 'TransformationError': the error bundle is logged at error level and
--       the event is marked 'SESError'.
processScheduledEvent ::
  ( MonadReader r m,
    Has HTTP.Manager r,
    Has (L.Logger L.Hasura) r,
    MonadIO m,
    Tracing.HasReporter m,
    MonadMetadataStorage m
  ) =>
  ScheduledEventId ->
  [EventHeaderInfo] ->
  RetryContext ->
  ScheduledEventWebhookPayload ->
  EnvRecord ResolvedWebhook ->
  ScheduledEventType ->
  m ()
processScheduledEvent eventId eventHeaders retryCtx payload webhookUrl type' =
  Tracing.runTraceT traceNote do
    currentTime <- liftIO getCurrentTime
    let retryConf = _rctxConf retryCtx
        scheduledTime = sewpScheduledTime payload
    if convertDuration (diffUTCTime currentTime scheduledTime)
      > Numeric.unNonNegativeDiffTime (strcToleranceSeconds retryConf)
      then processDead eventId type'
      else do
        let timeoutSeconds =
              round $
                Numeric.unNonNegativeDiffTime $
                  strcTimeoutSeconds retryConf
            -- 'HTTP.responseTimeoutMicro' expects microseconds.
            httpTimeout = HTTP.responseTimeoutMicro (timeoutSeconds * 1000000)
            (headers, decodedHeaders) = prepareHeaders eventHeaders
            extraLogCtx = ExtraLogContext eventId (sewpName payload)
            webhookReqBodyJson = J.toJSON payload
            webhookReqBody = J.encode webhookReqBodyJson
            requestTransform = sewpRequestTransform payload
            responseTransform = mkResponseTransform <$> sewpResponseTransform payload

        eitherReqRes <- runExceptT $ do
          reqDetails <-
            mkRequest
              headers
              httpTimeout
              webhookReqBody
              requestTransform
              (_envVarValue webhookUrl)
          let request = extractRequest reqDetails
              logger e d = logHTTPForST e extraLogCtx d (_envVarName webhookUrl) decodedHeaders
              sessionVars = _rdSessionVars reqDetails
          resp <- invokeRequest reqDetails responseTransform sessionVars logger
          pure (request, resp)
        case eitherReqRes of
          Right (req, resp) ->
            -- Record the body of the request that was built; fall back to
            -- JSON null when there is no body or it does not decode as JSON.
            let reqBody = fromMaybe J.Null $ view HTTP.body req >>= J.decode @J.Value
             in processSuccess eventId decodedHeaders type' reqBody resp
          Left (HTTPError reqBody e) ->
            processError eventId retryCtx decodedHeaders type' reqBody e
          Left (TransformationError _ e) -> do
            -- Log The Transformation Error
            logger :: L.Logger L.Hasura <- asks getter
            L.unLogger logger $ L.UnstructuredLog L.LevelError (SB.fromLBS $ J.encode e)

            -- Set event state to Error
            setScheduledEventOp eventId (SEOpStatus SESError) type'
  where
    -- Trace label; the trigger name is appended when the payload carries one.
    traceNote = "Scheduled trigger" <> foldMap ((": " <>) . triggerNameToTxt) (sewpName payload)

-- | Record a failed delivery attempt and decide what happens to the event.
--
-- An 'Invocation' is built from the 'HTTPErr':
--
-- * 'HClient': status taken from 'getHTTPExceptionStatus', the JSON-encoded
--   exception as the response body, no response headers;
-- * 'HStatus': status, body and headers of the error response;
-- * 'HOther': status 500, the JSON-encoded detail string as the response
--   body, no response headers.
--
-- The invocation is stored with 'insertScheduledEventInvocation' and the event
-- is then passed to 'retryOrMarkError'.
processError ::
  ( MonadIO m,
    MonadMetadataStorage m
  ) =>
  ScheduledEventId ->
  RetryContext ->
  [HeaderConf] ->
  ScheduledEventType ->
  J.Value ->
  HTTPErr a ->
  m ()
processError eventId retryCtx decodedHeaders type' reqJson err = do
  let invocation = case err of
        HClient httpException ->
          let statusMaybe = getHTTPExceptionStatus httpException
           in mkInvocation eventId statusMaybe decodedHeaders (SB.fromLBS $ J.encode httpException) [] reqJson
        HStatus errResp ->
          let respPayload = hrsBody errResp
              respHeaders = hrsHeaders errResp
              respStatus = hrsStatus errResp
           in mkInvocation eventId (Just respStatus) decodedHeaders respPayload respHeaders reqJson
        HOther detail ->
          let errMsg = SB.fromLBS $ J.encode detail
           in mkInvocation eventId (Just 500) decodedHeaders errMsg [] reqJson
  insertScheduledEventInvocation invocation type'
  retryOrMarkError eventId retryCtx err type'

-- | Either schedule another delivery attempt or mark the event as errored.
--
-- The event is set to 'SESError' only when the number of tries has reached
-- 'strcNumRetries' /and/ the error carries no parseable Retry-After value.
-- In every other case it is set to retry ('SEOpRetry') at the current time
-- plus a delay. The delay is the Retry-After value in seconds when there is
-- one, otherwise the configured 'strcRetryIntervalSeconds' (rounded).
--
-- Note that a parseable Retry-After value therefore causes a retry even when
-- the configured number of retries is already exhausted.
retryOrMarkError ::
  (MonadIO m, MonadMetadataStorage m) =>
  ScheduledEventId ->
  RetryContext ->
  HTTPErr a ->
  ScheduledEventType ->
  m ()
retryOrMarkError eventId retryCtx err type' = do
  let RetryContext tries retryConf = retryCtx
      mRetryHeader = getRetryAfterHeaderFromHTTPErr err
      mRetryHeaderSeconds = parseRetryHeaderValue =<< mRetryHeader
      triesExhausted = tries >= strcNumRetries retryConf
      noRetryHeader = isNothing mRetryHeaderSeconds
  if triesExhausted && noRetryHeader
    then setScheduledEventOp eventId (SEOpStatus SESError) type'
    else do
      currentTime <- liftIO getCurrentTime
      let delay =
            fromMaybe
              ( round $
                  Numeric.unNonNegativeDiffTime $
                    strcRetryIntervalSeconds retryConf
              )
              mRetryHeaderSeconds
          diff = fromIntegral delay
          retryTime = addUTCTime diff currentTime
      setScheduledEventOp eventId (SEOpRetry retryTime) type'

{- Note [Scheduled event lifecycle]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Scheduled events move between five different states over the course of their
lifetime, as represented by the following flowchart:

    ┌───────────┐      ┌────────┐      ┌───────────┐
    │ scheduled │─(1)─→│ locked │─(2)─→│ delivered │
    └───────────┘      └────────┘      └───────────┘
            ↑              │           ┌───────┐
            └────(3)───────┼─────(4)──→│ error │
                           │           └───────┘
                           │           ┌──────┐
                           └─────(5)──→│ dead │
                                       └──────┘

When a scheduled event is first created, it starts in the 'scheduled' state,
and it can transition to other states in the following ways:
  1. When graphql-engine fetches a scheduled event from the database to process
     it, it sets its state to 'locked'. This prevents multiple graphql-engine
     instances running on the same database from processing the same
     scheduled event concurrently.
  2. When a scheduled event is processed successfully, it is marked 'delivered'.
  3. If a scheduled event fails to be processed, but it hasn’t yet reached
     its maximum retry limit, its retry counter is incremented and
     it is returned to the 'scheduled' state.
  4. If a scheduled event fails to be processed and *has* reached its
     retry limit, its state is set to 'error'.
  5. If for whatever reason the difference between the current time and the
     scheduled time is greater than the tolerance of the scheduled event, it
     will not be processed and its state will be set to 'dead'.
-}

-- | Record a successful delivery: store an 'Invocation' built from the
-- response's status, body and headers, then mark the event 'SESDelivered'.
processSuccess ::
  (MonadMetadataStorage m) =>
  ScheduledEventId ->
  [HeaderConf] ->
  ScheduledEventType ->
  J.Value ->
  HTTPResp a ->
  m ()
processSuccess eventId decodedHeaders type' reqBodyJson resp = do
  let respBody = hrsBody resp
      respHeaders = hrsHeaders resp
      respStatus = hrsStatus resp
      invocation = mkInvocation eventId (Just respStatus) decodedHeaders respBody respHeaders reqBodyJson
  insertScheduledEventInvocation invocation type'
  setScheduledEventOp eventId (SEOpStatus SESDelivered) type'

-- | Mark a scheduled event as 'SESDead'. No invocation is recorded.
processDead ::
  (MonadMetadataStorage m) =>
  ScheduledEventId ->
  ScheduledEventType ->
  m ()
processDead eventId type' =
  setScheduledEventOp eventId (SEOpStatus SESDead) type'

-- | Assemble the 'Invocation' record for one delivery attempt.
--
-- The arguments are, in order: the event id, the response status (if any),
-- the request headers, the response body, the response headers and the
-- request body as JSON. The webhook request part is tagged with
-- 'invocationVersionST'.
mkInvocation ::
  ScheduledEventId ->
  Maybe Int ->
  [HeaderConf] ->
  SB.SerializableBlob ->
  [HeaderConf] ->
  J.Value ->
  Invocation 'ScheduledType
mkInvocation eventId status reqHeaders respBody respHeaders reqBodyJson =
  Invocation
    eventId
    status
    (mkWebhookReq reqBodyJson reqHeaders invocationVersionST)
    (mkInvocationResp status respBody respHeaders)

-- metadata database transactions

-- | Get cron trigger stats for cron jobs with fewer than 100 future reified
-- events in the database
--
-- The point here is to maintain a certain number of future events so the user
-- can kind of see what's coming up, and obviously to give 'processCronEvents'
-- something to do.
--
-- For each of the given trigger names the query counts the rows of
-- @hdb_catalog.hdb_cron_events@ with @status = 'scheduled'@ and @tries = 0@,
-- and returns the name, that count and the latest @scheduled_time@ among them.
-- Triggers with no such rows are reported with a count of 0 and the
-- database's @now()@ as the time.
getDeprivedCronTriggerStatsTx :: [TriggerName] -> Q.TxE QErr [CronTriggerStats]
getDeprivedCronTriggerStatsTx cronTriggerNames =
  map (\(n, count, maxTx) -> CronTriggerStats n count maxTx)
    <$> Q.listQE
      defaultTxErrorHandler
      [Q.sql|
      SELECT t.trigger_name, coalesce(q.upcoming_events_count, 0), coalesce(q.max_scheduled_time, now())
      FROM (SELECT UNNEST ($1::text[]) as trigger_name) as t
      LEFT JOIN
      ( SELECT
         trigger_name,
          count(1) as upcoming_events_count,
          max(scheduled_time) as max_scheduled_time
         FROM hdb_catalog.hdb_cron_events
         WHERE tries = 0 and status = 'scheduled'
         GROUP BY trigger_name
      ) AS q
      ON t.trigger_name = q.trigger_name
      WHERE coalesce(q.upcoming_events_count, 0) < 100
     |]
      (Identity $ PGTextArray $ map triggerNameToTxt cronTriggerNames)
      True

-- TODO
--  - cron events have minute resolution, while one-off events have arbitrary
--    resolution, so it doesn't make sense to fetch them at the same rate
--  - if we decide to fetch cron events less frequently we should wake up that
--    thread at second 0 of every minute, and then pass hasura's now time into
--    the query (since the DB may disagree about the time)

-- | Fetch and lock the cron events and one-off events that are due.
--
-- Both queries select rows in the @scheduled@ state whose @next_retry_at@
-- (when set) or otherwise @scheduled_time@ is not after the database's
-- @now()@, set their status to @locked@ and return the updated rows as JSON.
-- The inner @SELECT@ uses @FOR UPDATE SKIP LOCKED@.
getScheduledEventsForDeliveryTx :: Q.TxE QErr ([CronEvent], [OneOffScheduledEvent])
getScheduledEventsForDeliveryTx =
  (,) <$> getCronEventsForDelivery <*> getOneOffEventsForDelivery
  where
    getCronEventsForDelivery :: Q.TxE QErr [CronEvent]
    getCronEventsForDelivery =
      map (Q.getAltJ . runIdentity)
        <$> Q.listQE
          defaultTxErrorHandler
          [Q.sql|
        WITH cte AS
          ( UPDATE hdb_catalog.hdb_cron_events
            SET status = 'locked'
            WHERE id IN ( SELECT t.id
                          FROM hdb_catalog.hdb_cron_events t
                          WHERE ( t.status = 'scheduled'
                                  and (
                                   (t.next_retry_at is NULL and t.scheduled_time <= now()) or
                                   (t.next_retry_at is not NULL and t.next_retry_at <= now())
                                  )
                                )
                          FOR UPDATE SKIP LOCKED
                          )
            RETURNING *
          )
        SELECT row_to_json(t.*) FROM cte AS t
      |]
          ()
          True

    getOneOffEventsForDelivery :: Q.TxE QErr [OneOffScheduledEvent]
    getOneOffEventsForDelivery =
      map (Q.getAltJ . runIdentity)
        <$> Q.listQE
          defaultTxErrorHandler
          [Q.sql|
         WITH cte AS (
            UPDATE hdb_catalog.hdb_scheduled_events
            SET status = 'locked'
            WHERE id IN ( SELECT t.id
                          FROM hdb_catalog.hdb_scheduled_events t
                          WHERE ( t.status = 'scheduled'
                                  and (
                                   (t.next_retry_at is NULL and t.scheduled_time <= now()) or
                                   (t.next_retry_at is not NULL and t.next_retry_at <= now())
                                  )
                                )
                          FOR UPDATE SKIP LOCKED
                          )
            RETURNING *
          )
         SELECT row_to_json(t.*) FROM cte AS t
      |]
          ()
          -- NOTE(review): the cron query above passes True for this flag;
          -- confirm that the difference is intentional.
          False

-- | Record one webhook invocation of a scheduled event and bump the event's
-- @tries@ counter.
--
-- Depending on the 'ScheduledEventType', the invocation is written to
-- @hdb_catalog.hdb_cron_event_invocation_logs@ (for 'Cron') or
-- @hdb_catalog.hdb_scheduled_event_invocation_logs@ (for 'OneOff'), and the
-- matching row of @hdb_cron_events@ / @hdb_scheduled_events@ gets
-- @tries = tries + 1@. Both statements run in the caller's transaction.
insertInvocationTx :: Invocation 'ScheduledType -> ScheduledEventType -> Q.TxE QErr ()
insertInvocationTx invo type' =
  case type' of
    Cron -> do
      Q.unitQE
        defaultTxErrorHandler
        [Q.sql|
         INSERT INTO hdb_catalog.hdb_cron_event_invocation_logs
         (event_id, status, request, response)
         VALUES ($1, $2, $3, $4)
        |]
        invocationRow
        True
      Q.unitQE
        defaultTxErrorHandler
        [Q.sql|
          UPDATE hdb_catalog.hdb_cron_events
          SET tries = tries + 1
          WHERE id = $1
          |]
        eventIdArg
        True
    OneOff -> do
      Q.unitQE
        defaultTxErrorHandler
        [Q.sql|
         INSERT INTO hdb_catalog.hdb_scheduled_event_invocation_logs
         (event_id, status, request, response)
         VALUES ($1, $2, $3, $4)
        |]
        invocationRow
        True
      Q.unitQE
        defaultTxErrorHandler
        [Q.sql|
          UPDATE hdb_catalog.hdb_scheduled_events
          SET tries = tries + 1
          WHERE id = $1
          |]
        eventIdArg
        True
  where
    -- Arguments for the log INSERT, in column order
    -- (event_id, status, request, response). The status is widened to Int64;
    -- request and response are stored as JSON values.
    invocationRow =
      ( iEventId invo,
        fromIntegral <$> iStatus invo :: Maybe Int64,
        Q.AltJ $ J.toJSON $ iRequest invo,
        Q.AltJ $ J.toJSON $ iResponse invo
      )

    -- Single-parameter argument for the @tries@ UPDATE.
    eventIdArg = Identity $ iEventId invo

-- | Apply a 'ScheduledEventOp' to a single scheduled event.
--
-- * 'SEOpRetry' stores the given time in @next_retry_at@ and resets the
--   status to @'scheduled'@.
-- * 'SEOpStatus' overwrites the event's @status@ with the given value.
--
-- The 'ScheduledEventType' selects the table that is updated:
-- @hdb_catalog.hdb_cron_events@ for 'Cron' and
-- @hdb_catalog.hdb_scheduled_events@ for 'OneOff'.
setScheduledEventOpTx ::
  ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> Q.TxE QErr ()
setScheduledEventOpTx eventId op type' = case op of
  SEOpRetry time -> setRetry time
  SEOpStatus status -> setStatus status
  where
    -- Schedule a retry: note the parameter order is (time, id).
    setRetry time =
      case type' of
        Cron ->
          Q.unitQE
            defaultTxErrorHandler
            [Q.sql|
            UPDATE hdb_catalog.hdb_cron_events
            SET next_retry_at = $1,
            STATUS = 'scheduled'
            WHERE id = $2
            |]
            (time, eventId)
            True
        OneOff ->
          Q.unitQE
            defaultTxErrorHandler
            [Q.sql|
            UPDATE hdb_catalog.hdb_scheduled_events
            SET next_retry_at = $1,
            STATUS = 'scheduled'
            WHERE id = $2
            |]
            (time, eventId)
            True

    -- Overwrite the status: note the parameter order is (id, status).
    setStatus status =
      case type' of
        Cron ->
          Q.unitQE
            defaultTxErrorHandler
            [Q.sql|
            UPDATE hdb_catalog.hdb_cron_events
            SET status = $2
            WHERE id = $1
           |]
            (eventId, status)
            True
        OneOff ->
          Q.unitQE
            defaultTxErrorHandler
            [Q.sql|
            UPDATE hdb_catalog.hdb_scheduled_events
            SET status = $2
            WHERE id = $1
           |]
            (eventId, status)
            True

-- | Move the given events from status @'locked'@ back to @'scheduled'@ and
-- return how many rows were actually updated.
--
-- Only rows whose id is in the supplied list /and/ whose status is currently
-- @'locked'@ are touched; the result is the @count(*)@ over the rows returned
-- by the @UPDATE@. The 'ScheduledEventType' selects between
-- @hdb_catalog.hdb_cron_events@ and @hdb_catalog.hdb_scheduled_events@.
unlockScheduledEventsTx :: ScheduledEventType -> [ScheduledEventId] -> Q.TxE QErr Int
unlockScheduledEventsTx type' eventIds =
  case type' of
    Cron ->
      (runIdentity . Q.getRow)
        <$> Q.withQE
          defaultTxErrorHandler
          [Q.sql|
        WITH "cte" AS
        (UPDATE hdb_catalog.hdb_cron_events
        SET status = 'scheduled'
        WHERE id = ANY($1::text[]) and status = 'locked'
        RETURNING *)
        SELECT count(*) FROM "cte"
      |]
          eventIdsArg
          True
    OneOff ->
      (runIdentity . Q.getRow)
        <$> Q.withQE
          defaultTxErrorHandler
          [Q.sql|
        WITH "cte" AS
        (UPDATE hdb_catalog.hdb_scheduled_events
        SET status = 'scheduled'
        WHERE id = ANY($1::text[]) AND status = 'locked'
        RETURNING *)
        SELECT count(*) FROM "cte"
      |]
          eventIdsArg
          True
  where
    -- The ids are sent as a single text[] parameter.
    eventIdsArg = Identity $ PGTextArray $ map unEventId eventIds

-- | Reset every @'locked'@ event to @'scheduled'@, first in
-- @hdb_catalog.hdb_cron_events@ and then in
-- @hdb_catalog.hdb_scheduled_events@.
--
-- Unlike 'unlockScheduledEventsTx' this is not restricted to a list of ids:
-- all locked rows of both tables are affected.
unlockAllLockedScheduledEventsTx :: Q.TxE QErr ()
unlockAllLockedScheduledEventsTx = do
  Q.unitQE
    defaultTxErrorHandler
    [Q.sql|
          UPDATE hdb_catalog.hdb_cron_events
          SET status = 'scheduled'
          WHERE status = 'locked'
          |]
    ()
    True
  Q.unitQE
    defaultTxErrorHandler
    [Q.sql|
          UPDATE hdb_catalog.hdb_scheduled_events
          SET status = 'scheduled'
          WHERE status = 'locked'
          |]
    ()
    True

-- | Bulk-insert cron events built from the given seeds into
-- 'cronEventsTable'.
--
-- Each seed contributes one @(trigger_name, scheduled_time)@ tuple. The
-- statement is generated with the SQL builder and carries a conflict clause
-- of @DoNothing@, so rows that conflict with existing ones are not inserted.
--
-- An empty seed list is a no-op: no statement is sent, since the generated
-- @INSERT@ would otherwise have an empty @VALUES@ list.
insertCronEventsTx :: [CronEventSeed] -> Q.TxE QErr ()
insertCronEventsTx [] = pure ()
insertCronEventsTx cronSeeds =
  Q.unitQE defaultTxErrorHandler (Q.fromText insertCronEventsSql) () False
  where
    -- The full INSERT statement rendered to text; values are embedded as SQL
    -- literals rather than passed as query parameters.
    insertCronEventsSql =
      TB.run $
        toSQL
          S.SQLInsert
            { siTable = cronEventsTable,
              siCols = map unsafePGCol ["trigger_name", "scheduled_time"],
              siValues = S.ValuesExp $ map (toTupleExp . toArr) cronSeeds,
              siConflict = Just $ S.DoNothing Nothing,
              siRet = Nothing
            }

    -- Column values of one seed, in the same order as 'siCols'.
    toArr (CronEventSeed n t) = [triggerNameToTxt n, formatTime' t]

    -- One VALUES tuple made of SQL literals.
    toTupleExp = S.TupleExp . map S.SELit

-- | Insert a one-off scheduled event into
-- @hdb_catalog.hdb_scheduled_events@ and return the id of the new row.
--
-- The webhook, payload, retry configuration and headers are stored as JSON
-- (wrapped in 'Q.AltJ'); the schedule time and comment are passed as plain
-- values.
--
-- NOTE(review): the record also carries @cseRequestTransform@ and
-- @cseResponseTransform@, which this statement does not persist — confirm
-- that this is intended.
insertOneOffScheduledEventTx :: OneOffEvent -> Q.TxE QErr EventId
insertOneOffScheduledEventTx CreateScheduledEvent {..} =
  runIdentity . Q.getRow
    <$> Q.withQE
      defaultTxErrorHandler
      [Q.sql|
    INSERT INTO hdb_catalog.hdb_scheduled_events
    (webhook_conf,scheduled_time,payload,retry_conf,header_conf,comment)
    VALUES
    ($1, $2, $3, $4, $5, $6) RETURNING id
    |]
      ( Q.AltJ cseWebhook,
        cseScheduleAt,
        Q.AltJ csePayload,
        Q.AltJ cseRetryConf,
        Q.AltJ cseHeaders,
        cseComment
      )
      False

-- | Delete cron events that lie in the future and have never been tried
-- (@scheduled_time > now() AND tries = 0@).
--
-- * 'SingleCronTrigger' restricts the deletion to one trigger name.
-- * 'MetadataCronTriggers' restricts it to any of the given trigger names,
--   sent as a single @text[]@ parameter.
dropFutureCronEventsTx :: ClearCronEvents -> Q.TxE QErr ()
dropFutureCronEventsTx = \case
  SingleCronTrigger triggerName ->
    Q.unitQE
      defaultTxErrorHandler
      [Q.sql|
     DELETE FROM hdb_catalog.hdb_cron_events
     WHERE trigger_name = $1 AND scheduled_time > now() AND tries = 0
    |]
      (Identity triggerName)
      True
  MetadataCronTriggers triggerNames ->
    Q.unitQE
      defaultTxErrorHandler
      [Q.sql|
     DELETE FROM hdb_catalog.hdb_cron_events
     WHERE scheduled_time > now() AND tries = 0 AND trigger_name = ANY($1::text[])
    |]
      (Identity $ PGTextArray $ map triggerNameToTxt triggerNames)
      False

-- | Qualified name of the cron events table, @hdb_catalog.hdb_cron_events@.
cronEventsTable :: QualifiedTable
cronEventsTable =
  QualifiedObject "hdb_catalog" $ TableName "hdb_cron_events"

-- | Build a boolean expression restricting the @status@ column to the given
-- statuses.
--
-- An empty list means "no restriction" and yields the literal @True@;
-- otherwise the result is an @IN@ test of @status@ against the textual form
-- of each status.
mkScheduledEventStatusFilter :: [ScheduledEventStatus] -> S.BoolExp
mkScheduledEventStatusFilter = \case
  [] -> S.BELit True
  v ->
    S.BEIN (S.SEIdentifier $ Identifier "status") $
      map (S.SELit . scheduledEventStatusToText) v

-- | @ORDER BY@ expression with a single item: the @scheduled_time@ column in
-- ascending order, with no explicit nulls ordering.
scheduledTimeOrderBy :: S.OrderByExp
scheduledTimeOrderBy =
  let scheduledTimeCol = S.SEIdentifier $ Identifier "scheduled_time"
      orderByItem = S.OrderByItem scheduledTimeCol (Just S.OTAsc) Nothing
   in S.OrderByExp $ orderByItem NE.:| []

-- | Build a select expression which outputs total count and
-- list of json rows with pagination limit and offset applied
--
-- The resulting query has two CTEs:
--
-- * @count_cte@: the unpaginated @allRowsSelect@ passed in;
-- * @limit_cte@: all columns of @count_cte@ with the optional limit and
--   offset of the 'ScheduledEventPagination' applied.
--
-- and two output columns, in this order: a row count over @count_cte@ (i.e.
-- the total before pagination) and a @json_agg@ over the rows of
-- @limit_cte@. The aggregate is wrapped in 'S.handleIfNull' with the literal
-- @"[]"@ — presumably so an empty page yields an empty JSON array instead of
-- NULL.
mkPaginationSelectExp ::
  S.Select ->
  ScheduledEventPagination ->
  S.Select
mkPaginationSelectExp allRowsSelect ScheduledEventPagination {..} =
  S.mkSelect
    { S.selCTEs =
        [ (S.toTableAlias countCteAlias, allRowsSelect),
          (S.toTableAlias limitCteAlias, limitCteSelect)
        ],
      S.selExtr = [countExtractor, rowsExtractor]
    }
  where
    countCteAlias = Identifier "count_cte"
    limitCteAlias = Identifier "limit_cte"

    -- Scalar sub-select counting all rows of the unpaginated CTE.
    countExtractor =
      let selectExp =
            S.mkSelect
              { S.selExtr = [S.Extractor S.countStar Nothing],
                S.selFrom = Just $ S.mkIdenFromExp countCteAlias
              }
       in S.Extractor (S.SESelect selectExp) Nothing

    -- The paginated view; limit and offset are only set when provided.
    limitCteSelect =
      S.mkSelect
        { S.selExtr = [S.selectStar],
          S.selFrom = Just $ S.mkIdenFromExp countCteAlias,
          S.selLimit = (S.LimitExp . S.intToSQLExp) <$> _sepLimit,
          S.selOffset = (S.OffsetExp . S.intToSQLExp) <$> _sepOffset
        }

    -- Scalar sub-select aggregating the paginated rows into one JSON value.
    rowsExtractor =
      let jsonAgg = S.SEUnsafe "json_agg(row_to_json(limit_cte.*))"
          selectExp =
            S.mkSelect
              { S.selExtr = [S.Extractor jsonAgg Nothing],
                S.selFrom = Just $ S.mkIdenFromExp limitCteAlias
              }
       in S.Extractor (S.handleIfNull (S.SELit "[]") (S.SESelect selectExp)) Nothing

-- | Turn a @(count, JSON-decoded payload)@ result row into a
-- 'WithTotalCount', unwrapping the 'Q.AltJ' around the payload.
withCount :: (Int, Q.AltJ a) -> WithTotalCount a
withCount (count, Q.AltJ a) = WithTotalCount count a

-- | Fetch one page of one-off scheduled events together with the total
-- number of matching events.
--
-- Rows are read from @hdb_catalog.hdb_scheduled_events@, filtered with
-- 'mkScheduledEventStatusFilter', ordered by 'scheduledTimeOrderBy' and
-- paginated through 'mkPaginationSelectExp'.
getOneOffScheduledEventsTx ::
  ScheduledEventPagination ->
  [ScheduledEventStatus] ->
  Q.TxE QErr (WithTotalCount [OneOffScheduledEvent])
getOneOffScheduledEventsTx pagination statuses = do
  let table = QualifiedObject "hdb_catalog" $ TableName "hdb_scheduled_events"
      statusFilter = mkScheduledEventStatusFilter statuses
      select =
        S.mkSelect
          { S.selExtr = [S.selectStar],
            S.selFrom = Just $ S.mkSimpleFromExp table,
            S.selWhere = Just $ S.WhereFrag statusFilter,
            S.selOrderBy = Just scheduledTimeOrderBy
          }
      sql = Q.fromBuilder $ toSQL $ mkPaginationSelectExp select pagination
  -- The query yields exactly one row: (total count, JSON array of events).
  (withCount . Q.getRow) <$> Q.withQE defaultTxErrorHandler sql () False

-- | Fetch one page of cron events of the given trigger together with the
-- total number of matching events.
--
-- Rows are read from 'cronEventsTable', filtered by
-- @trigger_name = \<triggerName\>@ AND the status filter built by
-- 'mkScheduledEventStatusFilter', ordered by 'scheduledTimeOrderBy' and
-- paginated through 'mkPaginationSelectExp'.
getCronEventsTx ::
  TriggerName ->
  ScheduledEventPagination ->
  [ScheduledEventStatus] ->
  Q.TxE QErr (WithTotalCount [CronEvent])
getCronEventsTx triggerName pagination status = do
  let triggerNameFilter =
        S.BECompare
          S.SEQ
          (S.SEIdentifier $ Identifier "trigger_name")
          (S.SELit $ triggerNameToTxt triggerName)
      statusFilter = mkScheduledEventStatusFilter status
      select =
        S.mkSelect
          { S.selExtr = [S.selectStar],
            S.selFrom = Just $ S.mkSimpleFromExp cronEventsTable,
            S.selWhere = Just $ S.WhereFrag $ S.BEBin S.AndOp triggerNameFilter statusFilter,
            S.selOrderBy = Just scheduledTimeOrderBy
          }
      sql = Q.fromBuilder $ toSQL $ mkPaginationSelectExp select pagination
  -- The query yields exactly one row: (total count, JSON array of events).
  (withCount . Q.getRow) <$> Q.withQE defaultTxErrorHandler sql () False

-- | Delete a single scheduled event by its id.
--
-- The 'ScheduledEventType' selects the catalog table the row is removed from:
-- 'OneOff' events live in @hdb_catalog.hdb_scheduled_events@ and 'Cron' events
-- in @hdb_catalog.hdb_cron_events@. The event id is passed as the only query
-- argument (@$1@).
deleteScheduledEventTx ::
  ScheduledEventId -> ScheduledEventType -> Q.TxE QErr ()
deleteScheduledEventTx eventId = \case
  OneOff ->
    Q.unitQE
      defaultTxErrorHandler
      [Q.sql|
      DELETE FROM hdb_catalog.hdb_scheduled_events
       WHERE id = $1
    |]
      (Identity eventId)
      False
  Cron ->
    Q.unitQE
      defaultTxErrorHandler
      [Q.sql|
      DELETE FROM hdb_catalog.hdb_cron_events
       WHERE id = $1
    |]
      (Identity eventId)
      False

-- | Column extractors used when selecting invocation log rows from the given
-- table.
--
-- Every column is qualified with the table, so the same extractor list can be
-- used in a query that joins the invocation table with another table. The
-- @request@ and @response@ columns are additionally annotated with the @json@
-- type; the remaining columns (@id@, @event_id@, @status@, @created_at@) are
-- selected as they are. No column is aliased.
invocationFieldExtractors :: QualifiedTable -> [S.Extractor]
invocationFieldExtractors table =
  [ S.Extractor (seIden "id") Nothing,
    S.Extractor (seIden "event_id") Nothing,
    S.Extractor (seIden "status") Nothing,
    S.Extractor (withJsonTypeAnn $ seIden "request") Nothing,
    S.Extractor (withJsonTypeAnn $ seIden "response") Nothing,
    S.Extractor (seIden "created_at") Nothing
  ]
  where
    -- Attach a @json@ type annotation to an expression.
    withJsonTypeAnn e = S.SETyAnn e $ S.TypeAnn "json"
    -- Build a table-qualified column reference from a bare column name.
    seIden = S.SEQIdentifier . S.mkQIdentifierTable table . Identifier

-- | Build the boolean expression @<table>.event_id = <eventId>@.
--
-- The column is qualified with the given table and the event id is embedded
-- as an SQL literal.
mkEventIdBoolExp :: QualifiedTable -> EventId -> S.BoolExp
mkEventIdBoolExp table eventId =
  S.BECompare
    S.SEQ
    (S.SEQIdentifier $ S.mkQIdentifierTable table $ Identifier "event_id")
    (S.SELit $ unEventId eventId)

-- | Fetch one page of scheduled event invocation logs.
--
-- The SQL is produced by 'getInvocationsQuery' from the 'GetInvocationsBy'
-- selector and the pagination settings, using the catalog's invocation log
-- tables together with 'cronEventsTable'. The database answers with a single
-- row, which 'withCount' converts into the returned 'WithTotalCount'.
getInvocationsTx ::
  GetInvocationsBy ->
  ScheduledEventPagination ->
  Q.TxE QErr (WithTotalCount [ScheduledEventInvocation])
getInvocationsTx invocationsBy pagination = do
  let eventsTables = EventTables oneOffInvocationsTable cronInvocationsTable cronEventsTable
      sql = Q.fromBuilder $ toSQL $ getInvocationsQuery eventsTables invocationsBy pagination
  -- No query arguments are needed: every value is already part of the SQL text.
  (withCount . Q.getRow) <$> Q.withQE defaultTxErrorHandler sql () True
  where
    -- Invocation logs of one-off scheduled events.
    oneOffInvocationsTable = QualifiedObject "hdb_catalog" $ TableName "hdb_scheduled_event_invocation_logs"
    -- Invocation logs of cron events.
    cronInvocationsTable = QualifiedObject "hdb_catalog" $ TableName "hdb_cron_event_invocation_logs"

-- | The tables needed to build an invocation log query; see
-- 'getInvocationsQueryNoPagination' for how each one is used.
data EventTables = EventTables
  { -- | Invocation log table queried for one-off scheduled events.
    etOneOffInvocationsTable :: QualifiedTable,
    -- | Invocation log table queried for cron events.
    etCronInvocationsTable :: QualifiedTable,
    -- | Cron events table, joined with the cron invocation log table when
    -- invocations are looked up by cron trigger name.
    etCronEventsTable :: QualifiedTable
  }

-- | Build the (unpaginated) @SELECT@ that lists invocation logs for the given
-- 'GetInvocationsBy' selector. Every variant selects the columns given by
-- 'invocationFieldExtractors' and orders by @created_at@ descending.
--
-- * 'GIBEventId': rows of the one-off or cron invocation table (chosen by the
--   event type) whose @event_id@ equals the given id.
--
-- * 'GIBEvent' 'SEOneOff': all rows of the one-off invocation table.
--
-- * 'GIBEvent' ('SECron' name): rows of the cron invocation table inner-joined
--   with the cron events table on @events.id = invocations.event_id@, keeping
--   only the events whose @trigger_name@ equals the given name.
getInvocationsQueryNoPagination :: EventTables -> GetInvocationsBy -> S.Select
getInvocationsQueryNoPagination (EventTables oneOffInvocationsTable cronInvocationsTable cronEventsTable') invocationsBy =
  case invocationsBy of
    GIBEventId eventId eventType ->
      let table = case eventType of
            OneOff -> oneOffInvocationsTable
            Cron -> cronInvocationsTable
       in S.mkSelect
            { S.selExtr = invocationFieldExtractors table,
              S.selFrom = Just $ S.mkSimpleFromExp table,
              S.selOrderBy = Just $ createdAtOrderBy table,
              S.selWhere = Just $ S.WhereFrag $ mkEventIdBoolExp table eventId
            }
    GIBEvent SEOneOff ->
      let table = oneOffInvocationsTable
       in S.mkSelect
            { S.selExtr = invocationFieldExtractors table,
              S.selFrom = Just $ S.mkSimpleFromExp table,
              S.selOrderBy = Just $ createdAtOrderBy table
            }
    GIBEvent (SECron triggerName) ->
      let invocationTable = cronInvocationsTable
          eventTable = cronEventsTable'
          -- <events>.id = <invocations>.event_id
          joinCondition =
            S.JoinOn $
              S.BECompare
                S.SEQ
                (S.SEQIdentifier $ S.mkQIdentifierTable eventTable $ Identifier "id")
                (S.SEQIdentifier $ S.mkQIdentifierTable invocationTable $ Identifier "event_id")
          joinTables =
            S.JoinExpr
              (S.FISimple invocationTable Nothing)
              S.Inner
              (S.FISimple eventTable Nothing)
              joinCondition
          -- <events>.trigger_name = '<trigger name>' (embedded as a literal)
          triggerBoolExp =
            S.BECompare
              S.SEQ
              (S.SEQIdentifier $ S.mkQIdentifierTable eventTable (Identifier "trigger_name"))
              (S.SELit $ triggerNameToTxt triggerName)
       in S.mkSelect
            { S.selExtr = invocationFieldExtractors invocationTable,
              S.selFrom = Just $ S.FromExp [S.FIJoin joinTables],
              S.selWhere = Just $ S.WhereFrag triggerBoolExp,
              S.selOrderBy = Just $ createdAtOrderBy invocationTable
            }
  where
    -- ORDER BY <table>.created_at DESC, with no explicit NULLS ordering.
    createdAtOrderBy table =
      let createdAtCol = S.SEQIdentifier $ S.mkQIdentifierTable table $ Identifier "created_at"
       in S.OrderByExp (S.OrderByItem createdAtCol (Just S.OTDesc) Nothing NE.:| [])

-- | Build the invocation log query for the given selector and apply the
-- requested pagination to it.
--
-- This is 'getInvocationsQueryNoPagination' wrapped with
-- 'mkPaginationSelectExp'.
getInvocationsQuery :: EventTables -> GetInvocationsBy -> ScheduledEventPagination -> S.Select
getInvocationsQuery ets invocationsBy pagination =
  mkPaginationSelectExp (getInvocationsQueryNoPagination ets invocationsBy) pagination