Fixes 4103: add docs for tasking and metrics #718

Merged
merged 3 commits into from
Jul 9, 2024
Changes from 2 commits
20 changes: 20 additions & 0 deletions docs/metrics/metrics.md
@@ -0,0 +1,20 @@
# Metrics

Our app uses [Prometheus](https://prometheus.io/) to continuously monitor performance and to alert when performance in an area falls too low.

Prometheus requires our app to expose metrics on the `/metrics` endpoint. A metric is a data point about our app. We define and implement these metrics, then configure Prometheus to alert on them. Prometheus scrapes the metrics endpoint every 15 seconds (or at the interval defined in its configuration) to get the latest values.

All of our metrics are defined in the instrumentation package:

https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/pkg/instrumentation/metrics.go#L12-L28

## How are metrics implemented?

One example of a metric is `repository_configs_total`, which records the total number of repository configurations across the app.

The metric is registered in `metrics.go`:

https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/pkg/instrumentation/metrics.go#L87-L91

Each time Prometheus scrapes the app (every 15 seconds unless configured otherwise), the collector iterates over its metrics, and a query fetches the total number of repository configurations. All of the collectors are defined in `collector.go`. For each metric, the collector calls a method that returns the metric value.

https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/pkg/instrumentation/custom/collector.go#L63
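
The collect-on-scrape pattern described above can be sketched roughly as follows. This is a simplified, standard-library-only illustration and not the code in `collector.go`: the `metric` type, `renderMetrics`, `metricsHandler`, and the hard-coded value are all hypothetical, and the real app relies on the Prometheus client library rather than writing the text format by hand.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// metric pairs a metric name with a function that computes its current value.
// Hypothetical type, for illustration only.
type metric struct {
	name  string
	value func() int64
}

// renderMetrics iterates over the metrics and writes one "name value" line
// per metric, loosely following the Prometheus text exposition format.
func renderMetrics(metrics []metric) string {
	var b strings.Builder
	for _, m := range metrics {
		fmt.Fprintf(&b, "%s %d\n", m.name, m.value())
	}
	return b.String()
}

// metricsHandler recomputes every value on each request, i.e. on each scrape.
func metricsHandler(metrics []metric) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "text/plain; version=0.0.4")
		fmt.Fprint(w, renderMetrics(metrics))
	}
}

func main() {
	metrics := []metric{
		// Assumption: the real app would run a database query here; this is a stub.
		{name: "repository_configs_total", value: func() int64 { return 42 }},
	}

	// Simulate a single scrape of the /metrics endpoint.
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/metrics", nil)
	metricsHandler(metrics)(rec, req)
	fmt.Print(rec.Body.String())
}
```

Because the value function is called inside the handler, each scrape sees a fresh value instead of one cached at startup.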
50 changes: 0 additions & 50 deletions docs/tasking-system-architecture.md

This file was deleted.

1 change: 1 addition & 0 deletions docs/tasking_system/images/api_process.svg
1 change: 1 addition & 0 deletions docs/tasking_system/images/cancellation_logic.svg
1 change: 1 addition & 0 deletions docs/tasking_system/images/consumer_process.svg
Binary file added docs/tasking_system/images/worker_loop.png
74 changes: 74 additions & 0 deletions docs/tasking_system/tasking_system.md
@@ -0,0 +1,74 @@
# Tasking System
The tasking system enables our app to complete actions, i.e. tasks, asynchronously from the backend API.
For example, creating a repository triggers a “snapshot” task, which runs in parallel with the API server.

For a detailed overview of the tasking system see [this video](https://drive.google.com/file/d/1clTBLDOhPaTmxEd2PUuvmuHYu2oP6Yp1/view) and the [accompanying slide deck](https://docs.google.com/presentation/d/19nzcEvXNS1OzwRmGJ7tPdeiJS2LAbjwMi0YBH-yU-eE/edit#slide=id.p).


## Features
* Queue and process asynchronous tasks
* Check the status of queued tasks
* Requeue a task, with a backoff timer, if its worker times out or exits early
* Schedule dependent tasks
* Cancel a task
* Set a task's priority

## Directory structure

| Package | Description |
|--------------------|-------------------------------------------------------------------------------------------------|
| pkg/tasks | each file contains code for handling a particular task type |
| pkg/tasks/queue | queue used by client and worker to schedule tasks |
| pkg/tasks/client | an interface to enqueue a task |
| pkg/tasks/worker | an interface to dequeue and handle tasks |
| pkg/tasks/payloads | workaround to import certain payloads to dao layer, but payloads are not generally defined here |

## Components

### Queue

`Queue` is an interface used by the client and worker packages for scheduling tasks. It is meant to be used through client or worker, not imported independently.

https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/pkg/tasks/queue/queue.go#L28-L53

### Client

`TaskClient` is an interface for enqueuing or canceling tasks.

https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/pkg/tasks/client/client.go#L12-L15

### Worker Pool

`TaskWorkerPool` is an interface used by the main application to configure and start the workers and the heartbeat listener.

https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/pkg/tasks/worker/worker_pool.go#L17-L28

A worker pool manages the individual workers. Workers are meant to be used through the `TaskWorkerPool` interface, not directly.

Each worker is a goroutine that follows the logic loop below:

![image](images/worker_loop.png)

## Deployment

The tasking system runs in two separate processes: the API and the consumer.

The API is the main API server, where tasks are enqueued from endpoint handlers.

![image](images/api_process.svg)

The consumer runs two sets of goroutines: the workers and the heartbeat listener.

![image](images/consumer_process.svg)

## How to add a new task type

To add a new task, you must define a handler method. Each handler should end by calling a `Run()` method that performs the task. Tasks should be written to be idempotent, i.e. they can be re-run without causing errors.

Here is the snapshot handler as an example:

https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/pkg/tasks/repository_snapshot.go#L27-L53

Once a handler is created, it needs to be registered with the worker pool. We register our tasks here:

https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/cmd/content-sources/main.go#L109-L115

See here for a list of all current task types:

https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/pkg/config/tasks.go#L3-L10
jlsherrill marked this conversation as resolved.
2 changes: 1 addition & 1 deletion pkg/tasks/introspect.go
@@ -16,7 +16,7 @@ import (
"github.com/rs/zerolog/log"
)

-func IntrospectHandler(ctx context.Context, task *models.TaskInfo, q *queue.Queue) error {
+func IntrospectHandler(ctx context.Context, task *models.TaskInfo, _ *queue.Queue) error {
var p payloads.IntrospectPayload

if err := json.Unmarshal(task.Payload, &p); err != nil {