feat: queue chunk downloads when data roots don't match
  * Rename BundleDataImporter to DataImporter
  * Update log messages to refer to “contiguous data” instead of “bundles”
  * Only queue for unbundling after download if an ans104 unbundler is passed (see the sketch after the changed-files summary below)
  * Set ENABLE_BACKGROUND_DATA_VERIFICATION to true by default
hlolli committed Jan 24, 2025
1 parent 0b02e18 commit df9391b
Showing 9 changed files with 87 additions and 51 deletions.
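As a quick orientation before the per-file diffs, here is a minimal sketch of what the now-optional unbundler means for callers of DataImporter. It assumes the wiring that src/system.ts sets up; the declare statements stand in for that context, and the download-only variant is illustrative rather than part of this commit:

import winston from 'winston';
import * as config from './config.js';
import { ContiguousDataSource } from './types.js';
import { DataImporter } from './workers/data-importer.js';

// Assumed to exist in the calling context (wired up in src/system.ts).
declare const contiguousDataSource: ContiguousDataSource;
declare const ans104Unbundler: any;

const log = winston.createLogger({
  transports: [new winston.transports.Console()],
});

// With an unbundler: downloaded bundles are queued for ANS-104 unbundling
// once their data stream ends.
const importer = new DataImporter({
  log,
  contiguousDataSource,
  ans104Unbundler,
  workerCount: config.ANS104_DOWNLOAD_WORKERS,
  maxQueueSize: config.DATA_IMPORTER_QUEUE_SIZE,
});

// Without one: items are only downloaded (e.g. chunk data whose data roots
// did not match), and unbundling is skipped (see the download() diff below).
const downloadOnly = new DataImporter({
  log,
  contiguousDataSource,
  workerCount: config.ANS104_DOWNLOAD_WORKERS,
});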
4 changes: 2 additions & 2 deletions docker-compose.yaml
@@ -112,10 +112,10 @@ services:
       - MAX_DATA_ITEM_QUEUE_SIZE=${MAX_DATA_ITEM_QUEUE_SIZE:-}
       - TAG_SELECTIVITY=${TAG_SELECTIVITY:-}
       - MAX_EXPECTED_DATA_ITEM_INDEXING_INTERVAL_SECONDS=${MAX_EXPECTED_DATA_ITEM_INDEXING_INTERVAL_SECONDS:-}
-      - ENABLE_BACKGROUND_DATA_VERIFICATION=${ENABLE_BACKGROUND_DATA_VERIFICATION:-}
+      - ENABLE_BACKGROUND_DATA_VERIFICATION=${ENABLE_BACKGROUND_DATA_VERIFICATION:-true}
       - BACKGROUND_DATA_VERIFICATION_INTERVAL_SECONDS=${BACKGROUND_DATA_VERIFICATION_INTERVAL_SECONDS:-}
       - CLICKHOUSE_URL=${CLICKHOUSE_URL:-}
-      - BUNDLE_DATA_IMPORTER_QUEUE_SIZE=${BUNDLE_DATA_IMPORTER_QUEUE_SIZE:-}
+      - DATA_IMPORTER_QUEUE_SIZE=${DATA_IMPORTER_QUEUE_SIZE:-}
       - FS_CLEANUP_WORKER_BATCH_SIZE=${FS_CLEANUP_WORKER_BATCH_SIZE:-}
       - FS_CLEANUP_WORKER_BATCH_PAUSE_DURATION=${FS_CLEANUP_WORKER_BATCH_PAUSE_DURATION:-}
       - FS_CLEANUP_WORKER_RESTART_PAUSE_DURATION=${FS_CLEANUP_WORKER_RESTART_PAUSE_DURATION:-}
2 changes: 1 addition & 1 deletion docs/envs.md
@@ -32,7 +32,7 @@ This document describes the environment variables that can be used to configure
 | WRITE_ANS104_DATA_ITEM_DB_SIGNATURES | Boolean | false | If true, the data item signatures will be written to the database. |
 | WRITE_TRANSACTION_DB_SIGNATURES | Boolean | false | If true, the transactions signatures will be written to the database. |
 | ENABLE_DATA_DB_WAL_CLEANUP | Boolean | false | If true, the data database WAL cleanup worker will be enabled |
-| ENABLE_BACKGROUND_DATA_VERIFICATION | Boolean | false | If true, unverified data will be verified in background |
+| ENABLE_BACKGROUND_DATA_VERIFICATION | Boolean | true | If true, unverified data will be verified in background |
 | MAX_DATA_ITEM_QUEUE_SIZE | Number | 100000 | Sets the maximum number of data items to queue for indexing before skipping indexing new data items |
 | ARNS_ROOT_HOST | String | undefined | Domain name for ArNS host |
 | SANDBOX_PROTOCOL | String | undefined | Protocol setting in process of creating sandbox domain in ArNS (ARNS_ROOT_HOST needs to be set for this env to have any effect) |
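Note: this flips background data verification on by default. Operators who want the previous behavior can set ENABLE_BACKGROUND_DATA_VERIFICATION=false explicitly in their environment or .env file.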
4 changes: 2 additions & 2 deletions monitoring/grafana/dashboards/examples/queues-example.json
@@ -856,13 +856,13 @@
         "uid": "fDMLtag4k"
       },
       "editorMode": "builder",
-      "expr": "queue_length{job=\"ar_io_nodes\", queue_name=\"bundleDataImporter\"}",
+      "expr": "queue_length{job=\"ar_io_nodes\", queue_name=\"dataImporter\"}",
       "legendFormat": "{{instance}}",
       "range": true,
       "refId": "A"
     }
   ],
-  "title": "bundleDataImporter",
+  "title": "dataImporter",
   "type": "timeseries"
 },
 {
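Any external dashboards or alerts that query queue_length{queue_name="bundleDataImporter"} need the same rename, since the gauge is registered as dataImporter after this commit (see src/system.ts below).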
6 changes: 3 additions & 3 deletions src/config.ts
@@ -239,9 +239,9 @@ export const MAX_DATA_ITEM_QUEUE_SIZE = +env.varOrDefault(
   '100000',
 );
 
-// The maximum number of bundles to queue for unbundling before skipping
-export const BUNDLE_DATA_IMPORTER_QUEUE_SIZE = +env.varOrDefault(
-  'BUNDLE_DATA_IMPORTER_QUEUE_SIZE',
+// The maximum number of data imports to queue for unbundling/importing before skipping
+export const DATA_IMPORTER_QUEUE_SIZE = +env.varOrDefault(
+  'DATA_IMPORTER_QUEUE_SIZE',
   '1000',
 );
 
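Since BUNDLE_DATA_IMPORTER_QUEUE_SIZE is removed here rather than kept as an alias, existing overrides must be renamed to DATA_IMPORTER_QUEUE_SIZE to keep taking effect.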
2 changes: 1 addition & 1 deletion src/routes/ar-io.ts
@@ -210,7 +210,7 @@ arIoRouter.post(
       return;
     }
 
-    if (await system.bundleDataImporter.isQueueFull()) {
+    if (await system.dataImporter.isQueueFull()) {
       res.status(429).send('Bundle importer queue is full');
       return;
     }
14 changes: 7 additions & 7 deletions src/system.ts
@@ -29,7 +29,7 @@ import { ReadThroughChunkDataCache } from './data/read-through-chunk-data-cache.js';
 import { ReadThroughDataCache } from './data/read-through-data-cache.js';
 import { SequentialDataSource } from './data/sequential-data-source.js';
 import { TxChunksDataSource } from './data/tx-chunks-data-source.js';
-import { BundleDataImporter } from './workers/bundle-data-importer.js';
+import { DataImporter } from './workers/data-importer.js';
 import { CompositeClickHouseDatabase } from './database/composite-clickhouse.js';
 import { StandaloneSqliteDatabase } from './database/standalone-sqlite.js';
 import * as events from './events.js';
@@ -472,15 +472,15 @@ metrics.registerQueueLengthGauge('ans104Unbundler', {
   length: () => ans104Unbundler.queueDepth(),
 });
 
-export const bundleDataImporter = new BundleDataImporter({
+export const dataImporter = new DataImporter({
   log,
   contiguousDataSource: backgroundContiguousDataSource,
   ans104Unbundler,
   workerCount: config.ANS104_DOWNLOAD_WORKERS,
-  maxQueueSize: config.BUNDLE_DATA_IMPORTER_QUEUE_SIZE,
+  maxQueueSize: config.DATA_IMPORTER_QUEUE_SIZE,
 });
-metrics.registerQueueLengthGauge('bundleDataImporter', {
-  length: () => bundleDataImporter.queueDepth(),
+metrics.registerQueueLengthGauge('dataImporter', {
+  length: () => dataImporter.queueDepth(),
 });
 export type QueueBundleResponse = {
   status: 'skipped' | 'queued' | 'error';
@@ -516,7 +516,7 @@ export async function queueBundle(
       indexFilter: config.ANS104_INDEX_FILTER_STRING,
       queuedAt: currentUnixTimestamp(),
     });
-    bundleDataImporter.queueItem(
+    dataImporter.queueItem(
       {
         ...item,
         index:
@@ -720,7 +720,7 @@ export const shutdown = async (exitCode = 0) => {
   await txFetcher.stop();
   await txOffsetImporter.stop();
   await txOffsetRepairWorker.stop();
-  await bundleDataImporter.stop();
+  await dataImporter.stop();
   await bundleRepairWorker.stop();
   await ans104DataIndexer.stop();
   await ans104Unbundler.stop();
src/workers/bundle-data-importer.test.ts → src/workers/data-importer.test.ts (renamed)
@@ -28,7 +28,7 @@ import {
 } from 'node:test';
 import winston from 'winston';
 import { ContiguousDataSource } from '../types.js';
-import { BundleDataImporter } from './bundle-data-importer.js';
+import { DataImporter } from './data-importer.js';
 
 class Ans104UnbundlerStub {
   async queueItem(): Promise<void> {
@@ -44,10 +44,10 @@ class Ans104UnbundlerStub {
   }
 }
 
-describe('BundleDataImporter', () => {
+describe('DataImporter', () => {
   let log: winston.Logger;
-  let bundleDataImporter: BundleDataImporter;
-  let bundleDataImporterWithFullQueue: BundleDataImporter;
+  let dataImporter: DataImporter;
+  let dataImporterWithFullQueue: DataImporter;
   let contiguousDataSource: ContiguousDataSource;
   let ans104Unbundler: any;
   let mockItem: any;
@@ -70,20 +70,20 @@ describe('BundleDataImporter', () => {
   });
 
   after(async () => {
-    await bundleDataImporter.stop();
-    await bundleDataImporterWithFullQueue.stop();
+    await dataImporter.stop();
+    await dataImporterWithFullQueue.stop();
   });
 
   beforeEach(() => {
     ans104Unbundler = new Ans104UnbundlerStub();
-    bundleDataImporter = new BundleDataImporter({
+    dataImporter = new DataImporter({
       log,
       contiguousDataSource,
       ans104Unbundler,
       workerCount: 1,
       maxQueueSize: 1,
     });
-    bundleDataImporterWithFullQueue = new BundleDataImporter({
+    dataImporterWithFullQueue = new DataImporter({
       log,
       contiguousDataSource,
       ans104Unbundler,
@@ -100,7 +100,7 @@
     it('should queue a non-prioritized item if queue is not full', async () => {
       mock.method(contiguousDataSource, 'getData');
 
-      await bundleDataImporter.queueItem(mockItem, false);
+      await dataImporter.queueItem(mockItem, false);
 
       assert.deepEqual(
         (contiguousDataSource.getData as any).mock.calls[0].arguments[0],
@@ -111,15 +111,15 @@
     it('should not queue a non-prioritized item if queue is full', async () => {
       mock.method(contiguousDataSource, 'getData');
 
-      await bundleDataImporterWithFullQueue.queueItem(mockItem, false);
+      await dataImporterWithFullQueue.queueItem(mockItem, false);
 
       assert.equal((contiguousDataSource.getData as any).mock.callCount(), 0);
     });
 
     it('should queue a prioritized item if the queue is not full', async () => {
       mock.method(contiguousDataSource, 'getData');
 
-      await bundleDataImporter.queueItem(mockItem, true);
+      await dataImporter.queueItem(mockItem, true);
 
       assert.deepEqual(
         (contiguousDataSource.getData as any).mock.calls[0].arguments[0],
@@ -130,7 +130,7 @@
     it('should queue a prioritized item if the queue is full', async () => {
       mock.method(contiguousDataSource, 'getData');
 
-      await bundleDataImporterWithFullQueue.queueItem(mockItem, true);
+      await dataImporterWithFullQueue.queueItem(mockItem, true);
 
       assert.deepEqual(
         (contiguousDataSource.getData as any).mock.calls[0].arguments[0],
@@ -142,15 +142,15 @@
   describe('download', () => {
     it('should download and queue the item for unbundling', async () => {
       mock.method(ans104Unbundler, 'queueItem');
-      bundleDataImporter = new BundleDataImporter({
+      dataImporter = new DataImporter({
        log,
        contiguousDataSource,
        ans104Unbundler: ans104Unbundler,
        workerCount: 1,
        maxQueueSize: 1,
      });
 
-      await bundleDataImporter.download({
+      await dataImporter.download({
         item: mockItem,
         prioritized: true,
         bypassFilter: false,
@@ -169,7 +169,7 @@
 
       await assert.rejects(
         async () => {
-          await bundleDataImporter.download({
+          await dataImporter.download({
             item: mockItem,
             prioritized: true,
             bypassFilter: false,
src/workers/bundle-data-importer.ts → src/workers/data-importer.ts (renamed)
@@ -31,42 +31,46 @@ interface IndexProperty {
   index: number;
 }
 
+type AnyContiguousData = { id: string };
 type UnbundleableItem = (NormalizedDataItem | PartialJsonTransaction) &
   IndexProperty;
+type ImportableItem = AnyContiguousData | UnbundleableItem;
 
-interface UnbundlingQueueItem {
-  item: UnbundleableItem;
+interface DataImporterQueueItem {
+  item: ImportableItem;
   prioritized: boolean | undefined;
   bypassFilter: boolean;
 }
 
-export class BundleDataImporter {
+export class DataImporter {
   // Dependencies
   private log: winston.Logger;
   private contiguousDataSource: ContiguousDataSource;
-  private ans104Unbundler: Ans104Unbundler;
+  private ans104Unbundler: Ans104Unbundler | undefined;
 
-  // Unbundling queue
+  // contiguous-data queue
   private workerCount: number;
   private maxQueueSize: number;
-  private queue: queueAsPromised<UnbundlingQueueItem, void>;
+  private queue: queueAsPromised<DataImporterQueueItem, void>;
 
   constructor({
     log,
     contiguousDataSource,
     ans104Unbundler,
     workerCount,
-    maxQueueSize = config.BUNDLE_DATA_IMPORTER_QUEUE_SIZE,
+    maxQueueSize = config.DATA_IMPORTER_QUEUE_SIZE,
   }: {
     log: winston.Logger;
     contiguousDataSource: ContiguousDataSource;
-    ans104Unbundler: Ans104Unbundler;
+    ans104Unbundler?: Ans104Unbundler;
     workerCount: number;
     maxQueueSize?: number;
   }) {
     this.log = log.child({ class: this.constructor.name });
     this.contiguousDataSource = contiguousDataSource;
-    this.ans104Unbundler = ans104Unbundler;
+    if (ans104Unbundler) {
+      this.ans104Unbundler = ans104Unbundler;
+    }
     this.workerCount = workerCount;
     this.maxQueueSize = maxQueueSize;
     this.queue = fastq.promise(
@@ -76,47 +76,56 @@ export class BundleDataImporter {
   }
 
   async queueItem(
-    item: UnbundleableItem,
+    item: ImportableItem,
     prioritized: boolean | undefined,
     bypassFilter = false,
   ): Promise<void> {
     const log = this.log.child({ method: 'queueItem', id: item.id });
     if (this.workerCount === 0) {
-      log.debug('Skipping bundle download, no workers.');
+      log.debug('Skipping contiguous-data download, no workers.');
       return;
     }
 
     if (prioritized === true) {
-      log.debug('Queueing prioritized bundle download...');
+      log.debug('Queueing prioritized contiguous-data download...');
       this.queue.unshift({ item, prioritized, bypassFilter });
-      log.debug('Prioritized bundle download queued.');
+      log.debug('Prioritized contiguous-data download queued.');
     } else if (this.queue.length() < this.maxQueueSize) {
-      log.debug('Queueing bundle download...');
+      log.debug('Queueing contiguous-data download...');
       this.queue.push({ item, prioritized, bypassFilter });
-      log.debug('Bundle download queued.');
+      log.debug('contiguous-data download queued.');
     } else {
-      log.debug('Skipping bundle download, queue is full.');
+      log.debug('Skipping contiguous-data download, queue is full.');
     }
   }
 
   async download({
     item,
     prioritized,
     bypassFilter,
-  }: UnbundlingQueueItem): Promise<void> {
+  }: DataImporterQueueItem): Promise<void> {
     const log = this.log.child({ method: 'download', id: item.id });
 
     const data = await this.contiguousDataSource.getData({ id: item.id });
 
     return new Promise((resolve, reject) => {
       data.stream.on('end', () => {
-        log.debug('Bundle data downloaded complete. Queuing for unbundling..');
-        this.ans104Unbundler.queueItem(item, prioritized, bypassFilter);
+        const isAnyContiguousData = this.isAnyContiguousData(item);
+        if (this.ans104Unbundler && !isAnyContiguousData) {
+          log.debug('Data download completed. Queuing for unbundling...');
+          this.ans104Unbundler.queueItem(item, prioritized, bypassFilter);
+        } else {
+          log.debug(
+            isAnyContiguousData
+              ? 'Data download completed, marked as any contiguous tx/data-item, skipping unbundling'
+              : 'Data download completed, skipping unbundling because unbundler is not available',
+          );
+        }
         resolve();
       });
 
       data.stream.on('error', (error) => {
-        log.error('Error downloading bundle data.', {
+        log.error('Error downloading data.', {
           message: error.message,
           stack: error.stack,
         });
@@ -141,4 +141,8 @@
   async isQueueFull(): Promise<boolean> {
     return this.queue.length() >= this.maxQueueSize;
   }
+
+  isAnyContiguousData(item: ImportableItem): item is AnyContiguousData {
+    return Object.keys(item).length === 1 && 'id' in item;
+  }
 }
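A short usage sketch of the new ImportableItem split (behavior per the diffs above; the id string and the bundleItem value are hypothetical):

// Only an id: isAnyContiguousData() returns true, so after the download
// completes the item is not queued for unbundling.
await dataImporter.queueItem({ id: 'someTxOrDataItemId' }, true);

// A full unbundleable item (a normalized data item or transaction plus an
// index): unbundled after download if an ans104Unbundler was passed to the
// constructor.
await dataImporter.queueItem({ ...bundleItem, index: 0 }, false);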