Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track processed events in Azure provider #1527

Merged
merged 13 commits into from
Mar 20, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@boostercloud/framework-core",
"comment": "Track processed events in Azure provider to avoid duplication",
"type": "minor"
}
],
"packageName": "@boostercloud/framework-core"
}
26 changes: 24 additions & 2 deletions packages/framework-core/src/booster-event-processor.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {
TraceActionTypes,
BoosterConfig,
EventEnvelope,
EventHandlerGlobalError,
EventHandlerInterface,
EventInterface,
Register,
TraceActionTypes,
UUID,
} from '@boostercloud/framework-types'
import { EventStore } from './services/event-store'
Expand All @@ -24,8 +24,9 @@ export class BoosterEventProcessor {
*/
public static eventProcessor(eventStore: EventStore, readModelStore: ReadModelStore): EventsStreamingCallback {
return async (entityName, entityID, eventEnvelopes, config) => {
const unprocessedEvents = await BoosterEventProcessor.filterProcessed(config, eventEnvelopes, eventStore)
const eventEnvelopesProcessors = [
BoosterEventProcessor.dispatchEntityEventsToEventHandlers(eventEnvelopes, config),
BoosterEventProcessor.dispatchEntityEventsToEventHandlers(unprocessedEvents, config),
]

// Read models are not updated for notification events (events that are not related to an entity but a topic)
Expand All @@ -34,11 +35,32 @@ export class BoosterEventProcessor {
BoosterEventProcessor.snapshotAndUpdateReadModels(config, entityName, entityID, eventStore, readModelStore)
)
}
// Store events that were just processed
await eventStore.storeProcessedEvents(unprocessedEvents)

await Promises.allSettledAndFulfilled(eventEnvelopesProcessors)
}
alvaroloes marked this conversation as resolved.
Show resolved Hide resolved
}

private static async filterProcessed(
config: BoosterConfig,
eventEnvelopes: Array<EventEnvelope>,
eventStore: EventStore
): Promise<Array<EventEnvelope>> {
const logger = getLogger(config, 'BoosterEventDispatcher#filterProcessed')
const filteredResults = await Promise.all(
eventEnvelopes.map(async (eventEnvelope) => {
const result = await eventStore.searchProcessed(eventEnvelope)
if (!result || result.length > 0) {
logger.warn('Event has already been processed. Skipping.', eventEnvelope)
}
return result?.length === 0
})
)

return eventEnvelopes.filter((_, index) => filteredResults[index])
}

private static async snapshotAndUpdateReadModels(
config: BoosterConfig,
entityName: string,
Expand Down
24 changes: 24 additions & 0 deletions packages/framework-core/src/services/event-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,30 @@ export class EventStore {
}
}

@Trace(TraceActionTypes.CUSTOM)
public async storeProcessedEvents(eventEnvelopes: Array<EventEnvelope>) {
const logger = getLogger(this.config, 'EventStore#storeProcessedEvents')
try {
logger.debug('Storing processed events in the processed event store:', eventEnvelopes)
return await this.config.provider.events.storeProcessed(eventEnvelopes, this.config)
} catch (e) {
logger.error('Could not store processed events')
return
}
}

@Trace(TraceActionTypes.CUSTOM)
public async searchProcessed(eventEnvelope: EventEnvelope) {
const logger = getLogger(this.config, 'EventStore#searchProcessed')
try {
logger.debug('Checking if event has been processed:', eventEnvelope)
return await this.config.provider.events.searchProcessed(eventEnvelope, this.config)
} catch (e) {
logger.error('Could not verify if event has been processed')
return
}
}

