{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE QuasiQuotes #-}
module Hasura.Eventing.ScheduledTrigger
( runCronEventsGenerator,
processScheduledTriggers,
generateScheduleTimes,
CronEventSeed (..),
LockedEventsCtx (..),
createFetchedCronTriggerStatsLogger,
closeFetchedCronTriggersStatsLogger,
createFetchedScheduledEventsStatsLogger,
closeFetchedScheduledEventsStatsLogger,
getDeprivedCronTriggerStatsTx,
getScheduledEventsForDeliveryTx,
insertInvocationTx,
setScheduledEventOpTx,
unlockScheduledEventsTx,
unlockAllLockedScheduledEventsTx,
insertCronEventsTx,
insertOneOffScheduledEventTx,
dropFutureCronEventsTx,
getOneOffScheduledEventsTx,
getCronEventsTx,
deleteScheduledEventTx,
getScheduledEventInvocationsTx,
getScheduledEventsInvocationsQuery,
getScheduledEventsInvocationsQueryNoPagination,
mkScheduledEventStatusFilter,
scheduledTimeOrderBy,
executeWithOptionalTotalCount,
mkPaginationSelectExp,
withCount,
invocationFieldExtractors,
mkEventIdBoolExp,
EventTables (..),
)
where
import Control.Concurrent.Async.Lifted (forConcurrently_)
import Control.Concurrent.Extended (Forever (..), sleep)
import Control.Concurrent.STM
import Control.Lens (preview)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson qualified as J
import Data.Environment qualified as Env
import Data.Has
import Data.HashMap.Strict qualified as HashMap
import Data.Int (Int64)
import Data.List.NonEmpty qualified as NE
import Data.SerializableBlob qualified as SB
import Data.Set qualified as Set
import Data.Text qualified as T
import Data.Text.Extended (ToTxt (..), (<<>))
import Data.These
import Data.Time.Clock
import Data.URL.Template (printTemplate)
import Database.PG.Query qualified as PG
import Hasura.Backends.Postgres.Execute.Types
import Hasura.Backends.Postgres.SQL.DML qualified as S
import Hasura.Backends.Postgres.SQL.Types
import Hasura.Base.Error
import Hasura.Eventing.Common
import Hasura.Eventing.HTTP
import Hasura.Eventing.ScheduledTrigger.Types
import Hasura.HTTP (getHTTPExceptionStatus)
import Hasura.Logging qualified as L
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.EventTrigger (ResolveHeaderError, getHeaderInfosFromConfEither)
import Hasura.RQL.DDL.Webhook.Transform
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.Eventing
import Hasura.RQL.Types.ScheduledTrigger
import Hasura.RQL.Types.SchemaCache
import Hasura.SQL.Types
import Hasura.Server.Prometheus (ScheduledTriggerMetrics (..))
import Hasura.Tracing qualified as Tracing
import Network.HTTP.Client.Transformable qualified as HTTP
import Refined (unrefine)
import System.Metrics.Prometheus.Counter as Prometheus.Counter
import System.Timeout.Lifted (timeout)
import Text.Builder qualified as TB
-- | Periodically generates cron events for the cron triggers found in the
-- schema cache.
--
-- This action never returns. On every iteration it:
--
--   1. reads the current 'SchemaCache' through the supplied 'IO' action;
--   2. if the cache contains at least one cron trigger, fetches the stats
--      returned by 'getDeprivedCronTriggerStats' for those trigger names,
--      logs them through the given 'FetchedCronTriggerStatsLogger', pairs
--      every stat with its 'CronTriggerInfo' and inserts the generated cron
--      events;
--   3. sleeps for one minute.
--
-- A 'QErr' raised while fetching the stats or inserting the events is logged
-- as a 'ScheduledTriggerInternalErr'; it does not terminate the loop.
runCronEventsGenerator ::
  ( MonadIO m,
    MonadMetadataStorage m
  ) =>
  L.Logger L.Hasura ->
  FetchedCronTriggerStatsLogger ->
  IO SchemaCache ->
  m void
runCronEventsGenerator logger cronTriggerStatsLogger getSC = do
  forever $ do
    sc <- liftIO getSC
    let cronTriggersCache = scCronTriggers sc

    -- Only talk to the metadata storage when the schema cache actually
    -- contains cron triggers.
    unless (HashMap.null cronTriggersCache) $ do
      eitherRes <- runExceptT $ do
        deprivedCronTriggerStats <-
          liftEitherM $ getDeprivedCronTriggerStats $ HashMap.keys cronTriggersCache
        logFetchedCronTriggersStats cronTriggerStatsLogger deprivedCronTriggerStats
        -- Join the stats with the cached trigger definitions; stats whose
        -- trigger is missing from the cache are dropped (and logged).
        cronTriggersForHydrationWithStats <-
          catMaybes
            <$> mapM (withCronTrigger cronTriggersCache) deprivedCronTriggerStats
        insertCronEventsFor cronTriggersForHydrationWithStats

      onLeft eitherRes $ L.unLogger logger . ScheduledTriggerInternalErr

    liftIO $ sleep (minutes 1)
  where
    -- Looks up the trigger named by the stat in the given cache. Returns
    -- 'Nothing', after logging an internal error, when the trigger is absent.
    withCronTrigger cronTriggerCache cronTriggerStat = do
      case HashMap.lookup (_ctsName cronTriggerStat) cronTriggerCache of
        Nothing -> do
          L.unLogger logger
            $ ScheduledTriggerInternalErr
            $ err500 Unexpected "could not find scheduled trigger in the schema cache"
          pure Nothing
        Just cronTrigger ->
          pure
            $ Just (cronTrigger, cronTriggerStat)
-- | Generates cron events for every given trigger and inserts them through
-- 'insertCronEvents'.
--
-- For each @(trigger, stats)@ pair the events are generated by
-- 'generateCronEventsFrom', starting from the stat's '_ctsMaxScheduledTime'.
-- When no events are generated at all, nothing is inserted. A 'Left' returned
-- by 'insertCronEvents' is rethrown in @m@ via 'liftEitherM'.
insertCronEventsFor ::
  (MonadMetadataStorage m, MonadError QErr m) =>
  [(CronTriggerInfo, CronTriggerStats)] ->
  m ()
insertCronEventsFor cronTriggersWithStats = do
  let scheduledEvents = flip concatMap cronTriggersWithStats $ \(cti, stats) ->
        generateCronEventsFrom (_ctsMaxScheduledTime stats) cti
  case scheduledEvents of
    -- Avoid calling the metadata storage with an empty batch.
    [] -> pure ()
    events -> liftEitherM $ insertCronEvents events
-- | Builds the 'CronEventSeed's of a cron trigger from a start time.
--
-- The scheduled times come from
-- @'generateScheduleTimes' startTime 100 ctiSchedule@; each one is tagged
-- with the trigger's name.
--
-- NOTE(review): the literal @100@ is presumably the number of schedule times
-- to generate -- confirm against 'generateScheduleTimes'.
generateCronEventsFrom :: UTCTime -> CronTriggerInfo -> [CronEventSeed]
generateCronEventsFrom startTime CronTriggerInfo {..} =
  map (CronEventSeed ctiName)
    $ generateScheduleTimes startTime 100 ctiSchedule
-- | Upper bound (30 minutes) applied to the processing timeout of a cron
-- scheduled event; see its use in 'processCronEvents'.
upperBoundScheduledEventTimeout :: DiffTime
upperBoundScheduledEventTimeout = minutes 30
-- | Processes the given cron events concurrently.
--
-- The ids of all events are first recorded in the given 'TVar' through
-- 'saveLockedEvents'. Then, for every event:
--
--   * if its trigger is missing from the supplied trigger map, an internal
--     error is logged and the event is not processed;
--   * otherwise the webhook payload and retry context are built from the
--     trigger definition and 'processScheduledEvent' is run under a timeout
--     of @min upperBoundScheduledEventTimeout \<configured timeout\>@;
--   * if the timeout fires, the timeout is logged and handed to
--     'processError'; a 'QErr' from either path is logged as well;
--   * finally the event id is removed from the locked-events 'TVar'.
processCronEvents ::
  ( MonadIO m,
    MonadMetadataStorage m,
    Tracing.MonadTrace m,
    MonadBaseControl IO m
  ) =>
  L.Logger L.Hasura ->
  HTTP.Manager ->
  ScheduledTriggerMetrics ->
  [CronEvent] ->
  HashMap TriggerName CronTriggerInfo ->
  TVar (Set.Set CronEventId) ->
  m ()
processCronEvents logger httpMgr scheduledTriggerMetrics cronEvents cronTriggersInfo lockedCronEvents = do
  -- Remember which events this instance currently holds.
  saveLockedEvents (map _ceId cronEvents) lockedCronEvents
  forConcurrently_ cronEvents $ \(CronEvent id' name st _ tries _ _) -> do
    case HashMap.lookup name cronTriggersInfo of
      Nothing ->
        logInternalError
          $ err500 Unexpected
          $ "could not find cron trigger "
          <> name
          <<> " in the schema cache"
      Just CronTriggerInfo {..} -> do
        let payload =
              ScheduledEventWebhookPayload
                id'
                (Just name)
                st
                (fromMaybe J.Null ctiPayload)
                ctiComment
                Nothing
                ctiRequestTransform
                ctiResponseTransform
            retryCtx = RetryContext tries ctiRetryConf
            -- Never wait longer than the global upper bound, whatever the
            -- trigger's own retry configuration says.
            eventProcessingTimeout =
              min upperBoundScheduledEventTimeout (unrefine $ strcTimeoutSeconds ctiRetryConf)
            processScheduledEventAction =
              runExceptT
                $ flip runReaderT (logger, httpMgr)
                $ processScheduledEvent
                  scheduledTriggerMetrics
                  id'
                  ctiHeaders
                  retryCtx
                  payload
                  ctiWebhookInfo
                  Cron
        eventProcessedMaybe <-
          timeout (fromInteger (diffTimeToMicroSeconds eventProcessingTimeout))
            $ processScheduledEventAction
        case eventProcessedMaybe of
          -- 'timeout' returned 'Nothing': the action did not finish in time.
          Nothing -> do
            let eventTimeoutMessage = "Cron Scheduled event " <> id' <<> " of cron trigger " <> name <<> " timed out while processing."
                eventTimeoutError = err500 TimeoutErrorCode eventTimeoutMessage
            logInternalError eventTimeoutError
            runExceptT (processError id' retryCtx [] Cron (mkErrorObject eventTimeoutMessage) (HOther $ T.unpack eventTimeoutMessage) scheduledTriggerMetrics)
              >>= (`onLeft` logInternalError)
          Just finally -> onLeft finally logInternalError
        -- NOTE(review): placed inside the 'Just CronTriggerInfo' branch, so
        -- events whose trigger was not found stay in the locked set -- confirm
        -- this nesting against the upstream source.
        removeEventFromLockedEvents id' lockedCronEvents
  where
    -- Logs a 'QErr' as a 'ScheduledTriggerInternalErr'.
    logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err

    -- Wraps an error message into a JSON object of the form
    -- @{"error": <message>}@.
    mkErrorObject :: Text -> J.Value
    mkErrorObject errorMessage =
      J.object $ ["error" J..= errorMessage]
processOneOffScheduledEvents ::
( MonadIO m,
Tracing.MonadTrace m,
MonadMetadataStorage m,
MonadBaseControl IO m
) =>
Env.Environment ->
L.Logger L.Hasura ->
HTTP.Manager ->
ScheduledTriggerMetrics ->
[OneOffScheduledEvent] ->
TVar (Set.Set OneOffScheduledEventId) ->
m ()
processOneOffScheduledEvents :: forall (m :: * -> *).
(MonadIO m, MonadTrace m, MonadMetadataStorage m,
MonadBaseControl IO m) =>
Environment
-> Logger Hasura
-> Manager
-> ScheduledTriggerMetrics
-> [OneOffScheduledEvent]
-> TVar (Set ScheduledEventId)
-> m ()
processOneOffScheduledEvents
Environment
env
Logger Hasura
logger
Manager
httpMgr
ScheduledTriggerMetrics
scheduledTriggerMetrics
[OneOffScheduledEvent]
oneOffEvents
TVar (Set ScheduledEventId)
lockedOneOffScheduledEvents = do
[ScheduledEventId] -> TVar (Set ScheduledEventId) -> m ()
forall (m :: * -> *).
MonadIO m =>
[ScheduledEventId] -> TVar (Set ScheduledEventId) -> m ()
saveLockedEvents ((OneOffScheduledEvent -> ScheduledEventId)
-> [OneOffScheduledEvent] -> [ScheduledEventId]
forall a b. (a -> b) -> [a] -> [b]
map OneOffScheduledEvent -> ScheduledEventId
_ooseId [OneOffScheduledEvent]
oneOffEvents) TVar (Set ScheduledEventId)
lockedOneOffScheduledEvents
[OneOffScheduledEvent] -> (OneOffScheduledEvent -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, MonadBaseControl IO m) =>
t a -> (a -> m b) -> m ()
forConcurrently_ [OneOffScheduledEvent]
oneOffEvents ((OneOffScheduledEvent -> m ()) -> m ())
-> (OneOffScheduledEvent -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \OneOffScheduledEvent {Int
[HeaderConf]
Maybe Text
Maybe Value
Maybe UTCTime
Maybe RequestTransform
Maybe MetadataResponseTransform
Text
UTCTime
ScheduledEventId
InputWebhook
STRetryConf
_ooseId :: OneOffScheduledEvent -> ScheduledEventId
_ooseId :: ScheduledEventId
_ooseWebhookConf :: InputWebhook
_ooseScheduledTime :: UTCTime
_ooseRetryConf :: STRetryConf
_oosePayload :: Maybe Value
_ooseHeaderConf :: [HeaderConf]
_ooseStatus :: Text
_ooseTries :: Int
_ooseCreatedAt :: UTCTime
_ooseNextRetryAt :: Maybe UTCTime
_ooseComment :: Maybe Text
_ooseRequestTransform :: Maybe RequestTransform
_ooseResponseTransform :: Maybe MetadataResponseTransform
_ooseWebhookConf :: OneOffScheduledEvent -> InputWebhook
_ooseScheduledTime :: OneOffScheduledEvent -> UTCTime
_ooseRetryConf :: OneOffScheduledEvent -> STRetryConf
_oosePayload :: OneOffScheduledEvent -> Maybe Value
_ooseHeaderConf :: OneOffScheduledEvent -> [HeaderConf]
_ooseStatus :: OneOffScheduledEvent -> Text
_ooseTries :: OneOffScheduledEvent -> Int
_ooseCreatedAt :: OneOffScheduledEvent -> UTCTime
_ooseNextRetryAt :: OneOffScheduledEvent -> Maybe UTCTime
_ooseComment :: OneOffScheduledEvent -> Maybe Text
_ooseRequestTransform :: OneOffScheduledEvent -> Maybe RequestTransform
_ooseResponseTransform :: OneOffScheduledEvent -> Maybe MetadataResponseTransform
..} -> do
((QErr -> m ()) -> (() -> m ()) -> Either QErr () -> m ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either QErr -> m ()
logInternalError () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure) (Either QErr () -> m ()) -> m (Either QErr ()) -> m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< ExceptT QErr m () -> m (Either QErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT do
let payload :: ScheduledEventWebhookPayload
payload =
ScheduledEventId
-> Maybe TriggerName
-> UTCTime
-> Value
-> Maybe Text
-> Maybe UTCTime
-> Maybe RequestTransform
-> Maybe MetadataResponseTransform
-> ScheduledEventWebhookPayload
ScheduledEventWebhookPayload
ScheduledEventId
_ooseId
Maybe TriggerName
forall a. Maybe a
Nothing
UTCTime
_ooseScheduledTime
(Value -> Maybe Value -> Value
forall a. a -> Maybe a -> a
fromMaybe Value
J.Null Maybe Value
_oosePayload)
Maybe Text
_ooseComment
(UTCTime -> Maybe UTCTime
forall a. a -> Maybe a
Just UTCTime
_ooseCreatedAt)
Maybe RequestTransform
_ooseRequestTransform
Maybe MetadataResponseTransform
_ooseResponseTransform
retryCtx :: RetryContext
retryCtx = Int -> STRetryConf -> RetryContext
RetryContext Int
_ooseTries STRetryConf
_ooseRetryConf
resolvedWebhookInfoEither :: Either ResolveWebhookError ResolvedWebhook
resolvedWebhookInfoEither = Environment
-> InputWebhook -> Either ResolveWebhookError ResolvedWebhook
resolveWebhookEither Environment
env InputWebhook
_ooseWebhookConf
resolvedHeaderInfoEither :: Either ResolveHeaderError [EventHeaderInfo]
resolvedHeaderInfoEither = Environment
-> [HeaderConf] -> Either ResolveHeaderError [EventHeaderInfo]
getHeaderInfosFromConfEither Environment
env [HeaderConf]
_ooseHeaderConf
webhookAndHeaderInfo :: Either
(These ResolveWebhookError ResolveHeaderError)
(EnvRecord ResolvedWebhook, [EventHeaderInfo])
webhookAndHeaderInfo = case (Either ResolveWebhookError ResolvedWebhook
resolvedWebhookInfoEither, Either ResolveHeaderError [EventHeaderInfo]
resolvedHeaderInfoEither) of
(Right ResolvedWebhook
resolvedEventWebhookInfo, Right [EventHeaderInfo]
resolvedEventHeaderInfo) -> do
let resolvedWebhookEnvRecord :: EnvRecord ResolvedWebhook
resolvedWebhookEnvRecord = Text -> ResolvedWebhook -> EnvRecord ResolvedWebhook
forall a. Text -> a -> EnvRecord a
EnvRecord (InputWebhook -> Text
getTemplateFromUrl InputWebhook
_ooseWebhookConf) ResolvedWebhook
resolvedEventWebhookInfo
(EnvRecord ResolvedWebhook, [EventHeaderInfo])
-> Either
(These ResolveWebhookError ResolveHeaderError)
(EnvRecord ResolvedWebhook, [EventHeaderInfo])
forall a b. b -> Either a b
Right (EnvRecord ResolvedWebhook
resolvedWebhookEnvRecord, [EventHeaderInfo]
resolvedEventHeaderInfo)
(Left ResolveWebhookError
eventWebhookErrorVars, Right [EventHeaderInfo]
_) -> These ResolveWebhookError ResolveHeaderError
-> Either
(These ResolveWebhookError ResolveHeaderError)
(EnvRecord ResolvedWebhook, [EventHeaderInfo])
forall a b. a -> Either a b
Left (These ResolveWebhookError ResolveHeaderError
-> Either
(These ResolveWebhookError ResolveHeaderError)
(EnvRecord ResolvedWebhook, [EventHeaderInfo]))
-> These ResolveWebhookError ResolveHeaderError
-> Either
(These ResolveWebhookError ResolveHeaderError)
(EnvRecord ResolvedWebhook, [EventHeaderInfo])
forall a b. (a -> b) -> a -> b
$ ResolveWebhookError -> These ResolveWebhookError ResolveHeaderError
forall a b. a -> These a b
This ResolveWebhookError
eventWebhookErrorVars
(Right ResolvedWebhook
_, Left ResolveHeaderError
eventHeaderErrorVars) -> These ResolveWebhookError ResolveHeaderError
-> Either
(These ResolveWebhookError ResolveHeaderError)
(EnvRecord ResolvedWebhook, [EventHeaderInfo])
forall a b. a -> Either a b
Left (These ResolveWebhookError ResolveHeaderError
-> Either
(These ResolveWebhookError ResolveHeaderError)
(EnvRecord ResolvedWebhook, [EventHeaderInfo]))
-> These ResolveWebhookError ResolveHeaderError
-> Either
(These ResolveWebhookError ResolveHeaderError)
(EnvRecord ResolvedWebhook, [EventHeaderInfo])
forall a b. (a -> b) -> a -> b
$ ResolveHeaderError -> These ResolveWebhookError ResolveHeaderError
forall a b. b -> These a b
That ResolveHeaderError
eventHeaderErrorVars
(Left ResolveWebhookError
eventWebhookErrors, Left ResolveHeaderError
eventHeaderErrorVars) -> These ResolveWebhookError ResolveHeaderError
-> Either
(These ResolveWebhookError ResolveHeaderError)
(EnvRecord ResolvedWebhook, [EventHeaderInfo])
forall a b. a -> Either a b
Left (These ResolveWebhookError ResolveHeaderError
-> Either
(These ResolveWebhookError ResolveHeaderError)
(EnvRecord ResolvedWebhook, [EventHeaderInfo]))
-> These ResolveWebhookError ResolveHeaderError
-> Either
(These ResolveWebhookError ResolveHeaderError)
(EnvRecord ResolvedWebhook, [EventHeaderInfo])
forall a b. (a -> b) -> a -> b
$ ResolveWebhookError
-> ResolveHeaderError
-> These ResolveWebhookError ResolveHeaderError
forall a b. a -> b -> These a b
These ResolveWebhookError
eventWebhookErrors ResolveHeaderError
eventHeaderErrorVars
case Either
(These ResolveWebhookError ResolveHeaderError)
(EnvRecord ResolvedWebhook, [EventHeaderInfo])
webhookAndHeaderInfo of
Right (EnvRecord ResolvedWebhook
webhookEnvRecord, [EventHeaderInfo]
eventHeaderInfo) -> do
let processScheduledEventAction :: ExceptT QErr m ()
processScheduledEventAction =
(ReaderT (Logger Hasura, Manager) (ExceptT QErr m) ()
-> (Logger Hasura, Manager) -> ExceptT QErr m ())
-> (Logger Hasura, Manager)
-> ReaderT (Logger Hasura, Manager) (ExceptT QErr m) ()
-> ExceptT QErr m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ReaderT (Logger Hasura, Manager) (ExceptT QErr m) ()
-> (Logger Hasura, Manager) -> ExceptT QErr m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (Logger Hasura
logger, Manager
httpMgr)
(ReaderT (Logger Hasura, Manager) (ExceptT QErr m) ()
-> ExceptT QErr m ())
-> ReaderT (Logger Hasura, Manager) (ExceptT QErr m) ()
-> ExceptT QErr m ()
forall a b. (a -> b) -> a -> b
$ ScheduledTriggerMetrics
-> ScheduledEventId
-> [EventHeaderInfo]
-> RetryContext
-> ScheduledEventWebhookPayload
-> EnvRecord ResolvedWebhook
-> ScheduledEventType
-> ReaderT (Logger Hasura, Manager) (ExceptT QErr m) ()
forall r (m :: * -> *).
(MonadReader r m, Has Manager r, Has (Logger Hasura) r, MonadIO m,
MonadTrace m, MonadMetadataStorage m, MonadError QErr m) =>
ScheduledTriggerMetrics
-> ScheduledEventId
-> [EventHeaderInfo]
-> RetryContext
-> ScheduledEventWebhookPayload
-> EnvRecord ResolvedWebhook
-> ScheduledEventType
-> m ()
processScheduledEvent ScheduledTriggerMetrics
scheduledTriggerMetrics ScheduledEventId
_ooseId [EventHeaderInfo]
eventHeaderInfo RetryContext
retryCtx ScheduledEventWebhookPayload
payload EnvRecord ResolvedWebhook
webhookEnvRecord ScheduledEventType
OneOff
eventTimeout :: DiffTime
eventTimeout = Refined NonNegative DiffTime -> DiffTime
forall {k} (p :: k) x. Refined p x -> x
unrefine (Refined NonNegative DiffTime -> DiffTime)
-> Refined NonNegative DiffTime -> DiffTime
forall a b. (a -> b) -> a -> b
$ STRetryConf -> Refined NonNegative DiffTime
strcTimeoutSeconds (STRetryConf -> Refined NonNegative DiffTime)
-> STRetryConf -> Refined NonNegative DiffTime
forall a b. (a -> b) -> a -> b
$ STRetryConf
_ooseRetryConf
Int -> ExceptT QErr m () -> ExceptT QErr m (Maybe ())
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Int -> m a -> m (Maybe a)
timeout (Integer -> Int
forall a. Num a => Integer -> a
fromInteger (DiffTime -> Integer
diffTimeToMicroSeconds (DiffTime -> DiffTime -> DiffTime
forall a. Ord a => a -> a -> a
min DiffTime
upperBoundScheduledEventTimeout DiffTime
eventTimeout))) ExceptT QErr m ()
processScheduledEventAction
ExceptT QErr m (Maybe ()) -> ExceptT QErr m () -> ExceptT QErr m ()
forall (m :: * -> *) a. Monad m => m (Maybe a) -> m a -> m a
`onNothingM` ( do
let eventTimeoutMessage :: Text
eventTimeoutMessage = Text
"One-off Scheduled event " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> ScheduledEventId
_ooseId ScheduledEventId -> Text -> Text
forall t. ToTxt t => t -> Text -> Text
<<> Text
" timed out while processing."
eventTimeoutError :: QErr
eventTimeoutError = Code -> Text -> QErr
err500 Code
TimeoutErrorCode Text
eventTimeoutMessage
m () -> ExceptT QErr m ()
forall (m :: * -> *) a. Monad m => m a -> ExceptT QErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> ExceptT QErr m ()) -> m () -> ExceptT QErr m ()
forall a b. (a -> b) -> a -> b
$ QErr -> m ()
logInternalError QErr
eventTimeoutError
ScheduledEventId
-> RetryContext
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPErr Any
-> ScheduledTriggerMetrics
-> ExceptT QErr m ()
forall (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, MonadMetadataStorage m, MonadError QErr m) =>
ScheduledEventId
-> RetryContext
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPErr a
-> ScheduledTriggerMetrics
-> m ()
processError ScheduledEventId
_ooseId RetryContext
retryCtx [] ScheduledEventType
OneOff (Text -> Value
mkErrorObject Text
eventTimeoutMessage) (String -> HTTPErr Any
forall (a :: TriggerTypes). String -> HTTPErr a
HOther (String -> HTTPErr Any) -> String -> HTTPErr Any
forall a b. (a -> b) -> a -> b
$ Text -> String
T.unpack Text
eventTimeoutMessage) ScheduledTriggerMetrics
scheduledTriggerMetrics
)
ScheduledEventId
-> TVar (Set ScheduledEventId) -> ExceptT QErr m ()
forall (m :: * -> *).
MonadIO m =>
ScheduledEventId -> TVar (Set ScheduledEventId) -> m ()
removeEventFromLockedEvents ScheduledEventId
_ooseId TVar (Set ScheduledEventId)
lockedOneOffScheduledEvents
Left These ResolveWebhookError ResolveHeaderError
envVarError ->
ScheduledEventId
-> RetryContext
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPErr Any
-> ScheduledTriggerMetrics
-> ExceptT QErr m ()
forall (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, MonadMetadataStorage m, MonadError QErr m) =>
ScheduledEventId
-> RetryContext
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPErr a
-> ScheduledTriggerMetrics
-> m ()
processError
ScheduledEventId
_ooseId
RetryContext
retryCtx
[]
ScheduledEventType
OneOff
(Text -> Value
mkErrorObject (Text -> Value) -> Text -> Value
forall a b. (a -> b) -> a -> b
$ Text
"Error creating the request. " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (These ResolveWebhookError ResolveHeaderError -> Text
mkInvalidEnvVarErrMsg (These ResolveWebhookError ResolveHeaderError -> Text)
-> These ResolveWebhookError ResolveHeaderError -> Text
forall a b. (a -> b) -> a -> b
$ These ResolveWebhookError ResolveHeaderError
envVarError))
(String -> HTTPErr Any
forall (a :: TriggerTypes). String -> HTTPErr a
HOther (String -> HTTPErr Any) -> String -> HTTPErr Any
forall a b. (a -> b) -> a -> b
$ Text -> String
T.unpack (Text -> String) -> Text -> String
forall a b. (a -> b) -> a -> b
$ QErr -> Text
qeError (Code -> Text -> QErr
err400 Code
NotFound (These ResolveWebhookError ResolveHeaderError -> Text
mkInvalidEnvVarErrMsg These ResolveWebhookError ResolveHeaderError
envVarError)))
ScheduledTriggerMetrics
scheduledTriggerMetrics
where
logInternalError :: QErr -> m ()
logInternalError QErr
err = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ())
-> (ScheduledTriggerInternalErr -> IO ())
-> ScheduledTriggerInternalErr
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (ScheduledTriggerInternalErr -> m ())
-> ScheduledTriggerInternalErr -> m ()
forall a b. (a -> b) -> a -> b
$ QErr -> ScheduledTriggerInternalErr
ScheduledTriggerInternalErr QErr
err
getTemplateFromUrl :: InputWebhook -> Text
getTemplateFromUrl InputWebhook
url = Template -> Text
printTemplate (Template -> Text) -> Template -> Text
forall a b. (a -> b) -> a -> b
$ InputWebhook -> Template
unInputWebhook InputWebhook
url
mkInvalidEnvVarErrMsg :: These ResolveWebhookError ResolveHeaderError -> Text
mkInvalidEnvVarErrMsg These ResolveWebhookError ResolveHeaderError
envVarErrorValues = Text
"The value for environment variables not found: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (These ResolveWebhookError ResolveHeaderError -> Text
getInvalidEnvVarText These ResolveWebhookError ResolveHeaderError
envVarErrorValues)
mkErrorObject :: Text -> J.Value
mkErrorObject :: Text -> Value
mkErrorObject Text
errorMessage =
[Pair] -> Value
J.object ([Pair] -> Value) -> [Pair] -> Value
forall a b. (a -> b) -> a -> b
$ [Key
"error" Key -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
forall v. ToJSON v => Key -> v -> Pair
J..= Text
errorMessage]
getInvalidEnvVarText :: These ResolveWebhookError ResolveHeaderError -> Text
getInvalidEnvVarText :: These ResolveWebhookError ResolveHeaderError -> Text
getInvalidEnvVarText (This ResolveWebhookError
a) = ResolveWebhookError -> Text
forall a. ToTxt a => a -> Text
toTxt ResolveWebhookError
a
getInvalidEnvVarText (That ResolveHeaderError
b) = ResolveHeaderError -> Text
forall a. ToTxt a => a -> Text
toTxt ResolveHeaderError
b
getInvalidEnvVarText (These ResolveWebhookError
a ResolveHeaderError
b) = ResolveWebhookError -> Text
forall a. ToTxt a => a -> Text
toTxt ResolveWebhookError
a Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
", " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> ResolveHeaderError -> Text
forall a. ToTxt a => a -> Text
toTxt ResolveHeaderError
b
-- | Build the polling loop that delivers scheduled (cron and one-off) events.
--
-- The function itself performs no work; it returns a 'Forever' whose step, on
-- every iteration:
--
--   1. reads the cron triggers from the current schema cache (@getSC@) and
--      the current environment (@getEnvHook@);
--   2. asks the metadata storage for the events that are due for delivery for
--      those cron triggers;
--   3. if that fetch fails, logs the error as a 'ScheduledTriggerInternalErr';
--      otherwise records how many events of each kind were fetched and passes
--      them to 'processCronEvents' and 'processOneOffScheduledEvents', along
--      with the corresponding locked-event sets taken from 'LockedEventsCtx';
--   4. sleeps for 10 seconds before the next iteration.
processScheduledTriggers ::
  ( MonadIO m,
    Tracing.MonadTrace m,
    MonadMetadataStorage m,
    MonadBaseControl IO m
  ) =>
  IO Env.Environment ->
  L.Logger L.Hasura ->
  FetchedScheduledEventsStatsLogger ->
  HTTP.Manager ->
  ScheduledTriggerMetrics ->
  IO SchemaCache ->
  LockedEventsCtx ->
  m (Forever m)
processScheduledTriggers getEnvHook logger statsLogger httpMgr scheduledTriggerMetrics getSC LockedEventsCtx {..} =
  return
    $ Forever ()
    $ const do
      -- Re-read the schema cache and environment on every iteration so that
      -- the loop always works with their current values.
      cronTriggersInfo <- scCronTriggers <$> liftIO getSC
      env <- liftIO getEnvHook
      getScheduledEventsForDelivery (HashMap.keys cronTriggersInfo) >>= \case
        Left e -> logInternalError e
        Right (cronEvents, oneOffEvents) -> do
          logFetchedScheduledEventsStats
            statsLogger
            (CronEventsCount $ length cronEvents)
            (OneOffScheduledEventsCount $ length oneOffEvents)
          processCronEvents logger httpMgr scheduledTriggerMetrics cronEvents cronTriggersInfo leCronEvents
          processOneOffScheduledEvents env logger httpMgr scheduledTriggerMetrics oneOffEvents leOneOffEvents
      -- Pause between polls; this runs after failed fetches as well, so an
      -- erroring metadata storage does not turn the loop into a busy spin.
      liftIO $ sleep (seconds 10)
  where
    -- Report a failure of the polling machinery itself (as opposed to a
    -- failure of an individual event delivery).
    logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
-- | Deliver a single scheduled event to its webhook and record the outcome.
--
-- The whole delivery runs inside a fresh, always-sampled trace whose name is
-- @"Scheduled trigger"@, suffixed with @": <trigger name>"@ when the payload
-- carries a trigger name.
--
-- * If the event's scheduled time lies further in the past than the retry
--   configuration's tolerance, the event is handed to 'processDead' and no
--   request is made.
-- * Otherwise the request is built with 'mkRequest' (JSON-encoded payload,
--   prepared headers, optional request transform, timeout taken from the
--   retry configuration) and sent with 'invokeRequest'.
--
-- Outcomes:
--
-- * success: 'processSuccess' is called with the body of the request that was
--   actually sent, decoded as JSON ('J.Null' when it cannot be decoded);
-- * 'HTTPError': 'processError' is called;
-- * 'TransformationError': the error bundle is logged at error level and the
--   event status is set to 'SESError'.
processScheduledEvent ::
  ( MonadReader r m,
    Has HTTP.Manager r,
    Has (L.Logger L.Hasura) r,
    MonadIO m,
    Tracing.MonadTrace m,
    MonadMetadataStorage m,
    MonadError QErr m
  ) =>
  ScheduledTriggerMetrics ->
  ScheduledEventId ->
  [EventHeaderInfo] ->
  RetryContext ->
  ScheduledEventWebhookPayload ->
  EnvRecord ResolvedWebhook ->
  ScheduledEventType ->
  m ()
processScheduledEvent scheduledTriggerMetrics eventId eventHeaders retryCtx payload webhookUrl type' =
  Tracing.newTrace Tracing.sampleAlways traceNote do
    currentTime <- liftIO getCurrentTime
    let retryConf = _rctxConf retryCtx
        scheduledTime = sewpScheduledTime payload
    if convertDuration (diffUTCTime currentTime scheduledTime)
      > unrefine (strcToleranceSeconds retryConf)
      then processDead eventId type'
      else do
        let -- The configured timeout is rounded to whole seconds before
            -- being converted to microseconds for the HTTP client.
            timeoutSeconds = round $ unrefine (strcTimeoutSeconds retryConf)
            httpTimeout = HTTP.responseTimeoutMicro (timeoutSeconds * 1000000)
            (headers, decodedHeaders) = prepareHeaders eventHeaders
            extraLogCtx = ExtraLogContext eventId (sewpName payload)
            webhookReqBodyJson = J.toJSON payload
            webhookReqBody = J.encode webhookReqBodyJson
            requestTransform = sewpRequestTransform payload
            responseTransform = mkResponseTransform <$> sewpResponseTransform payload

        eitherReqRes <-
          runExceptT
            $ mkRequest headers httpTimeout webhookReqBody requestTransform (_envVarValue webhookUrl)
            >>= \reqDetails -> do
              let request = extractRequest reqDetails
                  -- Callback given to 'invokeRequest': logs the HTTP
                  -- exchange and updates the Prometheus counters.
                  logger e d = do
                    logHTTPForST e extraLogCtx d (_envVarName webhookUrl) decodedHeaders
                    liftIO $ do
                      -- Bytes received are only counted when a response
                      -- was obtained.
                      case e of
                        Left _err -> pure ()
                        Right response ->
                          Prometheus.Counter.add
                            (stmScheduledTriggerBytesReceived scheduledTriggerMetrics)
                            (hrsSize response)
                      -- Bytes sent: the transformed size when present,
                      -- otherwise the original size.
                      let RequestDetails {_rdOriginalSize, _rdTransformedSize} = d
                       in Prometheus.Counter.add
                            (stmScheduledTriggerBytesSent scheduledTriggerMetrics)
                            (fromMaybe _rdOriginalSize _rdTransformedSize)
                      -- One success/failure counter per event type.
                      case (type', e) of
                        (Cron, Left _err) -> Prometheus.Counter.inc (stmCronEventsInvocationTotalFailure scheduledTriggerMetrics)
                        (Cron, Right _) -> Prometheus.Counter.inc (stmCronEventsInvocationTotalSuccess scheduledTriggerMetrics)
                        (OneOff, Left _err) -> Prometheus.Counter.inc (stmOneOffEventsInvocationTotalFailure scheduledTriggerMetrics)
                        (OneOff, Right _) -> Prometheus.Counter.inc (stmOneOffEventsInvocationTotalSuccess scheduledTriggerMetrics)
                  sessionVars = _rdSessionVars reqDetails
              resp <- invokeRequest reqDetails responseTransform sessionVars logger
              pure (request, resp)
        case eitherReqRes of
          Right (req, resp) ->
            let -- Recover the JSON body from the request that was sent;
                -- fall back to 'J.Null' if the body is not a lazy
                -- bytestring or does not decode.
                reqBody = fromMaybe J.Null $ preview (HTTP.body . HTTP._RequestBodyLBS) req >>= J.decode @J.Value
             in processSuccess eventId decodedHeaders type' reqBody resp scheduledTriggerMetrics
          Left (HTTPError reqBody e) -> processError eventId retryCtx decodedHeaders type' reqBody e scheduledTriggerMetrics
          Left (TransformationError _ e) -> do
            -- Log the transformation error.
            logger :: L.Logger L.Hasura <- asks getter
            L.unLogger logger $ L.UnstructuredLog L.LevelError (SB.fromLBS $ J.encode e)

            -- Set the event state to error.
            liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESError) type'
  where
    traceNote = "Scheduled trigger" <> foldMap ((": " <>) . triggerNameToTxt) (sewpName payload)
-- | Record a failed delivery attempt and decide what happens to the event.
--
-- An 'Invocation' describing the failure is built from the 'HTTPErr':
--
-- * 'HClient': status from 'getHTTPExceptionStatus' (may be absent), response
--   body from 'httpExceptionErrorEncoding', no response headers;
-- * 'HStatus': status, body and headers of the error response;
-- * 'HOther': status 500, the JSON-encoded detail string as body, no response
--   headers.
--
-- The invocation is stored with 'insertScheduledEventInvocation' (a storage
-- error is rethrown through 'MonadError'), after which 'retryOrMarkError' is
-- called with the same error.
processError ::
  ( MonadIO m,
    MonadMetadataStorage m,
    MonadError QErr m
  ) =>
  ScheduledEventId ->
  RetryContext ->
  [HeaderConf] ->
  ScheduledEventType ->
  J.Value ->
  HTTPErr a ->
  ScheduledTriggerMetrics ->
  m ()
processError eventId retryCtx decodedHeaders type' reqJson err scheduledTriggerMetric = do
  let invocation = case err of
        HClient httpException ->
          let statusMaybe = getHTTPExceptionStatus httpException
           in mkInvocation eventId statusMaybe decodedHeaders (SB.fromLBS $ httpExceptionErrorEncoding httpException) [] reqJson
        HStatus errResp ->
          let respPayload = hrsBody errResp
              respHeaders = hrsHeaders errResp
              respStatus = hrsStatus errResp
           in mkInvocation eventId (Just respStatus) decodedHeaders respPayload respHeaders reqJson
        HOther detail ->
          let errMsg = SB.fromLBS $ J.encode detail
           in mkInvocation eventId (Just 500) decodedHeaders errMsg [] reqJson
  liftEitherM $ insertScheduledEventInvocation invocation type'
  retryOrMarkError eventId retryCtx err type' scheduledTriggerMetric
-- | Decide what happens to a scheduled event after a failed delivery attempt.
--
-- The event is marked 'SESError' (and the failure counter matching the event
-- type is incremented) only when BOTH of the following hold:
--
--   * the number of tries has reached 'strcNumRetries', and
--   * no retry delay could be parsed out of the error's retry header.
--
-- In every other case the event is rescheduled via 'SEOpRetry'. The delay is
-- the value parsed from the retry header when there is one, otherwise the
-- configured 'strcRetryIntervalSeconds' rounded to whole seconds.
retryOrMarkError ::
  (MonadIO m, MonadMetadataStorage m, MonadError QErr m) =>
  ScheduledEventId ->
  RetryContext ->
  HTTPErr a ->
  ScheduledEventType ->
  ScheduledTriggerMetrics ->
  m ()
retryOrMarkError eventId retryCtx err type' scheduledTriggerMetric = do
  let RetryContext tries retryConf = retryCtx
      mRetryHeader = getRetryAfterHeaderFromHTTPErr err
      mRetryHeaderSeconds = parseRetryHeaderValue =<< mRetryHeader
      triesExhausted = tries >= strcNumRetries retryConf
      noRetryHeader = isNothing mRetryHeaderSeconds
  if triesExhausted && noRetryHeader
    then do
      liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESError) type'
      -- Only terminal failures are counted; retries do not touch the metric.
      liftIO $ Prometheus.Counter.inc $ case type' of
        Cron -> stmCronEventsProcessedTotalFailure scheduledTriggerMetric
        OneOff -> stmOneOffEventsProcessedTotalFailure scheduledTriggerMetric
    else do
      currentTime <- liftIO getCurrentTime
      let -- A delay supplied through the retry header takes precedence over
          -- the configured retry interval.
          delay =
            fromMaybe
              (round $ unrefine (strcRetryIntervalSeconds retryConf))
              mRetryHeaderSeconds
          diff = fromIntegral delay
          retryTime = addUTCTime diff currentTime
      liftEitherM $ setScheduledEventOp eventId (SEOpRetry retryTime) type'
-- | Handle a successful webhook delivery of a scheduled event.
--
-- Records an invocation built from the request headers/body and the response
-- status/body/headers, marks the event as 'SESDelivered', and finally
-- increments the success counter that matches the event type.
processSuccess ::
  (MonadMetadataStorage m, MonadError QErr m, MonadIO m) =>
  ScheduledEventId ->
  [HeaderConf] ->
  ScheduledEventType ->
  J.Value ->
  HTTPResp a ->
  ScheduledTriggerMetrics ->
  m ()
processSuccess eventId decodedHeaders type' reqBodyJson resp scheduledTriggerMetric = do
  let respBody = hrsBody resp
      respHeaders = hrsHeaders resp
      respStatus = hrsStatus resp
      invocation = mkInvocation eventId (Just respStatus) decodedHeaders respBody respHeaders reqBodyJson
  liftEitherM $ insertScheduledEventInvocation invocation type'
  liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESDelivered) type'
  case type' of
    Cron -> liftIO $ Prometheus.Counter.inc (stmCronEventsProcessedTotalSuccess scheduledTriggerMetric)
    OneOff -> liftIO $ Prometheus.Counter.inc (stmOneOffEventsProcessedTotalSuccess scheduledTriggerMetric)
