diff --git a/README.md b/README.md
index 1cebda5..283cdae 100644
--- a/README.md
+++ b/README.md
@@ -82,7 +82,7 @@
     * [Flush tags](#flush-tags)
 * [Tasks](#tasks)
     * [Queues](#queues)
-    * [Runner](#runner)
+    * [Dispatcher](#dispatcher)
 * [Cron](#cron)
 * [Static files](#static-files)
     * [Cache control headers](#cache-control-headers)
@@ -997,77 +997,29 @@ As shown in the previous examples, cache tags were provided because they can be
 
 Tasks are queued operations to be executed in the background, either immediately, at a specfic time, or after a given amount of time has passed. Some examples of tasks could be long-running operations, bulk processing, cleanup, notifications, etc.
 
-Since we're already using [SQLite](https://sqlite.org/) for our database, it's available to act as a persistent store for queued tasks so that tasks are never lost, can be retried until successful, and their concurrent execution can be managed. [Goqite](https://github.com/maragudk/goqite) is the library chosen to interface with [SQLite](https://sqlite.org/) and handle queueing tasks and processing them asynchronously.
+Since we're already using [SQLite](https://sqlite.org/) for our database, it's available to act as a persistent store for queued tasks so that tasks are never lost, can be retried until successful, and their concurrent execution can be managed. [Backlite](https://github.com/mikestefanello/backlite) is the library chosen to interface with [SQLite](https://sqlite.org/) and handle queueing tasks and processing them asynchronously. I wrote that specifically to address the requirements I wanted to satisfy for this project.
 
-To make things even easier, a custom client (`TaskClient`) is provided as a _Service_ on the `Container` which exposes a simple interface with [goqite](https://github.com/maragudk/goqite) that supports type-safe tasks and queues.
+To make things easy, the _Backlite_ client is provided as a _Service_ on the `Container` which allows you to register queues and add tasks.
 
-### Queues
-
-A full example of a queue implementation can be found in `pkg/tasks` with an interactive form to create a task and add to the queue at `/task` (see `pkg/handlers/task.go`).
-
-A queue starts by declaring a `Task` _type_, which is the object that gets placed in to a queue and eventually passed to a queue subscriber (a callback function to process the task). A `Task` must implement the `Name()` method which returns a unique name for the task. For example:
-
-```go
-type MyTask struct {
-	Text string
-	Num int
-}
-
-func (t MyTask) Name() string {
-	return "my_task"
-}
-```
-
-Then, create the queue for `MyTask` tasks:
-
-```go
-q := services.NewQueue[MyTask](func(ctx context.Context, task MyTask) error {
-	// This is where you process the task
-	fmt.Println("Processed %s task!", task.Text)
-	return nil
-})
-```
-
-And finally, register the queue with the `TaskClient`:
+Configuration for the _Backlite_ client is exposed through the app's yaml configuration. The required database schema will be automatically installed when the app starts.
 
-```go
-c.Tasks.Register(q)
-```
+### Queues
 
-See `pkg/tasks/register.go` for a simple way to register all of your queues and to easily pass the `Container` to them so the queue subscriber callbacks have access to all of your app's dependencies.
+A full example of a queue implementation can be found in `pkg/tasks` with an interactive form to create a task and add to the queue at `/task` (see `pkg/handlers/task.go`). Also refer to the [Backlite](https://github.com/mikestefanello/backlite) documentation for reference and examples.
 
-Now you can easily add a task to the queue using the `TaskClient`:
+See `pkg/tasks/register.go` for a simple way to register all of your queues and to easily pass the `Container` to them so the queue processor callbacks have access to all of your app's dependencies.
 
-```go
-task := MyTask{Text: "Hello world!", Num: 10}
+### Dispatcher
 
-err := c.Tasks.
-	New(task).
-	Save()
-```
-
-#### Options
-
-Tasks can be created and queued with various chained options:
+The _task dispatcher_ is what manages the worker pool used for executing tasks in the background. It monitors incoming and scheduled tasks and handles sending them to the pool for execution by the queue's processor callback. This must be started in order for this to happen. In `cmd/web/main.go`, the _task dispatcher_ is automatically started when the app starts via:
 
 ```go
-err := c.Tasks.
-	New(task).
-	Wait(30 * time.Second). // Wait 30 seconds before passing the task to the subscriber
-	At(time.Date(...)). // Wait until a given date before passing the task to the subscriber
-	Tx(tx). // Include the queueing of this task in a database transaction
-	Save()
+c.Tasks.Start(ctx)
 ```
 
-### Runner
-
-The _task runner_ is what manages periodically polling the database for available queued tasks to process and passing them to the queue's subscriber callback. This must be started in order for this to happen. In `cmd/web/main.go`, the _task runner_ is started by using the `TaskClient`:
-
-```go
-go c.Tasks.StartRunner(ctx)
-```
+The app [configuration](#configuration) contains values to configure the client and dispatcher including how many goroutines to use, when to release stuck tasks back into the queue, and how often to cleanup retained tasks in the database.
 
-The app [configuration](#configuration) contains values to configure the runner including how often to poll the database for tasks, the maximum amount of retries for a given task, and the amount of tasks that can be processed concurrently.
+When the app is shutdown, the dispatcher is given 10 seconds to wait for any in-progress tasks to finish execution. This can be changed in `cmd/web/main.go`.
 
 ## Cron
 
@@ -1203,12 +1155,12 @@ Future work includes but is not limited to:
 Thank you to all of the following amazing projects for making this possible.
 
 - [alpinejs](https://github.com/alpinejs/alpine)
+- [backlite](https://github.com/mikestefanello/backlite)
 - [bulma](https://github.com/jgthms/bulma)
 - [echo](https://github.com/labstack/echo)
 - [ent](https://github.com/ent/ent)
 - [go](https://go.dev/)
 - [go-sqlite3](https://github.com/mattn/go-sqlite3)
-- [goqite](https://github.com/maragudk/goqite)
 - [goquery](https://github.com/PuerkitoBio/goquery)
 - [htmx](https://github.com/bigskysoftware/htmx)
 - [jwt](https://github.com/golang-jwt/jwt)
diff --git a/cmd/web/main.go b/cmd/web/main.go
index f07d0c7..3482b2f 100644
--- a/cmd/web/main.go
+++ b/cmd/web/main.go
@@ -9,6 +9,7 @@ import (
 	"net/http"
 	"os"
 	"os/signal"
+	"sync"
 	"time"
 
 	"github.com/mikestefanello/pagoda/pkg/handlers"
@@ -60,18 +61,32 @@ func main() {
 	tasks.Register(c)
 
 	// Start the task runner to execute queued tasks
-	ctx, cancel := context.WithCancel(context.Background())
-	go c.Tasks.StartRunner(ctx)
+	c.Tasks.Start(context.Background())
 
 	// Wait for interrupt signal to gracefully shut down the server with a timeout of 10 seconds.
 	quit := make(chan os.Signal, 1)
 	signal.Notify(quit, os.Interrupt)
 	signal.Notify(quit, os.Kill)
 	<-quit
-	cancel()
-	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
+
+	// Shutdown both the task runner and web server
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
-	if err := c.Web.Shutdown(ctx); err != nil {
-		log.Fatal(err)
-	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+
+	go func() {
+		defer wg.Done()
+		c.Tasks.Stop(ctx)
+	}()
+
+	go func() {
+		defer wg.Done()
+		if err := c.Web.Shutdown(ctx); err != nil {
+			log.Fatal(err)
+		}
+	}()
+
+	wg.Wait()
 }
diff --git a/config/config.go b/config/config.go
index b868259..1a8d7d7 100644
--- a/config/config.go
+++ b/config/config.go
@@ -106,9 +106,9 @@ type (
 
 	// TasksConfig stores the tasks configuration
 	TasksConfig struct {
-		PollInterval time.Duration
-		MaxRetries   int
-		Goroutines   int
+		Goroutines      int
+		ReleaseAfter    time.Duration
+		CleanupInterval time.Duration
 	}
 
 	// MailConfig stores the mail configuration
diff --git a/config/config.yaml b/config/config.yaml
index 8d6c305..e743a09 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -32,9 +32,9 @@ database:
   testConnection: ":memory:?_journal=WAL&_timeout=5000&_fk=true"
 
 tasks:
-  pollInterval: "1s"
-  maxRetries: 10
   goroutines: 1
+  releaseAfter: "15m"
+  cleanupInterval: "1h"
 
 mail:
   hostname: "localhost"
diff --git a/go.mod b/go.mod
index 7e669be..4470f38 100644
--- a/go.mod
+++ b/go.mod
@@ -1,8 +1,6 @@
 module github.com/mikestefanello/pagoda
 
-go 1.22
-
-toolchain go1.22.1
+go 1.22.4
 
 require (
 	entgo.io/ent v0.13.1
@@ -14,9 +12,9 @@ require (
 	github.com/gorilla/sessions v1.2.2
 	github.com/labstack/echo/v4 v4.12.0
 	github.com/labstack/gommon v0.4.2
-	github.com/maragudk/goqite v0.2.3
 	github.com/mattn/go-sqlite3 v1.14.22
 	github.com/maypok86/otter v1.2.1
+	github.com/mikestefanello/backlite v0.1.0
 	github.com/spf13/viper v1.18.2
 	github.com/stretchr/testify v1.9.0
 	golang.org/x/crypto v0.22.0
diff --git a/go.sum b/go.sum
index f3751f8..37a9675 100644
--- a/go.sum
+++ b/go.sum
@@ -81,10 +81,6 @@ github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
 github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
 github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
 github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
-github.com/maragudk/goqite v0.2.3 h1:R8oVD6IMCQfjhCKyGIYwWxR1w8yxjvT/3uwYtA656jE=
-github.com/maragudk/goqite v0.2.3/go.mod h1:5430TCLkycUeLE314c9fifTrTbwcJqJXdU3iyEiF6hM=
-github.com/maragudk/is v0.1.0 h1:obq9anZNmOYcaNbeT0LMyjIexdNeYTw/TLAPD/BnZHA=
-github.com/maragudk/is v0.1.0/go.mod h1:W/r6+TpnISu+a88OLXQy5JQGCOhXQXXLD2e5b4xMn5c=
 github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
 github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
 github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
@@ -94,6 +90,8 @@ github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o
 github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
 github.com/maypok86/otter v1.2.1 h1:xyvMW+t0vE1sKt/++GTkznLitEl7D/msqXkAbLwiC1M=
 github.com/maypok86/otter v1.2.1/go.mod h1:mKLfoI7v1HOmQMwFgX4QkRk23mX6ge3RDvjdHOWG4R4=
+github.com/mikestefanello/backlite v0.1.0 h1:bIiZJXPZB8V5PXWvDmkTepY015w3gJdeRrP3QrEV4Ls=
+github.com/mikestefanello/backlite v0.1.0/go.mod h1:/vj8LPZWG/xqK/3uHaqOtu5JRLDEWqeyJKWTAlADTV0=
 github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
 github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
 github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0=
diff --git a/pkg/handlers/task.go b/pkg/handlers/task.go
index f4abc64..aae9564 100644
--- a/pkg/handlers/task.go
+++ b/pkg/handlers/task.go
@@ -2,6 +2,7 @@ package handlers
 
 import (
 	"fmt"
+	"github.com/mikestefanello/backlite"
 	"github.com/mikestefanello/pagoda/pkg/msg"
 	"time"
 
@@ -21,7 +22,7 @@ const (
 
 type (
 	Task struct {
-		tasks *services.TaskClient
+		tasks *backlite.Client
 		*services.TemplateRenderer
 	}
 
@@ -71,9 +72,10 @@ func (h *Task) Submit(ctx echo.Context) error {
 	}
 
 	// Insert the task
-	err = h.tasks.New(tasks.ExampleTask{
-		Message: input.Message,
-	}).
+	err = h.tasks.
+		Add(tasks.ExampleTask{
+			Message: input.Message,
+		}).
 		Wait(time.Duration(input.Delay) * time.Second).
 		Save()
 
diff --git a/pkg/services/container.go b/pkg/services/container.go
index 798cc63..fd043f9 100644
--- a/pkg/services/container.go
+++ b/pkg/services/container.go
@@ -4,17 +4,18 @@ import (
 	"context"
 	"database/sql"
 	"fmt"
+	"github.com/mikestefanello/backlite"
 	"log/slog"
 	"os"
 	"strings"
 
 	entsql "entgo.io/ent/dialect/sql"
-	_ "github.com/mattn/go-sqlite3"
-	"github.com/mikestefanello/pagoda/pkg/funcmap"
-
 	"github.com/labstack/echo/v4"
+	_ "github.com/mattn/go-sqlite3"
 	"github.com/mikestefanello/pagoda/config"
 	"github.com/mikestefanello/pagoda/ent"
+	"github.com/mikestefanello/pagoda/pkg/funcmap"
+	"github.com/mikestefanello/pagoda/pkg/log"
 
 	// Require by ent
 	_ "github.com/mikestefanello/pagoda/ent/runtime"
@@ -51,7 +52,7 @@ type Container struct {
 	TemplateRenderer *TemplateRenderer
 
 	// Tasks stores the task client
-	Tasks *TaskClient
+	Tasks *backlite.Client
 }
 
 // NewContainer creates and initializes a new Container
@@ -177,10 +178,21 @@ func (c *Container) initTasks() {
 	var err error
 	// You could use a separate database for tasks, if you'd like. but using one
 	// makes transaction support easier
-	c.Tasks, err = NewTaskClient(c.Config.Tasks, c.Database)
+	c.Tasks, err = backlite.NewClient(backlite.ClientConfig{
+		DB:              c.Database,
+		Logger:          log.Default(),
+		NumWorkers:      c.Config.Tasks.Goroutines,
+		ReleaseAfter:    c.Config.Tasks.ReleaseAfter,
+		CleanupInterval: c.Config.Tasks.CleanupInterval,
+	})
+
 	if err != nil {
 		panic(fmt.Sprintf("failed to create task client: %v", err))
 	}
+
+	if err = c.Tasks.Install(); err != nil {
+		panic(fmt.Sprintf("failed to install task schema: %v", err))
+	}
 }
 
 // openDB opens a database connection
diff --git a/pkg/services/tasks.go b/pkg/services/tasks.go
deleted file mode 100644
index e399144..0000000
--- a/pkg/services/tasks.go
+++ /dev/null
@@ -1,204 +0,0 @@
-package services
-
-import (
-	"bytes"
-	"context"
-	"database/sql"
-	"encoding/gob"
-	"strings"
-	"sync"
-	"time"
-
-	"github.com/maragudk/goqite"
-	"github.com/maragudk/goqite/jobs"
-	"github.com/mikestefanello/pagoda/config"
-	"github.com/mikestefanello/pagoda/pkg/log"
-)
-
-type (
-	// TaskClient is that client that allows you to queue or schedule task execution.
-	// Under the hood we create only a single queue using goqite for all tasks because we do not want more than one
-	// runner to process the tasks. The TaskClient wrapper provides abstractions for separate, type-safe queues.
-	TaskClient struct {
-		queue   *goqite.Queue
-		runner  *jobs.Runner
-		buffers sync.Pool
-	}
-
-	// Task is a job that can be added to a queue and later passed to and executed by a QueueSubscriber.
-	// See pkg/tasks for an example of how this can be used with a queue.
-	Task interface {
-		Name() string
-	}
-
-	// TaskSaveOp handles task save operations
-	TaskSaveOp struct {
-		client *TaskClient
-		task   Task
-		tx     *sql.Tx
-		at     *time.Time
-		wait   *time.Duration
-	}
-
-	// Queue is a queue that a Task can be pushed to for execution.
-	// While this can be implemented directly, it's recommended to use NewQueue() which uses generics in
-	// order to provide type-safe queues and queue subscriber callbacks for task execution.
-	Queue interface {
-		// Name returns the name of the task this queue processes
-		Name() string
-
-		// Receive receives the Task payload to be processed
-		Receive(ctx context.Context, payload []byte) error
-	}
-
-	// queue provides a type-safe implementation of Queue
-	queue[T Task] struct {
-		name       string
-		subscriber QueueSubscriber[T]
-	}
-
-	// QueueSubscriber is a generic subscriber callback for a given queue to process Tasks
-	QueueSubscriber[T Task] func(context.Context, T) error
-)
-
-// NewTaskClient creates a new task client
-func NewTaskClient(cfg config.TasksConfig, db *sql.DB) (*TaskClient, error) {
-	// Install the schema
-	if err := goqite.Setup(context.Background(), db); err != nil {
-		// An error is returned if we already ran this and there's no better way to check.
-		// You can and probably should handle this via migrations
-		if !strings.Contains(err.Error(), "already exists") {
-			return nil, err
-		}
-	}
-
-	t := &TaskClient{
-		queue: goqite.New(goqite.NewOpts{
-			DB:         db,
-			Name:       "tasks",
-			MaxReceive: cfg.MaxRetries,
-		}),
-		buffers: sync.Pool{
-			New: func() interface{} {
-				return bytes.NewBuffer(nil)
-			},
-		},
-	}
-
-	t.runner = jobs.NewRunner(jobs.NewRunnerOpts{
-		Limit:        cfg.Goroutines,
-		Log:          log.Default(),
-		PollInterval: cfg.PollInterval,
-		Queue:        t.queue,
-	})
-
-	return t, nil
-}
-
-// StartRunner starts the scheduler service which adds scheduled tasks to the queue.
-// This must be running in order to execute queued tasked.
-// To stop the runner, cancel the context.
-// This is a blocking call.
-func (t *TaskClient) StartRunner(ctx context.Context) {
-	t.runner.Start(ctx)
-}
-
-// Register registers a queue so tasks can be added to it and processed
-func (t *TaskClient) Register(queue Queue) {
-	t.runner.Register(queue.Name(), queue.Receive)
-}
-
-// New starts a task save operation
-func (t *TaskClient) New(task Task) *TaskSaveOp {
-	return &TaskSaveOp{
-		client: t,
-		task:   task,
-	}
-}
-
-// At sets the exact date and time the task should be executed
-func (t *TaskSaveOp) At(processAt time.Time) *TaskSaveOp {
-	t.Wait(time.Until(processAt))
-	return t
-}
-
-// Wait instructs the task to wait a given duration before it is executed
-func (t *TaskSaveOp) Wait(duration time.Duration) *TaskSaveOp {
-	t.wait = &duration
-	return t
-}
-
-// Tx will include the task as part of a given database transaction
-func (t *TaskSaveOp) Tx(tx *sql.Tx) *TaskSaveOp {
-	t.tx = tx
-	return t
-}
-
-// Save saves the task, so it can be queued for execution
-func (t *TaskSaveOp) Save() error {
-	type message struct {
-		Name    string
-		Message []byte
-	}
-
-	// Encode the task
-	taskBuf := t.client.buffers.Get().(*bytes.Buffer)
-	if err := gob.NewEncoder(taskBuf).Encode(t.task); err != nil {
-		return err
-	}
-
-	// Wrap and encode the message
-	// This is needed as a workaround because goqite doesn't support delays using the jobs package,
-	// so we format the message the way it expects but use the queue to supply the delay
-	msgBuf := t.client.buffers.Get().(*bytes.Buffer)
-	wrapper := message{Name: t.task.Name(), Message: taskBuf.Bytes()}
-	if err := gob.NewEncoder(msgBuf).Encode(wrapper); err != nil {
-		return err
-	}
-
-	msg := goqite.Message{
-		Body: msgBuf.Bytes(),
-	}
-
-	if t.wait != nil {
-		msg.Delay = *t.wait
-	}
-
-	// Put the buffers back in the pool for re-use
-	taskBuf.Reset()
-	msgBuf.Reset()
-	t.client.buffers.Put(taskBuf)
-	t.client.buffers.Put(msgBuf)
-
-	if t.tx == nil {
-		return t.client.queue.Send(context.Background(), msg)
-	} else {
-		return t.client.queue.SendTx(context.Background(), t.tx, msg)
-	}
-}
-
-// NewQueue queues a new type-safe Queue of a given Task type
-func NewQueue[T Task](subscriber QueueSubscriber[T]) Queue {
-	var task T
-
-	q := &queue[T]{
-		name:       task.Name(),
-		subscriber: subscriber,
-	}
-
-	return q
-}
-
-func (q *queue[T]) Name() string {
-	return q.name
-}
-
-func (q *queue[T]) Receive(ctx context.Context, payload []byte) error {
-	var obj T
-	err := gob.NewDecoder(bytes.NewReader(payload)).Decode(&obj)
-	if err != nil {
-		return err
-	}
-
-	return q.subscriber(ctx, obj)
-}
diff --git a/pkg/services/tasks_test.go b/pkg/services/tasks_test.go
deleted file mode 100644
index a34385c..0000000
--- a/pkg/services/tasks_test.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package services
-
-import (
-	"context"
-	"database/sql"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-	"testing"
-	"time"
-)
-
-type testTask struct {
-	Val int
-}
-
-func (t testTask) Name() string {
-	return "test_task"
-}
-
-func TestTaskClient_New(t *testing.T) {
-	var subCalled bool
-
-	queue := NewQueue[testTask](func(ctx context.Context, task testTask) error {
-		subCalled = true
-		assert.Equal(t, 123, task.Val)
-		return nil
-	})
-	c.Tasks.Register(queue)
-
-	task := testTask{Val: 123}
-
-	tx := &sql.Tx{}
-
-	op := c.Tasks.
-		New(task).
-		Wait(5 * time.Second).
-		Tx(tx)
-
-	// Check that the task op was built correctly
-	assert.Equal(t, task, op.task)
-	assert.Equal(t, tx, op.tx)
-	assert.Equal(t, 5*time.Second, *op.wait)
-
-	// Remove the transaction and delay so we can process the task immediately
-	op.tx, op.wait = nil, nil
-	err := op.Save()
-	require.NoError(t, err)
-
-	// Start the runner
-	ctx, cancel := context.WithCancel(context.Background())
-	go c.Tasks.StartRunner(ctx)
-	defer cancel()
-
-	// Check for up to 5 seconds if the task executed
-	start := time.Now()
-waitLoop:
-	for {
-		switch {
-		case subCalled:
-			break waitLoop
-		case time.Since(start) > (5 * time.Second):
-			break waitLoop
-		default:
-			time.Sleep(10 * time.Millisecond)
-		}
-	}
-
-	assert.True(t, subCalled)
-}
diff --git a/pkg/tasks/example.go b/pkg/tasks/example.go
index 3ef4b60..48e1839 100644
--- a/pkg/tasks/example.go
+++ b/pkg/tasks/example.go
@@ -2,29 +2,44 @@ package tasks
 
 import (
 	"context"
+	"github.com/mikestefanello/backlite"
+	"time"
 
 	"github.com/mikestefanello/pagoda/pkg/log"
 	"github.com/mikestefanello/pagoda/pkg/services"
 )
 
-// ExampleTask is an example implementation of services.Task
+// ExampleTask is an example implementation of backlite.Task
 // This represents the task that can be queued for execution via the task client and should contain everything
-// that your queue subscriber needs to process the task.
+// that your queue processor needs to process the task.
 type ExampleTask struct {
 	Message string
 }
 
-// Name satisfies the services.Task interface by proviing a unique name for this Task type
-func (t ExampleTask) Name() string {
-	return "example_task"
+// Config satisfies the backlite.Task interface by providing configuration for the queue that these items will be
+// placed into for execution.
+func (t ExampleTask) Config() backlite.QueueConfig {
+	return backlite.QueueConfig{
+		Name:        "ExampleTask",
+		MaxAttempts: 3,
+		Timeout:     5 * time.Second,
+		Backoff:     10 * time.Second,
+		Retention: &backlite.Retention{
+			Duration:   24 * time.Hour,
+			OnlyFailed: false,
+			Data: &backlite.RetainData{
+				OnlyFailed: false,
+			},
+		},
+	}
 }
 
 // NewExampleTaskQueue provides a Queue that can process ExampleTask tasks
 // The service container is provided so the subscriber can have access to the app dependencies.
 // All queues must be registered in the Register() function.
 // Whenever an ExampleTask is added to the task client, it will be queued and eventually sent here for execution.
-func NewExampleTaskQueue(c *services.Container) services.Queue {
-	return services.NewQueue[ExampleTask](func(ctx context.Context, task ExampleTask) error {
+func NewExampleTaskQueue(c *services.Container) backlite.Queue {
+	return backlite.NewQueue[ExampleTask](func(ctx context.Context, task ExampleTask) error {
 		log.Default().Info("Example task received",
 			"message", task.Message,
 		)