Skip to content

Commit

Permalink
Merge pull request #3761 from telepresenceio/thallgren/drop-replaced
Browse files Browse the repository at this point in the history
Remove the dormant container during intercept with --replace.
  • Loading branch information
thallgren authored Jan 6, 2025

Verified

This commit was signed with the committer’s verified signature.
cakemanny Daniel Golding
2 parents 13f0c83 + d9c48f2 commit c0f247a
Showing 43 changed files with 481 additions and 382 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.yml
Original file line number Diff line number Diff line change
@@ -44,6 +44,16 @@ items:
values: <namespaces>`.
```
docs: install/manager#static-versus-dynamic-namespace-selection
- type: feature
title: Removal of the dormant container during intercept with --replace.
body: |-
During a `telepresence intercept --replace operation`, the previously injected dormant container has been
removed. The Traffic Agent now directly serves as the replacement container, eliminating the need to forward
traffic to the original application container. This simplification offers several advantages when using the
`--replace` flag:
- **Removal of the init-container:** The need for a separate init-container is no longer necessary.
- **Elimination of port renames:** Port renames within the intercepted pod are no longer required.
- type: change
title: Drop deprecated current-cluster-id command.
body: >-
41 changes: 24 additions & 17 deletions cmd/traffic/cmd/agent/agent.go
Original file line number Diff line number Diff line change
@@ -95,11 +95,11 @@ func sftpServer(ctx context.Context, sftpPortCh chan<- uint16) error {
_ = l.Close()
}()

_, sftpPort, err := iputil.SplitToIPPort(l.Addr())
ap, err := iputil.SplitToIPPort(l.Addr())
if err != nil {
return err
}
sftpPortCh <- sftpPort
sftpPortCh <- ap.Port()