-- | Mark the given scheduled event as 'SESDead', rethrowing any 'QErr'
-- reported by the metadata storage operation.
processDead ::
  (MonadMetadataStorage m, MonadError QErr m) =>
  ScheduledEventId ->
  ScheduledEventType ->
  m ()
processDead eventId type' =
  liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESDead) type'
-- | Assemble an 'Invocation' record for a scheduled event.
--
-- The request part is built with 'mkWebhookReq' from the request body,
-- request headers and 'invocationVersionST'; the response part is built with
-- 'mkInvocationResp' from the status, response body and response headers.
-- The same @status@ is stored both on the invocation itself and inside the
-- response.
mkInvocation ::
  ScheduledEventId ->
  Maybe Int ->
  [HeaderConf] ->
  SB.SerializableBlob ->
  [HeaderConf] ->
  J.Value ->
  (Invocation 'ScheduledType)
mkInvocation eventId status reqHeaders respBody respHeaders reqBodyJson =
  Invocation
    eventId
    status
    (mkWebhookReq reqBodyJson reqHeaders invocationVersionST)
    (mkInvocationResp status respBody respHeaders)
-- | Fetch statistics for the given cron triggers that are running low on
-- upcoming events.
--
-- For every supplied trigger name the query counts the rows in
-- @hdb_catalog.hdb_cron_events@ with @tries = 0@ and @status = 'scheduled'@,
-- and keeps only the triggers whose count is below 100. Triggers with no such
-- rows are still returned, with a count of 0 and @now()@ as the maximum
-- scheduled time.
getDeprivedCronTriggerStatsTx :: [TriggerName] -> PG.TxE QErr [CronTriggerStats]
getDeprivedCronTriggerStatsTx cronTriggerNames =
  map (\(n, count, maxTx) -> CronTriggerStats n count maxTx)
    <$> PG.withQE
      defaultTxErrorHandler
      [PG.sql|
      SELECT t.trigger_name, coalesce(q.upcoming_events_count, 0), coalesce(q.max_scheduled_time, now())
      FROM (SELECT UNNEST ($1::text[]) as trigger_name) as t
      LEFT JOIN
      ( SELECT
      trigger_name,
      count(1) as upcoming_events_count,
      max(scheduled_time) as max_scheduled_time
      FROM hdb_catalog.hdb_cron_events
      WHERE tries = 0 and status = 'scheduled'
      GROUP BY trigger_name
      ) AS q
      ON t.trigger_name = q.trigger_name
      WHERE coalesce(q.upcoming_events_count, 0) < 100
      |]
      (Identity $ PGTextArray $ map triggerNameToTxt cronTriggerNames)
      True
-- | Lock and return all scheduled events that are due for delivery.
--
-- An event is due when its status is @'scheduled'@ and either it has no
-- @next_retry_at@ and its @scheduled_time@ has passed, or its @next_retry_at@
-- has passed. Matching rows are switched to @'locked'@ and returned as JSON.
-- Rows already row-locked by another transaction are skipped
-- (@FOR UPDATE SKIP LOCKED@).
--
-- Cron events are restricted to the supplied trigger names; one-off events
-- are not filtered by trigger.
getScheduledEventsForDeliveryTx :: [TriggerName] -> PG.TxE QErr ([CronEvent], [OneOffScheduledEvent])
getScheduledEventsForDeliveryTx cronTriggerNames =
  (,) <$> getCronEventsForDelivery <*> getOneOffEventsForDelivery
  where
    -- Due cron events belonging to one of 'cronTriggerNames'.
    getCronEventsForDelivery :: PG.TxE QErr [CronEvent]
    getCronEventsForDelivery =
      map (PG.getViaJSON . runIdentity)
        <$> PG.withQE
          defaultTxErrorHandler
          [PG.sql|
          WITH cte AS
          ( UPDATE hdb_catalog.hdb_cron_events
          SET status = 'locked'
          WHERE id IN ( SELECT t.id
          FROM hdb_catalog.hdb_cron_events t
          WHERE ( t.status = 'scheduled'
          and (
          (t.next_retry_at is NULL and t.scheduled_time <= now()) or
          (t.next_retry_at is not NULL and t.next_retry_at <= now())
          )
          ) AND trigger_name = ANY($1)
          FOR UPDATE SKIP LOCKED
          )
          RETURNING *
          )
          SELECT row_to_json(t.*) FROM cte AS t
          |]
          (Identity $ PGTextArray $ triggerNameToTxt <$> cronTriggerNames)
          True

    -- Due one-off events.
    -- NOTE(review): the final flag here is False while the cron query above
    -- passes True -- confirm the difference is intentional.
    getOneOffEventsForDelivery :: PG.TxE QErr [OneOffScheduledEvent]
    getOneOffEventsForDelivery =
      map (PG.getViaJSON . runIdentity)
        <$> PG.withQE
          defaultTxErrorHandler
          [PG.sql|
          WITH cte AS (
          UPDATE hdb_catalog.hdb_scheduled_events
          SET status = 'locked'
          WHERE id IN ( SELECT t.id
          FROM hdb_catalog.hdb_scheduled_events t
          WHERE ( t.status = 'scheduled'
          and (
          (t.next_retry_at is NULL and t.scheduled_time <= now()) or
          (t.next_retry_at is not NULL and t.next_retry_at <= now())
          )
          )
          FOR UPDATE SKIP LOCKED
          )
          RETURNING *
          )
          SELECT row_to_json(t.*) FROM cte AS t
          |]
          ()
          False
-- | Persist an invocation log entry for a scheduled event and bump the
-- event's @tries@ counter.
--
-- Cron events use @hdb_cron_event_invocation_logs@ / @hdb_cron_events@;
-- one-off events use @hdb_scheduled_event_invocation_logs@ /
-- @hdb_scheduled_events@. In both cases the log row is inserted first and the
-- @tries@ column is incremented afterwards.
insertInvocationTx :: Invocation 'ScheduledType -> ScheduledEventType -> PG.TxE QErr ()
insertInvocationTx invo type' =
  case type' of
    Cron -> do
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
        INSERT INTO hdb_catalog.hdb_cron_event_invocation_logs
        (event_id, status, request, response)
        VALUES ($1, $2, $3, $4)
        |]
        invocationRow
        True
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
        UPDATE hdb_catalog.hdb_cron_events
        SET tries = tries + 1
        WHERE id = $1
        |]
        (Identity $ iEventId invo)
        True
    OneOff -> do
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
        INSERT INTO hdb_catalog.hdb_scheduled_event_invocation_logs
        (event_id, status, request, response)
        VALUES ($1, $2, $3, $4)
        |]
        invocationRow
        True
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
        UPDATE hdb_catalog.hdb_scheduled_events
        SET tries = tries + 1
        WHERE id = $1
        |]
        (Identity $ iEventId invo)
        True
  where
    -- Parameters for the log insert, shared by both event types: event id,
    -- optional status widened to Int64, and the request/response as JSON.
    invocationRow =
      ( iEventId invo,
        fromIntegral <$> iStatus invo :: Maybe Int64,
        PG.ViaJSON $ J.toJSON $ iRequest invo,
        PG.ViaJSON $ J.toJSON $ iResponse invo
      )
