# Multi-Agent Crew 编排与运行监控 — 实施规划 > 基于设计文档:`2026-04-14-multi-agent-crew-orchestration-design.md` > 日期:2026-04-14 > 状态:待审核 > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. --- ## 目录 - [总览](#总览) - [Phase 0:准备工作](#phase-0准备工作) - [Phase 1:后端数据层(Entity + Migration)](#phase-1后端数据层entity--migration) - [Phase 2:后端核心引擎(编排器 + 委派工具)](#phase-2后端核心引擎编排器--委派工具) - [Phase 3:后端通信层(WebSocket + 触发 + 定时)](#phase-3后端通信层websocket--触发--定时) - [Phase 4:前端编排页](#phase-4前端编排页) - [Phase 5:前端监控页](#phase-5前端监控页) - [Phase 6:权限集成 + 端到端测试](#phase-6权限集成--端到端测试) - [风险应对与降级方案](#风险应对与降级方案) - [验收标准](#验收标准) --- ## 总览 ### 实施策略 采用 **纵向切片、逐层推进** 策略。每个 Phase 完成后都有可验证的产出,允许中途停下来评审。 ``` Phase 0 ─ 准备(依赖验证、技术验证、代码骨架) │ Phase 1 ─ 后端数据层(4 个 Entity + agent 扩展字段) │ Phase 2 ─ 后端核心引擎(CrewOrchestrator + CrewDelegate + 3 个工具) │ Phase 3 ─ 后端通信层(CrewGateway + CrewTrigger + CrewScheduler) │ Phase 4 ─ 前端编排页(画布 + 侧栏 + 属性面板) │ Phase 5 ─ 前端监控页(运行列表 + 实时画布 + 时间线 + 日志) │ Phase 6 ─ 权限集成 + 端到端测试 ``` ### 关键约定 | 约定 | 说明 | |------|------| | **Entity 继承** | 全部继承 `BaseEntity`(自带 id/createTime/updateTime/tenantId) | | **Controller 风格** | 使用 `@CoolController` 自动 CRUD,复杂接口手写 | | **文件命名** | 下划线法(如 `crew_agent.ts`) | | **字段命名** | 驼峰法(如 `masterAgentId`) | | **WebSocket** | 独立 `/crew` 命名空间,不侵入现有 `/netaclaw` | | **子 Agent 执行** | 直接调用 `runAgent()` 纯函数,零 runtime 改动 | | **注释语言** | 中文 | ### 文件影响矩阵 ``` packages/backend/src/modules/netaclaw/ ├── entity/ ← Phase 1: 新增 4 个文件 │ ├── crew.ts [新增] │ ├── crew_agent.ts [新增] │ ├── crew_run.ts [新增] │ ├── crew_task.ts [新增] │ └── agent.ts [修改: +isCrewMaster 字段] ├── controller/ │ └── admin/ │ ├── crew.ts [新增] Phase 3 │ ├── crew_run.ts [新增] Phase 3 │ └── crew_trigger.ts [新增] Phase 3 ├── gateway/ │ └── crew_server.ts [新增] Phase 3 ├── service/ │ ├── crew.ts [新增] Phase 2 │ ├── crew_orchestrator.ts [新增] Phase 2 │ ├── crew_delegate.ts [新增] Phase 2 │ └── crew_scheduler.ts [新增] Phase 3 ├── tools/ │ ├── builtin/ │ │ ├── delegate_task.ts [新增] Phase 2 │ │ ├── delegate_parallel.ts [新增] Phase 2 │ │ └── escalate.ts [新增] Phase 2 └── config.ts [修改: 注册新 Entity] Phase 1 packages/frontend/src/modules/agent/ ├── config.ts [修改: +2 路由] Phase 4 ├── views/ │ ├── crew-editor.vue [新增] Phase 4 │ └── crew-monitor.vue [新增] Phase 5 ├── components/crew/ │ ├── crew-canvas.vue [新增] Phase 4 │ ├── crew-agent-node.vue [新增] Phase 4 │ ├── crew-edge-label.vue [新增] Phase 4 │ ├── crew-sidebar.vue [新增] Phase 4 │ ├── crew-property-panel.vue [新增] Phase 4 │ ├── crew-trigger-config.vue [新增] Phase 4 │ ├── crew-context-menu.vue [新增] Phase 4 │ ├── crew-run-table.vue [新增] Phase 5 │ ├── crew-run-detail.vue [新增] Phase 5 │ ├── crew-timeline.vue [新增] Phase 5 │ └── crew-log-panel.vue [新增] Phase 5 ├── hooks/ │ ├── crew-canvas.ts [新增] Phase 4 │ ├── crew-orchestration.ts [新增] Phase 4 │ └── crew-monitor.ts [新增] Phase 5 └── store/ └── crew.ts [新增] Phase 4 ``` --- ## Phase 0:准备工作 > 目标:搭建开发环境,确认依赖可用,创建代码骨架 > 预估:0.5 天 ### Step 0.1:确认前端依赖可用 项目已安装但未实际使用的关键依赖,需验证版本兼容: ```bash cd packages/frontend # 检查 @vue-flow 系列 pnpm list @vue-flow/core @vue-flow/minimap @vue-flow/controls @vue-flow/background # 检查 elkjs(自动布局) pnpm list elkjs ``` **预期结果**: - `@vue-flow/core` ≥ 1.42.1 ✓ - `@vue-flow/minimap`、`@vue-flow/controls`、`@vue-flow/background` 已安装 ✓ - `elkjs` ≥ 0.9.3 ✓ **如果缺失**: ```bash pnpm add @vue-flow/core @vue-flow/minimap @vue-flow/controls @vue-flow/background elkjs ``` ### Step 0.2:确认后端依赖 ```bash cd packages/backend # cron 包(定时调度用) pnpm list cron ``` **如果缺失**: ```bash pnpm add cron pnpm add -D @types/cron ``` ### Step 0.3:Vue Flow 技术验证(⚠️ 关键风险点) > 这是本项目首次实际使用 Vue Flow。在正式开发前,先用最小 demo 验证核心功能。 创建临时验证文件 `packages/frontend/src/modules/agent/views/__test-flow.vue`: ```vue ``` **验证清单**: - [ ] 画布正常渲染,节点可见 - [ ] 节点可自由拖拽,连线跟随 - [ ] MiniMap、Controls、Background 正常显示 - [ ] 自定义节点(slot 方式)可渲染 - [ ] 连线可通过 Handle 创建 **通过后删除测试文件**,正式进入 Phase 1。 ### Step 0.4:创建后端模块骨架目录 ```bash cd packages/backend/src/modules/netaclaw # 确认以下目录存在 ls entity/ ls controller/admin/ ls service/ ls gateway/ ls tools/builtin/ ``` ### Phase 0 验收 - [ ] 前端 Vue Flow 依赖可用且 demo 验证通过 - [ ] 后端 cron 依赖可用 - [ ] 目录结构就绪 --- ## Phase 1:后端数据层(Entity + Migration) > 目标:建立 4 个新表 + 扩展 agent 表,TypeORM synchronize 自动建表 > 预估:1 天 > 依赖:Phase 0 完成 ### Step 1.1:创建 `crew.ts` Entity **文件**:`packages/backend/src/modules/netaclaw/entity/crew.ts` ```typescript import { BaseEntity } from '../../base/entity/base.js'; import { Column, Entity, Index } from 'typeorm'; /** * Agent 集群(Crew) */ @Entity('netaclaw_crew') export class NetaClawCrewEntity extends BaseEntity { @Column({ comment: '集群唯一标识', length: 100, unique: true }) name: string; @Column({ comment: '显示名称', length: 200 }) label: string; @Column({ comment: '集群描述', type: 'text', nullable: true }) description: string; @Column({ comment: '图标', length: 500, nullable: true }) icon: string; @Index() @Column({ comment: '主 Agent ID', nullable: true }) masterAgentId: number; @Column({ type: 'json', comment: '画布布局数据', nullable: true }) canvasData: Record; @Column({ type: 'json', comment: '触发配置', nullable: true }) triggerConfig: { manual?: boolean; cron?: { enabled: boolean; expression: string; timezone: string }; webhook?: { enabled: boolean; secret: string }; api?: { enabled: boolean; apiKey: string }; }; @Column({ type: 'json', comment: '委派提示(从连线生成)', nullable: true }) delegateHints: { hints: string; edges: Array<{ from: string; to: string; type: string; note?: string }>; }; @Index() @Column({ comment: '状态: 0=草稿 1=已发布', default: 0 }) status: number; @Column({ comment: '最大并发子 Agent 数', default: 3 }) maxConcurrent: number; @Column({ comment: '单个子任务默认超时(秒)', default: 300 }) taskTimeout: number; @Column({ type: 'json', comment: '全局重试策略', nullable: true }) retryPolicy: { maxRetries: number; retryDelay: number }; } ``` **设计要点**: - `triggerConfig` / `delegateHints` / `retryPolicy` 使用 JSON 列,灵活扩展 - `masterAgentId` 不使用外键约束(遵循项目规范),通过应用层关联 - `status` 索引优化发布状态查询 ### Step 1.2:创建 `crew_agent.ts` Entity **文件**:`packages/backend/src/modules/netaclaw/entity/crew_agent.ts` ```typescript import { BaseEntity } from '../../base/entity/base.js'; import { Column, Entity, Index } from 'typeorm'; /** * 集群-Agent 关联(成员关系权威来源) */ @Entity('netaclaw_crew_agent') export class NetaClawCrewAgentEntity extends BaseEntity { @Index() @Column({ comment: '集群 ID' }) crewId: number; @Index() @Column({ comment: 'Agent ID' }) agentId: number; @Column({ comment: '该 Agent 在集群中的角色描述', type: 'text', nullable: true }) role: string; @Column({ type: 'json', comment: '画布位置', nullable: true }) canvasPosition: { x: number; y: number }; @Column({ comment: '分组名', length: 100, nullable: true }) groupName: string; } ``` **设计要点**: - `crewId + agentId` 双索引,支持高频关联查询 - `canvasPosition` 用 JSON 存画布坐标,前端序列化/反序列化 - 此表是成员关系的**权威来源**,`canvasData` 仅存渲染信息 ### Step 1.3:创建 `crew_run.ts` Entity **文件**:`packages/backend/src/modules/netaclaw/entity/crew_run.ts` ```typescript import { BaseEntity } from '../../base/entity/base.js'; import { Column, Entity, Index } from 'typeorm'; /** * 集群运行记录 */ @Entity('netaclaw_crew_run') export class NetaClawCrewRunEntity extends BaseEntity { @Index() @Column({ comment: '集群 ID' }) crewId: number; @Column({ comment: '触发方式', length: 20 }) triggerType: string; // manual / cron / webhook / api @Column({ comment: '触发输入参数', type: 'text', nullable: true }) triggerInput: string; @Index() @Column({ comment: '运行状态', length: 20, default: 'pending' }) status: string; // pending / running / paused / completed / failed @Column({ comment: '主 Agent 会话 ID', nullable: true }) masterSessionId: number; @Column({ comment: '开始时间', type: 'datetime', nullable: true }) startTime: Date; @Column({ comment: '结束时间', type: 'datetime', nullable: true }) endTime: Date; @Column({ type: 'json', comment: '运行结果摘要', nullable: true }) result: Record; @Column({ comment: '错误信息', type: 'text', nullable: true }) error: string; @Column({ type: 'json', comment: '升级暂停时保存的对话上下文', nullable: true }) pausedState: unknown; @Column({ type: 'json', comment: '累计 token 消耗', nullable: true }) tokenUsage: { inputTokens: number; outputTokens: number }; } ``` ### Step 1.4:创建 `crew_task.ts` Entity **文件**:`packages/backend/src/modules/netaclaw/entity/crew_task.ts` ```typescript import { BaseEntity } from '../../base/entity/base.js'; import { Column, Entity, Index } from 'typeorm'; /** * 子 Agent 任务记录 */ @Entity('netaclaw_crew_task') export class NetaClawCrewTaskEntity extends BaseEntity { @Index() @Column({ comment: '运行记录 ID' }) runId: number; @Index() @Column({ comment: '子 Agent ID' }) agentId: number; @Column({ comment: '任务描述', type: 'text' }) taskDescription: string; @Index() @Column({ comment: '任务状态', length: 20, default: 'pending' }) status: string; // pending / running / completed / failed / retrying @Column({ comment: '子 Agent 会话 ID', nullable: true }) sessionId: number; @Column({ comment: '开始时间', type: 'datetime', nullable: true }) startTime: Date; @Column({ comment: '结束时间', type: 'datetime', nullable: true }) endTime: Date; @Column({ type: 'json', comment: '执行结果', nullable: true }) result: Record; @Column({ comment: '错误信息', type: 'text', nullable: true }) error: string; @Column({ comment: '已重试次数', default: 0 }) retryCount: number; @Column({ comment: '该任务超时(秒),空则使用集群默认', nullable: true }) timeout: number; @Column({ type: 'json', comment: '该任务 token 消耗', nullable: true }) tokenUsage: { inputTokens: number; outputTokens: number }; @Column({ comment: '父任务 ID(支持嵌套委派)', nullable: true }) parentTaskId: number; } ``` ### Step 1.5:扩展 `agent.ts` Entity **文件**:`packages/backend/src/modules/netaclaw/entity/agent.ts` **操作**:在现有 `status` 字段后新增一个字段 ```typescript // 在 status 字段之后添加: @Column({ comment: '是否可作为集群主 Agent', default: 0 }) isCrewMaster: number; // 0=否 1=是(UI 筛选用) ``` > 这是唯一一处修改现有文件的地方,风险极低(仅新增列,不改现有字段)。 ### Step 1.6:注册 Entity 到模块配置 **文件**:`packages/backend/src/modules/netaclaw/config.ts` 在现有 Entity 导入列表中追加: ```typescript import { NetaClawCrewEntity } from './entity/crew.js'; import { NetaClawCrewAgentEntity } from './entity/crew_agent.js'; import { NetaClawCrewRunEntity } from './entity/crew_run.js'; import { NetaClawCrewTaskEntity } from './entity/crew_task.js'; // 在 typeorm.entity 数组中追加: NetaClawCrewEntity, NetaClawCrewAgentEntity, NetaClawCrewRunEntity, NetaClawCrewTaskEntity, ``` ### Step 1.7:验证建表 ```bash cd packages/backend pnpm dev # 启动开发服务器,TypeORM synchronize=true 自动建表 ``` **验证清单**: - [ ] 服务启动无报错 - [ ] MySQL 中新增 4 张表:`netaclaw_crew`、`netaclaw_crew_agent`、`netaclaw_crew_run`、`netaclaw_crew_task` - [ ] `netaclaw_agent` 表新增 `isCrewMaster` 字段 - [ ] 所有索引正确创建 - [ ] JSON 列可正常读写(手动插入测试数据验证) ### Step 1.8:提交代码 ```bash git add packages/backend/src/modules/netaclaw/entity/crew*.ts git add packages/backend/src/modules/netaclaw/entity/agent.ts git add packages/backend/src/modules/netaclaw/config.ts git commit -m "feat(crew): 新增 Crew 编排数据模型 - 4 个 Entity + agent 扩展字段" ``` ### Phase 1 验收 - [ ] 4 个新 Entity 文件创建完成 - [ ] agent.ts 新增 isCrewMaster 字段 - [ ] config.ts 注册新 Entity - [ ] 自动建表成功,字段/索引/JSON 列均正确 - [ ] 代码已提交 --- ## Phase 2:后端核心引擎(编排器 + 委派工具) > 目标:实现 CrewOrchestrator 编排调度器 + CrewDelegate 委派执行器 + 3 个委派工具 > 预估:2.5 天 > 依赖:Phase 1 完成 > **本阶段是整个系统的核心,代码量最大,逻辑最复杂** ### Step 2.1:定义类型接口 **文件**:`packages/backend/src/modules/netaclaw/service/crew_types.ts` > 先定义所有跨模块共享的类型,避免循环引用。 ```typescript /** * Crew 编排系统共享类型定义 */ /** 委派执行结果 */ export interface DelegateResult { agent: string; status: 'completed' | 'failed'; result: string; error?: string; duration: string; tokenUsage: { inputTokens: number; outputTokens: number }; } /** 委派回调 */ export interface CrewCallbacks { /** 日志推送 */ onLog: (runId: number, taskId: number, agentName: string, level: string, message: string) => void; /** 任务状态变更推送 */ onTaskStatus: (runId: number, taskId: number, agentName: string, status: string, result?: any, error?: string) => void; /** 运行状态变更推送 */ onRunStatus: (runId: number, status: string, progress?: string) => void; /** 升级推送 */ onEscalation: (runId: number, taskId: number | undefined, reason: string, error?: string) => void; } /** 并行委派任务项 */ export interface ParallelTaskItem { agent_name: string; task_description: string; context?: string; } /** 集群运行上下文(编排器运行期间的内存状态) */ export interface CrewRunContext { runId: number; crewId: number; masterAgentId: number; memberAgents: Array<{ id: number; name: string; label: string; role: string; agent: any; // NetaClawAgentEntity }>; maxConcurrent: number; taskTimeout: number; callbacks: CrewCallbacks; /** 获取当前主 Agent 对话历史(escalate 持久化用) */ getConversation?: () => any[]; } ``` ### Step 2.2:实现 CrewDelegate(委派执行器) **文件**:`packages/backend/src/modules/netaclaw/service/crew_delegate.ts` **职责**:执行子 Agent 任务的核心函数,直接调用 `runAgent()`。 ```typescript import { Provide, Inject, Logger } from '@midwayjs/core'; import { ILogger } from '@midwayjs/logger'; import { InjectEntityModel } from '@midwayjs/typeorm'; import { Repository } from 'typeorm'; import { NetaClawCrewTaskEntity } from '../entity/crew_task.js'; import { NetaClawAgentEntity } from '../entity/agent.js'; import { runAgent, AgentConfig } from '../runtime/agent.js'; import { AnyAgentTool } from '../tools/common.js'; import { DelegateResult, CrewCallbacks } from './crew_types.js'; import { SkillLoaderService } from '../service/skill_loader.js'; import { buildSkillContext } from '../service/skill_context.js'; // 导入内置工具 import { bashTool } from '../tools/builtin/bash.js'; import { readFileTool, writeFileTool, listDirTool } from '../tools/builtin/file.js'; /** 委派相关工具名,子 Agent 不可使用(防止无限嵌套) */ const CREW_TOOLS = ['delegate_task', 'delegate_parallel', 'escalate']; @Provide() export class CrewDelegateService { @Logger() logger: ILogger; @InjectEntityModel(NetaClawCrewTaskEntity) crewTaskRepo: Repository; @Inject() skillLoader: SkillLoaderService; /** * 执行单个子 Agent 任务 */ async executeSubAgent( agent: NetaClawAgentEntity, taskDescription: string, context: string | undefined, runId: number, taskTimeout: number, callbacks: CrewCallbacks ): Promise { const startTime = Date.now(); // 1. 创建 crew_task 记录 const task = await this.crewTaskRepo.save({ runId, agentId: agent.id, taskDescription, status: 'running', startTime: new Date(), } as any); callbacks.onTaskStatus(runId, task.id, agent.name, 'running'); try { // 2. 加载子 Agent 的 Skill 上下文(继承 Agent 自身配置的 skills) let skillSystemPrompt = ''; if (agent.skills?.length) { const skillContext = await buildSkillContext(this.skillLoader, agent.skills); skillSystemPrompt = skillContext ? `\n\n${skillContext}` : ''; } // 3. 构建 Agent 配置 const agentConfig: AgentConfig = { name: agent.name, systemPrompt: `${agent.systemPrompt || ''}${skillSystemPrompt}\n\n## 当前任务\n${taskDescription}`, model: agent.modelConfig?.modelId || 'anthropic:claude-sonnet-4-20250514', apiKey: agent.modelConfig?.apiKey || '', baseUrl: agent.modelConfig?.apiUrl, skills: agent.skills || [], }; // 4. 加载子 Agent 工具集(内置工具 + Skill 工具,移除委派工具) const builtinTools: AnyAgentTool[] = [bashTool, readFileTool, writeFileTool, listDirTool]; const skillTools = await this.skillLoader.loadToolsForAgent(agent); const tools = [...builtinTools, ...skillTools].filter(t => !CREW_TOOLS.includes(t.name)); // 5. 构建用户消息 const userMessage = context ? `任务:${taskDescription}\n\n上下文:${context}` : taskDescription; // 6. 带超时调用 runAgent() const timeout = taskTimeout * 1000; const result = await Promise.race([ runAgent({ agentConfig, tools, userMessage, history: [], // 空历史 = 独立会话 onToken: (text) => callbacks.onLog(runId, task.id, agent.name, 'info', text), onToolCall: (name) => callbacks.onLog(runId, task.id, agent.name, 'tool', `调用工具: ${name}`), }), new Promise((_, reject) => setTimeout(() => reject(new Error(`子 Agent "${agent.name}" 执行超时 (${taskTimeout}s)`)), timeout) ), ]) as any; // 7. 更新 crew_task 为完成 const duration = `${((Date.now() - startTime) / 1000).toFixed(1)}s`; await this.crewTaskRepo.update(task.id, { status: 'completed', result: { text: result.finalContent }, tokenUsage: result.usage, endTime: new Date(), } as any); const delegateResult: DelegateResult = { agent: agent.name, status: 'completed', result: result.finalContent, duration, tokenUsage: result.usage, }; callbacks.onTaskStatus(runId, task.id, agent.name, 'completed', delegateResult); return delegateResult; } catch (err: any) { // 8. 失败处理 const duration = `${((Date.now() - startTime) / 1000).toFixed(1)}s`; await this.crewTaskRepo.update(task.id, { status: 'failed', error: err.message, endTime: new Date(), } as any); const delegateResult: DelegateResult = { agent: agent.name, status: 'failed', result: '', error: err.message, duration, tokenUsage: { inputTokens: 0, outputTokens: 0 }, }; callbacks.onTaskStatus(runId, task.id, agent.name, 'failed', undefined, err.message); return delegateResult; } } /** * 并行执行多个子 Agent 任务(受 maxConcurrent 限制) */ async executeParallel( tasks: Array<{ agent: NetaClawAgentEntity; taskDescription: string; context?: string }>, runId: number, maxConcurrent: number, taskTimeout: number, callbacks: CrewCallbacks ): Promise { // 简单的并发限制:分批执行 const results: DelegateResult[] = []; for (let i = 0; i < tasks.length; i += maxConcurrent) { const batch = tasks.slice(i, i + maxConcurrent); const batchResults = await Promise.all( batch.map(t => this.executeSubAgent(t.agent, t.taskDescription, t.context, runId, taskTimeout, callbacks)) ); results.push(...batchResults); } return results; } } ``` **设计要点**: - `runAgent()` 是纯函数调用,`history: []` 保证独立会话 - `Promise.race` 实现超时控制 - **动态加载子 Agent 的 skills/tools**(通过 SkillLoaderService),而非硬编码 - 移除 delegate/escalate 工具防止子 Agent 嵌套委派 - 并行执行通过分批 + `Promise.all` 实现,受 `maxConcurrent` 限制 ### Step 2.3:实现 3 个委派工具 #### 2.3.1 `delegate_task.ts`(串行委派) **文件**:`packages/backend/src/modules/netaclaw/tools/builtin/delegate_task.ts` ```typescript import { Type } from '@sinclair/typebox'; import type { AnyAgentTool } from '../common.js'; import type { CrewRunContext, DelegateResult } from '../../service/crew_types.js'; /** * 创建 delegate_task 工具实例 * 闭包绑定当前 crew 运行上下文,使主 Agent 可通过工具调用委派子 Agent */ export function createDelegateTaskTool(ctx: CrewRunContext): AnyAgentTool { return { name: 'delegate_task', label: '委派任务', description: '将任务委派给团队中的一个子 Agent 执行,等待执行完成后返回结果。', visibility: 'tool', parameters: Type.Object({ agent_name: Type.String({ description: '子 Agent 名称' }), task_description: Type.String({ description: '任务描述' }), context: Type.Optional(Type.String({ description: '上下文(前序任务结果等)' })), }), execute: async (_id: string, params: any): Promise => { const { agent_name, task_description, context } = params; // 查找目标子 Agent const member = ctx.memberAgents.find(m => m.name === agent_name); if (!member) { return JSON.stringify({ agent: agent_name, status: 'failed', error: `未找到名为 "${agent_name}" 的子 Agent。可用成员: ${ctx.memberAgents.map(m => m.name).join(', ')}`, }); } // 调用 CrewDelegate 执行 // 注意:ctx._delegate 在 CrewOrchestrator 构建时注入 const result: DelegateResult = await (ctx as any)._delegate.executeSubAgent( member.agent, task_description, context, ctx.runId, ctx.taskTimeout, ctx.callbacks, ); return JSON.stringify(result); }, }; } ``` #### 2.3.2 `delegate_parallel.ts`(并行委派) **文件**:`packages/backend/src/modules/netaclaw/tools/builtin/delegate_parallel.ts` ```typescript import { Type } from '@sinclair/typebox'; import type { AnyAgentTool } from '../common.js'; import type { CrewRunContext, DelegateResult } from '../../service/crew_types.js'; /** * 创建 delegate_parallel 工具实例 */ export function createDelegateParallelTool(ctx: CrewRunContext): AnyAgentTool { return { name: 'delegate_parallel', label: '并行委派', description: '将多个任务同时委派给不同的子 Agent 并行执行,全部完成后返回所有结果。', visibility: 'tool', parameters: Type.Object({ tasks: Type.Array( Type.Object({ agent_name: Type.String({ description: '子 Agent 名称' }), task_description: Type.String({ description: '任务描述' }), context: Type.Optional(Type.String({ description: '上下文' })), }), { description: '要并行执行的任务列表' } ), }), execute: async (_id: string, params: any): Promise => { const { tasks } = params; // 解析并验证所有目标 Agent const resolvedTasks = []; for (const t of tasks) { const member = ctx.memberAgents.find(m => m.name === t.agent_name); if (!member) { return JSON.stringify([{ agent: t.agent_name, status: 'failed', error: `未找到名为 "${t.agent_name}" 的子 Agent`, }]); } resolvedTasks.push({ agent: member.agent, taskDescription: t.task_description, context: t.context, }); } // 并行执行 const results: DelegateResult[] = await (ctx as any)._delegate.executeParallel( resolvedTasks, ctx.runId, ctx.maxConcurrent, ctx.taskTimeout, ctx.callbacks, ); return JSON.stringify(results); }, }; } ``` #### 2.3.3 `escalate.ts`(升级人工) **文件**:`packages/backend/src/modules/netaclaw/tools/builtin/escalate.ts` ```typescript import { Type } from '@sinclair/typebox'; import type { AnyAgentTool } from '../common.js'; import type { CrewRunContext } from '../../service/crew_types.js'; import { InjectEntityModel } from '@midwayjs/typeorm'; import { Repository } from 'typeorm'; import { NetaClawCrewRunEntity } from '../../entity/crew_run.js'; /** * 内存中保存 escalate 的 resolve 回调(用于暂停/恢复) * key: runId, value: resolve 函数 */ export const escalateResolvers = new Map void>(); /** * 创建 escalate 工具实例 * 需要传入 crewRunRepo 以持久化 pausedState */ export function createEscalateTool( ctx: CrewRunContext, crewRunRepo: Repository ): AnyAgentTool { return { name: 'escalate', label: '升级人工', description: '当你无法处理某个问题时,升级给人工处理。集群将暂停等待人工介入。', visibility: 'tool', parameters: Type.Object({ reason: Type.String({ description: '升级原因' }), failed_task: Type.Optional(Type.String({ description: '失败的任务描述' })), }), execute: async (_id: string, params: any): Promise => { const { reason, failed_task } = params; // 1. 持久化当前对话上下文到 pausedState(进程重启恢复用) const conversation = ctx.getConversation?.() || []; await crewRunRepo.update(ctx.runId, { status: 'paused', pausedState: JSON.stringify(conversation), } as any); // 2. 推送升级事件 + 运行状态变更 ctx.callbacks.onEscalation(ctx.runId, undefined, reason, failed_task); ctx.callbacks.onRunStatus(ctx.runId, 'paused'); // 3. 返回不 resolve 的 Promise,阻塞 runAttempt 循环 // 用户在监控页点击"恢复"时,通过 escalateResolvers 回调 resolve return new Promise((resolve) => { escalateResolvers.set(ctx.runId, (userMessage: string) => { resolve(JSON.stringify({ status: 'resumed', userMessage, instruction: '用户已提供处理意见,请根据用户意见继续执行。', })); }); }); }, }; } ``` **设计要点**: - `escalateResolvers` 内存 Map 是暂停/恢复的核心机制 - `execute()` 返回的 Promise 不立即 resolve → `runAttempt` 阻塞在该 tool_call - 恢复时通过 Map 取出 resolve 回调,注入用户意见作为 tool_result - 进程重启恢复在 Phase 3 的 CrewScheduler 中处理 ### Step 2.4:实现 CrewOrchestrator(核心编排器) **文件**:`packages/backend/src/modules/netaclaw/service/crew_orchestrator.ts` **这是整个系统最核心的文件**,负责: 1. 加载集群配置 2. 构建主 Agent 增强 system prompt 3. 注入委派工具 4. 启动主 Agent ReAct 循环 5. 管理运行生命周期 ```typescript import { Provide, Inject, Logger } from '@midwayjs/core'; import { ILogger } from '@midwayjs/logger'; import { InjectEntityModel } from '@midwayjs/typeorm'; import { Repository } from 'typeorm'; import { NetaClawCrewEntity } from '../entity/crew.js'; import { NetaClawCrewAgentEntity } from '../entity/crew_agent.js'; import { NetaClawCrewRunEntity } from '../entity/crew_run.js'; import { NetaClawCrewTaskEntity } from '../entity/crew_task.js'; import { NetaClawAgentEntity } from '../entity/agent.js'; import { runAgent, AgentConfig } from '../runtime/agent.js'; import { AnyAgentTool } from '../tools/common.js'; import { CrewDelegateService } from './crew_delegate.js'; import { CrewRunContext, CrewCallbacks } from './crew_types.js'; import { createDelegateTaskTool } from '../tools/builtin/delegate_task.js'; import { createDelegateParallelTool } from '../tools/builtin/delegate_parallel.js'; import { createEscalateTool } from '../tools/builtin/escalate.js'; import { bashTool } from '../tools/builtin/bash.js'; import { readFileTool, writeFileTool, listDirTool } from '../tools/builtin/file.js'; @Provide() export class CrewOrchestratorService { @Logger() logger: ILogger; @InjectEntityModel(NetaClawCrewEntity) crewRepo: Repository; @InjectEntityModel(NetaClawCrewAgentEntity) crewAgentRepo: Repository; @InjectEntityModel(NetaClawCrewRunEntity) crewRunRepo: Repository; @InjectEntityModel(NetaClawCrewTaskEntity) crewTaskRepo: Repository; @InjectEntityModel(NetaClawAgentEntity) agentRepo: Repository; @Inject() crewDelegate: CrewDelegateService; /** * 启动一次集群运行 */ async start( crewId: number, triggerType: string, triggerInput: string, callbacks: CrewCallbacks ): Promise { // 1. 加载集群配置 const crew = await this.crewRepo.findOne({ where: { id: crewId } }); if (!crew) throw new Error(`集群 ${crewId} 不存在`); if (crew.status !== 1) throw new Error(`集群 ${crewId} 未发布`); // 2. 加载主 Agent const masterAgent = await this.agentRepo.findOne({ where: { id: crew.masterAgentId } }); if (!masterAgent) throw new Error(`主 Agent ${crew.masterAgentId} 不存在`); // 3. 加载集群成员 const crewAgents = await this.crewAgentRepo.find({ where: { crewId } }); const memberAgents = []; for (const ca of crewAgents) { if (ca.agentId === crew.masterAgentId) continue; // 排除主 Agent 自身 const agent = await this.agentRepo.findOne({ where: { id: ca.agentId } }); if (agent) { memberAgents.push({ id: agent.id, name: agent.name, label: agent.label, role: ca.role || agent.description || '', agent, }); } } // 4. 创建运行记录 const run = await this.crewRunRepo.save({ crewId, triggerType, triggerInput, status: 'running', startTime: new Date(), tokenUsage: { inputTokens: 0, outputTokens: 0 }, } as any); callbacks.onRunStatus(run.id, 'running', `0/${memberAgents.length}`); // 5. 构建运行上下文 const ctx: CrewRunContext = { runId: run.id, crewId, masterAgentId: crew.masterAgentId, memberAgents, maxConcurrent: crew.maxConcurrent || 3, taskTimeout: crew.taskTimeout || 300, callbacks, // getConversation 在 runOrchestration 中注入(需要闭包引用 runAgent 的内部状态) }; // 注入 delegate 服务引用 (ctx as any)._delegate = this.crewDelegate; // 6. 异步执行编排(不阻塞调用方) this.runOrchestration(crew, masterAgent, ctx, triggerInput, callbacks) .catch(err => { this.logger.error(`集群 ${crewId} 运行 ${run.id} 异常:`, err); }); return run.id; } /** * 核心编排执行逻辑 */ private async runOrchestration( crew: NetaClawCrewEntity, masterAgent: NetaClawAgentEntity, ctx: CrewRunContext, triggerInput: string, callbacks: CrewCallbacks ): Promise { try { // 1. 构建增强 system prompt const enhancedPrompt = this.buildEnhancedPrompt(masterAgent, ctx, crew); // 2. 构建主 Agent 配置 const agentConfig: AgentConfig = { name: masterAgent.name, systemPrompt: enhancedPrompt, model: masterAgent.modelConfig?.modelId || 'anthropic:claude-sonnet-4-20250514', apiKey: masterAgent.modelConfig?.apiKey || '', baseUrl: masterAgent.modelConfig?.apiUrl, maxToolRounds: 50, // 主 Agent 可能需要多轮委派 }; // 3. 用闭包捕获对话历史引用(供 escalate 持久化用) // runAgent 内部会构建 messages 数组,我们通过 history 参数的引用来追踪 const conversationTracker: any[] = []; ctx.getConversation = () => conversationTracker; // 4. 构建主 Agent 工具集(基础工具 + 委派工具) const tools: AnyAgentTool[] = [ bashTool, readFileTool, writeFileTool, listDirTool, createDelegateTaskTool(ctx), createDelegateParallelTool(ctx), createEscalateTool(ctx, this.crewRunRepo), // 传入 repo 以持久化 pausedState ]; // 5. 执行主 Agent ReAct 循环 const result = await runAgent({ agentConfig, tools, userMessage: triggerInput || '请根据团队成员和调度建议,开始执行任务。', history: [], onToken: (text) => { callbacks.onLog(ctx.runId, 0, masterAgent.name, 'info', text); // 追踪对话内容(简化版,完整版需在 runAttempt 层获取) conversationTracker.push({ role: 'assistant_token', content: text }); }, onToolCall: (name, args) => callbacks.onLog(ctx.runId, 0, masterAgent.name, 'tool', `调用 ${name}`), }); // 6. 计算累计 Token(主 Agent + 所有子 Agent 任务) const tasks = await this.crewTaskRepo.find({ where: { runId: ctx.runId } }); const totalTokens = tasks.reduce((acc, t) => ({ inputTokens: acc.inputTokens + (t.tokenUsage?.inputTokens || 0), outputTokens: acc.outputTokens + (t.tokenUsage?.outputTokens || 0), }), { inputTokens: result.usage.inputTokens, outputTokens: result.usage.outputTokens, }); // 7. 更新运行记录为完成 await this.crewRunRepo.update(ctx.runId, { status: 'completed', result: { summary: result.finalContent }, tokenUsage: totalTokens, endTime: new Date(), } as any); callbacks.onRunStatus(ctx.runId, 'completed'); } catch (err: any) { // 失败处理 await this.crewRunRepo.update(ctx.runId, { status: 'failed', error: err.message, endTime: new Date(), } as any); callbacks.onRunStatus(ctx.runId, 'failed'); this.logger.error(`集群运行 ${ctx.runId} 失败:`, err); } } /** * 构建主 Agent 增强 system prompt */ private buildEnhancedPrompt( masterAgent: NetaClawAgentEntity, ctx: CrewRunContext, crew: NetaClawCrewEntity ): string { let prompt = masterAgent.systemPrompt || ''; // 追加团队成员信息 prompt += '\n\n## 团队成员\n'; prompt += '你是团队的主管 Agent,负责将任务分配给以下子 Agent 执行:\n\n'; for (const member of ctx.memberAgents) { prompt += `- **${member.name}**(${member.label}): ${member.role}\n`; } // 追加调度建议(如有连线提示) if (crew.delegateHints?.hints) { prompt += `\n## 调度建议\n${crew.delegateHints.hints}\n`; } // 追加工具使用说明 prompt += `\n## 工具说明\n`; prompt += `- 使用 \`delegate_task\` 将任务委派给一个子 Agent 串行执行\n`; prompt += `- 使用 \`delegate_parallel\` 将多个独立任务同时委派给不同子 Agent 并行执行\n`; prompt += `- 当遇到无法解决的问题时,使用 \`escalate\` 升级给人工处理\n`; prompt += `- 根据任务之间的依赖关系决定串行还是并行执行\n`; prompt += `- 收到子 Agent 的执行结果后,分析结果并决定下一步行动\n`; return prompt; } /** * 终止运行 */ async stop(runId: number): Promise { await this.crewRunRepo.update(runId, { status: 'failed', error: '用户手动终止', endTime: new Date(), } as any); } /** * 暂停运行(由 escalate 工具自动触发,此方法用于手动暂停) */ async pause(runId: number): Promise { await this.crewRunRepo.update(runId, { status: 'paused', } as any); } /** * 从 pausedState 恢复运行(进程重启后使用) */ async resumeFromPausedState( runId: number, pausedHistory: any[], userMessage: string, callbacks: CrewCallbacks ): Promise { const run = await this.crewRunRepo.findOne({ where: { id: runId } }); if (!run) throw new Error(`运行 ${runId} 不存在`); const crew = await this.crewRepo.findOne({ where: { id: run.crewId } }); if (!crew) throw new Error(`集群 ${run.crewId} 不存在`); const masterAgent = await this.agentRepo.findOne({ where: { id: crew.masterAgentId } }); if (!masterAgent) throw new Error(`主 Agent 不存在`); // 更新状态为 running await this.crewRunRepo.update(runId, { status: 'running' } as any); // 重新构建上下文并继续执行(将用户恢复消息追加到历史中) // 简化实现:以用户消息重新触发,历史已持久化 const crewAgents = await this.crewAgentRepo.find({ where: { crewId: crew.id } }); const memberAgents = []; for (const ca of crewAgents) { if (ca.agentId === crew.masterAgentId) continue; const agent = await this.agentRepo.findOne({ where: { id: ca.agentId } }); if (agent) { memberAgents.push({ id: agent.id, name: agent.name, label: agent.label, role: ca.role || '', agent }); } } const ctx: CrewRunContext = { runId, crewId: crew.id, masterAgentId: crew.masterAgentId, memberAgents, maxConcurrent: crew.maxConcurrent || 3, taskTimeout: crew.taskTimeout || 300, callbacks, }; (ctx as any)._delegate = this.crewDelegate; // 用恢复消息作为新一轮 triggerInput this.runOrchestration(crew, masterAgent, ctx, `用户恢复指令: ${userMessage}`, callbacks) .catch(err => this.logger.error(`恢复运行 ${runId} 失败:`, err)); } } ``` ### Step 2.5:实现 CrewService(集群 CRUD 业务层) **文件**:`packages/backend/src/modules/netaclaw/service/crew.ts` ```typescript import { Provide, Inject } from '@midwayjs/core'; import { InjectEntityModel } from '@midwayjs/typeorm'; import { Repository, In } from 'typeorm'; import { NetaClawCrewEntity } from '../entity/crew.js'; import { NetaClawCrewAgentEntity } from '../entity/crew_agent.js'; import { NetaClawAgentEntity } from '../entity/agent.js'; @Provide() export class CrewService { @InjectEntityModel(NetaClawCrewEntity) crewRepo: Repository; @InjectEntityModel(NetaClawCrewAgentEntity) crewAgentRepo: Repository; @InjectEntityModel(NetaClawAgentEntity) agentRepo: Repository; /** * 保存画布数据 + 同步成员关系 */ async saveCanvas(crewId: number, data: { canvasData: any; members: Array<{ agentId: number; role?: string; canvasPosition?: any; groupName?: string }>; delegateHints?: any; }): Promise { // 1. 更新画布数据 await this.crewRepo.update(crewId, { canvasData: data.canvasData, delegateHints: data.delegateHints, } as any); // 2. 同步成员关系(先删后插,简单可靠) await this.crewAgentRepo.delete({ crewId }); if (data.members?.length) { const entities = data.members.map(m => ({ crewId, agentId: m.agentId, role: m.role || '', canvasPosition: m.canvasPosition, groupName: m.groupName || '', })); await this.crewAgentRepo.save(entities as any[]); } } /** * 获取集群详情(含成员信息) */ async getDetail(crewId: number): Promise { const crew = await this.crewRepo.findOne({ where: { id: crewId } }); if (!crew) return null; const crewAgents = await this.crewAgentRepo.find({ where: { crewId } }); const agentIds = crewAgents.map(ca => ca.agentId); const agents = agentIds.length ? await this.agentRepo.find({ where: { id: In(agentIds) } }) : []; const members = crewAgents.map(ca => { const agent = agents.find(a => a.id === ca.agentId); return { ...ca, agentName: agent?.name, agentLabel: agent?.label, agentIcon: agent?.icon, agentDescription: agent?.description, }; }); return { ...crew, members }; } /** * 发布集群 */ async publish(crewId: number): Promise { const crew = await this.crewRepo.findOne({ where: { id: crewId } }); if (!crew) throw new Error('集群不存在'); if (!crew.masterAgentId) throw new Error('请先设置主 Agent'); const members = await this.crewAgentRepo.find({ where: { crewId } }); if (members.length < 2) throw new Error('集群至少需要 2 个成员(含主 Agent)'); await this.crewRepo.update(crewId, { status: 1 } as any); } /** * 取消发布 */ async unpublish(crewId: number): Promise { await this.crewRepo.update(crewId, { status: 0 } as any); } } ``` ### Step 2.6:验证核心引擎 此阶段暂不实现 Controller 和 WebSocket(Phase 3),但可通过单元测试或临时脚本验证: **验证方式**:编写一个临时测试服务,在后端启动后通过 API 手动调用 ```typescript // 临时测试(可在 service 中加一个 test 方法) // 1. 创建一个 crew 记录 // 2. 创建 crew_agent 记录 // 3. 调用 orchestrator.start() // 4. 检查 crew_run 和 crew_task 表是否正确记录 ``` **验证清单**: - [ ] CrewDelegateService 可正确调用 runAgent() 执行子 Agent - [ ] delegate_task 工具可找到目标 Agent 并返回结果 - [ ] delegate_parallel 工具可并行执行多个子 Agent - [ ] escalate 工具可阻塞 ReAct 循环 - [ ] CrewOrchestrator 可完成完整的编排流程 - [ ] crew_run / crew_task 记录正确写入数据库 - [ ] token 消耗正确统计 ### Step 2.7:提交代码 ```bash git add packages/backend/src/modules/netaclaw/service/crew_types.ts git add packages/backend/src/modules/netaclaw/service/crew_delegate.ts git add packages/backend/src/modules/netaclaw/service/crew_orchestrator.ts git add packages/backend/src/modules/netaclaw/service/crew.ts git add packages/backend/src/modules/netaclaw/tools/builtin/delegate_task.ts git add packages/backend/src/modules/netaclaw/tools/builtin/delegate_parallel.ts git add packages/backend/src/modules/netaclaw/tools/builtin/escalate.ts git commit -m "feat(crew): 实现核心编排引擎 - Orchestrator + Delegate + 3个委派工具" ``` ### Phase 2 验收 - [ ] `crew_types.ts` 共享类型定义完成 - [ ] `crew_delegate.ts` 子 Agent 执行器完成(含超时控制、并发限制) - [ ] `delegate_task.ts` 串行委派工具完成 - [ ] `delegate_parallel.ts` 并行委派工具完成 - [ ] `escalate.ts` 升级人工工具完成(含暂停/恢复机制) - [ ] `crew_orchestrator.ts` 核心编排器完成(含增强 prompt 构建) - [ ] `crew.ts` 集群 CRUD 业务层完成 - [ ] 核心流程验证通过 - [ ] 代码已提交 --- ## Phase 3:后端通信层(WebSocket + 触发 + 定时) > 目标:实现 Controller API + CrewGateway WebSocket + 定时调度器 > 预估:2 天 > 依赖:Phase 2 完成 ### Step 3.1:实现 Crew CRUD Controller **文件**:`packages/backend/src/modules/netaclaw/controller/admin/crew.ts` ```typescript import { Body, Inject, Post, Provide } from '@midwayjs/core'; import { CoolController, BaseController } from '@cool-midway/core'; import { NetaClawCrewEntity } from '../../entity/crew.js'; import { CrewService } from '../../service/crew.js'; /** * 集群管理 CRUD * 自动生成: add / delete / update / info / list / page */ @Provide() @CoolController({ api: ['add', 'delete', 'update', 'info', 'list', 'page'], entity: NetaClawCrewEntity, pageQueryOp: { fieldEq: ['status'], keyWordLikeFields: ['name', 'label'], }, }) export class AdminCrewController extends BaseController { @Inject() crewService: CrewService; /** * 保存画布 + 同步成员关系 */ @Post('/saveCanvas') async saveCanvas(@Body() body: any) { const { crewId, canvasData, members, delegateHints } = body; await this.crewService.saveCanvas(crewId, { canvasData, members, delegateHints }); return this.ok(); } /** * 获取集群详情(含成员 Agent 信息) */ @Post('/detail') async detail(@Body() body: any) { const result = await this.crewService.getDetail(body.id); return this.ok(result); } /** * 发布集群 */ @Post('/publish') async publish(@Body() body: any) { await this.crewService.publish(body.id); return this.ok(); } /** * 取消发布 */ @Post('/unpublish') async unpublish(@Body() body: any) { await this.crewService.unpublish(body.id); return this.ok(); } } ``` **自动生成的 API 端点**(Cool Admin 约定,路径基于 `controller/admin/` 目录结构): - `POST /admin/netaclaw/crew/add` — 创建集群 - `POST /admin/netaclaw/crew/delete` — 删除集群 - `POST /admin/netaclaw/crew/update` — 更新集群 - `POST /admin/netaclaw/crew/info` — 获取集群信息 - `POST /admin/netaclaw/crew/page` — 分页查询 - `POST /admin/netaclaw/crew/saveCanvas` — 保存画布 - `POST /admin/netaclaw/crew/detail` — 集群详情(含成员) - `POST /admin/netaclaw/crew/publish` — 发布 - `POST /admin/netaclaw/crew/unpublish` — 取消发布 > **注意**:设计文档中写的 `POST /admin/crew/trigger/start` 是简写。实际 Cool Admin 路由映射为 `POST /admin/netaclaw/crewTrigger/start`(基于文件路径 `controller/admin/crew_trigger.ts`,驼峰转换)。前端通过 service 代理调用,无需关心具体路径。 **前端 service 代理路径**:`service.netaclaw.crew.saveCanvas()` 等 ### Step 3.2:实现 Crew Run Controller **文件**:`packages/backend/src/modules/netaclaw/controller/admin/crew_run.ts` ```typescript import { Provide } from '@midwayjs/core'; import { CoolController, BaseController } from '@cool-midway/core'; import { NetaClawCrewRunEntity } from '../../entity/crew_run.js'; /** * 集群运行记录查询 */ @Provide() @CoolController({ api: ['info', 'list', 'page'], entity: NetaClawCrewRunEntity, pageQueryOp: { fieldEq: ['crewId', 'status', 'triggerType'], where: async (ctx) => { // 可根据需求添加筛选逻辑 return []; }, }, }) export class AdminCrewRunController extends BaseController {} ``` ### Step 3.3:实现 Crew Trigger Controller **文件**:`packages/backend/src/modules/netaclaw/controller/admin/crew_trigger.ts` ```typescript import { Body, Inject, Post, Provide } from '@midwayjs/core'; import { CoolController, BaseController } from '@cool-midway/core'; import { CrewOrchestratorService } from '../../service/crew_orchestrator.js'; import { escalateResolvers } from '../../tools/builtin/escalate.js'; import { InjectEntityModel } from '@midwayjs/typeorm'; import { Repository } from 'typeorm'; import { NetaClawCrewRunEntity } from '../../entity/crew_run.js'; /** * 集群触发控制 */ @Provide() @CoolController() export class AdminCrewTriggerController extends BaseController { @Inject() orchestrator: CrewOrchestratorService; @InjectEntityModel(NetaClawCrewRunEntity) crewRunRepo: Repository; /** * 手动触发运行 */ @Post('/start') async start(@Body() body: { crewId: number; triggerInput?: string }) { const { crewId, triggerInput = '' } = body; // 创建 noop 回调(WebSocket 推送在 Gateway 中处理) // 这里提供基础回调,实际的 WebSocket 推送在 Phase 3.4 集成 const callbacks = this.createNoopCallbacks(); const runId = await this.orchestrator.start(crewId, 'manual', triggerInput, callbacks); return this.ok({ runId }); } /** * 终止运行 */ @Post('/stop') async stop(@Body() body: { runId: number }) { await this.orchestrator.stop(body.runId); return this.ok(); } /** * 恢复暂停的运行(处理 escalate) */ @Post('/resume') async resume(@Body() body: { runId: number; userMessage: string }) { const { runId, userMessage } = body; // 优先从内存 Map 中取出 resolve 回调 const resolver = escalateResolvers.get(runId); if (resolver) { // 内存中有 → 直接恢复(进程未重启) resolver(userMessage); escalateResolvers.delete(runId); await this.crewRunRepo.update(runId, { status: 'running' } as any); return this.ok(); } // 内存中没有(进程重启了)→ 从 pausedState 恢复 const run = await this.crewRunRepo.findOne({ where: { id: runId } }); if (run?.pausedState) { const history = JSON.parse(run.pausedState as string); const callbacks = this.createNoopCallbacks(); await this.orchestrator.resumeFromPausedState(runId, history, userMessage, callbacks); return this.ok(); } return this.fail('运行已过期或无暂停状态,请重新触发'); } /** * 获取运行详情(含子任务列表) */ @Post('/runDetail') async runDetail(@Body() body: { runId: number }) { const run = await this.crewRunRepo.findOne({ where: { id: body.runId } }); // 查询关联的 crew_task // ... 补充查询逻辑 return this.ok(run); } private createNoopCallbacks() { return { onLog: () => {}, onTaskStatus: () => {}, onRunStatus: () => {}, onEscalation: () => {}, }; } } ``` ### Step 3.4:实现 CrewGateway(WebSocket 网关) **文件**:`packages/backend/src/modules/netaclaw/gateway/crew_server.ts` > 独立 `/crew` 命名空间,与现有 `/netaclaw` 隔离。 ```typescript import { Inject, Logger, WSController, OnWSConnection, OnWSMessage } from '@midwayjs/core'; import { ILogger } from '@midwayjs/logger'; import { Context } from '@midwayjs/socketio'; import { CrewOrchestratorService } from '../service/crew_orchestrator.js'; import { escalateResolvers } from '../tools/builtin/escalate.js'; import { InjectEntityModel } from '@midwayjs/typeorm'; import { Repository } from 'typeorm'; import { NetaClawCrewRunEntity } from '../entity/crew_run.js'; import { NetaClawCrewTaskEntity } from '../entity/crew_task.js'; import { CrewCallbacks } from '../service/crew_types.js'; /** * Crew WebSocket 网关 * 命名空间: /crew * * 服务端 → 客户端事件: * crew:run:status - 运行状态变更 * crew:task:status - 子任务状态变更 * crew:escalation - 升级人工通知 * crew:log - 实时日志 * * 客户端 → 服务端事件: * crew:trigger - 触发运行 * crew:control - 控制运行(暂停/恢复/终止) */ @WSController('/crew') export class CrewGateway { @Inject() ctx: Context; @Logger() logger: ILogger; @Inject() orchestrator: CrewOrchestratorService; @InjectEntityModel(NetaClawCrewRunEntity) crewRunRepo: Repository; @OnWSConnection() async onConnection() { this.logger.info('[Crew WS] 客户端连接:', this.ctx.id); } @OnWSMessage('crew:trigger') async onTrigger(data: { crewId: number; triggerInput?: string }) { const { crewId, triggerInput = '' } = data; // 构建 WebSocket 推送回调 const callbacks = this.createCallbacks(); try { const runId = await this.orchestrator.start(crewId, 'manual', triggerInput, callbacks); this.ctx.emit('crew:run:status', { runId, status: 'running' }); } catch (err: any) { this.ctx.emit('crew:run:status', { runId: 0, status: 'failed', error: err.message }); } } @OnWSMessage('crew:control') async onControl(data: { runId: number; action: string; userMessage?: string }) { const { runId, action, userMessage } = data; switch (action) { case 'stop': await this.orchestrator.stop(runId); this.ctx.emit('crew:run:status', { runId, status: 'failed' }); break; case 'resume': const resolver = escalateResolvers.get(runId); if (resolver) { resolver(userMessage || '继续执行'); escalateResolvers.delete(runId); await this.crewRunRepo.update(runId, { status: 'running' } as any); this.ctx.emit('crew:run:status', { runId, status: 'running' }); } break; case 'pause': await this.orchestrator.pause(runId); this.ctx.emit('crew:run:status', { runId, status: 'paused' }); break; case 'retry': { // 重新触发一次运行(复制原 run 的 crewId 和 triggerInput) const run = await this.crewRunRepo.findOne({ where: { id: runId } }); if (run) { const callbacks = this.createCallbacks(); const newRunId = await this.orchestrator.start(run.crewId, 'manual', run.triggerInput || '', callbacks); this.ctx.emit('crew:run:status', { runId: newRunId, status: 'running' }); } break; } } } /** * 构建 WebSocket 推送回调 */ private createCallbacks(): CrewCallbacks { const ctx = this.ctx; return { onLog: (runId, taskId, agentName, level, message) => { ctx.emit('crew:log', { runId, taskId, agentName, level, message, timestamp: new Date().toISOString(), }); }, onTaskStatus: (runId, taskId, agentName, status, result, error) => { ctx.emit('crew:task:status', { runId, taskId, agentName, status, result, error }); }, onRunStatus: (runId, status, progress) => { ctx.emit('crew:run:status', { runId, status, progress }); }, onEscalation: (runId, taskId, reason, error) => { ctx.emit('crew:escalation', { runId, taskId, reason, error }); }, }; } } ``` ### Step 3.5:实现 CrewScheduler(定时调度器) **文件**:`packages/backend/src/modules/netaclaw/service/crew_scheduler.ts` ```typescript import { Provide, Inject, Logger, Init } from '@midwayjs/core'; import { ILogger } from '@midwayjs/logger'; import { InjectEntityModel } from '@midwayjs/typeorm'; import { Repository } from 'typeorm'; import { CronJob } from 'cron'; import { NetaClawCrewEntity } from '../entity/crew.js'; import { NetaClawCrewRunEntity } from '../entity/crew_run.js'; import { CrewOrchestratorService } from './crew_orchestrator.js'; import { escalateResolvers } from '../tools/builtin/escalate.js'; /** * 集群定时调度器 * - 管理 CronJob 实例的动态创建/销毁 * - 服务启动时恢复已发布集群的定时任务 * - 服务启动时恢复 paused 状态的运行 */ @Provide() export class CrewSchedulerService { @Logger() logger: ILogger; @InjectEntityModel(NetaClawCrewEntity) crewRepo: Repository; @InjectEntityModel(NetaClawCrewRunEntity) crewRunRepo: Repository; @Inject() orchestrator: CrewOrchestratorService; /** 活跃的 CronJob 实例 */ private activeJobs = new Map(); /** * 服务启动时初始化 */ @Init() async init() { // 延迟初始化,等待其他服务就绪 setTimeout(() => { this.restoreSchedules().catch(err => this.logger.error('[CrewScheduler] 恢复定时任务失败:', err) ); this.restorePausedRuns().catch(err => this.logger.error('[CrewScheduler] 恢复暂停运行失败:', err) ); }, 5000); } /** * 注册定时任务 */ registerCron(crew: NetaClawCrewEntity): void { if (!crew.triggerConfig?.cron?.enabled) return; // 先取消已有的 this.unregisterCron(crew.id); try { const job = new CronJob( crew.triggerConfig.cron.expression, () => { this.logger.info(`[CrewScheduler] 定时触发集群 ${crew.id}: ${crew.label}`); const callbacks = { onLog: () => {}, onTaskStatus: () => {}, onRunStatus: () => {}, onEscalation: () => {}, }; this.orchestrator.start(crew.id, 'cron', '', callbacks) .catch(err => this.logger.error(`[CrewScheduler] 定时运行失败:`, err)); }, null, true, crew.triggerConfig.cron.timezone || 'Asia/Shanghai' ); this.activeJobs.set(crew.id, job); this.logger.info(`[CrewScheduler] 注册定时任务: 集群 ${crew.id}, cron=${crew.triggerConfig.cron.expression}`); } catch (err) { this.logger.error(`[CrewScheduler] 注册定时任务失败:`, err); } } /** * 取消定时任务 */ unregisterCron(crewId: number): void { const job = this.activeJobs.get(crewId); if (job) { job.stop(); this.activeJobs.delete(crewId); this.logger.info(`[CrewScheduler] 取消定时任务: 集群 ${crewId}`); } } /** * 恢复所有已发布集群的定时任务 */ private async restoreSchedules(): Promise { const publishedCrews = await this.crewRepo.find({ where: { status: 1 } }); const cronCrews = publishedCrews.filter(c => c.triggerConfig?.cron?.enabled); this.logger.info(`[CrewScheduler] 恢复 ${cronCrews.length} 个定时任务`); cronCrews.forEach(crew => this.registerCron(crew)); } /** * 恢复 paused 状态的运行(进程重启后) */ private async restorePausedRuns(): Promise { const pausedRuns = await this.crewRunRepo.find({ where: { status: 'paused' } }); if (pausedRuns.length === 0) return; this.logger.info(`[CrewScheduler] 发现 ${pausedRuns.length} 个暂停的运行,等待用户恢复`); // 暂停的运行不自动恢复,而是等待用户手动处理 // 如果 pausedState 存在,可以在用户恢复时使用 } /** * 获取活跃定时任务数 */ getActiveJobCount(): number { return this.activeJobs.size; } } ``` ### Step 3.6:集成定时调度到发布/取消发布流程 **修改文件**:`packages/backend/src/modules/netaclaw/service/crew.ts` 在 `publish()` 和 `unpublish()` 方法中注入调度器: ```typescript // 在 CrewService 中注入 @Inject() scheduler: CrewSchedulerService; // publish() 中追加: async publish(crewId: number): Promise { // ... 现有校验逻辑 ... await this.crewRepo.update(crewId, { status: 1 } as any); // 注册定时任务 const crew = await this.crewRepo.findOne({ where: { id: crewId } }); if (crew) this.scheduler.registerCron(crew); } // unpublish() 中追加: async unpublish(crewId: number): Promise { await this.crewRepo.update(crewId, { status: 0 } as any); // 取消定时任务 this.scheduler.unregisterCron(crewId); } ``` ### Step 3.7:验证通信层 **验证清单**: - [ ] **CRUD API**:通过 Postman/curl 测试集群的创建、查询、更新、删除 - [ ] **画布保存**:`saveCanvas` API 正确保存 canvasData + 同步成员关系 - [ ] **发布流程**:发布时校验主 Agent 和成员数量 - [ ] **手动触发**:通过 REST API 和 WebSocket 均可触发运行 - [ ] **WebSocket 事件**:连接 `/crew` 命名空间,触发运行后能收到 `crew:run:status`、`crew:task:status`、`crew:log` 事件 - [ ] **终止运行**:`crew:control { action: 'stop' }` 正确终止 - [ ] **升级恢复**:escalate 后通过 `crew:control { action: 'resume' }` 正确恢复 - [ ] **定时调度**:发布带 cron 配置的集群后,定时任务注册成功 - [ ] 前端 service 代理路径可访问:`service.netaclaw.crew.*` ### Step 3.8:提交代码 ```bash git add packages/backend/src/modules/netaclaw/controller/admin/crew*.ts git add packages/backend/src/modules/netaclaw/gateway/crew_server.ts git add packages/backend/src/modules/netaclaw/service/crew_scheduler.ts # + crew.ts 的修改 git commit -m "feat(crew): 实现通信层 - Controller + WebSocket Gateway + 定时调度器" ``` ### Phase 3 验收 - [ ] 3 个 Controller 创建完成(crew / crew_run / crew_trigger) - [ ] CrewGateway WebSocket 网关工作正常 - [ ] CrewScheduler 定时调度器工作正常 - [ ] 发布/取消发布联动定时调度 - [ ] REST API + WebSocket 双通道触发验证通过 - [ ] 代码已提交 --- ## Phase 4:前端编排页 > 目标:实现完整的集群编排画布页面(Vue Flow 画布 + Agent 侧栏 + 属性面板) > 预估:3 天(Vue Flow 首次使用,额外预留缓冲) > 依赖:Phase 3 完成(后端 API 可用) ### Step 4.1:注册路由与菜单 #### 4.1.1 修改前端路由配置 **文件**:`packages/frontend/src/modules/agent/config.ts` 在 views 数组中追加: ```typescript { path: '/agent/crew-editor', meta: { label: 'Agent 编排' }, component: () => import('./views/crew-editor.vue') }, { path: '/agent/crew-monitor', meta: { label: '运行监控' }, component: () => import('./views/crew-monitor.vue') }, ``` #### 4.1.2 插入菜单记录 通过数据库插入 `base_sys_menu` 记录(Cool Admin 菜单管理): ```sql -- 在 Agent 管理父菜单下新增 INSERT INTO base_sys_menu (name, router, parentId, orderNum, type, icon, perms) VALUES ('Agent 编排', '/agent/crew-editor', , 5, 1, 'icon-crew', 'agent:crew:editor'), ('运行监控', '/agent/crew-monitor', , 6, 1, 'icon-monitor', 'agent:crew:monitor'); ``` > 具体 parentId 需查询当前数据库中 Agent 管理的菜单 ID。 ### Step 4.2:实现 Pinia Store **文件**:`packages/frontend/src/modules/agent/store/crew.ts` ```typescript import { defineStore } from 'pinia'; import { ref, computed } from 'vue'; import { useCool } from '/@/cool'; /** * Crew 编排状态管理 */ export const useCrewStore = defineStore('agent-crew', () => { const { service } = useCool(); // 集群列表 const crewList = ref([]); // 当前编辑的集群 const currentCrew = ref(null); // 当前集群的成员列表 const currentMembers = ref([]); // 可用 Agent 列表(已发布的 Agent) const availableAgents = ref([]); // 加载集群列表 async function loadCrewList() { const res = await service.netaclaw.crew.list(); crewList.value = res || []; } // 加载集群详情 async function loadCrewDetail(crewId: number) { const res = await service.netaclaw.crew.detail({ id: crewId }); currentCrew.value = res; currentMembers.value = res?.members || []; return res; } // 加载可用 Agent async function loadAvailableAgents() { const res = await service.netaclaw.agent.list({ status: 1 }); availableAgents.value = res || []; } // 保存画布 async function saveCanvas(crewId: number, canvasData: any, members: any[], delegateHints: any) { await service.netaclaw.crew.saveCanvas({ crewId, canvasData, members, delegateHints, }); } // 创建集群 async function createCrew(data: any) { const res = await service.netaclaw.crew.add(data); await loadCrewList(); return res; } // 发布/取消发布 async function publish(crewId: number) { await service.netaclaw.crew.publish({ id: crewId }); await loadCrewDetail(crewId); } async function unpublish(crewId: number) { await service.netaclaw.crew.unpublish({ id: crewId }); await loadCrewDetail(crewId); } // 判断某 Agent 是否已在当前集群中 const isMember = computed(() => { const ids = new Set(currentMembers.value.map((m: any) => m.agentId)); return (agentId: number) => ids.has(agentId); }); return { crewList, currentCrew, currentMembers, availableAgents, loadCrewList, loadCrewDetail, loadAvailableAgents, saveCanvas, createCrew, publish, unpublish, isMember, }; }); ``` ### Step 4.3:实现画布核心 Hook **文件**:`packages/frontend/src/modules/agent/hooks/crew-canvas.ts` ```typescript import { ref, watch } from 'vue'; import { useVueFlow, type Node, type Edge } from '@vue-flow/core'; /** * 画布操作 Hook * 封装 Vue Flow 的节点/连线增删、序列化/反序列化 */ export function useCrewCanvas() { const { nodes, edges, addNodes, removeNodes, addEdges, removeEdges, fitView, onConnect, onNodeDragStop, } = useVueFlow(); // 主 Agent ID const masterAgentId = ref(null); /** * 添加 Agent 节点到画布 */ function addAgentNode(agent: any, position?: { x: number; y: number }) { const id = `agent-${agent.id}`; // 避免重复添加 if (nodes.value.find(n => n.id === id)) return; const node: Node = { id, type: 'crew-agent', // 自定义节点类型 position: position || { x: Math.random() * 400 + 100, y: Math.random() * 300 + 100 }, data: { agentId: agent.id, name: agent.name, label: agent.label, icon: agent.icon, description: agent.description, role: '', groupName: '', isMaster: agent.id === masterAgentId.value, }, }; addNodes([node]); } /** * 移除 Agent 节点 */ function removeAgentNode(agentId: number) { const nodeId = `agent-${agentId}`; const node = nodes.value.find(n => n.id === nodeId); if (node) removeNodes([node]); } /** * 设置主 Agent */ function setMasterAgent(agentId: number) { masterAgentId.value = agentId; // 更新所有节点的 isMaster 标记 nodes.value.forEach(n => { if (n.data) { n.data.isMaster = n.data.agentId === agentId; } }); } /** * 序列化画布数据(保存用) */ function serializeCanvas() { return { nodes: nodes.value.map(n => ({ id: n.id, type: n.type, position: n.position, data: n.data, })), edges: edges.value.map(e => ({ id: e.id, source: e.source, target: e.target, data: e.data, })), }; } /** * 反序列化画布数据(加载用) */ function deserializeCanvas(canvasData: any) { if (!canvasData?.nodes) return; // 清空当前画布 removeNodes(nodes.value); removeEdges(edges.value); // 恢复节点 addNodes(canvasData.nodes.map((n: any) => ({ ...n, type: n.type || 'crew-agent', }))); // 恢复连线 if (canvasData.edges?.length) { addEdges(canvasData.edges); } // 延迟 fitView setTimeout(() => fitView({ padding: 0.2 }), 100); } /** * 从画布状态提取成员列表(保存到 crew_agent 表) */ function extractMembers() { return nodes.value .filter(n => n.data?.agentId) .map(n => ({ agentId: n.data.agentId, role: n.data.role || '', canvasPosition: n.position, groupName: n.data.groupName || '', })); } return { nodes, edges, masterAgentId, addAgentNode, removeAgentNode, setMasterAgent, serializeCanvas, deserializeCanvas, extractMembers, onConnect, onNodeDragStop, fitView, }; } ``` ### Step 4.4:实现画布 → delegateHints 转换 Hook **文件**:`packages/frontend/src/modules/agent/hooks/crew-orchestration.ts` ```typescript import type { Edge, Node } from '@vue-flow/core'; /** * 从画布连线生成 delegateHints * 供主 Agent system prompt 使用 */ export function canvasToHints( nodes: Node[], edges: Edge[] ): { hints: string; edges: Array<{ from: string; to: string; type: string; note?: string }> } { if (!edges.length) { return { hints: '无特定调度建议,请根据任务需求自行安排。', edges: [] }; } const nodeMap = new Map(); nodes.forEach(n => { if (n.data?.label) nodeMap.set(n.id, n.data.label); }); // 解析连线 const edgeInfos = edges.map(e => ({ from: nodeMap.get(e.source) || e.source, to: nodeMap.get(e.target) || e.target, type: e.data?.edgeType || 'serial', note: e.data?.note || '', })); // 生成自然语言提示 const serialEdges = edgeInfos.filter(e => e.type === 'serial'); const parallelEdges = edgeInfos.filter(e => e.type === 'parallel'); let hints = '建议执行顺序:\n'; if (serialEdges.length) { // 构建串行链 const chains: string[] = []; serialEdges.forEach(e => { const note = e.note ? `(${e.note})` : ''; chains.push(`${e.from} → ${e.to}${note}`); }); hints += `串行依赖:${chains.join(';')}\n`; } if (parallelEdges.length) { const parallel = parallelEdges.map(e => `${e.from} 与 ${e.to}`); hints += `可并行执行:${parallel.join(';')}\n`; } return { hints, edges: edgeInfos }; } ``` ### Step 4.5:实现自定义节点组件 **文件**:`packages/frontend/src/modules/agent/components/crew/crew-agent-node.vue` ```vue ★ {{ data.groupName }} {{ data.label || data.name }} {{ data.role }} ``` ### Step 4.6:实现自定义连线标签 **文件**:`packages/frontend/src/modules/agent/components/crew/crew-edge-label.vue` ```vue {{ label }} ``` ### Step 4.7:实现左侧 Agent 列表 **文件**:`packages/frontend/src/modules/agent/components/crew/crew-sidebar.vue` ```vue {{ agent.label || agent.name }} {{ agent.description }} ``` ### Step 4.8:实现属性面板 **文件**:`packages/frontend/src/modules/agent/components/crew/crew-property-panel.vue` ```vue Agent 属性 {{ selectedNode.data.label }} 设为主 Agent 连线属性 串行依赖 并行建议 集群配置 ``` ### Step 4.9:实现触发配置子组件 **文件**:`packages/frontend/src/modules/agent/components/crew/crew-trigger-config.vue` ```vue 触发方式 ``` ### Step 4.10:实现编排页主视图 **文件**:`packages/frontend/src/modules/agent/views/crew-editor.vue` ```vue + 新建集群 保存 发布 取消发布 试运行 ``` ### Step 4.11:实现右键菜单组件 **文件**:`packages/frontend/src/modules/agent/components/crew/crew-context-menu.vue` ```vue 设为主 Agent 编辑 Agent 移出集群 删除连线 自动布局 设为分组 ``` 在 `crew-editor.vue` 中集成右键菜单:在 `` 上监听 `@node-context-menu`、`@edge-context-menu`、`@pane-context-menu` 事件,控制菜单显示/隐藏和位置。 ### Step 4.12:实现自动布局(elkjs) 在 `hooks/crew-canvas.ts` 中新增 `autoLayout` 方法: ```typescript import ELK from 'elkjs/lib/elk.bundled.js'; const elk = new ELK(); /** * 使用 elkjs 自动布局 */ async function autoLayout() { const graph = { id: 'root', layoutOptions: { 'elk.algorithm': 'layered', 'elk.direction': 'RIGHT', 'elk.spacing.nodeNode': '80', 'elk.layered.spacing.nodeNodeBetweenLayers': '120', }, children: nodes.value.map(n => ({ id: n.id, width: 160, height: 80, })), edges: edges.value.map(e => ({ id: e.id, sources: [e.source], targets: [e.target], })), }; const layout = await elk.layout(graph); // 应用布局结果到节点位置 layout.children?.forEach(child => { const node = nodes.value.find(n => n.id === child.id); if (node && child.x !== undefined && child.y !== undefined) { node.position = { x: child.x, y: child.y }; } }); setTimeout(() => fitView({ padding: 0.2 }), 100); } ``` 在 `useCrewCanvas` 的返回值中导出 `autoLayout`,右键菜单的"自动布局"调用此方法。 ### Step 4.13:实现框选分组 在 `crew-editor.vue` 中利用 Vue Flow 的 `selectionMode` 和 `@selection-end` 事件实现框选: ```typescript // 在 crew-editor.vue 的 上添加: // selection-mode="box" // @selection-end="onSelectionEnd" function onSelectionEnd({ nodes: selectedNodes }: any) { if (selectedNodes.length < 2) return; // 弹出分组名输入框 ElMessageBox.prompt('请输入分组名', '设为分组', { confirmButtonText: '确定', cancelButtonText: '取消', }).then(({ value }) => { selectedNodes.forEach((n: any) => { if (n.data) n.data.groupName = value; }); }).catch(() => {}); } ``` ### Step 4.14:验证编排页 **验证清单**: - [ ] 路由 `/agent/crew-editor` 正常访问 - [ ] 左侧 Agent 列表正确加载 - [ ] 拖拽 Agent 到画布,节点正常渲染 - [ ] 节点间可拖拽连线 - [ ] 右键节点:设为主 Agent / 移出集群 / 编辑 Agent - [ ] 右键连线:删除连线 - [ ] 右键空白:自动布局(elkjs) - [ ] 框选多节点 → 右键"设为分组" - [ ] 主 Agent 显示金色边框 + 星标 - [ ] 点击节点/连线/空白,底部属性面板正确切换 - [ ] 属性面板:角色描述、分组名、重试次数均可编辑 - [ ] 连线类型(串行/并行)可切换 - [ ] 保存功能正常(canvasData + members + delegateHints 持久化) - [ ] 加载已保存的集群,画布正确恢复 - [ ] 发布/取消发布功能正常 - [ ] MiniMap / Controls / Background 正常显示 ### Step 4.15:提交代码 ```bash git add packages/frontend/src/modules/agent/config.ts git add packages/frontend/src/modules/agent/store/crew.ts git add packages/frontend/src/modules/agent/hooks/crew-canvas.ts git add packages/frontend/src/modules/agent/hooks/crew-orchestration.ts git add packages/frontend/src/modules/agent/views/crew-editor.vue git add packages/frontend/src/modules/agent/components/crew/ git commit -m "feat(crew): 实现编排页 - Vue Flow 画布 + Agent 侧栏 + 属性面板" ``` ### Phase 4 验收 - [ ] 前端路由注册完成 - [ ] Pinia Store 完成 - [ ] 画布操作 Hook 完成(含 elkjs 自动布局) - [ ] delegateHints 转换 Hook 完成 - [ ] 自定义节点/连线组件完成 - [ ] 右键菜单组件完成(节点/连线/画布三种模式) - [ ] 侧栏/属性面板(含重试次数)/触发配置组件完成 - [ ] 框选分组功能完成 - [ ] 编排页主视图完成 - [ ] 拖拽添加 Agent → 连线 → 配置角色 → 保存 全流程验证通过 - [ ] 代码已提交 --- ## Phase 5:前端监控页 > 目标:实现运行列表 + 运行详情(实时画布 + 任务时间线 + 日志面板) > 预估:2.5 天 > 依赖:Phase 4 完成(画布组件可复用) ### Step 5.1:实现 WebSocket 监控 Hook **文件**:`packages/frontend/src/modules/agent/hooks/crew-monitor.ts` ```typescript import { ref, onUnmounted } from 'vue'; import { io, Socket } from 'socket.io-client'; import { useStore } from '../../../store'; import config from '/@/config'; /** * Crew 监控 WebSocket Hook * 连接 /crew 命名空间,订阅实时事件 */ export function useCrewMonitor() { const socket = ref(null); const { user } = useStore(); // 实时状态 const runStatusMap = ref>(new Map()); const taskStatusMap = ref>(new Map()); const logs = ref([]); const escalations = ref([]); /** 连接 WebSocket */ function connect() { const wsUrl = (config as any).host || 'http://127.0.0.1:8001'; socket.value = io(`${wsUrl}/crew`, { auth: { token: user.token, isAdmin: true }, transports: ['websocket'], }); socket.value.on('crew:run:status', (data: any) => { runStatusMap.value.set(data.runId, data); }); socket.value.on('crew:task:status', (data: any) => { taskStatusMap.value.set(data.taskId, data); }); socket.value.on('crew:log', (data: any) => { logs.value.push(data); // 限制日志条数,防止内存溢出 if (logs.value.length > 5000) logs.value.splice(0, 1000); }); socket.value.on('crew:escalation', (data: any) => { escalations.value.push(data); }); } /** 触发运行 */ function triggerRun(crewId: number, triggerInput?: string) { socket.value?.emit('crew:trigger', { crewId, triggerInput }); } /** 控制运行 */ function controlRun(runId: number, action: string, userMessage?: string) { socket.value?.emit('crew:control', { runId, action, userMessage }); } /** 获取某次运行的日志 */ function getRunLogs(runId: number, agentName?: string) { return logs.value.filter(l => { if (agentName) return l.agentName === agentName; return true; // 返回所有日志 }); } /** 获取某次运行的任务状态列表 */ function getRunTasks(runId: number) { return [...taskStatusMap.value.values()].filter(t => t.runId === runId); } /** 清理日志 */ function clearLogs() { logs.value = []; } /** 断开连接 */ function disconnect() { socket.value?.disconnect(); socket.value = null; } onUnmounted(() => disconnect()); return { socket, runStatusMap, taskStatusMap, logs, escalations, connect, disconnect, triggerRun, controlRun, getRunLogs, getRunTasks, clearLogs, }; } ``` ### Step 5.2:实现运行记录表格 **文件**:`packages/frontend/src/modules/agent/components/crew/crew-run-table.vue` ```vue 刷新 {{ getCrewLabel(row.crewId) }} {{ statusLabel(row.status) }} {{ calcDuration(row.startTime, row.endTime) }} {{ row.progress || '-' }} {{ row.tokenUsage ? `${row.tokenUsage.inputTokens + row.tokenUsage.outputTokens}` : '-' }} 终止 恢复 详情 ``` ### Step 5.3:实现任务时间线 **文件**:`packages/frontend/src/modules/agent/components/crew/crew-timeline.vue` ```vue 任务时间线 {{ task.agentName }} {{ statusIcon(task.status) }} {{ task.status }} {{ task.taskDescription || task.result?.agent }} {{ task.duration }} 暂无任务记录 ``` ### Step 5.4:实现日志面板 **文件**:`packages/frontend/src/modules/agent/components/crew/crew-log-panel.vue` ```vue {{ formatTime(log.timestamp) }} [{{ log.agentName }}] {{ log.message }} 等待日志... ``` ### Step 5.5:实现运行详情容器 **文件**:`packages/frontend/src/modules/agent/components/crew/crew-run-detail.vue` ```vue {{ currentEscalation?.reason }} 取消 恢复执行 ``` ### Step 5.6:实现监控页主视图 **文件**:`packages/frontend/src/modules/agent/views/crew-monitor.vue` ```vue 点击运行记录查看详情 ``` ### Step 5.7:验证监控页 **验证清单**: - [ ] 路由 `/agent/crew-monitor` 正常访问 - [ ] 运行记录表格正确加载,筛选功能正常 - [ ] 状态颜色圆点正确显示 - [ ] WebSocket 连接 `/crew` 命名空间成功 - [ ] 触发运行后,表格实时更新状态 - [ ] 点击行展开详情,三栏布局正确渲染 - [ ] 实时画布:节点状态随任务进度变化(idle→running→completed/failed) - [ ] 任务时间线:按时间排序,状态图标正确 - [ ] 日志面板:实时滚动,Tab 切换过滤,搜索功能正常 - [ ] 升级处理:escalate 触发后弹出对话框,输入意见后恢复执行 - [ ] 终止运行功能正常 ### Step 5.8:提交代码 ```bash git add packages/frontend/src/modules/agent/hooks/crew-monitor.ts git add packages/frontend/src/modules/agent/views/crew-monitor.vue git add packages/frontend/src/modules/agent/components/crew/crew-run-table.vue git add packages/frontend/src/modules/agent/components/crew/crew-run-detail.vue git add packages/frontend/src/modules/agent/components/crew/crew-timeline.vue git add packages/frontend/src/modules/agent/components/crew/crew-log-panel.vue git commit -m "feat(crew): 实现监控页 - 运行列表 + 实时画布 + 时间线 + 日志面板" ``` ### Phase 5 验收 - [ ] WebSocket 监控 Hook 完成 - [ ] 运行记录表格完成(含实时更新) - [ ] 任务时间线完成 - [ ] 日志面板完成(Tab 切换 + 搜索 + 自动滚动) - [ ] 运行详情三栏布局完成(画布 + 时间线 + 日志) - [ ] 升级处理对话框完成 - [ ] 监控页主视图完成 - [ ] 端到端实时监控验证通过 - [ ] 代码已提交 --- ## Phase 6:权限集成 + 端到端测试 > 目标:配置菜单权限、数据权限,并完成端到端全流程测试 > 预估:1 天 > 依赖:Phase 5 完成 ### Step 6.1:配置菜单权限 通过数据库或 Cool Admin 管理后台插入菜单和权限记录: ```sql -- 1. 查询 Agent 管理父菜单 ID SELECT id FROM base_sys_menu WHERE router = '/agent' OR name = 'Agent 管理'; -- 2. 插入 Agent 编排页菜单 INSERT INTO base_sys_menu (name, router, parentId, orderNum, type, icon, perms, isShow) VALUES ('Agent 编排', '/agent/crew-editor', @parentId, 5, 1, 'icon-crew', NULL, 1); -- 3. 插入运行监控页菜单 INSERT INTO base_sys_menu (name, router, parentId, orderNum, type, icon, perms, isShow) VALUES ('运行监控', '/agent/crew-monitor', @parentId, 6, 1, 'icon-monitor', NULL, 1); -- 4. 插入操作权限按钮(挂在编排页菜单下) SET @crewMenuId = LAST_INSERT_ID(); -- 需要调整 INSERT INTO base_sys_menu (name, parentId, type, perms) VALUES ('创建集群', @crewMenuId, 2, 'agent:crew:create'), ('编辑集群', @crewMenuId, 2, 'agent:crew:edit'), ('发布集群', @crewMenuId, 2, 'agent:crew:publish'), ('触发运行', @crewMenuId, 2, 'agent:crew:trigger'), ('控制运行', @crewMenuId, 2, 'agent:crew:control'), ('查看监控', @crewMenuId, 2, 'agent:crew:view'); ``` > 实际操作时建议通过 Cool Admin 的菜单管理页面 UI 操作,更安全。 ### Step 6.2:数据权限(租户隔离) `netaclaw_crew` 继承 `BaseEntity` 已自带 `tenantId` 字段。Cool Admin 的 `@CoolController` 自动在查询时注入租户过滤条件,无需额外代码。 **验证点**: - [ ] 不同租户创建的集群互不可见 - [ ] 管理员可查看所有集群 ### Step 6.3:Controller 权限装饰 如需更细粒度的权限控制,在 Controller 方法上添加装饰器: ```typescript // 示例:crew_trigger.ts 中的 start 方法 @Post('/start') // Cool Admin 权限检查由框架自动处理,基于菜单 perms 配置 async start(@Body() body: { crewId: number; triggerInput?: string }) { // ... } ``` > Cool Admin 的权限机制是:前端请求携带 token → 后端中间件解析用户角色 → 校验用户是否有该路由对应的菜单权限。菜单权限在 Step 6.1 已配置。 ### Step 6.4:端到端全流程测试 #### 测试场景 1:基本编排 + 运行 ``` 前置条件:已有 3 个已发布 Agent(如:登录Agent、数据获取Agent、上架Agent) 1. 进入编排页 → 新建集群"淘宝运营" 2. 从左侧拖入 3 个 Agent 3. 设置"登录Agent"为主 Agent 4. 连线:登录Agent → 数据获取Agent → 上架Agent(串行) 5. 给每个 Agent 配置角色描述 6. 保存 → 发布 7. 点击"试运行" → 跳转监控页 8. 观察: - 运行列表新增一行(status=running) - 实时画布节点状态变化 - 时间线记录各任务执行 - 日志面板实时输出 9. 运行完成 → status=completed 10. 检查数据库:crew_run / crew_task 记录正确 ``` #### 测试场景 2:并行委派 ``` 1. 编排:A(主) → B(串行), A → C(并行), A → D(并行) 2. 触发运行 3. 验证:B 先执行完,然后 C 和 D 并行执行 4. 检查 crew_task 表:C 和 D 的 startTime 接近 ``` #### 测试场景 3:错误处理 + 升级 ``` 1. 编排包含一个"必然失败"的 Agent(如 API Key 无效) 2. 触发运行 3. 验证:主 Agent 收到错误,尝试处理 4. 如果主 Agent 判断无法处理 → 调用 escalate 5. 监控页弹出升级对话框 6. 输入处理意见 → 点击恢复 7. 验证主 Agent 继续执行 ``` #### 测试场景 4:定时调度 ``` 1. 编辑集群触发配置:启用 cron,表达式 = "*/2 * * * *"(每2分钟) 2. 发布集群 3. 等待 2 分钟,验证自动触发运行 4. 取消发布 → 验证定时任务停止 ``` ### Step 6.5:性能与稳定性检查 - [ ] **并发安全**:同时触发 3 个集群运行,无死锁或数据混乱 - [ ] **WebSocket 断线重连**:刷新页面后重新连接 `/crew`,状态恢复 - [ ] **超时控制**:子 Agent 执行超过 taskTimeout 后正确标记 failed - [ ] **内存泄漏**:长时间运行后检查 Node.js 内存使用 - [ ] **日志量**:大量日志时前端不卡顿(已设置 5000 条上限) ### Step 6.6:提交代码 ```bash git add -A git commit -m "feat(crew): 完成权限集成 + 端到端测试验证" ``` ### Phase 6 验收 - [ ] 菜单权限配置完成 - [ ] 租户数据隔离验证通过 - [ ] 端到端测试场景 1-4 全部通过 - [ ] 性能与稳定性检查通过 - [ ] 代码已提交 --- ## 风险应对与降级方案 ### 风险 1:Vue Flow 兼容问题 | 级别 | 场景 | 应对 | |------|------|------| | 🟢 低 | 样式冲突 | 调整 CSS 隔离,使用 `scoped` + `deep` 选择器 | | 🟡 中 | 自定义节点渲染异常 | 检查 Vue Flow 版本 API 变更,参考官方 examples | | 🔴 高 | Vue Flow 与项目 Vue 版本不兼容 | **降级方案**:使用纯 SVG + CSS 实现简化版画布(节点拖拽用 `interact.js`,连线用 SVG ``) | **预防措施**:Phase 0 的技术验证(Step 0.4)可在正式开发前发现此类问题。 ### 风险 2:主 Agent 编排质量差 | 场景 | 应对 | |------|------| | 主 Agent 不遵循 delegateHints | 强化 system prompt 中的指令措辞,增加 few-shot 示例 | | 主 Agent 无限循环委派 | `maxToolRounds = 50` 硬性限制 + 检测连续失败次数 | | 主 Agent 忽略错误 | 在 tool_result 中显式标记 `status: "failed"`,提示需要处理 | ### 风险 3:长时间运行稳定性 | 场景 | 应对 | |------|------| | 服务重启丢失运行状态 | `pausedState` 持久化对话上下文,启动时恢复 | | 子 Agent 执行卡死 | `Promise.race` 超时控制,默认 300s | | WebSocket 断线 | 前端 Socket.IO 自动重连 + 重新拉取当前状态 | ### 风险 4:并发 Token 消耗过高 | 场景 | 应对 | |------|------| | 并行执行大量子 Agent | `maxConcurrent` 限制(默认 3),分批执行 | | Token 成本超预期 | 前端展示累计 token 消耗,集群配置 token 预算告警(后续迭代) | --- ## 验收标准 ### 功能验收 | # | 功能点 | 验收标准 | |---|--------|---------| | 1 | 集群 CRUD | 创建/编辑/删除集群,数据持久化正确 | | 2 | 画布编排 | 拖拽添加 Agent、自由连线、设置主 Agent、保存/加载画布 | | 3 | 角色配置 | 每个 Agent 可配置角色描述和分组 | | 4 | 连线提示 | 连线自动生成 delegateHints,注入主 Agent system prompt | | 5 | 发布控制 | 发布校验(需主 Agent + ≥2 成员),取消发布 | | 6 | 手动触发 | 通过 REST API 和 WebSocket 触发运行 | | 7 | 定时调度 | Cron 表达式配置,发布时注册,取消发布时销毁 | | 8 | 串行委派 | 主 Agent 通过 delegate_task 串行执行子 Agent | | 9 | 并行委派 | 主 Agent 通过 delegate_parallel 并行执行子 Agent | | 10 | 错误处理 | 子 Agent 失败时,主 Agent 收到错误信息并自主决策 | | 11 | 升级人工 | escalate 暂停运行 → 用户处理 → 恢复执行 | | 12 | 运行列表 | 分页查询、状态筛选、实时更新 | | 13 | 实时画布 | 节点状态随任务进度实时变化(呼吸动画/颜色) | | 14 | 任务时间线 | 按时间排序,并行任务并排显示 | | 15 | 日志面板 | 实时流式日志,Agent 维度 Tab 切换,搜索过滤 | | 16 | Token 统计 | 每个任务和整次运行的 token 消耗正确记录 | | 17 | 权限控制 | 菜单权限 + 租户数据隔离 | ### 非功能验收 | # | 指标 | 标准 | |---|------|------| | 1 | 零 runtime 改动 | `runtime/agent.ts` 和 `runtime/attempt.ts` 无修改 | | 2 | 现有功能不受影响 | Agent 对话、Skill 管理等原有功能正常 | | 3 | 代码规范 | Entity 继承 BaseEntity、Controller 使用 @CoolController、文件下划线命名 | | 4 | 并发安全 | 3 个集群同时运行无异常 | | 5 | 超时控制 | 子 Agent 超时后正确标记 failed | ### 交付物清单 | 类别 | 数量 | 文件 | |------|------|------| | 后端 Entity | 4 新增 + 1 修改 | `crew.ts`, `crew_agent.ts`, `crew_run.ts`, `crew_task.ts`, `agent.ts` | | 后端 Service | 5 新增 | `crew_types.ts`, `crew.ts`, `crew_orchestrator.ts`, `crew_delegate.ts`, `crew_scheduler.ts` | | 后端 Tool | 3 新增 | `delegate_task.ts`, `delegate_parallel.ts`, `escalate.ts` | | 后端 Controller | 3 新增 | `crew.ts`, `crew_run.ts`, `crew_trigger.ts` | | 后端 Gateway | 1 新增 | `crew_server.ts` | | 前端 View | 2 新增 | `crew-editor.vue`, `crew-monitor.vue` | | 前端 Component | 11 新增 | `crew-agent-node.vue`, `crew-edge-label.vue`, `crew-sidebar.vue`, `crew-property-panel.vue`, `crew-trigger-config.vue`, `crew-context-menu.vue`, `crew-run-table.vue`, `crew-run-detail.vue`, `crew-timeline.vue`, `crew-log-panel.vue`, `crew-canvas.vue`(可选抽取) | | 前端 Hook | 3 新增 | `crew-canvas.ts`, `crew-orchestration.ts`, `crew-monitor.ts` | | 前端 Store | 1 新增 | `crew.ts` | | **合计** | **34 个文件** | 31 新增 + 3 修改 | --- ## 时间线总览 ``` Phase 0 准备工作 ██ 0.5 天 Phase 1 后端数据层 ████ 1 天 Phase 2 后端核心引擎 ██████████ 2.5 天 Phase 3 后端通信层 ████████ 2 天 Phase 4 前端编排页 ████████████ 3 天 Phase 5 前端监控页 ██████████ 2.5 天 Phase 6 权限+测试 ████ 1 天 ──────── 总计 12.5 天 ``` > **说明**:Phase 4 预留了额外缓冲(Vue Flow 首次使用风险)。如果 Phase 0 验证顺利,Phase 4 可缩短至 2 天。
{{ task.taskDescription || task.result?.agent }}
{{ currentEscalation?.reason }}
点击运行记录查看详情