{-# LANGUAGE TemplateHaskell #-}

-- | Top-level management of subscription poller threads.
-- The implementation of the polling itself is
-- in "Hasura.GraphQL.Execute.Subscription.Poll". See "Hasura.GraphQL.Execute.Subscription" for high-level
-- details.
module Hasura.GraphQL.Execute.Subscription.State
  ( SubscriptionsState (..),
    initSubscriptionsState,
    dumpSubscriptionsState,
    SubscriberDetails,
    SubscriptionPostPollHook,
    addLiveQuery,
    addStreamSubscriptionQuery,
    removeLiveQuery,
    removeStreamingQuery,
    LiveAsyncActionQueryOnSource (..),
    LiveAsyncActionQueryWithNoRelationships (..),
    LiveAsyncActionQuery (..),
    AsyncActionQueryLive (..),
    AsyncActionSubscriptionState,
    addAsyncActionLiveQuery,
    removeAsyncActionLiveQuery,
    LiveQuerySubscriberDetails,
    StreamingSubscriberDetails,
  )
where

import Control.Concurrent.Extended (forkImmortal, sleep)
import Control.Concurrent.STM qualified as STM
import Control.Exception (mask_)
import Control.Immortal qualified as Immortal
import Data.Aeson.Extended qualified as J
import Data.Aeson.Ordered qualified as JO
import Data.Monoid (Endo)
import Data.String
import Data.Text.Extended
import Data.UUID.V4 qualified as UUID
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Execute.Subscription.Poll
import Hasura.GraphQL.Execute.Subscription.Poll.Common (PollerResponseState (PRSError, PRSSuccess))
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol (OperationName)
import Hasura.GraphQL.Transport.WebSocket.Protocol (OperationId)
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.Types.Action
import Hasura.RQL.Types.Common (SourceName)
import Hasura.SQL.AnyBackend qualified as AB
import Hasura.Server.Metrics (ServerMetrics (..))
import Hasura.Server.Prometheus
  ( DynamicSubscriptionLabel (..),
    PrometheusMetrics (..),
    SubscriptionLabel (..),
    SubscriptionMetrics (..),
    liveQuerySubscriptionLabel,
    recordMetricWithLabel,
    streamingSubscriptionLabel,
  )
import Hasura.Server.Types (GranularPrometheusMetricsState (..), RequestId)
import Language.GraphQL.Draft.Syntax qualified as G
import Refined (unrefine)
import StmContainers.Map qualified as STMMap
import System.Metrics.Gauge qualified as EKG.Gauge
import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
import System.Metrics.Prometheus.GaugeVector qualified as GaugeVector

-- | The top-level datatype that holds the state for all active subscriptions.
--
-- NOTE!: This must be kept consistent with a websocket connection's
-- 'OperationMap', in 'onClose' and 'onStart'.
data SubscriptionsState = SubscriptionsState
  { -- | Pollers for live-query subscriptions. Live-query cohorts carry no
    -- cursor state, hence the @()@ parameter.
    _ssLiveQueryMap :: PollerMap (),
    -- | Pollers for streaming subscriptions. The type parameter is a mutable
    -- reference to the cursor variable values.
    _ssStreamQueryMap :: PollerMap (STM.TVar CursorVariableValues),
    -- | A hook function which is run after each fetch cycle
    _ssPostPollHook :: SubscriptionPostPollHook,
    -- | Bookkeeping for async action subscriptions
    -- (see 'AsyncActionSubscriptionState').
    _ssAsyncActions :: AsyncActionSubscriptionState
  }

-- | Build a fresh 'SubscriptionsState' with empty live-query and streaming
-- poller maps and an empty async-action map, remembering the given post-poll
-- hook. All containers are allocated within a single STM transaction.
initSubscriptionsState :: SubscriptionPostPollHook -> IO SubscriptionsState
initSubscriptionsState pollHook =
  STM.atomically
    $ SubscriptionsState
    <$> STMMap.new -- live query pollers
    <*> STMMap.new -- streaming pollers
    <*> pure pollHook
    <*> TMap.new -- async actions

-- | For dev debugging, output subject to change.
--
-- Serialises the live-query and streaming poller maps (via 'dumpPollerMap',
-- to which the @extended@ flag is forwarded unchanged) together with the
-- options supplied by the caller. The post-poll hook and the async action
-- state are not part of the dump.
dumpSubscriptionsState :: Bool -> LiveQueriesOptions -> StreamQueriesOptions -> SubscriptionsState -> IO J.Value
dumpSubscriptionsState extended liveQOpts streamQOpts (SubscriptionsState lqMap streamMap _ _) = do
  lqMapJ <- dumpPollerMap extended lqMap
  streamMapJ <- dumpPollerMap extended streamMap
  pure
    $ J.object
      [ "options" J..= liveQOpts,
        "live_queries_map" J..= lqMapJ,
        "stream_queries_map" J..= streamMapJ,
        "stream_queries_options" J..= streamQOpts
      ]

-- | SubscriberDetails contains the data required to locate a subscriber
--   in the correct cohort within the correct poller in the operation map.
--
--   The type parameter @a@ is whatever is needed to identify the cohort; it
--   differs between live queries and streaming subscriptions.
data SubscriberDetails a = SubscriberDetails
  { -- | Key of the poller the subscriber was placed in.
    _sdPoller :: BackendPollerKey,
    -- | Locator of the subscriber's cohort within that poller.
    _sdCohort :: a,
    -- | Identifier of the subscriber itself.
    _sdSubscriber :: SubscriberId
  }
  deriving (Show)

-- | 'SubscriberDetails' for a live query: the cohort is located directly by
--   its 'CohortKey'. This is the value handed back by 'addLiveQuery'.
type LiveQuerySubscriberDetails = SubscriberDetails CohortKey

-- | 'SubscriberDetails' for a streaming subscription.
--
--   The `CohortKey` contains the variables with which the subscription was
--   started and which remain unchanged. The second component is the mutable
--   reference through which the latest value of the cursor can be read. Using
--   both the `CohortKey` and the latest cursor value, we locate the subscriber
--   in the operation map to find its details and then stop it.
type StreamingSubscriberDetails = SubscriberDetails (CohortKey, STM.TVar CursorVariableValues)

-- | `findPollerForSubscriber` places a subscriber in the correct poller.
--   If the poller doesn't exist then we create one otherwise we return the
--   existing one.
--
--   The first component of the result is @Just poller@ only when the poller
--   was newly created by this call, and 'Nothing' when an existing poller was
--   reused. The second component is whatever the supplied @addToCohort@ /
--   @addToPoller@ callback returned.
--
--   The per-poller operation-name counter is also maintained here: the count
--   for the given operation name is initialised to 1 or incremented by 1.
findPollerForSubscriber ::
  -- | the subscriber to register
  Subscriber ->
  -- | the map in which the poller is looked up and, if new, inserted
  PollerMap streamCursorVars ->
  -- | key identifying the poller
  BackendPollerKey ->
  -- | key identifying the cohort within the poller
  CohortKey ->
  -- | run when both the poller and the cohort already exist
  (Subscriber -> Cohort streamCursorVars -> STM.STM streamCursorVars) ->
  -- | run when the cohort does not exist yet (existing or brand-new poller)
  (Subscriber -> Poller streamCursorVars -> STM.STM streamCursorVars) ->
  -- | stored in a newly created poller
  ParameterizedQueryHash ->
  -- | operation name of the subscription, counted per poller
  Maybe OperationName ->
  STM.STM ((Maybe (Poller streamCursorVars)), streamCursorVars)
findPollerForSubscriber subscriber pollerMap pollerKey cohortKey addToCohort addToPoller parameterizedQueryHash maybeOperationName =
  -- a handler is returned only when it is newly created
  STMMap.lookup pollerKey pollerMap >>= \case
    Just poller -> do
      -- Found a poller, now check if a cohort also exists
      cursorVars <-
        TMap.lookup cohortKey (_pCohorts poller) >>= \case
          -- cohort found too! Simply add the subscriber to the cohort
          Just cohort -> addToCohort subscriber cohort
          -- cohort not found. Create a cohort with the subscriber and add
          -- the cohort to the poller
          Nothing -> addToPoller subscriber poller
      -- Add the operation name of the subscription to the poller if it doesn't
      -- exist, else increment the count for the operation name
      let operationNames = _pOperationNamesMap poller
      TMap.lookup maybeOperationName operationNames >>= \case
        Nothing -> TMap.insert 1 maybeOperationName operationNames
        Just _ -> TMap.adjust (+ 1) maybeOperationName operationNames
      pure (Nothing, cursorVars)
    Nothing -> do
      -- no poller found, so create one with the cohort
      -- and the subscriber within it.
      operationNamesMap <- TMap.new
      TMap.insert 1 maybeOperationName operationNamesMap
      -- The poller starts in the 'PRSSuccess' state with an empty IO-state
      -- TMVar.
      !poller <-
        Poller
          <$> TMap.new
          <*> STM.newTVar PRSSuccess
          <*> STM.newEmptyTMVar
          <*> pure parameterizedQueryHash
          <*> pure operationNamesMap
      cursorVars <- addToPoller subscriber poller
      STMMap.insert poller pollerKey pollerMap
      pure (Just poller, cursorVars)

-- | Fork a thread handling a regular (live query) subscription
addLiveQuery ::
  forall b.
  (BackendTransport b) =>
  L.Logger L.Hasura ->
  ServerMetrics ->
  PrometheusMetrics ->
  SubscriberMetadata ->
  SubscriptionsState ->
  IO (LiveQueriesOptions, StreamQueriesOptions) ->
  SourceName ->
  ParameterizedQueryHash ->
  -- | operation name of the query
  Maybe OperationName ->
  RequestId ->
  SubscriptionQueryPlan b (MultiplexedQuery b) ->
  IO GranularPrometheusMetricsState ->
  -- | the action to be executed when result changes
  OnChange ->
  (Maybe (Endo JO.Value)) ->
  IO LiveQuerySubscriberDetails
addLiveQuery :: forall (b :: BackendType).
BackendTransport b =>
Logger Hasura
-> ServerMetrics
-> PrometheusMetrics
-> SubscriberMetadata
-> SubscriptionsState
-> IO (LiveQueriesOptions, LiveQueriesOptions)
-> SourceName
-> ParameterizedQueryHash
-> Maybe OperationName
-> RequestId
-> SubscriptionQueryPlan b (MultiplexedQuery b)
-> IO GranularPrometheusMetricsState
-> OnChange
-> Maybe (Endo Value)
-> IO LiveQuerySubscriberDetails
addLiveQuery
  Logger Hasura
logger
  ServerMetrics
serverMetrics
  PrometheusMetrics
prometheusMetrics
  SubscriberMetadata
subscriberMetadata
  SubscriptionsState
subscriptionState
  IO (LiveQueriesOptions, LiveQueriesOptions)
getSubscriptionOptions
  SourceName
source
  ParameterizedQueryHash
parameterizedQueryHash
  Maybe OperationName
operationName
  RequestId
requestId
  SubscriptionQueryPlan b (MultiplexedQuery b)
plan
  IO GranularPrometheusMetricsState
granularPrometheusMetricsState
  OnChange
onResultAction
  Maybe (Endo Value)
modifier = do
    -- CAREFUL!: It's absolutely crucial that we can't throw any exceptions here!

    -- disposable subscriber UUID:
    SubscriberId
subscriberId <- IO SubscriberId
newSubscriberId
    let !subscriber :: Subscriber
subscriber = SubscriberId
-> SubscriberMetadata
-> RequestId
-> Maybe OperationName
-> OnChange
-> Subscriber
Subscriber SubscriberId
subscriberId SubscriberMetadata
subscriberMetadata RequestId
requestId Maybe OperationName
operationName OnChange
onResultAction

    $String
String -> Subscriber -> IO ()
forall a. String -> a -> IO ()
assertNFHere Subscriber
subscriber -- so we don't write thunks to mutable vars
    (Maybe (Poller ())
pollerMaybe, ()) <-
      STM (Maybe (Poller ()), ()) -> IO (Maybe (Poller ()), ())
forall a. STM a -> IO a
STM.atomically
        (STM (Maybe (Poller ()), ()) -> IO (Maybe (Poller ()), ()))
-> STM (Maybe (Poller ()), ()) -> IO (Maybe (Poller ()), ())
forall a b. (a -> b) -> a -> b
$ Subscriber
-> PollerMap ()
-> BackendPollerKey
-> CohortVariables
-> (Subscriber -> Cohort () -> STM ())
-> (Subscriber -> Poller () -> STM ())
-> ParameterizedQueryHash
-> Maybe OperationName
-> STM (Maybe (Poller ()), ())
forall streamCursorVars.
Subscriber
-> PollerMap streamCursorVars
-> BackendPollerKey
-> CohortVariables
-> (Subscriber -> Cohort streamCursorVars -> STM streamCursorVars)
-> (Subscriber -> Poller streamCursorVars -> STM streamCursorVars)
-> ParameterizedQueryHash
-> Maybe OperationName
-> STM (Maybe (Poller streamCursorVars), streamCursorVars)
findPollerForSubscriber
          Subscriber
subscriber
          PollerMap ()
lqMap
          BackendPollerKey
handlerId
          CohortVariables
cohortKey
          Subscriber -> Cohort () -> STM ()
forall {streamCursorVars}.
Subscriber -> Cohort streamCursorVars -> STM ()
addToCohort
          Subscriber -> Poller () -> STM ()
addToPoller
          ParameterizedQueryHash
parameterizedQueryHash
          Maybe OperationName
operationName

    -- we can then attach a polling thread if it is new the livequery can only be
    -- cancelled after putTMVar
    Maybe (Poller ()) -> (Poller () -> IO ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe (Poller ())
pollerMaybe ((Poller () -> IO ()) -> IO ()) -> (Poller () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Poller ()
poller -> do
      PollerId
pollerId <- UUID -> PollerId
PollerId (UUID -> PollerId) -> IO UUID -> IO PollerId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
      Thread
threadRef <- String -> Logger Hasura -> IO Void -> IO Thread
forall (m :: * -> *).
ForkableMonadIO m =>
String -> Logger Hasura -> m Void -> m Thread
forkImmortal (String
"pollLiveQuery." String -> ShowS
forall a. Semigroup a => a -> a -> a
<> PollerId -> String
forall a. Show a => a -> String
show PollerId
pollerId) Logger Hasura
logger
        (IO Void -> IO Thread) -> IO Void -> IO Thread
forall a b. (a -> b) -> a -> b
$ IO () -> IO Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever
        (IO () -> IO Void) -> IO () -> IO Void
forall a b. (a -> b) -> a -> b
$ do
          (LiveQueriesOptions
lqOpts, LiveQueriesOptions
_) <- IO (LiveQueriesOptions, LiveQueriesOptions)
getSubscriptionOptions
          let SubscriptionsOptions BatchSize
_ RefetchInterval
refetchInterval = LiveQueriesOptions
lqOpts
          forall (b :: BackendType).
BackendTransport b =>
PollerId
-> TVar PollerResponseState
-> LiveQueriesOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap 'LiveQuery
-> SubscriptionPostPollHook
-> PrometheusMetrics
-> IO GranularPrometheusMetricsState
-> TMap (Maybe OperationName) Int
-> ResolvedConnectionTemplate b
-> Maybe (Endo Value)
-> IO ()
pollLiveQuery @b
            PollerId
pollerId
            (Poller () -> TVar PollerResponseState
forall streamCursor.
Poller streamCursor -> TVar PollerResponseState
_pPollerState Poller ()
poller)
            LiveQueriesOptions
lqOpts
            (SourceName
source, SourceConfig b
sourceConfig)
            RoleName
role
            ParameterizedQueryHash
parameterizedQueryHash
            MultiplexedQuery b
query
            (Poller () -> CohortMap ()
forall streamCursor. Poller streamCursor -> CohortMap streamCursor
_pCohorts Poller ()
poller)
            SubscriptionPostPollHook
postPollHook
            PrometheusMetrics
prometheusMetrics
            IO GranularPrometheusMetricsState
granularPrometheusMetricsState
            (Poller () -> TMap (Maybe OperationName) Int
forall streamCursor.
Poller streamCursor -> TMap (Maybe OperationName) Int
_pOperationNamesMap Poller ()
poller)
            ResolvedConnectionTemplate b
resolvedConnectionTemplate
            Maybe (Endo Value)
modifier
          DiffTime -> IO ()
sleep (DiffTime -> IO ()) -> DiffTime -> IO ()
forall a b. (a -> b) -> a -> b
$ Refined NonNegative DiffTime -> DiffTime
forall {k} (p :: k) x. Refined p x -> x
unrefine (Refined NonNegative DiffTime -> DiffTime)
-> Refined NonNegative DiffTime -> DiffTime
forall a b. (a -> b) -> a -> b
$ RefetchInterval -> Refined NonNegative DiffTime
unRefetchInterval RefetchInterval
refetchInterval
      let !pState :: PollerIOState
pState = Thread -> PollerId -> PollerIOState
PollerIOState Thread
threadRef PollerId
pollerId
      $String
String -> PollerIOState -> IO ()
forall a. String -> a -> IO ()
assertNFHere PollerIOState
pState -- so we don't write thunks to mutable vars
      STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar PollerIOState -> PollerIOState -> STM ()
forall a. TMVar a -> a -> STM ()
STM.putTMVar (Poller () -> TMVar PollerIOState
forall streamCursor. Poller streamCursor -> TMVar PollerIOState
_pIOState Poller ()
poller) PollerIOState
pState
      IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ SubscriptionMetrics -> Gauge
submActiveLiveQueryPollers (SubscriptionMetrics -> Gauge) -> SubscriptionMetrics -> Gauge
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics (PrometheusMetrics -> SubscriptionMetrics)
-> PrometheusMetrics -> SubscriptionMetrics
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics
prometheusMetrics

    IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveSubscriptions ServerMetrics
serverMetrics
    let promMetricGranularLabel :: SubscriptionLabel
promMetricGranularLabel = SubscriptionKindLabel
-> Maybe DynamicSubscriptionLabel -> SubscriptionLabel
SubscriptionLabel SubscriptionKindLabel
liveQuerySubscriptionLabel (DynamicSubscriptionLabel -> Maybe DynamicSubscriptionLabel
forall a. a -> Maybe a
Just (DynamicSubscriptionLabel -> Maybe DynamicSubscriptionLabel)
-> DynamicSubscriptionLabel -> Maybe DynamicSubscriptionLabel
forall a b. (a -> b) -> a -> b
$ ParameterizedQueryHash
-> Maybe OperationName -> DynamicSubscriptionLabel
DynamicSubscriptionLabel ParameterizedQueryHash
parameterizedQueryHash Maybe OperationName
operationName)
        promMetricLabel :: SubscriptionLabel
promMetricLabel = SubscriptionKindLabel
-> Maybe DynamicSubscriptionLabel -> SubscriptionLabel
SubscriptionLabel SubscriptionKindLabel
liveQuerySubscriptionLabel Maybe DynamicSubscriptionLabel
forall a. Maybe a
Nothing
    let numSubscriptionMetric :: GaugeVector SubscriptionLabel
numSubscriptionMetric = SubscriptionMetrics -> GaugeVector SubscriptionLabel
submActiveSubscriptions (SubscriptionMetrics -> GaugeVector SubscriptionLabel)
-> SubscriptionMetrics -> GaugeVector SubscriptionLabel
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics (PrometheusMetrics -> SubscriptionMetrics)
-> PrometheusMetrics -> SubscriptionMetrics
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics
prometheusMetrics
    IO GranularPrometheusMetricsState
-> Bool -> IO () -> IO () -> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState -> Bool -> IO () -> IO () -> m ()
recordMetricWithLabel
      IO GranularPrometheusMetricsState
granularPrometheusMetricsState
      Bool
True
      (GaugeVector SubscriptionLabel -> SubscriptionLabel -> IO ()
forall label. Ord label => GaugeVector label -> label -> IO ()
GaugeVector.inc GaugeVector SubscriptionLabel
numSubscriptionMetric SubscriptionLabel
promMetricGranularLabel)
      (GaugeVector SubscriptionLabel -> SubscriptionLabel -> IO ()
forall label. Ord label => GaugeVector label -> label -> IO ()
GaugeVector.inc GaugeVector SubscriptionLabel
numSubscriptionMetric SubscriptionLabel
promMetricLabel)
    IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveLiveQueries ServerMetrics
serverMetrics

    LiveQuerySubscriberDetails -> IO LiveQuerySubscriberDetails
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LiveQuerySubscriberDetails -> IO LiveQuerySubscriberDetails)
-> LiveQuerySubscriberDetails -> IO LiveQuerySubscriberDetails
forall a b. (a -> b) -> a -> b
$ BackendPollerKey
-> CohortVariables -> SubscriberId -> LiveQuerySubscriberDetails
forall a.
BackendPollerKey -> a -> SubscriberId -> SubscriberDetails a
SubscriberDetails BackendPollerKey
handlerId CohortVariables
cohortKey SubscriberId
subscriberId
    where
      SubscriptionsState PollerMap ()
lqMap PollerMap (TVar CursorVariableValues)
_ SubscriptionPostPollHook
postPollHook AsyncActionSubscriptionState
_ = SubscriptionsState
subscriptionState
      SubscriptionQueryPlan (ParameterizedSubscriptionQueryPlan RoleName
role MultiplexedQuery b
query) SourceConfig b
sourceConfig CohortId
cohortId ResolvedConnectionTemplate b
resolvedConnectionTemplate CohortVariables
cohortKey Maybe Name
_ = SubscriptionQueryPlan b (MultiplexedQuery b)
plan

      handlerId :: BackendPollerKey
handlerId = AnyBackend PollerKey -> BackendPollerKey
BackendPollerKey (AnyBackend PollerKey -> BackendPollerKey)
-> AnyBackend PollerKey -> BackendPollerKey
forall a b. (a -> b) -> a -> b
$ forall (b :: BackendType) (i :: BackendType -> *).
HasTag b =>
i b -> AnyBackend i
AB.mkAnyBackend @b (PollerKey b -> AnyBackend PollerKey)
-> PollerKey b -> AnyBackend PollerKey
forall a b. (a -> b) -> a -> b
$ SourceName
-> RoleName
-> Text
-> ResolvedConnectionTemplate b
-> ParameterizedQueryHash
-> PollerKey b
forall (b :: BackendType).
SourceName
-> RoleName
-> Text
-> ResolvedConnectionTemplate b
-> ParameterizedQueryHash
-> PollerKey b
PollerKey SourceName
source RoleName
role (MultiplexedQuery b -> Text
forall a. ToTxt a => a -> Text
toTxt MultiplexedQuery b
query) ResolvedConnectionTemplate b
resolvedConnectionTemplate ParameterizedQueryHash
parameterizedQueryHash

      addToCohort :: Subscriber -> Cohort streamCursorVars -> STM ()
addToCohort Subscriber
subscriber Cohort streamCursorVars
handlerC =
        Subscriber
-> SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. Hashable k => v -> k -> TMap k v -> STM ()
TMap.insert Subscriber
subscriber (Subscriber -> SubscriberId
_sId Subscriber
subscriber) (TMap SubscriberId Subscriber -> STM ())
-> TMap SubscriberId Subscriber -> STM ()
forall a b. (a -> b) -> a -> b
$ Cohort streamCursorVars -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
_cNewSubscribers Cohort streamCursorVars
handlerC

      addToPoller :: Subscriber -> Poller () -> STM ()
addToPoller Subscriber
subscriber Poller ()
handler = do
        !Cohort ()
newCohort <-
          CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> ()
-> Cohort ()
forall streamCursorVars.
CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> streamCursorVars
-> Cohort streamCursorVars
Cohort CohortId
cohortId
            (TVar (Maybe ResponseHash)
 -> TMap SubscriberId Subscriber
 -> TMap SubscriberId Subscriber
 -> ()
 -> Cohort ())
-> STM (TVar (Maybe ResponseHash))
-> STM
     (TMap SubscriberId Subscriber
      -> TMap SubscriberId Subscriber -> () -> Cohort ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe ResponseHash -> STM (TVar (Maybe ResponseHash))
forall a. a -> STM (TVar a)
STM.newTVar Maybe ResponseHash
forall a. Maybe a
Nothing
            STM
  (TMap SubscriberId Subscriber
   -> TMap SubscriberId Subscriber -> () -> Cohort ())
-> STM (TMap SubscriberId Subscriber)
-> STM (TMap SubscriberId Subscriber -> () -> Cohort ())
forall a b. STM (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new
            STM (TMap SubscriberId Subscriber -> () -> Cohort ())
-> STM (TMap SubscriberId Subscriber) -> STM (() -> Cohort ())
forall a b. STM (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new
            STM (() -> Cohort ()) -> STM () -> STM (Cohort ())
forall a b. STM (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> () -> STM ()
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        Subscriber -> Cohort () -> STM ()
forall {streamCursorVars}.
Subscriber -> Cohort streamCursorVars -> STM ()
addToCohort Subscriber
subscriber Cohort ()
newCohort
        Cohort () -> CohortVariables -> CohortMap () -> STM ()
forall k v. Hashable k => v -> k -> TMap k v -> STM ()
TMap.insert Cohort ()
newCohort CohortVariables
cohortKey (CohortMap () -> STM ()) -> CohortMap () -> STM ()
forall a b. (a -> b) -> a -> b
$ Poller () -> CohortMap ()
forall streamCursor. Poller streamCursor -> CohortMap streamCursor
_pCohorts Poller ()
handler

-- | Fork a thread handling a streaming subscription.
--
-- Registers a fresh 'Subscriber' in the streaming poller map of the given
-- 'SubscriptionsState' via 'findPollerForSubscriber'. When that call hands
-- back a poller, an immortal thread is forked for it which repeatedly runs
-- 'pollStreamingQuery' and then sleeps for the configured refetch interval.
-- Finally the EKG and Prometheus subscription gauges are incremented and the
-- details identifying the new subscriber (poller key, cohort key together
-- with the cohort's cursor 'STM.TVar', and subscriber id) are returned.
addStreamSubscriptionQuery ::
  forall b.
  (BackendTransport b) =>
  L.Logger L.Hasura ->
  ServerMetrics ->
  PrometheusMetrics ->
  SubscriberMetadata ->
  SubscriptionsState ->
  IO (LiveQueriesOptions, StreamQueriesOptions) ->
  SourceName ->
  ParameterizedQueryHash ->
  -- | operation name of the query
  Maybe OperationName ->
  RequestId ->
  -- | root field name
  G.Name ->
  SubscriptionQueryPlan b (MultiplexedQuery b) ->
  IO GranularPrometheusMetricsState ->
  -- | the action to be executed when result changes
  OnChange ->
  -- | the modifier for adding typename for namespaced queries
  (Maybe (Endo JO.Value)) ->
  IO StreamingSubscriberDetails
addStreamSubscriptionQuery
  logger
  serverMetrics
  prometheusMetrics
  subscriberMetadata
  subscriptionState
  getSubscriptionOptions
  source
  parameterizedQueryHash
  operationName
  requestId
  rootFieldName
  plan
  granularPrometheusMetricsState
  onResultAction
  modifier = do
    -- CAREFUL!: It's absolutely crucial that we can't throw any exceptions here!

    -- disposable subscriber UUID:
    subscriberId <- newSubscriberId
    let !subscriber = Subscriber subscriberId subscriberMetadata requestId operationName onResultAction

    $assertNFHere subscriber -- so we don't write thunks to mutable vars
    -- In a single transaction, register the subscriber with its cohort/poller
    -- and obtain the cohort's cursor TVar.
    (handlerM, cohortCursorTVar) <-
      STM.atomically
        $ findPollerForSubscriber
          subscriber
          streamQueryMap
          handlerId
          cohortKey
          addToCohort
          addToPoller
          parameterizedQueryHash
          operationName

    -- If a poller was handed back (i.e. it is new), attach a polling thread to
    -- it. The subscription can only be cancelled after the 'STM.putTMVar' below.
    for_ handlerM $ \handler -> do
      pollerId <- PollerId <$> UUID.nextRandom
      threadRef <- forkImmortal ("pollStreamingQuery." <> show (unPollerId pollerId)) logger
        $ forever
        $ do
          -- Options are re-read on every iteration of the polling loop.
          (_, streamQOpts) <- getSubscriptionOptions
          let SubscriptionsOptions _ refetchInterval = streamQOpts
          pollStreamingQuery @b
            pollerId
            (_pPollerState handler)
            streamQOpts
            (source, sourceConfig)
            role
            parameterizedQueryHash
            query
            (_pCohorts handler)
            rootFieldName
            postPollHook
            Nothing
            prometheusMetrics
            granularPrometheusMetricsState
            (_pOperationNamesMap handler)
            resolvedConnectionTemplate
            modifier
          sleep $ unrefine $ unRefetchInterval refetchInterval
      let !pState = PollerIOState threadRef pollerId
      $assertNFHere pState -- so we don't write thunks to mutable vars
      STM.atomically $ STM.putTMVar (_pIOState handler) pState
      liftIO $ Prometheus.Gauge.inc $ submActiveStreamingPollers $ pmSubscriptionMetrics $ prometheusMetrics

    liftIO $ do
      EKG.Gauge.inc $ smActiveSubscriptions serverMetrics
      EKG.Gauge.inc $ smActiveStreamingSubscriptions serverMetrics

    let promMetricGranularLabel = SubscriptionLabel streamingSubscriptionLabel (Just $ DynamicSubscriptionLabel parameterizedQueryHash operationName)
        promMetricLabel = SubscriptionLabel streamingSubscriptionLabel Nothing
        numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics
    recordMetricWithLabel
      granularPrometheusMetricsState
      True
      (GaugeVector.inc numSubscriptionMetric promMetricGranularLabel)
      (GaugeVector.inc numSubscriptionMetric promMetricLabel)

    pure $ SubscriberDetails handlerId (cohortKey, cohortCursorTVar) subscriberId
    where
      SubscriptionsState _ streamQueryMap postPollHook _ = subscriptionState
      SubscriptionQueryPlan (ParameterizedSubscriptionQueryPlan role query) sourceConfig cohortId resolvedConnectionTemplate cohortKey _ = plan

      -- Key identifying the poller: source, role, query text, resolved
      -- connection template and parameterized query hash.
      handlerId = BackendPollerKey $ AB.mkAnyBackend @b $ PollerKey source role (toTxt query) resolvedConnectionTemplate parameterizedQueryHash

      -- Add the subscriber to the cohort's new-subscribers map and return the
      -- cohort's stream cursor variables.
      addToCohort subscriber handlerC = do
        TMap.insert subscriber (_sId subscriber) $ _cNewSubscribers handlerC
        pure $ _cStreamCursorVariables handlerC

      -- Create a new cohort (cursor TVar seeded from the cohort key's cursor
      -- variables, no response hash yet, two empty subscriber maps), add the
      -- subscriber to it, and insert it into the poller's cohort map under
      -- 'cohortKey'.
      addToPoller subscriber handler = do
        latestCursorValues <-
          STM.newTVar (CursorVariableValues (_unValidatedVariables (_cvCursorVariables cohortKey)))
        !newCohort <- Cohort cohortId <$> STM.newTVar Nothing <*> TMap.new <*> TMap.new <*> pure latestCursorValues
        cohortCursorVals <- addToCohort subscriber newCohort
        TMap.insert newCohort cohortKey $ _pCohorts handler
        pure cohortCursorVals

removeLiveQuery ::
  L.Logger L.Hasura ->
  ServerMetrics ->
  PrometheusMetrics ->
  SubscriptionsState ->
  -- the query and the associated operation
  LiveQuerySubscriberDetails ->
  IO GranularPrometheusMetricsState ->
  Maybe OperationName ->
  IO ()
removeLiveQuery :: Logger Hasura
-> ServerMetrics
-> PrometheusMetrics
-> SubscriptionsState
-> LiveQuerySubscriberDetails
-> IO GranularPrometheusMetricsState
-> Maybe OperationName
-> IO ()
removeLiveQuery Logger Hasura
logger ServerMetrics
serverMetrics PrometheusMetrics
prometheusMetrics SubscriptionsState
lqState lqId :: LiveQuerySubscriberDetails
lqId@(SubscriberDetails BackendPollerKey
handlerId CohortVariables
cohortId SubscriberId
sinkId) IO GranularPrometheusMetricsState
granularPrometheusMetricsState Maybe OperationName
maybeOperationName = IO () -> IO ()
forall a. IO a -> IO a
mask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
  IO (IO ()) -> IO ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join
    (IO (IO ()) -> IO ()) -> IO (IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ STM (IO ()) -> IO (IO ())
forall a. STM a -> IO a
STM.atomically
    (STM (IO ()) -> IO (IO ())) -> STM (IO ()) -> IO (IO ())
forall a b. (a -> b) -> a -> b
$ do
      Maybe (Poller (), Cohort ())
detM <- PollerMap () -> STM (Maybe (Poller (), Cohort ()))
getQueryDet PollerMap ()
lqMap
      case Maybe (Poller (), Cohort ())
detM of
        Maybe (Poller (), Cohort ())
Nothing -> IO () -> STM (IO ())
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
        Just (Poller CohortMap ()
cohorts TVar PollerResponseState
pollerState TMVar PollerIOState
ioState ParameterizedQueryHash
parameterizedQueryHash TMap (Maybe OperationName) Int
operationNamesMap, Cohort ()
cohort) -> do
          Maybe OperationName
-> TMap (Maybe OperationName) Int -> STM (Maybe Int)
forall k v. Hashable k => k -> TMap k v -> STM (Maybe v)
TMap.lookup Maybe OperationName
maybeOperationName TMap (Maybe OperationName) Int
operationNamesMap STM (Maybe Int) -> (Maybe Int -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
            -- If only one operation name is present in the map, delete it
            Just Int
1 -> Maybe OperationName -> TMap (Maybe OperationName) Int -> STM ()
forall k v. Hashable k => k -> TMap k v -> STM ()
TMap.delete Maybe OperationName
maybeOperationName TMap (Maybe OperationName) Int
operationNamesMap
            -- If the count of an operation name is more than 1, then there are
            -- more subscriptions with the same name, and we should keep emitting
            -- the metrics until all the subscriptions with that operation name
            -- are removed
            Just Int
_ -> (Int -> Int)
-> Maybe OperationName -> TMap (Maybe OperationName) Int -> STM ()
forall k v. Hashable k => (v -> v) -> k -> TMap k v -> STM ()
TMap.adjust (\Int
v -> Int
v Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1) Maybe OperationName
maybeOperationName TMap (Maybe OperationName) Int
operationNamesMap
            Maybe Int
Nothing -> () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
          CohortMap ()
-> TVar PollerResponseState
-> TMVar PollerIOState
-> Cohort ()
-> ParameterizedQueryHash
-> STM (IO ())
cleanHandlerC CohortMap ()
cohorts TVar PollerResponseState
pollerState TMVar PollerIOState
ioState Cohort ()
cohort ParameterizedQueryHash
parameterizedQueryHash
  IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveSubscriptions ServerMetrics
serverMetrics
  IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveLiveQueries ServerMetrics
serverMetrics
  where
    lqMap :: PollerMap ()
lqMap = SubscriptionsState -> PollerMap ()
_ssLiveQueryMap SubscriptionsState
lqState

    getQueryDet :: PollerMap () -> STM (Maybe (Poller (), Cohort ()))
getQueryDet PollerMap ()
subMap = do
      Maybe (Poller ())
pollerM <- BackendPollerKey -> PollerMap () -> STM (Maybe (Poller ()))
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM (Maybe value)
STMMap.lookup BackendPollerKey
handlerId PollerMap ()
subMap
      (Maybe (Maybe (Poller (), Cohort ()))
 -> Maybe (Poller (), Cohort ()))
-> STM (Maybe (Maybe (Poller (), Cohort ())))
-> STM (Maybe (Poller (), Cohort ()))
forall a b. (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe (Maybe (Poller (), Cohort ()))
-> Maybe (Poller (), Cohort ())
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join
        (STM (Maybe (Maybe (Poller (), Cohort ())))
 -> STM (Maybe (Poller (), Cohort ())))
-> STM (Maybe (Maybe (Poller (), Cohort ())))
-> STM (Maybe (Poller (), Cohort ()))
forall a b. (a -> b) -> a -> b
$ Maybe (Poller ())
-> (Poller () -> STM (Maybe (Poller (), Cohort ())))
-> STM (Maybe (Maybe (Poller (), Cohort ())))
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM Maybe (Poller ())
pollerM
        ((Poller () -> STM (Maybe (Poller (), Cohort ())))
 -> STM (Maybe (Maybe (Poller (), Cohort ()))))
-> (Poller () -> STM (Maybe (Poller (), Cohort ())))
-> STM (Maybe (Maybe (Poller (), Cohort ())))
forall a b. (a -> b) -> a -> b
$ \Poller ()
poller -> do
          Maybe (Cohort ())
cohortM <- CohortVariables -> CohortMap () -> STM (Maybe (Cohort ()))
forall k v. Hashable k => k -> TMap k v -> STM (Maybe v)
TMap.lookup CohortVariables
cohortId (Poller () -> CohortMap ()
forall streamCursor. Poller streamCursor -> CohortMap streamCursor
_pCohorts Poller ()
poller)
          Maybe (Poller (), Cohort ()) -> STM (Maybe (Poller (), Cohort ()))
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (Poller (), Cohort ())
 -> STM (Maybe (Poller (), Cohort ())))
-> Maybe (Poller (), Cohort ())
-> STM (Maybe (Poller (), Cohort ()))
forall a b. (a -> b) -> a -> b
$ (Poller ()
poller,) (Cohort () -> (Poller (), Cohort ()))
-> Maybe (Cohort ()) -> Maybe (Poller (), Cohort ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (Cohort ())
cohortM

    cleanHandlerC :: CohortMap ()
-> TVar PollerResponseState
-> TMVar PollerIOState
-> Cohort ()
-> ParameterizedQueryHash
-> STM (IO ())
cleanHandlerC CohortMap ()
cohortMap TVar PollerResponseState
pollerState TMVar PollerIOState
ioState Cohort ()
handlerC ParameterizedQueryHash
parameterizedQueryHash = do
      let curOps :: TMap SubscriberId Subscriber
curOps = Cohort () -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
_cExistingSubscribers Cohort ()
handlerC
          newOps :: TMap SubscriberId Subscriber
newOps = Cohort () -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
_cNewSubscribers Cohort ()
handlerC
      SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. Hashable k => k -> TMap k v -> STM ()
TMap.delete SubscriberId
sinkId TMap SubscriberId Subscriber
curOps
      SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. Hashable k => k -> TMap k v -> STM ()
TMap.delete SubscriberId
sinkId TMap SubscriberId Subscriber
newOps
      Bool
cohortIsEmpty <-
        Bool -> Bool -> Bool
(&&)
          (Bool -> Bool -> Bool) -> STM Bool -> STM (Bool -> Bool)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMap SubscriberId Subscriber -> STM Bool
forall k v. TMap k v -> STM Bool
TMap.null TMap SubscriberId Subscriber
curOps
          STM (Bool -> Bool) -> STM Bool -> STM Bool
forall a b. STM (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TMap SubscriberId Subscriber -> STM Bool
forall k v. TMap k v -> STM Bool
TMap.null TMap SubscriberId Subscriber
newOps
      Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
cohortIsEmpty (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ CohortVariables -> CohortMap () -> STM ()
forall k v. Hashable k => k -> TMap k v -> STM ()
TMap.delete CohortVariables
cohortId CohortMap ()
cohortMap
      Bool
handlerIsEmpty <- CohortMap () -> STM Bool
forall k v. TMap k v -> STM Bool
TMap.null CohortMap ()
cohortMap
      let promMetricGranularLabel :: SubscriptionLabel
promMetricGranularLabel = SubscriptionKindLabel
-> Maybe DynamicSubscriptionLabel -> SubscriptionLabel
SubscriptionLabel SubscriptionKindLabel
liveQuerySubscriptionLabel (DynamicSubscriptionLabel -> Maybe DynamicSubscriptionLabel
forall a. a -> Maybe a
Just (DynamicSubscriptionLabel -> Maybe DynamicSubscriptionLabel)
-> DynamicSubscriptionLabel -> Maybe DynamicSubscriptionLabel
forall a b. (a -> b) -> a -> b
$ ParameterizedQueryHash
-> Maybe OperationName -> DynamicSubscriptionLabel
DynamicSubscriptionLabel ParameterizedQueryHash
parameterizedQueryHash Maybe OperationName
maybeOperationName)
          promMetricLabel :: SubscriptionLabel
promMetricLabel = SubscriptionKindLabel
-> Maybe DynamicSubscriptionLabel -> SubscriptionLabel
SubscriptionLabel SubscriptionKindLabel
liveQuerySubscriptionLabel Maybe DynamicSubscriptionLabel
forall a. Maybe a
Nothing
      -- when there is no need for handler i.e, this happens to be the last
      -- operation, take the ref for the polling thread to cancel it
      if Bool
handlerIsEmpty
        then do
          BackendPollerKey -> PollerMap () -> STM ()
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM ()
STMMap.delete BackendPollerKey
handlerId PollerMap ()
lqMap
          Maybe Thread
threadRefM <- (PollerIOState -> Thread) -> Maybe PollerIOState -> Maybe Thread
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap PollerIOState -> Thread
_pThread (Maybe PollerIOState -> Maybe Thread)
-> STM (Maybe PollerIOState) -> STM (Maybe Thread)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMVar PollerIOState -> STM (Maybe PollerIOState)
forall a. TMVar a -> STM (Maybe a)
STM.tryReadTMVar TMVar PollerIOState
ioState
          IO () -> STM (IO ())
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return
            (IO () -> STM (IO ())) -> IO () -> STM (IO ())
forall a b. (a -> b) -> a -> b
$
            -- deferred IO:
            case Maybe Thread
threadRefM of
              Just Thread
threadRef -> do
                Thread -> IO ()
Immortal.stop Thread
threadRef
                IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                  PollerResponseState
pollerLastState <- TVar PollerResponseState -> IO PollerResponseState
forall a. TVar a -> IO a
STM.readTVarIO TVar PollerResponseState
pollerState
                  Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (PollerResponseState
pollerLastState PollerResponseState -> PollerResponseState -> Bool
forall a. Eq a => a -> a -> Bool
== PollerResponseState
PRSError)
                    (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.dec
                    (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ SubscriptionMetrics -> Gauge
submActiveLiveQueryPollersInError
                    (SubscriptionMetrics -> Gauge) -> SubscriptionMetrics -> Gauge
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics PrometheusMetrics
prometheusMetrics
                  Gauge -> IO ()
Prometheus.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ SubscriptionMetrics -> Gauge
submActiveLiveQueryPollers (SubscriptionMetrics -> Gauge) -> SubscriptionMetrics -> Gauge
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics PrometheusMetrics
prometheusMetrics
                  let numSubscriptionMetric :: GaugeVector SubscriptionLabel
numSubscriptionMetric = SubscriptionMetrics -> GaugeVector SubscriptionLabel
submActiveSubscriptions (SubscriptionMetrics -> GaugeVector SubscriptionLabel)
-> SubscriptionMetrics -> GaugeVector SubscriptionLabel
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics (PrometheusMetrics -> SubscriptionMetrics)
-> PrometheusMetrics -> SubscriptionMetrics
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics
prometheusMetrics
                  IO GranularPrometheusMetricsState
-> Bool -> IO () -> IO () -> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState -> Bool -> IO () -> IO () -> m ()
recordMetricWithLabel
                    IO GranularPrometheusMetricsState
granularPrometheusMetricsState
                    Bool
True
                    (GaugeVector SubscriptionLabel -> SubscriptionLabel -> IO ()
forall label. Ord label => GaugeVector label -> label -> IO ()
GaugeVector.dec GaugeVector SubscriptionLabel
numSubscriptionMetric SubscriptionLabel
promMetricGranularLabel)
                    (GaugeVector SubscriptionLabel -> SubscriptionLabel -> IO ()
forall label. Ord label => GaugeVector label -> label -> IO ()
GaugeVector.dec GaugeVector SubscriptionLabel
numSubscriptionMetric SubscriptionLabel
promMetricLabel)
              -- This would seem to imply addLiveQuery broke or a bug
              -- elsewhere. Be paranoid and log:
              Maybe Thread
Nothing ->
                Logger Hasura
-> forall a (m :: * -> *).
   (ToEngineLog a Hasura, MonadIO m) =>
   a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
   (ToEngineLog a impl, MonadIO m) =>
   a -> m ()
L.unLogger Logger Hasura
logger
                  (UnstructuredLog -> IO ()) -> UnstructuredLog -> IO ()
forall a b. (a -> b) -> a -> b
$ LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelError
                  (SerializableBlob -> UnstructuredLog)
-> SerializableBlob -> UnstructuredLog
forall a b. (a -> b) -> a -> b
$ String -> SerializableBlob
forall a. IsString a => String -> a
fromString
                  (String -> SerializableBlob) -> String -> SerializableBlob
forall a b. (a -> b) -> a -> b
$ String
"In removeLiveQuery no worker thread installed. Please report this as a bug: "
                  String -> ShowS
forall a. Semigroup a => a -> a -> a
<> LiveQuerySubscriberDetails -> String
forall a. Show a => a -> String
show LiveQuerySubscriberDetails
lqId
        else do
          let numSubscriptionMetric :: GaugeVector SubscriptionLabel
numSubscriptionMetric = SubscriptionMetrics -> GaugeVector SubscriptionLabel
submActiveSubscriptions (SubscriptionMetrics -> GaugeVector SubscriptionLabel)
-> SubscriptionMetrics -> GaugeVector SubscriptionLabel
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics (PrometheusMetrics -> SubscriptionMetrics)
-> PrometheusMetrics -> SubscriptionMetrics
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics
prometheusMetrics
          IO () -> STM (IO ())
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return
            (IO () -> STM (IO ())) -> IO () -> STM (IO ())
forall a b. (a -> b) -> a -> b
$ IO GranularPrometheusMetricsState
-> Bool -> IO () -> IO () -> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState -> Bool -> IO () -> IO () -> m ()
recordMetricWithLabel
              IO GranularPrometheusMetricsState
granularPrometheusMetricsState
              Bool
True
              (GaugeVector SubscriptionLabel -> SubscriptionLabel -> IO ()
forall label. Ord label => GaugeVector label -> label -> IO ()
GaugeVector.dec GaugeVector SubscriptionLabel
numSubscriptionMetric SubscriptionLabel
promMetricGranularLabel)
              (GaugeVector SubscriptionLabel -> SubscriptionLabel -> IO ()
forall label. Ord label => GaugeVector label -> label -> IO ()
GaugeVector.dec GaugeVector SubscriptionLabel
numSubscriptionMetric SubscriptionLabel
promMetricLabel)

-- | Remove one subscriber from a streaming subscription.
--
-- The shared state is updated in a single STM transaction:
--
-- * the poller is looked up by the subscriber's poller key in
--   '_ssStreamQueryMap', and the cohort is looked up under the cohort id
--   rebuilt from the subscriber's /current/ cursor value;
-- * the per-operation-name counter of the poller is decremented (or removed
--   when it reaches its last entry);
-- * the subscriber is deleted from the cohort, the cohort is deleted when it
--   has no subscribers left, and the poller is deleted when it has no cohorts
--   left.
--
-- The transaction returns the follow-up 'IO' action (stopping the poller
-- thread, updating Prometheus metrics, logging), which is run right after it.
-- The whole operation runs under 'mask_'.
--
-- NOTE(review): the two EKG gauges are decremented even when the poller or
-- cohort was not found — confirm that this is intended.
removeStreamingQuery ::
  L.Logger L.Hasura ->
  ServerMetrics ->
  PrometheusMetrics ->
  SubscriptionsState ->
  -- the query and the associated operation
  StreamingSubscriberDetails ->
  IO GranularPrometheusMetricsState ->
  Maybe OperationName ->
  IO ()
removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (SubscriberDetails handlerId (cohortId, cursorVariableTV) sinkId) granularPrometheusMetricsState maybeOperationName = mask_ $ do
  -- The transaction yields the deferred IO to perform; 'join' runs it.
  join
    $ STM.atomically
    $ do
      detM <- getQueryDet streamQMap
      case detM of
        -- Nothing to clean up: no poller, or no cohort for the current cursor.
        Nothing -> return (pure ())
        Just (Poller cohorts pollerState ioState parameterizedQueryHash operationNamesMap, currentCohortId, cohort) -> do
          TMap.lookup maybeOperationName operationNamesMap >>= \case
            -- If only one operation name is present in the map, delete it
            Just 1 -> TMap.delete maybeOperationName operationNamesMap
            -- If the count of an operation name is more than 1, then it means
            -- there are more subscriptions with the same name and we should
            -- keep emitting the metrics until all the subscriptions with the
            -- operation name are removed
            Just _ -> TMap.adjust (\v -> v - 1) maybeOperationName operationNamesMap
            Nothing -> return ()
          cleanHandlerC cohorts pollerState ioState (cohort, currentCohortId) parameterizedQueryHash
  EKG.Gauge.dec $ smActiveSubscriptions serverMetrics
  EKG.Gauge.dec $ smActiveStreamingSubscriptions serverMetrics
  where
    streamQMap = _ssStreamQueryMap subscriptionState

    -- Find the poller for this subscriber together with the cohort it
    -- currently belongs to. The cohort key is recomputed from the latest
    -- cursor value stored in the subscriber's TVar.
    getQueryDet subMap = do
      pollerM <- STMMap.lookup handlerId subMap
      (CursorVariableValues currentCohortCursorVal) <- STM.readTVar cursorVariableTV
      let updatedCohortId = modifyCursorCohortVariables (mkUnsafeValidateVariables currentCohortCursorVal) cohortId
      fmap join
        $ forM pollerM
        $ \poller -> do
          cohortM <- TMap.lookup updatedCohortId (_pCohorts poller)
          return $ (poller,updatedCohortId,) <$> cohortM

    -- Drop the subscriber from its cohort, prune the cohort and the poller if
    -- they became empty, and return the IO that has to follow the transaction.
    cleanHandlerC cohortMap pollerState ioState (handlerC, currentCohortId) parameterizedQueryHash = do
      let curOps = _cExistingSubscribers handlerC
          newOps = _cNewSubscribers handlerC
      TMap.delete sinkId curOps
      TMap.delete sinkId newOps
      cohortIsEmpty <-
        (&&)
          <$> TMap.null curOps
          <*> TMap.null newOps
      when cohortIsEmpty $ TMap.delete currentCohortId cohortMap
      handlerIsEmpty <- TMap.null cohortMap
      let promMetricGranularLabel = SubscriptionLabel streamingSubscriptionLabel (Just $ DynamicSubscriptionLabel parameterizedQueryHash maybeOperationName)
          promMetricLabel = SubscriptionLabel streamingSubscriptionLabel Nothing
          subscriptionMetrics = pmSubscriptionMetrics prometheusMetrics
          numSubscriptionMetric = submActiveSubscriptions subscriptionMetrics
          -- Decrement the active-subscriptions gauge vector; both the
          -- granularly-labelled and the plainly-labelled decrement are handed
          -- to 'recordMetricWithLabel' (defined elsewhere).
          decActiveSubscriptions :: IO ()
          decActiveSubscriptions =
            recordMetricWithLabel
              granularPrometheusMetricsState
              True
              (GaugeVector.dec numSubscriptionMetric promMetricGranularLabel)
              (GaugeVector.dec numSubscriptionMetric promMetricLabel)
      -- when there is no need for handler i.e, this happens to be the last
      -- operation, take the ref for the polling thread to cancel it
      if handlerIsEmpty
        then do
          STMMap.delete handlerId streamQMap
          threadRefM <- fmap _pThread <$> STM.tryReadTMVar ioState
          return
            $
            -- deferred IO:
            case threadRefM of
              Just threadRef -> do
                Immortal.stop threadRef
                pollerLastState <- STM.readTVarIO pollerState
                -- A poller that ended in the error state is also counted in
                -- the "pollers in error" gauge.
                when (pollerLastState == PRSError)
                  $ Prometheus.Gauge.dec
                  $ submActiveStreamingPollersInError subscriptionMetrics
                Prometheus.Gauge.dec $ submActiveStreamingPollers subscriptionMetrics
                decActiveSubscriptions
              -- This would seem to imply addStreamSubscriptionQuery broke or a bug
              -- elsewhere. Be paranoid and log:
              Nothing ->
                L.unLogger logger
                  $ L.UnstructuredLog L.LevelError
                  $ fromString
                  $ "In removeStreamingQuery no worker thread installed. Please report this as a bug: "
                  <> " poller_id: "
                  <> show handlerId
                  <> ", cohort_id: "
                  <> show cohortId
                  <> ", subscriber_id:"
                  <> show sinkId
        else return decActiveSubscriptions

-- | An async action query whose relationships refer to a table in a source.
-- We need to generate an SQL statement with the action response and execute it
-- in the source database so as to fetch the response joined with relationship rows.
-- For more details see Note [Resolving async action query]
data LiveAsyncActionQueryOnSource = LiveAsyncActionQueryOnSource
  { -- | Subscriber details of the live query currently associated with this
    -- async action query.
    _laaqpCurrentLqId :: !LiveQuerySubscriberDetails,
    -- | The previously stored action log response map.
    _laaqpPrevActionLogMap :: !ActionLogResponseMap,
    -- | An IO action to restart the live query poller with updated action log responses fetched from metadata storage.
    -- Restarting a live query re-generates the SQL statement with new action log responses to send the latest action
    -- response to the client.
    _laaqpRestartLq :: !(LiveQuerySubscriberDetails -> ActionLogResponseMap -> IO (Maybe LiveQuerySubscriberDetails))
  }

-- | The pair of callbacks used for an async action live query that has no
-- relationships: one to deliver a response, one to signal completion.
data LiveAsyncActionQueryWithNoRelationships = LiveAsyncActionQueryWithNoRelationships
  { -- | An IO action to send response to the websocket client
    _laaqwnrSendResponse :: !(ActionLogResponseMap -> IO ()),
    -- | An IO action to send "completed" message to the websocket client
    _laaqwnrSendCompleted :: !(IO ())
  }

-- | How an async action live query is executed: either without relationships
-- or against a source database.
data LiveAsyncActionQuery
  = -- | Wraps a 'LiveAsyncActionQueryWithNoRelationships'.
    LAAQNoRelationships LiveAsyncActionQueryWithNoRelationships
  | -- | Wraps a 'LiveAsyncActionQueryOnSource'.
    LAAQOnSourceDB LiveAsyncActionQueryOnSource

-- | An async action live query as stored in 'AsyncActionSubscriptionState':
-- the action ids it covers, an error callback and its execution variant.
data AsyncActionQueryLive = AsyncActionQueryLive
  { -- | The (non-empty) set of action ids this live query refers to.
    _aaqlActionIds :: NonEmpty ActionId,
    -- | An IO action to send error message (in case of any exception) to the websocket client
    _aaqlOnException :: (QErr -> IO ()),
    -- | The execution variant of this live query; see 'LiveAsyncActionQuery'.
    _aaqlLiveExecution :: LiveAsyncActionQuery
  }

-- | A shareable state map which stores each async action live query under its
-- subscription operation id.
type AsyncActionSubscriptionState = TMap.TMap OperationId AsyncActionQueryLive

-- | Register an async action live query under the given operation id.
--
-- Builds an 'AsyncActionQueryLive' from the action ids, the exception
-- callback and the execution variant, and atomically inserts it into the
-- state map keyed by the operation id.
addAsyncActionLiveQuery ::
  AsyncActionSubscriptionState ->
  OperationId ->
  NonEmpty ActionId ->
  (QErr -> IO ()) ->
  LiveAsyncActionQuery ->
  IO ()
addAsyncActionLiveQuery queriesState opId actionIds onException liveQuery =
  STM.atomically
    $ TMap.insert (AsyncActionQueryLive actionIds onException liveQuery) opId queriesState

-- | Atomically delete the async action live query stored under the given
-- operation id from the state map.
removeAsyncActionLiveQuery ::
  AsyncActionSubscriptionState -> OperationId -> IO ()
removeAsyncActionLiveQuery queriesState opId =
  STM.atomically $ TMap.delete opId queriesState