-- | Multiplexed subscription poller threads; see "Hasura.GraphQL.Execute.Subscription" for details.
module Hasura.GraphQL.Execute.Subscription.Poll.Common
  ( -- * Pollers
    Poller (..),
    PollerId (..),
    PollerIOState (..),
    PollerKey (..),
    BackendPollerKey (..),
    PollerMap,
    dumpPollerMap,
    PollDetailsError (..),
    PollDetails (..),
    BatchExecutionDetails (..),
    CohortExecutionDetails (..),
    SubscriptionPostPollHook,
    defaultSubscriptionPostPollHook,
    PollerResponseState (..),

    -- * Cohorts
    Cohort (..),
    CohortSnapshot (..),
    CursorVariableValues (..),
    CohortId,
    newCohortId,
    CohortVariables,
    CohortKey,
    CohortMap,

    -- * Subscribers
    Subscriber (..),
    SubscriberId,
    newSubscriberId,
    SubscriberMetadata,
    mkSubscriberMetadata,
    unSubscriberMetadata,
    SubscriberMap,
    OnChange,
    SubscriptionGQResponse,
    SubscriptionResponse (..),
    SubscriptionMetadata (..),
    SubscriberExecutionDetails (..),

    -- * Batch
    BatchId (..),

    -- * Hash
    ResponseHash (..),
    mkRespHash,
  )
where

import Control.Concurrent.STM qualified as STM
import Control.Immortal qualified as Immortal
import Crypto.Hash qualified as CH
import Data.Aeson qualified as J
import Data.ByteString qualified as BS
import Data.Text (unpack)
import Data.Time.Clock qualified as Clock
import Data.UUID qualified as UUID
import Data.UUID.V4 qualified as UUID
import Hasura.Base.Error (QErr, showQErr)
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.GraphQL.Transport.WebSocket.Protocol (OperationId)
import Hasura.GraphQL.Transport.WebSocket.Server qualified as WS
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common (SourceName)
import Hasura.RQL.Types.Roles (RoleName)
import Hasura.RQL.Types.Subscription (SubscriptionType)
import Hasura.SQL.AnyBackend qualified as AB
import Hasura.Server.Types (RequestId)
import ListT qualified
import StmContainers.Map qualified as STMMap

-- ----------------------------------------------------------------------------------------------
-- Subscribers

-- | Unique identifier of a 'Subscriber', wrapping a UUID. Fresh values are
-- produced by 'newSubscriberId'.
newtype SubscriberId = SubscriberId {unSubscriberId :: UUID.UUID}
  deriving (Show, Eq, Generic, Hashable, J.ToJSON)

-- | Generate a fresh 'SubscriberId' from a randomly generated UUID.
newSubscriberId :: IO SubscriberId
newSubscriberId =
  SubscriberId <$> UUID.nextRandom

-- | Allows a user of the live query subsystem (currently the websocket
-- transport) to attach arbitrary metadata about a subscriber. This information
-- is available as part of 'Subscriber' in 'CohortExecutionDetails' and can be
-- logged by customizing the poller log.
--
-- The metadata is an opaque JSON value; see 'mkSubscriberMetadata' for the
-- shape built in this module.
newtype SubscriberMetadata = SubscriberMetadata {unSubscriberMetadata :: J.Value}
  deriving (Show, Eq, J.ToJSON)

-- | Build the 'SubscriberMetadata' for a subscriber as a JSON object with the
-- keys @websocket_id@, @operation_id@, @operation_name@ and @request_id@.
mkSubscriberMetadata :: WS.WSId -> OperationId -> Maybe OperationName -> RequestId -> SubscriberMetadata
mkSubscriberMetadata websocketId operationId operationName reqId =
  SubscriberMetadata
    $ J.object
      [ "websocket_id" J..= websocketId,
        "operation_id" J..= operationId,
        "operation_name" J..= operationName,
        "request_id" J..= reqId
      ]

-- | A single subscriber: its identity, the metadata attached to it, and the
-- callback used to deliver results to it.
data Subscriber = Subscriber
  { -- | unique identifier of this subscriber
    _sId :: !SubscriberId,
    -- | arbitrary JSON metadata attached by the transport layer
    _sMetadata :: !SubscriberMetadata,
    -- | id of the request associated with this subscriber
    _sRequestId :: !RequestId,
    -- | operation name of the subscription, if one was given
    _sOperationName :: !(Maybe OperationName),
    -- | callback that receives a 'SubscriptionGQResponse'
    _sOnChangeCallback :: !OnChange
  }

