docs: Add comprehensive documentation for KnowledgeService concurrent processing mechanism
- Created detailed technical documentation explaining the KnowledgeService's task queue and workload management - Added JSDoc comment to KnowledgeService.ts to provide high-level overview - Documented key aspects of concurrent task processing, including workload evaluation, task state management, and resource optimization strategies
This commit is contained in:
parent
1b09bb47bf
commit
687f140a5c
174
docs/technical/KnowledgeService.md
Normal file
174
docs/technical/KnowledgeService.md
Normal file
@ -0,0 +1,174 @@
|
|||||||
|
我来详细介绍 KnowledgeService 中控制并发处理的项目数量和工作负载的部分。
|
||||||
|
|
||||||
|
## 并发控制与工作负载管理机制
|
||||||
|
|
||||||
|
KnowledgeService 实现了一个精细的任务队列系统,用于控制并发处理的项目数量和工作负载。这个系统主要通过以下几个关键部分实现:
|
||||||
|
|
||||||
|
### 1. 关键变量和限制
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
private workload = 0
|
||||||
|
private processingItemCount = 0
|
||||||
|
private knowledgeItemProcessingQueueMappingPromise: Map<LoaderTaskOfSet, () => void> = new Map()
|
||||||
|
private static MAXIMUM_WORKLOAD = 1024 * 1024 * 80 // 约80MB
|
||||||
|
private static MAXIMUM_PROCESSING_ITEM_COUNT = 30
|
||||||
|
```
|
||||||
|
|
||||||
|
- `workload`: 跟踪当前正在处理的总工作量(以字节为单位)
|
||||||
|
- `processingItemCount`: 跟踪当前正在处理的项目数量
|
||||||
|
- `MAXIMUM_WORKLOAD`: 设置最大工作负载为80MB
|
||||||
|
- `MAXIMUM_PROCESSING_ITEM_COUNT`: 设置最大并发处理项目数为30个
|
||||||
|
|
||||||
|
### 2. 工作负载评估
|
||||||
|
|
||||||
|
每个任务都有一个评估工作负载的机制,通过 `evaluateTaskWorkload` 属性来表示:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
interface EvaluateTaskWorkload {
|
||||||
|
workload: number
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
不同类型的任务有不同的工作负载评估方式:
|
||||||
|
|
||||||
|
- 文件任务:使用文件大小作为工作负载 `{ workload: file.size }`
|
||||||
|
- URL任务:使用固定值 `{ workload: 1024 * 1024 * 2 }` (约2MB)
|
||||||
|
- 网站地图任务:使用固定值 `{ workload: 1024 * 1024 * 20 }` (约20MB)
|
||||||
|
- 笔记任务:使用文本内容的字节长度 `{ workload: contentBytes.length }`
|
||||||
|
|
||||||
|
### 3. 任务状态管理
|
||||||
|
|
||||||
|
任务通过状态枚举来跟踪其生命周期:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
enum LoaderTaskItemState {
|
||||||
|
PENDING, // 等待处理
|
||||||
|
PROCESSING, // 正在处理
|
||||||
|
DONE // 已完成
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 4. 任务队列处理核心逻辑
|
||||||
|
|
||||||
|
核心的队列处理逻辑在 `processingQueueHandle` 方法中:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
private processingQueueHandle() {
|
||||||
|
const getSubtasksUntilMaximumLoad = (): QueueTaskItem[] => {
|
||||||
|
const queueTaskList: QueueTaskItem[] = []
|
||||||
|
that: for (const [task, resolve] of this.knowledgeItemProcessingQueueMappingPromise) {
|
||||||
|
for (const item of task.loaderTasks) {
|
||||||
|
if (this.maximumLoad()) {
|
||||||
|
break that
|
||||||
|
}
|
||||||
|
|
||||||
|
const { state, task: taskPromise, evaluateTaskWorkload } = item
|
||||||
|
|
||||||
|
if (state !== LoaderTaskItemState.PENDING) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
const { workload } = evaluateTaskWorkload
|
||||||
|
this.workload += workload
|
||||||
|
this.processingItemCount += 1
|
||||||
|
item.state = LoaderTaskItemState.PROCESSING
|
||||||
|
queueTaskList.push({
|
||||||
|
taskPromise: () =>
|
||||||
|
taskPromise().then(() => {
|
||||||
|
this.workload -= workload
|
||||||
|
this.processingItemCount -= 1
|
||||||
|
task.loaderTasks.delete(item)
|
||||||
|
if (task.loaderTasks.size === 0) {
|
||||||
|
this.knowledgeItemProcessingQueueMappingPromise.delete(task)
|
||||||
|
resolve()
|
||||||
|
}
|
||||||
|
this.processingQueueHandle()
|
||||||
|
}),
|
||||||
|
resolve: () => {},
|
||||||
|
evaluateTaskWorkload
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return queueTaskList
|
||||||
|
}
|
||||||
|
|
||||||
|
const subTasks = getSubtasksUntilMaximumLoad()
|
||||||
|
if (subTasks.length > 0) {
|
||||||
|
const subTaskPromises = subTasks.map(({ taskPromise }) => taskPromise())
|
||||||
|
Promise.all(subTaskPromises).then(() => {
|
||||||
|
subTasks.forEach(({ resolve }) => resolve())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
这个方法的工作流程是:
|
||||||
|
|
||||||
|
1. 遍历所有待处理的任务集合
|
||||||
|
2. 对于每个任务集合中的每个子任务:
|
||||||
|
- 检查是否已达到最大负载(通过 `maximumLoad()` 方法)
|
||||||
|
- 如果任务状态为 PENDING,则:
|
||||||
|
- 增加当前工作负载和处理项目计数
|
||||||
|
- 将任务状态更新为 PROCESSING
|
||||||
|
- 将任务添加到待执行队列
|
||||||
|
3. 执行所有收集到的子任务
|
||||||
|
4. 当子任务完成时:
|
||||||
|
- 减少工作负载和处理项目计数
|
||||||
|
- 从任务集合中移除已完成的任务
|
||||||
|
- 如果任务集合为空,则解析相应的 Promise
|
||||||
|
- 递归调用 `processingQueueHandle()` 以处理更多任务
|
||||||
|
|
||||||
|
### 5. 负载检查
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
private maximumLoad() {
|
||||||
|
return (
|
||||||
|
this.processingItemCount >= KnowledgeService.MAXIMUM_PROCESSING_ITEM_COUNT ||
|
||||||
|
this.workload >= KnowledgeService.MAXIMUM_WORKLOAD
|
||||||
|
)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
这个方法检查当前是否已达到最大负载,通过两个条件:
|
||||||
|
|
||||||
|
- 处理项目数量达到上限(30个)
|
||||||
|
- 总工作负载达到上限(80MB)
|
||||||
|
|
||||||
|
### 6. 任务添加与执行流程
|
||||||
|
|
||||||
|
当添加新任务时,流程如下:
|
||||||
|
|
||||||
|
1. 创建任务(根据类型不同创建不同的任务)
|
||||||
|
2. 通过 `appendProcessingQueue` 将任务添加到队列
|
||||||
|
3. 调用 `processingQueueHandle` 开始处理队列中的任务
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
private appendProcessingQueue(task: LoaderTask): Promise<LoaderReturn> {
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
this.knowledgeItemProcessingQueueMappingPromise.set(loaderTaskIntoOfSet(task), () => {
|
||||||
|
resolve(task.loaderDoneReturn!)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## 并发控制的优势
|
||||||
|
|
||||||
|
这种并发控制机制有几个重要优势:
|
||||||
|
|
||||||
|
1. **资源使用优化**:通过限制同时处理的项目数量和总工作负载,避免系统资源过度使用
|
||||||
|
2. **自动调节**:当任务完成时,会自动从队列中获取新任务,保持资源的高效利用
|
||||||
|
3. **灵活性**:不同类型的任务有不同的工作负载评估,更准确地反映实际资源需求
|
||||||
|
4. **可靠性**:通过状态管理和Promise解析机制,确保任务正确完成并通知调用者
|
||||||
|
|
||||||
|
## 实际应用场景
|
||||||
|
|
||||||
|
这种并发控制在处理大量数据时特别有用,例如:
|
||||||
|
|
||||||
|
- 导入大型目录时,可能包含数百个文件
|
||||||
|
- 处理大型网站地图,可能包含大量URL
|
||||||
|
- 处理多个用户同时添加知识库项目的请求
|
||||||
|
|
||||||
|
通过这种机制,系统可以平滑地处理大量请求,避免资源耗尽,同时保持良好的响应性。
|
||||||
|
|
||||||
|
总结来说,KnowledgeService 实现了一个复杂而高效的任务队列系统,通过精确控制并发处理的项目数量和工作负载,确保系统在处理大量数据时保持稳定和高效。
|
||||||
@ -1,3 +1,18 @@
|
|||||||
|
/**
|
||||||
|
* Knowledge Service - Manages knowledge bases using RAG (Retrieval-Augmented Generation)
|
||||||
|
*
|
||||||
|
* This service handles creation, management, and querying of knowledge bases from various sources
|
||||||
|
* including files, directories, URLs, sitemaps, and notes.
|
||||||
|
*
|
||||||
|
* Features:
|
||||||
|
* - Concurrent task processing with workload management
|
||||||
|
* - Multiple data source support
|
||||||
|
* - Vector database integration
|
||||||
|
*
|
||||||
|
* For detailed documentation, see:
|
||||||
|
* @see {@link ../../../docs/technical/KnowledgeService.md}
|
||||||
|
*/
|
||||||
|
|
||||||
import * as fs from 'node:fs'
|
import * as fs from 'node:fs'
|
||||||
import path from 'node:path'
|
import path from 'node:path'
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user