Skip to content

Commit

Permalink
Merge pull request #144 from DIG-Network/release/v0.0.1-alpha.160
Browse files Browse the repository at this point in the history
Release/v0.0.1 alpha.160
  • Loading branch information
MichaelTaylor3D authored Oct 7, 2024
2 parents 5b2bc13 + 5616ae8 commit 8123784
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 62 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.

### [0.0.1-alpha.160](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.159...v0.0.1-alpha.160) (2024-10-07)


### Features

* fullnode peer improvements ([ba02ee2](https://github.com/DIG-Network/dig-chia-sdk/commit/ba02ee216d95e8295e4138035d823cf9f4cd5480))

### [0.0.1-alpha.159](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.158...v0.0.1-alpha.159) (2024-10-07)


Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@dignetwork/dig-sdk",
"version": "0.0.1-alpha.159",
"version": "0.0.1-alpha.160",
"description": "",
"type": "commonjs",
"main": "./dist/index.js",
Expand Down
4 changes: 1 addition & 3 deletions src/DigNetwork/PropagationServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,9 @@ export class PropagationServer {

try {
const response = await axios.get(url, config);
console.log(green(`✔ Successfully pinged peer: ${this.ipAddress}`));

return response.data;
} catch (error: any) {
console.error(red(`✖ Failed to ping peer: ${this.ipAddress}`));
console.error(red(error.message));
throw error;
}
Expand Down Expand Up @@ -164,7 +162,7 @@ export class PropagationServer {

return response.data;
} catch (error: any) {
console.error(red(`✖ Failed to ping peer: ${this.ipAddress}`));
console.error(red(`✖ Failed to ping peer: ${this.ipAddress}`), error.message);
console.error(red(error.message));
throw error;
}
Expand Down
99 changes: 43 additions & 56 deletions src/blockchain/FullNodePeer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// FullNodePeer.ts

import path from "path";
import os from "os";
import fs from "fs";
Expand All @@ -12,7 +10,6 @@ import { Environment } from "../utils/Environment";
import NodeCache from "node-cache";
import Bottleneck from "bottleneck";

// Constants
const FULLNODE_PORT = 8444;
const LOCALHOST = "127.0.0.1";
const CHIA_NODES_HOST = "chia-nodes";
Expand All @@ -24,9 +21,8 @@ const DNS_HOSTS = [
];
const CONNECTION_TIMEOUT = 2000; // in milliseconds
const CACHE_DURATION = 30000; // in milliseconds
const COOLDOWN_DURATION = 60000; // in milliseconds
const COOLDOWN_DURATION = 600000; // 10 minutes in milliseconds
const MAX_PEERS_TO_FETCH = 5; // Maximum number of peers to fetch from DNS
const MAX_RETRIES = 3; // Maximum number of retry attempts
const MAX_REQUESTS_PER_MINUTE = 100; // Per-peer rate limit

/**
Expand All @@ -45,12 +41,12 @@ export class FullNodePeer {
// Singleton instance
private static instance: FullNodePeer | null = null;

// Cached peer with timestamp
private static cachedPeer: { peer: Peer; timestamp: number } | null = null;

// Cooldown cache to exclude faulty peers temporarily
private static cooldownCache = new NodeCache({ stdTTL: COOLDOWN_DURATION / 1000 });

// Failed DNS hosts cooldown cache
private static failedDNSCache = new NodeCache({ stdTTL: COOLDOWN_DURATION / 1000 });

// Peer reliability weights
private static peerWeights: Map<string, number> = new Map();

Expand All @@ -66,6 +62,9 @@ export class FullNodePeer {
// Map to store rate limiters per peer IP
private static peerLimiters: Map<string, Bottleneck> = new Map();

// Round-robin index
private static roundRobinIndex: number = 0;

// Private constructor for singleton pattern
private constructor(private peer: Peer) {}

Expand All @@ -91,7 +90,7 @@ export class FullNodePeer {
this.peer = bestPeer;
FullNodePeer.instance = this; // Assign the initialized instance
} catch (error: any) {
console.error(`Initialization failed: ${error.message}`);
console.error(`Fullnode Initialization failed: ${error.message}`);
throw error;
}
}
Expand All @@ -103,6 +102,8 @@ export class FullNodePeer {
*/
public static async connect(): Promise<Peer> {
const instance = FullNodePeer.getInstance();
// Remove cached peer to ensure a new connection each time
FullNodePeer.cachedPeer = null;

Check failure on line 106 in src/blockchain/FullNodePeer.ts

View workflow job for this annotation

GitHub Actions / Build and Publish to npm

Property 'cachedPeer' does not exist on type 'typeof FullNodePeer'.
await instance.initialize();
return instance.peer;
}
Expand Down Expand Up @@ -216,6 +217,12 @@ export class FullNodePeer {
// Fetch peers from DNS introducers
const fetchedPeers: string[] = [];
for (const DNS_HOST of DNS_HOSTS) {
// Check if DNS_HOST is in failedDNSCache
if (FullNodePeer.failedDNSCache.has(DNS_HOST)) {
console.warn(`Skipping DNS host ${DNS_HOST} due to recent failure.`);
continue;
}

try {
const ips = await resolve4(DNS_HOST);
if (ips && ips.length > 0) {
Expand All @@ -238,6 +245,8 @@ export class FullNodePeer {
}
} catch (error: any) {
console.error(`Failed to resolve IPs from ${DNS_HOST}: ${error.message}`);
// Add DNS_HOST to failedDNSCache for cooldown
FullNodePeer.failedDNSCache.set(DNS_HOST, true);
}
}

Expand Down Expand Up @@ -286,32 +295,22 @@ export class FullNodePeer {
}

/**
* Selects a peer based on weighted random selection.
* Prioritized peers have higher weights.
* Selects the next peer based on round-robin selection.
* @returns {string} The selected peer IP.
*/
private static selectPeerByWeight(): string {
const peers = Array.from(FullNodePeer.peerWeights.entries())
.filter(([ip, _]) => !FullNodePeer.cooldownCache.has(ip))
.map(([ip, weight]) => ({ ip, weight }));

const totalWeight = peers.reduce((sum, peer) => sum + peer.weight, 0);
if (totalWeight === 0) {
throw new Error("All peers are in cooldown.");
}
private static selectPeerRoundRobin(): string {
const peers = Array.from(FullNodePeer.peerWeights.keys()).filter(
(ip) => !FullNodePeer.cooldownCache.has(ip)
);

const random = Math.random() * totalWeight;
let cumulative = 0;

for (const peer of peers) {
cumulative += peer.weight;
if (random < cumulative) {
return peer.ip;
}
if (peers.length === 0) {
throw new Error("All fullnode peers are in cooldown.");
}

// Fallback
return peers[peers.length - 1].ip;
// Round-robin selection
const selectedPeer = peers[FullNodePeer.roundRobinIndex % peers.length];
FullNodePeer.roundRobinIndex += 1;
return selectedPeer;
}

/**
Expand All @@ -332,32 +331,26 @@ export class FullNodePeer {
}

/**
* Connects to the best available peer based on weighted selection and reliability.
* Connects to the best available peer based on round-robin selection and reliability.
* @returns {Promise<Peer>} The connected Peer instance.
*/
private async getBestPeer(): Promise<Peer> {
const now = Date.now();

// Refresh cachedPeer if expired
if (
FullNodePeer.cachedPeer &&
now - FullNodePeer.cachedPeer.timestamp < CACHE_DURATION
) {
return FullNodePeer.cachedPeer.peer;
}
// Removed cachedPeer logic to ensure a new connection each time

// Fetch peer IPs
const peerIPs = await FullNodePeer.getPeerIPs();

// Setup peer weights with prioritization
FullNodePeer.setupPeers(peerIPs);

// Weighted random selection
// Round-robin selection
let selectedPeerIP: string;
try {
selectedPeerIP = FullNodePeer.selectPeerByWeight();
selectedPeerIP = FullNodePeer.selectPeerRoundRobin();
} catch (error: any) {
throw new Error(`Failed to select a peer: ${error.message}`);
throw new Error(`Failed to select a fullnode peer: ${error.message}`);
}

// Attempt to create a peer connection
Expand All @@ -376,7 +369,7 @@ export class FullNodePeer {
peer = await Peer.new(`${selectedPeerIP}:${FULLNODE_PORT}`, false, tls);
} catch (error: any) {
console.error(
`Failed to create peer for IP ${selectedPeerIP}: ${error.message}`
`Failed to create fullnode peer for IP ${selectedPeerIP}: ${error.message}`
);
// Add to cooldown
FullNodePeer.cooldownCache.set(selectedPeerIP, true);
Expand All @@ -387,7 +380,7 @@ export class FullNodePeer {
} else {
FullNodePeer.peerWeights.delete(selectedPeerIP);
}
throw new Error(`Unable to connect to peer ${selectedPeerIP}`);
throw new Error(`Unable to connect to fullnode peer ${selectedPeerIP}`);
}

// Create a Bottleneck limiter for this peer
Expand All @@ -407,9 +400,6 @@ export class FullNodePeer {
FullNodePeer.peerLimiters.set(selectedPeerIP, limiter);
const proxiedPeer = this.createPeerProxy(peer, selectedPeerIP);

// Cache the peer
FullNodePeer.cachedPeer = { peer: peer, timestamp: now };

console.log(`Using Fullnode Peer: ${selectedPeerIP}`);

return proxiedPeer;
Expand All @@ -427,9 +417,8 @@ export class FullNodePeer {
// Start the timeout to forget the peer after 1 minute
const timeoutPromise = new Promise<null>((_, reject) => {
timeoutId = setTimeout(() => {
FullNodePeer.cachedPeer = null;
reject(
new Error("Operation timed out. Reconnecting to a new peer.")
new Error("Operation timed out. Reconnecting to a new fullnode peer.")
);
}, 60000); // 1 minute
});
Expand All @@ -453,9 +442,7 @@ export class FullNodePeer {
error.message.includes("WebSocket") ||
error.message.includes("Operation timed out")
) {
FullNodePeer.cachedPeer = null;

this.handlePeerDisconnection(peerIP);
FullNodePeer.handlePeerDisconnection(peerIP);
const newPeer = await this.getBestPeer();
return (newPeer as any)[prop](...args);
}
Expand All @@ -472,7 +459,7 @@ export class FullNodePeer {
* Handles peer disconnection by marking it in cooldown and updating internal states.
* @param {string} peerIP - The IP address of the disconnected peer.
*/
public handlePeerDisconnection(peerIP: string): void {
public static handlePeerDisconnection(peerIP: string): void {
// Add the faulty peer to the cooldown cache
FullNodePeer.cooldownCache.set(peerIP, true);

Expand All @@ -490,7 +477,7 @@ export class FullNodePeer {
// Remove the limiter
FullNodePeer.peerLimiters.delete(peerIP);

console.warn(`Peer ${peerIP} has been marked as disconnected and is in cooldown.`);
console.warn(`Fullnode Peer ${peerIP} has been marked as disconnected and is in cooldown.`);
}

/**
Expand Down Expand Up @@ -521,21 +508,21 @@ export class FullNodePeer {
try {
peer = await FullNodePeer.connect();
} catch (error: any) {
spinner.error({ text: "Failed to connect to a peer." });
spinner.error({ text: "Failed to connect to a fullnode peer." });
console.error(`waitForConfirmation connection error: ${error.message}`);
throw error;
}

// Extract peer IP to access the corresponding limiter
const peerIP = FullNodePeer.extractPeerIP(peer);
if (!peerIP) {
spinner.error({ text: "Failed to extract peer IP." });
spinner.error({ text: "Failed to extract fullnode peer IP." });
throw new Error("Failed to extract peer IP.");
}

const limiter = FullNodePeer.peerLimiters.get(peerIP);
if (!limiter) {
spinner.error({ text: "No rate limiter found for the peer." });
spinner.error({ text: "No rate limiter found for the fullnode peer." });
throw new Error("No rate limiter found for the peer.");
}

Expand Down

0 comments on commit 8123784

Please sign in to comment.