Skip to content

Commit

Permalink
fix: shutdown problems with db.stop when exit signal is received
Browse files Browse the repository at this point in the history
  * remove all awaits on queue.drained calls in stop methods
  * better error handling around pool/worker stops in db.stop
  • Loading branch information
hlolli committed Jan 29, 2025
1 parent f8b2591 commit 6e1a7d3
Show file tree
Hide file tree
Showing 13 changed files with 39 additions and 31 deletions.
45 changes: 27 additions & 18 deletions src/database/standalone-sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2778,26 +2778,34 @@ export class StandaloneSqliteDatabase
async stop() {
const log = this.log.child({ method: 'stop' });
const promises: Promise<void>[] = [];
WORKER_POOL_NAMES.forEach((pool) => {
WORKER_ROLE_NAMES.forEach((role) => {
this.workers[pool][role].forEach(() => {
promises.push(
new Promise((resolve, reject) => {
this.workQueues[pool][role].push({
resolve,
reject,
message: {
method: 'terminate',
},
});
this.drainQueue();
}),
);
});
});
});

for (const pool of WORKER_POOL_NAMES) {
for (const role of WORKER_ROLE_NAMES) {
if (this.workers[pool][role] !== undefined) {
for (const worker of this.workers[pool][role]) {
if (worker && typeof worker.push === 'function') {
log.debug('Creating stop job for worker', { pool, role });
promises.push(
new Promise((resolve) => {
worker.push({
resolve,
// by always resolving, we prevent shutdown procedure from
// perpetually waiting for a response from the worker
reject: resolve,
message: {
method: 'terminate',
},
});
}),
);
this.drainQueue();
}
}
}
}
}
await Promise.all(promises);

log.debug('Stopped successfully.');
}

Expand Down Expand Up @@ -3460,6 +3468,7 @@ if (!isMainThread) {
parentPort?.postMessage(walCheckpoint);
break;
case 'terminate':
console.log('Worker terminating', method, args);
parentPort?.postMessage(null);
process.exit(0);
}
Expand Down
3 changes: 1 addition & 2 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -754,11 +754,10 @@ export const shutdown = async (exitCode = 0) => {
await ans104DataIndexer.stop();
await ans104Unbundler.stop();
await webhookEmitter.stop();
await db.stop();
await headerFsCacheCleanupWorker?.stop();
await contiguousDataFsCacheCleanupWorker?.stop();
await dataVerificationWorker?.stop();

await db.stop();
process.exit(exitCode);
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/workers/ans104-data-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ export class Ans104DataIndexer {
async stop(): Promise<void> {
const log = this.log.child({ method: 'stop' });
this.queue.kill();
await this.queue.drained();
this.queue.drained();
log.debug('Stopped successfully.');
}
}
2 changes: 1 addition & 1 deletion src/workers/ans104-unbundler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ export class Ans104Unbundler {
async stop(): Promise<void> {
const log = this.log.child({ method: 'stop' });
this.queue.kill();
await this.queue.drained();
this.queue.drained();
await this.ans104Parser.stop();
log.debug('Stopped successfully.');
}
Expand Down
2 changes: 1 addition & 1 deletion src/workers/data-content-attribute-importer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export class DataContentAttributeImporter {
async stop(): Promise<void> {
const log = this.log.child({ method: 'stop' });
this.queue.kill();
await this.queue.drained();
this.queue.drained();
log.debug('Stopped successfully.');
}

Expand Down
2 changes: 1 addition & 1 deletion src/workers/data-importer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ export class DataImporter {
async stop(): Promise<void> {
const log = this.log.child({ method: 'stop' });
this.queue.kill();
await this.queue.drained();
this.queue.drained();
log.debug('Stopped successfully.');
}

Expand Down
2 changes: 1 addition & 1 deletion src/workers/data-item-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export class DataItemIndexer {
async stop(): Promise<void> {
const log = this.log.child({ method: 'stop' });
this.queue.kill();
await this.queue.drained();
this.queue.drained();
log.debug('Stopped successfully.');
}
}
2 changes: 1 addition & 1 deletion src/workers/data-verification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ export class DataVerificationWorker {
const log = this.log.child({ method: 'stop' });
clearInterval(this.intervalId);
this.queue.kill();
await this.queue.drained();
this.queue.drained();
await this.dataRootComputer.stop();
log.debug('Stopped successfully.');
}
Expand Down
2 changes: 1 addition & 1 deletion src/workers/transaction-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ export class TransactionFetcher {
async stop(): Promise<void> {
const log = this.log.child({ method: 'stop' });
this.queue.kill();
await this.queue.drained();
this.queue.drained();
log.debug('Stopped successfully.');
}

Expand Down
2 changes: 1 addition & 1 deletion src/workers/transaction-importer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class TransactionImporter {
async stop(): Promise<void> {
const log = this.log.child({ method: 'stop' });
this.queue.kill();
await this.queue.drained();
this.queue.drained();
log.debug('Stopped successfully.');
}

Expand Down
2 changes: 1 addition & 1 deletion src/workers/transaction-offset-importer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export class TransactionOffsetImporter {
async stop(): Promise<void> {
const log = this.log.child({ method: 'stop' });
this.queue.kill();
await this.queue.drained();
this.queue.drained();
log.debug('Stopped successfully.');
}

Expand Down
2 changes: 1 addition & 1 deletion src/workers/webhook-emitter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ describe('WebhookEmitter', () => {

await wait(0);

await webhookEmitter.emissionQueue.drained();
webhookEmitter.emissionQueue.drained();

assert.equal(
(webhookEmitter.emitWebhookToTargetServer as any).mock.callCount(),
Expand Down
2 changes: 1 addition & 1 deletion src/workers/webhook-emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ export class WebhookEmitter {
this.listenerReferences.clear();

this.emissionQueue.kill();
await this.emissionQueue.drained();
this.emissionQueue.drained();
log.debug('Stopped successfully.');
}

Expand Down

0 comments on commit 6e1a7d3

Please sign in to comment.