-- | Apply a 'ScheduledEventOp' to a single scheduled event.
--
--   * 'SEOpRetry' stores the given time in @next_retry_at@ and puts the event
--     back into the @'scheduled'@ status.
--   * 'SEOpStatus' overwrites the event's status with the given value.
--
-- The table that is updated depends on the event type: @hdb_cron_events@ for
-- 'Cron', @hdb_scheduled_events@ for 'OneOff'.
setScheduledEventOpTx ::
  ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> PG.TxE QErr ()
setScheduledEventOpTx eventId op type' = case op of
  SEOpRetry time -> setRetry time
  SEOpStatus status -> setStatus status
  where
    -- Schedule the next retry and release the event for pickup again.
    setRetry time =
      case type' of
        Cron ->
          PG.unitQE
            defaultTxErrorHandler
            [PG.sql|
            UPDATE hdb_catalog.hdb_cron_events
            SET next_retry_at = $1,
            STATUS = 'scheduled'
            WHERE id = $2
            |]
            (time, eventId)
            True
        OneOff ->
          PG.unitQE
            defaultTxErrorHandler
            [PG.sql|
            UPDATE hdb_catalog.hdb_scheduled_events
            SET next_retry_at = $1,
            STATUS = 'scheduled'
            WHERE id = $2
            |]
            (time, eventId)
            True

    -- Overwrite the event's status.
    setStatus status =
      case type' of
        Cron ->
          PG.unitQE
            defaultTxErrorHandler
            [PG.sql|
            UPDATE hdb_catalog.hdb_cron_events
            SET status = $2
            WHERE id = $1
            |]
            (eventId, status)
            True
        OneOff ->
          PG.unitQE
            defaultTxErrorHandler
            [PG.sql|
            UPDATE hdb_catalog.hdb_scheduled_events
            SET status = $2
            WHERE id = $1
            |]
            (eventId, status)
            True
