Skip to content

Commit

Permalink
feat: add kafkajs instrumentation (#3786)
Browse files Browse the repository at this point in the history
Closes: #2905
  • Loading branch information
david-luna authored Mar 12, 2024
1 parent 8374a42 commit 489fc50
Show file tree
Hide file tree
Showing 23 changed files with 2,039 additions and 0 deletions.
8 changes: 8 additions & 0 deletions .ci/docker/docker-compose-all.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ services:
extends:
file: docker-compose.yml
service: localstack
kafka:
extends:
file: docker-compose.yml
service: kafka
node_tests:
extends:
file: docker-compose-node-test.yml
Expand All @@ -60,6 +64,8 @@ services:
condition: service_healthy
localstack:
condition: service_healthy
kafka:
condition: service_healthy

volumes:
nodepgdata:
Expand All @@ -76,3 +82,5 @@ volumes:
driver: local
nodelocalstackdata:
driver: local
nodekafkadata:
driver: local
8 changes: 8 additions & 0 deletions .ci/docker/docker-compose-edge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ services:
extends:
file: docker-compose.yml
service: redis
kafka:
extends:
file: docker-compose.yml
service: kafka
node_tests:
extends:
file: docker-compose-node-edge-test.yml
Expand All @@ -60,6 +64,8 @@ services:
condition: service_healthy
redis:
condition: service_healthy
kafka:
condition: service_healthy

volumes:
nodepgdata:
Expand All @@ -76,3 +82,5 @@ volumes:
driver: local
nodecassandradata:
driver: local
nodekafkadata:
driver: local
28 changes: 28 additions & 0 deletions .ci/docker/docker-compose-kafka.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
version: '2.1'

services:
zookeeper:
extends:
file: docker-compose.yml
service: zookeeper
kafka:
extends:
file: docker-compose.yml
service: kafka
depends_on:
- zookeeper
node_tests:
extends:
file: docker-compose-node-test.yml
service: node_tests
depends_on:
- kafka
# TODO: uncomment this if health_check is necessary
# kafka:
# condition: service_healthy

volumes:
nodekafkadata:
driver: local
nodezookeeperdata:
driver: local
1 change: 1 addition & 0 deletions .ci/docker/docker-compose-node-edge-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ services:
PGUSER: 'postgres'
MEMCACHED_HOST: 'memcached'
LOCALSTACK_HOST: 'localstack:4566'
KAFKA_HOST: 'kafka:9093'
NODE_VERSION: ${NODE_VERSION}
NODE_FULL_VERSION: ${NODE_FULL_VERSION}
NVM_NODEJS_ORG_MIRROR: ${NVM_NODEJS_ORG_MIRROR}
Expand Down
1 change: 1 addition & 0 deletions .ci/docker/docker-compose-node-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ services:
PGUSER: 'postgres'
MEMCACHED_HOST: 'memcached'
LOCALSTACK_HOST: 'localstack:4566'
KAFKA_HOST: 'kafka:9093'
NODE_VERSION: ${NODE_VERSION}
TAV: ${TAV_MODULE}
ELASTIC_APM_CONTEXT_MANAGER: ${ELASTIC_APM_CONTEXT_MANAGER}
Expand Down
41 changes: 41 additions & 0 deletions .ci/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,43 @@ services:
volumes:
- nodelocalstackdata:/var/lib/localstack

zookeeper:
# https://hub.docker.com/r/bitnami/zookeeper/tags
image: bitnami/zookeeper:3.9.1
ports:
- "2181:2181"
volumes:
- nodezookeeperdata:/bitnami
environment:
- ALLOW_ANONYMOUS_LOGIN=yes

kafka:
# https://hub.docker.com/r/bitnami/kafka/tags
image: bitnami/kafka:3.3.2
ports:
- "9093:9093"
volumes:
- nodekafkadata:/var/lib/kafka/data
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://kafka:9093
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
- KAFKA_CFG_DELETE_TOPIC_ENABLE=true
depends_on:
- zookeeper
# TODO: maybe not necessary but figure out how to do this
healthcheck:
# use netcat to check tcp connection available
# test: nc -z localhost 9093 || exit -1
# start_period: 15s
# interval: 5s
# timeout: 10s
# retries: 5

volumes:
nodepgdata:
driver: local
Expand All @@ -155,3 +192,7 @@ volumes:
driver: local
nodelocalstackdata:
driver: local
nodekafkadata:
driver: local
nodezookeeperdata:
driver: local
3 changes: 3 additions & 0 deletions .ci/scripts/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ elif [[ -n "${TAV_MODULE}" ]]; then
aws-sdk|@aws-sdk/client-s3|@aws-sdk/client-dynamodb|@aws-sdk/client-sns|@aws-sdk/client-sqs)
DOCKER_COMPOSE_FILE=docker-compose-localstack.yml
;;
kafkajs)
DOCKER_COMPOSE_FILE=docker-compose-kafka.yml
;;
*)
# Just the "node_tests" container. No additional services needed for testing.
DOCKER_COMPOSE_FILE=docker-compose-node-test.yml
Expand Down
1 change: 1 addition & 0 deletions .ci/tav.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
{ "name": "generic-pool", "minMajorVersion": 8 },
{ "name": "graphql", "minMajorVersion": 8 },
{ "name": "ioredis", "minMajorVersion": 8 },
{ "name": "kafkajs", "minMajorVersion": 14 },
{ "name": "knex", "minMajorVersion": 8 },
{ "name": "memcached", "minMajorVersion": 8 },
{ "name": "mongodb", "minMajorVersion": 8 },
Expand Down
25 changes: 25 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,31 @@ jobs:
volumes:
- nodelocalstackdata:/var/lib/localstack

