diff --git a/packages/event-producer/package.json b/packages/event-producer/package.json index 79ba4133..5a2ca533 100644 --- a/packages/event-producer/package.json +++ b/packages/event-producer/package.json @@ -1,6 +1,6 @@ { "name": "@tidal-music/event-producer", - "version": "1.1.2", + "version": "1.1.3", "type": "module", "files": [ "dist" diff --git a/packages/event-producer/src/submit/submit.test.ts b/packages/event-producer/src/submit/submit.test.ts index 1229e205..5cb6e631 100644 --- a/packages/event-producer/src/submit/submit.test.ts +++ b/packages/event-producer/src/submit/submit.test.ts @@ -13,6 +13,11 @@ vi.mock('../queue'); vi.mock('../monitor'); describe('submit', () => { + beforeEach(() => { + vi.mocked(queue).getEvents.mockReturnValue([]); + vi.mocked(queue).getEventBatch.mockReturnValue([]); + }); + it('fails to send if no credentialsProvider is set', async () => { vi.mocked(queue).getEventBatch.mockReturnValue([epEvent1]); @@ -100,6 +105,78 @@ describe('submit', () => { expect(outage.setOutage).not.toHaveBeenCalled(); }); + it('recursive calls until empty', async () => { + const events = Array.from({ length: 21 }, (_, i) => ({ + ...epEvent1, + id: `${i}`, + })); + const firstBatch = events.slice(0, 10); + const secondBatch = events.slice(10, 20); + const lastBatch = events.slice(20, 21); + + const [firstResponse, secondResponse, lastResponse] = [ + firstBatch, + secondBatch, + lastBatch, + ].map(batch => + js2xml( + { + xml: { + SendMessageBatchResponse: { + SendMessageBatchResult: batch.map(e => ({ + SendMessageBatchResultEntry: { Id: e.id }, + })), + }, + }, + }, + { compact: true }, + ), + ); + + vi.mocked(queue).getEventBatch.mockReturnValueOnce(firstBatch); + vi.mocked(queue).getEventBatch.mockReturnValueOnce(secondBatch); + vi.mocked(queue).getEventBatch.mockReturnValueOnce(lastBatch); + + vi.mocked(queue).getEvents.mockReturnValueOnce(events); + vi.mocked(queue).getEvents.mockReturnValueOnce( + secondBatch.concat(lastBatch), + ); + vi.mocked(queue).getEvents.mockReturnValueOnce(lastBatch); + vi.stubGlobal( + 'fetch', + vi + .fn() + .mockResolvedValueOnce({ + ok: true, + text: vi.fn().mockResolvedValue(firstResponse), + }) + .mockResolvedValueOnce({ + ok: true, + text: vi.fn().mockResolvedValue(secondResponse), + }) + .mockResolvedValueOnce({ + ok: true, + text: vi.fn().mockResolvedValue(lastResponse), + }), + ); + await submitEvents({ config }); + + expect(fetch).toHaveBeenCalledWith( + config.tlConsumerUri, + expect.objectContaining({ + body: expect.any(URLSearchParams), + headers: expect.any(Headers), + method: 'post', + }), + ); + + expect(fetch).toHaveBeenCalledTimes(3); + + expect(queue.removeEvents).toHaveBeenCalledWith(firstBatch.map(e => e.id)); + expect(queue.removeEvents).toHaveBeenCalledWith(secondBatch.map(e => e.id)); + expect(queue.removeEvents).toHaveBeenCalledWith(lastBatch.map(e => e.id)); + }); + it('does not submit events if there are no events in the queue', async () => { vi.spyOn(globalThis, 'fetch'); vi.mocked(queue).getEventBatch.mockReturnValue([]); diff --git a/packages/event-producer/src/submit/submit.ts b/packages/event-producer/src/submit/submit.ts index 13be2595..b87e6c0f 100644 --- a/packages/event-producer/src/submit/submit.ts +++ b/packages/event-producer/src/submit/submit.ts @@ -16,7 +16,9 @@ import { eventsToSqsRequestParameters } from '../utils/sqsParamsConverter'; * @param {SubmitEventsParams} params */ type SubmitEventsParams = { config: Config }; -export const submitEvents = async ({ config }: SubmitEventsParams) => { +export const submitEvents = async ({ + config, +}: SubmitEventsParams): Promise => { const eventsBatch = queue.getEventBatch(); if (eventsBatch.length === 0) { return Promise.resolve(); @@ -80,7 +82,10 @@ export const submitEvents = async ({ config }: SubmitEventsParams) => { } } }); - return queue.removeEvents(idsToRemove); + queue.removeEvents(idsToRemove); + if (queue.getEvents().length > 0) { + return submitEvents({ config }); + } } else { setOutage(true); }