{-# LANGUAGE TemplateHaskell #-}

-- | This file contains the handlers that are used within the websocket server.
--
-- This module exports three main handlers for the websocket server ('onConn',
-- 'onMessage', 'onClose'), and two helpers for sending messages to the client
-- ('sendMsg', 'sendCloseWithMsg').
--
-- NOTE!
--  The handler functions 'onClose', 'onMessage', etc. depend for correctness on two properties:
--    - they run with async exceptions masked
--    - they do not race on the same connection
module Hasura.GraphQL.Transport.WebSocket
  ( onConn,
    onMessage,
    onClose,
    sendMsg,
    sendCloseWithMsg,
    mkCloseWebsocketsOnMetadataChangeAction,
    runWebsocketCloseOnMetadataChangeAction,
    WebsocketCloseOnMetadataChangeAction,
  )
where

import Control.Concurrent.Extended (sleep)
import Control.Concurrent.STM qualified as STM
import Control.Monad.Morph (hoist)
import Control.Monad.Trans.Control qualified as MC
import Data.Aeson qualified as J
import Data.Aeson.Casing qualified as J
import Data.Aeson.Encoding qualified as J
import Data.Aeson.Ordered qualified as JO
import Data.Aeson.TH qualified as J
import Data.ByteString (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.CaseInsensitive qualified as CI
import Data.Dependent.Map qualified as DM
import Data.HashMap.Strict qualified as HashMap
import Data.HashMap.Strict.InsOrd qualified as InsOrdHashMap
import Data.HashSet qualified as Set
import Data.List.NonEmpty qualified as NE
import Data.Monoid (Endo (..))
import Data.String
import Data.Text qualified as T
import Data.Text.Encoding qualified as TE
import Data.Text.Extended ((<>>))
import Data.Time.Clock
import Data.Time.Clock qualified as TC
import Data.Word (Word16)
import GHC.AssertNF.CPP
import Hasura.App.State
import Hasura.Backends.DataConnector.Agent.Client (AgentLicenseKey)
import Hasura.Backends.Postgres.Instances.Transport (runPGMutationTransaction)
import Hasura.Base.Error
import Hasura.CredentialCache
import Hasura.EncJSON
import Hasura.GraphQL.Execute qualified as E
import Hasura.GraphQL.Execute.Action qualified as EA
import Hasura.GraphQL.Execute.Backend qualified as EB
import Hasura.GraphQL.Execute.RemoteJoin qualified as RJ
import Hasura.GraphQL.Execute.Subscription.Plan qualified as ES
import Hasura.GraphQL.Execute.Subscription.Poll qualified as ES
import Hasura.GraphQL.Execute.Subscription.State qualified as ES
import Hasura.GraphQL.Logging
import Hasura.GraphQL.Namespace (RootFieldAlias (..))
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Parser.Directives (cached)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.GraphQL.Transport.Instances ()
import Hasura.GraphQL.Transport.WebSocket.Protocol
import Hasura.GraphQL.Transport.WebSocket.Server qualified as WS
import Hasura.GraphQL.Transport.WebSocket.Types
import Hasura.GraphQL.Transport.WebSocket.Types qualified as WS
import Hasura.Logging qualified as L
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.QueryTags
import Hasura.RQL.Types.Common (MetricsConfig (_mcAnalyzeQueryVariables))
import Hasura.RQL.Types.ResultCustomization
import Hasura.RQL.Types.SchemaCache (scApiLimits, scMetricsConfig)
import Hasura.RemoteSchema.SchemaCache
import Hasura.SQL.AnyBackend qualified as AB
import Hasura.Server.AppStateRef
import Hasura.Server.Auth
  ( AuthMode,
    UserAuthentication,
    resolveUserInfo,
  )
import Hasura.Server.Cors
import Hasura.Server.Init.Config (KeepAliveDelay (..))
import Hasura.Server.Limits
  ( HasResourceLimits (..),
    ResourceLimits (..),
  )
import Hasura.Server.Metrics (ServerMetrics (..))
import Hasura.Server.Prometheus
  ( GraphQLRequestMetrics (..),
    PrometheusMetrics (..),
  )
import Hasura.Server.Telemetry.Counters qualified as Telem
import Hasura.Server.Types (GranularPrometheusMetricsState (..), MonadGetPolicies (..), RequestId, getRequestId)
import Hasura.Services.Network
import Hasura.Session
import Hasura.Tracing qualified as Tracing
import Language.GraphQL.Draft.Syntax (Name (..))
import Language.GraphQL.Draft.Syntax qualified as G
import ListT qualified
import Network.HTTP.Client qualified as HTTP
import Network.HTTP.Types qualified as HTTP
import Network.WebSockets qualified as WS
import Refined (unrefine)
import StmContainers.Map qualified as STMMap
import System.Metrics.Prometheus.Counter qualified as Prometheus.Counter
import System.Metrics.Prometheus.Histogram qualified as Prometheus.Histogram

-- | 'ES.SubscriberDetails' comes from 'Hasura.GraphQL.Execute.LiveQuery.State.addLiveQuery'. We use
-- this to track a connection's operations so we can remove them from 'LiveQueryState', and
-- log.
--
-- NOTE!: This must be kept consistent with the global 'LiveQueryState', in 'onClose'
-- and 'onStart'.
data OpDetail
  = -- | the operation was started
    ODStarted
  | -- | the operation failed with a protocol-level error message
    ODProtoErr !Text
  | -- | the operation failed with a query error
    ODQueryErr !QErr
  | -- | the operation ran to completion
    ODCompleted
  | -- | the operation was stopped
    ODStopped
  deriving (Eq)

-- JSON encoding: the leading @OD@ is dropped from the constructor name and the
-- remainder is snake-cased (e.g. 'ODProtoErr' becomes @proto_err@). The tag is
-- stored under the @type@ key and any payload under the @detail@ key.
$( J.deriveToJSON
     J.defaultOptions
       { J.constructorTagModifier = J.snakeCase . drop 2,
         J.sumEncoding = J.TaggedObject "type" "detail"
       }
     ''OpDetail
 )

-- | Everything that is logged about a single operation on a websocket
-- connection. All fields are strict.
data OperationDetails = OperationDetails
  { -- | identifier of the operation
    _odOperationId :: !OperationId,
    -- | request id, when one is available
    _odRequestId :: !(Maybe RequestId),
    -- | name of the operation, when one is available
    _odOperationName :: !(Maybe OperationName),
    -- | what happened to the operation (despite the field name, this is an
    -- 'OpDetail' and not a GraphQL operation type)
    _odOperationType :: !OpDetail,
    -- | the unparsed GraphQL request, when one is available
    _odQuery :: !(Maybe GQLReqUnparsed),
    -- | parameterized query hash, when one is available
    _odParameterizedQueryHash :: !(Maybe ParameterizedQueryHash)
  }
  deriving (Eq)

-- NOTE(review): field naming in the JSON output is determined by 'hasuraJSON',
-- which is defined outside this module.
$(J.deriveToJSON hasuraJSON ''OperationDetails)

-- | The events that are logged for a websocket connection.
--
-- 'logWSEvent' logs 'ERejected', 'EConnErr' and failed 'EOperation's at error
-- level, and everything else at info level.
data WSEvent
  = -- | the connection was accepted
    EAccepted
  | -- | the connection was rejected with the given error
    ERejected !QErr
  | -- | a connection-level error
    EConnErr !ConnErrMsg
  | -- | something happened to an operation on the connection
    EOperation !OperationDetails
  | -- | the connection was closed
    EClosed
  deriving (Eq)

-- JSON encoding: the leading @E@ is dropped from the constructor name and the
-- remainder is snake-cased (e.g. 'EConnErr' becomes @conn_err@). The tag is
-- stored under the @type@ key and any payload under the @detail@ key.
$( J.deriveToJSON
     J.defaultOptions
       { J.constructorTagModifier = J.snakeCase . drop 1,
         J.sumEncoding = J.TaggedObject "type" "detail"
       }
     ''WSEvent
 )

-- | Connection-level information attached to every websocket log line.
data WsConnInfo = WsConnInfo
  { -- | identifier of the websocket connection
    _wsciWebsocketId :: !WS.WSId,
    -- | expiry time of the client's token, if known
    _wsciTokenExpiry :: !(Maybe TC.UTCTime),
    -- | optional free-form message
    _wsciMsg :: !(Maybe Text)
  }
  deriving (Eq)

-- NOTE(review): field naming in the JSON output is determined by 'hasuraJSON',
-- which is defined outside this module.
$(J.deriveToJSON hasuraJSON ''WsConnInfo)

-- | The payload of a websocket log line: who the user is (if known), which
-- connection the event belongs to, and the event itself.
data WSLogInfo = WSLogInfo
  { -- | session variables of the user, if known
    _wsliUserVars :: !(Maybe SessionVariables),
    -- | the connection the event occurred on
    _wsliConnectionInfo :: !WsConnInfo,
    -- | the event being logged
    _wsliEvent :: !WSEvent
  }
  deriving (Eq)

-- NOTE(review): field naming in the JSON output is determined by 'hasuraJSON',
-- which is defined outside this module.
$(J.deriveToJSON hasuraJSON ''WSLogInfo)

-- | A websocket log entry: the payload together with the level it should be
-- logged at.
data WSLog = WSLog
  { -- | level at which the entry is logged
    _wslLogLevel :: !L.LogLevel,
    -- | the payload of the entry
    _wslInfo :: !WSLogInfo
  }

-- | A 'WSLog' is emitted as a websocket engine log ('L.ELTWebsocketLog') at
-- the level stored in the entry; only the 'WSLogInfo' payload is serialised.
instance L.ToEngineLog WSLog L.Hasura where
  toEngineLog (WSLog logLevel wsLog) =
    (logLevel, L.ELTWebsocketLog, J.toJSON wsLog)

-- | Build an info-level websocket log entry from the user's session
-- variables (if known), the connection information and the event.
mkWsInfoLog :: Maybe SessionVariables -> WsConnInfo -> WSEvent -> WSLog
mkWsInfoLog uv ci ev =
  WSLog L.LevelInfo $ WSLogInfo uv ci ev

-- | Build an error-level websocket log entry from the user's session
-- variables (if known), the connection information and the event.
mkWsErrorLog :: Maybe SessionVariables -> WsConnInfo -> WSEvent -> WSLog
mkWsErrorLog uv ci ev =
  WSLog L.LevelError $ WSLogInfo uv ci ev

-- | Log an event that occurred on the given websocket connection.
--
-- The connection's current state is read to enrich the log line: once the
-- connection is initialised ('CSInitialised') the user's session variables
-- and token expiry are included, otherwise both are 'Nothing'. The message
-- field of the 'WsConnInfo' is always 'Nothing'.
--
-- Rejections, connection errors and failed operations ('ODProtoErr',
-- 'ODQueryErr') are logged at error level; everything else at info level.
logWSEvent ::
  (MonadIO m) =>
  L.Logger L.Hasura ->
  WSConn ->
  WSEvent ->
  m ()
logWSEvent (L.Logger logger) wsConn wsEv = do
  connState <- liftIO $ STM.readTVarIO userInfoR
  let (userVarsM, tokenExpM) = case connState of
        CSInitialised clientState ->
          ( Just $ _uiSession (wscsUserInfo clientState),
            wscsTokenExpTime clientState
          )
        _ -> (Nothing, Nothing)
      connInfo = WsConnInfo wsId tokenExpM Nothing
  liftIO $ logger $ WSLog logLevel $ WSLogInfo userVarsM connInfo wsEv
  where
    -- the TVar holding the connection's (possibly not yet initialised) state
    userInfoR = _wscUser $ WS.getData wsConn
    wsId = WS.getWSId wsConn
    logLevel = bool L.LevelInfo L.LevelError isError
    isError = case wsEv of
      EAccepted -> False
      ERejected _ -> True
      EConnErr _ -> True
      EClosed -> False
      EOperation operation -> case _odOperationType operation of
        ODStarted -> False
        ODProtoErr _ -> True
        ODQueryErr _ -> True
        ODCompleted -> False
        ODStopped -> False

-- | Send a server message to the client over the given connection.
--
-- The message is encoded with 'encodeServerMsg' and handed to 'WS.sendMsg'
-- without any event info, together with a timer that is started just before
-- the hand-off.
sendMsg :: (MonadIO m) => WSConn -> ServerMsg -> m ()
sendMsg wsConn msg = liftIO do
  timer <- startTimer
  WS.sendMsg wsConn $ WS.WSQueueResponse (encodeServerMsg msg) Nothing timer

-- | Close the websocket connection, logging an 'EClosed' event first.
--
-- The close code is the supplied @Maybe Word16@ when present; if it is
-- 'Nothing', the code is derived from the 'ServerErrorCode'. The close reason
-- is always the encoded 'ServerErrorCode'.
--
-- When a 'ServerMsg' is supplied the close goes through
-- 'WS.sendMsgAndCloseConn' with that message; otherwise the close code is
-- sent directly on the raw connection.
sendCloseWithMsg ::
  (MonadIO m) =>
  L.Logger L.Hasura ->
  WSConn ->
  ServerErrorCode ->
  Maybe ServerMsg ->
  Maybe Word16 ->
  m ()
sendCloseWithMsg logger wsConn errCode mErrServerMsg mCode = do
  -- the close is logged in the same way regardless of how it is carried out
  logWSEvent logger wsConn EClosed
  liftIO $ case mErrServerMsg of
    Just errServerMsg ->
      WS.sendMsgAndCloseConn wsConn errCloseCode errMsg errServerMsg
    Nothing ->
      WS.sendCloseCode wsc errCloseCode errMsg
  where
    wsc = WS.getRawWebSocketConnection wsConn
    errMsg = encodeServerErrorMsg errCode
    -- an explicitly supplied code takes precedence over the derived one
    errCloseCode = fromMaybe (getErrCode errCode) mCode
    getErrCode :: ServerErrorCode -> Word16
    getErrCode err = case err of
      ProtocolError1002 -> 1002
      GenericError4400 _ -> 4400
      Unauthorized4401 -> 4401
      Forbidden4403 -> 4403
      ConnectionInitTimeout4408 -> 4408
      NonUniqueSubscription4409 _ -> 4409
      TooManyRequests4429 -> 4429

-- | Like 'sendMsg', but attaches a 'WS.WSEventInfo' describing the message:
-- the operation name, the parameterized query hash, the query execution time
-- taken from the 'ES.SubscriptionMetadata', and the size of the encoded
-- response.
--
-- The event type and operation id are only filled in for 'SMNext' and
-- 'SMData' messages; for any other message they are 'Nothing'.
sendMsgWithMetadata ::
  (MonadIO m) =>
  WSConn ->
  ServerMsg ->
  Maybe OperationName ->
  Maybe ParameterizedQueryHash ->
  ES.SubscriptionMetadata ->
  m ()
sendMsgWithMetadata wsConn msg opName paramQueryHash (ES.SubscriptionMetadata execTime) =
  liftIO do
    timer <- startTimer
    WS.sendMsg wsConn $ WS.WSQueueResponse bs wsInfo timer
  where
    -- encoded once, and shared between the payload and the size computation
    bs = encodeServerMsg msg
    (msgType, operationId) = case msg of
      (SMNext (DataMsg opId _)) -> (Just SMT_GQL_NEXT, Just opId)
      (SMData (DataMsg opId _)) -> (Just SMT_GQL_DATA, Just opId)
      _ -> (Nothing, Nothing)
    -- '$!' evaluates the record, the execution time and the response size to
    -- weak head normal form when 'wsInfo' is demanded
    wsInfo =
      Just
        $! WS.WSEventInfo
          { WS._wseiEventType = msgType,
            WS._wseiOperationId = operationId,
            WS._wseiOperationName = opName,
            WS._wseiQueryExecutionTime = Just $! realToFrac execTime,
            WS._wseiResponseSize = Just $! LBS.length bs,
            WS._wseiParameterizedQueryHash = paramQueryHash
          }

onConn ::
  (MonadIO m, MonadReader (WSServerEnv impl) m) =>
  WS.OnConnH m WSConnData
onConn :: forall (m :: * -> *) impl.
(MonadIO m, MonadReader (WSServerEnv impl) m) =>
OnConnH m WSConnData
onConn WSId
wsId RequestHead
requestHead IpAddress
ipAddress WSActions WSConnData
onConnHActions = do
  Either QErr (WsHeaders, ErrRespType, GraphQLQueryType)
res <- ExceptT QErr m (WsHeaders, ErrRespType, GraphQLQueryType)
-> m (Either QErr (WsHeaders, ErrRespType, GraphQLQueryType))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT QErr m (WsHeaders, ErrRespType, GraphQLQueryType)
 -> m (Either QErr (WsHeaders, ErrRespType, GraphQLQueryType)))
-> ExceptT QErr m (WsHeaders, ErrRespType, GraphQLQueryType)
-> m (Either QErr (WsHeaders, ErrRespType, GraphQLQueryType))
forall a b. (a -> b) -> a -> b
$ do
    (ErrRespType
errType, GraphQLQueryType
queryType) <- ExceptT QErr m (ErrRespType, GraphQLQueryType)
checkPath
    let reqHdrs :: [Header]
reqHdrs = RequestHead -> [Header]
WS.requestHeaders RequestHead
requestHead
    [Header]
headers <- ExceptT QErr m [Header]
-> (Header -> ExceptT QErr m [Header])
-> Maybe Header
-> ExceptT QErr m [Header]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe ([Header] -> ExceptT QErr m [Header]
forall a. a -> ExceptT QErr m a
forall (m :: * -> *) a. Monad m => a -> m a
return [Header]
reqHdrs) ((ByteString -> [Header] -> ExceptT QErr m [Header])
-> [Header] -> ByteString -> ExceptT QErr m [Header]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ByteString -> [Header] -> ExceptT QErr m [Header]
enforceCors [Header]
reqHdrs (ByteString -> ExceptT QErr m [Header])
-> (Header -> ByteString) -> Header -> ExceptT QErr m [Header]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Header -> ByteString
forall a b. (a, b) -> b
snd) Maybe Header
getOrigin
    (WsHeaders, ErrRespType, GraphQLQueryType)
-> ExceptT QErr m (WsHeaders, ErrRespType, GraphQLQueryType)
forall a. a -> ExceptT QErr m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([Header] -> WsHeaders
WsHeaders ([Header] -> WsHeaders) -> [Header] -> WsHeaders
forall a b. (a -> b) -> a -> b
$ [Header] -> [Header]
forall {a} {b}. (Eq a, IsString a) => [(a, b)] -> [(a, b)]
filterWsHeaders [Header]
headers, ErrRespType
errType, GraphQLQueryType
queryType)
  (QErr -> m (Either RejectRequest (AcceptWith WSConnData)))
-> ((WsHeaders, ErrRespType, GraphQLQueryType)
    -> m (Either RejectRequest (AcceptWith WSConnData)))
-> Either QErr (WsHeaders, ErrRespType, GraphQLQueryType)
-> m (Either RejectRequest (AcceptWith WSConnData))
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either QErr -> m (Either RejectRequest (AcceptWith WSConnData))
reject (WsHeaders, ErrRespType, GraphQLQueryType)
-> m (Either RejectRequest (AcceptWith WSConnData))
accept Either QErr (WsHeaders, ErrRespType, GraphQLQueryType)
res
  where
    kaAction :: WSKeepAliveMessageAction WSConnData
kaAction = WSActions WSConnData -> WSKeepAliveMessageAction WSConnData
forall a. WSActions a -> WSKeepAliveMessageAction a
WS._wsaKeepAliveAction WSActions WSConnData
onConnHActions
    acceptRequest :: AcceptRequest
acceptRequest = WSActions WSConnData -> AcceptRequest
forall a. WSActions a -> AcceptRequest
WS._wsaAcceptRequest WSActions WSConnData
onConnHActions

    -- NOTE: the "Keep-Alive" delay is something that's mentioned
    -- in the Apollo spec. For 'graphql-ws', we're using the Ping
    -- messages that are part of the spec.
    keepAliveAction :: KeepAliveDelay -> WSKeepAliveMessageAction WSConnData
keepAliveAction KeepAliveDelay
keepAliveDelay WSConn
wsConn =
      IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
        (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever
        (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
          WSKeepAliveMessageAction WSConnData
kaAction WSConn
wsConn
          DiffTime -> IO ()
sleep (DiffTime -> IO ()) -> DiffTime -> IO ()
forall a b. (a -> b) -> a -> b
$ Seconds -> DiffTime
seconds (Refined NonNegative Seconds -> Seconds
forall {k} (p :: k) x. Refined p x -> x
unrefine (Refined NonNegative Seconds -> Seconds)
-> Refined NonNegative Seconds -> Seconds
forall a b. (a -> b) -> a -> b
$ KeepAliveDelay -> Refined NonNegative Seconds
unKeepAliveDelay KeepAliveDelay
keepAliveDelay)

    tokenExpiryHandler :: WSKeepAliveMessageAction WSConnData
tokenExpiryHandler WSConn
wsConn = do
      UTCTime
expTime <- IO UTCTime -> IO UTCTime
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
        (IO UTCTime -> IO UTCTime) -> IO UTCTime -> IO UTCTime
forall a b. (a -> b) -> a -> b
$ STM UTCTime -> IO UTCTime
forall a. STM a -> IO a
STM.atomically
        (STM UTCTime -> IO UTCTime) -> STM UTCTime -> IO UTCTime
forall a b. (a -> b) -> a -> b
$ do
          WSConnState
connState <- TVar WSConnState -> STM WSConnState
forall a. TVar a -> STM a
STM.readTVar (TVar WSConnState -> STM WSConnState)
-> TVar WSConnState -> STM WSConnState
forall a b. (a -> b) -> a -> b
$ (WSConnData -> TVar WSConnState
_wscUser (WSConnData -> TVar WSConnState)
-> (WSConn -> WSConnData) -> WSConn -> TVar WSConnState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. WSConn -> WSConnData
forall a. WSConn a -> a
WS.getData) WSConn
wsConn
          case WSConnState
connState of
            CSNotInitialised WsHeaders
_ IpAddress
_ -> STM UTCTime
forall a. STM a
STM.retry
            CSInitError Text
_ -> STM UTCTime
forall a. STM a
STM.retry
            CSInitialised WsClientState
clientState -> Maybe UTCTime -> STM UTCTime -> STM UTCTime
forall (m :: * -> *) a. Applicative m => Maybe a -> m a -> m a
onNothing (WsClientState -> Maybe UTCTime
wscsTokenExpTime WsClientState
clientState) STM UTCTime
forall a. STM a
STM.retry
      UTCTime
currTime <- IO UTCTime
TC.getCurrentTime
      DiffTime -> IO ()
sleep (DiffTime -> IO ()) -> DiffTime -> IO ()
forall a b. (a -> b) -> a -> b
$ NominalDiffTime -> DiffTime
forall x y. (Duration x, Duration y) => x -> y
convertDuration (NominalDiffTime -> DiffTime) -> NominalDiffTime -> DiffTime
forall a b. (a -> b) -> a -> b
$ UTCTime -> UTCTime -> NominalDiffTime
TC.diffUTCTime UTCTime
expTime UTCTime
currTime

    accept :: (WsHeaders, ErrRespType, GraphQLQueryType)
-> m (Either RejectRequest (AcceptWith WSConnData))
accept (WsHeaders
hdrs, ErrRespType
errType, GraphQLQueryType
queryType) = do
      (L.Logger forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
logger) <- (WSServerEnv impl -> Logger Hasura) -> m (Logger Hasura)
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks WSServerEnv impl -> Logger Hasura
forall impl. WSServerEnv impl -> Logger Hasura
_wseLogger
      KeepAliveDelay
keepAliveDelay <- (WSServerEnv impl -> KeepAliveDelay) -> m KeepAliveDelay
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks WSServerEnv impl -> KeepAliveDelay
forall impl. WSServerEnv impl -> KeepAliveDelay
_wseKeepAliveDelay
      WSLog -> m ()
forall a (m :: * -> *).
(ToEngineLog a Hasura, MonadIO m) =>
a -> m ()
logger (WSLog -> m ()) -> WSLog -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe SessionVariables -> WsConnInfo -> WSEvent -> WSLog
mkWsInfoLog Maybe SessionVariables
forall a. Maybe a
Nothing (WSId -> Maybe UTCTime -> Maybe Text -> WsConnInfo
WsConnInfo WSId
wsId Maybe UTCTime
forall a. Maybe a
Nothing Maybe Text
forall a. Maybe a
Nothing) WSEvent
EAccepted
      WSConnData
connData <-
        IO WSConnData -> m WSConnData
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
          (IO WSConnData -> m WSConnData) -> IO WSConnData -> m WSConnData
forall a b. (a -> b) -> a -> b
$ TVar WSConnState
-> OperationMap -> ErrRespType -> GraphQLQueryType -> WSConnData
WSConnData
          (TVar WSConnState
 -> OperationMap -> ErrRespType -> GraphQLQueryType -> WSConnData)
-> IO (TVar WSConnState)
-> IO
     (OperationMap -> ErrRespType -> GraphQLQueryType -> WSConnData)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> WSConnState -> IO (TVar WSConnState)
forall a. a -> IO (TVar a)
STM.newTVarIO (WsHeaders -> IpAddress -> WSConnState
CSNotInitialised WsHeaders
hdrs IpAddress
ipAddress)
          IO (OperationMap -> ErrRespType -> GraphQLQueryType -> WSConnData)
-> IO OperationMap
-> IO (ErrRespType -> GraphQLQueryType -> WSConnData)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> IO OperationMap
forall key value. IO (Map key value)
STMMap.newIO
          IO (ErrRespType -> GraphQLQueryType -> WSConnData)
-> IO ErrRespType -> IO (GraphQLQueryType -> WSConnData)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ErrRespType -> IO ErrRespType
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ErrRespType
errType
          IO (GraphQLQueryType -> WSConnData)
-> IO GraphQLQueryType -> IO WSConnData
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> GraphQLQueryType -> IO GraphQLQueryType
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure GraphQLQueryType
queryType

      Either RejectRequest (AcceptWith WSConnData)
-> m (Either RejectRequest (AcceptWith WSConnData))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
        (Either RejectRequest (AcceptWith WSConnData)
 -> m (Either RejectRequest (AcceptWith WSConnData)))
-> Either RejectRequest (AcceptWith WSConnData)
-> m (Either RejectRequest (AcceptWith WSConnData))
forall a b. (a -> b) -> a -> b
$ AcceptWith WSConnData
-> Either RejectRequest (AcceptWith WSConnData)
forall a b. b -> Either a b
Right
        (AcceptWith WSConnData
 -> Either RejectRequest (AcceptWith WSConnData))
-> AcceptWith WSConnData
-> Either RejectRequest (AcceptWith WSConnData)
forall a b. (a -> b) -> a -> b
$ WSConnData
-> AcceptRequest
-> WSKeepAliveMessageAction WSConnData
-> WSKeepAliveMessageAction WSConnData
-> AcceptWith WSConnData
forall a.
a
-> AcceptRequest
-> (WSConn a -> IO ())
-> (WSConn a -> IO ())
-> AcceptWith a
WS.AcceptWith
          WSConnData
connData
          AcceptRequest
acceptRequest
          (KeepAliveDelay -> WSKeepAliveMessageAction WSConnData
keepAliveAction KeepAliveDelay
keepAliveDelay)
          WSKeepAliveMessageAction WSConnData
tokenExpiryHandler

    -- Reject the websocket handshake because of the given error.
    --
    -- Logs an 'ERejected' event for this connection id (no session variables,
    -- and both optional fields of 'WsConnInfo' left as 'Nothing'), then builds
    -- the 'WS.RejectRequest' returned in 'Left': the HTTP status code and status
    -- message are taken from the error's 'qeStatus', no extra headers are sent,
    -- and the body is the error as encoded by @encodeGQLErr False@.
    --
    -- Inferred type:
    --   QErr -> m (Either RejectRequest (AcceptWith WSConnData))
    reject qErr = do
      (L.Logger logger) <- asks _wseLogger
      logger $ mkWsErrorLog Nothing (WsConnInfo wsId Nothing Nothing) (ERejected qErr)
      -- The status is needed for both the numeric code and the reason phrase.
      let status = qeStatus qErr
      return
        $ Left
        $ WS.RejectRequest
          (HTTP.statusCode status)
          (HTTP.statusMessage status)
          []
          (LBS.toStrict $ J.encodingToLazyByteString $ encodeGQLErr False qErr)

    -- Map the request path of the handshake to the error-response format and
    -- the GraphQL query type to use on this connection. Any path other than the
    -- three listed below fails with a 404 error.
    --
    -- Inferred type:
    --   ExceptT QErr m (ErrRespType, GraphQLQueryType)
    checkPath = case WS.requestPath requestHead of
      "/v1alpha1/graphql" -> return (ERTLegacy, E.QueryHasura)
      "/v1/graphql" -> return (ERTGraphqlCompliant, E.QueryHasura)
      "/v1beta1/relay" -> return (ERTGraphqlCompliant, E.QueryRelay)
      _ ->
        throw404 "only '/v1/graphql', '/v1alpha1/graphql' and '/v1beta1/relay' are supported on websockets"

    -- The first @Origin@ header of the handshake request, if there is one.
    -- Header names are 'CI ByteString', so the name comparison is
    -- case-insensitive.
    --
    -- Inferred type:
    --   Maybe Header
    getOrigin =
      find ((==) "Origin" . fst) (WS.requestHeaders requestHead)

    -- Apply the currently configured CORS policy to a handshake that carries
    -- the given @Origin@ value, returning the request headers that should be
    -- used from here on.
    --
    -- The policy is read afresh from the server environment on every call
    -- ('_wseCorsPolicy' is an 'IO' action). Depending on its config:
    --
    --   * 'CCAllowAll': the headers are returned unchanged.
    --   * 'CCDisabled': if reading cookies is enabled the headers are returned
    --     unchanged; otherwise 'corsNote' is logged (as an info-level
    --     'EAccepted' event) and the @Cookie@ header is dropped.
    --   * 'CCAllowedOrigins': the headers are returned unchanged when the
    --     origin is one of the configured domains or matches the wildcard
    --     list; otherwise the computation fails with 'corsErr'.
    --
    -- Inferred type:
    --   ByteString -> [Header] -> ExceptT QErr m [Header]
    enforceCors origin reqHdrs = do
      (L.Logger logger) <- asks _wseLogger
      corsPolicy <- liftIO =<< asks _wseCorsPolicy
      case cpConfig corsPolicy of
        CCAllowAll -> return reqHdrs
        CCDisabled readCookie ->
          if readCookie
            then return reqHdrs
            else do
              lift $ logger $ mkWsInfoLog Nothing (WsConnInfo wsId Nothing (Just corsNote)) EAccepted
              return $ filter (\h -> fst h /= "Cookie") reqHdrs
        CCAllowedOrigins ds
          -- if the origin is in our cors domains, no error
          | bsToTxt origin `elem` dmFqdns ds -> return reqHdrs
          -- if current origin is part of wildcard domain list, no error
          | inWildcardList ds (bsToTxt origin) -> return reqHdrs
          -- otherwise error
          | otherwise -> corsErr

    -- Drop the headers that only concern the websocket handshake itself
    -- (@sec-websocket-key@, @sec-websocket-version@, @upgrade@ and
    -- @connection@) from a list of (name, value) pairs, keeping every other
    -- entry in its original order.
    --
    -- The function is generic in the name and value types; the names only need
    -- to be comparable against the string literals below.
    -- NOTE(review): presumably called with 'CI ByteString' header names, which
    -- would make the match case-insensitive — confirm at the call site.
    filterWsHeaders hdrs = flip filter hdrs $ \(n, _) ->
      n
        `notElem` [ "sec-websocket-key",
                    "sec-websocket-version",
                    "upgrade",
                    "connection"
                  ]

    -- Failure raised by 'enforceCors' when the request's origin is not allowed
    -- by the configured CORS domains: a 400 error with code 'AccessDenied'.
    --
    -- Inferred type:
    --   ExceptT QErr m a
    corsErr =
      throw400
        AccessDenied
        "received origin header does not match configured CORS domains"

    -- Message attached to the connection log entry when CORS is disabled and
    -- the @Cookie@ header is therefore not read (see 'enforceCors').
    --
    -- Inferred type:
    --   Text
    corsNote =
      "Cookie is not read when CORS is disabled, because it is a potential "
        <> "security issue. If you're already handling CORS before Hasura and enforcing "
        <> "CORS on websocket connections, then you can use the flag --ws-read-cookie or "
        <> "HASURA_GRAPHQL_WS_READ_COOKIE to force read cookie when CORS is disabled."

-- | Helper for avoiding boolean blindness: a two-valued flag that 'onStart'
-- takes as an argument in place of a bare 'Bool'.
data ShouldCaptureQueryVariables
  = CaptureQueryVariables
  -- ^ NOTE(review): presumably asks for the operation's query variables to be
  -- captured — confirm against the use site in 'onStart'.
  | DoNotCaptureQueryVariables
  -- ^ NOTE(review): presumably the opposite choice (variables are left out) —
  -- confirm against the use site in 'onStart'.

onStart ::
  forall m impl.
  ( MonadIO m,
    E.MonadGQLExecutionCheck m,
    MonadQueryLog m,
    MonadExecutionLog m,
    Tracing.MonadTrace m,
    MonadExecuteQuery m,
    MC.MonadBaseControl IO m,
    MonadMetadataStorage m,
    MonadQueryTags m,
    HasResourceLimits m,
    ProvidesNetwork m,
    MonadGetPolicies m
  ) =>
  HashSet (L.EngineLogType L.Hasura) ->
  Maybe (CredentialCache AgentLicenseKey) ->
  WSServerEnv impl ->
  WSConn ->
  ShouldCaptureQueryVariables ->
  StartMsg ->
  WS.WSActions WSConnData ->
  m ()
onStart :: forall (m :: * -> *) impl.
(MonadIO m, MonadGQLExecutionCheck m, MonadQueryLog m,
 MonadExecutionLog m, MonadTrace m, MonadExecuteQuery m,
 MonadBaseControl IO m, MonadMetadataStorage m, MonadQueryTags m,
 HasResourceLimits m, ProvidesNetwork m, MonadGetPolicies m) =>
HashSet (EngineLogType Hasura)
-> Maybe (CredentialCache AgentLicenseKey)
-> WSServerEnv impl
-> WSConn
-> ShouldCaptureQueryVariables
-> StartMsg
-> WSActions WSConnData
-> m ()
onStart HashSet (EngineLogType Hasura)
enabledLogTypes Maybe (CredentialCache AgentLicenseKey)
agentLicenseKey WSServerEnv impl
serverEnv WSConn
wsConn ShouldCaptureQueryVariables
shouldCaptureVariables (StartMsg OperationId
opId GQLReqUnparsed
q) WSActions WSConnData
onMessageActions = ExceptT () m () -> m ()
catchAndIgnore (ExceptT () m () -> m ()) -> ExceptT () m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
  ExceptT () m DiffTime
timerTot <- ExceptT () m (ExceptT () m DiffTime)
forall (m :: * -> *) (n :: * -> *).
(MonadIO m, MonadIO n) =>
m (n DiffTime)
startTimer
  Maybe (SubscriberType, Maybe OperationName)
op <- IO (Maybe (SubscriberType, Maybe OperationName))
-> ExceptT () m (Maybe (SubscriberType, Maybe OperationName))
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (SubscriberType, Maybe OperationName))
 -> ExceptT () m (Maybe (SubscriberType, Maybe OperationName)))
-> IO (Maybe (SubscriberType, Maybe OperationName))
-> ExceptT () m (Maybe (SubscriberType, Maybe OperationName))
forall a b. (a -> b) -> a -> b
$ STM (Maybe (SubscriberType, Maybe OperationName))
-> IO (Maybe (SubscriberType, Maybe OperationName))
forall a. STM a -> IO a
STM.atomically (STM (Maybe (SubscriberType, Maybe OperationName))
 -> IO (Maybe (SubscriberType, Maybe OperationName)))
-> STM (Maybe (SubscriberType, Maybe OperationName))
-> IO (Maybe (SubscriberType, Maybe OperationName))
forall a b. (a -> b) -> a -> b
$ OperationId
-> OperationMap
-> STM (Maybe (SubscriberType, Maybe OperationName))
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM (Maybe value)
STMMap.lookup OperationId
opId OperationMap
opMap

  -- NOTE: it should be safe to rely on this check later on in this function, since we expect that
  -- we process all operations on a websocket connection serially:
  Bool -> ExceptT () m () -> ExceptT () m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe (SubscriberType, Maybe OperationName) -> Bool
forall a. Maybe a -> Bool
isJust Maybe (SubscriberType, Maybe OperationName)
op)
    (ExceptT () m () -> ExceptT () m ())
