Skip to content

Commit

Permalink
feat(ArweaveCompositeClient): Search Arweave network for chunks
Browse files Browse the repository at this point in the history
  * add method 'peerGetChunk'
  * attempt to fetch chunk from a random peer before trustedPeers
  * new prometheus metrics 'request_chunk_total' and 'get_chunk_total'
  * add env-var 'ARWEAVE_NODE_IGNORE_URLS' to remove specific peers from
    the arweave node peer list
  • Loading branch information
hlolli committed Jan 29, 2025
1 parent feee6e0 commit da6c2d4
Show file tree
Hide file tree
Showing 3 changed files with 264 additions and 50 deletions.
295 changes: 245 additions & 50 deletions src/arweave/composite-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import CircuitBreaker from 'opossum';

import { FailureSimulator } from '../lib/chaos.js';
import { fromB64Url } from '../lib/encoding.js';
import {
WeightedElement,
randomWeightedChoices,
} from '../lib/random-weighted-choices.js';
import {
sanityCheckBlock,
sanityCheckChunk,
Expand Down Expand Up @@ -112,6 +116,7 @@ export class ArweaveCompositeClient
// Peers
private peers: Record<string, Peer> = {};
private preferredPeers: Set<Peer> = new Set();
private weightedChunkPeers: WeightedElement<string>[] = [];

// Block and TX promise caches used for prefetching
private blockByHeightPromiseCache = new NodeCache({
Expand Down Expand Up @@ -256,6 +261,9 @@ export class ArweaveCompositeClient
}

async refreshPeers(): Promise<void> {
const log = this.log.child({ method: 'refreshPeers' });
log.debug('Refreshing peers...');

try {
const response = await this.trustedNodeRequest({
method: 'GET',
Expand All @@ -264,33 +272,59 @@ export class ArweaveCompositeClient
const peerHosts = response.data as string[];
await Promise.all(
peerHosts.map(async (peerHost) => {
try {
const peerUrl = `http://${peerHost}`;
const response = await axios({
method: 'GET',
url: '/info',
baseURL: peerUrl,
});
this.peers[peerHost] = {
url: peerUrl,
blocks: response.data.blocks,
height: response.data.height,
lastSeen: new Date().getTime(),
};
if (response.data.blocks / response.data.height > 0.9) {
this.preferredPeers.add(this.peers[peerHost]);
if (!config.ARWEAVE_NODE_IGNORE_URLS.includes(peerHost)) {
try {
const peerUrl = `http://${peerHost}`;
const response = await axios({
method: 'GET',
url: '/info',
baseURL: peerUrl,
});
this.peers[peerHost] = {
url: peerUrl,
blocks: response.data.blocks,
height: response.data.height,
lastSeen: new Date().getTime(),
};
if (response.data.blocks / response.data.height > 0.9) {
this.preferredPeers.add(this.peers[peerHost]);
}
} catch (error) {
metrics.arweavePeerInfoErrorCounter.inc();
}
} catch (error) {
metrics.arweavePeerInfoErrorCounter.inc();
} else {
this.log.debug('Ignoring peer:', { peerHost });
}
return;
}),
);
this.weightedChunkPeers = Object.values(this.peers).map((peerObject) => {
const previousWeight =
this.weightedChunkPeers.find((peer) => peer.id === peerObject.url)
?.weight ?? undefined;
return {
id: peerObject.url,
weight: previousWeight === undefined ? 50 : previousWeight,
};
});
} catch (error) {
metrics.arweavePeerRefreshErrorCounter.inc();
}
}

selectChunkPeers(peerCount: number): string[] {
const log = this.log.child({ method: 'selectChunkPeers' });

if (this.weightedChunkPeers.length === 0) {
log.warn('No weighted chunk peers available');
throw new Error('No weighted chunk peers available');
}

return randomWeightedChoices<string>({
table: this.weightedChunkPeers,
count: peerCount,
});
}

async trustedNodeRequest(request: AxiosRequestConfig) {
while (this.trustedNodeRequestBucket <= 0) {
await wait(100);
Expand Down Expand Up @@ -619,6 +653,124 @@ export class ArweaveCompositeClient
return response.data;
}

handlePeerSuccess(
peer: string,
method: string,
sourceType: 'trusted' | 'peer',
): void {
metrics.requestChunkTotal.inc({
status: 'success',
method,
class: this.constructor.name,
source: peer,
source_type: sourceType,
});
if (sourceType === 'peer') {
// warm the succeeding peer
this.weightedChunkPeers.forEach((weightedChunkPeer) => {
if (weightedChunkPeer.id === peer) {
weightedChunkPeer.weight = Math.min(
weightedChunkPeer.weight + config.WEIGHTED_PEERS_TEMPERATURE_DELTA,
100,
);
}
});
}
}

handlePeerFailure(
peer: string,
method: string,
sourceType: 'trusted' | 'peer',
): void {
metrics.requestChunkTotal.inc({
status: 'error',
method,
class: this.constructor.name,
source: peer,
source_type: sourceType,
});
if (sourceType === 'peer') {
// cool the failing peer
this.weightedChunkPeers.forEach((weightedChunkPeer) => {
if (weightedChunkPeer.id === peer) {
weightedChunkPeer.weight = Math.max(
weightedChunkPeer.weight - config.WEIGHTED_PEERS_TEMPERATURE_DELTA,
1,
);
}
});
}
}

async peerGetChunk({
absoluteOffset,
retryCount,
txSize,
dataRoot,
relativeOffset,
}: {
txSize: number;
absoluteOffset: number;
dataRoot: string;
relativeOffset: number;
retryCount: number;
}): Promise<Chunk> {
const randomChunkPeers = this.selectChunkPeers(retryCount);
for (const randomChunkPeer of randomChunkPeers) {
try {
const response = await axios({
method: 'GET',
url: `/chunk/${absoluteOffset}`,
baseURL: randomChunkPeer,
timeout: 500,
});
const jsonChunk = response.data;

// Fast fail if chunk has the wrong structure
sanityCheckChunk(jsonChunk);

this.handlePeerSuccess(randomChunkPeer, 'peerGetChunk', 'peer');

const txPath = fromB64Url(jsonChunk.tx_path);
const dataRootBuffer = txPath.slice(-64, -32);
const dataPath = fromB64Url(jsonChunk.data_path);
const hash = dataPath.slice(-64, -32);

const chunk = {
tx_path: txPath,
data_root: dataRootBuffer,
data_size: txSize,
data_path: dataPath,
offset: relativeOffset,
hash,
chunk: fromB64Url(jsonChunk.chunk),
};

await validateChunk(
txSize,
chunk,
fromB64Url(dataRoot),
relativeOffset,
);

this.chunkCache.set(
{ absoluteOffset },
{
cachedAt: Date.now(),
chunk,
},
);

return chunk;
} catch {
this.handlePeerFailure(randomChunkPeer, 'peerGetChunk', 'peer');
}
}

throw new Error('Failed to fetch chunk from any peer');
}

async getChunkByAny(
txSize: number,
absoluteOffset: number,
Expand All @@ -632,44 +784,87 @@ export class ArweaveCompositeClient
cacheEntry &&
cacheEntry.cachedAt > Date.now() - CHUNK_CACHE_TTL_SECONDS * 1000
) {
metrics.getChunkTotal.inc({
status: 'success',
method: 'getChunkByAny',
class: this.constructor.name,
});
return cacheEntry.chunk;
}

const response = await this.trustedNodeRequestQueue.push({
method: 'GET',
url: `/chunk/${absoluteOffset}`,
});
const jsonChunk = response.data;

// Fast fail if chunk has the wrong structure
sanityCheckChunk(jsonChunk);

const txPath = fromB64Url(jsonChunk.tx_path);
const dataRootBuffer = txPath.slice(-64, -32);
const dataPath = fromB64Url(jsonChunk.data_path);
const hash = dataPath.slice(-64, -32);

const chunk = {
tx_path: txPath,
data_root: dataRootBuffer,
data_size: txSize,
data_path: dataPath,
offset: relativeOffset,
hash,
chunk: fromB64Url(jsonChunk.chunk),
};
try {
const result = this.peerGetChunk({
absoluteOffset,
retryCount: 3,
txSize,
dataRoot,
relativeOffset,
});
metrics.getChunkTotal.inc({
status: 'success',
method: 'getChunkByAny',
class: this.constructor.name,
});
return result;
} catch {
this.log.debug(
'Failed to fetch chunk from peers, moving forward to trusted nodes',
);
}

try {
const response = await this.trustedNodeRequestQueue.push({
method: 'GET',
url: `/chunk/${absoluteOffset}`,
});
const jsonChunk = response.data;

// Fast fail if chunk has the wrong structure
sanityCheckChunk(jsonChunk);

this.handlePeerSuccess(this.trustedNodeUrl, 'getChunkByAny', 'trusted');

const txPath = fromB64Url(jsonChunk.tx_path);
const dataRootBuffer = txPath.slice(-64, -32);
const dataPath = fromB64Url(jsonChunk.data_path);
const hash = dataPath.slice(-64, -32);

const chunk = {
tx_path: txPath,
data_root: dataRootBuffer,
data_size: txSize,
data_path: dataPath,
offset: relativeOffset,
hash,
chunk: fromB64Url(jsonChunk.chunk),
};

await validateChunk(txSize, chunk, fromB64Url(dataRoot), relativeOffset);
await validateChunk(txSize, chunk, fromB64Url(dataRoot), relativeOffset);

this.chunkCache.set(
{ absoluteOffset },
{
cachedAt: Date.now(),
chunk,
},
);
this.chunkCache.set(
{ absoluteOffset },
{
cachedAt: Date.now(),
chunk,
},
);

return chunk;
metrics.getChunkTotal.inc({
status: 'success',
method: 'getChunkByAny',
class: this.constructor.name,
});

return chunk;
} catch (error) {
this.handlePeerFailure(this.trustedNodeUrl, 'getChunkByAny', 'trusted');
metrics.getChunkTotal.inc({
status: 'error',
method: 'getChunkByAny',
class: this.constructor.name,
});
throw error;
}
}

async getChunkDataByAny(
Expand Down
3 changes: 3 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ export const WEIGHTED_PEERS_TEMPERATURE_DELTA = +env.varOrDefault(
'0.1',
);

export const ARWEAVE_NODE_IGNORE_URLS: string[] =
env.varOrUndefined('ARWEAVE_NODE_IGNORE_URLS')?.split(',') ?? [];

// Trusted chunk POST URLs (for posting chunks received at /chunk)
export const CHUNK_POST_URLS = env
.varOrDefault('CHUNK_POST_URLS', `${TRUSTED_NODE_URL}/chunk`)
Expand Down
16 changes: 16 additions & 0 deletions src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,22 @@ export const getDataStreamSuccessesTotal = new promClient.Counter({
labelNames: ['class', 'source'] as const,
});

//
// Chunk source metrics
//

export const requestChunkTotal = new promClient.Counter({
name: 'request_chunk_total',
help: 'Count of each individual chunk http request, status can be "error" or "success", source_type can be "trusted" or "peer".',
labelNames: ['status', 'class', 'method', 'source', 'source_type'] as const,
});

export const getChunkTotal = new promClient.Counter({
name: 'get_chunk_total',
help: 'Higher level count of chunk discovery, counts when the caller request for chunk ends, stores the status of the request',
labelNames: ['status', 'class', 'method'] as const,
});

//
// Queue length metrics
//
Expand Down

0 comments on commit da6c2d4

Please sign in to comment.