{-# LANGUAGE TemplateHaskell #-}
module Hasura.GraphQL.Execute.Subscription.Poll.LiveQuery
(
pollLiveQuery,
)
where
import Control.Concurrent.Async qualified as A
import Control.Concurrent.STM qualified as STM
import Control.Lens
import Data.ByteString qualified as BS
import Data.HashMap.Strict qualified as Map
import Data.List.Split (chunksOf)
import Data.Monoid (Sum (..))
import Data.Text.Extended
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Poll.Common hiding (Cohort (..), CohortMap, CohortSnapshot (..))
import Hasura.GraphQL.Execute.Subscription.Poll.Common qualified as C
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.Execute.Subscription.Types
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Prelude
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common (SourceName)
import Hasura.RQL.Types.Numeric qualified as Numeric
import Hasura.RQL.Types.Subscription (SubscriptionType (..))
import Hasura.Session
pushResultToCohort ::
GQResult BS.ByteString ->
Maybe ResponseHash ->
SubscriptionMetadata ->
CohortSnapshot 'LiveQuery ->
IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort :: GQResult ByteString
-> Maybe ResponseHash
-> SubscriptionMetadata
-> CohortSnapshot 'LiveQuery
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort GQResult ByteString
result !Maybe ResponseHash
respHashM (SubscriptionMetadata DiffTime
dTime) CohortSnapshot 'LiveQuery
cohortSnapshot = do
Maybe ResponseHash
prevRespHashM <- TVar (Maybe ResponseHash) -> IO (Maybe ResponseHash)
forall a. TVar a -> IO a
STM.readTVarIO TVar (Maybe ResponseHash)
respRef
([Subscriber]
subscribersToPush, [Subscriber]
subscribersToIgnore) <-
if GQResult ByteString -> Bool
forall a. GQResult a -> Bool
isExecError GQResult ByteString
result Bool -> Bool -> Bool
|| Maybe ResponseHash
respHashM Maybe ResponseHash -> Maybe ResponseHash -> Bool
forall a. Eq a => a -> a -> Bool
/= Maybe ResponseHash
prevRespHashM
then do
String
String -> Maybe ResponseHash -> IO ()
forall a. String -> a -> IO ()
$assertNFHere Maybe ResponseHash
respHashM
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
TVar (Maybe ResponseHash) -> Maybe ResponseHash -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar (Maybe ResponseHash)
respRef Maybe ResponseHash
respHashM
([Subscriber], [Subscriber]) -> IO ([Subscriber], [Subscriber])
forall (m :: * -> *) a. Monad m => a -> m a
return ([Subscriber]
newSinks [Subscriber] -> [Subscriber] -> [Subscriber]
forall a. Semigroup a => a -> a -> a
<> [Subscriber]
curSinks, [Subscriber]
forall a. Monoid a => a
mempty)
else ([Subscriber], [Subscriber]) -> IO ([Subscriber], [Subscriber])
forall (m :: * -> *) a. Monad m => a -> m a
return ([Subscriber]
newSinks, [Subscriber]
curSinks)
[Subscriber] -> IO ()
pushResultToSubscribers [Subscriber]
subscribersToPush
([SubscriberExecutionDetails], [SubscriberExecutionDetails])
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (([SubscriberExecutionDetails], [SubscriberExecutionDetails])
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails]))
-> ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall a b. (a -> b) -> a -> b
$
ASetter
([Subscriber], [Subscriber])
([SubscriberExecutionDetails], [SubscriberExecutionDetails])
Subscriber
SubscriberExecutionDetails
-> (Subscriber -> SubscriberExecutionDetails)
-> ([Subscriber], [Subscriber])
-> ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
over
(([Subscriber] -> Identity [SubscriberExecutionDetails])
-> ([Subscriber], [Subscriber])
-> Identity
([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall s t a b. Each s t a b => Traversal s t a b
each (([Subscriber] -> Identity [SubscriberExecutionDetails])
-> ([Subscriber], [Subscriber])
-> Identity
([SubscriberExecutionDetails], [SubscriberExecutionDetails]))
-> ((Subscriber -> Identity SubscriberExecutionDetails)
-> [Subscriber] -> Identity [SubscriberExecutionDetails])
-> ASetter
([Subscriber], [Subscriber])
([SubscriberExecutionDetails], [SubscriberExecutionDetails])
Subscriber
SubscriberExecutionDetails
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Subscriber -> Identity SubscriberExecutionDetails)
-> [Subscriber] -> Identity [SubscriberExecutionDetails]
forall s t a b. Each s t a b => Traversal s t a b
each)
( \Subscriber {Maybe OperationName
RequestId
SubscriberMetadata
SubscriberId
OnChange
_sOnChangeCallback :: Subscriber -> OnChange
_sOperationName :: Subscriber -> Maybe OperationName
_sRequestId :: Subscriber -> RequestId
_sMetadata :: Subscriber -> SubscriberMetadata
_sId :: Subscriber -> SubscriberId
_sOnChangeCallback :: OnChange
_sOperationName :: Maybe OperationName
_sRequestId :: RequestId
_sMetadata :: SubscriberMetadata
_sId :: SubscriberId
..} ->
SubscriberId -> SubscriberMetadata -> SubscriberExecutionDetails
SubscriberExecutionDetails SubscriberId
_sId SubscriberMetadata
_sMetadata
)
([Subscriber]
subscribersToPush, [Subscriber]
subscribersToIgnore)
where
C.CohortSnapshot CohortVariables
_ TVar (Maybe ResponseHash)
respRef [Subscriber]
curSinks [Subscriber]
newSinks = CohortSnapshot
CohortSnapshot 'LiveQuery
cohortSnapshot
response :: Either GQExecError SubscriptionResponse
response = GQResult ByteString
result GQResult ByteString
-> (ByteString -> SubscriptionResponse)
-> Either GQExecError SubscriptionResponse
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> (ByteString -> DiffTime -> SubscriptionResponse
`SubscriptionResponse` DiffTime
dTime)
pushResultToSubscribers :: [Subscriber] -> IO ()
pushResultToSubscribers =
(Subscriber -> IO ()) -> [Subscriber] -> IO ()
forall (f :: * -> *) a b. Foldable f => (a -> IO b) -> f a -> IO ()
A.mapConcurrently_ ((Subscriber -> IO ()) -> [Subscriber] -> IO ())
-> (Subscriber -> IO ()) -> [Subscriber] -> IO ()
forall a b. (a -> b) -> a -> b
$ \Subscriber {Maybe OperationName
RequestId
SubscriberMetadata
SubscriberId
OnChange
_sOnChangeCallback :: OnChange
_sOperationName :: Maybe OperationName
_sRequestId :: RequestId
_sMetadata :: SubscriberMetadata
_sId :: SubscriberId
_sOnChangeCallback :: Subscriber -> OnChange
_sOperationName :: Subscriber -> Maybe OperationName
_sRequestId :: Subscriber -> RequestId
_sMetadata :: Subscriber -> SubscriberMetadata
_sId :: Subscriber -> SubscriberId
..} -> OnChange
_sOnChangeCallback Either GQExecError SubscriptionResponse
response
pollLiveQuery ::
forall b.
BackendTransport b =>
PollerId ->
SubscriptionsOptions ->
(SourceName, SourceConfig b) ->
RoleName ->
ParameterizedQueryHash ->
MultiplexedQuery b ->
CohortMap 'LiveQuery ->
SubscriptionPostPollHook ->
IO ()
pollLiveQuery :: PollerId
-> SubscriptionsOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap 'LiveQuery
-> SubscriptionPostPollHook
-> IO ()
pollLiveQuery PollerId
pollerId SubscriptionsOptions
lqOpts (SourceName
sourceName, SourceConfig b
sourceConfig) RoleName
roleName ParameterizedQueryHash
parameterizedQueryHash MultiplexedQuery b
query CohortMap 'LiveQuery
cohortMap SubscriptionPostPollHook
postPollHook = do
(DiffTime
totalTime, (DiffTime
snapshotTime, [BatchExecutionDetails]
batchesDetails)) <- IO (DiffTime, [BatchExecutionDetails])
-> IO (DiffTime, (DiffTime, [BatchExecutionDetails]))
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO (DiffTime, [BatchExecutionDetails])
-> IO (DiffTime, (DiffTime, [BatchExecutionDetails])))
-> IO (DiffTime, [BatchExecutionDetails])
-> IO (DiffTime, (DiffTime, [BatchExecutionDetails]))
forall a b. (a -> b) -> a -> b
$ do
(DiffTime
snapshotTime, [(BatchId, [(CohortId, CohortSnapshot)])]
cohortBatches) <- IO [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO (DiffTime, [(BatchId, [(CohortId, CohortSnapshot)])])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO (DiffTime, [(BatchId, [(CohortId, CohortSnapshot)])]))
-> IO [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO (DiffTime, [(BatchId, [(CohortId, CohortSnapshot)])])
forall a b. (a -> b) -> a -> b
$ do
[(CohortVariables, Cohort ())]
cohorts <- STM [(CohortVariables, Cohort ())]
-> IO [(CohortVariables, Cohort ())]
forall a. STM a -> IO a
STM.atomically (STM [(CohortVariables, Cohort ())]
-> IO [(CohortVariables, Cohort ())])
-> STM [(CohortVariables, Cohort ())]
-> IO [(CohortVariables, Cohort ())]
forall a b. (a -> b) -> a -> b
$ TMap CohortVariables (Cohort ())
-> STM [(CohortVariables, Cohort ())]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap CohortVariables (Cohort ())
CohortMap 'LiveQuery
cohortMap
[(CohortId, CohortSnapshot)]
cohortSnapshots <- ((CohortVariables, Cohort ()) -> IO (CohortId, CohortSnapshot))
-> [(CohortVariables, Cohort ())]
-> IO [(CohortId, CohortSnapshot)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (STM (CohortId, CohortSnapshot) -> IO (CohortId, CohortSnapshot)
forall a. STM a -> IO a
STM.atomically (STM (CohortId, CohortSnapshot) -> IO (CohortId, CohortSnapshot))
-> ((CohortVariables, Cohort ()) -> STM (CohortId, CohortSnapshot))
-> (CohortVariables, Cohort ())
-> IO (CohortId, CohortSnapshot)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortVariables, Cohort ()) -> STM (CohortId, CohortSnapshot)
getCohortSnapshot) [(CohortVariables, Cohort ())]
cohorts
let cohortBatches :: [[(CohortId, CohortSnapshot)]]
cohortBatches = Int
-> [(CohortId, CohortSnapshot)] -> [[(CohortId, CohortSnapshot)]]
forall e. Int -> [e] -> [[e]]
chunksOf (NonNegativeInt -> Int
Numeric.getNonNegativeInt (BatchSize -> NonNegativeInt
unBatchSize BatchSize
batchSize)) [(CohortId, CohortSnapshot)]
cohortSnapshots
[(BatchId, [(CohortId, CohortSnapshot)])]
-> IO [(BatchId, [(CohortId, CohortSnapshot)])]
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([(BatchId, [(CohortId, CohortSnapshot)])]
-> IO [(BatchId, [(CohortId, CohortSnapshot)])])
-> [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO [(BatchId, [(CohortId, CohortSnapshot)])]
forall a b. (a -> b) -> a -> b
$ [BatchId]
-> [[(CohortId, CohortSnapshot)]]
-> [(BatchId, [(CohortId, CohortSnapshot)])]
forall a b. [a] -> [b] -> [(a, b)]
zip (Int -> BatchId
BatchId (Int -> BatchId) -> [Int] -> [BatchId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Int
1 ..]) [[(CohortId, CohortSnapshot)]]
cohortBatches
[BatchExecutionDetails]
batchesDetails <- [(BatchId, [(CohortId, CohortSnapshot)])]
-> ((BatchId, [(CohortId, CohortSnapshot)])
-> IO BatchExecutionDetails)
-> IO [BatchExecutionDetails]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(BatchId, [(CohortId, CohortSnapshot)])]
cohortBatches (((BatchId, [(CohortId, CohortSnapshot)])
-> IO BatchExecutionDetails)
-> IO [BatchExecutionDetails])
-> ((BatchId, [(CohortId, CohortSnapshot)])
-> IO BatchExecutionDetails)
-> IO [BatchExecutionDetails]
forall a b. (a -> b) -> a -> b
$ \(BatchId
batchId, [(CohortId, CohortSnapshot)]
cohorts) -> do
(DiffTime
queryExecutionTime, Either QErr [(CohortId, ByteString)]
mxRes) <- SourceConfig b
-> MultiplexedQuery b
-> [(CohortId, CohortVariables)]
-> IO (DiffTime, Either QErr [(CohortId, ByteString)])
forall (b :: BackendType) (m :: * -> *).
(BackendTransport b, MonadIO m) =>
SourceConfig b
-> MultiplexedQuery b
-> [(CohortId, CohortVariables)]
-> m (DiffTime, Either QErr [(CohortId, ByteString)])
runDBSubscription @b SourceConfig b
sourceConfig MultiplexedQuery b
query ([(CohortId, CohortVariables)]
-> IO (DiffTime, Either QErr [(CohortId, ByteString)]))
-> [(CohortId, CohortVariables)]
-> IO (DiffTime, Either QErr [(CohortId, ByteString)])
forall a b. (a -> b) -> a -> b
$ ASetter
[(CohortId, CohortSnapshot)]
[(CohortId, CohortVariables)]
CohortSnapshot
CohortVariables
-> (CohortSnapshot -> CohortVariables)
-> [(CohortId, CohortSnapshot)]
-> [(CohortId, CohortVariables)]
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
over (((CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables))
-> [(CohortId, CohortSnapshot)]
-> Identity [(CohortId, CohortVariables)]
forall s t a b. Each s t a b => Traversal s t a b
each (((CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables))
-> [(CohortId, CohortSnapshot)]
-> Identity [(CohortId, CohortVariables)])
-> ((CohortSnapshot -> Identity CohortVariables)
-> (CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables))
-> ASetter
[(CohortId, CohortSnapshot)]
[(CohortId, CohortVariables)]
CohortSnapshot
CohortVariables
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortSnapshot -> Identity CohortVariables)
-> (CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables)
forall s t a b. Field2 s t a b => Lens s t a b
_2) CohortSnapshot -> CohortVariables
C._csVariables [(CohortId, CohortSnapshot)]
cohorts
let lqMeta :: SubscriptionMetadata
lqMeta = DiffTime -> SubscriptionMetadata
SubscriptionMetadata (DiffTime -> SubscriptionMetadata)
-> DiffTime -> SubscriptionMetadata
forall a b. (a -> b) -> a -> b
$ DiffTime -> DiffTime
forall x y. (Duration x, Duration y) => x -> y
convertDuration DiffTime
queryExecutionTime
operations :: [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)]
operations = [(CohortId, CohortSnapshot)]
-> Either QErr [(CohortId, ByteString)]
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)]
forall (m :: * -> *) k t.
(MonadError GQExecError m, Eq k, Hashable k) =>
[(k, t)]
-> Either QErr [(k, ByteString)]
-> [(m ByteString, k, Maybe (ResponseHash, Int), t)]
getCohortOperations [(CohortId, CohortSnapshot)]
cohorts Either QErr [(CohortId, ByteString)]
mxRes
batchResponseSize :: Maybe Int
batchResponseSize =
case Either QErr [(CohortId, ByteString)]
mxRes of
Left QErr
_ -> Maybe Int
forall a. Maybe a
Nothing
Right [(CohortId, ByteString)]
resp -> Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$ Sum Int -> Int
forall a. Sum a -> a
getSum (Sum Int -> Int) -> Sum Int -> Int
forall a b. (a -> b) -> a -> b
$ ((CohortId, ByteString) -> Sum Int)
-> [(CohortId, ByteString)] -> Sum Int
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap (Int -> Sum Int
forall a. a -> Sum a
Sum (Int -> Sum Int)
-> ((CohortId, ByteString) -> Int)
-> (CohortId, ByteString)
-> Sum Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int
BS.length (ByteString -> Int)
-> ((CohortId, ByteString) -> ByteString)
-> (CohortId, ByteString)
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortId, ByteString) -> ByteString
forall a b. (a, b) -> b
snd) [(CohortId, ByteString)]
resp
(DiffTime
pushTime, [CohortExecutionDetails]
cohortsExecutionDetails) <- IO [CohortExecutionDetails]
-> IO (DiffTime, [CohortExecutionDetails])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO [CohortExecutionDetails]
-> IO (DiffTime, [CohortExecutionDetails]))
-> IO [CohortExecutionDetails]
-> IO (DiffTime, [CohortExecutionDetails])
forall a b. (a -> b) -> a -> b
$
[(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)]
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)
-> IO CohortExecutionDetails)
-> IO [CohortExecutionDetails]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)]
operations (((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)
-> IO CohortExecutionDetails)
-> IO [CohortExecutionDetails])
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
CohortSnapshot)
-> IO CohortExecutionDetails)
-> IO [CohortExecutionDetails]
forall a b. (a -> b) -> a -> b
$ \(GQResult ByteString
res, CohortId
cohortId, Maybe (ResponseHash, Int)
respData, CohortSnapshot
snapshot) -> do
([SubscriberExecutionDetails]
pushedSubscribers, [SubscriberExecutionDetails]
ignoredSubscribers) <-
GQResult ByteString
-> Maybe ResponseHash
-> SubscriptionMetadata
-> CohortSnapshot 'LiveQuery
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort GQResult ByteString
res ((ResponseHash, Int) -> ResponseHash
forall a b. (a, b) -> a
fst ((ResponseHash, Int) -> ResponseHash)
-> Maybe (ResponseHash, Int) -> Maybe ResponseHash
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData) SubscriptionMetadata
lqMeta CohortSnapshot
CohortSnapshot 'LiveQuery
snapshot
CohortExecutionDetails -> IO CohortExecutionDetails
forall (f :: * -> *) a. Applicative f => a -> f a
pure
CohortExecutionDetails :: CohortId
-> CohortVariables
-> Maybe Int
-> [SubscriberExecutionDetails]
-> [SubscriberExecutionDetails]
-> BatchId
-> CohortExecutionDetails
CohortExecutionDetails
{ _cedCohortId :: CohortId
_cedCohortId = CohortId
cohortId,
_cedVariables :: CohortVariables
_cedVariables = CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot,
_cedPushedTo :: [SubscriberExecutionDetails]
_cedPushedTo = [SubscriberExecutionDetails]
pushedSubscribers,
_cedIgnored :: [SubscriberExecutionDetails]
_cedIgnored = [SubscriberExecutionDetails]
ignoredSubscribers,
_cedResponseSize :: Maybe Int
_cedResponseSize = (ResponseHash, Int) -> Int
forall a b. (a, b) -> b
snd ((ResponseHash, Int) -> Int)
-> Maybe (ResponseHash, Int) -> Maybe Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData,
_cedBatchId :: BatchId
_cedBatchId = BatchId
batchId
}
BatchExecutionDetails -> IO BatchExecutionDetails
forall (f :: * -> *) a. Applicative f => a -> f a
pure (BatchExecutionDetails -> IO BatchExecutionDetails)
-> BatchExecutionDetails -> IO BatchExecutionDetails
forall a b. (a -> b) -> a -> b
$
DiffTime
-> DiffTime
-> BatchId
-> [CohortExecutionDetails]
-> Maybe Int
-> BatchExecutionDetails
BatchExecutionDetails
DiffTime
queryExecutionTime
DiffTime
pushTime
BatchId
batchId
[CohortExecutionDetails]
cohortsExecutionDetails
Maybe Int
batchResponseSize
(DiffTime, [BatchExecutionDetails])
-> IO (DiffTime, [BatchExecutionDetails])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DiffTime
snapshotTime, [BatchExecutionDetails]
batchesDetails)
let pollDetails :: PollDetails
pollDetails =
PollDetails :: PollerId
-> Text
-> DiffTime
-> [BatchExecutionDetails]
-> DiffTime
-> SubscriptionsOptions
-> SourceName
-> RoleName
-> ParameterizedQueryHash
-> PollDetails
PollDetails
{ _pdPollerId :: PollerId
_pdPollerId = PollerId
pollerId,
_pdGeneratedSql :: Text
_pdGeneratedSql = MultiplexedQuery b -> Text
forall a. ToTxt a => a -> Text
toTxt MultiplexedQuery b
query,
_pdSnapshotTime :: DiffTime
_pdSnapshotTime = DiffTime
snapshotTime,
_pdBatches :: [BatchExecutionDetails]
_pdBatches = [BatchExecutionDetails]
batchesDetails,
_pdLiveQueryOptions :: SubscriptionsOptions
_pdLiveQueryOptions = SubscriptionsOptions
lqOpts,
_pdTotalTime :: DiffTime
_pdTotalTime = DiffTime
totalTime,
_pdSource :: SourceName
_pdSource = SourceName
sourceName,
_pdRole :: RoleName
_pdRole = RoleName
roleName,
_pdParameterizedQueryHash :: ParameterizedQueryHash
_pdParameterizedQueryHash = ParameterizedQueryHash
parameterizedQueryHash
}
SubscriptionPostPollHook
postPollHook PollDetails
pollDetails
where
SubscriptionsOptions BatchSize
batchSize RefetchInterval
_ = SubscriptionsOptions
lqOpts
getCohortSnapshot :: (CohortVariables, Cohort ()) -> STM (CohortId, CohortSnapshot)
getCohortSnapshot (CohortVariables
cohortVars, Cohort ()
handlerC) = do
let C.Cohort CohortId
resId TVar (Maybe ResponseHash)
respRef SubscriberMap
curOpsTV SubscriberMap
newOpsTV () = Cohort ()
handlerC
[(SubscriberId, Subscriber)]
curOpsL <- SubscriberMap -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList SubscriberMap
curOpsTV
[(SubscriberId, Subscriber)]
newOpsL <- SubscriberMap -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList SubscriberMap
newOpsTV
[(SubscriberId, Subscriber)]
-> ((SubscriberId, Subscriber) -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(SubscriberId, Subscriber)]
newOpsL (((SubscriberId, Subscriber) -> STM ()) -> STM ())
-> ((SubscriberId, Subscriber) -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \(SubscriberId
k, Subscriber
action) -> Subscriber -> SubscriberId -> SubscriberMap -> STM ()
forall k v. (Eq k, Hashable k) => v -> k -> TMap k v -> STM ()
TMap.insert Subscriber
action SubscriberId
k SubscriberMap
curOpsTV
SubscriberMap -> STM ()
forall k v. TMap k v -> STM ()
TMap.reset SubscriberMap
newOpsTV
let cohortSnapshot :: CohortSnapshot
cohortSnapshot = CohortVariables
-> TVar (Maybe ResponseHash)
-> [Subscriber]
-> [Subscriber]
-> CohortSnapshot
C.CohortSnapshot CohortVariables
cohortVars TVar (Maybe ResponseHash)
respRef (((SubscriberId, Subscriber) -> Subscriber)
-> [(SubscriberId, Subscriber)] -> [Subscriber]
forall a b. (a -> b) -> [a] -> [b]
map (SubscriberId, Subscriber) -> Subscriber
forall a b. (a, b) -> b
snd [(SubscriberId, Subscriber)]
curOpsL) (((SubscriberId, Subscriber) -> Subscriber)
-> [(SubscriberId, Subscriber)] -> [Subscriber]
forall a b. (a -> b) -> [a] -> [b]
map (SubscriberId, Subscriber) -> Subscriber
forall a b. (a, b) -> b
snd [(SubscriberId, Subscriber)]
newOpsL)
(CohortId, CohortSnapshot) -> STM (CohortId, CohortSnapshot)
forall (m :: * -> *) a. Monad m => a -> m a
return (CohortId
resId, CohortSnapshot
cohortSnapshot)
getCohortOperations :: [(k, t)]
-> Either QErr [(k, ByteString)]
-> [(m ByteString, k, Maybe (ResponseHash, Int), t)]
getCohortOperations [(k, t)]
cohorts = \case
Left QErr
e ->
let resp :: m ByteString
resp = GQExecError -> m ByteString
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (GQExecError -> m ByteString) -> GQExecError -> m ByteString
forall a b. (a -> b) -> a -> b
$ [Value] -> GQExecError
GQExecError [Bool -> QErr -> Value
encodeGQLErr Bool
False QErr
e]
in [(m ByteString
resp, k
cohortId, Maybe (ResponseHash, Int)
forall a. Maybe a
Nothing, t
snapshot) | (k
cohortId, t
snapshot) <- [(k, t)]
cohorts]
Right [(k, ByteString)]
responses -> do
let cohortSnapshotMap :: HashMap k t
cohortSnapshotMap = [(k, t)] -> HashMap k t
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
Map.fromList [(k, t)]
cohorts
(((k, ByteString)
-> Maybe (m ByteString, k, Maybe (ResponseHash, Int), t))
-> [(k, ByteString)]
-> [(m ByteString, k, Maybe (ResponseHash, Int), t)])
-> [(k, ByteString)]
-> ((k, ByteString)
-> Maybe (m ByteString, k, Maybe (ResponseHash, Int), t))
-> [(m ByteString, k, Maybe (ResponseHash, Int), t)]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((k, ByteString)
-> Maybe (m ByteString, k, Maybe (ResponseHash, Int), t))
-> [(k, ByteString)]
-> [(m ByteString, k, Maybe (ResponseHash, Int), t)]
forall (f :: * -> *) a b.
Filterable f =>
(a -> Maybe b) -> f a -> f b
mapMaybe [(k, ByteString)]
responses (((k, ByteString)
-> Maybe (m ByteString, k, Maybe (ResponseHash, Int), t))
-> [(m ByteString, k, Maybe (ResponseHash, Int), t)])
-> ((k, ByteString)
-> Maybe (m ByteString, k, Maybe (ResponseHash, Int), t))
-> [(m ByteString, k, Maybe (ResponseHash, Int), t)]
forall a b. (a -> b) -> a -> b
$ \(k
cohortId, ByteString
respBS) ->
let respHash :: ResponseHash
respHash = ByteString -> ResponseHash
mkRespHash ByteString
respBS
respSize :: Int
respSize = ByteString -> Int
BS.length ByteString
respBS
in
(ByteString -> m ByteString
forall (f :: * -> *) a. Applicative f => a -> f a
pure ByteString
respBS,k
cohortId,(ResponseHash, Int) -> Maybe (ResponseHash, Int)
forall a. a -> Maybe a
Just (ResponseHash
respHash, Int
respSize),)
(t -> (m ByteString, k, Maybe (ResponseHash, Int), t))
-> Maybe t -> Maybe (m ByteString, k, Maybe (ResponseHash, Int), t)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> k -> HashMap k t -> Maybe t
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
Map.lookup k
cohortId HashMap k t
cohortSnapshotMap