diff --git a/.ci/docker/docker-compose-all.yml b/.ci/docker/docker-compose-all.yml index 7727d2c7c5..57bbdbf500 100644 --- a/.ci/docker/docker-compose-all.yml +++ b/.ci/docker/docker-compose-all.yml @@ -37,6 +37,10 @@ services: extends: file: docker-compose.yml service: localstack + kafka: + extends: + file: docker-compose.yml + service: kafka node_tests: extends: file: docker-compose-node-test.yml @@ -60,6 +64,8 @@ services: condition: service_healthy localstack: condition: service_healthy + kafka: + condition: service_healthy volumes: nodepgdata: @@ -76,3 +82,5 @@ volumes: driver: local nodelocalstackdata: driver: local + nodekafkadata: + driver: local diff --git a/.ci/docker/docker-compose-edge.yml b/.ci/docker/docker-compose-edge.yml index feb27dd063..c53de517b9 100644 --- a/.ci/docker/docker-compose-edge.yml +++ b/.ci/docker/docker-compose-edge.yml @@ -37,6 +37,10 @@ services: extends: file: docker-compose.yml service: redis + kafka: + extends: + file: docker-compose.yml + service: kafka node_tests: extends: file: docker-compose-node-edge-test.yml @@ -60,6 +64,8 @@ services: condition: service_healthy redis: condition: service_healthy + kafka: + condition: service_healthy volumes: nodepgdata: @@ -76,3 +82,5 @@ volumes: driver: local nodecassandradata: driver: local + nodekafkadata: + driver: local diff --git a/.ci/docker/docker-compose-kafka.yml b/.ci/docker/docker-compose-kafka.yml new file mode 100644 index 0000000000..3ae3e159f2 --- /dev/null +++ b/.ci/docker/docker-compose-kafka.yml @@ -0,0 +1,28 @@ +version: '2.1' + +services: + zookeeper: + extends: + file: docker-compose.yml + service: zookeeper + kafka: + extends: + file: docker-compose.yml + service: kafka + depends_on: + - zookeeper + node_tests: + extends: + file: docker-compose-node-test.yml + service: node_tests + depends_on: + - kafka + # TODO: uncomment this if health_check is necessary + # kafka: + # condition: service_healthy + +volumes: + nodekafkadata: + driver: local + nodezookeeperdata: + driver: local diff --git a/.ci/docker/docker-compose-node-edge-test.yml b/.ci/docker/docker-compose-node-edge-test.yml index b2129a58bb..e2b135eb73 100644 --- a/.ci/docker/docker-compose-node-edge-test.yml +++ b/.ci/docker/docker-compose-node-edge-test.yml @@ -24,6 +24,7 @@ services: PGUSER: 'postgres' MEMCACHED_HOST: 'memcached' LOCALSTACK_HOST: 'localstack:4566' + KAFKA_HOST: 'kafka:9093' NODE_VERSION: ${NODE_VERSION} NODE_FULL_VERSION: ${NODE_FULL_VERSION} NVM_NODEJS_ORG_MIRROR: ${NVM_NODEJS_ORG_MIRROR} diff --git a/.ci/docker/docker-compose-node-test.yml b/.ci/docker/docker-compose-node-test.yml index 96423bba4a..ede042ddec 100644 --- a/.ci/docker/docker-compose-node-test.yml +++ b/.ci/docker/docker-compose-node-test.yml @@ -21,6 +21,7 @@ services: PGUSER: 'postgres' MEMCACHED_HOST: 'memcached' LOCALSTACK_HOST: 'localstack:4566' + KAFKA_HOST: 'kafka:9093' NODE_VERSION: ${NODE_VERSION} TAV: ${TAV_MODULE} ELASTIC_APM_CONTEXT_MANAGER: ${ELASTIC_APM_CONTEXT_MANAGER} diff --git a/.ci/docker/docker-compose.yml b/.ci/docker/docker-compose.yml index 07aafc52ac..e14c125b09 100644 --- a/.ci/docker/docker-compose.yml +++ b/.ci/docker/docker-compose.yml @@ -140,6 +140,43 @@ services: volumes: - nodelocalstackdata:/var/lib/localstack + zookeeper: + # https://hub.docker.com/r/bitnami/zookeeper/tags + image: bitnami/zookeeper:3.9.1 + ports: + - "2181:2181" + volumes: + - nodezookeeperdata:/bitnami + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + + kafka: + # https://hub.docker.com/r/bitnami/kafka/tags + image: bitnami/kafka:3.3.2 + ports: + - 
"9093:9093" + volumes: + - nodekafkadata:/var/lib/kafka/data + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 + - ALLOW_PLAINTEXT_LISTENER=yes + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT + - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093 + - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://kafka:9093 + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT + - KAFKA_CFG_DELETE_TOPIC_ENABLE=true + depends_on: + - zookeeper + # TODO: maybe not necessary but figure out how to do this + healthcheck: + # use netcat to check tcp connection available + # test: nc -z localhost 9093 || exit -1 + # start_period: 15s + # interval: 5s + # timeout: 10s + # retries: 5 + volumes: nodepgdata: driver: local @@ -155,3 +192,7 @@ volumes: driver: local nodelocalstackdata: driver: local + nodekafkadata: + driver: local + nodezookeeperdata: + driver: local diff --git a/.ci/scripts/test.sh b/.ci/scripts/test.sh index 0f50b229b4..f60b1833d5 100755 --- a/.ci/scripts/test.sh +++ b/.ci/scripts/test.sh @@ -216,6 +216,9 @@ elif [[ -n "${TAV_MODULE}" ]]; then aws-sdk|@aws-sdk/client-s3|@aws-sdk/client-dynamodb|@aws-sdk/client-sns|@aws-sdk/client-sqs) DOCKER_COMPOSE_FILE=docker-compose-localstack.yml ;; + kafkajs) + DOCKER_COMPOSE_FILE=docker-compose-kafka.yml + ;; *) # Just the "node_tests" container. No additional services needed for testing. DOCKER_COMPOSE_FILE=docker-compose-node-test.yml diff --git a/.ci/tav.json b/.ci/tav.json index afd9ac6cc2..98991eb64b 100644 --- a/.ci/tav.json +++ b/.ci/tav.json @@ -24,6 +24,7 @@ { "name": "generic-pool", "minMajorVersion": 8 }, { "name": "graphql", "minMajorVersion": 8 }, { "name": "ioredis", "minMajorVersion": 8 }, + { "name": "kafkajs", "minMajorVersion": 14 }, { "name": "knex", "minMajorVersion": 8 }, { "name": "memcached", "minMajorVersion": 8 }, { "name": "mongodb", "minMajorVersion": 8 }, diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 599a489047..b19cea6433 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -125,6 +125,31 @@ jobs: volumes: - nodelocalstackdata:/var/lib/localstack + zookeeper: + image: bitnami/zookeeper:3.9.1 + env: + ALLOW_ANONYMOUS_LOGIN: 'yes' + ports: + - "2181:2181" + volumes: + - nodezookeeperdata:/var/lib/zookeeper/data + + kafka: + image: bitnami/kafka:3.3.2 + ports: + - "9093:9093" + volumes: + - nodekafkadata:/var/lib/kafka/data + env: + KAFKA_BROKER_ID: '1' + KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper:2181' + ALLOW_PLAINTEXT_LISTENER: 'yes' + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT' + KAFKA_CFG_LISTENERS: 'CLIENT://:9092,EXTERNAL://:9093' + KAFKA_CFG_ADVERTISED_LISTENERS: 'CLIENT://kafka:9092,EXTERNAL://localhost:9093' + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'CLIENT' + KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' + strategy: fail-fast: false matrix: diff --git a/.tav.yml b/.tav.yml index d28f25df91..0866f22859 100644 --- a/.tav.yml +++ b/.tav.yml @@ -483,3 +483,9 @@ undici: mode: max-7 include: '>=4.7.1 <6' commands: node test/instrumentation/modules/undici/undici.test.js + +kafkajs: + versions: + mode: latest-minors + include: '>=2 <3' + commands: node test/instrumentation/modules/kafkajs/kafkajs.test.js diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 365ba93ba8..64eb32dec4 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -43,6 +43,7 @@ See the <> guide. * Update <> support to `@opentelemetry/api` version 1.8.0. * Add support for `tedious` versions 17 and 18. 
({pull}3901[#3901], {pull}3911[#3911])
+* Add support for `kafkajs` versions >=2.0.0 <3.0.0. ({issues}2905[#2905])

[float]
===== Bug fixes
diff --git a/docs/supported-technologies.asciidoc b/docs/supported-technologies.asciidoc
index 8ceb06c55f..5461eaec9b 100644
--- a/docs/supported-technologies.asciidoc
+++ b/docs/supported-technologies.asciidoc
@@ -142,6 +142,7 @@ so those should be supported as well.
|https://www.npmjs.com/package/tedious[tedious] |>=1.9 <19.0.0 | (Excluding v4.0.0.) Will instrument all queries
|https://www.npmjs.com/package/undici[undici] | >=4.7.1 <6 | Will instrument undici HTTP requests, except HTTP CONNECT. Requires node v14.17.0 or later, or the user to have installed the https://www.npmjs.com/package/diagnostics_channel['diagnostics_channel' polyfill].
|https://www.npmjs.com/package/ws[ws] |>=1.0.0 <8.0.0 |Will instrument outgoing WebSocket messages
+|https://www.npmjs.com/package/kafkajs[kafkajs] |>=2.0.0 <3.0.0 |Will instrument the producer `send` and `sendBatch` methods, and `eachMessage` and `eachBatch` message processing for consumers.
|=======================================================================

[float]
diff --git a/examples/trace-kafka.js b/examples/trace-kafka.js
new file mode 100644
index 0000000000..74d992310d
--- /dev/null
+++ b/examples/trace-kafka.js
@@ -0,0 +1,89 @@
+#!/usr/bin/env node --unhandled-rejections=strict
+
+/*
+ * Copyright Elasticsearch B.V. and other contributors where applicable.
+ * Licensed under the BSD 2-Clause License; you may not use this file except in
+ * compliance with the BSD 2-Clause License.
+ */
+
+// A small example showing Elastic APM tracing the 'kafkajs' package.
+//
+// This assumes a Kafka server running on localhost. You can use:
+//    npm run docker:start kafka
+// to start a Kafka container. Then `npm run docker:stop` to stop it.
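+//
+// The script connects to a broker at localhost:9093, which matches the
+// EXTERNAL listener exposed by test/docker-compose.yml. Point the agent at an
+// APM Server with the usual environment variables, for example (illustrative
+// values):
+//    ELASTIC_APM_SERVER_URL=http://localhost:8200 node examples/trace-kafka.js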
+ +// eslint-disable-next-line no-unused-vars +const apm = require('../').start({ + serviceName: 'example-trace-kafka', +}); + +const { Buffer } = require('buffer'); +const { TextEncoder } = require('util'); + +const { Kafka } = require('kafkajs'); + +const topic = 'trace-kafka-topic'; +const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9093'] }); +const admin = kafka.admin(); + +const headerStr = 'value inside buffer'; +const headerEnc = new TextEncoder().encode(headerStr); +const headerBuf = Buffer.from(headerEnc); + +let producer, consumer; +let messagesConsumed = 0; + +async function run() { + await admin.connect(); + await admin.createTopics({ topics: [{ topic }] }); + + consumer = kafka.consumer({ groupId: 'trace-group' }); + producer = kafka.producer(); + + await producer.connect(); + await producer.send({ + topic, + messages: [ + { value: 'message 1', headers: { foo: 'bar' } }, + { value: 'message 2', headers: { foo: headerBuf } }, + { value: 'message 3' }, + ], + }); + + await consumer.connect(); + await consumer.subscribe({ topic, fromBeginning: true }); + await consumer.run({ + eachMessage: async function ({ topic, partition, message }) { + console.log(`message from topic(${topic}): ${message.value.toString()}`); + console.log(`message header ${message.headers.foo}`); + messagesConsumed++; + }, + }); + + await new Promise((resolve, reject) => { + let count = 0; + const id = setInterval(() => { + count++; + if (messagesConsumed === 3) { + clearInterval(id); + resolve(); + } else if (count > 10) { + // set a limit of 10s/retries + clearInterval(id); + reject(new Error('not receiving all messages after 10s')); + } + }, 1000); + }); +} + +run() + .catch((err) => { + console.warn('run err:', err); + }) + .finally(async () => { + console.log('disconnecting Kafkajs client'); + await producer.disconnect(); + await consumer.disconnect(); + await admin.deleteTopics({ topics: [topic] }); + await admin.disconnect(); + }); diff --git a/lib/instrumentation/index.js b/lib/instrumentation/index.js index aa492a6a25..8af059cff2 100644 --- a/lib/instrumentation/index.js +++ b/lib/instrumentation/index.js @@ -84,6 +84,7 @@ var MODULE_PATCHERS = [ { modPath: 'http2' }, { modPath: 'ioredis' }, { modPath: 'jade' }, + { modPath: 'kafkajs' }, { modPath: 'knex' }, { modPath: 'koa' }, { modPath: 'koa-router' }, diff --git a/lib/instrumentation/modules/kafkajs.js b/lib/instrumentation/modules/kafkajs.js new file mode 100644 index 0000000000..e0849412e4 --- /dev/null +++ b/lib/instrumentation/modules/kafkajs.js @@ -0,0 +1,476 @@ +/* + * Copyright Elasticsearch B.V. and other contributors where applicable. + * Licensed under the BSD 2-Clause License; you may not use this file except in + * compliance with the BSD 2-Clause License. 
+ */ + +'use strict'; + +const { Buffer } = require('buffer'); + +const semver = require('semver'); + +const constants = require('../../constants'); +const shimmer = require('../shimmer'); +const { redactKeysFromObject } = require('../../filters/sanitize-field-names'); + +const NAME = 'Kafka'; +const TYPE = 'messaging'; +const SUBTYPE = 'kafka'; + +/** + * @typedef {{ Kafka: import('kafkajs').Kafka}} KafkaModule + * @typedef {(config: any) => Consumer} ConsumerFactory + * @typedef {import('kafkajs').Consumer} Consumer + * @typedef {import('kafkajs').ConsumerRunConfig} ConsumerRunConfig + * @typedef {(config: any) => Producer} ProducerFactory + * @typedef {import('kafkajs').Producer} Producer + * @typedef {import('kafkajs').ProducerRecord} ProducerRecord + */ + +/** + * @param {KafkaModule} mod + * @param {any} agent + * @param {Object} options + * @param {string} options.version + * @param {boolean} options.enabled + */ +module.exports = function (mod, agent, { version, enabled }) { + if (!enabled || !semver.satisfies(version, '>=2 <3')) { + return mod; + } + + const config = agent._conf; + const ins = agent._instrumentation; + + agent.logger.debug('shimming Kafka.prototype.consumer'); + shimmer.wrap(mod.Kafka.prototype, 'consumer', wrapConsumer); + agent.logger.debug('shimming Kafka.prototype.producer'); + shimmer.wrap(mod.Kafka.prototype, 'producer', wrapProducer); + return mod; + + /** + * Returns the patched version of `Kafka.consumer` which creates a new + * consumer with its `run` method patched to instrument message handling + * + * @param {ConsumerFactory} origConsumer + * @returns {ConsumerFactory} + */ + function wrapConsumer(origConsumer) { + return function wrappedConsumer() { + const consumer = origConsumer.apply(this, arguments); + + shimmer.wrap(consumer, 'run', wrapConsumerRun); + return consumer; + }; + } + + /** + * Return the patched version of `run` which instruments the + * `eachMessage` & `eachBatch` callbacks. 
+ * + * @param {Consumer['run']} origRun + * @returns {Consumer['run']} + */ + function wrapConsumerRun(origRun) { + return function wrappedConsumerRun() { + const runConfig = arguments[0]; + + if (typeof runConfig.eachMessage === 'function') { + shimmer.wrap(runConfig, 'eachMessage', wrapEachMessage); + } + + if (typeof runConfig.eachBatch === 'function') { + shimmer.wrap(runConfig, 'eachBatch', wrapEachBatch); + } + + return origRun.apply(this, arguments); + }; + } + + /** + * Returns the instrumented version of `eachMessage` which + * - creates a transaction each time is called + * - add trace context into the transaction if present in message headers + * + * @param {ConsumerRunConfig['eachMessage']} origEachMessage + * @returns {ConsumerRunConfig['eachMessage']} + */ + function wrapEachMessage(origEachMessage) { + return async function (payload) { + const { topic, message } = payload; + + if (shouldIgnoreTopic(topic, config)) { + return origEachMessage.apply(this, arguments); + } + + // For distributed tracing this instrumentation is going to check + // the headers defined by opentelemetry and ignore the propietary + // `elasticaapmtraceparent` header + // https://github.com/elastic/apm/blob/main/specs/agents/tracing-distributed-tracing.md#binary-fields + const traceparent = message.headers && message.headers.traceparent; + const tracestate = message.headers && message.headers.tracestate; + const opts = {}; + + // According to `kafkajs` types a header value might be + // a string or Buffer + // https://github.com/tulios/kafkajs/blob/ff3b1117f316d527ae170b550bc0f772614338e9/types/index.d.ts#L148 + if (typeof traceparent === 'string') { + opts.childOf = traceparent; + } else if (traceparent instanceof Buffer) { + opts.childOf = traceparent.toString('utf-8'); + } + + if (typeof tracestate === 'string') { + opts.tracestate = tracestate; + } else if (tracestate instanceof Buffer) { + opts.tracestate = tracestate.toString('utf-8'); + } + + const trans = ins.startTransaction( + `${NAME} RECEIVE from ${topic}`, + TYPE, + opts, + ); + + const messageCtx = { queue: { name: topic } }; + if ( + config.captureBody === 'all' || + config.captureBody === 'transactions' + ) { + messageCtx.body = message.value.toString(); + } + + if (message.headers && config.captureHeaders) { + // Make sure there is no sensitive data + // and transform non-redacted buffers + messageCtx.headers = redactKeysFromObject( + message.headers, + config.sanitizeFieldNamesRegExp, + ); + Object.keys(messageCtx.headers).forEach((key) => { + const value = messageCtx.headers[key]; + if (value instanceof Buffer) { + messageCtx.headers[key] = value.toString('utf-8'); + } + }); + } + + if (message.timestamp) { + messageCtx.age = { + ms: Date.now() - Number(message.timestamp), + }; + } + + trans.setMessageContext(messageCtx); + + let result, err; + try { + result = await origEachMessage.apply(this, arguments); + } catch (ex) { + // Save the error for use in `finally` below, but re-throw it to + // not impact code flow. + err = ex; + throw ex; + } finally { + trans.setOutcome( + err ? 
constants.OUTCOME_FAILURE : constants.OUTCOME_SUCCESS, + ); + trans.end(); + } + + return result; + }; + } + + /** + * Returns the instrumented version of `eachBatch` which + * - creates a transaction each time is called + * - if trace context present in messages inks them to the transaction + * + * @param {ConsumerRunConfig['eachBatch']} origEachBatch + * @returns {ConsumerRunConfig['eachBatch']} + */ + function wrapEachBatch(origEachBatch) { + return async function ({ batch }) { + if (shouldIgnoreTopic(batch.topic, config)) { + return origEachBatch.apply(this, arguments); + } + + const trans = ins.startTransaction( + `${NAME} RECEIVE from ${batch.topic}`, + TYPE, + ); + const messageCtx = { queue: { name: batch.topic } }; + trans.setMessageContext(messageCtx); + + const serviceContext = { + framework: { name: 'Kafka' }, + }; + trans.setServiceContext(serviceContext); + + // Extract span links from up to 1000 messages in this batch. + // https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-messaging.md#receiving-trace-context + // A span link is created from a `traceparent` header in a message. + const messages = batch && batch.messages; + + if (messages) { + const traceparentsSeen = new Set(); + const links = []; + const limit = Math.min( + messages.length, + constants.MAX_MESSAGES_PROCESSED_FOR_TRACE_CONTEXT, + ); + + for (let i = 0; i < messages.length; i++) { + const msg = messages[i]; + const traceparent = + msg.headers && + msg.headers.traceparent && + msg.headers.traceparent.toString(); + + if (traceparent && !traceparentsSeen.has(traceparent)) { + links.push({ context: traceparent }); + traceparentsSeen.add(traceparent); + + if (links.length >= limit) { + break; + } + } + } + trans._addLinks(links); + } + + let result, err; + try { + result = await origEachBatch.apply(this, arguments); + } catch (ex) { + // Save the error for use in `finally` below, but re-throw it to + // not impact code flow. + err = ex; + throw ex; + } finally { + trans.setOutcome( + err ? constants.OUTCOME_FAILURE : constants.OUTCOME_SUCCESS, + ); + trans.end(); + } + + return result; + }; + } + + /** + * Returns the patched version of `Kafka.producer` which creates a new + * producer with `send` & `sendBatch` methods patched to instrument message sending + * + * @param {ProducerFactory} origProducer + * @returns {ProducerFactory} + */ + function wrapProducer(origProducer) { + return function wrappedProducer() { + const producer = origProducer.apply(this, arguments); + + shimmer.wrap(producer, 'send', wrapProducerSend); + shimmer.wrap(producer, 'sendBatch', wrapProducerSendBatch); + return producer; + }; + } + + /** + * Returns the instrumented version of `send` which + * - creates an exit span each time is called + * - propagates trace context through message headers + * + * @param {Producer['send']} origSend + * @returns {Producer['send']} + */ + function wrapProducerSend(origSend) { + return async function (record) { + const { topic } = record; + let span; + + if (!shouldIgnoreTopic(topic, config)) { + span = ins.createSpan( + `${NAME} SEND to ${topic}`, + TYPE, + SUBTYPE, + 'send', + { exitSpan: true }, + ); + } + + // W3C trace-context propagation. 
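+      // After propagation, each outgoing message carries the W3C headers in
+      // addition to its own, e.g. (illustrative values):
+      //   headers: { traceparent: '00-<trace_id>-<parent_id>-01', tracestate: 'es=s:1', foo: 'bar' }
+      // Header names starting with 'elastic-' (the agent's legacy propagation
+      // header) are deliberately filtered out below.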
+ const runContext = ins.currRunContext(); + const parentSpan = + span || runContext.currSpan() || runContext.currTransaction(); + + if (parentSpan) { + record.messages.forEach((msg) => { + const newHeaders = Object.assign({}, msg.headers); + parentSpan.propagateTraceContextHeaders( + newHeaders, + function (carrier, name, value) { + if (name.startsWith('elastic-')) { + return; + } + carrier[name] = value; + }, + ); + msg.headers = newHeaders; + }); + } + + if (!span) { + return origSend.apply(this, arguments); + } + + // We do not add headers or body because: + // - `record.messages` is a list + // - spec says is for transactions (https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-messaging.md#transaction-context-fields) + span.setMessageContext({ queue: { name: topic } }); + + const service = { + resource: `${SUBTYPE}/${topic}`, + type: SUBTYPE, + name: topic, + }; + + span._setDestinationContext({ service }); + + let result, err; + try { + result = await origSend.apply(this, arguments); + } catch (ex) { + // Save the error for use in `finally` below, but re-throw it to + // not impact code flow. + err = ex; + throw ex; + } finally { + span.setOutcome( + err ? constants.OUTCOME_FAILURE : constants.OUTCOME_SUCCESS, + ); + span.end(); + } + + return result; + }; + } + + /** + * Returns the patched version of `Producer.sendBatch` which + * - creates an exit span for the operation + * - propagates trace context via message headers + * + * @param {Producer['sendBatch']} origSendBatch + * @returns {Producer['sendBatch']} + */ + function wrapProducerSendBatch(origSendBatch) { + return async function (batch) { + let span; + let topicForContext; + let shouldIgnoreBatch = true; + const messages = batch.topicMessages || []; + const topics = new Set(); + + // Remove possible topic duplications + for (const msg of messages) { + topics.add(msg.topic); + } + + for (const t of topics) { + const topicIgnored = shouldIgnoreTopic(t, config); + + shouldIgnoreBatch = shouldIgnoreBatch && topicIgnored; + + // When a topic is not ignored we keep a copy for context unless + // we find a 2nd topic also not ignored. + if (!topicIgnored) { + if (topicForContext) { + topicForContext = undefined; + break; + } + topicForContext = t; + } + } + + if (!shouldIgnoreBatch) { + const suffix = topicForContext ? ` to ${topicForContext}` : ''; + span = ins.createSpan(`${NAME} SEND${suffix}`, TYPE, SUBTYPE, 'send', { + exitSpan: true, + }); + } + + // W3C trace-context propagation. 
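+      // Note that trace context is still propagated when no span was created
+      // above (for example when every topic in the batch is ignored); in that
+      // case the headers are taken from the currently active span or
+      // transaction, if any.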
+ const runContext = ins.currRunContext(); + const parentSpan = + span || runContext.currSpan() || runContext.currTransaction(); + + if (parentSpan && batch.topicMessages) { + batch.topicMessages.forEach((topicMessage) => { + topicMessage.messages.forEach((msg) => { + const newHeaders = Object.assign({}, msg.headers); + parentSpan.propagateTraceContextHeaders( + newHeaders, + function (carrier, name, value) { + if (name.startsWith('elastic-')) { + return; + } + carrier[name] = value; + }, + ); + msg.headers = newHeaders; + }); + }); + } + + if (!span) { + return origSendBatch.apply(this, arguments); + } + + if (topicForContext) { + // We do not add headers or body because: + // - `record.messages` is a list + // - spec says is for transactions (https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-messaging.md#transaction-context-fields) + span.setMessageContext({ queue: { name: topicForContext } }); + } + span.setServiceTarget(SUBTYPE, topicForContext); + + let result, err; + try { + result = await origSendBatch.apply(this, arguments); + } catch (ex) { + // Save the error for use in `finally` below, but re-throw it to + // not impact code flow. + err = ex; + throw ex; + } finally { + span.setOutcome( + err ? constants.OUTCOME_FAILURE : constants.OUTCOME_SUCCESS, + ); + span.end(); + } + + return result; + }; + } +}; + +/** + * Returns true if we have to ignore messages on the given topic + * + * @param {string} topic the topic where client is publishing/subscribing + * @param {{ ignoreMessageQueuesRegExp: RegExp[] }} config the agent's configuration object + * @returns {boolean} + */ +function shouldIgnoreTopic(topic, config) { + if (config.ignoreMessageQueuesRegExp) { + for (const rule of config.ignoreMessageQueuesRegExp) { + if (rule.test(topic)) { + return true; + } + } + } + + return false; +} diff --git a/package-lock.json b/package-lock.json index 8ef37ed10e..3b6249f7f2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -98,6 +98,7 @@ "ioredis": "^5.1.0", "js-yaml": "^4.1.0", "json": "^11.0.0", + "kafkajs": "^2.2.4", "knex": "^3.0.1", "koa": "^2.11.0", "koa-bodyparser": "^4.3.0", @@ -12778,6 +12779,15 @@ "safe-buffer": "^5.0.1" } }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "dev": true, + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/keygrip": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/keygrip/-/keygrip-1.1.0.tgz", @@ -28153,6 +28163,12 @@ "safe-buffer": "^5.0.1" } }, + "kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "dev": true + }, "keygrip": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/keygrip/-/keygrip-1.1.0.tgz", diff --git a/package.json b/package.json index 5e58f85d0d..b59a83a39e 100644 --- a/package.json +++ b/package.json @@ -176,6 +176,7 @@ "ioredis": "^5.1.0", "js-yaml": "^4.1.0", "json": "^11.0.0", + "kafkajs": "^2.2.4", "knex": "^3.0.1", "koa": "^2.11.0", "koa-bodyparser": "^4.3.0", diff --git a/test/docker-compose.ci.yml b/test/docker-compose.ci.yml index 009f90bcc1..d68f46cf9f 100644 --- a/test/docker-compose.ci.yml +++ b/test/docker-compose.ci.yml @@ -12,6 +12,7 @@ services: CASSANDRA_HOST: 'cassandra' MEMCACHED_HOST: 
'memcached' LOCALSTACK_HOST: 'localstack:4566' + KAFKA_HOST: 'kafka:9093' PGHOST: 'postgres' PGUSER: 'postgres' depends_on: @@ -33,3 +34,5 @@ services: condition: service_started localstack: condition: service_started + kafka: + condition: service_started diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 5609f246a4..8252071fc1 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -132,6 +132,43 @@ services: volumes: - nodelocalstackdata:/var/lib/localstack + zookeeper: + # https://hub.docker.com/r/bitnami/zookeeper/tags + image: bitnami/zookeeper:3.9.1 + ports: + - "2181:2181" + volumes: + - nodezookeeperdata:/var/lib/zookeeper/data + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + + kafka: + # https://hub.docker.com/r/bitnami/kafka/tags + # Config ref: https://github.com/bitnami/containers/tree/main/bitnami/kafka#how-to-use-this-image + image: bitnami/kafka:3.3.2 + ports: + - "9093:9093" + volumes: + - nodekafkadata:/var/lib/kafka/data + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 + - ALLOW_PLAINTEXT_LISTENER=yes + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT + - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093 + - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093 + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT + - KAFKA_CFG_DELETE_TOPIC_ENABLE=true + depends_on: + - zookeeper + healthcheck: + # use netcat to check tcp connection available + test: nc -z localhost 9093 || exit -1 + start_period: 15s + interval: 5s + timeout: 10s + retries: 5 + volumes: nodepgdata: driver: local @@ -147,3 +184,7 @@ volumes: driver: local nodelocalstackdata: driver: local + nodekafkadata: + driver: local + nodezookeeperdata: + driver: local diff --git a/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-ctx-propagation.js b/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-ctx-propagation.js new file mode 100644 index 0000000000..dfe18730e3 --- /dev/null +++ b/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-ctx-propagation.js @@ -0,0 +1,173 @@ +/* + * Copyright Elasticsearch B.V. and other contributors where applicable. + * Licensed under the BSD 2-Clause License; you may not use this file except in + * compliance with the BSD 2-Clause License. + */ + +'use strict'; + +const TEST_TOPIC_PREFIX = 'elasticapmtest-topic-'; + +const apm = require('../../../../..').start({ + serviceName: 'use-kafkajs', + captureExceptions: false, + centralConfig: false, + metricsInterval: 0, + cloudProvider: 'none', + stackTraceLimit: 4, // get it smaller for reviewing output + logLevel: 'info', + ignoreMessageQueues: + process.env.TEST_MODE === 'send' ? 
[`${TEST_TOPIC_PREFIX}*`] : [], +}); + +const { Kafka } = require('kafkajs'); +/** @type {import('kafkajs').Admin} */ +let admin; +/** @type {import('kafkajs').Consumer} */ +let consumer; +/** @type {import('kafkajs').Producer} */ +let producer; + +/** + * @param {import('kafkajs').Kafka} kafkaClient + * @param {{topic: string; groupId: string, mode: string}} options + */ +async function useKafkajsClient(kafkaClient, options) { + const { topic, groupId, mode } = options; + const log = apm.logger.child({ + 'event.module': 'kafkajs', + topic, + }); + + admin = kafkaClient.admin(); + await admin.connect(); + await admin.createTopics({ + waitForLeaders: true, + topics: [{ topic }], + }); + log.info('createTopics'); + + if (mode === 'send') { + // On this mode we send some messages which are ingonerd as per agent config + // this must be executed 1st + producer = kafkaClient.producer(); + await producer.connect(); + log.info('producer connected'); + let data; + const tx = apm.startTransaction(`manual send to ${topic}`); + data = await producer.send({ + topic: topic, + messages: [{ value: 'message 1' }], + }); + log.info({ data }, 'messages sent'); + data = await producer.sendBatch({ + topicMessages: [ + { + topic: topic, + messages: [{ value: 'batch message 1' }], + }, + ], + }); + log.info({ data }, 'batch sent'); + tx.end(); + + await producer.disconnect(); + log.info('messages sent'); + } else if (mode === 'consume') { + // On this mode we consume the already sent messsages. This time they are + // instrumented (not ignored) and trace context should be added to transactions + // this must be executed 2nd + consumer = kafkaClient.consumer({ groupId }); + await consumer.connect(); + await consumer.subscribe({ + topics: [topic], + fromBeginning: true, + }); + log.info('consumer connected'); + + let messagesConsumed = 0; + await consumer.run({ + eachMessage: async function ({ message }) { + log.info(`message received: ${message.value.toString()}`); + messagesConsumed++; + }, + }); + await waitUntil(() => messagesConsumed >= 2, 10000); + log.info('messages consumed'); + await consumer.disconnect(); + log.info('consumer disconnect'); + await admin.deleteTopics({ topics: [topic] }); + log.info('topics deleted'); + } + await admin.disconnect(); + log.info('admin disconnect'); +} + +// ---- helper functions + +/** + * Retuns a promise which is resolved when the predicate becomes true or rejected + * if the timeout is reached. + * @param {() => boolean} predicate function which will return true to make ed of wait + * @param {number} [timeout] max time in ms to wait for the predicste to be true (defaults to 5000) + * @returns {Promise} + */ +function waitUntil(predicate, timeout = 5000) { + const startTime = Date.now(); + + return new Promise((resolve, reject) => { + const intervalId = setInterval(() => { + const deltaTime = Date.now() - startTime; + + if (predicate()) { + clearInterval(intervalId); + resolve(); + } else if (deltaTime > timeout) { + clearInterval(intervalId); + reject(new Error(`timeout after ${deltaTime}ms`)); + } + }, 1000); + }); +} + +// ---- mainline + +function main() { + // Config vars. 
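+  // Typical local invocations (illustrative values; the test harness normally
+  // sets these itself):
+  //    TEST_MODE=send node use-kafkajs-ctx-propagation.js
+  //    TEST_MODE=consume node use-kafkajs-ctx-propagation.js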
+ const mode = process.env.TEST_MODE; + const clientId = process.env.TEST_CLIENT_ID || 'elastictest-kafka-client'; + const groupId = process.env.TEST_GROUP_ID || 'elastictest-kafka-group'; + const broker = process.env.TEST_KAFKA_HOST || 'localhost:9093'; + const topic = + process.env.TEST_TOPIC || + TEST_TOPIC_PREFIX + Math.floor(Math.random() * 1000); + + // Guard against any topic name being used because we will be sending and + // receiveing in it, and potentially *deleting* the topic. + if (!topic.startsWith(TEST_TOPIC_PREFIX)) { + throw new Error( + `cannot use topic name "${topic}", it must start with ${TEST_TOPIC_PREFIX}`, + ); + } + + if (!mode) { + throw new Error( + `cannot use ${__filename} wihtout a "TEST_MODE" set to "send|consume" in the env.`, + ); + } + + const kafkaClient = new Kafka({ clientId, brokers: [broker] }); + + useKafkajsClient(kafkaClient, { topic, groupId, mode }).then( + function () { + apm.logger.info(`useKafkajsClient in "${mode}" mode resolved`); + process.exitCode = 0; + }, + function (err) { + apm.logger.error(err, `useKafkajsClient in "${mode}" mode rejected`); + process.exitCode = 1; + }, + ); +} + +main(); diff --git a/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-each-batch.js b/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-each-batch.js new file mode 100644 index 0000000000..28232afcf2 --- /dev/null +++ b/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-each-batch.js @@ -0,0 +1,177 @@ +/* + * Copyright Elasticsearch B.V. and other contributors where applicable. + * Licensed under the BSD 2-Clause License; you may not use this file except in + * compliance with the BSD 2-Clause License. + */ + +'use strict'; + +const MUST_IGNORE_TOPIC = process.env.TEST_IGNORE_TOPIC === 'true'; + +const apm = require('../../../../..').start({ + serviceName: 'use-kafkajs', + captureExceptions: false, + centralConfig: false, + metricsInterval: 0, + cloudProvider: 'none', + stackTraceLimit: 4, // get it smaller for reviewing output + logLevel: 'info', + ignoreMessageQueues: MUST_IGNORE_TOPIC ? 
['*-ignore'] : [], +}); + +const { Buffer } = require('buffer'); + +const { Kafka } = require('kafkajs'); +/** @type {import('kafkajs').Admin} */ +let admin; +/** @type {import('kafkajs').Consumer} */ +let consumer; +/** @type {import('kafkajs').Producer} */ +let producer; + +const TEST_TOPIC_PREFIX = 'elasticapmtest-topic-'; + +/** + * @param {import('kafkajs').Kafka} kafkaClient + * @param {{topic: string; groupId: string}} options + */ +async function useKafkajsClient(kafkaClient, options) { + const { topic, groupId } = options; + const topicToIgnore = `${topic}-ignore`; + const log = apm.logger.child({ + 'event.module': 'kafkajs', + topic, + }); + + admin = kafkaClient.admin(); + consumer = kafkaClient.consumer({ groupId }); + producer = kafkaClient.producer(); + + // Create the topics & subscribe + await admin.connect(); + await admin.createTopics({ + waitForLeaders: true, + topics: [{ topic }, { topic: topicToIgnore }], + }); + log.info('createTopics'); + + await producer.connect(); + await consumer.connect(); + await consumer.subscribe({ + topics: [topic, topicToIgnore], + fromBeginning: true, + }); + log.info('all connected'); + + let batchMessagesConsumed = 0; + await consumer.run({ + eachBatch: async function ({ batch }) { + log.info(`batch received for topic: ${batch.topic}`); + batch.messages.forEach((message) => { + log.info(`batch message received: ${message.value.toString()}`); + batchMessagesConsumed++; + }); + }, + }); + + // 1st test trasnsactions for each message received + // Ensure an APM transaction so spans can happen. + let data; + const batchTx = apm.startTransaction(`manual send to ${topic}`); + data = await producer.sendBatch({ + topicMessages: [ + { + topic, + messages: [ + { value: 'batch message 1', headers: { foo: 'string' } }, + { value: 'batch message 2', headers: { foo: Buffer.from('buffer') } }, + { value: 'batch message 3' }, + ], + }, + { + topic: topicToIgnore, + messages: [ + { value: 'ignore message 1' }, + { value: 'ignore message 2' }, + { value: 'ignore message 3' }, + ], + }, + ], + }); + batchTx.end(); + log.info({ data }, 'batch sent'); + + await waitUntil(() => batchMessagesConsumed >= 6, 10000); + log.info('messages consumed'); + + await consumer.disconnect(); + log.info('consumer disconnect'); + await producer.disconnect(); + log.info('producer disconnect'); + await admin.deleteTopics({ topics: [topic, topicToIgnore] }); + log.info('topics deleted'); + await admin.disconnect(); + log.info('admin disconnect'); +} + +// ---- helper functions + +/** + * Retuns a promise which is resolved when the predicate becomes true or rejected + * if the timeout is reached. + * @param {() => boolean} predicate function which will return true to make ed of wait + * @param {number} [timeout] max time in ms to wait for the predicste to be true (defaults to 5000) + * @returns {Promise} + */ +function waitUntil(predicate, timeout = 5000) { + const startTime = Date.now(); + + return new Promise((resolve, reject) => { + const intervalId = setInterval(() => { + const deltaTime = Date.now() - startTime; + + if (predicate()) { + clearInterval(intervalId); + resolve(); + } else if (deltaTime > timeout) { + clearInterval(intervalId); + reject(new Error(`timeout after ${deltaTime}ms`)); + } + }, 1000); + }); +} + +// ---- mainline + +function main() { + // Config vars. 
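+  // A typical local invocation (illustrative values):
+  //    TEST_IGNORE_TOPIC=true TEST_KAFKA_HOST=localhost:9093 node use-kafkajs-each-batch.js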
+ const clientId = process.env.TEST_CLIENT_ID || 'elastictest-kafka-client'; + const groupId = process.env.TEST_GROUP_ID || 'elastictest-kafka-group'; + const broker = process.env.TEST_KAFKA_HOST || 'localhost:9093'; + const topic = + process.env.TEST_TOPIC || + TEST_TOPIC_PREFIX + Math.floor(Math.random() * 1000); + + // Guard against any topic name being used because we will be sending and + // receiveing in it, and potentially *deleting* the topic. + if (!topic.startsWith(TEST_TOPIC_PREFIX)) { + throw new Error( + `cannot use topic name "${topic}", it must start with ${TEST_TOPIC_PREFIX}`, + ); + } + + const kafkaClient = new Kafka({ clientId, brokers: [broker] }); + + useKafkajsClient(kafkaClient, { topic, groupId }).then( + function () { + apm.logger.info('useKafkajsClient resolved'); + process.exitCode = 0; + }, + function (err) { + apm.logger.error(err, 'useKafkajsClient rejected'); + process.exitCode = 1; + }, + ); +} + +main(); diff --git a/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-each-message.js b/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-each-message.js new file mode 100644 index 0000000000..1511caae66 --- /dev/null +++ b/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-each-message.js @@ -0,0 +1,169 @@ +/* + * Copyright Elasticsearch B.V. and other contributors where applicable. + * Licensed under the BSD 2-Clause License; you may not use this file except in + * compliance with the BSD 2-Clause License. + */ + +'use strict'; + +const apm = require('../../../../..').start({ + serviceName: 'use-kafkajs', + captureExceptions: false, + centralConfig: false, + metricsInterval: 0, + cloudProvider: 'none', + stackTraceLimit: 4, // get it smaller for reviewing output + logLevel: 'info', + ignoreMessageQueues: ['*-ignore'], +}); + +const { Buffer } = require('buffer'); + +const { Kafka } = require('kafkajs'); +/** @type {import('kafkajs').Admin} */ +let admin; +/** @type {import('kafkajs').Consumer} */ +let consumer; +/** @type {import('kafkajs').Producer} */ +let producer; + +const TEST_TOPIC_PREFIX = 'elasticapmtest-topic-'; + +/** + * @param {import('kafkajs').Kafka} kafkaClient + * @param {{topic: string; groupId: string}} options + */ +async function useKafkajsClient(kafkaClient, options) { + const { topic, groupId } = options; + const topicToIgnore = `${topic}-ignore`; + const log = apm.logger.child({ + 'event.module': 'kafkajs', + topic, + }); + + admin = kafkaClient.admin(); + consumer = kafkaClient.consumer({ groupId }); + producer = kafkaClient.producer(); + + // Create the topics & subscribe + await admin.connect(); + await admin.createTopics({ + waitForLeaders: true, + topics: [{ topic, topicToIgnore }], + }); + log.info('topic created'); + + await producer.connect(); + await consumer.connect(); + await consumer.subscribe({ + topics: [topic, topicToIgnore], + fromBeginning: true, + }); + log.info('all connected'); + + let eachMessagesConsumed = 0; + await consumer.run({ + eachMessage: async function ({ message }) { + log.info(`message received: ${message.value.toString()}`); + eachMessagesConsumed++; + }, + }); + + // 1st test trasnsactions for each message received + // Ensure an APM transaction so spans can happen. 
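+  // (Without a current transaction the agent would not record the exit spans
+  // created for the instrumented `send` calls below.)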
+ let data; + const eachTx = apm.startTransaction(`manual send to ${topic}`); + data = await producer.send({ + topic, + messages: [ + { value: 'each message 1', headers: { foo: 'string' } }, + { value: 'each message 2', headers: { foo: Buffer.from('buffer') } }, + { value: 'each message 3', headers: { auth: 'this_is_a_secret' } }, + ], + }); + log.info({ data }, 'messages sent'); + data = await producer.send({ + topic: topicToIgnore, + messages: [ + { value: 'ignore message 1' }, + { value: 'ignore message 2' }, + { value: 'ignore message 3' }, + ], + }); + log.info({ data }, 'messages to ignore sent'); + eachTx.end(); + + await waitUntil(() => eachMessagesConsumed >= 6, 10000); + log.info('messages consumed'); + + await consumer.disconnect(); + log.info('consumer disconnect'); + await producer.disconnect(); + log.info('producer disconnect'); + await admin.deleteTopics({ topics: [topic, topicToIgnore] }); + log.info('topics deleted'); + await admin.disconnect(); + log.info('admin disconnect'); +} + +// ---- helper functions + +/** + * Retuns a promise which is resolved when the predicate becomes true or rejected + * if the timeout is reached. + * @param {() => boolean} predicate function which will return true to make ed of wait + * @param {number} [timeout] max time in ms to wait for the predicste to be true (defaults to 5000) + * @returns {Promise} + */ +function waitUntil(predicate, timeout = 5000) { + const startTime = Date.now(); + + return new Promise((resolve, reject) => { + const intervalId = setInterval(() => { + const deltaTime = Date.now() - startTime; + + if (predicate()) { + clearInterval(intervalId); + resolve(); + } else if (deltaTime > timeout) { + clearInterval(intervalId); + reject(new Error(`timeout after ${deltaTime}ms`)); + } + }, 1000); + }); +} + +// ---- mainline + +async function main() { + // Config vars. + const clientId = process.env.TEST_CLIENT_ID || 'elastictest-kafka-client'; + const groupId = process.env.TEST_GROUP_ID || 'elastictest-kafka-group'; + const broker = process.env.TEST_KAFKA_HOST || 'localhost:9093'; + const topic = + process.env.TEST_TOPIC || + TEST_TOPIC_PREFIX + Math.floor(Math.random() * 1000); + + // Guard against any topic name being used because we will be sending and + // receiveing in it, and potentially *deleting* the topic. + if (!topic.startsWith(TEST_TOPIC_PREFIX)) { + throw new Error( + `cannot use topic name "${topic}", it must start with ${TEST_TOPIC_PREFIX}`, + ); + } + + const kafkaClient = new Kafka({ clientId, brokers: [broker] }); + + useKafkajsClient(kafkaClient, { topic, groupId }).then( + function () { + apm.logger.info('useKafkajsClient resolved'); + process.exitCode = 0; + }, + function (err) { + apm.logger.error(err, 'useKafkajsClient rejected'); + process.exitCode = 1; + }, + ); +} + +main(); diff --git a/test/instrumentation/modules/kafkajs/kafkajs.test.js b/test/instrumentation/modules/kafkajs/kafkajs.test.js new file mode 100644 index 0000000000..206496efcc --- /dev/null +++ b/test/instrumentation/modules/kafkajs/kafkajs.test.js @@ -0,0 +1,769 @@ +/* + * Copyright Elasticsearch B.V. and other contributors where applicable. + * Licensed under the BSD 2-Clause License; you may not use this file except in + * compliance with the BSD 2-Clause License. + */ + +'use strict'; + +// Test instrumentation of the 'kafkajs' module. 
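+//
+// These tests assume a reachable Kafka broker. Locally that might look like
+// (paths relative to the repo root, values illustrative):
+//    docker compose -f test/docker-compose.yml up -d kafka
+//    KAFKA_HOST=localhost:9093 node test/instrumentation/modules/kafkajs/kafkajs.test.js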
+if (process.env.GITHUB_ACTIONS === 'true' && process.platform === 'win32') { + console.log('# SKIP: GH Actions do not support docker services on Windows'); + process.exit(0); +} + +const test = require('tape'); + +const { validateSpan } = require('../../../_validate_schema'); +const { runTestFixtures, sortApmEvents } = require('../../../_utils'); + +const kafkaHost = process.env.KAFKA_HOST || 'localhost:9093'; + +const rand = Math.floor(Math.random() * 1000); +const kafkaTopic = `elasticapmtest-topic-each-${rand}`; + +// this map will be used to stash data to be used among different tests +const store = new Map(); + +/** @type {import('../../../_utils').TestFixture[]} */ +const testFixtures = [ + { + name: 'simple Kafkajs usage scenario for single message processing', + script: 'fixtures/use-kafkajs-each-message.js', + cwd: __dirname, + timeout: 20000, + env: { + TEST_CLIENT_ID: 'elastic-kafka-client', + TEST_GROUP_ID: `elastictest-kafka-group-${rand}`, + TEST_TOPIC: kafkaTopic, + TEST_KAFKA_HOST: kafkaHost, + // Suppres warinings about new default partitioner + // https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner + KAFKAJS_NO_PARTITIONER_WARNING: '1', + }, + checkApmServer(t, apmServer) { + t.ok(apmServer.events[0].metadata, 'metadata'); + const events = sortApmEvents(apmServer.events); + const tx = events.shift().transaction; + + // First the transaction. + t.ok(tx, 'got the send transaction'); + + // Compare some common fields across all spans. + // ignore http/external spans + const spans = events.filter((e) => e.span).map((e) => e.span); + const parentId = spans[0].id; + spans.forEach((s) => { + const errs = validateSpan(s); + t.equal(errs, null, 'span is valid (per apm-server intake schema)'); + }); + t.equal( + spans.filter((s) => s.trace_id === tx.trace_id).length, + spans.length, + 'all spans have the same trace_id', + ); + t.equal( + spans.filter((s) => s.transaction_id === tx.id).length, + spans.length, + 'all spans have the same transaction_id', + ); + t.equal( + spans.filter((s) => s.sync === false).length, + spans.length, + 'all spans have sync=false', + ); + t.equal( + spans.filter((s) => s.sample_rate === 1).length, + spans.length, + 'all spans have sample_rate=1', + ); + + spans.forEach((s) => { + // Remove variable and common fields to facilitate t.deepEqual below. 
+ delete s.id; + delete s.transaction_id; + delete s.parent_id; + delete s.trace_id; + delete s.timestamp; + delete s.duration; + delete s.sync; + delete s.sample_rate; + }); + + t.deepEqual(spans.shift(), { + name: `Kafka SEND to ${kafkaTopic}`, + type: 'messaging', + subtype: 'kafka', + action: 'send', + context: { + service: { + target: { type: 'kafka', name: kafkaTopic }, + }, + destination: { + service: { + resource: `kafka/${kafkaTopic}`, + type: '', + name: '', + }, + }, + message: { queue: { name: kafkaTopic } }, + }, + outcome: 'success', + }); + + t.equal(spans.length, 0, 'all spans accounted for'); + + // Now check the transactions created for each message received + const transactions = events + .filter((e) => e.transaction) + .map((e) => e.transaction); + + t.equal( + transactions.filter((t) => t.trace_id === tx.trace_id).length, + transactions.length, + 'all transactions have the same trace_id', + ); + t.equal( + transactions.filter((t) => t.parent_id === parentId).length, + transactions.length, + 'all transactions have the same parent_id', + ); + t.equal( + transactions + .map((t) => t.context.message.age.ms) + .filter((ms) => typeof ms === 'number' && ms > 0).length, + transactions.length, + 'all transactions have positive age', + ); + + // NOTE: messages could arrive in different order so we sort them + // to properly do the assertions + transactions.sort((t1, t2) => { + const header1 = t1.context.message.headers.foo || 'undefined'; + const header2 = t2.context.message.headers.foo || 'undefined'; + return header1 < header2 ? -1 : 1; + }); + transactions.forEach((t) => { + // Remove variable and common fields to facilitate t.deepEqual below. + delete t.id; + delete t.parent_id; + delete t.trace_id; + delete t.timestamp; + delete t.duration; + delete t.sample_rate; + delete t.sampled; + delete t.span_count; + delete t.result; + delete t.context.user; + delete t.context.tags; + delete t.context.custom; + delete t.context.cloud; + delete t.context.message.age; + }); + + // Check message handling transactions. 
+ // Headers should be captured by default and redacted + // according to the default value of `sanitizeFieldNames` + t.deepEqual(transactions.shift(), { + name: `Kafka RECEIVE from ${kafkaTopic}`, + type: 'messaging', + context: { + service: {}, + message: { + queue: { name: kafkaTopic }, + headers: { + foo: 'buffer', + traceparent: `00-${tx.trace_id}-${parentId}-01`, + tracestate: 'es=s:1', + }, + }, + }, + outcome: 'success', + }); + + t.deepEqual(transactions.shift(), { + name: `Kafka RECEIVE from ${kafkaTopic}`, + type: 'messaging', + context: { + service: {}, + message: { + queue: { name: kafkaTopic }, + headers: { + foo: 'string', + traceparent: `00-${tx.trace_id}-${parentId}-01`, + tracestate: 'es=s:1', + }, + }, + }, + outcome: 'success', + }); + + t.deepEqual(transactions.shift(), { + name: `Kafka RECEIVE from ${kafkaTopic}`, + type: 'messaging', + context: { + service: {}, + message: { + queue: { name: kafkaTopic }, + headers: { + auth: '[REDACTED]', + traceparent: `00-${tx.trace_id}-${parentId}-01`, + tracestate: 'es=s:1', + }, + }, + }, + outcome: 'success', + }); + t.equal(transactions.length, 0, 'all transactions accounted for'); + }, + }, + { + name: 'simple Kafkajs usage scenario for batch message processing with a single topic not ignored', + script: 'fixtures/use-kafkajs-each-batch.js', + cwd: __dirname, + timeout: 20000, + env: { + TEST_CLIENT_ID: 'elastic-kafka-client', + TEST_GROUP_ID: `elastictest-kafka-group-${rand}`, + TEST_TOPIC: kafkaTopic, + TEST_KAFKA_HOST: kafkaHost, + TEST_IGNORE_TOPIC: 'true', + // Suppres warinings about new default partitioner + // https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner + KAFKAJS_NO_PARTITIONER_WARNING: '1', + }, + checkApmServer(t, apmServer) { + t.ok(apmServer.events[0].metadata, 'metadata'); + const events = sortApmEvents(apmServer.events); + const tx = events.shift().transaction; + + // First the transaction. + t.ok(tx, 'got the send batch transaction'); + + // Compare some common fields across all spans. + // ignore http/external spans + const spans = events.filter((e) => e.span).map((e) => e.span); + const spanId = spans[0].id; + spans.forEach((s) => { + const errs = validateSpan(s); + t.equal(errs, null, 'span is valid (per apm-server intake schema)'); + }); + t.equal( + spans.filter((s) => s.trace_id === tx.trace_id).length, + spans.length, + 'all spans have the same trace_id', + ); + t.equal( + spans.filter((s) => s.transaction_id === tx.id).length, + spans.length, + 'all spans have the same transaction_id', + ); + t.equal( + spans.filter((s) => s.sync === false).length, + spans.length, + 'all spans have sync=false', + ); + t.equal( + spans.filter((s) => s.sample_rate === 1).length, + spans.length, + 'all spans have sample_rate=1', + ); + + spans.forEach((s) => { + // Remove variable and common fields to facilitate t.deepEqual below. 
+ delete s.id; + delete s.transaction_id; + delete s.parent_id; + delete s.trace_id; + delete s.timestamp; + delete s.duration; + delete s.sync; + delete s.sample_rate; + }); + + // The 1st batch has only one topic which is not ignored + // so the span has message context and also + // - service.target.name + // - destination.service.resource + t.deepEqual(spans.shift(), { + name: `Kafka SEND to ${kafkaTopic}`, + type: 'messaging', + subtype: 'kafka', + action: 'send', + context: { + service: { target: { type: 'kafka', name: kafkaTopic } }, + destination: { + service: { type: '', name: '', resource: `kafka/${kafkaTopic}` }, + }, + message: { queue: { name: kafkaTopic } }, + }, + outcome: 'success', + }); + + t.equal(spans.length, 0, 'all spans accounted for'); + + // Now check the transactions created + const transactions = events + .filter((e) => e.transaction) + .map((e) => e.transaction); + + transactions.forEach((t) => { + // Remove variable and common fields to facilitate t.deepEqual below. + delete t.id; + delete t.parent_id; + delete t.trace_id; + delete t.timestamp; + delete t.duration; + delete t.sample_rate; + delete t.sampled; + delete t.span_count; + delete t.result; + delete t.context.user; + delete t.context.tags; + delete t.context.custom; + delete t.context.cloud; + }); + + // Check message handling transactions + t.deepEqual(transactions.shift(), { + name: `Kafka RECEIVE from ${kafkaTopic}`, + type: 'messaging', + context: { + service: { framework: { name: 'Kafka' } }, + message: { queue: { name: kafkaTopic } }, + }, + links: [ + { + trace_id: tx.trace_id, + span_id: spanId, + }, + ], + + outcome: 'success', + }); + + t.equal(transactions.length, 0, 'all transactions accounted for'); + }, + }, + { + name: 'simple Kafkajs usage scenario for batch message processing without ignored topics', + script: 'fixtures/use-kafkajs-each-batch.js', + cwd: __dirname, + timeout: 20000, + env: { + TEST_CLIENT_ID: 'elastic-kafka-client', + TEST_GROUP_ID: `elastictest-kafka-group-${rand}`, + TEST_TOPIC: kafkaTopic, + TEST_KAFKA_HOST: kafkaHost, + TEST_IGNORE_TOPIC: 'false', + // Suppres warinings about new default partitioner + // https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner + KAFKAJS_NO_PARTITIONER_WARNING: '1', + }, + checkApmServer(t, apmServer) { + t.ok(apmServer.events[0].metadata, 'metadata'); + const events = sortApmEvents(apmServer.events); + const tx = events.shift().transaction; + + // First the transaction. + t.ok(tx, 'got the send batch transaction'); + + // Compare some common fields across all spans. + // ignore http/external spans + const spans = events.filter((e) => e.span).map((e) => e.span); + const spanId = spans[0].id; + spans.forEach((s) => { + const errs = validateSpan(s); + t.equal(errs, null, 'span is valid (per apm-server intake schema)'); + }); + t.equal( + spans.filter((s) => s.trace_id === tx.trace_id).length, + spans.length, + 'all spans have the same trace_id', + ); + t.equal( + spans.filter((s) => s.transaction_id === tx.id).length, + spans.length, + 'all spans have the same transaction_id', + ); + t.equal( + spans.filter((s) => s.sync === false).length, + spans.length, + 'all spans have sync=false', + ); + t.equal( + spans.filter((s) => s.sample_rate === 1).length, + spans.length, + 'all spans have sample_rate=1', + ); + + spans.forEach((s) => { + // Remove variable and common fields to facilitate t.deepEqual below. 
+ delete s.id; + delete s.transaction_id; + delete s.parent_id; + delete s.trace_id; + delete s.timestamp; + delete s.duration; + delete s.sync; + delete s.sample_rate; + }); + + t.deepEqual(spans.shift(), { + name: `Kafka SEND`, + type: 'messaging', + subtype: 'kafka', + action: 'send', + context: { + service: { target: { type: 'kafka' } }, + destination: { + service: { type: '', name: '', resource: 'kafka' }, + }, + }, + outcome: 'success', + }); + + t.equal(spans.length, 0, 'all spans accounted for'); + + // Now check the transactions created + const transactions = events + .filter((e) => e.transaction) + .map((e) => e.transaction) + // We cannot ensure the order of the received batches so we have to sort + // to do the assertions properly + .sort((ta, tb) => { + return ta.name < tb.name ? -1 : 1; + }); + + transactions.forEach((t) => { + // Remove variable and common fields to facilitate t.deepEqual below. + delete t.id; + delete t.parent_id; + delete t.trace_id; + delete t.timestamp; + delete t.duration; + delete t.sample_rate; + delete t.sampled; + delete t.span_count; + delete t.result; + delete t.context.user; + delete t.context.tags; + delete t.context.custom; + delete t.context.cloud; + }); + + // Check message handling transactions + t.deepEqual(transactions.shift(), { + name: `Kafka RECEIVE from ${kafkaTopic}`, + type: 'messaging', + context: { + service: { framework: { name: 'Kafka' } }, + message: { queue: { name: kafkaTopic } }, + }, + links: [ + { + trace_id: tx.trace_id, + span_id: spanId, + }, + ], + + outcome: 'success', + }); + + t.deepEqual(transactions.shift(), { + name: `Kafka RECEIVE from ${kafkaTopic}-ignore`, + type: 'messaging', + context: { + service: { framework: { name: 'Kafka' } }, + message: { queue: { name: `${kafkaTopic}-ignore` } }, + }, + links: [ + { + trace_id: tx.trace_id, + span_id: spanId, + }, + ], + + outcome: 'success', + }); + + t.equal(transactions.length, 0, 'all transactions accounted for'); + }, + }, + { + name: 'simple Kafkajs usage scenario for `captureHeaders=false` and `captureBody=all` on message reception', + script: 'fixtures/use-kafkajs-each-message.js', + cwd: __dirname, + timeout: 20000, + env: { + TEST_CLIENT_ID: 'elastic-kafka-client', + TEST_GROUP_ID: `elastictest-kafka-group-${rand}`, + TEST_TOPIC: kafkaTopic, + TEST_KAFKA_HOST: kafkaHost, + ELASTIC_APM_CAPTURE_HEADERS: 'false', + ELASTIC_APM_CAPTURE_BODY: 'all', + // Suppres warinings about new default partitioner + // https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner + KAFKAJS_NO_PARTITIONER_WARNING: '1', + }, + checkApmServer(t, apmServer) { + t.ok(apmServer.events[0].metadata, 'metadata'); + const events = sortApmEvents(apmServer.events); + const tx = events.shift().transaction; + + // First the transaction. + t.ok(tx, 'got the send transaction'); + + // Compare some common fields across all spans. 
+      // ignore http/external spans
+      const spans = events.filter((e) => e.span).map((e) => e.span);
+      spans.forEach((s) => {
+        const errs = validateSpan(s);
+        t.equal(errs, null, 'span is valid (per apm-server intake schema)');
+      });
+      t.equal(
+        spans.filter((s) => s.trace_id === tx.trace_id).length,
+        spans.length,
+        'all spans have the same trace_id',
+      );
+      t.equal(
+        spans.filter((s) => s.transaction_id === tx.id).length,
+        spans.length,
+        'all spans have the same transaction_id',
+      );
+      t.equal(
+        spans.filter((s) => s.sync === false).length,
+        spans.length,
+        'all spans have sync=false',
+      );
+      t.equal(
+        spans.filter((s) => s.sample_rate === 1).length,
+        spans.length,
+        'all spans have sample_rate=1',
+      );
+
+      spans.forEach((s) => {
+        // Remove variable and common fields to facilitate t.deepEqual below.
+        delete s.id;
+        delete s.transaction_id;
+        delete s.parent_id;
+        delete s.trace_id;
+        delete s.timestamp;
+        delete s.duration;
+        delete s.sync;
+        delete s.sample_rate;
+      });
+
+      t.deepEqual(spans.shift(), {
+        name: `Kafka SEND to ${kafkaTopic}`,
+        type: 'messaging',
+        subtype: 'kafka',
+        action: 'send',
+        context: {
+          service: {
+            target: { type: 'kafka', name: kafkaTopic },
+          },
+          destination: {
+            service: {
+              resource: `kafka/${kafkaTopic}`,
+              type: '',
+              name: '',
+            },
+          },
+          message: { queue: { name: kafkaTopic } },
+        },
+        outcome: 'success',
+      });
+
+      t.equal(spans.length, 0, 'all spans accounted for');
+
+      // Now check the transactions created for each message received
+      const transactions = events
+        .filter((e) => e.transaction)
+        .map((e) => e.transaction);
+      const parentId = transactions[0].parent_id;
+
+      t.equal(
+        transactions.filter((t) => t.trace_id === tx.trace_id).length,
+        transactions.length,
+        'all transactions have the same trace_id',
+      );
+      t.equal(
+        transactions.filter((t) => t.parent_id === parentId).length,
+        transactions.length,
+        'all transactions have the same parent_id',
+      );
+      t.equal(
+        transactions
+          .map((t) => t.context.message.age.ms)
+          .filter((ms) => typeof ms === 'number' && ms > 0).length,
+        transactions.length,
+        'all transactions have positive age',
+      );
+
+      // NOTE: messages could arrive in different order so we sort them
+      // to properly do the assertions
+      transactions.sort((t1, t2) => {
+        const body1 = t1.context.message.body || 'undefined';
+        const body2 = t2.context.message.body || 'undefined';
+        return body1 < body2 ? -1 : 1;
+      });
+      transactions.forEach((t) => {
+        // Remove variable and common fields to facilitate t.deepEqual below.
+        delete t.id;
+        delete t.parent_id;
+        delete t.trace_id;
+        delete t.timestamp;
+        delete t.duration;
+        delete t.sample_rate;
+        delete t.sampled;
+        delete t.span_count;
+        delete t.result;
+        delete t.context.user;
+        delete t.context.tags;
+        delete t.context.custom;
+        delete t.context.cloud;
+        delete t.context.message.age;
+      });
+
+      // Check message handling transactions
+      t.deepEqual(transactions.shift(), {
+        name: `Kafka RECEIVE from ${kafkaTopic}`,
+        type: 'messaging',
+        context: {
+          service: {},
+          message: {
+            queue: { name: kafkaTopic },
+            body: 'each message 1',
+          },
+        },
+        outcome: 'success',
+      });
+
+      t.deepEqual(transactions.shift(), {
+        name: `Kafka RECEIVE from ${kafkaTopic}`,
+        type: 'messaging',
+        context: {
+          service: {},
+          message: {
+            queue: { name: kafkaTopic },
+            body: 'each message 2',
+          },
+        },
+        outcome: 'success',
+      });
+
+      t.deepEqual(transactions.shift(), {
+        name: `Kafka RECEIVE from ${kafkaTopic}`,
+        type: 'messaging',
+        context: {
+          service: {},
+          message: {
+            queue: { name: kafkaTopic },
+            body: 'each message 3',
+          },
+        },
+        outcome: 'success',
+      });
+      t.equal(transactions.length, 0, 'all transactions accounted for');
+    },
+  },
+  {
+    name: 'simple Kafkajs usage scenario of context propagation while sending messages',
+    script: 'fixtures/use-kafkajs-ctx-propagation.js',
+    cwd: __dirname,
+    timeout: 20000,
+    env: {
+      TEST_CLIENT_ID: 'elastic-kafka-client',
+      TEST_GROUP_ID: `elastictest-kafka-group-${rand}`,
+      TEST_TOPIC: kafkaTopic,
+      TEST_KAFKA_HOST: kafkaHost,
+      TEST_MODE: 'send',
+      // Suppress warnings about the new default partitioner
+      // https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner
+      KAFKAJS_NO_PARTITIONER_WARNING: '1',
+    },
+    checkApmServer(t, apmServer) {
+      t.ok(apmServer.events[0].metadata, 'metadata');
+      const events = sortApmEvents(apmServer.events);
+      const tx = events.shift().transaction;
+
+      // First the transaction.
+      t.ok(tx, 'got the send transaction');
+
+      // Stash the trace context data to use in the assertions of the next test
+      store.set('ctx-propagation-parent-id', tx.id);
+      store.set('ctx-propagation-trace-id', tx.trace_id);
+
+      // Check the topic is ignored
+      const spans = events.filter((e) => e.span).map((e) => e.span);
+      t.equal(spans.length, 0, 'there are no spans');
+    },
+  },
+  {
+    name: 'simple Kafkajs usage scenario of context propagation while consuming messages',
+    script: 'fixtures/use-kafkajs-ctx-propagation.js',
+    cwd: __dirname,
+    timeout: 20000,
+    env: {
+      TEST_CLIENT_ID: 'elastic-kafka-client',
+      TEST_GROUP_ID: `elastictest-kafka-group-${rand}`,
+      TEST_TOPIC: kafkaTopic,
+      TEST_KAFKA_HOST: kafkaHost,
+      TEST_MODE: 'consume',
+      // Suppress warnings about the new default partitioner
+      // https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner
+      KAFKAJS_NO_PARTITIONER_WARNING: '1',
+    },
+    checkApmServer(t, apmServer) {
+      t.ok(apmServer.events[0].metadata, 'metadata');
+      const events = sortApmEvents(apmServer.events);
+
+      // Consuming does not generate spans
+      const spans = events.filter((e) => e.span).map((e) => e.span);
+      t.equal(spans.length, 0, 'there are no spans');
+
+      // Get the stashed data from the previous test
+      const traceId = store.get('ctx-propagation-trace-id');
+      const parentId = store.get('ctx-propagation-parent-id');
+
+      // Check the transactions for consuming messages have the proper trace
+      const transactions = events
+        .filter((e) => e.transaction)
+        .map((e) => e.transaction);
+
+      t.ok(
+        transactions.every((t) => t.trace_id === traceId),
+        'all transactions have the right trace_id',
+      );
+
+      t.ok(
+        transactions.every((t) => t.parent_id === parentId),
+        'all transactions have the right parent_id',
+      );
+
+      t.ok(
+        transactions.every((t) => {
+          const traceparent = t.context.message.headers.traceparent;
+          return traceparent === `00-${t.trace_id}-${parentId}-01`;
+        }),
+        'all transactions have the right traceparent header',
+      );
+
+      t.ok(
+        transactions.every((t) => {
+          const tracestate = t.context.message.headers.tracestate;
+          return tracestate === 'es=s:1';
+        }),
+        'all transactions have the right tracestate header',
+      );
+
+      t.equal(transactions.length, 2, 'got the right number of transactions');
+    },
+  },
+];
+
+test('kafkajs fixtures', (suite) => {
+  runTestFixtures(suite, testFixtures);
+  suite.end();
+});