{-# LANGUAGE TemplateHaskell #-}
module Hasura.GraphQL.Execute.Subscription.Poll.LiveQuery
(
pollLiveQuery,
)
where
import Control.Concurrent.Async qualified as A
import Control.Concurrent.STM qualified as STM
import Control.Lens
import Data.Aeson.Ordered qualified as JO
import Data.ByteString qualified as BS
import Data.HashMap.Strict qualified as HashMap
import Data.List.Split (chunksOf)
import Data.Monoid (Endo (..), Sum (..))
import Data.Text.Extended
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan (applyModifier)
import Hasura.GraphQL.Execute.Subscription.Poll.Common hiding (Cohort (..), CohortMap, CohortSnapshot (..))
import Hasura.GraphQL.Execute.Subscription.Poll.Common qualified as C
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.Execute.Subscription.Types
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Logging (LogLevel (..))
import Hasura.Prelude
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.BackendTag (backendTag, reify)
import Hasura.RQL.Types.BackendType (BackendType (..), PostgresKind (Vanilla))
import Hasura.RQL.Types.Common (SourceName)
import Hasura.RQL.Types.Roles (RoleName)
import Hasura.RQL.Types.Subscription (SubscriptionType (..))
import Hasura.Server.Prometheus (PrometheusMetrics (..), SubscriptionMetrics (..), liveQuerySubscriptionLabel, recordSubcriptionMetric)
import Hasura.Server.Types (GranularPrometheusMetricsState (..))
import Refined (unrefine)
import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
import System.Metrics.Prometheus.HistogramVector qualified as HistogramVector
pushResultToCohort ::
GQResult BS.ByteString ->
Maybe ResponseHash ->
SubscriptionMetadata ->
CohortSnapshot 'LiveQuery ->
IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort :: GQResult ByteString
-> Maybe ResponseHash
-> SubscriptionMetadata
-> CohortSnapshot 'LiveQuery
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort GQResult ByteString
result !Maybe ResponseHash
respHashM (SubscriptionMetadata DiffTime
dTime) CohortSnapshot 'LiveQuery
cohortSnapshot = do
Maybe ResponseHash
prevRespHashM <- TVar (Maybe ResponseHash) -> IO (Maybe ResponseHash)
forall a. TVar a -> IO a
STM.readTVarIO TVar (Maybe ResponseHash)
respRef
([Subscriber]
subscribersToPush, [Subscriber]
subscribersToIgnore) <-
if GQResult ByteString -> Bool
forall a. GQResult a -> Bool
isExecError GQResult ByteString
result Bool -> Bool -> Bool
|| Maybe ResponseHash
respHashM Maybe ResponseHash -> Maybe ResponseHash -> Bool
forall a. Eq a => a -> a -> Bool
/= Maybe ResponseHash
prevRespHashM
then do
$String
String -> Maybe ResponseHash -> IO ()
forall a. String -> a -> IO ()
assertNFHere Maybe ResponseHash
respHashM
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
TVar (Maybe ResponseHash) -> Maybe ResponseHash -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar (Maybe ResponseHash)
respRef Maybe ResponseHash
respHashM
([Subscriber], [Subscriber]) -> IO ([Subscriber], [Subscriber])
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([Subscriber]
newSinks [Subscriber] -> [Subscriber] -> [Subscriber]
forall a. Semigroup a => a -> a -> a
<> [Subscriber]
curSinks, [Subscriber]
forall a. Monoid a => a
mempty)
else ([Subscriber], [Subscriber]) -> IO ([Subscriber], [Subscriber])
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([Subscriber]
newSinks, [Subscriber]
curSinks)
[Subscriber] -> IO ()
pushResultToSubscribers [Subscriber]
subscribersToPush
([SubscriberExecutionDetails], [SubscriberExecutionDetails])
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
(([SubscriberExecutionDetails], [SubscriberExecutionDetails])
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails]))
-> ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall a b. (a -> b) -> a -> b
$ ASetter
([Subscriber], [Subscriber])
([SubscriberExecutionDetails], [SubscriberExecutionDetails])
Subscriber
SubscriberExecutionDetails
-> (Subscriber -> SubscriberExecutionDetails)
-> ([Subscriber], [Subscriber])
-> ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
over
(([Subscriber] -> Identity [SubscriberExecutionDetails])
-> ([Subscriber], [Subscriber])
-> Identity
([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall s t a b. Each s t a b => Traversal s t a b
Traversal
([Subscriber], [Subscriber])
([SubscriberExecutionDetails], [SubscriberExecutionDetails])
[Subscriber]
[SubscriberExecutionDetails]
each (([Subscriber] -> Identity [SubscriberExecutionDetails])
-> ([Subscriber], [Subscriber])
-> Identity
([SubscriberExecutionDetails], [SubscriberExecutionDetails]))
-> ((Subscriber -> Identity SubscriberExecutionDetails)
-> [Subscriber] -> Identity [SubscriberExecutionDetails])
-> ASetter
([Subscriber], [Subscriber])
([SubscriberExecutionDetails], [SubscriberExecutionDetails])
Subscriber
SubscriberExecutionDetails
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Subscriber -> Identity SubscriberExecutionDetails)
-> [Subscriber] -> Identity [SubscriberExecutionDetails]
forall s t a b. Each s t a b => Traversal s t a b
Traversal
[Subscriber]
[SubscriberExecutionDetails]
Subscriber
SubscriberExecutionDetails
each)
( \Subscriber {Maybe OperationName
RequestId
SubscriberMetadata
SubscriberId
OnChange
_sId :: SubscriberId
_sMetadata :: SubscriberMetadata
_sRequestId :: RequestId
_sOperationName :: Maybe OperationName
_sOnChangeCallback :: OnChange
_sId :: Subscriber -> SubscriberId
_sMetadata :: Subscriber -> SubscriberMetadata
_sRequestId :: Subscriber -> RequestId
_sOperationName :: Subscriber -> Maybe OperationName
_sOnChangeCallback :: Subscriber -> OnChange
..} ->
SubscriberId -> SubscriberMetadata -> SubscriberExecutionDetails
SubscriberExecutionDetails SubscriberId
_sId SubscriberMetadata
_sMetadata
)
([Subscriber]
subscribersToPush, [Subscriber]
subscribersToIgnore)
where
C.CohortSnapshot CohortVariables
_ TVar (Maybe ResponseHash)
respRef [Subscriber]
curSinks [Subscriber]
newSinks = CohortSnapshot 'LiveQuery
cohortSnapshot
response :: Either GQExecError SubscriptionResponse
response = GQResult ByteString
result GQResult ByteString
-> (ByteString -> SubscriptionResponse)
-> Either GQExecError SubscriptionResponse
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> (ByteString -> DiffTime -> SubscriptionResponse
`SubscriptionResponse` DiffTime
dTime)
pushResultToSubscribers :: [Subscriber] -> IO ()
pushResultToSubscribers =
(Subscriber -> IO ()) -> [Subscriber] -> IO ()
forall (f :: * -> *) a b. Foldable f => (a -> IO b) -> f a -> IO ()
A.mapConcurrently_ ((Subscriber -> IO ()) -> [Subscriber] -> IO ())
-> (Subscriber -> IO ()) -> [Subscriber] -> IO ()
forall a b. (a -> b) -> a -> b
$ \Subscriber {Maybe OperationName
RequestId
SubscriberMetadata
SubscriberId
OnChange
_sId :: Subscriber -> SubscriberId
_sMetadata :: Subscriber -> SubscriberMetadata
_sRequestId :: Subscriber -> RequestId
_sOperationName :: Subscriber -> Maybe OperationName
_sOnChangeCallback :: Subscriber -> OnChange
_sId :: SubscriberId
_sMetadata :: SubscriberMetadata
_sRequestId :: RequestId
_sOperationName :: Maybe OperationName
_sOnChangeCallback :: OnChange
..} -> OnChange
_sOnChangeCallback Either GQExecError SubscriptionResponse
response
pollLiveQuery ::
forall b.
(BackendTransport b) =>
PollerId ->
STM.TVar PollerResponseState ->
SubscriptionsOptions ->
(SourceName, SourceConfig b) ->
RoleName ->
ParameterizedQueryHash ->
MultiplexedQuery b ->
CohortMap 'LiveQuery ->
SubscriptionPostPollHook ->
PrometheusMetrics ->
IO GranularPrometheusMetricsState ->
TMap.TMap (Maybe OperationName) Int ->
ResolvedConnectionTemplate b ->
(Maybe (Endo JO.Value)) ->
IO ()
pollLiveQuery :: forall (b :: BackendType).
BackendTransport b =>
PollerId
-> TVar PollerResponseState
-> SubscriptionsOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap 'LiveQuery
-> SubscriptionPostPollHook
-> PrometheusMetrics
-> IO GranularPrometheusMetricsState
-> TMap (Maybe OperationName) Int
-> ResolvedConnectionTemplate b
-> Maybe (Endo Value)
-> IO ()
pollLiveQuery PollerId
pollerId TVar PollerResponseState
pollerResponseState SubscriptionsOptions
lqOpts (SourceName
sourceName, SourceConfig b
sourceConfig) RoleName
roleName ParameterizedQueryHash
parameterizedQueryHash MultiplexedQuery b
query CohortMap 'LiveQuery
cohortMap SubscriptionPostPollHook
postPollHook PrometheusMetrics
prometheusMetrics IO GranularPrometheusMetricsState
granularPrometheusMetricsState TMap (Maybe OperationName) Int
operationNamesMap' ResolvedConnectionTemplate b
resolvedConnectionTemplate Maybe (Endo Value)
modifier = do
HashMap (Maybe OperationName) Int
operationNamesMap <- STM (HashMap (Maybe OperationName) Int)
-> IO (HashMap (Maybe OperationName) Int)
forall a. STM a -> IO a
STM.atomically (STM (HashMap (Maybe OperationName) Int)
-> IO (HashMap (Maybe OperationName) Int))
-> STM (HashMap (Maybe OperationName) Int)
-> IO (HashMap (Maybe OperationName) Int)
forall a b. (a -> b) -> a -> b
$ TMap (Maybe OperationName) Int
-> STM (HashMap (Maybe OperationName) Int)
forall k v. TMap k v -> STM (HashMap k v)
TMap.getMap TMap (Maybe OperationName) Int
operationNamesMap'
(DiffTime
totalTime, (DiffTime
snapshotTime, ([BatchExecutionDetails]
batchesDetails, [Maybe PollDetailsError]
maybeErrors))) <- IO (DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError]))
-> IO
(DiffTime,
(DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError])))
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO (DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError]))
-> IO
(DiffTime,
(DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError]))))
-> IO
(DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError]))
-> IO
(DiffTime,
(DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError])))
forall a b. (a -> b) -> a -> b
$ do
(DiffTime
snapshotTime, [(BatchId, [(CohortId, CohortSnapshot)])]
cohortBatches) <- IO [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO (DiffTime, [(BatchId, [(CohortId, CohortSnapshot)])])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO (DiffTime, [(BatchId, [(CohortId, CohortSnapshot)])]))
-> IO [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO (DiffTime, [(BatchId, [(CohortId, CohortSnapshot)])])
forall a b. (a -> b) -> a -> b
$ do
[(CohortVariables, Cohort ())]
cohorts <- STM [(CohortVariables, Cohort ())]
-> IO [(CohortVariables, Cohort ())]
forall a. STM a -> IO a
STM.atomically (STM [(CohortVariables, Cohort ())]
-> IO [(CohortVariables, Cohort ())])
-> STM [(CohortVariables, Cohort ())]
-> IO [(CohortVariables, Cohort ())]
forall a b. (a -> b) -> a -> b
$ TMap CohortVariables (Cohort ())
-> STM [(CohortVariables, Cohort ())]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap CohortVariables (Cohort ())
CohortMap 'LiveQuery
cohortMap
[(CohortId, CohortSnapshot)]
cohortSnapshots <- ((CohortVariables, Cohort ()) -> IO (CohortId, CohortSnapshot))
-> [(CohortVariables, Cohort ())]
-> IO [(CohortId, CohortSnapshot)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (STM (CohortId, CohortSnapshot) -> IO (CohortId, CohortSnapshot)
forall a. STM a -> IO a
STM.atomically (STM (CohortId, CohortSnapshot) -> IO (CohortId, CohortSnapshot))
-> ((CohortVariables, Cohort ()) -> STM (CohortId, CohortSnapshot))
-> (CohortVariables, Cohort ())
-> IO (CohortId, CohortSnapshot)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortVariables, Cohort ()) -> STM (CohortId, CohortSnapshot)
getCohortSnapshot) [(CohortVariables, Cohort ())]
cohorts
let cohortBatches :: [[(CohortId, CohortSnapshot)]]
cohortBatches = Int
-> [(CohortId, CohortSnapshot)] -> [[(CohortId, CohortSnapshot)]]
forall e. Int -> [e] -> [[e]]
chunksOf (Refined NonNegative Int -> Int
forall {k} (p :: k) x. Refined p x -> x
unrefine (BatchSize -> Refined NonNegative Int
unBatchSize BatchSize
batchSize)) [(CohortId, CohortSnapshot)]
cohortSnapshots
[(BatchId, [(CohortId, CohortSnapshot)])]
-> IO [(BatchId, [(CohortId, CohortSnapshot)])]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([(BatchId, [(CohortId, CohortSnapshot)])]
-> IO [(BatchId, [(CohortId, CohortSnapshot)])])
-> [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO [(BatchId, [(CohortId, CohortSnapshot)])]
forall a b. (a -> b) -> a -> b
$ [BatchId]
-> [[(CohortId, CohortSnapshot)]]
-> [(BatchId, [(CohortId, CohortSnapshot)])]
forall a b. [a] -> [b] -> [(a, b)]
zip (Int -> BatchId
BatchId (Int -> BatchId) -> [Int] -> [BatchId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Int
1 ..]) [[(CohortId, CohortSnapshot)]]
cohortBatches
[(BatchExecutionDetails, Maybe PollDetailsError)]
batchesDetailsWithMaybeError <- [(BatchId, [(CohortId, CohortSnapshot)])]
-> ((BatchId, [(CohortId, CohortSnapshot)])
-> IO (BatchExecutionDetails, Maybe PollDetailsError))
-> IO [(BatchExecutionDetails, Maybe PollDetailsError)]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(BatchId, [(CohortId, CohortSnapshot)])]
cohortBatches (((BatchId, [(CohortId, CohortSnapshot)])
-> IO (BatchExecutionDetails, Maybe PollDetailsError))
-> IO [(BatchExecutionDetails, Maybe PollDetailsError)])
-> ((BatchId, [(CohortId, CohortSnapshot)])
-> IO (BatchExecutionDetails, Maybe PollDetailsError))
-> IO [(BatchExecutionDetails, Maybe PollDetailsError)]
forall a b. (a -> b) -> a -> b
$ \(BatchId
batchId, [(CohortId, CohortSnapshot)]
cohorts) -> do
(DiffTime
queryExecutionTime, Either QErr [(CohortId, ByteString)]
mxRes) <- forall (b :: BackendType) (m :: * -> *).
(BackendTransport b, MonadIO m, MonadBaseControl IO m) =>
SourceConfig b
-> MultiplexedQuery b
-> [(CohortId, CohortVariables)]
-> ResolvedConnectionTemplate b
-> m (DiffTime, Either QErr [(CohortId, ByteString)])
runDBSubscription @b SourceConfig b
sourceConfig MultiplexedQuery b
query (ASetter
[(CohortId, CohortSnapshot)]
[(CohortId, CohortVariables)]
CohortSnapshot
CohortVariables
-> (CohortSnapshot -> CohortVariables)
-> [(CohortId, CohortSnapshot)]
-> [(CohortId, CohortVariables)]
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
over (((CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables))
-> [(CohortId, CohortSnapshot)]
-> Identity [(CohortId, CohortVariables)]
forall s t a b. Each s t a b => Traversal s t a b
Traversal
[(CohortId, CohortSnapshot)]
[(CohortId, CohortVariables)]
(CohortId, CohortSnapshot)
(CohortId, CohortVariables)
each (((CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables))
-> [(CohortId, CohortSnapshot)]
-> Identity [(CohortId, CohortVariables)])
-> ((CohortSnapshot -> Identity CohortVariables)
-> (CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables))
-> ASetter
[(CohortId, CohortSnapshot)]
[(CohortId, CohortVariables)]
CohortSnapshot
CohortVariables
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortSnapshot -> Identity CohortVariables)
-> (CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables)
forall s t a b. Field2 s t a b => Lens s t a b
Lens
(CohortId, CohortSnapshot)
(CohortId, CohortVariables)
CohortSnapshot
CohortVariables
_2) CohortSnapshot -> CohortVariables
C._csVariables [(CohortId, CohortSnapshot)]
cohorts) ResolvedConnectionTemplate b
resolvedConnectionTemplate
let dbExecTimeMetric :: HistogramVector SubscriptionLabel
dbExecTimeMetric = SubscriptionMetrics -> HistogramVector SubscriptionLabel
submDBExecTotalTime (SubscriptionMetrics -> HistogramVector SubscriptionLabel)
-> SubscriptionMetrics -> HistogramVector SubscriptionLabel
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics (PrometheusMetrics -> SubscriptionMetrics)
-> PrometheusMetrics -> SubscriptionMetrics
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics
prometheusMetrics
IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> m ()
recordSubcriptionMetric
IO GranularPrometheusMetricsState
granularPrometheusMetricsState
Bool
True
HashMap (Maybe OperationName) Int
operationNamesMap
ParameterizedQueryHash
parameterizedQueryHash
SubscriptionKindLabel
liveQuerySubscriptionLabel
((SubscriptionLabel -> Double -> IO ())
-> Double -> SubscriptionLabel -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip (HistogramVector SubscriptionLabel
-> SubscriptionLabel -> Double -> IO ()
forall label.
Ord label =>
HistogramVector label -> label -> Double -> IO ()
HistogramVector.observe HistogramVector SubscriptionLabel
dbExecTimeMetric) (DiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac DiffTime
queryExecutionTime))
PollerResponseState
previousPollerResponseState <- TVar PollerResponseState -> IO PollerResponseState
forall a. TVar a -> IO a
STM.readTVarIO TVar PollerResponseState
pollerResponseState
Maybe PollDetailsError
maybeError <- case Either QErr [(CohortId, ByteString)]
mxRes of
Left QErr
err -> do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (PollerResponseState
previousPollerResponseState PollerResponseState -> PollerResponseState -> Bool
forall a. Eq a => a -> a -> Bool
== PollerResponseState
PRSSuccess) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Gauge -> IO ()
Prometheus.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ SubscriptionMetrics -> Gauge
submActiveLiveQueryPollersInError (SubscriptionMetrics -> Gauge) -> SubscriptionMetrics -> Gauge
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics PrometheusMetrics
prometheusMetrics
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar PollerResponseState -> PollerResponseState -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar PollerResponseState
pollerResponseState PollerResponseState
PRSError
let pollDetailsError :: PollDetailsError
pollDetailsError =
PollDetailsError
{ _pdeBatchId :: BatchId
_pdeBatchId = BatchId
batchId,
_pdeErrorDetails :: QErr
_pdeErrorDetails = QErr
err
}
Maybe PollDetailsError -> IO (Maybe PollDetailsError)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe PollDetailsError -> IO (Maybe PollDetailsError))
-> Maybe PollDetailsError -> IO (Maybe PollDetailsError)
forall a b. (a -> b) -> a -> b
$ PollDetailsError -> Maybe PollDetailsError
forall a. a -> Maybe a
Just PollDetailsError
pollDetailsError
Right [(CohortId, ByteString)]
_ -> do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (PollerResponseState
previousPollerResponseState PollerResponseState -> PollerResponseState -> Bool
forall a. Eq a => a -> a -> Bool
== PollerResponseState
PRSError) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Gauge -> IO ()
Prometheus.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ SubscriptionMetrics -> Gauge
submActiveLiveQueryPollersInError (SubscriptionMetrics -> Gauge) -> SubscriptionMetrics -> Gauge
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics PrometheusMetrics
prometheusMetrics
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar PollerResponseState -> PollerResponseState -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar PollerResponseState
pollerResponseState PollerResponseState
PRSSuccess
Maybe PollDetailsError -> IO (Maybe PollDetailsError)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe PollDetailsError
forall a. Maybe a
Nothing
let lqMeta :: SubscriptionMetadata
lqMeta = DiffTime -> SubscriptionMetadata
SubscriptionMetadata (DiffTime -> SubscriptionMetadata)
-> DiffTime -> SubscriptionMetadata
forall a b. (a -> b) -> a -> b
$ DiffTime -> DiffTime
forall x y. (Duration x, Duration y) => x -> y
convertDuration DiffTime
queryExecutionTime
operations :: [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)]
operations = [(CohortId, CohortSnapshot)]
-> Either QErr [(CohortId, ByteString)]
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)]
getCohortOperations [(CohortId, CohortSnapshot)]
cohorts Either QErr [(CohortId, ByteString)]
mxRes
batchResponseSize :: Maybe Int
batchResponseSize =
case Either QErr [(CohortId, ByteString)]
mxRes of
Left QErr
_ -> Maybe Int
forall a. Maybe a
Nothing
Right [(CohortId, ByteString)]
resp -> Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$ Sum Int -> Int
forall a. Sum a -> a
getSum (Sum Int -> Int) -> Sum Int -> Int
forall a b. (a -> b) -> a -> b
$ ((CohortId, ByteString) -> Sum Int)
-> [(CohortId, ByteString)] -> Sum Int
forall m a. Monoid m => (a -> m) -> [a] -> m
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap (Int -> Sum Int
forall a. a -> Sum a
Sum (Int -> Sum Int)
-> ((CohortId, ByteString) -> Int)
-> (CohortId, ByteString)
-> Sum Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int
BS.length (ByteString -> Int)
-> ((CohortId, ByteString) -> ByteString)
-> (CohortId, ByteString)
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortId, ByteString) -> ByteString
forall a b. (a, b) -> b
snd) [(CohortId, ByteString)]
resp
(DiffTime
pushTime, [CohortExecutionDetails]
cohortsExecutionDetails) <- IO [CohortExecutionDetails]
-> IO (DiffTime, [CohortExecutionDetails])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime
(IO [CohortExecutionDetails]
-> IO (DiffTime, [CohortExecutionDetails]))
-> IO [CohortExecutionDetails]
-> IO (DiffTime, [CohortExecutionDetails])
forall a b. (a -> b) -> a -> b
$ [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)]
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)
-> IO CohortExecutionDetails)
-> IO [CohortExecutionDetails]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)]
operations
(((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)
-> IO CohortExecutionDetails)
-> IO [CohortExecutionDetails])
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)
-> IO CohortExecutionDetails)
-> IO [CohortExecutionDetails]
forall a b. (a -> b) -> a -> b
$ \(GQResult ByteString
res, CohortId
cohortId, Maybe (ResponseHash, Int)
respData, CohortSnapshot
snapshot) -> do
([SubscriberExecutionDetails]
pushedSubscribers, [SubscriberExecutionDetails]
ignoredSubscribers) <-
GQResult ByteString
-> Maybe ResponseHash
-> SubscriptionMetadata
-> CohortSnapshot 'LiveQuery
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort GQResult ByteString
res ((ResponseHash, Int) -> ResponseHash
forall a b. (a, b) -> a
fst ((ResponseHash, Int) -> ResponseHash)
-> Maybe (ResponseHash, Int) -> Maybe ResponseHash
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData) SubscriptionMetadata
lqMeta CohortSnapshot
CohortSnapshot 'LiveQuery
snapshot
CohortExecutionDetails -> IO CohortExecutionDetails
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
CohortExecutionDetails
{ _cedCohortId :: CohortId
_cedCohortId = CohortId
cohortId,
_cedVariables :: CohortVariables
_cedVariables = CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot,
_cedPushedTo :: [SubscriberExecutionDetails]
_cedPushedTo = [SubscriberExecutionDetails]
pushedSubscribers,
_cedIgnored :: [SubscriberExecutionDetails]
_cedIgnored = [SubscriberExecutionDetails]
ignoredSubscribers,
_cedResponseSize :: Maybe Int
_cedResponseSize = (ResponseHash, Int) -> Int
forall a b. (a, b) -> b
snd ((ResponseHash, Int) -> Int)
-> Maybe (ResponseHash, Int) -> Maybe Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData,
_cedBatchId :: BatchId
_cedBatchId = BatchId
batchId
}
let pgExecutionTime :: Maybe DiffTime
pgExecutionTime = case BackendTag b -> BackendType
forall (b :: BackendType). BackendTag b -> BackendType
reify (forall (b :: BackendType). HasTag b => BackendTag b
backendTag @b) of
Postgres PostgresKind
Vanilla -> DiffTime -> Maybe DiffTime
forall a. a -> Maybe a
Just DiffTime
queryExecutionTime
BackendType
_ -> Maybe DiffTime
forall a. Maybe a
Nothing
batchExecDetails :: BatchExecutionDetails
batchExecDetails =
Maybe DiffTime
-> DiffTime
-> DiffTime
-> BatchId
-> [CohortExecutionDetails]
-> Maybe Int
-> BatchExecutionDetails
BatchExecutionDetails
Maybe DiffTime
pgExecutionTime
DiffTime
queryExecutionTime
DiffTime
pushTime
BatchId
batchId
[CohortExecutionDetails]
cohortsExecutionDetails
Maybe Int
batchResponseSize
(BatchExecutionDetails, Maybe PollDetailsError)
-> IO (BatchExecutionDetails, Maybe PollDetailsError)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((BatchExecutionDetails, Maybe PollDetailsError)
-> IO (BatchExecutionDetails, Maybe PollDetailsError))
-> (BatchExecutionDetails, Maybe PollDetailsError)
-> IO (BatchExecutionDetails, Maybe PollDetailsError)
forall a b. (a -> b) -> a -> b
$ (BatchExecutionDetails
batchExecDetails, Maybe PollDetailsError
maybeError)
(DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError]))
-> IO
(DiffTime, ([BatchExecutionDetails], [Maybe PollDetailsError]))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DiffTime
snapshotTime, [(BatchExecutionDetails, Maybe PollDetailsError)]
-> ([BatchExecutionDetails], [Maybe PollDetailsError])
forall a b. [(a, b)] -> ([a], [b])
unzip [(BatchExecutionDetails, Maybe PollDetailsError)]
batchesDetailsWithMaybeError)
let initPollDetails :: PollDetails
initPollDetails =
PollDetails
{ _pdPollerId :: PollerId
_pdPollerId = PollerId
pollerId,
_pdKind :: SubscriptionType
_pdKind = SubscriptionType
LiveQuery,
_pdGeneratedSql :: Text
_pdGeneratedSql = MultiplexedQuery b -> Text
forall a. ToTxt a => a -> Text
toTxt MultiplexedQuery b
query,
_pdSnapshotTime :: DiffTime
_pdSnapshotTime = DiffTime
snapshotTime,
_pdBatches :: [BatchExecutionDetails]
_pdBatches = [BatchExecutionDetails]
batchesDetails,
_pdLiveQueryOptions :: SubscriptionsOptions
_pdLiveQueryOptions = SubscriptionsOptions
lqOpts,
_pdTotalTime :: DiffTime
_pdTotalTime = DiffTime
totalTime,
_pdSource :: SourceName
_pdSource = SourceName
sourceName,
_pdRole :: RoleName
_pdRole = RoleName
roleName,
_pdParameterizedQueryHash :: ParameterizedQueryHash
_pdParameterizedQueryHash = ParameterizedQueryHash
parameterizedQueryHash,
_pdLogLevel :: LogLevel
_pdLogLevel = LogLevel
LevelInfo,
_pdErrors :: Maybe [PollDetailsError]
_pdErrors = Maybe [PollDetailsError]
forall a. Maybe a
Nothing
}
maybePollDetailsErrors :: Maybe [PollDetailsError]
maybePollDetailsErrors = [Maybe PollDetailsError] -> Maybe [PollDetailsError]
forall (t :: * -> *) (f :: * -> *) a.
(Traversable t, Applicative f) =>
t (f a) -> f (t a)
forall (f :: * -> *) a. Applicative f => [f a] -> f [a]
sequenceA [Maybe PollDetailsError]
maybeErrors
pollDetails :: PollDetails
pollDetails = case Maybe [PollDetailsError]
maybePollDetailsErrors of
Maybe [PollDetailsError]
Nothing -> PollDetails
initPollDetails
Just [PollDetailsError]
pollDetailsErrors ->
PollDetails
initPollDetails
{ _pdLogLevel :: LogLevel
_pdLogLevel = LogLevel
LevelError,
_pdErrors :: Maybe [PollDetailsError]
_pdErrors = [PollDetailsError] -> Maybe [PollDetailsError]
forall a. a -> Maybe a
Just [PollDetailsError]
pollDetailsErrors
}
SubscriptionPostPollHook
postPollHook PollDetails
pollDetails
let totalTimeMetric :: HistogramVector SubscriptionLabel
totalTimeMetric = SubscriptionMetrics -> HistogramVector SubscriptionLabel
submTotalTime (SubscriptionMetrics -> HistogramVector SubscriptionLabel)
-> SubscriptionMetrics -> HistogramVector SubscriptionLabel
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics (PrometheusMetrics -> SubscriptionMetrics)
-> PrometheusMetrics -> SubscriptionMetrics
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics
prometheusMetrics
IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> m ()
recordSubcriptionMetric
IO GranularPrometheusMetricsState
granularPrometheusMetricsState
Bool
True
HashMap (Maybe OperationName) Int
operationNamesMap
ParameterizedQueryHash
parameterizedQueryHash
SubscriptionKindLabel
liveQuerySubscriptionLabel
((SubscriptionLabel -> Double -> IO ())
-> Double -> SubscriptionLabel -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip (HistogramVector SubscriptionLabel
-> SubscriptionLabel -> Double -> IO ()
forall label.
Ord label =>
HistogramVector label -> label -> Double -> IO ()
HistogramVector.observe HistogramVector SubscriptionLabel
totalTimeMetric) (DiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac DiffTime
totalTime))
where
SubscriptionsOptions BatchSize
batchSize RefetchInterval
_ = SubscriptionsOptions
lqOpts
getCohortSnapshot :: (CohortVariables, Cohort ()) -> STM (CohortId, CohortSnapshot)
getCohortSnapshot (CohortVariables
cohortVars, Cohort ()
handlerC) = do
let C.Cohort CohortId
resId TVar (Maybe ResponseHash)
respRef SubscriberMap
curOpsTV SubscriberMap
newOpsTV () = Cohort ()
handlerC
[(SubscriberId, Subscriber)]
curOpsL <- SubscriberMap -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList SubscriberMap
curOpsTV
[(SubscriberId, Subscriber)]
newOpsL <- SubscriberMap -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList SubscriberMap
newOpsTV
[(SubscriberId, Subscriber)]
-> ((SubscriberId, Subscriber) -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(SubscriberId, Subscriber)]
newOpsL (((SubscriberId, Subscriber) -> STM ()) -> STM ())
-> ((SubscriberId, Subscriber) -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \(SubscriberId
k, Subscriber
action) -> Subscriber -> SubscriberId -> SubscriberMap -> STM ()
forall k v. Hashable k => v -> k -> TMap k v -> STM ()
TMap.insert Subscriber
action SubscriberId
k SubscriberMap
curOpsTV
SubscriberMap -> STM ()
forall k v. TMap k v -> STM ()
TMap.reset SubscriberMap
newOpsTV
let cohortSnapshot :: CohortSnapshot
cohortSnapshot = CohortVariables
-> TVar (Maybe ResponseHash)
-> [Subscriber]
-> [Subscriber]
-> CohortSnapshot
C.CohortSnapshot CohortVariables
cohortVars TVar (Maybe ResponseHash)
respRef (((SubscriberId, Subscriber) -> Subscriber)
-> [(SubscriberId, Subscriber)] -> [Subscriber]
forall a b. (a -> b) -> [a] -> [b]
map (SubscriberId, Subscriber) -> Subscriber
forall a b. (a, b) -> b
snd [(SubscriberId, Subscriber)]
curOpsL) (((SubscriberId, Subscriber) -> Subscriber)
-> [(SubscriberId, Subscriber)] -> [Subscriber]
forall a b. (a -> b) -> [a] -> [b]
map (SubscriberId, Subscriber) -> Subscriber
forall a b. (a, b) -> b
snd [(SubscriberId, Subscriber)]
newOpsL)
(CohortId, CohortSnapshot) -> STM (CohortId, CohortSnapshot)
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (CohortId
resId, CohortSnapshot
cohortSnapshot)
getCohortOperations :: [(CohortId, CohortSnapshot)]
-> Either QErr [(CohortId, ByteString)]
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)]
getCohortOperations [(CohortId, CohortSnapshot)]
cohorts = \case
Left QErr
e ->
let resp :: GQResult ByteString
resp = GQExecError -> GQResult ByteString
forall a. GQExecError -> Either GQExecError a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (GQExecError -> GQResult ByteString)
-> GQExecError -> GQResult ByteString
forall a b. (a -> b) -> a -> b
$ [Encoding] -> GQExecError
GQExecError [Bool -> QErr -> Encoding
encodeGQLErr Bool
False QErr
e]
in [(GQResult ByteString
resp, CohortId
cohortId, Maybe (ResponseHash, Int)
forall a. Maybe a
Nothing, CohortSnapshot
snapshot) | (CohortId
cohortId, CohortSnapshot
snapshot) <- [(CohortId, CohortSnapshot)]
cohorts]
Right [(CohortId, ByteString)]
responses -> do
let cohortSnapshotMap :: HashMap CohortId CohortSnapshot
cohortSnapshotMap = [(CohortId, CohortSnapshot)] -> HashMap CohortId CohortSnapshot
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HashMap.fromList [(CohortId, CohortSnapshot)]
cohorts
(((CohortId, ByteString)
-> Maybe
(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot))
-> [(CohortId, ByteString)]
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)])
-> [(CohortId, ByteString)]
-> ((CohortId, ByteString)
-> Maybe
(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot))
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((CohortId, ByteString)
-> Maybe
(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot))
-> [(CohortId, ByteString)]
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)]
forall a b. (a -> Maybe b) -> [a] -> [b]
forall (f :: * -> *) a b.
Filterable f =>
(a -> Maybe b) -> f a -> f b
mapMaybe [(CohortId, ByteString)]
responses (((CohortId, ByteString)
-> Maybe
(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot))
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)])
-> ((CohortId, ByteString)
-> Maybe
(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot))
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)]
forall a b. (a -> b) -> a -> b
$ \(CohortId
cohortId, ByteString
respBS) ->
let respHash :: ResponseHash
respHash = ByteString -> ResponseHash
mkRespHash ByteString
respBS
respSize :: Int
respSize = ByteString -> Int
BS.length ByteString
respBS
in
(ByteString -> GQResult ByteString
forall a. a -> Either GQExecError a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (Endo Value) -> ByteString -> ByteString
applyModifier Maybe (Endo Value)
modifier ByteString
respBS),CohortId
cohortId,(ResponseHash, Int) -> Maybe (ResponseHash, Int)
forall a. a -> Maybe a
Just (ResponseHash
respHash, Int
respSize),)
(CohortSnapshot
-> (GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot))
-> Maybe CohortSnapshot
-> Maybe
(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> CohortId -> HashMap CohortId CohortSnapshot -> Maybe CohortSnapshot
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup CohortId
cohortId HashMap CohortId CohortSnapshot
cohortSnapshotMap