Skip to content

Commit

Permalink
feat(events): Add event context
Browse files Browse the repository at this point in the history
  • Loading branch information
felipecsl committed Feb 3, 2025
1 parent 29e95b7 commit 736bfd9
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 21 deletions.
14 changes: 14 additions & 0 deletions src/client/eppo-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,20 @@ export default class EppoClient {
this.eventDispatcher = eventDispatcher;
}

/**
* Attaches a context to be included with all events dispatched by the EventDispatcher.
* The context is delivered as a top-level object in the ingestion request payload.
* An existing key can be removed by providing a `null` value.
* Calling this method with same key multiple times causes only the last value to be used for the
* given key.
*
* @param key - The context entry key.
* @param value - The context entry value.
*/
setContext(key: string, value: string | number | boolean | null) {
this.eventDispatcher?.attachContext(key, value);
}

// noinspection JSUnusedGlobalSymbols
setBanditModelConfigurationStore(
banditModelConfigurationStore: IConfigurationStore<BanditParameters>,
Expand Down
10 changes: 5 additions & 5 deletions src/events/batch-retry-manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ describe('BatchRetryManager', () => {

it('should successfully retry and deliver a batch with no failures', async () => {
mockDelivery.deliver.mockResolvedValueOnce({ failedEvents: [] });
const result = await batchRetryManager.retry(mockBatch);
const result = await batchRetryManager.retry(mockBatch, {});
expect(result).toEqual([]);
expect(mockDelivery.deliver).toHaveBeenCalledTimes(1);
expect(mockDelivery.deliver).toHaveBeenCalledWith(mockBatch);
expect(mockDelivery.deliver).toHaveBeenCalledWith(mockBatch, {});
});

it('should retry failed deliveries up to maxRetries times and return last failed batch', async () => {
mockDelivery.deliver.mockResolvedValue({ failedEvents: [{ id: 'event1' }] });
const result = await batchRetryManager.retry(mockBatch);
const result = await batchRetryManager.retry(mockBatch, {});
expect(result).toEqual([{ id: 'event1' }]);
expect(mockDelivery.deliver).toHaveBeenCalledTimes(maxRetries);
});
Expand All @@ -40,7 +40,7 @@ describe('BatchRetryManager', () => {

jest.useFakeTimers();

const retryPromise = batchRetryManager.retry(mockBatch);
const retryPromise = batchRetryManager.retry(mockBatch, {});

// 1st retry: 100ms
// 2nd retry: 200ms
Expand All @@ -67,7 +67,7 @@ describe('BatchRetryManager', () => {

jest.useFakeTimers();

const retryPromise = batchRetryManager.retry(mockBatch);
const retryPromise = batchRetryManager.retry(mockBatch, {});
// 100ms + 200ms + 300ms (maxRetryDelayMs) = 600ms
await jest.advanceTimersByTimeAsync(600);
const result = await retryPromise;
Expand Down
11 changes: 6 additions & 5 deletions src/events/batch-retry-manager.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { logger } from '../application-logger';

Check warning on line 1 in src/events/batch-retry-manager.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (18)

There should be at least one empty line between import groups

Check warning on line 1 in src/events/batch-retry-manager.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (20)

There should be at least one empty line between import groups

Check warning on line 1 in src/events/batch-retry-manager.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (22)

There should be at least one empty line between import groups

Check warning on line 1 in src/events/batch-retry-manager.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (23)

There should be at least one empty line between import groups
import { EventContext } from './default-event-dispatcher';

Check warning on line 2 in src/events/batch-retry-manager.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (18)

There should be no empty line within import group

Check warning on line 2 in src/events/batch-retry-manager.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (20)

There should be no empty line within import group

Check warning on line 2 in src/events/batch-retry-manager.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (22)

There should be no empty line within import group

Check warning on line 2 in src/events/batch-retry-manager.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (23)

There should be no empty line within import group

import Event from './event';
import { EventDeliveryResult } from './event-delivery';

export type IEventDelivery = {
deliver(batch: Event[]): Promise<EventDeliveryResult>;
export interface IEventDelivery {
deliver(batch: Event[], context: EventContext): Promise<EventDeliveryResult>;
};

Check warning on line 9 in src/events/batch-retry-manager.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (18)

Delete `;`

Check warning on line 9 in src/events/batch-retry-manager.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (20)

Delete `;`

Check warning on line 9 in src/events/batch-retry-manager.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (22)

Delete `;`

Check warning on line 9 in src/events/batch-retry-manager.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (23)

Delete `;`

/**
Expand All @@ -27,22 +28,22 @@ export default class BatchRetryManager {
) {}

/** Re-attempts delivery of the provided batch, returns the UUIDs of events that failed retry. */
async retry(batch: Event[], attempt = 0): Promise<Event[]> {
async retry(batch: Event[], context: EventContext, attempt = 0): Promise<Event[]> {
const { retryIntervalMs, maxRetryDelayMs, maxRetries } = this.config;
const delay = Math.min(retryIntervalMs * Math.pow(2, attempt), maxRetryDelayMs);
logger.info(
`[BatchRetryManager] Retrying batch delivery of ${batch.length} events in ${delay}ms...`,
);
await new Promise((resolve) => setTimeout(resolve, delay));

const { failedEvents } = await this.delivery.deliver(batch);
const { failedEvents } = await this.delivery.deliver(batch, context);
if (failedEvents.length === 0) {
logger.info(`[BatchRetryManager] Batch delivery successfully after ${attempt + 1} tries.`);
return [];
}
// attempts are zero-indexed while maxRetries is not
if (attempt < maxRetries - 1) {
return this.retry(failedEvents, attempt + 1);
return this.retry(failedEvents, context, attempt + 1);
} else {
logger.warn(`[BatchRetryManager] Failed to deliver batch after ${maxRetries} tries, bailing`);
return batch;
Expand Down
35 changes: 35 additions & 0 deletions src/events/default-event-dispatcher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import DefaultEventDispatcher, {
import Event from './event';
import NetworkStatusListener from './network-status-listener';
import NoOpEventDispatcher from './no-op-event-dispatcher';

Check warning on line 9 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (18)

There should be at least one empty line between import groups

Check warning on line 9 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (20)

There should be at least one empty line between import groups

Check warning on line 9 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (22)

There should be at least one empty line between import groups

Check warning on line 9 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (23)

There should be at least one empty line between import groups
import { v4 as randomUUID } from 'uuid';

Check warning on line 10 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (18)

`uuid` import should occur before import of `./array-backed-named-event-queue`

Check warning on line 10 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (20)

`uuid` import should occur before import of `./array-backed-named-event-queue`

Check warning on line 10 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (22)

`uuid` import should occur before import of `./array-backed-named-event-queue`

Check warning on line 10 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (23)

`uuid` import should occur before import of `./array-backed-named-event-queue`

global.fetch = jest.fn();

Expand Down Expand Up @@ -120,6 +121,7 @@ describe('DefaultEventDispatcher', () => {
let fetchOptions = fetch.mock.calls[0][1];
let payload = JSON.parse(fetchOptions.body);
expect(payload).toEqual({
context: {},
eppo_events: [
expect.objectContaining({ payload: { foo: 'event1' } }),
expect.objectContaining({ payload: { foo: 'event2' } }),
Expand All @@ -139,6 +141,7 @@ describe('DefaultEventDispatcher', () => {
fetchOptions = fetch.mock.calls[1][1];
payload = JSON.parse(fetchOptions.body);
expect(payload).toEqual({
context: {},
eppo_events: [expect.objectContaining({ payload: { foo: 'event3' } })],
});
});
Expand Down Expand Up @@ -318,4 +321,36 @@ describe('DefaultEventDispatcher', () => {
expect(dispatcher).toBeInstanceOf(DefaultEventDispatcher);
});
});

describe('attachContext', () => {
it.only('attaches a context to be included with all events dispatched by this dispatcher', async () => {
const eventQueue = new ArrayBackedNamedEventQueue<Event>('test-queue');
const { dispatcher } = createDispatcher({ maxRetries: 1 }, eventQueue);
dispatcher.attachContext('foo', 'bar');
dispatcher.attachContext('baz', 'qux');
const event = {
uuid: randomUUID(),
payload: { foo: 'event1' },
timestamp: new Date().getTime(),
type: 'foo',
};
dispatcher.dispatch(event);
const fetch = global.fetch as jest.Mock;
fetch.mockResolvedValue({ ok: true, json: () => Promise.resolve([]) });

await new Promise((resolve) => setTimeout(resolve, 100));

expect(global.fetch).toHaveBeenCalledWith(

Check warning on line 343 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (18)

Replace `⏎········'http://example.com',⏎·······` with `'http://example.com',`

Check warning on line 343 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (20)

Replace `⏎········'http://example.com',⏎·······` with `'http://example.com',`

Check warning on line 343 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (22)

Replace `⏎········'http://example.com',⏎·······` with `'http://example.com',`

Check warning on line 343 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (23)

Replace `⏎········'http://example.com',⏎·······` with `'http://example.com',`
'http://example.com',
{
method: 'POST',

Check warning on line 346 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (18)

Delete `··`

Check warning on line 346 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (20)

Delete `··`

Check warning on line 346 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (22)

Delete `··`

Check warning on line 346 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (23)

Delete `··`
headers: { 'Content-Type': 'application/json', 'x-eppo-token': 'test-sdk-key' },

Check warning on line 347 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (18)

Replace `··········` with `········`

Check warning on line 347 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (20)

Replace `··········` with `········`

Check warning on line 347 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (22)

Replace `··········` with `········`

Check warning on line 347 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (23)

Replace `··········` with `········`
body: JSON.stringify({

Check warning on line 348 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (18)

Delete `··`

Check warning on line 348 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (20)

Delete `··`

Check warning on line 348 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (22)

Delete `··`

Check warning on line 348 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (23)

Delete `··`
eppo_events: [event],

Check warning on line 349 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (18)

Delete `··`

Check warning on line 349 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (20)

Delete `··`

Check warning on line 349 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (22)

Delete `··`

Check warning on line 349 in src/events/default-event-dispatcher.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk (23)

Delete `··`
context: { foo: 'bar', baz: 'qux' },
}),
},
);
});
});
});
13 changes: 11 additions & 2 deletions src/events/default-event-dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ export type EventDispatcherConfig = {
maxRetries?: number;
};

export type EventContext = Record<string, string | number | boolean | null>;

export const DEFAULT_EVENT_DISPATCHER_BATCH_SIZE = 1_000;
export const DEFAULT_EVENT_DISPATCHER_CONFIG: Omit<
EventDispatcherConfig,
Expand All @@ -46,6 +48,7 @@ export default class DefaultEventDispatcher implements EventDispatcher {
private readonly eventDelivery: EventDelivery;
private readonly retryManager: BatchRetryManager;
private readonly deliveryIntervalMs: number;
private readonly context: EventContext = {};
private dispatchTimer: NodeJS.Timeout | null = null;
private isOffline = false;

Expand Down Expand Up @@ -74,6 +77,10 @@ export default class DefaultEventDispatcher implements EventDispatcher {
});
}

attachContext(key: string, value: string | number | boolean | null): void {
this.context[key] = value;
}

dispatch(event: Event) {
this.batchProcessor.push(event);
this.maybeScheduleNextDelivery();
Expand All @@ -92,10 +99,12 @@ export default class DefaultEventDispatcher implements EventDispatcher {
return;
}

const { failedEvents } = await this.eventDelivery.deliver(batch);
// make a defensive copy of the context to avoid mutating the original
const context = { ...this.context };
const { failedEvents } = await this.eventDelivery.deliver(batch, context);
if (failedEvents.length > 0) {
logger.warn('[EventDispatcher] Failed to deliver some events from batch, retrying...');
const failedRetry = await this.retryManager.retry(failedEvents);
const failedRetry = await this.retryManager.retry(failedEvents, context);
if (failedRetry.length > 0) {
// re-enqueue events that failed to retry
this.batchProcessor.push(...failedRetry);
Expand Down
12 changes: 6 additions & 6 deletions src/events/event-delivery.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,40 @@ describe('EventDelivery', () => {
it('should deliver events successfully when response is OK', async () => {
const mockResponse = { ok: true, json: async () => ({}) };
(global.fetch as jest.Mock).mockResolvedValue(mockResponse);
const result = await eventDelivery.deliver(testBatch);
const result = await eventDelivery.deliver(testBatch, {});
expect(result).toEqual({ failedEvents: [] });
expect(global.fetch).toHaveBeenCalledWith(ingestionUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'x-eppo-token': sdkKey },
body: JSON.stringify({ eppo_events: testBatch }),
body: JSON.stringify({ eppo_events: testBatch, context: {} }),
});
});

it('should return failed result if response is not OK', async () => {
const mockResponse = { ok: false };
(global.fetch as jest.Mock).mockResolvedValue(mockResponse);
const result = await eventDelivery.deliver(testBatch);
const result = await eventDelivery.deliver(testBatch, {});
expect(result).toEqual({ failedEvents: testBatch });
});

it('should return failed events when response includes failed events', async () => {
const failedEvents = ['1', '2'];
const mockResponse = { ok: true, json: async () => ({ failed_events: failedEvents }) };
(global.fetch as jest.Mock).mockResolvedValue(mockResponse);
const result = await eventDelivery.deliver(testBatch);
const result = await eventDelivery.deliver(testBatch, {});
expect(result).toEqual({ failedEvents: [testBatch[0], testBatch[1]] });
});

it('should return success=true if no failed events in the response', async () => {
const mockResponse = { ok: true, json: async () => ({}) };
(global.fetch as jest.Mock).mockResolvedValue(mockResponse);
const result = await eventDelivery.deliver(testBatch);
const result = await eventDelivery.deliver(testBatch, {});
expect(result).toEqual({ failedEvents: [] });
});

it('should handle fetch errors gracefully', async () => {
(global.fetch as jest.Mock).mockRejectedValue(new Error('Network error'));
const result = await eventDelivery.deliver(testBatch);
const result = await eventDelivery.deliver(testBatch, {});
expect(result).toEqual({ failedEvents: testBatch });
});
});
8 changes: 5 additions & 3 deletions src/events/event-delivery.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { logger } from '../application-logger';
import { IEventDelivery } from './batch-retry-manager';
import { EventContext } from './default-event-dispatcher';

import Event from './event';

export type EventDeliveryResult = {
failedEvents: Event[];
};

export default class EventDelivery {
export default class EventDelivery implements IEventDelivery {
constructor(
private readonly sdkKey: string,
private readonly ingestionUrl: string,
Expand All @@ -16,15 +18,15 @@ export default class EventDelivery {
* Delivers a batch of events to the ingestion URL endpoint. Returns the UUIDs of any events from
* the batch that failed ingestion.
*/
async deliver(batch: Event[]): Promise<EventDeliveryResult> {
async deliver(batch: Event[], context: EventContext): Promise<EventDeliveryResult> {
try {
logger.info(
`[EventDispatcher] Delivering batch of ${batch.length} events to ${this.ingestionUrl}...`,
);
const response = await fetch(this.ingestionUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'x-eppo-token': this.sdkKey },
body: JSON.stringify({ eppo_events: batch }),
body: JSON.stringify({ eppo_events: batch, context }),
});
if (response.ok) {
return await this.parseFailedEvents(response, batch);
Expand Down
11 changes: 11 additions & 0 deletions src/events/event-dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,15 @@ import Event from './event';
export default interface EventDispatcher {
/** Dispatches (enqueues) an event for eventual delivery. */
dispatch(event: Event): void;
/**
* Attaches a context to be included with all events dispatched by this dispatcher.
* The context is delivered as a top-level object in the ingestion request payload.
* An existing key can be removed by providing a `null` value.
* Calling this method with same key multiple times causes only the last value to be used for the
* given key.
*
* @param key - The context entry key.
* @param value - The context entry value.
*/
attachContext(key: string, value: string | number | boolean | null): void;
}
4 changes: 4 additions & 0 deletions src/events/no-op-event-dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import Event from './event';
import EventDispatcher from './event-dispatcher';

export default class NoOpEventDispatcher implements EventDispatcher {
attachContext(key: string, value: string | number | boolean | null): void {
// Do nothing
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
dispatch(_: Event): void {
// Do nothing
Expand Down

0 comments on commit 736bfd9

Please sign in to comment.