Skip to content

Commit

Permalink
Merge pull request #22 from madflojo/singleinstance
Browse files Browse the repository at this point in the history
Single Instance Tasks
  • Loading branch information
madflojo authored Mar 16, 2024
2 parents 2edc60e + d16104a commit e219a13
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 33 deletions.
11 changes: 6 additions & 5 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: 1.18

- name: Install cover
run: go get golang.org/x/tools/cmd/cover
Expand All @@ -30,7 +30,8 @@ jobs:
- name: Test
run: go test --race -v -covermode=atomic -coverprofile=coverage.out ./...

- name: Update Coveralls
env:
COVERALLS_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: goveralls -coverprofile=coverage.out -service=github
- name: Upload coverage reports to Codecov
uses: codecov/[email protected]
with:
token: ${{ secrets.CODECOV_TOKEN }}
slug: madflojo/tasks
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Tasks

[![Coverage Status](https://coveralls.io/repos/github/madflojo/tasks/badge.svg?branch=main)](https://coveralls.io/github/madflojo/tasks?branch=main)
[![codecov](https://codecov.io/gh/madflojo/tasks/graph/badge.svg?token=882QTXA7PX)](https://codecov.io/gh/madflojo/tasks)
[![Go Report Card](https://goreportcard.com/badge/github.com/madflojo/tasks)](https://goreportcard.com/report/github.com/madflojo/tasks)
[![PkgGoDev](https://pkg.go.dev/badge/github.com/madflojo/tasks)](https://pkg.go.dev/github.com/madflojo/tasks)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module github.com/madflojo/tasks

go 1.17
go 1.18

require github.com/rs/xid v1.5.0
33 changes: 29 additions & 4 deletions tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,7 @@ import (
// Task contains the scheduled task details and control mechanisms. This struct is used during the creation of tasks.
// It allows users to control how and when tasks are executed.
type Task struct {
sync.Mutex

// id is the Unique ID created for each task. This ID is generated by the Add() function.
id string
sync.RWMutex

// TaskContext allows for user-defined context that is passed to task functions.
TaskContext TaskContext
Expand All @@ -120,6 +117,14 @@ type Task struct {
// the task self deleting.
RunOnce bool

// RunSingleInstance is used to set a task as a single instance task. By default, tasks will continue executing at
// the interval specified until deleted. With RunSingleInstance enabled a subsequent task execution will be skipped
// if the previous task execution is still running.
//
// This is useful for tasks that may take longer than the interval to execute. This will prevent multiple instances
// of the same task from running concurrently.
RunSingleInstance bool

// StartAfter is used to specify a start time for the scheduler. When set, tasks will wait for the specified
// time to start the schedule timer.
StartAfter time.Time
Expand Down Expand Up @@ -148,6 +153,12 @@ type Task struct {
// Either ErrFunc or ErrFuncWithTaskContext must be defined. If both are defined, ErrFuncWithTaskContext will be used.
ErrFuncWithTaskContext func(TaskContext, error)

// id is the Unique ID created for each task. This ID is generated by the Add() function.
id string

// running is used for RunSingleInstance tasks to track whether a previous invocation is still running.
running sync.Mutex

// timer is the internal task timer. This is stored here to provide control via main scheduler functions.
timer *time.Timer

Expand Down Expand Up @@ -360,6 +371,15 @@ func (schd *Scheduler) scheduleTask(t *Task) {
// execTask is the underlying scheduler, it is used to trigger and execute tasks.
func (schd *Scheduler) execTask(t *Task) {
go func() {
if t.RunSingleInstance {
if !t.running.TryLock() {
// Skip execution if task is already running
return
}
defer t.running.Unlock()
}

// Execute task
var err error
if t.FuncWithTaskContext != nil {
err = t.FuncWithTaskContext(t.TaskContext)
Expand All @@ -373,10 +393,14 @@ func (schd *Scheduler) execTask(t *Task) {
go t.ErrFunc(err)
}
}

// If RunOnce is set, delete the task after execution
if t.RunOnce {
defer schd.Del(t.id)
}
}()

// Reschedule task for next execution
if !t.RunOnce {
t.safeOps(func() {
t.timer.Reset(t.Interval)
Expand All @@ -402,6 +426,7 @@ func (t *Task) Clone() *Task {
task.Interval = t.Interval
task.StartAfter = t.StartAfter
task.RunOnce = t.RunOnce
task.RunSingleInstance = t.RunSingleInstance
task.id = t.id
task.ctx = t.ctx
task.cancel = t.cancel
Expand Down
2 changes: 1 addition & 1 deletion tasks_benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func BenchmarkTasks(b *testing.B) {
taskID, err := scheduler.Add(&Task{
Interval: time.Duration(1 * time.Minute),
TaskFunc: func() error { return nil },
ErrFunc: func(e error) {},
ErrFunc: func(_ error) {},
})
if err != nil {
b.Fatalf("Unable to schedule example task - %s", err)
Expand Down
Loading

0 comments on commit e219a13

Please sign in to comment.