From d09586a53a017e215779e57d439b05c2c05f6b0e Mon Sep 17 00:00:00 2001 From: Louis Garman Date: Sun, 28 Apr 2024 07:34:30 +0100 Subject: [PATCH] fix: pubsub broker now can handle unlimited events --- internal/app/app.go | 33 +++++--- internal/app/helpers_test.go | 19 +++-- internal/app/module_list_test.go | 11 +-- internal/app/module_test.go | 6 +- internal/app/quit_test.go | 2 +- internal/app/run_list_test.go | 4 +- internal/app/run_test.go | 2 +- .../testdata/module_list/modules/a/main.tf | 15 ++++ .../a/terraform.tfstate.d/dev/.gitkeep | 0 .../testdata/module_list/modules/b/main.tf | 15 ++++ .../testdata/module_list/modules/c/main.tf | 15 ++++ internal/app/workspace_list_test.go | 6 +- internal/app/workspace_resources_test.go | 2 +- internal/logging/logger.go | 11 +-- internal/module/service.go | 10 +-- internal/pubsub/broker.go | 79 ++++++++----------- internal/run/scheduler.go | 6 +- internal/run/service.go | 11 +-- internal/state/service.go | 11 +-- internal/task/enqueuer.go | 6 +- internal/task/runner.go | 2 +- internal/task/service.go | 8 +- internal/workspace/service.go | 16 ++-- 23 files changed, 150 insertions(+), 140 deletions(-) create mode 100644 internal/app/testdata/module_list/modules/a/main.tf create mode 100644 internal/app/testdata/module_list/modules/a/terraform.tfstate.d/dev/.gitkeep create mode 100644 internal/app/testdata/module_list/modules/b/main.tf create mode 100644 internal/app/testdata/module_list/modules/c/main.tf diff --git a/internal/app/app.go b/internal/app/app.go index b5ca90c6..3da512f9 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -50,7 +50,7 @@ func Start(stdout, stderr io.Writer, args []string) error { return nil } - app, model, err := newApp(cfg) + app, model, err := newApp(ctx, cfg) if err != nil { return err } @@ -88,7 +88,7 @@ func Start(stdout, stderr io.Writer, args []string) error { } // newApp constructs an instance of the app and the top-level TUI model. -func newApp(cfg config) (*app, tea.Model, error) { +func newApp(ctx context.Context, cfg config) (*app, tea.Model, error) { // Setup logging logger := logging.NewLogger(cfg.loggingOptions) @@ -132,6 +132,17 @@ func newApp(cfg config) (*app, tea.Model, error) { Logger: logger, }) + // Shutdown services once context is done. + go func() { + <-ctx.Done() + logger.Shutdown() + tasks.Shutdown() + modules.Shutdown() + workspaces.Shutdown() + states.Shutdown() + runs.Shutdown() + }() + // Construct top-level TUI model. model, err := top.New(top.Options{ TaskService: tasks, @@ -164,46 +175,46 @@ func newApp(cfg config) (*app, tea.Model, error) { // start starts the app daemons and relays events to the TUI. func (a *app) start(ctx context.Context, s sender) func() error { // Start daemons - task.StartEnqueuer(ctx, a.tasks) - run.StartScheduler(ctx, a.runs, a.workspaces) + task.StartEnqueuer(a.tasks) + run.StartScheduler(a.runs, a.workspaces) waitfn := task.StartRunner(ctx, a.logger, a.tasks, a.cfg.MaxTasks) // Automatically load workspaces whenever modules are loaded. - a.workspaces.LoadWorkspacesUponModuleLoad(ctx, a.modules) + a.workspaces.LoadWorkspacesUponModuleLoad(a.modules) // Relay resource events to TUI. Deliberately set up subscriptions *before* // any events are triggered, to ensure the TUI receives all messages. - logEvents := a.logger.Subscribe(ctx) + logEvents := a.logger.Subscribe() go func() { for ev := range logEvents { s.Send(ev) } }() - modEvents := a.modules.Subscribe(ctx) + modEvents := a.modules.Subscribe() go func() { for ev := range modEvents { s.Send(ev) } }() - wsEvents := a.workspaces.Subscribe(ctx) + wsEvents := a.workspaces.Subscribe() go func() { for ev := range wsEvents { s.Send(ev) } }() - stateEvents := a.states.Subscribe(ctx) + stateEvents := a.states.Subscribe() go func() { for ev := range stateEvents { s.Send(ev) } }() - runEvents := a.runs.Subscribe(ctx) + runEvents := a.runs.Subscribe() go func() { for ev := range runEvents { s.Send(ev) } }() - taskEvents := a.tasks.Subscribe(ctx) + taskEvents := a.tasks.Subscribe() go func() { for ev := range taskEvents { s.Send(ev) diff --git a/internal/app/helpers_test.go b/internal/app/helpers_test.go index 5812205d..fda44643 100644 --- a/internal/app/helpers_test.go +++ b/internal/app/helpers_test.go @@ -18,28 +18,31 @@ import ( "github.com/stretchr/testify/require" ) -func setup(t *testing.T) *teatest.TestModel { +func setup(t *testing.T, workdir string) *teatest.TestModel { t.Helper() // Clean up any leftover artefacts from previous tests (previous tests // neglect to clean up artefacts if they end with a panic). - cleanupArtefacts() + cleanupArtefacts(workdir) + + // And clean up artefacts once test finishes + t.Cleanup(func() { + cleanupArtefacts(workdir) + }) // Cancel context once test finishes. ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - // Clean up artefacts once test finishes - t.Cleanup(cleanupArtefacts) - // Setup provider mirror setupProviderMirror(t) app, m, err := newApp( + ctx, config{ FirstPage: "modules", Program: "terraform", - Workdir: "./testdata", + Workdir: workdir, MaxTasks: 3, loggingOptions: logging.Options{ Level: "debug", @@ -66,8 +69,8 @@ func setup(t *testing.T) *teatest.TestModel { } // cleanupArtefacts removes all the detritus that terraform leaves behind. -func cleanupArtefacts() { - _ = filepath.WalkDir("./testdata/modules", func(path string, d fs.DirEntry, walkerr error) error { +func cleanupArtefacts(workdir string) { + _ = filepath.WalkDir(workdir, func(path string, d fs.DirEntry, walkerr error) error { if walkerr != nil { return walkerr } diff --git a/internal/app/module_list_test.go b/internal/app/module_list_test.go index b0ad31eb..2fba0c34 100644 --- a/internal/app/module_list_test.go +++ b/internal/app/module_list_test.go @@ -8,7 +8,7 @@ import ( ) func TestModuleList(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") // Expect three modules to be listed want := []string{ @@ -17,6 +17,7 @@ func TestModuleList(t *testing.T) { "modules/c", } waitFor(t, tm, func(s string) bool { + t.Log(s) for _, w := range want { if !strings.Contains(s, w) { return false @@ -54,7 +55,7 @@ func TestModuleList(t *testing.T) { } func TestModuleList_Reload(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") // Expect message to inform user that three modules have been loaded waitFor(t, tm, func(s string) bool { @@ -72,7 +73,7 @@ func TestModuleList_Reload(t *testing.T) { } func TestModuleList_ReloadWorkspaces(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") // Expect message to inform user that three modules have been loaded waitFor(t, tm, func(s string) bool { @@ -112,7 +113,7 @@ func TestModuleList_ReloadWorkspaces(t *testing.T) { // then attempting to create a run on each. Pug should de-select those // selections which are not initialized / have no current workspace. func TestModuleList_CreateRun(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") // Expect message to inform user that modules have been loaded waitFor(t, tm, func(s string) bool { @@ -155,7 +156,7 @@ func TestModuleList_CreateRun(t *testing.T) { // should de-select those selections which have no current run in a planned // state. func TestModuleList_ApplyCurrentRun(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") // Expect message to inform user that modules have been loaded waitFor(t, tm, func(s string) bool { diff --git a/internal/app/module_test.go b/internal/app/module_test.go index 928906fb..0b86611a 100644 --- a/internal/app/module_test.go +++ b/internal/app/module_test.go @@ -8,7 +8,7 @@ import ( ) func TestModule(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") // Expect module to be listed waitFor(t, tm, func(s string) bool { @@ -25,7 +25,7 @@ func TestModule(t *testing.T) { } func TestModule_SetCurrentWorkspace(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") // Wait for module to be loaded waitFor(t, tm, func(s string) bool { @@ -62,7 +62,7 @@ func TestModule_SetCurrentWorkspace(t *testing.T) { } func TestModule_ReloadWorkspaces(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") // Wait for module to be loaded waitFor(t, tm, func(s string) bool { diff --git a/internal/app/quit_test.go b/internal/app/quit_test.go index 7d2609af..4126d6f0 100644 --- a/internal/app/quit_test.go +++ b/internal/app/quit_test.go @@ -10,7 +10,7 @@ import ( ) func TestQuit(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") tm.Send(tea.KeyMsg{ Type: tea.KeyCtrlC, diff --git a/internal/app/run_list_test.go b/internal/app/run_list_test.go index 98bcaae5..79e06800 100644 --- a/internal/app/run_list_test.go +++ b/internal/app/run_list_test.go @@ -10,7 +10,7 @@ import ( // TestRunList_Single tests interacting with a single run in the run list view. func TestRunList_Single(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") // Wait for module to be loaded waitFor(t, tm, func(s string) bool { @@ -74,7 +74,7 @@ func TestRunList_Single(t *testing.T) { // TestRunList_Multiple demonstrates interacting with multiple runs on the run // list page. func TestRunList_Multiple(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") // Expect message to inform user that modules have been loaded waitFor(t, tm, func(s string) bool { diff --git a/internal/app/run_test.go b/internal/app/run_test.go index cce320d5..914c6782 100644 --- a/internal/app/run_test.go +++ b/internal/app/run_test.go @@ -5,7 +5,7 @@ import ( ) func TestRun(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") // Initialize and apply run on modules/a initAndApplyModuleA(t, tm) diff --git a/internal/app/testdata/module_list/modules/a/main.tf b/internal/app/testdata/module_list/modules/a/main.tf new file mode 100644 index 00000000..a250b310 --- /dev/null +++ b/internal/app/testdata/module_list/modules/a/main.tf @@ -0,0 +1,15 @@ +terraform { + backend "local" {} +} + +resource "random_pet" "pet" { + count = 10 + + keepers = { + now = timestamp() + } +} + +output "pets" { + value = random_pet.pet[*].id +} diff --git a/internal/app/testdata/module_list/modules/a/terraform.tfstate.d/dev/.gitkeep b/internal/app/testdata/module_list/modules/a/terraform.tfstate.d/dev/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/internal/app/testdata/module_list/modules/b/main.tf b/internal/app/testdata/module_list/modules/b/main.tf new file mode 100644 index 00000000..a250b310 --- /dev/null +++ b/internal/app/testdata/module_list/modules/b/main.tf @@ -0,0 +1,15 @@ +terraform { + backend "local" {} +} + +resource "random_pet" "pet" { + count = 10 + + keepers = { + now = timestamp() + } +} + +output "pets" { + value = random_pet.pet[*].id +} diff --git a/internal/app/testdata/module_list/modules/c/main.tf b/internal/app/testdata/module_list/modules/c/main.tf new file mode 100644 index 00000000..a250b310 --- /dev/null +++ b/internal/app/testdata/module_list/modules/c/main.tf @@ -0,0 +1,15 @@ +terraform { + backend "local" {} +} + +resource "random_pet" "pet" { + count = 10 + + keepers = { + now = timestamp() + } +} + +output "pets" { + value = random_pet.pet[*].id +} diff --git a/internal/app/workspace_list_test.go b/internal/app/workspace_list_test.go index cf9cb4ce..88a8bed2 100644 --- a/internal/app/workspace_list_test.go +++ b/internal/app/workspace_list_test.go @@ -10,7 +10,7 @@ import ( ) func TestWorkspaceList_SetCurrentWorkspace(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") // Wait for module to be loaded waitFor(t, tm, func(s string) bool { @@ -50,7 +50,7 @@ func TestWorkspaceList_SetCurrentWorkspace(t *testing.T) { } func TestWorkspaceList_CreateRun(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") // Expect message to inform user that modules have been loaded waitFor(t, tm, func(s string) bool { @@ -98,7 +98,7 @@ func TestWorkspaceList_CreateRun(t *testing.T) { } func TestWorkspaceList_ApplyCurrentRun(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") // Expect message to inform user that modules have been loaded waitFor(t, tm, func(s string) bool { diff --git a/internal/app/workspace_resources_test.go b/internal/app/workspace_resources_test.go index c074fe61..ac6007cc 100644 --- a/internal/app/workspace_resources_test.go +++ b/internal/app/workspace_resources_test.go @@ -9,7 +9,7 @@ import ( ) func TestWorkspace_Resources(t *testing.T) { - tm := setup(t) + tm := setup(t, "./testdata/module_list") // Initialize and apply run on modules/a initAndApplyModuleA(t, tm) diff --git a/internal/logging/logger.go b/internal/logging/logger.go index 44b79163..55405a53 100644 --- a/internal/logging/logger.go +++ b/internal/logging/logger.go @@ -1,12 +1,10 @@ package logging import ( - "context" "io" "log/slog" "github.com/leg100/pug/internal/pubsub" - "github.com/leg100/pug/internal/resource" ) var levels = map[string]slog.Level{ @@ -30,7 +28,7 @@ func NewLogger(opts Options) *Logger { ) logger.logger = slog.New(handler) - logger.broker = broker + logger.Broker = broker logger.writer = writer logger.enricher = &enricher{} @@ -48,9 +46,9 @@ type Options struct { // records as pug events, and enriching records with further attributes. type Logger struct { logger *slog.Logger - broker *pubsub.Broker[Message] writer *writer + *pubsub.Broker[Message] *enricher } @@ -70,11 +68,6 @@ func (l *Logger) Error(msg string, args ...any) { l.logger.Error(msg, l.enrich(args...)...) } -// Subscribe to log messages. -func (l *Logger) Subscribe(ctx context.Context) <-chan resource.Event[Message] { - return l.broker.Subscribe(ctx) -} - // Messages provides the log messages received thus far. func (l *Logger) Messages() []Message { return l.writer.Messages diff --git a/internal/module/service.go b/internal/module/service.go index bf6afe55..c9706aa2 100644 --- a/internal/module/service.go +++ b/internal/module/service.go @@ -1,7 +1,6 @@ package module import ( - "context" "fmt" "slices" @@ -14,11 +13,12 @@ import ( type Service struct { table *resource.Table[*Module] - broker *pubsub.Broker[*Module] tasks *task.Service workdir internal.Workdir pluginCache bool logger logging.Interface + + *pubsub.Broker[*Module] } type ServiceOptions struct { @@ -36,7 +36,7 @@ func NewService(opts ServiceOptions) *Service { return &Service{ table: table, - broker: broker, + Broker: broker, tasks: opts.TaskService, workdir: opts.Workdir, pluginCache: opts.PluginCache, @@ -130,10 +130,6 @@ func (s *Service) GetByPath(path string) (*Module, error) { return nil, resource.ErrNotFound } -func (s *Service) Subscribe(ctx context.Context) <-chan resource.Event[*Module] { - return s.broker.Subscribe(ctx) -} - func (s *Service) SetCurrent(moduleID, workspaceID resource.ID) error { _, err := s.table.Update(moduleID, func(existing *Module) error { existing.CurrentWorkspaceID = &workspaceID diff --git a/internal/pubsub/broker.go b/internal/pubsub/broker.go index f2a74597..8053fb92 100644 --- a/internal/pubsub/broker.go +++ b/internal/pubsub/broker.go @@ -1,22 +1,11 @@ package pubsub import ( - "context" - "errors" "sync" "github.com/leg100/pug/internal/resource" ) -const ( - // subBufferSize is the buffer size of the channel for each subscription. - subBufferSize = 1024 -) - -// ErrSubscriptionTerminated is for use by subscribers to indicate that their -// subscription has been terminated by the broker. -var ErrSubscriptionTerminated = errors.New("broker terminated the subscription") - type Logger interface { Debug(msg string, args ...any) Info(msg string, args ...any) @@ -29,66 +18,60 @@ type Broker[T any] struct { subs map[chan resource.Event[T]]struct{} // subscriptions mu sync.Mutex // sync access to map + done chan struct{} logger Logger } +// NewBroker constructs a pub/sub broker. func NewBroker[T any](logger Logger) *Broker[T] { b := &Broker[T]{ subs: make(map[chan resource.Event[T]]struct{}), + done: make(chan struct{}), logger: logger, } return b } -// Subscribe subscribes the caller to a stream of events. The caller can close -// the subscription by either canceling the context or calling the returned -// unsubscribe function. -func (b *Broker[T]) Subscribe(ctx context.Context) <-chan resource.Event[T] { +// Shutdown the broker, terminating any subscriptions. +func (b *Broker[T]) Shutdown() { b.mu.Lock() defer b.mu.Unlock() - sub := make(chan resource.Event[T], subBufferSize) - b.subs[sub] = struct{}{} + // First, close the done channel, which'll unblock any go-routines in + // Publish() that are blocked sending to a subscriber's channel. + close(b.done) + // Second, remove each subscriber entry, so Publish() cannot send any + // further messages, and close each subscriber's channel, so the subscriber + // knows to stop consuming messages. + for ch := range b.subs { + delete(b.subs, ch) + close(ch) + } +} - // when the context is canceled remove the subscriber - go func() { - <-ctx.Done() - b.unsubscribe(sub) - }() +// Subscribe subscribes the caller to a stream of events. The returned channel +// is closed when the broker is shutdown. +func (b *Broker[T]) Subscribe() <-chan resource.Event[T] { + b.mu.Lock() + defer b.mu.Unlock() + sub := make(chan resource.Event[T]) + b.subs[sub] = struct{}{} return sub } // Publish an event to subscribers. -// -// TODO: there is the potential for a subscriber to become full, i.e. its -// buffered channel is full, in which case the broker will block until the -// channel has free capacity again. This should only happen in extremis, e.g. a -// user has a shit-load of modules/workspaces and invokes a massive number of -// parallel tasks, which in turn publishes a shit-load of events. And if it -// happens, it *should* only happen briefly before the subscriber consumes from -// its channel, freeing up capacity. But if the subscriber does not consume -// because it has blocked on something else indefinitely then the broker will -// block indefinitely. -// -// We need need to know when this happens, via some sort of surfacing of -// metrics that does not get blocked itself... func (b *Broker[T]) Publish(t resource.EventType, payload T) { - b.mu.Lock() - for sub := range b.subs { - sub <- resource.Event[T]{Type: t, Payload: payload} - } - b.mu.Unlock() -} - -func (b *Broker[T]) unsubscribe(sub chan resource.Event[T]) { b.mu.Lock() defer b.mu.Unlock() - if _, ok := b.subs[sub]; !ok { - // already unsubscribed - return + for sub := range b.subs { + go func() { + select { + case sub <- resource.Event[T]{Type: t, Payload: payload}: + case <-b.done: + return + } + }() } - close(sub) - delete(b.subs, sub) } diff --git a/internal/run/scheduler.go b/internal/run/scheduler.go index 45ad5179..34638bc0 100644 --- a/internal/run/scheduler.go +++ b/internal/run/scheduler.go @@ -1,8 +1,6 @@ package run import ( - "context" - "github.com/leg100/pug/internal/resource" "github.com/leg100/pug/internal/workspace" "golang.org/x/exp/maps" @@ -20,8 +18,8 @@ type runLister interface { // there is at most one active run on each workspace at a time. // // The scheduler attempts to schedule runs upon every run event it receives. -func StartScheduler(ctx context.Context, runs *Service, workspaces *workspace.Service) { - sub := runs.Broker.Subscribe(ctx) +func StartScheduler(runs *Service, workspaces *workspace.Service) { + sub := runs.Broker.Subscribe() s := &scheduler{runs: runs} go func() { diff --git a/internal/run/service.go b/internal/run/service.go index cb0e8474..cc922f9f 100644 --- a/internal/run/service.go +++ b/internal/run/service.go @@ -1,7 +1,6 @@ package run import ( - "context" "fmt" "io" "slices" @@ -16,8 +15,6 @@ import ( ) type Service struct { - Broker *pubsub.Broker[*Run] - table *resource.Table[*Run] logger logging.Interface @@ -27,6 +24,8 @@ type Service struct { states *state.Service disableReloadAfterApply bool + + *pubsub.Broker[*Run] } type ServiceOptions struct { @@ -74,7 +73,7 @@ func (s *Service) create(workspaceID resource.ID, opts CreateOptions) (*Run, err } // Publish an event upon every run status update opts.afterUpdate = func(run *Run) { - s.Broker.Publish(resource.UpdatedEvent, run) + s.Publish(resource.UpdatedEvent, run) } run, err := newRun(mod, ws, opts) if err != nil { @@ -264,10 +263,6 @@ func (s *Service) List(opts ListOptions) []*Run { return runs } -func (s *Service) Subscribe(ctx context.Context) <-chan resource.Event[*Run] { - return s.Broker.Subscribe(ctx) -} - func (s *Service) Delete(id resource.ID) error { if err := s.delete(id); err != nil { return err diff --git a/internal/state/service.go b/internal/state/service.go index 66ba1198..cefb2d2d 100644 --- a/internal/state/service.go +++ b/internal/state/service.go @@ -18,11 +18,12 @@ type Service struct { modules *module.Service workspaces *workspace.Service tasks *task.Service - broker *pubsub.Broker[*State] logger logging.Interface // Table mapping workspace IDs to states cache *resource.Table[*State] + + *pubsub.Broker[*State] } type ServiceOptions struct { @@ -39,13 +40,13 @@ func NewService(ctx context.Context, opts ServiceOptions) *Service { workspaces: opts.WorkspaceService, tasks: opts.TaskService, cache: resource.NewTable(broker), - broker: broker, + Broker: broker, logger: opts.Logger, } // Whenever a workspace is added, pull its state go func() { - for event := range opts.WorkspaceService.Subscribe(ctx) { + for event := range opts.WorkspaceService.Subscribe() { if event.Type == resource.CreatedEvent { _, _ = svc.Reload(event.Payload.ID) } @@ -199,10 +200,6 @@ func (s *Service) Untaint(workspaceID resource.ID, addr ResourceAddress) (*task. }) } -func (s *Service) Subscribe(ctx context.Context) <-chan resource.Event[*State] { - return s.broker.Subscribe(ctx) -} - func (s *Service) createTask(workspaceID resource.ID, opts task.CreateOptions) (*task.Task, error) { ws, err := s.workspaces.Get(workspaceID) if err != nil { diff --git a/internal/task/enqueuer.go b/internal/task/enqueuer.go index 3bc45e54..71bf9138 100644 --- a/internal/task/enqueuer.go +++ b/internal/task/enqueuer.go @@ -1,8 +1,6 @@ package task import ( - "context" - "github.com/leg100/pug/internal/resource" ) @@ -12,9 +10,9 @@ type enqueuer struct { tasks taskLister } -func StartEnqueuer(ctx context.Context, tasks *Service) { +func StartEnqueuer(tasks *Service) { e := enqueuer{tasks: tasks} - sub := tasks.Broker.Subscribe(ctx) + sub := tasks.Broker.Subscribe() go func() { for range sub { diff --git a/internal/task/runner.go b/internal/task/runner.go index 4af96a2a..8cfb8a69 100644 --- a/internal/task/runner.go +++ b/internal/task/runner.go @@ -16,7 +16,7 @@ type runner struct { } func StartRunner(ctx context.Context, logger logging.Interface, tasks *Service, maxTasks int) func() error { - sub := tasks.Broker.Subscribe(ctx) + sub := tasks.Subscribe() r := &runner{ max: maxTasks, tasks: tasks, diff --git a/internal/task/service.go b/internal/task/service.go index 6941bab1..7492dcd6 100644 --- a/internal/task/service.go +++ b/internal/task/service.go @@ -1,7 +1,6 @@ package task import ( - "context" "slices" "github.com/leg100/pug/internal" @@ -11,12 +10,11 @@ import ( ) type Service struct { - Broker *pubsub.Broker[*Task] - table *resource.Table[*Task] counter *int logger logging.Interface + *pubsub.Broker[*Task] *factory } @@ -170,10 +168,6 @@ func (s *Service) Get(taskID resource.ID) (*Task, error) { return s.table.Get(taskID) } -func (s *Service) Subscribe(ctx context.Context) <-chan resource.Event[*Task] { - return s.Broker.Subscribe(ctx) -} - func (s *Service) Cancel(taskID resource.ID) (*Task, error) { task, err := s.table.Get(taskID) if err != nil { diff --git a/internal/workspace/service.go b/internal/workspace/service.go index f54f3653..8874cadd 100644 --- a/internal/workspace/service.go +++ b/internal/workspace/service.go @@ -2,7 +2,6 @@ package workspace import ( "bufio" - "context" "fmt" "io" "slices" @@ -16,12 +15,13 @@ import ( ) type Service struct { - broker *pubsub.Broker[*Workspace] table workspaceTable logger logging.Interface modules moduleService tasks *task.Service + + *pubsub.Broker[*Workspace] } type ServiceOptions struct { @@ -48,7 +48,7 @@ type moduleService interface { } type moduleSubscription interface { - Subscribe(ctx context.Context) <-chan resource.Event[*module.Module] + Subscribe() <-chan resource.Event[*module.Module] } func NewService(opts ServiceOptions) *Service { @@ -58,7 +58,7 @@ func NewService(opts ServiceOptions) *Service { opts.Logger.AddEnricher(&logEnricher{table: table}) svc := &Service{ - broker: broker, + Broker: broker, table: table, modules: opts.ModuleService, tasks: opts.TaskService, @@ -73,8 +73,8 @@ func NewService(opts ServiceOptions) *Service { // initialized (because `terraform workspace list` would fail) // * an existing module is updated, has been initialized, and does not yet have // a current workspace. -func (s *Service) LoadWorkspacesUponModuleLoad(ctx context.Context, ms moduleSubscription) { - sub := ms.Subscribe(ctx) +func (s *Service) LoadWorkspacesUponModuleLoad(ms moduleSubscription) { + sub := ms.Subscribe() go func() { for event := range sub { @@ -271,10 +271,6 @@ func (s *Service) List(opts ListOptions) []*Workspace { return existing } -func (s *Service) Subscribe(ctx context.Context) <-chan resource.Event[*Workspace] { - return s.broker.Subscribe(ctx) -} - func (s *Service) SetCurrentRun(workspaceID, runID resource.ID) error { ws, err := s.table.Update(workspaceID, func(existing *Workspace) error { existing.CurrentRunID = &runID