{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE TemplateHaskell #-}

-- |
-- = Event Triggers
--
-- Event triggers are like ordinary SQL triggers, except instead of calling a SQL
-- procedure, they call a webhook. The event delivery mechanism involves coordination
-- between both the database and graphql-engine: only the SQL database knows
-- when the events should fire, but only graphql-engine know how to actually
-- deliver them.
--
-- Therefore, event triggers are implemented in two parts:
--
-- 1. Every event trigger is backed by a bona fide SQL trigger. When the SQL trigger
--    fires, it creates a new record in the hdb_catalog.event_log table.
--
-- 2. Concurrently, a thread in graphql-engine monitors the hdb_catalog.event_log
--    table for new events. When new event(s) are found, it uses the information
--    (URL,payload and headers) stored in the event to deliver the event
--    to the webhook.
--
-- The creation and deletion of SQL trigger itself is managed by the metadata DDL
-- APIs (see Hasura.RQL.DDL.EventTrigger), so this module focuses on event delivery.
--
-- Most of the subtleties involve guaranteeing reliable delivery of events:
-- we guarantee that every event will be delivered at least once,
-- even if graphql-engine crashes. This means we have to record the state
-- of each event in the database, and we have to retry
-- failed requests at a regular (user-configurable) interval.
module Hasura.Eventing.EventTrigger
  ( initEventEngineCtx,
    processEventQueue,
    defaultMaxEventThreads,
    defaultFetchInterval,
    Event (..),
    EventEngineCtx (..),
    -- Exported for testing
    saveLockedEventTriggerEvents,
    removeEventTriggerEventFromLockedEvents,
  )
where

import Control.Concurrent.Async.Lifted.Safe qualified as LA
import Control.Concurrent.Extended (Forever (..), sleep)
import Control.Concurrent.STM.TVar
import Control.Lens
import Control.Monad.Catch (MonadMask, bracket_, finally, mask_)
import Control.Monad.STM
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson qualified as J
import Data.Aeson.TH
import Data.Has
import Data.HashMap.Strict qualified as M
import Data.SerializableBlob qualified as SB
import Data.Set qualified as Set
import Data.String
import Data.Text qualified as T
import Data.Text.Extended
import Data.Text.NonEmpty
import Data.Time.Clock
import Data.Time.Clock qualified as Time
import Hasura.Backends.Postgres.SQL.Types hiding (TableName)
import Hasura.Base.Error
import Hasura.Eventing.Common
import Hasura.Eventing.HTTP
import Hasura.HTTP (getHTTPExceptionStatus)
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.DDL.Headers
import Hasura.RQL.DDL.Webhook.Transform
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.Eventing.Backend
import Hasura.RQL.Types.Numeric (NonNegativeInt)
import Hasura.RQL.Types.Numeric qualified as Numeric
import Hasura.RQL.Types.SchemaCache
import Hasura.RQL.Types.Source
import Hasura.SQL.AnyBackend qualified as AB
import Hasura.SQL.Backend
import Hasura.Server.Metrics (ServerMetrics (..))
import Hasura.Server.Prometheus (EventTriggerMetrics (..))
import Hasura.Server.Types
import Hasura.Tracing qualified as Tracing
import Network.HTTP.Client.Transformable qualified as HTTP
import System.Metrics.Distribution qualified as EKG.Distribution
import System.Metrics.Gauge qualified as EKG.Gauge
import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
import System.Metrics.Prometheus.Histogram qualified as Prometheus.Histogram

newtype EventInternalErr
  = EventInternalErr QErr
  deriving (EventInternalErr -> EventInternalErr -> Bool
(EventInternalErr -> EventInternalErr -> Bool)
-> (EventInternalErr -> EventInternalErr -> Bool)
-> Eq EventInternalErr
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: EventInternalErr -> EventInternalErr -> Bool
$c/= :: EventInternalErr -> EventInternalErr -> Bool
== :: EventInternalErr -> EventInternalErr -> Bool
$c== :: EventInternalErr -> EventInternalErr -> Bool
Eq)

instance L.ToEngineLog EventInternalErr L.Hasura where
  toEngineLog :: EventInternalErr -> (LogLevel, EngineLogType Hasura, Value)
toEngineLog (EventInternalErr QErr
qerr) = (LogLevel
L.LevelError, EngineLogType Hasura
L.eventTriggerLogType, QErr -> Value
forall a. ToJSON a => a -> Value
J.toJSON QErr
qerr)

{- Note [Maintenance mode]
~~~~~~~~~~~~~~~~~~~~~~~~~~

Maintenance mode is a mode in which users can upgrade their graphql-engine
without any down time. More on maintenance mode can be found here:
https://github.com/hasura/graphql-engine-mono/issues/431.

Basically, there are a few main things that maintenance mode boils down to:

1. No operation that may change the metadata will be allowed.
2. Migrations are not applied when the graphql-engine is started, so the
   catalog schema will be in the older version.
3. Event triggers should continue working in the new code with the older
   catalog schema i.e it should work even if there are any schema changes
   to the `hdb_catalog.event_log` table.

#1 and #2 are fairly self-explanatory. For #3, we need to support fetching
events depending upon the catalog version. So, fetch events works in the
following way now:

1. Check if maintenance mode is enabled
2. If maintenance mode is enabled then read the catalog version from the DB
   and accordingly fire the appropriate query to the events log table.
   When maintenance mode is disabled, we query the events log table according
   to the latest catalog, we do not read the catalog version for this.
-}

-- | See Note [Maintenance Mode]
data EventEngineCtx = EventEngineCtx
  { EventEngineCtx -> TVar Int
_eeCtxEventThreadsCapacity :: TVar Int,
    EventEngineCtx -> DiffTime
_eeCtxFetchInterval :: DiffTime,
    EventEngineCtx -> NonNegativeInt
_eeCtxFetchSize :: NonNegativeInt
  }

data DeliveryInfo = DeliveryInfo
  { DeliveryInfo -> Int
diCurrentRetry :: Int,
    DeliveryInfo -> Int
diMaxRetries :: Int
  }
  deriving (Int -> DeliveryInfo -> ShowS
[DeliveryInfo] -> ShowS
DeliveryInfo -> String
(Int -> DeliveryInfo -> ShowS)
-> (DeliveryInfo -> String)
-> ([DeliveryInfo] -> ShowS)
-> Show DeliveryInfo
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [DeliveryInfo] -> ShowS
$cshowList :: [DeliveryInfo] -> ShowS
show :: DeliveryInfo -> String
$cshow :: DeliveryInfo -> String
showsPrec :: Int -> DeliveryInfo -> ShowS
$cshowsPrec :: Int -> DeliveryInfo -> ShowS
Show, DeliveryInfo -> DeliveryInfo -> Bool
(DeliveryInfo -> DeliveryInfo -> Bool)
-> (DeliveryInfo -> DeliveryInfo -> Bool) -> Eq DeliveryInfo
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: DeliveryInfo -> DeliveryInfo -> Bool
$c/= :: DeliveryInfo -> DeliveryInfo -> Bool
== :: DeliveryInfo -> DeliveryInfo -> Bool
$c== :: DeliveryInfo -> DeliveryInfo -> Bool
Eq)

$(deriveJSON hasuraJSON {omitNothingFields = True} ''DeliveryInfo)

newtype QualifiedTableStrict = QualifiedTableStrict
  { QualifiedTableStrict -> QualifiedTable
getQualifiedTable :: QualifiedTable
  }
  deriving (Int -> QualifiedTableStrict -> ShowS
[QualifiedTableStrict] -> ShowS
QualifiedTableStrict -> String
(Int -> QualifiedTableStrict -> ShowS)
-> (QualifiedTableStrict -> String)
-> ([QualifiedTableStrict] -> ShowS)
-> Show QualifiedTableStrict
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [QualifiedTableStrict] -> ShowS
$cshowList :: [QualifiedTableStrict] -> ShowS
show :: QualifiedTableStrict -> String
$cshow :: QualifiedTableStrict -> String
showsPrec :: Int -> QualifiedTableStrict -> ShowS
$cshowsPrec :: Int -> QualifiedTableStrict -> ShowS
Show, QualifiedTableStrict -> QualifiedTableStrict -> Bool
(QualifiedTableStrict -> QualifiedTableStrict -> Bool)
-> (QualifiedTableStrict -> QualifiedTableStrict -> Bool)
-> Eq QualifiedTableStrict
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
$c/= :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
== :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
$c== :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
Eq)

instance J.ToJSON QualifiedTableStrict where
  toJSON :: QualifiedTableStrict -> Value
toJSON (QualifiedTableStrict (QualifiedObject SchemaName
sn TableName
tn)) =
    [Pair] -> Value
J.object
      [ Key
"schema" Key -> SchemaName -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
J..= SchemaName
sn,
        Key
"name" Key -> TableName -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
J..= TableName
tn
      ]

data EventPayload (b :: BackendType) = EventPayload
  { EventPayload b -> EventId
epId :: EventId,
    EventPayload b -> TableName b
epTable :: TableName b,
    EventPayload b -> TriggerMetadata
epTrigger :: TriggerMetadata,
    EventPayload b -> Value
epEvent :: J.Value,
    EventPayload b -> DeliveryInfo
epDeliveryInfo :: DeliveryInfo,
    EventPayload b -> UTCTime
epCreatedAt :: Time.UTCTime
  }
  deriving ((forall x. EventPayload b -> Rep (EventPayload b) x)
-> (forall x. Rep (EventPayload b) x -> EventPayload b)
-> Generic (EventPayload b)
forall x. Rep (EventPayload b) x -> EventPayload b
forall x. EventPayload b -> Rep (EventPayload b) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall (b :: BackendType) x.
Rep (EventPayload b) x -> EventPayload b
forall (b :: BackendType) x.
EventPayload b -> Rep (EventPayload b) x
$cto :: forall (b :: BackendType) x.
Rep (EventPayload b) x -> EventPayload b
$cfrom :: forall (b :: BackendType) x.
EventPayload b -> Rep (EventPayload b) x
Generic)

deriving instance Backend b => Show (EventPayload b)

deriving instance Backend b => Eq (EventPayload b)

instance Backend b => J.ToJSON (EventPayload b) where
  toJSON :: EventPayload b -> Value
toJSON = Options -> EventPayload b -> Value
forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
J.genericToJSON Options
hasuraJSON {omitNothingFields :: Bool
omitNothingFields = Bool
True}

defaultMaxEventThreads :: Numeric.PositiveInt
defaultMaxEventThreads :: PositiveInt
defaultMaxEventThreads = Int -> PositiveInt
Numeric.unsafePositiveInt Int
100

defaultFetchInterval :: DiffTime
defaultFetchInterval :: DiffTime
defaultFetchInterval = Seconds -> DiffTime
seconds Seconds
1

initEventEngineCtx :: Int -> DiffTime -> NonNegativeInt -> STM EventEngineCtx
initEventEngineCtx :: Int -> DiffTime -> NonNegativeInt -> STM EventEngineCtx
initEventEngineCtx Int
maxT DiffTime
_eeCtxFetchInterval NonNegativeInt
_eeCtxFetchSize = do
  TVar Int
_eeCtxEventThreadsCapacity <- Int -> STM (TVar Int)
forall a. a -> STM (TVar a)
newTVar Int
maxT
  EventEngineCtx -> STM EventEngineCtx
forall (m :: * -> *) a. Monad m => a -> m a
return (EventEngineCtx -> STM EventEngineCtx)
-> EventEngineCtx -> STM EventEngineCtx
forall a b. (a -> b) -> a -> b
$ EventEngineCtx :: TVar Int -> DiffTime -> NonNegativeInt -> EventEngineCtx
EventEngineCtx {TVar Int
DiffTime
NonNegativeInt
_eeCtxEventThreadsCapacity :: TVar Int
_eeCtxFetchSize :: NonNegativeInt
_eeCtxFetchInterval :: DiffTime
_eeCtxFetchSize :: NonNegativeInt
_eeCtxFetchInterval :: DiffTime
_eeCtxEventThreadsCapacity :: TVar Int
..}

saveLockedEventTriggerEvents :: MonadIO m => SourceName -> [EventId] -> TVar (HashMap SourceName (Set.Set EventId)) -> m ()
saveLockedEventTriggerEvents :: SourceName
-> [EventId] -> TVar (HashMap SourceName (Set EventId)) -> m ()
saveLockedEventTriggerEvents SourceName
sourceName [EventId]
eventIds TVar (HashMap SourceName (Set EventId))
lockedEvents =
  IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$
    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      HashMap SourceName (Set EventId)
lockedEventsVals <- TVar (HashMap SourceName (Set EventId))
-> STM (HashMap SourceName (Set EventId))
forall a. TVar a -> STM a
readTVar TVar (HashMap SourceName (Set EventId))
lockedEvents
      case SourceName
-> HashMap SourceName (Set EventId) -> Maybe (Set EventId)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
M.lookup SourceName
sourceName HashMap SourceName (Set EventId)
lockedEventsVals of
        Maybe (Set EventId)
Nothing -> TVar (HashMap SourceName (Set EventId))
-> HashMap SourceName (Set EventId) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (HashMap SourceName (Set EventId))
lockedEvents (HashMap SourceName (Set EventId) -> STM ())
-> HashMap SourceName (Set EventId) -> STM ()
forall a b. (a -> b) -> a -> b
$! SourceName -> Set EventId -> HashMap SourceName (Set EventId)
forall k v. Hashable k => k -> v -> HashMap k v
M.singleton SourceName
sourceName ([EventId] -> Set EventId
forall a. Ord a => [a] -> Set a
Set.fromList [EventId]
eventIds)
        Just Set EventId
_ -> TVar (HashMap SourceName (Set EventId))
-> HashMap SourceName (Set EventId) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (HashMap SourceName (Set EventId))
lockedEvents (HashMap SourceName (Set EventId) -> STM ())
-> HashMap SourceName (Set EventId) -> STM ()
forall a b. (a -> b) -> a -> b
$! (Set EventId -> Set EventId -> Set EventId)
-> SourceName
-> Set EventId
-> HashMap SourceName (Set EventId)
-> HashMap SourceName (Set EventId)
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> k -> v -> HashMap k v -> HashMap k v
M.insertWith Set EventId -> Set EventId -> Set EventId
forall a. Ord a => Set a -> Set a -> Set a
Set.union SourceName
sourceName ([EventId] -> Set EventId
forall a. Ord a => [a] -> Set a
Set.fromList [EventId]
eventIds) HashMap SourceName (Set EventId)
lockedEventsVals

removeEventTriggerEventFromLockedEvents ::
  MonadIO m => SourceName -> EventId -> TVar (HashMap SourceName (Set.Set EventId)) -> m ()
removeEventTriggerEventFromLockedEvents :: SourceName
-> EventId -> TVar (HashMap SourceName (Set EventId)) -> m ()
removeEventTriggerEventFromLockedEvents SourceName
sourceName EventId
eventId TVar (HashMap SourceName (Set EventId))
lockedEvents =
  IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$
    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      HashMap SourceName (Set EventId)
lockedEventsVals <- TVar (HashMap SourceName (Set EventId))
-> STM (HashMap SourceName (Set EventId))
forall a. TVar a -> STM a
readTVar TVar (HashMap SourceName (Set EventId))
lockedEvents
      TVar (HashMap SourceName (Set EventId))
-> HashMap SourceName (Set EventId) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (HashMap SourceName (Set EventId))
lockedEvents (HashMap SourceName (Set EventId) -> STM ())
-> HashMap SourceName (Set EventId) -> STM ()
forall a b. (a -> b) -> a -> b
$! (Set EventId -> Set EventId)
-> SourceName
-> HashMap SourceName (Set EventId)
-> HashMap SourceName (Set EventId)
forall k v.
(Eq k, Hashable k) =>
(v -> v) -> k -> HashMap k v -> HashMap k v
M.adjust (EventId -> Set EventId -> Set EventId
forall a. Ord a => a -> Set a -> Set a
Set.delete EventId
eventId) SourceName
sourceName HashMap SourceName (Set EventId)
lockedEventsVals

type BackendEventWithSource = AB.AnyBackend EventWithSource

type FetchEventArguments = ([BackendEventWithSource], Int, Bool)

-- | Service events from our in-DB queue.
--
-- There are a few competing concerns and constraints here; we want to...
--   - fetch events in batches for lower DB pressure
--   - don't fetch more than N at a time (since that can mean: space leak, less
--     effective scale out, possible double sends for events we've checked out
--     on exit (TODO clean shutdown procedure))
--   - try not to cause webhook workers to stall waiting on DB fetch
--   - limit webhook HTTP concurrency per HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE
processEventQueue ::
  forall m.
  ( MonadIO m,
    Tracing.HasReporter m,
    MonadBaseControl IO m,
    LA.Forall (LA.Pure m),
    MonadMask m
  ) =>
  L.Logger L.Hasura ->
  HTTP.Manager ->
  IO SchemaCache ->
  EventEngineCtx ->
  LockedEventsCtx ->
  ServerMetrics ->
  EventTriggerMetrics ->
  MaintenanceMode () ->
  m (Forever m)
