Skip to content

Commit

Permalink
[Entity Analytics] [Entity Store] Run init requests sequentially to p…
Browse files Browse the repository at this point in the history
…revent resource exists error (elastic#198268)

## Summary

This PR fixes an issue where running init for both `user` and `host`
entity engines in parallel would cause a race condition while enabling
the risk engine, resulting in a `Resource already exists` error.

---------

Co-authored-by: kibanamachine <[email protected]>
(cherry picked from commit 6a50066)
  • Loading branch information
tiansivive committed Oct 31, 2024
1 parent 189cf8b commit f1f332e
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const EntityStoreDashboardPanelsComponent = () => {
};
setRiskEngineInitializing(true);
initRiskEngine(undefined, options);
return;
}

if (enable.entityStore) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ export const useEntityStoreEnablement = () => {
const { initEntityStore } = useEntityStoreRoutes();
const { refetch: initialize } = useQuery({
queryKey: [ENTITY_STORE_ENABLEMENT_INIT],
queryFn: () => Promise.all([initEntityStore('user'), initEntityStore('host')]),
queryFn: async () =>
initEntityStore('user').then((usr) => initEntityStore('host').then((host) => [usr, host])),
enabled: false,
});

Expand Down Expand Up @@ -79,7 +80,9 @@ export const useInitEntityEngineMutation = (options?: UseMutationOptions<{}>) =>
timestamp: new Date().toISOString(),
action: 'start',
});
return Promise.all([initEntityStore('user'), initEntityStore('host')]);
return initEntityStore('user').then((usr) =>
initEntityStore('host').then((host) => [usr, host])
);
},
{
...options,
Expand Down Expand Up @@ -107,7 +110,9 @@ export const useStopEntityEngineMutation = (options?: UseMutationOptions<{}>) =>
timestamp: new Date().toISOString(),
action: 'stop',
});
return Promise.all([stopEntityStore('user'), stopEntityStore('host')]);
return stopEntityStore('user').then((usr) =>
stopEntityStore('host').then((host) => [usr, host])
);
},
{
...options,
Expand All @@ -129,7 +134,10 @@ export const useDeleteEntityEngineMutation = (options?: UseMutationOptions<{}>)
const invalidateEntityEngineStatusQuery = useInvalidateEntityEngineStatusQuery();
const { deleteEntityEngine } = useEntityStoreRoutes();
return useMutation<DeleteEntityEngineResponse[]>(
() => Promise.all([deleteEntityEngine('user', true), deleteEntityEngine('host', true)]),
() =>
deleteEntityEngine('user', true).then((usr) =>
deleteEntityEngine('host', true).then((host) => [usr, host])
),
{
...options,
mutationKey: DELETE_ENTITY_ENGINE_STATUS_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ export const createPlatformPipeline = async ({
managed_by: 'entity_store',
managed: true,
},
description: `Ingest pipeline for entity defiinition ${entityManagerDefinition.id}`,
description: `Ingest pipeline for entity definition ${entityManagerDefinition.id}`,
processors: buildIngestPipeline({
namespace: unitedDefinition.namespace,
version: unitedDefinition.version,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,15 @@ export class EntityStoreDataClient {
);
}
logger.info(
`In namespace ${this.options.namespace}: Initializing entity store for ${entityType}`
`[Entity Store] In namespace ${this.options.namespace}: Initializing entity store for ${entityType}`
);

const descriptor = await this.engineClient.init(entityType, {
filter,
fieldHistoryLength,
indexPattern,
});
logger.debug(`Initialized engine for ${entityType}`);
logger.debug(`[Entity Store] Initialized saved object for ${entityType}`);
// first create the entity definition without starting it
// so that the index template is created which we can add a component template to

Expand All @@ -164,7 +164,9 @@ export class EntityStoreDataClient {
config,
pipelineDebugMode
).catch((error) => {
logger.error(`There was an error during async setup of the Entity Store: ${error.message}`);
logger.error(
`[Entity Store] There was an error during async setup of the Entity Store: ${error.message}`
);
});

return descriptor;
Expand Down Expand Up @@ -259,7 +261,7 @@ export class EntityStoreDataClient {
logger,
taskManager,
});
logger.info(`Entity store initialized for ${entityType}`);
debugLog(`Entity store initialized`);

const setupEndTime = moment().utc().toISOString();
const duration = moment(setupEndTime).diff(moment(setupStartTime), 'seconds');
Expand All @@ -270,7 +272,7 @@ export class EntityStoreDataClient {
return updated;
} catch (err) {
this.options.logger.error(
`Error initializing entity store for ${entityType}: ${err.message}`
`[Entity Store] Error initializing entity store for ${entityType}: ${err.message}`
);

this.options.telemetry?.reportEvent(ENTITY_ENGINE_RESOURCE_INIT_FAILURE_EVENT.eventType, {
Expand Down Expand Up @@ -367,7 +369,9 @@ export class EntityStoreDataClient {
frequency: `${config.frequency.asSeconds()}s`,
});
const { entityManagerDefinition } = unitedDefinition;
logger.info(`In namespace ${namespace}: Deleting entity store for ${entityType}`);
logger.info(
`[Entity Store] In namespace ${namespace}: Deleting entity store for ${entityType}`
);
try {
try {
await this.entityClient.deleteEntityDefinition({
Expand Down Expand Up @@ -414,6 +418,7 @@ export class EntityStoreDataClient {
});
}

logger.info(`[Entity Store] In namespace ${namespace}: Deleted store for ${entityType}`);
return { deleted: true };
} catch (e) {
logger.error(`Error deleting entity store for ${entityType}: ${e.message}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ import {
const logFactory =
(logger: Logger, taskId: string) =>
(message: string): void =>
logger.info(`[task ${taskId}]: ${message}`);
logger.info(`[Entity Store] [task ${taskId}]: ${message}`);

const debugLogFactory =
(logger: Logger, taskId: string) =>
(message: string): void =>
logger.debug(`[task ${taskId}]: ${message}`);
logger.debug(`[Entity Store] [task ${taskId}]: ${message}`);

const getTaskName = (): string => TYPE;

Expand All @@ -65,7 +65,9 @@ export const registerEntityStoreFieldRetentionEnrichTask = ({
taskManager: TaskManagerSetupContract | undefined;
}): void => {
if (!taskManager) {
logger.info('Task Manager is unavailable; skipping entity store enrich policy registration.');
logger.info(
'[Entity Store] Task Manager is unavailable; skipping entity store enrich policy registration.'
);
return;
}

Expand Down Expand Up @@ -134,7 +136,7 @@ export const startEntityStoreFieldRetentionEnrichTask = async ({
params: { version: VERSION },
});
} catch (e) {
logger.warn(`[task ${taskId}]: error scheduling task, received ${e.message}`);
logger.warn(`[Entity Store] [task ${taskId}]: error scheduling task, received ${e.message}`);
throw e;
}
};
Expand All @@ -150,9 +152,14 @@ export const removeEntityStoreFieldRetentionEnrichTask = async ({
}) => {
try {
await taskManager.remove(getTaskId(namespace));
logger.info(
`[Entity Store] Removed entity store enrich policy task for namespace ${namespace}`
);
} catch (err) {
if (!SavedObjectsErrorHelpers.isNotFoundError(err)) {
logger.error(`Failed to remove entity store enrich policy task: ${err.message}`);
logger.error(
`[Entity Store] Failed to remove entity store enrich policy task: ${err.message}`
);
throw err;
}
}
Expand Down Expand Up @@ -233,7 +240,7 @@ export const runTask = async ({
state: updatedState,
};
} catch (e) {
logger.error(`[task ${taskId}]: error running task, received ${e.message}`);
logger.error(`[Entity Store] [task ${taskId}]: error running task, received ${e.message}`);
throw e;
}
};
Expand Down

0 comments on commit f1f332e

Please sign in to comment.