diff --git a/assets/auto-imports.d.ts b/assets/auto-imports.d.ts index bc57bf0645d..ebae61d0b12 100644 --- a/assets/auto-imports.d.ts +++ b/assets/auto-imports.d.ts @@ -230,6 +230,7 @@ declare global { const useGamepad: typeof import('@vueuse/core')['useGamepad'] const useGeolocation: typeof import('@vueuse/core')['useGeolocation'] const useHead: typeof import('@vueuse/head')['useHead'] + const useHosts: typeof import('./stores/hosts')['useHosts'] const useI18n: typeof import('vue-i18n')['useI18n'] const useIdle: typeof import('@vueuse/core')['useIdle'] const useImage: typeof import('@vueuse/core')['useImage'] @@ -587,6 +588,7 @@ declare module 'vue' { readonly useGamepad: UnwrapRef readonly useGeolocation: UnwrapRef readonly useHead: UnwrapRef + readonly useHosts: UnwrapRef readonly useI18n: UnwrapRef readonly useIdle: UnwrapRef readonly useImage: UnwrapRef @@ -937,6 +939,7 @@ declare module '@vue/runtime-core' { readonly useGamepad: UnwrapRef readonly useGeolocation: UnwrapRef readonly useHead: UnwrapRef + readonly useHosts: UnwrapRef readonly useI18n: UnwrapRef readonly useIdle: UnwrapRef readonly useImage: UnwrapRef diff --git a/assets/components/SideMenu.vue b/assets/components/SideMenu.vue index c90eef0efb3..e885cca6c55 100644 --- a/assets/components/SideMenu.vue +++ b/assets/components/SideMenu.vue @@ -10,10 +10,11 @@ @@ -68,6 +69,7 @@ import { sessionHost } from "@/composable/storage"; const store = useContainerStore(); const { activeContainers, visibleContainers, ready } = storeToRefs(store); +const { hosts } = useHosts(); function setHost(host: string | null) { sessionHost.value = host; @@ -116,16 +118,6 @@ const menuItems = computed(() => { } }); -const hosts = computed(() => - config.hosts.reduce( - (acc, item) => { - acc[item.id] = item; - return acc; - }, - {} as Record, - ), -); - const activeContainersById = computed(() => activeContainers.value.reduce( (acc, item) => { diff --git a/assets/stores/container.ts b/assets/stores/container.ts index d145e39ea45..17748968129 100644 --- a/assets/stores/container.ts +++ b/assets/stores/container.ts @@ -5,6 +5,7 @@ import { Container } from "@/models/Container"; import i18n from "@/modules/i18n"; const { showToast } = useToast(); +const { markHostAvailable } = useHosts(); // @ts-ignore const { t } = i18n.global; @@ -68,6 +69,11 @@ export const useContainerStore = defineStore("container", () => { } }); + es.addEventListener("host-unavailable", (e) => { + const hostId = (e as MessageEvent).data; + markHostAvailable(hostId, false); + }); + es.addEventListener("container-health", (e) => { const event = JSON.parse((e as MessageEvent).data) as { actorId: string; health: ContainerHealth }; const container = allContainersById.value[event.actorId]; diff --git a/assets/stores/hosts.ts b/assets/stores/hosts.ts new file mode 100644 index 00000000000..1bc16fbec74 --- /dev/null +++ b/assets/stores/hosts.ts @@ -0,0 +1,25 @@ +type Host = { + name: string; + id: string; + available: boolean; +}; +const hosts = computed(() => + config.hosts.reduce( + (acc, item) => { + acc[item.id] = { ...item, available: true }; + return acc; + }, + {} as Record, + ), +); + +const markHostAvailable = (id: string, available: boolean) => { + hosts.value[id].available = available; +}; + +export function useHosts() { + return { + hosts, + markHostAvailable, + }; +} diff --git a/docker-compose.yml b/docker-compose.yml index 9a3dc38ef48..1c78ecaa9c2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -44,7 +44,8 @@ services: build: context: . depends_on: - - proxy + proxy: + condition: service_healthy proxy: container_name: proxy @@ -53,6 +54,11 @@ services: - /var/run/docker.sock:/var/run/docker.sock:ro environment: - CONTAINERS=1 + healthcheck: + test: ["CMD", "nc", "-z", "127.0.0.1", "2375"] + interval: 30s + retries: 5 + start_period: 5s ports: - 2375:2375 diff --git a/internal/docker/client.go b/internal/docker/client.go index 2e18b8bd386..4d503b7605c 100644 --- a/internal/docker/client.go +++ b/internal/docker/client.go @@ -61,7 +61,7 @@ type Client interface { ListContainers() ([]Container, error) FindContainer(string) (Container, error) ContainerLogs(context.Context, string, string, StdType) (io.ReadCloser, error) - Events(context.Context, chan<- ContainerEvent) <-chan error + Events(context.Context, chan<- ContainerEvent) error ContainerLogsBetweenDates(context.Context, string, time.Time, time.Time, StdType) (io.ReadCloser, error) ContainerStats(context.Context, string, chan<- ContainerStat) error Ping(context.Context) (types.Ping, error) @@ -297,35 +297,27 @@ func (d *_client) ContainerLogs(ctx context.Context, id string, since string, st return reader, nil } -func (d *_client) Events(ctx context.Context, messages chan<- ContainerEvent) <-chan error { - dockerMessages, errors := d.cli.Events(ctx, types.EventsOptions{}) +func (d *_client) Events(ctx context.Context, messages chan<- ContainerEvent) error { + dockerMessages, err := d.cli.Events(ctx, types.EventsOptions{}) - go func() { - - for { - select { - case <-ctx.Done(): - return - case err := <-errors: - log.Fatalf("error while listening to docker events: %v. Exiting...", err) - case message, ok := <-dockerMessages: - if !ok { - log.Errorf("docker events channel closed") - return - } + for { + select { + case <-ctx.Done(): + return nil + case err := <-err: + return err - if message.Type == "container" && len(message.Actor.ID) > 0 { - messages <- ContainerEvent{ - ActorID: message.Actor.ID[:12], - Name: string(message.Action), - Host: d.host.ID, - } + case message := <-dockerMessages: + if message.Type == "container" && len(message.Actor.ID) > 0 { + messages <- ContainerEvent{ + ActorID: message.Actor.ID[:12], + Name: string(message.Action), + Host: d.host.ID, } } } - }() + } - return errors } func (d *_client) ContainerLogsBetweenDates(ctx context.Context, id string, from time.Time, to time.Time, stdType StdType) (io.ReadCloser, error) { diff --git a/internal/docker/container_store.go b/internal/docker/container_store.go index a4ea06d1ddb..98c2199d9b2 100644 --- a/internal/docker/container_store.go +++ b/internal/docker/container_store.go @@ -2,7 +2,9 @@ package docker import ( "context" + "errors" "sync" + "sync/atomic" "github.com/puzpuzpuz/xsync/v3" log "github.com/sirupsen/logrus" @@ -14,6 +16,9 @@ type ContainerStore struct { client Client statsCollector *StatsCollector wg sync.WaitGroup + connected atomic.Bool + events chan ContainerEvent + ctx context.Context } func NewContainerStore(ctx context.Context, client Client) *ContainerStore { @@ -23,24 +28,54 @@ func NewContainerStore(ctx context.Context, client Client) *ContainerStore { subscribers: xsync.NewMapOf[context.Context, chan ContainerEvent](), statsCollector: NewStatsCollector(client), wg: sync.WaitGroup{}, + events: make(chan ContainerEvent), + ctx: ctx, } s.wg.Add(1) - go s.init(ctx) + go s.init() return s } -func (s *ContainerStore) List() []Container { +func (s *ContainerStore) checkConnectivity() error { + if s.connected.CompareAndSwap(false, true) { + go func() { + log.Debugf("subscribing to docker events from container store %s", s.client.Host()) + err := s.client.Events(s.ctx, s.events) + if !errors.Is(err, context.Canceled) { + log.Errorf("docker store unexpectedly disconnected from docker events from %s with %v", s.client.Host(), err) + } + s.connected.Store(false) + }() + + if containers, err := s.client.ListContainers(); err != nil { + return err + } else { + s.containers.Clear() + for _, c := range containers { + s.containers.Store(c.ID, &c) + } + } + } + + return nil +} + +func (s *ContainerStore) List() ([]Container, error) { s.wg.Wait() + + if err := s.checkConnectivity(); err != nil { + return nil, err + } containers := make([]Container, 0) s.containers.Range(func(_ string, c *Container) bool { containers = append(containers, *c) return true }) - return containers + return containers, nil } func (s *ContainerStore) Client() Client { @@ -49,7 +84,7 @@ func (s *ContainerStore) Client() Client { func (s *ContainerStore) Subscribe(ctx context.Context, events chan ContainerEvent) { go func() { - if s.statsCollector.Start(context.Background()) { + if s.statsCollector.Start(s.ctx) { log.Debug("clearing container stats as stats collector has been stopped") s.containers.Range(func(_ string, c *Container) bool { c.Stats.Clear() @@ -57,6 +92,7 @@ func (s *ContainerStore) Subscribe(ctx context.Context, events chan ContainerEve }) } }() + s.subscribers.Store(ctx, events) } @@ -69,26 +105,17 @@ func (s *ContainerStore) SubscribeStats(ctx context.Context, stats chan Containe s.statsCollector.Subscribe(ctx, stats) } -func (s *ContainerStore) init(ctx context.Context) { - events := make(chan ContainerEvent) - s.client.Events(ctx, events) - +func (s *ContainerStore) init() { stats := make(chan ContainerStat) - s.statsCollector.Subscribe(ctx, stats) + s.statsCollector.Subscribe(s.ctx, stats) - if containers, err := s.client.ListContainers(); err == nil { - for _, c := range containers { - s.containers.Store(c.ID, &c) - } - } else { - log.Fatalf("error listing containers: %v", err) - } + s.checkConnectivity() s.wg.Done() for { select { - case event := <-events: + case event := <-s.events: log.Tracef("received event: %+v", event) switch event.Name { case "start": @@ -129,7 +156,7 @@ func (s *ContainerStore) init(ctx context.Context) { stat.ID = "" container.Stats.Push(stat) } - case <-ctx.Done(): + case <-s.ctx.Done(): return } } diff --git a/internal/docker/container_store_test.go b/internal/docker/container_store_test.go index 70fc9eeecbb..c696e31f57f 100644 --- a/internal/docker/container_store_test.go +++ b/internal/docker/container_store_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/amir20/dozzle/internal/utils" "github.com/magiconair/properties/assert" "github.com/stretchr/testify/mock" ) @@ -23,9 +24,9 @@ func (m *mockedClient) FindContainer(id string) (Container, error) { return args.Get(0).(Container), args.Error(1) } -func (m *mockedClient) Events(ctx context.Context, events chan<- ContainerEvent) <-chan error { +func (m *mockedClient) Events(ctx context.Context, events chan<- ContainerEvent) error { args := m.Called(ctx, events) - return args.Get(0).(chan error) + return args.Error(0) } func (m *mockedClient) ContainerStats(ctx context.Context, id string, stats chan<- ContainerStat) error { @@ -33,6 +34,11 @@ func (m *mockedClient) ContainerStats(ctx context.Context, id string, stats chan return args.Error(0) } +func (m *mockedClient) Host() *Host { + args := m.Called() + return args.Get(0).(*Host) +} + func TestContainerStore_List(t *testing.T) { client := new(mockedClient) @@ -42,12 +48,18 @@ func TestContainerStore_List(t *testing.T) { Name: "test", }, }, nil) - client.On("Events", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(make(chan error)) + client.On("Events", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(nil).Run(func(args mock.Arguments) { + ctx := args.Get(0).(context.Context) + <-ctx.Done() + }) + client.On("Host").Return(&Host{ + ID: "localhost", + }) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) store := NewContainerStore(ctx, client) - containers := store.List() + containers, _ := store.List() assert.Equal(t, containers[0].ID, "1234") } @@ -59,22 +71,24 @@ func TestContainerStore_die(t *testing.T) { ID: "1234", Name: "test", State: "running", + Stats: utils.NewRingBuffer[ContainerStat](300), }, }, nil) - client.On("Events", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(make(chan error)). + client.On("Events", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(nil). Run(func(args mock.Arguments) { ctx := args.Get(0).(context.Context) events := args.Get(1).(chan<- ContainerEvent) - go func() { - events <- ContainerEvent{ - Name: "die", - ActorID: "1234", - Host: "localhost", - } - <-ctx.Done() - }() + events <- ContainerEvent{ + Name: "die", + ActorID: "1234", + Host: "localhost", + } + <-ctx.Done() }) + client.On("Host").Return(&Host{ + ID: "localhost", + }) client.On("ContainerStats", mock.Anything, "1234", mock.AnythingOfType("chan<- docker.ContainerStat")).Return(nil) @@ -87,6 +101,6 @@ func TestContainerStore_die(t *testing.T) { store.Subscribe(ctx, events) <-events - containers := store.List() + containers, _ := store.List() assert.Equal(t, containers[0].State, "exited") } diff --git a/internal/docker/event_generator_test.go b/internal/docker/event_generator_test.go index 4a5cc1f79eb..8e247335ee3 100644 --- a/internal/docker/event_generator_test.go +++ b/internal/docker/event_generator_test.go @@ -152,6 +152,5 @@ func Benchmark_readEvent(b *testing.B) { for i := 0; i < b.N; i++ { readEvent(reader, true) - // println(message, stream) } } diff --git a/internal/docker/host.go b/internal/docker/host.go index e45e803c30e..7cbb9d7791f 100644 --- a/internal/docker/host.go +++ b/internal/docker/host.go @@ -19,6 +19,10 @@ type Host struct { ValidCerts bool `json:"-"` } +func (h *Host) String() string { + return h.ID +} + func ParseConnection(connection string) (Host, error) { parts := strings.Split(connection, "|") if len(parts) > 2 { diff --git a/internal/docker/stats_collector.go b/internal/docker/stats_collector.go index 74dd8d77c2a..a3b29f4422a 100644 --- a/internal/docker/stats_collector.go +++ b/internal/docker/stats_collector.go @@ -44,7 +44,7 @@ func (c *StatsCollector) forceStop() { if c.stopper != nil { c.stopper() c.stopper = nil - log.Debug("stopping container stats collector due to inactivity") + log.Debug("stopping container stats collector") } } @@ -52,7 +52,7 @@ func (c *StatsCollector) Stop() { c.mu.Lock() defer c.mu.Unlock() if c.totalStarted.Add(-1) == 0 { - log.Debug("scheduled to stop container stats collector") + log.Debugf("scheduled to stop container stats collector %s", c.client.Host()) c.timer = time.AfterFunc(timeToStop, func() { c.forceStop() }) @@ -62,7 +62,7 @@ func (c *StatsCollector) Stop() { func (c *StatsCollector) reset() { c.mu.Lock() defer c.mu.Unlock() - log.Debug("resetting timer for container stats collector") + log.Debugf("resetting timer for container stats collector %s", c.client.Host()) if c.timer != nil { c.timer.Stop() } @@ -87,7 +87,6 @@ func (sc *StatsCollector) Start(parentCtx context.Context) bool { sc.totalStarted.Add(1) var ctx context.Context - sc.mu.Lock() if sc.stopper != nil { sc.mu.Unlock() @@ -106,9 +105,18 @@ func (sc *StatsCollector) Start(parentCtx context.Context) bool { log.Errorf("error while listing containers: %v", err) } + events := make(chan ContainerEvent) + + go func() { + log.Debugf("subscribing to docker events from stats collector %s", sc.client.Host()) + err := sc.client.Events(context.Background(), events) + if !errors.Is(err, context.Canceled) { + log.Errorf("stats collector unexpectedly disconnected from docker events from %s with %v", sc.client.Host(), err) + } + sc.forceStop() + }() + go func() { - events := make(chan ContainerEvent) - sc.client.Events(ctx, events) for event := range events { switch event.Name { case "start": diff --git a/internal/docker/stats_collector_test.go b/internal/docker/stats_collector_test.go index ce84131045a..504c4294ea9 100644 --- a/internal/docker/stats_collector_test.go +++ b/internal/docker/stats_collector_test.go @@ -17,7 +17,12 @@ func startedCollector(ctx context.Context) *StatsCollector { State: "running", }, }, nil) - client.On("Events", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(make(chan error)) + client.On("Events", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")). + Return(nil). + Run(func(args mock.Arguments) { + ctx := args.Get(0).(context.Context) + <-ctx.Done() + }) client.On("ContainerStats", mock.Anything, mock.Anything, mock.AnythingOfType("chan<- docker.ContainerStat")). Return(nil). Run(func(args mock.Arguments) { @@ -26,6 +31,9 @@ func startedCollector(ctx context.Context) *StatsCollector { ID: "1234", } }) + client.On("Host").Return(&Host{ + ID: "localhost", + }) collector := NewStatsCollector(client) stats := make(chan ContainerStat) diff --git a/internal/web/events.go b/internal/web/events.go index efa8e222c09..bd3534bb32b 100644 --- a/internal/web/events.go +++ b/internal/web/events.go @@ -44,7 +44,15 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) { stats := make(chan docker.ContainerStat) for _, store := range h.stores { - allContainers = append(allContainers, store.List()...) + if containers, err := store.List(); err == nil { + allContainers = append(allContainers, containers...) + } else { + log.Errorf("error listing containers: %v", err) + + if _, err := fmt.Fprintf(w, "event: host-unavailable\ndata: %s\n\n", store.Client().Host().ID); err != nil { + log.Errorf("error writing event to event stream: %v", err) + } + } store.SubscribeStats(ctx, stats) store.Subscribe(ctx, events) } @@ -86,10 +94,11 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) { case "start", "die": if event.Name == "start" { log.Debugf("found new container with id: %v", event.ActorID) - containers := h.stores[event.Host].List() - if err := sendContainersJSON(containers, w); err != nil { - log.Errorf("error encoding containers to stream: %v", err) - return + if containers, err := h.stores[event.Host].List(); err == nil { + if err := sendContainersJSON(containers, w); err != nil { + log.Errorf("error encoding containers to stream: %v", err) + return + } } } diff --git a/internal/web/events_test.go b/internal/web/events_test.go index 63087d07a34..0dcb220139e 100644 --- a/internal/web/events_test.go +++ b/internal/web/events_test.go @@ -21,26 +21,24 @@ func Test_handler_streamEvents_happy(t *testing.T) { require.NoError(t, err, "NewRequest should not return an error.") mockedClient := new(MockedClient) - errChannel := make(chan error) mockedClient.On("ListContainers").Return([]docker.Container{}, nil) - mockedClient.On("Events", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(errChannel).Run(func(args mock.Arguments) { + mockedClient.On("Events", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(nil).Run(func(args mock.Arguments) { messages := args.Get(1).(chan<- docker.ContainerEvent) - go func() { - time.Sleep(50 * time.Millisecond) - messages <- docker.ContainerEvent{ - Name: "start", - ActorID: "1234", - Host: "localhost", - } - messages <- docker.ContainerEvent{ - Name: "something-random", - ActorID: "1234", - Host: "localhost", - } - time.Sleep(50 * time.Millisecond) - cancel() - }() + + time.Sleep(50 * time.Millisecond) + messages <- docker.ContainerEvent{ + Name: "start", + ActorID: "1234", + Host: "localhost", + } + messages <- docker.ContainerEvent{ + Name: "something-random", + ActorID: "1234", + Host: "localhost", + } + time.Sleep(50 * time.Millisecond) + cancel() }) mockedClient.On("FindContainer", "1234").Return(docker.Container{ ID: "1234", @@ -49,6 +47,10 @@ func Test_handler_streamEvents_happy(t *testing.T) { Stats: utils.NewRingBuffer[docker.ContainerStat](300), // 300 seconds of stats }, nil) + mockedClient.On("Host").Return(&docker.Host{ + ID: "localhost", + }) + clients := map[string]docker.Client{ "localhost": mockedClient, } diff --git a/internal/web/routes_test.go b/internal/web/routes_test.go index 9bdaafeeef7..bef64b724f5 100644 --- a/internal/web/routes_test.go +++ b/internal/web/routes_test.go @@ -40,9 +40,9 @@ func (m *MockedClient) ContainerLogs(ctx context.Context, id string, since strin return args.Get(0).(io.ReadCloser), args.Error(1) } -func (m *MockedClient) Events(ctx context.Context, events chan<- docker.ContainerEvent) <-chan error { +func (m *MockedClient) Events(ctx context.Context, events chan<- docker.ContainerEvent) error { args := m.Called(ctx, events) - return args.Get(0).(chan error) + return args.Error(0) } func (m *MockedClient) ContainerStats(context.Context, string, chan<- docker.ContainerStat) error {