graphql-engine-1.0.0: GraphQL API over Postgres
Safe Haskell: None
Language: Haskell2010

Hasura.Eventing.ScheduledTrigger

Description

Scheduled Triggers

This module implements the functionality of invoking webhooks at specified times, aka scheduled events. A scheduled event is either generated by the graphql-engine from a cron trigger, or created by the user as a one-off event at a specified time along with its payload, webhook, headers and retry configuration. Scheduled events are modeled as rows in Postgres with a timestamp column.

This module implements scheduling and delivery of scheduled events:

  1. Scheduling a cron event involves creating new cron events. New cron events are created based on the cron schedule and the number of scheduled events already present in the scheduled events buffer. The graphql-engine computes the new scheduled events and writes them to the database. (Generator)
  2. Delivering a scheduled event involves reading undelivered scheduled events from the database and delivering them to the webhook server. (Processor)

The rationale behind separating event scheduling and event delivery into two different threads is that they are not directly dependent on each other. The generator almost always creates scheduled events that are to be delivered in the future (timestamp > current_timestamp), while the processor fetches scheduled events from the past (timestamp < current_timestamp). So the sets of scheduled events handled by the generator and the processor never overlap; since the two are uncorrelated, they can be split into different threads for better performance.
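The split described above can be sketched as two independent loops forked at startup. This is a minimal sketch, assuming hypothetical `generateCronEvents` and `deliverDueEvents` actions standing in for the real generator and processor; the actual module threads logging, the schema cache and metadata storage through these actions:

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (forever, void)

-- Minimal sketch: two loops that never coordinate with each other.
startScheduledEventThreads :: IO ()
startScheduledEventThreads = do
  -- Generator: inserts events to be delivered in the future.
  void $ forkIO $ forever $ do
    generateCronEvents
    threadDelay (60 * 1000000) -- wake up once a minute
  -- Processor: delivers events whose timestamp is in the past.
  void $ forkIO $ forever $ do
    deliverDueEvents
    threadDelay (10 * 1000000)
  where
    generateCronEvents, deliverDueEvents :: IO ()
    generateCronEvents = pure () -- placeholder for the real generator
    deliverDueEvents   = pure () -- placeholder for the real processor
```

Because the two loops operate on disjoint sets of rows, neither needs to wait on the other; a slow webhook only delays the processor loop.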

Implementation

Scheduled trigger eventing is implemented on top of the metadata storage. All functions that interact with the storage system are abstracted behind the MonadMetadataStorage class.
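As a rough sketch of what that abstraction looks like, assuming illustrative method and type names (not the exact class):

```haskell
import Data.Time.Clock (UTCTime)

-- Placeholder types so the sketch stands alone.
newtype EventId = EventId Int
data EventStatus = Scheduled | Delivered | Errored
data CronEvent = CronEvent EventId UTCTime

-- The eventing code talks only to a class like this, never to the
-- storage backend directly, so the backend can be swapped out.
class Monad m => MonadMetadataStorage m where
  insertCronEvents   :: [CronEvent] -> m ()
  fetchDueCronEvents :: m [CronEvent]
  setCronEventStatus :: EventId -> EventStatus -> m ()
```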

During the startup, two threads are started:

  1. Generator: Fetches the list of scheduled triggers from the cache and generates the scheduled events.

    • Additional events will be generated only if there are fewer than 100 scheduled events.
    • The timestamps of the upcoming events are generated using:

      • the cron schedule of the scheduled trigger
      • the max timestamp of the scheduled events that already exist, or current_timestamp (when no scheduled events exist)
      • The timestamps of the scheduled events are stored with timezone, because `SELECT NOW()` returns a timestamp with timezone, so it's best to compare values of the same type.

    This effectively corresponds to doing an INSERT with values containing specific timestamps.

  2. Processor: Fetches from the database the undelivered cron events and scheduled events whose timestamps are earlier than the current timestamp, and processes them.
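The processor's selection criterion can be illustrated in isolation (a sketch with an assumed record type, not the actual storage query):

```haskell
import Data.Time.Clock (UTCTime)

data ScheduledEvent = ScheduledEvent
  { seScheduledTime :: UTCTime -- when the event should fire
  , seDelivered     :: Bool    -- has it already been delivered?
  }

-- Sketch of the processor's filter: undelivered events whose
-- timestamp is at or before the current time.
dueEvents :: UTCTime -> [ScheduledEvent] -> [ScheduledEvent]
dueEvents now = filter (\e -> not (seDelivered e) && seScheduledTime e <= now)
```

In the real system this predicate lives in the database query, so only due rows are fetched.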

TODO

  • Consider and document ordering guarantees: do we have any in the presence of multiple hasura instances?
  • If we have nothing useful to say about ordering, then consider processing events asynchronously, so that a slow webhook doesn't cause everything subsequent to be delayed.

Synopsis

Documentation

runCronEventsGenerator :: (MonadIO m, MonadMetadataStorage (MetadataStorageT m)) => Logger Hasura -> IO SchemaCache -> m void Source #

runCronEventsGenerator makes sure that all the cron triggers have an adequate buffer of cron events.

generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime] Source #

Generates the next n events, starting from the given UTCTime, according to the CronSchedule
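One plausible implementation in terms of the `cron` package's `nextMatch` (a sketch; the module's actual definition may differ):

```haskell
import Data.List (unfoldr)
import Data.Time.Clock (UTCTime)
import System.Cron (CronSchedule, nextMatch)

-- Repeatedly ask the cron library for the next matching timestamp,
-- feeding each result back in as the new starting point, and keep
-- the first n results.
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes from n cron =
  take n $ unfoldr (\t -> (\t' -> (t', t')) <$> nextMatch cron t) from
```

A CronSchedule value can be obtained with `parseCronSchedule` from the same package, e.g. `parseCronSchedule "0 * * * *"` for an hourly schedule.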

getDeprivedCronTriggerStatsTx :: [TriggerName] -> TxE QErr [CronTriggerStats] Source #

Get cron trigger stats for cron jobs with fewer than 100 future reified events in the database

The point here is to maintain a certain number of future events so the user can kind of see what's coming up, and obviously to give processCronEvents something to do.
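One way to express the shape of such a query (a sketch only; the table and column names such as hdb_catalog.hdb_cron_events are assumptions about the catalog, and the real transaction may differ):

```haskell
-- Sketch of the stats query shape. It starts from the list of trigger
-- names (via unnest) so that triggers with zero upcoming events are
-- also reported as deprived, which a plain GROUP BY would miss.
deprivedCronTriggerStatsSql :: String
deprivedCronTriggerStatsSql = unlines
  [ "SELECT t.name, coalesce(e.upcoming, 0) AS upcoming_count, e.max_ts"
  , "FROM unnest($1::text[]) AS t(name)"
  , "LEFT JOIN ("
  , "  SELECT trigger_name, count(*) AS upcoming, max(scheduled_time) AS max_ts"
  , "  FROM hdb_catalog.hdb_cron_events"
  , "  WHERE status = 'scheduled'"
  , "  GROUP BY trigger_name"
  , ") e ON e.trigger_name = t.name"
  , "WHERE coalesce(e.upcoming, 0) < 100"
  ]
```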

mkPaginationSelectExp :: Select -> ScheduledEventPagination -> Select Source #

Build a select expression which outputs the total count and a list of JSON rows, with the pagination limit and offset applied
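To illustrate the intent, the generated SQL plausibly has the following shape (a sketch only; the real function manipulates a SQL AST, and the exact query may differ). In Postgres a window aggregate like `count(*) OVER ()` is computed before LIMIT/OFFSET are applied, so every returned row carries the total count alongside its JSON payload, which is the (count, row) pairing that withCount then unpacks:

```haskell
-- Illustrative only: the shape of a paginated select that also
-- returns the total (unpaginated) row count on every row.
paginatedSql :: String -> Int -> Int -> String
paginatedSql baseSelect limit offset = unlines
  [ "SELECT count(*) OVER () AS total_count, row_to_json(t.*) AS event"
  , "FROM (" ++ baseSelect ++ ") AS t"
  , "LIMIT " ++ show limit ++ " OFFSET " ++ show offset
  ]
```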

withCount :: (Int, AltJ a) -> WithTotalCount a Source #