Skip to content

Commit

Permalink
Merge pull request #132 from DIG-Network/release/v0.0.1-alpha.147
Browse files Browse the repository at this point in the history
Release/v0.0.1 alpha.147
  • Loading branch information
MichaelTaylor3D authored Oct 6, 2024
2 parents 901a922 + b21bae1 commit d2cfc04
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 81 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.

### [0.0.1-alpha.147](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.146...v0.0.1-alpha.147) (2024-10-06)

### [0.0.1-alpha.146](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.145...v0.0.1-alpha.146) (2024-10-06)

### [0.0.1-alpha.145](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.144...v0.0.1-alpha.145) (2024-10-06)

### [0.0.1-alpha.144](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.143...v0.0.1-alpha.144) (2024-10-06)
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@dignetwork/dig-sdk",
"version": "0.0.1-alpha.145",
"version": "0.0.1-alpha.147",
"description": "",
"type": "commonjs",
"main": "./dist/index.js",
Expand Down
109 changes: 49 additions & 60 deletions src/utils/PeerRanker.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// src/PeerRanker.ts

import axios, { AxiosRequestConfig } from 'axios';
import fs from 'fs';
import https from 'https';
import { getOrCreateSSLCerts } from './ssl';
import { asyncPool } from './promiseUtils';

/**
* Interface representing the metrics of a peer.
Expand All @@ -14,23 +13,13 @@ export interface PeerMetrics {
bandwidth: number; // in bytes per second (upload speed)
}

/**
* Configuration options for the PeerRanker.
*/
interface PeerRankerOptions {
pingPath?: string; // Optional: Path for latency ping (e.g., '/ping')
timeout?: number; // Timeout for requests in milliseconds
uploadTestSize?: number; // Size of the data to upload in bytes
}

