Skip to content

Commit

Permalink
updates to trace mode
Browse files Browse the repository at this point in the history
  • Loading branch information
xoscar committed Sep 27, 2024
1 parent f0794ea commit 570ca34
Show file tree
Hide file tree
Showing 24 changed files with 964 additions and 2,284 deletions.
12 changes: 0 additions & 12 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ type Client struct {
graphqlIntrospectionListener func(context.Context, *proto.GraphqlIntrospectRequest) error
shutdownListener func(context.Context, *proto.ShutdownRequest) error
dataStoreConnectionListener func(context.Context, *proto.DataStoreConnectionTestRequest) error
traceModeListener func(context.Context, *proto.TraceModeRequest) error
getTraceListener func(context.Context, *proto.GetTraceRequest) error
otlpConnectionTestListener func(context.Context, *proto.OTLPConnectionTestRequest) error
}

Expand Down Expand Up @@ -128,12 +126,6 @@ func (c *Client) Start(ctx context.Context) error {
return err
}

err = c.startTraceModeListener(ctx)
if err != nil {
c.logger.Error("Failed to start list traces listener", zap.Error(err))
return err
}

c.logger.Debug("ControlPlane client started")

return nil
Expand Down Expand Up @@ -169,10 +161,6 @@ func (c *Client) OnDataStoreTestConnectionRequest(listener func(context.Context,
c.dataStoreConnectionListener = listener
}

func (c *Client) OnTraceModeRequest(listener func(context.Context, *proto.TraceModeRequest) error) {
c.traceModeListener = listener
}

func (c *Client) OnPollingRequest(listener func(context.Context, *proto.PollingRequest) error) {
c.pollListener = listener
}
Expand Down
35 changes: 0 additions & 35 deletions agent/client/mocks/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@ type GrpcServerMock struct {
terminationChannel chan Message[*proto.ShutdownRequest]
dataStoreTestChannel chan Message[*proto.DataStoreConnectionTestRequest]
graphqlIntrospectionChannel chan Message[*proto.GraphqlIntrospectRequest]
traceModeChannel chan Message[*proto.TraceModeRequest]

lastTriggerResponse Message[*proto.TriggerResponse]
lastPollingResponse Message[*proto.PollingResponse]
lastOtlpConnectionResponse Message[*proto.OTLPConnectionTestResponse]
lastDataStoreConnectionResponse Message[*proto.DataStoreConnectionTestResponse]
lastGraphqlIntrospectionResponse Message[*proto.GraphqlIntrospectResponse]
lastTraceModeResponse Message[*proto.TraceModeResponse]

server *grpc.Server
}
Expand All @@ -47,7 +45,6 @@ func NewGrpcServer() *GrpcServerMock {
dataStoreTestChannel: make(chan Message[*proto.DataStoreConnectionTestRequest]),
otlpConnectionTestChannel: make(chan Message[*proto.OTLPConnectionTestRequest]),
graphqlIntrospectionChannel: make(chan Message[*proto.GraphqlIntrospectRequest]),
traceModeChannel: make(chan Message[*proto.TraceModeRequest]),
}
var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -180,21 +177,6 @@ func (s *GrpcServerMock) RegisterGraphqlIntrospectListener(id *proto.AgentIdenti
}
}

func (s *GrpcServerMock) RegisterTraceModeAgent(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterTraceModeAgentServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
}

for {
traceModeRequest := <-s.traceModeChannel

err := stream.Send(traceModeRequest.Data)
if err != nil {
log.Println("could not trace mode request to agent: %w", err)
}
}
}

