Skip to content

Commit

Permalink
fix: tries to reconnect of remote host disconnects (#2876)
Browse files Browse the repository at this point in the history
  • Loading branch information
amir20 authored Apr 8, 2024
1 parent 3d2036c commit 83f488e
Show file tree
Hide file tree
Showing 15 changed files with 196 additions and 101 deletions.
3 changes: 3 additions & 0 deletions assets/auto-imports.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ declare global {
const useGamepad: typeof import('@vueuse/core')['useGamepad']
const useGeolocation: typeof import('@vueuse/core')['useGeolocation']
const useHead: typeof import('@vueuse/head')['useHead']
const useHosts: typeof import('./stores/hosts')['useHosts']
const useI18n: typeof import('vue-i18n')['useI18n']
const useIdle: typeof import('@vueuse/core')['useIdle']
const useImage: typeof import('@vueuse/core')['useImage']
Expand Down Expand Up @@ -587,6 +588,7 @@ declare module 'vue' {
readonly useGamepad: UnwrapRef<typeof import('@vueuse/core')['useGamepad']>
readonly useGeolocation: UnwrapRef<typeof import('@vueuse/core')['useGeolocation']>
readonly useHead: UnwrapRef<typeof import('@vueuse/head')['useHead']>
readonly useHosts: UnwrapRef<typeof import('./stores/hosts')['useHosts']>
readonly useI18n: UnwrapRef<typeof import('vue-i18n')['useI18n']>
readonly useIdle: UnwrapRef<typeof import('@vueuse/core')['useIdle']>
readonly useImage: UnwrapRef<typeof import('@vueuse/core')['useImage']>
Expand Down Expand Up @@ -937,6 +939,7 @@ declare module '@vue/runtime-core' {
readonly useGamepad: UnwrapRef<typeof import('@vueuse/core')['useGamepad']>
readonly useGeolocation: UnwrapRef<typeof import('@vueuse/core')['useGeolocation']>
readonly useHead: UnwrapRef<typeof import('@vueuse/head')['useHead']>
readonly useHosts: UnwrapRef<typeof import('./stores/hosts')['useHosts']>
readonly useI18n: UnwrapRef<typeof import('vue-i18n')['useI18n']>
readonly useIdle: UnwrapRef<typeof import('@vueuse/core')['useIdle']>
readonly useImage: UnwrapRef<typeof import('@vueuse/core')['useImage']>
Expand Down
16 changes: 4 additions & 12 deletions assets/components/SideMenu.vue
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
</div>
<transition :name="sessionHost ? 'slide-left' : 'slide-right'" mode="out-in">
<ul class="menu p-0" v-if="!sessionHost">
<li v-for="host in config.hosts">
<a @click.prevent="setHost(host.id)">
<li v-for="host in hosts">
<a @click.prevent="setHost(host.id)" :class="{ 'pointer-events-none text-base-content/50': !host.available }">
<ph:computer-tower />
{{ host.name }}
<span class="badge badge-error badge-xs p-1.5" v-if="!host.available">offline</span>
</a>
</li>
</ul>
Expand Down Expand Up @@ -68,6 +69,7 @@ import { sessionHost } from "@/composable/storage";
const store = useContainerStore();
const { activeContainers, visibleContainers, ready } = storeToRefs(store);
const { hosts } = useHosts();
function setHost(host: string | null) {
sessionHost.value = host;
Expand Down Expand Up @@ -116,16 +118,6 @@ const menuItems = computed(() => {
}
});
const hosts = computed(() =>
config.hosts.reduce(
(acc, item) => {
acc[item.id] = item;
return acc;
},
{} as Record<string, { name: string; id: string }>,
),
);
const activeContainersById = computed(() =>
activeContainers.value.reduce(
(acc, item) => {
Expand Down
6 changes: 6 additions & 0 deletions assets/stores/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Container } from "@/models/Container";
import i18n from "@/modules/i18n";

const { showToast } = useToast();
const { markHostAvailable } = useHosts();
// @ts-ignore
const { t } = i18n.global;

Expand Down Expand Up @@ -68,6 +69,11 @@ export const useContainerStore = defineStore("container", () => {
}
});

es.addEventListener("host-unavailable", (e) => {
const hostId = (e as MessageEvent).data;
markHostAvailable(hostId, false);
});

es.addEventListener("container-health", (e) => {
const event = JSON.parse((e as MessageEvent).data) as { actorId: string; health: ContainerHealth };
const container = allContainersById.value[event.actorId];
Expand Down
25 changes: 25 additions & 0 deletions assets/stores/hosts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
type Host = {
name: string;
id: string;
available: boolean;
};
const hosts = computed(() =>
config.hosts.reduce(
(acc, item) => {
acc[item.id] = { ...item, available: true };
return acc;
},
{} as Record<string, Host>,
),
);

const markHostAvailable = (id: string, available: boolean) => {
hosts.value[id].available = available;
};

export function useHosts() {
return {
hosts,
markHostAvailable,
};
}
8 changes: 7 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ services:
build:
context: .
depends_on:
- proxy
proxy:
condition: service_healthy

proxy:
container_name: proxy
Expand All @@ -53,6 +54,11 @@ services:
- /var/run/docker.sock:/var/run/docker.sock:ro
environment:
- CONTAINERS=1
healthcheck:
test: ["CMD", "nc", "-z", "127.0.0.1", "2375"]
interval: 30s
retries: 5
start_period: 5s
ports:
- 2375:2375

Expand Down
40 changes: 16 additions & 24 deletions internal/docker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type Client interface {
ListContainers() ([]Container, error)
FindContainer(string) (Container, error)
ContainerLogs(context.Context, string, string, StdType) (io.ReadCloser, error)
Events(context.Context, chan<- ContainerEvent) <-chan error
Events(context.Context, chan<- ContainerEvent) error
ContainerLogsBetweenDates(context.Context, string, time.Time, time.Time, StdType) (io.ReadCloser, error)
ContainerStats(context.Context, string, chan<- ContainerStat) error
Ping(context.Context) (types.Ping, error)
Expand Down Expand Up @@ -297,35 +297,27 @@ func (d *_client) ContainerLogs(ctx context.Context, id string, since string, st
return reader, nil
}

func (d *_client) Events(ctx context.Context, messages chan<- ContainerEvent) <-chan error {
dockerMessages, errors := d.cli.Events(ctx, types.EventsOptions{})
func (d *_client) Events(ctx context.Context, messages chan<- ContainerEvent) error {
dockerMessages, err := d.cli.Events(ctx, types.EventsOptions{})

go func() {

for {
select {
case <-ctx.Done():
return
case err := <-errors:
log.Fatalf("error while listening to docker events: %v. Exiting...", err)
case message, ok := <-dockerMessages:
if !ok {
log.Errorf("docker events channel closed")
return
}
for {
select {
case <-ctx.Done():
return nil
case err := <-err:
return err

if message.Type == "container" && len(message.Actor.ID) > 0 {
messages <- ContainerEvent{
ActorID: message.Actor.ID[:12],
Name: string(message.Action),
Host: d.host.ID,
}
case message := <-dockerMessages:
if message.Type == "container" && len(message.Actor.ID) > 0 {
messages <- ContainerEvent{
ActorID: message.Actor.ID[:12],
Name: string(message.Action),
Host: d.host.ID,
}
}
}
}()
}

return errors
}

func (d *_client) ContainerLogsBetweenDates(ctx context.Context, id string, from time.Time, to time.Time, stdType StdType) (io.ReadCloser, error) {
Expand Down
63 changes: 45 additions & 18 deletions internal/docker/container_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package docker

import (
"context"
"errors"
"sync"
"sync/atomic"

"github.com/puzpuzpuz/xsync/v3"
log "github.com/sirupsen/logrus"
Expand All @@ -14,6 +16,9 @@ type ContainerStore struct {
client Client
statsCollector *StatsCollector
wg sync.WaitGroup
connected atomic.Bool
events chan ContainerEvent
ctx context.Context
}

func NewContainerStore(ctx context.Context, client Client) *ContainerStore {
Expand All @@ -23,24 +28,54 @@ func NewContainerStore(ctx context.Context, client Client) *ContainerStore {
subscribers: xsync.NewMapOf[context.Context, chan ContainerEvent](),
statsCollector: NewStatsCollector(client),
wg: sync.WaitGroup{},
events: make(chan ContainerEvent),
ctx: ctx,
}

s.wg.Add(1)

go s.init(ctx)
go s.init()

return s
}

func (s *ContainerStore) List() []Container {
func (s *ContainerStore) checkConnectivity() error {
if s.connected.CompareAndSwap(false, true) {
go func() {
log.Debugf("subscribing to docker events from container store %s", s.client.Host())
err := s.client.Events(s.ctx, s.events)
if !errors.Is(err, context.Canceled) {
log.Errorf("docker store unexpectedly disconnected from docker events from %s with %v", s.client.Host(), err)
}
s.connected.Store(false)
}()

if containers, err := s.client.ListContainers(); err != nil {
return err
} else {
s.containers.Clear()
for _, c := range containers {
s.containers.Store(c.ID, &c)
}
}
}

return nil
}

func (s *ContainerStore) List() ([]Container, error) {
s.wg.Wait()

if err := s.checkConnectivity(); err != nil {
return nil, err
}
containers := make([]Container, 0)
s.containers.Range(func(_ string, c *Container) bool {
containers = append(containers, *c)
return true
})

return containers
return containers, nil
}

func (s *ContainerStore) Client() Client {
Expand All @@ -49,14 +84,15 @@ func (s *ContainerStore) Client() Client {

func (s *ContainerStore) Subscribe(ctx context.Context, events chan ContainerEvent) {
go func() {
if s.statsCollector.Start(context.Background()) {
if s.statsCollector.Start(s.ctx) {
log.Debug("clearing container stats as stats collector has been stopped")
s.containers.Range(func(_ string, c *Container) bool {
c.Stats.Clear()
return true
})
}
}()

s.subscribers.Store(ctx, events)
}

Expand All @@ -69,26 +105,17 @@ func (s *ContainerStore) SubscribeStats(ctx context.Context, stats chan Containe
s.statsCollector.Subscribe(ctx, stats)
}

func (s *ContainerStore) init(ctx context.Context) {
events := make(chan ContainerEvent)
s.client.Events(ctx, events)

func (s *ContainerStore) init() {
stats := make(chan ContainerStat)
s.statsCollector.Subscribe(ctx, stats)
s.statsCollector.Subscribe(s.ctx, stats)

if containers, err := s.client.ListContainers(); err == nil {
for _, c := range containers {
s.containers.Store(c.ID, &c)
}
} else {
log.Fatalf("error listing containers: %v", err)
}
s.checkConnectivity()

s.wg.Done()

for {
select {
case event := <-events:
case event := <-s.events:
log.Tracef("received event: %+v", event)
switch event.Name {
case "start":
Expand Down Expand Up @@ -129,7 +156,7 @@ func (s *ContainerStore) init(ctx context.Context) {
stat.ID = ""
container.Stats.Push(stat)
}
case <-ctx.Done():
case <-s.ctx.Done():
return
}
}
Expand Down
Loading

0 comments on commit 83f488e

Please sign in to comment.