Skip to content

Commit

Permalink
first attempt to implement rescheduleJob safely
Browse files Browse the repository at this point in the history
  • Loading branch information
saurabhnanda committed Oct 11, 2024
1 parent a159d7a commit 3cff0cd
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 2 deletions.
55 changes: 55 additions & 0 deletions src/OddJobs/Job.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module OddJobs.Job
-- $createJobs
, createJob
, scheduleJob
, rescheduleJob

, createJobWithResources
, scheduleJobWithResources
Expand All @@ -39,6 +40,7 @@ module OddJobs.Job
, ResourceId(..)
, FunctionName
, RunnerEnv(..)
, RescheduleError(..)

-- ** Structured logging
--
Expand Down Expand Up @@ -781,6 +783,59 @@ scheduleJob conn tname payload runAt = do
[r] -> pure r
_ -> Prelude.error . (<> "Not expecting multiple rows when creating a single job. Query=") <$> queryFormatter

-- | Fetch a single job by its 'JobId' using 'fetchJobByIdForUpdateSql'
-- (a @SELECT ... FOR UPDATE@ query).
--
-- Returns 'Nothing' when no row matches and @'Just' job@ when exactly one row
-- matches. More than one matching row is treated as an invariant violation and
-- raises an error (carrying the formatted query) from within the 'IO' action.
--
-- NOTE(review): the row lock taken by @FOR UPDATE@ is only useful if the
-- caller runs this inside a transaction (as 'rescheduleJob' does).
fetchJobByIdForUpdate :: Connection -> TableName -> JobId -> IO (Maybe Job)
fetchJobByIdForUpdate conn tname jid = do
  let args = (tname, jid)
      queryFormatter = toS <$> PGS.formatQuery conn fetchJobByIdForUpdateSql args
  PGS.query conn fetchJobByIdForUpdateSql args >>= \case
    [] -> pure Nothing
    [j] -> pure $ Just j
    -- Bind (rather than fmap) so the error is raised while this action runs,
    -- instead of being returned as a lazy bottom; the message comes first and
    -- the formatted query is appended after "Query=".
    _ -> queryFormatter >>= Prelude.error . ("Not expecting multiple rows when querying a job by Job ID. Query=" <>)

-- | Reschedule a job (among other things)
--
-- This function can be used to __safely__ change the following three values of a job:
--
--   * Job's status (so, you can use it to prematurely cancel a job)
--   * Job's attempts (so, you can use it to reduce the number of attempts, thus effectively
--     /increasing/ the number of times the job will be attempted)
--   * Job's run-at (so, you can use it to /prepone/ or /postpone/ a job, especially useful
--     if your job is a result of some user-action, and you want to implement a /debounce/
--     logic)
--
-- In case the job is currently locked (its 'jobLockedAt' is set), or not found,
-- this returns a 'RescheduleError' instead.
--
-- The whole operation runs inside a single DB transaction. If the @UPDATE@
-- unexpectedly affects zero or multiple rows, an error is raised from within
-- that transaction (so the exception propagates out of 'withTransaction'
-- instead of a lazily-failing value being returned after the transaction has
-- finished).
rescheduleJob :: Connection
              -- ^ DB connection to use. __Note:__ This should /ideally/ come out of your
              -- application's DB pool, not the 'cfgDbPool' you used in the job-runner.
              -> TableName
              -- ^ DB table which holds your jobs
              -> JobId
              -- ^ the JobId which you want to reschedule
              -> (Job -> IO (Status, Int, UTCTime))
              -- ^ a "rescheduling function" which will be passed the Job and will need to
              -- return a 3-tuple of @(newStatus, newAttempts, newRunAt)@
              --
              -- __Note:__ This rescheduling function is in the @IO@ monad to allow you to
              -- do interesting things, but please be __careful__; while the rescheduling
              -- function is executing, a DB transaction is being kept open with this particular
              -- Job in a LOCKED state (via @SELECT FOR UPDATE@)
              -> IO (Either RescheduleError Job)
              -- ^ Either the updated job is returned or a 'RescheduleError'
rescheduleJob conn tname jid reschedulingFn =
  withTransaction conn $
    fetchJobByIdForUpdate conn tname jid >>= \case
      Nothing -> pure $ Left RescheduleJobNotFound
      Just j -> case jobLockedAt j of
        -- A non-empty locked_at is reported as "locked" and the job is left untouched.
        Just _ -> pure $ Left RescheduleJobLocked
        Nothing -> do
          (newStatus, newAttempts, newRunAt) <- reschedulingFn j
          let args = (tname, newStatus, newAttempts, newRunAt, jid)
              queryFormatter = toS <$> PGS.formatQuery conn rescheduleJobSql args
          PGS.query conn rescheduleJobSql args >>= \case
            [newjob] -> pure $ Right newjob
            -- Both failure branches bind (rather than fmap) the formatted query so
            -- the error is raised here, inside the transaction, with the message
            -- first and the query text after "Query=".
            [] -> queryFormatter >>= Prelude.error . ("Not expecting zero rows when updating a LOCKED job by Job ID. Query=" <>)
            _ -> queryFormatter >>= Prelude.error . ("Not expecting multiple rows when updating a LOCKED job by Job ID. Query=" <>)

