{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}

module Hasura.GraphQL.Execute.Subscription.Poll.StreamingQuery
  ( -- * Pollers
    pollStreamingQuery,
  )
where

import Control.Concurrent.Async qualified as A
import Control.Concurrent.STM qualified as STM
import Control.Lens
import Data.ByteString qualified as BS
import Data.HashMap.Strict qualified as Map
import Data.HashMap.Strict.Extended qualified as Map
import Data.HashSet qualified as Set
import Data.List.Split (chunksOf)
import Data.Monoid (Sum (..))
import Data.Text.Extended
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Execute.Subscription.Plan qualified as C
import Hasura.GraphQL.Execute.Subscription.Poll.Common hiding (Cohort (..), CohortMap, CohortSnapshot (..))
import Hasura.GraphQL.Execute.Subscription.Poll.Common qualified as C
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.Execute.Subscription.Types
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Prelude
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common (SourceName)
import Hasura.RQL.Types.Numeric qualified as Numeric
import Hasura.RQL.Types.Subscription (SubscriptionType (..))
import Hasura.SQL.Value (TxtEncodedVal (..))
import Hasura.Session
import Language.GraphQL.Draft.Syntax qualified as G
import Text.Shakespeare.Text (st)

{- Note [Streaming subscriptions rebuilding cohort map]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When the multiplexed query is polled, the cohort is snapshotted to get the
existing and newly added subscribers (if any), let's call these subscribers as
current poll subscribers.

Every cohort is associated with a cohort key and the cohort key is basically the
variables the cohort uses. The cohort variables contains multiple types of
variables in it, session variables, query variables, synthetic variables and
cursor variables, out of all these, the cursor variable may change with every
poll. So, we rebuild the cohort map at the end of every poll, see Note
[Streaming subscription polling]. But, rebuilding the cohort map is not straight
forward because there are race conditions that need to be taken care of. Some of
the race conditions which can happen when the current cohorts are processing:

1. A cohort is removed concurrently
2. A subscriber is removed concurrently
3. A new subscriber has started a subscription and it should be placed in the correct cohort

In the current code, the rebuilding of the cohort map goes as the follows:

1. After snapshotting the cohorts, we build a cohort map out of those cohorts,
   in the code these are called as "processedCohorts", it's important to note
   that these are not retrieved from the mutable "CohortMap", these are the
   snapshotted cohorts which were processed in the current poll. The reason we
   maintain the snapshotted cohorts is because it is later used while rebuilding
   the cohort map.

2. We create a processed cohort map which looks like HashMap CohortKey (Cohort
   'Streaming, CohortKey). The key of the map is the CohortKey which was
   associated with the cohort during the poll and the CohortKey in the value
   type is the cohort key after updating the cursor value. Note that both the
   values may or may not be equal.

3. We atomically get the list of the cohorts from the cohort map (mutable
reference), let's call it current cohorts and then traverse over it.

   1. Lookup with the given cohort key into the processed cohort map

      a. If no cohort is found, it means that the cohort with the given cohort
         key has been added while we were polling. So, we keep this as it is.

      b. If a processed cohort is found:

         i. We have to see if any new subscribers have been added to the current
            cohort, this is calculated using the diff of existing subscribers in
            the current cohort and the existing cohort, if there are any then we
            create a new cohort which includes only the newly added subscribers
            and add the new cohort into the cohort map.

         ii. We only retain the subscribers found in the processed cohort which
             exist in the current cohort. We do this because it is possible that
             a subscriber has been stopped their subscription in the time
             between the cohorts were snapshotted for processing and the time
             the cohort map is rebuilt.

         iii. Insert the processed cohort with the updated cohort key into the
         cohort map.
-}

{- Note [Streaming subscription polling]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Every cohort of a streaming subscription is associated with a mutable latest
cursor value reference, which contains the current value of the cursor.

After a poll, the mutable cursor value gets updated if its value is a non-null
one, null value means that there were no rows to get the min/max value from.

After this, the cohort map associated with the poller is also rebuilt, which
will make sure that the cohort map's key contains the latest cursor values. So,
any new subscriber will get added to an existing cohort if the new subscriber's
cohort key matches with any of the existing cohorts. We *need* to rebuild the
cohort map, because, say if we don't rebuild the cohort map then a new
subscriber may get added to a cohort which has been streaming for a while now,
then the new subscriber will get the responses according to the cursor value
stored in the cohort, instead of the initial value specified by the client. For
example:

Client 1 started streaming a query at t1, say:

```
   subscription {
      log_stream(cursor: {initial_value: {created_at: "2020-01-01"}}, batch_size: 1000) {
         id
         level
         detail
     }
   }
```

Client 2 starts streaming at t2 with the same query (where t2 > t1), if the
cohort map is not rebuilt (to reflect cursor value in the cohort key), then
client 2 and client 1 will both start getting the same responses, which is wrong
because client 2 should start streaming from the initial value it provided.

-}

{- Note [Lifecycle of a streaming subscription poller]
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  +-------------------------------------------+           +-----------------------------------------------------------------+
  | Execute multiplexed query in the database | ------->  | Parse the response, every row of response contains three things:|
  +-------------------------------------------+           |  1. Cohort ID                                                   |
             ^                                            |  2. Cohort's Response                                           |
             |                                            |  3. Cohort's latest cursor value                                |
             |                                            +-----------------------------------------------------------------+
             |                                                                            |
             |                                                                            |
             |                                                                            v
             |                                            +------------------------------------------------------------------------+
             |                                            | Processing of the response:                                            |
             |                                            | 1. Send the result to the subscribers                                  |
   +------------------------------------+                 | 2. Update the cursor value in the mutable reference                    |
   | Rebuild the cohort map             |                 |    of the snapshot so that the next poll uses this value               |
   +------------------------------------+        <------  +------------------------------------------------------------------------+

-}

mergeOldAndNewCursorValues :: CursorVariableValues -> CursorVariableValues -> CursorVariableValues
mergeOldAndNewCursorValues :: CursorVariableValues
-> CursorVariableValues -> CursorVariableValues
mergeOldAndNewCursorValues (CursorVariableValues HashMap Name TxtEncodedVal
prevPollCursorValues) (CursorVariableValues HashMap Name TxtEncodedVal
currentPollCursorValues) =
  let combineFn :: TxtEncodedVal -> TxtEncodedVal -> TxtEncodedVal
combineFn TxtEncodedVal
previousVal TxtEncodedVal
currentVal =
        case TxtEncodedVal
currentVal of
          TxtEncodedVal
TENull -> TxtEncodedVal
previousVal -- When we get a null value from the DB, we retain the previous value
          TELit Text
t -> Text -> TxtEncodedVal
TELit Text
t
   in HashMap Name TxtEncodedVal -> CursorVariableValues
CursorVariableValues (HashMap Name TxtEncodedVal -> CursorVariableValues)
-> HashMap Name TxtEncodedVal -> CursorVariableValues
forall a b. (a -> b) -> a -> b
$ (TxtEncodedVal -> TxtEncodedVal -> TxtEncodedVal)
-> HashMap Name TxtEncodedVal
-> HashMap Name TxtEncodedVal
-> HashMap Name TxtEncodedVal
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> HashMap k v -> HashMap k v -> HashMap k v
Map.unionWith TxtEncodedVal -> TxtEncodedVal -> TxtEncodedVal
combineFn HashMap Name TxtEncodedVal
prevPollCursorValues HashMap Name TxtEncodedVal
currentPollCursorValues

pushResultToCohort ::
  GQResult BS.ByteString ->
  Maybe ResponseHash ->
  SubscriptionMetadata ->
  CursorVariableValues ->
  -- | Root field name
  G.Name ->
  -- | subscribers to which data has been pushed, subscribers which already
  -- have this data (this information is exposed by metrics reporting)
  (CohortSnapshot 'Streaming, Cohort 'Streaming) ->
  IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort :: GQResult ByteString
-> Maybe ResponseHash
-> SubscriptionMetadata
-> CursorVariableValues
-> Name
-> (CohortSnapshot 'Streaming, Cohort 'Streaming)
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort GQResult ByteString
result !Maybe ResponseHash
respHashM (SubscriptionMetadata DiffTime
dTime) CursorVariableValues
cursorValues Name
rootFieldName (CohortSnapshot 'Streaming
cohortSnapshot, Cohort 'Streaming
cohort) = do
  Maybe ResponseHash
prevRespHashM <- TVar (Maybe ResponseHash) -> IO (Maybe ResponseHash)
forall a. TVar a -> IO a
STM.readTVarIO TVar (Maybe ResponseHash)
respRef
  -- write to the current websockets if needed
  ([Subscriber]
subscribersToPush, [Subscriber]
subscribersToIgnore) <-
    -- We send the response to all the subscribers only when the response changes.
    if GQResult ByteString -> Bool
forall a. GQResult a -> Bool
isExecError GQResult ByteString
result Bool -> Bool -> Bool
|| Maybe ResponseHash
respHashM Maybe ResponseHash -> Maybe ResponseHash -> Bool
forall a. Eq a => a -> a -> Bool
/= Maybe ResponseHash
prevRespHashM
      then do
        String
String -> Maybe ResponseHash -> IO ()
forall a. String -> a -> IO ()
$assertNFHere Maybe ResponseHash
respHashM -- so we don't write thunks to mutable vars
        STM ([Subscriber], [Subscriber]) -> IO ([Subscriber], [Subscriber])
forall a. STM a -> IO a
STM.atomically (STM ([Subscriber], [Subscriber])
 -> IO ([Subscriber], [Subscriber]))
-> STM ([Subscriber], [Subscriber])
-> IO ([Subscriber], [Subscriber])
forall a b. (a -> b) -> a -> b
$ do
          TVar (Maybe ResponseHash) -> Maybe ResponseHash -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar (Maybe ResponseHash)
respRef Maybe ResponseHash
respHashM
          TVar CursorVariableValues -> CursorVariableValues -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar (Cohort (TVar CursorVariableValues) -> TVar CursorVariableValues
forall streamCursorVars.
Cohort streamCursorVars -> streamCursorVars
C._cStreamCursorVariables Cohort (TVar CursorVariableValues)
Cohort 'Streaming
cohort) CursorVariableValues
cursorValues
          ([Subscriber], [Subscriber]) -> STM ([Subscriber], [Subscriber])
forall (m :: * -> *) a. Monad m => a -> m a
return ([Subscriber]
newSinks [Subscriber] -> [Subscriber] -> [Subscriber]
forall a. Semigroup a => a -> a -> a
<> [Subscriber]
curSinks, [Subscriber]
forall a. Monoid a => a
mempty)
      else -- when the response is unchanged, the response is only sent to the newly added subscribers
        ([Subscriber], [Subscriber]) -> IO ([Subscriber], [Subscriber])
forall (m :: * -> *) a. Monad m => a -> m a
return ([Subscriber]
newSinks, [Subscriber]
curSinks)
  [Subscriber] -> IO ()
pushResultToSubscribers [Subscriber]
subscribersToPush
  ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (([SubscriberExecutionDetails], [SubscriberExecutionDetails])
 -> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails]))
-> ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall a b. (a -> b) -> a -> b
$
    ASetter
  ([Subscriber], [Subscriber])
  ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
  Subscriber
  SubscriberExecutionDetails
-> (Subscriber -> SubscriberExecutionDetails)
-> ([Subscriber], [Subscriber])
-> ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
over
      (([Subscriber] -> Identity [SubscriberExecutionDetails])
-> ([Subscriber], [Subscriber])
-> Identity
     ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
forall s t a b. Each s t a b => Traversal s t a b
each (([Subscriber] -> Identity [SubscriberExecutionDetails])
 -> ([Subscriber], [Subscriber])
 -> Identity
      ([SubscriberExecutionDetails], [SubscriberExecutionDetails]))
-> ((Subscriber -> Identity SubscriberExecutionDetails)
    -> [Subscriber] -> Identity [SubscriberExecutionDetails])
-> ASetter
     ([Subscriber], [Subscriber])
     ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
     Subscriber
     SubscriberExecutionDetails
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Subscriber -> Identity SubscriberExecutionDetails)
-> [Subscriber] -> Identity [SubscriberExecutionDetails]
forall s t a b. Each s t a b => Traversal s t a b
each)
      ( \Subscriber {Maybe OperationName
RequestId
SubscriberMetadata
SubscriberId
OnChange
_sOnChangeCallback :: Subscriber -> OnChange
_sOperationName :: Subscriber -> Maybe OperationName
_sRequestId :: Subscriber -> RequestId
_sMetadata :: Subscriber -> SubscriberMetadata
_sId :: Subscriber -> SubscriberId
_sOnChangeCallback :: OnChange
_sOperationName :: Maybe OperationName
_sRequestId :: RequestId
_sMetadata :: SubscriberMetadata
_sId :: SubscriberId
..} ->
          SubscriberId -> SubscriberMetadata -> SubscriberExecutionDetails
SubscriberExecutionDetails SubscriberId
_sId SubscriberMetadata
_sMetadata
      )
      ([Subscriber]
subscribersToPush, [Subscriber]
subscribersToIgnore)
  where
    rootFieldNameText :: Text
rootFieldNameText = Name -> Text
G.unName Name
rootFieldName
    -- we want to know if the DB response is an empty array and if it is, then we don't send anything
    -- to the client. We could do this by parsing the response and check for an empty list, but
    -- this will have performance impact when large responses are parsed. Instead, we compare
    -- the DB response to a templated empty array response.

    -- We are using templating instead of doing something like
    -- J.encode $ J.object [ rootFieldNameText J..= [] :: [J.Value] ]
    -- is because, unfortunately, the value returned by the above is
    -- {<rootFieldNameText>:[]} (notice the lack of spaces). So, instead
    -- we're templating according to the format postgres sends JSON responses.
    emptyRespBS :: ByteString
emptyRespBS = Text -> ByteString
txtToBs (Text -> ByteString) -> Text -> ByteString
forall a b. (a -> b) -> a -> b
$ [st|{"#{rootFieldNameText}" : []}|]

    isResponseEmpty :: Bool
isResponseEmpty = GQResult ByteString
result GQResult ByteString -> GQResult ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString -> GQResult ByteString
forall a b. b -> Either a b
Right ByteString
emptyRespBS

    C.CohortSnapshot CohortVariables
_ TVar (Maybe ResponseHash)
respRef [Subscriber]
curSinks [Subscriber]
newSinks = CohortSnapshot
CohortSnapshot 'Streaming
cohortSnapshot

    response :: Either GQExecError SubscriptionResponse
response = GQResult ByteString
result GQResult ByteString
-> (ByteString -> SubscriptionResponse)
-> Either GQExecError SubscriptionResponse
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \ByteString
payload -> ByteString -> DiffTime -> SubscriptionResponse
SubscriptionResponse ByteString
payload DiffTime
dTime

    pushResultToSubscribers :: [Subscriber] -> IO ()
pushResultToSubscribers [Subscriber]
subscribers =
      Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