dlog.Infof(ctx, "Listening at: %s", l.Addr())
for {
@@ -163,31 +163,38 @@ func sidecar(ctx context.Context, s State, info *rpc.AgentInfo) error {
// Group the container's intercepts by agent port
icStates := make(map[agentconfig.PortAndProto][]*agentconfig.Intercept, len(cn.Intercepts))
for _, ic := range cn.Intercepts {
k := agentconfig.PortAndProto{Port: ic.AgentPort, Proto: ic.Protocol}
ap := ic.AgentPort
if cn.Replace {
// Listen to replaced container's original port.
ap = ic.ContainerPort
}
k := agentconfig.PortAndProto{Port: ap, Proto: ic.Protocol}
icStates[k] = append(icStates[k], ic)
}

for pp, ics := range icStates {
ic := ics[0] // They all have the same protocol container port, so the first one will do.
var fwd forwarder.Interceptor
var cp uint16
if ic.TargetPortNumeric {
// We must differentiate between connections originating from the agent's forwarder to the container
// port and those from other sources. The former should not be routed back, while the latter should
// always be routed to the agent. We do this by using a proxy port that will be recognized by the
// iptables filtering in our init-container.
cp = ac.ProxyPort(ic)
if !cn.Replace {
if ic.TargetPortNumeric {
// We must differentiate between connections originating from the agent's forwarder to the container
// port and those from other sources. The former should not be routed back, while the latter should
// always be routed to the agent. We do this by using a proxy port that will be recognized by the
// iptables filtering in our init-container.
cp = ac.ProxyPort(ic)
} else {
cp = ic.ContainerPort
}
// Redirect non-intercepted traffic to the pod, so that injected sidecars that hijack the ports for
// incoming connections will continue to work.
targetHost := s.PodIP()
fwd = forwarder.NewInterceptor(pp, targetHost, cp)
} else {
fwd = forwarder.NewInterceptor(pp, "", 0)
cp = ic.ContainerPort
}
lisAddr, err := pp.Addr()
if err != nil {
return err
}
// Redirect non-intercepted traffic to the pod, so that injected sidecars that hijack the ports for
// incoming connections will continue to work.
targetHost := s.PodIP()

fwd := forwarder.NewInterceptor(lisAddr, targetHost, cp)
dgroup.ParentGroup(ctx).Go(fmt.Sprintf("forward-%s", iputil.JoinHostPort(cn.Name, cp)), func(ctx context.Context) error {
return fwd.Serve(tunnel.WithPool(ctx, tunnel.NewPool()), nil)
})
2 changes: 1 addition & 1 deletion cmd/traffic/cmd/agent/server.go
Original file line number Diff line number Diff line change
@@ -118,7 +118,7 @@ func (s *state) ReportMetrics(ctx context.Context, metrics *rpc.TunnelMetrics) {
mCtx, mCancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second)
defer mCancel()
_, err := s.manager.ReportMetrics(mCtx, metrics)
if err != nil {
if err != nil && status.Code(err) != codes.Canceled {
dlog.Errorf(ctx, "ReportMetrics failed: %v", err)
}
}()
7 changes: 2 additions & 5 deletions cmd/traffic/cmd/agent/state_test.go
Original file line number Diff line number Diff line change
@@ -2,13 +2,13 @@ package agent_test

import (
"context"
"net"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
core "k8s.io/api/core/v1"

"github.com/datawire/dlib/dlog"
rpc "github.com/telepresenceio/telepresence/rpc/v2/manager"
@@ -23,10 +23,7 @@ const (
)

func makeFS(t *testing.T, ctx context.Context) (forwarder.Interceptor, agent.State) {
lAddr, err := net.ResolveTCPAddr("tcp", ":1111")
assert.NoError(t, err)

f := forwarder.NewInterceptor(lAddr, appHost, appPort)
f := forwarder.NewInterceptor(agentconfig.PortAndProto{Proto: core.ProtocolTCP, Port: 1111}, appHost, appPort)
go func() {
if err := f.Serve(context.Background(), nil); err != nil {
dlog.Error(ctx, err)
22 changes: 13 additions & 9 deletions cmd/traffic/cmd/agentinit/agent_init.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"net"
"net/netip"
"os"
"path/filepath"
"strconv"
@@ -19,7 +20,6 @@ import (
"github.com/datawire/dlib/dlog"
"github.com/telepresenceio/telepresence/v2/pkg/agentconfig"
"github.com/telepresenceio/telepresence/v2/pkg/dos"
"github.com/telepresenceio/telepresence/v2/pkg/iputil"
"github.com/telepresenceio/telepresence/v2/pkg/version"
)

@@ -45,7 +45,7 @@ func loadConfig(ctx context.Context) (*config, error) {
return &c, nil
}

func (c *config) configureIptables(ctx context.Context, iptables *iptables.IPTables, loopback, localHostCIDR, podIP string) error {
func (c *config) configureIptables(ctx context.Context, iptables *iptables.IPTables, loopback string, localHostCIDR netip.Prefix, podIP netip.Addr) error {
// These iptables rules implement routing such that a packet directed to the appPort will hit the agentPort instead.
// If there's no mesh this is simply request -> agent -> app (or intercept)
// However, if there's a service mesh we want to make sure we don't bypass the mesh, so the traffic
@@ -114,8 +114,8 @@ func (c *config) configureIptables(ctx context.Context, iptables *iptables.IPTab
// loop it back into the agent.
dlog.Debugf(ctx, "output DNAT %s:%d -> %s:%d", podIP, ac.ProxyPort(ic), podIP, ic.ContainerPort)
err = iptables.AppendUnique(nat, outputChain,
"-p", lcProto, "-d", podIP, "--dport", strconv.Itoa(int(ac.ProxyPort(ic))),
"-j", "DNAT", "--to-destination", net.JoinHostPort(podIP, strconv.Itoa(int(ic.ContainerPort))))
"-p", lcProto, "-d", podIP.String(), "--dport", strconv.Itoa(int(ac.ProxyPort(ic))),
"-j", "DNAT", "--to-destination", netip.AddrPortFrom(podIP, ic.ContainerPort).String())
if err != nil {
return fmt.Errorf("failed to append rule to %s: %w", outputChain, err)
}
@@ -155,7 +155,7 @@ func (c *config) configureIptables(ctx context.Context, iptables *iptables.IPTab
err = iptables.Insert(nat, "OUTPUT", 1,
"-o", loopback,
"-p", lcProto,
"!", "-d", localHostCIDR,
"!", "-d", localHostCIDR.String(),
"-m", "owner", "--uid-owner", agentUID,
"-j", outputChain)
if err != nil {
@@ -213,11 +213,15 @@ func Main(ctx context.Context, args ...string) error {
return err
}
proto := iptables.ProtocolIPv4
localhostCIDR := "127.0.0.1/32"
podIP := os.Getenv("POD_IP")
if len(iputil.Parse(podIP)) == 16 {
localhostCIDR := netip.PrefixFrom(netip.AddrFrom4([4]byte{127, 0, 0, 1}), 32)
podIP, err := netip.ParseAddr(os.Getenv("POD_IP"))
if err != nil {
dlog.Error(ctx, err)
return err
}
if podIP.Is6() {
proto = iptables.ProtocolIPv6
localhostCIDR = "::1/128"
localhostCIDR = netip.PrefixFrom(netip.IPv6Loopback(), 128)
}
it, err := iptables.NewWithProtocol(proto)
if err != nil {
6 changes: 5 additions & 1 deletion cmd/traffic/cmd/manager/cluster/info.go
Original file line number Diff line number Diff line change
@@ -348,7 +348,11 @@ func getInjectorSvcIP(ctx context.Context, env *managerutil.Env, client v1.CoreV
break
}
}
return iputil.Parse(sc.Spec.ClusterIP), p, nil
ip, err := netip.ParseAddr(sc.Spec.ClusterIP)
if err != nil {
return nil, 0, err
}
return ip.AsSlice(), p, nil
}

func (oi *info) watchPodSubnets(ctx context.Context) {
Loading

0 comments on commit c0f247a

Please sign in to comment.