{-# LANGUAGE TemplateHaskell #-}

-- | Multiplexed subscription poller threads; see "Hasura.GraphQL.Execute.Subscription" for details.
module Hasura.GraphQL.Execute.Subscription.Poll.LiveQuery
  ( -- * Pollers
    pollLiveQuery,
  )
where

import Control.Concurrent.Async qualified as A
import Control.Concurrent.STM qualified as STM
import Control.Lens
import Data.ByteString qualified as BS
import Data.HashMap.Strict qualified as Map
import Data.List.Split (chunksOf)
import Data.Monoid (Sum (..))
import Data.Text.Extended
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Poll.Common hiding (Cohort (..), CohortMap, CohortSnapshot (..))
import Hasura.GraphQL.Execute.Subscription.Poll.Common qualified as C
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.Execute.Subscription.Types
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Prelude
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common (SourceName)
import Hasura.RQL.Types.Numeric qualified as Numeric
import Hasura.RQL.Types.Subscription (SubscriptionType (..))
import Hasura.Session

pushResultToCohort ::
  GQResult BS.ByteString ->
  Maybe ResponseHash ->
  SubscriptionMetadata ->
  CohortSnapshot 'LiveQuery ->
  -- | subscribers to which data has been pushed, subscribers which already
  -- have this data (this information is exposed by metrics reporting)
  IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort :: GQResult ByteString
-> Maybe ResponseHash
-> SubscriptionMetadata
-> CohortSnapshot 'LiveQuery
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort GQResult ByteString
result !Maybe ResponseHash
respHashM (SubscriptionMetadata DiffTime
dTime) CohortSnapshot 'LiveQuery
cohortSnapshot = do
  Maybe ResponseHash
prevRespHashM <- TVar (Maybe ResponseHash) -> IO (Maybe ResponseHash)
forall a. TVar a -> IO a
STM.readTVarIO TVar (Maybe ResponseHash)
respRef
  -- write to the current websockets if needed
  ([Subscriber]
subscribersToPush, [Subscriber]
subscribersToIgnore) <-
    if GQResult ByteString -> Bool
forall a. GQResult a -> Bool
isExecError GQResult ByteString
result Bool -> Bool -> Bool
|| Maybe ResponseHash
respHashM Maybe ResponseHash -> Maybe ResponseHash -> Bool
forall a. Eq a => a -> a -> Bool
/= Maybe ResponseHash
prevRespHashM
      then do
        String
String -> Maybe ResponseHash -> IO ()
forall a. String -> a -> IO ()
$assertNFHere Maybe ResponseHash
respHashM -- so we don't write thunks to mutable vars
        STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
          TVar (Maybe ResponseHash) -> Maybe ResponseHash -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar (Maybe ResponseHash)
respRef Maybe ResponseHash
respHashM
        ([Subscriber], [Subscriber]) -> IO ([Subscriber], [Subscriber])
forall (m :: * -> *) a. Monad m => a -> m a
return ([Subscriber]
newSinks [Subscriber] -> [Subscriber] -> [Subscriber]
forall a. Semigroup a => a -> a -> a
<> [Subscriber]
curSinks, [Subscriber]
forall a. Monoid a => a
mempty)
      else ([Subscriber], [Subscriber]) -> IO ([Subscriber], [Subscriber])
forall (m :: * -> *) a. Monad m => a -> m a
return ([Subscriber]
newSinks, [Subscriber]
curSinks)
  [Subscriber] -> IO ()
pushResultToSubscribers [Subscriber]
subscribersToPush
  ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (([SubscriberExecutionDetails], [SubscriberExecutionDetails])
 -> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails]))
-> ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall a b. (a -> b) -> a -> b
$
    ASetter
  ([Subscriber], [Subscriber])
  ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
  Subscriber
  SubscriberExecutionDetails