processEventQueue :: Logger Hasura
-> Manager
-> IO SchemaCache
-> EventEngineCtx
-> LockedEventsCtx
-> ServerMetrics
-> EventTriggerMetrics
-> MaintenanceMode ()
-> m (Forever m)
processEventQueue Logger Hasura
logger Manager
httpMgr IO SchemaCache
getSchemaCache EventEngineCtx {TVar Int
DiffTime
NonNegativeInt
_eeCtxFetchSize :: NonNegativeInt
_eeCtxFetchInterval :: DiffTime
_eeCtxEventThreadsCapacity :: TVar Int
_eeCtxFetchSize :: EventEngineCtx -> NonNegativeInt
_eeCtxFetchInterval :: EventEngineCtx -> DiffTime
_eeCtxEventThreadsCapacity :: EventEngineCtx -> TVar Int
..} LockedEventsCtx {TVar (HashMap SourceName (Set EventId))
leEvents :: LockedEventsCtx -> TVar (HashMap SourceName (Set EventId))
leEvents :: TVar (HashMap SourceName (Set EventId))
leEvents} ServerMetrics
serverMetrics EventTriggerMetrics
eventTriggerMetrics MaintenanceMode ()
maintenanceMode = do
  [BackendEventWithSource]
events0 <- m [BackendEventWithSource]
popEventsBatch
  Forever m -> m (Forever m)