-> ExceptT () m () -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ ExceptT () m () -> ExceptT () m ()
forall a. ExceptT () m () -> ExceptT () m a
withComplete
    (ExceptT () m () -> ExceptT () m ())
-> ExceptT () m () -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ Text -> ExceptT () m ()
sendStartErr
    (Text -> ExceptT () m ()) -> Text -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ Text
"an operation already exists with this id: "
    Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> OperationId -> Text
unOperationId OperationId
opId

  WSConnState
userInfoM <- IO WSConnState -> ExceptT () m WSConnState
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO WSConnState -> ExceptT () m WSConnState)
-> IO WSConnState -> ExceptT () m WSConnState
forall a b. (a -> b) -> a -> b
$ TVar WSConnState -> IO WSConnState
forall a. TVar a -> IO a
STM.readTVarIO TVar WSConnState
userInfoR
  (UserInfo
userInfo, [Header]
origReqHdrs, IpAddress
ipAddress) <- case WSConnState
userInfoM of
    CSInitialised WsClientState {[Header]
Maybe UTCTime
IpAddress
UserInfo
wscsUserInfo :: WsClientState -> UserInfo
wscsTokenExpTime :: WsClientState -> Maybe UTCTime
wscsReqHeaders :: WsClientState -> [Header]
wscsIpAddress :: WsClientState -> IpAddress
wscsUserInfo :: UserInfo
wscsTokenExpTime :: Maybe UTCTime
wscsReqHeaders :: [Header]
wscsIpAddress :: IpAddress
..} -> (UserInfo, [Header], IpAddress)
-> ExceptT () m (UserInfo, [Header], IpAddress)
forall a. a -> ExceptT () m a
forall (m :: * -> *) a. Monad m => a -> m a
return (UserInfo
wscsUserInfo, [Header]
wscsReqHeaders, IpAddress
wscsIpAddress)
    CSInitError Text
initErr -> do
      let e :: Text
e = Text
"cannot start as connection_init failed with: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
initErr
      ExceptT () m () -> ExceptT () m (UserInfo, [Header], IpAddress)
