Skip to content

Commit

Permalink
Merge pull request #1 from Mahad-10/implement-call
Browse files Browse the repository at this point in the history
Implement call
  • Loading branch information
Mahad-10 authored Oct 5, 2024
2 parents 0f6625b + 62e956a commit 5f1a99a
Show file tree
Hide file tree
Showing 8 changed files with 633 additions and 2 deletions.
22 changes: 22 additions & 0 deletions lib/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import {ClientAuthenticator, Serializer} from "wampproto";

import {Session} from "./session";
import {BaseSession} from "./types";
import {WAMPSessionJoiner} from "./joiner";


export class Client {
private readonly _authenticator?: ClientAuthenticator;
private readonly _serializer?: Serializer;

constructor(clientOptions: { authenticator?: ClientAuthenticator; serializer?: Serializer } = {}) {
this._authenticator = clientOptions.authenticator;
this._serializer = clientOptions.serializer;
}

async connect(url: string, realm: string): Promise<Session> {
const joiner = new WAMPSessionJoiner({authenticator: this._authenticator, serializer: this._serializer});
const baseSession: BaseSession = await joiner.join(url, realm);
return new Session(baseSession);
}
}
37 changes: 37 additions & 0 deletions lib/exception.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
export class ApplicationError extends Error {
public readonly message: string;
public readonly args?: any[];
public readonly kwargs?: { [key: string]: any } | null;

constructor(message: string, options?: { args?: any[]; kwargs?: { [key: string]: any } | null }) {
super(message);
this.message = message;
this.args = options?.args;
this.kwargs = options?.kwargs;
}

toString(): string {
let errStr = this.message;

if (this.args && this.args.length > 0) {
const argsStr = this.args.map(arg => arg.toString()).join(", ");
errStr += `: ${argsStr}`;
}

if (this.kwargs && Object.keys(this.kwargs).length > 0) {
const kwargsStr = Object.entries(this.kwargs)
.map(([key, value]) => `${key}=${value}`)
.join(", ");
errStr += `: ${kwargsStr}`;
}

return errStr;
}
}

export class ProtocolError extends Error {
constructor(public readonly message: string) {
super(message);
this.message = message;
}
}
33 changes: 33 additions & 0 deletions lib/helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import {JSONSerializer, CBORSerializer, MsgPackSerializer, Serializer, Error as ErrorMsg} from "wampproto";

const jsonSubProtocol = "wamp.2.json";
const cborSubProtocol = "wamp.2.cbor";
const msgpackSubProtocol = "wamp.2.msgpack";


export function getSubProtocol(serializer: Serializer): string {
if (serializer instanceof JSONSerializer) {
return jsonSubProtocol;
} else if (serializer instanceof CBORSerializer) {
return cborSubProtocol;
} else if (serializer instanceof MsgPackSerializer) {
return msgpackSubProtocol;
} else {
throw new Error("invalid serializer");
}
}

export function wampErrorString(err: ErrorMsg): string {
let errStr = err.uri;
if (err.args && err.args.length > 0) {
const args = err.args.map((arg: any) => arg.toString()).join(", ");
errStr += `: ${args}`;
}
if (err.kwargs && Object.keys(err.kwargs).length > 0) {
const kwargs = Object.entries(err.kwargs)
.map(([key, value]) => `${key}=${value}`)
.join(", ");
errStr += `: ${kwargs}`;
}
return errStr;
}
53 changes: 53 additions & 0 deletions lib/joiner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import {Joiner, ClientAuthenticator, Serializer, JSONSerializer} from 'wampproto';

import {BaseSession} from './types';
import {getSubProtocol} from './helpers';


export class WAMPSessionJoiner {
private readonly _authenticator?: ClientAuthenticator;
private readonly _serializer: Serializer;

constructor(joinerOptions: { authenticator?: ClientAuthenticator, serializer?: Serializer }) {
this._serializer = joinerOptions.serializer || new JSONSerializer();
this._authenticator = joinerOptions.authenticator;
}

async join(uri: string, realm: string): Promise<BaseSession> {
const ws = new WebSocket(uri, [getSubProtocol(this._serializer)]);

const joiner = new Joiner(realm, this._serializer, this._authenticator);

ws.addEventListener('open', () => {
ws.send(joiner.sendHello());
});

return new Promise<BaseSession>((resolve, reject) => {
const wsMessageHandler = (event: MessageEvent) => {
try {
const toSend = joiner.receive(event.data);
if (!toSend) {
ws.removeEventListener('message', wsMessageHandler);
ws.removeEventListener('close', closeHandler);

const baseSession = new BaseSession(ws, wsMessageHandler, joiner.getSessionDetails(), this._serializer);
resolve(baseSession);
} else {
ws.send(toSend);
}
} catch (error) {
reject(error);
}
};

const closeHandler = () => {
ws.removeEventListener('message', wsMessageHandler);
reject(new Error('Connection closed before handshake completed'));
};

ws.addEventListener('message', wsMessageHandler);
ws.addEventListener('error', (error) => reject(error));
ws.addEventListener('close', closeHandler);
});
}
}
95 changes: 95 additions & 0 deletions lib/session.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import {
WAMPSession,
SessionScopeIDGenerator,
Message,
Call,
Result as ResultMsg,
Error as ErrorMsg, CallFields
} from "wampproto";

