-- |
-- = Event Triggers
--
-- Event triggers are like ordinary SQL triggers, except instead of calling a SQL
-- procedure, they call a webhook. The event delivery mechanism involves coordination
-- between both the database and graphql-engine: only the SQL database knows
-- when the events should fire, but only graphql-engine know how to actually
-- deliver them.
--
-- Therefore, event triggers are implemented in two parts:
--
-- 1. Every event trigger is backed by a bona fide SQL trigger. When the SQL trigger
--    fires, it creates a new record in the hdb_catalog.event_log table.
--
-- 2. Concurrently, a thread in graphql-engine monitors the hdb_catalog.event_log
--    table for new events. When new event(s) are found, it uses the information
--    (URL,payload and headers) stored in the event to deliver the event
--    to the webhook.
--
-- The creation and deletion of SQL trigger itself is managed by the metadata DDL
-- APIs (see Hasura.RQL.DDL.EventTrigger), so this module focuses on event delivery.
--
-- Most of the subtleties involve guaranteeing reliable delivery of events:
-- we guarantee that every event will be delivered at least once,
-- even if graphql-engine crashes. This means we have to record the state
-- of each event in the database, and we have to retry
-- failed requests at a regular (user-configurable) interval.
module Hasura.Eventing.EventTrigger
  ( initEventEngineCtx,
    createFetchedEventsStatsLogger,
    closeFetchedEventsStatsLogger,
    processEventQueue,
    defaultMaxEventThreads,
    defaultFetchInterval,
    Event (..),
    EventEngineCtx (..),
    -- Exported for testing
    saveLockedEventTriggerEvents,
    removeEventTriggerEventFromLockedEvents,
    logQErr,
  )
where

import Control.Concurrent.Async.Lifted.Safe qualified as LA
import Control.Concurrent.Extended (Forever (..), sleep)
import Control.Concurrent.STM.TVar
import Control.FoldDebounce qualified as FDebounce
import Control.Lens
import Control.Monad.Catch (MonadMask, bracket_, finally, mask_)
import Control.Monad.STM
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson qualified as J
import Data.Aeson.Key qualified as Key
import Data.Aeson.KeyMap qualified as KeyMap
import Data.Aeson.Lens qualified as JL
import Data.Has
import Data.HashMap.Strict qualified as HashMap
import Data.SerializableBlob qualified as SB
import Data.Set qualified as Set
import Data.String
import Data.Text qualified as T
import Data.Text.Extended
import Data.Text.NonEmpty
import Data.Time qualified as Time
import Data.Time.Clock
import Hasura.Backends.Postgres.SQL.Types hiding (TableName)
import Hasura.Base.Error
import Hasura.Eventing.Backend
import Hasura.Eventing.Common
import Hasura.Eventing.HTTP
import Hasura.HTTP (getHTTPExceptionStatus)
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.DDL.Webhook.Transform
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.BackendType
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.SchemaCache
import Hasura.RQL.Types.Source
import Hasura.SQL.AnyBackend qualified as AB
import Hasura.Server.Metrics (ServerMetrics (..))
import Hasura.Server.Prometheus
import Hasura.Server.Types
import Hasura.Tracing qualified as Tracing
import Network.HTTP.Client.Transformable qualified as HTTP
import Refined (NonNegative, Positive, Refined, unrefine)
import Refined.Unsafe (unsafeRefine)
import System.Metrics.Distribution qualified as EKG.Distribution
import System.Metrics.Gauge qualified as EKG.Gauge
import System.Metrics.Prometheus.Counter qualified as Prometheus.Counter
import System.Metrics.Prometheus.CounterVector (CounterVector)
import System.Metrics.Prometheus.CounterVector qualified as CounterVector
import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
import System.Metrics.Prometheus.Histogram qualified as Prometheus.Histogram
import System.Timeout.Lifted (timeout)

newtype EventInternalErr
  = EventInternalErr QErr
  deriving (EventInternalErr -> EventInternalErr -> Bool
(EventInternalErr -> EventInternalErr -> Bool)
-> (EventInternalErr -> EventInternalErr -> Bool)
-> Eq EventInternalErr
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: EventInternalErr -> EventInternalErr -> Bool
== :: EventInternalErr -> EventInternalErr -> Bool
$c/= :: EventInternalErr -> EventInternalErr -> Bool
/= :: EventInternalErr -> EventInternalErr -> Bool
Eq)

instance L.ToEngineLog EventInternalErr L.Hasura where
  toEngineLog :: EventInternalErr -> (LogLevel, EngineLogType Hasura, Value)
toEngineLog (EventInternalErr QErr
qerr) = (LogLevel
L.LevelError, EngineLogType Hasura
L.eventTriggerLogType, QErr -> Value
forall a. ToJSON a => a -> Value
J.toJSON QErr
qerr)

{- Note [Maintenance mode]
~~~~~~~~~~~~~~~~~~~~~~~~~~

Maintenance mode is a mode in which users can upgrade their graphql-engine
without any down time. More on maintenance mode can be found here:
https://github.com/hasura/graphql-engine-mono/issues/431.

Basically, there are a few main things that maintenance mode boils down to:

1. No operation that may change the metadata will be allowed.
2. Migrations are not applied when the graphql-engine is started, so the
   catalog schema will be in the older version.
3. Event triggers should continue working in the new code with the older
   catalog schema i.e it should work even if there are any schema changes
   to the `hdb_catalog.event_log` table.

#1 and #2 are fairly self-explanatory. For #3, we need to support fetching
events depending upon the catalog version. So, fetch events works in the
following way now:

1. Check if maintenance mode is enabled
2. If maintenance mode is enabled then read the catalog version from the DB
   and accordingly fire the appropriate query to the events log table.
   When maintenance mode is disabled, we query the events log table according
   to the latest catalog, we do not read the catalog version for this.
-}

-- | See Note [Maintenance Mode]
data EventEngineCtx = EventEngineCtx
  { EventEngineCtx -> TVar Int
_eeCtxEventThreadsCapacity :: TVar Int,
    EventEngineCtx -> DiffTime
_eeCtxFetchInterval :: DiffTime,
    EventEngineCtx -> Refined NonNegative Int
_eeCtxFetchSize :: Refined NonNegative Int
  }

data DeliveryInfo = DeliveryInfo
  { DeliveryInfo -> Int
diCurrentRetry :: Int,
    DeliveryInfo -> Int
diMaxRetries :: Int
  }
  deriving (Int -> DeliveryInfo -> ShowS
[DeliveryInfo] -> ShowS
DeliveryInfo -> String
(Int -> DeliveryInfo -> ShowS)
-> (DeliveryInfo -> String)
-> ([DeliveryInfo] -> ShowS)
-> Show DeliveryInfo
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> DeliveryInfo -> ShowS
showsPrec :: Int -> DeliveryInfo -> ShowS
$cshow :: DeliveryInfo -> String
show :: DeliveryInfo -> String
$cshowList :: [DeliveryInfo] -> ShowS
showList :: [DeliveryInfo] -> ShowS
Show, (forall x. DeliveryInfo -> Rep DeliveryInfo x)
-> (forall x. Rep DeliveryInfo x -> DeliveryInfo)
-> Generic DeliveryInfo
forall x. Rep DeliveryInfo x -> DeliveryInfo
forall x. DeliveryInfo -> Rep DeliveryInfo x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. DeliveryInfo -> Rep DeliveryInfo x
from :: forall x. DeliveryInfo -> Rep DeliveryInfo x
$cto :: forall x. Rep DeliveryInfo x -> DeliveryInfo
to :: forall x. Rep DeliveryInfo x -> DeliveryInfo
Generic, DeliveryInfo -> DeliveryInfo -> Bool
(DeliveryInfo -> DeliveryInfo -> Bool)
-> (DeliveryInfo -> DeliveryInfo -> Bool) -> Eq DeliveryInfo
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: DeliveryInfo -> DeliveryInfo -> Bool
== :: DeliveryInfo -> DeliveryInfo -> Bool
$c/= :: DeliveryInfo -> DeliveryInfo -> Bool
/= :: DeliveryInfo -> DeliveryInfo -> Bool
Eq)

instance J.ToJSON DeliveryInfo where
  toJSON :: DeliveryInfo -> Value
toJSON = Options -> DeliveryInfo -> Value
forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
J.genericToJSON Options
hasuraJSON {omitNothingFields :: Bool
J.omitNothingFields = Bool
True}
  toEncoding :: DeliveryInfo -> Encoding
toEncoding = Options -> DeliveryInfo -> Encoding
forall a.
(Generic a, GToJSON' Encoding Zero (Rep a)) =>
Options -> a -> Encoding
J.genericToEncoding Options
hasuraJSON {omitNothingFields :: Bool
J.omitNothingFields = Bool
True}

newtype QualifiedTableStrict = QualifiedTableStrict
  { QualifiedTableStrict -> QualifiedTable
getQualifiedTable :: QualifiedTable
  }
  deriving (Int -> QualifiedTableStrict -> ShowS
[QualifiedTableStrict] -> ShowS
QualifiedTableStrict -> String
(Int -> QualifiedTableStrict -> ShowS)
-> (QualifiedTableStrict -> String)
-> ([QualifiedTableStrict] -> ShowS)
-> Show QualifiedTableStrict
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> QualifiedTableStrict -> ShowS
showsPrec :: Int -> QualifiedTableStrict -> ShowS
$cshow :: QualifiedTableStrict -> String
show :: QualifiedTableStrict -> String
$cshowList :: [QualifiedTableStrict] -> ShowS
showList :: [QualifiedTableStrict] -> ShowS
Show, QualifiedTableStrict -> QualifiedTableStrict -> Bool
(QualifiedTableStrict -> QualifiedTableStrict -> Bool)
-> (QualifiedTableStrict -> QualifiedTableStrict -> Bool)
-> Eq QualifiedTableStrict
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
== :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
$c/= :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
/= :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
Eq)

instance J.ToJSON QualifiedTableStrict where
  toJSON :: QualifiedTableStrict -> Value
toJSON (QualifiedTableStrict (QualifiedObject SchemaName
sn TableName
tn)) =
    [Pair] -> Value
J.object
      [ Key
"schema" Key -> SchemaName -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
forall v. ToJSON v => Key -> v -> Pair
J..= SchemaName
sn,
        Key
"name" Key -> TableName -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
forall v. ToJSON v => Key -> v -> Pair
J..= TableName
tn
      ]

data EventPayload (b :: BackendType) = EventPayload
  { forall (b :: BackendType). EventPayload b -> EventId
epId :: EventId,
    forall (b :: BackendType). EventPayload b -> TableName b
epTable :: TableName b,
    forall (b :: BackendType). EventPayload b -> TriggerMetadata
epTrigger :: TriggerMetadata,
    forall (b :: BackendType). EventPayload b -> Value
epEvent :: J.Value,
    forall (b :: BackendType). EventPayload b -> DeliveryInfo
epDeliveryInfo :: DeliveryInfo,
    forall (b :: BackendType). EventPayload b -> LocalTime
epCreatedAt :: Time.LocalTime
  }
  deriving ((forall x. EventPayload b -> Rep (EventPayload b) x)
-> (forall x. Rep (EventPayload b) x -> EventPayload b)
-> Generic (EventPayload b)
forall x. Rep (EventPayload b) x -> EventPayload b
forall x. EventPayload b -> Rep (EventPayload b) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall (b :: BackendType) x.
Rep (EventPayload b) x -> EventPayload b
forall (b :: BackendType) x.
EventPayload b -> Rep (EventPayload b) x
$cfrom :: forall (b :: BackendType) x.
EventPayload b -> Rep (EventPayload b) x
from :: forall x. EventPayload b -> Rep (EventPayload b) x
$cto :: forall (b :: BackendType) x.
Rep (EventPayload b) x -> EventPayload b
to :: forall x. Rep (EventPayload b) x -> EventPayload b
Generic)

deriving instance (Backend b) => Show (EventPayload b)

deriving instance (Backend b) => Eq (EventPayload b)

instance (Backend b) => J.ToJSON (EventPayload b) where
  toJSON :: EventPayload b -> Value
toJSON = Options -> EventPayload b -> Value
forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
J.genericToJSON Options
hasuraJSON {omitNothingFields :: Bool
J.omitNothingFields = Bool
True}

defaultMaxEventThreads :: Refined Positive Int
defaultMaxEventThreads :: Refined Positive Int
defaultMaxEventThreads = Int -> Refined Positive Int
forall {k} (p :: k) x. Predicate p x => x -> Refined p x
unsafeRefine Int
100

defaultFetchInterval :: DiffTime
defaultFetchInterval :: DiffTime
defaultFetchInterval = Seconds -> DiffTime
seconds Seconds
1

initEventEngineCtx :: (MonadIO m) => Refined Positive Int -> Refined NonNegative Milliseconds -> Refined NonNegative Int -> m EventEngineCtx
initEventEngineCtx :: forall (m :: * -> *).
MonadIO m =>
Refined Positive Int
-> Refined NonNegative Milliseconds
-> Refined NonNegative Int
-> m EventEngineCtx
initEventEngineCtx Refined Positive Int
maxThreads Refined NonNegative Milliseconds
fetchInterval Refined NonNegative Int
_eeCtxFetchSize = do
  TVar Int
_eeCtxEventThreadsCapacity <- IO (TVar Int) -> m (TVar Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TVar Int) -> m (TVar Int)) -> IO (TVar Int) -> m (TVar Int)
forall a b. (a -> b) -> a -> b
$ Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO (Int -> IO (TVar Int)) -> Int -> IO (TVar Int)
forall a b. (a -> b) -> a -> b
$ Refined Positive Int -> Int
forall {k} (p :: k) x. Refined p x -> x
unrefine Refined Positive Int
maxThreads
  let _eeCtxFetchInterval :: DiffTime
_eeCtxFetchInterval = Milliseconds -> DiffTime
milliseconds (Milliseconds -> DiffTime) -> Milliseconds -> DiffTime
forall a b. (a -> b) -> a -> b
$ Refined NonNegative Milliseconds -> Milliseconds
forall {k} (p :: k) x. Refined p x -> x
unrefine Refined NonNegative Milliseconds
fetchInterval
  EventEngineCtx -> m EventEngineCtx
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return EventEngineCtx {TVar Int
DiffTime
Refined NonNegative Int
_eeCtxEventThreadsCapacity :: TVar Int
_eeCtxFetchInterval :: DiffTime
_eeCtxFetchSize :: Refined NonNegative Int
_eeCtxFetchSize :: Refined NonNegative Int
_eeCtxEventThreadsCapacity :: TVar Int
_eeCtxFetchInterval :: DiffTime
..}

