{-# LANGUAGE TemplateHaskell #-}
module Hasura.GraphQL.Execute.Subscription.State
( SubscriptionsState (..),
initSubscriptionsState,
dumpSubscriptionsState,
SubscriberDetails,
SubscriptionPostPollHook,
addLiveQuery,
addStreamSubscriptionQuery,
removeLiveQuery,
removeStreamingQuery,
LiveAsyncActionQueryOnSource (..),
LiveAsyncActionQueryWithNoRelationships (..),
LiveAsyncActionQuery (..),
AsyncActionQueryLive (..),
AsyncActionSubscriptionState,
addAsyncActionLiveQuery,
removeAsyncActionLiveQuery,
LiveQuerySubscriberDetails,
StreamingSubscriberDetails,
)
where
import Control.Concurrent.Extended (forkImmortal, sleep)
import Control.Concurrent.STM qualified as STM
import Control.Exception (mask_)
import Control.Immortal qualified as Immortal
import Data.Aeson.Extended qualified as J
import Data.Aeson.Ordered qualified as JO
import Data.Monoid (Endo)
import Data.String
import Data.Text.Extended
import Data.UUID.V4 qualified as UUID
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Execute.Subscription.Poll
import Hasura.GraphQL.Execute.Subscription.Poll.Common (PollerResponseState (PRSError, PRSSuccess))
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol (OperationName)
import Hasura.GraphQL.Transport.WebSocket.Protocol (OperationId)
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.Types.Action
import Hasura.RQL.Types.Common (SourceName)
import Hasura.SQL.AnyBackend qualified as AB
import Hasura.Server.Metrics (ServerMetrics (..))
import Hasura.Server.Prometheus
( DynamicSubscriptionLabel (..),
PrometheusMetrics (..),
SubscriptionLabel (..),
SubscriptionMetrics (..),
liveQuerySubscriptionLabel,
recordMetricWithLabel,
streamingSubscriptionLabel,
)
import Hasura.Server.Types (GranularPrometheusMetricsState (..), RequestId)
import Language.GraphQL.Draft.Syntax qualified as G
import Refined (unrefine)
import StmContainers.Map qualified as STMMap
import System.Metrics.Gauge qualified as EKG.Gauge
import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
import System.Metrics.Prometheus.GaugeVector qualified as GaugeVector
-- | Top-level state shared by all active subscriptions: one poller map per
-- subscription kind, the hook handed to the polling loop, and the state for
-- async-action subscriptions.
data SubscriptionsState = SubscriptionsState
  { -- | Pollers for live-query subscriptions. Cohorts in this map carry no
    -- per-cohort cursor state (the cursor parameter is @()@).
    _ssLiveQueryMap :: PollerMap (),
    -- | Pollers for streaming subscriptions. Each cohort carries a 'STM.TVar'
    -- holding its 'CursorVariableValues'.
    _ssStreamQueryMap :: PollerMap (STM.TVar CursorVariableValues),
    -- | Hook passed on to the poll function (see 'addLiveQuery').
    _ssPostPollHook :: SubscriptionPostPollHook,
    -- | State for async-action subscriptions.
    _ssAsyncActions :: AsyncActionSubscriptionState
  }
-- | Build a fresh 'SubscriptionsState'.
--
-- Both poller maps and the async-action map start out empty and are allocated
-- together in a single STM transaction; the supplied post-poll hook is stored
-- unchanged.
initSubscriptionsState :: SubscriptionPostPollHook -> IO SubscriptionsState
initSubscriptionsState pollHook =
  STM.atomically
    $ SubscriptionsState
    <$> STMMap.new
    <*> STMMap.new
    <*> pure pollHook
    <*> TMap.new
-- | Render the subscription state as a JSON object.
--
-- The live-query and streaming poller maps are each dumped with
-- 'dumpPollerMap' (the @extended@ flag is forwarded to it as-is) and emitted
-- next to the corresponding options. The post-poll hook and the async-action
-- state are not part of the dump.
dumpSubscriptionsState ::
  Bool ->
  LiveQueriesOptions ->
  StreamQueriesOptions ->
  SubscriptionsState ->
  IO J.Value
dumpSubscriptionsState extended liveQOpts streamQOpts (SubscriptionsState lqMap streamMap _ _) = do
  lqMapJ <- dumpPollerMap extended lqMap
  streamMapJ <- dumpPollerMap extended streamMap
  pure
    $ J.object
      [ "options" J..= liveQOpts,
        "live_queries_map" J..= lqMapJ,
        "stream_queries_map" J..= streamMapJ,
        "stream_queries_options" J..= streamQOpts
      ]
-- | Everything needed to refer back to a registered subscriber: the key of
-- the poller it was attached to, a cohort locator (whose shape depends on the
-- kind of subscription), and the subscriber's own id.
data SubscriberDetails a = SubscriberDetails
  { -- | Key of the poller in the relevant 'PollerMap'.
    _sdPoller :: BackendPollerKey,
    -- | Cohort locator; see 'LiveQuerySubscriberDetails' and
    -- 'StreamingSubscriberDetails' for the concrete instantiations.
    _sdCohort :: a,
    -- | Id generated for this subscriber when it was added.
    _sdSubscriber :: SubscriberId
  }
  deriving (Show)
-- | 'SubscriberDetails' for a live-query subscriber: the cohort component is
-- just the 'CohortKey'.
type LiveQuerySubscriberDetails = SubscriberDetails CohortKey
-- | 'SubscriberDetails' for a streaming subscriber: the cohort component pairs
-- the 'CohortKey' with a 'STM.TVar' holding 'CursorVariableValues'.
type StreamingSubscriberDetails = SubscriberDetails (CohortKey, STM.TVar CursorVariableValues)
-- | Attach a subscriber to the poller identified by the given key, creating
-- the poller when none is registered yet.
--
-- * If a poller exists and already has a cohort for @cohortKey@, the
--   subscriber is added with @addToCohort@.
-- * If a poller exists but has no such cohort, @addToPoller@ is responsible
--   for creating the cohort and registering the subscriber.
-- * If no poller exists, a new one is built (empty cohort map, response state
--   'PRSSuccess', empty IO-state 'STM.TMVar'), the subscriber is added through
--   @addToPoller@, and the poller is inserted into the map.
--
-- In every case the per-operation-name counter of the poller is bumped for
-- @maybeOperationName@.
--
-- The first component of the result is @Just poller@ only when the poller was
-- newly created by this call; for a pre-existing poller it is 'Nothing'. The
-- second component is whatever @addToCohort@ / @addToPoller@ returned.
findPollerForSubscriber ::
  Subscriber ->
  PollerMap streamCursorVars ->
  BackendPollerKey ->
  CohortKey ->
  (Subscriber -> Cohort streamCursorVars -> STM.STM streamCursorVars) ->
  (Subscriber -> Poller streamCursorVars -> STM.STM streamCursorVars) ->
  ParameterizedQueryHash ->
  Maybe OperationName ->
  STM.STM (Maybe (Poller streamCursorVars), streamCursorVars)
findPollerForSubscriber subscriber pollerMap pollerKey cohortKey addToCohort addToPoller parameterizedQueryHash maybeOperationName =
  STMMap.lookup pollerKey pollerMap >>= \case
    Just poller -> do
      -- Existing poller: reuse the cohort when present, otherwise let
      -- 'addToPoller' set one up.
      cursorVars <-
        TMap.lookup cohortKey (_pCohorts poller) >>= \case
          Just cohort -> addToCohort subscriber cohort
          Nothing -> addToPoller subscriber poller
      -- Count this subscriber under its operation name: start at 1 for a
      -- name not seen before, otherwise increment.
      TMap.lookup maybeOperationName (_pOperationNamesMap poller) >>= \case
        Nothing -> TMap.insert 1 maybeOperationName (_pOperationNamesMap poller)
        Just _ -> TMap.adjust (+ 1) maybeOperationName (_pOperationNamesMap poller)
      -- 'Nothing' tells the caller that no new poller was created.
      pure (Nothing, cursorVars)
    Nothing -> do
      -- No poller yet: build one whose operation-name map already counts
      -- this subscriber.
      operationNamesMap <- TMap.new
      TMap.insert 1 maybeOperationName operationNamesMap
      !poller <-
        Poller
          <$> TMap.new
          <*> STM.newTVar PRSSuccess
          <*> STM.newEmptyTMVar
          <*> pure parameterizedQueryHash
          <*> pure operationNamesMap
      cursorVars <- addToPoller subscriber poller
      STMMap.insert poller pollerKey pollerMap
      pure (Just poller, cursorVars)
-- | Register a new live-query subscriber and, when no poller exists yet for
-- its query, fork the thread that polls it.
--
-- Steps:
--
-- 1. Generate a subscriber id and build the 'Subscriber'.
-- 2. In one STM transaction, place the subscriber into the live-query poller
--    map via 'findPollerForSubscriber'.
-- 3. If that call created a new poller, fork an immortal thread that loops
--    forever: read the current live-query options, run 'pollLiveQuery', then
--    sleep for the configured refetch interval. The thread handle and poller
--    id are then stored in the poller's IO-state 'STM.TMVar'.
-- 4. Bump the subscription gauges (EKG and Prometheus).
--
-- Returns the 'LiveQuerySubscriberDetails' (poller key, cohort key, subscriber
-- id) identifying the subscriber that was added.
addLiveQuery ::
  forall b.
  (BackendTransport b) =>
  L.Logger L.Hasura ->
  ServerMetrics ->
  PrometheusMetrics ->
  SubscriberMetadata ->
  SubscriptionsState ->
  IO (LiveQueriesOptions, StreamQueriesOptions) ->
  SourceName ->
  ParameterizedQueryHash ->
  Maybe OperationName ->
  RequestId ->
  SubscriptionQueryPlan b (MultiplexedQuery b) ->
  IO GranularPrometheusMetricsState ->
  OnChange ->
  Maybe (Endo JO.Value) ->
  IO LiveQuerySubscriberDetails
addLiveQuery
  logger
  serverMetrics
  prometheusMetrics
  subscriberMetadata
  subscriptionState
  getSubscriptionOptions
  source
  parameterizedQueryHash
  operationName
  requestId
  plan
  granularPrometheusMetricsState
  onResultAction
  modifier = do
    subscriberId <- newSubscriberId
    let !subscriber = Subscriber subscriberId subscriberMetadata requestId operationName onResultAction

    -- Check that the subscriber is fully evaluated before it is stored in
    -- the shared maps.
    $assertNFHere subscriber

    (pollerMaybe, ()) <-
      STM.atomically
        $ findPollerForSubscriber
          subscriber
          lqMap
          handlerId
          cohortKey
          addToCohort
          addToPoller
          parameterizedQueryHash
          operationName

    -- 'pollerMaybe' is 'Just' only for a newly created poller, which still
    -- needs its polling thread.
    for_ pollerMaybe $ \poller -> do
      pollerId <- PollerId <$> UUID.nextRandom
      threadRef <- forkImmortal ("pollLiveQuery." <> show pollerId) logger
        $ forever
        $ do
          -- Options are re-read on every iteration; only the live-query
          -- half of the pair is used here.
          (lqOpts, _) <- getSubscriptionOptions
          let SubscriptionsOptions _ refetchInterval = lqOpts
          pollLiveQuery @b
            pollerId
            (_pPollerState poller)
            lqOpts
            (source, sourceConfig)
            role
            parameterizedQueryHash
            query
            (_pCohorts poller)
            postPollHook
            prometheusMetrics
            granularPrometheusMetricsState
            (_pOperationNamesMap poller)
            resolvedConnectionTemplate
            modifier
          sleep $ unrefine $ unRefetchInterval refetchInterval
      let !pState = PollerIOState threadRef pollerId
      $assertNFHere pState
      -- Fill the poller's IO-state TMVar, which 'findPollerForSubscriber'
      -- created empty.
      STM.atomically $ STM.putTMVar (_pIOState poller) pState
      -- NOTE(review): counted once per newly started poller; confirm this
      -- increment belongs inside the 'for_' body.
      liftIO $ Prometheus.Gauge.inc $ submActiveLiveQueryPollers $ pmSubscriptionMetrics $ prometheusMetrics

    liftIO $ EKG.Gauge.inc $ smActiveSubscriptions serverMetrics
    let promMetricGranularLabel =
          SubscriptionLabel liveQuerySubscriptionLabel (Just $ DynamicSubscriptionLabel parameterizedQueryHash operationName)
        promMetricLabel = SubscriptionLabel liveQuerySubscriptionLabel Nothing
    let numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics
    -- Two alternative gauge increments are supplied: one labelled with the
    -- query hash and operation name, one with the subscription kind only.
    recordMetricWithLabel
      granularPrometheusMetricsState
      True
      (GaugeVector.inc numSubscriptionMetric promMetricGranularLabel)
      (GaugeVector.inc numSubscriptionMetric promMetricLabel)
    liftIO $ EKG.Gauge.inc $ smActiveLiveQueries serverMetrics

    pure $ SubscriberDetails handlerId cohortKey subscriberId
  where
    SubscriptionsState lqMap _ postPollHook _ = subscriptionState
    SubscriptionQueryPlan (ParameterizedSubscriptionQueryPlan role query) sourceConfig cohortId resolvedConnectionTemplate cohortKey _ = plan

    -- Key of the poller serving this query: source, role, query text,
    -- resolved connection template and parameterized query hash.
    handlerId = BackendPollerKey $ AB.mkAnyBackend @b $ PollerKey source role (toTxt query) resolvedConnectionTemplate parameterizedQueryHash

    -- Register the subscriber in the cohort's map of new subscribers.
    addToCohort subscriber handlerC =
      TMap.insert subscriber (_sId subscriber) $ _cNewSubscribers handlerC

    -- Create a cohort for 'cohortKey' containing only this subscriber and
    -- attach it to the poller.
    addToPoller subscriber handler = do
      !newCohort <-
        Cohort cohortId
          <$> STM.newTVar Nothing
          <*> TMap.new
          <*> TMap.new
          <*> pure ()
      addToCohort subscriber newCohort
      TMap.insert newCohort cohortKey $ _pCohorts handler
-- | Register a subscriber for a streaming subscription and return the
-- 'SubscriberDetails' identifying it.
--
-- The subscriber is attached to a poller through 'findPollerForSubscriber'.
-- When that call hands back a poller, a dedicated immortal thread is forked
-- that repeatedly runs 'pollStreamingQuery' and then sleeps for the currently
-- configured refetch interval; the thread handle and a fresh 'PollerId' are
-- stored in the poller's '_pIOState'. Finally the EKG and Prometheus
-- subscription gauges are incremented.
addStreamSubscriptionQuery ::
  forall b.
  (BackendTransport b) =>
  L.Logger L.Hasura ->
  ServerMetrics ->
  PrometheusMetrics ->
  SubscriberMetadata ->
  SubscriptionsState ->
  -- | action that fetches the current subscription options; re-run on every
  -- polling iteration (only the streaming half of the pair is used here)
  IO (LiveQueriesOptions, StreamQueriesOptions) ->
  SourceName ->
  ParameterizedQueryHash ->
  -- | operation name, stored in the 'Subscriber' and used in the metric label
  Maybe OperationName ->
  RequestId ->
  -- | root field name, forwarded to 'pollStreamingQuery'
  G.Name ->
  SubscriptionQueryPlan b (MultiplexedQuery b) ->
  IO GranularPrometheusMetricsState ->
  -- | callback stored in the 'Subscriber'
  OnChange ->
  -- | optional response modifier, forwarded to 'pollStreamingQuery'
  (Maybe (Endo JO.Value)) ->
  IO StreamingSubscriberDetails
addStreamSubscriptionQuery
  logger
  serverMetrics
  prometheusMetrics
  subscriberMetadata
  subscriptionState
  getSubscriptionOptions
  source
  parameterizedQueryHash
  operationName
  requestId
  rootFieldName
  plan
  granularPrometheusMetricsState
  onResultAction
  modifier = do
    subscriberId <- newSubscriberId
    let !subscriber = Subscriber subscriberId subscriberMetadata requestId operationName onResultAction

    -- The subscriber ends up inside mutable (STM) state below, so assert it is
    -- fully evaluated first.
    $assertNFHere subscriber

    (handlerM, cohortCursorTVar) <-
      STM.atomically
        $ findPollerForSubscriber
          subscriber
          streamQueryMap
          handlerId
          cohortKey
          addToCohort
          addToPoller
          parameterizedQueryHash
          operationName

    -- A poller is only returned when it still needs a polling thread
    -- (presumably because it was just created -- confirm against
    -- 'findPollerForSubscriber'). Its '_pIOState' is filled right after the
    -- thread has been forked.
    for_ handlerM $ \handler -> do
      pollerId <- PollerId <$> UUID.nextRandom
      threadRef <- forkImmortal ("pollStreamingQuery." <> show (unPollerId pollerId)) logger
        $ forever
        $ do
          -- Options are re-read on each iteration.
          (_, streamQOpts) <- getSubscriptionOptions
          let SubscriptionsOptions _ refetchInterval = streamQOpts
          pollStreamingQuery @b
            pollerId
            (_pPollerState handler)
            streamQOpts
            (source, sourceConfig)
            role
            parameterizedQueryHash
            query
            (_pCohorts handler)
            rootFieldName
            postPollHook
            Nothing
            prometheusMetrics
            granularPrometheusMetricsState
            (_pOperationNamesMap handler)
            resolvedConnectionTemplate
            modifier
          sleep $ unrefine $ unRefetchInterval refetchInterval
      let !pState = PollerIOState threadRef pollerId
      $assertNFHere pState
      STM.atomically $ STM.putTMVar (_pIOState handler) pState
      liftIO $ Prometheus.Gauge.inc $ submActiveStreamingPollers $ pmSubscriptionMetrics $ prometheusMetrics

    liftIO $ do
      EKG.Gauge.inc $ smActiveSubscriptions serverMetrics
      EKG.Gauge.inc $ smActiveStreamingSubscriptions serverMetrics

    let promMetricGranularLabel = SubscriptionLabel streamingSubscriptionLabel (Just $ DynamicSubscriptionLabel parameterizedQueryHash operationName)
        promMetricLabel = SubscriptionLabel streamingSubscriptionLabel Nothing
        numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics
    -- One action carries the per-query (hash + operation name) label, the
    -- other the plain streaming label; 'recordMetricWithLabel' decides which
    -- to run (see its definition).
    recordMetricWithLabel
      granularPrometheusMetricsState
      True
      (GaugeVector.inc numSubscriptionMetric promMetricGranularLabel)
      (GaugeVector.inc numSubscriptionMetric promMetricLabel)

    pure $ SubscriberDetails handlerId (cohortKey, cohortCursorTVar) subscriberId
    where
      SubscriptionsState _ streamQueryMap postPollHook _ = subscriptionState
      SubscriptionQueryPlan (ParameterizedSubscriptionQueryPlan role query) sourceConfig cohortId resolvedConnectionTemplate cohortKey _ = plan

      -- Key under which the poller is looked up in the streaming poller map.
      handlerId = BackendPollerKey $ AB.mkAnyBackend @b $ PollerKey source role (toTxt query) resolvedConnectionTemplate parameterizedQueryHash

      -- Insert the subscriber into the cohort's new-subscribers map and return
      -- the cohort's stream cursor variables.
      addToCohort subscriber handlerC = do
        TMap.insert subscriber (_sId subscriber) $ _cNewSubscribers handlerC
        pure $ _cStreamCursorVariables handlerC

      -- Build a new cohort whose cursor TVar is seeded from the cursor
      -- variables in 'cohortKey', add the subscriber to it, and register the
      -- cohort in the poller under 'cohortKey'.
      addToPoller subscriber handler = do
        latestCursorValues <-
          STM.newTVar (CursorVariableValues (_unValidatedVariables (_cvCursorVariables cohortKey)))
        !newCohort <- Cohort cohortId <$> STM.newTVar Nothing <*> TMap.new <*> TMap.new <*> pure latestCursorValues
        cohortCursorVals <- addToCohort subscriber newCohort
        TMap.insert newCohort cohortKey $ _pCohorts handler
        pure cohortCursorVals
-- | Remove a live-query subscriber from the subscriptions state.
--
-- Runs under 'mask_'. A single STM transaction looks up the subscriber's
-- poller and cohort, updates the per-operation-name counter, removes the
-- subscriber, and drops the cohort (and then the poller) once they become
-- empty. The transaction returns an 'IO' action that is executed right after
-- it commits: that action stops the polling thread if the poller was removed
-- and decrements the Prometheus metrics. The EKG gauges are decremented
-- afterwards.
removeLiveQuery ::
  L.Logger L.Hasura ->
  ServerMetrics ->
  PrometheusMetrics ->
  SubscriptionsState ->
  -- | poller key, cohort key and subscriber id of the subscriber to remove
  LiveQuerySubscriberDetails ->
  IO GranularPrometheusMetricsState ->
  -- | operation name; used for the per-name counter and the metric label
  Maybe OperationName ->
  IO ()
removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberDetails handlerId cohortId sinkId) granularPrometheusMetricsState maybeOperationName = mask_ $ do
  join
    $ STM.atomically
    $ do
      detM <- getQueryDet lqMap
      case detM of
        -- Poller or cohort not found: nothing to clean up.
        Nothing -> return (pure ())
        Just (Poller cohorts pollerState ioState parameterizedQueryHash operationNamesMap, cohort) -> do
          TMap.lookup maybeOperationName operationNamesMap >>= \case
            -- Last subscriber recorded under this operation name: drop the entry.
            Just 1 -> TMap.delete maybeOperationName operationNamesMap
            -- Other subscribers share the name: just decrement the count.
            Just _ -> TMap.adjust (\v -> v - 1) maybeOperationName operationNamesMap
            Nothing -> return ()
          cleanHandlerC cohorts pollerState ioState cohort parameterizedQueryHash
  -- NOTE(review): these gauges are decremented even when the lookup above
  -- found nothing to remove -- confirm this is intended.
  liftIO $ EKG.Gauge.dec $ smActiveSubscriptions serverMetrics
  liftIO $ EKG.Gauge.dec $ smActiveLiveQueries serverMetrics
  where
    lqMap = _ssLiveQueryMap lqState

    -- Look up the poller for 'handlerId' and, inside it, the cohort for
    -- 'cohortId'; yields 'Nothing' if either is missing.
    getQueryDet subMap = do
      pollerM <- STMMap.lookup handlerId subMap
      fmap join
        $ forM pollerM
        $ \poller -> do
          cohortM <- TMap.lookup cohortId (_pCohorts poller)
          return $ (poller,) <$> cohortM

    -- Remove the subscriber from the cohort and garbage-collect the cohort and
    -- poller if they became empty. Returns the IO action to run once the
    -- transaction has committed.
    cleanHandlerC cohortMap pollerState ioState handlerC parameterizedQueryHash = do
      let curOps = _cExistingSubscribers handlerC
          newOps = _cNewSubscribers handlerC
      -- The subscriber may sit in either map, so delete from both.
      TMap.delete sinkId curOps
      TMap.delete sinkId newOps
      cohortIsEmpty <-
        (&&)
          <$> TMap.null curOps
          <*> TMap.null newOps
      when cohortIsEmpty $ TMap.delete cohortId cohortMap
      handlerIsEmpty <- TMap.null cohortMap
      let promMetricGranularLabel = SubscriptionLabel liveQuerySubscriptionLabel (Just $ DynamicSubscriptionLabel parameterizedQueryHash maybeOperationName)
          promMetricLabel = SubscriptionLabel liveQuerySubscriptionLabel Nothing
      if handlerIsEmpty
        then do
          -- No cohorts left: remove the poller and grab its thread handle so
          -- the thread can be stopped outside STM.
          STMMap.delete handlerId lqMap
          threadRefM <- fmap _pThread <$> STM.tryReadTMVar ioState
          return
            $ case threadRefM of
              Just threadRef -> do
                Immortal.stop threadRef
                liftIO $ do
                  pollerLastState <- STM.readTVarIO pollerState
                  -- A poller whose last recorded state was an error is also
                  -- counted in the "in error" gauge, so undo that as well.
                  when (pollerLastState == PRSError)
                    $ Prometheus.Gauge.dec
                    $ submActiveLiveQueryPollersInError
                    $ pmSubscriptionMetrics prometheusMetrics
                  Prometheus.Gauge.dec $ submActiveLiveQueryPollers $ pmSubscriptionMetrics $ prometheusMetrics
                  let numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics
                  recordMetricWithLabel
                    granularPrometheusMetricsState
                    True
                    (GaugeVector.dec numSubscriptionMetric promMetricGranularLabel)
                    (GaugeVector.dec numSubscriptionMetric promMetricLabel)
              -- The poller's IO state was never filled in; log it as an error.
              -- NOTE(review): unlike the other two paths, this branch does not
              -- decrement the active-subscriptions gauge vector -- confirm
              -- this is intended.
              Nothing ->
                L.unLogger logger
                  $ L.UnstructuredLog L.LevelError
                  $ fromString
                  $ "In removeLiveQuery no worker thread installed. Please report this as a bug: "
                  <> show lqId
        else do
          -- Poller still serves other cohorts: only the subscription count changes.
          let numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics
          return
            $ recordMetricWithLabel
              granularPrometheusMetricsState
              True
              (GaugeVector.dec numSubscriptionMetric promMetricGranularLabel)
              (GaugeVector.dec numSubscriptionMetric promMetricLabel)