forall (m :: * -> *) a. Monad m => a -> m a
return (Forever m -> m (Forever m)) -> Forever m -> m (Forever m)
forall a b. (a -> b) -> a -> b
$ ([BackendEventWithSource], Int, Bool)
-> (([BackendEventWithSource], Int, Bool)
    -> m ([BackendEventWithSource], Int, Bool))
-> Forever m
forall (m :: * -> *) a. a -> (a -> m a) -> Forever m
Forever ([BackendEventWithSource]
events0, Int
0, Bool
False) ([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
go
  where
    fetchBatchSize :: Int
fetchBatchSize = NonNegativeInt -> Int
Numeric.getNonNegativeInt NonNegativeInt
_eeCtxFetchSize

    popEventsBatch :: m [BackendEventWithSource]
    popEventsBatch :: m [BackendEventWithSource]
popEventsBatch = do
      {-
        SELECT FOR UPDATE .. SKIP LOCKED can throw serialization errors in RepeatableRead: https://stackoverflow.com/a/53289263/1911889
        We can avoid this safely by running it in ReadCommitted as Postgres will recheck the
        predicate condition if a row is updated concurrently: https://www.postgresql.org/docs/9.5/transaction-iso.html#XACT-READ-COMMITTED

        Every other action on an event_log row (like post-processing, archival, etc) are single writes (no R-W or W-R)
        so it is safe to perform them in ReadCommitted as well (the writes will then acquire some serial order).
        Any serial order of updates to a row will lead to an eventually consistent state as the row will have
        (delivered=t or error=t or archived=t) after a fixed number of tries (assuming it begins with locked='f').
      -}
      SourceCache
allSources <- SchemaCache -> SourceCache
scSources (SchemaCache -> SourceCache) -> m SchemaCache -> m SourceCache
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO SchemaCache -> m SchemaCache
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO SchemaCache
getSchemaCache
      IO [BackendEventWithSource] -> m [BackendEventWithSource]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [BackendEventWithSource] -> m [BackendEventWithSource])
-> (IO [[BackendEventWithSource]] -> IO [BackendEventWithSource])
-> IO [[BackendEventWithSource]]
-> m [BackendEventWithSource]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ([[BackendEventWithSource]] -> [BackendEventWithSource])
-> IO [[BackendEventWithSource]] -> IO [BackendEventWithSource]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap [[BackendEventWithSource]] -> [BackendEventWithSource]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat (IO [[BackendEventWithSource]] -> m [BackendEventWithSource])
-> IO [[BackendEventWithSource]] -> m [BackendEventWithSource]
forall a b. (a -> b) -> a -> b
$
        -- fetch pending events across all the sources asynchronously
        [(SourceName, BackendSourceInfo)]
-> ((SourceName, BackendSourceInfo) -> IO [BackendEventWithSource])
-> IO [[BackendEventWithSource]]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, MonadBaseControl IO m, Forall (Pure m)) =>
t a -> (a -> m b) -> m (t b)
LA.forConcurrently (SourceCache -> [(SourceName, BackendSourceInfo)]
forall k v. HashMap k v -> [(k, v)]
M.toList SourceCache
allSources) \(SourceName
sourceName, BackendSourceInfo
sourceCache) ->
          BackendSourceInfo
-> (forall (b :: BackendType).
    BackendEventTrigger b =>
    SourceInfo b -> IO [BackendEventWithSource])
-> IO [BackendEventWithSource]
forall (c :: BackendType -> Constraint) (i :: BackendType -> *) r.
AllBackendsSatisfy c =>
AnyBackend i -> (forall (b :: BackendType). c b => i b -> r) -> r
AB.dispatchAnyBackend @BackendEventTrigger BackendSourceInfo
sourceCache \(SourceInfo SourceName
_sourceName TableCache b
tableCache FunctionCache b
_functionCache SourceConfig b
sourceConfig Maybe QueryTagsConfig
_queryTagsConfig SourceCustomization
_sourceCustomization :: SourceInfo b) -> do
            let tables :: [TableInfo b]
tables = TableCache b -> [TableInfo b]
forall k v. HashMap k v -> [v]
M.elems TableCache b
tableCache
                triggerMap :: [EventTriggerInfoMap b]
triggerMap = TableInfo b -> EventTriggerInfoMap b
forall (b :: BackendType). TableInfo b -> EventTriggerInfoMap b
_tiEventTriggerInfoMap (TableInfo b -> EventTriggerInfoMap b)
-> [TableInfo b] -> [EventTriggerInfoMap b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [TableInfo b]
tables
                eventTriggerCount :: Int
eventTriggerCount = [Int] -> Int
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (EventTriggerInfoMap b -> Int
forall k v. HashMap k v -> Int
M.size (EventTriggerInfoMap b -> Int) -> [EventTriggerInfoMap b] -> [Int]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [EventTriggerInfoMap b]
triggerMap)
                triggerNames :: [TriggerName]
triggerNames = (EventTriggerInfoMap b -> [TriggerName])
-> [EventTriggerInfoMap b] -> [TriggerName]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap EventTriggerInfoMap b -> [TriggerName]
forall k v. HashMap k v -> [k]
M.keys [EventTriggerInfoMap b]
triggerMap

            -- only process events for this source if at least one event trigger exists
            if Int
eventTriggerCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
              then
                ( ExceptT QErr IO [Event b] -> IO (Either QErr [Event b])
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (SourceConfig b
-> SourceName
-> [TriggerName]
-> MaintenanceMode ()
-> FetchBatchSize
-> ExceptT QErr IO [Event b]
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m, MonadError QErr m) =>
SourceConfig b
-> SourceName
-> [TriggerName]
-> MaintenanceMode ()
-> FetchBatchSize
-> m [Event b]
fetchUndeliveredEvents @b SourceConfig b
sourceConfig SourceName
sourceName [TriggerName]
triggerNames MaintenanceMode ()
maintenanceMode (Int -> FetchBatchSize
FetchBatchSize Int
fetchBatchSize)) IO (Either QErr [Event b])
-> (Either QErr [Event b] -> IO [BackendEventWithSource])
-> IO [BackendEventWithSource]
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                    Right [Event b]
events -> do
                      ()
_ <- IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Distribution -> Double -> IO ()
EKG.Distribution.add (ServerMetrics -> Distribution
smNumEventsFetchedPerBatch ServerMetrics
serverMetrics) (Int -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Double) -> Int -> Double
forall a b. (a -> b) -> a -> b
$ [Event b] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Event b]
events)
                      UTCTime
eventsFetchedTime <- IO UTCTime -> IO UTCTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
                      SourceName
-> [EventId] -> TVar (HashMap SourceName (Set EventId)) -> IO ()
forall (m :: * -> *).
MonadIO m =>
SourceName
-> [EventId] -> TVar (HashMap SourceName (Set EventId)) -> m ()
saveLockedEventTriggerEvents SourceName
sourceName (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId (Event b -> EventId) -> [Event b] -> [EventId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Event b]
events) TVar (HashMap SourceName (Set EventId))
leEvents
                      [BackendEventWithSource] -> IO [BackendEventWithSource]
forall (m :: * -> *) a. Monad m => a -> m a
return ([BackendEventWithSource] -> IO [BackendEventWithSource])
-> [BackendEventWithSource] -> IO [BackendEventWithSource]
forall a b. (a -> b) -> a -> b
$ (Event b -> BackendEventWithSource)
-> [Event b] -> [BackendEventWithSource]
forall a b. (a -> b) -> [a] -> [b]
map (\Event b
event -> forall (b :: BackendType) (i :: BackendType -> *).
HasTag b =>
i b -> AnyBackend i
forall (i :: BackendType -> *). HasTag b => i b -> AnyBackend i
AB.mkAnyBackend @b (EventWithSource b -> BackendEventWithSource)
-> EventWithSource b -> BackendEventWithSource
forall a b. (a -> b) -> a -> b
$ Event b
-> SourceConfig b -> SourceName -> UTCTime -> EventWithSource b
forall (b :: BackendType).
Event b
-> SourceConfig b -> SourceName -> UTCTime -> EventWithSource b
EventWithSource Event b
event SourceConfig b
sourceConfig SourceName
sourceName UTCTime
eventsFetchedTime) [Event b]
events
                    Left QErr
