GPU_GUARD_MONOREPO/docs/superpowers/specs/2026-04-14-multi-agent-crew-orchestration-design.md
2026-05-20 21:39:12 +08:00

24 KiB
Raw Permalink Blame History

Multi-Agent Crew 编排与运行监控 设计文档

日期2026-04-14 状态:待实施

1. 概述

1.1 背景

现有 Neta 平台已具备单 Agent 管理能力(创建、配置 Skill、选模型、对话。业务场景需要多个 Agent 协作完成复杂任务——例如淘宝电商运营涉及登录、产品生成、上架、投流分析等环节,每个环节由专门的子 Agent 负责,未来还需扩展到拼多多、抖音等平台。因此需要一个"Agent 集群Crew"编排与监控系统。

1.2 核心设计决策

采用 主 Agent 委派模式(借鉴 hermes-agent 的 delegate 模式):

  • 主 Agent 是一个真正的 AI Agent拥有 delegate_task 工具
  • 主 Agent 根据自身判断决定调用哪个子 Agent、并行还是串行
  • 画布上的可选连线作为 soft hint 注入主 Agent 的 system prompt不是硬编码流程
  • 子 Agent 报错时,主 Agent 自主决策重试/换 Agent/跳过,搞不定才升级给用户

1.3 功能范围

功能 说明
Agent 编排页 无限画布,拖入 Agent 组建集群,可选连线表示推荐调用关系
运行监控页 集群运行列表 + 详情(实时画布 + 任务时间线 + 日志)
集群调度引擎 后端编排器,管理主 Agent 委派子 Agent 的全生命周期
多触发方式 手动 / 定时 / Webhook / API
权限集成 集群可分配给不同权限的用户

2. 数据模型

2.1 新增表

netaclaw_crewAgent 集群)

字段 类型 说明
id int (PK) 自增主键
name varchar(100) 集群唯一标识
label varchar(200) 显示名称
description text 集群描述
icon varchar(500) 图标
masterAgentId int 主 Agent ID关联 netaclaw_agent
canvasData json 画布布局数据(节点位置、连线、分组)
triggerConfig json 触发配置
delegateHints json 从连线生成的委派提示文本
status tinyint 0=草稿 1=已发布
maxConcurrent int 最大并发子 Agent 数,默认 3
taskTimeout int 单个子任务默认超时时间(秒),默认 300
retryPolicy json 全局重试策略

triggerConfig JSON 结构:

{
  "manual": true,
  "cron": { "enabled": false, "expression": "0 9 * * *", "timezone": "Asia/Shanghai" },
  "webhook": { "enabled": false, "secret": "xxx" },
  "api": { "enabled": false, "apiKey": "xxx" }
}

delegateHints JSON 结构(从画布连线自动生成):

{
  "hints": "建议执行顺序登录Agent → 获取商品数据Agent → 上架Agent投流分析Agent 可与上架并行执行。",
  "edges": [
    { "from": "登录Agent", "to": "获取数据Agent", "type": "serial", "note": "需要先登录获取cookie" },
    { "from": "获取数据Agent", "to": "上架Agent", "type": "serial" },
    { "from": "获取数据Agent", "to": "投流Agent", "type": "parallel" }
  ]
}

netaclaw_crew_agent集群-Agent 关联)

数据权威性说明crew_agent 表是集群成员关系的权威来源(成员列表、角色、分组)。canvasData JSON 仅存储画布渲染信息(节点位置、连线样式)。后端编排器读取 crew_agent 获取成员和角色,不依赖 canvasData。保存画布时,前端负责保持两者同步。

字段 类型 说明
id int (PK) 自增主键
crewId int 集群 ID
agentId int Agent ID
role text 该 Agent 在集群中的角色描述
canvasPosition json 画布位置 {x, y}
groupName varchar(100) 分组名(如"淘宝组"

netaclaw_crew_run集群运行记录

字段 类型 说明
id int (PK) 自增主键
crewId int 集群 ID
triggerType varchar(20) manual / cron / webhook / api
triggerInput text 触发时的输入参数
status varchar(20) pending / running / paused / completed / failed
masterSessionId int 主 Agent 会话 ID
startTime datetime 开始时间
endTime datetime 结束时间
result json 运行结果摘要
error text 错误信息
pausedState json 升级暂停时保存的主 Agent 对话上下文(用于恢复)
tokenUsage json 整次运行累计 token 消耗 { inputTokens, outputTokens }

netaclaw_crew_task子 Agent 任务记录)

