{-# LANGUAGE TemplateHaskell #-}
module Hasura.GraphQL.Execute.Subscription.State
( SubscriptionsState (..),
initSubscriptionsState,
dumpSubscriptionsState,
SubscriberDetails,
SubscriptionPostPollHook,
addLiveQuery,
addStreamSubscriptionQuery,
removeLiveQuery,
removeStreamingQuery,
LiveAsyncActionQueryOnSource (..),
LiveAsyncActionQueryWithNoRelationships (..),
LiveAsyncActionQuery (..),
AsyncActionQueryLive (..),
AsyncActionSubscriptionState,
addAsyncActionLiveQuery,
removeAsyncActionLiveQuery,
LiveQuerySubscriberDetails,
StreamingSubscriberDetails,
)
where
import Control.Concurrent.Extended (forkImmortal, sleep)
import Control.Concurrent.STM qualified as STM
import Control.Exception (mask_)
import Control.Immortal qualified as Immortal
import Data.Aeson.Extended qualified as J
import Data.String
import Data.Text.Extended
import Data.UUID.V4 qualified as UUID
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Execute.Subscription.Poll
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol (OperationName)
import Hasura.GraphQL.Transport.WebSocket.Protocol (OperationId)
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.Types.Action
import Hasura.RQL.Types.Common (SourceName)
import Hasura.RQL.Types.Numeric qualified as Numeric
import Hasura.Server.Metrics (ServerMetrics (..))
import Hasura.Server.Prometheus (PrometheusMetrics (..))
import Hasura.Server.Types (RequestId)
import Language.GraphQL.Draft.Syntax qualified as G
import StmContainers.Map qualified as STMMap
import System.Metrics.Gauge qualified as EKG.Gauge
import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
data SubscriptionsState = SubscriptionsState
{ SubscriptionsState -> LiveQueriesOptions
_ssLiveQueryOptions :: LiveQueriesOptions,
SubscriptionsState -> LiveQueriesOptions
_ssStreamQueryOptions :: StreamQueriesOptions,
SubscriptionsState -> PollerMap ()
_ssLiveQueryMap :: PollerMap (),
SubscriptionsState -> PollerMap (TVar CursorVariableValues)
_ssStreamQueryMap :: PollerMap (STM.TVar CursorVariableValues),
SubscriptionsState -> SubscriptionPostPollHook
_ssPostPollHook :: SubscriptionPostPollHook,
SubscriptionsState -> AsyncActionSubscriptionState
_ssAsyncActions :: AsyncActionSubscriptionState
}
initSubscriptionsState ::
LiveQueriesOptions -> StreamQueriesOptions -> SubscriptionPostPollHook -> IO SubscriptionsState
initSubscriptionsState :: LiveQueriesOptions
-> LiveQueriesOptions
-> SubscriptionPostPollHook
-> IO SubscriptionsState
initSubscriptionsState LiveQueriesOptions
liveQOptions LiveQueriesOptions
streamQOptions SubscriptionPostPollHook
pollHook =
STM SubscriptionsState -> IO SubscriptionsState
forall a. STM a -> IO a
STM.atomically (STM SubscriptionsState -> IO SubscriptionsState)
-> STM SubscriptionsState -> IO SubscriptionsState
forall a b. (a -> b) -> a -> b
$
LiveQueriesOptions
-> LiveQueriesOptions
-> PollerMap ()
-> PollerMap (TVar CursorVariableValues)
-> SubscriptionPostPollHook
-> AsyncActionSubscriptionState
-> SubscriptionsState
SubscriptionsState LiveQueriesOptions
liveQOptions (LiveQueriesOptions
-> PollerMap ()
-> PollerMap (TVar CursorVariableValues)
-> SubscriptionPostPollHook
-> AsyncActionSubscriptionState
-> SubscriptionsState)
-> STM LiveQueriesOptions
-> STM
(PollerMap ()
-> PollerMap (TVar CursorVariableValues)
-> SubscriptionPostPollHook
-> AsyncActionSubscriptionState
-> SubscriptionsState)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> LiveQueriesOptions -> STM LiveQueriesOptions
forall (f :: * -> *) a. Applicative f => a -> f a
pure LiveQueriesOptions
streamQOptions STM
(PollerMap ()
-> PollerMap (TVar CursorVariableValues)
-> SubscriptionPostPollHook
-> AsyncActionSubscriptionState
-> SubscriptionsState)
-> STM (PollerMap ())
-> STM
(PollerMap (TVar CursorVariableValues)
-> SubscriptionPostPollHook
-> AsyncActionSubscriptionState
-> SubscriptionsState)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM (PollerMap ())
forall key value. STM (Map key value)
STMMap.new STM
(PollerMap (TVar CursorVariableValues)
-> SubscriptionPostPollHook
-> AsyncActionSubscriptionState
-> SubscriptionsState)
-> STM (PollerMap (TVar CursorVariableValues))
-> STM
(SubscriptionPostPollHook
-> AsyncActionSubscriptionState -> SubscriptionsState)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM (PollerMap (TVar CursorVariableValues))
forall key value. STM (Map key value)
STMMap.new STM
(SubscriptionPostPollHook
-> AsyncActionSubscriptionState -> SubscriptionsState)
-> STM SubscriptionPostPollHook
-> STM (AsyncActionSubscriptionState -> SubscriptionsState)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> SubscriptionPostPollHook -> STM SubscriptionPostPollHook
forall (f :: * -> *) a. Applicative f => a -> f a
pure SubscriptionPostPollHook
pollHook STM (AsyncActionSubscriptionState -> SubscriptionsState)
-> STM AsyncActionSubscriptionState -> STM SubscriptionsState
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM AsyncActionSubscriptionState
forall k v. STM (TMap k v)
TMap.new
dumpSubscriptionsState :: Bool -> SubscriptionsState -> IO J.Value
dumpSubscriptionsState :: Bool -> SubscriptionsState -> IO Value
dumpSubscriptionsState Bool
extended (SubscriptionsState LiveQueriesOptions
liveQOpts LiveQueriesOptions
streamQOpts PollerMap ()
lqMap PollerMap (TVar CursorVariableValues)
streamMap SubscriptionPostPollHook
_ AsyncActionSubscriptionState
_) = do
Value
lqMapJ <- Bool -> PollerMap () -> IO Value
forall streamCursor. Bool -> PollerMap streamCursor -> IO Value
dumpPollerMap Bool
extended PollerMap ()
lqMap
Value
streamMapJ <- Bool -> PollerMap (TVar CursorVariableValues) -> IO Value
forall streamCursor. Bool -> PollerMap streamCursor -> IO Value
dumpPollerMap Bool
extended PollerMap (TVar CursorVariableValues)
streamMap
Value -> IO Value
forall (m :: * -> *) a. Monad m => a -> m a
return (Value -> IO Value) -> Value -> IO Value
forall a b. (a -> b) -> a -> b
$
[Pair] -> Value
J.object
[ Key
"options" Key -> LiveQueriesOptions -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
J..= LiveQueriesOptions
liveQOpts,
Key
"live_queries_map" Key -> Value -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
J..= Value
lqMapJ,
Key
"stream_queries_map" Key -> Value -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
J..= Value
streamMapJ,
Key
"stream_queries_options" Key -> LiveQueriesOptions -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
J..= LiveQueriesOptions
streamQOpts
]
data SubscriberDetails a = SubscriberDetails
{ SubscriberDetails a -> PollerKey
_sdPoller :: !PollerKey,
SubscriberDetails a -> a
_sdCohort :: !a,
SubscriberDetails a -> SubscriberId
_sdSubscriber :: !SubscriberId
}
deriving (Int -> SubscriberDetails a -> ShowS
[SubscriberDetails a] -> ShowS
SubscriberDetails a -> String
(Int -> SubscriberDetails a -> ShowS)
-> (SubscriberDetails a -> String)
-> ([SubscriberDetails a] -> ShowS)
-> Show (SubscriberDetails a)
forall a. Show a => Int -> SubscriberDetails a -> ShowS
forall a. Show a => [SubscriberDetails a] -> ShowS
forall a. Show a => SubscriberDetails a -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [SubscriberDetails a] -> ShowS
$cshowList :: forall a. Show a => [SubscriberDetails a] -> ShowS
show :: SubscriberDetails a -> String
$cshow :: forall a. Show a => SubscriberDetails a -> String
showsPrec :: Int -> SubscriberDetails a -> ShowS
$cshowsPrec :: forall a. Show a => Int -> SubscriberDetails a -> ShowS
Show)
type LiveQuerySubscriberDetails = SubscriberDetails CohortKey
type StreamingSubscriberDetails = SubscriberDetails (CohortKey, STM.TVar CursorVariableValues)
findPollerForSubscriber ::
Subscriber ->
CohortId ->
PollerMap streamCursorVars ->
PollerKey ->
CohortKey ->
(Subscriber -> Cohort streamCursorVars -> STM.STM streamCursorVars) ->
(Subscriber -> CohortId -> Poller streamCursorVars -> STM.STM streamCursorVars) ->
STM.STM ((Maybe (Poller streamCursorVars)), streamCursorVars)
findPollerForSubscriber :: Subscriber
-> CohortId
-> PollerMap streamCursorVars
-> PollerKey
-> CohortKey
-> (Subscriber -> Cohort streamCursorVars -> STM streamCursorVars)
-> (Subscriber
-> CohortId -> Poller streamCursorVars -> STM streamCursorVars)
-> STM (Maybe (Poller streamCursorVars), streamCursorVars)
findPollerForSubscriber Subscriber
subscriber CohortId
cohortId PollerMap streamCursorVars
pollerMap PollerKey
pollerKey CohortKey
cohortKey Subscriber -> Cohort streamCursorVars -> STM streamCursorVars
addToCohort Subscriber
-> CohortId -> Poller streamCursorVars -> STM streamCursorVars
addToPoller =
PollerKey
-> PollerMap streamCursorVars
-> STM (Maybe (Poller streamCursorVars))
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM (Maybe value)
STMMap.lookup PollerKey
pollerKey PollerMap streamCursorVars
pollerMap STM (Maybe (Poller streamCursorVars))
-> (Maybe (Poller streamCursorVars)
-> STM (Maybe (Poller streamCursorVars), streamCursorVars))
-> STM (Maybe (Poller streamCursorVars), streamCursorVars)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just Poller streamCursorVars
poller -> do
streamCursorVars
cursorVars <-
CohortKey
-> TMap CohortKey (Cohort streamCursorVars)
-> STM (Maybe (Cohort streamCursorVars))
forall k v. (Eq k, Hashable k) => k -> TMap k v -> STM (Maybe v)
TMap.lookup CohortKey
cohortKey (Poller streamCursorVars -> TMap CohortKey (Cohort streamCursorVars)
forall streamCursor. Poller streamCursor -> CohortMap streamCursor
_pCohorts Poller streamCursorVars
poller) STM (Maybe (Cohort streamCursorVars))
-> (Maybe (Cohort streamCursorVars) -> STM streamCursorVars)
-> STM streamCursorVars
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just Cohort streamCursorVars
cohort -> Subscriber -> Cohort streamCursorVars -> STM streamCursorVars
addToCohort Subscriber
subscriber Cohort streamCursorVars
cohort
Maybe (Cohort streamCursorVars)
Nothing -> Subscriber
-> CohortId -> Poller streamCursorVars -> STM streamCursorVars
addToPoller Subscriber
subscriber CohortId
cohortId Poller streamCursorVars
poller
(Maybe (Poller streamCursorVars), streamCursorVars)
-> STM (Maybe (Poller streamCursorVars), streamCursorVars)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (Poller streamCursorVars)
forall a. Maybe a
Nothing, streamCursorVars
cursorVars)
Maybe (Poller streamCursorVars)
Nothing -> do
!Poller streamCursorVars
poller <- TMap CohortKey (Cohort streamCursorVars)
-> TMVar PollerIOState -> Poller streamCursorVars
forall streamCursor.
CohortMap streamCursor
-> TMVar PollerIOState -> Poller streamCursor
Poller (TMap CohortKey (Cohort streamCursorVars)
-> TMVar PollerIOState -> Poller streamCursorVars)
-> STM (TMap CohortKey (Cohort streamCursorVars))
-> STM (TMVar PollerIOState -> Poller streamCursorVars)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM (TMap CohortKey (Cohort streamCursorVars))
forall k v. STM (TMap k v)
TMap.new STM (TMVar PollerIOState -> Poller streamCursorVars)
-> STM (TMVar PollerIOState) -> STM (Poller streamCursorVars)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM (TMVar PollerIOState)
forall a. STM (TMVar a)
STM.newEmptyTMVar
streamCursorVars
cursorVars <- Subscriber
-> CohortId -> Poller streamCursorVars -> STM streamCursorVars
addToPoller Subscriber
subscriber CohortId
cohortId Poller streamCursorVars
poller
Poller streamCursorVars
-> PollerKey -> PollerMap streamCursorVars -> STM ()
forall key value.
(Eq key, Hashable key) =>
value -> key -> Map key value -> STM ()
STMMap.insert Poller streamCursorVars
poller PollerKey
pollerKey PollerMap streamCursorVars
pollerMap
(Maybe (Poller streamCursorVars), streamCursorVars)
-> STM (Maybe (Poller streamCursorVars), streamCursorVars)
forall (m :: * -> *) a. Monad m => a -> m a
return ((Maybe (Poller streamCursorVars), streamCursorVars)
-> STM (Maybe (Poller streamCursorVars), streamCursorVars))
-> (Maybe (Poller streamCursorVars), streamCursorVars)
-> STM (Maybe (Poller streamCursorVars), streamCursorVars)
forall a b. (a -> b) -> a -> b
$ (Poller streamCursorVars -> Maybe (Poller streamCursorVars)
forall a. a -> Maybe a
Just Poller streamCursorVars
poller, streamCursorVars
cursorVars)
addLiveQuery ::
forall b.
BackendTransport b =>
L.Logger L.Hasura ->
ServerMetrics ->
PrometheusMetrics ->
SubscriberMetadata ->
SubscriptionsState ->
SourceName ->
ParameterizedQueryHash ->
Maybe OperationName ->
RequestId ->
SubscriptionQueryPlan b (MultiplexedQuery b) ->
OnChange ->
IO LiveQuerySubscriberDetails
addLiveQuery :: Logger Hasura
-> ServerMetrics
-> PrometheusMetrics
-> SubscriberMetadata
-> SubscriptionsState
-> SourceName
-> ParameterizedQueryHash
-> Maybe OperationName
-> RequestId
-> SubscriptionQueryPlan b (MultiplexedQuery b)
-> OnChange
-> IO LiveQuerySubscriberDetails
addLiveQuery
Logger Hasura
logger
ServerMetrics
serverMetrics
PrometheusMetrics
prometheusMetrics
SubscriberMetadata
subscriberMetadata
SubscriptionsState
subscriptionState
SourceName
source
ParameterizedQueryHash
parameterizedQueryHash
Maybe OperationName
operationName
RequestId
requestId
SubscriptionQueryPlan b (MultiplexedQuery b)
plan
OnChange
onResultAction = do
CohortId
cohortId <- IO CohortId
forall (m :: * -> *). MonadIO m => m CohortId
newCohortId
SubscriberId
subscriberId <- IO SubscriberId
newSubscriberId
let !subscriber :: Subscriber
subscriber = SubscriberId
-> SubscriberMetadata
-> RequestId
-> Maybe OperationName
-> OnChange
-> Subscriber
Subscriber SubscriberId
subscriberId SubscriberMetadata
subscriberMetadata RequestId
requestId Maybe OperationName
operationName OnChange
onResultAction
String
String -> Subscriber -> IO ()
forall a. String -> a -> IO ()
$assertNFHere Subscriber
subscriber
(Maybe (Poller ())
pollerMaybe, ()) <-
STM (Maybe (Poller ()), ()) -> IO (Maybe (Poller ()), ())
forall a. STM a -> IO a
STM.atomically (STM (Maybe (Poller ()), ()) -> IO (Maybe (Poller ()), ()))
-> STM (Maybe (Poller ()), ()) -> IO (Maybe (Poller ()), ())
forall a b. (a -> b) -> a -> b
$
Subscriber
-> CohortId
-> PollerMap ()
-> PollerKey
-> CohortKey
-> (Subscriber -> Cohort () -> STM ())
-> (Subscriber -> CohortId -> Poller () -> STM ())
-> STM (Maybe (Poller ()), ())
forall streamCursorVars.
Subscriber
-> CohortId
-> PollerMap streamCursorVars
-> PollerKey
-> CohortKey
-> (Subscriber -> Cohort streamCursorVars -> STM streamCursorVars)
-> (Subscriber
-> CohortId -> Poller streamCursorVars -> STM streamCursorVars)
-> STM (Maybe (Poller streamCursorVars), streamCursorVars)
findPollerForSubscriber
Subscriber
subscriber
CohortId
cohortId
PollerMap ()
lqMap
PollerKey
handlerId
CohortKey
cohortKey
Subscriber -> Cohort () -> STM ()
forall streamCursorVars.
Subscriber -> Cohort streamCursorVars -> STM ()
addToCohort
Subscriber -> CohortId -> Poller () -> STM ()
addToPoller
Maybe (Poller ()) -> (Poller () -> IO ()) -> IO ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
onJust Maybe (Poller ())
pollerMaybe ((Poller () -> IO ()) -> IO ()) -> (Poller () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Poller ()
poller -> do
PollerId
pollerId <- UUID -> PollerId
PollerId (UUID -> PollerId) -> IO UUID -> IO PollerId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
Thread
threadRef <- String -> Logger Hasura -> IO Void -> IO Thread
forall (m :: * -> *).
ForkableMonadIO m =>
String -> Logger Hasura -> m Void -> m Thread
forkImmortal (String
"pollLiveQuery." String -> ShowS
forall a. Semigroup a => a -> a -> a
<> PollerId -> String
forall a. Show a => a -> String
show PollerId
pollerId) Logger Hasura
logger (IO Void -> IO Thread) -> IO Void -> IO Thread
forall a b. (a -> b) -> a -> b
$
IO () -> IO Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Void) -> IO () -> IO Void
forall a b. (a -> b) -> a -> b
$ do
PollerId
-> LiveQueriesOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap 'LiveQuery
-> SubscriptionPostPollHook
-> IO ()
forall (b :: BackendType).
BackendTransport b =>
PollerId
-> LiveQueriesOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap 'LiveQuery
-> SubscriptionPostPollHook
-> IO ()
pollLiveQuery @b PollerId
pollerId LiveQueriesOptions
lqOpts (SourceName
source, SourceConfig b
sourceConfig) RoleName
role ParameterizedQueryHash
parameterizedQueryHash MultiplexedQuery b
query (Poller () -> CohortMap ()
forall streamCursor. Poller streamCursor -> CohortMap streamCursor
_pCohorts Poller ()
poller) SubscriptionPostPollHook
postPollHook
DiffTime -> IO ()
sleep (DiffTime -> IO ()) -> DiffTime -> IO ()
forall a b. (a -> b) -> a -> b
$ NonNegativeDiffTime -> DiffTime
Numeric.unNonNegativeDiffTime (NonNegativeDiffTime -> DiffTime)
-> NonNegativeDiffTime -> DiffTime
forall a b. (a -> b) -> a -> b
$ RefetchInterval -> NonNegativeDiffTime
unRefetchInterval RefetchInterval
refetchInterval
let !pState :: PollerIOState
pState = Thread -> PollerId -> PollerIOState
PollerIOState Thread
threadRef PollerId
pollerId
String
String -> PollerIOState -> IO ()
forall a. String -> a -> IO ()
$assertNFHere PollerIOState
pState
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar PollerIOState -> PollerIOState -> STM ()
forall a. TMVar a -> a -> STM ()
STM.putTMVar (Poller () -> TMVar PollerIOState
forall streamCursor. Poller streamCursor -> TMVar PollerIOState
_pIOState Poller ()
poller) PollerIOState
pState
IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveSubscriptions ServerMetrics
serverMetrics
IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> Gauge
pmActiveSubscriptions PrometheusMetrics
prometheusMetrics
IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveLiveQueries ServerMetrics
serverMetrics
LiveQuerySubscriberDetails -> IO LiveQuerySubscriberDetails
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LiveQuerySubscriberDetails -> IO LiveQuerySubscriberDetails)
-> LiveQuerySubscriberDetails -> IO LiveQuerySubscriberDetails
forall a b. (a -> b) -> a -> b
$ PollerKey
-> CohortKey -> SubscriberId -> LiveQuerySubscriberDetails
forall a. PollerKey -> a -> SubscriberId -> SubscriberDetails a
SubscriberDetails PollerKey
handlerId CohortKey
cohortKey SubscriberId
subscriberId
where
SubscriptionsState LiveQueriesOptions
lqOpts LiveQueriesOptions
_ PollerMap ()
lqMap PollerMap (TVar CursorVariableValues)
_ SubscriptionPostPollHook
postPollHook AsyncActionSubscriptionState
_ = SubscriptionsState
subscriptionState
SubscriptionsOptions BatchSize
_ RefetchInterval
refetchInterval = LiveQueriesOptions
lqOpts
SubscriptionQueryPlan (ParameterizedSubscriptionQueryPlan RoleName
role MultiplexedQuery b
query) SourceConfig b
sourceConfig CohortKey
cohortKey Maybe Name
_ = SubscriptionQueryPlan b (MultiplexedQuery b)
plan
handlerId :: PollerKey
handlerId = SourceName -> RoleName -> Text -> PollerKey
PollerKey SourceName
source RoleName
role (Text -> PollerKey) -> Text -> PollerKey
forall a b. (a -> b) -> a -> b
$ MultiplexedQuery b -> Text
forall a. ToTxt a => a -> Text
toTxt MultiplexedQuery b
query
addToCohort :: Subscriber -> Cohort streamCursorVars -> STM ()
addToCohort Subscriber
subscriber Cohort streamCursorVars
handlerC =
Subscriber
-> SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. (Eq k, Hashable k) => v -> k -> TMap k v -> STM ()
TMap.insert Subscriber
subscriber (Subscriber -> SubscriberId
_sId Subscriber
subscriber) (TMap SubscriberId Subscriber -> STM ())
-> TMap SubscriberId Subscriber -> STM ()
forall a b. (a -> b) -> a -> b
$ Cohort streamCursorVars -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
_cNewSubscribers Cohort streamCursorVars
handlerC
addToPoller :: Subscriber -> CohortId -> Poller () -> STM ()
addToPoller Subscriber
subscriber CohortId
cohortId Poller ()
handler = do
!Cohort ()
newCohort <-
CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> ()
-> Cohort ()
forall streamCursorVars.
CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> streamCursorVars
-> Cohort streamCursorVars
Cohort CohortId
cohortId
(TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> ()
-> Cohort ())
-> STM (TVar (Maybe ResponseHash))
-> STM
(TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber -> () -> Cohort ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe ResponseHash -> STM (TVar (Maybe ResponseHash))
forall a. a -> STM (TVar a)
STM.newTVar Maybe ResponseHash
forall a. Maybe a
Nothing
STM
(TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber -> () -> Cohort ())
-> STM (TMap SubscriberId Subscriber)
-> STM (TMap SubscriberId Subscriber -> () -> Cohort ())
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new
STM (TMap SubscriberId Subscriber -> () -> Cohort ())
-> STM (TMap SubscriberId Subscriber) -> STM (() -> Cohort ())
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new
STM (() -> Cohort ()) -> STM () -> STM (Cohort ())
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> () -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Subscriber -> Cohort () -> STM ()
forall streamCursorVars.
Subscriber -> Cohort streamCursorVars -> STM ()
addToCohort Subscriber
subscriber Cohort ()
newCohort
Cohort () -> CohortKey -> CohortMap () -> STM ()
forall k v. (Eq k, Hashable k) => v -> k -> TMap k v -> STM ()
TMap.insert Cohort ()
newCohort CohortKey
cohortKey (CohortMap () -> STM ()) -> CohortMap () -> STM ()
forall a b. (a -> b) -> a -> b
$ Poller () -> CohortMap ()
forall streamCursor. Poller streamCursor -> CohortMap streamCursor
_pCohorts Poller ()
handler
addStreamSubscriptionQuery ::
forall b.
BackendTransport b =>
L.Logger L.Hasura ->
ServerMetrics ->
PrometheusMetrics ->
SubscriberMetadata ->
SubscriptionsState ->
SourceName ->
ParameterizedQueryHash ->
Maybe OperationName ->
RequestId ->
G.Name ->
SubscriptionQueryPlan b (MultiplexedQuery b) ->
OnChange ->
IO StreamingSubscriberDetails
addStreamSubscriptionQuery :: Logger Hasura
-> ServerMetrics
-> PrometheusMetrics
-> SubscriberMetadata
-> SubscriptionsState
-> SourceName
-> ParameterizedQueryHash
-> Maybe OperationName
-> RequestId
-> Name
-> SubscriptionQueryPlan b (MultiplexedQuery b)
-> OnChange
-> IO StreamingSubscriberDetails
addStreamSubscriptionQuery
Logger Hasura
logger
ServerMetrics
serverMetrics
PrometheusMetrics
prometheusMetrics
SubscriberMetadata
subscriberMetadata
SubscriptionsState
subscriptionState
SourceName
source
ParameterizedQueryHash
parameterizedQueryHash
Maybe OperationName
operationName
RequestId
requestId
Name
rootFieldName
SubscriptionQueryPlan b (MultiplexedQuery b)
plan
OnChange
onResultAction = do
CohortId
cohortId <- IO CohortId
forall (m :: * -> *). MonadIO m => m CohortId
newCohortId
SubscriberId
subscriberId <- IO SubscriberId
newSubscriberId
let !subscriber :: Subscriber
subscriber = SubscriberId
-> SubscriberMetadata
-> RequestId
-> Maybe OperationName
-> OnChange
-> Subscriber
Subscriber SubscriberId
subscriberId SubscriberMetadata
subscriberMetadata RequestId
requestId Maybe OperationName
operationName OnChange
onResultAction
String
String -> Subscriber -> IO ()
forall a. String -> a -> IO ()
$assertNFHere Subscriber
subscriber
(Maybe (Poller (TVar CursorVariableValues))
handlerM, TVar CursorVariableValues
cohortCursorTVar) <-
STM
(Maybe (Poller (TVar CursorVariableValues)),
TVar CursorVariableValues)
-> IO
(Maybe (Poller (TVar CursorVariableValues)),
TVar CursorVariableValues)
forall a. STM a -> IO a
STM.atomically (STM
(Maybe (Poller (TVar CursorVariableValues)),
TVar CursorVariableValues)
-> IO
(Maybe (Poller (TVar CursorVariableValues)),
TVar CursorVariableValues))
-> STM
(Maybe (Poller (TVar CursorVariableValues)),
TVar CursorVariableValues)
-> IO
(Maybe (Poller (TVar CursorVariableValues)),
TVar CursorVariableValues)
forall a b. (a -> b) -> a -> b
$
Subscriber
-> CohortId
-> PollerMap (TVar CursorVariableValues)
-> PollerKey
-> CohortKey
-> (Subscriber
-> Cohort (TVar CursorVariableValues)
-> STM (TVar CursorVariableValues))
-> (Subscriber
-> CohortId
-> Poller (TVar CursorVariableValues)
-> STM (TVar CursorVariableValues))
-> STM
(Maybe (Poller (TVar CursorVariableValues)),
TVar CursorVariableValues)
forall streamCursorVars.
Subscriber
-> CohortId
-> PollerMap streamCursorVars
-> PollerKey
-> CohortKey
-> (Subscriber -> Cohort streamCursorVars -> STM streamCursorVars)
-> (Subscriber
-> CohortId -> Poller streamCursorVars -> STM streamCursorVars)
-> STM (Maybe (Poller streamCursorVars), streamCursorVars)
findPollerForSubscriber
Subscriber
subscriber
CohortId
cohortId
PollerMap (TVar CursorVariableValues)
streamQueryMap
PollerKey
handlerId
CohortKey
cohortKey
Subscriber
-> Cohort (TVar CursorVariableValues)
-> STM (TVar CursorVariableValues)
forall b. Subscriber -> Cohort b -> STM b
addToCohort
Subscriber
-> CohortId
-> Poller (TVar CursorVariableValues)
-> STM (TVar CursorVariableValues)
addToPoller
Maybe (Poller (TVar CursorVariableValues))
-> (Poller (TVar CursorVariableValues) -> IO ()) -> IO ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
onJust Maybe (Poller (TVar CursorVariableValues))
handlerM ((Poller (TVar CursorVariableValues) -> IO ()) -> IO ())
-> (Poller (TVar CursorVariableValues) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Poller (TVar CursorVariableValues)
handler -> do
PollerId
pollerId <- UUID -> PollerId
PollerId (UUID -> PollerId) -> IO UUID -> IO PollerId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
Thread
threadRef <- String -> Logger Hasura -> IO Void -> IO Thread
forall (m :: * -> *).
ForkableMonadIO m =>
String -> Logger Hasura -> m Void -> m Thread
forkImmortal (String
"pollStreamingQuery." String -> ShowS
forall a. Semigroup a => a -> a -> a
<> UUID -> String
forall a. Show a => a -> String
show (PollerId -> UUID
unPollerId PollerId
pollerId)) Logger Hasura
logger (IO Void -> IO Thread) -> IO Void -> IO Thread
forall a b. (a -> b) -> a -> b
$
IO () -> IO Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Void) -> IO () -> IO Void
forall a b. (a -> b) -> a -> b
$ do
PollerId
-> LiveQueriesOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap 'Streaming
-> Name
-> SubscriptionPostPollHook
-> Maybe (IO ())
-> IO ()
forall (b :: BackendType).
BackendTransport b =>
PollerId
-> LiveQueriesOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap 'Streaming
-> Name
-> SubscriptionPostPollHook
-> Maybe (IO ())
-> IO ()
pollStreamingQuery @b PollerId
pollerId LiveQueriesOptions
streamQOpts (SourceName
source, SourceConfig b
sourceConfig) RoleName
role ParameterizedQueryHash
parameterizedQueryHash MultiplexedQuery b
query (Poller (TVar CursorVariableValues)
-> CohortMap (TVar CursorVariableValues)
forall streamCursor. Poller streamCursor -> CohortMap streamCursor
_pCohorts Poller (TVar CursorVariableValues)
handler) Name
rootFieldName SubscriptionPostPollHook
postPollHook Maybe (IO ())
forall a. Maybe a
Nothing
DiffTime -> IO ()
sleep (DiffTime -> IO ()) -> DiffTime -> IO ()
forall a b. (a -> b) -> a -> b
$ NonNegativeDiffTime -> DiffTime
Numeric.unNonNegativeDiffTime (NonNegativeDiffTime -> DiffTime)
-> NonNegativeDiffTime -> DiffTime
forall a b. (a -> b) -> a -> b
$ RefetchInterval -> NonNegativeDiffTime
unRefetchInterval RefetchInterval
refetchInterval
let !pState :: PollerIOState
pState = Thread -> PollerId -> PollerIOState
PollerIOState Thread
threadRef PollerId
pollerId
String
String -> PollerIOState -> IO ()
forall a. String -> a -> IO ()
$assertNFHere PollerIOState
pState
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar PollerIOState -> PollerIOState -> STM ()
forall a. TMVar a -> a -> STM ()
STM.putTMVar (Poller (TVar CursorVariableValues) -> TMVar PollerIOState
forall streamCursor. Poller streamCursor -> TMVar PollerIOState
_pIOState Poller (TVar CursorVariableValues)
handler) PollerIOState
pState
IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveSubscriptions ServerMetrics
serverMetrics
IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> Gauge
pmActiveSubscriptions PrometheusMetrics
prometheusMetrics
IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveStreamingSubscriptions ServerMetrics
serverMetrics
StreamingSubscriberDetails -> IO StreamingSubscriberDetails
forall (f :: * -> *) a. Applicative f => a -> f a
pure (StreamingSubscriberDetails -> IO StreamingSubscriberDetails)
-> StreamingSubscriberDetails -> IO StreamingSubscriberDetails
forall a b. (a -> b) -> a -> b
$ PollerKey
-> (CohortKey, TVar CursorVariableValues)
-> SubscriberId
-> StreamingSubscriberDetails
forall a. PollerKey -> a -> SubscriberId -> SubscriberDetails a
SubscriberDetails PollerKey
handlerId (CohortKey
cohortKey, TVar CursorVariableValues
cohortCursorTVar) SubscriberId
subscriberId
where
SubscriptionsState LiveQueriesOptions
_ LiveQueriesOptions
streamQOpts PollerMap ()
_ PollerMap (TVar CursorVariableValues)
streamQueryMap SubscriptionPostPollHook
postPollHook AsyncActionSubscriptionState
_ = SubscriptionsState
subscriptionState
SubscriptionsOptions BatchSize
_ RefetchInterval
refetchInterval = LiveQueriesOptions
streamQOpts
SubscriptionQueryPlan (ParameterizedSubscriptionQueryPlan RoleName
role MultiplexedQuery b
query) SourceConfig b
sourceConfig CohortKey
cohortKey Maybe Name
_ = SubscriptionQueryPlan b (MultiplexedQuery b)
plan
handlerId :: PollerKey
handlerId = SourceName -> RoleName -> Text -> PollerKey
PollerKey SourceName
source RoleName
role (Text -> PollerKey) -> Text -> PollerKey
forall a b. (a -> b) -> a -> b
$ MultiplexedQuery b -> Text
forall a. ToTxt a => a -> Text
toTxt MultiplexedQuery b
query
addToCohort :: Subscriber -> Cohort b -> STM b
addToCohort Subscriber
subscriber Cohort b
handlerC = do
Subscriber
-> SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. (Eq k, Hashable k) => v -> k -> TMap k v -> STM ()
TMap.insert Subscriber
subscriber (Subscriber -> SubscriberId
_sId Subscriber
subscriber) (TMap SubscriberId Subscriber -> STM ())
-> TMap SubscriberId Subscriber -> STM ()
forall a b. (a -> b) -> a -> b
$ Cohort b -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
_cNewSubscribers Cohort b
handlerC
b -> STM b
forall (f :: * -> *) a. Applicative f => a -> f a
pure (b -> STM b) -> b -> STM b
forall a b. (a -> b) -> a -> b
$ Cohort b -> b
forall streamCursorVars.
Cohort streamCursorVars -> streamCursorVars
_cStreamCursorVariables Cohort b
handlerC
addToPoller :: Subscriber
-> CohortId
-> Poller (TVar CursorVariableValues)
-> STM (TVar CursorVariableValues)
addToPoller Subscriber
subscriber CohortId
cohortId Poller (TVar CursorVariableValues)
handler = do
TVar CursorVariableValues
latestCursorValues <-
CursorVariableValues -> STM (TVar CursorVariableValues)
forall a. a -> STM (TVar a)
STM.newTVar (HashMap Name TxtEncodedVal -> CursorVariableValues
CursorVariableValues (ValidatedVariables (HashMap Name) -> HashMap Name TxtEncodedVal
forall (f :: * -> *). ValidatedVariables f -> f TxtEncodedVal
_unValidatedVariables (CohortKey -> ValidatedVariables (HashMap Name)
_cvCursorVariables CohortKey
cohortKey)))
!Cohort (TVar CursorVariableValues)
newCohort <- CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> TVar CursorVariableValues
-> Cohort (TVar CursorVariableValues)
forall streamCursorVars.
CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> streamCursorVars
-> Cohort streamCursorVars
Cohort CohortId
cohortId (TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> TVar CursorVariableValues
-> Cohort (TVar CursorVariableValues))
-> STM (TVar (Maybe ResponseHash))
-> STM
(TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> TVar CursorVariableValues
-> Cohort (TVar CursorVariableValues))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe ResponseHash -> STM (TVar (Maybe ResponseHash))
forall a. a -> STM (TVar a)
STM.newTVar Maybe ResponseHash
forall a. Maybe a
Nothing STM
(TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> TVar CursorVariableValues
-> Cohort (TVar CursorVariableValues))
-> STM (TMap SubscriberId Subscriber)
-> STM
(TMap SubscriberId Subscriber
-> TVar CursorVariableValues -> Cohort (TVar CursorVariableValues))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new STM
(TMap SubscriberId Subscriber
-> TVar CursorVariableValues -> Cohort (TVar CursorVariableValues))
-> STM (TMap SubscriberId Subscriber)
-> STM
(TVar CursorVariableValues -> Cohort (TVar CursorVariableValues))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new STM
(TVar CursorVariableValues -> Cohort (TVar CursorVariableValues))
-> STM (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TVar CursorVariableValues -> STM (TVar CursorVariableValues)
forall (f :: * -> *) a. Applicative f => a -> f a
pure TVar CursorVariableValues
latestCursorValues
TVar CursorVariableValues
cohortCursorVals <- Subscriber
-> Cohort (TVar CursorVariableValues)
-> STM (TVar CursorVariableValues)
forall b. Subscriber -> Cohort b -> STM b
addToCohort Subscriber
subscriber Cohort (TVar CursorVariableValues)
newCohort
Cohort (TVar CursorVariableValues)
-> CohortKey -> CohortMap (TVar CursorVariableValues) -> STM ()
forall k v. (Eq k, Hashable k) => v -> k -> TMap k v -> STM ()
TMap.insert Cohort (TVar CursorVariableValues)
newCohort CohortKey
cohortKey (CohortMap (TVar CursorVariableValues) -> STM ())
-> CohortMap (TVar CursorVariableValues) -> STM ()
forall a b. (a -> b) -> a -> b
$ Poller (TVar CursorVariableValues)
-> CohortMap (TVar CursorVariableValues)
forall streamCursor. Poller streamCursor -> CohortMap streamCursor
_pCohorts Poller (TVar CursorVariableValues)
handler
TVar CursorVariableValues -> STM (TVar CursorVariableValues)
forall (f :: * -> *) a. Applicative f => a -> f a
pure TVar CursorVariableValues
cohortCursorVals
removeLiveQuery ::
L.Logger L.Hasura ->
ServerMetrics ->
PrometheusMetrics ->
SubscriptionsState ->
LiveQuerySubscriberDetails ->
IO ()
removeLiveQuery :: Logger Hasura
-> ServerMetrics
-> PrometheusMetrics
-> SubscriptionsState
-> LiveQuerySubscriberDetails
-> IO ()
removeLiveQuery Logger Hasura
logger ServerMetrics
serverMetrics PrometheusMetrics
prometheusMetrics SubscriptionsState
lqState lqId :: LiveQuerySubscriberDetails
lqId@(SubscriberDetails PollerKey
handlerId CohortKey
cohortId SubscriberId
sinkId) = IO () -> IO ()
forall a. IO a -> IO a
mask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Maybe (IO ())
mbCleanupIO <- STM (Maybe (IO ())) -> IO (Maybe (IO ()))
forall a. STM a -> IO a
STM.atomically (STM (Maybe (IO ())) -> IO (Maybe (IO ())))
-> STM (Maybe (IO ())) -> IO (Maybe (IO ()))
forall a b. (a -> b) -> a -> b
$ do
Maybe (Poller (), Cohort ())
detM <- PollerMap () -> STM (Maybe (Poller (), Cohort ()))
getQueryDet PollerMap ()
lqMap
(Maybe (Maybe (IO ())) -> Maybe (IO ()))
-> STM (Maybe (Maybe (IO ()))) -> STM (Maybe (IO ()))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe (Maybe (IO ())) -> Maybe (IO ())
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (STM (Maybe (Maybe (IO ()))) -> STM (Maybe (IO ())))
-> STM (Maybe (Maybe (IO ()))) -> STM (Maybe (IO ()))
forall a b. (a -> b) -> a -> b
$
Maybe (Poller (), Cohort ())
-> ((Poller (), Cohort ()) -> STM (Maybe (IO ())))
-> STM (Maybe (Maybe (IO ())))
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM Maybe (Poller (), Cohort ())
detM (((Poller (), Cohort ()) -> STM (Maybe (IO ())))
-> STM (Maybe (Maybe (IO ()))))
-> ((Poller (), Cohort ()) -> STM (Maybe (IO ())))
-> STM (Maybe (Maybe (IO ())))
forall a b. (a -> b) -> a -> b
$ \(Poller CohortMap ()
cohorts TMVar PollerIOState
ioState, Cohort ()
cohort) ->
CohortMap ()
-> TMVar PollerIOState -> Cohort () -> STM (Maybe (IO ()))
cleanHandlerC CohortMap ()
cohorts TMVar PollerIOState
ioState Cohort ()
cohort
Maybe (IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_ Maybe (IO ())
mbCleanupIO
IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveSubscriptions ServerMetrics
serverMetrics
IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> Gauge
pmActiveSubscriptions PrometheusMetrics
prometheusMetrics
IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveLiveQueries ServerMetrics
serverMetrics
where
lqMap :: PollerMap ()
lqMap = SubscriptionsState -> PollerMap ()
_ssLiveQueryMap SubscriptionsState
lqState
getQueryDet :: PollerMap () -> STM (Maybe (Poller (), Cohort ()))
getQueryDet PollerMap ()
subMap = do
Maybe (Poller ())
pollerM <- PollerKey -> PollerMap () -> STM (Maybe (Poller ()))
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM (Maybe value)
STMMap.lookup PollerKey
handlerId PollerMap ()
subMap
(Maybe (Maybe (Poller (), Cohort ()))
-> Maybe (Poller (), Cohort ()))
-> STM (Maybe (Maybe (Poller (), Cohort ())))
-> STM (Maybe (Poller (), Cohort ()))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe (Maybe (Poller (), Cohort ()))
-> Maybe (Poller (), Cohort ())
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (STM (Maybe (Maybe (Poller (), Cohort ())))
-> STM (Maybe (Poller (), Cohort ())))
-> STM (Maybe (Maybe (Poller (), Cohort ())))
-> STM (Maybe (Poller (), Cohort ()))
forall a b. (a -> b) -> a -> b
$
Maybe (Poller ())
-> (Poller () -> STM (Maybe (Poller (), Cohort ())))
-> STM (Maybe (Maybe (Poller (), Cohort ())))
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM Maybe (Poller ())
pollerM ((Poller () -> STM (Maybe (Poller (), Cohort ())))
-> STM (Maybe (Maybe (Poller (), Cohort ()))))
-> (Poller () -> STM (Maybe (Poller (), Cohort ())))
-> STM (Maybe (Maybe (Poller (), Cohort ())))
forall a b. (a -> b) -> a -> b
$ \Poller ()
poller -> do
Maybe (Cohort ())
cohortM <- CohortKey -> CohortMap () -> STM (Maybe (Cohort ()))
forall k v. (Eq k, Hashable k) => k -> TMap k v -> STM (Maybe v)
TMap.lookup CohortKey
cohortId (Poller () -> CohortMap ()
forall streamCursor. Poller streamCursor -> CohortMap streamCursor
_pCohorts Poller ()
poller)
Maybe (Poller (), Cohort ()) -> STM (Maybe (Poller (), Cohort ()))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (Poller (), Cohort ())
-> STM (Maybe (Poller (), Cohort ())))
-> Maybe (Poller (), Cohort ())
-> STM (Maybe (Poller (), Cohort ()))
forall a b. (a -> b) -> a -> b
$ (Poller ()
poller,) (Cohort () -> (Poller (), Cohort ()))
-> Maybe (Cohort ()) -> Maybe (Poller (), Cohort ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (Cohort ())
cohortM
cleanHandlerC :: CohortMap ()
-> TMVar PollerIOState -> Cohort () -> STM (Maybe (IO ()))
cleanHandlerC CohortMap ()
cohortMap TMVar PollerIOState
ioState Cohort ()
handlerC = do
let curOps :: TMap SubscriberId Subscriber
curOps = Cohort () -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
_cExistingSubscribers Cohort ()
handlerC
newOps :: TMap SubscriberId Subscriber
newOps = Cohort () -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
_cNewSubscribers Cohort ()
handlerC
SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. (Eq k, Hashable k) => k -> TMap k v -> STM ()
TMap.delete SubscriberId
sinkId TMap SubscriberId Subscriber
curOps
SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. (Eq k, Hashable k) => k -> TMap k v -> STM ()
TMap.delete SubscriberId
sinkId TMap SubscriberId Subscriber
newOps
Bool
cohortIsEmpty <-
Bool -> Bool -> Bool
(&&)
(Bool -> Bool -> Bool) -> STM Bool -> STM (Bool -> Bool)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMap SubscriberId Subscriber -> STM Bool
forall k v. TMap k v -> STM Bool
TMap.null TMap SubscriberId Subscriber
curOps
STM (Bool -> Bool) -> STM Bool -> STM Bool
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TMap SubscriberId Subscriber -> STM Bool
forall k v. TMap k v -> STM Bool
TMap.null TMap SubscriberId Subscriber
newOps
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
cohortIsEmpty (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ CohortKey -> CohortMap () -> STM ()
forall k v. (Eq k, Hashable k) => k -> TMap k v -> STM ()
TMap.delete CohortKey
cohortId CohortMap ()
cohortMap
Bool
handlerIsEmpty <- CohortMap () -> STM Bool
forall k v. TMap k v -> STM Bool
TMap.null CohortMap ()
cohortMap
if Bool
handlerIsEmpty
then do
PollerKey -> PollerMap () -> STM ()
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM ()
STMMap.delete PollerKey
handlerId PollerMap ()
lqMap
Maybe Thread
threadRefM <- (PollerIOState -> Thread) -> Maybe PollerIOState -> Maybe Thread
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap PollerIOState -> Thread
_pThread (Maybe PollerIOState -> Maybe Thread)
-> STM (Maybe PollerIOState) -> STM (Maybe Thread)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMVar PollerIOState -> STM (Maybe PollerIOState)
forall a. TMVar a -> STM (Maybe a)
STM.tryReadTMVar TMVar PollerIOState
ioState
Maybe (IO ()) -> STM (Maybe (IO ()))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (IO ()) -> STM (Maybe (IO ())))
-> Maybe (IO ()) -> STM (Maybe (IO ()))
forall a b. (a -> b) -> a -> b
$
IO () -> Maybe (IO ())
forall a. a -> Maybe a
Just (IO () -> Maybe (IO ())) -> IO () -> Maybe (IO ())
forall a b. (a -> b) -> a -> b
$
case Maybe Thread
threadRefM of
Just Thread
threadRef -> Thread -> IO ()
Immortal.stop Thread
threadRef
Maybe Thread
Nothing ->
Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (UnstructuredLog -> IO ()) -> UnstructuredLog -> IO ()
forall a b. (a -> b) -> a -> b
$
LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelError (SerializableBlob -> UnstructuredLog)
-> SerializableBlob -> UnstructuredLog
forall a b. (a -> b) -> a -> b
$
String -> SerializableBlob
forall a. IsString a => String -> a
fromString (String -> SerializableBlob) -> String -> SerializableBlob
forall a b. (a -> b) -> a -> b
$
String
"In removeLiveQuery no worker thread installed. Please report this as a bug: "
String -> ShowS
forall a. Semigroup a => a -> a -> a
<> LiveQuerySubscriberDetails -> String
forall a. Show a => a -> String
show LiveQuerySubscriberDetails
lqId
else Maybe (IO ()) -> STM (Maybe (IO ()))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IO ())
forall a. Maybe a
Nothing
removeStreamingQuery ::
L.Logger L.Hasura ->
ServerMetrics ->
PrometheusMetrics ->
SubscriptionsState ->
StreamingSubscriberDetails ->
IO ()
removeStreamingQuery :: Logger Hasura
-> ServerMetrics
-> PrometheusMetrics
-> SubscriptionsState
-> StreamingSubscriberDetails
-> IO ()
removeStreamingQuery Logger Hasura
logger ServerMetrics
serverMetrics PrometheusMetrics
prometheusMetrics SubscriptionsState
subscriptionState (SubscriberDetails PollerKey
handlerId (CohortKey
cohortId, TVar CursorVariableValues
cursorVariableTV) SubscriberId
sinkId) = IO () -> IO ()
forall a. IO a -> IO a
mask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Maybe (IO ())
mbCleanupIO <- STM (Maybe (IO ())) -> IO (Maybe (IO ()))
forall a. STM a -> IO a
STM.atomically (STM (Maybe (IO ())) -> IO (Maybe (IO ())))
-> STM (Maybe (IO ())) -> IO (Maybe (IO ()))
forall a b. (a -> b) -> a -> b
$ do
Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))
detM <- PollerMap (TVar CursorVariableValues)
-> STM
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues)))
getQueryDet PollerMap (TVar CursorVariableValues)
streamQMap
(Maybe (Maybe (IO ())) -> Maybe (IO ()))
-> STM (Maybe (Maybe (IO ()))) -> STM (Maybe (IO ()))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe (Maybe (IO ())) -> Maybe (IO ())
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (STM (Maybe (Maybe (IO ()))) -> STM (Maybe (IO ())))
-> STM (Maybe (Maybe (IO ()))) -> STM (Maybe (IO ()))
forall a b. (a -> b) -> a -> b
$
Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))
-> ((Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))
-> STM (Maybe (IO ())))
-> STM (Maybe (Maybe (IO ())))
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))
detM (((Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))
-> STM (Maybe (IO ())))
-> STM (Maybe (Maybe (IO ()))))
-> ((Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))
-> STM (Maybe (IO ())))
-> STM (Maybe (Maybe (IO ())))
forall a b. (a -> b) -> a -> b
$ \(Poller CohortMap (TVar CursorVariableValues)
cohorts TMVar PollerIOState
ioState, CohortKey
currentCohortId, Cohort (TVar CursorVariableValues)
cohort) ->
CohortMap (TVar CursorVariableValues)
-> TMVar PollerIOState
-> (Cohort (TVar CursorVariableValues), CohortKey)
-> STM (Maybe (IO ()))
cleanHandlerC CohortMap (TVar CursorVariableValues)
cohorts TMVar PollerIOState
ioState (Cohort (TVar CursorVariableValues)
cohort, CohortKey
currentCohortId)
Maybe (IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_ Maybe (IO ())
mbCleanupIO
IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveSubscriptions ServerMetrics
serverMetrics
IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> Gauge
pmActiveSubscriptions PrometheusMetrics
prometheusMetrics
IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveStreamingSubscriptions ServerMetrics
serverMetrics
where
streamQMap :: PollerMap (TVar CursorVariableValues)
streamQMap = SubscriptionsState -> PollerMap (TVar CursorVariableValues)
_ssStreamQueryMap SubscriptionsState
subscriptionState
getQueryDet :: PollerMap (TVar CursorVariableValues)
-> STM
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues)))
getQueryDet PollerMap (TVar CursorVariableValues)
subMap = do
Maybe (Poller (TVar CursorVariableValues))
pollerM <- PollerKey
-> PollerMap (TVar CursorVariableValues)
-> STM (Maybe (Poller (TVar CursorVariableValues)))
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM (Maybe value)
STMMap.lookup PollerKey
handlerId PollerMap (TVar CursorVariableValues)
subMap
(CursorVariableValues HashMap Name TxtEncodedVal
currentCohortCursorVal) <- TVar CursorVariableValues -> STM CursorVariableValues
forall a. TVar a -> STM a
STM.readTVar TVar CursorVariableValues
cursorVariableTV
let updatedCohortId :: CohortKey
updatedCohortId = ValidatedVariables (HashMap Name) -> CohortKey -> CohortKey
modifyCursorCohortVariables (HashMap Name TxtEncodedVal -> ValidatedVariables (HashMap Name)
forall (f :: * -> *). f TxtEncodedVal -> ValidatedVariables f
mkUnsafeValidateVariables HashMap Name TxtEncodedVal
currentCohortCursorVal) CohortKey
cohortId
(Maybe
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues)))
-> Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues)))
-> STM
(Maybe
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))))
-> STM
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues)))
-> Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (STM
(Maybe
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))))
-> STM
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))))
-> STM
(Maybe
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))))
-> STM
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues)))
forall a b. (a -> b) -> a -> b
$
Maybe (Poller (TVar CursorVariableValues))
-> (Poller (TVar CursorVariableValues)
-> STM
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))))
-> STM
(Maybe
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))))
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM Maybe (Poller (TVar CursorVariableValues))
pollerM ((Poller (TVar CursorVariableValues)
-> STM
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))))
-> STM
(Maybe
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues)))))
-> (Poller (TVar CursorVariableValues)
-> STM
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))))
-> STM
(Maybe
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))))
forall a b. (a -> b) -> a -> b
$ \Poller (TVar CursorVariableValues)
poller -> do
Maybe (Cohort (TVar CursorVariableValues))
cohortM <- CohortKey
-> CohortMap (TVar CursorVariableValues)
-> STM (Maybe (Cohort (TVar CursorVariableValues)))
forall k v. (Eq k, Hashable k) => k -> TMap k v -> STM (Maybe v)
TMap.lookup CohortKey
updatedCohortId (Poller (TVar CursorVariableValues)
-> CohortMap (TVar CursorVariableValues)
forall streamCursor. Poller streamCursor -> CohortMap streamCursor
_pCohorts Poller (TVar CursorVariableValues)
poller)
Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))
-> STM
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues)))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))
-> STM
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))))
-> Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))
-> STM
(Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues)))
forall a b. (a -> b) -> a -> b
$ (Poller (TVar CursorVariableValues)
poller,CohortKey
updatedCohortId,) (Cohort (TVar CursorVariableValues)
-> (Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues)))
-> Maybe (Cohort (TVar CursorVariableValues))
-> Maybe
(Poller (TVar CursorVariableValues), CohortKey,
Cohort (TVar CursorVariableValues))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (Cohort (TVar CursorVariableValues))
cohortM
cleanHandlerC :: CohortMap (TVar CursorVariableValues)
-> TMVar PollerIOState
-> (Cohort (TVar CursorVariableValues), CohortKey)
-> STM (Maybe (IO ()))
cleanHandlerC CohortMap (TVar CursorVariableValues)
cohortMap TMVar PollerIOState
ioState (Cohort (TVar CursorVariableValues)
handlerC, CohortKey
currentCohortId) = do
let curOps :: TMap SubscriberId Subscriber
curOps = Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
_cExistingSubscribers Cohort (TVar CursorVariableValues)
handlerC
newOps :: TMap SubscriberId Subscriber
newOps = Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
_cNewSubscribers Cohort (TVar CursorVariableValues)
handlerC
SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. (Eq k, Hashable k) => k -> TMap k v -> STM ()
TMap.delete SubscriberId
sinkId TMap SubscriberId Subscriber
curOps
SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. (Eq k, Hashable k) => k -> TMap k v -> STM ()
TMap.delete SubscriberId
sinkId TMap SubscriberId Subscriber
newOps
Bool
cohortIsEmpty <-
Bool -> Bool -> Bool
(&&)
(Bool -> Bool -> Bool) -> STM Bool -> STM (Bool -> Bool)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMap SubscriberId Subscriber -> STM Bool
forall k v. TMap k v -> STM Bool
TMap.null TMap SubscriberId Subscriber
curOps
STM (Bool -> Bool) -> STM Bool -> STM Bool
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TMap SubscriberId Subscriber -> STM Bool
forall k v. TMap k v -> STM Bool
TMap.null TMap SubscriberId Subscriber
newOps
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
cohortIsEmpty (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ CohortKey -> CohortMap (TVar CursorVariableValues) -> STM ()
forall k v. (Eq k, Hashable k) => k -> TMap k v -> STM ()
TMap.delete CohortKey
currentCohortId CohortMap (TVar CursorVariableValues)
cohortMap
Bool
handlerIsEmpty <- CohortMap (TVar CursorVariableValues) -> STM Bool
forall k v. TMap k v -> STM Bool
TMap.null CohortMap (TVar CursorVariableValues)
cohortMap
if Bool
handlerIsEmpty
then do
PollerKey -> PollerMap (TVar CursorVariableValues) -> STM ()
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM ()
STMMap.delete PollerKey
handlerId PollerMap (TVar CursorVariableValues)
streamQMap
Maybe Thread
threadRefM <- (PollerIOState -> Thread) -> Maybe PollerIOState -> Maybe Thread
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap PollerIOState -> Thread
_pThread (Maybe PollerIOState -> Maybe Thread)
-> STM (Maybe PollerIOState) -> STM (Maybe Thread)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMVar PollerIOState -> STM (Maybe PollerIOState)
forall a. TMVar a -> STM (Maybe a)
STM.tryReadTMVar TMVar PollerIOState
ioState
Maybe (IO ()) -> STM (Maybe (IO ()))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (IO ()) -> STM (Maybe (IO ())))
-> Maybe (IO ()) -> STM (Maybe (IO ()))
forall a b. (a -> b) -> a -> b
$
IO () -> Maybe (IO ())
forall a. a -> Maybe a
Just (IO () -> Maybe (IO ())) -> IO () -> Maybe (IO ())
forall a b. (a -> b) -> a -> b
$
case Maybe Thread
threadRefM of
Just Thread
threadRef -> Thread -> IO ()
Immortal.stop Thread
threadRef
Maybe Thread
Nothing ->
Logger Hasura
-> forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
(ToEngineLog a impl, MonadIO m) =>
a -> m ()
L.unLogger Logger Hasura
logger (UnstructuredLog -> IO ()) -> UnstructuredLog -> IO ()
forall a b. (a -> b) -> a -> b
$
LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelError (SerializableBlob -> UnstructuredLog)
-> SerializableBlob -> UnstructuredLog
forall a b. (a -> b) -> a -> b
$
String -> SerializableBlob
forall a. IsString a => String -> a
fromString (String -> SerializableBlob) -> String -> SerializableBlob
forall a b. (a -> b) -> a -> b
$
String
"In removeLiveQuery no worker thread installed. Please report this as a bug: "
String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
" poller_id: "
String -> ShowS
forall a. Semigroup a => a -> a -> a
<> PollerKey -> String
forall a. Show a => a -> String
show PollerKey
handlerId
String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
", cohort_id: "
String -> ShowS
forall a. Semigroup a => a -> a -> a
<> CohortKey -> String
forall a. Show a => a -> String
show CohortKey
cohortId
String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
", subscriber_id:"
String -> ShowS
forall a. Semigroup a => a -> a -> a
<> SubscriberId -> String
forall a. Show a => a -> String
show SubscriberId
sinkId
else Maybe (IO ()) -> STM (Maybe (IO ()))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IO ())
forall a. Maybe a
Nothing
data LiveAsyncActionQueryOnSource = LiveAsyncActionQueryOnSource
{ LiveAsyncActionQueryOnSource -> LiveQuerySubscriberDetails
_laaqpCurrentLqId :: !LiveQuerySubscriberDetails,
LiveAsyncActionQueryOnSource -> ActionLogResponseMap
_laaqpPrevActionLogMap :: !ActionLogResponseMap,
LiveAsyncActionQueryOnSource
-> LiveQuerySubscriberDetails
-> ActionLogResponseMap
-> IO (Maybe LiveQuerySubscriberDetails)
_laaqpRestartLq :: !(LiveQuerySubscriberDetails -> ActionLogResponseMap -> IO (Maybe LiveQuerySubscriberDetails))
}
data LiveAsyncActionQueryWithNoRelationships = LiveAsyncActionQueryWithNoRelationships
{
LiveAsyncActionQueryWithNoRelationships
-> ActionLogResponseMap -> IO ()
_laaqwnrSendResponse :: !(ActionLogResponseMap -> IO ()),
LiveAsyncActionQueryWithNoRelationships -> IO ()
_laaqwnrSendCompleted :: !(IO ())
}
data LiveAsyncActionQuery
= LAAQNoRelationships !LiveAsyncActionQueryWithNoRelationships
| LAAQOnSourceDB !LiveAsyncActionQueryOnSource
data AsyncActionQueryLive = AsyncActionQueryLive
{ AsyncActionQueryLive -> NonEmpty ActionId
_aaqlActionIds :: !(NonEmpty ActionId),
AsyncActionQueryLive -> QErr -> IO ()
_aaqlOnException :: !(QErr -> IO ()),
AsyncActionQueryLive -> LiveAsyncActionQuery
_aaqlLiveExecution :: !LiveAsyncActionQuery
}
type AsyncActionSubscriptionState = TMap.TMap OperationId AsyncActionQueryLive
addAsyncActionLiveQuery ::
AsyncActionSubscriptionState ->
OperationId ->
NonEmpty ActionId ->
(QErr -> IO ()) ->
LiveAsyncActionQuery ->
IO ()
addAsyncActionLiveQuery :: AsyncActionSubscriptionState
-> OperationId
-> NonEmpty ActionId
-> (QErr -> IO ())
-> LiveAsyncActionQuery
-> IO ()
addAsyncActionLiveQuery AsyncActionSubscriptionState
queriesState OperationId
opId NonEmpty ActionId
actionIds QErr -> IO ()
onException LiveAsyncActionQuery
liveQuery =
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
AsyncActionQueryLive
-> OperationId -> AsyncActionSubscriptionState -> STM ()
forall k v. (Eq k, Hashable k) => v -> k -> TMap k v -> STM ()
TMap.insert (NonEmpty ActionId
-> (QErr -> IO ()) -> LiveAsyncActionQuery -> AsyncActionQueryLive
AsyncActionQueryLive NonEmpty ActionId
actionIds QErr -> IO ()
onException LiveAsyncActionQuery
liveQuery) OperationId
opId AsyncActionSubscriptionState
queriesState
removeAsyncActionLiveQuery ::
AsyncActionSubscriptionState -> OperationId -> IO ()
removeAsyncActionLiveQuery :: AsyncActionSubscriptionState -> OperationId -> IO ()
removeAsyncActionLiveQuery AsyncActionSubscriptionState
queriesState OperationId
opId =
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ OperationId -> AsyncActionSubscriptionState -> STM ()
forall k v. (Eq k, Hashable k) => k -> TMap k v -> STM ()
TMap.delete OperationId
opId AsyncActionSubscriptionState
queriesState