-> (Subscriber -> SubscriberExecutionDetails)
-> ([Subscriber], [Subscriber])
-> ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
over
      (([Subscriber] -> Identity [SubscriberExecutionDetails])
-> ([Subscriber], [Subscriber])
-> Identity
     ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall s t a b. Each s t a b => Traversal s t a b
each (([Subscriber] -> Identity [SubscriberExecutionDetails])
 -> ([Subscriber], [Subscriber])
 -> Identity
      ([SubscriberExecutionDetails], [SubscriberExecutionDetails]))
-> ((Subscriber -> Identity SubscriberExecutionDetails)
    -> [Subscriber] -> Identity [SubscriberExecutionDetails])
-> ASetter
     ([Subscriber], [Subscriber])
     ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
     Subscriber
     SubscriberExecutionDetails
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Subscriber -> Identity SubscriberExecutionDetails)
-> [Subscriber] -> Identity [SubscriberExecutionDetails]
forall s t a b. Each s t a b => Traversal s t a b
each)
      ( \Subscriber {Maybe OperationName
RequestId
SubscriberMetadata
SubscriberId
OnChange
_sOnChangeCallback :: Subscriber -> OnChange
_sOperationName :: Subscriber -> Maybe OperationName
_sRequestId :: Subscriber -> RequestId
_sMetadata :: Subscriber -> SubscriberMetadata
_sId :: Subscriber -> SubscriberId
_sOnChangeCallback :: OnChange
_sOperationName :: Maybe OperationName
_sRequestId :: RequestId
_sMetadata :: SubscriberMetadata
_sId :: SubscriberId
..} ->
          SubscriberId -> SubscriberMetadata -> SubscriberExecutionDetails
SubscriberExecutionDetails SubscriberId
_sId SubscriberMetadata
_sMetadata
      )
      ([Subscriber]
subscribersToPush, [Subscriber]
subscribersToIgnore)
  where
    C.CohortSnapshot CohortVariables
_ TVar (Maybe ResponseHash)
respRef [Subscriber]
curSinks [Subscriber]
newSinks = CohortSnapshot
CohortSnapshot 'LiveQuery
cohortSnapshot

    response :: Either GQExecError SubscriptionResponse
response = GQResult ByteString
result GQResult ByteString
-> (ByteString -> SubscriptionResponse)
-> Either GQExecError SubscriptionResponse
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> (ByteString -> DiffTime -> SubscriptionResponse
`SubscriptionResponse` DiffTime
dTime)
    pushResultToSubscribers :: [Subscriber] -> IO ()
pushResultToSubscribers =
      (Subscriber -> IO ()) -> [Subscriber] -> IO ()
forall (f :: * -> *) a b. Foldable f => (a -> IO b) -> f a -> IO ()
A.mapConcurrently_ ((Subscriber -> IO ()) -> [Subscriber] -> IO ())
-> (Subscriber -> IO ()) -> [Subscriber] -> IO ()
forall a b. (a -> b) -> a -> b
$ \Subscriber {Maybe OperationName
RequestId
SubscriberMetadata
SubscriberId
OnChange
_sOnChangeCallback :: OnChange
_sOperationName :: Maybe OperationName
_sRequestId :: RequestId
_sMetadata :: SubscriberMetadata
_sId :: SubscriberId
_sOnChangeCallback :: Subscriber -> OnChange
_sOperationName :: Subscriber -> Maybe OperationName
_sRequestId :: Subscriber -> RequestId
_sMetadata :: Subscriber -> SubscriberMetadata
_sId :: Subscriber -> SubscriberId
..} -> OnChange
_sOnChangeCallback Either GQExecError SubscriptionResponse
response

-- | Where the magic happens: the top-level action run periodically by each
-- active 'Poller'. This needs to be async exception safe.
pollLiveQuery ::
  forall b.
  BackendTransport b =>
  PollerId ->
  SubscriptionsOptions ->
  (SourceName, SourceConfig b) ->
  RoleName ->
  ParameterizedQueryHash ->
  MultiplexedQuery b ->
  CohortMap 'LiveQuery ->
  SubscriptionPostPollHook ->
  IO ()
