-- | Shared types and debugging helpers for subscription pollers: pollers,
-- cohorts, subscribers, per-poll execution details and response hashing.
--
-- NOTE(review): 'CursorVariableValues', 'CohortId', 'newCohortId' and
-- 'CohortVariables' are exported here but not defined in this module;
-- presumably they are re-exported from one of the unqualified imports — confirm.
module Hasura.GraphQL.Execute.Subscription.Poll.Common
(
-- * Pollers
Poller (..),
PollerId (..),
PollerIOState (..),
PollerKey (..),
PollerMap,
dumpPollerMap,
PollDetails (..),
BatchExecutionDetails (..),
CohortExecutionDetails (..),
SubscriptionPostPollHook,
defaultSubscriptionPostPollHook,
-- * Cohorts
Cohort (..),
CohortSnapshot (..),
CursorVariableValues (..),
CohortId,
newCohortId,
CohortVariables,
CohortKey,
CohortMap,
-- * Subscribers
Subscriber (..),
SubscriberId,
newSubscriberId,
SubscriberMetadata,
mkSubscriberMetadata,
unSubscriberMetadata,
SubscriberMap,
OnChange,
SubscriptionGQResponse,
SubscriptionResponse (..),
SubscriptionMetadata (..),
SubscriberExecutionDetails (..),
-- * Batch
BatchId (..),
-- * Hash
ResponseHash (..),
mkRespHash,
)
where
import Control.Concurrent.STM qualified as STM
import Control.Immortal qualified as Immortal
import Crypto.Hash qualified as CH
import Data.Aeson qualified as J
import Data.ByteString qualified as BS
import Data.Time.Clock qualified as Clock
import Data.UUID qualified as UUID
import Data.UUID.V4 qualified as UUID
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.GraphQL.Transport.WebSocket.Protocol (OperationId)
import Hasura.GraphQL.Transport.WebSocket.Server qualified as WS
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.Types.Common (SourceName)
import Hasura.Server.Types (RequestId)
import Hasura.Session
import ListT qualified
import StmContainers.Map qualified as STMMap
-- | Identifier of a single subscriber, backed by a UUID.
--
-- The constructor is not exported from this module; fresh identifiers are
-- obtained with 'newSubscriberId'.
newtype SubscriberId = SubscriberId {unSubscriberId :: UUID.UUID}
  deriving (Show, Eq, Generic, Hashable, J.ToJSON)
-- | Generate a fresh 'SubscriberId' from a randomly generated UUID
-- ('UUID.nextRandom' from "Data.UUID.V4").
newSubscriberId :: IO SubscriberId
newSubscriberId =
  SubscriberId <$> UUID.nextRandom
-- | JSON metadata attached to a subscriber.
--
-- The constructor is not exported from this module; values are built with
-- 'mkSubscriberMetadata' and read back with 'unSubscriberMetadata'.
newtype SubscriberMetadata = SubscriberMetadata {unSubscriberMetadata :: J.Value}
  deriving (Show, Eq, J.ToJSON)
-- | Build the 'SubscriberMetadata' for a subscriber as a JSON object with the
-- keys @websocket_id@, @operation_id@, @operation_name@ and @request_id@.
mkSubscriberMetadata :: WS.WSId -> OperationId -> Maybe OperationName -> RequestId -> SubscriberMetadata
mkSubscriberMetadata websocketId operationId operationName reqId =
  SubscriberMetadata $
    J.object
      [ "websocket_id" J..= websocketId,
        "operation_id" J..= operationId,
        "operation_name" J..= operationName,
        "request_id" J..= reqId
      ]