-- | Remove one subscriber of a streaming subscription from the subscriptions
-- state.
--
-- All state changes happen in a single STM transaction. That transaction
-- returns an 'IO' action holding the follow-up work (stopping the poller
-- thread and Prometheus bookkeeping, or an error log); 'join' runs it right
-- after the transaction commits. The whole function runs under 'mask_'.
--
-- NOTE(review): the two EKG gauges are decremented on every call, including
-- when no matching poller/cohort is found, and the Prometheus
-- active-subscriptions gauge is /not/ decremented when the poller became empty
-- but no worker thread was installed. Both match the pre-existing behaviour;
-- confirm they are intended.
removeStreamingQuery ::
  L.Logger L.Hasura ->
  ServerMetrics ->
  PrometheusMetrics ->
  SubscriptionsState ->
  -- | poller key, cohort key (paired with its cursor 'TVar') and subscriber id
  StreamingSubscriberDetails ->
  IO GranularPrometheusMetricsState ->
  Maybe OperationName ->
  IO ()
removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (SubscriberDetails handlerId (cohortId, cursorVariableTV) sinkId) granularPrometheusMetricsState maybeOperationName = mask_ $ do
  join
    $ STM.atomically
    $ do
      detM <- getQueryDet streamQMap
      case detM of
        -- Nothing to clean up: hand back a no-op post-commit action.
        Nothing -> return (pure ())
        Just (Poller cohorts pollerState ioState parameterizedQueryHash operationNamesMap, currentCohortId, cohort) -> do
          TMap.lookup maybeOperationName operationNamesMap >>= \case
            -- Last subscription counted under this operation name: drop the entry.
            Just 1 -> TMap.delete maybeOperationName operationNamesMap
            -- Other subscriptions still use this operation name: just decrement.
            Just _ -> TMap.adjust (subtract 1) maybeOperationName operationNamesMap
            Nothing -> return ()
          cleanHandlerC cohorts pollerState ioState (cohort, currentCohortId) parameterizedQueryHash
  EKG.Gauge.dec $ smActiveSubscriptions serverMetrics
  EKG.Gauge.dec $ smActiveStreamingSubscriptions serverMetrics
  where
    streamQMap = _ssStreamQueryMap subscriptionState

    -- Look up the poller for 'handlerId' and, inside it, the cohort keyed by
    -- the cohort variables updated with the current value of the cursor TVar.
    -- Yields 'Nothing' when either lookup misses.
    getQueryDet subMap = do
      pollerM <- STMMap.lookup handlerId subMap
      (CursorVariableValues currentCohortCursorVal) <- STM.readTVar cursorVariableTV
      let updatedCohortId =
            modifyCursorCohortVariables
              (mkUnsafeValidateVariables currentCohortCursorVal)
              cohortId
      fmap join
        $ forM pollerM
        $ \poller -> do
          cohortM <- TMap.lookup updatedCohortId (_pCohorts poller)
          return $ (poller,updatedCohortId,) <$> cohortM

    -- Remove the subscriber from the cohort, then drop the cohort and the
    -- poller entry if they became empty. Returns the IO action to run after
    -- the transaction commits.
    cleanHandlerC cohortMap pollerState ioState (handlerC, currentCohortId) parameterizedQueryHash = do
      let curOps = _cExistingSubscribers handlerC
          newOps = _cNewSubscribers handlerC
      TMap.delete sinkId curOps
      TMap.delete sinkId newOps
      cohortIsEmpty <- (&&) <$> TMap.null curOps <*> TMap.null newOps
      when cohortIsEmpty $ TMap.delete currentCohortId cohortMap
      handlerIsEmpty <- TMap.null cohortMap
      let subscriptionMetrics = pmSubscriptionMetrics prometheusMetrics
          numSubscriptionMetric = submActiveSubscriptions subscriptionMetrics
          promMetricGranularLabel =
            SubscriptionLabel
              streamingSubscriptionLabel
              (Just $ DynamicSubscriptionLabel parameterizedQueryHash maybeOperationName)
          promMetricLabel = SubscriptionLabel streamingSubscriptionLabel Nothing
          -- Decrement of the active-subscriptions gauge vector, shared by
          -- both branches below.
          decActiveSubscriptions :: IO ()
          decActiveSubscriptions =
            recordMetricWithLabel
              granularPrometheusMetricsState
              True
              (GaugeVector.dec numSubscriptionMetric promMetricGranularLabel)
              (GaugeVector.dec numSubscriptionMetric promMetricLabel)
      if handlerIsEmpty
        then do
          STMMap.delete handlerId streamQMap
          -- tryReadTMVar does not block; an empty TMVar yields Nothing.
          threadRefM <- fmap _pThread <$> STM.tryReadTMVar ioState
          return $ case threadRefM of
            Just threadRef -> do
              Immortal.stop threadRef
              pollerLastState <- STM.readTVarIO pollerState
              when (pollerLastState == PRSError)
                $ Prometheus.Gauge.dec
                $ submActiveStreamingPollersInError subscriptionMetrics
              Prometheus.Gauge.dec $ submActiveStreamingPollers subscriptionMetrics
              decActiveSubscriptions
            Nothing ->
              L.unLogger logger
                $ L.UnstructuredLog L.LevelError
                $ fromString
                $ "In removeStreamingQuery no worker thread installed. Please report this as a bug: "
                <> " poller_id: "
                <> show handlerId
                <> ", cohort_id: "
                <> show cohortId
                <> ", subscriber_id:"
                <> show sinkId
        else return decActiveSubscriptions