zookeeper:
image: bitnami/zookeeper:3.9.1
env:
ALLOW_ANONYMOUS_LOGIN: 'yes'
ports:
- "2181:2181"
volumes:
- nodezookeeperdata:/var/lib/zookeeper/data

kafka:
image: bitnami/kafka:3.3.2
ports:
- "9093:9093"
volumes:
- nodekafkadata:/var/lib/kafka/data
env:
KAFKA_BROKER_ID: '1'
KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper:2181'
ALLOW_PLAINTEXT_LISTENER: 'yes'
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT'
KAFKA_CFG_LISTENERS: 'CLIENT://:9092,EXTERNAL://:9093'
KAFKA_CFG_ADVERTISED_LISTENERS: 'CLIENT://kafka:9092,EXTERNAL://localhost:9093'
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'CLIENT'
KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'

strategy:
fail-fast: false
matrix:
Expand Down
6 changes: 6 additions & 0 deletions .tav.yml
Original file line number Diff line number Diff line change
Expand Up @@ -483,3 +483,9 @@ undici:
mode: max-7
include: '>=4.7.1 <6'
commands: node test/instrumentation/modules/undici/undici.test.js

kafkajs:
versions:
mode: latest-minors
include: '>=2 <3'
commands: node test/instrumentation/modules/kafkajs/kafkajs.test.js
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ See the <<upgrade-to-v4>> guide.
* Update <<opentelemetry-bridge>> support to `@opentelemetry/api` version 1.8.0.
* Add support for `tedious` versions 17 and 18. ({pull}3901[#3901], {pull}3911[#3911])
* Add support for `kafkajs` version v2. ({issues}2905[#2905])
[float]
===== Bug fixes
Expand Down
1 change: 1 addition & 0 deletions docs/supported-technologies.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ so those should be supported as well.
|https://www.npmjs.com/package/tedious[tedious] |>=1.9 <19.0.0 | (Excluding v4.0.0.) Will instrument all queries
|https://www.npmjs.com/package/undici[undici] | >=4.7.1 <6 | Will instrument undici HTTP requests, except HTTP CONNECT. Requires node v14.17.0 or later, or the user to have installed the https://www.npmjs.com/package/diagnostics_channel['diagnostics_channel' polyfill].
|https://www.npmjs.com/package/ws[ws] |>=1.0.0 <8.0.0 |Will instrument outgoing WebSocket messages
|https://www.npmjs.com/package/kafkajs[kafkajs] |>=2.0.0 <3.0.0 |Will instrument all send methods for producers and message and batch processing for consumers.
|=======================================================================

[float]
Expand Down
89 changes: 89 additions & 0 deletions examples/trace-kafka.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!/usr/bin/env node --unhandled-rejections=strict

/*
* Copyright Elasticsearch B.V. and other contributors where applicable.
* Licensed under the BSD 2-Clause License; you may not use this file except in
* compliance with the BSD 2-Clause License.
*/

// A small example showing Elastic APM tracing the 'kadfkajs' package.
//
// This assumes a Kafka server running on localhost. You can use:
// npm run docker:start kafka
// to start a Kafka container. Then `npm run docker:stop` to stop it.

// eslint-disable-next-line no-unused-vars
const apm = require('../').start({
serviceName: 'example-trace-kafka',
});

const { Buffer } = require('buffer');
const { TextEncoder } = require('util');

const { Kafka } = require('kafkajs');

const topic = 'trace-kafka-topic';
const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9093'] });
const admin = kafka.admin();

const headerStr = 'value inside buffer';
const headerEnc = new TextEncoder().encode(headerStr);
const headerBuf = Buffer.from(headerEnc);

let producer, consumer;
let messagesConsumed = 0;

async function run() {
await admin.connect();
await admin.createTopics({ topics: [{ topic }] });

consumer = kafka.consumer({ groupId: 'trace-group' });
producer = kafka.producer();

await producer.connect();
await producer.send({
topic,
messages: [
{ value: 'message 1', headers: { foo: 'bar' } },
{ value: 'message 2', headers: { foo: headerBuf } },
{ value: 'message 3' },
],
});

await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });
await consumer.run({
eachMessage: async function ({ topic, partition, message }) {
console.log(`message from topic(${topic}): ${message.value.toString()}`);
console.log(`message header ${message.headers.foo}`);
messagesConsumed++;
},
});

await new Promise((resolve, reject) => {
let count = 0;
const id = setInterval(() => {
count++;
if (messagesConsumed === 3) {
clearInterval(id);
resolve();
} else if (count > 10) {
// set a limit of 10s/retries
clearInterval(id);
reject(new Error('not receiving all messages after 10s'));
}
}, 1000);
});
}

run()
.catch((err) => {
console.warn('run err:', err);
})
.finally(async () => {
console.log('disconnecting Kafkajs client');
await producer.disconnect();
await consumer.disconnect();
await admin.deleteTopics({ topics: [topic] });
await admin.disconnect();
});
1 change: 1 addition & 0 deletions lib/instrumentation/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ var MODULE_PATCHERS = [
{ modPath: 'http2' },
{ modPath: 'ioredis' },
{ modPath: 'jade' },
{ modPath: 'kafkajs' },
{ modPath: 'knex' },
{ modPath: 'koa' },
{ modPath: 'koa-router' },
Expand Down
Loading

0 comments on commit 489fc50

Please sign in to comment.