Skip to content

Commit

Permalink
use http hijacking instead of raw tcp for proxy. allow disabling of d…
Browse files Browse the repository at this point in the history
…ocker proxy
  • Loading branch information
frantjc committed Apr 9, 2024
1 parent 070d2a6 commit af4cd7c
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 138 deletions.
2 changes: 2 additions & 0 deletions command/forge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"runtime"

"github.com/frantjc/forge"
"github.com/frantjc/forge/internal/containerutil"
"github.com/spf13/cobra"
)

Expand All @@ -27,6 +28,7 @@ func NewForge() *cobra.Command {

cmd.SetVersionTemplate("{{ .Name }}{{ .Version }} " + runtime.Version() + "\n")
cmd.PersistentFlags().CountVarP(&verbosity, "verbose", "V", "verbosity for forge")
cmd.PersistentFlags().BoolVar(&containerutil.NoUseForgeSock, "no-forge-sock", false, "disable use of forge.sock")
cmd.AddCommand(NewUse(), NewGet(), NewPut(), NewCheck(), NewTask(), NewCloudBuild(), NewCache())

return cmd
Expand Down
2 changes: 1 addition & 1 deletion internal/command/sleep.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewSleep() *cobra.Command {

if dockerHost := os.Getenv(client.EnvOverrideHost); dockerHost != "" {
if dockerSock, err := url.Parse(dockerHost); err == nil {
return dind.NewProxy(ctx, mounts, lis, dockerSock)
return dind.ServeDockerdProxy(ctx, mounts, lis, dockerSock)
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion internal/containerutil/create_sleeping_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@ import (
"github.com/frantjc/forge/internal/hooks"
)

var NoUseForgeSock bool

func CreateSleepingContainer(ctx context.Context, containerRuntime forge.ContainerRuntime, image forge.Image, containerConfig *forge.ContainerConfig) (forge.Container, error) {
entrypoint := []string{bin.ShimPath, "sleep", "--sock", containerfs.ForgeSock}
entrypoint := []string{bin.ShimPath, "sleep"}

if !NoUseForgeSock {
entrypoint = append(entrypoint, "--sock", containerfs.ForgeSock)
}

for _, mount := range containerConfig.Mounts {
if mount.Source != "" && mount.Destination != "" {
Expand Down
274 changes: 138 additions & 136 deletions internal/dind/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
xslice "github.com/frantjc/x/slice"
)

// NewProxy listens on the given listener for traffic intended for `dockerd`.
// ServeDockerdProxy listens on the given listener for traffic intended for `dockerd`.
// It proxies responses back from `dockerd` directly, but modifies requests
// to `dockerd` to try to translate them for the host `dockerd` when the
// client is calling from inside of a container.
Expand All @@ -31,11 +32,11 @@ import (
//
// It always returns an error and doesn't exit until the given context.Context
// is done or an error is encountered, similar to http.Serve.
func NewProxy(ctx context.Context, mounts map[string]string, lis net.Listener, dockerSock *url.URL) error {
func ServeDockerdProxy(ctx context.Context, mounts map[string]string, lis net.Listener, dockerSock *url.URL) error {
var (
errC = make(chan error)
network = "tcp"
address = dockerSock.Host
errC = make(chan error, 1)
)

if strings.EqualFold("unix", dockerSock.Scheme) {
Expand All @@ -44,154 +45,155 @@ func NewProxy(ctx context.Context, mounts map[string]string, lis net.Listener, d
}

go func() {
errC <- func() error {
for {
cli, err := lis.Accept()
if err != nil {
return err
}

dockerd, err := net.Dial(network, address)
errC <- (&http.Server{
ReadHeaderTimeout: time.Second * 5,
BaseContext: func(_ net.Listener) context.Context {
return ctx
},
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var (
statusCode = http.StatusInternalServerError
errC2 = make(chan error, 1)
)

conn, cli, err := func() (net.Conn, *bufio.ReadWriter, error) {
h, ok := w.(http.Hijacker)
if !ok {
return nil, nil, fmt.Errorf("not a hijacker")
}

return h.Hijack()
}()
if err != nil {
return err
_ = json.NewEncoder(w).Encode(&types.ErrorResponse{
Message: err.Error(),
})
return
}

// Copy responses from `dockerd` straight back to the client.
go func() {
// Close the client connection once we
// reach EOF on the response.
defer dockerd.Close()
defer cli.Close()

errC <- func() error {
if _, err = io.Copy(cli, dockerd); err != nil {
return err
}

return nil
}()
}()
closeConn := sync.OnceFunc(func() {
_ = cli.Flush()
_ = conn.Close()
})
defer closeConn()

if err = func() error {
daemon, err := net.Dial(network, address)
if err != nil {
return err
}

// Copy responses from `dockerd` straight back to the client.
go func() {
// Close the client connection once we
// reach EOF on the response.
defer closeConn()
defer daemon.Close()

buf := bufio.NewReader(daemon)

errC2 <- func() error {
res, err := http.ReadResponse(buf, r)
if err != nil {
return err
}

// Intercept, inspect and potentially modify requests to
// `dockerd` from the client.
go func() {
errC <- func() error {
buf := bufio.NewReader(cli)

for {
req, err := http.ReadRequest(buf)
if errors.Is(err, io.EOF) {
break
} else if err != nil {
if err = res.Write(io.MultiWriter(os.Stdout, cli)); err != nil {
return err
}

// TODO: Presumably there are other requests that
// need intercepted and modified to work properly.
if req.Method == http.MethodPost && strings.HasSuffix(req.URL.Path, "/containers/create") {
body := &struct {
HostConfig container.HostConfig
NetworkingConfig map[string]any
container.Config `json:",inline"`
}{}

if err := json.NewDecoder(req.Body).Decode(body); err != nil {
return err
}
return nil
}()
}()

if err = req.Body.Close(); err != nil {
return err
}
// Intercept, inspect and potentially modify requests to
// `dockerd` from the client.
// TODO: Presumably there are other requests that
// need intercepted and modified to work properly.
if r.Method == http.MethodPost && strings.HasSuffix(r.URL.Path, "/containers/create") {
body := &struct {
HostConfig container.HostConfig
NetworkingConfig map[string]any
container.Config `json:",inline"`
}{}

if err := json.NewDecoder(cli).Decode(body); err != nil {
return err
}

// Replace requested mount sources with their
// host path equivalents. Error if impossible.
//
// For example, if the client is running in container1 which
// has mount `/host/path:/container1/path` and requests mount
// `/container1/path/subpath:/container2/path`, then we modify the
// request to be for the mount `/host/path/subpath:/container2/path`.
mountsOk := true
body.HostConfig.Binds = xslice.Map(body.HostConfig.Binds, func(bind string, _ int) string {
var (
parts = strings.SplitN(bind, ":", 2)
src = parts[0]
dst = parts[1]
// Replace requested mount sources with their
// host path equivalents. Error if impossible.
//
// For example, if the client is running in container1 which
// has mount `/host/path:/container1/path` and requests mount
// `/container1/path/subpath:/container2/path`, then we modify the
// request to be for the mount `/host/path/subpath:/container2/path`.
for i, bind := range body.HostConfig.Binds {
var (
parts = strings.SplitN(bind, ":", 2)
src = parts[0]
dst = parts[1]
)

for k, v := range mounts {
if strings.HasPrefix(src, v) {
body.HostConfig.Binds[i] = fmt.Sprintf("%s:%s",
filepath.Join(
k, strings.TrimPrefix(src, v),
),
dst,
)

if mountsOk {
for k, v := range mounts {
if strings.HasPrefix(src, v) {
return fmt.Sprintf("%s:%s",
filepath.Join(
k, strings.TrimPrefix(src, v),
),
dst,
)
}
}

mountsOk = false
}

return bind
})

buf := new(bytes.Buffer)

if !mountsOk {
if err = json.NewEncoder(buf).Encode(&types.ErrorResponse{
Message: "one or more requested mounts cannot be satisfied by Forge because it exists inside of the container that Forge is running your process inside of, but not on the host where the Docker daemon is running",
}); err != nil {
return err
}

if err = (&http.Response{
Status: http.StatusText(http.StatusInternalServerError),
StatusCode: http.StatusInternalServerError,
Proto: req.Proto,
ProtoMajor: req.ProtoMajor,
ProtoMinor: req.ProtoMinor,
Body: io.NopCloser(buf),
ContentLength: int64(buf.Len()),
Request: req,
}).Write(cli); err != nil {
return err
}

return nil
continue
}
}

if err = json.NewEncoder(buf).Encode(body); err != nil {
return err
}
return fmt.Errorf("one or more requested mounts cannot be satisfied by Forge because it exists inside of the container that Forge is running your process inside of, but not on the host where the Docker daemon is running")
}

// Since we possibly modified the request body,
// the Content-Length has possibly changed.
req.Body = io.NopCloser(buf)
req.Header.Set("Content-Length", fmt.Sprint(buf.Len()))
req.ContentLength = int64(buf.Len())
}
buf := new(bytes.Buffer)

if err := req.WriteProxy(dockerd); err != nil {
return err
}
if err = json.NewEncoder(buf).Encode(body); err != nil {
return err
}

return nil
}()
}()
}
}()
// Since we possibly modified the request body,
// the Content-Length has possibly changed.
lenBuf := buf.Len()
r.Body = io.NopCloser(buf)
r.Header.Set("Content-Length", fmt.Sprint(lenBuf))
r.ContentLength = int64(lenBuf)
}

if err := r.WriteProxy(io.MultiWriter(os.Stdout, daemon)); err != nil {
return err
}

return <-errC2
}(); err != nil {
buf := new(bytes.Buffer)

_ = json.NewEncoder(buf).Encode(&types.ErrorResponse{
Message: err.Error(),
})

_ = (&http.Response{
Status: http.StatusText(statusCode),
StatusCode: statusCode,
Proto: r.Proto,
ProtoMajor: r.ProtoMajor,
ProtoMinor: r.ProtoMajor,
Request: r,
Body: io.NopCloser(buf),
}).Write(cli)
}
}),
}).Serve(lis)
}()

for {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
if err != nil {
return err
}
}
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
return err
}
}

0 comments on commit af4cd7c

Please sign in to comment.