Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell2010 |
Reasonably efficient PostgreSQL live queries
The module implements query multiplexing, which is our implementation strategy for live queries (i.e. GraphQL subscriptions) made against Postgres. Fundamentally, our implementation is built around polling, which is never ideal, but it’s a lot easier to implement than trying to do something event-based. To minimize the resource cost of polling, we use multiplexing, which is essentially a two-tier batching strategy.
The high-level idea
The objective is to minimize the number of concurrent polling workers to reduce database load as much as possible. A very naïve strategy would be to group identical queries together so we only have one poller per unique active subscription. That’s a good start, but of course, in practice, most queries differ slightly. However, it happens that they very frequently differ only in their variables (that is, GraphQL query variables and session variables), and in those cases, we try to generate parameterized SQL. This means that the same prepared SQL query can be reused, just with a different set of variables.
To give a concrete example, consider the following query:
subscription vote_count($post_id: Int!) {
  vote_count(where: {post_id: {_eq: $post_id}}) {
    votes
  }
}
No matter what the client provides for $post_id, we will always generate the same SQL:
SELECT votes FROM vote_count WHERE post_id = $1
If multiple clients subscribe to vote_count, we can certainly reuse the same prepared query. For example, imagine we had 10 concurrent subscribers, each listening on a distinct $post_id:
let postIds = [3, 11, 32, 56, 13, 97, 24, 43, 109, 48]
We could iterate over postIds in Haskell, executing the same prepared query 10 times:
for postIds $ \postId -> PG.withQE defaultTxErrorHandler preparedQuery (Identity postId) True
Sadly, that on its own isn’t good enough. The overhead of running each query is large enough that Postgres becomes overwhelmed if we have to serve lots of concurrent subscribers. Therefore, what we want to be able to do is somehow make one query instead of ten.
Multiplexing
This is where multiplexing comes in. By taking advantage of Postgres lateral joins, we can do the iteration in Postgres rather than in Haskell, allowing us to pay the query overhead just once for all ten subscribers. Essentially, lateral joins add map-like functionality to SQL, so we can run our query once per $post_id:
SELECT results.votes
FROM unnest($1::integer[]) query_variables (post_id)
LEFT JOIN LATERAL (
  SELECT coalesce(json_agg(votes), '[]') AS votes
  FROM vote_count
  WHERE vote_count.post_id = query_variables.post_id
) results ON true
If we generalize this approach just a little bit more, we can apply this transformation to arbitrary queries parameterized over arbitrary session and query variables!
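To make the map analogy concrete, here is a minimal pure-Haskell model of the two queries above. It is only a sketch: the in-memory `VoteRow` list stands in for the vote_count table, and the names `votesFor` and `multiplexed` are hypothetical, not part of this module.

```haskell
-- | A row of the (hypothetical, in-memory) vote_count table: (post_id, votes).
type VoteRow = (Int, Int)

-- | Models the single-subscriber query:
--   SELECT votes FROM vote_count WHERE post_id = $1
votesFor :: [VoteRow] -> Int -> [Int]
votesFor table postId = [votes | (pid, votes) <- table, pid == postId]

-- | Models the multiplexed query: the array of post ids plays the role of
--   unnest($1::integer[]), and the per-element lookup plays the role of the
--   lateral subquery. A post id with no rows yields an empty list, much like
--   coalesce(json_agg(votes), '[]') yields an empty JSON array.
multiplexed :: [VoteRow] -> [Int] -> [(Int, [Int])]
multiplexed table postIds = [(postId, votesFor table postId) | postId <- postIds]
```

The model pairs each result with its post id purely for readability; the SQL above returns only the aggregated votes, one row per array element.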
Implementation overview
To support query multiplexing, we maintain a tree of the following types, where > should be read as “contains”:
SubscriptionsState > Poller > Cohort > Subscriber
Here’s a brief summary of each type’s role:
- A Subscriber is an actual client with an open websocket connection.
- A Cohort is a set of Subscribers that are all subscribed to the same query with the exact same variables. (By batching these together, we can do better than multiplexing, since we can just query the data once.)
- A Poller is a worker thread for a single, multiplexed query. It fetches data for a set of Cohorts that all use the same parameterized query, but have different sets of variables.
- Finally, the SubscriptionsState is the top-level container that holds all the active Pollers.
Additional details are provided by the documentation for individual bindings.
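As a rough illustration of the containment tree, the sketch below models it with plain maps. Every name here is a simplified stand-in chosen for this example: the real types involve worker threads and concurrent state, which are deliberately left out, and `addSubscriber` is a hypothetical helper.

```haskell
import qualified Data.Map.Strict as Map

-- Simplified stand-ins (assumptions for this sketch, not the real types).
type QueryText = String               -- the parameterized SQL shared by one poller
type Variables = [(String, String)]   -- variable name/value pairs identifying a cohort
type SubscriberId = Int

-- | A cohort: subscribers on the same query with the exact same variables.
type Cohort = [SubscriberId]

-- | A poller: one multiplexed query serving many cohorts, keyed by variables.
type Poller = Map.Map Variables Cohort

-- | The top-level container: one poller per parameterized query.
type SubscriptionsState = Map.Map QueryText Poller

-- | Add a subscriber, reusing an existing poller and cohort when they exist
--   and creating them otherwise.
addSubscriber
  :: QueryText -> Variables -> SubscriberId -> SubscriptionsState -> SubscriptionsState
addSubscriber query vars sid =
  Map.insertWith (Map.unionWith (++)) query (Map.singleton vars [sid])
```

With this shape, two subscribers with the same query and variables share one cohort, while subscribers that differ only in variables share one poller but sit in separate cohorts.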
Synopsis
- newtype ValidatedVariables f = ValidatedVariables {}
- unValidatedVariables :: forall f f. Iso (ValidatedVariables f) (ValidatedVariables f) (f TxtEncodedVal) (f TxtEncodedVal)
- type ValidatedQueryVariables = ValidatedVariables (HashMap Name)
- type ValidatedSyntheticVariables = ValidatedVariables []
- type ValidatedCursorVariables = ValidatedVariables (HashMap Name)
- mkUnsafeValidateVariables :: f TxtEncodedVal -> ValidatedVariables f
- data CohortId
- newCohortId :: MonadIO m => m CohortId
- dummyCohortId :: CohortId
- data CohortVariables
- cvSyntheticVariables :: Lens' CohortVariables ValidatedSyntheticVariables
- cvSessionVariables :: Lens' CohortVariables SessionVariables
- cvQueryVariables :: Lens' CohortVariables ValidatedQueryVariables
- cvCursorVariables :: Lens' CohortVariables ValidatedCursorVariables
- modifyCursorCohortVariables :: ValidatedCursorVariables -> CohortVariables -> CohortVariables
- mkCohortVariables :: HashSet SessionVariable -> SessionVariables -> ValidatedQueryVariables -> ValidatedSyntheticVariables -> ValidatedCursorVariables -> CohortVariables
- newtype CohortIdArray = CohortIdArray {
    - unCohortIdArray :: [CohortId]
  }
- newtype CohortVariablesArray = CohortVariablesArray {}
- applyModifier :: Maybe (Endo Value) -> ByteString -> ByteString
- data SubscriptionQueryPlan (b :: BackendType) q = SubscriptionQueryPlan {}
- data ParameterizedSubscriptionQueryPlan (b :: BackendType) q = ParameterizedSubscriptionQueryPlan {
    - _plqpRole :: !RoleName
    - _plqpQuery :: !q
  }
- data SubscriptionQueryPlanExplanation = SubscriptionQueryPlanExplanation {
    - _sqpeSql :: !Text
    - _sqpePlan :: ![Text]
    - _sqpeVariables :: !CohortVariables
  }
- newtype CursorVariableValues = CursorVariableValues (HashMap Name TxtEncodedVal)
Documentation
newtype ValidatedVariables f
When running multiplexed queries, we have to be especially careful about user input, since invalid values will cause the query to fail, causing collateral damage for anyone else multiplexed into the same query. Therefore, we pre-validate variables against Postgres by executing a no-op query of the shape
SELECT 'v1'::t1, 'v2'::t2, ..., 'vn'::tn
so if any variable values are invalid, the error will be caught early.
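The shape of that no-op query can be sketched as a small string builder. This is an illustration only: `validationQuery` and `quoteLiteral` are hypothetical names, values are inlined as quoted literals (assuming Postgres' default standard_conforming_strings setting), and the actual implementation may well pass values to Postgres differently.

```haskell
import Data.List (intercalate)

-- | Quote a value as a SQL string literal, doubling embedded single quotes.
--   (Assumes standard_conforming_strings is on, the Postgres default.)
quoteLiteral :: String -> String
quoteLiteral v = "'" ++ concatMap escape v ++ "'"
  where
    escape '\'' = "''"
    escape c = [c]

-- | Build a query of the shape SELECT 'v1'::t1, 'v2'::t2, ..., 'vn'::tn
--   from (value, type name) pairs. Type names are trusted here; only the
--   values are treated as untrusted input.
validationQuery :: [(String, String)] -> String
validationQuery vars =
  "SELECT " ++ intercalate ", " [quoteLiteral v ++ "::" ++ ty | (v, ty) <- vars]
```

Running such a query once per new subscriber means a malformed value (say, a non-numeric string cast to integer) fails in isolation, before it can be multiplexed with other subscribers' variables.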
unValidatedVariables :: forall f f. Iso (ValidatedVariables f) (ValidatedVariables f) (f TxtEncodedVal) (f TxtEncodedVal)
type ValidatedSyntheticVariables = ValidatedVariables []
newCohortId :: MonadIO m => m CohortId
data CohortVariables
modifyCursorCohortVariables :: ValidatedCursorVariables -> CohortVariables -> CohortVariables
mkCohortVariables :: HashSet SessionVariable -> SessionVariables -> ValidatedQueryVariables -> ValidatedSyntheticVariables -> ValidatedCursorVariables -> CohortVariables
Builds a cohort's variables using only the session variables that the subscription requires.
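The filtering step can be sketched with ordinary maps and sets. This is an assumption-laden illustration: plain `String` keys and values stand in for the real SessionVariable and SessionVariables types, and `restrictSessionVariables` is a hypothetical name.

```haskell
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set

-- | Keep only the session variables that the subscription actually uses.
--   String keys and values are stand-ins for the real session variable types.
restrictSessionVariables
  :: Set.Set String -> Map.Map String String -> Map.Map String String
restrictSessionVariables required session = Map.restrictKeys session required
```

One plausible benefit of dropping unused session variables is that subscribers whose sessions differ only in irrelevant variables end up with identical cohort variables, and so can share a cohort.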
newtype CohortIdArray
Instances
- Show CohortIdArray
  Defined in Hasura.GraphQL.Execute.Subscription.Plan
  showsPrec :: Int -> CohortIdArray -> ShowS
  show :: CohortIdArray -> String
  showList :: [CohortIdArray] -> ShowS
- Eq CohortIdArray
  Defined in Hasura.GraphQL.Execute.Subscription.Plan
  (==) :: CohortIdArray -> CohortIdArray -> Bool
  (/=) :: CohortIdArray -> CohortIdArray -> Bool
- ToPrepArg CohortIdArray
  Defined in Hasura.GraphQL.Execute.Subscription.Plan
  toPrepVal :: CohortIdArray -> PrepArg
newtype CohortVariablesArray
Instances
- Show CohortVariablesArray
  Defined in Hasura.GraphQL.Execute.Subscription.Plan
  showsPrec :: Int -> CohortVariablesArray -> ShowS
  show :: CohortVariablesArray -> String
  showList :: [CohortVariablesArray] -> ShowS
- Eq CohortVariablesArray
  Defined in Hasura.GraphQL.Execute.Subscription.Plan
  (==) :: CohortVariablesArray -> CohortVariablesArray -> Bool
  (/=) :: CohortVariablesArray -> CohortVariablesArray -> Bool
- ToPrepArg CohortVariablesArray
  Defined in Hasura.GraphQL.Execute.Subscription.Plan
applyModifier :: Maybe (Endo Value) -> ByteString -> ByteString
data SubscriptionQueryPlan (b :: BackendType) q
A self-contained, ready-to-execute subscription plan. Contains enough information to find an existing poller that this can be added to or to create a new poller if necessary.
SubscriptionQueryPlan
data ParameterizedSubscriptionQueryPlan (b :: BackendType) q
Instances
- ToJSON q => ToJSON (ParameterizedSubscriptionQueryPlan b q)
  Defined in Hasura.GraphQL.Execute.Subscription.Plan
  toJSON :: ParameterizedSubscriptionQueryPlan b q -> Value
  toEncoding :: ParameterizedSubscriptionQueryPlan b q -> Encoding
  toJSONList :: [ParameterizedSubscriptionQueryPlan b q] -> Value
  toEncodingList :: [ParameterizedSubscriptionQueryPlan b q] -> Encoding
- Show q => Show (ParameterizedSubscriptionQueryPlan b q)
  Defined in Hasura.GraphQL.Execute.Subscription.Plan
  showsPrec :: Int -> ParameterizedSubscriptionQueryPlan b q -> ShowS
  show :: ParameterizedSubscriptionQueryPlan b q -> String
  showList :: [ParameterizedSubscriptionQueryPlan b q] -> ShowS
data SubscriptionQueryPlanExplanation
SubscriptionQueryPlanExplanation
newtype CursorVariableValues
Instances
- FromJSON CursorVariableValues
  Defined in Hasura.GraphQL.Execute.Subscription.Plan
- ToJSON CursorVariableValues
  Defined in Hasura.GraphQL.Execute.Subscription.Plan
  toJSON :: CursorVariableValues -> Value
  toEncoding :: CursorVariableValues -> Encoding
  toJSONList :: [CursorVariableValues] -> Value
  toEncodingList :: [CursorVariableValues] -> Encoding
- Show CursorVariableValues
  Defined in Hasura.GraphQL.Execute.Subscription.Plan
  showsPrec :: Int -> CursorVariableValues -> ShowS
  show :: CursorVariableValues -> String
  showList :: [CursorVariableValues] -> ShowS
- Eq CursorVariableValues
  Defined in Hasura.GraphQL.Execute.Subscription.Plan
  (==) :: CursorVariableValues -> CursorVariableValues -> Bool
  (/=) :: CursorVariableValues -> CursorVariableValues -> Bool