GPU_GUARD_MONOREPO/docs/superpowers/plans/2026-05-06-tool-process-events.md
2026-05-20 21:39:12 +08:00
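# Runtime Process Events Implementation Plan
> **Requirement for executing agents:** implement this plan with `superpowers:subagent-driven-development` (recommended) or `superpowers:executing-plans`. Track every step with a checkbox (`- [ ]`).

**Goal:** Add a Pi-style runtime process event stream for long-running tools, compute-entry skills, and future subagent/batch tasks. The Agent chat page can then show in real time which step is running, instead of only the final result or a single progress bar.
**Architecture:** Extend the existing NetaClaw tool lifecycle `started -> result -> completed` with a `process-event` phase.

- In phase one, the WebSocket event name stays `tool_process_event`, so it plugs into the current `streamingTools` projection.
- The event payload, however, uses a generic `RuntimeProcessEvent` with `version`, `operationId`, `parentOperationId`, `targetType`, `toolCallId`, `taskId`, and similar fields, so it is not hard-wired to any single skill.
- `execute_skill` bridges the structured process events emitted by the skill subprocess into a runtime callback.
- The gateway throttles events before sending them to the frontend. It also saves a summary in the assistant metadata and writes a full JSONL process log for backend troubleshooting.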
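The extended lifecycle can be read as an ordering rule for a single tool call. Below is a minimal illustration. It assumes process events are only emitted while the tool is running, that is, after `started` and before `result`. The phase names are illustrative, not NetaClaw's actual gateway event types.

```ts
// Illustrative phase names for one toolCallId (not the gateway's real event types).
type ToolLifecyclePhase = 'started' | 'process-event' | 'result' | 'completed';

// Checks that phases never move backwards, that only `process-event` may repeat,
// and that the sequence starts with `started` and ends with `completed`.
function isValidToolLifecycle(phases: ToolLifecyclePhase[]): boolean {
  const rank: Record<ToolLifecyclePhase, number> = {
    started: 0,
    'process-event': 1,
    result: 2,
    completed: 3,
  };
  if (phases[0] !== 'started' || phases[phases.length - 1] !== 'completed') return false;
  let previous = -1;
  for (const phase of phases) {
    const current = rank[phase];
    if (current < previous || (current === previous && phase !== 'process-event')) return false;
    previous = current;
  }
  return true;
}
```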
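**Key boundaries:**
- Process events are only for UI display, process replay, and backend troubleshooting. They never enter the LLM history, and `buildLLMMessages` must not carry them back into the model context.
- `processLogRef` must only be a logical reference. Local absolute paths must never be exposed to the frontend.
- Phase one does not build a full event bus, a task center, or a full log-viewing API. It only adds a lightweight runtime helper, to avoid over-engineering.
- Phase one only guarantees process visibility for `execute_skill`. The event structure keeps room for ordinary tools, subagents, and background tasks later.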
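One way to honor the first boundary is to build LLM messages from a whitelist of fields. Metadata such as process events and log refs is then excluded by construction rather than by deleting keys. The sketch below uses hypothetical `StoredEntry`/`LLMMessage` shapes; the real `buildLLMMessages` input may look different.

```ts
// Hypothetical shapes; the real NetaClaw entry and message types may differ.
interface StoredEntry {
  role: 'user' | 'assistant';
  content: string;
  metadata?: { skillExecutions?: Array<{ processEvents?: unknown[] }> };
}

interface LLMMessage {
  role: 'user' | 'assistant';
  content: string;
}

// Copy only the fields the model needs; metadata (process events, log refs)
// is never read, so it cannot leak into the model context.
function toLLMMessages(entries: StoredEntry[]): LLMMessage[] {
  return entries.map(({ role, content }) => ({ role, content }));
}
```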
**Tech stack:** TypeScript, Midway.js WebSocket, NetaClaw runtime tools, Pinia/Vue 3, Element Plus, Jest, vue-tsc/Vite build.

---
## File Structure
Backend:
- Create `packages/backend/src/modules/netaclaw/runtime/process_events.ts`:
  - Define `RuntimeProcessEvent` and `RuntimeProcessEventInput`.
  - Handle event normalization, `operationId` generation, percent calculation, payload redaction, metadata sampling, logical `processLogRef` generation, JSONL logging, and lightweight throttling/deduplication.
  - Note: this is a lightweight helper, not a full EventBus.
- Modify `packages/backend/src/modules/netaclaw/tools/common.ts`:
  - Add `ToolProcessUpdateCallback`.
  - Extend `AgentTool.execute()` with an optional third parameter, `onUpdate`.
- Modify `packages/backend/src/modules/netaclaw/runtime/attempt.ts`:
  - Add `onToolProcessEvent`.
  - Pass `emitProcessEvent` when calling a tool.
- Modify `packages/backend/src/modules/netaclaw/runtime/agent.ts`:
  - Forward `onToolProcessEvent` from `runAgent` to `runAttempt`.
- Modify `packages/backend/src/modules/netaclaw/service/skill_executor.ts`:
  - Parse only JSON lines that explicitly carry `type: "process_event"`.
  - Keep sending ordinary stderr to the backend log unchanged.
  - On timeout, non-zero exit, spawn error, or failure to parse the final JSON, emit a terminal failure process event.
- Modify `packages/backend/src/modules/netaclaw/tools/builtin/execute_skill.ts`:
  - Bridge skill process events into tool process updates.
- Modify `packages/backend/src/modules/netaclaw/gateway/protocol.ts`:
  - Add `ServerToolProcessEvent`.
- Modify `packages/backend/src/modules/netaclaw/gateway/server.ts`:
  - Receive runtime process events.
  - Send `tool_process_event` to the frontend.
  - Save `latestProcessEvent`, sampled `processEvents`, and the logical `processLogRef` in the `skillExecutions` metadata.
  - Never put full local file paths in the frontend payload.

Backend tests:
- Create `packages/backend/test/tool_process_events.test.ts`
- Create `packages/backend/test/skill_process_events.test.ts`
- Create `packages/backend/test/process_event_buffer.test.ts`

Frontend:
- Modify `packages/frontend/src/modules/agent/types/index.d.ts`:
  - Add `RuntimeProcessEventProjection`.
  - Add `latestProcessEvent`, `processEvents`, and `processLogRef` to `StreamingToolProjection`.
  - Add `tool_process_event` to `WSServerEvent.type`.
- Modify `packages/frontend/src/modules/agent/store/chat.ts`:
  - Handle live `tool_process_event` messages.
  - Restore the process event summary from assistant metadata.
  - Merge events into the matching `streamingTool` by `toolCallId`.
- Create `packages/frontend/src/modules/agent/components/tool-process-timeline.vue`:
  - Render the process timeline as a standalone component.
- Modify `packages/frontend/src/modules/agent/components/skill-card.vue`:
  - Show the latest process event.
  - Show a progress bar only when a numeric percent is present.
  - Embed `tool-process-timeline.vue`.
- Modify the parent component or renderer that actually renders `streamingTools`:
  - Explicitly pass `latestProcessEvent`, `processEvents`, and `processLogRef`.
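The `skill-card.vue` rule (show a progress bar only for a numeric percent) fits in one small pure function. The sketch below assumes a hypothetical `progressBarValue` helper and a minimal event shape. How the component binds the result to an Element Plus progress bar is not shown.

```ts
// Minimal stand-in for the fields this helper reads from RuntimeProcessEventProjection.
interface ProcessEventLike {
  status: string;
  percent?: number;
}

// Hypothetical helper: returns the value for the progress bar, or undefined when the
// card should render no bar at all (no event, or no finite numeric percent).
function progressBarValue(event: ProcessEventLike | undefined): number | undefined {
  if (!event || typeof event.percent !== 'number' || !Number.isFinite(event.percent)) {
    return undefined;
  }
  // Clamp defensively in case a value outside 0-100 reaches the UI.
  return Math.max(0, Math.min(100, event.percent));
}
```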
---
## Task 1: Backend runtime process event types and tool callback contract
**Files:**
- Create: `packages/backend/src/modules/netaclaw/runtime/process_events.ts`
- Modify: `packages/backend/src/modules/netaclaw/tools/common.ts`
- Modify: `packages/backend/src/modules/netaclaw/runtime/attempt.ts`
- Modify: `packages/backend/src/modules/netaclaw/runtime/agent.ts`
- Test: `packages/backend/test/tool_process_events.test.ts`
- [ ] **Step 1: Write the failing test first**
Create `packages/backend/test/tool_process_events.test.ts`:
```ts
import { Type } from '@sinclair/typebox';
import { runAttempt } from '../src/modules/netaclaw/runtime/attempt';
import type { AnyAgentTool } from '../src/modules/netaclaw/tools/common';
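// Share one chat mock across getProvider() calls, so the scripted responses are consumed
// in order even if the runtime asks for the provider on every turn.
// (jest allows factory references to out-of-scope variables whose names start with `mock`.)
const mockChat = jest
.fn()
// Turn 1: the model calls slow_tool.
.mockResolvedValueOnce({
content: '',
usage: { inputTokens: 1, outputTokens: 1 },
toolCalls: [{ id: 'call-1', name: 'slow_tool', arguments: '{"taskId":"task-1"}' }],
})
// Turn 2: the model finishes without further tool calls.
.mockResolvedValueOnce({
content: 'done',
usage: { inputTokens: 1, outputTokens: 1 },
toolCalls: [],
});
jest.mock('../src/modules/netaclaw/runtime/model_selection', () => ({
getProvider: () => ({ chat: mockChat }),
}));
describe('runtime process events', () => {
it('emits process events with toolCallId and operationId from a running tool', async () => {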
const updates: any[] = [];
const tool: AnyAgentTool = {
name: 'slow_tool',
label: 'Slow Tool',
description: 'test tool',
parameters: Type.Object({ taskId: Type.String() }),
async execute(_id, _params, onUpdate) {
onUpdate?.({
source: 'tool',
targetType: 'tool',
stage: 'prepare',
status: 'running',
message: 'Preparing work',
current: 1,
total: 2,
});
return { type: 'text', text: 'ok' };
},
};
await runAttempt({
messages: [{ role: 'user', content: 'run' }],
tools: [tool],
config: { provider: 'openai', model: 'test', apiKey: 'test' },
onToolProcessEvent: event => updates.push(event),
});
expect(updates).toEqual([
expect.objectContaining({
toolCallId: 'call-1',
operationId: 'tool-call-1',
toolName: 'slow_tool',
args: { taskId: 'task-1' },
event: expect.objectContaining({
version: 1,
operationId: 'tool-call-1',
targetType: 'tool',
source: 'tool',
stage: 'prepare',
status: 'running',
message: 'Preparing work',
current: 1,
total: 2,
percent: 50,
}),
}),
]);
});
});
```
- [ ] **Step 2: Run the test and confirm it fails**
```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts
```
Expected: FAIL, because `RuntimeProcessEvent`, `onToolProcessEvent`, and `onUpdate` do not exist yet.
- [ ] **Step 3: Add the runtime process event helper**
Create `packages/backend/src/modules/netaclaw/runtime/process_events.ts`:
```ts
import { randomUUID } from 'crypto';
import * as fs from 'fs';
import * as path from 'path';
import { pDataPath } from '../../../comm/path.js';
export type RuntimeProcessTargetType = 'tool' | 'skill' | 'subagent' | 'task' | 'runtime';
export type RuntimeProcessSource = 'skill' | 'tool' | 'subagent' | 'runtime';
export type RuntimeProcessStatus = 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped';
export type RuntimeProcessLevel = 'debug' | 'info' | 'warning' | 'error';
export interface RuntimeProcessEvent {
version: 1;
operationId: string;
parentOperationId?: string;
targetType: RuntimeProcessTargetType;
source: RuntimeProcessSource;
taskId?: string;
stage?: string;
status: RuntimeProcessStatus;
level: RuntimeProcessLevel;
message: string;
detail?: string;
current?: number;
total?: number;
percent?: number;
payload?: Record<string, unknown>;
timestamp: string;
sequence?: number;
}
export type RuntimeProcessEventInput =
Partial<Omit<RuntimeProcessEvent, 'version' | 'operationId' | 'targetType' | 'source' | 'status' | 'level' | 'message' | 'timestamp'>> & {
operationId?: string;
parentOperationId?: string;
targetType?: RuntimeProcessTargetType;
source?: RuntimeProcessSource;
status?: RuntimeProcessStatus;
level?: RuntimeProcessLevel;
message: string;
timestamp?: string;
};
export interface RuntimeProcessLogRef {
kind: 'runtime_process_log';
sessionId: string;
operationId: string;
toolCallId?: string;
}
const SENSITIVE_KEY_PATTERN = /(api[_-]?key|token|password|secret|authorization|credential)/i;
const MAX_PAYLOAD_STRING_LENGTH = 500;
export function buildOperationId(toolCallId?: string): string {
return toolCallId ? `tool-${toolCallId}` : `op-${randomUUID()}`;
}
export function normalizeRuntimeProcessEvent(input: RuntimeProcessEventInput): RuntimeProcessEvent {
const total = typeof input.total === 'number' ? input.total : undefined;
const current = typeof input.current === 'number' ? input.current : undefined;
const percent = typeof input.percent === 'number'
? clampPercent(input.percent)
: total && current !== undefined
? clampPercent(Math.round((current / total) * 100))
: undefined;
return {
version: 1,
operationId: input.operationId || buildOperationId(),
parentOperationId: input.parentOperationId,
targetType: input.targetType || 'runtime',
source: input.source || 'runtime',
taskId: input.taskId,
stage: input.stage,
status: input.status || 'running',
level: input.level || 'info',
message: input.message,
detail: input.detail,
current,
total,
percent,
payload: sanitizeProcessPayload(input.payload),
timestamp: input.timestamp || new Date().toISOString(),
sequence: input.sequence,
};
}
export function sanitizeProcessPayload(payload: unknown): Record<string, unknown> | undefined {
if (!payload || typeof payload !== 'object' || Array.isArray(payload)) return undefined;
const output: Record<string, unknown> = {};
for (const [key, value] of Object.entries(payload as Record<string, unknown>)) {
if (SENSITIVE_KEY_PATTERN.test(key)) {
output[key] = '[redacted]';
} else if (typeof value === 'string') {
output[key] = value.length > MAX_PAYLOAD_STRING_LENGTH
? `${value.slice(0, MAX_PAYLOAD_STRING_LENGTH)}...`
: value;
} else if (typeof value === 'number' || typeof value === 'boolean' || value === null) {
output[key] = value;
} else {
output[key] = '[object]';
}
}
return Object.keys(output).length > 0 ? output : undefined;
}
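export function sampleProcessEventsForMetadata<T>(events: T[], limit = 50): T[] {
// Guard: with limit 0, events.slice(-0) would return the whole array.
if (limit <= 0) return [];
if (events.length <= limit) return events;
// Keep the first few events (how the run started) plus the most recent ones.
const headCount = Math.min(10, Math.floor(limit / 5));
return [
...events.slice(0, headCount),
...events.slice(-(limit - headCount)),
];
}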
export function buildProcessLogRef(input: {
sessionId: string;
operationId: string;
toolCallId?: string;
}): RuntimeProcessLogRef {
return {
kind: 'runtime_process_log',
sessionId: input.sessionId,
operationId: input.operationId,
toolCallId: input.toolCallId,
};
}
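export function appendRuntimeProcessLog(input: {
sessionId: string;
operationId: string;
toolCallId?: string;
event: RuntimeProcessEvent;
}): RuntimeProcessLogRef {
// Sanitize every path segment so neither id can escape the process log root.
const dir = path.join(pDataPath(), 'netaclaw-process-events', safeFilePart(input.sessionId));
fs.mkdirSync(dir, { recursive: true });
const filename = `${safeFilePart(input.operationId)}.jsonl`;
// One JSON object per line (JSONL); a synchronous append keeps the helper simple.
fs.appendFileSync(path.join(dir, filename), `${JSON.stringify(input.event)}\n`, 'utf8');
// Return only the logical reference; the absolute path never leaves the backend.
return buildProcessLogRef(input);
}
function clampPercent(value: number): number {
return Math.max(0, Math.min(100, value));
}
function safeFilePart(value: string): string {
const cleaned = value.replace(/[^a-zA-Z0-9_.-]/g, '_').slice(0, 120);
// '.' and '..' survive the character filter, so map empty or dot-only segments to '_'.
return cleaned === '' || /^\.+$/.test(cleaned) ? '_' : cleaned;
}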
```
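The standalone copy below shows what `sampleProcessEventsForMetadata` keeps for a long run. `sampleHeadAndTail` is a local name used only for illustration. With the default `limit = 50`, it keeps the first 10 events and the last 40. The copy adds a `limit <= 0` guard, because `events.slice(-0)` returns the whole array.

```ts
// Standalone copy of the sampling rule: keep the first `headCount` events (how the run
// started) plus the most recent ones, never more than `limit` in total.
function sampleHeadAndTail<T>(events: T[], limit = 50): T[] {
  if (limit <= 0) return [];
  if (events.length <= limit) return events;
  const headCount = Math.min(10, Math.floor(limit / 5));
  return [...events.slice(0, headCount), ...events.slice(-(limit - headCount))];
}

// 120 events with limit 50: indices 0..9 followed by 80..119.
const events = Array.from({ length: 120 }, (_, i) => i);
const sampled = sampleHeadAndTail(events);
```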
- [ ] **Step 4: Extend the tool execute contract**
Modify `packages/backend/src/modules/netaclaw/tools/common.ts`.
Add the import:
```ts
import type { RuntimeProcessEventInput } from '../runtime/process_events.js';
```
Add the type:
```ts
export type ToolProcessUpdateCallback = (event: RuntimeProcessEventInput) => void;
```
Change `AgentTool.execute` to:
```ts
execute(id: string, params: Static<TParams>, onUpdate?: ToolProcessUpdateCallback): Promise<TResult>;
```
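Because the third parameter is optional, a tool opts in to progress reporting and keeps working when no callback is passed. The sketch below is a hypothetical batch tool. It uses local stand-ins for `ToolProcessUpdateCallback` and the event input; the real types live in `common.ts` and `process_events.ts`.

```ts
// Local stand-ins for RuntimeProcessEventInput and ToolProcessUpdateCallback.
interface ProcessUpdate {
  stage?: string;
  message: string;
  current?: number;
  total?: number;
}
type ToolProcessUpdateCallback = (event: ProcessUpdate) => void;

// Hypothetical batch tool: reports one update per item. The optional-call syntax
// keeps it working for callers that pass only two arguments.
async function runBatchTool(items: string[], onUpdate?: ToolProcessUpdateCallback): Promise<string> {
  for (let i = 0; i < items.length; i += 1) {
    onUpdate?.({ stage: 'process', message: `Processing ${items[i]}`, current: i + 1, total: items.length });
  }
  return `processed ${items.length} items`;
}
```

Calling `onUpdate?.(...)` rather than `onUpdate(...)` is what keeps existing call sites, which pass only `id` and `params`, valid.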
- [ ] **Step 5: Bridge tool updates in attempt**
Modify `packages/backend/src/modules/netaclaw/runtime/attempt.ts`.
Make sure these imports are present:
```ts
import { AnyAgentTool, ToolResultContent, toolResultToText } from '../tools/common.js';
import { RuntimeProcessEvent, RuntimeProcessEventInput, buildOperationId, normalizeRuntimeProcessEvent } from './process_events.js';
```
Add to `AttemptParams`:
```ts
onToolProcessEvent?: (input: {
toolCallId: string;
operationId: string;
toolName: string;
args: Record<string, unknown>;
event: RuntimeProcessEvent;
}) => void;
```
`runAttempt()` 中解构 `onToolProcessEvent`
调用 `tool.execute` 前增加:
```ts
const operationId = buildOperationId(tc.id);
const emitProcessEvent = (event: RuntimeProcessEventInput) => {
const normalized = normalizeRuntimeProcessEvent({
operationId,
targetType: event.targetType || 'tool',
source: event.source || 'tool',
...event,
});
onToolProcessEvent?.({
toolCallId: tc.id,
operationId,
toolName: tc.name,
args: effectiveArgs,
event: normalized,
});
};
```
Replace:
```ts
const result = await tool.execute(tc.id, effectiveArgs);
```
with:
```ts
const result = await tool.execute(tc.id, effectiveArgs, emitProcessEvent);
```
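The order of `...event` and the fallback fields matters in `emitProcessEvent`. Object spread copies own properties even when their value is `undefined`, so whichever comes last wins. A minimal illustration with a hypothetical `EventInput` shape:

```ts
// Hypothetical input shape with optional fields, like RuntimeProcessEventInput.
interface EventInput {
  targetType?: 'tool' | 'skill';
  message: string;
}

// A tool that forwards a field it never set can still carry an explicit `undefined`.
const fromTool: EventInput = { targetType: undefined, message: 'Preparing work' };

// Fallback first, spread last: the explicit `undefined` overwrites the fallback.
const fallbackFirst = { targetType: 'tool', ...fromTool };

// Spread first, fallback last: the fallback survives.
const spreadFirst = { ...fromTool, targetType: fromTool.targetType || 'tool' };
```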
- [ ] **Step 6: Forward the callback in agent**
Modify `packages/backend/src/modules/netaclaw/runtime/agent.ts`.
Add the import:
```ts
import type { RuntimeProcessEvent } from './process_events.js';
```
Add to `AgentRunParams`:
```ts
onToolProcessEvent?: (input: {
toolCallId: string;
operationId: string;
toolName: string;
args: Record<string, unknown>;
event: RuntimeProcessEvent;
}) => void;
```
Add to the `runAttempt()` arguments:
```ts
onToolProcessEvent: params.onToolProcessEvent,
```
- [ ] **Step 7: Run the test**
```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts
```
Expected: PASS.

---
## Task 2: Parse process events from the skill subprocess
**Files:**
- Modify: `packages/backend/src/modules/netaclaw/service/skill_executor.ts`
- Test: `packages/backend/test/skill_process_events.test.ts`
- [ ] **Step 1: Write the failing test first**
Create `packages/backend/test/skill_process_events.test.ts`:
```ts
import { normalizeSkillProcessLine } from '../src/modules/netaclaw/service/skill_executor';
describe('skill process event parsing', () => {
it('parses only JSON lines explicitly marked type=process_event', () => {
const event = normalizeSkillProcessLine(
'vehicle-damage-inspection',
JSON.stringify({
type: 'process_event',
time: '2026-05-06T04:07:17.187Z',
stage: 'frames',
message: 'frames extracted',
taskId: 'inspection-1',
frameCount: 361,
duration: 120.38,
resolution: '960x544',
}),
7,
);
expect(event).toEqual({
source: 'skill',
targetType: 'skill',
taskId: 'inspection-1',
stage: 'frames',
status: 'running',
level: 'info',
message: 'frames extracted',
timestamp: '2026-05-06T04:07:17.187Z',
sequence: 7,
payload: {
frameCount: 361,
duration: 120.38,
resolution: '960x544',
},
});
});
it('derives current, total, and percent from batch/totalBatches', () => {
const event = normalizeSkillProcessLine(
'vehicle-damage-inspection',
JSON.stringify({
type: 'process_event',
time: '2026-05-06T04:08:11.443Z',
stage: 'detect',
message: 'batch completed',
batch: 3,
totalBatches: 8,
damageCount: 3,
}),
8,
);
expect(event).toEqual(expect.objectContaining({
source: 'skill',
targetType: 'skill',
stage: 'detect',
message: 'batch completed',
current: 3,
total: 8,
percent: 38,
payload: expect.objectContaining({ damageCount: 3 }),
}));
});
it('ignores plain stderr', () => {
expect(normalizeSkillProcessLine('demo', 'plain stderr', 1)).toBeNull();
});
it('ignores ordinary JSON without the process_event marker', () => {
expect(normalizeSkillProcessLine('demo', '{"stage":"frames","message":"too broad"}', 1)).toBeNull();
});
});
```
- [ ] **Step 2: Run the test and confirm it fails**
```bash
pnpm --filter @neta/backend test -- --runInBand test/skill_process_events.test.ts
```
Expected: FAIL, because `normalizeSkillProcessLine` does not exist yet.
- [ ] **Step 3: Implement the parser**
In `packages/backend/src/modules/netaclaw/service/skill_executor.ts`, add the import:
```ts
import type { RuntimeProcessEventInput } from '../runtime/process_events.js';
```
Extend `SkillExecuteParams`:
```ts
toolCallId?: string;
onProcessEvent?: (event: RuntimeProcessEventInput) => void;
```
`SkillExecutorService` 之前增加:
```ts
const RESERVED_PROCESS_KEYS = new Set([
'type',
'time',
'timestamp',
'stage',
'message',
'taskId',
'level',
'status',
'current',
'total',
'percent',
'batch',
'totalBatches',
]);
export function normalizeSkillProcessLine(
_skillName: string,
line: string,
sequence: number,
): RuntimeProcessEventInput | null {
let parsed: Record<string, unknown>;
try {
parsed = JSON.parse(line);
} catch {
return null;
}
if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) return null;
if (parsed.type !== 'process_event') return null;
if (typeof parsed.message !== 'string' && typeof parsed.stage !== 'string') return null;
const current = typeof parsed.current === 'number'
? parsed.current
: typeof parsed.batch === 'number'
? parsed.batch
: undefined;
const total = typeof parsed.total === 'number'
? parsed.total
: typeof parsed.totalBatches === 'number'
? parsed.totalBatches
: undefined;
const percent = typeof parsed.percent === 'number'
? parsed.percent
: total && current !== undefined
? Math.max(0, Math.min(100, Math.round((current / total) * 100)))
: undefined;
const payload: Record<string, unknown> = {};
for (const [key, value] of Object.entries(parsed)) {
if (!RESERVED_PROCESS_KEYS.has(key)) payload[key] = value;
}
return {
source: 'skill',
targetType: 'skill',
taskId: typeof parsed.taskId === 'string' ? parsed.taskId : undefined,
stage: typeof parsed.stage === 'string' ? parsed.stage : undefined,
status: isProcessStatus(parsed.status) ? parsed.status : 'running',
level: isProcessLevel(parsed.level) ? parsed.level : 'info',
message: typeof parsed.message === 'string' ? parsed.message : String(parsed.stage || 'running'),
current,
total,
percent,
timestamp: typeof parsed.time === 'string'
? parsed.time
: typeof parsed.timestamp === 'string'
? parsed.timestamp
: new Date().toISOString(),
sequence,
payload: Object.keys(payload).length > 0 ? payload : undefined,
};
}
function isProcessStatus(value: unknown): value is RuntimeProcessEventInput['status'] {
return value === 'queued'
|| value === 'started'
|| value === 'running'
|| value === 'completed'
|| value === 'failed'
|| value === 'cancelled'
|| value === 'skipped';
}
function isProcessLevel(value: unknown): value is RuntimeProcessEventInput['level'] {
return value === 'debug' || value === 'info' || value === 'warning' || value === 'error';
}
```
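On the skill side, the contract is one JSON object per stderr line with `type: "process_event"`; stdout stays reserved for the final JSON result. Below is a sketch of an emitter for a skill written in TypeScript/Node. That language is assumed only for illustration; a Python or shell skill would print the same shape. `formatProcessEventLine` and its `extra` option are hypothetical. Fields outside the reserved keys end up in `payload` on the backend.

```ts
// Hypothetical input for a skill-side emitter.
interface SkillProcessInput {
  stage: string;
  message: string;
  taskId?: string;
  batch?: number;
  totalBatches?: number;
  // Extra top-level fields; the backend parser moves non-reserved keys into `payload`.
  extra?: Record<string, string | number | boolean>;
}

// Formats one process event as exactly one JSON line. JSON.stringify escapes any
// newline inside `message`, so the output contains a single trailing '\n'.
function formatProcessEventLine(input: SkillProcessInput, now: Date = new Date()): string {
  const { extra, ...known } = input;
  // `extra` goes first so it can never override `type`, `time`, or the known fields.
  const line = { ...extra, type: 'process_event', time: now.toISOString(), ...known };
  return `${JSON.stringify(line)}\n`;
}

// Inside a Node skill, write to stderr so stdout stays reserved for the final result:
// process.stderr.write(formatProcessEventLine({ stage: 'detect', message: 'batch completed', batch: 3, totalBatches: 8 }));
```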
- [ ] **Step 4: Extract process events from stderr**
In `execute()`, add:
```ts
let processSequence = 0;
```
In the stderr line loop, after the logger call, add:
```ts
const processEvent = normalizeSkillProcessLine(skillName, line, ++processSequence);
if (processEvent) {
params.onProcessEvent?.(processEvent);
}
```
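Process events can only be parsed from complete lines. If the existing loop reads raw `data` chunks, one JSON line may be split across chunks, so a small splitter that holds back the trailing partial line helps. Calling `setEncoding('utf8')` on the stderr stream also keeps multi-byte characters intact across chunk boundaries. The sketch uses a hypothetical `LineSplitter`; each returned line would then go through the logger and `normalizeSkillProcessLine`.

```ts
// Hypothetical helper: converts arbitrary text chunks into complete lines and holds
// back a trailing partial line until the next chunk (or flush) completes it.
class LineSplitter {
  private rest = '';

  push(chunk: string): string[] {
    const parts = (this.rest + chunk).split(/\r?\n/);
    // The last element is '' when the chunk ended with a newline, otherwise a partial line.
    this.rest = parts.pop() ?? '';
    return parts;
  }

  // Call when the stream ends so a final line without a trailing newline is not lost.
  flush(): string[] {
    const last = this.rest;
    this.rest = '';
    return last ? [last] : [];
  }
}
```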
- [ ] **Step 5: Emit terminal failure events**
On timeout, emit:
```ts
params.onProcessEvent?.({
source: 'runtime',
targetType: 'skill',
taskId: typeof input.taskId === 'string' ? input.taskId : undefined,
stage: 'skill-timeout',
status: 'failed',
level: 'error',
message: `Execution timed out after ${timeout}ms`,
});
```
Non-zero exit, spawn error, and stdout JSON parse failure emit the same kind of event, with `status: 'failed'`, `source: 'runtime'`, and `targetType: 'skill'`.
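The four failure paths can share one builder, and a guard can keep them from double-reporting. For example, a timeout kill is typically followed by the child's exit event. In the sketch below, `buildSkillFailureEvent` and `createFailureReporter` are hypothetical names, and every stage name other than `skill-timeout` is an assumption.

```ts
// Hypothetical failure kinds for the four failure paths.
type SkillFailureKind = 'timeout' | 'exit' | 'spawn-error' | 'invalid-output';

interface SkillFailureEvent {
  source: 'runtime';
  targetType: 'skill';
  taskId?: string;
  stage: string;
  status: 'failed';
  level: 'error';
  message: string;
}

// Builds the terminal failure event with the same shape for every failure path.
function buildSkillFailureEvent(kind: SkillFailureKind, message: string, taskId?: string): SkillFailureEvent {
  return {
    source: 'runtime',
    targetType: 'skill',
    taskId,
    // `skill-timeout` matches the plan; the other stage names are assumptions.
    stage: `skill-${kind}`,
    status: 'failed',
    level: 'error',
    message,
  };
}

// Reports at most one failure per execution, so the exit that follows a timeout
// kill does not produce a second terminal event.
function createFailureReporter(emit: (event: SkillFailureEvent) => void) {
  let reported = false;
  return (kind: SkillFailureKind, message: string, taskId?: string): void => {
    if (reported) return;
    reported = true;
    emit(buildSkillFailureEvent(kind, message, taskId));
  };
}
```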
- [ ] **Step 6: Run the tests**
```bash
pnpm --filter @neta/backend test -- --runInBand test/skill_process_events.test.ts
```
Expected: PASS.

---
## Task 3: Bridge skill process events in execute_skill
**Files:**
- Modify: `packages/backend/src/modules/netaclaw/tools/builtin/execute_skill.ts`
- Test: extend `packages/backend/test/tool_process_events.test.ts`
- [ ] **Step 1: Add a failing test**
Append to `packages/backend/test/tool_process_events.test.ts`:
```ts
import { createExecuteSkillTool } from '../src/modules/netaclaw/tools/builtin/execute_skill';
describe('execute_skill process bridge', () => {
it('forwards skill process events to the tool update callback', async () => {
const executor = {
execute: jest.fn(async (params: any) => {
params.onProcessEvent({
source: 'skill',
targetType: 'skill',
taskId: 'task-1',
stage: 'frames',
status: 'running',
message: 'extracting frames',
current: 1,
total: 4,
});
return { success: true, output: { ok: true }, duration: 10 };
}),
} as any;
const updates: any[] = [];
const tool = createExecuteSkillTool(executor);
await tool.execute(
'call-skill-1',
{ name: 'vehicle-damage-inspection', input: { taskId: 'task-1' } },
event => updates.push(event),
);
expect(executor.execute).toHaveBeenCalledWith(expect.objectContaining({
skillName: 'vehicle-damage-inspection',
input: { taskId: 'task-1' },
toolCallId: 'call-skill-1',
}));
expect(updates).toEqual([
expect.objectContaining({
source: 'skill',
targetType: 'skill',
taskId: 'task-1',
stage: 'frames',
message: 'extracting frames',
current: 1,
total: 4,
percent: 25,
}),
]);
});
});
```
- [ ] **Step 2: Run the test and confirm it fails**
```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts
```
- [ ] **Step 3: Modify execute_skill**
In `packages/backend/src/modules/netaclaw/tools/builtin/execute_skill.ts`, change `execute` to accept a third parameter:
```ts
async execute(id, params, onUpdate) {
```
Change the executor call to:
```ts
const result = await executor.execute({
skillName: params.name,
input: params.input as Record<string, unknown>,
toolCallId: id,
onProcessEvent: event => {
const total = typeof event.total === 'number' ? event.total : undefined;
const current = typeof event.current === 'number' ? event.current : undefined;
onUpdate?.({
...event,
// Keep runtime-sourced failure events (timeout, exit, spawn error) as `runtime`.
source: event.source || 'skill',
targetType: event.targetType || 'skill',
percent: typeof event.percent === 'number'
? event.percent
: total && current !== undefined
? Math.max(0, Math.min(100, Math.round((current / total) * 100)))
: undefined,
});
},
});
```
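The percent rule now appears in three places: `normalizeRuntimeProcessEvent`, `normalizeSkillProcessLine`, and this bridge. A shared helper would keep them consistent. Below is a sketch with a hypothetical `derivePercent`. It also treats a non-positive `total` as unknown rather than dividing by it.

```ts
// Hypothetical shared helper for the percent rule.
function derivePercent(current?: number, total?: number, percent?: number): number | undefined {
  const clamp = (value: number): number => Math.max(0, Math.min(100, value));
  // An explicit percent wins.
  if (typeof percent === 'number') return clamp(percent);
  // Otherwise derive it, but only from a positive total.
  if (typeof total === 'number' && total > 0 && typeof current === 'number') {
    return clamp(Math.round((current / total) * 100));
  }
  return undefined;
}
```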
- [ ] **Step 4: Run the tests**
```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts test/skill_process_events.test.ts
```
Expected: PASS.

---
## Task 4: Gateway protocol, throttling, metadata summary, and JSONL logging
**Files:**
- Modify: `packages/backend/src/modules/netaclaw/runtime/process_events.ts`
- Modify: `packages/backend/src/modules/netaclaw/gateway/protocol.ts`
- Modify: `packages/backend/src/modules/netaclaw/gateway/server.ts`
- Test: `packages/backend/test/process_event_buffer.test.ts`
- [ ] **Step 1: Write the lightweight buffer test**
Create `packages/backend/test/process_event_buffer.test.ts`:
```ts
import { ProcessEventBuffer } from '../src/modules/netaclaw/runtime/process_events';
import type { RuntimeProcessEvent } from '../src/modules/netaclaw/runtime/process_events';
function event(overrides: Partial<RuntimeProcessEvent>): RuntimeProcessEvent {
return {
version: 1,
operationId: 'tool-call-1',
targetType: 'skill',
source: 'skill',
status: 'running',
level: 'info',
message: 'batch completed',
timestamp: '2026-05-06T04:00:00.000Z',
...overrides,
};
}
describe('ProcessEventBuffer', () => {
it('coalesces repeated process events and flushes the latest one', () => {
const emitted: RuntimeProcessEvent[] = [];
const buffer = new ProcessEventBuffer(e => emitted.push(e), { throttleMs: 500 });
buffer.push(event({ operationId: 'op-1', stage: 'detect', sequence: 1 }));
buffer.push(event({ operationId: 'op-1', stage: 'detect', sequence: 2 }));
buffer.flushAll();
expect(emitted).toHaveLength(1);
expect(emitted[0]).toEqual(expect.objectContaining({
operationId: 'op-1',
stage: 'detect',
sequence: 2,
payload: expect.objectContaining({ repeatCount: 2 }),
}));
});
it('sends terminal and error events immediately', () => {
const emitted: RuntimeProcessEvent[] = [];
const buffer = new ProcessEventBuffer(e => emitted.push(e), { throttleMs: 500 });
buffer.push(event({ operationId: 'op-1', status: 'failed', level: 'error', message: 'failed' }));
expect(emitted).toHaveLength(1);
expect(emitted[0]).toEqual(expect.objectContaining({ status: 'failed', message: 'failed' }));
});
});
```
- [ ] **Step 2: Run the test and confirm it fails**
```bash
pnpm --filter @neta/backend test -- --runInBand test/process_event_buffer.test.ts
```
- [ ] **Step 3: Add the lightweight buffer to process_events.ts**
Append to `packages/backend/src/modules/netaclaw/runtime/process_events.ts`:
```ts
export interface ProcessEventBufferOptions {
throttleMs?: number;
}
interface BufferedEvent {
event: RuntimeProcessEvent;
repeatCount: number;
timer?: NodeJS.Timeout;
}
export class ProcessEventBuffer {
private readonly throttleMs: number;
private readonly pending = new Map<string, BufferedEvent>();
constructor(
private readonly emit: (event: RuntimeProcessEvent) => void,
options: ProcessEventBufferOptions = {},
) {
this.throttleMs = options.throttleMs ?? 400;
}
push(event: RuntimeProcessEvent) {
if (isTerminalProcessEvent(event)) {
this.flushOperation(event.operationId);
this.emit(event);
return;
}
const key = `${event.operationId}:${event.stage || ''}:${event.message}`;
const existing = this.pending.get(key);
if (existing) {
existing.event = event;
existing.repeatCount += 1;
return;
}
const buffered: BufferedEvent = {
event,
repeatCount: 1,
timer: setTimeout(() => this.flushKey(key), this.throttleMs),
};
this.pending.set(key, buffered);
}
flushAll() {
for (const key of [...this.pending.keys()]) {
this.flushKey(key);
}
}
private flushOperation(operationId: string) {
for (const [key, buffered] of [...this.pending.entries()]) {
if (buffered.event.operationId === operationId) {
this.flushKey(key);
}
}
}
private flushKey(key: string) {
const buffered = this.pending.get(key);
if (!buffered) return;
if (buffered.timer) clearTimeout(buffered.timer);
this.pending.delete(key);
this.emit({
...buffered.event,
payload: buffered.repeatCount > 1
? { ...(buffered.event.payload || {}), repeatCount: buffered.repeatCount }
: buffered.event.payload,
});
}
}
function isTerminalProcessEvent(event: RuntimeProcessEvent): boolean {
return event.status === 'completed'
|| event.status === 'failed'
|| event.status === 'cancelled'
|| event.status === 'skipped'
|| event.level === 'error';
}
```
- [ ] **Step 4: Add the WebSocket protocol type**
In `packages/backend/src/modules/netaclaw/gateway/protocol.ts`, add:
```ts
export interface ServerToolProcessEvent {
type: 'tool_process_event';
sessionId: string;
data: {
version: 1;
operationId: string;
parentOperationId?: string;
targetType: 'tool' | 'skill' | 'subagent' | 'task' | 'runtime';
toolCallId: string;
assistantEntryId?: string;
toolName: string;
toolLabel?: string;
source: 'skill' | 'tool' | 'subagent' | 'runtime';
taskId?: string;
stage?: string;
status: 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped';
level?: 'debug' | 'info' | 'warning' | 'error';
message: string;
detail?: string;
current?: number;
total?: number;
percent?: number;
payload?: Record<string, unknown>;
timestamp: string;
sequence: number;
};
}
```
`ServerToolProcessEvent` 加入 `ServerEvent` union。
- [ ] **Step 5: Send live events from the gateway and save the metadata summary**
Import in `packages/backend/src/modules/netaclaw/gateway/server.ts`:
```ts
import {
ProcessEventBuffer,
RuntimeProcessEvent,
RuntimeProcessLogRef,
appendRuntimeProcessLog,
buildProcessLogRef,
sampleProcessEventsForMetadata,
} from '../runtime/process_events.js';
```
Extend each `toolExecutions` item with:
```ts
processEvents?: RuntimeProcessEvent[];
processLogRef?: RuntimeProcessLogRef;
```
In `buildToolExecutionMetadata()`, add to each item:
```ts
latestProcessEvent: te.processEvents?.at(-1),
processEvents: sampleProcessEventsForMetadata(te.processEvents || [], 50),
processLogRef: te.processLogRef,
```
Create the buffer inside the runner:
```ts
const processEventBuffer = new ProcessEventBuffer(event => {
this.send({
type: 'tool_process_event',
sessionId: sid,
data: event as any,
});
});
```
Add to the `runAgent` call:
```ts
onToolProcessEvent: ({ toolCallId, operationId, toolName, event }) => {
const projectionId = toolCallId || randomUUID();
const te = [...toolExecutions].reverse().find(t => t.toolCallId === projectionId)
|| [...toolExecutions].reverse().find(t => t.name === toolName && t.status === 'running');
// Derive the next sequence from the last retained event: the list is capped at 50,
// so its length stops growing, but the last sequence keeps increasing.
const sequence = (te?.processEvents?.at(-1)?.sequence ?? 0) + 1;
const processEvent: RuntimeProcessEvent = {
...event,
sequence,
};
if (te) {
te.processEvents = [...(te.processEvents || []), processEvent].slice(-50);
te.processLogRef = appendRuntimeProcessLog({
sessionId: sid,
operationId: processEvent.operationId || operationId,
toolCallId: projectionId,
event: processEvent,
});
}
// Attach the routing fields the frontend merges on (toolCallId, toolName);
// the buffer spreads the event when flushing, so these fields reach the payload.
processEventBuffer.push({
...processEvent,
toolCallId: projectionId,
toolName,
} as RuntimeProcessEvent & { toolCallId: string; toolName: string });
},
Note: if TypeScript rejects the extra fields, build a separate object for the outgoing payload instead of polluting the base `RuntimeProcessEvent` type.
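Such a separate outgoing object could look like the sketch below. It assumes a trimmed-down local stand-in for `RuntimeProcessEvent`. `ToolProcessRouting` and `toWirePayload` are hypothetical names; the routing fields mirror `ServerToolProcessEvent['data']`.

```ts
// Trimmed-down local stand-in for RuntimeProcessEvent.
interface RuntimeProcessEventLite {
  version: 1;
  operationId: string;
  status: string;
  message: string;
  timestamp: string;
  sequence?: number;
}

// Routing fields the frontend needs but the base event type should not carry.
interface ToolProcessRouting {
  toolCallId: string;
  toolName: string;
  toolLabel?: string;
  assistantEntryId?: string;
}

// Wire payload: base event + routing, with `sequence` required as in ServerToolProcessEvent.
type ToolProcessWirePayload = Omit<RuntimeProcessEventLite, 'sequence'> & ToolProcessRouting & { sequence: number };

function toWirePayload(
  event: RuntimeProcessEventLite,
  routing: ToolProcessRouting,
  fallbackSequence: number,
): ToolProcessWirePayload {
  // A new object: neither `event` nor the base type gains routing fields.
  return { ...event, ...routing, sequence: event.sequence ?? fallbackSequence };
}
```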
- [ ] **Step 6: Emit terminal events on failure**
In `failRunningTools()`, append a process event with `status: 'failed'` to every running tool and call `processEventBuffer.push(...)`.
Use this logical reference:
```ts
te.processLogRef = buildProcessLogRef({
sessionId: sid,
operationId: `tool-${te.toolCallId}`,
toolCallId: te.toolCallId,
});
```
Do not write the absolute local JSONL path into metadata.
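Phase one has no log-viewing API, but backend troubleshooting still needs a way to get from the logical `processLogRef` back to a file. Below is a minimal server-side sketch. It assumes roughly the directory layout used by `appendRuntimeProcessLog` (`<data dir>/netaclaw-process-events/<sessionId>/<operationId>.jsonl`). `resolveProcessLogPath` is hypothetical, and its containment check is an extra safeguard that is not part of the plan.

```ts
import * as path from 'path';

// Mirrors RuntimeProcessLogRef from process_events.ts.
interface RuntimeProcessLogRef {
  kind: 'runtime_process_log';
  sessionId: string;
  operationId: string;
  toolCallId?: string;
}

// Same character filter as the plan's safeFilePart.
function safeFilePart(value: string): string {
  return value.replace(/[^a-zA-Z0-9_.-]/g, '_').slice(0, 120);
}

// Maps a logical ref to a JSONL path under `baseDir`. Only the backend calls this;
// the frontend only ever sees the logical ref.
function resolveProcessLogPath(baseDir: string, ref: RuntimeProcessLogRef): string {
  const root = path.resolve(baseDir, 'netaclaw-process-events');
  const file = path.resolve(root, safeFilePart(ref.sessionId), `${safeFilePart(ref.operationId)}.jsonl`);
  // '..' passes the character filter, so refuse anything that escapes the log root.
  if (!file.startsWith(root + path.sep)) {
    throw new Error('process log ref resolves outside the log directory');
  }
  return file;
}
```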
- [ ] **Step 7: Flush pending events**
After `runAgent` completes normally, and before building the final metadata, call:
```ts
processEventBuffer.flushAll();
```
Also call it in the catch block, after `failRunningTools(...)`.
- [ ] **Step 8: Run tests and build**
```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts test/skill_process_events.test.ts test/process_event_buffer.test.ts
pnpm --filter @neta/backend build
```
Expected: PASS.

---
## Task 5: Frontend types and chat store projection
**Files:**
- Modify: `packages/frontend/src/modules/agent/types/index.d.ts`
- Modify: `packages/frontend/src/modules/agent/store/chat.ts`
- [ ] **Step 1: Add frontend types**
In `packages/frontend/src/modules/agent/types/index.d.ts`, add:
```ts
export interface RuntimeProcessLogRef {
kind: 'runtime_process_log';
sessionId: string;
operationId: string;
toolCallId?: string;
}
export interface RuntimeProcessEventProjection {
version: 1;
operationId: string;
parentOperationId?: string;
targetType: 'tool' | 'skill' | 'subagent' | 'task' | 'runtime';
source: 'skill' | 'tool' | 'subagent' | 'runtime';
taskId?: string;
stage?: string;
status: 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped';
level?: 'debug' | 'info' | 'warning' | 'error';
message: string;
detail?: string;
current?: number;
total?: number;
percent?: number;
payload?: Record<string, unknown>;
timestamp: string;
sequence: number;
}
```
Add to `StreamingToolProjection`:
```ts
latestProcessEvent?: RuntimeProcessEventProjection;
processEvents?: RuntimeProcessEventProjection[];
processLogRef?: RuntimeProcessLogRef;
```
Add `'tool_process_event'` to `WSServerEvent.type`.
- [ ] **Step 2: Restore process events from metadata**
In `toStreamingToolsFromEntry()` in `packages/frontend/src/modules/agent/store/chat.ts`, add to each projected tool:
```ts
latestProcessEvent: normalizeToolProcessEvent(item.latestProcessEvent),
processEvents: normalizeToolProcessEvents(item.processEvents),
processLogRef: normalizeProcessLogRef(item.processLogRef),
```
Add the helpers:
```ts
function normalizeToolProcessEvents(value: unknown): import('../types/index.d').RuntimeProcessEventProjection[] | undefined {
if (!Array.isArray(value)) return undefined;
const events = value
.map(normalizeToolProcessEvent)
.filter((item): item is import('../types/index.d').RuntimeProcessEventProjection => Boolean(item));
return events.length > 0 ? events.slice(-50) : undefined;
}
function normalizeToolProcessEvent(value: unknown): import('../types/index.d').RuntimeProcessEventProjection | undefined {
if (!value || typeof value !== 'object' || Array.isArray(value)) return undefined;
const record = value as Record<string, any>;
if (typeof record.message !== 'string') return undefined;
return {
version: 1,
operationId: typeof record.operationId === 'string' ? record.operationId : `legacy-${record.toolCallId || record.sequence || Date.now()}`,
parentOperationId: typeof record.parentOperationId === 'string' ? record.parentOperationId : undefined,
targetType: ['tool', 'skill', 'subagent', 'task', 'runtime'].includes(record.targetType) ? record.targetType : 'tool',
source: ['skill', 'tool', 'subagent', 'runtime'].includes(record.source) ? record.source : 'tool',
taskId: typeof record.taskId === 'string' ? record.taskId : undefined,
stage: typeof record.stage === 'string' ? record.stage : undefined,
status: ['queued', 'started', 'running', 'completed', 'failed', 'cancelled', 'skipped'].includes(record.status) ? record.status : 'running',
level: ['debug', 'info', 'warning', 'error'].includes(record.level) ? record.level : undefined,
message: record.message,
detail: typeof record.detail === 'string' ? record.detail : undefined,
current: typeof record.current === 'number' ? record.current : undefined,
total: typeof record.total === 'number' ? record.total : undefined,
percent: typeof record.percent === 'number' ? record.percent : undefined,
payload: record.payload && typeof record.payload === 'object' && !Array.isArray(record.payload) ? record.payload : undefined,
timestamp: typeof record.timestamp === 'string' ? record.timestamp : new Date().toISOString(),
sequence: typeof record.sequence === 'number' ? record.sequence : 0,
};
}
function normalizeProcessLogRef(value: unknown): import('../types/index.d').RuntimeProcessLogRef | undefined {
if (!value || typeof value !== 'object' || Array.isArray(value)) return undefined;
const record = value as Record<string, any>;
if (record.kind !== 'runtime_process_log') return undefined;
if (typeof record.sessionId !== 'string' || typeof record.operationId !== 'string') return undefined;
return {
kind: 'runtime_process_log',
sessionId: record.sessionId,
operationId: record.operationId,
toolCallId: typeof record.toolCallId === 'string' ? record.toolCallId : undefined,
};
}
```
- [ ] **Step 3: Handle live tool_process_event**
In `handleWSEvent()`, add:
```ts
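case 'tool_process_event':
  if (event.data?.toolCallId) {
    const processEvent = normalizeToolProcessEvent(event.data);
    if (processEvent) {
      // Correlate by toolCallId (never by toolName) so concurrent calls of the same skill stay separate.
      const existing = streamingTools.value.find(item => item.toolCallId === event.data.toolCallId);
      // Cap in-memory history at the newest 50 events; the full history lives in the backend JSONL log.
      const processEvents = [...(existing?.processEvents || []), processEvent].slice(-50);
      upsertStreamingTool({
        toolCallId: event.data.toolCallId,
        // Fall back to what the card already has if the process event omits these display fields.
        assistantEntryId: event.data.assistantEntryId ?? existing?.assistantEntryId,
        toolName: event.data.toolName ?? existing?.toolName,
        label: event.data.toolLabel ?? existing?.label,
        // Keep the current card status; this handler does not derive status from processEvent.status.
        status: existing?.status || 'running',
        // Carry all other fields over unchanged so a process event never wipes existing tool state.
        args: existing?.args,
        result: existing?.result,
        rawResult: existing?.rawResult,
        runtimeRoute: existing?.runtimeRoute,
        workerRoutingHint: existing?.workerRoutingHint,
        blockedReason: existing?.blockedReason,
        policySources: existing?.policySources,
        processEvents,
        latestProcessEvent: processEvent,
        processLogRef: existing?.processLogRef,
      });
    }
  }
  break;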
```
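
The handler above appends unconditionally, so an event delivered twice would appear twice in the timeline and produce a duplicate Vue `:key`. A minimal sketch of a dedupe-and-cap helper, assuming an event is identified by the same `operationId` + `sequence` + `timestamp` composite the timeline uses as its key; `appendProcessEvent` and `ProcessEventLike` are hypothetical names, not existing code:

```ts
// Hypothetical minimal event shape; the real RuntimeProcessEventProjection has more fields.
interface ProcessEventLike {
  operationId: string;
  sequence: number;
  timestamp: string;
  message: string;
}

// Matches the .slice(-50) cap used in handleWSEvent().
const MAX_PROCESS_EVENTS = 50;

// Same composite as the timeline's :key. sequence alone is not enough because
// normalization defaults a missing sequence to 0.
function processEventKey(event: ProcessEventLike): string {
  return `${event.operationId}-${event.sequence}-${event.timestamp}`;
}

// Returns a new array: ignores an exact replay of an event already in the list,
// otherwise appends it and keeps only the newest `max` entries.
function appendProcessEvent<T extends ProcessEventLike>(
  existing: readonly T[] | undefined,
  incoming: T,
  max: number = MAX_PROCESS_EVENTS,
): T[] {
  const list = existing ?? [];
  const key = processEventKey(incoming);
  if (list.some((event) => processEventKey(event) === key)) {
    return [...list];
  }
  return [...list, incoming].slice(-max);
}
```

In the handler, `const processEvents = appendProcessEvent(existing?.processEvents, processEvent);` would replace the inline spread and `.slice(-50)`.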
- [ ] **Step 4: Run the frontend type check**
```bash
pnpm --filter @neta/frontend type-check
```
Expected: PASS. If unrelated pre-existing errors appear, record them and run `pnpm --filter @neta/frontend build` at the end.
---
## Task 6: Frontend process timeline UI
**Files:**
- Create: `packages/frontend/src/modules/agent/components/tool-process-timeline.vue`
- Modify: `packages/frontend/src/modules/agent/components/skill-card.vue`
- Modify: the parent component or renderer that actually renders `streamingTools`
- [ ] **Step 1: Add a standalone timeline component**
Create `packages/frontend/src/modules/agent/components/tool-process-timeline.vue`:
```vue
<template>
<div v-if="events.length" class="tool-process">
<el-button text size="small" @click="showProcess = !showProcess">
{{ showProcess ? '收起过程' : `查看过程 ${events.length}` }}
</el-button>
<el-collapse-transition>
<ol v-if="showProcess" class="tool-process__timeline">
<li
v-for="event in events"
:key="`${event.operationId}-${event.sequence}-${event.timestamp}`"
class="tool-process__item"
:class="[`is-${event.level || 'info'}`]"
>
<span class="tool-process__dot" />
<span class="tool-process__main">
<span class="tool-process__message">{{ event.message }}</span>
<span v-if="event.current !== undefined && event.total !== undefined" class="tool-process__count">
{{ event.current }}/{{ event.total }}
</span>
</span>
</li>
</ol>
</el-collapse-transition>
</div>
</template>
<script lang="ts" setup>
import { computed, ref } from 'vue';
import type { RuntimeProcessEventProjection } from '../types/index.d';
const props = defineProps<{
events?: RuntimeProcessEventProjection[];
}>();
const showProcess = ref(false);
const events = computed(() => props.events || []);
</script>
<style lang="scss" scoped>
.tool-process {
margin-top: 6px;
}
.tool-process__timeline {
list-style: none;
margin: 8px 0 0;
padding: 0;
display: grid;
gap: 6px;
}
.tool-process__item {
display: grid;
grid-template-columns: 10px 1fr;
gap: 8px;
align-items: start;
color: var(--el-text-color-secondary);
font-size: 12px;
line-height: 1.45;
}
.tool-process__dot {
width: 6px;
height: 6px;
margin-top: 6px;
border-radius: 50%;
background: var(--el-color-primary);
}
.tool-process__item.is-warning .tool-process__dot {
background: var(--el-color-warning);
}
.tool-process__item.is-error .tool-process__dot {
background: var(--el-color-danger);
}
.tool-process__main {
min-width: 0;
display: flex;
gap: 6px;
align-items: baseline;
}
.tool-process__message {
min-width: 0;
overflow-wrap: anywhere;
}
.tool-process__count {
flex: none;
color: var(--el-text-color-placeholder);
}
</style>
```
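
The timeline colors the dot from `event.level || 'info'`, so a terminal `failed` event that arrives without an explicit `level` renders as a neutral info dot. If that matters, the class could come from a small helper instead. The sketch below reuses the status and level unions from the normalizer; the fallback mapping (`failed` to `error`, `cancelled` to `warning`) is an illustrative choice rather than part of this plan, and `resolveTimelineLevel` is a hypothetical name:

```ts
type ProcessLevel = 'debug' | 'info' | 'warning' | 'error';
type ProcessStatus = 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped';

interface LevelSource {
  level?: ProcessLevel;
  status?: ProcessStatus;
}

// An explicit level always wins; otherwise derive one from the status so
// failures remain visible in the timeline.
function resolveTimelineLevel(event: LevelSource): ProcessLevel {
  if (event.level) return event.level;
  switch (event.status) {
    case 'failed':
      return 'error';
    case 'cancelled':
      return 'warning';
    default:
      return 'info';
  }
}
```

In the template this would become ``:class="[`is-${resolveTimelineLevel(event)}`]"``, with the function declared in `<script setup>` so the template can call it.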
- [ ] **Step 2: Modify skill-card**
In `skill-card.vue`, add:
```ts
import type { RuntimeProcessEventProjection } from '../types/index.d';
import ToolProcessTimeline from './tool-process-timeline.vue';
```
Add to props:
```ts
latestProcessEvent?: RuntimeProcessEventProjection;
processEvents?: RuntimeProcessEventProjection[];
```
Change the running area to:
```vue
<transition name="fade">
<div v-if="status === 'running' || latestProcessEvent" class="skill-card__progress">
<el-progress
v-if="typeof latestProcessEvent?.percent === 'number' || typeof percent === 'number'"
:percentage="latestProcessEvent?.percent ?? percent ?? 0"
:stroke-width="4"
:show-text="false"
/>
<span v-if="latestProcessEvent?.message || step || detail" class="skill-card__step">
{{ latestProcessEvent?.message || step || detail }}
</span>
<span v-if="latestProcessEvent?.detail" class="skill-card__detail">
{{ latestProcessEvent.detail }}
</span>
</div>
</transition>
<ToolProcessTimeline :events="processEvents" />
```
Add the style:
```scss
.skill-card__detail {
display: block;
margin-top: 2px;
font-size: 12px;
color: var(--el-text-color-placeholder);
line-height: 1.45;
}
```
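
The running area gives the live process event priority over the card's older `percent` / `step` / `detail` props. Pulling that precedence into a pure function makes it easy to unit-test. This is a hypothetical sketch (`resolveCardProgress` and its input types are illustrative), and the clamp to 0 through 100 is an added assumption that keeps the value inside the range a percentage bar can display:

```ts
interface LatestEventLike {
  percent?: number;
  message?: string;
  detail?: string;
}

interface LegacyProgressProps {
  percent?: number;
  step?: string;
  detail?: string;
}

interface CardProgressView {
  showBar: boolean;
  percentage: number;
  label?: string;
  detail?: string;
}

function clampPercent(value: number): number {
  return Math.min(100, Math.max(0, value));
}

// Mirrors the template: the bar is shown only when some source has a numeric
// percent; the label falls back from event message to step to detail.
function resolveCardProgress(latest: LatestEventLike | undefined, legacy: LegacyProgressProps): CardProgressView {
  const hasEventPercent = typeof latest?.percent === 'number';
  const hasLegacyPercent = typeof legacy.percent === 'number';
  const raw = latest?.percent ?? legacy.percent ?? 0;
  return {
    showBar: hasEventPercent || hasLegacyPercent,
    percentage: clampPercent(raw),
    label: latest?.message || legacy.step || legacy.detail || undefined,
    detail: latest?.detail,
  };
}
```

The component could expose this through a `computed` and bind `showBar`, `percentage`, `label`, and `detail` in the template, keeping the fallback rules in one testable place.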
- [ ] **Step 3: Confirm the parent passes the props**
Search:
```bash
rg -n "<skill-card|SkillCard|streamingTools" packages/frontend/src/modules/agent
```
Make sure the place that finally renders the tool card passes:
```vue
:latest-process-event="tool.latestProcessEvent"
:process-events="tool.processEvents"
```
If rendering goes through the renderer registry, make sure the renderer preserves:
```ts
latestProcessEvent: tool.latestProcessEvent,
processEvents: tool.processEvents,
processLogRef: tool.processLogRef,
```
- [ ] **Step 4: Run the build**
```bash
pnpm --filter @neta/frontend build
```
Expected: PASS.
---
## Task 7: End-to-end verification
**Files:**
- No new source files are required.
- Use the existing `vehicle-damage-inspection` skill.
- [ ] **Step 1: Run the backend tests**
```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts test/skill_process_events.test.ts test/process_event_buffer.test.ts
```
Expected: PASS.
- [ ] **Step 2: Run the backend build**
```bash
pnpm --filter @neta/backend build
```
Expected: PASS.
- [ ] **Step 3: Run the frontend build**
```bash
pnpm --filter @neta/frontend build
```
Expected: PASS.
- [ ] **Step 4: Manually verify the pre-existing damage detection process**
Start the project:
```bash
pnpm dev
```
On the Agent chat page, enter the following prompt (roughly "please check for pre-existing damage"):
```text
请检查是否有旧伤
```
Expected:
- The `execute_skill` tool card appears.
- While pre-existing damage detection runs, the card keeps showing the current stage, for example:
- `creating workspace`
- `extracting frames`
- `frames extracted`
- `batch started`
- `batch completed`
- `grounding damage candidates`
- `selecting best frames`
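- Expanding the process view shows the sampled process timeline.
- The assistant metadata contains `latestProcessEvent`, `processEvents`, and a logical `processLogRef`.
- `processLogRef` contains no local absolute path.
- The complete JSONL process log exists under the backend data directory.
- After a page refresh, completed sessions still show the process summary saved in metadata.
- The model's final reply and the tool result are unaffected.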
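
The "no local absolute path" expectation can also be checked mechanically against the `processLogRef` saved in assistant metadata. A heuristic sketch: it assumes the ref only ever carries the four keys `normalizeProcessLogRef` returns (`kind`, `sessionId`, `operationId`, `toolCallId`) and flags any string value that starts like a POSIX, Windows drive, UNC, or `file:` path. `assertLogicalProcessLogRef` is a hypothetical helper, not existing code:

```ts
// The only keys normalizeProcessLogRef() keeps on the frontend.
const ALLOWED_REF_KEYS = new Set(['kind', 'sessionId', 'operationId', 'toolCallId']);

// Heuristic prefix check; it does not try to detect paths embedded mid-string.
function looksLikeAbsolutePath(value: string): boolean {
  return (
    value.startsWith('/') || // POSIX, e.g. /var/lib/app/logs/...
    /^[A-Za-z]:[\\/]/.test(value) || // Windows drive, e.g. C:\data\...
    value.startsWith('\\\\') || // UNC share, e.g. \\server\share
    value.startsWith('file:') // file:// URLs
  );
}

// Throws with a descriptive message if the ref is not a purely logical reference.
function assertLogicalProcessLogRef(ref: Record<string, unknown>): void {
  if (ref.kind !== 'runtime_process_log') {
    throw new Error(`unexpected processLogRef.kind: ${String(ref.kind)}`);
  }
  for (const [key, value] of Object.entries(ref)) {
    if (!ALLOWED_REF_KEYS.has(key)) {
      throw new Error(`unexpected processLogRef key: ${key}`);
    }
    if (typeof value === 'string' && looksLikeAbsolutePath(value)) {
      throw new Error(`processLogRef.${key} looks like a local path: ${value}`);
    }
  }
}
```

Rejecting unknown keys is deliberately stricter than the path check: a field such as `filePath` would leak storage layout even if it held a relative path.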
- [ ] **Step 5: Check the model-context boundary**
Inspect `buildLLMMessages` and the session-tree runtime context construction logic, and confirm that:
```text
latestProcessEvent
processEvents
processLogRef
```
are never injected into the LLM history.
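
Reading the code is the primary check here. As a cheap regression guard, a test could also scan whatever `buildLLMMessages` returns for the three keys. A minimal sketch, assuming the messages are plain JSON-like objects; `findLeakedProcessKeys` is hypothetical, and the substring check on strings is deliberately strict: it also catches metadata serialized into tool content, at the cost of false positives if text legitimately mentions these names.

```ts
const PROCESS_KEYS = ['latestProcessEvent', 'processEvents', 'processLogRef'];

// Walks arrays, objects, and strings and returns a path for every place a
// process-event field appears, either as an object key or inside text.
function findLeakedProcessKeys(value: unknown, path = '$'): string[] {
  const leaks: string[] = [];
  if (typeof value === 'string') {
    for (const key of PROCESS_KEYS) {
      if (value.includes(key)) leaks.push(`${path} (text mentions ${key})`);
    }
  } else if (Array.isArray(value)) {
    value.forEach((item, index) => {
      leaks.push(...findLeakedProcessKeys(item, `${path}[${index}]`));
    });
  } else if (value !== null && typeof value === 'object') {
    for (const [key, child] of Object.entries(value)) {
      const childPath = `${path}.${key}`;
      if (PROCESS_KEYS.includes(key)) leaks.push(childPath);
      leaks.push(...findLeakedProcessKeys(child, childPath));
    }
  }
  return leaks;
}
```

A test would build messages from a session whose assistant metadata does contain process events, then expect `findLeakedProcessKeys(messages)` to be empty.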
- [ ] **Step 6: Verify concurrent correlation**
If a single turn can trigger multiple tools or multiple skill calls, verify that:
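- Process events are merged into the correct tool card by `toolCallId`.
- Calls to the same skill do not get their events crossed.
- Every event carries `version=1` and a stable `operationId`, `targetType`, and `timestamp`.
- On failure, the card receives a `failed` process event and leaves the running state.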
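
When a turn does produce several concurrent calls, the captured `tool_process_event` payloads (for example, copied from the browser's WebSocket frames) can be checked in bulk instead of by eye. A hypothetical sketch: it groups events by `toolCallId`, reports missing required fields, and flags an `operationId` that shows up under two different `toolCallId`s, which is what crossed events between same-named skill calls would look like. The `WireProcessEvent` shape is an assumption based on the fields listed above, as is the rule that an `operationId` is never shared by two tool calls.

```ts
// Loosely typed on purpose: the input is raw, unvalidated wire data.
interface WireProcessEvent {
  version?: unknown;
  toolCallId?: unknown;
  operationId?: unknown;
  targetType?: unknown;
  timestamp?: unknown;
}

interface CorrelationReport {
  byToolCall: Map<string, WireProcessEvent[]>;
  problems: string[];
}

function checkCorrelation(events: WireProcessEvent[]): CorrelationReport {
  const byToolCall = new Map<string, WireProcessEvent[]>();
  const ownerOfOperation = new Map<string, string>();
  const problems: string[] = [];

  events.forEach((event, index) => {
    if (event.version !== 1) problems.push(`#${index}: version must be 1`);
    for (const field of ['operationId', 'targetType', 'timestamp'] as const) {
      const value = event[field];
      if (typeof value !== 'string' || value === '') problems.push(`#${index}: missing ${field}`);
    }

    const toolCallId = event.toolCallId;
    if (typeof toolCallId !== 'string') {
      problems.push(`#${index}: missing toolCallId`);
      return;
    }
    const bucket = byToolCall.get(toolCallId) ?? [];
    bucket.push(event);
    byToolCall.set(toolCallId, bucket);

    // Assumption: an operationId is never shared by two tool calls.
    const operationId = event.operationId;
    if (typeof operationId === 'string') {
      const owner = ownerOfOperation.get(operationId);
      if (owner === undefined) {
        ownerOfOperation.set(operationId, toolCallId);
      } else if (owner !== toolCallId) {
        problems.push(`#${index}: operation ${operationId} seen under ${owner} and ${toolCallId}`);
      }
    }
  });

  return { byToolCall, problems };
}
```
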
---
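## Self-check checklist
- [ ] The first version only makes `execute_skill` progress visible and does not grow into a full task center.
- [ ] The event structure supports future subagent/tool/background tasks, but they are not forced in during this phase.
- [ ] `processLogRef` is a logical reference and does not expose local absolute paths.
- [ ] Process events live only in metadata, UI replay, and logs; they never enter the LLM context.
- [ ] Payloads have sensitive fields redacted and long strings truncated.
- [ ] stderr parsing only accepts `type: "process_event"` and never misparses ordinary JSON log lines.
- [ ] The gateway keeps only the necessary bridging logic; generic normalize/sample/log/buffer logic lives in the lightweight helper.
- [ ] The frontend is not just a progress bar but "latest step + expandable process timeline".
- [ ] Backend tests, the backend build, and the frontend build all pass.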