{-# LANGUAGE TemplateHaskell #-}

-- | Multiplexed subscription poller threads; see "Hasura.GraphQL.Execute.Subscription" for details.
module Hasura.GraphQL.Execute.Subscription.Poll.LiveQuery
  ( -- * Pollers
    pollLiveQuery,
  )
where

import Control.Concurrent.Async qualified as A
import Control.Concurrent.STM qualified as STM
import Control.Lens
import Data.Aeson.Ordered qualified as JO
import Data.ByteString qualified as BS
import Data.HashMap.Strict qualified as HashMap
import Data.List.Split (chunksOf)
import Data.Monoid (Endo (..), Sum (..))
import Data.Text.Extended
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan (applyModifier)
import Hasura.GraphQL.Execute.Subscription.Poll.Common hiding (Cohort (..), CohortMap, CohortSnapshot (..))
import Hasura.GraphQL.Execute.Subscription.Poll.Common qualified as C
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.Execute.Subscription.Types
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Logging (LogLevel (..))
import Hasura.Prelude
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.BackendTag (backendTag, reify)
import Hasura.RQL.Types.BackendType (BackendType (..), PostgresKind (Vanilla))
import Hasura.RQL.Types.Common (SourceName)
import Hasura.RQL.Types.Roles (RoleName)
import Hasura.RQL.Types.Subscription (SubscriptionType (..))
import Hasura.Server.Prometheus (PrometheusMetrics (..), SubscriptionMetrics (..), liveQuerySubscriptionLabel, recordSubcriptionMetric)
import Hasura.Server.Types (GranularPrometheusMetricsState (..))
import Refined (unrefine)
import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
import System.Metrics.Prometheus.HistogramVector qualified as HistogramVector

-- | Deliver the result of one poll to the subscribers of a single cohort.
--
-- The response hash previously stored for the cohort is read from the
-- snapshot's 'STM.TVar'. If the result is an execution error, or its hash
-- differs from the stored one, the new hash is written back and the result is
-- pushed to both the new and the existing subscribers. Otherwise only the new
-- subscribers receive it and the existing ones are reported as ignored.
pushResultToCohort ::
  GQResult BS.ByteString ->
  Maybe ResponseHash ->
  SubscriptionMetadata ->
  CohortSnapshot 'LiveQuery ->
  -- | subscribers to which data has been pushed, subscribers which already
  -- have this data (this information is exposed by metrics reporting)
  IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort result !respHashM (SubscriptionMetadata dTime) cohortSnapshot = do
  prevRespHashM <- STM.readTVarIO respRef
  -- write to the current websockets if needed
  (subscribersToPush, subscribersToIgnore) <-
    if isExecError result || respHashM /= prevRespHashM
      then do
        $assertNFHere respHashM -- so we don't write thunks to mutable vars
        STM.atomically $ STM.writeTVar respRef respHashM
        pure (newSinks <> curSinks, mempty)
      else pure (newSinks, curSinks)
  pushResultToSubscribers subscribersToPush
  -- Reduce every subscriber in both lists to its id and metadata.
  pure
    $ over
      (each . each)
      ( \Subscriber {..} ->
          SubscriberExecutionDetails _sId _sMetadata
      )
      (subscribersToPush, subscribersToIgnore)
  where
    -- Positional match on the snapshot; the first field is not needed here.
    C.CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot

    -- The payload handed to each subscriber: the query result paired with the
    -- time carried in the 'SubscriptionMetadata' argument.
    response = result <&> (`SubscriptionResponse` dTime)

    -- Invoke every subscriber's on-change callback concurrently.
    pushResultToSubscribers =
      A.mapConcurrently_ $ \Subscriber {..} -> _sOnChangeCallback response

-- | Where the magic happens: the top-level action run periodically by each
-- active 'Poller'. This needs to be async exception safe.
pollLiveQuery ::
  forall b.
  (BackendTransport b) =>
  PollerId ->
  STM.TVar PollerResponseState ->
  SubscriptionsOptions ->
  (SourceName, SourceConfig b) ->
  RoleName ->
  ParameterizedQueryHash ->
  MultiplexedQuery b ->
  CohortMap 'LiveQuery ->
  SubscriptionPostPollHook ->
  PrometheusMetrics ->
  IO GranularPrometheusMetricsState ->
  TMap.TMap (Maybe OperationName) Int ->
  ResolvedConnectionTemplate b ->
  (Maybe (Endo JO.Value)) ->
  IO ()
pollLiveQuery :: forall (b :: BackendType).
BackendTransport b =>
PollerId
-> TVar PollerResponseState
-> SubscriptionsOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap 'LiveQuery
-> SubscriptionPostPollHook
-> PrometheusMetrics
-> IO GranularPrometheusMetricsState
-> TMap (Maybe OperationName) Int
-> ResolvedConnectionTemplate b
-> Maybe (Endo Value)
-> IO ()
pollLiveQuery PollerId
pollerId TVar PollerResponseState
pollerResponseState SubscriptionsOptions
lqOpts (SourceName
sourceName, SourceConfig b
sourceConfig) RoleName
roleName ParameterizedQueryHash
parameterizedQueryHash MultiplexedQuery b
query CohortMap 'LiveQuery
cohortMap SubscriptionPostPollHook
postPollHook PrometheusMetrics
prometheusMetrics IO GranularPrometheusMetricsState
granularPrometheusMetricsState TMap (Maybe OperationName) Int
operationNamesMap' ResolvedConnectionTemplate b
resolvedConnectionTemplate Maybe (Endo Value)
modifier = do
  HashMap (Maybe OperationName) Int
operationNamesMap <- STM (HashMap (Maybe OperationName) Int)
-> IO (HashMap (Maybe OperationName) Int)
forall a. STM a -> IO a
STM.atomically (STM (HashMap (Maybe OperationName) Int)
 -> IO (HashMap (Maybe OperationName) Int))
-> STM (HashMap (Maybe OperationName) Int)
-> IO (HashMap (Maybe OperationName) Int)
forall a b. (a -> b) -> a -> b
$ TMap (Maybe OperationName) Int
-> STM (HashMap (Maybe OperationName) Int)
forall k v. TMap k v -> STM (HashMap k v)
TMap.getMap TMap (Maybe OperationName) Int
operationNamesMap'
  (DiffTime
totalTime, (DiffTime
snapshotTime, ([BatchExecutionDetails]
batchesDetails, [Maybe PollDetailsError]
maybeErrors))) <- IO (DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError]))
-> IO
     (DiffTime,
      (DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError])))
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO (DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError]))
 -> IO
      (DiffTime,
       (DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError]))))
