module Hasura.GraphQL.Execute.Action.Subscription
( asyncActionSubscriptionsProcessor,
)
where
import Control.Concurrent.Extended qualified as C
import Control.Concurrent.STM qualified as STM
import Data.List.NonEmpty qualified as NE
import Hasura.GraphQL.Execute.Action
import Hasura.GraphQL.Execute.Subscription.State
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.Metadata.Class
import Hasura.Prelude
asyncActionSubscriptionsProcessor ::
( MonadIO m,
MonadMetadataStorage m
) =>
AsyncActionSubscriptionState ->
m void
asyncActionSubscriptionsProcessor :: forall (m :: * -> *) void.
(MonadIO m, MonadMetadataStorage m) =>
AsyncActionSubscriptionState -> m void
asyncActionSubscriptionsProcessor AsyncActionSubscriptionState
subsState = m () -> m void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
NonEmpty (OperationId, AsyncActionQueryLive)
opList <- IO (NonEmpty (OperationId, AsyncActionQueryLive))
-> m (NonEmpty (OperationId, AsyncActionQueryLive))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (NonEmpty (OperationId, AsyncActionQueryLive))
-> m (NonEmpty (OperationId, AsyncActionQueryLive)))
-> IO (NonEmpty (OperationId, AsyncActionQueryLive))
-> m (NonEmpty (OperationId, AsyncActionQueryLive))
forall a b. (a -> b) -> a -> b
$ STM (NonEmpty (OperationId, AsyncActionQueryLive))
-> IO (NonEmpty (OperationId, AsyncActionQueryLive))
forall a. STM a -> IO a
STM.atomically do
[(OperationId, AsyncActionQueryLive)]
l <- AsyncActionSubscriptionState
-> STM [(OperationId, AsyncActionQueryLive)]
forall k v. TMap k v -> STM [(k, v)]
TMap.toList AsyncActionSubscriptionState
subsState
Maybe (NonEmpty (OperationId, AsyncActionQueryLive))
-> STM (NonEmpty (OperationId, AsyncActionQueryLive))
-> STM (NonEmpty (OperationId, AsyncActionQueryLive))
forall (m :: * -> *) a. Applicative m => Maybe a -> m a -> m a
onNothing ([(OperationId, AsyncActionQueryLive)]
-> Maybe (NonEmpty (OperationId, AsyncActionQueryLive))
forall a. [a] -> Maybe (NonEmpty a)
NE.nonEmpty [(OperationId, AsyncActionQueryLive)]
l) STM (NonEmpty (OperationId, AsyncActionQueryLive))
forall a. STM a
STM.retry
NonEmpty (OperationId, AsyncActionQueryLive)
-> ((OperationId, AsyncActionQueryLive) -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ NonEmpty (OperationId, AsyncActionQueryLive)
opList (((OperationId, AsyncActionQueryLive) -> m ()) -> m ())
-> ((OperationId, AsyncActionQueryLive) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \(OperationId
opId, AsyncActionQueryLive NonEmpty ActionId
actionIds QErr -> IO ()
onError LiveAsyncActionQuery
op) -> do
Either QErr (ActionLogResponseMap, Bool)
actionLogMapE <- ExceptT QErr m (ActionLogResponseMap, Bool)
-> m (Either QErr (ActionLogResponseMap, Bool))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT QErr m (ActionLogResponseMap, Bool)
-> m (Either QErr (ActionLogResponseMap, Bool)))
-> ExceptT QErr m (ActionLogResponseMap, Bool)
-> m (Either QErr (ActionLogResponseMap, Bool))
forall a b. (a -> b) -> a -> b
$ [ActionId] -> ExceptT QErr m (ActionLogResponseMap, Bool)
forall (m :: * -> *) (t :: * -> *).
(MonadError QErr m, MonadMetadataStorage m, Foldable t) =>
t ActionId -> m (ActionLogResponseMap, Bool)
fetchActionLogResponses ([ActionId] -> ExceptT QErr m (ActionLogResponseMap, Bool))
-> [ActionId] -> ExceptT QErr m (ActionLogResponseMap, Bool)
forall a b. (a -> b) -> a -> b
$ NonEmpty ActionId -> [ActionId]
forall a. NonEmpty a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList NonEmpty ActionId
actionIds
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ case Either QErr (ActionLogResponseMap, Bool)
actionLogMapE of
Left QErr
err -> do
QErr -> IO ()
onError QErr
err
AsyncActionSubscriptionState -> OperationId -> IO ()
removeAsyncActionLiveQuery AsyncActionSubscriptionState
subsState OperationId
opId
Right (ActionLogResponseMap
actionLogMap, Bool
actionsComplete) ->
case LiveAsyncActionQuery
op of
LAAQNoRelationships (LiveAsyncActionQueryWithNoRelationships ActionLogResponseMap -> IO ()
sendResp IO ()
sendCompleted) -> do
ActionLogResponseMap -> IO ()
sendResp ActionLogResponseMap
actionLogMap
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
actionsComplete (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
IO ()
sendCompleted
AsyncActionSubscriptionState -> OperationId -> IO ()
removeAsyncActionLiveQuery AsyncActionSubscriptionState
subsState OperationId
opId
LAAQOnSourceDB (LiveAsyncActionQueryOnSource LiveQuerySubscriberDetails
currLqId ActionLogResponseMap
prevLogMap LiveQuerySubscriberDetails
-> ActionLogResponseMap -> IO (Maybe LiveQuerySubscriberDetails)
lqRestarter) -> do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (ActionLogResponseMap
prevLogMap ActionLogResponseMap -> ActionLogResponseMap -> Bool
forall a. Eq a => a -> a -> Bool
== ActionLogResponseMap
actionLogMap) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Maybe LiveQuerySubscriberDetails
maybeNewLqId <- LiveQuerySubscriberDetails
-> ActionLogResponseMap -> IO (Maybe LiveQuerySubscriberDetails)
lqRestarter LiveQuerySubscriberDetails
currLqId ActionLogResponseMap
actionLogMap
if Bool
actionsComplete
then AsyncActionSubscriptionState -> OperationId -> IO ()
removeAsyncActionLiveQuery AsyncActionSubscriptionState
subsState OperationId
opId
else do
case Maybe LiveQuerySubscriberDetails
maybeNewLqId of
Maybe LiveQuerySubscriberDetails
Nothing ->
AsyncActionSubscriptionState -> OperationId -> IO ()
removeAsyncActionLiveQuery AsyncActionSubscriptionState
subsState OperationId
opId
Just LiveQuerySubscriberDetails
newLqId ->
AsyncActionSubscriptionState
-> OperationId
-> NonEmpty ActionId
-> (QErr -> IO ())
-> LiveAsyncActionQuery
-> IO ()
addAsyncActionLiveQuery AsyncActionSubscriptionState
subsState OperationId
opId NonEmpty ActionId
actionIds QErr -> IO ()
onError
(LiveAsyncActionQuery -> IO ()) -> LiveAsyncActionQuery -> IO ()
forall a b. (a -> b) -> a -> b
$ LiveAsyncActionQueryOnSource -> LiveAsyncActionQuery
LAAQOnSourceDB
(LiveAsyncActionQueryOnSource -> LiveAsyncActionQuery)
-> LiveAsyncActionQueryOnSource -> LiveAsyncActionQuery
forall a b. (a -> b) -> a -> b
$ LiveQuerySubscriberDetails
-> ActionLogResponseMap
-> (LiveQuerySubscriberDetails
-> ActionLogResponseMap -> IO (Maybe LiveQuerySubscriberDetails))
-> LiveAsyncActionQueryOnSource
LiveAsyncActionQueryOnSource LiveQuerySubscriberDetails
newLqId ActionLogResponseMap
actionLogMap LiveQuerySubscriberDetails
-> ActionLogResponseMap -> IO (Maybe LiveQuerySubscriberDetails)
lqRestarter
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ DiffTime -> IO ()
C.sleep (DiffTime -> IO ()) -> DiffTime -> IO ()
forall a b. (a -> b) -> a -> b
$ Seconds -> DiffTime
seconds Seconds
1