@Trace(TraceActionTypes.STORE_SNAPSHOT)
private async storeSnapshot(
snapshot: NonPersistedEntitySnapshotEnvelope
Expand Down
34 changes: 29 additions & 5 deletions packages/framework-core/test/booster-event-processor.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable @typescript-eslint/explicit-function-return-type */
import { fake, replace, restore, createStubInstance } from 'sinon'
import { createStubInstance, fake, match, replace, restore } from 'sinon'
import {
BoosterConfig,
EntitySnapshotEnvelope,
UUID,
EntityInterface,
ProviderLibrary,
Register,
EntitySnapshotEnvelope,
EventInterface,
NonPersistedEventEnvelope,
ProviderLibrary,
Register,
UUID,
} from '@boostercloud/framework-types'
import { expect } from './expect'
import { ReadModelStore } from '../src/services/read-model-store'
Expand All @@ -24,6 +24,7 @@ class SomeEvent {
public entityID(): UUID {
return this.id
}

public getPrefixedId(prefix: string): string {
return `${prefix}-${this.id}`
}
Expand Down Expand Up @@ -129,6 +130,9 @@ describe('BoosterEventProcessor', () => {
const stubReadModelStore = createStubInstance(ReadModelStore)

const boosterEventProcessor = BoosterEventProcessor as any
const fakeFilterProcessed = fake.returns([someEvent])

replace(boosterEventProcessor, 'filterProcessed', fakeFilterProcessed)
replace(boosterEventProcessor, 'snapshotAndUpdateReadModels', fake())
replace(boosterEventProcessor, 'dispatchEntityEventsToEventHandlers', fake())

Expand Down Expand Up @@ -318,6 +322,26 @@ describe('BoosterEventProcessor', () => {
})
})

describe('the `filterProcessed` method', () => {
it("removes events if they've been already processed", async () => {
const boosterEventProcessor = BoosterEventProcessor as any
const eventStore = createStubInstance(EventStore)
const someEventEnvelope = { ...someEvent, id: 'event-id' }
eventStore.searchProcessed = fake.returns({ id: 'event-id' }) as any

const unprocessedEvents = await boosterEventProcessor.filterProcessed(config, [someEventEnvelope], eventStore)

expect(eventStore.searchProcessed).to.have.been.called
expect(eventStore.searchProcessed).to.have.been.calledOnceWith(someEventEnvelope)
expect(unprocessedEvents).to.deep.equal([])
expect(config.logger?.warn).to.have.been.calledWith(
'[Booster]|BoosterEventDispatcher#filterProcessed: ',
'Event has already been processed. Skipping.',
match.any
)
})
})

it('calls an instance method in the event and it is executed without failing', async () => {
config.eventHandlers[SomeEvent.name] = [{ handle: AnEventHandler.handle }]
const boosterEventProcessor = BoosterEventProcessor as any
Expand Down
7 changes: 7 additions & 0 deletions packages/framework-provider-aws/src/library/events-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,10 @@ async function persistEvent(
throw e
}
}

/**
* Dummy method that'll always return an empty array, since local provider won't be tracking processed events
*/
export async function storeProcessedEvents() {
return []
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,10 @@ function convertToSearchResult(eventEnvelopes: Array<EventEnvelope>): Array<Even
return 0
})
}

/**
* Dummy method that'll always return an empty array, meaning that no processed event was found for a given event ID
*/
export async function searchProcessedEvents() {
return []
}
5 changes: 4 additions & 1 deletion packages/framework-provider-aws/src/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import {
readEntityEventsSince,
readEntityLatestSnapshot,
storeEvents,
storeProcessedEvents,
storeSnapshot,
} from './library/events-adapter'
import { searchEntitiesIds, searchEvents } from './library/events-searcher-adapter'
import { searchEntitiesIds, searchEvents, searchProcessedEvents } from './library/events-searcher-adapter'
import { rawGraphQLRequestToEnvelope } from './library/graphql-adapter'
import {
deleteReadModel,
Expand Down Expand Up @@ -75,6 +76,8 @@ export const Provider = (rockets?: RocketDescriptor[]): ProviderLibrary => {
searchEntitiesIDs: searchEntitiesIds.bind(null, dynamoDB),
store: storeEvents.bind(null, dynamoDB),
storeSnapshot: storeSnapshot.bind(null, dynamoDB),
storeProcessed: storeProcessedEvents,
searchProcessed: searchProcessedEvents,
},
// ProviderReadModelsLibrary
readModels: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ export class TerraformContainers {
cosmosdbDatabase,
cosmosdbSqlDatabase
)
const processedEventsContainer = this.createProcessedEventsContainer(
azureProvider,
appPrefix,
terraformStack,
config,
cosmosdbDatabase,
cosmosdbSqlDatabase
)
const readModels = Object.keys(config.readModels).map((readModel) =>
this.createReadModel(azureProvider, terraformStack, config, readModel, cosmosdbDatabase, cosmosdbSqlDatabase)
)
Expand All @@ -53,7 +61,9 @@ export class TerraformContainers {
cosmosdbDatabase,
cosmosdbSqlDatabase
)
return [cosmosdbSqlEventContainer, subscriptionsContainer, connectionsContainer].concat(readModels)
return [cosmosdbSqlEventContainer, processedEventsContainer, subscriptionsContainer, connectionsContainer].concat(
readModels
)
}
return [cosmosdbSqlEventContainer].concat(readModels)
}
Expand Down Expand Up @@ -81,6 +91,30 @@ export class TerraformContainers {
})
}

private static createProcessedEventsContainer(
providerResource: AzurermProvider,
appPrefix: string,
terraformStackResource: TerraformStack,
config: BoosterConfig,
cosmosdbDatabaseResource: cosmosdbAccount.CosmosdbAccount,
cosmosdbSqlDatabaseResource: cosmosdbSqlDatabase.CosmosdbSqlDatabase
): cosmosdbSqlContainer.CosmosdbSqlContainer {
const idEvent = toTerraformName(appPrefix, 'processed-events')
return new cosmosdbSqlContainer.CosmosdbSqlContainer(terraformStackResource, idEvent, {
name: config.resourceNames.processedEventsStore,
resourceGroupName: cosmosdbDatabaseResource.resourceGroupName,
accountName: cosmosdbDatabaseResource.name,
databaseName: cosmosdbSqlDatabaseResource.name,
partitionKeyPath: '/id',
partitionKeyVersion: 2,
autoscaleSettings: {
maxThroughput: MAX_CONTAINER_THROUGHPUT,
},
defaultTtl: config.processedEventsTtl,
provider: providerResource,
})
}

private static createReadModel(
providerResource: AzurermProvider,
terraformStackResource: TerraformStack,
Expand Down
4 changes: 4 additions & 0 deletions packages/framework-provider-azure/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
readEntityEventsSince,
readEntityLatestSnapshot,
storeEvents,
storeProcessedEvents,
fetchProcessedEvents,
storeSnapshot,
} from './library/events-adapter'
import { CosmosClient } from '@azure/cosmos'
Expand Down Expand Up @@ -104,6 +106,8 @@ export const Provider = (rockets?: RocketDescriptor[]): ProviderLibrary => ({
latestEntitySnapshot: readEntityLatestSnapshot.bind(null, cosmosClient),
search: searchEvents.bind(null, cosmosClient),
searchEntitiesIDs: searchEntitiesIds.bind(null, cosmosClient),
storeProcessed: storeProcessedEvents.bind(null, cosmosClient),
searchProcessed: fetchProcessedEvents.bind(null, cosmosClient),
},
// ProviderReadModelsLibrary
readModels: {
Expand Down
Loading
Loading