diff --git a/CHANGELOG.md b/CHANGELOG.md index cb686f7..0055095 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +### [0.0.1-alpha.151](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.150...v0.0.1-alpha.151) (2024-10-06) + + +### Features + +* improve network sync with peer ranker ([668b894](https://github.com/DIG-Network/dig-chia-sdk/commit/668b8944b5069caf6117526d4480e762a1ffcb46)) + ### [0.0.1-alpha.150](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.149...v0.0.1-alpha.150) (2024-10-06) ### [0.0.1-alpha.149](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.148...v0.0.1-alpha.149) (2024-10-06) diff --git a/package-lock.json b/package-lock.json index aeafeb7..3880950 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@dignetwork/dig-sdk", - "version": "0.0.1-alpha.150", + "version": "0.0.1-alpha.151", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@dignetwork/dig-sdk", - "version": "0.0.1-alpha.150", + "version": "0.0.1-alpha.151", "license": "ISC", "dependencies": { "@dignetwork/datalayer-driver": "^0.1.29", diff --git a/package.json b/package.json index e100775..49b7845 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@dignetwork/dig-sdk", - "version": "0.0.1-alpha.150", + "version": "0.0.1-alpha.151", "description": "", "type": "commonjs", "main": "./dist/index.js", diff --git a/src/DigNetwork/DigNetwork.ts b/src/DigNetwork/DigNetwork.ts index 90287da..ee8d68c 100644 --- a/src/DigNetwork/DigNetwork.ts +++ b/src/DigNetwork/DigNetwork.ts @@ -3,7 +3,7 @@ import * as path from "path"; import { DigPeer } from "./DigPeer"; import { DataStore, ServerCoin } from "../blockchain"; import { DIG_FOLDER_PATH } from "../utils/config"; -import { withTimeout } from "../utils"; +import { withTimeout, PeerRanker, PeerMetrics } from "../utils"; export class DigNetwork { private dataStore: DataStore; @@ -27,99 +27,110 @@ export class DigNetwork { await digNetwork.syncStoreFromPeers(); } + public static getAllNetworkDataStoreIds(): string[] { + throw new Error("Method not implemented."); + } + public static async getUdiContent(udi: string) { - // TODO: Implement this method + throw new Error("Method not implemented."); } /** - * Find a peer that has the store key and root hash. + * Find a peer that has the store key and root hash, using ranked peers first and searching in groups of 5. * * @param {string} storeId - The ID of the store. * @param {string} rootHash - The root hash of the store. * @param {string} [key] - Optional key to check for in the store. - * @param {string[]} [initialBlackList] - Initial list of blacklisted peer IPs. * @returns {Promise} - A valid peer or null if none found. */ public static async findPeerWithStoreKey( storeId: string, rootHash: string, - key?: string, - initialBlackList: string[] = [] + key?: string ): Promise { - const peerBlackList = new Set(initialBlackList); const serverCoin = new ServerCoin(storeId); - while (true) { - try { - // Sample 10 peers from the current epoch excluding blacklisted peers - const digPeers = await serverCoin.sampleCurrentEpoch( - 10, - Array.from(peerBlackList) - ); + try { + // Fetch all active peers for the current epoch + const digPeers = await serverCoin.getActiveEpochPeers(); - // If no peers are returned, break out of the loop - if (digPeers.length === 0) { - console.log("No more peers found."); - break; - } + // If no peers are returned, exit early + if (digPeers.length === 0) { + console.log("No peers found."); + return null; + } - // Create a race of promises for all peers - const peerPromises = digPeers.map((peerIp) => { - return new Promise(async (resolve) => { - try { - const digPeer = new DigPeer(peerIp, storeId); - const { storeExists, rootHashExists } = - await digPeer.propagationServer.checkStoreExists(rootHash); - - // Check if the store and root hash exist on the peer - if (storeExists && rootHashExists) { - console.log( - `Found Peer at ${peerIp} for storeId: ${storeId}, root hash ${rootHash}` - ); - - // If no key is provided, resolve the peer - if (!key) { - return resolve(digPeer); - } - - // If key is provided, check if the peer has it - const keyResponse = await digPeer.contentServer.headKey( - key, - rootHash - ); - if (keyResponse.headers?.["x-key-exists"] === "true") { - return resolve(digPeer); - } - } - } catch (error) { - console.error(`Error connecting to DIG Peer ${peerIp}.`); - } + // Initialize PeerRanker with the list of digPeers (IP addresses) + const peerRanker = new PeerRanker(digPeers); - // If the peer does not meet the criteria, resolve with null - resolve(null); - }); - }); + // Rank the peers based on latency and bandwidth + const rankedPeers = await peerRanker.rankPeers(); - // Wait for the first valid peer that resolves - const firstValidPeer = await Promise.race(peerPromises); + // If no peers are returned after ranking, exit early + if (rankedPeers.length === 0) { + console.log("No valid peers found after ranking."); + return null; + } + + // Define the iterator function to process each peer + const iteratorFn = async ( + peerMetrics: PeerMetrics + ): Promise => { + const peerIp = peerMetrics.ip; + try { + const digPeer = new DigPeer(peerIp, storeId); + + // Wrap the store check with a 10-second timeout + const { storeExists, rootHashExists } = await withTimeout( + digPeer.propagationServer.checkStoreExists(rootHash), + 10000, + `Timeout while checking store on peer ${peerIp}` + ); + + // Check if the store and root hash exist on the peer + if (storeExists && rootHashExists) { + console.log( + `Found Peer at ${peerIp} for storeId: ${storeId}, root hash ${rootHash}` + ); + + // If no key is provided, return the peer + if (!key) { + return digPeer; + } - // If a valid peer is found, return it - if (firstValidPeer) { - return firstValidPeer; + // If key is provided, wrap key check with a 10-second timeout + const keyResponse = await withTimeout( + digPeer.contentServer.headKey(key, rootHash), + 10000, + `Timeout while checking key on peer ${peerIp}` + ); + + if (keyResponse.headers?.["x-key-exists"] === "true") { + return digPeer; + } + } + } catch (error: any) { + console.error( + `Error connecting to DIG Peer ${peerIp}:`, + error.message + ); } - // If none of the peers were valid, add them to the blacklist - digPeers.forEach((peerIp) => peerBlackList.add(peerIp)); + // If the peer does not meet the criteria, return null + return null; + }; - // Retry with the next set of peers - console.log("No valid peers found, retrying with new peers..."); - } catch (error) { - console.error("Error sampling peers. Resampling..."); - } - } + // Use Promise.race to return the first valid peer found + const validPeer = await Promise.race( + rankedPeers.map((peer) => iteratorFn(peer)) + ); - // Return null if no valid peer was found after all attempts - return null; + // Return the first valid peer or null if none is found + return validPeer || null; + } catch (error) { + console.error("Error sampling peers:", error); + return null; + } } public static unsubscribeFromStore(storeId: string): void { @@ -156,7 +167,6 @@ export class DigNetwork { } console.log("Starting network sync for store:", this.dataStore.StoreId); DigNetwork.networkSyncMap.set(this.dataStore.StoreId, true); - let peerBlackList: string[] = []; try { const rootHistory = await this.dataStore.getRootHistory(); @@ -189,61 +199,35 @@ export class DigNetwork { // Process the root hashes sequentially for (const rootInfo of rootsToProcess) { - let selectedPeer: DigPeer | null = null; - - while (true) { - try { - // Find a peer with the store and root hash - if (prioritizedPeer) { - selectedPeer = prioritizedPeer; - } else { - selectedPeer = await DigNetwork.findPeerWithStoreKey( - this.dataStore.StoreId, - rootInfo.root_hash, - undefined, - peerBlackList - ); - } - - if (!selectedPeer) { - console.error( - `No peer found with root hash ${rootInfo.root_hash}. Moving to next root.` - ); - break; // Exit the while loop to proceed to the next rootInfo - } - - // Check if the selected peer has the store and root hash - const { storeExists, rootHashExists } = - await selectedPeer.propagationServer.checkStoreExists( - rootInfo.root_hash - ); - - if (!storeExists || !rootHashExists) { - console.warn( - `Peer ${selectedPeer.IpAddress} does not have the required store or root hash. Trying another peer...` - ); - peerBlackList.push(selectedPeer.IpAddress); // Blacklist and retry - continue; - } + try { + let selectedPeer: DigPeer | null = prioritizedPeer || null; + + if (!selectedPeer) { + // Use the `findPeerWithStoreKey` method to find a peer with the store and root hash + selectedPeer = await DigNetwork.findPeerWithStoreKey( + this.dataStore.StoreId, + rootInfo.root_hash + ); + } - // Download the store root and associated data - await selectedPeer.downloadStoreRoot(rootInfo.root_hash); - - // Clear the blacklist upon successful download - peerBlackList = []; - - // Break after successful download to proceed to next root hash - break; - } catch (error: any) { - if (error.message) - console.error( - `Error downloading from peer ${selectedPeer?.IpAddress}. Retrying with another peer.`, - error - ); - if (selectedPeer) { - peerBlackList.push(selectedPeer.IpAddress); // Blacklist and retry - } + if (!selectedPeer) { + console.error( + `No peer found with root hash ${rootInfo.root_hash}. Moving to next root.` + ); + continue; // Move to the next rootInfo } + + // Download the store root and associated data + await selectedPeer.downloadStoreRoot(rootInfo.root_hash); + + // Break after successful download to proceed to next root hash + } catch (error: any) { + if (error.message) + console.error( + `Error downloading from peer ${prioritizedPeer?.IpAddress}. Retrying with another peer.`, + error + ); + // Continue to next rootInfo in case of error } } diff --git a/src/blockchain/StoreMonitorRegistry.ts b/src/blockchain/StoreMonitorRegistry.ts index 1d4dc08..8c77434 100644 --- a/src/blockchain/StoreMonitorRegistry.ts +++ b/src/blockchain/StoreMonitorRegistry.ts @@ -155,7 +155,7 @@ export class StoreMonitorRegistry { callback: StoreUpdateCallback ): Promise { let retryCount = 0; - const maxRetryDelay = 60000; // 60 seconds + const maxRetryDelay = 1000; // 60 seconds while (this.activeMonitors.has(storeId)) { try { diff --git a/src/utils/PeerRanker.ts b/src/utils/PeerRanker.ts index fe6b50b..536de45 100644 --- a/src/utils/PeerRanker.ts +++ b/src/utils/PeerRanker.ts @@ -1,9 +1,13 @@ import axios, { AxiosRequestConfig } from 'axios'; import fs from 'fs'; import https from 'https'; +import NodeCache from 'node-cache'; // Import node-cache import { getOrCreateSSLCerts } from './ssl'; import { asyncPool } from './promiseUtils'; +// Cache with TTL of 1 day (86400 seconds) +const peerCache = new NodeCache({ stdTTL: 86400 }); + export interface PeerMetrics { ip: string; latency: number; // in milliseconds @@ -42,12 +46,18 @@ export class PeerRanker { /** * Measures the latency of a given IP address using an HTTPS request. * Tries HEAD first, then falls back to GET if HEAD is not supported. + * If the latency is cached, returns the cached value. * @param ip - The IP address of the peer. * @returns Promise resolving to the latency in milliseconds or rejecting if the peer fails. */ private async measureLatency(ip: string): Promise { + const cachedMetrics = peerCache.get(ip); + if (cachedMetrics && cachedMetrics.latency) { + console.log(`Latency for IP ${ip} retrieved from cache.`); + return cachedMetrics.latency; + } + const url = `https://${ip}:4159/diagnostics/ping`; - const configHead: AxiosRequestConfig = { url: url, method: 'HEAD', @@ -81,6 +91,10 @@ export class PeerRanker { await axios(configGet); } const latency = Date.now() - startTime; + + // Cache the latency for the IP + peerCache.set(ip, { ...cachedMetrics, latency } as PeerMetrics); + return latency; } catch (error: any) { console.error(`Latency measurement failed for IP ${ip}:`, error.message); @@ -90,10 +104,17 @@ export class PeerRanker { /** * Measures the upload bandwidth of a given IP address by sending random data. + * If the bandwidth is cached, returns the cached value. * @param ip - The IP address of the peer. * @returns Promise resolving to the upload bandwidth in bytes per second or rejecting if the peer fails. */ private async measureBandwidth(ip: string): Promise { + const cachedMetrics = peerCache.get(ip); + if (cachedMetrics && cachedMetrics.bandwidth) { + console.log(`Bandwidth for IP ${ip} retrieved from cache.`); + return cachedMetrics.bandwidth; + } + const url = `https://${ip}:4159/diagnostics/bandwidth`; const randomData = Buffer.alloc(this.uploadTestSize, 'a'); @@ -121,6 +142,10 @@ export class PeerRanker { await axios(config); const timeElapsed = (Date.now() - startTime) / 1000; const bandwidth = this.uploadTestSize / timeElapsed; + + // Cache the bandwidth for the IP + peerCache.set(ip, { ...cachedMetrics, bandwidth } as PeerMetrics); + return bandwidth; } catch (error: any) { console.error(`Bandwidth measurement failed for IP ${ip}:`, error.message); @@ -153,7 +178,7 @@ export class PeerRanker { // Process all peers with a concurrency limit and cooldown between batches const peerMetrics: PeerMetrics[] = ( await asyncPool(limit, this.ipAddresses, iteratorFn, cooldown) - ).filter((metrics: any): metrics is PeerMetrics => metrics !== null); // Use a type guard + ).filter((metrics): metrics is PeerMetrics => metrics !== null); // Use a type guard // Sort by lowest latency first, then by highest bandwidth peerMetrics.sort((a, b) => {