-> IO
     (DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError]))
-> IO
     (DiffTime,
      (DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError])))
forall a b. (a -> b) -> a -> b
$ do
    -- snapshot the current cohorts and split them into batches
    (DiffTime
snapshotTime, [(BatchId, [(CohortId, CohortSnapshot)])]
cohortBatches) <- IO [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO (DiffTime, [(BatchId, [(CohortId, CohortSnapshot)])])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO [(BatchId, [(CohortId, CohortSnapshot)])]
 -> IO (DiffTime, [(BatchId, [(CohortId, CohortSnapshot)])]))
-> IO [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO (DiffTime, [(BatchId, [(CohortId, CohortSnapshot)])])
forall a b. (a -> b) -> a -> b
$ do
      -- get a snapshot of all the cohorts
      -- this need not be done in a transaction
      [(CohortVariables, Cohort ())]
cohorts <- STM [(CohortVariables, Cohort ())]
-> IO [(CohortVariables, Cohort ())]
forall a. STM a -> IO a
STM.atomically (STM [(CohortVariables, Cohort ())]
 -> IO [(CohortVariables, Cohort ())])
-> STM [(CohortVariables, Cohort ())]
-> IO [(CohortVariables, Cohort ())]
forall a b. (a -> b) -> a -> b
$ TMap CohortVariables (Cohort ())
-> STM [(CohortVariables, Cohort ())]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap CohortVariables (Cohort ())
CohortMap 'LiveQuery
cohortMap
      [(CohortId, CohortSnapshot)]
cohortSnapshots <- ((CohortVariables, Cohort ()) -> IO (CohortId, CohortSnapshot))
-> [(CohortVariables, Cohort ())]
-> IO [(CohortId, CohortSnapshot)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (STM (CohortId, CohortSnapshot) -> IO (CohortId, CohortSnapshot)
forall a. STM a -> IO a
STM.atomically (STM (CohortId, CohortSnapshot) -> IO (CohortId, CohortSnapshot))
-> ((CohortVariables, Cohort ()) -> STM (CohortId, CohortSnapshot))
-> (CohortVariables, Cohort ())
-> IO (CohortId, CohortSnapshot)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortVariables, Cohort ()) -> STM (CohortId, CohortSnapshot)
getCohortSnapshot) [(CohortVariables, Cohort ())]
cohorts
      -- cohorts are broken down into batches specified by the batch size
      let cohortBatches :: [[(CohortId, CohortSnapshot)]]
cohortBatches = Int
-> [(CohortId, CohortSnapshot)] -> [[(CohortId, CohortSnapshot)]]
forall e. Int -> [e] -> [[e]]
chunksOf (Refined NonNegative Int -> Int
forall {k} (p :: k) x. Refined p x -> x
unrefine (BatchSize -> Refined NonNegative Int
unBatchSize BatchSize
batchSize)) [(CohortId, CohortSnapshot)]
cohortSnapshots
      -- associating every batch with their BatchId
      [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO [(BatchId, [(CohortId, CohortSnapshot)])]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([(BatchId, [(CohortId, CohortSnapshot)])]
 -> IO [(BatchId, [(CohortId, CohortSnapshot)])])
-> [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO [(BatchId, [(CohortId, CohortSnapshot)])]
forall a b. (a -> b) -> a -> b
$ [BatchId]
-> [[(CohortId, CohortSnapshot)]]
-> [(BatchId, [(CohortId, CohortSnapshot)])]
forall a b. [a] -> [b] -> [(a, b)]
zip (Int -> BatchId
BatchId (Int -> BatchId) -> [Int] -> [BatchId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Int
1 ..]) [[(CohortId, CohortSnapshot)]]
cohortBatches

    -- concurrently process each batch
    [(BatchExecutionDetails, Maybe PollDetailsError)]
batchesDetailsWithMaybeError <- [(BatchId, [(CohortId, CohortSnapshot)])]
-> ((BatchId, [(CohortId, CohortSnapshot)])
    -> IO (BatchExecutionDetails, Maybe PollDetailsError))
-> IO [(BatchExecutionDetails, Maybe PollDetailsError)]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(BatchId, [(CohortId, CohortSnapshot)])]
cohortBatches (((BatchId, [(CohortId, CohortSnapshot)])
  -> IO (BatchExecutionDetails, Maybe PollDetailsError))
 -> IO [(BatchExecutionDetails, Maybe PollDetailsError)])
-> ((BatchId, [(CohortId, CohortSnapshot)])
    -> IO (BatchExecutionDetails, Maybe PollDetailsError))
-> IO [(BatchExecutionDetails, Maybe PollDetailsError)]
forall a b. (a -> b) -> a -> b
$ \(BatchId
batchId, [(CohortId, CohortSnapshot)]
cohorts) -> do
      (DiffTime
queryExecutionTime, Either QErr [(CohortId, ByteString)]
mxRes) <- forall (b :: BackendType) (m :: * -> *).
(BackendTransport b, MonadIO m, MonadBaseControl IO m) =>
SourceConfig b
-> MultiplexedQuery b
-> [(CohortId, CohortVariables)]
-> ResolvedConnectionTemplate b
-> m (DiffTime, Either QErr [(CohortId, ByteString)])
runDBSubscription @b SourceConfig b
sourceConfig MultiplexedQuery b
query (ASetter
  [(CohortId, CohortSnapshot)]
  [(CohortId, CohortVariables)]
  CohortSnapshot
  CohortVariables
-> (CohortSnapshot -> CohortVariables)
-> [(CohortId, CohortSnapshot)]
-> [(CohortId, CohortVariables)]
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
over (((CohortId, CohortSnapshot)
 -> Identity (CohortId, CohortVariables))
-> [(CohortId, CohortSnapshot)]
-> Identity [(CohortId, CohortVariables)]
forall s t a b. Each s t a b => Traversal s t a b
Traversal
  [(CohortId, CohortSnapshot)]
  [(CohortId, CohortVariables)]
  (CohortId, CohortSnapshot)
  (CohortId, CohortVariables)
each (((CohortId, CohortSnapshot)
  -> Identity (CohortId, CohortVariables))
 -> [(CohortId, CohortSnapshot)]
 -> Identity [(CohortId, CohortVariables)])
-> ((CohortSnapshot -> Identity CohortVariables)
    -> (CohortId, CohortSnapshot)
    -> Identity (CohortId, CohortVariables))
-> ASetter
     [(CohortId, CohortSnapshot)]
     [(CohortId, CohortVariables)]
     CohortSnapshot
     CohortVariables
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortSnapshot -> Identity CohortVariables)
-> (CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables)
forall s t a b. Field2 s t a b => Lens s t a b
Lens
  (CohortId, CohortSnapshot)
  (CohortId, CohortVariables)
  CohortSnapshot
  CohortVariables
_2) CohortSnapshot -> CohortVariables
C._csVariables [(CohortId, CohortSnapshot)]
cohorts) ResolvedConnectionTemplate b
resolvedConnectionTemplate

      let dbExecTimeMetric :: HistogramVector SubscriptionLabel
dbExecTimeMetric = SubscriptionMetrics -> HistogramVector SubscriptionLabel
submDBExecTotalTime (SubscriptionMetrics -> HistogramVector SubscriptionLabel)
-> SubscriptionMetrics -> HistogramVector SubscriptionLabel
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics (PrometheusMetrics -> SubscriptionMetrics)
-> PrometheusMetrics -> SubscriptionMetrics
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics
prometheusMetrics
      IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> m ()
recordSubcriptionMetric
        IO GranularPrometheusMetricsState
granularPrometheusMetricsState
        Bool
True
        HashMap (Maybe OperationName) Int
operationNamesMap
        ParameterizedQueryHash
parameterizedQueryHash
        SubscriptionKindLabel
liveQuerySubscriptionLabel
        ((SubscriptionLabel -> Double -> IO ())
-> Double -> SubscriptionLabel -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip (HistogramVector SubscriptionLabel
-> SubscriptionLabel -> Double -> IO ()
forall label.
Ord label =>
HistogramVector label -> label -> Double -> IO ()
HistogramVector.observe HistogramVector SubscriptionLabel
dbExecTimeMetric) (DiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac DiffTime
queryExecutionTime))

      PollerResponseState
previousPollerResponseState <- TVar PollerResponseState -> IO PollerResponseState
forall a. TVar a -> IO a
STM.readTVarIO TVar PollerResponseState
pollerResponseState

      Maybe PollDetailsError
maybeError <- case Either QErr [(CohortId, ByteString)]
mxRes of
        Left QErr
err -> do
          Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (PollerResponseState
previousPollerResponseState PollerResponseState -> PollerResponseState -> Bool
forall a. Eq a => a -> a -> Bool
== PollerResponseState
PRSSuccess) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            Gauge -> IO ()
Prometheus.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ SubscriptionMetrics -> Gauge
submActiveLiveQueryPollersInError (SubscriptionMetrics -> Gauge) -> SubscriptionMetrics -> Gauge
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics PrometheusMetrics
prometheusMetrics
            STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar PollerResponseState -> PollerResponseState -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar PollerResponseState
pollerResponseState PollerResponseState
PRSError
          let pollDetailsError :: PollDetailsError
pollDetailsError =
                PollDetailsError
                  { _pdeBatchId :: BatchId
_pdeBatchId = BatchId
batchId,
                    _pdeErrorDetails :: QErr
_pdeErrorDetails = QErr
err
                  }
          Maybe PollDetailsError -> IO (Maybe PollDetailsError)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe PollDetailsError -> IO (Maybe PollDetailsError))
