GPU_GUARD_MONOREPO/docs/superpowers/plans/2026-05-06-tool-process-events.md
2026-05-20 21:39:12 +08:00

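# Runtime Process Events Implementation Plan

> **Requirement for the executing agent:** When implementing this plan, you must use `superpowers:subagent-driven-development` (recommended) or `superpowers:executing-plans`. Track every step with a checkbox (`- [ ]`).

**Goal:** Add a Pi-style runtime process event stream for long-running tools, compute-entry skills, and future subagent/batch tasks, so that the Agent chat page can show in real time which step is currently executing, instead of showing only the final result or a single progress bar.

**Architecture:** Add `process-event` to the existing NetaClaw tool lifecycle `started -> result -> completed`. In phase one the WebSocket event name stays `tool_process_event`, so that it plugs into the current `streamingTools` projection. The event payload, however, uses a generic `RuntimeProcessEvent` with fields such as `version`, `operationId`, `parentOperationId`, `targetType`, `toolCallId`, and `taskId`, so that it is not tied to any single skill. `execute_skill` bridges the structured process events emitted by the skill subprocess to the runtime callback. The gateway throttles the events before sending them to the frontend, stores a summary in the assistant metadata, and writes the full JSONL process log for backend troubleshooting.

**Key boundaries:**

- Process events are used only for UI display, process replay, and backend troubleshooting. They do not enter the LLM history and must not be carried back into the model context by `buildLLMMessages`.
- `processLogRef` must be a logical reference only; never expose a local absolute path to the frontend.
- Phase one does not build a full event bus, a task center, or a full log-viewing API. It adds only a lightweight runtime helper, to avoid over-engineering.
- Phase one only guarantees that the `execute_skill` process is visible. The event structure leaves room for plugging in ordinary tools, subagents, and background tasks later.

**Tech stack:** TypeScript, Midway.js WebSocket, NetaClaw runtime tools, Pinia/Vue 3, Element Plus, Jest, vue-tsc/Vite build.
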
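
The first boundary (process events never reach the model) can be enforced mechanically at the point where model input is built. The following is a minimal sketch under stated assumptions: `StoredEntry`, `LLMMessage`, and `toLLMMessages` are hypothetical names for illustration and are not taken from the NetaClaw codebase, so the real `buildLLMMessages` may look different.

```typescript
// Hypothetical shapes for illustration only; not the real NetaClaw types.
interface StoredEntry {
  role: 'user' | 'assistant' | 'tool';
  content: string;
  // UI-only data, such as process event summaries, is assumed to live here.
  metadata?: Record<string, unknown>;
}

interface LLMMessage {
  role: StoredEntry['role'];
  content: string;
}

// Copy only the fields the model should see. Because `metadata` is never
// read, process events stored under it cannot leak into the model context.
function toLLMMessages(entries: StoredEntry[]): LLMMessage[] {
  return entries.map(entry => ({ role: entry.role, content: entry.content }));
}
```

Building the output from an explicit allow-list of fields, rather than deleting known UI fields, means a newly added metadata key stays out of the model context by default.
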
---
## File structure

Backend:

- Create `packages/backend/src/modules/netaclaw/runtime/process_events.ts`
  - Defines `RuntimeProcessEvent` and `RuntimeProcessEventInput`.
  - Handles event normalization, `operationId` generation, percent calculation, payload redaction, metadata sampling, logical `processLogRef` generation, JSONL logging, and lightweight throttling/deduplication.
  - Note: this is a lightweight helper, not a full EventBus.
- Modify `packages/backend/src/modules/netaclaw/tools/common.ts`
  - Add `ToolProcessUpdateCallback`.
  - Extend `AgentTool.execute()` with an optional third parameter `onUpdate`.
- Modify `packages/backend/src/modules/netaclaw/runtime/attempt.ts`
  - Add `onToolProcessEvent`.
  - Pass `emitProcessEvent` when calling the tool.
- Modify `packages/backend/src/modules/netaclaw/runtime/agent.ts`
  - Pass `onToolProcessEvent` through from `runAgent` to `runAttempt`.
- Modify `packages/backend/src/modules/netaclaw/service/skill_executor.ts`
  - Parse only JSON lines that carry an explicit `type: "process_event"`.
  - Keep ordinary stderr going to the backend log unchanged.
  - On timeout, non-zero exit, spawn error, or failure to parse the final JSON, emit a terminal failure process event.
- Modify `packages/backend/src/modules/netaclaw/tools/builtin/execute_skill.ts`
  - Bridge skill process events into tool process updates.
- Modify `packages/backend/src/modules/netaclaw/gateway/protocol.ts`
  - Add `ServerToolProcessEvent`.
- Modify `packages/backend/src/modules/netaclaw/gateway/server.ts`
  - Receive runtime process events.
  - Send `tool_process_event` to the frontend.
  - Store `latestProcessEvent`, the sampled `processEvents`, and the logical `processLogRef` in the `skillExecutions` metadata.
  - Do not put the full local file path into the frontend payload.

Backend tests:

- Create `packages/backend/test/tool_process_events.test.ts`
- Create `packages/backend/test/skill_process_events.test.ts`
- Create `packages/backend/test/process_event_buffer.test.ts`

Frontend:

- Modify `packages/frontend/src/modules/agent/types/index.d.ts`
  - Add `RuntimeProcessEventProjection`.
  - Add `latestProcessEvent`, `processEvents`, and `processLogRef` to `StreamingToolProjection`.
  - Add `tool_process_event` to `WSServerEvent.type`.
- Modify `packages/frontend/src/modules/agent/store/chat.ts`
  - Handle live `tool_process_event`.
  - Restore the process event summary from the assistant metadata.
  - Merge into the matching `streamingTool` by `toolCallId`.
- Create `packages/frontend/src/modules/agent/components/tool-process-timeline.vue`
  - Renders the process timeline on its own.
- Modify `packages/frontend/src/modules/agent/components/skill-card.vue`
  - Show the latest process event.
  - Show the progress bar only when a numeric percent exists.
  - Embed `tool-process-timeline.vue`.
- Modify the parent component or renderer that actually renders `streamingTools`
  - Explicitly pass `latestProcessEvent`, `processEvents`, and `processLogRef`.

---
## Task 1: Backend runtime process event types and tool callback contract

**Files:**

- Create: `packages/backend/src/modules/netaclaw/runtime/process_events.ts`
- Modify: `packages/backend/src/modules/netaclaw/tools/common.ts`
- Modify: `packages/backend/src/modules/netaclaw/runtime/attempt.ts`
- Modify: `packages/backend/src/modules/netaclaw/runtime/agent.ts`
- Test: `packages/backend/test/tool_process_events.test.ts`

- [ ] **Step 1: Write the failing test first**

Create `packages/backend/test/tool_process_events.test.ts`:

```ts
import { Type } from '@sinclair/typebox';
import { runAttempt } from '../src/modules/netaclaw/runtime/attempt';
import type { AnyAgentTool } from '../src/modules/netaclaw/tools/common';

// The mocked provider first asks for one tool call, then returns a final answer.
jest.mock('../src/modules/netaclaw/runtime/model_selection', () => ({
  getProvider: () => ({
    chat: jest
      .fn()
      .mockResolvedValueOnce({
        content: '',
        usage: { inputTokens: 1, outputTokens: 1 },
        toolCalls: [{ id: 'call-1', name: 'slow_tool', arguments: '{"taskId":"task-1"}' }],
      })
      .mockResolvedValueOnce({
        content: 'done',
        usage: { inputTokens: 1, outputTokens: 1 },
        toolCalls: [],
      }),
  }),
}));

describe('runtime process events', () => {
  it('emits process events carrying toolCallId and operationId from a running tool', async () => {
    const updates: any[] = [];
    const tool: AnyAgentTool = {
      name: 'slow_tool',
      label: 'Slow Tool',
      description: 'test tool',
      parameters: Type.Object({ taskId: Type.String() }),
      async execute(_id, _params, onUpdate) {
        onUpdate?.({
          source: 'tool',
          targetType: 'tool',
          stage: 'prepare',
          status: 'running',
          message: 'Preparing work',
          current: 1,
          total: 2,
        });
        return { type: 'text', text: 'ok' };
      },
    };

    await runAttempt({
      messages: [{ role: 'user', content: 'run' }],
      tools: [tool],
      config: { provider: 'openai', model: 'test', apiKey: 'test' },
      onToolProcessEvent: event => updates.push(event),
    });

    expect(updates).toEqual([
      expect.objectContaining({
        toolCallId: 'call-1',
        operationId: 'tool-call-1',
        toolName: 'slow_tool',
        args: { taskId: 'task-1' },
        event: expect.objectContaining({
          version: 1,
          operationId: 'tool-call-1',
          targetType: 'tool',
          source: 'tool',
          stage: 'prepare',
          status: 'running',
          message: 'Preparing work',
          current: 1,
          total: 2,
          percent: 50,
        }),
      }),
    ]);
  });
});
```

- [ ] **Step 2: Run the test and confirm it fails**

```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts
```

Expected: FAIL, because `RuntimeProcessEvent`, `onToolProcessEvent`, and `onUpdate` do not exist yet.

- [ ] **Step 3: Add the runtime process event helper**

Create `packages/backend/src/modules/netaclaw/runtime/process_events.ts`:

```ts
import { randomUUID } from 'crypto';
import * as fs from 'fs';
import * as path from 'path';
import { pDataPath } from '../../../comm/path.js';

export type RuntimeProcessTargetType = 'tool' | 'skill' | 'subagent' | 'task' | 'runtime';
export type RuntimeProcessSource = 'skill' | 'tool' | 'subagent' | 'runtime';
export type RuntimeProcessStatus = 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped';
export type RuntimeProcessLevel = 'debug' | 'info' | 'warning' | 'error';

export interface RuntimeProcessEvent {
  version: 1;
  operationId: string;
  parentOperationId?: string;
  targetType: RuntimeProcessTargetType;
  source: RuntimeProcessSource;
  taskId?: string;
  stage?: string;
  status: RuntimeProcessStatus;
  level: RuntimeProcessLevel;
  message: string;
  detail?: string;
  current?: number;
  total?: number;
  percent?: number;
  payload?: Record<string, unknown>;
  timestamp: string;
  sequence?: number;
}

// Loose input shape: only `message` is required, everything else is defaulted.
export type RuntimeProcessEventInput =
  Partial<Omit<RuntimeProcessEvent, 'version' | 'operationId' | 'targetType' | 'source' | 'status' | 'level' | 'message' | 'timestamp'>> & {
    operationId?: string;
    parentOperationId?: string;
    targetType?: RuntimeProcessTargetType;
    source?: RuntimeProcessSource;
    status?: RuntimeProcessStatus;
    level?: RuntimeProcessLevel;
    message: string;
    timestamp?: string;
  };

export interface RuntimeProcessLogRef {
  kind: 'runtime_process_log';
  sessionId: string;
  operationId: string;
  toolCallId?: string;
}

const SENSITIVE_KEY_PATTERN = /(api[_-]?key|token|password|secret|authorization|credential)/i;
const MAX_PAYLOAD_STRING_LENGTH = 500;

export function buildOperationId(toolCallId?: string): string {
  return toolCallId ? `tool-${toolCallId}` : `op-${randomUUID()}`;
}

export function normalizeRuntimeProcessEvent(input: RuntimeProcessEventInput): RuntimeProcessEvent {
  const total = typeof input.total === 'number' ? input.total : undefined;
  const current = typeof input.current === 'number' ? input.current : undefined;
  // An explicit percent wins; otherwise derive it from current/total.
  // A total of 0 is treated as "unknown" to avoid dividing by zero.
  const percent = typeof input.percent === 'number'
    ? clampPercent(input.percent)
    : total && current !== undefined
      ? clampPercent(Math.round((current / total) * 100))
      : undefined;
  return {
    version: 1,
    operationId: input.operationId || buildOperationId(),
    parentOperationId: input.parentOperationId,
    targetType: input.targetType || 'runtime',
    source: input.source || 'runtime',
    taskId: input.taskId,
    stage: input.stage,
    status: input.status || 'running',
    level: input.level || 'info',
    message: input.message,
    detail: input.detail,
    current,
    total,
    percent,
    payload: sanitizeProcessPayload(input.payload),
    timestamp: input.timestamp || new Date().toISOString(),
    sequence: input.sequence,
  };
}

// Redacts sensitive keys, truncates long strings, and flattens nested values.
export function sanitizeProcessPayload(payload: unknown): Record<string, unknown> | undefined {
  if (!payload || typeof payload !== 'object' || Array.isArray(payload)) return undefined;
  const output: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(payload as Record<string, unknown>)) {
    if (SENSITIVE_KEY_PATTERN.test(key)) {
      output[key] = '[redacted]';
    } else if (typeof value === 'string') {
      output[key] = value.length > MAX_PAYLOAD_STRING_LENGTH
        ? `${value.slice(0, MAX_PAYLOAD_STRING_LENGTH)}...`
        : value;
    } else if (typeof value === 'number' || typeof value === 'boolean' || value === null) {
      output[key] = value;
    } else {
      // Nested objects and arrays are not serialized into the payload.
      output[key] = '[object]';
    }
  }
  return Object.keys(output).length > 0 ? output : undefined;
}

// Keeps the first few events plus the most recent ones, up to `limit` in total.
export function sampleProcessEventsForMetadata<T>(events: T[], limit = 50): T[] {
  if (events.length <= limit) return events;
  const headCount = Math.min(10, Math.floor(limit / 5));
  return [
    ...events.slice(0, headCount),
    ...events.slice(-(limit - headCount)),
  ];
}

export function buildProcessLogRef(input: {
  sessionId: string;
  operationId: string;
  toolCallId?: string;
}): RuntimeProcessLogRef {
  return {
    kind: 'runtime_process_log',
    sessionId: input.sessionId,
    operationId: input.operationId,
    toolCallId: input.toolCallId,
  };
}

export function appendRuntimeProcessLog(input: {
  sessionId: string;
  operationId: string;
  toolCallId?: string;
  event: RuntimeProcessEvent;
}): RuntimeProcessLogRef {
  // Sanitize the session id as well as the operation id: both become path
  // segments, so neither may contain separators or `..`.
  const dir = path.join(pDataPath(), 'netaclaw-process-events', safeFilePart(input.sessionId));
  fs.mkdirSync(dir, { recursive: true });
  const filename = `${safeFilePart(input.operationId)}.jsonl`;
  fs.appendFileSync(path.join(dir, filename), `${JSON.stringify(input.event)}\n`, 'utf8');
  // Return only the logical reference, never the absolute path.
  return buildProcessLogRef(input);
}

function clampPercent(value: number): number {
  return Math.max(0, Math.min(100, value));
}

function safeFilePart(value: string): string {
  // Dots are replaced too, so a value such as `..` cannot act as a path segment.
  return value.replace(/[^a-zA-Z0-9_-]/g, '_').slice(0, 120) || 'unknown';
}
```
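
The percent rule in `normalizeRuntimeProcessEvent` has a few edge cases that are easy to miss. The sketch below restates that rule as a standalone function so the cases can be checked in isolation; `derivePercent` is a name introduced here for illustration and is not part of the plan's files.

```typescript
// Derive a 0-100 percent from current/total, or undefined when it cannot be
// computed. An explicit percent, when given, takes precedence.
function derivePercent(current?: number, total?: number, explicit?: number): number | undefined {
  const clamp = (value: number) => Math.max(0, Math.min(100, value));
  if (typeof explicit === 'number') return clamp(explicit);
  // A total of 0 (or a missing total) gives no meaningful ratio.
  if (!total || current === undefined) return undefined;
  return clamp(Math.round((current / total) * 100));
}

const halfway = derivePercent(1, 2);          // 50
const roundedUp = derivePercent(3, 8);        // 37.5 rounds to 38
const unknownTotal = derivePercent(5, 0);     // undefined, no division by zero
const overshoot = derivePercent(9, 8);        // 113 clamps to 100
const explicitWins = derivePercent(1, 2, 80); // 80, current/total ignored
```

The `undefined` result matters for the UI rule stated earlier: the progress bar is shown only when a numeric percent exists, so an unknown total hides the bar instead of showing 0%.
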

- [ ] **Step 4: Extend the tool execute contract**

Modify `packages/backend/src/modules/netaclaw/tools/common.ts`.

Add the import:

```ts
import type { RuntimeProcessEventInput } from '../runtime/process_events.js';
```

Add the type:

```ts
export type ToolProcessUpdateCallback = (event: RuntimeProcessEventInput) => void;
```

Change `AgentTool.execute` to:

```ts
execute(id: string, params: Static<TParams>, onUpdate?: ToolProcessUpdateCallback): Promise<TResult>;
```
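
To illustrate how a tool author would use the optional third parameter, here is a minimal sketch of a long-running function that reports once per processed item. `ProgressUpdate`, `UpdateCallback`, and `processItems` are simplified stand-ins introduced for this example; a real tool would implement `AgentTool.execute` and pass `RuntimeProcessEventInput` values instead.

```typescript
// Simplified stand-ins for the real types; only the fields used here.
interface ProgressUpdate {
  stage?: string;
  status?: 'running' | 'completed' | 'failed';
  message: string;
  current?: number;
  total?: number;
}
type UpdateCallback = (update: ProgressUpdate) => void;

// A hypothetical long-running tool body that reports after each item.
async function processItems(items: string[], onUpdate?: UpdateCallback): Promise<string> {
  for (let i = 0; i < items.length; i++) {
    // ... the real work for items[i] would happen here ...
    // Optional chaining keeps the tool usable when no callback is supplied.
    onUpdate?.({
      stage: 'process',
      status: 'running',
      message: `processed ${items[i]}`,
      current: i + 1,
      total: items.length,
    });
  }
  return `processed ${items.length} items`;
}
```

Because the callback is optional and invoked with `?.`, existing tools and existing callers that ignore the third argument keep working unchanged.
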

- [ ] **Step 5: Bridge tool updates in attempt**

Modify `packages/backend/src/modules/netaclaw/runtime/attempt.ts`.

Add the imports:

```ts
import { AnyAgentTool, ToolResultContent, toolResultToText } from '../tools/common.js';
import { RuntimeProcessEvent, RuntimeProcessEventInput, buildOperationId, normalizeRuntimeProcessEvent } from './process_events.js';
```

Add to `AttemptParams`:

```ts
onToolProcessEvent?: (input: {
  toolCallId: string;
  operationId: string;
  toolName: string;
  args: Record<string, unknown>;
  event: RuntimeProcessEvent;
}) => void;
```

Destructure `onToolProcessEvent` in `runAttempt()`.

Before the call to `tool.execute`, add:

```ts
const operationId = buildOperationId(tc.id);
const emitProcessEvent = (event: RuntimeProcessEventInput) => {
  const normalized = normalizeRuntimeProcessEvent({
    ...event,
    // Defaults go after the spread, so a key that is present on `event` but
    // set to undefined cannot overwrite them.
    operationId: event.operationId || operationId,
    targetType: event.targetType || 'tool',
    source: event.source || 'tool',
  });
  onToolProcessEvent?.({
    toolCallId: tc.id,
    operationId,
    toolName: tc.name,
    args: effectiveArgs,
    event: normalized,
  });
};
```

Change:

```ts
const result = await tool.execute(tc.id, effectiveArgs);
```

to:

```ts
const result = await tool.execute(tc.id, effectiveArgs, emitProcessEvent);
```
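
One detail worth checking in `emitProcessEvent` is the order in which defaults and `...event` are merged. Object spread copies own keys even when their value is `undefined`, and upstream producers in this plan (for example the skill line parser) do build objects with explicitly undefined keys. The sketch below uses a reduced, hypothetical `PartialEvent` type to show the difference between the two orders.

```typescript
// Reduced stand-in for RuntimeProcessEventInput, for illustration only.
interface PartialEvent {
  targetType?: string;
  source?: string;
  message: string;
}

// An upstream producer may set a key explicitly to undefined.
const incoming: PartialEvent = { targetType: undefined, message: 'working' };

// Defaults first, spread last: the undefined key overwrites the default.
const defaultsFirst = { targetType: 'tool', ...incoming };

// Spread first, defaults last: the default survives.
const defaultsLast = { ...incoming, targetType: incoming.targetType || 'tool' };
```

With the first order, `defaultsFirst.targetType` ends up `undefined` and any later fallback (such as `'runtime'` in the normalizer) applies instead of the intended `'tool'`.
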

- [ ] **Step 6: Pass the callback through in agent**

Modify `packages/backend/src/modules/netaclaw/runtime/agent.ts`.

Add the import:

```ts
import type { RuntimeProcessEvent } from './process_events.js';
```

Add to `AgentRunParams`:

```ts
onToolProcessEvent?: (input: {
  toolCallId: string;
  operationId: string;
  toolName: string;
  args: Record<string, unknown>;
  event: RuntimeProcessEvent;
}) => void;
```

Add to the `runAttempt()` arguments:

```ts
onToolProcessEvent: params.onToolProcessEvent,
```

- [ ] **Step 7: Run the test**

```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts
```

Expected: PASS.

---
## Task 2: Parsing process events from the skill subprocess

**Files:**

- Modify: `packages/backend/src/modules/netaclaw/service/skill_executor.ts`
- Test: `packages/backend/test/skill_process_events.test.ts`

- [ ] **Step 1: Write the failing test first**

Create `packages/backend/test/skill_process_events.test.ts`:

```ts
import { normalizeSkillProcessLine } from '../src/modules/netaclaw/service/skill_executor';

describe('skill process event parsing', () => {
  it('parses only JSON lines with an explicit type=process_event', () => {
    const event = normalizeSkillProcessLine(
      'vehicle-damage-inspection',
      JSON.stringify({
        type: 'process_event',
        time: '2026-05-06T04:07:17.187Z',
        stage: 'frames',
        message: 'frames extracted',
        taskId: 'inspection-1',
        frameCount: 361,
        duration: 120.38,
        resolution: '960x544',
      }),
      7,
    );
    // toEqual ignores keys whose value is undefined (current, total, percent).
    expect(event).toEqual({
      source: 'skill',
      targetType: 'skill',
      taskId: 'inspection-1',
      stage: 'frames',
      status: 'running',
      level: 'info',
      message: 'frames extracted',
      timestamp: '2026-05-06T04:07:17.187Z',
      sequence: 7,
      payload: {
        frameCount: 361,
        duration: 120.38,
        resolution: '960x544',
      },
    });
  });

  it('derives current, total, and percent from batch/totalBatches', () => {
    const event = normalizeSkillProcessLine(
      'vehicle-damage-inspection',
      JSON.stringify({
        type: 'process_event',
        time: '2026-05-06T04:08:11.443Z',
        stage: 'detect',
        message: 'batch completed',
        batch: 3,
        totalBatches: 8,
        damageCount: 3,
      }),
      8,
    );
    expect(event).toEqual(expect.objectContaining({
      source: 'skill',
      targetType: 'skill',
      stage: 'detect',
      message: 'batch completed',
      current: 3,
      total: 8,
      percent: 38,
      payload: expect.objectContaining({ damageCount: 3 }),
    }));
  });

  it('ignores ordinary stderr', () => {
    expect(normalizeSkillProcessLine('demo', 'plain stderr', 1)).toBeNull();
  });

  it('ignores ordinary JSON without the process_event marker', () => {
    expect(normalizeSkillProcessLine('demo', '{"stage":"frames","message":"too broad"}', 1)).toBeNull();
  });
});
```

- [ ] **Step 2: Run the test and confirm it fails**

```bash
pnpm --filter @neta/backend test -- --runInBand test/skill_process_events.test.ts
```

Expected: FAIL, because `normalizeSkillProcessLine` does not exist yet.

- [ ] **Step 3: Implement the parser**

In `packages/backend/src/modules/netaclaw/service/skill_executor.ts`, add the import:

```ts
import type { RuntimeProcessEventInput } from '../runtime/process_events.js';
```

Extend `SkillExecuteParams`:

```ts
toolCallId?: string;
onProcessEvent?: (event: RuntimeProcessEventInput) => void;
```

Before `SkillExecutorService`, add:

```ts
// Keys with a fixed meaning; everything else on the line goes into `payload`.
const RESERVED_PROCESS_KEYS = new Set([
  'type',
  'time',
  'timestamp',
  'stage',
  'message',
  'taskId',
  'level',
  'status',
  'current',
  'total',
  'percent',
  'batch',
  'totalBatches',
]);

export function normalizeSkillProcessLine(
  _skillName: string,
  line: string,
  sequence: number,
): RuntimeProcessEventInput | null {
  let parsed: Record<string, unknown>;
  try {
    parsed = JSON.parse(line);
  } catch {
    // Not JSON: ordinary stderr output.
    return null;
  }
  if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) return null;
  // Only lines with the explicit marker count as process events.
  if (parsed.type !== 'process_event') return null;
  if (typeof parsed.message !== 'string' && typeof parsed.stage !== 'string') return null;

  // `batch`/`totalBatches` are accepted as aliases for `current`/`total`.
  const current = typeof parsed.current === 'number'
    ? parsed.current
    : typeof parsed.batch === 'number'
      ? parsed.batch
      : undefined;
  const total = typeof parsed.total === 'number'
    ? parsed.total
    : typeof parsed.totalBatches === 'number'
      ? parsed.totalBatches
      : undefined;
  const percent = typeof parsed.percent === 'number'
    ? parsed.percent
    : total && current !== undefined
      ? Math.max(0, Math.min(100, Math.round((current / total) * 100)))
      : undefined;

  const payload: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(parsed)) {
    if (!RESERVED_PROCESS_KEYS.has(key)) payload[key] = value;
  }

  return {
    source: 'skill',
    targetType: 'skill',
    taskId: typeof parsed.taskId === 'string' ? parsed.taskId : undefined,
    stage: typeof parsed.stage === 'string' ? parsed.stage : undefined,
    status: isProcessStatus(parsed.status) ? parsed.status : 'running',
    level: isProcessLevel(parsed.level) ? parsed.level : 'info',
    message: typeof parsed.message === 'string' ? parsed.message : String(parsed.stage || 'running'),
    current,
    total,
    percent,
    timestamp: typeof parsed.time === 'string'
      ? parsed.time
      : typeof parsed.timestamp === 'string'
        ? parsed.timestamp
        : new Date().toISOString(),
    sequence,
    payload: Object.keys(payload).length > 0 ? payload : undefined,
  };
}

function isProcessStatus(value: unknown): value is RuntimeProcessEventInput['status'] {
  return value === 'queued'
    || value === 'started'
    || value === 'running'
    || value === 'completed'
    || value === 'failed'
    || value === 'cancelled'
    || value === 'skipped';
}

function isProcessLevel(value: unknown): value is RuntimeProcessEventInput['level'] {
  return value === 'debug' || value === 'info' || value === 'warning' || value === 'error';
}
```

- [ ] **Step 4: Extract process events from stderr**

In `execute()`, add:

```ts
let processSequence = 0;
```

In the stderr line loop, after the logger call, add:

```ts
const processEvent = normalizeSkillProcessLine(skillName, line, ++processSequence);
if (processEvent) {
  params.onProcessEvent?.(processEvent);
}
```
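
The plan covers the parsing side; the emitting side inside a skill is not shown. As a rough sketch, assuming the skill is a Node.js script, a skill could format its progress lines as below. `SkillProgressFields` and `formatProcessEventLine` are hypothetical names for this example, and a skill written in another language would simply print the same single-line JSON to stderr.

```typescript
// Fields a skill might report. Everything except the `type` marker is chosen
// by the skill; unknown keys end up in the event payload on the backend.
interface SkillProgressFields {
  stage: string;
  message: string;
  taskId?: string;
  batch?: number;
  totalBatches?: number;
  [extra: string]: unknown;
}

// Serialize one process event as a single JSON line. The explicit
// `type: 'process_event'` marker is written last so that a stray `type`
// key in `fields` cannot replace it.
function formatProcessEventLine(fields: SkillProgressFields): string {
  const line = JSON.stringify({
    time: new Date().toISOString(),
    ...fields,
    type: 'process_event',
  });
  return `${line}\n`;
}

// In a Node.js skill the line would be written to stderr, for example:
//   process.stderr.write(formatProcessEventLine({
//     stage: 'detect', message: 'batch completed', batch: 3, totalBatches: 8,
//   }));
```

`JSON.stringify` escapes newlines inside string values, so each event stays on one line, which is what a line-by-line stderr reader needs.
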

- [ ] **Step 5: Add terminal failure events**

On timeout, emit:

```ts
params.onProcessEvent?.({
  source: 'runtime',
  targetType: 'skill',
  taskId: typeof input.taskId === 'string' ? input.taskId : undefined,
  stage: 'skill-timeout',
  status: 'failed',
  level: 'error',
  message: `Execution timed out after ${timeout}ms`,
});
```

On non-zero exit, spawn error, or failure to parse the stdout JSON, likewise emit an event with `status: 'failed'`, `source: 'runtime'`, and `targetType: 'skill'`.
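
Since the four failure paths share the same shape, one option is a small builder so they cannot drift apart. This is only a sketch: `SkillFailureReason`, `FailureEventInput`, and `buildSkillFailureEvent` are names made up here, and only the `skill-timeout` stage name comes from the plan; the other three stage names are placeholders.

```typescript
type SkillFailureReason = 'timeout' | 'non-zero-exit' | 'spawn-error' | 'invalid-output';

// Reduced stand-in for RuntimeProcessEventInput with the fixed failure fields.
interface FailureEventInput {
  source: 'runtime';
  targetType: 'skill';
  taskId?: string;
  stage: string;
  status: 'failed';
  level: 'error';
  message: string;
}

// Only 'skill-timeout' is taken from the plan; the rest are placeholder names.
const STAGE_BY_REASON: Record<SkillFailureReason, string> = {
  'timeout': 'skill-timeout',
  'non-zero-exit': 'skill-exit',
  'spawn-error': 'skill-spawn',
  'invalid-output': 'skill-output',
};

// Build the terminal failure event for one failure mode.
function buildSkillFailureEvent(
  reason: SkillFailureReason,
  message: string,
  taskId?: string,
): FailureEventInput {
  return {
    source: 'runtime',
    targetType: 'skill',
    taskId,
    stage: STAGE_BY_REASON[reason],
    status: 'failed',
    level: 'error',
    message,
  };
}
```

A call site would then look like `params.onProcessEvent?.(buildSkillFailureEvent('timeout', 'Execution timed out after 30000ms'))`, where the message text is whatever the executor already logs for that failure.
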

- [ ] **Step 6: Run the test**

```bash
pnpm --filter @neta/backend test -- --runInBand test/skill_process_events.test.ts
```

Expected: PASS.

---
## Task 3: execute_skill bridges skill process events

**Files:**

- Modify: `packages/backend/src/modules/netaclaw/tools/builtin/execute_skill.ts`
- Test: extend `packages/backend/test/tool_process_events.test.ts`

- [ ] **Step 1: Add the failing test**

Append to `packages/backend/test/tool_process_events.test.ts`:

```ts
import { createExecuteSkillTool } from '../src/modules/netaclaw/tools/builtin/execute_skill';

describe('execute_skill process bridge', () => {
  it('forwards skill process events to the tool update callback', async () => {
    // Fake executor that emits one process event and then succeeds.
    const executor = {
      execute: jest.fn(async (params: any) => {
        params.onProcessEvent({
          source: 'skill',
          targetType: 'skill',
          taskId: 'task-1',
          stage: 'frames',
          status: 'running',
          message: 'extracting frames',
          current: 1,
          total: 4,
        });
        return { success: true, output: { ok: true }, duration: 10 };
      }),
    } as any;
    const updates: any[] = [];
    const tool = createExecuteSkillTool(executor);

    await tool.execute(
      'call-skill-1',
      { name: 'vehicle-damage-inspection', input: { taskId: 'task-1' } },
      event => updates.push(event),
    );

    expect(executor.execute).toHaveBeenCalledWith(expect.objectContaining({
      skillName: 'vehicle-damage-inspection',
      input: { taskId: 'task-1' },
      toolCallId: 'call-skill-1',
    }));
    expect(updates).toEqual([
      expect.objectContaining({
        source: 'skill',
        targetType: 'skill',
        taskId: 'task-1',
        stage: 'frames',
        message: 'extracting frames',
        current: 1,
        total: 4,
        percent: 25,
      }),
    ]);
  });
});
```

- [ ] **Step 2: Run the test and confirm it fails**

```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts
```

- [ ] **Step 3: Modify execute_skill**

In `packages/backend/src/modules/netaclaw/tools/builtin/execute_skill.ts`, change `execute` to accept the third parameter:

```ts
async execute(id, params, onUpdate) {
```

Change the executor call to:

```ts
const result = await executor.execute({
  skillName: params.name,
  input: params.input as Record<string, unknown>,
  toolCallId: id,
  onProcessEvent: event => {
    const total = typeof event.total === 'number' ? event.total : undefined;
    const current = typeof event.current === 'number' ? event.current : undefined;
    onUpdate?.({
      ...event,
      // Keep the original source when present, so the terminal failure
      // events emitted with `source: 'runtime'` are not relabelled.
      source: event.source || 'skill',
      targetType: event.targetType || 'skill',
      percent: typeof event.percent === 'number'
        ? event.percent
        : total && current !== undefined
          ? Math.max(0, Math.min(100, Math.round((current / total) * 100)))
          : undefined,
    });
  },
});
```

- [ ] **Step 4: Run the tests**

```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts test/skill_process_events.test.ts
```

Expected: PASS.

---
## Task 4: Gateway protocol, throttling, metadata summary, and JSONL logging

**Files:**

- Modify: `packages/backend/src/modules/netaclaw/runtime/process_events.ts`
- Modify: `packages/backend/src/modules/netaclaw/gateway/protocol.ts`
- Modify: `packages/backend/src/modules/netaclaw/gateway/server.ts`
- Test: `packages/backend/test/process_event_buffer.test.ts`

- [ ] **Step 1: Write the lightweight buffer test**

Create `packages/backend/test/process_event_buffer.test.ts`:

```ts
import { ProcessEventBuffer } from '../src/modules/netaclaw/runtime/process_events';
import type { RuntimeProcessEvent } from '../src/modules/netaclaw/runtime/process_events';

// Test factory: a valid event with overridable fields.
function event(overrides: Partial<RuntimeProcessEvent>): RuntimeProcessEvent {
  return {
    version: 1,
    operationId: 'tool-call-1',
    targetType: 'skill',
    source: 'skill',
    status: 'running',
    level: 'info',
    message: 'batch completed',
    timestamp: '2026-05-06T04:00:00.000Z',
    ...overrides,
  };
}

describe('ProcessEventBuffer', () => {
  it('coalesces repeated process events and flushes the latest one', () => {
    const emitted: RuntimeProcessEvent[] = [];
    const buffer = new ProcessEventBuffer(e => emitted.push(e), { throttleMs: 500 });

    buffer.push(event({ operationId: 'op-1', stage: 'detect', sequence: 1 }));
    buffer.push(event({ operationId: 'op-1', stage: 'detect', sequence: 2 }));
    buffer.flushAll();

    expect(emitted).toHaveLength(1);
    expect(emitted[0]).toEqual(expect.objectContaining({
      operationId: 'op-1',
      stage: 'detect',
      sequence: 2,
      payload: expect.objectContaining({ repeatCount: 2 }),
    }));
  });

  it('sends terminal and error events immediately', () => {
    const emitted: RuntimeProcessEvent[] = [];
    const buffer = new ProcessEventBuffer(e => emitted.push(e), { throttleMs: 500 });

    buffer.push(event({ operationId: 'op-1', status: 'failed', level: 'error', message: 'failed' }));

    expect(emitted).toHaveLength(1);
    expect(emitted[0]).toEqual(expect.objectContaining({ status: 'failed', message: 'failed' }));
  });
});
```

- [ ] **Step 2: Run the test and confirm it fails**

```bash
pnpm --filter @neta/backend test -- --runInBand test/process_event_buffer.test.ts
```

- [ ] **Step 3: Add the lightweight buffer to process_events.ts**

Append to `packages/backend/src/modules/netaclaw/runtime/process_events.ts`:

```ts
export interface ProcessEventBufferOptions {
  throttleMs?: number;
}

interface BufferedEvent {
  event: RuntimeProcessEvent;
  repeatCount: number;
  timer?: NodeJS.Timeout;
}

export class ProcessEventBuffer {
  private readonly throttleMs: number;
  private readonly pending = new Map<string, BufferedEvent>();

  constructor(
    private readonly emit: (event: RuntimeProcessEvent) => void,
    options: ProcessEventBufferOptions = {},
  ) {
    this.throttleMs = options.throttleMs ?? 400;
  }

  push(event: RuntimeProcessEvent) {
    // Terminal and error events bypass the buffer; anything pending for the
    // same operation is flushed first so ordering is preserved.
    if (isTerminalProcessEvent(event)) {
      this.flushOperation(event.operationId);
      this.emit(event);
      return;
    }
    // Events with the same operation, stage, and message are coalesced.
    const key = `${event.operationId}:${event.stage || ''}:${event.message}`;
    const existing = this.pending.get(key);
    if (existing) {
      existing.event = event;
      existing.repeatCount += 1;
      return;
    }
    const buffered: BufferedEvent = {
      event,
      repeatCount: 1,
      timer: setTimeout(() => this.flushKey(key), this.throttleMs),
    };
    this.pending.set(key, buffered);
  }

  flushAll() {
    for (const key of [...this.pending.keys()]) {
      this.flushKey(key);
    }
  }

  private flushOperation(operationId: string) {
    for (const [key, buffered] of [...this.pending.entries()]) {
      if (buffered.event.operationId === operationId) {
        this.flushKey(key);
      }
    }
  }

  private flushKey(key: string) {
    const buffered = this.pending.get(key);
    if (!buffered) return;
    if (buffered.timer) clearTimeout(buffered.timer);
    this.pending.delete(key);
    // Emit the latest event; record how many pushes it stands for.
    this.emit({
      ...buffered.event,
      payload: buffered.repeatCount > 1
        ? { ...(buffered.event.payload || {}), repeatCount: buffered.repeatCount }
        : buffered.event.payload,
    });
  }
}

function isTerminalProcessEvent(event: RuntimeProcessEvent): boolean {
  return event.status === 'completed'
    || event.status === 'failed'
    || event.status === 'cancelled'
    || event.status === 'skipped'
    || event.level === 'error';
}
```
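
One consequence of the buffer's coalescing key is worth spelling out: because the key includes the message text, two events are merged only when their messages are identical. The sketch below isolates the key rule; `KeyedEvent` and `coalesceKey` are names introduced for this illustration and mirror the template string used in `push`.

```typescript
// Only the fields that participate in the coalescing key.
interface KeyedEvent {
  operationId: string;
  stage?: string;
  message: string;
}

// Same rule as the buffer: operation, stage, and message form the key.
function coalesceKey(e: KeyedEvent): string {
  return `${e.operationId}:${e.stage || ''}:${e.message}`;
}

// Identical messages share a key and are coalesced.
const keyStableA = coalesceKey({ operationId: 'op-1', stage: 'detect', message: 'batch completed' });
const keyStableB = coalesceKey({ operationId: 'op-1', stage: 'detect', message: 'batch completed' });

// A counter embedded in the message produces a new key every time.
const keyCounter3 = coalesceKey({ operationId: 'op-1', stage: 'detect', message: 'batch 3/8 completed' });
const keyCounter4 = coalesceKey({ operationId: 'op-1', stage: 'detect', message: 'batch 4/8 completed' });
```

For skill authors this suggests keeping `message` stable within a stage and carrying the changing numbers in `current`/`total` (or `batch`/`totalBatches`), so that rapid progress updates are actually merged.
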

- [ ] **Step 4: Add the WebSocket protocol type**

In `packages/backend/src/modules/netaclaw/gateway/protocol.ts`, add:

```ts
export interface ServerToolProcessEvent {
  type: 'tool_process_event';
  sessionId: string;
  data: {
    version: 1;
    operationId: string;
    parentOperationId?: string;
    targetType: 'tool' | 'skill' | 'subagent' | 'task' | 'runtime';
    toolCallId: string;
    assistantEntryId?: string;
    toolName: string;
    toolLabel?: string;
    source: 'skill' | 'tool' | 'subagent' | 'runtime';
    taskId?: string;
    stage?: string;
    status: 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped';
    level?: 'debug' | 'info' | 'warning' | 'error';
    message: string;
    detail?: string;
    current?: number;
    total?: number;
    percent?: number;
    payload?: Record<string, unknown>;
    timestamp: string;
    sequence: number;
  };
}
```

Add `ServerToolProcessEvent` to the `ServerEvent` union.

- [ ] **Step 5: Gateway sends live events and stores the metadata summary**

In `packages/backend/src/modules/netaclaw/gateway/server.ts`, import:

```ts
import {
  ProcessEventBuffer,
  RuntimeProcessEvent,
  RuntimeProcessLogRef,
  appendRuntimeProcessLog,
  buildProcessLogRef,
  sampleProcessEventsForMetadata,
} from '../runtime/process_events.js';
```

Extend the `toolExecutions` item:

```ts
processEvents?: RuntimeProcessEvent[];
processLogRef?: RuntimeProcessLogRef;
```

Add to each item in `buildToolExecutionMetadata()`:

```ts
latestProcessEvent: te.processEvents?.at(-1),
processEvents: sampleProcessEventsForMetadata(te.processEvents || [], 50),
processLogRef: te.processLogRef,
```
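
To make the sampling behaviour concrete, the following sketch applies the same head-plus-tail rule to 120 numbered events. `sampleHeadAndTail` is a standalone copy of the rule under a different name, written only for this illustration.

```typescript
// Keep the first few entries plus the most recent ones, `limit` in total.
function sampleHeadAndTail<T>(events: T[], limit = 50): T[] {
  if (events.length <= limit) return events;
  const headCount = Math.min(10, Math.floor(limit / 5));
  return [
    ...events.slice(0, headCount),
    ...events.slice(-(limit - headCount)),
  ];
}

// 120 events, identified by their sequence numbers 1..120.
const allSequences = Array.from({ length: 120 }, (_, i) => i + 1);
const sampledSequences = sampleHeadAndTail(allSequences);

// Result: sequences 1..10 (head) followed by 81..120 (tail), 50 in total.
const headEnd = sampledSequences[9];    // 10
const tailStart = sampledSequences[10]; // 81
const tailEnd = sampledSequences[49];   // 120
```

Note that the head is only preserved when the sampler sees the full list. If the stored list has already been trimmed to its last 50 entries before sampling, the input never exceeds the limit and the sampler returns it unchanged.
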

Create the buffer inside the runner:

```ts
const processEventBuffer = new ProcessEventBuffer(event => {
  this.send({
    type: 'tool_process_event',
    sessionId: sid,
    data: event as any,
  });
});
```

Add to `runAgent`:

```ts
onToolProcessEvent: ({ toolCallId, operationId, toolName, event }) => {
  const projectionId = toolCallId || randomUUID();
  const te = [...toolExecutions].reverse().find(t => t.toolCallId === projectionId)
    || [...toolExecutions].reverse().find(t => t.name === toolName && t.status === 'running');
  // Derive the next sequence from the last stored event rather than from the
  // array length: the array is capped at 50 entries, so its length stops growing.
  const sequence = (te?.processEvents?.at(-1)?.sequence ?? 0) + 1;
  const processEvent: RuntimeProcessEvent = {
    ...event,
    sequence,
  };
  if (te) {
    te.processEvents = [...(te.processEvents || []), processEvent].slice(-50);
    te.processLogRef = appendRuntimeProcessLog({
      sessionId: sid,
      operationId: processEvent.operationId || operationId,
      toolCallId: projectionId,
      event: processEvent,
    });
  }
  // Build the outgoing payload as its own object so the tool fields the
  // frontend matches on are actually present on the wire.
  // `assistantEntryId` and `toolLabel` should be filled the same way the
  // existing tool events fill them.
  const wireEvent: RuntimeProcessEvent & {
    toolCallId: string;
    assistantEntryId?: string;
    toolName: string;
    toolLabel?: string;
  } = {
    ...processEvent,
    toolCallId: projectionId,
    toolName,
  };
  processEventBuffer.push(wireEvent);
},
```

Note: if TypeScript does not accept the extra fields, build the outgoing payload as a separate object; do not pollute the base `RuntimeProcessEvent` type.

- [ ] **Step 6: Emit a terminal event on failure**

In `failRunningTools()`, append a process event with `status: 'failed'` for every running tool and call `processEventBuffer.push(...)`.

For the logical reference, use:

```ts
te.processLogRef = buildProcessLogRef({
  sessionId: sid,
  operationId: `tool-${te.toolCallId}`,
  toolCallId: te.toolCallId,
});
```

Do not write the absolute path of the local JSONL file into the metadata.
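
One way to keep that rule robust is to copy the reference through an explicit allow-list before it leaves the backend, so that a field added later by mistake is not forwarded. A minimal sketch, where `ClientProcessLogRef` and `toClientLogRef` are hypothetical names and `filePath` stands for any accidental extra field:

```typescript
// The logical reference the frontend is allowed to see.
interface ClientProcessLogRef {
  kind: 'runtime_process_log';
  sessionId: string;
  operationId: string;
  toolCallId?: string;
}

// Copy only the logical fields. Anything else on the input, such as a
// hypothetical `filePath`, is dropped rather than forwarded.
function toClientLogRef(ref: ClientProcessLogRef & Record<string, unknown>): ClientProcessLogRef {
  return {
    kind: 'runtime_process_log',
    sessionId: ref.sessionId,
    operationId: ref.operationId,
    toolCallId: ref.toolCallId,
  };
}
```

The backend can still resolve the JSONL file from `sessionId` and `operationId` when needed, so nothing is lost by withholding the path.
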

- [ ] **Step 7: Flush pending events**

After `runAgent` completes normally and before the final metadata is built, call:

```ts
processEventBuffer.flushAll();
```

Also call it in the `catch` block, after `failRunningTools(...)`.

- [ ] **Step 8: Run the tests and the build**

```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts test/skill_process_events.test.ts test/process_event_buffer.test.ts
pnpm --filter @neta/backend build
```

Expected: PASS.

---
## Task 5: Frontend types and chat store projection

**Files:**

- Modify: `packages/frontend/src/modules/agent/types/index.d.ts`
- Modify: `packages/frontend/src/modules/agent/store/chat.ts`

- [ ] **Step 1: Add the frontend types**

In `packages/frontend/src/modules/agent/types/index.d.ts`, add:

```ts
export interface RuntimeProcessLogRef {
  kind: 'runtime_process_log';
  sessionId: string;
  operationId: string;
  toolCallId?: string;
}

export interface RuntimeProcessEventProjection {
  version: 1;
  operationId: string;
  parentOperationId?: string;
  targetType: 'tool' | 'skill' | 'subagent' | 'task' | 'runtime';
  source: 'skill' | 'tool' | 'subagent' | 'runtime';
  taskId?: string;
  stage?: string;
  status: 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped';
  level?: 'debug' | 'info' | 'warning' | 'error';
  message: string;
  detail?: string;
  current?: number;
  total?: number;
  percent?: number;
  payload?: Record<string, unknown>;
  timestamp: string;
  sequence: number;
}
```

Add to `StreamingToolProjection`:

```ts
latestProcessEvent?: RuntimeProcessEventProjection;
processEvents?: RuntimeProcessEventProjection[];
processLogRef?: RuntimeProcessLogRef;
```

Add `'tool_process_event'` to `WSServerEvent.type`.

- [ ] **Step 2: Restore process events from metadata**

In `toStreamingToolsFromEntry()` in `packages/frontend/src/modules/agent/store/chat.ts`, add to each projected tool:

```ts
latestProcessEvent: normalizeToolProcessEvent(item.latestProcessEvent),
processEvents: normalizeToolProcessEvents(item.processEvents),
processLogRef: normalizeProcessLogRef(item.processLogRef),
```

Add the helpers:

```ts
function normalizeToolProcessEvents(value: unknown): import('../types/index.d').RuntimeProcessEventProjection[] | undefined {
  if (!Array.isArray(value)) return undefined;
  const events = value
    .map(normalizeToolProcessEvent)
    .filter((item): item is import('../types/index.d').RuntimeProcessEventProjection => Boolean(item));
  return events.length > 0 ? events.slice(-50) : undefined;
}

// Validates an untrusted value field by field; returns undefined when unusable.
function normalizeToolProcessEvent(value: unknown): import('../types/index.d').RuntimeProcessEventProjection | undefined {
  if (!value || typeof value !== 'object' || Array.isArray(value)) return undefined;
  const record = value as Record<string, any>;
  if (typeof record.message !== 'string') return undefined;
  return {
    version: 1,
    operationId: typeof record.operationId === 'string' ? record.operationId : `legacy-${record.toolCallId || record.sequence || Date.now()}`,
    parentOperationId: typeof record.parentOperationId === 'string' ? record.parentOperationId : undefined,
    targetType: ['tool', 'skill', 'subagent', 'task', 'runtime'].includes(record.targetType) ? record.targetType : 'tool',
    source: ['skill', 'tool', 'subagent', 'runtime'].includes(record.source) ? record.source : 'tool',
    taskId: typeof record.taskId === 'string' ? record.taskId : undefined,
    stage: typeof record.stage === 'string' ? record.stage : undefined,
    status: ['queued', 'started', 'running', 'completed', 'failed', 'cancelled', 'skipped'].includes(record.status) ? record.status : 'running',
    level: ['debug', 'info', 'warning', 'error'].includes(record.level) ? record.level : undefined,
    message: record.message,
    detail: typeof record.detail === 'string' ? record.detail : undefined,
    current: typeof record.current === 'number' ? record.current : undefined,
    total: typeof record.total === 'number' ? record.total : undefined,
    percent: typeof record.percent === 'number' ? record.percent : undefined,
    payload: record.payload && typeof record.payload === 'object' && !Array.isArray(record.payload) ? record.payload : undefined,
    timestamp: typeof record.timestamp === 'string' ? record.timestamp : new Date().toISOString(),
    sequence: typeof record.sequence === 'number' ? record.sequence : 0,
  };
}

function normalizeProcessLogRef(value: unknown): import('../types/index.d').RuntimeProcessLogRef | undefined {
  if (!value || typeof value !== 'object' || Array.isArray(value)) return undefined;
  const record = value as Record<string, any>;
  if (record.kind !== 'runtime_process_log') return undefined;
  if (typeof record.sessionId !== 'string' || typeof record.operationId !== 'string') return undefined;
  return {
    kind: 'runtime_process_log',
    sessionId: record.sessionId,
    operationId: record.operationId,
    toolCallId: typeof record.toolCallId === 'string' ? record.toolCallId : undefined,
  };
}
```
- [ ] **Step 3: Handle live tool_process_event**
Add to `handleWSEvent()`:
```ts
case 'tool_process_event':
  // Events without a toolCallId cannot be attached to a tool card, so they are ignored.
  if (event.data?.toolCallId) {
    const processEvent = normalizeToolProcessEvent(event.data);
    if (processEvent) {
      const existing = streamingTools.value.find(item => item.toolCallId === event.data.toolCallId);
      // Keep only the 50 most recent process events per tool card.
      const processEvents = [...(existing?.processEvents || []), processEvent].slice(-50);
      // Carry over every field already stored on the card; only the process fields change.
      upsertStreamingTool({
        toolCallId: event.data.toolCallId,
        assistantEntryId: event.data.assistantEntryId,
        toolName: event.data.toolName,
        label: event.data.toolLabel,
        status: existing?.status || 'running',
        args: existing?.args,
        result: existing?.result,
        rawResult: existing?.rawResult,
        runtimeRoute: existing?.runtimeRoute,
        workerRoutingHint: existing?.workerRoutingHint,
        blockedReason: existing?.blockedReason,
        policySources: existing?.policySources,
        processEvents,
        latestProcessEvent: processEvent,
        processLogRef: existing?.processLogRef,
      });
    }
  }
  break;
```
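The handler above bounds the per-card history with `slice(-50)`. The following is a minimal sketch of that append-and-cap step as a standalone function, so it can be unit tested without the store. The names `appendCapped` and `ProcessEventLike` are hypothetical and not part of the plan; the event shape is deliberately simplified.

```typescript
// Hypothetical, simplified event shape; the real projection type has more fields.
interface ProcessEventLike {
  operationId: string;
  sequence: number;
  message: string;
}

// Append one event and keep only the newest `limit` entries (50 in the handler).
// A non-positive limit returns an empty list, because slice(-0) would keep everything.
function appendCapped<T>(existing: readonly T[] | undefined, next: T, limit = 50): T[] {
  if (limit <= 0) return [];
  return [...(existing ?? []), next].slice(-limit);
}

// Example: a card that already holds 50 events receives one more.
const history: ProcessEventLike[] = Array.from({ length: 50 }, (_, i) => ({
  operationId: 'op-1',
  sequence: i,
  message: `step ${i}`,
}));
const updated = appendCapped(history, { operationId: 'op-1', sequence: 50, message: 'step 50' });
```

In this example `updated` still holds 50 entries: the event with `sequence` 0 is dropped and the new event becomes the last element.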
- [ ] **Step 4: Run the frontend type check**
```bash
pnpm --filter @neta/frontend type-check
```
Expected: PASS. If unrelated pre-existing errors show up, record them and run `pnpm --filter @neta/frontend build` as the final check.
---
## Task 6: Frontend process timeline UI
**Files:**
- Add: `packages/frontend/src/modules/agent/components/tool-process-timeline.vue`
- Modify: `packages/frontend/src/modules/agent/components/skill-card.vue`
- Modify: the parent component or renderer that actually renders `streamingTools`
- [ ] **Step 1: Add a standalone timeline component**
Create `packages/frontend/src/modules/agent/components/tool-process-timeline.vue`:
```vue
<template>
  <div v-if="events.length" class="tool-process">
    <!-- Button labels are Chinese UI strings: "收起过程" = "Hide process", "查看过程" = "View process" -->
    <el-button text size="small" @click="showProcess = !showProcess">
      {{ showProcess ? '收起过程' : `查看过程 ${events.length}` }}
    </el-button>
    <el-collapse-transition>
      <ol v-if="showProcess" class="tool-process__timeline">
        <li
          v-for="event in events"
          :key="`${event.operationId}-${event.sequence}-${event.timestamp}`"
          class="tool-process__item"
          :class="[`is-${event.level || 'info'}`]"
        >
          <span class="tool-process__dot" />
          <span class="tool-process__main">
            <span class="tool-process__message">{{ event.message }}</span>
            <span v-if="event.current !== undefined && event.total !== undefined" class="tool-process__count">
              {{ event.current }}/{{ event.total }}
            </span>
          </span>
        </li>
      </ol>
    </el-collapse-transition>
  </div>
</template>

<script lang="ts" setup>
import { computed, ref } from 'vue';
import type { RuntimeProcessEventProjection } from '../types/index.d';

const props = defineProps<{
  events?: RuntimeProcessEventProjection[];
}>();

// The timeline starts collapsed; a missing `events` prop is treated as an empty list.
const showProcess = ref(false);
const events = computed(() => props.events || []);
</script>

<style lang="scss" scoped>
.tool-process {
  margin-top: 6px;
}
.tool-process__timeline {
  list-style: none;
  margin: 8px 0 0;
  padding: 0;
  display: grid;
  gap: 6px;
}
.tool-process__item {
  display: grid;
  grid-template-columns: 10px 1fr;
  gap: 8px;
  align-items: start;
  color: var(--el-text-color-secondary);
  font-size: 12px;
  line-height: 1.45;
}
.tool-process__dot {
  width: 6px;
  height: 6px;
  margin-top: 6px;
  border-radius: 50%;
  background: var(--el-color-primary);
}
.tool-process__item.is-warning .tool-process__dot {
  background: var(--el-color-warning);
}
.tool-process__item.is-error .tool-process__dot {
  background: var(--el-color-danger);
}
.tool-process__main {
  min-width: 0;
  display: flex;
  gap: 6px;
  align-items: baseline;
}
.tool-process__message {
  min-width: 0;
  overflow-wrap: anywhere;
}
.tool-process__count {
  flex: none;
  color: var(--el-text-color-placeholder);
}
</style>
```
- [ ] **Step 2: Modify skill-card**
Add to `skill-card.vue`:
```ts
import type { RuntimeProcessEventProjection } from '../types/index.d';
import ToolProcessTimeline from './tool-process-timeline.vue';
```
Add to the props:
```ts
latestProcessEvent?: RuntimeProcessEventProjection;
processEvents?: RuntimeProcessEventProjection[];
```
Change the running area to:
```vue
<transition name="fade">
  <div v-if="status === 'running' || latestProcessEvent" class="skill-card__progress">
    <el-progress
      v-if="typeof latestProcessEvent?.percent === 'number' || typeof percent === 'number'"
      :percentage="latestProcessEvent?.percent ?? percent ?? 0"
      :stroke-width="4"
      :show-text="false"
    />
    <span v-if="latestProcessEvent?.message || step || detail" class="skill-card__step">
      {{ latestProcessEvent?.message || step || detail }}
    </span>
    <span v-if="latestProcessEvent?.detail" class="skill-card__detail">
      {{ latestProcessEvent.detail }}
    </span>
  </div>
</transition>
<ToolProcessTimeline :events="processEvents" />
```
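The template above prefers the percent of the latest process event and falls back to the existing `percent` prop. Because `el-progress` expects a `percentage` between 0 and 100, it may be worth clamping the value before binding it. The following is a minimal sketch under that assumption; `resolvePercent` and `PercentSource` are hypothetical names, not part of the plan.

```typescript
// Hypothetical minimal shape: only the field this helper reads.
interface PercentSource {
  percent?: number;
}

// Prefer the latest process event's percent, fall back to the legacy prop,
// and clamp to the 0..100 integer range. Returns undefined when neither
// source has a usable number, so the caller can hide the progress bar.
function resolvePercent(latest: PercentSource | undefined, fallback: number | undefined): number | undefined {
  const raw = typeof latest?.percent === 'number' ? latest.percent : fallback;
  if (typeof raw !== 'number' || Number.isNaN(raw)) return undefined;
  return Math.min(100, Math.max(0, Math.round(raw)));
}
```

A computed property wrapping this helper could drive both the `v-if` and the `:percentage` binding, which keeps the two conditions from drifting apart.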
Add the styles:
```scss
.skill-card__detail {
  display: block;
  margin-top: 2px;
  font-size: 12px;
  color: var(--el-text-color-placeholder);
  line-height: 1.45;
}
```
- [ ] **Step 3: Confirm the parent component passes the props**
Search:
```bash
rg -n "<skill-card|SkillCard|streamingTools" packages/frontend/src/modules/agent
```
Make sure the place that finally renders the tool card passes:
```vue
:latest-process-event="tool.latestProcessEvent"
:process-events="tool.processEvents"
```
If rendering goes through the renderer registry, make sure the renderer preserves:
```ts
latestProcessEvent: tool.latestProcessEvent,
processEvents: tool.processEvents,
processLogRef: tool.processLogRef,
```
- [ ] **Step 4: Run the build**
```bash
pnpm --filter @neta/frontend build
```
Expected: PASS.
---
## Task 7: End-to-end verification
**Files:**
- No new source files are required.
- Use the existing `vehicle-damage-inspection` skill.
- [ ] **Step 1: Run the backend tests**
```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts test/skill_process_events.test.ts test/process_event_buffer.test.ts
```
Expected: PASS.
- [ ] **Step 2: Run the backend build**
```bash
pnpm --filter @neta/backend build
```
Expected: PASS.
- [ ] **Step 3: Run the frontend build**
```bash
pnpm --filter @neta/frontend build
```
Expected: PASS.
- [ ] **Step 4: Manually verify the pre-existing damage detection process**
Start the project:
```bash
pnpm dev
```
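On the Agent chat page, enter the following prompt (Chinese for "Please check whether there is any pre-existing damage"):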
```text
请检查是否有旧伤
```
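Expected:
- An `execute_skill` tool card appears.
- While pre-existing damage detection runs, the card keeps showing the current stage, for example:
  - `creating workspace`
  - `extracting frames`
  - `frames extracted`
  - `batch started`
  - `batch completed`
  - `grounding damage candidates`
  - `selecting best frames`
- After expanding the process view, the sampled process timeline is visible.
- The assistant metadata contains `latestProcessEvent`, `processEvents`, and a logical `processLogRef`.
- `processLogRef` does not contain a local absolute path.
- A complete JSONL process log exists under the backend data directory.
- After a page refresh, a completed session still shows the process summary saved in metadata.
- The model's final reply and the tool result are unaffected.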
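The check that `processLogRef` carries no local absolute path can be partly automated. The following is a heuristic sketch, not part of the plan: `findAbsolutePaths` and the `ABSOLUTE_PATH` pattern are hypothetical, and the pattern only covers common POSIX, home-relative, Windows drive, UNC, and `file://` prefixes.

```typescript
// Heuristic pattern (an assumption): strings that start like a filesystem path.
const ABSOLUTE_PATH = /^(?:\/|~[\\/]|[A-Za-z]:[\\/]|\\\\|file:\/\/)/;

// Walk any JSON-like value and return the locations of strings that look like
// absolute paths, for example "$.path" or "$.nested.file".
function findAbsolutePaths(value: unknown, trail = '$'): string[] {
  if (typeof value === 'string') {
    return ABSOLUTE_PATH.test(value) ? [trail] : [];
  }
  if (value === null || typeof value !== 'object') return [];
  const record = value as Record<string, unknown>;
  const hits: string[] = [];
  for (const key of Object.keys(record)) {
    hits.push(...findAbsolutePaths(record[key], `${trail}.${key}`));
  }
  return hits;
}

// A logical reference in the shape used by this plan passes the check.
const logicalRef = { kind: 'runtime_process_log', sessionId: 's-1', operationId: 'op-1' };
// A reference that leaks a server path (made-up value) is reported.
const leakyRef = { ...logicalRef, path: '/var/lib/app/logs/op-1.jsonl' };
```

Running `findAbsolutePaths` over the metadata captured from the browser's network panel gives a quick signal, though a manual look is still worthwhile because the pattern is only a heuristic.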
- [ ] **Step 5: Check the model context boundary**
Inspect `buildLLMMessages` and the session-tree runtime context construction logic, and confirm that:
```text
latestProcessEvent
processEvents
processLogRef
```
are not injected into the LLM history.
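One way to back this inspection with a regression test is to serialize the messages built for the model and look for the three keys. The following is a sketch under assumptions: `stripProcessFields` and `findLeakedProcessKeys` are hypothetical helpers, the message shape is made up for illustration, and the key search is a heuristic that does not look inside JSON embedded in strings.

```typescript
// The three UI-only fields named in this step.
const PROCESS_ONLY_KEYS = ['latestProcessEvent', 'processEvents', 'processLogRef'] as const;

// Return a shallow copy of the metadata without the process-only fields.
function stripProcessFields(metadata: Record<string, unknown>): Record<string, unknown> {
  const copy: Record<string, unknown> = { ...metadata };
  for (const key of PROCESS_ONLY_KEYS) delete copy[key];
  return copy;
}

// Report which process-only keys appear as object keys anywhere in the
// serialized messages. An empty array means nothing leaked.
function findLeakedProcessKeys(messages: unknown): string[] {
  const serialized = JSON.stringify(messages) || '';
  return PROCESS_ONLY_KEYS.filter(key => serialized.includes(`"${key}":`));
}

// Illustrative data only; the real message and metadata shapes may differ.
const metadata = {
  toolName: 'execute_skill',
  processEvents: [{ message: 'extracting frames' }],
  processLogRef: { kind: 'runtime_process_log' },
};
const leakyMessages = [{ role: 'assistant', content: 'done', metadata }];
const cleanMessages = [{ role: 'assistant', content: 'done', metadata: stripProcessFields(metadata) }];
```

A test could feed the real output of `buildLLMMessages` into `findLeakedProcessKeys` and assert that the result is empty.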
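- [ ] **Step 6: Verify concurrent correlation**
If you can trigger multiple tools or multiple skill calls within the same turn, verify:
- Process events are merged into the correct tool card by `toolCallId`.
- Calls to a skill with the same name do not get mixed up.
- Every event carries a stable `version=1`, `operationId`, `targetType`, and `timestamp`.
- On failure, the card receives a `failed` process event and leaves the running state.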
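The correlation checks above can be scripted against captured WebSocket events. The following is a minimal sketch, assuming events were collected into an array; `ProcessEventRecord`, `invalidFields`, and `groupByToolCall` are hypothetical names and the sample data is invented.

```typescript
// Hypothetical minimal shape; only the fields checked in this step are listed.
interface ProcessEventRecord {
  version?: number;
  operationId?: string;
  targetType?: string;
  timestamp?: string;
  toolCallId?: string;
  status?: string;
}

// List the required fields that are missing or malformed on one event.
function invalidFields(event: ProcessEventRecord): string[] {
  const problems: string[] = [];
  if (event.version !== 1) problems.push('version');
  if (!event.operationId) problems.push('operationId');
  if (!event.targetType) problems.push('targetType');
  if (!event.timestamp || Number.isNaN(Date.parse(event.timestamp))) problems.push('timestamp');
  return problems;
}

// Bucket events by toolCallId so each tool card can be inspected separately.
function groupByToolCall(events: ProcessEventRecord[]): Record<string, ProcessEventRecord[]> {
  const groups: Record<string, ProcessEventRecord[]> = {};
  for (const event of events) {
    const key = event.toolCallId ?? '(missing toolCallId)';
    const bucket = groups[key] ?? [];
    bucket.push(event);
    groups[key] = bucket;
  }
  return groups;
}

// Two calls to the same skill in one turn, with interleaved events.
const base = { version: 1, targetType: 'skill', timestamp: '2026-05-06T10:00:00.000Z' };
const captured: ProcessEventRecord[] = [
  { ...base, operationId: 'op-a', toolCallId: 'call-1', status: 'running' },
  { ...base, operationId: 'op-b', toolCallId: 'call-2', status: 'running' },
  { ...base, operationId: 'op-a', toolCallId: 'call-1', status: 'failed' },
];
const grouped = groupByToolCall(captured);
```

Here `grouped['call-1']` holds two events ending in `failed`, while `grouped['call-2']` holds one, which is the separation expected for two calls to the same skill.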
---
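## Self-review checklist
- [ ] The first version only makes the `execute_skill` process visible and does not grow into a full task center.
- [ ] The event structure supports future subagent/tool/background task integration, but none of them is forced in during this phase.
- [ ] `processLogRef` is a logical reference and does not expose a local absolute path.
- [ ] Process events live only in metadata, UI replay, and logs; they never enter the LLM context.
- [ ] The payload has sensitive fields redacted and long strings truncated.
- [ ] stderr parsing only accepts `type: "process_event"` and does not misparse ordinary JSON logs.
- [ ] The gateway keeps only the necessary bridging logic; the generic normalize/sample/log/buffer logic lives in the lightweight helper.
- [ ] The frontend is not just a progress bar but "latest step + expandable process timeline".
- [ ] Backend tests, backend build, and frontend build all pass.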
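For the stderr item in the checklist, the rule can be illustrated with a small line parser. The following is a sketch only: `parseProcessEventLine` is a hypothetical name, and it assumes the skill writes one JSON object per stderr line, which this section does not spell out.

```typescript
// Parse one stderr line. Only a JSON object whose `type` is exactly
// "process_event" is accepted; plain text, broken JSON, arrays, and other
// JSON logs all return undefined and can be treated as ordinary stderr output.
function parseProcessEventLine(line: string): Record<string, unknown> | undefined {
  const trimmed = line.trim();
  if (!trimmed.startsWith('{')) return undefined;
  let parsed: unknown;
  try {
    parsed = JSON.parse(trimmed);
  } catch {
    return undefined;
  }
  if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) return undefined;
  const record = parsed as Record<string, unknown>;
  return record.type === 'process_event' ? record : undefined;
}

// A structured process event is accepted.
const accepted = parseProcessEventLine('{"type":"process_event","message":"extracting frames"}');
// An ordinary JSON log line from some library is ignored.
const ignored = parseProcessEventLine('{"level":"info","msg":"model loaded"}');
```

Checking the `type` discriminator before any other field keeps unrelated JSON logging from being shown to the user as a process step.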