Skip to content

Commit

Permalink
(choria-io#109) Support signing tasks before storing
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed May 5, 2023
1 parent 3fd8f03 commit 9acac5f
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 8 deletions.
138 changes: 138 additions & 0 deletions ABTaskFile
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
name: build_tasks
description: Choria Build Tasks

commands:
- name: dependencies
type: parent
description: Manage dependencies
aliases: [d]
commands:
- name: update
description: Update dependencies
type: exec
aliases: [up]
dir: "{{ AppDir }}"
flags:
- name: verbose
description: Log verbosely
short: v
bool: true
- name: proxy
description: Enable using go proxy
bool: true
default: "true"
script: |
. "{{ BashHelperPath }}"

ab_announce Updating all dependencies
echo

{{ if eq .Flags.proxy false }}
export GOPROXY=direct
ab_say Disabling go mod proxy
{{ end }}

go get -u -n -a -t {{- if .Flags.verbose }} -d -x {{ end }} ./...

ab_say Running go mod tidy

go mod tidy

- name: test
type: parent
aliases: [t]
description: Perform various tests
commands:
- name: unit
type: exec
description: Run ginkgo unit tests
aliases: [u]
arguments:
- name: dir
description: Directory to test
default: .
flags:
- name: update
description: Updates the ginkgo runtime
bool: true
script: |
set -e

. "{{ BashHelperPath }}"

{{ if .Flags.update }}
ab_say Updating ginkgo binary
go install github.com/onsi/ginkgo/v2/ginkgo
{{ end }}

ginkgo -r --skip Integration {{ .Arguments.dir | escape }}

- name: integration
type: exec
dir: "{{ AppDir }}"
aliases: [i]
description: Run ginkgo integration tests
command: ginkgo -r integration

- name: lint
type: exec
dir: "{{ AppDir }}"
flags:
- name: vet
description: Perform go vet
bool: true
default: true
- name: staticcheck
description: Perform staticcheck
bool: true
default: true
- name: update
description: Updates lint dependencies
bool: true
script: |
set -e

. "{{ BashHelperPath }}"

{{ if .Flags.update }}
ab_say Updating linting tools
go install github.com/client9/misspell/cmd/misspell@latest
go install honnef.co/go/tools/cmd/staticcheck@latest
{{ else }}
echo ">>> Run with --update to install required commands"
echo
{{ end }}

ab_say Formatting source files
go fmt ./...

ab_say Tidying go mod
go mod tidy

ab_say Checking spelling
find . -type f -name "*.go" | xargs misspell -error -locale US -i flavour
find docs/content -type f -name "*.md" | xargs misspell -error -locale US

{{ if .Flags.vet }}
ab_say Performing go vet
go vet ./...
{{ end }}

{{ if .Flags.staticcheck }}
ab_say Running staticcheck
staticcheck ./...
{{ end }}

- name: docs
type: parent
description: Documentation related commands
commands:
- name: serve
description: Serves documentation locally
type: exec
dir: "{{ AppDir }}"
flags:
- name: port
description: The port to listen on
default: "8081"
command: hugo serve -p {{ .Flags.port }} -s docs
24 changes: 24 additions & 0 deletions ajc/task_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ package main

import (
"context"
"crypto/ed25519"
"encoding/hex"
"fmt"
"os"
"strings"
"time"

Expand Down Expand Up @@ -36,6 +39,7 @@ type taskCommand struct {
discardExpired bool
dependencies []string
loadDepResults bool
ed25519Seed string

limit int
json bool
Expand All @@ -55,6 +59,7 @@ func configureTaskCommand(app *fisk.Application) {
add.Flag("tries", "Sets the maximum amount of times this task may be tried").IntVar(&c.maxtries)
add.Flag("depends", "Sets IDs to depend on, comma sep or pass multiple times").StringsVar(&c.dependencies)
add.Flag("load", "Loads results from dependencies before executing task").BoolVar(&c.loadDepResults)
add.Flag("sign", "Signs the task using an ed25519 seed").StringVar(&c.ed25519Seed)

retry := tasks.Command("retry", "Retries delivery of a task currently in the Task Store").Action(c.retryAction)
retry.Arg("id", "The Task ID to view").Required().StringVar(&c.id)
Expand Down Expand Up @@ -176,6 +181,9 @@ func (c *taskCommand) watchAction(_ *fisk.ParseContext) error {
fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s error: %s\n", e.TimeStamp.Format("15:04:05"), e.TaskID, e.Queue, e.TaskType, e.Tries, e.State, e.LastErr)
}

case aj.LeaderElectedEvent:
fmt.Printf("[%s] %s: new %s leader\n", e.TimeStamp.Format("15:04:05"), e.Name, e.Component)

default:
fmt.Printf("[%s] Unknown event type %s\n", time.Now().UTC().Format("15:04:05"), kind)
}
Expand Down Expand Up @@ -412,6 +420,22 @@ func (c *taskCommand) addAction(_ *fisk.ParseContext) error {
opts = append(opts, aj.TaskMaxTries(c.maxtries))
}

if c.ed25519Seed != "" {
var seed []byte
if fileExist(c.ed25519Seed) {
seed, err = os.ReadFile(c.ed25519Seed)
if err != nil {
return err
}
} else {
seed, err = hex.DecodeString(c.ed25519Seed)
if err != nil {
return err
}
}
opts = append(opts, aj.TaskSigner(ed25519.NewKeyFromSeed(seed)))
}

task, err := aj.NewTask(c.ttype, c.payload, opts...)
if err != nil {
return err
Expand Down
12 changes: 12 additions & 0 deletions ajc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,15 @@ func showQueue(q *asyncjobs.QueueInfo) {
fmt.Printf(" Last Item: %v (%s)\n", q.Stream.State.LastTime.Format(timeFormat), humanizeDuration(time.Since(q.Stream.State.LastTime)))
}
}

func fileExist(path string) bool {
if path == "" {
return false
}

if _, err := os.Stat(path); os.IsNotExist(err) {
return false
}

return true
}
5 changes: 3 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ var _ = Describe("Client", func() {
})
})

It("Should handle retried messages with a backoff delay", func() {
It("Should handle retried messages with a backoff delay", FlakeAttempts(5), func() {
withJetStream(func(nc *nats.Conn, _ *jsm.Manager) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand All @@ -211,7 +211,7 @@ var _ = Describe("Client", func() {
var tries []time.Time

router := NewTaskRouter()
router.HandleFunc("ginkgo", func(ctx context.Context, log Logger, t *Task) (any, error) {
err = router.HandleFunc("ginkgo", func(ctx context.Context, log Logger, t *Task) (any, error) {
tries = append(tries, time.Now())

log.Infof("Trying task %s on try %d\n", t.ID, t.Tries)
Expand All @@ -223,6 +223,7 @@ var _ = Describe("Client", func() {
wg.Done()
return "done", nil
})
Expect(err).ToNot(HaveOccurred())

go client.Run(ctx, router)

Expand Down
9 changes: 7 additions & 2 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ var (
ErrTaskTypeInvalid = fmt.Errorf("task type is invalid")
// ErrTaskDependenciesFailed indicates that the task cannot be run as its dependencies failed
ErrTaskDependenciesFailed = fmt.Errorf("task dependencies failed")
// ErrTaskAlreadySigned indicates a task is already signed
ErrTaskAlreadySigned = fmt.Errorf("task is already signed")
// ErrTaskSignatureRequiresQueue indicates a signature request was made without configuring the queue name for a task
ErrTaskSignatureRequiresQueue = fmt.Errorf("signing a task requires the queue to be set")

// ErrNoHandlerForTaskType indicates that a task could not be handled by any known handlers
ErrNoHandlerForTaskType = fmt.Errorf("no handler for task type")
Expand Down Expand Up @@ -73,14 +77,15 @@ var (
ErrExternalCommandNotFound = fmt.Errorf("command not found")
// ErrExternalCommandFailed indicates a command for an ExternalProcess handler failed
ErrExternalCommandFailed = fmt.Errorf("execution failed")

// ErrUnknownEventType indicates that while parsing an event an unknown type of event was encountered
ErrUnknownEventType = fmt.Errorf("unknown event type")

// ErrUnknownRetryPolicy indicates the requested retry policy does not exist
ErrUnknownRetryPolicy = fmt.Errorf("unknown retry policy")

// ErrUnknownDiscardPolicy indicates a discard policy could not be found matching a name
ErrUnknownDiscardPolicy = fmt.Errorf("unknown discard policy")
// ErrInvalidPrivateKey indicates the private key is not valid
ErrInvalidPrivateKey = fmt.Errorf("invalid private key length")

// ErrRequestReplyFailed indicates a callout to a remote handler failed due to a timeout, lack of listeners or network error
ErrRequestReplyFailed = fmt.Errorf("request-reply callout failed")
Expand Down
6 changes: 6 additions & 0 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@ func (s *jetStreamStorage) EnqueueTask(ctx context.Context, queue *Queue, task *
}

task.Queue = queue.Name

err = task.Sign()
if err != nil {
return err
}

err = s.SaveTaskState(ctx, task, true)
if err != nil {
return err
Expand Down
11 changes: 9 additions & 2 deletions storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package asyncjobs
import (
"bytes"
"context"
"crypto/ed25519"
"encoding/json"
"fmt"
"log"
Expand Down Expand Up @@ -914,11 +915,16 @@ var _ = Describe("Storage", func() {
err = storage.PrepareQueue(q, 1, true)
Expect(err).ToNot(HaveOccurred())

task, err := NewTask("ginkgo", nil)
_, pri, err := ed25519.GenerateKey(nil)
Expect(err).ToNot(HaveOccurred())

task, err := NewTask("ginkgo", nil, TaskSigner(pri))
Expect(err).ToNot(HaveOccurred())

Expect(task.Signature).To(HaveLen(0))
err = storage.EnqueueTask(ctx, q, task)
Expect(err).ToNot(HaveOccurred())
Expect(task.Signature).To(HaveLen(128))

msg, err := storage.qStreams[q.Name].ReadMessage(1)
Expect(err).ToNot(HaveOccurred())
Expand All @@ -929,8 +935,9 @@ var _ = Describe("Storage", func() {
Expect(item.Kind).To(Equal(TaskItem))
Expect(item.JobID).To(Equal(task.ID))

_, err = storage.LoadTaskByID(task.ID)
t, err := storage.LoadTaskByID(task.ID)
Expect(err).ToNot(HaveOccurred())
Expect(t.Signature).To(Equal(task.Signature))
})
})
})
Expand Down
Loading

0 comments on commit 9acac5f

Please sign in to comment.