-> Maybe PollDetailsError -> IO (Maybe PollDetailsError)
forall a b. (a -> b) -> a -> b
$ PollDetailsError -> Maybe PollDetailsError
forall a. a -> Maybe a
Just PollDetailsError
pollDetailsError
        Right [(CohortId, ByteString)]
_ -> do
          Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (PollerResponseState
previousPollerResponseState PollerResponseState -> PollerResponseState -> Bool
forall a. Eq a => a -> a -> Bool
== PollerResponseState
PRSError) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            Gauge -> IO ()
Prometheus.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ SubscriptionMetrics -> Gauge
submActiveLiveQueryPollersInError (SubscriptionMetrics -> Gauge) -> SubscriptionMetrics -> Gauge
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics PrometheusMetrics
prometheusMetrics
            STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar PollerResponseState -> PollerResponseState -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar PollerResponseState
pollerResponseState PollerResponseState
PRSSuccess
          Maybe PollDetailsError -> IO (Maybe PollDetailsError)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe PollDetailsError
forall a. Maybe a
Nothing

      let lqMeta :: SubscriptionMetadata
lqMeta = DiffTime -> SubscriptionMetadata
SubscriptionMetadata (DiffTime -> SubscriptionMetadata)
-> DiffTime -> SubscriptionMetadata
forall a b. (a -> b) -> a -> b
$ DiffTime -> DiffTime
forall x y. (Duration x, Duration y) => x -> y
convertDuration DiffTime
queryExecutionTime
          operations :: [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
  CohortSnapshot)]
