Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring: IO Bus #20

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,27 @@ This module is a component of the [Chatie](https://www.chatie.io) service.
- [CQRS + Event Sourcing – Step by Step](https://danielwhittaker.me/2020/02/20/cqrs-step-step-guide-flow-typical-application/)
- Talk Video: [GOTO 2015 • DDD & Microservices: At Last, Some Boundaries! • Eric Evans](https://www.youtube.com/watch?v=yPvef9R3k-M)
- Talk Video: [GOTO 2014 • Event Sourcing • Greg Young](https://www.youtube.com/watch?v=8JKjvY4etTY)
- Talk Video: [Azure Messaging: When to use what and why](https://www.youtube.com/watch?v=82akIKoX09A)
- Talk Video: [Data Consistency in Microservice Using Sagas by Chris Richardson, QCon 2018](https://www.infoq.com/presentations/saga-microservices/#downloadPdf/)
- [Saga: How to implement complex business transactions without two phase commit.](https://blog.bernd-ruecker.com/saga-how-to-implement-complex-business-transactions-without-two-phase-commit-e00aa41a1b1b)
- [Clarifying the Saga pattern](https://web.archive.org/web/20160730071315/http://kellabyte.com/2012/05/30/clarifying-the-saga-pattern/)

### WebRTC

- [Some redux tooling around creating peer to peer connections](https://github.com/jcblw/redux-peer-connection)
- [Simple WebRTC video, voice, and data channels](https://github.com/feross/simple-peer)

## History

### 0.5 master

### 0.2 (Aug 2018)

Io Server worked

### 0.0.1 (May 2, 2017)

Init code base

## Author

Expand Down
12 changes: 6 additions & 6 deletions examples/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
} from 'net'

import {
IoServer,
IoHub,
log,
} from '../src/'

Expand All @@ -22,18 +22,18 @@ async function main (): Promise<number> {

httpServer.listen(port, () => {
const address = httpServer.address() as AddressInfo
log.info('IoServerExample', 'Listening on ' + address.port)
log.info('IoHubExample', 'Listening on ' + address.port)
})

const ioServer = new IoServer({
const ioHub = new IoHub({
httpServer,
})

try {
await ioServer.start()
log.info('IoServerExample', 'init succeed')
await ioHub.start()
log.info('IoHubExample', 'init succeed')
} catch (e) {
log.error('IoServerExample', 'init failed: %s', e.message)
log.error('IoHubExample', 'init failed: %s', e.message)
throw e
}
return 0
Expand Down
18 changes: 10 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
{
"name": "@chatie/io",
"version": "0.3.13",
"description": "Cloud Input/Output Service for Chatie",
"version": "0.5.0",
"description": "IO Hub Service for Chatie Cloud",
"main": "dist/src/index.js",
"typings": "dist/src/index.d.ts",
"directories": {
"doc": "docs",
"test": "tests"
},
"dependencies": {
"brolog": "^1.8.1",
"listag": "^0.8.3",
"moment": "^2.22.2",
"brolog": "^1.8.3",
"listag": "^0.8.4",
"moment": "^2.24.0",
"request-ip": "^2.1.3",
"ws": "^7.0.0"
"rxjs": "^6.5.4",
"state-switch": "^0.6.18",
"ws": "^7.2.3"
},
"devDependencies": {
"@chatie/eslint-config": "^0.8.1",
"@chatie/git-scripts": "^0.2.5",
"@chatie/semver": "^0.4.7",
"@chatie/tsconfig": "^0.8.0",
"@types/request-ip": "0.0.35",
"@types/ws": "^6.0.0",
"@types/ws": "^7.2.2",
"pkg-jq": "^0.2.4",
"shx": "^0.3.2",
"tstest": "^0.4.2"
"tstest": "^0.4.10"
},
"scripts": {
"clean": "shx rm -fr dist/*",
Expand Down
File renamed without changes.
8 changes: 4 additions & 4 deletions src/server/io-server.spec.ts → src/bus/io-bus.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import test from 'blue-tape'
import http from 'http'

import {
IoServer,
} from './io-server'
IoBus,
} from './io-bus'

test('IoServer smoking test', async t => {
const httpServer = http.createServer()
const ioServer = new IoServer({ httpServer })
const ioBus = new IoBus({ httpServer })

t.ok(ioServer, 'should instanciated an IoServer')
t.ok(ioBus, 'should instanciated an IoBus')
})
12 changes: 6 additions & 6 deletions src/server/io-server.ts → src/bus/io-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ import {
IoSocket,
} from './io-socket'

export interface IoServerOptions {
export interface IoBusOptions {
httpServer : http.Server,
httpPath? : string,
}

export class IoServer {
export class IoBus {

public static readonly VERSION = VERSION

Expand All @@ -57,9 +57,9 @@ export class IoServer {
* Constructor
*/
constructor (
public options: IoServerOptions,
public options: IoBusOptions,
) {
log.verbose('IoServer', 'constructor()')
log.verbose('IoBus', 'constructor()')

this.ioManager = new IoManager()
this.ioAuth = new IoAuth()
Expand All @@ -78,12 +78,12 @@ export class IoServer {
}

public async start () {
log.verbose('IoServer', 'start()')
log.verbose('IoBus', 'start()')
await this.ioSocket.start()
}

public async stop () {
log.verbose('IoServer', 'stop()')
log.verbose('IoBus', 'stop()')
// await this.ioSocket.stop()
}

Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 2 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// <reference path="./typings.d.ts" />

export {
log,
} from 'brolog'
Expand Down
4 changes: 2 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ export {
} from './config'

export {
IoServer,
} from './server/io-server'
IoBus,
} from './bus/io-bus'

// export {
// IoClient,
Expand Down
7 changes: 7 additions & 0 deletions src/io.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env ts-node

import test from 'blue-tape'

test('tbw', async (t) => {
t.pass('ok')
})
196 changes: 196 additions & 0 deletions src/io.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import { StateSwitch } from 'state-switch'
import { Sockie } from 'sockie'

import {
log,
} from './config'

import { JsonRpc } from './json-rpc'
import { Selector } from './selector'

interface IoEvent {
type: string,
channel: string,
source: string,
payload: object,
}

interface BroadcastEvent {
source : string,
payload : object,
}

export class Io {

private state: StateSwitch

private role: 'slave' | 'master'

private sockie: undefined | Sockie
private jsonRpc: undefined | JsonRpc

private cleanCallbackList: (() => void) []

constructor (
public token: string,
) {
log.verbose('Io', 'constructor(%s)', token)

this.role = 'slave'
this.state = new StateSwitch(`io<${token}>`)

this.cleanCallbackList = []
}

async start (master = false): Promise<void> {
log.verbose('Io', 'start(master=%s)', master)

if (this.state.on()) {
log.warn('Io', 'start() on an already started instance')
await this.state.on()
return
}

try {
this.state.on('pending')

this.sockie = new Sockie()
this.jsonRpc = new JsonRpc()

this.sockie
.pipe(this.jsonRpc)
.pipe(this.sockie)

if (master) {
await this.master(true)
}

this.state.on(true)
} catch (e) {
log.error('Io', 'start() rejection: %s', e.message)
this.state.off(true)
}
}

async stop (): Promise<void> {
log.verbose('Io', 'stop()')

if (this.state.off()) {
log.warn('Io', 'stop() on an already stopped instance')
await this.state.off()
return
}

if (!this.jsonRpc) {
throw new Error('this.jsonRpc not found')
}

if (!this.sockie) {
throw new Error('this.sockie not found!')
}

try {
this.state.off('pending')

log.silly('Io', 'stop() this.cleanCallbackList.length=%s', this.cleanCallbackList.length)
this.cleanCallbackList.forEach(callback => callback())
this.cleanCallbackList = []

this.sockie.unsubscribe()
this.sockie = undefined

this.jsonRpc = undefined

} catch (e) {
log.error('Io', 'stop() rejection: %s', e.message)
} finally {

this.state.off(true)

}
}

// Check whether this io instance is master
master (): boolean
// Announce this io instance to master
master (declare: true): Promise<void>

master (declare? : true): boolean | Promise<void> {
log.verbose('Io', 'master(%s)',
typeof declare === 'undefined'
? ''
: declare
)

/**
* Check if this instance is in role of master
*/
if (typeof declare === 'undefined') {
return this.role === 'master'
}

/**
* Declare this instance to be a master
*/
return this.jsonRpc!.master(true)
.then((result: any) => {
if (!result) {
throw new Error('declare to be master failed! error message: ' + result.errorMessage)
}
return result
})
}

async addMethod<T extends Function> (
selector : Selector<T>,
method : Function,
): Promise<void> {
log.verbose('Io', 'addMethod(%s, %s)', selector, method.name)
}

async response<T> (
selector: Selector<T>,
): Promise<boolean> {
log.verbose('Io', 'response(%s)', selector)
return false
}

async perform<T> (
selector: () => T,
): Promise<T> {
log.verbose('Io', 'perform(%s)', selector)
return {} as T
}

async broadcast (event: BroadcastEvent): Promise<void> {
log.verbose('Io', 'broadcast("%s")', JSON.stringify(event))

this.jsonRpc.broadcast(event)
}

/**
* Listen the broadcast io events
*
* @param event Event name
* @param listener Callback listener function
*
* @returns A callback that undo the listen
*/
listen (
event : string,
listener : (event: BroadcastEvent) => void
): () => void {
log.verbose('Io', 'listen(%s, %s)', event, listener)

if (!this.jsonRpc) {
throw new Error('this.jsonRpc not found')
}

const myListener = (event: BroadcastEvent) => listener(event)
this.jsonRpc.on(event, myListener)

const offCallback = () => this.jsonRpc?.off(event, myListener)
return offCallback
}

}
Loading