isResponseEmpty (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
        ((Subscriber -> IO ()) -> [Subscriber] -> IO ())
-> [Subscriber] -> (Subscriber -> IO ()) -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip (Subscriber -> IO ()) -> [Subscriber] -> IO ()
forall (f :: * -> *) a b. Foldable f => (a -> IO b) -> f a -> IO ()
A.mapConcurrently_ [Subscriber]
subscribers ((Subscriber -> IO ()) -> IO ()) -> (Subscriber -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Subscriber {Maybe OperationName
RequestId
SubscriberMetadata
SubscriberId
OnChange
_sOnChangeCallback :: OnChange
_sOperationName :: Maybe OperationName
_sRequestId :: RequestId
_sMetadata :: SubscriberMetadata
_sId :: SubscriberId
_sOnChangeCallback :: Subscriber -> OnChange
_sOperationName :: Subscriber -> Maybe OperationName
_sRequestId :: Subscriber -> RequestId
_sMetadata :: Subscriber -> SubscriberMetadata
_sId :: Subscriber -> SubscriberId
..} -> OnChange
_sOnChangeCallback Either GQExecError SubscriptionResponse
response

-- | A single iteration of the streaming query polling loop. Invocations on the
-- same mutable objects may race.
pollStreamingQuery ::
  forall b.
  BackendTransport b =>
  PollerId ->
  SubscriptionsOptions ->
  (SourceName, SourceConfig b) ->
  RoleName ->
  ParameterizedQueryHash ->
  MultiplexedQuery b ->
  CohortMap 'Streaming ->
  G.Name ->
  SubscriptionPostPollHook ->
  Maybe (IO ()) -> -- Optional IO action to make this function (pollStreamingQuery) testable
  IO ()
pollStreamingQuery :: PollerId
-> SubscriptionsOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap 'Streaming
-> Name
-> SubscriptionPostPollHook
-> Maybe (IO ())
-> IO ()
pollStreamingQuery PollerId
pollerId SubscriptionsOptions
lqOpts (SourceName
sourceName, SourceConfig b
sourceConfig) RoleName
roleName ParameterizedQueryHash
parameterizedQueryHash MultiplexedQuery b
query CohortMap 'Streaming
cohortMap Name
rootFieldName SubscriptionPostPollHook
postPollHook Maybe (IO ())
testActionMaybe = do
  (DiffTime
totalTime, (DiffTime
snapshotTime, [(BatchExecutionDetails,
  [(CohortVariables,
    (Cohort (TVar CursorVariableValues), CohortVariables,
     [Subscriber]))])]
batchesDetailsAndProcessedCohorts)) <- IO
  (DiffTime,
   [(BatchExecutionDetails,
     [(CohortVariables,
       (Cohort (TVar CursorVariableValues), CohortVariables,
        [Subscriber]))])])
-> IO
     (DiffTime,
      (DiffTime,
       [(BatchExecutionDetails,
         [(CohortVariables,
           (Cohort (TVar CursorVariableValues), CohortVariables,
            [Subscriber]))])]))
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO
   (DiffTime,
    [(BatchExecutionDetails,
      [(CohortVariables,
        (Cohort (TVar CursorVariableValues), CohortVariables,
         [Subscriber]))])])
 -> IO
      (DiffTime,
       (DiffTime,
        [(BatchExecutionDetails,
          [(CohortVariables,
            (Cohort (TVar CursorVariableValues), CohortVariables,
             [Subscriber]))])])))
-> IO
     (DiffTime,
      [(BatchExecutionDetails,
        [(CohortVariables,
          (Cohort (TVar CursorVariableValues), CohortVariables,
           [Subscriber]))])])
-> IO
     (DiffTime,
      (DiffTime,
       [(BatchExecutionDetails,
         [(CohortVariables,
           (Cohort (TVar CursorVariableValues), CohortVariables,
            [Subscriber]))])]))
forall a b. (a -> b) -> a -> b
$ do
    -- snapshot the current cohorts and split them into batches
    -- This STM transaction is a read only transaction i.e. it doesn't mutate any state
    (DiffTime
snapshotTime, [(BatchId,
  [(CohortId,
    (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
cohortBatches) <- IO
  [(BatchId,
    [(CohortId,
      (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
     (DiffTime,
      [(BatchId,
        [(CohortId,
          (CohortSnapshot, Cohort (TVar CursorVariableValues)))])])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO
   [(BatchId,
     [(CohortId,
       (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
 -> IO
      (DiffTime,
       [(BatchId,
         [(CohortId,
           (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]))
-> IO
     [(BatchId,
       [(CohortId,
         (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
     (DiffTime,
      [(BatchId,
        [(CohortId,
          (CohortSnapshot, Cohort (TVar CursorVariableValues)))])])
forall a b. (a -> b) -> a -> b
$ do
      -- get a snapshot of all the cohorts
      -- this need not be done in a transaction
      [(CohortVariables, Cohort (TVar CursorVariableValues))]
cohorts <- STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> IO [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall a. STM a -> IO a
STM.atomically (STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
 -> IO [(CohortVariables, Cohort (TVar CursorVariableValues))])
-> STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> IO [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall a b. (a -> b) -> a -> b
$ TMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap CohortVariables (Cohort (TVar CursorVariableValues))
CohortMap 'Streaming
cohortMap
      [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohortSnapshots <- ((CohortVariables, Cohort (TVar CursorVariableValues))
 -> IO
      (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> IO
     [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (STM
  (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> IO
     (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
forall a. STM a -> IO a
STM.atomically (STM
   (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
 -> IO
      (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> ((CohortVariables, Cohort (TVar CursorVariableValues))
    -> STM
         (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> (CohortVariables, Cohort (TVar CursorVariableValues))
-> IO
     (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortVariables, Cohort (TVar CursorVariableValues))
-> STM
     (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
forall streamCursorVars.
(CohortVariables, Cohort streamCursorVars)
-> STM (CohortId, (CohortSnapshot, Cohort streamCursorVars))
getCohortSnapshot) [(CohortVariables, Cohort (TVar CursorVariableValues))]
cohorts
      -- cohorts are broken down into batches specified by the batch size
      let cohortBatches :: [[(CohortId,
   (CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
cohortBatches = Int
-> [(CohortId,
     (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> [[(CohortId,
      (CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
forall e. Int -> [e] -> [[e]]
chunksOf (NonNegativeInt -> Int
Numeric.getNonNegativeInt (BatchSize -> NonNegativeInt
unBatchSize BatchSize
batchSize)) [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohortSnapshots
      -- associating every batch with their BatchId
      [(BatchId,
  [(CohortId,
    (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
     [(BatchId,
       [(CohortId,
         (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([(BatchId,
   [(CohortId,
     (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
 -> IO
      [(BatchId,
        [(CohortId,
          (CohortSnapshot, Cohort (TVar CursorVariableValues)))])])
-> [(BatchId,
     [(CohortId,
       (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
     [(BatchId,
       [(CohortId,
         (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
forall a b. (a -> b) -> a -> b
$ [BatchId]
-> [[(CohortId,
      (CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
-> [(BatchId,
     [(CohortId,
       (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
forall a b. [a] -> [b] -> [(a, b)]
zip (Int -> BatchId
BatchId (Int -> BatchId) -> [Int] -> [BatchId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Int
1 ..]) [[(CohortId,
   (CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
cohortBatches

    Maybe (IO ()) -> (IO () -> IO ()) -> IO ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
onJust Maybe (IO ())
testActionMaybe IO () -> IO ()
forall a. a -> a
id -- IO action intended to run after the cohorts have been snapshotted

    -- concurrently process each batch and also get the processed cohort with the new updated cohort key
    [(BatchExecutionDetails,
  [(CohortVariables,
    (Cohort (TVar CursorVariableValues), CohortVariables,
     [Subscriber]))])]
batchesDetailsAndProcessedCohorts <- [(BatchId,
  [(CohortId,
    (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> ((BatchId,
     [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))])
    -> IO
         (BatchExecutionDetails,
          [(CohortVariables,
            (Cohort (TVar CursorVariableValues), CohortVariables,
             [Subscriber]))]))
-> IO
     [(BatchExecutionDetails,
       [(CohortVariables,
         (Cohort (TVar CursorVariableValues), CohortVariables,
          [Subscriber]))])]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(BatchId,
  [(CohortId,
    (CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
cohortBatches (((BatchId,
   [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))])
  -> IO
       (BatchExecutionDetails,
        [(CohortVariables,
          (Cohort (TVar CursorVariableValues), CohortVariables,
           [Subscriber]))]))
 -> IO
      [(BatchExecutionDetails,
        [(CohortVariables,
          (Cohort (TVar CursorVariableValues), CohortVariables,
           [Subscriber]))])])
-> ((BatchId,
     [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))])
    -> IO
         (BatchExecutionDetails,
          [(CohortVariables,
            (Cohort (TVar CursorVariableValues), CohortVariables,
             [Subscriber]))]))
-> IO
     [(BatchExecutionDetails,
       [(CohortVariables,
         (Cohort (TVar CursorVariableValues), CohortVariables,
          [Subscriber]))])]
forall a b. (a -> b) -> a -> b
$ \(BatchId
batchId, [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohorts) -> do
      (DiffTime
queryExecutionTime, Either QErr [(CohortId, ByteString, CursorVariableValues)]
mxRes) <-
        SourceConfig b
-> MultiplexedQuery b
-> [(CohortId, CohortVariables)]
-> IO
     (DiffTime,
      Either QErr [(CohortId, ByteString, CursorVariableValues)])
forall (b :: BackendType) (m :: * -> *).
(BackendTransport b, MonadIO m) =>
SourceConfig b
-> MultiplexedQuery b
-> [(CohortId, CohortVariables)]
-> m (DiffTime,
      Either QErr [(CohortId, ByteString, CursorVariableValues)])
runDBStreamingSubscription @b SourceConfig b
sourceConfig MultiplexedQuery b
query ([(CohortId, CohortVariables)]
 -> IO
      (DiffTime,
       Either QErr [(CohortId, ByteString, CursorVariableValues)]))
-> [(CohortId, CohortVariables)]
-> IO
     (DiffTime,
      Either QErr [(CohortId, ByteString, CursorVariableValues)])
forall a b. (a -> b) -> a -> b
$
          ASetter
  [(CohortId, CohortSnapshot)]
  [(CohortId, CohortVariables)]
  CohortSnapshot
  CohortVariables
-> (CohortSnapshot -> CohortVariables)
-> [(CohortId, CohortSnapshot)]
-> [(CohortId, CohortVariables)]
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
over (((CohortId, CohortSnapshot)
 -> Identity (CohortId, CohortVariables))
-> [(CohortId, CohortSnapshot)]
-> Identity [(CohortId, CohortVariables)]
forall s t a b. Each s t a b => Traversal s t a b
each (((CohortId, CohortSnapshot)
  -> Identity (CohortId, CohortVariables))
 -> [(CohortId, CohortSnapshot)]
 -> Identity [(CohortId, CohortVariables)])
-> ((CohortSnapshot -> Identity CohortVariables)
    -> (CohortId, CohortSnapshot)
    -> Identity (CohortId, CohortVariables))
-> ASetter
     [(CohortId, CohortSnapshot)]
     [(CohortId, CohortVariables)]
     CohortSnapshot
     CohortVariables
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortSnapshot -> Identity CohortVariables)
-> (CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables)
forall s t a b. Field2 s t a b => Lens s t a b
_2) CohortSnapshot -> CohortVariables
C._csVariables ([(CohortId, CohortSnapshot)] -> [(CohortId, CohortVariables)])
-> [(CohortId, CohortSnapshot)] -> [(CohortId, CohortVariables)]
forall a b. (a -> b) -> a -> b
$
            ((CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
 -> (CohortId, CohortSnapshot))
-> [(CohortId,
     (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> [(CohortId, CohortSnapshot)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (((CohortSnapshot, Cohort (TVar CursorVariableValues))
 -> CohortSnapshot)
-> (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> (CohortId, CohortSnapshot)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (CohortSnapshot, Cohort (TVar CursorVariableValues))
-> CohortSnapshot
forall a b. (a, b) -> a
fst) [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohorts
      let lqMeta :: SubscriptionMetadata
lqMeta = DiffTime -> SubscriptionMetadata
SubscriptionMetadata (DiffTime -> SubscriptionMetadata)
-> DiffTime -> SubscriptionMetadata
forall a b. (a -> b) -> a -> b
$ DiffTime -> DiffTime
forall x y. (Duration x, Duration y) => x -> y
convertDuration DiffTime
queryExecutionTime
          operations :: [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
  Maybe CursorVariableValues,
  (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
operations = [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> Either QErr [(CohortId, ByteString, CursorVariableValues)]
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
     Maybe CursorVariableValues,
     (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
forall (m :: * -> *) k t a.
(MonadError GQExecError m, Eq k, Hashable k) =>
[(k, t)]
-> Either QErr [(k, ByteString, a)]
-> [(m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t)]
getCohortOperations [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohorts Either QErr [(CohortId, ByteString, CursorVariableValues)]
mxRes
          -- batch response size is the sum of the response sizes of the cohorts
          batchResponseSize :: Maybe Int
batchResponseSize =
            case Either QErr [(CohortId, ByteString, CursorVariableValues)]
mxRes of
              Left QErr
_ -> Maybe Int
forall a. Maybe a
Nothing
              Right [(CohortId, ByteString, CursorVariableValues)]
resp -> Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$ Sum Int -> Int
forall a. Sum a -> a
getSum (Sum Int -> Int) -> Sum Int -> Int
forall a b. (a -> b) -> a -> b
$ ((CohortId, ByteString, CursorVariableValues) -> Sum Int)
-> [(CohortId, ByteString, CursorVariableValues)] -> Sum Int
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap ((\(CohortId
_, ByteString
sqlResp, CursorVariableValues
_) -> Int -> Sum Int
forall a. a -> Sum a
Sum (Int -> Sum Int) -> (ByteString -> Int) -> ByteString -> Sum Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int
BS.length (ByteString -> Sum Int) -> ByteString -> Sum Int
forall a b. (a -> b) -> a -> b
$ ByteString
sqlResp)) [(CohortId, ByteString, CursorVariableValues)]
resp
      (DiffTime
pushTime, [(CohortExecutionDetails,
  (CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber])))]
cohortsExecutionDetails) <- IO
  [(CohortExecutionDetails,
    (CohortVariables,
     (Cohort (TVar CursorVariableValues), CohortVariables,
      [Subscriber])))]
-> IO
     (DiffTime,
      [(CohortExecutionDetails,
        (CohortVariables,
         (Cohort (TVar CursorVariableValues), CohortVariables,
          [Subscriber])))])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO
   [(CohortExecutionDetails,
     (CohortVariables,
      (Cohort (TVar CursorVariableValues), CohortVariables,
       [Subscriber])))]
 -> IO
      (DiffTime,
       [(CohortExecutionDetails,
         (CohortVariables,
          (Cohort (TVar CursorVariableValues), CohortVariables,
           [Subscriber])))]))
-> IO
     [(CohortExecutionDetails,
       (CohortVariables,
        (Cohort (TVar CursorVariableValues), CohortVariables,
         [Subscriber])))]
-> IO
     (DiffTime,
      [(CohortExecutionDetails,
        (CohortVariables,
         (Cohort (TVar CursorVariableValues), CohortVariables,
          [Subscriber])))])
forall a b. (a -> b) -> a -> b
$
        [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
  Maybe CursorVariableValues,
  (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
     Maybe CursorVariableValues,
     (CohortSnapshot, Cohort (TVar CursorVariableValues)))
    -> IO
         (CohortExecutionDetails,
          (CohortVariables,
           (Cohort (TVar CursorVariableValues), CohortVariables,
            [Subscriber]))))
-> IO
     [(CohortExecutionDetails,
       (CohortVariables,
        (Cohort (TVar CursorVariableValues), CohortVariables,
         [Subscriber])))]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
  Maybe CursorVariableValues,
  (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
operations (((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
   Maybe CursorVariableValues,
   (CohortSnapshot, Cohort (TVar CursorVariableValues)))
  -> IO
       (CohortExecutionDetails,
        (CohortVariables,
         (Cohort (TVar CursorVariableValues), CohortVariables,
          [Subscriber]))))
 -> IO
      [(CohortExecutionDetails,
        (CohortVariables,
         (Cohort (TVar CursorVariableValues), CohortVariables,
          [Subscriber])))])
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
     Maybe CursorVariableValues,
     (CohortSnapshot, Cohort (TVar CursorVariableValues)))
    -> IO
         (CohortExecutionDetails,
          (CohortVariables,
           (Cohort (TVar CursorVariableValues), CohortVariables,
            [Subscriber]))))
-> IO
     [(CohortExecutionDetails,
       (CohortVariables,
        (Cohort (TVar CursorVariableValues), CohortVariables,
         [Subscriber])))]
forall a b. (a -> b) -> a -> b
$ \(GQResult ByteString
res, CohortId
cohortId, Maybe (ResponseHash, Int)
respData, Maybe CursorVariableValues
latestCursorValueMaybe, (CohortSnapshot
snapshot, Cohort (TVar CursorVariableValues)
cohort)) -> do
          let latestCursorValue :: CursorVariableValues
latestCursorValue@(CursorVariableValues HashMap Name TxtEncodedVal
updatedCursorVarVal) =
                let prevCursorVariableValue :: CursorVariableValues
prevCursorVariableValue = HashMap Name TxtEncodedVal -> CursorVariableValues
CursorVariableValues (HashMap Name TxtEncodedVal -> CursorVariableValues)
-> HashMap Name TxtEncodedVal -> CursorVariableValues
forall a b. (a -> b) -> a -> b
$ ValidatedVariables (HashMap Name) -> HashMap Name TxtEncodedVal
forall (f :: * -> *). ValidatedVariables f -> f TxtEncodedVal
C._unValidatedVariables (ValidatedVariables (HashMap Name) -> HashMap Name TxtEncodedVal)
-> ValidatedVariables (HashMap Name) -> HashMap Name TxtEncodedVal
forall a b. (a -> b) -> a -> b
$ CohortVariables -> ValidatedVariables (HashMap Name)
C._cvCursorVariables (CohortVariables -> ValidatedVariables (HashMap Name))
-> CohortVariables -> ValidatedVariables (HashMap Name)
forall a b. (a -> b) -> a -> b
$ CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot
                 in case Maybe CursorVariableValues
latestCursorValueMaybe of
                      Maybe CursorVariableValues
Nothing -> CursorVariableValues
prevCursorVariableValue -- Nothing indicates there was an error when the query was polled
                      Just CursorVariableValues
currentPollCursorValue -> CursorVariableValues
-> CursorVariableValues -> CursorVariableValues
mergeOldAndNewCursorValues CursorVariableValues
prevCursorVariableValue CursorVariableValues
currentPollCursorValue
          ([SubscriberExecutionDetails]
pushedSubscribers, [SubscriberExecutionDetails]
ignoredSubscribers) <-
            -- Push the result to the subscribers present in the cohorts
            GQResult ByteString
-> Maybe ResponseHash
-> SubscriptionMetadata
-> CursorVariableValues
-> Name
-> (CohortSnapshot 'Streaming, Cohort 'Streaming)
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort GQResult ByteString
res ((ResponseHash, Int) -> ResponseHash
forall a b. (a, b) -> a
fst ((ResponseHash, Int) -> ResponseHash)
-> Maybe (ResponseHash, Int) -> Maybe ResponseHash
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData) SubscriptionMetadata
lqMeta CursorVariableValues
latestCursorValue Name
rootFieldName (CohortSnapshot
CohortSnapshot 'Streaming
snapshot, Cohort (TVar CursorVariableValues)
Cohort 'Streaming
cohort)
          let currentCohortKey :: CohortVariables
currentCohortKey = CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot
              updatedCohortKey :: CohortVariables
updatedCohortKey = ValidatedVariables (HashMap Name)
-> CohortVariables -> CohortVariables
modifyCursorCohortVariables (HashMap Name TxtEncodedVal -> ValidatedVariables (HashMap Name)
forall (f :: * -> *). f TxtEncodedVal -> ValidatedVariables f
mkUnsafeValidateVariables HashMap Name TxtEncodedVal
updatedCursorVarVal) (CohortVariables -> CohortVariables)
-> CohortVariables -> CohortVariables
forall a b. (a -> b) -> a -> b
$ CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot
              snapshottedNewSubs :: [Subscriber]
snapshottedNewSubs = CohortSnapshot -> [Subscriber]
C._csNewSubscribers CohortSnapshot
snapshot
              cohortExecutionDetails :: CohortExecutionDetails
cohortExecutionDetails =
                CohortExecutionDetails :: CohortId
-> CohortVariables
-> Maybe Int
-> [SubscriberExecutionDetails]
-> [SubscriberExecutionDetails]
-> BatchId
-> CohortExecutionDetails
CohortExecutionDetails
                  { _cedCohortId :: CohortId
_cedCohortId = CohortId
cohortId,
                    _cedVariables :: CohortVariables
_cedVariables = CohortVariables
currentCohortKey,
                    _cedPushedTo :: [SubscriberExecutionDetails]
_cedPushedTo = [SubscriberExecutionDetails]
pushedSubscribers,
                    _cedIgnored :: [SubscriberExecutionDetails]
_cedIgnored = [SubscriberExecutionDetails]
ignoredSubscribers,
                    _cedResponseSize :: Maybe Int
_cedResponseSize = (ResponseHash, Int) -> Int
forall a b. (a, b) -> b
snd ((ResponseHash, Int) -> Int)
-> Maybe (ResponseHash, Int) -> Maybe Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData,
                    _cedBatchId :: BatchId
_cedBatchId = BatchId
batchId
                  }
          (CohortExecutionDetails,
 (CohortVariables,
  (Cohort (TVar CursorVariableValues), CohortVariables,
   [Subscriber])))
-> IO
     (CohortExecutionDetails,
      (CohortVariables,
       (Cohort (TVar CursorVariableValues), CohortVariables,
        [Subscriber])))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (CohortExecutionDetails
cohortExecutionDetails, (CohortVariables
currentCohortKey, (Cohort (TVar CursorVariableValues)
cohort, CohortVariables
updatedCohortKey, [Subscriber]
snapshottedNewSubs)))
      let processedCohortBatch :: [(CohortVariables,
  (Cohort (TVar CursorVariableValues), CohortVariables,
   [Subscriber]))]
processedCohortBatch = (CohortExecutionDetails,
 (CohortVariables,
  (Cohort (TVar CursorVariableValues), CohortVariables,
   [Subscriber])))
-> (CohortVariables,
    (Cohort (TVar CursorVariableValues), CohortVariables,
     [Subscriber]))
forall a b. (a, b) -> b
snd ((CohortExecutionDetails,
  (CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber])))
 -> (CohortVariables,
     (Cohort (TVar CursorVariableValues), CohortVariables,
      [Subscriber])))
-> [(CohortExecutionDetails,
     (CohortVariables,
      (Cohort (TVar CursorVariableValues), CohortVariables,
       [Subscriber])))]
-> [(CohortVariables,
     (Cohort (TVar CursorVariableValues), CohortVariables,
      [Subscriber]))]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(CohortExecutionDetails,
  (CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber])))]
cohortsExecutionDetails
          batchExecDetails :: BatchExecutionDetails
batchExecDetails =
            DiffTime
-> DiffTime
-> BatchId
-> [CohortExecutionDetails]
-> Maybe Int
-> BatchExecutionDetails
BatchExecutionDetails
              DiffTime
queryExecutionTime
              DiffTime
pushTime
              BatchId
batchId
              ((CohortExecutionDetails,
 (CohortVariables,
  (Cohort (TVar CursorVariableValues), CohortVariables,
   [Subscriber])))
-> CohortExecutionDetails
forall a b. (a, b) -> a
fst ((CohortExecutionDetails,
  (CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber])))
 -> CohortExecutionDetails)
-> [(CohortExecutionDetails,
     (CohortVariables,
      (Cohort (TVar CursorVariableValues), CohortVariables,
       [Subscriber])))]
-> [CohortExecutionDetails]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(CohortExecutionDetails,
  (CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber])))]
cohortsExecutionDetails)
              Maybe Int
batchResponseSize
      (BatchExecutionDetails,
 [(CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber]))])
-> IO
     (BatchExecutionDetails,
      [(CohortVariables,
        (Cohort (TVar CursorVariableValues), CohortVariables,
         [Subscriber]))])
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((BatchExecutionDetails,
  [(CohortVariables,
    (Cohort (TVar CursorVariableValues), CohortVariables,
     [Subscriber]))])
 -> IO
      (BatchExecutionDetails,
       [(CohortVariables,
         (Cohort (TVar CursorVariableValues), CohortVariables,
          [Subscriber]))]))
-> (BatchExecutionDetails,
    [(CohortVariables,
      (Cohort (TVar CursorVariableValues), CohortVariables,
       [Subscriber]))])
-> IO
     (BatchExecutionDetails,
      [(CohortVariables,
        (Cohort (TVar CursorVariableValues), CohortVariables,
         [Subscriber]))])
forall a b. (a -> b) -> a -> b
$ (BatchExecutionDetails
batchExecDetails, [(CohortVariables,
  (Cohort (TVar CursorVariableValues), CohortVariables,
   [Subscriber]))]
processedCohortBatch)

    (DiffTime,
 [(BatchExecutionDetails,
   [(CohortVariables,
     (Cohort (TVar CursorVariableValues), CohortVariables,
      [Subscriber]))])])
-> IO
     (DiffTime,
      [(BatchExecutionDetails,
        [(CohortVariables,
          (Cohort (TVar CursorVariableValues), CohortVariables,
           [Subscriber]))])])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DiffTime
snapshotTime, [(BatchExecutionDetails,
  [(CohortVariables,
    (Cohort (TVar CursorVariableValues), CohortVariables,
     [Subscriber]))])]
batchesDetailsAndProcessedCohorts)

  let pollDetails :: PollDetails
pollDetails =
        PollDetails :: PollerId
-> Text
-> DiffTime
-> [BatchExecutionDetails]
-> DiffTime
-> SubscriptionsOptions
-> SourceName
-> RoleName
-> ParameterizedQueryHash
-> PollDetails
PollDetails
          { _pdPollerId :: PollerId
_pdPollerId = PollerId
pollerId,
            _pdGeneratedSql :: Text
_pdGeneratedSql = MultiplexedQuery b -> Text
forall a. ToTxt a => a -> Text
toTxt MultiplexedQuery b
query,
            _pdSnapshotTime :: DiffTime
_pdSnapshotTime = DiffTime
snapshotTime,
            _pdBatches :: [BatchExecutionDetails]
_pdBatches = (BatchExecutionDetails,
 [(CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber]))])
-> BatchExecutionDetails
forall a b. (a, b) -> a
fst ((BatchExecutionDetails,
  [(CohortVariables,
    (Cohort (TVar CursorVariableValues), CohortVariables,
     [Subscriber]))])
 -> BatchExecutionDetails)
-> [(BatchExecutionDetails,
     [(CohortVariables,
       (Cohort (TVar CursorVariableValues), CohortVariables,
        [Subscriber]))])]
-> [BatchExecutionDetails]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(BatchExecutionDetails,
  [(CohortVariables,
    (Cohort (TVar CursorVariableValues), CohortVariables,
     [Subscriber]))])]
batchesDetailsAndProcessedCohorts,
            _pdLiveQueryOptions :: SubscriptionsOptions
_pdLiveQueryOptions = SubscriptionsOptions
lqOpts,
            _pdTotalTime :: DiffTime
_pdTotalTime = DiffTime
totalTime,
            _pdSource :: SourceName
_pdSource = SourceName
sourceName,
            _pdRole :: RoleName
_pdRole = RoleName
roleName,
            _pdParameterizedQueryHash :: ParameterizedQueryHash
_pdParameterizedQueryHash = ParameterizedQueryHash
parameterizedQueryHash
          }

  STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    -- constructing a cohort map for all the cohorts that have been
    -- processed in the current poll

    -- processed cohorts is an array of tuples of the current poll cohort variables and a tuple
    -- of the cohort and the new cohort key
    let processedCohortsMap :: HashMap
  CohortVariables
  (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortsMap = [(CohortVariables,
  (Cohort (TVar CursorVariableValues), CohortVariables,
   [Subscriber]))]
-> HashMap
     CohortVariables
     (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
Map.fromList ([(CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber]))]
 -> HashMap
      CohortVariables
      (Cohort (TVar CursorVariableValues), CohortVariables,
       [Subscriber]))
-> [(CohortVariables,
     (Cohort (TVar CursorVariableValues), CohortVariables,
      [Subscriber]))]
-> HashMap
     CohortVariables
     (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
forall a b. (a -> b) -> a -> b
$ (BatchExecutionDetails,
 [(CohortVariables,
   (Cohort (TVar CursorVariableValues), CohortVariables,
    [Subscriber]))])
-> [(CohortVariables,
     (Cohort (TVar CursorVariableValues), CohortVariables,
      [Subscriber]))]
forall a b. (a, b) -> b
snd ((BatchExecutionDetails,
  [(CohortVariables,
    (Cohort (TVar CursorVariableValues), CohortVariables,
     [Subscriber]))])
 -> [(CohortVariables,
      (Cohort (TVar CursorVariableValues), CohortVariables,
       [Subscriber]))])
-> [(BatchExecutionDetails,
     [(CohortVariables,
       (Cohort (TVar CursorVariableValues), CohortVariables,
        [Subscriber]))])]
-> [(CohortVariables,
     (Cohort (TVar CursorVariableValues), CohortVariables,
      [Subscriber]))]
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< [(BatchExecutionDetails,
  [(CohortVariables,
    (Cohort (TVar CursorVariableValues), CohortVariables,
     [Subscriber]))])]
batchesDetailsAndProcessedCohorts

    -- rebuilding the cohorts and the cohort map, see [Streaming subscription polling]
    -- and [Streaming subscriptions rebuilding cohort map]
    [(CohortVariables, Cohort (TVar CursorVariableValues))]
currentCohorts <- TMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap CohortVariables (Cohort (TVar CursorVariableValues))
CohortMap 'Streaming
cohortMap
    HashMap CohortVariables (Cohort (TVar CursorVariableValues))
updatedCohortsMap <-
      (HashMap CohortVariables (Cohort (TVar CursorVariableValues))
 -> (CohortVariables, Cohort (TVar CursorVariableValues))
 -> STM
      (HashMap CohortVariables (Cohort (TVar CursorVariableValues))))
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> STM
     (HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m b
foldM
        ( \HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap (CohortVariables
currentCohortKey, Cohort (TVar CursorVariableValues)
currentCohort) -> do
            let processedCohortMaybe :: Maybe
  (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortMaybe = CohortVariables
-> HashMap
     CohortVariables
     (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
-> Maybe
     (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
Map.lookup CohortVariables
currentCohortKey HashMap
  CohortVariables
  (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortsMap
            case Maybe
  (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortMaybe of
              -- A new cohort has been added in the cohort map whilst the
              -- current poll was happening, in this case we just return it
              -- as it is
              Maybe
  (Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
Nothing -> (Cohort (TVar CursorVariableValues)
 -> Cohort (TVar CursorVariableValues)
 -> STM (Cohort (TVar CursorVariableValues)))
-> CohortVariables
-> Cohort (TVar CursorVariableValues)
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
     (HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (m :: * -> *) k v.
(Monad m, Hashable k, Eq k) =>
(v -> v -> m v) -> k -> v -> HashMap k v -> m (HashMap k v)
Map.insertWithM Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
Cohort 'Streaming -> Cohort 'Streaming -> STM (Cohort 'Streaming)
mergeCohorts CohortVariables
currentCohortKey Cohort (TVar CursorVariableValues)
currentCohort HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap
              Just (Cohort (TVar CursorVariableValues)
processedCohort, CohortVariables
updatedCohortKey, [Subscriber]
snapshottedNewSubs) -> do
                Cohort (TVar CursorVariableValues) -> [Subscriber] -> STM ()
forall streamCursorVars.
Cohort streamCursorVars -> [Subscriber] -> STM ()
updateCohortSubscribers Cohort (TVar CursorVariableValues)
currentCohort [Subscriber]
snapshottedNewSubs
                [(SubscriberId, Subscriber)]
currentCohortExistingSubscribers <- TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList (TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)])
-> TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)]
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cExistingSubscribers Cohort (TVar CursorVariableValues)
currentCohort
                HashMap SubscriberId Subscriber
newlyAddedSubscribers <- TMap SubscriberId Subscriber
-> STM (HashMap SubscriberId Subscriber)
forall k v. TMap k v -> STM (HashMap k v)
TMap.getMap (TMap SubscriberId Subscriber
 -> STM (HashMap SubscriberId Subscriber))
-> TMap SubscriberId Subscriber
-> STM (HashMap SubscriberId Subscriber)
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
currentCohort
                -- The newly added subscribers should not be added to the updated cohort, they should be added
                -- to the old cohort because they need to be procesed for the first time with their initial value.
                -- For example: let's take 2 cohorts,
                -- s means a subscriber
                -- C1 - [s1, s2]
                -- C2 -> [s3]
                -- and S4 is added to C1 and S5 is added to C2 during the poll.
                --
                -- Let's say C1 is updated to C2 and C2 to C3, then
                -- the updated cohort map should look like:
                -- C1 -> [s4]
                -- C2 -> [s1, s2, s5]
                -- C3 -> [s3]
                --
                -- Note that s4 and s5 have not been added to the updated cohort and instead
                -- are in the original cohort they were added in.

                -- all the existing subsribers are removed from the current cohort and
                -- the newly added subscribers are added back
                HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMapWithCurrentCohort <-
                  if HashMap SubscriberId Subscriber -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null HashMap SubscriberId Subscriber
newlyAddedSubscribers
                    then HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
     (HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (f :: * -> *) a. Applicative f => a -> f a
pure HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap
                    else do
                      -- Creating a new cohort which will only contain the newly added subscribers
                      Cohort (TVar CursorVariableValues)
newCohort <- do
                        TMap SubscriberId Subscriber
existingSubs <- STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new
                        TMap SubscriberId Subscriber
newSubs <- STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new
                        Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Cohort (TVar CursorVariableValues)
 -> STM (Cohort (TVar CursorVariableValues)))
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
forall a b. (a -> b) -> a -> b
$
                          CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> TVar CursorVariableValues
-> Cohort (TVar CursorVariableValues)
forall streamCursorVars.
CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> streamCursorVars
-> Cohort streamCursorVars
C.Cohort
                            (Cohort (TVar CursorVariableValues) -> CohortId
forall streamCursorVars. Cohort streamCursorVars -> CohortId
C._cCohortId Cohort (TVar CursorVariableValues)
currentCohort)
                            (Cohort (TVar CursorVariableValues) -> TVar (Maybe ResponseHash)
forall streamCursorVars.
Cohort streamCursorVars -> TVar (Maybe ResponseHash)
C._cPreviousResponse Cohort (TVar CursorVariableValues)
currentCohort)
                            TMap SubscriberId Subscriber
existingSubs
                            TMap SubscriberId Subscriber
newSubs
                            (Cohort (TVar CursorVariableValues) -> TVar CursorVariableValues
forall streamCursorVars.
Cohort streamCursorVars -> streamCursorVars
C._cStreamCursorVariables Cohort (TVar CursorVariableValues)
currentCohort)
                      TMap SubscriberId Subscriber
-> HashMap SubscriberId Subscriber -> STM ()
forall k v. TMap k v -> HashMap k v -> STM ()
TMap.replace (Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
newCohort) HashMap SubscriberId Subscriber
newlyAddedSubscribers
                      (Cohort (TVar CursorVariableValues)
 -> Cohort (TVar CursorVariableValues)
 -> STM (Cohort (TVar CursorVariableValues)))
-> CohortVariables
-> Cohort (TVar CursorVariableValues)
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
     (HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (m :: * -> *) k v.
(Monad m, Hashable k, Eq k) =>
(v -> v -> m v) -> k -> v -> HashMap k v -> m (HashMap k v)
Map.insertWithM Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
Cohort 'Streaming -> Cohort 'Streaming -> STM (Cohort 'Streaming)
mergeCohorts CohortVariables
currentCohortKey Cohort (TVar CursorVariableValues)
newCohort HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap
                let allCurrentSubscribers :: HashSet SubscriberId
allCurrentSubscribers = [SubscriberId] -> HashSet SubscriberId
forall a. (Eq a, Hashable a) => [a] -> HashSet a
Set.fromList ([SubscriberId] -> HashSet SubscriberId)
-> [SubscriberId] -> HashSet SubscriberId
forall a b. (a -> b) -> a -> b
$ (SubscriberId, Subscriber) -> SubscriberId
forall a b. (a, b) -> a
fst ((SubscriberId, Subscriber) -> SubscriberId)
-> [(SubscriberId, Subscriber)] -> [SubscriberId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (HashMap SubscriberId Subscriber -> [(SubscriberId, Subscriber)]
forall k v. HashMap k v -> [(k, v)]
Map.toList HashMap SubscriberId Subscriber
newlyAddedSubscribers [(SubscriberId, Subscriber)]
-> [(SubscriberId, Subscriber)] -> [(SubscriberId, Subscriber)]
forall a. Semigroup a => a -> a -> a
<> [(SubscriberId, Subscriber)]
currentCohortExistingSubscribers)
                -- retain subscribers only if they still exist in the original cohort's subscriber.
                -- It may happen that a subscriber has stopped their subscription which means it will
                -- no longer exist in the cohort map, so we need to accordingly remove such subscribers
                -- from our processed cohorts.
                (SubscriberId -> Subscriber -> Bool)
-> TMap SubscriberId Subscriber -> STM ()
forall k v. (k -> v -> Bool) -> TMap k v -> STM ()
TMap.filterWithKey (\SubscriberId
k Subscriber
_ -> SubscriberId
k SubscriberId -> HashSet SubscriberId -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` HashSet SubscriberId
allCurrentSubscribers) (TMap SubscriberId Subscriber -> STM ())
-> TMap SubscriberId Subscriber -> STM ()
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cExistingSubscribers Cohort (TVar CursorVariableValues)
processedCohort
                (SubscriberId -> Subscriber -> Bool)
-> TMap SubscriberId Subscriber -> STM ()
forall k v. (k -> v -> Bool) -> TMap k v -> STM ()
TMap.filterWithKey (\SubscriberId
k Subscriber
_ -> SubscriberId
k SubscriberId -> HashSet SubscriberId -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` HashSet SubscriberId
allCurrentSubscribers) (TMap SubscriberId Subscriber -> STM ())
-> TMap SubscriberId Subscriber -> STM ()
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
processedCohort
                (Cohort (TVar CursorVariableValues)
 -> Cohort (TVar CursorVariableValues)
 -> STM (Cohort (TVar CursorVariableValues)))
-> CohortVariables
-> Cohort (TVar CursorVariableValues)
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
     (HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (m :: * -> *) k v.
(Monad m, Hashable k, Eq k) =>
(v -> v -> m v) -> k -> v -> HashMap k v -> m (HashMap k v)
Map.insertWithM Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
Cohort 'Streaming -> Cohort 'Streaming -> STM (Cohort 'Streaming)
mergeCohorts CohortVariables
updatedCohortKey Cohort (TVar CursorVariableValues)
processedCohort HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMapWithCurrentCohort
        )
        HashMap CohortVariables (Cohort (TVar CursorVariableValues))
forall a. Monoid a => a
mempty
        [(CohortVariables, Cohort (TVar CursorVariableValues))]
currentCohorts
    TMap CohortVariables (Cohort (TVar CursorVariableValues))
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM ()
forall k v. TMap k v -> HashMap k v -> STM ()
TMap.replace TMap CohortVariables (Cohort (TVar CursorVariableValues))
CohortMap 'Streaming
cohortMap HashMap CohortVariables (Cohort (TVar CursorVariableValues))
updatedCohortsMap
  SubscriptionPostPollHook
postPollHook PollDetails
pollDetails
  where
    SubscriptionsOptions BatchSize
batchSize RefetchInterval
_ = SubscriptionsOptions
lqOpts

    updateCohortSubscribers :: Cohort streamCursorVars -> [Subscriber] -> STM ()
updateCohortSubscribers (C.Cohort CohortId
_id TVar (Maybe ResponseHash)
_respRef TMap SubscriberId Subscriber
curOpsTV TMap SubscriberId Subscriber
newOpsTV streamCursorVars
_) [Subscriber]
snapshottedNewSubs = do
      [(SubscriberId, Subscriber)]
allNewOpsL <- TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap SubscriberId Subscriber
newOpsTV
      let snapshottedNewSubsSet :: HashSet SubscriberId
snapshottedNewSubsSet = [SubscriberId] -> HashSet SubscriberId
forall a. (Eq a, Hashable a) => [a] -> HashSet a
Set.fromList ([SubscriberId] -> HashSet SubscriberId)
-> [SubscriberId] -> HashSet SubscriberId
forall a b. (a -> b) -> a -> b
$ Subscriber -> SubscriberId
_sId (Subscriber -> SubscriberId) -> [Subscriber] -> [SubscriberId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Subscriber]
snapshottedNewSubs
      [(SubscriberId, Subscriber)]
-> ((SubscriberId, Subscriber) -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(SubscriberId, Subscriber)]
allNewOpsL (((SubscriberId, Subscriber) -> STM ()) -> STM ())
-> ((SubscriberId, Subscriber) -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \(SubscriberId
subId, Subscriber
subscriber) ->
        Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SubscriberId
subId SubscriberId -> HashSet SubscriberId -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` HashSet SubscriberId
snapshottedNewSubsSet) do
          -- we only add the snapshotted new subscribers to the current subscribers
          -- because they have been sent the first message of the subscription. The
          -- new subscribers apart from the snapshotted new subscribers are yet to
          -- recieve their first message so we just keep them as new subscribers
          Subscriber
-> SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. (Eq k, Hashable k) => v -> k -> TMap k v -> STM ()
TMap.insert Subscriber
subscriber SubscriberId
subId TMap SubscriberId Subscriber
curOpsTV
          SubscriberId -> TMap SubscriberId Subscriber -> STM ()
forall k v. (Eq k, Hashable k) => k -> TMap k v -> STM ()
TMap.delete SubscriberId
subId TMap SubscriberId Subscriber
newOpsTV

    getCohortSnapshot :: (CohortVariables, Cohort streamCursorVars)
-> STM (CohortId, (CohortSnapshot, Cohort streamCursorVars))
getCohortSnapshot (CohortVariables
cohortVars, Cohort streamCursorVars
cohort) = do
      let C.Cohort CohortId
resId TVar (Maybe ResponseHash)
respRef TMap SubscriberId Subscriber
curOpsTV TMap SubscriberId Subscriber
newOpsTV streamCursorVars
_ = Cohort streamCursorVars
cohort
      [(SubscriberId, Subscriber)]
curOpsL <- TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap SubscriberId Subscriber
curOpsTV
      [(SubscriberId, Subscriber)]
newOpsL <- TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap SubscriberId Subscriber
newOpsTV
      let cohortSnapshot :: CohortSnapshot
cohortSnapshot = CohortVariables
-> TVar (Maybe ResponseHash)
-> [Subscriber]
-> [Subscriber]
-> CohortSnapshot
C.CohortSnapshot CohortVariables
cohortVars TVar (Maybe ResponseHash)
respRef (((SubscriberId, Subscriber) -> Subscriber)
-> [(SubscriberId, Subscriber)] -> [Subscriber]
forall a b. (a -> b) -> [a] -> [b]
map (SubscriberId, Subscriber) -> Subscriber
forall a b. (a, b) -> b
snd [(SubscriberId, Subscriber)]
curOpsL) (((SubscriberId, Subscriber) -> Subscriber)
-> [(SubscriberId, Subscriber)] -> [Subscriber]
forall a b. (a -> b) -> [a] -> [b]
map (SubscriberId, Subscriber) -> Subscriber
forall a b. (a, b) -> b
snd [(SubscriberId, Subscriber)]
newOpsL)
      (CohortId, (CohortSnapshot, Cohort streamCursorVars))
-> STM (CohortId, (CohortSnapshot, Cohort streamCursorVars))
forall (m :: * -> *) a. Monad m => a -> m a
return (CohortId
resId, (CohortSnapshot
cohortSnapshot, Cohort streamCursorVars
cohort))

    mergeCohorts :: Cohort 'Streaming -> Cohort 'Streaming -> STM.STM (Cohort 'Streaming)
    mergeCohorts :: Cohort 'Streaming -> Cohort 'Streaming -> STM (Cohort 'Streaming)
mergeCohorts Cohort 'Streaming
newCohort Cohort 'Streaming
oldCohort = do
      let newCohortExistingSubscribers :: TMap SubscriberId Subscriber
newCohortExistingSubscribers = Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cExistingSubscribers Cohort (TVar CursorVariableValues)
Cohort 'Streaming
newCohort
          oldCohortExistingSubscribers :: TMap SubscriberId Subscriber
oldCohortExistingSubscribers = Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cExistingSubscribers Cohort (TVar CursorVariableValues)
Cohort 'Streaming
oldCohort
          newCohortNewSubscribers :: TMap SubscriberId Subscriber
newCohortNewSubscribers = Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
Cohort 'Streaming
newCohort
          oldCohortNewSubscribers :: TMap SubscriberId Subscriber
oldCohortNewSubscribers = Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
Cohort 'Streaming
oldCohort
      TMap SubscriberId Subscriber
mergedExistingSubscribers <- TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> STM (TMap SubscriberId Subscriber)
forall k v.
(Eq k, Hashable k) =>
TMap k v -> TMap k v -> STM (TMap k v)
TMap.union TMap SubscriberId Subscriber
newCohortExistingSubscribers TMap SubscriberId Subscriber
oldCohortExistingSubscribers
      TMap SubscriberId Subscriber
mergedNewSubscribers <- TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> STM (TMap SubscriberId Subscriber)
forall k v.
(Eq k, Hashable k) =>
TMap k v -> TMap k v -> STM (TMap k v)
TMap.union TMap SubscriberId Subscriber
newCohortNewSubscribers TMap SubscriberId Subscriber
oldCohortNewSubscribers
      Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Cohort (TVar CursorVariableValues)
 -> STM (Cohort (TVar CursorVariableValues)))
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
forall a b. (a -> b) -> a -> b
$
        Cohort (TVar CursorVariableValues)
Cohort 'Streaming
newCohort
          { _cNewSubscribers :: TMap SubscriberId Subscriber
C._cNewSubscribers = TMap SubscriberId Subscriber
mergedNewSubscribers,
            _cExistingSubscribers :: TMap SubscriberId Subscriber
C._cExistingSubscribers = TMap SubscriberId Subscriber
mergedExistingSubscribers
          }

    getCohortOperations :: [(k, t)]
-> Either QErr [(k, ByteString, a)]
-> [(m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t)]
getCohortOperations [(k, t)]
cohorts = \case
      Left QErr
e ->
        let resp :: m ByteString
resp = GQExecError -> m ByteString
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (GQExecError -> m ByteString) -> GQExecError -> m ByteString
forall a b. (a -> b) -> a -> b
$ [Value] -> GQExecError
GQExecError [Bool -> QErr -> Value
encodeGQLErr Bool
False QErr
e]
         in [(m ByteString
resp, k
cohortId, Maybe (ResponseHash, Int)
forall a. Maybe a
Nothing, Maybe a
forall a. Maybe a
Nothing, t
snapshot) | (k
cohortId, t
snapshot) <- [(k, t)]
cohorts]
      Right [(k, ByteString, a)]
responses -> do
        let cohortSnapshotMap :: HashMap k t
cohortSnapshotMap = [(k, t)] -> HashMap k t
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
Map.fromList [(k, t)]
cohorts
        -- every row of the response will contain the cohortId, response of the query and the latest value of the cursor for the cohort
        (((k, ByteString, a)
  -> Maybe (m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t))
 -> [(k, ByteString, a)]
 -> [(m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t)])
-> [(k, ByteString, a)]
-> ((k, ByteString, a)
    -> Maybe (m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t))
-> [(m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t)]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((k, ByteString, a)
 -> Maybe (m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t))
-> [(k, ByteString, a)]
-> [(m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t)]
forall (f :: * -> *) a b.
Filterable f =>
(a -> Maybe b) -> f a -> f b
mapMaybe [(k, ByteString, a)]
responses (((k, ByteString, a)
  -> Maybe (m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t))
 -> [(m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t)])
-> ((k, ByteString, a)
    -> Maybe (m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t))
-> [(m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t)]
forall a b. (a -> b) -> a -> b
$ \(k
cohortId, ByteString
respBS, a
respCursorLatestValue) ->
          let respHash :: ResponseHash
respHash = ByteString -> ResponseHash
mkRespHash ByteString
respBS
              respSize :: Int
respSize = ByteString -> Int
BS.length ByteString
respBS
           in -- TODO: currently we ignore the cases when the cohortId from
              -- Postgres response is not present in the cohort map of this batch
              -- (this shouldn't happen but if it happens it means a logic error and
              -- we should log it)
              (ByteString -> m ByteString
forall (f :: * -> *) a. Applicative f => a -> f a
pure ByteString
respBS,k
cohortId,(ResponseHash, Int) -> Maybe (ResponseHash, Int)
forall a. a -> Maybe a
Just (ResponseHash
respHash, Int
respSize),a -> Maybe a
forall a. a -> Maybe a
Just a
respCursorLatestValue,)
                (t -> (m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t))
-> Maybe t
-> Maybe (m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> k -> HashMap k t -> Maybe t
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
Map.lookup k
cohortId HashMap k t
cohortSnapshotMap