Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: check filter against lightPush, basic implementation #2185

Open
wants to merge 2 commits into
base: weboko/reliability-filter
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export type SubscriptionCallback<T extends IDecodedMessage> = {
export type SubscribeOptions = {
keepAlive?: number;
pingsBeforePeerRenewed?: number;
maxMissedMessagesThreshold?: number;
enableLightPushFilterCheck?: boolean;
adklempner marked this conversation as resolved.
Show resolved Hide resolved
};

export interface ISubscription {
Expand Down
4 changes: 2 additions & 2 deletions packages/message-hash/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { sha256 } from "@noble/hashes/sha256";
import type { IDecodedMessage, IProtoMessage } from "@waku/interfaces";
import { isDefined } from "@waku/utils";
import {
bytesToUtf8,
bytesToHex,
concat,
numberToBytes,
utf8ToBytes
Expand Down Expand Up @@ -56,6 +56,6 @@ export function messageHashStr(
message: IProtoMessage | IDecodedMessage
): string {
const hash = messageHash(pubsubTopic, message);
const hashStr = bytesToUtf8(hash);
const hashStr = bytesToHex(hash);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is fix for messageHashStr when it doesn't transform bytes properly and shows weird symbols

return hashStr;
}
5 changes: 4 additions & 1 deletion packages/sdk/src/protocols/filter/constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
export const DEFAULT_KEEP_ALIVE = 10_000;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL = 10_000;

export const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: DEFAULT_KEEP_ALIVE
keepAlive: DEFAULT_KEEP_ALIVE,
enableLightPushFilterCheck: DEFAULT_LIGHT_PUSH_FILTER_CHECK
};
12 changes: 9 additions & 3 deletions packages/sdk/src/protocols/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
type IDecodedMessage,
type IDecoder,
type IFilter,
type ILightPush,
type Libp2p,
NetworkConfig,
type ProtocolCreateOptions,
Expand Down Expand Up @@ -38,7 +39,8 @@ class Filter extends BaseProtocolSDK implements IFilter {

public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
private libp2p: Libp2p,
private lightPush?: ILightPush,
options?: ProtocolCreateOptions
) {
super(
Expand Down Expand Up @@ -195,7 +197,9 @@ class Filter extends BaseProtocolSDK implements IFilter {
this.protocol,
this.connectionManager,
() => this.connectedPeers,
this.renewPeer.bind(this)
this.renewPeer.bind(this),
this.libp2p,
this.lightPush
)
);

Expand Down Expand Up @@ -300,7 +304,9 @@ class Filter extends BaseProtocolSDK implements IFilter {

export function wakuFilter(
connectionManager: ConnectionManager,
lightPush?: ILightPush,
init?: ProtocolCreateOptions
): (libp2p: Libp2p) => IFilter {
return (libp2p: Libp2p) => new Filter(connectionManager, libp2p, init);
return (libp2p: Libp2p) =>
new Filter(connectionManager, libp2p, lightPush, init);
}
107 changes: 95 additions & 12 deletions packages/sdk/src/protocols/filter/subscription_manager.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import type { Peer } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import { ConnectionManager, FilterCore } from "@waku/core";
import {
ConnectionManager,
createDecoder,
createEncoder,
FilterCore,
LightPushCore
} from "@waku/core";
import {
type Callback,
type ContentTopic,
type CoreProtocolResult,
EConnectionStateEvents,
type IDecodedMessage,
type IDecoder,
type ILightPush,
type IProtoMessage,
type ISubscription,
type Libp2p,
type PeerIdStr,
ProtocolError,
type PubsubTopic,
Expand All @@ -23,7 +31,12 @@ import { groupByContentTopic, Logger } from "@waku/utils";
import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js";
import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js";

import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js";
import {
DEFAULT_KEEP_ALIVE,
DEFAULT_LIGHT_PUSH_FILTER_CHECK,
DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL,
DEFAULT_SUBSCRIBE_OPTIONS
} from "./constants.js";

const log = new Logger("sdk:filter:subscription_manager");

Expand All @@ -33,6 +46,8 @@ export class SubscriptionManager implements ISubscription {
private keepAliveTimeout: number = DEFAULT_KEEP_ALIVE;
private keepAliveInterval: ReturnType<typeof setInterval> | null = null;

private enableLightPushFilterCheck = DEFAULT_LIGHT_PUSH_FILTER_CHECK;

private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
Expand All @@ -45,7 +60,9 @@ export class SubscriptionManager implements ISubscription {
private readonly getPeers: () => Peer[],
private readonly renewPeer: (
peerToDisconnect: PeerId
) => Promise<Peer | undefined>
) => Promise<Peer | undefined>,
private readonly libp2p: Libp2p,
private readonly lightPush?: ILightPush
) {
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
Expand All @@ -56,7 +73,8 @@ export class SubscriptionManager implements ISubscription {
this.renewPeer.bind(this),
() => Array.from(this.subscriptionCallbacks.keys()),
this.protocol.subscribe.bind(this.protocol),
this.protocol.addLibp2pEventListener.bind(this.protocol)
this.protocol.addLibp2pEventListener.bind(this.protocol),
this.sendLightPushCheckMessage.bind(this)
);
}

Expand All @@ -65,11 +83,10 @@ export class SubscriptionManager implements ISubscription {
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SDKProtocolResult> {
this.reliabilityMonitor.setMaxMissedMessagesThreshold(
options.maxMissedMessagesThreshold
);
this.reliabilityMonitor.setMaxPingFailures(options.pingsBeforePeerRenewed);
this.keepAliveTimeout = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.enableLightPushFilterCheck =
options?.enableLightPushFilterCheck || DEFAULT_LIGHT_PUSH_FILTER_CHECK;

const decodersArray = Array.isArray(decoders) ? decoders : [decoders];

Expand All @@ -87,11 +104,20 @@ export class SubscriptionManager implements ISubscription {
}
}

if (this.enableLightPushFilterCheck) {
decodersArray.push(
createDecoder(
this.buildLightPushContentTopic(),
this.pubsubTopic
) as IDecoder<T>
);
}

const decodersGroupedByCT = groupByContentTopic(decodersArray);
const contentTopics = Array.from(decodersGroupedByCT.keys());

const promises = this.getPeers().map(async (peer) =>
this.protocol.subscribe(this.pubsubTopic, peer, contentTopics)
this.subscribeWithPeerVerification(peer, contentTopics)
);

const results = await Promise.allSettled(promises);
Expand All @@ -109,6 +135,11 @@ export class SubscriptionManager implements ISubscription {
callback
} as unknown as SubscriptionCallback<IDecodedMessage>;

// don't handle case of internal content topic
if (contentTopic === this.buildLightPushContentTopic()) {
adklempner marked this conversation as resolved.
Show resolved Hide resolved
return;
}

// The callback and decoder may override previous values, this is on
// purpose as the user may call `subscribe` to refresh the subscription
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
Expand Down Expand Up @@ -176,10 +207,9 @@ export class SubscriptionManager implements ISubscription {
message: WakuMessage,
peerIdStr: PeerIdStr
): Promise<void> {
const alreadyReceived = this.reliabilityMonitor.processIncomingMessage(
message,
this.pubsubTopic,
peerIdStr
const alreadyReceived = this.reliabilityMonitor.notifyMessageReceived(
peerIdStr,
message as IProtoMessage
);

if (alreadyReceived) {
Expand All @@ -202,6 +232,19 @@ export class SubscriptionManager implements ISubscription {
await pushMessage(subscriptionCallback, this.pubsubTopic, message);
}

private async subscribeWithPeerVerification(
peer: Peer,
contentTopics: string[]
): Promise<CoreProtocolResult> {
const result = await this.protocol.subscribe(
this.pubsubTopic,
peer,
contentTopics
);
await this.sendLightPushCheckMessage(peer);
return result;
}

private handleResult(
results: PromiseSettledResult<CoreProtocolResult>[],
type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll"
Expand Down Expand Up @@ -324,6 +367,46 @@ export class SubscriptionManager implements ISubscription {
clearInterval(this.keepAliveInterval);
this.keepAliveInterval = null;
}

private async sendLightPushCheckMessage(peer: Peer): Promise<void> {
if (
this.lightPush &&
this.libp2p &&
this.reliabilityMonitor.shouldVerifyPeer(peer.id)
) {
const encoder = createEncoder({
contentTopic: this.buildLightPushContentTopic(),
pubsubTopic: this.pubsubTopic,
ephemeral: true
});

const message = { payload: new Uint8Array(1) };
const protoMessage = await encoder.toProtoObj(message);

// make a delay to be sure message is send when subscription is in place
setTimeout(
(async () => {
const result = await (this.lightPush!.protocol as LightPushCore).send(
encoder,
message,
peer
);
this.reliabilityMonitor.notifyMessageSent(peer.id, protoMessage);
if (result.failure) {
log.error(
`failed to send lightPush ping message to peer:${peer.id.toString()}\t${result.failure.error}`
);
return;
}
}) as () => void,
DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL
);
}
}

private buildLightPushContentTopic(): string {
return `/js-waku-subscription-ping/1/${this.libp2p.peerId.toString()}/utf8`;
}
}

async function pushMessage<T extends IDecodedMessage>(
Expand Down
7 changes: 4 additions & 3 deletions packages/sdk/src/reliability_monitor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ export class ReliabilityMonitorManager {
peer: Peer,
contentTopics: ContentTopic[]
) => Promise<CoreProtocolResult>,
addLibp2pEventListener: Libp2p["addEventListener"]
addLibp2pEventListener: Libp2p["addEventListener"],
sendLightPushMessage: (peer: Peer) => Promise<void>
): ReceiverReliabilityMonitor {
if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) {
return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!;
Expand All @@ -36,7 +37,8 @@ export class ReliabilityMonitorManager {
renewPeer,
getContentTopics,
protocolSubscribe,
addLibp2pEventListener
addLibp2pEventListener,
sendLightPushMessage
);
ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor);
return monitor;
Expand All @@ -50,7 +52,6 @@ export class ReliabilityMonitorManager {

public static stopAll(): void {
for (const [pubsubTopic, monitor] of this.receiverMonitors) {
monitor.setMaxMissedMessagesThreshold(undefined);
monitor.setMaxPingFailures(undefined);
this.receiverMonitors.delete(pubsubTopic);
}
Expand Down
Loading
Loading