From a22ec5c7b2184f59939340c22c3d72f8d1fadf16 Mon Sep 17 00:00:00 2001 From: icinggslits Date: Fri, 28 Feb 2025 14:16:35 +0800 Subject: [PATCH 1/3] feat: Improve file upload for the knowledge base --- src/main/services/KnowledgeService.ts | 406 +++++++++++++++++++---- src/renderer/src/queue/KnowledgeQueue.ts | 32 +- 2 files changed, 351 insertions(+), 87 deletions(-) diff --git a/src/main/services/KnowledgeService.ts b/src/main/services/KnowledgeService.ts index ae116bb41a..a5ed93e2e2 100644 --- a/src/main/services/KnowledgeService.ts +++ b/src/main/services/KnowledgeService.ts @@ -8,17 +8,77 @@ import { SitemapLoader } from '@llm-tools/embedjs-loader-sitemap' import { WebLoader } from '@llm-tools/embedjs-loader-web' import { AzureOpenAiEmbeddings, OpenAiEmbeddings } from '@llm-tools/embedjs-openai' import { addFileLoader } from '@main/loader' +import { windowService } from '@main/services/WindowService' import { getInstanceName } from '@main/utils' import { getAllFiles } from '@main/utils/file' import type { LoaderReturn } from '@shared/config/types' import { FileType, KnowledgeBaseParams, KnowledgeItem } from '@types' import { app } from 'electron' +import Logger from 'electron-log' import { v4 as uuidv4 } from 'uuid' -import { windowService } from './WindowService' +export interface KnowledgeBaseAddItemOptions { + base: KnowledgeBaseParams + item: KnowledgeItem + forceReload?: boolean +} + +interface KnowledgeBaseAddItemOptionsNonNullableAttribute { + base: KnowledgeBaseParams + item: KnowledgeItem + forceReload: boolean +} + +interface EvaluateTaskWorkload { + workload: number +} + +type LoaderDoneReturn = LoaderReturn | null + +enum LoaderTaskItemState { + PENDING, + PROCESSING, + DONE +} + +interface LoaderTaskItem { + state: LoaderTaskItemState + task: () => Promise + evaluateTaskWorkload: EvaluateTaskWorkload +} + +interface LoaderTask { + loaderTasks: LoaderTaskItem[] + loaderDoneReturn: LoaderDoneReturn +} + +interface LoaderTaskOfSet { + loaderTasks: Set + loaderDoneReturn: LoaderDoneReturn +} + +interface QueueTaskItem { + taskPromise: () => Promise + resolve: () => void + evaluateTaskWorkload: EvaluateTaskWorkload +} + +const loaderTaskIntoOfSet = (loaderTask: LoaderTask): LoaderTaskOfSet => { + return { + loaderTasks: new Set(loaderTask.loaderTasks), + loaderDoneReturn: loaderTask.loaderDoneReturn + } +} class KnowledgeService { private storageDir = path.join(app.getPath('userData'), 'Data', 'KnowledgeBase') + // Byte based + private workload = 0 + private processingItemCount = 0 + private knowledgeItemProcessingQueueMappingPromise: Map void> = new Map() + private static MAXIMUM_WORKLOAD = 1024 * 1024 * 80 + private static MAXIMUM_PROCESSING_ITEM_COUNT = 30 + private static ERROR_LOADER_RETURN: LoaderReturn = { entriesAdded: 0, uniqueId: '', uniqueIds: [''], loaderType: '' } constructor() { this.initStorageDir() @@ -79,11 +139,52 @@ class KnowledgeService { } } - public add = async ( - _: Electron.IpcMainInvokeEvent, - { base, item, forceReload = false }: { base: KnowledgeBaseParams; item: KnowledgeItem; forceReload: boolean } - ): Promise => { - const ragApplication = await this.getRagApplication(base) + private maximumLoad() { + return ( + this.processingItemCount >= KnowledgeService.MAXIMUM_PROCESSING_ITEM_COUNT || + this.workload >= KnowledgeService.MAXIMUM_WORKLOAD + ) + } + + private fileTask( + ragApplication: RAGApplication, + options: KnowledgeBaseAddItemOptionsNonNullableAttribute + ): LoaderTask { + const { base, item, forceReload } = options + const file = item.content as FileType + + const loaderTask: LoaderTask = { + loaderTasks: [ + { + state: LoaderTaskItemState.PENDING, + task: () => + addFileLoader(ragApplication, file, base, forceReload) + .then((result) => { + loaderTask.loaderDoneReturn = result + return result + }) + .catch((err) => { + Logger.error(err) + return KnowledgeService.ERROR_LOADER_RETURN + }), + evaluateTaskWorkload: { workload: file.size } + } + ], + loaderDoneReturn: null + } + + return loaderTask + } + + private directoryTask( + ragApplication: RAGApplication, + options: KnowledgeBaseAddItemOptionsNonNullableAttribute + ): LoaderTask { + const { base, item, forceReload } = options + const directory = item.content as string + const files = getAllFiles(directory) + const totalFiles = files.length + let processedFiles = 0 const sendDirectoryProcessingPercent = (totalFiles: number, processedFiles: number) => { const mainWindow = windowService.getMainWindow() @@ -93,86 +194,245 @@ class KnowledgeService { }) } - if (item.type === 'directory') { - const directory = item.content as string - const files = getAllFiles(directory) - const totalFiles = files.length - let processedFiles = 0 - - const loaderPromises = files.map(async (file) => { - const result = await addFileLoader(ragApplication, file, base, forceReload) - processedFiles++ - sendDirectoryProcessingPercent(totalFiles, processedFiles) - return result + const loaderDoneReturn: LoaderDoneReturn = { + entriesAdded: 0, + uniqueId: `DirectoryLoader_${uuidv4()}`, + uniqueIds: [], + loaderType: 'DirectoryLoader' + } + const loaderTasks: LoaderTaskItem[] = [] + for (const file of files) { + loaderTasks.push({ + state: LoaderTaskItemState.PENDING, + task: () => + addFileLoader(ragApplication, file, base, forceReload) + .then((result) => { + loaderDoneReturn.entriesAdded += 1 + processedFiles += 1 + sendDirectoryProcessingPercent(totalFiles, processedFiles) + loaderDoneReturn.uniqueIds.push(result.uniqueId) + return result + }) + .catch((err) => { + Logger.error(err) + return KnowledgeService.ERROR_LOADER_RETURN + }), + evaluateTaskWorkload: { workload: file.size } }) + } - const loaderResults = await Promise.allSettled(loaderPromises) - // @ts-ignore uniqueId - const uniqueIds = loaderResults - .filter((result) => result.status === 'fulfilled') - .map((result) => result.value.uniqueId) - - return { - entriesAdded: loaderResults.length, - uniqueId: `DirectoryLoader_${uuidv4()}`, - uniqueIds, - loaderType: 'DirectoryLoader' - } as LoaderReturn + return { + loaderTasks, + loaderDoneReturn } + } - if (item.type === 'url') { - const content = item.content as string - if (content.startsWith('http')) { - const loaderReturn = await ragApplication.addLoader( - new WebLoader({ urlOrContent: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }) as any, - forceReload - ) - return { - entriesAdded: loaderReturn.entriesAdded, - uniqueId: loaderReturn.uniqueId, - uniqueIds: [loaderReturn.uniqueId], - loaderType: loaderReturn.loaderType - } as LoaderReturn - } + private urlTask( + ragApplication: RAGApplication, + options: KnowledgeBaseAddItemOptionsNonNullableAttribute + ): LoaderTask { + const { base, item, forceReload } = options + const content = item.content as string + const loaderReturn = ragApplication.addLoader( + new WebLoader({ urlOrContent: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }) as any, + forceReload + ) + + const loaderTask: LoaderTask = { + loaderTasks: [ + { + state: LoaderTaskItemState.PENDING, + task: () => + loaderReturn + .then(({ entriesAdded, uniqueId, loaderType }) => { + loaderTask.loaderDoneReturn = { + entriesAdded: entriesAdded, + uniqueId: uniqueId, + uniqueIds: [uniqueId], + loaderType: loaderType + } + }) + .catch((err) => { + Logger.error(err) + return KnowledgeService.ERROR_LOADER_RETURN + }), + evaluateTaskWorkload: { workload: 1024 * 1024 * 2 } + } + ], + loaderDoneReturn: null } + return loaderTask + } - if (item.type === 'sitemap') { - const content = item.content as string - // @ts-ignore loader type - const loaderReturn = await ragApplication.addLoader( - new SitemapLoader({ url: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }) as any, - forceReload - ) - return { - entriesAdded: loaderReturn.entriesAdded, - uniqueId: loaderReturn.uniqueId, - uniqueIds: [loaderReturn.uniqueId], - loaderType: loaderReturn.loaderType - } as LoaderReturn + private sitemapTask( + ragApplication: RAGApplication, + options: KnowledgeBaseAddItemOptionsNonNullableAttribute + ): LoaderTask { + const { base, item, forceReload } = options + const content = item.content as string + const loaderReturn = ragApplication.addLoader( + new SitemapLoader({ url: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }) as any, + forceReload + ) + + const loaderTask: LoaderTask = { + loaderTasks: [ + { + state: LoaderTaskItemState.PENDING, + task: () => + loaderReturn + .then(({ entriesAdded, uniqueId, loaderType }) => { + loaderTask.loaderDoneReturn = { + entriesAdded: entriesAdded, + uniqueId: uniqueId, + uniqueIds: [uniqueId], + loaderType: loaderType + } + }) + .catch((err) => { + Logger.error(err) + return KnowledgeService.ERROR_LOADER_RETURN + }), + evaluateTaskWorkload: { workload: 1024 * 1024 * 20 } + } + ], + loaderDoneReturn: null } + return loaderTask + } - if (item.type === 'note') { - const content = item.content as string - console.debug('chunkSize', base.chunkSize) - const loaderReturn = await ragApplication.addLoader( - new TextLoader({ text: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }), - forceReload - ) - return { - entriesAdded: loaderReturn.entriesAdded, - uniqueId: loaderReturn.uniqueId, - uniqueIds: [loaderReturn.uniqueId], - loaderType: loaderReturn.loaderType - } as LoaderReturn + private noteTask( + ragApplication: RAGApplication, + options: KnowledgeBaseAddItemOptionsNonNullableAttribute + ): LoaderTask { + const { base, item, forceReload } = options + const content = item.content as string + console.debug('chunkSize', base.chunkSize) + const loaderReturn = ragApplication.addLoader( + new TextLoader({ text: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }), + forceReload + ) + + const encoder = new TextEncoder() + const contentBytes = encoder.encode(content) + const loaderTask: LoaderTask = { + loaderTasks: [ + { + state: LoaderTaskItemState.PENDING, + task: () => + loaderReturn + .then(({ entriesAdded, uniqueId, loaderType }) => { + loaderTask.loaderDoneReturn = { + entriesAdded: entriesAdded, + uniqueId: uniqueId, + uniqueIds: [uniqueId], + loaderType: loaderType + } + }) + .catch((err) => { + Logger.error(err) + return KnowledgeService.ERROR_LOADER_RETURN + }), + evaluateTaskWorkload: { workload: contentBytes.length } + } + ], + loaderDoneReturn: null } + return loaderTask + } + + private processingQueueHandle() { + const getSubtasksUntilMaximumLoad = (): QueueTaskItem[] => { + const queueTaskList: QueueTaskItem[] = [] + that: for (const [task, resolve] of this.knowledgeItemProcessingQueueMappingPromise) { + for (const item of task.loaderTasks) { + if (this.maximumLoad()) { + break that + } + + const { state, task: taskPromise, evaluateTaskWorkload } = item - if (item.type === 'file') { - const file = item.content as FileType + if (state !== LoaderTaskItemState.PENDING) { + break + } - return await addFileLoader(ragApplication, file, base, forceReload) + const { workload } = evaluateTaskWorkload + this.workload += workload + this.processingItemCount += 1 + item.state = LoaderTaskItemState.PROCESSING + queueTaskList.push({ + taskPromise: () => + taskPromise().then(() => { + this.workload -= workload + this.processingItemCount -= 1 + task.loaderTasks.delete(item) + if (task.loaderTasks.size === 0) { + this.knowledgeItemProcessingQueueMappingPromise.delete(task) + resolve() + } + this.processingQueueHandle() + }), + resolve: () => {}, + evaluateTaskWorkload + }) + } + } + return queueTaskList + } + const subTasks = getSubtasksUntilMaximumLoad() + if (subTasks.length > 0) { + const subTaskPromises = subTasks.map(({ taskPromise }) => taskPromise()) + Promise.all(subTaskPromises).then(() => { + subTasks.forEach(({ resolve }) => resolve()) + }) } + } + + private appendProcessingQueue(task: LoaderTask): Promise { + return new Promise((resolve) => { + this.knowledgeItemProcessingQueueMappingPromise.set(loaderTaskIntoOfSet(task), () => { + resolve(task.loaderDoneReturn!) + }) + }) + } + + public add = (_: Electron.IpcMainInvokeEvent, options: KnowledgeBaseAddItemOptions): Promise => { + return new Promise((resolve) => { + const { base, item, forceReload = false } = options + const optionsNonNullableAttribute = { base, item, forceReload } + this.getRagApplication(base) + .then((ragApplication) => { + const task = (() => { + switch (item.type) { + case 'file': + return this.fileTask(ragApplication, optionsNonNullableAttribute) + case 'directory': + return this.directoryTask(ragApplication, optionsNonNullableAttribute) + case 'url': + return this.urlTask(ragApplication, optionsNonNullableAttribute) + case 'sitemap': + return this.sitemapTask(ragApplication, optionsNonNullableAttribute) + case 'note': + return this.noteTask(ragApplication, optionsNonNullableAttribute) + default: + return null + } + })() - return { entriesAdded: 0, uniqueId: '', uniqueIds: [''], loaderType: '' } + if (task) { + this.appendProcessingQueue(task).then(() => { + resolve(task.loaderDoneReturn!) + }) + this.processingQueueHandle() + } else { + resolve(KnowledgeService.ERROR_LOADER_RETURN) + } + }) + .catch((err) => { + Logger.error(err) + resolve(KnowledgeService.ERROR_LOADER_RETURN) + }) + }) } public remove = async ( diff --git a/src/renderer/src/queue/KnowledgeQueue.ts b/src/renderer/src/queue/KnowledgeQueue.ts index a4ccef1342..daabd91793 100644 --- a/src/renderer/src/queue/KnowledgeQueue.ts +++ b/src/renderer/src/queue/KnowledgeQueue.ts @@ -51,20 +51,24 @@ class KnowledgeQueue { throw new Error('Knowledge base not found') } - const processableItems = base.items.filter((item) => { - if (item.processingStatus === 'failed') { - return !item.retryCount || item.retryCount < this.MAX_RETRIES - } - return item.processingStatus === 'pending' - }) - - for (const item of processableItems) { - if (!this.processing.get(baseId)) { - console.log(`[KnowledgeQueue] Processing interrupted for base ${baseId}`) - break - } + const findProcessableItem = () => { + const state = store.getState() + const base = state.knowledge.bases.find((b) => b.id === baseId) + return ( + base?.items.find((item) => { + if (item.processingStatus === 'failed') { + return !item.retryCount || item.retryCount < this.MAX_RETRIES + } else { + return item.processingStatus === 'pending' + } + }) ?? null + ) + } - this.processItem(baseId, item) + let processableItem = findProcessableItem() + while (processableItem) { + this.processItem(baseId, processableItem).then() + processableItem = findProcessableItem() } } finally { console.log(`[KnowledgeQueue] Finished processing queue for base ${baseId}`) @@ -153,7 +157,7 @@ class KnowledgeQueue { } console.debug(`[KnowledgeQueue] Updated uniqueId for item ${item.id} in base ${baseId} `) - setTimeout(() => store.dispatch(clearCompletedProcessing({ baseId })), 1000) + store.dispatch(clearCompletedProcessing({ baseId })) } catch (error) { console.error(`[KnowledgeQueue] Error processing item ${item.id}: `, error) store.dispatch( From bfee199b40955b47b076fb4e487c6fbe350ce4c9 Mon Sep 17 00:00:00 2001 From: icinggslits Date: Fri, 28 Feb 2025 17:10:23 +0800 Subject: [PATCH 2/3] feat: Improve file upload for the knowledge base --- src/main/services/KnowledgeService.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/services/KnowledgeService.ts b/src/main/services/KnowledgeService.ts index d9d5cb9417..e2be409fe6 100644 --- a/src/main/services/KnowledgeService.ts +++ b/src/main/services/KnowledgeService.ts @@ -356,7 +356,7 @@ class KnowledgeService { const { state, task: taskPromise, evaluateTaskWorkload } = item if (state !== LoaderTaskItemState.PENDING) { - break + continue } const { workload } = evaluateTaskWorkload From 5eeadf3e040ca227ed2835ddcf674a5914e2b37f Mon Sep 17 00:00:00 2001 From: icinggslits Date: Sat, 1 Mar 2025 02:50:17 +0800 Subject: [PATCH 3/3] feat: Improve file upload for the knowledge base --- src/main/services/KnowledgeService.ts | 54 ++++++++++++++++----------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/src/main/services/KnowledgeService.ts b/src/main/services/KnowledgeService.ts index e2be409fe6..ab0a901ddd 100644 --- a/src/main/services/KnowledgeService.ts +++ b/src/main/services/KnowledgeService.ts @@ -236,29 +236,37 @@ class KnowledgeService { ): LoaderTask { const { base, item, forceReload } = options const content = item.content as string - const loaderReturn = ragApplication.addLoader( - new WebLoader({ urlOrContent: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }) as any, - forceReload - ) const loaderTask: LoaderTask = { loaderTasks: [ { state: LoaderTaskItemState.PENDING, - task: () => - loaderReturn - .then(({ entriesAdded, uniqueId, loaderType }) => { + task: () => { + const loaderReturn = ragApplication.addLoader( + new WebLoader({ + urlOrContent: content, + chunkSize: base.chunkSize, + chunkOverlap: base.chunkOverlap + }), + forceReload + ) as Promise + + return loaderReturn + .then((result) => { + const { entriesAdded, uniqueId, loaderType } = result loaderTask.loaderDoneReturn = { entriesAdded: entriesAdded, uniqueId: uniqueId, uniqueIds: [uniqueId], loaderType: loaderType } + return result }) .catch((err) => { Logger.error(err) return KnowledgeService.ERROR_LOADER_RETURN - }), + }) + }, evaluateTaskWorkload: { workload: 1024 * 1024 * 2 } } ], @@ -273,24 +281,26 @@ class KnowledgeService { ): LoaderTask { const { base, item, forceReload } = options const content = item.content as string - const loaderReturn = ragApplication.addLoader( - new SitemapLoader({ url: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }) as any, - forceReload - ) const loaderTask: LoaderTask = { loaderTasks: [ { state: LoaderTaskItemState.PENDING, task: () => - loaderReturn - .then(({ entriesAdded, uniqueId, loaderType }) => { + ragApplication + .addLoader( + new SitemapLoader({ url: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }) as any, + forceReload + ) + .then((result) => { + const { entriesAdded, uniqueId, loaderType } = result loaderTask.loaderDoneReturn = { entriesAdded: entriesAdded, uniqueId: uniqueId, uniqueIds: [uniqueId], loaderType: loaderType } + return result }) .catch((err) => { Logger.error(err) @@ -311,10 +321,6 @@ class KnowledgeService { const { base, item, forceReload } = options const content = item.content as string console.debug('chunkSize', base.chunkSize) - const loaderReturn = ragApplication.addLoader( - new TextLoader({ text: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }), - forceReload - ) const encoder = new TextEncoder() const contentBytes = encoder.encode(content) @@ -322,8 +328,13 @@ class KnowledgeService { loaderTasks: [ { state: LoaderTaskItemState.PENDING, - task: () => - loaderReturn + task: () => { + const loaderReturn = ragApplication.addLoader( + new TextLoader({ text: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }), + forceReload + ) as Promise + + return loaderReturn .then(({ entriesAdded, uniqueId, loaderType }) => { loaderTask.loaderDoneReturn = { entriesAdded: entriesAdded, @@ -335,7 +346,8 @@ class KnowledgeService { .catch((err) => { Logger.error(err) return KnowledgeService.ERROR_LOADER_RETURN - }), + }) + }, evaluateTaskWorkload: { workload: contentBytes.length } } ],