Safe Haskell | None |
---|---|
Language | Haskell2010 |
Top-level management of subscription poller threads. The implementation of the polling itself is in Hasura.GraphQL.Execute.Subscription.Poll. See Hasura.GraphQL.Execute.Subscription for high-level details.
Synopsis
- data SubscriptionsState = SubscriptionsState {}
- initSubscriptionsState :: LiveQueriesOptions -> StreamQueriesOptions -> SubscriptionPostPollHook -> IO SubscriptionsState
- dumpSubscriptionsState :: Bool -> SubscriptionsState -> IO Value
- data SubscriberDetails a = SubscriberDetails {
- _sdPoller :: !PollerKey
- _sdCohort :: !a
- _sdSubscriber :: !SubscriberId
- type LiveQuerySubscriberDetails = SubscriberDetails CohortKey
- type StreamingSubscriberDetails = SubscriberDetails (CohortKey, TVar CursorVariableValues)
- findPollerForSubscriber :: Subscriber -> CohortId -> PollerMap streamCursorVars -> PollerKey -> CohortKey -> (Subscriber -> Cohort streamCursorVars -> STM streamCursorVars) -> (Subscriber -> CohortId -> Poller streamCursorVars -> STM streamCursorVars) -> STM (Maybe (Poller streamCursorVars), streamCursorVars)
- addLiveQuery :: forall b. BackendTransport b => Logger Hasura -> ServerMetrics -> PrometheusMetrics -> SubscriberMetadata -> SubscriptionsState -> SourceName -> ParameterizedQueryHash -> Maybe OperationName -> RequestId -> SubscriptionQueryPlan b (MultiplexedQuery b) -> OnChange -> IO LiveQuerySubscriberDetails
- addStreamSubscriptionQuery :: forall b. BackendTransport b => Logger Hasura -> ServerMetrics -> PrometheusMetrics -> SubscriberMetadata -> SubscriptionsState -> SourceName -> ParameterizedQueryHash -> Maybe OperationName -> RequestId -> Name -> SubscriptionQueryPlan b (MultiplexedQuery b) -> OnChange -> IO StreamingSubscriberDetails
- removeLiveQuery :: Logger Hasura -> ServerMetrics -> PrometheusMetrics -> SubscriptionsState -> LiveQuerySubscriberDetails -> IO ()
- removeStreamingQuery :: Logger Hasura -> ServerMetrics -> PrometheusMetrics -> SubscriptionsState -> StreamingSubscriberDetails -> IO ()
- data LiveAsyncActionQueryOnSource = LiveAsyncActionQueryOnSource {}
- data LiveAsyncActionQueryWithNoRelationships = LiveAsyncActionQueryWithNoRelationships {
- _laaqwnrSendResponse :: !(ActionLogResponseMap -> IO ())
- _laaqwnrSendCompleted :: !(IO ())
- data LiveAsyncActionQuery
- data AsyncActionQueryLive = AsyncActionQueryLive {
- _aaqlActionIds :: !(NonEmpty ActionId)
- _aaqlOnException :: !(QErr -> IO ())
- _aaqlLiveExecution :: !LiveAsyncActionQuery
- type AsyncActionSubscriptionState = TMap OperationId AsyncActionQueryLive
- addAsyncActionLiveQuery :: AsyncActionSubscriptionState -> OperationId -> NonEmpty ActionId -> (QErr -> IO ()) -> LiveAsyncActionQuery -> IO ()
- removeAsyncActionLiveQuery :: AsyncActionSubscriptionState -> OperationId -> IO ()
Documentation
data SubscriptionsState Source #
The top-level datatype that holds the state for all active subscriptions.
NOTE: This must be kept consistent with a websocket connection's OperationMap, in onClose and onStart.
SubscriptionsState | |
|
initSubscriptionsState :: LiveQueriesOptions -> StreamQueriesOptions -> SubscriptionPostPollHook -> IO SubscriptionsState Source #
dumpSubscriptionsState :: Bool -> SubscriptionsState -> IO Value Source #
data SubscriberDetails a Source #
SubscriberDetails contains the data required to locate a subscriber in the correct cohort within the correct poller in the operation map.
SubscriberDetails | |
|
Instances
Show a => Show (SubscriberDetails a) Source # | |
Defined in Hasura.GraphQL.Execute.Subscription.State showsPrec :: Int -> SubscriberDetails a -> ShowS # show :: SubscriberDetails a -> String # showList :: [SubscriberDetails a] -> ShowS # |
type StreamingSubscriberDetails = SubscriberDetails (CohortKey, TVar CursorVariableValues) Source #
The CohortKey contains the variables with which the subscription was started and which will remain unchanged. The second component of the tuple is the mutable reference through which we can get the latest value of the cursor. Using both the CohortKey and the latest cursor value, we locate the subscriber in the operation map to find its details and then stop it.
findPollerForSubscriber :: Subscriber -> CohortId -> PollerMap streamCursorVars -> PollerKey -> CohortKey -> (Subscriber -> Cohort streamCursorVars -> STM streamCursorVars) -> (Subscriber -> CohortId -> Poller streamCursorVars -> STM streamCursorVars) -> STM (Maybe (Poller streamCursorVars), streamCursorVars) Source #
findPollerForSubscriber places a subscriber in the correct poller. If the poller doesn't exist, we create one; otherwise, we return the existing one.
addLiveQuery Source #
:: forall b. BackendTransport b | |
=> Logger Hasura | |
-> ServerMetrics | |
-> PrometheusMetrics | |
-> SubscriberMetadata | |
-> SubscriptionsState | |
-> SourceName | |
-> ParameterizedQueryHash | |
-> Maybe OperationName | operation name of the query |
-> RequestId | |
-> SubscriptionQueryPlan b (MultiplexedQuery b) | |
-> OnChange | the action to be executed when result changes |
-> IO LiveQuerySubscriberDetails |
Fork a thread handling a regular (live query) subscription
addStreamSubscriptionQuery Source #
:: forall b. BackendTransport b | |
=> Logger Hasura | |
-> ServerMetrics | |
-> PrometheusMetrics | |
-> SubscriberMetadata | |
-> SubscriptionsState | |
-> SourceName | |
-> ParameterizedQueryHash | |
-> Maybe OperationName | operation name of the query |
-> RequestId | |
-> Name | root field name |
-> SubscriptionQueryPlan b (MultiplexedQuery b) | |
-> OnChange | the action to be executed when result changes |
-> IO StreamingSubscriberDetails |
Fork a thread handling a streaming subscription
removeLiveQuery :: Logger Hasura -> ServerMetrics -> PrometheusMetrics -> SubscriptionsState -> LiveQuerySubscriberDetails -> IO () Source #
removeStreamingQuery :: Logger Hasura -> ServerMetrics -> PrometheusMetrics -> SubscriptionsState -> StreamingSubscriberDetails -> IO () Source #
data LiveAsyncActionQueryOnSource Source #
An async action query whose relationships refer to tables in a source. We need to generate an SQL statement with the action response and execute it in the source database so as to fetch the response joined with relationship rows. For more details see Note [Resolving async action query].
LiveAsyncActionQueryOnSource | |
|
data LiveAsyncActionQueryWithNoRelationships Source #
LiveAsyncActionQueryWithNoRelationships | |
|
data LiveAsyncActionQuery Source #
data AsyncActionQueryLive Source #
AsyncActionQueryLive | |
|
type AsyncActionSubscriptionState = TMap OperationId AsyncActionQueryLive Source #
A shareable state map which stores an async action live query along with its subscription operation id.
addAsyncActionLiveQuery :: AsyncActionSubscriptionState -> OperationId -> NonEmpty ActionId -> (QErr -> IO ()) -> LiveAsyncActionQuery -> IO () Source #