-- | Multiplexed subscription poller threads; see "Hasura.GraphQL.Execute.Subscription" for details.
module Hasura.GraphQL.Execute.Subscription.Poll.Common
  ( -- * Pollers
    Poller (..),
    PollerId (..),
    PollerIOState (..),
    PollerKey (..),
    PollerMap,
    dumpPollerMap,
    PollDetails (..),
    BatchExecutionDetails (..),
    CohortExecutionDetails (..),
    SubscriptionPostPollHook,
    defaultSubscriptionPostPollHook,

    -- * Cohorts
    Cohort (..),
    CohortSnapshot (..),
    CursorVariableValues (..),
    CohortId,
    newCohortId,
    CohortVariables,
    CohortKey,
    CohortMap,

    -- * Subscribers
    Subscriber (..),
    SubscriberId,
    newSubscriberId,
    SubscriberMetadata,
    mkSubscriberMetadata,
    unSubscriberMetadata,
    SubscriberMap,
    OnChange,
    SubscriptionGQResponse,
    SubscriptionResponse (..),
    SubscriptionMetadata (..),
    SubscriberExecutionDetails (..),

    -- * Batch
    BatchId (..),

    -- * Hash
    ResponseHash (..),
    mkRespHash,
  )
where

import Control.Concurrent.STM qualified as STM
import Control.Immortal qualified as Immortal
import Crypto.Hash qualified as CH
import Data.Aeson qualified as J
import Data.ByteString qualified as BS
import Data.Time.Clock qualified as Clock
import Data.UUID qualified as UUID
import Data.UUID.V4 qualified as UUID
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.GraphQL.Transport.WebSocket.Protocol (OperationId)
import Hasura.GraphQL.Transport.WebSocket.Server qualified as WS
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.Types.Common (SourceName)
import Hasura.Server.Types (RequestId)
import Hasura.Session
import ListT qualified
import StmContainers.Map qualified as STMMap

-- ----------------------------------------------------------------------------------------------
-- Subscribers

-- | Identifier of a single 'Subscriber'; a newtype wrapper around a UUID.
-- Fresh values are produced by 'newSubscriberId'.
newtype SubscriberId = SubscriberId {unSubscriberId :: UUID.UUID}
  deriving (Show, Eq, Generic, Hashable, J.ToJSON)

-- | Create a fresh 'SubscriberId' by wrapping a newly generated random UUID
-- (obtained from 'UUID.nextRandom').
newSubscriberId :: IO SubscriberId
newSubscriberId =
  SubscriberId <$> UUID.nextRandom

-- | Allows a user of the live query subsystem (currently the websocket
-- transport) to attach arbitrary metadata about a subscriber. This information
-- is available as part of the subscriber details in 'CohortExecutionDetails'
-- and can be logged by customizing the poller log.
--
-- The metadata is stored as an opaque JSON value; see 'mkSubscriberMetadata'
-- for the constructor used in this module.
newtype SubscriberMetadata = SubscriberMetadata {unSubscriberMetadata :: J.Value}
  deriving (Show, Eq, J.ToJSON)

-- | Build the 'SubscriberMetadata' for a subscriber as a JSON object with the
-- keys @websocket_id@, @operation_id@, @operation_name@ and @request_id@, each
-- holding the JSON encoding of the corresponding argument.
mkSubscriberMetadata :: WS.WSId -> OperationId -> Maybe OperationName -> RequestId -> SubscriberMetadata
mkSubscriberMetadata websocketId operationId operationName reqId =
  SubscriberMetadata $
    J.object
      [ "websocket_id" J..= websocketId,
        "operation_id" J..= operationId,
        "operation_name" J..= operationName,
        "request_id" J..= reqId
      ]

-- | A single listener attached to a 'Cohort'. All fields are strict.
data Subscriber = Subscriber
  { -- | identifier of this subscriber
    _sId :: !SubscriberId,
    -- | opaque JSON metadata attached to the subscriber (see 'SubscriberMetadata')
    _sMetadata :: !SubscriberMetadata,
    -- | the request id associated with this subscriber
    _sRequestId :: !RequestId,
    -- | the operation name, if one is known
    _sOperationName :: !(Maybe OperationName),
    -- | callback of type 'OnChange'; presumably invoked by the poller when a
    -- result is pushed to this subscriber (the call site is not in this module)
    _sOnChangeCallback :: !OnChange
  }

