Safe Haskell | None |
---|---|
Language | Haskell2010 |
Event Triggers
Event triggers are like ordinary SQL triggers, except that instead of calling a SQL procedure, they call a webhook. The event delivery mechanism involves coordination between the database and graphql-engine: only the SQL database knows when the events should fire, but only graphql-engine knows how to actually deliver them.
Therefore, event triggers are implemented in two parts:
- Every event trigger is backed by a bona fide SQL trigger. When the SQL trigger fires, it creates a new record in the hdb_catalog.event_log table.
- Concurrently, a thread in graphql-engine monitors the hdb_catalog.event_log table for new events. When new event(s) are found, it uses the information (URL, payload, and headers) stored in the event to deliver the event to the webhook.
The creation and deletion of the SQL trigger itself is managed by the metadata DDL APIs (see Hasura.RQL.DDL.EventTrigger), so this module focuses on event delivery.
Most of the subtleties involve guaranteeing reliable delivery of events: we guarantee that every event will be delivered at least once, even if graphql-engine crashes. This means we have to record the state of each event in the database, and we have to retry failed requests at a regular (user-configurable) interval.
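The retry bookkeeping described above can be illustrated with a small, self-contained sketch. It is not this module's implementation: the DeliveryInfo record here is a local stand-in that only mirrors the field names listed in the synopsis, while Outcome, nextOutcome, and the interval parameter are hypothetical names introduced for illustration. The module's own handling (see processError and retryOrSetError) may take more inputs into account.

```haskell
-- Local stand-in mirroring the field names shown in the synopsis.
data DeliveryInfo = DeliveryInfo
  { diCurrentRetry :: Int -- ^ retries already attempted
  , diMaxRetries   :: Int -- ^ retries allowed by configuration
  } deriving (Show, Eq)

-- Hypothetical result type: what to record for the event after an attempt.
data Outcome
  = Delivered      -- ^ webhook accepted the event
  | RetryAfter Int -- ^ try again after this many seconds
  | GiveUp         -- ^ retries exhausted; mark the event as errored
  deriving (Show, Eq)

-- | Decide the next state of an event after one delivery attempt.
-- Assumption: a fixed retry interval in seconds.
nextOutcome :: Int -> Bool -> DeliveryInfo -> Outcome
nextOutcome intervalSeconds succeeded info
  | succeeded                               = Delivered
  | diCurrentRetry info < diMaxRetries info = RetryAfter intervalSeconds
  | otherwise                               = GiveUp
```

Because the outcome is written back to the database after every attempt, a restarted process can pick up where the previous one stopped, which is what makes at-least-once delivery possible even across crashes.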
Synopsis
- newtype EventInternalErr = EventInternalErr QErr
- data EventEngineCtx = EventEngineCtx {}
- data DeliveryInfo = DeliveryInfo {
  - diCurrentRetry :: Int
  - diMaxRetries :: Int
  }
- newtype QualifiedTableStrict = QualifiedTableStrict {}
- data EventPayload (b :: BackendType) = EventPayload {
  - epId :: EventId
  - epTable :: TableName b
  - epTrigger :: TriggerMetadata
  - epEvent :: Value
  - epDeliveryInfo :: DeliveryInfo
  - epCreatedAt :: UTCTime
  }
- defaultMaxEventThreads :: PositiveInt
- defaultFetchInterval :: DiffTime
- initEventEngineCtx :: Int -> DiffTime -> NonNegativeInt -> STM EventEngineCtx
- saveLockedEventTriggerEvents :: MonadIO m => SourceName -> [EventId] -> TVar (HashMap SourceName (Set EventId)) -> m ()
- removeEventTriggerEventFromLockedEvents :: MonadIO m => SourceName -> EventId -> TVar (HashMap SourceName (Set EventId)) -> m ()
- type BackendEventWithSource = AnyBackend EventWithSource
- type FetchEventArguments = ([BackendEventWithSource], Int, Bool)
- processEventQueue :: forall m. (MonadIO m, HasReporter m, MonadBaseControl IO m, Forall (Pure m), MonadMask m) => Logger Hasura -> Manager -> IO SchemaCache -> EventEngineCtx -> LockedEventsCtx -> ServerMetrics -> EventTriggerMetrics -> MaintenanceMode () -> m (Forever m)
- createEventPayload :: RetryConf -> Event b -> EventPayload b
- processSuccess :: forall b m a. (MonadIO m, BackendEventTrigger b) => SourceConfig b -> Event b -> [HeaderConf] -> Value -> MaintenanceMode MaintenanceModeVersion -> HTTPResp a -> m (Either QErr ())
- processError :: forall b m a. (MonadIO m, BackendEventTrigger b) => SourceConfig b -> Event b -> RetryConf -> [HeaderConf] -> Value -> MaintenanceMode MaintenanceModeVersion -> HTTPErr a -> m (Either QErr ())
- retryOrSetError :: MonadIO m => Event b -> RetryConf -> HTTPErr a -> m ProcessEventError
- mkInvocation :: EventId -> Value -> Maybe Int -> [HeaderConf] -> SerializableBlob -> [HeaderConf] -> Invocation 'EventType
- logQErr :: (MonadReader r m, Has (Logger Hasura) r, MonadIO m) => QErr -> m ()
- getEventTriggerInfoFromEvent :: forall b. Backend b => SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
Documentation
newtype EventInternalErr
data EventEngineCtx
See Note [Maintenance Mode]
data DeliveryInfo
data EventPayload (b :: BackendType)
initEventEngineCtx :: Int -> DiffTime -> NonNegativeInt -> STM EventEngineCtx
saveLockedEventTriggerEvents :: MonadIO m => SourceName -> [EventId] -> TVar (HashMap SourceName (Set EventId)) -> m ()
removeEventTriggerEventFromLockedEvents :: MonadIO m => SourceName -> EventId -> TVar (HashMap SourceName (Set EventId)) -> m ()
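Judging only by their signatures, the two functions above maintain a shared, per-source set of event IDs behind a TVar. The following is a minimal sketch of that pattern, not the module's code. It assumes String stand-ins for SourceName and EventId, uses Data.Map in place of HashMap so that it needs only libraries bundled with GHC, and the names saveLocked, removeLocked, and demo are hypothetical.

```haskell
import Control.Concurrent.STM (TVar, atomically, modifyTVar', newTVarIO, readTVarIO)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set

-- Assumed stand-ins for the real types.
type SourceName = String
type EventId = String
type LockedEvents = TVar (Map.Map SourceName (Set.Set EventId))

-- | Record a batch of event IDs as locked for the given source.
saveLocked :: SourceName -> [EventId] -> LockedEvents -> IO ()
saveLocked source eventIds var =
  atomically $ modifyTVar' var $
    Map.insertWith Set.union source (Set.fromList eventIds)

-- | Drop a single event ID from the locked set of the given source.
-- A source with no entry is left untouched.
removeLocked :: SourceName -> EventId -> LockedEvents -> IO ()
removeLocked source eventId var =
  atomically $ modifyTVar' var $
    Map.adjust (Set.delete eventId) source

-- | Small usage example: lock two events, then release one of them.
demo :: IO (Map.Map SourceName (Set.Set EventId))
demo = do
  var <- newTVarIO Map.empty
  saveLocked "default" ["e1", "e2"] var
  removeLocked "default" "e1" var
  readTVarIO var
```

Each update runs in a single STM transaction, so concurrent workers that add and remove IDs cannot observe a half-applied change.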
type FetchEventArguments = ([BackendEventWithSource], Int, Bool)
processEventQueue :: forall m. (MonadIO m, HasReporter m, MonadBaseControl IO m, Forall (Pure m), MonadMask m) => Logger Hasura -> Manager -> IO SchemaCache -> EventEngineCtx -> LockedEventsCtx -> ServerMetrics -> EventTriggerMetrics -> MaintenanceMode () -> m (Forever m)
Service events from our in-DB queue.
There are a few competing concerns and constraints here; we want to:
- fetch events in batches for lower DB pressure
- not fetch more than N at a time (since that can mean: space leak, less effective scale out, possible double sends for events we've checked out on exit (TODO clean shutdown procedure))
- try not to cause webhook workers to stall waiting on DB fetch
- limit webhook HTTP concurrency per HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE
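Two of these constraints, bounded batch size and bounded delivery concurrency, can be sketched with nothing but base. This is an illustration under stated assumptions rather than how processEventQueue is written: batchesOf, deliverBounded, and processAll are hypothetical names, the queue is an in-memory list instead of a database table, and the sketch waits for a whole batch before starting the next one, so it does not address the concern about workers stalling on a fetch.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (SomeException, finally, try)
import Control.Monad (forM)

-- | Split a list into batches of at most n items.
batchesOf :: Int -> [a] -> [[a]]
batchesOf n xs
  | n <= 0    = error "batchesOf: batch size must be positive"
  | null xs   = []
  | otherwise = let (batch, rest) = splitAt n xs in batch : batchesOf n rest

-- | Run the delivery action on every item with at most maxConcurrent
-- deliveries in flight, and wait for all of them to finish.
-- Assumes maxConcurrent >= 1. Exceptions are captured per item.
deliverBounded :: Int -> (a -> IO b) -> [a] -> IO [Either SomeException b]
deliverBounded maxConcurrent deliver items = do
  slots <- newQSem maxConcurrent
  pending <- forM items $ \item -> do
    result <- newEmptyMVar
    waitQSem slots -- blocks the dispatcher until a slot is free
    _ <- forkIO $
      (try (deliver item) >>= putMVar result) `finally` signalQSem slots
    pure result
  mapM takeMVar pending

-- | Process a queue batch by batch, in order.
processAll :: Int -> Int -> (a -> IO b) -> [a] -> IO [Either SomeException b]
processAll batchSize maxConcurrent deliver queue =
  concat <$> mapM (deliverBounded maxConcurrent deliver) (batchesOf batchSize queue)
```

Keeping the batch size and the concurrency limit as separate parameters reflects the list above: the first caps how many events are checked out at once, the second caps how many webhook requests run at the same time.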
createEventPayload :: RetryConf -> Event b -> EventPayload b
processSuccess :: forall b m a. (MonadIO m, BackendEventTrigger b) => SourceConfig b -> Event b -> [HeaderConf] -> Value -> MaintenanceMode MaintenanceModeVersion -> HTTPResp a -> m (Either QErr ())
processError :: forall b m a. (MonadIO m, BackendEventTrigger b) => SourceConfig b -> Event b -> RetryConf -> [HeaderConf] -> Value -> MaintenanceMode MaintenanceModeVersion -> HTTPErr a -> m (Either QErr ())
retryOrSetError :: MonadIO m => Event b -> RetryConf -> HTTPErr a -> m ProcessEventError
mkInvocation :: EventId -> Value -> Maybe Int -> [HeaderConf] -> SerializableBlob -> [HeaderConf] -> Invocation 'EventType
getEventTriggerInfoFromEvent :: forall b. Backend b => SchemaCache -> Event b -> Either Text (EventTriggerInfo b)