diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts
index c6db2d56fe1d9..b985ca89c0ffb 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts
@@ -1,5 +1,13 @@
 import { captureException } from '@sentry/node'
-import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka'
+import {
+    CODES,
+    features,
+    KafkaConsumer,
+    librdkafkaVersion,
+    Message,
+    TopicPartition,
+    TopicPartitionOffset,
+} from 'node-rdkafka'

 import { KafkaProducerWrapper } from '~/src/kafka/producer'
 import { PostgresRouter } from '~/src/utils/db/postgres'
@@ -19,20 +27,17 @@ import {
     KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW,
 } from './constants'
 import { KafkaMessageParser } from './kafka/message-parser'
-import { KafkaMetrics } from './kafka/metrics'
 import { KafkaOffsetManager } from './kafka/offset-manager'
-import { SessionRecordingMetrics } from './metrics'
+import { SessionRecordingIngesterMetrics } from './metrics'
 import { PromiseScheduler } from './promise-scheduler'
-import { BlackholeSessionBatchWriter } from './sessions/blackhole-session-batch-writer'
 import { SessionBatchManager } from './sessions/session-batch-manager'
-import { SessionBatchRecorder, SessionBatchRecorderInterface } from './sessions/session-batch-recorder'
+import { SessionBatchRecorder } from './sessions/session-batch-recorder'
 import { TeamFilter } from './teams/team-filter'
 import { TeamService } from './teams/team-service'
 import { MessageWithTeam } from './teams/types'
 import { CaptureIngestionWarningFn } from './types'
 import { getPartitionsForTopic } from './utils'
 import { LibVersionMonitor } from './versions/lib-version-monitor'
-import { VersionMetrics } from './versions/version-metrics'

 // Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
 require('@sentry/tracing')
@@ -45,7 +50,6 @@ export class SessionRecordingIngester {
     isStopping = false

     private isDebugLoggingEnabled: ValueMatcher<number>
-    private readonly metrics: SessionRecordingMetrics
     private readonly promiseScheduler: PromiseScheduler
     private readonly batchConsumerFactory: BatchConsumerFactory
     private readonly sessionBatchManager: SessionBatchManager
@@ -69,31 +73,19 @@ export class SessionRecordingIngester {

         this.promiseScheduler = new PromiseScheduler()

-        this.kafkaParser = new KafkaMessageParser(KafkaMetrics.getInstance())
+        this.kafkaParser = new KafkaMessageParser()
         this.teamFilter = new TeamFilter(new TeamService(postgres))

         if (ingestionWarningProducer) {
             const captureWarning: CaptureIngestionWarningFn = async (teamId, type, details, debounce) => {
                 await captureIngestionWarning(ingestionWarningProducer, teamId, type, details, debounce)
             }
-            this.libVersionMonitor = new LibVersionMonitor(captureWarning, VersionMetrics.getInstance())
+            this.libVersionMonitor = new LibVersionMonitor(captureWarning)
         }

-        this.metrics = SessionRecordingMetrics.getInstance()
-
-        const offsetManager = new KafkaOffsetManager(async (offsets) => {
-            await new Promise<void>((resolve, reject) => {
-                try {
-                    this.batchConsumer!.consumer.commitSync(offsets)
-                    resolve()
-                } catch (error) {
-                    reject(error)
-                }
-            })
-        }, this.topic)
+        const offsetManager = new KafkaOffsetManager(this.commitOffsets.bind(this), this.topic)

         this.sessionBatchManager = new SessionBatchManager({
             maxBatchSizeBytes: (config.SESSION_RECORDING_MAX_BATCH_SIZE_KB ?? 0) * 1024,
             maxBatchAgeMs: config.SESSION_RECORDING_MAX_BATCH_AGE_MS ?? 1000,
-            createBatch: () => new SessionBatchRecorder(new BlackholeSessionBatchWriter()),
             offsetManager,
         })
@@ -128,15 +120,14 @@ export class SessionRecordingIngester {
     }

     private async processBatchMessages(messages: Message[], context: { heartbeat: () => void }): Promise<void> {
-        // Increment message received counter for each message
         messages.forEach((message) => {
-            this.metrics.incrementMessageReceived(message.partition)
+            SessionRecordingIngesterMetrics.incrementMessageReceived(message.partition)
         })

         const batchSize = messages.length
         const batchSizeKb = messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024
-        this.metrics.observeKafkaBatchSize(batchSize)
-        this.metrics.observeKafkaBatchSizeKb(batchSizeKb)
+        SessionRecordingIngesterMetrics.observeKafkaBatchSize(batchSize)
+        SessionRecordingIngesterMetrics.observeKafkaBatchSizeKb(batchSizeKb)

         const processedMessages = await runInstrumentedFunction({
             statsKey: `recordingingesterv2.handleEachBatch.parseBatch`,
@@ -176,10 +167,10 @@ export class SessionRecordingIngester {
         })
     }

-    private consume(message: MessageWithTeam, batch: SessionBatchRecorderInterface) {
+    private consume(message: MessageWithTeam, batch: SessionBatchRecorder) {
         // we have to reset this counter once we're consuming messages since then we know we're not re-balancing
         // otherwise the consumer continues to report however many sessions were revoked at the last re-balance forever
-        this.metrics.resetSessionsRevoked()
+        SessionRecordingIngesterMetrics.resetSessionsRevoked()
         const { team, message: parsedMessage } = message
         const debugEnabled = this.isDebugLoggingEnabled(parsedMessage.metadata.partition)

@@ -203,7 +194,7 @@ export class SessionRecordingIngester {
             })
         }

-        this.metrics.observeSessionInfo(parsedMessage.metadata.rawSize)
+        SessionRecordingIngesterMetrics.observeSessionInfo(parsedMessage.metadata.rawSize)

         batch.record(message)
     }
@@ -307,7 +298,18 @@ export class SessionRecordingIngester {
             return
         }

-        this.metrics.resetSessionsHandled()
+        SessionRecordingIngesterMetrics.resetSessionsHandled()
         await this.sessionBatchManager.discardPartitions(revokedPartitions)
     }
+
+    private async commitOffsets(offsets: TopicPartitionOffset[]): Promise<void> {
+        await new Promise<void>((resolve, reject) => {
+            try {
+                this.batchConsumer!.consumer.commitSync(offsets)
+                resolve()
+            } catch (error) {
+                reject(error)
+            }
+        })
+    }
 }
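// Note on the commitOffsets extraction above: the inline committer passed to
// KafkaOffsetManager is now a named method, but the pattern is unchanged — wrap
// node-rdkafka's synchronous, throwing commitSync() in a promise so callers can
// await it. A minimal standalone sketch of that pattern (the helper name and
// wiring are illustrative, not part of this PR):
import { KafkaConsumer, TopicPartitionOffset } from 'node-rdkafka'

const makeCommitter =
    (consumer: KafkaConsumer) =>
    (offsets: TopicPartitionOffset[]): Promise<void> =>
        new Promise<void>((resolve, reject) => {
            try {
                // commitSync throws on failure rather than returning an error
                consumer.commitSync(offsets)
                resolve()
            } catch (error) {
                reject(error)
            }
        })

// Usage mirrors the constructor wiring: new KafkaOffsetManager(makeCommitter(consumer), topic)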
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/message-parser.test.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.test.ts
similarity index 86%
rename from plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/message-parser.test.ts
rename to plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.test.ts
index 0c1fc3bed8516..3f0dbf3bc2437 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/message-parser.test.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.test.ts
@@ -2,20 +2,19 @@ import { promisify } from 'node:util'
 import { Message } from 'node-rdkafka'
 import { gzip } from 'zlib'

-import { KafkaMessageParser } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/message-parser'
-import { KafkaMetrics } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/metrics'
+import { KafkaMessageParser } from './message-parser'
+import { KafkaMetrics } from './metrics'

 const compressWithGzip = promisify(gzip)

+jest.mock('./metrics')
+
 describe('KafkaMessageParser', () => {
     let parser: KafkaMessageParser
-    let mockKafkaMetrics: jest.Mocked<KafkaMetrics>

     beforeEach(() => {
-        mockKafkaMetrics = {
-            incrementMessageDropped: jest.fn(),
-        } as jest.Mocked<KafkaMetrics>
-        parser = new KafkaMessageParser(mockKafkaMetrics)
+        jest.clearAllMocks()
+        parser = new KafkaMessageParser()
     })

     const createMessage = (data: any, overrides: Partial<Message> = {}): Message => ({
@@ -71,7 +70,7 @@ describe('KafkaMessageParser', () => {
             },
             snapshot_source: undefined,
         })
-        expect(mockKafkaMetrics.incrementMessageDropped).not.toHaveBeenCalled()
+        expect(KafkaMetrics.incrementMessageDropped).not.toHaveBeenCalled()
     })

     it('handles gzipped message', async () => {
@@ -108,7 +107,7 @@ describe('KafkaMessageParser', () => {
                 end: 1234567891,
             },
         })
-        expect(mockKafkaMetrics.incrementMessageDropped).not.toHaveBeenCalled()
+        expect(KafkaMetrics.incrementMessageDropped).not.toHaveBeenCalled()
     })

     it('filters out message with missing value', async () => {
@@ -117,7 +116,7 @@ describe('KafkaMessageParser', () => {
         const results = await parser.parseBatch(messages)

         expect(results).toHaveLength(0)
-        expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
+        expect(KafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
             'session_recordings_blob_ingestion',
             'message_value_or_timestamp_is_empty'
         )
@@ -129,7 +128,7 @@ describe('KafkaMessageParser', () => {
         const results = await parser.parseBatch(messages)

         expect(results).toHaveLength(0)
-        expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
+        expect(KafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
             'session_recordings_blob_ingestion',
             'message_value_or_timestamp_is_empty'
         )
@@ -141,7 +140,7 @@ describe('KafkaMessageParser', () => {
         const results = await parser.parseBatch(messages)

         expect(results).toHaveLength(0)
-        expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
+        expect(KafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
             'session_recordings_blob_ingestion',
             'invalid_gzip_data'
         )
@@ -153,7 +152,7 @@ describe('KafkaMessageParser', () => {
         const results = await parser.parseBatch(messages)

         expect(results).toHaveLength(0)
-        expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
+        expect(KafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
             'session_recordings_blob_ingestion',
             'invalid_json'
         )
@@ -174,7 +173,7 @@ describe('KafkaMessageParser', () => {
         const results = await parser.parseBatch(messages)

         expect(results).toHaveLength(0)
-        expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
+        expect(KafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith(
             'session_recordings_blob_ingestion',
             'received_non_snapshot_message'
         )
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.ts
index 25a29bb89c316..7b0ec43814dd1 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.ts
@@ -11,8 +11,6 @@ const GZIP_HEADER = Uint8Array.from([0x1f, 0x8b, 0x08, 0x00])
 const decompressWithGzip = promisify(gunzip)

 export class KafkaMessageParser {
-    constructor(private readonly metrics: KafkaMetrics) {}
-
     public async parseBatch(messages: Message[]): Promise<ParsedMessageData[]> {
         const parsedMessages = await Promise.all(messages.map((message) => this.parseMessage(message)))
         return parsedMessages.filter((msg) => msg !== null) as ParsedMessageData[]
@@ -20,7 +18,7 @@ export class KafkaMessageParser {
     private async parseMessage(message: Message): Promise<ParsedMessageData | null> {
         const dropMessage = (reason: string, extra?: Record<string, any>) => {
-            this.metrics.incrementMessageDropped('session_recordings_blob_ingestion', reason)
+            KafkaMetrics.incrementMessageDropped('session_recordings_blob_ingestion', reason)

             status.warn('⚠️', 'invalid_message', {
                 reason,
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/metrics.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/metrics.ts
index 3ba6dc04c5cbc..c304e32d0bb21 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/metrics.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/metrics.ts
@@ -1,18 +1,7 @@
 import { eventDroppedCounter } from '../../metrics'

 export class KafkaMetrics {
-    private static instance: KafkaMetrics
-
-    public constructor() {}
-
-    public static getInstance(): KafkaMetrics {
-        if (!KafkaMetrics.instance) {
-            KafkaMetrics.instance = new KafkaMetrics()
-        }
-        return KafkaMetrics.instance
-    }
-
-    public incrementMessageDropped(event_type: string, drop_cause: string): void {
+    public static incrementMessageDropped(event_type: string, drop_cause: string): void {
         eventDroppedCounter.labels({ event_type, drop_cause }).inc()
     }
 }
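// The KafkaMetrics rewrite above is the template applied to every metrics class in
// this PR: the getInstance() singleton becomes a class holding only static members,
// so call sites use KafkaMetrics.incrementMessageDropped(...) directly and tests can
// replace the whole module with jest.mock('./metrics'). A condensed sketch of the
// before/after shapes (counter names here are illustrative):
import { Counter } from 'prom-client'

// Before: instance state reached through a lazily created singleton
class InstanceMetrics {
    private static instance: InstanceMetrics
    private readonly dropped = new Counter({ name: 'example_dropped_total', help: 'example' })

    public static getInstance(): InstanceMetrics {
        if (!InstanceMetrics.instance) {
            InstanceMetrics.instance = new InstanceMetrics()
        }
        return InstanceMetrics.instance
    }

    public increment(): void {
        this.dropped.inc()
    }
}

// After: nothing to construct or inject; prom-client already keeps the metric state global
class StaticMetrics {
    private static readonly dropped = new Counter({ name: 'example_dropped_v2_total', help: 'example' })

    public static increment(): void {
        this.dropped.inc()
    }
}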
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/offset-manager.test.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/offset-manager.test.ts
new file mode 100644
index 0000000000000..76b8f376df061
--- /dev/null
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/offset-manager.test.ts
@@ -0,0 +1,124 @@
+import { KafkaOffsetManager } from './offset-manager'
+
+describe('KafkaOffsetManager', () => {
+    let offsetManager: KafkaOffsetManager
+    let mockCommitOffsets: jest.Mock<Promise<void>>
+    const TEST_TOPIC = 'test_topic'
+
+    beforeEach(() => {
+        mockCommitOffsets = jest.fn().mockResolvedValue(undefined)
+        offsetManager = new KafkaOffsetManager(mockCommitOffsets, TEST_TOPIC)
+    })
+
+    it('should track offsets when recording messages', async () => {
+        offsetManager.trackOffset({ partition: 1, offset: 100 })
+
+        await offsetManager.commit()
+
+        expect(mockCommitOffsets).toHaveBeenCalledWith([{ topic: TEST_TOPIC, partition: 1, offset: 101 }])
+    })
+
+    it('should commit offsets for multiple partitions', async () => {
+        const messages = [
+            { partition: 1, offset: 100 },
+            { partition: 1, offset: 101 },
+            { partition: 2, offset: 200 },
+        ]
+
+        for (const metadata of messages) {
+            offsetManager.trackOffset(metadata)
+        }
+
+        await offsetManager.commit()
+
+        expect(mockCommitOffsets).toHaveBeenCalledWith([
+            { topic: TEST_TOPIC, partition: 1, offset: 102 }, // Last offset + 1
+            { topic: TEST_TOPIC, partition: 2, offset: 201 }, // Last offset + 1
+        ])
+    })
+
+    it('should clear offsets after commit', async () => {
+        offsetManager.trackOffset({ partition: 1, offset: 100 })
+        await offsetManager.commit()
+
+        // Second commit should not commit anything
+        await offsetManager.commit()
+
+        expect(mockCommitOffsets).toHaveBeenCalledTimes(1)
+    })
+
+    it('should handle commit failures', async () => {
+        const error = new Error('Commit failed')
+        mockCommitOffsets.mockRejectedValueOnce(error)
+
+        offsetManager.trackOffset({ partition: 1, offset: 100 })
+
+        await expect(offsetManager.commit()).rejects.toThrow(error)
+    })
+
+    describe('partition handling', () => {
+        it('should not commit offsets for discarded partitions', async () => {
+            // Record messages for two partitions
+            offsetManager.trackOffset({ partition: 1, offset: 100 })
+            offsetManager.trackOffset({ partition: 2, offset: 200 })
+
+            // Discard partition 1
+            offsetManager.discardPartition(1)
+
+            await offsetManager.commit()
+
+            // Should only commit offset for partition 2
+            expect(mockCommitOffsets).toHaveBeenCalledWith([
+                {
+                    topic: 'test_topic',
+                    partition: 2,
+                    offset: 201,
+                },
+            ])
+        })
+
+        it('should handle discarding already committed partitions', async () => {
+            // Record and commit a message
+            offsetManager.trackOffset({ partition: 1, offset: 100 })
+            await offsetManager.commit()
+
+            // Discard the partition after commit
+            offsetManager.discardPartition(1)
+
+            // Record new message for same partition
+            offsetManager.trackOffset({ partition: 1, offset: 101 })
+            await offsetManager.commit()
+
+            expect(mockCommitOffsets).toHaveBeenCalledTimes(2)
+            expect(mockCommitOffsets).toHaveBeenLastCalledWith([
+                {
+                    topic: 'test_topic',
+                    partition: 1,
+                    offset: 102,
+                },
+            ])
+        })
+
+        it('should handle discarding non-existent partitions', () => {
+            offsetManager.discardPartition(999)
+            // No error should be thrown
+        })
+
+        it('should maintain highest offset when recording multiple messages', async () => {
+            // Record messages in non-sequential order
+            offsetManager.trackOffset({ partition: 1, offset: 100 })
+            offsetManager.trackOffset({ partition: 1, offset: 99 })
+            offsetManager.trackOffset({ partition: 1, offset: 101 })
+
+            await offsetManager.commit()
+
+            expect(mockCommitOffsets).toHaveBeenCalledWith([
+                {
+                    topic: 'test_topic',
+                    partition: 1,
+                    offset: 102,
+                },
+            ])
+        })
+    })
+})
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/offset-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/offset-manager.ts
index a43c09e90c3fe..1f25dc529b3c8 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/offset-manager.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/offset-manager.ts
@@ -1,50 +1,14 @@
 import { TopicPartitionOffset } from 'node-rdkafka'

-import { SessionBatchRecorderInterface } from '../sessions/session-batch-recorder'
-import { MessageWithTeam } from '../teams/types'
-
-interface PartitionOffset {
-    partition: number
-    offset: number
-}
+import { PartitionOffset } from '../types'

 type CommitOffsetsCallback = (offsets: TopicPartitionOffset[]) => Promise<void>

-class OffsetTrackingSessionBatchRecorderWrapper implements SessionBatchRecorderInterface {
-    constructor(
-        private readonly recorder: SessionBatchRecorderInterface,
-        private readonly offsetManager: KafkaOffsetManager
-    ) {}
-
-    public record(message: MessageWithTeam): number {
-        const bytesWritten = this.recorder.record(message)
-        this.offsetManager.trackOffset(message.message.metadata)
-        return bytesWritten
-    }
-
-    public async flush(): Promise<void> {
-        await this.recorder.flush()
-    }
-
-    public discardPartition(partition: number): void {
-        this.recorder.discardPartition(partition)
-        this.offsetManager.discardPartition(partition)
-    }
-
-    public get size(): number {
-        return this.recorder.size
-    }
-}
-
 export class KafkaOffsetManager {
     private partitionOffsets: Map<number, number> = new Map()

     constructor(private readonly commitOffsets: CommitOffsetsCallback, private readonly topic: string) {}

-    public wrapBatch(recorder: SessionBatchRecorderInterface): SessionBatchRecorderInterface {
-        return new OffsetTrackingSessionBatchRecorderWrapper(recorder, this)
-    }
-
     public trackOffset({ partition, offset }: PartitionOffset): void {
         // We track the next offset to process
         this.partitionOffsets.set(partition, offset + 1)
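// commit() and discardPartition() are unchanged by this PR, so the diff above does
// not show their bodies. From trackOffset() (which stores offset + 1, the next
// offset to process) and the new tests, their behavior is roughly the following —
// a sketch inferred from the tests, not the actual implementation:
import { TopicPartitionOffset } from 'node-rdkafka'

class OffsetManagerSketch {
    private partitionOffsets: Map<number, number> = new Map()

    constructor(
        private readonly commitOffsets: (offsets: TopicPartitionOffset[]) => Promise<void>,
        private readonly topic: string
    ) {}

    public trackOffset({ partition, offset }: { partition: number; offset: number }): void {
        this.partitionOffsets.set(partition, offset + 1) // next offset to process
    }

    public discardPartition(partition: number): void {
        // Dropping the entry is what makes "discarded partitions are not committed" hold
        this.partitionOffsets.delete(partition)
    }

    public async commit(): Promise<void> {
        const offsets = Array.from(this.partitionOffsets.entries()).map(([partition, offset]) => ({
            topic: this.topic,
            partition,
            offset,
        }))
        if (offsets.length === 0) {
            return // a second commit with nothing tracked is a no-op in the tests
        }
        await this.commitOffsets(offsets)
        this.partitionOffsets.clear() // cleared only after a successful commit
    }
}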
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/metrics.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/metrics.ts
index b1d670d8ca8bf..9c331fd2eeea6 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/metrics.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/metrics.ts
@@ -2,101 +2,62 @@ import { Counter, Gauge, Histogram, Summary } from 'prom-client'

 const BUCKETS_KB_WRITTEN = [0, 128, 512, 1024, 5120, 10240, 20480, 51200, 102400, 204800, Infinity]

-export class SessionRecordingMetrics {
-    private static instance: SessionRecordingMetrics
+export class SessionRecordingIngesterMetrics {
+    private static readonly sessionsHandled = new Gauge({
+        name: 'recording_blob_ingestion_v2_session_manager_count',
+        help: 'A gauge of the number of sessions being handled by this blob ingestion consumer',
+    })

-    private readonly sessionsHandled: Gauge
-    private readonly sessionsRevoked: Gauge
-    private readonly kafkaBatchSize: Histogram
-    private readonly kafkaBatchSizeKb: Histogram
-    private readonly sessionInfo: Summary
-    private readonly messageReceived: Counter
+    private static readonly sessionsRevoked = new Gauge({
+        name: 'recording_blob_ingestion_v2_sessions_revoked',
+        help: 'A gauge of the number of sessions being revoked when partitions are revoked when a re-balance occurs',
+    })

-    private constructor() {
-        this.sessionsHandled = new Gauge({
-            name: 'recording_blob_ingestion_v2_session_manager_count',
-            help: 'A gauge of the number of sessions being handled by this blob ingestion consumer',
-        })
+    private static readonly kafkaBatchSize = new Histogram({
+        name: 'recording_blob_ingestion_v2_kafka_batch_size',
+        help: 'The size of the batches we are receiving from Kafka',
+        buckets: [0, 1, 5, 10, 25, 50, 100, 150, 200, 250, 300, 350, 400, 500, 750, 1000, 1500, 2000, 3000, Infinity],
+    })

-        this.sessionsRevoked = new Gauge({
-            name: 'recording_blob_ingestion_v2_sessions_revoked',
-            help: 'A gauge of the number of sessions being revoked when partitions are revoked when a re-balance occurs',
-        })
+    private static readonly kafkaBatchSizeKb = new Histogram({
+        name: 'recording_blob_ingestion_v2_kafka_batch_size_kb',
+        help: 'The size in kb of the batches we are receiving from Kafka',
+        buckets: BUCKETS_KB_WRITTEN,
+    })

-        this.kafkaBatchSize = new Histogram({
-            name: 'recording_blob_ingestion_v2_kafka_batch_size',
-            help: 'The size of the batches we are receiving from Kafka',
-            buckets: [
-                0,
-                1,
-                5,
-                10,
-                25,
-                50,
-                100,
-                150,
-                200,
-                250,
-                300,
-                350,
-                400,
-                500,
-                750,
-                1000,
-                1500,
-                2000,
-                3000,
-                Infinity,
-            ],
-        })
+    private static readonly sessionInfo = new Summary({
+        name: 'recording_blob_ingestion_v2_session_info_bytes',
+        help: 'Size of aggregated session information being processed',
+        percentiles: [0.1, 0.25, 0.5, 0.9, 0.99],
+    })

-        this.kafkaBatchSizeKb = new Histogram({
-            name: 'recording_blob_ingestion_v2_kafka_batch_size_kb',
-            help: 'The size in kb of the batches we are receiving from Kafka',
-            buckets: BUCKETS_KB_WRITTEN,
-        })
+    private static readonly messageReceived = new Counter({
+        name: 'recording_blob_ingestion_v2_kafka_message_received',
+        help: 'The number of messages we have received from Kafka',
+        labelNames: ['partition'],
+    })

-        this.sessionInfo = new Summary({
-            name: 'recording_blob_ingestion_v2_session_info_bytes',
-            help: 'Size of aggregated session information being processed',
-            percentiles: [0.1, 0.25, 0.5, 0.9, 0.99],
-        })
-
-        this.messageReceived = new Counter({
-            name: 'recording_blob_ingestion_v2_kafka_message_received',
-            help: 'The number of messages we have received from Kafka',
-            labelNames: ['partition'],
-        })
+    public static incrementMessageReceived(partition: number): void {
+        this.messageReceived.labels(partition.toString()).inc()
     }

-    public static getInstance(): SessionRecordingMetrics {
-        if (!SessionRecordingMetrics.instance) {
-            SessionRecordingMetrics.instance = new SessionRecordingMetrics()
-        }
-        return SessionRecordingMetrics.instance
+    public static resetSessionsRevoked(): void {
+        this.sessionsRevoked.set(0)
     }

-    public resetSessionsRevoked(): void {
-        this.sessionsRevoked.reset()
+    public static resetSessionsHandled(): void {
+        this.sessionsHandled.set(0)
     }

-    public resetSessionsHandled(): void {
-        this.sessionsHandled.reset()
+    public static observeSessionInfo(rawSize: number): void {
+        this.sessionInfo.observe(rawSize)
     }

-    public observeKafkaBatchSize(size: number): void {
+    public static observeKafkaBatchSize(size: number): void {
         this.kafkaBatchSize.observe(size)
     }

-    public observeKafkaBatchSizeKb(sizeKb: number): void {
+    public static observeKafkaBatchSizeKb(sizeKb: number): void {
         this.kafkaBatchSizeKb.observe(sizeKb)
     }
-
-    public observeSessionInfo(bytes: number): void {
-        this.sessionInfo.observe(bytes)
-    }
-
-    public incrementMessageReceived(partition: number): void {
-        this.messageReceived.inc({ partition })
-    }
 }
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/recorder.test.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/recorder.test.ts
similarity index 97%
rename from plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/recorder.test.ts
rename to plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/recorder.test.ts
index 0bc39a656f746..66fcddfed42a1 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/recorder.test.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/recorder.test.ts
@@ -1,7 +1,7 @@
 import { PassThrough } from 'stream'

-import { ParsedMessageData } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/types'
-import { SessionRecorder } from '../../../../../src/main/ingestion-queues/session-recording-v2/sessions/recorder'
+import { ParsedMessageData } from '../kafka/types'
+import { SessionRecorder } from './recorder'

 // RRWeb event type constants
 const enum EventType {
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.test.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.test.ts
similarity index 65%
rename from plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.test.ts
rename to plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.test.ts
index aa5be4bfb273d..ccd3e2aec750f 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.test.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.test.ts
@@ -1,49 +1,50 @@
-import { KafkaOffsetManager } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/offset-manager'
-import { SessionBatchManager } from '../../../../../src/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager'
-import { SessionBatchRecorderInterface } from '../../../../../src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder'
+import { KafkaOffsetManager } from '../kafka/offset-manager'
+import { SessionBatchManager } from './session-batch-manager'
+import { SessionBatchRecorder } from './session-batch-recorder'

 jest.setTimeout(1000)
-
-const createMockBatch = (): jest.Mocked<SessionBatchRecorderInterface> => {
-    return {
-        record: jest.fn(),
-        flush: jest.fn().mockResolvedValue(undefined),
-        get size() {
-            return 0
-        },
-        discardPartition: jest.fn(),
-    } as unknown as jest.Mocked<SessionBatchRecorderInterface>
-}
+jest.mock('./session-batch-recorder')

 describe('SessionBatchManager', () => {
     let manager: SessionBatchManager
     let executionOrder: number[]
-    let createBatchMock: jest.Mock
-    let currentBatch: jest.Mocked<SessionBatchRecorderInterface>
+    let currentBatch: jest.Mocked<SessionBatchRecorder>
     let mockOffsetManager: jest.Mocked<KafkaOffsetManager>

+    const createMockBatch = (): jest.Mocked<SessionBatchRecorder> =>
+        ({
+            record: jest.fn(),
+            flush: jest.fn().mockResolvedValue(undefined),
+            get size() {
+                return 0
+            },
+            discardPartition: jest.fn(),
+        } as unknown as jest.Mocked<SessionBatchRecorder>)
+
     beforeEach(() => {
-        currentBatch = createMockBatch()
-        createBatchMock = jest.fn().mockImplementation(() => {
+        jest.mocked(SessionBatchRecorder).mockImplementation(() => {
             currentBatch = createMockBatch()
             return currentBatch
         })

         mockOffsetManager = {
-            wrapBatch: jest.fn().mockImplementation((batch) => batch),
             commit: jest.fn().mockResolvedValue(undefined),
             trackOffset: jest.fn(),
+            discardPartition: jest.fn(),
         } as unknown as jest.Mocked<KafkaOffsetManager>

         manager = new SessionBatchManager({
             maxBatchSizeBytes: 100,
             maxBatchAgeMs: 1000,
-            createBatch: createBatchMock,
             offsetManager: mockOffsetManager,
         })

         executionOrder = []
     })

+    afterEach(() => {
+        jest.clearAllMocks()
+    })
+
     const waitForNextTick = () => new Promise((resolve) => process.nextTick(resolve))

     const waitFor = async (condition: () => boolean) => {
@@ -77,7 +78,6 @@ describe('SessionBatchManager', () => {

         await Promise.all([promise1, promise2, promise3])

-        // Should execute in order despite different delays
         expect(executionOrder).toEqual([1, 2, 3, 4, 5, 6])
     })

@@ -107,7 +107,6 @@ describe('SessionBatchManager', () => {
         const results: number[] = []
         const promises: Promise<void>[] = []

-        // Queue up 10 immediate callbacks
         for (let i = 0; i < 10; i++) {
             promises.push(
                 manager.withBatch(async () => {
@@ -119,7 +118,6 @@ describe('SessionBatchManager', () => {

         await Promise.all(promises)

-        // Should execute in order 0-9
         expect(results).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
     })

@@ -150,7 +148,7 @@ describe('SessionBatchManager', () => {
     })

     it('should create new batch on flush', async () => {
-        let firstBatch: SessionBatchRecorderInterface | null = null
+        let firstBatch: SessionBatchRecorder | null = null

         await manager.withBatch(async (batch) => {
             firstBatch = batch
@@ -165,14 +163,24 @@ describe('SessionBatchManager', () => {
         })
     })

-    it('should create new batch and commit offsets on flush', async () => {
-        const firstBatch = currentBatch
+    it('should create new batch with correct params on flush', async () => {
+        let firstBatch: SessionBatchRecorder | null = null
+        await manager.withBatch(async (batch) => {
+            firstBatch = batch
+            expect(batch).toBeDefined()
+            return Promise.resolve()
+        })

         await manager.flush()

-        expect(firstBatch.flush).toHaveBeenCalled()
-        expect(mockOffsetManager.commit).toHaveBeenCalled()
-        expect(createBatchMock).toHaveBeenCalledTimes(2)
+        expect(firstBatch!.flush).toHaveBeenCalled()
+        expect(SessionBatchRecorder).toHaveBeenCalledWith(mockOffsetManager)
+
+        await manager.withBatch(async (batch) => {
+            expect(batch).not.toBe(firstBatch)
+            expect(batch.size).toBe(0)
+            return Promise.resolve()
+        })
     })

     describe('size-based flushing', () => {
@@ -214,32 +222,45 @@ describe('SessionBatchManager', () => {
         })

         it('should not indicate flush needed immediately after flushing', async () => {
-            const firstBatch = currentBatch
-            jest.spyOn(firstBatch, 'size', 'get').mockReturnValue(50)
+            let firstBatch: SessionBatchRecorder | null = null
+            const promise1 = manager.withBatch(async (batch) => {
+                firstBatch = batch
+                jest.spyOn(batch, 'size', 'get').mockReturnValue(50)
+                return Promise.resolve()
+            })

             // First flush due to timeout
             jest.advanceTimersByTime(1500)
+            await promise1
             expect(manager.shouldFlush()).toBe(true)
+
             const firstFlushPromise = manager.flush()
             jest.runAllTimers()
             await firstFlushPromise
-            expect(firstBatch.flush).toHaveBeenCalled()
+            expect(firstBatch!.flush).toHaveBeenCalled()

-            expect(manager.shouldFlush()).toBe(false)
+            const promise2 = manager.withBatch(async (batch) => {
+                expect(batch).not.toBe(firstBatch)
+                expect(manager.shouldFlush()).toBe(false)
+                return Promise.resolve()
+            })
+            jest.runAllTimers()
+            await promise2
         })
     })

     it('should execute callbacks sequentially including flushes', async () => {
-        const firstBatch = currentBatch
-
-        const promise1 = manager.withBatch(async () => {
+        let firstBatch: SessionBatchRecorder | null = null
+        const promise1 = await manager.withBatch(async (batch) => {
+            firstBatch = batch
             executionOrder.push(1)
             return Promise.resolve()
         })

         const flushPromise = manager.flush()
-        const promise2 = manager.withBatch(async () => {
+        const promise2 = await manager.withBatch(async (batch) => {
+            expect(batch).not.toBe(firstBatch)
             executionOrder.push(2)
             return Promise.resolve()
         })
@@ -247,45 +268,56 @@ describe('SessionBatchManager', () => {

         await Promise.all([promise1, flushPromise, promise2])

         expect(executionOrder).toEqual([1, 2])
-        expect(firstBatch.flush).toHaveBeenCalled()
-        expect(mockOffsetManager.commit).toHaveBeenCalled()
+        expect(firstBatch!.flush).toHaveBeenCalled()
     })

     describe('partition handling', () => {
         it('should discard partitions on new batch after flush', async () => {
-            const firstBatch = currentBatch
+            let firstBatch: SessionBatchRecorder | null = null
+            let secondBatch: SessionBatchRecorder | null = null
+
+            await manager.withBatch(async (batch) => {
+                firstBatch = batch
+                await Promise.resolve()
+            })

-            // Flush to create a new batch
             await manager.flush()
-            const secondBatch = currentBatch

-            // Verify we have a new batch
-            expect(secondBatch).not.toBe(firstBatch)
+            await manager.withBatch(async (batch) => {
+                secondBatch = batch
+                expect(batch).not.toBe(firstBatch)
+                await Promise.resolve()
+            })

-            // Discard partitions
             await manager.discardPartitions([1, 2])

-            // Verify discards happened on the new batch only
-            expect(firstBatch.discardPartition).not.toHaveBeenCalled()
-            expect(secondBatch.discardPartition).toHaveBeenCalledWith(1)
-            expect(secondBatch.discardPartition).toHaveBeenCalledWith(2)
+            expect(firstBatch!.discardPartition).not.toHaveBeenCalled()
+            expect(secondBatch!.discardPartition).toHaveBeenCalledWith(1)
+            expect(secondBatch!.discardPartition).toHaveBeenCalledWith(2)
         })

         it('should discard multiple partitions on current batch', async () => {
+            let currentBatch: SessionBatchRecorder | null = null
+            await manager.withBatch(async (batch) => {
+                currentBatch = batch
+                await Promise.resolve()
+            })
+
             await manager.discardPartitions([1, 2])
-            expect(currentBatch.discardPartition).toHaveBeenCalledWith(1)
-            expect(currentBatch.discardPartition).toHaveBeenCalledWith(2)
-            expect(currentBatch.discardPartition).toHaveBeenCalledTimes(2)
+            expect(currentBatch!.discardPartition).toHaveBeenCalledWith(1)
+            expect(currentBatch!.discardPartition).toHaveBeenCalledWith(2)
+            expect(currentBatch!.discardPartition).toHaveBeenCalledTimes(2)
         })

         it('should maintain operation order when discarding partitions', async () => {
             const executionOrder: number[] = []
+            let currentBatch: SessionBatchRecorder | null = null

             // Start a long-running batch operation
-            const batchPromise = manager.withBatch(async () => {
+            const batchPromise = manager.withBatch(async (batch) => {
+                currentBatch = batch
                 await new Promise((resolve) => setTimeout(resolve, 100))
                 executionOrder.push(1)
-                return Promise.resolve()
             })

             // Queue up a partition discard
@@ -298,12 +330,18 @@ describe('SessionBatchManager', () => {

             // Verify operations happened in the correct order
             expect(executionOrder).toEqual([1, 2])
-            expect(currentBatch.discardPartition).toHaveBeenCalledWith(1)
+            expect(currentBatch!.discardPartition).toHaveBeenCalledWith(1)
         })

         it('should handle empty partition array', async () => {
+            let currentBatch: SessionBatchRecorder | null = null
+            await manager.withBatch(async (batch) => {
+                currentBatch = batch
+                await Promise.resolve()
+            })
+
             await manager.discardPartitions([])
-            expect(currentBatch.discardPartition).not.toHaveBeenCalled()
+            expect(currentBatch!.discardPartition).not.toHaveBeenCalled()
         })
     })
 })
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.ts
index 8aa7687b6999b..d8d7614f14924 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.ts
@@ -1,34 +1,31 @@
 import { KafkaOffsetManager } from '../kafka/offset-manager'
 import { PromiseQueue } from './promise-queue'
-import { SessionBatchRecorderInterface } from './session-batch-recorder'
+import { SessionBatchRecorder } from './session-batch-recorder'

 export interface SessionBatchManagerConfig {
     maxBatchSizeBytes: number
     maxBatchAgeMs: number
-    createBatch: () => SessionBatchRecorderInterface
     offsetManager: KafkaOffsetManager
 }

 export class SessionBatchManager {
-    private currentBatch: SessionBatchRecorderInterface
+    private currentBatch: SessionBatchRecorder
     private queue: PromiseQueue
     private readonly maxBatchSizeBytes: number
     private readonly maxBatchAgeMs: number
-    private readonly createBatch: () => SessionBatchRecorderInterface
     private readonly offsetManager: KafkaOffsetManager
     private lastFlushTime: number

     constructor(config: SessionBatchManagerConfig) {
         this.maxBatchSizeBytes = config.maxBatchSizeBytes
         this.maxBatchAgeMs = config.maxBatchAgeMs
-        this.createBatch = config.createBatch
         this.offsetManager = config.offsetManager
-        this.currentBatch = this.offsetManager.wrapBatch(this.createBatch())
+        this.currentBatch = new SessionBatchRecorder(this.offsetManager)
         this.queue = new PromiseQueue()
         this.lastFlushTime = Date.now()
     }

-    public async withBatch(callback: (batch: SessionBatchRecorderInterface) => Promise<void>): Promise<void> {
+    public async withBatch(callback: (batch: SessionBatchRecorder) => Promise<void>): Promise<void> {
         return this.queue.add(() => callback(this.currentBatch))
     }

@@ -55,8 +52,7 @@ export class SessionBatchManager {

     private async rotateBatch(): Promise<void> {
         await this.currentBatch.flush()
-        await this.offsetManager.commit()
-        this.currentBatch = this.offsetManager.wrapBatch(this.createBatch())
+        this.currentBatch = new SessionBatchRecorder(this.offsetManager)
         this.lastFlushTime = Date.now()
     }
 }
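// How a caller is expected to drive the reworked SessionBatchManager: every batch
// access goes through withBatch(), which serializes callbacks on the internal
// PromiseQueue, and flush() rotates in a fresh SessionBatchRecorder. A usage sketch
// (the handleBatch wrapper is illustrative, not code from this PR):
import { SessionBatchManager } from './session-batch-manager'
import { MessageWithTeam } from '../teams/types'

async function handleBatch(manager: SessionBatchManager, messages: MessageWithTeam[]): Promise<void> {
    await manager.withBatch(async (batch) => {
        for (const message of messages) {
            batch.record(message) // also tracks the Kafka offset via the offset manager
        }
    })
    // Size- or age-based flush; flush() awaits the current recorder and starts a new batch
    if (manager.shouldFlush()) {
        await manager.flush()
    }
}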
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.test.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.test.ts
similarity index 82%
rename from plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.test.ts
rename to plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.test.ts
index 3e62e9b840ae9..2a202a5b6a182 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.test.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.test.ts
@@ -1,12 +1,10 @@
 import { PassThrough } from 'stream'

-import { SessionBatchMetrics } from '../../../../../src/main/ingestion-queues/session-recording-v2/sessions/metrics'
-import {
-    SessionBatchRecorder,
-    SessionBatchRecorderInterface,
-    SessionBatchWriter,
-} from '../../../../../src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder'
-import { MessageWithTeam } from '../../../../../src/main/ingestion-queues/session-recording-v2/teams/types'
+import { KafkaOffsetManager } from '../kafka/offset-manager'
+import { MessageWithTeam } from '../teams/types'
+import { BlackholeSessionBatchWriter } from './blackhole-session-batch-writer'
+import { SessionBatchMetrics } from './metrics'
+import { SessionBatchRecorder } from './session-batch-recorder'

 // RRWeb event type constants
 const enum EventType {
@@ -32,8 +30,9 @@ interface MessageMetadata {
     rawSize?: number
 }

-// Add to the top of the file, after other mocks
-jest.mock('../../../../../src/main/ingestion-queues/session-recording-v2/sessions/metrics', () => ({
+jest.setTimeout(1000)
+
+jest.mock('./metrics', () => ({
     SessionBatchMetrics: {
         incrementBatchesFlushed: jest.fn(),
         incrementSessionsFlushed: jest.fn(),
@@ -42,30 +41,41 @@ interface MessageMetadata {
     },
 }))

+jest.mock('./blackhole-session-batch-writer')
+
 describe('SessionBatchRecorder', () => {
-    let recorder: SessionBatchRecorderInterface
-    let mockWriter: jest.Mocked<SessionBatchWriter>
+    let recorder: SessionBatchRecorder
+    let mockWriter: jest.Mocked<BlackholeSessionBatchWriter>
+    let mockOffsetManager: jest.Mocked<KafkaOffsetManager>
     let mockStream: PassThrough
-    let mockFinish: () => Promise<void>
+    let mockOpen: jest.Mock
+    let mockFinish: jest.Mock
+
+    const createOpenMock = () => {
+        const stream = new PassThrough()
+        const finishMock = jest.fn().mockResolvedValue(undefined)
+        const openMock = jest.fn().mockResolvedValue({ stream, finish: finishMock })
+        return { openMock, finishMock, stream }
+    }

     beforeEach(() => {
-        mockStream = new PassThrough()
-        mockFinish = jest.fn().mockResolvedValue(undefined)
+        const openMock = createOpenMock()
+        mockOpen = openMock.openMock
+        mockFinish = openMock.finishMock
+        mockStream = openMock.stream
         mockWriter = {
-            open: jest.fn().mockImplementation(() =>
-                Promise.resolve({
-                    stream: mockStream,
-                    finish: mockFinish,
-                })
-            ),
-        }
-        recorder = new SessionBatchRecorder(mockWriter)
-
-        // Reset metrics mocks
-        jest.mocked(SessionBatchMetrics.incrementBatchesFlushed).mockClear()
-        jest.mocked(SessionBatchMetrics.incrementSessionsFlushed).mockClear()
-        jest.mocked(SessionBatchMetrics.incrementEventsFlushed).mockClear()
-        jest.mocked(SessionBatchMetrics.incrementBytesWritten).mockClear()
+            open: mockOpen,
+        } as unknown as jest.Mocked<BlackholeSessionBatchWriter>
+
+        jest.mocked(BlackholeSessionBatchWriter).mockImplementation(() => mockWriter)
+
+        mockOffsetManager = {
+            trackOffset: jest.fn(),
+            discardPartition: jest.fn(),
+            commit: jest.fn(),
+        } as unknown as jest.Mocked<KafkaOffsetManager>
+
+        recorder = new SessionBatchRecorder(mockOffsetManager)
     })

     const createMessage = (
@@ -121,7 +131,7 @@ describe('SessionBatchRecorder', () => {
     }

     describe('recording and writing', () => {
-        it('should process and flush a single session', async () => {
+        it('should process and flush a single session and track offsets', async () => {
             const message = createMessage('session1', [
                 {
                     type: EventType.FullSnapshot,
                     timestamp: 1000,
                     data: { source: 1 },
                 },
             ])
             recorder.record(message)
+            expect(mockOffsetManager.trackOffset).toHaveBeenCalledWith({
+                partition: message.message.metadata.partition,
+                offset: message.message.metadata.offset,
+            })
+
             const outputPromise = captureOutput(mockStream)
             await recorder.flush()

-            expect(mockWriter.open).toHaveBeenCalled()
-            expect(mockFinish).toHaveBeenCalled()
+            expect(mockOpen).toHaveBeenCalledTimes(1)
+            expect(mockFinish).toHaveBeenCalledTimes(1)

             const output = await outputPromise
             const lines = parseLines(output)
@@ -161,12 +176,20 @@ describe('SessionBatchRecorder', () => {
                 ]),
             ]

-            messages.forEach((message) => recorder.record(message))
+            messages.forEach((message) => {
+                recorder.record(message)
+                expect(mockOffsetManager.trackOffset).toHaveBeenCalledWith({
+                    partition: message.message.metadata.partition,
+                    offset: message.message.metadata.offset,
+                })
+            })
+
+            expect(mockOffsetManager.trackOffset).toHaveBeenCalledTimes(2)
+
             const outputPromise = captureOutput(mockStream)
             await recorder.flush()

-            expect(mockWriter.open).toHaveBeenCalled()
-            expect(mockFinish).toHaveBeenCalled()
+            expect(mockOpen).toHaveBeenCalledTimes(1)
+            expect(mockFinish).toHaveBeenCalledTimes(1)

             const output = await outputPromise
             const lines = parseLines(output)
@@ -199,8 +222,8 @@ describe('SessionBatchRecorder', () => {
             const outputPromise = captureOutput(mockStream)
             await recorder.flush()

-            expect(mockWriter.open).toHaveBeenCalled()
-            expect(mockFinish).toHaveBeenCalled()
+            expect(mockOpen).toHaveBeenCalledTimes(1)
+            expect(mockFinish).toHaveBeenCalledTimes(1)

             const output = await outputPromise
             const lines = parseLines(output)
@@ -218,8 +241,8 @@ describe('SessionBatchRecorder', () => {
             const outputPromise = captureOutput(mockStream)
             await recorder.flush()

-            expect(mockWriter.open).toHaveBeenCalled()
-            expect(mockFinish).toHaveBeenCalled()
+            expect(mockOpen).toHaveBeenCalledTimes(1)
+            expect(mockFinish).toHaveBeenCalledTimes(1)

             const output = await outputPromise
             expect(output).toBe('')
@@ -262,8 +285,8 @@ describe('SessionBatchRecorder', () => {
             const outputPromise = captureOutput(mockStream)
             await recorder.flush()

-            expect(mockWriter.open).toHaveBeenCalled()
-            expect(mockFinish).toHaveBeenCalled()
+            expect(mockOpen).toHaveBeenCalledTimes(1)
+            expect(mockFinish).toHaveBeenCalledTimes(1)

             const output = await outputPromise
             const lines = parseLines(output)
@@ -283,14 +306,8 @@ describe('SessionBatchRecorder', () => {

     describe('flushing behavior', () => {
         it('should clear sessions after flush', async () => {
-            const stream1 = new PassThrough()
-            const stream2 = new PassThrough()
-            const finish1 = jest.fn().mockResolvedValue(undefined)
-            const finish2 = jest.fn().mockResolvedValue(undefined)
-
-            mockWriter.open
-                .mockResolvedValueOnce({ stream: stream1, finish: finish1 })
-                .mockResolvedValueOnce({ stream: stream2, finish: finish2 })
+            const { openMock: firstOpen, finishMock: firstFinish, stream: firstStream } = createOpenMock()
+            mockWriter.open = firstOpen

             const message1 = createMessage('session1', [
                 {
                     type: EventType.FullSnapshot,
                     timestamp: 1000,
                     data: { source: 1 },
                 },
             ])
             const message2 = createMessage('session2', [
                 {
                     type: EventType.IncrementalSnapshot,
                     timestamp: 2000,
                     data: { source: 2 },
                 },
             ])

             recorder.record(message1)
-            const outputPromise1 = captureOutput(stream1)
+            const outputPromise1 = captureOutput(firstStream)
             await recorder.flush()

-            expect(mockWriter.open).toHaveBeenCalledTimes(1)
-            expect(finish1).toHaveBeenCalledTimes(1)
-            expect(finish2).not.toHaveBeenCalled()
+            expect(firstOpen).toHaveBeenCalledTimes(1)
             const output1 = await outputPromise1
+            expect(firstFinish).toHaveBeenCalledTimes(1)
+
+            const { openMock: secondOpen, finishMock: secondFinish, stream: secondStream } = createOpenMock()
+            mockWriter.open = secondOpen

-            // Record another message after flush
             recorder.record(message2)
-            const outputPromise2 = captureOutput(stream2)
+            const outputPromise2 = captureOutput(secondStream)
             await recorder.flush()

-            expect(mockWriter.open).toHaveBeenCalledTimes(2)
-            expect(finish1).toHaveBeenCalledTimes(1)
-            expect(finish2).toHaveBeenCalledTimes(1)
+            expect(secondOpen).toHaveBeenCalledTimes(1)
+            expect(firstFinish).toHaveBeenCalledTimes(1)
+            expect(secondFinish).toHaveBeenCalledTimes(1)
             const output2 = await outputPromise2

-            // Each output should only contain the events from its own batch
             const lines1 = parseLines(output1)
             const lines2 = parseLines(output2)
             expect(lines1).toEqual([['window1', message1.message.eventsByWindowId.window1[0]]])
         })

         it('should not output anything on second flush if no new events', async () => {
-            const stream1 = new PassThrough()
-            const stream2 = new PassThrough()
-            const finish1 = jest.fn().mockResolvedValue(undefined)
-            const finish2 = jest.fn().mockResolvedValue(undefined)
-
-            mockWriter.open
-                .mockResolvedValueOnce({ stream: stream1, finish: finish1 })
-                .mockResolvedValueOnce({ stream: stream2, finish: finish2 })
+            const { openMock: firstOpen, finishMock: firstFinish } = createOpenMock()
+            mockWriter.open = firstOpen

             const message = createMessage('session1', [
                 {
                     type: EventType.FullSnapshot,
                     timestamp: 1000,
                     data: { source: 1 },
                 },
             ])

             recorder.record(message)
             await recorder.flush()
-            expect(mockWriter.open).toHaveBeenCalledTimes(1)
-            expect(finish1).toHaveBeenCalledTimes(1)
-            expect(finish2).not.toHaveBeenCalled()

-            const outputPromise = captureOutput(stream2)
+            expect(firstOpen).toHaveBeenCalledTimes(1)
+            expect(firstFinish).toHaveBeenCalledTimes(1)
+
+            const { openMock: secondOpen, finishMock: secondFinish, stream: secondStream } = createOpenMock()
+            mockWriter.open = secondOpen
+
+            const outputPromise = captureOutput(secondStream)
             await recorder.flush()

             const output = await outputPromise
             expect(output).toBe('')
-            expect(mockWriter.open).toHaveBeenCalledTimes(2)
-            expect(finish1).toHaveBeenCalledTimes(1)
-            expect(finish2).toHaveBeenCalledTimes(1)
+            expect(secondOpen).toHaveBeenCalledTimes(1)
+            expect(firstFinish).toHaveBeenCalledTimes(1)
+            expect(secondFinish).toHaveBeenCalledTimes(1)
         })
     })

@@ -407,7 +421,7 @@ describe('SessionBatchRecorder', () => {
                 ['window1', messages[0].message.eventsByWindowId.window1[0]],
                 ['window1', messages[1].message.eventsByWindowId.window1[0]],
             ])
-            expect(mockWriter.open).toHaveBeenCalledTimes(1)
+            expect(mockOpen).toHaveBeenCalledTimes(1)
             expect(mockFinish).toHaveBeenCalledTimes(1)
         })

@@ -438,7 +452,7 @@ describe('SessionBatchRecorder', () => {
             ]

             messages.forEach((message) => recorder.record(message))
-            recorder.discardPartition(1) // Discard partition 1
+            recorder.discardPartition(1)

             const outputPromise = captureOutput(mockStream)
             await recorder.flush()
@@ -481,9 +495,11 @@ describe('SessionBatchRecorder', () => {
             expect(recorder.size).toBe(totalSize)

             recorder.discardPartition(1)
+            expect(mockOffsetManager.discardPartition).toHaveBeenCalledWith(1)
             expect(recorder.size).toBe(size2)

             recorder.discardPartition(2)
+            expect(mockOffsetManager.discardPartition).toHaveBeenCalledWith(2)
             expect(recorder.size).toBe(0)
         })

@@ -497,9 +513,12 @@ describe('SessionBatchRecorder', () => {
             ])

             const bytesWritten = recorder.record(message)
+            expect(mockOffsetManager.trackOffset).toHaveBeenCalledWith({
+                partition: message.message.metadata.partition,
+                offset: message.message.metadata.offset,
+            })
             expect(recorder.size).toBe(bytesWritten)

-            // Should not throw or change size
             recorder.discardPartition(999)
             expect(recorder.size).toBe(bytesWritten)
         })
@@ -616,7 +635,6 @@ describe('SessionBatchRecorder', () => {
                 ]),
             ]

-            // First flush
             messages.forEach((message) => recorder.record(message))
             await recorder.flush()

@@ -626,7 +644,6 @@ describe('SessionBatchRecorder', () => {
             expect(SessionBatchMetrics.incrementSessionsFlushed).toHaveBeenLastCalledWith(2) // Two sessions
             expect(SessionBatchMetrics.incrementEventsFlushed).toHaveBeenLastCalledWith(2) // Two events

-            // Second flush without new messages
             await recorder.flush()

             expect(SessionBatchMetrics.incrementBatchesFlushed).toHaveBeenCalledTimes(2)
@@ -635,7 +652,6 @@ describe('SessionBatchRecorder', () => {
             expect(SessionBatchMetrics.incrementSessionsFlushed).toHaveBeenLastCalledWith(0) // No sessions
             expect(SessionBatchMetrics.incrementEventsFlushed).toHaveBeenLastCalledWith(0) // No events

-            // Add new message and flush again
             recorder.record(
                 createMessage('session3', [
                     {
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.ts
index e44c722921382..282ad4e27a267 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.ts
@@ -1,6 +1,8 @@
 import { Writable } from 'stream'

+import { KafkaOffsetManager } from '../kafka/offset-manager'
 import { MessageWithTeam } from '../teams/types'
+import { BlackholeSessionBatchWriter } from './blackhole-session-batch-writer'
 import { SessionBatchMetrics } from './metrics'
 import { SessionRecorder } from './recorder'

@@ -13,19 +15,15 @@ export interface SessionBatchWriter {
     open(): Promise<{ stream: Writable; finish: () => Promise<void> }>
 }

-export interface SessionBatchRecorderInterface {
-    record(message: MessageWithTeam): number
-    flush(): Promise<void>
-    discardPartition(partition: number): void
-    readonly size: number
-}
-
-export class SessionBatchRecorder implements SessionBatchRecorderInterface {
+export class SessionBatchRecorder {
     private readonly partitionSessions = new Map<number, Map<string, SessionRecorder>>()
     private readonly partitionSizes = new Map<number, number>()
     private _size: number = 0
+    private readonly writer: BlackholeSessionBatchWriter

-    constructor(private readonly writer: SessionBatchWriter) {}
+    constructor(private readonly offsetManager: KafkaOffsetManager) {
+        this.writer = new BlackholeSessionBatchWriter()
+    }

     public record(message: MessageWithTeam): number {
         const { partition } = message.message.metadata
@@ -49,6 +47,11 @@ export class SessionBatchRecorder {
         this.partitionSizes.set(partition, currentPartitionSize + bytesWritten)
         this._size += bytesWritten

+        this.offsetManager.trackOffset({
+            partition: message.message.metadata.partition,
+            offset: message.message.metadata.offset,
+        })
+
         return bytesWritten
     }

@@ -58,6 +61,7 @@ export class SessionBatchRecorder {
             this._size -= partitionSize
             this.partitionSizes.delete(partition)
             this.partitionSessions.delete(partition)
+            this.offsetManager.discardPartition(partition)
         }
     }

@@ -80,6 +84,7 @@ export class SessionBatchRecorder {

         stream.end()
         await finish()
+        await this.offsetManager.commit()

         // Update metrics
         SessionBatchMetrics.incrementBatchesFlushed()
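// Ordering guarantee introduced above: flush() ends the writer stream, awaits the
// writer's finish(), and only then commits Kafka offsets, so offsets are never
// committed for a batch the writer has not confirmed. A condensed view of that
// sequence (assuming the { stream, finish } shape the writer's open() resolves to):
import { Writable } from 'stream'

async function flushOrdering(
    open: () => Promise<{ stream: Writable; finish: () => Promise<void> }>,
    writeSessions: (stream: Writable) => void,
    commitOffsets: () => Promise<void>
): Promise<void> {
    const { stream, finish } = await open() // 1. open the batch destination
    writeSessions(stream) // 2. write each session's recorded events
    stream.end() // 3. close the stream
    await finish() // 4. wait for the writer to confirm the batch
    await commitOffsets() // 5. only now commit offsets back to Kafka
}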
constructor(private readonly writer: SessionBatchWriter) {} + constructor(private readonly offsetManager: KafkaOffsetManager) { + this.writer = new BlackholeSessionBatchWriter() + } public record(message: MessageWithTeam): number { const { partition } = message.message.metadata @@ -49,6 +47,11 @@ export class SessionBatchRecorder implements SessionBatchRecorderInterface { this.partitionSizes.set(partition, currentPartitionSize + bytesWritten) this._size += bytesWritten + this.offsetManager.trackOffset({ + partition: message.message.metadata.partition, + offset: message.message.metadata.offset, + }) + return bytesWritten } @@ -58,6 +61,7 @@ export class SessionBatchRecorder implements SessionBatchRecorderInterface { this._size -= partitionSize this.partitionSizes.delete(partition) this.partitionSessions.delete(partition) + this.offsetManager.discardPartition(partition) } } @@ -80,6 +84,7 @@ export class SessionBatchRecorder implements SessionBatchRecorderInterface { stream.end() await finish() + await this.offsetManager.commit() // Update metrics SessionBatchMetrics.incrementBatchesFlushed() diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/teams/team-filter.test.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/teams/team-filter.test.ts similarity index 89% rename from plugin-server/tests/main/ingestion-queues/session-recording-v2/teams/team-filter.test.ts rename to plugin-server/src/main/ingestion-queues/session-recording-v2/teams/team-filter.test.ts index 91e9c7f76eacf..b122a562fa6a4 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording-v2/teams/team-filter.test.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/teams/team-filter.test.ts @@ -1,9 +1,9 @@ -import { ParsedMessageData } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/types' -import { TeamFilter } from '../../../../../src/main/ingestion-queues/session-recording-v2/teams/team-filter' -import { TeamService } from '../../../../../src/main/ingestion-queues/session-recording-v2/teams/team-service' -import { Team } from '../../../../../src/main/ingestion-queues/session-recording-v2/teams/types' +import { ParsedMessageData } from '../kafka/types' +import { TeamFilter } from './team-filter' +import { TeamService } from './team-service' +import { Team } from './types' -jest.mock('../../../../../src/main/ingestion-queues/session-recording-v2/teams/team-service') +jest.mock('./team-service') const validTeam: Team = { teamId: 1, @@ -34,7 +34,7 @@ describe('TeamFilter', () => { jest.clearAllMocks() mockTeamService = { getTeamByToken: jest.fn(), - } as jest.Mocked + } as unknown as jest.Mocked teamFilter = new TeamFilter(mockTeamService) }) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/teams/team-service.test.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/teams/team-service.test.ts similarity index 94% rename from plugin-server/tests/main/ingestion-queues/session-recording-v2/teams/team-service.test.ts rename to plugin-server/src/main/ingestion-queues/session-recording-v2/teams/team-service.test.ts index 0efaa630f6ec0..b1d65e8ecaff6 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording-v2/teams/team-service.test.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/teams/team-service.test.ts @@ -1,8 +1,8 @@ -import { TeamService } from '../../../../../src/main/ingestion-queues/session-recording-v2/teams/team-service' -import { PostgresRouter } from 
'../../../../../src/utils/db/postgres' -import { fetchTeamTokensWithRecordings } from '../../../../../src/worker/ingestion/team-manager' +import { PostgresRouter } from '../../../../utils/db/postgres' +import { fetchTeamTokensWithRecordings } from '../../../../worker/ingestion/team-manager' +import { TeamService } from './team-service' -jest.mock('../../../../../src/worker/ingestion/team-manager') +jest.mock('~/src/worker/ingestion/team-manager') const mockFetchTeamTokens = fetchTeamTokensWithRecordings as jest.MockedFunction describe('TeamService', () => { diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/types.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/types.ts index 8820daf28fd97..7894539e92207 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording-v2/types.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/types.ts @@ -9,6 +9,11 @@ export type PersistedRecordingMessage = { export type EachBatchHandler = (messages: Message[], context: { heartbeat: () => void }) => Promise +export interface PartitionOffset { + partition: number + offset: number +} + export type CaptureIngestionWarningFn = ( teamId: number, type: string, diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor.test.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor.test.ts similarity index 69% rename from plugin-server/tests/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor.test.ts rename to plugin-server/src/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor.test.ts index b77de2187d775..194b464ad91bc 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor.test.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor.test.ts @@ -1,18 +1,17 @@ -import { MessageWithTeam } from '../../../../../src/main/ingestion-queues/session-recording-v2/teams/types' -import { LibVersionMonitor } from '../../../../../src/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor' -import { VersionMetrics } from '../../../../../src/main/ingestion-queues/session-recording-v2/versions/version-metrics' +import { MessageWithTeam } from '../teams/types' +import { LibVersionMonitor } from './lib-version-monitor' +import { VersionMetrics } from './version-metrics' + +jest.mock('./version-metrics') describe('LibVersionMonitor', () => { let monitor: LibVersionMonitor let mockCaptureWarning: jest.Mock - let mockVersionMetrics: jest.Mocked beforeEach(() => { + jest.clearAllMocks() mockCaptureWarning = jest.fn() - mockVersionMetrics = { - incrementLibVersionWarning: jest.fn(), - } as unknown as jest.Mocked - monitor = new LibVersionMonitor(mockCaptureWarning, mockVersionMetrics) + monitor = new LibVersionMonitor(mockCaptureWarning) }) const createMessage = (headers: any[] = []): MessageWithTeam => ({ @@ -38,7 +37,7 @@ describe('LibVersionMonitor', () => { const result = await monitor.processBatch([message]) expect(result).toEqual([message]) - expect(mockVersionMetrics.incrementLibVersionWarning).toHaveBeenCalled() + expect(VersionMetrics.incrementLibVersionWarning).toHaveBeenCalled() expect(mockCaptureWarning).toHaveBeenCalledWith( 1, 'replay_lib_version_too_old', @@ -55,7 +54,7 @@ describe('LibVersionMonitor', () => { const result = await monitor.processBatch([message]) expect(result).toEqual([message]) - 
expect(mockVersionMetrics.incrementLibVersionWarning).not.toHaveBeenCalled() + expect(VersionMetrics.incrementLibVersionWarning).not.toHaveBeenCalled() expect(mockCaptureWarning).not.toHaveBeenCalled() }) @@ -64,7 +63,7 @@ describe('LibVersionMonitor', () => { const result = await monitor.processBatch([message]) expect(result).toEqual([message]) - expect(mockVersionMetrics.incrementLibVersionWarning).not.toHaveBeenCalled() + expect(VersionMetrics.incrementLibVersionWarning).not.toHaveBeenCalled() expect(mockCaptureWarning).not.toHaveBeenCalled() }) @@ -73,7 +72,7 @@ describe('LibVersionMonitor', () => { const result = await monitor.processBatch([message]) expect(result).toEqual([message]) - expect(mockVersionMetrics.incrementLibVersionWarning).not.toHaveBeenCalled() + expect(VersionMetrics.incrementLibVersionWarning).not.toHaveBeenCalled() expect(mockCaptureWarning).not.toHaveBeenCalled() }) }) diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor.ts index 348590c8010b1..3029eb196e37c 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor.ts @@ -6,7 +6,7 @@ import { CaptureIngestionWarningFn } from '../types' import { VersionMetrics } from './version-metrics' export class LibVersionMonitor { - constructor(private readonly captureWarning: CaptureIngestionWarningFn, private readonly metrics: VersionMetrics) {} + constructor(private readonly captureWarning: CaptureIngestionWarningFn) {} public async processBatch(messages: MessageWithTeam[]): Promise { await Promise.all(messages.map((message) => this.checkLibVersion(message))) @@ -18,7 +18,7 @@ export class LibVersionMonitor { const parsedVersion = this.parseVersion(libVersion) if (parsedVersion && parsedVersion.major === 1 && parsedVersion.minor < 75) { - this.metrics.incrementLibVersionWarning() + VersionMetrics.incrementLibVersionWarning() await this.captureWarning( message.team.teamId, diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/versions/version-metrics.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/versions/version-metrics.ts index 370768b824fab..886555c600d91 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording-v2/versions/version-metrics.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/versions/version-metrics.ts @@ -1,24 +1,12 @@ import { Counter } from 'prom-client' export class VersionMetrics { - private static instance: VersionMetrics - private readonly libVersionWarningCounter: Counter + private static readonly libVersionWarningCounter = new Counter({ + name: 'recording_blob_ingestion_v2_lib_version_warning_counter', + help: 'the number of times we have seen a message with a lib version that is too old, each _might_ cause an ingestion warning if not debounced', + }) - public constructor() { - this.libVersionWarningCounter = new Counter({ - name: 'recording_blob_ingestion_v2_lib_version_warning_counter', - help: 'the number of times we have seen a message with a lib version that is too old, each _might_ cause an ingestion warning if not debounced', - }) - } - - public static getInstance(): VersionMetrics { - if (!VersionMetrics.instance) { - VersionMetrics.instance = new VersionMetrics() - } - return VersionMetrics.instance - } - - public 
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/versions/version-metrics.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/versions/version-metrics.ts
index 370768b824fab..886555c600d91 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/versions/version-metrics.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/versions/version-metrics.ts
@@ -1,24 +1,12 @@
 import { Counter } from 'prom-client'
 
 export class VersionMetrics {
-    private static instance: VersionMetrics
-    private readonly libVersionWarningCounter: Counter
+    private static readonly libVersionWarningCounter = new Counter({
+        name: 'recording_blob_ingestion_v2_lib_version_warning_counter',
+        help: 'the number of times we have seen a message with a lib version that is too old, each _might_ cause an ingestion warning if not debounced',
+    })
 
-    public constructor() {
-        this.libVersionWarningCounter = new Counter({
-            name: 'recording_blob_ingestion_v2_lib_version_warning_counter',
-            help: 'the number of times we have seen a message with a lib version that is too old, each _might_ cause an ingestion warning if not debounced',
-        })
-    }
-
-    public static getInstance(): VersionMetrics {
-        if (!VersionMetrics.instance) {
-            VersionMetrics.instance = new VersionMetrics()
-        }
-        return VersionMetrics.instance
-    }
-
-    public incrementLibVersionWarning(): void {
+    public static incrementLibVersionWarning(): void {
         this.libVersionWarningCounter.inc(1)
     }
 }
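This is the same refactor applied across the PR's metrics classes: the `getInstance()` singleton becomes a class with only static members. Because prom-client adds a metric to its global registry when the `Counter` is constructed, a static field already guarantees one counter per process. A minimal sketch of the pattern, using a hypothetical counter name rather than one from this diff:

```ts
import { Counter } from 'prom-client'

// Minimal sketch of the static-metrics pattern. The metric name here is
// hypothetical; prom-client registers the Counter globally on construction,
// so the static field yields exactly one counter per process.
export class ExampleMetrics {
    private static readonly messagesReceived = new Counter({
        name: 'example_messages_received_total',
        help: 'Total messages received, labelled by partition',
        labelNames: ['partition'],
    })

    public static incrementMessageReceived(partition: number): void {
        this.messagesReceived.inc({ partition: String(partition) }, 1)
    }
}
```

The trade-off is testability: instead of injecting a mock instance, tests now `jest.mock()` the module, as the lib-version-monitor test changes above show.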
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/offset-manager.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/offset-manager.test.ts
deleted file mode 100644
index f199990535c61..0000000000000
--- a/plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/offset-manager.test.ts
+++ /dev/null
@@ -1,198 +0,0 @@
-import { KafkaOffsetManager } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/offset-manager'
-import { SessionBatchRecorder } from '../../../../../src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder'
-import { MessageWithTeam } from '../../../../../src/main/ingestion-queues/session-recording-v2/teams/types'
-
-describe('KafkaOffsetManager', () => {
-    let offsetManager: KafkaOffsetManager
-    let mockCommitOffsets: jest.Mock<Promise<void>>
-    let mockRecorder: jest.Mocked<SessionBatchRecorder>
-    const TEST_TOPIC = 'test_topic'
-
-    beforeEach(() => {
-        mockCommitOffsets = jest.fn().mockResolvedValue(undefined)
-        mockRecorder = {
-            record: jest.fn().mockReturnValue(100),
-            flush: jest.fn().mockResolvedValue(undefined),
-            size: 0,
-            discardPartition: jest.fn(),
-        } as unknown as jest.Mocked<SessionBatchRecorder>
-
-        offsetManager = new KafkaOffsetManager(mockCommitOffsets, TEST_TOPIC)
-    })
-
-    const createMessage = (metadata: { partition: number; offset: number }): MessageWithTeam => ({
-        team: {
-            teamId: 1,
-            consoleLogIngestionEnabled: false,
-        },
-        message: {
-            distinct_id: 'distinct_id',
-            session_id: 'session1',
-            eventsByWindowId: { window1: [] },
-            eventsRange: { start: 0, end: 0 },
-            metadata: {
-                partition: metadata.partition,
-                offset: metadata.offset,
-                topic: 'test_topic',
-                timestamp: 0,
-                rawSize: 0,
-            },
-        },
-    })
-
-    it('should track offsets when recording messages', async () => {
-        const wrapper = offsetManager.wrapBatch(mockRecorder)
-        const message: MessageWithTeam = {
-            team: { teamId: 1, consoleLogIngestionEnabled: false },
-            message: {
-                metadata: { partition: 1, offset: 100 },
-            },
-        } as MessageWithTeam
-
-        wrapper.record(message)
-
-        await wrapper.flush()
-        await offsetManager.commit()
-
-        expect(mockCommitOffsets).toHaveBeenCalledWith([{ topic: TEST_TOPIC, partition: 1, offset: 101 }])
-    })
-
-    it('should commit offsets for multiple partitions', async () => {
-        const wrapper = offsetManager.wrapBatch(mockRecorder)
-        const messages = [
-            { partition: 1, offset: 100 },
-            { partition: 1, offset: 101 },
-            { partition: 2, offset: 200 },
-        ]
-
-        for (const metadata of messages) {
-            wrapper.record({
-                team: { teamId: 1, consoleLogIngestionEnabled: false },
-                message: { metadata },
-            } as MessageWithTeam)
-        }
-
-        await wrapper.flush()
-        await offsetManager.commit()
-
-        expect(mockCommitOffsets).toHaveBeenCalledWith([
-            { topic: TEST_TOPIC, partition: 1, offset: 102 }, // Last offset + 1
-            { topic: TEST_TOPIC, partition: 2, offset: 201 }, // Last offset + 1
-        ])
-    })
-
-    it('should clear offsets after commit', async () => {
-        const wrapper = offsetManager.wrapBatch(mockRecorder)
-        const message: MessageWithTeam = {
-            team: { teamId: 1, consoleLogIngestionEnabled: false },
-            message: {
-                metadata: { partition: 1, offset: 100 },
-            },
-        } as MessageWithTeam
-
-        wrapper.record(message)
-        await wrapper.flush()
-        await offsetManager.commit()
-
-        // Second commit should not commit anything
-        await offsetManager.commit()
-
-        expect(mockCommitOffsets).toHaveBeenCalledTimes(1)
-    })
-
-    it('should handle commit failures', async () => {
-        const error = new Error('Commit failed')
-        mockCommitOffsets.mockRejectedValueOnce(error)
-
-        const wrapper = offsetManager.wrapBatch(mockRecorder)
-        wrapper.record({
-            team: { teamId: 1, consoleLogIngestionEnabled: false },
-            message: {
-                metadata: { partition: 1, offset: 100 },
-            },
-        } as MessageWithTeam)
-
-        await wrapper.flush()
-        await expect(offsetManager.commit()).rejects.toThrow(error)
-    })
-
-    describe('partition handling', () => {
-        it('should delegate discardPartition to inner recorder', () => {
-            const wrappedBatch = offsetManager.wrapBatch(mockRecorder)
-            wrappedBatch.discardPartition(1)
-
-            expect(mockRecorder.discardPartition).toHaveBeenCalledWith(1)
-        })
-
-        it('should not commit offsets for discarded partitions', async () => {
-            const wrappedBatch = offsetManager.wrapBatch(mockRecorder)
-
-            // Record messages for two partitions
-            wrappedBatch.record(createMessage({ partition: 1, offset: 100 }))
-            wrappedBatch.record(createMessage({ partition: 2, offset: 200 }))
-
-            // Discard partition 1
-            wrappedBatch.discardPartition(1)
-
-            await offsetManager.commit()
-
-            // Should only commit offset for partition 2
-            expect(mockCommitOffsets).toHaveBeenCalledWith([
-                {
-                    topic: 'test_topic',
-                    partition: 2,
-                    offset: 201,
-                },
-            ])
-        })
-
-        it('should handle discarding already committed partitions', async () => {
-            const wrappedBatch = offsetManager.wrapBatch(mockRecorder)
-
-            // Record and commit a message
-            wrappedBatch.record(createMessage({ partition: 1, offset: 100 }))
-            await offsetManager.commit()
-
-            // Discard the partition after commit
-            wrappedBatch.discardPartition(1)
-
-            // Record new message for same partition
-            wrappedBatch.record(createMessage({ partition: 1, offset: 101 }))
-            await offsetManager.commit()
-
-            expect(mockCommitOffsets).toHaveBeenCalledTimes(2)
-            expect(mockCommitOffsets).toHaveBeenLastCalledWith([
-                {
-                    topic: 'test_topic',
-                    partition: 1,
-                    offset: 102,
-                },
-            ])
-        })
-
-        it('should handle discarding non-existent partitions', () => {
-            const wrappedBatch = offsetManager.wrapBatch(mockRecorder)
-            wrappedBatch.discardPartition(999)
-            expect(mockRecorder.discardPartition).toHaveBeenCalledWith(999)
-        })
-
-        it('should maintain highest offset when recording multiple messages', async () => {
-            const wrappedBatch = offsetManager.wrapBatch(mockRecorder)
-
-            // Record messages in non-sequential order
-            wrappedBatch.record(createMessage({ partition: 1, offset: 100 }))
-            wrappedBatch.record(createMessage({ partition: 1, offset: 99 }))
-            wrappedBatch.record(createMessage({ partition: 1, offset: 101 }))
-
-            await offsetManager.commit()
-
-            expect(mockCommitOffsets).toHaveBeenCalledWith([
-                {
-                    topic: 'test_topic',
-                    partition: 1,
-                    offset: 102,
-                },
-            ])
-        })
-    })
-})
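Even deleted, the suite above remains the clearest description of `KafkaOffsetManager`'s contract: remember only the highest offset seen per partition, commit highest + 1 (the next offset to read), drop discarded partitions, propagate commit failures, and commit nothing when no new offsets were recorded. A condensed, illustrative sketch of that contract (not the real implementation in `kafka/offset-manager.ts`, which this diff does not show):

```ts
import { TopicPartitionOffset } from 'node-rdkafka'

type CommitOffsetsFn = (offsets: TopicPartitionOffset[]) => Promise<void>

// Illustrative sketch of the behaviour the deleted tests pin down.
class OffsetTrackerSketch {
    private readonly highestByPartition = new Map<number, number>()

    constructor(private readonly commitOffsets: CommitOffsetsFn, private readonly topic: string) {}

    public trackOffset(partition: number, offset: number): void {
        // Keep only the highest offset per partition, even when messages
        // arrive out of order (100, 99, 101 -> 101).
        const current = this.highestByPartition.get(partition)
        if (current === undefined || offset > current) {
            this.highestByPartition.set(partition, offset)
        }
    }

    public discardPartition(partition: number): void {
        // A revoked/discarded partition must not be committed.
        this.highestByPartition.delete(partition)
    }

    public async commit(): Promise<void> {
        const offsets: TopicPartitionOffset[] = Array.from(
            this.highestByPartition,
            ([partition, offset]) => ({ topic: this.topic, partition, offset: offset + 1 }) // next offset to read
        )
        if (offsets.length === 0) {
            return // nothing recorded since the last commit
        }
        await this.commitOffsets(offsets) // failures propagate to the caller
        this.highestByPartition.clear() // so a repeat commit is a no-op
    }
}
```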