{-# LANGUAGE TemplateHaskell #-}

-- | Top-level management of subscription poller threads.
-- The implementation of the polling itself is
-- in "Hasura.GraphQL.Execute.Subscription.Poll". See "Hasura.GraphQL.Execute.Subscription" for high-level
-- details.
module Hasura.GraphQL.Execute.Subscription.State
  ( SubscriptionsState (..),
    initSubscriptionsState,
    dumpSubscriptionsState,
    SubscriberDetails,
    SubscriptionPostPollHook,
    addLiveQuery,
    addStreamSubscriptionQuery,
    removeLiveQuery,
    removeStreamingQuery,
    LiveAsyncActionQueryOnSource (..),
    LiveAsyncActionQueryWithNoRelationships (..),
    LiveAsyncActionQuery (..),
    AsyncActionQueryLive (..),
    AsyncActionSubscriptionState,
    addAsyncActionLiveQuery,
    removeAsyncActionLiveQuery,
    LiveQuerySubscriberDetails,
    StreamingSubscriberDetails,
  )
where

import Control.Concurrent.Extended (forkImmortal, sleep)
import Control.Concurrent.STM qualified as STM
import Control.Exception (mask_)
import Control.Immortal qualified as Immortal
import Data.Aeson.Extended qualified as J
import Data.String
import Data.Text.Extended
import Data.UUID.V4 qualified as UUID
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Execute.Subscription.Poll
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol (OperationName)
import Hasura.GraphQL.Transport.WebSocket.Protocol (OperationId)
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.Types.Action
import Hasura.RQL.Types.Common (SourceName)
import Hasura.RQL.Types.Numeric qualified as Numeric
import Hasura.Server.Metrics (ServerMetrics (..))
import Hasura.Server.Prometheus (PrometheusMetrics (..))
import Hasura.Server.Types (RequestId)
import Language.GraphQL.Draft.Syntax qualified as G
import StmContainers.Map qualified as STMMap
import System.Metrics.Gauge qualified as EKG.Gauge
import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge

-- | The top-level datatype that holds the state for all active subscriptions.
--
-- NOTE!: This must be kept consistent with a websocket connection's
-- 'OperationMap', in 'onClose' and 'onStart'.
data SubscriptionsState = SubscriptionsState
  { -- | Options handed to every live-query poller (also the source of the
    -- refetch interval between two polls).
    _ssLiveQueryOptions :: LiveQueriesOptions,
    -- | Options handed to every streaming-subscription poller.
    _ssStreamQueryOptions :: StreamQueriesOptions,
    -- | Pollers serving live queries, indexed by 'PollerKey'.
    _ssLiveQueryMap :: PollerMap (),
    -- | Pollers serving streaming subscriptions, indexed by 'PollerKey'. Each
    -- cohort carries a mutable reference to its cursor values.
    _ssStreamQueryMap :: PollerMap (STM.TVar CursorVariableValues),
    -- | A hook function which is run after each fetch cycle
    _ssPostPollHook :: SubscriptionPostPollHook,
    -- | State of the live async-action queries.
    _ssAsyncActions :: AsyncActionSubscriptionState
  }

-- | Build an empty 'SubscriptionsState'.
--
-- The given options and post-poll hook are stored as-is; the live-query
-- poller map, the streaming poller map and the async-action map are all
-- freshly allocated (and therefore empty) within a single STM transaction.
initSubscriptionsState ::
  LiveQueriesOptions -> StreamQueriesOptions -> SubscriptionPostPollHook -> IO SubscriptionsState
initSubscriptionsState liveQOptions streamQOptions pollHook =
  STM.atomically $ do
    liveQueryMap <- STMMap.new
    streamQueryMap <- STMMap.new
    asyncActions <- TMap.new
    pure $
      SubscriptionsState
        liveQOptions
        streamQOptions
        liveQueryMap
        streamQueryMap
        pollHook
        asyncActions

-- | Render the subscriptions state as a JSON object.
--
-- The object contains the live-query options (@options@), the dumped
-- live-query poller map (@live_queries_map@), the dumped streaming poller map
-- (@stream_queries_map@) and the streaming options
-- (@stream_queries_options@). The post-poll hook and the async-action state
-- are not part of the dump.
--
-- The 'Bool' flag is forwarded unchanged to 'dumpPollerMap' for both maps.
dumpSubscriptionsState :: Bool -> SubscriptionsState -> IO J.Value
dumpSubscriptionsState extended (SubscriptionsState liveQOpts streamQOpts lqMap streamMap _ _) = do
  lqMapJ <- dumpPollerMap extended lqMap
  streamMapJ <- dumpPollerMap extended streamMap
  pure $
    J.object
      [ "options" J..= liveQOpts,
        "live_queries_map" J..= lqMapJ,
        "stream_queries_map" J..= streamMapJ,
        "stream_queries_options" J..= streamQOpts
      ]

-- | SubscriberDetails contains the data required to locate a subscriber
--   in the correct cohort within the correct poller in the operation map.
data SubscriberDetails a = SubscriberDetails
  { -- | Key of the poller the subscriber was placed in.
    _sdPoller :: !PollerKey,
    -- | Information identifying the cohort within that poller; its shape
    -- depends on the kind of subscription (see 'LiveQuerySubscriberDetails'
    -- and 'StreamingSubscriberDetails').
    _sdCohort :: !a,
    -- | Identifier of the subscriber itself.
    _sdSubscriber :: !SubscriberId
  }
  deriving (Show)

-- | Subscriber details for a live query: the cohort is identified by its
--   'CohortKey' alone. This is the value returned by 'addLiveQuery'.
type LiveQuerySubscriberDetails = SubscriberDetails CohortKey

-- | Subscriber details for a streaming subscription.
--
--   The `CohortKey` contains the variables with which the subscription was started
--   and which will remain unchanged. The second component of the pair is the mutable
--   reference through which we can get the latest value of the cursor. Using both the
--   `CohortKey` and the latest cursor value, we locate the subscriber in the operation
--   map to find its details and then stop it.
type StreamingSubscriberDetails = SubscriberDetails (CohortKey, STM.TVar CursorVariableValues)

-- | `findPollerForSubscriber` places a subscriber in the correct poller.
--   If the poller doesn't exist then we create one otherwise we return the
--   existing one.
--
--   The two callbacks decide how the subscriber is actually stored:
--
--   * the first is used when both the poller and the cohort already exist and
--     adds the subscriber to that cohort;
--   * the second is used when the cohort is missing (either in an existing
--     poller or in the freshly created one) and is expected to create the
--     cohort, identified by the given 'CohortId', inside the poller.
--
--   The result pairs the value produced by whichever callback ran with
--   @Just poller@ if (and only if) the poller was created by this call. A newly
--   created poller has an empty '_pIOState', so the caller is responsible for
--   filling it in.
findPollerForSubscriber ::
  Subscriber ->
  CohortId ->
  PollerMap streamCursorVars ->
  PollerKey ->
  CohortKey ->
  (Subscriber -> Cohort streamCursorVars -> STM.STM streamCursorVars) ->
  (Subscriber -> CohortId -> Poller streamCursorVars -> STM.STM streamCursorVars) ->
  STM.STM (Maybe (Poller streamCursorVars), streamCursorVars)
findPollerForSubscriber subscriber cohortId pollerMap pollerKey cohortKey addToCohort addToPoller =
  -- a handler is returned only when it is newly created
  STMMap.lookup pollerKey pollerMap >>= \case
    Just poller -> do
      -- Found a poller, now check if a cohort also exists
      cursorVars <-
        TMap.lookup cohortKey (_pCohorts poller) >>= \case
          -- cohort found too! Simply add the subscriber to the cohort
          Just cohort -> addToCohort subscriber cohort
          -- cohort not found. Create a cohort with the subscriber and add
          -- the cohort to the poller
          Nothing -> addToPoller subscriber cohortId poller
      pure (Nothing, cursorVars)
    Nothing -> do
      -- no poller found, so create one with the cohort
      -- and the subscriber within it.
      !poller <- Poller <$> TMap.new <*> STM.newEmptyTMVar
      cursorVars <- addToPoller subscriber cohortId poller
      STMMap.insert poller pollerKey pollerMap
      pure (Just poller, cursorVars)

-- | Fork a thread handling a regular (live query) subscription
--
-- The subscriber is registered in the poller identified by the source, the
-- role and the text of the multiplexed query, inside the cohort identified by
-- the plan's 'CohortKey'. A polling thread is forked only when this call had
-- to create the poller; otherwise the subscriber simply joins the existing
-- one. In both cases the active-subscription and active-live-query gauges are
-- incremented, and the returned details are what is needed to locate the
-- subscriber again later.
addLiveQuery ::
  forall b.
  BackendTransport b =>
  L.Logger L.Hasura ->
  ServerMetrics ->
  PrometheusMetrics ->
  SubscriberMetadata ->
  SubscriptionsState ->
  SourceName ->
  ParameterizedQueryHash ->
  -- | operation name of the query
  Maybe OperationName ->
  RequestId ->
  SubscriptionQueryPlan b (MultiplexedQuery b) ->
  -- | the action to be executed when result changes
  OnChange ->
  IO LiveQuerySubscriberDetails
addLiveQuery
  logger
  serverMetrics
  prometheusMetrics
  subscriberMetadata
  subscriptionState
  source
  parameterizedQueryHash
  operationName
  requestId
  plan
  onResultAction = do
    -- CAREFUL!: It's absolutely crucial that we can't throw any exceptions here!

    -- disposable UUIDs:
    cohortId <- newCohortId
    subscriberId <- newSubscriberId

    let !subscriber = Subscriber subscriberId subscriberMetadata requestId operationName onResultAction

    $assertNFHere subscriber -- so we don't write thunks to mutable vars
    (pollerMaybe, ()) <-
      STM.atomically $
        findPollerForSubscriber
          subscriber
          cohortId
          lqMap
          handlerId
          cohortKey
          addToCohort
          addToPoller

    -- If the poller is new, attach a polling thread to it. The live query can
    -- only be cancelled after the putTMVar below.
    onJust pollerMaybe $ \poller -> do
      pollerId <- PollerId <$> UUID.nextRandom
      -- NOTE(review): the streaming variant labels its thread with
      -- @show (unPollerId pollerId)@ whereas this one shows the 'PollerId'
      -- itself; left untouched because the label is observable at runtime.
      threadRef <- forkImmortal ("pollLiveQuery." <> show pollerId) logger $
        forever $ do
          pollLiveQuery @b
            pollerId
            lqOpts
            (source, sourceConfig)
            role
            parameterizedQueryHash
            query
            (_pCohorts poller)
            postPollHook
          sleep $ Numeric.unNonNegativeDiffTime $ unRefetchInterval refetchInterval
      let !pState = PollerIOState threadRef pollerId
      $assertNFHere pState -- so we don't write thunks to mutable vars
      STM.atomically $ STM.putTMVar (_pIOState poller) pState

    EKG.Gauge.inc $ smActiveSubscriptions serverMetrics
    Prometheus.Gauge.inc $ pmActiveSubscriptions prometheusMetrics
    EKG.Gauge.inc $ smActiveLiveQueries serverMetrics

    pure $ SubscriberDetails handlerId cohortKey subscriberId
    where
      lqOpts = _ssLiveQueryOptions subscriptionState
      lqMap = _ssLiveQueryMap subscriptionState
      postPollHook = _ssPostPollHook subscriptionState

      SubscriptionsOptions _ refetchInterval = lqOpts
      SubscriptionQueryPlan (ParameterizedSubscriptionQueryPlan role query) sourceConfig cohortKey _ = plan

      -- Key of the poller this subscription belongs to.
      handlerId :: PollerKey
      handlerId = PollerKey source role $ toTxt query

      -- Register the subscriber among the new subscribers of an existing cohort.
      addToCohort :: Subscriber -> Cohort () -> STM.STM ()
      addToCohort newSubscriber cohort =
        TMap.insert newSubscriber (_sId newSubscriber) $ _cNewSubscribers cohort

      -- Create a cohort holding only the subscriber and add it to the poller
      -- under the plan's cohort key.
      addToPoller :: Subscriber -> CohortId -> Poller () -> STM.STM ()
      addToPoller newSubscriber newCohortIdent poller = do
        !newCohort <-
          Cohort newCohortIdent
            <$> STM.newTVar Nothing
            <*> TMap.new
            <*> TMap.new
            <*> pure ()
        addToCohort newSubscriber newCohort
        TMap.insert newCohort cohortKey $ _pCohorts poller

-- | Fork a thread handling a streaming subscription
addStreamSubscriptionQuery ::
  forall b.
  BackendTransport b =>
  L.Logger L.Hasura ->
  ServerMetrics ->
  PrometheusMetrics ->
  SubscriberMetadata ->
  SubscriptionsState ->
  SourceName ->
  ParameterizedQueryHash ->
  -- | operation name of the query
  Maybe OperationName ->
  RequestId ->
  -- | root field name
  G.Name ->
  SubscriptionQueryPlan b (MultiplexedQuery b) ->
  -- | the action to be executed when result changes
  OnChange ->
  IO StreamingSubscriberDetails
addStreamSubscriptionQuery :: Logger Hasura
-> ServerMetrics
-> PrometheusMetrics
-> SubscriberMetadata
-> SubscriptionsState
-> SourceName
-> ParameterizedQueryHash
-> Maybe OperationName
-> RequestId
-> Name
-> SubscriptionQueryPlan b (MultiplexedQuery b)
-> OnChange
-> IO StreamingSubscriberDetails
addStreamSubscriptionQuery
  Logger Hasura
logger
  ServerMetrics
serverMetrics
  PrometheusMetrics
prometheusMetrics
  SubscriberMetadata
subscriberMetadata
  SubscriptionsState
subscriptionState
  SourceName
source
  ParameterizedQueryHash
parameterizedQueryHash
  Maybe OperationName
operationName
  RequestId
requestId
  Name
rootFieldName
  SubscriptionQueryPlan b (MultiplexedQuery b)
plan
  OnChange
onResultAction = do
    -- CAREFUL!: It's absolutely crucial that we can't throw any exceptions here!

    -- disposable UUIDs:
    CohortId
cohortId <- IO CohortId
forall (m :: * -> *). MonadIO m => m CohortId
newCohortId
    SubscriberId
subscriberId <- IO SubscriberId
newSubscriberId

    let !subscriber :: Subscriber
subscriber = SubscriberId
-> SubscriberMetadata
-> RequestId
-> Maybe OperationName
-> OnChange
-> Subscriber
Subscriber SubscriberId
subscriberId SubscriberMetadata
subscriberMetadata RequestId
requestId Maybe OperationName
operationName OnChange
onResultAction

    String
String -> Subscriber -> IO ()
forall a. String -> a -> IO ()
$assertNFHere Subscriber
subscriber -- so we don't write thunks to mutable vars
    (Maybe (Poller (TVar CursorVariableValues))
handlerM, TVar CursorVariableValues
cohortCursorTVar) <-
      STM
  (Maybe (Poller (TVar CursorVariableValues)),
   TVar CursorVariableValues)
-> IO
     (Maybe (Poller (TVar CursorVariableValues)),
      TVar CursorVariableValues)
forall a. STM a -> IO a
STM.atomically (STM
   (Maybe (Poller (TVar CursorVariableValues)),
    TVar CursorVariableValues)
 -> IO
      (Maybe (Poller (TVar CursorVariableValues)),
       TVar CursorVariableValues))
-> STM
     (Maybe (Poller (TVar CursorVariableValues)),
      TVar CursorVariableValues)
-> IO
     (Maybe (Poller (TVar CursorVariableValues)),
      TVar CursorVariableValues)
forall a b. (a -> b) -> a -> b
$
        Subscriber
-> CohortId
-> PollerMap (TVar CursorVariableValues)
-> PollerKey
-> CohortKey
-> (Subscriber
    -> Cohort (TVar CursorVariableValues)
    -> STM (TVar CursorVariableValues))
-> (Subscriber
    -> CohortId
    -> Poller (TVar CursorVariableValues)
    -> STM (TVar CursorVariableValues))
-> STM
     (Maybe (Poller (TVar CursorVariableValues)),
      TVar CursorVariableValues)
forall streamCursorVars.
Subscriber
-> CohortId
-> PollerMap streamCursorVars
-> PollerKey
-> CohortKey
-> (Subscriber -> Cohort streamCursorVars -> STM streamCursorVars)
-> (Subscriber
    -> CohortId -> Poller streamCursorVars -> STM streamCursorVars)
-> STM (Maybe (Poller streamCursorVars), streamCursorVars)
findPollerForSubscriber
          Subscriber
subscriber
          CohortId
cohortId
          PollerMap (TVar CursorVariableValues)
streamQueryMap
          PollerKey
handlerId
          CohortKey
cohortKey
          Subscriber
-> Cohort (TVar CursorVariableValues)
-> STM (TVar CursorVariableValues)
forall b. Subscriber -> Cohort b -> STM b
addToCohort
          Subscriber
-> CohortId
-> Poller (TVar CursorVariableValues)
-> STM (TVar CursorVariableValues)
addToPoller

    -- we can then attach a polling thread if it is new the subscription can only be
    -- cancelled after putTMVar
    Maybe (Poller (TVar CursorVariableValues))
-> (Poller (TVar CursorVariableValues) -> IO ()) -> IO ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
onJust Maybe (Poller (TVar CursorVariableValues))
handlerM ((Poller (TVar CursorVariableValues) -> IO ()) -> IO ())
-> (Poller (TVar CursorVariableValues) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Poller (TVar CursorVariableValues)
handler -> do
      PollerId
pollerId <- UUID -> PollerId
PollerId (UUID -> PollerId) -> IO UUID -> IO PollerId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
      Thread
threadRef <- String -> Logger Hasura -> IO Void -> IO Thread
forall (m :: * -> *).
ForkableMonadIO m =>
String -> Logger Hasura -> m Void -> m Thread
forkImmortal (String
"pollStreamingQuery." String -> ShowS
forall a. Semigroup a => a -> a -> a
<> UUID -> String
forall a. Show a => a -> String
show (PollerId -> UUID
unPollerId PollerId
pollerId)) Logger Hasura
logger (IO Void -> IO Thread) -> IO Void -> IO Thread
forall a b. (a -> b) -> a -> b
$
        IO () -> IO Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Void) -> IO () -> IO Void
forall a b. (a -> b) -> a -> b
$ do
          PollerId
-> LiveQueriesOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap 'Streaming
-> Name
-> SubscriptionPostPollHook
-> Maybe (IO ())
-> IO ()
forall (b :: BackendType).
BackendTransport b =>
PollerId
-> LiveQueriesOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap 'Streaming
-> Name
-> SubscriptionPostPollHook
-> Maybe (IO ())
-> IO ()
pollStreamingQuery @b PollerId
pollerId LiveQueriesOptions
streamQOpts (SourceName
source, SourceConfig b
sourceConfig) RoleName
role ParameterizedQueryHash
parameterizedQueryHash MultiplexedQuery b
query (Poller (TVar CursorVariableValues)
-> CohortMap (TVar CursorVariableValues)
forall streamCursor. Poller streamCursor -> CohortMap streamCursor
_pCohorts Poller (TVar CursorVariableValues)
handler) Name
rootFieldName SubscriptionPostPollHook
postPollHook Maybe (IO ())
forall a. Maybe a
Nothing
          DiffTime -> IO ()
sleep (DiffTime -> IO ()) -> DiffTime -> IO ()
forall a b. (a -> b) -> a -> b
$ NonNegativeDiffTime -> DiffTime
Numeric.unNonNegativeDiffTime (NonNegativeDiffTime -> DiffTime)
-> NonNegativeDiffTime -> DiffTime
forall a b. (a -> b) -> a -> b
$ RefetchInterval -> NonNegativeDiffTime
unRefetchInterval RefetchInterval
refetchInterval
      let !pState :: PollerIOState
pState = Thread -> PollerId -> PollerIOState
PollerIOState Thread
threadRef PollerId
pollerId
      String
String -> PollerIOState -> IO ()
forall a. String -> a -> IO ()
$assertNFHere PollerIOState
pState -- so we don't write thunks to mutable vars
      STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar PollerIOState -> PollerIOState -> STM ()
forall a. TMVar a -> a -> STM ()
STM.putTMVar (Poller (TVar CursorVariableValues) -> TMVar PollerIOState
forall streamCursor. Poller streamCursor -> TMVar PollerIOState
_pIOState Poller (TVar CursorVariableValues)
handler) PollerIOState
pState

    IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveSubscriptions ServerMetrics
serverMetrics
    IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> Gauge
pmActiveSubscriptions PrometheusMetrics
prometheusMetrics
    IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveStreamingSubscriptions ServerMetrics
serverMetrics

    StreamingSubscriberDetails -> IO StreamingSubscriberDetails
forall (f :: * -> *) a. Applicative f => a -> f a
pure (StreamingSubscriberDetails -> IO StreamingSubscriberDetails)
-> StreamingSubscriberDetails -> IO StreamingSubscriberDetails
forall a b. (a -> b) -> a -> b
$ PollerKey
-> (CohortKey, TVar CursorVariableValues)
-> SubscriberId
-> StreamingSubscriberDetails
forall a. PollerKey -> a -> SubscriberId -> SubscriberDetails a
SubscriberDetails PollerKey
handlerId (CohortKey
cohortKey, TVar CursorVariableValues
cohortCursorTVar) SubscriberId
subscriberId
    where
      SubscriptionsState LiveQueriesOptions
_ LiveQueriesOptions
streamQOpts PollerMap ()
_ PollerMap (TVar CursorVariableValues)
streamQueryMap SubscriptionPostPollHook
postPollHook AsyncActionSubscriptionState
_ = SubscriptionsState
subscriptionState
      SubscriptionsOptions BatchSize
_ RefetchInterval
refetchInterval = LiveQueriesOptions
streamQOpts
      SubscriptionQueryPlan (ParameterizedSubscriptionQueryPlan RoleName
role MultiplexedQuery b
query) SourceConfig b
sourceConfig CohortKey
cohortKey Maybe Name
_ = SubscriptionQueryPlan b (MultiplexedQuery b)
plan

      handlerId :: PollerKey
handlerId = SourceName -> RoleName -> Text -> PollerKey
PollerKey SourceName
source RoleName
role (Text -> PollerKey) -> Text -> PollerKey
forall a b. (a -> b) -> a -> b
$ MultiplexedQuery b -> Text
forall a. ToTxt a => a -> Text
toTxt MultiplexedQuery b
query

      addToCohort :: Subscriber -> Cohort b -> STM b
addToCohort Subscriber
subscriber Cohort b
handlerC = do
        Subscriber
-> SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. (Eq k, Hashable k) => v -> k -> TMap k v -> STM ()
TMap.insert Subscriber
subscriber (Subscriber -> SubscriberId
_sId Subscriber
subscriber) (TMap SubscriberId Subscriber -> STM ())
-> TMap SubscriberId Subscriber -> STM ()
forall a b. (a -> b) -> a -> b
$ Cohort b -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
_cNewSubscribers Cohort b
handlerC
        b -> STM b
forall (f :: * -> *) a. Applicative f => a -> f a
pure (b -> STM b) -> b -> STM b
forall a b. (a -> b) -> a -> b
$ Cohort b -> b
forall streamCursorVars.
Cohort streamCursorVars -> streamCursorVars
_cStreamCursorVariables Cohort b
handlerC

      addToPoller :: Subscriber
-> CohortId
-> Poller (TVar CursorVariableValues)
-> STM (TVar CursorVariableValues)
addToPoller Subscriber
subscriber CohortId
cohortId Poller (TVar CursorVariableValues)
handler = do
        TVar CursorVariableValues
latestCursorValues <-
          CursorVariableValues -> STM (TVar CursorVariableValues)
forall a. a -> STM (TVar a)
STM.newTVar (HashMap Name TxtEncodedVal -> CursorVariableValues
CursorVariableValues (ValidatedVariables (HashMap Name) -> HashMap Name TxtEncodedVal
forall (f :: * -> *). ValidatedVariables f -> f TxtEncodedVal
_unValidatedVariables (CohortKey -> ValidatedVariables (HashMap Name)
_cvCursorVariables CohortKey
cohortKey)))
        !Cohort (TVar CursorVariableValues)
newCohort <- CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> TVar CursorVariableValues
-> Cohort (TVar CursorVariableValues)
forall streamCursorVars.
CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> streamCursorVars
-> Cohort streamCursorVars
Cohort CohortId
cohortId (TVar (Maybe ResponseHash)
 -> TMap SubscriberId Subscriber
 -> TMap SubscriberId Subscriber
 -> TVar CursorVariableValues
 -> Cohort (TVar CursorVariableValues))
-> STM (TVar (Maybe ResponseHash))
-> STM
     (TMap SubscriberId Subscriber
      -> TMap SubscriberId Subscriber
      -> TVar CursorVariableValues
      -> Cohort (TVar CursorVariableValues))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe ResponseHash -> STM (TVar (Maybe ResponseHash))
forall a. a -> STM (TVar a)
STM.newTVar Maybe ResponseHash
forall a. Maybe a
Nothing STM
  (TMap SubscriberId Subscriber
   -> TMap SubscriberId Subscriber
   -> TVar CursorVariableValues
   -> Cohort (TVar CursorVariableValues))
-> STM (TMap SubscriberId Subscriber)
-> STM
     (TMap SubscriberId Subscriber
      -> TVar CursorVariableValues -> Cohort (TVar CursorVariableValues))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new STM
  (TMap SubscriberId Subscriber
   -> TVar CursorVariableValues -> Cohort (TVar CursorVariableValues))
-> STM (TMap SubscriberId Subscriber)
-> STM
     (TVar CursorVariableValues -> Cohort (TVar CursorVariableValues))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new STM
  (TVar CursorVariableValues -> Cohort (TVar CursorVariableValues))
-> STM (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TVar CursorVariableValues -> STM (TVar CursorVariableValues)
forall (f :: * -> *) a. Applicative f => a -> f a
pure TVar CursorVariableValues
latestCursorValues
        TVar CursorVariableValues
cohortCursorVals <- Subscriber
-> Cohort (TVar CursorVariableValues)
-> STM (TVar CursorVariableValues)
forall b. Subscriber -> Cohort b -> STM b
addToCohort Subscriber
subscriber Cohort (TVar CursorVariableValues)
newCohort
        Cohort (TVar CursorVariableValues)
-> CohortKey -> CohortMap (TVar CursorVariableValues) -> STM ()
forall k v. (Eq k, Hashable k) => v -> k -> TMap k v -> STM ()
TMap.insert Cohort (TVar CursorVariableValues)
newCohort CohortKey
cohortKey (CohortMap (TVar CursorVariableValues) -> STM ())
-> CohortMap (TVar CursorVariableValues) -> STM ()
forall a b. (a -> b) -> a -> b
$ Poller (TVar CursorVariableValues)
-> CohortMap (TVar CursorVariableValues)
forall streamCursor. Poller streamCursor -> CohortMap streamCursor
_pCohorts Poller (TVar CursorVariableValues)
handler
        TVar CursorVariableValues -> STM (TVar CursorVariableValues)
forall (f :: * -> *) a. Applicative f => a -> f a
pure TVar CursorVariableValues
cohortCursorVals

-- | Remove a single subscriber of a live query from the subscriptions state.
--
-- Inside one STM transaction the subscriber is deleted from both the existing-
-- and the new-subscriber maps of its cohort. A cohort that is left without
-- subscribers is deleted from the poller's cohort map, and a poller that is
-- left without cohorts is deleted from the live query map. In that last case
-- the transaction hands back a deferred 'IO' action which, once the
-- transaction has committed, stops the poller thread via 'Immortal.stop' (or
-- logs an error when no thread state was installed in the poller).
--
-- Afterwards the active-subscriptions gauges (EKG and Prometheus) and the
-- active-live-queries gauge are decremented. The whole action runs under
-- 'mask_'.
--
-- NOTE(review): the gauges are decremented even when no matching
-- poller/cohort is found for the given subscriber details -- confirm that
-- callers never remove the same subscriber twice.
removeLiveQuery ::
  L.Logger L.Hasura ->
  ServerMetrics ->
  PrometheusMetrics ->
  SubscriptionsState ->
  -- the query and the associated operation
  LiveQuerySubscriberDetails ->
  IO ()
removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberDetails handlerId cohortId sinkId) = mask_ $ do
  mbCleanupIO <- STM.atomically $ do
    detM <- getQueryDet lqMap
    fmap join $
      forM detM $ \(Poller cohorts ioState, cohort) ->
        cleanHandlerC cohorts ioState cohort
  -- run the deferred cleanup (if any) outside of the STM transaction
  sequence_ mbCleanupIO
  liftIO $ EKG.Gauge.dec $ smActiveSubscriptions serverMetrics
  liftIO $ Prometheus.Gauge.dec $ pmActiveSubscriptions prometheusMetrics
  liftIO $ EKG.Gauge.dec $ smActiveLiveQueries serverMetrics
  where
    lqMap = _ssLiveQueryMap lqState

    -- Look up the poller for 'handlerId' and, within it, the cohort for
    -- 'cohortId'; 'Nothing' when either lookup fails.
    getQueryDet subMap = do
      pollerM <- STMMap.lookup handlerId subMap
      fmap join $
        forM pollerM $ \poller -> do
          cohortM <- TMap.lookup cohortId (_pCohorts poller)
          return $ (poller,) <$> cohortM

    -- Delete the subscriber from the cohort, prune the cohort and the poller
    -- when they become empty, and return the deferred thread cleanup when the
    -- poller was removed.
    cleanHandlerC cohortMap ioState handlerC = do
      let curOps = _cExistingSubscribers handlerC
          newOps = _cNewSubscribers handlerC
      TMap.delete sinkId curOps
      TMap.delete sinkId newOps
      cohortIsEmpty <-
        (&&)
          <$> TMap.null curOps
          <*> TMap.null newOps
      when cohortIsEmpty $ TMap.delete cohortId cohortMap
      handlerIsEmpty <- TMap.null cohortMap
      -- when there is no need for handler i.e, this happens to be the last
      -- operation, take the ref for the polling thread to cancel it
      if handlerIsEmpty
        then do
          STMMap.delete handlerId lqMap
          threadRefM <- fmap _pThread <$> STM.tryReadTMVar ioState
          return $
            Just $ -- deferred IO:
              case threadRefM of
                Just threadRef -> Immortal.stop threadRef
                -- This would seem to imply addLiveQuery broke or a bug
                -- elsewhere. Be paranoid and log:
                Nothing ->
                  L.unLogger logger $
                    L.UnstructuredLog L.LevelError $
                      fromString $
                        "In removeLiveQuery no worker thread installed. Please report this as a bug: "
                          <> show lqId
        else return Nothing

removeStreamingQuery ::
  L.Logger L.Hasura ->
  ServerMetrics ->
  PrometheusMetrics ->
  SubscriptionsState ->
  -- the query and the associated operation
  StreamingSubscriberDetails ->
  IO ()
removeStreamingQuery :: Logger Hasura
-> ServerMetrics
-> PrometheusMetrics
-> SubscriptionsState
-> StreamingSubscriberDetails
-> IO ()
removeStreamingQuery Logger Hasura
logger ServerMetrics
serverMetrics PrometheusMetrics
prometheusMetrics SubscriptionsState
subscriptionState (SubscriberDetails PollerKey
handlerId (CohortKey
cohortId, TVar CursorVariableValues
cursorVariableTV) SubscriberId
sinkId) = IO () -> IO ()
forall a. IO a -> IO a
mask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
  Maybe (IO ())
mbCleanupIO <- STM (Maybe (IO ())) -> IO (Maybe (IO ()))
forall a. STM a -> IO a
STM.atomically (STM (Maybe (IO ())) -> IO (Maybe (IO ())))
-> STM (Maybe (IO ())) -> IO (Maybe (IO ()))
forall a b. (a -> b) -> a -> b
$ do
    Maybe
  (Poller (TVar CursorVariableValues), CohortKey,
   Cohort (TVar CursorVariableValues))
detM <- PollerMap (TVar CursorVariableValues)
-> STM
     (Maybe
        (Poller (TVar CursorVariableValues), CohortKey,
         Cohort (TVar CursorVariableValues)))
getQueryDet PollerMap (TVar CursorVariableValues)
streamQMap
    (Maybe (Maybe (IO ())) -> Maybe (IO ()))
-> STM (Maybe (Maybe (IO ()))) -> STM (Maybe (IO ()))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe (Maybe (IO ())) -> Maybe (IO ())
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (STM (Maybe (Maybe (IO ()))) -> STM (Maybe (IO ())))
-> STM (Maybe (Maybe (IO ()))) -> STM (Maybe (IO ()))
forall a b. (a -> b) -> a -> b
$
      Maybe
  (Poller (TVar CursorVariableValues), CohortKey,
   Cohort (TVar CursorVariableValues))
-> ((Poller (TVar CursorVariableValues), CohortKey,
     Cohort (TVar CursorVariableValues))
    -> STM (Maybe (IO ())))
-> STM (Maybe (Maybe (IO ())))
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM Maybe
  (Poller (TVar CursorVariableValues), CohortKey,
   Cohort (TVar CursorVariableValues))
detM (((Poller (TVar CursorVariableValues), CohortKey,
   Cohort (TVar CursorVariableValues))
  -> STM (Maybe (IO ())))
 -> STM (Maybe (Maybe (IO ()))))
-> ((Poller (TVar CursorVariableValues), CohortKey,
     Cohort (TVar CursorVariableValues))
    -> STM (Maybe (IO ())))
-> STM (Maybe (Maybe (IO ())))
forall a b. (a -> b) -> a -> b
$ \(Poller CohortMap (TVar CursorVariableValues)
cohorts TMVar PollerIOState
ioState, CohortKey
currentCohortId, Cohort (TVar CursorVariableValues)
cohort) ->
        CohortMap (TVar CursorVariableValues)
-> TMVar PollerIOState
-> (Cohort (TVar CursorVariableValues), CohortKey)
-> STM (Maybe (IO ()))
cleanHandlerC CohortMap (TVar CursorVariableValues)
cohorts TMVar PollerIOState
ioState (Cohort (TVar CursorVariableValues)
cohort, CohortKey
currentCohortId)
  Maybe (IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_ Maybe (IO ())
mbCleanupIO
  IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveSubscriptions ServerMetrics
serverMetrics
  IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
Prometheus.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> Gauge
pmActiveSubscriptions PrometheusMetrics
prometheusMetrics
  IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Gauge -> IO ()
EKG.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ ServerMetrics -> Gauge
smActiveStreamingSubscriptions ServerMetrics
serverMetrics
  where
    streamQMap :: PollerMap (TVar CursorVariableValues)
streamQMap = SubscriptionsState -> PollerMap (TVar CursorVariableValues)
_ssStreamQueryMap SubscriptionsState
subscriptionState

    getQueryDet :: PollerMap (TVar CursorVariableValues)
-> STM
     (Maybe
        (Poller (TVar CursorVariableValues), CohortKey,
         Cohort (TVar CursorVariableValues)))
getQueryDet PollerMap (TVar CursorVariableValues)
subMap = do
      Maybe (Poller (TVar CursorVariableValues))
pollerM <- PollerKey
-> PollerMap (TVar CursorVariableValues)
-> STM (Maybe (Poller (TVar CursorVariableValues)))
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM (Maybe value)
STMMap.lookup PollerKey
handlerId PollerMap (TVar CursorVariableValues)
subMap
      (CursorVariableValues HashMap Name TxtEncodedVal
currentCohortCursorVal) <- TVar CursorVariableValues -> STM CursorVariableValues
forall a. TVar a -> STM a
STM.readTVar TVar CursorVariableValues
cursorVariableTV
      let updatedCohortId :: CohortKey
updatedCohortId = ValidatedVariables (HashMap Name) -> CohortKey -> CohortKey
modifyCursorCohortVariables (HashMap Name TxtEncodedVal -> ValidatedVariables (HashMap Name)
forall (f :: * -> *). f TxtEncodedVal -> ValidatedVariables f
mkUnsafeValidateVariables HashMap Name TxtEncodedVal
currentCohortCursorVal) CohortKey
cohortId
      (Maybe
   (Maybe
      (Poller (TVar CursorVariableValues), CohortKey,
       Cohort (TVar CursorVariableValues)))
 -> Maybe
      (Poller (TVar CursorVariableValues), CohortKey,
       Cohort (TVar CursorVariableValues)))
-> STM
     (Maybe
        (Maybe
           (Poller (TVar CursorVariableValues), CohortKey,
            Cohort (TVar CursorVariableValues))))
-> STM
     (Maybe
        (Poller (TVar CursorVariableValues), CohortKey,
         Cohort (TVar CursorVariableValues)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe
  (Maybe
     (Poller (TVar CursorVariableValues), CohortKey,
      Cohort (TVar CursorVariableValues)))
-> Maybe
     (Poller (TVar CursorVariableValues), CohortKey,
      Cohort (TVar CursorVariableValues))
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (STM
   (Maybe
      (Maybe
         (Poller (TVar CursorVariableValues), CohortKey,
          Cohort (TVar CursorVariableValues))))
 -> STM
      (Maybe
         (Poller (TVar CursorVariableValues), CohortKey,
          Cohort (TVar CursorVariableValues))))
-> STM
     (Maybe
        (Maybe
           (Poller (TVar CursorVariableValues), CohortKey,
            Cohort (TVar CursorVariableValues))))
-> STM
     (Maybe
        (Poller (TVar CursorVariableValues), CohortKey,
         Cohort (TVar CursorVariableValues)))
forall a b. (a -> b) -> a -> b
$
        Maybe (Poller (TVar CursorVariableValues))
-> (Poller (TVar CursorVariableValues)
    -> STM
         (Maybe
            (Poller (TVar CursorVariableValues), CohortKey,
             Cohort (TVar CursorVariableValues))))
-> STM
     (Maybe
        (Maybe
           (Poller (TVar CursorVariableValues), CohortKey,
            Cohort (TVar CursorVariableValues))))
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM Maybe (Poller (TVar CursorVariableValues))
pollerM ((Poller (TVar CursorVariableValues)
  -> STM
       (Maybe
          (Poller (TVar CursorVariableValues), CohortKey,
           Cohort (TVar CursorVariableValues))))
 -> STM
      (Maybe
         (Maybe
            (Poller (TVar CursorVariableValues), CohortKey,
             Cohort (TVar CursorVariableValues)))))
-> (Poller (TVar CursorVariableValues)
    -> STM
         (Maybe
            (Poller (TVar CursorVariableValues), CohortKey,
             Cohort (TVar CursorVariableValues))))
-> STM
     (Maybe
        (Maybe
           (Poller (TVar CursorVariableValues), CohortKey,
            Cohort (TVar CursorVariableValues))))
forall a b. (a -> b) -> a -> b
$ \Poller (TVar CursorVariableValues)
poller -> do
          Maybe (Cohort (TVar CursorVariableValues))
cohortM <- CohortKey
-> CohortMap (TVar CursorVariableValues)
-> STM (Maybe (Cohort (TVar CursorVariableValues)))
forall k v. (Eq k, Hashable k) => k -> TMap k v -> STM (Maybe v)
TMap.lookup CohortKey
updatedCohortId (Poller (TVar CursorVariableValues)
-> CohortMap (TVar CursorVariableValues)
forall streamCursor. Poller streamCursor -> CohortMap streamCursor
_pCohorts Poller (TVar CursorVariableValues)
poller)
          Maybe
  (Poller (TVar CursorVariableValues), CohortKey,
   Cohort (TVar CursorVariableValues))
-> STM
     (Maybe
        (Poller (TVar CursorVariableValues), CohortKey,
         Cohort (TVar CursorVariableValues)))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe
   (Poller (TVar CursorVariableValues), CohortKey,
    Cohort (TVar CursorVariableValues))
 -> STM
      (Maybe
         (Poller (TVar CursorVariableValues), CohortKey,
          Cohort (TVar CursorVariableValues))))
-> Maybe
     (Poller (TVar CursorVariableValues), CohortKey,
      Cohort (TVar CursorVariableValues))
-> STM
     (Maybe
        (Poller (TVar CursorVariableValues), CohortKey,
         Cohort (TVar CursorVariableValues)))
forall a b. (a -> b) -> a -> b
$ (Poller (TVar CursorVariableValues)
poller,CohortKey
updatedCohortId,) (Cohort (TVar CursorVariableValues)
 -> (Poller (TVar CursorVariableValues), CohortKey,
     Cohort (TVar CursorVariableValues)))
-> Maybe (Cohort (TVar CursorVariableValues))
-> Maybe
     (Poller (TVar CursorVariableValues), CohortKey,
      Cohort (TVar CursorVariableValues))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (Cohort (TVar CursorVariableValues))
cohortM

    cleanHandlerC :: CohortMap (TVar CursorVariableValues)
-> TMVar PollerIOState
-> (Cohort (TVar CursorVariableValues), CohortKey)
-> STM (Maybe (IO ()))
cleanHandlerC CohortMap (TVar CursorVariableValues)
cohortMap TMVar PollerIOState
ioState (Cohort (TVar CursorVariableValues)
handlerC, CohortKey
currentCohortId) = do
      let curOps :: TMap SubscriberId Subscriber
curOps = Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
_cExistingSubscribers Cohort (TVar CursorVariableValues)
handlerC
          newOps :: TMap SubscriberId Subscriber
newOps = Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
_cNewSubscribers Cohort (TVar CursorVariableValues)
handlerC
      SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. (Eq k, Hashable k) => k -> TMap k v -> STM ()
TMap.delete SubscriberId
sinkId TMap SubscriberId Subscriber
curOps
      SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. (Eq k, Hashable k) => k -> TMap k v -> STM ()
TMap.delete SubscriberId
sinkId TMap SubscriberId Subscriber
newOps
      Bool
cohortIsEmpty <-
        Bool -> Bool -> Bool
(&&)
          (Bool -> Bool -> Bool) -> STM Bool -> STM (Bool -> Bool)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMap SubscriberId Subscriber -> STM Bool
forall k v. TMap k v -> STM Bool
TMap.null TMap SubscriberId Subscriber
curOps
          STM (Bool -> Bool) -> STM Bool -> STM Bool
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TMap SubscriberId Subscriber -> STM Bool
forall k v. TMap k v -> STM Bool
TMap.null TMap SubscriberId Subscriber
newOps
      Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
cohortIsEmpty (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ CohortKey -> CohortMap (TVar CursorVariableValues) -> STM ()
forall k v. (Eq k, Hashable k) => k -> TMap k v -> STM ()
TMap.delete CohortKey
currentCohortId CohortMap (TVar CursorVariableValues)
cohortMap
      Bool
handlerIsEmpty <- CohortMap (TVar CursorVariableValues) -> STM Bool
forall k v. TMap k v -> STM Bool
TMap.null CohortMap (TVar CursorVariableValues)
cohortMap
      -- when there is no need for handler i.e, this happens to be the last
      -- operation, take the ref for the polling thread to cancel it
      if Bool
handlerIsEmpty
        then do
          PollerKey -> PollerMap (TVar CursorVariableValues) -> STM ()
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM ()
STMMap.delete PollerKey
handlerId PollerMap (TVar CursorVariableValues)
streamQMap
          Maybe Thread
threadRefM <- (PollerIOState -> Thread) -> Maybe PollerIOState -> Maybe Thread
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap PollerIOState -> Thread
_pThread (Maybe PollerIOState -> Maybe Thread)
-> STM (Maybe PollerIOState) -> STM (Maybe Thread)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMVar PollerIOState -> STM (Maybe PollerIOState)
forall a. TMVar a -> STM (Maybe a)
STM.tryReadTMVar TMVar PollerIOState
ioState
          Maybe (IO ()) -> STM (Maybe (IO ()))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (IO ()) -> STM (Maybe (IO ())))
-> Maybe (IO ()) -> STM (Maybe (IO ()))
forall a b. (a -> b) -> a -> b
$
            IO () -> Maybe (IO ())
forall a. a -> Maybe a
Just (IO () -> Maybe (IO ())) -> IO () -> Maybe (IO ())
forall a b. (a -> b) -> a -> b
$ -- deferred IO:
              case Maybe Thread
threadRefM of
                Just Thread
threadRef -> Thread -> IO ()
Immortal.stop Thread
threadRef
                -- This would seem to imply addStreamSubscriptionQuery broke or a bug
                -- elsewhere. Be paranoid and log:
                Maybe Thread
Nothing ->
                  Logger Hasura
-> forall a (m :: * -> *).
   (ToEngineLog a Hasura, MonadIO m) =>
   a -> m ()
forall impl.
Logger impl
-> forall a (m :: * -> *).
   (ToEngineLog a impl, MonadIO m) =>
   a -> m ()
L.unLogger Logger Hasura
logger (UnstructuredLog -> IO ()) -> UnstructuredLog -> IO ()
forall a b. (a -> b) -> a -> b
$
                    LogLevel -> SerializableBlob -> UnstructuredLog
L.UnstructuredLog LogLevel
L.LevelError (SerializableBlob -> UnstructuredLog)
-> SerializableBlob -> UnstructuredLog
forall a b. (a -> b) -> a -> b
$
                      String -> SerializableBlob
forall a. IsString a => String -> a
fromString (String -> SerializableBlob) -> String -> SerializableBlob
forall a b. (a -> b) -> a -> b
$
                        String
"In removeLiveQuery no worker thread installed. Please report this as a bug: "
                          String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
" poller_id: "
                          String -> ShowS
forall a. Semigroup a => a -> a -> a
<> PollerKey -> String
forall a. Show a => a -> String
show PollerKey
handlerId
                          String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
", cohort_id: "
                          String -> ShowS
forall a. Semigroup a => a -> a -> a
<> CohortKey -> String
forall a. Show a => a -> String
show CohortKey
cohortId
                          String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
", subscriber_id:"
                          String -> ShowS
forall a. Semigroup a => a -> a -> a
<> SubscriberId -> String
forall a. Show a => a -> String
show SubscriberId
sinkId
        else Maybe (IO ()) -> STM (Maybe (IO ()))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IO ())
forall a. Maybe a
Nothing

-- | An async action query whose relationships refer to tables in a source.
-- We need to generate an SQL statement with the action response and execute it
-- in the source database so as to fetch the response joined with relationship rows.
-- For more details see Note [Resolving async action query]
data LiveAsyncActionQueryOnSource = LiveAsyncActionQueryOnSource
  { -- | Subscriber details of the live query.
    -- NOTE(review): by its name this tracks the /current/ live query, i.e. it
    -- is presumably replaced after each restart -- confirm against the caller.
    _laaqpCurrentLqId :: !LiveQuerySubscriberDetails,
    -- | An action log response map.
    -- NOTE(review): by its name this holds the /previous/ responses, presumably
    -- to detect changes between polls -- confirm against the caller.
    _laaqpPrevActionLogMap :: !ActionLogResponseMap,
    -- | An IO action to restart the live query poller with updated action log
    -- responses fetched from metadata storage. Restarting a live query
    -- re-generates the SQL statement with the new action log responses in order
    -- to send the latest action response to the client.
    _laaqpRestartLq :: !(LiveQuerySubscriberDetails -> ActionLogResponseMap -> IO (Maybe LiveQuerySubscriberDetails))
  }

-- | Callbacks for an async action live query that has no relationships: it
-- carries only the two IO actions used to talk to the websocket client.
data LiveAsyncActionQueryWithNoRelationships = LiveAsyncActionQueryWithNoRelationships
  { -- | An IO action to send response to the websocket client
    _laaqwnrSendResponse :: !(ActionLogResponseMap -> IO ()),
    -- | An IO action to send "completed" message to the websocket client
    _laaqwnrSendCompleted :: !(IO ())
  }

-- | The two ways an async action live query can be carried out: either with
-- no relationships (see 'LiveAsyncActionQueryWithNoRelationships') or against
-- a source database (see 'LiveAsyncActionQueryOnSource').
data LiveAsyncActionQuery
  = LAAQNoRelationships !LiveAsyncActionQueryWithNoRelationships
  | LAAQOnSourceDB !LiveAsyncActionQueryOnSource

-- | An async action live query as stored in 'AsyncActionSubscriptionState':
-- the action ids it covers, an error callback, and how it is executed.
data AsyncActionQueryLive = AsyncActionQueryLive
  { -- | The (non-empty) set of action ids associated with this live query
    _aaqlActionIds :: !(NonEmpty ActionId),
    -- | An IO action to send error message (in case of any exception) to the websocket client
    _aaqlOnException :: !(QErr -> IO ()),
    -- | The execution variant of this live query (see 'LiveAsyncActionQuery')
    _aaqlLiveExecution :: !LiveAsyncActionQuery
  }

-- | A shareable state map which stores each async action live query keyed by its subscription operation id
type AsyncActionSubscriptionState = TMap.TMap OperationId AsyncActionQueryLive

-- | Register an async action live query under its subscription operation id.
--
-- Builds an 'AsyncActionQueryLive' from the given action ids, exception
-- callback and execution variant, and inserts it into the state map inside a
-- single STM transaction.
addAsyncActionLiveQuery ::
  AsyncActionSubscriptionState ->
  OperationId ->
  NonEmpty ActionId ->
  (QErr -> IO ()) ->
  LiveAsyncActionQuery ->
  IO ()
addAsyncActionLiveQuery queriesState opId actionIds onException liveQuery =
  STM.atomically $
    TMap.insert (AsyncActionQueryLive actionIds onException liveQuery) opId queriesState

-- | Remove the async action live query stored under the given operation id
-- from the state map, inside a single STM transaction.
removeAsyncActionLiveQuery ::
  AsyncActionSubscriptionState -> OperationId -> IO ()
removeAsyncActionLiveQuery queriesState opId =
  STM.atomically $ TMap.delete opId queriesState