pollLiveQuery :: PollerId
-> SubscriptionsOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap 'LiveQuery
-> SubscriptionPostPollHook
-> IO ()
pollLiveQuery PollerId
pollerId SubscriptionsOptions
lqOpts (SourceName
sourceName, SourceConfig b
sourceConfig) RoleName
roleName ParameterizedQueryHash
parameterizedQueryHash MultiplexedQuery b
query CohortMap 'LiveQuery
cohortMap SubscriptionPostPollHook
postPollHook = do
  (DiffTime
totalTime, (DiffTime
snapshotTime, [BatchExecutionDetails]
batchesDetails)) <- IO (DiffTime, [BatchExecutionDetails])
-> IO (DiffTime, (DiffTime, [BatchExecutionDetails]))
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO (DiffTime, [BatchExecutionDetails])
 -> IO (DiffTime, (DiffTime, [BatchExecutionDetails])))
-> IO (DiffTime, [BatchExecutionDetails])
-> IO (DiffTime, (DiffTime, [BatchExecutionDetails]))
forall a b. (a -> b) -> a -> b
$ do
    -- snapshot the current cohorts and split them into batches
    (DiffTime
snapshotTime, [(BatchId, [(CohortId, CohortSnapshot)])]
cohortBatches) <- IO [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO (DiffTime, [(BatchId, [(CohortId, CohortSnapshot)])])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO [(BatchId, [(CohortId, CohortSnapshot)])]
 -> IO (DiffTime, [(BatchId, [(CohortId, CohortSnapshot)])]))
