{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}

module Hasura.GraphQL.Execute.Subscription.Poll.StreamingQuery
  ( -- * Pollers
    pollStreamingQuery,
  )
where

import Control.Concurrent.Async qualified as A
import Control.Concurrent.STM qualified as STM
import Control.Lens
import Data.Aeson.Ordered qualified as JO
import Data.ByteString qualified as BS
import Data.HashMap.Strict.Extended qualified as HashMap
import Data.HashSet qualified as Set
import Data.List.Split (chunksOf)
import Data.Monoid (Endo (..), Sum (..))
import Data.Text.Extended
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Execute.Subscription.Plan qualified as C
import Hasura.GraphQL.Execute.Subscription.Poll.Common hiding (Cohort (..), CohortMap, CohortSnapshot (..))
import Hasura.GraphQL.Execute.Subscription.Poll.Common qualified as C
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.Execute.Subscription.Types
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Logging (LogLevel (..))
import Hasura.Prelude
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.BackendTag (backendTag, reify)
import Hasura.RQL.Types.BackendType (BackendType (..), PostgresKind (Vanilla))
import Hasura.RQL.Types.Common (SourceName)
import Hasura.RQL.Types.Roles (RoleName)
import Hasura.RQL.Types.Subscription (SubscriptionType (..))
import Hasura.SQL.Value (TxtEncodedVal (..))
import Hasura.Server.Prometheus (PrometheusMetrics (..), SubscriptionMetrics (..), recordSubcriptionMetric, streamingSubscriptionLabel)
import Hasura.Server.Types (GranularPrometheusMetricsState (..))
import Language.GraphQL.Draft.Syntax qualified as G
import Refined (unrefine)
import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
import System.Metrics.Prometheus.HistogramVector qualified as HistogramVector
import Text.Shakespeare.Text (st)

{- Note [Streaming subscriptions rebuilding cohort map]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When the multiplexed query is polled, the cohort is snapshotted to get the
existing and newly added subscribers (if any); let's call these subscribers the
current poll subscribers.

Every cohort is associated with a cohort key and the cohort key is basically the
variables the cohort uses. The cohort variables contain multiple types of
variables: session variables, query variables, synthetic variables and
cursor variables. Out of all these, the cursor variable may change with every
poll. So, we rebuild the cohort map at the end of every poll, see Note
[Streaming subscription polling]. But rebuilding the cohort map is not
straightforward because there are race conditions that need to be taken care of.
Some of the race conditions which can happen while the current cohorts are being
processed:

1. A cohort is removed concurrently
2. A subscriber is removed concurrently
3. A new subscriber has started a subscription and it should be placed in the correct cohort

In the current code, the rebuilding of the cohort map goes as follows:

1. After snapshotting the cohorts, we build a cohort map out of those cohorts;
   in the code these are called "processedCohorts". It's important to note
   that these are not retrieved from the mutable "CohortMap", these are the
   snapshotted cohorts which were processed in the current poll. The reason we
   maintain the snapshotted cohorts is that they are later used while rebuilding
   the cohort map.

2. We create a processed cohort map which looks like HashMap CohortKey (Cohort
   'Streaming, CohortKey). The key of the map is the CohortKey which was
   associated with the cohort during the poll and the CohortKey in the value
   type is the cohort key after updating the cursor value. Note that both the
   values may or may not be equal.

3. We atomically get the list of the cohorts from the cohort map (mutable
   reference); let's call these the current cohorts. We then traverse over them.

   1. Lookup with the given cohort key into the processed cohort map

      a. If no cohort is found, it means that the cohort with the given cohort
         key has been added while we were polling. So, we keep this as it is.

      b. If a processed cohort is found:

         i. We have to see if any new subscribers have been added to the current
            cohort, this is calculated using the diff of existing subscribers in
            the current cohort and the existing cohort, if there are any then we
            create a new cohort which includes only the newly added subscribers
            and add the new cohort into the cohort map.

         ii. We only retain the subscribers found in the processed cohort which
             exist in the current cohort. We do this because it is possible that
             a subscriber has stopped their subscription in the time between
             the cohorts being snapshotted for processing and the time the
             cohort map is rebuilt.

         iii. Insert the processed cohort with the updated cohort key into the
              cohort map.
-}

{- Note [Streaming subscription polling]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Every cohort of a streaming subscription is associated with a mutable latest
cursor value reference, which contains the current value of the cursor.

After a poll, the mutable cursor value gets updated if its value is a non-null
one, null value means that there were no rows to get the min/max value from.

After this, the cohort map associated with the poller is also rebuilt, which
will make sure that the cohort map's key contains the latest cursor values. So,
any new subscriber will get added to an existing cohort if the new subscriber's
cohort key matches with any of the existing cohorts. We *need* to rebuild the
cohort map because, if we don't, a new subscriber may get added to a cohort
which has been streaming for a while now, and the new subscriber will then get
the responses according to the cursor value stored in the cohort, instead of
the initial value specified by the client. For example:

Client 1 started streaming a query at t1, say:

```
   subscription {
      log_stream(cursor: {initial_value: {created_at: "2020-01-01"}}, batch_size: 1000) {
         id
         level
         detail
     }
   }
```

Client 2 starts streaming at t2 with the same query (where t2 > t1). If the
cohort map is not rebuilt (to reflect the cursor value in the cohort key), then
client 2 and client 1 will both start getting the same responses, which is wrong
because client 2 should start streaming from the initial value it provided.

-}

{- Note [Lifecycle of a streaming subscription poller]
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  +-------------------------------------------+           +-----------------------------------------------------------------+
  | Execute multiplexed query in the database | ------->  | Parse the response, every row of response contains three things:|
  +-------------------------------------------+           |  1. Cohort ID                                                   |
             ^                                            |  2. Cohort's Response                                           |
             |                                            |  3. Cohort's latest cursor value                                |
             |                                            +-----------------------------------------------------------------+
             |                                                                            |
             |                                                                            |
             |                                                                            v
             |                                            +------------------------------------------------------------------------+
             |                                            | Processing of the response:                                            |
             |                                            | 1. Send the result to the subscribers                                  |
   +------------------------------------+                 | 2. Update the cursor value in the mutable reference                    |
   | Rebuild the cohort map             |                 |    of the snapshot so that the next poll uses this value               |
   +------------------------------------+        <------  +------------------------------------------------------------------------+

-}

-- | Combine the cursor values known from the previous poll with the cursor
-- values returned by the current poll.
--
-- The two maps are merged with 'HashMap.unionWith'. For a cursor variable that
-- is present in both maps, the value of the current poll is used, unless that
-- value is 'TENull', in which case the value of the previous poll is retained
-- (see Note [Streaming subscription polling]: a null cursor value means there
-- were no rows to compute the cursor from).
mergeOldAndNewCursorValues :: CursorVariableValues -> CursorVariableValues -> CursorVariableValues
mergeOldAndNewCursorValues (CursorVariableValues prevPollCursorValues) (CursorVariableValues currentPollCursorValues) =
  let -- Decides which value survives for a key that occurs in both maps.
      combineFn :: TxtEncodedVal -> TxtEncodedVal -> TxtEncodedVal
      combineFn previousVal currentVal =
        case currentVal of
          TENull -> previousVal -- When we get a null value from the DB, we retain the previous value
          TELit t -> TELit t
   in CursorVariableValues $ HashMap.unionWith combineFn prevPollCursorValues currentPollCursorValues

-- | Deliver the outcome of one poll to a single streaming cohort and report
-- which of its subscribers the response was addressed to.
--
-- The response hash stored in the cohort snapshot is compared with the hash of
-- the new response:
--
-- * If the result is an execution error, or the hash differs from the stored
--   one, the stored hash and the cohort's stream cursor variables are updated
--   in a single STM transaction, and the response is addressed to both the
--   newly added and the existing subscribers.
--
-- * Otherwise the response is addressed only to the newly added subscribers,
--   and the existing subscribers are reported as ignored.
--
-- Nothing is actually sent when the response equals the templated "empty
-- array" response for the root field (in either the Postgres or the Cockroach
-- format). The returned lists are computed the same way in that case.
pushResultToCohort ::
  -- | Response payload, or the execution error, to deliver
  GQResult BS.ByteString ->
  -- | Hash of the response, compared against the hash stored in the cohort
  -- snapshot to detect whether the response changed
  Maybe ResponseHash ->
  -- | Carries the 'DiffTime' that is attached to the response sent to
  -- subscribers
  SubscriptionMetadata ->
  -- | Cursor values written to the cohort's stream cursor variables when the
  -- response changed
  CursorVariableValues ->
  -- | Root field name
  G.Name ->
  -- | Snapshot of the cohort taken for this poll, and the cohort itself
  (CohortSnapshot 'Streaming, Cohort 'Streaming) ->
  -- | subscribers to which data has been pushed, subscribers which already
  -- have this data (this information is exposed by metrics reporting)
  IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort result !respHashM (SubscriptionMetadata dTime) cursorValues rootFieldName (cohortSnapshot, cohort) = do
  prevRespHashM <- STM.readTVarIO respRef
  -- write to the current websockets if needed
  (subscribersToPush, subscribersToIgnore) <-
    -- We send the response to all the subscribers only when the response changes.
    if isExecError result || respHashM /= prevRespHashM
      then do
        $assertNFHere respHashM -- so we don't write thunks to mutable vars
        STM.atomically $ do
          -- the new hash and the new cursor values are published together
          STM.writeTVar respRef respHashM
          STM.writeTVar (C._cStreamCursorVariables cohort) cursorValues
          return (newSinks <> curSinks, mempty)
      else -- when the response is unchanged, the response is only sent to the newly added subscribers
        return (newSinks, curSinks)
  pushResultToSubscribers subscribersToPush
  -- project every subscriber in both lists down to its id and metadata
  pure $
    over
      (each . each)
      ( \Subscriber {..} ->
          SubscriberExecutionDetails _sId _sMetadata
      )
      (subscribersToPush, subscribersToIgnore)
  where
    rootFieldNameText = G.unName rootFieldName
    -- we want to know if the DB response is an empty array and if it is, then we don't send anything
    -- to the client. We could do this by parsing the response and check for an empty list, but
    -- this will have performance impact when large responses are parsed. Instead, we compare
    -- the DB response to a templated empty array response.

    -- We are using templating instead of doing something like
    -- J.encode $ J.object [ rootFieldNameText J..= [] :: [J.Value] ]
    -- is because, unfortunately, the value returned by the above is
    -- {<rootFieldNameText>:[]} (notice the lack of spaces). So, instead
    -- we're templating according to the format postgres sends JSON responses.
    emptyRespBS = Right $ txtToBs [st|{"#{rootFieldNameText}" : []}|]
    -- Same as the above, but cockroach prefixes the responses with @\x1@ and does not
    -- have a space between the key and the colon.
    roachEmptyRespBS = Right $ "\x1" <> txtToBs [st|{"#{rootFieldNameText}": []}|]

    -- True when the result is byte-for-byte one of the templated empty responses
    isResponseEmpty =
      result == emptyRespBS
        || result == roachEmptyRespBS

    -- the snapshot's response-hash reference, its existing subscribers and
    -- its newly added subscribers
    C.CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot

    -- the value handed to every subscriber callback
    response = result <&> \payload -> SubscriptionResponse payload dTime

    -- invokes the on-change callback of every given subscriber concurrently,
    -- unless the response is empty
    pushResultToSubscribers subscribers =
      unless isResponseEmpty
        $ flip A.mapConcurrently_ subscribers
        $ \Subscriber {..} -> _sOnChangeCallback response

