{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE QuasiQuotes #-}
module Hasura.Eventing.ScheduledTrigger
( runCronEventsGenerator,
processScheduledTriggers,
generateScheduleTimes,
CronEventSeed (..),
LockedEventsCtx (..),
getDeprivedCronTriggerStatsTx,
getScheduledEventsForDeliveryTx,
insertInvocationTx,
setScheduledEventOpTx,
unlockScheduledEventsTx,
unlockAllLockedScheduledEventsTx,
insertCronEventsTx,
insertOneOffScheduledEventTx,
dropFutureCronEventsTx,
getOneOffScheduledEventsTx,
getCronEventsTx,
deleteScheduledEventTx,
getInvocationsTx,
getInvocationsQuery,
getInvocationsQueryNoPagination,
mkScheduledEventStatusFilter,
scheduledTimeOrderBy,
mkPaginationSelectExp,
withCount,
invocationFieldExtractors,
mkEventIdBoolExp,
EventTables (..),
)
where
import Control.Arrow.Extended (dup)
import Control.Concurrent.Extended (Forever (..), sleep)
import Control.Concurrent.STM
import Control.Lens (view)
import Data.Aeson qualified as J
import Data.Environment qualified as Env
import Data.Has
import Data.HashMap.Strict qualified as Map
import Data.Int (Int64)
import Data.List (unfoldr)
import Data.List.NonEmpty qualified as NE
import Data.SerializableBlob qualified as SB
import Data.Set qualified as Set
import Data.Time.Clock
import Data.URL.Template (printURLTemplate)
import Database.PG.Query qualified as Q
import Hasura.Backends.Postgres.Execute.Types
import Hasura.Backends.Postgres.SQL.DML qualified as S
import Hasura.Backends.Postgres.SQL.Types
import Hasura.Base.Error
import Hasura.Eventing.Common
import Hasura.Eventing.HTTP
import Hasura.Eventing.ScheduledTrigger.Types
import Hasura.HTTP (getHTTPExceptionStatus)
import Hasura.Logging qualified as L
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.EventTrigger (getHeaderInfosFromConf)
import Hasura.RQL.DDL.Headers
import Hasura.RQL.DDL.Webhook.Transform
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.Eventing
import Hasura.RQL.Types.Numeric qualified as Numeric
import Hasura.RQL.Types.ScheduledTrigger
import Hasura.RQL.Types.SchemaCache
import Hasura.SQL.Types
import Hasura.Tracing qualified as Tracing
import Network.HTTP.Client.Transformable qualified as HTTP
import System.Cron
import Text.Builder qualified as TB
runCronEventsGenerator ::
( MonadIO m,
MonadMetadataStorage (MetadataStorageT m)
) =>
L.Logger L.Hasura ->
IO SchemaCache ->
m void
runCronEventsGenerator :: Logger Hasura -> IO SchemaCache -> m void
runCronEventsGenerator Logger Hasura
logger IO SchemaCache
getSC = do
m () -> m void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m void) -> m () -> m void
forall a b. (a -> b) -> a -> b
$ do
SchemaCache
sc <- IO SchemaCache -> m SchemaCache
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO SchemaCache
getSC
let cronTriggersCache :: HashMap TriggerName CronTriggerInfo
cronTriggersCache = SchemaCache -> HashMap TriggerName CronTriggerInfo
scCronTriggers SchemaCache
sc
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (HashMap TriggerName CronTriggerInfo -> Bool
forall k v. HashMap k v -> Bool
Map.null HashMap TriggerName CronTriggerInfo
cronTriggersCache) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Either QErr ()
eitherRes <- MetadataStorageT m () -> m (Either QErr ())
forall (m :: * -> *) a. MetadataStorageT m a -> m (Either QErr a)
runMetadataStorageT (MetadataStorageT m () -> m (Either QErr ()))
-> MetadataStorageT m () -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$ do
[CronTriggerStats]
deprivedCronTriggerStats <- [TriggerName] -> MetadataStorageT m [CronTriggerStats]
forall (m :: * -> *).
MonadMetadataStorage m =>
[TriggerName] -> m [CronTriggerStats]
getDeprivedCronTriggerStats ([TriggerName] -> MetadataStorageT m [CronTriggerStats])
-> [TriggerName] -> MetadataStorageT m [CronTriggerStats]
forall a b. (a -> b) -> a -> b
$ HashMap TriggerName CronTriggerInfo -> [TriggerName]
forall k v. HashMap k v -> [k]
Map.keys HashMap TriggerName CronTriggerInfo
cronTriggersCache
[(CronTriggerInfo, CronTriggerStats)]
cronTriggersForHydrationWithStats <-
[Maybe (CronTriggerInfo, CronTriggerStats)]
-> [(CronTriggerInfo, CronTriggerStats)]
forall (f :: * -> *) a. Filterable f => f (Maybe a) -> f a
catMaybes
([Maybe (CronTriggerInfo, CronTriggerStats)]
-> [(CronTriggerInfo, CronTriggerStats)])
-> MetadataStorageT m [Maybe (CronTriggerInfo, CronTriggerStats)]
-> MetadataStorageT m [(CronTriggerInfo, CronTriggerStats)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (CronTriggerStats
-> MetadataStorageT m (Maybe (CronTriggerInfo, CronTriggerStats)))
-> [CronTriggerStats]
-> MetadataStorageT m [Maybe (CronTriggerInfo, CronTriggerStats)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (HashMap TriggerName CronTriggerInfo
-> CronTriggerStats
-> MetadataStorageT m (Maybe (CronTriggerInfo, CronTriggerStats))
withCronTrigger HashMap TriggerName CronTriggerInfo
cronTriggersCache) [CronTriggerStats]
deprivedCronTriggerStats
[(CronTriggerInfo, CronTriggerStats)] -> MetadataStorageT m ()
forall (m :: * -> *).
MonadMetadataStorage m =>
[(CronTriggerInfo, CronTriggerStats)] -> m ()
insertCronEventsFor [(CronTriggerInfo, CronTriggerStats)]
cronTriggersForHydrationWithStats
Either QErr () -> (QErr -> m ()) -> m ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft Either QErr ()
eitherRes ((QErr -> m ()) -> m ()) -> (QErr -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (ScheduledTriggerInternalErr -> m ())
-> (QErr -> ScheduledTriggerInternalErr) -> QErr -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. QErr -> ScheduledTriggerInternalErr
ScheduledTriggerInternalErr
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ DiffTime -> IO ()
sleep (Minutes -> DiffTime
minutes Minutes
1)
where
withCronTrigger :: HashMap TriggerName CronTriggerInfo
-> CronTriggerStats
-> MetadataStorageT m (Maybe (CronTriggerInfo, CronTriggerStats))
withCronTrigger HashMap TriggerName CronTriggerInfo
cronTriggerCache CronTriggerStats
cronTriggerStat = do
case TriggerName
-> HashMap TriggerName CronTriggerInfo -> Maybe CronTriggerInfo
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
Map.lookup (CronTriggerStats -> TriggerName
_ctsName CronTriggerStats
cronTriggerStat) HashMap TriggerName CronTriggerInfo
cronTriggerCache of
Maybe CronTriggerInfo
Nothing -> do
Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (ScheduledTriggerInternalErr -> MetadataStorageT m ())
-> ScheduledTriggerInternalErr -> MetadataStorageT m ()
forall a b. (a -> b) -> a -> b
$
QErr -> ScheduledTriggerInternalErr
ScheduledTriggerInternalErr (QErr -> ScheduledTriggerInternalErr)
-> QErr -> ScheduledTriggerInternalErr
forall a b. (a -> b) -> a -> b
$
Code -> Text -> QErr
err500 Code
Unexpected Text
"could not find scheduled trigger in the schema cache"
Maybe (CronTriggerInfo, CronTriggerStats)
-> MetadataStorageT m (Maybe (CronTriggerInfo, CronTriggerStats))
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (CronTriggerInfo, CronTriggerStats)
forall a. Maybe a
Nothing
Just CronTriggerInfo
cronTrigger ->
Maybe (CronTriggerInfo, CronTriggerStats)
-> MetadataStorageT m (Maybe (CronTriggerInfo, CronTriggerStats))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (CronTriggerInfo, CronTriggerStats)
-> MetadataStorageT m (Maybe (CronTriggerInfo, CronTriggerStats)))
-> Maybe (CronTriggerInfo, CronTriggerStats)
-> MetadataStorageT m (Maybe (CronTriggerInfo, CronTriggerStats))
forall a b. (a -> b) -> a -> b
$
(CronTriggerInfo, CronTriggerStats)
-> Maybe (CronTriggerInfo, CronTriggerStats)
forall a. a -> Maybe a
Just (CronTriggerInfo
cronTrigger, CronTriggerStats
cronTriggerStat)
insertCronEventsFor ::
(MonadMetadataStorage m) =>
[(CronTriggerInfo, CronTriggerStats)] ->
m ()
insertCronEventsFor :: [(CronTriggerInfo, CronTriggerStats)] -> m ()
insertCronEventsFor [(CronTriggerInfo, CronTriggerStats)]
cronTriggersWithStats = do
let scheduledEvents :: [CronEventSeed]
scheduledEvents = (((CronTriggerInfo, CronTriggerStats) -> [CronEventSeed])
-> [(CronTriggerInfo, CronTriggerStats)] -> [CronEventSeed])
-> [(CronTriggerInfo, CronTriggerStats)]
-> ((CronTriggerInfo, CronTriggerStats) -> [CronEventSeed])
-> [CronEventSeed]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((CronTriggerInfo, CronTriggerStats) -> [CronEventSeed])
-> [(CronTriggerInfo, CronTriggerStats)] -> [CronEventSeed]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap [(CronTriggerInfo, CronTriggerStats)]
cronTriggersWithStats (((CronTriggerInfo, CronTriggerStats) -> [CronEventSeed])
-> [CronEventSeed])
-> ((CronTriggerInfo, CronTriggerStats) -> [CronEventSeed])
-> [CronEventSeed]
forall a b. (a -> b) -> a -> b
$ \(CronTriggerInfo
cti, CronTriggerStats
stats) ->
UTCTime -> CronTriggerInfo -> [CronEventSeed]
generateCronEventsFrom (CronTriggerStats -> UTCTime
_ctsMaxScheduledTime CronTriggerStats
stats) CronTriggerInfo
cti
case [CronEventSeed]
scheduledEvents of
[] -> () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
[CronEventSeed]
events -> [CronEventSeed] -> m ()
forall (m :: * -> *).
MonadMetadataStorage m =>
[CronEventSeed] -> m ()
insertCronEvents [CronEventSeed]
events
generateCronEventsFrom :: UTCTime -> CronTriggerInfo -> [CronEventSeed]
generateCronEventsFrom :: UTCTime -> CronTriggerInfo -> [CronEventSeed]
generateCronEventsFrom UTCTime
startTime CronTriggerInfo {[EventHeaderInfo]
Maybe Value
Maybe Text
Maybe MetadataResponseTransform
Maybe RequestTransform
CronSchedule
EnvRecord ResolvedWebhook
TriggerName
STRetryConf
ctiResponseTransform :: CronTriggerInfo -> Maybe MetadataResponseTransform
ctiRequestTransform :: CronTriggerInfo -> Maybe RequestTransform
ctiComment :: CronTriggerInfo -> Maybe Text
ctiHeaders :: CronTriggerInfo -> [EventHeaderInfo]
ctiWebhookInfo :: CronTriggerInfo -> EnvRecord ResolvedWebhook
ctiRetryConf :: CronTriggerInfo -> STRetryConf
ctiPayload :: CronTriggerInfo -> Maybe Value
ctiSchedule :: CronTriggerInfo -> CronSchedule
ctiName :: CronTriggerInfo -> TriggerName
ctiResponseTransform :: Maybe MetadataResponseTransform
ctiRequestTransform :: Maybe RequestTransform
ctiComment :: Maybe Text
ctiHeaders :: [EventHeaderInfo]
ctiWebhookInfo :: EnvRecord ResolvedWebhook
ctiRetryConf :: STRetryConf
ctiPayload :: Maybe Value
ctiSchedule :: CronSchedule
ctiName :: TriggerName
..} =
(UTCTime -> CronEventSeed) -> [UTCTime] -> [CronEventSeed]
forall a b. (a -> b) -> [a] -> [b]
map (TriggerName -> UTCTime -> CronEventSeed
CronEventSeed TriggerName
ctiName) ([UTCTime] -> [CronEventSeed]) -> [UTCTime] -> [CronEventSeed]
forall a b. (a -> b) -> a -> b
$
UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes UTCTime
startTime Int
100 CronSchedule
ctiSchedule
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes UTCTime
from Int
n CronSchedule
cron = Int -> [UTCTime] -> [UTCTime]
forall a. Int -> [a] -> [a]
take Int
n ([UTCTime] -> [UTCTime]) -> [UTCTime] -> [UTCTime]
forall a b. (a -> b) -> a -> b
$ UTCTime -> [UTCTime]
go UTCTime
from
where
go :: UTCTime -> [UTCTime]
go = (UTCTime -> Maybe (UTCTime, UTCTime)) -> UTCTime -> [UTCTime]
forall b a. (b -> Maybe (a, b)) -> b -> [a]
unfoldr ((UTCTime -> (UTCTime, UTCTime))
-> Maybe UTCTime -> Maybe (UTCTime, UTCTime)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap UTCTime -> (UTCTime, UTCTime)
forall (arr :: * -> * -> *) a. Arrow arr => arr a (a, a)
dup (Maybe UTCTime -> Maybe (UTCTime, UTCTime))
-> (UTCTime -> Maybe UTCTime)
-> UTCTime
-> Maybe (UTCTime, UTCTime)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CronSchedule -> UTCTime -> Maybe UTCTime
nextMatch CronSchedule
cron)
processCronEvents ::
( MonadIO m,
Tracing.HasReporter m,
MonadMetadataStorage (MetadataStorageT m)
) =>
L.Logger L.Hasura ->
HTTP.Manager ->
[CronEvent] ->
IO SchemaCache ->
TVar (Set.Set CronEventId) ->
m ()
processCronEvents :: Logger Hasura
-> Manager
-> [CronEvent]
-> IO SchemaCache
-> TVar (Set CronEventId)
-> m ()
processCronEvents Logger Hasura
logger Manager
httpMgr [CronEvent]
cronEvents IO SchemaCache
getSC TVar (Set CronEventId)
lockedCronEvents = do
HashMap TriggerName CronTriggerInfo
cronTriggersInfo <- SchemaCache -> HashMap TriggerName CronTriggerInfo
scCronTriggers (SchemaCache -> HashMap TriggerName CronTriggerInfo)
-> m SchemaCache -> m (HashMap TriggerName CronTriggerInfo)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO SchemaCache -> m SchemaCache
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO SchemaCache
getSC
[CronEventId] -> TVar (Set CronEventId) -> m ()
forall (m :: * -> *).
MonadIO m =>
[CronEventId] -> TVar (Set CronEventId) -> m ()
saveLockedEvents ((CronEvent -> CronEventId) -> [CronEvent] -> [CronEventId]
forall a b. (a -> b) -> [a] -> [b]
map CronEvent -> CronEventId
_ceId [CronEvent]
cronEvents) TVar (Set CronEventId)
lockedCronEvents
[CronEvent] -> (CronEvent -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [CronEvent]
cronEvents ((CronEvent -> m ()) -> m ()) -> (CronEvent -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \(CronEvent CronEventId
id' TriggerName
name UTCTime
st Text
_ Int
tries UTCTime
_ Maybe UTCTime
_) -> do
case TriggerName
-> HashMap TriggerName CronTriggerInfo -> Maybe CronTriggerInfo
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
Map.lookup TriggerName
name HashMap TriggerName CronTriggerInfo
cronTriggersInfo of
Maybe CronTriggerInfo
Nothing ->
QErr -> m ()
logInternalError (QErr -> m ()) -> QErr -> m ()
forall a b. (a -> b) -> a -> b
$
Code -> Text -> QErr
err500 Code
Unexpected Text
"could not find cron trigger in cache"
Just CronTriggerInfo {[EventHeaderInfo]
Maybe Value
Maybe Text
Maybe MetadataResponseTransform
Maybe RequestTransform
CronSchedule
EnvRecord ResolvedWebhook
TriggerName
STRetryConf
ctiResponseTransform :: Maybe MetadataResponseTransform
ctiRequestTransform :: Maybe RequestTransform
ctiComment :: Maybe Text
ctiHeaders :: [EventHeaderInfo]
ctiWebhookInfo :: EnvRecord ResolvedWebhook
ctiRetryConf :: STRetryConf
ctiPayload :: Maybe Value
ctiSchedule :: CronSchedule
ctiName :: TriggerName
ctiResponseTransform :: CronTriggerInfo -> Maybe MetadataResponseTransform
ctiRequestTransform :: CronTriggerInfo -> Maybe RequestTransform
ctiComment :: CronTriggerInfo -> Maybe Text
ctiHeaders :: CronTriggerInfo -> [EventHeaderInfo]
ctiWebhookInfo :: CronTriggerInfo -> EnvRecord ResolvedWebhook
ctiRetryConf :: CronTriggerInfo -> STRetryConf
ctiPayload :: CronTriggerInfo -> Maybe Value
ctiSchedule :: CronTriggerInfo -> CronSchedule
ctiName :: CronTriggerInfo -> TriggerName
..} -> do
let payload :: ScheduledEventWebhookPayload
payload =
CronEventId
-> Maybe TriggerName
-> UTCTime
-> Value
-> Maybe Text
-> Maybe UTCTime
-> Maybe RequestTransform
-> Maybe MetadataResponseTransform
-> ScheduledEventWebhookPayload
ScheduledEventWebhookPayload
CronEventId
id'
(TriggerName -> Maybe TriggerName
forall a. a -> Maybe a
Just TriggerName
name)
UTCTime
st
(Value -> Maybe Value -> Value
forall a. a -> Maybe a -> a
fromMaybe Value
J.Null Maybe Value
ctiPayload)
Maybe Text
ctiComment
Maybe UTCTime
forall a. Maybe a
Nothing
Maybe RequestTransform
ctiRequestTransform
Maybe MetadataResponseTransform
ctiResponseTransform
retryCtx :: RetryContext
retryCtx = Int -> STRetryConf -> RetryContext
RetryContext Int
tries STRetryConf
ctiRetryConf
Either QErr ()
finally <-
MetadataStorageT m () -> m (Either QErr ())
forall (m :: * -> *) a. MetadataStorageT m a -> m (Either QErr a)
runMetadataStorageT (MetadataStorageT m () -> m (Either QErr ()))
-> MetadataStorageT m () -> m (Either QErr ())
forall a b. (a -> b) -> a -> b
$
(ReaderT (Logger Hasura, Manager) (MetadataStorageT m) ()
-> (Logger Hasura, Manager) -> MetadataStorageT m ())
-> (Logger Hasura, Manager)
-> ReaderT (Logger Hasura, Manager) (MetadataStorageT m) ()
-> MetadataStorageT m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ReaderT (Logger Hasura, Manager) (MetadataStorageT m) ()
-> (Logger Hasura, Manager) -> MetadataStorageT m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (Logger Hasura
logger, Manager
httpMgr) (ReaderT (Logger Hasura, Manager) (MetadataStorageT m) ()
-> MetadataStorageT m ())
-> ReaderT (Logger Hasura, Manager) (MetadataStorageT m) ()
-> MetadataStorageT m ()
forall a b. (a -> b) -> a -> b
$
CronEventId
-> [EventHeaderInfo]
-> RetryContext
-> ScheduledEventWebhookPayload
-> EnvRecord ResolvedWebhook
-> ScheduledEventType
-> ReaderT (Logger Hasura, Manager) (MetadataStorageT m) ()
forall r (m :: * -> *).
(MonadReader r m, Has Manager r, Has (Logger Hasura) r, MonadIO m,
HasReporter m, MonadMetadataStorage m) =>
CronEventId
-> [EventHeaderInfo]
-> RetryContext
-> ScheduledEventWebhookPayload
-> EnvRecord ResolvedWebhook
-> ScheduledEventType
-> m ()
processScheduledEvent
CronEventId
id'
[EventHeaderInfo]
ctiHeaders
RetryContext
retryCtx
ScheduledEventWebhookPayload
payload
EnvRecord ResolvedWebhook
ctiWebhookInfo
ScheduledEventType
Cron
CronEventId -> TVar (Set CronEventId) -> m ()
forall (m :: * -> *).
MonadIO m =>
CronEventId -> TVar (Set CronEventId) -> m ()
removeEventFromLockedEvents CronEventId
id' TVar (Set CronEventId)
lockedCronEvents
Either QErr () -> (QErr -> m ()) -> m ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft Either QErr ()
finally QErr -> m ()
logInternalError
where
logInternalError :: QErr -> m ()
logInternalError QErr
err = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ())
-> (ScheduledTriggerInternalErr -> IO ())
-> ScheduledTriggerInternalErr
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (ScheduledTriggerInternalErr -> m ())
-> ScheduledTriggerInternalErr -> m ()
forall a b. (a -> b) -> a -> b
$ QErr -> ScheduledTriggerInternalErr
ScheduledTriggerInternalErr QErr
err
processOneOffScheduledEvents ::
( MonadIO m,
Tracing.HasReporter m,
MonadMetadataStorage (MetadataStorageT m)
) =>
Env.Environment ->
L.Logger L.Hasura ->
HTTP.Manager ->
[OneOffScheduledEvent] ->
TVar (Set.Set OneOffScheduledEventId) ->
m ()
processOneOffScheduledEvents :: Environment
-> Logger Hasura
-> Manager
-> [OneOffScheduledEvent]
-> TVar (Set CronEventId)
-> m ()
processOneOffScheduledEvents
Environment
env
Logger Hasura
logger
Manager
httpMgr
[OneOffScheduledEvent]
oneOffEvents
TVar (Set CronEventId)
lockedOneOffScheduledEvents = do
[CronEventId] -> TVar (Set CronEventId) -> m ()
forall (m :: * -> *).
MonadIO m =>
[CronEventId] -> TVar (Set CronEventId) -> m ()
saveLockedEvents ((OneOffScheduledEvent -> CronEventId)
-> [OneOffScheduledEvent] -> [CronEventId]
forall a b. (a -> b) -> [a] -> [b]
map OneOffScheduledEvent -> CronEventId
_ooseId [OneOffScheduledEvent]
oneOffEvents) TVar (Set CronEventId)
lockedOneOffScheduledEvents
[OneOffScheduledEvent] -> (OneOffScheduledEvent -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [OneOffScheduledEvent]
oneOffEvents ((OneOffScheduledEvent -> m ()) -> m ())
-> (OneOffScheduledEvent -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \OneOffScheduledEvent {Int
[HeaderConf]
Maybe Value
Maybe Text
Maybe UTCTime
Maybe MetadataResponseTransform
Maybe RequestTransform
Text
UTCTime
CronEventId
InputWebhook
STRetryConf
_ooseResponseTransform :: OneOffScheduledEvent -> Maybe MetadataResponseTransform
_ooseRequestTransform :: OneOffScheduledEvent -> Maybe RequestTransform
_ooseComment :: OneOffScheduledEvent -> Maybe Text
_ooseNextRetryAt :: OneOffScheduledEvent -> Maybe UTCTime
_ooseCreatedAt :: OneOffScheduledEvent -> UTCTime
_ooseTries :: OneOffScheduledEvent -> Int
_ooseStatus :: OneOffScheduledEvent -> Text
_ooseHeaderConf :: OneOffScheduledEvent -> [HeaderConf]
_oosePayload :: OneOffScheduledEvent -> Maybe Value
_ooseRetryConf :: OneOffScheduledEvent -> STRetryConf
_ooseScheduledTime :: OneOffScheduledEvent -> UTCTime
_ooseWebhookConf :: OneOffScheduledEvent -> InputWebhook
_ooseResponseTransform :: Maybe MetadataResponseTransform
_ooseRequestTransform :: Maybe RequestTransform
_ooseComment :: Maybe Text
_ooseNextRetryAt :: Maybe UTCTime
_ooseCreatedAt :: UTCTime
_ooseTries :: Int
_ooseStatus :: Text
_ooseHeaderConf :: [HeaderConf]
_oosePayload :: Maybe Value
_ooseRetryConf :: STRetryConf
_ooseScheduledTime :: UTCTime
_ooseWebhookConf :: InputWebhook
_ooseId :: CronEventId
_ooseId :: OneOffScheduledEvent -> CronEventId
..} -> do
((QErr -> m ()) -> (() -> m ()) -> Either QErr () -> m ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either QErr -> m ()
logInternalError () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure) (Either QErr () -> m ()) -> m (Either QErr ()) -> m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MetadataStorageT m () -> m (Either QErr ())
forall (m :: * -> *) a. MetadataStorageT m a -> m (Either QErr a)
runMetadataStorageT do
ResolvedWebhook
webhookInfo <- Environment -> InputWebhook -> MetadataStorageT m ResolvedWebhook
forall (m :: * -> *).
QErrM m =>
Environment -> InputWebhook -> m ResolvedWebhook
resolveWebhook Environment
env InputWebhook
_ooseWebhookConf
[EventHeaderInfo]
headerInfo <- Environment -> [HeaderConf] -> MetadataStorageT m [EventHeaderInfo]
forall (m :: * -> *).
QErrM m =>
Environment -> [HeaderConf] -> m [EventHeaderInfo]
getHeaderInfosFromConf Environment
env [HeaderConf]
_ooseHeaderConf
let payload :: ScheduledEventWebhookPayload
payload =
CronEventId
-> Maybe TriggerName
-> UTCTime
-> Value
-> Maybe Text
-> Maybe UTCTime
-> Maybe RequestTransform
-> Maybe MetadataResponseTransform
-> ScheduledEventWebhookPayload
ScheduledEventWebhookPayload
CronEventId
_ooseId
Maybe TriggerName
forall a. Maybe a
Nothing
UTCTime
_ooseScheduledTime
(Value -> Maybe Value -> Value
forall a. a -> Maybe a -> a
fromMaybe Value
J.Null Maybe Value
_oosePayload)
Maybe Text
_ooseComment
(UTCTime -> Maybe UTCTime
forall a. a -> Maybe a
Just UTCTime
_ooseCreatedAt)
Maybe RequestTransform
_ooseRequestTransform
Maybe MetadataResponseTransform
_ooseResponseTransform
retryCtx :: RetryContext
retryCtx = Int -> STRetryConf -> RetryContext
RetryContext Int
_ooseTries STRetryConf
_ooseRetryConf
webhookEnvRecord :: EnvRecord ResolvedWebhook
webhookEnvRecord = Text -> ResolvedWebhook -> EnvRecord ResolvedWebhook
forall a. Text -> a -> EnvRecord a
EnvRecord (InputWebhook -> Text
getTemplateFromUrl InputWebhook
_ooseWebhookConf) ResolvedWebhook
webhookInfo
(ReaderT (Logger Hasura, Manager) (MetadataStorageT m) ()
-> (Logger Hasura, Manager) -> MetadataStorageT m ())
-> (Logger Hasura, Manager)
-> ReaderT (Logger Hasura, Manager) (MetadataStorageT m) ()
-> MetadataStorageT m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ReaderT (Logger Hasura, Manager) (MetadataStorageT m) ()
-> (Logger Hasura, Manager) -> MetadataStorageT m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (Logger Hasura
logger, Manager
httpMgr) (ReaderT (Logger Hasura, Manager) (MetadataStorageT m) ()
-> MetadataStorageT m ())
-> ReaderT (Logger Hasura, Manager) (MetadataStorageT m) ()
-> MetadataStorageT m ()
forall a b. (a -> b) -> a -> b
$
CronEventId
-> [EventHeaderInfo]
-> RetryContext
-> ScheduledEventWebhookPayload
-> EnvRecord ResolvedWebhook
-> ScheduledEventType
-> ReaderT (Logger Hasura, Manager) (MetadataStorageT m) ()
forall r (m :: * -> *).
(MonadReader r m, Has Manager r, Has (Logger Hasura) r, MonadIO m,
HasReporter m, MonadMetadataStorage m) =>
CronEventId
-> [EventHeaderInfo]
-> RetryContext
-> ScheduledEventWebhookPayload
-> EnvRecord ResolvedWebhook
-> ScheduledEventType
-> m ()
processScheduledEvent CronEventId
_ooseId [EventHeaderInfo]
headerInfo RetryContext
retryCtx ScheduledEventWebhookPayload
payload EnvRecord ResolvedWebhook
webhookEnvRecord ScheduledEventType
OneOff
CronEventId -> TVar (Set CronEventId) -> MetadataStorageT m ()
forall (m :: * -> *).
MonadIO m =>
CronEventId -> TVar (Set CronEventId) -> m ()
removeEventFromLockedEvents CronEventId
_ooseId TVar (Set CronEventId)
lockedOneOffScheduledEvents
where
logInternalError :: QErr -> m ()
logInternalError QErr
err = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ())
-> (ScheduledTriggerInternalErr -> IO ())
-> ScheduledTriggerInternalErr
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (ScheduledTriggerInternalErr -> m ())
-> ScheduledTriggerInternalErr -> m ()
forall a b. (a -> b) -> a -> b
$ QErr -> ScheduledTriggerInternalErr
ScheduledTriggerInternalErr QErr
err
getTemplateFromUrl :: InputWebhook -> Text
getTemplateFromUrl InputWebhook
url = URLTemplate -> Text
printURLTemplate (URLTemplate -> Text) -> URLTemplate -> Text
forall a b. (a -> b) -> a -> b
$ InputWebhook -> URLTemplate
unInputWebhook InputWebhook
url
processScheduledTriggers ::
( MonadIO m,
Tracing.HasReporter m,
MonadMetadataStorage (MetadataStorageT m)
) =>
Env.Environment ->
L.Logger L.Hasura ->
HTTP.Manager ->
IO SchemaCache ->
LockedEventsCtx ->
m (Forever m)
processScheduledTriggers :: Environment
-> Logger Hasura
-> Manager
-> IO SchemaCache
-> LockedEventsCtx
-> m (Forever m)
processScheduledTriggers Environment
env Logger Hasura
logger Manager
httpMgr IO SchemaCache
getSC LockedEventsCtx {TVar (Set CronEventId)
TVar (HashMap SourceName (Set CronEventId))
leActionEvents :: LockedEventsCtx -> TVar (Set CronEventId)
leEvents :: LockedEventsCtx -> TVar (HashMap SourceName (Set CronEventId))
leOneOffEvents :: LockedEventsCtx -> TVar (Set CronEventId)
leCronEvents :: LockedEventsCtx -> TVar (Set CronEventId)
leActionEvents :: TVar (Set CronEventId)
leEvents :: TVar (HashMap SourceName (Set CronEventId))
leOneOffEvents :: TVar (Set CronEventId)
leCronEvents :: TVar (Set CronEventId)
..} = do
Forever m -> m (Forever m)
forall (m :: * -> *) a. Monad m => a -> m a
return (Forever m -> m (Forever m)) -> Forever m -> m (Forever m)
forall a b. (a -> b) -> a -> b
$
() -> (() -> m ()) -> Forever m
forall (m :: * -> *) a. a -> (a -> m a) -> Forever m
Forever () ((() -> m ()) -> Forever m) -> (() -> m ()) -> Forever m
forall a b. (a -> b) -> a -> b
$
m () -> () -> m ()
forall a b. a -> b -> a
const (m () -> () -> m ()) -> m () -> () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Either QErr ([CronEvent], [OneOffScheduledEvent])
result <- MetadataStorageT m ([CronEvent], [OneOffScheduledEvent])
-> m (Either QErr ([CronEvent], [OneOffScheduledEvent]))
forall (m :: * -> *) a. MetadataStorageT m a -> m (Either QErr a)
runMetadataStorageT MetadataStorageT m ([CronEvent], [OneOffScheduledEvent])
forall (m :: * -> *).
MonadMetadataStorage m =>
m ([CronEvent], [OneOffScheduledEvent])
getScheduledEventsForDelivery
case Either QErr ([CronEvent], [OneOffScheduledEvent])
result of
Left QErr
e -> QErr -> m ()
logInternalError QErr
e
Right ([CronEvent]
cronEvents, [OneOffScheduledEvent]
oneOffEvents) -> do
Logger Hasura
-> Manager
-> [CronEvent]
-> IO SchemaCache
-> TVar (Set CronEventId)
-> m ()
forall (m :: * -> *).
(MonadIO m, HasReporter m,
MonadMetadataStorage (MetadataStorageT m)) =>
Logger Hasura
-> Manager
-> [CronEvent]
-> IO SchemaCache
-> TVar (Set CronEventId)
-> m ()
processCronEvents Logger Hasura
logger Manager
httpMgr [CronEvent]
cronEvents IO SchemaCache
getSC TVar (Set CronEventId)
leCronEvents
Environment
-> Logger Hasura
-> Manager
-> [OneOffScheduledEvent]
-> TVar (Set CronEventId)
-> m ()
forall (m :: * -> *).
(MonadIO m, HasReporter m,
MonadMetadataStorage (MetadataStorageT m)) =>
Environment
-> Logger Hasura
-> Manager
-> [OneOffScheduledEvent]
-> TVar (Set CronEventId)
-> m ()
processOneOffScheduledEvents Environment
env Logger Hasura
logger Manager
httpMgr [OneOffScheduledEvent]
oneOffEvents TVar (Set CronEventId)
leOneOffEvents
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ DiffTime -> IO ()
sleep (Seconds -> DiffTime
seconds Seconds
10)
where
logInternalError :: QErr -> m ()
logInternalError QErr
err = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ())
-> (ScheduledTriggerInternalErr -> IO ())
-> ScheduledTriggerInternalErr
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (ScheduledTriggerInternalErr -> m ())
-> ScheduledTriggerInternalErr -> m ()
forall a b. (a -> b) -> a -> b
$ QErr -> ScheduledTriggerInternalErr
ScheduledTriggerInternalErr QErr
err
processScheduledEvent ::
( MonadReader r m,
Has HTTP.Manager r,
Has (L.Logger L.Hasura) r,
MonadIO m,
Tracing.HasReporter m,
MonadMetadataStorage m
) =>
ScheduledEventId ->
[EventHeaderInfo] ->
RetryContext ->
ScheduledEventWebhookPayload ->
EnvRecord ResolvedWebhook ->
ScheduledEventType ->
m ()
processScheduledEvent :: CronEventId
-> [EventHeaderInfo]
-> RetryContext
-> ScheduledEventWebhookPayload
-> EnvRecord ResolvedWebhook
-> ScheduledEventType
-> m ()
processScheduledEvent CronEventId
eventId [EventHeaderInfo]
eventHeaders RetryContext
retryCtx ScheduledEventWebhookPayload
payload EnvRecord ResolvedWebhook
webhookUrl ScheduledEventType
type' =
Text -> TraceT m () -> m ()
forall (m :: * -> *) a.
(HasReporter m, MonadIO m) =>
Text -> TraceT m a -> m a
Tracing.runTraceT Text
traceNote do
UTCTime
currentTime <- IO UTCTime -> TraceT m UTCTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
let retryConf :: STRetryConf
retryConf = RetryContext -> STRetryConf
_rctxConf RetryContext
retryCtx
scheduledTime :: UTCTime
scheduledTime = ScheduledEventWebhookPayload -> UTCTime
sewpScheduledTime ScheduledEventWebhookPayload
payload
if NominalDiffTime -> DiffTime
forall x y. (Duration x, Duration y) => x -> y
convertDuration (UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
currentTime UTCTime
scheduledTime)
DiffTime -> DiffTime -> Bool
forall a. Ord a => a -> a -> Bool
> NonNegativeDiffTime -> DiffTime
Numeric.unNonNegativeDiffTime (STRetryConf -> NonNegativeDiffTime
strcToleranceSeconds STRetryConf
retryConf)
then CronEventId -> ScheduledEventType -> TraceT m ()
forall (m :: * -> *).
MonadMetadataStorage m =>
CronEventId -> ScheduledEventType -> m ()
processDead CronEventId
eventId ScheduledEventType
type'
else do
let timeoutSeconds :: Int
timeoutSeconds =
DiffTime -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (DiffTime -> Int) -> DiffTime -> Int
forall a b. (a -> b) -> a -> b
$
NonNegativeDiffTime -> DiffTime
Numeric.unNonNegativeDiffTime (NonNegativeDiffTime -> DiffTime)
-> NonNegativeDiffTime -> DiffTime
forall a b. (a -> b) -> a -> b
$
STRetryConf -> NonNegativeDiffTime
strcTimeoutSeconds STRetryConf
retryConf
httpTimeout :: ResponseTimeout
httpTimeout = Int -> ResponseTimeout
HTTP.responseTimeoutMicro (Int
timeoutSeconds Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000000)
([Header]
headers, [HeaderConf]
decodedHeaders) = [EventHeaderInfo] -> ([Header], [HeaderConf])
prepareHeaders [EventHeaderInfo]
eventHeaders
extraLogCtx :: ExtraLogContext
extraLogCtx = CronEventId -> Maybe TriggerName -> ExtraLogContext
ExtraLogContext CronEventId
eventId (ScheduledEventWebhookPayload -> Maybe TriggerName
sewpName ScheduledEventWebhookPayload
payload)
webhookReqBodyJson :: Value
webhookReqBodyJson = ScheduledEventWebhookPayload -> Value
forall a. ToJSON a => a -> Value
J.toJSON ScheduledEventWebhookPayload
payload
webhookReqBody :: ByteString
webhookReqBody = Value -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode Value
webhookReqBodyJson
requestTransform :: Maybe RequestTransform
requestTransform = ScheduledEventWebhookPayload -> Maybe RequestTransform
sewpRequestTransform ScheduledEventWebhookPayload
payload
responseTransform :: Maybe ResponseTransform
responseTransform = MetadataResponseTransform -> ResponseTransform
mkResponseTransform (MetadataResponseTransform -> ResponseTransform)
-> Maybe MetadataResponseTransform -> Maybe ResponseTransform
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ScheduledEventWebhookPayload -> Maybe MetadataResponseTransform
sewpResponseTransform ScheduledEventWebhookPayload
payload
Either
(TransformableRequestError 'ScheduledType)
(Request, HTTPResp 'ScheduledType)
eitherReqRes <-
ExceptT
(TransformableRequestError 'ScheduledType)
(TraceT m)
(Request, HTTPResp 'ScheduledType)
-> TraceT
m
(Either
(TransformableRequestError 'ScheduledType)
(Request, HTTPResp 'ScheduledType))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT
(TransformableRequestError 'ScheduledType)
(TraceT m)
(Request, HTTPResp 'ScheduledType)
-> TraceT
m
(Either
(TransformableRequestError 'ScheduledType)
(Request, HTTPResp 'ScheduledType)))
-> ExceptT
(TransformableRequestError 'ScheduledType)
(TraceT m)
(Request, HTTPResp 'ScheduledType)
-> TraceT
m
(Either
(TransformableRequestError 'ScheduledType)
(Request, HTTPResp 'ScheduledType))
forall a b. (a -> b) -> a -> b
$
[Header]
-> ResponseTimeout
-> ByteString
-> Maybe RequestTransform
-> ResolvedWebhook
-> ExceptT
(TransformableRequestError 'ScheduledType)
(TraceT m)
RequestDetails
forall (a :: TriggerTypes) (m :: * -> *).
MonadError (TransformableRequestError a) m =>
[Header]
-> ResponseTimeout
-> ByteString
-> Maybe RequestTransform
-> ResolvedWebhook
-> m RequestDetails
mkRequest [Header]
headers ResponseTimeout
httpTimeout ByteString
webhookReqBody Maybe RequestTransform
requestTransform (EnvRecord ResolvedWebhook -> ResolvedWebhook
forall a. EnvRecord a -> a
_envVarValue EnvRecord ResolvedWebhook
webhookUrl) ExceptT
(TransformableRequestError 'ScheduledType)
(TraceT m)
RequestDetails
-> (RequestDetails
-> ExceptT
(TransformableRequestError 'ScheduledType)
(TraceT m)
(Request, HTTPResp 'ScheduledType))
-> ExceptT
(TransformableRequestError 'ScheduledType)
(TraceT m)
(Request, HTTPResp 'ScheduledType)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \RequestDetails
reqDetails -> do
let request :: Request
request = RequestDetails -> Request
extractRequest RequestDetails
reqDetails
logger :: Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
-> RequestDetails
-> ExceptT (TransformableRequestError 'ScheduledType) (TraceT m) ()
logger Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
e RequestDetails
d = Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
-> ExtraLogContext
-> RequestDetails
-> Text
-> [HeaderConf]
-> ExceptT (TransformableRequestError 'ScheduledType) (TraceT m) ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
-> ExtraLogContext
-> RequestDetails
-> Text
-> [HeaderConf]
-> m ()
logHTTPForST Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
e ExtraLogContext
extraLogCtx RequestDetails
d (EnvRecord ResolvedWebhook -> Text
forall a. EnvRecord a -> Text
_envVarName EnvRecord ResolvedWebhook
webhookUrl) [HeaderConf]
decodedHeaders
sessionVars :: Maybe SessionVariables
sessionVars = RequestDetails -> Maybe SessionVariables
_rdSessionVars RequestDetails
reqDetails
HTTPResp 'ScheduledType
resp <- RequestDetails
-> Maybe ResponseTransform
-> Maybe SessionVariables
-> (Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
-> RequestDetails
-> ExceptT
(TransformableRequestError 'ScheduledType) (TraceT m) ())
-> ExceptT
(TransformableRequestError 'ScheduledType)
(TraceT m)
(HTTPResp 'ScheduledType)
forall r (m :: * -> *) (a :: TriggerTypes).
(MonadReader r m, MonadError (TransformableRequestError a) m,
Has Manager r, Has (Logger Hasura) r, MonadIO m, MonadTrace m) =>
RequestDetails
-> Maybe ResponseTransform
-> Maybe SessionVariables
-> (Either (HTTPErr a) (HTTPResp a) -> RequestDetails -> m ())
-> m (HTTPResp a)
invokeRequest RequestDetails
reqDetails Maybe ResponseTransform
responseTransform Maybe SessionVariables
sessionVars Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
-> RequestDetails
-> ExceptT (TransformableRequestError 'ScheduledType) (TraceT m) ()
logger
(Request, HTTPResp 'ScheduledType)
-> ExceptT
(TransformableRequestError 'ScheduledType)
(TraceT m)
(Request, HTTPResp 'ScheduledType)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Request
request, HTTPResp 'ScheduledType
resp)
case Either
(TransformableRequestError 'ScheduledType)
(Request, HTTPResp 'ScheduledType)
eitherReqRes of
Right (Request
req, HTTPResp 'ScheduledType
resp) ->
let reqBody :: Value
reqBody = Value -> Maybe Value -> Value
forall a. a -> Maybe a -> a
fromMaybe Value
J.Null (Maybe Value -> Value) -> Maybe Value -> Value
forall a b. (a -> b) -> a -> b
$ Getting (Maybe ByteString) Request (Maybe ByteString)
-> Request -> Maybe ByteString
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting (Maybe ByteString) Request (Maybe ByteString)
Lens' Request (Maybe ByteString)
HTTP.body Request
req Maybe ByteString -> (ByteString -> Maybe Value) -> Maybe Value
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= FromJSON Value => ByteString -> Maybe Value
forall a. FromJSON a => ByteString -> Maybe a
J.decode @J.Value
in CronEventId
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPResp 'ScheduledType
-> TraceT m ()
forall (m :: * -> *) (a :: TriggerTypes).
MonadMetadataStorage m =>
CronEventId
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPResp a
-> m ()
processSuccess CronEventId
eventId [HeaderConf]
decodedHeaders ScheduledEventType
type' Value
reqBody HTTPResp 'ScheduledType
resp
Left (HTTPError Value
reqBody HTTPErr 'ScheduledType
e) -> CronEventId
-> RetryContext
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPErr 'ScheduledType
-> TraceT m ()
forall (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, MonadMetadataStorage m) =>
CronEventId
-> RetryContext
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPErr a
-> m ()
processError CronEventId
eventId RetryContext
retryCtx [HeaderConf]
decodedHeaders ScheduledEventType
type' Value
reqBody HTTPErr 'ScheduledType
e
Left (TransformationError Value
_ TransformErrorBundle
e) -> do
Logger Hasura
logger :: L.Logger L.Hasura <- (r -> Logger Hasura) -> TraceT m (Logger Hasura)
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks r -> Logger Hasura
forall a t. Has a t => t -> a
getter
Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (UnstructuredLog -> TraceT m ()) -> UnstructuredLog -> TraceT m ()
forall a b. (a -> b) -> a -> b
$ LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelError (ByteString -> SerializableBlob
SB.fromLBS (ByteString -> SerializableBlob) -> ByteString -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ TransformErrorBundle -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode TransformErrorBundle
e)
CronEventId
-> ScheduledEventOp -> ScheduledEventType -> TraceT m ()
forall (m :: * -> *).
MonadMetadataStorage m =>
CronEventId -> ScheduledEventOp -> ScheduledEventType -> m ()
setScheduledEventOp CronEventId
eventId (ScheduledEventStatus -> ScheduledEventOp
SEOpStatus ScheduledEventStatus
SESError) ScheduledEventType
type'
where
traceNote :: Text
traceNote = Text
"Scheduled trigger" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (TriggerName -> Text) -> Maybe TriggerName -> Text
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap ((Text
": " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<>) (Text -> Text) -> (TriggerName -> Text) -> TriggerName -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TriggerName -> Text
triggerNameToTxt) (ScheduledEventWebhookPayload -> Maybe TriggerName
sewpName ScheduledEventWebhookPayload
payload)
processError ::
( MonadIO m,
MonadMetadataStorage m
) =>
ScheduledEventId ->
RetryContext ->
[HeaderConf] ->
ScheduledEventType ->
J.Value ->
HTTPErr a ->
m ()
processError :: CronEventId
-> RetryContext
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPErr a
-> m ()
processError CronEventId
eventId RetryContext
retryCtx [HeaderConf]
decodedHeaders ScheduledEventType
type' Value
reqJson HTTPErr a
err = do
let invocation :: Invocation 'ScheduledType
invocation = case HTTPErr a
err of
HClient HttpException
httpException ->
let statusMaybe :: Maybe Int
statusMaybe = HttpException -> Maybe Int
getHTTPExceptionStatus HttpException
httpException
in CronEventId
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Value
-> Invocation 'ScheduledType
mkInvocation CronEventId
eventId Maybe Int
statusMaybe [HeaderConf]
decodedHeaders (ByteString -> SerializableBlob
SB.fromLBS (ByteString -> SerializableBlob) -> ByteString -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ HttpException -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode HttpException
httpException) [] Value
reqJson
HStatus HTTPResp a
errResp -> do
let respPayload :: SerializableBlob
respPayload = HTTPResp a -> SerializableBlob
forall (a :: TriggerTypes). HTTPResp a -> SerializableBlob
hrsBody HTTPResp a
errResp
respHeaders :: [HeaderConf]
respHeaders = HTTPResp a -> [HeaderConf]
forall (a :: TriggerTypes). HTTPResp a -> [HeaderConf]
hrsHeaders HTTPResp a
errResp
respStatus :: Int
respStatus = HTTPResp a -> Int
forall (a :: TriggerTypes). HTTPResp a -> Int
hrsStatus HTTPResp a
errResp
CronEventId
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Value
-> Invocation 'ScheduledType
mkInvocation CronEventId
eventId (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
respStatus) [HeaderConf]
decodedHeaders SerializableBlob
respPayload [HeaderConf]
respHeaders Value
reqJson
HOther String
detail -> do
let errMsg :: SerializableBlob
errMsg = (ByteString -> SerializableBlob
SB.fromLBS (ByteString -> SerializableBlob) -> ByteString -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ String -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode String
detail)
CronEventId
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Value
-> Invocation 'ScheduledType
mkInvocation CronEventId
eventId (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
500) [HeaderConf]
decodedHeaders SerializableBlob
errMsg [] Value
reqJson
Invocation 'ScheduledType -> ScheduledEventType -> m ()
forall (m :: * -> *).
MonadMetadataStorage m =>
Invocation 'ScheduledType -> ScheduledEventType -> m ()
insertScheduledEventInvocation Invocation 'ScheduledType
invocation ScheduledEventType
type'
CronEventId
-> RetryContext -> HTTPErr a -> ScheduledEventType -> m ()
forall (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, MonadMetadataStorage m) =>
CronEventId
-> RetryContext -> HTTPErr a -> ScheduledEventType -> m ()
retryOrMarkError CronEventId
eventId RetryContext
retryCtx HTTPErr a
err ScheduledEventType
type'
retryOrMarkError ::
(MonadIO m, MonadMetadataStorage m) =>
ScheduledEventId ->
RetryContext ->
HTTPErr a ->
ScheduledEventType ->
m ()
retryOrMarkError :: CronEventId
-> RetryContext -> HTTPErr a -> ScheduledEventType -> m ()
retryOrMarkError CronEventId
eventId RetryContext
retryCtx HTTPErr a
err ScheduledEventType
type' = do
let RetryContext Int
tries STRetryConf
retryConf = RetryContext
retryCtx
mRetryHeader :: Maybe Text
mRetryHeader = HTTPErr a -> Maybe Text
forall (a :: TriggerTypes). HTTPErr a -> Maybe Text
getRetryAfterHeaderFromHTTPErr HTTPErr a
err
mRetryHeaderSeconds :: Maybe Int
mRetryHeaderSeconds = Text -> Maybe Int
parseRetryHeaderValue (Text -> Maybe Int) -> Maybe Text -> Maybe Int
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Maybe Text
mRetryHeader
triesExhausted :: Bool
triesExhausted = Int
tries Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= STRetryConf -> Int
strcNumRetries STRetryConf
retryConf
noRetryHeader :: Bool
noRetryHeader = Maybe Int -> Bool
forall a. Maybe a -> Bool
isNothing Maybe Int
mRetryHeaderSeconds
if Bool
triesExhausted Bool -> Bool -> Bool
&& Bool
noRetryHeader
then CronEventId -> ScheduledEventOp -> ScheduledEventType -> m ()
forall (m :: * -> *).
MonadMetadataStorage m =>
CronEventId -> ScheduledEventOp -> ScheduledEventType -> m ()
setScheduledEventOp CronEventId
eventId (ScheduledEventStatus -> ScheduledEventOp
SEOpStatus ScheduledEventStatus
SESError) ScheduledEventType
type'
else do
UTCTime
currentTime <- IO UTCTime -> m UTCTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
let delay :: Int
delay =
Int -> Maybe Int -> Int
forall a. a -> Maybe a -> a
fromMaybe
( DiffTime -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (DiffTime -> Int) -> DiffTime -> Int
forall a b. (a -> b) -> a -> b
$
NonNegativeDiffTime -> DiffTime
Numeric.unNonNegativeDiffTime (NonNegativeDiffTime -> DiffTime)
-> NonNegativeDiffTime -> DiffTime
forall a b. (a -> b) -> a -> b
$
STRetryConf -> NonNegativeDiffTime
strcRetryIntervalSeconds STRetryConf
retryConf
)
Maybe Int
mRetryHeaderSeconds
diff :: NominalDiffTime
diff = Int -> NominalDiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
delay
retryTime :: UTCTime
retryTime = NominalDiffTime -> UTCTime -> UTCTime
addUTCTime NominalDiffTime
diff UTCTime
currentTime
CronEventId -> ScheduledEventOp -> ScheduledEventType -> m ()
forall (m :: * -> *).
MonadMetadataStorage m =>
CronEventId -> ScheduledEventOp -> ScheduledEventType -> m ()
setScheduledEventOp CronEventId
eventId (UTCTime -> ScheduledEventOp
SEOpRetry UTCTime
retryTime) ScheduledEventType
type'
processSuccess ::
(MonadMetadataStorage m) =>
ScheduledEventId ->
[HeaderConf] ->
ScheduledEventType ->
J.Value ->
HTTPResp a ->
m ()
processSuccess :: CronEventId
-> [HeaderConf]
-> ScheduledEventType
-> Value
-> HTTPResp a
-> m ()
processSuccess CronEventId
eventId [HeaderConf]
decodedHeaders ScheduledEventType
type' Value
reqBodyJson HTTPResp a
resp = do
let respBody :: SerializableBlob
respBody = HTTPResp a -> SerializableBlob
forall (a :: TriggerTypes). HTTPResp a -> SerializableBlob
hrsBody HTTPResp a
resp
respHeaders :: [HeaderConf]
respHeaders = HTTPResp a -> [HeaderConf]
forall (a :: TriggerTypes). HTTPResp a -> [HeaderConf]
hrsHeaders HTTPResp a
resp
respStatus :: Int
respStatus = HTTPResp a -> Int
forall (a :: TriggerTypes). HTTPResp a -> Int
hrsStatus HTTPResp a
resp
invocation :: Invocation 'ScheduledType
invocation = CronEventId
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Value
-> Invocation 'ScheduledType
mkInvocation CronEventId
eventId (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
respStatus) [HeaderConf]
decodedHeaders SerializableBlob
respBody [HeaderConf]
respHeaders Value
reqBodyJson
Invocation 'ScheduledType -> ScheduledEventType -> m ()
forall (m :: * -> *).
MonadMetadataStorage m =>
Invocation 'ScheduledType -> ScheduledEventType -> m ()
insertScheduledEventInvocation Invocation 'ScheduledType
invocation ScheduledEventType
type'
CronEventId -> ScheduledEventOp -> ScheduledEventType -> m ()
forall (m :: * -> *).
MonadMetadataStorage m =>
CronEventId -> ScheduledEventOp -> ScheduledEventType -> m ()
setScheduledEventOp CronEventId
eventId (ScheduledEventStatus -> ScheduledEventOp
SEOpStatus ScheduledEventStatus
SESDelivered) ScheduledEventType
type'
processDead ::
(MonadMetadataStorage m) =>
ScheduledEventId ->
ScheduledEventType ->
m ()
processDead :: CronEventId -> ScheduledEventType -> m ()
processDead CronEventId
eventId ScheduledEventType
type' =
CronEventId -> ScheduledEventOp -> ScheduledEventType -> m ()
forall (m :: * -> *).
MonadMetadataStorage m =>
CronEventId -> ScheduledEventOp -> ScheduledEventType -> m ()
setScheduledEventOp CronEventId
eventId (ScheduledEventStatus -> ScheduledEventOp
SEOpStatus ScheduledEventStatus
SESDead) ScheduledEventType
type'
mkInvocation ::
ScheduledEventId ->
Maybe Int ->
[HeaderConf] ->
SB.SerializableBlob ->
[HeaderConf] ->
J.Value ->
(Invocation 'ScheduledType)
mkInvocation :: CronEventId
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Value
-> Invocation 'ScheduledType
mkInvocation CronEventId
eventId Maybe Int
status [HeaderConf]
reqHeaders SerializableBlob
respBody [HeaderConf]
respHeaders Value
reqBodyJson =
CronEventId
-> Maybe Int
-> WebhookRequest
-> Response 'ScheduledType
-> Invocation 'ScheduledType
forall (a :: TriggerTypes).
CronEventId
-> Maybe Int -> WebhookRequest -> Response a -> Invocation a
Invocation
CronEventId
eventId
Maybe Int
status
(Value -> [HeaderConf] -> Text -> WebhookRequest
mkWebhookReq Value
reqBodyJson [HeaderConf]
reqHeaders Text
invocationVersionST)
(Maybe Int
-> SerializableBlob -> [HeaderConf] -> Response 'ScheduledType
forall (a :: TriggerTypes).
Maybe Int -> SerializableBlob -> [HeaderConf] -> Response a
mkInvocationResp Maybe Int
status SerializableBlob
respBody [HeaderConf]
respHeaders)
getDeprivedCronTriggerStatsTx :: [TriggerName] -> Q.TxE QErr [CronTriggerStats]
getDeprivedCronTriggerStatsTx :: [TriggerName] -> TxE QErr [CronTriggerStats]
getDeprivedCronTriggerStatsTx [TriggerName]
cronTriggerNames =
((TriggerName, Int, UTCTime) -> CronTriggerStats)
-> [(TriggerName, Int, UTCTime)] -> [CronTriggerStats]
forall a b. (a -> b) -> [a] -> [b]
map (\(TriggerName
n, Int
count, UTCTime
maxTx) -> TriggerName -> Int -> UTCTime -> CronTriggerStats
CronTriggerStats TriggerName
n Int
count UTCTime
maxTx)
([(TriggerName, Int, UTCTime)] -> [CronTriggerStats])
-> TxET QErr IO [(TriggerName, Int, UTCTime)]
-> TxE QErr [CronTriggerStats]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> Identity PGTextArray
-> Bool
-> TxET QErr IO [(TriggerName, Int, UTCTime)]
forall (m :: * -> *) a r e.
(MonadIO m, FromRow a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m [a]
Q.listQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
SELECT t.trigger_name, coalesce(q.upcoming_events_count, 0), coalesce(q.max_scheduled_time, now())
FROM (SELECT UNNEST ($1::text[]) as trigger_name) as t
LEFT JOIN
( SELECT
trigger_name,
count(1) as upcoming_events_count,
max(scheduled_time) as max_scheduled_time
FROM hdb_catalog.hdb_cron_events
WHERE tries = 0 and status = 'scheduled'
GROUP BY trigger_name
) AS q
ON t.trigger_name = q.trigger_name
WHERE coalesce(q.upcoming_events_count, 0) < 100
|]
(PGTextArray -> Identity PGTextArray
forall a. a -> Identity a
Identity (PGTextArray -> Identity PGTextArray)
-> PGTextArray -> Identity PGTextArray
forall a b. (a -> b) -> a -> b
$ [Text] -> PGTextArray
PGTextArray ([Text] -> PGTextArray) -> [Text] -> PGTextArray
forall a b. (a -> b) -> a -> b
$ (TriggerName -> Text) -> [TriggerName] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map TriggerName -> Text
triggerNameToTxt [TriggerName]
cronTriggerNames)
Bool
True
getScheduledEventsForDeliveryTx :: Q.TxE QErr ([CronEvent], [OneOffScheduledEvent])
getScheduledEventsForDeliveryTx :: TxE QErr ([CronEvent], [OneOffScheduledEvent])
getScheduledEventsForDeliveryTx =
(,) ([CronEvent]
-> [OneOffScheduledEvent] -> ([CronEvent], [OneOffScheduledEvent]))
-> TxET QErr IO [CronEvent]
-> TxET
QErr
IO
([OneOffScheduledEvent] -> ([CronEvent], [OneOffScheduledEvent]))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TxET QErr IO [CronEvent]
getCronEventsForDelivery TxET
QErr
IO
([OneOffScheduledEvent] -> ([CronEvent], [OneOffScheduledEvent]))
-> TxET QErr IO [OneOffScheduledEvent]
-> TxE QErr ([CronEvent], [OneOffScheduledEvent])
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TxET QErr IO [OneOffScheduledEvent]
getOneOffEventsForDelivery
where
getCronEventsForDelivery :: Q.TxE QErr [CronEvent]
getCronEventsForDelivery :: TxET QErr IO [CronEvent]
getCronEventsForDelivery =
(Identity (AltJ CronEvent) -> CronEvent)
-> [Identity (AltJ CronEvent)] -> [CronEvent]
forall a b. (a -> b) -> [a] -> [b]
map (AltJ CronEvent -> CronEvent
forall a. AltJ a -> a
Q.getAltJ (AltJ CronEvent -> CronEvent)
-> (Identity (AltJ CronEvent) -> AltJ CronEvent)
-> Identity (AltJ CronEvent)
-> CronEvent
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identity (AltJ CronEvent) -> AltJ CronEvent
forall a. Identity a -> a
runIdentity)
([Identity (AltJ CronEvent)] -> [CronEvent])
-> TxET QErr IO [Identity (AltJ CronEvent)]
-> TxET QErr IO [CronEvent]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query -> () -> Bool -> TxET QErr IO [Identity (AltJ CronEvent)]
forall (m :: * -> *) a r e.
(MonadIO m, FromRow a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m [a]
Q.listQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
WITH cte AS
( UPDATE hdb_catalog.hdb_cron_events
SET status = 'locked'
WHERE id IN ( SELECT t.id
FROM hdb_catalog.hdb_cron_events t
WHERE ( t.status = 'scheduled'
and (
(t.next_retry_at is NULL and t.scheduled_time <= now()) or
(t.next_retry_at is not NULL and t.next_retry_at <= now())
)
)
FOR UPDATE SKIP LOCKED
)
RETURNING *
)
SELECT row_to_json(t.*) FROM cte AS t
|]
()
Bool
True
getOneOffEventsForDelivery :: Q.TxE QErr [OneOffScheduledEvent]
getOneOffEventsForDelivery :: TxET QErr IO [OneOffScheduledEvent]
getOneOffEventsForDelivery = do
(Identity (AltJ OneOffScheduledEvent) -> OneOffScheduledEvent)
-> [Identity (AltJ OneOffScheduledEvent)] -> [OneOffScheduledEvent]
forall a b. (a -> b) -> [a] -> [b]
map (AltJ OneOffScheduledEvent -> OneOffScheduledEvent
forall a. AltJ a -> a
Q.getAltJ (AltJ OneOffScheduledEvent -> OneOffScheduledEvent)
-> (Identity (AltJ OneOffScheduledEvent)
-> AltJ OneOffScheduledEvent)
-> Identity (AltJ OneOffScheduledEvent)
-> OneOffScheduledEvent
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identity (AltJ OneOffScheduledEvent) -> AltJ OneOffScheduledEvent
forall a. Identity a -> a
runIdentity)
([Identity (AltJ OneOffScheduledEvent)] -> [OneOffScheduledEvent])
-> TxET QErr IO [Identity (AltJ OneOffScheduledEvent)]
-> TxET QErr IO [OneOffScheduledEvent]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> ()
-> Bool
-> TxET QErr IO [Identity (AltJ OneOffScheduledEvent)]
forall (m :: * -> *) a r e.
(MonadIO m, FromRow a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m [a]
Q.listQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
WITH cte AS (
UPDATE hdb_catalog.hdb_scheduled_events
SET status = 'locked'
WHERE id IN ( SELECT t.id
FROM hdb_catalog.hdb_scheduled_events t
WHERE ( t.status = 'scheduled'
and (
(t.next_retry_at is NULL and t.scheduled_time <= now()) or
(t.next_retry_at is not NULL and t.next_retry_at <= now())
)
)
FOR UPDATE SKIP LOCKED
)
RETURNING *
)
SELECT row_to_json(t.*) FROM cte AS t
|]
()
Bool
False
insertInvocationTx :: Invocation 'ScheduledType -> ScheduledEventType -> Q.TxE QErr ()
insertInvocationTx :: Invocation 'ScheduledType -> ScheduledEventType -> TxE QErr ()
insertInvocationTx Invocation 'ScheduledType
invo ScheduledEventType
type' = do
case ScheduledEventType
type' of
ScheduledEventType
Cron -> do
(PGTxErr -> QErr)
-> Query
-> (CronEventId, Maybe Int64, AltJ Value, AltJ Value)
-> Bool
-> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
INSERT INTO hdb_catalog.hdb_cron_event_invocation_logs
(event_id, status, request, response)
VALUES ($1, $2, $3, $4)
|]
( Invocation 'ScheduledType -> CronEventId
forall (a :: TriggerTypes). Invocation a -> CronEventId
iEventId Invocation 'ScheduledType
invo,
Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Maybe Int -> Maybe Int64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Invocation 'ScheduledType -> Maybe Int
forall (a :: TriggerTypes). Invocation a -> Maybe Int
iStatus Invocation 'ScheduledType
invo :: Maybe Int64,
Value -> AltJ Value
forall a. a -> AltJ a
Q.AltJ (Value -> AltJ Value) -> Value -> AltJ Value
forall a b. (a -> b) -> a -> b
$ WebhookRequest -> Value
forall a. ToJSON a => a -> Value
J.toJSON (WebhookRequest -> Value) -> WebhookRequest -> Value
forall a b. (a -> b) -> a -> b
$ Invocation 'ScheduledType -> WebhookRequest
forall (a :: TriggerTypes). Invocation a -> WebhookRequest
iRequest Invocation 'ScheduledType
invo,
Value -> AltJ Value
forall a. a -> AltJ a
Q.AltJ (Value -> AltJ Value) -> Value -> AltJ Value
forall a b. (a -> b) -> a -> b
$ Response 'ScheduledType -> Value
forall a. ToJSON a => a -> Value
J.toJSON (Response 'ScheduledType -> Value)
-> Response 'ScheduledType -> Value
forall a b. (a -> b) -> a -> b
$ Invocation 'ScheduledType -> Response 'ScheduledType
forall (a :: TriggerTypes). Invocation a -> Response a
iResponse Invocation 'ScheduledType
invo
)
Bool
True
(PGTxErr -> QErr)
-> Query -> Identity CronEventId -> Bool -> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.hdb_cron_events
SET tries = tries + 1
WHERE id = $1
|]
(CronEventId -> Identity CronEventId
forall a. a -> Identity a
Identity (CronEventId -> Identity CronEventId)
-> CronEventId -> Identity CronEventId
forall a b. (a -> b) -> a -> b
$ Invocation 'ScheduledType -> CronEventId
forall (a :: TriggerTypes). Invocation a -> CronEventId
iEventId Invocation 'ScheduledType
invo)
Bool
True
ScheduledEventType
OneOff -> do
(PGTxErr -> QErr)
-> Query
-> (CronEventId, Maybe Int64, AltJ Value, AltJ Value)
-> Bool
-> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
INSERT INTO hdb_catalog.hdb_scheduled_event_invocation_logs
(event_id, status, request, response)
VALUES ($1, $2, $3, $4)
|]
( Invocation 'ScheduledType -> CronEventId
forall (a :: TriggerTypes). Invocation a -> CronEventId
iEventId Invocation 'ScheduledType
invo,
Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Maybe Int -> Maybe Int64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Invocation 'ScheduledType -> Maybe Int
forall (a :: TriggerTypes). Invocation a -> Maybe Int
iStatus Invocation 'ScheduledType
invo :: Maybe Int64,
Value -> AltJ Value
forall a. a -> AltJ a
Q.AltJ (Value -> AltJ Value) -> Value -> AltJ Value
forall a b. (a -> b) -> a -> b
$ WebhookRequest -> Value
forall a. ToJSON a => a -> Value
J.toJSON (WebhookRequest -> Value) -> WebhookRequest -> Value
forall a b. (a -> b) -> a -> b
$ Invocation 'ScheduledType -> WebhookRequest
forall (a :: TriggerTypes). Invocation a -> WebhookRequest
iRequest Invocation 'ScheduledType
invo,
Value -> AltJ Value
forall a. a -> AltJ a
Q.AltJ (Value -> AltJ Value) -> Value -> AltJ Value
forall a b. (a -> b) -> a -> b
$ Response 'ScheduledType -> Value
forall a. ToJSON a => a -> Value
J.toJSON (Response 'ScheduledType -> Value)
-> Response 'ScheduledType -> Value
forall a b. (a -> b) -> a -> b
$ Invocation 'ScheduledType -> Response 'ScheduledType
forall (a :: TriggerTypes). Invocation a -> Response a
iResponse Invocation 'ScheduledType
invo
)
Bool
True
(PGTxErr -> QErr)
-> Query -> Identity CronEventId -> Bool -> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.hdb_scheduled_events
SET tries = tries + 1
WHERE id = $1
|]
(CronEventId -> Identity CronEventId
forall a. a -> Identity a
Identity (CronEventId -> Identity CronEventId)
-> CronEventId -> Identity CronEventId
forall a b. (a -> b) -> a -> b
$ Invocation 'ScheduledType -> CronEventId
forall (a :: TriggerTypes). Invocation a -> CronEventId
iEventId Invocation 'ScheduledType
invo)
Bool
True
setScheduledEventOpTx ::
ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> Q.TxE QErr ()
setScheduledEventOpTx :: CronEventId
-> ScheduledEventOp -> ScheduledEventType -> TxE QErr ()
setScheduledEventOpTx CronEventId
eventId ScheduledEventOp
op ScheduledEventType
type' = case ScheduledEventOp
op of
SEOpRetry UTCTime
time -> UTCTime -> TxE QErr ()
setRetry UTCTime
time
SEOpStatus ScheduledEventStatus
status -> ScheduledEventStatus -> TxE QErr ()
setStatus ScheduledEventStatus
status
where
setRetry :: UTCTime -> TxE QErr ()
setRetry UTCTime
time =
case ScheduledEventType
type' of
ScheduledEventType
Cron ->
(PGTxErr -> QErr)
-> Query -> (UTCTime, CronEventId) -> Bool -> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.hdb_cron_events
SET next_retry_at = $1,
STATUS = 'scheduled'
WHERE id = $2
|]
(UTCTime
time, CronEventId
eventId)
Bool
True
ScheduledEventType
OneOff ->
(PGTxErr -> QErr)
-> Query -> (UTCTime, CronEventId) -> Bool -> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.hdb_scheduled_events
SET next_retry_at = $1,
STATUS = 'scheduled'
WHERE id = $2
|]
(UTCTime
time, CronEventId
eventId)
Bool
True
setStatus :: ScheduledEventStatus -> TxE QErr ()
setStatus ScheduledEventStatus
status =
case ScheduledEventType
type' of
ScheduledEventType
Cron -> do
(PGTxErr -> QErr)
-> Query
-> (CronEventId, ScheduledEventStatus)
-> Bool
-> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.hdb_cron_events
SET status = $2
WHERE id = $1
|]
(CronEventId
eventId, ScheduledEventStatus
status)
Bool
True
ScheduledEventType
OneOff -> do
(PGTxErr -> QErr)
-> Query
-> (CronEventId, ScheduledEventStatus)
-> Bool
-> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.hdb_scheduled_events
SET status = $2
WHERE id = $1
|]
(CronEventId
eventId, ScheduledEventStatus
status)
Bool
True
unlockScheduledEventsTx :: ScheduledEventType -> [ScheduledEventId] -> Q.TxE QErr Int
unlockScheduledEventsTx :: ScheduledEventType -> [CronEventId] -> TxE QErr Int
unlockScheduledEventsTx ScheduledEventType
type' [CronEventId]
eventIds =
let eventIdsTextArray :: [Text]
eventIdsTextArray = (CronEventId -> Text) -> [CronEventId] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map CronEventId -> Text
unEventId [CronEventId]
eventIds
in case ScheduledEventType
type' of
ScheduledEventType
Cron ->
(Identity Int -> Int
forall a. Identity a -> a
runIdentity (Identity Int -> Int)
-> (SingleRow (Identity Int) -> Identity Int)
-> SingleRow (Identity Int)
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity Int) -> Identity Int
forall a. SingleRow a -> a
Q.getRow)
(SingleRow (Identity Int) -> Int)
-> TxET QErr IO (SingleRow (Identity Int)) -> TxE QErr Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> Identity PGTextArray
-> Bool
-> TxET QErr IO (SingleRow (Identity Int))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
Q.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
WITH "cte" AS
(UPDATE hdb_catalog.hdb_cron_events
SET status = 'scheduled'
WHERE id = ANY($1::text[]) and status = 'locked'
RETURNING *)
SELECT count(*) FROM "cte"
|]
(PGTextArray -> Identity PGTextArray
forall a. a -> Identity a
Identity (PGTextArray -> Identity PGTextArray)
-> PGTextArray -> Identity PGTextArray
forall a b. (a -> b) -> a -> b
$ [Text] -> PGTextArray
PGTextArray [Text]
eventIdsTextArray)
Bool
True
ScheduledEventType
OneOff ->
(Identity Int -> Int
forall a. Identity a -> a
runIdentity (Identity Int -> Int)
-> (SingleRow (Identity Int) -> Identity Int)
-> SingleRow (Identity Int)
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity Int) -> Identity Int
forall a. SingleRow a -> a
Q.getRow)
(SingleRow (Identity Int) -> Int)
-> TxET QErr IO (SingleRow (Identity Int)) -> TxE QErr Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> Identity PGTextArray
-> Bool
-> TxET QErr IO (SingleRow (Identity Int))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
Q.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
WITH "cte" AS
(UPDATE hdb_catalog.hdb_scheduled_events
SET status = 'scheduled'
WHERE id = ANY($1::text[]) AND status = 'locked'
RETURNING *)
SELECT count(*) FROM "cte"
|]
(PGTextArray -> Identity PGTextArray
forall a. a -> Identity a
Identity (PGTextArray -> Identity PGTextArray)
-> PGTextArray -> Identity PGTextArray
forall a b. (a -> b) -> a -> b
$ [Text] -> PGTextArray
PGTextArray [Text]
eventIdsTextArray)
Bool
True
unlockAllLockedScheduledEventsTx :: Q.TxE QErr ()
unlockAllLockedScheduledEventsTx :: TxE QErr ()
unlockAllLockedScheduledEventsTx = do
(PGTxErr -> QErr) -> Query -> () -> Bool -> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.hdb_cron_events
SET status = 'scheduled'
WHERE status = 'locked'
|]
()
Bool
True
(PGTxErr -> QErr) -> Query -> () -> Bool -> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.hdb_scheduled_events
SET status = 'scheduled'
WHERE status = 'locked'
|]
()
Bool
True
insertCronEventsTx :: [CronEventSeed] -> Q.TxE QErr ()
insertCronEventsTx :: [CronEventSeed] -> TxE QErr ()
insertCronEventsTx [CronEventSeed]
cronSeeds = do
let insertCronEventsSql :: Text
insertCronEventsSql =
Builder -> Text
TB.run (Builder -> Text) -> Builder -> Text
forall a b. (a -> b) -> a -> b
$
SQLInsert -> Builder
forall a. ToSQL a => a -> Builder
toSQL
SQLInsert :: QualifiedTable
-> [PGCol]
-> ValuesExp
-> Maybe SQLConflict
-> Maybe RetExp
-> SQLInsert
S.SQLInsert
{ siTable :: QualifiedTable
siTable = QualifiedTable
cronEventsTable,
siCols :: [PGCol]
siCols = (Text -> PGCol) -> [Text] -> [PGCol]
forall a b. (a -> b) -> [a] -> [b]
map Text -> PGCol
unsafePGCol [Text
"trigger_name", Text
"scheduled_time"],
siValues :: ValuesExp
siValues = [TupleExp] -> ValuesExp
S.ValuesExp ([TupleExp] -> ValuesExp) -> [TupleExp] -> ValuesExp
forall a b. (a -> b) -> a -> b
$ (CronEventSeed -> TupleExp) -> [CronEventSeed] -> [TupleExp]
forall a b. (a -> b) -> [a] -> [b]
map ([Text] -> TupleExp
toTupleExp ([Text] -> TupleExp)
-> (CronEventSeed -> [Text]) -> CronEventSeed -> TupleExp
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CronEventSeed -> [Text]
toArr) [CronEventSeed]
cronSeeds,
siConflict :: Maybe SQLConflict
siConflict = SQLConflict -> Maybe SQLConflict
forall a. a -> Maybe a
Just (SQLConflict -> Maybe SQLConflict)
-> SQLConflict -> Maybe SQLConflict
forall a b. (a -> b) -> a -> b
$ Maybe SQLConflictTarget -> SQLConflict
S.DoNothing Maybe SQLConflictTarget
forall a. Maybe a
Nothing,
siRet :: Maybe RetExp
siRet = Maybe RetExp
forall a. Maybe a
Nothing
}
(PGTxErr -> QErr) -> Query -> () -> Bool -> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE PGTxErr -> QErr
defaultTxErrorHandler (Text -> Query
Q.fromText Text
insertCronEventsSql) () Bool
False
where
toArr :: CronEventSeed -> [Text]
toArr (CronEventSeed TriggerName
n UTCTime
t) = [(TriggerName -> Text
triggerNameToTxt TriggerName
n), (UTCTime -> Text
formatTime' UTCTime
t)]
toTupleExp :: [Text] -> TupleExp
toTupleExp = [SQLExp] -> TupleExp
S.TupleExp ([SQLExp] -> TupleExp)
-> ([Text] -> [SQLExp]) -> [Text] -> TupleExp
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Text -> SQLExp) -> [Text] -> [SQLExp]
forall a b. (a -> b) -> [a] -> [b]
map Text -> SQLExp
S.SELit
insertOneOffScheduledEventTx :: OneOffEvent -> Q.TxE QErr EventId
insertOneOffScheduledEventTx :: OneOffEvent -> TxE QErr CronEventId
insertOneOffScheduledEventTx CreateScheduledEvent {[HeaderConf]
Maybe Value
Maybe Text
Maybe MetadataResponseTransform
Maybe RequestTransform
UTCTime
InputWebhook
STRetryConf
cseResponseTransform :: OneOffEvent -> Maybe MetadataResponseTransform
cseRequestTransform :: OneOffEvent -> Maybe RequestTransform
cseComment :: OneOffEvent -> Maybe Text
cseRetryConf :: OneOffEvent -> STRetryConf
cseHeaders :: OneOffEvent -> [HeaderConf]
csePayload :: OneOffEvent -> Maybe Value
cseScheduleAt :: OneOffEvent -> UTCTime
cseWebhook :: OneOffEvent -> InputWebhook
cseResponseTransform :: Maybe MetadataResponseTransform
cseRequestTransform :: Maybe RequestTransform
cseComment :: Maybe Text
cseRetryConf :: STRetryConf
cseHeaders :: [HeaderConf]
csePayload :: Maybe Value
cseScheduleAt :: UTCTime
cseWebhook :: InputWebhook
..} =
Identity CronEventId -> CronEventId
forall a. Identity a -> a
runIdentity (Identity CronEventId -> CronEventId)
-> (SingleRow (Identity CronEventId) -> Identity CronEventId)
-> SingleRow (Identity CronEventId)
-> CronEventId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Identity CronEventId) -> Identity CronEventId
forall a. SingleRow a -> a
Q.getRow
(SingleRow (Identity CronEventId) -> CronEventId)
-> TxET QErr IO (SingleRow (Identity CronEventId))
-> TxE QErr CronEventId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> (AltJ InputWebhook, UTCTime, AltJ (Maybe Value),
AltJ STRetryConf, AltJ [HeaderConf], Maybe Text)
-> Bool
-> TxET QErr IO (SingleRow (Identity CronEventId))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
Q.withQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
INSERT INTO hdb_catalog.hdb_scheduled_events
(webhook_conf,scheduled_time,payload,retry_conf,header_conf,comment)
VALUES
($1, $2, $3, $4, $5, $6) RETURNING id
|]
( InputWebhook -> AltJ InputWebhook
forall a. a -> AltJ a
Q.AltJ InputWebhook
cseWebhook,
UTCTime
cseScheduleAt,
Maybe Value -> AltJ (Maybe Value)
forall a. a -> AltJ a
Q.AltJ Maybe Value
csePayload,
STRetryConf -> AltJ STRetryConf
forall a. a -> AltJ a
Q.AltJ STRetryConf
cseRetryConf,
[HeaderConf] -> AltJ [HeaderConf]
forall a. a -> AltJ a
Q.AltJ [HeaderConf]
cseHeaders,
Maybe Text
cseComment
)
Bool
False
dropFutureCronEventsTx :: ClearCronEvents -> Q.TxE QErr ()
dropFutureCronEventsTx :: ClearCronEvents -> TxE QErr ()
dropFutureCronEventsTx = \case
SingleCronTrigger TriggerName
triggerName ->
(PGTxErr -> QErr)
-> Query -> Identity TriggerName -> Bool -> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
DELETE FROM hdb_catalog.hdb_cron_events
WHERE trigger_name = $1 AND scheduled_time > now() AND tries = 0
|]
(TriggerName -> Identity TriggerName
forall a. a -> Identity a
Identity TriggerName
triggerName)
Bool
True
MetadataCronTriggers [TriggerName]
triggerNames ->
(PGTxErr -> QErr)
-> Query -> Identity PGTextArray -> Bool -> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
DELETE FROM hdb_catalog.hdb_cron_events
WHERE scheduled_time > now() AND tries = 0 AND trigger_name = ANY($1::text[])
|]
(PGTextArray -> Identity PGTextArray
forall a. a -> Identity a
Identity (PGTextArray -> Identity PGTextArray)
-> PGTextArray -> Identity PGTextArray
forall a b. (a -> b) -> a -> b
$ [Text] -> PGTextArray
PGTextArray ([Text] -> PGTextArray) -> [Text] -> PGTextArray
forall a b. (a -> b) -> a -> b
$ (TriggerName -> Text) -> [TriggerName] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map TriggerName -> Text
triggerNameToTxt [TriggerName]
triggerNames)
Bool
False
cronEventsTable :: QualifiedTable
cronEventsTable :: QualifiedTable
cronEventsTable =
SchemaName -> TableName -> QualifiedTable
forall a. SchemaName -> a -> QualifiedObject a
QualifiedObject SchemaName
"hdb_catalog" (TableName -> QualifiedTable) -> TableName -> QualifiedTable
forall a b. (a -> b) -> a -> b
$ Text -> TableName
TableName Text
"hdb_cron_events"
mkScheduledEventStatusFilter :: [ScheduledEventStatus] -> S.BoolExp
mkScheduledEventStatusFilter :: [ScheduledEventStatus] -> BoolExp
mkScheduledEventStatusFilter = \case
[] -> Bool -> BoolExp
S.BELit Bool
True
[ScheduledEventStatus]
v ->
SQLExp -> [SQLExp] -> BoolExp
S.BEIN (Identifier -> SQLExp
S.SEIdentifier (Identifier -> SQLExp) -> Identifier -> SQLExp
forall a b. (a -> b) -> a -> b
$ Text -> Identifier
Identifier Text
"status") ([SQLExp] -> BoolExp) -> [SQLExp] -> BoolExp
forall a b. (a -> b) -> a -> b
$
(ScheduledEventStatus -> SQLExp)
-> [ScheduledEventStatus] -> [SQLExp]
forall a b. (a -> b) -> [a] -> [b]
map (Text -> SQLExp
S.SELit (Text -> SQLExp)
-> (ScheduledEventStatus -> Text) -> ScheduledEventStatus -> SQLExp
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ScheduledEventStatus -> Text
scheduledEventStatusToText) [ScheduledEventStatus]
v
scheduledTimeOrderBy :: S.OrderByExp
scheduledTimeOrderBy :: OrderByExp
scheduledTimeOrderBy =
let scheduledTimeCol :: SQLExp
scheduledTimeCol = Identifier -> SQLExp
S.SEIdentifier (Identifier -> SQLExp) -> Identifier -> SQLExp
forall a b. (a -> b) -> a -> b
$ Text -> Identifier
Identifier Text
"scheduled_time"
in NonEmpty OrderByItem -> OrderByExp
S.OrderByExp (NonEmpty OrderByItem -> OrderByExp)
-> NonEmpty OrderByItem -> OrderByExp
forall a b. (a -> b) -> a -> b
$
(OrderByItem -> [OrderByItem] -> NonEmpty OrderByItem)
-> [OrderByItem] -> OrderByItem -> NonEmpty OrderByItem
forall a b c. (a -> b -> c) -> b -> a -> c
flip OrderByItem -> [OrderByItem] -> NonEmpty OrderByItem
forall a. a -> [a] -> NonEmpty a
(NE.:|) [] (OrderByItem -> NonEmpty OrderByItem)
-> OrderByItem -> NonEmpty OrderByItem
forall a b. (a -> b) -> a -> b
$
SQLExp -> Maybe OrderType -> Maybe NullsOrder -> OrderByItem
S.OrderByItem
SQLExp
scheduledTimeCol
(OrderType -> Maybe OrderType
forall a. a -> Maybe a
Just OrderType
S.OTAsc)
Maybe NullsOrder
forall a. Maybe a
Nothing
mkPaginationSelectExp ::
S.Select ->
ScheduledEventPagination ->
S.Select
Select
allRowsSelect ScheduledEventPagination {Maybe Int
_sepOffset :: ScheduledEventPagination -> Maybe Int
_sepLimit :: ScheduledEventPagination -> Maybe Int
_sepOffset :: Maybe Int
_sepLimit :: Maybe Int
..} =
Select
S.mkSelect
{ selCTEs :: [(TableAlias, Select)]
S.selCTEs = [(Identifier -> TableAlias
forall a. IsIdentifier a => a -> TableAlias
S.toTableAlias Identifier
countCteAlias, Select
allRowsSelect), (Identifier -> TableAlias
forall a. IsIdentifier a => a -> TableAlias
S.toTableAlias Identifier
limitCteAlias, Select
limitCteSelect)],
selExtr :: [Extractor]
S.selExtr = [Extractor
countExtractor, Extractor
rowsExtractor]
}
where
countCteAlias :: Identifier
countCteAlias = Text -> Identifier
Identifier Text
"count_cte"
limitCteAlias :: Identifier
limitCteAlias = Text -> Identifier
Identifier Text
"limit_cte"
countExtractor :: Extractor
countExtractor =
let selectExp :: Select
selectExp =
Select
S.mkSelect
{ selExtr :: [Extractor]
S.selExtr = [SQLExp -> Maybe ColumnAlias -> Extractor
S.Extractor SQLExp
S.countStar Maybe ColumnAlias
forall a. Maybe a
Nothing],
selFrom :: Maybe FromExp
S.selFrom = FromExp -> Maybe FromExp
forall a. a -> Maybe a
Just (FromExp -> Maybe FromExp) -> FromExp -> Maybe FromExp
forall a b. (a -> b) -> a -> b
$ Identifier -> FromExp
forall a. IsIdentifier a => a -> FromExp
S.mkIdenFromExp Identifier
countCteAlias
}
in SQLExp -> Maybe ColumnAlias -> Extractor
S.Extractor (Select -> SQLExp
S.SESelect Select
selectExp) Maybe ColumnAlias
forall a. Maybe a
Nothing
limitCteSelect :: Select
limitCteSelect =
Select
S.mkSelect
{ selExtr :: [Extractor]
S.selExtr = [Extractor
S.selectStar],
selFrom :: Maybe FromExp
S.selFrom = FromExp -> Maybe FromExp
forall a. a -> Maybe a
Just (FromExp -> Maybe FromExp) -> FromExp -> Maybe FromExp
forall a b. (a -> b) -> a -> b
$ Identifier -> FromExp
forall a. IsIdentifier a => a -> FromExp
S.mkIdenFromExp Identifier
countCteAlias,
selLimit :: Maybe LimitExp
S.selLimit = (SQLExp -> LimitExp
S.LimitExp (SQLExp -> LimitExp) -> (Int -> SQLExp) -> Int -> LimitExp
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> SQLExp
S.intToSQLExp) (Int -> LimitExp) -> Maybe Int -> Maybe LimitExp
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe Int
_sepLimit,
selOffset :: Maybe OffsetExp
S.selOffset = (SQLExp -> OffsetExp
S.OffsetExp (SQLExp -> OffsetExp) -> (Int -> SQLExp) -> Int -> OffsetExp
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> SQLExp
S.intToSQLExp) (Int -> OffsetExp) -> Maybe Int -> Maybe OffsetExp
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe Int
_sepOffset
}
rowsExtractor :: Extractor
rowsExtractor =
let jsonAgg :: SQLExp
jsonAgg = Text -> SQLExp
S.SEUnsafe Text
"json_agg(row_to_json(limit_cte.*))"
selectExp :: Select
selectExp =
Select
S.mkSelect
{ selExtr :: [Extractor]
S.selExtr = [SQLExp -> Maybe ColumnAlias -> Extractor
S.Extractor SQLExp
jsonAgg Maybe ColumnAlias
forall a. Maybe a
Nothing],
selFrom :: Maybe FromExp
S.selFrom = FromExp -> Maybe FromExp
forall a. a -> Maybe a
Just (FromExp -> Maybe FromExp) -> FromExp -> Maybe FromExp
forall a b. (a -> b) -> a -> b
$ Identifier -> FromExp
forall a. IsIdentifier a => a -> FromExp
S.mkIdenFromExp Identifier
limitCteAlias
}
in SQLExp -> Maybe ColumnAlias -> Extractor
S.Extractor (SQLExp -> SQLExp -> SQLExp
S.handleIfNull (Text -> SQLExp
S.SELit Text
"[]") (Select -> SQLExp
S.SESelect Select
selectExp)) Maybe ColumnAlias
forall a. Maybe a
Nothing
withCount :: (Int, Q.AltJ a) -> WithTotalCount a
withCount :: (Int, AltJ a) -> WithTotalCount a
withCount (Int
count, Q.AltJ a
a) = Int -> a -> WithTotalCount a
forall a. Int -> a -> WithTotalCount a
WithTotalCount Int
count a
a
getOneOffScheduledEventsTx ::
ScheduledEventPagination ->
[ScheduledEventStatus] ->
Q.TxE QErr (WithTotalCount [OneOffScheduledEvent])
getOneOffScheduledEventsTx :: ScheduledEventPagination
-> [ScheduledEventStatus]
-> TxE QErr (WithTotalCount [OneOffScheduledEvent])
getOneOffScheduledEventsTx ScheduledEventPagination
pagination [ScheduledEventStatus]
statuses = do
let table :: QualifiedTable
table = SchemaName -> TableName -> QualifiedTable
forall a. SchemaName -> a -> QualifiedObject a
QualifiedObject SchemaName
"hdb_catalog" (TableName -> QualifiedTable) -> TableName -> QualifiedTable
forall a b. (a -> b) -> a -> b
$ Text -> TableName
TableName Text
"hdb_scheduled_events"
statusFilter :: BoolExp
statusFilter = [ScheduledEventStatus] -> BoolExp
mkScheduledEventStatusFilter [ScheduledEventStatus]
statuses
select :: Select
select =
Select
S.mkSelect
{ selExtr :: [Extractor]
S.selExtr = [Extractor
S.selectStar],
selFrom :: Maybe FromExp
S.selFrom = FromExp -> Maybe FromExp
forall a. a -> Maybe a
Just (FromExp -> Maybe FromExp) -> FromExp -> Maybe FromExp
forall a b. (a -> b) -> a -> b
$ QualifiedTable -> FromExp
S.mkSimpleFromExp QualifiedTable
table,
selWhere :: Maybe WhereFrag
S.selWhere = WhereFrag -> Maybe WhereFrag
forall a. a -> Maybe a
Just (WhereFrag -> Maybe WhereFrag) -> WhereFrag -> Maybe WhereFrag
forall a b. (a -> b) -> a -> b
$ BoolExp -> WhereFrag
S.WhereFrag BoolExp
statusFilter,
selOrderBy :: Maybe OrderByExp
S.selOrderBy = OrderByExp -> Maybe OrderByExp
forall a. a -> Maybe a
Just OrderByExp
scheduledTimeOrderBy
}
sql :: Query
sql = Builder -> Query
Q.fromBuilder (Builder -> Query) -> Builder -> Query
forall a b. (a -> b) -> a -> b
$ Select -> Builder
forall a. ToSQL a => a -> Builder
toSQL (Select -> Builder) -> Select -> Builder
forall a b. (a -> b) -> a -> b
$ Select -> ScheduledEventPagination -> Select
mkPaginationSelectExp Select
select ScheduledEventPagination
pagination
((Int, AltJ [OneOffScheduledEvent])
-> WithTotalCount [OneOffScheduledEvent]
forall a. (Int, AltJ a) -> WithTotalCount a
withCount ((Int, AltJ [OneOffScheduledEvent])
-> WithTotalCount [OneOffScheduledEvent])
-> (SingleRow (Int, AltJ [OneOffScheduledEvent])
-> (Int, AltJ [OneOffScheduledEvent]))
-> SingleRow (Int, AltJ [OneOffScheduledEvent])
-> WithTotalCount [OneOffScheduledEvent]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Int, AltJ [OneOffScheduledEvent])
-> (Int, AltJ [OneOffScheduledEvent])
forall a. SingleRow a -> a
Q.getRow) (SingleRow (Int, AltJ [OneOffScheduledEvent])
-> WithTotalCount [OneOffScheduledEvent])
-> TxET QErr IO (SingleRow (Int, AltJ [OneOffScheduledEvent]))
-> TxE QErr (WithTotalCount [OneOffScheduledEvent])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> ()
-> Bool
-> TxET QErr IO (SingleRow (Int, AltJ [OneOffScheduledEvent]))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
Q.withQE PGTxErr -> QErr
defaultTxErrorHandler Query
sql () Bool
False
getCronEventsTx ::
TriggerName ->
ScheduledEventPagination ->
[ScheduledEventStatus] ->
Q.TxE QErr (WithTotalCount [CronEvent])
getCronEventsTx :: TriggerName
-> ScheduledEventPagination
-> [ScheduledEventStatus]
-> TxE QErr (WithTotalCount [CronEvent])
getCronEventsTx TriggerName
triggerName ScheduledEventPagination
pagination [ScheduledEventStatus]
status = do
let triggerNameFilter :: BoolExp
triggerNameFilter =
CompareOp -> SQLExp -> SQLExp -> BoolExp
S.BECompare CompareOp
S.SEQ (Identifier -> SQLExp
S.SEIdentifier (Identifier -> SQLExp) -> Identifier -> SQLExp
forall a b. (a -> b) -> a -> b
$ Text -> Identifier
Identifier Text
"trigger_name") (Text -> SQLExp
S.SELit (Text -> SQLExp) -> Text -> SQLExp
forall a b. (a -> b) -> a -> b
$ TriggerName -> Text
triggerNameToTxt TriggerName
triggerName)
statusFilter :: BoolExp
statusFilter = [ScheduledEventStatus] -> BoolExp
mkScheduledEventStatusFilter [ScheduledEventStatus]
status
select :: Select
select =
Select
S.mkSelect
{ selExtr :: [Extractor]
S.selExtr = [Extractor
S.selectStar],
selFrom :: Maybe FromExp
S.selFrom = FromExp -> Maybe FromExp
forall a. a -> Maybe a
Just (FromExp -> Maybe FromExp) -> FromExp -> Maybe FromExp
forall a b. (a -> b) -> a -> b
$ QualifiedTable -> FromExp
S.mkSimpleFromExp QualifiedTable
cronEventsTable,
selWhere :: Maybe WhereFrag
S.selWhere = WhereFrag -> Maybe WhereFrag
forall a. a -> Maybe a
Just (WhereFrag -> Maybe WhereFrag) -> WhereFrag -> Maybe WhereFrag
forall a b. (a -> b) -> a -> b
$ BoolExp -> WhereFrag
S.WhereFrag (BoolExp -> WhereFrag) -> BoolExp -> WhereFrag
forall a b. (a -> b) -> a -> b
$ BinOp -> BoolExp -> BoolExp -> BoolExp
S.BEBin BinOp
S.AndOp BoolExp
triggerNameFilter BoolExp
statusFilter,
selOrderBy :: Maybe OrderByExp
S.selOrderBy = OrderByExp -> Maybe OrderByExp
forall a. a -> Maybe a
Just OrderByExp
scheduledTimeOrderBy
}
sql :: Query
sql = Builder -> Query
Q.fromBuilder (Builder -> Query) -> Builder -> Query
forall a b. (a -> b) -> a -> b
$ Select -> Builder
forall a. ToSQL a => a -> Builder
toSQL (Select -> Builder) -> Select -> Builder
forall a b. (a -> b) -> a -> b
$ Select -> ScheduledEventPagination -> Select
mkPaginationSelectExp Select
select ScheduledEventPagination
pagination
((Int, AltJ [CronEvent]) -> WithTotalCount [CronEvent]
forall a. (Int, AltJ a) -> WithTotalCount a
withCount ((Int, AltJ [CronEvent]) -> WithTotalCount [CronEvent])
-> (SingleRow (Int, AltJ [CronEvent]) -> (Int, AltJ [CronEvent]))
-> SingleRow (Int, AltJ [CronEvent])
-> WithTotalCount [CronEvent]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Int, AltJ [CronEvent]) -> (Int, AltJ [CronEvent])
forall a. SingleRow a -> a
Q.getRow) (SingleRow (Int, AltJ [CronEvent]) -> WithTotalCount [CronEvent])
-> TxET QErr IO (SingleRow (Int, AltJ [CronEvent]))
-> TxE QErr (WithTotalCount [CronEvent])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> ()
-> Bool
-> TxET QErr IO (SingleRow (Int, AltJ [CronEvent]))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
Q.withQE PGTxErr -> QErr
defaultTxErrorHandler Query
sql () Bool
False
deleteScheduledEventTx ::
ScheduledEventId -> ScheduledEventType -> Q.TxE QErr ()
deleteScheduledEventTx :: CronEventId -> ScheduledEventType -> TxE QErr ()
deleteScheduledEventTx CronEventId
eventId = \case
ScheduledEventType
OneOff ->
(PGTxErr -> QErr)
-> Query -> Identity CronEventId -> Bool -> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
DELETE FROM hdb_catalog.hdb_scheduled_events
WHERE id = $1
|]
(CronEventId -> Identity CronEventId
forall a. a -> Identity a
Identity CronEventId
eventId)
Bool
False
ScheduledEventType
Cron ->
(PGTxErr -> QErr)
-> Query -> Identity CronEventId -> Bool -> TxE QErr ()
forall (m :: * -> *) r e.
(MonadIO m, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m ()
Q.unitQE
PGTxErr -> QErr
defaultTxErrorHandler
[Q.sql|
DELETE FROM hdb_catalog.hdb_cron_events
WHERE id = $1
|]
(CronEventId -> Identity CronEventId
forall a. a -> Identity a
Identity CronEventId
eventId)
Bool
False
invocationFieldExtractors :: QualifiedTable -> [S.Extractor]
QualifiedTable
table =
[ SQLExp -> Maybe ColumnAlias -> Extractor
S.Extractor (Text -> SQLExp
seIden Text
"id") Maybe ColumnAlias
forall a. Maybe a
Nothing,
SQLExp -> Maybe ColumnAlias -> Extractor
S.Extractor (Text -> SQLExp
seIden Text
"event_id") Maybe ColumnAlias
forall a. Maybe a
Nothing,
SQLExp -> Maybe ColumnAlias -> Extractor
S.Extractor (Text -> SQLExp
seIden Text
"status") Maybe ColumnAlias
forall a. Maybe a
Nothing,
SQLExp -> Maybe ColumnAlias -> Extractor
S.Extractor (SQLExp -> SQLExp
withJsonTypeAnn (SQLExp -> SQLExp) -> SQLExp -> SQLExp
forall a b. (a -> b) -> a -> b
$ Text -> SQLExp
seIden Text
"request") Maybe ColumnAlias
forall a. Maybe a
Nothing,
SQLExp -> Maybe ColumnAlias -> Extractor
S.Extractor (SQLExp -> SQLExp
withJsonTypeAnn (SQLExp -> SQLExp) -> SQLExp -> SQLExp
forall a b. (a -> b) -> a -> b
$ Text -> SQLExp
seIden Text
"response") Maybe ColumnAlias
forall a. Maybe a
Nothing,
SQLExp -> Maybe ColumnAlias -> Extractor
S.Extractor (Text -> SQLExp
seIden Text
"created_at") Maybe ColumnAlias
forall a. Maybe a
Nothing
]
where
withJsonTypeAnn :: SQLExp -> SQLExp
withJsonTypeAnn SQLExp
e = SQLExp -> TypeAnn -> SQLExp
S.SETyAnn SQLExp
e (TypeAnn -> SQLExp) -> TypeAnn -> SQLExp
forall a b. (a -> b) -> a -> b
$ Text -> TypeAnn
S.TypeAnn Text
"json"
seIden :: Text -> SQLExp
seIden = QIdentifier -> SQLExp
S.SEQIdentifier (QIdentifier -> SQLExp) -> (Text -> QIdentifier) -> Text -> SQLExp
forall b c a. (b -> c) -> (a -> b) -> a -> c
. QualifiedTable -> Identifier -> QIdentifier
forall a. IsIdentifier a => QualifiedTable -> a -> QIdentifier
S.mkQIdentifierTable QualifiedTable
table (Identifier -> QIdentifier)
-> (Text -> Identifier) -> Text -> QIdentifier
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Identifier
Identifier
mkEventIdBoolExp :: QualifiedTable -> EventId -> S.BoolExp
mkEventIdBoolExp :: QualifiedTable -> CronEventId -> BoolExp
mkEventIdBoolExp QualifiedTable
table CronEventId
eventId =
CompareOp -> SQLExp -> SQLExp -> BoolExp
S.BECompare
CompareOp
S.SEQ
(QIdentifier -> SQLExp
S.SEQIdentifier (QIdentifier -> SQLExp) -> QIdentifier -> SQLExp
forall a b. (a -> b) -> a -> b
$ QualifiedTable -> Identifier -> QIdentifier
forall a. IsIdentifier a => QualifiedTable -> a -> QIdentifier
S.mkQIdentifierTable QualifiedTable
table (Identifier -> QIdentifier) -> Identifier -> QIdentifier
forall a b. (a -> b) -> a -> b
$ Text -> Identifier
Identifier Text
"event_id")
(Text -> SQLExp
S.SELit (Text -> SQLExp) -> Text -> SQLExp
forall a b. (a -> b) -> a -> b
$ CronEventId -> Text
unEventId CronEventId
eventId)
getInvocationsTx ::
GetInvocationsBy ->
ScheduledEventPagination ->
Q.TxE QErr (WithTotalCount [ScheduledEventInvocation])
getInvocationsTx :: GetInvocationsBy
-> ScheduledEventPagination
-> TxE QErr (WithTotalCount [ScheduledEventInvocation])
getInvocationsTx GetInvocationsBy
invocationsBy ScheduledEventPagination
pagination = do
let eventsTables :: EventTables
eventsTables = QualifiedTable -> QualifiedTable -> QualifiedTable -> EventTables
EventTables QualifiedTable
oneOffInvocationsTable QualifiedTable
cronInvocationsTable QualifiedTable
cronEventsTable
sql :: Query
sql = Builder -> Query
Q.fromBuilder (Builder -> Query) -> Builder -> Query
forall a b. (a -> b) -> a -> b
$ Select -> Builder
forall a. ToSQL a => a -> Builder
toSQL (Select -> Builder) -> Select -> Builder
forall a b. (a -> b) -> a -> b
$ EventTables
-> GetInvocationsBy -> ScheduledEventPagination -> Select
getInvocationsQuery EventTables
eventsTables GetInvocationsBy
invocationsBy ScheduledEventPagination
pagination
((Int, AltJ [ScheduledEventInvocation])
-> WithTotalCount [ScheduledEventInvocation]
forall a. (Int, AltJ a) -> WithTotalCount a
withCount ((Int, AltJ [ScheduledEventInvocation])
-> WithTotalCount [ScheduledEventInvocation])
-> (SingleRow (Int, AltJ [ScheduledEventInvocation])
-> (Int, AltJ [ScheduledEventInvocation]))
-> SingleRow (Int, AltJ [ScheduledEventInvocation])
-> WithTotalCount [ScheduledEventInvocation]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SingleRow (Int, AltJ [ScheduledEventInvocation])
-> (Int, AltJ [ScheduledEventInvocation])
forall a. SingleRow a -> a
Q.getRow) (SingleRow (Int, AltJ [ScheduledEventInvocation])
-> WithTotalCount [ScheduledEventInvocation])
-> TxET QErr IO (SingleRow (Int, AltJ [ScheduledEventInvocation]))
-> TxE QErr (WithTotalCount [ScheduledEventInvocation])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (PGTxErr -> QErr)
-> Query
-> ()
-> Bool
-> TxET QErr IO (SingleRow (Int, AltJ [ScheduledEventInvocation]))
forall (m :: * -> *) a r e.
(MonadIO m, FromRes a, ToPrepArgs r) =>
(PGTxErr -> e) -> Query -> r -> Bool -> TxET e m a
Q.withQE PGTxErr -> QErr
defaultTxErrorHandler Query
sql () Bool
True
where
oneOffInvocationsTable :: QualifiedTable
oneOffInvocationsTable = SchemaName -> TableName -> QualifiedTable
forall a. SchemaName -> a -> QualifiedObject a
QualifiedObject SchemaName
"hdb_catalog" (TableName -> QualifiedTable) -> TableName -> QualifiedTable
forall a b. (a -> b) -> a -> b
$ Text -> TableName
TableName Text
"hdb_scheduled_event_invocation_logs"
cronInvocationsTable :: QualifiedTable
cronInvocationsTable = SchemaName -> TableName -> QualifiedTable
forall a. SchemaName -> a -> QualifiedObject a
QualifiedObject SchemaName
"hdb_catalog" (TableName -> QualifiedTable) -> TableName -> QualifiedTable
forall a b. (a -> b) -> a -> b
$ Text -> TableName
TableName Text
"hdb_cron_event_invocation_logs"
data EventTables = EventTables
{ EventTables -> QualifiedTable
etOneOffInvocationsTable :: QualifiedTable,
EventTables -> QualifiedTable
etCronInvocationsTable :: QualifiedTable,
EventTables -> QualifiedTable
etCronEventsTable :: QualifiedTable
}
getInvocationsQueryNoPagination :: EventTables -> GetInvocationsBy -> S.Select
(EventTables QualifiedTable
oneOffInvocationsTable QualifiedTable
cronInvocationsTable QualifiedTable
cronEventsTable') GetInvocationsBy
invocationsBy =
Select
allRowsSelect
where
createdAtOrderBy :: QualifiedTable -> OrderByExp
createdAtOrderBy QualifiedTable
table =
let createdAtCol :: SQLExp
createdAtCol = QIdentifier -> SQLExp
S.SEQIdentifier (QIdentifier -> SQLExp) -> QIdentifier -> SQLExp
forall a b. (a -> b) -> a -> b
$ QualifiedTable -> Identifier -> QIdentifier
forall a. IsIdentifier a => QualifiedTable -> a -> QIdentifier
S.mkQIdentifierTable QualifiedTable
table (Identifier -> QIdentifier) -> Identifier -> QIdentifier
forall a b. (a -> b) -> a -> b
$ Text -> Identifier
Identifier Text
"created_at"
in NonEmpty OrderByItem -> OrderByExp
S.OrderByExp (NonEmpty OrderByItem -> OrderByExp)
-> NonEmpty OrderByItem -> OrderByExp
forall a b. (a -> b) -> a -> b
$ (OrderByItem -> [OrderByItem] -> NonEmpty OrderByItem)
-> [OrderByItem] -> OrderByItem -> NonEmpty OrderByItem
forall a b c. (a -> b -> c) -> b -> a -> c
flip OrderByItem -> [OrderByItem] -> NonEmpty OrderByItem
forall a. a -> [a] -> NonEmpty a
(NE.:|) [] (OrderByItem -> NonEmpty OrderByItem)
-> OrderByItem -> NonEmpty OrderByItem
forall a b. (a -> b) -> a -> b
$ SQLExp -> Maybe OrderType -> Maybe NullsOrder -> OrderByItem
S.OrderByItem SQLExp
createdAtCol (OrderType -> Maybe OrderType
forall a. a -> Maybe a
Just OrderType
S.OTDesc) Maybe NullsOrder
forall a. Maybe a
Nothing
allRowsSelect :: Select
allRowsSelect = case GetInvocationsBy
invocationsBy of
GIBEventId CronEventId
eventId ScheduledEventType
eventType ->
let table :: QualifiedTable
table = case ScheduledEventType
eventType of
ScheduledEventType
OneOff -> QualifiedTable
oneOffInvocationsTable
ScheduledEventType
Cron -> QualifiedTable
cronInvocationsTable
in Select
S.mkSelect
{ selExtr :: [Extractor]
S.selExtr = QualifiedTable -> [Extractor]
invocationFieldExtractors QualifiedTable
table,
selFrom :: Maybe FromExp
S.selFrom = FromExp -> Maybe FromExp
forall a. a -> Maybe a
Just (FromExp -> Maybe FromExp) -> FromExp -> Maybe FromExp
forall a b. (a -> b) -> a -> b
$ QualifiedTable -> FromExp
S.mkSimpleFromExp QualifiedTable
table,
selOrderBy :: Maybe OrderByExp
S.selOrderBy = OrderByExp -> Maybe OrderByExp
forall a. a -> Maybe a
Just (OrderByExp -> Maybe OrderByExp) -> OrderByExp -> Maybe OrderByExp
forall a b. (a -> b) -> a -> b
$ QualifiedTable -> OrderByExp
createdAtOrderBy QualifiedTable
table,
selWhere :: Maybe WhereFrag
S.selWhere = WhereFrag -> Maybe WhereFrag
forall a. a -> Maybe a
Just (WhereFrag -> Maybe WhereFrag) -> WhereFrag -> Maybe WhereFrag
forall a b. (a -> b) -> a -> b
$ BoolExp -> WhereFrag
S.WhereFrag (BoolExp -> WhereFrag) -> BoolExp -> WhereFrag
forall a b. (a -> b) -> a -> b
$ QualifiedTable -> CronEventId -> BoolExp
mkEventIdBoolExp QualifiedTable
table CronEventId
eventId
}
GIBEvent ScheduledEvent
event -> case ScheduledEvent
event of
ScheduledEvent
SEOneOff ->
let table :: QualifiedTable
table = QualifiedTable
oneOffInvocationsTable
in Select
S.mkSelect
{ selExtr :: [Extractor]
S.selExtr = QualifiedTable -> [Extractor]
invocationFieldExtractors QualifiedTable
table,
selFrom :: Maybe FromExp
S.selFrom = FromExp -> Maybe FromExp
forall a. a -> Maybe a
Just (FromExp -> Maybe FromExp) -> FromExp -> Maybe FromExp
forall a b. (a -> b) -> a -> b
$ QualifiedTable -> FromExp
S.mkSimpleFromExp QualifiedTable
table,
selOrderBy :: Maybe OrderByExp
S.selOrderBy = OrderByExp -> Maybe OrderByExp
forall a. a -> Maybe a
Just (OrderByExp -> Maybe OrderByExp) -> OrderByExp -> Maybe OrderByExp
forall a b. (a -> b) -> a -> b
$ QualifiedTable -> OrderByExp
createdAtOrderBy QualifiedTable
table
}
SECron TriggerName
triggerName ->
let invocationTable :: QualifiedTable
invocationTable = QualifiedTable
cronInvocationsTable
eventTable :: QualifiedTable
eventTable = QualifiedTable
cronEventsTable'
joinCondition :: JoinCond
joinCondition =
BoolExp -> JoinCond
S.JoinOn (BoolExp -> JoinCond) -> BoolExp -> JoinCond
forall a b. (a -> b) -> a -> b
$
CompareOp -> SQLExp -> SQLExp -> BoolExp
S.BECompare
CompareOp
S.SEQ
(QIdentifier -> SQLExp
S.SEQIdentifier (QIdentifier -> SQLExp) -> QIdentifier -> SQLExp
forall a b. (a -> b) -> a -> b
$ QualifiedTable -> Identifier -> QIdentifier
forall a. IsIdentifier a => QualifiedTable -> a -> QIdentifier
S.mkQIdentifierTable QualifiedTable
eventTable (Identifier -> QIdentifier) -> Identifier -> QIdentifier
forall a b. (a -> b) -> a -> b
$ Text -> Identifier
Identifier Text
"id")
(QIdentifier -> SQLExp
S.SEQIdentifier (QIdentifier -> SQLExp) -> QIdentifier -> SQLExp
forall a b. (a -> b) -> a -> b
$ QualifiedTable -> Identifier -> QIdentifier
forall a. IsIdentifier a => QualifiedTable -> a -> QIdentifier
S.mkQIdentifierTable QualifiedTable
invocationTable (Identifier -> QIdentifier) -> Identifier -> QIdentifier
forall a b. (a -> b) -> a -> b
$ Text -> Identifier
Identifier Text
"event_id")
joinTables :: JoinExpr
joinTables =
FromItem -> JoinType -> FromItem -> JoinCond -> JoinExpr
S.JoinExpr
(QualifiedTable -> Maybe TableAlias -> FromItem
S.FISimple QualifiedTable
invocationTable Maybe TableAlias
forall a. Maybe a
Nothing)
JoinType
S.Inner
(QualifiedTable -> Maybe TableAlias -> FromItem
S.FISimple QualifiedTable
eventTable Maybe TableAlias
forall a. Maybe a
Nothing)
JoinCond
joinCondition
triggerBoolExp :: BoolExp
triggerBoolExp =
CompareOp -> SQLExp -> SQLExp -> BoolExp
S.BECompare
CompareOp
S.SEQ
(QIdentifier -> SQLExp
S.SEQIdentifier (QIdentifier -> SQLExp) -> QIdentifier -> SQLExp
forall a b. (a -> b) -> a -> b
$ QualifiedTable -> Identifier -> QIdentifier
forall a. IsIdentifier a => QualifiedTable -> a -> QIdentifier
S.mkQIdentifierTable QualifiedTable
eventTable (Text -> Identifier
Identifier Text
"trigger_name"))
(Text -> SQLExp
S.SELit (Text -> SQLExp) -> Text -> SQLExp
forall a b. (a -> b) -> a -> b
$ TriggerName -> Text
triggerNameToTxt TriggerName
triggerName)
in Select
S.mkSelect
{ selExtr :: [Extractor]
S.selExtr = QualifiedTable -> [Extractor]
invocationFieldExtractors QualifiedTable
invocationTable,
selFrom :: Maybe FromExp
S.selFrom = FromExp -> Maybe FromExp
forall a. a -> Maybe a
Just (FromExp -> Maybe FromExp) -> FromExp -> Maybe FromExp
forall a b. (a -> b) -> a -> b
$ [FromItem] -> FromExp
S.FromExp [JoinExpr -> FromItem
S.FIJoin JoinExpr
joinTables],
selWhere :: Maybe WhereFrag
S.selWhere = WhereFrag -> Maybe WhereFrag
forall a. a -> Maybe a
Just (WhereFrag -> Maybe WhereFrag) -> WhereFrag -> Maybe WhereFrag
forall a b. (a -> b) -> a -> b
$ BoolExp -> WhereFrag
S.WhereFrag BoolExp
triggerBoolExp,
selOrderBy :: Maybe OrderByExp
S.selOrderBy = OrderByExp -> Maybe OrderByExp
forall a. a -> Maybe a
Just (OrderByExp -> Maybe OrderByExp) -> OrderByExp -> Maybe OrderByExp
forall a b. (a -> b) -> a -> b
$ QualifiedTable -> OrderByExp
createdAtOrderBy QualifiedTable
invocationTable
}
getInvocationsQuery :: EventTables -> GetInvocationsBy -> ScheduledEventPagination -> S.Select
getInvocationsQuery :: EventTables
-> GetInvocationsBy -> ScheduledEventPagination -> Select
getInvocationsQuery EventTables
ets GetInvocationsBy
invocationsBy ScheduledEventPagination
pagination =
Select -> ScheduledEventPagination -> Select
mkPaginationSelectExp (EventTables -> GetInvocationsBy -> Select
getInvocationsQueryNoPagination EventTables
ets GetInvocationsBy
invocationsBy) ScheduledEventPagination
pagination