Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
sorenlouv committed Aug 27, 2024
1 parent bafa18b commit 14854d2
Show file tree
Hide file tree
Showing 12 changed files with 448 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@ export function registerSummarizationFunction({
signal
) => {
// The LLM should be able to update an existing entry by providing the same doc_id
// if no id is provided, we generate a new one
// if no existing entry is found, we generate a uuid
const id = await client.getUuidFromDocId(docId);

return client
.addKnowledgeBaseEntry({
entry: {
id: id ?? v4(),
title: docId, // use doc_id as title for now
doc_id: docId,
role: KnowledgeBaseEntryRole.AssistantSummarization,
text,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import type {
import { notImplemented } from '@hapi/boom';
import { nonEmptyStringRt, toBooleanRt } from '@kbn/io-ts-utils';
import * as t from 'io-ts';
import { v4 } from 'uuid';
import { createObservabilityAIAssistantServerRoute } from '../create_observability_ai_assistant_server_route';
import {
Instruction,
Expand Down Expand Up @@ -240,10 +239,15 @@ const importKnowledgeBaseEntries = createObservabilityAIAssistantServerRoute({
params: t.type({
body: t.type({
entries: t.array(
t.type({
doc_id: t.string,
text: nonEmptyStringRt,
})
t.intersection([
t.type({
id: t.string,
text: nonEmptyStringRt,
}),
t.partial({
title: t.string,
}),
])
),
}),
}),
Expand All @@ -257,18 +261,48 @@ const importKnowledgeBaseEntries = createObservabilityAIAssistantServerRoute({
throw notImplemented();
}

const entries = resources.params.body.entries.map((entry) => ({
id: v4(),
const formattedEntries = resources.params.body.entries.map((entry) => ({
id: entry.id,
title: entry.title,
text: entry.text,
confidence: 'high' as KnowledgeBaseEntry['confidence'],
is_correction: false,
type: 'contextual' as const,
public: true,
labels: {},
role: KnowledgeBaseEntryRole.UserEntry,
...entry,
}));

return await client.importKnowledgeBaseEntries({ entries });
return await client.importKnowledgeBaseEntries({ entries: formattedEntries });
},
});

const importKnowledgeBaseCategoryEntries = createObservabilityAIAssistantServerRoute({
endpoint: 'POST /internal/observability_ai_assistant/kb/entries/category/import',
params: t.type({
body: t.type({
category: t.string,
entries: t.array(
t.type({
id: t.string,
texts: t.array(t.string),
})
),
}),
}),
options: {
tags: ['access:ai_assistant'],
},
handler: async (resources): Promise<void> => {
const client = await resources.service.getClient({ request: resources.request });

if (!client) {
throw notImplemented();
}

const { entries, category } = resources.params.body;

return resources.service.addCategoryToKnowledgeBase(category, entries);
},
});

Expand All @@ -279,6 +313,7 @@ export const knowledgeBaseRoutes = {
...saveKnowledgeBaseUserInstruction,
...getKnowledgeBaseUserInstructions,
...importKnowledgeBaseEntries,
...importKnowledgeBaseCategoryEntries,
...saveKnowledgeBaseEntry,
...deleteKnowledgeBaseEntry,
};
Original file line number Diff line number Diff line change
Expand Up @@ -326,35 +326,34 @@ export class ObservabilityAIAssistantService {
addToKnowledgeBaseQueue(entries: KnowledgeBaseEntryRequest[]): void {
this.init()
.then(() => {
this.kbService!.queue(
entries.flatMap((entry) => {
const entryWithSystemProperties = {
...entry,
doc_id: entry.id,
'@timestamp': new Date().toISOString(),
public: true,
confidence: 'high' as const,
type: 'contextual' as const,
is_correction: false,
labels: {
...entry.labels,
},
role: KnowledgeBaseEntryRole.Elastic,
};

const operations =
'texts' in entryWithSystemProperties
? splitKbText(entryWithSystemProperties)
: [
{
type: KnowledgeBaseEntryOperationType.Index,
document: entryWithSystemProperties,
},
];

return operations;
})
);
const operations = entries.flatMap((entry) => {
const entryWithSystemProperties = {
...entry,
doc_id: entry.id,
'@timestamp': new Date().toISOString(),
public: true,
confidence: 'high' as const,
type: 'contextual' as const,
is_correction: false,
labels: {
...entry.labels,
},
role: KnowledgeBaseEntryRole.Elastic,
};

return 'texts' in entryWithSystemProperties
? splitKbText(entryWithSystemProperties)
: [
{
type: KnowledgeBaseEntryOperationType.Index,
document: entryWithSystemProperties,
},
];
});

this.logger.debug(`Queuing ${operations.length} operations (${entries.length} entries)`);

this.kbService!.queue(operations);
})
.catch((error) => {
this.logger.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@ export const kbComponentTemplate: ClusterComponentTemplate['component_template']
'@timestamp': date,
id: keyword,
doc_id: { type: 'text', fielddata: true },
title: { type: 'text', fielddata: true },
title: {
type: 'text',
fields: {
keyword: {
type: 'keyword',
ignore_above: 256,
},
},
},
user: {
properties: {
id: keyword,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ export interface RecalledEntry {
function isModelMissingOrUnavailableError(error: Error) {
return (
error instanceof errors.ResponseError &&
(error.body.error.type === 'resource_not_found_exception' ||
error.body.error.type === 'status_exception')
(error.body.error?.type === 'resource_not_found_exception' ||
error.body.error?.type === 'status_exception')
);
}
function isCreateModelValidationError(error: Error) {
Expand All @@ -70,7 +70,7 @@ export enum KnowledgeBaseEntryOperationType {

interface KnowledgeBaseDeleteOperation {
type: KnowledgeBaseEntryOperationType.Delete;
doc_id?: string;
groupId?: string;
labels?: Record<string, string>;
}

Expand All @@ -84,7 +84,7 @@ export type KnowledgeBaseEntryOperation =
| KnowledgeBaseIndexOperation;

export class KnowledgeBaseService {
private hasSetup: boolean = false;
private isModelReady: boolean = false;

private _queue: KnowledgeBaseEntryOperation[] = [];

Expand All @@ -93,6 +93,7 @@ export class KnowledgeBaseService {
}

setup = async () => {
this.dependencies.logger.debug('Setting up knowledge base');
const elserModelId = await this.dependencies.getModelId();

const retryOptions = { factor: 1, minTimeout: 10000, retries: 12 };
Expand Down Expand Up @@ -189,7 +190,7 @@ export class KnowledgeBaseService {
);

if (isReady) {
return Promise.resolve();
return;
}

this.dependencies.logger.debug('Model is not allocated yet');
Expand Down Expand Up @@ -234,7 +235,7 @@ export class KnowledgeBaseService {
query: {
bool: {
filter: [
...(operation.doc_id ? [{ term: { _id: operation.doc_id } }] : []),
...(operation.groupId ? [{ term: { doc_id: operation.groupId } }] : []),
...(operation.labels
? map(operation.labels, (value, key) => {
return { term: { [key]: value } };
Expand All @@ -247,7 +248,7 @@ export class KnowledgeBaseService {
return;
} catch (error) {
this.dependencies.logger.error(
`Failed to delete document "${operation?.doc_id}" due to ${error.message}`
`Failed to delete document "${operation?.groupId}" due to ${error.message}`
);
this.dependencies.logger.debug(() => JSON.stringify(operation));
throw error;
Expand Down Expand Up @@ -275,7 +276,7 @@ export class KnowledgeBaseService {

this.dependencies.logger.debug(`Processing queue`);

this.hasSetup = true;
this.isModelReady = true;

this.dependencies.logger.info(`Processing ${this._queue.length} queue operations`);

Expand All @@ -292,16 +293,23 @@ export class KnowledgeBaseService {
)
);

this.dependencies.logger.info('Processed all queued operations');
this.dependencies.logger.info(`Finished processing ${operations.length} queued operations`);
}

queue(operations: KnowledgeBaseEntryOperation[]): void {
if (!operations.length) {
return;
}

if (!this.hasSetup) {
this._queue.push(...operations);
this.dependencies.logger.debug(
`Adding ${operations.length} operations to queue. Queue size now: ${this._queue.length})`
);
this._queue.push(...operations);

if (!this.isModelReady) {
this.dependencies.logger.debug(
`Delay processing ${operations.length} operations until knowledge base is ready`
);
return;
}

Expand All @@ -311,13 +319,18 @@ export class KnowledgeBaseService {
limiter(() => this.processOperation(operation))
);

Promise.all(limitedFunctions).catch((err) => {
this.dependencies.logger.error(`Failed to process all queued operations`);
this.dependencies.logger.error(err);
});
Promise.all(limitedFunctions)
.then(() => {
this.dependencies.logger.debug(`Processed all queued operations`);
})
.catch((err) => {
this.dependencies.logger.error(`Failed to process all queued operations`);
this.dependencies.logger.error(err);
});
}

status = async () => {
this.dependencies.logger.debug('Checking model status');
const elserModelId = await this.dependencies.getModelId();

try {
Expand All @@ -327,14 +340,23 @@ export class KnowledgeBaseService {
const elserModelStats = modelStats.trained_model_stats[0];
const deploymentState = elserModelStats.deployment_stats?.state;
const allocationState = elserModelStats.deployment_stats?.allocation_status.state;
const ready = deploymentState === 'started' && allocationState === 'fully_allocated';

this.dependencies.logger.debug(
`Model deployment state: ${deploymentState}, allocation state: ${allocationState}, ready: ${ready}`
);

return {
ready: deploymentState === 'started' && allocationState === 'fully_allocated',
ready,
deployment_state: deploymentState,
allocation_state: allocationState,
model_name: elserModelId,
};
} catch (error) {
this.dependencies.logger.debug(
`Failed to get status for model "${elserModelId}" due to ${error.message}`
);

return {
error: error instanceof errors.ResponseError ? error.body.error : String(error),
ready: false,
Expand Down Expand Up @@ -533,22 +555,24 @@ export class KnowledgeBaseService {
query: {
bool: {
filter: [
// filter title by query
...(query ? [{ wildcard: { doc_id: { value: `${query}*` } } }] : []),
// filter by search query
...(query
? [{ query_string: { query: `${query}*`, fields: ['doc_id', 'title'] } }]
: []),
{
// exclude user instructions
bool: { must_not: { term: { type: KnowledgeBaseType.UserInstruction } } },
},
],
},
},
sort: [
{
[String(sortBy)]: {
order: sortDirection,
},
},
],
sort:
sortBy === 'title'
? [
{ ['title.keyword']: { order: sortDirection } },
{ doc_id: { order: sortDirection } }, // sort by doc_id for backwards compatibility
]
: [{ [String(sortBy)]: { order: sortDirection } }],
size: 500,
_source: {
includes: [
Expand Down Expand Up @@ -661,6 +685,7 @@ export class KnowledgeBaseService {
document: {
'@timestamp': new Date().toISOString(),
...doc,
title: doc.title ?? doc.doc_id,
user,
namespace,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ export function splitKbText({
return [
{
type: KnowledgeBaseEntryOperationType.Delete,
doc_id: id,
groupId: id, // delete all entries with the same groupId
labels: {},
},
...texts.map((text, index) => ({
type: KnowledgeBaseEntryOperationType.Index,
document: merge({}, rest, {
id: [id, index].join('_'),
doc_id: id,
doc_id: id, // group_id is used to group entries together
labels: {},
text,
}),
Expand Down
Loading

0 comments on commit 14854d2

Please sign in to comment.