{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}
module Hasura.GraphQL.Execute.Subscription.Poll.StreamingQuery
(
pollStreamingQuery,
)
where
import Control.Concurrent.Async qualified as A
import Control.Concurrent.STM qualified as STM
import Control.Lens
import Data.ByteString qualified as BS
import Data.HashMap.Strict qualified as Map
import Data.HashMap.Strict.Extended qualified as Map
import Data.HashSet qualified as Set
import Data.List.Split (chunksOf)
import Data.Monoid (Sum (..))
import Data.Text.Extended
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Execute.Subscription.Plan qualified as C
import Hasura.GraphQL.Execute.Subscription.Poll.Common hiding (Cohort (..), CohortMap, CohortSnapshot (..))
import Hasura.GraphQL.Execute.Subscription.Poll.Common qualified as C
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.Execute.Subscription.Types
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Prelude
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common (SourceName)
import Hasura.RQL.Types.Numeric qualified as Numeric
import Hasura.RQL.Types.Subscription (SubscriptionType (..))
import Hasura.SQL.Value (TxtEncodedVal (..))
import Hasura.Session
import Language.GraphQL.Draft.Syntax qualified as G
import Text.Shakespeare.Text (st)
-- | Merge the cursor values of the previous poll with the cursor values
-- produced by the current poll.
--
-- The two maps are combined key-wise with 'Map.unionWith':
--
--   * a key present in only one of the maps keeps its value unchanged;
--   * for a key present in both maps, the current value wins unless it is
--     'TENull', in which case the previous value is retained.
--
-- The first argument holds the previous poll's values, the second the
-- current poll's values.
mergeOldAndNewCursorValues :: CursorVariableValues -> CursorVariableValues -> CursorVariableValues
mergeOldAndNewCursorValues (CursorVariableValues prevPollCursorValues) (CursorVariableValues currentPollCursorValues) =
  let -- 'Map.unionWith' passes the value from the first map (previous poll)
      -- as the first argument and the value from the second map (current
      -- poll) as the second one.
      combineFn previousVal currentVal =
        case currentVal of
          -- a NULL in the current poll must not overwrite the previous value
          TENull -> previousVal
          TELit t -> TELit t
   in CursorVariableValues $ Map.unionWith combineFn prevPollCursorValues currentPollCursorValues
-- | Push the result of a poll to the subscribers of a single streaming cohort.
--
-- The cohort snapshot's stored response hash is read first. If the result is
-- an execution error, or its hash differs from the stored one, the stored
-- hash and the cohort's stream cursor variables are updated in one STM
-- transaction and both the new and the existing subscribers are selected for
-- pushing. Otherwise only the new subscribers are selected and the existing
-- ones are reported as ignored.
--
-- Nothing is sent to anyone when the result is byte-for-byte equal to the
-- templated empty response @{"<rootFieldName>" : []}@; the selected
-- subscribers are nevertheless still returned in the first component.
--
-- Returns @(selected for push, ignored)@ as 'SubscriberExecutionDetails'.
pushResultToCohort ::
  -- | Result of the poll for this cohort
  GQResult BS.ByteString ->
  -- | Hash of the result, compared against the hash stored in the snapshot
  Maybe ResponseHash ->
  -- | Timing metadata attached to every response sent to a subscriber
  SubscriptionMetadata ->
  -- | Cursor values written to the cohort when the response has changed
  CursorVariableValues ->
  -- | Root field name, used to build the empty-response template
  G.Name ->
  (CohortSnapshot 'Streaming, Cohort 'Streaming) ->
  IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort result !respHashM (SubscriptionMetadata dTime) cursorValues rootFieldName (cohortSnapshot, cohort) = do
  prevRespHashM <- STM.readTVarIO respRef
  (subscribersToPush, subscribersToIgnore) <-
    -- The response goes to every subscriber only when it changed or when it
    -- is an error.
    if isExecError result || respHashM /= prevRespHashM
      then do
        -- make sure no thunk is written into the mutable variable
        $assertNFHere respHashM
        STM.atomically $ do
          STM.writeTVar respRef respHashM
          STM.writeTVar (C._cStreamCursorVariables cohort) cursorValues
          return (newSinks <> curSinks, mempty)
      else -- unchanged response: only the newly added subscribers receive it
        return (newSinks, curSinks)
  pushResultToSubscribers subscribersToPush
  pure $
    over
      (each . each)
      ( \Subscriber {..} ->
          SubscriberExecutionDetails _sId _sMetadata
      )
      (subscribersToPush, subscribersToIgnore)
  where
    rootFieldNameText = G.unName rootFieldName

    -- Emptiness is detected by comparing the raw bytes against a templated
    -- empty-array response rather than by parsing the response.
    emptyRespBS = txtToBs $ [st|{"#{rootFieldNameText}" : []}|]
    isResponseEmpty = result == Right emptyRespBS

    -- Only the response-hash reference and the two subscriber lists of the
    -- snapshot are needed here.
    C.CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot

    response = result <&> \payload -> SubscriptionResponse payload dTime

    -- Invoke every subscriber's on-change callback concurrently, unless the
    -- response is empty.
    pushResultToSubscribers subscribers =
      unless isResponseEmpty $
        flip A.mapConcurrently_ subscribers $ \Subscriber {..} ->
          _sOnChangeCallback response
