diff --git a/v3/integrations/nramqp/examples/consumer/main.go b/v3/integrations/nramqp/examples/consumer/main.go index 5cfc92ec4..ae44c4b85 100644 --- a/v3/integrations/nramqp/examples/consumer/main.go +++ b/v3/integrations/nramqp/examples/consumer/main.go @@ -32,7 +32,8 @@ func main() { nrApp.WaitForConnection(time.Second * 5) - conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + amqpURL := "amqp://guest:guest@localhost:5672/" + conn, err := amqp.Dial(amqpURL) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() diff --git a/v3/integrations/nramqp/examples/publisher/main.go b/v3/integrations/nramqp/examples/publisher/main.go index 445947a08..295071b82 100644 --- a/v3/integrations/nramqp/examples/publisher/main.go +++ b/v3/integrations/nramqp/examples/publisher/main.go @@ -40,13 +40,15 @@ type amqpServer struct { ch *amqp.Channel exchange string routingKey string + url string } -func NewServer(channel *amqp.Channel, exchangeName, routingKeyName string) *amqpServer { +func NewServer(channel *amqp.Channel, exchangeName, routingKeyName string, url string) *amqpServer { return &amqpServer{ channel, exchangeName, routingKeyName, + url, } } @@ -65,6 +67,7 @@ func (serv *amqpServer) publishPlainTxtMessage(w http.ResponseWriter, r *http.Re ctx, serv.exchange, // exchange serv.routingKey, // routing key + serv.url, // url false, // mandatory false, // immediate amqp.Publishing{ @@ -94,7 +97,8 @@ func main() { nrApp.WaitForConnection(time.Second * 5) - conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + amqpURL := "amqp://guest:guest@localhost:5672/" + conn, err := amqp.Dial(amqpURL) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() @@ -112,7 +116,7 @@ func main() { ) failOnError(err, "Failed to declare a queue") - server := NewServer(ch, "", q.Name) + server := NewServer(ch, "", q.Name, amqpURL) http.HandleFunc(newrelic.WrapHandleFunc(nrApp, "/", server.index)) http.HandleFunc(newrelic.WrapHandleFunc(nrApp, "/message", server.publishPlainTxtMessage)) diff --git a/v3/integrations/nramqp/nramqp.go b/v3/integrations/nramqp/nramqp.go index 2be8e7634..8e2094dbe 100644 --- a/v3/integrations/nramqp/nramqp.go +++ b/v3/integrations/nramqp/nramqp.go @@ -2,6 +2,7 @@ package nramqp import ( "context" + "strings" amqp "github.com/rabbitmq/amqp091-go" @@ -16,7 +17,7 @@ const ( func init() { internal.TrackUsage("integration", "messagebroker", "nramqp") } -func creatProducerSegment(exchange, key string) *newrelic.MessageProducerSegment { +func createProducerSegment(exchange, key string) *newrelic.MessageProducerSegment { s := newrelic.MessageProducerSegment{ Library: RabbitMQLibrary, DestinationName: "Default", @@ -33,13 +34,34 @@ func creatProducerSegment(exchange, key string) *newrelic.MessageProducerSegment return &s } +func GetHostAndPortFromURL(url string) (string, string) { + // url is of format amqp://user:password@host:port or amqp://host:port + var hostPortPart string + + // extract the part after "@" symbol, if present + if parts := strings.Split(url, "@"); len(parts) == 2 { + hostPortPart = parts[1] + } else { + // assume the whole url after "amqp://" is the host:port part + hostPortPart = strings.TrimPrefix(url, "amqp://") + } + + // split the host:port part + strippedURL := strings.Split(hostPortPart, ":") + if len(strippedURL) != 2 { + return "", "" + } + return strippedURL[0], strippedURL[1] +} + // PublishedWithContext looks for a newrelic transaction in the context object, and if found, creates a message producer segment. // It will also inject distributed tracing headers into the message. -func PublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { +func PublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key, url string, mandatory, immediate bool, msg amqp.Publishing) error { + host, port := GetHostAndPortFromURL(url) txn := newrelic.FromContext(ctx) if txn != nil { // generate message broker segment - s := creatProducerSegment(exchange, key) + s := createProducerSegment(exchange, key) // capture telemetry for AMQP producer if msg.Headers != nil && len(msg.Headers) > 0 { @@ -49,15 +71,18 @@ func PublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key str } integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageHeaders, hdrStr) } + s.StartTime = txn.StartSegmentNow() + // inject DT headers into headers object + msg.Headers = injectDtHeaders(txn, msg.Headers) + integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeSpanKind, "producer") + integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeServerAddress, host) + integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeServerPort, port) + integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageDestinationName, exchange) integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageRoutingKey, key) integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageCorrelationID, msg.CorrelationId) integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageReplyTo, msg.ReplyTo) - // inject DT headers into headers object - msg.Headers = injectDtHeaders(txn, msg.Headers) - - s.StartTime = txn.StartSegmentNow() err := ch.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg) s.End() return err @@ -91,8 +116,10 @@ func Consume(app *newrelic.Application, ch *amqp.Channel, queue, consumer string integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageHeaders, hdrStr, nil) } } - + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeSpanKind, "consumer", nil) integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageQueueName, queue, nil) + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageDestinationName, queue, nil) + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessagingDestinationPublishName, delivery.Exchange, nil) integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageRoutingKey, delivery.RoutingKey, nil) integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageCorrelationID, delivery.CorrelationId, nil) integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageReplyTo, delivery.ReplyTo, nil) diff --git a/v3/integrations/nramqp/nramqp_test.go b/v3/integrations/nramqp/nramqp_test.go index 3db9e4ce9..213a4cc5c 100644 --- a/v3/integrations/nramqp/nramqp_test.go +++ b/v3/integrations/nramqp/nramqp_test.go @@ -15,7 +15,7 @@ func BenchmarkCreateProducerSegment(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { - creatProducerSegment("exchange", "key") + createProducerSegment("exchange", "key") } } @@ -66,7 +66,7 @@ func TestCreateProducerSegment(t *testing.T) { } for _, test := range tests { - s := creatProducerSegment(test.exchange, test.key) + s := createProducerSegment(test.exchange, test.key) if s.DestinationName != test.expect.DestinationName { t.Errorf("expected destination name %s, got %s", test.expect.DestinationName, s.DestinationName) } @@ -76,3 +76,55 @@ func TestCreateProducerSegment(t *testing.T) { } } + +func TestHostAndPortParsing(t *testing.T) { + app := createTestApp() + txn := app.StartTransaction("test") + defer txn.End() + + type testObject struct { + url string + expectHost string + expectPort string + } + + tests := []testObject{ + { + "amqp://user:password@host:port", + "host", + "port", + }, + { + "amqp://host:port", + "host", + "port", + }, + { + "aaa://host:port", + "", + "", + }, + + { + "amqp://user:password@host", + "", + "", + }, + { + "amqp://user:password@host:port:extra", + "", + "", + }, + } + + for _, test := range tests { + host, port := GetHostAndPortFromURL(test.url) + if host != test.expectHost { + t.Errorf("expected host %s, got %s", test.expectHost, host) + } + if port != test.expectPort { + t.Errorf("expected port %s, got %s", test.expectPort, port) + } + } + +} diff --git a/v3/integrations/nrawssdk-v2/go.mod b/v3/integrations/nrawssdk-v2/go.mod index 2a81df60a..a286df982 100644 --- a/v3/integrations/nrawssdk-v2/go.mod +++ b/v3/integrations/nrawssdk-v2/go.mod @@ -2,17 +2,45 @@ module github.com/newrelic/go-agent/v3/integrations/nrawssdk-v2 // As of May 2021, the aws-sdk-go-v2 go.mod file uses 1.15: // https://github.com/aws/aws-sdk-go-v2/blob/master/go.mod -go 1.20 +go 1.21 + +toolchain go1.21.0 require ( - github.com/aws/aws-sdk-go-v2 v1.16.15 - github.com/aws/aws-sdk-go-v2/config v1.17.6 - github.com/aws/aws-sdk-go-v2/service/dynamodb v1.17.0 - github.com/aws/aws-sdk-go-v2/service/lambda v1.24.5 - github.com/aws/aws-sdk-go-v2/service/s3 v1.27.10 - github.com/aws/smithy-go v1.13.3 + github.com/aws/aws-sdk-go-v2 v1.30.4 + github.com/aws/aws-sdk-go-v2/config v1.27.31 + github.com/aws/aws-sdk-go-v2/service/dynamodb v1.34.6 + github.com/aws/aws-sdk-go-v2/service/lambda v1.58.1 + github.com/aws/aws-sdk-go-v2/service/s3 v1.61.0 + github.com/aws/aws-sdk-go-v2/service/sqs v1.34.6 + github.com/aws/smithy-go v1.20.4 github.com/newrelic/go-agent/v3 v3.33.1 ) +require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.30 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.17 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect + golang.org/x/net v0.9.0 // indirect + golang.org/x/sys v0.7.0 // indirect + golang.org/x/text v0.9.0 // indirect + google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect + google.golang.org/grpc v1.56.3 // indirect + google.golang.org/protobuf v1.30.0 // indirect +) replace github.com/newrelic/go-agent/v3 => ../.. diff --git a/v3/integrations/nrawssdk-v2/nrawssdk.go b/v3/integrations/nrawssdk-v2/nrawssdk.go index a6c151588..21ae60461 100644 --- a/v3/integrations/nrawssdk-v2/nrawssdk.go +++ b/v3/integrations/nrawssdk-v2/nrawssdk.go @@ -28,9 +28,14 @@ package nrawssdk import ( "context" + "net/url" "strconv" + "strings" + "github.com/aws/aws-sdk-go-v2/aws" awsmiddle "github.com/aws/aws-sdk-go-v2/aws/middleware" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/smithy-go/middleware" smithymiddle "github.com/aws/smithy-go/middleware" smithyhttp "github.com/aws/smithy-go/transport/http" "github.com/newrelic/go-agent/v3/internal/integrationsupport" @@ -41,6 +46,11 @@ type nrMiddleware struct { txn *newrelic.Transaction } +// Context key for SQS service queue +type contextKey string + +const queueURLKey contextKey = "QueueURL" + type endable interface{ End() } // See https://aws.github.io/aws-sdk-go-v2/docs/middleware/ for a description of @@ -88,6 +98,24 @@ func (m nrMiddleware) deserializeMiddleware(stack *smithymiddle.Stack) error { response, ok := out.RawResponse.(*smithyhttp.Response) if ok { + if serviceName == "sqs" || serviceName == "SQS" { + if queueURL, ok := ctx.Value(queueURLKey).(string); ok { + parsedURL, err := url.Parse(queueURL) + if err == nil { + // Example URL: https://sqs.{region}.amazonaws.com/{account.id}/{queue.name} + pathParts := strings.Split(parsedURL.Path, "/") + if len(pathParts) >= 3 { + accountID := pathParts[1] + queueName := pathParts[2] + integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeCloudAccountID, accountID) + integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeCloudRegion, region) + integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageSystem, "aws_sqs") + integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageDestinationName, queueName) + } + } + + } + } // Set additional span attributes integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeResponseCode, strconv.Itoa(response.StatusCode)) @@ -107,6 +135,51 @@ func (m nrMiddleware) deserializeMiddleware(stack *smithymiddle.Stack) error { smithymiddle.Before) } +func (m nrMiddleware) serializeMiddleware(stack *middleware.Stack) error { + return stack.Initialize.Add(middleware.InitializeMiddlewareFunc("NRSerializeMiddleware", func( + ctx context.Context, in middleware.InitializeInput, next middleware.InitializeHandler) ( + out middleware.InitializeOutput, metadata middleware.Metadata, err error) { + + serviceName := awsmiddle.GetServiceID(ctx) + if serviceName == "sqs" || serviceName == "SQS" { + QueueURL := "" + switch params := in.Parameters.(type) { + case *sqs.SendMessageInput: + QueueURL = aws.ToString(params.QueueUrl) + case *sqs.DeleteQueueInput: + QueueURL = aws.ToString(params.QueueUrl) + case *sqs.ReceiveMessageInput: + QueueURL = aws.ToString(params.QueueUrl) + case *sqs.DeleteMessageInput: + QueueURL = aws.ToString(params.QueueUrl) + case *sqs.ChangeMessageVisibilityInput: + QueueURL = aws.ToString(params.QueueUrl) + case *sqs.ChangeMessageVisibilityBatchInput: + QueueURL = aws.ToString(params.QueueUrl) + case *sqs.DeleteMessageBatchInput: + QueueURL = aws.ToString(params.QueueUrl) + case *sqs.SendMessageBatchInput: + QueueURL = aws.ToString(params.QueueUrl) + case *sqs.PurgeQueueInput: + QueueURL = aws.ToString(params.QueueUrl) + case *sqs.GetQueueAttributesInput: + QueueURL = aws.ToString(params.QueueUrl) + case *sqs.SetQueueAttributesInput: + QueueURL = aws.ToString(params.QueueUrl) + case *sqs.TagQueueInput: + QueueURL = aws.ToString(params.QueueUrl) + case *sqs.UntagQueueInput: + QueueURL = aws.ToString(params.QueueUrl) + default: + QueueURL = "" + } + // Store the QueueURL in the context + ctx = context.WithValue(ctx, queueURLKey, QueueURL) + } + return next.HandleInitialize(ctx, in) + }), middleware.After) +} + // AppendMiddlewares inserts New Relic middleware in the given `apiOptions` for // the AWS SDK V2 for Go. It must be called only once per AWS configuration. // @@ -167,4 +240,6 @@ func (m nrMiddleware) deserializeMiddleware(stack *smithymiddle.Stack) error { func AppendMiddlewares(apiOptions *[]func(*smithymiddle.Stack) error, txn *newrelic.Transaction) { m := nrMiddleware{txn: txn} *apiOptions = append(*apiOptions, m.deserializeMiddleware) + *apiOptions = append(*apiOptions, m.serializeMiddleware) + } diff --git a/v3/integrations/nrawssdk-v2/nrawssdk_test.go b/v3/integrations/nrawssdk-v2/nrawssdk_test.go index bd3d8cde3..0385559ec 100644 --- a/v3/integrations/nrawssdk-v2/nrawssdk_test.go +++ b/v3/integrations/nrawssdk-v2/nrawssdk_test.go @@ -17,9 +17,14 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/lambda" "github.com/aws/aws-sdk-go-v2/service/lambda/types" + + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/newrelic/go-agent/v3/internal" "github.com/newrelic/go-agent/v3/internal/integrationsupport" "github.com/newrelic/go-agent/v3/newrelic" + + sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" ) func testApp() integrationsupport.ExpectApp { @@ -140,6 +145,28 @@ var ( "http.statusCode": "200", }, } + SQSSpan = internal.WantEvent{ + Intrinsics: map[string]interface{}{ + "name": "External/sqs.us-west-2.amazonaws.com/http/POST", + "category": "http", + "parentId": internal.MatchAnything, + "component": "http", + "span.kind": "client", + "sampled": true, + }, + UserAttributes: map[string]interface{}{}, + AgentAttributes: map[string]interface{}{ + "message.destination.name": "MyQueue", + "cloud.account.id": "123456789012", + "cloud.region": "us-west-2", + "http.url": "https://sqs.us-west-2.amazonaws.com/", + "http.method": "POST", + "messaging.system": "aws_sqs", + "aws.requestId": "testing request id", + "http.statusCode": "200", + "aws.region": "us-west-2", + }, + } datastoreSpan = internal.WantEvent{ Intrinsics: map[string]interface{}{ "name": "Datastore/operation/DynamoDB/DescribeTable", @@ -258,6 +285,262 @@ func TestInstrumentRequestExternal(t *testing.T) { ) } +type sqsTestTableEntry struct { + Name string + BuildContext func(txn *newrelic.Transaction) context.Context + BuildConfig func(ctx context.Context, txn *newrelic.Transaction) aws.Config + Input interface{} +} + +func runSQSTestTable(t *testing.T, entries []*sqsTestTableEntry, testFunc func(t *testing.T, entry *sqsTestTableEntry)) { + for _, entry := range entries { + t.Run(entry.Name, func(t *testing.T) { + testFunc(t, entry) + }) + } +} + +func TestSQSMiddleware(t *testing.T) { + runSQSTestTable(t, + []*sqsTestTableEntry{ + { + Name: "DeleteQueueInput", + BuildContext: func(txn *newrelic.Transaction) context.Context { + return context.Background() + }, + BuildConfig: func(ctx context.Context, txn *newrelic.Transaction) aws.Config { + return newConfig(ctx, txn) + }, + Input: &sqs.DeleteQueueInput{QueueUrl: aws.String("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue")}, + }, + { + Name: "ReceiveMessageInput", + BuildContext: func(txn *newrelic.Transaction) context.Context { + return context.Background() + }, + BuildConfig: func(ctx context.Context, txn *newrelic.Transaction) aws.Config { + return newConfig(ctx, txn) + }, + Input: &sqs.ReceiveMessageInput{QueueUrl: aws.String("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue")}, + }, + { + Name: "SendMessageInput", + BuildContext: func(txn *newrelic.Transaction) context.Context { + return context.Background() + }, + BuildConfig: func(ctx context.Context, txn *newrelic.Transaction) aws.Config { + return newConfig(ctx, txn) + }, + Input: &sqs.SendMessageInput{QueueUrl: aws.String("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue"), MessageBody: aws.String("Hello, world!")}, + }, + { + Name: "PurgeQueueInput", + BuildContext: func(txn *newrelic.Transaction) context.Context { + return context.Background() + }, + BuildConfig: func(ctx context.Context, txn *newrelic.Transaction) aws.Config { + return newConfig(ctx, txn) + }, + Input: &sqs.PurgeQueueInput{QueueUrl: aws.String("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue")}, + }, + { + Name: "DeleteMessageInput", + BuildContext: func(txn *newrelic.Transaction) context.Context { + return context.Background() + }, + BuildConfig: func(ctx context.Context, txn *newrelic.Transaction) aws.Config { + return newConfig(ctx, txn) + }, + Input: &sqs.DeleteMessageInput{QueueUrl: aws.String("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue"), ReceiptHandle: aws.String("receipt-handle")}, + }, + { + Name: "ChangeMessageVisibilityInput", + BuildContext: func(txn *newrelic.Transaction) context.Context { + return context.Background() + }, + BuildConfig: func(ctx context.Context, txn *newrelic.Transaction) aws.Config { + return newConfig(ctx, txn) + }, + Input: &sqs.ChangeMessageVisibilityInput{QueueUrl: aws.String("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue"), ReceiptHandle: aws.String("receipt-handle"), VisibilityTimeout: 10}, + }, + + { + Name: "ChangeMessageVisibilityBatchInput", + BuildContext: func(txn *newrelic.Transaction) context.Context { + return context.Background() + }, + BuildConfig: func(ctx context.Context, txn *newrelic.Transaction) aws.Config { + return newConfig(ctx, txn) + }, + Input: &sqs.ChangeMessageVisibilityBatchInput{ + QueueUrl: aws.String("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue"), + Entries: []sqstypes.ChangeMessageVisibilityBatchRequestEntry{ + { + Id: aws.String("id1"), + ReceiptHandle: aws.String("receipt-handle"), + VisibilityTimeout: 10, + }, + }, + }, + }, + { + Name: "DeleteMessageBatchInput", + BuildContext: func(txn *newrelic.Transaction) context.Context { + return context.Background() + }, + BuildConfig: func(ctx context.Context, txn *newrelic.Transaction) aws.Config { + return newConfig(ctx, txn) + }, + Input: &sqs.DeleteMessageBatchInput{ + QueueUrl: aws.String("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue"), + Entries: []sqstypes.DeleteMessageBatchRequestEntry{ + { + Id: aws.String("id1"), + ReceiptHandle: aws.String("receipt-handle"), + }, + }, + }, + }, + { + Name: "SendMessageBatchInput", + BuildContext: func(txn *newrelic.Transaction) context.Context { + return context.Background() + }, + BuildConfig: func(ctx context.Context, txn *newrelic.Transaction) aws.Config { + return newConfig(ctx, txn) + }, + Input: &sqs.SendMessageBatchInput{ + QueueUrl: aws.String("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue"), + Entries: []sqstypes.SendMessageBatchRequestEntry{ + { + Id: aws.String("id1"), + MessageBody: aws.String("Hello, world!"), + }, + }, + }, + }, + { + Name: "GetQueueAttributesInput", + BuildContext: func(txn *newrelic.Transaction) context.Context { + return context.Background() + }, + BuildConfig: func(ctx context.Context, txn *newrelic.Transaction) aws.Config { + return newConfig(ctx, txn) + }, + Input: &sqs.GetQueueAttributesInput{ + QueueUrl: aws.String("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue"), + AttributeNames: []sqstypes.QueueAttributeName{ + "ApproximateNumberOfMessages", + }, + }, + }, + { + Name: "SetQueueAttributesInput", + BuildContext: func(txn *newrelic.Transaction) context.Context { + return context.Background() + }, + BuildConfig: func(ctx context.Context, txn *newrelic.Transaction) aws.Config { + return newConfig(ctx, txn) + }, + Input: &sqs.SetQueueAttributesInput{ + QueueUrl: aws.String("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue"), + Attributes: map[string]string{ + "VisibilityTimeout": "10", + }, + }, + }, + { + Name: "TagQueueInput", + BuildContext: func(txn *newrelic.Transaction) context.Context { + return context.Background() + }, + BuildConfig: func(ctx context.Context, txn *newrelic.Transaction) aws.Config { + return newConfig(ctx, txn) + }, + Input: &sqs.TagQueueInput{ + QueueUrl: aws.String("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue"), + Tags: map[string]string{ + "tag1": "value1", + }, + }, + }, + { + Name: "UntagQueueInput", + BuildContext: func(txn *newrelic.Transaction) context.Context { + return context.Background() + }, + BuildConfig: func(ctx context.Context, txn *newrelic.Transaction) aws.Config { + return newConfig(ctx, txn) + }, + Input: &sqs.UntagQueueInput{ + QueueUrl: aws.String("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue"), + TagKeys: []string{"tag1"}, + }, + }, + }, + + func(t *testing.T, entry *sqsTestTableEntry) { + app := testApp() + txn := app.StartTransaction(txnName) + ctx := entry.BuildContext(txn) + awsOp := "" + client := sqs.NewFromConfig(entry.BuildConfig(ctx, txn)) + switch input := entry.Input.(type) { + case *sqs.SendMessageInput: + client.SendMessage(ctx, input) + awsOp = "SendMessage" + case *sqs.DeleteQueueInput: + client.DeleteQueue(ctx, input) + awsOp = "DeleteQueue" + case *sqs.ReceiveMessageInput: + client.ReceiveMessage(ctx, input) + awsOp = "ReceiveMessage" + case *sqs.DeleteMessageInput: + client.DeleteMessage(ctx, input) + awsOp = "DeleteMessage" + case *sqs.ChangeMessageVisibilityInput: + client.ChangeMessageVisibility(ctx, input) + awsOp = "ChangeMessageVisibility" + case *sqs.ChangeMessageVisibilityBatchInput: + client.ChangeMessageVisibilityBatch(ctx, input) + awsOp = "ChangeMessageVisibilityBatch" + case *sqs.DeleteMessageBatchInput: + client.DeleteMessageBatch(ctx, input) + awsOp = "DeleteMessageBatch" + case *sqs.PurgeQueueInput: + client.PurgeQueue(ctx, input) + awsOp = "PurgeQueue" + case *sqs.GetQueueAttributesInput: + client.GetQueueAttributes(ctx, input) + awsOp = "GetQueueAttributes" + case *sqs.SetQueueAttributesInput: + client.SetQueueAttributes(ctx, input) + awsOp = "SetQueueAttributes" + case *sqs.TagQueueInput: + client.TagQueue(ctx, input) + awsOp = "TagQueue" + case *sqs.UntagQueueInput: + client.UntagQueue(ctx, input) + awsOp = "UntagQueue" + case *sqs.SendMessageBatchInput: + client.SendMessageBatch(ctx, input) + awsOp = "SendMessageBatch" + + default: + t.Errorf("unexpected input type: %T", input) + + } + + txn.End() + SQSSpanModified := SQSSpan + SQSSpanModified.AgentAttributes["aws.operation"] = awsOp + app.ExpectSpanEvents(t, []internal.WantEvent{ + SQSSpan, genericSpan}) + + }, + ) +} + func TestInstrumentRequestDatastore(t *testing.T) { runTestTable(t, []*testTableEntry{ diff --git a/v3/newrelic/attributes.go b/v3/newrelic/attributes.go index 2fd2f8d09..a3770b47f 100644 --- a/v3/newrelic/attributes.go +++ b/v3/newrelic/attributes.go @@ -118,6 +118,15 @@ const ( // // It is recommended that at most one message is consumed per transaction. const ( + // The account ID of a cloud service provider + AttributeCloudAccountID = "cloud.account.id" + // The region of a cloud service provider + AttributeCloudRegion = "cloud.region" + // The name of the messaging system + AttributeMessageSystem = "messaging.system" + // The name of the messagine broker destination + AttributeMessageDestinationName = "message.destination.name" + // The routing key of the consumed message. AttributeMessageRoutingKey = "message.routingKey" // The name of the queue the message was consumed from. @@ -131,6 +140,18 @@ const ( AttributeMessageCorrelationID = "message.correlationId" // The headers of the message without CAT keys/values AttributeMessageHeaders = "message.headers" + // Host identifier of the message broker + AttributeServerAddress = "server.address" + // Port number of the message broker + AttributeServerPort = "server.port" + // Will take on either the values "producer" or "consumer" + AttributeSpanKind = "span.kind" +) + +// Experimental OTEL Attributes for consumed message transactions +const ( + AttributeMessagingDestinationPublishName = "messaging.destination_publish.name" + AttributeRabbitMQDestinationRoutingKey = "messaging.rabbitmq.destination.routing_key" ) // Attributes destined for Span Events. These attributes appear only on Span diff --git a/v3/newrelic/attributes_from_internal.go b/v3/newrelic/attributes_from_internal.go index c2278a5f7..eb1ef14b5 100644 --- a/v3/newrelic/attributes_from_internal.go +++ b/v3/newrelic/attributes_from_internal.go @@ -34,37 +34,45 @@ var ( // attributes.go and add its default destinations here. // agentAttributeDefaultDests = map[string]destinationSet{ - AttributeHostDisplayName: usualDests, - AttributeRequestMethod: usualDests, - AttributeRequestAccept: usualDests, - AttributeRequestContentType: usualDests, - AttributeRequestContentLength: usualDests, - AttributeRequestHost: usualDests, - AttributeRequestUserAgent: tracesDests, - AttributeRequestUserAgentDeprecated: tracesDests, - AttributeRequestReferer: tracesDests, - AttributeRequestURI: usualDests, - AttributeResponseContentType: usualDests, - AttributeResponseContentLength: usualDests, - AttributeResponseCode: usualDests, - AttributeResponseCodeDeprecated: usualDests, - AttributeAWSRequestID: usualDests, - AttributeAWSLambdaARN: usualDests, - AttributeAWSLambdaColdStart: usualDests, - AttributeAWSLambdaEventSourceARN: usualDests, - AttributeMessageRoutingKey: usualDests, - AttributeMessageQueueName: usualDests, - AttributeMessageHeaders: usualDests, - AttributeMessageExchangeType: destNone, - AttributeMessageReplyTo: destNone, - AttributeMessageCorrelationID: destNone, - AttributeCodeFunction: usualDests, - AttributeCodeNamespace: usualDests, - AttributeCodeFilepath: usualDests, - AttributeCodeLineno: usualDests, - AttributeUserID: usualDests, - AttributeLLM: usualDests, - + AttributeCloudAccountID: usualDests, + AttributeMessageDestinationName: usualDests, + AttributeCloudRegion: usualDests, + AttributeMessageSystem: usualDests, + AttributeHostDisplayName: usualDests, + AttributeRequestMethod: usualDests, + AttributeRequestAccept: usualDests, + AttributeRequestContentType: usualDests, + AttributeRequestContentLength: usualDests, + AttributeRequestHost: usualDests, + AttributeRequestUserAgent: tracesDests, + AttributeRequestUserAgentDeprecated: tracesDests, + AttributeRequestReferer: tracesDests, + AttributeRequestURI: usualDests, + AttributeResponseContentType: usualDests, + AttributeResponseContentLength: usualDests, + AttributeResponseCode: usualDests, + AttributeResponseCodeDeprecated: usualDests, + AttributeAWSRequestID: usualDests, + AttributeAWSLambdaARN: usualDests, + AttributeAWSLambdaColdStart: usualDests, + AttributeAWSLambdaEventSourceARN: usualDests, + AttributeMessageRoutingKey: usualDests, + AttributeMessageQueueName: usualDests, + AttributeMessageHeaders: usualDests, + AttributeMessageExchangeType: destNone, + AttributeMessageReplyTo: destNone, + AttributeMessageCorrelationID: destNone, + AttributeCodeFunction: usualDests, + AttributeCodeNamespace: usualDests, + AttributeCodeFilepath: usualDests, + AttributeCodeLineno: usualDests, + AttributeUserID: usualDests, + AttributeLLM: usualDests, + AttributeServerAddress: usualDests, + AttributeServerPort: usualDests, + AttributeSpanKind: usualDests, + AttributeMessagingDestinationPublishName: usualDests, + AttributeRabbitMQDestinationRoutingKey: usualDests, // Span specific attributes SpanAttributeDBStatement: usualDests, SpanAttributeDBInstance: usualDests,