func (s *GrpcServerMock) RegisterOTLPConnectionTestListener(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterOTLPConnectionTestListenerServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
Expand Down Expand Up @@ -246,15 +228,6 @@ func (s *GrpcServerMock) SendGraphqlIntrospectResult(ctx context.Context, result
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) SendTraceModeResponse(ctx context.Context, result *proto.TraceModeResponse) (*proto.Empty, error) {
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
return nil, fmt.Errorf("could not validate token")
}

s.lastTraceModeResponse = Message[*proto.TraceModeResponse]{Data: result, Context: ctx}
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) RegisterShutdownListener(_ *proto.AgentIdentification, stream proto.Orchestrator_RegisterShutdownListenerServer) error {
for {
shutdownRequest := <-s.terminationChannel
Expand Down Expand Up @@ -284,10 +257,6 @@ func (s *GrpcServerMock) SendGraphqlIntrospectionRequest(ctx context.Context, re
s.graphqlIntrospectionChannel <- Message[*proto.GraphqlIntrospectRequest]{Context: ctx, Data: request}
}

func (s *GrpcServerMock) SendTraceModeRequest(ctx context.Context, request *proto.TraceModeRequest) {
s.traceModeChannel <- Message[*proto.TraceModeRequest]{Context: ctx, Data: request}
}

func (s *GrpcServerMock) SendOTLPConnectionTestRequest(ctx context.Context, request *proto.OTLPConnectionTestRequest) {
s.otlpConnectionTestChannel <- Message[*proto.OTLPConnectionTestRequest]{Context: ctx, Data: request}
}
Expand All @@ -312,10 +281,6 @@ func (s *GrpcServerMock) GetLastGraphqlIntrospectionResponse() Message[*proto.Gr
return s.lastGraphqlIntrospectionResponse
}

func (s *GrpcServerMock) GetLastTraceModeResponse() Message[*proto.TraceModeResponse] {
return s.lastTraceModeResponse
}

func (s *GrpcServerMock) TerminateConnection(ctx context.Context, reason string) {
s.terminationChannel <- Message[*proto.ShutdownRequest]{
Context: ctx,
Expand Down
63 changes: 0 additions & 63 deletions agent/client/workflow_listen_for_trace_mode_requests.go

This file was deleted.

4 changes: 2 additions & 2 deletions agent/client/workflow_send_trace_mode_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"github.com/kubeshop/tracetest/agent/telemetry"
)

func (c *Client) SendTraceModeResponse(ctx context.Context, response *proto.TraceModeResponse) error {
func (c *Client) SendTraces(ctx context.Context, response *proto.ExportRequest) error {
client := proto.NewOrchestratorClient(c.conn)

response.AgentIdentification = c.sessionConfig.AgentIdentification
response.Metadata = telemetry.ExtractMetadataFromContext(ctx)

_, err := client.SendTraceModeResponse(ctx, response)
_, err := client.Export(ctx, response)
if err != nil {
return fmt.Errorf("could not send list traces result request: %w", err)
}
Expand Down
8 changes: 7 additions & 1 deletion agent/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ func WithSensor(sensor sensors.Sensor) CollectorOption {
}
}

func WithTraceModeForwarder(traceModeForwarder TraceModeForwarder) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.traceModeForwarder = traceModeForwarder
}
}

type collector struct {
grpcServer stoppable
httpServer stoppable
Expand Down Expand Up @@ -114,7 +120,7 @@ func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...Coll
opt(&ingesterConfig)
}

ingester, err := newForwardIngester(ctx, config.BatchTimeout, ingesterConfig, ingesterConfig.startRemoteServer)
ingester, err := newForwardIngester(ctx, config.BatchTimeout, ingesterConfig, ingesterConfig.traceModeForwarder, ingesterConfig.startRemoteServer)
if err != nil {
return nil, fmt.Errorf("could not start local collector: %w", err)
}
Expand Down
63 changes: 38 additions & 25 deletions agent/collector/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,20 @@ type ingester interface {
SetSensor(sensors.Sensor)
}

func newForwardIngester(ctx context.Context, batchTimeout time.Duration, cfg remoteIngesterConfig, startRemoteServer bool) (ingester, error) {
type TraceModeForwarder interface {
Export(ctx context.Context, request *pb.ExportTraceServiceRequest) error
}

func newForwardIngester(ctx context.Context, batchTimeout time.Duration, cfg remoteIngesterConfig, traceModeForwarder TraceModeForwarder, startRemoteServer bool) (ingester, error) {
ingester := &forwardIngester{
BatchTimeout: batchTimeout,
RemoteIngester: cfg,
traceIDs: make(map[string]bool, 0),
done: make(chan bool),
traceCache: cfg.traceCache,
logger: cfg.logger,
sensor: cfg.sensor,
BatchTimeout: batchTimeout,
RemoteIngester: cfg,
traceIDs: make(map[string]bool, 0),
done: make(chan bool),
traceCache: cfg.traceCache,
logger: cfg.logger,
sensor: cfg.sensor,
traceModeForwarder: traceModeForwarder,
}

return ingester, nil
Expand All @@ -53,29 +58,31 @@ type Statistics struct {
// forwardIngester forwards all incoming spans to a remote ingester. It also batches those
// spans to reduce network traffic.
type forwardIngester struct {
BatchTimeout time.Duration
RemoteIngester remoteIngesterConfig
mutex sync.Mutex
traceIDs map[string]bool
done chan bool
traceCache TraceCache
logger *zap.Logger
sensor sensors.Sensor
BatchTimeout time.Duration
RemoteIngester remoteIngesterConfig
mutex sync.Mutex
traceIDs map[string]bool
done chan bool
traceCache TraceCache
logger *zap.Logger
sensor sensors.Sensor
traceModeForwarder TraceModeForwarder

statistics Statistics

sync.Mutex
}

type remoteIngesterConfig struct {
URL string
Token string
traceCache TraceCache
startRemoteServer bool
logger *zap.Logger
observer event.Observer
sensor sensors.Sensor
traceMode bool
URL string
Token string
traceCache TraceCache
startRemoteServer bool
logger *zap.Logger
observer event.Observer
sensor sensors.Sensor
traceMode bool
traceModeForwarder TraceModeForwarder
}

func (i *forwardIngester) Statistics() Statistics {
Expand All @@ -92,6 +99,12 @@ func (i *forwardIngester) SetSensor(sensor sensors.Sensor) {

func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequest, requestType otlp.RequestType) (*pb.ExportTraceServiceResponse, error) {
go i.ingestSpans(request)
if i.RemoteIngester.traceMode {
err := i.traceModeForwarder.Export(ctx, request)
if err != nil {
i.logger.Error("failed to forward spans to trace mode", zap.Error(err))
}
}

return &pb.ExportTraceServiceResponse{
PartialSuccess: &pb.ExportTracePartialSuccess{
Expand Down Expand Up @@ -166,7 +179,7 @@ func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) {
i.Lock()
i.traceIDs[traceID] = true
i.Unlock()
if _, ok := i.traceCache.Get(traceID); !ok && !i.RemoteIngester.traceMode {
if _, ok := i.traceCache.Get(traceID); !ok {
i.logger.Debug("traceID is not part of a test", zap.String("traceID", traceID))
// traceID is not part of a test
continue
Expand Down
Loading

0 comments on commit 570ca34

Please sign in to comment.