-- | Subscription onChange metadata, used for adding extra analytics data.
data SubscriptionMetadata = SubscriptionMetadata
  { -- | execution time recorded for the subscription, as a 'Clock.DiffTime'
    _sqmExecutionTime :: !Clock.DiffTime
  }

-- | The successful payload carried by a 'SubscriptionGQResponse'.
data SubscriptionResponse = SubscriptionResponse
  { -- | the raw response bytes
    _lqrPayload :: !BS.ByteString,
    -- | execution time associated with this response
    _lqrExecutionTime :: !Clock.DiffTime
  }

-- | The result delivered to a subscriber: a 'GQResult' specialised to
-- 'SubscriptionResponse'. ('GQResult' is defined in the HTTP protocol module;
-- presumably it distinguishes an execution error from a success -- confirm there.)
type SubscriptionGQResponse = GQResult SubscriptionResponse

-- | Callback type stored in a 'Subscriber': an 'IO' action parameterised by the
-- 'SubscriptionGQResponse' it is given.
type OnChange = SubscriptionGQResponse -> IO ()

-- | A 'TMap.TMap' of 'Subscriber's keyed by their 'SubscriberId'.
type SubscriberMap = TMap.TMap SubscriberId Subscriber

-- -------------------------------------------------------------------------------------------------
-- Cohorts

-- | A batched group of 'Subscriber's who are not only listening to the same query but also have
-- identical session and query variables. Each result pushed to a 'Cohort' is forwarded along to
-- each of its 'Subscriber's.
--
-- In SQL, each 'Cohort' corresponds to a single row in the laterally-joined @_subs@ table (and
-- therefore a single row in the query result).
--
-- See also 'CohortMap'.
data Cohort streamCursorVars = Cohort
  { -- | a unique identifier used to identify the cohort in the generated query
    _cCohortId :: !CohortId,
    -- | a hash of the previous query result, if any, used to determine if we need to push an updated
    -- result to the subscribers or not
    _cPreviousResponse :: !(STM.TVar (Maybe ResponseHash)),
    -- | the subscribers we’ve already pushed a result to; we push new results to them iff the
    -- response changes
    _cExistingSubscribers :: !SubscriberMap,
    -- | subscribers we haven’t yet pushed any results to; we push results to them regardless if the
    -- result changed, then merge them in the map of existing subscribers
    _cNewSubscribers :: !SubscriberMap,
    -- | a mutable type which holds the latest value of the subscription stream cursor. In case
    --   of live query subscription, this field is ignored by setting `streamCursorVars` to `()`
    _cStreamCursorVariables :: !streamCursorVars
  }

-- | The @BatchId@ is a number-based ID that uniquely identifies a batch within a single poll;
--   it is used to identify the batch to which a cohort belongs.
newtype BatchId = BatchId {_unBatchId :: Int}
  deriving (Show, Eq, J.ToJSON)

{- Note [Blake2b faster than SHA-256]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
At the time of writing, from https://blake2.net, it is stated,
"BLAKE2 is a cryptographic hash function faster than MD5, SHA-1, SHA-2, and SHA-3,
yet is at least as secure as the latest standard SHA-3".
-}

-- | A hash used to determine if the result changed without having to keep the entire result in
-- memory. Using a cryptographic hash ensures that a hash collision is almost impossible: with 256
-- bits, even if a subscription changes once per second for an entire year, the probability of a
-- hash collision is ~4.294417×10^-63. See Note [Blake2b faster than SHA-256].
newtype ResponseHash = ResponseHash {unResponseHash :: CH.Digest CH.Blake2b_256}
  deriving (Show, Eq)

-- | Serialises the hash as a JSON string, using the 'show' rendering of the
-- underlying digest.
instance J.ToJSON ResponseHash where
  toJSON = J.toJSON . show . unResponseHash

-- | Hash a response payload into a 'ResponseHash'. The digest algorithm
-- (Blake2b-256) is fixed by the field type of 'ResponseHash'.
mkRespHash :: BS.ByteString -> ResponseHash
mkRespHash = ResponseHash . CH.hash

-- | A key we use to determine if two 'Subscriber's belong in the same 'Cohort'
-- (assuming they already meet the criteria to be in the same 'Poller'). Note
-- the distinction between this and 'CohortId'; the latter is a completely
-- synthetic key used only to identify the cohort in the generated SQL query.
type CohortKey = CohortVariables

