Skip to content

Commit

Permalink
feat(ans-104): add support for parallel unbundling PE-4903
Browse files Browse the repository at this point in the history
Adds worker threads to the Ans104Parser implemented in a similar fashion
to the SQLite workers. There is a fixed size worker pool. Each worker
takes work as it becomes available, processes it, and then repeats.
Callers block until work is complete. The number of workers defaults to
2 but is configurable via the ANS104_UNBUNDLE_WORKERS environment
variable. Along with this change, the number of data item indexing
workers is increased from 1 to 2 to help ensure that communication round
trip latency between the main thread and the DB threads is not the bottleneck
for data item indexing.
  • Loading branch information
djwhitt committed Nov 27, 2023
1 parent 5f94d6b commit d7a3c1f
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 115 deletions.
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ services:
- ADMIN_API_KEY=${ADMIN_API_KEY:-}
- BACKFILL_BUNDLE_RECORDS=${BACKFILL_BUNDLE_RECORDS:-}
- FILTER_CHANGE_REPROCESS=${FILTER_CHANGE_REPROCESS:-}
- ANS104_UNBUNDLE_WORKERS=${ANS104_UNBUNDLE_WORKERS:-}
- ANS104_UNBUNDLE_FILTER=${ANS104_UNBUNDLE_FILTER:-}
- ANS104_INDEX_FILTER=${ANS104_INDEX_FILTER:-}
- ARNS_ROOT_HOST=${ARNS_ROOT_HOST:-}
Expand Down
4 changes: 4 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ export const BACKFILL_BUNDLE_RECORDS =
env.varOrDefault('BACKFILL_BUNDLE_RECORDS', 'false') === 'true';
export const FILTER_CHANGE_REPROCESS =
env.varOrDefault('FILTER_CHANGE_REPROCESS', 'false') === 'true';
// Number of ANS-104 unbundling worker threads. '2' is the default handed to
// env.varOrDefault. The string is converted numerically, so a non-numeric
// value becomes NaN.
export const ANS104_UNBUNDLE_WORKERS = Number(
  env.varOrDefault('ANS104_UNBUNDLE_WORKERS', '2'),
);
export const ANS104_UNBUNDLE_FILTER_STRING = canonicalize(
JSON.parse(env.varOrDefault('ANS104_UNBUNDLE_FILTER', '{"never": true}')),
);
Expand Down
269 changes: 159 additions & 110 deletions src/lib/ans-104.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,75 +100,124 @@ export function normalizeAns104DataItem({

export class Ans104Parser {
private log: winston.Logger;
private worker: Worker;
private contiguousDataSource: ContiguousDataSource;
private streamTimeout: number;
private unbundlePromiseResolve: (() => void) | undefined;
private unbundlePromiseReject: ((reason?: any) => void) | undefined;
private workers: any[] = []; // TODO what's the type for this?
private workQueue: any[] = []; // TODO what's the type for this?

constructor({
log,
eventEmitter,
contiguousDataSource,
dataItemIndexFilterString,
workerCount,

Check warning on line 113 in src/lib/ans-104.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/ans-104.ts#L113

Added line #L113 was not covered by tests
streamTimeout = DEFAULT_STREAM_TIMEOUT,
}: {
log: winston.Logger;
eventEmitter: EventEmitter;
contiguousDataSource: ContiguousDataSource;
dataItemIndexFilterString: string;
workerCount: number;

Check warning on line 120 in src/lib/ans-104.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/ans-104.ts#L120

Added line #L120 was not covered by tests
streamTimeout?: number;
}) {
this.log = log.child({ class: 'Ans104Parser' });
this.contiguousDataSource = contiguousDataSource;
this.streamTimeout = streamTimeout;

const workerUrl = new URL('./ans-104.js', import.meta.url);
this.worker = new Worker(workerUrl, {
workerData: {
dataItemIndexFilterString,
},
});
const self = this;

Check warning on line 127 in src/lib/ans-104.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/ans-104.ts#L127

Added line #L127 was not covered by tests

this.worker
.on('message', (message: ParserMessage) => {
switch (message.eventName) {
case DATA_ITEM_MATCHED:
eventEmitter.emit(
events.ANS104_DATA_ITEM_MATCHED,
message.dataItem,
);
break;
case UNBUNDLE_COMPLETE:
this.unbundlePromiseResolve?.();
this.resetUnbundlePromise();
const { eventName, ...eventBody } = message;
eventEmitter.emit(events.ANS104_UNBUNDLE_COMPLETE, {
dataItemIndexFilterString,
...eventBody,
});
break;
case UNBUNDLE_ERROR:
this.unbundlePromiseReject?.();
this.resetUnbundlePromise();
break;
}
})
.on('error', (error: any) => {
this.unbundlePromiseReject?.();
this.resetUnbundlePromise();
this.log.error('Error in ANS-104 worker', error);
})
.on('exit', (code: number) => {
this.unbundlePromiseReject?.();
this.resetUnbundlePromise();
this.log.error(`ANS-104 worker exited with code ${code}.`);
/**
 * Starts one unbundling worker thread and registers it with the pool.
 *
 * The worker handles at most one queued job at a time. Once online it pulls
 * the next entry from `self.workQueue`, posts the entry's message to the
 * thread, and settles the entry's promise when the thread reports
 * UNBUNDLE_COMPLETE or UNBUNDLE_ERROR. If the thread exits, any in-flight job
 * is rejected, and a replacement worker is spawned when the exit code is
 * non-zero.
 */
function spawn() {
  // Shape of the entries pushed onto `self.workQueue`.
  type QueuedJob = {
    resolve: () => void;
    reject: (reason?: unknown) => void;
    message: unknown;
  };

  const workerUrl = new URL('./ans-104.js', import.meta.url);
  const worker = new Worker(workerUrl, {
    workerData: {
      dataItemIndexFilterString,
    },
  });

  let job: QueuedJob | null = null; // Current item from the queue
  let error: unknown = null; // Error that caused the worker to crash

  // Sends the next queued job to this worker unless it is already busy.
  function takeWork() {
    if (!job && self.workQueue.length) {
      const next: QueuedJob = self.workQueue.shift();
      job = next;
      worker.postMessage(next.message);
    }
  }

  // Settles the in-flight job (resolve when no failure is given, reject
  // otherwise) and marks the worker idle. It is a no-op when nothing is in
  // flight, so a stray or duplicate worker message cannot throw inside the
  // event handler.
  function settleJob(failure?: unknown) {
    const current = job;
    job = null;
    if (current === null) {
      return;
    }
    if (failure === undefined) {
      current.resolve();
    } else {
      current.reject(failure);
    }
  }

  worker
    .on('online', () => {
      // Only advertise the worker once its thread is actually running.
      self.workers.push({ takeWork });
      takeWork();
    })
    .on('message', (message: ParserMessage) => {
      switch (message.eventName) {
        case DATA_ITEM_MATCHED:
          eventEmitter.emit(
            events.ANS104_DATA_ITEM_MATCHED,
            message.dataItem,
          );
          break;
        case UNBUNDLE_COMPLETE: {
          // Braces keep the destructured names scoped to this case clause.
          const { eventName, ...eventBody } = message;
          eventEmitter.emit(events.ANS104_UNBUNDLE_COMPLETE, {
            dataItemIndexFilterString,
            ...eventBody,
          });
          settleJob();
          break;
        }
        case UNBUNDLE_ERROR:
          settleJob(new Error('Worker error'));
          break;
        default:
          settleJob(new Error('Unknown worker message'));
          break;
      }
      takeWork(); // Check if there's more work to do
    })
    .on('error', (err) => {
      // Remember the error so the 'exit' handler can reject the job with it.
      self.log.error('Worker error', err);
      error = err;
    })
    .on('exit', (code) => {
      // Drop this worker from the pool; entries are identified by takeWork.
      self.workers = self.workers.filter(
        (w: any) => w.takeWork !== takeWork,
      );
      if (job) {
        settleJob(error || new Error('worker died'));
      }
      if (code !== 0) {
        self.log.error('Worker stopped with exit code ' + code, {
          exitCode: code,
        });
        spawn(); // Worker died, so spawn a new one
      }
      // NOTE(review): a worker that exits with code 0 is not replaced, so the
      // pool shrinks. Confirm that is intended.
    });
}

for (let i = 0; i < workerCount; i++) {
spawn();
}
}

Check warning on line 204 in src/lib/ans-104.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/ans-104.ts#L136-L204

Added lines #L136 - L204 were not covered by tests

/**
 * Asks every registered worker to pick up queued work. Each worker's
 * `takeWork` decides for itself whether it can accept a job.
 */
drainQueue() {
  this.workers.forEach((poolEntry) => {
    poolEntry.takeWork();
  });
}

resetUnbundlePromise() {
this.unbundlePromiseResolve = undefined;
this.unbundlePromiseReject = undefined;
/**
 * Adds a message to the work queue and immediately offers the queue to the
 * workers.
 *
 * @param message - payload stored on the queue entry for a worker to receive
 * @returns a promise whose resolve/reject callbacks are stored on the queue
 *   entry alongside the message
 */
queueWork(message: any): Promise<any> {
  const pending = new Promise<any>((resolve, reject) => {
    const entry = { resolve, reject, message };
    this.workQueue.push(entry);
    this.drainQueue();
  });
  return pending;
}

async parseBundle({
Expand All @@ -180,84 +229,84 @@ export class Ans104Parser {
parentId: string;
parentIndex: number;
}): Promise<void> {
const unbundlePromise: Promise<void> = new Promise(
async (resolve, reject) => {
let bundlePath: string | undefined;
try {
this.unbundlePromiseResolve = resolve;
this.unbundlePromiseReject = reject;
const log = this.log.child({ parentId });
return new Promise(async (resolve, reject) => {
let bundlePath: string | undefined;
try {
const log = this.log.child({ parentId });

Check warning on line 235 in src/lib/ans-104.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/ans-104.ts#L232-L235

Added lines #L232 - L235 were not covered by tests

// Get data stream
const data = await this.contiguousDataSource.getData(parentId);
// Get data stream
const data = await this.contiguousDataSource.getData(parentId);

Check warning on line 238 in src/lib/ans-104.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/ans-104.ts#L237-L238

Added lines #L237 - L238 were not covered by tests

// Construct temp path for passing data to worker
await fsPromises.mkdir(path.join(process.cwd(), 'data/tmp/ans-104'), {
recursive: true,
});
bundlePath = path.join(
process.cwd(),
'data/tmp/ans-104',
`${parentId}-${Math.random().toString(36).substring(2, 15)}`,
);
// Construct temp path for passing data to worker
await fsPromises.mkdir(path.join(process.cwd(), 'data/tmp/ans-104'), {
recursive: true,
});
bundlePath = path.join(
process.cwd(),
'data/tmp/ans-104',
`${parentId}-${Math.random().toString(36).substring(2, 15)}`,
);

Check warning on line 248 in src/lib/ans-104.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/ans-104.ts#L240-L248

Added lines #L240 - L248 were not covered by tests

// Setup timeout for stalled data streams
let timeout: NodeJS.Timeout;
const resetTimeout = () => {
if (timeout !== undefined) {
clearTimeout(timeout);
}
timeout = setTimeout(() => {
data.stream.destroy(new Error('Timeout'));
}, this.streamTimeout);
};
data.stream.on('data', resetTimeout);
data.stream.pause();
// Setup timeout for stalled data streams
let timeout: NodeJS.Timeout;
const resetTimeout = () => {
if (timeout !== undefined) {
clearTimeout(timeout);
}
timeout = setTimeout(() => {
data.stream.destroy(new Error('Timeout'));
}, this.streamTimeout);
};
data.stream.on('data', resetTimeout);
data.stream.pause();

Check warning on line 261 in src/lib/ans-104.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/ans-104.ts#L250-L261

Added lines #L250 - L261 were not covered by tests

// Write data stream to temp file
const writeStream = fs.createWriteStream(bundlePath);
pipeline(data.stream, writeStream, async (error) => {
if (error !== undefined) {
reject(error);
this.resetUnbundlePromise();
log.error('Error writing ANS-104 bundle stream', error);
if (bundlePath !== undefined) {
try {
await fsPromises.unlink(bundlePath);
} catch (error: any) {
log.error('Error deleting ANS-104 temporary bundle file', {
// Write data stream to temp file
const writeStream = fs.createWriteStream(bundlePath);
pipeline(data.stream, writeStream, async (error) => {
if (error !== undefined) {
reject(error);
log.error('Error writing ANS-104 bundle stream', error);
if (bundlePath !== undefined) {
try {
await fsPromises.unlink(bundlePath);
} catch (error: any) {
log.error(
'Error deleting ANS-104 temporary bundle file', {

Check warning on line 274 in src/lib/ans-104.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/ans-104.ts#L263-L274

Added lines #L263 - L274 were not covered by tests
message: error?.message,
stack: error?.stack,
});
}
}
);

Check warning on line 278 in src/lib/ans-104.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/ans-104.ts#L277-L278

Added lines #L277 - L278 were not covered by tests
}
} else {
log.info('Parsing ANS-104 bundle stream...');
this.worker.postMessage({
}
} else {
log.info('Parsing ANS-104 bundle stream...');
this.workQueue.push({
resolve,
reject,
message: {

Check warning on line 286 in src/lib/ans-104.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/ans-104.ts#L280-L286

Added lines #L280 - L286 were not covered by tests
rootTxId,
parentId,
parentIndex,
bundlePath,
});
}
});
} catch (error) {
reject(error);
this.resetUnbundlePromise();
if (bundlePath !== undefined) {
try {
await fsPromises.unlink(bundlePath);
} catch (error: any) {
log.error('Error deleting ANS-104 temporary bundle file', {
message: error?.message,
stack: error?.stack,
});
}
},
});
this.drainQueue();
}
});
} catch (error) {
reject(error);
if (bundlePath !== undefined) {
try {
await fsPromises.unlink(bundlePath);
} catch (error: any) {
log.error('Error deleting ANS-104 temporary bundle file', {
message: error?.message,
stack: error?.stack,
});

Check warning on line 305 in src/lib/ans-104.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/ans-104.ts#L291-L305

Added lines #L291 - L305 were not covered by tests
}
}
},
);
return unbundlePromise;
}
});

Check warning on line 309 in src/lib/ans-104.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/ans-104.ts#L308-L309

Added lines #L308 - L309 were not covered by tests
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ export const dataItemsQueuedCounter = new promClient.Counter({
labelNames: ['bundle_format'],
});

// Counts data items indexed. The ANS-104 data indexer increments it after
// saving a data item's nested data ID.
export const dataItemsIndexedCounter = new promClient.Counter({
name: 'data_items_indexed_total',
help: 'Count of data items indexed',
});

//
// Arweave client metrics
//
Expand Down
1 change: 1 addition & 0 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ const ans104Unbundler = new Ans104Unbundler({
filter: config.ANS104_UNBUNDLE_FILTER,
contiguousDataSource,
dataItemIndexFilterString: config.ANS104_INDEX_FILTER_STRING,
workerCount: config.ANS104_UNBUNDLE_WORKERS,
});

eventEmitter.on(
Expand Down
6 changes: 4 additions & 2 deletions src/workers/ans104-data-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import * as EventEmitter from 'node:events';
import * as winston from 'winston';

import * as events from '../events.js';
import * as metrics from '../metrics.js';
import { NestedDataIndexWriter, NormalizedDataItem } from '../types.js';

const DEFAULT_WORKER_COUNT = 1;
const DEFAULT_WORKER_COUNT = 2;

export class Ans104DataIndexer {
// Dependencies
Expand Down Expand Up @@ -82,12 +83,13 @@ export class Ans104DataIndexer {
typeof item.data_size === 'number'
) {
log.debug('Indexing data item data...');
this.indexWriter.saveNestedDataId({
await this.indexWriter.saveNestedDataId({
id: item.id,
parentId: item.parent_id,
dataOffset: item.data_offset,
dataSize: item.data_size,
});
metrics.dataItemsIndexedCounter.inc();
this.eventEmitter.emit(events.ANS104_DATA_ITEM_DATA_INDEXED, item);
log.debug('Data item data indexed.');
} else {
Expand Down
Loading

0 comments on commit d7a3c1f

Please sign in to comment.