-- | Move the given events from @'locked'@ back to @'scheduled'@ and return
-- how many rows were actually updated.
--
-- Only events that are currently @'locked'@ are affected, so the returned
-- count can be smaller than the number of ids passed in. The table is chosen
-- by event type: @hdb_cron_events@ for 'Cron', @hdb_scheduled_events@ for
-- 'OneOff'.
unlockScheduledEventsTx :: ScheduledEventType -> [ScheduledEventId] -> PG.TxE QErr Int
unlockScheduledEventsTx type' eventIds =
  let eventIdsTextArray = map unEventId eventIds
   in case type' of
        Cron ->
          (runIdentity . PG.getRow)
            <$> PG.withQE
              defaultTxErrorHandler
              [PG.sql|
              WITH "cte" AS
              (UPDATE hdb_catalog.hdb_cron_events
              SET status = 'scheduled'
              WHERE id = ANY($1::text[]) and status = 'locked'
              RETURNING *)
              SELECT count(*) FROM "cte"
              |]
              (Identity $ PGTextArray eventIdsTextArray)
              True
        OneOff ->
          (runIdentity . PG.getRow)
            <$> PG.withQE
              defaultTxErrorHandler
              [PG.sql|
              WITH "cte" AS
              (UPDATE hdb_catalog.hdb_scheduled_events
              SET status = 'scheduled'
              WHERE id = ANY($1::text[]) AND status = 'locked'
              RETURNING *)
              SELECT count(*) FROM "cte"
              |]
              (Identity $ PGTextArray eventIdsTextArray)
              True
