module Hasura.GraphQL.Transport.WSServerApp
( createWSServerApp,
stopWSServerApp,
createWSServerEnv,
)
where
import Control.Concurrent.Async.Lifted.Safe qualified as LA
import Control.Concurrent.STM qualified as STM
import Control.Exception.Lifted
import Control.Monad.Trans.Control qualified as MC
import Data.Aeson (object, toJSON, (.=))
import Data.ByteString.Char8 qualified as B (pack)
import Data.Environment qualified as Env
import Data.Text (pack, unpack)
import Hasura.GraphQL.Execute qualified as E
import Hasura.GraphQL.Execute.Backend qualified as EB
import Hasura.GraphQL.Execute.Subscription.State qualified as ES
import Hasura.GraphQL.Logging
import Hasura.GraphQL.Transport.HTTP (MonadExecuteQuery)
import Hasura.GraphQL.Transport.Instances ()
import Hasura.GraphQL.Transport.WebSocket
import Hasura.GraphQL.Transport.WebSocket.Protocol
import Hasura.GraphQL.Transport.WebSocket.Server qualified as WS
import Hasura.GraphQL.Transport.WebSocket.Types
import Hasura.Logging qualified as L
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.SchemaCache
import Hasura.Server.Auth (AuthMode, UserAuthentication)
import Hasura.Server.Cors
import Hasura.Server.Init.Config
( KeepAliveDelay,
WSConnectionInitTimeout,
)
import Hasura.Server.Limits
import Hasura.Server.Metrics (ServerMetrics (..))
import Hasura.Server.Prometheus
( PrometheusMetrics (..),
decWebsocketConnections,
incWebsocketConnections,
)
import Hasura.Server.Types (ReadOnlyMode)
import Hasura.Tracing qualified as Tracing
import Network.HTTP.Client qualified as HTTP
import Network.WebSockets qualified as WS
import System.Metrics.Gauge qualified as EKG.Gauge
-- | Build the websocket server application from the shared 'WSServerEnv'.
--
-- The returned application hands every pending connection to
-- 'WS.createServerApp', together with the server stored in the environment
-- and a set of handlers for the connect, message and close events. Each
-- handler runs under 'mask_', i.e. with asynchronous exceptions masked.
--
-- Arguments, in order: the environment, the set of enabled log types, the
-- authentication mode, the websocket server environment and the connection
-- initialisation timeout (forwarded unchanged to 'WS.createServerApp').
createWSServerApp ::
  ( MonadIO m,
    MC.MonadBaseControl IO m,
    LA.Forall (LA.Pure m),
    UserAuthentication (Tracing.TraceT m),
    E.MonadGQLExecutionCheck m,
    WS.MonadWSLog m,
    MonadQueryLog m,
    Tracing.HasReporter m,
    MonadExecuteQuery m,
    MonadMetadataStorage (MetadataStorageT m),
    EB.MonadQueryTags m,
    HasResourceLimits m
  ) =>
  Env.Environment ->
  HashSet (L.EngineLogType L.Hasura) ->
  AuthMode ->
  WSServerEnv ->
  WSConnectionInitTimeout ->
  WS.HasuraServerApp m
createWSServerApp env enabledLogTypes authMode serverEnv connInitTimeout = \ !ipAddress !pendingConn ->
  WS.createServerApp connInitTimeout (_wseServer serverEnv) handlers ipAddress pendingConn
  where
    -- The three event callbacks handed to the websocket server.
    handlers =
      WS.WSHandlers
        onConnHandler
        onMessageHandler
        onCloseHandler

    -- Values projected once from the server environment.
    logger = _wseLogger serverEnv
    serverMetrics = _wseServerMetrics serverEnv
    prometheusMetrics = _wsePrometheusMetrics serverEnv

    -- Sub-protocol specific actions; still awaiting the sub-protocol.
    wsActions = mkWSActions logger

    -- Connect: bump both connection gauges, then run 'onConn' with the
    -- server environment as its reader context. @sp@ is the sub-protocol.
    --
    -- NOTE(review): the gauges are incremented before 'onConn' decides
    -- whether to accept the request. Confirm that the server also invokes
    -- the close handler for rejected requests; otherwise the gauges drift.
    onConnHandler rid rh ip sp = mask_ do
      liftIO $ EKG.Gauge.inc $ smWebsocketConnections serverMetrics
      liftIO $ incWebsocketConnections $ pmConnections prometheusMetrics
      flip runReaderT serverEnv $ onConn rid rh ip (wsActions sp)

    -- Message: delegate to 'onMessage' with the actions for this
    -- connection's sub-protocol.
    onMessageHandler conn bs sp =
      mask_ $
        onMessage env enabledLogTypes authMode serverEnv conn bs (wsActions sp)

    -- Close: undo the gauge increments made on connect, then let 'onClose'
    -- process the connection against the subscription state.
    onCloseHandler conn = mask_ do
      liftIO $ EKG.Gauge.dec $ smWebsocketConnections serverMetrics
      liftIO $ decWebsocketConnections $ pmConnections prometheusMetrics
      onClose logger serverMetrics prometheusMetrics (_wseSubscriptionState serverEnv) conn
