From b57a8e0572ba28abae81fe02179a9c4b9a1b4ea0 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Fri, 20 Sep 2024 11:37:41 -0400 Subject: [PATCH 1/3] fix: sync process --- src/DataIntegrityTree/DataIntegrityTree.ts | 112 ++++++++++++++------- src/DigNetwork/DigNetwork.ts | 40 ++++++-- src/DigNetwork/DigPeer.ts | 4 +- src/blockchain/FullNodePeer.ts | 23 +++-- src/blockchain/ServerCoin.ts | 4 +- 5 files changed, 131 insertions(+), 52 deletions(-) diff --git a/src/DataIntegrityTree/DataIntegrityTree.ts b/src/DataIntegrityTree/DataIntegrityTree.ts index 9429877..1dece42 100644 --- a/src/DataIntegrityTree/DataIntegrityTree.ts +++ b/src/DataIntegrityTree/DataIntegrityTree.ts @@ -775,13 +775,14 @@ class DataIntegrityTree { * @param expectedRootHash - The expected root hash of the Merkle tree. * @returns A boolean indicating if the SHA-256 is present in the foreign tree and the root hash matches. */ - static validateKeyIntegrityWithForeignTree( - key: string, + static async validateKeyIntegrityWithForeignTree( + hexkey: string, sha256: string, serializedTree: object, - expectedRootHash: string - ): boolean { - if (!isHexString(key)) { + expectedRootHash: string, + dataDir: string + ): Promise { + if (!isHexString(hexkey)) { throw new Error("key must be a valid hex string"); } if (!isHexString(sha256)) { @@ -791,42 +792,83 @@ class DataIntegrityTree { throw new Error("expectedRootHash must be a valid hex string"); } - // Deserialize the foreign tree - const leaves = (serializedTree as any).leaves.map((leaf: string) => - Buffer.from(leaf, "hex") - ); - const tree = new MerkleTree(leaves, SHA256, { sortPairs: true }); + // File path based on sha256 + const filePath = path.join(dataDir, sha256.match(/.{1,2}/g)!.join("/")); - // Verify that the deserialized tree's root matches the expected root hash - const treeRootHash = tree.getRoot().toString("hex"); - if (treeRootHash !== expectedRootHash) { - console.warn( - `Expected root hash ${expectedRootHash}, but got ${treeRootHash}` - ); - return false; + // Check if the file exists + if (!fs.existsSync(filePath)) { + throw new Error(`File at path ${filePath} does not exist`); } - // Rebuild the files map from the serialized tree - // @ts-ignore - tree.files = new Map( - Object.entries((serializedTree as any).files).map( - ([key, value]: [string, any]) => [ - key, - { hash: value.hash, sha256: value.sha256 }, - ] - ) - ); + const compressedReadStream = fs.createReadStream(filePath); + const decompressStream = zlib.createGunzip(); + const hash = crypto.createHash("sha256"); - // Check if the SHA-256 exists in the foreign tree's files - const combinedHash = crypto - .createHash("sha256") - .update(`${toHex(key)}/${sha256}`) - .digest("hex"); + // Process file decompression and hash comparison + return new Promise((resolve, reject) => { + compressedReadStream.pipe(decompressStream); - const leaf = Buffer.from(combinedHash, "hex"); - const isInTree = tree.getLeafIndex(leaf) !== -1; + decompressStream.on("data", (chunk) => { + hash.update(chunk); + }); + + decompressStream.on("end", () => { + const uncompressedSha256 = hash.digest("hex"); + console.log(`SHA-256 of uncompressed file: ${uncompressedSha256}`); + + if (uncompressedSha256 !== sha256) { + console.warn( + `File hash mismatch. 
Expected: ${sha256}, got: ${uncompressedSha256}` + ); + return resolve(false); + } - return isInTree; + // Deserialize the foreign tree + const leaves = (serializedTree as any).leaves.map((leaf: string) => + Buffer.from(leaf, "hex") + ); + const tree = new MerkleTree(leaves, SHA256, { sortPairs: true }); + + // Verify that the deserialized tree's root matches the expected root hash + const treeRootHash = tree.getRoot().toString("hex"); + if (treeRootHash !== expectedRootHash) { + console.warn( + `Expected root hash ${expectedRootHash}, but got ${treeRootHash}` + ); + return resolve(false); + } + + // Rebuild the files map from the serialized tree + // @ts-ignore + tree.files = new Map( + Object.entries((serializedTree as any).files).map( + ([key, value]: [string, any]) => [ + key, + { hash: value.hash, sha256: value.sha256 }, + ] + ) + ); + + // Check if the SHA-256 exists in the foreign tree's files + const combinedHash = crypto + .createHash("sha256") + .update(`${hexkey}/${sha256}`) + .digest("hex"); + + const leaf = Buffer.from(combinedHash, "hex"); + const isInTree = tree.getLeafIndex(leaf) !== -1; + + resolve(isInTree); + }); + + decompressStream.on("error", (err) => { + reject(err); + }); + + compressedReadStream.on("error", (err) => { + reject(err); + }); + }); } } diff --git a/src/DigNetwork/DigNetwork.ts b/src/DigNetwork/DigNetwork.ts index f43fb3e..ec85602 100644 --- a/src/DigNetwork/DigNetwork.ts +++ b/src/DigNetwork/DigNetwork.ts @@ -8,6 +8,7 @@ import { DataStore, ServerCoin } from "../blockchain"; import { DIG_FOLDER_PATH } from "../utils/config"; import { RootHistoryItem } from "../types"; import { promisify } from "util"; +import { DataIntegrityTree } from "../DataIntegrityTree"; const rename = promisify(fs.rename); const unlink = promisify(fs.unlink); @@ -213,7 +214,7 @@ export class DigNetwork { // Add peer to blacklist if it doesn't meet criteria peerBlackList.push(peerIp); } catch (error) { - console.error(`Error connecting to peer ${peerIp}. Resampling...`); + console.error(`Error connecting to DIG Peer ${peerIp}. Resampling...`); if (peerIp) { peerBlackList.push(peerIp); // Add to blacklist if error occurs } @@ -312,10 +313,33 @@ export class DigNetwork { console.log( `Downloading file with sha256: ${file.sha256}...` ); + await selectedPeer.downloadData( this.dataStore.StoreId, `data/${file.sha256.match(/.{1,2}/g)!.join("/")}` ); + + const integrityCheck = + await DataIntegrityTree.validateKeyIntegrityWithForeignTree( + storeKey, + file.sha256, + root, + rootInfo.root_hash, + `${this.storeDir}/data` + ); + + if (integrityCheck) { + console.log( + `\x1b[32mIntegrity check passed for file with sha256: ${file.sha256}.\x1b[0m` + ); + continue; + } + + console.error( + `\x1b[31mIntegrity check failed for file with sha256: ${file.sha256}.\x1b[0m` + ); + await unlink(filePath); + throw new Error(`Store Integrity check failed. 
Syncing file from another peer.`); } } } @@ -348,6 +372,8 @@ export class DigNetwork { if (selectedPeer) { peerBlackList.push(selectedPeer.IpAddress); } + + console.trace(error); throw error; } } @@ -396,15 +422,15 @@ export class DigNetwork { const blacklist = this.peerBlacklist.get(dataPath) || new Set(); for (const digPeer of digPeers) { - if (blacklist.has(digPeer.IpAddress)) continue; + try { + if (blacklist.has(digPeer.IpAddress)) continue; - const response = await digPeer.propagationServer.headStore(); + const response = await digPeer.propagationServer.headStore(); - if (!response.success) { - continue; - } + if (!response.success) { + continue; + } - try { // Create directory if it doesn't exist const directory = path.dirname(tempFilePath); if (!fs.existsSync(directory)) { diff --git a/src/DigNetwork/DigPeer.ts b/src/DigNetwork/DigPeer.ts index 672b945..248cb1d 100644 --- a/src/DigNetwork/DigPeer.ts +++ b/src/DigNetwork/DigPeer.ts @@ -6,6 +6,7 @@ import { PropagationServer } from "./PropagationServer"; import { IncentiveServer } from "./IncentiveServer"; import { DataStore } from "../blockchain"; import { DataIntegrityTree } from "../DataIntegrityTree"; +import { DIG_FOLDER_PATH } from "../utils/config"; import fs from "fs"; import { sendXch, @@ -150,7 +151,8 @@ export class DigPeer { key, fileData.sha256, datFileContent, - rootHash + rootHash, + path.resolve(DIG_FOLDER_PATH, "stores", this.storeId, 'data') ); if (!treeCheck) { diff --git a/src/blockchain/FullNodePeer.ts b/src/blockchain/FullNodePeer.ts index 85c18dc..a58c409 100644 --- a/src/blockchain/FullNodePeer.ts +++ b/src/blockchain/FullNodePeer.ts @@ -24,6 +24,7 @@ export class FullNodePeer { private static cachedPeer: { peer: Peer; timestamp: number } | null = null; private static memoizedFetchNewPeerIPs: () => Promise; private peer: Peer; + private static deprioritizedIps: Set = new Set(); // New set for deprioritized IPs static { FullNodePeer.memoizedFetchNewPeerIPs = memoize( @@ -82,16 +83,20 @@ export class FullNodePeer { const trustedNodeIp = FullNodePeer.getTrustedFullNode(); const priorityIps: string[] = []; - // Prioritize trustedNodeIp + // Prioritize trustedNodeIp unless it's deprioritized if ( trustedNodeIp && + !FullNodePeer.deprioritizedIps.has(trustedNodeIp) && (await FullNodePeer.isPortReachable(trustedNodeIp, FULLNODE_PORT)) ) { priorityIps.push(trustedNodeIp); } - // Prioritize LOCALHOST - if (await FullNodePeer.isPortReachable(LOCALHOST, FULLNODE_PORT)) { + // Prioritize LOCALHOST unless it's deprioritized + if ( + !FullNodePeer.deprioritizedIps.has(LOCALHOST) && + (await FullNodePeer.isPortReachable(LOCALHOST, FULLNODE_PORT)) + ) { priorityIps.push(LOCALHOST); } @@ -156,11 +161,11 @@ export class FullNodePeer { return new Proxy(peer, { get: (target, prop) => { const originalMethod = (target as any)[prop]; - + if (typeof originalMethod === "function") { return async (...args: any[]) => { let timeoutId: NodeJS.Timeout | undefined; - + // Start the timeout to forget the peer after 1 minute const timeoutPromise = new Promise((_, reject) => { timeoutId = setTimeout(() => { @@ -168,19 +173,19 @@ export class FullNodePeer { reject(new Error("Operation timed out. 
Reconnecting to a new peer.")); }, 60000); // 1 minute }); - + try { // Run the original method and race it against the timeout const result = await Promise.race([ originalMethod.apply(target, args), timeoutPromise, ]); - + // Clear the timeout if the operation succeeded if (timeoutId) { clearTimeout(timeoutId); } - + return result; } catch (error: any) { // If the error is WebSocket-related or timeout, reset the peer @@ -188,6 +193,7 @@ export class FullNodePeer { FullNodePeer.cachedPeer = null; // @ts-ignore FullNodePeer.memoizedFetchNewPeerIPs.cache.clear(); + console.info(`Fullnode Peer error, reconnecting to a new peer...`); const newPeer = await FullNodePeer.getBestPeer(); return (newPeer as any)[prop](...args); } @@ -277,6 +283,7 @@ export class FullNodePeer { let bestPeerIndex = validHeights.findIndex( (height, index) => height === highestPeak && + !FullNodePeer.deprioritizedIps.has(peerIPs[index]) && // Exclude deprioritized IPs (peerIPs[index] === LOCALHOST || peerIPs[index] === trustedNodeIp) ); diff --git a/src/blockchain/ServerCoin.ts b/src/blockchain/ServerCoin.ts index f36e1ac..0fd05e2 100644 --- a/src/blockchain/ServerCoin.ts +++ b/src/blockchain/ServerCoin.ts @@ -241,7 +241,9 @@ export class ServerCoin { blacklist: string[] = [] ): Promise { const serverCoinPeers = await this.getAllEpochPeers(epoch, blacklist); - console.log("Server Coin Peers: ", serverCoinPeers); + if (process.env.DIG_DEBUG === "1") { + console.log("Server Coin Peers: ", serverCoinPeers); + } return _.sampleSize(serverCoinPeers, sampleSize); } From 935fd3ff312f87f0cbbfdefb143f68f87e82c3ed Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Fri, 20 Sep 2024 12:37:55 -0400 Subject: [PATCH 2/3] fix: sync process --- src/DigNetwork/DigNetwork.ts | 60 ++++++++++++++++++---------------- src/blockchain/FullNodePeer.ts | 32 ++++++++++-------- 2 files changed, 51 insertions(+), 41 deletions(-) diff --git a/src/DigNetwork/DigNetwork.ts b/src/DigNetwork/DigNetwork.ts index ec85602..d90d79f 100644 --- a/src/DigNetwork/DigNetwork.ts +++ b/src/DigNetwork/DigNetwork.ts @@ -260,6 +260,7 @@ export class DigNetwork { .filter( (item) => !fs.existsSync(`${this.storeDir}/${item.root_hash}.dat`) ) + // Reverse to download the latest first .reverse(); if (!rootHistoryFiltered.length) { @@ -309,38 +310,36 @@ export class DigNetwork { `${this.storeDir}/data` ); - if (!fs.existsSync(filePath) || forceDownload) { - console.log( - `Downloading file with sha256: ${file.sha256}...` - ); + console.log(`Downloading file with sha256: ${file.sha256}...`); - await selectedPeer.downloadData( - this.dataStore.StoreId, - `data/${file.sha256.match(/.{1,2}/g)!.join("/")}` + await selectedPeer.downloadData( + this.dataStore.StoreId, + `data/${file.sha256.match(/.{1,2}/g)!.join("/")}` + ); + + const integrityCheck = + await DataIntegrityTree.validateKeyIntegrityWithForeignTree( + storeKey, + file.sha256, + root, + rootInfo.root_hash, + `${this.storeDir}/data` ); - const integrityCheck = - await DataIntegrityTree.validateKeyIntegrityWithForeignTree( - storeKey, - file.sha256, - root, - rootInfo.root_hash, - `${this.storeDir}/data` - ); - - if (integrityCheck) { - console.log( - `\x1b[32mIntegrity check passed for file with sha256: ${file.sha256}.\x1b[0m` - ); - continue; - } - - console.error( - `\x1b[31mIntegrity check failed for file with sha256: ${file.sha256}.\x1b[0m` + if (integrityCheck) { + console.log( + `\x1b[32mIntegrity check passed for file with sha256: ${file.sha256}.\x1b[0m` ); - await unlink(filePath); - throw new Error(`Store 
Integrity check failed. Syncing file from another peer.`); + continue; } + + console.error( + `\x1b[31mIntegrity check failed for file with sha256: ${file.sha256}.\x1b[0m` + ); + await unlink(filePath); + throw new Error( + `Store Integrity check failed. Syncing file from another peer.` + ); } } @@ -363,6 +362,11 @@ export class DigNetwork { } } } + + // Only process the first root hash so other stores can sync the latest. + // This has an effect where the latest roothash will always be synced first, even if new ones come in. + // Then it will backfill historical roothashes + break; } await this.downloadManifestFile(true); @@ -370,7 +374,7 @@ export class DigNetwork { console.log("Syncing store complete."); } catch (error: any) { if (selectedPeer) { - peerBlackList.push(selectedPeer.IpAddress); + peerBlackList.push((selectedPeer as DigPeer).IpAddress); } console.trace(error); diff --git a/src/blockchain/FullNodePeer.ts b/src/blockchain/FullNodePeer.ts index a58c409..84682d7 100644 --- a/src/blockchain/FullNodePeer.ts +++ b/src/blockchain/FullNodePeer.ts @@ -11,6 +11,7 @@ import { MIN_HEIGHT, MIN_HEIGHT_HEADER_HASH } from "../utils/config"; const FULLNODE_PORT = 8444; const LOCALHOST = "127.0.0.1"; +const CHIA_NODES_HOST = "chia-nodes"; const DNS_HOSTS = [ "dns-introducer.chia.net", "chia.ctrlaltdel.ch", @@ -27,9 +28,7 @@ export class FullNodePeer { private static deprioritizedIps: Set = new Set(); // New set for deprioritized IPs static { - FullNodePeer.memoizedFetchNewPeerIPs = memoize( - FullNodePeer.fetchNewPeerIPs - ); + FullNodePeer.memoizedFetchNewPeerIPs = memoize(FullNodePeer.fetchNewPeerIPs); } private constructor(peer: Peer) { @@ -67,7 +66,7 @@ export class FullNodePeer { /** * Retrieves the TRUSTED_FULLNODE IP from the environment * and verifies if it is a valid IP address. 
- * + * * @returns {string | null} The valid IP address or null if invalid */ private static getTrustedFullNode(): string | null { @@ -100,6 +99,14 @@ export class FullNodePeer { priorityIps.push(LOCALHOST); } + // Prioritize CHIA_NODES_HOST unless it's deprioritized + if ( + !FullNodePeer.deprioritizedIps.has(CHIA_NODES_HOST) && + (await FullNodePeer.isPortReachable(CHIA_NODES_HOST, FULLNODE_PORT)) + ) { + priorityIps.push(CHIA_NODES_HOST); + } + if (priorityIps.length > 0) { return priorityIps; } @@ -124,9 +131,7 @@ export class FullNodePeer { } } } catch (error: any) { - console.error( - `Failed to resolve IPs from ${DNS_HOST}: ${error.message}` - ); + console.error(`Failed to resolve IPs from ${DNS_HOST}: ${error.message}`); } } throw new Error("No reachable IPs found in any DNS records."); @@ -150,6 +155,8 @@ export class FullNodePeer { // @ts-ignore if (FullNodePeer.memoizedFetchNewPeerIPs?.cache?.clear) { + // Clear cache and reset deprioritized IPs when cache is cleared + FullNodePeer.deprioritizedIps.clear(); // @ts-ignore FullNodePeer.memoizedFetchNewPeerIPs.cache.clear(); } @@ -193,6 +200,7 @@ export class FullNodePeer { FullNodePeer.cachedPeer = null; // @ts-ignore FullNodePeer.memoizedFetchNewPeerIPs.cache.clear(); + FullNodePeer.deprioritizedIps.clear(); console.info(`Fullnode Peer error, reconnecting to a new peer...`); const newPeer = await FullNodePeer.getBestPeer(); return (newPeer as any)[prop](...args); @@ -241,9 +249,7 @@ export class FullNodePeer { ); return FullNodePeer.createPeerProxy(peer); } catch (error: any) { - console.error( - `Failed to create peer for IP ${ip}: ${error.message}` - ); + console.error(`Failed to create peer for IP ${ip}: ${error.message}`); return null; } } @@ -279,15 +285,15 @@ export class FullNodePeer { const highestPeak = Math.max(...validHeights); - // Prioritize LOCALHOST and TRUSTED_NODE_IP if they have the highest peak height + // Prioritize LOCALHOST, TRUSTED_NODE_IP, and CHIA_NODES_HOST if they have the highest peak height let bestPeerIndex = validHeights.findIndex( (height, index) => height === highestPeak && !FullNodePeer.deprioritizedIps.has(peerIPs[index]) && // Exclude deprioritized IPs - (peerIPs[index] === LOCALHOST || peerIPs[index] === trustedNodeIp) + (peerIPs[index] === LOCALHOST || peerIPs[index] === trustedNodeIp || peerIPs[index] === CHIA_NODES_HOST) ); - // If LOCALHOST or TRUSTED_NODE_IP don't have the highest peak, select any peer with the highest peak + // If LOCALHOST, TRUSTED_NODE_IP, or CHIA_NODES_HOST don't have the highest peak, select any peer with the highest peak if (bestPeerIndex === -1) { bestPeerIndex = validHeights.findIndex( (height) => height === highestPeak From db991d9557e057609952b829e58934e651682e64 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Fri, 20 Sep 2024 12:38:45 -0400 Subject: [PATCH 3/3] chore(release): 0.0.1-alpha.48 --- CHANGELOG.md | 8 ++++++++ package-lock.json | 4 ++-- package.json | 2 +- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 694efb2..2b660b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. 
+### [0.0.1-alpha.48](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.47...v0.0.1-alpha.48) (2024-09-20)
+
+
+### Bug Fixes
+
+* sync process ([935fd3f](https://github.com/DIG-Network/dig-chia-sdk/commit/935fd3ff312f87f0cbbfdefb143f68f87e82c3ed))
+* sync process ([b57a8e0](https://github.com/DIG-Network/dig-chia-sdk/commit/b57a8e0572ba28abae81fe02179a9c4b9a1b4ea0))
+
 ### [0.0.1-alpha.47](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.46...v0.0.1-alpha.47) (2024-09-20)
 
 
diff --git a/package-lock.json b/package-lock.json
index 04c73f9..9ffd264 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1,12 +1,12 @@
 {
   "name": "@dignetwork/dig-sdk",
-  "version": "0.0.1-alpha.47",
+  "version": "0.0.1-alpha.48",
   "lockfileVersion": 3,
   "requires": true,
   "packages": {
     "": {
       "name": "@dignetwork/dig-sdk",
-      "version": "0.0.1-alpha.47",
+      "version": "0.0.1-alpha.48",
       "license": "ISC",
       "dependencies": {
         "@dignetwork/datalayer-driver": "^0.1.24",
diff --git a/package.json b/package.json
index 6ab485a..21e1da7 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@dignetwork/dig-sdk",
-  "version": "0.0.1-alpha.47",
+  "version": "0.0.1-alpha.48",
   "description": "",
   "type": "commonjs",
   "main": "./dist/index.js",