{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE QuasiQuotes #-}

-- |
-- = Scheduled Triggers
--
-- This module implements the functionality of invoking webhooks at specified
-- times, a.k.a. scheduled events. Scheduled events are either generated
-- by the graphql-engine from cron triggers, or created by the user for a
-- specified time along with the payload, webhook, headers and the retry
-- configuration. Scheduled events are modeled using rows in Postgres
-- with a @timestamp@ column.
--
-- This module implements scheduling and delivery of scheduled
-- events:
--
-- 1. Scheduling a cron event involves creating new cron events. New
-- cron events are created based on the cron schedule and the number of
-- scheduled events that are already present in the scheduled events buffer.
-- The graphql-engine computes the new scheduled events and writes them to
-- the database. (Generator)
--
-- 2. Delivering a scheduled event involves reading undelivered scheduled events
-- from the database and delivering them to the webhook server. (Processor)
--
-- The rationale behind separating the event scheduling and event delivery
-- mechanism into two different threads is that the scheduling and delivering of
-- the scheduled events are not directly dependent on each other. The generator
-- will almost always try to create scheduled events which are supposed to be
-- delivered in the future (timestamp > current_timestamp) and the processor
-- will fetch scheduled events of the past (timestamp < current_timestamp). So,
-- the set of the scheduled events generated by the generator and the processor
-- will never be the same. The point here is that they're not correlated to each
-- other. They can be split into different threads for a better performance.
--
-- == Implementation
--
-- The scheduled triggers eventing is implemented in the metadata storage.
-- All functions that interact with the storage system are abstracted in
-- the 'MonadMetadataStorage' class.
--
-- During the startup, two threads are started:
--
-- 1. Generator: Fetches the list of scheduled triggers from cache and generates
--    the scheduled events.
--
--     - Additional events will be generated only if there are fewer than 100
--       scheduled events.
--
--     - The upcoming events timestamp will be generated using:
--
--         - cron schedule of the scheduled trigger
--
--         - max timestamp of the scheduled events that already exist or
--           current_timestamp (when no scheduled events exist)
--
--         - The timestamp of the scheduled events is stored with timezone because
--           `SELECT NOW()` returns timestamp with timezone, so it's good to
--           compare two things of the same type.
--
--     This effectively corresponds to doing an INSERT with values containing
--     specific timestamp.
--
-- 2. Processor: Fetches from the database the undelivered cron events and
--    scheduled events whose timestamp is earlier than the current
--    timestamp, and then processes them.
--
-- TODO
-- - Consider and document ordering guarantees
--   - do we have any in the presence of multiple hasura instances?
-- - If we have nothing useful to say about ordering, then consider processing
--   events asynchronously, so that a slow webhook doesn't cause everything
--   subsequent to be delayed
module Hasura.Eventing.ScheduledTrigger
  ( runCronEventsGenerator,
    processScheduledTriggers,
    generateScheduleTimes,
    CronEventSeed (..),
    LockedEventsCtx (..),

    -- * Cron trigger stats logger
    createFetchedCronTriggerStatsLogger,
    closeFetchedCronTriggersStatsLogger,

    -- * Scheduled events stats logger
    createFetchedScheduledEventsStatsLogger,
    closeFetchedScheduledEventsStatsLogger,

    -- * Database interactions

    -- Following function names are similar to those present in
    -- 'MonadMetadataStorage' type class. To avoid duplication,
    -- 'Tx' is suffixed to identify as database transactions
    getDeprivedCronTriggerStatsTx,
    getScheduledEventsForDeliveryTx,
    insertInvocationTx,
    setScheduledEventOpTx,
    unlockScheduledEventsTx,
    unlockAllLockedScheduledEventsTx,
    insertCronEventsTx,
    insertOneOffScheduledEventTx,
    dropFutureCronEventsTx,
    getOneOffScheduledEventsTx,
    getCronEventsTx,
    deleteScheduledEventTx,
    getScheduledEventInvocationsTx,
    getScheduledEventsInvocationsQuery,
    getScheduledEventsInvocationsQueryNoPagination,

    -- * Exported utility functions

    -- These are useful to build SQL for fetching data from metadata storage
    mkScheduledEventStatusFilter,
    scheduledTimeOrderBy,
    executeWithOptionalTotalCount,
    mkPaginationSelectExp,
    withCount,
    invocationFieldExtractors,
    mkEventIdBoolExp,
    EventTables (..),
  )
where

import Control.Concurrent.Async.Lifted (forConcurrently_)
import Control.Concurrent.Extended (Forever (..), sleep)
import Control.Concurrent.STM
import Control.Lens (preview)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson qualified as J
import Data.Environment qualified as Env
import Data.Has
import Data.HashMap.Strict qualified as HashMap
import Data.Int (Int64)
import Data.List.NonEmpty qualified as NE
import Data.SerializableBlob qualified as SB
import Data.Set qualified as Set
import Data.Text qualified as T
import Data.Text.Extended (ToTxt (..), (<<>))
import Data.These
import Data.Time.Clock
import Data.URL.Template (printTemplate)
import Database.PG.Query qualified as PG
import Hasura.Backends.Postgres.Execute.Types
import Hasura.Backends.Postgres.SQL.DML qualified as S
import Hasura.Backends.Postgres.SQL.Types
import Hasura.Base.Error
import Hasura.Eventing.Common
import Hasura.Eventing.HTTP
import Hasura.Eventing.ScheduledTrigger.Types
import Hasura.HTTP (getHTTPExceptionStatus)
import Hasura.Logging qualified as L
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.EventTrigger (ResolveHeaderError, getHeaderInfosFromConfEither)
import Hasura.RQL.DDL.Webhook.Transform
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.Eventing
import Hasura.RQL.Types.ScheduledTrigger
import Hasura.RQL.Types.SchemaCache
import Hasura.SQL.Types
import Hasura.Server.Prometheus (ScheduledTriggerMetrics (..))
import Hasura.Tracing qualified as Tracing
import Network.HTTP.Client.Transformable qualified as HTTP
import Refined (unrefine)
import System.Metrics.Prometheus.Counter as Prometheus.Counter
import System.Timeout.Lifted (timeout)
import Text.Builder qualified as TB

-- | 'runCronEventsGenerator' makes sure that all the cron triggers
--   have an adequate buffer of cron events.
--
--   The action loops forever. On every iteration it:
--
--   1. reads the current 'SchemaCache' through the supplied @IO@ action;
--
--   2. if the cache contains at least one cron trigger, fetches the stats
--      returned by 'getDeprivedCronTriggerStats' for the cached trigger
--      names, logs them through the given stats logger, pairs every stat
--      with its 'CronTriggerInfo' and hands the pairs to
--      'insertCronEventsFor';
--
--   3. sleeps for one minute.
--
--   A 'QErr' raised during step 2 is logged as a
--   'ScheduledTriggerInternalErr' and does not stop the loop.
runCronEventsGenerator ::
  ( MonadIO m,
    MonadMetadataStorage m
  ) =>
  L.Logger L.Hasura ->
  FetchedCronTriggerStatsLogger ->
  IO SchemaCache ->
  m void
runCronEventsGenerator logger cronTriggerStatsLogger getSC = do
  forever $ do
    sc <- liftIO getSC
    -- get cron triggers from cache
    let cronTriggersCache = scCronTriggers sc

    unless (HashMap.null cronTriggersCache) $ do
      -- Poll the DB only when there's at-least one cron trigger present
      -- in the schema cache
      -- get cron trigger stats from db
      -- When shutdown is initiated, we stop generating new cron events
      eitherRes <- runExceptT $ do
        deprivedCronTriggerStats <-
          liftEitherM $ getDeprivedCronTriggerStats $ HashMap.keys cronTriggersCache
        -- Log fetched deprived cron trigger stats
        logFetchedCronTriggersStats cronTriggerStatsLogger deprivedCronTriggerStats
        -- join stats with cron triggers and produce @[(CronTriggerInfo, CronTriggerStats)]@
        cronTriggersForHydrationWithStats <-
          catMaybes
            <$> mapM (withCronTrigger cronTriggersCache) deprivedCronTriggerStats
        insertCronEventsFor cronTriggersForHydrationWithStats

      onLeft eitherRes $ L.unLogger logger . ScheduledTriggerInternalErr

    -- See discussion: https://github.com/hasura/graphql-engine-mono/issues/1001
    liftIO $ sleep (minutes 1)
  where
    -- Pair a stats row with the matching trigger from the cache. A stats row
    -- whose trigger name is absent from the cache is logged as an internal
    -- error and dropped (yields 'Nothing') instead of failing the whole batch.
    withCronTrigger cronTriggerCache cronTriggerStat = do
      case HashMap.lookup (_ctsName cronTriggerStat) cronTriggerCache of
        Nothing -> do
          L.unLogger logger
            $ ScheduledTriggerInternalErr
            $ err500 Unexpected "could not find scheduled trigger in the schema cache"
          pure Nothing
        Just cronTrigger ->
          pure
            $ Just (cronTrigger, cronTriggerStat)

-- | Generate cron events for every given trigger and persist them.
--
-- For each @(trigger, stats)@ pair the events are produced by
-- 'generateCronEventsFrom', starting from the pair's '_ctsMaxScheduledTime'.
-- The events of all triggers are concatenated and written with a single
-- call to 'insertCronEvents'; when no events were generated nothing is
-- written. A 'Left' returned by 'insertCronEvents' is rethrown as a 'QErr'
-- in @m@.
insertCronEventsFor ::
  (MonadMetadataStorage m, MonadError QErr m) =>
  [(CronTriggerInfo, CronTriggerStats)] ->
  m ()
insertCronEventsFor cronTriggersWithStats = do
  let scheduledEvents = flip concatMap cronTriggersWithStats $ \(cti, stats) ->
        generateCronEventsFrom (_ctsMaxScheduledTime stats) cti
  case scheduledEvents of
    -- skip the storage call entirely when there is nothing to insert
    [] -> pure ()
    events -> liftEitherM $ insertCronEvents events

-- | Build the cron event seeds for one cron trigger.
--
-- Asks 'generateScheduleTimes' for 100 schedule times derived from the given
-- start time and the trigger's 'ctiSchedule', and tags each of them with the
-- trigger's name ('ctiName') to form a 'CronEventSeed'.
generateCronEventsFrom :: UTCTime -> CronTriggerInfo -> [CronEventSeed]
generateCronEventsFrom startTime CronTriggerInfo {..} =
  map (CronEventSeed ctiName)
    $
    -- generate next 100 events; see getDeprivedCronTriggerStatsTx:
    generateScheduleTimes startTime 100 ctiSchedule

-- | 'upperBoundScheduledEventTimeout' is the maximum amount of time
--   (30 minutes) that processing a scheduled event is allowed to take.
--   This value is intended to be used as the cap for a timeout.
upperBoundScheduledEventTimeout :: DiffTime
upperBoundScheduledEventTimeout = minutes 30

-- | Deliver a batch of cron events that were fetched from the database.
--
-- The ids of all the given events are first recorded in the locked-events
-- 'TVar'. The events are then processed concurrently ('forConcurrently_').
-- For each event:
--
--   * the cron trigger is looked up by name in the supplied map; when it is
--     missing an internal error is logged and nothing else happens for that
--     event;
--
--   * otherwise the webhook payload is assembled and 'processScheduledEvent'
--     is run under a timeout equal to the smaller of
--     'upperBoundScheduledEventTimeout' and the trigger's
--     'strcTimeoutSeconds';
--
--   * if the timeout fires, the timeout is logged and reported through
--     'processError'; if processing returns a 'QErr', it is logged;
--
--   * finally the event id is removed from the locked-events 'TVar'.
--
-- NOTE(review): the removal from the locked-events set only happens in the
-- branch where the trigger was found; for a missing trigger the id stays in
-- the set. Confirm that this is intended.
processCronEvents ::
  ( MonadIO m,
    MonadMetadataStorage m,
    Tracing.MonadTrace m,
    MonadBaseControl IO m
  ) =>
  L.Logger L.Hasura ->
  HTTP.Manager ->
  ScheduledTriggerMetrics ->
  [CronEvent] ->
  HashMap TriggerName CronTriggerInfo ->
  TVar (Set.Set CronEventId) ->
  m ()
processCronEvents logger httpMgr scheduledTriggerMetrics cronEvents cronTriggersInfo lockedCronEvents = do
  -- save the locked cron events that have been fetched from the
  -- database, the events stored here will be unlocked in case a
  -- graceful shutdown is initiated in midst of processing these events
  saveLockedEvents (map _ceId cronEvents) lockedCronEvents
  -- The `createdAt` of a cron event is the `created_at` of the cron trigger
  forConcurrently_ cronEvents $ \(CronEvent id' name st _ tries _ _) -> do
    case HashMap.lookup name cronTriggersInfo of
      Nothing ->
        logInternalError
          $ err500 Unexpected
          $ "could not find cron trigger "
          <> name
          <<> " in the schema cache"
      Just CronTriggerInfo {..} -> do
        let payload =
              ScheduledEventWebhookPayload
                id'
                (Just name)
                st
                (fromMaybe J.Null ctiPayload)
                ctiComment
                Nothing
                ctiRequestTransform
                ctiResponseTransform
            retryCtx = RetryContext tries ctiRetryConf
            -- never wait longer than the global upper bound, even when the
            -- trigger's retry configuration asks for a longer timeout
            eventProcessingTimeout =
              min upperBoundScheduledEventTimeout (unrefine $ strcTimeoutSeconds $ ctiRetryConf)
            processScheduledEventAction =
              runExceptT
                $ flip runReaderT (logger, httpMgr)
                $ processScheduledEvent
                  scheduledTriggerMetrics
                  id'
                  ctiHeaders
                  retryCtx
                  payload
                  ctiWebhookInfo
                  Cron
        eventProcessedMaybe <-
          timeout (fromInteger (diffTimeToMicroSeconds eventProcessingTimeout))
            $ processScheduledEventAction
        case eventProcessedMaybe of
          -- the timeout fired before processing finished
          Nothing -> do
            let eventTimeoutMessage = "Cron Scheduled event " <> id' <<> " of cron trigger " <> name <<> " timed out while processing."
                eventTimeoutError = err500 TimeoutErrorCode eventTimeoutMessage
            logInternalError eventTimeoutError
            runExceptT (processError id' retryCtx [] Cron (mkErrorObject eventTimeoutMessage) (HOther $ T.unpack eventTimeoutMessage) scheduledTriggerMetrics)
              >>= (`onLeft` logInternalError)
          -- processing finished in time; log the error, if any
          Just finally -> onLeft finally logInternalError
        removeEventFromLockedEvents id' lockedCronEvents
  where
    -- Log a 'QErr' as a 'ScheduledTriggerInternalErr' through the given logger.
    logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err

    -- Wrap an error message in a JSON object under the @"error"@ key.
    mkErrorObject :: Text -> J.Value
    mkErrorObject errorMessage =
      J.object $ ["error" J..= errorMessage]

processOneOffScheduledEvents ::
  ( MonadIO m,
    Tracing.MonadTrace m,
    MonadMetadataStorage m,
    MonadBaseControl IO m
  ) =>
  Env.Environment ->
  L.Logger L.Hasura ->
  HTTP.Manager ->
  ScheduledTriggerMetrics ->
  [OneOffScheduledEvent] ->
  TVar (Set.Set OneOffScheduledEventId) ->
  m ()
processOneOffScheduledEvents :: forall (m :: * -> *).
(MonadIO m, MonadTrace m, MonadMetadataStorage m,
 MonadBaseControl IO m) =>
Environment
-> Logger Hasura
-> Manager
-> ScheduledTriggerMetrics
-> [OneOffScheduledEvent]
-> TVar (Set ScheduledEventId)
-> m ()
processOneOffScheduledEvents
  Environment
env
  Logger Hasura
logger
  Manager
httpMgr
  ScheduledTriggerMetrics
scheduledTriggerMetrics
  [OneOffScheduledEvent]
oneOffEvents
  TVar (Set ScheduledEventId)
lockedOneOffScheduledEvents = do
    -- save the locked one-off events that have been fetched from the
    -- database, the events stored here will be unlocked in case a
    -- graceful shutdown is initiated in midst of processing these events
    [ScheduledEventId] -> TVar (Set ScheduledEventId) -> m ()
forall (m :: * -> *).
MonadIO m =>
[ScheduledEventId] -> TVar (Set ScheduledEventId) -> m ()
saveLockedEvents ((OneOffScheduledEvent -> ScheduledEventId)
-> [OneOffScheduledEvent] -> [ScheduledEventId]
forall a b. (a -> b) -> [a] -> [b]
map OneOffScheduledEvent -> ScheduledEventId
_ooseId [OneOffScheduledEvent]
oneOffEvents) TVar (Set ScheduledEventId)
lockedOneOffScheduledEvents
    [OneOffScheduledEvent] -> (OneOffScheduledEvent -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, MonadBaseControl IO m) =>
t a -> (a -> m b) -> m ()
forConcurrently_ [OneOffScheduledEvent]
oneOffEvents ((OneOffScheduledEvent -> m ()) -> m ())
-> (OneOffScheduledEvent -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \OneOffScheduledEvent {Int
[HeaderConf]
Maybe Text
Maybe Value
Maybe UTCTime
Maybe RequestTransform
Maybe MetadataResponseTransform
Text
UTCTime
ScheduledEventId
InputWebhook
STRetryConf
_ooseId :: OneOffScheduledEvent -> ScheduledEventId
_ooseId :: ScheduledEventId
_ooseWebhookConf :: InputWebhook
_ooseScheduledTime :: UTCTime
_ooseRetryConf :: STRetryConf
_oosePayload :: Maybe Value
_ooseHeaderConf :: [HeaderConf]
_ooseStatus :: Text
_ooseTries :: Int
_ooseCreatedAt :: UTCTime
_ooseNextRetryAt :: Maybe UTCTime
_ooseComment :: Maybe Text
_ooseRequestTransform :: Maybe RequestTransform
_ooseResponseTransform :: Maybe MetadataResponseTransform
_ooseWebhookConf :: OneOffScheduledEvent -> InputWebhook
_ooseScheduledTime :: OneOffScheduledEvent -> UTCTime
_ooseRetryConf :: OneOffScheduledEvent -> STRetryConf
_oosePayload :: OneOffScheduledEvent -> Maybe Value
_ooseHeaderConf :: OneOffScheduledEvent -> [HeaderConf]
_ooseStatus :: OneOffScheduledEvent -> Text
_ooseTries :: OneOffScheduledEvent -> Int
_ooseCreatedAt :: OneOffScheduledEvent -> UTCTime
_ooseNextRetryAt :: OneOffScheduledEvent -> Maybe UTCTime
_ooseComment :: OneOffScheduledEvent -> Maybe Text
_ooseRequestTransform :: OneOffScheduledEvent -> Maybe RequestTransform
_ooseResponseTransform :: OneOffScheduledEvent -> Maybe MetadataResponseTransform
..} -> do
      ((QErr -> m ()) -> (() -> m ()) -> Either QErr () -> m ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either QErr -> m ()
logInternalError () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure) (Either QErr () -> m ()) -> m (Either QErr ()) -> m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< ExceptT QErr m () -> m (Either QErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT do
        let payload :: ScheduledEventWebhookPayload
payload =
              ScheduledEventId
-> Maybe TriggerName
-> UTCTime
-> Value
-> Maybe Text
-> Maybe UTCTime
-> Maybe RequestTransform
-> Maybe MetadataResponseTransform
-> ScheduledEventWebhookPayload
ScheduledEventWebhookPayload
                ScheduledEventId
_ooseId
                Maybe TriggerName
forall a. Maybe a
Nothing
                UTCTime
_ooseScheduledTime
                (Value -> Maybe Value -> Value
forall a. a -> Maybe a -> a
fromMaybe Value
J.Null Maybe Value
_oosePayload)
                Maybe Text
_ooseComment
                (UTCTime -> Maybe UTCTime
forall a. a -> Maybe a
Just UTCTime
_ooseCreatedAt)
                Maybe RequestTransform
_ooseRequestTransform
                Maybe MetadataResponseTransform
_ooseResponseTransform
            retryCtx :: RetryContext
retryCtx = Int -> STRetryConf -> RetryContext
RetryContext Int
_ooseTries STRetryConf
_ooseRetryConf
            resolvedWebhookInfoEither :: Either ResolveWebhookError ResolvedWebhook
resolvedWebhookInfoEither = Environment
-> InputWebhook -> Either ResolveWebhookError ResolvedWebhook
resolveWebhookEither Environment
env InputWebhook
_ooseWebhookConf
            resolvedHeaderInfoEither :: Either ResolveHeaderError [EventHeaderInfo]
resolvedHeaderInfoEither = Environment
-> [HeaderConf] -> Either ResolveHeaderError [EventHeaderInfo]
getHeaderInfosFromConfEither Environment
env [HeaderConf]
_ooseHeaderConf
            -- `webhookAndHeaderInfo` returns webhook and header info (and errors)
            webhookAndHeaderInfo :: Either
  (These ResolveWebhookError ResolveHeaderError)
  (EnvRecord ResolvedWebhook, [EventHeaderInfo])
webhookAndHeaderInfo = case (Either ResolveWebhookError ResolvedWebhook
resolvedWebhookInfoEither, Either ResolveHeaderError [EventHeaderInfo]
resolvedHeaderInfoEither) of
              (Right ResolvedWebhook
resolvedEventWebhookInfo, Right [EventHeaderInfo]
resolvedEventHeaderInfo) -> do
                let resolvedWebhookEnvRecord :: EnvRecord ResolvedWebhook
resolvedWebhookEnvRecord = Text -> ResolvedWebhook -> EnvRecord ResolvedWebhook
forall a. Text -> a -> EnvRecord a
EnvRecord (InputWebhook -> Text
getTemplateFromUrl InputWebhook
_ooseWebhookConf) ResolvedWebhook
resolvedEventWebhookInfo
                (EnvRecord ResolvedWebhook, [EventHeaderInfo])
-> Either
     (These ResolveWebhookError ResolveHeaderError)
     (EnvRecord ResolvedWebhook, [EventHeaderInfo])
forall a b. b -> Either a b
Right (EnvRecord ResolvedWebhook
resolvedWebhookEnvRecord, [EventHeaderInfo]
resolvedEventHeaderInfo)
              (Left ResolveWebhookError
eventWebhookErrorVars, Right [EventHeaderInfo]
_) -> These ResolveWebhookError ResolveHeaderError
-> Either
     (These ResolveWebhookError ResolveHeaderError)
     (EnvRecord ResolvedWebhook, [EventHeaderInfo])
forall a b. a -> Either a b
Left (These ResolveWebhookError ResolveHeaderError
 -> Either
      (These ResolveWebhookError ResolveHeaderError)
      (EnvRecord ResolvedWebhook, [EventHeaderInfo]))
-> These ResolveWebhookError ResolveHeaderError
-> Either
     (These ResolveWebhookError ResolveHeaderError)
     (EnvRecord ResolvedWebhook, [EventHeaderInfo])
forall a b. (a -> b) -> a -> b
$ ResolveWebhookError -> These ResolveWebhookError ResolveHeaderError
forall a b. a -> These a b
This ResolveWebhookError
eventWebhookErrorVars
              (Right ResolvedWebhook
_, Left ResolveHeaderError
eventHeaderErrorVars) -> These ResolveWebhookError ResolveHeaderError
-> Either
     (These ResolveWebhookError ResolveHeaderError)
     (EnvRecord ResolvedWebhook, [EventHeaderInfo])
forall a b. a -> Either a b
Left (These ResolveWebhookError ResolveHeaderError
 -> Either
      (These ResolveWebhookError ResolveHeaderError)
      (EnvRecord ResolvedWebhook, [EventHeaderInfo]))
-> These ResolveWebhookError ResolveHeaderError
-> Either
     (These ResolveWebhookError ResolveHeaderError)
     (EnvRecord ResolvedWebhook, [EventHeaderInfo])
forall a b. (a -> b) -> a -> b
$ ResolveHeaderError -> These ResolveWebhookError ResolveHeaderError
forall a b. b -> These a b
That ResolveHeaderError
eventHeaderErrorVars
              (Left ResolveWebhookError
eventWebhookErrors, Left ResolveHeaderError
eventHeaderErrorVars) -> These ResolveWebhookError ResolveHeaderError
-> Either
     (These ResolveWebhookError ResolveHeaderError)
     (EnvRecord ResolvedWebhook, [EventHeaderInfo])
forall a b. a -> Either a b
Left (These ResolveWebhookError ResolveHeaderError
 -> Either
      (These ResolveWebhookError ResolveHeaderError)
      (EnvRecord ResolvedWebhook, [EventHeaderInfo]))
-> These ResolveWebhookError ResolveHeaderError
-> Either
     (These ResolveWebhookError ResolveHeaderError)
     (EnvRecord ResolvedWebhook, [EventHeaderInfo])
forall a b. (a -> b) -> a -> b
$ ResolveWebhookError
-> ResolveHeaderError
-> These ResolveWebhookError ResolveHeaderError
forall a b. a -> b -> These a b
These ResolveWebhookError
eventWebhookErrors ResolveHeaderError
eventHeaderErrorVars
        case Either
  (These ResolveWebhookError ResolveHeaderError)
  (EnvRecord ResolvedWebhook, [EventHeaderInfo])
webhookAndHeaderInfo of
          Right (EnvRecord ResolvedWebhook
webhookEnvRecord, [EventHeaderInfo]
eventHeaderInfo) -> do
            let processScheduledEventAction :: ExceptT QErr m ()
processScheduledEventAction =
                  (ReaderT (Logger Hasura, Manager) (ExceptT QErr m) ()
 -> (Logger Hasura, Manager) -> ExceptT QErr m ())
-> (Logger Hasura, Manager)
-> ReaderT (Logger Hasura, Manager) (ExceptT QErr m) ()
-> ExceptT QErr m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ReaderT (Logger Hasura, Manager) (ExceptT QErr m) ()
-> (Logger Hasura, Manager) -> ExceptT QErr m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (Logger Hasura
logger, Manager
httpMgr)
                    (ReaderT (Logger Hasura, Manager) (ExceptT QErr m) ()
 -> ExceptT QErr m ())
-> ReaderT (Logger Hasura, Manager) (ExceptT QErr m) ()
-> ExceptT QErr m ()
forall a b. (a -> b) -> a -> b
$ ScheduledTriggerMetrics
-> ScheduledEventId
-> [EventHeaderInfo]
-> RetryContext
-> ScheduledEventWebhookPayload
-> EnvRecord ResolvedWebhook
-> ScheduledEventType
-> ReaderT (Logger Hasura, Manager) (ExceptT QErr m) ()
forall r (m :: * -> *).
(MonadReader r m, Has Manager r, Has (Logger Hasura) r, MonadIO m,
 MonadTrace m, MonadMetadataStorage m, MonadError QErr m) =>
ScheduledTriggerMetrics
-> ScheduledEventId
-> [EventHeaderInfo]
-> RetryContext
-> ScheduledEventWebhookPayload
-> EnvRecord ResolvedWebhook
-> ScheduledEventType
-> m ()
processScheduledEvent ScheduledTriggerMetrics
scheduledTriggerMetrics ScheduledEventId
_ooseId [EventHeaderInfo]
eventHeaderInfo RetryContext
retryCtx ScheduledEventWebhookPayload
payload EnvRecord ResolvedWebhook
webhookEnvRecord ScheduledEventType
OneOff

                eventTimeout :: DiffTime
eventTimeout = Refined NonNegative DiffTime -> DiffTime
forall {k} (p :: k) x. Refined p x -> x
unrefine (Refined NonNegative DiffTime -> DiffTime)
-> Refined NonNegative DiffTime -> DiffTime
forall a b. (a -> b) -> a -> b
$ STRetryConf -> Refined NonNegative DiffTime
strcTimeoutSeconds (STRetryConf -> Refined NonNegative DiffTime)
-> STRetryConf -> Refined NonNegative DiffTime
forall a b. (a -> b) -> a -> b
$ STRetryConf
_ooseRetryConf

            -- Try to process the event with a timeout of min(`upperBoundScheduledEventTimeout`, event's response timeout),
            -- so that we're never blocked forever while processing a single event.
            --
            -- If the request times out, then process it as an erroneous invocation and move on.
            Int -> ExceptT QErr m () -> ExceptT QErr m (Maybe ())
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Int -> m a -> m (Maybe a)
timeout (Integer -> Int
forall a. Num a => Integer -> a
fromInteger (DiffTime -> Integer
diffTimeToMicroSeconds (DiffTime -> DiffTime -> DiffTime
forall a. Ord a => a -> a -> a
min DiffTime
upperBoundScheduledEventTimeout DiffTime
eventTimeout))) ExceptT QErr m ()
processScheduledEventAction
              ExceptT QErr m (Maybe ()) -> ExceptT QErr m () -> ExceptT QErr m ()
forall (m :: * -> *) a. Monad m => m (Maybe a) -> m a -> m a
`onNothingM` ( do
                               let eventTimeoutMessage :: Text
eventTimeoutMessage = Text
"One-off Scheduled event " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> ScheduledEventId
_ooseId ScheduledEventId -> Text -> Text
forall t. ToTxt t => t -> Text -> Text
<<> Text
" timed out while processing."
                                   eventTimeoutError :: QErr
eventTimeoutError = Code -> Text -> QErr
err500 Code
TimeoutErrorCode Text
eventTimeoutMessage
                               m () -> ExceptT QErr m ()
forall (m :: * -> *) a. Monad m => m a -> ExceptT QErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> ExceptT QErr m ()) -> m () -> ExceptT QErr m ()
forall a b. (a -> b) -> a -> b
$ QErr -> m ()
logInternalError QErr
eventTimeoutError
                               ScheduledEventId
-> RetryContext
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPErr Any
-> ScheduledTriggerMetrics
-> ExceptT QErr m ()
forall (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, MonadMetadataStorage m, MonadError QErr m) =>
ScheduledEventId
-> RetryContext
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPErr a
-> ScheduledTriggerMetrics
-> m ()
processError ScheduledEventId
_ooseId RetryContext
retryCtx [] ScheduledEventType
OneOff (Text -> Value
mkErrorObject Text
eventTimeoutMessage) (String -> HTTPErr Any
forall (a :: TriggerTypes). String -> HTTPErr a
HOther (String -> HTTPErr Any) -> String -> HTTPErr Any
forall a b. (a -> b) -> a -> b
$ Text -> String
T.unpack Text
eventTimeoutMessage) ScheduledTriggerMetrics
scheduledTriggerMetrics
                           )
            ScheduledEventId
-> TVar (Set ScheduledEventId) -> ExceptT QErr m ()
forall (m :: * -> *).
MonadIO m =>
ScheduledEventId -> TVar (Set ScheduledEventId) -> m ()
removeEventFromLockedEvents ScheduledEventId
_ooseId TVar (Set ScheduledEventId)
lockedOneOffScheduledEvents
          Left These ResolveWebhookError ResolveHeaderError
envVarError ->
            ScheduledEventId
-> RetryContext
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPErr Any
-> ScheduledTriggerMetrics
-> ExceptT QErr m ()
forall (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, MonadMetadataStorage m, MonadError QErr m) =>
ScheduledEventId
-> RetryContext
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPErr a
-> ScheduledTriggerMetrics
-> m ()
processError
              ScheduledEventId
_ooseId
              RetryContext
retryCtx
              []
              ScheduledEventType
OneOff
              (Text -> Value
mkErrorObject (Text -> Value) -> Text -> Value
forall a b. (a -> b) -> a -> b
$ Text
"Error creating the request. " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (These ResolveWebhookError ResolveHeaderError -> Text
mkInvalidEnvVarErrMsg (These ResolveWebhookError ResolveHeaderError -> Text)
-> These ResolveWebhookError ResolveHeaderError -> Text
forall a b. (a -> b) -> a -> b
$ These ResolveWebhookError ResolveHeaderError
envVarError))
              (String -> HTTPErr Any
forall (a :: TriggerTypes). String -> HTTPErr a
HOther (String -> HTTPErr Any) -> String -> HTTPErr Any
forall a b. (a -> b) -> a -> b
$ Text -> String
T.unpack (Text -> String) -> Text -> String
forall a b. (a -> b) -> a -> b
$ QErr -> Text
qeError (Code -> Text -> QErr
err400 Code
NotFound (These ResolveWebhookError ResolveHeaderError -> Text
mkInvalidEnvVarErrMsg These ResolveWebhookError ResolveHeaderError
envVarError)))
              ScheduledTriggerMetrics
scheduledTriggerMetrics
    where
      logInternalError :: QErr -> m ()
logInternalError QErr
err = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ())
-> (ScheduledTriggerInternalErr -> IO ())
-> ScheduledTriggerInternalErr
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Logger Hasura
-> forall a (m :: * -> *).
   (ToEngineLog a Hasura, MonadIO m) =>
   a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
   (ToEngineLog a impl, MonadIO m) =>
   a -> m ()
L.unLogger Logger Hasura
logger (ScheduledTriggerInternalErr -> m ())
-> ScheduledTriggerInternalErr -> m ()
forall a b. (a -> b) -> a -> b
$ QErr -> ScheduledTriggerInternalErr
ScheduledTriggerInternalErr QErr
err
      getTemplateFromUrl :: InputWebhook -> Text
getTemplateFromUrl InputWebhook
url = Template -> Text
printTemplate (Template -> Text) -> Template -> Text
forall a b. (a -> b) -> a -> b
$ InputWebhook -> Template
unInputWebhook InputWebhook
url
      mkInvalidEnvVarErrMsg :: These ResolveWebhookError ResolveHeaderError -> Text
mkInvalidEnvVarErrMsg These ResolveWebhookError ResolveHeaderError
envVarErrorValues = Text
"The value for environment variables not found: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (These ResolveWebhookError ResolveHeaderError -> Text
getInvalidEnvVarText These ResolveWebhookError ResolveHeaderError
envVarErrorValues)
      mkErrorObject :: Text -> J.Value
      mkErrorObject :: Text -> Value
mkErrorObject Text
errorMessage =
        [Pair] -> Value
J.object ([Pair] -> Value) -> [Pair] -> Value
forall a b. (a -> b) -> a -> b
$ [Key
"error" Key -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
forall v. ToJSON v => Key -> v -> Pair
J..= Text
errorMessage]
      getInvalidEnvVarText :: These ResolveWebhookError ResolveHeaderError -> Text
      getInvalidEnvVarText :: These ResolveWebhookError ResolveHeaderError -> Text
getInvalidEnvVarText (This ResolveWebhookError
a) = ResolveWebhookError -> Text
forall a. ToTxt a => a -> Text
toTxt ResolveWebhookError
a
      getInvalidEnvVarText (That ResolveHeaderError
b) = ResolveHeaderError -> Text
forall a. ToTxt a => a -> Text
toTxt ResolveHeaderError
b
      getInvalidEnvVarText (These ResolveWebhookError
a ResolveHeaderError
b) = ResolveWebhookError -> Text
forall a. ToTxt a => a -> Text
toTxt ResolveWebhookError
a Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
", " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> ResolveHeaderError -> Text
forall a. ToTxt a => a -> Text
toTxt ResolveHeaderError
b

-- | Build the processor loop for scheduled events.
--
-- The result is a 'Forever' loop (with a unit state) whose every iteration:
--
-- 1. reads the cron triggers from the schema cache returned by @getSC@ and
--    the environment returned by @getEnvHook@, so each iteration works with
--    freshly read values;
--
-- 2. calls 'getScheduledEventsForDelivery' with the names of those cron
--    triggers;
--
-- 3. on failure, logs the error as a 'ScheduledTriggerInternalErr'; on
--    success, logs how many cron and one-off events were fetched and then
--    hands them to 'processCronEvents' and 'processOneOffScheduledEvents',
--    in that order, together with the matching locked-events 'TVar' taken
--    from the 'LockedEventsCtx';
--
-- 4. sleeps for 10 seconds, regardless of whether the fetch succeeded.
processScheduledTriggers ::
  ( MonadIO m,
    Tracing.MonadTrace m,
    MonadMetadataStorage m,
    MonadBaseControl IO m
  ) =>
  -- | action yielding the current environment
  IO Env.Environment ->
  L.Logger L.Hasura ->
  FetchedScheduledEventsStatsLogger ->
  HTTP.Manager ->
  ScheduledTriggerMetrics ->
  -- | action yielding the current schema cache
  IO SchemaCache ->
  LockedEventsCtx ->
  m (Forever m)
processScheduledTriggers getEnvHook logger statsLogger httpMgr scheduledTriggerMetrics getSC LockedEventsCtx {leCronEvents, leOneOffEvents} =
  pure
    $ Forever ()
    $ const do
      cronTriggersInfo <- scCronTriggers <$> liftIO getSC
      env <- liftIO getEnvHook
      getScheduledEventsForDelivery (HashMap.keys cronTriggersInfo) >>= \case
        Left e -> logInternalError e
        Right (cronEvents, oneOffEvents) -> do
          logFetchedScheduledEventsStats
            statsLogger
            (CronEventsCount $ length cronEvents)
            (OneOffScheduledEventsCount $ length oneOffEvents)
          processCronEvents logger httpMgr scheduledTriggerMetrics cronEvents cronTriggersInfo leCronEvents
          processOneOffScheduledEvents env logger httpMgr scheduledTriggerMetrics oneOffEvents leOneOffEvents
      -- NOTE: cron events are scheduled at times with minute resolution (as on
      -- unix), while one-off events can be set for arbitrary times. The sleep
      -- time here determines how overdue a scheduled event (cron or one-off)
      -- might be before we begin processing:
      liftIO $ sleep (seconds 10)
  where
    -- Report a failure of the fetch step through the engine logger. No local
    -- type signature is given: it would have to mention the outer @m@, which
    -- the top-level signature does not bring into scope.
    logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err

processScheduledEvent ::
  ( MonadReader r m,
    Has HTTP.Manager r,
    Has (L.Logger L.Hasura) r,
    MonadIO m,
    Tracing.MonadTrace m,
    MonadMetadataStorage m,
    MonadError QErr m
  ) =>
  ScheduledTriggerMetrics ->
  ScheduledEventId ->
  [EventHeaderInfo] ->
  RetryContext ->
  ScheduledEventWebhookPayload ->
  EnvRecord ResolvedWebhook ->
  ScheduledEventType ->
  m ()
processScheduledEvent :: forall r (m :: * -> *).
(MonadReader r m, Has Manager r, Has (Logger Hasura) r, MonadIO m,
 MonadTrace m, MonadMetadataStorage m, MonadError QErr m) =>
ScheduledTriggerMetrics
-> ScheduledEventId
-> [EventHeaderInfo]
-> RetryContext
-> ScheduledEventWebhookPayload
-> EnvRecord ResolvedWebhook
-> ScheduledEventType
-> m ()
processScheduledEvent ScheduledTriggerMetrics
scheduledTriggerMetrics ScheduledEventId
eventId [EventHeaderInfo]
eventHeaders RetryContext
retryCtx ScheduledEventWebhookPayload
payload EnvRecord ResolvedWebhook
webhookUrl ScheduledEventType
type' =
  SamplingPolicy -> Text -> m () -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadTrace m) =>
SamplingPolicy -> Text -> m a -> m a
Tracing.newTrace SamplingPolicy
Tracing.sampleAlways Text
traceNote do
    UTCTime
currentTime <- IO UTCTime -> m UTCTime
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
    let retryConf :: STRetryConf
retryConf = RetryContext -> STRetryConf
_rctxConf RetryContext
retryCtx
        scheduledTime :: UTCTime
scheduledTime = ScheduledEventWebhookPayload -> UTCTime
sewpScheduledTime ScheduledEventWebhookPayload
payload
    if NominalDiffTime -> DiffTime
forall x y. (Duration x, Duration y) => x -> y
convertDuration (UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
currentTime UTCTime
scheduledTime)
      DiffTime -> DiffTime -> Bool
forall a. Ord a => a -> a -> Bool
> Refined NonNegative DiffTime -> DiffTime
forall {k} (p :: k) x. Refined p x -> x
unrefine (STRetryConf -> Refined NonNegative DiffTime
strcToleranceSeconds STRetryConf
retryConf)
      then ScheduledEventId -> ScheduledEventType -> m ()
forall (m :: * -> *).
(MonadMetadataStorage m, MonadError QErr m) =>
ScheduledEventId -> ScheduledEventType -> m ()
processDead ScheduledEventId
eventId ScheduledEventType
type'
      else do
        let timeoutSeconds :: Int
timeoutSeconds = DiffTime -> Int
forall b. Integral b => DiffTime -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (DiffTime -> Int) -> DiffTime -> Int
forall a b. (a -> b) -> a -> b
$ Refined NonNegative DiffTime -> DiffTime
forall {k} (p :: k) x. Refined p x -> x
unrefine (STRetryConf -> Refined NonNegative DiffTime
strcTimeoutSeconds STRetryConf
retryConf)
            httpTimeout :: ResponseTimeout
httpTimeout = Int -> ResponseTimeout
HTTP.responseTimeoutMicro (Int
timeoutSeconds Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000000)
            ([Header]
headers, [HeaderConf]
decodedHeaders) = [EventHeaderInfo] -> ([Header], [HeaderConf])
prepareHeaders [EventHeaderInfo]
eventHeaders
            extraLogCtx :: ExtraLogContext
extraLogCtx = ScheduledEventId -> Maybe TriggerName -> ExtraLogContext
ExtraLogContext ScheduledEventId
eventId (ScheduledEventWebhookPayload -> Maybe TriggerName
sewpName ScheduledEventWebhookPayload
payload)
            webhookReqBodyJson :: Value
webhookReqBodyJson = ScheduledEventWebhookPayload -> Value
forall a. ToJSON a => a -> Value
J.toJSON ScheduledEventWebhookPayload
payload
            webhookReqBody :: ByteString
webhookReqBody = Value -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode Value
webhookReqBodyJson
            requestTransform :: Maybe RequestTransform
requestTransform = ScheduledEventWebhookPayload -> Maybe RequestTransform
sewpRequestTransform ScheduledEventWebhookPayload
payload
            responseTransform :: Maybe ResponseTransform
responseTransform = MetadataResponseTransform -> ResponseTransform
mkResponseTransform (MetadataResponseTransform -> ResponseTransform)
-> Maybe MetadataResponseTransform -> Maybe ResponseTransform
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ScheduledEventWebhookPayload -> Maybe MetadataResponseTransform
sewpResponseTransform ScheduledEventWebhookPayload
payload

        Either
  (TransformableRequestError 'ScheduledType)
  (Request, HTTPResp 'ScheduledType)
eitherReqRes <-
          ExceptT
  (TransformableRequestError 'ScheduledType)
  m
  (Request, HTTPResp 'ScheduledType)
-> m (Either
        (TransformableRequestError 'ScheduledType)
        (Request, HTTPResp 'ScheduledType))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT
            (ExceptT
   (TransformableRequestError 'ScheduledType)
   m
   (Request, HTTPResp 'ScheduledType)
 -> m (Either
         (TransformableRequestError 'ScheduledType)
         (Request, HTTPResp 'ScheduledType)))
-> ExceptT
     (TransformableRequestError 'ScheduledType)
     m
     (Request, HTTPResp 'ScheduledType)
-> m (Either
        (TransformableRequestError 'ScheduledType)
        (Request, HTTPResp 'ScheduledType))
forall a b. (a -> b) -> a -> b
$ [Header]
-> ResponseTimeout
-> ByteString
-> Maybe RequestTransform
-> ResolvedWebhook
-> ExceptT
     (TransformableRequestError 'ScheduledType) m RequestDetails
forall (a :: TriggerTypes) (m :: * -> *).
MonadError (TransformableRequestError a) m =>
[Header]
-> ResponseTimeout
-> ByteString
-> Maybe RequestTransform
-> ResolvedWebhook
-> m RequestDetails
mkRequest [Header]
headers ResponseTimeout
httpTimeout ByteString
webhookReqBody Maybe RequestTransform
requestTransform (EnvRecord ResolvedWebhook -> ResolvedWebhook
forall a. EnvRecord a -> a
_envVarValue EnvRecord ResolvedWebhook
webhookUrl)
            ExceptT (TransformableRequestError 'ScheduledType) m RequestDetails
-> (RequestDetails
    -> ExceptT
         (TransformableRequestError 'ScheduledType)
         m
         (Request, HTTPResp 'ScheduledType))
-> ExceptT
     (TransformableRequestError 'ScheduledType)
     m
     (Request, HTTPResp 'ScheduledType)
forall a b.
ExceptT (TransformableRequestError 'ScheduledType) m a
-> (a -> ExceptT (TransformableRequestError 'ScheduledType) m b)
-> ExceptT (TransformableRequestError 'ScheduledType) m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \RequestDetails
reqDetails -> do
              let request :: Request
request = RequestDetails -> Request
extractRequest RequestDetails
reqDetails
                  logger :: Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
-> RequestDetails
-> ExceptT (TransformableRequestError 'ScheduledType) m ()
logger Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
e RequestDetails
d = do
                    Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
-> ExtraLogContext
-> RequestDetails
-> Text
-> [HeaderConf]
-> ExceptT (TransformableRequestError 'ScheduledType) m ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
-> ExtraLogContext
-> RequestDetails
-> Text
-> [HeaderConf]
-> m ()
logHTTPForST Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
e ExtraLogContext
extraLogCtx RequestDetails
d (EnvRecord ResolvedWebhook -> Text
forall a. EnvRecord a -> Text
_envVarName EnvRecord ResolvedWebhook
webhookUrl) [HeaderConf]
decodedHeaders
                    IO () -> ExceptT (TransformableRequestError 'ScheduledType) m ()
forall a.
IO a -> ExceptT (TransformableRequestError 'ScheduledType) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT (TransformableRequestError 'ScheduledType) m ())
-> IO () -> ExceptT (TransformableRequestError 'ScheduledType) m ()
forall a b. (a -> b) -> a -> b
$ do
                      case Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
e of
                        Left HTTPErr 'ScheduledType
_err -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                        Right HTTPResp 'ScheduledType
response ->
                          Counter -> Int64 -> IO ()
Prometheus.Counter.add
                            (ScheduledTriggerMetrics -> Counter
stmScheduledTriggerBytesReceived ScheduledTriggerMetrics
scheduledTriggerMetrics)
                            (HTTPResp 'ScheduledType -> Int64
forall (a :: TriggerTypes). HTTPResp a -> Int64
hrsSize HTTPResp 'ScheduledType
response)
                      let RequestDetails {Int64
_rdOriginalSize :: Int64
_rdOriginalSize :: RequestDetails -> Int64
_rdOriginalSize, Maybe Int64
_rdTransformedSize :: Maybe Int64
_rdTransformedSize :: RequestDetails -> Maybe Int64
_rdTransformedSize} = RequestDetails
d
                       in Counter -> Int64 -> IO ()
Prometheus.Counter.add
                            (ScheduledTriggerMetrics -> Counter
stmScheduledTriggerBytesSent ScheduledTriggerMetrics
scheduledTriggerMetrics)
                            (Int64 -> Maybe Int64 -> Int64
forall a. a -> Maybe a -> a
fromMaybe Int64
_rdOriginalSize Maybe Int64
_rdTransformedSize)
                      case (ScheduledEventType
type', Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
e) of
                        (ScheduledEventType
Cron, Left HTTPErr 'ScheduledType
_err) -> Counter -> IO ()
Prometheus.Counter.inc (ScheduledTriggerMetrics -> Counter
stmCronEventsInvocationTotalFailure ScheduledTriggerMetrics
scheduledTriggerMetrics)
                        (ScheduledEventType
Cron, Right HTTPResp 'ScheduledType
_) -> Counter -> IO ()
Prometheus.Counter.inc (ScheduledTriggerMetrics -> Counter
stmCronEventsInvocationTotalSuccess ScheduledTriggerMetrics
scheduledTriggerMetrics)
                        (ScheduledEventType
OneOff, Left HTTPErr 'ScheduledType
_err) -> Counter -> IO ()
Prometheus.Counter.inc (ScheduledTriggerMetrics -> Counter
stmOneOffEventsInvocationTotalFailure ScheduledTriggerMetrics
scheduledTriggerMetrics)
                        (ScheduledEventType
OneOff, Right HTTPResp 'ScheduledType
_) -> Counter -> IO ()
Prometheus.Counter.inc (ScheduledTriggerMetrics -> Counter
stmOneOffEventsInvocationTotalSuccess ScheduledTriggerMetrics
scheduledTriggerMetrics)
                  sessionVars :: Maybe SessionVariables
sessionVars = RequestDetails -> Maybe SessionVariables
_rdSessionVars RequestDetails
reqDetails
              HTTPResp 'ScheduledType
resp <- RequestDetails
-> Maybe ResponseTransform
-> Maybe SessionVariables
-> (Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
    -> RequestDetails
    -> ExceptT (TransformableRequestError 'ScheduledType) m ())
-> ExceptT
     (TransformableRequestError 'ScheduledType)
     m
     (HTTPResp 'ScheduledType)
forall r (m :: * -> *) (a :: TriggerTypes).
(MonadReader r m, MonadError (TransformableRequestError a) m,
 Has Manager r, Has (Logger Hasura) r, MonadIO m, MonadTrace m) =>
RequestDetails
-> Maybe ResponseTransform
-> Maybe SessionVariables
-> (Either (HTTPErr a) (HTTPResp a) -> RequestDetails -> m ())
-> m (HTTPResp a)
invokeRequest RequestDetails
reqDetails Maybe ResponseTransform
responseTransform Maybe SessionVariables
sessionVars Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
-> RequestDetails
-> ExceptT (TransformableRequestError 'ScheduledType) m ()
logger
              (Request, HTTPResp 'ScheduledType)
-> ExceptT
     (TransformableRequestError 'ScheduledType)
     m
     (Request, HTTPResp 'ScheduledType)
forall a.
a -> ExceptT (TransformableRequestError 'ScheduledType) m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Request
request, HTTPResp 'ScheduledType
resp)
        case Either
  (TransformableRequestError 'ScheduledType)
  (Request, HTTPResp 'ScheduledType)
eitherReqRes of
          Right (Request
req, HTTPResp 'ScheduledType
resp) ->
            let reqBody :: Value
reqBody = Value -> Maybe Value -> Value
forall a. a -> Maybe a -> a
fromMaybe Value
J.Null (Maybe Value -> Value) -> Maybe Value -> Value
forall a b. (a -> b) -> a -> b
$ Getting (First ByteString) Request ByteString
-> Request -> Maybe ByteString
forall s (m :: * -> *) a.
MonadReader s m =>
Getting (First a) s a -> m (Maybe a)
preview ((RequestBody -> Const (First ByteString) RequestBody)
-> Request -> Const (First ByteString) Request
Lens' Request RequestBody
HTTP.body ((RequestBody -> Const (First ByteString) RequestBody)
 -> Request -> Const (First ByteString) Request)
-> ((ByteString -> Const (First ByteString) ByteString)
    -> RequestBody -> Const (First ByteString) RequestBody)
-> Getting (First ByteString) Request ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ByteString -> Const (First ByteString) ByteString)
-> RequestBody -> Const (First ByteString) RequestBody
Prism' RequestBody ByteString
HTTP._RequestBodyLBS) Request
req Maybe ByteString -> (ByteString -> Maybe Value) -> Maybe Value
forall a b. Maybe a -> (a -> Maybe b) -> Maybe b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. FromJSON a => ByteString -> Maybe a
J.decode @J.Value
             in ScheduledEventId
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPResp 'ScheduledType
-> ScheduledTriggerMetrics
-> m ()
forall (m :: * -> *) (a :: TriggerTypes).
(MonadMetadataStorage m, MonadError QErr m, MonadIO m) =>
ScheduledEventId
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPResp a
-> ScheduledTriggerMetrics
-> m ()
processSuccess ScheduledEventId
eventId [HeaderConf]
decodedHeaders ScheduledEventType
type' Value
reqBody HTTPResp 'ScheduledType
resp ScheduledTriggerMetrics
scheduledTriggerMetrics
          Left (HTTPError Value
reqBody HTTPErr 'ScheduledType
e) -> ScheduledEventId
-> RetryContext
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPErr 'ScheduledType
-> ScheduledTriggerMetrics
-> m ()
forall (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, MonadMetadataStorage m, MonadError QErr m) =>
ScheduledEventId
-> RetryContext
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPErr a
-> ScheduledTriggerMetrics
-> m ()
processError ScheduledEventId
eventId RetryContext
retryCtx [HeaderConf]
decodedHeaders ScheduledEventType
type' Value
reqBody HTTPErr 'ScheduledType
e ScheduledTriggerMetrics
scheduledTriggerMetrics
          Left (TransformationError Value
_ TransformErrorBundle
e) -> do
            -- Log The Transformation Error
            Logger Hasura
logger :: L.Logger L.Hasura <- (r -> Logger Hasura) -> m (Logger Hasura)
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks r -> Logger Hasura
forall a t. Has a t => t -> a
getter
            Logger Hasura
-> forall a (m :: * -> *).
   (ToEngineLog a Hasura, MonadIO m) =>
   a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
   (ToEngineLog a impl, MonadIO m) =>
   a -> m ()
L.unLogger Logger Hasura
logger (UnstructuredLog -> m ()) -> UnstructuredLog -> m ()
forall a b. (a -> b) -> a -> b
$ LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelError (ByteString -> SerializableBlob
SB.fromLBS (ByteString -> SerializableBlob) -> ByteString -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ TransformErrorBundle -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode TransformErrorBundle
e)

            -- Set event state to Error
            m (Either QErr ()) -> m ()
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr ()) -> m ()) -> m (Either QErr ()) -> m ()
forall a b. (a -> b) -> a -> b
$ ScheduledEventId
-> ScheduledEventOp -> ScheduledEventType -> m (Either QErr ())
forall (m :: * -> *).
MonadMetadataStorage m =>
ScheduledEventId
-> ScheduledEventOp -> ScheduledEventType -> m (Either QErr ())
setScheduledEventOp ScheduledEventId
eventId (ScheduledEventStatus -> ScheduledEventOp
SEOpStatus ScheduledEventStatus
SESError) ScheduledEventType
type'
  where
    traceNote :: Text
traceNote = Text
"Scheduled trigger" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (TriggerName -> Text) -> Maybe TriggerName -> Text
forall m a. Monoid m => (a -> m) -> Maybe a -> m
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap ((Text
": " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<>) (Text -> Text) -> (TriggerName -> Text) -> TriggerName -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TriggerName -> Text
triggerNameToTxt) (ScheduledEventWebhookPayload -> Maybe TriggerName
sewpName ScheduledEventWebhookPayload
payload)

-- | Record a failed webhook delivery for a scheduled event and then decide
-- whether the event is retried or marked as errored.
--
-- An 'Invocation' describing the failure is built from the 'HTTPErr':
--
--   * 'HClient': status from 'getHTTPExceptionStatus', body is the encoded
--     exception, no response headers.
--   * 'HStatus': status, body and headers are taken from the error response.
--   * 'HOther': status 500, body is the JSON-encoded detail string, no
--     response headers.
--
-- The invocation is stored with 'insertScheduledEventInvocation' and control
-- is handed to 'retryOrMarkError'.
processError ::
  ( MonadIO m,
    MonadMetadataStorage m,
    MonadError QErr m
  ) =>
  ScheduledEventId ->
  RetryContext ->
  [HeaderConf] ->
  ScheduledEventType ->
  J.Value ->
  HTTPErr a ->
  ScheduledTriggerMetrics ->
  m ()
processError eventId retryCtx decodedHeaders type' reqJson err scheduledTriggerMetric = do
  liftEitherM $ insertScheduledEventInvocation failedInvocation type'
  retryOrMarkError eventId retryCtx err type' scheduledTriggerMetric
  where
    -- Every failure shares the event id, request headers and request body;
    -- only the status, response body and response headers vary.
    invocationWith :: Maybe Int -> SB.SerializableBlob -> [HeaderConf] -> Invocation 'ScheduledType
    invocationWith status body headers =
      mkInvocation eventId status decodedHeaders body headers reqJson

    failedInvocation :: Invocation 'ScheduledType
    failedInvocation = case err of
      HClient httpException ->
        invocationWith
          (getHTTPExceptionStatus httpException)
          (SB.fromLBS (httpExceptionErrorEncoding httpException))
          []
      HStatus errResp ->
        invocationWith
          (Just (hrsStatus errResp))
          (hrsBody errResp)
          (hrsHeaders errResp)
      HOther detail ->
        invocationWith (Just 500) (SB.fromLBS (J.encode detail)) []

-- | Either reschedule a failed scheduled event or mark it as errored.
--
-- The event is set to 'SESError' (and the matching "processed total failure"
-- counter is incremented) only when the number of tries has reached
-- 'strcNumRetries' /and/ the error carries no parseable retry value.
-- Otherwise the event is rescheduled via 'SEOpRetry' at
-- @now + delay@, where the delay is the value parsed from the error
-- (presumably the HTTP @Retry-After@ header, going by the helper names) or,
-- failing that, the rounded 'strcRetryIntervalSeconds' from the retry config.
--
-- Note that a parseable retry value causes a retry even when the configured
-- number of tries is already exhausted.
retryOrMarkError ::
  (MonadIO m, MonadMetadataStorage m, MonadError QErr m) =>
  ScheduledEventId ->
  RetryContext ->
  HTTPErr a ->
  ScheduledEventType ->
  ScheduledTriggerMetrics ->
  m ()
retryOrMarkError eventId retryCtx err type' scheduledTriggerMetric
  | triesExhausted && isNothing retryAfterSeconds = do
      liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESError) type'
      liftIO $ Prometheus.Counter.inc (failureCounter scheduledTriggerMetric)
  | otherwise = do
      now <- liftIO getCurrentTime
      let retryTime = addUTCTime (fromIntegral delaySeconds) now
      liftEitherM $ setScheduledEventOp eventId (SEOpRetry retryTime) type'
  where
    RetryContext tries retryConf = retryCtx

    triesExhausted :: Bool
    triesExhausted = tries >= strcNumRetries retryConf

    -- Retry delay requested by the failing response, if any could be parsed.
    retryAfterSeconds :: Maybe Int
    retryAfterSeconds = getRetryAfterHeaderFromHTTPErr err >>= parseRetryHeaderValue

    -- The requested delay wins over the configured retry interval.
    delaySeconds :: Int
    delaySeconds = case retryAfterSeconds of
      Just seconds -> seconds
      Nothing -> round (unrefine (strcRetryIntervalSeconds retryConf))

    -- Counter to bump when the event is finally given up on.
    failureCounter = case type' of
      Cron -> stmCronEventsProcessedTotalFailure
      OneOff -> stmOneOffEventsProcessedTotalFailure

{- Note [Scheduled event lifecycle]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Scheduled events move between five different states over the course of their
lifetime, as represented by the following flowchart:

    ┌───────────┐      ┌────────┐      ┌───────────┐
    │ scheduled │─(1)─→│ locked │─(2)─→│ delivered │
    └───────────┘      └────────┘      └───────────┘
            ↑              │           ┌───────┐
            └────(3)───────┼─────(4)──→│ error │
                           │           └───────┘
                           │           ┌──────┐
                           └─────(5)──→│ dead │
                                       └──────┘

When a scheduled event is first created, it starts in the 'scheduled' state,
and it can transition to other states in the following ways:
  1. When graphql-engine fetches a scheduled event from the database to process
     it, it sets its state to 'locked'. This prevents multiple graphql-engine
     instances running on the same database from processing the same
     scheduled event concurrently.
  2. When a scheduled event is processed successfully, it is marked 'delivered'.
  3. If a scheduled event fails to be processed, but it hasn’t yet reached
     its maximum retry limit, its retry counter is incremented and
     it is returned to the 'scheduled' state.
  4. If a scheduled event fails to be processed and *has* reached its
     retry limit, its state is set to 'error'.
  5. If for whatever reason the difference between the current time and the
     scheduled time is greater than the tolerance of the scheduled event, it
     will not be processed and its state will be set to 'dead'.
-}

-- | Record a successful webhook delivery for a scheduled event.
--
-- Stores an 'Invocation' built from the response (status, body and headers)
-- together with the request headers and request body, sets the event's
-- status to 'SESDelivered', and finally increments the "processed total
-- success" counter that matches the event type.
processSuccess ::
  (MonadMetadataStorage m, MonadError QErr m, MonadIO m) =>
  ScheduledEventId ->
  [HeaderConf] ->
  ScheduledEventType ->
  J.Value ->
  HTTPResp a ->
  ScheduledTriggerMetrics ->
  m ()
processSuccess eventId decodedHeaders type' reqBodyJson resp scheduledTriggerMetric = do
  liftEitherM $ insertScheduledEventInvocation deliveredInvocation type'
  liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESDelivered) type'
  liftIO $ Prometheus.Counter.inc (successCounter scheduledTriggerMetric)
  where
    deliveredInvocation :: Invocation 'ScheduledType
    deliveredInvocation =
      mkInvocation
        eventId
        (Just (hrsStatus resp))
        decodedHeaders
        (hrsBody resp)
        (hrsHeaders resp)
        reqBodyJson

    -- Counter to bump once the event has been marked as delivered.
    successCounter = case type' of
      Cron -> stmCronEventsProcessedTotalSuccess
      OneOff -> stmOneOffEventsProcessedTotalSuccess

-- | Set the status of the given scheduled event to 'SESDead', rethrowing any
-- 'QErr' reported by the metadata storage.
processDead ::
  (MonadMetadataStorage m, MonadError QErr m) =>
  ScheduledEventId ->
  ScheduledEventType ->
  m ()
processDead eventId type' = liftEitherM markDead
  where
    markDead = setScheduledEventOp eventId (SEOpStatus SESDead) type'

-- | Assemble an 'Invocation' for a scheduled event.
--
-- Arguments, in order: the event id, the (optional) response status, the
-- request headers, the response body, the response headers and the request
-- body. The request half is built with 'mkWebhookReq' (tagged with
-- 'invocationVersionST') and the response half with 'mkInvocationResp'.
mkInvocation ::
  ScheduledEventId ->
  Maybe Int ->
  [HeaderConf] ->
  SB.SerializableBlob ->
  [HeaderConf] ->
  J.Value ->
  Invocation 'ScheduledType
mkInvocation eventId status reqHeaders respBody respHeaders reqBodyJson =
  Invocation eventId status webhookRequest webhookResponse
  where
    webhookRequest = mkWebhookReq reqBodyJson reqHeaders invocationVersionST
    webhookResponse = mkInvocationResp status respBody respHeaders

-- metadata database transactions

-- | Fetch stats for those of the given cron triggers that have fewer than
-- 100 untried (@tries = 0@) events in the @scheduled@ state.
--
-- For every such trigger the result carries its name, the number of those
-- events, and the greatest @scheduled_time@ among them (falling back to the
-- database's @now()@ when the trigger has none).
--
-- The point here is to maintain a certain number of future events so the user
-- can kind of see what's coming up, and obviously to give 'processCronEvents'
-- something to do.
getDeprivedCronTriggerStatsTx :: [TriggerName] -> PG.TxE QErr [CronTriggerStats]
getDeprivedCronTriggerStatsTx cronTriggerNames = do
  rows <-
    PG.withQE
      defaultTxErrorHandler
      [PG.sql|
      SELECT t.trigger_name, coalesce(q.upcoming_events_count, 0), coalesce(q.max_scheduled_time, now())
      FROM (SELECT UNNEST ($1::text[]) as trigger_name) as t
      LEFT JOIN
      ( SELECT
         trigger_name,
          count(1) as upcoming_events_count,
          max(scheduled_time) as max_scheduled_time
         FROM hdb_catalog.hdb_cron_events
         WHERE tries = 0 and status = 'scheduled'
         GROUP BY trigger_name
      ) AS q
      ON t.trigger_name = q.trigger_name
      WHERE coalesce(q.upcoming_events_count, 0) < 100
     |]
      (Identity triggerNamesArg)
      True
  pure
    [ CronTriggerStats name upcomingCount maxScheduledTime
      | (name, upcomingCount, maxScheduledTime) <- rows
    ]
  where
    -- The trigger names are passed as a single text[] query parameter.
    triggerNamesArg = PGTextArray (triggerNameToTxt <$> cronTriggerNames)

-- TODO
--  - cron events have minute resolution, while one-off events have arbitrary
--    resolution, so it doesn't make sense to fetch them at the same rate
--  - if we decide to fetch cron events less frequently we should wake up that
--    thread at second 0 of every minute, and then pass hasura's now time into
--    the query (since the DB may disagree about the time)

-- | Lock and return the scheduled events that are due for delivery.
--
-- Both queries select rows in the @scheduled@ state whose @next_retry_at@
-- (when set) or @scheduled_time@ (otherwise) is not later than the database's
-- @now()@, flip them to @locked@, and return the updated rows as JSON. Cron
-- events are additionally restricted to the given trigger names; one-off
-- events are not filtered by trigger.
getScheduledEventsForDeliveryTx :: [TriggerName] -> PG.TxE QErr ([CronEvent], [OneOffScheduledEvent])
getScheduledEventsForDeliveryTx cronTriggerNames = do
  cronEvents <- getCronEventsForDelivery
  oneOffEvents <- getOneOffEventsForDelivery
  pure (cronEvents, oneOffEvents)
  where
    -- Each row is a single JSON column; strip both wrappers.
    unwrapRows :: [Identity (PG.ViaJSON a)] -> [a]
    unwrapRows = map (PG.getViaJSON . runIdentity)

    getCronEventsForDelivery :: PG.TxE QErr [CronEvent]
    getCronEventsForDelivery =
      unwrapRows
        <$> PG.withQE
          defaultTxErrorHandler
          [PG.sql|
        WITH cte AS
          ( UPDATE hdb_catalog.hdb_cron_events
            SET status = 'locked'
            WHERE id IN ( SELECT t.id
                          FROM hdb_catalog.hdb_cron_events t
                          WHERE ( t.status = 'scheduled'
                                  and (
                                   (t.next_retry_at is NULL and t.scheduled_time <= now()) or
                                   (t.next_retry_at is not NULL and t.next_retry_at <= now())
                                  )
                                ) AND trigger_name = ANY($1)
                          FOR UPDATE SKIP LOCKED
                          )
            RETURNING *
          )
        SELECT row_to_json(t.*) FROM cte AS t
      |]
          (Identity (PGTextArray (map triggerNameToTxt cronTriggerNames)))
          True

    getOneOffEventsForDelivery :: PG.TxE QErr [OneOffScheduledEvent]
    getOneOffEventsForDelivery =
      unwrapRows
        <$> PG.withQE
          defaultTxErrorHandler
          [PG.sql|
         WITH cte AS (
            UPDATE hdb_catalog.hdb_scheduled_events
            SET status = 'locked'
            WHERE id IN ( SELECT t.id
                          FROM hdb_catalog.hdb_scheduled_events t
                          WHERE ( t.status = 'scheduled'
                                  and (
                                   (t.next_retry_at is NULL and t.scheduled_time <= now()) or
                                   (t.next_retry_at is not NULL and t.next_retry_at <= now())
                                  )
                                )
                          FOR UPDATE SKIP LOCKED
                          )
            RETURNING *
          )
         SELECT row_to_json(t.*) FROM cte AS t
      |]
          ()
          False

insertInvocationTx :: Invocation 'ScheduledType -> ScheduledEventType -> PG.TxE QErr ()
insertInvocationTx :: Invocation 'ScheduledType -> ScheduledEventType -> TxE QErr ()
insertInvocationTx Invocation 'ScheduledType
invo ScheduledEventType
type' = do
  case ScheduledEventType
type' of
    ScheduledEventType
Cron -> do
      (PGTxErr -> QErr)
-> Query
-> (ScheduledEventId, Maybe Int64, ViaJSON Value, ViaJSON Value)
-> Bool
-> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
        PGTxErr -> QErr
defaultTxErrorHandler
        [PG.sql|
         INSERT INTO hdb_catalog.hdb_cron_event_invocation_logs
         (event_id, status, request, response)
         VALUES ($1, $2, $3, $4)
        |]
        ( Invocation 'ScheduledType -> ScheduledEventId
forall (a :: TriggerTypes). Invocation a -> ScheduledEventId
iEventId Invocation 'ScheduledType
invo,
          Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Maybe Int -> Maybe Int64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Invocation 'ScheduledType -> Maybe Int
forall (a :: TriggerTypes). Invocation a -> Maybe Int
iStatus Invocation 'ScheduledType
invo :: Maybe Int64,
          Value -> ViaJSON Value
forall a. a -> ViaJSON a
PG.ViaJSON (Value -> ViaJSON Value) -> Value -> ViaJSON Value
forall a b. (a -> b) -> a -> b
$ WebhookRequest -> Value
forall a. ToJSON a => a -> Value
J.toJSON (WebhookRequest -> Value) -> WebhookRequest -> Value
forall a b. (a -> b) -> a -> b
$ Invocation 'ScheduledType -> WebhookRequest
forall (a :: TriggerTypes). Invocation a -> WebhookRequest
iRequest Invocation 'ScheduledType
invo,
          Value -> ViaJSON Value
forall a. a -> ViaJSON a
PG.ViaJSON (Value -> ViaJSON Value) -> Value -> ViaJSON Value
forall a b. (a -> b) -> a -> b
$ Response 'ScheduledType -> Value
forall a. ToJSON a => a -> Value
J.toJSON (Response 'ScheduledType -> Value)
-> Response 'ScheduledType -> Value
forall a b. (a -> b) -> a -> b
$ Invocation 'ScheduledType -> Response 'ScheduledType
forall (a :: TriggerTypes). Invocation a -> Response a
iResponse Invocation 'ScheduledType
invo
        )
        Bool
True
      (PGTxErr -> QErr)
-> Query -> Identity ScheduledEventId -> Bool -> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
        PGTxErr -> QErr
defaultTxErrorHandler
        [PG.sql|
          UPDATE hdb_catalog.hdb_cron_events
          SET tries = tries + 1
          WHERE id = $1
          |]
        (ScheduledEventId -> Identity ScheduledEventId
forall a. a -> Identity a
Identity (ScheduledEventId -> Identity ScheduledEventId)
-> ScheduledEventId -> Identity ScheduledEventId
forall a b. (a -> b) -> a -> b
$ Invocation 'ScheduledType -> ScheduledEventId
forall (a :: TriggerTypes). Invocation a -> ScheduledEventId
iEventId Invocation 'ScheduledType
invo)
        Bool
True
    ScheduledEventType
OneOff -> do
      (PGTxErr -> QErr)
-> Query
-> (ScheduledEventId, Maybe Int64, ViaJSON Value, ViaJSON Value)
-> Bool
-> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
        PGTxErr -> QErr
defaultTxErrorHandler
        [PG.sql|
         INSERT INTO hdb_catalog.hdb_scheduled_event_invocation_logs
         (event_id, status, request, response)
         VALUES ($1, $2, $3, $4)
        |]
        ( Invocation 'ScheduledType -> ScheduledEventId
forall (a :: TriggerTypes). Invocation a -> ScheduledEventId
iEventId Invocation 'ScheduledType
invo,
          Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Maybe Int -> Maybe Int64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Invocation 'ScheduledType -> Maybe Int
forall (a :: TriggerTypes). Invocation a -> Maybe Int
iStatus Invocation 'ScheduledType
invo :: Maybe Int64,
          Value -> ViaJSON Value
forall a. a -> ViaJSON a
PG.ViaJSON (Value -> ViaJSON Value) -> Value -> ViaJSON Value
forall a b. (a -> b) -> a -> b
$ WebhookRequest -> Value
forall a. ToJSON a => a -> Value
J.toJSON (WebhookRequest -> Value) -> WebhookRequest -> Value
forall a b. (a -> b) -> a -> b
$ Invocation 'ScheduledType -> WebhookRequest
forall (a :: TriggerTypes). Invocation a -> WebhookRequest
iRequest Invocation 'ScheduledType
invo,
          Value -> ViaJSON Value
forall a. a -> ViaJSON a
PG.ViaJSON (Value -> ViaJSON Value) -> Value -> ViaJSON Value
forall a b. (a -> b) -> a -> b
$ Response 'ScheduledType -> Value
forall a. ToJSON a => a -> Value
J.toJSON (Response 'ScheduledType -> Value)
-> Response 'ScheduledType -> Value
forall a b. (a -> b) -> a -> b
$ Invocation 'ScheduledType -> Response 'ScheduledType
forall (a :: TriggerTypes). Invocation a -> Response a
iResponse Invocation 'ScheduledType
invo
        )
        Bool
True
      (PGTxErr -> QErr)
-> Query -> Identity ScheduledEventId -> Bool -> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
PG.unitQE
        PGTxErr -> QErr
defaultTxErrorHandler
        [PG.sql|
          UPDATE hdb_catalog.hdb_scheduled_events
          SET tries = tries + 1
          WHERE id = $1
          |]
        (ScheduledEventId -> Identity ScheduledEventId
forall a. a -> Identity a
Identity (ScheduledEventId -> Identity ScheduledEventId)
-> ScheduledEventId -> Identity ScheduledEventId
forall a b. (a -> b) -> a -> b
$ Invocation 'ScheduledType -> ScheduledEventId
forall (a :: TriggerTypes). Invocation a -> ScheduledEventId
iEventId Invocation 'ScheduledType
invo)
        Bool
True

-- | Apply a 'ScheduledEventOp' to the event identified by the given id.
--
-- The event type selects the table that is updated: 'Cron' events are
-- updated in @hdb_catalog.hdb_cron_events@ and 'OneOff' events in
-- @hdb_catalog.hdb_scheduled_events@.
--
-- * 'SEOpRetry' writes the given time to @next_retry_at@ and sets the
--   status back to @scheduled@.
-- * 'SEOpStatus' overwrites the @status@ column with the given status.
setScheduledEventOpTx ::
  ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> PG.TxE QErr ()
setScheduledEventOpTx eventId op type' =
  case (op, type') of
    (SEOpRetry retryAt, Cron) ->
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
            UPDATE hdb_catalog.hdb_cron_events
            SET next_retry_at = $1,
            STATUS = 'scheduled'
            WHERE id = $2
            |]
        (retryAt, eventId)
        True
    (SEOpRetry retryAt, OneOff) ->
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
            UPDATE hdb_catalog.hdb_scheduled_events
            SET next_retry_at = $1,
            STATUS = 'scheduled'
            WHERE id = $2
            |]
        (retryAt, eventId)
        True
    -- Note the argument order: the id is $1 and the status is $2 here.
    (SEOpStatus newStatus, Cron) ->
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
            UPDATE hdb_catalog.hdb_cron_events
            SET status = $2
            WHERE id = $1
           |]
        (eventId, newStatus)
        True
    (SEOpStatus newStatus, OneOff) ->
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
            UPDATE hdb_catalog.hdb_scheduled_events
            SET status = $2
            WHERE id = $1
           |]
        (eventId, newStatus)
        True

-- | Move the given events from the @locked@ status back to @scheduled@ and
-- return the number of rows that were actually updated.
--
-- Only rows whose id is in the given list /and/ whose status is currently
-- @locked@ are touched. 'Cron' events are updated in
-- @hdb_catalog.hdb_cron_events@, 'OneOff' events in
-- @hdb_catalog.hdb_scheduled_events@.
unlockScheduledEventsTx :: ScheduledEventType -> [ScheduledEventId] -> PG.TxE QErr Int
unlockScheduledEventsTx type' eventIds =
  fmap (runIdentity . PG.getRow) unlockQuery
  where
    -- The ids are passed as a single text[] parameter.
    idsParam = Identity (PGTextArray (map unEventId eventIds))

    unlockQuery = case type' of
      Cron ->
        PG.withQE
          defaultTxErrorHandler
          [PG.sql|
        WITH "cte" AS
        (UPDATE hdb_catalog.hdb_cron_events
        SET status = 'scheduled'
        WHERE id = ANY($1::text[]) and status = 'locked'
        RETURNING *)
        SELECT count(*) FROM "cte"
      |]
          idsParam
          True
      OneOff ->
        PG.withQE
          defaultTxErrorHandler
          [PG.sql|
        WITH "cte" AS
        (UPDATE hdb_catalog.hdb_scheduled_events
        SET status = 'scheduled'
        WHERE id = ANY($1::text[]) AND status = 'locked'
        RETURNING *)
        SELECT count(*) FROM "cte"
      |]
          idsParam
          True

-- | Reset every @locked@ event to @scheduled@, first in
-- @hdb_catalog.hdb_cron_events@ and then in
-- @hdb_catalog.hdb_scheduled_events@.
unlockAllLockedScheduledEventsTx :: PG.TxE QErr ()
unlockAllLockedScheduledEventsTx =
  unlockCronEvents >> unlockOneOffEvents
  where
    unlockCronEvents =
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
          UPDATE hdb_catalog.hdb_cron_events
          SET status = 'scheduled'
          WHERE status = 'locked'
          |]
        ()
        True

    unlockOneOffEvents =
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
          UPDATE hdb_catalog.hdb_scheduled_events
          SET status = 'scheduled'
          WHERE status = 'locked'
          |]
        ()
        True

-- | Insert one row into 'cronEventsTable' per 'CronEventSeed', writing the
-- @trigger_name@ and @scheduled_time@ columns. Conflicting rows are skipped
-- (the statement is built with an @ON CONFLICT DO NOTHING@ style
-- 'S.DoNothing' clause), so re-inserting an already known event is harmless.
--
-- An empty seed list is a no-op: no statement is sent to the database.
-- Without this guard the statement would be built from an empty list of
-- value tuples, and Postgres does not accept an @INSERT ... VALUES@ that has
-- no rows.
--
-- The statement is rendered to text and run unprepared (the trailing
-- 'False'), with the values embedded as SQL literals via 'S.SELit'.
insertCronEventsTx :: [CronEventSeed] -> PG.TxE QErr ()
insertCronEventsTx cronSeeds
  | null cronSeeds = pure ()
  | otherwise =
      PG.unitQE defaultTxErrorHandler (PG.fromText insertCronEventsSql) () False
  where
    insertCronEventsSql =
      TB.run
        $ toSQL
          S.SQLInsert
            { siTable = cronEventsTable,
              siCols = map unsafePGCol ["trigger_name", "scheduled_time"],
              siValues = S.ValuesExp $ map seedToTuple cronSeeds,
              siConflict = Just $ S.DoNothing Nothing,
              siRet = Nothing
            }

    -- One VALUES tuple per seed, in the same order as 'siCols'.
    seedToTuple (CronEventSeed name scheduledAt) =
      S.TupleExp $ map S.SELit [triggerNameToTxt name, formatTime' scheduledAt]

-- | Insert a one-off scheduled event into
-- @hdb_catalog.hdb_scheduled_events@ and return the id of the new row.
--
-- The columns written are @webhook_conf@, @scheduled_time@, @payload@,
-- @retry_conf@, @header_conf@ and @comment@. The request/response transform
-- fields of the event are not part of this INSERT.
insertOneOffScheduledEventTx :: OneOffEvent -> PG.TxE QErr EventId
insertOneOffScheduledEventTx
  CreateScheduledEvent
    { cseWebhook = webhook,
      cseScheduleAt = scheduleAt,
      csePayload = payload,
      cseRetryConf = retryConf,
      cseHeaders = headers,
      cseComment = comment
    } =
    fmap (runIdentity . PG.getRow)
      $ PG.withQE
        defaultTxErrorHandler
        [PG.sql|
    INSERT INTO hdb_catalog.hdb_scheduled_events
    (webhook_conf,scheduled_time,payload,retry_conf,header_conf,comment)
    VALUES
    ($1, $2, $3, $4, $5, $6) RETURNING id
    |]
        -- Parameter order must match the column list above.
        ( PG.ViaJSON webhook,
          scheduleAt,
          PG.ViaJSON payload,
          PG.ViaJSON retryConf,
          PG.ViaJSON headers,
          comment
        )
        False

-- | Delete cron events that are scheduled in the future
-- (@scheduled_time > now()@) and have not been tried yet (@tries = 0@).
--
-- * 'SingleCronTrigger' restricts the delete to one trigger name.
-- * 'MetadataCronTriggers' restricts it to any of the given trigger names.
dropFutureCronEventsTx :: ClearCronEvents -> PG.TxE QErr ()
dropFutureCronEventsTx (SingleCronTrigger triggerName) =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
     DELETE FROM hdb_catalog.hdb_cron_events
     WHERE trigger_name = $1 AND scheduled_time > now() AND tries = 0
    |]
    (Identity triggerName)
    True
dropFutureCronEventsTx (MetadataCronTriggers triggerNames) =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
     DELETE FROM hdb_catalog.hdb_cron_events
     WHERE scheduled_time > now() AND tries = 0 AND trigger_name = ANY($1::text[])
    |]
    (Identity (PGTextArray (triggerNameToTxt <$> triggerNames)))
    False

-- | The qualified name of the cron events table,
-- @hdb_catalog.hdb_cron_events@.
cronEventsTable :: QualifiedTable
cronEventsTable = QualifiedObject "hdb_catalog" (TableName "hdb_cron_events")

-- | Build a boolean expression that restricts the @status@ column to the
-- given statuses. An empty list means "no restriction" and yields the
-- literal @TRUE@.
mkScheduledEventStatusFilter :: [ScheduledEventStatus] -> S.BoolExp
mkScheduledEventStatusFilter [] = S.BELit True
mkScheduledEventStatusFilter statuses = S.BEIN statusColumn statusLiterals
  where
    statusColumn = S.SEIdentifier (Identifier "status")
    statusLiterals = [S.SELit (scheduledEventStatusToText s) | s <- statuses]

-- | An order-by clause with a single item: the @scheduled_time@ column in
-- ascending order, with no explicit nulls ordering.
scheduledTimeOrderBy :: S.OrderByExp
scheduledTimeOrderBy = S.OrderByExp (ascendingByScheduledTime NE.:| [])
  where
    ascendingByScheduledTime =
      S.OrderByItem
        (S.SEIdentifier (Identifier "scheduled_time"))
        (Just S.OTAsc)
        Nothing

-- | Build a select expression which outputs the (optional) total count and
-- the list of JSON rows with the pagination limit and offset applied.
--
-- The generated select has two CTEs:
--
-- * @count_cte@: the unpaginated select supplied by the caller
-- * @limit_cte@: all columns of @count_cte@, with the limit and offset of
--   the pagination argument applied when they are present
--
-- The rows column is a sub-select of @json_agg(row_to_json(limit_cte.*))@
-- wrapped in 'S.handleIfNull' with the literal @[]@. With
-- 'IncludeRowsCount' it is preceded by a 'S.countStar' sub-select over
-- @count_cte@; with 'DontIncludeRowsCount' only the rows column is selected.
mkPaginationSelectExp ::
  S.Select ->
  ScheduledEventPagination ->
  RowsCountOption ->
  S.Select
mkPaginationSelectExp
  allRowsSelect
  ScheduledEventPagination {_sepLimit = pageLimit, _sepOffset = pageOffset}
  shouldIncludeRowsCount =
    S.mkSelect
      { S.selCTEs =
          [ (allRowsAlias, S.ICTESelect allRowsSelect),
            (pageAlias, pageCte)
          ],
        S.selExtr = extractors
      }
    where
      allRowsAlias = S.mkTableAlias "count_cte"
      pageAlias = S.mkTableAlias "limit_cte"

      -- FROM clause that reads from one of the CTEs above.
      fromCte cteAlias = Just (S.mkIdenFromExp (S.tableAliasToIdentifier cteAlias))

      -- Sub-select of a single expression over one of the CTEs.
      subSelect expr cteAlias =
        S.SESelect
          ( S.mkSelect
              { S.selExtr = [S.Extractor expr Nothing],
                S.selFrom = fromCte cteAlias
              }
          )

      extractors = case shouldIncludeRowsCount of
        IncludeRowsCount -> [countExtractor, rowsExtractor]
        DontIncludeRowsCount -> [rowsExtractor]

      countExtractor = S.Extractor (subSelect S.countStar allRowsAlias) Nothing

      pageCte =
        S.ICTESelect
          ( S.mkSelect
              { S.selExtr = [S.selectStar],
                S.selFrom = fromCte allRowsAlias,
                S.selLimit = fmap (S.LimitExp . S.intToSQLExp) pageLimit,
                S.selOffset = fmap (S.OffsetExp . S.intToSQLExp) pageOffset
              }
          )

      rowsExtractor =
        S.Extractor (S.handleIfNull (S.SELit "[]") aggregatedRows) Nothing
        where
          aggregatedRows =
            subSelect (S.SEUnsafe "json_agg(row_to_json(limit_cte.*))") pageAlias

-- | Unwrap a @(count, rows)@ result row into a 'WithOptionalTotalCount'
-- that carries the count.
withCount :: (Int, PG.ViaJSON a) -> WithOptionalTotalCount a
withCount (total, PG.ViaJSON rows) =
  WithOptionalTotalCount (Just total) rows

-- | Unwrap a rows-only result into a 'WithOptionalTotalCount' that has no
-- count.
withoutCount :: PG.ViaJSON a -> WithOptionalTotalCount a
withoutCount (PG.ViaJSON rows) =
  WithOptionalTotalCount Nothing rows

-- | Run a query that returns exactly one row and decode it according to the
-- 'RowsCountOption':
--
-- * 'IncludeRowsCount': the row is read as @(count, json rows)@
-- * 'DontIncludeRowsCount': the row is read as a single JSON rows column
--
-- The query is run without parameters and unprepared (the trailing
-- 'False').
executeWithOptionalTotalCount :: (J.FromJSON a) => PG.Query -> RowsCountOption -> PG.TxE QErr (WithOptionalTotalCount a)
executeWithOptionalTotalCount sql = \case
  IncludeRowsCount ->
    fmap
      (withCount . PG.getRow)
      (PG.withQE defaultTxErrorHandler sql () False)
  DontIncludeRowsCount ->
    fmap
      (withoutCount . runIdentity . PG.getRow)
      (PG.withQE defaultTxErrorHandler sql () False)

-- | Fetch one-off scheduled events from @hdb_catalog.hdb_scheduled_events@,
-- filtered by status (see 'mkScheduledEventStatusFilter'), ordered by
-- 'scheduledTimeOrderBy' and paginated via 'mkPaginationSelectExp'. The
-- total count is included only when requested by the 'RowsCountOption'.
getOneOffScheduledEventsTx ::
  ScheduledEventPagination ->
  [ScheduledEventStatus] ->
  RowsCountOption ->
  PG.TxE QErr (WithOptionalTotalCount [OneOffScheduledEvent])
getOneOffScheduledEventsTx pagination statuses getRowsCount =
  executeWithOptionalTotalCount paginatedQuery getRowsCount
  where
    oneOffEventsTable =
      QualifiedObject "hdb_catalog" (TableName "hdb_scheduled_events")

    -- The unpaginated select; pagination is layered on top of it below.
    allMatchingEvents =
      S.mkSelect
        { S.selExtr = [S.selectStar],
          S.selFrom = Just (S.mkSimpleFromExp oneOffEventsTable),
          S.selWhere = Just (S.WhereFrag (mkScheduledEventStatusFilter statuses)),
          S.selOrderBy = Just scheduledTimeOrderBy
        }

    paginatedQuery =
      PG.fromBuilder
        . toSQL
        $ mkPaginationSelectExp allMatchingEvents pagination getRowsCount

-- | Fetch the cron events of one trigger from 'cronEventsTable'. Rows must
-- match both the trigger name and the status filter (see
-- 'mkScheduledEventStatusFilter'); they are ordered by
-- 'scheduledTimeOrderBy' and paginated via 'mkPaginationSelectExp'. The
-- total count is included only when requested by the 'RowsCountOption'.
getCronEventsTx ::
  TriggerName ->
  ScheduledEventPagination ->
  [ScheduledEventStatus] ->
  RowsCountOption ->
  PG.TxE QErr (WithOptionalTotalCount [CronEvent])
getCronEventsTx triggerName pagination status getRowsCount =
  executeWithOptionalTotalCount paginatedQuery getRowsCount
  where
    matchesTrigger =
      S.BECompare
        S.SEQ
        (S.SEIdentifier (Identifier "trigger_name"))
        (S.SELit (triggerNameToTxt triggerName))

    matchesStatus = mkScheduledEventStatusFilter status

    -- The unpaginated select; pagination is layered on top of it below.
    allMatchingEvents =
      S.mkSelect
        { S.selExtr = [S.selectStar],
          S.selFrom = Just (S.mkSimpleFromExp cronEventsTable),
          S.selWhere = Just (S.WhereFrag (S.BEBin S.AndOp matchesTrigger matchesStatus)),
          S.selOrderBy = Just scheduledTimeOrderBy
        }

    paginatedQuery =
      PG.fromBuilder
        . toSQL
        $ mkPaginationSelectExp allMatchingEvents pagination getRowsCount

-- | Delete the event with the given id. 'OneOff' events are deleted from
-- @hdb_catalog.hdb_scheduled_events@ and 'Cron' events from
-- @hdb_catalog.hdb_cron_events@.
deleteScheduledEventTx ::
  ScheduledEventId -> ScheduledEventType -> PG.TxE QErr ()
deleteScheduledEventTx eventId eventType =
  case eventType of
    OneOff ->
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
      DELETE FROM hdb_catalog.hdb_scheduled_events
       WHERE id = $1
    |]
        idParam
        False
    Cron ->
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
      DELETE FROM hdb_catalog.hdb_cron_events
       WHERE id = $1
    |]
        idParam
        False
  where
    idParam = Identity eventId

-- | Column extractors for an invocation-log query against the given table.
--
-- The resulting list selects, in order: @id@, @event_id@, @status@,
-- @request@, @response@ and @created_at@. Every column is qualified with
-- @table@; @request@ and @response@ additionally carry a @json@ type
-- annotation. None of the extractors has a column alias.
invocationFieldExtractors :: QualifiedTable -> [S.Extractor]
invocationFieldExtractors table =
  [ plainColumn "id",
    plainColumn "event_id",
    plainColumn "status",
    jsonColumn "request",
    jsonColumn "response",
    plainColumn "created_at"
  ]
  where
    -- A column of @table@ as a table-qualified identifier expression.
    seIden = S.SEQIdentifier . S.mkQIdentifierTable table . Identifier
    -- Annotate an expression with the @json@ type.
    withJsonTypeAnn e = S.SETyAnn e $ S.TypeAnn "json"
    -- Extract the column as-is, without an alias.
    plainColumn columnName = S.Extractor (seIden columnName) Nothing
    -- Extract the column with a @json@ type annotation, without an alias.
    jsonColumn columnName = S.Extractor (withJsonTypeAnn $ seIden columnName) Nothing

-- | Boolean expression comparing the @event_id@ column of the given table
-- (table-qualified) for equality with the given event id, which is embedded
-- as a SQL literal.
mkEventIdBoolExp :: QualifiedTable -> EventId -> S.BoolExp
mkEventIdBoolExp table eventId =
  S.BECompare S.SEQ eventIdColumn eventIdLiteral
  where
    eventIdColumn = S.SEQIdentifier $ S.mkQIdentifierTable table $ Identifier "event_id"
    eventIdLiteral = S.SELit $ unEventId eventId

-- | Fetch the invocation logs described by a 'GetScheduledEventInvocations'
-- request.
--
-- The query is produced by 'getScheduledEventsInvocationsQuery' over the
-- catalog invocation-log tables and run through
-- 'executeWithOptionalTotalCount', using the request's rows-count option
-- ('_geiGetRowsCount').
getScheduledEventInvocationsTx ::
  GetScheduledEventInvocations ->
  PG.TxE QErr (WithOptionalTotalCount [ScheduledEventInvocation])
getScheduledEventInvocationsTx getEventInvocations =
  executeWithOptionalTotalCount sql (_geiGetRowsCount getEventInvocations)
  where
    sql =
      PG.fromBuilder
        $ toSQL
        $ getScheduledEventsInvocationsQuery eventsTables getEventInvocations
    eventsTables =
      EventTables oneOffInvocationsTable cronInvocationsTable cronEventsTable
    -- Invocation logs of one-off scheduled events.
    oneOffInvocationsTable =
      QualifiedObject "hdb_catalog" $ TableName "hdb_scheduled_event_invocation_logs"
    -- Invocation logs of cron events.
    cronInvocationsTable =
      QualifiedObject "hdb_catalog" $ TableName "hdb_cron_event_invocation_logs"

-- | The catalog tables that the scheduled-event invocation queries are built
-- against.
data EventTables = EventTables
  { -- | Table holding the invocation logs of one-off scheduled events.
    etOneOffInvocationsTable :: QualifiedTable,
    -- | Table holding the invocation logs of cron events.
    etCronInvocationsTable :: QualifiedTable,
    -- | Table holding the cron events themselves.
    etCronEventsTable :: QualifiedTable
  }

-- | Build the unpaginated @SELECT@ over invocation logs for the given lookup.
--
-- Every variant selects the columns of 'invocationFieldExtractors' and orders
-- by @created_at@ descending; they differ in the source and the filter:
--
-- * 'GIBEventId': the one-off or cron invocation table (chosen by the event
--   type), filtered to rows whose @event_id@ equals the given id.
-- * 'GIBEvent' 'SEOneOff': every row of the one-off invocation table, with no
--   filter.
-- * 'GIBEvent' ('SECron' name): the cron invocation table inner-joined to the
--   cron events table on @events.id = invocations.event_id@, filtered to
--   events whose @trigger_name@ equals the given trigger name.
getScheduledEventsInvocationsQueryNoPagination :: EventTables -> GetScheduledEventInvocationsBy -> S.Select
getScheduledEventsInvocationsQueryNoPagination (EventTables oneOffInvocationsTable cronInvocationsTable cronEventsTable') invocationsBy =
  case invocationsBy of
    GIBEventId eventId eventType ->
      let table = case eventType of
            OneOff -> oneOffInvocationsTable
            Cron -> cronInvocationsTable
       in (baseInvocationsSelect table)
            { S.selFrom = Just $ S.mkSimpleFromExp table,
              S.selWhere = Just $ S.WhereFrag $ mkEventIdBoolExp table eventId
            }
    GIBEvent SEOneOff ->
      (baseInvocationsSelect oneOffInvocationsTable)
        { S.selFrom = Just $ S.mkSimpleFromExp oneOffInvocationsTable
        }
    GIBEvent (SECron triggerName) ->
      let joinCondition =
            S.JoinOn
              $ S.BECompare
                S.SEQ
                (qualifiedColumn cronEventsTable' "id")
                (qualifiedColumn cronInvocationsTable "event_id")
          joinTables =
            S.JoinExpr
              (S.FISimple cronInvocationsTable Nothing)
              S.Inner
              (S.FISimple cronEventsTable' Nothing)
              joinCondition
          triggerBoolExp =
            S.BECompare
              S.SEQ
              (qualifiedColumn cronEventsTable' "trigger_name")
              (S.SELit $ triggerNameToTxt triggerName)
       in (baseInvocationsSelect cronInvocationsTable)
            { S.selFrom = Just $ S.FromExp [S.FIJoin joinTables],
              S.selWhere = Just $ S.WhereFrag triggerBoolExp
            }
  where
    -- A column of the given table as a table-qualified identifier expression.
    qualifiedColumn table = S.SEQIdentifier . S.mkQIdentifierTable table . Identifier

    -- ORDER BY <table>.created_at DESC, with no explicit nulls ordering.
    createdAtOrderBy table =
      S.OrderByExp
        $ S.OrderByItem (qualifiedColumn table "created_at") (Just S.OTDesc) Nothing
        NE.:| []

    -- The fields shared by every variant: the invocation columns of the given
    -- table and the ordering. Callers fill in the FROM and WHERE parts.
    baseInvocationsSelect table =
      S.mkSelect
        { S.selExtr = invocationFieldExtractors table,
          S.selOrderBy = Just $ createdAtOrderBy table
        }

-- | Build the invocation-log query for a 'GetScheduledEventInvocations'
-- request: the unpaginated select from
-- 'getScheduledEventsInvocationsQueryNoPagination', passed through
-- 'mkPaginationSelectExp' together with the request's pagination and
-- rows-count option.
getScheduledEventsInvocationsQuery :: EventTables -> GetScheduledEventInvocations -> S.Select
getScheduledEventsInvocationsQuery eventTables (GetScheduledEventInvocations invocationsBy pagination shouldIncludeRowsCount) =
  mkPaginationSelectExp invocationsSelect pagination shouldIncludeRowsCount
  where
    invocationsSelect =
      getScheduledEventsInvocationsQueryNoPagination eventTables invocationsBy

-- | Logger to accumulate stats of fetched scheduled events over a period of
-- time and log them once using the given @'L.Logger' 'L.Hasura'@.
--
-- This is 'L.createStatsLogger' specialised to
-- 'FetchedScheduledEventsStatsLogger'; see 'L.createStatsLogger' for more
-- details.
createFetchedScheduledEventsStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> m FetchedScheduledEventsStatsLogger
createFetchedScheduledEventsStatsLogger = L.createStatsLogger

-- | Close the fetched scheduled events stats logger.
--
-- Delegates to 'L.closeStatsLogger' with the
-- 'L.scheduledTriggerProcessLogType' log type.
closeFetchedScheduledEventsStatsLogger ::
  (MonadIO m) => L.Logger L.Hasura -> FetchedScheduledEventsStatsLogger -> m ()
closeFetchedScheduledEventsStatsLogger =
  L.closeStatsLogger L.scheduledTriggerProcessLogType

-- | Log statistics of fetched scheduled events. See 'L.logStats' for more
-- details.
--
-- Each call submits one 'FetchedScheduledEventsStats' value built from the
-- given cron and one-off counts, with the third field fixed to @1@
-- (presumably the number of fetches this sample represents; confirm against
-- the definition of 'FetchedScheduledEventsStats').
logFetchedScheduledEventsStats ::
  (MonadIO m) =>
  FetchedScheduledEventsStatsLogger ->
  CronEventsCount ->
  OneOffScheduledEventsCount ->
  m ()
logFetchedScheduledEventsStats logger cron oneOff =
  L.logStats logger (FetchedScheduledEventsStats cron oneOff 1)

-- | Logger to accumulate stats of fetched cron triggers, for generating cron
-- events, over a period of time and log them once using the given
-- @'L.Logger' 'L.Hasura'@.
--
-- This is 'L.createStatsLogger' specialised to
-- 'FetchedCronTriggerStatsLogger'; see 'L.createStatsLogger' for more details.
createFetchedCronTriggerStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> m FetchedCronTriggerStatsLogger
createFetchedCronTriggerStatsLogger = L.createStatsLogger

-- | Close the fetched cron trigger stats logger.
--
-- Delegates to 'L.closeStatsLogger' with the
-- 'L.cronEventGeneratorProcessType' log type.
closeFetchedCronTriggersStatsLogger ::
  (MonadIO m) => L.Logger L.Hasura -> FetchedCronTriggerStatsLogger -> m ()
closeFetchedCronTriggersStatsLogger =
  L.closeStatsLogger L.cronEventGeneratorProcessType

-- | Log statistics of fetched cron triggers. See 'L.logStats' for more
-- details.
--
-- Each call submits one 'FetchedCronTriggerStats' value built from the given
-- per-trigger stats, with the second field fixed to @1@ (presumably the
-- number of fetches this sample represents; confirm against the definition
-- of 'FetchedCronTriggerStats').
logFetchedCronTriggersStats ::
  (MonadIO m) =>
  FetchedCronTriggerStatsLogger ->
  [CronTriggerStats] ->
  m ()
logFetchedCronTriggersStats logger cronTriggerStats =
  L.logStats logger (FetchedCronTriggerStats cronTriggerStats 1)