Skip to content

Commit

Permalink
preliminary scopes implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
aidanm3341 committed Sep 3, 2024
1 parent 26e1256 commit cbb6b90
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 2 deletions.
17 changes: 17 additions & 0 deletions main/contracts/contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ export interface IMessage<T = any> {
* Represents a messagebroker and provides access to the core features which includes publishing/subscribing to messages and RSVP.
*/
export interface IMessageBroker<T> {
/**
* A reference to the parent scope if this is not the root node in the tree of scopes. If this is the root, it's undefined.
*/
readonly parent?: IMessageBroker<T>;
/**
* A list of all child scopes that have been created on this instance of the broker.
*/
readonly scopes: IMessageBroker<T>[];

/**
* Creates a new channel with the provided channelName. An optional config object can be passed that specifies how many messages to cache.
* No caching is set by default
Expand Down Expand Up @@ -96,6 +105,14 @@ export interface IMessageBroker<T> {
* This RSVP function is used by responders and is analogous to the 'Get' function. Responders when invoked must return the required response value type.
*/
rsvp<K extends keyof RSVPOf<T>>(channelName: K, handler: RSVPHandler<T>): IResponderRef;

/**
* Creates a new scope with the given scopeName with this instance of the MessageBroker as its parent.
* If a scope with this name already exists, it returns that instance instead of creating a new one.
* @param scopeName The name to use for the scope to create
* @returns An instance of the messagebroker that matches the scopeName provided
*/
createScope(scopeName: string): IMessageBroker<T>;
}

/**
Expand Down
31 changes: 29 additions & 2 deletions main/core/messagebroker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { get, Injectable } from '@morgan-stanley/needle';
import { get, getRootInjector, Injectable } from '@morgan-stanley/needle';
import { defer, Observable, Subject, Subscription } from 'rxjs';
import { filter, shareReplay } from 'rxjs/operators';
import { v4 as uuid } from 'uuid';
Expand Down Expand Up @@ -29,15 +29,18 @@ export function messagebroker<T = any>(): IMessageBroker<T> {
return instance;
}

const rootInjector = getRootInjector();

/**
* Represents a messagebroker. Using the 'new' operator is discouraged, instead use the messagebroker() function or dependency injection.
*/
@Injectable()
export class MessageBroker<T = any> implements IMessageBroker<T> {
private channelLookup: ChannelModelLookup<T> = {};
private messagePublisher = new Subject<IMessage<any>>();
private _scopes: IMessageBroker<T>[] = [];

constructor(private rsvpMediator: RSVPMediator<T>) {}
constructor(private rsvpMediator: RSVPMediator<T>, private _parent?: IMessageBroker<T>) {}

/**
* Creates a new channel with the provided channelName. An optional config object can be passed that specifies how many messages to cache.
Expand Down Expand Up @@ -99,6 +102,21 @@ export class MessageBroker<T = any> implements IMessageBroker<T> {
delete this.channelLookup[channelName];
}

/**
* Creates a new scope with the given scopeName with this instance of the MessageBroker as its parent.
* If a scope with this name already exists, it returns that instance instead of creating a new one.
* @param scopeName The name to use for the scope to create
* @returns An instance of the messagebroker that matches the scopeName provided
*/
public createScope(scopeName: string): IMessageBroker<T> {
const scope = rootInjector.createScope(scopeName);
scope.registerInstance(MessageBroker, new MessageBroker<T>(new RSVPMediator(), this));

const instance = scope.get(MessageBroker);
this._scopes.push(instance);
return instance;
}

/**
* Return a deferred observable as the channel config may have been updated before the subscription
* @param channelName name of channel to subscribe to
Expand Down Expand Up @@ -143,6 +161,7 @@ export class MessageBroker<T = any> implements IMessageBroker<T> {
}

const publishFunction = (data?: T[K], type?: string): void => {
this._scopes.forEach((scope) => scope.create(channelName).publish(data), type);
this.messagePublisher.next(this.createMessage(channelName, data, type));
};

Expand Down Expand Up @@ -180,4 +199,12 @@ export class MessageBroker<T = any> implements IMessageBroker<T> {
): channel is RequiredPick<IChannelModel<T[K]>, 'config' | 'subscription'> {
return channel != null && channel.subscription != null;
}

public get parent(): IMessageBroker<T> | undefined {
return this._parent;
}

public get scopes(): IMessageBroker<T>[] {
return this._scopes;
}
}
73 changes: 73 additions & 0 deletions spec/core/messagebroker.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,79 @@ describe('MessageBroker', () => {
});
});

describe('Scopes', () => {
it('should return a new messagebroker instance when creating a new scope', () => {
const instance = getInstance<IMySampleBroker>();
const scope = instance.createScope('scope1');

expect(scope).not.toEqual(instance);
});

it('should return same scope if same name is used', () => {
const instance = getInstance<IMySampleBroker>();
const scope = instance.createScope('scope1');
const sameScope = instance.createScope('scope1');

expect(scope).toEqual(sameScope);
});

it('should return itself when getting the parent of its child', () => {
const instance = getInstance<IMySampleBroker>();
const scope = instance.createScope('scope1');

expect(scope.parent).toEqual(instance);
});

it('should return a list of children scopes via scopes property', () => {
const instance = getInstance<IMySampleBroker>();
const scope1 = instance.createScope('scope1');
const scope2 = instance.createScope('scope2');
const scope3 = instance.createScope('scope3');

expect(instance.scopes).toEqual([scope1, scope2, scope3]);
});

it('should publish messages from parent to children', () => {
const parentMessages: Array<IMessage<string>> = [];
const childMessages: Array<IMessage<string>> = [];
const parent = getInstance();
const child = parent.createScope('scope1');

parent.get('channel').subscribe((message) => parentMessages.push(message));
child.get('channel').subscribe((message) => childMessages.push(message));

parent.create('channel').publish('both should get this');
child.create('channel').publish('only the child should get this');

expect(parentMessages.length).toEqual(1);
verifyMessage(parentMessages[0], 'both should get this');

expect(childMessages.length).toEqual(2);
verifyMessage(childMessages[0], 'both should get this');
verifyMessage(childMessages[1], 'only the child should get this');
});

it('should not publish messages to "sibling" scopes', () => {
const brotherMessages: Array<IMessage<string>> = [];
const sisterMessages: Array<IMessage<string>> = [];
const parent = getInstance();
const brother = parent.createScope('scope1');
const sister = parent.createScope('scope2');

brother.get('channel').subscribe((message) => brotherMessages.push(message));
sister.get('channel').subscribe((message) => sisterMessages.push(message));

brother.create('channel').publish('brother should get this');
sister.create('channel').publish('sister should get this');

expect(brotherMessages.length).toEqual(1);
verifyMessage(brotherMessages[0], 'brother should get this');

expect(sisterMessages.length).toEqual(1);
verifyMessage(sisterMessages[0], 'sister should get this');
});
});

function verifyMessage<T>(message: IMessage<T>, expectedData: T, expectedType?: string) {
expect(message).toBeDefined();
expect(message.data).toEqual(expectedData);
Expand Down

0 comments on commit cbb6b90

Please sign in to comment.