-- | State kept for an async-action live query that runs against a source
-- database (see the 'LAAQOnSourceDB' constructor of 'LiveAsyncActionQuery').
data LiveAsyncActionQueryOnSource = LiveAsyncActionQueryOnSource
  { -- | subscriber details of the currently registered live query
    _laaqpCurrentLqId :: !LiveQuerySubscriberDetails,
    -- | an action-log response map; presumably the one used for the previous
    -- run of the query — confirm against the caller
    _laaqpPrevActionLogMap :: !ActionLogResponseMap,
    -- | callback taking subscriber details and an action-log response map and
    -- possibly yielding new subscriber details; presumably restarts the live
    -- query — confirm against the caller
    _laaqpRestartLq :: !(LiveQuerySubscriberDetails -> ActionLogResponseMap -> IO (Maybe LiveQuerySubscriberDetails))
  }
-- | Callbacks for an async-action live query carried by the
-- 'LAAQNoRelationships' constructor of 'LiveAsyncActionQuery'.
data LiveAsyncActionQueryWithNoRelationships = LiveAsyncActionQueryWithNoRelationships
  { -- | IO callback that receives an action-log response map; presumably
    -- sends the response to the client — confirm against the caller
    _laaqwnrSendResponse :: !(ActionLogResponseMap -> IO ()),
    -- | IO action; presumably signals completion to the client — confirm
    -- against the caller
    _laaqwnrSendCompleted :: !(IO ())
  }