-- | Reset every @'locked'@ event back to @'scheduled'@, first in
-- @hdb_cron_events@ and then in @hdb_scheduled_events@.
unlockAllLockedScheduledEventsTx :: PG.TxE QErr ()
unlockAllLockedScheduledEventsTx = do
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
    UPDATE hdb_catalog.hdb_cron_events
    SET status = 'scheduled'
    WHERE status = 'locked'
    |]
    ()
    True
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
    UPDATE hdb_catalog.hdb_scheduled_events
    SET status = 'scheduled'
    WHERE status = 'locked'
    |]
    ()
    True
-- | Bulk-inserts cron event seeds into 'cronEventsTable'.
--
-- Every seed becomes one @(trigger_name, scheduled_time)@ tuple of a single
-- @INSERT@ statement. Conflicts are handled with @S.DoNothing Nothing@ and
-- nothing is returned.
--
-- An empty list is a no-op. There is nothing to insert, and rendering the
-- statement anyway would leave its @VALUES@ list empty, which PostgreSQL
-- does not accept (it requires at least one row).
insertCronEventsTx :: [CronEventSeed] -> PG.TxE QErr ()
insertCronEventsTx [] = pure ()
insertCronEventsTx cronSeeds =
  -- The statement text is built dynamically, so it is run with the trailing
  -- flag set to 'False', exactly as before.
  PG.unitQE defaultTxErrorHandler (PG.fromText insertCronEventsSql) () False
  where
    insertCronEventsSql =
      TB.run
        $ toSQL
          S.SQLInsert
            { siTable = cronEventsTable,
              siCols = map unsafePGCol ["trigger_name", "scheduled_time"],
              siValues = S.ValuesExp $ map seedToTuple cronSeeds,
              siConflict = Just $ S.DoNothing Nothing,
              siRet = Nothing
            }

    -- One VALUES tuple per seed; both columns are passed as SQL literals, in
    -- the same order as 'siCols' above.
    seedToTuple (CronEventSeed n t) =
      S.TupleExp $ map S.SELit [triggerNameToTxt n, formatTime' t]
-- | Inserts a one-off scheduled event and returns the id of the new row.
--
-- The webhook, payload, retry configuration and headers are written via
-- 'PG.ViaJSON'; the schedule time and comment are passed as-is. Note that the
-- statement's column list does not include the event's request/response
-- transform fields, so those are not written by this insert.
insertOneOffScheduledEventTx :: OneOffEvent -> PG.TxE QErr EventId
insertOneOffScheduledEventTx event =
  fmap
    (runIdentity . PG.getRow)
    (PG.withQE defaultTxErrorHandler insertSql insertArgs False)
  where
    insertSql =
      [PG.sql|
    INSERT INTO hdb_catalog.hdb_scheduled_events
    (webhook_conf,scheduled_time,payload,retry_conf,header_conf,comment)
    VALUES
    ($1, $2, $3, $4, $5, $6) RETURNING id
            |]

    -- Positional arguments, in the same order as the column list above.
    insertArgs =
      ( PG.ViaJSON (cseWebhook event),
        cseScheduleAt event,
        PG.ViaJSON (csePayload event),
        PG.ViaJSON (cseRetryConf event),
        PG.ViaJSON (cseHeaders event),
        cseComment event
      )
-- | Deletes cron events that are still in the future and were never tried.
--
-- In both cases only rows with @scheduled_time > now()@ and @tries = 0@ are
-- removed:
--
-- * 'SingleCronTrigger': rows belonging to the one named trigger.
-- * 'MetadataCronTriggers': rows belonging to any of the listed triggers.
dropFutureCronEventsTx :: ClearCronEvents -> PG.TxE QErr ()
dropFutureCronEventsTx (SingleCronTrigger triggerName) =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
    DELETE FROM hdb_catalog.hdb_cron_events
    WHERE trigger_name = $1 AND scheduled_time > now() AND tries = 0
    |]
    (Identity triggerName)
    True
