{-# OPTIONS_GHC -fno-warn-orphans #-}

module Hasura.Eventing.Common
  ( LockedEventsCtx (..),
    saveLockedEvents,
    removeEventFromLockedEvents,
    generateScheduleTimes,
    cleanupSchedulesToBeGenerated,
    deleteEventTriggerLogsInBatchesWith,
  )
where

import Control.Arrow.Extended
import Control.Concurrent.STM.TVar
import Control.Monad.STM
import Data.List (unfoldr)
import Data.Set qualified as Set
import Data.Time
import Hasura.Base.Error (QErr)
import Hasura.Prelude
import Hasura.RQL.Types.Action (LockedActionEventId)
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.Eventing (EventId)
import Hasura.RQL.Types.ScheduledTrigger (CronEventId, OneOffScheduledEventId)
import System.Cron

data LockedEventsCtx = LockedEventsCtx
  { LockedEventsCtx -> TVar (Set CronEventId)
leCronEvents :: TVar (Set.Set CronEventId),
    LockedEventsCtx -> TVar (Set CronEventId)
leOneOffEvents :: TVar (Set.Set OneOffScheduledEventId),
    LockedEventsCtx -> TVar (HashMap SourceName (Set CronEventId))
leEvents :: TVar (HashMap SourceName (Set.Set EventId)),
    LockedEventsCtx -> TVar (Set CronEventId)
leActionEvents :: TVar (Set.Set LockedActionEventId)
  }

-- | After the events are fetched from the DB, we store the locked events
--   in a hash set(order doesn't matter and look ups are faster) in the
--   event engine context
saveLockedEvents :: (MonadIO m) => [EventId] -> TVar (Set.Set EventId) -> m ()
saveLockedEvents :: forall (m :: * -> *).
MonadIO m =>
[CronEventId] -> TVar (Set CronEventId) -> m ()
saveLockedEvents [CronEventId]
eventIds TVar (Set CronEventId)
lockedEvents =
  IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically
    (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      Set CronEventId
lockedEventsVals <- TVar (Set CronEventId) -> STM (Set CronEventId)
forall a. TVar a -> STM a
readTVar TVar (Set CronEventId)
lockedEvents
      TVar (Set CronEventId) -> Set CronEventId -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Set CronEventId)
lockedEvents
        (Set CronEventId -> STM ()) -> Set CronEventId -> STM ()
forall a b. (a -> b) -> a -> b
$! Set CronEventId -> Set CronEventId -> Set CronEventId
forall a. Ord a => Set a -> Set a -> Set a
Set.union Set CronEventId
lockedEventsVals
        (Set CronEventId -> Set CronEventId)
-> Set CronEventId -> Set CronEventId
forall a b. (a -> b) -> a -> b
$ [CronEventId] -> Set CronEventId
forall a. Ord a => [a] -> Set a
Set.fromList [CronEventId]
eventIds

-- | Remove an event from the 'LockedEventsCtx' after it has been processed
removeEventFromLockedEvents ::
  (MonadIO m) => EventId -> TVar (Set.Set EventId) -> m ()
removeEventFromLockedEvents :: forall (m :: * -> *).
MonadIO m =>
CronEventId -> TVar (Set CronEventId) -> m ()
removeEventFromLockedEvents CronEventId
eventId TVar (Set CronEventId)
lockedEvents =
  IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically
    (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      Set CronEventId
lockedEventsVals <- TVar (Set CronEventId) -> STM (Set CronEventId)
forall a. TVar a -> STM a
readTVar TVar (Set CronEventId)
lockedEvents
      TVar (Set CronEventId) -> Set CronEventId -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Set CronEventId)
lockedEvents (Set CronEventId -> STM ()) -> Set CronEventId -> STM ()
forall a b. (a -> b) -> a -> b
$! CronEventId -> Set CronEventId -> Set CronEventId
forall a. Ord a => a -> Set a -> Set a
Set.delete CronEventId
eventId Set CronEventId
lockedEventsVals

-- | Generates next @n events starting @from according to 'CronSchedule'
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes UTCTime
from Int
n CronSchedule
cron = Int -> [UTCTime] -> [UTCTime]
forall a. Int -> [a] -> [a]
take Int
n ([UTCTime] -> [UTCTime]) -> [UTCTime] -> [UTCTime]
forall a b. (a -> b) -> a -> b
$ UTCTime -> [UTCTime]
go UTCTime
from
  where
    go :: UTCTime -> [UTCTime]
go = (UTCTime -> Maybe (UTCTime, UTCTime)) -> UTCTime -> [UTCTime]
forall b a. (b -> Maybe (a, b)) -> b -> [a]
unfoldr ((UTCTime -> (UTCTime, UTCTime))
-> Maybe UTCTime -> Maybe (UTCTime, UTCTime)
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap UTCTime -> (UTCTime, UTCTime)
forall (arr :: * -> * -> *) a. Arrow arr => arr a (a, a)
dup (Maybe UTCTime -> Maybe (UTCTime, UTCTime))
-> (UTCTime -> Maybe UTCTime)
-> UTCTime
-> Maybe (UTCTime, UTCTime)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CronSchedule -> UTCTime -> Maybe UTCTime
nextMatch CronSchedule
cron)

-- | number of cleanup schedules to be generated in one iteration
cleanupSchedulesToBeGenerated :: Int
cleanupSchedulesToBeGenerated :: Int
cleanupSchedulesToBeGenerated = Int
50

deleteEventTriggerLogsInBatchesWith ::
  (MonadIO m, MonadError QErr m) =>
  IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)) ->
  TriggerLogCleanupConfig ->
  (TriggerLogCleanupConfig -> IO (Either QErr DeletedEventLogStats)) ->
  m DeletedEventLogStats
