Skip to content

Commit

Permalink
[Backend] Enhance retention policy deletion performances / speed (#4864)
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-julien authored Oct 4, 2024
1 parent a277b00 commit 649af4f
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 75 deletions.
2 changes: 1 addition & 1 deletion opencti-platform/opencti-graphql/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@
"retention_manager": {
"enabled": true,
"lock_key": "retention_manager_lock",
"interval": 60000
"interval": 30000
},
"file_index_manager": {
"enabled": true,
Expand Down
1 change: 1 addition & 0 deletions opencti-platform/opencti-graphql/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@
"vitest": "2.0.5"
},
"resolutions": {
"axios": "1.7.7",
"body-parser": "1.20.3",
"json5": "2.2.3",
"cross-fetch": "4.0.0",
Expand Down
25 changes: 6 additions & 19 deletions opencti-platform/opencti-graphql/src/domain/retentionRule.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { listEntities, storeLoadById } from '../database/middleware-loader';
import { ENTITY_TYPE_RETENTION_RULE } from '../schema/internalObject';
import { generateInternalId, generateStandardId } from '../schema/identifier';
import { elIndex, elPaginate } from '../database/engine';
import { INDEX_INTERNAL_OBJECTS, READ_DATA_INDICES_WITHOUT_INFERRED } from '../database/utils';
import { INDEX_INTERNAL_OBJECTS, READ_STIX_INDICES } from '../database/utils';
import { UnsupportedError } from '../config/errors';
import { utcDate } from '../utils/format';
import { RETENTION_MANAGER_USER } from '../utils/access';
Expand All @@ -20,33 +20,20 @@ export const checkRetentionRule = async (context, input) => {
if (scope === 'knowledge') {
const jsonFilters = filters ? JSON.parse(filters) : null;
const queryOptions = await convertFiltersToQueryOptions(jsonFilters, { before });
result = await elPaginate(context, RETENTION_MANAGER_USER, READ_DATA_INDICES_WITHOUT_INFERRED, queryOptions);
result = await elPaginate(context, RETENTION_MANAGER_USER, READ_STIX_INDICES, { ...queryOptions, first: 1 });
return result.pageInfo.globalCount;
}
// file and workbench rules
if (scope === 'file') {
result = await paginatedForPathWithEnrichment(
context,
RETENTION_MANAGER_USER,
'import/global',
undefined,
{ notModifiedSince: before.toISOString() }
);
result = await paginatedForPathWithEnrichment(context, RETENTION_MANAGER_USER, 'import/global', undefined, { notModifiedSince: before.toISOString() });
} else if (scope === 'workbench') {
result = await paginatedForPathWithEnrichment(
context,
RETENTION_MANAGER_USER,
'import/pending',
undefined,
{ notModifiedSince: before.toISOString() }
);
result = await paginatedForPathWithEnrichment(context, RETENTION_MANAGER_USER, 'import/pending', undefined, { notModifiedSince: before.toISOString() });
} else {
logApp.error(`[Retention manager] Scope ${scope} not existing for Retention Rule.`);
}
if (scope === 'file' || scope === 'workbench') { // don't delete progress files or files with works in progress
const resultEdges = result.edges.filter((e) => DELETABLE_FILE_STATUSES.includes(e.node.uploadStatus)
&& (e.node.works ?? []).every((work) => !work || DELETABLE_FILE_STATUSES.includes(work?.status)));
result.edges = resultEdges;
result.edges = result.edges.filter((e) => DELETABLE_FILE_STATUSES.includes(e.node.uploadStatus)
&& (e.node.works ?? []).every((work) => !work || DELETABLE_FILE_STATUSES.includes(work?.status)));
}
return result.edges.length;
};
Expand Down
14 changes: 11 additions & 3 deletions opencti-platform/opencti-graphql/src/manager/managerModule.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { clearIntervalAsync, setIntervalAsync, type SetIntervalAsyncTimer } from 'set-interval-async/fixed';
import { clearIntervalAsync as clearDynamicIntervalAsync, setIntervalAsync as setDynamicIntervalAsync } from 'set-interval-async/dynamic';
import moment from 'moment/moment';
import { createStreamProcessor, lockResource, type StreamProcessor } from '../database/redis';
import type { BasicStoreSettings } from '../types/settings';
Expand All @@ -19,6 +20,7 @@ export interface ManagerCronScheduler {
infiniteInterval?: number
handlerInitializer?: () => Promise<HandlerInput>
lockInHandlerParams?: boolean
dynamicSchedule?: boolean
}

export interface ManagerStreamScheduler {
Expand Down Expand Up @@ -122,14 +124,16 @@ const initManager = (manager: ManagerDefinition) => {
return {
manager,
start: async () => {
logApp.info(`[OPENCTI-MODULE] Starting ${manager.label}`);
if (manager.cronSchedulerHandler) {
const asyncInterval = manager.cronSchedulerHandler.dynamicSchedule ? setDynamicIntervalAsync : setIntervalAsync;
logApp.info(`[OPENCTI-MODULE] Starting ${manager.label} every ${manager.cronSchedulerHandler.interval}`);
const { handlerInitializer } = manager.cronSchedulerHandler;
scheduler = setIntervalAsync(async () => {
scheduler = asyncInterval(async () => {
await cronHandler(handlerInitializer);
}, manager.cronSchedulerHandler.interval);
}
if (manager.streamSchedulerHandler) {
logApp.info(`[OPENCTI-MODULE] Starting ${manager.label}`);
streamScheduler = setIntervalAsync(async () => {
await streamHandler();
}, manager.streamSchedulerHandler.interval);
Expand All @@ -146,7 +150,11 @@ const initManager = (manager: ManagerDefinition) => {
shutdown: async () => {
logApp.info(`[OPENCTI-MODULE] Stopping ${manager.label}`);
shutdown = true;
if (scheduler) await clearIntervalAsync(scheduler);
if (scheduler) {
const asyncCleanInterval = manager.cronSchedulerHandler && manager.cronSchedulerHandler.dynamicSchedule
? clearDynamicIntervalAsync : clearIntervalAsync;
await asyncCleanInterval(scheduler);
}
if (streamScheduler) await clearIntervalAsync(streamScheduler);
return true;
},
Expand Down
62 changes: 31 additions & 31 deletions opencti-platform/opencti-graphql/src/manager/retentionManager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import moment, { type Moment } from 'moment';
import { Promise as BluePromise } from 'bluebird';
import { findAll as findRetentionRulesToExecute } from '../domain/retentionRule';
import conf, { booleanConf, logApp } from '../config/conf';
import { deleteElementById, patchAttribute } from '../database/middleware';
Expand All @@ -12,18 +13,21 @@ import type { ManagerDefinition } from './managerModule';
import { registerManager } from './managerModule';
import type { AuthContext } from '../types/user';
import type { FileEdge, RetentionRule } from '../generated/graphql';
import { RetentionRuleScope, RetentionUnit } from '../generated/graphql';
import { deleteFile } from '../database/file-storage';
import { DELETABLE_FILE_STATUSES, paginatedForPathWithEnrichment } from '../modules/internal/document/document-domain';
import { RetentionRuleScope, RetentionUnit } from '../generated/graphql';
import type { BasicStoreCommonEdge, StoreObject } from '../types/store';
import { ALREADY_DELETED_ERROR } from '../config/errors';

const RETENTION_MANAGER_ENABLED = booleanConf('retention_manager:enabled', false);
const RETENTION_MANAGER_START_ENABLED = booleanConf('retention_manager:enabled', true);
// Retention manager responsible to cleanup old data
// Each API will start is retention manager.
// If the lock is free, every API as the right to take it.
const SCHEDULE_TIME = conf.get('retention_manager:interval') || 60000;
const SCHEDULE_TIME = conf.get('retention_manager:interval') || 30000;
const RETENTION_MANAGER_KEY = conf.get('retention_manager:lock_key') || 'retention_manager_lock';
const RETENTION_BATCH_SIZE = conf.get('retention_manager:batch_size') || 100;
const RETENTION_BATCH_SIZE = conf.get('retention_manager:batch_size') || 1500;
const RETENTION_MAX_CONCURRENCY = conf.get('retention_manager:max_deletion_concurrency') || 10;
export const RETENTION_SCOPE_VALUES = Object.values(RetentionRuleScope);
export const RETENTION_UNIT_VALUES = Object.values(RetentionUnit);

Expand All @@ -44,28 +48,15 @@ export const getElementsToDelete = async (context: AuthContext, scope: string, b
const queryOptions = await convertFiltersToQueryOptions(jsonFilters, { before });
result = await elPaginate(context, RETENTION_MANAGER_USER, READ_STIX_INDICES, { ...queryOptions, first: RETENTION_BATCH_SIZE });
} else if (scope === 'file') {
result = await paginatedForPathWithEnrichment(
context,
RETENTION_MANAGER_USER,
'import/global',
undefined,
{ first: RETENTION_BATCH_SIZE, notModifiedSince: before.toISOString() }
);
result = await paginatedForPathWithEnrichment(context, RETENTION_MANAGER_USER, 'import/global', undefined, { first: RETENTION_BATCH_SIZE, notModifiedSince: before.toISOString() });
} else if (scope === 'workbench') {
result = await paginatedForPathWithEnrichment(
context,
RETENTION_MANAGER_USER,
'import/pending',
undefined,
{ first: RETENTION_BATCH_SIZE, notModifiedSince: before.toISOString() }
);
result = await paginatedForPathWithEnrichment(context, RETENTION_MANAGER_USER, 'import/pending', undefined, { first: RETENTION_BATCH_SIZE, notModifiedSince: before.toISOString() });
} else {
throw Error(`[Retention manager] Scope ${scope} not existing for Retention Rule.`);
}
if (scope === 'file' || scope === 'workbench') { // don't delete progress files or files with works in progress
const resultEdges = result.edges.filter((e: FileEdge) => DELETABLE_FILE_STATUSES.includes(e.node.uploadStatus)
&& (e.node.works ?? []).every((work) => !work || DELETABLE_FILE_STATUSES.includes(work?.status)));
result.edges = resultEdges;
result.edges = result.edges.filter((e: FileEdge) => DELETABLE_FILE_STATUSES.includes(e.node.uploadStatus)
&& (e.node.works ?? []).every((work) => !work || DELETABLE_FILE_STATUSES.includes(work?.status)));
}
return result;
};
Expand All @@ -77,17 +68,25 @@ const executeProcessing = async (context: AuthContext, retentionRule: RetentionR
const result = await getElementsToDelete(context, scope, before, filters);
const remainingDeletions = result.pageInfo.globalCount;
const elements = result.edges;
logApp.debug(`[OPENCTI] Retention manager clearing ${elements.length} elements`);
for (let index = 0; index < elements.length; index += 1) {
const { node } = elements[index];
const { updated_at: up } = node;
const humanDuration = moment.duration(utcDate(up).diff(utcDate())).humanize();
try {
await deleteElement(context, scope, scope === 'knowledge' ? node.internal_id : node.id, node.entity_type);
logApp.debug(`[OPENCTI] Retention manager deleting ${node.id} after ${humanDuration}`);
} catch (e) {
logApp.error(e, { id: node.id, manager: 'RETENTION_MANAGER' });
}
if (elements.length > 0) {
logApp.debug(`[OPENCTI] Retention manager clearing ${elements.length} elements`);
const start = new Date().getTime();
const deleteFn = async (element: BasicStoreCommonEdge<StoreObject>) => {
const { node } = element;
const { updated_at: up } = node;
try {
const humanDuration = moment.duration(utcDate(up).diff(utcDate())).humanize();
await deleteElement(context, scope, scope === 'knowledge' ? node.internal_id : node.id, node.entity_type);
logApp.debug(`[OPENCTI] Retention manager deleting ${node.id} after ${humanDuration}`);
} catch (err: any) {
// Only log the error if not an already deleted message (that can happen though concurrency deletion)
if (err?.extensions?.code !== ALREADY_DELETED_ERROR) {
logApp.error(err, { id: node.id, manager: 'RETENTION_MANAGER' });
}
}
};
await BluePromise.map(elements, deleteFn, { concurrency: RETENTION_MAX_CONCURRENCY });
logApp.debug(`[OPENCTI] Retention manager deleted ${elements.length} in ${new Date().getTime() - start} ms`);
}
// Patch the last execution of the rule
const patch = {
Expand Down Expand Up @@ -121,6 +120,7 @@ const RETENTION_MANAGER_DEFINITION: ManagerDefinition = {
interval: SCHEDULE_TIME,
lockKey: RETENTION_MANAGER_KEY,
lockInHandlerParams: true,
dynamicSchedule: true
},
enabledByConfig: RETENTION_MANAGER_ENABLED,
enabledToStart(): boolean {
Expand Down
22 changes: 1 addition & 21 deletions opencti-platform/opencti-graphql/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5594,26 +5594,6 @@ __metadata:
languageName: node
linkType: hard

"axios@npm:^0.21.1":
version: 0.21.4
resolution: "axios@npm:0.21.4"
dependencies:
follow-redirects: "npm:^1.14.0"
checksum: 10/da644592cb6f8f9f8c64fdabd7e1396d6769d7a4c1ea5f8ae8beb5c2eb90a823e3a574352b0b934ac62edc762c0f52647753dc54f7d07279127a7e5c4cd20272
languageName: node
linkType: hard

"axios@npm:^1.6.0":
version: 1.7.4
resolution: "axios@npm:1.7.4"
dependencies:
follow-redirects: "npm:^1.15.6"
form-data: "npm:^4.0.0"
proxy-from-env: "npm:^1.1.0"
checksum: 10/7a1429be1e3d0c2e1b96d4bba4d113efbfabc7c724bed107beb535c782c7bea447ff634886b0c7c43395a264d085450d009eb1154b5f38a8bae49d469fdcbc61
languageName: node
linkType: hard

"axobject-query@npm:~3.1.1":
version: 3.1.1
resolution: "axobject-query@npm:3.1.1"
Expand Down Expand Up @@ -8120,7 +8100,7 @@ __metadata:
languageName: node
linkType: hard

"follow-redirects@npm:^1.14.0, follow-redirects@npm:^1.15.6":
"follow-redirects@npm:^1.15.6":
version: 1.15.6
resolution: "follow-redirects@npm:1.15.6"
peerDependenciesMeta:
Expand Down

0 comments on commit 649af4f

Please sign in to comment.