-- | Maps each 'CohortKey' to its 'Cohort'; the @streamCursor@ parameter is
-- passed straight through to 'Cohort'.
--
-- This has the invariant, maintained in 'removeLiveQuery', that it contains
-- no 'Cohort' with zero total (existing + new) subscribers.
type CohortMap streamCursor = TMap.TMap CohortKey (Cohort streamCursor)

-- | Render a 'CohortMap' as a JSON array for debugging/introspection.
--
-- Each element is an object with the keys @variables@ (the 'CohortKey') and
-- @cohort@ (the dump of the corresponding 'Cohort').
--
-- The list of cohorts is read in one STM transaction and each cohort is then
-- dumped in its own transaction, so the result is not a single atomic snapshot
-- of the whole map.
dumpCohortMap :: CohortMap streamCursor -> IO J.Value
dumpCohortMap cohortMap = do
  cohorts <- STM.atomically $ TMap.toList cohortMap
  fmap J.toJSON . forM cohorts $ \(variableValues, cohort) -> do
    cohortJ <- dumpCohort cohort
    return $
      J.object
        [ "variables" J..= variableValues,
          "cohort" J..= cohortJ
        ]
  where
    -- Dump a single cohort atomically: its id (under the key "resp_id"), the
    -- ids of its existing and new subscribers, and the hash of the previous
    -- response. The stream cursor variables are not included.
    dumpCohort :: Cohort streamCursorVars -> IO J.Value
    dumpCohort (Cohort respId respTV curOps newOps _) =
      STM.atomically $ do
        prevResHash <- STM.readTVar respTV
        curOpIds <- TMap.toList curOps
        newOpIds <- TMap.toList newOps
        return $
          J.object
            [ "resp_id" J..= respId,
              "current_ops" J..= map fst curOpIds,
              "new_ops" J..= map fst newOpIds,
              "previous_result_hash" J..= prevResHash
            ]

-- | A view of a cohort in which the subscriber maps have been flattened into
-- plain lists. The previous-response 'STM.TVar' is carried over as a reference,
-- not as a copied value. (Presumably built by the poller at the start of a poll
-- cycle -- the construction site is not in this module.)
data CohortSnapshot = CohortSnapshot
  { -- | the variables identifying the cohort
    _csVariables :: !CohortVariables,
    -- | mutable cell holding the hash of the previous response, if any
    _csPreviousResponse :: !(STM.TVar (Maybe ResponseHash)),
    -- | subscribers taken from the cohort's existing-subscribers map
    _csExistingSubscribers :: ![Subscriber],
    -- | subscribers taken from the cohort's new-subscribers map
    _csNewSubscribers :: ![Subscriber]
  }

-- -----------------------------------------------------------------------------
-- Pollers

-- | A unique, multiplexed query. Each 'Poller' has its own polling thread that
-- periodically polls Postgres and pushes results to each of its listening
-- 'Cohort's.
--
-- In SQL, a 'Poller' corresponds to a single, multiplexed query, though in
-- practice, 'Poller's with large numbers of 'Cohort's are batched into
-- multiple concurrent queries for performance reasons.
data Poller streamCursor = Poller
  { -- | the cohorts currently listening on this poller
    _pCohorts :: !(CohortMap streamCursor),
    -- | This is in a separate 'STM.TMVar' because it’s important that we are
    -- able to construct 'Poller' values in 'STM.STM' --- we need the insertion
    -- into the 'PollerMap' to be atomic to ensure that we don’t accidentally
    -- create two for the same query due to a race. However, we can’t spawn the
    -- worker thread or create the metrics store in 'STM.STM', so we insert it
    -- into the 'Poller' only after we’re certain we won’t create any duplicates.
    --
    -- This var is "write once", moving monotonically from empty to full.
    -- TODO this could probably be tightened up to something like
    -- 'STM PollerIOState'
    _pIOState :: !(STM.TMVar PollerIOState)
  }

-- | The parts of a 'Poller' that can only be created in 'IO' (see the comment
-- on '_pIOState').
data PollerIOState = PollerIOState
  { -- | a handle on the poller’s worker thread that can be used to
    -- 'Immortal.stop' it if all its cohorts stop listening
    _pThread :: !Immortal.Thread,
    -- | the unique id of this poller (see 'PollerId')
    _pId :: !PollerId
  }

-- | The key of the 'PollerMap': the source, role and query text that together
-- identify a 'Poller'.
data PollerKey =
  -- we don't need operation name here as a subscription will only have a
  -- single top level field
  PollerKey
  { -- | the source the query runs against
    _lgSource :: !SourceName,
    -- | the role the query is executed as
    _lgRole :: !RoleName,
    -- | the query text (emitted as @multiplexed_query@ by 'dumpPollerMap')
    _lgQuery :: !Text
  }
  deriving (Show, Eq, Generic)