-- | Stop the websocket server held in the given environment by passing it
-- to 'WS.shutdown'.
stopWSServerApp :: WSServerEnv -> IO ()
stopWSServerApp wsEnv = WS.shutdown (_wseServer wsEnv)
-- | Assemble a 'WSServerEnv'.
--
-- A fresh websocket server is created with 'WS.createWSServer' (run in a
-- single 'STM.atomically' transaction) and stored in the environment
-- alongside the supplied arguments, which are passed through unchanged.
createWSServerEnv ::
  (MonadIO m) =>
  L.Logger L.Hasura ->
  ES.SubscriptionsState ->
  IO (SchemaCache, SchemaCacheVer) ->
  HTTP.Manager ->
  CorsPolicy ->
  SQLGenCtx ->
  ReadOnlyMode ->
  Bool ->
  KeepAliveDelay ->
  ServerMetrics ->
  PrometheusMetrics ->
  m WSServerEnv
createWSServerEnv
  logger
  lqState
  getSchemaCache
  httpManager
  corsPolicy
  sqlGenCtx
  readOnlyMode
  enableAL
  keepAliveDelay
  serverMetrics
  prometheusMetrics = do
    wsServer <- liftIO $ STM.atomically $ WS.createWSServer logger
    -- Field order follows the 'WSServerEnv' constructor: the server sits
    -- between the read-only mode and the 'enableAL' flag.
    pure $
      WSServerEnv
        logger
        lqState
        getSchemaCache
        httpManager
        corsPolicy
        sqlGenCtx
        readOnlyMode
        wsServer
        enableAL
        keepAliveDelay
        serverMetrics
        prometheusMetrics
-- | Build the set of websocket actions for a given sub-protocol.
--
-- Every action branches on @subProtocol@ ('Apollo' or 'GraphQLWS') to pick
-- the message shape that is sent to the client.
mkWSActions :: L.Logger L.Hasura -> WSSubProtocol -> WS.WSActions WSConnData
mkWSActions logger subProtocol =
  WS.WSActions
    mkPostExecErrMessageAction
    mkOnErrorMessageAction
    mkConnectionCloseAction
    keepAliveAction
    getServerMsgType
    mkAcceptRequest
    fmtErrorMessage
  where
    -- Report an execution error for operation @opId@: 'Apollo' wraps it in
    -- a data message carrying a failed response, 'GraphQLWS' sends an error
    -- message with the JSON-encoded error.
    mkPostExecErrMessageAction wsConn opId execErr =
      sendMsg wsConn $ case subProtocol of
        Apollo -> SMData $ DataMsg opId $ throwError execErr
        GraphQLWS -> SMErr $ ErrorMsg opId $ toJSON execErr

    -- Report a connection error: 'Apollo' sends a connection-error message,
    -- 'GraphQLWS' closes with 'GenericError4400' whose text is the optional
    -- prefix (empty when absent) followed by the error's own message.
    mkOnErrorMessageAction wsConn err mErrMsg = case subProtocol of
      Apollo -> sendMsg wsConn $ SMConnErr err
      GraphQLWS ->
        sendCloseWithMsg
          logger
          wsConn
          (GenericError4400 $ fromMaybe "" mErrMsg <> unpack (unConnErrMsg err))
          Nothing
          Nothing

    -- Only 'GraphQLWS' does anything here: it closes with
    -- 'GenericError4400' and an error message for @opId@.
    -- NOTE(review): the trailing @Just 1000@ is presumably the close code
    -- handed to 'sendCloseWithMsg'; confirm against its definition.
    mkConnectionCloseAction wsConn opId errMsg =
      when (subProtocol == GraphQLWS) $
        sendCloseWithMsg
          logger
          wsConn
          (GenericError4400 errMsg)
          (Just . SMErr $ ErrorMsg opId $ toJSON (pack errMsg))
          (Just 1000)

    -- Constructor used to wrap a data message for this sub-protocol.
    getServerMsgType = case subProtocol of
      Apollo -> SMData
      GraphQLWS -> SMNext

    -- Keep-alive: a keep-alive message for 'Apollo', a ping carrying
    -- 'keepAliveMessage' for 'GraphQLWS'.
    keepAliveAction wsConn =
      sendMsg wsConn $
        case subProtocol of
          Apollo -> SMConnKeepAlive
          GraphQLWS -> SMPing . Just $ keepAliveMessage

    -- Default accept request with the accepted sub-protocol set to the
    -- textual name of @subProtocol@.
    mkAcceptRequest =
      WS.defaultAcceptRequest
        { WS.acceptSubprotocol = Just . B.pack . showSubProtocol $ subProtocol
        }

    -- 'Apollo' nests the errors under an @"errors"@ key; 'GraphQLWS' emits
    -- them as a bare JSON array.
    fmtErrorMessage errMsgs = case subProtocol of
      Apollo -> object ["errors" .= errMsgs]
      GraphQLWS -> toJSON errMsgs