-> IO [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO (DiffTime, [(BatchId, [(CohortId, CohortSnapshot)])])
forall a b. (a -> b) -> a -> b
$ do
      -- get a snapshot of all the cohorts
      -- this need not be done in a transaction
      [(CohortVariables, Cohort ())]
cohorts <- STM [(CohortVariables, Cohort ())]
-> IO [(CohortVariables, Cohort ())]
forall a. STM a -> IO a
STM.atomically (STM [(CohortVariables, Cohort ())]
 -> IO [(CohortVariables, Cohort ())])
-> STM [(CohortVariables, Cohort ())]
-> IO [(CohortVariables, Cohort ())]
forall a b. (a -> b) -> a -> b
$ TMap CohortVariables (Cohort ())
-> STM [(CohortVariables, Cohort ())]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap CohortVariables (Cohort ())
CohortMap 'LiveQuery
cohortMap
      [(CohortId, CohortSnapshot)]
cohortSnapshots <- ((CohortVariables, Cohort ()) -> IO (CohortId, CohortSnapshot))
-> [(CohortVariables, Cohort ())]
-> IO [(CohortId, CohortSnapshot)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (STM (CohortId, CohortSnapshot) -> IO (CohortId, CohortSnapshot)
forall a. STM a -> IO a
STM.atomically (STM (CohortId, CohortSnapshot) -> IO (CohortId, CohortSnapshot))
-> ((CohortVariables, Cohort ()) -> STM (CohortId, CohortSnapshot))
-> (CohortVariables, Cohort ())
-> IO (CohortId, CohortSnapshot)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortVariables, Cohort ()) -> STM (CohortId, CohortSnapshot)
getCohortSnapshot) [(CohortVariables, Cohort ())]
cohorts
      -- cohorts are broken down into batches specified by the batch size
      let cohortBatches :: [[(CohortId, CohortSnapshot)]]
cohortBatches = Int
-> [(CohortId, CohortSnapshot)] -> [[(CohortId, CohortSnapshot)]]
forall e. Int -> [e] -> [[e]]
chunksOf (NonNegativeInt -> Int
Numeric.getNonNegativeInt (BatchSize -> NonNegativeInt
unBatchSize BatchSize
batchSize)) [(CohortId, CohortSnapshot)]
cohortSnapshots
      -- associating every batch with their BatchId
      [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO [(BatchId, [(CohortId, CohortSnapshot)])]
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([(BatchId, [(CohortId, CohortSnapshot)])]
 -> IO [(BatchId, [(CohortId, CohortSnapshot)])])
-> [(BatchId, [(CohortId, CohortSnapshot)])]
-> IO [(BatchId, [(CohortId, CohortSnapshot)])]
forall a b. (a -> b) -> a -> b
$ [BatchId]
-> [[(CohortId, CohortSnapshot)]]
-> [(BatchId, [(CohortId, CohortSnapshot)])]
forall a b. [a] -> [b] -> [(a, b)]
zip (Int -> BatchId
BatchId (Int -> BatchId) -> [Int] -> [BatchId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Int
1 ..]) [[(CohortId, CohortSnapshot)]]
cohortBatches

    -- concurrently process each batch
    [BatchExecutionDetails]
batchesDetails <- [(BatchId, [(CohortId, CohortSnapshot)])]
-> ((BatchId, [(CohortId, CohortSnapshot)])
    -> IO BatchExecutionDetails)
-> IO [BatchExecutionDetails]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(BatchId, [(CohortId, CohortSnapshot)])]
cohortBatches (((BatchId, [(CohortId, CohortSnapshot)])
  -> IO BatchExecutionDetails)
 -> IO [BatchExecutionDetails])
-> ((BatchId, [(CohortId, CohortSnapshot)])
    -> IO BatchExecutionDetails)
-> IO [BatchExecutionDetails]
forall a b. (a -> b) -> a -> b
$ \(BatchId
batchId, [(CohortId, CohortSnapshot)]
cohorts) -> do
      (DiffTime
queryExecutionTime, Either QErr [(CohortId, ByteString)]
mxRes) <- SourceConfig b
-> MultiplexedQuery b
-> [(CohortId, CohortVariables)]
-> IO (DiffTime, Either QErr [(CohortId, ByteString)])
forall (b :: BackendType) (m :: * -> *).
(BackendTransport b, MonadIO m) =>
SourceConfig b
-> MultiplexedQuery b
-> [(CohortId, CohortVariables)]
-> m (DiffTime, Either QErr [(CohortId, ByteString)])
runDBSubscription @b SourceConfig b
sourceConfig MultiplexedQuery b
query ([(CohortId, CohortVariables)]
 -> IO (DiffTime, Either QErr [(CohortId, ByteString)]))
-> [(CohortId, CohortVariables)]
-> IO (DiffTime, Either QErr [(CohortId, ByteString)])
forall a b. (a -> b) -> a -> b
$ ASetter
  [(CohortId, CohortSnapshot)]
  [(CohortId, CohortVariables)]
  CohortSnapshot
  CohortVariables
-> (CohortSnapshot -> CohortVariables)
-> [(CohortId, CohortSnapshot)]
-> [(CohortId, CohortVariables)]
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
over (((CohortId, CohortSnapshot)
 -> Identity (CohortId, CohortVariables))
-> [(CohortId, CohortSnapshot)]
-> Identity [(CohortId, CohortVariables)]
forall s t a b. Each s t a b => Traversal s t a b
each (((CohortId, CohortSnapshot)
  -> Identity (CohortId, CohortVariables))
 -> [(CohortId, CohortSnapshot)]
 -> Identity [(CohortId, CohortVariables)])
-> ((CohortSnapshot -> Identity CohortVariables)
    -> (CohortId, CohortSnapshot)
    -> Identity (CohortId, CohortVariables))
-> ASetter
     [(CohortId, CohortSnapshot)]
     [(CohortId, CohortVariables)]
     CohortSnapshot
     CohortVariables
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortSnapshot -> Identity CohortVariables)
-> (CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables)
forall s t a b. Field2 s t a b => Lens s t a b
_2) CohortSnapshot -> CohortVariables
C._csVariables [(CohortId, CohortSnapshot)]
cohorts

      let lqMeta :: SubscriptionMetadata
lqMeta = DiffTime -> SubscriptionMetadata
SubscriptionMetadata (DiffTime -> SubscriptionMetadata)
-> DiffTime -> SubscriptionMetadata
forall a b. (a -> b) -> a -> b
$ DiffTime -> DiffTime
forall x y. (Duration x, Duration y) => x -> y
convertDuration DiffTime
queryExecutionTime
          operations :: [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
  CohortSnapshot)]
operations = [(CohortId, CohortSnapshot)]
-> Either QErr [(CohortId, ByteString)]
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
     CohortSnapshot)]
forall (m :: * -> *) k t.
(MonadError GQExecError m, Eq k, Hashable k) =>
[(k, t)]
-> Either QErr [(k, ByteString)]
-> [(m ByteString, k, Maybe (ResponseHash, Int), t)]
getCohortOperations [(CohortId, CohortSnapshot)]
cohorts Either QErr [(CohortId, ByteString)]
mxRes
          -- batch response size is the sum of the response sizes of the cohorts
          batchResponseSize :: Maybe Int
