Skip to content

Commit

Permalink
feat: more parrallel payout processing
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelTaylor3D committed Sep 30, 2024
1 parent 98c6ef1 commit c1b2047
Showing 1 changed file with 103 additions and 61 deletions.
164 changes: 103 additions & 61 deletions src/tasks/payouts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ import {
import { Mutex } from "async-mutex";
import { IncentiveProgram } from "../utils/IncentiveProgram";
import { hexToUtf8 } from "../utils/hexUtils";
import crypto from "crypto";

function md5(input: string) {
return crypto.createHash('md5').update(input).digest('hex');
}

const mutex = new Mutex();

Expand Down Expand Up @@ -155,7 +150,8 @@ const runIncentiveProgram = async (
console.log(`Root hash for current epoch: ${rootHash}`);

const rewardThisRound =
(BigInt(program.xchRewardPerEpoch) * mojosPerXch) / BigInt(roundsPerEpoch);
(BigInt(program.xchRewardPerEpoch) * mojosPerXch) /
BigInt(roundsPerEpoch);

console.log(`Reward for this round: ${rewardThisRound} mojos`);

Expand Down Expand Up @@ -186,7 +182,10 @@ const runIncentiveProgram = async (
while (!payoutMade) {
console.log("Sampling up to 50 peers from the current epoch...");

const serverCoins = await serverCoin.sampleCurrentEpoch(50, peerBlackList);
const serverCoins = await serverCoin.sampleCurrentEpoch(
50,
peerBlackList
);

if (serverCoins.length === 0) {
console.log(`No more peers available for storeId ${program.storeId}`);
Expand All @@ -201,62 +200,94 @@ const runIncentiveProgram = async (
const digPeer = new DigPeer(peerIp, program.storeId);

// Track request results in the current run
pendingRequests.push(new Promise((resolve) => {
addToRequestQueue(async () => {
try {
// Add headStore request to the queue
const response = await withTimeout(
digPeer.contentServer.headStore(),
10000,
`headStore timed out for peer ${peerIp}`
);
console.log(`Peer ${peerIp} responded to headStore request`);

const peerGenerationHash = response.headers?.["x-generation-hash"];
if (peerGenerationHash === rootHash) {
console.log(`Peer ${peerIp} has correct generation hash.`);

// Use Promise.all so that any failure immediately marks the peer as invalid
await Promise.all(
randomKeysHex.map(async (hexKey) => {
const expectedChallengeResponse = await getExpectedChallengeResponse(
program.storeId,
hexKey,
rootHash
pendingRequests.push(
new Promise((resolve) => {
addToRequestQueue(
async () => {
try {
// Add headStore request to the queue
const response = await withTimeout(
digPeer.contentServer.headStore(),
10000,
`headStore timed out for peer ${peerIp}`
);
console.log(`Peer ${peerIp} responded to headStore request`);

const peerGenerationHash =
response.headers?.["x-generation-hash"];
if (peerGenerationHash === rootHash) {
console.log(`Peer ${peerIp} has correct generation hash.`);

// Use Promise.all so that any failure immediately marks the peer as invalid
await Promise.all(
randomKeysHex.map(async (hexKey) => {
const digChallenge = new DigChallenge(
program.storeId,
hexKey,
rootHash
);
const seed = DigChallenge.generateSeed();
const challenge = await digChallenge.generateChallenge(
seed
);
const serializedChallenge =
DigChallenge.serializeChallenge(challenge);

// Send the serialized challenge to the peer
const peerChallengeResponse = await withTimeout(
digPeer.contentServer.getKey(
hexToUtf8(hexKey),
rootHash,
serializedChallenge
),
10000,
`getKey timed out for peer ${peerIp}`
);

// Create the expected challenge response locally
const expectedChallengeResponse =
await digChallenge.createChallengeResponse(challenge);

console.log(
`${peerIp} - ${hexToUtf8(
hexKey
)} - ${peerChallengeResponse} - ${expectedChallengeResponse}`
);

// Compare the peer's response with the expected response
if (
peerChallengeResponse !== expectedChallengeResponse
) {
throw new Error(
`Challenge response does not match for peer ${peerIp}`
);
}
})
);

const peerChallengeResponse = await withTimeout(
digPeer.contentServer.getKey(
hexToUtf8(hexKey),
rootHash,
expectedChallengeResponse
),
10000,
`getKey timed out for peer ${peerIp}`
validPeers.push(digPeer);
console.log(
`Peer ${peerIp} passed all challenges and is valid.`
);

if (peerChallengeResponse !== expectedChallengeResponse) {
throw new Error(`Challenge response does not match for peer ${peerIp}`);
}
})
);

validPeers.push(digPeer);
console.log(`Peer ${peerIp} passed all challenges and is valid.`);
resolve(true);
} else {
console.log(`Peer ${peerIp} has an incorrect generation hash.`);
resolve(false);
resolve(true);
} else {
console.log(
`Peer ${peerIp} has an incorrect generation hash.`
);
resolve(false);
}
} catch (error: any) {
console.error(`Error with peer ${peerIp}: ${error.message}`);
resolve(false); // Skip this peer and continue to the next
}
},
(result) => {
// The callback processes the result
console.log(`Callback for peer ${peerIp}, result: ${result}`);
}
} catch (error: any) {
console.error(`Error with peer ${peerIp}: ${error.message}`);
resolve(false); // Skip this peer and continue to the next
}
}, (result) => {
// The callback processes the result
console.log(`Callback for peer ${peerIp}, result: ${result}`);
});
}));
);
})
);
}

// Wait for all pending requests for this program to complete
Expand All @@ -278,14 +309,25 @@ const runIncentiveProgram = async (

const { epoch: currentEpoch, round: currentRound } =
ServerCoin.getCurrentEpoch();
const paymentHint = DigPeer.createPaymentHint(
Buffer.from(program.storeId, "hex")
);
const message = Buffer.from(
`DIG Network payout: Store Id ${program.storeId}, Epoch ${currentEpoch}, Round ${currentRound}`,
"utf-8"
);

console.log(
`Payment hint: ${paymentHint.toString("hex")} - ${message.toString(
"utf-8"
)}`
);
// For the alpha program we are going to forgo the hint and just use the message so people can see it in their chia wallet
//const memo = [paymentHint, message];
const memos = [message];

console.log(`Sending equal bulk payments to ${paymentAddresses.length} valid peers...`);
console.log(
`Sending equal bulk payments to ${paymentAddresses.length} valid peers...`
);
await DigPeer.sendEqualBulkPayments(
program.walletName,
paymentAddresses,
Expand Down

0 comments on commit c1b2047

Please sign in to comment.