-- No methods are given, so the class defaults are used (presumably the
-- Generic-based ones, which is why 'PollerKey' derives 'Generic').
instance Hashable PollerKey

-- | Encodes a 'PollerKey' as a JSON object with the keys @source@, @role@ and
-- @query@.
instance J.ToJSON PollerKey where
  toJSON (PollerKey source role query) =
    J.object
      [ "source" J..= source,
        "role" J..= role,
        "query" J..= query
      ]

-- | An STM map ('STMMap.Map') from 'PollerKey' to the corresponding 'Poller'.
type PollerMap streamCursor = STMMap.Map PollerKey (Poller streamCursor)

-- | Render a 'PollerMap' as a JSON array for debugging/introspection.
--
-- Each element is an object with the keys @source@, @role@, @thread_id@,
-- @poller_id@, @multiplexed_query@ and @cohorts@. When the first argument
-- (@extended@) is 'True', @cohorts@ holds the output of 'dumpCohortMap' for
-- that poller; otherwise it is encoded from 'Nothing'.
--
-- The entries are listed in one STM transaction and each poller is then
-- inspected in separate transactions, so the result is not a single atomic
-- snapshot of the whole map.
dumpPollerMap :: Bool -> PollerMap streamCursor -> IO J.Value
dumpPollerMap extended pollerMap =
  fmap J.toJSON $ do
    entries <- STM.atomically $ ListT.toList $ STMMap.listT pollerMap
    forM entries $ \(PollerKey source role query, Poller cohortsMap ioState) -> do
      -- 'STM.readTMVar' waits until the IO state has been filled in, so this
      -- blocks for a poller whose worker thread has not been attached yet.
      PollerIOState threadId pollerId <- STM.atomically $ STM.readTMVar ioState
      cohortsJ <-
        if extended
          then Just <$> dumpCohortMap cohortsMap
          else return Nothing
      return $
        J.object
          [ "source" J..= source,
            "role" J..= role,
            "thread_id" J..= show (Immortal.threadId threadId),
            "poller_id" J..= pollerId,
            "multiplexed_query" J..= query,
            "cohorts" J..= cohortsJ
          ]

-- | An ID to track unique 'Poller's, so that we can gather metrics about each
-- poller. A newtype wrapper around a UUID.
newtype PollerId = PollerId {unPollerId :: UUID.UUID}
  deriving (Show, Eq, Generic, J.ToJSON)

-- | Identifying information about a subscriber, as reported in
-- 'CohortExecutionDetails'.
data SubscriberExecutionDetails = SubscriberExecutionDetails
  { -- | the subscriber's id
    _sedSubscriberId :: !SubscriberId,
    -- | the metadata attached to the subscriber
    _sedSubscriberMetadata :: !SubscriberMetadata
  }
  deriving (Show, Eq)

-- | Execution information related to a cohort on a poll cycle
data CohortExecutionDetails = CohortExecutionDetails
  { -- | the id of the cohort
    _cedCohortId :: !CohortId,
    -- | the variables of the cohort
    _cedVariables :: !CohortVariables,
    -- | Nothing in case of an error
    _cedResponseSize :: !(Maybe Int),
    -- | The response on this cycle has been pushed to these subscribers.
    -- New subscribers (those which haven't been around during the previous poll
    -- cycle) will always be part of this
    _cedPushedTo :: ![SubscriberExecutionDetails],
    -- | The response on this cycle has *not* been pushed to these
    -- subscribers. This happens when the response hasn't changed from the
    -- previous poll cycle
    _cedIgnored :: ![SubscriberExecutionDetails],
    -- | the id of the batch this cohort belongs to
    _cedBatchId :: !BatchId
  }
  deriving (Show, Eq)

-- | Execution information related to a single batched execution
data BatchExecutionDetails = BatchExecutionDetails
  { -- | postgres execution time of each batch
    _bedPgExecutionTime :: !Clock.DiffTime,
    -- | time taken to push to all cohorts belonging to this batch
    _bedPushTime :: !Clock.DiffTime,
    -- | id of the batch
    _bedBatchId :: !BatchId,
    -- | execution details of the cohorts belonging to this batch
    _bedCohorts :: ![CohortExecutionDetails],
    -- | size of the batch response in bytes, when available; presumably
    -- 'Nothing' when the batch execution resulted in an error (TODO confirm
    -- against the pollers that build this record)
    _bedBatchResponseSizeBytes :: !(Maybe Int)
  }
  deriving (Show, Eq)

