Skip to content

Commit

Permalink
kube: controller lifecycle improvements
Browse files Browse the repository at this point in the history
* Optimize the `Dockerfile` to use cache mounts
* Use the `context.Context` during HTTP operations
* Add an internal context for normal controller operations
* Sequence shutdown to wait for informer to stop and then
  attempt to unexpose any ports before exiting
  * Unexpose ports in parallel, we just log out all the
    errors regardless

Signed-off-by: Milas Bowman <[email protected]>
  • Loading branch information
milas committed May 12, 2023
1 parent dc331cb commit a7e2158
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 30 deletions.
18 changes: 14 additions & 4 deletions go/Dockerfile.kube-forwarder
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
# syntax=docker/dockerfile:1
FROM golang:1.19-alpine AS builder

WORKDIR /go/src/github.com/moby/vpnkit/go
COPY . /go/src/github.com/moby/vpnkit

RUN apk add --no-cache musl-dev build-base
RUN GOPATH=/go CGO_ENABLED=1 go build -buildmode pie -ldflags "-linkmode=external -s -extldflags \"-fno-PIC -static\"" -o /kube-vpnkit-forwarder /go/src/github.com/moby/vpnkit/go/cmd/kube-vpnkit-forwarder/main.go

# no separate go mod download step because vendoring is in use
COPY . /src
WORKDIR /src

RUN --mount=type=bind,target=. \
--mount=type=cache,target=/root/.cache \
--mount=type=cache,target=/go/pkg/mod \
CGO_ENABLED=1 go build \
-mod=vendor \
-buildmode pie \
-ldflags '-linkmode=external -s -extldflags "-fno-PIC -static"' \
-o /kube-vpnkit-forwarder \
./go/cmd/kube-vpnkit-forwarder

FROM scratch
COPY --link --from=builder /kube-vpnkit-forwarder /kube-vpnkit-forwarder
Expand Down
50 changes: 42 additions & 8 deletions go/cmd/kube-vpnkit-forwarder/main.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,42 @@
package main