-- | A list of 'ResourceId's, each paired with an 'Int'.
--
-- NOTE(review): the 'Int' is presumably the amount of that resource a job
-- uses — confirm against 'createJobWithResources'.
type ResourceList = [(ResourceId, Int)]

createJobWithResources
Expand Down
13 changes: 11 additions & 2 deletions src/OddJobs/Job/Query.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ module OddJobs.Job.Query
, registerResourceUsage
, concatJobDbColumns
, jobDbColumns
, rescheduleJobSql
, fetchJobByIdForUpdateSql
)
where

Expand All @@ -23,7 +25,7 @@ jobPollingSql :: Query
-- Polling query: updates status, locked_at and locked_by and increments
-- attempts on the single row chosen by the inner SELECT, returning that row's
-- id. The inner SELECT considers rows with run_at <= ? whose status is in the
-- given set, or whose status equals the given value with locked_at older than
-- the given timestamp; it orders by attempts then run_at, takes one row and
-- locks it with FOR UPDATE.
--
-- NOTE(review): the two consecutive ORDER BY lines below look like the
-- before/after sides of a diff (without and with SKIP LOCKED); presumably only
-- the SKIP LOCKED variant is meant to remain — confirm.
jobPollingSql =
"update ? set status = ?, locked_at = ?, locked_by = ?, attempts=attempts+1 \
\ WHERE id in (select id from ? where (run_at<=? AND ((status in ?) OR (status = ? and locked_at<?))) \
\ ORDER BY attempts ASC, run_at ASC LIMIT 1 FOR UPDATE) RETURNING id"
\ ORDER BY attempts ASC, run_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED) RETURNING id"

jobPollingWithResourceSql :: Query
jobPollingWithResourceSql =
Expand All @@ -46,7 +48,6 @@ qWithResources =
"UPDATE ? SET status=?, locked_at=now(), locked_by=?, attempts=attempts+1 \
\ WHERE id=? AND status in ? AND ?(id) RETURNING id"


-- | @INSERT@ statement that creates one job row and returns every column
-- listed in 'concatJobDbColumns'.
--
-- Placeholders, in order: table name, then the values for @run_at@, @status@,
-- @payload@, @last_error@, @attempts@, @locked_at@ and @locked_by@.
createJobQuery :: Query
createJobQuery =
  mconcat
    [ "INSERT INTO ? (run_at, status, payload, last_error, attempts, locked_at, locked_by) VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING "
    , concatJobDbColumns
    ]

Expand All @@ -68,6 +69,14 @@ concatJobDbColumns = concatJobDbColumns_ jobDbColumns ""
concatJobDbColumns_ [col] x = x <> col
concatJobDbColumns_ (col:cols) x = concatJobDbColumns_ cols (x <> col <> ", ")

-- | Ref: 'rescheduleJob'
--
-- Selects the columns listed in 'concatJobDbColumns' for the row with the
-- given id, using @FOR UPDATE@. Placeholders, in order: table name, job id.
fetchJobByIdForUpdateSql :: Query
fetchJobByIdForUpdateSql =
  mconcat
    [ "select "
    , concatJobDbColumns
    , " from ? where id = ? for update"
    ]

-- | Ref: 'rescheduleJob'
--
-- Updates @status@, @attempts@ and @run_at@ of the row with the given id and
-- returns the columns listed in 'concatJobDbColumns'. Placeholders, in order:
-- table name, new status, new attempts, new run_at, job id.
rescheduleJobSql :: Query
rescheduleJobSql =
  mconcat
    [ "update ? set status = ?, attempts = ?, run_at = ? where id = ? returning "
    , concatJobDbColumns
    ]

-- | If you are writing SQL queries where you want to return ALL columns from
-- the jobs table it is __recommended__ that you do not issue a @SELECT *@ or
-- @RETURNING *@. List out specific DB columns using 'jobDbColumns' and
Expand Down
2 changes: 2 additions & 0 deletions src/OddJobs/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ data FailureMode
-- by 'JobErrHandler' and 'cfgOnJobFailed'.
data JobErrHandler = forall a e . (Exception e) => JobErrHandler (e -> Job -> FailureMode -> IO a)

-- | Reasons for which @rescheduleJob@ declines to modify a job.
data RescheduleError
  = RescheduleJobNotFound
  -- ^ No job with the given id was found in the jobs table.
  | RescheduleJobLocked
  -- ^ The job was found, but its @locked_at@ is currently set.
  deriving (Eq, Show)

-- | Alias for 'PGS.Identifier'.
--
-- NOTE(review): presumably used to refer to a DB function by name — confirm
-- against the call-sites.
type FunctionName = PGS.Identifier

data ResourceCfg = ResourceCfg
Expand Down

0 comments on commit 3cff0cd

Please sign in to comment.