-- | Render the minimal JSON representation of a 'BatchExecutionDetails':
-- the @pg_execution_time@ and @push_time@ fields, plus
-- @batch_response_size_bytes@ when a response size is present.
--
-- see Note [Minimal LiveQuery Poller Log]
batchExecutionDetailMinimal :: BatchExecutionDetails -> J.Value
batchExecutionDetailMinimal BatchExecutionDetails {..} =
  let -- empty when no response size was recorded, otherwise a single pair
      batchRespSize =
        maybe
          mempty
          (\respSize -> ["batch_response_size_bytes" J..= respSize])
          _bedBatchResponseSizeBytes
   in J.object
        ( [ "pg_execution_time" J..= _bedPgExecutionTime,
            "push_time" J..= _bedPushTime
          ]
            -- log batch resp size only when there are no errors
            <> batchRespSize
        )

-- TODO consider refactoring into two types: one that is returned from pollLiveQuery and pollStreamingQuery, and a parent type containing pollerId, sourceName, and so on, which is assembled at the callsites of those two functions. Move postPollHook out of those functions to callsites

-- | Details captured about a poll cycle. Only a subset of these fields is
-- logged; see Note [Minimal LiveQuery Poller Log].
data PollDetails = PollDetails
  { -- | the unique ID (basically a thread that runs as a 'Poller') for the
    -- 'Poller'
    _pdPollerId :: !PollerId,
    -- | the multiplexed SQL query to be run against the database with all the
    -- variables together
    _pdGeneratedSql :: !Text,
    -- | the time taken to get a snapshot of cohorts from our 'SubscriptionsState'
    -- data structure
    _pdSnapshotTime :: !Clock.DiffTime,
    -- | list of execution batches and their details
    _pdBatches :: ![BatchExecutionDetails],
    -- | total time spent on a poll cycle
    _pdTotalTime :: !Clock.DiffTime,
    -- | the 'SubscriptionsOptions' recorded for this poll; presumably the
    -- options the poller was running with (TODO confirm)
    _pdLiveQueryOptions :: !SubscriptionsOptions,
    -- | name of the source recorded for this poll
    _pdSource :: !SourceName,
    -- | role recorded for this poll
    _pdRole :: !RoleName,
    -- | parameterized query hash recorded for this poll
    _pdParameterizedQueryHash :: !ParameterizedQueryHash
  }
  deriving (Show, Eq)

{- Note [Minimal LiveQuery Poller Log]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
We only want to log the minimal information in the livequery-poller-log, as it
could be expensive to log the details of every subscriber (all poller-related
information can always be retrieved by dumping the current live queries state).
We capture many more details in PollDetails and BatchExecutionDetails than are
currently logged, so that other implementations, such as pro, can use them if
they need to.
-}

-- | Render the minimal JSON representation of a 'PollDetails': the poller id,
-- snapshot time, per-batch minimal details (via
-- 'batchExecutionDetailMinimal'), total time, source and role. The generated
-- SQL, the subscription options and the parameterized query hash are not
-- included.
--
-- see Note [Minimal LiveQuery Poller Log]
pollDetailMinimal :: PollDetails -> J.Value
pollDetailMinimal PollDetails {..} =
  J.object
    [ "poller_id" J..= _pdPollerId,
      "snapshot_time" J..= _pdSnapshotTime,
      "batches" J..= map batchExecutionDetailMinimal _pdBatches,
      "total_time" J..= _pdTotalTime,
      "source" J..= _pdSource,
      "role" J..= _pdRole
    ]

-- | 'PollDetails' are logged at info level under the livequery poller log
-- type, using only the minimal payload produced by 'pollDetailMinimal'.
instance L.ToEngineLog PollDetails L.Hasura where
  toEngineLog pl = (L.LevelInfo, L.ELTLivequeryPollerLog, pollDetailMinimal pl)

-- | An 'IO' action that is given the 'PollDetails' of a poll.
type SubscriptionPostPollHook = PollDetails -> IO ()

-- | The default 'SubscriptionPostPollHook': hands the 'PollDetails' to the
-- given logger, which logs them through their 'L.ToEngineLog' instance.
defaultSubscriptionPostPollHook :: L.Logger L.Hasura -> SubscriptionPostPollHook
defaultSubscriptionPostPollHook logger = L.unLogger logger