/**
* Utility class to rank peers based on latency and upload bandwidth using HTTPS with mTLS.
*/
export class PeerRanker {
private ipAddresses: string[];
private static certPath: string;
private static keyPath: string;
private pingPath: string;
private timeout: number;
private uploadTestSize: number;

Expand All @@ -41,14 +30,13 @@ export class PeerRanker {
/**
* Constructs a PeerRanker instance.
* @param ipAddresses - Array of IP addresses to rank.
* @param options - Configuration options including paths to client certificates.
*/
constructor(ipAddresses: string[], options: PeerRankerOptions) {
constructor(ipAddresses: string[], timeout: number = 5000, uploadTestSize: number = 1024 * 1024) {
this.ipAddresses = ipAddresses;
this.pingPath = options.pingPath || '/'; // Default to root path if not provided
this.timeout = options.timeout || 5000; // Default timeout: 5 seconds
this.uploadTestSize = options.uploadTestSize || 1024 * 1024; // Default: 1MB
this.timeout = timeout; // Allow customizable timeout
this.uploadTestSize = uploadTestSize; // Default upload size: 1MB

// Fetch the SSL certificates used for mTLS.
const { certPath, keyPath } = getOrCreateSSLCerts();
PeerRanker.certPath = certPath;
PeerRanker.keyPath = keyPath;
Expand All @@ -58,41 +46,38 @@ export class PeerRanker {
* Measures the latency of a given IP address using an HTTPS request.
* Tries HEAD first, then falls back to GET if HEAD is not supported.
* @param ip - The IP address of the peer.
* @returns Promise resolving to the latency in milliseconds.
* @returns Promise resolving to the latency in milliseconds or rejecting if the peer fails.
*/
private async measureLatency(ip: string): Promise<number> {
const path = this.pingPath;
const url = `https://${ip}${path}`;
const url = `https://${ip}:4159/diagnostics/ping`;

// Configuration for HEAD request
const configHead: AxiosRequestConfig = {
url: url,
method: 'HEAD',
httpsAgent: new https.Agent({
cert: fs.readFileSync(PeerRanker.certPath),
key: fs.readFileSync(PeerRanker.keyPath),
rejectUnauthorized: false, // Set to true in production
rejectUnauthorized: false,
}),
timeout: this.timeout,
validateStatus: (status) => status < 500, // Resolve only if status is less than 500
validateStatus: (status) => status < 500,
};

const startTime = Date.now();
try {
const response = await axios(configHead);
if (response.status === 405) { // Method Not Allowed
// Fallback to GET with Range header to minimize data transfer
if (response.status === 405) {
const configGet: AxiosRequestConfig = {
url: url,
method: 'GET',
httpsAgent: new https.Agent({
cert: fs.readFileSync(PeerRanker.certPath),
key: fs.readFileSync(PeerRanker.keyPath),
rejectUnauthorized: false, // Set to true in production
rejectUnauthorized: false,
}),
timeout: this.timeout,
headers: {
'Range': 'bytes=0-0', // Request only the first byte
'Range': 'bytes=0-0',
},
validateStatus: (status) => status < 500,
};
Expand All @@ -102,20 +87,18 @@ export class PeerRanker {
return latency;
} catch (error: any) {
console.error(`Latency measurement failed for IP ${ip}:`, error.message);
return Infinity; // Indicate unreachable or unresponsive peer
throw new Error(`Latency measurement failed for IP ${ip}`);
}
}

/**
* Measures the upload bandwidth of a given IP address by sending random data.
* @param ip - The IP address of the peer.
* @returns Promise resolving to the upload bandwidth in bytes per second.
* @returns Promise resolving to the upload bandwidth in bytes per second or rejecting if the peer fails.
*/
private async measureBandwidth(ip: string): Promise<number> {
const url = `https://${ip}/upload`; // Assume /upload as the endpoint for upload testing

// Generate random data
const randomData = Buffer.alloc(this.uploadTestSize, 'a'); // 1MB of 'a's
const url = `https://${ip}:4159/diagnostics/bandwidth`;
const randomData = Buffer.alloc(this.uploadTestSize, 'a');

const config: AxiosRequestConfig = {
url: url,
Expand All @@ -128,44 +111,52 @@ export class PeerRanker {
httpsAgent: new https.Agent({
cert: fs.readFileSync(PeerRanker.certPath),
key: fs.readFileSync(PeerRanker.keyPath),
rejectUnauthorized: false, // Set to true in production
rejectUnauthorized: false,
}),
timeout: this.timeout,
maxContentLength: Infinity,
maxBodyLength: Infinity,
};

return new Promise<number>((resolve) => {
const startTime = Date.now();

axios(config)
.then(() => {
const timeElapsed = (Date.now() - startTime) / 1000; // seconds
const bandwidth = this.uploadTestSize / timeElapsed; // bytes per second
resolve(bandwidth);
})
.catch((error: any) => {
console.error(`Bandwidth measurement failed for IP ${ip}:`, error.message);
resolve(0); // Indicate failure in measuring bandwidth
});
});
const startTime = Date.now();

try {
await axios(config);
const timeElapsed = (Date.now() - startTime) / 1000;
const bandwidth = this.uploadTestSize / timeElapsed;
return bandwidth;
} catch (error: any) {
console.error(`Bandwidth measurement failed for IP ${ip}:`, error.message);
throw new Error(`Bandwidth measurement failed for IP ${ip}`);
}
}

/**
* Ranks the peers based on measured latency and upload bandwidth.
* Unresponsive peers are excluded from the final ranking.
* @param cooldown - Cooldown time in milliseconds between batches.
* @returns Promise resolving to an array of PeerMetrics sorted by latency and bandwidth.
*/
public async rankPeers(): Promise<PeerMetrics[]> {
const metricsPromises = this.ipAddresses.map(async (ip) => {
const [latency, bandwidth] = await Promise.all([
this.measureLatency(ip),
this.measureBandwidth(ip),
]);

return { ip, latency, bandwidth };
});
public async rankPeers(cooldown: number = 500): Promise<PeerMetrics[]> {
const limit = 5; // Limit to 5 parallel requests at a time

const iteratorFn = async (ip: string): Promise<PeerMetrics | null> => {
try {
const [latency, bandwidth] = await Promise.all([
this.measureLatency(ip),
this.measureBandwidth(ip),
]);
return { ip, latency, bandwidth };
} catch (error) {
// Peer failed, skip it by returning null
return null;
}
};

const peerMetrics: PeerMetrics[] = await Promise.all(metricsPromises);
// Process all peers with a concurrency limit and cooldown between batches
const peerMetrics: PeerMetrics[] = (
await asyncPool(limit, this.ipAddresses, iteratorFn, cooldown)
).filter((metrics: any): metrics is PeerMetrics => metrics !== null); // Use a type guard

// Sort by lowest latency first, then by highest bandwidth
peerMetrics.sort((a, b) => {
Expand All @@ -175,9 +166,7 @@ export class PeerRanker {
return a.latency - b.latency; // Lower latency is better
});

// Update the internal sorted list
this.sortedPeers = peerMetrics;
// Reset the iterator index
this.currentIndex = 0;

return peerMetrics;
Expand Down
40 changes: 22 additions & 18 deletions src/utils/promiseUtils.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,37 @@

/**
* Processes items in sequential batches with a concurrency limit.
* Adds a cooldown between batches.
* @param {number} limit - The maximum number of concurrent executions per batch.
* @param {Array<T>} items - The array of items to process.
* @param {(item: T) => Promise<R>} iteratorFn - The async function to apply to each item.
* @param {number} cooldownMs - The cooldown duration between batches in milliseconds.
* @returns {Promise<Array<R>>} - A promise that resolves when all items have been processed.
*/
export async function asyncPool<T, R>(
limit: number,
items: T[],
iteratorFn: (item: T) => Promise<R>
): Promise<R[]> {
const ret: R[] = [];

for (let i = 0; i < items.length; i += limit) {
const batchItems = items.slice(i, i + limit);
const batchPromises = batchItems.map((item) => iteratorFn(item));

// Wait for the current batch to complete before starting the next one
const batchResults = await Promise.all(batchPromises);
ret.push(...batchResults);

// Optional: add a cooldown between batches
// await new Promise((resolve) => setTimeout(resolve, 500));
limit: number,
items: T[],
iteratorFn: (item: T) => Promise<R>,
cooldownMs: number = 500 // Default cooldown of 500ms
): Promise<R[]> {
const ret: R[] = [];

for (let i = 0; i < items.length; i += limit) {
const batchItems = items.slice(i, i + limit);
const batchPromises = batchItems.map((item) => iteratorFn(item));

// Wait for the current batch to complete before starting the next one
const batchResults = await Promise.all(batchPromises);
ret.push(...batchResults);

// Add a cooldown between batches, except after the last batch
if (i + limit < items.length) {
await new Promise((resolve) => setTimeout(resolve, cooldownMs));
}

return ret;
}

return ret;
}
/**
* Helper function to add a timeout to a promise.
* @param promise The original promise.
Expand Down

0 comments on commit d2cfc04

Please sign in to comment.