字段 类型 说明
id int (PK) 自增主键
runId int 运行记录 ID
agentId int 子 Agent ID
taskDescription text 主 Agent 委派的任务描述
status varchar(20) pending / running / completed / failed / retrying
sessionId int 子 Agent 会话 ID
startTime datetime 开始时间
endTime datetime 结束时间
result json 执行结果
error text 错误信息
retryCount int 已重试次数,默认 0
timeout int 该任务超时时间(秒),为空则使用集群的 taskTimeout
tokenUsage json 该任务 token 消耗 { inputTokens, outputTokens }
parentTaskId int 父任务 ID支持嵌套委派

2.2 扩展现有表

netaclaw_agent 新增字段:

字段 类型 说明
isCrewMaster tinyint 是否可作为集群主 AgentUI 筛选用)

3. 后端架构

3.1 模块结构

netaclaw/crew/
├── entity/
│   ├── crew.ts
│   ├── crew_agent.ts
│   ├── crew_run.ts
│   └── crew_task.ts
├── controller/
│   ├── crew.ts              # 集群 CRUD + 画布保存
│   ├── crew_run.ts          # 运行记录查询
│   └── crew_trigger.ts      # 触发控制(启动/暂停/恢复/终止)
├── gateway/
│   └── crew_server.ts       # ★ /crew 命名空间 WebSocket 网关
├── service/
│   ├── crew.ts              # 集群业务逻辑
│   ├── crew_orchestrator.ts # ★ 核心编排调度器
│   ├── crew_delegate.ts     # 委派执行器(调用 runAgent 执行子 Agent
│   └── crew_scheduler.ts    # 定时调度CronJob 动态管理)
└── tools/
    ├── delegate_task.ts     # ★ 串行委派工具
    ├── delegate_parallel.ts # ★ 并行委派工具
    └── escalate.ts          # ★ 升级人工工具

3.2 核心编排流程CrewOrchestrator

触发(手动/定时/webhook/api
  → 创建 crew_runstatus=running
  → 加载集群配置masterAgent + 子 Agent 列表 + 角色 + 连线提示)
  → 构建主 Agent 增强 system prompt
      原始 systemPrompt
      + "\n\n## 团队成员\n"
      + 每个子 Agent: "- {name}: {role描述}\n"
      + "\n## 调度建议\n" + delegateHints.hints
      + "\n## 工具说明\n使用 delegate_task 将任务委派给团队成员。"
  → 创建主 Agent 会话
  → 发送 triggerInput 作为用户消息
  → 主 Agent 进入 ReAct 循环
      ├─ 调用 delegate_task(agent_name, task_description, context?)
      │   → CrewDelegate 创建 crew_taskstatus=running
      │   → 调用 runAgent()(纯函数,独立上下文,参见 3.7 节)
      │   → 等待子 Agent 完成 → 返回 DelegateResult
      │   → 更新 crew_task 状态 → WebSocket 推送
      ├─ 调用 delegate_parallel(tasks[])
      │   → 为每个子任务创建 crew_task
      │   → Promise.all() 并发调用多个 runAgent()(受 maxConcurrent 限制)
      │   → 全部完成后返回 DelegateResult[]
      ├─ 子 Agent 报错
      │   → 错误信息作为 tool_result 返回给主 Agent
      │   → 主 Agent 自主决策:重试 / 换 Agent / 跳过
      │   → 主 Agent 判断无法处理 → 调用 escalate 工具
      │   → 保存对话上下文到 pausedState → crew_run status=paused
      │   → WebSocket 推送 crew:escalation
      │   → 用户在监控页处理后 → crew:control resume → 恢复执行
      └─ 主 Agent 判断全部完成 → crew_run status=completed

3.3 delegate_task 工具

提供两个委派工具:delegate_task(串行,单个委派)和 delegate_parallel(并行,批量委派)。

delegate_task串行委派