forall a. ExceptT () m () -> ExceptT () m a
withComplete (ExceptT () m () -> ExceptT () m (UserInfo, [Header], IpAddress))
-> ExceptT () m () -> ExceptT () m (UserInfo, [Header], IpAddress)
forall a b. (a -> b) -> a -> b
$ Text -> ExceptT () m ()
sendStartErr Text
e
    CSNotInitialised WsHeaders
_ IpAddress
_ -> do
      let e :: Text
e = Text
"start received before the connection is initialised"
      ExceptT () m () -> ExceptT () m (UserInfo, [Header], IpAddress)
forall a. ExceptT () m () -> ExceptT () m a
withComplete (ExceptT () m () -> ExceptT () m (UserInfo, [Header], IpAddress))
-> ExceptT () m () -> ExceptT () m (UserInfo, [Header], IpAddress)
forall a b. (a -> b) -> a -> b
$ Text -> ExceptT () m ()
sendStartErr Text
e

  (RequestId
requestId, [Header]
reqHdrs) <- IO (RequestId, [Header]) -> ExceptT () m (RequestId, [Header])
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (RequestId, [Header]) -> ExceptT () m (RequestId, [Header]))
-> IO (RequestId, [Header]) -> ExceptT () m (RequestId, [Header])
forall a b. (a -> b) -> a -> b
$ [Header] -> IO (RequestId, [Header])
forall (m :: * -> *).
MonadIO m =>
[Header] -> m (RequestId, [Header])
getRequestId [Header]
origReqHdrs
  SchemaCache
sc <- IO SchemaCache -> ExceptT () m SchemaCache
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO SchemaCache -> ExceptT () m SchemaCache)
-> IO SchemaCache -> ExceptT () m SchemaCache
forall a b. (a -> b) -> a -> b
$ AppStateRef impl -> IO SchemaCache
forall impl. AppStateRef impl -> IO SchemaCache
getSchemaCacheWithVersion AppStateRef impl
appStateRef

  ResourceLimits
operationLimit <- RequestId -> UserInfo -> ApiLimit -> ExceptT () m ResourceLimits
forall (m :: * -> *).
HasResourceLimits m =>
RequestId -> UserInfo -> ApiLimit -> m ResourceLimits
askGraphqlOperationLimit RequestId
requestId UserInfo
userInfo (SchemaCache -> ApiLimit
scApiLimits SchemaCache
sc)
  let runLimits ::
        ExceptT (Either GQExecError QErr) (ExceptT () m) a ->
        ExceptT (Either GQExecError QErr) (ExceptT () m) a
      runLimits :: forall a.
ExceptT (Either GQExecError QErr) (ExceptT () m) a
-> ExceptT (Either GQExecError QErr) (ExceptT () m) a
runLimits = (QErr -> Either GQExecError QErr)
-> (ExceptT
      QErr (ExceptT (Either GQExecError QErr) (ExceptT () m)) a
    -> ExceptT
         QErr (ExceptT (Either GQExecError QErr) (ExceptT () m)) a)
-> ExceptT (Either GQExecError QErr) (ExceptT () m) a
-> ExceptT (Either GQExecError QErr) (ExceptT () m) a
forall e f (n :: * -> *) a.
Monad n =>
(e -> f)
-> (ExceptT e (ExceptT f n) a -> ExceptT e (ExceptT f n) a)
-> ExceptT f n a
-> ExceptT f n a
withErr QErr -> Either GQExecError QErr
forall a b. b -> Either a b
Right ((ExceptT QErr (ExceptT (Either GQExecError QErr) (ExceptT () m)) a
  -> ExceptT
       QErr (ExceptT (Either GQExecError QErr) (ExceptT () m)) a)
 -> ExceptT (Either GQExecError QErr) (ExceptT () m) a
 -> ExceptT (Either GQExecError QErr) (ExceptT () m) a)
-> (ExceptT
      QErr (ExceptT (Either GQExecError QErr) (ExceptT () m)) a
    -> ExceptT
         QErr (ExceptT (Either GQExecError QErr) (ExceptT () m)) a)
-> ExceptT (Either GQExecError QErr) (ExceptT () m) a
-> ExceptT (Either GQExecError QErr) (ExceptT () m) a
forall a b. (a -> b) -> a -> b
$ ResourceLimits
-> forall (m :: * -> *) a.
   (MonadBaseControl IO m, MonadError QErr m) =>
   m a -> m a
runResourceLimits ResourceLimits
operationLimit

  Environment
env <- IO Environment -> ExceptT () m Environment
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Environment -> ExceptT () m Environment)
-> IO Environment -> ExceptT () m Environment
forall a b. (a -> b) -> a -> b
$ AppContext -> Environment
acEnvironment (AppContext -> Environment) -> IO AppContext -> IO Environment
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AppStateRef impl -> IO AppContext
forall impl. AppStateRef impl -> IO AppContext
getAppContext AppStateRef impl
appStateRef
  SQLGenCtx
sqlGenCtx <- IO SQLGenCtx -> ExceptT () m SQLGenCtx
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO SQLGenCtx -> ExceptT () m SQLGenCtx)
-> IO SQLGenCtx -> ExceptT () m SQLGenCtx
forall a b. (a -> b) -> a -> b
$ AppContext -> SQLGenCtx
acSQLGenCtx (AppContext -> SQLGenCtx) -> IO AppContext -> IO SQLGenCtx
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AppStateRef impl -> IO AppContext
forall impl. AppStateRef impl -> IO AppContext
getAppContext AppStateRef impl
appStateRef
  AllowListStatus
enableAL <- IO AllowListStatus -> ExceptT () m AllowListStatus
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AllowListStatus -> ExceptT () m AllowListStatus)
-> IO AllowListStatus -> ExceptT () m AllowListStatus
forall a b. (a -> b) -> a -> b
$ AppContext -> AllowListStatus
acEnableAllowlist (AppContext -> AllowListStatus)
-> IO AppContext -> IO AllowListStatus
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AppStateRef impl -> IO AppContext
forall impl. AppStateRef impl -> IO AppContext
getAppContext AppStateRef impl
appStateRef

  (GQLReqParsed
reqParsed, SingleOperation
queryParts) <- Text
-> ExceptT () m (GQLReqParsed, SingleOperation)
-> ExceptT () m (GQLReqParsed, SingleOperation)
forall (m :: * -> *) a.
(MonadIO m, MonadTrace m) =>
Text -> m a -> m a
Tracing.newSpan Text
"Parse GraphQL" (ExceptT () m (GQLReqParsed, SingleOperation)
 -> ExceptT () m (GQLReqParsed, SingleOperation))
-> ExceptT () m (GQLReqParsed, SingleOperation)
-> ExceptT () m (GQLReqParsed, SingleOperation)
forall a b. (a -> b) -> a -> b
$ do
    Either QErr GQLReqParsed
reqParsedE <- m (Either QErr GQLReqParsed)
-> ExceptT () m (Either QErr GQLReqParsed)
forall (m :: * -> *) a. Monad m => m a -> ExceptT () m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m (Either QErr GQLReqParsed)
 -> ExceptT () m (Either QErr GQLReqParsed))
-> m (Either QErr GQLReqParsed)
-> ExceptT () m (Either QErr GQLReqParsed)
forall a b. (a -> b) -> a -> b
$ UserInfo
-> ([Header], IpAddress)
-> AllowListStatus
-> SchemaCache
-> GQLReqUnparsed
-> RequestId
-> m (Either QErr GQLReqParsed)
forall (m :: * -> *).
MonadGQLExecutionCheck m =>
UserInfo
-> ([Header], IpAddress)
-> AllowListStatus
-> SchemaCache
-> GQLReqUnparsed
-> RequestId
-> m (Either QErr GQLReqParsed)
E.checkGQLExecution UserInfo
userInfo ([Header]
reqHdrs, IpAddress
ipAddress) AllowListStatus
enableAL SchemaCache
sc GQLReqUnparsed
q RequestId
requestId
    GQLReqParsed
reqParsed <- Either QErr GQLReqParsed
-> (QErr -> ExceptT () m GQLReqParsed) -> ExceptT () m GQLReqParsed
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft Either QErr GQLReqParsed
reqParsedE (ExceptT () m () -> ExceptT () m GQLReqParsed
forall a. ExceptT () m () -> ExceptT () m a
withComplete (ExceptT () m () -> ExceptT () m GQLReqParsed)
-> (QErr -> ExceptT () m ()) -> QErr -> ExceptT () m GQLReqParsed
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RequestId -> Maybe OperationType -> QErr -> ExceptT () m ()
preExecErr RequestId
requestId Maybe OperationType
forall a. Maybe a
Nothing)
    Either QErr SingleOperation
queryPartsE <- ExceptT QErr (ExceptT () m) SingleOperation
-> ExceptT () m (Either QErr SingleOperation)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT QErr (ExceptT () m) SingleOperation
 -> ExceptT () m (Either QErr SingleOperation))
-> ExceptT QErr (ExceptT () m) SingleOperation
-> ExceptT () m (Either QErr SingleOperation)
forall a b. (a -> b) -> a -> b
$ GQLReqParsed -> ExceptT QErr (ExceptT () m) SingleOperation
forall (m :: * -> *).
MonadError QErr m =>
GQLReqParsed -> m SingleOperation
getSingleOperation GQLReqParsed
reqParsed
    SingleOperation
queryParts <- Either QErr SingleOperation
-> (QErr -> ExceptT () m SingleOperation)
-> ExceptT () m SingleOperation
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft Either QErr SingleOperation
queryPartsE (ExceptT () m () -> ExceptT () m SingleOperation
forall a. ExceptT () m () -> ExceptT () m a
withComplete (ExceptT () m () -> ExceptT () m SingleOperation)
-> (QErr -> ExceptT () m ())
-> QErr
-> ExceptT () m SingleOperation
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RequestId -> Maybe OperationType -> QErr -> ExceptT () m ()
preExecErr RequestId
requestId Maybe OperationType
forall a. Maybe a
Nothing)
    (GQLReqParsed, SingleOperation)
-> ExceptT () m (GQLReqParsed, SingleOperation)
forall a. a -> ExceptT () m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (GQLReqParsed
reqParsed, SingleOperation
queryParts)

  let gqlOpType :: OperationType
gqlOpType = SingleOperation -> OperationType
forall (frag :: * -> *) var.
TypedOperationDefinition frag var -> OperationType
G._todType SingleOperation
queryParts
      opName :: Maybe OperationName
opName = GQLReqParsed -> Maybe OperationName
getOpNameFromParsedReq GQLReqParsed
reqParsed
      maybeOperationName :: Maybe Name
maybeOperationName = OperationName -> Name
_unOperationName (OperationName -> Name) -> Maybe OperationName -> Maybe Name
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe OperationName
opName
  Maybe Name -> (Name -> ExceptT () m ()) -> ExceptT () m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe Name
maybeOperationName ((Name -> ExceptT () m ()) -> ExceptT () m ())
-> (Name -> ExceptT () m ()) -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ \Name
nm ->
    -- https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/instrumentation/graphql/
    TraceMetadata -> ExceptT () m ()
forall (m :: * -> *). MonadTrace m => TraceMetadata -> m ()
Tracing.attachMetadata [(Text
"graphql.operation.name", Name -> Text
unName Name
nm)]
  Either QErr (ParameterizedQueryHash, ResolvedExecutionPlan)
execPlanE <-
    ExceptT
  QErr (ExceptT () m) (ParameterizedQueryHash, ResolvedExecutionPlan)
-> ExceptT
     () m (Either QErr (ParameterizedQueryHash, ResolvedExecutionPlan))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT
      (ExceptT
   QErr (ExceptT () m) (ParameterizedQueryHash, ResolvedExecutionPlan)
 -> ExceptT
      () m (Either QErr (ParameterizedQueryHash, ResolvedExecutionPlan)))
-> ExceptT
     QErr (ExceptT () m) (ParameterizedQueryHash, ResolvedExecutionPlan)
-> ExceptT
     () m (Either QErr (ParameterizedQueryHash, ResolvedExecutionPlan))
forall a b. (a -> b) -> a -> b
$ Environment
-> Logger Hasura
-> PrometheusMetrics
-> UserInfo
-> SQLGenCtx
-> ReadOnlyMode
-> SchemaCache
-> GraphQLQueryType
-> [Header]
-> GQLReqUnparsed
-> SingleOperation
-> Maybe Name
-> RequestId
-> ExceptT
     QErr (ExceptT () m) (ParameterizedQueryHash, ResolvedExecutionPlan)
forall (m :: * -> *).
(MonadError QErr m, MonadMetadataStorage m, MonadIO m,
 MonadBaseControl IO m, MonadTrace m, MonadGQLExecutionCheck m,
 MonadQueryTags m, ProvidesNetwork m) =>
Environment
-> Logger Hasura
-> PrometheusMetrics
-> UserInfo
-> SQLGenCtx
-> ReadOnlyMode
-> SchemaCache
-> GraphQLQueryType
-> [Header]
-> GQLReqUnparsed
-> SingleOperation
-> Maybe Name
-> RequestId
-> m (ParameterizedQueryHash, ResolvedExecutionPlan)
E.getResolvedExecPlan
        Environment
env
        Logger Hasura
logger
        PrometheusMetrics
prometheusMetrics
        UserInfo
userInfo
        SQLGenCtx
sqlGenCtx
        ReadOnlyMode
readOnlyMode
        SchemaCache
sc
        GraphQLQueryType
queryType
        [Header]
reqHdrs
        GQLReqUnparsed
q
        SingleOperation
queryParts
        Maybe Name
maybeOperationName
        RequestId
requestId

  (ParameterizedQueryHash
parameterizedQueryHash, ResolvedExecutionPlan
execPlan) <- Either QErr (ParameterizedQueryHash, ResolvedExecutionPlan)
-> (QErr
    -> ExceptT () m (ParameterizedQueryHash, ResolvedExecutionPlan))
-> ExceptT () m (ParameterizedQueryHash, ResolvedExecutionPlan)
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft Either QErr (ParameterizedQueryHash, ResolvedExecutionPlan)
execPlanE (ExceptT () m ()
-> ExceptT () m (ParameterizedQueryHash, ResolvedExecutionPlan)
forall a. ExceptT () m () -> ExceptT () m a
withComplete (ExceptT () m ()
 -> ExceptT () m (ParameterizedQueryHash, ResolvedExecutionPlan))
-> (QErr -> ExceptT () m ())
-> QErr
-> ExceptT () m (ParameterizedQueryHash, ResolvedExecutionPlan)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RequestId -> Maybe OperationType -> QErr -> ExceptT () m ()
preExecErr RequestId
requestId (OperationType -> Maybe OperationType
forall a. a -> Maybe a
Just OperationType
gqlOpType))

  case ResolvedExecutionPlan
execPlan of
    E.QueryExecutionPlan ExecutionPlan
queryPlan [QueryRootField UnpreparedValue]
asts DirectiveMap
dirMap -> do
      let cachedDirective :: Maybe CachedDirective
cachedDirective = Identity CachedDirective -> CachedDirective
forall a. Identity a -> a
runIdentity (Identity CachedDirective -> CachedDirective)
-> Maybe (Identity CachedDirective) -> Maybe CachedDirective
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> DirectiveKey CachedDirective
-> DirectiveMap -> Maybe (Identity CachedDirective)
forall {k1} (k2 :: k1 -> *) (f :: k1 -> *) (v :: k1).
GCompare k2 =>
k2 v -> DMap k2 f -> Maybe (f v)
DM.lookup DirectiveKey CachedDirective
cached DirectiveMap
dirMap

      -- We ignore the response headers (containing TTL information) because
      -- WebSockets don't support them.
      CacheResult
cachedValue <-
        ExecutionPlan
-> [QueryRootField UnpreparedValue]
-> Maybe CachedDirective
-> GQLReqParsed
-> UserInfo
-> [Header]
-> ExceptT () m (Either QErr ([Header], CacheResult))
forall (m :: * -> *).
MonadExecuteQuery m =>
ExecutionPlan
-> [QueryRootField UnpreparedValue]
-> Maybe CachedDirective
-> GQLReqParsed
-> UserInfo
-> [Header]
-> m (Either QErr ([Header], CacheResult))
cacheLookup ExecutionPlan
queryPlan [QueryRootField UnpreparedValue]
asts Maybe CachedDirective
cachedDirective GQLReqParsed
reqParsed UserInfo
userInfo [Header]
reqHdrs ExceptT () m (Either QErr ([Header], CacheResult))
-> (Either QErr ([Header], CacheResult)
    -> ExceptT () m CacheResult)
-> ExceptT () m CacheResult
forall a b.
ExceptT () m a -> (a -> ExceptT () m b) -> ExceptT () m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
          Right ([Header]
_responseHeaders, CacheResult
cachedValue) -> CacheResult -> ExceptT () m CacheResult
forall a. a -> ExceptT () m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure CacheResult
cachedValue
          Left QErr
_err -> () -> ExceptT () m CacheResult
forall a. () -> ExceptT () m a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError ()
      case CacheResult
cachedValue of
        ResponseCached EncJSON
cachedResponseData -> do
          Logger Hasura -> QueryLog -> ExceptT () m ()
forall (m :: * -> *).
MonadQueryLog m =>
Logger Hasura -> QueryLog -> m ()
logQueryLog Logger Hasura
logger (QueryLog -> ExceptT () m ()) -> QueryLog -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ GQLReqUnparsed
-> Maybe (RootFieldAlias, GeneratedQuery)
-> RequestId
-> QueryLogKind
-> QueryLog
QueryLog GQLReqUnparsed
q Maybe (RootFieldAlias, GeneratedQuery)
forall a. Maybe a
Nothing RequestId
requestId QueryLogKind
QueryLogKindCached
          let reportedExecutionTime :: DiffTime
reportedExecutionTime = DiffTime
0
          IO () -> ExceptT () m ()
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT () m ()) -> IO () -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ DiffTime -> OperationType -> IO ()
recordGQLQuerySuccess DiffTime
reportedExecutionTime OperationType
gqlOpType
          EncJSON
-> Maybe OperationName
-> ParameterizedQueryHash
-> SubscriptionMetadata
-> ExceptT () m ()
sendSuccResp EncJSON
cachedResponseData Maybe OperationName
opName ParameterizedQueryHash
parameterizedQueryHash (SubscriptionMetadata -> ExceptT () m ())
-> SubscriptionMetadata -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ DiffTime -> SubscriptionMetadata
ES.SubscriptionMetadata DiffTime
reportedExecutionTime
        ResponseUncached Maybe ResponseCacher