operations = [(CohortId, CohortSnapshot)]
-> Either QErr [(CohortId, ByteString)]
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
     CohortSnapshot)]
getCohortOperations [(CohortId, CohortSnapshot)]
cohorts Either QErr [(CohortId, ByteString)]
mxRes
          -- batch response size is the sum of the response sizes of the cohorts
          batchResponseSize :: Maybe Int
batchResponseSize =
            case Either QErr [(CohortId, ByteString)]
mxRes of
              Left QErr
_ -> Maybe Int
forall a. Maybe a
Nothing
              Right [(CohortId, ByteString)]
resp -> Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$ Sum Int -> Int
forall a. Sum a -> a
getSum (Sum Int -> Int) -> Sum Int -> Int
forall a b. (a -> b) -> a -> b
$ ((CohortId, ByteString) -> Sum Int)
-> [(CohortId, ByteString)] -> Sum Int
forall m a. Monoid m => (a -> m) -> [a] -> m
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap (Int -> Sum Int
forall a. a -> Sum a
Sum (Int -> Sum Int)
-> ((CohortId, ByteString) -> Int)
-> (CohortId, ByteString)
-> Sum Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int
BS.length (ByteString -> Int)
-> ((CohortId, ByteString) -> ByteString)
-> (CohortId, ByteString)
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortId, ByteString) -> ByteString
forall a b. (a, b) -> b
snd) [(CohortId, ByteString)]
resp
      (DiffTime
pushTime, [CohortExecutionDetails]
cohortsExecutionDetails) <- IO [CohortExecutionDetails]
-> IO (DiffTime, [CohortExecutionDetails])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime
        (IO [CohortExecutionDetails]
 -> IO (DiffTime, [CohortExecutionDetails]))
