Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: peer renewal connection drop & stream management #2145

Merged
merged 13 commits into from
Oct 1, 2024
159 changes: 98 additions & 61 deletions packages/core/src/lib/stream_manager/stream_manager.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,46 @@
import type { PeerUpdate, Stream } from "@libp2p/interface";
import type { Peer, PeerId } from "@libp2p/interface";
import { Libp2p } from "@waku/interfaces";
import type {
Connection,
Peer,
PeerId,
PeerUpdate,
Stream
} from "@libp2p/interface";
import type { Libp2p } from "@waku/interfaces";
import { Logger } from "@waku/utils";

import { selectConnection } from "./utils.js";

const CONNECTION_TIMEOUT = 5_000;
const RETRY_BACKOFF_BASE = 1_000;
const MAX_RETRIES = 3;

export class StreamManager {
private readonly streamPool: Map<string, Promise<Stream | void>>;
private readonly log: Logger;

private ongoingCreation: Set<string> = new Set();
private streamPool: Map<string, Promise<void>> = new Map();

public constructor(
public multicodec: string,
public getConnections: Libp2p["getConnections"],
public addEventListener: Libp2p["addEventListener"]
) {
this.log = new Logger(`stream-manager:${multicodec}`);
this.streamPool = new Map();
this.addEventListener("peer:update", this.handlePeerUpdateStreamPool);
}

public async getStream(peer: Peer): Promise<Stream> {
const peerIdStr = peer.id.toString();
const streamPromise = this.streamPool.get(peerIdStr);
const peerId = peer.id.toString();

if (!streamPromise) {
return this.createStream(peer);
}
const scheduledStream = this.streamPool.get(peerId);
this.streamPool.delete(peerId);
await scheduledStream;

this.streamPool.delete(peerIdStr);
this.prepareStream(peer);
const stream = this.getStreamForCodec(peer.id);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if stream = await connection.newStream(this.multicodec); in createStream succeeds - then it will be attached to connection and hence this.getStreamForCodec will return it.

This is a bit convoluted at first but:

  • we need to keep our single source of truth - connection object;
  • we need to have a lock so that multiple events schedule only one redundant stream creation;

try {
const stream = await streamPromise;
if (stream && stream.status !== "closed") {
return stream;
}
} catch (error) {
this.log.warn(`Failed to get stream for ${peerIdStr} -- `, error);
this.log.warn("Attempting to create a new stream for the peer");
if (stream) {
this.log.info(
`Found existing stream peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
return stream;
}

return this.createStream(peer);
Expand All @@ -52,64 +51,102 @@ export class StreamManager {
const connection = selectConnection(connections);

if (!connection) {
throw new Error("Failed to get a connection to the peer");
throw new Error(
`Failed to get a connection to the peer peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
}

try {
return await connection.newStream(this.multicodec);
} catch (error) {
if (retries < MAX_RETRIES) {
const backoff = RETRY_BACKOFF_BASE * Math.pow(2, retries);
await new Promise((resolve) => setTimeout(resolve, backoff));
return this.createStream(peer, retries + 1);
let lastError: unknown;
let stream: Stream | undefined;

for (let i = 0; i < retries + 1; i++) {
try {
this.log.info(
`Attempting to create a stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
stream = await connection.newStream(this.multicodec);
this.log.info(
`Created stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
} catch (error) {
lastError = error;
}
}

if (!stream) {
throw new Error(
`Failed to create a new stream for ${peer.id.toString()} -- ` + error
`Failed to create a new stream for ${peer.id.toString()} -- ` +
lastError
);
}

return stream;
}

private prepareStream(peer: Peer): void {
const timeoutPromise = new Promise<void>((resolve) =>
setTimeout(resolve, CONNECTION_TIMEOUT)
);
private async createStreamWithLock(peer: Peer): Promise<void> {
const peerId = peer.id.toString();

const streamPromise = Promise.race([
this.createStream(peer),
timeoutPromise.then(() => {
throw new Error("Connection timeout");
})
]).catch((error) => {
this.log.error(
`Failed to prepare a new stream for ${peer.id.toString()} -- `,
error
if (this.ongoingCreation.has(peerId)) {
this.log.info(
`Skipping creation of a stream due to lock for peerId=${peerId} multicodec=${this.multicodec}`
);
});
return;
}

this.streamPool.set(peer.id.toString(), streamPromise);
try {
this.ongoingCreation.add(peerId);
await this.createStream(peer);
} finally {
this.ongoingCreation.delete(peerId);
}
}

private handlePeerUpdateStreamPool = (evt: CustomEvent<PeerUpdate>): void => {
const { peer } = evt.detail;

if (peer.protocols.includes(this.multicodec)) {
const isConnected = this.isConnectedTo(peer.id);
if (!peer.protocols.includes(this.multicodec)) {
return;
}

if (isConnected) {
this.log.info(`Preemptively opening a stream to ${peer.id.toString()}`);
this.prepareStream(peer);
} else {
const peerIdStr = peer.id.toString();
this.streamPool.delete(peerIdStr);
this.log.info(
`Removed pending stream for disconnected peer ${peerIdStr}`
);
}
const stream = this.getStreamForCodec(peer.id);

if (stream) {
return;
}

this.scheduleNewStream(peer);
};

private isConnectedTo(peerId: PeerId): boolean {
const connections = this.getConnections(peerId);
return connections.some((connection) => connection.status === "open");
private scheduleNewStream(peer: Peer): void {
this.log.info(
`Scheduling creation of a stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);

const timeoutPromise = new Promise<undefined>((resolve) =>
setTimeout(() => resolve(undefined), CONNECTION_TIMEOUT)
);
weboko marked this conversation as resolved.
Show resolved Hide resolved

const streamPromise = Promise.race([
this.createStreamWithLock(peer),
timeoutPromise
]);

this.streamPool.set(peer.id.toString(), streamPromise);
}

private getStreamForCodec(peerId: PeerId): Stream | undefined {
const connection: Connection | undefined = this.getConnections(peerId).find(
(c) => c.status === "open"
);

if (!connection) {
return;
}

const stream = connection.streams.find(
(s) => s.protocol === this.multicodec
);

return stream;
}
}
7 changes: 2 additions & 5 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,16 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer> {
this.log.info(`Renewing peer ${peerToDisconnect}`);

await this.connectionManager.dropConnection(peerToDisconnect);

const peer = (await this.findAndAddPeers(1))[0];
if (!peer) {
this.log.error(
"Failed to find a new peer to replace the disconnected one."
);
throw Error("Failed to find a new peer to replace the disconnected one.");
}

const updatedPeers = this.peers.filter(
(peer) => !peer.id.equals(peerToDisconnect)
);
this.updatePeers(updatedPeers);
await this.connectionManager.dropConnection(peerToDisconnect);

this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
Expand Down
Loading