{
  name: "delegate_task",
  description: "将任务委派给团队中的一个子 Agent 执行,等待执行完成后返回结果。",
  parameters: {
    agent_name: { type: "string", description: "子 Agent 名称" },
    task_description: { type: "string", description: "任务描述" },
    context: { type: "string", description: "上下文(前序任务结果等)", optional: true }
  }
}

串行执行:直接调用 runAgent()await 子 Agent 完成后返回结果。与现有 runAttempt 的逐个 tool_call 处理方式完全兼容。

delegate_parallel并行委派

{
  name: "delegate_parallel",
  description: "将多个任务同时委派给不同的子 Agent 并行执行,全部完成后返回所有结果。",
  parameters: {
    tasks: {
      type: "array",
      items: {
        type: "object",
        properties: {
          agent_name: { type: "string", description: "子 Agent 名称" },
          task_description: { type: "string", description: "任务描述" },
          context: { type: "string", description: "上下文", optional: true }
        }
      },
      description: "要并行执行的任务列表"
    }
  }
}

并行执行逻辑:

  • 单次 tool_call 包含多个子任务,在 execute() 内部使用 Promise.all() 并发调用多个 runAgent()
  • maxConcurrent 限制,超出上限的任务排队等待
  • 全部完成后,将所有结果合并为一个数组返回

设计说明:之所以不用"多个 delegate_task + parallel_group"的方案,是因为现有 runAttempt 对 toolCalls 是逐个 await 处理的。如果要"收集同组任务再并发",需要侵入性地修改 ReAct 循环。而 delegate_parallel 作为一个单独工具,在自己的 execute() 内部实现并发,对 runtime 零改动。

tool_result 返回格式

delegate_task 和 delegate_parallel 均返回结构化的 JSON 结果:

// 单任务结果
interface DelegateResult {
  agent: string;          // 子 Agent 名称
  status: "completed" | "failed";
  result: string;         // 子 Agent 的最终输出
  error?: string;         // 失败时的错误信息
  duration: string;       // 执行耗时,如 "45s"
  tokenUsage: { inputTokens: number; outputTokens: number };
}

// delegate_parallel 返回 DelegateResult[]

主 Agent 根据返回的 status 和 error 自主决策后续行动。

3.4 escalate 工具与暂停/恢复机制

{
  name: "escalate",
  description: "当你无法处理某个问题时,升级给人工处理。集群将暂停等待人工介入。",
  parameters: {
    reason: { type: "string", description: "升级原因" },
    failed_task: { type: "string", description: "失败的任务描述", optional: true }
  }
}

暂停/恢复持久化流程:

escalate 工具被调用时,本质上是在主 Agent 的 ReAct 循环内部。具体流程:

  1. 暂停escalate.execute() 被调用时:

    • 将当前主 Agent 的完整对话历史(conversation 数组)序列化到 crew_run.pausedState
    • crew_run.status 设为 paused
    • 通过 WebSocket 推送 crew:escalation 事件
    • execute() 返回一个 Promise该 Promise 不 resolve,而是将 resolve 回调存储在内存 Map 中key 为 runId
    • 这使得 runAttempt 阻塞在该 tool_call 上,不继续循环
  2. 恢复:用户在监控页点击"恢复"时:

    • 前端发送 crew:control { runId, action: "resume", userMessage: "..." }
    • 后端从内存 Map 中取出对应的 resolve 回调
    • 将用户的处理意见作为 tool_result 传入 resolve
    • runAttempt 恢复执行,主 Agent 收到用户意见后继续决策
  3. 进程重启恢复:如果服务重启导致内存 Map 丢失:

    • 启动时扫描 crew_runstatus=paused 的记录
    • pausedState 恢复对话历史
    • 重新调用 runAgent(),将用户的恢复消息追加到历史中继续执行

3.5 WebSocket 事件

使用独立的 /crew Socket.IO 命名空间,与现有 /netaclaw 网关隔离。新建 CrewGatewaygateway/crew_server.ts),使用 @WSController('/crew') 装饰器。

