From af4cd7c3aab73927a8f8d3f9d9a5b5b0645a5e53 Mon Sep 17 00:00:00 2001 From: Frantjc Date: Mon, 8 Apr 2024 23:34:15 -0400 Subject: [PATCH] use http hijacking instead of raw tcp for proxy. allow disabling of docker proxy --- command/forge.go | 2 + internal/command/sleep.go | 2 +- .../create_sleeping_container.go | 8 +- internal/dind/proxy.go | 274 +++++++++--------- 4 files changed, 148 insertions(+), 138 deletions(-) diff --git a/command/forge.go b/command/forge.go index 227218a8..18291ce5 100644 --- a/command/forge.go +++ b/command/forge.go @@ -4,6 +4,7 @@ import ( "runtime" "github.com/frantjc/forge" + "github.com/frantjc/forge/internal/containerutil" "github.com/spf13/cobra" ) @@ -27,6 +28,7 @@ func NewForge() *cobra.Command { cmd.SetVersionTemplate("{{ .Name }}{{ .Version }} " + runtime.Version() + "\n") cmd.PersistentFlags().CountVarP(&verbosity, "verbose", "V", "verbosity for forge") + cmd.PersistentFlags().BoolVar(&containerutil.NoUseForgeSock, "no-forge-sock", false, "disable use of forge.sock") cmd.AddCommand(NewUse(), NewGet(), NewPut(), NewCheck(), NewTask(), NewCloudBuild(), NewCache()) return cmd diff --git a/internal/command/sleep.go b/internal/command/sleep.go index 610d787b..5a2bdabe 100644 --- a/internal/command/sleep.go +++ b/internal/command/sleep.go @@ -30,7 +30,7 @@ func NewSleep() *cobra.Command { if dockerHost := os.Getenv(client.EnvOverrideHost); dockerHost != "" { if dockerSock, err := url.Parse(dockerHost); err == nil { - return dind.NewProxy(ctx, mounts, lis, dockerSock) + return dind.ServeDockerdProxy(ctx, mounts, lis, dockerSock) } } } diff --git a/internal/containerutil/create_sleeping_container.go b/internal/containerutil/create_sleeping_container.go index 5be3166b..16fd6993 100644 --- a/internal/containerutil/create_sleeping_container.go +++ b/internal/containerutil/create_sleeping_container.go @@ -11,8 +11,14 @@ import ( "github.com/frantjc/forge/internal/hooks" ) +var NoUseForgeSock bool + func CreateSleepingContainer(ctx context.Context, containerRuntime forge.ContainerRuntime, image forge.Image, containerConfig *forge.ContainerConfig) (forge.Container, error) { - entrypoint := []string{bin.ShimPath, "sleep", "--sock", containerfs.ForgeSock} + entrypoint := []string{bin.ShimPath, "sleep"} + + if !NoUseForgeSock { + entrypoint = append(entrypoint, "--sock", containerfs.ForgeSock) + } for _, mount := range containerConfig.Mounts { if mount.Source != "" && mount.Destination != "" { diff --git a/internal/dind/proxy.go b/internal/dind/proxy.go index 980fa39d..6c4df5c1 100644 --- a/internal/dind/proxy.go +++ b/internal/dind/proxy.go @@ -5,21 +5,22 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "io" "net" "net/http" "net/url" + "os" "path/filepath" "strings" + "sync" + "time" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" - xslice "github.com/frantjc/x/slice" ) -// NewProxy listens on the given listener for traffic intended for `dockerd`. +// ServeDockerdProxy listens on the given listener for traffic intended for `dockerd`. // It proxies responses back from `dockerd` directly, but modifies requests // to `dockerd` to try to translate them for the host `dockerd` when the // client is calling from inside of a container. @@ -31,11 +32,11 @@ import ( // // It always returns an error and doesn't exit until the given context.Context // is done or an error is encountered, similar to http.Serve. -func NewProxy(ctx context.Context, mounts map[string]string, lis net.Listener, dockerSock *url.URL) error { +func ServeDockerdProxy(ctx context.Context, mounts map[string]string, lis net.Listener, dockerSock *url.URL) error { var ( - errC = make(chan error) network = "tcp" address = dockerSock.Host + errC = make(chan error, 1) ) if strings.EqualFold("unix", dockerSock.Scheme) { @@ -44,154 +45,155 @@ func NewProxy(ctx context.Context, mounts map[string]string, lis net.Listener, d } go func() { - errC <- func() error { - for { - cli, err := lis.Accept() - if err != nil { - return err - } - - dockerd, err := net.Dial(network, address) + errC <- (&http.Server{ + ReadHeaderTimeout: time.Second * 5, + BaseContext: func(_ net.Listener) context.Context { + return ctx + }, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var ( + statusCode = http.StatusInternalServerError + errC2 = make(chan error, 1) + ) + + conn, cli, err := func() (net.Conn, *bufio.ReadWriter, error) { + h, ok := w.(http.Hijacker) + if !ok { + return nil, nil, fmt.Errorf("not a hijacker") + } + + return h.Hijack() + }() if err != nil { - return err + _ = json.NewEncoder(w).Encode(&types.ErrorResponse{ + Message: err.Error(), + }) + return } - // Copy responses from `dockerd` straight back to the client. - go func() { - // Close the client connection once we - // reach EOF on the response. - defer dockerd.Close() - defer cli.Close() - - errC <- func() error { - if _, err = io.Copy(cli, dockerd); err != nil { - return err - } - - return nil - }() - }() + closeConn := sync.OnceFunc(func() { + _ = cli.Flush() + _ = conn.Close() + }) + defer closeConn() + + if err = func() error { + daemon, err := net.Dial(network, address) + if err != nil { + return err + } + + // Copy responses from `dockerd` straight back to the client. + go func() { + // Close the client connection once we + // reach EOF on the response. + defer closeConn() + defer daemon.Close() + + buf := bufio.NewReader(daemon) + + errC2 <- func() error { + res, err := http.ReadResponse(buf, r) + if err != nil { + return err + } - // Intercept, inspect and potentially modify requests to - // `dockerd` from the client. - go func() { - errC <- func() error { - buf := bufio.NewReader(cli) - - for { - req, err := http.ReadRequest(buf) - if errors.Is(err, io.EOF) { - break - } else if err != nil { + if err = res.Write(io.MultiWriter(os.Stdout, cli)); err != nil { return err } - // TODO: Presumably there are other requests that - // need intercepted and modified to work properly. - if req.Method == http.MethodPost && strings.HasSuffix(req.URL.Path, "/containers/create") { - body := &struct { - HostConfig container.HostConfig - NetworkingConfig map[string]any - container.Config `json:",inline"` - }{} - - if err := json.NewDecoder(req.Body).Decode(body); err != nil { - return err - } + return nil + }() + }() - if err = req.Body.Close(); err != nil { - return err - } + // Intercept, inspect and potentially modify requests to + // `dockerd` from the client. + // TODO: Presumably there are other requests that + // need intercepted and modified to work properly. + if r.Method == http.MethodPost && strings.HasSuffix(r.URL.Path, "/containers/create") { + body := &struct { + HostConfig container.HostConfig + NetworkingConfig map[string]any + container.Config `json:",inline"` + }{} + + if err := json.NewDecoder(cli).Decode(body); err != nil { + return err + } - // Replace requested mount sources with their - // host path equivalents. Error if impossible. - // - // For example, if the client is running in container1 which - // has mount `/host/path:/container1/path` and requests mount - // `/container1/path/subpath:/container2/path`, then we modify the - // request to be for the mount `/host/path/subpath:/container2/path`. - mountsOk := true - body.HostConfig.Binds = xslice.Map(body.HostConfig.Binds, func(bind string, _ int) string { - var ( - parts = strings.SplitN(bind, ":", 2) - src = parts[0] - dst = parts[1] + // Replace requested mount sources with their + // host path equivalents. Error if impossible. + // + // For example, if the client is running in container1 which + // has mount `/host/path:/container1/path` and requests mount + // `/container1/path/subpath:/container2/path`, then we modify the + // request to be for the mount `/host/path/subpath:/container2/path`. + for i, bind := range body.HostConfig.Binds { + var ( + parts = strings.SplitN(bind, ":", 2) + src = parts[0] + dst = parts[1] + ) + + for k, v := range mounts { + if strings.HasPrefix(src, v) { + body.HostConfig.Binds[i] = fmt.Sprintf("%s:%s", + filepath.Join( + k, strings.TrimPrefix(src, v), + ), + dst, ) - - if mountsOk { - for k, v := range mounts { - if strings.HasPrefix(src, v) { - return fmt.Sprintf("%s:%s", - filepath.Join( - k, strings.TrimPrefix(src, v), - ), - dst, - ) - } - } - - mountsOk = false - } - - return bind - }) - - buf := new(bytes.Buffer) - - if !mountsOk { - if err = json.NewEncoder(buf).Encode(&types.ErrorResponse{ - Message: "one or more requested mounts cannot be satisfied by Forge because it exists inside of the container that Forge is running your process inside of, but not on the host where the Docker daemon is running", - }); err != nil { - return err - } - - if err = (&http.Response{ - Status: http.StatusText(http.StatusInternalServerError), - StatusCode: http.StatusInternalServerError, - Proto: req.Proto, - ProtoMajor: req.ProtoMajor, - ProtoMinor: req.ProtoMinor, - Body: io.NopCloser(buf), - ContentLength: int64(buf.Len()), - Request: req, - }).Write(cli); err != nil { - return err - } - - return nil + continue } + } - if err = json.NewEncoder(buf).Encode(body); err != nil { - return err - } + return fmt.Errorf("one or more requested mounts cannot be satisfied by Forge because it exists inside of the container that Forge is running your process inside of, but not on the host where the Docker daemon is running") + } - // Since we possibly modified the request body, - // the Content-Length has possibly changed. - req.Body = io.NopCloser(buf) - req.Header.Set("Content-Length", fmt.Sprint(buf.Len())) - req.ContentLength = int64(buf.Len()) - } + buf := new(bytes.Buffer) - if err := req.WriteProxy(dockerd); err != nil { - return err - } + if err = json.NewEncoder(buf).Encode(body); err != nil { + return err } - return nil - }() - }() - } - }() + // Since we possibly modified the request body, + // the Content-Length has possibly changed. + lenBuf := buf.Len() + r.Body = io.NopCloser(buf) + r.Header.Set("Content-Length", fmt.Sprint(lenBuf)) + r.ContentLength = int64(lenBuf) + } + + if err := r.WriteProxy(io.MultiWriter(os.Stdout, daemon)); err != nil { + return err + } + + return <-errC2 + }(); err != nil { + buf := new(bytes.Buffer) + + _ = json.NewEncoder(buf).Encode(&types.ErrorResponse{ + Message: err.Error(), + }) + + _ = (&http.Response{ + Status: http.StatusText(statusCode), + StatusCode: statusCode, + Proto: r.Proto, + ProtoMajor: r.ProtoMajor, + ProtoMinor: r.ProtoMajor, + Request: r, + Body: io.NopCloser(buf), + }).Write(cli) + } + }), + }).Serve(lis) }() - for { - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-errC: - if err != nil { - return err - } - } + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errC: + return err } }