Skip to content

Commit

Permalink
Merge branch 'release/v0.22.4'
Browse files Browse the repository at this point in the history
  • Loading branch information
holtwick committed Jul 12, 2024
2 parents 7ef7790 + 19c6ad5 commit f80f4ca
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 15 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "zeed",
"type": "module",
"version": "0.22.3",
"version": "0.22.4",
"description": "🌱 Simple foundation library",
"author": {
"name": "Dirk Holtwick",
Expand Down
25 changes: 25 additions & 0 deletions src/common/data/string-hash-pool.spec.ts

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions src/common/data/string-hash-pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { stringHashFNV1a } from './string-hash-fnv'

export function useStringHashPool() {
const hashToString: Map<number, string> = new Map()
const stringToHash: Map<string, number> = new Map()

function hash(s: string) {
let v: number | undefined = stringToHash.get(s)
if (v == null) {
v = stringHashFNV1a(s)
if (hashToString.has(v))
throw new Error(`Hash for '${s}' is already in use for '${hashToString.get(v)}`)
hashToString.set(v, s)
stringToHash.set(s, v)
}
return v
}

function stringForHash(v: number) {
return hashToString.get(v)
}

return {
hash,
stringForHash,
}
}

export type UseStringHashPool = ReturnType<typeof useStringHashPool>
47 changes: 44 additions & 3 deletions src/common/msg/rpc.xspec.ts → src/common/msg/rpc.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { MessageChannel } from 'node:worker_threads'
import { decodeJson, encodeJson } from '../bin'
import { createLocalChannelPair } from '../msg/channel-local'
import { cloneObject } from '../data'
import { useStringHashPool } from '../data/string-hash-pool'
import { sleep } from '../exec/promise'
import { createLocalChannelPair } from '../msg/channel-local'
import { useRPC, useRPCHub } from './rpc'

let bobCount = 0
Expand All @@ -27,22 +29,27 @@ const Alice = {
type BobFunctions = typeof Bob
type AliceFunctions = typeof Alice

describe.skip('rpc async', () => {
describe('rpc async', () => {
beforeEach(() => {
bobCount = 0
})

it('basic', async () => {
const log: any[] = []
const channel = new MessageChannel()

const serialize = (data: any) => encodeJson(data)
const serialize = (data: any) => {
log.push(cloneObject(data))
return encodeJson(data)
}
const deserialize = (data: any) => decodeJson(data)

const bob = useRPC<BobFunctions, AliceFunctions>(Bob, {
post: data => channel.port1.postMessage(data),
on: data => channel.port1.on('message', data),
serialize,
deserialize,
stringHashPool: useStringHashPool(),
})

const alice = useRPC<AliceFunctions, BobFunctions>(Alice, {
Expand All @@ -53,6 +60,7 @@ describe.skip('rpc async', () => {
on: data => channel.port2.on('message', data),
serialize,
deserialize,
stringHashPool: useStringHashPool(),
})

// RPCs
Expand All @@ -68,6 +76,37 @@ describe.skip('rpc async', () => {

channel.port1.close()
channel.port2.close()

expect(log).toMatchInlineSnapshot(`
Array [
Array [
1,
1,
1335831723,
"Bob",
],
Array [
3,
1,
"Hello Bob, my name is Alice",
],
Array [
1,
2,
1748694682,
"Alice",
],
Array [
3,
2,
"Hi Alice, I am Bob",
],
Array [
2,
1648103349,
],
]
`)
})

it('hub', async () => {
Expand All @@ -80,6 +119,7 @@ describe.skip('rpc async', () => {
on: data => channel.port1.on('message', data),
serialize,
deserialize,
stringHashPool: useStringHashPool(),
})

const bob = bobHub<BobFunctions, AliceFunctions>(Bob)
Expand All @@ -92,6 +132,7 @@ describe.skip('rpc async', () => {
on: data => channel.port2.on('message', data),
serialize,
deserialize,
stringHashPool: useStringHashPool(),
})

// RPCs
Expand Down
62 changes: 51 additions & 11 deletions src/common/msg/rpc.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// From https://github.com/antfu/birpc/blob/main/src/index.ts MIT

import type { UseStringHashPool } from '../data/string-hash-pool'
import { createPromise } from '../exec/promise'
import type { LoggerInterface } from '../log/log-base'
import type { Pipe } from './pipe'
Expand All @@ -20,6 +21,8 @@ export interface RPCOptionsBasic extends Pipe {
onTimeoutError?: (functionName: string, args: any[]) => boolean | void
/** Throw execptions. Default: true */
exceptions?: boolean
/** */
stringHashPool?: UseStringHashPool
}

export interface RPCOptions<Remote> extends RPCOptionsBasic {
Expand Down Expand Up @@ -52,7 +55,7 @@ enum RPCMode {
type RPCMessage = [
RPCMode,
number,
string | any,
string | number | any,
...any,
]

Expand All @@ -71,8 +74,33 @@ function setupRPCBasic(options: RPCOptionsBasic, functions: any, eventNames: str
onTimeoutError,
onlyEvents = false,
exceptions = true,
stringHashPool,
} = options

if (stringHashPool) {
Object.keys(functions).forEach(stringHashPool.hash)
}

function checkEventNames(eventNames: string[]) {
// eventNames.forEach((n) => {
// if (functions[n] == null)
// throw new Error(`event name ${n} has no registered function`)
// })
}

checkEventNames(eventNames)

function registerFunctions(additionalFunctions: any) {
Object.assign(functions, additionalFunctions ?? {})
if (stringHashPool)
Object.keys(additionalFunctions).forEach(stringHashPool.hash)
}

function registerEventNames(additionalEventNames: string[]) {
checkEventNames(additionalEventNames)
eventNames.push(...additionalEventNames)
}

const rpcPromiseMap = new Map<number, {
resolve: (...args: any) => any
reject: (...args: any) => any
Expand All @@ -85,11 +113,12 @@ function setupRPCBasic(options: RPCOptionsBasic, functions: any, eventNames: str
const mode = msg?.[0]
const id = mode === RPCMode.event ? 0 : msg?.[1]
const [method, ...args] = msg.slice(mode === RPCMode.event ? 1 : 2)
const methodName = stringHashPool?.stringForHash(method) ?? method
if (mode === RPCMode.request || mode === RPCMode.event) {
let result, error: any
if (method != null) {
try {
const fn = functions[method] as Function
const fn = functions[methodName] as Function
result = await fn(...args)
}
catch (e) {
Expand Down Expand Up @@ -128,10 +157,11 @@ function setupRPCBasic(options: RPCOptionsBasic, functions: any, eventNames: str
})

const proxyHandler = {
get(_: any, method: string) {
get(_: any, methodName: string) {
const method = stringHashPool?.hash(methodName) ?? methodName
const sendEvent = async (...args: any[]) => await post(await serialize([RPCMode.event, method, ...args]))

if (onlyEvents || eventNames.includes(method)) {
if (onlyEvents || eventNames.includes(methodName)) {
sendEvent.asEvent = sendEvent
return sendEvent
}
Expand All @@ -145,8 +175,8 @@ function setupRPCBasic(options: RPCOptionsBasic, functions: any, eventNames: str
timeoutId = setTimeout(() => {
try {
// Custom onTimeoutError handler can throw its own error too
onTimeoutError?.(method, args)
throw new Error(`rpc timeout on calling "${method}"`)
onTimeoutError?.(methodName, args)
throw new Error(`rpc timeout on calling "${methodName}"`)
}
catch (e) {
if (exceptions === true)
Expand All @@ -168,7 +198,14 @@ function setupRPCBasic(options: RPCOptionsBasic, functions: any, eventNames: str
},
}

return { post, serialize, rpcPromiseMap, proxyHandler }
return {
post,
serialize,
rpcPromiseMap,
proxyHandler,
registerFunctions,
registerEventNames,
}
}

export function useRPC<LocalFunctions, RemoteFunctions = LocalFunctions>(
Expand All @@ -186,7 +223,11 @@ export function useRPCHub(options: RPCOptionsBasic) {
const eventNames: string[] = []
const functions: Record<string, any> = {}

const { proxyHandler } = setupRPCBasic(options, functions)
const {
proxyHandler,
registerFunctions,
registerEventNames,
} = setupRPCBasic(options, functions, eventNames)

function createRPCProxy() {
return new Proxy({}, proxyHandler)
Expand All @@ -196,9 +237,8 @@ export function useRPCHub(options: RPCOptionsBasic) {
additionalFunctions?: LocalFunctions,
additionalEventNames: string[] = [],
): RPCReturn<RemoteFunctions> {
Object.assign(functions, additionalFunctions ?? {})
// log(`Registered functions:\n${Object.keys(functions).join('\n')}`)
eventNames.push(...additionalEventNames)
registerFunctions(additionalFunctions ?? {})
registerEventNames(additionalEventNames)
return createRPCProxy()
}
}
Expand Down

0 comments on commit f80f4ca

Please sign in to comment.