-- | A single subscriber together with the callback used to deliver results
-- to it. All fields are strict.
data Subscriber = Subscriber
  { -- | Identifier of this subscriber.
    _sId :: !SubscriberId,
    -- | JSON metadata describing the subscriber (see 'mkSubscriberMetadata').
    _sMetadata :: !SubscriberMetadata,
    -- | Request id associated with the subscription.
    _sRequestId :: !RequestId,
    -- | Name of the GraphQL operation, when one was given.
    _sOperationName :: !(Maybe OperationName),
    -- | Callback receiving a 'SubscriptionGQResponse'.
    -- NOTE(review): presumably invoked whenever a new result must be pushed
    -- to this subscriber — confirm at the call sites.
    _sOnChangeCallback :: !OnChange
  }
-- | Metadata about a subscription result; currently only an execution time.
data SubscriptionMetadata = SubscriptionMetadata
  { -- | Execution time as a 'Clock.DiffTime'.
    _sqmExecutionTime :: !Clock.DiffTime
  }
-- | A subscription result: the raw payload plus how long it took to produce.
data SubscriptionResponse = SubscriptionResponse
  { -- | Response payload as a strict 'BS.ByteString'.
    _lqrPayload :: !BS.ByteString,
    -- | Execution time as a 'Clock.DiffTime'.
    _lqrExecutionTime :: !Clock.DiffTime
  }
-- | A 'SubscriptionResponse' wrapped in 'GQResult'.
type SubscriptionGQResponse = GQResult SubscriptionResponse
-- | Callback that receives a 'SubscriptionGQResponse' and runs in 'IO';
-- stored on each 'Subscriber' as '_sOnChangeCallback'.
type OnChange = SubscriptionGQResponse -> IO ()
-- | STM-backed map of 'Subscriber's keyed by their 'SubscriberId'.
type SubscriberMap = TMap.TMap SubscriberId Subscriber
-- | A group of subscribers tracked together, parameterised over the type of
-- its stream-cursor variables. All fields are strict.
--
-- NOTE(review): the split between "existing" and "new" subscribers presumably
-- distinguishes subscribers that already received a result from those still
-- waiting for their first one — confirm against the poller implementation.
data Cohort streamCursorVars = Cohort
  { -- | Identifier of this cohort (dumped as @resp_id@ by 'dumpPollerMap').
    _cCohortId :: !CohortId,
    -- | Hash of the previously seen response, if any.
    _cPreviousResponse :: !(STM.TVar (Maybe ResponseHash)),
    -- | Subscribers already attached to the cohort (dumped as @current_ops@).
    _cExistingSubscribers :: !SubscriberMap,
    -- | Newly added subscribers (dumped as @new_ops@).
    _cNewSubscribers :: !SubscriberMap,
    -- | Stream-cursor variables associated with this cohort.
    _cStreamCursorVariables :: !streamCursorVars
  }
-- | Identifier of a batch, wrapping an 'Int'.
newtype BatchId = BatchId {_unBatchId :: Int}
  deriving (Show, Eq, J.ToJSON)
-- | BLAKE2b-256 digest of a response payload; see 'mkRespHash'.
newtype ResponseHash = ResponseHash {unResponseHash :: CH.Digest CH.Blake2b_256}
  deriving (Show, Eq)
-- | Serialised as a JSON string holding the digest rendered through its
-- 'Show' instance.
instance J.ToJSON ResponseHash where
  toJSON = J.toJSON . show . unResponseHash
