Skip to content

Commit

Permalink
fix: pubsub broker now can handle unlimited events
Browse files Browse the repository at this point in the history
  • Loading branch information
leg100 committed Apr 28, 2024
1 parent 033cefe commit d09586a
Show file tree
Hide file tree
Showing 23 changed files with 150 additions and 140 deletions.
33 changes: 22 additions & 11 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func Start(stdout, stderr io.Writer, args []string) error {
return nil
}

app, model, err := newApp(cfg)
app, model, err := newApp(ctx, cfg)
if err != nil {
return err
}
Expand Down Expand Up @@ -88,7 +88,7 @@ func Start(stdout, stderr io.Writer, args []string) error {
}

// newApp constructs an instance of the app and the top-level TUI model.
func newApp(cfg config) (*app, tea.Model, error) {
func newApp(ctx context.Context, cfg config) (*app, tea.Model, error) {
// Setup logging
logger := logging.NewLogger(cfg.loggingOptions)

Expand Down Expand Up @@ -132,6 +132,17 @@ func newApp(cfg config) (*app, tea.Model, error) {
Logger: logger,
})

// Shutdown services once context is done.
go func() {
<-ctx.Done()
logger.Shutdown()
tasks.Shutdown()
modules.Shutdown()
workspaces.Shutdown()
states.Shutdown()
runs.Shutdown()
}()

// Construct top-level TUI model.
model, err := top.New(top.Options{
TaskService: tasks,
Expand Down Expand Up @@ -164,46 +175,46 @@ func newApp(cfg config) (*app, tea.Model, error) {
// start starts the app daemons and relays events to the TUI.
func (a *app) start(ctx context.Context, s sender) func() error {
// Start daemons
task.StartEnqueuer(ctx, a.tasks)
run.StartScheduler(ctx, a.runs, a.workspaces)
task.StartEnqueuer(a.tasks)
run.StartScheduler(a.runs, a.workspaces)
waitfn := task.StartRunner(ctx, a.logger, a.tasks, a.cfg.MaxTasks)

// Automatically load workspaces whenever modules are loaded.
a.workspaces.LoadWorkspacesUponModuleLoad(ctx, a.modules)
a.workspaces.LoadWorkspacesUponModuleLoad(a.modules)

// Relay resource events to TUI. Deliberately set up subscriptions *before*
// any events are triggered, to ensure the TUI receives all messages.
logEvents := a.logger.Subscribe(ctx)
logEvents := a.logger.Subscribe()
go func() {
for ev := range logEvents {
s.Send(ev)
}
}()
modEvents := a.modules.Subscribe(ctx)
modEvents := a.modules.Subscribe()
go func() {
for ev := range modEvents {
s.Send(ev)
}
}()
wsEvents := a.workspaces.Subscribe(ctx)
wsEvents := a.workspaces.Subscribe()
go func() {
for ev := range wsEvents {
s.Send(ev)
}
}()
stateEvents := a.states.Subscribe(ctx)
stateEvents := a.states.Subscribe()
go func() {
for ev := range stateEvents {
s.Send(ev)
}
}()
runEvents := a.runs.Subscribe(ctx)
runEvents := a.runs.Subscribe()
go func() {
for ev := range runEvents {
s.Send(ev)
}
}()
taskEvents := a.tasks.Subscribe(ctx)
taskEvents := a.tasks.Subscribe()
go func() {
for ev := range taskEvents {
s.Send(ev)
Expand Down
19 changes: 11 additions & 8 deletions internal/app/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,31 @@ import (
"github.com/stretchr/testify/require"
)

func setup(t *testing.T) *teatest.TestModel {
func setup(t *testing.T, workdir string) *teatest.TestModel {
t.Helper()

// Clean up any leftover artefacts from previous tests (previous tests
// neglect to clean up artefacts if they end with a panic).
cleanupArtefacts()
cleanupArtefacts(workdir)

// And clean up artefacts once test finishes
t.Cleanup(func() {
cleanupArtefacts(workdir)
})

// Cancel context once test finishes.
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

// Clean up artefacts once test finishes
t.Cleanup(cleanupArtefacts)

// Setup provider mirror
setupProviderMirror(t)

app, m, err := newApp(
ctx,
config{
FirstPage: "modules",
Program: "terraform",
Workdir: "./testdata",
Workdir: workdir,
MaxTasks: 3,
loggingOptions: logging.Options{
Level: "debug",
Expand All @@ -66,8 +69,8 @@ func setup(t *testing.T) *teatest.TestModel {
}

// cleanupArtefacts removes all the detritus that terraform leaves behind.
func cleanupArtefacts() {
_ = filepath.WalkDir("./testdata/modules", func(path string, d fs.DirEntry, walkerr error) error {
func cleanupArtefacts(workdir string) {
_ = filepath.WalkDir(workdir, func(path string, d fs.DirEntry, walkerr error) error {
if walkerr != nil {
return walkerr
}
Expand Down
11 changes: 6 additions & 5 deletions internal/app/module_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func TestModuleList(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

// Expect three modules to be listed
want := []string{
Expand All @@ -17,6 +17,7 @@ func TestModuleList(t *testing.T) {
"modules/c",
}
waitFor(t, tm, func(s string) bool {
t.Log(s)
for _, w := range want {
if !strings.Contains(s, w) {
return false
Expand Down Expand Up @@ -54,7 +55,7 @@ func TestModuleList(t *testing.T) {
}

func TestModuleList_Reload(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

// Expect message to inform user that three modules have been loaded
waitFor(t, tm, func(s string) bool {
Expand All @@ -72,7 +73,7 @@ func TestModuleList_Reload(t *testing.T) {
}

func TestModuleList_ReloadWorkspaces(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

// Expect message to inform user that three modules have been loaded
waitFor(t, tm, func(s string) bool {
Expand Down Expand Up @@ -112,7 +113,7 @@ func TestModuleList_ReloadWorkspaces(t *testing.T) {
// then attempting to create a run on each. Pug should de-select those
// selections which are not initialized / have no current workspace.
func TestModuleList_CreateRun(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

// Expect message to inform user that modules have been loaded
waitFor(t, tm, func(s string) bool {
Expand Down Expand Up @@ -155,7 +156,7 @@ func TestModuleList_CreateRun(t *testing.T) {
// should de-select those selections which have no current run in a planned
// state.
func TestModuleList_ApplyCurrentRun(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

// Expect message to inform user that modules have been loaded
waitFor(t, tm, func(s string) bool {
Expand Down
6 changes: 3 additions & 3 deletions internal/app/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func TestModule(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

// Expect module to be listed
waitFor(t, tm, func(s string) bool {
Expand All @@ -25,7 +25,7 @@ func TestModule(t *testing.T) {
}

func TestModule_SetCurrentWorkspace(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

// Wait for module to be loaded
waitFor(t, tm, func(s string) bool {
Expand Down Expand Up @@ -62,7 +62,7 @@ func TestModule_SetCurrentWorkspace(t *testing.T) {
}

func TestModule_ReloadWorkspaces(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

// Wait for module to be loaded
waitFor(t, tm, func(s string) bool {
Expand Down
2 changes: 1 addition & 1 deletion internal/app/quit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestQuit(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

tm.Send(tea.KeyMsg{
Type: tea.KeyCtrlC,
Expand Down
4 changes: 2 additions & 2 deletions internal/app/run_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

// TestRunList_Single tests interacting with a single run in the run list view.
func TestRunList_Single(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

// Wait for module to be loaded
waitFor(t, tm, func(s string) bool {
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestRunList_Single(t *testing.T) {
// TestRunList_Multiple demonstrates interacting with multiple runs on the run
// list page.
func TestRunList_Multiple(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

// Expect message to inform user that modules have been loaded
waitFor(t, tm, func(s string) bool {
Expand Down
2 changes: 1 addition & 1 deletion internal/app/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

func TestRun(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

// Initialize and apply run on modules/a
initAndApplyModuleA(t, tm)
Expand Down
15 changes: 15 additions & 0 deletions internal/app/testdata/module_list/modules/a/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
terraform {
backend "local" {}
}

resource "random_pet" "pet" {
count = 10

keepers = {
now = timestamp()
}
}

output "pets" {
value = random_pet.pet[*].id
}
Empty file.
15 changes: 15 additions & 0 deletions internal/app/testdata/module_list/modules/b/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
terraform {
backend "local" {}
}

resource "random_pet" "pet" {
count = 10

keepers = {
now = timestamp()
}
}

output "pets" {
value = random_pet.pet[*].id
}
15 changes: 15 additions & 0 deletions internal/app/testdata/module_list/modules/c/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
terraform {
backend "local" {}
}

resource "random_pet" "pet" {
count = 10

keepers = {
now = timestamp()
}
}

output "pets" {
value = random_pet.pet[*].id
}
6 changes: 3 additions & 3 deletions internal/app/workspace_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestWorkspaceList_SetCurrentWorkspace(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

// Wait for module to be loaded
waitFor(t, tm, func(s string) bool {
Expand Down Expand Up @@ -50,7 +50,7 @@ func TestWorkspaceList_SetCurrentWorkspace(t *testing.T) {
}

func TestWorkspaceList_CreateRun(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

// Expect message to inform user that modules have been loaded
waitFor(t, tm, func(s string) bool {
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestWorkspaceList_CreateRun(t *testing.T) {
}

func TestWorkspaceList_ApplyCurrentRun(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

// Expect message to inform user that modules have been loaded
waitFor(t, tm, func(s string) bool {
Expand Down
2 changes: 1 addition & 1 deletion internal/app/workspace_resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func TestWorkspace_Resources(t *testing.T) {
tm := setup(t)
tm := setup(t, "./testdata/module_list")

// Initialize and apply run on modules/a
initAndApplyModuleA(t, tm)
Expand Down
11 changes: 2 additions & 9 deletions internal/logging/logger.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package logging

import (
"context"
"io"
"log/slog"

"github.com/leg100/pug/internal/pubsub"
"github.com/leg100/pug/internal/resource"
)

var levels = map[string]slog.Level{
Expand All @@ -30,7 +28,7 @@ func NewLogger(opts Options) *Logger {
)

logger.logger = slog.New(handler)
logger.broker = broker
logger.Broker = broker
logger.writer = writer
logger.enricher = &enricher{}

Expand All @@ -48,9 +46,9 @@ type Options struct {
// records as pug events, and enriching records with further attributes.
type Logger struct {
logger *slog.Logger
broker *pubsub.Broker[Message]
writer *writer

*pubsub.Broker[Message]
*enricher
}

Expand All @@ -70,11 +68,6 @@ func (l *Logger) Error(msg string, args ...any) {
l.logger.Error(msg, l.enrich(args...)...)
}

// Subscribe to log messages.
func (l *Logger) Subscribe(ctx context.Context) <-chan resource.Event[Message] {
return l.broker.Subscribe(ctx)
}

// Messages provides the log messages received thus far.
func (l *Logger) Messages() []Message {
return l.writer.Messages
Expand Down
Loading

0 comments on commit d09586a

Please sign in to comment.