pollStreamingQuery ::
forall b.
BackendTransport b =>
PollerId ->
SubscriptionsOptions ->
(SourceName, SourceConfig b) ->
RoleName ->
ParameterizedQueryHash ->
MultiplexedQuery b ->
CohortMap 'Streaming ->
G.Name ->
SubscriptionPostPollHook ->
Maybe (IO ()) ->
IO ()
pollStreamingQuery :: PollerId
-> SubscriptionsOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap 'Streaming
-> Name
-> SubscriptionPostPollHook
-> Maybe (IO ())
-> IO ()
pollStreamingQuery PollerId
pollerId SubscriptionsOptions
lqOpts (SourceName
sourceName, SourceConfig b
sourceConfig) RoleName
roleName ParameterizedQueryHash
parameterizedQueryHash MultiplexedQuery b
query CohortMap 'Streaming
cohortMap Name
rootFieldName SubscriptionPostPollHook
postPollHook Maybe (IO ())
testActionMaybe = do
(DiffTime
totalTime, (DiffTime
snapshotTime, [(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])]
batchesDetailsAndProcessedCohorts)) <- IO
(DiffTime,
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])])
-> IO
(DiffTime,
(DiffTime,
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])]))
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO
(DiffTime,
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])])
-> IO
(DiffTime,
(DiffTime,
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])])))
-> IO
(DiffTime,
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])])
-> IO
(DiffTime,
(DiffTime,
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])]))
forall a b. (a -> b) -> a -> b
$ do
(DiffTime
snapshotTime, [(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
cohortBatches) <- IO
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
(DiffTime,
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
(DiffTime,
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]))
-> IO
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
(DiffTime,
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])])
forall a b. (a -> b) -> a -> b
$ do
[(CohortVariables, Cohort (TVar CursorVariableValues))]
cohorts <- STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> IO [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall a. STM a -> IO a
STM.atomically (STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> IO [(CohortVariables, Cohort (TVar CursorVariableValues))])
-> STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> IO [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall a b. (a -> b) -> a -> b
$ TMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap CohortVariables (Cohort (TVar CursorVariableValues))
CohortMap 'Streaming
cohortMap
[(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohortSnapshots <- ((CohortVariables, Cohort (TVar CursorVariableValues))
-> IO
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> IO
[(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (STM
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> IO
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
forall a. STM a -> IO a
STM.atomically (STM
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> IO
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> ((CohortVariables, Cohort (TVar CursorVariableValues))
-> STM
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues))))
-> (CohortVariables, Cohort (TVar CursorVariableValues))
-> IO
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortVariables, Cohort (TVar CursorVariableValues))
-> STM
(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
forall streamCursorVars.
(CohortVariables, Cohort streamCursorVars)
-> STM (CohortId, (CohortSnapshot, Cohort streamCursorVars))
getCohortSnapshot) [(CohortVariables, Cohort (TVar CursorVariableValues))]
cohorts
let cohortBatches :: [[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
cohortBatches = Int
-> [(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> [[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
forall e. Int -> [e] -> [[e]]
chunksOf (NonNegativeInt -> Int
Numeric.getNonNegativeInt (BatchSize -> NonNegativeInt
unBatchSize BatchSize
batchSize)) [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohortSnapshots
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])])
-> [(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> IO
[(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
forall a b. (a -> b) -> a -> b
$ [BatchId]
-> [[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
-> [(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
forall a b. [a] -> [b] -> [(a, b)]
zip (Int -> BatchId
BatchId (Int -> BatchId) -> [Int] -> [BatchId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Int
1 ..]) [[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]]
cohortBatches
Maybe (IO ()) -> (IO () -> IO ()) -> IO ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
onJust Maybe (IO ())
testActionMaybe IO () -> IO ()
forall a. a -> a
id
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])]
batchesDetailsAndProcessedCohorts <- [(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
-> ((BatchId,
[(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))])
-> IO
(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]))
-> IO
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(BatchId,
[(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))])]
cohortBatches (((BatchId,
[(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))])
-> IO
(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]))
-> IO
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])])
-> ((BatchId,
[(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))])
-> IO
(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]))
-> IO
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])]
forall a b. (a -> b) -> a -> b
$ \(BatchId
batchId, [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohorts) -> do
(DiffTime
queryExecutionTime, Either QErr [(CohortId, ByteString, CursorVariableValues)]
mxRes) <-
SourceConfig b
-> MultiplexedQuery b
-> [(CohortId, CohortVariables)]
-> IO
(DiffTime,
Either QErr [(CohortId, ByteString, CursorVariableValues)])
forall (b :: BackendType) (m :: * -> *).
(BackendTransport b, MonadIO m) =>
SourceConfig b
-> MultiplexedQuery b
-> [(CohortId, CohortVariables)]
-> m (DiffTime,
Either QErr [(CohortId, ByteString, CursorVariableValues)])
runDBStreamingSubscription @b SourceConfig b
sourceConfig MultiplexedQuery b
query ([(CohortId, CohortVariables)]
-> IO
(DiffTime,
Either QErr [(CohortId, ByteString, CursorVariableValues)]))
-> [(CohortId, CohortVariables)]
-> IO
(DiffTime,
Either QErr [(CohortId, ByteString, CursorVariableValues)])
forall a b. (a -> b) -> a -> b
$
ASetter
[(CohortId, CohortSnapshot)]
[(CohortId, CohortVariables)]
CohortSnapshot
CohortVariables
-> (CohortSnapshot -> CohortVariables)
-> [(CohortId, CohortSnapshot)]
-> [(CohortId, CohortVariables)]
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
over (((CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables))
-> [(CohortId, CohortSnapshot)]
-> Identity [(CohortId, CohortVariables)]
forall s t a b. Each s t a b => Traversal s t a b
each (((CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables))
-> [(CohortId, CohortSnapshot)]
-> Identity [(CohortId, CohortVariables)])
-> ((CohortSnapshot -> Identity CohortVariables)
-> (CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables))
-> ASetter
[(CohortId, CohortSnapshot)]
[(CohortId, CohortVariables)]
CohortSnapshot
CohortVariables
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CohortSnapshot -> Identity CohortVariables)
-> (CohortId, CohortSnapshot)
-> Identity (CohortId, CohortVariables)
forall s t a b. Field2 s t a b => Lens s t a b
_2) CohortSnapshot -> CohortVariables
C._csVariables ([(CohortId, CohortSnapshot)] -> [(CohortId, CohortVariables)])
-> [(CohortId, CohortSnapshot)] -> [(CohortId, CohortVariables)]
forall a b. (a -> b) -> a -> b
$
((CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> (CohortId, CohortSnapshot))
-> [(CohortId,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> [(CohortId, CohortSnapshot)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (((CohortSnapshot, Cohort (TVar CursorVariableValues))
-> CohortSnapshot)
-> (CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> (CohortId, CohortSnapshot)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (CohortSnapshot, Cohort (TVar CursorVariableValues))
-> CohortSnapshot
forall a b. (a, b) -> a
fst) [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohorts
let lqMeta :: SubscriptionMetadata
lqMeta = DiffTime -> SubscriptionMetadata
SubscriptionMetadata (DiffTime -> SubscriptionMetadata)
-> DiffTime -> SubscriptionMetadata
forall a b. (a -> b) -> a -> b
$ DiffTime -> DiffTime
forall x y. (Duration x, Duration y) => x -> y
convertDuration DiffTime
queryExecutionTime
operations :: [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
operations = [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> Either QErr [(CohortId, ByteString, CursorVariableValues)]
-> [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
forall (m :: * -> *) k t a.
(MonadError GQExecError m, Eq k, Hashable k) =>
[(k, t)]
-> Either QErr [(k, ByteString, a)]
-> [(m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t)]
getCohortOperations [(CohortId, (CohortSnapshot, Cohort (TVar CursorVariableValues)))]
cohorts Either QErr [(CohortId, ByteString, CursorVariableValues)]
mxRes
batchResponseSize :: Maybe Int
batchResponseSize =
case Either QErr [(CohortId, ByteString, CursorVariableValues)]
mxRes of
Left QErr
_ -> Maybe Int
forall a. Maybe a
Nothing
Right [(CohortId, ByteString, CursorVariableValues)]
resp -> Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$ Sum Int -> Int
forall a. Sum a -> a
getSum (Sum Int -> Int) -> Sum Int -> Int
forall a b. (a -> b) -> a -> b
$ ((CohortId, ByteString, CursorVariableValues) -> Sum Int)
-> [(CohortId, ByteString, CursorVariableValues)] -> Sum Int
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap ((\(CohortId
_, ByteString
sqlResp, CursorVariableValues
_) -> Int -> Sum Int
forall a. a -> Sum a
Sum (Int -> Sum Int) -> (ByteString -> Int) -> ByteString -> Sum Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int
BS.length (ByteString -> Sum Int) -> ByteString -> Sum Int
forall a b. (a -> b) -> a -> b
$ ByteString
sqlResp)) [(CohortId, ByteString, CursorVariableValues)]
resp
(DiffTime
pushTime, [(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
cohortsExecutionDetails) <- IO
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
-> IO
(DiffTime,
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))])
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime (IO
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
-> IO
(DiffTime,
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]))
-> IO
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
-> IO
(DiffTime,
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))])
forall a b. (a -> b) -> a -> b
$
[(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> IO
(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))))
-> IO
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
A.forConcurrently [(GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))]
operations (((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> IO
(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))))
-> IO
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))])
-> ((GQResult ByteString, CohortId, Maybe (ResponseHash, Int),
Maybe CursorVariableValues,
(CohortSnapshot, Cohort (TVar CursorVariableValues)))
-> IO
(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))))
-> IO
[(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
forall a b. (a -> b) -> a -> b
$ \(GQResult ByteString
res, CohortId
cohortId, Maybe (ResponseHash, Int)
respData, Maybe CursorVariableValues
latestCursorValueMaybe, (CohortSnapshot
snapshot, Cohort (TVar CursorVariableValues)
cohort)) -> do
let latestCursorValue :: CursorVariableValues
latestCursorValue@(CursorVariableValues HashMap Name TxtEncodedVal
updatedCursorVarVal) =
let prevCursorVariableValue :: CursorVariableValues
prevCursorVariableValue = HashMap Name TxtEncodedVal -> CursorVariableValues
CursorVariableValues (HashMap Name TxtEncodedVal -> CursorVariableValues)
-> HashMap Name TxtEncodedVal -> CursorVariableValues
forall a b. (a -> b) -> a -> b
$ ValidatedVariables (HashMap Name) -> HashMap Name TxtEncodedVal
forall (f :: * -> *). ValidatedVariables f -> f TxtEncodedVal
C._unValidatedVariables (ValidatedVariables (HashMap Name) -> HashMap Name TxtEncodedVal)
-> ValidatedVariables (HashMap Name) -> HashMap Name TxtEncodedVal
forall a b. (a -> b) -> a -> b
$ CohortVariables -> ValidatedVariables (HashMap Name)
C._cvCursorVariables (CohortVariables -> ValidatedVariables (HashMap Name))
-> CohortVariables -> ValidatedVariables (HashMap Name)
forall a b. (a -> b) -> a -> b
$ CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot
in case Maybe CursorVariableValues
latestCursorValueMaybe of
Maybe CursorVariableValues
Nothing -> CursorVariableValues
prevCursorVariableValue
Just CursorVariableValues
currentPollCursorValue -> CursorVariableValues
-> CursorVariableValues -> CursorVariableValues
mergeOldAndNewCursorValues CursorVariableValues
prevCursorVariableValue CursorVariableValues
currentPollCursorValue
([SubscriberExecutionDetails]
pushedSubscribers, [SubscriberExecutionDetails]
ignoredSubscribers) <-
GQResult ByteString
-> Maybe ResponseHash
-> SubscriptionMetadata
-> CursorVariableValues
-> Name
-> (CohortSnapshot 'Streaming, Cohort 'Streaming)
-> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort GQResult ByteString
res ((ResponseHash, Int) -> ResponseHash
forall a b. (a, b) -> a
fst ((ResponseHash, Int) -> ResponseHash)
-> Maybe (ResponseHash, Int) -> Maybe ResponseHash
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData) SubscriptionMetadata
lqMeta CursorVariableValues
latestCursorValue Name
rootFieldName (CohortSnapshot
CohortSnapshot 'Streaming
snapshot, Cohort (TVar CursorVariableValues)
Cohort 'Streaming
cohort)
let currentCohortKey :: CohortVariables
currentCohortKey = CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot
updatedCohortKey :: CohortVariables
updatedCohortKey = ValidatedVariables (HashMap Name)
-> CohortVariables -> CohortVariables
modifyCursorCohortVariables (HashMap Name TxtEncodedVal -> ValidatedVariables (HashMap Name)
forall (f :: * -> *). f TxtEncodedVal -> ValidatedVariables f
mkUnsafeValidateVariables HashMap Name TxtEncodedVal
updatedCursorVarVal) (CohortVariables -> CohortVariables)
-> CohortVariables -> CohortVariables
forall a b. (a -> b) -> a -> b
$ CohortSnapshot -> CohortVariables
C._csVariables CohortSnapshot
snapshot
snapshottedNewSubs :: [Subscriber]
snapshottedNewSubs = CohortSnapshot -> [Subscriber]
C._csNewSubscribers CohortSnapshot
snapshot
cohortExecutionDetails :: CohortExecutionDetails
cohortExecutionDetails =
CohortExecutionDetails :: CohortId
-> CohortVariables
-> Maybe Int
-> [SubscriberExecutionDetails]
-> [SubscriberExecutionDetails]
-> BatchId
-> CohortExecutionDetails
CohortExecutionDetails
{ _cedCohortId :: CohortId
_cedCohortId = CohortId
cohortId,
_cedVariables :: CohortVariables
_cedVariables = CohortVariables
currentCohortKey,
_cedPushedTo :: [SubscriberExecutionDetails]
_cedPushedTo = [SubscriberExecutionDetails]
pushedSubscribers,
_cedIgnored :: [SubscriberExecutionDetails]
_cedIgnored = [SubscriberExecutionDetails]
ignoredSubscribers,
_cedResponseSize :: Maybe Int
_cedResponseSize = (ResponseHash, Int) -> Int
forall a b. (a, b) -> b
snd ((ResponseHash, Int) -> Int)
-> Maybe (ResponseHash, Int) -> Maybe Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (ResponseHash, Int)
respData,
_cedBatchId :: BatchId
_cedBatchId = BatchId
batchId
}
(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))
-> IO
(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (CohortExecutionDetails
cohortExecutionDetails, (CohortVariables
currentCohortKey, (Cohort (TVar CursorVariableValues)
cohort, CohortVariables
updatedCohortKey, [Subscriber]
snapshottedNewSubs)))
let processedCohortBatch :: [(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]
processedCohortBatch = (CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))
-> (CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))
forall a b. (a, b) -> b
snd ((CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))
-> (CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))
-> [(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
-> [(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
cohortsExecutionDetails
batchExecDetails :: BatchExecutionDetails
batchExecDetails =
DiffTime
-> DiffTime
-> BatchId
-> [CohortExecutionDetails]
-> Maybe Int
-> BatchExecutionDetails
BatchExecutionDetails
DiffTime
queryExecutionTime
DiffTime
pushTime
BatchId
batchId
((CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))
-> CohortExecutionDetails
forall a b. (a, b) -> a
fst ((CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))
-> CohortExecutionDetails)
-> [(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
-> [CohortExecutionDetails]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(CohortExecutionDetails,
(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber])))]
cohortsExecutionDetails)
Maybe Int
batchResponseSize
(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])
-> IO
(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])
-> IO
(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]))
-> (BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])
-> IO
(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])
forall a b. (a -> b) -> a -> b
$ (BatchExecutionDetails
batchExecDetails, [(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]
processedCohortBatch)
(DiffTime,
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])])
-> IO
(DiffTime,
[(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DiffTime
snapshotTime, [(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])]
batchesDetailsAndProcessedCohorts)
let pollDetails :: PollDetails
pollDetails =
PollDetails :: PollerId
-> Text
-> DiffTime
-> [BatchExecutionDetails]
-> DiffTime
-> SubscriptionsOptions
-> SourceName
-> RoleName
-> ParameterizedQueryHash
-> PollDetails
PollDetails
{ _pdPollerId :: PollerId
_pdPollerId = PollerId
pollerId,
_pdGeneratedSql :: Text
_pdGeneratedSql = MultiplexedQuery b -> Text
forall a. ToTxt a => a -> Text
toTxt MultiplexedQuery b
query,
_pdSnapshotTime :: DiffTime
_pdSnapshotTime = DiffTime
snapshotTime,
_pdBatches :: [BatchExecutionDetails]
_pdBatches = (BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])
-> BatchExecutionDetails
forall a b. (a, b) -> a
fst ((BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])
-> BatchExecutionDetails)
-> [(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])]
-> [BatchExecutionDetails]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])]
batchesDetailsAndProcessedCohorts,
_pdLiveQueryOptions :: SubscriptionsOptions
_pdLiveQueryOptions = SubscriptionsOptions
lqOpts,
_pdTotalTime :: DiffTime
_pdTotalTime = DiffTime
totalTime,
_pdSource :: SourceName
_pdSource = SourceName
sourceName,
_pdRole :: RoleName
_pdRole = RoleName
roleName,
_pdParameterizedQueryHash :: ParameterizedQueryHash
_pdParameterizedQueryHash = ParameterizedQueryHash
parameterizedQueryHash
}
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
let processedCohortsMap :: HashMap
CohortVariables
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortsMap = [(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]
-> HashMap
CohortVariables
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
Map.fromList ([(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]
-> HashMap
CohortVariables
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))
-> [(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]
-> HashMap
CohortVariables
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
forall a b. (a -> b) -> a -> b
$ (BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])
-> [(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]
forall a b. (a, b) -> b
snd ((BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])
-> [(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])
-> [(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])]
-> [(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))]
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< [(BatchExecutionDetails,
[(CohortVariables,
(Cohort (TVar CursorVariableValues), CohortVariables,
[Subscriber]))])]
batchesDetailsAndProcessedCohorts
[(CohortVariables, Cohort (TVar CursorVariableValues))]
currentCohorts <- TMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM [(CohortVariables, Cohort (TVar CursorVariableValues))]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList TMap CohortVariables (Cohort (TVar CursorVariableValues))
CohortMap 'Streaming
cohortMap
HashMap CohortVariables (Cohort (TVar CursorVariableValues))
updatedCohortsMap <-
(HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> (CohortVariables, Cohort (TVar CursorVariableValues))
-> STM
(HashMap CohortVariables (Cohort (TVar CursorVariableValues))))
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> [(CohortVariables, Cohort (TVar CursorVariableValues))]
-> STM
(HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m b
foldM
( \HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap (CohortVariables
currentCohortKey, Cohort (TVar CursorVariableValues)
currentCohort) -> do
let processedCohortMaybe :: Maybe
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortMaybe = CohortVariables
-> HashMap
CohortVariables
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
-> Maybe
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
Map.lookup CohortVariables
currentCohortKey HashMap
CohortVariables
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortsMap
case Maybe
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
processedCohortMaybe of
Maybe
(Cohort (TVar CursorVariableValues), CohortVariables, [Subscriber])
Nothing -> (Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues)))
-> CohortVariables
-> Cohort (TVar CursorVariableValues)
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
(HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (m :: * -> *) k v.
(Monad m, Hashable k, Eq k) =>
(v -> v -> m v) -> k -> v -> HashMap k v -> m (HashMap k v)
Map.insertWithM Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
Cohort 'Streaming -> Cohort 'Streaming -> STM (Cohort 'Streaming)
mergeCohorts CohortVariables
currentCohortKey Cohort (TVar CursorVariableValues)
currentCohort HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap
Just (Cohort (TVar CursorVariableValues)
processedCohort, CohortVariables
updatedCohortKey, [Subscriber]
snapshottedNewSubs) -> do
Cohort (TVar CursorVariableValues) -> [Subscriber] -> STM ()
forall streamCursorVars.
Cohort streamCursorVars -> [Subscriber] -> STM ()
updateCohortSubscribers Cohort (TVar CursorVariableValues)
currentCohort [Subscriber]
snapshottedNewSubs
[(SubscriberId, Subscriber)]
currentCohortExistingSubscribers <- TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList (TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)])
-> TMap SubscriberId Subscriber -> STM [(SubscriberId, Subscriber)]
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cExistingSubscribers Cohort (TVar CursorVariableValues)
currentCohort
HashMap SubscriberId Subscriber
newlyAddedSubscribers <- TMap SubscriberId Subscriber
-> STM (HashMap SubscriberId Subscriber)
forall k v. TMap k v -> STM (HashMap k v)
TMap.getMap (TMap SubscriberId Subscriber
-> STM (HashMap SubscriberId Subscriber))
-> TMap SubscriberId Subscriber
-> STM (HashMap SubscriberId Subscriber)
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
currentCohort
HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMapWithCurrentCohort <-
if HashMap SubscriberId Subscriber -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null HashMap SubscriberId Subscriber
newlyAddedSubscribers
then HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
(HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (f :: * -> *) a. Applicative f => a -> f a
pure HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap
else do
Cohort (TVar CursorVariableValues)
newCohort <- do
TMap SubscriberId Subscriber
existingSubs <- STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new
TMap SubscriberId Subscriber
newSubs <- STM (TMap SubscriberId Subscriber)
forall k v. STM (TMap k v)
TMap.new
Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues)))
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
forall a b. (a -> b) -> a -> b
$
CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> TVar CursorVariableValues
-> Cohort (TVar CursorVariableValues)
forall streamCursorVars.
CohortId
-> TVar (Maybe ResponseHash)
-> TMap SubscriberId Subscriber
-> TMap SubscriberId Subscriber
-> streamCursorVars
-> Cohort streamCursorVars
C.Cohort
(Cohort (TVar CursorVariableValues) -> CohortId
forall streamCursorVars. Cohort streamCursorVars -> CohortId
C._cCohortId Cohort (TVar CursorVariableValues)
currentCohort)
(Cohort (TVar CursorVariableValues) -> TVar (Maybe ResponseHash)
forall streamCursorVars.
Cohort streamCursorVars -> TVar (Maybe ResponseHash)
C._cPreviousResponse Cohort (TVar CursorVariableValues)
currentCohort)
TMap SubscriberId Subscriber
existingSubs
TMap SubscriberId Subscriber
newSubs
(Cohort (TVar CursorVariableValues) -> TVar CursorVariableValues
forall streamCursorVars.
Cohort streamCursorVars -> streamCursorVars
C._cStreamCursorVariables Cohort (TVar CursorVariableValues)
currentCohort)
TMap SubscriberId Subscriber
-> HashMap SubscriberId Subscriber -> STM ()
forall k v. TMap k v -> HashMap k v -> STM ()
TMap.replace (Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
newCohort) HashMap SubscriberId Subscriber
newlyAddedSubscribers
(Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues)))
-> CohortVariables
-> Cohort (TVar CursorVariableValues)
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
(HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (m :: * -> *) k v.
(Monad m, Hashable k, Eq k) =>
(v -> v -> m v) -> k -> v -> HashMap k v -> m (HashMap k v)
Map.insertWithM Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
Cohort 'Streaming -> Cohort 'Streaming -> STM (Cohort 'Streaming)
mergeCohorts CohortVariables
currentCohortKey Cohort (TVar CursorVariableValues)
newCohort HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMap
let allCurrentSubscribers :: HashSet SubscriberId
allCurrentSubscribers = [SubscriberId] -> HashSet SubscriberId
forall a. (Eq a, Hashable a) => [a] -> HashSet a
Set.fromList ([SubscriberId] -> HashSet SubscriberId)
-> [SubscriberId] -> HashSet SubscriberId
forall a b. (a -> b) -> a -> b
$ (SubscriberId, Subscriber) -> SubscriberId
forall a b. (a, b) -> a
fst ((SubscriberId, Subscriber) -> SubscriberId)
-> [(SubscriberId, Subscriber)] -> [SubscriberId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (HashMap SubscriberId Subscriber -> [(SubscriberId, Subscriber)]
forall k v. HashMap k v -> [(k, v)]
Map.toList HashMap SubscriberId Subscriber
newlyAddedSubscribers [(SubscriberId, Subscriber)]
-> [(SubscriberId, Subscriber)] -> [(SubscriberId, Subscriber)]
forall a. Semigroup a => a -> a -> a
<> [(SubscriberId, Subscriber)]
currentCohortExistingSubscribers)
(SubscriberId -> Subscriber -> Bool)
-> TMap SubscriberId Subscriber -> STM ()
forall k v. (k -> v -> Bool) -> TMap k v -> STM ()
TMap.filterWithKey (\SubscriberId
k Subscriber
_ -> SubscriberId
k SubscriberId -> HashSet SubscriberId -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` HashSet SubscriberId
allCurrentSubscribers) (TMap SubscriberId Subscriber -> STM ())
-> TMap SubscriberId Subscriber -> STM ()
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cExistingSubscribers Cohort (TVar CursorVariableValues)
processedCohort
(SubscriberId -> Subscriber -> Bool)
-> TMap SubscriberId Subscriber -> STM ()
forall k v. (k -> v -> Bool) -> TMap k v -> STM ()
TMap.filterWithKey (\SubscriberId
k Subscriber
_ -> SubscriberId
k SubscriberId -> HashSet SubscriberId -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` HashSet SubscriberId
allCurrentSubscribers) (TMap SubscriberId Subscriber -> STM ())
-> TMap SubscriberId Subscriber -> STM ()
forall a b. (a -> b) -> a -> b
$ Cohort (TVar CursorVariableValues) -> TMap SubscriberId Subscriber
forall streamCursorVars.
Cohort streamCursorVars -> TMap SubscriberId Subscriber
C._cNewSubscribers Cohort (TVar CursorVariableValues)
processedCohort
(Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues)))
-> CohortVariables
-> Cohort (TVar CursorVariableValues)
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM
(HashMap CohortVariables (Cohort (TVar CursorVariableValues)))
forall (m :: * -> *) k v.
(Monad m, Hashable k, Eq k) =>
(v -> v -> m v) -> k -> v -> HashMap k v -> m (HashMap k v)
Map.insertWithM Cohort (TVar CursorVariableValues)
-> Cohort (TVar CursorVariableValues)
-> STM (Cohort (TVar CursorVariableValues))
Cohort 'Streaming -> Cohort 'Streaming -> STM (Cohort 'Streaming)
mergeCohorts CohortVariables
updatedCohortKey Cohort (TVar CursorVariableValues)
processedCohort HashMap CohortVariables (Cohort (TVar CursorVariableValues))
accCohortMapWithCurrentCohort
)
HashMap CohortVariables (Cohort (TVar CursorVariableValues))
forall a. Monoid a => a
mempty
[(CohortVariables, Cohort (TVar CursorVariableValues))]
currentCohorts
TMap CohortVariables (Cohort (TVar CursorVariableValues))
-> HashMap CohortVariables (Cohort (TVar CursorVariableValues))
-> STM ()
forall k v. TMap k v -> HashMap k v -> STM ()
TMap.replace TMap CohortVariables (Cohort (TVar CursorVariableValues))
CohortMap 'Streaming
cohortMap HashMap CohortVariables (Cohort (TVar CursorVariableValues))
updatedCohortsMap
SubscriptionPostPollHook
postPollHook PollDetails
pollDetails
where
SubscriptionsOptions BatchSize
batchSize RefetchInterval
_ = SubscriptionsOptions
lqOpts
-- | Promote the subscribers that were polled as "new" into the cohort's
-- existing-subscribers map.
--
-- Inferred type: Cohort streamCursorVars -> [Subscriber] -> STM ()
--
-- Only subscribers that are both in @snapshottedNewSubs@ and still present in
-- the cohort's new-subscribers map are moved: a subscriber added after the
-- snapshot stays in the new-subscribers map, and one that has since been
-- removed from that map is not re-added.
updateCohortSubscribers (C.Cohort _id _respRef curOpsTV newOpsTV _) snapshottedNewSubs = do
  allNewOpsL <- TMap.toList newOpsTV
  let snapshottedNewSubsSet = Set.fromList $ _sId <$> snapshottedNewSubs
  forM_ allNewOpsL $ \(subId, subscriber) ->
    -- 'Set.member' is a hash lookup; the Foldable 'elem' used previously
    -- walks the whole set for every subscriber.
    when (subId `Set.member` snapshottedNewSubsSet) $ do
      TMap.insert subscriber subId curOpsTV
      TMap.delete subId newOpsTV
-- | Take a point-in-time snapshot of a cohort inside a single STM transaction.
--
-- Inferred type:
--   (CohortVariables, Cohort streamCursorVars)
--     -> STM (CohortId, (CohortSnapshot, Cohort streamCursorVars))
--
-- The snapshot holds the cohort variables, the reference to the previous
-- response hash, and the current contents of the existing- and
-- new-subscriber maps as plain lists. The cohort itself is returned
-- unchanged next to its snapshot, keyed by the cohort id.
getCohortSnapshot (cohortVars, cohort) = do
  existingSubs <- fmap snd <$> TMap.toList (C._cExistingSubscribers cohort)
  newSubs <- fmap snd <$> TMap.toList (C._cNewSubscribers cohort)
  let snapshot =
        C.CohortSnapshot
          cohortVars
          (C._cPreviousResponse cohort)
          existingSubs
          newSubs
  pure (C._cCohortId cohort, (snapshot, cohort))
-- | Merge two cohorts into one.
--
-- The result is @newCohort@ with its existing- and new-subscriber maps
-- replaced by the union of the corresponding maps of both cohorts; every
-- other field (cohort id, previous response, cursor variables) is taken from
-- @newCohort@ as-is.
mergeCohorts :: Cohort 'Streaming -> Cohort 'Streaming -> STM.STM (Cohort 'Streaming)
mergeCohorts newCohort oldCohort = do
  unionedExisting <- unionOf C._cExistingSubscribers
  unionedNew <- unionOf C._cNewSubscribers
  pure
    newCohort
      { C._cNewSubscribers = unionedNew,
        C._cExistingSubscribers = unionedExisting
      }
  where
    -- Union one subscriber map of both cohorts, new cohort's map first.
    -- NOTE(review): which side wins on a duplicate SubscriberId depends on
    -- 'TMap.union' -- confirm there if it matters.
    unionOf subscribersOf =
      TMap.union (subscribersOf newCohort) (subscribersOf oldCohort)
-- | Pair the outcome of a multiplexed query with the cohorts it was run for.
--
-- Inferred type:
--   [(k, t)]
--     -> Either QErr [(k, ByteString, a)]
--     -> [(m ByteString, k, Maybe (ResponseHash, Int), Maybe a, t)]
--
-- * On @Left@, every cohort in @cohorts@ gets one entry whose response action
--   throws the encoded error, with no response hash/size and no cursor value.
-- * On @Right@, one entry is produced per response row whose cohort id is
--   found in @cohorts@; it carries the raw response, its hash and byte length,
--   and the cursor value from that row. Rows with an unknown cohort id are
--   dropped, and cohorts without a response row produce no entry.
getCohortOperations cohorts = \case
  Left err ->
    let failedResp = throwError $ GQExecError [encodeGQLErr False err]
     in map
          (\(cohortId, snapshot) -> (failedResp, cohortId, Nothing, Nothing, snapshot))
          cohorts
  Right responses ->
    mapMaybe toOperation responses
  where
    -- Lookup table from cohort id to the value paired with it in @cohorts@.
    snapshotsById = Map.fromList cohorts

    -- Build the entry for a single response row; 'Nothing' when the row's
    -- cohort id is not among @cohorts@.
    toOperation (cohortId, respBS, latestCursorValue) = do
      snapshot <- Map.lookup cohortId snapshotsById
      pure
        ( pure respBS,
          cohortId,
          Just (mkRespHash respBS, BS.length respBS),
          Just latestCursorValue,
          snapshot
        )