Skip to content

Commit

Permalink
Fix/more connection nonsense (#2695)
Browse files Browse the repository at this point in the history
* More connection tweaks, add connected peers interval, better handling of abortable generators

* Update websocket files to better match current websocket transport in libp2p
  • Loading branch information
islathehut authored Jan 16, 2025
1 parent d7d480c commit b1fee6d
Show file tree
Hide file tree
Showing 10 changed files with 488 additions and 835 deletions.
771 changes: 126 additions & 645 deletions packages/backend/package-lock.json

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
"babel-jest": "^29.3.1",
"cross-env": "^5.2.0",
"fetch-mock-jest": "^1.5.1",
"it-stream-types": "^2.0.2",
"jest": "^29.4.2",
"lint-staged": "^15.2.2",
"mock-fs": "^5.1.2",
Expand Down Expand Up @@ -112,6 +113,7 @@
"@localfirst/auth": "file:../../3rd-party/auth/packages/auth/dist",
"@localfirst/crdx": "file:../../3rd-party/auth/packages/crdx/dist",
"@multiformats/multiaddr": "^12.3.0",
"@multiformats/multiaddr-matcher": "^1.6.0",
"@multiformats/multiaddr-to-uri": "^10.1.0",
"@nestjs/common": "^10.2.10",
"@nestjs/core": "^10.2.10",
Expand Down Expand Up @@ -161,6 +163,7 @@
"multiformats": "13.3.1",
"node-fetch": "^3.3.0",
"p-defer": "^4.0.1",
"p-event": "^6.0.1",
"p-queue": "7.3.4",
"pkijs": "3.0.15",
"progress-events": "^1.0.1",
Expand All @@ -173,8 +176,10 @@
"string-replace-loader": "3.1.0",
"ts-jest-resolver": "^2.0.0",
"uint8arrays": "^5.1.0",
"uint8arraylist": "^2.4.8",
"utf-8-validate": "^5.0.2",
"validator": "^13.11.0"
"validator": "^13.11.0",
"ws": "^8.18.0"
},
"overrides": {
"level": "$level",
Expand Down
27 changes: 20 additions & 7 deletions packages/backend/src/nest/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,14 +281,21 @@ export const createArbitraryFile = (filePath: string, sizeBytes: number) => {
stream.end()
}

export async function* asyncGeneratorFromIterator<T>(asyncIterator: AsyncIterable<T>): AsyncGenerator<T> {
for await (const value of asyncIterator) {
yield value
}
}

// Shamelessly stolen from https://github.com/whatwg/streams/issues/1255#issuecomment-2442964298
// This is necessary because AsyncIterators are fickle and if you just wrap them in a try/catch or try to use
// catch/then/finally on a wrapper promise it ultimately generates an unhandled rejection. JS is so much fun.
export function abortableAsyncIterable<T, TReturn, TNext>(
iter: AsyncIterable<T> | AsyncGenerator<T, TReturn, TNext>,
signal?: AbortSignal,
timeoutMs?: number
): AsyncIterable<T> {
export function abortableAsyncIterable<
T,
TReturn,
TNext,
IterType = AsyncIterable<T> | AsyncGenerator<T, TReturn, TNext>,
>(iter: IterType, signal?: AbortSignal, timeoutMs?: number): IterType {
const abortedPromise = new Promise<IteratorResult<T, TReturn>>((resolve, reject) => {
const ABORT_MESSAGE = 'Operation aborted'
const TIMEOUT_MESSAGE = `Operation exceeded timeout of ${timeoutMs}ms`
Expand Down Expand Up @@ -318,9 +325,9 @@ export function abortableAsyncIterable<T, TReturn, TNext>(
})
abortedPromise.catch(() => {})

return {
const abortableIterable: AsyncIterable<T> = {
[Symbol.asyncIterator]: () => {
const inner = iter[Symbol.asyncIterator]()
const inner = (iter as AsyncIterable<T>)[Symbol.asyncIterator]()
const { return: _return, throw: _throw } = inner
return {
next: (...args) => Promise.race([inner.next(...args), abortedPromise]),
Expand All @@ -329,4 +336,10 @@ export function abortableAsyncIterable<T, TReturn, TNext>(
}
},
}

if (Object.prototype.toString.call(iter) === '[object AsyncGenerator]') {
return asyncGeneratorFromIterator(abortableIterable) as IterType
}

return abortableIterable as IterType
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,6 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
throw new Error(`Unhandled Rejection`)
})

process.on('uncaughtException', error => {
this.logger.error(`Unhandled exception`, error)
throw new Error(`Unhandled Exception`)
})

// process.on('SIGINT', function () {
// // This is not graceful even in a single percent. we must close services first, not just kill process %
// // this.logger.info('\nGracefully shutting down from SIGINT (Ctrl-C)')
Expand Down
19 changes: 18 additions & 1 deletion packages/backend/src/nest/libp2p/libp2p.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export class Libp2pService extends EventEmitter {
public libp2pDatastore: Libp2pDatastore
private redialTimeout: NodeJS.Timeout
private localAddress: string
private _connectedPeersInterval: NodeJS.Timer

private readonly logger = createLogger(Libp2pService.name)

Expand Down Expand Up @@ -290,7 +291,7 @@ export class Libp2pService extends EventEmitter {
},
localAddress: params.localAddress,
targetPort: params.targetPort,
closeOnEnd: true,
inboundConnectionUpgradeTimeout: 30_000,
}),
],
services: {
Expand Down Expand Up @@ -422,12 +423,28 @@ export class Libp2pService extends EventEmitter {
this.libp2pInstance.getMultiaddrs().map(addr => addr.toString())
)

this._connectedPeersInterval = setInterval(() => {
const connections = []
for (const [peerId, peer] of this.connectedPeers.entries()) {
connections.push({
peerId,
address: peer.address,
connectedAtSeconds: peer.connectedAtSeconds,
})
}
this.logger.info(`Current Connected Peers`, {
connectionCount: this.connectedPeers.size,
connections,
})
}, 60_000)

this.logger.info(`Initialized libp2p for peer ${peerId.peerId.toString()}`)
}

public async close(): Promise<void> {
this.logger.info('Closing libp2p service')
clearTimeout(this.redialTimeout)
clearInterval(this._connectedPeersInterval)
await this.hangUpPeers()
await this.libp2pInstance?.stop()
await this.libp2pDatastore?.close()
Expand Down
42 changes: 39 additions & 3 deletions packages/backend/src/nest/websocketOverTor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
// Essentially, the only thing we've done is override the listening port of the
// listener and add a remoteAddress query parameter in the _connect function.

import { ConnectionFailedError, transportSymbol, serviceCapabilities } from '@libp2p/interface'
import {
ConnectionFailedError,
transportSymbol,
serviceCapabilities,
TypedEventTarget,
Libp2pEvents,
} from '@libp2p/interface'
import { multiaddrToUri as toUri } from '@multiformats/multiaddr-to-uri'
import { connect, type WebSocketOptions } from 'it-ws/client'
import pDefer from 'p-defer'
Expand Down Expand Up @@ -32,17 +38,45 @@ import type { Server } from 'http'
import type { DuplexWebSocket } from 'it-ws/duplex'
import type { ProgressEvent } from 'progress-events'
import type { ClientOptions } from 'ws'
import http from 'node:http'
import https from 'node:https'

export interface WebSocketsInit extends AbortOptions, WebSocketOptions {
/**
* @deprecated Use a ConnectionGater instead
*/
filter?: MultiaddrFilter

/**
* Options used to create WebSockets
*/
websocket?: ClientOptions
server?: Server

/**
* Options used to create the HTTP server
*/
http?: http.ServerOptions

/**
* Options used to create the HTTPs server. `options.http` will be used if
* unspecified.
*/
https?: https.ServerOptions

/**
* Inbound connections must complete their upgrade within this many ms
*
* @default 5000
*/
inboundConnectionUpgradeTimeout?: number

localAddress: string
targetPort: number
}

export interface WebSocketsComponents {
logger: ComponentLogger
events: TypedEventTarget<Libp2pEvents>
metrics?: Metrics
}

Expand Down Expand Up @@ -99,9 +133,10 @@ export class WebSockets implements Transport<WebSocketsDialEvents> {
}

async _connect(ma: Multiaddr, options: DialTransportOptions<WebSocketsDialEvents>): Promise<DuplexWebSocket> {
const _log = this.components.logger.forComponent(`libp2p:websockets:dial:connect:${ma.getPeerId()}`)
options?.signal?.throwIfAborted()

const _log = this.components.logger.forComponent(`libp2p:websockets:dial:connect:${ma.getPeerId()}`)

const cOpts = ma.toOptions()
_log('dialing %s:%s', cOpts.host, cOpts.port)

Expand Down Expand Up @@ -149,6 +184,7 @@ export class WebSockets implements Transport<WebSocketsDialEvents> {
return createListener(
{
logger: this.logger,
events: this.components.events,
metrics: this.components.metrics,
},
{
Expand Down
Loading

0 comments on commit b1fee6d

Please sign in to comment.