Skip to content

Commit

Permalink
chore: Added consumer attribute reconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
jsumners-nr committed Feb 21, 2025
1 parent 98d29fe commit 33bb444
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 20 deletions.
18 changes: 7 additions & 11 deletions lib/otel/segments/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,26 @@ module.exports = createConsumerSegment

const Transaction = require('../../transaction/')
const recorder = require('../../metrics/recorders/message-transaction')
const { DESTINATIONS, TYPES } = Transaction
const { TYPES } = Transaction

const {
ATTR_MESSAGING_DESTINATION,
ATTR_MESSAGING_DESTINATION_KIND,
ATTR_MESSAGING_DESTINATION_NAME,
ATTR_MESSAGING_SYSTEM
} = require('../constants')

function createConsumerSegment(agent, otelSpan) {
const attrs = otelSpan.attributes
const transaction = new Transaction(agent)
transaction.type = TYPES.MESSAGE

const system = otelSpan.attributes[ATTR_MESSAGING_SYSTEM] ?? 'unknown'
const destination = otelSpan.attributes[ATTR_MESSAGING_DESTINATION] ?? 'unknown'
const destKind = otelSpan.attributes[ATTR_MESSAGING_DESTINATION_KIND] ?? 'unknown'
const system = attrs[ATTR_MESSAGING_SYSTEM] ?? 'unknown'
// _NAME is the current preferred attribute with semantic conventions >=1.3.0.
const destination = attrs[ATTR_MESSAGING_DESTINATION_NAME] ?? attrs[ATTR_MESSAGING_DESTINATION] ?? 'unknown'
const destKind = attrs[ATTR_MESSAGING_DESTINATION_KIND] ?? 'unknown'
const segmentName = `${system}/${destKind}/Named/${destination}`

const txAttrs = transaction.trace.attributes
txAttrs.addAttribute(DESTINATIONS.TRANS_SCOPE, 'message.queueName', destination)
// txAttrs.addAttribute(
// DESTINATIONS.TRANS_SCOPE,
// 'host',
//
// )
transaction.setPartialName(segmentName)

const segment = agent.tracer.createSegment({
Expand Down
50 changes: 48 additions & 2 deletions lib/otel/span-processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const {
ATTR_HTTP_ROUTE,
ATTR_HTTP_STATUS_CODE,
ATTR_HTTP_STATUS_TEXT,
ATTR_MESSAGING_DESTINATION,
ATTR_MESSAGING_DESTINATION_NAME,
ATTR_MESSAGING_MESSAGE_CONVERSATION_ID,
ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY,
ATTR_NET_PEER_NAME,
Expand All @@ -38,7 +40,7 @@ module.exports = class NrSpanProcessor {

/**
* Synthesize segment at start of span and assign to a symbol
* that will be removed in `onEnd` once the correspondig
* that will be removed in `onEnd` once the corresponding
* segment is read.
* @param {object} span otel span getting tested
*/
Expand Down Expand Up @@ -71,12 +73,56 @@ module.exports = class NrSpanProcessor {
this.reconcileServerAttributes({ segment, span, transaction })
} else if (span.kind === SpanKind.CLIENT && span.attributes[ATTR_DB_SYSTEM]) {
this.reconcileDbAttributes({ segment, span })
} else if (span.kind === SpanKind.CONSUMER) {
this.reconcileConsumerAttributes({ segment, span, transaction })
} else if (span.kind === SpanKind.PRODUCER) {
this.reconcileProducerAttributes({ segment, span })
}
// TODO: add http external checks
}

/**
* Detect messaging consumer attributes in the OTEL span and add them
* to the New Relic transaction. Note: this method ends the current
* transaction.
*
* @param {object} params
* @param {object} params.span The OTEL span entity that possibly contains
* desired attributes.
* @param {Transaction} params.transaction The NR transaction to attach
* the found attributes to.
*/
reconcileConsumerAttributes({ span, transaction }) {
const attrs = span.attributes
const baseSegment = transaction.baseSegment

if (attrs[ATTR_SERVER_ADDRESS]) {
baseSegment.addAttribute('host', attrs[ATTR_SERVER_ADDRESS])
}
if (attrs[ATTR_SERVER_PORT]) {
baseSegment.addAttribute('port', attrs[ATTR_SERVER_PORT])
}

// Remaining attributes should only be added if we are not in high
// security mode.
if (this.agent.config.high_security === true) {
transaction.end()
return
}

const trace = transaction.trace
if (attrs[ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]) {
trace.attributes.addAttribute(DESTINATIONS.TRANS_COMMON, 'message.routingKey', attrs[ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY])
}

const destName = attrs[ATTR_MESSAGING_DESTINATION_NAME] ?? attrs[ATTR_MESSAGING_DESTINATION]
if (destName) {
trace.attributes.addAttribute(DESTINATIONS.TRANS_COMMON, 'message.queueName', destName)
}

transaction.end()
}

reconcileServerAttributes({ segment, span, transaction }) {
if (span.attributes[ATTR_RPC_SYSTEM]) {
this.reconcileRpcAttributes({ segment, span, transaction })
Expand All @@ -86,7 +132,7 @@ module.exports = class NrSpanProcessor {

// End the corresponding transaction for the entry point server span.
// We do then when the span ends to ensure all data has been processed
// for the correspondig server span.
// for the corresponding server span.
transaction.end()
}

Expand Down
2 changes: 1 addition & 1 deletion lib/spans/span-event.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class HttpSpanEvent extends SpanEvent {
* Span event class for datastore operations and queries.
*
* @private
* @class.
* @class
*/
class DatastoreSpanEvent extends SpanEvent {
constructor(attributes, customAttributes) {
Expand Down
5 changes: 0 additions & 5 deletions test/unit/lib/otel/consumer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const {
ATTR_MESSAGING_SYSTEM,
} = require('#agentlib/otel/constants.js')

const { DESTINATIONS } = require('../../../../lib/transaction')
const helper = require('../../../lib/agent_helper')
const createSpan = require('./fixtures/span')
const SegmentSynthesizer = require('../../../../lib/otel/segment-synthesis')
Expand Down Expand Up @@ -63,8 +62,4 @@ test('should create consumer segment from otel span', (t) => {
assert.equal(transaction.name, expectedName)
assert.equal(transaction.type, 'message')
assert.equal(transaction.baseSegment, segment)
assert.equal(
transaction.trace.attributes.get(DESTINATIONS.TRANS_SCOPE)['message.queueName'],
'dest1'
)
})
44 changes: 43 additions & 1 deletion test/versioned/otel-bridge/span.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const { hrTimeToMilliseconds } = require('@opentelemetry/core')
const helper = require('../../lib/agent_helper')
const { otelSynthesis } = require('../../../lib/symbols')

const { DESTINATIONS } = require('../../../lib/transaction')
const {
ATTR_DB_NAME,
ATTR_DB_STATEMENT,
Expand All @@ -26,6 +27,7 @@ const {
ATTR_HTTP_URL,
ATTR_MESSAGING_DESTINATION,
ATTR_MESSAGING_DESTINATION_KIND,
ATTR_MESSAGING_DESTINATION_NAME,
ATTR_MESSAGING_MESSAGE_CONVERSATION_ID,
ATTR_MESSAGING_OPERATION,
ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY,
Expand Down Expand Up @@ -418,8 +420,10 @@ test('messaging consumer metrics are bridged correctly', (t, end) => {
[ATTR_MESSAGING_SYSTEM]: 'kafka',
[ATTR_MESSAGING_OPERATION]: 'getMessage',
[ATTR_SERVER_ADDRESS]: '127.0.0.1',
[ATTR_SERVER_PORT]: '1234',
[ATTR_MESSAGING_DESTINATION]: 'work-queue',
[ATTR_MESSAGING_DESTINATION_KIND]: 'queue'
[ATTR_MESSAGING_DESTINATION_KIND]: 'queue',
[ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: 'test-key'
}

tracer.startActiveSpan('consumer-test', { kind: otel.SpanKind.CONSUMER, attributes }, (span) => {
Expand All @@ -444,6 +448,44 @@ test('messaging consumer metrics are bridged correctly', (t, end) => {
assert.equal(unscopedMetrics[expectedMetric].callCount, 1, `${expectedMetric}.callCount`)
}

// Verify that required reconciled attributes are present:
let attrs = tx.baseSegment.getAttributes()
assert.equal(attrs.host, '127.0.0.1')
assert.equal(attrs.port, '1234')
attrs = tx.trace.attributes.get(DESTINATIONS.TRANS_COMMON)
assert.equal(attrs['message.queueName'], 'work-queue')
assert.equal(attrs['message.routingKey'], 'test-key')

end()
})
})

test('messaging consumer skips high security attributes', (t, end) => {
const { agent, tracer } = t.nr
const attributes = {
[ATTR_MESSAGING_SYSTEM]: 'kafka',
[ATTR_MESSAGING_OPERATION]: 'getMessage',
[ATTR_SERVER_ADDRESS]: '127.0.0.1',
[ATTR_SERVER_PORT]: '1234',
[ATTR_MESSAGING_DESTINATION_KIND]: 'queue',
[ATTR_MESSAGING_DESTINATION_NAME]: 'test-queue',
[ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: 'test-key'
}
agent.config.high_security = true

tracer.startActiveSpan('consumer-test', { kind: otel.SpanKind.CONSUMER, attributes }, (span) => {
const tx = agent.getTransaction()
span.end()
tx.end()

// Verify that required reconciled attributes are present:
let attrs = tx.baseSegment.getAttributes()
assert.equal(attrs.host, '127.0.0.1')
assert.equal(attrs.port, '1234')
attrs = tx.trace.attributes.get(DESTINATIONS.TRANS_COMMON)
assert.equal(attrs['message.queueName'], undefined)
assert.equal(attrs['message.routingKey'], undefined)

end()
})
})

0 comments on commit 33bb444

Please sign in to comment.