Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify mr blobby v2 abstractions #27896

Merged
merged 9 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import { captureException } from '@sentry/node'
import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka'
import {
CODES,
features,
KafkaConsumer,
librdkafkaVersion,
Message,
TopicPartition,
TopicPartitionOffset,
} from 'node-rdkafka'

import { KafkaProducerWrapper } from '~/src/kafka/producer'
import { PostgresRouter } from '~/src/utils/db/postgres'
Expand All @@ -19,20 +27,17 @@ import {
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW,
} from './constants'
import { KafkaMessageParser } from './kafka/message-parser'
import { KafkaMetrics } from './kafka/metrics'
import { KafkaOffsetManager } from './kafka/offset-manager'
import { SessionRecordingMetrics } from './metrics'
import { SessionRecordingIngesterMetrics } from './metrics'
import { PromiseScheduler } from './promise-scheduler'
import { BlackholeSessionBatchWriter } from './sessions/blackhole-session-batch-writer'
import { SessionBatchManager } from './sessions/session-batch-manager'
import { SessionBatchRecorder, SessionBatchRecorderInterface } from './sessions/session-batch-recorder'
import { SessionBatchRecorder } from './sessions/session-batch-recorder'
import { TeamFilter } from './teams/team-filter'
import { TeamService } from './teams/team-service'
import { MessageWithTeam } from './teams/types'
import { CaptureIngestionWarningFn } from './types'
import { getPartitionsForTopic } from './utils'
import { LibVersionMonitor } from './versions/lib-version-monitor'
import { VersionMetrics } from './versions/version-metrics'

// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
require('@sentry/tracing')
Expand All @@ -45,7 +50,6 @@ export class SessionRecordingIngester {
isStopping = false

private isDebugLoggingEnabled: ValueMatcher<number>
private readonly metrics: SessionRecordingMetrics
private readonly promiseScheduler: PromiseScheduler
private readonly batchConsumerFactory: BatchConsumerFactory
private readonly sessionBatchManager: SessionBatchManager
Expand All @@ -69,31 +73,19 @@ export class SessionRecordingIngester {

this.promiseScheduler = new PromiseScheduler()

this.kafkaParser = new KafkaMessageParser(KafkaMetrics.getInstance())
this.kafkaParser = new KafkaMessageParser()
this.teamFilter = new TeamFilter(new TeamService(postgres))
if (ingestionWarningProducer) {
const captureWarning: CaptureIngestionWarningFn = async (teamId, type, details, debounce) => {
await captureIngestionWarning(ingestionWarningProducer, teamId, type, details, debounce)
}
this.libVersionMonitor = new LibVersionMonitor(captureWarning, VersionMetrics.getInstance())
this.libVersionMonitor = new LibVersionMonitor(captureWarning)
}

this.metrics = SessionRecordingMetrics.getInstance()

const offsetManager = new KafkaOffsetManager(async (offsets) => {
await new Promise<void>((resolve, reject) => {
try {
this.batchConsumer!.consumer.commitSync(offsets)
resolve()
} catch (error) {
reject(error)
}
})
}, this.topic)
const offsetManager = new KafkaOffsetManager(this.commitOffsets.bind(this), this.topic)
this.sessionBatchManager = new SessionBatchManager({
maxBatchSizeBytes: (config.SESSION_RECORDING_MAX_BATCH_SIZE_KB ?? 0) * 1024,
maxBatchAgeMs: config.SESSION_RECORDING_MAX_BATCH_AGE_MS ?? 1000,
createBatch: () => new SessionBatchRecorder(new BlackholeSessionBatchWriter()),
offsetManager,
})

Expand Down Expand Up @@ -128,15 +120,14 @@ export class SessionRecordingIngester {
}

private async processBatchMessages(messages: Message[], context: { heartbeat: () => void }): Promise<void> {
// Increment message received counter for each message
messages.forEach((message) => {
this.metrics.incrementMessageReceived(message.partition)
SessionRecordingIngesterMetrics.incrementMessageReceived(message.partition)
})

const batchSize = messages.length
const batchSizeKb = messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024
this.metrics.observeKafkaBatchSize(batchSize)
this.metrics.observeKafkaBatchSizeKb(batchSizeKb)
SessionRecordingIngesterMetrics.observeKafkaBatchSize(batchSize)
SessionRecordingIngesterMetrics.observeKafkaBatchSizeKb(batchSizeKb)

const processedMessages = await runInstrumentedFunction({
statsKey: `recordingingesterv2.handleEachBatch.parseBatch`,
Expand Down Expand Up @@ -176,10 +167,10 @@ export class SessionRecordingIngester {
})
}

private consume(message: MessageWithTeam, batch: SessionBatchRecorderInterface) {
private consume(message: MessageWithTeam, batch: SessionBatchRecorder) {
// we have to reset this counter once we're consuming messages since then we know we're not re-balancing
// otherwise the consumer continues to report however many sessions were revoked at the last re-balance forever
this.metrics.resetSessionsRevoked()
SessionRecordingIngesterMetrics.resetSessionsRevoked()
const { team, message: parsedMessage } = message
const debugEnabled = this.isDebugLoggingEnabled(parsedMessage.metadata.partition)

Expand All @@ -203,7 +194,7 @@ export class SessionRecordingIngester {
})
}

this.metrics.observeSessionInfo(parsedMessage.metadata.rawSize)
SessionRecordingIngesterMetrics.observeSessionInfo(parsedMessage.metadata.rawSize)
batch.record(message)
}

Expand Down Expand Up @@ -307,7 +298,18 @@ export class SessionRecordingIngester {
return
}

this.metrics.resetSessionsHandled()
SessionRecordingIngesterMetrics.resetSessionsHandled()
await this.sessionBatchManager.discardPartitions(revokedPartitions)
}

/**
 * Commits the given offsets synchronously on the underlying rdkafka consumer.
 *
 * Passed (bound) to `KafkaOffsetManager` as its commit callback.
 *
 * NOTE(review): `commitSync` is a synchronous call, so the original
 * `new Promise((resolve, reject) => { try { ... } })` wrapper was redundant —
 * an `async` method already surfaces a synchronous throw as a rejected
 * promise. Behavior is unchanged: resolves on success, rejects on throw.
 *
 * @param offsets - per-partition offsets to commit (next offset to consume)
 * @throws rethrows any error raised by `consumer.commitSync`
 */
private async commitOffsets(offsets: TopicPartitionOffset[]): Promise<void> {
    // assumes batchConsumer is set by the time offsets are committed — the
    // non-null assertion mirrors the original code; TODO confirm startup order
    this.batchConsumer!.consumer.commitSync(offsets)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ import { promisify } from 'node:util'
import { Message } from 'node-rdkafka'
import { gzip } from 'zlib'

import { KafkaMessageParser } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/message-parser'
import { KafkaMetrics } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/metrics'
import { KafkaMessageParser } from './message-parser'
import { KafkaMetrics } from './metrics'

const compressWithGzip = promisify(gzip)

jest.mock('./metrics')

describe('KafkaMessageParser', () => {
let parser: KafkaMessageParser
let mockKafkaMetrics: jest.Mocked<KafkaMetrics>

beforeEach(() => {
mockKafkaMetrics = {
incrementMessageDropped: jest.fn(),
} as jest.Mocked<KafkaMetrics>
parser = new KafkaMessageParser(mockKafkaMetrics)
jest.clearAllMocks()
parser = new KafkaMessageParser()
})

const createMessage = (data: any, overrides: Partial<Message> = {}): Message => ({
Expand Down Expand Up @@ -71,7 +70,7 @@ describe('KafkaMessageParser', () => {
},
snapshot_source: undefined,
})
expect(mockKafkaMetrics.incrementMessageDropped).not.toHaveBeenCalled()
expect(KafkaMetrics.incrementMessageDropped).not.toHaveBeenCalled()
})

it('handles gzipped message', async () => {
Expand Down Expand Up @@ -108,7 +107,7 @@ describe('KafkaMessageParser', () => {
end: 1234567891,
},
})
expect(mockKafkaMetrics.incrementMessageDropped).not.toHaveBeenCalled()
expect(KafkaMetrics.incrementMessageDropped).not.toHaveBeenCalled()
})

it('filters out message with missing value', async () => {
Expand All @@ -117,7 +116,7 @@ describe('KafkaMessageParser', () => {
const results = await parser.parseBatch(messages)

expect(results).toHaveLength(0)
expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
expect(KafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
'session_recordings_blob_ingestion',
'message_value_or_timestamp_is_empty'
)
Expand All @@ -129,7 +128,7 @@ describe('KafkaMessageParser', () => {
const results = await parser.parseBatch(messages)

expect(results).toHaveLength(0)
expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
expect(KafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
'session_recordings_blob_ingestion',
'message_value_or_timestamp_is_empty'
)
Expand All @@ -141,7 +140,7 @@ describe('KafkaMessageParser', () => {
const results = await parser.parseBatch(messages)

expect(results).toHaveLength(0)
expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
expect(KafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
'session_recordings_blob_ingestion',
'invalid_gzip_data'
)
Expand All @@ -153,7 +152,7 @@ describe('KafkaMessageParser', () => {
const results = await parser.parseBatch(messages)

expect(results).toHaveLength(0)
expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
expect(KafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
'session_recordings_blob_ingestion',
'invalid_json'
)
Expand All @@ -174,7 +173,7 @@ describe('KafkaMessageParser', () => {
const results = await parser.parseBatch(messages)

expect(results).toHaveLength(0)
expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
expect(KafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
'session_recordings_blob_ingestion',
'received_non_snapshot_message'
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ const GZIP_HEADER = Uint8Array.from([0x1f, 0x8b, 0x08, 0x00])
const decompressWithGzip = promisify(gunzip)

export class KafkaMessageParser {
constructor(private readonly metrics: KafkaMetrics) {}

public async parseBatch(messages: Message[]): Promise<ParsedMessageData[]> {
const parsedMessages = await Promise.all(messages.map((message) => this.parseMessage(message)))
return parsedMessages.filter((msg) => msg !== null) as ParsedMessageData[]
}

private async parseMessage(message: Message): Promise<ParsedMessageData | null> {
const dropMessage = (reason: string, extra?: Record<string, any>) => {
this.metrics.incrementMessageDropped('session_recordings_blob_ingestion', reason)
KafkaMetrics.incrementMessageDropped('session_recordings_blob_ingestion', reason)

status.warn('⚠️', 'invalid_message', {
reason,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,7 @@
import { eventDroppedCounter } from '../../metrics'

export class KafkaMetrics {
private static instance: KafkaMetrics

public constructor() {}

public static getInstance(): KafkaMetrics {
if (!KafkaMetrics.instance) {
KafkaMetrics.instance = new KafkaMetrics()
}
return KafkaMetrics.instance
}

public incrementMessageDropped(event_type: string, drop_cause: string): void {
public static incrementMessageDropped(event_type: string, drop_cause: string): void {
eventDroppedCounter.labels({ event_type, drop_cause }).inc()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import { KafkaOffsetManager } from './offset-manager'

describe('KafkaOffsetManager', () => {
    const TEST_TOPIC = 'test_topic'
    let manager: KafkaOffsetManager
    let commitFn: jest.Mock<Promise<void>>

    beforeEach(() => {
        // Fresh manager per test, backed by a mock commit callback
        commitFn = jest.fn().mockResolvedValue(undefined)
        manager = new KafkaOffsetManager(commitFn, TEST_TOPIC)
    })

    it('should track offsets when recording messages', async () => {
        manager.trackOffset({ partition: 1, offset: 100 })

        await manager.commit()

        // The committed offset is the next offset to consume (last seen + 1)
        expect(commitFn).toHaveBeenCalledWith([{ topic: TEST_TOPIC, partition: 1, offset: 101 }])
    })

    it('should commit offsets for multiple partitions', async () => {
        const tracked = [
            { partition: 1, offset: 100 },
            { partition: 1, offset: 101 },
            { partition: 2, offset: 200 },
        ]
        tracked.forEach((entry) => manager.trackOffset(entry))

        await manager.commit()

        // One entry per partition, each at its highest tracked offset + 1
        expect(commitFn).toHaveBeenCalledWith([
            { topic: TEST_TOPIC, partition: 1, offset: 102 },
            { topic: TEST_TOPIC, partition: 2, offset: 201 },
        ])
    })

    it('should clear offsets after commit', async () => {
        manager.trackOffset({ partition: 1, offset: 100 })
        await manager.commit()

        // Nothing new was tracked, so the second commit must be a no-op
        await manager.commit()

        expect(commitFn).toHaveBeenCalledTimes(1)
    })

    it('should handle commit failures', async () => {
        const commitError = new Error('Commit failed')
        commitFn.mockRejectedValueOnce(commitError)

        manager.trackOffset({ partition: 1, offset: 100 })

        // A failing commit callback propagates to the caller
        await expect(manager.commit()).rejects.toThrow(commitError)
    })

    describe('partition handling', () => {
        it('should not commit offsets for discarded partitions', async () => {
            manager.trackOffset({ partition: 1, offset: 100 })
            manager.trackOffset({ partition: 2, offset: 200 })

            manager.discardPartition(1)

            await manager.commit()

            // Partition 1 was dropped before the commit, so only partition 2 remains
            expect(commitFn).toHaveBeenCalledWith([
                {
                    topic: 'test_topic',
                    partition: 2,
                    offset: 201,
                },
            ])
        })

        it('should handle discarding already committed partitions', async () => {
            manager.trackOffset({ partition: 1, offset: 100 })
            await manager.commit()

            // Discarding after a commit must not affect subsequent tracking
            manager.discardPartition(1)

            manager.trackOffset({ partition: 1, offset: 101 })
            await manager.commit()

            expect(commitFn).toHaveBeenCalledTimes(2)
            expect(commitFn).toHaveBeenLastCalledWith([
                {
                    topic: 'test_topic',
                    partition: 1,
                    offset: 102,
                },
            ])
        })

        it('should handle discarding non-existent partitions', () => {
            // Must be a silent no-op
            expect(() => manager.discardPartition(999)).not.toThrow()
        })

        it('should maintain highest offset when recording multiple messages', async () => {
            // Offsets arrive out of order; the highest one wins
            const outOfOrder = [100, 99, 101]
            outOfOrder.forEach((offset) => manager.trackOffset({ partition: 1, offset }))

            await manager.commit()

            expect(commitFn).toHaveBeenCalledWith([
                {
                    topic: 'test_topic',
                    partition: 1,
                    offset: 102,
                },
            ])
        })
    })
})
Loading