设计说明:现有 /netaclaw 网关使用 sessionState = new Map() 管理状态,所有 message 事件共享同一个 handler。Crew 的事件模式(多运行实例、多任务并发推送)与单 Agent 对话模式差异较大,混入现有 handler 会增加耦合和复杂度。独立命名空间更清晰。

事件 方向 数据
crew:run:status S→C { runId, status, progress: "3/7" }
crew:task:status S→C { runId, taskId, agentName, status, result?, error? }
crew:escalation S→C { runId, taskId, reason, error }
crew:trigger C→S { crewId, triggerInput }
crew:control C→S `{ runId, action: "pause"
crew:log S→C { runId, taskId?, agentName, level, message, timestamp }

3.6 触发方式

方式 端点/机制 认证
手动 POST /admin/crew/trigger/start 登录态
定时 使用 cron npm 包的 CronJob 动态创建定时任务(与现有 modules/task/service/local.ts 模式一致) 内部
Webhook POST /open/crew/webhook/:crewId 签名验证
API POST /open/crew/api/:crewId/run API Key

定时调度实现细节(crew_scheduler.ts

import { CronJob } from 'cron';

// 内存中维护活跃的 CronJob 实例
const activeJobs = new Map<number, CronJob>(); // crewId → CronJob

// 集群发布时注册定时任务
function registerCron(crew: Crew) {
  if (!crew.triggerConfig?.cron?.enabled) return;
  const job = new CronJob(
    crew.triggerConfig.cron.expression,
    () => crewOrchestrator.start(crew.id, 'cron', ''),
    null, true,
    crew.triggerConfig.cron.timezone
  );
  activeJobs.set(crew.id, job);
}

// 集群取消发布或删除时销毁
function unregisterCron(crewId: number) {
  activeJobs.get(crewId)?.stop();
  activeJobs.delete(crewId);
}

// 服务启动时恢复所有已发布集群的定时任务
async onServerReady() {
  const publishedCrews = await crewRepo.find({ where: { status: 1 } });
  publishedCrews.filter(c => c.triggerConfig?.cron?.enabled).forEach(registerCron);
}

注意:项目中不存在 Midway @Schedule 装饰器。定时任务统一使用 cron npm 包的 CronJob 类,支持动态创建和销毁。

3.7 子 Agent 执行spawn 定义)

"spawn 子 Agent" 的技术实现是直接调用 runAgent() 函数runAgent() 是纯函数(非单例),接收 AgentRunParams,返回 Promise<AttemptResult>,支持并发调用。

子 Agent 执行过程(crew_delegate.ts

async function executeSubAgent(
  agent: NetaclawAgent,
  taskDescription: string,
  context: string,
  runId: number,
  callbacks: CrewCallbacks
): Promise<DelegateResult> {
  // 1. 加载子 Agent 配置
  const agentConfig = {
    name: agent.name,
    systemPrompt: agent.systemPrompt + `\n\n## 当前任务\n${taskDescription}`,
    model: agent.modelConfig.modelId,
    apiKey: agent.modelConfig.apiKey,
    skills: agent.skills || [],
  };

  // 2. 加载子 Agent 工具集(移除 delegate_task、delegate_parallel、escalate
  const tools = loadAgentTools(agent).filter(
    t => !['delegate_task', 'delegate_parallel', 'escalate'].includes(t.name)
  );

  // 3. 创建 crew_task 记录
  const task = await crewTaskRepo.save({
    runId, agentId: agent.id,
    taskDescription, status: 'running',
    startTime: new Date(),
  });

  // 4. 调用 runAgent()(纯函数,独立上下文)
  const result = await runAgent({
    agentConfig,
    tools,
    userMessage: context ? `任务:${taskDescription}\n\n上下文${context}` : taskDescription,
    history: [],  // 空历史 = 独立会话
    onToken: (text) => callbacks.onLog(task.id, agent.name, 'info', text),
    onToolCall: (name, args) => callbacks.onLog(task.id, agent.name, 'tool', `调用 ${name}`),
  });

  // 5. 更新 crew_task 记录 + WebSocket 推送
  await crewTaskRepo.update(task.id, {
    status: 'completed',
    result: { text: result.text },
    tokenUsage: result.tokens,
    endTime: new Date(),
  });

  return {
    agent: agent.name,
    status: 'completed',
    result: result.text,
    duration: `${(Date.now() - task.startTime.getTime()) / 1000}s`,
    tokenUsage: result.tokens,
  };
}

