module Hasura.Eventing.EventTrigger
( initEventEngineCtx,
createFetchedEventsStatsLogger,
closeFetchedEventsStatsLogger,
processEventQueue,
defaultMaxEventThreads,
defaultFetchInterval,
Event (..),
EventEngineCtx (..),
saveLockedEventTriggerEvents,
removeEventTriggerEventFromLockedEvents,
logQErr,
)
where
import Control.Concurrent.Async.Lifted.Safe qualified as LA
import Control.Concurrent.Extended (Forever (..), sleep)
import Control.Concurrent.STM.TVar
import Control.FoldDebounce qualified as FDebounce
import Control.Lens
import Control.Monad.Catch (MonadMask, bracket_, finally, mask_)
import Control.Monad.STM
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson qualified as J
import Data.Aeson.Key qualified as Key
import Data.Aeson.KeyMap qualified as KeyMap
import Data.Aeson.Lens qualified as JL
import Data.Has
import Data.HashMap.Strict qualified as HashMap
import Data.SerializableBlob qualified as SB
import Data.Set qualified as Set
import Data.String
import Data.Text qualified as T
import Data.Text.Extended
import Data.Text.NonEmpty
import Data.Time qualified as Time
import Data.Time.Clock
import Hasura.Backends.Postgres.SQL.Types hiding (TableName)
import Hasura.Base.Error
import Hasura.Eventing.Backend
import Hasura.Eventing.Common
import Hasura.Eventing.HTTP
import Hasura.HTTP (getHTTPExceptionStatus)
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.DDL.Webhook.Transform
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.BackendType
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.SchemaCache
import Hasura.RQL.Types.Source
import Hasura.SQL.AnyBackend qualified as AB
import Hasura.Server.Metrics (ServerMetrics (..))
import Hasura.Server.Prometheus
import Hasura.Server.Types
import Hasura.Tracing qualified as Tracing
import Network.HTTP.Client.Transformable qualified as HTTP
import Refined (NonNegative, Positive, Refined, unrefine)
import Refined.Unsafe (unsafeRefine)
import System.Metrics.Distribution qualified as EKG.Distribution
import System.Metrics.Gauge qualified as EKG.Gauge
import System.Metrics.Prometheus.Counter qualified as Prometheus.Counter
import System.Metrics.Prometheus.CounterVector (CounterVector)
import System.Metrics.Prometheus.CounterVector qualified as CounterVector
import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
import System.Metrics.Prometheus.Histogram qualified as Prometheus.Histogram
import System.Timeout.Lifted (timeout)
newtype EventInternalErr
= EventInternalErr QErr
deriving (EventInternalErr -> EventInternalErr -> Bool
(EventInternalErr -> EventInternalErr -> Bool)
-> (EventInternalErr -> EventInternalErr -> Bool)
-> Eq EventInternalErr
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: EventInternalErr -> EventInternalErr -> Bool
== :: EventInternalErr -> EventInternalErr -> Bool
$c/= :: EventInternalErr -> EventInternalErr -> Bool
/= :: EventInternalErr -> EventInternalErr -> Bool
Eq)
instance L.ToEngineLog EventInternalErr L.Hasura where
toEngineLog :: EventInternalErr -> (LogLevel, EngineLogType Hasura, Value)
toEngineLog (EventInternalErr QErr
qerr) = (LogLevel
L.LevelError, EngineLogType Hasura
L.eventTriggerLogType, QErr -> Value
forall a. ToJSON a => a -> Value
J.toJSON QErr
qerr)
data EventEngineCtx = EventEngineCtx
{ EventEngineCtx -> TVar Int
_eeCtxEventThreadsCapacity :: TVar Int,
EventEngineCtx -> DiffTime
_eeCtxFetchInterval :: DiffTime,
EventEngineCtx -> Refined NonNegative Int
_eeCtxFetchSize :: Refined NonNegative Int
}
data DeliveryInfo = DeliveryInfo
{ DeliveryInfo -> Int
diCurrentRetry :: Int,
DeliveryInfo -> Int
diMaxRetries :: Int
}
deriving (Int -> DeliveryInfo -> ShowS
[DeliveryInfo] -> ShowS
DeliveryInfo -> String
(Int -> DeliveryInfo -> ShowS)
-> (DeliveryInfo -> String)
-> ([DeliveryInfo] -> ShowS)
-> Show DeliveryInfo
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> DeliveryInfo -> ShowS
showsPrec :: Int -> DeliveryInfo -> ShowS
$cshow :: DeliveryInfo -> String
show :: DeliveryInfo -> String
$cshowList :: [DeliveryInfo] -> ShowS
showList :: [DeliveryInfo] -> ShowS
Show, (forall x. DeliveryInfo -> Rep DeliveryInfo x)
-> (forall x. Rep DeliveryInfo x -> DeliveryInfo)
-> Generic DeliveryInfo
forall x. Rep DeliveryInfo x -> DeliveryInfo
forall x. DeliveryInfo -> Rep DeliveryInfo x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. DeliveryInfo -> Rep DeliveryInfo x
from :: forall x. DeliveryInfo -> Rep DeliveryInfo x
$cto :: forall x. Rep DeliveryInfo x -> DeliveryInfo
to :: forall x. Rep DeliveryInfo x -> DeliveryInfo
Generic, DeliveryInfo -> DeliveryInfo -> Bool
(DeliveryInfo -> DeliveryInfo -> Bool)
-> (DeliveryInfo -> DeliveryInfo -> Bool) -> Eq DeliveryInfo
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: DeliveryInfo -> DeliveryInfo -> Bool
== :: DeliveryInfo -> DeliveryInfo -> Bool
$c/= :: DeliveryInfo -> DeliveryInfo -> Bool
/= :: DeliveryInfo -> DeliveryInfo -> Bool
Eq)
instance J.ToJSON DeliveryInfo where
toJSON :: DeliveryInfo -> Value
toJSON = Options -> DeliveryInfo -> Value
forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
J.genericToJSON Options
hasuraJSON {omitNothingFields :: Bool
J.omitNothingFields = Bool
True}
toEncoding :: DeliveryInfo -> Encoding
toEncoding = Options -> DeliveryInfo -> Encoding
forall a.
(Generic a, GToJSON' Encoding Zero (Rep a)) =>
Options -> a -> Encoding
J.genericToEncoding Options
hasuraJSON {omitNothingFields :: Bool
J.omitNothingFields = Bool
True}
newtype QualifiedTableStrict = QualifiedTableStrict
{ QualifiedTableStrict -> QualifiedTable
getQualifiedTable :: QualifiedTable
}
deriving (Int -> QualifiedTableStrict -> ShowS
[QualifiedTableStrict] -> ShowS
QualifiedTableStrict -> String
(Int -> QualifiedTableStrict -> ShowS)
-> (QualifiedTableStrict -> String)
-> ([QualifiedTableStrict] -> ShowS)
-> Show QualifiedTableStrict
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> QualifiedTableStrict -> ShowS
showsPrec :: Int -> QualifiedTableStrict -> ShowS
$cshow :: QualifiedTableStrict -> String
show :: QualifiedTableStrict -> String
$cshowList :: [QualifiedTableStrict] -> ShowS
showList :: [QualifiedTableStrict] -> ShowS
Show, QualifiedTableStrict -> QualifiedTableStrict -> Bool
(QualifiedTableStrict -> QualifiedTableStrict -> Bool)
-> (QualifiedTableStrict -> QualifiedTableStrict -> Bool)
-> Eq QualifiedTableStrict
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
== :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
$c/= :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
/= :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
Eq)
instance J.ToJSON QualifiedTableStrict where
toJSON :: QualifiedTableStrict -> Value
toJSON (QualifiedTableStrict (QualifiedObject SchemaName
sn TableName
tn)) =
[Pair] -> Value
J.object
[ Key
"schema" Key -> SchemaName -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
forall v. ToJSON v => Key -> v -> Pair
J..= SchemaName
sn,
Key
"name" Key -> TableName -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
forall v. ToJSON v => Key -> v -> Pair
J..= TableName
tn
]
data EventPayload (b :: BackendType) = EventPayload
{ forall (b :: BackendType). EventPayload b -> EventId
epId :: EventId,
forall (b :: BackendType). EventPayload b -> TableName b
epTable :: TableName b,
forall (b :: BackendType). EventPayload b -> TriggerMetadata
epTrigger :: TriggerMetadata,
forall (b :: BackendType). EventPayload b -> Value
epEvent :: J.Value,
forall (b :: BackendType). EventPayload b -> DeliveryInfo
epDeliveryInfo :: DeliveryInfo,
forall (b :: BackendType). EventPayload b -> LocalTime
epCreatedAt :: Time.LocalTime
}
deriving ((forall x. EventPayload b -> Rep (EventPayload b) x)
-> (forall x. Rep (EventPayload b) x -> EventPayload b)
-> Generic (EventPayload b)
forall x. Rep (EventPayload b) x -> EventPayload b
forall x. EventPayload b -> Rep (EventPayload b) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall (b :: BackendType) x.
Rep (EventPayload b) x -> EventPayload b
forall (b :: BackendType) x.
EventPayload b -> Rep (EventPayload b) x
$cfrom :: forall (b :: BackendType) x.
EventPayload b -> Rep (EventPayload b) x
from :: forall x. EventPayload b -> Rep (EventPayload b) x
$cto :: forall (b :: BackendType) x.
Rep (EventPayload b) x -> EventPayload b
to :: forall x. Rep (EventPayload b) x -> EventPayload b
Generic)
deriving instance (Backend b) => Show (EventPayload b)
deriving instance (Backend b) => Eq (EventPayload b)
instance (Backend b) => J.ToJSON (EventPayload b) where
toJSON :: EventPayload b -> Value
toJSON = Options -> EventPayload b -> Value
forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
J.genericToJSON Options
hasuraJSON {omitNothingFields :: Bool
J.omitNothingFields = Bool
True}
defaultMaxEventThreads :: Refined Positive Int
defaultMaxEventThreads :: Refined Positive Int
defaultMaxEventThreads = Int -> Refined Positive Int
forall {k} (p :: k) x. Predicate p x => x -> Refined p x
unsafeRefine Int
100
defaultFetchInterval :: DiffTime
defaultFetchInterval :: DiffTime
defaultFetchInterval = Seconds -> DiffTime
seconds Seconds
1
initEventEngineCtx :: (MonadIO m) => Refined Positive Int -> Refined NonNegative Milliseconds -> Refined NonNegative Int -> m EventEngineCtx
initEventEngineCtx :: forall (m :: * -> *).
MonadIO m =>
Refined Positive Int
-> Refined NonNegative Milliseconds
-> Refined NonNegative Int
-> m EventEngineCtx
initEventEngineCtx Refined Positive Int
maxThreads Refined NonNegative Milliseconds
fetchInterval Refined NonNegative Int
_eeCtxFetchSize = do
TVar Int
_eeCtxEventThreadsCapacity <- IO (TVar Int) -> m (TVar Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TVar Int) -> m (TVar Int)) -> IO (TVar Int) -> m (TVar Int)
forall a b. (a -> b) -> a -> b
$ Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO (Int -> IO (TVar Int)) -> Int -> IO (TVar Int)
forall a b. (a -> b) -> a -> b
$ Refined Positive Int -> Int
forall {k} (p :: k) x. Refined p x -> x
unrefine Refined Positive Int
maxThreads
let _eeCtxFetchInterval :: DiffTime
_eeCtxFetchInterval = Milliseconds -> DiffTime
milliseconds (Milliseconds -> DiffTime) -> Milliseconds -> DiffTime
forall a b. (a -> b) -> a -> b
$ Refined NonNegative Milliseconds -> Milliseconds
forall {k} (p :: k) x. Refined p x -> x
unrefine Refined NonNegative Milliseconds
fetchInterval
EventEngineCtx -> m EventEngineCtx
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return EventEngineCtx {TVar Int
DiffTime
Refined NonNegative Int
_eeCtxEventThreadsCapacity :: TVar Int
_eeCtxFetchInterval :: DiffTime
_eeCtxFetchSize :: Refined NonNegative Int
_eeCtxFetchSize :: Refined NonNegative Int
_eeCtxEventThreadsCapacity :: TVar Int
_eeCtxFetchInterval :: DiffTime
..}
saveLockedEventTriggerEvents :: (MonadIO m) => SourceName -> [EventId] -> TVar (HashMap SourceName (Set.Set EventId)) -> m ()
saveLockedEventTriggerEvents :: forall (m :: * -> *).
MonadIO m =>
SourceName
-> [EventId] -> TVar (HashMap SourceName (Set EventId)) -> m ()
saveLockedEventTriggerEvents SourceName
sourceName [EventId]
eventIds TVar (HashMap SourceName (Set EventId))
lockedEvents =
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically
(STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
HashMap SourceName (Set EventId)
lockedEventsVals <- TVar (HashMap SourceName (Set EventId))
-> STM (HashMap SourceName (Set EventId))
forall a. TVar a -> STM a
readTVar TVar (HashMap SourceName (Set EventId))
lockedEvents
case SourceName
-> HashMap SourceName (Set EventId) -> Maybe (Set EventId)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup SourceName
sourceName HashMap SourceName (Set EventId)
lockedEventsVals of
Maybe (Set EventId)
Nothing -> TVar (HashMap SourceName (Set EventId))
-> HashMap SourceName (Set EventId) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (HashMap SourceName (Set EventId))
lockedEvents (HashMap SourceName (Set EventId) -> STM ())
-> HashMap SourceName (Set EventId) -> STM ()
forall a b. (a -> b) -> a -> b
$! SourceName -> Set EventId -> HashMap SourceName (Set EventId)
forall k v. Hashable k => k -> v -> HashMap k v
HashMap.singleton SourceName
sourceName ([EventId] -> Set EventId
forall a. Ord a => [a] -> Set a
Set.fromList [EventId]
eventIds)
Just Set EventId
_ -> TVar (HashMap SourceName (Set EventId))
-> HashMap SourceName (Set EventId) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (HashMap SourceName (Set EventId))
lockedEvents (HashMap SourceName (Set EventId) -> STM ())
-> HashMap SourceName (Set EventId) -> STM ()
forall a b. (a -> b) -> a -> b
$! (Set EventId -> Set EventId -> Set EventId)
-> SourceName
-> Set EventId
-> HashMap SourceName (Set EventId)
-> HashMap SourceName (Set EventId)
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> k -> v -> HashMap k v -> HashMap k v
HashMap.insertWith Set EventId -> Set EventId -> Set EventId
forall a. Ord a => Set a -> Set a -> Set a
Set.union SourceName
sourceName ([EventId] -> Set EventId
forall a. Ord a => [a] -> Set a
Set.fromList [EventId]
eventIds) HashMap SourceName (Set EventId)
lockedEventsVals
removeEventTriggerEventFromLockedEvents ::
(MonadIO m) => SourceName -> EventId -> TVar (HashMap SourceName (Set.Set EventId)) -> m ()
removeEventTriggerEventFromLockedEvents :: forall (m :: * -> *).
MonadIO m =>
SourceName
-> EventId -> TVar (HashMap SourceName (Set EventId)) -> m ()
removeEventTriggerEventFromLockedEvents SourceName
sourceName EventId
eventId TVar (HashMap SourceName (Set EventId))
lockedEvents =
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically
(STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
HashMap SourceName (Set EventId)
lockedEventsVals <- TVar (HashMap SourceName (Set EventId))
-> STM (HashMap SourceName (Set EventId))
forall a. TVar a -> STM a
readTVar TVar (HashMap SourceName (Set EventId))
lockedEvents
TVar (HashMap SourceName (Set EventId))
-> HashMap SourceName (Set EventId) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (HashMap SourceName (Set EventId))
lockedEvents (HashMap SourceName (Set EventId) -> STM ())
-> HashMap SourceName (Set EventId) -> STM ()
forall a b. (a -> b) -> a -> b
$! (Set EventId -> Set EventId)
-> SourceName
-> HashMap SourceName (Set EventId)
-> HashMap SourceName (Set EventId)
forall k v.
(Eq k, Hashable k) =>
(v -> v) -> k -> HashMap k v -> HashMap k v
HashMap.adjust (EventId -> Set EventId -> Set EventId
forall a. Ord a => a -> Set a -> Set a
Set.delete EventId
eventId) SourceName
sourceName HashMap SourceName (Set EventId)
lockedEventsVals
type BackendEventWithSource = AB.AnyBackend EventWithSource
type FetchEventArguments = ([BackendEventWithSource], Int, Bool)
newtype EventsCount = EventsCount {EventsCount -> Int
unEventsCount :: Int}
deriving (EventsCount -> EventsCount -> Bool
(EventsCount -> EventsCount -> Bool)
-> (EventsCount -> EventsCount -> Bool) -> Eq EventsCount
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: EventsCount -> EventsCount -> Bool
== :: EventsCount -> EventsCount -> Bool
$c/= :: EventsCount -> EventsCount -> Bool
/= :: EventsCount -> EventsCount -> Bool
Eq, Int -> EventsCount -> ShowS
[EventsCount] -> ShowS
EventsCount -> String
(Int -> EventsCount -> ShowS)
-> (EventsCount -> String)
-> ([EventsCount] -> ShowS)
-> Show EventsCount
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> EventsCount -> ShowS
showsPrec :: Int -> EventsCount -> ShowS
$cshow :: EventsCount -> String
show :: EventsCount -> String
$cshowList :: [EventsCount] -> ShowS
showList :: [EventsCount] -> ShowS
Show, [EventsCount] -> Value
[EventsCount] -> Encoding
EventsCount -> Value
EventsCount -> Encoding
(EventsCount -> Value)
-> (EventsCount -> Encoding)
-> ([EventsCount] -> Value)
-> ([EventsCount] -> Encoding)
-> ToJSON EventsCount
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> ToJSON a
$ctoJSON :: EventsCount -> Value
toJSON :: EventsCount -> Value
$ctoEncoding :: EventsCount -> Encoding
toEncoding :: EventsCount -> Encoding
$ctoJSONList :: [EventsCount] -> Value
toJSONList :: [EventsCount] -> Value
$ctoEncodingList :: [EventsCount] -> Encoding
toEncodingList :: [EventsCount] -> Encoding
J.ToJSON, Value -> Parser [EventsCount]
Value -> Parser EventsCount
(Value -> Parser EventsCount)
-> (Value -> Parser [EventsCount]) -> FromJSON EventsCount
forall a.
(Value -> Parser a) -> (Value -> Parser [a]) -> FromJSON a
$cparseJSON :: Value -> Parser EventsCount
parseJSON :: Value -> Parser EventsCount
$cparseJSONList :: Value -> Parser [EventsCount]
parseJSONList :: Value -> Parser [EventsCount]
J.FromJSON, Integer -> EventsCount
EventsCount -> EventsCount
EventsCount -> EventsCount -> EventsCount
(EventsCount -> EventsCount -> EventsCount)
-> (EventsCount -> EventsCount -> EventsCount)
-> (EventsCount -> EventsCount -> EventsCount)
-> (EventsCount -> EventsCount)
-> (EventsCount -> EventsCount)
-> (EventsCount -> EventsCount)
-> (Integer -> EventsCount)
-> Num EventsCount
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
$c+ :: EventsCount -> EventsCount -> EventsCount
+ :: EventsCount -> EventsCount -> EventsCount
$c- :: EventsCount -> EventsCount -> EventsCount
- :: EventsCount -> EventsCount -> EventsCount
$c* :: EventsCount -> EventsCount -> EventsCount
* :: EventsCount -> EventsCount -> EventsCount
$cnegate :: EventsCount -> EventsCount
negate :: EventsCount -> EventsCount
$cabs :: EventsCount -> EventsCount
abs :: EventsCount -> EventsCount
$csignum :: EventsCount -> EventsCount
signum :: EventsCount -> EventsCount
$cfromInteger :: Integer -> EventsCount
fromInteger :: Integer -> EventsCount
Num)
newtype NumEventsFetchedPerSource = NumEventsFetchedPerSource {NumEventsFetchedPerSource -> HashMap SourceName EventsCount
unNumEventsFetchedPerSource :: HashMap SourceName EventsCount}
deriving (NumEventsFetchedPerSource -> NumEventsFetchedPerSource -> Bool
(NumEventsFetchedPerSource -> NumEventsFetchedPerSource -> Bool)
-> (NumEventsFetchedPerSource -> NumEventsFetchedPerSource -> Bool)
-> Eq NumEventsFetchedPerSource
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: NumEventsFetchedPerSource -> NumEventsFetchedPerSource -> Bool
== :: NumEventsFetchedPerSource -> NumEventsFetchedPerSource -> Bool
$c/= :: NumEventsFetchedPerSource -> NumEventsFetchedPerSource -> Bool
/= :: NumEventsFetchedPerSource -> NumEventsFetchedPerSource -> Bool
Eq, Int -> NumEventsFetchedPerSource -> ShowS
[NumEventsFetchedPerSource] -> ShowS
NumEventsFetchedPerSource -> String
(Int -> NumEventsFetchedPerSource -> ShowS)
-> (NumEventsFetchedPerSource -> String)
-> ([NumEventsFetchedPerSource] -> ShowS)
-> Show NumEventsFetchedPerSource
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> NumEventsFetchedPerSource -> ShowS
showsPrec :: Int -> NumEventsFetchedPerSource -> ShowS
$cshow :: NumEventsFetchedPerSource -> String
show :: NumEventsFetchedPerSource -> String
$cshowList :: [NumEventsFetchedPerSource] -> ShowS
showList :: [NumEventsFetchedPerSource] -> ShowS
Show)
instance J.ToJSON NumEventsFetchedPerSource where
toJSON :: NumEventsFetchedPerSource -> Value
toJSON (NumEventsFetchedPerSource HashMap SourceName EventsCount
m) =
Object -> Value
J.Object (Object -> Value) -> Object -> Value
forall a b. (a -> b) -> a -> b
$ [Pair] -> Object
forall v. [(Key, v)] -> KeyMap v
KeyMap.fromList ([Pair] -> Object) -> [Pair] -> Object
forall a b. (a -> b) -> a -> b
$ ((SourceName, EventsCount) -> Pair)
-> [(SourceName, EventsCount)] -> [Pair]
forall a b. (a -> b) -> [a] -> [b]
map ((Text -> Key
Key.fromText (Text -> Key) -> (SourceName -> Text) -> SourceName -> Key
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SourceName -> Text
sourceNameToText) (SourceName -> Key)
-> (EventsCount -> Value) -> (SourceName, EventsCount) -> Pair
forall b c b' c'. (b -> c) -> (b' -> c') -> (b, b') -> (c, c')
forall (a :: * -> * -> *) b c b' c'.
Arrow a =>
a b c -> a b' c' -> a (b, b') (c, c')
*** EventsCount -> Value
forall a. ToJSON a => a -> Value
J.toJSON) ([(SourceName, EventsCount)] -> [Pair])
-> [(SourceName, EventsCount)] -> [Pair]
forall a b. (a -> b) -> a -> b
$ HashMap SourceName EventsCount -> [(SourceName, EventsCount)]
forall k v. HashMap k v -> [(k, v)]
HashMap.toList HashMap SourceName EventsCount
m
instance Semigroup NumEventsFetchedPerSource where
(NumEventsFetchedPerSource HashMap SourceName EventsCount
lMap) <> :: NumEventsFetchedPerSource
-> NumEventsFetchedPerSource -> NumEventsFetchedPerSource
<> (NumEventsFetchedPerSource HashMap SourceName EventsCount
rMap) =
HashMap SourceName EventsCount -> NumEventsFetchedPerSource
NumEventsFetchedPerSource (HashMap SourceName EventsCount -> NumEventsFetchedPerSource)
-> HashMap SourceName EventsCount -> NumEventsFetchedPerSource
forall a b. (a -> b) -> a -> b
$ (EventsCount -> EventsCount -> EventsCount)
-> HashMap SourceName EventsCount
-> HashMap SourceName EventsCount
-> HashMap SourceName EventsCount
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> HashMap k v -> HashMap k v -> HashMap k v
HashMap.unionWith EventsCount -> EventsCount -> EventsCount
forall a. Num a => a -> a -> a
(+) HashMap SourceName EventsCount
lMap HashMap SourceName EventsCount
rMap
instance Monoid NumEventsFetchedPerSource where
mempty :: NumEventsFetchedPerSource
mempty = HashMap SourceName EventsCount -> NumEventsFetchedPerSource
NumEventsFetchedPerSource HashMap SourceName EventsCount
forall a. Monoid a => a
mempty
data FetchedEventsStats = FetchedEventsStats
{ FetchedEventsStats -> NumEventsFetchedPerSource
_fesNumEventsFetched :: NumEventsFetchedPerSource,
FetchedEventsStats -> Int
_fesNumFetches :: Int
}
deriving (FetchedEventsStats -> FetchedEventsStats -> Bool
(FetchedEventsStats -> FetchedEventsStats -> Bool)
-> (FetchedEventsStats -> FetchedEventsStats -> Bool)
-> Eq FetchedEventsStats
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: FetchedEventsStats -> FetchedEventsStats -> Bool
== :: FetchedEventsStats -> FetchedEventsStats -> Bool
$c/= :: FetchedEventsStats -> FetchedEventsStats -> Bool
/= :: FetchedEventsStats -> FetchedEventsStats -> Bool
Eq, (forall x. FetchedEventsStats -> Rep FetchedEventsStats x)
-> (forall x. Rep FetchedEventsStats x -> FetchedEventsStats)
-> Generic FetchedEventsStats
forall x. Rep FetchedEventsStats x -> FetchedEventsStats
forall x. FetchedEventsStats -> Rep FetchedEventsStats x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. FetchedEventsStats -> Rep FetchedEventsStats x
from :: forall x. FetchedEventsStats -> Rep FetchedEventsStats x
$cto :: forall x. Rep FetchedEventsStats x -> FetchedEventsStats
to :: forall x. Rep FetchedEventsStats x -> FetchedEventsStats
Generic, Int -> FetchedEventsStats -> ShowS
[FetchedEventsStats] -> ShowS
FetchedEventsStats -> String
(Int -> FetchedEventsStats -> ShowS)
-> (FetchedEventsStats -> String)
-> ([FetchedEventsStats] -> ShowS)
-> Show FetchedEventsStats
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> FetchedEventsStats -> ShowS
showsPrec :: Int -> FetchedEventsStats -> ShowS
$cshow :: FetchedEventsStats -> String
show :: FetchedEventsStats -> String
$cshowList :: [FetchedEventsStats] -> ShowS
showList :: [FetchedEventsStats] -> ShowS
Show)
instance J.ToJSON FetchedEventsStats where
toJSON :: FetchedEventsStats -> Value
toJSON = Options -> FetchedEventsStats -> Value
forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
J.genericToJSON Options
hasuraJSON
toEncoding :: FetchedEventsStats -> Encoding
toEncoding = Options -> FetchedEventsStats -> Encoding
forall a.
(Generic a, GToJSON' Encoding Zero (Rep a)) =>
Options -> a -> Encoding
J.genericToEncoding Options
hasuraJSON
instance L.ToEngineLog FetchedEventsStats L.Hasura where
toEngineLog :: FetchedEventsStats -> (LogLevel, EngineLogType Hasura, Value)
toEngineLog FetchedEventsStats
stats =
(LogLevel
L.LevelInfo, EngineLogType Hasura
L.eventTriggerProcessLogType, FetchedEventsStats -> Value
forall a. ToJSON a => a -> Value
J.toJSON FetchedEventsStats
stats)
instance Semigroup FetchedEventsStats where
(FetchedEventsStats NumEventsFetchedPerSource
lMap Int
lFetches) <> :: FetchedEventsStats -> FetchedEventsStats -> FetchedEventsStats
<> (FetchedEventsStats NumEventsFetchedPerSource
rMap Int
rFetches) =
NumEventsFetchedPerSource -> Int -> FetchedEventsStats
FetchedEventsStats (NumEventsFetchedPerSource
lMap NumEventsFetchedPerSource
-> NumEventsFetchedPerSource -> NumEventsFetchedPerSource
forall a. Semigroup a => a -> a -> a
<> NumEventsFetchedPerSource
rMap) (Int
lFetches Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
rFetches)
instance Monoid FetchedEventsStats where
mempty :: FetchedEventsStats
mempty = NumEventsFetchedPerSource -> Int -> FetchedEventsStats
FetchedEventsStats NumEventsFetchedPerSource
forall a. Monoid a => a
mempty Int
0
type FetchedEventsStatsLogger = FDebounce.Trigger FetchedEventsStats FetchedEventsStats
createFetchedEventsStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> m FetchedEventsStatsLogger
createFetchedEventsStatsLogger :: forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> m FetchedEventsStatsLogger
createFetchedEventsStatsLogger = Logger Hasura -> m FetchedEventsStatsLogger
forall (m :: * -> *) stats impl.
(MonadIO m, ToEngineLog stats impl, Monoid stats) =>
Logger impl -> m (Trigger stats stats)
L.createStatsLogger
closeFetchedEventsStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> FetchedEventsStatsLogger -> m ()
closeFetchedEventsStatsLogger :: forall (m :: * -> *).
MonadIO m =>
Logger Hasura -> FetchedEventsStatsLogger -> m ()
closeFetchedEventsStatsLogger = EngineLogType Hasura
-> Logger Hasura -> FetchedEventsStatsLogger -> m ()
forall (m :: * -> *) impl stats.
(MonadIO m, EnabledLogTypes impl) =>
EngineLogType impl -> Logger impl -> Trigger stats stats -> m ()
L.closeStatsLogger EngineLogType Hasura
L.eventTriggerProcessLogType
logFetchedEventsStatistics ::
(MonadIO m) =>
FetchedEventsStatsLogger ->
[BackendEventWithSource] ->
m ()
logFetchedEventsStatistics :: forall (m :: * -> *).
MonadIO m =>
FetchedEventsStatsLogger -> [BackendEventWithSource] -> m ()
logFetchedEventsStatistics FetchedEventsStatsLogger
logger [BackendEventWithSource]
backendEvents =
FetchedEventsStatsLogger -> FetchedEventsStats -> m ()
forall (m :: * -> *) stats.
MonadIO m =>
Trigger stats stats -> stats -> m ()
L.logStats FetchedEventsStatsLogger
logger (NumEventsFetchedPerSource -> Int -> FetchedEventsStats
FetchedEventsStats NumEventsFetchedPerSource
numEventsFetchedPerSource Int
1)
where
numEventsFetchedPerSource :: NumEventsFetchedPerSource
numEventsFetchedPerSource =
let sourceNames :: [SourceName]
sourceNames = ((BackendEventWithSource -> SourceName)
-> [BackendEventWithSource] -> [SourceName])
-> [BackendEventWithSource]
-> (BackendEventWithSource -> SourceName)
-> [SourceName]
forall a b c. (a -> b -> c) -> b -> a -> c
flip (BackendEventWithSource -> SourceName)
-> [BackendEventWithSource] -> [SourceName]
forall a b. (a -> b) -> [a] -> [b]
map [BackendEventWithSource]
backendEvents
((BackendEventWithSource -> SourceName) -> [SourceName])
-> (BackendEventWithSource -> SourceName) -> [SourceName]
forall a b. (a -> b) -> a -> b
$ \BackendEventWithSource
backendEvent -> forall (c :: BackendType -> Constraint) (i :: BackendType -> *) r.
AllBackendsSatisfy c =>
AnyBackend i -> (forall (b :: BackendType). c b => i b -> r) -> r
AB.dispatchAnyBackend @Backend BackendEventWithSource
backendEvent EventWithSource b -> SourceName
forall (b :: BackendType).
Backend b =>
EventWithSource b -> SourceName
forall (b :: BackendType). EventWithSource b -> SourceName
_ewsSourceName
in HashMap SourceName EventsCount -> NumEventsFetchedPerSource
NumEventsFetchedPerSource (HashMap SourceName EventsCount -> NumEventsFetchedPerSource)
-> HashMap SourceName EventsCount -> NumEventsFetchedPerSource
forall a b. (a -> b) -> a -> b
$ (EventsCount -> EventsCount -> EventsCount)
-> [(SourceName, EventsCount)] -> HashMap SourceName EventsCount
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> [(k, v)] -> HashMap k v
HashMap.fromListWith EventsCount -> EventsCount -> EventsCount
forall a. Num a => a -> a -> a
(+) [(SourceName
sourceName, EventsCount
1) | SourceName
sourceName <- [SourceName]
sourceNames]
{-# ANN processEventQueue ("HLint: ignore Use withAsync" :: String) #-}
upperBoundEventTriggerTimeout :: DiffTime
upperBoundEventTriggerTimeout :: DiffTime
upperBoundEventTriggerTimeout = Minutes -> DiffTime
minutes Minutes
30
processEventQueue ::
forall m.
( MonadIO m,
MonadBaseControl IO m,
LA.Forall (LA.Pure m),
MonadMask m,
Tracing.MonadTrace m,
MonadGetPolicies m
) =>
L.Logger L.Hasura ->
FetchedEventsStatsLogger ->
HTTP.Manager ->
IO SchemaCache ->
IO EventEngineCtx ->
TVar Int ->
LockedEventsCtx ->
ServerMetrics ->
EventTriggerMetrics ->
MaintenanceMode () ->
m (Forever m)
processEventQueue :: forall (m :: * -> *).
(MonadIO m, MonadBaseControl IO m, Forall (Pure m), MonadMask m,
MonadTrace m, MonadGetPolicies m) =>
Logger Hasura
-> FetchedEventsStatsLogger
-> Manager
-> IO SchemaCache
-> IO EventEngineCtx
-> TVar Int
-> LockedEventsCtx
-> ServerMetrics
-> EventTriggerMetrics
-> MaintenanceMode ()
-> m (Forever m)
processEventQueue Logger Hasura
logger FetchedEventsStatsLogger
statsLogger Manager
httpMgr IO SchemaCache
getSchemaCache IO EventEngineCtx
getEventEngineCtx TVar Int
activeEventProcessingThreads LockedEventsCtx {TVar (HashMap SourceName (Set EventId))
leEvents :: TVar (HashMap SourceName (Set EventId))
leEvents :: LockedEventsCtx -> TVar (HashMap SourceName (Set EventId))
leEvents} ServerMetrics
serverMetrics EventTriggerMetrics
eventTriggerMetrics MaintenanceMode ()
maintenanceMode = do
[BackendEventWithSource]
events0 <- m [BackendEventWithSource]
popEventsBatch
Forever m -> m (Forever m)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Forever m -> m (Forever m)) -> Forever m -> m (Forever m)
forall a b. (a -> b) -> a -> b
$ ([BackendEventWithSource], Int, Bool)
-> (([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool))
-> Forever m
forall (m :: * -> *) a. a -> (a -> m a) -> Forever m
Forever ([BackendEventWithSource]
events0, Int
0, Bool
False) ([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
go
where
popEventsBatch :: m [BackendEventWithSource]
popEventsBatch :: m [BackendEventWithSource]
popEventsBatch = do
SourceCache
allSources <- SchemaCache -> SourceCache
scSources (SchemaCache -> SourceCache) -> m SchemaCache -> m SourceCache
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO SchemaCache -> m SchemaCache
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO SchemaCache
getSchemaCache
Int
fetchBatchSize <- Refined NonNegative Int -> Int
forall {k} (p :: k) x. Refined p x -> x
unrefine (Refined NonNegative Int -> Int)
-> (EventEngineCtx -> Refined NonNegative Int)
-> EventEngineCtx
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EventEngineCtx -> Refined NonNegative Int
_eeCtxFetchSize (EventEngineCtx -> Int) -> m EventEngineCtx -> m Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO EventEngineCtx -> m EventEngineCtx
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO EventEngineCtx
getEventEngineCtx
[BackendEventWithSource]
events <- IO [BackendEventWithSource] -> m [BackendEventWithSource]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO [BackendEventWithSource] -> m [BackendEventWithSource])
-> (IO [[BackendEventWithSource]] -> IO [BackendEventWithSource])
-> IO [[BackendEventWithSource]]
-> m [BackendEventWithSource]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ([[BackendEventWithSource]] -> [BackendEventWithSource])
-> IO [[BackendEventWithSource]] -> IO [BackendEventWithSource]
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap [[BackendEventWithSource]] -> [BackendEventWithSource]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat
(IO [[BackendEventWithSource]] -> m [BackendEventWithSource])
-> IO [[BackendEventWithSource]] -> m [BackendEventWithSource]
forall a b. (a -> b) -> a -> b
$
[(SourceName, BackendSourceInfo)]
-> ((SourceName, BackendSourceInfo) -> IO [BackendEventWithSource])
-> IO [[BackendEventWithSource]]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, MonadBaseControl IO m, Forall (Pure m)) =>
t a -> (a -> m b) -> m (t b)
LA.forConcurrently (SourceCache -> [(SourceName, BackendSourceInfo)]
forall k v. HashMap k v -> [(k, v)]
HashMap.toList SourceCache
allSources) \(SourceName
sourceName, BackendSourceInfo
sourceCache) ->
forall (c :: BackendType -> Constraint) (i :: BackendType -> *) r.
AllBackendsSatisfy c =>
AnyBackend i -> (forall (b :: BackendType). c b => i b -> r) -> r
AB.dispatchAnyBackend @BackendEventTrigger BackendSourceInfo
sourceCache \(SourceInfo {Maybe QueryTagsConfig
TableCache b
FunctionCache b
StoredProcedureCache b
LogicalModelCache b
NativeQueryCache b
BackendSourceKind b
SourceName
SourceConfig b
ResolvedSourceCustomization
DBObjectsIntrospection b
_siName :: SourceName
_siSourceKind :: BackendSourceKind b
_siTables :: TableCache b
_siFunctions :: FunctionCache b
_siNativeQueries :: NativeQueryCache b
_siStoredProcedures :: StoredProcedureCache b
_siLogicalModels :: LogicalModelCache b
_siConfiguration :: SourceConfig b
_siQueryTagsConfig :: Maybe QueryTagsConfig
_siCustomization :: ResolvedSourceCustomization
_siDbObjectsIntrospection :: DBObjectsIntrospection b
_siName :: forall (b :: BackendType). SourceInfo b -> SourceName
_siSourceKind :: forall (b :: BackendType). SourceInfo b -> BackendSourceKind b
_siTables :: forall (b :: BackendType). SourceInfo b -> TableCache b
_siFunctions :: forall (b :: BackendType). SourceInfo b -> FunctionCache b
_siNativeQueries :: forall (b :: BackendType). SourceInfo b -> NativeQueryCache b
_siStoredProcedures :: forall (b :: BackendType). SourceInfo b -> StoredProcedureCache b
_siLogicalModels :: forall (b :: BackendType). SourceInfo b -> LogicalModelCache b
_siConfiguration :: forall (b :: BackendType). SourceInfo b -> SourceConfig b
_siQueryTagsConfig :: forall (b :: BackendType). SourceInfo b -> Maybe QueryTagsConfig
_siCustomization :: forall (b :: BackendType).
SourceInfo b -> ResolvedSourceCustomization
_siDbObjectsIntrospection :: forall (b :: BackendType). SourceInfo b -> DBObjectsIntrospection b
..} :: SourceInfo b) -> do
let tables :: [TableInfo b]
tables = TableCache b -> [TableInfo b]
forall k v. HashMap k v -> [v]
HashMap.elems TableCache b
_siTables
triggerMap :: [EventTriggerInfoMap b]
triggerMap = TableInfo b -> EventTriggerInfoMap b
forall (b :: BackendType). TableInfo b -> EventTriggerInfoMap b
_tiEventTriggerInfoMap (TableInfo b -> EventTriggerInfoMap b)
-> [TableInfo b] -> [EventTriggerInfoMap b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [TableInfo b]
tables
eventTriggerCount :: Int
eventTriggerCount = [Int] -> Int
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (EventTriggerInfoMap b -> Int
forall k v. HashMap k v -> Int
HashMap.size (EventTriggerInfoMap b -> Int) -> [EventTriggerInfoMap b] -> [Int]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [EventTriggerInfoMap b]
triggerMap)
triggerNames :: [TriggerName]
triggerNames = (EventTriggerInfoMap b -> [TriggerName])
-> [EventTriggerInfoMap b] -> [TriggerName]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap EventTriggerInfoMap b -> [TriggerName]
forall k v. HashMap k v -> [k]
HashMap.keys [EventTriggerInfoMap b]
triggerMap
if Int
eventTriggerCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
then do
UTCTime
eventPollStartTime <- IO UTCTime
getCurrentTime
ExceptT QErr IO [Event b] -> IO (Either QErr [Event b])
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m, MonadError QErr m) =>
SourceConfig b
-> SourceName
-> [TriggerName]
-> MaintenanceMode ()
-> FetchBatchSize
-> m [Event b]
fetchUndeliveredEvents @b SourceConfig b
_siConfiguration SourceName
sourceName [TriggerName]
triggerNames MaintenanceMode ()
maintenanceMode (Int -> FetchBatchSize
FetchBatchSize Int
fetchBatchSize)) IO (Either QErr [Event b])
-> (Either QErr [Event b] -> IO [BackendEventWithSource])
-> IO [BackendEventWithSource]
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Right [Event b]
events -> do
let eventFetchCount :: Int64
eventFetchCount = Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int64) -> Int -> Int64
forall a b. (a -> b) -> a -> b
$ [Event b] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Event b]
events
Gauge -> Int64 -> IO ()
Prometheus.Gauge.set (EventTriggerMetrics -> Gauge
eventsFetchedPerBatch EventTriggerMetrics
eventTriggerMetrics) Int64
eventFetchCount
if ([Event b] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Event b]
events)
then [BackendEventWithSource] -> IO [BackendEventWithSource]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return []
else do
UTCTime
eventsFetchedTime <- IO UTCTime
getCurrentTime
let eventPollTime :: Double
eventPollTime = NominalDiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac (NominalDiffTime -> Double) -> NominalDiffTime -> Double
forall a b. (a -> b) -> a -> b
$ UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
eventsFetchedTime UTCTime
eventPollStartTime
()
_ <- Distribution -> Double -> IO ()
EKG.Distribution.add (ServerMetrics -> Distribution
smEventFetchTimePerBatch ServerMetrics
serverMetrics) Double
eventPollTime
Histogram -> Double -> IO ()
Prometheus.Histogram.observe (EventTriggerMetrics -> Histogram
eventsFetchTimePerBatch EventTriggerMetrics
eventTriggerMetrics) Double
eventPollTime
()
_ <- Distribution -> Double -> IO ()
EKG.Distribution.add (ServerMetrics -> Distribution
smNumEventsFetchedPerBatch ServerMetrics
serverMetrics) (Int -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Double) -> Int -> Double
forall a b. (a -> b) -> a -> b
$ [Event b] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Event b]
events)
SourceName
-> [EventId] -> TVar (HashMap SourceName (Set EventId)) -> IO ()
forall (m :: * -> *).
MonadIO m =>
SourceName
-> [EventId] -> TVar (HashMap SourceName (Set EventId)) -> m ()
saveLockedEventTriggerEvents SourceName
sourceName (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId (Event b -> EventId) -> [Event b] -> [EventId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Event b]
events) TVar (HashMap SourceName (Set EventId))
leEvents
[BackendEventWithSource] -> IO [BackendEventWithSource]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([BackendEventWithSource] -> IO [BackendEventWithSource])
-> [BackendEventWithSource] -> IO [BackendEventWithSource]
forall a b. (a -> b) -> a -> b
$ (Event b -> BackendEventWithSource)
-> [Event b] -> [BackendEventWithSource]
forall a b. (a -> b) -> [a] -> [b]
map (\Event b
event -> forall (b :: BackendType) (i :: BackendType -> *).
HasTag b =>
i b -> AnyBackend i
AB.mkAnyBackend @b (EventWithSource b -> BackendEventWithSource)
-> EventWithSource b -> BackendEventWithSource
forall a b. (a -> b) -> a -> b
$ Event b
-> SourceConfig b -> SourceName -> UTCTime -> EventWithSource b
forall (b :: BackendType).
Event b
-> SourceConfig b -> SourceName -> UTCTime -> EventWithSource b
EventWithSource Event b
event SourceConfig b
_siConfiguration SourceName
sourceName UTCTime
eventsFetchedTime) [Event b]
events
Left QErr
err -> do
IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (EventInternalErr -> IO ()) -> EventInternalErr -> IO ()
forall a b. (a -> b) -> a -> b
$ QErr -> EventInternalErr
EventInternalErr QErr
err
[BackendEventWithSource] -> IO [BackendEventWithSource]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
else [BackendEventWithSource] -> IO [BackendEventWithSource]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
FetchedEventsStatsLogger -> [BackendEventWithSource] -> m ()
forall (m :: * -> *).
MonadIO m =>
FetchedEventsStatsLogger -> [BackendEventWithSource] -> m ()
logFetchedEventsStatistics FetchedEventsStatsLogger
statsLogger [BackendEventWithSource]
events
[BackendEventWithSource] -> m [BackendEventWithSource]
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [BackendEventWithSource]
events
go :: FetchEventArguments -> m FetchEventArguments
go :: ([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
go ([BackendEventWithSource]
events, !Int
fullFetchCount, !Bool
alreadyWarned) = do
EventEngineCtx {TVar Int
DiffTime
Refined NonNegative Int
_eeCtxEventThreadsCapacity :: EventEngineCtx -> TVar Int
_eeCtxFetchInterval :: EventEngineCtx -> DiffTime
_eeCtxFetchSize :: EventEngineCtx -> Refined NonNegative Int
_eeCtxEventThreadsCapacity :: TVar Int
_eeCtxFetchInterval :: DiffTime
_eeCtxFetchSize :: Refined NonNegative Int
..} <- IO EventEngineCtx -> m EventEngineCtx
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO EventEngineCtx
getEventEngineCtx
let fetchBatchSize :: Int
fetchBatchSize = Refined NonNegative Int -> Int
forall {k} (p :: k) x. Refined p x -> x
unrefine Refined NonNegative Int
_eeCtxFetchSize
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ([BackendEventWithSource] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [BackendEventWithSource]
events) (m () -> m ()) -> (IO () -> m ()) -> IO () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ DiffTime -> IO ()
sleep DiffTime
_eeCtxFetchInterval
[BackendEventWithSource]
eventsNext <- m [BackendEventWithSource]
-> (Async [BackendEventWithSource] -> m [BackendEventWithSource])
-> m [BackendEventWithSource]
forall (m :: * -> *) a b.
(MonadBaseControl IO m, Forall (Pure m)) =>
m a -> (Async a -> m b) -> m b
LA.withAsync m [BackendEventWithSource]
popEventsBatch ((Async [BackendEventWithSource] -> m [BackendEventWithSource])
-> m [BackendEventWithSource])
-> (Async [BackendEventWithSource] -> m [BackendEventWithSource])
-> m [BackendEventWithSource]
forall a b. (a -> b) -> a -> b
$ \Async [BackendEventWithSource]
eventsNextA -> do
[BackendEventWithSource]
-> (BackendEventWithSource -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [BackendEventWithSource]
events ((BackendEventWithSource -> m ()) -> m ())
-> (BackendEventWithSource -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \BackendEventWithSource
eventWithSource ->
forall (c :: BackendType -> Constraint) (i :: BackendType -> *) r.
AllBackendsSatisfy c =>
AnyBackend i -> (forall (b :: BackendType). c b => i b -> r) -> r
AB.dispatchAnyBackend @BackendEventTrigger BackendEventWithSource
eventWithSource \(EventWithSource b
eventWithSource' :: EventWithSource b) ->
m () -> m ()
forall (m :: * -> *) a. MonadMask m => m a -> m a
mask_ (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically
(STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Int
maxCapacity <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
_eeCtxEventThreadsCapacity
Int
activeThreadCount <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
activeEventProcessingThreads
Bool -> STM ()
check (Bool -> STM ()) -> Bool -> STM ()
forall a b. (a -> b) -> a -> b
$ Int
maxCapacity Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
activeThreadCount
TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
activeEventProcessingThreads (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
Async ()
t <-
m () -> m (Async ())
forall (m :: * -> *) a.
(MonadBaseControl IO m, Forall (Pure m)) =>
m a -> m (Async a)
LA.async
(m () -> m (Async ())) -> m () -> m (Async ())
forall a b. (a -> b) -> a -> b
$ (ReaderT (Logger Hasura, Manager) m ()
-> (Logger Hasura, Manager) -> m ())
-> (Logger Hasura, Manager)
-> ReaderT (Logger Hasura, Manager) m ()
-> m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ReaderT (Logger Hasura, Manager) m ()
-> (Logger Hasura, Manager) -> m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (Logger Hasura
logger, Manager
httpMgr)
(ReaderT (Logger Hasura, Manager) m () -> m ())
-> ReaderT (Logger Hasura, Manager) m () -> m ()
forall a b. (a -> b) -> a -> b
$ EventWithSource b -> ReaderT (Logger Hasura, Manager) m ()
forall (io :: * -> *) r (b :: BackendType).
(MonadBaseControl IO io, MonadIO io, MonadReader r io,
Has Manager r, Has (Logger Hasura) r, MonadMask io,
BackendEventTrigger b, MonadTrace io, MonadGetPolicies io) =>
EventWithSource b -> io ()
processEvent EventWithSource b
eventWithSource'
ReaderT (Logger Hasura, Manager) m ()
-> ReaderT (Logger Hasura, Manager) m ()
-> ReaderT (Logger Hasura, Manager) m ()
forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`finally`
ReaderT (Logger Hasura, Manager) m ()
decrementActiveThreadCount
Async () -> m ()
forall (m :: * -> *) a. MonadBase IO m => Async a -> m ()
LA.link Async ()
t
Async [BackendEventWithSource] -> m [BackendEventWithSource]
forall (m :: * -> *) a.
(MonadBase IO m, Forall (Pure m)) =>
Async a -> m a
LA.wait Async [BackendEventWithSource]
eventsNextA
let lenEvents :: Int
lenEvents = [BackendEventWithSource] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [BackendEventWithSource]
events
if
| Int
lenEvents Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
fetchBatchSize -> do
let clearlyBehind :: Bool
clearlyBehind = Int
fullFetchCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
3
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
alreadyWarned
(m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
clearlyBehind
(m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger
(UnstructuredLog -> m ()) -> UnstructuredLog -> m ()
forall a b. (a -> b) -> a -> b
$ LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelWarn
(SerializableBlob -> UnstructuredLog)
-> SerializableBlob -> UnstructuredLog
forall a b. (a -> b) -> a -> b
$ String -> SerializableBlob
forall a. IsString a => String -> a
fromString
(String -> SerializableBlob) -> String -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ String
"Events processor may not be keeping up with events generated in postgres, "
String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"or we're working on a backlog of events. Consider increasing "
String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE"
([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([BackendEventWithSource]
eventsNext, (Int
fullFetchCount Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1), (Bool
alreadyWarned Bool -> Bool -> Bool
|| Bool
clearlyBehind))
| Bool
otherwise -> do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
lenEvents Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
fetchBatchSize Bool -> Bool -> Bool
&& Bool
alreadyWarned)
(m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger
(UnstructuredLog -> m ()) -> UnstructuredLog -> m ()
forall a b. (a -> b) -> a -> b
$ LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelWarn
(SerializableBlob -> UnstructuredLog)
-> SerializableBlob -> UnstructuredLog
forall a b. (a -> b) -> a -> b
$ String -> SerializableBlob
forall a. IsString a => String -> a
fromString
(String -> SerializableBlob) -> String -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ String
"It looks like the events processor is keeping up again."
([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([BackendEventWithSource]
eventsNext, Int
0, Bool
False)
decrementActiveThreadCount :: ReaderT (Logger Hasura, Manager) m ()
decrementActiveThreadCount =
IO () -> ReaderT (Logger Hasura, Manager) m ()
forall a. IO a -> ReaderT (Logger Hasura, Manager) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> ReaderT (Logger Hasura, Manager) m ())
-> IO () -> ReaderT (Logger Hasura, Manager) m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically
(STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
activeEventProcessingThreads (Int -> Int -> Int
forall a. Num a => a -> a -> a
subtract Int
1)
extractEventContext :: forall io. (MonadIO io) => J.Value -> io (Maybe Tracing.TraceContext)
extractEventContext :: forall (io :: * -> *).
MonadIO io =>
Value -> io (Maybe TraceContext)
extractEventContext Value
e = do
let traceIdMaybe :: Maybe TraceId
traceIdMaybe =
ByteString -> Maybe TraceId
Tracing.traceIdFromHex
(ByteString -> Maybe TraceId)
-> (Text -> ByteString) -> Text -> Maybe TraceId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
txtToBs
(Text -> Maybe TraceId) -> Maybe Text -> Maybe TraceId
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Value
e
Value -> Getting (First Text) Value Text -> Maybe Text
forall s a. s -> Getting (First a) s a -> Maybe a
^? Key -> Traversal' Value Value
forall t. AsValue t => Key -> Traversal' t Value
JL.key Key
"trace_context" ((Value -> Const (First Text) Value)
-> Value -> Const (First Text) Value)
-> Getting (First Text) Value Text
-> Getting (First Text) Value Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Key -> Traversal' Value Value
forall t. AsValue t => Key -> Traversal' t Value
JL.key Key
"trace_id" ((Value -> Const (First Text) Value)
-> Value -> Const (First Text) Value)
-> Getting (First Text) Value Text
-> Getting (First Text) Value Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Getting (First Text) Value Text
forall t. AsValue t => Prism' t Text
Prism' Value Text
JL._String
Maybe TraceId
-> (TraceId -> io TraceContext) -> io (Maybe TraceContext)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for Maybe TraceId
traceIdMaybe ((TraceId -> io TraceContext) -> io (Maybe TraceContext))
-> (TraceId -> io TraceContext) -> io (Maybe TraceContext)
forall a b. (a -> b) -> a -> b
$ \TraceId
traceId -> do
SpanId
freshSpanId <- io SpanId
forall (m :: * -> *). MonadIO m => m SpanId
Tracing.randomSpanId
let parentSpanId :: Maybe SpanId
parentSpanId =
ByteString -> Maybe SpanId
Tracing.spanIdFromHex
(ByteString -> Maybe SpanId)
-> (Text -> ByteString) -> Text -> Maybe SpanId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
txtToBs
(Text -> Maybe SpanId) -> Maybe Text -> Maybe SpanId
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Value
e
Value -> Getting (First Text) Value Text -> Maybe Text
forall s a. s -> Getting (First a) s a -> Maybe a
^? Key -> Traversal' Value Value
forall t. AsValue t => Key -> Traversal' t Value
JL.key Key
"trace_context" ((Value -> Const (First Text) Value)
-> Value -> Const (First Text) Value)
-> Getting (First Text) Value Text
-> Getting (First Text) Value Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Key -> Traversal' Value Value
forall t. AsValue t => Key -> Traversal' t Value
JL.key Key
"span_id" ((Value -> Const (First Text) Value)
-> Value -> Const (First Text) Value)
-> Getting (First Text) Value Text
-> Getting (First Text) Value Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Getting (First Text) Value Text
forall t. AsValue t => Prism' t Text
Prism' Value Text
JL._String
samplingState :: SamplingState
samplingState =
Maybe Text -> SamplingState
forall s. (IsString s, Eq s) => Maybe s -> SamplingState
Tracing.samplingStateFromHeader
(Maybe Text -> SamplingState) -> Maybe Text -> SamplingState
forall a b. (a -> b) -> a -> b
$ Value
e
Value -> Getting (First Text) Value Text -> Maybe Text
forall s a. s -> Getting (First a) s a -> Maybe a
^? Key -> Traversal' Value Value
forall t. AsValue t => Key -> Traversal' t Value
JL.key Key
"trace_context" ((Value -> Const (First Text) Value)
-> Value -> Const (First Text) Value)
-> Getting (First Text) Value Text
-> Getting (First Text) Value Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Key -> Traversal' Value Value
forall t. AsValue t => Key -> Traversal' t Value
JL.key Key
"sampling_state" ((Value -> Const (First Text) Value)
-> Value -> Const (First Text) Value)
-> Getting (First Text) Value Text
-> Getting (First Text) Value Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Getting (First Text) Value Text
forall t. AsValue t => Prism' t Text
Prism' Value Text
JL._String
TraceContext -> io TraceContext
forall a. a -> io a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TraceContext -> io TraceContext)
-> TraceContext -> io TraceContext
forall a b. (a -> b) -> a -> b
$ TraceId -> SpanId -> Maybe SpanId -> SamplingState -> TraceContext
Tracing.TraceContext TraceId
traceId SpanId
freshSpanId Maybe SpanId
parentSpanId SamplingState
samplingState
processEvent ::
forall io r b.
( MonadBaseControl IO io,
MonadIO io,
MonadReader r io,
Has HTTP.Manager r,
Has (L.Logger L.Hasura) r,
MonadMask io,
BackendEventTrigger b,
Tracing.MonadTrace io,
MonadGetPolicies io
) =>
EventWithSource b ->
io ()
processEvent :: forall (io :: * -> *) r (b :: BackendType).
(MonadBaseControl IO io, MonadIO io, MonadReader r io,
Has Manager r, Has (Logger Hasura) r, MonadMask io,
BackendEventTrigger b, MonadTrace io, MonadGetPolicies io) =>
EventWithSource b -> io ()
processEvent (EventWithSource Event b
e SourceConfig b
sourceConfig SourceName
sourceName UTCTime
eventFetchedTime) = do
IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity <- io (IO GranularPrometheusMetricsState)
forall (m :: * -> *).
MonadGetPolicies m =>
m (IO GranularPrometheusMetricsState)
runGetPrometheusMetricsGranularity
UTCTime
eventProcessTime <- IO UTCTime -> io UTCTime
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
let eventQueueTime :: Double
eventQueueTime = NominalDiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac (NominalDiffTime -> Double) -> NominalDiffTime -> Double
forall a b. (a -> b) -> a -> b
$ UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
eventProcessTime UTCTime
eventFetchedTime
()
_ <- IO () -> io ()
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> io ()) -> IO () -> io ()
forall a b. (a -> b) -> a -> b
$ Distribution -> Double -> IO ()
EKG.Distribution.add (ServerMetrics -> Distribution
smEventQueueTime ServerMetrics
serverMetrics) Double
eventQueueTime
IO () -> io ()
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> io ()) -> IO () -> io ()
forall a b. (a -> b) -> a -> b
$ IO GranularPrometheusMetricsState
-> Bool
-> HistogramVector (Maybe DynamicEventTriggerLabel)
-> DynamicEventTriggerLabel
-> Double
-> IO ()
forall l (m :: * -> *).
(Ord l, MonadIO m) =>
IO GranularPrometheusMetricsState
-> Bool -> HistogramVector (Maybe l) -> l -> Double -> m ()
observeHistogramWithLabel
IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity
Bool
True
(EventTriggerMetrics
-> HistogramVector (Maybe DynamicEventTriggerLabel)
eventQueueTimeSeconds EventTriggerMetrics
eventTriggerMetrics)
(TriggerName -> SourceName -> DynamicEventTriggerLabel
DynamicEventTriggerLabel (TriggerMetadata -> TriggerName
tmName (Event b -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event b
e)) SourceName
sourceName)
Double
eventQueueTime
SchemaCache
cache <- IO SchemaCache -> io SchemaCache
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO SchemaCache
getSchemaCache
Text -> io () -> io ()
trace <-
Value -> io (Maybe TraceContext)
forall (io :: * -> *).
MonadIO io =>
Value -> io (Maybe TraceContext)
extractEventContext (Event b -> Value
forall (b :: BackendType). Event b -> Value
eEvent Event b
e) io (Maybe TraceContext)
-> (Maybe TraceContext -> Text -> io () -> io ())
-> io (Text -> io () -> io ())
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
Maybe TraceContext
Nothing -> SamplingPolicy -> Text -> io () -> io ()
forall (m :: * -> *) a.
(MonadIO m, MonadTrace m) =>
SamplingPolicy -> Text -> m a -> m a
Tracing.newTrace SamplingPolicy
Tracing.sampleAlways
Just TraceContext
ctx -> TraceContext -> SamplingPolicy -> Text -> io () -> io ()
forall a. TraceContext -> SamplingPolicy -> Text -> io a -> io a
forall (m :: * -> *) a.
MonadTrace m =>
TraceContext -> SamplingPolicy -> Text -> m a -> m a
Tracing.newTraceWith TraceContext
ctx SamplingPolicy
Tracing.sampleAlways
let spanName :: EventTriggerInfo b -> Text
spanName EventTriggerInfo b
eti = Text
"Event trigger: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> NonEmptyText -> Text
unNonEmptyText (TriggerName -> NonEmptyText
unTriggerName (EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti))
Either QErr (MaintenanceMode MaintenanceModeVersion)
maintenanceModeVersionEither :: Either QErr (MaintenanceMode MaintenanceModeVersion) <-
case MaintenanceMode ()
maintenanceMode of
MaintenanceModeEnabled () -> do
ExceptT QErr io MaintenanceModeVersion
-> io (Either QErr MaintenanceModeVersion)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m, MonadError QErr m) =>
SourceConfig b -> m MaintenanceModeVersion
getMaintenanceModeVersion @b SourceConfig b
sourceConfig) io (Either QErr MaintenanceModeVersion)
-> (Either QErr MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion))
-> io (Either QErr (MaintenanceMode MaintenanceModeVersion))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
Left QErr
err -> QErr -> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. a -> Either a b
Left QErr
err
Right MaintenanceModeVersion
maintenanceModeVersion -> MaintenanceMode MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. b -> Either a b
Right (MaintenanceMode MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion))
-> MaintenanceMode MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. (a -> b) -> a -> b
$ (MaintenanceModeVersion -> MaintenanceMode MaintenanceModeVersion
forall a. a -> MaintenanceMode a
MaintenanceModeEnabled MaintenanceModeVersion
maintenanceModeVersion)
MaintenanceMode ()
MaintenanceModeDisabled -> Either QErr (MaintenanceMode MaintenanceModeVersion)
-> io (Either QErr (MaintenanceMode MaintenanceModeVersion))
forall a. a -> io a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either QErr (MaintenanceMode MaintenanceModeVersion)
-> io (Either QErr (MaintenanceMode MaintenanceModeVersion)))
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
-> io (Either QErr (MaintenanceMode MaintenanceModeVersion))
forall a b. (a -> b) -> a -> b
$ MaintenanceMode MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. b -> Either a b
Right MaintenanceMode MaintenanceModeVersion
forall a. MaintenanceMode a
MaintenanceModeDisabled
case Either QErr (MaintenanceMode MaintenanceModeVersion)
maintenanceModeVersionEither of
Left QErr
maintenanceModeVersionErr -> QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr QErr
maintenanceModeVersionErr
Right MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion ->
case SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
forall (b :: BackendType).
Backend b =>
SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
getEventTriggerInfoFromEvent SchemaCache
cache Event b
e of
Left Text
err -> do
QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr (QErr -> io ()) -> QErr -> io ()
forall a b. (a -> b) -> a -> b
$ Code -> Text -> QErr
err500 Code
Unexpected Text
err
UTCTime
currentTime <- IO UTCTime -> io UTCTime
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
ExceptT QErr io () -> io (Either QErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (SourceConfig b
-> Event b
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> ExceptT QErr io ()
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m, MonadError QErr m) =>
SourceConfig b
-> Event b
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> m ()
forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
SourceConfig b
-> Event b
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> m ()
setRetry SourceConfig b
sourceConfig Event b
e (NominalDiffTime -> UTCTime -> UTCTime
addUTCTime NominalDiffTime
60 UTCTime
currentTime) MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion)
io (Either QErr ()) -> (Either QErr () -> io ()) -> io ()
forall a b. io a -> (a -> io b) -> io b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> io ()) -> io ())
-> (QErr -> io ()) -> Either QErr () -> io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> io ()) -> io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
Right EventTriggerInfo b
eti -> Text -> io () -> io ()
trace (EventTriggerInfo b -> Text
forall {b :: BackendType}. EventTriggerInfo b -> Text
spanName EventTriggerInfo b
eti) do
let webhook :: EnvRecord ResolvedWebhook
webhook = WebhookConfInfo -> EnvRecord ResolvedWebhook
wciCachedValue (WebhookConfInfo -> EnvRecord ResolvedWebhook)
-> WebhookConfInfo -> EnvRecord ResolvedWebhook
forall a b. (a -> b) -> a -> b
$ EventTriggerInfo b -> WebhookConfInfo
forall (b :: BackendType). EventTriggerInfo b -> WebhookConfInfo
etiWebhookInfo EventTriggerInfo b
eti
retryConf :: RetryConf
retryConf = EventTriggerInfo b -> RetryConf
forall (b :: BackendType). EventTriggerInfo b -> RetryConf
etiRetryConf EventTriggerInfo b
eti
timeoutSeconds :: Int
timeoutSeconds = Int -> Maybe Int -> Int
forall a. a -> Maybe a -> a
fromMaybe Int
defaultTimeoutSeconds (RetryConf -> Maybe Int
rcTimeoutSec RetryConf
retryConf)
httpTimeout :: ResponseTimeout
httpTimeout = Int -> ResponseTimeout
HTTP.responseTimeoutMicro (Int
timeoutSeconds Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000000)
([Header]
headers, [HeaderConf]
logHeaders) = [EventHeaderInfo] -> ([Header], [HeaderConf])
prepareHeaders (EventTriggerInfo b -> [EventHeaderInfo]
forall (b :: BackendType). EventTriggerInfo b -> [EventHeaderInfo]
etiHeaders EventTriggerInfo b
eti)
ep :: EventPayload b
ep = RetryConf -> Event b -> EventPayload b
forall (b :: BackendType). RetryConf -> Event b -> EventPayload b
createEventPayload RetryConf
retryConf Event b
e
payload :: ByteString
payload = Value -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode (Value -> ByteString) -> Value -> ByteString
forall a b. (a -> b) -> a -> b
$ EventPayload b -> Value
forall a. ToJSON a => a -> Value
J.toJSON EventPayload b
ep
extraLogCtx :: ExtraLogContext
extraLogCtx = EventId -> Maybe TriggerName -> ExtraLogContext
ExtraLogContext (EventPayload b -> EventId
forall (b :: BackendType). EventPayload b -> EventId
epId EventPayload b
ep) (TriggerName -> Maybe TriggerName
forall a. a -> Maybe a
Just (TriggerName -> Maybe TriggerName)
-> TriggerName -> Maybe TriggerName
forall a b. (a -> b) -> a -> b
$ EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti)
requestTransform :: Maybe RequestTransform
requestTransform = EventTriggerInfo b -> Maybe RequestTransform
forall (b :: BackendType).
EventTriggerInfo b -> Maybe RequestTransform
etiRequestTransform EventTriggerInfo b
eti
responseTransform :: Maybe ResponseTransform
responseTransform = MetadataResponseTransform -> ResponseTransform
mkResponseTransform (MetadataResponseTransform -> ResponseTransform)
-> Maybe MetadataResponseTransform -> Maybe ResponseTransform
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> EventTriggerInfo b -> Maybe MetadataResponseTransform
forall (b :: BackendType).
EventTriggerInfo b -> Maybe MetadataResponseTransform
etiResponseTransform EventTriggerInfo b
eti
eventTriggerProcessingTimeout :: DiffTime
eventTriggerProcessingTimeout = DiffTime -> (Int -> DiffTime) -> Maybe Int -> DiffTime
forall b a. b -> (a -> b) -> Maybe a -> b
maybe DiffTime
upperBoundEventTriggerTimeout (DiffTime -> DiffTime -> DiffTime
forall a. Ord a => a -> a -> a
min DiffTime
upperBoundEventTriggerTimeout (DiffTime -> DiffTime) -> (Int -> DiffTime) -> Int -> DiffTime
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> DiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral) (RetryConf -> Maybe Int
rcTimeoutSec RetryConf
retryConf)
eventTriggerProcessAction :: io ()
eventTriggerProcessAction = do
UTCTime
eventExecutionStartTime <- IO UTCTime -> io UTCTime
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
Either
(TransformableRequestError 'EventType)
(Request, HTTPResp 'EventType)
eitherReqRes <-
ExceptT
(TransformableRequestError 'EventType)
io
(Request, HTTPResp 'EventType)
-> io
(Either
(TransformableRequestError 'EventType)
(Request, HTTPResp 'EventType))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT
(ExceptT
(TransformableRequestError 'EventType)
io
(Request, HTTPResp 'EventType)
-> io
(Either
(TransformableRequestError 'EventType)
(Request, HTTPResp 'EventType)))
-> ExceptT
(TransformableRequestError 'EventType)
io
(Request, HTTPResp 'EventType)
-> io
(Either
(TransformableRequestError 'EventType)
(Request, HTTPResp 'EventType))
forall a b. (a -> b) -> a -> b
$ [Header]
-> ResponseTimeout
-> ByteString
-> Maybe RequestTransform
-> ResolvedWebhook
-> ExceptT (TransformableRequestError 'EventType) io RequestDetails
forall (a :: TriggerTypes) (m :: * -> *).
MonadError (TransformableRequestError a) m =>
[Header]
-> ResponseTimeout
-> ByteString
-> Maybe RequestTransform
-> ResolvedWebhook
-> m RequestDetails
mkRequest [Header]
headers ResponseTimeout
httpTimeout ByteString
payload Maybe RequestTransform
requestTransform (EnvRecord ResolvedWebhook -> ResolvedWebhook
forall a. EnvRecord a -> a
_envVarValue EnvRecord ResolvedWebhook
webhook)
ExceptT (TransformableRequestError 'EventType) io RequestDetails
-> (RequestDetails
-> ExceptT
(TransformableRequestError 'EventType)
io
(Request, HTTPResp 'EventType))
-> ExceptT
(TransformableRequestError 'EventType)
io
(Request, HTTPResp 'EventType)
forall a b.
ExceptT (TransformableRequestError 'EventType) io a
-> (a -> ExceptT (TransformableRequestError 'EventType) io b)
-> ExceptT (TransformableRequestError 'EventType) io b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \RequestDetails
reqDetails -> do
let request :: Request
request = RequestDetails -> Request
extractRequest RequestDetails
reqDetails
logger' :: Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> RequestDetails
-> ExceptT (TransformableRequestError 'EventType) io ()
logger' Either (HTTPErr 'EventType) (HTTPResp 'EventType)
res RequestDetails
details = do
Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> ExtraLogContext
-> RequestDetails
-> Text
-> [HeaderConf]
-> ExceptT (TransformableRequestError 'EventType) io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> ExtraLogContext
-> RequestDetails
-> Text
-> [HeaderConf]
-> m ()
logHTTPForET Either (HTTPErr 'EventType) (HTTPResp 'EventType)
res ExtraLogContext
extraLogCtx RequestDetails
details (EnvRecord ResolvedWebhook -> Text
forall a. EnvRecord a -> Text
_envVarName EnvRecord ResolvedWebhook
webhook) [HeaderConf]
logHeaders
IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a.
IO a -> ExceptT (TransformableRequestError 'EventType) io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT (TransformableRequestError 'EventType) io ())
-> IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a b. (a -> b) -> a -> b
$ do
case Either (HTTPErr 'EventType) (HTTPResp 'EventType)
res of
Left HTTPErr 'EventType
_err -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Right HTTPResp 'EventType
response ->
Counter -> Int64 -> IO ()
Prometheus.Counter.add
(EventTriggerMetrics -> Counter
eventTriggerBytesReceived EventTriggerMetrics
eventTriggerMetrics)
(HTTPResp 'EventType -> Int64
forall (a :: TriggerTypes). HTTPResp a -> Int64
hrsSize HTTPResp 'EventType
response)
let RequestDetails {Int64
_rdOriginalSize :: Int64
_rdOriginalSize :: RequestDetails -> Int64
_rdOriginalSize, Maybe Int64
_rdTransformedSize :: Maybe Int64
_rdTransformedSize :: RequestDetails -> Maybe Int64
_rdTransformedSize} = RequestDetails
details
in Counter -> Int64 -> IO ()
Prometheus.Counter.add
(EventTriggerMetrics -> Counter
eventTriggerBytesSent EventTriggerMetrics
eventTriggerMetrics)
(Int64 -> Maybe Int64 -> Int64
forall a. a -> Maybe a -> a
fromMaybe Int64
_rdOriginalSize Maybe Int64
_rdTransformedSize)
HTTPResp 'EventType
resp <-
ExceptT (TransformableRequestError 'EventType) io ()
-> ExceptT (TransformableRequestError 'EventType) io ()
-> ExceptT
(TransformableRequestError 'EventType) io (HTTPResp 'EventType)
-> ExceptT
(TransformableRequestError 'EventType) io (HTTPResp 'EventType)
forall (m :: * -> *) a c b. MonadMask m => m a -> m c -> m b -> m b
bracket_
( do
IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a.
IO a -> ExceptT (TransformableRequestError 'EventType) io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT (TransformableRequestError 'EventType) io ())
-> IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smNumEventHTTPWorkers ServerMetrics
serverMetrics
IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a.
IO a -> ExceptT (TransformableRequestError 'EventType) io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT (TransformableRequestError 'EventType) io ())
-> IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.inc (EventTriggerMetrics -> Gauge
eventTriggerHTTPWorkers EventTriggerMetrics
eventTriggerMetrics)
)
( do
IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a.
IO a -> ExceptT (TransformableRequestError 'EventType) io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT (TransformableRequestError 'EventType) io ())
-> IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smNumEventHTTPWorkers ServerMetrics
serverMetrics
IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a.
IO a -> ExceptT (TransformableRequestError 'EventType) io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT (TransformableRequestError 'EventType) io ())
-> IO () -> ExceptT (TransformableRequestError 'EventType) io ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.dec (EventTriggerMetrics -> Gauge
eventTriggerHTTPWorkers EventTriggerMetrics
eventTriggerMetrics)
)
(RequestDetails
-> Maybe ResponseTransform
-> Maybe SessionVariables
-> (Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> RequestDetails
-> ExceptT (TransformableRequestError 'EventType) io ())
-> ExceptT
(TransformableRequestError 'EventType) io (HTTPResp 'EventType)
forall r (m :: * -> *) (a :: TriggerTypes).
(MonadReader r m, MonadError (TransformableRequestError a) m,
Has Manager r, Has (Logger Hasura) r, MonadIO m, MonadTrace m) =>
RequestDetails
-> Maybe ResponseTransform
-> Maybe SessionVariables
-> (Either (HTTPErr a) (HTTPResp a) -> RequestDetails -> m ())
-> m (HTTPResp a)
invokeRequest RequestDetails
reqDetails Maybe ResponseTransform
responseTransform (RequestDetails -> Maybe SessionVariables
_rdSessionVars RequestDetails
reqDetails) Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> RequestDetails
-> ExceptT (TransformableRequestError 'EventType) io ()
logger')
(Request, HTTPResp 'EventType)
-> ExceptT
(TransformableRequestError 'EventType)
io
(Request, HTTPResp 'EventType)
forall a. a -> ExceptT (TransformableRequestError 'EventType) io a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Request
request, HTTPResp 'EventType
resp)
case Either
(TransformableRequestError 'EventType)
(Request, HTTPResp 'EventType)
eitherReqRes of
Right (Request
req, HTTPResp 'EventType
resp) -> do
let reqBody :: Value
reqBody = Value -> Maybe Value -> Value
forall a. a -> Maybe a -> a
fromMaybe Value
J.Null (Maybe Value -> Value) -> Maybe Value -> Value
forall a b. (a -> b) -> a -> b
$ Getting (First ByteString) Request ByteString
-> Request -> Maybe ByteString
forall s (m :: * -> *) a.
MonadReader s m =>
Getting (First a) s a -> m (Maybe a)
preview ((RequestBody -> Const (First ByteString) RequestBody)
-> Request -> Const (First ByteString) Request
Lens' Request RequestBody
HTTP.body ((RequestBody -> Const (First ByteString) RequestBody)
-> Request -> Const (First ByteString) Request)
-> ((ByteString -> Const (First ByteString) ByteString)
-> RequestBody -> Const (First ByteString) RequestBody)
-> Getting (First ByteString) Request ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ByteString -> Const (First ByteString) ByteString)
-> RequestBody -> Const (First ByteString) RequestBody
Prism' RequestBody ByteString
HTTP._RequestBodyLBS) Request
req Maybe ByteString -> (ByteString -> Maybe Value) -> Maybe Value
forall a b. Maybe a -> (a -> Maybe b) -> Maybe b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. FromJSON a => ByteString -> Maybe a
J.decode @J.Value
SourceConfig b
-> Event b
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPResp 'EventType
-> io (Either QErr ())
forall (b :: BackendType) (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, BackendEventTrigger b) =>
SourceConfig b
-> Event b
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPResp a
-> m (Either QErr ())
processSuccess SourceConfig b
sourceConfig Event b
e [HeaderConf]
logHeaders Value
reqBody MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion HTTPResp 'EventType
resp io (Either QErr ()) -> (Either QErr () -> io ()) -> io ()
forall a b. io a -> (a -> io b) -> io b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> io ()) -> io ())
-> (QErr -> io ()) -> Either QErr () -> io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> io ()) -> io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
UTCTime
eventExecutionFinishTime <- IO UTCTime -> io UTCTime
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
let eventWebhookProcessingTime' :: Double
eventWebhookProcessingTime' = NominalDiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac (NominalDiffTime -> Double) -> NominalDiffTime -> Double
forall a b. (a -> b) -> a -> b
$ UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
eventExecutionFinishTime UTCTime
eventExecutionStartTime
eventStartTime :: UTCTime
eventStartTime = UTCTime -> Maybe UTCTime -> UTCTime
forall a. a -> Maybe a -> a
fromMaybe (Event b -> UTCTime
forall (b :: BackendType). Event b -> UTCTime
eCreatedAtUTC Event b
e) (Event b -> Maybe UTCTime
forall (b :: BackendType). Event b -> Maybe UTCTime
eRetryAtUTC Event b
e)
eventProcessingTime' :: Double
eventProcessingTime' = NominalDiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac (NominalDiffTime -> Double) -> NominalDiffTime -> Double
forall a b. (a -> b) -> a -> b
$ UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
eventExecutionFinishTime UTCTime
eventStartTime
IO GranularPrometheusMetricsState
-> Bool
-> HistogramVector (Maybe DynamicEventTriggerLabel)
-> DynamicEventTriggerLabel
-> Double
-> io ()
forall l (m :: * -> *).
(Ord l, MonadIO m) =>
IO GranularPrometheusMetricsState
-> Bool -> HistogramVector (Maybe l) -> l -> Double -> m ()
observeHistogramWithLabel
IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity
Bool
True
(EventTriggerMetrics
-> HistogramVector (Maybe DynamicEventTriggerLabel)
eventProcessingTime EventTriggerMetrics
eventTriggerMetrics)
(TriggerName -> SourceName -> DynamicEventTriggerLabel
DynamicEventTriggerLabel (EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti) SourceName
sourceName)
Double
eventProcessingTime'
IO () -> io ()
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> io ()) -> IO () -> io ()
forall a b. (a -> b) -> a -> b
$ do
Distribution -> Double -> IO ()
EKG.Distribution.add (ServerMetrics -> Distribution
smEventWebhookProcessingTime ServerMetrics
serverMetrics) Double
eventWebhookProcessingTime'
IO GranularPrometheusMetricsState
-> Bool
-> HistogramVector (Maybe DynamicEventTriggerLabel)
-> DynamicEventTriggerLabel
-> Double
-> IO ()
forall l (m :: * -> *).
(Ord l, MonadIO m) =>
IO GranularPrometheusMetricsState
-> Bool -> HistogramVector (Maybe l) -> l -> Double -> m ()
observeHistogramWithLabel
IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity
Bool
True
(EventTriggerMetrics
-> HistogramVector (Maybe DynamicEventTriggerLabel)
eventWebhookProcessingTime EventTriggerMetrics
eventTriggerMetrics)
(TriggerName -> SourceName -> DynamicEventTriggerLabel
DynamicEventTriggerLabel (EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti) SourceName
sourceName)
Double
eventWebhookProcessingTime'
Distribution -> Double -> IO ()
EKG.Distribution.add (ServerMetrics -> Distribution
smEventProcessingTime ServerMetrics
serverMetrics) Double
eventProcessingTime'
IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> m ()
incEventTriggerCounterWithLabel
IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity
Bool
True
(EventTriggerMetrics -> CounterVector EventStatusWithTriggerLabel
eventProcessedTotal EventTriggerMetrics
eventTriggerMetrics)
(EventStatusLabel
-> Maybe DynamicEventTriggerLabel -> EventStatusWithTriggerLabel
EventStatusWithTriggerLabel EventStatusLabel
eventSuccessLabel (DynamicEventTriggerLabel -> Maybe DynamicEventTriggerLabel
forall a. a -> Maybe a
Just (TriggerName -> SourceName -> DynamicEventTriggerLabel
DynamicEventTriggerLabel (EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti) SourceName
sourceName)))
IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> m ()
incEventTriggerCounterWithLabel
IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity
Bool
True
(EventTriggerMetrics -> CounterVector EventStatusWithTriggerLabel
eventInvocationTotal EventTriggerMetrics
eventTriggerMetrics)
(EventStatusLabel
-> Maybe DynamicEventTriggerLabel -> EventStatusWithTriggerLabel
EventStatusWithTriggerLabel EventStatusLabel
eventSuccessLabel (DynamicEventTriggerLabel -> Maybe DynamicEventTriggerLabel
forall a. a -> Maybe a
Just (TriggerName -> SourceName -> DynamicEventTriggerLabel
DynamicEventTriggerLabel (EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti) SourceName
sourceName)))
Left TransformableRequestError 'EventType
eventError -> do
IO () -> io ()
forall a. IO a -> io a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> io ()) -> IO () -> io ()
forall a b. (a -> b) -> a -> b
$ IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> m ()
incEventTriggerCounterWithLabel
IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity
Bool
True
(EventTriggerMetrics -> CounterVector EventStatusWithTriggerLabel
eventInvocationTotal EventTriggerMetrics
eventTriggerMetrics)
(EventStatusLabel
-> Maybe DynamicEventTriggerLabel -> EventStatusWithTriggerLabel
EventStatusWithTriggerLabel EventStatusLabel
eventFailedLabel (DynamicEventTriggerLabel -> Maybe DynamicEventTriggerLabel
forall a. a -> Maybe a
Just (TriggerName -> SourceName -> DynamicEventTriggerLabel
DynamicEventTriggerLabel (EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti) SourceName
sourceName)))
case TransformableRequestError 'EventType
eventError of
(HTTPError Value
reqBody HTTPErr 'EventType
err) ->
forall (b :: BackendType) (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, BackendEventTrigger b, MonadGetPolicies m) =>
SourceConfig b
-> Event b
-> RetryConf
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> EventTriggerMetrics
-> HTTPErr a
-> m (Either QErr ())
processError @b SourceConfig b
sourceConfig Event b
e RetryConf
retryConf [HeaderConf]
logHeaders Value
reqBody MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion EventTriggerMetrics
eventTriggerMetrics HTTPErr 'EventType
err io (Either QErr ()) -> (Either QErr () -> io ()) -> io ()
forall a b. io a -> (a -> io b) -> io b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> io ()) -> io ())
-> (QErr -> io ()) -> Either QErr () -> io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> io ()) -> io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
(TransformationError Value
_ TransformErrorBundle
err) -> do
Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (UnstructuredLog -> io ()) -> UnstructuredLog -> io ()
forall a b. (a -> b) -> a -> b
$ LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelError (ByteString -> SerializableBlob
SB.fromLBS (ByteString -> SerializableBlob) -> ByteString -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ TransformErrorBundle -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode TransformErrorBundle
err)
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m) =>
SourceConfig b
-> Event b
-> Maybe (Invocation 'EventType)
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError' @b SourceConfig b
sourceConfig Event b
e Maybe (Invocation 'EventType)
forall a. Maybe a
Nothing ProcessEventError
PESetError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion io (Either QErr ()) -> (Either QErr () -> io ()) -> io ()
forall a b. io a -> (a -> io b) -> io b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> io ()) -> io ())
-> (QErr -> io ()) -> Either QErr () -> io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> io ()) -> io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
Int -> io () -> io (Maybe ())
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Int -> m a -> m (Maybe a)
timeout (Integer -> Int
forall a. Num a => Integer -> a
fromInteger (DiffTime -> Integer
diffTimeToMicroSeconds DiffTime
eventTriggerProcessingTimeout)) io ()
eventTriggerProcessAction
io (Maybe ()) -> io () -> io ()
forall (m :: * -> *) a. Monad m => m (Maybe a) -> m a -> m a
`onNothingM` do
let eventTriggerTimeoutMessage :: Text
eventTriggerTimeoutMessage = Text
"Event Trigger " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti TriggerName -> Text -> Text
forall t. ToTxt t => t -> Text -> Text
<<> Text
" timed out while processing."
eventTriggerTimeoutError :: QErr
eventTriggerTimeoutError = Code -> Text -> QErr
err500 Code
TimeoutErrorCode Text
eventTriggerTimeoutMessage
Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (EventInternalErr -> io ()) -> EventInternalErr -> io ()
forall a b. (a -> b) -> a -> b
$ QErr -> EventInternalErr
EventInternalErr QErr
eventTriggerTimeoutError
forall (b :: BackendType) (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, BackendEventTrigger b, MonadGetPolicies m) =>
SourceConfig b
-> Event b
-> RetryConf
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> EventTriggerMetrics
-> HTTPErr a
-> m (Either QErr ())
processError @b SourceConfig b
sourceConfig Event b
e RetryConf
retryConf [HeaderConf]
logHeaders Value
J.Null MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion EventTriggerMetrics
eventTriggerMetrics (String -> HTTPErr Any
forall (a :: TriggerTypes). String -> HTTPErr a
HOther (String -> HTTPErr Any) -> String -> HTTPErr Any
forall a b. (a -> b) -> a -> b
$ Text -> String
T.unpack Text
eventTriggerTimeoutMessage)
io (Either QErr ()) -> (Either QErr () -> io ()) -> io ()
forall a b. io a -> (a -> io b) -> io b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> io ()) -> io ())
-> (QErr -> io ()) -> Either QErr () -> io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> io ()) -> io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
SourceName
-> EventId -> TVar (HashMap SourceName (Set EventId)) -> io ()
forall (m :: * -> *).
MonadIO m =>
SourceName
-> EventId -> TVar (HashMap SourceName (Set EventId)) -> m ()
removeEventTriggerEventFromLockedEvents SourceName
sourceName (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) TVar (HashMap SourceName (Set EventId))
leEvents
createEventPayload :: RetryConf -> Event b -> EventPayload b
createEventPayload :: forall (b :: BackendType). RetryConf -> Event b -> EventPayload b
createEventPayload RetryConf
retryConf Event b
e =
EventPayload
{ epId :: EventId
epId = Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e,
epTable :: TableName b
epTable = Event b -> TableName b
forall (b :: BackendType). Event b -> TableName b
eTable Event b
e,
epTrigger :: TriggerMetadata
epTrigger = Event b -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event b
e,
epEvent :: Value
epEvent = Event b -> Value
forall (b :: BackendType). Event b -> Value
eEvent Event b
e,
epDeliveryInfo :: DeliveryInfo
epDeliveryInfo =
DeliveryInfo
{ diCurrentRetry :: Int
diCurrentRetry = Event b -> Int
forall (b :: BackendType). Event b -> Int
eTries Event b
e,
diMaxRetries :: Int
diMaxRetries = RetryConf -> Int
rcNumRetries RetryConf
retryConf
},
epCreatedAt :: LocalTime
epCreatedAt = Event b -> LocalTime
forall (b :: BackendType). Event b -> LocalTime
eCreatedAt Event b
e
}
processSuccess ::
forall b m a.
(MonadIO m, BackendEventTrigger b) =>
SourceConfig b ->
Event b ->
[HeaderConf] ->
J.Value ->
MaintenanceMode MaintenanceModeVersion ->
HTTPResp a ->
m (Either QErr ())
processSuccess :: forall (b :: BackendType) (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, BackendEventTrigger b) =>
SourceConfig b
-> Event b
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPResp a
-> m (Either QErr ())
processSuccess SourceConfig b
sourceConfig Event b
e [HeaderConf]
reqHeaders Value
ep MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion HTTPResp a
resp = do
let respBody :: SerializableBlob
respBody = HTTPResp a -> SerializableBlob
forall (a :: TriggerTypes). HTTPResp a -> SerializableBlob
hrsBody HTTPResp a
resp
respHeaders :: [HeaderConf]
respHeaders = HTTPResp a -> [HeaderConf]
forall (a :: TriggerTypes). HTTPResp a -> [HeaderConf]
hrsHeaders HTTPResp a
resp
respStatus :: Int
respStatus = HTTPResp a -> Int
forall (a :: TriggerTypes). HTTPResp a -> Int
hrsStatus HTTPResp a
resp
eid :: EventId
eid = Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e
invocation :: Invocation 'EventType
invocation = EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation EventId
eid Value
ep (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
respStatus) [HeaderConf]
reqHeaders SerializableBlob
respBody [HeaderConf]
respHeaders
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m) =>
SourceConfig b
-> Event b
-> Invocation 'EventType
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordSuccess @b SourceConfig b
sourceConfig Event b
e Invocation 'EventType
invocation MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion
processError ::
forall b m a.
( MonadIO m,
BackendEventTrigger b,
MonadGetPolicies m
) =>
SourceConfig b ->
Event b ->
RetryConf ->
[HeaderConf] ->
J.Value ->
MaintenanceMode MaintenanceModeVersion ->
EventTriggerMetrics ->
HTTPErr a ->
m (Either QErr ())
processError :: forall (b :: BackendType) (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, BackendEventTrigger b, MonadGetPolicies m) =>
SourceConfig b
-> Event b
-> RetryConf
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> EventTriggerMetrics
-> HTTPErr a
-> m (Either QErr ())
processError SourceConfig b
sourceConfig Event b
e RetryConf
retryConf [HeaderConf]
reqHeaders Value
ep MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion EventTriggerMetrics
eventTriggerMetrics HTTPErr a
err = do
let invocation :: Invocation 'EventType
invocation = case HTTPErr a
err of
HClient HttpException
httpException ->
let statusMaybe :: Maybe Int
statusMaybe = HttpException -> Maybe Int
getHTTPExceptionStatus HttpException
httpException
in EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) Value
ep Maybe Int
statusMaybe [HeaderConf]
reqHeaders (ByteString -> SerializableBlob
SB.fromLBS (HttpException -> ByteString
httpExceptionErrorEncoding HttpException
httpException)) []
HStatus HTTPResp a
errResp -> do
let respPayload :: SerializableBlob
respPayload = HTTPResp a -> SerializableBlob
forall (a :: TriggerTypes). HTTPResp a -> SerializableBlob
hrsBody HTTPResp a
errResp
respHeaders :: [HeaderConf]
respHeaders = HTTPResp a -> [HeaderConf]
forall (a :: TriggerTypes). HTTPResp a -> [HeaderConf]
hrsHeaders HTTPResp a
errResp
respStatus :: Int
respStatus = HTTPResp a -> Int
forall (a :: TriggerTypes). HTTPResp a -> Int
hrsStatus HTTPResp a
errResp
EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) Value
ep (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
respStatus) [HeaderConf]
reqHeaders SerializableBlob
respPayload [HeaderConf]
respHeaders
HOther String
detail -> do
let errMsg :: SerializableBlob
errMsg = ByteString -> SerializableBlob
SB.fromLBS (ByteString -> SerializableBlob) -> ByteString -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ String -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode String
detail
EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) Value
ep (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
500) [HeaderConf]
reqHeaders SerializableBlob
errMsg []
ProcessEventError
retryOrError <- Event b
-> RetryConf
-> EventTriggerMetrics
-> HTTPErr a
-> m ProcessEventError
forall (m :: * -> *) (b :: BackendType) (a :: TriggerTypes).
(MonadIO m, MonadGetPolicies m) =>
Event b
-> RetryConf
-> EventTriggerMetrics
-> HTTPErr a
-> m ProcessEventError
retryOrSetError Event b
e RetryConf
retryConf EventTriggerMetrics
eventTriggerMetrics HTTPErr a
err
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m) =>
SourceConfig b
-> Event b
-> Invocation 'EventType
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError @b SourceConfig b
sourceConfig Event b
e Invocation 'EventType
invocation ProcessEventError
retryOrError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion
retryOrSetError ::
( MonadIO m,
MonadGetPolicies m
) =>
Event b ->
RetryConf ->
EventTriggerMetrics ->
HTTPErr a ->
m ProcessEventError
retryOrSetError :: forall (m :: * -> *) (b :: BackendType) (a :: TriggerTypes).
(MonadIO m, MonadGetPolicies m) =>
Event b
-> RetryConf
-> EventTriggerMetrics
-> HTTPErr a
-> m ProcessEventError
retryOrSetError Event b
e RetryConf
retryConf EventTriggerMetrics
eventTriggerMetrics HTTPErr a
err = do
IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity <- m (IO GranularPrometheusMetricsState)
forall (m :: * -> *).
MonadGetPolicies m =>
m (IO GranularPrometheusMetricsState)
runGetPrometheusMetricsGranularity
let mretryHeader :: Maybe Text
mretryHeader = HTTPErr a -> Maybe Text
forall {a :: TriggerTypes}. HTTPErr a -> Maybe Text
getRetryAfterHeaderFromError HTTPErr a
err
tries :: Int
tries = Event b -> Int
forall (b :: BackendType). Event b -> Int
eTries Event b
e
mretryHeaderSeconds :: Maybe Int
mretryHeaderSeconds = Maybe Text
mretryHeader Maybe Text -> (Text -> Maybe Int) -> Maybe Int
forall a b. Maybe a -> (a -> Maybe b) -> Maybe b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Text -> Maybe Int
parseRetryHeader
triesExhausted :: Bool
triesExhausted = Int
tries Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= RetryConf -> Int
rcNumRetries RetryConf
retryConf
noRetryHeader :: Bool
noRetryHeader = Maybe Int -> Bool
forall a. Maybe a -> Bool
isNothing Maybe Int
mretryHeaderSeconds
if Bool
triesExhausted Bool -> Bool -> Bool
&& Bool
noRetryHeader
then do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> m ()
incEventTriggerCounterWithLabel
IO GranularPrometheusMetricsState
getPrometheusMetricsGranularity
Bool
True
(EventTriggerMetrics -> CounterVector EventStatusWithTriggerLabel
eventProcessedTotal EventTriggerMetrics
eventTriggerMetrics)
(EventStatusLabel
-> Maybe DynamicEventTriggerLabel -> EventStatusWithTriggerLabel
EventStatusWithTriggerLabel EventStatusLabel
eventFailedLabel (DynamicEventTriggerLabel -> Maybe DynamicEventTriggerLabel
forall a. a -> Maybe a
Just (TriggerName -> SourceName -> DynamicEventTriggerLabel
DynamicEventTriggerLabel (TriggerMetadata -> TriggerName
tmName (Event b -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event b
e)) (Event b -> SourceName
forall (b :: BackendType). Event b -> SourceName
eSource Event b
e))))
ProcessEventError -> m ProcessEventError
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ProcessEventError
PESetError
else do
UTCTime
currentTime <- IO UTCTime -> m UTCTime
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
let delay :: Int
delay = Int -> Maybe Int -> Int
forall a. a -> Maybe a -> a
fromMaybe (RetryConf -> Int
rcIntervalSec RetryConf
retryConf) Maybe Int
mretryHeaderSeconds
diff :: NominalDiffTime
diff = Int -> NominalDiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
delay
retryTime :: UTCTime
retryTime = NominalDiffTime -> UTCTime -> UTCTime
addUTCTime NominalDiffTime
diff UTCTime
currentTime
ProcessEventError -> m ProcessEventError
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ProcessEventError -> m ProcessEventError)
-> ProcessEventError -> m ProcessEventError
forall a b. (a -> b) -> a -> b
$ UTCTime -> ProcessEventError
PESetRetry UTCTime
retryTime
where
getRetryAfterHeaderFromError :: HTTPErr a -> Maybe Text
getRetryAfterHeaderFromError (HStatus HTTPResp a
resp) = HTTPResp a -> Maybe Text
forall (a :: TriggerTypes). HTTPResp a -> Maybe Text
getRetryAfterHeaderFromResp HTTPResp a
resp
getRetryAfterHeaderFromError HTTPErr a
_ = Maybe Text
forall a. Maybe a
Nothing
parseRetryHeader :: Text -> Maybe Int
parseRetryHeader = (Int -> Bool) -> Maybe Int -> Maybe Int
forall (m :: * -> *) a. MonadPlus m => (a -> Bool) -> m a -> m a
mfilter (Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (Maybe Int -> Maybe Int)
-> (Text -> Maybe Int) -> Text -> Maybe Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Maybe Int
forall a. Read a => String -> Maybe a
readMaybe (String -> Maybe Int) -> (Text -> String) -> Text -> Maybe Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> String
T.unpack
mkInvocation ::
EventId ->
J.Value ->
Maybe Int ->
[HeaderConf] ->
SB.SerializableBlob ->
[HeaderConf] ->
Invocation 'EventType
mkInvocation :: EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation EventId
eid Value
ep Maybe Int
statusMaybe [HeaderConf]
reqHeaders SerializableBlob
respBody [HeaderConf]
respHeaders =
let resp :: Response 'EventType
resp =
case Maybe Int
statusMaybe of
Maybe Int
Nothing -> SerializableBlob -> Response 'EventType
forall (a :: TriggerTypes). SerializableBlob -> Response a
mkClientErr SerializableBlob
respBody
Just Int
status ->
if Int
status Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
200 Bool -> Bool -> Bool
&& Int
status Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
300
then Int -> SerializableBlob -> [HeaderConf] -> Response 'EventType
forall (a :: TriggerTypes).
Int -> SerializableBlob -> [HeaderConf] -> Response a
mkResp Int
status SerializableBlob
respBody [HeaderConf]
respHeaders
else SerializableBlob -> Response 'EventType
forall (a :: TriggerTypes). SerializableBlob -> Response a
mkClientErr SerializableBlob
respBody
in EventId
-> Maybe Int
-> WebhookRequest
-> Response 'EventType
-> Invocation 'EventType
forall (a :: TriggerTypes).
EventId
-> Maybe Int -> WebhookRequest -> Response a -> Invocation a
Invocation
EventId
eid
Maybe Int
statusMaybe
(Value -> [HeaderConf] -> Text -> WebhookRequest
mkWebhookReq Value
ep [HeaderConf]
reqHeaders Text
invocationVersionET)
Response 'EventType
resp
logQErr :: (MonadReader r m, Has (L.Logger L.Hasura) r, MonadIO m) => QErr -> m ()
logQErr :: forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr QErr
err = do
Logger Hasura
logger :: L.Logger L.Hasura <- (r -> Logger Hasura) -> m (Logger Hasura)
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks r -> Logger Hasura
forall a t. Has a t => t -> a
getter
Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (EventInternalErr -> m ()) -> EventInternalErr -> m ()
forall a b. (a -> b) -> a -> b
$ QErr -> EventInternalErr
EventInternalErr QErr
err
getEventTriggerInfoFromEvent ::
forall b. (Backend b) => SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
getEventTriggerInfoFromEvent :: forall (b :: BackendType).
Backend b =>
SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
getEventTriggerInfoFromEvent SchemaCache
sc Event b
e = do
let table :: TableName b
table = Event b -> TableName b
forall (b :: BackendType). Event b -> TableName b
eTable Event b
e
mTableInfo :: Maybe (TableInfo b)
mTableInfo = forall (b :: BackendType).
Backend b =>
SourceName -> TableName b -> SourceCache -> Maybe (TableInfo b)
unsafeTableInfo @b (Event b -> SourceName
forall (b :: BackendType). Event b -> SourceName
eSource Event b
e) TableName b
table (SourceCache -> Maybe (TableInfo b))
-> SourceCache -> Maybe (TableInfo b)
forall a b. (a -> b) -> a -> b
$ SchemaCache -> SourceCache
scSources SchemaCache
sc
TableInfo b
tableInfo <- Maybe (TableInfo b)
-> Either Text (TableInfo b) -> Either Text (TableInfo b)
forall (m :: * -> *) a. Applicative m => Maybe a -> m a -> m a
onNothing Maybe (TableInfo b)
mTableInfo (Either Text (TableInfo b) -> Either Text (TableInfo b))
-> Either Text (TableInfo b) -> Either Text (TableInfo b)
forall a b. (a -> b) -> a -> b
$ Text -> Either Text (TableInfo b)
forall a b. a -> Either a b
Left (Text
"table '" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TableName b
table TableName b -> Text -> Text
forall t. ToTxt t => t -> Text -> Text
<<> Text
"' not found")
let triggerName :: TriggerName
triggerName = TriggerMetadata -> TriggerName
tmName (TriggerMetadata -> TriggerName) -> TriggerMetadata -> TriggerName
forall a b. (a -> b) -> a -> b
$ Event b -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event b
e
mEventTriggerInfo :: Maybe (EventTriggerInfo b)
mEventTriggerInfo = TriggerName
-> HashMap TriggerName (EventTriggerInfo b)
-> Maybe (EventTriggerInfo b)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup TriggerName
triggerName (TableInfo b -> HashMap TriggerName (EventTriggerInfo b)
forall (b :: BackendType). TableInfo b -> EventTriggerInfoMap b
_tiEventTriggerInfoMap TableInfo b
tableInfo)
Maybe (EventTriggerInfo b)
-> Either Text (EventTriggerInfo b)
-> Either Text (EventTriggerInfo b)
forall (m :: * -> *) a. Applicative m => Maybe a -> m a -> m a
onNothing Maybe (EventTriggerInfo b)
mEventTriggerInfo
(Either Text (EventTriggerInfo b)
-> Either Text (EventTriggerInfo b))
-> Either Text (EventTriggerInfo b)
-> Either Text (EventTriggerInfo b)
forall a b. (a -> b) -> a -> b
$ Text -> Either Text (EventTriggerInfo b)
forall a b. a -> Either a b
Left
( Text
"event trigger '"
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TriggerName -> Text
triggerNameToTxt TriggerName
triggerName
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"' on table '"
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TableName b
table
TableName b -> Text -> Text
forall t. ToTxt t => t -> Text -> Text
<<> Text
"' not found"
)
incEventTriggerCounterWithLabel ::
(MonadIO m) =>
(IO GranularPrometheusMetricsState) ->
Bool ->
CounterVector EventStatusWithTriggerLabel ->
EventStatusWithTriggerLabel ->
m ()
incEventTriggerCounterWithLabel :: forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel
-> m ()
incEventTriggerCounterWithLabel IO GranularPrometheusMetricsState
getMetricState Bool
alwaysObserve CounterVector EventStatusWithTriggerLabel
counterVector (EventStatusWithTriggerLabel EventStatusLabel
status Maybe DynamicEventTriggerLabel
tl) = do
IO GranularPrometheusMetricsState -> Bool -> IO () -> IO () -> m ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState -> Bool -> IO () -> IO () -> m ()
recordMetricWithLabel
IO GranularPrometheusMetricsState
getMetricState
Bool
alwaysObserve
(IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel -> IO ()
forall label. Ord label => CounterVector label -> label -> IO ()
CounterVector.inc CounterVector EventStatusWithTriggerLabel
counterVector (EventStatusLabel
-> Maybe DynamicEventTriggerLabel -> EventStatusWithTriggerLabel
EventStatusWithTriggerLabel EventStatusLabel
status Maybe DynamicEventTriggerLabel
tl))
(IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ CounterVector EventStatusWithTriggerLabel
-> EventStatusWithTriggerLabel -> IO ()
forall label. Ord label => CounterVector label -> label -> IO ()
CounterVector.inc CounterVector EventStatusWithTriggerLabel
counterVector (EventStatusLabel
-> Maybe DynamicEventTriggerLabel -> EventStatusWithTriggerLabel
EventStatusWithTriggerLabel EventStatusLabel
status Maybe DynamicEventTriggerLabel
forall a. Maybe a
Nothing))