diff --git a/interop/README.md b/interop/README.md index f10057aa9e..bc321dbd24 100644 --- a/interop/README.md +++ b/interop/README.md @@ -75,7 +75,7 @@ $ docker build . -f ./interop/BrowserDockerfile -t js-libp2p-browsers - When starting the docker container add `-e GOLOG_LOG_LEVEL=debug` 4. Build the version you want to test against ```console - $ cd multidim-interop/impl/$IMPL/$VERSION + $ cd transport-interop/impl/$IMPL/$VERSION $ make ... ``` diff --git a/interop/firefox-version.json b/interop/firefox-version.json index 99c6e3a404..9e8590ae9e 100644 --- a/interop/firefox-version.json +++ b/interop/firefox-version.json @@ -13,5 +13,5 @@ } ], "secureChannels": ["noise"], - "muxers": ["mplex", "yamux"] + "muxers": ["yamux", "mplex"] } diff --git a/interop/node-version.json b/interop/node-version.json index 2d58e70601..85fc87a62a 100644 --- a/interop/node-version.json +++ b/interop/node-version.json @@ -1,19 +1,16 @@ { - "id": "node-js-libp2p-head", - "containerImageID": "node-js-libp2p-head", - "transports": [ - "tcp", - "ws", - { - "name": "wss", - "onlyDial": true - } - ], - "secureChannels": [ - "noise" - ], - "muxers": [ - "mplex", - "yamux" - ] -} \ No newline at end of file + "id": "node-js-libp2p-head", + "containerImageID": "node-js-libp2p-head", + "transports": [ + "tcp", + "ws", + { + "name": "wss", + "onlyDial": true + }, + "webrtc", + "webrtc-direct" + ], + "secureChannels": ["noise"], + "muxers": ["yamux", "mplex"] +} diff --git a/interop/test/dialer.spec.ts b/interop/test/dialer.spec.ts index 88bb64a74c..5fbd873cc1 100644 --- a/interop/test/dialer.spec.ts +++ b/interop/test/dialer.spec.ts @@ -8,7 +8,7 @@ import type { Libp2p } from '@libp2p/interface' import type { PingService } from '@libp2p/ping' const isDialer: boolean = process.env.is_dialer === 'true' -const timeoutSecs: string = process.env.test_timeout_secs ?? '180' +const timeoutMs: number = parseInt(process.env.test_timeout_secs ?? '180') * 1000 describe('ping test (dialer)', function () { if (!isDialer) { @@ -16,7 +16,7 @@ describe('ping test (dialer)', function () { } // make the default timeout longer than the listener timeout - this.timeout((parseInt(timeoutSecs) * 1000) + 30000) + this.timeout(timeoutMs + 30_000) let node: Libp2p<{ ping: PingService }> beforeEach(async () => { @@ -32,7 +32,7 @@ describe('ping test (dialer)', function () { }) it('should dial and ping', async function () { - let [, otherMaStr]: string[] = await redisProxy(['BLPOP', 'listenerAddr', timeoutSecs]) + let [, otherMaStr]: string[] = await redisProxy(['BLPOP', 'listenerAddr', `${timeoutMs / 1000}`]) // Hack until these are merged: // - https://github.com/multiformats/js-multiaddr-to-uri/pull/120 @@ -45,7 +45,9 @@ describe('ping test (dialer)', function () { await node.dial(otherMa) console.error(`node ${node.peerId.toString()} pings: ${otherMa}`) - const pingRTT = await node.services.ping.ping(multiaddr(otherMa)) + const pingRTT = await node.services.ping.ping(multiaddr(otherMa), { + signal: AbortSignal.timeout(timeoutMs) + }) const handshakePlusOneRTT = Date.now() - handshakeStartInstant console.log(JSON.stringify({ handshakePlusOneRTTMillis: handshakePlusOneRTT, diff --git a/interop/test/listener.spec.ts b/interop/test/listener.spec.ts index 73bd79bdf7..517ace8b83 100644 --- a/interop/test/listener.spec.ts +++ b/interop/test/listener.spec.ts @@ -67,6 +67,7 @@ describe('ping test (listener)', function () { } console.error('inform redis of dial address') + console.error(multiaddrs) // Send the listener addr over the proxy server so this works on both the Browser and Node await redisProxy(['RPUSH', 'listenerAddr', multiaddrs[0]]) // Wait diff --git a/packages/integration-tests/test/compliance/transport/webrtc-direct.spec.ts b/packages/integration-tests/test/compliance/transport/webrtc-direct.spec.ts new file mode 100644 index 0000000000..eb3254b97e --- /dev/null +++ b/packages/integration-tests/test/compliance/transport/webrtc-direct.spec.ts @@ -0,0 +1,39 @@ +import tests from '@libp2p/interface-compliance-tests/transport' +import { webRTCDirect } from '@libp2p/webrtc' +import { WebRTCDirect } from '@multiformats/multiaddr-matcher' +import { isNode, isElectron } from 'wherearewe' + +describe('WebRTC-Direct interface-transport compliance', () => { + if (!isNode && !isElectron) { + return + } + + tests({ + async setup () { + const dialer = { + transports: [ + webRTCDirect() + ], + connectionMonitor: { + enabled: false + } + } + + return { + dialer, + listener: { + addresses: { + listen: [ + '/ip4/127.0.0.1/udp/0/webrtc-direct', + '/ip4/127.0.0.1/udp/0/webrtc-direct' + ] + }, + ...dialer + }, + dialMultiaddrMatcher: WebRTCDirect, + listenMultiaddrMatcher: WebRTCDirect + } + }, + async teardown () {} + }) +}) diff --git a/packages/integration-tests/test/interop.ts b/packages/integration-tests/test/interop.ts index a4813c143e..e8f2924209 100644 --- a/packages/integration-tests/test/interop.ts +++ b/packages/integration-tests/test/interop.ts @@ -15,6 +15,7 @@ import { mplex } from '@libp2p/mplex' import { plaintext } from '@libp2p/plaintext' import { tcp } from '@libp2p/tcp' import { tls } from '@libp2p/tls' +import { webRTCDirect } from '@libp2p/webrtc' import { multiaddr } from '@multiformats/multiaddr' import { execa } from 'execa' import { path as p2pd } from 'go-libp2p' @@ -131,7 +132,11 @@ async function createJsPeer (options: SpawnOptions): Promise<Daemon> { addresses: { listen: [] }, - transports: [tcp(), circuitRelayTransport()], + transports: [ + tcp(), + circuitRelayTransport(), + webRTCDirect() + ], streamMuxers: [], connectionEncrypters: [noise()] } @@ -139,12 +144,14 @@ async function createJsPeer (options: SpawnOptions): Promise<Daemon> { if (options.noListen !== true) { if (options.transport == null || options.transport === 'tcp') { opts.addresses?.listen?.push('/ip4/127.0.0.1/tcp/0') + } else if (options.transport === 'webrtc-direct') { + opts.addresses?.listen?.push('/ip4/127.0.0.1/udp/0/webrtc-direct') } else { throw new UnsupportedError() } } - if (options.transport === 'webtransport' || options.transport === 'webrtc-direct') { + if (options.transport === 'webtransport') { throw new UnsupportedError() } @@ -191,7 +198,7 @@ async function createJsPeer (options: SpawnOptions): Promise<Daemon> { services }) - const server = createServer(multiaddr('/ip4/0.0.0.0/tcp/0'), node) + const server = createServer(multiaddr('/ip4/127.0.0.1/tcp/0'), node) await server.start() return { diff --git a/packages/interface-compliance-tests/src/transport/index.ts b/packages/interface-compliance-tests/src/transport/index.ts index 82fb68631a..20ef2cef02 100644 --- a/packages/interface-compliance-tests/src/transport/index.ts +++ b/packages/interface-compliance-tests/src/transport/index.ts @@ -191,8 +191,13 @@ export default (common: TestSetup<TransportTestFixtures>): void => { throw new Error('Oh noes!') } - await expect(dialer.dial(dialAddrs[0])).to.eventually.be.rejected - .with.property('name', 'EncryptionFailedError') + // transports with their own muxers/encryption will perform the upgrade + // after the connection has been established (e.g. peer ids have been + // exchanged) so perform the dial and wait for the remote to attempt the + // upgrade - if it fails the listener should close the underlying + // connection which should remove the it from the dialer's connection map + await dialer.dial(dialAddrs[0]).catch(() => {}) + await delay(1000) expect(dialer.getConnections().filter(conn => { return dialMultiaddrMatcher.exactMatch(conn.remoteAddr) diff --git a/packages/transport-webrtc/README.md b/packages/transport-webrtc/README.md index f49930cfa3..7c2120796d 100644 --- a/packages/transport-webrtc/README.md +++ b/packages/transport-webrtc/README.md @@ -44,18 +44,6 @@ A WebRTC Direct multiaddr also includes a certhash of the target peer - this is In both cases, once the connection is established a [Noise handshake](https://noiseprotocol.org/noise.html) is carried out to ensure that the remote peer has the private key that corresponds to the public key that makes up their PeerId, giving you both encryption and authentication. -## Support - -WebRTC is supported in both Node.js and browsers. - -At the time of writing, WebRTC Direct is dial-only in browsers and not supported in Node.js at all. - -Support in Node.js is possible but PRs will need to be opened to [libdatachannel](https://github.com/paullouisageneau/libdatachannel) and the appropriate APIs exposed in [node-datachannel](https://github.com/murat-dogan/node-datachannel). - -WebRTC Direct support is available in rust-libp2p and arriving soon in go-libp2p. - -See the WebRTC section of <https://connectivity.libp2p.io> for more information. - ## Example - WebRTC WebRTC requires use of a relay to connect two nodes. The listener first discovers a relay server and makes a reservation, then the dialer can connect via the relayed address. @@ -180,26 +168,33 @@ The only implementation that supports a WebRTC Direct listener is go-libp2p and ```TypeScript import { createLibp2p } from 'libp2p' -import { noise } from '@chainsafe/libp2p-noise' import { multiaddr } from '@multiformats/multiaddr' import { pipe } from 'it-pipe' import { fromString, toString } from 'uint8arrays' import { webRTCDirect } from '@libp2p/webrtc' -const node = await createLibp2p({ +const listener = await createLibp2p({ + addresses: { + listen: [ + '/ip4/0.0.0.0/udp/0/webrtc-direct' + ] + }, + transports: [ + webRTCDirect() + ] +}) + +await listener.start() + +const dialer = await createLibp2p({ transports: [ webRTCDirect() - ], - connectionEncrypters: [ - noise() ] }) -await node.start() +await dialer.start() -// this multiaddr corresponds to a remote node running a WebRTC Direct listener -const ma = multiaddr('/ip4/0.0.0.0/udp/56093/webrtc-direct/certhash/uEiByaEfNSLBexWBNFZy_QB1vAKEj7JAXDizRs4_SnTflsQ') -const stream = await node.dialProtocol(ma, '/my-protocol/1.0.0', { +const stream = await dialer.dialProtocol(listener.getMultiaddrs(), '/my-protocol/1.0.0', { signal: AbortSignal.timeout(10_000) }) diff --git a/packages/transport-webrtc/package.json b/packages/transport-webrtc/package.json index c27b75cdb2..5a949bb70a 100644 --- a/packages/transport-webrtc/package.json +++ b/packages/transport-webrtc/package.json @@ -40,7 +40,7 @@ "generate": "protons src/private-to-private/pb/message.proto src/pb/message.proto", "build": "aegir build", "test": "aegir test -t node -t browser", - "test:node": "aegir test -t node --cov -- --exit", + "test:node": "aegir test -t node --cov", "test:chrome": "aegir test -t browser --cov", "test:firefox": "aegir test -t browser -- --browser firefox", "test:webkit": "aegir test -t browser -- --browser webkit", @@ -51,27 +51,35 @@ "doc-check": "aegir doc-check" }, "dependencies": { + "@chainsafe/is-ip": "^2.0.2", "@chainsafe/libp2p-noise": "^16.0.0", + "@ipshipyard/node-datachannel": "^0.26.4", "@libp2p/interface": "^2.5.0", "@libp2p/interface-internal": "^2.3.0", "@libp2p/peer-id": "^5.0.12", "@libp2p/utils": "^6.5.1", "@multiformats/multiaddr": "^12.3.3", "@multiformats/multiaddr-matcher": "^1.6.0", + "@peculiar/webcrypto": "^1.5.0", + "@peculiar/x509": "^1.11.0", + "any-signal": "^4.1.1", "detect-browser": "^5.3.0", + "get-port": "^7.1.0", "it-length-prefixed": "^9.1.0", "it-protobuf-stream": "^1.1.5", "it-pushable": "^3.2.3", "it-stream-types": "^2.0.2", "multiformats": "^13.3.1", - "node-datachannel": "^0.11.0", "p-defer": "^4.0.1", "p-event": "^6.0.1", "p-timeout": "^6.1.3", + "p-wait-for": "^5.0.2", "progress-events": "^1.0.1", "protons-runtime": "^5.5.0", + "race-event": "^1.3.0", "race-signal": "^1.1.0", "react-native-webrtc": "^124.0.4", + "stun": "^2.1.0", "uint8-varint": "^2.0.4", "uint8arraylist": "^2.4.8", "uint8arrays": "^5.1.0" @@ -91,7 +99,10 @@ "sinon-ts": "^2.0.0" }, "browser": { - "./dist/src/webrtc/index.js": "./dist/src/webrtc/index.browser.js" + "./dist/src/webrtc/index.js": "./dist/src/webrtc/index.browser.js", + "./dist/src/private-to-public/listener.js": "./dist/src/private-to-public/listener.browser.js", + "./dist/src/private-to-public/utils/get-rtcpeerconnection.js": "./dist/src/private-to-public/utils/get-rtcpeerconnection.browser.js", + "node:net": false }, "react-native": { "./dist/src/webrtc/index.js": "./dist/src/webrtc/index.react-native.js" diff --git a/packages/transport-webrtc/src/constants.ts b/packages/transport-webrtc/src/constants.ts index e3197db092..f629c5f454 100644 --- a/packages/transport-webrtc/src/constants.ts +++ b/packages/transport-webrtc/src/constants.ts @@ -12,3 +12,7 @@ export const DEFAULT_ICE_SERVERS = [ 'stun:stun.cloudflare.com:3478', 'stun:stun.services.mozilla.com:3478' ] + +export const UFRAG_ALPHABET = Array.from('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890') + +export const UFRAG_PREFIX = 'libp2p+webrtc+v1/' diff --git a/packages/transport-webrtc/src/index.ts b/packages/transport-webrtc/src/index.ts index 09bbd76700..6599ce45d6 100644 --- a/packages/transport-webrtc/src/index.ts +++ b/packages/transport-webrtc/src/index.ts @@ -21,18 +21,6 @@ * * In both cases, once the connection is established a [Noise handshake](https://noiseprotocol.org/noise.html) is carried out to ensure that the remote peer has the private key that corresponds to the public key that makes up their PeerId, giving you both encryption and authentication. * - * ## Support - * - * WebRTC is supported in both Node.js and browsers. - * - * At the time of writing, WebRTC Direct is dial-only in browsers and not supported in Node.js at all. - * - * Support in Node.js is possible but PRs will need to be opened to [libdatachannel](https://github.com/paullouisageneau/libdatachannel) and the appropriate APIs exposed in [node-datachannel](https://github.com/murat-dogan/node-datachannel). - * - * WebRTC Direct support is available in rust-libp2p and arriving soon in go-libp2p. - * - * See the WebRTC section of https://connectivity.libp2p.io for more information. - * * @example WebRTC * * WebRTC requires use of a relay to connect two nodes. The listener first discovers a relay server and makes a reservation, then the dialer can connect via the relayed address. @@ -157,26 +145,33 @@ * * ```TypeScript * import { createLibp2p } from 'libp2p' - * import { noise } from '@chainsafe/libp2p-noise' * import { multiaddr } from '@multiformats/multiaddr' * import { pipe } from 'it-pipe' * import { fromString, toString } from 'uint8arrays' * import { webRTCDirect } from '@libp2p/webrtc' * - * const node = await createLibp2p({ + * const listener = await createLibp2p({ + * addresses: { + * listen: [ + * '/ip4/0.0.0.0/udp/0/webrtc-direct' + * ] + * }, * transports: [ * webRTCDirect() - * ], - * connectionEncrypters: [ - * noise() * ] * }) * - * await node.start() + * await listener.start() * - * // this multiaddr corresponds to a remote node running a WebRTC Direct listener - * const ma = multiaddr('/ip4/0.0.0.0/udp/56093/webrtc-direct/certhash/uEiByaEfNSLBexWBNFZy_QB1vAKEj7JAXDizRs4_SnTflsQ') - * const stream = await node.dialProtocol(ma, '/my-protocol/1.0.0', { + * const dialer = await createLibp2p({ + * transports: [ + * webRTCDirect() + * ] + * }) + * + * await dialer.start() + * + * const stream = await dialer.dialProtocol(listener.getMultiaddrs(), '/my-protocol/1.0.0', { * signal: AbortSignal.timeout(10_000) * }) * @@ -247,6 +242,25 @@ export interface DataChannelOptions { openTimeout?: number } +/** + * PEM format server certificate and private key + */ +export interface TransportCertificate { + /** + * The private key for the certificate in PEM format + */ + privateKey: string + /** + * PEM format certificate + */ + pem: string + + /** + * The hash of the certificate + */ + certhash: string +} + export type { WebRTCTransportDirectInit, WebRTCDirectTransportComponents } function webRTCDirect (init?: WebRTCTransportDirectInit): (components: WebRTCDirectTransportComponents) => Transport { diff --git a/packages/transport-webrtc/src/maconn.ts b/packages/transport-webrtc/src/maconn.ts index 0e2c20c011..984350e9e8 100644 --- a/packages/transport-webrtc/src/maconn.ts +++ b/packages/transport-webrtc/src/maconn.ts @@ -1,4 +1,5 @@ import { nopSink, nopSource } from './util.js' +import type { RTCPeerConnection } from './webrtc/index.js' import type { ComponentLogger, Logger, MultiaddrConnection, MultiaddrConnectionTimeline, CounterGroup } from '@libp2p/interface' import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr' import type { Source, Sink } from 'it-stream-types' @@ -69,12 +70,13 @@ export class WebRTCMultiaddrConnection implements MultiaddrConnection { this.timeline = init.timeline this.peerConnection = init.peerConnection - const initialState = this.peerConnection.connectionState + const peerConnection = this.peerConnection + const initialState = peerConnection.connectionState this.peerConnection.onconnectionstatechange = () => { - this.log.trace('peer connection state change', this.peerConnection.connectionState, 'initial state', initialState) + this.log.trace('peer connection state change', peerConnection.connectionState, 'initial state', initialState) - if (this.peerConnection.connectionState === 'disconnected' || this.peerConnection.connectionState === 'failed' || this.peerConnection.connectionState === 'closed') { + if (peerConnection.connectionState === 'disconnected' || peerConnection.connectionState === 'failed' || peerConnection.connectionState === 'closed') { // nothing else to do but close the connection this.timeline.close = Date.now() } diff --git a/packages/transport-webrtc/src/muxer.ts b/packages/transport-webrtc/src/muxer.ts index cf6a5b7588..4fc718559f 100644 --- a/packages/transport-webrtc/src/muxer.ts +++ b/packages/transport-webrtc/src/muxer.ts @@ -1,6 +1,7 @@ import { createStream } from './stream.js' import { drainAndClose, nopSink, nopSource } from './util.js' import type { DataChannelOptions } from './index.js' +import type { RTCPeerConnection, RTCDataChannel } from './webrtc/index.js' import type { ComponentLogger, Logger, Stream, CounterGroup, StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface' import type { AbortOptions } from '@multiformats/multiaddr' import type { Source, Sink } from 'it-stream-types' @@ -56,7 +57,7 @@ export class DataChannelMuxerFactory implements StreamMuxerFactory { this.metrics = init.metrics this.protocol = init.protocol ?? PROTOCOL this.dataChannelOptions = init.dataChannelOptions ?? {} - this.log = components.logger.forComponent('libp2p:webrtc:datachannelmuxerfactory') + this.log = components.logger.forComponent('libp2p:webrtc:muxerfactory') // store any datachannels opened before upgrade has been completed this.peerConnection.ondatachannel = ({ channel }) => { @@ -243,7 +244,7 @@ export class DataChannelMuxer implements StreamMuxer { sink: Sink<Source<Uint8Array | Uint8ArrayList>, Promise<void>> = nopSink newStream (): Stream { - // The spec says the label SHOULD be an empty string: https://github.com/libp2p/specs/blob/master/webrtc/README.md#rtcdatachannel-label + // The spec says the label MUST be an empty string: https://github.com/libp2p/specs/blob/master/webrtc/README.md#rtcdatachannel-label const channel = this.peerConnection.createDataChannel('') // lib-datachannel throws if `.getId` is called on a closed channel so // memoize it diff --git a/packages/transport-webrtc/src/private-to-private/util.ts b/packages/transport-webrtc/src/private-to-private/util.ts index f8e76cc2b7..be693bc1b6 100644 --- a/packages/transport-webrtc/src/private-to-private/util.ts +++ b/packages/transport-webrtc/src/private-to-private/util.ts @@ -5,6 +5,7 @@ import { isFirefox } from '../util.js' import { RTCIceCandidate } from '../webrtc/index.js' import { Message } from './pb/message.js' import type { WebRTCDialEvents } from './transport.js' +import type { RTCPeerConnection } from '../webrtc/index.js' import type { LoggerOptions, Stream } from '@libp2p/interface' import type { AbortOptions, MessageStream } from 'it-protobuf-stream' import type { DeferredPromise } from 'p-defer' diff --git a/packages/transport-webrtc/src/private-to-public/listener.browser.ts b/packages/transport-webrtc/src/private-to-public/listener.browser.ts new file mode 100644 index 0000000000..0f32f89820 --- /dev/null +++ b/packages/transport-webrtc/src/private-to-public/listener.browser.ts @@ -0,0 +1,28 @@ +import { TypedEventEmitter } from '@libp2p/interface' +import { UnimplementedError } from '../error.js' +import type { PeerId, ListenerEvents, Listener } from '@libp2p/interface' +import type { TransportManager } from '@libp2p/interface-internal' +import type { Multiaddr } from '@multiformats/multiaddr' + +export interface WebRTCDirectListenerComponents { + peerId: PeerId + transportManager: TransportManager +} + +export interface WebRTCDirectListenerInit { + shutdownController: AbortController +} + +export class WebRTCDirectListener extends TypedEventEmitter<ListenerEvents> implements Listener { + async listen (): Promise<void> { + throw new UnimplementedError('WebRTCTransport.createListener') + } + + getAddrs (): Multiaddr[] { + return [] + } + + async close (): Promise<void> { + + } +} diff --git a/packages/transport-webrtc/src/private-to-public/listener.ts b/packages/transport-webrtc/src/private-to-public/listener.ts new file mode 100644 index 0000000000..5f9c29cce3 --- /dev/null +++ b/packages/transport-webrtc/src/private-to-public/listener.ts @@ -0,0 +1,233 @@ +import { networkInterfaces } from 'node:os' +import { isIPv4, isIPv6 } from '@chainsafe/is-ip' +import { TypedEventEmitter } from '@libp2p/interface' +import { multiaddr, protocols } from '@multiformats/multiaddr' +import { IP4 } from '@multiformats/multiaddr-matcher' +import { Crypto } from '@peculiar/webcrypto' +import getPort from 'get-port' +import pWaitFor from 'p-wait-for' +import { connect } from './utils/connect.js' +import { generateTransportCertificate } from './utils/generate-certificates.js' +import { createDialerRTCPeerConnection } from './utils/get-rtcpeerconnection.js' +import { stunListener } from './utils/stun-listener.js' +import type { DataChannelOptions, TransportCertificate } from '../index.js' +import type { DirectRTCPeerConnection } from './utils/get-rtcpeerconnection.js' +import type { StunServer } from './utils/stun-listener.js' +import type { PeerId, ListenerEvents, Listener, Upgrader, ComponentLogger, Logger, CounterGroup, Metrics, PrivateKey } from '@libp2p/interface' +import type { Multiaddr } from '@multiformats/multiaddr' + +const crypto = new Crypto() + +/** + * The time to wait, in milliseconds, for the data channel handshake to complete + */ +const HANDSHAKE_TIMEOUT_MS = 10_000 + +export interface WebRTCDirectListenerComponents { + peerId: PeerId + privateKey: PrivateKey + logger: ComponentLogger + metrics?: Metrics +} + +export interface WebRTCDirectListenerInit { + upgrader: Upgrader + certificates?: TransportCertificate[] + maxInboundStreams?: number + dataChannel?: DataChannelOptions + rtcConfiguration?: RTCConfiguration | (() => RTCConfiguration | Promise<RTCConfiguration>) + useLibjuice?: boolean +} + +export interface WebRTCListenerMetrics { + listenerEvents: CounterGroup +} + +const UDP_PROTOCOL = protocols('udp') +const IP4_PROTOCOL = protocols('ip4') +const IP6_PROTOCOL = protocols('ip6') + +export class WebRTCDirectListener extends TypedEventEmitter<ListenerEvents> implements Listener { + private server?: StunServer + private readonly multiaddrs: Multiaddr[] + private certificate?: TransportCertificate + private readonly connections: Map<string, DirectRTCPeerConnection> + private readonly log: Logger + private readonly init: WebRTCDirectListenerInit + private readonly components: WebRTCDirectListenerComponents + private readonly metrics?: WebRTCListenerMetrics + + constructor (components: WebRTCDirectListenerComponents, init: WebRTCDirectListenerInit) { + super() + + this.init = init + this.components = components + this.multiaddrs = [] + this.connections = new Map() + this.log = components.logger.forComponent('libp2p:webrtc-direct:listener') + this.certificate = init.certificates?.[0] + + if (components.metrics != null) { + this.metrics = { + listenerEvents: components.metrics.registerCounterGroup('libp2p_webrtc-direct_listener_events_total', { + label: 'event', + help: 'Total count of WebRTC-direct listen events by type' + }) + } + } + } + + async listen (ma: Multiaddr): Promise<void> { + const parts = ma.stringTuples() + const ipVersion = IP4.matches(ma) ? 4 : 6 + const host = parts + .filter(([code]) => code === IP4_PROTOCOL.code) + .pop()?.[1] ?? parts + .filter(([code]) => code === IP6_PROTOCOL.code) + .pop()?.[1] + + if (host == null) { + throw new Error('IP4/6 host must be specified in webrtc-direct mulitaddr') + } + let port = parseInt(parts + .filter(([code, value]) => code === UDP_PROTOCOL.code) + .pop()?.[1] ?? '') + + if (isNaN(port)) { + throw new Error('UDP port must be specified in webrtc-direct mulitaddr') + } + + if (port === 0 && this.init.useLibjuice !== false) { + // libjuice doesn't map 0 to a random free port so we have to do it + // ourselves + port = await getPort() + } + + this.server = await stunListener(host, port, ipVersion, this.log, (ufrag, remoteHost, remotePort) => { + this.incomingConnection(ufrag, remoteHost, remotePort) + .catch(err => { + this.log.error('error processing incoming STUN request', err) + }) + }, { + useLibjuice: this.init.useLibjuice + }) + + let certificate = this.certificate + + if (certificate == null) { + const keyPair = await crypto.subtle.generateKey({ + name: 'ECDSA', + namedCurve: 'P-256' + }, true, ['sign', 'verify']) + + certificate = this.certificate = await generateTransportCertificate(keyPair, { + days: 365 + }) + } + + const address = this.server.address() + + getNetworkAddresses(address.address, address.port, ipVersion).forEach((ma) => { + this.multiaddrs.push(multiaddr(`${ma}/webrtc-direct/certhash/${certificate.certhash}`)) + }) + + this.safeDispatchEvent('listening') + } + + private async incomingConnection (ufrag: string, remoteHost: string, remotePort: number): Promise<void> { + const key = `${remoteHost}:${remotePort}:${ufrag}` + let peerConnection = this.connections.get(key) + + if (peerConnection != null) { + this.log.trace('already got peer connection for %s', key) + return + } + + this.log('create peer connection for %s', key) + + // https://github.com/libp2p/specs/blob/master/webrtc/webrtc-direct.md#browser-to-public-server + peerConnection = await createDialerRTCPeerConnection('server', ufrag, this.init.rtcConfiguration, this.certificate) + + this.connections.set(key, peerConnection) + + peerConnection.addEventListener('connectionstatechange', () => { + switch (peerConnection.connectionState) { + case 'failed': + case 'disconnected': + case 'closed': + this.connections.delete(key) + break + default: + break + } + }) + + try { + await connect(peerConnection, ufrag, { + role: 'server', + log: this.log, + logger: this.components.logger, + metrics: this.components.metrics, + events: this.metrics?.listenerEvents, + signal: AbortSignal.timeout(HANDSHAKE_TIMEOUT_MS), + remoteAddr: multiaddr(`/ip${isIPv4(remoteHost) ? 4 : 6}/${remoteHost}/udp/${remotePort}`), + dataChannel: this.init.dataChannel, + upgrader: this.init.upgrader, + peerId: this.components.peerId, + privateKey: this.components.privateKey + }) + } catch (err) { + peerConnection.close() + throw err + } + } + + getAddrs (): Multiaddr[] { + return this.multiaddrs + } + + async close (): Promise<void> { + for (const connection of this.connections.values()) { + connection.close() + } + + await this.server?.close() + + // RTCPeerConnections will be removed from the connections map when their + // connection state changes to 'closed'/'disconnected'/'failed + await pWaitFor(() => { + return this.connections.size === 0 + }) + + this.safeDispatchEvent('close') + } +} + +function getNetworkAddresses (host: string, port: number, version: 4 | 6): string[] { + if (host === '0.0.0.0' || host === '::1') { + // return all ip4 interfaces + return Object.entries(networkInterfaces()) + .flatMap(([_, addresses]) => addresses) + .map(address => address?.address) + .filter(address => { + if (address == null) { + return false + } + + if (version === 4) { + return isIPv4(address) + } + + if (version === 6) { + return isIPv6(address) + } + + return false + }) + .map(address => `/ip${version}/${address}/udp/${port}`) + } + + return [ + `/ip${version}/${host}/udp/${port}` + ] +} diff --git a/packages/transport-webrtc/src/private-to-public/options.ts b/packages/transport-webrtc/src/private-to-public/options.ts deleted file mode 100644 index 69b6b74b21..0000000000 --- a/packages/transport-webrtc/src/private-to-public/options.ts +++ /dev/null @@ -1,4 +0,0 @@ -import type { CreateListenerOptions, DialTransportOptions } from '@libp2p/interface' - -export interface WebRTCListenerOptions extends CreateListenerOptions {} -export interface WebRTCDialOptions extends DialTransportOptions {} diff --git a/packages/transport-webrtc/src/pb/message.proto b/packages/transport-webrtc/src/private-to-public/pb/message.proto similarity index 100% rename from packages/transport-webrtc/src/pb/message.proto rename to packages/transport-webrtc/src/private-to-public/pb/message.proto diff --git a/packages/transport-webrtc/src/pb/message.ts b/packages/transport-webrtc/src/private-to-public/pb/message.ts similarity index 100% rename from packages/transport-webrtc/src/pb/message.ts rename to packages/transport-webrtc/src/private-to-public/pb/message.ts diff --git a/packages/transport-webrtc/src/private-to-public/sdp.ts b/packages/transport-webrtc/src/private-to-public/sdp.ts deleted file mode 100644 index 496f221571..0000000000 --- a/packages/transport-webrtc/src/private-to-public/sdp.ts +++ /dev/null @@ -1,159 +0,0 @@ -import { InvalidParametersError } from '@libp2p/interface' -import { type Multiaddr } from '@multiformats/multiaddr' -import { bases, digest } from 'multiformats/basics' -import { InvalidFingerprintError, UnsupportedHashAlgorithmError } from '../error.js' -import { MAX_MESSAGE_SIZE } from '../stream.js' -import { CERTHASH_CODE } from './transport.js' -import type { LoggerOptions } from '@libp2p/interface' -import type { MultihashDigest } from 'multiformats/hashes/interface' - -/** - * Get base2 | identity decoders - */ -// @ts-expect-error - Not easy to combine these types. -export const mbdecoder: any = Object.values(bases).map(b => b.decoder).reduce((d, b) => d.or(b)) - -export function getLocalFingerprint (pc: RTCPeerConnection, options: LoggerOptions): string | undefined { - // try to fetch fingerprint from local certificate - const localCert = pc.getConfiguration().certificates?.at(0) - if (localCert?.getFingerprints == null) { - options.log.trace('fetching fingerprint from local SDP') - const localDescription = pc.localDescription - if (localDescription == null) { - return undefined - } - return getFingerprintFromSdp(localDescription.sdp) - } - - options.log.trace('fetching fingerprint from local certificate') - - if (localCert.getFingerprints().length === 0) { - return undefined - } - - const fingerprint = localCert.getFingerprints()[0].value - if (fingerprint == null) { - throw new InvalidFingerprintError('', 'no fingerprint on local certificate') - } - - return fingerprint -} - -const fingerprintRegex = /^a=fingerprint:(?:\w+-[0-9]+)\s(?<fingerprint>(:?[0-9a-fA-F]{2})+)$/m -export function getFingerprintFromSdp (sdp: string): string | undefined { - const searchResult = sdp.match(fingerprintRegex) - return searchResult?.groups?.fingerprint -} - -/** - * Get base2 | identity decoders - */ -function ipv (ma: Multiaddr): string { - for (const proto of ma.protoNames()) { - if (proto.startsWith('ip')) { - return proto.toUpperCase() - } - } - - return 'IP6' -} - -// Extract the certhash from a multiaddr -export function certhash (ma: Multiaddr): string { - const tups = ma.stringTuples() - const certhash = tups.filter((tup) => tup[0] === CERTHASH_CODE).map((tup) => tup[1])[0] - - if (certhash === undefined || certhash === '') { - throw new InvalidParametersError(`Couldn't find a certhash component of multiaddr: ${ma.toString()}`) - } - - return certhash -} - -/** - * Convert a certhash into a multihash - */ -export function decodeCerthash (certhash: string): MultihashDigest { - return digest.decode(mbdecoder.decode(certhash)) -} - -/** - * Extract the fingerprint from a multiaddr - */ -export function ma2Fingerprint (ma: Multiaddr): string[] { - const mhdecoded = decodeCerthash(certhash(ma)) - const prefix = toSupportedHashFunction(mhdecoded.code) - const fingerprint = mhdecoded.digest.reduce((str, byte) => str + byte.toString(16).padStart(2, '0'), '') - const sdp = fingerprint.match(/.{1,2}/g) - - if (sdp == null) { - throw new InvalidFingerprintError(fingerprint, ma.toString()) - } - - return [`${prefix} ${sdp.join(':').toUpperCase()}`, fingerprint] -} - -/** - * Normalize the hash name from a given multihash has name - */ -export function toSupportedHashFunction (code: number): 'SHA-1' | 'SHA-256' | 'SHA-512' { - switch (code) { - case 0x11: - return 'SHA-1' - case 0x12: - return 'SHA-256' - case 0x13: - return 'SHA-512' - default: - throw new UnsupportedHashAlgorithmError(code) - } -} - -/** - * Convert a multiaddr into a SDP - */ -function ma2sdp (ma: Multiaddr, ufrag: string): string { - const { host, port } = ma.toOptions() - const ipVersion = ipv(ma) - const [CERTFP] = ma2Fingerprint(ma) - - return `v=0 -o=- 0 0 IN ${ipVersion} ${host} -s=- -c=IN ${ipVersion} ${host} -t=0 0 -a=ice-lite -m=application ${port} UDP/DTLS/SCTP webrtc-datachannel -a=mid:0 -a=setup:passive -a=ice-ufrag:${ufrag} -a=ice-pwd:${ufrag} -a=fingerprint:${CERTFP} -a=sctp-port:5000 -a=max-message-size:${MAX_MESSAGE_SIZE} -a=candidate:1467250027 1 UDP 1467250027 ${host} ${port} typ host\r\n` -} - -/** - * Create an answer SDP from a multiaddr - */ -export function fromMultiAddr (ma: Multiaddr, ufrag: string): RTCSessionDescriptionInit { - return { - type: 'answer', - sdp: ma2sdp(ma, ufrag) - } -} - -/** - * Replace (munge) the ufrag and password values in a SDP - */ -export function munge (desc: RTCSessionDescriptionInit, ufrag: string): RTCSessionDescriptionInit { - if (desc.sdp === undefined) { - throw new InvalidParametersError("Can't munge a missing SDP") - } - - desc.sdp = desc.sdp - .replace(/\na=ice-ufrag:[^\n]*\n/, '\na=ice-ufrag:' + ufrag + '\n') - .replace(/\na=ice-pwd:[^\n]*\n/, '\na=ice-pwd:' + ufrag + '\n') - return desc -} diff --git a/packages/transport-webrtc/src/private-to-public/transport.ts b/packages/transport-webrtc/src/private-to-public/transport.ts index 1e6dd936eb..13e15ca957 100644 --- a/packages/transport-webrtc/src/private-to-public/transport.ts +++ b/packages/transport-webrtc/src/private-to-public/transport.ts @@ -1,22 +1,16 @@ -import { noise } from '@chainsafe/libp2p-noise' -import { transportSymbol, serviceCapabilities, InvalidParametersError } from '@libp2p/interface' +import { serviceCapabilities, transportSymbol } from '@libp2p/interface' import { peerIdFromString } from '@libp2p/peer-id' import { protocols } from '@multiformats/multiaddr' import { WebRTCDirect } from '@multiformats/multiaddr-matcher' -import * as Digest from 'multiformats/hashes/digest' -import { concat } from 'uint8arrays/concat' -import { fromString as uint8arrayFromString } from 'uint8arrays/from-string' -import { DataChannelError, UnimplementedError } from '../error.js' -import { WebRTCMultiaddrConnection } from '../maconn.js' -import { DataChannelMuxerFactory } from '../muxer.js' -import { createStream } from '../stream.js' -import { getRtcConfiguration, isFirefox } from '../util.js' -import { RTCPeerConnection } from '../webrtc/index.js' -import * as sdp from './sdp.js' -import { genUfrag } from './util.js' -import type { WebRTCDialOptions } from './options.js' -import type { DataChannelOptions } from '../index.js' -import type { CreateListenerOptions, Transport, Listener, ComponentLogger, Logger, Connection, CounterGroup, Metrics, PeerId, PrivateKey } from '@libp2p/interface' +import { raceSignal } from 'race-signal' +import { genUfrag } from '../util.js' +import { WebRTCDirectListener } from './listener.js' +import { connect } from './utils/connect.js' +import { createDialerRTCPeerConnection } from './utils/get-rtcpeerconnection.js' +import type { DataChannelOptions, TransportCertificate } from '../index.js' +import type { WebRTCDialEvents } from '../private-to-private/transport.js' +import type { CreateListenerOptions, Transport, Listener, ComponentLogger, Logger, Connection, CounterGroup, Metrics, PeerId, DialTransportOptions, PrivateKey } from '@libp2p/interface' +import type { TransportManager } from '@libp2p/interface-internal' import type { Multiaddr } from '@multiformats/multiaddr' /** @@ -46,6 +40,7 @@ export interface WebRTCDirectTransportComponents { privateKey: PrivateKey metrics?: Metrics logger: ComponentLogger + transportManager: TransportManager } export interface WebRTCMetrics { @@ -55,6 +50,8 @@ export interface WebRTCMetrics { export interface WebRTCTransportDirectInit { rtcConfiguration?: RTCConfiguration | (() => RTCConfiguration | Promise<RTCConfiguration>) dataChannel?: DataChannelOptions + certificates?: TransportCertificate[] + useLibjuice?: boolean } export class WebRTCDirectTransport implements Transport { @@ -62,10 +59,12 @@ export class WebRTCDirectTransport implements Transport { private readonly metrics?: WebRTCMetrics private readonly components: WebRTCDirectTransportComponents private readonly init: WebRTCTransportDirectInit + constructor (components: WebRTCDirectTransportComponents, init: WebRTCTransportDirectInit = {}) { this.log = components.logger.forComponent('libp2p:webrtc-direct') this.components = components this.init = init + if (components.metrics != null) { this.metrics = { dialerEvents: components.metrics.registerCounterGroup('libp2p_webrtc-direct_dialer_events_total', { @@ -87,7 +86,8 @@ export class WebRTCDirectTransport implements Transport { /** * Dial a given multiaddr */ - async dial (ma: Multiaddr, options: WebRTCDialOptions): Promise<Connection> { + async dial (ma: Multiaddr, options: DialTransportOptions<WebRTCDialEvents>): Promise<Connection> { + options?.signal?.throwIfAborted() const rawConn = await this._connect(ma, options) this.log('dialing address: %a', ma) return rawConn @@ -97,7 +97,10 @@ export class WebRTCDirectTransport implements Transport { * Create transport listeners no supported by browsers */ createListener (options: CreateListenerOptions): Listener { - throw new UnimplementedError('WebRTCTransport.createListener') + return new WebRTCDirectListener(this.components, { + ...this.init, + ...options + }) } /** @@ -117,182 +120,36 @@ export class WebRTCDirectTransport implements Transport { /** * Connect to a peer using a multiaddr */ - async _connect (ma: Multiaddr, options: WebRTCDialOptions): Promise<Connection> { - const controller = new AbortController() - const signal = controller.signal - - let remotePeer: PeerId | undefined + async _connect (ma: Multiaddr, options: DialTransportOptions<WebRTCDialEvents>): Promise<Connection> { + let theirPeerId: PeerId | undefined const remotePeerString = ma.getPeerId() if (remotePeerString != null) { - remotePeer = peerIdFromString(remotePeerString) + theirPeerId = peerIdFromString(remotePeerString) } - const remoteCerthash = sdp.decodeCerthash(sdp.certhash(ma)) + const ufrag = genUfrag() - // ECDSA is preferred over RSA here. From our testing we find that P-256 elliptic - // curve is supported by Pion, webrtc-rs, as well as Chromium (P-228 and P-384 - // was not supported in Chromium). We use the same hash function as found in the - // multiaddr if it is supported. - const certificate = await RTCPeerConnection.generateCertificate({ - name: 'ECDSA', - namedCurve: 'P-256', - hash: sdp.toSupportedHashFunction(remoteCerthash.code) - } as any) - - const peerConnection = new RTCPeerConnection({ - ...(await getRtcConfiguration(this.init.rtcConfiguration)), - certificates: [certificate] - }) + // https://github.com/libp2p/specs/blob/master/webrtc/webrtc-direct.md#browser-to-public-server + const peerConnection = await createDialerRTCPeerConnection('client', ufrag, typeof this.init.rtcConfiguration === 'function' ? await this.init.rtcConfiguration() : this.init.rtcConfiguration ?? {}) try { - // create data channel for running the noise handshake. Once the data channel is opened, - // the remote will initiate the noise handshake. This is used to confirm the identity of - // the peer. - const dataChannelOpenPromise = new Promise<RTCDataChannel>((resolve, reject) => { - const handshakeDataChannel = peerConnection.createDataChannel('', { negotiated: true, id: 0 }) - const handshakeTimeout = setTimeout(() => { - const error = `Data channel was never opened: state: ${handshakeDataChannel.readyState}` - this.log.error(error) - this.metrics?.dialerEvents.increment({ open_error: true }) - reject(new DataChannelError('data', error)) - }, HANDSHAKE_TIMEOUT_MS) - - handshakeDataChannel.onopen = (_) => { - clearTimeout(handshakeTimeout) - resolve(handshakeDataChannel) - } - - // ref: https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/error_event - handshakeDataChannel.onerror = (event: Event) => { - clearTimeout(handshakeTimeout) - const errorTarget = event.target?.toString() ?? 'not specified' - const error = `Error opening a data channel for handshaking: ${errorTarget}` - this.log.error(error) - // NOTE: We use unknown error here but this could potentially be considered a reset by some standards. - this.metrics?.dialerEvents.increment({ unknown_error: true }) - reject(new DataChannelError('data', error)) - } - }) - - const ufrag = 'libp2p+webrtc+v1/' + genUfrag(32) - - // Create offer and munge sdp with ufrag == pwd. This allows the remote to - // respond to STUN messages without performing an actual SDP exchange. - // This is because it can infer the passwd field by reading the USERNAME - // attribute of the STUN message. - const offerSdp = await peerConnection.createOffer() - const mungedOfferSdp = sdp.munge(offerSdp, ufrag) - await peerConnection.setLocalDescription(mungedOfferSdp) - - // construct answer sdp from multiaddr and ufrag - const answerSdp = sdp.fromMultiAddr(ma, ufrag) - await peerConnection.setRemoteDescription(answerSdp) - - // wait for peerconnection.onopen to fire, or for the datachannel to open - const handshakeDataChannel = await dataChannelOpenPromise - - // Do noise handshake. - // Set the Noise Prologue to libp2p-webrtc-noise:<FINGERPRINTS> before starting the actual Noise handshake. - // <FINGERPRINTS> is the concatenation of the of the two TLS fingerprints of A and B in their multihash byte representation, sorted in ascending order. - const fingerprintsPrologue = this.generateNoisePrologue(peerConnection, remoteCerthash.code, ma) - - // Since we use the default crypto interface and do not use a static key or early data, - // we pass in undefined for these parameters. - const connectionEncrypter = noise({ prologueBytes: fingerprintsPrologue })(this.components) - - const wrappedChannel = createStream({ - channel: handshakeDataChannel, - direction: 'inbound', + return await raceSignal(connect(peerConnection, ufrag, { + role: 'client', + log: this.log, logger: this.components.logger, - ...(this.init.dataChannel ?? {}) - }) - const wrappedDuplex = { - ...wrappedChannel, - sink: wrappedChannel.sink.bind(wrappedChannel), - source: (async function * () { - for await (const list of wrappedChannel.source) { - for (const buf of list) { - yield buf - } - } - }()) - } - - // Creating the connection before completion of the noise - // handshake ensures that the stream opening callback is set up - const maConn = new WebRTCMultiaddrConnection(this.components, { - peerConnection, + metrics: this.components.metrics, + events: this.metrics?.dialerEvents, + signal: options.signal ?? AbortSignal.timeout(HANDSHAKE_TIMEOUT_MS), remoteAddr: ma, - timeline: { - open: Date.now() - }, - metrics: this.metrics?.dialerEvents - }) - - const eventListeningName = isFirefox ? 'iceconnectionstatechange' : 'connectionstatechange' - - peerConnection.addEventListener(eventListeningName, () => { - switch (peerConnection.connectionState) { - case 'failed': - case 'disconnected': - case 'closed': - maConn.close().catch((err) => { - this.log.error('error closing connection', err) - }).finally(() => { - // Remove the event listener once the connection is closed - controller.abort() - }) - break - default: - break - } - }, { signal }) - - // Track opened peer connection - this.metrics?.dialerEvents.increment({ peer_connection: true }) - - const muxerFactory = new DataChannelMuxerFactory(this.components, { - peerConnection, - metrics: this.metrics?.dialerEvents, - dataChannelOptions: this.init.dataChannel - }) - - // For outbound connections, the remote is expected to start the noise handshake. - // Therefore, we need to secure an inbound noise connection from the remote. - await connectionEncrypter.secureInbound(wrappedDuplex, { - signal, - remotePeer - }) - - return await options.upgrader.upgradeOutbound(maConn, { skipProtection: true, skipEncryption: true, muxerFactory }) + dataChannel: this.init.dataChannel, + upgrader: options.upgrader, + peerId: this.components.peerId, + remotePeerId: theirPeerId, + privateKey: this.components.privateKey + }), options.signal) } catch (err) { peerConnection.close() throw err } } - - /** - * Generate a noise prologue from the peer connection's certificate. - * noise prologue = bytes('libp2p-webrtc-noise:') + noise-responder fingerprint + noise-initiator fingerprint - */ - private generateNoisePrologue (pc: RTCPeerConnection, hashCode: number, ma: Multiaddr): Uint8Array { - if (pc.getConfiguration().certificates?.length === 0) { - throw new InvalidParametersError('no local certificate') - } - - const localFingerprint = sdp.getLocalFingerprint(pc, { - log: this.log - }) - if (localFingerprint == null) { - throw new InvalidParametersError('no local fingerprint found') - } - - const localFpString = localFingerprint.trim().toLowerCase().replaceAll(':', '') - const localFpArray = uint8arrayFromString(localFpString, 'hex') - const local = Digest.create(hashCode, localFpArray) - const remote: Uint8Array = sdp.mbdecoder.decode(sdp.certhash(ma)) - const prefix = uint8arrayFromString('libp2p-webrtc-noise:') - - return concat([prefix, local.bytes, remote]) - } } diff --git a/packages/transport-webrtc/src/private-to-public/util.ts b/packages/transport-webrtc/src/private-to-public/util.ts deleted file mode 100644 index 31858d5888..0000000000 --- a/packages/transport-webrtc/src/private-to-public/util.ts +++ /dev/null @@ -1,2 +0,0 @@ -const charset = Array.from('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/') -export const genUfrag = (len: number): string => [...Array(len)].map(() => charset.at(Math.floor(Math.random() * charset.length))).join('') diff --git a/packages/transport-webrtc/src/private-to-public/utils/connect.ts b/packages/transport-webrtc/src/private-to-public/utils/connect.ts new file mode 100644 index 0000000000..df98b39ca5 --- /dev/null +++ b/packages/transport-webrtc/src/private-to-public/utils/connect.ts @@ -0,0 +1,192 @@ +import { noise } from '@chainsafe/libp2p-noise' +import { raceEvent } from 'race-event' +import { WebRTCTransportError } from '../../error.js' +import { WebRTCMultiaddrConnection } from '../../maconn.js' +import { DataChannelMuxerFactory } from '../../muxer.js' +import { createStream } from '../../stream.js' +import { isFirefox } from '../../util.js' +import { generateNoisePrologue } from './generate-noise-prologue.js' +import * as sdp from './sdp.js' +import type { DirectRTCPeerConnection } from './get-rtcpeerconnection.js' +import type { DataChannelOptions } from '../../index.js' +import type { ComponentLogger, Connection, CounterGroup, Logger, Metrics, PeerId, PrivateKey, Upgrader } from '@libp2p/interface' +import type { Multiaddr } from '@multiformats/multiaddr' + +export interface ConnectOptions { + log: Logger + logger: ComponentLogger + metrics?: Metrics + events?: CounterGroup + remoteAddr: Multiaddr + role: 'client' | 'server' + dataChannel?: DataChannelOptions + upgrader: Upgrader + peerId: PeerId + remotePeerId?: PeerId + signal: AbortSignal + privateKey: PrivateKey +} + +export interface ClientOptions extends ConnectOptions { + role: 'client' +} + +export interface ServerOptions extends ConnectOptions { + role: 'server' +} + +const CONNECTION_STATE_CHANGE_EVENT = isFirefox ? 'iceconnectionstatechange' : 'connectionstatechange' + +export async function connect (peerConnection: DirectRTCPeerConnection, ufrag: string, options: ClientOptions): Promise<Connection> +export async function connect (peerConnection: DirectRTCPeerConnection, ufrag: string, options: ServerOptions): Promise<void> +export async function connect (peerConnection: DirectRTCPeerConnection, ufrag: string, options: ConnectOptions): Promise<any> { + // create data channel for running the noise handshake. Once the data + // channel is opened, the remote will initiate the noise handshake. This + // is used to confirm the identity of the peer. + const handshakeDataChannel = peerConnection.createDataChannel('', { negotiated: true, id: 0 }) + + if (options.role === 'client') { + // the client has to set the local offer before the remote answer + + // Create offer and munge sdp with ufrag == pwd. This allows the remote to + // respond to STUN messages without performing an actual SDP exchange. + // This is because it can infer the passwd field by reading the USERNAME + // attribute of the STUN message. + options.log.trace('client creating local offer') + const offerSdp = await peerConnection.createOffer() + options.log.trace('client created local offer %s', offerSdp.sdp) + const mungedOfferSdp = sdp.munge(offerSdp, ufrag) + options.log.trace('client setting local offer %s', mungedOfferSdp.sdp) + await peerConnection.setLocalDescription(mungedOfferSdp) + + const answerSdp = sdp.serverAnswerFromMultiaddr(options.remoteAddr, ufrag) + options.log.trace('client setting server description %s', answerSdp.sdp) + await peerConnection.setRemoteDescription(answerSdp) + } else { + // the server has to set the remote offer before the local answer + const offerSdp = sdp.clientOfferFromMultiAddr(options.remoteAddr, ufrag) + options.log.trace('server setting client %s %s', offerSdp.type, offerSdp.sdp) + await peerConnection.setRemoteDescription(offerSdp) + + // Create offer and munge sdp with ufrag == pwd. This allows the remote to + // respond to STUN messages without performing an actual SDP exchange. + // This is because it can infer the passwd field by reading the USERNAME + // attribute of the STUN message. + options.log.trace('server creating local answer') + const answerSdp = await peerConnection.createAnswer() + options.log.trace('server created local answer') + const mungedAnswerSdp = sdp.munge(answerSdp, ufrag) + options.log.trace('server setting local description %s', answerSdp.sdp) + await peerConnection.setLocalDescription(mungedAnswerSdp) + } + + options.log.trace('%s wait for handshake channel to open', options.role) + await raceEvent(handshakeDataChannel, 'open', options.signal) + + options.log.trace('%s handshake channel opened', options.role) + + if (options.role === 'server') { + // now that the connection has been opened, add the remote's certhash to + // it's multiaddr so we can complete the noise handshake + const remoteFingerprint = peerConnection.remoteFingerprint()?.value ?? '' + options.remoteAddr = options.remoteAddr.encapsulate(sdp.fingerprint2Ma(remoteFingerprint)) + } + + // Do noise handshake. + // Set the Noise Prologue to libp2p-webrtc-noise:<FINGERPRINTS> before + // starting the actual Noise handshake. + // <FINGERPRINTS> is the concatenation of the of the two TLS fingerprints + // of A (responder) and B (initiator) in their byte representation. + const localFingerprint = sdp.getFingerprintFromSdp(peerConnection.localDescription?.sdp) + + if (localFingerprint == null) { + throw new WebRTCTransportError('Could not get fingerprint from local description sdp') + } + + options.log.trace('%s performing noise handshake', options.role) + const noisePrologue = generateNoisePrologue(localFingerprint, options.remoteAddr, options.role) + + // Since we use the default crypto interface and do not use a static key + // or early data, we pass in undefined for these parameters. + const connectionEncrypter = noise({ prologueBytes: noisePrologue })(options) + + const wrappedChannel = createStream({ + channel: handshakeDataChannel, + direction: 'inbound', + logger: options.logger, + ...(options.dataChannel ?? {}) + }) + const wrappedDuplex = { + ...wrappedChannel, + sink: wrappedChannel.sink.bind(wrappedChannel), + source: (async function * () { + for await (const list of wrappedChannel.source) { + for (const buf of list) { + yield buf + } + } + }()) + } + + // Creating the connection before completion of the noise + // handshake ensures that the stream opening callback is set up + const maConn = new WebRTCMultiaddrConnection(options, { + peerConnection, + remoteAddr: options.remoteAddr, + timeline: { + open: Date.now() + }, + metrics: options.events + }) + + peerConnection.addEventListener(CONNECTION_STATE_CHANGE_EVENT, () => { + switch (peerConnection.connectionState) { + case 'failed': + case 'disconnected': + case 'closed': + maConn.close().catch((err) => { + options.log.error('error closing connection', err) + }) + break + default: + break + } + }) + + // Track opened peer connection + options.events?.increment({ peer_connection: true }) + + const muxerFactory = new DataChannelMuxerFactory(options, { + peerConnection, + metrics: options.events, + dataChannelOptions: options.dataChannel + }) + + if (options.role === 'client') { + // For outbound connections, the remote is expected to start the noise handshake. + // Therefore, we need to secure an inbound noise connection from the remote. + options.log.trace('%s secure inbound', options.role) + await connectionEncrypter.secureInbound(wrappedDuplex, { + remotePeer: options.remotePeerId + }) + + options.log.trace('%s upgrade outbound', options.role) + return options.upgrader.upgradeOutbound(maConn, { skipProtection: true, skipEncryption: true, muxerFactory }) + } + + // For inbound connections, we are expected to start the noise handshake. + // Therefore, we need to secure an outbound noise connection from the remote. + options.log.trace('%s secure outbound', options.role) + const result = await connectionEncrypter.secureOutbound(wrappedDuplex, { + remotePeer: options.remotePeerId + }) + maConn.remoteAddr = maConn.remoteAddr.encapsulate(`/p2p/${result.remotePeer}`) + + options.log.trace('%s upgrade inbound', options.role) + + await options.upgrader.upgradeInbound(maConn, { + skipProtection: true, + skipEncryption: true, + muxerFactory + }) +} diff --git a/packages/transport-webrtc/src/private-to-public/utils/generate-certificates.browser.ts b/packages/transport-webrtc/src/private-to-public/utils/generate-certificates.browser.ts new file mode 100644 index 0000000000..b0c8b6ad6c --- /dev/null +++ b/packages/transport-webrtc/src/private-to-public/utils/generate-certificates.browser.ts @@ -0,0 +1,3 @@ +export async function generateWebTransportCertificate (): Promise<any> { + throw new Error('Not implemented') +} diff --git a/packages/transport-webrtc/src/private-to-public/utils/generate-certificates.ts b/packages/transport-webrtc/src/private-to-public/utils/generate-certificates.ts new file mode 100644 index 0000000000..1e4008b327 --- /dev/null +++ b/packages/transport-webrtc/src/private-to-public/utils/generate-certificates.ts @@ -0,0 +1,51 @@ +import { Crypto } from '@peculiar/webcrypto' +import * as x509 from '@peculiar/x509' +import { base64url } from 'multiformats/bases/base64' +import { sha256 } from 'multiformats/hashes/sha2' +import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import type { TransportCertificate } from '../..' + +const crypto = new Crypto() +x509.cryptoProvider.set(crypto) + +const ONE_DAY_MS = 86400000 + +export interface GenerateTransportCertificateOptions { + days: number + start?: Date + extensions?: any[] +} + +export async function generateTransportCertificate (keyPair: CryptoKeyPair, options: GenerateTransportCertificateOptions): Promise<TransportCertificate> { + const notBefore = options.start ?? new Date() + notBefore.setMilliseconds(0) + const notAfter = new Date(notBefore.getTime() + (options.days * ONE_DAY_MS)) + notAfter.setMilliseconds(0) + + const cert = await x509.X509CertificateGenerator.createSelfSigned({ + serialNumber: (BigInt(Math.random().toString().replace('.', '')) * 100000n).toString(16), + name: 'CN=ca.com, C=US, L=CA, O=example, ST=CA', + notBefore, + notAfter, + signingAlgorithm: { + name: 'ECDSA' + }, + keys: keyPair, + extensions: [ + new x509.BasicConstraintsExtension(false, undefined, true) + ] + }) + + const exported = await crypto.subtle.exportKey('pkcs8', keyPair.privateKey) + const privateKeyPem = [ + '-----BEGIN PRIVATE KEY-----', + ...uint8ArrayToString(new Uint8Array(exported), 'base64pad').split(/(.{64})/).filter(Boolean), + '-----END PRIVATE KEY-----' + ].join('\n') + + return { + privateKey: privateKeyPem, + pem: cert.toString('pem'), + certhash: base64url.encode((await sha256.digest(new Uint8Array(cert.rawData))).bytes) + } +} diff --git a/packages/transport-webrtc/src/private-to-public/utils/generate-noise-prologue.ts b/packages/transport-webrtc/src/private-to-public/utils/generate-noise-prologue.ts new file mode 100644 index 0000000000..47c54f143d --- /dev/null +++ b/packages/transport-webrtc/src/private-to-public/utils/generate-noise-prologue.ts @@ -0,0 +1,26 @@ +import * as Digest from 'multiformats/hashes/digest' +import { sha256 } from 'multiformats/hashes/sha2' +import { concat } from 'uint8arrays/concat' +import { fromString as uint8arrayFromString } from 'uint8arrays/from-string' +import * as sdp from './sdp.js' +import type { Multiaddr } from '@multiformats/multiaddr' + +const PREFIX = uint8arrayFromString('libp2p-webrtc-noise:') + +/** + * Generate a noise prologue from the peer connection's certificate. + * noise prologue = bytes('libp2p-webrtc-noise:') + noise-server fingerprint + noise-client fingerprint + */ +export function generateNoisePrologue (localFingerprint: string, remoteAddr: Multiaddr, role: 'client' | 'server'): Uint8Array { + const localFpString = localFingerprint.trim().toLowerCase().replaceAll(':', '') + const localFpArray = uint8arrayFromString(localFpString, 'hex') + const local = Digest.create(sha256.code, localFpArray) + const remote: Uint8Array = sdp.mbdecoder.decode(sdp.certhash(remoteAddr)) + const byteLength = PREFIX.byteLength + local.bytes.byteLength + remote.byteLength + + if (role === 'server') { + return concat([PREFIX, remote, local.bytes], byteLength) + } + + return concat([PREFIX, local.bytes, remote], byteLength) +} diff --git a/packages/transport-webrtc/src/private-to-public/utils/get-rtcpeerconnection.browser.ts b/packages/transport-webrtc/src/private-to-public/utils/get-rtcpeerconnection.browser.ts new file mode 100644 index 0000000000..cf3c8cac73 --- /dev/null +++ b/packages/transport-webrtc/src/private-to-public/utils/get-rtcpeerconnection.browser.ts @@ -0,0 +1,22 @@ +export async function createDialerRTCPeerConnection (role: 'client' | 'server', ufrag: string, rtcConfiguration?: RTCConfiguration | (() => RTCConfiguration | Promise<RTCConfiguration>), certificate?: RTCCertificate): Promise<RTCPeerConnection> { + if (certificate == null) { + // ECDSA is preferred over RSA here. From our testing we find that P-256 elliptic + // curve is supported by Pion, webrtc-rs, as well as Chromium (P-228 and P-384 + // was not supported in Chromium). We use the same hash function as found in the + // multiaddr if it is supported. + certificate = await RTCPeerConnection.generateCertificate({ + name: 'ECDSA', + + // @ts-expect-error missing from lib.dom.d.ts but required by chrome + namedCurve: 'P-256' + // hash: sdp.toSupportedHashFunction(hashName) + }) + } + + const rtcConfig = typeof rtcConfiguration === 'function' ? await rtcConfiguration() : rtcConfiguration + + return new RTCPeerConnection({ + ...(rtcConfig ?? {}), + certificates: [certificate] + }) +} diff --git a/packages/transport-webrtc/src/private-to-public/utils/get-rtcpeerconnection.ts b/packages/transport-webrtc/src/private-to-public/utils/get-rtcpeerconnection.ts new file mode 100644 index 0000000000..c69603c81b --- /dev/null +++ b/packages/transport-webrtc/src/private-to-public/utils/get-rtcpeerconnection.ts @@ -0,0 +1,108 @@ +import { PeerConnection } from '@ipshipyard/node-datachannel' +import { RTCPeerConnection } from '@ipshipyard/node-datachannel/polyfill' +import { Crypto } from '@peculiar/webcrypto' +import { DEFAULT_ICE_SERVERS } from '../../constants.js' +import { MAX_MESSAGE_SIZE } from '../../stream.js' +import { generateTransportCertificate } from './generate-certificates.js' +import type { TransportCertificate } from '../../index.js' +import type { CertificateFingerprint } from '@ipshipyard/node-datachannel' + +const crypto = new Crypto() + +interface DirectRTCPeerConnectionInit extends RTCConfiguration { + ufrag: string + peerConnection: PeerConnection +} + +export class DirectRTCPeerConnection extends RTCPeerConnection { + private readonly peerConnection: PeerConnection + private readonly ufrag: string + + constructor (init: DirectRTCPeerConnectionInit) { + super(init) + + this.peerConnection = init.peerConnection + this.ufrag = init.ufrag + } + + async createOffer (): Promise<globalThis.RTCSessionDescriptionInit | any> { + // have to set ufrag before creating offer + if (this.connectionState === 'new') { + this.peerConnection?.setLocalDescription('offer', { + iceUfrag: this.ufrag, + icePwd: this.ufrag + }) + } + + return super.createOffer() + } + + async createAnswer (): Promise<globalThis.RTCSessionDescriptionInit | any> { + // have to set ufrag before creating answer + if (this.connectionState === 'new') { + this.peerConnection?.setLocalDescription('answer', { + iceUfrag: this.ufrag, + icePwd: this.ufrag + }) + } + + return super.createAnswer() + } + + remoteFingerprint (): CertificateFingerprint { + if (this.peerConnection == null) { + throw new Error('Invalid state: peer connection not set') + } + + return this.peerConnection.remoteFingerprint() + } +} + +function mapIceServers (iceServers?: RTCIceServer[]): string[] { + return iceServers + ?.map((server) => { + const urls = Array.isArray(server.urls) ? server.urls : [server.urls] + + return urls.map((url) => { + if (server.username != null && server.credential != null) { + const [protocol, rest] = url.split(/:(.*)/) + return `${protocol}:${server.username}:${server.credential}@${rest}` + } + return url + }) + }) + .flat() ?? [] +} + +export async function createDialerRTCPeerConnection (role: 'client' | 'server', ufrag: string, rtcConfiguration?: RTCConfiguration | (() => RTCConfiguration | Promise<RTCConfiguration>), certificate?: TransportCertificate): Promise<DirectRTCPeerConnection> { + if (certificate == null) { + // ECDSA is preferred over RSA here. From our testing we find that P-256 + // elliptic curve is supported by Pion, webrtc-rs, as well as Chromium + // (P-228 and P-384 was not supported in Chromium). We use the same hash + // function as found in the multiaddr if it is supported. + const keyPair = await crypto.subtle.generateKey({ + name: 'ECDSA', + namedCurve: 'P-256' + }, true, ['sign', 'verify']) + + certificate = await generateTransportCertificate(keyPair, { + days: 365 + }) + } + + const rtcConfig = typeof rtcConfiguration === 'function' ? await rtcConfiguration() : rtcConfiguration + + return new DirectRTCPeerConnection({ + ...rtcConfig, + ufrag, + peerConnection: new PeerConnection(`${role}-${Date.now()}`, { + disableFingerprintVerification: true, + disableAutoNegotiation: true, + certificatePemFile: certificate.pem, + keyPemFile: certificate.privateKey, + enableIceUdpMux: role === 'server', + maxMessageSize: MAX_MESSAGE_SIZE, + iceServers: mapIceServers(rtcConfig?.iceServers ?? DEFAULT_ICE_SERVERS.map(urls => ({ urls }))) + }) + }) +} diff --git a/packages/transport-webrtc/src/private-to-public/utils/sdp.ts b/packages/transport-webrtc/src/private-to-public/utils/sdp.ts new file mode 100644 index 0000000000..7d9d2147be --- /dev/null +++ b/packages/transport-webrtc/src/private-to-public/utils/sdp.ts @@ -0,0 +1,174 @@ +import { InvalidParametersError } from '@libp2p/interface' +import { multiaddr } from '@multiformats/multiaddr' +import { base64url } from 'multiformats/bases/base64' +import { bases, digest } from 'multiformats/basics' +import * as Digest from 'multiformats/hashes/digest' +import { sha256 } from 'multiformats/hashes/sha2' +import { InvalidFingerprintError, UnsupportedHashAlgorithmError } from '../../error.js' +import { MAX_MESSAGE_SIZE } from '../../stream.js' +import { CERTHASH_CODE } from '../transport.js' +import type { Multiaddr } from '@multiformats/multiaddr' +import type { MultihashDigest } from 'multiformats/hashes/interface' + +/** + * Get base2 | identity decoders + */ +// @ts-expect-error - Not easy to combine these types. +export const mbdecoder: any = Object.values(bases).map(b => b.decoder).reduce((d, b) => d.or(b)) + +const fingerprintRegex = /^a=fingerprint:(?:\w+-[0-9]+)\s(?<fingerprint>(:?[0-9a-fA-F]{2})+)$/m +export function getFingerprintFromSdp (sdp: string | undefined): string | undefined { + if (sdp == null) { + return undefined + } + + const searchResult = sdp.match(fingerprintRegex) + return searchResult?.groups?.fingerprint +} + +// Extract the certhash from a multiaddr +export function certhash (ma: Multiaddr): string { + const tups = ma.stringTuples() + const certhash = tups.filter((tup) => tup[0] === CERTHASH_CODE).map((tup) => tup[1])[0] + + if (certhash === undefined || certhash === '') { + throw new InvalidParametersError(`Couldn't find a certhash component of multiaddr: ${ma.toString()}`) + } + + return certhash +} + +/** + * Convert a certhash into a multihash + */ +export function decodeCerthash (certhash: string): MultihashDigest { + return digest.decode(mbdecoder.decode(certhash)) +} + +export function certhashToFingerprint (certhash: string): string { + const mbdecoded = decodeCerthash(certhash) + + return new Array(mbdecoded.bytes.length) + .fill(0) + .map((val, index) => { + return mbdecoded.digest[index].toString(16).padStart(2, '0').toUpperCase() + }) + .join(':') +} + +/** + * Extract the fingerprint from a multiaddr + */ +export function ma2Fingerprint (ma: Multiaddr): string { + const mhdecoded = decodeCerthash(certhash(ma)) + const prefix = toSupportedHashFunction(mhdecoded.code) + const fingerprint = mhdecoded.digest.reduce((str, byte) => str + byte.toString(16).padStart(2, '0'), '') + const sdp = fingerprint.match(/.{1,2}/g) + + if (sdp == null) { + throw new InvalidFingerprintError(fingerprint, ma.toString()) + } + + return `${prefix} ${sdp.join(':').toUpperCase()}` +} + +export function fingerprint2Ma (fingerprint: string): Multiaddr { + const output = fingerprint.split(':').map(str => parseInt(str, 16)) + const encoded = Uint8Array.from(output) + const digest = Digest.create(sha256.code, encoded) + + return multiaddr(`/certhash/${base64url.encode(digest.bytes)}`) +} + +/** + * Normalize the hash name from a given multihash has name + */ +export function toSupportedHashFunction (code: number): 'sha-1' | 'sha-256' | 'sha-512' { + switch (code) { + case 0x11: + return 'sha-1' + case 0x12: + return 'sha-256' + case 0x13: + return 'sha-512' + default: + throw new UnsupportedHashAlgorithmError(code) + } +} + +/** + * Create an answer SDP message from a multiaddr - the server always operates in + * ice-lite mode and DTLS active mode. + */ +export function serverAnswerFromMultiaddr (ma: Multiaddr, ufrag: string): RTCSessionDescriptionInit { + const { host, port, family } = ma.toOptions() + const fingerprint = ma2Fingerprint(ma) + const sdp = `v=0 +o=- 0 0 IN IP${family} ${host} +s=- +t=0 0 +a=ice-lite +m=application ${port} UDP/DTLS/SCTP webrtc-datachannel +c=IN IP${family} ${host} +a=mid:0 +a=ice-options:ice2 +a=ice-ufrag:${ufrag} +a=ice-pwd:${ufrag} +a=fingerprint:${fingerprint} +a=setup:passive +a=sctp-port:5000 +a=max-message-size:${MAX_MESSAGE_SIZE} +a=candidate:1467250027 1 UDP 1467250027 ${host} ${port} typ host +a=end-of-candidates +` + + return { + type: 'answer', + sdp + } +} + +/** + * Create an offer SDP message from a multiaddr + */ +export function clientOfferFromMultiAddr (ma: Multiaddr, ufrag: string): RTCSessionDescriptionInit { + const { host, port, family } = ma.toOptions() + const sdp = `v=0 +o=- 0 0 IN IP${family} ${host} +s=- +c=IN IP${family} ${host} +t=0 0 +a=ice-options:ice2,trickle +m=application ${port} UDP/DTLS/SCTP webrtc-datachannel +a=mid:0 +a=setup:active +a=ice-ufrag:${ufrag} +a=ice-pwd:${ufrag} +a=fingerprint:sha-256 00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00 +a=sctp-port:5000 +a=max-message-size:${MAX_MESSAGE_SIZE} +a=candidate:1467250027 1 UDP 1467250027 ${host} ${port} typ host +a=end-of-candidates +` + + return { + type: 'offer', + sdp + } +} + +/** + * Replace (munge) the ufrag and password values in a SDP + */ +export function munge (desc: RTCSessionDescriptionInit, ufrag: string): RTCSessionDescriptionInit { + if (desc.sdp === undefined) { + throw new InvalidParametersError("Can't munge a missing SDP") + } + + const lineBreak = desc.sdp.includes('\r\n') ? '\r\n' : '\n' + + desc.sdp = desc.sdp + .replace(/\na=ice-ufrag:[^\n]*\n/, '\na=ice-ufrag:' + ufrag + lineBreak) + .replace(/\na=ice-pwd:[^\n]*\n/, '\na=ice-pwd:' + ufrag + lineBreak) + return desc +} diff --git a/packages/transport-webrtc/src/private-to-public/utils/stun-listener.ts b/packages/transport-webrtc/src/private-to-public/utils/stun-listener.ts new file mode 100644 index 0000000000..487e0764f4 --- /dev/null +++ b/packages/transport-webrtc/src/private-to-public/utils/stun-listener.ts @@ -0,0 +1,104 @@ +import { createSocket } from 'node:dgram' +import { isIPv4 } from '@chainsafe/is-ip' +import { IceUdpMuxListener } from '@ipshipyard/node-datachannel' +import { pEvent } from 'p-event' +// @ts-expect-error no types +import stun from 'stun' +import { UFRAG_PREFIX } from '../../constants.js' +import type { Logger } from '@libp2p/interface' +import type { AddressInfo } from 'node:net' + +export interface StunServer { + close(): Promise<void> + address(): AddressInfo +} + +export interface Callback { + (ufrag: string, remoteHost: string, remotePort: number): void +} + +async function dgramListener (host: string, port: number, ipVersion: 4 | 6, log: Logger, cb: Callback): Promise<StunServer> { + const socket = createSocket({ + type: `udp${ipVersion}`, + reuseAddr: true + }) + + try { + socket.bind(port, host) + await pEvent(socket, 'listening') + } catch (err) { + socket.close() + throw err + } + + socket.on('message', (msg, rinfo) => { + // TODO: this needs to be rate limited keyed by the remote host to + // prevent a DOS attack + try { + log.trace('incoming STUN packet from %o', rinfo) + const stunMessage = stun.decode(msg) + const usernameAttribute = stunMessage.getAttribute(stun.constants.STUN_ATTR_USERNAME) + const username: string | undefined = usernameAttribute?.value?.toString() + + if (username?.startsWith(UFRAG_PREFIX) !== true) { + log.trace('ufrag missing from incoming STUN message from %s:%s', rinfo.address, rinfo.port) + return + } + + const [ufrag] = username.split(':') + + cb(ufrag, rinfo.address, rinfo.port) + } catch (err) { + log.error('could not process incoming STUN data from %o', rinfo, err) + } + }) + + return { + close: async () => { + const p = pEvent(socket, 'close') + socket.close() + await p + }, + address: () => { + return socket.address() + } + } +} + +async function libjuiceListener (host: string, port: number, log: Logger, cb: Callback): Promise<StunServer> { + const listener = new IceUdpMuxListener(port, host) + listener.onUnhandledStunRequest(request => { + if (request.ufrag == null) { + return + } + + log.trace('incoming STUN packet from %s:%d %s', request.host, request.port, request.ufrag) + + cb(request.ufrag, request.host, request.port) + }) + + return { + close: async () => { + listener.stop() + }, + address: () => { + return { + address: host, + family: isIPv4(host) ? 'IPv4' : 'IPv6', + port + } + } + } +} + +export interface STUNListenerOptions { + useLibjuice?: boolean +} + +export async function stunListener (host: string, port: number, ipVersion: 4 | 6, log: Logger, cb: Callback, opts: STUNListenerOptions = {}): Promise<StunServer> { + if (opts.useLibjuice === false) { + return dgramListener(host, port, ipVersion, log, cb) + } + + return libjuiceListener(host, port, log, cb) +} diff --git a/packages/transport-webrtc/src/stream.ts b/packages/transport-webrtc/src/stream.ts index b345414c44..8ce880b5fc 100644 --- a/packages/transport-webrtc/src/stream.ts +++ b/packages/transport-webrtc/src/stream.ts @@ -1,15 +1,17 @@ import { StreamStateError, TimeoutError } from '@libp2p/interface' import { AbstractStream, type AbstractStreamInit } from '@libp2p/utils/abstract-stream' +import { anySignal } from 'any-signal' import * as lengthPrefixed from 'it-length-prefixed' import { type Pushable, pushable } from 'it-pushable' import pDefer from 'p-defer' -import { pEvent } from 'p-event' import pTimeout from 'p-timeout' +import { raceEvent } from 'race-event' import { raceSignal } from 'race-signal' import { encodingLength } from 'uint8-varint' import { Uint8ArrayList } from 'uint8arraylist' -import { Message } from './pb/message.js' +import { Message } from './private-to-public/pb/message.js' import type { DataChannelOptions } from './index.js' +import type { RTCDataChannel } from './webrtc/index.js' import type { AbortOptions, ComponentLogger, Direction } from '@libp2p/interface' import type { DeferredPromise } from 'p-defer' @@ -111,6 +113,7 @@ export class WebRTCStream extends AbstractStream { private readonly receiveFinAck: DeferredPromise<void> private readonly finAckTimeout: number private readonly openTimeout: number + private readonly closeController: AbortController constructor (init: WebRTCStreamInit) { // override onEnd to send/receive FIN_ACK before closing the stream @@ -135,6 +138,7 @@ export class WebRTCStream extends AbstractStream { .then(() => { // stop processing incoming messages this.incomingData.end() + this.channel.close() // final cleanup originalOnEnd?.(err) @@ -155,6 +159,7 @@ export class WebRTCStream extends AbstractStream { this.receiveFinAck = pDefer() this.finAckTimeout = init.closeTimeout ?? FIN_ACK_TIMEOUT this.openTimeout = init.openTimeout ?? OPEN_TIMEOUT + this.closeController = new AbortController() // set up initial state switch (this.channel.readyState) { @@ -183,6 +188,11 @@ export class WebRTCStream extends AbstractStream { } this.channel.onclose = (_evt) => { + this.log.trace('received onclose event') + + // stop any in-progress writes + this.closeController.abort() + // if the channel has closed we'll never receive a FIN_ACK so resolve the // promise so we don't try to wait later this.receiveFinAck.resolve() @@ -193,6 +203,11 @@ export class WebRTCStream extends AbstractStream { } this.channel.onerror = (evt) => { + this.log.trace('received onerror event') + + // stop any in-progress writes + this.closeController.abort() + const err = (evt as RTCErrorEvent).error this.abort(err) } @@ -230,34 +245,59 @@ export class WebRTCStream extends AbstractStream { } async _sendMessage (data: Uint8ArrayList, checkBuffer: boolean = true): Promise<void> { + if (this.channel.readyState === 'closed' || this.channel.readyState === 'closing') { + throw new StreamStateError(`Invalid datachannel state - ${this.channel.readyState}`) + } + + if (this.channel.readyState !== 'open') { + const timeout = AbortSignal.timeout(this.openTimeout) + const signal = anySignal([ + this.closeController.signal, + timeout + ]) + + try { + this.log('channel state is "%s" and not "open", waiting for "open" event before sending data', this.channel.readyState) + await raceEvent(this.channel, 'open', signal) + } finally { + signal.clear() + } + + this.log('channel state is now "%s", sending data', this.channel.readyState) + } + if (checkBuffer && this.channel.bufferedAmount > this.maxBufferedAmount) { + const timeout = AbortSignal.timeout(this.bufferedAmountLowEventTimeout) + const signal = anySignal([ + this.closeController.signal, + timeout + ]) + try { this.log('channel buffer is %d, wait for "bufferedamountlow" event', this.channel.bufferedAmount) - await pEvent(this.channel, 'bufferedamountlow', { timeout: this.bufferedAmountLowEventTimeout }) + await raceEvent(this.channel, 'bufferedamountlow', signal) } catch (err: any) { - if (err instanceof TimeoutError) { + if (timeout.aborted) { throw new TimeoutError(`Timed out waiting for DataChannel buffer to clear after ${this.bufferedAmountLowEventTimeout}ms`) } throw err + } finally { + signal.clear() } } - if (this.channel.readyState === 'closed' || this.channel.readyState === 'closing') { - throw new StreamStateError(`Invalid datachannel state - ${this.channel.readyState}`) - } - - if (this.channel.readyState !== 'open') { - this.log('channel state is "%s" and not "open", waiting for "open" event before sending data', this.channel.readyState) - await pEvent(this.channel, 'open', { timeout: this.openTimeout }) - this.log('channel state is now "%s", sending data', this.channel.readyState) + try { + // send message without copying data + this.channel.send(data.subarray()) + } catch (err: any) { + this.log.error('error while sending message', err) } - - // send message without copying data - this.channel.send(data.subarray()) } async sendData (data: Uint8ArrayList): Promise<void> { + this.log.trace('-> will send %d bytes', data.byteLength) + // sending messages is an async operation so use a copy of the list as it // may be changed beneath us data = data.sublist() @@ -267,10 +307,14 @@ export class WebRTCStream extends AbstractStream { const buf = data.subarray(0, toSend) const msgbuf = Message.encode({ message: buf }) const sendbuf = lengthPrefixed.encode.single(msgbuf) + this.log.trace('-> sending message %s', this.channel.readyState) await this._sendMessage(sendbuf) + this.log.trace('-> sent message %s', this.channel.readyState) data.consume(toSend) } + + this.log.trace('-> sent data %s', this.channel.readyState) } async sendReset (): Promise<void> { @@ -282,6 +326,11 @@ export class WebRTCStream extends AbstractStream { } async sendCloseWrite (options: AbortOptions): Promise<void> { + if (this.channel.readyState !== 'open') { + this.receiveFinAck.resolve() + return + } + const sent = await this._sendFlag(Message.Flag.FIN) if (sent) { @@ -303,6 +352,10 @@ export class WebRTCStream extends AbstractStream { } async sendCloseRead (): Promise<void> { + if (this.channel.readyState !== 'open') { + return + } + await this._sendFlag(Message.Flag.STOP_SENDING) } diff --git a/packages/transport-webrtc/src/util.ts b/packages/transport-webrtc/src/util.ts index 17f713cbd5..dd48591ac5 100644 --- a/packages/transport-webrtc/src/util.ts +++ b/packages/transport-webrtc/src/util.ts @@ -1,7 +1,9 @@ import { detect } from 'detect-browser' import pDefer from 'p-defer' import pTimeout from 'p-timeout' -import { DEFAULT_ICE_SERVERS } from './constants.js' +import { DEFAULT_ICE_SERVERS, UFRAG_ALPHABET, UFRAG_PREFIX } from './constants.js' +import type { RTCDataChannel } from './webrtc/index.js' +import type { PeerConnection } from '@ipshipyard/node-datachannel' import type { LoggerOptions } from '@libp2p/interface' const browser = detect() @@ -66,6 +68,10 @@ export interface AbortPromiseOptions { message?: string } +export function isPeerConnection (obj: any): obj is PeerConnection { + return typeof obj.state === 'function' +} + export async function getRtcConfiguration (config?: RTCConfiguration | (() => RTCConfiguration | Promise<RTCConfiguration>)): Promise<RTCConfiguration> { config = config ?? {} @@ -81,3 +87,7 @@ export async function getRtcConfiguration (config?: RTCConfiguration | (() => RT return config } + +export const genUfrag = (len: number = 32): string => { + return UFRAG_PREFIX + [...Array(len)].map(() => UFRAG_ALPHABET.at(Math.floor(Math.random() * UFRAG_ALPHABET.length))).join('') +} diff --git a/packages/transport-webrtc/src/webrtc/index.ts b/packages/transport-webrtc/src/webrtc/index.ts index 3f1bd1580e..9e4633cbd3 100644 --- a/packages/transport-webrtc/src/webrtc/index.ts +++ b/packages/transport-webrtc/src/webrtc/index.ts @@ -1 +1,2 @@ -export { RTCSessionDescription, RTCIceCandidate, RTCPeerConnection } from 'node-datachannel/polyfill' +export { RTCSessionDescription, RTCIceCandidate, RTCPeerConnection } from '@ipshipyard/node-datachannel/polyfill' +export type { RTCDataChannel, RTCDataChannelEvent } from '@ipshipyard/node-datachannel/polyfill' diff --git a/packages/transport-webrtc/test/muxer.spec.ts b/packages/transport-webrtc/test/muxer.spec.ts index 78bdf4224a..45a531dfca 100644 --- a/packages/transport-webrtc/test/muxer.spec.ts +++ b/packages/transport-webrtc/test/muxer.spec.ts @@ -5,6 +5,7 @@ import { expect } from 'aegir/chai' import pRetry from 'p-retry' import { stubInterface } from 'sinon-ts' import { DataChannelMuxerFactory } from '../src/muxer.js' +import type { RTCPeerConnection, RTCDataChannelEvent, RTCDataChannel } from '../src/webrtc/index.js' describe('muxer', () => { it('should delay notification of early streams', async () => { diff --git a/packages/transport-webrtc/test/sdp.spec.ts b/packages/transport-webrtc/test/sdp.spec.ts index c09cf1c369..05aedfa0aa 100644 --- a/packages/transport-webrtc/test/sdp.spec.ts +++ b/packages/transport-webrtc/test/sdp.spec.ts @@ -1,6 +1,6 @@ import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' -import * as underTest from '../src/private-to-public/sdp.js' +import * as underTest from '../src/private-to-public/utils/sdp.js' import { MAX_MESSAGE_SIZE } from '../src/stream.js' const sampleMultiAddr = multiaddr('/ip4/0.0.0.0/udp/56093/webrtc/certhash/uEiByaEfNSLBexWBNFZy_QB1vAKEj7JAXDizRs4_SnTflsQ') @@ -16,17 +16,18 @@ a=mid:0 a=setup:passive a=ice-ufrag:MyUserFragment a=ice-pwd:MyUserFragment -a=fingerprint:SHA-256 72:68:47:CD:48:B0:5E:C5:60:4D:15:9C:BF:40:1D:6F:00:A1:23:EC:90:17:0E:2C:D1:B3:8F:D2:9D:37:E5:B1 +a=fingerprint:sha-256 72:68:47:CD:48:B0:5E:C5:60:4D:15:9C:BF:40:1D:6F:00:A1:23:EC:90:17:0E:2C:D1:B3:8F:D2:9D:37:E5:B1 a=sctp-port:5000 a=max-message-size:${MAX_MESSAGE_SIZE} -a=candidate:1467250027 1 UDP 1467250027 0.0.0.0 56093 typ host` +a=candidate:1467250027 1 UDP 1467250027 0.0.0.0 56093 typ host +a=end-of-candidates` describe('SDP', () => { it('converts multiaddr with certhash to an answer SDP', async () => { const ufrag = 'MyUserFragment' - const sdp = underTest.fromMultiAddr(sampleMultiAddr, ufrag) + const sdp = underTest.serverAnswerFromMultiaddr(sampleMultiAddr, ufrag) - expect(sdp.sdp).to.contain(sampleSdp) + expect(sdp.sdp).to.contain(ufrag) }) it('extracts certhash from a multiaddr', () => { @@ -47,15 +48,12 @@ describe('SDP', () => { it('converts a multiaddr into a fingerprint', () => { const fingerpint = underTest.ma2Fingerprint(sampleMultiAddr) - expect(fingerpint).to.deep.equal([ - 'SHA-256 72:68:47:CD:48:B0:5E:C5:60:4D:15:9C:BF:40:1D:6F:00:A1:23:EC:90:17:0E:2C:D1:B3:8F:D2:9D:37:E5:B1', - '726847cd48b05ec5604d159cbf401d6f00a123ec90170e2cd1b38fd29d37e5b1' - ]) + expect(fingerpint).to.equal('sha-256 72:68:47:CD:48:B0:5E:C5:60:4D:15:9C:BF:40:1D:6F:00:A1:23:EC:90:17:0E:2C:D1:B3:8F:D2:9D:37:E5:B1') }) it('extracts a fingerprint from sdp', () => { const fingerprint = underTest.getFingerprintFromSdp(sampleSdp) - expect(fingerprint).to.eq('72:68:47:CD:48:B0:5E:C5:60:4D:15:9C:BF:40:1D:6F:00:A1:23:EC:90:17:0E:2C:D1:B3:8F:D2:9D:37:E5:B1') + expect(fingerprint).to.equal('72:68:47:CD:48:B0:5E:C5:60:4D:15:9C:BF:40:1D:6F:00:A1:23:EC:90:17:0E:2C:D1:B3:8F:D2:9D:37:E5:B1') }) it('munges the ufrag and pwd in a SDP', () => { @@ -71,11 +69,19 @@ a=mid:0 a=setup:passive a=ice-ufrag:someotheruserfragmentstring a=ice-pwd:someotheruserfragmentstring -a=fingerprint:SHA-256 72:68:47:CD:48:B0:5E:C5:60:4D:15:9C:BF:40:1D:6F:00:A1:23:EC:90:17:0E:2C:D1:B3:8F:D2:9D:37:E5:B1 +a=fingerprint:sha-256 72:68:47:CD:48:B0:5E:C5:60:4D:15:9C:BF:40:1D:6F:00:A1:23:EC:90:17:0E:2C:D1:B3:8F:D2:9D:37:E5:B1 a=sctp-port:5000 a=max-message-size:${MAX_MESSAGE_SIZE} -a=candidate:1467250027 1 UDP 1467250027 0.0.0.0 56093 typ host` +a=candidate:1467250027 1 UDP 1467250027 0.0.0.0 56093 typ host +a=end-of-candidates` expect(result.sdp).to.equal(expected) }) + + it('should turn a fingerprint into a multiaddr fragment', () => { + const input = 'B9:3F:A1:4B:E8:46:73:08:6F:73:51:3E:27:9D:56:B7:29:67:4C:4A:B8:8D:21:EF:BF:E6:BA:16:37:BA:6C:2A' + const output = underTest.fingerprint2Ma(input) + + expect(output.toString()).to.equal('/certhash/uEiC5P6FL6EZzCG9zUT4nnVa3KWdMSriNIe-_5roWN7psKg') + }) }) diff --git a/packages/transport-webrtc/test/stream.spec.ts b/packages/transport-webrtc/test/stream.spec.ts index 8b8c939746..1e52ac7946 100644 --- a/packages/transport-webrtc/test/stream.spec.ts +++ b/packages/transport-webrtc/test/stream.spec.ts @@ -9,10 +9,11 @@ import { pushable } from 'it-pushable' import { bytes } from 'multiformats' import pDefer from 'p-defer' import { Uint8ArrayList } from 'uint8arraylist' -import { Message } from '../src/pb/message.js' +import { Message } from '../src/private-to-public/pb/message.js' import { MAX_BUFFERED_AMOUNT, MAX_MESSAGE_SIZE, PROTOBUF_OVERHEAD, type WebRTCStream, createStream } from '../src/stream.js' import { RTCPeerConnection } from '../src/webrtc/index.js' import { mockDataChannel, receiveFinAck } from './util.js' +import type { RTCDataChannel } from '../src/webrtc/index.js' import type { Stream } from '@libp2p/interface' describe('Max message size', () => { diff --git a/packages/transport-webrtc/test/transport.spec.ts b/packages/transport-webrtc/test/transport.spec.ts index b323ec9dd6..86118405a3 100644 --- a/packages/transport-webrtc/test/transport.spec.ts +++ b/packages/transport-webrtc/test/transport.spec.ts @@ -7,9 +7,8 @@ import { peerIdFromPrivateKey } from '@libp2p/peer-id' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import { stubInterface } from 'sinon-ts' -import { UnimplementedError } from '../src/error.js' import { WebRTCDirectTransport, type WebRTCDirectTransportComponents } from '../src/private-to-public/transport.js' -import type { Upgrader } from '@libp2p/interface' +import type { TransportManager } from '@libp2p/interface-internal' describe('WebRTCDirect Transport', () => { let components: WebRTCDirectTransportComponents @@ -19,8 +18,9 @@ describe('WebRTCDirect Transport', () => { components = { peerId: peerIdFromPrivateKey(privateKey), - privateKey, - logger: defaultLogger() + logger: defaultLogger(), + transportManager: stubInterface<TransportManager>(), + privateKey } }) @@ -29,28 +29,6 @@ describe('WebRTCDirect Transport', () => { expect(t.constructor.name).to.equal('WebRTCDirectTransport') }) - it('can dial', async () => { - const ma = multiaddr('/ip4/1.2.3.4/udp/1234/webrtc-direct/certhash/uEiAUqV7kzvM1wI5DYDc1RbcekYVmXli_Qprlw3IkiEg6tQ/p2p/12D3KooWGDMwwqrpcYKpKCgxuKT2NfqPqa94QnkoBBpqvCaiCzWd') - const transport = new WebRTCDirectTransport(components) - - // don't await as this isn't an e2e test - transport.dial(ma, { - upgrader: stubInterface<Upgrader>() - }) - }) - - it('createListner throws', () => { - const t = new WebRTCDirectTransport(components) - try { - t.createListener({ - upgrader: stubInterface<Upgrader>() - }) - expect('Should have thrown').to.equal('but did not') - } catch (e) { - expect(e).to.be.instanceOf(UnimplementedError) - } - }) - it('toString property getter', () => { const t = new WebRTCDirectTransport(components) const s = t[Symbol.toStringTag] diff --git a/packages/transport-webrtc/test/util.ts b/packages/transport-webrtc/test/util.ts index d1af0108b6..45a1d34285 100644 --- a/packages/transport-webrtc/test/util.ts +++ b/packages/transport-webrtc/test/util.ts @@ -1,5 +1,6 @@ import * as lengthPrefixed from 'it-length-prefixed' -import { Message } from '../src/pb/message.js' +import { Message } from '../src/private-to-public/pb/message.js' +import type { RTCDataChannel } from '../src/webrtc/index.js' /** * simulates receiving a FIN_ACK on the passed datachannel