-- | The two ways an async-action live query can be executed.
data LiveAsyncActionQuery
  = -- | wraps 'LiveAsyncActionQueryWithNoRelationships'
    LAAQNoRelationships LiveAsyncActionQueryWithNoRelationships
  | -- | wraps 'LiveAsyncActionQueryOnSource'
    LAAQOnSourceDB LiveAsyncActionQueryOnSource
-- | Entry stored per operation in 'AsyncActionSubscriptionState'.
data AsyncActionQueryLive = AsyncActionQueryLive
  { -- | the (non-empty) set of action ids this query refers to
    _aaqlActionIds :: NonEmpty ActionId,
    -- | callback that receives a 'QErr'; presumably invoked when processing
    -- the query fails — confirm against the caller
    _aaqlOnException :: (QErr -> IO ()),
    -- | how the live query is executed
    _aaqlLiveExecution :: LiveAsyncActionQuery
  }
-- | Transactional map from 'OperationId' to the 'AsyncActionQueryLive' registered for it.
type AsyncActionSubscriptionState = TMap.TMap OperationId AsyncActionQueryLive
-- | Register an async-action live query under the given 'OperationId'.
--
-- Builds an 'AsyncActionQueryLive' from the action ids, the exception
-- callback and the execution description, and inserts it into the state map
-- in one STM transaction.
addAsyncActionLiveQuery ::
  AsyncActionSubscriptionState ->
  OperationId ->
  NonEmpty ActionId ->
  (QErr -> IO ()) ->
  LiveAsyncActionQuery ->
  IO ()
addAsyncActionLiveQuery queriesState opId actionIds onException liveQuery =
  STM.atomically
    $ TMap.insert (AsyncActionQueryLive actionIds onException liveQuery) opId queriesState
-- | Delete the entry stored for the given 'OperationId' from the async-action
-- subscription state, in one STM transaction.
removeAsyncActionLiveQuery ::
  AsyncActionSubscriptionState -> OperationId -> IO ()
removeAsyncActionLiveQuery queriesState opId =
  STM.atomically $ TMap.delete opId queriesState