diff --git a/docs/technical/KnowledgeService.md b/docs/technical/KnowledgeService.md new file mode 100644 index 00000000..0a5b51e0 --- /dev/null +++ b/docs/technical/KnowledgeService.md @@ -0,0 +1,174 @@ +我来详细介绍 KnowledgeService 中控制并发处理的项目数量和工作负载的部分。 + +## 并发控制与工作负载管理机制 + +KnowledgeService 实现了一个精细的任务队列系统,用于控制并发处理的项目数量和工作负载。这个系统主要通过以下几个关键部分实现: + +### 1. 关键变量和限制 + +```typescript +private workload = 0 +private processingItemCount = 0 +private knowledgeItemProcessingQueueMappingPromise: Map void> = new Map() +private static MAXIMUM_WORKLOAD = 1024 * 1024 * 80 // 约80MB +private static MAXIMUM_PROCESSING_ITEM_COUNT = 30 +``` + +- `workload`: 跟踪当前正在处理的总工作量(以字节为单位) +- `processingItemCount`: 跟踪当前正在处理的项目数量 +- `MAXIMUM_WORKLOAD`: 设置最大工作负载为80MB +- `MAXIMUM_PROCESSING_ITEM_COUNT`: 设置最大并发处理项目数为30个 + +### 2. 工作负载评估 + +每个任务都有一个评估工作负载的机制,通过 `evaluateTaskWorkload` 属性来表示: + +```typescript +interface EvaluateTaskWorkload { + workload: number +} +``` + +不同类型的任务有不同的工作负载评估方式: + +- 文件任务:使用文件大小作为工作负载 `{ workload: file.size }` +- URL任务:使用固定值 `{ workload: 1024 * 1024 * 2 }` (约2MB) +- 网站地图任务:使用固定值 `{ workload: 1024 * 1024 * 20 }` (约20MB) +- 笔记任务:使用文本内容的字节长度 `{ workload: contentBytes.length }` + +### 3. 任务状态管理 + +任务通过状态枚举来跟踪其生命周期: + +```typescript +enum LoaderTaskItemState { + PENDING, // 等待处理 + PROCESSING, // 正在处理 + DONE // 已完成 +} +``` + +### 4. 任务队列处理核心逻辑 + +核心的队列处理逻辑在 `processingQueueHandle` 方法中: + +```typescript +private processingQueueHandle() { + const getSubtasksUntilMaximumLoad = (): QueueTaskItem[] => { + const queueTaskList: QueueTaskItem[] = [] + that: for (const [task, resolve] of this.knowledgeItemProcessingQueueMappingPromise) { + for (const item of task.loaderTasks) { + if (this.maximumLoad()) { + break that + } + + const { state, task: taskPromise, evaluateTaskWorkload } = item + + if (state !== LoaderTaskItemState.PENDING) { + continue + } + + const { workload } = evaluateTaskWorkload + this.workload += workload + this.processingItemCount += 1 + item.state = LoaderTaskItemState.PROCESSING + queueTaskList.push({ + taskPromise: () => + taskPromise().then(() => { + this.workload -= workload + this.processingItemCount -= 1 + task.loaderTasks.delete(item) + if (task.loaderTasks.size === 0) { + this.knowledgeItemProcessingQueueMappingPromise.delete(task) + resolve() + } + this.processingQueueHandle() + }), + resolve: () => {}, + evaluateTaskWorkload + }) + } + } + return queueTaskList + } + + const subTasks = getSubtasksUntilMaximumLoad() + if (subTasks.length > 0) { + const subTaskPromises = subTasks.map(({ taskPromise }) => taskPromise()) + Promise.all(subTaskPromises).then(() => { + subTasks.forEach(({ resolve }) => resolve()) + }) + } +} +``` + +这个方法的工作流程是: + +1. 遍历所有待处理的任务集合 +2. 对于每个任务集合中的每个子任务: + - 检查是否已达到最大负载(通过 `maximumLoad()` 方法) + - 如果任务状态为 PENDING,则: + - 增加当前工作负载和处理项目计数 + - 将任务状态更新为 PROCESSING + - 将任务添加到待执行队列 +3. 执行所有收集到的子任务 +4. 当子任务完成时: + - 减少工作负载和处理项目计数 + - 从任务集合中移除已完成的任务 + - 如果任务集合为空,则解析相应的 Promise + - 递归调用 `processingQueueHandle()` 以处理更多任务 + +### 5. 负载检查 + +```typescript +private maximumLoad() { + return ( + this.processingItemCount >= KnowledgeService.MAXIMUM_PROCESSING_ITEM_COUNT || + this.workload >= KnowledgeService.MAXIMUM_WORKLOAD + ) +} +``` + +这个方法检查当前是否已达到最大负载,通过两个条件: + +- 处理项目数量达到上限(30个) +- 总工作负载达到上限(80MB) + +### 6. 任务添加与执行流程 + +当添加新任务时,流程如下: + +1. 创建任务(根据类型不同创建不同的任务) +2. 通过 `appendProcessingQueue` 将任务添加到队列 +3. 调用 `processingQueueHandle` 开始处理队列中的任务 + +```typescript +private appendProcessingQueue(task: LoaderTask): Promise { + return new Promise((resolve) => { + this.knowledgeItemProcessingQueueMappingPromise.set(loaderTaskIntoOfSet(task), () => { + resolve(task.loaderDoneReturn!) + }) + }) +} +``` + +## 并发控制的优势 + +这种并发控制机制有几个重要优势: + +1. **资源使用优化**:通过限制同时处理的项目数量和总工作负载,避免系统资源过度使用 +2. **自动调节**:当任务完成时,会自动从队列中获取新任务,保持资源的高效利用 +3. **灵活性**:不同类型的任务有不同的工作负载评估,更准确地反映实际资源需求 +4. **可靠性**:通过状态管理和Promise解析机制,确保任务正确完成并通知调用者 + +## 实际应用场景 + +这种并发控制在处理大量数据时特别有用,例如: + +- 导入大型目录时,可能包含数百个文件 +- 处理大型网站地图,可能包含大量URL +- 处理多个用户同时添加知识库项目的请求 + +通过这种机制,系统可以平滑地处理大量请求,避免资源耗尽,同时保持良好的响应性。 + +总结来说,KnowledgeService 实现了一个复杂而高效的任务队列系统,通过精确控制并发处理的项目数量和工作负载,确保系统在处理大量数据时保持稳定和高效。 diff --git a/src/main/services/KnowledgeService.ts b/src/main/services/KnowledgeService.ts index b2632b56..f36d7deb 100644 --- a/src/main/services/KnowledgeService.ts +++ b/src/main/services/KnowledgeService.ts @@ -1,3 +1,18 @@ +/** + * Knowledge Service - Manages knowledge bases using RAG (Retrieval-Augmented Generation) + * + * This service handles creation, management, and querying of knowledge bases from various sources + * including files, directories, URLs, sitemaps, and notes. + * + * Features: + * - Concurrent task processing with workload management + * - Multiple data source support + * - Vector database integration + * + * For detailed documentation, see: + * @see {@link ../../../docs/technical/KnowledgeService.md} + */ + import * as fs from 'node:fs' import path from 'node:path'