dropFutureCronEventsTx (MetadataCronTriggers triggerNames) =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
    DELETE FROM hdb_catalog.hdb_cron_events
    WHERE scheduled_time > now() AND tries = 0 AND trigger_name = ANY($1::text[])
    |]
    (Identity triggerNamesArray)
    False
  where
    -- The trigger names are sent as a single text[] argument.
    triggerNamesArray = PGTextArray (map triggerNameToTxt triggerNames)
-- | The catalog table that stores cron events: @hdb_catalog.hdb_cron_events@.
cronEventsTable :: QualifiedTable
cronEventsTable = QualifiedObject "hdb_catalog" (TableName "hdb_cron_events")
-- | Builds the boolean expression restricting the @status@ column.
--
-- An empty list produces the literal @True@ (i.e. no restriction); otherwise
-- the result is a @status IN (...)@ expression over the textual form of each
-- given status.
mkScheduledEventStatusFilter :: [ScheduledEventStatus] -> S.BoolExp
mkScheduledEventStatusFilter statuses
  | null statuses = S.BELit True
  | otherwise = S.BEIN statusColumn (map statusLiteral statuses)
  where
    statusColumn = S.SEIdentifier (Identifier "status")
    statusLiteral status = S.SELit (scheduledEventStatusToText status)
