feat: Improve file upload for the knowledge base (#2544)

* feat: Improve file upload for the knowledge base

* feat: Improve file upload for the knowledge base

* feat: Improve file upload for the knowledge base

---------

Co-authored-by: 亢奋猫 <kangfenmao@qq.com>
This commit is contained in:
icinggslits 2025-03-02 20:15:51 +08:00 committed by kangfenmao
parent 808b457503
commit 1b09bb47bf
2 changed files with 367 additions and 91 deletions

View File

@ -8,17 +8,77 @@ import { SitemapLoader } from '@llm-tools/embedjs-loader-sitemap'
import { WebLoader } from '@llm-tools/embedjs-loader-web' import { WebLoader } from '@llm-tools/embedjs-loader-web'
import { AzureOpenAiEmbeddings, OpenAiEmbeddings } from '@llm-tools/embedjs-openai' import { AzureOpenAiEmbeddings, OpenAiEmbeddings } from '@llm-tools/embedjs-openai'
import { addFileLoader } from '@main/loader' import { addFileLoader } from '@main/loader'
import { windowService } from '@main/services/WindowService'
import { getInstanceName } from '@main/utils' import { getInstanceName } from '@main/utils'
import { getAllFiles } from '@main/utils/file' import { getAllFiles } from '@main/utils/file'
import type { LoaderReturn } from '@shared/config/types' import type { LoaderReturn } from '@shared/config/types'
import { FileType, KnowledgeBaseParams, KnowledgeItem } from '@types' import { FileType, KnowledgeBaseParams, KnowledgeItem } from '@types'
import { app } from 'electron' import { app } from 'electron'
import Logger from 'electron-log'
import { v4 as uuidv4 } from 'uuid' import { v4 as uuidv4 } from 'uuid'
import { windowService } from './WindowService' export interface KnowledgeBaseAddItemOptions {
base: KnowledgeBaseParams
item: KnowledgeItem
forceReload?: boolean
}
interface KnowledgeBaseAddItemOptionsNonNullableAttribute {
base: KnowledgeBaseParams
item: KnowledgeItem
forceReload: boolean
}
interface EvaluateTaskWorkload {
workload: number
}
type LoaderDoneReturn = LoaderReturn | null
enum LoaderTaskItemState {
PENDING,
PROCESSING,
DONE
}
interface LoaderTaskItem {
state: LoaderTaskItemState
task: () => Promise<unknown>
evaluateTaskWorkload: EvaluateTaskWorkload
}
interface LoaderTask {
loaderTasks: LoaderTaskItem[]
loaderDoneReturn: LoaderDoneReturn
}
interface LoaderTaskOfSet {
loaderTasks: Set<LoaderTaskItem>
loaderDoneReturn: LoaderDoneReturn
}
interface QueueTaskItem {
taskPromise: () => Promise<unknown>
resolve: () => void
evaluateTaskWorkload: EvaluateTaskWorkload
}
const loaderTaskIntoOfSet = (loaderTask: LoaderTask): LoaderTaskOfSet => {
return {
loaderTasks: new Set(loaderTask.loaderTasks),
loaderDoneReturn: loaderTask.loaderDoneReturn
}
}
class KnowledgeService { class KnowledgeService {
private storageDir = path.join(app.getPath('userData'), 'Data', 'KnowledgeBase') private storageDir = path.join(app.getPath('userData'), 'Data', 'KnowledgeBase')
// Byte based
private workload = 0
private processingItemCount = 0
private knowledgeItemProcessingQueueMappingPromise: Map<LoaderTaskOfSet, () => void> = new Map()
private static MAXIMUM_WORKLOAD = 1024 * 1024 * 80
private static MAXIMUM_PROCESSING_ITEM_COUNT = 30
private static ERROR_LOADER_RETURN: LoaderReturn = { entriesAdded: 0, uniqueId: '', uniqueIds: [''], loaderType: '' }
constructor() { constructor() {
this.initStorageDir() this.initStorageDir()
@ -79,11 +139,52 @@ class KnowledgeService {
} }
} }
public add = async ( private maximumLoad() {
_: Electron.IpcMainInvokeEvent, return (
{ base, item, forceReload = false }: { base: KnowledgeBaseParams; item: KnowledgeItem; forceReload: boolean } this.processingItemCount >= KnowledgeService.MAXIMUM_PROCESSING_ITEM_COUNT ||
): Promise<LoaderReturn> => { this.workload >= KnowledgeService.MAXIMUM_WORKLOAD
const ragApplication = await this.getRagApplication(base) )
}
private fileTask(
ragApplication: RAGApplication,
options: KnowledgeBaseAddItemOptionsNonNullableAttribute
): LoaderTask {
const { base, item, forceReload } = options
const file = item.content as FileType
const loaderTask: LoaderTask = {
loaderTasks: [
{
state: LoaderTaskItemState.PENDING,
task: () =>
addFileLoader(ragApplication, file, base, forceReload)
.then((result) => {
loaderTask.loaderDoneReturn = result
return result
})
.catch((err) => {
Logger.error(err)
return KnowledgeService.ERROR_LOADER_RETURN
}),
evaluateTaskWorkload: { workload: file.size }
}
],
loaderDoneReturn: null
}
return loaderTask
}
private directoryTask(
ragApplication: RAGApplication,
options: KnowledgeBaseAddItemOptionsNonNullableAttribute
): LoaderTask {
const { base, item, forceReload } = options
const directory = item.content as string
const files = getAllFiles(directory)
const totalFiles = files.length
let processedFiles = 0
const sendDirectoryProcessingPercent = (totalFiles: number, processedFiles: number) => { const sendDirectoryProcessingPercent = (totalFiles: number, processedFiles: number) => {
const mainWindow = windowService.getMainWindow() const mainWindow = windowService.getMainWindow()
@ -93,86 +194,257 @@ class KnowledgeService {
}) })
} }
if (item.type === 'directory') { const loaderDoneReturn: LoaderDoneReturn = {
const directory = item.content as string entriesAdded: 0,
const files = getAllFiles(directory) uniqueId: `DirectoryLoader_${uuidv4()}`,
const totalFiles = files.length uniqueIds: [],
let processedFiles = 0 loaderType: 'DirectoryLoader'
}
const loaderPromises = files.map(async (file) => { const loaderTasks: LoaderTaskItem[] = []
const result = await addFileLoader(ragApplication, file, base, forceReload) for (const file of files) {
processedFiles++ loaderTasks.push({
sendDirectoryProcessingPercent(totalFiles, processedFiles) state: LoaderTaskItemState.PENDING,
return result task: () =>
addFileLoader(ragApplication, file, base, forceReload)
.then((result) => {
loaderDoneReturn.entriesAdded += 1
processedFiles += 1
sendDirectoryProcessingPercent(totalFiles, processedFiles)
loaderDoneReturn.uniqueIds.push(result.uniqueId)
return result
})
.catch((err) => {
Logger.error(err)
return KnowledgeService.ERROR_LOADER_RETURN
}),
evaluateTaskWorkload: { workload: file.size }
}) })
const loaderResults = await Promise.allSettled(loaderPromises)
// @ts-ignore uniqueId
const uniqueIds = loaderResults
.filter((result) => result.status === 'fulfilled')
.map((result) => result.value.uniqueId)
return {
entriesAdded: loaderResults.length,
uniqueId: `DirectoryLoader_${uuidv4()}`,
uniqueIds,
loaderType: 'DirectoryLoader'
} as LoaderReturn
} }
if (item.type === 'url') { return {
const content = item.content as string loaderTasks,
if (content.startsWith('http')) { loaderDoneReturn
const loaderReturn = await ragApplication.addLoader( }
new WebLoader({ urlOrContent: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }) as any, }
forceReload
) private urlTask(
return { ragApplication: RAGApplication,
entriesAdded: loaderReturn.entriesAdded, options: KnowledgeBaseAddItemOptionsNonNullableAttribute
uniqueId: loaderReturn.uniqueId, ): LoaderTask {
uniqueIds: [loaderReturn.uniqueId], const { base, item, forceReload } = options
loaderType: loaderReturn.loaderType const content = item.content as string
} as LoaderReturn
const loaderTask: LoaderTask = {
loaderTasks: [
{
state: LoaderTaskItemState.PENDING,
task: () => {
const loaderReturn = ragApplication.addLoader(
new WebLoader({
urlOrContent: content,
chunkSize: base.chunkSize,
chunkOverlap: base.chunkOverlap
}),
forceReload
) as Promise<LoaderReturn>
return loaderReturn
.then((result) => {
const { entriesAdded, uniqueId, loaderType } = result
loaderTask.loaderDoneReturn = {
entriesAdded: entriesAdded,
uniqueId: uniqueId,
uniqueIds: [uniqueId],
loaderType: loaderType
}
return result
})
.catch((err) => {
Logger.error(err)
return KnowledgeService.ERROR_LOADER_RETURN
})
},
evaluateTaskWorkload: { workload: 1024 * 1024 * 2 }
}
],
loaderDoneReturn: null
}
return loaderTask
}
private sitemapTask(
ragApplication: RAGApplication,
options: KnowledgeBaseAddItemOptionsNonNullableAttribute
): LoaderTask {
const { base, item, forceReload } = options
const content = item.content as string
const loaderTask: LoaderTask = {
loaderTasks: [
{
state: LoaderTaskItemState.PENDING,
task: () =>
ragApplication
.addLoader(
new SitemapLoader({ url: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }) as any,
forceReload
)
.then((result) => {
const { entriesAdded, uniqueId, loaderType } = result
loaderTask.loaderDoneReturn = {
entriesAdded: entriesAdded,
uniqueId: uniqueId,
uniqueIds: [uniqueId],
loaderType: loaderType
}
return result
})
.catch((err) => {
Logger.error(err)
return KnowledgeService.ERROR_LOADER_RETURN
}),
evaluateTaskWorkload: { workload: 1024 * 1024 * 20 }
}
],
loaderDoneReturn: null
}
return loaderTask
}
private noteTask(
ragApplication: RAGApplication,
options: KnowledgeBaseAddItemOptionsNonNullableAttribute
): LoaderTask {
const { base, item, forceReload } = options
const content = item.content as string
console.debug('chunkSize', base.chunkSize)
const encoder = new TextEncoder()
const contentBytes = encoder.encode(content)
const loaderTask: LoaderTask = {
loaderTasks: [
{
state: LoaderTaskItemState.PENDING,
task: () => {
const loaderReturn = ragApplication.addLoader(
new TextLoader({ text: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }),
forceReload
) as Promise<LoaderReturn>
return loaderReturn
.then(({ entriesAdded, uniqueId, loaderType }) => {
loaderTask.loaderDoneReturn = {
entriesAdded: entriesAdded,
uniqueId: uniqueId,
uniqueIds: [uniqueId],
loaderType: loaderType
}
})
.catch((err) => {
Logger.error(err)
return KnowledgeService.ERROR_LOADER_RETURN
})
},
evaluateTaskWorkload: { workload: contentBytes.length }
}
],
loaderDoneReturn: null
}
return loaderTask
}
private processingQueueHandle() {
const getSubtasksUntilMaximumLoad = (): QueueTaskItem[] => {
const queueTaskList: QueueTaskItem[] = []
that: for (const [task, resolve] of this.knowledgeItemProcessingQueueMappingPromise) {
for (const item of task.loaderTasks) {
if (this.maximumLoad()) {
break that
}
const { state, task: taskPromise, evaluateTaskWorkload } = item
if (state !== LoaderTaskItemState.PENDING) {
continue
}
const { workload } = evaluateTaskWorkload
this.workload += workload
this.processingItemCount += 1
item.state = LoaderTaskItemState.PROCESSING
queueTaskList.push({
taskPromise: () =>
taskPromise().then(() => {
this.workload -= workload
this.processingItemCount -= 1
task.loaderTasks.delete(item)
if (task.loaderTasks.size === 0) {
this.knowledgeItemProcessingQueueMappingPromise.delete(task)
resolve()
}
this.processingQueueHandle()
}),
resolve: () => {},
evaluateTaskWorkload
})
}
} }
return queueTaskList
} }
const subTasks = getSubtasksUntilMaximumLoad()
if (item.type === 'sitemap') { if (subTasks.length > 0) {
const content = item.content as string const subTaskPromises = subTasks.map(({ taskPromise }) => taskPromise())
// @ts-ignore loader type Promise.all(subTaskPromises).then(() => {
const loaderReturn = await ragApplication.addLoader( subTasks.forEach(({ resolve }) => resolve())
new SitemapLoader({ url: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }) as any, })
forceReload
)
return {
entriesAdded: loaderReturn.entriesAdded,
uniqueId: loaderReturn.uniqueId,
uniqueIds: [loaderReturn.uniqueId],
loaderType: loaderReturn.loaderType
} as LoaderReturn
} }
}
if (item.type === 'note') { private appendProcessingQueue(task: LoaderTask): Promise<LoaderReturn> {
const content = item.content as string return new Promise((resolve) => {
console.debug('chunkSize', base.chunkSize) this.knowledgeItemProcessingQueueMappingPromise.set(loaderTaskIntoOfSet(task), () => {
const loaderReturn = await ragApplication.addLoader( resolve(task.loaderDoneReturn!)
new TextLoader({ text: content, chunkSize: base.chunkSize, chunkOverlap: base.chunkOverlap }), })
forceReload })
) }
return {
entriesAdded: loaderReturn.entriesAdded,
uniqueId: loaderReturn.uniqueId,
uniqueIds: [loaderReturn.uniqueId],
loaderType: loaderReturn.loaderType
} as LoaderReturn
}
if (item.type === 'file') { public add = (_: Electron.IpcMainInvokeEvent, options: KnowledgeBaseAddItemOptions): Promise<LoaderReturn> => {
const file = item.content as FileType return new Promise((resolve) => {
const { base, item, forceReload = false } = options
const optionsNonNullableAttribute = { base, item, forceReload }
this.getRagApplication(base)
.then((ragApplication) => {
const task = (() => {
switch (item.type) {
case 'file':
return this.fileTask(ragApplication, optionsNonNullableAttribute)
case 'directory':
return this.directoryTask(ragApplication, optionsNonNullableAttribute)
case 'url':
return this.urlTask(ragApplication, optionsNonNullableAttribute)
case 'sitemap':
return this.sitemapTask(ragApplication, optionsNonNullableAttribute)
case 'note':
return this.noteTask(ragApplication, optionsNonNullableAttribute)
default:
return null
}
})()
return await addFileLoader(ragApplication, file, base, forceReload) if (task) {
} this.appendProcessingQueue(task).then(() => {
resolve(task.loaderDoneReturn!)
return { entriesAdded: 0, uniqueId: '', uniqueIds: [''], loaderType: '' } })
this.processingQueueHandle()
} else {
resolve(KnowledgeService.ERROR_LOADER_RETURN)
}
})
.catch((err) => {
Logger.error(err)
resolve(KnowledgeService.ERROR_LOADER_RETURN)
})
})
} }
public remove = async ( public remove = async (

View File

@ -51,20 +51,24 @@ class KnowledgeQueue {
throw new Error('Knowledge base not found') throw new Error('Knowledge base not found')
} }
const processableItems = base.items.filter((item) => { const findProcessableItem = () => {
if (item.processingStatus === 'failed') { const state = store.getState()
return !item.retryCount || item.retryCount < this.MAX_RETRIES const base = state.knowledge.bases.find((b) => b.id === baseId)
} return (
return item.processingStatus === 'pending' base?.items.find((item) => {
}) if (item.processingStatus === 'failed') {
return !item.retryCount || item.retryCount < this.MAX_RETRIES
} else {
return item.processingStatus === 'pending'
}
}) ?? null
)
}
for (const item of processableItems) { let processableItem = findProcessableItem()
if (!this.processing.get(baseId)) { while (processableItem) {
console.log(`[KnowledgeQueue] Processing interrupted for base ${baseId}`) this.processItem(baseId, processableItem).then()
break processableItem = findProcessableItem()
}
this.processItem(baseId, item)
} }
} finally { } finally {
console.log(`[KnowledgeQueue] Finished processing queue for base ${baseId}`) console.log(`[KnowledgeQueue] Finished processing queue for base ${baseId}`)
@ -153,7 +157,7 @@ class KnowledgeQueue {
} }
console.debug(`[KnowledgeQueue] Updated uniqueId for item ${item.id} in base ${baseId} `) console.debug(`[KnowledgeQueue] Updated uniqueId for item ${item.id} in base ${baseId} `)
setTimeout(() => store.dispatch(clearCompletedProcessing({ baseId })), 1000) store.dispatch(clearCompletedProcessing({ baseId }))
} catch (error) { } catch (error) {
console.error(`[KnowledgeQueue] Error processing item ${item.id}: `, error) console.error(`[KnowledgeQueue] Error processing item ${item.id}: `, error)
store.dispatch( store.dispatch(