-- | Module related to async action query subscriptions
module Hasura.GraphQL.Execute.Action.Subscription
  ( asyncActionSubscriptionsProcessor,
  )
where

import Control.Concurrent.Extended qualified as C
import Control.Concurrent.STM qualified as STM
import Data.List.NonEmpty qualified as NE
import Hasura.GraphQL.Execute.Action
import Hasura.GraphQL.Execute.Subscription.State
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.Metadata.Class
import Hasura.Prelude

{- Note [Async action subscriptions]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
We have two kinds of async action query execution based on the defined relationships
on output object type. See Note [Resolving async action query]. Actions with table
relationships on output object type have to a generate SQL select query with the action
webhook response embedded in so as to execute on the source database that emits client
response with joined relationship rows (see Note [Resolving async action query]).
We can multiplex this queries and run a live subscription just like our usual database
queries. But the action log from the metadata storage is subjected to change after
calling action webhook in two ways,
1. When the webhook returns successful response, 'response_payload' column value isn't null
2. When any exception occurs in the process, 'errors' column value isn't null

So, we have a background thread (@'asyncActionSubscriptionsProcessor') which
constantly fetches the action log response from the metadata storage and restarts the
live query to reflect those changes in subscriptions.

In case of action queries without relationships, there are no SQL queries associated with.
We can't multiplex them. The thread, @'asyncActionSubscriptionsProcessor' also handles these
action subscriptions by sending the responses to the websocket client after fetching the
action log response.

We can't support multiple async query fields of different kinds in a single subscription.
-}

-- | A forever running thread which processes async action subscriptions.
-- See Note [Async action subscriptions]
asyncActionSubscriptionsProcessor ::
  ( MonadIO m,
    MonadMetadataStorage (MetadataStorageT m)
  ) =>
  AsyncActionSubscriptionState ->
  m void
asyncActionSubscriptionsProcessor :: AsyncActionSubscriptionState -> m void
asyncActionSubscriptionsProcessor AsyncActionSubscriptionState
subsState = m () -> m void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
  -- Collect all active async query subscription operations
  NonEmpty (OperationId, AsyncActionQueryLive)
opList <- IO (NonEmpty (OperationId, AsyncActionQueryLive))
-> m (NonEmpty (OperationId, AsyncActionQueryLive))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (NonEmpty (OperationId, AsyncActionQueryLive))
 -> m (NonEmpty (OperationId, AsyncActionQueryLive)))
-> IO (NonEmpty (OperationId, AsyncActionQueryLive))
-> m (NonEmpty (OperationId, AsyncActionQueryLive))
forall a b. (a -> b) -> a -> b
$ STM (NonEmpty (OperationId, AsyncActionQueryLive))
-> IO (NonEmpty (OperationId, AsyncActionQueryLive))
forall a. STM a -> IO a
STM.atomically do
    [(OperationId, AsyncActionQueryLive)]