-- | @ORDER BY@ clause with a single item: the @scheduled_time@ column,
-- ascending, with no explicit nulls ordering.
scheduledTimeOrderBy :: S.OrderByExp
scheduledTimeOrderBy = S.OrderByExp (ascendingByScheduledTime NE.:| [])
  where
    ascendingByScheduledTime =
      S.OrderByItem
        (S.SEIdentifier (Identifier "scheduled_time"))
        (Just S.OTAsc)
        Nothing
-- | Wraps a select in the two CTEs used for pagination.
--
-- * @count_cte@ is the given select, unchanged.
-- * @limit_cte@ is @SELECT * FROM count_cte@ with the pagination's limit and
--   offset applied (each only when present).
--
-- The outer select extracts the rows of @limit_cte@ aggregated with
-- @json_agg(row_to_json(...))@, passed through 'S.handleIfNull' with the
-- literal @[]@. With 'IncludeRowsCount' it is preceded by a @count(*)@ over
-- @count_cte@.
mkPaginationSelectExp ::
  S.Select ->
  ScheduledEventPagination ->
  RowsCountOption ->
  S.Select
mkPaginationSelectExp allRowsSelect pagination shouldIncludeRowsCount =
  S.mkSelect
    { S.selCTEs =
        [ (countCteAlias, S.ICTESelect allRowsSelect),
          (limitCteAlias, limitCteSelect)
        ],
      S.selExtr = outerExtractors
    }
  where
    countCteAlias = S.mkTableAlias "count_cte"
    limitCteAlias = S.mkTableAlias "limit_cte"

    -- The count column, when requested, comes before the rows column.
    outerExtractors = case shouldIncludeRowsCount of
      IncludeRowsCount -> [countExtractor, rowsExtractor]
      DontIncludeRowsCount -> [rowsExtractor]

    -- FROM clause that reads from one of the CTEs above.
    fromCte alias = Just $ S.mkIdenFromExp (S.tableAliasToIdentifier alias)

    -- Scalar sub-select: count(*) over all rows, before limit/offset.
    countExtractor =
      S.Extractor
        ( S.SESelect
            S.mkSelect
              { S.selExtr = [S.Extractor S.countStar Nothing],
                S.selFrom = fromCte countCteAlias
              }
        )
        Nothing

    limitCteSelect =
      S.ICTESelect
        S.mkSelect
          { S.selExtr = [S.selectStar],
            S.selFrom = fromCte countCteAlias,
            S.selLimit = fmap (S.LimitExp . S.intToSQLExp) (_sepLimit pagination),
            S.selOffset = fmap (S.OffsetExp . S.intToSQLExp) (_sepOffset pagination)
          }

    -- Scalar sub-select over the limited rows. The aggregate text names
    -- "limit_cte" directly, so it must stay in sync with 'limitCteAlias'.
    rowsExtractor =
      S.Extractor (S.handleIfNull (S.SELit "[]") (S.SESelect aggregatedRows)) Nothing
      where
        aggregatedRows =
          S.mkSelect
            { S.selExtr =
                [S.Extractor (S.SEUnsafe "json_agg(row_to_json(limit_cte.*))") Nothing],
              S.selFrom = fromCte limitCteAlias
            }
-- | Packs a @(count, rows)@ result into a 'WithOptionalTotalCount' whose
-- count is present, unwrapping the 'PG.ViaJSON' around the rows.
withCount :: (Int, PG.ViaJSON a) -> WithOptionalTotalCount a
withCount (totalCount, PG.ViaJSON rows) =
  WithOptionalTotalCount (Just totalCount) rows
-- | Packs a rows-only result into a 'WithOptionalTotalCount' with no count,
-- unwrapping the 'PG.ViaJSON' around the rows.
withoutCount :: PG.ViaJSON a -> WithOptionalTotalCount a
withoutCount (PG.ViaJSON rows) =
  WithOptionalTotalCount Nothing rows
-- | Runs a query and decodes its single result row.
--
-- * 'IncludeRowsCount': the row is read as a @(count, json)@ pair and turned
--   into a result via 'withCount'.
-- * 'DontIncludeRowsCount': the row is read as a single JSON column and
--   turned into a result via 'withoutCount'.
--
-- The query is run with no arguments in both cases.
executeWithOptionalTotalCount :: (J.FromJSON a) => PG.Query -> RowsCountOption -> PG.TxE QErr (WithOptionalTotalCount a)
executeWithOptionalTotalCount sql IncludeRowsCount =
  fmap
    (withCount . PG.getRow)
    (PG.withQE defaultTxErrorHandler sql () False)
executeWithOptionalTotalCount sql DontIncludeRowsCount =
  fmap
    (withoutCount . runIdentity . PG.getRow)
    (PG.withQE defaultTxErrorHandler sql () False)
-- | Fetches one-off scheduled events from
-- @hdb_catalog.hdb_scheduled_events@.
--
-- Rows are filtered by the given statuses (see
-- 'mkScheduledEventStatusFilter'), ordered by 'scheduledTimeOrderBy', and
-- paginated via 'mkPaginationSelectExp'. The total count is included
-- according to the 'RowsCountOption'.
getOneOffScheduledEventsTx ::
  ScheduledEventPagination ->
  [ScheduledEventStatus] ->
  RowsCountOption ->
  PG.TxE QErr (WithOptionalTotalCount [OneOffScheduledEvent])
getOneOffScheduledEventsTx pagination statuses getRowsCount =
  executeWithOptionalTotalCount paginatedQuery getRowsCount
  where
    oneOffEventsTable :: QualifiedTable
    oneOffEventsTable = QualifiedObject "hdb_catalog" (TableName "hdb_scheduled_events")

    -- All matching rows, before limit/offset are applied.
    allEventsSelect =
      S.mkSelect
        { S.selExtr = [S.selectStar],
          S.selFrom = Just (S.mkSimpleFromExp oneOffEventsTable),
          S.selWhere = Just (S.WhereFrag (mkScheduledEventStatusFilter statuses)),
          S.selOrderBy = Just scheduledTimeOrderBy
        }

    paginatedQuery =
      PG.fromBuilder
        . toSQL
        $ mkPaginationSelectExp allEventsSelect pagination getRowsCount
-- | Fetches the cron events of one trigger from 'cronEventsTable'.
--
-- Rows must match the trigger name and the given statuses (see
-- 'mkScheduledEventStatusFilter'). They are ordered by
-- 'scheduledTimeOrderBy' and paginated via 'mkPaginationSelectExp'. The
-- total count is included according to the 'RowsCountOption'.
getCronEventsTx ::
  TriggerName ->
  ScheduledEventPagination ->
  [ScheduledEventStatus] ->
  RowsCountOption ->
  PG.TxE QErr (WithOptionalTotalCount [CronEvent])
getCronEventsTx triggerName pagination status getRowsCount =
  executeWithOptionalTotalCount paginatedQuery getRowsCount
  where
    -- trigger_name = '<name>', with the name passed as a SQL literal.
    matchesTrigger =
      S.BECompare
        S.SEQ
        (S.SEIdentifier (Identifier "trigger_name"))
        (S.SELit (triggerNameToTxt triggerName))

    matchesStatus = mkScheduledEventStatusFilter status

    -- All matching rows, before limit/offset are applied.
    allEventsSelect =
      S.mkSelect
        { S.selExtr = [S.selectStar],
          S.selFrom = Just (S.mkSimpleFromExp cronEventsTable),
          S.selWhere = Just (S.WhereFrag (S.BEBin S.AndOp matchesTrigger matchesStatus)),
          S.selOrderBy = Just scheduledTimeOrderBy
        }

    paginatedQuery =
      PG.fromBuilder
        . toSQL
        $ mkPaginationSelectExp allEventsSelect pagination getRowsCount
-- | Deletes a single scheduled event by id.
--
-- The event type selects the table: 'OneOff' deletes from
-- @hdb_catalog.hdb_scheduled_events@, 'Cron' from
-- @hdb_catalog.hdb_cron_events@.
deleteScheduledEventTx ::
  ScheduledEventId -> ScheduledEventType -> PG.TxE QErr ()
deleteScheduledEventTx eventId OneOff =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
      DELETE FROM hdb_catalog.hdb_scheduled_events
       WHERE id = $1
    |]
    (Identity eventId)
    False
deleteScheduledEventTx eventId Cron =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
      DELETE FROM hdb_catalog.hdb_cron_events
       WHERE id = $1
    |]
    (Identity eventId)
    False