-- | Subscription onChange metadata, used for attaching extra analytics data.
data SubscriptionMetadata = SubscriptionMetadata
  { -- | execution time associated with the response
    _sqmExecutionTime :: !Clock.DiffTime
  }

-- | The payload handed to a subscriber's 'OnChange' callback (wrapped in a
-- 'GQResult'; see 'SubscriptionGQResponse').
data SubscriptionResponse = SubscriptionResponse
  { -- | the raw response bytes
    _lqrPayload :: !BS.ByteString,
    -- | execution time associated with this response
    _lqrExecutionTime :: !Clock.DiffTime
  }

-- | A 'GQResult' carrying a 'SubscriptionResponse'.
type SubscriptionGQResponse = GQResult SubscriptionResponse

-- | Callback invoked with a 'SubscriptionGQResponse'; each 'Subscriber' stores
-- one in '_sOnChangeCallback'.
type OnChange = SubscriptionGQResponse -> IO ()

-- | 'Subscriber's keyed by their 'SubscriberId'.
type SubscriberMap = TMap.TMap SubscriberId Subscriber

-- | State tag held in a 'Poller' (see '_pPollerState').
--
-- NOTE(review): presumably records whether the poller's most recent poll
-- succeeded or failed — confirm against the code that writes the 'STM.TVar'.
data PollerResponseState
  = PRSSuccess
  | PRSError
  deriving (Eq)

-- -------------------------------------------------------------------------------------------------
-- Cohorts

-- | A batched group of 'Subscriber's who are not only listening to the same query but also have
-- identical session and query variables. Each result pushed to a 'Cohort' is forwarded along to
-- each of its 'Subscriber's.
--
-- In SQL, each 'Cohort' corresponds to a single row in the laterally-joined @_subs@ table (and
-- therefore a single row in the query result).
--
-- See also 'CohortMap'.
data Cohort streamCursorVars = Cohort
  { -- | a unique identifier used to identify the cohort in the generated query
    _cCohortId :: CohortId,
    -- | Contains a hash of the previous poll's DB query result, if any, used to determine
    --   if we need to push an updated result to the subscribers or not.
    _cPreviousResponse :: STM.TVar (Maybe ResponseHash),
    -- | the subscribers we’ve already pushed a result to; we push new results to them if the
    -- response changes
    _cExistingSubscribers :: SubscriberMap,
    -- | subscribers we haven’t yet pushed any results to; we push results to them regardless of
    -- whether the result changed, then merge them into the map of existing subscribers
    _cNewSubscribers :: SubscriberMap,
    -- | a mutable type which holds the latest value of the subscription stream cursor. In case
    --   of live query subscription, this field is ignored by setting @streamCursorVars@ to @()@
    _cStreamCursorVariables :: streamCursorVars
  }

-- | The @BatchId@ is a number-based ID that uniquely identifies a batch within a single poll;
--   it is used to identify the batch to which a cohort belongs.
newtype BatchId = BatchId {_unBatchId :: Int}
  deriving (Show, Eq, J.ToJSON)

{- Note [Blake2b faster than SHA-256]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
At the time of writing, from https://blake2.net, it is stated,
"BLAKE2 is a cryptographic hash function faster than MD5, SHA-1, SHA-2, and SHA-3,
yet is at least as secure as the latest standard SHA-3".
-}

-- | A hash used to determine if the result changed without having to keep the entire result in
-- memory. Using a cryptographic hash ensures that a hash collision is almost impossible: with 256
-- bits, even if a subscription changes once per second for an entire year, the probability of a
-- hash collision is ~4.294417×10^-63. See Note [Blake2b faster than SHA-256].
newtype ResponseHash = ResponseHash {unResponseHash :: CH.Digest CH.Blake2b_256}
  deriving (Show, Eq)

-- | Serialises the hash as a JSON string: the 'show' rendering of the
-- underlying digest.
instance J.ToJSON ResponseHash where
  toJSON = J.toJSON . show . unResponseHash

-- | Hash a response payload with Blake2b-256 to obtain its 'ResponseHash'.
mkRespHash :: BS.ByteString -> ResponseHash
mkRespHash = ResponseHash . CH.hash

-- | A key we use to determine if two 'Subscriber's belong in the same 'Cohort'
-- (assuming they already meet the criteria to be in the same 'Poller'). Note
-- the distinction between this and 'CohortId'; the latter is a completely
-- synthetic key used only to identify the cohort in the generated SQL query.
type CohortKey = CohortVariables