-- | Hash a response payload with BLAKE2b-256 (via 'CH.hash') and wrap the
-- digest in 'ResponseHash'.
mkRespHash :: BS.ByteString -> ResponseHash
mkRespHash = ResponseHash . CH.hash
-- | Cohorts are keyed by their 'CohortVariables'.
type CohortKey = CohortVariables
-- | STM-backed map from 'CohortKey' to the corresponding 'Cohort'.
type CohortMap streamCursor = TMap.TMap CohortKey (Cohort streamCursor)
-- | Render a 'CohortMap' as a JSON array for debugging.
--
-- Each element is an object with the cohort's @variables@ (its key) and a
-- @cohort@ object holding @resp_id@, @current_ops@, @new_ops@ and
-- @previous_result_hash@.
--
-- The list of cohorts is read in one STM transaction and every cohort is then
-- dumped in its own transaction, so the result is not a single atomic
-- snapshot of the whole map.
dumpCohortMap :: CohortMap streamCursor -> IO J.Value
dumpCohortMap cohortMap = do
  cohorts <- STM.atomically $ TMap.toList cohortMap
  fmap J.toJSON . forM cohorts $ \(variableValues, cohort) -> do
    cohortJ <- dumpCohort cohort
    return $
      J.object
        [ "variables" J..= variableValues,
          "cohort" J..= cohortJ
        ]
  where
    -- Dump one cohort atomically; only subscriber ids are reported, and the
    -- stream-cursor variables are ignored.
    dumpCohort :: Cohort cursorVars -> IO J.Value
    dumpCohort (Cohort respId respTV curOps newOps _) =
      STM.atomically $ do
        prevResHash <- STM.readTVar respTV
        curOpIds <- TMap.toList curOps
        newOpIds <- TMap.toList newOps
        return $
          J.object
            [ "resp_id" J..= respId,
              "current_ops" J..= map fst curOpIds,
              "new_ops" J..= map fst newOpIds,
              "previous_result_hash" J..= prevResHash
            ]
-- | A cohort's contents with its subscribers materialised as plain lists
-- rather than STM maps. All fields are strict.
--
-- NOTE(review): presumably captured at the start of a poll so the poller can
-- work on a stable view — confirm against the poller implementation.
data CohortSnapshot = CohortSnapshot
  { -- | Variables identifying the cohort.
    _csVariables :: !CohortVariables,
    -- | The cohort's previous-response hash cell (shared 'STM.TVar', not a copy).
    _csPreviousResponse :: !(STM.TVar (Maybe ResponseHash)),
    -- | Subscribers taken from the cohort's existing subscribers.
    _csExistingSubscribers :: ![Subscriber],
    -- | Subscribers taken from the cohort's new subscribers.
    _csNewSubscribers :: ![Subscriber]
  }
-- | A poller: the cohorts it serves plus the state of its worker thread.
data Poller streamCursor = Poller
  { -- | Cohorts handled by this poller.
    _pCohorts :: !(CohortMap streamCursor),
    -- | Thread handle and id of the poller, held in a 'STM.TMVar'.
    -- NOTE(review): the 'STM.TMVar' presumably starts empty and is filled once
    -- the worker thread has been forked — confirm where pollers are created.
    _pIOState :: !(STM.TMVar PollerIOState)
  }
-- | The IO-side state of a poller.
data PollerIOState = PollerIOState
  { -- | Handle on the poller's 'Immortal.Thread'.
    _pThread :: !Immortal.Thread,
    -- | Identifier of the poller.
    _pId :: !PollerId
  }
-- | Key under which a 'Poller' is stored in a 'PollerMap': the source, the
-- role and the query text.
data PollerKey = PollerKey
  { -- | Source the query runs against.
    _lgSource :: !SourceName,
    -- | Role the query runs as.
    _lgRole :: !RoleName,
    -- | Query text (reported as @multiplexed_query@ by 'dumpPollerMap').
    _lgQuery :: !Text
  }
  deriving (Show, Eq, Generic)
-- Empty instance body: every method falls back to the class's default
-- implementation. NOTE(review): presumably the Generic-based defaults, since
-- 'PollerKey' derives 'Generic' — confirm.
instance Hashable PollerKey
-- | Serialised as a JSON object with the keys @source@, @role@ and @query@.
instance J.ToJSON PollerKey where
  toJSON (PollerKey source role query) =
    J.object
      [ "source" J..= source,
        "role" J..= role,
        "query" J..= query
      ]
