From c1b20476685e91be8e38ec9ca87080fe8d383407 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Mon, 30 Sep 2024 10:43:47 -0400 Subject: [PATCH] feat: more parrallel payout processing --- src/tasks/payouts.ts | 164 +++++++++++++++++++++++++++---------------- 1 file changed, 103 insertions(+), 61 deletions(-) diff --git a/src/tasks/payouts.ts b/src/tasks/payouts.ts index 84cee1e..4133911 100644 --- a/src/tasks/payouts.ts +++ b/src/tasks/payouts.ts @@ -12,11 +12,6 @@ import { import { Mutex } from "async-mutex"; import { IncentiveProgram } from "../utils/IncentiveProgram"; import { hexToUtf8 } from "../utils/hexUtils"; -import crypto from "crypto"; - -function md5(input: string) { - return crypto.createHash('md5').update(input).digest('hex'); -} const mutex = new Mutex(); @@ -155,7 +150,8 @@ const runIncentiveProgram = async ( console.log(`Root hash for current epoch: ${rootHash}`); const rewardThisRound = - (BigInt(program.xchRewardPerEpoch) * mojosPerXch) / BigInt(roundsPerEpoch); + (BigInt(program.xchRewardPerEpoch) * mojosPerXch) / + BigInt(roundsPerEpoch); console.log(`Reward for this round: ${rewardThisRound} mojos`); @@ -186,7 +182,10 @@ const runIncentiveProgram = async ( while (!payoutMade) { console.log("Sampling up to 50 peers from the current epoch..."); - const serverCoins = await serverCoin.sampleCurrentEpoch(50, peerBlackList); + const serverCoins = await serverCoin.sampleCurrentEpoch( + 50, + peerBlackList + ); if (serverCoins.length === 0) { console.log(`No more peers available for storeId ${program.storeId}`); @@ -201,62 +200,94 @@ const runIncentiveProgram = async ( const digPeer = new DigPeer(peerIp, program.storeId); // Track request results in the current run - pendingRequests.push(new Promise((resolve) => { - addToRequestQueue(async () => { - try { - // Add headStore request to the queue - const response = await withTimeout( - digPeer.contentServer.headStore(), - 10000, - `headStore timed out for peer ${peerIp}` - ); - console.log(`Peer ${peerIp} responded to headStore request`); - - const peerGenerationHash = response.headers?.["x-generation-hash"]; - if (peerGenerationHash === rootHash) { - console.log(`Peer ${peerIp} has correct generation hash.`); - - // Use Promise.all so that any failure immediately marks the peer as invalid - await Promise.all( - randomKeysHex.map(async (hexKey) => { - const expectedChallengeResponse = await getExpectedChallengeResponse( - program.storeId, - hexKey, - rootHash + pendingRequests.push( + new Promise((resolve) => { + addToRequestQueue( + async () => { + try { + // Add headStore request to the queue + const response = await withTimeout( + digPeer.contentServer.headStore(), + 10000, + `headStore timed out for peer ${peerIp}` + ); + console.log(`Peer ${peerIp} responded to headStore request`); + + const peerGenerationHash = + response.headers?.["x-generation-hash"]; + if (peerGenerationHash === rootHash) { + console.log(`Peer ${peerIp} has correct generation hash.`); + + // Use Promise.all so that any failure immediately marks the peer as invalid + await Promise.all( + randomKeysHex.map(async (hexKey) => { + const digChallenge = new DigChallenge( + program.storeId, + hexKey, + rootHash + ); + const seed = DigChallenge.generateSeed(); + const challenge = await digChallenge.generateChallenge( + seed + ); + const serializedChallenge = + DigChallenge.serializeChallenge(challenge); + + // Send the serialized challenge to the peer + const peerChallengeResponse = await withTimeout( + digPeer.contentServer.getKey( + hexToUtf8(hexKey), + rootHash, + serializedChallenge + ), + 10000, + `getKey timed out for peer ${peerIp}` + ); + + // Create the expected challenge response locally + const expectedChallengeResponse = + await digChallenge.createChallengeResponse(challenge); + + console.log( + `${peerIp} - ${hexToUtf8( + hexKey + )} - ${peerChallengeResponse} - ${expectedChallengeResponse}` + ); + + // Compare the peer's response with the expected response + if ( + peerChallengeResponse !== expectedChallengeResponse + ) { + throw new Error( + `Challenge response does not match for peer ${peerIp}` + ); + } + }) ); - const peerChallengeResponse = await withTimeout( - digPeer.contentServer.getKey( - hexToUtf8(hexKey), - rootHash, - expectedChallengeResponse - ), - 10000, - `getKey timed out for peer ${peerIp}` + validPeers.push(digPeer); + console.log( + `Peer ${peerIp} passed all challenges and is valid.` ); - - if (peerChallengeResponse !== expectedChallengeResponse) { - throw new Error(`Challenge response does not match for peer ${peerIp}`); - } - }) - ); - - validPeers.push(digPeer); - console.log(`Peer ${peerIp} passed all challenges and is valid.`); - resolve(true); - } else { - console.log(`Peer ${peerIp} has an incorrect generation hash.`); - resolve(false); + resolve(true); + } else { + console.log( + `Peer ${peerIp} has an incorrect generation hash.` + ); + resolve(false); + } + } catch (error: any) { + console.error(`Error with peer ${peerIp}: ${error.message}`); + resolve(false); // Skip this peer and continue to the next + } + }, + (result) => { + // The callback processes the result + console.log(`Callback for peer ${peerIp}, result: ${result}`); } - } catch (error: any) { - console.error(`Error with peer ${peerIp}: ${error.message}`); - resolve(false); // Skip this peer and continue to the next - } - }, (result) => { - // The callback processes the result - console.log(`Callback for peer ${peerIp}, result: ${result}`); - }); - })); + ); + }) + ); } // Wait for all pending requests for this program to complete @@ -278,14 +309,25 @@ const runIncentiveProgram = async ( const { epoch: currentEpoch, round: currentRound } = ServerCoin.getCurrentEpoch(); + const paymentHint = DigPeer.createPaymentHint( + Buffer.from(program.storeId, "hex") + ); const message = Buffer.from( `DIG Network payout: Store Id ${program.storeId}, Epoch ${currentEpoch}, Round ${currentRound}`, "utf-8" ); - + console.log( + `Payment hint: ${paymentHint.toString("hex")} - ${message.toString( + "utf-8" + )}` + ); + // For the alpha program we are going to forgo the hint and just use the message so people can see it in their chia wallet + //const memo = [paymentHint, message]; const memos = [message]; - console.log(`Sending equal bulk payments to ${paymentAddresses.length} valid peers...`); + console.log( + `Sending equal bulk payments to ${paymentAddresses.length} valid peers...` + ); await DigPeer.sendEqualBulkPayments( program.walletName, paymentAddresses,