err -> do
                      IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Logger Hasura
-> forall a (m :: * -> *).
   (ToEngineLog a Hasura, MonadIO m) =>
   a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
   (ToEngineLog a impl, MonadIO m) =>
   a -> m ()
L.unLogger Logger Hasura
logger (EventInternalErr -> IO ()) -> EventInternalErr -> IO ()
forall a b. (a -> b) -> a -> b
$ QErr -> EventInternalErr
EventInternalErr QErr
err
                      [BackendEventWithSource] -> IO [BackendEventWithSource]
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
                )
              else [BackendEventWithSource] -> IO [BackendEventWithSource]
forall (f :: * -> *) a. Applicative f => a -> f a
pure []

    -- !!! CAREFUL !!!
    --     The logic here in particular is subtle and has been fixed, broken,
    --     and fixed again in several different ways, several times.
    -- !!! CAREFUL !!!
    --
    -- work on this batch of events while prefetching the next. Recurse after we've forked workers
    -- for each in the batch, minding the requested pool size.
    go :: FetchEventArguments -> m FetchEventArguments
    go :: ([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
go ([BackendEventWithSource]
events, !Int
fullFetchCount, !Bool
alreadyWarned) = do
      -- process events ASAP until we've caught up; only then can we sleep
      Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ([BackendEventWithSource] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [BackendEventWithSource]
events) (m () -> m ()) -> (IO () -> m ()) -> IO () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ DiffTime -> IO ()
sleep DiffTime
_eeCtxFetchInterval

      -- Prefetch next events payload while concurrently working through our current batch.
      -- NOTE: we probably don't need to prefetch so early, but probably not
      -- worth the effort for something more fine-tuned
      [BackendEventWithSource]
eventsNext <- m [BackendEventWithSource]
-> (Async [BackendEventWithSource] -> m [BackendEventWithSource])
-> m [BackendEventWithSource]
forall (m :: * -> *) a b.
(MonadBaseControl IO m, Forall (Pure m)) =>
m a -> (Async a -> m b) -> m b
LA.withAsync m [BackendEventWithSource]
popEventsBatch ((Async [BackendEventWithSource] -> m [BackendEventWithSource])
 -> m [BackendEventWithSource])
-> (Async [BackendEventWithSource] -> m [BackendEventWithSource])
-> m [BackendEventWithSource]
forall a b. (a -> b) -> a -> b
$ \Async [BackendEventWithSource]
eventsNextA -> do
        -- process approximately in order, minding HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE:
        [BackendEventWithSource]
-> (BackendEventWithSource -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [BackendEventWithSource]
events ((BackendEventWithSource -> m ()) -> m ())
-> (BackendEventWithSource -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \BackendEventWithSource
eventWithSource ->
          -- NOTE: we implement a logical bracket pattern here with the
          -- increment and decrement of _eeCtxEventThreadsCapacity which
          -- depends on not putting anything that can throw in the body here:
          BackendEventWithSource
-> (forall (b :: BackendType).
    BackendEventTrigger b =>
    EventWithSource b -> m ())
-> m ()
forall (c :: BackendType -> Constraint) (i :: BackendType -> *) r.
AllBackendsSatisfy c =>
AnyBackend i -> (forall (b :: BackendType). c b => i b -> r) -> r
AB.dispatchAnyBackend @BackendEventTrigger BackendEventWithSource
eventWithSource \(EventWithSource b
eventWithSource' :: EventWithSource b) ->
            m () -> m ()
forall (m :: * -> *) a. MonadMask m => m a -> m a
mask_ (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
              IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$
                STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                  -- block until < HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE threads:
                  Int
capacity <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
_eeCtxEventThreadsCapacity
                  Bool -> STM ()
check (Bool -> STM ()) -> Bool -> STM ()
forall a b. (a -> b) -> a -> b
$ Int
capacity Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
                  TVar Int -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Int
_eeCtxEventThreadsCapacity (Int
capacity Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
              -- since there is some capacity in our worker threads, we can launch another:
              let restoreCapacity :: ReaderT (Logger Hasura, Manager) m ()
restoreCapacity =
                    IO () -> ReaderT (Logger Hasura, Manager) m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ReaderT (Logger Hasura, Manager) m ())
-> IO () -> ReaderT (Logger Hasura, Manager) m ()
forall a b. (a -> b) -> a -> b
$
                      STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
                        TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
_eeCtxEventThreadsCapacity (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
              Async ()
t <-
                m () -> m (Async ())
forall (m :: * -> *) a.
(MonadBaseControl IO m, Forall (Pure m)) =>
m a -> m (Async a)
LA.async (m () -> m (Async ())) -> m () -> m (Async ())
forall a b. (a -> b) -> a -> b
$
                  (ReaderT (Logger Hasura, Manager) m ()
 -> (Logger Hasura, Manager) -> m ())
-> (Logger Hasura, Manager)
-> ReaderT (Logger Hasura, Manager) m ()
-> m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ReaderT (Logger Hasura, Manager) m ()
-> (Logger Hasura, Manager) -> m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (Logger Hasura
logger, Manager
httpMgr) (ReaderT (Logger Hasura, Manager) m () -> m ())
-> ReaderT (Logger Hasura, Manager) m () -> m ()
forall a b. (a -> b) -> a -> b
$
                    EventWithSource b -> ReaderT (Logger Hasura, Manager) m ()
forall (io :: * -> *) r (b :: BackendType).
(MonadIO io, MonadReader r io, Has Manager r,
 Has (Logger Hasura) r, HasReporter io, MonadMask io,
 BackendEventTrigger b) =>
EventWithSource b -> io ()
processEvent EventWithSource b
eventWithSource'
                      ReaderT (Logger Hasura, Manager) m ()
-> ReaderT (Logger Hasura, Manager) m ()
-> ReaderT (Logger Hasura, Manager) m ()
forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`finally`
                      -- NOTE!: this needs to happen IN THE FORKED THREAD:
                      ReaderT (Logger Hasura, Manager) m ()
restoreCapacity
              Async () -> m ()
forall (m :: * -> *) a. MonadBase IO m => Async a -> m ()
LA.link Async ()
t

        -- return when next batch ready; some 'processEvent' threads may be running.
        Async [BackendEventWithSource] -> m [BackendEventWithSource]
forall (m :: * -> *) a.
(MonadBase IO m, Forall (Pure m)) =>
Async a -> m a
LA.wait Async [BackendEventWithSource]
eventsNextA

      let lenEvents :: Int
lenEvents = [BackendEventWithSource] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [BackendEventWithSource]
events
      if
          | Int
lenEvents Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
fetchBatchSize -> do
            -- If we've seen N fetches in a row from the DB come back full (i.e. only limited
            -- by our LIMIT clause), then we say we're clearly falling behind:
            let clearlyBehind :: Bool
clearlyBehind = Int
fullFetchCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
3
            Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
alreadyWarned (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
              Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
clearlyBehind (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
                Logger Hasura
-> forall a (m :: * -> *).
   (ToEngineLog a Hasura, MonadIO m) =>
   a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
   (ToEngineLog a impl, MonadIO m) =>
   a -> m ()
L.unLogger Logger Hasura
logger (UnstructuredLog -> m ()) -> UnstructuredLog -> m ()
forall a b. (a -> b) -> a -> b
$
                  LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelWarn (SerializableBlob -> UnstructuredLog)
-> SerializableBlob -> UnstructuredLog
forall a b. (a -> b) -> a -> b
$
                    String -> SerializableBlob
forall a. IsString a => String -> a
fromString (String -> SerializableBlob) -> String -> SerializableBlob
forall a b. (a -> b) -> a -> b
$
                      String
"Events processor may not be keeping up with events generated in postgres, "
                        String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"or we're working on a backlog of events. Consider increasing "
                        String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE"
            ([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return ([BackendEventWithSource]
eventsNext, (Int
fullFetchCount Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1), (Bool
alreadyWarned Bool -> Bool -> Bool
|| Bool
clearlyBehind))
          | Bool
otherwise -> do
            Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
lenEvents Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
fetchBatchSize Bool -> Bool -> Bool
&& Bool
alreadyWarned) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
              -- emit as warning in case users are only logging warning severity and saw above
              Logger Hasura
-> forall a (m :: * -> *).
   (ToEngineLog a Hasura, MonadIO m) =>
   a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
   (ToEngineLog a impl, MonadIO m) =>
   a -> m ()
L.unLogger Logger Hasura
logger (UnstructuredLog -> m ()) -> UnstructuredLog -> m ()
forall a b. (a -> b) -> a -> b
$
                LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelWarn (SerializableBlob -> UnstructuredLog)
-> SerializableBlob -> UnstructuredLog
forall a b. (a -> b) -> a -> b
$
                  String -> SerializableBlob
forall a. IsString a => String -> a
fromString (String -> SerializableBlob) -> String -> SerializableBlob
forall a b. (a -> b) -> a -> b
$
                    String
"It looks like the events processor is keeping up again."
            ([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return ([BackendEventWithSource]
eventsNext, Int
0, Bool
False)

    processEvent ::
      forall io r b.
      ( MonadIO io,
        MonadReader r io,
        Has HTTP.Manager r,
        Has (L.Logger L.Hasura) r,
        Tracing.HasReporter io,
        MonadMask io,
        BackendEventTrigger b
      ) =>
      EventWithSource b ->
      io ()
    processEvent :: EventWithSource b -> io ()
processEvent (EventWithSource Event b
e SourceConfig b
sourceConfig SourceName
sourceName UTCTime
eventFetchedTime) = do
      -- Track Queue Time of Event (in seconds). See `smEventQueueTime`
      -- Queue Time = Time when the event was fetched from DB - Time when the event is being processed
      UTCTime
eventProcessTime <- IO UTCTime -> io UTCTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
      let eventQueueTime :: Double
eventQueueTime = NominalDiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac (NominalDiffTime -> Double) -> NominalDiffTime -> Double
forall a b. (a -> b) -> a -> b
$ UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
eventProcessTime UTCTime
eventFetchedTime
      ()
_ <- IO () -> io ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> io ()) -> IO () -> io ()
forall a b. (a -> b) -> a -> b
$ Distribution -> Double -> IO ()
EKG.Distribution.add (ServerMetrics -> Distribution
smEventQueueTime ServerMetrics
serverMetrics) Double
eventQueueTime
      IO () -> io ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> io ()) -> IO () -> io ()
forall a b. (a -> b) -> a -> b
$ Histogram -> Double -> IO ()
Prometheus.Histogram.observe (EventTriggerMetrics -> Histogram
eventQueueTimeSeconds EventTriggerMetrics
eventTriggerMetrics) Double
eventQueueTime

      SchemaCache
cache <- IO SchemaCache -> io SchemaCache
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO SchemaCache
getSchemaCache

      Maybe TraceContext
tracingCtx <- IO (Maybe TraceContext) -> io (Maybe TraceContext)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Value -> IO (Maybe TraceContext)
Tracing.extractEventContext (Event b -> Value
forall (b :: BackendType). Event b -> Value
eEvent Event b
e))
      let spanName :: EventTriggerInfo b -> Text
spanName EventTriggerInfo b
eti = Text
"Event trigger: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> NonEmptyText -> Text
unNonEmptyText (TriggerName -> NonEmptyText
unTriggerName (EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti))
          runTraceT :: Text -> TraceT io () -> io ()
runTraceT =
            (Text -> TraceT io () -> io ())
-> (TraceContext -> Text -> TraceT io () -> io ())
-> Maybe TraceContext
-> Text
-> TraceT io ()
-> io ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe
              Text -> TraceT io () -> io ()
forall (m :: * -> *) a.
(HasReporter m, MonadIO m) =>
Text -> TraceT m a -> m a
Tracing.runTraceT
              TraceContext -> Text -> TraceT io () -> io ()
forall (m :: * -> *) a.
(MonadIO m, HasReporter m) =>
TraceContext -> Text -> TraceT m a -> m a
Tracing.runTraceTInContext
              Maybe TraceContext
tracingCtx

      Either QErr (MaintenanceMode MaintenanceModeVersion)
maintenanceModeVersionEither :: Either QErr (MaintenanceMode MaintenanceModeVersion) <-
        case MaintenanceMode ()
maintenanceMode of
          MaintenanceModeEnabled () -> do
            ExceptT QErr io MaintenanceModeVersion
-> io (Either QErr MaintenanceModeVersion)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (SourceConfig b -> ExceptT QErr io MaintenanceModeVersion
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m, MonadError QErr m) =>
SourceConfig b -> m MaintenanceModeVersion
getMaintenanceModeVersion @b SourceConfig b
sourceConfig) io (Either QErr MaintenanceModeVersion)
-> (Either QErr MaintenanceModeVersion
    -> Either QErr (MaintenanceMode MaintenanceModeVersion))
-> io (Either QErr (MaintenanceMode MaintenanceModeVersion))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
              Left QErr
err -> QErr -> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. a -> Either a b
Left QErr
err
              Right MaintenanceModeVersion
maintenanceModeVersion -> MaintenanceMode MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. b -> Either a b
Right (MaintenanceMode MaintenanceModeVersion
 -> Either QErr (MaintenanceMode MaintenanceModeVersion))
-> MaintenanceMode MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. (a -> b) -> a -> b
$ (MaintenanceModeVersion -> MaintenanceMode MaintenanceModeVersion
forall a. a -> MaintenanceMode a
MaintenanceModeEnabled MaintenanceModeVersion
maintenanceModeVersion)
          MaintenanceMode ()
MaintenanceModeDisabled -> Either QErr (MaintenanceMode MaintenanceModeVersion)
-> io (Either QErr (MaintenanceMode MaintenanceModeVersion))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either QErr (MaintenanceMode MaintenanceModeVersion)
 -> io (Either QErr (MaintenanceMode MaintenanceModeVersion)))
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
-> io (Either QErr (MaintenanceMode MaintenanceModeVersion))
forall a b. (a -> b) -> a -> b
$ MaintenanceMode MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. b -> Either a b
Right MaintenanceMode MaintenanceModeVersion
forall a. MaintenanceMode a
MaintenanceModeDisabled

      case Either QErr (MaintenanceMode MaintenanceModeVersion)
maintenanceModeVersionEither of
        Left QErr
maintenanceModeVersionErr -> QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr QErr
maintenanceModeVersionErr
        Right MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion ->
          case SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
forall (b :: BackendType).
Backend b =>
SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
getEventTriggerInfoFromEvent SchemaCache
cache Event b
e of
            Left Text
err -> do
              --  This rare error can happen in the following known cases:
              --  i) schema cache is not up-to-date (due to some bug, say during schema syncing across multiple instances)
              --  ii) the event trigger is dropped when this event was just fetched
              QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr (QErr -> io ()) -> QErr -> io ()
forall a b. (a -> b) -> a -> b
$ Code -> Text -> QErr
err500 Code
Unexpected Text
err
              UTCTime
currentTime <- IO UTCTime -> io UTCTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
              -- For such an event, we unlock the event and retry after a minute
              ExceptT QErr io () -> io (Either QErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (SourceConfig b
-> Event b
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> ExceptT QErr io ()
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m, MonadError QErr m) =>
SourceConfig b
-> Event b
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> m ()
setRetry SourceConfig b
sourceConfig Event b
e (NominalDiffTime -> UTCTime -> UTCTime
addUTCTime NominalDiffTime
60 UTCTime
currentTime) MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion)
                io (Either QErr ()) -> (Either QErr () -> io ()) -> io ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> io ()) -> io ())
-> (QErr -> io ()) -> Either QErr () -> io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> io ()) -> io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
            Right EventTriggerInfo b
eti -> Text -> TraceT io () -> io ()
runTraceT (EventTriggerInfo b -> Text
forall (b :: BackendType). EventTriggerInfo b -> Text
spanName EventTriggerInfo b
eti) do
              let webhook :: EnvRecord ResolvedWebhook
webhook = WebhookConfInfo -> EnvRecord ResolvedWebhook
wciCachedValue (WebhookConfInfo -> EnvRecord ResolvedWebhook)
-> WebhookConfInfo -> EnvRecord ResolvedWebhook
forall a b. (a -> b) -> a -> b
$ EventTriggerInfo b -> WebhookConfInfo
forall (b :: BackendType). EventTriggerInfo b -> WebhookConfInfo
etiWebhookInfo EventTriggerInfo b
eti
                  retryConf :: RetryConf
retryConf = EventTriggerInfo b -> RetryConf
forall (b :: BackendType). EventTriggerInfo b -> RetryConf
etiRetryConf EventTriggerInfo b
eti
                  timeoutSeconds :: Int
timeoutSeconds = Int -> Maybe Int -> Int
forall a. a -> Maybe a -> a
fromMaybe Int
defaultTimeoutSeconds (RetryConf -> Maybe Int
rcTimeoutSec RetryConf
retryConf)
                  httpTimeout :: ResponseTimeout
httpTimeout = Int -> ResponseTimeout
HTTP.responseTimeoutMicro (Int
timeoutSeconds Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000000)
                  ([Header]
headers, [HeaderConf]
logHeaders) = [EventHeaderInfo] -> ([Header], [HeaderConf])
prepareHeaders (EventTriggerInfo b -> [EventHeaderInfo]
forall (b :: BackendType). EventTriggerInfo b -> [EventHeaderInfo]
etiHeaders EventTriggerInfo b
eti)
                  ep :: EventPayload b
ep = RetryConf -> Event b -> EventPayload b
forall (b :: BackendType). RetryConf -> Event b -> EventPayload b
createEventPayload RetryConf
retryConf Event b
e
                  payload :: ByteString
payload = Value -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode (Value -> ByteString) -> Value -> ByteString
forall a b. (a -> b) -> a -> b
$ EventPayload b -> Value
forall a. ToJSON a => a -> Value
J.toJSON EventPayload b
ep
                  extraLogCtx :: ExtraLogContext
extraLogCtx = EventId -> Maybe TriggerName -> ExtraLogContext
ExtraLogContext (EventPayload b -> EventId
forall (b :: BackendType). EventPayload b -> EventId
epId EventPayload b
ep) (TriggerName -> Maybe TriggerName
forall a. a -> Maybe a
Just (TriggerName -> Maybe TriggerName)
-> TriggerName -> Maybe TriggerName
forall a b. (a -> b) -> a -> b
$ EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti)
                  requestTransform :: Maybe RequestTransform
requestTransform = EventTriggerInfo b -> Maybe RequestTransform
forall (b :: BackendType).
EventTriggerInfo b -> Maybe RequestTransform
etiRequestTransform EventTriggerInfo b
eti
                  responseTransform :: Maybe ResponseTransform
responseTransform = MetadataResponseTransform -> ResponseTransform
mkResponseTransform (MetadataResponseTransform -> ResponseTransform)
-> Maybe MetadataResponseTransform -> Maybe ResponseTransform
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> EventTriggerInfo b -> Maybe MetadataResponseTransform
forall (b :: BackendType).
EventTriggerInfo b -> Maybe MetadataResponseTransform
etiResponseTransform EventTriggerInfo b
eti
              Either
  (TransformableRequestError 'EventType)
  (Request, HTTPResp 'EventType)
eitherReqRes <-
                ExceptT
  (TransformableRequestError 'EventType)
  (TraceT io)
  (Request, HTTPResp 'EventType)
-> TraceT
     io
     (Either
        (TransformableRequestError 'EventType)
        (Request, HTTPResp 'EventType))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT
   (TransformableRequestError 'EventType)
   (TraceT io)
   (Request, HTTPResp 'EventType)
 -> TraceT
      io
      (Either
         (TransformableRequestError 'EventType)
         (Request, HTTPResp 'EventType)))
-> ExceptT
     (TransformableRequestError 'EventType)
     (TraceT io)
     (Request, HTTPResp 'EventType)
-> TraceT
     io
     (Either
        (TransformableRequestError 'EventType)
        (Request, HTTPResp 'EventType))
forall a b. (a -> b) -> a -> b
$
                  [Header]
-> ResponseTimeout
-> ByteString
-> Maybe RequestTransform
-> ResolvedWebhook
-> ExceptT
     (TransformableRequestError 'EventType) (TraceT io) RequestDetails
forall (a :: TriggerTypes) (m :: * -> *).
MonadError (TransformableRequestError a) m =>
[Header]
-> ResponseTimeout
-> ByteString
-> Maybe RequestTransform
-> ResolvedWebhook
-> m RequestDetails
mkRequest [Header]
headers ResponseTimeout
httpTimeout ByteString
payload Maybe RequestTransform
requestTransform (EnvRecord ResolvedWebhook -> ResolvedWebhook
forall a. EnvRecord a -> a
_envVarValue EnvRecord ResolvedWebhook
webhook) ExceptT
  (TransformableRequestError 'EventType) (TraceT io) RequestDetails
-> (RequestDetails
    -> ExceptT
         (TransformableRequestError 'EventType)
         (TraceT io)
         (Request, HTTPResp 'EventType))
-> ExceptT
     (TransformableRequestError 'EventType)
     (TraceT io)
     (Request, HTTPResp 'EventType)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \RequestDetails
reqDetails -> do
                    let request :: Request
request = RequestDetails -> Request
extractRequest RequestDetails
reqDetails
                        logger' :: Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> RequestDetails
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
logger' Either (HTTPErr 'EventType) (HTTPResp 'EventType)
res RequestDetails
details = Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> ExtraLogContext
-> RequestDetails
-> Text
-> [HeaderConf]
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> ExtraLogContext
-> RequestDetails
-> Text
-> [HeaderConf]
-> m ()
logHTTPForET Either (HTTPErr 'EventType) (HTTPResp 'EventType)
res ExtraLogContext
extraLogCtx RequestDetails
details (EnvRecord ResolvedWebhook -> Text
forall a. EnvRecord a -> Text
_envVarName EnvRecord ResolvedWebhook
webhook) [HeaderConf]
logHeaders
                    -- Event Triggers have a configuration parameter called
                    -- HASURA_GRAPHQL_EVENTS_HTTP_WORKERS, which is used
                    -- to control the concurrency of http delivery.
                    -- This bracket is used to increment and decrement an
                    -- HTTP Worker EKG Gauge for the duration of the
                    -- request invocation
                    HTTPResp 'EventType
resp <-
                      ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
-> ExceptT
     (TransformableRequestError 'EventType)
     (TraceT io)
     (HTTPResp 'EventType)
-> ExceptT
     (TransformableRequestError 'EventType)
     (TraceT io)
     (HTTPResp 'EventType)
forall (m :: * -> *) a c b. MonadMask m => m a -> m c -> m b -> m b
bracket_
                        ( do
                            IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ()
 -> ExceptT (TransformableRequestError 'EventType) (TraceT io) ())
-> IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smNumEventHTTPWorkers ServerMetrics
serverMetrics
                            IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ()
 -> ExceptT (TransformableRequestError 'EventType) (TraceT io) ())
-> IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.inc (EventTriggerMetrics -> Gauge
eventTriggerHTTPWorkers EventTriggerMetrics
eventTriggerMetrics)
                        )
                        ( do
                            IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ()
 -> ExceptT (TransformableRequestError 'EventType) (TraceT io) ())
-> IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smNumEventHTTPWorkers ServerMetrics
serverMetrics
                            IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ()
 -> ExceptT (TransformableRequestError 'EventType) (TraceT io) ())
-> IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.dec (EventTriggerMetrics -> Gauge
eventTriggerHTTPWorkers EventTriggerMetrics
eventTriggerMetrics)
                        )
                        (RequestDetails
-> Maybe ResponseTransform
-> Maybe SessionVariables
-> (Either (HTTPErr 'EventType) (HTTPResp 'EventType)
    -> RequestDetails
    -> ExceptT (TransformableRequestError 'EventType) (TraceT io) ())
-> ExceptT
     (TransformableRequestError 'EventType)
     (TraceT io)
     (HTTPResp 'EventType)
forall r (m :: * -> *) (a :: TriggerTypes).
(MonadReader r m, MonadError (TransformableRequestError a) m,
 Has Manager r, Has (Logger Hasura) r, MonadIO m, MonadTrace m) =>
RequestDetails
-> Maybe ResponseTransform
-> Maybe SessionVariables
-> (Either (HTTPErr a) (HTTPResp a) -> RequestDetails -> m ())
-> m (HTTPResp a)
invokeRequest RequestDetails
reqDetails Maybe ResponseTransform
responseTransform (RequestDetails -> Maybe SessionVariables
_rdSessionVars RequestDetails
reqDetails) Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> RequestDetails
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
logger')
                    (Request, HTTPResp 'EventType)
-> ExceptT
     (TransformableRequestError 'EventType)
     (TraceT io)
     (Request, HTTPResp 'EventType)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Request
request, HTTPResp 'EventType
resp)
              case Either
  (TransformableRequestError 'EventType)
  (Request, HTTPResp 'EventType)
eitherReqRes of
                Right (Request
req, HTTPResp 'EventType
resp) ->
                  let reqBody :: Value
reqBody = Value -> Maybe Value -> Value
forall a. a -> Maybe a -> a
fromMaybe Value
J.Null (Maybe Value -> Value) -> Maybe Value -> Value
forall a b. (a -> b) -> a -> b
$ Getting (Maybe ByteString) Request (Maybe ByteString)
-> Request -> Maybe ByteString
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting (Maybe ByteString) Request (Maybe ByteString)
Lens' Request (Maybe ByteString)
HTTP.body Request
req Maybe ByteString -> (ByteString -> Maybe Value) -> Maybe Value
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= FromJSON Value => ByteString -> Maybe Value
forall a. FromJSON a => ByteString -> Maybe a
J.decode @J.Value
                   in SourceConfig b
-> Event b
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPResp 'EventType
-> TraceT io (Either QErr ())
forall (b :: BackendType) (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, BackendEventTrigger b) =>
SourceConfig b
-> Event b
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPResp a
-> m (Either QErr ())
processSuccess SourceConfig b
sourceConfig Event b
e [HeaderConf]
logHeaders Value
reqBody MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion HTTPResp 'EventType
resp TraceT io (Either QErr ())
-> (Either QErr () -> TraceT io ()) -> TraceT io ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> TraceT io ()) -> TraceT io ())
-> (QErr -> TraceT io ()) -> Either QErr () -> TraceT io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> TraceT io ()) -> TraceT io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> TraceT io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
                Left (HTTPError Value
reqBody HTTPErr 'EventType
err) ->
                  SourceConfig b
-> Event b
-> RetryConf
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPErr 'EventType
-> TraceT io (Either QErr ())
forall (b :: BackendType) (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, BackendEventTrigger b) =>
SourceConfig b
-> Event b
-> RetryConf
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPErr a
-> m (Either QErr ())
processError @b SourceConfig b
sourceConfig Event b
e RetryConf
retryConf [HeaderConf]
logHeaders Value
reqBody MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion HTTPErr 'EventType
err TraceT io (Either QErr ())
-> (Either QErr () -> TraceT io ()) -> TraceT io ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> TraceT io ()) -> TraceT io ())
-> (QErr -> TraceT io ()) -> Either QErr () -> TraceT io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> TraceT io ()) -> TraceT io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> TraceT io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
                Left (TransformationError Value
_ TransformErrorBundle
err) -> do
                  Logger Hasura
-> forall a (m :: * -> *).
   (ToEngineLog a Hasura, MonadIO m) =>
   a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
   (ToEngineLog a impl, MonadIO m) =>
   a -> m ()
L.unLogger Logger Hasura
logger (UnstructuredLog -> TraceT io ())
-> UnstructuredLog -> TraceT io ()
forall a b. (a -> b) -> a -> b
$ LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelError (ByteString -> SerializableBlob
SB.fromLBS (ByteString -> SerializableBlob) -> ByteString -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ TransformErrorBundle -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode TransformErrorBundle
err)

                  -- Record an Event Error
                  SourceConfig b
-> Event b
-> Maybe (Invocation 'EventType)
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> TraceT io (Either QErr ())
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m) =>
SourceConfig b
-> Event b
-> Maybe (Invocation 'EventType)
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError' @b SourceConfig b
sourceConfig Event b
e Maybe (Invocation 'EventType)
forall a. Maybe a
Nothing ProcessEventError
PESetError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion TraceT io (Either QErr ())
-> (Either QErr () -> TraceT io ()) -> TraceT io ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> TraceT io ()) -> TraceT io ())
-> (QErr -> TraceT io ()) -> Either QErr () -> TraceT io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> TraceT io ()) -> TraceT io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> TraceT io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
      -- removing an event from the _eeCtxLockedEvents after the event has been processed:
      SourceName
-> EventId -> TVar (HashMap SourceName (Set EventId)) -> io ()
forall (m :: * -> *).
MonadIO m =>
SourceName
-> EventId -> TVar (HashMap SourceName (Set EventId)) -> m ()
removeEventTriggerEventFromLockedEvents SourceName
sourceName (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) TVar (HashMap SourceName (Set EventId))
leEvents

createEventPayload :: RetryConf -> Event b -> EventPayload b
createEventPayload :: RetryConf -> Event b -> EventPayload b
createEventPayload RetryConf
retryConf Event b
e =
  EventPayload :: forall (b :: BackendType).
EventId
-> TableName b
-> TriggerMetadata
-> Value
-> DeliveryInfo
-> UTCTime
-> EventPayload b
EventPayload
    { epId :: EventId
epId = Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e,
      epTable :: TableName b
epTable = Event b -> TableName b
forall (b :: BackendType). Event b -> TableName b
eTable Event b
e,
      epTrigger :: TriggerMetadata
epTrigger = Event b -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event b
e,
      epEvent :: Value
epEvent = Event b -> Value
forall (b :: BackendType). Event b -> Value
eEvent Event b
e,
      epDeliveryInfo :: DeliveryInfo
epDeliveryInfo =
        DeliveryInfo :: Int -> Int -> DeliveryInfo
DeliveryInfo
          { diCurrentRetry :: Int
diCurrentRetry = Event b -> Int
forall (b :: BackendType). Event b -> Int
eTries Event b
e,
            diMaxRetries :: Int
diMaxRetries = RetryConf -> Int
rcNumRetries RetryConf
retryConf
          },
      epCreatedAt :: UTCTime
epCreatedAt = Event b -> UTCTime
forall (b :: BackendType). Event b -> UTCTime
eCreatedAt Event b
e
    }

processSuccess ::
  forall b m a.
  (MonadIO m, BackendEventTrigger b) =>
  SourceConfig b ->
  Event b ->
  [HeaderConf] ->
  J.Value ->
  MaintenanceMode MaintenanceModeVersion ->
  HTTPResp a ->
  m (Either QErr ())
processSuccess :: SourceConfig b
-> Event b
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPResp a
-> m (Either QErr ())
processSuccess SourceConfig b
sourceConfig Event b
e [HeaderConf]
reqHeaders Value
ep MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion HTTPResp a
resp = do
  let respBody :: SerializableBlob
respBody = HTTPResp a -> SerializableBlob
forall (a :: TriggerTypes). HTTPResp a -> SerializableBlob
hrsBody HTTPResp a
resp
      respHeaders :: [HeaderConf]
respHeaders = HTTPResp a -> [HeaderConf]
forall (a :: TriggerTypes). HTTPResp a -> [HeaderConf]
hrsHeaders HTTPResp a
resp
      respStatus :: Int
respStatus = HTTPResp a -> Int
forall (a :: TriggerTypes). HTTPResp a -> Int
hrsStatus HTTPResp a
resp
      eid :: EventId
eid = Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e
      invocation :: Invocation 'EventType
invocation = EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation EventId
eid Value
ep (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
respStatus) [HeaderConf]
reqHeaders SerializableBlob
respBody [HeaderConf]
respHeaders
  SourceConfig b
-> Event b
-> Invocation 'EventType
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m) =>
SourceConfig b
-> Event b
-> Invocation 'EventType
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordSuccess @b SourceConfig b
sourceConfig Event b
e Invocation 'EventType
invocation MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion

processError ::
  forall b m a.
  ( MonadIO m,
    BackendEventTrigger b
  ) =>
  SourceConfig b ->
  Event b ->
  RetryConf ->
  [HeaderConf] ->
  J.Value ->
  MaintenanceMode MaintenanceModeVersion ->
  HTTPErr a ->
  m (Either QErr ())
processError :: SourceConfig b
-> Event b
-> RetryConf
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPErr a
-> m (Either QErr ())
processError SourceConfig b
sourceConfig Event b
e RetryConf
retryConf [HeaderConf]
reqHeaders Value
ep MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion HTTPErr a
err = do
  let invocation :: Invocation 'EventType
invocation = case HTTPErr a
err of
        HClient HttpException
httpException ->
          let statusMaybe :: Maybe Int
statusMaybe = HttpException -> Maybe Int
getHTTPExceptionStatus HttpException
httpException
           in EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) Value
ep Maybe Int
statusMaybe [HeaderConf]
reqHeaders (ByteString -> SerializableBlob
SB.fromLBS (HttpException -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode HttpException
httpException)) []
        HStatus HTTPResp a
errResp -> do
          let respPayload :: SerializableBlob
respPayload = HTTPResp a -> SerializableBlob
forall (a :: TriggerTypes). HTTPResp a -> SerializableBlob
hrsBody HTTPResp a
errResp
              respHeaders :: [HeaderConf]
respHeaders = HTTPResp a -> [HeaderConf]
forall (a :: TriggerTypes). HTTPResp a -> [HeaderConf]
hrsHeaders HTTPResp a
errResp
              respStatus :: Int
respStatus = HTTPResp a -> Int
forall (a :: TriggerTypes). HTTPResp a -> Int
hrsStatus HTTPResp a
errResp
          EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) Value
ep (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
respStatus) [HeaderConf]
reqHeaders SerializableBlob
respPayload [HeaderConf]
respHeaders
        HOther String
detail -> do
          let errMsg :: SerializableBlob
errMsg = ByteString -> SerializableBlob
SB.fromLBS (ByteString -> SerializableBlob) -> ByteString -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ String -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode String
detail
          EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) Value
ep (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
500) [HeaderConf]
reqHeaders SerializableBlob
errMsg []
  ProcessEventError
retryOrError <- Event b -> RetryConf -> HTTPErr a -> m ProcessEventError
forall (m :: * -> *) (b :: BackendType) (a :: TriggerTypes).
MonadIO m =>
Event b -> RetryConf -> HTTPErr a -> m ProcessEventError
retryOrSetError Event b
e RetryConf
retryConf HTTPErr a
err
  SourceConfig b
-> Event b
-> Invocation 'EventType
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m) =>
SourceConfig b
-> Event b
-> Invocation 'EventType
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError @b SourceConfig b
sourceConfig Event b
e Invocation 'EventType
invocation ProcessEventError
retryOrError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion

retryOrSetError ::
  MonadIO m =>
  Event b ->
  RetryConf ->
  HTTPErr a ->
  m ProcessEventError
retryOrSetError :: Event b -> RetryConf -> HTTPErr a -> m ProcessEventError
retryOrSetError Event b
e RetryConf
retryConf HTTPErr a
err = do
  let mretryHeader :: Maybe Text
mretryHeader = HTTPErr a -> Maybe Text
forall (a :: TriggerTypes). HTTPErr a -> Maybe Text
getRetryAfterHeaderFromError HTTPErr a
err
      tries :: Int
tries = Event b -> Int
forall (b :: BackendType). Event b -> Int
eTries Event b
e
      mretryHeaderSeconds :: Maybe Int
mretryHeaderSeconds = Maybe Text
mretryHeader Maybe Text -> (Text -> Maybe Int) -> Maybe Int
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Text -> Maybe Int
parseRetryHeader
      triesExhausted :: Bool
triesExhausted = Int
tries Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= RetryConf -> Int
rcNumRetries RetryConf
retryConf
      noRetryHeader :: Bool
noRetryHeader = Maybe Int -> Bool
forall a. Maybe a -> Bool
isNothing Maybe Int
mretryHeaderSeconds
  -- current_try = tries + 1 , allowed_total_tries = rcNumRetries retryConf + 1
  if Bool
triesExhausted Bool -> Bool -> Bool
&& Bool
noRetryHeader
    then ProcessEventError -> m ProcessEventError
forall (f :: * -> *) a. Applicative f => a -> f a
pure ProcessEventError
PESetError
    else do
      UTCTime
currentTime <- IO UTCTime -> m UTCTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
      let delay :: Int
delay = Int -> Maybe Int -> Int
forall a. a -> Maybe a -> a
fromMaybe (RetryConf -> Int
rcIntervalSec RetryConf
retryConf) Maybe Int
mretryHeaderSeconds
          diff :: NominalDiffTime
diff = Int -> NominalDiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
delay
          retryTime :: UTCTime
retryTime = NominalDiffTime -> UTCTime -> UTCTime
addUTCTime NominalDiffTime
diff UTCTime
currentTime
      ProcessEventError -> m ProcessEventError
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ProcessEventError -> m ProcessEventError)
-> ProcessEventError -> m ProcessEventError
forall a b. (a -> b) -> a -> b
$ UTCTime -> ProcessEventError
PESetRetry UTCTime
retryTime
  where
    getRetryAfterHeaderFromError :: HTTPErr a -> Maybe Text
getRetryAfterHeaderFromError (HStatus HTTPResp a
resp) = HTTPResp a -> Maybe Text
forall (a :: TriggerTypes). HTTPResp a -> Maybe Text
getRetryAfterHeaderFromResp HTTPResp a
resp
    getRetryAfterHeaderFromError HTTPErr a
_ = Maybe Text
forall a. Maybe a
Nothing

    parseRetryHeader :: Text -> Maybe Int
parseRetryHeader = (Int -> Bool) -> Maybe Int -> Maybe Int
forall (m :: * -> *) a. MonadPlus m => (a -> Bool) -> m a -> m a
mfilter (Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (Maybe Int -> Maybe Int)
-> (Text -> Maybe Int) -> Text -> Maybe Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Maybe Int
forall a. Read a => String -> Maybe a
readMaybe (String -> Maybe Int) -> (Text -> String) -> Text -> Maybe Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> String
T.unpack

mkInvocation ::
  EventId ->
  J.Value ->
  Maybe Int ->
  [HeaderConf] ->
  SB.SerializableBlob ->
  [HeaderConf] ->
  Invocation 'EventType
mkInvocation :: EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation EventId
eid Value
ep Maybe Int
statusMaybe [HeaderConf]
reqHeaders SerializableBlob
respBody [HeaderConf]
respHeaders =
  let resp :: Response 'EventType
resp =
        case Maybe Int
statusMaybe of
          Maybe Int
Nothing -> SerializableBlob -> Response 'EventType
forall (a :: TriggerTypes). SerializableBlob -> Response a
mkClientErr SerializableBlob
respBody
          Just Int
status ->
            if Int
status Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
200 Bool -> Bool -> Bool
&& Int
status Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
300
              then Int -> SerializableBlob -> [HeaderConf] -> Response 'EventType
forall (a :: TriggerTypes).
Int -> SerializableBlob -> [HeaderConf] -> Response a
mkResp Int
status SerializableBlob
respBody [HeaderConf]
respHeaders
              else SerializableBlob -> Response 'EventType
forall (a :: TriggerTypes). SerializableBlob -> Response a
mkClientErr SerializableBlob
respBody
   in EventId
-> Maybe Int
-> WebhookRequest
-> Response 'EventType
-> Invocation 'EventType
forall (a :: TriggerTypes).
EventId
-> Maybe Int -> WebhookRequest -> Response a -> Invocation a
Invocation
        EventId
eid
        Maybe Int
statusMaybe
        (Value -> [HeaderConf] -> Text -> WebhookRequest
mkWebhookReq Value
ep [HeaderConf]
reqHeaders Text
invocationVersionET)
        Response 'EventType
resp

logQErr :: (MonadReader r m, Has (L.Logger L.Hasura) r, MonadIO m) => QErr -> m ()
logQErr :: QErr -> m ()
logQErr QErr
err = do
  Logger Hasura
logger :: L.Logger L.Hasura <- (r -> Logger Hasura) -> m (Logger Hasura)
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks r -> Logger Hasura
forall a t. Has a t => t -> a
getter
  Logger Hasura
-> forall a (m :: * -> *).
   (ToEngineLog a Hasura, MonadIO m) =>
   a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
   (ToEngineLog a impl, MonadIO m) =>
   a -> m ()
L.unLogger Logger Hasura
logger (EventInternalErr -> m ()) -> EventInternalErr -> m ()
forall a b. (a -> b) -> a -> b
$ QErr -> EventInternalErr
EventInternalErr QErr
err

getEventTriggerInfoFromEvent ::
  forall b. Backend b => SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
getEventTriggerInfoFromEvent :: SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
getEventTriggerInfoFromEvent SchemaCache
sc Event b
e = do
  let table :: TableName b
table = Event b -> TableName b
forall (b :: BackendType). Event b -> TableName b
eTable Event b
e
      mTableInfo :: Maybe (TableInfo b)
mTableInfo = SourceName -> TableName b -> SourceCache -> Maybe (TableInfo b)
forall (b :: BackendType).
Backend b =>
SourceName -> TableName b -> SourceCache -> Maybe (TableInfo b)
unsafeTableInfo @b (Event b -> SourceName
forall (b :: BackendType). Event b -> SourceName
eSource Event b
e) TableName b
table (SourceCache -> Maybe (TableInfo b))
-> SourceCache -> Maybe (TableInfo b)
forall a b. (a -> b) -> a -> b
$ SchemaCache -> SourceCache
scSources SchemaCache
sc
  TableInfo b
tableInfo <- Maybe (TableInfo b)
-> Either Text (TableInfo b) -> Either Text (TableInfo b)
forall (m :: * -> *) a. Applicative m => Maybe a -> m a -> m a
onNothing Maybe (TableInfo b)
mTableInfo (Either Text (TableInfo b) -> Either Text (TableInfo b))
-> Either Text (TableInfo b) -> Either Text (TableInfo b)
forall a b. (a -> b) -> a -> b
$ Text -> Either Text (TableInfo b)
forall a b. a -> Either a b
Left (Text
"table '" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TableName b
table TableName b -> Text -> Text
forall t. ToTxt t => t -> Text -> Text
<<> Text
"' not found")
  let triggerName :: TriggerName
triggerName = TriggerMetadata -> TriggerName
tmName (TriggerMetadata -> TriggerName) -> TriggerMetadata -> TriggerName
forall a b. (a -> b) -> a -> b
$ Event b -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event b
e
      mEventTriggerInfo :: Maybe (EventTriggerInfo b)
mEventTriggerInfo = TriggerName
-> HashMap TriggerName (EventTriggerInfo b)
-> Maybe (EventTriggerInfo b)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
M.lookup TriggerName
triggerName (TableInfo b -> HashMap TriggerName (EventTriggerInfo b)
forall (b :: BackendType). TableInfo b -> EventTriggerInfoMap b
_tiEventTriggerInfoMap TableInfo b
tableInfo)
  Maybe (EventTriggerInfo b)
-> Either Text (EventTriggerInfo b)
-> Either Text (EventTriggerInfo b)
forall (m :: * -> *) a. Applicative m => Maybe a -> m a -> m a
onNothing Maybe (EventTriggerInfo b)
mEventTriggerInfo (Either Text (EventTriggerInfo b)
 -> Either Text (EventTriggerInfo b))
-> Either Text (EventTriggerInfo b)
-> Either Text (EventTriggerInfo b)
forall a b. (a -> b) -> a -> b
$
    Text -> Either Text (EventTriggerInfo b)
forall a b. a -> Either a b
Left
      ( Text
"event trigger '" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TriggerName -> Text
triggerNameToTxt TriggerName
triggerName
          Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"' on table '"
          Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TableName b
table TableName b -> Text -> Text
forall t. ToTxt t => t -> Text -> Text
<<> Text
"' not found"
      )