Merge pull request #136 from DIG-Network/release/v0.0.1-alpha.151

Release/v0.0.1 alpha.151

MichaelTaylor3D authored Oct 6, 2024
2 parents 1454b93 + 3ffe67b commit 7c8e5da
Showing 6 changed files with 145 additions and 129 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,13 @@

All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.

### [0.0.1-alpha.151](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.150...v0.0.1-alpha.151) (2024-10-06)


### Features

* improve network sync with peer ranker ([668b894](https://github.com/DIG-Network/dig-chia-sdk/commit/668b8944b5069caf6117526d4480e762a1ffcb46))

### [0.0.1-alpha.150](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.149...v0.0.1-alpha.150) (2024-10-06)

### [0.0.1-alpha.149](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.148...v0.0.1-alpha.149) (2024-10-06)
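
The alpha.151 entry above corresponds to the reworked DigNetwork.findPeerWithStoreKey further down in this commit: the initialBlackList parameter is gone and peer selection is delegated to a PeerRanker. A minimal caller under the new signature (storeId, rootHash, and an optional key) would look like the sketch below; DigNetwork, findPeerWithStoreKey, and downloadStoreRoot are taken from this diff, while the package-root import path is an assumption.

// Minimal usage sketch of the new signature: storeId, rootHash, optional key.
// Assumes DigNetwork is re-exported from the package entry point.
import { DigNetwork } from "@dignetwork/dig-sdk";

async function syncOneRoot(storeId: string, rootHash: string): Promise<void> {
  const peer = await DigNetwork.findPeerWithStoreKey(storeId, rootHash);
  if (!peer) {
    console.error(`No peer found with root hash ${rootHash}.`);
    return;
  }
  await peer.downloadStoreRoot(rootHash);
}
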
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "@dignetwork/dig-sdk",
"version": "0.0.1-alpha.150",
"version": "0.0.1-alpha.151",
"description": "",
"type": "commonjs",
"main": "./dist/index.js",
230 changes: 107 additions & 123 deletions src/DigNetwork/DigNetwork.ts
@@ -3,7 +3,7 @@ import * as path from "path";
import { DigPeer } from "./DigPeer";
import { DataStore, ServerCoin } from "../blockchain";
import { DIG_FOLDER_PATH } from "../utils/config";
import { withTimeout } from "../utils";
import { withTimeout, PeerRanker, PeerMetrics } from "../utils";

export class DigNetwork {
private dataStore: DataStore;
@@ -27,99 +27,110 @@ export class DigNetwork {
await digNetwork.syncStoreFromPeers();
}

public static getAllNetworkDataStoreIds(): string[] {
throw new Error("Method not implemented.");
}

public static async getUdiContent(udi: string) {
// TODO: Implement this method
throw new Error("Method not implemented.");
}

/**
* Find a peer that has the store key and root hash.
* Find a peer that has the store key and root hash, using ranked peers first and searching in groups of 5.
*
* @param {string} storeId - The ID of the store.
* @param {string} rootHash - The root hash of the store.
* @param {string} [key] - Optional key to check for in the store.
* @param {string[]} [initialBlackList] - Initial list of blacklisted peer IPs.
* @returns {Promise<DigPeer | null>} - A valid peer or null if none found.
*/
public static async findPeerWithStoreKey(
storeId: string,
rootHash: string,
key?: string,
initialBlackList: string[] = []
key?: string
): Promise<DigPeer | null> {
const peerBlackList = new Set(initialBlackList);
const serverCoin = new ServerCoin(storeId);

while (true) {
try {
// Sample 10 peers from the current epoch excluding blacklisted peers
const digPeers = await serverCoin.sampleCurrentEpoch(
10,
Array.from(peerBlackList)
);
try {
// Fetch all active peers for the current epoch
const digPeers = await serverCoin.getActiveEpochPeers();

// If no peers are returned, break out of the loop
if (digPeers.length === 0) {
console.log("No more peers found.");
break;
}
// If no peers are returned, exit early
if (digPeers.length === 0) {
console.log("No peers found.");
return null;
}

// Create a race of promises for all peers
const peerPromises = digPeers.map((peerIp) => {
return new Promise<DigPeer | null>(async (resolve) => {
try {
const digPeer = new DigPeer(peerIp, storeId);
const { storeExists, rootHashExists } =
await digPeer.propagationServer.checkStoreExists(rootHash);

// Check if the store and root hash exist on the peer
if (storeExists && rootHashExists) {
console.log(
`Found Peer at ${peerIp} for storeId: ${storeId}, root hash ${rootHash}`
);

// If no key is provided, resolve the peer
if (!key) {
return resolve(digPeer);
}

// If key is provided, check if the peer has it
const keyResponse = await digPeer.contentServer.headKey(
key,
rootHash
);
if (keyResponse.headers?.["x-key-exists"] === "true") {
return resolve(digPeer);
}
}
} catch (error) {
console.error(`Error connecting to DIG Peer ${peerIp}.`);
}
// Initialize PeerRanker with the list of digPeers (IP addresses)
const peerRanker = new PeerRanker(digPeers);

// If the peer does not meet the criteria, resolve with null
resolve(null);
});
});
// Rank the peers based on latency and bandwidth
const rankedPeers = await peerRanker.rankPeers();

// Wait for the first valid peer that resolves
const firstValidPeer = await Promise.race(peerPromises);
// If no peers are returned after ranking, exit early
if (rankedPeers.length === 0) {
console.log("No valid peers found after ranking.");
return null;
}

// Define the iterator function to process each peer
const iteratorFn = async (
peerMetrics: PeerMetrics
): Promise<DigPeer | null> => {
const peerIp = peerMetrics.ip;
try {
const digPeer = new DigPeer(peerIp, storeId);

// Wrap the store check with a 10-second timeout
const { storeExists, rootHashExists } = await withTimeout(
digPeer.propagationServer.checkStoreExists(rootHash),
10000,
`Timeout while checking store on peer ${peerIp}`
);

// Check if the store and root hash exist on the peer
if (storeExists && rootHashExists) {
console.log(
`Found Peer at ${peerIp} for storeId: ${storeId}, root hash ${rootHash}`
);

// If no key is provided, return the peer
if (!key) {
return digPeer;
}

// If a valid peer is found, return it
if (firstValidPeer) {
return firstValidPeer;
// If key is provided, wrap key check with a 10-second timeout
const keyResponse = await withTimeout(
digPeer.contentServer.headKey(key, rootHash),
10000,
`Timeout while checking key on peer ${peerIp}`
);

if (keyResponse.headers?.["x-key-exists"] === "true") {
return digPeer;
}
}
} catch (error: any) {
console.error(
`Error connecting to DIG Peer ${peerIp}:`,
error.message
);
}

// If none of the peers were valid, add them to the blacklist
digPeers.forEach((peerIp) => peerBlackList.add(peerIp));
// If the peer does not meet the criteria, return null
return null;
};

// Retry with the next set of peers
console.log("No valid peers found, retrying with new peers...");
} catch (error) {
console.error("Error sampling peers. Resampling...");
}
}
// Use Promise.race to return the first valid peer found
const validPeer = await Promise.race(
rankedPeers.map((peer) => iteratorFn(peer))
);

// Return null if no valid peer was found after all attempts
return null;
// Return the first valid peer or null if none is found
return validPeer || null;
} catch (error) {
console.error("Error sampling peers:", error);
return null;
}
}

public static unsubscribeFromStore(storeId: string): void {
@@ -156,7 +167,6 @@ export class DigNetwork {
}
console.log("Starting network sync for store:", this.dataStore.StoreId);
DigNetwork.networkSyncMap.set(this.dataStore.StoreId, true);
let peerBlackList: string[] = [];

try {
const rootHistory = await this.dataStore.getRootHistory();
@@ -189,61 +199,35 @@

// Process the root hashes sequentially
for (const rootInfo of rootsToProcess) {
let selectedPeer: DigPeer | null = null;

while (true) {
try {
// Find a peer with the store and root hash
if (prioritizedPeer) {
selectedPeer = prioritizedPeer;
} else {
selectedPeer = await DigNetwork.findPeerWithStoreKey(
this.dataStore.StoreId,
rootInfo.root_hash,
undefined,
peerBlackList
);
}

if (!selectedPeer) {
console.error(
`No peer found with root hash ${rootInfo.root_hash}. Moving to next root.`
);
break; // Exit the while loop to proceed to the next rootInfo
}

// Check if the selected peer has the store and root hash
const { storeExists, rootHashExists } =
await selectedPeer.propagationServer.checkStoreExists(
rootInfo.root_hash
);

if (!storeExists || !rootHashExists) {
console.warn(
`Peer ${selectedPeer.IpAddress} does not have the required store or root hash. Trying another peer...`
);
peerBlackList.push(selectedPeer.IpAddress); // Blacklist and retry
continue;
}
try {
let selectedPeer: DigPeer | null = prioritizedPeer || null;

if (!selectedPeer) {
// Use the `findPeerWithStoreKey` method to find a peer with the store and root hash
selectedPeer = await DigNetwork.findPeerWithStoreKey(
this.dataStore.StoreId,
rootInfo.root_hash
);
}

// Download the store root and associated data
await selectedPeer.downloadStoreRoot(rootInfo.root_hash);

// Clear the blacklist upon successful download
peerBlackList = [];

// Break after successful download to proceed to next root hash
break;
} catch (error: any) {
if (error.message)
console.error(
`Error downloading from peer ${selectedPeer?.IpAddress}. Retrying with another peer.`,
error
);
if (selectedPeer) {
peerBlackList.push(selectedPeer.IpAddress); // Blacklist and retry
}
if (!selectedPeer) {
console.error(
`No peer found with root hash ${rootInfo.root_hash}. Moving to next root.`
);
continue; // Move to the next rootInfo
}

// Download the store root and associated data
await selectedPeer.downloadStoreRoot(rootInfo.root_hash);

// Break after successful download to proceed to next root hash
} catch (error: any) {
if (error.message)
console.error(
`Error downloading from peer ${prioritizedPeer?.IpAddress}. Retrying with another peer.`,
error
);
// Continue to next rootInfo in case of error
}
}

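
The rewritten findPeerWithStoreKey relies on three names imported from ../utils that are not defined in this diff: PeerRanker, PeerMetrics, and withTimeout. The sketch below is a minimal reading of their shapes, inferred only from the call sites above (a constructor taking an array of peer IPs, rankPeers() resolving to ranked PeerMetrics with an ip field, and withTimeout(promise, ms, message)). The latency probe, the probe URL, and the extra metric fields are illustrative assumptions, not the SDK's actual implementation.

// Assumed shape of the metrics returned by PeerRanker.rankPeers().
// Only `ip` is used in DigNetwork.ts; the other fields are assumptions.
interface PeerMetrics {
  ip: string;
  latencyMs?: number;
  bandwidthKbps?: number;
}

// Assumed PeerRanker: probe each peer once and sort by latency.
// The diff's comments say the real ranker uses latency and bandwidth; this is a stand-in.
class PeerRanker {
  constructor(private readonly ips: string[]) {}

  async rankPeers(): Promise<PeerMetrics[]> {
    const results = await Promise.all(
      this.ips.map(async (ip): Promise<PeerMetrics> => {
        const start = Date.now();
        try {
          // Hypothetical HEAD probe; the real endpoint and port are not shown in this diff.
          await fetch(`http://${ip}/`, { method: "HEAD" });
          return { ip, latencyMs: Date.now() - start };
        } catch {
          return { ip, latencyMs: Number.MAX_SAFE_INTEGER };
        }
      })
    );
    return results.sort((a, b) => (a.latencyMs ?? 0) - (b.latencyMs ?? 0));
  }
}

// Rejects with `message` if the wrapped promise does not settle within `ms` milliseconds.
function withTimeout<T>(promise: Promise<T>, ms: number, message: string): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(message)), ms);
    promise.then(
      (value) => { clearTimeout(timer); resolve(value); },
      (err) => { clearTimeout(timer); reject(err); }
    );
  });
}
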
2 changes: 1 addition & 1 deletion src/blockchain/StoreMonitorRegistry.ts
@@ -155,7 +155,7 @@ export class StoreMonitorRegistry {
callback: StoreUpdateCallback
): Promise<void> {
let retryCount = 0;
const maxRetryDelay = 60000; // 60 seconds
const maxRetryDelay = 1000; // 1 second

while (this.activeMonitors.has(storeId)) {
try {
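
The only change in this file lowers the retry-delay cap used by the monitor loop from 60000 ms to 1000 ms. The backoff formula itself is outside the visible hunk; a common shape for such a loop, shown purely as an illustrative sketch under that assumption, doubles the wait per failed attempt and caps it at maxRetryDelay.

// Illustrative only: the actual backoff math in StoreMonitorRegistry is not
// part of this diff. This sketch doubles the wait per retry, capped at maxRetryDelay.
async function waitBeforeRetry(retryCount: number, maxRetryDelay: number): Promise<void> {
  const delay = Math.min(500 * 2 ** retryCount, maxRetryDelay);
  await new Promise<void>((resolve) => setTimeout(() => resolve(), delay));
}
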