{-# LANGUAGE TemplateHaskell #-}

-- | Multiplexed subscription poller threads; see "Hasura.GraphQL.Execute.Subscription" for details.
module Hasura.GraphQL.Execute.Subscription.Poll.LiveQuery
  ( -- * Pollers
    -- NOTE(review): the export list was truncated in this copy of the file;
    -- 'pollLiveQuery' is assumed to be the only export — confirm against callers.
    pollLiveQuery,
  )
where

import Control.Concurrent.Async qualified as A
import Control.Concurrent.STM qualified as STM
import Control.Lens
import Data.ByteString qualified as BS
import Data.HashMap.Strict qualified as Map
import Data.List.Split (chunksOf)
import Data.Monoid (Sum (..))
import Data.Text.Extended
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Poll.Common hiding (Cohort (..), CohortMap, CohortSnapshot (..))
import Hasura.GraphQL.Execute.Subscription.Poll.Common qualified as C
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.Execute.Subscription.Types
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Prelude
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common (SourceName)
import Hasura.RQL.Types.Numeric qualified as Numeric
import Hasura.RQL.Types.Subscription (SubscriptionType (..))
import Hasura.Session

-- | Deliver one poll result to the subscribers of a single cohort.
--
-- The hash of the new result is compared with the hash stored in the cohort's
-- response 'STM.TVar'. When the result is an execution error, or the hash
-- differs from the stored one, the new hash is written back and the result is
-- pushed to both the new and the current subscribers. Otherwise only the new
-- subscribers receive the result and the current ones are reported as ignored.
pushResultToCohort ::
  GQResult BS.ByteString ->
  Maybe ResponseHash ->
  SubscriptionMetadata ->
  CohortSnapshot 'LiveQuery ->
  -- | subscribers to which data has been pushed, subscribers which already
  -- have this data (this information is exposed by metrics reporting)
  IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort result !respHashM (SubscriptionMetadata dTime) cohortSnapshot = do
  prevRespHashM <- STM.readTVarIO respRef
  -- write to the current websockets if needed
  (subscribersToPush, subscribersToIgnore) <-
    if isExecError result || respHashM /= prevRespHashM
      then do
        $assertNFHere respHashM -- so we don't write thunks to mutable vars
        STM.atomically $
          STM.writeTVar respRef respHashM
        return (newSinks <> curSinks, mempty)
      else return (newSinks, curSinks)
  pushResultToSubscribers subscribersToPush
  -- Convert both subscriber lists into the details reported to the caller.
  pure $
    over
      (each . each)
      ( \Subscriber {..} ->
          SubscriberExecutionDetails _sId _sMetadata
      )
      (subscribersToPush, subscribersToIgnore)
  where
    -- The snapshot's variables are not needed here; only the response-hash
    -- reference and the two subscriber lists are used.
    C.CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot

    -- The payload handed to every subscriber callback: the result paired
    -- with the execution time carried by the 'SubscriptionMetadata'.
    response = result <&> (`SubscriptionResponse` dTime)

    -- Invoke each subscriber's on-change callback concurrently.
    pushResultToSubscribers =
      A.mapConcurrently_ $ \Subscriber {..} -> _sOnChangeCallback response

-- | Where the magic happens: the top-level action run periodically by each
-- active 'Poller'. This needs to be async exception safe.
--
-- One poll iteration:
--
--   1. snapshots every cohort in the cohort map and splits the snapshots into
--      batches of the configured batch size, numbering the batches from 1;
--   2. concurrently runs the multiplexed query once per batch and pushes each
--      cohort's result via 'pushResultToCohort';
--   3. collects timing and size details into a 'PollDetails' value and passes
--      it to the post-poll hook.
pollLiveQuery ::
  forall b.
  BackendTransport b =>
  PollerId ->
  SubscriptionsOptions ->
  (SourceName, SourceConfig b) ->
  RoleName ->
  ParameterizedQueryHash ->
  MultiplexedQuery b ->
  CohortMap 'LiveQuery ->
  SubscriptionPostPollHook ->
  IO ()
pollLiveQuery pollerId lqOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap postPollHook = do
  (totalTime, (snapshotTime, batchesDetails)) <- withElapsedTime $ do
    -- snapshot the current cohorts and split them into batches
    (snapshotTime, cohortBatches) <- withElapsedTime $ do
      -- get a snapshot of all the cohorts
      -- this need not be done in a transaction
      cohorts <- STM.atomically $ TMap.toList cohortMap
      cohortSnapshots <- mapM (STM.atomically . getCohortSnapshot) cohorts
      -- cohorts are broken down into batches specified by the batch size
      let cohortBatches = chunksOf (Numeric.getNonNegativeInt (unBatchSize batchSize)) cohortSnapshots
      -- associating every batch with their BatchId
      pure $ zip (BatchId <$> [1 ..]) cohortBatches

    -- concurrently process each batch
    batchesDetails <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do
      -- run the multiplexed query with the variables of every cohort in the batch
      (queryExecutionTime, mxRes) <-
        runDBSubscription @b sourceConfig query $
          over (each . _2) C._csVariables cohorts

      let lqMeta = SubscriptionMetadata $ convertDuration queryExecutionTime
          operations = getCohortOperations cohorts mxRes
          -- batch response size is the sum of the response sizes of the cohorts
          batchResponseSize =
            case mxRes of
              Left _ -> Nothing
              Right resp -> Just $ getSum $ foldMap (Sum . BS.length . snd) resp
      (pushTime, cohortsExecutionDetails) <- withElapsedTime $
        A.forConcurrently operations $ \(res, cohortId, respData, snapshot) -> do
          (pushedSubscribers, ignoredSubscribers) <-
            pushResultToCohort res (fst <$> respData) lqMeta snapshot
          pure
            CohortExecutionDetails
              { _cedCohortId = cohortId,
                _cedVariables = C._csVariables snapshot,
                _cedPushedTo = pushedSubscribers,
                _cedIgnored = ignoredSubscribers,
                _cedResponseSize = snd <$> respData,
                _cedBatchId = batchId
              }
      -- NOTE(review): the constructor takes two 'DiffTime' arguments before
      -- the batch id; the order (query execution time, then push time) is
      -- assumed — confirm against the 'BatchExecutionDetails' declaration.
      pure $
        BatchExecutionDetails
          queryExecutionTime
          pushTime
          batchId
          cohortsExecutionDetails
          batchResponseSize

    pure (snapshotTime, batchesDetails)

  let pollDetails =
        PollDetails
          { _pdPollerId = pollerId,
            _pdGeneratedSql = toTxt query,
            _pdSnapshotTime = snapshotTime,
            _pdBatches = batchesDetails,
            _pdLiveQueryOptions = lqOpts,
            _pdTotalTime = totalTime,
            _pdSource = sourceName,
            _pdRole = roleName,
            _pdParameterizedQueryHash = parameterizedQueryHash
          }
  postPollHook pollDetails
  where
    -- only the batch size is needed; the refetch interval is ignored here
    SubscriptionsOptions batchSize _ = lqOpts

    -- Take a snapshot of one cohort inside a single STM transaction.
    --
    -- Argument: a pair of the cohort's variables and its @C.Cohort ()@ handle.
    -- Result: the cohort's id paired with a @C.CohortSnapshot@.
    --
    -- Both subscriber maps are read first, so the snapshot reflects their
    -- contents before the merge. Then every new subscriber is inserted into
    -- the current-subscriber map and the new-subscriber map is reset.
    getCohortSnapshot (cohortVars, handlerC) = do
      let C.Cohort resId respRef curOpsTV newOpsTV () = handlerC
      curOpsL <- TMap.toList curOpsTV
      newOpsL <- TMap.toList newOpsTV
      -- promote the new subscribers to current subscribers
      forM_ newOpsL $ \(k, action) -> TMap.insert action k curOpsTV
      TMap.reset newOpsTV

      let cohortSnapshot = C.CohortSnapshot cohortVars respRef (map snd curOpsL) (map snd newOpsL)
      return (resId, cohortSnapshot)

    -- Pair every cohort of a batch with its result from the multiplexed query.
    --
    -- Each output element is
    -- @(result, cohort id, Just (response hash, response size), snapshot)@.
    --
    -- * On a query error, every cohort in the batch gets the same thrown
    --   'GQExecError' and no hash/size information.
    -- * On success, each @(cohort id, response bytes)@ pair is looked up in
    --   the batch's cohorts; responses whose id is not found are dropped.
    getCohortOperations cohorts = \case
      Left e ->
        -- TODO: this is internal error
        let resp = throwError $ GQExecError [encodeGQLErr False e]
         in [(resp, cohortId, Nothing, snapshot) | (cohortId, snapshot) <- cohorts]
      Right responses ->
        let cohortSnapshotMap = Map.fromList cohorts
         in flip mapMaybe responses $ \(cohortId, respBS) ->
              let respHash = mkRespHash respBS
                  respSize = BS.length respBS
               in -- TODO: currently we ignore the cases when the cohortId from
                  -- Postgres response is not present in the cohort map of this batch
                  -- (this shouldn't happen but if it happens it means a logic error and
                  -- we should log it)
                  Map.lookup cohortId cohortSnapshotMap <&> \snapshot ->
                    (pure respBS, cohortId, Just (respHash, respSize), snapshot)