Skip to content

Commit

Permalink
feat: Filter reacts to peer:disconnect event, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
danisharora099 committed Oct 9, 2024
1 parent ed138cc commit 43fa32b
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 4 deletions.
3 changes: 2 additions & 1 deletion packages/sdk/src/protocols/filter/subscription_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ export class SubscriptionManager implements ISubscription {
this.getPeers.bind(this),
this.renewPeer.bind(this),
() => Array.from(this.subscriptionCallbacks.keys()),
this.protocol.subscribe.bind(this.protocol)
this.protocol.subscribe.bind(this.protocol),
this.protocol.addLibp2pEventListener.bind(this.protocol)
);
}

Expand Down
7 changes: 5 additions & 2 deletions packages/sdk/src/reliability_monitor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Peer, PeerId } from "@libp2p/interface";
import {
ContentTopic,
CoreProtocolResult,
Libp2p,
PubsubTopic
} from "@waku/interfaces";

Expand All @@ -24,7 +25,8 @@ export class ReliabilityMonitorManager {
pubsubTopic: PubsubTopic,
peer: Peer,
contentTopics: ContentTopic[]
) => Promise<CoreProtocolResult>
) => Promise<CoreProtocolResult>,
addLibp2pEventListener: Libp2p["addEventListener"]
): ReceiverReliabilityMonitor {
if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) {
return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!;
Expand All @@ -35,7 +37,8 @@ export class ReliabilityMonitorManager {
getPeers,
renewPeer,
getContentTopics,
protocolSubscribe
protocolSubscribe,
addLibp2pEventListener
);
ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor);
return monitor;
Expand Down
11 changes: 10 additions & 1 deletion packages/sdk/src/reliability_monitor/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
ContentTopic,
CoreProtocolResult,
IProtoMessage,
Libp2p,
PeerIdStr,
PubsubTopic
} from "@waku/interfaces";
Expand Down Expand Up @@ -38,7 +39,8 @@ export class ReceiverReliabilityMonitor {
pubsubTopic: PubsubTopic,
peer: Peer,
contentTopics: ContentTopic[]
) => Promise<CoreProtocolResult>
) => Promise<CoreProtocolResult>,
private addLibp2pEventListener: Libp2p["addEventListener"]
) {
const allPeerIdStr = this.getPeers().map((p) => p.id.toString());

Expand All @@ -49,6 +51,13 @@ export class ReceiverReliabilityMonitor {
}
};
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));

this.addLibp2pEventListener("peer:disconnect", (evt) => {
const peerId = evt.detail;
if (this.getPeers().some((p) => p.id.equals(peerId))) {
void this.renewAndSubscribePeer(peerId);
}
});
}

public setMaxMissedMessagesThreshold(value: number | undefined): void {
Expand Down
48 changes: 48 additions & 0 deletions packages/tests/tests/filter/peer_management.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,52 @@ describe("Waku Filter: Peer Management: E2E", function () {

expect(waku.filter.connectedPeers.length).to.equal(2);
});

it("Renews peer for Filter on peer:disconnect event", async function () {
this.timeout(30000);

const messages: DecodedMessage[] = [];
const { error, subscription } = await waku.filter.subscribe(
[decoder],
(msg) => {
messages.push(msg);
}
);

if (error) {
throw error;
}

const initialPeers = waku.filter.connectedPeers;
expect(initialPeers.length).to.equal(waku.filter.numPeersToUse);

const peerToDisconnect = initialPeers[0];
await waku.connectionManager.dropConnection(peerToDisconnect.id);

await delay(5000);

expect(waku.filter.connectedPeers.length).to.equal(
waku.filter.numPeersToUse
);

const stillConnected = waku.filter.connectedPeers.some((peer) =>
peer.id.equals(peerToDisconnect.id)
);
expect(stillConnected).to.be.false;

await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello after disconnect")
});

await delay(2000);

expect(messages.length).to.equal(1);
expect(new TextDecoder().decode(messages[0].payload)).to.equal(
"Hello after disconnect"
);

const pingResult = await subscription.ping();
expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse);
expect(pingResult.failures.length).to.equal(0);
});
});

0 comments on commit 43fa32b

Please sign in to comment.