Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/1982 #2051

Merged
merged 17 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion packages/backend/src/nest/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import { type PermsData } from '@quiet/types'
import { TestConfig } from '../const'
import logger from './logger'
import { Libp2pNodeParams } from '../libp2p/libp2p.types'
import { createLibp2pAddress, createLibp2pListenAddress } from '@quiet/common'
import { createLibp2pAddress, createLibp2pListenAddress, isDefined } from '@quiet/common'
import { Libp2pService } from '../libp2p/libp2p.service'
import { CertFieldsTypes, getReqFieldValue, loadCSR } from '@quiet/identity'

const log = logger('test')

Expand Down Expand Up @@ -153,6 +154,20 @@ export const getUsersAddresses = async (users: UserData[]): Promise<string[]> =>
return await Promise.all(peers)
}

export const getLibp2pAddressesFromCsrs = async (csrs: string[]): Promise<string[]> => {
const addresses = await Promise.all(
csrs.map(async csr => {
const parsedCsr = await loadCSR(csr)
const peerId = getReqFieldValue(parsedCsr, CertFieldsTypes.peerId)
const onionAddress = getReqFieldValue(parsedCsr, CertFieldsTypes.commonName)
if (!peerId || !onionAddress) return

return createLibp2pAddress(onionAddress, peerId)
})
)
return addresses.filter(isDefined)
}

