Skip to content

Commit

Permalink
Merge pull request #10 from jpoz/ux-update
Browse files Browse the repository at this point in the history
Adding active workers count
  • Loading branch information
jpoz authored Aug 19, 2024
2 parents b278c18 + a61435f commit 66dbf83
Show file tree
Hide file tree
Showing 24 changed files with 9,440 additions and 193 deletions.
2 changes: 1 addition & 1 deletion .air.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ tmp_dir = "tmp"
follow_symlink = false
full_bin = ""
include_dir = []
include_ext = ["go", "tpl", "tmpl", "templ", "html"]
include_ext = ["go", "tpl", "tmpl", "templ", "html", "ts"]
include_file = []
kill_delay = "0s"
log = "build-errors.log"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.22'
go-version: '1.23'

- name: Build
run: go build -v ./...
Expand Down
4 changes: 1 addition & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ gen_protos:
## Test
.PHONY: test
test: ## Run tests
.PHONY: cover
cover: test ## Generate coverage report
$(GOCMD) tool cover -html=coverage.out
$(GOTEST)

.PHONY: integration
integration: ## Run tests + integration
Expand Down
25 changes: 18 additions & 7 deletions _dev/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package main
import (
"context"
"flag"
"io"
"fmt"
"log/slog"
"math/rand"
"os"
"os/signal"
"syscall"
Expand All @@ -17,6 +18,7 @@ import (
)

var redisURL = "redis://localhost:36379"
var sleep = 3 * time.Second

func main() {
var (
Expand All @@ -25,7 +27,7 @@ func main() {
)
flag.Parse()

log := slog.New(tint.NewHandler(os.Stderr, &tint.Options{
log := slog.New(tint.NewHandler(os.Stdout, &tint.Options{
Level: slog.LevelDebug,
TimeFormat: time.Kitchen,
}))
Expand All @@ -37,8 +39,8 @@ func main() {

w, err := conveyor.NewWorker(&config.Worker{
RedisURL: redisURL,
Logger: slog.New(tint.NewHandler(io.Discard, &tint.Options{
Level: slog.LevelDebug,
Logger: slog.New(tint.NewHandler(os.Stdout, &tint.Options{
Level: slog.LevelInfo,
TimeFormat: time.Kitchen,
})),
})
Expand All @@ -48,7 +50,7 @@ func main() {
os.Exit(1)
}

err = w.RegisterJobs(StartJob, ChildJob)
err = w.RegisterJobs(StartJob, ChildJob, LastJob)
if err != nil {
log.Error("failed to register jobs", slog.String("error", err.Error()))
os.Exit(1)
Expand Down Expand Up @@ -82,6 +84,7 @@ func main() {
os.Exit(1)
}

log.Info("Starting hub...")
s.Run(ctx)
}()
}
Expand Down Expand Up @@ -126,14 +129,15 @@ func StartJob(ctx context.Context, arg *Start) error {

_, err := client.EnqueueHeir(ctx, &Last{Level: 0, Message: arg.Message})
if err != nil {
slog.Error("failed to enqueue job", slog.String("error", err.Error()))
return err
}

// slog.Info("[%s] Enqueing %d for %d levels\n", j.Uuid, arg.Spawn, arg.Levels)

// random spawn
spawnr := int32(time.Now().Unix()) % (arg.Spawn + 1)

time.Sleep(sleep)

for i := int32(0); i < arg.Spawn; i++ {
_, err := client.EnqueueChild(ctx, &Child{
Level: int32(1),
Expand All @@ -149,11 +153,18 @@ func StartJob(ctx context.Context, arg *Start) error {
}

func ChildJob(ctx context.Context, arg *Child) error {
time.Sleep(sleep)

if rand.Intn(5) > 2 {
return fmt.Errorf("child job failed")
}

return nil
}

func LastJob(ctx context.Context, arg *Last) error {
client := conveyor.CurrentClient(ctx)
time.Sleep(sleep)
_, err := client.Enqueue(ctx, &Start{})
if err != nil {
slog.Error("failed to enqueue job", slog.String("error", err.Error()))
Expand Down
103 changes: 88 additions & 15 deletions hub/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,25 @@ import (
var ResultColors = []string{"#4caf50", "#2196f3", "#f44336"}

type ChartData struct {
Labels []string `json:"labels"`
Datasets []Dataset `json:"datasets"`
Series []Series `json:"series"`
Colors []string `json:"colors,omitempty"`
XAxis XAxis `json:"xaxis,omitempty"`
YAxis YAxis `json:"yaxis,omitempty"`
}

type Dataset struct {
Label string `json:"label"`
Data []int64 `json:"data"`
BackgroundColor string `json:"backgroundColor"`
Stack string `json:"stack"`
type Series struct {
Name string `json:"name"`
Data []int64 `json:"data"`
}

type XAxis struct {
Categories []string `json:"categories"`
}

type YAxis struct {
Title struct {
Text string `json:"text"`
} `json:"title"`
}

func (s *Server) JobsApi(w http.ResponseWriter, r *http.Request) {
Expand All @@ -33,8 +43,31 @@ func (s *Server) JobsApi(w http.ResponseWriter, r *http.Request) {
}

out := ChartData{
Labels: make([]string, 60),
Datasets: make([]Dataset, len(results)),
Series: []Series{
{
Name: "Success",
Data: make([]int64, 60),
},
{
Name: "Error",
Data: make([]int64, 60),
},
{
Name: "Failure",
Data: make([]int64, 60),
},
},
Colors: ResultColors,
XAxis: XAxis{
Categories: make([]string, 60),
},
YAxis: YAxis{
Title: struct {
Text string `json:"text"`
}{
Text: "Jobs",
},
},
}

var err error
Expand All @@ -43,22 +76,62 @@ func (s *Server) JobsApi(w http.ResponseWriter, r *http.Request) {
for i := 0; i < 60; i++ {
t := time.Now().Add(-time.Duration(i) * time.Minute)
if c == 0 {
out.Labels[i] = t.Format("15:04")
out.XAxis.Categories[i] = t.Format(time.RFC3339)
}
data[i], err = s.storage.HistoricalJobCount(ctx, t, r)
if err != nil {
s.log.Error("failed to get historical job count", slog.String("error", err.Error()))
}
}

out.Datasets[c] = Dataset{
Label: r.String(),
Data: data,
BackgroundColor: ResultColors[c],
Stack: "Stack 0",
out.Series[c].Data = data
}

outJSON, err := json.Marshal(out)
if err != nil {
s.log.Error("failed to marshal chart data", slog.String("error", err.Error()))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Write(outJSON)
}

func (s *Server) WorkersApi(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

out := ChartData{
Series: []Series{
{
Name: "Workers",
Data: make([]int64, 60),
},
},
XAxis: XAxis{
Categories: make([]string, 60),
},
YAxis: YAxis{
Title: struct {
Text string `json:"text"`
}{
Text: "Workers",
},
},
}

var err error
data := make([]int64, 60)
for i := 0; i < 60; i++ {
t := time.Now().Add(-time.Duration(i) * time.Minute)
out.XAxis.Categories[i] = t.Format(time.RFC3339)
data[i], err = s.storage.HistoricalWorkerCount(ctx, t)
if err != nil {
s.log.Error("failed to get historical job count", slog.String("error", err.Error()))
}
}

out.Series[0].Data = data

outJSON, err := json.Marshal(out)
if err != nil {
s.log.Error("failed to marshal chart data", slog.String("error", err.Error()))
Expand Down
Loading

0 comments on commit 66dbf83

Please sign in to comment.