-- | STM map from 'PollerKey' to the 'Poller' registered under that key.
type PollerMap streamCursor = STMMap.Map PollerKey (Poller streamCursor)
-- | Render a 'PollerMap' as a JSON array for debugging.
--
-- Each element is an object with the keys @source@, @role@, @thread_id@,
-- @poller_id@, @multiplexed_query@ and @cohorts@. When the first argument is
-- 'True', @cohorts@ holds the output of the cohort dump for that poller;
-- otherwise it is 'Nothing'.
--
-- The map entries are listed in one STM transaction and each poller is then
-- inspected in separate transactions, so the result is not a single atomic
-- snapshot.
dumpPollerMap :: Bool -> PollerMap streamCursor -> IO J.Value
dumpPollerMap extended pollerMap =
  fmap J.toJSON $ do
    entries <- STM.atomically $ ListT.toList $ STMMap.listT pollerMap
    forM entries $ \(PollerKey source role query, Poller cohortsMap ioState) -> do
      -- 'STM.readTMVar' does not take the value out of the TMVar; it waits
      -- while the TMVar is empty.
      PollerIOState threadId pollerId <- STM.atomically $ STM.readTMVar ioState
      cohortsJ <-
        if extended
          then Just <$> dumpCohortMap cohortsMap
          else return Nothing
      return $
        J.object
          [ "source" J..= source,
            "role" J..= role,
            "thread_id" J..= show (Immortal.threadId threadId),
            "poller_id" J..= pollerId,
            "multiplexed_query" J..= query,
            "cohorts" J..= cohortsJ
          ]
-- | Identifier of a poller, backed by a UUID.
newtype PollerId = PollerId {unPollerId :: UUID.UUID}
  deriving (Show, Eq, Generic, J.ToJSON)
-- | Identification of one subscriber as recorded in
-- 'CohortExecutionDetails'.
data SubscriberExecutionDetails = SubscriberExecutionDetails
  { -- | Identifier of the subscriber.
    _sedSubscriberId :: !SubscriberId,
    -- | Metadata attached to the subscriber.
    _sedSubscriberMetadata :: !SubscriberMetadata
  }
  deriving (Show, Eq)
-- | Per-cohort execution details recorded inside a 'BatchExecutionDetails'.
data CohortExecutionDetails = CohortExecutionDetails
  { -- | Identifier of the cohort.
    _cedCohortId :: !CohortId,
    -- | Variables of the cohort.
    _cedVariables :: !CohortVariables,
    -- | Size of the cohort's response, when available.
    -- NOTE(review): presumably in bytes and 'Nothing' on error — confirm
    -- where this record is constructed.
    _cedResponseSize :: !(Maybe Int),
    -- | Subscribers listed as pushed to.
    _cedPushedTo :: ![SubscriberExecutionDetails],
    -- | Subscribers listed as ignored.
    -- NOTE(review): presumably those whose response did not change — confirm.
    _cedIgnored :: ![SubscriberExecutionDetails],
    -- | Batch this cohort belonged to.
    _cedBatchId :: !BatchId
  }
  deriving (Show, Eq)
-- | Execution details of one batch within a poll; listed in '_pdBatches' of
-- 'PollDetails'.
data BatchExecutionDetails = BatchExecutionDetails
  { -- | Time reported as @pg_execution_time@ in the minimal log output.
    -- NOTE(review): presumably the database query time for the batch — confirm.
    _bedPgExecutionTime :: !Clock.DiffTime,
    -- | Time reported as @push_time@ in the minimal log output.
    -- NOTE(review): presumably the time spent pushing results to
    -- subscribers — confirm.
    _bedPushTime :: !Clock.DiffTime,
    -- | Identifier of the batch.
    _bedBatchId :: !BatchId,
    -- | Details of the cohorts in this batch.
    _bedCohorts :: ![CohortExecutionDetails],
    -- | Size of the batch response in bytes, when available.
    _bedBatchResponseSizeBytes :: !(Maybe Int)
  }
  deriving (Show, Eq)
