diff --git a/README.md b/README.md index 00decc0..129480e 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,27 @@ This module is a component of the [Chatie](https://www.chatie.io) service. - [CQRS + Event Sourcing – Step by Step](https://danielwhittaker.me/2020/02/20/cqrs-step-step-guide-flow-typical-application/) - Talk Video: [GOTO 2015 • DDD & Microservices: At Last, Some Boundaries! • Eric Evans](https://www.youtube.com/watch?v=yPvef9R3k-M) - Talk Video: [GOTO 2014 • Event Sourcing • Greg Young](https://www.youtube.com/watch?v=8JKjvY4etTY) +- Talk Video: [Azure Messaging: When to use what and why](https://www.youtube.com/watch?v=82akIKoX09A) +- Talk Video: [Data Consistency in Microservice Using Sagas by Chris Richardson, QCon 2018](https://www.infoq.com/presentations/saga-microservices/#downloadPdf/) +- [Saga: How to implement complex business transactions without two phase commit.](https://blog.bernd-ruecker.com/saga-how-to-implement-complex-business-transactions-without-two-phase-commit-e00aa41a1b1b) +- [Clarifying the Saga pattern](https://web.archive.org/web/20160730071315/http://kellabyte.com/2012/05/30/clarifying-the-saga-pattern/) + +### WebRTC + +- [Some redux tooling around creating peer to peer connections](https://github.com/jcblw/redux-peer-connection) +- [Simple WebRTC video, voice, and data channels](https://github.com/feross/simple-peer) + +## History + +### 0.5 master + +### 0.2 (Aug 2018) + +Io Server worked + +### 0.0.1 (May 2, 2017) + +Init code base ## Author diff --git a/examples/server.ts b/examples/server.ts index 9384309..2926b15 100644 --- a/examples/server.ts +++ b/examples/server.ts @@ -11,7 +11,7 @@ import { } from 'net' import { - IoServer, + IoHub, log, } from '../src/' @@ -22,18 +22,18 @@ async function main (): Promise { httpServer.listen(port, () => { const address = httpServer.address() as AddressInfo - log.info('IoServerExample', 'Listening on ' + address.port) + log.info('IoHubExample', 'Listening on ' + address.port) }) - const ioServer = new IoServer({ + const ioHub = new IoHub({ httpServer, }) try { - await ioServer.start() - log.info('IoServerExample', 'init succeed') + await ioHub.start() + log.info('IoHubExample', 'init succeed') } catch (e) { - log.error('IoServerExample', 'init failed: %s', e.message) + log.error('IoHubExample', 'init failed: %s', e.message) throw e } return 0 diff --git a/package.json b/package.json index 6bad54f..b0839a2 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@chatie/io", - "version": "0.3.13", - "description": "Cloud Input/Output Service for Chatie", + "version": "0.5.0", + "description": "IO Hub Service for Chatie Cloud", "main": "dist/src/index.js", "typings": "dist/src/index.d.ts", "directories": { @@ -9,11 +9,13 @@ "test": "tests" }, "dependencies": { - "brolog": "^1.8.1", - "listag": "^0.8.3", - "moment": "^2.22.2", + "brolog": "^1.8.3", + "listag": "^0.8.4", + "moment": "^2.24.0", "request-ip": "^2.1.3", - "ws": "^7.0.0" + "rxjs": "^6.5.4", + "state-switch": "^0.6.18", + "ws": "^7.2.3" }, "devDependencies": { "@chatie/eslint-config": "^0.8.1", @@ -21,10 +23,10 @@ "@chatie/semver": "^0.4.7", "@chatie/tsconfig": "^0.8.0", "@types/request-ip": "0.0.35", - "@types/ws": "^6.0.0", + "@types/ws": "^7.2.2", "pkg-jq": "^0.2.4", "shx": "^0.3.2", - "tstest": "^0.4.2" + "tstest": "^0.4.10" }, "scripts": { "clean": "shx rm -fr dist/*", diff --git a/src/server/io-auth.ts b/src/bus/io-auth.ts similarity index 100% rename from src/server/io-auth.ts rename to src/bus/io-auth.ts diff --git a/src/server/io-server.spec.ts b/src/bus/io-bus.spec.ts similarity index 55% rename from src/server/io-server.spec.ts rename to src/bus/io-bus.spec.ts index 6268f1a..e7952e2 100755 --- a/src/server/io-server.spec.ts +++ b/src/bus/io-bus.spec.ts @@ -6,12 +6,12 @@ import test from 'blue-tape' import http from 'http' import { - IoServer, -} from './io-server' + IoBus, +} from './io-bus' test('IoServer smoking test', async t => { const httpServer = http.createServer() - const ioServer = new IoServer({ httpServer }) + const ioBus = new IoBus({ httpServer }) - t.ok(ioServer, 'should instanciated an IoServer') + t.ok(ioBus, 'should instanciated an IoBus') }) diff --git a/src/server/io-server.ts b/src/bus/io-bus.ts similarity index 89% rename from src/server/io-server.ts rename to src/bus/io-bus.ts index 1932109..dbc76ce 100644 --- a/src/server/io-server.ts +++ b/src/bus/io-bus.ts @@ -39,12 +39,12 @@ import { IoSocket, } from './io-socket' -export interface IoServerOptions { +export interface IoBusOptions { httpServer : http.Server, httpPath? : string, } -export class IoServer { +export class IoBus { public static readonly VERSION = VERSION @@ -57,9 +57,9 @@ export class IoServer { * Constructor */ constructor ( - public options: IoServerOptions, + public options: IoBusOptions, ) { - log.verbose('IoServer', 'constructor()') + log.verbose('IoBus', 'constructor()') this.ioManager = new IoManager() this.ioAuth = new IoAuth() @@ -78,12 +78,12 @@ export class IoServer { } public async start () { - log.verbose('IoServer', 'start()') + log.verbose('IoBus', 'start()') await this.ioSocket.start() } public async stop () { - log.verbose('IoServer', 'stop()') + log.verbose('IoBus', 'stop()') // await this.ioSocket.stop() } diff --git a/src/server/io-manager.ts b/src/bus/io-manager.ts similarity index 100% rename from src/server/io-manager.ts rename to src/bus/io-manager.ts diff --git a/src/server/io-socket.ts b/src/bus/io-socket.ts similarity index 100% rename from src/server/io-socket.ts rename to src/bus/io-socket.ts diff --git a/src/config.ts b/src/config.ts index 911f116..14dd15e 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,3 +1,5 @@ +/// + export { log, } from 'brolog' diff --git a/src/index.ts b/src/index.ts index 7352ad3..addab97 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,8 +4,8 @@ export { } from './config' export { - IoServer, -} from './server/io-server' + IoBus, +} from './bus/io-bus' // export { // IoClient, diff --git a/src/io.spec.ts b/src/io.spec.ts new file mode 100644 index 0000000..7efa37a --- /dev/null +++ b/src/io.spec.ts @@ -0,0 +1,7 @@ +#!/usr/bin/env ts-node + +import test from 'blue-tape' + +test('tbw', async (t) => { + t.pass('ok') +}) diff --git a/src/io.ts b/src/io.ts new file mode 100644 index 0000000..b89e319 --- /dev/null +++ b/src/io.ts @@ -0,0 +1,196 @@ +import { StateSwitch } from 'state-switch' +import { Sockie } from 'sockie' + +import { + log, +} from './config' + +import { JsonRpc } from './json-rpc' +import { Selector } from './selector' + +interface IoEvent { + type: string, + channel: string, + source: string, + payload: object, +} + +interface BroadcastEvent { + source : string, + payload : object, +} + +export class Io { + + private state: StateSwitch + + private role: 'slave' | 'master' + + private sockie: undefined | Sockie + private jsonRpc: undefined | JsonRpc + + private cleanCallbackList: (() => void) [] + + constructor ( + public token: string, + ) { + log.verbose('Io', 'constructor(%s)', token) + + this.role = 'slave' + this.state = new StateSwitch(`io<${token}>`) + + this.cleanCallbackList = [] + } + + async start (master = false): Promise { + log.verbose('Io', 'start(master=%s)', master) + + if (this.state.on()) { + log.warn('Io', 'start() on an already started instance') + await this.state.on() + return + } + + try { + this.state.on('pending') + + this.sockie = new Sockie() + this.jsonRpc = new JsonRpc() + + this.sockie + .pipe(this.jsonRpc) + .pipe(this.sockie) + + if (master) { + await this.master(true) + } + + this.state.on(true) + } catch (e) { + log.error('Io', 'start() rejection: %s', e.message) + this.state.off(true) + } + } + + async stop (): Promise { + log.verbose('Io', 'stop()') + + if (this.state.off()) { + log.warn('Io', 'stop() on an already stopped instance') + await this.state.off() + return + } + + if (!this.jsonRpc) { + throw new Error('this.jsonRpc not found') + } + + if (!this.sockie) { + throw new Error('this.sockie not found!') + } + + try { + this.state.off('pending') + + log.silly('Io', 'stop() this.cleanCallbackList.length=%s', this.cleanCallbackList.length) + this.cleanCallbackList.forEach(callback => callback()) + this.cleanCallbackList = [] + + this.sockie.unsubscribe() + this.sockie = undefined + + this.jsonRpc = undefined + + } catch (e) { + log.error('Io', 'stop() rejection: %s', e.message) + } finally { + + this.state.off(true) + + } + } + + // Check whether this io instance is master + master (): boolean + // Announce this io instance to master + master (declare: true): Promise + + master (declare? : true): boolean | Promise { + log.verbose('Io', 'master(%s)', + typeof declare === 'undefined' + ? '' + : declare + ) + + /** + * Check if this instance is in role of master + */ + if (typeof declare === 'undefined') { + return this.role === 'master' + } + + /** + * Declare this instance to be a master + */ + return this.jsonRpc!.master(true) + .then((result: any) => { + if (!result) { + throw new Error('declare to be master failed! error message: ' + result.errorMessage) + } + return result + }) + } + + async addMethod ( + selector : Selector, + method : Function, + ): Promise { + log.verbose('Io', 'addMethod(%s, %s)', selector, method.name) + } + + async response ( + selector: Selector, + ): Promise { + log.verbose('Io', 'response(%s)', selector) + return false + } + + async perform ( + selector: () => T, + ): Promise { + log.verbose('Io', 'perform(%s)', selector) + return {} as T + } + + async broadcast (event: BroadcastEvent): Promise { + log.verbose('Io', 'broadcast("%s")', JSON.stringify(event)) + + this.jsonRpc.broadcast(event) + } + + /** + * Listen the broadcast io events + * + * @param event Event name + * @param listener Callback listener function + * + * @returns A callback that undo the listen + */ + listen ( + event : string, + listener : (event: BroadcastEvent) => void + ): () => void { + log.verbose('Io', 'listen(%s, %s)', event, listener) + + if (!this.jsonRpc) { + throw new Error('this.jsonRpc not found') + } + + const myListener = (event: BroadcastEvent) => listener(event) + this.jsonRpc.on(event, myListener) + + const offCallback = () => this.jsonRpc?.off(event, myListener) + return offCallback + } + +} diff --git a/src/json-rpc.ts b/src/json-rpc.ts new file mode 100644 index 0000000..815b4d7 --- /dev/null +++ b/src/json-rpc.ts @@ -0,0 +1,31 @@ +import { EventEmitter } from 'events' + +import { + log, +} from './config' +// import StateSwitch from 'state-switch' + +export class JsonRpc extends EventEmitter { + + constructor () { + super() + log.verbose('JsonRpc', 'constructor()') + + // this.state = new StateSwitch('JsonRpc') + } + + /** + * Get Root Privilidge + */ + su () { + + } + + /** + * Check whether this connection is a master connection + */ + async master (declare?: boolean) { + + } + +} diff --git a/src/selector.ts b/src/selector.ts new file mode 100644 index 0000000..a60c8dc --- /dev/null +++ b/src/selector.ts @@ -0,0 +1,13 @@ +import { + log, +} from './config' + +export class Selector { + + constructor ( + public method: T, + ) { + log.verbose('Selector', 'constructor(%s)', method) + } + +} diff --git a/src/sockie.spec.ts b/src/sockie.spec.ts new file mode 100644 index 0000000..fa6b46a --- /dev/null +++ b/src/sockie.spec.ts @@ -0,0 +1,12 @@ +#!/usr/bin/env ts-node + +import test from 'blue-tape' + +import { + Sockie, +} from './sockie' + +test('tbw', async (t) => { + const s = new Sockie() + t.ok(s, 's') +}) diff --git a/src/sockie.ts b/src/sockie.ts new file mode 100644 index 0000000..6ee3d34 --- /dev/null +++ b/src/sockie.ts @@ -0,0 +1,222 @@ +/** + * Making Use of WebSockets in Angular — Way Easier Than You Expected + * https://medium.com/briebug-blog/making-use-of-websockets-in-angular-way-easier-than-you-expected-25dd0061db1d + */ +import { + Observer, + Subject, + BehaviorSubject, + Subscribable, + Unsubscribable, + PartialObserver, + Observable, + of, +} from 'rxjs' +import { + switchMap, + retryWhen, + delay, + filter, +} from 'rxjs/operators' +import { + WebSocketSubject, + webSocket, + WebSocketSubjectConfig, +} from 'rxjs/webSocket' + +import ws, { + CloseEvent, + OpenEvent, +} from 'ws' + +import { + log, +} from './config' + +const RETRY_SECONDS = 1 + +export interface SockieOptions { + url : string, + protocol? : string | string[], +} + +/** + * Naming conventions for observables + * https://angular.io/guide/rx-library#naming-conventions-for-observables + */ +export class Sockie { + + public readonly url : string + public readonly protocol : undefined | string | string[] + /** + * Observers for the open/close/closing events + */ + protected readonly openSubject$ : Subject + protected readonly closeSubject$ : Subject + protected readonly closingSubject$ : Subject + + public get open$ () { return this.openSubject$.asObservable() } + public get close$ () { return this.closeSubject$.asObservable() } + public get closing$ () { return this.closingSubject$.asObservable() } + + /** + * The RxJS WebSocketSubject + */ + protected socket$: undefined | WebSocketSubject + + constructor ( + options: string | SockieOptions, + ) { + log.verbose('Sockie', 'constructor("%s")', JSON.stringify(options)) + + if (typeof options === 'string') { + this.url = options + } else { + this.url = options.url + this.protocol = options.protocol + } + + this.openSubject$ = new Subject() + this.closeSubject$ = new Subject() + this.closingSubject$ = new Subject() + } + + protected operatorSwitchMap () { + return switchMap((url: string) => { + if (this.socket$) { + return this.socket$ + } else { + const config = { + WebSocketCtor: ws, + + closeObserver : this.closeSubject$, + closingObserver : this.closingSubject$, + openObserver : this.openSubject$, + + protocol: this.protocol, + url, + } as WebSocketSubjectConfig + + this.socket$ = webSocket(config) + + return this.socket$ + } + }) + } + + protected operatorRetryWhen () { + return retryWhen( + errors => errors.pipe(delay(RETRY_SECONDS)), + ) + + // .retryWhen(errors=>{ + // return errors.delay(1000).scan((errorCount, err)=>{ + // if(errorCount < 3) return errorCount + 1 + // throw err + // }, 0) + // }) + } + + /** + * + * Subscribable + * + */ + subscribe ( + observer?: PartialObserver, + ): Unsubscribable { + const obs = of(this.url).pipe( + this.operatorSwitchMap(), + this.operatorRetryWhen(), + ) + return obs.subscribe(observer) + } + + // subscribeBak (): Unsubscribable { + // const obs = this.url$.pipe( + // switchMap(url => { + // if (this.socket$) { + // return this.socket$ + // } else { + // const config = { + // WebSocketCtor: ws, + + // closeObserver : this.closeSubject$, + // closingObserver : this.closingSubject$, + // openObserver : this.openSubject$, + + // protocol: this.protocol$.getValue(), + // url, + // } as WebSocketSubjectConfig + + // this.socket$ = webSocket(config) + + // return this.socket$ + // } + // }), + // retryWhen((errors) => errors.pipe(delay(RETRY_SECONDS))) + // ) + // return obs.subscribe(...arguments) + // } + + send (data: T) { + if (this.socket$) { + this.socket$.next(data) + } else { + throw new Error('no connection') + } + } + + close (error?: string) { + if (this.socket$) { + if (error) { + this.socket$.error(error) + } // else complete ??? + this.socket$.complete() + + this.socket$ = undefined + } + + this.cleanCallbackList.forEach(cb => cb()) + this.cleanCallbackList = [] + } + + /** + * + * Streaming APIs + * + */ + public pipe ( + destination: S, + options?: { + end?: boolean; + }, + ): S { + log.verbose('Sockie', 'pipe(%s)', destination) + + const next = (value: T) => destination.write(JSON.stringify(value)) + const error = (err: any) => destination.emit('error', err) + const complete = () => { + if (options && options.end !== false) { + destination.end() + } + } + + const observer = { complete, error, next } as Observer + + const sub = this.connect().subscribe(observer) + this.cleanCallbackList.push( + () => sub.unsubscribe(), + ) + + return destination + } + + +} + +destroyed$ = new Subject(); +this.webSocket.connect().pipe( + takeUntil(this.destroyed$) +).subscribe(messages => this.messages.push(messages)); +this.destroyed$.next(); diff --git a/src/typings.d.ts b/src/typings.d.ts new file mode 100644 index 0000000..5f56196 --- /dev/null +++ b/src/typings.d.ts @@ -0,0 +1 @@ +declare module 'json-rpc-peer'