From 736bfd9882c88970fce9418ec32ca6e07f0c865b Mon Sep 17 00:00:00 2001 From: Felipe Lima Date: Mon, 3 Feb 2025 16:28:26 -0300 Subject: [PATCH] feat(events): Add event context --- src/client/eppo-client.ts | 14 +++++++++ src/events/batch-retry-manager.spec.ts | 10 +++--- src/events/batch-retry-manager.ts | 11 ++++--- src/events/default-event-dispatcher.spec.ts | 35 +++++++++++++++++++++ src/events/default-event-dispatcher.ts | 13 ++++++-- src/events/event-delivery.spec.ts | 12 +++---- src/events/event-delivery.ts | 8 +++-- src/events/event-dispatcher.ts | 11 +++++++ src/events/no-op-event-dispatcher.ts | 4 +++ 9 files changed, 97 insertions(+), 21 deletions(-) diff --git a/src/client/eppo-client.ts b/src/client/eppo-client.ts index 8378b852..c8cf94b7 100644 --- a/src/client/eppo-client.ts +++ b/src/client/eppo-client.ts @@ -165,6 +165,20 @@ export default class EppoClient { this.eventDispatcher = eventDispatcher; } + /** + * Attaches a context to be included with all events dispatched by the EventDispatcher. + * The context is delivered as a top-level object in the ingestion request payload. + * An existing key can be removed by providing a `null` value. + * Calling this method with same key multiple times causes only the last value to be used for the + * given key. + * + * @param key - The context entry key. + * @param value - The context entry value. + */ + setContext(key: string, value: string | number | boolean | null) { + this.eventDispatcher?.attachContext(key, value); + } + // noinspection JSUnusedGlobalSymbols setBanditModelConfigurationStore( banditModelConfigurationStore: IConfigurationStore, diff --git a/src/events/batch-retry-manager.spec.ts b/src/events/batch-retry-manager.spec.ts index 8c7947b8..598322e8 100644 --- a/src/events/batch-retry-manager.spec.ts +++ b/src/events/batch-retry-manager.spec.ts @@ -19,15 +19,15 @@ describe('BatchRetryManager', () => { it('should successfully retry and deliver a batch with no failures', async () => { mockDelivery.deliver.mockResolvedValueOnce({ failedEvents: [] }); - const result = await batchRetryManager.retry(mockBatch); + const result = await batchRetryManager.retry(mockBatch, {}); expect(result).toEqual([]); expect(mockDelivery.deliver).toHaveBeenCalledTimes(1); - expect(mockDelivery.deliver).toHaveBeenCalledWith(mockBatch); + expect(mockDelivery.deliver).toHaveBeenCalledWith(mockBatch, {}); }); it('should retry failed deliveries up to maxRetries times and return last failed batch', async () => { mockDelivery.deliver.mockResolvedValue({ failedEvents: [{ id: 'event1' }] }); - const result = await batchRetryManager.retry(mockBatch); + const result = await batchRetryManager.retry(mockBatch, {}); expect(result).toEqual([{ id: 'event1' }]); expect(mockDelivery.deliver).toHaveBeenCalledTimes(maxRetries); }); @@ -40,7 +40,7 @@ describe('BatchRetryManager', () => { jest.useFakeTimers(); - const retryPromise = batchRetryManager.retry(mockBatch); + const retryPromise = batchRetryManager.retry(mockBatch, {}); // 1st retry: 100ms // 2nd retry: 200ms @@ -67,7 +67,7 @@ describe('BatchRetryManager', () => { jest.useFakeTimers(); - const retryPromise = batchRetryManager.retry(mockBatch); + const retryPromise = batchRetryManager.retry(mockBatch, {}); // 100ms + 200ms + 300ms (maxRetryDelayMs) = 600ms await jest.advanceTimersByTimeAsync(600); const result = await retryPromise; diff --git a/src/events/batch-retry-manager.ts b/src/events/batch-retry-manager.ts index f9e56647..ba4c2846 100644 --- a/src/events/batch-retry-manager.ts +++ b/src/events/batch-retry-manager.ts @@ -1,10 +1,11 @@ import { logger } from '../application-logger'; +import { EventContext } from './default-event-dispatcher'; import Event from './event'; import { EventDeliveryResult } from './event-delivery'; -export type IEventDelivery = { - deliver(batch: Event[]): Promise; +export interface IEventDelivery { + deliver(batch: Event[], context: EventContext): Promise; }; /** @@ -27,7 +28,7 @@ export default class BatchRetryManager { ) {} /** Re-attempts delivery of the provided batch, returns the UUIDs of events that failed retry. */ - async retry(batch: Event[], attempt = 0): Promise { + async retry(batch: Event[], context: EventContext, attempt = 0): Promise { const { retryIntervalMs, maxRetryDelayMs, maxRetries } = this.config; const delay = Math.min(retryIntervalMs * Math.pow(2, attempt), maxRetryDelayMs); logger.info( @@ -35,14 +36,14 @@ export default class BatchRetryManager { ); await new Promise((resolve) => setTimeout(resolve, delay)); - const { failedEvents } = await this.delivery.deliver(batch); + const { failedEvents } = await this.delivery.deliver(batch, context); if (failedEvents.length === 0) { logger.info(`[BatchRetryManager] Batch delivery successfully after ${attempt + 1} tries.`); return []; } // attempts are zero-indexed while maxRetries is not if (attempt < maxRetries - 1) { - return this.retry(failedEvents, attempt + 1); + return this.retry(failedEvents, context, attempt + 1); } else { logger.warn(`[BatchRetryManager] Failed to deliver batch after ${maxRetries} tries, bailing`); return batch; diff --git a/src/events/default-event-dispatcher.spec.ts b/src/events/default-event-dispatcher.spec.ts index 50d423c8..feaa1646 100644 --- a/src/events/default-event-dispatcher.spec.ts +++ b/src/events/default-event-dispatcher.spec.ts @@ -7,6 +7,7 @@ import DefaultEventDispatcher, { import Event from './event'; import NetworkStatusListener from './network-status-listener'; import NoOpEventDispatcher from './no-op-event-dispatcher'; +import { v4 as randomUUID } from 'uuid'; global.fetch = jest.fn(); @@ -120,6 +121,7 @@ describe('DefaultEventDispatcher', () => { let fetchOptions = fetch.mock.calls[0][1]; let payload = JSON.parse(fetchOptions.body); expect(payload).toEqual({ + context: {}, eppo_events: [ expect.objectContaining({ payload: { foo: 'event1' } }), expect.objectContaining({ payload: { foo: 'event2' } }), @@ -139,6 +141,7 @@ describe('DefaultEventDispatcher', () => { fetchOptions = fetch.mock.calls[1][1]; payload = JSON.parse(fetchOptions.body); expect(payload).toEqual({ + context: {}, eppo_events: [expect.objectContaining({ payload: { foo: 'event3' } })], }); }); @@ -318,4 +321,36 @@ describe('DefaultEventDispatcher', () => { expect(dispatcher).toBeInstanceOf(DefaultEventDispatcher); }); }); + + describe('attachContext', () => { + it.only('attaches a context to be included with all events dispatched by this dispatcher', async () => { + const eventQueue = new ArrayBackedNamedEventQueue('test-queue'); + const { dispatcher } = createDispatcher({ maxRetries: 1 }, eventQueue); + dispatcher.attachContext('foo', 'bar'); + dispatcher.attachContext('baz', 'qux'); + const event = { + uuid: randomUUID(), + payload: { foo: 'event1' }, + timestamp: new Date().getTime(), + type: 'foo', + }; + dispatcher.dispatch(event); + const fetch = global.fetch as jest.Mock; + fetch.mockResolvedValue({ ok: true, json: () => Promise.resolve([]) }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(global.fetch).toHaveBeenCalledWith( + 'http://example.com', + { + method: 'POST', + headers: { 'Content-Type': 'application/json', 'x-eppo-token': 'test-sdk-key' }, + body: JSON.stringify({ + eppo_events: [event], + context: { foo: 'bar', baz: 'qux' }, + }), + }, + ); + }); + }); }); diff --git a/src/events/default-event-dispatcher.ts b/src/events/default-event-dispatcher.ts index 3a606b7b..b848dc62 100644 --- a/src/events/default-event-dispatcher.ts +++ b/src/events/default-event-dispatcher.ts @@ -25,6 +25,8 @@ export type EventDispatcherConfig = { maxRetries?: number; }; +export type EventContext = Record; + export const DEFAULT_EVENT_DISPATCHER_BATCH_SIZE = 1_000; export const DEFAULT_EVENT_DISPATCHER_CONFIG: Omit< EventDispatcherConfig, @@ -46,6 +48,7 @@ export default class DefaultEventDispatcher implements EventDispatcher { private readonly eventDelivery: EventDelivery; private readonly retryManager: BatchRetryManager; private readonly deliveryIntervalMs: number; + private readonly context: EventContext = {}; private dispatchTimer: NodeJS.Timeout | null = null; private isOffline = false; @@ -74,6 +77,10 @@ export default class DefaultEventDispatcher implements EventDispatcher { }); } + attachContext(key: string, value: string | number | boolean | null): void { + this.context[key] = value; + } + dispatch(event: Event) { this.batchProcessor.push(event); this.maybeScheduleNextDelivery(); @@ -92,10 +99,12 @@ export default class DefaultEventDispatcher implements EventDispatcher { return; } - const { failedEvents } = await this.eventDelivery.deliver(batch); + // make a defensive copy of the context to avoid mutating the original + const context = { ...this.context }; + const { failedEvents } = await this.eventDelivery.deliver(batch, context); if (failedEvents.length > 0) { logger.warn('[EventDispatcher] Failed to deliver some events from batch, retrying...'); - const failedRetry = await this.retryManager.retry(failedEvents); + const failedRetry = await this.retryManager.retry(failedEvents, context); if (failedRetry.length > 0) { // re-enqueue events that failed to retry this.batchProcessor.push(...failedRetry); diff --git a/src/events/event-delivery.spec.ts b/src/events/event-delivery.spec.ts index fd321750..49a9fab6 100644 --- a/src/events/event-delivery.spec.ts +++ b/src/events/event-delivery.spec.ts @@ -20,19 +20,19 @@ describe('EventDelivery', () => { it('should deliver events successfully when response is OK', async () => { const mockResponse = { ok: true, json: async () => ({}) }; (global.fetch as jest.Mock).mockResolvedValue(mockResponse); - const result = await eventDelivery.deliver(testBatch); + const result = await eventDelivery.deliver(testBatch, {}); expect(result).toEqual({ failedEvents: [] }); expect(global.fetch).toHaveBeenCalledWith(ingestionUrl, { method: 'POST', headers: { 'Content-Type': 'application/json', 'x-eppo-token': sdkKey }, - body: JSON.stringify({ eppo_events: testBatch }), + body: JSON.stringify({ eppo_events: testBatch, context: {} }), }); }); it('should return failed result if response is not OK', async () => { const mockResponse = { ok: false }; (global.fetch as jest.Mock).mockResolvedValue(mockResponse); - const result = await eventDelivery.deliver(testBatch); + const result = await eventDelivery.deliver(testBatch, {}); expect(result).toEqual({ failedEvents: testBatch }); }); @@ -40,20 +40,20 @@ describe('EventDelivery', () => { const failedEvents = ['1', '2']; const mockResponse = { ok: true, json: async () => ({ failed_events: failedEvents }) }; (global.fetch as jest.Mock).mockResolvedValue(mockResponse); - const result = await eventDelivery.deliver(testBatch); + const result = await eventDelivery.deliver(testBatch, {}); expect(result).toEqual({ failedEvents: [testBatch[0], testBatch[1]] }); }); it('should return success=true if no failed events in the response', async () => { const mockResponse = { ok: true, json: async () => ({}) }; (global.fetch as jest.Mock).mockResolvedValue(mockResponse); - const result = await eventDelivery.deliver(testBatch); + const result = await eventDelivery.deliver(testBatch, {}); expect(result).toEqual({ failedEvents: [] }); }); it('should handle fetch errors gracefully', async () => { (global.fetch as jest.Mock).mockRejectedValue(new Error('Network error')); - const result = await eventDelivery.deliver(testBatch); + const result = await eventDelivery.deliver(testBatch, {}); expect(result).toEqual({ failedEvents: testBatch }); }); }); diff --git a/src/events/event-delivery.ts b/src/events/event-delivery.ts index b767d738..c51c9d36 100644 --- a/src/events/event-delivery.ts +++ b/src/events/event-delivery.ts @@ -1,4 +1,6 @@ import { logger } from '../application-logger'; +import { IEventDelivery } from './batch-retry-manager'; +import { EventContext } from './default-event-dispatcher'; import Event from './event'; @@ -6,7 +8,7 @@ export type EventDeliveryResult = { failedEvents: Event[]; }; -export default class EventDelivery { +export default class EventDelivery implements IEventDelivery { constructor( private readonly sdkKey: string, private readonly ingestionUrl: string, @@ -16,7 +18,7 @@ export default class EventDelivery { * Delivers a batch of events to the ingestion URL endpoint. Returns the UUIDs of any events from * the batch that failed ingestion. */ - async deliver(batch: Event[]): Promise { + async deliver(batch: Event[], context: EventContext): Promise { try { logger.info( `[EventDispatcher] Delivering batch of ${batch.length} events to ${this.ingestionUrl}...`, @@ -24,7 +26,7 @@ export default class EventDelivery { const response = await fetch(this.ingestionUrl, { method: 'POST', headers: { 'Content-Type': 'application/json', 'x-eppo-token': this.sdkKey }, - body: JSON.stringify({ eppo_events: batch }), + body: JSON.stringify({ eppo_events: batch, context }), }); if (response.ok) { return await this.parseFailedEvents(response, batch); diff --git a/src/events/event-dispatcher.ts b/src/events/event-dispatcher.ts index 1c74d12a..30c19074 100644 --- a/src/events/event-dispatcher.ts +++ b/src/events/event-dispatcher.ts @@ -3,4 +3,15 @@ import Event from './event'; export default interface EventDispatcher { /** Dispatches (enqueues) an event for eventual delivery. */ dispatch(event: Event): void; + /** + * Attaches a context to be included with all events dispatched by this dispatcher. + * The context is delivered as a top-level object in the ingestion request payload. + * An existing key can be removed by providing a `null` value. + * Calling this method with same key multiple times causes only the last value to be used for the + * given key. + * + * @param key - The context entry key. + * @param value - The context entry value. + */ + attachContext(key: string, value: string | number | boolean | null): void; } diff --git a/src/events/no-op-event-dispatcher.ts b/src/events/no-op-event-dispatcher.ts index 6d1b007c..9c5d32a5 100644 --- a/src/events/no-op-event-dispatcher.ts +++ b/src/events/no-op-event-dispatcher.ts @@ -2,6 +2,10 @@ import Event from './event'; import EventDispatcher from './event-dispatcher'; export default class NoOpEventDispatcher implements EventDispatcher { + attachContext(key: string, value: string | number | boolean | null): void { + // Do nothing + } + // eslint-disable-next-line @typescript-eslint/no-unused-vars dispatch(_: Event): void { // Do nothing