Skip to content

Commit

Permalink
chore: Added consumer attribute reconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
jsumners-nr committed Feb 20, 2025
1 parent 98d29fe commit 7068990
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 4 deletions.
48 changes: 46 additions & 2 deletions lib/otel/span-processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const {
ATTR_HTTP_ROUTE,
ATTR_HTTP_STATUS_CODE,
ATTR_HTTP_STATUS_TEXT,
ATTR_MESSAGING_DESTINATION_NAME,
ATTR_MESSAGING_MESSAGE_CONVERSATION_ID,
ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY,
ATTR_NET_PEER_NAME,
Expand All @@ -38,7 +39,7 @@ module.exports = class NrSpanProcessor {

/**
* Synthesize segment at start of span and assign to a symbol
* that will be removed in `onEnd` once the correspondig
* that will be removed in `onEnd` once the corresponding
* segment is read.
* @param {object} span otel span getting tested
*/
Expand Down Expand Up @@ -71,12 +72,55 @@ module.exports = class NrSpanProcessor {
this.reconcileServerAttributes({ segment, span, transaction })
} else if (span.kind === SpanKind.CLIENT && span.attributes[ATTR_DB_SYSTEM]) {
this.reconcileDbAttributes({ segment, span })
} else if (span.kind === SpanKind.CONSUMER) {
this.reconcileConsumerAttributes({ segment, span, transaction })
} else if (span.kind === SpanKind.PRODUCER) {
this.reconcileProducerAttributes({ segment, span })
}
// TODO: add http external checks
}

/**
* Detect messaging consumer attributes in the OTEL span and add them
* to the New Relic transaction. Note: this method ends the current
* transaction.
*
* @param {object} params
* @param {object} params.span The OTEL span entity that possibly contains
* desired attributes.
* @param {Transaction} params.transaction The NR transaction to attach
* the found attributes to.
*/
reconcileConsumerAttributes({ span, transaction }) {
const attrs = span.attributes
const baseSegment = transaction.baseSegment

if (attrs[ATTR_SERVER_ADDRESS]) {
baseSegment.addAttribute('host', attrs[ATTR_SERVER_ADDRESS])
}
if (attrs[ATTR_SERVER_PORT]) {
baseSegment.addAttribute('port', attrs[ATTR_SERVER_PORT])
}

// Remaining attributes should only be added if we are not in high
// security mode.
if (this.agent.config.high_security === true) {
transaction.end()
return
}

const trace = transaction.trace
if (attrs[ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]) {
trace.attributes.addAttribute(DESTINATIONS.TRANS_COMMON, 'message.routingKey', attrs[ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY])
}

if (attrs[ATTR_MESSAGING_DESTINATION_NAME]) {
trace.attributes.addAttribute(DESTINATIONS.TRANS_COMMON, 'message.queueName', attrs[ATTR_MESSAGING_DESTINATION_NAME])
}

transaction.end()
}

reconcileServerAttributes({ segment, span, transaction }) {
if (span.attributes[ATTR_RPC_SYSTEM]) {
this.reconcileRpcAttributes({ segment, span, transaction })
Expand All @@ -86,7 +130,7 @@ module.exports = class NrSpanProcessor {

// End the corresponding transaction for the entry point server span.
// We do then when the span ends to ensure all data has been processed
// for the correspondig server span.
// for the corresponding server span.
transaction.end()
}

Expand Down
2 changes: 1 addition & 1 deletion lib/spans/span-event.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class HttpSpanEvent extends SpanEvent {
* Span event class for datastore operations and queries.
*
* @private
* @class.
* @class
*/
class DatastoreSpanEvent extends SpanEvent {
constructor(attributes, customAttributes) {
Expand Down
15 changes: 14 additions & 1 deletion test/versioned/otel-bridge/span.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const { hrTimeToMilliseconds } = require('@opentelemetry/core')
const helper = require('../../lib/agent_helper')
const { otelSynthesis } = require('../../../lib/symbols')

const { DESTINATIONS } = require('../../../lib/transaction')
const {
ATTR_DB_NAME,
ATTR_DB_STATEMENT,
Expand All @@ -26,6 +27,7 @@ const {
ATTR_HTTP_URL,
ATTR_MESSAGING_DESTINATION,
ATTR_MESSAGING_DESTINATION_KIND,
ATTR_MESSAGING_DESTINATION_NAME,
ATTR_MESSAGING_MESSAGE_CONVERSATION_ID,
ATTR_MESSAGING_OPERATION,
ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY,
Expand Down Expand Up @@ -418,8 +420,11 @@ test('messaging consumer metrics are bridged correctly', (t, end) => {
[ATTR_MESSAGING_SYSTEM]: 'kafka',
[ATTR_MESSAGING_OPERATION]: 'getMessage',
[ATTR_SERVER_ADDRESS]: '127.0.0.1',
[ATTR_SERVER_PORT]: '1234',
[ATTR_MESSAGING_DESTINATION]: 'work-queue',
[ATTR_MESSAGING_DESTINATION_KIND]: 'queue'
[ATTR_MESSAGING_DESTINATION_KIND]: 'queue',
[ATTR_MESSAGING_DESTINATION_NAME]: 'test-queue',
[ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: 'test-key'
}

tracer.startActiveSpan('consumer-test', { kind: otel.SpanKind.CONSUMER, attributes }, (span) => {
Expand All @@ -444,6 +449,14 @@ test('messaging consumer metrics are bridged correctly', (t, end) => {
assert.equal(unscopedMetrics[expectedMetric].callCount, 1, `${expectedMetric}.callCount`)
}

// Verify that required reconciled attributes are present:
let attrs = tx.baseSegment.getAttributes()
assert.equal(attrs.host, '127.0.0.1')
assert.equal(attrs.port, '1234')
attrs = tx.trace.attributes.get(DESTINATIONS.TRANS_COMMON)
assert.equal(attrs['message.queueName'], 'test-queue')
assert.equal(attrs['message.routingKey'], 'test-key')

end()
})
})

0 comments on commit 7068990

Please sign in to comment.