# 运行时过程事件实施计划 > **给执行代理的要求:** 实施本计划时必须使用 `superpowers:subagent-driven-development`(推荐)或 `superpowers:executing-plans`。每个步骤使用 checkbox(`- [ ]`)跟踪。 **目标:** 为长耗时工具、compute-entry skill 以及未来的子代理/批处理任务增加一条 Pi 风格的运行时过程事件流,让 Agent 对话页能实时显示“执行到了哪一步”,而不是只显示最终结果或单一进度条。 **架构:** 在现有 NetaClaw 工具生命周期 `started -> result -> completed` 中增加 `process-event`。第一期 WebSocket 事件名保留为 `tool_process_event`,便于接入当前 `streamingTools` 投影;但事件 payload 采用通用 `RuntimeProcessEvent`,包含 `version`、`operationId`、`parentOperationId`、`targetType`、`toolCallId`、`taskId` 等字段,避免绑定死某一个 skill。`execute_skill` 负责把 skill 子进程输出的结构化过程事件桥接到 runtime callback;gateway 负责把事件节流后发给前端,并在 assistant metadata 中保存摘要,同时写完整 JSONL 过程日志供后端排查。 **关键边界:** - 过程事件只用于 UI 展示、过程回放和后端排查,不进入 LLM history,也不能被 `buildLLMMessages` 带回模型上下文。 - `processLogRef` 只能是逻辑引用,不能把本地绝对路径暴露给前端。 - 第一期不做完整事件总线,不做任务中心,不做完整日志查看 API,只做一个轻量 runtime helper,避免过度设计。 - 第一期只保证 `execute_skill` 过程可见;事件结构保留未来接入普通 tool、subagent、后台任务的能力。 **技术栈:** TypeScript、Midway.js WebSocket、NetaClaw runtime tools、Pinia/Vue 3、Element Plus、Jest、vue-tsc/Vite build。 --- ## 文件结构 后端: - 新增 `packages/backend/src/modules/netaclaw/runtime/process_events.ts` - 定义 `RuntimeProcessEvent`、`RuntimeProcessEventInput`。 - 负责事件归一化、`operationId` 生成、百分比计算、payload 脱敏、metadata 采样、逻辑 `processLogRef` 生成、JSONL 记录、轻量节流/去重。 - 注意:这是轻量 helper,不是完整 EventBus。 - 修改 `packages/backend/src/modules/netaclaw/tools/common.ts` - 增加 `ToolProcessUpdateCallback`。 - 扩展 `AgentTool.execute()`,增加可选第三参数 `onUpdate`。 - 修改 `packages/backend/src/modules/netaclaw/runtime/attempt.ts` - 增加 `onToolProcessEvent`。 - 调用 tool 时传入 `emitProcessEvent`。 - 修改 `packages/backend/src/modules/netaclaw/runtime/agent.ts` - 把 `onToolProcessEvent` 从 `runAgent` 透传到 `runAttempt`。 - 修改 `packages/backend/src/modules/netaclaw/service/skill_executor.ts` - 只解析显式 `type: "process_event"` 的 JSON 行。 - 保持普通 stderr 原样进后端日志。 - timeout、非 0 退出、spawn error、最终 JSON 解析失败时,发终态失败过程事件。 - 修改 `packages/backend/src/modules/netaclaw/tools/builtin/execute_skill.ts` - 把 skill 过程事件桥接成 tool process update。 - 修改 `packages/backend/src/modules/netaclaw/gateway/protocol.ts` - 增加 `ServerToolProcessEvent`。 - 修改 `packages/backend/src/modules/netaclaw/gateway/server.ts` - 接收 runtime process event。 - 发 `tool_process_event` 给前端。 - 在 `skillExecutions` metadata 中保存 `latestProcessEvent`、采样后的 `processEvents`、逻辑 `processLogRef`。 - 不把完整本地文件路径放入前端 payload。 后端测试: - 新增 `packages/backend/test/tool_process_events.test.ts` - 新增 `packages/backend/test/skill_process_events.test.ts` - 新增 `packages/backend/test/process_event_buffer.test.ts` 前端: - 修改 `packages/frontend/src/modules/agent/types/index.d.ts` - 增加 `RuntimeProcessEventProjection`。 - 给 `StreamingToolProjection` 增加 `latestProcessEvent`、`processEvents`、`processLogRef`。 - 给 `WSServerEvent.type` 增加 `tool_process_event`。 - 修改 `packages/frontend/src/modules/agent/store/chat.ts` - 处理 live `tool_process_event`。 - 从 assistant metadata 恢复过程事件摘要。 - 按 `toolCallId` 合并到对应 `streamingTool`。 - 新增 `packages/frontend/src/modules/agent/components/tool-process-timeline.vue` - 独立渲染过程时间线。 - 修改 `packages/frontend/src/modules/agent/components/skill-card.vue` - 展示最新过程事件。 - 只有存在 numeric percent 时才显示进度条。 - 嵌入 `tool-process-timeline.vue`。 - 修改实际渲染 `streamingTools` 的父组件或 renderer - 明确传入 `latestProcessEvent`、`processEvents`、`processLogRef`。 --- ## Task 1:后端运行时过程事件类型与工具回调契约 **文件:** - 新增:`packages/backend/src/modules/netaclaw/runtime/process_events.ts` - 修改:`packages/backend/src/modules/netaclaw/tools/common.ts` - 修改:`packages/backend/src/modules/netaclaw/runtime/attempt.ts` - 修改:`packages/backend/src/modules/netaclaw/runtime/agent.ts` - 测试:`packages/backend/test/tool_process_events.test.ts` - [ ] **Step 1:先写失败测试** 创建 `packages/backend/test/tool_process_events.test.ts`: ```ts import { Type } from '@sinclair/typebox'; import { runAttempt } from '../src/modules/netaclaw/runtime/attempt'; import type { AnyAgentTool } from '../src/modules/netaclaw/tools/common'; jest.mock('../src/modules/netaclaw/runtime/model_selection', () => ({ getProvider: () => ({ chat: jest .fn() .mockResolvedValueOnce({ content: '', usage: { inputTokens: 1, outputTokens: 1 }, toolCalls: [{ id: 'call-1', name: 'slow_tool', arguments: '{"taskId":"task-1"}' }], }) .mockResolvedValueOnce({ content: 'done', usage: { inputTokens: 1, outputTokens: 1 }, toolCalls: [], }), }), })); describe('runtime process events', () => { it('从执行中的 tool 发出带 toolCallId 和 operationId 的过程事件', async () => { const updates: any[] = []; const tool: AnyAgentTool = { name: 'slow_tool', label: 'Slow Tool', description: 'test tool', parameters: Type.Object({ taskId: Type.String() }), async execute(_id, _params, onUpdate) { onUpdate?.({ source: 'tool', targetType: 'tool', stage: 'prepare', status: 'running', message: 'Preparing work', current: 1, total: 2, }); return { type: 'text', text: 'ok' }; }, }; await runAttempt({ messages: [{ role: 'user', content: 'run' }], tools: [tool], config: { provider: 'openai', model: 'test', apiKey: 'test' }, onToolProcessEvent: event => updates.push(event), }); expect(updates).toEqual([ expect.objectContaining({ toolCallId: 'call-1', operationId: 'tool-call-1', toolName: 'slow_tool', args: { taskId: 'task-1' }, event: expect.objectContaining({ version: 1, operationId: 'tool-call-1', targetType: 'tool', source: 'tool', stage: 'prepare', status: 'running', message: 'Preparing work', current: 1, total: 2, percent: 50, }), }), ]); }); }); ``` - [ ] **Step 2:运行测试,确认失败** ```bash pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts ``` 预期:失败,因为 `RuntimeProcessEvent`、`onToolProcessEvent`、`onUpdate` 还不存在。 - [ ] **Step 3:新增运行时过程事件 helper** 创建 `packages/backend/src/modules/netaclaw/runtime/process_events.ts`: ```ts import { randomUUID } from 'crypto'; import * as fs from 'fs'; import * as path from 'path'; import { pDataPath } from '../../../comm/path.js'; export type RuntimeProcessTargetType = 'tool' | 'skill' | 'subagent' | 'task' | 'runtime'; export type RuntimeProcessSource = 'skill' | 'tool' | 'subagent' | 'runtime'; export type RuntimeProcessStatus = 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped'; export type RuntimeProcessLevel = 'debug' | 'info' | 'warning' | 'error'; export interface RuntimeProcessEvent { version: 1; operationId: string; parentOperationId?: string; targetType: RuntimeProcessTargetType; source: RuntimeProcessSource; taskId?: string; stage?: string; status: RuntimeProcessStatus; level: RuntimeProcessLevel; message: string; detail?: string; current?: number; total?: number; percent?: number; payload?: Record; timestamp: string; sequence?: number; } export type RuntimeProcessEventInput = Partial> & { operationId?: string; parentOperationId?: string; targetType?: RuntimeProcessTargetType; source?: RuntimeProcessSource; status?: RuntimeProcessStatus; level?: RuntimeProcessLevel; message: string; timestamp?: string; }; export interface RuntimeProcessLogRef { kind: 'runtime_process_log'; sessionId: string; operationId: string; toolCallId?: string; } const SENSITIVE_KEY_PATTERN = /(api[_-]?key|token|password|secret|authorization|credential)/i; const MAX_PAYLOAD_STRING_LENGTH = 500; export function buildOperationId(toolCallId?: string): string { return toolCallId ? `tool-${toolCallId}` : `op-${randomUUID()}`; } export function normalizeRuntimeProcessEvent(input: RuntimeProcessEventInput): RuntimeProcessEvent { const total = typeof input.total === 'number' ? input.total : undefined; const current = typeof input.current === 'number' ? input.current : undefined; const percent = typeof input.percent === 'number' ? clampPercent(input.percent) : total && current !== undefined ? clampPercent(Math.round((current / total) * 100)) : undefined; return { version: 1, operationId: input.operationId || buildOperationId(), parentOperationId: input.parentOperationId, targetType: input.targetType || 'runtime', source: input.source || 'runtime', taskId: input.taskId, stage: input.stage, status: input.status || 'running', level: input.level || 'info', message: input.message, detail: input.detail, current, total, percent, payload: sanitizeProcessPayload(input.payload), timestamp: input.timestamp || new Date().toISOString(), sequence: input.sequence, }; } export function sanitizeProcessPayload(payload: unknown): Record | undefined { if (!payload || typeof payload !== 'object' || Array.isArray(payload)) return undefined; const output: Record = {}; for (const [key, value] of Object.entries(payload as Record)) { if (SENSITIVE_KEY_PATTERN.test(key)) { output[key] = '[redacted]'; } else if (typeof value === 'string') { output[key] = value.length > MAX_PAYLOAD_STRING_LENGTH ? `${value.slice(0, MAX_PAYLOAD_STRING_LENGTH)}...` : value; } else if (typeof value === 'number' || typeof value === 'boolean' || value === null) { output[key] = value; } else { output[key] = '[object]'; } } return Object.keys(output).length > 0 ? output : undefined; } export function sampleProcessEventsForMetadata(events: T[], limit = 50): T[] { if (events.length <= limit) return events; const headCount = Math.min(10, Math.floor(limit / 5)); return [ ...events.slice(0, headCount), ...events.slice(-(limit - headCount)), ]; } export function buildProcessLogRef(input: { sessionId: string; operationId: string; toolCallId?: string; }): RuntimeProcessLogRef { return { kind: 'runtime_process_log', sessionId: input.sessionId, operationId: input.operationId, toolCallId: input.toolCallId, }; } export function appendRuntimeProcessLog(input: { sessionId: string; operationId: string; toolCallId?: string; event: RuntimeProcessEvent; }): RuntimeProcessLogRef { const dir = path.join(pDataPath(), 'netaclaw-process-events', input.sessionId); fs.mkdirSync(dir, { recursive: true }); const filename = `${safeFilePart(input.operationId)}.jsonl`; fs.appendFileSync(path.join(dir, filename), `${JSON.stringify(input.event)}\n`, 'utf8'); return buildProcessLogRef(input); } function clampPercent(value: number): number { return Math.max(0, Math.min(100, value)); } function safeFilePart(value: string): string { return value.replace(/[^a-zA-Z0-9_.-]/g, '_').slice(0, 120); } ``` - [ ] **Step 4:扩展 tool execute 契约** 修改 `packages/backend/src/modules/netaclaw/tools/common.ts`。 增加 import: ```ts import type { RuntimeProcessEventInput } from '../runtime/process_events.js'; ``` 增加类型: ```ts export type ToolProcessUpdateCallback = (event: RuntimeProcessEventInput) => void; ``` 把 `AgentTool.execute` 改为: ```ts execute(id: string, params: Static, onUpdate?: ToolProcessUpdateCallback): Promise; ``` - [ ] **Step 5:在 attempt 中桥接 tool update** 修改 `packages/backend/src/modules/netaclaw/runtime/attempt.ts`。 增加 import: ```ts import { AnyAgentTool, ToolResultContent, toolResultToText } from '../tools/common.js'; import { RuntimeProcessEvent, RuntimeProcessEventInput, buildOperationId, normalizeRuntimeProcessEvent } from './process_events.js'; ``` 给 `AttemptParams` 增加: ```ts onToolProcessEvent?: (input: { toolCallId: string; operationId: string; toolName: string; args: Record; event: RuntimeProcessEvent; }) => void; ``` 在 `runAttempt()` 中解构 `onToolProcessEvent`。 调用 `tool.execute` 前增加: ```ts const operationId = buildOperationId(tc.id); const emitProcessEvent = (event: RuntimeProcessEventInput) => { const normalized = normalizeRuntimeProcessEvent({ operationId, targetType: event.targetType || 'tool', source: event.source || 'tool', ...event, }); onToolProcessEvent?.({ toolCallId: tc.id, operationId, toolName: tc.name, args: effectiveArgs, event: normalized, }); }; ``` 把: ```ts const result = await tool.execute(tc.id, effectiveArgs); ``` 改为: ```ts const result = await tool.execute(tc.id, effectiveArgs, emitProcessEvent); ``` - [ ] **Step 6:在 agent 中透传 callback** 修改 `packages/backend/src/modules/netaclaw/runtime/agent.ts`。 增加 import: ```ts import type { RuntimeProcessEvent } from './process_events.js'; ``` 给 `AgentRunParams` 增加: ```ts onToolProcessEvent?: (input: { toolCallId: string; operationId: string; toolName: string; args: Record; event: RuntimeProcessEvent; }) => void; ``` 在 `runAttempt()` 参数中增加: ```ts onToolProcessEvent: params.onToolProcessEvent, ``` - [ ] **Step 7:运行测试** ```bash pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts ``` 预期:PASS。 --- ## Task 2:Skill 子进程过程事件解析 **文件:** - 修改:`packages/backend/src/modules/netaclaw/service/skill_executor.ts` - 测试:`packages/backend/test/skill_process_events.test.ts` - [ ] **Step 1:先写失败测试** 创建 `packages/backend/test/skill_process_events.test.ts`: ```ts import { normalizeSkillProcessLine } from '../src/modules/netaclaw/service/skill_executor'; describe('skill process event parsing', () => { it('只解析显式 type=process_event 的 JSON 行', () => { const event = normalizeSkillProcessLine( 'vehicle-damage-inspection', JSON.stringify({ type: 'process_event', time: '2026-05-06T04:07:17.187Z', stage: 'frames', message: 'frames extracted', taskId: 'inspection-1', frameCount: 361, duration: 120.38, resolution: '960x544', }), 7, ); expect(event).toEqual({ source: 'skill', targetType: 'skill', taskId: 'inspection-1', stage: 'frames', status: 'running', level: 'info', message: 'frames extracted', timestamp: '2026-05-06T04:07:17.187Z', sequence: 7, payload: { frameCount: 361, duration: 120.38, resolution: '960x544', }, }); }); it('从 batch/totalBatches 推导 current total percent', () => { const event = normalizeSkillProcessLine( 'vehicle-damage-inspection', JSON.stringify({ type: 'process_event', time: '2026-05-06T04:08:11.443Z', stage: 'detect', message: 'batch completed', batch: 3, totalBatches: 8, damageCount: 3, }), 8, ); expect(event).toEqual(expect.objectContaining({ source: 'skill', targetType: 'skill', stage: 'detect', message: 'batch completed', current: 3, total: 8, percent: 38, payload: expect.objectContaining({ damageCount: 3 }), })); }); it('忽略普通 stderr', () => { expect(normalizeSkillProcessLine('demo', 'plain stderr', 1)).toBeNull(); }); it('忽略没有 process_event 标记的普通 JSON', () => { expect(normalizeSkillProcessLine('demo', '{"stage":"frames","message":"too broad"}', 1)).toBeNull(); }); }); ``` - [ ] **Step 2:运行测试,确认失败** ```bash pnpm --filter @neta/backend test -- --runInBand test/skill_process_events.test.ts ``` 预期:失败,因为 `normalizeSkillProcessLine` 还不存在。 - [ ] **Step 3:实现解析器** 在 `packages/backend/src/modules/netaclaw/service/skill_executor.ts` 中增加 import: ```ts import type { RuntimeProcessEventInput } from '../runtime/process_events.js'; ``` 扩展 `SkillExecuteParams`: ```ts toolCallId?: string; onProcessEvent?: (event: RuntimeProcessEventInput) => void; ``` 在 `SkillExecutorService` 之前增加: ```ts const RESERVED_PROCESS_KEYS = new Set([ 'type', 'time', 'timestamp', 'stage', 'message', 'taskId', 'level', 'status', 'current', 'total', 'percent', 'batch', 'totalBatches', ]); export function normalizeSkillProcessLine( _skillName: string, line: string, sequence: number, ): RuntimeProcessEventInput | null { let parsed: Record; try { parsed = JSON.parse(line); } catch { return null; } if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) return null; if (parsed.type !== 'process_event') return null; if (typeof parsed.message !== 'string' && typeof parsed.stage !== 'string') return null; const current = typeof parsed.current === 'number' ? parsed.current : typeof parsed.batch === 'number' ? parsed.batch : undefined; const total = typeof parsed.total === 'number' ? parsed.total : typeof parsed.totalBatches === 'number' ? parsed.totalBatches : undefined; const percent = typeof parsed.percent === 'number' ? parsed.percent : total && current !== undefined ? Math.max(0, Math.min(100, Math.round((current / total) * 100))) : undefined; const payload: Record = {}; for (const [key, value] of Object.entries(parsed)) { if (!RESERVED_PROCESS_KEYS.has(key)) payload[key] = value; } return { source: 'skill', targetType: 'skill', taskId: typeof parsed.taskId === 'string' ? parsed.taskId : undefined, stage: typeof parsed.stage === 'string' ? parsed.stage : undefined, status: isProcessStatus(parsed.status) ? parsed.status : 'running', level: isProcessLevel(parsed.level) ? parsed.level : 'info', message: typeof parsed.message === 'string' ? parsed.message : String(parsed.stage || 'running'), current, total, percent, timestamp: typeof parsed.time === 'string' ? parsed.time : typeof parsed.timestamp === 'string' ? parsed.timestamp : new Date().toISOString(), sequence, payload: Object.keys(payload).length > 0 ? payload : undefined, }; } function isProcessStatus(value: unknown): value is RuntimeProcessEventInput['status'] { return value === 'queued' || value === 'started' || value === 'running' || value === 'completed' || value === 'failed' || value === 'cancelled' || value === 'skipped'; } function isProcessLevel(value: unknown): value is RuntimeProcessEventInput['level'] { return value === 'debug' || value === 'info' || value === 'warning' || value === 'error'; } ``` - [ ] **Step 4:从 stderr 中提取过程事件** 在 `execute()` 中增加: ```ts let processSequence = 0; ``` 在 stderr line loop 中 logger 后增加: ```ts const processEvent = normalizeSkillProcessLine(skillName, line, ++processSequence); if (processEvent) { params.onProcessEvent?.(processEvent); } ``` - [ ] **Step 5:补失败终态事件** timeout 时发: ```ts params.onProcessEvent?.({ source: 'runtime', targetType: 'skill', taskId: typeof input.taskId === 'string' ? input.taskId : undefined, stage: 'skill-timeout', status: 'failed', level: 'error', message: `Execution timed out after ${timeout}ms`, }); ``` 非 0 退出、spawn error、stdout JSON 解析失败时同样发 `status: 'failed'`,`source: 'runtime'`,`targetType: 'skill'`。 - [ ] **Step 6:运行测试** ```bash pnpm --filter @neta/backend test -- --runInBand test/skill_process_events.test.ts ``` 预期:PASS。 --- ## Task 3:execute_skill 桥接 skill 过程事件 **文件:** - 修改:`packages/backend/src/modules/netaclaw/tools/builtin/execute_skill.ts` - 测试:扩展 `packages/backend/test/tool_process_events.test.ts` - [ ] **Step 1:增加失败测试** 在 `packages/backend/test/tool_process_events.test.ts` 追加: ```ts import { createExecuteSkillTool } from '../src/modules/netaclaw/tools/builtin/execute_skill'; describe('execute_skill process bridge', () => { it('把 skill 过程事件转发给 tool update callback', async () => { const executor = { execute: jest.fn(async (params: any) => { params.onProcessEvent({ source: 'skill', targetType: 'skill', taskId: 'task-1', stage: 'frames', status: 'running', message: 'extracting frames', current: 1, total: 4, }); return { success: true, output: { ok: true }, duration: 10 }; }), } as any; const updates: any[] = []; const tool = createExecuteSkillTool(executor); await tool.execute( 'call-skill-1', { name: 'vehicle-damage-inspection', input: { taskId: 'task-1' } }, event => updates.push(event), ); expect(executor.execute).toHaveBeenCalledWith(expect.objectContaining({ skillName: 'vehicle-damage-inspection', input: { taskId: 'task-1' }, toolCallId: 'call-skill-1', })); expect(updates).toEqual([ expect.objectContaining({ source: 'skill', targetType: 'skill', taskId: 'task-1', stage: 'frames', message: 'extracting frames', current: 1, total: 4, percent: 25, }), ]); }); }); ``` - [ ] **Step 2:运行测试,确认失败** ```bash pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts ``` - [ ] **Step 3:修改 execute_skill** 在 `packages/backend/src/modules/netaclaw/tools/builtin/execute_skill.ts` 中把 execute 改成接收第三参数: ```ts async execute(id, params, onUpdate) { ``` 调用 executor 时改为: ```ts const result = await executor.execute({ skillName: params.name, input: params.input as Record, toolCallId: id, onProcessEvent: event => { const total = typeof event.total === 'number' ? event.total : undefined; const current = typeof event.current === 'number' ? event.current : undefined; onUpdate?.({ ...event, source: 'skill', targetType: event.targetType || 'skill', percent: typeof event.percent === 'number' ? event.percent : total && current !== undefined ? Math.max(0, Math.min(100, Math.round((current / total) * 100))) : undefined, }); }, }); ``` - [ ] **Step 4:运行测试** ```bash pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts test/skill_process_events.test.ts ``` 预期:PASS。 --- ## Task 4:Gateway 协议、节流、metadata 摘要和 JSONL 记录 **文件:** - 修改:`packages/backend/src/modules/netaclaw/runtime/process_events.ts` - 修改:`packages/backend/src/modules/netaclaw/gateway/protocol.ts` - 修改:`packages/backend/src/modules/netaclaw/gateway/server.ts` - 测试:`packages/backend/test/process_event_buffer.test.ts` - [ ] **Step 1:写轻量 buffer 测试** 创建 `packages/backend/test/process_event_buffer.test.ts`: ```ts import { ProcessEventBuffer } from '../src/modules/netaclaw/runtime/process_events'; import type { RuntimeProcessEvent } from '../src/modules/netaclaw/runtime/process_events'; function event(overrides: Partial): RuntimeProcessEvent { return { version: 1, operationId: 'tool-call-1', targetType: 'skill', source: 'skill', status: 'running', level: 'info', message: 'batch completed', timestamp: '2026-05-06T04:00:00.000Z', ...overrides, }; } describe('ProcessEventBuffer', () => { it('合并重复过程事件,并 flush 最新事件', () => { const emitted: RuntimeProcessEvent[] = []; const buffer = new ProcessEventBuffer(e => emitted.push(e), { throttleMs: 500 }); buffer.push(event({ operationId: 'op-1', stage: 'detect', sequence: 1 })); buffer.push(event({ operationId: 'op-1', stage: 'detect', sequence: 2 })); buffer.flushAll(); expect(emitted).toHaveLength(1); expect(emitted[0]).toEqual(expect.objectContaining({ operationId: 'op-1', stage: 'detect', sequence: 2, payload: expect.objectContaining({ repeatCount: 2 }), })); }); it('终态和错误事件立即发送', () => { const emitted: RuntimeProcessEvent[] = []; const buffer = new ProcessEventBuffer(e => emitted.push(e), { throttleMs: 500 }); buffer.push(event({ operationId: 'op-1', status: 'failed', level: 'error', message: 'failed' })); expect(emitted).toHaveLength(1); expect(emitted[0]).toEqual(expect.objectContaining({ status: 'failed', message: 'failed' })); }); }); ``` - [ ] **Step 2:运行测试,确认失败** ```bash pnpm --filter @neta/backend test -- --runInBand test/process_event_buffer.test.ts ``` - [ ] **Step 3:在 process_events.ts 中增加轻量 buffer** 在 `packages/backend/src/modules/netaclaw/runtime/process_events.ts` 追加: ```ts export interface ProcessEventBufferOptions { throttleMs?: number; } interface BufferedEvent { event: RuntimeProcessEvent; repeatCount: number; timer?: NodeJS.Timeout; } export class ProcessEventBuffer { private readonly throttleMs: number; private readonly pending = new Map(); constructor( private readonly emit: (event: RuntimeProcessEvent) => void, options: ProcessEventBufferOptions = {}, ) { this.throttleMs = options.throttleMs ?? 400; } push(event: RuntimeProcessEvent) { if (isTerminalProcessEvent(event)) { this.flushOperation(event.operationId); this.emit(event); return; } const key = `${event.operationId}:${event.stage || ''}:${event.message}`; const existing = this.pending.get(key); if (existing) { existing.event = event; existing.repeatCount += 1; return; } const buffered: BufferedEvent = { event, repeatCount: 1, timer: setTimeout(() => this.flushKey(key), this.throttleMs), }; this.pending.set(key, buffered); } flushAll() { for (const key of [...this.pending.keys()]) { this.flushKey(key); } } private flushOperation(operationId: string) { for (const [key, buffered] of [...this.pending.entries()]) { if (buffered.event.operationId === operationId) { this.flushKey(key); } } } private flushKey(key: string) { const buffered = this.pending.get(key); if (!buffered) return; if (buffered.timer) clearTimeout(buffered.timer); this.pending.delete(key); this.emit({ ...buffered.event, payload: buffered.repeatCount > 1 ? { ...(buffered.event.payload || {}), repeatCount: buffered.repeatCount } : buffered.event.payload, }); } } function isTerminalProcessEvent(event: RuntimeProcessEvent): boolean { return event.status === 'completed' || event.status === 'failed' || event.status === 'cancelled' || event.status === 'skipped' || event.level === 'error'; } ``` - [ ] **Step 4:增加 WebSocket 协议** 在 `packages/backend/src/modules/netaclaw/gateway/protocol.ts` 增加: ```ts export interface ServerToolProcessEvent { type: 'tool_process_event'; sessionId: string; data: { version: 1; operationId: string; parentOperationId?: string; targetType: 'tool' | 'skill' | 'subagent' | 'task' | 'runtime'; toolCallId: string; assistantEntryId?: string; toolName: string; toolLabel?: string; source: 'skill' | 'tool' | 'subagent' | 'runtime'; taskId?: string; stage?: string; status: 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped'; level?: 'debug' | 'info' | 'warning' | 'error'; message: string; detail?: string; current?: number; total?: number; percent?: number; payload?: Record; timestamp: string; sequence: number; }; } ``` 把 `ServerToolProcessEvent` 加入 `ServerEvent` union。 - [ ] **Step 5:Gateway 发送 live event 并保存 metadata 摘要** 在 `packages/backend/src/modules/netaclaw/gateway/server.ts` import: ```ts import { ProcessEventBuffer, RuntimeProcessEvent, RuntimeProcessLogRef, appendRuntimeProcessLog, buildProcessLogRef, sampleProcessEventsForMetadata, } from '../runtime/process_events.js'; ``` 扩展 `toolExecutions` item: ```ts processEvents?: RuntimeProcessEvent[]; processLogRef?: RuntimeProcessLogRef; ``` 在 `buildToolExecutionMetadata()` 的每个 item 中增加: ```ts latestProcessEvent: te.processEvents?.at(-1), processEvents: sampleProcessEventsForMetadata(te.processEvents || [], 50), processLogRef: te.processLogRef, ``` 在 runner 内创建 buffer: ```ts const processEventBuffer = new ProcessEventBuffer(event => { this.send({ type: 'tool_process_event', sessionId: sid, data: event as any, }); }); ``` 给 `runAgent` 增加: ```ts onToolProcessEvent: ({ toolCallId, operationId, toolName, event }) => { const projectionId = toolCallId || randomUUID(); const te = [...toolExecutions].reverse().find(t => t.toolCallId === projectionId) || [...toolExecutions].reverse().find(t => t.name === toolName && t.status === 'running'); const sequence = (te?.processEvents?.length || 0) + 1; const processEvent: RuntimeProcessEvent = { ...event, sequence, }; if (te) { te.processEvents = [...(te.processEvents || []), processEvent].slice(-50); te.processLogRef = appendRuntimeProcessLog({ sessionId: sid, operationId: processEvent.operationId || operationId, toolCallId: projectionId, event: processEvent, }); } processEventBuffer.push({ ...processEvent, payload: processEvent.payload, } as RuntimeProcessEvent & { toolCallId: string; assistantEntryId?: string; toolName: string; toolLabel?: string; }); }, ``` 注意:如果 TypeScript 不接受扩展字段,给发送 payload 单独构造对象,不要污染 `RuntimeProcessEvent` 基础类型。 - [ ] **Step 6:失败时发终态事件** 在 `failRunningTools()` 中为每个 running tool 追加 `status: 'failed'` 的过程事件,并调用 `processEventBuffer.push(...)`。 逻辑引用使用: ```ts te.processLogRef = buildProcessLogRef({ sessionId: sid, operationId: `tool-${te.toolCallId}`, toolCallId: te.toolCallId, }); ``` 不要把本地 JSONL 绝对路径写进 metadata。 - [ ] **Step 7:flush pending events** 在 `runAgent` 正常完成后、构建 final metadata 前调用: ```ts processEventBuffer.flushAll(); ``` 在 catch 中 `failRunningTools(...)` 后也调用。 - [ ] **Step 8:运行测试和构建** ```bash pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts test/skill_process_events.test.ts test/process_event_buffer.test.ts pnpm --filter @neta/backend build ``` 预期:PASS。 --- ## Task 5:前端类型和 chat store 投影 **文件:** - 修改:`packages/frontend/src/modules/agent/types/index.d.ts` - 修改:`packages/frontend/src/modules/agent/store/chat.ts` - [ ] **Step 1:增加前端类型** 在 `packages/frontend/src/modules/agent/types/index.d.ts` 增加: ```ts export interface RuntimeProcessLogRef { kind: 'runtime_process_log'; sessionId: string; operationId: string; toolCallId?: string; } export interface RuntimeProcessEventProjection { version: 1; operationId: string; parentOperationId?: string; targetType: 'tool' | 'skill' | 'subagent' | 'task' | 'runtime'; source: 'skill' | 'tool' | 'subagent' | 'runtime'; taskId?: string; stage?: string; status: 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped'; level?: 'debug' | 'info' | 'warning' | 'error'; message: string; detail?: string; current?: number; total?: number; percent?: number; payload?: Record; timestamp: string; sequence: number; } ``` 给 `StreamingToolProjection` 增加: ```ts latestProcessEvent?: RuntimeProcessEventProjection; processEvents?: RuntimeProcessEventProjection[]; processLogRef?: RuntimeProcessLogRef; ``` 给 `WSServerEvent.type` 增加 `'tool_process_event'`。 - [ ] **Step 2:从 metadata 恢复过程事件** 在 `packages/frontend/src/modules/agent/store/chat.ts` 的 `toStreamingToolsFromEntry()` 中,每个 projected tool 增加: ```ts latestProcessEvent: normalizeToolProcessEvent(item.latestProcessEvent), processEvents: normalizeToolProcessEvents(item.processEvents), processLogRef: normalizeProcessLogRef(item.processLogRef), ``` 增加 helper: ```ts function normalizeToolProcessEvents(value: unknown): import('../types/index.d').RuntimeProcessEventProjection[] | undefined { if (!Array.isArray(value)) return undefined; const events = value .map(normalizeToolProcessEvent) .filter((item): item is import('../types/index.d').RuntimeProcessEventProjection => Boolean(item)); return events.length > 0 ? events.slice(-50) : undefined; } function normalizeToolProcessEvent(value: unknown): import('../types/index.d').RuntimeProcessEventProjection | undefined { if (!value || typeof value !== 'object' || Array.isArray(value)) return undefined; const record = value as Record; if (typeof record.message !== 'string') return undefined; return { version: 1, operationId: typeof record.operationId === 'string' ? record.operationId : `legacy-${record.toolCallId || record.sequence || Date.now()}`, parentOperationId: typeof record.parentOperationId === 'string' ? record.parentOperationId : undefined, targetType: ['tool', 'skill', 'subagent', 'task', 'runtime'].includes(record.targetType) ? record.targetType : 'tool', source: ['skill', 'tool', 'subagent', 'runtime'].includes(record.source) ? record.source : 'tool', taskId: typeof record.taskId === 'string' ? record.taskId : undefined, stage: typeof record.stage === 'string' ? record.stage : undefined, status: ['queued', 'started', 'running', 'completed', 'failed', 'cancelled', 'skipped'].includes(record.status) ? record.status : 'running', level: ['debug', 'info', 'warning', 'error'].includes(record.level) ? record.level : undefined, message: record.message, detail: typeof record.detail === 'string' ? record.detail : undefined, current: typeof record.current === 'number' ? record.current : undefined, total: typeof record.total === 'number' ? record.total : undefined, percent: typeof record.percent === 'number' ? record.percent : undefined, payload: record.payload && typeof record.payload === 'object' && !Array.isArray(record.payload) ? record.payload : undefined, timestamp: typeof record.timestamp === 'string' ? record.timestamp : new Date().toISOString(), sequence: typeof record.sequence === 'number' ? record.sequence : 0, }; } function normalizeProcessLogRef(value: unknown): import('../types/index.d').RuntimeProcessLogRef | undefined { if (!value || typeof value !== 'object' || Array.isArray(value)) return undefined; const record = value as Record; if (record.kind !== 'runtime_process_log') return undefined; if (typeof record.sessionId !== 'string' || typeof record.operationId !== 'string') return undefined; return { kind: 'runtime_process_log', sessionId: record.sessionId, operationId: record.operationId, toolCallId: typeof record.toolCallId === 'string' ? record.toolCallId : undefined, }; } ``` - [ ] **Step 3:处理 live tool_process_event** 在 `handleWSEvent()` 中增加: ```ts case 'tool_process_event': if (event.data?.toolCallId) { const processEvent = normalizeToolProcessEvent(event.data); if (processEvent) { const existing = streamingTools.value.find(item => item.toolCallId === event.data.toolCallId); const processEvents = [...(existing?.processEvents || []), processEvent].slice(-50); upsertStreamingTool({ toolCallId: event.data.toolCallId, assistantEntryId: event.data.assistantEntryId, toolName: event.data.toolName, label: event.data.toolLabel, status: existing?.status || 'running', args: existing?.args, result: existing?.result, rawResult: existing?.rawResult, runtimeRoute: existing?.runtimeRoute, workerRoutingHint: existing?.workerRoutingHint, blockedReason: existing?.blockedReason, policySources: existing?.policySources, processEvents, latestProcessEvent: processEvent, processLogRef: existing?.processLogRef, }); } } break; ``` - [ ] **Step 4:运行前端类型检查** ```bash pnpm --filter @neta/frontend type-check ``` 预期:PASS。如果存在无关旧错误,记录下来,并在最终执行 `pnpm --filter @neta/frontend build`。 --- ## Task 6:前端过程时间线 UI **文件:** - 新增:`packages/frontend/src/modules/agent/components/tool-process-timeline.vue` - 修改:`packages/frontend/src/modules/agent/components/skill-card.vue` - 修改:实际渲染 `streamingTools` 的父组件或 renderer - [ ] **Step 1:新增独立时间线组件** 创建 `packages/frontend/src/modules/agent/components/tool-process-timeline.vue`: ```vue ``` - [ ] **Step 2:修改 skill-card** 在 `skill-card.vue` 中增加: ```ts import type { RuntimeProcessEventProjection } from '../types/index.d'; import ToolProcessTimeline from './tool-process-timeline.vue'; ``` props 增加: ```ts latestProcessEvent?: RuntimeProcessEventProjection; processEvents?: RuntimeProcessEventProjection[]; ``` 把 running 区域改为: ```vue
{{ latestProcessEvent?.message || step || detail }} {{ latestProcessEvent.detail }}
``` 增加样式: ```scss .skill-card__detail { display: block; margin-top: 2px; font-size: 12px; color: var(--el-text-color-placeholder); line-height: 1.45; } ``` - [ ] **Step 3:确认父组件传参** 搜索: ```bash rg -n "