saveLockedEventTriggerEvents :: (MonadIO m) => SourceName -> [EventId] -> TVar (HashMap SourceName (Set.Set EventId)) -> m ()
saveLockedEventTriggerEvents :: forall (m :: * -> *).
MonadIO m =>
SourceName
-> [EventId] -> TVar (HashMap SourceName (Set EventId)) -> m ()
saveLockedEventTriggerEvents SourceName
sourceName [EventId]
eventIds TVar (HashMap SourceName (Set EventId))
lockedEvents =
  IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically
    (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      HashMap SourceName (Set EventId)
lockedEventsVals <- TVar (HashMap SourceName (Set EventId))
-> STM (HashMap SourceName (Set EventId))
forall a. TVar a -> STM a
readTVar TVar (HashMap SourceName (Set EventId))
lockedEvents
      case SourceName
-> HashMap SourceName (Set EventId) -> Maybe (Set EventId)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup SourceName
sourceName HashMap SourceName (Set EventId)
lockedEventsVals of
        Maybe (Set EventId)
Nothing -> TVar (HashMap SourceName (Set EventId))
-> HashMap SourceName (Set EventId) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (HashMap SourceName (Set EventId))
lockedEvents (HashMap SourceName (Set EventId) -> STM ())
-> HashMap SourceName (Set EventId) -> STM ()
forall a b. (a -> b) -> a -> b
$! SourceName -> Set EventId -> HashMap SourceName (Set EventId)
forall k v. Hashable k => k -> v -> HashMap k v
HashMap.singleton SourceName
sourceName ([EventId] -> Set EventId
forall a. Ord a => [a] -> Set a
Set.fromList [EventId]
eventIds)
        Just Set EventId
_ -> TVar (HashMap SourceName (Set EventId))
-> HashMap SourceName (Set EventId) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (HashMap SourceName (Set EventId))
lockedEvents (HashMap SourceName (Set EventId) -> STM ())
-> HashMap SourceName (Set EventId) -> STM ()
forall a b. (a -> b) -> a -> b
$! (Set EventId -> Set EventId -> Set EventId)
-> SourceName
-> Set EventId
-> HashMap SourceName (Set EventId)
-> HashMap SourceName (Set EventId)
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> k -> v -> HashMap k v -> HashMap k v
HashMap.insertWith Set EventId -> Set EventId -> Set EventId
forall a. Ord a => Set a -> Set a -> Set a
Set.union SourceName
sourceName ([EventId] -> Set EventId
forall a. Ord a => [a] -> Set a
Set.fromList [EventId]
eventIds) HashMap SourceName (Set EventId)
lockedEventsVals

removeEventTriggerEventFromLockedEvents ::
  (MonadIO m) => SourceName -> EventId -> TVar (HashMap SourceName (Set.Set EventId)) -> m ()
removeEventTriggerEventFromLockedEvents :: forall (m :: * -> *).
MonadIO m =>
SourceName
-> EventId -> TVar (HashMap SourceName (Set EventId)) -> m ()
removeEventTriggerEventFromLockedEvents SourceName
sourceName EventId
eventId TVar (HashMap SourceName (Set EventId))
lockedEvents =
  IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically
    (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      HashMap SourceName (Set EventId)
lockedEventsVals <- TVar (HashMap SourceName (Set EventId))
-> STM (HashMap SourceName (Set EventId))
forall a. TVar a -> STM a
readTVar TVar (HashMap SourceName (Set EventId))
lockedEvents
      TVar (HashMap SourceName (Set EventId))
-> HashMap SourceName (Set EventId) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (HashMap SourceName (Set EventId))
lockedEvents (HashMap SourceName (Set EventId) -> STM ())
-> HashMap SourceName (Set EventId) -> STM ()
forall a b. (a -> b) -> a -> b
$! (Set EventId -> Set EventId)
-> SourceName
-> HashMap SourceName (Set EventId)
-> HashMap SourceName (Set EventId)
forall k v.
(Eq k, Hashable k) =>
(v -> v) -> k -> HashMap k v -> HashMap k v
HashMap.adjust (EventId -> Set EventId -> Set EventId
forall a. Ord a => a -> Set a -> Set a
Set.delete EventId
eventId) SourceName
sourceName HashMap SourceName (Set EventId)
lockedEventsVals

type BackendEventWithSource = AB.AnyBackend EventWithSource

type FetchEventArguments = ([BackendEventWithSource], Int, Bool)

newtype EventsCount = EventsCount {EventsCount -> Int
unEventsCount :: Int}
  deriving (EventsCount -> EventsCount -> Bool
(EventsCount -> EventsCount -> Bool)
-> (EventsCount -> EventsCount -> Bool) -> Eq EventsCount
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: EventsCount -> EventsCount -> Bool
== :: EventsCount -> EventsCount -> Bool
$c/= :: EventsCount -> EventsCount -> Bool
/= :: EventsCount -> EventsCount -> Bool
Eq, Int -> EventsCount -> ShowS
[EventsCount] -> ShowS
EventsCount -> String
(Int -> EventsCount -> ShowS)
-> (EventsCount -> String)
-> ([EventsCount] -> ShowS)
-> Show EventsCount
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> EventsCount -> ShowS
showsPrec :: Int -> EventsCount -> ShowS
$cshow :: EventsCount -> String
show :: EventsCount -> String
$cshowList :: [EventsCount] -> ShowS
showList :: [EventsCount] -> ShowS
Show, [EventsCount] -> Value
[EventsCount] -> Encoding
EventsCount -> Value
EventsCount -> Encoding
(EventsCount -> Value)
-> (EventsCount -> Encoding)
-> ([EventsCount] -> Value)
-> ([EventsCount] -> Encoding)
-> ToJSON EventsCount
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> ToJSON a
$ctoJSON :: EventsCount -> Value
toJSON :: EventsCount -> Value
$ctoEncoding :: EventsCount -> Encoding
toEncoding :: EventsCount -> Encoding
$ctoJSONList :: [EventsCount] -> Value
toJSONList :: [EventsCount] -> Value
$ctoEncodingList :: [EventsCount] -> Encoding
toEncodingList :: [EventsCount] -> Encoding
J.ToJSON, Value -> Parser [EventsCount]
Value -> Parser EventsCount
(Value -> Parser EventsCount)
-> (Value -> Parser [EventsCount]) -> FromJSON EventsCount
forall a.
(Value -> Parser a) -> (Value -> Parser [a]) -> FromJSON a
$cparseJSON :: Value -> Parser EventsCount
parseJSON :: Value -> Parser EventsCount
$cparseJSONList :: Value -> Parser [EventsCount]
parseJSONList :: Value -> Parser [EventsCount]
J.FromJSON, Integer -> EventsCount
EventsCount -> EventsCount
EventsCount -> EventsCount -> EventsCount
(EventsCount -> EventsCount -> EventsCount)
-> (EventsCount -> EventsCount -> EventsCount)
-> (EventsCount -> EventsCount -> EventsCount)
-> (EventsCount -> EventsCount)
-> (EventsCount -> EventsCount)
-> (EventsCount -> EventsCount)
-> (Integer -> EventsCount)
-> Num EventsCount
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
$c+ :: EventsCount -> EventsCount -> EventsCount
+ :: EventsCount -> EventsCount -> EventsCount
$c- :: EventsCount -> EventsCount -> EventsCount
- :: EventsCount -> EventsCount -> EventsCount
$c* :: EventsCount -> EventsCount -> EventsCount
* :: EventsCount -> EventsCount -> EventsCount
$cnegate :: EventsCount -> EventsCount
negate :: EventsCount -> EventsCount
$cabs :: EventsCount -> EventsCount
abs :: EventsCount -> EventsCount
$csignum :: EventsCount -> EventsCount
signum :: EventsCount -> EventsCount
$cfromInteger :: Integer -> EventsCount
fromInteger :: Integer -> EventsCount
Num)

newtype NumEventsFetchedPerSource = NumEventsFetchedPerSource {NumEventsFetchedPerSource -> HashMap SourceName EventsCount
unNumEventsFetchedPerSource :: HashMap SourceName EventsCount}
  deriving (NumEventsFetchedPerSource -> NumEventsFetchedPerSource -> Bool
(NumEventsFetchedPerSource -> NumEventsFetchedPerSource -> Bool)
-> (NumEventsFetchedPerSource -> NumEventsFetchedPerSource -> Bool)
-> Eq NumEventsFetchedPerSource
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: NumEventsFetchedPerSource -> NumEventsFetchedPerSource -> Bool
== :: NumEventsFetchedPerSource -> NumEventsFetchedPerSource -> Bool
$c/= :: NumEventsFetchedPerSource -> NumEventsFetchedPerSource -> Bool
/= :: NumEventsFetchedPerSource -> NumEventsFetchedPerSource -> Bool
Eq, Int -> NumEventsFetchedPerSource -> ShowS
[NumEventsFetchedPerSource] -> ShowS
NumEventsFetchedPerSource -> String
(Int -> NumEventsFetchedPerSource -> ShowS)
-> (NumEventsFetchedPerSource -> String)
-> ([NumEventsFetchedPerSource] -> ShowS)
-> Show NumEventsFetchedPerSource
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> NumEventsFetchedPerSource -> ShowS
showsPrec :: Int -> NumEventsFetchedPerSource -> ShowS
$cshow :: NumEventsFetchedPerSource -> String
show :: NumEventsFetchedPerSource -> String
$cshowList :: [NumEventsFetchedPerSource] -> ShowS
showList :: [NumEventsFetchedPerSource] -> ShowS
Show)

instance J.ToJSON NumEventsFetchedPerSource where
  toJSON :: NumEventsFetchedPerSource -> Value
toJSON (NumEventsFetchedPerSource HashMap SourceName EventsCount
m) =
    Object -> Value
J.Object (Object -> Value) -> Object -> Value
forall a b. (a -> b) -> a -> b
$ [Pair] -> Object
forall v. [(Key, v)] -> KeyMap v
KeyMap.fromList ([Pair] -> Object) -> [Pair] -> Object
forall a b. (a -> b) -> a -> b
$ ((SourceName, EventsCount) -> Pair)
-> [(SourceName, EventsCount)] -> [Pair]
forall a b. (a -> b) -> [a] -> [b]
map ((Text -> Key
Key.fromText (Text -> Key) -> (SourceName -> Text) -> SourceName -> Key
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SourceName -> Text
sourceNameToText) (SourceName -> Key)
-> (EventsCount -> Value) -> (SourceName, EventsCount) -> Pair
forall b c b' c'. (b -> c) -> (b' -> c') -> (b, b') -> (c, c')
forall (a :: * -> * -> *) b c b' c'.
Arrow a =>
a b c -> a b' c' -> a (b, b') (c, c')
*** EventsCount -> Value
forall a. ToJSON a => a -> Value
J.toJSON) ([(SourceName, EventsCount)] -> [Pair])
-> [(SourceName, EventsCount)] -> [Pair]
forall a b. (a -> b) -> a -> b
$ HashMap SourceName EventsCount -> [(SourceName, EventsCount)]
forall k v. HashMap k v -> [(k, v)]
HashMap.toList HashMap SourceName EventsCount
m

instance Semigroup NumEventsFetchedPerSource where
  (NumEventsFetchedPerSource HashMap SourceName EventsCount
lMap) <> :: NumEventsFetchedPerSource
-> NumEventsFetchedPerSource -> NumEventsFetchedPerSource
<> (NumEventsFetchedPerSource HashMap SourceName EventsCount
rMap) =
    HashMap SourceName EventsCount -> NumEventsFetchedPerSource
NumEventsFetchedPerSource (HashMap SourceName EventsCount -> NumEventsFetchedPerSource)
-> HashMap SourceName EventsCount -> NumEventsFetchedPerSource
forall a b. (a -> b) -> a -> b
$ (EventsCount -> EventsCount -> EventsCount)
-> HashMap SourceName EventsCount
-> HashMap SourceName EventsCount
-> HashMap SourceName EventsCount
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> HashMap k v -> HashMap k v -> HashMap k v
HashMap.unionWith EventsCount -> EventsCount -> EventsCount
forall a. Num a => a -> a -> a
(+) HashMap SourceName EventsCount
lMap HashMap SourceName EventsCount
rMap

instance Monoid NumEventsFetchedPerSource where
  mempty :: NumEventsFetchedPerSource
mempty = HashMap SourceName EventsCount -> NumEventsFetchedPerSource
NumEventsFetchedPerSource HashMap SourceName EventsCount
forall a. Monoid a => a
mempty

data FetchedEventsStats = FetchedEventsStats
  { FetchedEventsStats -> NumEventsFetchedPerSource
_fesNumEventsFetched :: NumEventsFetchedPerSource,
    FetchedEventsStats -> Int
_fesNumFetches :: Int
  }
  deriving (FetchedEventsStats -> FetchedEventsStats -> Bool
(FetchedEventsStats -> FetchedEventsStats -> Bool)
-> (FetchedEventsStats -> FetchedEventsStats -> Bool)
-> Eq FetchedEventsStats
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: FetchedEventsStats -> FetchedEventsStats -> Bool
== :: FetchedEventsStats -> FetchedEventsStats -> Bool
$c/= :: FetchedEventsStats -> FetchedEventsStats -> Bool
/= :: FetchedEventsStats -> FetchedEventsStats -> Bool
Eq, (forall x. FetchedEventsStats -> Rep FetchedEventsStats x)
-> (forall x. Rep FetchedEventsStats x -> FetchedEventsStats)
-> Generic FetchedEventsStats
forall x. Rep FetchedEventsStats x -> FetchedEventsStats
forall x. FetchedEventsStats -> Rep FetchedEventsStats x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. FetchedEventsStats -> Rep FetchedEventsStats x
from :: forall x. FetchedEventsStats -> Rep FetchedEventsStats x
$cto :: forall x. Rep FetchedEventsStats x -> FetchedEventsStats
to :: forall x. Rep FetchedEventsStats x -> FetchedEventsStats
Generic, Int -> FetchedEventsStats -> ShowS
[FetchedEventsStats] -> ShowS
FetchedEventsStats -> String
(Int -> FetchedEventsStats -> ShowS)
-> (FetchedEventsStats -> String)
-> ([FetchedEventsStats] -> ShowS)
-> Show FetchedEventsStats
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> FetchedEventsStats -> ShowS
showsPrec :: Int -> FetchedEventsStats -> ShowS
$cshow :: FetchedEventsStats -> String
show :: FetchedEventsStats -> String
$cshowList :: [FetchedEventsStats] -> ShowS
showList :: [FetchedEventsStats] -> ShowS
Show)

instance J.ToJSON FetchedEventsStats where
  toJSON :: FetchedEventsStats -> Value
toJSON = Options -> FetchedEventsStats -> Value
forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
J.genericToJSON Options
hasuraJSON
  toEncoding :: FetchedEventsStats -> Encoding
toEncoding = Options -> FetchedEventsStats -> Encoding
forall a.
(Generic a, GToJSON' Encoding Zero (Rep a)) =>
Options -> a -> Encoding
J.genericToEncoding Options
hasuraJSON

instance L.ToEngineLog FetchedEventsStats L.Hasura where
  toEngineLog :: FetchedEventsStats -> (LogLevel, EngineLogType Hasura, Value)
toEngineLog FetchedEventsStats
stats =
    (LogLevel
L.LevelInfo, EngineLogType Hasura
L.eventTriggerProcessLogType, FetchedEventsStats -> Value
forall a. ToJSON a => a -> Value
J.toJSON FetchedEventsStats
stats)

instance Semigroup FetchedEventsStats where
  (FetchedEventsStats NumEventsFetchedPerSource
lMap Int
lFetches) <> :: FetchedEventsStats -> FetchedEventsStats -> FetchedEventsStats
<> (FetchedEventsStats NumEventsFetchedPerSource
rMap Int
rFetches) =
    NumEventsFetchedPerSource -> Int -> FetchedEventsStats
FetchedEventsStats (NumEventsFetchedPerSource
lMap NumEventsFetchedPerSource
-> NumEventsFetchedPerSource -> NumEventsFetchedPerSource
forall a. Semigroup a => a -> a -> a
<> NumEventsFetchedPerSource
rMap) (Int
lFetches Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
rFetches)

instance Monoid FetchedEventsStats where
  mempty :: FetchedEventsStats
mempty = NumEventsFetchedPerSource -> Int -> FetchedEventsStats
FetchedEventsStats NumEventsFetchedPerSource
forall a. Monoid a => a
mempty Int
0

type FetchedEventsStatsLogger = FDebounce.Trigger FetchedEventsStats FetchedEventsStats

-- | Logger to accumulate stats of fetched events over a period of time and log once using @'L.Logger L.Hasura'.
-- See @'createStatsLogger' for more details.
createFetchedEventsStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> m FetchedEventsStatsLogger
createFetchedEventsStatsLogger :: forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> m FetchedEventsStatsLogger
createFetchedEventsStatsLogger = Logger Hasura -> m FetchedEventsStatsLogger
forall (m :: * -> *) stats impl.
(MonadIO m, ToEngineLog stats impl, Monoid stats) =>
Logger impl -> m (Trigger stats stats)
L.createStatsLogger

-- | Close the fetched events stats logger.
closeFetchedEventsStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> FetchedEventsStatsLogger -> m ()
closeFetchedEventsStatsLogger :: forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> FetchedEventsStatsLogger -> m ()
closeFetchedEventsStatsLogger = EngineLogType Hasura
-> Logger Hasura -> FetchedEventsStatsLogger -> m ()
forall (m :: * -> *) impl stats.
(MonadIO m, EnabledLogTypes impl) =>
EngineLogType impl -> Logger impl -> Trigger stats stats -> m ()
L.closeStatsLogger EngineLogType Hasura
L.eventTriggerProcessLogType

-- | Log statistics of fetched events. See @'logStats' for more details.
logFetchedEventsStatistics ::
  (MonadIO m) =>
  FetchedEventsStatsLogger ->
  [BackendEventWithSource] ->
  m ()
logFetchedEventsStatistics :: forall (m :: * -> *).
MonadIO m =>
FetchedEventsStatsLogger -> [BackendEventWithSource] -> m ()
logFetchedEventsStatistics FetchedEventsStatsLogger
logger [BackendEventWithSource]
backendEvents =
  FetchedEventsStatsLogger -> FetchedEventsStats -> m ()
forall (m :: * -> *) stats.
MonadIO m =>
Trigger stats stats -> stats -> m ()
L.logStats FetchedEventsStatsLogger
logger (NumEventsFetchedPerSource -> Int -> FetchedEventsStats
FetchedEventsStats NumEventsFetchedPerSource
numEventsFetchedPerSource Int
1)
  where
    numEventsFetchedPerSource :: NumEventsFetchedPerSource
numEventsFetchedPerSource =
      let sourceNames :: [SourceName]
sourceNames = ((BackendEventWithSource -> SourceName)
 -> [BackendEventWithSource] -> [SourceName])
-> [BackendEventWithSource]
-> (BackendEventWithSource -> SourceName)
-> [SourceName]
forall a b c. (a -> b -> c) -> b -> a -> c
flip (BackendEventWithSource -> SourceName)
-> [BackendEventWithSource] -> [SourceName]
forall a b. (a -> b) -> [a] -> [b]
map [BackendEventWithSource]
backendEvents
            ((BackendEventWithSource -> SourceName) -> [SourceName])
-> (BackendEventWithSource -> SourceName) -> [SourceName]
forall a b. (a -> b) -> a -> b
$ \BackendEventWithSource
backendEvent -> forall (c :: BackendType -> Constraint) (i :: BackendType -> *) r.
AllBackendsSatisfy c =>
AnyBackend i -> (forall (b :: BackendType). c b => i b -> r) -> r
AB.dispatchAnyBackend @Backend BackendEventWithSource
backendEvent EventWithSource b -> SourceName
forall (b :: BackendType).
Backend b =>
EventWithSource b -> SourceName
forall (b :: BackendType). EventWithSource b -> SourceName
_ewsSourceName
       in HashMap SourceName EventsCount -> NumEventsFetchedPerSource
NumEventsFetchedPerSource (HashMap SourceName EventsCount -> NumEventsFetchedPerSource)
-> HashMap SourceName EventsCount -> NumEventsFetchedPerSource
forall a b. (a -> b) -> a -> b
$ (EventsCount -> EventsCount -> EventsCount)
-> [(SourceName, EventsCount)] -> HashMap SourceName EventsCount
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> [(k, v)] -> HashMap k v
HashMap.fromListWith EventsCount -> EventsCount -> EventsCount
forall a. Num a => a -> a -> a
(+) [(SourceName
sourceName, EventsCount
1) | SourceName
sourceName <- [SourceName]
sourceNames]

{-# ANN processEventQueue ("HLint: ignore Use withAsync" :: String) #-}

-- | `upperBoundEventTriggerTimeout` is the maximum amount of time
--    an event trigger can take to process. This function is intended
--    to use with a timeout.
upperBoundEventTriggerTimeout :: DiffTime
upperBoundEventTriggerTimeout :: DiffTime
upperBoundEventTriggerTimeout = Minutes -> DiffTime
minutes Minutes
30

-- | Service events from our in-DB queue.
--
-- There are a few competing concerns and constraints here; we want to...
--   - fetch events in batches for lower DB pressure
--   - don't fetch more than N at a time (since that can mean: space leak, less
--     effective scale out, possible double sends for events we've checked out
--     on exit (TODO clean shutdown procedure))
--   - try not to cause webhook workers to stall waiting on DB fetch
--   - limit webhook HTTP concurrency per HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE
processEventQueue ::
  forall m.
  ( MonadIO m,
    MonadBaseControl IO m,
    LA.Forall (LA.Pure m),
    MonadMask m,
    Tracing.MonadTrace m,
    MonadGetPolicies m
  ) =>
  L.Logger L.Hasura ->
  FetchedEventsStatsLogger ->
  HTTP.Manager ->
  IO SchemaCache ->
  IO EventEngineCtx ->
  TVar Int ->
  LockedEventsCtx ->
  ServerMetrics ->
  EventTriggerMetrics ->
  MaintenanceMode () ->
  m (Forever m)
processEventQueue :: forall (m :: * -> *).
(MonadIO m, MonadBaseControl IO m, Forall (Pure m), MonadMask m,
 MonadTrace m, MonadGetPolicies m) =>
Logger Hasura
-> FetchedEventsStatsLogger
-> Manager
-> IO SchemaCache
-> IO EventEngineCtx
-> TVar Int
-> LockedEventsCtx
-> ServerMetrics
-> EventTriggerMetrics
-> MaintenanceMode ()
-> m (Forever m)
processEventQueue Logger Hasura
logger FetchedEventsStatsLogger
statsLogger Manager
httpMgr IO SchemaCache
getSchemaCache IO EventEngineCtx
getEventEngineCtx TVar Int
activeEventProcessingThreads LockedEventsCtx {TVar (HashMap SourceName (Set EventId))
leEvents :: TVar (HashMap SourceName (Set EventId))
leEvents :: LockedEventsCtx -> TVar (HashMap SourceName (Set EventId))
leEvents} ServerMetrics
serverMetrics EventTriggerMetrics
eventTriggerMetrics MaintenanceMode ()
maintenanceMode = do
  [BackendEventWithSource]
events0 <- m [BackendEventWithSource]
popEventsBatch
  Forever m -> m (Forever m)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Forever m -> m (Forever m)) -> Forever m -> m (Forever m)
forall a b. (a -> b) -> a -> b
$ ([BackendEventWithSource], Int, Bool)
-> (([BackendEventWithSource], Int, Bool)
    -> m ([BackendEventWithSource], Int, Bool))
-> Forever m
forall (m :: * -> *) a. a -> (a -> m a) -> Forever m
Forever ([BackendEventWithSource]
events0, Int
0, Bool
False) ([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
go
  where
    popEventsBatch :: m [BackendEventWithSource]
    popEventsBatch :: m [BackendEventWithSource]
popEventsBatch = do
      {-
        SELECT FOR UPDATE .. SKIP LOCKED can throw serialization errors in RepeatableRead: https://stackoverflow.com/a/53289263/1911889
        We can avoid this safely by running it in ReadCommitted as Postgres will recheck the
        predicate condition if a row is updated concurrently: https://www.postgresql.org/docs/9.5/transaction-iso.html#XACT-READ-COMMITTED

        Every other action on an event_log row (like post-processing, archival, etc) are single writes (no R-W or W-R)
        so it is safe to perform them in ReadCommitted as well (the writes will then acquire some serial order).
        Any serial order of updates to a row will lead to an eventually consistent state as the row will have
        (delivered=t or error=t or archived=t) after a fixed number of tries (assuming it begins with locked='f').
      -}
      SourceCache
allSources <- SchemaCache -> SourceCache
scSources (SchemaCache -> SourceCache) -> m SchemaCache -> m SourceCache
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO SchemaCache -> m SchemaCache
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO SchemaCache
getSchemaCache
      Int
fetchBatchSize <- Refined NonNegative Int -> Int
forall {k} (p :: k) x. Refined p x -> x
unrefine (Refined NonNegative Int -> Int)
-> (EventEngineCtx -> Refined NonNegative Int)
-> EventEngineCtx
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EventEngineCtx -> Refined NonNegative Int
_eeCtxFetchSize (EventEngineCtx -> Int) -> m EventEngineCtx -> m Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO EventEngineCtx -> m EventEngineCtx
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO EventEngineCtx
getEventEngineCtx
      [BackendEventWithSource]
events <- IO [BackendEventWithSource] -> m [BackendEventWithSource]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
        (IO [BackendEventWithSource] -> m [BackendEventWithSource])
-> (IO [[BackendEventWithSource]] -> IO [BackendEventWithSource])
-> IO [[BackendEventWithSource]]
-> m [BackendEventWithSource]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ([[BackendEventWithSource]] -> [BackendEventWithSource])
-> IO [[BackendEventWithSource]] -> IO [BackendEventWithSource]
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap [[BackendEventWithSource]] -> [BackendEventWithSource]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat
        (IO [[BackendEventWithSource]] -> m [BackendEventWithSource])
-> IO [[BackendEventWithSource]] -> m [BackendEventWithSource]
forall a b. (a -> b) -> a -> b
$
        -- fetch pending events across all the sources asynchronously
        [(SourceName, BackendSourceInfo)]
-> ((SourceName, BackendSourceInfo) -> IO [BackendEventWithSource])
-> IO [[BackendEventWithSource]]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, MonadBaseControl IO m, Forall (Pure m)) =>
t a -> (a -> m b) -> m (t b)
LA.forConcurrently (SourceCache -> [(SourceName, BackendSourceInfo)]
forall k v. HashMap k v -> [(k, v)]
HashMap.toList SourceCache
allSources) \(SourceName
sourceName, BackendSourceInfo
sourceCache) ->
          forall (c :: BackendType -> Constraint) (i :: BackendType -> *) r.
AllBackendsSatisfy c =>
AnyBackend i -> (forall (b :: BackendType). c b => i b -> r) -> r
AB.dispatchAnyBackend @BackendEventTrigger BackendSourceInfo
sourceCache \(SourceInfo {Maybe QueryTagsConfig
TableCache b
FunctionCache b
StoredProcedureCache b
LogicalModelCache b
NativeQueryCache b
BackendSourceKind b
SourceName
SourceConfig b
ResolvedSourceCustomization
DBObjectsIntrospection b
_siName :: SourceName
_siSourceKind :: BackendSourceKind b
_siTables :: TableCache b
_siFunctions :: FunctionCache b
_siNativeQueries :: NativeQueryCache b
_siStoredProcedures :: StoredProcedureCache b
_siLogicalModels :: LogicalModelCache b
_siConfiguration :: SourceConfig b
_siQueryTagsConfig :: Maybe QueryTagsConfig
_siCustomization :: ResolvedSourceCustomization
_siDbObjectsIntrospection :: DBObjectsIntrospection b
_siName :: forall (b :: BackendType). SourceInfo b -> SourceName
_siSourceKind :: forall (b :: BackendType). SourceInfo b -> BackendSourceKind b
_siTables :: forall (b :: BackendType). SourceInfo b -> TableCache b
_siFunctions :: forall (b :: BackendType). SourceInfo b -> FunctionCache b
_siNativeQueries :: forall (b :: BackendType). SourceInfo b -> NativeQueryCache b
_siStoredProcedures :: forall (b :: BackendType). SourceInfo b -> StoredProcedureCache b
_siLogicalModels :: forall (b :: BackendType). SourceInfo b -> LogicalModelCache b
_siConfiguration :: forall (b :: BackendType). SourceInfo b -> SourceConfig b
_siQueryTagsConfig :: forall (b :: BackendType). SourceInfo b -> Maybe QueryTagsConfig
_siCustomization :: forall (b :: BackendType).
SourceInfo b -> ResolvedSourceCustomization
_siDbObjectsIntrospection :: forall (b :: BackendType). SourceInfo b -> DBObjectsIntrospection b
..} :: SourceInfo b) -> do
            let tables :: [TableInfo b]
tables = TableCache b -> [TableInfo b]
forall k v. HashMap k v -> [v]
HashMap.elems TableCache b
_siTables
                triggerMap :: [EventTriggerInfoMap b]
triggerMap = TableInfo b -> EventTriggerInfoMap b
forall (b :: BackendType). TableInfo b -> EventTriggerInfoMap b
_tiEventTriggerInfoMap (TableInfo b -> EventTriggerInfoMap b)
-> [TableInfo b] -> [EventTriggerInfoMap b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [TableInfo b]
tables
                eventTriggerCount :: Int
eventTriggerCount = [Int] -> Int
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (EventTriggerInfoMap b -> Int
forall k v. HashMap k v -> Int
HashMap.size (EventTriggerInfoMap b -> Int) -> [EventTriggerInfoMap b] -> [Int]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [EventTriggerInfoMap b]
triggerMap)
                triggerNames :: [TriggerName]
triggerNames = (EventTriggerInfoMap b -> [TriggerName])
-> [EventTriggerInfoMap b] -> [TriggerName]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap EventTriggerInfoMap b -> [TriggerName]
forall k v. HashMap k v -> [k]
HashMap.keys [EventTriggerInfoMap b]
triggerMap

            -- only process events for this source if at least one event trigger exists
            if Int
eventTriggerCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
              then do
                UTCTime
eventPollStartTime <- IO UTCTime
getCurrentTime
                ExceptT QErr IO [Event b] -> IO (Either QErr [Event b])
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m, MonadError QErr m) =>
SourceConfig b
-> SourceName
-> [TriggerName]
-> MaintenanceMode ()
-> FetchBatchSize
-> m [Event b]
fetchUndeliveredEvents @b SourceConfig b
_siConfiguration SourceName
sourceName [TriggerName]
triggerNames MaintenanceMode ()
maintenanceMode (Int -> FetchBatchSize
FetchBatchSize Int
fetchBatchSize)) IO (Either QErr [Event b])
-> (Either QErr [Event b] -> IO [BackendEventWithSource])
-> IO [BackendEventWithSource]
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                  Right [Event b]
events -> do
                    let eventFetchCount :: Int64
eventFetchCount = Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Int -> Int64
forall a b. (a -> b) -> a -> b
$ [Event b] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Event b]
events
                    Gauge -> Int64 -> IO ()
Prometheus.Gauge.set (EventTriggerMetrics -> Gauge
eventsFetchedPerBatch EventTriggerMetrics
eventTriggerMetrics) Int64
eventFetchCount
                    if ([Event b] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Event b]
events)
                      then [BackendEventWithSource] -> IO [BackendEventWithSource]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return []
                      else do
                        UTCTime