batchResponseSize =
            case Either QErr [(CohortId, ByteString)]
mxRes of
              Left QErr
_ -> Maybe Int
forall a. Maybe a
Nothing
              Right [(CohortId, ByteString)]
resp -> Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$ Sum Int -> Int
forall a. Sum a -> a
getSum (Sum Int -> Int) -> Sum Int -> Int
forall a b. (a -> b) -> a -> b
$ ((CohortId, ByteString) -> Sum Int)
-> [(CohortId, ByteString)] -> Sum Int
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap (Int -> Sum Int
forall a. a -> Sum a
Sum (Int -> Sum Int)
-> ((CohortId, ByteString) -> Int)
-> (CohortId, ByteString)
-> Sum Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int
BS.length (ByteString -> Int)
-> ((CohortId, ByteString) -> ByteString)
-> (CohortId, ByteString)
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortId, ByteString) -> ByteString
forall a b. (a, b) -> b
snd) [(CohortId, ByteString)]
resp
      (DiffTime
pushTime, [CohortExecutionDetails]
cohortsExecutionDetails) <- IO [CohortExecutionDetails]
-> IO (DiffTime, [CohortExecutionDetails])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO [CohortExecutionDetails]
 -> IO (DiffTime, [CohortExecutionDetails]))
-> IO [CohortExecutionDetails]
-> IO (DiffTime, [CohortExecutionDetails])
forall a b. (a -> b) -> a -> b
$
        [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
  CohortSnapshot)]
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
     CohortSnapshot)
    -> IO CohortExecutionDetails)
-> IO [CohortExecutionDetails]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
  CohortSnapshot)]
operations (((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
   CohortSnapshot)
  -> IO CohortExecutionDetails)
 -> IO [CohortExecutionDetails])
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
     CohortSnapshot)
    -> IO CohortExecutionDetails)
-> IO [CohortExecutionDetails]
forall a b. (a -> b) -> a -> b
$ \(GQResult ByteString
res, CohortId
cohortId, Maybe (ResponseHash, Int)
respData, CohortSnapshot
snapshot) -> do
          ([SubscriberExecutionDetails]
pushedSubscribers, [SubscriberExecutionDetails]
ignoredSubscribers) <-
            GQResult ByteString
-> Maybe ResponseHash
-> SubscriptionMetadata
-> CohortSnapshot 'LiveQuery
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort GQResult ByteString
res ((ResponseHash, Int) -> ResponseHash
forall a b. (a, b) -> a
fst ((ResponseHash, Int) -> ResponseHash)
-> Maybe (ResponseHash, Int) -> Maybe ResponseHash
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData) SubscriptionMetadata
lqMeta CohortSnapshot
CohortSnapshot 'LiveQuery
snapshot
          CohortExecutionDetails -> IO CohortExecutionDetails
forall (f :: * -> *) a. Applicative f => a -> f a
pure
            CohortExecutionDetails :: CohortId
-> CohortVariables
-> Maybe Int
-> [SubscriberExecutionDetails]
-> [SubscriberExecutionDetails]
-> BatchId
-> CohortExecutionDetails
CohortExecutionDetails
              { _cedCohortId :: CohortId
_cedCohortId = CohortId
cohortId,
                _cedVariables :: CohortVariables
_cedVariables = CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot,
                _cedPushedTo :: [SubscriberExecutionDetails]
_cedPushedTo = [SubscriberExecutionDetails]
pushedSubscribers,
                _cedIgnored :: [SubscriberExecutionDetails]
_cedIgnored = [SubscriberExecutionDetails]
ignoredSubscribers,
                _cedResponseSize :: Maybe Int
_cedResponseSize = (ResponseHash, Int) -> Int
forall a b. (a, b) -> b
snd ((ResponseHash, Int) -> Int)
-> Maybe (ResponseHash, Int) -> Maybe Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData,
                _cedBatchId :: BatchId
_cedBatchId = BatchId
batchId
              }
      BatchExecutionDetails -> IO BatchExecutionDetails