-- | Maps each 'CohortKey' to its 'Cohort'.
--
-- This has the invariant, maintained in 'removeLiveQuery', that it contains
-- no 'Cohort' with zero total (existing + new) subscribers.
type CohortMap streamCursor = TMap.TMap CohortKey (Cohort streamCursor)

-- | Render a 'CohortMap' as JSON: an array with one object per cohort, each
-- holding the cohort's @variables@ (its 'CohortKey') and a @cohort@ object
-- describing its current state.
--
-- The map itself and each individual cohort are read in separate STM
-- transactions, so the result is not a single consistent snapshot.
dumpCohortMap :: CohortMap streamCursor -> IO J.Value
dumpCohortMap cohortMap = do
  cohorts <- STM.atomically $ TMap.toList cohortMap
  fmap J.toJSON . forM cohorts $ \(variableValues, cohort) -> do
    cohortJ <- dumpCohort cohort
    return
      $ J.object
        [ "variables" J..= variableValues,
          "cohort" J..= cohortJ
        ]
  where
    -- Describe one cohort: its id, the ids of its existing and new
    -- subscribers, and the hash of the previous response (if any). All of it
    -- is read in one transaction.
    dumpCohort :: Cohort streamCursorVars -> IO J.Value
    dumpCohort (Cohort respId respTV curOps newOps _) =
      STM.atomically $ do
        prevResHash <- STM.readTVar respTV
        curOpIds <- TMap.toList curOps
        newOpIds <- TMap.toList newOps
        return
          $ J.object
            [ "resp_id" J..= respId,
              "current_ops" J..= map fst curOpIds,
              "new_ops" J..= map fst newOpIds,
              "previous_result_hash" J..= prevResHash
            ]

-- | A view of a cohort in which the subscribers are held as plain lists
-- rather than 'SubscriberMap's; the previous-response 'STM.TVar' is still the
-- mutable reference.
data CohortSnapshot = CohortSnapshot
  { -- | the variables identifying the cohort
    _csVariables :: CohortVariables,
    -- | hash of the previous response, if any (see '_cPreviousResponse')
    _csPreviousResponse :: STM.TVar (Maybe ResponseHash),
    -- | subscribers from the cohort's existing-subscriber set
    _csExistingSubscribers :: [Subscriber],
    -- | subscribers from the cohort's new-subscriber set
    _csNewSubscribers :: [Subscriber]
  }

-- -----------------------------------------------------------------------------
-- Pollers

-- | A unique, multiplexed query. Each 'Poller' has its own polling thread that
-- periodically polls Postgres and pushes results to each of its listening
-- 'Cohort's.
--
-- In SQL, a 'Poller' corresponds to a single, multiplexed query, though in
-- practice, 'Poller's with large numbers of 'Cohort's are batched into
-- multiple concurrent queries for performance reasons.
data Poller streamCursor = Poller
  { -- | the cohorts listening to this poller, keyed by their variables
    _pCohorts :: CohortMap streamCursor,
    -- | the poller's current 'PollerResponseState'
    _pPollerState :: STM.TVar PollerResponseState,
    -- | This is in a separate 'STM.TMVar' because it’s important that we are
    -- able to construct 'Poller' values in 'STM.STM' --- we need the insertion
    -- into the 'PollerMap' to be atomic to ensure that we don’t accidentally
    -- create two for the same query due to a race. However, we can’t spawn the
    -- worker thread or create the metrics store in 'STM.STM', so we insert it
    -- into the 'Poller' only after we’re certain we won’t create any duplicates.
    --
    -- This var is "write once", moving monotonically from empty to full.
    -- TODO this could probably be tightened up to something like
    -- 'STM PollerIOState'
    _pIOState :: STM.TMVar PollerIOState,
    -- | the parameterized query hash associated with this poller
    _pParameterizedQueryHash :: ParameterizedQueryHash,
    -- | The operation names of the subscriptions that are part of this poller. This is
    -- used while emitting subscription metrics
    _pOperationNamesMap :: TMap.TMap (Maybe OperationName) Int
  }

-- | The parts of a 'Poller' that can only be created in 'IO'; stored in the
-- poller's '_pIOState'.
data PollerIOState = PollerIOState
  { -- | a handle on the poller’s worker thread that can be used to
    -- 'Immortal.stop' it if all its cohorts stop listening
    _pThread :: !Immortal.Thread,
    -- | the unique id of this poller
    _pId :: !PollerId
  }