eventsFetchedTime <- IO UTCTime
getCurrentTime -- This is also the poll end time
                        let eventPollTime :: Double
eventPollTime = NominalDiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac (NominalDiffTime -> Double) -> NominalDiffTime -> Double
forall a b. (a -> b) -> a -> b
$ UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
eventsFetchedTime UTCTime
eventPollStartTime
                        ()
_ <- Distribution -> Double -> IO ()
EKG.Distribution.add (ServerMetrics -> Distribution
smEventFetchTimePerBatch ServerMetrics
serverMetrics) Double
eventPollTime
                        Histogram -> Double -> IO ()
Prometheus.Histogram.observe (EventTriggerMetrics -> Histogram
eventsFetchTimePerBatch EventTriggerMetrics
eventTriggerMetrics) Double
eventPollTime
                        ()
_ <- Distribution -> Double -> IO ()
EKG.Distribution.add (ServerMetrics -> Distribution
smNumEventsFetchedPerBatch ServerMetrics
serverMetrics) (Int -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Double) -> Int -> Double
forall a b. (a -> b) -> a -> b
$ [Event b] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Event b]
events)
                        SourceName
-> [EventId] -> TVar (HashMap SourceName (Set EventId)) -> IO ()
forall (m :: * -> *).
MonadIO m =>
SourceName
-> [EventId] -> TVar (HashMap SourceName (Set EventId)) -> m ()
saveLockedEventTriggerEvents SourceName
sourceName (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId (Event b -> EventId) -> [Event b] -> [EventId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Event b]
events) TVar (HashMap SourceName (Set EventId))
leEvents
                        [BackendEventWithSource] -> IO [BackendEventWithSource]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([BackendEventWithSource] -> IO [BackendEventWithSource])
