Runtime Process Events Implementation Plan
Requirement for the executing agent: implement this plan with superpowers:subagent-driven-development (recommended) or superpowers:executing-plans. Track every step with a checkbox (- [ ]).
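Goal: Add a Pi-style runtime process event stream for long-running tools, compute-entry skills, and future subagent/batch tasks, so that the Agent conversation page can show in real time which step is running, instead of only the final result or a single progress bar.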
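Architecture: Add process-event to the existing NetaClaw tool lifecycle started -> result -> completed; process events are emitted while the tool runs, between started and result. In phase one the WebSocket event name stays tool_process_event so that it plugs into the current streamingTools projection, but the payload is built on a generic RuntimeProcessEvent (version, operationId, parentOperationId, targetType, taskId, and so on) plus the toolCallId of the owning call, so it is not hard-wired to any single skill. execute_skill bridges the structured process events written by the skill subprocess into the runtime callback; the gateway throttles the events before sending them to the frontend, stores a summary in the assistant metadata, and writes a complete JSONL process log for backend troubleshooting.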
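Key boundaries:
- Process events are only for UI display, process replay, and backend troubleshooting. They never enter LLM history, and buildLLMMessages must not carry them back into the model context.
- processLogRef may only be a logical reference; never expose a local absolute path to the frontend.
- Phase one builds no full event bus, no task center, and no full log-viewing API, only a lightweight runtime helper, to avoid over-engineering.
- Phase one only guarantees that execute_skill progress is visible; the event structure keeps room for later support of ordinary tools, subagents, and background tasks.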
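A minimal sketch of the two boundaries above, assuming a hypothetical, simplified message shape (`StoredMessage`, `toLLMMessages` and `makeLogRef` are illustrative names; the real buildLLMMessages signature is not shown in this plan). Only role and content are copied into the model-facing history, and the log reference carries identifiers only:

```typescript
// Hypothetical, simplified shapes: the real NetaClaw types are not shown in this plan.
interface RuntimeProcessLogRef {
  kind: 'runtime_process_log';
  sessionId: string;
  operationId: string;
  toolCallId?: string;
}

interface StoredMessage {
  role: 'user' | 'assistant' | 'tool';
  content: string;
  // UI-only data such as processEvents / processLogRef lives here.
  metadata?: Record<string, unknown>;
}

interface LLMMessage {
  role: StoredMessage['role'];
  content: string;
}

// Allow-list copy: metadata (including process events) never reaches the model.
function toLLMMessages(messages: StoredMessage[]): LLMMessage[] {
  return messages.map(({ role, content }) => ({ role, content }));
}

// A logical key only; the backend resolves it to a file on its own side.
function makeLogRef(sessionId: string, operationId: string, toolCallId?: string): RuntimeProcessLogRef {
  return { kind: 'runtime_process_log', sessionId, operationId, toolCallId };
}

const history: StoredMessage[] = [
  { role: 'user', content: 'inspect the video' },
  {
    role: 'assistant',
    content: 'done',
    metadata: {
      processEvents: [{ message: 'frames extracted' }],
      processLogRef: makeLogRef('s-1', 'tool-call-1', 'call-1'),
    },
  },
];

console.log(JSON.stringify(toLLMMessages(history)));
```

Copying an allow-list of fields, rather than deleting known UI fields, keeps any future metadata out of the model context by default.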
Tech stack: TypeScript, Midway.js WebSocket, NetaClaw runtime tools, Pinia/Vue 3, Element Plus, Jest, vue-tsc/Vite build.
File structure
Backend:
- Add packages/backend/src/modules/netaclaw/runtime/process_events.ts
  - Defines RuntimeProcessEvent and RuntimeProcessEventInput.
  - Handles event normalization, operationId generation, percent calculation, payload redaction, metadata sampling, logical processLogRef generation, JSONL logging, and lightweight throttling/deduplication.
  - Note: this is a lightweight helper, not a full EventBus.
- Modify packages/backend/src/modules/netaclaw/tools/common.ts
  - Add ToolProcessUpdateCallback.
  - Extend AgentTool.execute() with an optional third parameter, onUpdate.
- Modify packages/backend/src/modules/netaclaw/runtime/attempt.ts
  - Add onToolProcessEvent.
  - Pass emitProcessEvent when calling a tool.
- Modify packages/backend/src/modules/netaclaw/runtime/agent.ts
  - Pass onToolProcessEvent through from runAgent to runAttempt.
- Modify packages/backend/src/modules/netaclaw/service/skill_executor.ts
  - Parse only JSON lines that explicitly carry type: "process_event".
  - Keep sending ordinary stderr to the backend log unchanged.
  - Emit a terminal failure process event on timeout, non-zero exit, spawn error, or failure to parse the final JSON.
- Modify packages/backend/src/modules/netaclaw/tools/builtin/execute_skill.ts
  - Bridge skill process events into tool process updates.
- Modify packages/backend/src/modules/netaclaw/gateway/protocol.ts
  - Add ServerToolProcessEvent.
- Modify packages/backend/src/modules/netaclaw/gateway/server.ts
  - Receive runtime process events.
  - Send tool_process_event to the frontend.
  - Store latestProcessEvent, the sampled processEvents, and the logical processLogRef in the skillExecutions metadata.
  - Never put full local file paths into the frontend payload.
Backend tests:
- Add packages/backend/test/tool_process_events.test.ts
- Add packages/backend/test/skill_process_events.test.ts
- Add packages/backend/test/process_event_buffer.test.ts
Frontend:
- Modify packages/frontend/src/modules/agent/types/index.d.ts
  - Add RuntimeProcessEventProjection.
  - Add latestProcessEvent, processEvents, and processLogRef to StreamingToolProjection.
  - Add tool_process_event to WSServerEvent.type.
- Modify packages/frontend/src/modules/agent/store/chat.ts
  - Handle live tool_process_event.
  - Restore process event summaries from assistant metadata.
  - Merge events into the matching streamingTool by toolCallId.
- Add packages/frontend/src/modules/agent/components/tool-process-timeline.vue
  - Renders the process timeline on its own.
- Modify packages/frontend/src/modules/agent/components/skill-card.vue
  - Show the latest process event.
  - Show a progress bar only when a numeric percent exists.
  - Embed tool-process-timeline.vue.
- Modify the parent component or renderer that actually renders streamingTools
  - Pass latestProcessEvent, processEvents, and processLogRef explicitly.
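For the skill-card rule "show a progress bar only when a numeric percent exists", the selection logic can live in a plain function that a `computed` calls. A sketch; `ProcessEventView`, `ToolView`, `latestEvent` and `progressPercent` are hypothetical names, and the field subset is assumed from RuntimeProcessEventProjection:

```typescript
// Assumed subset of RuntimeProcessEventProjection / StreamingToolProjection.
interface ProcessEventView {
  message: string;
  status: string;
  percent?: number;
  sequence: number;
}

interface ToolView {
  latestProcessEvent?: ProcessEventView;
  processEvents?: ProcessEventView[];
}

// Prefer latestProcessEvent; fall back to the last entry of the timeline.
function latestEvent(tool: ToolView): ProcessEventView | undefined {
  const events = tool.processEvents;
  return tool.latestProcessEvent ?? (events && events.length > 0 ? events[events.length - 1] : undefined);
}

// A progress bar is shown only for a finite numeric percent; otherwise the
// card shows the latest message alone.
function progressPercent(tool: ToolView): number | undefined {
  const percent = latestEvent(tool)?.percent;
  return typeof percent === 'number' && Number.isFinite(percent) ? percent : undefined;
}

console.log(progressPercent({ latestProcessEvent: { message: 'frames extracted', status: 'running', sequence: 1 } })); // undefined
console.log(progressPercent({ processEvents: [{ message: 'batch completed', status: 'running', percent: 38, sequence: 2 }] })); // 38
```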
Task 1: Backend runtime process event types and tool callback contract
Files:
- Add: packages/backend/src/modules/netaclaw/runtime/process_events.ts
- Modify: packages/backend/src/modules/netaclaw/tools/common.ts
- Modify: packages/backend/src/modules/netaclaw/runtime/attempt.ts
- Modify: packages/backend/src/modules/netaclaw/runtime/agent.ts
- Test: packages/backend/test/tool_process_events.test.ts
- Step 1: Write the failing test first
Create packages/backend/test/tool_process_events.test.ts:
import { Type } from '@sinclair/typebox';
import { runAttempt } from '../src/modules/netaclaw/runtime/attempt';
import type { AnyAgentTool } from '../src/modules/netaclaw/tools/common';
jest.mock('../src/modules/netaclaw/runtime/model_selection', () => ({
getProvider: () => ({
chat: jest
.fn()
.mockResolvedValueOnce({
content: '',
usage: { inputTokens: 1, outputTokens: 1 },
toolCalls: [{ id: 'call-1', name: 'slow_tool', arguments: '{"taskId":"task-1"}' }],
})
.mockResolvedValueOnce({
content: 'done',
usage: { inputTokens: 1, outputTokens: 1 },
toolCalls: [],
}),
}),
}));
describe('runtime process events', () => {
it('emits process events with toolCallId and operationId from a running tool', async () => {
const updates: any[] = [];
const tool: AnyAgentTool = {
name: 'slow_tool',
label: 'Slow Tool',
description: 'test tool',
parameters: Type.Object({ taskId: Type.String() }),
async execute(_id, _params, onUpdate) {
onUpdate?.({
source: 'tool',
targetType: 'tool',
stage: 'prepare',
status: 'running',
message: 'Preparing work',
current: 1,
total: 2,
});
return { type: 'text', text: 'ok' };
},
};
await runAttempt({
messages: [{ role: 'user', content: 'run' }],
tools: [tool],
config: { provider: 'openai', model: 'test', apiKey: 'test' },
onToolProcessEvent: event => updates.push(event),
});
expect(updates).toEqual([
expect.objectContaining({
toolCallId: 'call-1',
operationId: 'tool-call-1',
toolName: 'slow_tool',
args: { taskId: 'task-1' },
event: expect.objectContaining({
version: 1,
operationId: 'tool-call-1',
targetType: 'tool',
source: 'tool',
stage: 'prepare',
status: 'running',
message: 'Preparing work',
current: 1,
total: 2,
percent: 50,
}),
}),
]);
});
});
- Step 2: Run the test and confirm it fails
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts
Expected: FAIL, because RuntimeProcessEvent, onToolProcessEvent, and onUpdate do not exist yet.
- Step 3: Add the runtime process event helper
Create packages/backend/src/modules/netaclaw/runtime/process_events.ts:
import { randomUUID } from 'crypto';
import * as fs from 'fs';
import * as path from 'path';
import { pDataPath } from '../../../comm/path.js';
export type RuntimeProcessTargetType = 'tool' | 'skill' | 'subagent' | 'task' | 'runtime';
export type RuntimeProcessSource = 'skill' | 'tool' | 'subagent' | 'runtime';
export type RuntimeProcessStatus = 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped';
export type RuntimeProcessLevel = 'debug' | 'info' | 'warning' | 'error';
export interface RuntimeProcessEvent {
version: 1;
operationId: string;
parentOperationId?: string;
targetType: RuntimeProcessTargetType;
source: RuntimeProcessSource;
taskId?: string;
stage?: string;
status: RuntimeProcessStatus;
level: RuntimeProcessLevel;
message: string;
detail?: string;
current?: number;
total?: number;
percent?: number;
payload?: Record<string, unknown>;
timestamp: string;
sequence?: number;
}
export type RuntimeProcessEventInput =
Partial<Omit<RuntimeProcessEvent, 'version' | 'operationId' | 'targetType' | 'source' | 'status' | 'level' | 'message' | 'timestamp'>> & {
operationId?: string;
parentOperationId?: string;
targetType?: RuntimeProcessTargetType;
source?: RuntimeProcessSource;
status?: RuntimeProcessStatus;
level?: RuntimeProcessLevel;
message: string;
timestamp?: string;
};
export interface RuntimeProcessLogRef {
kind: 'runtime_process_log';
sessionId: string;
operationId: string;
toolCallId?: string;
}
const SENSITIVE_KEY_PATTERN = /(api[_-]?key|token|password|secret|authorization|credential)/i;
const MAX_PAYLOAD_STRING_LENGTH = 500;
export function buildOperationId(toolCallId?: string): string {
return toolCallId ? `tool-${toolCallId}` : `op-${randomUUID()}`;
}
export function normalizeRuntimeProcessEvent(input: RuntimeProcessEventInput): RuntimeProcessEvent {
const total = typeof input.total === 'number' ? input.total : undefined;
const current = typeof input.current === 'number' ? input.current : undefined;
const percent = typeof input.percent === 'number'
? clampPercent(input.percent)
: total && current !== undefined
? clampPercent(Math.round((current / total) * 100))
: undefined;
return {
version: 1,
operationId: input.operationId || buildOperationId(),
parentOperationId: input.parentOperationId,
targetType: input.targetType || 'runtime',
source: input.source || 'runtime',
taskId: input.taskId,
stage: input.stage,
status: input.status || 'running',
level: input.level || 'info',
message: input.message,
detail: input.detail,
current,
total,
percent,
payload: sanitizeProcessPayload(input.payload),
timestamp: input.timestamp || new Date().toISOString(),
sequence: input.sequence,
};
}
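// Redact sensitive keys, truncate long strings, and flatten nested values so
// payloads stay small and safe to show in the UI and in metadata.
export function sanitizeProcessPayload(payload: unknown): Record<string, unknown> | undefined {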
if (!payload || typeof payload !== 'object' || Array.isArray(payload)) return undefined;
const output: Record<string, unknown> = {};
for (const [key, value] of Object.entries(payload as Record<string, unknown>)) {
if (SENSITIVE_KEY_PATTERN.test(key)) {
output[key] = '[redacted]';
} else if (typeof value === 'string') {
output[key] = value.length > MAX_PAYLOAD_STRING_LENGTH
? `${value.slice(0, MAX_PAYLOAD_STRING_LENGTH)}...`
: value;
} else if (typeof value === 'number' || typeof value === 'boolean' || value === null) {
output[key] = value;
} else {
output[key] = '[object]';
}
}
return Object.keys(output).length > 0 ? output : undefined;
}
// Keep the first events (start of the run) plus the most recent ones.
export function sampleProcessEventsForMetadata<T>(events: T[], limit = 50): T[] {
if (events.length <= limit) return events;
const headCount = Math.min(10, Math.floor(limit / 5));
return [
...events.slice(0, headCount),
...events.slice(-(limit - headCount)),
];
}
export function buildProcessLogRef(input: {
sessionId: string;
operationId: string;
toolCallId?: string;
}): RuntimeProcessLogRef {
return {
kind: 'runtime_process_log',
sessionId: input.sessionId,
operationId: input.operationId,
toolCallId: input.toolCallId,
};
}
export function appendRuntimeProcessLog(input: {
sessionId: string;
operationId: string;
toolCallId?: string;
event: RuntimeProcessEvent;
}): RuntimeProcessLogRef {
// Sanitize the session id too, so it cannot add path separators to the log path.
const dir = path.join(pDataPath(), 'netaclaw-process-events', safeFilePart(input.sessionId));
fs.mkdirSync(dir, { recursive: true });
const filename = `${safeFilePart(input.operationId)}.jsonl`;
fs.appendFileSync(path.join(dir, filename), `${JSON.stringify(input.event)}\n`, 'utf8');
return buildProcessLogRef(input);
}
function clampPercent(value: number): number {
return Math.max(0, Math.min(100, value));
}
function safeFilePart(value: string): string {
return value.replace(/[^a-zA-Z0-9_.-]/g, '_').slice(0, 120);
}
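The percent rule inside normalizeRuntimeProcessEvent, pulled out for illustration (`derivePercent` is a hypothetical name, not part of the helper): an explicit percent wins and is clamped; otherwise it is derived from current/total only when total is a non-zero number.

```typescript
// Hypothetical standalone copy of the percent rule in normalizeRuntimeProcessEvent.
function clampPercent(value: number): number {
  return Math.max(0, Math.min(100, value));
}

function derivePercent(input: { current?: number; total?: number; percent?: number }): number | undefined {
  if (typeof input.percent === 'number') return clampPercent(input.percent);
  // A total of 0 or undefined is falsy, which avoids dividing by zero.
  if (input.total && input.current !== undefined) {
    return clampPercent(Math.round((input.current / input.total) * 100));
  }
  return undefined;
}

console.log(derivePercent({ current: 1, total: 2 }));   // 50
console.log(derivePercent({ current: 3, total: 8 }));   // 38 (37.5 rounds up)
console.log(derivePercent({ current: 5, total: 0 }));   // undefined
console.log(derivePercent({ current: 12, total: 10 })); // 100 (clamped)
```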
- Step 4: Extend the tool execute contract
Modify packages/backend/src/modules/netaclaw/tools/common.ts.
Add the import:
import type { RuntimeProcessEventInput } from '../runtime/process_events.js';
Add the type:
export type ToolProcessUpdateCallback = (event: RuntimeProcessEventInput) => void;
Change AgentTool.execute to:
execute(id: string, params: Static<TParams>, onUpdate?: ToolProcessUpdateCallback): Promise<TResult>;
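Because onUpdate is optional, existing tools keep compiling and two-argument callers keep working. A self-contained sketch with local stand-ins for RuntimeProcessEventInput and ToolProcessUpdateCallback (`slowToolExecute` is a hypothetical tool):

```typescript
// Local stand-ins for RuntimeProcessEventInput / ToolProcessUpdateCallback.
interface ProcessUpdate {
  stage?: string;
  status?: 'running' | 'completed';
  message: string;
  current?: number;
  total?: number;
}
type ProcessUpdateCallback = (event: ProcessUpdate) => void;

// Hypothetical tool: reports one update per step, then returns a text result.
async function slowToolExecute(
  _id: string,
  params: { steps: number },
  onUpdate?: ProcessUpdateCallback,
): Promise<{ type: 'text'; text: string }> {
  for (let i = 1; i <= params.steps; i++) {
    // Optional chaining: a tool must not assume that a listener is attached.
    onUpdate?.({ stage: 'work', status: 'running', message: `step ${i}`, current: i, total: params.steps });
  }
  return { type: 'text', text: 'ok' };
}

async function main() {
  const plain = await slowToolExecute('call-1', { steps: 2 }); // old two-argument call
  const updates: ProcessUpdate[] = [];
  await slowToolExecute('call-2', { steps: 3 }, e => updates.push(e));
  console.log(plain.text, updates.map(u => u.message));
}

main();
```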
- Step 5: Bridge tool updates in attempt
Modify packages/backend/src/modules/netaclaw/runtime/attempt.ts.
Add the imports:
import { AnyAgentTool, ToolResultContent, toolResultToText } from '../tools/common.js';
import { RuntimeProcessEvent, RuntimeProcessEventInput, buildOperationId, normalizeRuntimeProcessEvent } from './process_events.js';
Add to AttemptParams:
onToolProcessEvent?: (input: {
toolCallId: string;
operationId: string;
toolName: string;
args: Record<string, unknown>;
event: RuntimeProcessEvent;
}) => void;
Destructure onToolProcessEvent in runAttempt().
Before calling tool.execute, add:
const operationId = buildOperationId(tc.id);
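const emitProcessEvent = (event: RuntimeProcessEventInput) => {
// Spread the caller's event first, then apply the tool-level defaults, so an
// explicitly undefined field in `event` cannot erase them.
const normalized = normalizeRuntimeProcessEvent({
...event,
operationId: event.operationId || operationId,
targetType: event.targetType || 'tool',
source: event.source || 'tool',
});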
onToolProcessEvent?.({
toolCallId: tc.id,
operationId,
toolName: tc.name,
args: effectiveArgs,
event: normalized,
});
};
Replace:
const result = await tool.execute(tc.id, effectiveArgs);
with:
const result = await tool.execute(tc.id, effectiveArgs, emitProcessEvent);
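In an object literal, later properties win, including properties whose value is an explicit undefined. Placing defaults before a spread therefore lets a caller that builds events from optional fields erase them. A standalone demonstration with a trimmed event shape (`UpdateInput` is a stand-in type):

```typescript
interface UpdateInput {
  targetType?: 'tool' | 'skill';
  source?: 'tool' | 'skill' | 'runtime';
  message: string;
}

// A caller assembling the event from optional fields may pass an explicit undefined.
const fromCaller: UpdateInput = { targetType: undefined, message: 'Preparing work' };

// Defaults first, spread second: the explicit undefined overwrites the default.
const defaultsFirst = { targetType: fromCaller.targetType || 'tool', ...fromCaller };

// Spread first, defaults second: the default survives.
const defaultsLast = { ...fromCaller, targetType: fromCaller.targetType || 'tool' };

console.log(defaultsFirst.targetType); // undefined
console.log(defaultsLast.targetType);  // tool
```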
- Step 6: Pass the callback through in agent
Modify packages/backend/src/modules/netaclaw/runtime/agent.ts.
Add the import:
import type { RuntimeProcessEvent } from './process_events.js';
Add to AgentRunParams:
onToolProcessEvent?: (input: {
toolCallId: string;
operationId: string;
toolName: string;
args: Record<string, unknown>;
event: RuntimeProcessEvent;
}) => void;
Add to the runAttempt() call arguments:
onToolProcessEvent: params.onToolProcessEvent,
- Step 7: Run the test
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts
Expected: PASS.
Task 2: Parse process events from skill subprocesses
Files:
- Modify: packages/backend/src/modules/netaclaw/service/skill_executor.ts
- Test: packages/backend/test/skill_process_events.test.ts
- Step 1: Write the failing test first
Create packages/backend/test/skill_process_events.test.ts:
import { normalizeSkillProcessLine } from '../src/modules/netaclaw/service/skill_executor';
describe('skill process event parsing', () => {
it('parses only JSON lines explicitly marked type=process_event', () => {
const event = normalizeSkillProcessLine(
'vehicle-damage-inspection',
JSON.stringify({
type: 'process_event',
time: '2026-05-06T04:07:17.187Z',
stage: 'frames',
message: 'frames extracted',
taskId: 'inspection-1',
frameCount: 361,
duration: 120.38,
resolution: '960x544',
}),
7,
);
expect(event).toEqual({
source: 'skill',
targetType: 'skill',
taskId: 'inspection-1',
stage: 'frames',
status: 'running',
level: 'info',
message: 'frames extracted',
timestamp: '2026-05-06T04:07:17.187Z',
sequence: 7,
payload: {
frameCount: 361,
duration: 120.38,
resolution: '960x544',
},
});
});
it('derives current, total, and percent from batch/totalBatches', () => {
const event = normalizeSkillProcessLine(
'vehicle-damage-inspection',
JSON.stringify({
type: 'process_event',
time: '2026-05-06T04:08:11.443Z',
stage: 'detect',
message: 'batch completed',
batch: 3,
totalBatches: 8,
damageCount: 3,
}),
8,
);
expect(event).toEqual(expect.objectContaining({
source: 'skill',
targetType: 'skill',
stage: 'detect',
message: 'batch completed',
current: 3,
total: 8,
percent: 38,
payload: expect.objectContaining({ damageCount: 3 }),
}));
});
it('ignores plain stderr', () => {
expect(normalizeSkillProcessLine('demo', 'plain stderr', 1)).toBeNull();
});
it('ignores ordinary JSON without the process_event marker', () => {
expect(normalizeSkillProcessLine('demo', '{"stage":"frames","message":"too broad"}', 1)).toBeNull();
});
});
- Step 2: Run the test and confirm it fails
pnpm --filter @neta/backend test -- --runInBand test/skill_process_events.test.ts
Expected: FAIL, because normalizeSkillProcessLine does not exist yet.
- Step 3: Implement the parser
In packages/backend/src/modules/netaclaw/service/skill_executor.ts, add the import:
import type { RuntimeProcessEventInput } from '../runtime/process_events.js';
Extend SkillExecuteParams:
toolCallId?: string;
onProcessEvent?: (event: RuntimeProcessEventInput) => void;
Before SkillExecutorService, add:
// Keys consumed by the event structure itself; every other key goes into payload.
const RESERVED_PROCESS_KEYS = new Set([
'type',
'time',
'timestamp',
'stage',
'message',
'taskId',
'level',
'status',
'current',
'total',
'percent',
'batch',
'totalBatches',
]);
export function normalizeSkillProcessLine(
_skillName: string,
line: string,
sequence: number,
): RuntimeProcessEventInput | null {
let parsed: Record<string, unknown>;
try {
parsed = JSON.parse(line);
} catch {
return null;
}
if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) return null;
if (parsed.type !== 'process_event') return null;
if (typeof parsed.message !== 'string' && typeof parsed.stage !== 'string') return null;
const current = typeof parsed.current === 'number'
? parsed.current
: typeof parsed.batch === 'number'
? parsed.batch
: undefined;
const total = typeof parsed.total === 'number'
? parsed.total
: typeof parsed.totalBatches === 'number'
? parsed.totalBatches
: undefined;
const percent = typeof parsed.percent === 'number'
? parsed.percent
: total && current !== undefined
? Math.max(0, Math.min(100, Math.round((current / total) * 100)))
: undefined;
const payload: Record<string, unknown> = {};
for (const [key, value] of Object.entries(parsed)) {
if (!RESERVED_PROCESS_KEYS.has(key)) payload[key] = value;
}
return {
source: 'skill',
targetType: 'skill',
taskId: typeof parsed.taskId === 'string' ? parsed.taskId : undefined,
stage: typeof parsed.stage === 'string' ? parsed.stage : undefined,
status: isProcessStatus(parsed.status) ? parsed.status : 'running',
level: isProcessLevel(parsed.level) ? parsed.level : 'info',
message: typeof parsed.message === 'string' ? parsed.message : String(parsed.stage || 'running'),
current,
total,
percent,
timestamp: typeof parsed.time === 'string'
? parsed.time
: typeof parsed.timestamp === 'string'
? parsed.timestamp
: new Date().toISOString(),
sequence,
payload: Object.keys(payload).length > 0 ? payload : undefined,
};
}
function isProcessStatus(value: unknown): value is RuntimeProcessEventInput['status'] {
return value === 'queued'
|| value === 'started'
|| value === 'running'
|| value === 'completed'
|| value === 'failed'
|| value === 'cancelled'
|| value === 'skipped';
}
function isProcessLevel(value: unknown): value is RuntimeProcessEventInput['level'] {
return value === 'debug' || value === 'info' || value === 'warning' || value === 'error';
}
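On the skill side, a script only has to write one JSON object per line to stderr with the explicit `type: "process_event"` marker; stdout stays reserved for the final JSON result. A sketch for a Node-based skill (`formatProcessLine` and `reportProcess` are hypothetical helpers; a Python skill would do the same with `json.dumps` and a write to `sys.stderr`):

```typescript
// Hypothetical helper for a Node-based skill script.
interface SkillProcessFields {
  stage?: string;
  message?: string;
  taskId?: string;
  batch?: number;
  totalBatches?: number;
  // Any other key ends up in the event payload on the backend.
  [extra: string]: unknown;
}

// JSON.stringify escapes embedded newlines, so each event stays on one line.
// The marker is written last so that no field can override it.
function formatProcessLine(fields: SkillProcessFields, now: Date = new Date()): string {
  return JSON.stringify({ time: now.toISOString(), ...fields, type: 'process_event' });
}

// console.error writes to stderr and appends the trailing newline.
function reportProcess(fields: SkillProcessFields): void {
  console.error(formatProcessLine(fields));
}

reportProcess({ stage: 'detect', message: 'batch completed', batch: 3, totalBatches: 8, damageCount: 3 });
```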
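- Step 4: Extract process events from stderr
In execute(), add:
let processSequence = 0;
In the stderr line loop, after the logger call, add:
// Only lines that parse as process events advance the sequence, so it has no gaps.
const processEvent = normalizeSkillProcessLine(skillName, line, processSequence + 1);
if (processEvent) {
processSequence += 1;
params.onProcessEvent?.(processEvent);
}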
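This step assumes the loop already receives complete lines. If it works on raw stderr chunks instead, one JSON event can be split across two chunks and then silently ignored by the parser. A minimal splitter that buffers the partial tail (`StderrLineSplitter` is a hypothetical name):

```typescript
// Hypothetical helper: turns arbitrary stderr chunks into complete lines.
class StderrLineSplitter {
  private tail = '';

  // Returns the complete lines in this chunk and keeps the unfinished remainder.
  push(chunk: string): string[] {
    const parts = (this.tail + chunk).split(/\r?\n/);
    this.tail = parts.pop() ?? '';
    return parts.filter(line => line.length > 0);
  }

  // Call once when the child process closes, to release the last partial line.
  end(): string[] {
    const rest = this.tail;
    this.tail = '';
    return rest.length > 0 ? [rest] : [];
  }
}
```

Calling `child.stderr.setEncoding('utf8')` on the spawned process makes Node deliver strings and keeps multi-byte characters intact across chunk boundaries; `end()` belongs in the close handler.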
- Step 5: Emit terminal failure events
On timeout, emit:
params.onProcessEvent?.({
source: 'runtime',
targetType: 'skill',
taskId: typeof input.taskId === 'string' ? input.taskId : undefined,
stage: 'skill-timeout',
status: 'failed',
level: 'error',
message: `Execution timed out after ${timeout}ms`,
});
For a non-zero exit, a spawn error, or a failure to parse the stdout JSON, likewise emit an event with status: 'failed', source: 'runtime', and targetType: 'skill'.
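The four failure paths differ only in stage, message and detail, so a small builder keeps them uniform. A sketch; `buildSkillFailureEvent`, the message texts other than the timeout one, and the stage names other than `skill-timeout` are hypothetical:

```typescript
type SkillFailureKind = 'timeout' | 'exit' | 'spawn' | 'output';

interface SkillFailureEvent {
  source: 'runtime';
  targetType: 'skill';
  taskId?: string;
  stage: string;
  status: 'failed';
  level: 'error';
  message: string;
  detail?: string;
}

// Hypothetical builder: maps each failure path to one terminal event.
function buildSkillFailureEvent(
  kind: SkillFailureKind,
  info: { taskId?: unknown; timeoutMs?: number; exitCode?: number | null; error?: unknown },
): SkillFailureEvent {
  const messages: Record<SkillFailureKind, string> = {
    timeout: `Execution timed out after ${info.timeoutMs}ms`,
    exit: `Skill exited with code ${info.exitCode}`,
    spawn: 'Failed to start skill process',
    output: 'Failed to parse skill output as JSON',
  };
  return {
    source: 'runtime',
    targetType: 'skill',
    taskId: typeof info.taskId === 'string' ? info.taskId : undefined,
    stage: `skill-${kind}`,
    status: 'failed',
    level: 'error',
    message: messages[kind],
    detail: info.error instanceof Error ? info.error.message : undefined,
  };
}
```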
- Step 6: Run the test
pnpm --filter @neta/backend test -- --runInBand test/skill_process_events.test.ts
Expected: PASS.
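Task 3: Bridge skill process events in execute_skill
Files:
- Modify: packages/backend/src/modules/netaclaw/tools/builtin/execute_skill.ts
- Test: extend packages/backend/test/tool_process_events.test.ts
- Step 1: Add the failing test
Append to packages/backend/test/tool_process_events.test.ts (the import can sit with the existing imports at the top of the file):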
import { createExecuteSkillTool } from '../src/modules/netaclaw/tools/builtin/execute_skill';
describe('execute_skill process bridge', () => {
it('forwards skill process events to the tool update callback', async () => {
const executor = {
execute: jest.fn(async (params: any) => {
params.onProcessEvent({
source: 'skill',
targetType: 'skill',
taskId: 'task-1',
stage: 'frames',
status: 'running',
message: 'extracting frames',
current: 1,
total: 4,
});
return { success: true, output: { ok: true }, duration: 10 };
}),
} as any;
const updates: any[] = [];
const tool = createExecuteSkillTool(executor);
await tool.execute(
'call-skill-1',
{ name: 'vehicle-damage-inspection', input: { taskId: 'task-1' } },
event => updates.push(event),
);
expect(executor.execute).toHaveBeenCalledWith(expect.objectContaining({
skillName: 'vehicle-damage-inspection',
input: { taskId: 'task-1' },
toolCallId: 'call-skill-1',
}));
expect(updates).toEqual([
expect.objectContaining({
source: 'skill',
targetType: 'skill',
taskId: 'task-1',
stage: 'frames',
message: 'extracting frames',
current: 1,
total: 4,
percent: 25,
}),
]);
});
});
- Step 2: Run the test and confirm it fails
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts
- Step 3: Modify execute_skill
In packages/backend/src/modules/netaclaw/tools/builtin/execute_skill.ts, change execute to accept a third parameter:
async execute(id, params, onUpdate) {
Change the executor call to:
const result = await executor.execute({
skillName: params.name,
input: params.input as Record<string, unknown>,
toolCallId: id,
onProcessEvent: event => {
const total = typeof event.total === 'number' ? event.total : undefined;
const current = typeof event.current === 'number' ? event.current : undefined;
onUpdate?.({
...event,
// Keep runtime-generated failure events (source: 'runtime') as they are.
source: event.source || 'skill',
targetType: event.targetType || 'skill',
percent: typeof event.percent === 'number'
? event.percent
: total && current !== undefined
? Math.max(0, Math.min(100, Math.round((current / total) * 100)))
: undefined,
});
},
});
- Step 4: Run the tests
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts test/skill_process_events.test.ts
Expected: PASS.
Task 4: Gateway protocol, throttling, metadata summary, and JSONL logging
Files:
- Modify: packages/backend/src/modules/netaclaw/runtime/process_events.ts
- Modify: packages/backend/src/modules/netaclaw/gateway/protocol.ts
- Modify: packages/backend/src/modules/netaclaw/gateway/server.ts
- Test: packages/backend/test/process_event_buffer.test.ts
- Step 1: Write the lightweight buffer test
Create packages/backend/test/process_event_buffer.test.ts:
import { ProcessEventBuffer } from '../src/modules/netaclaw/runtime/process_events';
import type { RuntimeProcessEvent } from '../src/modules/netaclaw/runtime/process_events';
function event(overrides: Partial<RuntimeProcessEvent>): RuntimeProcessEvent {
return {
version: 1,
operationId: 'tool-call-1',
targetType: 'skill',
source: 'skill',
status: 'running',
level: 'info',
message: 'batch completed',
timestamp: '2026-05-06T04:00:00.000Z',
...overrides,
};
}
describe('ProcessEventBuffer', () => {
it('coalesces repeated process events and flushes the latest one', () => {
const emitted: RuntimeProcessEvent[] = [];
const buffer = new ProcessEventBuffer(e => emitted.push(e), { throttleMs: 500 });
buffer.push(event({ operationId: 'op-1', stage: 'detect', sequence: 1 }));
buffer.push(event({ operationId: 'op-1', stage: 'detect', sequence: 2 }));
buffer.flushAll();
expect(emitted).toHaveLength(1);
expect(emitted[0]).toEqual(expect.objectContaining({
operationId: 'op-1',
stage: 'detect',
sequence: 2,
payload: expect.objectContaining({ repeatCount: 2 }),
}));
});
it('emits terminal and error events immediately', () => {
const emitted: RuntimeProcessEvent[] = [];
const buffer = new ProcessEventBuffer(e => emitted.push(e), { throttleMs: 500 });
buffer.push(event({ operationId: 'op-1', status: 'failed', level: 'error', message: 'failed' }));
expect(emitted).toHaveLength(1);
expect(emitted[0]).toEqual(expect.objectContaining({ status: 'failed', message: 'failed' }));
});
});
- Step 2: Run the test and confirm it fails
pnpm --filter @neta/backend test -- --runInBand test/process_event_buffer.test.ts
- Step 3: Add the lightweight buffer to process_events.ts
Append to packages/backend/src/modules/netaclaw/runtime/process_events.ts:
export interface ProcessEventBufferOptions {
throttleMs?: number;
}
interface BufferedEvent {
event: RuntimeProcessEvent;
repeatCount: number;
timer?: NodeJS.Timeout;
}
export class ProcessEventBuffer {
private readonly throttleMs: number;
private readonly pending = new Map<string, BufferedEvent>();
constructor(
private readonly emit: (event: RuntimeProcessEvent) => void,
options: ProcessEventBufferOptions = {},
) {
this.throttleMs = options.throttleMs ?? 400;
}
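push(event: RuntimeProcessEvent) {
// Terminal and error events skip throttling; pending events of the same
// operation are flushed first so the terminal event is always emitted last.
if (isTerminalProcessEvent(event)) {
this.flushOperation(event.operationId);
this.emit(event);
return;
}
// Events with the same operation, stage, and message coalesce into one.
const key = `${event.operationId}:${event.stage || ''}:${event.message}`;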
const existing = this.pending.get(key);
if (existing) {
existing.event = event;
existing.repeatCount += 1;
return;
}
const buffered: BufferedEvent = {
event,
repeatCount: 1,
timer: setTimeout(() => this.flushKey(key), this.throttleMs),
};
this.pending.set(key, buffered);
}
flushAll() {
for (const key of [...this.pending.keys()]) {
this.flushKey(key);
}
}
private flushOperation(operationId: string) {
for (const [key, buffered] of [...this.pending.entries()]) {
if (buffered.event.operationId === operationId) {
this.flushKey(key);
}
}
}
private flushKey(key: string) {
const buffered = this.pending.get(key);
if (!buffered) return;
if (buffered.timer) clearTimeout(buffered.timer);
this.pending.delete(key);
this.emit({
...buffered.event,
payload: buffered.repeatCount > 1
? { ...(buffered.event.payload || {}), repeatCount: buffered.repeatCount }
: buffered.event.payload,
});
}
}
function isTerminalProcessEvent(event: RuntimeProcessEvent): boolean {
return event.status === 'completed'
|| event.status === 'failed'
|| event.status === 'cancelled'
|| event.status === 'skipped'
|| event.level === 'error';
}
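Because the coalescing key is `operationId:stage:message`, only events with identical text merge. A skill that embeds counters in message (for example `batch 3/8 completed`) creates a new key per batch and bypasses coalescing; counters belong in current/total or batch/totalBatches. The key expression, copied out for illustration (`coalesceKey` and `countKeys` are hypothetical names):

```typescript
interface KeyedEvent {
  operationId: string;
  stage?: string;
  message: string;
  current?: number;
}

// Same expression as the key built in ProcessEventBuffer.push().
function coalesceKey(event: KeyedEvent): string {
  return `${event.operationId}:${event.stage || ''}:${event.message}`;
}

function countKeys(events: KeyedEvent[]): number {
  return new Set(events.map(coalesceKey)).size;
}

const counterInMessage: KeyedEvent[] = [1, 2, 3].map(i => ({ operationId: 'op-1', stage: 'detect', message: `batch ${i}/8 completed` }));
const counterInField: KeyedEvent[] = [1, 2, 3].map(i => ({ operationId: 'op-1', stage: 'detect', message: 'batch completed', current: i }));

console.log(countKeys(counterInMessage)); // 3: nothing coalesces
console.log(countKeys(counterInField));   // 1: only the latest event is kept
```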
- Step 4: Add the WebSocket protocol type
In packages/backend/src/modules/netaclaw/gateway/protocol.ts, add:
export interface ServerToolProcessEvent {
type: 'tool_process_event';
sessionId: string;
data: {
version: 1;
operationId: string;
parentOperationId?: string;
targetType: 'tool' | 'skill' | 'subagent' | 'task' | 'runtime';
toolCallId: string;
assistantEntryId?: string;
toolName: string;
toolLabel?: string;
source: 'skill' | 'tool' | 'subagent' | 'runtime';
taskId?: string;
stage?: string;
status: 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped';
level?: 'debug' | 'info' | 'warning' | 'error';
message: string;
detail?: string;
current?: number;
total?: number;
percent?: number;
payload?: Record<string, unknown>;
timestamp: string;
sequence: number;
};
}
Add ServerToolProcessEvent to the ServerEvent union.
- Step 5: Send live events from the gateway and store the metadata summary
In packages/backend/src/modules/netaclaw/gateway/server.ts, import:
import {
ProcessEventBuffer,
RuntimeProcessEvent,
RuntimeProcessLogRef,
appendRuntimeProcessLog,
buildProcessLogRef,
sampleProcessEventsForMetadata,
} from '../runtime/process_events.js';
Extend the toolExecutions item:
processEvents?: RuntimeProcessEvent[];
processLogRef?: RuntimeProcessLogRef;
In buildToolExecutionMetadata(), add to each item:
latestProcessEvent: te.processEvents?.at(-1),
processEvents: sampleProcessEventsForMetadata(te.processEvents || [], 50),
processLogRef: te.processLogRef,
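sampleProcessEventsForMetadata keeps the start of the run (the first 10 events at limit 50) plus the most recent events. The head only survives if te.processEvents has not already been trimmed to the last 50 before sampling. A standalone copy showing how 120 events reduce:

```typescript
// Standalone copy of sampleProcessEventsForMetadata from process_events.ts.
function sampleProcessEventsForMetadata<T>(events: T[], limit = 50): T[] {
  if (events.length <= limit) return events;
  const headCount = Math.min(10, Math.floor(limit / 5));
  return [...events.slice(0, headCount), ...events.slice(-(limit - headCount))];
}

const sequences = Array.from({ length: 120 }, (_, i) => i + 1);
const sampled = sampleProcessEventsForMetadata(sequences);

console.log(sampled.length);          // 50
console.log(sampled.slice(0, 3));     // [ 1, 2, 3 ]
console.log(sampled[9], sampled[10]); // 10 81
```

Re-applying the function after each append keeps the same first 10 events and rolls the remaining 40, so it can also serve as the in-memory cap.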
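Create the buffer inside the runner. Tool identity is kept in a per-operation lookup instead of on RuntimeProcessEvent, and sequence numbers come from a per-tool counter:
// Maps operationId -> owning tool call, used to build the wire payload.
const processToolInfo = new Map<string, { toolCallId: string; toolName: string }>();
// Monotonic per-tool sequence, independent of how many events are kept in memory.
const processSequenceByToolCall = new Map<string, number>();
const processEventBuffer = new ProcessEventBuffer(event => {
const toolInfo = processToolInfo.get(event.operationId);
// The frontend can only project events that map to a tool call.
if (!toolInfo) return;
this.send({
type: 'tool_process_event',
sessionId: sid,
data: { ...event, ...toolInfo, sequence: event.sequence ?? 0 },
});
});
Add to the runAgent call:
onToolProcessEvent: ({ toolCallId, operationId, toolName, event }) => {
// randomUUID comes from 'crypto'; import it if server.ts does not already.
const projectionId = toolCallId || randomUUID();
const te = [...toolExecutions].reverse().find(t => t.toolCallId === projectionId)
|| [...toolExecutions].reverse().find(t => t.name === toolName && t.status === 'running');
const sequence = (processSequenceByToolCall.get(projectionId) || 0) + 1;
processSequenceByToolCall.set(projectionId, sequence);
const processEvent: RuntimeProcessEvent = { ...event, sequence };
processToolInfo.set(processEvent.operationId, { toolCallId: projectionId, toolName });
if (te) {
// Resample instead of slice(-50): keeps the first events and rolls the latest ones.
te.processEvents = sampleProcessEventsForMetadata([...(te.processEvents || []), processEvent], 50);
try {
te.processLogRef = appendRuntimeProcessLog({
sessionId: sid,
operationId: processEvent.operationId || operationId,
toolCallId: projectionId,
event: processEvent,
});
} catch {
// A failed log write must not break the tool run; keep the logical ref anyway.
te.processLogRef = buildProcessLogRef({ sessionId: sid, operationId: processEvent.operationId, toolCallId: projectionId });
}
}
processEventBuffer.push(processEvent);
},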
Note: if TypeScript does not accept the extended fields, build a separate object for the send payload instead of polluting the RuntimeProcessEvent base type.
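Following that note, one way to keep RuntimeProcessEvent generic is to hold the tool identity in a gateway-side lookup and merge it only when the buffer emits. A sketch with trimmed local types (`toolInfoByOperation`, `nextSequence` and `toWireEvent` are hypothetical); a per-tool counter also keeps sequence increasing after the stored list has been capped, which `processEvents.length + 1` does not:

```typescript
// Trimmed stand-ins for RuntimeProcessEvent and the wire data type.
interface CoreEvent {
  version: 1;
  operationId: string;
  status: 'running' | 'completed' | 'failed';
  message: string;
  timestamp: string;
  sequence?: number;
}

interface ToolWireInfo {
  toolCallId: string;
  toolName: string;
  toolLabel?: string;
}

type WireEvent = CoreEvent & ToolWireInfo & { sequence: number };

// Hypothetical gateway-side state, one pair per runner.
const toolInfoByOperation = new Map<string, ToolWireInfo>();
const sequenceByToolCall = new Map<string, number>();

// Monotonic per-tool sequence, independent of how many events are stored.
function nextSequence(toolCallId: string): number {
  const next = (sequenceByToolCall.get(toolCallId) ?? 0) + 1;
  sequenceByToolCall.set(toolCallId, next);
  return next;
}

// Called from the buffer's emit callback; the core event type stays unchanged.
function toWireEvent(event: CoreEvent): WireEvent | undefined {
  const info = toolInfoByOperation.get(event.operationId);
  if (!info) return undefined;
  return { ...event, ...info, sequence: event.sequence ?? 0 };
}

toolInfoByOperation.set('tool-call-1', { toolCallId: 'call-1', toolName: 'execute_skill' });
const wire = toWireEvent({
  version: 1,
  operationId: 'tool-call-1',
  status: 'running',
  message: 'frames extracted',
  timestamp: '2026-05-06T04:07:17.187Z',
  sequence: nextSequence('call-1'),
});
console.log(wire?.toolCallId, wire?.sequence); // call-1 1
```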
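- Step 6: Emit terminal events on failure
In failRunningTools(), append a process event with status: 'failed' for each running tool and call processEventBuffer.push(...); make sure the pushed event can still be mapped back to the tool's toolCallId and name when it is sent.
For the logical reference, use: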
te.processLogRef = buildProcessLogRef({
sessionId: sid,
operationId: `tool-${te.toolCallId}`,
toolCallId: te.toolCallId,
});
Do not write the local absolute JSONL path into metadata.
- Step 7: Flush pending events
After runAgent completes normally and before the final metadata is built, call:
processEventBuffer.flushAll();
Also call it in the catch block, after failRunningTools(...).
- Step 8: Run the tests and the build
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts test/skill_process_events.test.ts test/process_event_buffer.test.ts
pnpm --filter @neta/backend build
Expected: PASS.
Task 5: Frontend types and chat store projection
Files:
- Modify: packages/frontend/src/modules/agent/types/index.d.ts
- Modify: packages/frontend/src/modules/agent/store/chat.ts
- Step 1: Add the frontend types
In packages/frontend/src/modules/agent/types/index.d.ts, add:
export interface RuntimeProcessLogRef {
kind: 'runtime_process_log';
sessionId: string;
operationId: string;
toolCallId?: string;
}
export interface RuntimeProcessEventProjection {
version: 1;
operationId: string;
parentOperationId?: string;
targetType: 'tool' | 'skill' | 'subagent' | 'task' | 'runtime';
source: 'skill' | 'tool' | 'subagent' | 'runtime';
taskId?: string;
stage?: string;
status: 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped';
level?: 'debug' | 'info' | 'warning' | 'error';
message: string;
detail?: string;
current?: number;
total?: number;
percent?: number;
payload?: Record<string, unknown>;
timestamp: string;
sequence: number;
}
Add to StreamingToolProjection:
latestProcessEvent?: RuntimeProcessEventProjection;
processEvents?: RuntimeProcessEventProjection[];
processLogRef?: RuntimeProcessLogRef;
Add 'tool_process_event' to WSServerEvent.type.
- Step 2: Restore process events from metadata
In toStreamingToolsFromEntry() in packages/frontend/src/modules/agent/store/chat.ts, add to each projected tool:
latestProcessEvent: normalizeToolProcessEvent(item.latestProcessEvent),
processEvents: normalizeToolProcessEvents(item.processEvents),
processLogRef: normalizeProcessLogRef(item.processLogRef),
Add the helpers:
function normalizeToolProcessEvents(value: unknown): import('../types/index.d').RuntimeProcessEventProjection[] | undefined {
if (!Array.isArray(value)) return undefined;
const events = value
.map(normalizeToolProcessEvent)
.filter((item): item is import('../types/index.d').RuntimeProcessEventProjection => Boolean(item));
return events.length > 0 ? events.slice(-50) : undefined;
}
function normalizeToolProcessEvent(value: unknown): import('../types/index.d').RuntimeProcessEventProjection | undefined {
if (!value || typeof value !== 'object' || Array.isArray(value)) return undefined;
const record = value as Record<string, any>;
if (typeof record.message !== 'string') return undefined;
return {
version: 1,
operationId: typeof record.operationId === 'string' ? record.operationId : `legacy-${record.toolCallId || record.sequence || Date.now()}`,
parentOperationId: typeof record.parentOperationId === 'string' ? record.parentOperationId : undefined,
targetType: ['tool', 'skill', 'subagent', 'task', 'runtime'].includes(record.targetType) ? record.targetType : 'tool',
source: ['skill', 'tool', 'subagent', 'runtime'].includes(record.source) ? record.source : 'tool',
taskId: typeof record.taskId === 'string' ? record.taskId : undefined,
stage: typeof record.stage === 'string' ? record.stage : undefined,
status: ['queued', 'started', 'running', 'completed', 'failed', 'cancelled', 'skipped'].includes(record.status) ? record.status : 'running',
level: ['debug', 'info', 'warning', 'error'].includes(record.level) ? record.level : undefined,
message: record.message,
detail: typeof record.detail === 'string' ? record.detail : undefined,
current: typeof record.current === 'number' ? record.current : undefined,
total: typeof record.total === 'number' ? record.total : undefined,
percent: typeof record.percent === 'number' ? record.percent : undefined,
payload: record.payload && typeof record.payload === 'object' && !Array.isArray(record.payload) ? record.payload : undefined,
timestamp: typeof record.timestamp === 'string' ? record.timestamp : new Date().toISOString(),
sequence: typeof record.sequence === 'number' ? record.sequence : 0,
};
}
function normalizeProcessLogRef(value: unknown): import('../types/index.d').RuntimeProcessLogRef | undefined {
if (!value || typeof value !== 'object' || Array.isArray(value)) return undefined;
const record = value as Record<string, any>;
if (record.kind !== 'runtime_process_log') return undefined;
if (typeof record.sessionId !== 'string' || typeof record.operationId !== 'string') return undefined;
return {
kind: 'runtime_process_log',
sessionId: record.sessionId,
operationId: record.operationId,
toolCallId: typeof record.toolCallId === 'string' ? record.toolCallId : undefined,
};
}
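The repeated `[...].includes(record.x) ? record.x : fallback` checks compile only because record is typed with any, and they give back any. An optional, type-safe variant (`pickEnum` is a hypothetical helper, not part of the plan):

```typescript
// Hypothetical helper: returns value only when it is one of the allowed literals.
function pickEnum<T extends string>(value: unknown, allowed: readonly T[], fallback: T): T;
function pickEnum<T extends string>(value: unknown, allowed: readonly T[], fallback?: undefined): T | undefined;
function pickEnum<T extends string>(value: unknown, allowed: readonly T[], fallback?: T): T | undefined {
  return typeof value === 'string' && (allowed as readonly string[]).includes(value) ? (value as T) : fallback;
}

const TARGET_TYPES = ['tool', 'skill', 'subagent', 'task', 'runtime'] as const;
const LEVELS = ['debug', 'info', 'warning', 'error'] as const;

console.log(pickEnum('skill', TARGET_TYPES, 'tool')); // skill
console.log(pickEnum('verbose', LEVELS));             // undefined
```

For example, `targetType: pickEnum(record.targetType, TARGET_TYPES, 'tool')` and `level: pickEnum(record.level, LEVELS)`.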
- Step 3: Handle live tool_process_event
In handleWSEvent(), add:
case 'tool_process_event':
if (event.data?.toolCallId) {
const processEvent = normalizeToolProcessEvent(event.data);
if (processEvent) {
const existing = streamingTools.value.find(item => item.toolCallId === event.data.toolCallId);
const processEvents = [...(existing?.processEvents || []), processEvent].slice(-50);
upsertStreamingTool({
toolCallId: event.data.toolCallId,
assistantEntryId: event.data.assistantEntryId,
toolName: event.data.toolName,
label: event.data.toolLabel,
status: existing?.status || 'running',
args: existing?.args,
result: existing?.result,
rawResult: existing?.rawResult,
runtimeRoute: existing?.runtimeRoute,
workerRoutingHint: existing?.workerRoutingHint,
blockedReason: existing?.blockedReason,
policySources: existing?.policySources,
processEvents,
latestProcessEvent: processEvent,
processLogRef: existing?.processLogRef,
});
}
}
break;
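The case block above can also be expressed as a pure merge function, which is easier to unit test than store code. A sketch with trimmed types; `mergeLiveProcessEvent` is hypothetical, and skipping duplicates by operationId + sequence is an added assumption to tolerate replayed events after a WebSocket reconnect:

```typescript
interface ProcessEventView {
  operationId: string;
  sequence: number;
  message: string;
}

interface StreamingToolView {
  toolCallId: string;
  toolName?: string;
  status: string;
  processEvents?: ProcessEventView[];
  latestProcessEvent?: ProcessEventView;
}

const MAX_PROCESS_EVENTS = 50;

// Returns a new projection; `existing` is never mutated, and any other fields
// on it are carried over by the spread.
function mergeLiveProcessEvent(
  existing: StreamingToolView | undefined,
  toolCallId: string,
  event: ProcessEventView,
): StreamingToolView {
  const previous = existing?.processEvents ?? [];
  // Assumption: the same operationId + sequence means a replayed event.
  const isDuplicate = previous.some(e => e.operationId === event.operationId && e.sequence === event.sequence);
  return {
    ...existing,
    toolCallId,
    status: existing?.status ?? 'running',
    processEvents: isDuplicate ? previous : [...previous, event].slice(-MAX_PROCESS_EVENTS),
    latestProcessEvent: event,
  };
}
```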
- Step 4: Run the frontend type check
pnpm --filter @neta/frontend type-check
Expected: PASS. If unrelated pre-existing errors show up, record them, and run pnpm --filter @neta/frontend build at the end.
Task 6: Frontend process timeline UI
Files:
- Add: packages/frontend/src/modules/agent/components/tool-process-timeline.vue
- Modify: packages/frontend/src/modules/agent/components/skill-card.vue
- Modify: the parent component or renderer that actually renders streamingTools
- Step 1: Add the standalone timeline component
Create packages/frontend/src/modules/agent/components/tool-process-timeline.vue:
<template>
  <div v-if="events.length" class="tool-process">
    <el-button text size="small" @click="showProcess = !showProcess">
      {{ showProcess ? '收起过程' : `查看过程 ${events.length}` }}
    </el-button>
    <el-collapse-transition>
      <ol v-if="showProcess" class="tool-process__timeline">
        <li
          v-for="event in events"
          :key="`${event.operationId}-${event.sequence}-${event.timestamp}`"
          class="tool-process__item"
          :class="[`is-${event.level || 'info'}`]"
        >
          <span class="tool-process__dot" />
          <span class="tool-process__main">
            <span class="tool-process__message">{{ event.message }}</span>
            <span v-if="event.current !== undefined && event.total !== undefined" class="tool-process__count">
              {{ event.current }}/{{ event.total }}
            </span>
          </span>
        </li>
      </ol>
    </el-collapse-transition>
  </div>
</template>
<script lang="ts" setup>
import { computed, ref } from 'vue';
import type { RuntimeProcessEventProjection } from '../types/index.d';
const props = defineProps<{
  events?: RuntimeProcessEventProjection[];
}>();
const showProcess = ref(false);
const events = computed(() => props.events || []);
</script>
<style lang="scss" scoped>
.tool-process {
  margin-top: 6px;
}
.tool-process__timeline {
  list-style: none;
  margin: 8px 0 0;
  padding: 0;
  display: grid;
  gap: 6px;
}
.tool-process__item {
  display: grid;
  grid-template-columns: 10px 1fr;
  gap: 8px;
  align-items: start;
  color: var(--el-text-color-secondary);
  font-size: 12px;
  line-height: 1.45;
}
.tool-process__dot {
  width: 6px;
  height: 6px;
  margin-top: 6px;
  border-radius: 50%;
  background: var(--el-color-primary);
}
.tool-process__item.is-warning .tool-process__dot {
  background: var(--el-color-warning);
}
.tool-process__item.is-error .tool-process__dot {
  background: var(--el-color-danger);
}
.tool-process__main {
  min-width: 0;
  display: flex;
  gap: 6px;
  align-items: baseline;
}
.tool-process__message {
  min-width: 0;
  overflow-wrap: anywhere;
}
.tool-process__count {
  flex: none;
  color: var(--el-text-color-placeholder);
}
</style>
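The template derives three things from each event: a list key, a level class and an optional counter. Including the timestamp in the key matters. normalizeToolProcessEvent defaults a missing sequence to 0, so operationId plus sequence alone can collide. The following is a minimal sketch of the same derivation as a testable function. TimelineEvent and toTimelineItem are hypothetical names, and the level union is assumed from the is-warning and is-error classes above:

```typescript
// Hypothetical subset of RuntimeProcessEventProjection used by the timeline.
interface TimelineEvent {
  operationId: string;
  sequence: number;
  timestamp: string;
  message: string;
  level?: 'info' | 'warning' | 'error';
  current?: number;
  total?: number;
}

export interface TimelineItem {
  key: string;
  levelClass: string;
  message: string;
  countText?: string;
}

// Mirrors the template: composite key, 'info' as the default level,
// and a "current/total" counter only when both numbers are present.
export function toTimelineItem(event: TimelineEvent): TimelineItem {
  return {
    key: `${event.operationId}-${event.sequence}-${event.timestamp}`,
    levelClass: `is-${event.level || 'info'}`,
    message: event.message,
    countText: event.current !== undefined && event.total !== undefined ? `${event.current}/${event.total}` : undefined,
  };
}
```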
- Step 2: Update skill-card
In skill-card.vue, add:
import type { RuntimeProcessEventProjection } from '../types/index.d';
import ToolProcessTimeline from './tool-process-timeline.vue';
Add to props:
latestProcessEvent?: RuntimeProcessEventProjection;
processEvents?: RuntimeProcessEventProjection[];
Replace the running section with:
<transition name="fade">
  <div v-if="status === 'running' || latestProcessEvent" class="skill-card__progress">
    <el-progress
      v-if="typeof latestProcessEvent?.percent === 'number' || typeof percent === 'number'"
      :percentage="latestProcessEvent?.percent ?? percent ?? 0"
      :stroke-width="4"
      :show-text="false"
    />
    <span v-if="latestProcessEvent?.message || step || detail" class="skill-card__step">
      {{ latestProcessEvent?.message || step || detail }}
    </span>
    <span v-if="latestProcessEvent?.detail" class="skill-card__detail">
      {{ latestProcessEvent.detail }}
    </span>
  </div>
</transition>
<ToolProcessTimeline :events="processEvents" />
Add the style:
.skill-card__detail {
  display: block;
  margin-top: 2px;
  font-size: 12px;
  color: var(--el-text-color-placeholder);
  line-height: 1.45;
}
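In the updated skill-card, the latest process event takes precedence over the existing percent, step and detail props. The existing props remain as fallbacks. The following is a minimal sketch of that precedence as a pure function. resolveProgressView, LatestEvent and LegacyProgress are hypothetical names, and the status values are assumed:

```typescript
// Hypothetical shapes: the latest process event and the card's existing progress props.
interface LatestEvent { percent?: number; message?: string; detail?: string; }
interface LegacyProgress { status: string; percent?: number; step?: string; detail?: string; }

export interface ProgressView {
  visible: boolean;
  showBar: boolean;
  percentage: number;
  stepText?: string;
  detailText?: string;
}

// Same rules as the template: the event wins, the existing props are the fallback, 0 is the last resort.
export function resolveProgressView(legacy: LegacyProgress, latest?: LatestEvent): ProgressView {
  return {
    visible: legacy.status === 'running' || latest !== undefined,
    showBar: typeof latest?.percent === 'number' || typeof legacy.percent === 'number',
    percentage: latest?.percent ?? legacy.percent ?? 0,
    stepText: latest?.message || legacy.step || legacy.detail || undefined,
    detailText: latest?.detail || undefined,
  };
}
```

The visibility condition is status === 'running' || latestProcessEvent, so the progress block stays visible after the tool finishes as long as a latest event exists.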
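- Step 3: Confirm that the parent passes the props
Search:
rg -n "<skill-card|SkillCard|streamingTools" packages/frontend/src/modules/agent
Make sure the place that finally renders the tool card passes:
:latest-process-event="tool.latestProcessEvent"
:process-events="tool.processEvents"
If rendering goes through a renderer registry, make sure the renderer keeps: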
latestProcessEvent: tool.latestProcessEvent,
processEvents: tool.processEvents,
processLogRef: tool.processLogRef,
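If several renderers build their own props objects, one option is to wrap them so that the process fields are never dropped. The following is a minimal sketch that assumes a renderer is a function from a streaming tool to a props object. StreamingToolLike, Renderer and withProcessFields are hypothetical names and should be adapted to the actual registry in the agent module:

```typescript
// Hypothetical shape of a streamingTools entry; the real type lives in ../types/index.d.
interface StreamingToolLike {
  toolCallId: string;
  toolName: string;
  status: string;
  latestProcessEvent?: unknown;
  processEvents?: unknown[];
  processLogRef?: unknown;
  [key: string]: unknown;
}

// Hypothetical renderer signature: tool in, component props out.
type Renderer = (tool: StreamingToolLike) => Record<string, unknown>;

const PROCESS_FIELDS = ['latestProcessEvent', 'processEvents', 'processLogRef'] as const;

// Wrap a renderer so the process fields survive even if it builds props from scratch.
// Fields the renderer already set are left untouched.
export function withProcessFields(render: Renderer): Renderer {
  return tool => {
    const props = { ...render(tool) };
    for (const field of PROCESS_FIELDS) {
      if (tool[field] !== undefined && props[field] === undefined) props[field] = tool[field];
    }
    return props;
  };
}
```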
- Step 4: Run the build
pnpm --filter @neta/frontend build
Expected: PASS.
Task 7: End-to-end verification
Files:
- No new source files are required.
- Use the existing vehicle-damage-inspection skill.
- Step 1: Run the backend tests
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts test/skill_process_events.test.ts test/process_event_buffer.test.ts
Expected: PASS.
- Step 2: Run the backend build
pnpm --filter @neta/backend build
Expected: PASS.
- Step 3: Run the frontend build
pnpm --filter @neta/frontend build
Expected: PASS.
- Step 4: Manually verify the pre-existing-damage check
Start the project:
pnpm dev
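On the Agent chat page, enter:
请检查是否有旧伤 ("please check for any pre-existing damage")
Expected:
- The execute_skill tool card appears.
- While the pre-existing-damage check runs, the card keeps showing the current stage, for example: creating workspace, extracting frames, frames extracted, batch started, batch completed, grounding damage candidates, selecting best frames.
- Expanding the process shows the sampled process timeline.
- The assistant metadata contains latestProcessEvent, processEvents and a logical processLogRef.
- processLogRef contains no local absolute path.
- The full JSONL process log exists under the backend data directory.
- After a page refresh, a completed session still shows the process summary saved in metadata.
- The model's final reply and the tool result are unaffected.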
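The "no local absolute path" check can be automated against the stored metadata instead of being checked by eye. The following is a minimal sketch. findAbsolutePaths is a hypothetical helper, and the regex covers only POSIX absolute paths, Windows drive and UNC paths, and file:// URLs, which is an assumption about what counts as a leak. It reports the JSON path of every offending string:

```typescript
// Assumed leak patterns: POSIX absolute paths, Windows drive paths, UNC paths, file:// URLs.
const ABSOLUTE_PATH = /^(\/|[A-Za-z]:[\\/]|\\\\|file:\/\/)/;

// Hypothetical helper: recursively collect the JSON paths of string values that look like absolute local paths.
export function findAbsolutePaths(value: unknown, trail = '$'): string[] {
  if (typeof value === 'string') return ABSOLUTE_PATH.test(value) ? [trail] : [];
  if (Array.isArray(value)) return value.flatMap((item, i) => findAbsolutePaths(item, `${trail}[${i}]`));
  if (value && typeof value === 'object') {
    return Object.entries(value).flatMap(([key, item]) => findAbsolutePaths(item, `${trail}.${key}`));
  }
  return [];
}
```

The check is deliberately strict, so a string that merely starts with "/", such as a URL path, is also reported. Treat any hit as something to inspect rather than as proof of a leak.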
- Step 5: Check the model-context boundary
Inspect buildLLMMessages and the session-tree runtime context builder, and confirm that latestProcessEvent, processEvents and processLogRef are never injected into the LLM history.
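One way to make this boundary hold by construction is to project stored messages through an allow-list, so metadata is never forwarded to the model. A serialisation check can then guard the boundary in tests. This is a minimal sketch only. It does not reflect the real signature of buildLLMMessages, and StoredMessage, LLMMessage, toLLMMessage and leaksProcessFields are hypothetical names:

```typescript
// Hypothetical stored and outgoing message shapes.
interface StoredMessage {
  role: 'user' | 'assistant' | 'tool';
  content: string;
  metadata?: Record<string, unknown>;
}
interface LLMMessage { role: StoredMessage['role']; content: string; }

const UI_ONLY_KEYS = ['latestProcessEvent', 'processEvents', 'processLogRef'];

// Allow-list projection: only role and content reach the model; metadata is dropped.
export function toLLMMessage(message: StoredMessage): LLMMessage {
  return { role: message.role, content: message.content };
}

// Test guard: serialise the outgoing messages and look for any UI-only key used as a JSON key.
export function leaksProcessFields(messages: LLMMessage[]): boolean {
  const serialized = JSON.stringify(messages);
  return UI_ONLY_KEYS.some(key => serialized.includes(`"${key}":`));
}
```

An allow-list is used instead of deleting known keys, so fields added to metadata later cannot leak into the context by default.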
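- Step 6: Verify concurrent correlation
If you can trigger several tools, or several skill calls, within the same turn, verify that:
- Process events are merged into the correct tool card by toolCallId.
- Calls to the same skill do not cross streams.
- Every event carries stable version=1, operationId, targetType and timestamp fields.
- On failure, the card receives a failed process event and leaves the running state.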
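The first three checks can be scripted over captured tool_process_event payloads. The following is a minimal sketch that assumes each operationId belongs to exactly one tool call. checkStableFields, findCrossTalk and the sample targetType value 'tool' are hypothetical:

```typescript
// Hypothetical loose shape of a captured event payload; every field is checked, not trusted.
interface CorrelatedEvent {
  version?: unknown;
  operationId?: unknown;
  targetType?: unknown;
  timestamp?: unknown;
  toolCallId?: unknown;
}

// Returns the problems found in one event; an empty list means the stable fields look right.
export function checkStableFields(event: CorrelatedEvent): string[] {
  const problems: string[] = [];
  if (event.version !== 1) problems.push('version must be 1');
  if (typeof event.operationId !== 'string' || event.operationId === '') problems.push('operationId missing');
  if (typeof event.targetType !== 'string' || event.targetType === '') problems.push('targetType missing');
  if (typeof event.timestamp !== 'string' || Number.isNaN(Date.parse(event.timestamp))) problems.push('timestamp invalid');
  return problems;
}

// Cross-talk check (assumption: one operationId per tool call).
// Returns every operationId seen under more than one toolCallId.
export function findCrossTalk(events: CorrelatedEvent[]): string[] {
  const owner = new Map<string, unknown>();
  const crossed = new Set<string>();
  for (const event of events) {
    if (typeof event.operationId !== 'string') continue;
    if (!owner.has(event.operationId)) owner.set(event.operationId, event.toolCallId);
    else if (owner.get(event.operationId) !== event.toolCallId) crossed.add(event.operationId);
  }
  return Array.from(crossed);
}
```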
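Self-check list
- The first version only makes execute_skill progress visible; it does not grow into a full task center.
- The event structure supports future subagent, tool and background tasks, but they are not forced in during this phase.
- processLogRef is a logical reference and does not expose local absolute paths.
- Process events live only in metadata, UI replay and logs; they never enter the LLM context.
- Payloads have sensitive fields redacted and long strings truncated.
- stderr parsing accepts only lines with type: "process_event", so ordinary JSON logs are not misparsed.
- The gateway keeps only the necessary bridging logic; generic normalize/sample/log/buffer logic lives in the lightweight helper.
- The frontend is not just a progress bar; it shows "latest step + expandable process timeline".
- Backend tests, the backend build and the frontend build all pass.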
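Two checklist items can be illustrated together: stderr lines are accepted only when they explicitly declare type: "process_event", and payloads are redacted and truncated. The following is a minimal sketch of both. The 500-character limit, the sensitive-key pattern and the names parseProcessEventLine and sanitize are all assumptions, and the real helper in runtime/process_events.ts may differ:

```typescript
// Assumed limits; tune to the real helper.
const MAX_STRING_LENGTH = 500;
const SENSITIVE_KEY = /(token|secret|password|api[_-]?key|authorization|cookie)/i;

export interface SkillProcessLine {
  type: 'process_event';
  message?: string;
  payload?: Record<string, unknown>;
}

function truncate(text: string): string {
  return text.length > MAX_STRING_LENGTH ? `${text.slice(0, MAX_STRING_LENGTH)}...[truncated]` : text;
}

// Recursively redact values under sensitive keys and truncate long strings.
export function sanitize(value: unknown): unknown {
  if (typeof value === 'string') return truncate(value);
  if (Array.isArray(value)) return value.map(item => sanitize(item));
  if (value && typeof value === 'object') {
    const out: Record<string, unknown> = {};
    for (const [key, item] of Object.entries(value)) {
      out[key] = SENSITIVE_KEY.test(key) ? '[redacted]' : sanitize(item);
    }
    return out;
  }
  return value;
}

// Returns an event only for JSON objects that explicitly declare type: "process_event".
// Plain text and ordinary JSON log lines return undefined and should go to the backend log unchanged.
export function parseProcessEventLine(line: string): SkillProcessLine | undefined {
  const trimmed = line.trim();
  if (!trimmed.startsWith('{')) return undefined;
  let parsed: unknown;
  try {
    parsed = JSON.parse(trimmed);
  } catch {
    return undefined;
  }
  if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) return undefined;
  const record = parsed as Record<string, unknown>;
  if (record.type !== 'process_event') return undefined;
  const payload =
    record.payload && typeof record.payload === 'object' && !Array.isArray(record.payload)
      ? (sanitize(record.payload) as Record<string, unknown>)
      : undefined;
  const message = typeof record.message === 'string' ? truncate(record.message) : undefined;
  return { type: 'process_event', message, payload };
}
```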