-- | The backend-specific key identifying a poller; wrapped in 'AB.AnyBackend'
-- by 'BackendPollerKey' for use in the 'PollerMap'.
data PollerKey b =
  -- we don't need operation name here as a subscription will only have a
  -- single top level field
  PollerKey
  { _lgSource :: SourceName,
    _lgRole :: RoleName,
    _lgQuery :: Text,
    _lgConnectionKey :: (ResolvedConnectionTemplate b),
    _lgParameterizedQueryHash :: ParameterizedQueryHash
  }
  deriving (Generic)

-- NOTE(review): 'Show' and 'Eq' are derived standalone with a 'Backend b'
-- context, presumably because the '_lgConnectionKey' field's type depends on
-- the backend — confirm.
deriving instance (Backend b) => Show (PollerKey b)

deriving instance (Backend b) => Eq (PollerKey b)

-- No methods are given, so the class defaults are used (presumably the
-- 'Generic'-based ones, via the derived 'Generic' instance — confirm).
instance (Backend b) => Hashable (PollerKey b)

-- | Serialises only the @source@, @role@ and @query@ of the key; the
-- connection key and the parameterized query hash are left out of the JSON.
instance J.ToJSON (PollerKey b) where
  toJSON (PollerKey source role query _connectionKey _) =
    J.object
      [ "source" J..= source,
        "role" J..= role,
        "query" J..= query
      ]

-- | A 'PollerKey' for any backend, used as the key type of the 'PollerMap'.
newtype BackendPollerKey = BackendPollerKey {unBackendPollerKey :: AB.AnyBackend PollerKey}
  deriving (Eq, Show, Hashable)

-- | An STM-backed map from 'BackendPollerKey' to the corresponding 'Poller'.
type PollerMap streamCursor = STMMap.Map BackendPollerKey (Poller streamCursor)

-- | For dev debugging, output subject to change.
--
-- Renders every poller in the map as a JSON object with its source, role,
-- worker thread id, poller id, multiplexed query and parameterized query hash.
-- When the first argument is 'True', each poller's cohorts are included as
-- well (via 'dumpCohortMap'); otherwise @cohorts@ is @null@.
--
-- The list of pollers and each poller's state are read in separate STM
-- transactions, so the result is not a single consistent snapshot.
dumpPollerMap :: Bool -> PollerMap streamCursor -> IO J.Value
dumpPollerMap extended pollerMap =
  fmap J.toJSON $ do
    entries <- STM.atomically $ ListT.toList $ STMMap.listT pollerMap
    forM entries $ \(pollerKey, Poller cohortsMap _responseState ioState paramQueryHash _opNames) ->
      AB.dispatchAnyBackend @Backend (unBackendPollerKey pollerKey) $ \(PollerKey source role query _connectionKey _) -> do
        -- 'STM.readTMVar' waits until the poller's IO state has been filled in.
        PollerIOState threadId pollerId <- STM.atomically $ STM.readTMVar ioState
        cohortsJ <-
          if extended
            then Just <$> dumpCohortMap cohortsMap
            else return Nothing
        return
          $ J.object
            [ "source" J..= source,
              "role" J..= role,
              "thread_id" J..= show (Immortal.threadId threadId),
              "poller_id" J..= pollerId,
              "multiplexed_query" J..= query,
              "cohorts" J..= cohortsJ,
              "parameterized_query_hash" J..= paramQueryHash
            ]

-- | An ID to track unique 'Poller's, so that we can gather metrics about each
-- poller
newtype PollerId = PollerId {unPollerId :: UUID.UUID}
  deriving (Show, Eq, Generic, J.ToJSON)

-- | Identifies a single subscriber in the execution details of a poll cycle:
-- its 'SubscriberId' together with the 'SubscriberMetadata' attached to it.
--
-- Values of this type are collected per cohort in 'CohortExecutionDetails'.
data SubscriberExecutionDetails = SubscriberExecutionDetails
  { _sedSubscriberId :: !SubscriberId,
    _sedSubscriberMetadata :: !SubscriberMetadata
  }
  deriving (Show, Eq)