-> [BackendEventWithSource] -> IO [BackendEventWithSource]
forall a b. (a -> b) -> a -> b
$ (Event b -> BackendEventWithSource)
-> [Event b] -> [BackendEventWithSource]
forall a b. (a -> b) -> [a] -> [b]
map (\Event b
event -> forall (b :: BackendType) (i :: BackendType -> *).
HasTag b =>
i b -> AnyBackend i
AB.mkAnyBackend @b (EventWithSource b -> BackendEventWithSource)
-> EventWithSource b -> BackendEventWithSource
forall a b. (a -> b) -> a -> b
$ Event b
-> SourceConfig b -> SourceName -> UTCTime -> EventWithSource b
forall (b :: BackendType).
Event b
-> SourceConfig b -> SourceName -> UTCTime -> EventWithSource b
EventWithSource Event b
event SourceConfig b
_siConfiguration SourceName
sourceName UTCTime
eventsFetchedTime) [Event b]
events
                  Left QErr
err -> do
                    IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Logger Hasura
-> forall a (m :: * -> *).
   (ToEngineLog a Hasura, MonadIO m) =>
   a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
   (ToEngineLog a impl, MonadIO m) =>
   a -> m ()
L.unLogger Logger Hasura
logger (EventInternalErr -> IO ()) -> EventInternalErr -> IO ()
forall a b. (a -> b) -> a -> b
$ QErr -> EventInternalErr
EventInternalErr QErr
err
                    [BackendEventWithSource] -> IO [BackendEventWithSource]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
              else [BackendEventWithSource] -> IO [BackendEventWithSource]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []

      -- Log the statistics of events fetched
      FetchedEventsStatsLogger -> [BackendEventWithSource] -> m ()
