Skip to content

Commit

Permalink
Added implementation of the message bus and deep read-only structure
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Apr 12, 2024
1 parent 165278b commit e2681dd
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 15 deletions.
152 changes: 152 additions & 0 deletions src/packages/emmett/src/messageBus/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import { EmmettError } from '../errors';
import {
type Command,
type CommandTypeOf,
type Event,
type EventTypeOf,
} from '../typing';

export interface CommandSender {
send<CommandType extends Command = Command>(
command: CommandType,
): Promise<void>;
}

export interface EventsPublisher {
publish<EventType extends Event = Event>(event: EventType): Promise<void>;
}

export type ScheduleOptions = { afterInMs: number } | { at: Date };

export interface MessageScheduler<CommandOrEvent extends Command | Event> {
schedule<MessageType extends CommandOrEvent>(
message: MessageType,
when?: ScheduleOptions,
): void;
}

export interface CommandBus extends CommandSender, MessageScheduler<Command> {}

export interface EventBus extends EventsPublisher, MessageScheduler<Event> {}

export interface MessageBus extends CommandBus, EventBus {
schedule<MessageType extends Command | Event>(
message: MessageType,
when?: ScheduleOptions,
): void;
}

export type CommandHandler<CommandType extends Command = Command> = (
command: CommandType,
) => Promise<void> | void;

export interface CommandProcessor {
handle<CommandType extends Command>(
commandHandler: CommandHandler<CommandType>,
...commandTypes: CommandTypeOf<CommandType>[]
): void;
}

export type EventHandler<EventType extends Event = Event> = (
event: EventType,
) => Promise<void> | void;

export interface EventProcessor {
subscribe<EventType extends Event>(
eventHandler: EventHandler<EventType>,
...eventTypes: EventTypeOf<EventType>[]
): void;
}

export type ScheduledMessage = {
message: Event | Command;
options?: ScheduleOptions;
};

export interface ScheduledMessageProcessor {
dequeue(): ScheduledMessage[];
}

export type MessageHandler = CommandHandler | EventHandler;

export type MessageProcessor = EventProcessor | CommandProcessor;

export const getInMemoryMessageBus = (): MessageBus &
MessageProcessor &
ScheduledMessageProcessor => {
const allHandlers = new Map<string, MessageHandler[]>();
let pendingMessages: ScheduledMessage[] = [];

return {
send: async <CommandType extends Command = Command>(
command: CommandType,
): Promise<void> => {
const handlers = allHandlers.get(command.type);

if (handlers === undefined || handlers.length === 0)
throw new EmmettError(
`No handler registered for command ${command.type}!`,
);

const commandHandler = handlers[0] as CommandHandler<CommandType>;

await commandHandler(command);
},

publish: async <EventType extends Event = Event>(
event: EventType,
): Promise<void> => {
const handlers = allHandlers.get(event.type) ?? [];

for (const handler of handlers) {
const eventHandler = handler as EventHandler<EventType>;

await eventHandler(event);
}
},

schedule: <MessageType extends Command | Event>(
message: MessageType,
when?: ScheduleOptions,
): void => {
pendingMessages = [...pendingMessages, { message, options: when }];
},

handle: <CommandType extends Command>(
commandHandler: CommandHandler<CommandType>,
...commandTypes: CommandTypeOf<CommandType>[]
): void => {
const alreadyRegistered = [...allHandlers.keys()].filter((registered) =>
commandTypes.includes(registered),
);

if (alreadyRegistered.length > 0)
throw new EmmettError(
`Cannot register handler for commands ${alreadyRegistered.join(', ')} as they're already registered!`,
);
for (const commandType of commandTypes) {
allHandlers.set(commandType, [commandHandler as MessageHandler]);
}
},

subscribe<EventType extends Event>(
eventHandler: EventHandler<EventType>,
...eventTypes: EventTypeOf<EventType>[]
): void {
for (const eventType of eventTypes) {
if (!allHandlers.has(eventType)) allHandlers.set(eventType, []);

allHandlers.set(eventType, [
...(allHandlers.get(eventType) ?? []),
eventHandler as MessageHandler,
]);
}
},

dequeue: (): ScheduledMessage[] => {
const pending = pendingMessages;
pendingMessages = [];
return pending;
},
};
};
11 changes: 5 additions & 6 deletions src/packages/emmett/src/testing/assertions.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import type { DefaultRecord } from '../typing';

