Skip to content

Commit

Permalink
chore: refactor LightPush send (#1487)
Browse files Browse the repository at this point in the history
* refactor lightpush send

* add trycatch
  • Loading branch information
danisharora099 authored Aug 22, 2023
1 parent 9d4fa3f commit 812310a
Showing 1 changed file with 53 additions and 18 deletions.
71 changes: 53 additions & 18 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,44 +38,78 @@ class LightPush extends BaseProtocol implements ILightPush {
this.options = options || {};
}

private async preparePushMessage(
encoder: IEncoder,
message: IMessage,
pubSubTopic: string
): Promise<{
query: PushRpc | null;
error?: SendError;
}> {
if (!isSizeValid(message.payload)) {
log("Failed to send waku light push: message is bigger than 1MB");
return { query: null, error: SendError.SIZE_TOO_BIG };
}

const protoMessage = await encoder.toProtoObj(message);
if (!protoMessage) {
log("Failed to encode to protoMessage, aborting push");
return {
query: null,
error: SendError.ENCODE_FAILED
};
}

const query = PushRpc.createRequest(protoMessage, pubSubTopic);
return { query };
}

async send(
encoder: IEncoder,
message: IMessage,
opts?: ProtocolOptions
): Promise<SendResult> {
const { pubSubTopic = DefaultPubSubTopic } = this.options;

const peer = await this.getPeer(opts?.peerId);
const stream = await this.newStream(peer);

const recipients: PeerId[] = [];
let error: undefined | SendError = undefined;

let query: PushRpc | null = null;

try {
if (!isSizeValid(message.payload)) {
log("Failed to send waku light push: message is bigger that 1MB");
return {
recipients,
error: SendError.SIZE_TOO_BIG
};
}
const { query: preparedQuery, error: preparationError } =
await this.preparePushMessage(encoder, message, pubSubTopic);

const protoMessage = await encoder.toProtoObj(message);
if (!protoMessage) {
log("Failed to encode to protoMessage, aborting push");
if (preparationError) {
return {
recipients,
error: SendError.ENCODE_FAILED
error: preparationError
};
}
const query = PushRpc.createRequest(protoMessage, pubSubTopic);

query = preparedQuery;
} catch (error) {
log("Failed to prepare push message", error);
}

if (!query) {
return {
recipients,
error: SendError.GENERIC_FAIL
};
}

const peer = await this.getPeer(opts?.peerId);
const stream = await this.newStream(peer);

try {
const res = await pipe(
[query.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);

try {
const bytes = new Uint8ArrayList();
res.forEach((chunk) => {
Expand All @@ -98,9 +132,10 @@ class LightPush extends BaseProtocol implements ILightPush {
log("Failed to send waku light push request", err);
error = SendError.GENERIC_FAIL;
}

return {
error,
recipients
recipients,
error
};
}
}
Expand Down

0 comments on commit 812310a

Please sign in to comment.