-- | Reduced JSON rendering of a 'BatchExecutionDetails': always contains
-- @pg_execution_time@ and @push_time@, and adds @batch_response_size_bytes@
-- only when '_bedBatchResponseSizeBytes' is a 'Just'. The batch id and the
-- per-cohort details are left out.
batchExecutionDetailMinimal :: BatchExecutionDetails -> J.Value
batchExecutionDetailMinimal BatchExecutionDetails {..} =
  let -- Zero or one key/value pair, depending on whether a size is present.
      batchRespSize =
        maybe
          mempty
          (\respSize -> ["batch_response_size_bytes" J..= respSize])
          _bedBatchResponseSizeBytes
   in J.object
        ( [ "pg_execution_time" J..= _bedPgExecutionTime,
            "push_time" J..= _bedPushTime
          ]
            <> batchRespSize
        )
-- | Details of one poll, handed to a post-poll hook and logged through the
-- 'L.ToEngineLog' instance for this type.
data PollDetails = PollDetails
  { -- | Identifier of the poller that ran the poll.
    _pdPollerId :: !PollerId,
    -- | SQL text associated with the poll (not part of the minimal log output).
    _pdGeneratedSql :: !Text,
    -- | Time reported as @snapshot_time@.
    -- NOTE(review): presumably the time taken to snapshot the cohorts — confirm.
    _pdSnapshotTime :: !Clock.DiffTime,
    -- | Execution details of every batch in the poll.
    _pdBatches :: ![BatchExecutionDetails],
    -- | Time reported as @total_time@.
    _pdTotalTime :: !Clock.DiffTime,
    -- | Subscription options in effect (not part of the minimal log output).
    _pdLiveQueryOptions :: !SubscriptionsOptions,
    -- | Source the poll ran against.
    _pdSource :: !SourceName,
    -- | Role the poll ran as.
    _pdRole :: !RoleName,
    -- | Parameterized query hash (not part of the minimal log output).
    _pdParameterizedQueryHash :: !ParameterizedQueryHash
  }
  deriving (Show, Eq)
-- | Reduced JSON rendering of a 'PollDetails' with the keys @poller_id@,
-- @snapshot_time@, @batches@ (each rendered by 'batchExecutionDetailMinimal'),
-- @total_time@, @source@ and @role@. The generated SQL, the subscription
-- options and the parameterized query hash are left out.
pollDetailMinimal :: PollDetails -> J.Value
pollDetailMinimal PollDetails {..} =
  J.object
    [ "poller_id" J..= _pdPollerId,
      "snapshot_time" J..= _pdSnapshotTime,
      "batches" J..= map batchExecutionDetailMinimal _pdBatches,
      "total_time" J..= _pdTotalTime,
      "source" J..= _pdSource,
      "role" J..= _pdRole
    ]
-- | Poll details are logged at 'L.LevelInfo' with log type
-- 'L.ELTLivequeryPollerLog', using the reduced rendering produced by
-- 'pollDetailMinimal'.
instance L.ToEngineLog PollDetails L.Hasura where
  toEngineLog pl = (L.LevelInfo, L.ELTLivequeryPollerLog, pollDetailMinimal pl)
-- | Callback that receives the 'PollDetails' of a poll and runs in 'IO'.
-- NOTE(review): presumably invoked once after every poll — confirm at the call site.
type SubscriptionPostPollHook = PollDetails -> IO ()
-- | Default post-poll hook: hands the 'PollDetails' to the given logger, which
-- logs it through its 'L.ToEngineLog' instance.
--
-- Kept as an explicit lambda on purpose: 'L.unLogger' returns a function that
-- is polymorphic in the logged type and the monad, and applying it to the
-- logger lets that result be instantiated at @PollDetails -> IO ()@.
-- NOTE(review): eta-reducing to @= L.unLogger@ presumably fails to type-check
-- under GHC's simplified subsumption — confirm before changing.
defaultSubscriptionPostPollHook :: L.Logger L.Hasura -> SubscriptionPostPollHook
defaultSubscriptionPostPollHook = \x -> L.unLogger x