{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE TemplateHaskell #-}
module Hasura.Eventing.EventTrigger
( initEventEngineCtx,
processEventQueue,
defaultMaxEventThreads,
defaultFetchInterval,
Event (..),
EventEngineCtx (..),
saveLockedEventTriggerEvents,
removeEventTriggerEventFromLockedEvents,
)
where
import Control.Concurrent.Async.Lifted.Safe qualified as LA
import Control.Concurrent.Extended (Forever (..), sleep)
import Control.Concurrent.STM.TVar
import Control.Lens
import Control.Monad.Catch (MonadMask, bracket_, finally, mask_)
import Control.Monad.STM
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson qualified as J
import Data.Aeson.TH
import Data.Has
import Data.HashMap.Strict qualified as M
import Data.SerializableBlob qualified as SB
import Data.Set qualified as Set
import Data.String
import Data.Text qualified as T
import Data.Text.Extended
import Data.Text.NonEmpty
import Data.Time.Clock
import Data.Time.Clock qualified as Time
import Hasura.Backends.Postgres.SQL.Types hiding (TableName)
import Hasura.Base.Error
import Hasura.Eventing.Common
import Hasura.Eventing.HTTP
import Hasura.HTTP (getHTTPExceptionStatus)
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.DDL.Headers
import Hasura.RQL.DDL.Webhook.Transform
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.Eventing.Backend
import Hasura.RQL.Types.Numeric (NonNegativeInt)
import Hasura.RQL.Types.Numeric qualified as Numeric
import Hasura.RQL.Types.SchemaCache
import Hasura.RQL.Types.Source
import Hasura.SQL.AnyBackend qualified as AB
import Hasura.SQL.Backend
import Hasura.Server.Metrics (ServerMetrics (..))
import Hasura.Server.Prometheus (EventTriggerMetrics (..))
import Hasura.Server.Types
import Hasura.Tracing qualified as Tracing
import Network.HTTP.Client.Transformable qualified as HTTP
import System.Metrics.Distribution qualified as EKG.Distribution
import System.Metrics.Gauge qualified as EKG.Gauge
import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
import System.Metrics.Prometheus.Histogram qualified as Prometheus.Histogram
newtype EventInternalErr
= EventInternalErr QErr
deriving (EventInternalErr -> EventInternalErr -> Bool
(EventInternalErr -> EventInternalErr -> Bool)
-> (EventInternalErr -> EventInternalErr -> Bool)
-> Eq EventInternalErr
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: EventInternalErr -> EventInternalErr -> Bool
$c/= :: EventInternalErr -> EventInternalErr -> Bool
== :: EventInternalErr -> EventInternalErr -> Bool
$c== :: EventInternalErr -> EventInternalErr -> Bool
Eq)
instance L.ToEngineLog EventInternalErr L.Hasura where
toEngineLog :: EventInternalErr -> (LogLevel, EngineLogType Hasura, Value)
toEngineLog (EventInternalErr QErr
qerr) = (LogLevel
L.LevelError, EngineLogType Hasura
L.eventTriggerLogType, QErr -> Value
forall a. ToJSON a => a -> Value
J.toJSON QErr
qerr)
data EventEngineCtx = EventEngineCtx
{ EventEngineCtx -> TVar Int
_eeCtxEventThreadsCapacity :: TVar Int,
EventEngineCtx -> DiffTime
_eeCtxFetchInterval :: DiffTime,
EventEngineCtx -> NonNegativeInt
_eeCtxFetchSize :: NonNegativeInt
}
data DeliveryInfo = DeliveryInfo
{ DeliveryInfo -> Int
diCurrentRetry :: Int,
DeliveryInfo -> Int
diMaxRetries :: Int
}
deriving (Int -> DeliveryInfo -> ShowS
[DeliveryInfo] -> ShowS
DeliveryInfo -> String
(Int -> DeliveryInfo -> ShowS)
-> (DeliveryInfo -> String)
-> ([DeliveryInfo] -> ShowS)
-> Show DeliveryInfo
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [DeliveryInfo] -> ShowS
$cshowList :: [DeliveryInfo] -> ShowS
show :: DeliveryInfo -> String
$cshow :: DeliveryInfo -> String
showsPrec :: Int -> DeliveryInfo -> ShowS
$cshowsPrec :: Int -> DeliveryInfo -> ShowS
Show, DeliveryInfo -> DeliveryInfo -> Bool
(DeliveryInfo -> DeliveryInfo -> Bool)
-> (DeliveryInfo -> DeliveryInfo -> Bool) -> Eq DeliveryInfo
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: DeliveryInfo -> DeliveryInfo -> Bool
$c/= :: DeliveryInfo -> DeliveryInfo -> Bool
== :: DeliveryInfo -> DeliveryInfo -> Bool
$c== :: DeliveryInfo -> DeliveryInfo -> Bool
Eq)
$(deriveJSON hasuraJSON {omitNothingFields = True} ''DeliveryInfo)
newtype QualifiedTableStrict = QualifiedTableStrict
{ QualifiedTableStrict -> QualifiedTable
getQualifiedTable :: QualifiedTable
}
deriving (Int -> QualifiedTableStrict -> ShowS
[QualifiedTableStrict] -> ShowS
QualifiedTableStrict -> String
(Int -> QualifiedTableStrict -> ShowS)
-> (QualifiedTableStrict -> String)
-> ([QualifiedTableStrict] -> ShowS)
-> Show QualifiedTableStrict
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [QualifiedTableStrict] -> ShowS
$cshowList :: [QualifiedTableStrict] -> ShowS
show :: QualifiedTableStrict -> String
$cshow :: QualifiedTableStrict -> String
showsPrec :: Int -> QualifiedTableStrict -> ShowS
$cshowsPrec :: Int -> QualifiedTableStrict -> ShowS
Show, QualifiedTableStrict -> QualifiedTableStrict -> Bool
(QualifiedTableStrict -> QualifiedTableStrict -> Bool)
-> (QualifiedTableStrict -> QualifiedTableStrict -> Bool)
-> Eq QualifiedTableStrict
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
$c/= :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
== :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
$c== :: QualifiedTableStrict -> QualifiedTableStrict -> Bool
Eq)
instance J.ToJSON QualifiedTableStrict where
toJSON :: QualifiedTableStrict -> Value
toJSON (QualifiedTableStrict (QualifiedObject SchemaName
sn TableName
tn)) =
[Pair] -> Value
J.object
[ Key
"schema" Key -> SchemaName -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
J..= SchemaName
sn,
Key
"name" Key -> TableName -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
J..= TableName
tn
]
data EventPayload (b :: BackendType) = EventPayload
{ EventPayload b -> EventId
epId :: EventId,
EventPayload b -> TableName b
epTable :: TableName b,
EventPayload b -> TriggerMetadata
epTrigger :: TriggerMetadata,
EventPayload b -> Value
epEvent :: J.Value,
EventPayload b -> DeliveryInfo
epDeliveryInfo :: DeliveryInfo,
EventPayload b -> UTCTime
epCreatedAt :: Time.UTCTime
}
deriving ((forall x. EventPayload b -> Rep (EventPayload b) x)
-> (forall x. Rep (EventPayload b) x -> EventPayload b)
-> Generic (EventPayload b)
forall x. Rep (EventPayload b) x -> EventPayload b
forall x. EventPayload b -> Rep (EventPayload b) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall (b :: BackendType) x.
Rep (EventPayload b) x -> EventPayload b
forall (b :: BackendType) x.
EventPayload b -> Rep (EventPayload b) x
$cto :: forall (b :: BackendType) x.
Rep (EventPayload b) x -> EventPayload b
$cfrom :: forall (b :: BackendType) x.
EventPayload b -> Rep (EventPayload b) x
Generic)
deriving instance Backend b => Show (EventPayload b)
deriving instance Backend b => Eq (EventPayload b)
instance Backend b => J.ToJSON (EventPayload b) where
toJSON :: EventPayload b -> Value
toJSON = Options -> EventPayload b -> Value
forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
J.genericToJSON Options
hasuraJSON {omitNothingFields :: Bool
omitNothingFields = Bool
True}
defaultMaxEventThreads :: Numeric.PositiveInt
defaultMaxEventThreads :: PositiveInt
defaultMaxEventThreads = Int -> PositiveInt
Numeric.unsafePositiveInt Int
100
defaultFetchInterval :: DiffTime
defaultFetchInterval :: DiffTime
defaultFetchInterval = Seconds -> DiffTime
seconds Seconds
1
initEventEngineCtx :: Int -> DiffTime -> NonNegativeInt -> STM EventEngineCtx
initEventEngineCtx :: Int -> DiffTime -> NonNegativeInt -> STM EventEngineCtx
initEventEngineCtx Int
maxT DiffTime
_eeCtxFetchInterval NonNegativeInt
_eeCtxFetchSize = do
TVar Int
_eeCtxEventThreadsCapacity <- Int -> STM (TVar Int)
forall a. a -> STM (TVar a)
newTVar Int
maxT
EventEngineCtx -> STM EventEngineCtx
forall (m :: * -> *) a. Monad m => a -> m a
return (EventEngineCtx -> STM EventEngineCtx)
-> EventEngineCtx -> STM EventEngineCtx
forall a b. (a -> b) -> a -> b
$ EventEngineCtx :: TVar Int -> DiffTime -> NonNegativeInt -> EventEngineCtx
EventEngineCtx {TVar Int
DiffTime
NonNegativeInt
_eeCtxEventThreadsCapacity :: TVar Int
_eeCtxFetchSize :: NonNegativeInt
_eeCtxFetchInterval :: DiffTime
_eeCtxFetchSize :: NonNegativeInt
_eeCtxFetchInterval :: DiffTime
_eeCtxEventThreadsCapacity :: TVar Int
..}
saveLockedEventTriggerEvents :: MonadIO m => SourceName -> [EventId] -> TVar (HashMap SourceName (Set.Set EventId)) -> m ()
saveLockedEventTriggerEvents :: SourceName
-> [EventId] -> TVar (HashMap SourceName (Set EventId)) -> m ()
saveLockedEventTriggerEvents SourceName
sourceName [EventId]
eventIds TVar (HashMap SourceName (Set EventId))
lockedEvents =
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
HashMap SourceName (Set EventId)
lockedEventsVals <- TVar (HashMap SourceName (Set EventId))
-> STM (HashMap SourceName (Set EventId))
forall a. TVar a -> STM a
readTVar TVar (HashMap SourceName (Set EventId))
lockedEvents
case SourceName
-> HashMap SourceName (Set EventId) -> Maybe (Set EventId)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
M.lookup SourceName
sourceName HashMap SourceName (Set EventId)
lockedEventsVals of
Maybe (Set EventId)
Nothing -> TVar (HashMap SourceName (Set EventId))
-> HashMap SourceName (Set EventId) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (HashMap SourceName (Set EventId))
lockedEvents (HashMap SourceName (Set EventId) -> STM ())
-> HashMap SourceName (Set EventId) -> STM ()
forall a b. (a -> b) -> a -> b
$! SourceName -> Set EventId -> HashMap SourceName (Set EventId)
forall k v. Hashable k => k -> v -> HashMap k v
M.singleton SourceName
sourceName ([EventId] -> Set EventId
forall a. Ord a => [a] -> Set a
Set.fromList [EventId]
eventIds)
Just Set EventId
_ -> TVar (HashMap SourceName (Set EventId))
-> HashMap SourceName (Set EventId) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (HashMap SourceName (Set EventId))
lockedEvents (HashMap SourceName (Set EventId) -> STM ())
-> HashMap SourceName (Set EventId) -> STM ()
forall a b. (a -> b) -> a -> b
$! (Set EventId -> Set EventId -> Set EventId)
-> SourceName
-> Set EventId
-> HashMap SourceName (Set EventId)
-> HashMap SourceName (Set EventId)
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> k -> v -> HashMap k v -> HashMap k v
M.insertWith Set EventId -> Set EventId -> Set EventId
forall a. Ord a => Set a -> Set a -> Set a
Set.union SourceName
sourceName ([EventId] -> Set EventId
forall a. Ord a => [a] -> Set a
Set.fromList [EventId]
eventIds) HashMap SourceName (Set EventId)
lockedEventsVals
removeEventTriggerEventFromLockedEvents ::
MonadIO m => SourceName -> EventId -> TVar (HashMap SourceName (Set.Set EventId)) -> m ()
removeEventTriggerEventFromLockedEvents :: SourceName
-> EventId -> TVar (HashMap SourceName (Set EventId)) -> m ()
removeEventTriggerEventFromLockedEvents SourceName
sourceName EventId
eventId TVar (HashMap SourceName (Set EventId))
lockedEvents =
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
HashMap SourceName (Set EventId)
lockedEventsVals <- TVar (HashMap SourceName (Set EventId))
-> STM (HashMap SourceName (Set EventId))
forall a. TVar a -> STM a
readTVar TVar (HashMap SourceName (Set EventId))
lockedEvents
TVar (HashMap SourceName (Set EventId))
-> HashMap SourceName (Set EventId) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (HashMap SourceName (Set EventId))
lockedEvents (HashMap SourceName (Set EventId) -> STM ())
-> HashMap SourceName (Set EventId) -> STM ()
forall a b. (a -> b) -> a -> b
$! (Set EventId -> Set EventId)
-> SourceName
-> HashMap SourceName (Set EventId)
-> HashMap SourceName (Set EventId)
forall k v.
(Eq k, Hashable k) =>
(v -> v) -> k -> HashMap k v -> HashMap k v
M.adjust (EventId -> Set EventId -> Set EventId
forall a. Ord a => a -> Set a -> Set a
Set.delete EventId
eventId) SourceName
sourceName HashMap SourceName (Set EventId)
lockedEventsVals
type BackendEventWithSource = AB.AnyBackend EventWithSource
type FetchEventArguments = ([BackendEventWithSource], Int, Bool)
processEventQueue ::
forall m.
( MonadIO m,
Tracing.HasReporter m,
MonadBaseControl IO m,
LA.Forall (LA.Pure m),
MonadMask m
) =>
L.Logger L.Hasura ->
HTTP.Manager ->
IO SchemaCache ->
EventEngineCtx ->
LockedEventsCtx ->
ServerMetrics ->
EventTriggerMetrics ->
MaintenanceMode () ->
m (Forever m)
processEventQueue :: Logger Hasura
-> Manager
-> IO SchemaCache
-> EventEngineCtx
-> LockedEventsCtx
-> ServerMetrics
-> EventTriggerMetrics
-> MaintenanceMode ()
-> m (Forever m)
processEventQueue Logger Hasura
logger Manager
httpMgr IO SchemaCache
getSchemaCache EventEngineCtx {TVar Int
DiffTime
NonNegativeInt
_eeCtxFetchSize :: NonNegativeInt
_eeCtxFetchInterval :: DiffTime
_eeCtxEventThreadsCapacity :: TVar Int
_eeCtxFetchSize :: EventEngineCtx -> NonNegativeInt
_eeCtxFetchInterval :: EventEngineCtx -> DiffTime
_eeCtxEventThreadsCapacity :: EventEngineCtx -> TVar Int
..} LockedEventsCtx {TVar (HashMap SourceName (Set EventId))
leEvents :: LockedEventsCtx -> TVar (HashMap SourceName (Set EventId))
leEvents :: TVar (HashMap SourceName (Set EventId))
leEvents} ServerMetrics
serverMetrics EventTriggerMetrics
eventTriggerMetrics MaintenanceMode ()
maintenanceMode = do
[BackendEventWithSource]
events0 <- m [BackendEventWithSource]
popEventsBatch
Forever m -> m (Forever m)
forall (m :: * -> *) a. Monad m => a -> m a
return (Forever m -> m (Forever m)) -> Forever m -> m (Forever m)
forall a b. (a -> b) -> a -> b
$ ([BackendEventWithSource], Int, Bool)
-> (([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool))
-> Forever m
forall (m :: * -> *) a. a -> (a -> m a) -> Forever m
Forever ([BackendEventWithSource]
events0, Int
0, Bool
False) ([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
go
where
fetchBatchSize :: Int
fetchBatchSize = NonNegativeInt -> Int
Numeric.getNonNegativeInt NonNegativeInt
_eeCtxFetchSize
popEventsBatch :: m [BackendEventWithSource]
popEventsBatch :: m [BackendEventWithSource]
popEventsBatch = do
SourceCache
allSources <- SchemaCache -> SourceCache
scSources (SchemaCache -> SourceCache) -> m SchemaCache -> m SourceCache
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO SchemaCache -> m SchemaCache
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO SchemaCache
getSchemaCache
IO [BackendEventWithSource] -> m [BackendEventWithSource]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [BackendEventWithSource] -> m [BackendEventWithSource])
-> (IO [[BackendEventWithSource]] -> IO [BackendEventWithSource])
-> IO [[BackendEventWithSource]]
-> m [BackendEventWithSource]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ([[BackendEventWithSource]] -> [BackendEventWithSource])
-> IO [[BackendEventWithSource]] -> IO [BackendEventWithSource]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap [[BackendEventWithSource]] -> [BackendEventWithSource]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat (IO [[BackendEventWithSource]] -> m [BackendEventWithSource])
-> IO [[BackendEventWithSource]] -> m [BackendEventWithSource]
forall a b. (a -> b) -> a -> b
$
[(SourceName, BackendSourceInfo)]
-> ((SourceName, BackendSourceInfo) -> IO [BackendEventWithSource])
-> IO [[BackendEventWithSource]]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, MonadBaseControl IO m, Forall (Pure m)) =>
t a -> (a -> m b) -> m (t b)
LA.forConcurrently (SourceCache -> [(SourceName, BackendSourceInfo)]
forall k v. HashMap k v -> [(k, v)]
M.toList SourceCache
allSources) \(SourceName
sourceName, BackendSourceInfo
sourceCache) ->
BackendSourceInfo
-> (forall (b :: BackendType).
BackendEventTrigger b =>
SourceInfo b -> IO [BackendEventWithSource])
-> IO [BackendEventWithSource]
forall (c :: BackendType -> Constraint) (i :: BackendType -> *) r.
AllBackendsSatisfy c =>
AnyBackend i -> (forall (b :: BackendType). c b => i b -> r) -> r
AB.dispatchAnyBackend @BackendEventTrigger BackendSourceInfo
sourceCache \(SourceInfo SourceName
_sourceName TableCache b
tableCache FunctionCache b
_functionCache SourceConfig b
sourceConfig Maybe QueryTagsConfig
_queryTagsConfig SourceCustomization
_sourceCustomization :: SourceInfo b) -> do
let tables :: [TableInfo b]
tables = TableCache b -> [TableInfo b]
forall k v. HashMap k v -> [v]
M.elems TableCache b
tableCache
triggerMap :: [EventTriggerInfoMap b]
triggerMap = TableInfo b -> EventTriggerInfoMap b
forall (b :: BackendType). TableInfo b -> EventTriggerInfoMap b
_tiEventTriggerInfoMap (TableInfo b -> EventTriggerInfoMap b)
-> [TableInfo b] -> [EventTriggerInfoMap b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [TableInfo b]
tables
eventTriggerCount :: Int
eventTriggerCount = [Int] -> Int
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (EventTriggerInfoMap b -> Int
forall k v. HashMap k v -> Int
M.size (EventTriggerInfoMap b -> Int) -> [EventTriggerInfoMap b] -> [Int]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [EventTriggerInfoMap b]
triggerMap)
triggerNames :: [TriggerName]
triggerNames = (EventTriggerInfoMap b -> [TriggerName])
-> [EventTriggerInfoMap b] -> [TriggerName]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap EventTriggerInfoMap b -> [TriggerName]
forall k v. HashMap k v -> [k]
M.keys [EventTriggerInfoMap b]
triggerMap
if Int
eventTriggerCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
then
( ExceptT QErr IO [Event b] -> IO (Either QErr [Event b])
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (SourceConfig b
-> SourceName
-> [TriggerName]
-> MaintenanceMode ()
-> FetchBatchSize
-> ExceptT QErr IO [Event b]
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m, MonadError QErr m) =>
SourceConfig b
-> SourceName
-> [TriggerName]
-> MaintenanceMode ()
-> FetchBatchSize
-> m [Event b]
fetchUndeliveredEvents @b SourceConfig b
sourceConfig SourceName
sourceName [TriggerName]
triggerNames MaintenanceMode ()
maintenanceMode (Int -> FetchBatchSize
FetchBatchSize Int
fetchBatchSize)) IO (Either QErr [Event b])
-> (Either QErr [Event b] -> IO [BackendEventWithSource])
-> IO [BackendEventWithSource]
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Right [Event b]
events -> do
()
_ <- IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Distribution -> Double -> IO ()
EKG.Distribution.add (ServerMetrics -> Distribution
smNumEventsFetchedPerBatch ServerMetrics
serverMetrics) (Int -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Double) -> Int -> Double
forall a b. (a -> b) -> a -> b
$ [Event b] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Event b]
events)
UTCTime
eventsFetchedTime <- IO UTCTime -> IO UTCTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
SourceName
-> [EventId] -> TVar (HashMap SourceName (Set EventId)) -> IO ()
forall (m :: * -> *).
MonadIO m =>
SourceName
-> [EventId] -> TVar (HashMap SourceName (Set EventId)) -> m ()
saveLockedEventTriggerEvents SourceName
sourceName (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId (Event b -> EventId) -> [Event b] -> [EventId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Event b]
events) TVar (HashMap SourceName (Set EventId))
leEvents
[BackendEventWithSource] -> IO [BackendEventWithSource]
forall (m :: * -> *) a. Monad m => a -> m a
return ([BackendEventWithSource] -> IO [BackendEventWithSource])
-> [BackendEventWithSource] -> IO [BackendEventWithSource]
forall a b. (a -> b) -> a -> b
$ (Event b -> BackendEventWithSource)
-> [Event b] -> [BackendEventWithSource]
forall a b. (a -> b) -> [a] -> [b]
map (\Event b
event -> forall (b :: BackendType) (i :: BackendType -> *).
HasTag b =>
i b -> AnyBackend i
forall (i :: BackendType -> *). HasTag b => i b -> AnyBackend i
AB.mkAnyBackend @b (EventWithSource b -> BackendEventWithSource)
-> EventWithSource b -> BackendEventWithSource
forall a b. (a -> b) -> a -> b
$ Event b
-> SourceConfig b -> SourceName -> UTCTime -> EventWithSource b
forall (b :: BackendType).
Event b
-> SourceConfig b -> SourceName -> UTCTime -> EventWithSource b
EventWithSource Event b
event SourceConfig b
sourceConfig SourceName
sourceName UTCTime
eventsFetchedTime) [Event b]
events
Left QErr
err -> do
IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (EventInternalErr -> IO ()) -> EventInternalErr -> IO ()
forall a b. (a -> b) -> a -> b
$ QErr -> EventInternalErr
EventInternalErr QErr
err
[BackendEventWithSource] -> IO [BackendEventWithSource]
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
)
else [BackendEventWithSource] -> IO [BackendEventWithSource]
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
go :: FetchEventArguments -> m FetchEventArguments
go :: ([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
go ([BackendEventWithSource]
events, !Int
fullFetchCount, !Bool
alreadyWarned) = do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ([BackendEventWithSource] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [BackendEventWithSource]
events) (m () -> m ()) -> (IO () -> m ()) -> IO () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ DiffTime -> IO ()
sleep DiffTime
_eeCtxFetchInterval
[BackendEventWithSource]
eventsNext <- m [BackendEventWithSource]
-> (Async [BackendEventWithSource] -> m [BackendEventWithSource])
-> m [BackendEventWithSource]
forall (m :: * -> *) a b.
(MonadBaseControl IO m, Forall (Pure m)) =>
m a -> (Async a -> m b) -> m b
LA.withAsync m [BackendEventWithSource]
popEventsBatch ((Async [BackendEventWithSource] -> m [BackendEventWithSource])
-> m [BackendEventWithSource])
-> (Async [BackendEventWithSource] -> m [BackendEventWithSource])
-> m [BackendEventWithSource]
forall a b. (a -> b) -> a -> b
$ \Async [BackendEventWithSource]
eventsNextA -> do
[BackendEventWithSource]
-> (BackendEventWithSource -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [BackendEventWithSource]
events ((BackendEventWithSource -> m ()) -> m ())
-> (BackendEventWithSource -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \BackendEventWithSource
eventWithSource ->
BackendEventWithSource
-> (forall (b :: BackendType).
BackendEventTrigger b =>
EventWithSource b -> m ())
-> m ()
forall (c :: BackendType -> Constraint) (i :: BackendType -> *) r.
AllBackendsSatisfy c =>
AnyBackend i -> (forall (b :: BackendType). c b => i b -> r) -> r
AB.dispatchAnyBackend @BackendEventTrigger BackendEventWithSource
eventWithSource \(EventWithSource b
eventWithSource' :: EventWithSource b) ->
m () -> m ()
forall (m :: * -> *) a. MonadMask m => m a -> m a
mask_ (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Int
capacity <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
_eeCtxEventThreadsCapacity
Bool -> STM ()
check (Bool -> STM ()) -> Bool -> STM ()
forall a b. (a -> b) -> a -> b
$ Int
capacity Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
TVar Int -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Int
_eeCtxEventThreadsCapacity (Int
capacity Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
let restoreCapacity :: ReaderT (Logger Hasura, Manager) m ()
restoreCapacity =
IO () -> ReaderT (Logger Hasura, Manager) m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ReaderT (Logger Hasura, Manager) m ())
-> IO () -> ReaderT (Logger Hasura, Manager) m ()
forall a b. (a -> b) -> a -> b
$
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
_eeCtxEventThreadsCapacity (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
Async ()
t <-
m () -> m (Async ())
forall (m :: * -> *) a.
(MonadBaseControl IO m, Forall (Pure m)) =>
m a -> m (Async a)
LA.async (m () -> m (Async ())) -> m () -> m (Async ())
forall a b. (a -> b) -> a -> b
$
(ReaderT (Logger Hasura, Manager) m ()
-> (Logger Hasura, Manager) -> m ())
-> (Logger Hasura, Manager)
-> ReaderT (Logger Hasura, Manager) m ()
-> m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ReaderT (Logger Hasura, Manager) m ()
-> (Logger Hasura, Manager) -> m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (Logger Hasura
logger, Manager
httpMgr) (ReaderT (Logger Hasura, Manager) m () -> m ())
-> ReaderT (Logger Hasura, Manager) m () -> m ()
forall a b. (a -> b) -> a -> b
$
EventWithSource b -> ReaderT (Logger Hasura, Manager) m ()
forall (io :: * -> *) r (b :: BackendType).
(MonadIO io, MonadReader r io, Has Manager r,
Has (Logger Hasura) r, HasReporter io, MonadMask io,
BackendEventTrigger b) =>
EventWithSource b -> io ()
processEvent EventWithSource b
eventWithSource'
ReaderT (Logger Hasura, Manager) m ()
-> ReaderT (Logger Hasura, Manager) m ()
-> ReaderT (Logger Hasura, Manager) m ()
forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`finally`
ReaderT (Logger Hasura, Manager) m ()
restoreCapacity
Async () -> m ()
forall (m :: * -> *) a. MonadBase IO m => Async a -> m ()
LA.link Async ()
t
Async [BackendEventWithSource] -> m [BackendEventWithSource]
forall (m :: * -> *) a.
(MonadBase IO m, Forall (Pure m)) =>
Async a -> m a
LA.wait Async [BackendEventWithSource]
eventsNextA
let lenEvents :: Int
lenEvents = [BackendEventWithSource] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [BackendEventWithSource]
events
if
| Int
lenEvents Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
fetchBatchSize -> do
let clearlyBehind :: Bool
clearlyBehind = Int
fullFetchCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
3
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
alreadyWarned (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
clearlyBehind (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (UnstructuredLog -> m ()) -> UnstructuredLog -> m ()
forall a b. (a -> b) -> a -> b
$
LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelWarn (SerializableBlob -> UnstructuredLog)
-> SerializableBlob -> UnstructuredLog
forall a b. (a -> b) -> a -> b
$
String -> SerializableBlob
forall a. IsString a => String -> a
fromString (String -> SerializableBlob) -> String -> SerializableBlob
forall a b. (a -> b) -> a -> b
$
String
"Events processor may not be keeping up with events generated in postgres, "
String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"or we're working on a backlog of events. Consider increasing "
String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE"
([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return ([BackendEventWithSource]
eventsNext, (Int
fullFetchCount Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1), (Bool
alreadyWarned Bool -> Bool -> Bool
|| Bool
clearlyBehind))
| Bool
otherwise -> do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
lenEvents Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
fetchBatchSize Bool -> Bool -> Bool
&& Bool
alreadyWarned) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (UnstructuredLog -> m ()) -> UnstructuredLog -> m ()
forall a b. (a -> b) -> a -> b
$
LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelWarn (SerializableBlob -> UnstructuredLog)
-> SerializableBlob -> UnstructuredLog
forall a b. (a -> b) -> a -> b
$
String -> SerializableBlob
forall a. IsString a => String -> a
fromString (String -> SerializableBlob) -> String -> SerializableBlob
forall a b. (a -> b) -> a -> b
$
String
"It looks like the events processor is keeping up again."
([BackendEventWithSource], Int, Bool)
-> m ([BackendEventWithSource], Int, Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return ([BackendEventWithSource]
eventsNext, Int
0, Bool
False)
processEvent ::
forall io r b.
( MonadIO io,
MonadReader r io,
Has HTTP.Manager r,
Has (L.Logger L.Hasura) r,
Tracing.HasReporter io,
MonadMask io,
BackendEventTrigger b
) =>
EventWithSource b ->
io ()
processEvent :: EventWithSource b -> io ()
processEvent (EventWithSource Event b
e SourceConfig b
sourceConfig SourceName
sourceName UTCTime
eventFetchedTime) = do
UTCTime
eventProcessTime <- IO UTCTime -> io UTCTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
let eventQueueTime :: Double
eventQueueTime = NominalDiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac (NominalDiffTime -> Double) -> NominalDiffTime -> Double
forall a b. (a -> b) -> a -> b
$ UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
eventProcessTime UTCTime
eventFetchedTime
()
_ <- IO () -> io ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> io ()) -> IO () -> io ()
forall a b. (a -> b) -> a -> b
$ Distribution -> Double -> IO ()
EKG.Distribution.add (ServerMetrics -> Distribution
smEventQueueTime ServerMetrics
serverMetrics) Double
eventQueueTime
IO () -> io ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> io ()) -> IO () -> io ()
forall a b. (a -> b) -> a -> b
$ Histogram -> Double -> IO ()
Prometheus.Histogram.observe (EventTriggerMetrics -> Histogram
eventQueueTimeSeconds EventTriggerMetrics
eventTriggerMetrics) Double
eventQueueTime
SchemaCache
cache <- IO SchemaCache -> io SchemaCache
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO SchemaCache
getSchemaCache
Maybe TraceContext
tracingCtx <- IO (Maybe TraceContext) -> io (Maybe TraceContext)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Value -> IO (Maybe TraceContext)
Tracing.extractEventContext (Event b -> Value
forall (b :: BackendType). Event b -> Value
eEvent Event b
e))
let spanName :: EventTriggerInfo b -> Text
spanName EventTriggerInfo b
eti = Text
"Event trigger: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> NonEmptyText -> Text
unNonEmptyText (TriggerName -> NonEmptyText
unTriggerName (EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti))
runTraceT :: Text -> TraceT io () -> io ()
runTraceT =
(Text -> TraceT io () -> io ())
-> (TraceContext -> Text -> TraceT io () -> io ())
-> Maybe TraceContext
-> Text
-> TraceT io ()
-> io ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe
Text -> TraceT io () -> io ()
forall (m :: * -> *) a.
(HasReporter m, MonadIO m) =>
Text -> TraceT m a -> m a
Tracing.runTraceT
TraceContext -> Text -> TraceT io () -> io ()
forall (m :: * -> *) a.
(MonadIO m, HasReporter m) =>
TraceContext -> Text -> TraceT m a -> m a
Tracing.runTraceTInContext
Maybe TraceContext
tracingCtx
Either QErr (MaintenanceMode MaintenanceModeVersion)
maintenanceModeVersionEither :: Either QErr (MaintenanceMode MaintenanceModeVersion) <-
case MaintenanceMode ()
maintenanceMode of
MaintenanceModeEnabled () -> do
ExceptT QErr io MaintenanceModeVersion
-> io (Either QErr MaintenanceModeVersion)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (SourceConfig b -> ExceptT QErr io MaintenanceModeVersion
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m, MonadError QErr m) =>
SourceConfig b -> m MaintenanceModeVersion
getMaintenanceModeVersion @b SourceConfig b
sourceConfig) io (Either QErr MaintenanceModeVersion)
-> (Either QErr MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion))
-> io (Either QErr (MaintenanceMode MaintenanceModeVersion))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
Left QErr
err -> QErr -> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. a -> Either a b
Left QErr
err
Right MaintenanceModeVersion
maintenanceModeVersion -> MaintenanceMode MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. b -> Either a b
Right (MaintenanceMode MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion))
-> MaintenanceMode MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. (a -> b) -> a -> b
$ (MaintenanceModeVersion -> MaintenanceMode MaintenanceModeVersion
forall a. a -> MaintenanceMode a
MaintenanceModeEnabled MaintenanceModeVersion
maintenanceModeVersion)
MaintenanceMode ()
MaintenanceModeDisabled -> Either QErr (MaintenanceMode MaintenanceModeVersion)
-> io (Either QErr (MaintenanceMode MaintenanceModeVersion))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either QErr (MaintenanceMode MaintenanceModeVersion)
-> io (Either QErr (MaintenanceMode MaintenanceModeVersion)))
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
-> io (Either QErr (MaintenanceMode MaintenanceModeVersion))
forall a b. (a -> b) -> a -> b
$ MaintenanceMode MaintenanceModeVersion
-> Either QErr (MaintenanceMode MaintenanceModeVersion)
forall a b. b -> Either a b
Right MaintenanceMode MaintenanceModeVersion
forall a. MaintenanceMode a
MaintenanceModeDisabled
case Either QErr (MaintenanceMode MaintenanceModeVersion)
maintenanceModeVersionEither of
Left QErr
maintenanceModeVersionErr -> QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr QErr
maintenanceModeVersionErr
Right MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion ->
case SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
forall (b :: BackendType).
Backend b =>
SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
getEventTriggerInfoFromEvent SchemaCache
cache Event b
e of
Left Text
err -> do
QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr (QErr -> io ()) -> QErr -> io ()
forall a b. (a -> b) -> a -> b
$ Code -> Text -> QErr
err500 Code
Unexpected Text
err
UTCTime
currentTime <- IO UTCTime -> io UTCTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
ExceptT QErr io () -> io (Either QErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (SourceConfig b
-> Event b
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> ExceptT QErr io ()
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m, MonadError QErr m) =>
SourceConfig b
-> Event b
-> UTCTime
-> MaintenanceMode MaintenanceModeVersion
-> m ()
setRetry SourceConfig b
sourceConfig Event b
e (NominalDiffTime -> UTCTime -> UTCTime
addUTCTime NominalDiffTime
60 UTCTime
currentTime) MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion)
io (Either QErr ()) -> (Either QErr () -> io ()) -> io ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> io ()) -> io ())
-> (QErr -> io ()) -> Either QErr () -> io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> io ()) -> io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
Right EventTriggerInfo b
eti -> Text -> TraceT io () -> io ()
runTraceT (EventTriggerInfo b -> Text
forall (b :: BackendType). EventTriggerInfo b -> Text
spanName EventTriggerInfo b
eti) do
let webhook :: EnvRecord ResolvedWebhook
webhook = WebhookConfInfo -> EnvRecord ResolvedWebhook
wciCachedValue (WebhookConfInfo -> EnvRecord ResolvedWebhook)
-> WebhookConfInfo -> EnvRecord ResolvedWebhook
forall a b. (a -> b) -> a -> b
$ EventTriggerInfo b -> WebhookConfInfo
forall (b :: BackendType). EventTriggerInfo b -> WebhookConfInfo
etiWebhookInfo EventTriggerInfo b
eti
retryConf :: RetryConf
retryConf = EventTriggerInfo b -> RetryConf
forall (b :: BackendType). EventTriggerInfo b -> RetryConf
etiRetryConf EventTriggerInfo b
eti
timeoutSeconds :: Int
timeoutSeconds = Int -> Maybe Int -> Int
forall a. a -> Maybe a -> a
fromMaybe Int
defaultTimeoutSeconds (RetryConf -> Maybe Int
rcTimeoutSec RetryConf
retryConf)
httpTimeout :: ResponseTimeout
httpTimeout = Int -> ResponseTimeout
HTTP.responseTimeoutMicro (Int
timeoutSeconds Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000000)
([Header]
headers, [HeaderConf]
logHeaders) = [EventHeaderInfo] -> ([Header], [HeaderConf])
prepareHeaders (EventTriggerInfo b -> [EventHeaderInfo]
forall (b :: BackendType). EventTriggerInfo b -> [EventHeaderInfo]
etiHeaders EventTriggerInfo b
eti)
ep :: EventPayload b
ep = RetryConf -> Event b -> EventPayload b
forall (b :: BackendType). RetryConf -> Event b -> EventPayload b
createEventPayload RetryConf
retryConf Event b
e
payload :: ByteString
payload = Value -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode (Value -> ByteString) -> Value -> ByteString
forall a b. (a -> b) -> a -> b
$ EventPayload b -> Value
forall a. ToJSON a => a -> Value
J.toJSON EventPayload b
ep
extraLogCtx :: ExtraLogContext
extraLogCtx = EventId -> Maybe TriggerName -> ExtraLogContext
ExtraLogContext (EventPayload b -> EventId
forall (b :: BackendType). EventPayload b -> EventId
epId EventPayload b
ep) (TriggerName -> Maybe TriggerName
forall a. a -> Maybe a
Just (TriggerName -> Maybe TriggerName)
-> TriggerName -> Maybe TriggerName
forall a b. (a -> b) -> a -> b
$ EventTriggerInfo b -> TriggerName
forall (b :: BackendType). EventTriggerInfo b -> TriggerName
etiName EventTriggerInfo b
eti)
requestTransform :: Maybe RequestTransform
requestTransform = EventTriggerInfo b -> Maybe RequestTransform
forall (b :: BackendType).
EventTriggerInfo b -> Maybe RequestTransform
etiRequestTransform EventTriggerInfo b
eti
responseTransform :: Maybe ResponseTransform
responseTransform = MetadataResponseTransform -> ResponseTransform
mkResponseTransform (MetadataResponseTransform -> ResponseTransform)
-> Maybe MetadataResponseTransform -> Maybe ResponseTransform
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> EventTriggerInfo b -> Maybe MetadataResponseTransform
forall (b :: BackendType).
EventTriggerInfo b -> Maybe MetadataResponseTransform
etiResponseTransform EventTriggerInfo b
eti
Either
(TransformableRequestError 'EventType)
(Request, HTTPResp 'EventType)
eitherReqRes <-
ExceptT
(TransformableRequestError 'EventType)
(TraceT io)
(Request, HTTPResp 'EventType)
-> TraceT
io
(Either
(TransformableRequestError 'EventType)
(Request, HTTPResp 'EventType))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT
(TransformableRequestError 'EventType)
(TraceT io)
(Request, HTTPResp 'EventType)
-> TraceT
io
(Either
(TransformableRequestError 'EventType)
(Request, HTTPResp 'EventType)))
-> ExceptT
(TransformableRequestError 'EventType)
(TraceT io)
(Request, HTTPResp 'EventType)
-> TraceT
io
(Either
(TransformableRequestError 'EventType)
(Request, HTTPResp 'EventType))
forall a b. (a -> b) -> a -> b
$
[Header]
-> ResponseTimeout
-> ByteString
-> Maybe RequestTransform
-> ResolvedWebhook
-> ExceptT
(TransformableRequestError 'EventType) (TraceT io) RequestDetails
forall (a :: TriggerTypes) (m :: * -> *).
MonadError (TransformableRequestError a) m =>
[Header]
-> ResponseTimeout
-> ByteString
-> Maybe RequestTransform
-> ResolvedWebhook
-> m RequestDetails
mkRequest [Header]
headers ResponseTimeout
httpTimeout ByteString
payload Maybe RequestTransform
requestTransform (EnvRecord ResolvedWebhook -> ResolvedWebhook
forall a. EnvRecord a -> a
_envVarValue EnvRecord ResolvedWebhook
webhook) ExceptT
(TransformableRequestError 'EventType) (TraceT io) RequestDetails
-> (RequestDetails
-> ExceptT
(TransformableRequestError 'EventType)
(TraceT io)
(Request, HTTPResp 'EventType))
-> ExceptT
(TransformableRequestError 'EventType)
(TraceT io)
(Request, HTTPResp 'EventType)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \RequestDetails
reqDetails -> do
let request :: Request
request = RequestDetails -> Request
extractRequest RequestDetails
reqDetails
logger' :: Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> RequestDetails
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
logger' Either (HTTPErr 'EventType) (HTTPResp 'EventType)
res RequestDetails
details = Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> ExtraLogContext
-> RequestDetails
-> Text
-> [HeaderConf]
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> ExtraLogContext
-> RequestDetails
-> Text
-> [HeaderConf]
-> m ()
logHTTPForET Either (HTTPErr 'EventType) (HTTPResp 'EventType)
res ExtraLogContext
extraLogCtx RequestDetails
details (EnvRecord ResolvedWebhook -> Text
forall a. EnvRecord a -> Text
_envVarName EnvRecord ResolvedWebhook
webhook) [HeaderConf]
logHeaders
HTTPResp 'EventType
resp <-
ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
-> ExceptT
(TransformableRequestError 'EventType)
(TraceT io)
(HTTPResp 'EventType)
-> ExceptT
(TransformableRequestError 'EventType)
(TraceT io)
(HTTPResp 'EventType)
forall (m :: * -> *) a c b. MonadMask m => m a -> m c -> m b -> m b
bracket_
( do
IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ())
-> IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smNumEventHTTPWorkers ServerMetrics
serverMetrics
IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ())
-> IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.inc (EventTriggerMetrics -> Gauge
eventTriggerHTTPWorkers EventTriggerMetrics
eventTriggerMetrics)
)
( do
IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ())
-> IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smNumEventHTTPWorkers ServerMetrics
serverMetrics
IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ())
-> IO ()
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.dec (EventTriggerMetrics -> Gauge
eventTriggerHTTPWorkers EventTriggerMetrics
eventTriggerMetrics)
)
(RequestDetails
-> Maybe ResponseTransform
-> Maybe SessionVariables
-> (Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> RequestDetails
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ())
-> ExceptT
(TransformableRequestError 'EventType)
(TraceT io)
(HTTPResp 'EventType)
forall r (m :: * -> *) (a :: TriggerTypes).
(MonadReader r m, MonadError (TransformableRequestError a) m,
Has Manager r, Has (Logger Hasura) r, MonadIO m, MonadTrace m) =>
RequestDetails
-> Maybe ResponseTransform
-> Maybe SessionVariables
-> (Either (HTTPErr a) (HTTPResp a) -> RequestDetails -> m ())
-> m (HTTPResp a)
invokeRequest RequestDetails
reqDetails Maybe ResponseTransform
responseTransform (RequestDetails -> Maybe SessionVariables
_rdSessionVars RequestDetails
reqDetails) Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> RequestDetails
-> ExceptT (TransformableRequestError 'EventType) (TraceT io) ()
logger')
(Request, HTTPResp 'EventType)
-> ExceptT
(TransformableRequestError 'EventType)
(TraceT io)
(Request, HTTPResp 'EventType)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Request
request, HTTPResp 'EventType
resp)
case Either
(TransformableRequestError 'EventType)
(Request, HTTPResp 'EventType)
eitherReqRes of
Right (Request
req, HTTPResp 'EventType
resp) ->
let reqBody :: Value
reqBody = Value -> Maybe Value -> Value
forall a. a -> Maybe a -> a
fromMaybe Value
J.Null (Maybe Value -> Value) -> Maybe Value -> Value
forall a b. (a -> b) -> a -> b
$ Getting (Maybe ByteString) Request (Maybe ByteString)
-> Request -> Maybe ByteString
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting (Maybe ByteString) Request (Maybe ByteString)
Lens' Request (Maybe ByteString)
HTTP.body Request
req Maybe ByteString -> (ByteString -> Maybe Value) -> Maybe Value
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= FromJSON Value => ByteString -> Maybe Value
forall a. FromJSON a => ByteString -> Maybe a
J.decode @J.Value
in SourceConfig b
-> Event b
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPResp 'EventType
-> TraceT io (Either QErr ())
forall (b :: BackendType) (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, BackendEventTrigger b) =>
SourceConfig b
-> Event b
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPResp a
-> m (Either QErr ())
processSuccess SourceConfig b
sourceConfig Event b
e [HeaderConf]
logHeaders Value
reqBody MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion HTTPResp 'EventType
resp TraceT io (Either QErr ())
-> (Either QErr () -> TraceT io ()) -> TraceT io ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> TraceT io ()) -> TraceT io ())
-> (QErr -> TraceT io ()) -> Either QErr () -> TraceT io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> TraceT io ()) -> TraceT io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> TraceT io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
Left (HTTPError Value
reqBody HTTPErr 'EventType
err) ->
SourceConfig b
-> Event b
-> RetryConf
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPErr 'EventType
-> TraceT io (Either QErr ())
forall (b :: BackendType) (m :: * -> *) (a :: TriggerTypes).
(MonadIO m, BackendEventTrigger b) =>
SourceConfig b
-> Event b
-> RetryConf
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPErr a
-> m (Either QErr ())
processError @b SourceConfig b
sourceConfig Event b
e RetryConf
retryConf [HeaderConf]
logHeaders Value
reqBody MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion HTTPErr 'EventType
err TraceT io (Either QErr ())
-> (Either QErr () -> TraceT io ()) -> TraceT io ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> TraceT io ()) -> TraceT io ())
-> (QErr -> TraceT io ()) -> Either QErr () -> TraceT io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> TraceT io ()) -> TraceT io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> TraceT io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
Left (TransformationError Value
_ TransformErrorBundle
err) -> do
Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (UnstructuredLog -> TraceT io ())
-> UnstructuredLog -> TraceT io ()
forall a b. (a -> b) -> a -> b
$ LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelError (ByteString -> SerializableBlob
SB.fromLBS (ByteString -> SerializableBlob) -> ByteString -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ TransformErrorBundle -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode TransformErrorBundle
err)
SourceConfig b
-> Event b
-> Maybe (Invocation 'EventType)
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> TraceT io (Either QErr ())
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m) =>
SourceConfig b
-> Event b
-> Maybe (Invocation 'EventType)
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError' @b SourceConfig b
sourceConfig Event b
e Maybe (Invocation 'EventType)
forall a. Maybe a
Nothing ProcessEventError
PESetError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion TraceT io (Either QErr ())
-> (Either QErr () -> TraceT io ()) -> TraceT io ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Either QErr () -> (QErr -> TraceT io ()) -> TraceT io ())
-> (QErr -> TraceT io ()) -> Either QErr () -> TraceT io ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Either QErr () -> (QErr -> TraceT io ()) -> TraceT io ()
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft QErr -> TraceT io ()
forall r (m :: * -> *).
(MonadReader r m, Has (Logger Hasura) r, MonadIO m) =>
QErr -> m ()
logQErr
SourceName
-> EventId -> TVar (HashMap SourceName (Set EventId)) -> io ()
forall (m :: * -> *).
MonadIO m =>
SourceName
-> EventId -> TVar (HashMap SourceName (Set EventId)) -> m ()
removeEventTriggerEventFromLockedEvents SourceName
sourceName (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) TVar (HashMap SourceName (Set EventId))
leEvents
createEventPayload :: RetryConf -> Event b -> EventPayload b
createEventPayload :: RetryConf -> Event b -> EventPayload b
createEventPayload RetryConf
retryConf Event b
e =
EventPayload :: forall (b :: BackendType).
EventId
-> TableName b
-> TriggerMetadata
-> Value
-> DeliveryInfo
-> UTCTime
-> EventPayload b
EventPayload
{ epId :: EventId
epId = Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e,
epTable :: TableName b
epTable = Event b -> TableName b
forall (b :: BackendType). Event b -> TableName b
eTable Event b
e,
epTrigger :: TriggerMetadata
epTrigger = Event b -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event b
e,
epEvent :: Value
epEvent = Event b -> Value
forall (b :: BackendType). Event b -> Value
eEvent Event b
e,
epDeliveryInfo :: DeliveryInfo
epDeliveryInfo =
DeliveryInfo :: Int -> Int -> DeliveryInfo
DeliveryInfo
{ diCurrentRetry :: Int
diCurrentRetry = Event b -> Int
forall (b :: BackendType). Event b -> Int
eTries Event b
e,
diMaxRetries :: Int
diMaxRetries = RetryConf -> Int
rcNumRetries RetryConf
retryConf
},
epCreatedAt :: UTCTime
epCreatedAt = Event b -> UTCTime
forall (b :: BackendType). Event b -> UTCTime
eCreatedAt Event b
e
}
processSuccess ::
forall b m a.
(MonadIO m, BackendEventTrigger b) =>
SourceConfig b ->
Event b ->
[HeaderConf] ->
J.Value ->
MaintenanceMode MaintenanceModeVersion ->
HTTPResp a ->
m (Either QErr ())
processSuccess :: SourceConfig b
-> Event b
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPResp a
-> m (Either QErr ())
processSuccess SourceConfig b
sourceConfig Event b
e [HeaderConf]
reqHeaders Value
ep MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion HTTPResp a
resp = do
let respBody :: SerializableBlob
respBody = HTTPResp a -> SerializableBlob
forall (a :: TriggerTypes). HTTPResp a -> SerializableBlob
hrsBody HTTPResp a
resp
respHeaders :: [HeaderConf]
respHeaders = HTTPResp a -> [HeaderConf]
forall (a :: TriggerTypes). HTTPResp a -> [HeaderConf]
hrsHeaders HTTPResp a
resp
respStatus :: Int
respStatus = HTTPResp a -> Int
forall (a :: TriggerTypes). HTTPResp a -> Int
hrsStatus HTTPResp a
resp
eid :: EventId
eid = Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e
invocation :: Invocation 'EventType
invocation = EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation EventId
eid Value
ep (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
respStatus) [HeaderConf]
reqHeaders SerializableBlob
respBody [HeaderConf]
respHeaders
SourceConfig b
-> Event b
-> Invocation 'EventType
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m) =>
SourceConfig b
-> Event b
-> Invocation 'EventType
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordSuccess @b SourceConfig b
sourceConfig Event b
e Invocation 'EventType
invocation MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion
processError ::
forall b m a.
( MonadIO m,
BackendEventTrigger b
) =>
SourceConfig b ->
Event b ->
RetryConf ->
[HeaderConf] ->
J.Value ->
MaintenanceMode MaintenanceModeVersion ->
HTTPErr a ->
m (Either QErr ())
processError :: SourceConfig b
-> Event b
-> RetryConf
-> [HeaderConf]
-> Value
-> MaintenanceMode MaintenanceModeVersion
-> HTTPErr a
-> m (Either QErr ())
processError SourceConfig b
sourceConfig Event b
e RetryConf
retryConf [HeaderConf]
reqHeaders Value
ep MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion HTTPErr a
err = do
let invocation :: Invocation 'EventType
invocation = case HTTPErr a
err of
HClient HttpException
httpException ->
let statusMaybe :: Maybe Int
statusMaybe = HttpException -> Maybe Int
getHTTPExceptionStatus HttpException
httpException
in EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) Value
ep Maybe Int
statusMaybe [HeaderConf]
reqHeaders (ByteString -> SerializableBlob
SB.fromLBS (HttpException -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode HttpException
httpException)) []
HStatus HTTPResp a
errResp -> do
let respPayload :: SerializableBlob
respPayload = HTTPResp a -> SerializableBlob
forall (a :: TriggerTypes). HTTPResp a -> SerializableBlob
hrsBody HTTPResp a
errResp
respHeaders :: [HeaderConf]
respHeaders = HTTPResp a -> [HeaderConf]
forall (a :: TriggerTypes). HTTPResp a -> [HeaderConf]
hrsHeaders HTTPResp a
errResp
respStatus :: Int
respStatus = HTTPResp a -> Int
forall (a :: TriggerTypes). HTTPResp a -> Int
hrsStatus HTTPResp a
errResp
EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) Value
ep (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
respStatus) [HeaderConf]
reqHeaders SerializableBlob
respPayload [HeaderConf]
respHeaders
HOther String
detail -> do
let errMsg :: SerializableBlob
errMsg = ByteString -> SerializableBlob
SB.fromLBS (ByteString -> SerializableBlob) -> ByteString -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ String -> ByteString
forall a. ToJSON a => a -> ByteString
J.encode String
detail
EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation (Event b -> EventId
forall (b :: BackendType). Event b -> EventId
eId Event b
e) Value
ep (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
500) [HeaderConf]
reqHeaders SerializableBlob
errMsg []
ProcessEventError
retryOrError <- Event b -> RetryConf -> HTTPErr a -> m ProcessEventError
forall (m :: * -> *) (b :: BackendType) (a :: TriggerTypes).
MonadIO m =>
Event b -> RetryConf -> HTTPErr a -> m ProcessEventError
retryOrSetError Event b
e RetryConf
retryConf HTTPErr a
err
SourceConfig b
-> Event b
-> Invocation 'EventType
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
forall (b :: BackendType) (m :: * -> *).
(BackendEventTrigger b, MonadIO m) =>
SourceConfig b
-> Event b
-> Invocation 'EventType
-> ProcessEventError
-> MaintenanceMode MaintenanceModeVersion
-> m (Either QErr ())
recordError @b SourceConfig b
sourceConfig Event b
e Invocation 'EventType
invocation ProcessEventError
retryOrError MaintenanceMode MaintenanceModeVersion
maintenanceModeVersion
retryOrSetError ::
MonadIO m =>
Event b ->
RetryConf ->
HTTPErr a ->
m ProcessEventError
retryOrSetError :: Event b -> RetryConf -> HTTPErr a -> m ProcessEventError
retryOrSetError Event b
e RetryConf
retryConf HTTPErr a
err = do
let mretryHeader :: Maybe Text
mretryHeader = HTTPErr a -> Maybe Text
forall (a :: TriggerTypes). HTTPErr a -> Maybe Text
getRetryAfterHeaderFromError HTTPErr a
err
tries :: Int
tries = Event b -> Int
forall (b :: BackendType). Event b -> Int
eTries Event b
e
mretryHeaderSeconds :: Maybe Int
mretryHeaderSeconds = Maybe Text
mretryHeader Maybe Text -> (Text -> Maybe Int) -> Maybe Int
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Text -> Maybe Int
parseRetryHeader
triesExhausted :: Bool
triesExhausted = Int
tries Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= RetryConf -> Int
rcNumRetries RetryConf
retryConf
noRetryHeader :: Bool
noRetryHeader = Maybe Int -> Bool
forall a. Maybe a -> Bool
isNothing Maybe Int
mretryHeaderSeconds
if Bool
triesExhausted Bool -> Bool -> Bool
&& Bool
noRetryHeader
then ProcessEventError -> m ProcessEventError
forall (f :: * -> *) a. Applicative f => a -> f a
pure ProcessEventError
PESetError
else do
UTCTime
currentTime <- IO UTCTime -> m UTCTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
let delay :: Int
delay = Int -> Maybe Int -> Int
forall a. a -> Maybe a -> a
fromMaybe (RetryConf -> Int
rcIntervalSec RetryConf
retryConf) Maybe Int
mretryHeaderSeconds
diff :: NominalDiffTime
diff = Int -> NominalDiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
delay
retryTime :: UTCTime
retryTime = NominalDiffTime -> UTCTime -> UTCTime
addUTCTime NominalDiffTime
diff UTCTime
currentTime
ProcessEventError -> m ProcessEventError
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ProcessEventError -> m ProcessEventError)
-> ProcessEventError -> m ProcessEventError
forall a b. (a -> b) -> a -> b
$ UTCTime -> ProcessEventError
PESetRetry UTCTime
retryTime
where
getRetryAfterHeaderFromError :: HTTPErr a -> Maybe Text
getRetryAfterHeaderFromError (HStatus HTTPResp a
resp) = HTTPResp a -> Maybe Text
forall (a :: TriggerTypes). HTTPResp a -> Maybe Text
getRetryAfterHeaderFromResp HTTPResp a
resp
getRetryAfterHeaderFromError HTTPErr a
_ = Maybe Text
forall a. Maybe a
Nothing
parseRetryHeader :: Text -> Maybe Int
parseRetryHeader = (Int -> Bool) -> Maybe Int -> Maybe Int
forall (m :: * -> *) a. MonadPlus m => (a -> Bool) -> m a -> m a
mfilter (Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (Maybe Int -> Maybe Int)
-> (Text -> Maybe Int) -> Text -> Maybe Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Maybe Int
forall a. Read a => String -> Maybe a
readMaybe (String -> Maybe Int) -> (Text -> String) -> Text -> Maybe Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> String
T.unpack
mkInvocation ::
EventId ->
J.Value ->
Maybe Int ->
[HeaderConf] ->
SB.SerializableBlob ->
[HeaderConf] ->
Invocation 'EventType
mkInvocation :: EventId
-> Value
-> Maybe Int
-> [HeaderConf]
-> SerializableBlob
-> [HeaderConf]
-> Invocation 'EventType
mkInvocation EventId
eid Value
ep Maybe Int
statusMaybe [HeaderConf]
reqHeaders SerializableBlob
respBody [HeaderConf]
respHeaders =
let resp :: Response 'EventType
resp =
case Maybe Int
statusMaybe of
Maybe Int
Nothing -> SerializableBlob -> Response 'EventType
forall (a :: TriggerTypes). SerializableBlob -> Response a
mkClientErr SerializableBlob
respBody
Just Int
status ->
if Int
status Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
200 Bool -> Bool -> Bool
&& Int
status Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
300
then Int -> SerializableBlob -> [HeaderConf] -> Response 'EventType
forall (a :: TriggerTypes).
Int -> SerializableBlob -> [HeaderConf] -> Response a
mkResp Int
status SerializableBlob
respBody [HeaderConf]
respHeaders
else SerializableBlob -> Response 'EventType
forall (a :: TriggerTypes). SerializableBlob -> Response a
mkClientErr SerializableBlob
respBody
in EventId
-> Maybe Int
-> WebhookRequest
-> Response 'EventType
-> Invocation 'EventType
forall (a :: TriggerTypes).
EventId
-> Maybe Int -> WebhookRequest -> Response a -> Invocation a
Invocation
EventId
eid
Maybe Int
statusMaybe
(Value -> [HeaderConf] -> Text -> WebhookRequest
mkWebhookReq Value
ep [HeaderConf]
reqHeaders Text
invocationVersionET)
Response 'EventType
resp
logQErr :: (MonadReader r m, Has (L.Logger L.Hasura) r, MonadIO m) => QErr -> m ()
logQErr :: QErr -> m ()
logQErr QErr
err = do
Logger Hasura
logger :: L.Logger L.Hasura <- (r -> Logger Hasura) -> m (Logger Hasura)
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks r -> Logger Hasura
forall a t. Has a t => t -> a
getter
Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (EventInternalErr -> m ()) -> EventInternalErr -> m ()
forall a b. (a -> b) -> a -> b
$ QErr -> EventInternalErr
EventInternalErr QErr
err
getEventTriggerInfoFromEvent ::
forall b. Backend b => SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
getEventTriggerInfoFromEvent :: SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
getEventTriggerInfoFromEvent SchemaCache
sc Event b
e = do
let table :: TableName b
table = Event b -> TableName b
forall (b :: BackendType). Event b -> TableName b
eTable Event b
e
mTableInfo :: Maybe (TableInfo b)
mTableInfo = SourceName -> TableName b -> SourceCache -> Maybe (TableInfo b)
forall (b :: BackendType).
Backend b =>
SourceName -> TableName b -> SourceCache -> Maybe (TableInfo b)
unsafeTableInfo @b (Event b -> SourceName
forall (b :: BackendType). Event b -> SourceName
eSource Event b
e) TableName b
table (SourceCache -> Maybe (TableInfo b))
-> SourceCache -> Maybe (TableInfo b)
forall a b. (a -> b) -> a -> b
$ SchemaCache -> SourceCache
scSources SchemaCache
sc
TableInfo b
tableInfo <- Maybe (TableInfo b)
-> Either Text (TableInfo b) -> Either Text (TableInfo b)
forall (m :: * -> *) a. Applicative m => Maybe a -> m a -> m a
onNothing Maybe (TableInfo b)
mTableInfo (Either Text (TableInfo b) -> Either Text (TableInfo b))
-> Either Text (TableInfo b) -> Either Text (TableInfo b)
forall a b. (a -> b) -> a -> b
$ Text -> Either Text (TableInfo b)
forall a b. a -> Either a b
Left (Text
"table '" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TableName b
table TableName b -> Text -> Text
forall t. ToTxt t => t -> Text -> Text
<<> Text
"' not found")
let triggerName :: TriggerName
triggerName = TriggerMetadata -> TriggerName
tmName (TriggerMetadata -> TriggerName) -> TriggerMetadata -> TriggerName
forall a b. (a -> b) -> a -> b
$ Event b -> TriggerMetadata
forall (b :: BackendType). Event b -> TriggerMetadata
eTrigger Event b
e
mEventTriggerInfo :: Maybe (EventTriggerInfo b)
mEventTriggerInfo = TriggerName
-> HashMap TriggerName (EventTriggerInfo b)
-> Maybe (EventTriggerInfo b)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
M.lookup TriggerName
triggerName (TableInfo b -> HashMap TriggerName (EventTriggerInfo b)
forall (b :: BackendType). TableInfo b -> EventTriggerInfoMap b
_tiEventTriggerInfoMap TableInfo b
tableInfo)
Maybe (EventTriggerInfo b)
-> Either Text (EventTriggerInfo b)
-> Either Text (EventTriggerInfo b)
forall (m :: * -> *) a. Applicative m => Maybe a -> m a -> m a
onNothing Maybe (EventTriggerInfo b)
mEventTriggerInfo (Either Text (EventTriggerInfo b)
-> Either Text (EventTriggerInfo b))
-> Either Text (EventTriggerInfo b)
-> Either Text (EventTriggerInfo b)
forall a b. (a -> b) -> a -> b
$
Text -> Either Text (EventTriggerInfo b)
forall a b. a -> Either a b
Left
( Text
"event trigger '" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TriggerName -> Text
triggerNameToTxt TriggerName
triggerName
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"' on table '"
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> TableName b
table TableName b -> Text -> Text
forall t. ToTxt t => t -> Text -> Text
<<> Text
"' not found"
)