export const isSubset = (superObj: unknown, subObj: unknown): boolean => {
const sup = superObj as Record<string, unknown>;
const sub = subObj as Record<string, unknown>;
const sup = superObj as DefaultRecord;
const sub = subObj as DefaultRecord;

return Object.keys(sub).every((ele: string) => {
if (typeof sub[ele] == 'object') {
return isSubset(
sup[ele] as Record<string, unknown>,
sub[ele] as Record<string, unknown>,
);
return isSubset(sup[ele], sub[ele]);
}
return sub[ele] === sup[ele];
});
Expand Down
37 changes: 33 additions & 4 deletions src/packages/emmett/src/typing/command.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import type { Flavour } from './';

import type { DefaultRecord, Flavour } from './';
export type Command<
CommandType extends string = string,
CommandData extends Record<string, unknown> = Record<string, unknown>,
CommandMetaData extends Record<string, unknown> = DefaultCommandMetadata,
CommandData extends DefaultRecord = DefaultRecord,
CommandMetaData extends DefaultRecord = DefaultCommandMetadata,
> = Flavour<
Readonly<{
type: CommandType;
Expand All @@ -13,4 +12,34 @@ export type Command<
'Command'
>;

export type CommandTypeOf<T extends Command> = T['type'];
export type CommandDataOf<T extends Command> = T['data'];
export type CommandMetaDataOf<T extends Command> = T['metadata'];

export type CreateCommandType<
CommandType extends string,
CommandData extends DefaultRecord,
CommandMetaData extends DefaultRecord | undefined,
> = Readonly<{
type: CommandType;
data: CommandData;
metadata?: CommandMetaData;
}>;

export const command = <CommandType extends Command>(
type: CommandTypeOf<CommandType>,
data: CommandDataOf<CommandType>,
metadata?: CommandMetaDataOf<CommandType>,
): CreateCommandType<
CommandTypeOf<CommandType>,
CommandDataOf<CommandType>,
CommandMetaDataOf<CommandType>
> => {
return {
type,
data,
metadata,
};
};

export type DefaultCommandMetadata = { now: Date };
51 changes: 51 additions & 0 deletions src/packages/emmett/src/typing/deepReadonly.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
type Primitive =
| undefined
| null
| boolean
| string
| number
| bigint
| symbol
// eslint-disable-next-line @typescript-eslint/ban-types
| Function;
type ImmutableTypes = Date | RegExp;

export type DeepReadonly<T> = T extends Primitive | ImmutableTypes
? T
: T extends Array<infer U>
? ReadonlyArray<DeepReadonly<U>>
: T extends Map<infer K, infer V>
? ReadonlyMap<DeepReadonly<K>, DeepReadonly<V>>
: T extends Set<infer M>
? ReadonlySet<DeepReadonly<M>>
: T extends Promise<infer U>
? Promise<DeepReadonly<U>>
: T extends object
? DeepReadonlyObject<T>
: Readonly<T>;

type DeepReadonlyObject<T> = {
readonly [P in keyof T]: DeepReadonly<T[P]>;
};

export type Mutable<T> = T extends Primitive
? T // Primitives are returned as-is
: T extends ReadonlyArray<infer U>
? MutableArray<U> // Handle ReadonlyArray
: T extends ReadonlyMap<infer K, infer V>
? MutableMap<K, V> // Handle ReadonlyMap
: T extends ReadonlySet<infer M>
? MutableSet<M> // Handle ReadonlySet
: // eslint-disable-next-line @typescript-eslint/ban-types
T extends Function
? T // Functions are returned as-is
: T extends object
? MutableObject<T> // Handle objects
: unknown; // Fallback type if none above match

type MutableArray<T> = Array<Mutable<T>>;
type MutableMap<K, V> = Map<Mutable<K>, Mutable<V>>;
type MutableSet<T> = Set<Mutable<T>>;
type MutableObject<T> = {
-readonly [P in keyof T]: Mutable<T[P]>;
};
40 changes: 35 additions & 5 deletions src/packages/emmett/src/typing/event.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,44 @@
import type { Flavour } from './';
import type { DefaultRecord, Flavour } from './';

export type Event<
EventType extends string = string,
EventData extends Record<string, unknown> = Record<string, unknown>,
EventMetaData extends Record<string, unknown> = Record<string, unknown>,
EventData extends DefaultRecord = DefaultRecord,
EventMetaData extends DefaultRecord = DefaultRecord,
> = Flavour<
Readonly<{
type: EventType;
data: Readonly<EventData>;
metadata?: EventMetaData | undefined;
data: EventData;
metadata?: EventMetaData;
}>,
'Event'
>;

export type EventTypeOf<T extends Event> = T['type'];
export type EventDataOf<T extends Event> = T['data'];
export type EventMetaDataOf<T extends Event> = T['metadata'];

export type CreateEventType<
EventType extends string,
EventData extends DefaultRecord,
EventMetaData extends DefaultRecord | undefined,
> = Readonly<{
type: EventType;
data: EventData;
metadata?: EventMetaData;
}>;

export const event = <EventType extends Event>(
type: EventTypeOf<EventType>,
data: EventDataOf<EventType>,
metadata?: EventMetaDataOf<EventType>,
): CreateEventType<
EventTypeOf<EventType>,
EventDataOf<EventType>,
EventMetaDataOf<EventType>
> => {
return {
type,
data,
metadata,
};
};
4 changes: 4 additions & 0 deletions src/packages/emmett/src/typing/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from './deepReadonly';

export * from './command';
export * from './event';

Expand All @@ -6,3 +8,5 @@ export * from './workflow';

export type Brand<K, T> = K & { readonly __brand: T };
export type Flavour<K, T> = K & { readonly __brand?: T };

export type DefaultRecord = Record<string, unknown>;

0 comments on commit e2681dd

Please sign in to comment.