-> IO [CohortExecutionDetails]
-> IO (DiffTime, [CohortExecutionDetails])
forall a b. (a -> b) -> a -> b
$ [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
  CohortSnapshot)]
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
     CohortSnapshot)
    -> IO CohortExecutionDetails)
-> IO [CohortExecutionDetails]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
  CohortSnapshot)]
operations
        (((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
   CohortSnapshot)
  -> IO CohortExecutionDetails)
 -> IO [CohortExecutionDetails])
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
     CohortSnapshot)
    -> IO CohortExecutionDetails)
-> IO [CohortExecutionDetails]
forall a b. (a -> b) -> a -> b
$ \(GQResult ByteString
res, CohortId
cohortId, Maybe (ResponseHash, Int)
respData, CohortSnapshot
snapshot) -> do
          ([SubscriberExecutionDetails]
pushedSubscribers, [SubscriberExecutionDetails]
ignoredSubscribers) <-
            GQResult ByteString
-> Maybe ResponseHash
-> SubscriptionMetadata
-> CohortSnapshot 'LiveQuery
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort GQResult ByteString
res ((ResponseHash, Int) -> ResponseHash
forall a b. (a, b) -> a
fst ((ResponseHash, Int) -> ResponseHash)
-> Maybe (ResponseHash, Int) -> Maybe ResponseHash
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData) SubscriptionMetadata
lqMeta CohortSnapshot
CohortSnapshot 'LiveQuery
snapshot
          CohortExecutionDetails -> IO CohortExecutionDetails
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
            CohortExecutionDetails
              { _cedCohortId :: CohortId
_cedCohortId = CohortId
cohortId,
                _cedVariables :: CohortVariables
_cedVariables = CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot,
                _cedPushedTo :: [SubscriberExecutionDetails]
_cedPushedTo = [SubscriberExecutionDetails]
pushedSubscribers,
                _cedIgnored :: [SubscriberExecutionDetails]
_cedIgnored = [SubscriberExecutionDetails]
ignoredSubscribers,
                _cedResponseSize :: Maybe Int
_cedResponseSize = (ResponseHash, Int) -> Int
forall a b. (a, b) -> b
snd ((ResponseHash, Int) -> Int)
-> Maybe (ResponseHash, Int) -> Maybe Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData,
                _cedBatchId :: BatchId
_cedBatchId = BatchId
batchId
              }

      -- Note: We want to keep the '_bedPgExecutionTime' field for backwards
      -- compatibility reason, which will be 'Nothing' for non-PG backends. See
      -- https://hasurahq.atlassian.net/browse/GS-329
      let pgExecutionTime :: Maybe DiffTime
