Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell2010 |
Scheduled Triggers
This module implements the functionality of invoking webhooks during specified
time events aka scheduled events. The scheduled events are the events generated
by the graphql-engine using the cron triggers or/and a scheduled event can
be created by the user at a specified time with the payload, webhook, headers
and the retry configuration. Scheduled events are modeled using rows in Postgres
with a timestamp
column.
This module implements scheduling and delivery of scheduled events:
- Scheduling a cron event involves creating new cron events. New cron events are created based on the cron schedule and the number of scheduled events that are already present in the scheduled events buffer. The graphql-engine computes the new scheduled events and writes them to the database.(Generator)
- Delivering a scheduled event involves reading undelivered scheduled events from the database and delivering them to the webhook server. (Processor)
The rationale behind separating the event scheduling and event delivery mechanism into two different threads is that the scheduling and delivering of the scheduled events are not directly dependent on each other. The generator will almost always try to create scheduled events which are supposed to be delivered in the future (timestamp > current_timestamp) and the processor will fetch scheduled events of the past (timestamp < current_timestamp). So, the set of the scheduled events generated by the generator and the processor will never be the same. The point here is that they're not correlated to each other. They can be split into different threads for a better performance.
Implementation
The scheduled triggers eventing is being implemented in the metadata storage.
All functions that make interaction to storage system are abstracted in
the @MonadMetadataStorage
class.
During the startup, two threads are started:
Generator: Fetches the list of scheduled triggers from cache and generates the scheduled events.
- Additional events will be generated only if there are fewer than 100 scheduled events.
The upcoming events timestamp will be generated using:
- cron schedule of the scheduled trigger
- max timestamp of the scheduled events that already exist or current_timestamp(when no scheduled events exist)
- The timestamp of the scheduled events is stored with timezone because `SELECT NOW()` returns timestamp with timezone, so it's good to compare two things of the same type.
This effectively corresponds to doing an INSERT with values containing specific timestamp.
- Processor: Fetches the undelivered cron events and the scheduled events from the database and which have timestamp lesser than the current timestamp and then process them.
TODO - Consider and document ordering guarantees - do we have any in the presence of multiple hasura instances? - If we have nothing useful to say about ordering, then consider processing events asynchronously, so that a slow webhook doesn't cause everything subsequent to be delayed
Synopsis
- runCronEventsGenerator :: (MonadIO m, MonadMetadataStorage m) => Logger Hasura -> FetchedCronTriggerStatsLogger -> IO SchemaCache -> m void
- processScheduledTriggers :: (MonadIO m, MonadTrace m, MonadMetadataStorage m, MonadBaseControl IO m) => IO Environment -> Logger Hasura -> FetchedScheduledEventsStatsLogger -> Manager -> ScheduledTriggerMetrics -> IO SchemaCache -> LockedEventsCtx -> m (Forever m)
- getDeprivedCronTriggerStatsTx :: [TriggerName] -> TxE QErr [CronTriggerStats]
- getScheduledEventsForDeliveryTx :: [TriggerName] -> TxE QErr ([CronEvent], [OneOffScheduledEvent])
- insertInvocationTx :: Invocation 'ScheduledType -> ScheduledEventType -> TxE QErr ()
- setScheduledEventOpTx :: ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> TxE QErr ()
- unlockScheduledEventsTx :: ScheduledEventType -> [ScheduledEventId] -> TxE QErr Int
- unlockAllLockedScheduledEventsTx :: TxE QErr ()
- insertCronEventsTx :: [CronEventSeed] -> TxE QErr ()
- insertOneOffScheduledEventTx :: OneOffEvent -> TxE QErr EventId
- dropFutureCronEventsTx :: ClearCronEvents -> TxE QErr ()
- mkScheduledEventStatusFilter :: [ScheduledEventStatus] -> BoolExp
- scheduledTimeOrderBy :: OrderByExp
- mkPaginationSelectExp :: Select -> ScheduledEventPagination -> RowsCountOption -> Select
- withCount :: (Int, ViaJSON a) -> WithOptionalTotalCount a
- executeWithOptionalTotalCount :: FromJSON a => Query -> RowsCountOption -> TxE QErr (WithOptionalTotalCount a)
- getOneOffScheduledEventsTx :: ScheduledEventPagination -> [ScheduledEventStatus] -> RowsCountOption -> TxE QErr (WithOptionalTotalCount [OneOffScheduledEvent])
- getCronEventsTx :: TriggerName -> ScheduledEventPagination -> [ScheduledEventStatus] -> RowsCountOption -> TxE QErr (WithOptionalTotalCount [CronEvent])
- deleteScheduledEventTx :: ScheduledEventId -> ScheduledEventType -> TxE QErr ()
- invocationFieldExtractors :: QualifiedTable -> [Extractor]
- mkEventIdBoolExp :: QualifiedTable -> EventId -> BoolExp
- getScheduledEventInvocationsTx :: GetScheduledEventInvocations -> TxE QErr (WithOptionalTotalCount [ScheduledEventInvocation])
- data EventTables = EventTables {}
- getScheduledEventsInvocationsQueryNoPagination :: EventTables -> GetScheduledEventInvocationsBy -> Select
- getScheduledEventsInvocationsQuery :: EventTables -> GetScheduledEventInvocations -> Select
- createFetchedScheduledEventsStatsLogger :: MonadIO m => Logger Hasura -> m FetchedScheduledEventsStatsLogger
- closeFetchedScheduledEventsStatsLogger :: MonadIO m => Logger Hasura -> FetchedScheduledEventsStatsLogger -> m ()
- createFetchedCronTriggerStatsLogger :: MonadIO m => Logger Hasura -> m FetchedCronTriggerStatsLogger
- closeFetchedCronTriggersStatsLogger :: MonadIO m => Logger Hasura -> FetchedCronTriggerStatsLogger -> m ()
Documentation
runCronEventsGenerator :: (MonadIO m, MonadMetadataStorage m) => Logger Hasura -> FetchedCronTriggerStatsLogger -> IO SchemaCache -> m void Source #
runCronEventsGenerator makes sure that all the cron triggers have an adequate buffer of cron events.
processScheduledTriggers :: (MonadIO m, MonadTrace m, MonadMetadataStorage m, MonadBaseControl IO m) => IO Environment -> Logger Hasura -> FetchedScheduledEventsStatsLogger -> Manager -> ScheduledTriggerMetrics -> IO SchemaCache -> LockedEventsCtx -> m (Forever m) Source #
getDeprivedCronTriggerStatsTx :: [TriggerName] -> TxE QErr [CronTriggerStats] Source #
Get cron trigger stats for cron jobs with fewer than 100 future reified events in the database
The point here is to maintain a certain number of future events so the user
can kind of see what's coming up, and obviously to give processCronEvents
something to do.
getScheduledEventsForDeliveryTx :: [TriggerName] -> TxE QErr ([CronEvent], [OneOffScheduledEvent]) Source #
insertInvocationTx :: Invocation 'ScheduledType -> ScheduledEventType -> TxE QErr () Source #
setScheduledEventOpTx :: ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> TxE QErr () Source #
insertCronEventsTx :: [CronEventSeed] -> TxE QErr () Source #
dropFutureCronEventsTx :: ClearCronEvents -> TxE QErr () Source #
mkPaginationSelectExp :: Select -> ScheduledEventPagination -> RowsCountOption -> Select Source #
Build a select expression which outputs total count and list of json rows with pagination limit and offset applied
executeWithOptionalTotalCount :: FromJSON a => Query -> RowsCountOption -> TxE QErr (WithOptionalTotalCount a) Source #
getOneOffScheduledEventsTx :: ScheduledEventPagination -> [ScheduledEventStatus] -> RowsCountOption -> TxE QErr (WithOptionalTotalCount [OneOffScheduledEvent]) Source #
getCronEventsTx :: TriggerName -> ScheduledEventPagination -> [ScheduledEventStatus] -> RowsCountOption -> TxE QErr (WithOptionalTotalCount [CronEvent]) Source #
deleteScheduledEventTx :: ScheduledEventId -> ScheduledEventType -> TxE QErr () Source #
mkEventIdBoolExp :: QualifiedTable -> EventId -> BoolExp Source #
getScheduledEventInvocationsTx :: GetScheduledEventInvocations -> TxE QErr (WithOptionalTotalCount [ScheduledEventInvocation]) Source #
data EventTables Source #
getScheduledEventsInvocationsQueryNoPagination :: EventTables -> GetScheduledEventInvocationsBy -> Select Source #
getScheduledEventsInvocationsQuery :: EventTables -> GetScheduledEventInvocations -> Select Source #
createFetchedScheduledEventsStatsLogger :: MonadIO m => Logger Hasura -> m FetchedScheduledEventsStatsLogger Source #
Logger to accumulate stats of fetched scheduled events over a period of time and log once using 'L.Logger L.Hasura'.
See
createStatsLogger
for more details.
closeFetchedScheduledEventsStatsLogger :: MonadIO m => Logger Hasura -> FetchedScheduledEventsStatsLogger -> m () Source #
Close the fetched scheduled events stats logger.
createFetchedCronTriggerStatsLogger :: MonadIO m => Logger Hasura -> m FetchedCronTriggerStatsLogger Source #
Logger to accumulate stats of fetched cron triggers, for generating cron events, over a period of time and
log once using 'L.Logger L.Hasura'.
See
createStatsLogger
for more details.
closeFetchedCronTriggersStatsLogger :: MonadIO m => Logger Hasura -> FetchedCronTriggerStatsLogger -> m () Source #
Close the fetched cron trigger stats logger.