Skip to content

Commit

Permalink
[service-utils] switch to @confluentinc/kafka-javascript (#6345)
Browse files Browse the repository at this point in the history
  • Loading branch information
arcoraven authored Feb 26, 2025
1 parent 2a4b5dc commit 0595c37
Show file tree
Hide file tree
Showing 6 changed files with 281 additions and 238 deletions.
5 changes: 5 additions & 0 deletions .changeset/pink-states-tell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@thirdweb-dev/service-utils": patch
---

[service-utils] Use @confluentinc/kafka-javascript
4 changes: 1 addition & 3 deletions packages/service-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@
],
"sideEffects": false,
"dependencies": {
"@confluentinc/kafka-javascript": "^1.2.0",
"aws4fetch": "1.0.20",
"kafkajs": "2.2.4",
"lz4js": "0.2.0",
"zod": "3.24.2"
},
"devDependencies": {
"@cloudflare/workers-types": "4.20250224.0",
"@types/lz4js": "0.2.1",
"@types/node": "22.13.5",
"typescript": "5.7.3",
"vitest": "3.0.7"
Expand Down
13 changes: 12 additions & 1 deletion packages/service-utils/src/core/usageV2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ export const USAGE_V2_SOURCES = [
] as const;
export type UsageV2Source = (typeof USAGE_V2_SOURCES)[number];
export function getTopicName(source: UsageV2Source) {
return `usage_v2.raw_${source}`;
switch (source) {
// Some sources are sent from clients and are written to an "untrusted" table.
case "sdk":
case "engine":
return `usage_v2.untrusted_raw_${source}`;
default:
return `usage_v2.raw_${source}`;
}
}

export interface ClientUsageV2Event {
Expand Down Expand Up @@ -55,6 +62,10 @@ export interface ClientUsageV2Event {
* The product version, if available.
*/
product_version?: string;
/**
* The event version. Defaults to 1.
*/
version?: number;
/**
* An object of arbitrary key-value pairs.
* Values can be boolean, number, string, Date, or null.
Expand Down
121 changes: 45 additions & 76 deletions packages/service-utils/src/node/kafka.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { checkServerIdentity } from "node:tls";
import { CompressionTypes, Kafka, type Producer } from "kafkajs";
import { compress, decompress } from "lz4js";

// CompressionCodecs is not exported properly in kafkajs. Source: https://github.com/tulios/kafkajs/issues/1391
import KafkaJS from "kafkajs";
const { CompressionCodecs } = KafkaJS;
import {
KafkaJS,
type ProducerGlobalConfig,
} from "@confluentinc/kafka-javascript";

/**
* Reference: https://kafka.js.org/docs/producing#producing-messages
Expand Down Expand Up @@ -33,104 +30,73 @@ export interface KafkaProducerSendOptions {
* ```
*/
export class KafkaProducer {
private kafka: Kafka;
private producer: Producer | null = null;
private compression: CompressionTypes;
private producer: KafkaJS.Producer;
private isConnected = false;

constructor(config: {
constructor(options: {
/**
* A descriptive name for your service. Example: "storage-server"
*/
producerName: string;
/**
* The environment the service is running in.
*/
environment: "development" | "production";
/**
* Whether to compress the events.
* A comma-separated list of `host[:port]` Kafka servers.
*/
shouldCompress?: boolean;

kafkaServers: string;
username: string;
password: string;

/**
* Configuration for the Kafka producer.
*/
config?: ProducerGlobalConfig;
}) {
const {
producerName,
environment,
shouldCompress = true,
username,
password,
} = config;
const { producerName, kafkaServers, username, password, config } = options;

this.kafka = new Kafka({
clientId: `${producerName}-${environment}`,
brokers:
environment === "production"
? ["warpstream.thirdweb.xyz:9092"]
: ["warpstream-dev.thirdweb.xyz:9092"],
ssl: {
checkServerIdentity(hostname, cert) {
return checkServerIdentity(hostname.toLowerCase(), cert);
},
},
sasl: {
mechanism: "plain",
username,
password,
},
this.producer = new KafkaJS.Kafka({}).producer({
"client.id": producerName,
"bootstrap.servers": kafkaServers,
"security.protocol": "sasl_ssl",
"sasl.mechanisms": "PLAIN",
"sasl.username": username,
"sasl.password": password,
"compression.codec": "lz4",
"allow.auto.create.topics": true,
// All configuration can be overridden.
...config,
});
}

if (shouldCompress) {
this.compression = CompressionTypes.LZ4;

CompressionCodecs[CompressionTypes.LZ4] = () => ({
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
compress: (encoder: { buffer: Buffer }) => {
const compressed = compress(encoder.buffer);
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
return Buffer.from(compressed);
},
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
decompress: (buffer: Buffer) => {
const decompressed = decompress(buffer);
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
return Buffer.from(decompressed);
},
});
} else {
this.compression = CompressionTypes.None;
}
/**
* Connects the producer. Can be called explicitly at the start of your service, or will be called automatically when sending messages.
*/
async connect() {
await this.producer.connect();
this.isConnected = true;
}

/**
* Send messages to a Kafka topic.
* This method may throw. To call this non-blocking:
* ```ts
* void kafka.send(topic, events).catch((e) => console.error(e))
* ```
*
* @param topic
* @param messages
* @param configOverrides
*/
async send(
topic: string,
messages: Record<string, unknown>[],
options?: KafkaProducerSendOptions,
): Promise<void> {
if (!this.producer) {
this.producer = this.kafka.producer({
allowAutoTopicCreation: options?.allowAutoTopicCreation ?? false,
maxInFlightRequests: options?.maxInFlightRequests ?? 2000,
retry: { retries: options?.retries ?? 5 },
});
await this.producer.connect();
if (!this.isConnected) {
await this.connect();
}

await this.producer.send({
topic,
messages: messages.map((m) => ({
value: JSON.stringify(m),
})),
compression: this.compression,
acks: options?.acks ?? -1, // Default: All brokers must acknowledge
timeout: options?.timeout ?? 10_000, // Default: 10 seconds
});
}

Expand All @@ -139,9 +105,12 @@ export class KafkaProducer {
* Useful when shutting down the service to flush in-flight events.
*/
async disconnect() {
if (this.producer) {
await this.producer.disconnect();
this.producer = null;
if (this.isConnected) {
try {
await this.producer.flush();
await this.producer.disconnect();
} catch {}
this.isConnected = false;
}
}
}
23 changes: 9 additions & 14 deletions packages/service-utils/src/node/usageV2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
type UsageV2Source,
getTopicName,
} from "../core/usageV2.js";
import { KafkaProducer, type KafkaProducerSendOptions } from "./kafka.js";
import { KafkaProducer } from "./kafka.js";

/**
* Creates a UsageV2Producer which opens a persistent TCP connection.
Expand Down Expand Up @@ -35,18 +35,13 @@ export class UsageV2Producer {
* The product where usage is coming from.
*/
source: UsageV2Source;
/**
* Whether to compress the events.
*/
shouldCompress?: boolean;

username: string;
password: string;
}) {
this.kafkaProducer = new KafkaProducer({
producerName: config.producerName,
environment: config.environment,
shouldCompress: config.shouldCompress,
username: config.username,
password: config.password,
});
Expand All @@ -56,25 +51,25 @@ export class UsageV2Producer {
/**
* Send usageV2 events.
* This method may throw. To call this non-blocking:
* ```ts
* void usageV2.sendEvents(events).catch((e) => console.error(e))
* ```
*
* @param events
*/
async sendEvents(
events: UsageV2Event[],
/**
* Reference: https://kafka.js.org/docs/producing#producing-messages
*/
options?: KafkaProducerSendOptions,
): Promise<void> {
async sendEvents(events: UsageV2Event[]): Promise<void> {
const parsedEvents = events.map((event) => ({
...event,
// Default to a generated UUID.
id: event.id ?? randomUUID(),
// Default to now.
created_at: event.created_at ?? new Date(),
// Remove the "team_" prefix, if any.
team_id: event.team_id.startsWith("team_")
? event.team_id.slice(5)
: event.team_id,
}));
await this.kafkaProducer.send(this.topic, parsedEvents, options);
await this.kafkaProducer.send(this.topic, parsedEvents);
}

/**
Expand Down
Loading

0 comments on commit 0595c37

Please sign in to comment.