pgExecutionTime = case BackendTag b -> BackendType
forall (b :: BackendType). BackendTag b -> BackendType
reify (forall (b :: BackendType). HasTag b => BackendTag b
backendTag @b) of
            Postgres PostgresKind
Vanilla -> DiffTime -> Maybe DiffTime
forall a. a -> Maybe a
Just DiffTime
queryExecutionTime
            BackendType
_ -> Maybe DiffTime
forall a. Maybe a
Nothing
          batchExecDetails :: BatchExecutionDetails
batchExecDetails =
            Maybe DiffTime
-> DiffTime
-> DiffTime
-> BatchId
-> [CohortExecutionDetails]
-> Maybe Int
-> BatchExecutionDetails
BatchExecutionDetails
              Maybe DiffTime
pgExecutionTime
              DiffTime
queryExecutionTime
              DiffTime
pushTime
              BatchId
batchId
              [CohortExecutionDetails]
cohortsExecutionDetails
              Maybe Int
batchResponseSize
      (BatchExecutionDetails, Maybe PollDetailsError)
-> IO (BatchExecutionDetails, Maybe PollDetailsError)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((BatchExecutionDetails, Maybe PollDetailsError)
 -> IO (BatchExecutionDetails, Maybe PollDetailsError))
-> (BatchExecutionDetails, Maybe PollDetailsError)
-> IO (BatchExecutionDetails, Maybe PollDetailsError)
forall a b. (a -> b) -> a -> b
$ (BatchExecutionDetails
batchExecDetails, Maybe PollDetailsError
maybeError)
    (DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError]))