l <- AsyncActionSubscriptionState
-> STM [(OperationId, AsyncActionQueryLive)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList AsyncActionSubscriptionState
subsState
    Maybe (NonEmpty (OperationId, AsyncActionQueryLive))
-> STM (NonEmpty (OperationId, AsyncActionQueryLive))
-> STM (NonEmpty (OperationId, AsyncActionQueryLive))
forall (m :: * -> *) a. Applicative m => Maybe a -> m a -> m a
onNothing ([(OperationId, AsyncActionQueryLive)]
-> Maybe (NonEmpty (OperationId, AsyncActionQueryLive))
forall a. [a] -> Maybe (NonEmpty a)
NE.nonEmpty [(OperationId, AsyncActionQueryLive)]
l) STM (NonEmpty (OperationId, AsyncActionQueryLive))
forall a. STM a
STM.retry -- Continue only if there are any active subscription operations
  NonEmpty (OperationId, AsyncActionQueryLive)
-> ((OperationId, AsyncActionQueryLive) -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ NonEmpty (OperationId, AsyncActionQueryLive)
opList (((OperationId, AsyncActionQueryLive) -> m ()) -> m ())
-> ((OperationId, AsyncActionQueryLive) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \(OperationId
opId, AsyncActionQueryLive NonEmpty ActionId
actionIds QErr -> IO ()
onError LiveAsyncActionQuery
op) -> do
    -- Fetch action webhook responses from the metadata storage
    Either QErr (ActionLogResponseMap, Bool)
actionLogMapE <- ExceptT QErr m (ActionLogResponseMap, Bool)
-> m (Either QErr (ActionLogResponseMap, Bool))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT QErr m (ActionLogResponseMap, Bool)
 -> m (Either QErr (ActionLogResponseMap, Bool)))
-> ExceptT QErr m (ActionLogResponseMap, Bool)
-> m (Either QErr (ActionLogResponseMap, Bool))
forall a b. (a -> b) -> a -> b
$ [ActionId] -> ExceptT QErr m (ActionLogResponseMap, Bool)
forall (m :: * -> *) (t :: * -> *).
(MonadError QErr m, MonadMetadataStorage (MetadataStorageT m),
 Foldable t) =>
t ActionId -> m (ActionLogResponseMap, Bool)
fetchActionLogResponses ([ActionId] -> ExceptT QErr m (ActionLogResponseMap, Bool))
-> [ActionId] -> ExceptT QErr m (ActionLogResponseMap, Bool)
forall a b. (a -> b) -> a -> b
$ NonEmpty ActionId -> [ActionId]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList NonEmpty ActionId
actionIds
    IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ case Either QErr (ActionLogResponseMap, Bool)
actionLogMapE of
      Left QErr
err -> do
        QErr -> IO ()
onError QErr
err
        AsyncActionSubscriptionState -> OperationId -> IO ()
removeAsyncActionLiveQuery AsyncActionSubscriptionState
subsState OperationId
opId
      Right (ActionLogResponseMap
actionLogMap, Bool
actionsComplete) ->
        case LiveAsyncActionQuery
op of
          LAAQNoRelationships (LiveAsyncActionQueryWithNoRelationships ActionLogResponseMap -> IO ()
sendResp IO ()
sendCompleted) -> do
            ActionLogResponseMap -> IO ()
sendResp ActionLogResponseMap
actionLogMap
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
actionsComplete (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
              IO ()
sendCompleted
              AsyncActionSubscriptionState -> OperationId -> IO ()
removeAsyncActionLiveQuery AsyncActionSubscriptionState
subsState OperationId
opId
          LAAQOnSourceDB (LiveAsyncActionQueryOnSource LiveQuerySubscriberDetails
currLqId ActionLogResponseMap
prevLogMap LiveQuerySubscriberDetails
-> ActionLogResponseMap -> IO (Maybe LiveQuerySubscriberDetails)
lqRestarter) -> do
            -- Actions webhook responses aren't updated in metadata storage, hence no need to restart the live query
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (ActionLogResponseMap
prevLogMap ActionLogResponseMap -> ActionLogResponseMap -> Bool
forall a. Eq a => a -> a -> Bool
== ActionLogResponseMap
actionLogMap) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
              Maybe LiveQuerySubscriberDetails
maybeNewLqId <- LiveQuerySubscriberDetails
-> ActionLogResponseMap -> IO (Maybe LiveQuerySubscriberDetails)
lqRestarter LiveQuerySubscriberDetails
currLqId ActionLogResponseMap
actionLogMap
              if Bool
actionsComplete
                then AsyncActionSubscriptionState -> OperationId -> IO ()
removeAsyncActionLiveQuery AsyncActionSubscriptionState
subsState OperationId
opId
                else do
                  case Maybe LiveQuerySubscriberDetails
maybeNewLqId of
                    Maybe LiveQuerySubscriberDetails
Nothing ->
                      -- Happens only when restarting a live query fails.
                      -- There's no point in holding the operation in the state.
                      AsyncActionSubscriptionState -> OperationId -> IO ()
removeAsyncActionLiveQuery AsyncActionSubscriptionState
subsState OperationId
opId
                    Just LiveQuerySubscriberDetails
newLqId ->
                      AsyncActionSubscriptionState
-> OperationId
-> NonEmpty ActionId
-> (QErr -> IO ())
-> LiveAsyncActionQuery
-> IO ()
addAsyncActionLiveQuery AsyncActionSubscriptionState
subsState OperationId
opId NonEmpty ActionId
actionIds QErr -> IO ()
onError (LiveAsyncActionQuery -> IO ()) -> LiveAsyncActionQuery -> IO ()
forall a b. (a -> b) -> a -> b
$
                        LiveAsyncActionQueryOnSource -> LiveAsyncActionQuery
LAAQOnSourceDB (LiveAsyncActionQueryOnSource -> LiveAsyncActionQuery)
-> LiveAsyncActionQueryOnSource -> LiveAsyncActionQuery
forall a b. (a -> b) -> a -> b
$ LiveQuerySubscriberDetails
-> ActionLogResponseMap
-> (LiveQuerySubscriberDetails
    -> ActionLogResponseMap -> IO (Maybe LiveQuerySubscriberDetails))
-> LiveAsyncActionQueryOnSource
LiveAsyncActionQueryOnSource LiveQuerySubscriberDetails
newLqId ActionLogResponseMap
actionLogMap LiveQuerySubscriberDetails
-> ActionLogResponseMap -> IO (Maybe LiveQuerySubscriberDetails)
lqRestarter
  -- Sleep for a second
  IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ DiffTime -> IO ()
C.sleep (DiffTime -> IO ()) -> DiffTime -> IO ()
forall a b. (a -> b) -> a -> b
$ Seconds -> DiffTime
seconds Seconds
1