-- | Execution information related to a cohort on a poll cycle
data CohortExecutionDetails = CohortExecutionDetails
  { _cedCohortId :: !CohortId,
    _cedVariables :: !CohortVariables,
    -- | Nothing in case of an error
    _cedResponseSize :: !(Maybe Int),
    -- | The response on this cycle has been pushed to these subscribers.
    -- New subscribers (those which haven't been around during the previous
    -- poll cycle) will always be part of this
    _cedPushedTo :: ![SubscriberExecutionDetails],
    -- | The response on this cycle has *not* been pushed to these
    -- subscribers. This happens when the response hasn't changed from the
    -- previous poll cycle
    _cedIgnored :: ![SubscriberExecutionDetails],
    -- | id of the batch this cohort belongs to
    _cedBatchId :: !BatchId
  }
  deriving (Show, Eq)

-- | Execution information related to a single batched execution
data BatchExecutionDetails = BatchExecutionDetails
  { -- | postgres execution time of each batch ('Nothing' in case of non-PG dbs)
    _bedPgExecutionTime :: Maybe Clock.DiffTime,
    -- | database execution time of each batch
    _bedDbExecutionTime :: Clock.DiffTime,
    -- | time taken to push to all cohorts belonging to this batch
    _bedPushTime :: Clock.DiffTime,
    -- | id of the batch
    _bedBatchId :: BatchId,
    -- | execution details of the cohorts belonging to this batch
    _bedCohorts :: [CohortExecutionDetails],
    -- | size in bytes of the response for this batch; a 'Nothing' is left out
    -- of the log entirely (see 'batchExecutionDetailMinimal')
    _bedBatchResponseSizeBytes :: Maybe Int
  }
  deriving (Show, Eq)

-- | Render the loggable subset of a 'BatchExecutionDetails' as a JSON object;
-- see Note [Minimal LiveQuery Poller Log].
--
-- The object always carries @db_execution_time@, @push_time@ and @batch_id@.
-- @pg_execution_time@ and @batch_response_size_bytes@ are added only when the
-- corresponding field is a 'Just'. The per-cohort details ('_bedCohorts') are
-- deliberately not part of the output.
batchExecutionDetailMinimal :: BatchExecutionDetails -> J.Value
batchExecutionDetailMinimal BatchExecutionDetails {..} =
  J.object $ alwaysLogged <> pgExecTime <> batchRespSize
  where
    alwaysLogged =
      [ "db_execution_time" J..= _bedDbExecutionTime,
        "push_time" J..= _bedPushTime,
        "batch_id" J..= _bedBatchId
      ]
    -- log pg exec time only when it's not 'Nothing'
    pgExecTime =
      maybe
        []
        (\execTime -> ["pg_execution_time" J..= execTime])
        _bedPgExecutionTime
    -- log batch resp size only when there are no errors
    batchRespSize =
      maybe
        []
        (\respSize -> ["batch_response_size_bytes" J..= respSize])
        _bedBatchResponseSizeBytes

-- | An error recorded during a poll cycle, paired with the id of the batch it
-- is reported for.
data PollDetailsError = PollDetailsError
  { -- | id of the batch the error is attributed to
    _pdeBatchId :: BatchId,
    -- | the error itself
    _pdeErrorDetails :: QErr
  }
  deriving (Eq)

-- | Hand-written rather than derived: the error is rendered through
-- 'showQErr', giving @batch_id = <id>, detail = <error text>@.
instance Show PollDetailsError where
  show pde =
    mconcat
      [ "batch_id = ",
        show (_pdeBatchId pde),
        ", detail = ",
        unpack (showQErr (_pdeErrorDetails pde))
      ]

-- | Serialises to an object with the keys @batch_id@ and @detail@.
instance J.ToJSON PollDetailsError where
  toJSON (PollDetailsError batchId errorDetails) =
    J.object
      [ "batch_id" J..= batchId,
        "detail" J..= errorDetails
      ]

-- TODO consider refactoring into two types: one that is returned from
-- pollLiveQuery and pollStreamingQuery, and a parent type containing pollerId,
-- sourceName, and so on, which is assembled at the callsites of those two
-- functions. Move postPollHook out of those functions to callsites
data PollDetails = PollDetails
  { -- | the unique ID (basically a thread that runs as a 'Poller') for the
    -- 'Poller'
    _pdPollerId :: PollerId,
    -- | distinguish between the subscription type (i.e. live-query or streaming
    -- subscription)
    _pdKind :: SubscriptionType,
    -- | the multiplexed SQL query to be run against the database with all the
    -- variables together
    _pdGeneratedSql :: Text,
    -- | the time taken to get a snapshot of cohorts from our 'SubscriptionsState'
    -- data structure
    _pdSnapshotTime :: Clock.DiffTime,
    -- | list of execution batches and their details
    _pdBatches :: [BatchExecutionDetails],
    -- | total time spent on a poll cycle
    _pdTotalTime :: Clock.DiffTime,
    -- | the subscription options in effect; logged as @subscription_options@
    _pdLiveQueryOptions :: SubscriptionsOptions,
    -- | the source; logged as @source@
    _pdSource :: SourceName,
    -- | the role; logged as @role@
    _pdRole :: RoleName,
    _pdParameterizedQueryHash :: ParameterizedQueryHash,
    -- | the level at which these details are logged (see the
    -- 'L.ToEngineLog' instance)
    _pdLogLevel :: L.LogLevel,
    -- | errors tagged with their batch ids; a 'Nothing' is left out of the
    -- minimal log (see 'pollDetailMinimal')
    _pdErrors :: Maybe [PollDetailsError]
  }
  deriving (Show, Eq)