超时控制:使用 Promise.race() 包裹 runAgent() 调用,超时时间取 crew_task.timeout ?? crew.taskTimeout ?? 300 秒。超时后将 task 标记为 failed,错误信息返回给主 Agent。

约束清单:

  • 独立会话(history: [],不共享主 Agent 历史)
  • 聚焦 system prompt只包含该子 Agent 自身配置 + 委派任务描述)
  • 受限工具集:继承子 Agent 自身的 skills/tools但移除 delegate_taskdelegate_parallelescalate(防止无限嵌套)
  • 独立 token 计数(从 AttemptResult.tokens 获取)
  • 超时控制(Promise.race

4. 前端架构

4.1 路由与菜单

agent 模块 config.ts 新增:

{ path: '/agent/crew-editor', meta: { label: 'Agent 编排' } },
{ path: '/agent/crew-monitor', meta: { label: '运行监控' } },

同步在 base_sys_menu 插入菜单记录parentId 指向 Agent 管理的父菜单。

4.2 编排页crew-editor.vue

布局: 左侧面板 + 中央画布 + 底部属性面板

左侧面板crew-sidebar.vue

  • 已发布 Agent 列表,支持搜索
  • 拖拽 Agent 到画布添加为集群成员
  • 已在集群中的 Agent 显示勾选标记

中央画布crew-canvas.vue基于 Vue Flow

  • 自定义节点 crew-agent-node.vueAgent 图标、名称、角色标签、分组色带
  • 主 Agent 节点:金色边框 + 星标
  • 节点自由拖拽定位
  • 节点间拖拽连线Handle 在节点四边)
  • 连线自定义标签 crew-edge-label.vue:串行/并行图标 + 标注文字
  • 框选多节点 → 右键"设为分组"
  • minimap@vue-flow/minimap
  • 缩放控制(@vue-flow/controls
  • 背景网格(@vue-flow/background
  • 右键菜单:设为主 Agent / 移出集群 / 编辑 Agent / 删除连线 / 自动布局elkjs

底部属性面板crew-property-panel.vue

  • 点击节点角色描述textarea、分组名input、重试次数number
  • 点击连线:类型(串行依赖/并行建议)、标注文字
  • 点击空白:集群全局配置(名称、描述、触发配置、最大并发数)

顶部工具栏:

  • 集群选择下拉 + 新建
  • 保存(持久化 canvasData + delegateHints
  • 发布/取消发布
  • 试运行(快速触发一次,跳转监控页)

4.3 监控页crew-monitor.vue

上半部运行列表crew-run-table.vue

  • Element Plus 表格集群名、触发方式、状态彩色圆点、开始时间、耗时、进度x/y、操作
  • 状态颜色:绿=running、蓝=completed、黄=paused、红=failed、灰=pending
  • 操作按钮:暂停/恢复/终止/重试/详情
  • 筛选栏:集群、状态、时间范围
  • WebSocket 实时更新行数据

下半部运行详情crew-run-detail.vue点击行展开或抽屉

三栏布局:

  1. 画布实时视图crew-canvas.vue 只读模式)

    • 复用编排页画布组件,传入 readonly=true + liveStatus prop
    • 节点状态映射idle=灰、running=蓝+呼吸动画、completed=绿+勾、failed=红+叹号
    • 活跃连线显示流动虚线动画
  2. 任务时间线crew-timeline.vue

    • 纵向时间轴,每条 crew_task 一个节点
    • 并行任务横向并排
    • 显示时间、Agent 名、任务描述、状态图标、耗时
    • 点击跳转对应日志
  3. 日志面板crew-log-panel.vue

    • Tab 切换:主 Agent / 各子 Agent
    • 实时日志流WebSocket crew:log 事件)
    • 主 Agent tab 显示推理过程thinking
    • 子 Agent tab 显示执行日志
    • 错误高亮 + 搜索过滤

4.4 组件清单

modules/agent/
├── views/
│   ├── crew-editor.vue
│   └── crew-monitor.vue
├── components/crew/
│   ├── crew-canvas.vue           # Vue Flow 画布(编排+监控复用)
│   ├── crew-agent-node.vue       # 自定义 Agent 节点
│   ├── crew-edge-label.vue       # 自定义连线标签
│   ├── crew-sidebar.vue          # 左侧 Agent 列表
│   ├── crew-property-panel.vue   # 底部属性面板
│   ├── crew-trigger-config.vue   # 触发配置表单
│   ├── crew-run-table.vue        # 运行记录表格
│   ├── crew-run-detail.vue       # 运行详情容器
│   ├── crew-timeline.vue         # 任务时间线
│   └── crew-log-panel.vue        # 日志面板
├── hooks/
│   ├── crew-canvas.ts            # 画布操作(增删节点、连线、序列化)
│   ├── crew-orchestration.ts     # canvasData ↔ delegateHints 转换
│   └── crew-monitor.ts           # WebSocket 订阅 + 实时状态
└── store/
    └── crew.ts                   # Pinia集群列表、当前编辑、运行状态缓存

4.5 画布数据序列化

保存时将 Vue Flow 的 nodes/edges 序列化为 canvasData,同时自动生成 delegateHints

// hooks/crew-orchestration.ts
function canvasToHints(nodes, edges): DelegateHints {
  // 1. 遍历 edges按 type 分类serial/parallel
  // 2. 生成自然语言提示文本
  // 3. 返回 { hints: string, edges: EdgeInfo[] }
}

5. 权限集成

5.1 菜单权限

通过 base_sys_menu 控制页面访问:

  • agent:crew:editor — 编排页
  • agent:crew:monitor — 监控页

5.2 数据权限

netaclaw_crew 继承 BaseEntity 的 tenantId,通过 Cool Admin 的租户隔离机制实现:

  • 用户只能看到自己有权限的集群
  • 管理员可以将集群分配给指定用户/角色

5.3 操作权限

权限标识 说明
crew:create 创建集群
crew:edit 编辑集群
crew:publish 发布/取消发布
crew:trigger 手动触发运行
crew:control 暂停/恢复/终止运行
crew:view 查看监控

6. 实现边界与约束

6.1 本次实现

  • 集群 CRUD + 画布编排页完整功能
  • 运行监控页完整功能(列表 + 详情 + 实时状态)
  • 后端编排引擎CrewOrchestrator + CrewDelegate
  • delegate_task + escalate 工具
  • 手动触发 + 定时调度
  • WebSocket 实时推送
  • 基础权限集成

6.2 本次不实现(后续迭代)

  • Webhook 触发(需要签名验证体系)
  • API 触发(需要 API Key 管理体系)
  • 子 Agent 嵌套委派parentTaskId第一版只支持一层
  • 集群模板/市场(预设的电商运营集群模板)
  • 运行数据统计/报表

6.3 技术约束

  • 前端画布基于已集成的 @vue-flow/core 1.42.1 + 扩展
  • 自动布局使用已集成的 elkjs 0.9.3
  • 后端 Entity 继承 BaseEntityController 使用 @CoolController
  • 前端通过 Cool Admin service 代理调用 API
  • 菜单通过数据库配置
  • 定时调度使用 cron npm 包的 CronJob(不使用 @Schedule
  • 子 Agent 执行通过直接调用 runAgent() 纯函数实现

6.4 风险项

风险 影响 缓解措施
Vue Flow 首次使用 项目中从未实际使用过 @vue-flow/core,虽然已安装但无参考代码。画布交互开发可能遇到未知问题 优先开发画布组件作为技术验证;准备纯 SVG/Canvas 降级方案;预留 1-2 天额外时间
主 Agent 自主决策质量 主 Agent 的编排质量依赖 LLM 能力,可能出现不合理的调度决策 delegateHints 提供 soft hint 引导记录所有决策日志供人工复盘escalate 兜底
长时间运行稳定性 集群运行可能持续数十分钟,期间服务重启会丢失状态 pausedState 持久化对话上下文;启动时恢复 paused 状态的运行
并发 token 消耗 并行执行多个子 Agent 时 LLM API 成本可能很高 maxConcurrent 限制tokenUsage 跟踪;前端展示 token 消耗