Skip to content

Commit

Permalink
Proper workflow for mutliple agents.
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Hallgren <[email protected]>
  • Loading branch information
thallgren committed Jan 27, 2025
1 parent 3be97c1 commit 7eb431d
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 55 deletions.
30 changes: 25 additions & 5 deletions cmd/traffic/cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ import (
"fmt"
"io"
"net"
"os"
"os/signal"
"path/filepath"
"strings"
"time"

"github.com/pkg/sftp"
"golang.org/x/sys/unix"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -131,16 +134,31 @@ func sftpServer(ctx context.Context, sftpPortCh chan<- uint16) error {
func Main(ctx context.Context, _ ...string) error {
dlog.Infof(ctx, "Traffic Agent %s", version.Version)

ctx, cancel := context.WithCancel(ctx)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt, unix.SIGTERM)
defer func() {
signal.Stop(sigs)
cancel()
}()

go func() {
select {
case <-sigs:
cancel()
case <-ctx.Done():
}
}()

// Handle configuration
config, err := LoadConfig(ctx)
if err != nil {
return fmt.Errorf("unable to load config: %w", err)
}

g := dgroup.NewGroup(ctx, dgroup.GroupConfig{
EnableSignalHandling: true,
SoftShutdownTimeout: 10 * time.Second, // Agent must be able to depart.
})

s := NewState(config)
info, err := StartServices(ctx, g, config, s)
if err != nil {
Expand Down Expand Up @@ -246,9 +264,6 @@ func StartServices(ctx context.Context, g *dgroup.Group, config Config, srv Stat
if err != nil {
return err
}
defer func() {
_ = grpcListener.Close()
}()
grpcAddress := grpcListener.Addr().(*net.TCPAddr)
grpcPortCh <- uint16(grpcAddress.Port)

Expand All @@ -258,6 +273,11 @@ func StartServices(ctx context.Context, g *dgroup.Group, config Config, srv Stat
agent.RegisterAgentServer(grpcHandler, srv)
sc := &dhttp.ServerConfig{Handler: grpcHandler}
dlog.Info(ctx, "gRPC server started")
go func() {
<-ctx.Done()
dlog.Info(ctx, "gRPC server shutting down")
_ = grpcListener.Close()
}()
if err = sc.Serve(ctx, grpcListener); err != nil && ctx.Err() != nil {
err = nil // Normal shutdown
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/traffic/cmd/manager/mutator/agent_injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,19 +200,19 @@ func (a *agentInjector) Inject(ctx context.Context, req *admission.AdmissionRequ

// Create patch operations to add the traffic-agent sidecar
if len(patches) > 0 {
dlog.Infof(ctx, "Injecting %d patches into pod %s.%s", len(patches), pod.Name, pod.Namespace)
if dlog.MaxLogLevel(ctx) >= dlog.LogLevelDebug {
dlog.Debugf(ctx, "Injecting %d patches into pod %s.%s", len(patches), pod.Name, pod.Namespace)
if dlog.MaxLogLevel(ctx) >= dlog.LogLevelTrace {
cns := strings.Builder{}
for i, cn := range pod.Spec.Containers {
cns.WriteString(fmt.Sprintf("%d %s\n", i, cn.Name))
}
dlog.Debugf(ctx, "Containers \n%s", cns.String())
dlog.Tracef(ctx, "Containers \n%s", cns.String())
if pj, err := json.Marshal(patches, jsontext.WithIndent(" ")); err == nil {
dlog.Debugf(ctx, "\n%s", string(pj))
dlog.Tracef(ctx, "\n%s", string(pj))
}
}
} else {
dlog.Infof(ctx, "Pod %s.%s was left untouched", pod.Name, pod.Namespace)
dlog.Debugf(ctx, "Pod %s.%s was left untouched", pod.Name, pod.Namespace)
}
return patches, nil
}
Expand Down
31 changes: 27 additions & 4 deletions cmd/traffic/cmd/manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func (s *service) WatchIntercepts(session *rpc.SessionInfo, stream rpc.Manager_W
rpc.InterceptDispositionType_ACTIVE,
rpc.InterceptDispositionType_AGENT_ERROR:
// agent-owned state: include the intercept
dlog.Debugf(ctx, "Intercept %s.%s valid. Disposition: %s", info.Spec.Agent, info.Spec.Namespace, info.Disposition)
dlog.Debugf(ctx, "Intercept %s.%s valid. Disposition: %s, clientSession: %s", info.Spec.Agent, info.Spec.Namespace, info.Disposition, info.ClientSession.SessionId)
return true
case rpc.InterceptDispositionType_REMOVED:
dlog.Debugf(ctx, "Intercept %s.%s valid but removed", info.Spec.Agent, info.Spec.Namespace)
Expand All @@ -500,10 +500,33 @@ func (s *service) WatchIntercepts(session *rpc.SessionInfo, stream rpc.Manager_W
}
} else {
// sessionID refers to a client session.
filter = func(id string, info *rpc.InterceptInfo) bool {
return info.ClientSession.SessionId == sessionID &&
filter = func(id string, info *rpc.InterceptInfo) (ok bool) {
defer func() {
dlog.Debugf(ctx, "Client filter checking intercept %s.%s. Disposition: %s, clientSession: %s, result: %t",
info.Spec.Agent, info.Spec.Namespace, info.Disposition, info.ClientSession.SessionId, ok)
}()

if info.ClientSession.SessionId == sessionID &&
info.Disposition != rpc.InterceptDispositionType_REMOVED &&
!state.IsChildIntercept(info.Spec)
!state.IsChildIntercept(info.Spec) {
// Don't return intercepts that aren't in a "agent-owned" state.
switch info.Disposition {
case rpc.InterceptDispositionType_WAITING,
rpc.InterceptDispositionType_ACTIVE,
rpc.InterceptDispositionType_AGENT_ERROR:
// agent-owned state: include the intercept
dlog.Debugf(ctx, "Intercept %s.%s valid. Disposition: %s", info.Spec.Agent, info.Spec.Namespace, info.Disposition)
return true
case rpc.InterceptDispositionType_REMOVED:
dlog.Debugf(ctx, "Intercept %s.%s valid but removed", info.Spec.Agent, info.Spec.Namespace)
return true
default:
// otherwise: don't return this intercept
dlog.Debugf(ctx, "Intercept %s.%s is not in agent-owned state. Disposition: %s", info.Spec.Agent, info.Spec.Namespace, info.Disposition)
return false
}
}
return false
}
}
}
Expand Down
52 changes: 33 additions & 19 deletions cmd/traffic/cmd/manager/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,7 @@ func (s *state) RemoveSession(ctx context.Context, sessionID string) {
defer sess.Cancel()
s.gcSessionIntercepts(ctx, sessionID)

agent, isAgent := s.agents.LoadAndDelete(sessionID)
if isAgent {
mutator.GetMap(s.backgroundCtx).Inactivate(agent.PodName)
} else if client, isClient := s.clients.LoadAndDelete(sessionID); isClient {
if client, isClient := s.clients.LoadAndDelete(sessionID); isClient {
scm := sess.(*clientSessionState).consumptionMetrics
atomic.AddUint64(&s.tunnelIngressCounter, scm.FromClientBytes.GetValue())
atomic.AddUint64(&s.tunnelEgressCounter, scm.ToClientBytes.GetValue())
Expand All @@ -321,6 +318,38 @@ func (s *state) gcSessionIntercepts(ctx context.Context, sessionID string) {
// 2. Don't have any agents (agent.PodIp == intercept.PodIp)
// Alternatively, if the intercept is still live but has been switched over to a different agent, send it back to
// WAITING state
if agent, ok := s.agents.Load(sessionID); ok {
dlog.Debugf(ctx, "Consolidating intercepts after removal of agent %s", agent.PodName)
mutator.GetMap(s.backgroundCtx).Inactivate(agent.PodName)
s.intercepts.Range(func(interceptID string, intercept *rpc.InterceptInfo) bool {
if intercept.Disposition == rpc.InterceptDispositionType_REMOVED {
return true
}
dlog.Debugf(ctx, "Checking intercept %q with podName %s", interceptID, intercept.PodName)
if errCode, errMsg := s.checkAgentsForIntercept(intercept); errCode != rpc.InterceptDispositionType_UNSPECIFIED {
// No agents are available, so the intercept is now dormant or in error.
dlog.Debugf(ctx, "Intercept %q has no available agents %s. Setting it disposition to %s", interceptID, errCode)

Check failure on line 331 in cmd/traffic/cmd/manager/state/state.go

View workflow job for this annotation

GitHub Actions / unit (ubuntu-latest)

printf: github.com/datawire/dlib/dlog.Debugf format %s reads arg #3, but call has 2 args (govet)

Check failure on line 331 in cmd/traffic/cmd/manager/state/state.go

View workflow job for this annotation

GitHub Actions / unit (macos-latest)

printf: github.com/datawire/dlib/dlog.Debugf format %s reads arg #3, but call has 2 args (govet)
s.UpdateIntercept(interceptID, func(intercept *rpc.InterceptInfo) {
intercept.Disposition = errCode
intercept.Message = errMsg
})
return true
}
if agent.PodIp == intercept.PodIp {
// The agent is about to die, but apparently more agents are present. Let
// some other agent pick it up then.
dlog.Debugf(ctx, "Intercept %q lost its agent pod %s. Setting it disposition to WAITING", interceptID, agent.PodName)
s.UpdateIntercept(interceptID, func(intercept *rpc.InterceptInfo) {
intercept.PodIp = ""
intercept.PodName = ""
intercept.Disposition = rpc.InterceptDispositionType_WAITING
})
}
return true
})
return
}

s.intercepts.Range(func(interceptID string, intercept *rpc.InterceptInfo) bool {
if intercept.Disposition == rpc.InterceptDispositionType_REMOVED {
return true
Expand All @@ -333,21 +362,6 @@ func (s *state) gcSessionIntercepts(ctx context.Context, sessionID string) {
s.allInterceptsFinalizerCall(client, &wl)
}
s.self.RemoveIntercept(ctx, interceptID)
} else if errCode, errMsg := s.checkAgentsForIntercept(intercept); errCode != 0 {
// Refcount went to zero:
// Tell the client, so that the client can tell us to delete it.
s.UpdateIntercept(interceptID, func(intercept *rpc.InterceptInfo) {
intercept.Disposition = errCode
intercept.Message = errMsg
})
} else {
if agent := s.GetAgent(sessionID); agent != nil && agent.PodIp == intercept.PodIp {
// The agent whose podIP was stored by the intercept is dead, but it's not the last agent
// Send it back to waiting so that one of the other agents can pick it up and set their own podIP
s.UpdateIntercept(interceptID, func(intercept *rpc.InterceptInfo) {
intercept.Disposition = rpc.InterceptDispositionType_WAITING
})
}
}
return true
})
Expand Down
33 changes: 30 additions & 3 deletions integration_test/itest/apply_app.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package itest

import (
"bytes"
"context"
"path/filepath"
"strconv"
Expand All @@ -10,7 +11,9 @@ import (
"github.com/go-json-experiment/json"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
core "k8s.io/api/core/v1"

"github.com/datawire/dlib/dlog"
"github.com/datawire/dlib/dtime"
"github.com/telepresenceio/telepresence/v2/pkg/dos"
)
Expand All @@ -22,9 +25,24 @@ func ApplyEchoService(ctx context.Context, name, namespace string, port int) {
func ApplyService(ctx context.Context, name, namespace, image string, port, targetPort int) {
t := getT(ctx)
t.Helper()
require.NoError(t, Kubectl(ctx, namespace, "create", "deploy", name, "--image", image), "failed to create deployment %s", name)
require.NoError(t, Kubectl(ctx, namespace, "expose", "deploy", name, "--port", strconv.Itoa(port), "--target-port", strconv.Itoa(targetPort)),
"failed to expose deployment %s", name)
tpl := &Generic{
Name: name,
Environment: []core.EnvVar{
{
Name: "PORTS",
Value: strconv.Itoa(targetPort),
},
},
TargetPort: strconv.Itoa(targetPort),
ServicePorts: []ServicePort{{
Number: port,
Protocol: "TCP",
TargetPort: strconv.Itoa(targetPort),
}},
ContainerPort: targetPort,
Image: image,
}
ApplyTemplate(ctx, namespace, "apply", filepath.Join("testdata", "k8s", "generic.goyaml"), &tpl)
require.NoError(t, Kubectl(ctx, namespace, "rollout", "status", "-w", "deployment/"+name), "failed to deploy %s", name)
}

Expand Down Expand Up @@ -86,6 +104,15 @@ func ApplyAppTemplate(ctx context.Context, namespace string, app *AppData) {
require.NoError(t, RolloutStatusWait(ctx, namespace, "deploy/"+wl))
}

func ApplyTemplate(ctx context.Context, namespace, action, path string, values any) {
yml, err := ReadTemplate(ctx, path, values)
require.NoError(getT(ctx), err)
if err = Kubectl(dos.WithStdin(ctx, bytes.NewReader(yml)), namespace, action, "-f", "-"); err != nil {
dlog.Errorf(ctx, "unable to %s %q", action, string(yml))
getT(ctx).Fatal(err)
}
}

func RolloutStatusWait(ctx context.Context, namespace, workload string) error {
ctx, cancel := context.WithTimeout(ctx, PodCreateTimeout(ctx))
defer cancel()
Expand Down
21 changes: 13 additions & 8 deletions integration_test/itest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,17 +510,22 @@ func (s *cluster) CapturePodLogs(ctx context.Context, app, container, ns string)
}
present := struct{}{}

// Use another logger to avoid errors due to logs arriving after the tests complete.
ctx = dlog.WithLogger(ctx, dlog.WrapLogrus(logrus.StandardLogger()))
pod := pods[0]
key := pod
if container != "" {
key += "/" + container
var pod string
for i, key := range pods {
if container != "" {
key += "/" + container
}
if _, ok := s.logCapturingPods.LoadOrStore(key, present); !ok {
pod = pods[i]
break
}
}
if _, ok := s.logCapturingPods.LoadOrStore(key, present); ok {
return ""
if pod == "" {
return "" // All pods already captured
}

// Use another logger to avoid errors due to logs arriving after the tests complete.
ctx = dlog.WithLogger(ctx, dlog.WrapLogrus(logrus.StandardLogger()))
logFile, err := os.Create(
filepath.Join(filelocation.AppUserLogDir(ctx), fmt.Sprintf("%s-%s.log", dtime.Now().Format("20060102T150405"), pod)))
if err != nil {
Expand Down
11 changes: 8 additions & 3 deletions integration_test/podscaling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *interceptMountSuite) Test_StopInterceptedPodOfMany() {
assert.Eventually(
func() bool {
return len(s.runningPods(ctx)) == 1
}, 15*time.Second, time.Second)
}, time.Minute, time.Second)
s.CapturePodLogs(ctx, s.ServiceName(), "traffic-agent", s.AppNamespace())
}()

Expand All @@ -118,7 +118,7 @@ func (s *interceptMountSuite) Test_StopInterceptedPodOfMany() {
}
}
return len(pods) == 2
}, 15*time.Second, time.Second)
}, time.Minute, time.Second)
s.CapturePodLogs(ctx, s.ServiceName(), "traffic-agent", s.AppNamespace())

// Verify that intercept is still active
Expand All @@ -136,6 +136,7 @@ func (s *interceptMountSuite) Test_StopInterceptedPodOfMany() {
}, 15*time.Second, time.Second)

// Verify response from intercepting client
expect := s.ServiceName() + " from intercept at /"
require.Eventually(func() bool {
hc := http.Client{Timeout: time.Second}
resp, err := hc.Get("http://" + s.ServiceName())
Expand All @@ -147,7 +148,11 @@ func (s *interceptMountSuite) Test_StopInterceptedPodOfMany() {
if err != nil {
return false
}
return s.ServiceName()+" from intercept at /" == string(body)
if expect == string(body) {
return true
}
dlog.Infof(ctx, "%s != %s", expect, string(body))
return false
}, 30*time.Second, time.Second)

// Verify that volume mount is restored
Expand Down
6 changes: 5 additions & 1 deletion integration_test/testdata/k8s/generic.goyaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ spec:
spec:
containers:
- name: backend
image: "{{ .Registry }}/{{ .Image }}"
{{- if .Registry }}
image: {{ .Registry }}/{{ .Image }}
{{- else }}
image: {{ .Image }}
{{- end}}
ports:
{{- range .ContainerPorts }}
- containerPort: {{ .Number }}
Expand Down
1 change: 1 addition & 0 deletions pkg/client/agentpf/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (ac *client) startDialWatcherReady(ctx context.Context) error {
if err != nil {
dlog.Error(ctx, err)
}
ac.cancel()
}()
return nil
}
Expand Down
11 changes: 9 additions & 2 deletions pkg/client/userd/trafficmgr/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (s *session) watchAgentsLoop(ctx context.Context) error {

func (s *session) handleAgentSnapshot(ctx context.Context, infos []*manager.AgentInfo) {
s.ingestTracker.initSnapshot()
s.setCurrentAgents(infos)
s.setCurrentAgents(ctx, infos)

// infoForKey returns the AgentInfos that matches the ingestKey
infosForKey := func(key ingestKey) (ais []*manager.AgentInfo) {
Expand Down Expand Up @@ -79,7 +79,14 @@ func (s *session) getCurrentAgents() []*manager.AgentInfo {
return agents
}

func (s *session) setCurrentAgents(agents []*manager.AgentInfo) {
func (s *session) setCurrentAgents(ctx context.Context, agents []*manager.AgentInfo) {
if dlog.MaxLogLevel(ctx) >= dlog.LogLevelDebug {
names := make([]string, len(agents))
for i, agent := range agents {
names[i] = agent.PodName
}
dlog.Debugf(ctx, "Set current agents: %v", names)
}
s.currentInterceptsLock.Lock()
s.currentAgents = agents
s.currentInterceptsLock.Unlock()
Expand Down
Loading

0 comments on commit 7eb431d

Please sign in to comment.