-- | The columns selected from an invocation log table, each qualified with
-- the given table and none aliased.
--
-- In order: @id@, @event_id@, @status@, @request@, @response@ and
-- @created_at@. The @request@ and @response@ columns carry a @json@ type
-- annotation.
invocationFieldExtractors :: QualifiedTable -> [S.Extractor]
invocationFieldExtractors table =
  [ S.Extractor columnExp Nothing
    | columnExp <-
        [ column "id",
          column "event_id",
          column "status",
          asJson (column "request"),
          asJson (column "response"),
          column "created_at"
        ]
  ]
  where
    -- A column reference qualified with the table.
    column name = S.SEQIdentifier (S.mkQIdentifierTable table (Identifier name))

    -- Annotates an expression with the json type.
    asJson e = S.SETyAnn e (S.TypeAnn "json")
-- | Builds the condition @<table>.event_id = '<id>'@, with the event id
-- passed as a SQL literal.
mkEventIdBoolExp :: QualifiedTable -> EventId -> S.BoolExp
mkEventIdBoolExp table eventId =
  S.BECompare S.SEQ eventIdColumn eventIdLiteral
  where
    eventIdColumn = S.SEQIdentifier (S.mkQIdentifierTable table (Identifier "event_id"))
    eventIdLiteral = S.SELit (unEventId eventId)
-- | Fetches scheduled-event invocation logs from the catalog tables.
--
-- The query is built by 'getScheduledEventsInvocationsQuery' from the
-- catalog's one-off and cron invocation log tables plus 'cronEventsTable'.
-- The total count is included according to the request's @_geiGetRowsCount@.
getScheduledEventInvocationsTx ::
  GetScheduledEventInvocations ->
  PG.TxE QErr (WithOptionalTotalCount [ScheduledEventInvocation])
getScheduledEventInvocationsTx getEventInvocations =
  executeWithOptionalTotalCount invocationsQuery (_geiGetRowsCount getEventInvocations)
  where
    invocationsQuery =
      PG.fromBuilder
        . toSQL
        $ getScheduledEventsInvocationsQuery catalogTables getEventInvocations

    catalogTables =
      EventTables oneOffInvocationsTable cronInvocationsTable cronEventsTable

    oneOffInvocationsTable :: QualifiedTable
    oneOffInvocationsTable =
      QualifiedObject "hdb_catalog" (TableName "hdb_scheduled_event_invocation_logs")

    cronInvocationsTable :: QualifiedTable
    cronInvocationsTable =
      QualifiedObject "hdb_catalog" (TableName "hdb_cron_event_invocation_logs")
-- | The tables used when building scheduled-event invocation queries.
data EventTables = EventTables
  { -- | Invocation log table for one-off scheduled events.
    etOneOffInvocationsTable :: QualifiedTable,
    -- | Invocation log table for cron events.
    etCronInvocationsTable :: QualifiedTable,
    -- | Table holding the cron events themselves.
    etCronEventsTable :: QualifiedTable
  }
-- | Builds the un-paginated select over invocation logs.
--
-- * 'GIBEventId': logs of one event, read from the one-off or cron log table
--   depending on the event type, filtered with 'mkEventIdBoolExp'.
-- * 'GIBEvent' 'SEOneOff': every row of the one-off log table.
-- * 'GIBEvent' ('SECron' name): cron logs inner-joined with the cron events
--   table on @events.id = logs.event_id@, keeping the rows whose event has
--   the given @trigger_name@.
--
-- In every case the columns are 'invocationFieldExtractors' of the log table
-- and rows are ordered by that table's @created_at@, descending.
getScheduledEventsInvocationsQueryNoPagination :: EventTables -> GetScheduledEventInvocationsBy -> S.Select
getScheduledEventsInvocationsQueryNoPagination (EventTables oneOffLogs cronLogs cronEvents) invocationsBy =
  case invocationsBy of
    GIBEventId eventId eventType ->
      let logs = logsTableFor eventType
       in (logsSelect logs) {S.selWhere = Just $ S.WhereFrag $ mkEventIdBoolExp logs eventId}
    GIBEvent SEOneOff -> logsSelect oneOffLogs
    GIBEvent (SECron triggerName) ->
      S.mkSelect
        { S.selExtr = invocationFieldExtractors cronLogs,
          S.selFrom = Just $ S.FromExp [S.FIJoin logsJoinedWithEvents],
          S.selWhere = Just $ S.WhereFrag $ triggerNameIs triggerName,
          S.selOrderBy = Just $ newestFirst cronLogs
        }
  where
    -- A column reference qualified with its table.
    qualifiedCol tbl name = S.SEQIdentifier $ S.mkQIdentifierTable tbl (Identifier name)

    -- ORDER BY <table>.created_at DESC
    newestFirst tbl =
      S.OrderByExp $ S.OrderByItem (qualifiedCol tbl "created_at") (Just S.OTDesc) Nothing NE.:| []

    logsTableFor = \case
      OneOff -> oneOffLogs
      Cron -> cronLogs

    -- Unfiltered select over a single invocation log table.
    logsSelect tbl =
      S.mkSelect
        { S.selExtr = invocationFieldExtractors tbl,
          S.selFrom = Just $ S.mkSimpleFromExp tbl,
          S.selOrderBy = Just $ newestFirst tbl
        }

    -- cron logs INNER JOIN cron events ON events.id = logs.event_id
    logsJoinedWithEvents =
      S.JoinExpr
        (S.FISimple cronLogs Nothing)
        S.Inner
        (S.FISimple cronEvents Nothing)
        ( S.JoinOn
            $ S.BECompare
              S.SEQ
              (qualifiedCol cronEvents "id")
              (qualifiedCol cronLogs "event_id")
        )

    -- events.trigger_name = '<name>', with the name passed as a SQL literal.
    triggerNameIs name =
      S.BECompare
        S.SEQ
        (qualifiedCol cronEvents "trigger_name")
        (S.SELit $ triggerNameToTxt name)
-- | Builds the paginated invocations select: the result of
-- 'getScheduledEventsInvocationsQueryNoPagination' wrapped by
-- 'mkPaginationSelectExp' with the request's pagination and row-count option.
getScheduledEventsInvocationsQuery :: EventTables -> GetScheduledEventInvocations -> S.Select
getScheduledEventsInvocationsQuery eventTables (GetScheduledEventInvocations invocationsBy pagination shouldIncludeRowsCount) =
  mkPaginationSelectExp unpaginated pagination shouldIncludeRowsCount
  where
    unpaginated = getScheduledEventsInvocationsQueryNoPagination eventTables invocationsBy
-- | Create a 'FetchedScheduledEventsStatsLogger' on top of the given Hasura
-- logger. This is 'L.createStatsLogger' specialised to the fetched
-- scheduled-events stats type.
createFetchedScheduledEventsStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> m FetchedScheduledEventsStatsLogger
createFetchedScheduledEventsStatsLogger hasuraLogger =
  L.createStatsLogger hasuraLogger
-- | Close a 'FetchedScheduledEventsStatsLogger'.
--
-- Delegates to 'L.closeStatsLogger', tagging the call with
-- 'L.scheduledTriggerProcessLogType' as the engine log type.
closeFetchedScheduledEventsStatsLogger ::
  (MonadIO m) => L.Logger L.Hasura -> FetchedScheduledEventsStatsLogger -> m ()
closeFetchedScheduledEventsStatsLogger hasuraLogger statsLogger =
  L.closeStatsLogger L.scheduledTriggerProcessLogType hasuraLogger statsLogger
-- | Record one 'FetchedScheduledEventsStats' sample on the given stats logger.
--
-- The sample carries the supplied cron-event count and one-off
-- scheduled-event count and is submitted through 'L.logStats'.
logFetchedScheduledEventsStats ::
  (MonadIO m) =>
  FetchedScheduledEventsStatsLogger ->
  CronEventsCount ->
  OneOffScheduledEventsCount ->
  m ()
logFetchedScheduledEventsStats statsLogger cronCount oneOffCount =
  L.logStats statsLogger fetchedStats
  where
    -- The third field is always 1 here.
    -- NOTE(review): presumably it counts the fetches represented by this
    -- sample so that samples can be summed -- confirm against the definition
    -- of 'FetchedScheduledEventsStats'.
    fetchedStats = FetchedScheduledEventsStats cronCount oneOffCount 1
-- | Create a 'FetchedCronTriggerStatsLogger' on top of the given Hasura
-- logger. This is 'L.createStatsLogger' specialised to the fetched
-- cron-trigger stats type.
createFetchedCronTriggerStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> m FetchedCronTriggerStatsLogger
createFetchedCronTriggerStatsLogger hasuraLogger =
  L.createStatsLogger hasuraLogger
-- | Close a 'FetchedCronTriggerStatsLogger'.
--
-- Delegates to 'L.closeStatsLogger', tagging the call with
-- 'L.cronEventGeneratorProcessType' as the engine log type. The remaining
-- arguments (the Hasura logger and the stats logger to close) are passed
-- straight through.
closeFetchedCronTriggersStatsLogger ::
  (MonadIO m) => L.Logger L.Hasura -> FetchedCronTriggerStatsLogger -> m ()
closeFetchedCronTriggersStatsLogger =
  L.closeStatsLogger L.cronEventGeneratorProcessType
-- | Record one 'FetchedCronTriggerStats' sample on the given stats logger.
--
-- The sample wraps the supplied list of 'CronTriggerStats' and is submitted
-- through 'L.logStats'.
logFetchedCronTriggersStats ::
  (MonadIO m) =>
  FetchedCronTriggerStatsLogger ->
  [CronTriggerStats] ->
  m ()
logFetchedCronTriggersStats logger cronTriggerStats =
  -- The second field is always 1 here.
  -- NOTE(review): presumably it counts the fetches represented by this sample
  -- -- confirm against the definition of 'FetchedCronTriggerStats'.
  L.logStats logger (FetchedCronTriggerStats cronTriggerStats 1)