-- | A single iteration of the streaming query polling loop. Invocations on the
-- same mutable objects may race.
pollStreamingQuery ::
  forall b.
  (BackendTransport b) =>
  PollerId ->
  STM.TVar PollerResponseState ->
  SubscriptionsOptions ->
  (SourceName, SourceConfig b) ->
  RoleName ->
  ParameterizedQueryHash ->
  MultiplexedQuery b ->
  CohortMap 'Streaming ->
  G.Name ->
  SubscriptionPostPollHook ->
  Maybe (IO ()) -> -- Optional IO action to make this function (pollStreamingQuery) testable
  PrometheusMetrics ->
  IO GranularPrometheusMetricsState ->
  TMap.TMap (Maybe OperationName) Int ->
  ResolvedConnectionTemplate b ->
  Maybe (Endo JO.Value) ->
  IO ()
pollStreamingQuery :: forall (b :: BackendType).
BackendTransport b =>
PollerId
-> TVar PollerResponseState
-> SubscriptionsOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap 'Streaming
-> Name
-> SubscriptionPostPollHook
-> Maybe (IO ())
-> PrometheusMetrics
-> IO GranularPrometheusMetricsState
-> TMap (Maybe OperationName) Int
-> ResolvedConnectionTemplate b
-> Maybe (Endo Value)
-> IO ()
pollStreamingQuery PollerId
pollerId TVar PollerResponseState
pollerResponseState SubscriptionsOptions
streamingQueryOpts (SourceName
sourceName, SourceConfig b
sourceConfig) RoleName
roleName ParameterizedQueryHash
parameterizedQueryHash MultiplexedQuery b
query CohortMap 'Streaming
cohortMap Name
rootFieldName SubscriptionPostPollHook
postPollHook Maybe (IO ())
testActionMaybe PrometheusMetrics
prometheusMetrics IO GranularPrometheusMetricsState
granularPrometheusMetricsState TMap (Maybe OperationName) Int
operationNames' ResolvedConnectionTemplate b
resolvedConnectionTemplate Maybe (Endo Value)
modifier = do
  HashMap (Maybe OperationName) Int
operationNames <- STM (HashMap (Maybe OperationName) Int)
-> IO (HashMap (Maybe OperationName) Int)
forall a. STM a -> IO a
STM.atomically (STM (HashMap (Maybe OperationName) Int)
 -> IO (HashMap (Maybe OperationName) Int))
-> STM (HashMap (Maybe OperationName) Int)
-> IO (HashMap (Maybe OperationName) Int)
forall a b. (a -> b) -> a -> b
$ TMap (Maybe OperationName) Int
-> STM (HashMap (Maybe OperationName) Int)
forall k v. TMap k v -> STM (HashMap k v)
TMap.getMap TMap (Maybe OperationName) Int
operationNames'
  (DiffTime
totalTime, (DiffTime
snapshotTime, ([BatchExecutionDetails]
batchesDetails, [[(CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber]))]]
processedCohorts, [Maybe PollDetailsError]
maybeErrors))) <- IO
  (DiffTime,
   ([BatchExecutionDetails],
    [[(CohortVariables,
       (Cohort (TVar CursorVariableValues), CohortVariables,
        [Subscriber]))]],
    [Maybe PollDetailsError]))
-> IO
     (DiffTime,
      (DiffTime,
       ([BatchExecutionDetails],
        [[(CohortVariables,
           (Cohort (TVar CursorVariableValues), CohortVariables,
            [Subscriber]))]],
        [Maybe PollDetailsError])))
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO
   (DiffTime,
    ([BatchExecutionDetails],
     [[(CohortVariables,
        (Cohort (TVar CursorVariableValues), CohortVariables,
         [Subscriber]))]],
     [Maybe PollDetailsError]))
 -> IO
      (DiffTime,
       (DiffTime,
        ([BatchExecutionDetails],
         [[(CohortVariables,
            (Cohort (TVar CursorVariableValues), CohortVariables,
             [Subscriber]))]],
         [Maybe PollDetailsError]))))
-> IO
     (DiffTime,
      ([BatchExecutionDetails],
       [[(CohortVariables,
          (Cohort (TVar CursorVariableValues), CohortVariables,
           [Subscriber]))]],
       [Maybe PollDetailsError]))
-> IO
     (DiffTime,
      (DiffTime,
       ([BatchExecutionDetails],
        [[(CohortVariables,
           (Cohort (TVar CursorVariableValues), CohortVariables,
            [Subscriber]))]],
        [Maybe PollDetailsError])))
