Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell2010 |
Synopsis
- class Backend b => BackendEventTrigger (b :: BackendType) where
- insertManualEvent :: (MonadIO m, MonadError QErr m) => SourceConfig b -> TableName b -> TriggerName -> Value -> UserInfo -> Maybe TraceContext -> m EventId
- fetchUndeliveredEvents :: (MonadIO m, MonadError QErr m) => SourceConfig b -> SourceName -> [TriggerName] -> MaintenanceMode () -> FetchBatchSize -> m [Event b]
- setRetry :: (MonadIO m, MonadError QErr m) => SourceConfig b -> Event b -> UTCTime -> MaintenanceMode MaintenanceModeVersion -> m ()
- getMaintenanceModeVersion :: (MonadIO m, MonadError QErr m) => SourceConfig b -> m MaintenanceModeVersion
- recordSuccess :: MonadIO m => SourceConfig b -> Event b -> Invocation 'EventType -> MaintenanceMode MaintenanceModeVersion -> m (Either QErr ())
- recordError :: MonadIO m => SourceConfig b -> Event b -> Invocation 'EventType -> ProcessEventError -> MaintenanceMode MaintenanceModeVersion -> m (Either QErr ())
- recordError' :: MonadIO m => SourceConfig b -> Event b -> Maybe (Invocation 'EventType) -> ProcessEventError -> MaintenanceMode MaintenanceModeVersion -> m (Either QErr ())
- dropTriggerAndArchiveEvents :: (MonadIO m, MonadError QErr m) => SourceConfig b -> TriggerName -> TableName b -> m ()
- dropDanglingSQLTrigger :: (MonadIO m, MonadError QErr m) => SourceConfig b -> TriggerName -> TableName b -> HashSet Ops -> m ()
- redeliverEvent :: (MonadIO m, MonadError QErr m) => SourceConfig b -> EventId -> m ()
- unlockEventsInSource :: MonadIO m => SourceConfig b -> NESet EventId -> m (Either QErr Int)
- createMissingSQLTriggers :: (MonadIO m, MonadError QErr m, MonadBaseControl IO m, Backend b) => SQLGenCtx -> SourceConfig b -> TableName b -> ([ColumnInfo b], Maybe (PrimaryKey b (ColumnInfo b))) -> TriggerName -> TriggerOnReplication -> TriggerOpsDef b -> m ()
- createTableEventTrigger :: (MonadBaseControl IO m, MonadIO m, MonadError QErr m) => SQLGenCtx -> SourceConfig b -> TableName b -> [ColumnInfo b] -> TriggerName -> TriggerOnReplication -> TriggerOpsDef b -> Maybe (PrimaryKey b (ColumnInfo b)) -> m (Either QErr ())
- checkIfTriggerExists :: (MonadIO m, MonadError QErr m) => SourceConfig b -> TriggerName -> HashSet Ops -> m Bool
- addCleanupSchedules :: (MonadIO m, MonadError QErr m) => SourceConfig b -> [(TriggerName, AutoTriggerLogCleanupConfig)] -> m ()
- deleteAllScheduledCleanups :: (MonadIO m, MonadError QErr m) => SourceConfig b -> TriggerName -> m ()
- getCleanupEventsForDeletion :: (MonadIO m, MonadError QErr m) => SourceConfig b -> m [(Text, TriggerName)]
- updateCleanupEventStatusToDead :: (MonadIO m, MonadError QErr m) => SourceConfig b -> [Text] -> m ()
- updateCleanupEventStatusToPaused :: (MonadIO m, MonadError QErr m) => SourceConfig b -> Text -> m ()
- updateCleanupEventStatusToCompleted :: (MonadIO m, MonadError QErr m) => SourceConfig b -> Text -> DeletedEventLogStats -> m ()
- deleteEventTriggerLogs :: (MonadIO m, MonadError QErr m) => SourceConfig b -> TriggerLogCleanupConfig -> IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)) -> m DeletedEventLogStats
- fetchEventLogs :: (MonadIO m, MonadError QErr m) => SourceConfig b -> GetEventLogs b -> m [EventLog]
- fetchEventInvocationLogs :: (MonadIO m, MonadError QErr m) => SourceConfig b -> GetEventInvocations b -> m [EventInvocationLog]
- fetchEventById :: (MonadIO m, MonadError QErr m) => SourceConfig b -> GetEventById b -> m EventLogWithInvocations
Documentation
class Backend b => BackendEventTrigger (b :: BackendType) where Source #
The BackendEventTrigger
type class contains functions which interacts
with the source database to perform event trigger related operations like
fetching pending events from the database or inserting a new invocation log
after processing an event.
insertManualEvent :: (MonadIO m, MonadError QErr m) => SourceConfig b -> TableName b -> TriggerName -> Value -> UserInfo -> Maybe TraceContext -> m EventId Source #
insertManualEvent inserts the specified event in the event log table, note that this method should also set the trace context and session variables in the source database context (if available).
fetchUndeliveredEvents Source #
:: (MonadIO m, MonadError QErr m) | |
=> SourceConfig b | |
-> SourceName | |
-> [TriggerName] | List of trigger names which exist in the metadata |
-> MaintenanceMode () | |
-> FetchBatchSize | |
-> m [Event b] |
fetchUndeliveredEvents
fetches the undelivered events from the source
and locks those events for processing. The locking is done so that when
there are multiple instances of graphql-engine connected to the same
source they don't end up processing the same events concurrently.
Also, it's crucial that the SQL query used to fetch events in this function uses something like Postgres's `FOR UPDATE SKIP LOCKED` mechanism so that it skips past the events which are locked by the database and pick newer undelivered events to achieve maximum throughput.
The locking mechanism for event triggers is timestamp based i.e. when an
event is fetched from the database, the locked
column will contain the
timestamp of when it was fetched from the database. Undelivered events
will have NULL
value as their locked
column value.
The idea behind having a timestamp based locking mechanism is that if the
graphql-engine is shutdown abruptly with events being fetched by the
events processor, it will be locked and after the shutdown it will remain
locked. Now with a timestamp based lock, when the graphql-engine is
started again it will also fetch events which have a locked
value of
older than 30 mins along with the undelivered events. So, this way no
events remain in a locked
state.
When fetching the events from the event_log table we also include the list of the triggers that exist in the metadata at that point of time, because we have seen in some cases there are events that do not belong to any of the event triggers present in the metadata and those are fetched only to be failed saying the said event trigger doesn't exist. So, to avoid this (atleast, as much as possible) we get only the events of the event triggers we have in the metadata.
setRetry :: (MonadIO m, MonadError QErr m) => SourceConfig b -> Event b -> UTCTime -> MaintenanceMode MaintenanceModeVersion -> m () Source #
Ad-hoc function to set a retry for an undelivered event
getMaintenanceModeVersion :: (MonadIO m, MonadError QErr m) => SourceConfig b -> m MaintenanceModeVersion Source #
getMaintenanceModeVersion
gets the source catalog version from the
source
recordSuccess :: MonadIO m => SourceConfig b -> Event b -> Invocation 'EventType -> MaintenanceMode MaintenanceModeVersion -> m (Either QErr ()) Source #
recordSuccess
records a successful event invocation, it does a couple
of things,
- Insert the invocation in the invocation logs table
- Mark the event as
delivered
in the event_log table
recordError :: MonadIO m => SourceConfig b -> Event b -> Invocation 'EventType -> ProcessEventError -> MaintenanceMode MaintenanceModeVersion -> m (Either QErr ()) Source #
recordError
records an erronous event invocation, it does a couple of
things,
- Insert the invocation in the invocation logs table
- Depending on the value of
ProcessEventError
, it will either, - Set a retry for the given event - Mark the event aserror
recordError' :: MonadIO m => SourceConfig b -> Event b -> Maybe (Invocation 'EventType) -> ProcessEventError -> MaintenanceMode MaintenanceModeVersion -> m (Either QErr ()) Source #
recordError'
records an erronous event invocation, it does a couple of
things,
- If present, insert the invocation in the invocation logs table
- Depending on the value of
ProcessEventError
, it will either, - Set a retry for the given event - Mark the event aserror
dropTriggerAndArchiveEvents :: (MonadIO m, MonadError QErr m) => SourceConfig b -> TriggerName -> TableName b -> m () Source #
dropTriggerAndArchiveEvents
drops the database trigger and
marks all the events related to the event trigger as archived.
See Note [Cleanup for dropped triggers]
dropDanglingSQLTrigger :: (MonadIO m, MonadError QErr m) => SourceConfig b -> TriggerName -> TableName b -> HashSet Ops -> m () Source #
dropDanglingSQLTriggger
is used to delete the extraneous SQL triggers
created by an event trigger. The extraneous SQL triggers can be created
when an event trigger's definition is replaced to a new definition. For
example, an event trigger authors_all
had an INSERT and UPDATE trigger
defined earlier and after it has UPDATE and DELETE triggers. So, in this
case, we need to drop the trigger created by us earlier for the INSERT
trigger.
redeliverEvent :: (MonadIO m, MonadError QErr m) => SourceConfig b -> EventId -> m () Source #
unlockEventsInSource :: MonadIO m => SourceConfig b -> NESet EventId -> m (Either QErr Int) Source #
unlockEventsInSource
unlocks the cached locked events which were
captured when a graceful shutdown is initiated, so that when the
graphql-engine restarts these events can be fetched to process them
immediately.
createMissingSQLTriggers :: (MonadIO m, MonadError QErr m, MonadBaseControl IO m, Backend b) => SQLGenCtx -> SourceConfig b -> TableName b -> ([ColumnInfo b], Maybe (PrimaryKey b (ColumnInfo b))) -> TriggerName -> TriggerOnReplication -> TriggerOpsDef b -> m () Source #
createMissingSQLTriggers
checks in the source whether all the triggers
exist according to the event trigger's specification. If any SQL trigger doesn't
exist then it will create it.
createTableEventTrigger :: (MonadBaseControl IO m, MonadIO m, MonadError QErr m) => SQLGenCtx -> SourceConfig b -> TableName b -> [ColumnInfo b] -> TriggerName -> TriggerOnReplication -> TriggerOpsDef b -> Maybe (PrimaryKey b (ColumnInfo b)) -> m (Either QErr ()) Source #
checkIfTriggerExists :: (MonadIO m, MonadError QErr m) => SourceConfig b -> TriggerName -> HashSet Ops -> m Bool Source #
addCleanupSchedules :: (MonadIO m, MonadError QErr m) => SourceConfig b -> [(TriggerName, AutoTriggerLogCleanupConfig)] -> m () Source #
addCleanupSchedules
adds cleanup logs for given trigger names and cleanup configs.
This will perform the following steps:
- Get last scheduled cleanup event and count.
- If count is less than 5, then add add more cleanup logs, else do nothing
deleteAllScheduledCleanups :: (MonadIO m, MonadError QErr m) => SourceConfig b -> TriggerName -> m () Source #
deleteAllScheduledCleanups
deletes all scheduled cleanup logs for a given event trigger
getCleanupEventsForDeletion :: (MonadIO m, MonadError QErr m) => SourceConfig b -> m [(Text, TriggerName)] Source #
getCleanupEventsForDeletion
returns the cleanup logs that are to be deleted.
This will perform the following steps:
- Get the scheduled cleanup events that were scheduled before current time.
- If there are multiple entries for the same trigger name with different scheduled time, then fetch the latest entry and mark others as dead.
updateCleanupEventStatusToDead :: (MonadIO m, MonadError QErr m) => SourceConfig b -> [Text] -> m () Source #
updateCleanupEventStatusToDead
updates the event trigger cleanup logs as dead
updateCleanupEventStatusToPaused :: (MonadIO m, MonadError QErr m) => SourceConfig b -> Text -> m () Source #
updateCleanupEventStatusToPaused
updates the cleanup log status to paused
if the event trigger configuration is paused.
updateCleanupEventStatusToCompleted :: (MonadIO m, MonadError QErr m) => SourceConfig b -> Text -> DeletedEventLogStats -> m () Source #
updateCleanupEventStatusToCompleted
updates the cleanup log status after the event logs are deleted.
This will perform the following steps:
- Updates the cleanup config status to
completed
. - Updates the number of event logs and event invocation logs that were deleted for a trigger name
deleteEventTriggerLogs :: (MonadIO m, MonadError QErr m) => SourceConfig b -> TriggerLogCleanupConfig -> IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)) -> m DeletedEventLogStats Source #
deleteEventTriggerLogs
deletes the event logs (and event invocation logs) based on the cleanup configuration given
This will perform the following steps:
- Select all the dead events based on criteria set in the cleanup config.
- Lock the events in the database so that other HGE instances don't pick them up for deletion.
- Based on the config, perform the delete action.
fetchEventLogs :: (MonadIO m, MonadError QErr m) => SourceConfig b -> GetEventLogs b -> m [EventLog] Source #
@fetchEventLogs fetches event logs from the source for a given event trigger.
fetchEventInvocationLogs :: (MonadIO m, MonadError QErr m) => SourceConfig b -> GetEventInvocations b -> m [EventInvocationLog] Source #
@fetchEventInvocationLogs fetches invocation logs from the source for a given event trigger.
fetchEventById :: (MonadIO m, MonadError QErr m) => SourceConfig b -> GetEventById b -> m EventLogWithInvocations Source #
@fetchEventById fetches the event and it's invocation logs from the source for a given EventId.