-> IO
     (DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError]))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DiffTime
snapshotTime, [(BatchExecutionDetails, Maybe PollDetailsError)]
-> ([BatchExecutionDetails], [Maybe PollDetailsError])
forall a b. [(a, b)] -> ([a], [b])
unzip [(BatchExecutionDetails, Maybe PollDetailsError)]
batchesDetailsWithMaybeError)
  let initPollDetails :: PollDetails
initPollDetails =
        PollDetails
          { _pdPollerId :: PollerId
_pdPollerId = PollerId
pollerId,
            _pdKind :: SubscriptionType
_pdKind = SubscriptionType
LiveQuery,
            _pdGeneratedSql :: Text
_pdGeneratedSql = MultiplexedQuery b -> Text
forall a. ToTxt a => a -> Text
toTxt MultiplexedQuery b
query,
            _pdSnapshotTime :: DiffTime
_pdSnapshotTime = DiffTime
snapshotTime,
            _pdBatches :: [BatchExecutionDetails]
_pdBatches = [BatchExecutionDetails]
batchesDetails,
            _pdLiveQueryOptions :: SubscriptionsOptions
_pdLiveQueryOptions = SubscriptionsOptions
lqOpts,
            _pdTotalTime :: DiffTime
_pdTotalTime = DiffTime
totalTime,
            _pdSource :: SourceName
_pdSource = SourceName
sourceName,
            _pdRole :: RoleName
_pdRole = RoleName
roleName,
            _pdParameterizedQueryHash :: ParameterizedQueryHash
_pdParameterizedQueryHash = ParameterizedQueryHash
parameterizedQueryHash,
            _pdLogLevel :: LogLevel
_pdLogLevel = LogLevel
LevelInfo,
            _pdErrors :: Maybe [PollDetailsError]
_pdErrors = Maybe [PollDetailsError]
forall a. Maybe a
Nothing
          }
      maybePollDetailsErrors :: Maybe [PollDetailsError]
maybePollDetailsErrors = [Maybe PollDetailsError] -> Maybe [PollDetailsError]
forall (t :: * -> *) (f :: * -> *) a.
(Traversable t, Applicative f) =>
t (f a) -> f (t a)
forall (f :: * -> *) a. Applicative f => [f a] -> f [a]
sequenceA [Maybe PollDetailsError]
maybeErrors
      pollDetails :: PollDetails
pollDetails = case Maybe [PollDetailsError]
maybePollDetailsErrors of
        Maybe [PollDetailsError]
Nothing -> PollDetails
initPollDetails
        Just [PollDetailsError]
pollDetailsErrors ->
          PollDetails
initPollDetails
            { _pdLogLevel :: LogLevel
_pdLogLevel = LogLevel
LevelError,
              _pdErrors :: Maybe [PollDetailsError]
_pdErrors = [PollDetailsError] -> Maybe [PollDetailsError]
forall a. a -> Maybe a
Just [PollDetailsError]
pollDetailsErrors
            }
  SubscriptionPostPollHook
postPollHook PollDetails
pollDetails
  let totalTimeMetric :: HistogramVector SubscriptionLabel
totalTimeMetric = SubscriptionMetrics -> HistogramVector SubscriptionLabel
submTotalTime (SubscriptionMetrics -> HistogramVector SubscriptionLabel)
-> SubscriptionMetrics -> HistogramVector SubscriptionLabel
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics (PrometheusMetrics -> SubscriptionMetrics)
-> PrometheusMetrics -> SubscriptionMetrics
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics
prometheusMetrics
  IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> m ()