import (
"context"
"flag"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/moby/vpnkit/go/pkg/controller"
"github.com/moby/vpnkit/go/pkg/vpnkit"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

const defaultLogLevel = log.InfoLevel

var path string
var logLevelName string

func main() {
flag.StringVar(&path, "path", "", "unix socket to vpnkit port forward API")
flag.StringVar(&logLevelName, "log-level", defaultLogLevel.String(), "log output level (error, warn, info, debug)")
flag.Parse()

if logLevel, err := log.ParseLevel(logLevelName); err == nil {
log.SetLevel(logLevel)
} else {
log.SetLevel(defaultLogLevel)
log.Warnf("Using default log level (%s): %v", defaultLogLevel.String(), err)
}

log.Println("Starting kube-vpnkit-forwarder...")

rootCtx := context.Background()

clusterConfig, err := rest.InClusterConfig()
if err != nil {
log.Fatal(err.Error())
Expand All @@ -40,18 +54,38 @@ func main() {
if err != nil {
log.Fatal(err)
}
controller := controller.New(vpnkitClient, clientset.CoreV1())
defer controller.Dispose()

informer.AddEventHandler(controller)
vpnkitController := controller.New(rootCtx, vpnkitClient, clientset.CoreV1())
if _, err := informer.AddEventHandler(vpnkitController); err != nil {
log.Fatal(err)
}

// stop signals to the informer to stop the controllers
// informerDone signals that the informer has actually stopped running
stop := make(chan struct{})
go informer.Run(stop)
informerDone := make(chan struct{})
go func() {
defer close(informerDone)
informer.Run(stop)
}()

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
<-signalChan

log.Println("Shutdown signal received, exiting...")
log.Println("Shutdown signal received")
close(stop)

// allow the informer a chance to stop cleanly
log.Println("Waiting for controller to finish")
select {
case <-time.After(10 * time.Second):
log.Warn("Controller shutdown timed out")
case <-informerDone:
}

// always attempt cleanup, even if the informer didn't stop nicely,
// we can still hopefully unexpose any open ports
log.Println("Cleaning up controller")
cleanupCtx, cancel := context.WithTimeout(rootCtx, 15*time.Second)
defer cancel()
vpnkitController.Dispose(cleanupCtx)
}
37 changes: 27 additions & 10 deletions go/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"sync"

"github.com/moby/vpnkit/go/pkg/vpnkit"
"github.com/pkg/errors"
Expand All @@ -20,34 +21,50 @@ const annotation = "vpnkit-k8s-controller"
type Controller struct {
services corev1client.ServicesGetter
client vpnkit.Client

internalCtx context.Context
cancel context.CancelFunc
}

// New creates a new controller
func New(client vpnkit.Client, services corev1client.ServicesGetter) *Controller {
func New(rootCtx context.Context, client vpnkit.Client, services corev1client.ServicesGetter) *Controller {
internalCtx, cancel := context.WithCancel(rootCtx)
return &Controller{
services: services,
client: client,
internalCtx: internalCtx,
cancel: cancel,
services: services,
client: client,
}
}

var _ cache.ResourceEventHandler = &Controller{}

// Dispose unexpose all ports previously exposed by this controller
func (c *Controller) Dispose() {
ctx := context.Background()
// Dispose unexposes all ports previously exposed by this controller
func (c *Controller) Dispose(ctx context.Context) {
// stop any ongoing operations using the internalCtx
c.cancel()

ports, err := c.client.ListExposed(ctx)
if err != nil {
log.Infof("Cannot list exposed ports: %v", err)
return
}
for _, port := range ports {
var wg sync.WaitGroup
for i := range ports {
port := ports[i]
if port.Annotation != annotation {
continue
}
if err := c.client.Unexpose(ctx, &port); err != nil {
log.Infof("cannot unexpose port: %v", err)
}
wg.Add(1)
go func() {
defer wg.Done()
log.Infof("Unexposing port: %s", port.String())
if err := c.client.Unexpose(ctx, &port); err != nil {
log.Infof("cannot unexpose port: %v", err)
}
}()
}
wg.Wait()
}

// OnAdd exposes port if necessary
Expand Down
24 changes: 16 additions & 8 deletions go/pkg/vpnkit/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (e *ExposeError) Error() string {
return e.Message
}

func (h *httpClient) Expose(_ context.Context, port *Port) error {
func (h *httpClient) Expose(ctx context.Context, port *Port) error {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
if err := enc.Encode(port); err != nil {
Expand All @@ -60,7 +60,7 @@ func (h *httpClient) Expose(_ context.Context, port *Port) error {
if port.Proto == Unix {
path = ExposePipePath
}
request, err := http.NewRequest("PUT", "http://unix"+path, &buf)
request, err := http.NewRequestWithContext(ctx, http.MethodPut, "http://unix"+path, &buf)
if err != nil {
return err
}
Expand All @@ -85,7 +85,7 @@ func (h *httpClient) Expose(_ context.Context, port *Port) error {
return nil
}

func (h *httpClient) Unexpose(_ context.Context, port *Port) error {
func (h *httpClient) Unexpose(ctx context.Context, port *Port) error {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
if err := enc.Encode(port); err != nil {
Expand All @@ -95,7 +95,7 @@ func (h *httpClient) Unexpose(_ context.Context, port *Port) error {
if port.Proto == Unix {
path = UnexposePipePath
}
request, err := http.NewRequest("DELETE", "http://unix"+path, &buf)
request, err := http.NewRequestWithContext(ctx, http.MethodDelete, "http://unix"+path, &buf)
if err != nil {
return err
}
Expand All @@ -111,8 +111,12 @@ func (h *httpClient) Unexpose(_ context.Context, port *Port) error {
return nil
}

func (h *httpClient) ListExposed(context.Context) ([]Port, error) {
res, err := h.client.Get("http://unix" + ListPath)
func (h *httpClient) ListExposed(ctx context.Context) ([]Port, error) {
request, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://unix"+ListPath, nil)
if err != nil {
return nil, err
}
res, err := h.client.Do(request)
if err != nil {
fmt.Printf("GET failed with %v\n", err)
return nil, err
Expand All @@ -129,8 +133,12 @@ func (h *httpClient) ListExposed(context.Context) ([]Port, error) {
return ports, nil
}

func (h *httpClient) DumpState(_ context.Context, w io.Writer) error {
res, err := h.client.Get("http://unix" + DumpStatePath)
func (h *httpClient) DumpState(ctx context.Context, w io.Writer) error {
request, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://unix"+DumpStatePath, nil)
if err != nil {
return err
}
res, err := h.client.Do(request)
if err != nil {
fmt.Printf("GET failed with %v\n", err)
return err
Expand Down

0 comments on commit a7e2158

Please sign in to comment.