Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agent): Adding graphql introspection to agent #3984

Merged
merged 7 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ type Client struct {
logger *zap.Logger
tracer trace.Tracer

stopListener func(context.Context, *proto.StopRequest) error
triggerListener func(context.Context, *proto.TriggerRequest) error
pollListener func(context.Context, *proto.PollingRequest) error
shutdownListener func(context.Context, *proto.ShutdownRequest) error
dataStoreConnectionListener func(context.Context, *proto.DataStoreConnectionTestRequest) error
otlpConnectionTestListener func(context.Context, *proto.OTLPConnectionTestRequest) error
stopListener func(context.Context, *proto.StopRequest) error
triggerListener func(context.Context, *proto.TriggerRequest) error
pollListener func(context.Context, *proto.PollingRequest) error
graphqlIntrospectionListener func(context.Context, *proto.GraphqlIntrospectRequest) error
shutdownListener func(context.Context, *proto.ShutdownRequest) error
dataStoreConnectionListener func(context.Context, *proto.DataStoreConnectionTestRequest) error
otlpConnectionTestListener func(context.Context, *proto.OTLPConnectionTestRequest) error
}

func (c *Client) Start(ctx context.Context) error {
Expand Down Expand Up @@ -107,6 +108,12 @@ func (c *Client) Start(ctx context.Context) error {
return err
}

err = c.startGraphqlIntrospectionListener(ctx)
if err != nil {
c.logger.Error("Failed to start graphql introspection listener", zap.Error(err))
return err
}

err = c.startOTLPConnectionTestListener(ctx)
if err != nil {
c.logger.Error("Failed to start OTLP connection test listener", zap.Error(err))
Expand Down Expand Up @@ -158,6 +165,10 @@ func (c *Client) OnPollingRequest(listener func(context.Context, *proto.PollingR
c.pollListener = listener
}

func (c *Client) OnGraphqlIntrospectionRequest(listener func(context.Context, *proto.GraphqlIntrospectRequest) error) {
c.graphqlIntrospectionListener = listener
}

func (c *Client) OnOTLPConnectionTest(listener func(context.Context, *proto.OTLPConnectionTestRequest) error) {
c.otlpConnectionTestListener = listener
}
Expand Down
67 changes: 51 additions & 16 deletions agent/client/mocks/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@ import (

type GrpcServerMock struct {
proto.UnimplementedOrchestratorServer
port int
triggerChannel chan Message[*proto.TriggerRequest]
pollingChannel chan Message[*proto.PollingRequest]
otlpConnectionTestChannel chan Message[*proto.OTLPConnectionTestRequest]
terminationChannel chan Message[*proto.ShutdownRequest]
dataStoreTestChannel chan Message[*proto.DataStoreConnectionTestRequest]

lastTriggerResponse Message[*proto.TriggerResponse]
lastPollingResponse Message[*proto.PollingResponse]
lastOtlpConnectionResponse Message[*proto.OTLPConnectionTestResponse]
lastDataStoreConnectionResponse Message[*proto.DataStoreConnectionTestResponse]
port int
triggerChannel chan Message[*proto.TriggerRequest]
pollingChannel chan Message[*proto.PollingRequest]
otlpConnectionTestChannel chan Message[*proto.OTLPConnectionTestRequest]
terminationChannel chan Message[*proto.ShutdownRequest]
dataStoreTestChannel chan Message[*proto.DataStoreConnectionTestRequest]
graphqlIntrospectionChannel chan Message[*proto.GraphqlIntrospectRequest]

lastTriggerResponse Message[*proto.TriggerResponse]
lastPollingResponse Message[*proto.PollingResponse]
lastOtlpConnectionResponse Message[*proto.OTLPConnectionTestResponse]
lastDataStoreConnectionResponse Message[*proto.DataStoreConnectionTestResponse]
lastGraphqlIntrospectionResponse Message[*proto.GraphqlIntrospectResponse]

server *grpc.Server
}
Expand All @@ -37,11 +39,12 @@ type Message[T any] struct {

func NewGrpcServer() *GrpcServerMock {
server := &GrpcServerMock{
triggerChannel: make(chan Message[*proto.TriggerRequest]),
pollingChannel: make(chan Message[*proto.PollingRequest]),
terminationChannel: make(chan Message[*proto.ShutdownRequest]),
dataStoreTestChannel: make(chan Message[*proto.DataStoreConnectionTestRequest]),
otlpConnectionTestChannel: make(chan Message[*proto.OTLPConnectionTestRequest]),
triggerChannel: make(chan Message[*proto.TriggerRequest]),
pollingChannel: make(chan Message[*proto.PollingRequest]),
terminationChannel: make(chan Message[*proto.ShutdownRequest]),
dataStoreTestChannel: make(chan Message[*proto.DataStoreConnectionTestRequest]),
otlpConnectionTestChannel: make(chan Message[*proto.OTLPConnectionTestRequest]),
graphqlIntrospectionChannel: make(chan Message[*proto.GraphqlIntrospectRequest]),
}
var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -159,6 +162,21 @@ func (s *GrpcServerMock) RegisterDataStoreConnectionTestAgent(id *proto.AgentIde
}
}

func (s *GrpcServerMock) RegisterGraphqlIntrospectListener(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterGraphqlIntrospectListenerServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
}

for {
graphqlRequest := <-s.graphqlIntrospectionChannel

err := stream.Send(graphqlRequest.Data)
if err != nil {
log.Println("could not send polling request to agent: %w", err)
}
}
}

func (s *GrpcServerMock) RegisterOTLPConnectionTestListener(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterOTLPConnectionTestListenerServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
Expand Down Expand Up @@ -201,6 +219,15 @@ func (s *GrpcServerMock) SendPolledSpans(ctx context.Context, result *proto.Poll
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) SendGraphqlIntrospectResult(ctx context.Context, result *proto.GraphqlIntrospectResponse) (*proto.Empty, error) {
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
return nil, fmt.Errorf("could not validate token")
}

s.lastGraphqlIntrospectionResponse = Message[*proto.GraphqlIntrospectResponse]{Data: result, Context: ctx}
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) RegisterShutdownListener(_ *proto.AgentIdentification, stream proto.Orchestrator_RegisterShutdownListenerServer) error {
for {
shutdownRequest := <-s.terminationChannel
Expand All @@ -226,6 +253,10 @@ func (s *GrpcServerMock) SendDataStoreConnectionTestRequest(ctx context.Context,
s.dataStoreTestChannel <- Message[*proto.DataStoreConnectionTestRequest]{Context: ctx, Data: request}
}

func (s *GrpcServerMock) SendGraphqlIntrospectionRequest(ctx context.Context, request *proto.GraphqlIntrospectRequest) {
s.graphqlIntrospectionChannel <- Message[*proto.GraphqlIntrospectRequest]{Context: ctx, Data: request}
}

func (s *GrpcServerMock) SendOTLPConnectionTestRequest(ctx context.Context, request *proto.OTLPConnectionTestRequest) {
s.otlpConnectionTestChannel <- Message[*proto.OTLPConnectionTestRequest]{Context: ctx, Data: request}
}
Expand All @@ -246,6 +277,10 @@ func (s *GrpcServerMock) GetLastDataStoreConnectionResponse() Message[*proto.Dat
return s.lastDataStoreConnectionResponse
}

func (s *GrpcServerMock) GetLastGraphqlIntrospectionResponse() Message[*proto.GraphqlIntrospectResponse] {
return s.lastGraphqlIntrospectionResponse
}

func (s *GrpcServerMock) TerminateConnection(ctx context.Context, reason string) {
s.terminationChannel <- Message[*proto.ShutdownRequest]{
Context: ctx,
Expand Down
63 changes: 63 additions & 0 deletions agent/client/workflow_listen_for_graphql_introspection_requests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package client

import (
"context"
"fmt"
"log"
"time"

"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/telemetry"
"go.uber.org/zap"
)

func (c *Client) startGraphqlIntrospectionListener(ctx context.Context) error {
logger := c.logger.Named("graphqlIntrospectionListener")
logger.Debug("Starting")

client := proto.NewOrchestratorClient(c.conn)

stream, err := client.RegisterGraphqlIntrospectListener(ctx, c.sessionConfig.AgentIdentification)
if err != nil {
logger.Error("could not open agent stream", zap.Error(err))
return fmt.Errorf("could not open agent stream: %w", err)
}

go func() {
for {
req := proto.GraphqlIntrospectRequest{}
err := stream.RecvMsg(&req)
if err != nil {
logger.Error("could not get message from graphql introspection stream", zap.Error(err))
}
if isEndOfFileError(err) || isCancelledError(err) {
logger.Debug("graphql introspection stream closed")
return
}

reconnected, err := c.handleDisconnectionError(err, &req)
if reconnected {
logger.Warn("reconnected to graphql introspection stream")
return
}

if err != nil {
logger.Error("could not get message from graphql introspection stream", zap.Error(err))
log.Println("could not get message from graphql introspection stream: %w", err)
time.Sleep(1 * time.Second)
continue
}

// we want a new context per request, not to reuse the one from the stream
ctx := telemetry.InjectMetadataIntoContext(context.Background(), req.Metadata)
go func() {
err = c.graphqlIntrospectionListener(ctx, &req)
if err != nil {
logger.Error("could not handle graphql introspection request", zap.Error(err))
fmt.Println(err.Error())
}
}()
}
}()
return nil
}
23 changes: 23 additions & 0 deletions agent/client/workflow_send_graphql_introspection_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package client

import (
"context"
"fmt"

"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/telemetry"
)

func (c *Client) SendGraphqlIntrospectionResult(ctx context.Context, response *proto.GraphqlIntrospectResponse) error {
client := proto.NewOrchestratorClient(c.conn)

response.AgentIdentification = c.sessionConfig.AgentIdentification
response.Metadata = telemetry.ExtractMetadataFromContext(ctx)

_, err := client.SendGraphqlIntrospectResult(ctx, response)
if err != nil {
return fmt.Errorf("could not send graphql introspection result request: %w", err)
}

return nil
}
Loading
Loading