recordSubcriptionMetric
    IO GranularPrometheusMetricsState
granularPrometheusMetricsState
    Bool
True
    HashMap (Maybe OperationName) Int
operationNamesMap
    ParameterizedQueryHash
parameterizedQueryHash
    SubscriptionKindLabel
liveQuerySubscriptionLabel
    ((SubscriptionLabel -> Double -> IO ())
-> Double -> SubscriptionLabel -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip (HistogramVector SubscriptionLabel
-> SubscriptionLabel -> Double -> IO ()
forall label.
Ord label =>
HistogramVector label -> label -> Double -> IO ()
HistogramVector.observe HistogramVector SubscriptionLabel
totalTimeMetric) (DiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac DiffTime
totalTime))
  where
    -- Destructure the live-query options: bind the batch size (first field)
    -- and ignore the second field (the refetch interval).
    SubscriptionsOptions batchSize _ = lqOpts

    -- Snapshot a single cohort inside one STM transaction.
    --
    -- Inferred type:
    --   (CohortVariables, Cohort ()) -> STM (CohortId, CohortSnapshot)
    --
    -- Steps:
    --   1. list the subscribers held in the cohort's two subscriber maps
    --      (bound here as 'curOpsTV' and 'newOpsTV');
    --   2. insert every entry of 'newOpsTV' into 'curOpsTV';
    --   3. call 'TMap.reset' on 'newOpsTV' (presumably emptying it -- confirm
    --      against "Hasura.GraphQL.Execute.Subscription.TMap").
    --
    -- The returned snapshot is built from the two lists read in step 1, i.e.
    -- it still keeps both groups of subscribers separate, as they were before
    -- the merge performed in step 2.
    getCohortSnapshot (cohortVars, handlerC) = do
      let C.Cohort resId respRef curOpsTV newOpsTV () = handlerC
      curOpsL <- TMap.toList curOpsTV
      newOpsL <- TMap.toList newOpsTV
      -- Move the subscribers listed from 'newOpsTV' into 'curOpsTV'.
      forM_ newOpsL $ \(k, action) -> TMap.insert action k curOpsTV
      TMap.reset newOpsTV

      let cohortSnapshot =
            C.CohortSnapshot cohortVars respRef (map snd curOpsL) (map snd newOpsL)
      pure (resId, cohortSnapshot)

    -- Pair the cohorts of a batch with the outcome of the batch's query.
    --
    -- Inferred type:
    --   [(CohortId, CohortSnapshot)]
    --   -> Either QErr [(CohortId, ByteString)]
    --   -> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int), CohortSnapshot)]
    --
    -- * 'Left': every cohort of the batch receives the same error result, and
    --   no response hash/size ('Nothing').
    -- * 'Right': each @(cohortId, response)@ pair is matched with its snapshot
    --   by cohort id. The result carries the response after 'applyModifier',
    --   while the hash and the byte length are computed from the response
    --   bytes /before/ the modifier is applied. Responses whose cohort id is
    --   not in this batch are dropped by 'mapMaybe'.
    getCohortOperations cohorts = \case
      Left e ->
        -- TODO: this is internal error
        let resp = throwError $ GQExecError [encodeGQLErr False e]
         in [(resp, cohortId, Nothing, snapshot) | (cohortId, snapshot) <- cohorts]
      Right responses ->
        let cohortSnapshotMap = HashMap.fromList cohorts
         in flip mapMaybe responses $ \(cohortId, respBS) ->
              let respHash = mkRespHash respBS
                  respSize = BS.length respBS
               in -- TODO: currently we ignore the cases when the cohortId from
                  -- Postgres response is not present in the cohort map of this batch
                  -- (this shouldn't happen but if it happens it means a logic error and
                  -- we should log it)
                  (pure (applyModifier modifier respBS),cohortId,Just (respHash, respSize),)
                    <$> HashMap.lookup cohortId cohortSnapshotMap