Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add DISABLE_READ_THROUGH_DATA_CACHE #270

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ export const BUNDLE_REPAIR_RETRY_INTERVAL_SECONDS = +env.varOrDefault(
);

// Maximum number of failed bundle IDs fetched per repair-retry pass.
// The env var name must match the exported constant — the stale
// 'BUNDLE_REPAIR_RETRY_INTERVAL_SECONDS' argument (copy/paste leftover)
// is removed so the lookup reads the correct variable.
export const BUNDLE_REPAIR_RETRY_BATCH_SIZE = +env.varOrDefault(
  'BUNDLE_REPAIR_RETRY_BATCH_SIZE',
  '1000',
);

Expand Down Expand Up @@ -562,6 +562,9 @@ export const GET_DATA_CIRCUIT_BREAKER_TIMEOUT_MS = +env.varOrDefault(
'500',
);

// Feature flag: when the DISABLE_READ_THROUGH_DATA_CACHE env var is the
// literal string 'true', consumers (see src/system.ts) skip the
// ReadThroughDataCache wrapper and read directly from the underlying data
// sources. Any other value — including the default 'false' — leaves the
// read-through cache enabled.
export const DISABLE_READ_THROUGH_DATA_CACHE =
env.varOrDefault('DISABLE_READ_THROUGH_DATA_CACHE', 'false') === 'true';

//
// AO
//
Expand Down
28 changes: 13 additions & 15 deletions src/database/sql/bundles/repair.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,21 @@ FROM (
ORDER BY b.import_attempt_count, b.last_queued_at ASC
LIMIT @limit
)
ORDER BY RANDOM()

-- updateFullyIndexedAt
-- Marks bundles as fully indexed once every matched data item has been
-- recorded in bundle_data_items under the bundle's current index filter.
-- first_fully_indexed_at is set only once (IFNULL preserves an earlier
-- value); last_fully_indexed_at is written on this pass, and the final
-- predicate restricts the update to rows not yet marked.
UPDATE bundles
SET
  first_fully_indexed_at = IFNULL(first_fully_indexed_at, @fully_indexed_at),
  last_fully_indexed_at = @fully_indexed_at
WHERE matched_data_item_count IS NOT NULL
  AND matched_data_item_count > 0
  -- Correlated scalar subquery: the number of indexed data items for this
  -- bundle under its current filter must equal the expected matched count.
  AND (
    SELECT COUNT(*)
    FROM bundle_data_items bdi
    WHERE bdi.parent_id = bundles.id
      AND bdi.filter_id = bundles.index_filter_id
  ) = bundles.matched_data_item_count
  AND last_fully_indexed_at IS NULL;

-- updateForFilterChange
UPDATE bundles
Expand Down
11 changes: 10 additions & 1 deletion src/database/standalone-sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -810,9 +810,18 @@ export class StandaloneSqliteDatabaseWorker {
}

getFailedBundleIds(limit: number) {
  // Bundles whose last import attempt is older than the reprocess window
  // are eligible for retry. Hoisted into a named variable so the same
  // value can be logged below.
  const reprocessCutoff = currentUnixTimestamp() - BUNDLE_REPROCESS_WAIT_SECS;

  const rows = this.stmts.bundles.selectFailedBundleIds.all({
    limit,
    // NOTE: the stale inline `currentUnixTimestamp() - ...` duplicate of
    // this key (diff leftover) is removed; only the variable form remains.
    reprocess_cutoff: reprocessCutoff,
  });

  this.log.debug('getFailedBundleIds', {
    reprocessCutoff,
    BUNDLE_REPROCESS_WAIT_SECS,
    limit,
    rowsLength: rows.length,
  });

  // Convert raw binary IDs from SQLite into base64url strings for callers.
  return rows.map((row): string => toB64Url(row.id));
Expand Down
52 changes: 32 additions & 20 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,27 +412,39 @@ const contiguousDataStore =
baseDir: 'data/contiguous',
});

// On-demand (request-path) contiguous data source. By default it is wrapped
// in a ReadThroughDataCache so fetched data is persisted to the local
// contiguous data store and indexed; setting
// DISABLE_READ_THROUGH_DATA_CACHE=true bypasses the cache and streams
// straight from the underlying sources. (The stale pre-change duplicate
// declaration is removed — two `export const` bindings of the same name
// would not compile.)
export const onDemandContiguousDataSource =
  config.DISABLE_READ_THROUGH_DATA_CACHE
    ? new SequentialDataSource({
        log,
        dataSources: onDemandDataSources,
      })
    : new ReadThroughDataCache({
        log,
        dataSource: new SequentialDataSource({
          log,
          dataSources: onDemandDataSources,
        }),
        dataStore: contiguousDataStore,
        contiguousDataIndex,
        dataContentAttributeImporter,
      });

// Background (indexing/import-path) contiguous data source. Mirrors
// onDemandContiguousDataSource but draws from backgroundDataSources: the
// same DISABLE_READ_THROUGH_DATA_CACHE flag selects between a plain
// SequentialDataSource and the caching ReadThroughDataCache wrapper. (The
// stale pre-change duplicate declaration is removed — two `export const`
// bindings of the same name would not compile.)
export const backgroundContiguousDataSource =
  config.DISABLE_READ_THROUGH_DATA_CACHE
    ? new SequentialDataSource({
        log,
        dataSources: backgroundDataSources,
      })
    : new ReadThroughDataCache({
        log,
        dataSource: new SequentialDataSource({
          log,
          dataSources: backgroundDataSources,
        }),
        dataStore: contiguousDataStore,
        contiguousDataIndex,
        dataContentAttributeImporter,
      });

export const dataItemIndexer = new DataItemIndexer({
log,
Expand Down
7 changes: 7 additions & 0 deletions src/workers/bundle-repair-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ export class BundleRepairWorker {
const bundleIds = await this.bundleIndex.getFailedBundleIds(
config.BUNDLE_REPAIR_RETRY_BATCH_SIZE,
);

this.log.debug('Bundles to retry', {
idsLength: bundleIds.length,
bundleIds,
batchSize: config.BUNDLE_REPAIR_RETRY_BATCH_SIZE,
});

for (const bundleId of bundleIds) {
this.log.info('Retrying failed bundle', { bundleId });
await this.txFetcher.queueTxId({ txId: bundleId });
Expand Down
Loading