Skip to content

Commit

Permalink
feat: confirm metadata and protocols needed in waitForRemotePeer (#2160)
Browse files Browse the repository at this point in the history
* fix comment of default number of peers

* export default number of peers from base protocol sdk

* rename to light_push, move class to separate file

* move waitForRemotePeer to sdk package

* add todo to move waitForGossipSubPeerInMesh into @waku/relay

* clean up waitForRemotePeer, split metadata await from event and optimise, decouple from protocol implementations

* simplify and rename ILightPush interface

* use only connected peers in light push based on connections instead of peer renewal mechanism

* improve readability of result processing in light push

* fix check & update tests

* address tests, add new test cases, fix racing condition in StreamManager

* use libp2p.getPeers

* feat: confirm metadata and protocols needed in waitForRemotePeer

* rely on passed protocols and fallback to mounted

* imrpove iteration for existing connections

* address protocol adverisement in CI

* add protocols needed

* add missing protocols

* make lightpush and filter default for tests

* up
  • Loading branch information
weboko authored Oct 4, 2024
1 parent 2be0e81 commit d37e024
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 47 deletions.
84 changes: 62 additions & 22 deletions packages/sdk/src/wait_for_remote_peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ export async function waitForRemotePeer(
protocols?: Protocols[],
timeoutMs?: number
): Promise<void> {
protocols = protocols ?? getEnabledProtocols(waku);
// if no protocols or empty array passed - try to derive from mounted
protocols = protocols?.length ? protocols : getEnabledProtocols(waku);
const connections = waku.libp2p.getConnections();

if (!waku.isStarted()) {
throw Error("Waku node is not started");
}

if (connections.length > 0 && !protocols.includes(Protocols.Relay)) {
const success = await waitForMetadata(waku.libp2p);

const success = await waitForMetadata(waku, protocols);
if (success) {
return;
}
Expand Down Expand Up @@ -135,33 +135,55 @@ async function waitForConnectedPeer(
/**
* Waits for the metadata from the remote peer.
*/
async function waitForMetadata(libp2p: Libp2p): Promise<boolean> {
const connections = libp2p.getConnections();
const metadataService = libp2p.services.metadata;
async function waitForMetadata(
waku: Waku,
protocols: Protocols[]
): Promise<boolean> {
const connectedPeers = waku.libp2p.getPeers();
const metadataService = waku.libp2p.services.metadata;
const enabledCodes = mapProtocolsToCodecs(protocols);

if (!connections.length || !metadataService) {
if (!connectedPeers.length || !metadataService) {
log.info(
`Skipping waitForMetadata due to missing connections:${connections.length} or metadataService:${!!metadataService}`
`Skipping waitForMetadata due to missing connections:${connectedPeers.length} or metadataService:${!!metadataService}`
);
return false;
}

try {
// confirm at least with one connected peer
await Promise.any(
connections
.map((c) => c.remotePeer)
.map((peer) => metadataService.confirmOrAttemptHandshake(peer))
);
for (const peerId of connectedPeers) {
try {
const peer = await waku.libp2p.peerStore.get(peerId);
const hasSomeCodes = peer.protocols.some((c) => enabledCodes.has(c));

return true;
} catch (e) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED") {
log.error("Connection closed. Some peers can be on different shard.");
}
if (hasSomeCodes) {
const response =
await metadataService.confirmOrAttemptHandshake(peerId);

if (!response.error) {
peer.protocols.forEach((c) => {
if (enabledCodes.has(c)) {
enabledCodes.set(c, true);
}
});

log.error(`Error waiting for metadata: ${e}`);
const confirmedAllCodecs = Array.from(enabledCodes.values()).every(
(v) => v
);

if (confirmedAllCodecs) {
return true;
}
}
}
} catch (e) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED") {
log.error("Connection closed. Some peers can be on different shard.");
}

log.error(`Error while iterating through peers: ${e}`);
continue;
}
}

return false;
Expand Down Expand Up @@ -216,3 +238,21 @@ function getEnabledProtocols(waku: Waku): Protocols[] {

return protocols;
}

function mapProtocolsToCodecs(protocols: Protocols[]): Map<string, boolean> {
const codecs: Map<string, boolean> = new Map();

const protocolToCodec: Record<string, string> = {
[Protocols.Filter]: FilterCodecs.SUBSCRIBE,
[Protocols.LightPush]: LightPushCodec,
[Protocols.Store]: StoreCodec
};

for (const protocol of protocols) {
if (protocolToCodec[protocol]) {
codecs.set(protocolToCodec[protocol], false);
}
}

return codecs;
}
10 changes: 2 additions & 8 deletions packages/tests/src/utils/nodes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
Waku
} from "@waku/interfaces";
import { createLightNode, waitForRemotePeer } from "@waku/sdk";
import { derivePubsubTopicsFromNetworkConfig, isDefined } from "@waku/utils";
import { derivePubsubTopicsFromNetworkConfig } from "@waku/utils";
import { Context } from "mocha";
import pRetry from "p-retry";

Expand Down Expand Up @@ -52,13 +52,7 @@ export async function runMultipleNodes(

for (const node of serviceNodes.nodes) {
await waku.dial(await node.getMultiaddrWithId());
await waitForRemotePeer(
waku,
[
!customArgs?.filter ? undefined : Protocols.Filter,
!customArgs?.lightpush ? undefined : Protocols.LightPush
].filter(isDefined)
);
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
await node.ensureSubscriptions(
derivePubsubTopicsFromNetworkConfig(networkConfig)
);
Expand Down
6 changes: 1 addition & 5 deletions packages/tests/tests/ephemeral.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,7 @@ describe("Waku Message Ephemeral field", function () {
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());

await waitForRemotePeer(waku, [
Protocols.Filter,
Protocols.LightPush,
Protocols.Store
]);
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
});

it("Ephemeral messages are not stored", async function () {
Expand Down
4 changes: 2 additions & 2 deletions packages/tests/tests/filter/peer_management.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ describe("Waku Filter: Peer Management: E2E", function () {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
DefaultTestShardInfo,
undefined,
{ lightpush: true, filter: true },
undefined,
5
);
Expand Down Expand Up @@ -222,7 +222,7 @@ describe("Waku Filter: Peer Management: E2E", function () {
const [serviceNodes, waku] = await runMultipleNodes(
this.ctx,
DefaultTestShardInfo,
undefined,
{ lightpush: true, filter: true },
undefined,
2
);
Expand Down
5 changes: 4 additions & 1 deletion packages/tests/tests/filter/ping.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ const runTests = (strictCheckNodes: boolean): void => {

beforeEachCustom(this, async () => {
try {
[serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
[serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo, {
lightpush: true,
filter: true
});
} catch (error) {
console.error(error);
}
Expand Down
5 changes: 4 additions & 1 deletion packages/tests/tests/filter/push.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ const runTests = (strictCheckNodes: boolean): void => {
let serviceNodes: ServiceNodesFleet;

beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
[serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo, {
lightpush: true,
filter: true
});
});

afterEachCustom(this, async () => {
Expand Down
12 changes: 8 additions & 4 deletions packages/tests/tests/filter/unsubscribe.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@ const runTests = (strictCheckNodes: boolean): void => {
let serviceNodes: ServiceNodesFleet;

beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(this.ctx, {
contentTopics: [TestContentTopic],
clusterId: ClusterId
});
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
{
contentTopics: [TestContentTopic],
clusterId: ClusterId
},
{ filter: true, lightpush: true }
);
});

afterEachCustom(this, async () => {
Expand Down
4 changes: 2 additions & 2 deletions packages/tests/tests/health-manager/protocols.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ describe("Health Manager", function () {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
TestShardInfo,
undefined,
{ lightpush: true, filter: true },
undefined,
num
);
Expand All @@ -62,7 +62,7 @@ describe("Health Manager", function () {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
TestShardInfo,
undefined,
{ filter: true, lightpush: true },
undefined,
num
);
Expand Down
2 changes: 1 addition & 1 deletion packages/tests/tests/light-push/index.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const runTests = (strictNodeCheck: boolean): void => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
TestShardInfo,
undefined,
{ lightpush: true, filter: true },
strictNodeCheck,
numServiceNodes,
true
Expand Down
2 changes: 1 addition & 1 deletion packages/tests/tests/light-push/peer_management.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe("Waku Light Push: Connection Management: E2E", function () {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
DefaultTestShardInfo,
undefined,
{ lightpush: true, filter: true },
undefined,
5
);
Expand Down

0 comments on commit d37e024

Please sign in to comment.