storeResponseM -> do
          Either
  (Either GQExecError QErr)
  (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
conclusion <- ExceptT
  (Either GQExecError QErr)
  (ExceptT () m)
  (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
-> ExceptT
     ()
     m
     (Either
        (Either GQExecError QErr)
        (InsOrdHashMap RootFieldAlias AnnotatedResponsePart))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT
            (ExceptT
   (Either GQExecError QErr)
   (ExceptT () m)
   (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
 -> ExceptT
      ()
      m
      (Either
         (Either GQExecError QErr)
         (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)))
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
-> ExceptT
     ()
     m
     (Either
        (Either GQExecError QErr)
        (InsOrdHashMap RootFieldAlias AnnotatedResponsePart))
forall a b. (a -> b) -> a -> b
$ ExceptT
  (Either GQExecError QErr)
  (ExceptT () m)
  (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
forall a.
ExceptT (Either GQExecError QErr) (ExceptT () m) a
-> ExceptT (Either GQExecError QErr) (ExceptT () m) a
runLimits
            (ExceptT
   (Either GQExecError QErr)
   (ExceptT () m)
   (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
 -> ExceptT
      (Either GQExecError QErr)
      (ExceptT () m)
      (InsOrdHashMap RootFieldAlias AnnotatedResponsePart))
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
forall a b. (a -> b) -> a -> b
$ ExecutionPlan
-> (RootFieldAlias
    -> ExecutionStep
    -> ExceptT
         (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart)
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
forall {k} {a} {b}.
InsOrdHashMap k a
-> (k -> a -> ExceptT (Either GQExecError QErr) (ExceptT () m) b)
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) (InsOrdHashMap k b)
forWithKey ExecutionPlan
queryPlan
            ((RootFieldAlias
  -> ExecutionStep
  -> ExceptT
       (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart)
 -> ExceptT
      (Either GQExecError QErr)
      (ExceptT () m)
      (InsOrdHashMap RootFieldAlias AnnotatedResponsePart))
-> (RootFieldAlias
    -> ExecutionStep
    -> ExceptT
         (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart)
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
forall a b. (a -> b) -> a -> b
$ \RootFieldAlias
fieldName ->
              let getResponse :: ExecutionStep
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
getResponse = \case
                    E.ExecStepDB [Header]
_headers AnyBackend DBStepInfo
exists Maybe RemoteJoins
remoteJoins -> ExceptT QErr (ExceptT () m) AnnotatedResponsePart
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
forall (n :: * -> *) a.
Monad n =>
ExceptT QErr n a -> ExceptT (Either GQExecError QErr) n a
doQErr (ExceptT QErr (ExceptT () m) AnnotatedResponsePart
 -> ExceptT
      (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart)
-> ExceptT QErr (ExceptT () m) AnnotatedResponsePart
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
forall a b. (a -> b) -> a -> b
$ do
                      (DiffTime
telemTimeIO_DT, EncJSON
resp) <-
                        forall (c :: BackendType -> Constraint) (i :: BackendType -> *) r.
AllBackendsSatisfy c =>
AnyBackend i -> (forall (b :: BackendType). c b => i b -> r) -> r
AB.dispatchAnyBackend @BackendTransport
                          AnyBackend DBStepInfo
exists
                          \(EB.DBStepInfo SourceName
_ SourceConfig b
sourceConfig Maybe (PreparedQuery b)
genSql OnBaseMonad (ExecutionMonad b) (ActionResult b)
tx ResolvedConnectionTemplate b
resolvedConnectionTemplate :: EB.DBStepInfo b) ->
                            forall (b :: BackendType) (m :: * -> *).
(BackendTransport b, MonadIO m, MonadBaseControl IO m,
 MonadError QErr m, MonadQueryLog m, MonadExecutionLog m,
 MonadTrace m) =>
RequestId
-> GQLReqUnparsed
-> RootFieldAlias
-> UserInfo
-> Logger Hasura
-> Maybe (CredentialCache AgentLicenseKey)
-> SourceConfig b
-> OnBaseMonad
     (ExecutionMonad b) (Maybe (AnyBackend ExecutionStats), EncJSON)
-> Maybe (PreparedQuery b)
-> ResolvedConnectionTemplate b
-> m (DiffTime, EncJSON)
runDBQuery @b
                              RequestId
requestId
                              GQLReqUnparsed
q
                              RootFieldAlias
fieldName
                              UserInfo
userInfo
                              Logger Hasura
logger
                              Maybe (CredentialCache AgentLicenseKey)
agentLicenseKey
                              SourceConfig b
sourceConfig
                              ((ActionResult b -> (Maybe (AnyBackend ExecutionStats), EncJSON))
-> OnBaseMonad (ExecutionMonad b) (ActionResult b)
-> OnBaseMonad
     (ExecutionMonad b) (Maybe (AnyBackend ExecutionStats), EncJSON)
forall a b.
(a -> b)
-> OnBaseMonad (ExecutionMonad b) a
-> OnBaseMonad (ExecutionMonad b) b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (b :: BackendType).
HasTag b =>
ActionResult b -> (Maybe (AnyBackend ExecutionStats), EncJSON)
statsToAnyBackend @b) OnBaseMonad (ExecutionMonad b) (ActionResult b)
tx)
                              Maybe (PreparedQuery b)
genSql
                              ResolvedConnectionTemplate b
resolvedConnectionTemplate
                      EncJSON
finalResponse <-
                        RequestId
-> Logger Hasura
-> Maybe (CredentialCache AgentLicenseKey)
-> Environment
-> [Header]
-> UserInfo
-> EncJSON
-> Maybe RemoteJoins
-> GQLReqUnparsed
-> ExceptT QErr (ExceptT () m) EncJSON
forall (m :: * -> *).
(MonadError QErr m, MonadIO m, MonadBaseControl IO m,
 MonadQueryTags m, MonadQueryLog m, MonadExecutionLog m,
 MonadTrace m, ProvidesNetwork m) =>
RequestId
-> Logger Hasura
-> Maybe (CredentialCache AgentLicenseKey)
-> Environment
-> [Header]
-> UserInfo
-> EncJSON
-> Maybe RemoteJoins
-> GQLReqUnparsed
-> m EncJSON
RJ.processRemoteJoins RequestId
requestId Logger Hasura
logger Maybe (CredentialCache AgentLicenseKey)
agentLicenseKey Environment
env [Header]
reqHdrs UserInfo
userInfo EncJSON
resp Maybe RemoteJoins
remoteJoins GQLReqUnparsed
q
                      AnnotatedResponsePart
-> ExceptT QErr (ExceptT () m) AnnotatedResponsePart
forall a. a -> ExceptT QErr (ExceptT () m) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (AnnotatedResponsePart
 -> ExceptT QErr (ExceptT () m) AnnotatedResponsePart)
-> AnnotatedResponsePart
-> ExceptT QErr (ExceptT () m) AnnotatedResponsePart
forall a b. (a -> b) -> a -> b
$ DiffTime
-> Locality -> EncJSON -> [Header] -> AnnotatedResponsePart
AnnotatedResponsePart DiffTime
telemTimeIO_DT Locality
Telem.Local EncJSON
finalResponse []
                    E.ExecStepRemote RemoteSchemaInfo
rsi ResultCustomizer
resultCustomizer GQLReqOutgoing
gqlReq Maybe RemoteJoins
remoteJoins -> do
                      Logger Hasura
-> QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ()
forall (m :: * -> *).
MonadQueryLog m =>
Logger Hasura -> QueryLog -> m ()
logQueryLog Logger Hasura
logger (QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ())
-> QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ()
forall a b. (a -> b) -> a -> b
$ GQLReqUnparsed
-> Maybe (RootFieldAlias, GeneratedQuery)
-> RequestId
-> QueryLogKind
-> QueryLog
QueryLog GQLReqUnparsed
q Maybe (RootFieldAlias, GeneratedQuery)
forall a. Maybe a
Nothing RequestId
requestId QueryLogKind
QueryLogKindRemoteSchema
                      RequestId
-> GQLReqUnparsed
-> RootFieldAlias
-> UserInfo
-> [Header]
-> RemoteSchemaInfo
-> ResultCustomizer
-> GQLReqOutgoing
-> Maybe RemoteJoins
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
runRemoteGQ RequestId
requestId GQLReqUnparsed
q RootFieldAlias
fieldName UserInfo
userInfo [Header]
reqHdrs RemoteSchemaInfo
rsi ResultCustomizer
resultCustomizer GQLReqOutgoing
gqlReq Maybe RemoteJoins
remoteJoins
                    E.ExecStepAction ActionExecutionPlan
actionExecPlan ActionsInfo
_ Maybe RemoteJoins
remoteJoins -> do
                      Logger Hasura
-> QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ()
forall (m :: * -> *).
MonadQueryLog m =>
Logger Hasura -> QueryLog -> m ()
logQueryLog Logger Hasura
logger (QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ())
-> QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ()
forall a b. (a -> b) -> a -> b
$ GQLReqUnparsed
-> Maybe (RootFieldAlias, GeneratedQuery)
-> RequestId
-> QueryLogKind
-> QueryLog
QueryLog GQLReqUnparsed
q Maybe (RootFieldAlias, GeneratedQuery)
forall a. Maybe a
Nothing RequestId
requestId QueryLogKind
QueryLogKindAction
                      (DiffTime
time, (EncJSON
resp, Maybe [Header]
_)) <- ExceptT QErr (ExceptT () m) (DiffTime, (EncJSON, Maybe [Header]))
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (DiffTime, (EncJSON, Maybe [Header]))
forall (n :: * -> *) a.
Monad n =>
ExceptT QErr n a -> ExceptT (Either GQExecError QErr) n a
doQErr (ExceptT QErr (ExceptT () m) (DiffTime, (EncJSON, Maybe [Header]))
 -> ExceptT
      (Either GQExecError QErr)
      (ExceptT () m)
      (DiffTime, (EncJSON, Maybe [Header])))
-> ExceptT
     QErr (ExceptT () m) (DiffTime, (EncJSON, Maybe [Header]))
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (DiffTime, (EncJSON, Maybe [Header]))
forall a b. (a -> b) -> a -> b
$ do
                        (DiffTime
time, (EncJSON
resp, Maybe [Header]
hdrs)) <- UserInfo
-> ActionExecutionPlan
-> ExceptT
     QErr (ExceptT () m) (DiffTime, (EncJSON, Maybe [Header]))
forall (m :: * -> *).
(MonadIO m, MonadBaseControl IO m, MonadError QErr m, MonadTrace m,
 MonadMetadataStorage m) =>
UserInfo
-> ActionExecutionPlan -> m (DiffTime, (EncJSON, Maybe [Header]))
EA.runActionExecution UserInfo
userInfo ActionExecutionPlan
actionExecPlan
                        EncJSON
finalResponse <-
                          RequestId
-> Logger Hasura
-> Maybe (CredentialCache AgentLicenseKey)
-> Environment
-> [Header]
-> UserInfo
-> EncJSON
-> Maybe RemoteJoins
-> GQLReqUnparsed
-> ExceptT QErr (ExceptT () m) EncJSON
forall (m :: * -> *).
(MonadError QErr m, MonadIO m, MonadBaseControl IO m,
 MonadQueryTags m, MonadQueryLog m, MonadExecutionLog m,
 MonadTrace m, ProvidesNetwork m) =>
RequestId
-> Logger Hasura
-> Maybe (CredentialCache AgentLicenseKey)
-> Environment
-> [Header]
-> UserInfo
-> EncJSON
-> Maybe RemoteJoins
-> GQLReqUnparsed
-> m EncJSON
RJ.processRemoteJoins RequestId
requestId Logger Hasura
logger Maybe (CredentialCache AgentLicenseKey)
agentLicenseKey Environment
env [Header]
reqHdrs UserInfo
userInfo EncJSON
resp Maybe RemoteJoins
remoteJoins GQLReqUnparsed
q
                        (DiffTime, (EncJSON, Maybe [Header]))
-> ExceptT
     QErr (ExceptT () m) (DiffTime, (EncJSON, Maybe [Header]))
forall a. a -> ExceptT QErr (ExceptT () m) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DiffTime
time, (EncJSON
finalResponse, Maybe [Header]
hdrs))
                      AnnotatedResponsePart
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
forall a. a -> ExceptT (Either GQExecError QErr) (ExceptT () m) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (AnnotatedResponsePart
 -> ExceptT
      (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart)
-> AnnotatedResponsePart
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
forall a b. (a -> b) -> a -> b
$ DiffTime
-> Locality -> EncJSON -> [Header] -> AnnotatedResponsePart
AnnotatedResponsePart DiffTime
time Locality
Telem.Empty EncJSON
resp []
                    E.ExecStepRaw Value
json -> do
                      Logger Hasura
-> QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ()
forall (m :: * -> *).
MonadQueryLog m =>
Logger Hasura -> QueryLog -> m ()
logQueryLog Logger Hasura
logger (QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ())
-> QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ()
forall a b. (a -> b) -> a -> b
$ GQLReqUnparsed
-> Maybe (RootFieldAlias, GeneratedQuery)
-> RequestId
-> QueryLogKind
-> QueryLog
QueryLog GQLReqUnparsed
q Maybe (RootFieldAlias, GeneratedQuery)
forall a. Maybe a
Nothing RequestId
requestId QueryLogKind
QueryLogKindIntrospection
                      Value
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
forall (m :: * -> *).
Applicative m =>
Value -> m AnnotatedResponsePart
buildRaw Value
json
                    E.ExecStepMulti [ExecutionStep]
lst -> do
                      [AnnotatedResponsePart]
allResponses <- (ExecutionStep
 -> ExceptT
      (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart)
-> [ExecutionStep]
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) [AnnotatedResponsePart]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse ExecutionStep
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
getResponse [ExecutionStep]
lst
                      AnnotatedResponsePart
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
forall a. a -> ExceptT (Either GQExecError QErr) (ExceptT () m) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (AnnotatedResponsePart
 -> ExceptT
      (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart)
-> AnnotatedResponsePart
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
forall a b. (a -> b) -> a -> b
$ DiffTime
-> Locality -> EncJSON -> [Header] -> AnnotatedResponsePart
AnnotatedResponsePart DiffTime
0 Locality
Telem.Local ([EncJSON] -> EncJSON
encJFromList ((AnnotatedResponsePart -> EncJSON)
-> [AnnotatedResponsePart] -> [EncJSON]
forall a b. (a -> b) -> [a] -> [b]
map AnnotatedResponsePart -> EncJSON
arpResponse [AnnotatedResponsePart]
allResponses)) []
               in ExecutionStep
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
getResponse
          QueryType
-> ExceptT () m DiffTime
-> RequestId
-> Either
     (Either GQExecError QErr)
     (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
-> Maybe OperationName
-> ParameterizedQueryHash
-> OperationType
-> ExceptT () m ()
sendResultFromFragments QueryType
Telem.Query ExceptT () m DiffTime
timerTot RequestId
requestId Either
  (Either GQExecError QErr)
  (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
conclusion Maybe OperationName
opName ParameterizedQueryHash
parameterizedQueryHash OperationType
gqlOpType
          case (Maybe ResponseCacher
storeResponseM, Either
  (Either GQExecError QErr)
  (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
conclusion) of
            (Just ResponseCacher {forall (m :: * -> *).
(MonadTrace m, MonadIO m) =>
EncJSON -> m (Either QErr CacheStoreResponse)
runStoreResponse :: forall (m :: * -> *).
(MonadTrace m, MonadIO m) =>
EncJSON -> m (Either QErr CacheStoreResponse)
runStoreResponse :: ResponseCacher
-> forall (m :: * -> *).
   (MonadTrace m, MonadIO m) =>
   EncJSON -> m (Either QErr CacheStoreResponse)
..}, Right InsOrdHashMap RootFieldAlias AnnotatedResponsePart
results) ->
              -- Note: The result of `runStoreResponse` is ignored here since we can't ensure that
              --       the WS client will respond correctly to multiple messages.
              ExceptT () m (Either QErr CacheStoreResponse) -> ExceptT () m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
                (ExceptT () m (Either QErr CacheStoreResponse) -> ExceptT () m ())
-> ExceptT () m (Either QErr CacheStoreResponse) -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ EncJSON -> ExceptT () m (Either QErr CacheStoreResponse)
forall (m :: * -> *).
(MonadTrace m, MonadIO m) =>
EncJSON -> m (Either QErr CacheStoreResponse)
runStoreResponse
                (EncJSON -> ExceptT () m (Either QErr CacheStoreResponse))
-> EncJSON -> ExceptT () m (Either QErr CacheStoreResponse)
forall a b. (a -> b) -> a -> b
$ InsOrdHashMap RootFieldAlias AnnotatedResponsePart -> EncJSON
encodeAnnotatedResponseParts InsOrdHashMap RootFieldAlias AnnotatedResponsePart
results
            (Maybe ResponseCacher,
 Either
   (Either GQExecError QErr)
   (InsOrdHashMap RootFieldAlias AnnotatedResponsePart))
_ -> () -> ExceptT () m ()
forall a. a -> ExceptT () m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

      IO () -> ExceptT () m ()
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT () m ()) -> IO () -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ Maybe RequestId -> Maybe ParameterizedQueryHash -> IO ()
sendCompleted (RequestId -> Maybe RequestId
forall a. a -> Maybe a
Just RequestId
requestId) (ParameterizedQueryHash -> Maybe ParameterizedQueryHash
forall a. a -> Maybe a
Just ParameterizedQueryHash
parameterizedQueryHash)
    E.MutationExecutionPlan ExecutionPlan
mutationPlan -> do
      -- See Note [Backwards-compatible transaction optimisation]
      case ExecutionPlan
-> Maybe
     (SourceConfig ('Postgres 'Vanilla),
      ResolvedConnectionTemplate ('Postgres 'Vanilla),
      InsOrdHashMap RootFieldAlias (DBStepInfo ('Postgres 'Vanilla)))
coalescePostgresMutations ExecutionPlan
mutationPlan of
        -- we are in the aforementioned case; we circumvent the normal process
        Just (SourceConfig ('Postgres 'Vanilla)
sourceConfig, ResolvedConnectionTemplate ('Postgres 'Vanilla)
resolvedConnectionTemplate, InsOrdHashMap RootFieldAlias (DBStepInfo ('Postgres 'Vanilla))
pgMutations) -> do
          Either (Either GQExecError QErr) (DiffTime, RootFieldMap EncJSON)
resp <-
            ExceptT
  (Either GQExecError QErr)
  (ExceptT () m)
  (DiffTime, RootFieldMap EncJSON)
-> ExceptT
     ()
     m
     (Either (Either GQExecError QErr) (DiffTime, RootFieldMap EncJSON))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT
              (ExceptT
   (Either GQExecError QErr)
   (ExceptT () m)
   (DiffTime, RootFieldMap EncJSON)
 -> ExceptT
      ()
      m
      (Either
         (Either GQExecError QErr) (DiffTime, RootFieldMap EncJSON)))
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (DiffTime, RootFieldMap EncJSON)
-> ExceptT
     ()
     m
     (Either (Either GQExecError QErr) (DiffTime, RootFieldMap EncJSON))
forall a b. (a -> b) -> a -> b
$ ExceptT
  (Either GQExecError QErr)
  (ExceptT () m)
  (DiffTime, RootFieldMap EncJSON)
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (DiffTime, RootFieldMap EncJSON)
forall a.
ExceptT (Either GQExecError QErr) (ExceptT () m) a
-> ExceptT (Either GQExecError QErr) (ExceptT () m) a
runLimits
              (ExceptT
   (Either GQExecError QErr)
   (ExceptT () m)
   (DiffTime, RootFieldMap EncJSON)
 -> ExceptT
      (Either GQExecError QErr)
      (ExceptT () m)
      (DiffTime, RootFieldMap EncJSON))
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (DiffTime, RootFieldMap EncJSON)
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (DiffTime, RootFieldMap EncJSON)
forall a b. (a -> b) -> a -> b
$ ExceptT QErr (ExceptT () m) (DiffTime, RootFieldMap EncJSON)
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (DiffTime, RootFieldMap EncJSON)
forall (n :: * -> *) a.
Monad n =>
ExceptT QErr n a -> ExceptT (Either GQExecError QErr) n a
doQErr
              (ExceptT QErr (ExceptT () m) (DiffTime, RootFieldMap EncJSON)
 -> ExceptT
      (Either GQExecError QErr)
      (ExceptT () m)
      (DiffTime, RootFieldMap EncJSON))
-> ExceptT QErr (ExceptT () m) (DiffTime, RootFieldMap EncJSON)
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (DiffTime, RootFieldMap EncJSON)
forall a b. (a -> b) -> a -> b
$ RequestId
-> GQLReqUnparsed
-> UserInfo
-> Logger Hasura
-> SourceConfig ('Postgres 'Vanilla)
-> ResolvedConnectionTemplate ('Postgres 'Vanilla)
-> InsOrdHashMap RootFieldAlias (DBStepInfo ('Postgres 'Vanilla))
-> ExceptT QErr (ExceptT () m) (DiffTime, RootFieldMap EncJSON)
forall (pgKind :: PostgresKind) (m :: * -> *).
(HasTag ('Postgres pgKind), MonadIO m, MonadBaseControl IO m,
 MonadError QErr m, MonadQueryLog m, MonadTrace m) =>
RequestId
-> GQLReqUnparsed
-> UserInfo
-> Logger Hasura
-> SourceConfig ('Postgres pgKind)
-> ResolvedConnectionTemplate ('Postgres pgKind)
-> RootFieldMap (DBStepInfo ('Postgres pgKind))
-> m (DiffTime, RootFieldMap EncJSON)
runPGMutationTransaction RequestId
requestId GQLReqUnparsed
q UserInfo
userInfo Logger Hasura
logger SourceConfig ('Postgres 'Vanilla)
sourceConfig ResolvedConnectionTemplate ('Postgres 'Vanilla)
resolvedConnectionTemplate InsOrdHashMap RootFieldAlias (DBStepInfo ('Postgres 'Vanilla))
pgMutations
          -- we do not construct result fragments since we have only one result
          RequestId
-> OperationType
-> Either
     (Either GQExecError QErr) (DiffTime, RootFieldMap EncJSON)
-> ((DiffTime, RootFieldMap EncJSON) -> ExceptT () m ())
-> ExceptT () m ()
forall a.
RequestId
-> OperationType
-> Either (Either GQExecError QErr) a
-> (a -> ExceptT () m ())
-> ExceptT () m ()
handleResult RequestId
requestId OperationType
gqlOpType Either (Either GQExecError QErr) (DiffTime, RootFieldMap EncJSON)
resp \(DiffTime
telemTimeIO_DT, RootFieldMap EncJSON
results) -> do
            let telemQueryType :: QueryType
telemQueryType = QueryType
Telem.Query
                telemLocality :: Locality
telemLocality = Locality
Telem.Local
                telemTimeIO :: Seconds
telemTimeIO = DiffTime -> Seconds
forall x y. (Duration x, Duration y) => x -> y
convertDuration DiffTime
telemTimeIO_DT
            DiffTime
totalTime <- ExceptT () m DiffTime
timerTot
            let telemTimeTot :: Seconds
telemTimeTot = DiffTime -> Seconds
Seconds DiffTime
totalTime
            EncJSON
-> Maybe OperationName
-> ParameterizedQueryHash
-> SubscriptionMetadata
-> ExceptT () m ()
sendSuccResp (RootFieldMap EncJSON -> EncJSON
encodeEncJSONResults RootFieldMap EncJSON
results) Maybe OperationName
opName ParameterizedQueryHash
parameterizedQueryHash
              (SubscriptionMetadata -> ExceptT () m ())
-> SubscriptionMetadata -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ DiffTime -> SubscriptionMetadata
ES.SubscriptionMetadata DiffTime
telemTimeIO_DT
            -- Telemetry. NOTE: don't time network IO:
            RequestDimensions -> RequestTimings -> ExceptT () m ()
forall (m :: * -> *).
MonadIO m =>
RequestDimensions -> RequestTimings -> m ()
Telem.recordTimingMetric Telem.RequestDimensions {Transport
Locality
QueryType
telemQueryType :: QueryType
telemLocality :: Locality
telemTransport :: Transport
$sel:telemQueryType:RequestDimensions :: QueryType
$sel:telemLocality:RequestDimensions :: Locality
$sel:telemTransport:RequestDimensions :: Transport
..} Telem.RequestTimings {Seconds
telemTimeIO :: Seconds
telemTimeTot :: Seconds
$sel:telemTimeIO:RequestTimings :: Seconds
$sel:telemTimeTot:RequestTimings :: Seconds
..}
            IO () -> ExceptT () m ()
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT () m ()) -> IO () -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ DiffTime -> OperationType -> IO ()
recordGQLQuerySuccess DiffTime
totalTime OperationType
gqlOpType

        -- we are not in the transaction case; proceeding normally
        Maybe
  (SourceConfig ('Postgres 'Vanilla),
   ResolvedConnectionTemplate ('Postgres 'Vanilla),
   InsOrdHashMap RootFieldAlias (DBStepInfo ('Postgres 'Vanilla)))
Nothing -> do
          Either
  (Either GQExecError QErr)
  (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
conclusion <- ExceptT
  (Either GQExecError QErr)
  (ExceptT () m)
  (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
-> ExceptT
     ()
     m
     (Either
        (Either GQExecError QErr)
        (InsOrdHashMap RootFieldAlias AnnotatedResponsePart))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT
            (ExceptT
   (Either GQExecError QErr)
   (ExceptT () m)
   (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
 -> ExceptT
      ()
      m
      (Either
         (Either GQExecError QErr)
         (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)))
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
-> ExceptT
     ()
     m
     (Either
        (Either GQExecError QErr)
        (InsOrdHashMap RootFieldAlias AnnotatedResponsePart))
forall a b. (a -> b) -> a -> b
$ ExceptT
  (Either GQExecError QErr)
  (ExceptT () m)
  (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
forall a.
ExceptT (Either GQExecError QErr) (ExceptT () m) a
-> ExceptT (Either GQExecError QErr) (ExceptT () m) a
runLimits
            (ExceptT
   (Either GQExecError QErr)
   (ExceptT () m)
   (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
 -> ExceptT
      (Either GQExecError QErr)
      (ExceptT () m)
      (InsOrdHashMap RootFieldAlias AnnotatedResponsePart))
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
forall a b. (a -> b) -> a -> b
$ ExecutionPlan
-> (RootFieldAlias
    -> ExecutionStep
    -> ExceptT
         (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart)
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
forall {k} {a} {b}.
InsOrdHashMap k a
-> (k -> a -> ExceptT (Either GQExecError QErr) (ExceptT () m) b)
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) (InsOrdHashMap k b)
forWithKey ExecutionPlan
mutationPlan
            ((RootFieldAlias
  -> ExecutionStep
  -> ExceptT
       (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart)
 -> ExceptT
      (Either GQExecError QErr)
      (ExceptT () m)
      (InsOrdHashMap RootFieldAlias AnnotatedResponsePart))
-> (RootFieldAlias
    -> ExecutionStep
    -> ExceptT
         (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart)
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
forall a b. (a -> b) -> a -> b
$ \RootFieldAlias
fieldName ->
              let getResponse :: ExecutionStep
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
getResponse = \case
                    -- Ignoring response headers since we can't send them over WebSocket
                    E.ExecStepDB [Header]
_responseHeaders AnyBackend DBStepInfo
exists Maybe RemoteJoins
remoteJoins -> ExceptT QErr (ExceptT () m) AnnotatedResponsePart
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
forall (n :: * -> *) a.
Monad n =>
ExceptT QErr n a -> ExceptT (Either GQExecError QErr) n a
doQErr (ExceptT QErr (ExceptT () m) AnnotatedResponsePart
 -> ExceptT
      (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart)
-> ExceptT QErr (ExceptT () m) AnnotatedResponsePart
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
forall a b. (a -> b) -> a -> b
$ do
                      (DiffTime
telemTimeIO_DT, EncJSON
resp) <-
                        forall (c :: BackendType -> Constraint) (i :: BackendType -> *) r.
AllBackendsSatisfy c =>
AnyBackend i -> (forall (b :: BackendType). c b => i b -> r) -> r
AB.dispatchAnyBackend @BackendTransport
                          AnyBackend DBStepInfo
exists
                          \(EB.DBStepInfo SourceName
_ SourceConfig b
sourceConfig Maybe (PreparedQuery b)
genSql OnBaseMonad (ExecutionMonad b) (ActionResult b)
tx ResolvedConnectionTemplate b
resolvedConnectionTemplate :: EB.DBStepInfo b) ->
                            forall (b :: BackendType) (m :: * -> *).
(BackendTransport b, MonadIO m, MonadBaseControl IO m,
 MonadError QErr m, MonadQueryLog m, MonadTrace m) =>
RequestId
-> GQLReqUnparsed
-> RootFieldAlias
-> UserInfo
-> Logger Hasura
-> Maybe (CredentialCache AgentLicenseKey)
-> SourceConfig b
-> OnBaseMonad (ExecutionMonad b) EncJSON
-> Maybe (PreparedQuery b)
-> ResolvedConnectionTemplate b
-> m (DiffTime, EncJSON)
runDBMutation @b
                              RequestId
requestId
                              GQLReqUnparsed
q
                              RootFieldAlias
fieldName
                              UserInfo
userInfo
                              Logger Hasura
logger
                              Maybe (CredentialCache AgentLicenseKey)
agentLicenseKey
                              SourceConfig b
sourceConfig
                              ((ActionResult b -> EncJSON)
-> OnBaseMonad (ExecutionMonad b) (ActionResult b)
-> OnBaseMonad (ExecutionMonad b) EncJSON
forall a b.
(a -> b)
-> OnBaseMonad (ExecutionMonad b) a
-> OnBaseMonad (ExecutionMonad b) b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ActionResult b -> EncJSON
forall (b :: BackendType). ActionResult b -> EncJSON
EB.arResult OnBaseMonad (ExecutionMonad b) (ActionResult b)
tx)
                              Maybe (PreparedQuery b)
genSql
                              ResolvedConnectionTemplate b
resolvedConnectionTemplate
                      EncJSON
finalResponse <-
                        RequestId
-> Logger Hasura
-> Maybe (CredentialCache AgentLicenseKey)
-> Environment
-> [Header]
-> UserInfo
-> EncJSON
-> Maybe RemoteJoins
-> GQLReqUnparsed
-> ExceptT QErr (ExceptT () m) EncJSON
forall (m :: * -> *).
(MonadError QErr m, MonadIO m, MonadBaseControl IO m,
 MonadQueryTags m, MonadQueryLog m, MonadExecutionLog m,
 MonadTrace m, ProvidesNetwork m) =>
RequestId
-> Logger Hasura
-> Maybe (CredentialCache AgentLicenseKey)
-> Environment
-> [Header]
-> UserInfo
-> EncJSON
-> Maybe RemoteJoins
-> GQLReqUnparsed
-> m EncJSON
RJ.processRemoteJoins RequestId
requestId Logger Hasura
logger Maybe (CredentialCache AgentLicenseKey)
agentLicenseKey Environment
env [Header]
reqHdrs UserInfo
userInfo EncJSON
resp Maybe RemoteJoins
remoteJoins GQLReqUnparsed
q
                      AnnotatedResponsePart
-> ExceptT QErr (ExceptT () m) AnnotatedResponsePart
forall a. a -> ExceptT QErr (ExceptT () m) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (AnnotatedResponsePart
 -> ExceptT QErr (ExceptT () m) AnnotatedResponsePart)
-> AnnotatedResponsePart
-> ExceptT QErr (ExceptT () m) AnnotatedResponsePart
forall a b. (a -> b) -> a -> b
$ DiffTime
-> Locality -> EncJSON -> [Header] -> AnnotatedResponsePart
AnnotatedResponsePart DiffTime
telemTimeIO_DT Locality
Telem.Local EncJSON
finalResponse []
                    E.ExecStepAction ActionExecutionPlan
actionExecPlan ActionsInfo
_ Maybe RemoteJoins
remoteJoins -> do
                      Logger Hasura
-> QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ()
forall (m :: * -> *).
MonadQueryLog m =>
Logger Hasura -> QueryLog -> m ()
logQueryLog Logger Hasura
logger (QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ())
-> QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ()
forall a b. (a -> b) -> a -> b
$ GQLReqUnparsed
-> Maybe (RootFieldAlias, GeneratedQuery)
-> RequestId
-> QueryLogKind
-> QueryLog
QueryLog GQLReqUnparsed
q Maybe (RootFieldAlias, GeneratedQuery)
forall a. Maybe a
Nothing RequestId
requestId QueryLogKind
QueryLogKindAction
                      (DiffTime
time, (EncJSON
resp, Maybe [Header]
hdrs)) <- ExceptT QErr (ExceptT () m) (DiffTime, (EncJSON, Maybe [Header]))
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (DiffTime, (EncJSON, Maybe [Header]))
forall (n :: * -> *) a.
Monad n =>
ExceptT QErr n a -> ExceptT (Either GQExecError QErr) n a
doQErr (ExceptT QErr (ExceptT () m) (DiffTime, (EncJSON, Maybe [Header]))
 -> ExceptT
      (Either GQExecError QErr)
      (ExceptT () m)
      (DiffTime, (EncJSON, Maybe [Header])))
-> ExceptT
     QErr (ExceptT () m) (DiffTime, (EncJSON, Maybe [Header]))
-> ExceptT
     (Either GQExecError QErr)
     (ExceptT () m)
     (DiffTime, (EncJSON, Maybe [Header]))
forall a b. (a -> b) -> a -> b
$ do
                        (DiffTime
time, (EncJSON
resp, Maybe [Header]
hdrs)) <- UserInfo
-> ActionExecutionPlan
-> ExceptT
     QErr (ExceptT () m) (DiffTime, (EncJSON, Maybe [Header]))
forall (m :: * -> *).
(MonadIO m, MonadBaseControl IO m, MonadError QErr m, MonadTrace m,
 MonadMetadataStorage m) =>
UserInfo
-> ActionExecutionPlan -> m (DiffTime, (EncJSON, Maybe [Header]))
EA.runActionExecution UserInfo
userInfo ActionExecutionPlan
actionExecPlan
                        EncJSON
finalResponse <-
                          RequestId
-> Logger Hasura
-> Maybe (CredentialCache AgentLicenseKey)
-> Environment
-> [Header]
-> UserInfo
-> EncJSON
-> Maybe RemoteJoins
-> GQLReqUnparsed
-> ExceptT QErr (ExceptT () m) EncJSON
forall (m :: * -> *).
(MonadError QErr m, MonadIO m, MonadBaseControl IO m,
 MonadQueryTags m, MonadQueryLog m, MonadExecutionLog m,
 MonadTrace m, ProvidesNetwork m) =>
RequestId
-> Logger Hasura
-> Maybe (CredentialCache AgentLicenseKey)
-> Environment
-> [Header]
-> UserInfo
-> EncJSON
-> Maybe RemoteJoins
-> GQLReqUnparsed
-> m EncJSON
RJ.processRemoteJoins RequestId
requestId Logger Hasura
logger Maybe (CredentialCache AgentLicenseKey)
agentLicenseKey Environment
env [Header]
reqHdrs UserInfo
userInfo EncJSON
resp Maybe RemoteJoins
remoteJoins GQLReqUnparsed
q
                        (DiffTime, (EncJSON, Maybe [Header]))
-> ExceptT
     QErr (ExceptT () m) (DiffTime, (EncJSON, Maybe [Header]))
forall a. a -> ExceptT QErr (ExceptT () m) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DiffTime
time, (EncJSON
finalResponse, Maybe [Header]
hdrs))
                      AnnotatedResponsePart
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
forall a. a -> ExceptT (Either GQExecError QErr) (ExceptT () m) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (AnnotatedResponsePart
 -> ExceptT
      (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart)
-> AnnotatedResponsePart
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
forall a b. (a -> b) -> a -> b
$ DiffTime
-> Locality -> EncJSON -> [Header] -> AnnotatedResponsePart
AnnotatedResponsePart DiffTime
time Locality
Telem.Empty EncJSON
resp ([Header] -> AnnotatedResponsePart)
-> [Header] -> AnnotatedResponsePart
forall a b. (a -> b) -> a -> b
$ [Header] -> Maybe [Header] -> [Header]
forall a. a -> Maybe a -> a
fromMaybe [] Maybe [Header]
hdrs
                    E.ExecStepRemote RemoteSchemaInfo
rsi ResultCustomizer
resultCustomizer GQLReqOutgoing
gqlReq Maybe RemoteJoins
remoteJoins -> do
                      Logger Hasura
-> QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ()
forall (m :: * -> *).
MonadQueryLog m =>
Logger Hasura -> QueryLog -> m ()
logQueryLog Logger Hasura
logger (QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ())
-> QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ()
forall a b. (a -> b) -> a -> b
$ GQLReqUnparsed
-> Maybe (RootFieldAlias, GeneratedQuery)
-> RequestId
-> QueryLogKind
-> QueryLog
QueryLog GQLReqUnparsed
q Maybe (RootFieldAlias, GeneratedQuery)
forall a. Maybe a
Nothing RequestId
requestId QueryLogKind
QueryLogKindRemoteSchema
                      RequestId
-> GQLReqUnparsed
-> RootFieldAlias
-> UserInfo
-> [Header]
-> RemoteSchemaInfo
-> ResultCustomizer
-> GQLReqOutgoing
-> Maybe RemoteJoins
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
runRemoteGQ RequestId
requestId GQLReqUnparsed
q RootFieldAlias
fieldName UserInfo
userInfo [Header]
reqHdrs RemoteSchemaInfo
rsi ResultCustomizer
resultCustomizer GQLReqOutgoing
gqlReq Maybe RemoteJoins
remoteJoins
                    E.ExecStepRaw Value
json -> do
                      Logger Hasura
-> QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ()
forall (m :: * -> *).
MonadQueryLog m =>
Logger Hasura -> QueryLog -> m ()
logQueryLog Logger Hasura
logger (QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ())
-> QueryLog -> ExceptT (Either GQExecError QErr) (ExceptT () m) ()
forall a b. (a -> b) -> a -> b
$ GQLReqUnparsed
-> Maybe (RootFieldAlias, GeneratedQuery)
-> RequestId
-> QueryLogKind
-> QueryLog
QueryLog GQLReqUnparsed
q Maybe (RootFieldAlias, GeneratedQuery)
forall a. Maybe a
Nothing RequestId
requestId QueryLogKind
QueryLogKindIntrospection
                      Value
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
forall (m :: * -> *).
Applicative m =>
Value -> m AnnotatedResponsePart
buildRaw Value
json
                    E.ExecStepMulti [ExecutionStep]
lst -> do
                      [AnnotatedResponsePart]
allResponses <- (ExecutionStep
 -> ExceptT
      (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart)
-> [ExecutionStep]
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) [AnnotatedResponsePart]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse ExecutionStep
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
getResponse [ExecutionStep]
lst
                      AnnotatedResponsePart
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
forall a. a -> ExceptT (Either GQExecError QErr) (ExceptT () m) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (AnnotatedResponsePart
 -> ExceptT
      (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart)
-> AnnotatedResponsePart
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
forall a b. (a -> b) -> a -> b
$ DiffTime
-> Locality -> EncJSON -> [Header] -> AnnotatedResponsePart
AnnotatedResponsePart DiffTime
0 Locality
Telem.Local ([EncJSON] -> EncJSON
encJFromList ((AnnotatedResponsePart -> EncJSON)
-> [AnnotatedResponsePart] -> [EncJSON]
forall a b. (a -> b) -> [a] -> [b]
map AnnotatedResponsePart -> EncJSON
arpResponse [AnnotatedResponsePart]
allResponses)) []
               in ExecutionStep
-> ExceptT
     (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
getResponse
          QueryType
-> ExceptT () m DiffTime
-> RequestId
-> Either
     (Either GQExecError QErr)
     (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
-> Maybe OperationName
-> ParameterizedQueryHash
-> OperationType
-> ExceptT () m ()
sendResultFromFragments QueryType
Telem.Query ExceptT () m DiffTime
timerTot RequestId
requestId Either
  (Either GQExecError QErr)
  (InsOrdHashMap RootFieldAlias AnnotatedResponsePart)
conclusion Maybe OperationName
opName ParameterizedQueryHash
parameterizedQueryHash OperationType
gqlOpType
      IO () -> ExceptT () m ()
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT () m ()) -> IO () -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ Maybe RequestId -> Maybe ParameterizedQueryHash -> IO ()
sendCompleted (RequestId -> Maybe RequestId
forall a. a -> Maybe a
Just RequestId
requestId) (ParameterizedQueryHash -> Maybe ParameterizedQueryHash
forall a. a -> Maybe a
Just ParameterizedQueryHash
parameterizedQueryHash)
    E.SubscriptionExecutionPlan (SubscriptionExecution
subExec, Maybe (Endo Value)
modifier) -> do
      case SubscriptionExecution
subExec of
        E.SEAsyncActionsWithNoRelationships RootFieldMap (ActionId, ActionLogResponse -> Either QErr EncJSON)
actions -> do
          Logger Hasura -> QueryLog -> ExceptT () m ()
forall (m :: * -> *).
MonadQueryLog m =>
Logger Hasura -> QueryLog -> m ()
logQueryLog Logger Hasura
logger (QueryLog -> ExceptT () m ()) -> QueryLog -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ GQLReqUnparsed
-> Maybe (RootFieldAlias, GeneratedQuery)
-> RequestId
-> QueryLogKind
-> QueryLog
QueryLog GQLReqUnparsed
q Maybe (RootFieldAlias, GeneratedQuery)
forall a. Maybe a
Nothing RequestId
requestId QueryLogKind
QueryLogKindAction
          IO () -> ExceptT () m ()
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO do
            let allActionIds :: [ActionId]
allActionIds = ((ActionId, ActionLogResponse -> Either QErr EncJSON) -> ActionId)
-> [(ActionId, ActionLogResponse -> Either QErr EncJSON)]
-> [ActionId]
forall a b. (a -> b) -> [a] -> [b]
map (ActionId, ActionLogResponse -> Either QErr EncJSON) -> ActionId
forall a b. (a, b) -> a
fst ([(ActionId, ActionLogResponse -> Either QErr EncJSON)]
 -> [ActionId])
-> [(ActionId, ActionLogResponse -> Either QErr EncJSON)]
-> [ActionId]
forall a b. (a -> b) -> a -> b
$ RootFieldMap (ActionId, ActionLogResponse -> Either QErr EncJSON)
-> [(ActionId, ActionLogResponse -> Either QErr EncJSON)]
forall a. InsOrdHashMap RootFieldAlias a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList RootFieldMap (ActionId, ActionLogResponse -> Either QErr EncJSON)
actions
            case [ActionId] -> Maybe (NonEmpty ActionId)
forall a. [a] -> Maybe (NonEmpty a)
NE.nonEmpty [ActionId]
allActionIds of
              Maybe (NonEmpty ActionId)
Nothing -> do
                -- This means there is no async action query field present and there is no live-query or streaming
                -- subscription present. Now, we need to check if the modifier is present or not. If it is present,
                -- then we need to send the modified empty object. If it is not present, then we need to send
                -- the completed message.
                case Maybe (Endo Value)
modifier of
                  Maybe (Endo Value)
Nothing -> Maybe RequestId -> Maybe ParameterizedQueryHash -> IO ()
sendCompleted (RequestId -> Maybe RequestId
forall a. a -> Maybe a
Just RequestId
requestId) (ParameterizedQueryHash -> Maybe ParameterizedQueryHash
forall a. a -> Maybe a
Just ParameterizedQueryHash
parameterizedQueryHash)
                  Just Endo Value
modifier' -> do
                    let serverMsg :: ServerMsg
serverMsg = DataMsg -> ServerMsg
sendDataMsg (DataMsg -> ServerMsg) -> DataMsg -> ServerMsg
forall a b. (a -> b) -> a -> b
$ OperationId -> GQResponse -> DataMsg
DataMsg OperationId
opId (GQResponse -> DataMsg) -> GQResponse -> DataMsg
forall a b. (a -> b) -> a -> b
$ ByteString -> GQResponse
forall a b. b -> Either a b
Right (ByteString -> GQResponse)
-> (Value -> ByteString) -> Value -> GQResponse
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EncJSON -> ByteString
encJToLBS (EncJSON -> ByteString)
-> (Value -> EncJSON) -> Value -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Value -> EncJSON
encJFromOrderedValue (Value -> GQResponse) -> Value -> GQResponse
forall a b. (a -> b) -> a -> b
$ Endo Value -> Value -> Value
forall a. Endo a -> a -> a
appEndo Endo Value
modifier' (Value -> Value) -> Value -> Value
forall a b. (a -> b) -> a -> b
$ Object -> Value
JO.Object (Object -> Value) -> Object -> Value
forall a b. (a -> b) -> a -> b
$ Object
JO.empty
                    WSConn -> ServerMsg -> IO ()
forall (m :: * -> *). MonadIO m => WSConn -> ServerMsg -> m ()
sendMsg WSConn
wsConn ServerMsg
serverMsg
              Just NonEmpty ActionId
actionIds -> do
                let sendResponseIO :: HashMap ActionId ActionLogResponse -> IO ()
sendResponseIO HashMap ActionId ActionLogResponse
actionLogMap = do
                      (DiffTime
dTime, Either QErr (RootFieldMap EncJSON)
resultsE) <- IO (Either QErr (RootFieldMap EncJSON))
-> IO (DiffTime, Either QErr (RootFieldMap EncJSON))
forall (m :: * -> *) a. MonadIO m => m a -> m (DiffTime, a)
withElapsedTime
                        (IO (Either QErr (RootFieldMap EncJSON))
 -> IO (DiffTime, Either QErr (RootFieldMap EncJSON)))
-> IO (Either QErr (RootFieldMap EncJSON))
-> IO (DiffTime, Either QErr (RootFieldMap EncJSON))
forall a b. (a -> b) -> a -> b
$ ExceptT QErr IO (RootFieldMap EncJSON)
-> IO (Either QErr (RootFieldMap EncJSON))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT
                        (ExceptT QErr IO (RootFieldMap EncJSON)
 -> IO (Either QErr (RootFieldMap EncJSON)))
-> ExceptT QErr IO (RootFieldMap EncJSON)
-> IO (Either QErr (RootFieldMap EncJSON))
forall a b. (a -> b) -> a -> b
$ RootFieldMap (ActionId, ActionLogResponse -> Either QErr EncJSON)
-> ((ActionId, ActionLogResponse -> Either QErr EncJSON)
    -> ExceptT QErr IO EncJSON)
-> ExceptT QErr IO (RootFieldMap EncJSON)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for RootFieldMap (ActionId, ActionLogResponse -> Either QErr EncJSON)
actions
                        (((ActionId, ActionLogResponse -> Either QErr EncJSON)
  -> ExceptT QErr IO EncJSON)
 -> ExceptT QErr IO (RootFieldMap EncJSON))
-> ((ActionId, ActionLogResponse -> Either QErr EncJSON)
    -> ExceptT QErr IO EncJSON)
-> ExceptT QErr IO (RootFieldMap EncJSON)
forall a b. (a -> b) -> a -> b
$ \(ActionId
actionId, ActionLogResponse -> Either QErr EncJSON
resultBuilder) -> do
                          ActionLogResponse
actionLogResponse <-
                            ActionId
-> HashMap ActionId ActionLogResponse -> Maybe ActionLogResponse
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup ActionId
actionId HashMap ActionId ActionLogResponse
actionLogMap
                              Maybe ActionLogResponse
-> ExceptT QErr IO ActionLogResponse
-> ExceptT QErr IO ActionLogResponse
forall (m :: * -> *) a. Applicative m => Maybe a -> m a -> m a
`onNothing` Text -> ExceptT QErr IO ActionLogResponse
forall (m :: * -> *) a. QErrM m => Text -> m a
throw500 Text
"unexpected: cannot lookup action_id in response map"
                          Either QErr EncJSON -> ExceptT QErr IO EncJSON
forall e (m :: * -> *) a. MonadError e m => Either e a -> m a
liftEither (Either QErr EncJSON -> ExceptT QErr IO EncJSON)
-> Either QErr EncJSON -> ExceptT QErr IO EncJSON
forall a b. (a -> b) -> a -> b
$ ActionLogResponse -> Either QErr EncJSON
resultBuilder ActionLogResponse
actionLogResponse
                      case Either QErr (RootFieldMap EncJSON)
resultsE of
                        Left QErr
err -> RequestId -> QErr -> IO ()
sendError RequestId
requestId QErr
err
                        Right RootFieldMap EncJSON
results -> do
                          let dataMsg :: ServerMsg
dataMsg =
                                DataMsg -> ServerMsg
sendDataMsg
                                  (DataMsg -> ServerMsg) -> DataMsg -> ServerMsg
forall a b. (a -> b) -> a -> b
$ OperationId -> GQResponse -> DataMsg
DataMsg OperationId
opId
                                  (GQResponse -> DataMsg) -> GQResponse -> DataMsg
forall a b. (a -> b) -> a -> b
$ ByteString -> GQResponse
forall a. a -> Either GQExecError a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
                                  (ByteString -> GQResponse) -> ByteString -> GQResponse
forall a b. (a -> b) -> a -> b
$ EncJSON -> ByteString
encJToLBS
                                  (EncJSON -> ByteString) -> EncJSON -> ByteString
forall a b. (a -> b) -> a -> b
$ RootFieldMap EncJSON -> EncJSON
encodeEncJSONResults RootFieldMap EncJSON
results
                          WSConn
-> ServerMsg
-> Maybe OperationName
-> Maybe ParameterizedQueryHash
-> SubscriptionMetadata
-> IO ()
forall (m :: * -> *).
MonadIO m =>
WSConn
-> ServerMsg
-> Maybe OperationName
-> Maybe ParameterizedQueryHash
-> SubscriptionMetadata
-> m ()
sendMsgWithMetadata WSConn
wsConn ServerMsg
dataMsg Maybe OperationName
opName (ParameterizedQueryHash -> Maybe ParameterizedQueryHash
forall a. a -> Maybe a
Just ParameterizedQueryHash
parameterizedQueryHash) (SubscriptionMetadata -> IO ()) -> SubscriptionMetadata -> IO ()
forall a b. (a -> b) -> a -> b
$ DiffTime -> SubscriptionMetadata
ES.SubscriptionMetadata DiffTime
dTime

                    asyncActionQueryLive :: LiveAsyncActionQuery
asyncActionQueryLive =
                      LiveAsyncActionQueryWithNoRelationships -> LiveAsyncActionQuery
ES.LAAQNoRelationships
                        (LiveAsyncActionQueryWithNoRelationships -> LiveAsyncActionQuery)
-> LiveAsyncActionQueryWithNoRelationships -> LiveAsyncActionQuery
forall a b. (a -> b) -> a -> b
$ (HashMap ActionId ActionLogResponse -> IO ())
-> IO () -> LiveAsyncActionQueryWithNoRelationships
ES.LiveAsyncActionQueryWithNoRelationships HashMap ActionId ActionLogResponse -> IO ()
sendResponseIO (Maybe RequestId -> Maybe ParameterizedQueryHash -> IO ()
sendCompleted (RequestId -> Maybe RequestId
forall a. a -> Maybe a
Just RequestId
requestId) (ParameterizedQueryHash -> Maybe ParameterizedQueryHash
forall a. a -> Maybe a
Just ParameterizedQueryHash
parameterizedQueryHash))

                AsyncActionSubscriptionState
-> OperationId
-> NonEmpty ActionId
-> (QErr -> IO ())
-> LiveAsyncActionQuery
-> IO ()
ES.addAsyncActionLiveQuery
                  (SubscriptionsState -> AsyncActionSubscriptionState
ES._ssAsyncActions SubscriptionsState
subscriptionsState)
                  OperationId
opId
                  NonEmpty ActionId
actionIds
                  (RequestId -> QErr -> IO ()
sendError RequestId
requestId)
                  LiveAsyncActionQuery
asyncActionQueryLive
        E.SEOnSourceDB (E.SSLivequery HashSet ActionId
actionIds HashMap ActionId ActionLogResponse
-> ExceptT QErr IO (SourceName, SubscriptionQueryPlan)
liveQueryBuilder) -> do
          Either QErr (HashMap ActionId ActionLogResponse)
actionLogMapE <- ((HashMap ActionId ActionLogResponse, Bool)
 -> HashMap ActionId ActionLogResponse)
-> Either QErr (HashMap ActionId ActionLogResponse, Bool)
-> Either QErr (HashMap ActionId ActionLogResponse)
forall a b. (a -> b) -> Either QErr a -> Either QErr b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (HashMap ActionId ActionLogResponse, Bool)
-> HashMap ActionId ActionLogResponse
forall a b. (a, b) -> a
fst (Either QErr (HashMap ActionId ActionLogResponse, Bool)
 -> Either QErr (HashMap ActionId ActionLogResponse))
-> ExceptT
     () m (Either QErr (HashMap ActionId ActionLogResponse, Bool))
-> ExceptT () m (Either QErr (HashMap ActionId ActionLogResponse))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ExceptT
  QErr (ExceptT () m) (HashMap ActionId ActionLogResponse, Bool)
-> ExceptT
     () m (Either QErr (HashMap ActionId ActionLogResponse, Bool))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (HashSet ActionId
-> ExceptT
     QErr (ExceptT () m) (HashMap ActionId ActionLogResponse, Bool)
forall (m :: * -> *) (t :: * -> *).
(MonadError QErr m, MonadMetadataStorage m, Foldable t) =>
t ActionId -> m (HashMap ActionId ActionLogResponse, Bool)
EA.fetchActionLogResponses HashSet ActionId
actionIds)
          HashMap ActionId ActionLogResponse
actionLogMap <- Either QErr (HashMap ActionId ActionLogResponse)
-> (QErr -> ExceptT () m (HashMap ActionId ActionLogResponse))
-> ExceptT () m (HashMap ActionId ActionLogResponse)
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft Either QErr (HashMap ActionId ActionLogResponse)
actionLogMapE (ExceptT () m ()
-> ExceptT () m (HashMap ActionId ActionLogResponse)
forall a. ExceptT () m () -> ExceptT () m a
withComplete (ExceptT () m ()
 -> ExceptT () m (HashMap ActionId ActionLogResponse))
-> (QErr -> ExceptT () m ())
-> QErr
-> ExceptT () m (HashMap ActionId ActionLogResponse)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RequestId -> Maybe OperationType -> QErr -> ExceptT () m ()
preExecErr RequestId
requestId (OperationType -> Maybe OperationType
forall a. a -> Maybe a
Just OperationType
gqlOpType))
          IO GranularPrometheusMetricsState
granularPrometheusMetricsState <- ExceptT () m (IO GranularPrometheusMetricsState)
forall (m :: * -> *).
MonadGetPolicies m =>
m (IO GranularPrometheusMetricsState)
runGetPrometheusMetricsGranularity
          Either QErr (SubscriberDetails CohortKey)
opMetadataE <- IO (Either QErr (SubscriberDetails CohortKey))
-> ExceptT () m (Either QErr (SubscriberDetails CohortKey))
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr (SubscriberDetails CohortKey))
 -> ExceptT () m (Either QErr (SubscriberDetails CohortKey)))
-> IO (Either QErr (SubscriberDetails CohortKey))
-> ExceptT () m (Either QErr (SubscriberDetails CohortKey))
forall a b. (a -> b) -> a -> b
$ Maybe OperationName
-> (HashMap ActionId ActionLogResponse
    -> ExceptT QErr IO (SourceName, SubscriptionQueryPlan))
-> ParameterizedQueryHash
-> RequestId
-> HashMap ActionId ActionLogResponse
-> IO GranularPrometheusMetricsState
-> Maybe (Endo Value)
-> IO (Either QErr (SubscriberDetails CohortKey))
startLiveQuery Maybe OperationName
opName HashMap ActionId ActionLogResponse
-> ExceptT QErr IO (SourceName, SubscriptionQueryPlan)
liveQueryBuilder ParameterizedQueryHash
parameterizedQueryHash RequestId
requestId HashMap ActionId ActionLogResponse
actionLogMap IO GranularPrometheusMetricsState
granularPrometheusMetricsState Maybe (Endo Value)
modifier
          SubscriberDetails CohortKey
lqId <- Either QErr (SubscriberDetails CohortKey)
-> (QErr -> ExceptT () m (SubscriberDetails CohortKey))
-> ExceptT () m (SubscriberDetails CohortKey)
forall (m :: * -> *) e a.
Applicative m =>
Either e a -> (e -> m a) -> m a
onLeft Either QErr (SubscriberDetails CohortKey)
opMetadataE (ExceptT () m () -> ExceptT () m (SubscriberDetails CohortKey)
forall a. ExceptT () m () -> ExceptT () m a
withComplete (ExceptT () m () -> ExceptT () m (SubscriberDetails CohortKey))
-> (QErr -> ExceptT () m ())
-> QErr
-> ExceptT () m (SubscriberDetails CohortKey)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RequestId -> Maybe OperationType -> QErr -> ExceptT () m ()
preExecErr RequestId
requestId (OperationType -> Maybe OperationType
forall a. a -> Maybe a
Just OperationType
gqlOpType))
          -- Update async action query subscription state
          case [ActionId] -> Maybe (NonEmpty ActionId)
forall a. [a] -> Maybe (NonEmpty a)
NE.nonEmpty (HashSet ActionId -> [ActionId]
forall a. HashSet a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList HashSet ActionId
actionIds) of
            Maybe (NonEmpty ActionId)
Nothing -> do
              Logger Hasura -> QueryLog -> ExceptT () m ()
forall (m :: * -> *).
MonadQueryLog m =>
Logger Hasura -> QueryLog -> m ()
logQueryLog Logger Hasura
logger (QueryLog -> ExceptT () m ()) -> QueryLog -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ GQLReqUnparsed
-> Maybe (RootFieldAlias, GeneratedQuery)
-> RequestId
-> QueryLogKind
-> QueryLog
QueryLog GQLReqUnparsed
q Maybe (RootFieldAlias, GeneratedQuery)
forall a. Maybe a
Nothing RequestId
requestId (Maybe BackendResolvedConnectionTemplate -> QueryLogKind
QueryLogKindDatabase Maybe BackendResolvedConnectionTemplate
forall a. Maybe a
Nothing)
              -- No async action query fields present, do nothing.
              () -> ExceptT () m ()
forall a. a -> ExceptT () m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
            Just NonEmpty ActionId
nonEmptyActionIds -> do
              Logger Hasura -> QueryLog -> ExceptT () m ()
forall (m :: * -> *).
MonadQueryLog m =>
Logger Hasura -> QueryLog -> m ()
logQueryLog Logger Hasura
logger (QueryLog -> ExceptT () m ()) -> QueryLog -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ GQLReqUnparsed
-> Maybe (RootFieldAlias, GeneratedQuery)
-> RequestId
-> QueryLogKind
-> QueryLog
QueryLog GQLReqUnparsed
q Maybe (RootFieldAlias, GeneratedQuery)
forall a. Maybe a
Nothing RequestId
requestId QueryLogKind
QueryLogKindAction
              IO () -> ExceptT () m ()
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT () m ()) -> IO () -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ do
                let asyncActionQueryLive :: LiveAsyncActionQuery
asyncActionQueryLive =
                      LiveAsyncActionQueryOnSource -> LiveAsyncActionQuery
ES.LAAQOnSourceDB
                        (LiveAsyncActionQueryOnSource -> LiveAsyncActionQuery)
-> LiveAsyncActionQueryOnSource -> LiveAsyncActionQuery
forall a b. (a -> b) -> a -> b
$ SubscriberDetails CohortKey
-> HashMap ActionId ActionLogResponse
-> (SubscriberDetails CohortKey
    -> HashMap ActionId ActionLogResponse
    -> IO (Maybe (SubscriberDetails CohortKey)))
-> LiveAsyncActionQueryOnSource
ES.LiveAsyncActionQueryOnSource SubscriberDetails CohortKey
lqId HashMap ActionId ActionLogResponse
actionLogMap
                        ((SubscriberDetails CohortKey
  -> HashMap ActionId ActionLogResponse
  -> IO (Maybe (SubscriberDetails CohortKey)))
 -> LiveAsyncActionQueryOnSource)
-> (SubscriberDetails CohortKey
    -> HashMap ActionId ActionLogResponse
    -> IO (Maybe (SubscriberDetails CohortKey)))
-> LiveAsyncActionQueryOnSource
forall a b. (a -> b) -> a -> b
$ Maybe OperationName
-> ParameterizedQueryHash
-> RequestId
-> (HashMap ActionId ActionLogResponse
    -> ExceptT QErr IO (SourceName, SubscriptionQueryPlan))
-> IO GranularPrometheusMetricsState
-> Maybe OperationName
-> Maybe (Endo Value)
-> SubscriberDetails CohortKey
-> HashMap ActionId ActionLogResponse
-> IO (Maybe (SubscriberDetails CohortKey))
restartLiveQuery Maybe OperationName
opName ParameterizedQueryHash
parameterizedQueryHash RequestId
requestId HashMap ActionId ActionLogResponse
-> ExceptT QErr IO (SourceName, SubscriptionQueryPlan)
liveQueryBuilder IO GranularPrometheusMetricsState
granularPrometheusMetricsState (GQLReqParsed -> Maybe OperationName
forall a. GQLReq a -> Maybe OperationName
_grOperationName GQLReqParsed
reqParsed) Maybe (Endo Value)
modifier

                    onUnexpectedException :: QErr -> IO ()
onUnexpectedException QErr
err = do
                      RequestId -> QErr -> IO ()
sendError RequestId
requestId QErr
err
                      WSServerEnv impl
-> WSConn
-> OperationId
-> IO GranularPrometheusMetricsState
-> IO ()
-> IO ()
forall impl.
WSServerEnv impl
-> WSConn
-> OperationId
-> IO GranularPrometheusMetricsState
-> IO ()
-> IO ()
stopOperation WSServerEnv impl
serverEnv WSConn
wsConn OperationId
opId IO GranularPrometheusMetricsState
granularPrometheusMetricsState (() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) -- Don't log in case opId don't exist
                AsyncActionSubscriptionState
-> OperationId
-> NonEmpty ActionId
-> (QErr -> IO ())
-> LiveAsyncActionQuery
-> IO ()
ES.addAsyncActionLiveQuery
                  (SubscriptionsState -> AsyncActionSubscriptionState
ES._ssAsyncActions SubscriptionsState
subscriptionsState)
                  OperationId
opId
                  NonEmpty ActionId
nonEmptyActionIds
                  QErr -> IO ()
onUnexpectedException
                  LiveAsyncActionQuery
asyncActionQueryLive
        E.SEOnSourceDB (E.SSStreaming RootFieldAlias
rootFieldName (SourceName, SubscriptionQueryPlan)
streamQueryBuilder) -> do
          IO GranularPrometheusMetricsState
granularPrometheusMetricsState <- ExceptT () m (IO GranularPrometheusMetricsState)
forall (m :: * -> *).
MonadGetPolicies m =>
m (IO GranularPrometheusMetricsState)
runGetPrometheusMetricsGranularity
          IO () -> ExceptT () m ()
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT () m ()) -> IO () -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ RootFieldAlias
-> (SourceName, SubscriptionQueryPlan)
-> ParameterizedQueryHash
-> RequestId
-> IO GranularPrometheusMetricsState
-> Maybe (Endo Value)
-> IO ()
startStreamingQuery RootFieldAlias
rootFieldName (SourceName, SubscriptionQueryPlan)
streamQueryBuilder ParameterizedQueryHash
parameterizedQueryHash RequestId
requestId IO GranularPrometheusMetricsState
granularPrometheusMetricsState Maybe (Endo Value)
modifier

      IO () -> ExceptT () m ()
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT () m ()) -> IO () -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ Counter -> IO ()
Prometheus.Counter.inc (GraphQLRequestMetrics -> Counter
gqlRequestsSubscriptionSuccess GraphQLRequestMetrics
gqlMetrics)
      IO () -> ExceptT () m ()
forall a. IO a -> ExceptT () m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT () m ()) -> IO () -> ExceptT () m ()
forall a b. (a -> b) -> a -> b
$ OpDetail
-> Maybe RequestId -> Maybe ParameterizedQueryHash -> IO ()
forall (n :: * -> *).
MonadIO n =>
OpDetail -> Maybe RequestId -> Maybe ParameterizedQueryHash -> n ()
logOpEv OpDetail
ODStarted (RequestId -> Maybe RequestId
forall a. a -> Maybe a
Just RequestId
requestId) (ParameterizedQueryHash -> Maybe ParameterizedQueryHash
forall a. a -> Maybe a
Just ParameterizedQueryHash
parameterizedQueryHash)
  where
    -- | Converts a 'DataMsg' into a 'ServerMsg' by projecting the
    -- 'WS._wsaGetDataMessageType' field out of 'onMessageActions'
    -- (inferred type: @DataMsg -> ServerMsg@).
    sendDataMsg = WS._wsaGetDataMessageType onMessageActions
    -- | The connection-close action carried by 'onMessageActions', i.e. its
    -- 'WS._wsaConnectionCloseAction' field
    -- (inferred type: @WSCloseConnAction WSConnData@).
    closeConnAction = WS._wsaConnectionCloseAction onMessageActions
    -- | The post-execution error-message action carried by 'onMessageActions',
    -- i.e. its 'WS._wsaPostExecErrMessageAction' field
    -- (inferred type: @WSPostExecErrMessageAction WSConnData@).
    postExecErrAction = WS._wsaPostExecErrMessageAction onMessageActions
    -- | The error-message formatter carried by 'onMessageActions', i.e. its
    -- 'WS._wsaErrorMsgFormat' field; it combines a list of JSON encodings into
    -- a single encoding (inferred type: @[Encoding] -> Encoding@).
    fmtErrorMessage = WS._wsaErrorMsgFormat onMessageActions

    -- | Widens the error channel of a computation that can only fail with a
    -- 'QErr' so that it fits the combined @Either GQExecError QErr@ error type:
    -- every 'QErr' is re-tagged as the 'Right' alternative. Successful results
    -- are left untouched.
    doQErr ::
      (Monad n) =>
      ExceptT QErr n a ->
      ExceptT (Either GQExecError QErr) n a
    doQErr = withExceptT Right

    -- | Runs @action@ underneath a temporary inner error layer.
    --
    -- The outer @ExceptT f n@ action is lifted into @ExceptT e (ExceptT f n)@,
    -- handed to the transformation @f@ (which may therefore raise errors of
    -- type @e@), and the inner layer is then run again. If it finished with a
    -- @Left e@, the error is converted with @embed@ and rethrown in the outer
    -- @ExceptT f n@ layer; otherwise the result is returned as is.
    withErr ::
      forall e f n a.
      (Monad n) =>
      (e -> f) ->
      (ExceptT e (ExceptT f n) a -> ExceptT e (ExceptT f n) a) ->
      ExceptT f n a ->
      ExceptT f n a
    withErr embed f action = do
      -- Peel the inner error layer off again after @f@ has wrapped the action.
      res <- runExceptT $ f $ lift action
      -- Translate an inner failure into an outer one.
      onLeft res (throwError . embed)

    -- | 'InsOrdHashMap.traverseWithKey' with its arguments flipped: the map
    -- comes first and the per-entry effectful function (which receives both
    -- key and value) comes second, in the style of 'for'.
    --
    -- NOTE(review): the inferred type at this binding is specialised to
    -- @ExceptT (Either GQExecError QErr) (ExceptT () m)@ as the applicative.
    forWithKey = flip InsOrdHashMap.traverseWithKey

    -- | The telemetry transport tag used by this module's handlers: always
    -- 'Telem.WebSocket' (inferred type: @Transport@).
    telemTransport = Telem.WebSocket

    -- | Dispatches on the outcome of an execution step.
    --
    -- * @Left (Left err)@: a 'GQExecError'; reported through @postExecErr'@
    --   together with the operation type.
    -- * @Left (Right err)@: a 'QErr'; reported through @postExecErr@ together
    --   with the request id and the operation type.
    -- * @Right results@: success; the results are passed to the continuation
    --   @f@.
    handleResult ::
      forall a.
      RequestId ->
      G.OperationType ->
      Either (Either GQExecError QErr) a ->
      (a -> ExceptT () m ()) ->
      ExceptT () m ()
    handleResult requestId gqlOpType r f = case r of
      Left (Left err) -> postExecErr' gqlOpType err
      Left (Right err) -> postExecErr requestId gqlOpType err
      Right results -> f results

    -- Send the combined response for a set of per-root-field results and
    -- record telemetry for it.
    --
    -- Arguments, in order: the telemetry query type, an action yielding the
    -- total elapsed time, the request id, the execution outcome (errors are
    -- routed through 'handleResult'), the optional operation name, the
    -- parameterized query hash and the GraphQL operation type.
    --
    -- NOTE: the local names @telemLocality@, @telemTimeIO@, @telemTimeTot@ and
    -- the argument @telemQueryType@ (plus the sibling @telemTransport@) are
    -- consumed by the record wildcards below; renaming any of them breaks the
    -- construction of 'Telem.RequestDimensions' / 'Telem.RequestTimings'.
    sendResultFromFragments telemQueryType timerTot requestId r opName pqh gqlOpType =
      handleResult requestId gqlOpType r \results -> do
        let telemLocality = foldMap arpLocality results
            telemTimeIO = convertDuration $ sum $ fmap arpTimeIO results
        -- The total timer is sampled before the response is written out.
        totalTime <- timerTot
        let telemTimeTot = Seconds totalTime
        sendSuccResp (encodeAnnotatedResponseParts results) opName pqh
          $ ES.SubscriptionMetadata
          $ sum
          $ fmap arpTimeIO results
        -- Telemetry. NOTE: don't time network IO:
        Telem.recordTimingMetric Telem.RequestDimensions {..} Telem.RequestTimings {..}
        liftIO $ recordGQLQuerySuccess totalTime gqlOpType

    -- Execute a single root field against a remote schema, inside a tracing
    -- span named after the field.
    --
    -- Steps:
    --   1. read the current environment from the app context;
    --   2. run the outgoing request with 'E.execRemoteGQ';
    --   3. extract this root field's value from the raw response;
    --   4. run remote joins over that value;
    --   5. wrap the result as an 'AnnotatedResponsePart' tagged 'Telem.Remote',
    --      carrying the IO time reported by step 2 and no response headers.
    --
    -- 'QErr' failures from steps 2 and 4 are lifted into the @Right@ side of
    -- the error type via 'doQErr'.
    runRemoteGQ ::
      RequestId ->
      GQLReqUnparsed ->
      RootFieldAlias ->
      UserInfo ->
      [HTTP.Header] ->
      RemoteSchemaInfo ->
      ResultCustomizer ->
      GQLReqOutgoing ->
      Maybe RJ.RemoteJoins ->
      ExceptT (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
    runRemoteGQ requestId reqUnparsed fieldName userInfo reqHdrs rsi resultCustomizer gqlReq remoteJoins = Tracing.newSpan ("Remote schema query for root field " <>> fieldName) $ do
      env <- liftIO $ acEnvironment <$> getAppContext appStateRef
      (telemTimeIO_DT, _respHdrs, resp) <-
        doQErr
          $ E.execRemoteGQ env userInfo reqHdrs (rsDef rsi) gqlReq
      -- 'extractFieldFromResponse' runs over the base monad; 'hoist lift'
      -- re-targets it onto the inner @ExceptT () m@ layer.
      value <- hoist lift $ extractFieldFromResponse fieldName resultCustomizer resp
      finalResponse <-
        doQErr
          $ RJ.processRemoteJoins
            requestId
            logger
            agentLicenseKey
            env
            reqHdrs
            userInfo
            -- TODO: avoid encode and decode here
            (encJFromOrderedValue value)
            remoteJoins
            reqUnparsed
      return $ AnnotatedResponsePart telemTimeIO_DT Telem.Remote finalResponse []

    -- Positional destructuring of the server environment. Only the fields
    -- this handler needs are named; the order must match the constructor.
    WSServerEnv
      logger
      subscriptionsState
      appStateRef
      _ -- Manager
      _ -- IO CorsPolicy
      readOnlyMode
      _ -- WSServer
      _keepAliveDelay
      _serverMetrics
      prometheusMetrics
      _ -- SamplingPolicy
      = serverEnv

    -- Hook to retrieve the latest subscription options from the `appStateRef`.
    -- Yields the pair (live query options, stream query options), read from
    -- the app context each time the action is run.
    getSubscriptionOptions = fmap (\appCtx -> (acLiveQueryOptions appCtx, acStreamQueryOptions appCtx)) (getAppContext appStateRef)
    -- GraphQL request metrics, projected out of the Prometheus metrics bundle.
    gqlMetrics = pmGraphQLRequestMetrics prometheusMetrics

    -- Positional destructuring of the per-connection data attached to
    -- `wsConn`: connection state variable, operation map, error response
    -- format and GraphQL query type.
    WSConnData userInfoR opMap errRespTy queryType = WS.getData wsConn

    -- Log an operation-level websocket event for the current operation id.
    --
    -- The request attached to the log entry is:
    --   * omitted entirely ('Nothing') unless 'L.ELTQueryLog' is among the
    --     enabled log types;
    --   * stripped of its variables when variable capture is disabled.
    logOpEv :: (MonadIO n) => OpDetail -> Maybe RequestId -> Maybe ParameterizedQueryHash -> n ()
    logOpEv opTy reqId parameterizedQueryHash =
      -- See Note [Disable query printing when query-log is disabled]
      let censoredReq =
            case shouldCaptureVariables of
              CaptureQueryVariables -> q
              DoNotCaptureQueryVariables -> q {_grVariables = Nothing}
          queryToLog = censoredReq <$ guard (Set.member L.ELTQueryLog enabledLogTypes)
       in logWSEvent logger wsConn
            $ EOperation
            $ OperationDetails opId reqId (_grOperationName q) opTy queryToLog parameterizedQueryHash

    -- Select the error encoder matching the connection's error response
    -- format: legacy uses 'encodeQErr', GraphQL-compliant uses 'encodeGQLErr'.
    getErrFn ERTLegacy = encodeQErr
    getErrFn ERTGraphqlCompliant = encodeGQLErr

    -- Reject a start request with the given message. In order: send an error
    -- message (a 400 'StartFailed' error encoded for this connection's error
    -- format), log the event as a protocol error, report a query error with no
    -- known operation type, and finally run the close-connection action with
    -- the same message.
    sendStartErr e = do
      let errFn = getErrFn errRespTy
      sendMsg wsConn
        $ SMErr
        $ ErrorMsg opId
        $ errFn False
        $ err400 StartFailed e
      liftIO $ logOpEv (ODProtoErr e) Nothing Nothing
      liftIO $ reportGQLQueryError Nothing
      liftIO $ closeConnAction wsConn opId (T.unpack e)

    -- Send the completion message for the current operation id, then log the
    -- completion together with the supplied request id and query hash.
    sendCompleted reqId paramQueryHash = do
      sendMsg wsConn (SMComplete . CompletionMsg $ opId)
      logOpEv ODCompleted reqId paramQueryHash

    -- Handle a 'QErr' raised during execution: log it against the request id,
    -- encode it for this connection's error format, and delegate to
    -- 'postExecErr'' as a single-element 'GQExecError'.
    postExecErr ::
      RequestId ->
      G.OperationType ->
      QErr ->
      ExceptT () m ()
    postExecErr reqId gqlOpType qErr = do
      let errFn = getErrFn errRespTy False
      liftIO $ logOpEv (ODQueryErr qErr) (Just reqId) Nothing
      postExecErr' gqlOpType $ GQExecError $ pure $ errFn qErr

    -- Handle an already-encoded execution error: report a query error for the
    -- given operation type, then run the connection's post-execution error
    -- action for the current operation id.
    postExecErr' :: G.OperationType -> GQExecError -> ExceptT () m ()
    postExecErr' gqlOpType qErr =
      liftIO $ do
        reportGQLQueryError (Just gqlOpType)
        postExecErrAction wsConn opId qErr

    -- Handle an error raised before execution started: report a query error
    -- (the operation type may not be known yet) and send the error to the
    -- client via 'sendError'.
    --
    -- why wouldn't pre exec error use graphql response?
    preExecErr reqId mGqlOpType qErr = do
      liftIO $ reportGQLQueryError mGqlOpType
      liftIO $ sendError reqId qErr

    -- Log a query error against the request id and send it to the client as
    -- an error message for the current operation id. In the GraphQL-compliant
    -- format the encoded error is additionally wrapped by 'fmtErrorMessage';
    -- in the legacy format it is sent as-is.
    sendError reqId qErr = do
      let errFn = getErrFn errRespTy
      logOpEv (ODQueryErr qErr) (Just reqId) Nothing
      let err = case errRespTy of
            ERTLegacy -> errFn False qErr
            ERTGraphqlCompliant -> fmtErrorMessage [errFn False qErr]
      sendMsg wsConn (SMErr $ ErrorMsg opId err)

    -- Send a successful response for the current operation id. The JSON is
    -- serialised to a lazy bytestring and wrapped as a successful response;
    -- the operation name and query hash travel alongside as message metadata.
    -- The trailing 'ES.SubscriptionMetadata' argument is passed straight
    -- through to 'sendMsgWithMetadata' (the definition is eta-reduced).
    sendSuccResp ::
      EncJSON ->
      Maybe OperationName ->
      ParameterizedQueryHash ->
      ES.SubscriptionMetadata ->
      ExceptT () m ()
    sendSuccResp encJson opName queryHash =
      sendMsgWithMetadata
        wsConn
        (sendDataMsg $ DataMsg opId $ pure $ encJToLBS encJson)
        opName
        (Just queryHash)

    -- Run an action, send the completion message (with no request id or query
    -- hash), and then abort the surrounding 'ExceptT' with @()@. Because it
    -- always ends in 'throwError', it never returns normally, which is why the
    -- result type is free.
    withComplete ::
      ExceptT () m () ->
      ExceptT () m a
    withComplete action = do
      action
      liftIO $ sendCompleted Nothing Nothing
      throwError ()

    -- Restart a live query: remove the existing subscriber (@lqId@) and start
    -- a fresh live query with the supplied builder and action log map.
    -- Returns the new subscriber details, or 'Nothing' when starting fails
    -- (the 'QErr' is discarded here).
    restartLiveQuery opName parameterizedQueryHash requestId liveQueryBuilder granularPrometheusMetricsState maybeOperationName modifier lqId actionLogMap = do
      ES.removeLiveQuery logger (_wseServerMetrics serverEnv) (_wsePrometheusMetrics serverEnv) subscriptionsState lqId granularPrometheusMetricsState maybeOperationName
      either (const Nothing) Just <$> startLiveQuery opName liveQueryBuilder parameterizedQueryHash requestId actionLogMap granularPrometheusMetricsState modifier

    -- Build and register a live query for the current operation.
    --
    -- The plan is obtained by running @liveQueryBuilder actionLogMap@. If that
    -- fails, the error is returned as 'Left' and nothing is registered.
    -- Otherwise the plan is handed to 'ES.addLiveQuery' (dispatched on the
    -- plan's backend), the resulting subscriber is recorded in @opMap@ under
    -- @opId@, and the subscriber details are returned as 'Right'.
    startLiveQuery opName liveQueryBuilder parameterizedQueryHash requestId actionLogMap granularPrometheusMetricsState modifier = do
      liveQueryE <- runExceptT $ liveQueryBuilder actionLogMap

      for liveQueryE $ \(sourceName, E.SubscriptionQueryPlan exists) -> do
        let subscriberMetadata = ES.mkSubscriberMetadata (WS.getWSId wsConn) opId opName requestId
        -- NOTE!: we mask async exceptions higher in the call stack, but it's
        -- crucial we don't lose lqId after addLiveQuery returns successfully.
        !lqId <-
          liftIO
            $ AB.dispatchAnyBackend @BackendTransport
              exists
              \(E.MultiplexedSubscriptionQueryPlan liveQueryPlan) ->
                ES.addLiveQuery
                  logger
                  (_wseServerMetrics serverEnv)
                  (_wsePrometheusMetrics serverEnv)
                  subscriberMetadata
                  subscriptionsState
                  getSubscriptionOptions
                  sourceName
                  parameterizedQueryHash
                  opName
                  requestId
                  liveQueryPlan
                  granularPrometheusMetricsState
                  (onChange opName parameterizedQueryHash $ ES._sqpNamespace liveQueryPlan)
                  modifier

        liftIO $ $assertNFHere (lqId, opName) -- so we don't write thunks to mutable vars
        STM.atomically
          $
          -- NOTE: see crucial `lookup` check above, ensuring this doesn't clobber:
          STMMap.insert (LiveQuerySubscriber lqId, opName) opId opMap
        pure lqId

    -- Register a streaming subscription for the current operation.
    --
    -- The plan is handed to 'ES.addStreamSubscriptionQuery' (dispatched on the
    -- plan's backend) with the root field's alias, and the resulting
    -- subscriber is recorded in @opMap@ under @opId@.
    startStreamingQuery rootFieldName (sourceName, E.SubscriptionQueryPlan exists) parameterizedQueryHash requestId granularPrometheusMetricsState modifier = do
      let !opName = _grOperationName q
          subscriberMetadata = ES.mkSubscriberMetadata (WS.getWSId wsConn) opId opName requestId
      -- NOTE!: we mask async exceptions higher in the call stack, but it's
      -- crucial we don't lose streamSubscriberId after
      -- addStreamSubscriptionQuery returns successfully.
      streamSubscriberId <-
        liftIO
          $ AB.dispatchAnyBackend @BackendTransport
            exists
            \(E.MultiplexedSubscriptionQueryPlan streamQueryPlan) ->
              ES.addStreamSubscriptionQuery
                logger
                (_wseServerMetrics serverEnv)
                (_wsePrometheusMetrics serverEnv)
                subscriberMetadata
                subscriptionsState
                getSubscriptionOptions
                sourceName
                parameterizedQueryHash
                opName
                requestId
                (_rfaAlias rootFieldName)
                streamQueryPlan
                granularPrometheusMetricsState
                (onChange opName parameterizedQueryHash $ ES._sqpNamespace streamQueryPlan)
                modifier
      liftIO $ $assertNFHere (streamSubscriberId, opName) -- so we don't write thunks to mutable vars
      STM.atomically
        $
        -- NOTE: see crucial `lookup` check above, ensuring this doesn't clobber:
        STMMap.insert (StreamingQuerySubscriber streamSubscriberId, opName) opId opMap
      pure ()

    -- Callback invoked when a subscription produces a new result; forwards it
    -- to the client over the websocket as a data message for @opId@.
    --
    -- A successful response is sent together with the operation name, the
    -- query hash and the subscription metadata built from @dTime@; its
    -- payload is wrapped in @namespace@ when one is given. Any other value is
    -- sent with plain 'sendMsg'.
    onChange :: Maybe OperationName -> ParameterizedQueryHash -> Maybe Name -> ES.OnChange
    onChange opName queryHash namespace = \case
      Right (ES.SubscriptionResponse bs dTime) ->
        sendMsgWithMetadata
          wsConn
          (sendDataMsg $ DataMsg opId $ pure $ maybe LBS.fromStrict wrapNamespace namespace bs)
          opName
          (Just queryHash)
          (ES.SubscriptionMetadata dTime)
      resp ->
        sendMsg wsConn
          $ sendDataMsg
          $ DataMsg opId
          $ LBS.fromStrict
          . ES._lqrPayload
          <$> resp

    -- If the source has a namespace then we need to wrap the response
    -- from the DB in that namespace: the result is a JSON object with a
    -- single key (the namespace name) whose value is the raw response @bs@.
    wrapNamespace :: Name -> ByteString -> LBS.ByteString
    wrapNamespace namespace bs =
      encJToLBS $ encJFromAssocList [(unName namespace, encJFromBS bs)]

    -- Run an 'ExceptT' action and discard its outcome, so that a
    -- @throwError ()@ inside it merely short-circuits the action.
    catchAndIgnore :: ExceptT () m () -> m ()
    catchAndIgnore m = void $ runExceptT m

    -- Increment the Prometheus failure counter that corresponds to the given
    -- operation type; 'Nothing' is counted as an unknown failure.
    reportGQLQueryError :: Maybe G.OperationType -> IO ()
    reportGQLQueryError mOpType =
      Prometheus.Counter.inc (failureCounter gqlMetrics)
      where
        -- Select the counter field for this kind of operation.
        failureCounter = case mOpType of
          Nothing -> gqlRequestsUnknownFailure
          Just G.OperationTypeQuery -> gqlRequestsQueryFailure
          Just G.OperationTypeMutation -> gqlRequestsMutationFailure
          Just G.OperationTypeSubscription -> gqlRequestsSubscriptionFailure

    -- Tally and record execution times for successful GraphQL requests.
    --
    -- Queries and mutations bump their success counter and observe
    -- @totalTime@ in the matching execution-time histogram.
    recordGQLQuerySuccess :: DiffTime -> G.OperationType -> IO ()
    recordGQLQuerySuccess totalTime = \case
      G.OperationTypeQuery -> do
        Prometheus.Counter.inc (gqlRequestsQuerySuccess gqlMetrics)
        Prometheus.Histogram.observe (gqlExecutionTimeSecondsQuery gqlMetrics) (realToFrac totalTime)
      G.OperationTypeMutation -> do
        Prometheus.Counter.inc (gqlRequestsMutationSuccess gqlMetrics)
        Prometheus.Histogram.observe (gqlExecutionTimeSecondsMutation gqlMetrics) (realToFrac totalTime)
      G.OperationTypeSubscription ->
        -- We do not collect metrics for subscriptions at the request level.
        pure ()

-- | Handle one raw message received from a websocket client.
--
-- The message is decoded as a 'ClientMsg' inside a new "websocket" trace.
-- A decoding failure is logged and reported to the client through the
-- connection's error action; otherwise the message is dispatched to the
-- handler for its constructor.
onMessage ::
  ( MonadIO m,
    UserAuthentication m,
    E.MonadGQLExecutionCheck m,
    MonadQueryLog m,
    MonadExecutionLog m,
    MonadExecuteQuery m,
    MC.MonadBaseControl IO m,
    MonadMetadataStorage m,
    MonadQueryTags m,
    HasResourceLimits m,
    ProvidesNetwork m,
    Tracing.MonadTrace m,
    MonadGetPolicies m
  ) =>
  HashSet (L.EngineLogType L.Hasura) ->
  IO AuthMode ->
  WSServerEnv impl ->
  WSConn ->
  LBS.ByteString ->
  WS.WSActions WSConnData ->
  Maybe (CredentialCache AgentLicenseKey) ->
  m ()
onMessage enabledLogTypes authMode serverEnv wsConn msgRaw onMessageActions agentLicenseKey =
  Tracing.newTrace (_wseTraceSamplingPolicy serverEnv) "websocket" do
    case J.eitherDecode msgRaw of
      Left e -> do
        let err = ConnErrMsg $ "parsing ClientMessage failed: " <> T.pack e
        logWSEvent logger wsConn $ EConnErr err
        liftIO $ onErrAction wsConn err WS.ClientMessageParseFailed
      Right msg -> case msg of
        -- common to both protocols
        CMConnInit params ->
          onConnInit
            logger
            (_wseHManager serverEnv)
            wsConn
            authMode
            params
            onErrAction
            keepAliveMessageAction
        CMStart startMsg -> do
          schemaCache <- liftIO $ getSchemaCache $ _wseAppStateRef serverEnv
          -- Whether query variables are captured is decided by the metrics
          -- configuration of the current schema cache.
          let shouldCaptureVariables =
                if _mcAnalyzeQueryVariables (scMetricsConfig schemaCache)
                  then CaptureQueryVariables
                  else DoNotCaptureQueryVariables
          onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables startMsg onMessageActions
        CMStop stopMsg -> do
          granularPrometheusMetricsState <- runGetPrometheusMetricsGranularity
          onStop serverEnv wsConn stopMsg granularPrometheusMetricsState
        -- specific to graphql-ws
        CMPing mPayload -> onPing wsConn mPayload
        CMPong _mPayload -> pure ()
        -- specific to apollo clients
        CMConnTerm -> liftIO $ WS.closeConn wsConn "GQL_CONNECTION_TERMINATE received"
  where
    logger = _wseLogger serverEnv
    onErrAction = WS._wsaOnErrorMessageAction onMessageActions
    keepAliveMessageAction = WS._wsaKeepAliveAction onMessageActions

-- | Answer a client ping by sending a pong that carries the same optional
-- payload.
onPing :: (MonadIO m) => WSConn -> Maybe PingPongPayload -> m ()
onPing wsConn mPayload =
  liftIO $ sendMsg wsConn (SMPong mPayload)

-- | Handle a client's stop message by delegating to 'stopOperation' for the
-- operation id it names. The action passed as the last argument writes a
-- debug-level log entry about a STOP for an unknown operation.
onStop :: (MonadIO m) => WSServerEnv impl -> WSConn -> StopMsg -> IO GranularPrometheusMetricsState -> m ()
onStop serverEnv wsConn (StopMsg opId) granularPrometheusMetricsState = liftIO $ do
  -- When a stop message is received for an operation, it may not be present in OpMap
  -- in these cases:
  -- 1. If the operation is a query/mutation - as we remove the operation from the
  -- OpMap as soon as it is executed
  -- 2. A misbehaving client
  -- 3. A bug on our end
  stopOperation serverEnv wsConn opId granularPrometheusMetricsState
    $ L.unLogger logger
    $ L.UnstructuredLog L.LevelDebug
    $ fromString
    $ "Received STOP for an operation that we have no record for: "
    <> show (unOperationId opId)
    <> " (could be a query/mutation operation or a misbehaving client or a bug)"
  where
    logger = _wseLogger serverEnv

-- | Stop the operation registered under the given 'OperationId' on this
-- connection.
--
-- If the connection's operation map has an entry for the id, an 'ODStopped'
-- operation event is logged and the corresponding live query or streaming
-- query subscriber is removed. If there is no entry, the supplied
-- @logWhenOpNotExist@ action is run instead. In both cases the id is deleted
-- from the operation map afterwards.
stopOperation ::
  WSServerEnv impl ->
  WSConn ->
  OperationId ->
  IO GranularPrometheusMetricsState ->
  -- | action to run when no operation is recorded for the given id
  IO () ->
  IO ()
stopOperation serverEnv wsConn opId granularPrometheusMetricsState logWhenOpNotExist = do
  registeredOp <- STM.atomically (STMMap.lookup opId connOperations)
  case registeredOp of
    Nothing -> logWhenOpNotExist
    Just (subscriber, opName) -> do
      logWSEvent logger wsConn (EOperation (stoppedDetails opName))
      unsubscribe subscriber opName
  -- NOTE: the lookup above and this delete are two separate transactions
  STM.atomically (STMMap.delete opId connOperations)
  where
    logger = _wseLogger serverEnv
    serverMetrics = _wseServerMetrics serverEnv
    prometheusMetrics = _wsePrometheusMetrics serverEnv
    subscriptionState = _wseSubscriptionState serverEnv
    connOperations = _wscOpMap (WS.getData wsConn)

    -- log payload describing a stopped operation; only the id and the
    -- operation name are filled in
    stoppedDetails opName =
      OperationDetails opId Nothing opName ODStopped Nothing Nothing

    -- remove the subscriber from the subscriptions state
    unsubscribe subscriber opName = case subscriber of
      LiveQuerySubscriber lqId ->
        ES.removeLiveQuery
          logger
          serverMetrics
          prometheusMetrics
          subscriptionState
          lqId
          granularPrometheusMetricsState
          opName
      StreamingQuerySubscriber streamSubscriberId ->
        ES.removeStreamingQuery
          logger
          serverMetrics
          prometheusMetrics
          subscriptionState
          streamSubscriberId
          granularPrometheusMetricsState
          opName

-- | Handle a @connection_init@ message from the client.
--
-- The user is resolved from the headers carried by the connection parameters
-- together with the headers stored on a not-yet-initialised connection. On
-- success the connection state becomes 'CSInitialised', 'SMConnAck' is sent
-- and the keep-alive action is started. On failure the state becomes
-- 'CSInitError', the error is logged and the error action is run with
-- 'WS.ConnInitFailed'.
onConnInit ::
  (MonadIO m, UserAuthentication m) =>
  L.Logger L.Hasura ->
  HTTP.Manager ->
  WSConn ->
  IO AuthMode ->
  Maybe ConnParams ->
  -- | handler invoked with the error when initialising the connection from the client fails
  WS.WSOnErrorMessageAction WSConnData ->
  -- | handler for the "keep-alive" messages to the client; run after the connection is acknowledged
  WS.WSKeepAliveMessageAction WSConnData ->
  m ()
onConnInit logger manager wsConn getAuthMode connParamsM onConnInitErrAction keepAliveMessageAction = do
  -- TODO(from master): what should be the behaviour of a connection_init
  -- message when the connection is already initialised? Currently, we seem to
  -- be doing something arbitrary which isn't correct. Ideally, we should
  -- stick to this:
  --
  -- > Allow connection_init message only when the connection state is
  -- 'not initialised'. This means that there is no reason for the
  -- connection to be in `CSInitError` state.
  currentState <- liftIO $ STM.readTVarIO connStateVar
  authMode <- liftIO getAuthMode
  case ipAddressOf currentState of
    Left err -> failInit err
    Right ipAddress -> do
      let requestHeaders = paramHeaders ++ handshakeHeaders currentState
      authResult <- resolveUserInfo logger manager requestHeaders authMode Nothing

      case authResult of
        Left qErr -> do
          let !initErr = CSInitError (qeError qErr)
          liftIO $ do
            $assertNFHere initErr -- so we don't write thunks to mutable vars
            STM.atomically $ STM.writeTVar connStateVar initErr

          failInit (qeError qErr)
        -- the auth headers are ignored as headers are irrelevant in websockets
        Right (userInfo, expTimeM, _authHeaders, _) -> do
          let !csInit = CSInitialised (WsClientState userInfo expTimeM paramHeaders ipAddress)
          liftIO $ do
            $assertNFHere csInit -- so we don't write thunks to mutable vars
            STM.atomically $ STM.writeTVar connStateVar csInit

          sendMsg wsConn SMConnAck
          liftIO $ keepAliveMessageAction wsConn
  where
    -- mutable connection state stored alongside the websocket connection
    connStateVar = _wscUser (WS.getData wsConn)

    -- log the initialisation error and hand it to the error action
    failInit errText = do
      let connErr = ConnErrMsg errText
      logWSEvent logger wsConn (EConnErr connErr)
      liftIO $ onConnInitErrAction wsConn connErr WS.ConnInitFailed

    -- a connection that previously failed to initialise yields its error
    ipAddressOf = \case
      CSNotInitialised _ ip -> Right ip
      CSInitialised clientState -> Right (wscsIpAddress clientState)
      CSInitError errText -> Left errText

    -- headers supplied through the connection parameters, UTF-8 encoded
    paramHeaders =
      map encodeHeader (maybe [] HashMap.toList (connParamsM >>= _cpHeaders))

    encodeHeader (name, value) =
      (CI.mk (TE.encodeUtf8 name), TE.encodeUtf8 value)

    -- headers stored on the connection, only available before initialisation
    handshakeHeaders (CSNotInitialised hdrs _) = unWsHeaders hdrs
    handshakeHeaders _ = []

-- | Handle the closing of a websocket connection.
--
-- Logs an 'EClosed' event, reads all operations currently registered in the
-- connection's operation map, and removes the live query or streaming query
-- subscriber behind each of them.
onClose ::
  (MonadIO m) =>
  L.Logger L.Hasura ->
  ServerMetrics ->
  PrometheusMetrics ->
  ES.SubscriptionsState ->
  WSConn ->
  IO GranularPrometheusMetricsState ->
  m ()
onClose logger serverMetrics prometheusMetrics subscriptionsState wsConn granularPrometheusMetricsState = do
  logWSEvent logger wsConn EClosed
  liftIO $ do
    -- snapshot the registered operations in a single transaction
    registeredOps <- STM.atomically (ListT.toList (STMMap.listT connOperations))
    for_ registeredOps (dropSubscriber . snd)
  where
    connOperations = _wscOpMap (WS.getData wsConn)

    -- remove one subscriber from the subscriptions state
    dropSubscriber (subscriber, opName) = case subscriber of
      LiveQuerySubscriber lqId ->
        ES.removeLiveQuery
          logger
          serverMetrics
          prometheusMetrics
          subscriptionsState
          lqId
          granularPrometheusMetricsState
          opName
      StreamingQuerySubscriber streamSubscriberId ->
        ES.removeStreamingQuery
          logger
          serverMetrics
          prometheusMetrics
          subscriptionsState
          streamSubscriberId
          granularPrometheusMetricsState
          opName

-- | An 'IO' action wrapped in a dedicated type; see
-- 'mkCloseWebsocketsOnMetadataChangeAction' for how it is built.
newtype WebsocketCloseOnMetadataChangeAction = WebsocketCloseOnMetadataChangeAction
  { -- | unwrap the underlying action so that it can be run
    runWebsocketCloseOnMetadataChangeAction :: IO ()
  }

-- | By default, all websocket connections are closed when the metadata
-- changes. This builds the action that is run on such a change: it closes
-- every connection of the given server via 'WS.closeAllConnectionsWithReason'.
mkCloseWebsocketsOnMetadataChangeAction :: WS.WSServer WS.WSConnData -> WebsocketCloseOnMetadataChangeAction
mkCloseWebsocketsOnMetadataChangeAction wsServer =
  WebsocketCloseOnMetadataChangeAction closeEveryConnection
  where
    -- NOTE(review): the first string looks like the server-side log message
    -- and the second the reason sent to clients; confirm against
    -- 'WS.closeAllConnectionsWithReason'. The trailing 'id' leaves the
    -- security-sensitive user config unchanged.
    closeEveryConnection =
      WS.closeAllConnectionsWithReason
        wsServer
        "Closing all websocket connections as the metadata has changed"
        "Server state changed, restarting the server"
        id