forall (f :: * -> *) a. Applicative f => a -> f a
pure (BatchExecutionDetails -> IO BatchExecutionDetails)
-> BatchExecutionDetails -> IO BatchExecutionDetails
forall a b. (a -> b) -> a -> b
$
        DiffTime
-> DiffTime
-> BatchId
-> [CohortExecutionDetails]
-> Maybe Int
-> BatchExecutionDetails
BatchExecutionDetails
          DiffTime
queryExecutionTime
          DiffTime
pushTime
          BatchId
batchId
          [CohortExecutionDetails]
cohortsExecutionDetails
          Maybe Int
batchResponseSize

    (DiffTime, [BatchExecutionDetails])
-> IO (DiffTime, [BatchExecutionDetails])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DiffTime
snapshotTime, [BatchExecutionDetails]
batchesDetails)

  let pollDetails :: PollDetails
pollDetails =
        PollDetails :: PollerId
-> Text
-> DiffTime
-> [BatchExecutionDetails]
-> DiffTime
-> SubscriptionsOptions
-> SourceName
-> RoleName
-> ParameterizedQueryHash
-> PollDetails
PollDetails
          { _pdPollerId :: PollerId
_pdPollerId = PollerId
pollerId,
            _pdGeneratedSql :: Text
_pdGeneratedSql = MultiplexedQuery b -> Text
forall a. ToTxt a => a -> Text
toTxt MultiplexedQuery b
query,
            _pdSnapshotTime :: DiffTime
_pdSnapshotTime = DiffTime
snapshotTime,
            _pdBatches :: [BatchExecutionDetails]
_pdBatches = [BatchExecutionDetails]
batchesDetails,
            _pdLiveQueryOptions :: SubscriptionsOptions
_pdLiveQueryOptions = SubscriptionsOptions
lqOpts,
            _pdTotalTime :: DiffTime
_pdTotalTime = DiffTime
totalTime,
            _pdSource :: SourceName
_pdSource = SourceName
sourceName,
            _pdRole :: RoleName
_pdRole = RoleName
roleName,
            _pdParameterizedQueryHash :: ParameterizedQueryHash
_pdParameterizedQueryHash = ParameterizedQueryHash
parameterizedQueryHash
          }
  SubscriptionPostPollHook
postPollHook PollDetails
pollDetails
  where
    SubscriptionsOptions BatchSize
batchSize RefetchInterval
_ = SubscriptionsOptions
lqOpts

    getCohortSnapshot :: (CohortVariables, Cohort ()) -> STM (CohortId, CohortSnapshot)
getCohortSnapshot (CohortVariables
cohortVars, Cohort ()
handlerC) = do
      let C.Cohort CohortId
resId TVar (Maybe ResponseHash)
respRef SubscriberMap
curOpsTV SubscriberMap
newOpsTV () = Cohort ()
handlerC
      [(SubscriberId, Subscriber)]
curOpsL <- SubscriberMap -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList SubscriberMap
curOpsTV
      [(SubscriberId, Subscriber)]
newOpsL <- SubscriberMap -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList SubscriberMap
newOpsTV
      [(SubscriberId, Subscriber)]