forall a b. (a -> b) -> a -> b
$ do
    -- snapshot the current cohorts and split them into batches
    -- This STM transaction is a read only transaction i.e. it doesn't mutate any state
    (DiffTime
snapshotTime, [(BatchId,
  [(CohortId,
    (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
cohortBatches) <- IO
  [(BatchId,
    [(CohortId,
      (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
     (DiffTime,
      [(BatchId,
        [(CohortId,
          (CohortSnapshot, Cohort (TVar CursorVariableValues)))])])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO
   [(BatchId,
     [(CohortId,
       (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
 -> IO
      (DiffTime,
       [(BatchId,
         [(CohortId,
           (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]))
-> IO
     [(BatchId,
       [(CohortId,
         (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
     (DiffTime,
      [(BatchId,
        [(CohortId,
          (CohortSnapshot, Cohort (TVar CursorVariableValues)))])])
forall a b. (a -> b) -> a -> b
$ do
      -- get a snapshot of all the cohorts
      -- this need not be done in a transaction
      [(CohortVariables, Cohort (TVar CursorVariableValues))]
cohorts <- STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> IO [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall a. STM a -> IO a
STM.atomically (STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
 -> IO [(CohortVariables, Cohort (TVar CursorVariableValues))])
-> STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> IO [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall a b. (a -> b) -> a -> b
$ TMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap CohortVariables (Cohort (TVar CursorVariableValues))
CohortMap 'Streaming
cohortMap
      [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohortSnapshots <- ((CohortVariables, Cohort (TVar CursorVariableValues))
 -> IO
      (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> IO
     [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (STM
  (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> IO
     (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
forall a. STM a -> IO a
STM.atomically (STM
   (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
 -> IO
      (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> ((CohortVariables, Cohort (TVar CursorVariableValues))
    -> STM
         (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> (CohortVariables, Cohort (TVar CursorVariableValues))
-> IO
     (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortVariables, Cohort (TVar CursorVariableValues))
-> STM
     (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
forall {streamCursorVars}.
(CohortVariables, Cohort streamCursorVars)
-> STM (CohortId, (CohortSnapshot, Cohort streamCursorVars))
getCohortSnapshot) [(CohortVariables, Cohort (TVar CursorVariableValues))]
cohorts
      -- cohorts are broken down into batches specified by the batch size
      let cohortBatches :: [[(CohortId,
   (CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
cohortBatches = Int
-> [(CohortId,
     (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> [[(CohortId,
      (CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
forall e. Int -> [e] -> [[e]]
chunksOf (Refined NonNegative Int -> Int
forall {k} (p :: k) x. Refined p x -> x
unrefine (BatchSize -> Refined NonNegative Int
unBatchSize BatchSize
batchSize)) [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohortSnapshots
      -- associating every batch with their BatchId
      [(BatchId,
  [(CohortId,
    (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
     [(BatchId,
       [(CohortId,
         (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([(BatchId,
   [(CohortId,
     (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
 -> IO
      [(BatchId,
        [(CohortId,
          (CohortSnapshot, Cohort (TVar CursorVariableValues)))])])
-> [(BatchId,
     [(CohortId,
       (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
     [(BatchId,
       [(CohortId,
         (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
forall a b. (a -> b) -> a -> b
$ [BatchId]
-> [[(CohortId,
      (CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
-> [(BatchId,
     [(CohortId,
       (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
forall a b. [a] -> [b] -> [(a, b)]
zip (Int -> BatchId
BatchId (Int -> BatchId) -> [Int] -> [BatchId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Int
1 ..]) [[(CohortId,
   (CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
cohortBatches

    Maybe (IO ()) -> (IO () -> IO ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe (IO ())
testActionMaybe IO () -> IO ()
forall a. a -> a
id -- IO action intended to run after the cohorts have been snapshotted

    -- concurrently process each batch and also get the processed cohort with the new updated cohort key
    [(BatchExecutionDetails,
  [(CohortVariables,
    (Cohort (TVar CursorVariableValues), CohortVariables,
     [Subscriber]))],
  Maybe PollDetailsError)]
batchesDetailsAndProcessedCohortsWithMaybeError <- [(BatchId,
  [(CohortId,
    (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> ((BatchId,
     [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))])
    -> IO
         (BatchExecutionDetails,
          [(CohortVariables,
            (Cohort (TVar CursorVariableValues), CohortVariables,
             [Subscriber]))],
          Maybe PollDetailsError))
-> IO
     [(BatchExecutionDetails,
       [(CohortVariables,
         (Cohort (TVar CursorVariableValues), CohortVariables,
          [Subscriber]))],
       Maybe PollDetailsError)]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(BatchId,
  [(CohortId,
    (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
cohortBatches (((BatchId,
   [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))])
  -> IO
       (BatchExecutionDetails,
        [(CohortVariables,
          (Cohort (TVar CursorVariableValues), CohortVariables,
           [Subscriber]))],
        Maybe PollDetailsError))
 -> IO
      [(BatchExecutionDetails,
        [(CohortVariables,
          (Cohort (TVar CursorVariableValues), CohortVariables,
           [Subscriber]))],
        Maybe PollDetailsError)])
-> ((BatchId,
     [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))])
    -> IO
         (BatchExecutionDetails,
          [(CohortVariables,
            (Cohort (TVar CursorVariableValues), CohortVariables,
             [Subscriber]))],
          Maybe PollDetailsError))
-> IO
     [(BatchExecutionDetails,
       [(CohortVariables,
         (Cohort (TVar CursorVariableValues), CohortVariables,
          [Subscriber]))],
       Maybe PollDetailsError)]
forall a b. (a -> b) -> a -> b
$ \(BatchId
batchId, [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohorts) -> do
      (DiffTime
queryExecutionTime, Either QErr [(CohortId, ByteString, CursorVariableValues)]
mxRes) <-
        forall (b :: BackendType) (m :: * -> *).
(BackendTransport b, MonadIO m, MonadBaseControl IO m) =>
SourceConfig b
-> MultiplexedQuery b
-> [(CohortId, CohortVariables)]
-> ResolvedConnectionTemplate b
-> m (DiffTime,
      Either QErr [(CohortId, ByteString, CursorVariableValues)])
runDBStreamingSubscription @b
          SourceConfig b
sourceConfig
          MultiplexedQuery b
query
          (ASetter
  [(CohortId, CohortSnapshot)]
  [(CohortId, CohortVariables)]
  CohortSnapshot
  CohortVariables
-> (CohortSnapshot -> CohortVariables)
-> [(CohortId, CohortSnapshot)]
-> [(CohortId, CohortVariables)]
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
over (((CohortId, CohortSnapshot)
 -> Identity (CohortId, CohortVariables))
-> [(CohortId, CohortSnapshot)]
-> Identity [(CohortId, CohortVariables)]
forall s t a b. Each s t a b => Traversal s t a b
Traversal
  [(CohortId, CohortSnapshot)]
  [(CohortId, CohortVariables)]
  (CohortId, CohortSnapshot)
  (CohortId, CohortVariables)
each (((CohortId, CohortSnapshot)
  -> Identity (CohortId, CohortVariables))
 -> [(CohortId, CohortSnapshot)]
 -> Identity [(CohortId, CohortVariables)])
-> ((CohortSnapshot -> Identity CohortVariables)
    -> (CohortId, CohortSnapshot)
    -> Identity (CohortId, CohortVariables))
-> ASetter
     [(CohortId, CohortSnapshot)]
     [(CohortId, CohortVariables)]
     CohortSnapshot
     CohortVariables
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortSnapshot -> Identity CohortVariables)
-> (CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables)
forall s t a b. Field2 s t a b => Lens s t a b
Lens
  (CohortId, CohortSnapshot)
  (CohortId, CohortVariables)
  CohortSnapshot
  CohortVariables
_2) CohortSnapshot -> CohortVariables
C._csVariables ([(CohortId, CohortSnapshot)] -> [(CohortId, CohortVariables)])
-> [(CohortId, CohortSnapshot)] -> [(CohortId, CohortVariables)]
forall a b. (a -> b) -> a -> b
$ ((CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
 -> (CohortId, CohortSnapshot))
-> [(CohortId,
     (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> [(CohortId, CohortSnapshot)]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (((CohortSnapshot, Cohort (TVar CursorVariableValues))
 -> CohortSnapshot)
-> (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> (CohortId, CohortSnapshot)
forall a b. (a -> b) -> (CohortId, a) -> (CohortId, b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (CohortSnapshot, Cohort (TVar CursorVariableValues))
-> CohortSnapshot
forall a b. (a, b) -> a
fst) [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohorts)
          ResolvedConnectionTemplate b
resolvedConnectionTemplate
      let dbExecTimeMetric :: HistogramVector SubscriptionLabel
dbExecTimeMetric = SubscriptionMetrics -> HistogramVector SubscriptionLabel
submDBExecTotalTime (SubscriptionMetrics -> HistogramVector SubscriptionLabel)
-> SubscriptionMetrics -> HistogramVector SubscriptionLabel
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics (PrometheusMetrics -> SubscriptionMetrics)
-> PrometheusMetrics -> SubscriptionMetrics
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics
prometheusMetrics
      IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> m ()
recordSubcriptionMetric
        IO GranularPrometheusMetricsState
granularPrometheusMetricsState
        Bool
True
        HashMap (Maybe OperationName) Int
operationNames
        ParameterizedQueryHash
parameterizedQueryHash
        SubscriptionKindLabel
streamingSubscriptionLabel
        ((SubscriptionLabel -> Double -> IO ())
-> Double -> SubscriptionLabel -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip (HistogramVector SubscriptionLabel
-> SubscriptionLabel -> Double -> IO ()
forall label.
Ord label =>
HistogramVector label -> label -> Double -> IO ()
HistogramVector.observe HistogramVector SubscriptionLabel
dbExecTimeMetric) (DiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac DiffTime
queryExecutionTime))

      PollerResponseState
previousPollerResponseState <- TVar PollerResponseState -> IO PollerResponseState
forall a. TVar a -> IO a
STM.readTVarIO TVar PollerResponseState
pollerResponseState

      Maybe PollDetailsError
maybeError <- case Either QErr [(CohortId, ByteString, CursorVariableValues)]
mxRes of
        Left QErr
err -> do
          Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (PollerResponseState
previousPollerResponseState PollerResponseState -> PollerResponseState -> Bool
forall a. Eq a => a -> a -> Bool
== PollerResponseState
PRSSuccess) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            Gauge -> IO ()
Prometheus.Gauge.inc (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ SubscriptionMetrics -> Gauge
submActiveStreamingPollersInError (SubscriptionMetrics -> Gauge) -> SubscriptionMetrics -> Gauge
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics PrometheusMetrics
prometheusMetrics
            STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar PollerResponseState -> PollerResponseState -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar PollerResponseState
pollerResponseState PollerResponseState
PRSError
          let pollDetailsError :: PollDetailsError
pollDetailsError =
                PollDetailsError
                  { _pdeBatchId :: BatchId
_pdeBatchId = BatchId
batchId,
                    _pdeErrorDetails :: QErr
_pdeErrorDetails = QErr
err
                  }
          Maybe PollDetailsError -> IO (Maybe PollDetailsError)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe PollDetailsError -> IO (Maybe PollDetailsError))
-> Maybe PollDetailsError -> IO (Maybe PollDetailsError)
forall a b. (a -> b) -> a -> b
$ PollDetailsError -> Maybe PollDetailsError
forall a. a -> Maybe a
Just PollDetailsError
pollDetailsError
        Right [(CohortId, ByteString, CursorVariableValues)]
_ -> do
          Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (PollerResponseState
previousPollerResponseState PollerResponseState -> PollerResponseState -> Bool
forall a. Eq a => a -> a -> Bool
== PollerResponseState
PRSError) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            Gauge -> IO ()
Prometheus.Gauge.dec (Gauge -> IO ()) -> Gauge -> IO ()
forall a b. (a -> b) -> a -> b
$ SubscriptionMetrics -> Gauge
submActiveStreamingPollersInError (SubscriptionMetrics -> Gauge) -> SubscriptionMetrics -> Gauge
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics PrometheusMetrics
prometheusMetrics
            STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar PollerResponseState -> PollerResponseState -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar PollerResponseState
pollerResponseState PollerResponseState
PRSSuccess
          Maybe PollDetailsError -> IO (Maybe PollDetailsError)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe PollDetailsError
forall a. Maybe a
Nothing

      let subscriptionMeta :: SubscriptionMetadata
subscriptionMeta = DiffTime -> SubscriptionMetadata
SubscriptionMetadata (DiffTime -> SubscriptionMetadata)
-> DiffTime -> SubscriptionMetadata
forall a b. (a -> b) -> a -> b
$ DiffTime -> DiffTime
forall x y. (Duration x, Duration y) => x -> y
convertDuration DiffTime
queryExecutionTime
          operations :: [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
  Maybe CursorVariableValues,
  (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
operations = [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> Either QErr [(CohortId, ByteString, CursorVariableValues)]
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
     Maybe CursorVariableValues,
     (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
getCohortOperations [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohorts Either QErr [(CohortId, ByteString, CursorVariableValues)]
mxRes
          -- batch response size is the sum of the response sizes of the cohorts
          batchResponseSize :: Maybe Int
batchResponseSize =
            case Either QErr [(CohortId, ByteString, CursorVariableValues)]
mxRes of
              Left QErr
_ -> Maybe Int
forall a. Maybe a
Nothing
              Right [(CohortId, ByteString, CursorVariableValues)]
resp -> Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$ Sum Int -> Int
forall a. Sum a -> a
getSum (Sum Int -> Int) -> Sum Int -> Int
forall a b. (a -> b) -> a -> b
$ ((CohortId, ByteString, CursorVariableValues) -> Sum Int)
-> [(CohortId, ByteString, CursorVariableValues)] -> Sum Int
forall m a. Monoid m => (a -> m) -> [a] -> m
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap ((\(CohortId
_, ByteString
sqlResp, CursorVariableValues
_) -> Int -> Sum Int
forall a. a -> Sum a
Sum (Int -> Sum Int) -> (ByteString -> Int) -> ByteString -> Sum Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int
BS.length (ByteString -> Sum Int) -> ByteString -> Sum Int
forall a b. (a -> b) -> a -> b
$ ByteString
sqlResp)) [(CohortId, ByteString, CursorVariableValues)]
resp
      (DiffTime
pushTime, [(CohortExecutionDetails,
  (CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber])))]
cohortsExecutionDetails) <- IO
  [(CohortExecutionDetails,
    (CohortVariables,
     (Cohort (TVar CursorVariableValues), CohortVariables,
      [Subscriber])))]
-> IO
     (DiffTime,
      [(CohortExecutionDetails,
        (CohortVariables,
         (Cohort (TVar CursorVariableValues), CohortVariables,
          [Subscriber])))])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime
        (IO
   [(CohortExecutionDetails,
     (CohortVariables,
      (Cohort (TVar CursorVariableValues), CohortVariables,
       [Subscriber])))]
 -> IO
      (DiffTime,
       [(CohortExecutionDetails,
         (CohortVariables,
          (Cohort (TVar CursorVariableValues), CohortVariables,
           [Subscriber])))]))
-> IO
     [(CohortExecutionDetails,
       (CohortVariables,
        (Cohort (TVar CursorVariableValues), CohortVariables,
         [Subscriber])))]
-> IO
     (DiffTime,
      [(CohortExecutionDetails,
        (CohortVariables,
         (Cohort (TVar CursorVariableValues), CohortVariables,
          [Subscriber])))])
forall a b. (a -> b) -> a -> b
$ [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
  Maybe CursorVariableValues,
  (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
     Maybe CursorVariableValues,
     (CohortSnapshot, Cohort (TVar CursorVariableValues)))
    -> IO
         (CohortExecutionDetails,
          (CohortVariables,
           (Cohort (TVar CursorVariableValues), CohortVariables,
            [Subscriber]))))
-> IO
     [(CohortExecutionDetails,
       (CohortVariables,
        (Cohort (TVar CursorVariableValues), CohortVariables,
         [Subscriber])))]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
  Maybe CursorVariableValues,
  (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
operations
        (((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
   Maybe CursorVariableValues,
   (CohortSnapshot, Cohort (TVar CursorVariableValues)))
  -> IO
       (CohortExecutionDetails,
        (CohortVariables,
         (Cohort (TVar CursorVariableValues), CohortVariables,
          [Subscriber]))))
 -> IO
      [(CohortExecutionDetails,
        (CohortVariables,
         (Cohort (TVar CursorVariableValues), CohortVariables,
          [Subscriber])))])
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
     Maybe CursorVariableValues,
     (CohortSnapshot, Cohort (TVar CursorVariableValues)))
    -> IO
         (CohortExecutionDetails,
          (CohortVariables,
           (Cohort (TVar CursorVariableValues), CohortVariables,
            [Subscriber]))))
-> IO
     [(CohortExecutionDetails,
       (CohortVariables,
        (Cohort (TVar CursorVariableValues), CohortVariables,
         [Subscriber])))]
forall a b. (a -> b) -> a -> b
$ \(GQResult ByteString
res, CohortId
cohortId, Maybe (ResponseHash, Int)
respData, Maybe CursorVariableValues
latestCursorValueMaybe, (CohortSnapshot
snapshot, Cohort (TVar CursorVariableValues)
cohort)) -> do
          let latestCursorValue :: CursorVariableValues
latestCursorValue@(CursorVariableValues HashMap Name TxtEncodedVal
updatedCursorVarVal) =
                let prevCursorVariableValue :: CursorVariableValues
prevCursorVariableValue = HashMap Name TxtEncodedVal -> CursorVariableValues
CursorVariableValues (HashMap Name TxtEncodedVal -> CursorVariableValues)
-> HashMap Name TxtEncodedVal -> CursorVariableValues
forall a b. (a -> b) -> a -> b
$ ValidatedVariables (HashMap Name) -> HashMap Name TxtEncodedVal
forall (f :: * -> *). ValidatedVariables f -> f TxtEncodedVal
C._unValidatedVariables (ValidatedVariables (HashMap Name) -> HashMap Name TxtEncodedVal)
-> ValidatedVariables (HashMap Name) -> HashMap Name TxtEncodedVal
forall a b. (a -> b) -> a -> b
$ CohortVariables -> ValidatedVariables (HashMap Name)
C._cvCursorVariables (CohortVariables -> ValidatedVariables (HashMap Name))
-> CohortVariables -> ValidatedVariables (HashMap Name)
forall a b. (a -> b) -> a -> b
$ CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot
                 in case Maybe CursorVariableValues
latestCursorValueMaybe of
                      Maybe CursorVariableValues
Nothing -> CursorVariableValues
prevCursorVariableValue -- Nothing indicates there was an error when the query was polled
                      Just CursorVariableValues
currentPollCursorValue -> CursorVariableValues
-> CursorVariableValues -> CursorVariableValues
mergeOldAndNewCursorValues CursorVariableValues
prevCursorVariableValue CursorVariableValues
currentPollCursorValue
          ([SubscriberExecutionDetails]
pushedSubscribers, [SubscriberExecutionDetails]
ignoredSubscribers) <-
            -- Push the result to the subscribers present in the cohorts
            GQResult ByteString
-> Maybe ResponseHash
-> SubscriptionMetadata
-> CursorVariableValues
-> Name
-> (CohortSnapshot 'Streaming, Cohort 'Streaming)
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort GQResult ByteString
res ((ResponseHash, Int) -> ResponseHash
forall a b. (a, b) -> a
fst ((ResponseHash, Int) -> ResponseHash)
-> Maybe (ResponseHash, Int) -> Maybe ResponseHash
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData) SubscriptionMetadata
subscriptionMeta CursorVariableValues
latestCursorValue Name
rootFieldName (CohortSnapshot
CohortSnapshot 'Streaming
snapshot, Cohort (TVar CursorVariableValues)
Cohort 'Streaming
cohort)
          let currentCohortKey :: CohortVariables
currentCohortKey = CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot
              updatedCohortKey :: CohortVariables
updatedCohortKey = ValidatedVariables (HashMap Name)
-> CohortVariables -> CohortVariables
modifyCursorCohortVariables (HashMap Name TxtEncodedVal -> ValidatedVariables (HashMap Name)
forall (f :: * -> *). f TxtEncodedVal -> ValidatedVariables f
mkUnsafeValidateVariables HashMap Name TxtEncodedVal
updatedCursorVarVal) (CohortVariables -> CohortVariables)
-> CohortVariables -> CohortVariables
forall a b. (a -> b) -> a -> b
$ CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot
              snapshottedNewSubs :: [Subscriber]
snapshottedNewSubs = CohortSnapshot -> [Subscriber]
C._csNewSubscribers CohortSnapshot
snapshot
              cohortExecutionDetails :: CohortExecutionDetails
cohortExecutionDetails =
                CohortExecutionDetails
                  { _cedCohortId :: CohortId
_cedCohortId = CohortId
cohortId,
                    _cedVariables :: CohortVariables
_cedVariables = CohortVariables
currentCohortKey,
                    _cedPushedTo :: [SubscriberExecutionDetails]
_cedPushedTo = [SubscriberExecutionDetails]
pushedSubscribers,
                    _cedIgnored :: [SubscriberExecutionDetails]
_cedIgnored = [SubscriberExecutionDetails]
ignoredSubscribers,
                    _cedResponseSize :: Maybe Int
_cedResponseSize = (ResponseHash, Int) -> Int
forall a b. (a, b) -> b
snd ((ResponseHash, Int) -> Int)
-> Maybe (ResponseHash, Int) -> Maybe Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData,
                    _cedBatchId :: BatchId
_cedBatchId = BatchId
batchId
                  }
          (CohortExecutionDetails,
 (CohortVariables,
  (Cohort (TVar CursorVariableValues), CohortVariables,
   [Subscriber])))
-> IO
     (CohortExecutionDetails,
      (CohortVariables,
       (Cohort (TVar CursorVariableValues), CohortVariables,
        [Subscriber])))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (CohortExecutionDetails
cohortExecutionDetails, (CohortVariables
currentCohortKey, (Cohort (TVar CursorVariableValues)
cohort, CohortVariables
updatedCohortKey, [Subscriber]
snapshottedNewSubs)))
      let processedCohortBatch :: [(CohortVariables,
  (Cohort (TVar CursorVariableValues), CohortVariables,
   [Subscriber]))]
processedCohortBatch = (CohortExecutionDetails,
 (CohortVariables,
  (Cohort (TVar CursorVariableValues), CohortVariables,
   [Subscriber])))
-> (CohortVariables,
    (Cohort (TVar CursorVariableValues), CohortVariables,
     [Subscriber]))
forall a b. (a, b) -> b
snd ((CohortExecutionDetails,
  (CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber])))
 -> (CohortVariables,
     (Cohort (TVar CursorVariableValues), CohortVariables,
      [Subscriber])))
-> [(CohortExecutionDetails,
     (CohortVariables,
      (Cohort (TVar CursorVariableValues), CohortVariables,
       [Subscriber])))]
-> [(CohortVariables,
     (Cohort (TVar CursorVariableValues), CohortVariables,
      [Subscriber]))]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(CohortExecutionDetails,
  (CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber])))]
cohortsExecutionDetails

          -- Note: We want to keep the '_bedPgExecutionTime' field for backwards
          -- compatibility reason, which will be 'Nothing' for non-PG backends.
          -- See https://hasurahq.atlassian.net/browse/GS-329
          pgExecutionTime :: Maybe DiffTime
pgExecutionTime = case BackendTag b -> BackendType
forall (b :: BackendType). BackendTag b -> BackendType
reify (forall (b :: BackendType). HasTag b => BackendTag b
backendTag @b) of
            Postgres PostgresKind
Vanilla -> DiffTime -> Maybe DiffTime
forall a. a -> Maybe a
Just DiffTime
queryExecutionTime
            BackendType
_ -> Maybe DiffTime
forall a. Maybe a
Nothing

          batchExecDetails :: BatchExecutionDetails
batchExecDetails =
            Maybe DiffTime
-> DiffTime
-> DiffTime
-> BatchId
-> [CohortExecutionDetails]
-> Maybe Int
-> BatchExecutionDetails
BatchExecutionDetails
              Maybe DiffTime
pgExecutionTime
              DiffTime
queryExecutionTime
              DiffTime
pushTime
              BatchId
batchId
              ((CohortExecutionDetails,
 (CohortVariables,
  (Cohort (TVar CursorVariableValues), CohortVariables,
   [Subscriber])))
-> CohortExecutionDetails
forall a b. (a, b) -> a
fst ((CohortExecutionDetails,
  (CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber])))
 -> CohortExecutionDetails)
-> [(CohortExecutionDetails,
     (CohortVariables,
      (Cohort (TVar CursorVariableValues), CohortVariables,
       [Subscriber])))]
-> [CohortExecutionDetails]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(CohortExecutionDetails,
  (CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber])))]
cohortsExecutionDetails)
              Maybe Int
batchResponseSize
      (BatchExecutionDetails,
 [(CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber]))],
 Maybe PollDetailsError)
-> IO
     (BatchExecutionDetails,
      [(CohortVariables,
        (Cohort (TVar CursorVariableValues), CohortVariables,
         [Subscriber]))],
      Maybe PollDetailsError)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((BatchExecutionDetails,
  [(CohortVariables,
    (Cohort (TVar CursorVariableValues), CohortVariables,
     [Subscriber]))],
  Maybe PollDetailsError)
 -> IO
      (BatchExecutionDetails,
       [(CohortVariables,
         (Cohort (TVar CursorVariableValues), CohortVariables,
          [Subscriber]))],
       Maybe PollDetailsError))
-> (BatchExecutionDetails,
    [(CohortVariables,
      (Cohort (TVar CursorVariableValues), CohortVariables,
       [Subscriber]))],
    Maybe PollDetailsError)
-> IO
     (BatchExecutionDetails,
      [(CohortVariables,
        (Cohort (TVar CursorVariableValues), CohortVariables,
         [Subscriber]))],
      Maybe PollDetailsError)
forall a b. (a -> b) -> a -> b
$ (BatchExecutionDetails
batchExecDetails, [(CohortVariables,
  (Cohort (TVar CursorVariableValues), CohortVariables,
   [Subscriber]))]
processedCohortBatch, Maybe PollDetailsError
maybeError)

    (DiffTime,
 ([BatchExecutionDetails],
  [[(CohortVariables,
     (Cohort (TVar CursorVariableValues), CohortVariables,
      [Subscriber]))]],
  [Maybe PollDetailsError]))
-> IO
     (DiffTime,
      ([BatchExecutionDetails],
       [[(CohortVariables,
          (Cohort (TVar CursorVariableValues), CohortVariables,
           [Subscriber]))]],
       [Maybe PollDetailsError]))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DiffTime
snapshotTime, [(BatchExecutionDetails,
  [(CohortVariables,
    (Cohort (TVar CursorVariableValues), CohortVariables,
     [Subscriber]))],
  Maybe PollDetailsError)]
-> ([BatchExecutionDetails],
    [[(CohortVariables,
       (Cohort (TVar CursorVariableValues), CohortVariables,
        [Subscriber]))]],
    [Maybe PollDetailsError])
forall a b c. [(a, b, c)] -> ([a], [b], [c])
unzip3 [(BatchExecutionDetails,
  [(CohortVariables,
    (Cohort (TVar CursorVariableValues), CohortVariables,
     [Subscriber]))],
  Maybe PollDetailsError)]
batchesDetailsAndProcessedCohortsWithMaybeError)

  let initPollDetails :: PollDetails
initPollDetails =
        PollDetails
          { _pdPollerId :: PollerId
_pdPollerId = PollerId
pollerId,
            _pdKind :: SubscriptionType
_pdKind = SubscriptionType
Streaming,
            _pdGeneratedSql :: Text
_pdGeneratedSql = MultiplexedQuery b -> Text
forall a. ToTxt a => a -> Text
toTxt MultiplexedQuery b
query,
            _pdSnapshotTime :: DiffTime
_pdSnapshotTime = DiffTime
snapshotTime,
            _pdBatches :: [BatchExecutionDetails]
_pdBatches = [BatchExecutionDetails]
batchesDetails,
            _pdLiveQueryOptions :: SubscriptionsOptions
_pdLiveQueryOptions = SubscriptionsOptions
streamingQueryOpts,
            _pdTotalTime :: DiffTime
_pdTotalTime = DiffTime
totalTime,
            _pdSource :: SourceName
_pdSource = SourceName
sourceName,
            _pdRole :: RoleName
_pdRole = RoleName
roleName,
            _pdParameterizedQueryHash :: ParameterizedQueryHash
_pdParameterizedQueryHash = ParameterizedQueryHash
parameterizedQueryHash,
            _pdLogLevel :: LogLevel
_pdLogLevel = LogLevel
LevelInfo,
            _pdErrors :: Maybe [PollDetailsError]
_pdErrors = Maybe [PollDetailsError]
forall a. Maybe a
Nothing
          }
      maybePollDetailsErrors :: Maybe [PollDetailsError]
maybePollDetailsErrors = [Maybe PollDetailsError] -> Maybe [PollDetailsError]
forall (t :: * -> *) (f :: * -> *) a.
(Traversable t, Applicative f) =>
t (f a) -> f (t a)
forall (f :: * -> *) a. Applicative f => [f a] -> f [a]
sequenceA [Maybe PollDetailsError]
maybeErrors
      pollDetails :: PollDetails
pollDetails = case Maybe [PollDetailsError]
maybePollDetailsErrors of
        Maybe [PollDetailsError]
Nothing -> PollDetails
initPollDetails
        Just [PollDetailsError]
pollDetailsError ->
          PollDetails
initPollDetails
            { _pdLogLevel :: LogLevel
_pdLogLevel = LogLevel
LevelError,
              _pdErrors :: Maybe [PollDetailsError]
_pdErrors = [PollDetailsError] -> Maybe [PollDetailsError]
forall a. a -> Maybe a
Just [PollDetailsError]
pollDetailsError
            }

  STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    -- constructing a cohort map for all the cohorts that have been
    -- processed in the current poll

    -- processed cohorts is a list of pairs of the current poll's cohort variables and a triple
    -- of the cohort, the new cohort key and the snapshotted new subscribers
    let processedCohortsMap :: HashMap
  CohortVariables
  (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortsMap = [(CohortVariables,
  (Cohort (TVar CursorVariableValues), CohortVariables,
   [Subscriber]))]
-> HashMap
     CohortVariables
     (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HashMap.fromList ([(CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber]))]
 -> HashMap
      CohortVariables
      (Cohort (TVar CursorVariableValues), CohortVariables,
       [Subscriber]))
-> [(CohortVariables,
     (Cohort (TVar CursorVariableValues), CohortVariables,
      [Subscriber]))]
-> HashMap
     CohortVariables
     (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
forall a b. (a -> b) -> a -> b
$ [[(CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber]))]]
-> [(CohortVariables,
     (Cohort (TVar CursorVariableValues), CohortVariables,
      [Subscriber]))]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [[(CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber]))]]
processedCohorts

    -- rebuilding the cohorts and the cohort map, see [Streaming subscription polling]
    -- and [Streaming subscriptions rebuilding cohort map]
    [(CohortVariables, Cohort (TVar CursorVariableValues))]
currentCohorts <- TMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap CohortVariables (Cohort (TVar CursorVariableValues))
CohortMap 'Streaming
cohortMap
    HashMap CohortVariables (Cohort (TVar CursorVariableValues))
updatedCohortsMap <-
      (HashMap CohortVariables (Cohort (TVar CursorVariableValues))
 -> (CohortVariables, Cohort (TVar CursorVariableValues))
 -> STM
      (HashMap CohortVariables (Cohort (TVar CursorVariableValues))))
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> STM
     (HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m b
foldM
        ( \HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap (CohortVariables
currentCohortKey, Cohort (TVar CursorVariableValues)
currentCohort) -> do
            let processedCohortMaybe :: Maybe
  (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortMaybe = CohortVariables
-> HashMap
     CohortVariables
     (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
-> Maybe
     (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup CohortVariables
currentCohortKey HashMap
  CohortVariables
  (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortsMap
            case Maybe
  (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortMaybe of
              -- A new cohort has been added in the cohort map whilst the
              -- current poll was happening, in this case we just return it
              -- as it is
              Maybe
  (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
Nothing -> (Cohort (TVar CursorVariableValues)
 -> Cohort (TVar CursorVariableValues)
 -> STM (Cohort (TVar CursorVariableValues)))
-> CohortVariables
-> Cohort (TVar CursorVariableValues)
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
     (HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (m :: * -> *) k v.
(Monad m, Hashable k) =>
(v -> v -> m v) -> k -> v -> HashMap k v -> m (HashMap k v)
HashMap.insertWithM Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
Cohort 'Streaming -> Cohort 'Streaming -> STM (Cohort 'Streaming)
mergeCohorts CohortVariables
currentCohortKey Cohort (TVar CursorVariableValues)
currentCohort HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap
              Just (Cohort (TVar CursorVariableValues)
processedCohort, CohortVariables
updatedCohortKey, [Subscriber]
snapshottedNewSubs) -> do
                Cohort (TVar CursorVariableValues) -> [Subscriber] -> STM ()
forall {streamCursorVars}.
Cohort streamCursorVars -> [Subscriber] -> STM ()
updateCohortSubscribers Cohort (TVar CursorVariableValues)
currentCohort [Subscriber]
snapshottedNewSubs
                [(SubscriberId, Subscriber)]
currentCohortExistingSubscribers <- TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList (TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)])
-> TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)]
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cExistingSubscribers Cohort (TVar CursorVariableValues)
currentCohort
                HashMap SubscriberId Subscriber
newlyAddedSubscribers <- TMap SubscriberId Subscriber
-> STM (HashMap SubscriberId Subscriber)
forall k v. TMap k v -> STM (HashMap k v)
TMap.getMap (TMap SubscriberId Subscriber
 -> STM (HashMap SubscriberId Subscriber))
-> TMap SubscriberId Subscriber
-> STM (HashMap SubscriberId Subscriber)
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
currentCohort
                -- The newly added subscribers should not be added to the updated cohort, they should be added
                -- to the old cohort because they need to be processed for the first time with their initial value.
                -- For example: let's take 2 cohorts,
                -- s means a subscriber
                -- C1 -> [s1, s2]
                -- C2 -> [s3]
                -- and s4 is added to C1 and s5 is added to C2 during the poll.
                --
                -- Let's say C1 is updated to C2 and C2 to C3, then
                -- the updated cohort map should look like:
                -- C1 -> [s4]
                -- C2 -> [s1, s2, s5]
                -- C3 -> [s3]
                --
                -- Note that s4 and s5 have not been added to the updated cohort and instead
                -- are in the original cohort they were added in.

                -- all the existing subscribers are removed from the current cohort and
                -- the newly added subscribers are added back
                HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMapWithCurrentCohort <-
                  if HashMap SubscriberId Subscriber -> Bool
forall a. HashMap SubscriberId a -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null HashMap SubscriberId Subscriber
newlyAddedSubscribers
                    then HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
     (HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap
                    else do
                      -- Creating a new cohort which will only contain the newly added subscribers
                      Cohort (TVar CursorVariableValues)
newCohort <- do
                        TMap SubscriberId Subscriber
existingSubs <- STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new
                        TMap SubscriberId Subscriber
newSubs <- STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new
                        Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
                          (Cohort (TVar CursorVariableValues)
 -> STM (Cohort (TVar CursorVariableValues)))
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
forall a b. (a -> b) -> a -> b
$ CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> TVar CursorVariableValues
-> Cohort (TVar CursorVariableValues)
forall streamCursorVars.
CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> streamCursorVars
-> Cohort streamCursorVars
C.Cohort
                            (Cohort (TVar CursorVariableValues) -> CohortId
forall streamCursorVars. Cohort streamCursorVars -> CohortId
C._cCohortId Cohort (TVar CursorVariableValues)
currentCohort)
                            (Cohort (TVar CursorVariableValues) -> TVar (Maybe ResponseHash)
forall streamCursorVars.
Cohort streamCursorVars -> TVar (Maybe ResponseHash)
C._cPreviousResponse Cohort (TVar CursorVariableValues)
currentCohort)
                            TMap SubscriberId Subscriber
existingSubs
                            TMap SubscriberId Subscriber
newSubs
                            (Cohort (TVar CursorVariableValues) -> TVar CursorVariableValues
forall streamCursorVars.
Cohort streamCursorVars -> streamCursorVars
C._cStreamCursorVariables Cohort (TVar CursorVariableValues)
currentCohort)
                      TMap SubscriberId Subscriber
-> HashMap SubscriberId Subscriber -> STM ()
forall k v. TMap k v -> HashMap k v -> STM ()
TMap.replace (Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
newCohort) HashMap SubscriberId Subscriber
newlyAddedSubscribers
                      (Cohort (TVar CursorVariableValues)
 -> Cohort (TVar CursorVariableValues)
 -> STM (Cohort (TVar CursorVariableValues)))
-> CohortVariables
-> Cohort (TVar CursorVariableValues)
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
     (HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (m :: * -> *) k v.
(Monad m, Hashable k) =>
(v -> v -> m v) -> k -> v -> HashMap k v -> m (HashMap k v)
HashMap.insertWithM Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
Cohort 'Streaming -> Cohort 'Streaming -> STM (Cohort 'Streaming)
mergeCohorts CohortVariables
currentCohortKey Cohort (TVar CursorVariableValues)
newCohort HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap
                let allCurrentSubscribers :: HashSet SubscriberId
allCurrentSubscribers = [SubscriberId] -> HashSet SubscriberId
forall a. (Eq a, Hashable a) => [a] -> HashSet a
Set.fromList ([SubscriberId] -> HashSet SubscriberId)
-> [SubscriberId] -> HashSet SubscriberId
forall a b. (a -> b) -> a -> b
$ (SubscriberId, Subscriber) -> SubscriberId
forall a b. (a, b) -> a
fst ((SubscriberId, Subscriber) -> SubscriberId)
-> [(SubscriberId, Subscriber)] -> [SubscriberId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (HashMap SubscriberId Subscriber -> [(SubscriberId, Subscriber)]
forall k v. HashMap k v -> [(k, v)]
HashMap.toList HashMap SubscriberId Subscriber
newlyAddedSubscribers [(SubscriberId, Subscriber)]
-> [(SubscriberId, Subscriber)] -> [(SubscriberId, Subscriber)]
forall a. Semigroup a => a -> a -> a
<> [(SubscriberId, Subscriber)]
currentCohortExistingSubscribers)
                -- retain subscribers only if they still exist in the original cohort's subscriber.
                -- It may happen that a subscriber has stopped their subscription which means it will
                -- no longer exist in the cohort map, so we need to accordingly remove such subscribers
                -- from our processed cohorts.
                (SubscriberId -> Subscriber -> Bool)
-> TMap SubscriberId Subscriber -> STM ()
forall k v. (k -> v -> Bool) -> TMap k v -> STM ()
TMap.filterWithKey (\SubscriberId
k Subscriber
_ -> SubscriberId
k SubscriberId -> HashSet SubscriberId -> Bool
forall a. Eq a => a -> HashSet a -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` HashSet SubscriberId
allCurrentSubscribers) (TMap SubscriberId Subscriber -> STM ())
-> TMap SubscriberId Subscriber -> STM ()
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cExistingSubscribers Cohort (TVar CursorVariableValues)
processedCohort
                (SubscriberId -> Subscriber -> Bool)
-> TMap SubscriberId Subscriber -> STM ()
forall k v. (k -> v -> Bool) -> TMap k v -> STM ()
TMap.filterWithKey (\SubscriberId
k Subscriber
_ -> SubscriberId
k SubscriberId -> HashSet SubscriberId -> Bool
forall a. Eq a => a -> HashSet a -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` HashSet SubscriberId
allCurrentSubscribers) (TMap SubscriberId Subscriber -> STM ())
-> TMap SubscriberId Subscriber -> STM ()
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
processedCohort
                (Cohort (TVar CursorVariableValues)
 -> Cohort (TVar CursorVariableValues)
 -> STM (Cohort (TVar CursorVariableValues)))
-> CohortVariables
-> Cohort (TVar CursorVariableValues)
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
     (HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (m :: * -> *) k v.
(Monad m, Hashable k) =>
(v -> v -> m v) -> k -> v -> HashMap k v -> m (HashMap k v)
HashMap.insertWithM Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
Cohort 'Streaming -> Cohort 'Streaming -> STM (Cohort 'Streaming)
mergeCohorts CohortVariables
updatedCohortKey Cohort (TVar CursorVariableValues)
processedCohort HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMapWithCurrentCohort
        )
        HashMap CohortVariables (Cohort (TVar CursorVariableValues))
forall a. Monoid a => a
mempty
        [(CohortVariables, Cohort (TVar CursorVariableValues))]
currentCohorts
    TMap CohortVariables (Cohort (TVar CursorVariableValues))
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM ()
forall k v. TMap k v -> HashMap k v -> STM ()
TMap.replace TMap CohortVariables (Cohort (TVar CursorVariableValues))
CohortMap 'Streaming
cohortMap HashMap CohortVariables (Cohort (TVar CursorVariableValues))
updatedCohortsMap
  SubscriptionPostPollHook
postPollHook PollDetails
pollDetails
  let totalTimeMetric :: HistogramVector SubscriptionLabel
totalTimeMetric = SubscriptionMetrics -> HistogramVector SubscriptionLabel
submTotalTime (SubscriptionMetrics -> HistogramVector SubscriptionLabel)
-> SubscriptionMetrics -> HistogramVector SubscriptionLabel
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics -> SubscriptionMetrics
pmSubscriptionMetrics (PrometheusMetrics -> SubscriptionMetrics)
-> PrometheusMetrics -> SubscriptionMetrics
forall a b. (a -> b) -> a -> b
$ PrometheusMetrics
prometheusMetrics
  IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> IO ()
forall (m :: * -> *).
MonadIO m =>
IO GranularPrometheusMetricsState
-> Bool
-> HashMap (Maybe OperationName) Int
-> ParameterizedQueryHash
-> SubscriptionKindLabel
-> (SubscriptionLabel -> IO ())
-> m ()
recordSubcriptionMetric
    IO GranularPrometheusMetricsState
granularPrometheusMetricsState
    Bool
True
    HashMap (Maybe OperationName) Int
operationNames
    ParameterizedQueryHash
parameterizedQueryHash
    SubscriptionKindLabel
streamingSubscriptionLabel
    ((SubscriptionLabel -> Double -> IO ())
-> Double -> SubscriptionLabel -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip (HistogramVector SubscriptionLabel
-> SubscriptionLabel -> Double -> IO ()
forall label.
Ord label =>
HistogramVector label -> label -> Double -> IO ()
HistogramVector.observe HistogramVector SubscriptionLabel
totalTimeMetric) (DiffTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac DiffTime
totalTime))
  where
    -- Destructure the streaming subscription options. Only the batch size is
    -- needed in this function; the second field (the refetch interval) is
    -- deliberately ignored.
    SubscriptionsOptions batchSize _ = streamingQueryOpts

    -- Promote the snapshotted new subscribers of a cohort to existing
    -- subscribers.
    --
    -- Arguments:
    --   * the cohort whose subscriber maps are updated in place
    --   * the new subscribers that were captured in the snapshot for this poll
    --
    -- Runs in 'STM'; the only effects are on the cohort's two subscriber maps.
    updateCohortSubscribers (C.Cohort _id _respRef curOpsTV newOpsTV _) snapshottedNewSubs = do
      allNewOpsL <- TMap.toList newOpsTV
      let snapshottedNewSubsSet = Set.fromList $ _sId <$> snapshottedNewSubs
      forM_ allNewOpsL $ \(subId, subscriber) ->
        -- 'Set.member' uses the hash of the key; the 'Foldable' version of
        -- 'elem' would walk the whole set for every subscriber.
        when (subId `Set.member` snapshottedNewSubsSet) do
          -- we only add the snapshotted new subscribers to the current subscribers
          -- because they have been sent the first message of the subscription. The
          -- new subscribers apart from the snapshotted new subscribers are yet to
          -- receive their first message so we just keep them as new subscribers
          TMap.insert subscriber subId curOpsTV
          TMap.delete subId newOpsTV

    -- Take a point-in-time snapshot of a single cohort.
    --
    -- Given a cohort together with its variables, read the current contents
    -- of both subscriber maps and return the cohort id paired with
    -- (snapshot, original cohort). The snapshot carries the cohort variables,
    -- the previous-response reference, and the existing and new subscribers
    -- as plain lists (subscriber ids are dropped).
    getCohortSnapshot (cohortVars, cohort@(C.Cohort resId respRef curOpsTV newOpsTV _)) = do
      curOpsL <- TMap.toList curOpsTV
      newOpsL <- TMap.toList newOpsTV
      let cohortSnapshot =
            C.CohortSnapshot cohortVars respRef (map snd curOpsL) (map snd newOpsL)
      pure (resId, (cohortSnapshot, cohort))

    -- Merge two cohorts that ended up under the same cohort key.
    --
    -- The result is the first cohort (@newCohort@) with both of its
    -- subscriber maps replaced by the union of the corresponding maps of the
    -- two cohorts. Every other field (cohort id, previous response, cursor
    -- variables) is taken from @newCohort@ unchanged.
    --
    -- NOTE(review): which side wins when the same subscriber id is present in
    -- both maps depends on 'TMap.union' - confirm if that case can occur.
    mergeCohorts :: Cohort 'Streaming -> Cohort 'Streaming -> STM.STM (Cohort 'Streaming)
    mergeCohorts newCohort oldCohort = do
      mergedExistingSubscribers <-
        TMap.union (C._cExistingSubscribers newCohort) (C._cExistingSubscribers oldCohort)
      mergedNewSubscribers <-
        TMap.union (C._cNewSubscribers newCohort) (C._cNewSubscribers oldCohort)
      pure
        $ newCohort
          { C._cNewSubscribers = mergedNewSubscribers,
            C._cExistingSubscribers = mergedExistingSubscribers
          }

    -- Pair the outcome of one batched query with the cohorts of that batch.
    --
    -- Arguments:
    --   * the batch, as (cohort id, (snapshot, cohort)) pairs
    --   * the query outcome: an error, or one
    --     (cohort id, response bytes, latest cursor values) row per cohort
    --
    -- Each element of the result is
    --   (result, cohort id, Maybe (response hash, response size),
    --    Maybe latest cursor values, (snapshot, cohort)).
    --
    -- On error, every cohort of the batch gets the same encoded error, and
    -- both 'Maybe' components are 'Nothing'. On success, one element is
    -- produced per response row whose cohort id is part of the batch.
    getCohortOperations cohorts = \case
      Left e ->
        let resp = throwError $ GQExecError [encodeGQLErr False e]
         in [(resp, cohortId, Nothing, Nothing, snapshot) | (cohortId, snapshot) <- cohorts]
      Right responses -> do
        let cohortSnapshotMap = HashMap.fromList cohorts
        -- every row of the response will contain the cohortId, response of the query and the latest value of the cursor for the cohort
        flip mapMaybe responses $ \(cohortId, respBS, respCursorLatestValue) ->
          -- the hash and the size are computed from the raw response bytes,
          -- i.e. before the modifier is applied to them
          let respHash = mkRespHash respBS
              respSize = BS.length respBS
           in -- TODO: currently we ignore the cases when the cohortId from
              -- Postgres response is not present in the cohort map of this batch
              -- (this shouldn't happen but if it happens it means a logic error and
              -- we should log it)
              (pure (applyModifier modifier respBS),cohortId,Just (respHash, respSize),Just respCursorLatestValue,)
                <$> HashMap.lookup cohortId cohortSnapshotMap