/**
* Compares given numbers
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { setEngine, CryptoEngine } from 'pkijs'
import { EventEmitter } from 'events'
import getPort from 'get-port'
import PeerId from 'peer-id'
import { removeFilesFromDir } from '../common/utils'
import { getLibp2pAddressesFromCsrs, removeFilesFromDir } from '../common/utils'

import {
AskForMessagesPayload,
Expand Down Expand Up @@ -157,9 +157,10 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.logger('launchCommunityFromStorage')

const community: InitCommunityPayload = await this.localDbService.get(LocalDBKeys.COMMUNITY)
this.logger('launchCommunityFromStorage - community:', community?.id)
console.log('launchCommunityFromStorage - community peers', community?.peers)
if (community) {
const sortedPeers = await this.localDbService.getSortedPeers(community.peers)
console.log('launchCommunityFromStorage - sorted peers', sortedPeers)
if (sortedPeers.length > 0) {
community.peers = sortedPeers
}
Expand Down Expand Up @@ -191,9 +192,9 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.logger('Closing local storage')
await this.localDbService.close()
}
if (this.libp2pService?.libp2pInstance) {
if (this.libp2pService) {
this.logger('Stopping libp2p')
await this.libp2pService.libp2pInstance.stop()
await this.libp2pService.close()
}
}

Expand All @@ -208,22 +209,24 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI

public async leaveCommunity() {
this.tor.resetHiddenServices()
this.serverIoProvider.io.close()
this.closeSocket()
await this.localDbService.purge()
await this.closeAllServices({ saveTor: true })
await this.purgeData()
await this.resetState()
await this.localDbService.open()
await this.socketService.init()
}

async resetState() {
this.communityId = ''
this.ports = { ...this.ports, libp2pHiddenService: await getPort() }
this.libp2pService.libp2pInstance = null
this.libp2pService.connectedPeers = new Map()
this.communityState = ServiceState.DEFAULT
this.registrarState = ServiceState.DEFAULT
await this.localDbService.open()
await this.socketService.init()
}

public async purgeData() {
console.log('removing data')
this.logger('Purging community data')
const dirsToRemove = fs
.readdirSync(this.quietDir)
.filter(
Expand Down Expand Up @@ -586,6 +589,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.serverIoProvider.io.emit(SocketActionTypes.RESPONSE_FETCH_ALL_DIRECT_MESSAGES, payload)
})
this.storageService.on(StorageEvents.UPDATE_PEERS_LIST, (payload: StorePeerListPayload) => {
// this.libp2pService.emit(Libp2pEvents.UPDATE_KNOWN_PEERS_LIST, payload.peerList)
this.serverIoProvider.io.emit(SocketActionTypes.PEER_LIST, payload)
})
this.storageService.on(StorageEvents.SEND_PUSH_NOTIFICATION, (payload: PushNotificationPayload) => {
Expand All @@ -601,6 +605,8 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.storageService.on(
StorageEvents.REPLICATED_CSR,
async (payload: { csrs: string[]; certificates: string[]; id: string }) => {
console.log(`On ${StorageEvents.REPLICATED_CSR}`)
this.libp2pService.emit(Libp2pEvents.DIAL_PEERS, await getLibp2pAddressesFromCsrs(payload.csrs))
console.log(`Storage - ${StorageEvents.REPLICATED_CSR}`)
this.serverIoProvider.io.emit(SocketActionTypes.RESPONSE_GET_CSRS, { csrs: payload.csrs })
this.registrationService.emit(RegistrationEvents.REGISTER_USER_CERTIFICATE, payload)
Expand Down
2 changes: 2 additions & 0 deletions packages/backend/src/nest/const.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ export const IPFS_REPO_PATCH = 'ipfsRepoPath'
export const CONFIG_OPTIONS = 'configOptions'
export const SERVER_IO_PROVIDER = 'serverIoProvider'

export const PROCESS_IN_CHUNKS_PROVIDER = 'processInChunksProvider'

export const EXPRESS_PROVIDER = 'expressProvider'

export const LEVEL_DB = 'levelDb'
Expand Down
3 changes: 2 additions & 1 deletion packages/backend/src/nest/libp2p/libp2p.module.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Module } from '@nestjs/common'
import { SocketModule } from '../socket/socket.module'
import { Libp2pService } from './libp2p.service'
import { ProcessInChunksService } from './process-in-chunks.service'

@Module({
imports: [SocketModule],
providers: [Libp2pService],
providers: [Libp2pService, ProcessInChunksService],
exports: [Libp2pService],
})
export class Libp2pModule {}
51 changes: 46 additions & 5 deletions packages/backend/src/nest/libp2p/libp2p.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
import { jest } from '@jest/globals'
import { Test, TestingModule } from '@nestjs/testing'
import { TestModule } from '../common/test.module'
import { libp2pInstanceParams } from '../common/utils'
import { createPeerId, libp2pInstanceParams } from '../common/utils'
import { Libp2pModule } from './libp2p.module'
import { LIBP2P_PSK_METADATA, Libp2pService } from './libp2p.service'
import { Libp2pNodeParams } from './libp2p.types'
import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import validator from 'validator'
import waitForExpect from 'wait-for-expect'
import { ProcessInChunksService } from './process-in-chunks.service'

describe('Libp2pService', () => {
let module: TestingModule
let libp2pService: Libp2pService
let params: Libp2pNodeParams
let processInChunks: ProcessInChunksService<string>

beforeAll(async () => {
module = await Test.createTestingModule({
imports: [TestModule, Libp2pModule],
}).compile()

libp2pService = await module.resolve(Libp2pService)
processInChunks = await module.resolve(ProcessInChunksService<string>)
params = await libp2pInstanceParams()
})

Expand All @@ -32,13 +37,13 @@ describe('Libp2pService', () => {
expect(libp2pService?.libp2pInstance?.peerId).toBe(params.peerId)
})

it('destory instance libp2p', async () => {
it('close libp2p service', async () => {
await libp2pService.createInstance(params)
await libp2pService.destroyInstance()
await libp2pService.close()
expect(libp2pService.libp2pInstance).toBeNull()
})

it('creates libp2p address with proper ws type (%s)', async () => {
it('creates libp2p address', async () => {
const libp2pAddress = libp2pService.createLibp2pAddress(params.localAddress, params.peerId.toString())
expect(libp2pAddress).toStrictEqual(`/dns4/${params.localAddress}.onion/tcp/80/ws/p2p/${params.peerId.toString()}`)
})
Expand All @@ -58,4 +63,40 @@ describe('Libp2pService', () => {
const expectedFullKeyString = LIBP2P_PSK_METADATA + uint8ArrayToString(generatedPskBuffer, 'base16')
expect(uint8ArrayToString(generatedKey.fullKey)).toEqual(expectedFullKeyString)
})

it(`Starts dialing peers on '${Libp2pEvents.DIAL_PEERS}' event`, async () => {
const peerId1 = await createPeerId()
const peerId2 = await createPeerId()
const addresses = [
libp2pService.createLibp2pAddress('onionAddress1.onion', peerId1.toString()),
libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()),
]
await libp2pService.createInstance(params)
// @ts-expect-error processItem is private
const spyOnProcessItem = jest.spyOn(processInChunks, 'processItem')
expect(libp2pService.libp2pInstance).not.toBeNull()
libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses)
await waitForExpect(async () => {
expect(spyOnProcessItem).toBeCalledTimes(addresses.length)
})
})

it(`Do not dial peer on '${Libp2pEvents.DIAL_PEERS}' event if peer was already dialed`, async () => {
const peerId1 = await createPeerId()
const peerId2 = await createPeerId()
const alreadyDialedAddress = libp2pService.createLibp2pAddress('onionAddress1.onion', peerId1.toString())
libp2pService.dialedPeers.add(alreadyDialedAddress)
const addresses = [
alreadyDialedAddress,
libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()),
]
await libp2pService.createInstance(params)
expect(libp2pService.libp2pInstance).not.toBeNull()
// @ts-expect-error processItem is private
const dialPeerSpy = jest.spyOn(processInChunks, 'processItem')
libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses)
await waitForExpect(async () => {
expect(dialPeerSpy).toBeCalledTimes(1)
})
})
})
74 changes: 51 additions & 23 deletions packages/backend/src/nest/libp2p/libp2p.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@ import { createServer } from 'it-ws'
import { DateTime } from 'luxon'
import { EventEmitter } from 'events'
import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types'
import { ProcessInChunks } from './process-in-chunks'
import { ProcessInChunksService } from './process-in-chunks.service'
import { multiaddr } from '@multiformats/multiaddr'
import { ConnectionProcessInfo, PeerId, SocketActionTypes } from '@quiet/types'
import { SERVER_IO_PROVIDER, SOCKS_PROXY_AGENT } from '../const'
import { ServerIoProviderTypes } from '../types'
import Logger from '../common/logger'
import { webSockets } from '../websocketOverTor'
import { all } from '../websocketOverTor/filters'
import { createLibp2pAddress, createLibp2pListenAddress } from '@quiet/common'
import { createLibp2pAddress, createLibp2pListenAddress, isDefined } from '@quiet/common'
import { CertFieldsTypes, getReqFieldValue, loadCSR } from '@quiet/identity'
import { preSharedKey } from 'libp2p/pnet'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import crypto from 'crypto'
import { peerIdFromString } from '@libp2p/peer-id'

const KEY_LENGTH = 32
export const LIBP2P_PSK_METADATA = '/key/swarm/psk/1.0.0/\n/base16/\n'
Expand All @@ -30,15 +32,23 @@ export const LIBP2P_PSK_METADATA = '/key/swarm/psk/1.0.0/\n/base16/\n'
export class Libp2pService extends EventEmitter {
public libp2pInstance: Libp2p | null
public connectedPeers: Map<string, number> = new Map()
public dialedPeers: Set<string> = new Set()
// public processInChunksService: ProcessInChunks<string>
private readonly logger = Logger(Libp2pService.name)
constructor(
@Inject(SERVER_IO_PROVIDER) public readonly serverIoProvider: ServerIoProviderTypes,
@Inject(SOCKS_PROXY_AGENT) public readonly socksProxyAgent: Agent
@Inject(SOCKS_PROXY_AGENT) public readonly socksProxyAgent: Agent,
private readonly processInChunksService: ProcessInChunksService<string>
) {
super()
}

private dialPeer = async (peerAddress: string) => {
if (this.dialedPeers.has(peerAddress)) {
console.log(`Peer ${peerAddress} already dialed, not dialing`) // TODO: remove log
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this work with min/max connections? Once a peer has been dialed, are they never dialed again? Would that mean we might run out of peers to dial if enough peers disconnect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every dialed peer is added to libp2p's internal address book. Autodialer checks connections every X minutes. Technically (according to code) if number of connections falls below set minConnections (e.g everyone disconnect from us) autodialer should look up peers in libp2p peerStore and try to open connections again.

}
this.dialedPeers.add(peerAddress)
await this.libp2pInstance?.dial(multiaddr(peerAddress))
}

Expand Down Expand Up @@ -79,10 +89,11 @@ export class Libp2pService extends EventEmitter {
libp2p = await createLibp2p({
start: false,
connectionManager: {
minConnections: 3,
maxConnections: 8,
minConnections: 3, // TODO: increase?
maxConnections: 8, // TODO: increase?
dialTimeout: 120_000,
maxParallelDials: 10,
autoDial: true, // It's a default but let's set it to have explicit information
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice idea!

},
peerId: params.peerId,
addresses: {
Expand Down Expand Up @@ -123,43 +134,65 @@ export class Libp2pService extends EventEmitter {
return libp2p
}

// private async addPeersToPeerBook(addresses: string[]) {
// for (const address of addresses) {
// const peerId = multiaddr(address).getPeerId()
// if (!peerId) return
// // @ts-expect-error
// await this.libp2pInstance?.peerStore.addressBook.add(peerIdFromString(peerId), [multiaddr(address)])
// }
// }

private async afterCreation(peers: string[], peerId: PeerId) {
if (!this.libp2pInstance) {
this.logger.error('libp2pInstance was not created')
throw new Error('libp2pInstance was not created')
}

this.logger(`Local peerId: ${peerId.toString()}`)
this.on(Libp2pEvents.DIAL_PEERS, async (addresses: string[]) => {
const nonDialedAddresses = addresses.filter(peerAddress => !this.dialedPeers.has(peerAddress))
console.log('DIALING PEERS', nonDialedAddresses.length, 'addresses')
this.processInChunksService.updateData(nonDialedAddresses)
await this.processInChunksService.process()
})

this.logger(`Initializing libp2p for ${peerId.toString()}, bootstrapping with ${peers.length} peers`)
this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, ConnectionProcessInfo.INITIALIZING_LIBP2P)
const dialInChunks = new ProcessInChunks<string>(peers, this.dialPeer)
this.processInChunksService.init(peers, this.dialPeer)

this.libp2pInstance.addEventListener('peer:discovery', peer => {
this.logger(`${peerId.toString()} discovered ${peer.detail.id}`)
})

this.libp2pInstance.addEventListener('peer:connect', async peer => {
const remotePeerId = peer.detail.remotePeer.toString()
this.logger(`${peerId.toString()} connected to ${remotePeerId}`)
const localPeerId = peerId.toString()
this.logger(`${localPeerId} connected to ${remotePeerId}`)

// Stop dialing as soon as we connect to a peer
dialInChunks.stop()
// this.processInChunksService.stop()

this.connectedPeers.set(remotePeerId, DateTime.utc().valueOf())
this.logger(`${this.connectedPeers.size} connected peers`)
this.logger(`${localPeerId} is connected to ${this.connectedPeers.size} peers`)
this.logger(`${localPeerId} has ${this.libp2pInstance?.getConnections().length} open connections`)

this.emit(Libp2pEvents.PEER_CONNECTED, {
peers: [remotePeerId],
})
const latency = await this.libp2pInstance?.ping(peer.detail.remoteAddr)
this.logger(`${localPeerId} ping to ${remotePeerId}. Latency: ${latency}`)
})

this.libp2pInstance.addEventListener('peer:disconnect', async peer => {
const remotePeerId = peer.detail.remotePeer.toString()
this.logger(`${peerId.toString()} disconnected from ${remotePeerId}`)
const localPeerId = peerId.toString()
this.logger(`${localPeerId} disconnected from ${remotePeerId}`)
if (!this.libp2pInstance) {
this.logger.error('libp2pInstance was not created')
throw new Error('libp2pInstance was not created')
}
this.logger(`${this.libp2pInstance.getConnections().length} open connections`)
this.logger(`${localPeerId} has ${this.libp2pInstance.getConnections().length} open connections`)

const connectionStartTime = this.connectedPeers.get(remotePeerId)
if (!connectionStartTime) {
Expand All @@ -172,7 +205,7 @@ export class Libp2pService extends EventEmitter {
const connectionDuration: number = connectionEndTime - connectionStartTime

this.connectedPeers.delete(remotePeerId)
this.logger(`${this.connectedPeers.size} connected peers`)
this.logger(`${localPeerId} is connected to ${this.connectedPeers.size} peers`)

this.emit(Libp2pEvents.PEER_DISCONNECTED, {
peer: remotePeerId,
Expand All @@ -181,21 +214,16 @@ export class Libp2pService extends EventEmitter {
})
})

await dialInChunks.process()
await this.processInChunksService.process()

this.logger(`Initialized libp2p for peer ${peerId.toString()}`)
}

public async destroyInstance(): Promise<void> {
this.libp2pInstance?.removeEventListener('peer:discovery')
this.libp2pInstance?.removeEventListener('peer:connect')
this.libp2pInstance?.removeEventListener('peer:disconnect')
try {
await this.libp2pInstance?.stop()
} catch (error) {
this.logger.error(error)
}

public async close(): Promise<void> {
this.logger('Closing libp2p service')
await this.libp2pInstance?.stop()
this.libp2pInstance = null
this.connectedPeers = new Map()
this.dialedPeers = new Set()
}
}
Loading