{-# OPTIONS_GHC -fno-warn-orphans #-}
module Hasura.Eventing.Common
( LockedEventsCtx (..),
saveLockedEvents,
removeEventFromLockedEvents,
generateScheduleTimes,
cleanupSchedulesToBeGenerated,
deleteEventTriggerLogsInBatchesWith,
)
where
import Control.Arrow.Extended
import Control.Concurrent.STM.TVar
import Control.Monad.STM
import Data.List (unfoldr)
import Data.Set qualified as Set
import Data.Time
import Hasura.Base.Error (QErr)
import Hasura.Prelude
import Hasura.RQL.Types.Action (LockedActionEventId)
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.Eventing (EventId)
import Hasura.RQL.Types.ScheduledTrigger (CronEventId, OneOffScheduledEventId)
import System.Cron
data LockedEventsCtx = LockedEventsCtx
{ LockedEventsCtx -> TVar (Set CronEventId)
leCronEvents :: TVar (Set.Set CronEventId),
LockedEventsCtx -> TVar (Set CronEventId)
leOneOffEvents :: TVar (Set.Set OneOffScheduledEventId),
LockedEventsCtx -> TVar (HashMap SourceName (Set CronEventId))
leEvents :: TVar (HashMap SourceName (Set.Set EventId)),
LockedEventsCtx -> TVar (Set CronEventId)
leActionEvents :: TVar (Set.Set LockedActionEventId)
}
saveLockedEvents :: (MonadIO m) => [EventId] -> TVar (Set.Set EventId) -> m ()
saveLockedEvents :: forall (m :: * -> *).
MonadIO m =>
[CronEventId] -> TVar (Set CronEventId) -> m ()
saveLockedEvents [CronEventId]
eventIds TVar (Set CronEventId)
lockedEvents =
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically
(STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Set CronEventId
lockedEventsVals <- TVar (Set CronEventId) -> STM (Set CronEventId)
forall a. TVar a -> STM a
readTVar TVar (Set CronEventId)
lockedEvents
TVar (Set CronEventId) -> Set CronEventId -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Set CronEventId)
lockedEvents
(Set CronEventId -> STM ()) -> Set CronEventId -> STM ()
forall a b. (a -> b) -> a -> b
$! Set CronEventId -> Set CronEventId -> Set CronEventId
forall a. Ord a => Set a -> Set a -> Set a
Set.union Set CronEventId
lockedEventsVals
(Set CronEventId -> Set CronEventId)
-> Set CronEventId -> Set CronEventId
forall a b. (a -> b) -> a -> b
$ [CronEventId] -> Set CronEventId
forall a. Ord a => [a] -> Set a
Set.fromList [CronEventId]
eventIds
removeEventFromLockedEvents ::
(MonadIO m) => EventId -> TVar (Set.Set EventId) -> m ()
removeEventFromLockedEvents :: forall (m :: * -> *).
MonadIO m =>
CronEventId -> TVar (Set CronEventId) -> m ()
removeEventFromLockedEvents CronEventId
eventId TVar (Set CronEventId)
lockedEvents =
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically
(STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Set CronEventId
lockedEventsVals <- TVar (Set CronEventId) -> STM (Set CronEventId)
forall a. TVar a -> STM a
readTVar TVar (Set CronEventId)
lockedEvents
TVar (Set CronEventId) -> Set CronEventId -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Set CronEventId)
lockedEvents (Set CronEventId -> STM ()) -> Set CronEventId -> STM ()
forall a b. (a -> b) -> a -> b
$! CronEventId -> Set CronEventId -> Set CronEventId
forall a. Ord a => a -> Set a -> Set a
Set.delete CronEventId
eventId Set CronEventId
lockedEventsVals
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes UTCTime
from Int
n CronSchedule
cron = Int -> [UTCTime] -> [UTCTime]
forall a. Int -> [a] -> [a]
take Int
n ([UTCTime] -> [UTCTime]) -> [UTCTime] -> [UTCTime]
forall a b. (a -> b) -> a -> b
$ UTCTime -> [UTCTime]
go UTCTime
from
where
go :: UTCTime -> [UTCTime]
go = (UTCTime -> Maybe (UTCTime, UTCTime)) -> UTCTime -> [UTCTime]
forall b a. (b -> Maybe (a, b)) -> b -> [a]
unfoldr ((UTCTime -> (UTCTime, UTCTime))
-> Maybe UTCTime -> Maybe (UTCTime, UTCTime)
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap UTCTime -> (UTCTime, UTCTime)
forall (arr :: * -> * -> *) a. Arrow arr => arr a (a, a)
dup (Maybe UTCTime -> Maybe (UTCTime, UTCTime))
-> (UTCTime -> Maybe UTCTime)
-> UTCTime
-> Maybe (UTCTime, UTCTime)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CronSchedule -> UTCTime -> Maybe UTCTime
nextMatch CronSchedule
cron)
cleanupSchedulesToBeGenerated :: Int
cleanupSchedulesToBeGenerated :: Int
cleanupSchedulesToBeGenerated = Int
50
deleteEventTriggerLogsInBatchesWith ::
(MonadIO m, MonadError QErr m) =>
IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)) ->
TriggerLogCleanupConfig ->
(TriggerLogCleanupConfig -> IO (Either QErr DeletedEventLogStats)) ->
m DeletedEventLogStats
deleteEventTriggerLogsInBatchesWith :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
-> TriggerLogCleanupConfig
-> (TriggerLogCleanupConfig
-> IO (Either QErr DeletedEventLogStats))
-> m DeletedEventLogStats
deleteEventTriggerLogsInBatchesWith IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
getLatestCleanupConfig TriggerLogCleanupConfig
oldCleanupConfig TriggerLogCleanupConfig -> IO (Either QErr DeletedEventLogStats)
dbLogDeleteAction = do
Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)
latestCleanupConfig <- IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
-> m (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
getLatestCleanupConfig
case Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)
latestCleanupConfig of
Just (TriggerLogCleanupConfig
_, EventTriggerCleanupStatus
ETCSPaused) -> DeletedEventLogStats -> m DeletedEventLogStats
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> Int -> DeletedEventLogStats
DeletedEventLogStats Int
0 Int
0)
Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)
nonPausedNewConfig -> do
let cleanupConfig :: TriggerLogCleanupConfig
cleanupConfig = TriggerLogCleanupConfig
-> ((TriggerLogCleanupConfig, EventTriggerCleanupStatus)
-> TriggerLogCleanupConfig)
-> Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)
-> TriggerLogCleanupConfig
forall b a. b -> (a -> b) -> Maybe a -> b
maybe TriggerLogCleanupConfig
oldCleanupConfig (TriggerLogCleanupConfig, EventTriggerCleanupStatus)
-> TriggerLogCleanupConfig
forall a b. (a, b) -> a
fst Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)
nonPausedNewConfig
deletedStatsForCurrentBatch :: DeletedEventLogStats
deletedStatsForCurrentBatch@(DeletedEventLogStats Int
delEventLogsInBatch Int
delInvocationLogsInBatch) <-
m (Either QErr DeletedEventLogStats) -> m DeletedEventLogStats
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr DeletedEventLogStats) -> m DeletedEventLogStats)
-> m (Either QErr DeletedEventLogStats) -> m DeletedEventLogStats
forall a b. (a -> b) -> a -> b
$ IO (Either QErr DeletedEventLogStats)
-> m (Either QErr DeletedEventLogStats)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr DeletedEventLogStats)
-> m (Either QErr DeletedEventLogStats))
-> IO (Either QErr DeletedEventLogStats)
-> m (Either QErr DeletedEventLogStats)
forall a b. (a -> b) -> a -> b
$ TriggerLogCleanupConfig -> IO (Either QErr DeletedEventLogStats)
dbLogDeleteAction TriggerLogCleanupConfig
cleanupConfig
if DeletedEventLogStats
deletedStatsForCurrentBatch DeletedEventLogStats -> DeletedEventLogStats -> Bool
forall a. Eq a => a -> a -> Bool
== (Int -> Int -> DeletedEventLogStats
DeletedEventLogStats Int
0 Int
0)
then DeletedEventLogStats -> m DeletedEventLogStats
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure DeletedEventLogStats
deletedStatsForCurrentBatch
else do
(DeletedEventLogStats Int
deletedRemainingEventLogs Int
deletedRemainingInvocationLogs) <-
IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
-> TriggerLogCleanupConfig
-> (TriggerLogCleanupConfig
-> IO (Either QErr DeletedEventLogStats))
-> m DeletedEventLogStats
forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
-> TriggerLogCleanupConfig
-> (TriggerLogCleanupConfig
-> IO (Either QErr DeletedEventLogStats))
-> m DeletedEventLogStats
deleteEventTriggerLogsInBatchesWith IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
getLatestCleanupConfig TriggerLogCleanupConfig
cleanupConfig TriggerLogCleanupConfig -> IO (Either QErr DeletedEventLogStats)
dbLogDeleteAction
DeletedEventLogStats -> m DeletedEventLogStats
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> Int -> DeletedEventLogStats
DeletedEventLogStats (Int
delEventLogsInBatch Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
deletedRemainingEventLogs) (Int
delInvocationLogsInBatch Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
deletedRemainingInvocationLogs))