-> ((SubscriberId, Subscriber) -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(SubscriberId, Subscriber)]
newOpsL (((SubscriberId, Subscriber) -> STM ()) -> STM ())
-> ((SubscriberId, Subscriber) -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \(SubscriberId
k, Subscriber
action) -> Subscriber -> SubscriberId -> SubscriberMap -> STM ()
forall k v. (Eq k, Hashable k) => v -> k -> TMap k v -> STM ()
TMap.insert Subscriber
action SubscriberId
k SubscriberMap
curOpsTV
      SubscriberMap -> STM ()
forall k v. TMap k v -> STM ()
TMap.reset SubscriberMap
newOpsTV

      let cohortSnapshot :: CohortSnapshot
cohortSnapshot = CohortVariables
-> TVar (Maybe ResponseHash)
-> [Subscriber]
-> [Subscriber]
-> CohortSnapshot
C.CohortSnapshot CohortVariables
cohortVars TVar (Maybe ResponseHash)
respRef (((SubscriberId, Subscriber) -> Subscriber)
-> [(SubscriberId, Subscriber)] -> [Subscriber]
forall a b. (a -> b) -> [a] -> [b]
map (SubscriberId, Subscriber) -> Subscriber
forall a b. (a, b) -> b
snd [(SubscriberId, Subscriber)]
curOpsL) (((SubscriberId, Subscriber) -> Subscriber)
-> [(SubscriberId, Subscriber)] -> [Subscriber]
forall a b. (a -> b) -> [a] -> [b]
map (SubscriberId, Subscriber) -> Subscriber
forall a b. (a, b) -> b
snd [(SubscriberId, Subscriber)]
newOpsL)
      (CohortId, CohortSnapshot) -> STM (CohortId, CohortSnapshot)
forall (m :: * -> *) a. Monad m => a -> m a
return (CohortId
resId, CohortSnapshot
cohortSnapshot)

    getCohortOperations :: [(k, t)]
-> Either QErr [(k, ByteString)]
-> [(m ByteString, k, Maybe (ResponseHash, Int), t)]
getCohortOperations [(k, t)]
cohorts = \case
      Left QErr
e ->
        -- TODO: this is internal error
        let resp :: m ByteString
resp = GQExecError -> m ByteString
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (GQExecError -> m ByteString) -> GQExecError -> m ByteString
forall a b. (a -> b) -> a -> b
$ [Value] -> GQExecError
GQExecError [Bool -> QErr -> Value
encodeGQLErr Bool
False QErr
e]
         in [(m ByteString
resp, k
cohortId, Maybe (ResponseHash, Int)
forall a. Maybe a
Nothing, t
snapshot) | (k
cohortId, t
snapshot) <- [(k, t)]
cohorts]
      Right [(k, ByteString)]
responses -> do
        let cohortSnapshotMap :: HashMap k t
cohortSnapshotMap = [(k, t)] -> HashMap k t
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
Map.fromList [(k, t)]
cohorts
        (((k, ByteString)
  -> Maybe (m ByteString, k, Maybe (ResponseHash, Int), t))
 -> [(k, ByteString)]
 -> [(m ByteString, k, Maybe (ResponseHash, Int), t)])
-> [(k, ByteString)]
-> ((k, ByteString)
    -> Maybe (m ByteString, k, Maybe (ResponseHash, Int), t))
-> [(m ByteString, k, Maybe (ResponseHash, Int), t)]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((k, ByteString)
 -> Maybe (m ByteString, k, Maybe (ResponseHash, Int), t))
-> [(k, ByteString)]
-> [(m ByteString, k, Maybe (ResponseHash, Int), t)]
forall (f :: * -> *) a b.
Filterable f =>
(a -> Maybe b) -> f a -> f b
mapMaybe [(k, ByteString)]
responses (((k, ByteString)
  -> Maybe (m ByteString, k, Maybe (ResponseHash, Int), t))
 -> [(m ByteString, k, Maybe (ResponseHash, Int), t)])
-> ((k, ByteString)
    -> Maybe (m ByteString, k, Maybe (ResponseHash, Int), t))
-> [(m ByteString, k, Maybe (ResponseHash, Int), t)]
forall a b. (a -> b) -> a -> b
$ \(k
cohortId, ByteString
respBS) ->
          let respHash :: ResponseHash
respHash = ByteString -> ResponseHash
mkRespHash ByteString
respBS
              respSize :: Int
respSize = ByteString -> Int
BS.length ByteString
respBS
           in -- TODO: currently we ignore the cases when the cohortId from
              -- Postgres response is not present in the cohort map of this batch
              -- (this shouldn't happen but if it happens it means a logic error and
              -- we should log it)
              (ByteString -> m ByteString
forall (f :: * -> *) a. Applicative f => a -> f a
pure ByteString
respBS,k
cohortId,(ResponseHash, Int) -> Maybe (ResponseHash, Int)
forall a. a -> Maybe a
Just (ResponseHash
respHash, Int
respSize),)
                (t -> (m ByteString, k, Maybe (ResponseHash, Int), t))
-> Maybe t -> Maybe (m ByteString, k, Maybe (ResponseHash, Int), t)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> k -> HashMap k t -> Maybe t
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
Map.lookup k
cohortId HashMap k t
cohortSnapshotMap