{- Note [Minimal LiveQuery Poller Log]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
We only want to log the minimal information in the livequery-poller-log, as it
could be expensive to log the details of every subscriber (all poller related
information can always be retrieved by dumping the current live queries state).
We capture a lot more detail in PollDetails and BatchExecutionDetails than is
currently logged, since other implementations (such as pro) can use it if they
need to.
-}

-- | Render the loggable subset of a 'PollDetails' as a JSON object; see
-- Note [Minimal LiveQuery Poller Log].
--
-- Each batch is rendered with 'batchExecutionDetailMinimal' and emitted under
-- both @batches@ (kept for backwards compatibility) and @execution_batches@.
-- The @errors@ key is present only when '_pdErrors' is a 'Just'.
-- '_pdParameterizedQueryHash' and '_pdLogLevel' are not part of the object.
pollDetailMinimal :: PollDetails -> J.Value
pollDetailMinimal PollDetails {..} =
  J.object
    $ [ "poller_id" J..= _pdPollerId,
        "kind" J..= _pdKind,
        "snapshot_time" J..= _pdSnapshotTime,
        "batches" J..= batches, -- TODO: deprecate this field
        "execution_batches" J..= batches,
        "subscriber_count" J..= subscriberCount,
        "total_time" J..= _pdTotalTime,
        "source" J..= _pdSource,
        "generated_sql" J..= _pdGeneratedSql,
        "role" J..= _pdRole,
        "subscription_options" J..= _pdLiveQueryOptions
      ]
    <> maybe [] (\err -> ["errors" J..= err]) _pdErrors
  where
    batches :: [J.Value]
    batches = map batchExecutionDetailMinimal _pdBatches

    -- Every subscriber of a cohort is recorded either as pushed-to or as
    -- ignored, so the two lists together give the cohort's subscribers.
    cohortSubscriberCount :: CohortExecutionDetails -> Int
    cohortSubscriberCount cohort =
      length (_cedPushedTo cohort) + length (_cedIgnored cohort)

    -- Count subscribers, not cohorts: counting the entries of '_bedCohorts'
    -- under-reports whenever a cohort has more than one subscriber.
    subscriberCount :: Int
    subscriberCount =
      sum (map cohortSubscriberCount (concatMap _bedCohorts _pdBatches))

-- | Logs a 'PollDetails' as a livequery-poller-log entry at the level carried
-- in '_pdLogLevel', using the minimal rendering from 'pollDetailMinimal'.
instance L.ToEngineLog PollDetails L.Hasura where
  toEngineLog details =
    ( _pdLogLevel details,
      L.ELTLivequeryPollerLog,
      pollDetailMinimal details
    )

-- | An 'IO' action that consumes the 'PollDetails' of a poll cycle.
-- Presumably invoked by the pollers once a cycle has finished (going by the
-- name; the call sites are outside this module section). See
-- 'defaultSubscriptionPostPollHook' for the default, which logs the details.
type SubscriptionPostPollHook = PollDetails -> IO ()

-- | The default 'SubscriptionPostPollHook': hand the 'PollDetails' to the
-- given logger (they are rendered through their 'L.ToEngineLog' instance).
--
-- The logger argument is applied explicitly instead of eta-reducing to
-- @L.unLogger@: 'L.unLogger' returns a polymorphic logging function that has
-- to be instantiated to @PollDetails -> IO ()@ here.
-- NOTE(review): the eta-reduced form presumably fails to type-check under
-- GHC's simplified subsumption; confirm before "simplifying" this.
defaultSubscriptionPostPollHook :: L.Logger L.Hasura -> SubscriptionPostPollHook
defaultSubscriptionPostPollHook logger = L.unLogger logger