deleteEventTriggerLogsInBatchesWith :: forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
-> TriggerLogCleanupConfig
-> (TriggerLogCleanupConfig
    -> IO (Either QErr DeletedEventLogStats))
-> m DeletedEventLogStats
deleteEventTriggerLogsInBatchesWith IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
getLatestCleanupConfig TriggerLogCleanupConfig
oldCleanupConfig TriggerLogCleanupConfig -> IO (Either QErr DeletedEventLogStats)
dbLogDeleteAction = do
  -- fetch the latest cleanup config from the schema cache
  Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)
latestCleanupConfig <- IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
-> m (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
getLatestCleanupConfig
  case Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)
latestCleanupConfig of
    -- if the cleanup has been paused, then don't delete anything
    Just (TriggerLogCleanupConfig
_, EventTriggerCleanupStatus
ETCSPaused) -> DeletedEventLogStats -> m DeletedEventLogStats
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> Int -> DeletedEventLogStats
DeletedEventLogStats Int
0 Int
0)
    Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)
nonPausedNewConfig -> do
      -- get latest cleanup config if available, else use the older one
      let cleanupConfig :: TriggerLogCleanupConfig
cleanupConfig = TriggerLogCleanupConfig
-> ((TriggerLogCleanupConfig, EventTriggerCleanupStatus)
    -> TriggerLogCleanupConfig)
-> Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)
-> TriggerLogCleanupConfig
forall b a. b -> (a -> b) -> Maybe a -> b
maybe TriggerLogCleanupConfig
oldCleanupConfig (TriggerLogCleanupConfig, EventTriggerCleanupStatus)
-> TriggerLogCleanupConfig
forall a b. (a, b) -> a
fst Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)
nonPausedNewConfig
      -- delete one batch of the logs
      deletedStatsForCurrentBatch :: DeletedEventLogStats
deletedStatsForCurrentBatch@(DeletedEventLogStats Int
delEventLogsInBatch Int
delInvocationLogsInBatch) <-
        m (Either QErr DeletedEventLogStats) -> m DeletedEventLogStats
forall e (m :: * -> *) a. MonadError e m => m (Either e a) -> m a
liftEitherM (m (Either QErr DeletedEventLogStats) -> m DeletedEventLogStats)
-> m (Either QErr DeletedEventLogStats) -> m DeletedEventLogStats
forall a b. (a -> b) -> a -> b
$ IO (Either QErr DeletedEventLogStats)
-> m (Either QErr DeletedEventLogStats)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either QErr DeletedEventLogStats)
 -> m (Either QErr DeletedEventLogStats))
-> IO (Either QErr DeletedEventLogStats)
-> m (Either QErr DeletedEventLogStats)
forall a b. (a -> b) -> a -> b
$ TriggerLogCleanupConfig -> IO (Either QErr DeletedEventLogStats)
dbLogDeleteAction TriggerLogCleanupConfig
cleanupConfig
      -- If no logs has been deleted, then end the recursion
      if DeletedEventLogStats
deletedStatsForCurrentBatch DeletedEventLogStats -> DeletedEventLogStats -> Bool
forall a. Eq a => a -> a -> Bool
== (Int -> Int -> DeletedEventLogStats
DeletedEventLogStats Int
0 Int
0)
        then DeletedEventLogStats -> m DeletedEventLogStats
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure DeletedEventLogStats
deletedStatsForCurrentBatch
        else do
          -- if non zero logs has been deleted then do a recursion
          (DeletedEventLogStats Int
deletedRemainingEventLogs Int
deletedRemainingInvocationLogs) <-
            IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
-> TriggerLogCleanupConfig
-> (TriggerLogCleanupConfig
    -> IO (Either QErr DeletedEventLogStats))
-> m DeletedEventLogStats
forall (m :: * -> *).
(MonadIO m, MonadError QErr m) =>
IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
-> TriggerLogCleanupConfig
-> (TriggerLogCleanupConfig
    -> IO (Either QErr DeletedEventLogStats))
-> m DeletedEventLogStats
deleteEventTriggerLogsInBatchesWith IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus))
getLatestCleanupConfig TriggerLogCleanupConfig
cleanupConfig TriggerLogCleanupConfig -> IO (Either QErr DeletedEventLogStats)
dbLogDeleteAction
          -- Finally collect all the statistics
          DeletedEventLogStats -> m DeletedEventLogStats
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> Int -> DeletedEventLogStats
DeletedEventLogStats (Int
delEventLogsInBatch Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
deletedRemainingEventLogs) (Int
delInvocationLogsInBatch Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
deletedRemainingInvocationLogs))