diff --git a/package.json b/package.json index 005c894..9333f84 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "tgrid", - "version": "1.0.3", + "version": "1.1.0", "main": "lib/index.js", "typings": "lib/index.d.ts", "exports": { diff --git a/src/components/Communicator.ts b/src/components/Communicator.ts index 11a2b6f..7d20a9e 100644 --- a/src/components/Communicator.ts +++ b/src/components/Communicator.ts @@ -1,8 +1,9 @@ -import { ConditionVariable, HashMap, Pair } from "tstl"; +import { ConditionVariable, HashMap, HashSet } from "tstl"; import { Driver } from "../typings/Driver"; import { serializeError } from "../utils/internal/serializeError"; import { Invoke } from "./Invoke"; +import { InvokeEvent } from "./InvokeEvent"; /** * The basic communicator. @@ -44,7 +45,15 @@ export abstract class Communicator< /** * @hidden */ - private promises_: HashMap>; + private promises_: HashMap; + + /** + * @hidden + */ + private event_listeners_: HashMap< + InvokeEvent.Type, + HashSet<(event: InvokeEvent) => void> + >; /** * @hidden @@ -72,6 +81,50 @@ export abstract class Communicator< // OTHER MEMBERS this.promises_ = new HashMap(); this.join_cv_ = new ConditionVariable(); + this.event_listeners_ = new HashMap(); + } + + /** + * Add invoke event listener. + * + * Add an event listener for the invoke event. The event listener would be called + * when some invoke event has been occured; sending, receiving, completing, or returning. + * + * If you change the requesting parameters or returning value in the event listener, + * it would affect to the RPC (Remote Procedure Call) communication. Therefore, you have + * to be careful when modifying the remote function calling. + * + * Of course, you can utilize the event listener just for monitoring the RPC events. + * + * @param type Type of the event + * @param listener The listener function to enroll + */ + public on( + type: Type, + listener: (event: InvokeEvent.EventMapper[Type]) => void, + ): void { + this.event_listeners_ + .take(type, () => new HashSet()) + .insert(listener as (event: InvokeEvent) => void); + } + + /** + * Erase invoke event listener. + * + * Erase an event listener from the invoke event. The event listener would not be + * called anymore when the specific invoke event has been occured. + * + * @param type Type of the event + * @param listener The listener function to erase + */ + public off( + type: Type, + listener: (event: InvokeEvent.EventMapper[Type]) => void, + ): void { + const it = this.event_listeners_.find(type); + if (it.equals(this.event_listeners_.end()) === false) + it.second.erase(listener as (event: InvokeEvent) => void); + if (it.second.empty()) this.event_listeners_.erase(it); } /** @@ -93,7 +146,7 @@ export abstract class Communicator< : new Error("Connection has been closed."); for (const entry of this.promises_) { - const reject: FunctionLike = entry.second.second; + const reject: FunctionLike = entry.second.reject; reject(rejectError); } @@ -116,7 +169,6 @@ export abstract class Communicator< */ private _Proxy_func(name: string): FunctionLike { const func = (...params: any[]) => this._Call_function(name, ...params); - return new Proxy(func, { get: ({}, newName: string) => { if (newName === "bind") @@ -125,7 +177,6 @@ export abstract class Communicator< return (thisArg: any, ...args: any[]) => func.call(thisArg, ...args); else if (newName === "apply") return (thisArg: any, args: any[]) => func.apply(thisArg, args); - return this._Proxy_func(`${name}.${newName}`); }, }); @@ -155,8 +206,27 @@ export abstract class Communicator< })), }; + // CALL EVENT LISTENERS + const eventSetIterator = this.event_listeners_.find("send"); + if (eventSetIterator.equals(this.event_listeners_.end()) === false) { + const event: InvokeEvent.ISend = { + type: "send", + time: new Date(), + function: invoke, + }; + for (const listener of eventSetIterator.second) + try { + listener(event); + } catch {} + } + // DO SEND WITH PROMISE - this.promises_.emplace(invoke.uid, new Pair(resolve, reject)); + this.promises_.emplace(invoke.uid, { + function: invoke, + time: new Date(), + resolve, + reject, + }); await this.sendData(invoke); }); } @@ -271,7 +341,7 @@ export abstract class Communicator< protected replyData(invoke: Invoke): void { if ((invoke as Invoke.IFunction).listener) this._Handle_function(invoke as Invoke.IFunction).catch(() => {}); - else this._Handle_return(invoke as Invoke.IReturn); + else this._Handle_complete(invoke as Invoke.IReturn); } /** @@ -279,6 +349,7 @@ export abstract class Communicator< */ private async _Handle_function(invoke: Invoke.IFunction): Promise { const uid: number = invoke.uid; + const time: Date = new Date(); try { //---- @@ -323,33 +394,80 @@ export abstract class Communicator< } func = func.bind(thisArg); + // CALL EVENT LISTENERS + const eventSetIterator: HashMap.Iterator< + InvokeEvent.Type, + HashSet<(event: InvokeEvent) => void> + > = this.event_listeners_.find("receive"); + if (eventSetIterator.equals(this.event_listeners_.end()) === false) { + const event: InvokeEvent.IReceive = { + type: "receive", + time, + function: invoke, + }; + for (const closure of eventSetIterator.second) + try { + closure(event); + } catch {} + } + //---- // RETURN VALUE //---- // CALL FUNCTION const parameters: any[] = invoke.parameters.map((p) => p.value); - const ret: any = await func(...parameters); - - await this._Send_return(uid, true, ret); + const result: any = await func(...parameters); + await this._Send_return({ + invoke, + time, + return: { + uid, + success: true, + value: result, + }, + }); } catch (exp) { - await this._Send_return(uid, false, exp); + await this._Send_return({ + invoke, + time, + return: { + uid, + success: false, + value: exp, + }, + }); } } /** * @hidden */ - private _Handle_return(invoke: Invoke.IReturn): void { - // GET THE PROMISE OBJECT + private _Handle_complete(invoke: Invoke.IReturn): void { + // FIND TARGET FUNCTION CALL const it = this.promises_.find(invoke.uid); if (it.equals(this.promises_.end())) return; + // CALL EVENT LISTENERS + const eventSetIterator = this.event_listeners_.find("complete"); + if (eventSetIterator.equals(this.event_listeners_.end()) === false) { + const event: InvokeEvent.IComplete = { + type: "complete", + function: it.second.function, + return: invoke, + requested_at: it.second.time, + completed_at: new Date(), + }; + for (const closure of eventSetIterator.second) + try { + closure(event); + } catch {} + } + // RETURNS const func: FunctionLike = invoke.success - ? it.second.first - : it.second.second; + ? it.second.resolve + : it.second.reject; this.promises_.erase(it); - func(invoke.value); } @@ -366,23 +484,39 @@ export abstract class Communicator< /** * @hidden */ - private async _Send_return( - uid: number, - success: boolean, - value: any, - ): Promise { + private async _Send_return(props: { + invoke: Invoke.IFunction; + return: Invoke.IReturn; + time: Date; + }): Promise { + const eventSet = this.event_listeners_.find("return"); + if (eventSet.equals(this.event_listeners_.end()) === false) { + const event: InvokeEvent.IReturn = { + type: "return", + function: props.invoke, + return: props.return, + requested_at: props.time, + completed_at: new Date(), + }; + for (const closure of eventSet.second) + try { + closure(event); + } catch {} + } + // SPECIAL LOGIC FOR ERROR -> FOR CLEAR JSON ENCODING - if (success === false && value instanceof Error) - value = serializeError(value); + if (props.return.success === false && props.return.value instanceof Error) + props.return.value = serializeError(props.return.value); // RETURNS - const ret: Invoke.IReturn = { - uid, - success, - value, - }; - await this.sendData(ret); + await this.sendData(props.return); } } type FunctionLike = (...args: any[]) => any; +interface IFunctionReservation { + function: Invoke.IFunction; + time: Date; + resolve: FunctionLike; + reject: FunctionLike; +} diff --git a/src/components/Invoke.ts b/src/components/Invoke.ts index 7039478..4995163 100644 --- a/src/components/Invoke.ts +++ b/src/components/Invoke.ts @@ -13,12 +13,12 @@ export namespace Invoke { /** * Unique identifier. */ - uid: number; + readonly uid: number; /** * Target function (sometimes calsuled in objects) to call. */ - listener: string; + readonly listener: string; /** * Parameters for the function call. @@ -49,12 +49,12 @@ export namespace Invoke { /** * Unique identifier. */ - uid: number; + readonly uid: number; /** * `true` -> return, `false` -> exception. */ - success: boolean; + readonly success: boolean; /** * Returned value or thrown exception. diff --git a/src/components/InvokeEvent.ts b/src/components/InvokeEvent.ts new file mode 100644 index 0000000..a229db0 --- /dev/null +++ b/src/components/InvokeEvent.ts @@ -0,0 +1,144 @@ +import { Invoke } from "./Invoke"; + +/** + * Event of the RPC invocation. + * + * `InvokeEvent` is a type for the RPC invocation event. It's a type for + * the event object that is dispatched when an RPC invocation is sended, + * received, completed or returned. + * + * For reference, "send" and "commplete" events are dispatched from the + * function calling request side, and "receive" and "return" events are + * dispatched from the function calling executor side. + * + * @author Jeongho Nam - https://github.com/samchon + */ +export type InvokeEvent = + | InvokeEvent.ISend + | InvokeEvent.IReceive + | InvokeEvent.IComplete + | InvokeEvent.IReturn; +export namespace InvokeEvent { + /** + * Type of the invoke event. + */ + export type Type = "send" | "receive" | "complete" | "return"; + + /** + * Event mapper for the RPC invocation. + */ + export interface EventMapper { + send: ISend; + receive: IReceive; + complete: IComplete; + return: IReturn; + } + + /** + * RPC message send event. + * + * Dispatched when request function calling to the remote system. + */ + export interface ISend { + /** + * Discriminator for the type of the event. + */ + readonly type: "send"; + + /** + * Time when the event is occurred. + */ + readonly time: Date; + + /** + * Sending message to be invoked. + */ + readonly function: Invoke.IFunction; + } + + /** + * RPC message receive event. + * + * Dispatched when requsted function calling from the remote system. + */ + export interface IReceive { + /** + * Discriminator for the type of the event. + */ + readonly type: "receive"; + + /** + * Time when the event is occurred. + */ + readonly time: Date; + + /** + * Received message to be invoked. + */ + readonly function: Invoke.IFunction; + } + + /** + * RPC message complete event. + * + * Dispatched when request function calling is completed by the remote system. + */ + export interface IComplete { + /** + * Discriminator for the type of the event. + */ + readonly type: "complete"; + + /** + * Function calling request. + */ + readonly function: Invoke.IFunction; + + /** + * Returned value from the remote function calling. + */ + readonly return: Invoke.IReturn; + + /** + * Time when the function calling is requested. + */ + readonly requested_at: Date; + + /** + * Time when the function calling is completed. + */ + readonly completed_at: Date; + } + + /** + * RPC message return event. + * + * Dispatched when return a result of the function calling from the remote system. + */ + export interface IReturn { + /** + * Discriminator for the type of the event. + */ + readonly type: "return"; + + /** + * Function calling requested by the remote system. + */ + readonly function: Invoke.IFunction; + + /** + * Return value that would be sent to the remove system. + */ + readonly return: Invoke.IReturn; + + /** + * Time when the function calling is requested by the remote system. + */ + readonly requested_at: Date; + + /** + * Time when the function calling is completed. + */ + readonly completed_at: Date; + } +} diff --git a/test/node/protocols/web/test_web_event.ts b/test/node/protocols/web/test_web_event.ts new file mode 100644 index 0000000..fb00006 --- /dev/null +++ b/test/node/protocols/web/test_web_event.ts @@ -0,0 +1,60 @@ +import { + Driver, + WebSocketAcceptor, + WebSocketConnector, + WebSocketServer, +} from "tgrid"; + +import { ICalculator } from "../../../controllers/ICalculator"; +import { Calculator } from "../../../providers/Calculator"; + +const PORT: number = 10171; + +export async function test_web_event(): Promise { + //---- + // HANDSHAKE + //---- + const server: WebSocketServer = + new WebSocketServer(); + let acceptor!: WebSocketAcceptor; + await server.open(PORT, async (a) => { + acceptor = a; + acceptor.accept(new Calculator()); + }); + + const connector: WebSocketConnector = + new WebSocketConnector(null, null); + await connector.connect(`ws://127.0.0.1:${PORT}`); + + //---- + // EVENT LISTENERS + //---- + connector.on("send", (event) => { + if (event.function.listener === "plus") { + event.function.parameters[0].value = 3; + event.function.parameters[1].value = 4; + } + }); + connector.on("complete", (event) => { + if (event.function.listener === "plus") event.return.value += 0.2; + }); + + acceptor.on("receive", (event) => { + if (event.function.listener === "multiplies") { + event.function.parameters[0].value = 2; + event.function.parameters[1].value = 7; + } + }); + acceptor.on("return", (event) => { + if (event.function.listener === "multiplies") event.return.value += 0.1; + }); + + // VALIDATE + const driver: Driver = connector.getDriver(); + if (7.2 !== (await driver.plus(1, 1))) + throw new Error("Failed to hook request evevnt"); + if (14.1 !== (await driver.multiplies(1, 1))) + throw new Error("Failed to hook response event"); + + await server.close(); +}