forall (m :: * -> *).
MonadIO m =>
FetchedEventsStatsLogger -> [BackendEventWithSource] -> m ()
logFetchedEventsStatistics FetchedEventsStatsLogger
statsLogger [BackendEventWithSource]
events
      [BackendEventWithSource] -> m [BackendEventWithSource]
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [BackendEventWithSource]
events

    -- !!! CAREFUL !!!
    --     The logic here in particular is subtle and has been fixed, broken,
    --     and fixed again in several different ways, several times.
    -- !!! CAREFUL !!!
    --
    -- work on this batch of events while prefetching the next. Recurse after we've forked workers
    -- for each in the batch, minding the requested pool size.
    go :: FetchEventArguments -> m FetchEventArguments
    go :: ([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
go ([BackendEventWithSource]
events, !Int
fullFetchCount, !Bool
alreadyWarned) = do
      EventEngineCtx {TVar Int
DiffTime
Refined NonNegative Int
_eeCtxEventThreadsCapacity :: EventEngineCtx -> TVar Int
_eeCtxFetchInterval :: EventEngineCtx -> DiffTime
_eeCtxFetchSize :: EventEngineCtx -> Refined NonNegative Int
_eeCtxEventThreadsCapacity :: TVar Int
_eeCtxFetchInterval :: DiffTime
_eeCtxFetchSize :: Refined NonNegative Int
..} <- IO EventEngineCtx -> m EventEngineCtx
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO EventEngineCtx
getEventEngineCtx
      let fetchBatchSize :: Int
fetchBatchSize = Refined NonNegative Int -> Int
forall {k} (p :: k) x. Refined p x -> x
unrefine Refined NonNegative Int
_eeCtxFetchSize
      -- process events ASAP until we've caught up; only then can we sleep
      Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ([BackendEventWithSource] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [BackendEventWithSource]
events) (m () -> m ()) -> (IO () -> m ()) -> IO () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ DiffTime -> IO ()
sleep DiffTime
_eeCtxFetchInterval

      -- Prefetch next events payload while concurrently working through our current batch.
      -- NOTE: we probably don't need to prefetch so early, but probably not
      -- worth the effort for something more fine-tuned
      [BackendEventWithSource]
eventsNext <- m [BackendEventWithSource]
-> (Async [BackendEventWithSource] -> m [BackendEventWithSource])
-> m [BackendEventWithSource]
forall (m :: * -> *) a b.
(MonadBaseControl IO m, Forall (Pure m)) =>
m a -> (Async a -> m b) -> m b
LA.withAsync m [BackendEventWithSource]
popEventsBatch ((Async [BackendEventWithSource] -> m [BackendEventWithSource])
 -> m [BackendEventWithSource])
-> (Async [BackendEventWithSource] -> m [BackendEventWithSource])
-> m [BackendEventWithSource]
forall a b. (a -> b) -> a -> b
$ \Async [BackendEventWithSource]
eventsNextA -> do
        -- process approximately in order, minding HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE:
        [BackendEventWithSource]
-> (BackendEventWithSource -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [BackendEventWithSource]
events ((BackendEventWithSource -> m ()) -> m ())
-> (BackendEventWithSource -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \BackendEventWithSource
eventWithSource ->
          -- NOTE: we implement a logical bracket pattern here with the
          -- increment and decrement of _eeCtxEventThreadsCapacity which
          -- depends on not putting anything that can throw in the body here:
          forall (c :: BackendType -> Constraint) (i :: BackendType -> *) r.
AllBackendsSatisfy c =>
AnyBackend i -> (forall (b :: BackendType). c b => i b -> r) -> r
AB.dispatchAnyBackend @BackendEventTrigger BackendEventWithSource
eventWithSource \(EventWithSource b
eventWithSource' :: EventWithSource b) ->
            m () -> m ()
forall (m :: * -> *) a. MonadMask m => m a -> m a
mask_ (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
              IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
                (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically
                (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                  -- block until < HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE threads:
                  Int
maxCapacity <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
_eeCtxEventThreadsCapacity
                  Int
activeThreadCount <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
activeEventProcessingThreads
                  Bool -> STM ()
check (Bool -> STM ()) -> Bool -> STM ()
forall a b. (a -> b) -> a -> b
$ Int
maxCapacity Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
activeThreadCount
                  TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
activeEventProcessingThreads (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
              -- since there is some capacity in our worker threads, we can launch another:
              Async ()
t <-
                m () -> m (Async ())
forall (m :: * -> *) a.
(MonadBaseControl IO m, Forall (Pure m)) =>
m a -> m (Async a)
LA.async
                  (m () -> m (Async ())) -> m () -> m (Async ())
forall a b. (a -> b) -> a -> b
$ (ReaderT (Logger Hasura, Manager) m ()
 -> (Logger Hasura, Manager) -> m ())
-> (Logger Hasura, Manager)
-> ReaderT (Logger Hasura, Manager) m ()
-> m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ReaderT (Logger Hasura, Manager) m ()
-> (Logger Hasura, Manager) -> m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (Logger Hasura
logger, Manager
httpMgr)
                  (ReaderT (Logger Hasura, Manager) m () -> m ())
-> ReaderT (Logger Hasura, Manager) m () -> m ()
forall a b. (a -> b) -> a -> b
$ EventWithSource b -> ReaderT (Logger Hasura, Manager) m ()
forall (io :: * -> *) r (b :: BackendType).
(MonadBaseControl IO io, MonadIO io, MonadReader r io,
 Has Manager r, Has (Logger Hasura) r, MonadMask io,
 BackendEventTrigger b, MonadTrace io, MonadGetPolicies io) =>
EventWithSource b -> io ()
processEvent EventWithSource b
eventWithSource'
                  ReaderT (Logger Hasura, Manager) m ()
-> ReaderT (Logger Hasura, Manager) m ()
-> ReaderT (Logger Hasura, Manager) m ()
forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`finally`
                  -- NOTE!: this needs to happen IN THE FORKED THREAD:
                  ReaderT (Logger Hasura, Manager) m ()
decrementActiveThreadCount
              Async () -> m ()
forall (m :: * -> *) a. MonadBase IO m => Async a -> m ()
LA.link Async ()
t

        -- return when next batch ready; some 'processEvent' threads may be running.
        Async [BackendEventWithSource] -> m [BackendEventWithSource]
forall (m :: * -> *) a.
(MonadBase IO m, Forall (Pure m)) =>
Async a -> m a
LA.wait Async [BackendEventWithSource]
eventsNextA

      let lenEvents :: Int
lenEvents = [BackendEventWithSource] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [BackendEventWithSource]
events
      if
        | Int
lenEvents Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
fetchBatchSize -> do
            -- If we've seen N fetches in a row from the DB come back full (i.e. only limited
            -- by our LIMIT clause), then we say we're clearly falling behind:
            let clearlyBehind :: Bool
clearlyBehind = Int
fullFetchCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
3
            Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
alreadyWarned
              (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
clearlyBehind
              (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Logger Hasura
-> forall a (m :: * -> *).
   (ToEngineLog a Hasura, MonadIO m) =>
   a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
   (ToEngineLog a impl, MonadIO m) =>
   a -> m ()
L.unLogger Logger Hasura
logger
              (UnstructuredLog -> m ()) -> UnstructuredLog -> m ()
forall a b. (a -> b) -> a -> b
$ LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelWarn
              (SerializableBlob -> UnstructuredLog)
-> SerializableBlob -> UnstructuredLog
forall a b. (a -> b) -> a -> b
$ String -> SerializableBlob
forall a. IsString a => String -> a
fromString
              (String -> SerializableBlob) -> String -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ String
"Events processor may not be keeping up with events generated in postgres, "
              String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"or we're working on a backlog of events. Consider increasing "
              String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE"
            ([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([BackendEventWithSource]
eventsNext, (Int
fullFetchCount Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1), (Bool
alreadyWarned Bool -> Bool -> Bool
|| Bool
clearlyBehind))
        | Bool
otherwise -> do
            Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
lenEvents Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
fetchBatchSize Bool -> Bool -> Bool
&& Bool
alreadyWarned)
              (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
              -- emit as warning in case users are only logging warning severity and saw above
              Logger Hasura
-> forall a (m :: * -> *).
   (ToEngineLog a Hasura, MonadIO m) =>
   a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
   (ToEngineLog a impl, MonadIO m) =>
   a -> m ()
L.unLogger Logger Hasura
logger
              (UnstructuredLog -> m ()) -> UnstructuredLog -> m ()
forall a b. (a -> b) -> a -> b
$ LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelWarn
              (SerializableBlob -> UnstructuredLog)
-> SerializableBlob -> UnstructuredLog
forall a b. (a -> b) -> a -> b
$ String -> SerializableBlob
forall a. IsString a => String -> a
fromString
              (String -> SerializableBlob) -> String -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ String
"It looks like the events processor is keeping up again."
            ([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([BackendEventWithSource]
eventsNext, Int
0, Bool
False)

    decrementActiveThreadCount :: ReaderT (Logger Hasura, Manager) m ()
decrementActiveThreadCount =
      IO () -> ReaderT (Logger Hasura, Manager) m ()
forall a. IO a -> ReaderT (Logger Hasura, Manager) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
        (IO () -> ReaderT (Logger Hasura, Manager) m ())
-> IO () -> ReaderT (Logger Hasura, Manager) m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically
        (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
activeEventProcessingThreads (Int -> Int -> Int
forall a. Num a => a -> a -> a
subtract Int
1)

    -- \| Extract a trace context from an event trigger payload.
    extractEventContext :: forall io. (MonadIO io) => J.Value -> io (Maybe Tracing.TraceContext)
    extractEventContext :: forall (io :: * -> *).
MonadIO io =>
Value -> io (Maybe TraceContext)
extractEventContext Value
e = do
      let traceIdMaybe :: Maybe TraceId
traceIdMaybe =
            ByteString -> Maybe TraceId
Tracing.traceIdFromHex
              (ByteString -> Maybe TraceId)
-> (Text -> ByteString) -> Text -> Maybe TraceId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
txtToBs
              (Text -> Maybe TraceId) -> Maybe Text -> Maybe TraceId
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Value
e
              Value -> Getting (First Text) Value Text -> Maybe Text
forall s a. s -> Getting (First a) s a -> Maybe a
^? Key -> Traversal' Value Value
forall t. AsValue t => Key -> Traversal' t Value
JL.key Key
"trace_context" ((Value -> Const (First Text) Value)
 -> Value -> Const (First Text) Value)
-> Getting (First Text) Value Text
-> Getting (First Text) Value Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Key -> Traversal' Value Value
forall t. AsValue t => Key -> Traversal' t Value
JL.key Key
"trace_id" ((Value -> Const (First Text) Value)
 -> Value -> Const (First Text) Value)
-> Getting (First Text) Value Text
-> Getting (First Text) Value Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Getting (First Text) Value Text
forall t. AsValue t => Prism' t Text
Prism' Value Text
JL._String
      Maybe TraceId
-> (TraceId -> io TraceContext) -> io (Maybe TraceContext)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for Maybe TraceId
traceIdMaybe ((TraceId -> io TraceContext) -> io (Maybe TraceContext))
-> (TraceId -> io TraceContext) -> io (Maybe TraceContext)
forall a b. (a -> b) -> a -> b
$ \TraceId
traceId -> do
        SpanId
freshSpanId <- io SpanId
forall (m :: * -> *). MonadIO m => m SpanId
Tracing.randomSpanId
        let parentSpanId :: Maybe SpanId
parentSpanId =
              ByteString -> Maybe SpanId
Tracing.spanIdFromHex
                (ByteString -> Maybe SpanId)
-> (Text -> ByteString) -> Text -> Maybe SpanId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
txtToBs
                (Text -> Maybe SpanId) -> Maybe Text -> Maybe SpanId
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Value
e
                Value -> Getting (First Text) Value Text -> Maybe Text
forall s a. s -> Getting (First a) s a -> Maybe a
^? Key -> Traversal' Value Value
forall t. AsValue t => Key -> Traversal' t Value
JL.key Key
"trace_context" ((Value -> Const (First Text) Value)
 -> Value -> Const (First Text) Value)
-> Getting (First Text) Value Text
-> Getting (First Text) Value Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Key -> Traversal' Value Value
forall t. AsValue t => Key -> Traversal' t Value
JL.key Key
"span_id" ((Value -> Const (First Text) Value)
 -> Value -> Const (First Text) Value)
-> Getting (First Text) Value Text
-> Getting (First Text) Value Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Getting (First Text) Value Text
forall t. AsValue t => Prism' t Text
Prism' Value Text
JL._String
            samplingState :: SamplingState
samplingState =
              Maybe Text -> SamplingState
forall s. (IsString s, Eq s) => Maybe s -> SamplingState
Tracing.samplingStateFromHeader
                (Maybe Text -> SamplingState) -> Maybe Text -> SamplingState
forall a b. (a -> b) -> a -> b
$ Value
e
                Value -> Getting (First Text) Value Text -> Maybe Text
forall s a. s -> Getting (First a) s a -> Maybe a
^? Key -> Traversal' Value Value
forall t. AsValue t => Key -> Traversal' t Value
JL.key Key
"trace_context" ((Value -> Const (First Text) Value)
 -> Value -> Const (First Text) Value)
-> Getting (First Text) Value Text
-> Getting (First Text) Value Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Key -> Traversal' Value Value
forall t. AsValue t => Key -> Traversal' t Value
JL.key Key
"sampling_state" ((Value -> Const (First Text) Value)
 -> Value -> Const (First Text) Value)
-> Getting (First Text) Value Text
-> Getting (First Text) Value Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Getting (First Text) Value Text
forall t. AsValue t => Prism' t Text
Prism' Value Text
JL._String
        TraceContext -> io TraceContext
forall a. a -> io a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TraceContext -> io TraceContext)
-> TraceContext -> io TraceContext
forall a b. (a -> b) -> a -> b
$ TraceId -> SpanId -> Maybe SpanId -> SamplingState -> TraceContext
Tracing.TraceContext TraceId
traceId SpanId
freshSpanId Maybe SpanId
parentSpanId SamplingState
samplingState

    processEvent ::
      forall io r b.
      ( MonadBaseControl IO io,
        MonadIO io,
        MonadReader r io,
        Has HTTP.Manager r,
        Has (L.Logger L.Hasura) r,
        MonadMask io,
        BackendEventTrigger b,
        Tracing.MonadTrace io,
        MonadGetPolicies io
      ) =>
      EventWithSource b ->
      io ()
    processEvent :: forall (io :: * -> *) r (b :: BackendType).
(MonadBaseControl IO io, MonadIO io, MonadReader r io,
 Has Manager r, Has (Logger Hasura) r, MonadMask io,
 BackendEventTrigger b, MonadTrace io, MonadGetPolicies io) =>
EventWithSource b -> io ()
processEvent (EventWithSource Event b
e SourceConfig b
sourceConfig SourceName
sourceName UTCTime
eventFetchedTime) = do
      -- IO action for fetching the configured metrics granularity
      IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity <- io (IO GranularPrometheusMetricsState)
forall (m :: * -> *).
MonadGetPolicies m =>
m (IO GranularPrometheusMetricsState)
runGetPrometheusMetricsGranularity
      -- Track Queue Time of Event (in seconds). See `smEventQueueTime`
      -- Queue Time = Time when the event was fetched from DB - Time when the event is being processed
      UTCTime
eventProcessTime <- IO UTCTime -> io UTCTime
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
      let eventQueueTime :: Double
eventQueueTime = NominalDiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac (NominalDiffTime -> Double) -> NominalDiffTime -> Double
forall a b. (a -> b) -> a -> b
$ UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
eventProcessTime UTCTime
eventFetchedTime
      ()
_ <- IO () -> io ()
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> io ()) -> IO () -> io ()
forall a b. (a -> b) -> a -> b
$ Distribution -> Double -> IO ()
EKG.Distribution.add (ServerMetrics -> Distribution
smEventQueueTime ServerMetrics
serverMetrics) Double
eventQueueTime
      IO () -> io ()
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
        (IO () -> io ()) -> IO () -> io ()
forall a b. (a -> b) -> a -> b
$ IO GranularPrometheusMetricsState
-> Bool
-> HistogramVector (Maybe DynamicEventTriggerLabel)
-> DynamicEventTriggerLabel
-> Double
-> IO ()
forall l (m :: * -> *).
(Ord l, MonadIO m) =>
IO GranularPrometheusMetricsState
-> Bool -> HistogramVector (Maybe l) -> l -> Double -> m ()
observeHistogramWithLabel
          IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity
          Bool
True
          (EventTriggerMetrics
-> HistogramVector (Maybe DynamicEventTriggerLabel)
eventQueueTimeSeconds EventTriggerMetrics
eventTriggerMetrics)
          (TriggerName -> SourceName -> DynamicEventTriggerLabel
DynamicEventTriggerLabel (TriggerMetadata -> TriggerName
tmName (Event b -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event b
e)) SourceName
sourceName)
          Double
eventQueueTime

      SchemaCache
cache <- IO SchemaCache -> io SchemaCache
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO SchemaCache
getSchemaCache

      Text -> io () -> io ()
trace <-
        Value -> io (Maybe TraceContext)
forall (io :: * -> *).
MonadIO io =>
Value -> io (Maybe TraceContext)
extractEventContext (Event b -> Value
forall (b :: BackendType). Event b -> Value
eEvent Event b
e) io (Maybe TraceContext)
-> (Maybe TraceContext -> Text -> io () -> io ())
-> io (Text -> io () -> io ())
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
          Maybe TraceContext
Nothing -> SamplingPolicy -> Text -> io () -> io ()
forall (m :: * -> *) a.
(MonadIO m, MonadTrace m) =>
SamplingPolicy -> Text -> m a -> m a
Tracing.newTrace SamplingPolicy
Tracing.sampleAlways
          Just TraceContext
ctx -> TraceContext -> SamplingPolicy -> Text -> io () -> io ()
forall a. TraceContext -> SamplingPolicy -> Text -> io a -> io a
forall (m :: * -> *) a.
MonadTrace m =>
TraceContext -> SamplingPolicy -> Text -> m a -> m a
Tracing.newTraceWith TraceContext
ctx SamplingPolicy
Tracing.sampleAlways
      let spanName :: EventTriggerInfo b -> Text
spanName EventTriggerInfo b
eti = Text
"Event trigger: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> NonEmptyText -> Text
unNonEmptyText (TriggerName -> NonEmptyText
unTriggerName (EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti))

      Either QErr (MaintenanceMode MaintenanceModeVersion)
maintenanceModeVersionEither :: Either QErr (MaintenanceMode MaintenanceModeVersion) <-
        case MaintenanceMode ()
maintenanceMode of
          MaintenanceModeEnabled () -> do
            ExceptT QErr io MaintenanceModeVersion
-> io (Either QErr MaintenanceModeVersion)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m, MonadError QErr m) =>
SourceConfig b -> m MaintenanceModeVersion
getMaintenanceModeVersion @b SourceConfig b
sourceConfig) io (Either QErr MaintenanceModeVersion)
-> (Either QErr MaintenanceModeVersion
    -> Either QErr (MaintenanceMode MaintenanceModeVersion))
-> io (Either QErr (MaintenanceMode MaintenanceModeVersion))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
              Left QErr
err -> QErr -> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. a -> Either a b
Left QErr
err
              Right MaintenanceModeVersion
maintenanceModeVersion -> MaintenanceMode MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. b -> Either a b
Right (MaintenanceMode MaintenanceModeVersion
 -> Either QErr (MaintenanceMode MaintenanceModeVersion))
-> MaintenanceMode MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. (a -> b) -> a -> b
$ (MaintenanceModeVersion -> MaintenanceMode MaintenanceModeVersion
forall a. a -> MaintenanceMode a
MaintenanceModeEnabled MaintenanceModeVersion
maintenanceModeVersion)
          MaintenanceMode ()
MaintenanceModeDisabled -> Either QErr (MaintenanceMode MaintenanceModeVersion)
-> io (Either QErr (MaintenanceMode MaintenanceModeVersion))
forall a. a -> io a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either QErr (MaintenanceMode MaintenanceModeVersion)
 -> io (Either QErr (MaintenanceMode MaintenanceModeVersion)))
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
-> io (Either QErr (MaintenanceMode MaintenanceModeVersion))
forall a b. (a -> b) -> a -> b
$ MaintenanceMode MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. b -> Either a b
Right MaintenanceMode MaintenanceModeVersion
forall a. MaintenanceMode a
MaintenanceModeDisabled

      case Either QErr (MaintenanceMode MaintenanceModeVersion)
maintenanceModeVersionEither of
        Left QErr
maintenanceModeVersionErr -> QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr QErr
maintenanceModeVersionErr
        Right MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion ->
          case SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
forall (b :: BackendType).
Backend b =>
SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
getEventTriggerInfoFromEvent SchemaCache
cache Event b
e of
            Left Text
err -> do
              --  This rare error can happen in the following known cases:
              --  i) schema cache is not up-to-date (due to some bug, say during schema syncing across multiple instances)
              --  ii) the event trigger is dropped when this event was just fetched
              QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr (QErr -> io ()) -> QErr -> io ()
forall a b. (a -> b) -> a -> b
$ Code -> Text -> QErr
err500 Code
Unexpected Text
err
              UTCTime
currentTime <- IO UTCTime -> io UTCTime
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
              -- For such an event, we unlock the event and retry after a minute
              ExceptT QErr io () -> io (Either QErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (SourceConfig b
-> Event b
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> ExceptT QErr io ()
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m, MonadError QErr m) =>
SourceConfig b
-> Event b
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> m ()
forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
SourceConfig b
-> Event b
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> m ()
setRetry SourceConfig b
sourceConfig Event b
e (NominalDiffTime -> UTCTime -> UTCTime
addUTCTime NominalDiffTime
60 UTCTime
currentTime) MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion)
                io (Either QErr ()) -> (Either QErr () -> io ()) -> io ()
forall a b. io a -> (a -> io b) -> io b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> io ()) -> io ())
-> (QErr -> io ()) -> Either QErr () -> io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> io ()) -> io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
            Right EventTriggerInfo b
eti -> Text -> io () -> io ()
trace (EventTriggerInfo b -> Text
forall {b :: BackendType}. EventTriggerInfo b -> Text
spanName EventTriggerInfo b
eti) do
              let webhook :: EnvRecord ResolvedWebhook
webhook = WebhookConfInfo -> EnvRecord ResolvedWebhook
wciCachedValue (WebhookConfInfo -> EnvRecord ResolvedWebhook)
-> WebhookConfInfo -> EnvRecord ResolvedWebhook
forall a b. (a -> b) -> a -> b
$ EventTriggerInfo b -> WebhookConfInfo
forall (b :: BackendType). EventTriggerInfo b -> WebhookConfInfo
etiWebhookInfo EventTriggerInfo b
eti
                  retryConf :: RetryConf
retryConf = EventTriggerInfo b -> RetryConf
forall (b :: BackendType). EventTriggerInfo b -> RetryConf
etiRetryConf EventTriggerInfo b
eti
                  timeoutSeconds :: Int
timeoutSeconds = Int -> Maybe Int -> Int
forall a. a -> Maybe a -> a
fromMaybe Int
defaultTimeoutSeconds (RetryConf -> Maybe Int
rcTimeoutSec RetryConf
retryConf)
                  httpTimeout :: ResponseTimeout
httpTimeout = Int -> ResponseTimeout
HTTP.responseTimeoutMicro (Int
timeoutSeconds Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000000)
                  ([Header]
headers, [HeaderConf]
logHeaders) = [EventHeaderInfo] -> ([Header], [HeaderConf])
prepareHeaders (EventTriggerInfo b -> [EventHeaderInfo]
forall (b :: BackendType). EventTriggerInfo b -> [EventHeaderInfo]
etiHeaders EventTriggerInfo b
eti)
                  ep :: EventPayload b
ep = RetryConf -> Event b -> EventPayload b
forall (b :: BackendType). RetryConf -> Event b -> EventPayload b
createEventPayload RetryConf
retryConf Event b
e
                  payload :: ByteString
payload = Value -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode (Value -> ByteString) -> Value -> ByteString
forall a b. (a -> b) -> a -> b
$ EventPayload b -> Value
forall a. ToJSON a => a -> Value
J.toJSON EventPayload b
ep
                  extraLogCtx :: ExtraLogContext
extraLogCtx = EventId -> Maybe TriggerName -> ExtraLogContext
ExtraLogContext (EventPayload b -> EventId
forall (b :: BackendType). EventPayload b -> EventId
epId EventPayload b
ep) (TriggerName -> Maybe TriggerName
forall a. a -> Maybe a
Just (TriggerName -> Maybe TriggerName)
-> TriggerName -> Maybe TriggerName
forall a b. (a -> b) -> a -> b
$ EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti)
                  requestTransform :: Maybe RequestTransform
requestTransform = EventTriggerInfo b -> Maybe RequestTransform
forall (b :: BackendType).
EventTriggerInfo b -> Maybe RequestTransform
etiRequestTransform EventTriggerInfo b
eti
                  responseTransform :: Maybe ResponseTransform
responseTransform = MetadataResponseTransform -> ResponseTransform
mkResponseTransform (MetadataResponseTransform -> ResponseTransform)
-> Maybe MetadataResponseTransform -> Maybe ResponseTransform
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> EventTriggerInfo b -> Maybe MetadataResponseTransform
forall (b :: BackendType).
EventTriggerInfo b -> Maybe MetadataResponseTransform
etiResponseTransform EventTriggerInfo b
eti
                  eventTriggerProcessingTimeout :: DiffTime
eventTriggerProcessingTimeout = DiffTime -> (Int -> DiffTime) -> Maybe Int -> DiffTime
forall b a. b -> (a -> b) -> Maybe a -> b
maybe DiffTime
upperBoundEventTriggerTimeout (DiffTime -> DiffTime -> DiffTime
forall a. Ord a => a -> a -> a
min DiffTime
upperBoundEventTriggerTimeout (DiffTime -> DiffTime) -> (Int -> DiffTime) -> Int -> DiffTime
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> DiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral) (RetryConf -> Maybe Int
rcTimeoutSec RetryConf
retryConf)
                  eventTriggerProcessAction :: io ()
eventTriggerProcessAction = do
                    UTCTime
eventExecutionStartTime <- IO UTCTime -> io UTCTime
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
                    Either
  (TransformableRequestError 'EventType)
  (Request, HTTPResp 'EventType)
eitherReqRes <-
                      ExceptT
  (TransformableRequestError 'EventType)
  io
  (Request, HTTPResp 'EventType)
-> io
     (Either
        (TransformableRequestError 'EventType)
        (Request, HTTPResp 'EventType))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT
                        (ExceptT
   (TransformableRequestError 'EventType)
   io
   (Request, HTTPResp 'EventType)
 -> io
      (Either
         (TransformableRequestError 'EventType)
         (Request, HTTPResp 'EventType)))
-> ExceptT
     (TransformableRequestError 'EventType)
     io
     (Request, HTTPResp 'EventType)
-> io
     (Either
        (TransformableRequestError 'EventType)
        (Request, HTTPResp 'EventType))
forall a b. (a -> b) -> a -> b
$ [Header]
-> ResponseTimeout
-> ByteString
-> Maybe RequestTransform
-> ResolvedWebhook
-> ExceptT (TransformableRequestError 'EventType) io RequestDetails
forall (a :: TriggerTypes) (m :: * -> *).
MonadError (TransformableRequestError a) m =>
[Header]
-> ResponseTimeout
-> ByteString
-> Maybe RequestTransform
-> ResolvedWebhook
-> m RequestDetails
mkRequest [Header]
headers ResponseTimeout
httpTimeout ByteString
payload Maybe RequestTransform
requestTransform (EnvRecord ResolvedWebhook -> ResolvedWebhook
forall a. EnvRecord a -> a
_envVarValue EnvRecord ResolvedWebhook
webhook)
                        ExceptT (TransformableRequestError 'EventType) io RequestDetails
-> (RequestDetails
    -> ExceptT
         (TransformableRequestError 'EventType)
         io
         (Request, HTTPResp 'EventType))
-> ExceptT
     (TransformableRequestError 'EventType)
     io
     (Request, HTTPResp 'EventType)
forall a b.
ExceptT (TransformableRequestError 'EventType) io a
-> (a -> ExceptT (TransformableRequestError 'EventType) io b)
-> ExceptT (TransformableRequestError 'EventType) io b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \RequestDetails
reqDetails -> do
                          let request :: Request
request = RequestDetails -> Request
extractRequest RequestDetails
reqDetails
                              logger' :: Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> RequestDetails
-> ExceptT (TransformableRequestError 'EventType) io ()
logger' Either (HTTPErr 'EventType) (HTTPResp 'EventType)
res RequestDetails
details = do
                                Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> ExtraLogContext
-> RequestDetails
-> Text
-> [HeaderConf]
-> ExceptT (TransformableRequestError 'EventType) io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> ExtraLogContext
-> RequestDetails
-> Text
-> [HeaderConf]
-> m ()
logHTTPForET Either (HTTPErr 'EventType) (HTTPResp 'EventType)
res ExtraLogContext
extraLogCtx RequestDetails
details (EnvRecord ResolvedWebhook -> Text
forall a. EnvRecord a -> Text
_envVarName EnvRecord ResolvedWebhook
webhook) [HeaderConf]
logHeaders
                                IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a.
IO a -> ExceptT (TransformableRequestError 'EventType) io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT (TransformableRequestError 'EventType) io ())
-> IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a b. (a -> b) -> a -> b
$ do
                                  case Either (HTTPErr 'EventType) (HTTPResp 'EventType)
res of
                                    Left HTTPErr 'EventType
_err -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                                    Right HTTPResp 'EventType
response ->
                                      Counter -> Int64 -> IO ()
Prometheus.Counter.add
                                        (EventTriggerMetrics -> Counter
eventTriggerBytesReceived EventTriggerMetrics
eventTriggerMetrics)
                                        (HTTPResp 'EventType -> Int64
forall (a :: TriggerTypes). HTTPResp a -> Int64
hrsSize HTTPResp 'EventType
response)
                                  let RequestDetails {Int64
_rdOriginalSize :: Int64
_rdOriginalSize :: RequestDetails -> Int64
_rdOriginalSize, Maybe Int64
_rdTransformedSize :: Maybe Int64
_rdTransformedSize :: RequestDetails -> Maybe Int64
_rdTransformedSize} = RequestDetails
details
                                   in Counter -> Int64 -> IO ()
Prometheus.Counter.add
                                        (EventTriggerMetrics -> Counter
eventTriggerBytesSent EventTriggerMetrics
eventTriggerMetrics)
                                        (Int64 -> Maybe Int64 -> Int64
forall a. a -> Maybe a -> a
fromMaybe Int64
_rdOriginalSize Maybe Int64
_rdTransformedSize)
                          -- Event Triggers have a configuration parameter called
                          -- HASURA_GRAPHQL_EVENTS_HTTP_WORKERS, which is used
                          -- to control the concurrency of http delivery.
                          -- This bracket is used to increment and decrement an
                          -- HTTP Worker EKG Gauge for the duration of the
                          -- request invocation
                          HTTPResp 'EventType
resp <-
                            ExceptT (TransformableRequestError 'EventType) io ()
-> ExceptT (TransformableRequestError 'EventType) io ()
-> ExceptT
     (TransformableRequestError 'EventType) io (HTTPResp 'EventType)
-> ExceptT
     (TransformableRequestError 'EventType) io (HTTPResp 'EventType)
forall (m :: * -> *) a c b. MonadMask m => m a -> m c -> m b -> m b
bracket_
                              ( do
                                  IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a.
IO a -> ExceptT (TransformableRequestError 'EventType) io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT (TransformableRequestError 'EventType) io ())
-> IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smNumEventHTTPWorkers ServerMetrics
serverMetrics
                                  IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a.
IO a -> ExceptT (TransformableRequestError 'EventType) io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT (TransformableRequestError 'EventType) io ())
-> IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.inc (EventTriggerMetrics -> Gauge
eventTriggerHTTPWorkers EventTriggerMetrics
eventTriggerMetrics)
                              )
                              ( do
                                  IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a.
IO a -> ExceptT (TransformableRequestError 'EventType) io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT (TransformableRequestError 'EventType) io ())
-> IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smNumEventHTTPWorkers ServerMetrics
serverMetrics
                                  IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a.
IO a -> ExceptT (TransformableRequestError 'EventType) io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT (TransformableRequestError 'EventType) io ())
-> IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.dec (EventTriggerMetrics -> Gauge
eventTriggerHTTPWorkers EventTriggerMetrics
eventTriggerMetrics)
                              )
                              (RequestDetails
-> Maybe ResponseTransform
-> Maybe SessionVariables
-> (Either (HTTPErr 'EventType) (HTTPResp 'EventType)
    -> RequestDetails
    -> ExceptT (TransformableRequestError 'EventType) io ())
-> ExceptT
     (TransformableRequestError 'EventType) io (HTTPResp 'EventType)
forall r (m :: * -> *) (a :: TriggerTypes).
(MonadReader r m, MonadError (TransformableRequestError a) m,
 Has Manager r, Has (Logger Hasura) r, MonadIO m, MonadTrace m) =>
RequestDetails
-> Maybe ResponseTransform
-> Maybe SessionVariables
-> (Either (HTTPErr a) (HTTPResp a) -> RequestDetails -> m ())
-> m (HTTPResp a)
invokeRequest RequestDetails
reqDetails Maybe ResponseTransform
responseTransform (RequestDetails -> Maybe SessionVariables
_rdSessionVars RequestDetails
reqDetails) Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> RequestDetails
-> ExceptT (TransformableRequestError 'EventType) io ()
logger')
                          (Request, HTTPResp 'EventType)
-> ExceptT
     (TransformableRequestError 'EventType)
     io
     (Request, HTTPResp 'EventType)
forall a. a -> ExceptT (TransformableRequestError 'EventType) io a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Request
request, HTTPResp 'EventType
resp)
                    case Either
  (TransformableRequestError 'EventType)
  (Request, HTTPResp 'EventType)
eitherReqRes of
                      Right (Request
req, HTTPResp 'EventType
resp) -> do
                        let reqBody :: Value
reqBody = Value -> Maybe Value -> Value
forall a. a -> Maybe a -> a
fromMaybe Value
J.Null (Maybe Value -> Value) -> Maybe Value -> Value
forall a b. (a -> b) -> a -> b
$ Getting (First ByteString) Request ByteString
-> Request -> Maybe ByteString
forall s (m :: * -> *) a.
MonadReader s m =>
Getting (First a) s a -> m (Maybe a)
preview ((RequestBody -> Const (First ByteString) RequestBody)
-> Request -> Const (First ByteString) Request
Lens' Request RequestBody
HTTP.body ((RequestBody -> Const (First ByteString) RequestBody)
 -> Request -> Const (First ByteString) Request)
-> ((ByteString -> Const (First ByteString) ByteString)
    -> RequestBody -> Const (First ByteString) RequestBody)
-> Getting (First ByteString) Request ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ByteString -> Const (First ByteString) ByteString)
-> RequestBody -> Const (First ByteString) RequestBody
Prism' RequestBody ByteString
HTTP._RequestBodyLBS) Request
req Maybe ByteString -> (ByteString -> Maybe Value) -> Maybe Value
forall a b. Maybe a -> (a -> Maybe b) -> Maybe b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. FromJSON a => ByteString -> Maybe a
J.decode @J.Value
                        SourceConfig b
-> Event b
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPResp 'EventType
-> io (Either QErr ())
forall (b :: BackendType) (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, BackendEventTrigger b) =>
SourceConfig b
-> Event b
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPResp a
-> m (Either QErr ())
processSuccess SourceConfig b
sourceConfig Event b
e [HeaderConf]
logHeaders Value
reqBody MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion HTTPResp 'EventType
resp io (Either QErr ()) -> (Either QErr () -> io ()) -> io ()
forall a b. io a -> (a -> io b) -> io b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> io ()) -> io ())
-> (QErr -> io ()) -> Either QErr () -> io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> io ()) -> io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
                        UTCTime
eventExecutionFinishTime <- IO UTCTime -> io UTCTime
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
                        let eventWebhookProcessingTime' :: Double
eventWebhookProcessingTime' = NominalDiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac (NominalDiffTime -> Double) -> NominalDiffTime -> Double
forall a b. (a -> b) -> a -> b
$ UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
eventExecutionFinishTime UTCTime
eventExecutionStartTime
                            -- For event_processing_time, the start time is defined as the expected delivery time for an event, i.e.:
                            --  - For event with no retries: created_at (at UTC) time
                            --  - For event with retries: next_retry_at (at UTC) time

                            -- The timestamps in the DB are supposed to be UTC time, so the timestamps (`eventExecutionFinishTime` and
                            -- `eventStartTime`) used here in calculation are all UTC time.
                            eventStartTime :: UTCTime
eventStartTime = UTCTime -> Maybe UTCTime -> UTCTime
forall a. a -> Maybe a -> a
fromMaybe (Event b -> UTCTime
forall (b :: BackendType). Event b -> UTCTime
eCreatedAtUTC Event b
e) (Event b -> Maybe UTCTime
forall (b :: BackendType). Event b -> Maybe UTCTime
eRetryAtUTC Event b
e)
                            eventProcessingTime' :: Double
eventProcessingTime' = NominalDiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac (NominalDiffTime -> Double) -> NominalDiffTime -> Double
forall a b. (a -> b) -> a -> b
$ UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
eventExecutionFinishTime UTCTime
eventStartTime
                        IO GranularPrometheusMetricsState
-> Bool
-> HistogramVector (Maybe DynamicEventTriggerLabel)
-> DynamicEventTriggerLabel
-> Double
-> io ()
forall l (m :: * -> *).
(Ord l, MonadIO m) =>
IO GranularPrometheusMetricsState
-> Bool -> HistogramVector (Maybe l) -> l -> Double -> m ()
observeHistogramWithLabel
                          IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity
                          Bool
True
                          (EventTriggerMetrics
-> HistogramVector (Maybe DynamicEventTriggerLabel)
eventProcessingTime EventTriggerMetrics
eventTriggerMetrics)
                          (TriggerName -> SourceName -> DynamicEventTriggerLabel
DynamicEventTriggerLabel (EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti) SourceName
sourceName)
                          Double
eventProcessingTime'
                        IO () -> io ()
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> io ()) -> IO () -> io ()
forall a b. (a -> b) -> a -> b
$ do
                          Distribution -> Double -> IO ()
EKG.Distribution.add (ServerMetrics -> Distribution
smEventWebhookProcessingTime ServerMetrics
serverMetrics) Double
eventWebhookProcessingTime'
                          IO GranularPrometheusMetricsState
-> Bool
-> HistogramVector (Maybe DynamicEventTriggerLabel)
-> DynamicEventTriggerLabel
-> Double
-> IO ()
forall l (m :: * -> *).
(Ord l, MonadIO m) =>
IO GranularPrometheusMetricsState
-> Bool -> HistogramVector (Maybe l) -> l -> Double -> m ()
observeHistogramWithLabel
                            IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity
                            Bool
True
                            (EventTriggerMetrics
-> HistogramVector (Maybe DynamicEventTriggerLabel)
eventWebhookProcessingTime EventTriggerMetrics
eventTriggerMetrics)
                            (TriggerName -> SourceName -> DynamicEventTriggerLabel
DynamicEventTriggerLabel (EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti) SourceName
sourceName)
                            Double
eventWebhookProcessingTime'
                          Distribution -> Double -> IO ()
EKG.Distribution.add (ServerMetrics -> Distribution
smEventProcessingTime ServerMetrics
serverMetrics) Double
eventProcessingTime'
                          IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> m ()
incEventTriggerCounterWithLabel
                            IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity
                            Bool
True
                            (EventTriggerMetrics -> CounterVector EventStatusWithTriggerLabel
eventProcessedTotal EventTriggerMetrics
eventTriggerMetrics)
                            (EventStatusLabel
-> Maybe DynamicEventTriggerLabel -> EventStatusWithTriggerLabel
EventStatusWithTriggerLabel EventStatusLabel
eventSuccessLabel (DynamicEventTriggerLabel -> Maybe DynamicEventTriggerLabel
forall a. a -> Maybe a
Just (TriggerName -> SourceName -> DynamicEventTriggerLabel
DynamicEventTriggerLabel (EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti) SourceName
sourceName)))
                          IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> m ()
incEventTriggerCounterWithLabel
                            IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity
                            Bool
True
                            (EventTriggerMetrics -> CounterVector EventStatusWithTriggerLabel
eventInvocationTotal EventTriggerMetrics
eventTriggerMetrics)
                            (EventStatusLabel
-> Maybe DynamicEventTriggerLabel -> EventStatusWithTriggerLabel
EventStatusWithTriggerLabel EventStatusLabel
eventSuccessLabel (DynamicEventTriggerLabel -> Maybe DynamicEventTriggerLabel
forall a. a -> Maybe a
Just (TriggerName -> SourceName -> DynamicEventTriggerLabel
DynamicEventTriggerLabel (EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti) SourceName
sourceName)))
                      Left TransformableRequestError 'EventType
eventError -> do
                        -- TODO (paritosh): We can also add a label to the metric to indicate the type of error
                        IO () -> io ()
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
                          (IO () -> io ()) -> IO () -> io ()
forall a b. (a -> b) -> a -> b
$ IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> m ()
incEventTriggerCounterWithLabel
                            IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity
                            Bool
True
                            (EventTriggerMetrics -> CounterVector EventStatusWithTriggerLabel
eventInvocationTotal EventTriggerMetrics
eventTriggerMetrics)
                            (EventStatusLabel
-> Maybe DynamicEventTriggerLabel -> EventStatusWithTriggerLabel
EventStatusWithTriggerLabel EventStatusLabel
eventFailedLabel (DynamicEventTriggerLabel -> Maybe DynamicEventTriggerLabel
forall a. a -> Maybe a
Just (TriggerName -> SourceName -> DynamicEventTriggerLabel
DynamicEventTriggerLabel (EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti) SourceName
sourceName)))
                        case TransformableRequestError 'EventType
eventError of
                          (HTTPError Value
reqBody HTTPErr 'EventType
err) ->
                            forall (b :: BackendType) (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, BackendEventTrigger b, MonadGetPolicies m) =>
SourceConfig b
-> Event b
-> RetryConf
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> EventTriggerMetrics
-> HTTPErr a
-> m (Either QErr ())
processError @b SourceConfig b
sourceConfig Event b
e RetryConf
retryConf [HeaderConf]
logHeaders Value
reqBody MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion EventTriggerMetrics
eventTriggerMetrics HTTPErr 'EventType
err io (Either QErr ()) -> (Either QErr () -> io ()) -> io ()
forall a b. io a -> (a -> io b) -> io b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> io ()) -> io ())
-> (QErr -> io ()) -> Either QErr () -> io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> io ()) -> io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
                          (TransformationError Value
_ TransformErrorBundle
err) -> do
                            Logger Hasura
-> forall a (m :: * -> *).
   (ToEngineLog a Hasura, MonadIO m) =>
   a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
   (ToEngineLog a impl, MonadIO m) =>
   a -> m ()
L.unLogger Logger Hasura
logger (UnstructuredLog -> io ()) -> UnstructuredLog -> io ()
forall a b. (a -> b) -> a -> b
$ LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelError (ByteString -> SerializableBlob
SB.fromLBS (ByteString -> SerializableBlob) -> ByteString -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ TransformErrorBundle -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode TransformErrorBundle
err)

                            -- Record an Event Error
                            forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m) =>
SourceConfig b
-> Event b
-> Maybe (Invocation 'EventType)
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError' @b SourceConfig b
sourceConfig Event b
e Maybe (Invocation 'EventType)
forall a. Maybe a
Nothing ProcessEventError
PESetError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion io (Either QErr ()) -> (Either QErr () -> io ()) -> io ()
forall a b. io a -> (a -> io b) -> io b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> io ()) -> io ())
-> (QErr -> io ()) -> Either QErr () -> io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> io ()) -> io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
              -- Try to process the event trigger with a timeout of min(`uppserBoundEventTriggerTimeout`, event's response timeout),
              -- so that we're never blocked forever while processing a single event trigger.
              --
              -- If the request times out, then process it as an erroneous invocation and move on.
              Int -> io () -> io (Maybe ())
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Int -> m a -> m (Maybe a)
timeout (Integer -> Int
forall a. Num a => Integer -> a
fromInteger (DiffTime -> Integer
diffTimeToMicroSeconds DiffTime
eventTriggerProcessingTimeout)) io ()
eventTriggerProcessAction
                io (Maybe ()) -> io () -> io ()
forall (m :: * -> *) a. Monad m => m (Maybe a) -> m a -> m a
`onNothingM` do
                  let eventTriggerTimeoutMessage :: Text
eventTriggerTimeoutMessage = Text
"Event Trigger " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti TriggerName -> Text -> Text
forall t. ToTxt t => t -> Text -> Text
<<> Text
" timed out while processing."
                      eventTriggerTimeoutError :: QErr
eventTriggerTimeoutError = Code -> Text -> QErr
err500 Code
TimeoutErrorCode Text
eventTriggerTimeoutMessage
                  Logger Hasura
-> forall a (m :: * -> *).
   (ToEngineLog a Hasura, MonadIO m) =>
   a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
   (ToEngineLog a impl, MonadIO m) =>
   a -> m ()
L.unLogger Logger Hasura
logger (EventInternalErr -> io ()) -> EventInternalErr -> io ()
forall a b. (a -> b) -> a -> b
$ QErr -> EventInternalErr
EventInternalErr QErr
eventTriggerTimeoutError
                  forall (b :: BackendType) (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, BackendEventTrigger b, MonadGetPolicies m) =>
SourceConfig b
-> Event b
-> RetryConf
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> EventTriggerMetrics
-> HTTPErr a
-> m (Either QErr ())
processError @b SourceConfig b
sourceConfig Event b
e RetryConf
retryConf [HeaderConf]
logHeaders Value
J.Null MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion EventTriggerMetrics
eventTriggerMetrics (String -> HTTPErr Any
forall (a :: TriggerTypes). String -> HTTPErr a
HOther (String -> HTTPErr Any) -> String -> HTTPErr Any
forall a b. (a -> b) -> a -> b
$ Text -> String
T.unpack Text
eventTriggerTimeoutMessage)
                    io (Either QErr ()) -> (Either QErr () -> io ()) -> io ()
forall a b. io a -> (a -> io b) -> io b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> io ()) -> io ())
-> (QErr -> io ()) -> Either QErr () -> io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> io ()) -> io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr

      -- removing an event from the _eeCtxLockedEvents after the event has been processed:
      SourceName
-> EventId -> TVar (HashMap SourceName (Set EventId)) -> io ()
forall (m :: * -> *).
MonadIO m =>
SourceName
-> EventId -> TVar (HashMap SourceName (Set EventId)) -> m ()
removeEventTriggerEventFromLockedEvents SourceName
sourceName (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) TVar (HashMap SourceName (Set EventId))
leEvents

createEventPayload :: RetryConf -> Event b -> EventPayload b
createEventPayload :: forall (b :: BackendType). RetryConf -> Event b -> EventPayload b
createEventPayload RetryConf
retryConf Event b
e =
  EventPayload
    { epId :: EventId
epId = Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e,
      epTable :: TableName b
epTable = Event b -> TableName b
forall (b :: BackendType). Event b -> TableName b
eTable Event b
e,
      epTrigger :: TriggerMetadata
epTrigger = Event b -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event b
e,
      epEvent :: Value
epEvent = Event b -> Value
forall (b :: BackendType). Event b -> Value
eEvent Event b
e,
      epDeliveryInfo :: DeliveryInfo
epDeliveryInfo =
        DeliveryInfo
          { diCurrentRetry :: Int
diCurrentRetry = Event b -> Int
forall (b :: BackendType). Event b -> Int
eTries Event b
e,
            diMaxRetries :: Int
diMaxRetries = RetryConf -> Int
rcNumRetries RetryConf
retryConf
          },
      epCreatedAt :: LocalTime
epCreatedAt = Event b -> LocalTime
forall (b :: BackendType). Event b -> LocalTime
eCreatedAt Event b
e
    }

processSuccess ::
  forall b m a.
  (MonadIO m, BackendEventTrigger b) =>
  SourceConfig b ->
  Event b ->
  [HeaderConf] ->
  J.Value ->
  MaintenanceMode MaintenanceModeVersion ->
  HTTPResp a ->
  m (Either QErr ())
processSuccess :: forall (b :: BackendType) (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, BackendEventTrigger b) =>
SourceConfig b
-> Event b
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPResp a
-> m (Either QErr ())
processSuccess SourceConfig b
sourceConfig Event b
e [HeaderConf]
reqHeaders Value
ep MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion HTTPResp a
resp = do
  let respBody :: SerializableBlob
respBody = HTTPResp a -> SerializableBlob
forall (a :: TriggerTypes). HTTPResp a -> SerializableBlob
hrsBody HTTPResp a
resp
      respHeaders :: [HeaderConf]
respHeaders = HTTPResp a -> [HeaderConf]
forall (a :: TriggerTypes). HTTPResp a -> [HeaderConf]
hrsHeaders HTTPResp a
resp
      respStatus :: Int
respStatus = HTTPResp a -> Int
forall (a :: TriggerTypes). HTTPResp a -> Int
hrsStatus HTTPResp a
resp
      eid :: EventId
eid = Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e
      invocation :: Invocation 'EventType
invocation = EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation EventId
eid Value
ep (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
respStatus) [HeaderConf]
reqHeaders SerializableBlob
respBody [HeaderConf]
respHeaders
  forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m) =>
SourceConfig b
-> Event b
-> Invocation 'EventType
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordSuccess @b SourceConfig b
sourceConfig Event b
e Invocation 'EventType
invocation MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion

processError ::
  forall b m a.
  ( MonadIO m,
    BackendEventTrigger b,
    MonadGetPolicies m
  ) =>
  SourceConfig b ->
  Event b ->
  RetryConf ->
  [HeaderConf] ->
  J.Value ->
  MaintenanceMode MaintenanceModeVersion ->
  EventTriggerMetrics ->
  HTTPErr a ->
  m (Either QErr ())
processError :: forall (b :: BackendType) (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, BackendEventTrigger b, MonadGetPolicies m) =>
SourceConfig b
-> Event b
-> RetryConf
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> EventTriggerMetrics
-> HTTPErr a
-> m (Either QErr ())
processError SourceConfig b
sourceConfig Event b
e RetryConf
retryConf [HeaderConf]
reqHeaders Value
ep MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion EventTriggerMetrics
eventTriggerMetrics HTTPErr a
err = do
  let invocation :: Invocation 'EventType
invocation = case HTTPErr a
err of
        HClient HttpException
httpException ->
          let statusMaybe :: Maybe Int
statusMaybe = HttpException -> Maybe Int
getHTTPExceptionStatus HttpException
httpException
           in EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) Value
ep Maybe Int
statusMaybe [HeaderConf]
reqHeaders (ByteString -> SerializableBlob
SB.fromLBS (HttpException -> ByteString
httpExceptionErrorEncoding HttpException
httpException)) []
        HStatus HTTPResp a
errResp -> do
          let respPayload :: SerializableBlob
respPayload = HTTPResp a -> SerializableBlob
forall (a :: TriggerTypes). HTTPResp a -> SerializableBlob
hrsBody HTTPResp a
errResp
              respHeaders :: [HeaderConf]
respHeaders = HTTPResp a -> [HeaderConf]
forall (a :: TriggerTypes). HTTPResp a -> [HeaderConf]
hrsHeaders HTTPResp a
errResp
              respStatus :: Int
respStatus = HTTPResp a -> Int
forall (a :: TriggerTypes). HTTPResp a -> Int
hrsStatus HTTPResp a
errResp
          EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) Value
ep (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
respStatus) [HeaderConf]
reqHeaders SerializableBlob
respPayload [HeaderConf]
respHeaders
        HOther String
detail -> do
          let errMsg :: SerializableBlob
errMsg = ByteString -> SerializableBlob
SB.fromLBS (ByteString -> SerializableBlob) -> ByteString -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ String -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode String
detail
          EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) Value
ep (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
500) [HeaderConf]
reqHeaders SerializableBlob
errMsg []
  ProcessEventError
retryOrError <- Event b
-> RetryConf
-> EventTriggerMetrics
-> HTTPErr a
-> m ProcessEventError
forall (m :: * -> *) (b :: BackendType) (a :: TriggerTypes).
(MonadIO m, MonadGetPolicies m) =>
Event b
-> RetryConf
-> EventTriggerMetrics
-> HTTPErr a
-> m ProcessEventError
retryOrSetError Event b
e RetryConf
retryConf EventTriggerMetrics
eventTriggerMetrics HTTPErr a
err
  forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m) =>
SourceConfig b
-> Event b
-> Invocation 'EventType
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError @b SourceConfig b
sourceConfig Event b
e Invocation 'EventType
invocation ProcessEventError
retryOrError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion

retryOrSetError ::
  ( MonadIO m,
    MonadGetPolicies m
  ) =>
  Event b ->
  RetryConf ->
  EventTriggerMetrics ->
  HTTPErr a ->
  m ProcessEventError
retryOrSetError :: forall (m :: * -> *) (b :: BackendType) (a :: TriggerTypes).
(MonadIO m, MonadGetPolicies m) =>
Event b
-> RetryConf
-> EventTriggerMetrics
-> HTTPErr a
-> m ProcessEventError
retryOrSetError Event b
e RetryConf
retryConf EventTriggerMetrics
eventTriggerMetrics HTTPErr a
err = do
  IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity <- m (IO GranularPrometheusMetricsState)
forall (m :: * -> *).
MonadGetPolicies m =>
m (IO GranularPrometheusMetricsState)
runGetPrometheusMetricsGranularity
  let mretryHeader :: Maybe Text
mretryHeader = HTTPErr a -> Maybe Text
forall {a :: TriggerTypes}. HTTPErr a -> Maybe Text
getRetryAfterHeaderFromError HTTPErr a
err
      tries :: Int
tries = Event b -> Int
forall (b :: BackendType). Event b -> Int
eTries Event b
e
      mretryHeaderSeconds :: Maybe Int
mretryHeaderSeconds = Maybe Text
mretryHeader Maybe Text -> (Text -> Maybe Int) -> Maybe Int
forall a b. Maybe a -> (a -> Maybe b) -> Maybe b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Text -> Maybe Int
parseRetryHeader
      triesExhausted :: Bool
triesExhausted = Int
tries Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= RetryConf -> Int
rcNumRetries RetryConf
retryConf
      noRetryHeader :: Bool
noRetryHeader = Maybe Int -> Bool
forall a. Maybe a -> Bool
isNothing Maybe Int
mretryHeaderSeconds
  -- current_try = tries + 1 , allowed_total_tries = rcNumRetries retryConf + 1
  if Bool
triesExhausted Bool -> Bool -> Bool
&& Bool
noRetryHeader
    then do
      IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
        (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> m ()
incEventTriggerCounterWithLabel
          IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity
          Bool
True
          (EventTriggerMetrics -> CounterVector EventStatusWithTriggerLabel
eventProcessedTotal EventTriggerMetrics
eventTriggerMetrics)
          (EventStatusLabel
-> Maybe DynamicEventTriggerLabel -> EventStatusWithTriggerLabel
EventStatusWithTriggerLabel EventStatusLabel
eventFailedLabel (DynamicEventTriggerLabel -> Maybe DynamicEventTriggerLabel
forall a. a -> Maybe a
Just (TriggerName -> SourceName -> DynamicEventTriggerLabel
DynamicEventTriggerLabel (TriggerMetadata -> TriggerName
tmName (Event b -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event b
e)) (Event b -> SourceName
forall (b :: BackendType). Event b -> SourceName
eSource Event b
e))))
      ProcessEventError -> m ProcessEventError
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ProcessEventError
PESetError
    else do
      UTCTime
currentTime <- IO UTCTime -> m UTCTime
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
      let delay :: Int
delay = Int -> Maybe Int -> Int
forall a. a -> Maybe a -> a
fromMaybe (RetryConf -> Int
rcIntervalSec RetryConf
retryConf) Maybe Int
mretryHeaderSeconds
          diff :: NominalDiffTime
diff = Int -> NominalDiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
delay
          retryTime :: UTCTime
retryTime = NominalDiffTime -> UTCTime -> UTCTime
addUTCTime NominalDiffTime
diff UTCTime
currentTime
      ProcessEventError -> m ProcessEventError
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ProcessEventError -> m ProcessEventError)
-> ProcessEventError -> m ProcessEventError
forall a b. (a -> b) -> a -> b
$ UTCTime -> ProcessEventError
PESetRetry UTCTime
retryTime
  where
    getRetryAfterHeaderFromError :: HTTPErr a -> Maybe Text
getRetryAfterHeaderFromError (HStatus HTTPResp a
resp) = HTTPResp a -> Maybe Text
forall (a :: TriggerTypes). HTTPResp a -> Maybe Text
getRetryAfterHeaderFromResp HTTPResp a
resp
    getRetryAfterHeaderFromError HTTPErr a
_ = Maybe Text
forall a. Maybe a
Nothing

    parseRetryHeader :: Text -> Maybe Int
parseRetryHeader = (Int -> Bool) -> Maybe Int -> Maybe Int
forall (m :: * -> *) a. MonadPlus m => (a -> Bool) -> m a -> m a
mfilter (Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (Maybe Int -> Maybe Int)
-> (Text -> Maybe Int) -> Text -> Maybe Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Maybe Int
forall a. Read a => String -> Maybe a
readMaybe (String -> Maybe Int) -> (Text -> String) -> Text -> Maybe Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> String
T.unpack

mkInvocation ::
  EventId ->
  J.Value ->
  Maybe Int ->
  [HeaderConf] ->
  SB.SerializableBlob ->
  [HeaderConf] ->
  Invocation 'EventType
mkInvocation :: EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation EventId
eid Value
ep Maybe Int
statusMaybe [HeaderConf]
reqHeaders SerializableBlob
respBody [HeaderConf]
respHeaders =
  let resp :: Response 'EventType
resp =
        case Maybe Int
statusMaybe of
          Maybe Int
Nothing -> SerializableBlob -> Response 'EventType
forall (a :: TriggerTypes). SerializableBlob -> Response a
mkClientErr SerializableBlob
respBody
          Just Int
status ->
            if Int
status Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
200 Bool -> Bool -> Bool
&& Int
status Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
300
              then Int -> SerializableBlob -> [HeaderConf] -> Response 'EventType
forall (a :: TriggerTypes).
Int -> SerializableBlob -> [HeaderConf] -> Response a
mkResp Int
status SerializableBlob
respBody [HeaderConf]
respHeaders
              else SerializableBlob -> Response 'EventType
forall (a :: TriggerTypes). SerializableBlob -> Response a
mkClientErr SerializableBlob
respBody
   in EventId
-> Maybe Int
-> WebhookRequest
-> Response 'EventType
-> Invocation 'EventType
forall (a :: TriggerTypes).
EventId
-> Maybe Int -> WebhookRequest -> Response a -> Invocation a
Invocation
        EventId
eid
        Maybe Int
statusMaybe
        (Value -> [HeaderConf] -> Text -> WebhookRequest
mkWebhookReq Value
ep [HeaderConf]
reqHeaders Text
invocationVersionET)
        Response 'EventType
resp

logQErr :: (MonadReader r m, Has (L.Logger L.Hasura) r, MonadIO m) => QErr -> m ()
logQErr :: forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr QErr
err = do
  Logger Hasura
logger :: L.Logger L.Hasura <- (r -> Logger Hasura) -> m (Logger Hasura)
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks r -> Logger Hasura
forall a t. Has a t => t -> a
getter
  Logger Hasura
-> forall a (m :: * -> *).
   (ToEngineLog a Hasura, MonadIO m) =>
   a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
   (ToEngineLog a impl, MonadIO m) =>
   a -> m ()
L.unLogger Logger Hasura
logger (EventInternalErr -> m ()) -> EventInternalErr -> m ()
forall a b. (a -> b) -> a -> b
$ QErr -> EventInternalErr
EventInternalErr QErr
err

getEventTriggerInfoFromEvent ::
  forall b. (Backend b) => SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
getEventTriggerInfoFromEvent :: forall (b :: BackendType).
Backend b =>
SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
getEventTriggerInfoFromEvent SchemaCache
sc Event b
e = do
  let table :: TableName b
table = Event b -> TableName b
forall (b :: BackendType). Event b -> TableName b
eTable Event b
e
      mTableInfo :: Maybe (TableInfo b)
mTableInfo = forall (b :: BackendType).
Backend b =>
SourceName -> TableName b -> SourceCache -> Maybe (TableInfo b)
unsafeTableInfo @b (Event b -> SourceName
forall (b :: BackendType). Event b -> SourceName
eSource Event b
e) TableName b
table (SourceCache -> Maybe (TableInfo b))
-> SourceCache -> Maybe (TableInfo b)
forall a b. (a -> b) -> a -> b
$ SchemaCache -> SourceCache
scSources SchemaCache
sc
  TableInfo b
tableInfo <- Maybe (TableInfo b)
-> Either Text (TableInfo b) -> Either Text (TableInfo b)
forall (m :: * -> *) a. Applicative m => Maybe a -> m a -> m a
onNothing Maybe (TableInfo b)
mTableInfo (Either Text (TableInfo b) -> Either Text (TableInfo b))
-> Either Text (TableInfo b) -> Either Text (TableInfo b)
forall a b. (a -> b) -> a -> b
$ Text -> Either Text (TableInfo b)
forall a b. a -> Either a b
Left (Text
"table '" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TableName b
table TableName b -> Text -> Text
forall t. ToTxt t => t -> Text -> Text
<<> Text
"' not found")
  let triggerName :: TriggerName
triggerName = TriggerMetadata -> TriggerName
tmName (TriggerMetadata -> TriggerName) -> TriggerMetadata -> TriggerName
forall a b. (a -> b) -> a -> b
$ Event b -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event b
e
      mEventTriggerInfo :: Maybe (EventTriggerInfo b)
mEventTriggerInfo = TriggerName
-> HashMap TriggerName (EventTriggerInfo b)
-> Maybe (EventTriggerInfo b)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup TriggerName
triggerName (TableInfo b -> HashMap TriggerName (EventTriggerInfo b)
forall (b :: BackendType). TableInfo b -> EventTriggerInfoMap b
_tiEventTriggerInfoMap TableInfo b
tableInfo)
  Maybe (EventTriggerInfo b)
-> Either Text (EventTriggerInfo b)
-> Either Text (EventTriggerInfo b)
forall (m :: * -> *) a. Applicative m => Maybe a -> m a -> m a
onNothing Maybe (EventTriggerInfo b)
mEventTriggerInfo
    (Either Text (EventTriggerInfo b)
 -> Either Text (EventTriggerInfo b))
-> Either Text (EventTriggerInfo b)
-> Either Text (EventTriggerInfo b)
forall a b. (a -> b) -> a -> b
$ Text -> Either Text (EventTriggerInfo b)
forall a b. a -> Either a b
Left
      ( Text
"event trigger '"
          Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TriggerName -> Text
triggerNameToTxt TriggerName
triggerName
          Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"' on table '"
          Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TableName b
table
          TableName b -> Text -> Text
forall t. ToTxt t => t -> Text -> Text
<<> Text
"' not found"
      )

incEventTriggerCounterWithLabel ::
  (MonadIO m) =>
  (IO GranularPrometheusMetricsState) ->
  -- should the metric be observed without a label when granularMetricsState is OFF
  Bool ->
  CounterVector EventStatusWithTriggerLabel ->
  EventStatusWithTriggerLabel ->
  m ()
incEventTriggerCounterWithLabel :: forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> m ()
incEventTriggerCounterWithLabel IO GranularPrometheusMetricsState
getMetricState Bool
alwaysObserve CounterVector EventStatusWithTriggerLabel
counterVector (EventStatusWithTriggerLabel EventStatusLabel
status Maybe DynamicEventTriggerLabel
tl) = do
  IO GranularPrometheusMetricsState -> Bool -> IO () -> IO () -> m ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState -> Bool -> IO () -> IO () -> m ()
recordMetricWithLabel
    IO GranularPrometheusMetricsState
getMetricState
    Bool
alwaysObserve
    (IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel -> IO ()
forall label. Ord label => CounterVector label -> label -> IO ()
CounterVector.inc CounterVector EventStatusWithTriggerLabel
counterVector (EventStatusLabel
-> Maybe DynamicEventTriggerLabel -> EventStatusWithTriggerLabel
EventStatusWithTriggerLabel EventStatusLabel
status Maybe DynamicEventTriggerLabel
tl))
    (IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel -> IO ()
forall label. Ord label => CounterVector label -> label -> IO ()
CounterVector.inc CounterVector EventStatusWithTriggerLabel
counterVector (EventStatusLabel
-> Maybe DynamicEventTriggerLabel -> EventStatusWithTriggerLabel
EventStatusWithTriggerLabel EventStatusLabel
status Maybe DynamicEventTriggerLabel
forall a. Maybe a
Nothing))