diff --git a/src/packages/emmett-postgresql/src/eventStore/postgreSQLEventStore.ts b/src/packages/emmett-postgresql/src/eventStore/postgreSQLEventStore.ts index 63807b77..18f5b269 100644 --- a/src/packages/emmett-postgresql/src/eventStore/postgreSQLEventStore.ts +++ b/src/packages/emmett-postgresql/src/eventStore/postgreSQLEventStore.ts @@ -24,9 +24,9 @@ import { } from '@event-driven-io/emmett'; import pg from 'pg'; import { - defaultProjectionOptions, + defaultPostgreSQLProjectionOptions, handleProjections, - type ProjectionDefintion, + type PostgreSQLProjectionDefintion, } from './projections'; import { appendToStream, createEventStoreSchema, readStream } from './schema'; @@ -99,12 +99,12 @@ export type PostgresEventStoreConnectionOptions = | PostgresEventStoreNotPooledOptions; export type PostgresEventStoreOptions = { - projections: ProjectionDefintion[]; + projections: PostgreSQLProjectionDefintion[]; connectionOptions?: PostgresEventStoreConnectionOptions; }; export const getPostgreSQLEventStore = ( connectionString: string, - options: PostgresEventStoreOptions = defaultProjectionOptions, + options: PostgresEventStoreOptions = defaultPostgreSQLProjectionOptions, ): PostgresEventStore => { const pool = dumbo({ connectionString, diff --git a/src/packages/emmett-postgresql/src/eventStore/projections/index.ts b/src/packages/emmett-postgresql/src/eventStore/projections/index.ts index 5a68e571..19bd0146 100644 --- a/src/packages/emmett-postgresql/src/eventStore/projections/index.ts +++ b/src/packages/emmett-postgresql/src/eventStore/projections/index.ts @@ -1,40 +1,41 @@ import { type NodePostgresClient, type NodePostgresTransaction, + type SQL, type SQLExecutor, } from '@event-driven-io/dumbo'; import { type Event, type EventTypeOf, + type ProjectionDefintion, + type ProjectionHandler, type ReadEvent, } from '@event-driven-io/emmett'; import type { PostgresEventStoreOptions } from '../postgreSQLEventStore'; -export type ProjectionHandlerContext = { +export type PostgreSQLProjectionHandlerContext = { connectionString: string; client: NodePostgresClient; execute: SQLExecutor; transaction: NodePostgresTransaction; }; -export type PostgresProjectionHandler = ( - events: ReadEvent[], - context: ProjectionHandlerContext, -) => Promise | void; - -export type ProjectionDefintion = { - type: 'inline'; - name?: string; - canHandle: EventTypeOf[]; - handle: PostgresProjectionHandler; -}; +export type PostgreSQLProjectionHandler = + ProjectionHandler; + +export interface PostgreSQLProjectionDefintion + extends ProjectionDefintion< + 'inline', + EventType, + PostgreSQLProjectionHandlerContext + > {} -export const defaultProjectionOptions: PostgresEventStoreOptions = { +export const defaultPostgreSQLProjectionOptions: PostgresEventStoreOptions = { projections: [], }; export const handleProjections = async ( - allProjections: ProjectionDefintion[], + allProjections: PostgreSQLProjectionDefintion[], connectionString: string, transaction: NodePostgresTransaction, events: ReadEvent[], @@ -57,12 +58,55 @@ export const handleProjections = async ( } }; -export const projection = ( - definition: ProjectionDefintion, -): ProjectionDefintion => definition as unknown as ProjectionDefintion; +export const postgreSQLProjection = ( + definition: PostgreSQLProjectionDefintion, +): PostgreSQLProjectionDefintion => + definition as unknown as PostgreSQLProjectionDefintion; + +/** @deprecated use postgreSQLProjection instead */ +export const projection = postgreSQLProjection; + +export const postgreSQLInlineProjection = ( + definition: Omit, 'type'>, +): PostgreSQLProjectionDefintion => + postgreSQLProjection({ type: 'inline', ...definition }); -export const inlineProjection = ( - definition: Omit, 'type'>, -): ProjectionDefintion => projection({ type: 'inline', ...definition }); +/** @deprecated use postgreSQLSingleProjection instead */ +export const inlineProjection = postgreSQLInlineProjection; + +export const postgreSQLRawBatchSQLProjection = ( + handle: ( + events: EventType[], + context: PostgreSQLProjectionHandlerContext, + ) => Promise | SQL[], + ...canHandle: EventTypeOf[] +): PostgreSQLProjectionDefintion => + postgreSQLInlineProjection({ + canHandle, + handle: async (events, context) => { + const sqls: SQL[] = await handle(events, context); + + await context.execute.batchCommand(sqls); + }, + }); + +export const postgreSQLRawSQLProjection = ( + handle: ( + event: EventType, + context: PostgreSQLProjectionHandlerContext, + ) => Promise | SQL, + ...canHandle: EventTypeOf[] +): PostgreSQLProjectionDefintion => + postgreSQLRawBatchSQLProjection( + async (events, context) => { + const sqls: SQL[] = []; + + for (const event of events) { + sqls.push(await handle(event, context)); + } + return sqls; + }, + ...canHandle, + ); export * from './pongo'; diff --git a/src/packages/emmett-postgresql/src/eventStore/projections/pongo.ts b/src/packages/emmett-postgresql/src/eventStore/projections/pongo.ts index 6c8bad85..7925febd 100644 --- a/src/packages/emmett-postgresql/src/eventStore/projections/pongo.ts +++ b/src/packages/emmett-postgresql/src/eventStore/projections/pongo.ts @@ -9,7 +9,10 @@ import { type PongoDocument, } from '@event-driven-io/pongo'; import pg from 'pg'; -import { inlineProjection, type ProjectionDefintion } from './'; +import { + postgreSQLInlineProjection, + type PostgreSQLProjectionDefintion, +} from './'; export type PongoProjectionOptions = { documentId: (event: ReadEvent) => string; @@ -40,8 +43,8 @@ export type PongoDocumentEvolve< export const pongoProjection = ( handle: (pongo: PongoClient, events: ReadEvent[]) => Promise, ...canHandle: EventTypeOf[] -): ProjectionDefintion => - inlineProjection({ +): PostgreSQLProjectionDefintion => + postgreSQLInlineProjection({ canHandle, handle: async (events, context) => { const { connectionString, client } = context; @@ -58,7 +61,7 @@ export const pongoMultiStreamProjection = < getDocumentId: (event: ReadEvent) => string, evolve: PongoDocumentEvolve, ...canHandle: EventTypeOf[] -): ProjectionDefintion => +): PostgreSQLProjectionDefintion => pongoProjection( async (pongo, events) => { const collection = pongo.db().collection(collectionName); @@ -79,7 +82,7 @@ export const pongoSingleProjection = < collectionName: string, evolve: PongoDocumentEvolve, ...canHandle: EventTypeOf[] -): ProjectionDefintion => +): PostgreSQLProjectionDefintion => pongoMultiStreamProjection( collectionName, (event) => event.metadata.streamName, diff --git a/src/packages/emmett/src/index.ts b/src/packages/emmett/src/index.ts index 8ec862fc..e004604f 100644 --- a/src/packages/emmett/src/index.ts +++ b/src/packages/emmett/src/index.ts @@ -2,6 +2,7 @@ export * from './commandHandling'; export * from './errors'; export * from './eventStore'; export * from './messageBus'; +export * from './projections'; export * from './serialization'; export * from './streaming'; export * from './subscriptions'; diff --git a/src/packages/emmett/src/projections/index.ts b/src/packages/emmett/src/projections/index.ts new file mode 100644 index 00000000..fc1a4eff --- /dev/null +++ b/src/packages/emmett/src/projections/index.ts @@ -0,0 +1,20 @@ +import type { DefaultRecord, Event, EventTypeOf, ReadEvent } from '../typing'; + +export type ProjectionHandler< + EventType extends Event = Event, + ProjectionHandlerContext extends DefaultRecord = DefaultRecord, +> = ( + events: ReadEvent[], + context: ProjectionHandlerContext, +) => Promise | void; + +export interface ProjectionDefintion< + ProjectionType extends 'inline' | 'async', + EventType extends Event = Event, + ProjectionHandlerContext extends DefaultRecord = DefaultRecord, +> { + type: ProjectionType; + name?: string; + canHandle: EventTypeOf[]; + handle: ProjectionHandler; +}