import {wampErrorString} from "./helpers";
import {IBaseSession, Result} from "./types";
import {ApplicationError, ProtocolError} from "./exception";


export class Session {
private _baseSession: IBaseSession;
private _wampSession: WAMPSession;
private _idGen: SessionScopeIDGenerator = new SessionScopeIDGenerator();

private _callRequests: Map<number, {
resolve: (value?: Result) => void,
reject: (reason?: ApplicationError) => void
}> = new Map();

constructor(baseSession: IBaseSession) {
this._baseSession = baseSession;
this._wampSession = new WAMPSession(baseSession.serializer());

(async () => {
for (; ;) {
const message = await this._baseSession.receive();
this._processIncomingMessage(this._wampSession.receive(message));
}
})();
}

private get _nextID(): number {
return this._idGen.next();
}

async close(): Promise<void> {
await this._baseSession.close();
}

private _processIncomingMessage(message: Message): void {
if (message instanceof ResultMsg) {
const promiseHandler = this._callRequests.get(message.requestID);
promiseHandler?.resolve(new Result(message.args, message.kwargs, message.options));
} else if (message instanceof ErrorMsg) {
switch (message.messageType) {
case Call.TYPE: {
const promiseHandler = this._callRequests.get(message.requestID);
promiseHandler?.reject(new ApplicationError(message.uri, {
args: message.args,
kwargs: message.kwargs,
}));

this._callRequests.delete(message.requestID);
break;
}
default:
throw new ProtocolError(wampErrorString(message));
}
} else {
throw new ProtocolError(`Unexpected message type ${typeof message}`);
}
}

async call(
procedure: string,
callOptions: {
args?: any[] | null,
kwargs?: { [key: string]: any } | null,
options?: { [key: string]: any } | null
} = {}
): Promise<Result> {
const call = new Call(new CallFields(
this._nextID, procedure, callOptions.args, callOptions.kwargs, callOptions.options)
);

let promiseHandler: {
resolve: (value: Result | PromiseLike<Result>) => void;
reject: (reason?: ApplicationError) => void
};

const promise = new Promise<Result>((resolve, reject) => {
promiseHandler = {resolve, reject};
});

this._callRequests.set(call.requestID, promiseHandler);
this._baseSession.send(this._wampSession.sendMessage(call));

return promise;
}
}
133 changes: 133 additions & 0 deletions lib/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import {Serializer, Message, SessionDetails} from "wampproto";


export abstract class IBaseSession {
id(): number {
throw new Error("UnimplementedError");
}

realm(): string {
throw new Error("UnimplementedError");
}

authid(): string {
throw new Error("UnimplementedError");
}

authrole(): string {
throw new Error("UnimplementedError");
}

serializer(): Serializer {
throw new Error("UnimplementedError");
}

send(data: any): void {
throw new Error("UnimplementedError");
}

async receive(): Promise<any> {
throw new Error("UnimplementedError");
}

sendMessage(msg: Message): void {
throw new Error("UnimplementedError");
}

async receiveMessage(): Promise<Message> {
throw new Error("UnimplementedError");
}

async close(): Promise<void> {
throw new Error("UnimplementedError");
}
}

export class BaseSession extends IBaseSession {
private readonly _ws: WebSocket;
private readonly _wsMessageHandler: any;
private readonly sessionDetails: SessionDetails;
private readonly _serializer: Serializer;

constructor(
ws: WebSocket,
wsMessageHandler: any,
sessionDetails: SessionDetails,
serializer: Serializer
) {
super();
this._ws = ws;
this._wsMessageHandler = wsMessageHandler;
this.sessionDetails = sessionDetails;
this._serializer = serializer;

// close cleanly on abrupt client disconnect
this._ws.addEventListener("close", async () => {
await this.close();
});
}

id(): number {
return this.sessionDetails.sessionID;
}

realm(): string {
return this.sessionDetails.realm;
}

authid(): string {
return this.sessionDetails.authid;
}

authrole(): string {
return this.sessionDetails.authrole;
}

serializer(): Serializer {
return this._serializer;
}

send(data: any): void {
this._ws.send(data);
}

sendMessage(msg: Message): void {
this.send(this._serializer.serialize(msg));
}

async receive(): Promise<any> {
return new Promise((resolve) => {
const messageHandler = (event: MessageEvent) => {
resolve(event.data);
this._ws.removeEventListener("message", messageHandler);
};

this._ws.addEventListener("message", messageHandler, {once: true});
});
}

async receiveMessage(): Promise<Message> {
return this._serializer.deserialize(await this.receive());
}

async close(): Promise<void> {
if (this._wsMessageHandler) {
this._ws.removeEventListener("message", this._wsMessageHandler);
this._ws.removeEventListener("close", this._wsMessageHandler);
}

this._ws.close();
}
}

export class Result {
args: any[];
kwargs: { [key: string]: any };
details: { [key: string]: any };

constructor(args?: any[], kwargs?: { [key: string]: any }, details?: { [key: string]: any }) {
this.args = args || [];
this.kwargs = kwargs || {};
this.details = details || {};
}
}
Loading

0 comments on commit 5f1a99a

Please sign in to comment.