{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}
module Hasura.GraphQL.Execute.Subscription.Poll.StreamingQuery
(
pollStreamingQuery,
)
where
import Control.Concurrent.Async qualified as A
import Control.Concurrent.STM qualified as STM
import Control.Lens
import Data.Aeson.Ordered qualified as JO
import Data.ByteString qualified as BS
import Data.HashMap.Strict.Extended qualified as HashMap
import Data.HashSet qualified as Set
import Data.List.Split (chunksOf)
import Data.Monoid (Endo (..), Sum (..))
import Data.Text.Extended
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Execute.Subscription.Plan qualified as C
import Hasura.GraphQL.Execute.Subscription.Poll.Common hiding (Cohort (..), CohortMap, CohortSnapshot (..))
import Hasura.GraphQL.Execute.Subscription.Poll.Common qualified as C
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.Execute.Subscription.Types
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Logging (LogLevel (..))
import Hasura.Prelude
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.BackendTag (backendTag, reify)
import Hasura.RQL.Types.BackendType (BackendType (..), PostgresKind (Vanilla))
import Hasura.RQL.Types.Common (SourceName)
import Hasura.RQL.Types.Roles (RoleName)
import Hasura.RQL.Types.Subscription (SubscriptionType (..))
import Hasura.SQL.Value (TxtEncodedVal (..))
import Hasura.Server.Prometheus (PrometheusMetrics (..), SubscriptionMetrics (..), recordSubcriptionMetric, streamingSubscriptionLabel)
import Hasura.Server.Types (GranularPrometheusMetricsState (..))
import Language.GraphQL.Draft.Syntax qualified as G
import Refined (unrefine)
import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
import System.Metrics.Prometheus.HistogramVector qualified as HistogramVector
import Text.Shakespeare.Text (st)
mergeOldAndNewCursorValues :: CursorVariableValues -> CursorVariableValues -> CursorVariableValues
mergeOldAndNewCursorValues :: CursorVariableValues
-> CursorVariableValues -> CursorVariableValues
mergeOldAndNewCursorValues (CursorVariableValues HashMap Name TxtEncodedVal
prevPollCursorValues) (CursorVariableValues HashMap Name TxtEncodedVal
currentPollCursorValues) =
let combineFn :: TxtEncodedVal -> TxtEncodedVal -> TxtEncodedVal
combineFn TxtEncodedVal
previousVal TxtEncodedVal
currentVal =
case TxtEncodedVal
currentVal of
TxtEncodedVal
TENull -> TxtEncodedVal
previousVal
TELit Text
t -> Text -> TxtEncodedVal
TELit Text
t
in HashMap Name TxtEncodedVal -> CursorVariableValues
CursorVariableValues (HashMap Name TxtEncodedVal -> CursorVariableValues)
-> HashMap Name TxtEncodedVal -> CursorVariableValues
forall a b. (a -> b) -> a -> b
$ (TxtEncodedVal -> TxtEncodedVal -> TxtEncodedVal)
-> HashMap Name TxtEncodedVal
-> HashMap Name TxtEncodedVal
-> HashMap Name TxtEncodedVal
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> HashMap k v -> HashMap k v -> HashMap k v
HashMap.unionWith TxtEncodedVal -> TxtEncodedVal -> TxtEncodedVal
combineFn HashMap Name TxtEncodedVal
prevPollCursorValues HashMap Name TxtEncodedVal
currentPollCursorValues
pushResultToCohort ::
GQResult BS.ByteString ->
Maybe ResponseHash ->
SubscriptionMetadata ->
CursorVariableValues ->
G.Name ->
(CohortSnapshot 'Streaming, Cohort 'Streaming) ->
IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort :: GQResult ByteString
-> Maybe ResponseHash
-> SubscriptionMetadata
-> CursorVariableValues
-> Name
-> (CohortSnapshot 'Streaming, Cohort 'Streaming)
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort GQResult ByteString
result !Maybe ResponseHash
respHashM (SubscriptionMetadata DiffTime
dTime) CursorVariableValues
cursorValues Name
rootFieldName (CohortSnapshot 'Streaming
cohortSnapshot, Cohort 'Streaming
cohort) = do
Maybe ResponseHash
prevRespHashM <- TVar (Maybe ResponseHash) -> IO (Maybe ResponseHash)
forall a. TVar a -> IO a
STM.readTVarIO TVar (Maybe ResponseHash)
respRef
([Subscriber]
subscribersToPush, [Subscriber]
subscribersToIgnore) <-
if GQResult ByteString -> Bool
forall a. GQResult a -> Bool
isExecError GQResult ByteString
result Bool -> Bool -> Bool
|| Maybe ResponseHash
respHashM Maybe ResponseHash -> Maybe ResponseHash -> Bool
forall a. Eq a => a -> a -> Bool
/= Maybe ResponseHash
prevRespHashM
then do
$String
String -> Maybe ResponseHash -> IO ()
forall a. String -> a -> IO ()
assertNFHere Maybe ResponseHash
respHashM
STM ([Subscriber], [Subscriber]) -> IO ([Subscriber], [Subscriber])
forall a. STM a -> IO a
STM.atomically (STM ([Subscriber], [Subscriber])
-> IO ([Subscriber], [Subscriber]))
-> STM ([Subscriber], [Subscriber])
-> IO ([Subscriber], [Subscriber])
forall a b. (a -> b) -> a -> b
$ do
TVar (Maybe ResponseHash) -> Maybe ResponseHash -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar (Maybe ResponseHash)
respRef Maybe ResponseHash
respHashM
TVar CursorVariableValues -> CursorVariableValues -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar (Cohort (TVar CursorVariableValues) -> TVar CursorVariableValues
forall streamCursorVars.
Cohort streamCursorVars -> streamCursorVars
C._cStreamCursorVariables Cohort (TVar CursorVariableValues)
Cohort 'Streaming
cohort) CursorVariableValues
cursorValues
([Subscriber], [Subscriber]) -> STM ([Subscriber], [Subscriber])
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ([Subscriber]
newSinks [Subscriber] -> [Subscriber] -> [Subscriber]
forall a. Semigroup a => a -> a -> a
<> [Subscriber]
curSinks, [Subscriber]
forall a. Monoid a => a
mempty)
else
([Subscriber], [Subscriber]) -> IO ([Subscriber], [Subscriber])
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([Subscriber]
newSinks, [Subscriber]
curSinks)
[Subscriber] -> IO ()
pushResultToSubscribers [Subscriber]
subscribersToPush
([SubscriberExecutionDetails], [SubscriberExecutionDetails])
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
(([SubscriberExecutionDetails], [SubscriberExecutionDetails])
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails]))
-> ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall a b. (a -> b) -> a -> b
$ ASetter
([Subscriber], [Subscriber])
([SubscriberExecutionDetails], [SubscriberExecutionDetails])
Subscriber
SubscriberExecutionDetails
-> (Subscriber -> SubscriberExecutionDetails)
-> ([Subscriber], [Subscriber])
-> ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
over
(([Subscriber] -> Identity [SubscriberExecutionDetails])
-> ([Subscriber], [Subscriber])
-> Identity
([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall s t a b. Each s t a b => Traversal s t a b
Traversal
([Subscriber], [Subscriber])
([SubscriberExecutionDetails], [SubscriberExecutionDetails])
[Subscriber]
[SubscriberExecutionDetails]
each (([Subscriber] -> Identity [SubscriberExecutionDetails])
-> ([Subscriber], [Subscriber])
-> Identity
([SubscriberExecutionDetails], [SubscriberExecutionDetails]))
-> ((Subscriber -> Identity SubscriberExecutionDetails)
-> [Subscriber] -> Identity [SubscriberExecutionDetails])
-> ASetter
([Subscriber], [Subscriber])
([SubscriberExecutionDetails], [SubscriberExecutionDetails])
Subscriber
SubscriberExecutionDetails
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Subscriber -> Identity SubscriberExecutionDetails)
-> [Subscriber] -> Identity [SubscriberExecutionDetails]
forall s t a b. Each s t a b => Traversal s t a b
Traversal
[Subscriber]
[SubscriberExecutionDetails]
Subscriber
SubscriberExecutionDetails
each)
( \Subscriber {Maybe OperationName
RequestId
SubscriberMetadata
SubscriberId
OnChange
_sId :: SubscriberId
_sMetadata :: SubscriberMetadata
_sRequestId :: RequestId
_sOperationName :: Maybe OperationName
_sOnChangeCallback :: OnChange
_sId :: Subscriber -> SubscriberId
_sMetadata :: Subscriber -> SubscriberMetadata
_sRequestId :: Subscriber -> RequestId
_sOperationName :: Subscriber -> Maybe OperationName
_sOnChangeCallback :: Subscriber -> OnChange
..} ->
SubscriberId -> SubscriberMetadata -> SubscriberExecutionDetails
SubscriberExecutionDetails SubscriberId
_sId SubscriberMetadata
_sMetadata
)
([Subscriber]
subscribersToPush, [Subscriber]
subscribersToIgnore)
where
rootFieldNameText :: Text
rootFieldNameText = Name -> Text
G.unName Name
rootFieldName
emptyRespBS :: GQResult ByteString
emptyRespBS = ByteString -> GQResult ByteString
forall a b. b -> Either a b
Right (ByteString -> GQResult ByteString)
-> ByteString -> GQResult ByteString
forall a b. (a -> b) -> a -> b
$ Text -> ByteString
txtToBs [st|{"#{rootFieldNameText}" : []}|]
roachEmptyRespBS :: GQResult ByteString
roachEmptyRespBS = ByteString -> GQResult ByteString
forall a b. b -> Either a b
Right (ByteString -> GQResult ByteString)
-> ByteString -> GQResult ByteString
forall a b. (a -> b) -> a -> b
$ ByteString
"\x1" ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> Text -> ByteString
txtToBs [st|{"#{rootFieldNameText}": []}|]
isResponseEmpty :: Bool
isResponseEmpty =
GQResult ByteString
result GQResult ByteString -> GQResult ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== GQResult ByteString
emptyRespBS Bool -> Bool -> Bool
|| GQResult ByteString
result GQResult ByteString -> GQResult ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== GQResult ByteString
roachEmptyRespBS
C.CohortSnapshot CohortVariables
_ TVar (Maybe ResponseHash)
respRef [Subscriber]
curSinks [Subscriber]
newSinks = CohortSnapshot 'Streaming
cohortSnapshot
response :: Either GQExecError SubscriptionResponse
response = GQResult ByteString
result GQResult ByteString
-> (ByteString -> SubscriptionResponse)
-> Either GQExecError SubscriptionResponse
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \ByteString
payload -> ByteString -> DiffTime -> SubscriptionResponse
SubscriptionResponse ByteString
payload DiffTime
dTime
pushResultToSubscribers :: [Subscriber] -> IO ()
pushResultToSubscribers [Subscriber]
subscribers =
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
isResponseEmpty
(IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ ((Subscriber -> IO ()) -> [Subscriber] -> IO ())
-> [Subscriber] -> (Subscriber -> IO ()) -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip (Subscriber -> IO ()) -> [Subscriber] -> IO ()
forall (f :: * -> *) a b. Foldable f => (a -> IO b) -> f a -> IO ()
A.mapConcurrently_ [Subscriber]
subscribers
((Subscriber -> IO ()) -> IO ()) -> (Subscriber -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Subscriber {Maybe OperationName
RequestId
SubscriberMetadata
SubscriberId
OnChange
_sId :: Subscriber -> SubscriberId
_sMetadata :: Subscriber -> SubscriberMetadata
_sRequestId :: Subscriber -> RequestId
_sOperationName :: Subscriber -> Maybe OperationName
_sOnChangeCallback :: Subscriber -> OnChange
_sId :: SubscriberId
_sMetadata :: SubscriberMetadata
_sRequestId :: RequestId
_sOperationName :: Maybe OperationName
_sOnChangeCallback :: OnChange
..} -> OnChange
_sOnChangeCallback Either GQExecError SubscriptionResponse
response
pollStreamingQuery ::
forall b.
(BackendTransport b) =>
PollerId ->
STM.TVar PollerResponseState ->
SubscriptionsOptions ->
(SourceName, SourceConfig b) ->
RoleName ->
ParameterizedQueryHash ->
MultiplexedQuery b ->
CohortMap 'Streaming ->
G.Name ->
SubscriptionPostPollHook ->
Maybe (IO ()) ->
PrometheusMetrics ->
IO GranularPrometheusMetricsState ->
TMap.TMap (Maybe OperationName) Int ->
ResolvedConnectionTemplate b ->
Maybe (Endo JO.Value) ->
IO ()
pollStreamingQuery :: forall (b :: BackendType).
BackendTransport b =>
PollerId
-> TVar PollerResponseState
-> SubscriptionsOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap 'Streaming
-> Name
-> SubscriptionPostPollHook
-> Maybe (IO ())
-> PrometheusMetrics
-> IO GranularPrometheusMetricsState
-> TMap (Maybe OperationName) Int
-> ResolvedConnectionTemplate b
-> Maybe (Endo Value)
-> IO ()
pollStreamingQuery PollerId
pollerId TVar PollerResponseState
pollerResponseState SubscriptionsOptions
streamingQueryOpts (SourceName
sourceName, SourceConfig b
sourceConfig) RoleName
roleName ParameterizedQueryHash
parameterizedQueryHash MultiplexedQuery b
query CohortMap 'Streaming
cohortMap Name
rootFieldName SubscriptionPostPollHook
postPollHook Maybe (IO ())
testActionMaybe PrometheusMetrics
prometheusMetrics IO GranularPrometheusMetricsState
granularPrometheusMetricsState TMap (Maybe OperationName) Int
operationNames' ResolvedConnectionTemplate b
resolvedConnectionTemplate Maybe (Endo Value)
modifier = do
HashMap (Maybe OperationName) Int
operationNames <- STM (HashMap (Maybe OperationName) Int)
-> IO (HashMap (Maybe OperationName) Int)
forall a. STM a -> IO a
STM.atomically (STM (HashMap (Maybe OperationName) Int)
-> IO (HashMap (Maybe OperationName) Int))
-> STM (HashMap (Maybe OperationName) Int)
-> IO (HashMap (Maybe OperationName) Int)
forall a b. (a -> b) -> a -> b
$ TMap (Maybe OperationName) Int
-> STM (HashMap (Maybe OperationName) Int)
forall k v. TMap k v -> STM (HashMap k v)
TMap.getMap TMap (Maybe OperationName) Int
operationNames'
(DiffTime
totalTime, (DiffTime
snapshotTime, ([BatchExecutionDetails]
batchesDetails, [[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]]
processedCohorts, [Maybe PollDetailsError]
maybeErrors))) <- IO
(DiffTime,
([BatchExecutionDetails],
[[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]],
[Maybe PollDetailsError]))
-> IO
(DiffTime,
(DiffTime,
([BatchExecutionDetails],
[[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]],
[Maybe PollDetailsError])))
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO
(DiffTime,
([BatchExecutionDetails],
[[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]],
[Maybe PollDetailsError]))
-> IO
(DiffTime,
(DiffTime,
([BatchExecutionDetails],
[[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]],
[Maybe PollDetailsError]))))
-> IO
(DiffTime,
([BatchExecutionDetails],
[[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]],
[Maybe PollDetailsError]))
-> IO
(DiffTime,
(DiffTime,
([BatchExecutionDetails],
[[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]],
[Maybe PollDetailsError])))
forall a b. (a -> b) -> a -> b
$ do
(DiffTime
snapshotTime, [(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
cohortBatches) <- IO
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
(DiffTime,
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
(DiffTime,
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]))
-> IO
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
(DiffTime,
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])])
forall a b. (a -> b) -> a -> b
$ do
[(CohortVariables, Cohort (TVar CursorVariableValues))]
cohorts <- STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> IO [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall a. STM a -> IO a
STM.atomically (STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> IO [(CohortVariables, Cohort (TVar CursorVariableValues))])
-> STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> IO [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall a b. (a -> b) -> a -> b
$ TMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap CohortVariables (Cohort (TVar CursorVariableValues))
CohortMap 'Streaming
cohortMap
[(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohortSnapshots <- ((CohortVariables, Cohort (TVar CursorVariableValues))
-> IO
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> IO
[(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (STM
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> IO
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
forall a. STM a -> IO a
STM.atomically (STM
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> IO
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> ((CohortVariables, Cohort (TVar CursorVariableValues))
-> STM
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> (CohortVariables, Cohort (TVar CursorVariableValues))
-> IO
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortVariables, Cohort (TVar CursorVariableValues))
-> STM
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
forall {streamCursorVars}.
(CohortVariables, Cohort streamCursorVars)
-> STM (CohortId, (CohortSnapshot, Cohort streamCursorVars))
getCohortSnapshot) [(CohortVariables, Cohort (TVar CursorVariableValues))]
cohorts
let cohortBatches :: [[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
cohortBatches = Int
-> [(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> [[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
forall e. Int -> [e] -> [[e]]
chunksOf (Refined NonNegative Int -> Int
forall {k} (p :: k) x. Refined p x -> x
unrefine (BatchSize -> Refined NonNegative Int
unBatchSize BatchSize
batchSize)) [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohortSnapshots
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])])
-> [(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
forall a b. (a -> b) -> a -> b
$ [BatchId]
-> [[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
-> [(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
forall a b. [a] -> [b] -> [(a, b)]
zip (Int -> BatchId
BatchId (Int -> BatchId) -> [Int] -> [BatchId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Int
1 ..]) [[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
cohortBatches
Maybe (IO ()) -> (IO () -> IO ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe (IO ())
testActionMaybe IO () -> IO ()
forall a. a -> a
id
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))],
Maybe PollDetailsError)]
batchesDetailsAndProcessedCohortsWithMaybeError <- [(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> ((BatchId,
[(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))])
-> IO
(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))],
Maybe PollDetailsError))
-> IO
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))],
Maybe PollDetailsError)]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
cohortBatches (((BatchId,
[(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))])
-> IO
(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))],
Maybe PollDetailsError))
-> IO
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))],
Maybe PollDetailsError)])
-> ((BatchId,
[(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))])
-> IO
(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))],
Maybe PollDetailsError))
-> IO
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))],
Maybe PollDetailsError)]
forall a b. (a -> b) -> a -> b
$ \(BatchId
batchId, [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohorts) -> do
(DiffTime
queryExecutionTime, Either QErr [(CohortId, ByteString, CursorVariableValues)]
mxRes) <-
forall (b :: BackendType) (m :: * -> *).
(BackendTransport b, MonadIO m, MonadBaseControl IO m) =>
SourceConfig b
-> MultiplexedQuery b
-> [(CohortId, CohortVariables)]
-> ResolvedConnectionTemplate b
-> m (DiffTime,
Either QErr [(CohortId, ByteString, CursorVariableValues)])
runDBStreamingSubscription @b
SourceConfig b
sourceConfig
MultiplexedQuery b
query
(ASetter
[(CohortId, CohortSnapshot)]
[(CohortId, CohortVariables)]
CohortSnapshot
CohortVariables
-> (CohortSnapshot -> CohortVariables)
-> [(CohortId, CohortSnapshot)]
-> [(CohortId, CohortVariables)]
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
over (((CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables))
-> [(CohortId, CohortSnapshot)]
-> Identity [(CohortId, CohortVariables)]
forall s t a b. Each s t a b => Traversal s t a b
Traversal
[(CohortId, CohortSnapshot)]
[(CohortId, CohortVariables)]
(CohortId, CohortSnapshot)
(CohortId, CohortVariables)
each (((CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables))
-> [(CohortId, CohortSnapshot)]
-> Identity [(CohortId, CohortVariables)])
-> ((CohortSnapshot -> Identity CohortVariables)
-> (CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables))
-> ASetter
[(CohortId, CohortSnapshot)]
[(CohortId, CohortVariables)]
CohortSnapshot
CohortVariables
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortSnapshot -> Identity CohortVariables)
-> (CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables)
forall s t a b. Field2 s t a b => Lens s t a b
Lens
(CohortId, CohortSnapshot)
(CohortId, CohortVariables)
CohortSnapshot
CohortVariables
_2) CohortSnapshot -> CohortVariables
C._csVariables ([(CohortId, CohortSnapshot)] -> [(CohortId, CohortVariables)])
-> [(CohortId, CohortSnapshot)] -> [(CohortId, CohortVariables)]
forall a b. (a -> b) -> a -> b
$ ((CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> (CohortId, CohortSnapshot))
-> [(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> [(CohortId, CohortSnapshot)]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (((CohortSnapshot, Cohort (TVar CursorVariableValues))
-> CohortSnapshot)
-> (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> (CohortId, CohortSnapshot)
forall a b. (a -> b) -> (CohortId, a) -> (CohortId, b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (CohortSnapshot, Cohort (TVar CursorVariableValues))
-> CohortSnapshot
forall a b. (a, b) -> a
fst) [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohorts)
ResolvedConnectionTemplate b
resolvedConnectionTemplate
let dbExecTimeMetric :: HistogramVector SubscriptionLabel
dbExecTimeMetric = SubscriptionMetrics -> HistogramVector SubscriptionLabel
submDBExecTotalTime (SubscriptionMetrics -> HistogramVector SubscriptionLabel)
-> SubscriptionMetrics -> HistogramVector SubscriptionLabel
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics (PrometheusMetrics -> SubscriptionMetrics)
-> PrometheusMetrics -> SubscriptionMetrics
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics
prometheusMetrics
IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> m ()
recordSubcriptionMetric
IO GranularPrometheusMetricsState
granularPrometheusMetricsState
Bool
True
HashMap (Maybe OperationName) Int
operationNames
ParameterizedQueryHash
parameterizedQueryHash
SubscriptionKindLabel
streamingSubscriptionLabel
((SubscriptionLabel -> Double -> IO ())
-> Double -> SubscriptionLabel -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip (HistogramVector SubscriptionLabel
-> SubscriptionLabel -> Double -> IO ()
forall label.
Ord label =>
HistogramVector label -> label -> Double -> IO ()
HistogramVector.observe HistogramVector SubscriptionLabel
dbExecTimeMetric) (DiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac DiffTime
queryExecutionTime))
PollerResponseState
previousPollerResponseState <- TVar PollerResponseState -> IO PollerResponseState
forall a. TVar a -> IO a
STM.readTVarIO TVar PollerResponseState
pollerResponseState
Maybe PollDetailsError
maybeError <- case Either QErr [(CohortId, ByteString, CursorVariableValues)]
mxRes of
Left QErr
err -> do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (PollerResponseState
previousPollerResponseState PollerResponseState -> PollerResponseState -> Bool
forall a. Eq a => a -> a -> Bool
== PollerResponseState
PRSSuccess) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Gauge -> IO ()
Prometheus.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ SubscriptionMetrics -> Gauge
submActiveStreamingPollersInError (SubscriptionMetrics -> Gauge) -> SubscriptionMetrics -> Gauge
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics PrometheusMetrics
prometheusMetrics
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar PollerResponseState -> PollerResponseState -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar PollerResponseState
pollerResponseState PollerResponseState
PRSError
let pollDetailsError :: PollDetailsError
pollDetailsError =
PollDetailsError
{ _pdeBatchId :: BatchId
_pdeBatchId = BatchId
batchId,
_pdeErrorDetails :: QErr
_pdeErrorDetails = QErr
err
}
Maybe PollDetailsError -> IO (Maybe PollDetailsError)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe PollDetailsError -> IO (Maybe PollDetailsError))
-> Maybe PollDetailsError -> IO (Maybe PollDetailsError)
forall a b. (a -> b) -> a -> b
$ PollDetailsError -> Maybe PollDetailsError
forall a. a -> Maybe a
Just PollDetailsError
pollDetailsError
Right [(CohortId, ByteString, CursorVariableValues)]
_ -> do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (PollerResponseState
previousPollerResponseState PollerResponseState -> PollerResponseState -> Bool
forall a. Eq a => a -> a -> Bool
== PollerResponseState
PRSError) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Gauge -> IO ()
Prometheus.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ SubscriptionMetrics -> Gauge
submActiveStreamingPollersInError (SubscriptionMetrics -> Gauge) -> SubscriptionMetrics -> Gauge
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics PrometheusMetrics
prometheusMetrics
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar PollerResponseState -> PollerResponseState -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar PollerResponseState
pollerResponseState PollerResponseState
PRSSuccess
Maybe PollDetailsError -> IO (Maybe PollDetailsError)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe PollDetailsError
forall a. Maybe a
Nothing
let subscriptionMeta :: SubscriptionMetadata
subscriptionMeta = DiffTime -> SubscriptionMetadata
SubscriptionMetadata (DiffTime -> SubscriptionMetadata)
-> DiffTime -> SubscriptionMetadata
forall a b. (a -> b) -> a -> b
$ DiffTime -> DiffTime
forall x y. (Duration x, Duration y) => x -> y
convertDuration DiffTime
queryExecutionTime
operations :: [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
operations = [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> Either QErr [(CohortId, ByteString, CursorVariableValues)]
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
getCohortOperations [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohorts Either QErr [(CohortId, ByteString, CursorVariableValues)]
mxRes
batchResponseSize :: Maybe Int
batchResponseSize =
case Either QErr [(CohortId, ByteString, CursorVariableValues)]
mxRes of
Left QErr
_ -> Maybe Int
forall a. Maybe a
Nothing
Right [(CohortId, ByteString, CursorVariableValues)]
resp -> Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$ Sum Int -> Int
forall a. Sum a -> a
getSum (Sum Int -> Int) -> Sum Int -> Int
forall a b. (a -> b) -> a -> b
$ ((CohortId, ByteString, CursorVariableValues) -> Sum Int)
-> [(CohortId, ByteString, CursorVariableValues)] -> Sum Int
forall m a. Monoid m => (a -> m) -> [a] -> m
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap ((\(CohortId
_, ByteString
sqlResp, CursorVariableValues
_) -> Int -> Sum Int
forall a. a -> Sum a
Sum (Int -> Sum Int) -> (ByteString -> Int) -> ByteString -> Sum Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int
BS.length (ByteString -> Sum Int) -> ByteString -> Sum Int
forall a b. (a -> b) -> a -> b
$ ByteString
sqlResp)) [(CohortId, ByteString, CursorVariableValues)]
resp
(DiffTime
pushTime, [(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
cohortsExecutionDetails) <- IO
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
-> IO
(DiffTime,
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime
(IO
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
-> IO
(DiffTime,
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]))
-> IO
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
-> IO
(DiffTime,
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))])
forall a b. (a -> b) -> a -> b
$ [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> IO
(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))))
-> IO
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
operations
(((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> IO
(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))))
-> IO
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))])
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> IO
(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))))
-> IO
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
forall a b. (a -> b) -> a -> b
$ \(GQResult ByteString
res, CohortId
cohortId, Maybe (ResponseHash, Int)
respData, Maybe CursorVariableValues
latestCursorValueMaybe, (CohortSnapshot
snapshot, Cohort (TVar CursorVariableValues)
cohort)) -> do
let latestCursorValue :: CursorVariableValues
latestCursorValue@(CursorVariableValues HashMap Name TxtEncodedVal
updatedCursorVarVal) =
let prevCursorVariableValue :: CursorVariableValues
prevCursorVariableValue = HashMap Name TxtEncodedVal -> CursorVariableValues
CursorVariableValues (HashMap Name TxtEncodedVal -> CursorVariableValues)
-> HashMap Name TxtEncodedVal -> CursorVariableValues
forall a b. (a -> b) -> a -> b
$ ValidatedVariables (HashMap Name) -> HashMap Name TxtEncodedVal
forall (f :: * -> *). ValidatedVariables f -> f TxtEncodedVal
C._unValidatedVariables (ValidatedVariables (HashMap Name) -> HashMap Name TxtEncodedVal)
-> ValidatedVariables (HashMap Name) -> HashMap Name TxtEncodedVal
forall a b. (a -> b) -> a -> b
$ CohortVariables -> ValidatedVariables (HashMap Name)
C._cvCursorVariables (CohortVariables -> ValidatedVariables (HashMap Name))
-> CohortVariables -> ValidatedVariables (HashMap Name)
forall a b. (a -> b) -> a -> b
$ CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot
in case Maybe CursorVariableValues
latestCursorValueMaybe of
Maybe CursorVariableValues
Nothing -> CursorVariableValues
prevCursorVariableValue
Just CursorVariableValues
currentPollCursorValue -> CursorVariableValues
-> CursorVariableValues -> CursorVariableValues
mergeOldAndNewCursorValues CursorVariableValues
prevCursorVariableValue CursorVariableValues
currentPollCursorValue
([SubscriberExecutionDetails]
pushedSubscribers, [SubscriberExecutionDetails]
ignoredSubscribers) <-
GQResult ByteString
-> Maybe ResponseHash
-> SubscriptionMetadata
-> CursorVariableValues
-> Name
-> (CohortSnapshot 'Streaming, Cohort 'Streaming)
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort GQResult ByteString
res ((ResponseHash, Int) -> ResponseHash
forall a b. (a, b) -> a
fst ((ResponseHash, Int) -> ResponseHash)
-> Maybe (ResponseHash, Int) -> Maybe ResponseHash
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData) SubscriptionMetadata
subscriptionMeta CursorVariableValues
latestCursorValue Name
rootFieldName (CohortSnapshot
CohortSnapshot 'Streaming
snapshot, Cohort (TVar CursorVariableValues)
Cohort 'Streaming
cohort)
let currentCohortKey :: CohortVariables
currentCohortKey = CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot
updatedCohortKey :: CohortVariables
updatedCohortKey = ValidatedVariables (HashMap Name)
-> CohortVariables -> CohortVariables
modifyCursorCohortVariables (HashMap Name TxtEncodedVal -> ValidatedVariables (HashMap Name)
forall (f :: * -> *). f TxtEncodedVal -> ValidatedVariables f
mkUnsafeValidateVariables HashMap Name TxtEncodedVal
updatedCursorVarVal) (CohortVariables -> CohortVariables)
-> CohortVariables -> CohortVariables
forall a b. (a -> b) -> a -> b
$ CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot
snapshottedNewSubs :: [Subscriber]
snapshottedNewSubs = CohortSnapshot -> [Subscriber]
C._csNewSubscribers CohortSnapshot
snapshot
cohortExecutionDetails :: CohortExecutionDetails
cohortExecutionDetails =
CohortExecutionDetails
{ _cedCohortId :: CohortId
_cedCohortId = CohortId
cohortId,
_cedVariables :: CohortVariables
_cedVariables = CohortVariables
currentCohortKey,
_cedPushedTo :: [SubscriberExecutionDetails]
_cedPushedTo = [SubscriberExecutionDetails]
pushedSubscribers,
_cedIgnored :: [SubscriberExecutionDetails]
_cedIgnored = [SubscriberExecutionDetails]
ignoredSubscribers,
_cedResponseSize :: Maybe Int
_cedResponseSize = (ResponseHash, Int) -> Int
forall a b. (a, b) -> b
snd ((ResponseHash, Int) -> Int)
-> Maybe (ResponseHash, Int) -> Maybe Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData,
_cedBatchId :: BatchId
_cedBatchId = BatchId
batchId
}
(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))
-> IO
(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (CohortExecutionDetails
cohortExecutionDetails, (CohortVariables
currentCohortKey, (Cohort (TVar CursorVariableValues)
cohort, CohortVariables
updatedCohortKey, [Subscriber]
snapshottedNewSubs)))
let processedCohortBatch :: [(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]
processedCohortBatch = (CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))
-> (CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))
forall a b. (a, b) -> b
snd ((CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))
-> (CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))
-> [(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
-> [(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
cohortsExecutionDetails
pgExecutionTime :: Maybe DiffTime
pgExecutionTime = case BackendTag b -> BackendType
forall (b :: BackendType). BackendTag b -> BackendType
reify (forall (b :: BackendType). HasTag b => BackendTag b
backendTag @b) of
Postgres PostgresKind
Vanilla -> DiffTime -> Maybe DiffTime
forall a. a -> Maybe a
Just DiffTime
queryExecutionTime
BackendType
_ -> Maybe DiffTime
forall a. Maybe a
Nothing
batchExecDetails :: BatchExecutionDetails
batchExecDetails =
Maybe DiffTime
-> DiffTime
-> DiffTime
-> BatchId
-> [CohortExecutionDetails]
-> Maybe Int
-> BatchExecutionDetails
BatchExecutionDetails
Maybe DiffTime
pgExecutionTime
DiffTime
queryExecutionTime
DiffTime
pushTime
BatchId
batchId
((CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))
-> CohortExecutionDetails
forall a b. (a, b) -> a
fst ((CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))
-> CohortExecutionDetails)
-> [(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
-> [CohortExecutionDetails]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
cohortsExecutionDetails)
Maybe Int
batchResponseSize
(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))],
Maybe PollDetailsError)
-> IO
(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))],
Maybe PollDetailsError)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))],
Maybe PollDetailsError)
-> IO
(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))],
Maybe PollDetailsError))
-> (BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))],
Maybe PollDetailsError)
-> IO
(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))],
Maybe PollDetailsError)
forall a b. (a -> b) -> a -> b
$ (BatchExecutionDetails
batchExecDetails, [(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]
processedCohortBatch, Maybe PollDetailsError
maybeError)
(DiffTime,
([BatchExecutionDetails],
[[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]],
[Maybe PollDetailsError]))
-> IO
(DiffTime,
([BatchExecutionDetails],
[[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]],
[Maybe PollDetailsError]))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DiffTime
snapshotTime, [(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))],
Maybe PollDetailsError)]
-> ([BatchExecutionDetails],
[[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]],
[Maybe PollDetailsError])
forall a b c. [(a, b, c)] -> ([a], [b], [c])
unzip3 [(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))],
Maybe PollDetailsError)]
batchesDetailsAndProcessedCohortsWithMaybeError)
let initPollDetails :: PollDetails
initPollDetails =
PollDetails
{ _pdPollerId :: PollerId
_pdPollerId = PollerId
pollerId,
_pdKind :: SubscriptionType
_pdKind = SubscriptionType
Streaming,
_pdGeneratedSql :: Text
_pdGeneratedSql = MultiplexedQuery b -> Text
forall a. ToTxt a => a -> Text
toTxt MultiplexedQuery b
query,
_pdSnapshotTime :: DiffTime
_pdSnapshotTime = DiffTime
snapshotTime,
_pdBatches :: [BatchExecutionDetails]
_pdBatches = [BatchExecutionDetails]
batchesDetails,
_pdLiveQueryOptions :: SubscriptionsOptions
_pdLiveQueryOptions = SubscriptionsOptions
streamingQueryOpts,
_pdTotalTime :: DiffTime
_pdTotalTime = DiffTime
totalTime,
_pdSource :: SourceName
_pdSource = SourceName
sourceName,
_pdRole :: RoleName
_pdRole = RoleName
roleName,
_pdParameterizedQueryHash :: ParameterizedQueryHash
_pdParameterizedQueryHash = ParameterizedQueryHash
parameterizedQueryHash,
_pdLogLevel :: LogLevel
_pdLogLevel = LogLevel
LevelInfo,
_pdErrors :: Maybe [PollDetailsError]
_pdErrors = Maybe [PollDetailsError]
forall a. Maybe a
Nothing
}
maybePollDetailsErrors :: Maybe [PollDetailsError]
maybePollDetailsErrors = [Maybe PollDetailsError] -> Maybe [PollDetailsError]
forall (t :: * -> *) (f :: * -> *) a.
(Traversable t, Applicative f) =>
t (f a) -> f (t a)
forall (f :: * -> *) a. Applicative f => [f a] -> f [a]
sequenceA [Maybe PollDetailsError]
maybeErrors
pollDetails :: PollDetails
pollDetails = case Maybe [PollDetailsError]
maybePollDetailsErrors of
Maybe [PollDetailsError]
Nothing -> PollDetails
initPollDetails
Just [PollDetailsError]
pollDetailsError ->
PollDetails
initPollDetails
{ _pdLogLevel :: LogLevel
_pdLogLevel = LogLevel
LevelError,
_pdErrors :: Maybe [PollDetailsError]
_pdErrors = [PollDetailsError] -> Maybe [PollDetailsError]
forall a. a -> Maybe a
Just [PollDetailsError]
pollDetailsError
}
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
let processedCohortsMap :: HashMap
CohortVariables
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortsMap = [(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]
-> HashMap
CohortVariables
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HashMap.fromList ([(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]
-> HashMap
CohortVariables
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))
-> [(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]
-> HashMap
CohortVariables
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
forall a b. (a -> b) -> a -> b
$ [[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]]
-> [(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]]
processedCohorts
[(CohortVariables, Cohort (TVar CursorVariableValues))]
currentCohorts <- TMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap CohortVariables (Cohort (TVar CursorVariableValues))
CohortMap 'Streaming
cohortMap
HashMap CohortVariables (Cohort (TVar CursorVariableValues))
updatedCohortsMap <-
(HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> (CohortVariables, Cohort (TVar CursorVariableValues))
-> STM
(HashMap CohortVariables (Cohort (TVar CursorVariableValues))))
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> STM
(HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m b
foldM
( \HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap (CohortVariables
currentCohortKey, Cohort (TVar CursorVariableValues)
currentCohort) -> do
let processedCohortMaybe :: Maybe
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortMaybe = CohortVariables
-> HashMap
CohortVariables
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
-> Maybe
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup CohortVariables
currentCohortKey HashMap
CohortVariables
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortsMap
case Maybe
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortMaybe of
Maybe
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
Nothing -> (Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues)))
-> CohortVariables
-> Cohort (TVar CursorVariableValues)
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
(HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (m :: * -> *) k v.
(Monad m, Hashable k) =>
(v -> v -> m v) -> k -> v -> HashMap k v -> m (HashMap k v)
HashMap.insertWithM Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
Cohort 'Streaming -> Cohort 'Streaming -> STM (Cohort 'Streaming)
mergeCohorts CohortVariables
currentCohortKey Cohort (TVar CursorVariableValues)
currentCohort HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap
Just (Cohort (TVar CursorVariableValues)
processedCohort, CohortVariables
updatedCohortKey, [Subscriber]
snapshottedNewSubs) -> do
Cohort (TVar CursorVariableValues) -> [Subscriber] -> STM ()
forall {streamCursorVars}.
Cohort streamCursorVars -> [Subscriber] -> STM ()
updateCohortSubscribers Cohort (TVar CursorVariableValues)
currentCohort [Subscriber]
snapshottedNewSubs
[(SubscriberId, Subscriber)]
currentCohortExistingSubscribers <- TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList (TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)])
-> TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)]
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cExistingSubscribers Cohort (TVar CursorVariableValues)
currentCohort
HashMap SubscriberId Subscriber
newlyAddedSubscribers <- TMap SubscriberId Subscriber
-> STM (HashMap SubscriberId Subscriber)
forall k v. TMap k v -> STM (HashMap k v)
TMap.getMap (TMap SubscriberId Subscriber
-> STM (HashMap SubscriberId Subscriber))
-> TMap SubscriberId Subscriber
-> STM (HashMap SubscriberId Subscriber)
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
currentCohort
HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMapWithCurrentCohort <-
if HashMap SubscriberId Subscriber -> Bool
forall a. HashMap SubscriberId a -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null HashMap SubscriberId Subscriber
newlyAddedSubscribers
then HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
(HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap
else do
Cohort (TVar CursorVariableValues)
newCohort <- do
TMap SubscriberId Subscriber
existingSubs <- STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new
TMap SubscriberId Subscriber
newSubs <- STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new
Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
(Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues)))
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
forall a b. (a -> b) -> a -> b
$ CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> TVar CursorVariableValues
-> Cohort (TVar CursorVariableValues)
forall streamCursorVars.
CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> streamCursorVars
-> Cohort streamCursorVars
C.Cohort
(Cohort (TVar CursorVariableValues) -> CohortId
forall streamCursorVars. Cohort streamCursorVars -> CohortId
C._cCohortId Cohort (TVar CursorVariableValues)
currentCohort)
(Cohort (TVar CursorVariableValues) -> TVar (Maybe ResponseHash)
forall streamCursorVars.
Cohort streamCursorVars -> TVar (Maybe ResponseHash)
C._cPreviousResponse Cohort (TVar CursorVariableValues)
currentCohort)
TMap SubscriberId Subscriber
existingSubs
TMap SubscriberId Subscriber
newSubs
(Cohort (TVar CursorVariableValues) -> TVar CursorVariableValues
forall streamCursorVars.
Cohort streamCursorVars -> streamCursorVars
C._cStreamCursorVariables Cohort (TVar CursorVariableValues)
currentCohort)
TMap SubscriberId Subscriber
-> HashMap SubscriberId Subscriber -> STM ()
forall k v. TMap k v -> HashMap k v -> STM ()
TMap.replace (Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
newCohort) HashMap SubscriberId Subscriber
newlyAddedSubscribers
(Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues)))
-> CohortVariables
-> Cohort (TVar CursorVariableValues)
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
(HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (m :: * -> *) k v.
(Monad m, Hashable k) =>
(v -> v -> m v) -> k -> v -> HashMap k v -> m (HashMap k v)
HashMap.insertWithM Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
Cohort 'Streaming -> Cohort 'Streaming -> STM (Cohort 'Streaming)
mergeCohorts CohortVariables
currentCohortKey Cohort (TVar CursorVariableValues)
newCohort HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap
let allCurrentSubscribers :: HashSet SubscriberId
allCurrentSubscribers = [SubscriberId] -> HashSet SubscriberId
forall a. (Eq a, Hashable a) => [a] -> HashSet a
Set.fromList ([SubscriberId] -> HashSet SubscriberId)
-> [SubscriberId] -> HashSet SubscriberId
forall a b. (a -> b) -> a -> b
$ (SubscriberId, Subscriber) -> SubscriberId
forall a b. (a, b) -> a
fst ((SubscriberId, Subscriber) -> SubscriberId)
-> [(SubscriberId, Subscriber)] -> [SubscriberId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (HashMap SubscriberId Subscriber -> [(SubscriberId, Subscriber)]
forall k v. HashMap k v -> [(k, v)]
HashMap.toList HashMap SubscriberId Subscriber
newlyAddedSubscribers [(SubscriberId, Subscriber)]
-> [(SubscriberId, Subscriber)] -> [(SubscriberId, Subscriber)]
forall a. Semigroup a => a -> a -> a
<> [(SubscriberId, Subscriber)]
currentCohortExistingSubscribers)
(SubscriberId -> Subscriber -> Bool)
-> TMap SubscriberId Subscriber -> STM ()
forall k v. (k -> v -> Bool) -> TMap k v -> STM ()
TMap.filterWithKey (\SubscriberId
k Subscriber
_ -> SubscriberId
k SubscriberId -> HashSet SubscriberId -> Bool
forall a. Eq a => a -> HashSet a -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` HashSet SubscriberId
allCurrentSubscribers) (TMap SubscriberId Subscriber -> STM ())
-> TMap SubscriberId Subscriber -> STM ()
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cExistingSubscribers Cohort (TVar CursorVariableValues)
processedCohort
(SubscriberId -> Subscriber -> Bool)
-> TMap SubscriberId Subscriber -> STM ()
forall k v. (k -> v -> Bool) -> TMap k v -> STM ()
TMap.filterWithKey (\SubscriberId
k Subscriber
_ -> SubscriberId
k SubscriberId -> HashSet SubscriberId -> Bool
forall a. Eq a => a -> HashSet a -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` HashSet SubscriberId
allCurrentSubscribers) (TMap SubscriberId Subscriber -> STM ())
-> TMap SubscriberId Subscriber -> STM ()
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
processedCohort
(Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues)))
-> CohortVariables
-> Cohort (TVar CursorVariableValues)
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
(HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (m :: * -> *) k v.
(Monad m, Hashable k) =>
(v -> v -> m v) -> k -> v -> HashMap k v -> m (HashMap k v)
HashMap.insertWithM Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
Cohort 'Streaming -> Cohort 'Streaming -> STM (Cohort 'Streaming)
mergeCohorts CohortVariables
updatedCohortKey Cohort (TVar CursorVariableValues)
processedCohort HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMapWithCurrentCohort
)
HashMap CohortVariables (Cohort (TVar CursorVariableValues))
forall a. Monoid a => a
mempty
[(CohortVariables, Cohort (TVar CursorVariableValues))]
currentCohorts
TMap CohortVariables (Cohort (TVar CursorVariableValues))
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM ()
forall k v. TMap k v -> HashMap k v -> STM ()
TMap.replace TMap CohortVariables (Cohort (TVar CursorVariableValues))
CohortMap 'Streaming
cohortMap HashMap CohortVariables (Cohort (TVar CursorVariableValues))
updatedCohortsMap
SubscriptionPostPollHook
postPollHook PollDetails
pollDetails
let totalTimeMetric :: HistogramVector SubscriptionLabel
totalTimeMetric = SubscriptionMetrics -> HistogramVector SubscriptionLabel
submTotalTime (SubscriptionMetrics -> HistogramVector SubscriptionLabel)
-> SubscriptionMetrics -> HistogramVector SubscriptionLabel
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics (PrometheusMetrics -> SubscriptionMetrics)
-> PrometheusMetrics -> SubscriptionMetrics
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics
prometheusMetrics
IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> m ()
recordSubcriptionMetric
IO GranularPrometheusMetricsState
granularPrometheusMetricsState
Bool
True
HashMap (Maybe OperationName) Int
operationNames
ParameterizedQueryHash
parameterizedQueryHash
SubscriptionKindLabel
streamingSubscriptionLabel
((SubscriptionLabel -> Double -> IO ())
-> Double -> SubscriptionLabel -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip (HistogramVector SubscriptionLabel
-> SubscriptionLabel -> Double -> IO ()
forall label.
Ord label =>
HistogramVector label -> label -> Double -> IO ()
HistogramVector.observe HistogramVector SubscriptionLabel
totalTimeMetric) (DiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac DiffTime
totalTime))
where
SubscriptionsOptions BatchSize
batchSize RefetchInterval
_ = SubscriptionsOptions
streamingQueryOpts
updateCohortSubscribers :: Cohort streamCursorVars -> [Subscriber] -> STM ()
updateCohortSubscribers (C.Cohort CohortId
_id TVar (Maybe ResponseHash)
_respRef TMap SubscriberId Subscriber
curOpsTV TMap SubscriberId Subscriber
newOpsTV streamCursorVars
_) [Subscriber]
snapshottedNewSubs = do
[(SubscriberId, Subscriber)]
allNewOpsL <- TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap SubscriberId Subscriber
newOpsTV
let snapshottedNewSubsSet :: HashSet SubscriberId
snapshottedNewSubsSet = [SubscriberId] -> HashSet SubscriberId
forall a. (Eq a, Hashable a) => [a] -> HashSet a
Set.fromList ([SubscriberId] -> HashSet SubscriberId)
-> [SubscriberId] -> HashSet SubscriberId
forall a b. (a -> b) -> a -> b
$ Subscriber -> SubscriberId
_sId (Subscriber -> SubscriberId) -> [Subscriber] -> [SubscriberId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Subscriber]
snapshottedNewSubs
[(SubscriberId, Subscriber)]
-> ((SubscriberId, Subscriber) -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(SubscriberId, Subscriber)]
allNewOpsL (((SubscriberId, Subscriber) -> STM ()) -> STM ())
-> ((SubscriberId, Subscriber) -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \(SubscriberId
subId, Subscriber
subscriber) ->
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SubscriberId
subId SubscriberId -> HashSet SubscriberId -> Bool
forall a. Eq a => a -> HashSet a -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` HashSet SubscriberId
snapshottedNewSubsSet) do
Subscriber
-> SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. Hashable k => v -> k -> TMap k v -> STM ()
TMap.insert Subscriber
subscriber SubscriberId
subId TMap SubscriberId Subscriber
curOpsTV
SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. Hashable k => k -> TMap k v -> STM ()
TMap.delete SubscriberId
subId TMap SubscriberId Subscriber
newOpsTV
getCohortSnapshot :: (CohortVariables, Cohort streamCursorVars)
-> STM (CohortId, (CohortSnapshot, Cohort streamCursorVars))
getCohortSnapshot (CohortVariables
cohortVars, Cohort streamCursorVars
cohort) = do
let C.Cohort CohortId
resId TVar (Maybe ResponseHash)
respRef TMap SubscriberId Subscriber
curOpsTV TMap SubscriberId Subscriber
newOpsTV streamCursorVars
_ = Cohort streamCursorVars
cohort
[(SubscriberId, Subscriber)]
curOpsL <- TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap SubscriberId Subscriber
curOpsTV
[(SubscriberId, Subscriber)]
newOpsL <- TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap SubscriberId Subscriber
newOpsTV
let cohortSnapshot :: CohortSnapshot
cohortSnapshot = CohortVariables
-> TVar (Maybe ResponseHash)
-> [Subscriber]
-> [Subscriber]
-> CohortSnapshot
C.CohortSnapshot CohortVariables
cohortVars TVar (Maybe ResponseHash)
respRef (((SubscriberId, Subscriber) -> Subscriber)
-> [(SubscriberId, Subscriber)] -> [Subscriber]
forall a b. (a -> b) -> [a] -> [b]
map (SubscriberId, Subscriber) -> Subscriber
forall a b. (a, b) -> b
snd [(SubscriberId, Subscriber)]
curOpsL) (((SubscriberId, Subscriber) -> Subscriber)
-> [(SubscriberId, Subscriber)] -> [Subscriber]
forall a b. (a -> b) -> [a] -> [b]
map (SubscriberId, Subscriber) -> Subscriber
forall a b. (a, b) -> b
snd [(SubscriberId, Subscriber)]
newOpsL)
(CohortId, (CohortSnapshot, Cohort streamCursorVars))
-> STM (CohortId, (CohortSnapshot, Cohort streamCursorVars))
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (CohortId
resId, (CohortSnapshot
cohortSnapshot, Cohort streamCursorVars
cohort))
mergeCohorts :: Cohort 'Streaming -> Cohort 'Streaming -> STM.STM (Cohort 'Streaming)
mergeCohorts :: Cohort 'Streaming -> Cohort 'Streaming -> STM (Cohort 'Streaming)
mergeCohorts Cohort 'Streaming
newCohort Cohort 'Streaming
oldCohort = do
let newCohortExistingSubscribers :: TMap SubscriberId Subscriber
newCohortExistingSubscribers = Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cExistingSubscribers Cohort (TVar CursorVariableValues)
Cohort 'Streaming
newCohort
oldCohortExistingSubscribers :: TMap SubscriberId Subscriber
oldCohortExistingSubscribers = Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cExistingSubscribers Cohort (TVar CursorVariableValues)
Cohort 'Streaming
oldCohort
newCohortNewSubscribers :: TMap SubscriberId Subscriber
newCohortNewSubscribers = Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
Cohort 'Streaming
newCohort
oldCohortNewSubscribers :: TMap SubscriberId Subscriber
oldCohortNewSubscribers = Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
Cohort 'Streaming
oldCohort
TMap SubscriberId Subscriber
mergedExistingSubscribers <- TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> STM (TMap SubscriberId Subscriber)
forall k v. Hashable k => TMap k v -> TMap k v -> STM (TMap k v)
TMap.union TMap SubscriberId Subscriber
newCohortExistingSubscribers TMap SubscriberId Subscriber
oldCohortExistingSubscribers
TMap SubscriberId Subscriber
mergedNewSubscribers <- TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> STM (TMap SubscriberId Subscriber)
forall k v. Hashable k => TMap k v -> TMap k v -> STM (TMap k v)
TMap.union TMap SubscriberId Subscriber
newCohortNewSubscribers TMap SubscriberId Subscriber
oldCohortNewSubscribers
Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
(Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues)))
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues)
Cohort 'Streaming
newCohort
{ _cNewSubscribers :: TMap SubscriberId Subscriber
C._cNewSubscribers = TMap SubscriberId Subscriber
mergedNewSubscribers,
_cExistingSubscribers :: TMap SubscriberId Subscriber
C._cExistingSubscribers = TMap SubscriberId Subscriber
mergedExistingSubscribers
}
getCohortOperations :: [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> Either QErr [(CohortId, ByteString, CursorVariableValues)]
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
getCohortOperations [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohorts = \case
Left QErr
e ->
let resp :: GQResult ByteString
resp = GQExecError -> GQResult ByteString
forall a. GQExecError -> Either GQExecError a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (GQExecError -> GQResult ByteString)
-> GQExecError -> GQResult ByteString
forall a b. (a -> b) -> a -> b
$ [Encoding] -> GQExecError
GQExecError [Bool -> QErr -> Encoding
encodeGQLErr Bool
False QErr
e]
in [(GQResult ByteString
resp, CohortId
cohortId, Maybe (ResponseHash, Int)
forall a. Maybe a
Nothing, Maybe CursorVariableValues
forall a. Maybe a
Nothing, (CohortSnapshot, Cohort (TVar CursorVariableValues))
snapshot) | (CohortId
cohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues))
snapshot) <- [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohorts]
Right [(CohortId, ByteString, CursorVariableValues)]
responses -> do
let cohortSnapshotMap :: HashMap
CohortId (CohortSnapshot, Cohort (TVar CursorVariableValues))
cohortSnapshotMap = [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> HashMap
CohortId (CohortSnapshot, Cohort (TVar CursorVariableValues))
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HashMap.fromList [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohorts
(((CohortId, ByteString, CursorVariableValues)
-> Maybe
(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> [(CohortId, ByteString, CursorVariableValues)]
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])
-> [(CohortId, ByteString, CursorVariableValues)]
-> ((CohortId, ByteString, CursorVariableValues)
-> Maybe
(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((CohortId, ByteString, CursorVariableValues)
-> Maybe
(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> [(CohortId, ByteString, CursorVariableValues)]
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
forall a b. (a -> Maybe b) -> [a] -> [b]
forall (f :: * -> *) a b.
Filterable f =>
(a -> Maybe b) -> f a -> f b
mapMaybe [(CohortId, ByteString, CursorVariableValues)]
responses (((CohortId, ByteString, CursorVariableValues)
-> Maybe
(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])
-> ((CohortId, ByteString, CursorVariableValues)
-> Maybe
(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
forall a b. (a -> b) -> a -> b
$ \(CohortId
cohortId, ByteString
respBS, CursorVariableValues
respCursorLatestValue) ->
let respHash :: ResponseHash
respHash = ByteString -> ResponseHash
mkRespHash ByteString
respBS
respSize :: Int
respSize = ByteString -> Int
BS.length ByteString
respBS
in
(ByteString -> GQResult ByteString
forall a. a -> Either GQExecError a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (Endo Value) -> ByteString -> ByteString
applyModifier Maybe (Endo Value)
modifier ByteString
respBS),CohortId
cohortId,(ResponseHash, Int) -> Maybe (ResponseHash, Int)
forall a. a -> Maybe a
Just (ResponseHash
respHash, Int
respSize),CursorVariableValues -> Maybe CursorVariableValues
forall a. a -> Maybe a
Just CursorVariableValues
respCursorLatestValue,)
((CohortSnapshot, Cohort (TVar CursorVariableValues))
-> (GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> Maybe (CohortSnapshot, Cohort (TVar CursorVariableValues))
-> Maybe
(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> CohortId
-> HashMap
CohortId (CohortSnapshot, Cohort (TVar CursorVariableValues))
-> Maybe (CohortSnapshot, Cohort (TVar CursorVariableValues))
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup CohortId
cohortId HashMap
CohortId (CohortSnapshot, Cohort (TVar CursorVariableValues))
cohortSnapshotMap