GPU_GUARD_MONOREPO/docs/superpowers/plans/2026-05-06-tool-process-events.md
2026-05-20 21:39:12 +08:00

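Runtime Process Events Implementation Plan

Requirements for the executing agent: when implementing this plan, you must use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans. Track every step with a checkbox (- [ ]).

Goal: Add a Pi-style runtime process event stream for long-running tools, compute-entry skills, and future subagent/batch tasks, so that the Agent conversation page can show in real time which step is currently executing, rather than only the final result or a single progress bar.

Architecture: Add a process-event step to the existing NetaClaw tool lifecycle started -> result -> completed. In phase one the WebSocket event name stays tool_process_event so that it plugs into the current streamingTools projection, but the event payload uses a generic RuntimeProcessEvent with fields such as version, operationId, parentOperationId, targetType, toolCallId, and taskId, so that it is not tied to any single skill. execute_skill bridges the structured process events emitted by the skill subprocess to the runtime callback. The gateway throttles the events before sending them to the frontend, stores a summary in the assistant metadata, and writes the full JSONL process log for backend troubleshooting.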

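Key boundaries:

  • Process events are used only for UI display, process replay, and backend troubleshooting. They never enter the LLM history, and buildLLMMessages must not carry them back into the model context.
  • processLogRef must be a logical reference only; local absolute paths must never be exposed to the frontend.
  • Phase one does not build a full event bus, a task center, or a full log viewer API. It adds a single lightweight runtime helper to avoid over-engineering.
  • Phase one only guarantees visibility into the execute_skill process; the event structure leaves room to plug in ordinary tools, subagents, and background tasks later.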
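
One way to enforce the first boundary is to project stored entries down to role and content before building model messages. The sketch below is only an illustration: StoredEntry, LLMMessage, and toLLMMessages are hypothetical names, and the real buildLLMMessages may be shaped differently.

```typescript
// Hypothetical shapes for illustration; not the actual NetaClaw types.
interface StoredEntry {
  role: 'user' | 'assistant' | 'tool';
  content: string;
  // UI-only data such as process event summaries lives here.
  metadata?: Record<string, unknown>;
}

interface LLMMessage {
  role: StoredEntry['role'];
  content: string;
}

// Copy only role and content, so nothing under metadata
// (latestProcessEvent, processEvents, processLogRef) reaches the model.
function toLLMMessages(entries: StoredEntry[]): LLMMessage[] {
  return entries.map(({ role, content }) => ({ role, content }));
}

const history: StoredEntry[] = [
  { role: 'user', content: 'run inspection' },
  {
    role: 'assistant',
    content: 'done',
    metadata: { processEvents: [{ stage: 'frames', message: 'frames extracted' }] },
  },
];

const llmMessages = toLLMMessages(history);
```

Building the model input from an explicit allowlist of fields, rather than deleting known metadata keys, means a newly added metadata field cannot leak by accident.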

Tech stack: TypeScript, Midway.js WebSocket, NetaClaw runtime tools, Pinia/Vue 3, Element Plus, Jest, vue-tsc/Vite build.


File structure

Backend:

  • Add packages/backend/src/modules/netaclaw/runtime/process_events.ts
    • Define RuntimeProcessEvent and RuntimeProcessEventInput.
    • Handles event normalization, operationId generation, percent calculation, payload redaction, metadata sampling, logical processLogRef generation, JSONL logging, and lightweight throttling/deduplication.
    • Note: this is a lightweight helper, not a full EventBus.
  • Modify packages/backend/src/modules/netaclaw/tools/common.ts
    • Add ToolProcessUpdateCallback.
    • Extend AgentTool.execute() with an optional third parameter, onUpdate.
  • Modify packages/backend/src/modules/netaclaw/runtime/attempt.ts
    • Add onToolProcessEvent.
    • Pass emitProcessEvent when calling a tool.
  • Modify packages/backend/src/modules/netaclaw/runtime/agent.ts
    • Pass onToolProcessEvent through from runAgent to runAttempt.
  • Modify packages/backend/src/modules/netaclaw/service/skill_executor.ts
    • Parse only JSON lines with an explicit type: "process_event".
    • Keep sending ordinary stderr to the backend log unchanged.
    • On timeout, non-zero exit, spawn error, or final JSON parse failure, emit a terminal failed process event.
  • Modify packages/backend/src/modules/netaclaw/tools/builtin/execute_skill.ts
    • Bridge skill process events into tool process updates.
  • Modify packages/backend/src/modules/netaclaw/gateway/protocol.ts
    • Add ServerToolProcessEvent.
  • Modify packages/backend/src/modules/netaclaw/gateway/server.ts
    • Receive runtime process events.
    • Send tool_process_event to the frontend.
    • Store latestProcessEvent, the sampled processEvents, and the logical processLogRef in the skillExecutions metadata.
    • Do not put full local file paths into the frontend payload.

Backend tests:

  • Add packages/backend/test/tool_process_events.test.ts
  • Add packages/backend/test/skill_process_events.test.ts
  • Add packages/backend/test/process_event_buffer.test.ts

Frontend:

  • Modify packages/frontend/src/modules/agent/types/index.d.ts
    • Add RuntimeProcessEventProjection.
    • Add latestProcessEvent, processEvents, and processLogRef to StreamingToolProjection.
    • Add tool_process_event to WSServerEvent.type.
  • Modify packages/frontend/src/modules/agent/store/chat.ts
    • Handle live tool_process_event.
    • Restore the process event summary from the assistant metadata.
    • Merge events into the matching streamingTool by toolCallId.
  • Add packages/frontend/src/modules/agent/components/tool-process-timeline.vue
    • Renders the process timeline on its own.
  • Modify packages/frontend/src/modules/agent/components/skill-card.vue
    • Show the latest process event.
    • Show the progress bar only when a numeric percent is present.
    • Embed tool-process-timeline.vue.
  • Modify the parent component or renderer that actually renders streamingTools
    • Explicitly pass latestProcessEvent, processEvents, and processLogRef.

Task 1: Backend runtime process event types and tool callback contract

Files:

  • Add: packages/backend/src/modules/netaclaw/runtime/process_events.ts

  • Modify: packages/backend/src/modules/netaclaw/tools/common.ts

  • Modify: packages/backend/src/modules/netaclaw/runtime/attempt.ts

  • Modify: packages/backend/src/modules/netaclaw/runtime/agent.ts

  • Test: packages/backend/test/tool_process_events.test.ts

  • Step 1: Write the failing test first

Create packages/backend/test/tool_process_events.test.ts:

import { Type } from '@sinclair/typebox';
import { runAttempt } from '../src/modules/netaclaw/runtime/attempt';
import type { AnyAgentTool } from '../src/modules/netaclaw/tools/common';

jest.mock('../src/modules/netaclaw/runtime/model_selection', () => ({
  getProvider: () => ({
    chat: jest
      .fn()
      .mockResolvedValueOnce({
        content: '',
        usage: { inputTokens: 1, outputTokens: 1 },
        toolCalls: [{ id: 'call-1', name: 'slow_tool', arguments: '{"taskId":"task-1"}' }],
      })
      .mockResolvedValueOnce({
        content: 'done',
        usage: { inputTokens: 1, outputTokens: 1 },
        toolCalls: [],
      }),
  }),
}));

describe('runtime process events', () => {
  it('emits process events with toolCallId and operationId from a running tool', async () => {
    const updates: any[] = [];
    const tool: AnyAgentTool = {
      name: 'slow_tool',
      label: 'Slow Tool',
      description: 'test tool',
      parameters: Type.Object({ taskId: Type.String() }),
      async execute(_id, _params, onUpdate) {
        onUpdate?.({
          source: 'tool',
          targetType: 'tool',
          stage: 'prepare',
          status: 'running',
          message: 'Preparing work',
          current: 1,
          total: 2,
        });
        return { type: 'text', text: 'ok' };
      },
    };

    await runAttempt({
      messages: [{ role: 'user', content: 'run' }],
      tools: [tool],
      config: { provider: 'openai', model: 'test', apiKey: 'test' },
      onToolProcessEvent: event => updates.push(event),
    });

    expect(updates).toEqual([
      expect.objectContaining({
        toolCallId: 'call-1',
        operationId: 'tool-call-1',
        toolName: 'slow_tool',
        args: { taskId: 'task-1' },
        event: expect.objectContaining({
          version: 1,
          operationId: 'tool-call-1',
          targetType: 'tool',
          source: 'tool',
          stage: 'prepare',
          status: 'running',
          message: 'Preparing work',
          current: 1,
          total: 2,
          percent: 50,
        }),
      }),
    ]);
  });
});
  • Step 2: Run the test and confirm it fails
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts

Expected: FAIL, because RuntimeProcessEvent, onToolProcessEvent, and onUpdate do not exist yet.

  • Step 3: Add the runtime process event helper

Create packages/backend/src/modules/netaclaw/runtime/process_events.ts:

import { randomUUID } from 'crypto';
import * as fs from 'fs';
import * as path from 'path';
import { pDataPath } from '../../../comm/path.js';

export type RuntimeProcessTargetType = 'tool' | 'skill' | 'subagent' | 'task' | 'runtime';
export type RuntimeProcessSource = 'skill' | 'tool' | 'subagent' | 'runtime';
export type RuntimeProcessStatus = 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped';
export type RuntimeProcessLevel = 'debug' | 'info' | 'warning' | 'error';

export interface RuntimeProcessEvent {
  version: 1;
  operationId: string;
  parentOperationId?: string;
  targetType: RuntimeProcessTargetType;
  source: RuntimeProcessSource;
  taskId?: string;
  stage?: string;
  status: RuntimeProcessStatus;
  level: RuntimeProcessLevel;
  message: string;
  detail?: string;
  current?: number;
  total?: number;
  percent?: number;
  payload?: Record<string, unknown>;
  timestamp: string;
  sequence?: number;
}

export type RuntimeProcessEventInput =
  Partial<Omit<RuntimeProcessEvent, 'version' | 'operationId' | 'targetType' | 'source' | 'status' | 'level' | 'message' | 'timestamp'>> & {
    operationId?: string;
    parentOperationId?: string;
    targetType?: RuntimeProcessTargetType;
    source?: RuntimeProcessSource;
    status?: RuntimeProcessStatus;
    level?: RuntimeProcessLevel;
    message: string;
    timestamp?: string;
  };

export interface RuntimeProcessLogRef {
  kind: 'runtime_process_log';
  sessionId: string;
  operationId: string;
  toolCallId?: string;
}

const SENSITIVE_KEY_PATTERN = /(api[_-]?key|token|password|secret|authorization|credential)/i;
const MAX_PAYLOAD_STRING_LENGTH = 500;

export function buildOperationId(toolCallId?: string): string {
  return toolCallId ? `tool-${toolCallId}` : `op-${randomUUID()}`;
}

export function normalizeRuntimeProcessEvent(input: RuntimeProcessEventInput): RuntimeProcessEvent {
  const total = typeof input.total === 'number' ? input.total : undefined;
  const current = typeof input.current === 'number' ? input.current : undefined;
  const percent = typeof input.percent === 'number'
    ? clampPercent(input.percent)
    : total && current !== undefined
      ? clampPercent(Math.round((current / total) * 100))
      : undefined;

  return {
    version: 1,
    operationId: input.operationId || buildOperationId(),
    parentOperationId: input.parentOperationId,
    targetType: input.targetType || 'runtime',
    source: input.source || 'runtime',
    taskId: input.taskId,
    stage: input.stage,
    status: input.status || 'running',
    level: input.level || 'info',
    message: input.message,
    detail: input.detail,
    current,
    total,
    percent,
    payload: sanitizeProcessPayload(input.payload),
    timestamp: input.timestamp || new Date().toISOString(),
    sequence: input.sequence,
  };
}

export function sanitizeProcessPayload(payload: unknown): Record<string, unknown> | undefined {
  if (!payload || typeof payload !== 'object' || Array.isArray(payload)) return undefined;
  const output: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(payload as Record<string, unknown>)) {
    if (SENSITIVE_KEY_PATTERN.test(key)) {
      output[key] = '[redacted]';
    } else if (typeof value === 'string') {
      output[key] = value.length > MAX_PAYLOAD_STRING_LENGTH
        ? `${value.slice(0, MAX_PAYLOAD_STRING_LENGTH)}...`
        : value;
    } else if (typeof value === 'number' || typeof value === 'boolean' || value === null) {
      output[key] = value;
    } else {
      output[key] = '[object]';
    }
  }
  return Object.keys(output).length > 0 ? output : undefined;
}

export function sampleProcessEventsForMetadata<T>(events: T[], limit = 50): T[] {
  if (events.length <= limit) return events;
  const headCount = Math.min(10, Math.floor(limit / 5));
  return [
    ...events.slice(0, headCount),
    ...events.slice(-(limit - headCount)),
  ];
}

export function buildProcessLogRef(input: {
  sessionId: string;
  operationId: string;
  toolCallId?: string;
}): RuntimeProcessLogRef {
  return {
    kind: 'runtime_process_log',
    sessionId: input.sessionId,
    operationId: input.operationId,
    toolCallId: input.toolCallId,
  };
}

export function appendRuntimeProcessLog(input: {
  sessionId: string;
  operationId: string;
  toolCallId?: string;
  event: RuntimeProcessEvent;
}): RuntimeProcessLogRef {
  const dir = path.join(pDataPath(), 'netaclaw-process-events', input.sessionId);
  fs.mkdirSync(dir, { recursive: true });
  const filename = `${safeFilePart(input.operationId)}.jsonl`;
  fs.appendFileSync(path.join(dir, filename), `${JSON.stringify(input.event)}\n`, 'utf8');
  return buildProcessLogRef(input);
}

function clampPercent(value: number): number {
  return Math.max(0, Math.min(100, value));
}

function safeFilePart(value: string): string {
  return value.replace(/[^a-zA-Z0-9_.-]/g, '_').slice(0, 120);
}
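
To make the normalization rules concrete, here is a small standalone sketch that mirrors the percent derivation and payload redaction above. It re-declares simplified copies so that it runs on its own; derivePercent and redact are illustrative names, not exports of process_events.ts.

```typescript
// Simplified, standalone copies of the rules in process_events.ts.
// derivePercent and redact are illustrative names only.
const SENSITIVE = /(api[_-]?key|token|password|secret|authorization|credential)/i;
const MAX_LEN = 500;

function derivePercent(current?: number, total?: number, explicit?: number): number | undefined {
  const clamp = (v: number) => Math.max(0, Math.min(100, v));
  if (typeof explicit === 'number') return clamp(explicit);
  // A total of 0 (or a missing total) yields no percent, avoiding division by zero.
  if (total && current !== undefined) return clamp(Math.round((current / total) * 100));
  return undefined;
}

function redact(payload: Record<string, unknown>): Record<string, unknown> {
  const out: Record<string, unknown> = {};
  for (const key of Object.keys(payload)) {
    const value = payload[key];
    if (SENSITIVE.test(key)) {
      out[key] = '[redacted]';
    } else if (typeof value === 'string') {
      out[key] = value.length > MAX_LEN ? `${value.slice(0, MAX_LEN)}...` : value;
    } else if (typeof value === 'number' || typeof value === 'boolean' || value === null) {
      out[key] = value;
    } else {
      out[key] = '[object]'; // nested objects and arrays are not serialized
    }
  }
  return out;
}

const half = derivePercent(1, 2);    // 50
const rounded = derivePercent(3, 8); // 38 (37.5 rounded)
const clamped = derivePercent(9, 8); // 100 (113 clamped)
const noTotal = derivePercent(1, 0); // undefined
const cleaned = redact({ apiKey: 'sk-123', frameCount: 361, nested: { a: 1 } });
// cleaned: { apiKey: '[redacted]', frameCount: 361, nested: '[object]' }
```

Note that redaction is keyed on the property name only, so a secret stored under an innocuous key would still pass through (truncated to 500 characters if it is a string).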
  • Step 4: Extend the tool execute contract

Modify packages/backend/src/modules/netaclaw/tools/common.ts.

Add the import:

import type { RuntimeProcessEventInput } from '../runtime/process_events.js';

Add the type:

export type ToolProcessUpdateCallback = (event: RuntimeProcessEventInput) => void;

Change AgentTool.execute to:

execute(id: string, params: Static<TParams>, onUpdate?: ToolProcessUpdateCallback): Promise<TResult>;
  • Step 5: Bridge tool updates in attempt

Modify packages/backend/src/modules/netaclaw/runtime/attempt.ts.

Add the imports:

import { AnyAgentTool, ToolResultContent, toolResultToText } from '../tools/common.js';
import { RuntimeProcessEvent, RuntimeProcessEventInput, buildOperationId, normalizeRuntimeProcessEvent } from './process_events.js';

Add to AttemptParams:

onToolProcessEvent?: (input: {
  toolCallId: string;
  operationId: string;
  toolName: string;
  args: Record<string, unknown>;
  event: RuntimeProcessEvent;
}) => void;

Destructure onToolProcessEvent in runAttempt().

Add before the tool.execute call:

const operationId = buildOperationId(tc.id);
const emitProcessEvent = (event: RuntimeProcessEventInput) => {
  // Spread the tool's event first so that keys it leaves undefined
  // cannot overwrite the defaults applied below.
  const normalized = normalizeRuntimeProcessEvent({
    ...event,
    operationId: event.operationId || operationId,
    targetType: event.targetType || 'tool',
    source: event.source || 'tool',
  });
  onToolProcessEvent?.({
    toolCallId: tc.id,
    operationId,
    toolName: tc.name,
    args: effectiveArgs,
    event: normalized,
  });
};

Change:

const result = await tool.execute(tc.id, effectiveArgs);

to:

const result = await tool.execute(tc.id, effectiveArgs, emitProcessEvent);
  • Step 6: Pass the callback through in agent

Modify packages/backend/src/modules/netaclaw/runtime/agent.ts.

Add the import:

import type { RuntimeProcessEvent } from './process_events.js';

Add to AgentRunParams:

onToolProcessEvent?: (input: {
  toolCallId: string;
  operationId: string;
  toolName: string;
  args: Record<string, unknown>;
  event: RuntimeProcessEvent;
}) => void;

Add to the runAttempt() arguments:

onToolProcessEvent: params.onToolProcessEvent,
  • Step 7: Run the test
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts

Expected: PASS.


Task 2: Skill subprocess process event parsing

Files:

  • Modify: packages/backend/src/modules/netaclaw/service/skill_executor.ts

  • Test: packages/backend/test/skill_process_events.test.ts

  • Step 1: Write the failing test first

Create packages/backend/test/skill_process_events.test.ts:

import { normalizeSkillProcessLine } from '../src/modules/netaclaw/service/skill_executor';

describe('skill process event parsing', () => {
  it('parses only JSON lines with an explicit type=process_event', () => {
    const event = normalizeSkillProcessLine(
      'vehicle-damage-inspection',
      JSON.stringify({
        type: 'process_event',
        time: '2026-05-06T04:07:17.187Z',
        stage: 'frames',
        message: 'frames extracted',
        taskId: 'inspection-1',
        frameCount: 361,
        duration: 120.38,
        resolution: '960x544',
      }),
      7,
    );

    expect(event).toEqual({
      source: 'skill',
      targetType: 'skill',
      taskId: 'inspection-1',
      stage: 'frames',
      status: 'running',
      level: 'info',
      message: 'frames extracted',
      timestamp: '2026-05-06T04:07:17.187Z',
      sequence: 7,
      payload: {
        frameCount: 361,
        duration: 120.38,
        resolution: '960x544',
      },
    });
  });

  it('derives current, total, and percent from batch/totalBatches', () => {
    const event = normalizeSkillProcessLine(
      'vehicle-damage-inspection',
      JSON.stringify({
        type: 'process_event',
        time: '2026-05-06T04:08:11.443Z',
        stage: 'detect',
        message: 'batch completed',
        batch: 3,
        totalBatches: 8,
        damageCount: 3,
      }),
      8,
    );

    expect(event).toEqual(expect.objectContaining({
      source: 'skill',
      targetType: 'skill',
      stage: 'detect',
      message: 'batch completed',
      current: 3,
      total: 8,
      percent: 38,
      payload: expect.objectContaining({ damageCount: 3 }),
    }));
  });

  it('ignores plain stderr', () => {
    expect(normalizeSkillProcessLine('demo', 'plain stderr', 1)).toBeNull();
  });

  it('ignores ordinary JSON without the process_event marker', () => {
    expect(normalizeSkillProcessLine('demo', '{"stage":"frames","message":"too broad"}', 1)).toBeNull();
  });
});
  • Step 2: Run the test and confirm it fails
pnpm --filter @neta/backend test -- --runInBand test/skill_process_events.test.ts

Expected: FAIL, because normalizeSkillProcessLine does not exist yet.

  • Step 3: Implement the parser

Add the import in packages/backend/src/modules/netaclaw/service/skill_executor.ts:

import type { RuntimeProcessEventInput } from '../runtime/process_events.js';

Extend SkillExecuteParams:

toolCallId?: string;
onProcessEvent?: (event: RuntimeProcessEventInput) => void;

Add before SkillExecutorService:

const RESERVED_PROCESS_KEYS = new Set([
  'type',
  'time',
  'timestamp',
  'stage',
  'message',
  'taskId',
  'level',
  'status',
  'current',
  'total',
  'percent',
  'batch',
  'totalBatches',
]);

export function normalizeSkillProcessLine(
  _skillName: string,
  line: string,
  sequence: number,
): RuntimeProcessEventInput | null {
  let parsed: Record<string, unknown>;
  try {
    parsed = JSON.parse(line);
  } catch {
    return null;
  }
  if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) return null;
  if (parsed.type !== 'process_event') return null;
  if (typeof parsed.message !== 'string' && typeof parsed.stage !== 'string') return null;

  const current = typeof parsed.current === 'number'
    ? parsed.current
    : typeof parsed.batch === 'number'
      ? parsed.batch
      : undefined;
  const total = typeof parsed.total === 'number'
    ? parsed.total
    : typeof parsed.totalBatches === 'number'
      ? parsed.totalBatches
      : undefined;
  const percent = typeof parsed.percent === 'number'
    ? parsed.percent
    : total && current !== undefined
      ? Math.max(0, Math.min(100, Math.round((current / total) * 100)))
      : undefined;

  const payload: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(parsed)) {
    if (!RESERVED_PROCESS_KEYS.has(key)) payload[key] = value;
  }

  return {
    source: 'skill',
    targetType: 'skill',
    taskId: typeof parsed.taskId === 'string' ? parsed.taskId : undefined,
    stage: typeof parsed.stage === 'string' ? parsed.stage : undefined,
    status: isProcessStatus(parsed.status) ? parsed.status : 'running',
    level: isProcessLevel(parsed.level) ? parsed.level : 'info',
    message: typeof parsed.message === 'string' ? parsed.message : String(parsed.stage || 'running'),
    current,
    total,
    percent,
    timestamp: typeof parsed.time === 'string'
      ? parsed.time
      : typeof parsed.timestamp === 'string'
        ? parsed.timestamp
        : new Date().toISOString(),
    sequence,
    payload: Object.keys(payload).length > 0 ? payload : undefined,
  };
}

function isProcessStatus(value: unknown): value is RuntimeProcessEventInput['status'] {
  return value === 'queued'
    || value === 'started'
    || value === 'running'
    || value === 'completed'
    || value === 'failed'
    || value === 'cancelled'
    || value === 'skipped';
}

function isProcessLevel(value: unknown): value is RuntimeProcessEventInput['level'] {
  return value === 'debug' || value === 'info' || value === 'warning' || value === 'error';
}
  • Step 4: Extract process events from stderr

Add in execute():

let processSequence = 0;

In the stderr line loop, after the logger call, add:

const processEvent = normalizeSkillProcessLine(skillName, line, ++processSequence);
if (processEvent) {
  params.onProcessEvent?.(processEvent);
}
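
On the skill side, this contract means a progress update is a single JSON line on stderr with type set to process_event. A minimal sketch of a formatter follows, assuming the skill is written in TypeScript; formatProcessEventLine and SkillProcessFields are hypothetical names, not an existing NetaClaw API, and only the line format comes from this plan.

```typescript
// Hypothetical skill-side helper; only the line format comes from the plan.
interface SkillProcessFields {
  stage: string;
  message: string;
  taskId?: string;
  batch?: number;
  totalBatches?: number;
  [extra: string]: unknown; // unreserved keys end up in the event payload
}

// Serialize one event as a single line. JSON.stringify escapes embedded
// newlines, so one event never spans several stderr lines.
function formatProcessEventLine(fields: SkillProcessFields, now: Date = new Date()): string {
  return JSON.stringify({ type: 'process_event', time: now.toISOString(), ...fields });
}

const line = formatProcessEventLine(
  { stage: 'detect', message: 'batch completed', batch: 3, totalBatches: 8, damageCount: 3 },
  new Date('2026-05-06T04:08:11.443Z'),
);

// In a Node.js skill, console.error(line) writes the line to stderr,
// which is the stream the executor scans for process events.
```

Because processSequence is incremented for every stderr line, not only for recognized events, the sequence numbers seen downstream can have gaps when ordinary log lines are interleaved.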
  • Step 5: Add terminal failure events

On timeout, emit:

params.onProcessEvent?.({
  source: 'runtime',
  targetType: 'skill',
  taskId: typeof input.taskId === 'string' ? input.taskId : undefined,
  stage: 'skill-timeout',
  status: 'failed',
  level: 'error',
  message: `Execution timed out after ${timeout}ms`,
});

On non-zero exit, spawn error, or stdout JSON parse failure, emit an event with the same status: 'failed', source: 'runtime', and targetType: 'skill'.
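
Since all of these failure paths share one shape, a small factory can keep them from drifting apart. This is a sketch under assumptions: buildSkillFailureEvent is a hypothetical helper, the stage names other than skill-timeout are invented for illustration, and FailureEventInput is a trimmed local copy of the fields of RuntimeProcessEventInput used here.

```typescript
// Trimmed local copy of the fields used here; the real type lives in process_events.ts.
interface FailureEventInput {
  source: 'runtime';
  targetType: 'skill';
  taskId?: string;
  stage: string;
  status: 'failed';
  level: 'error';
  message: string;
}

// Stage names other than 'skill-timeout' are assumptions made for this sketch.
type FailureStage = 'skill-timeout' | 'skill-exit' | 'skill-spawn-error' | 'skill-output-parse';

function buildSkillFailureEvent(
  stage: FailureStage,
  message: string,
  input: Record<string, unknown>,
): FailureEventInput {
  return {
    source: 'runtime',
    targetType: 'skill',
    // Only a string taskId is forwarded, matching the timeout example.
    taskId: typeof input.taskId === 'string' ? input.taskId : undefined,
    stage,
    status: 'failed',
    level: 'error',
    message,
  };
}

const exitEvent = buildSkillFailureEvent('skill-exit', 'Process exited with code 1', { taskId: 'task-1' });
const spawnEvent = buildSkillFailureEvent('skill-spawn-error', 'spawn ENOENT', { taskId: 42 });
```

Each call site would then reduce to params.onProcessEvent?.(buildSkillFailureEvent(...)).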

  • Step 6: Run the test
pnpm --filter @neta/backend test -- --runInBand test/skill_process_events.test.ts

Expected: PASS.


Task 3: execute_skill bridges skill process events

Files:

  • Modify: packages/backend/src/modules/netaclaw/tools/builtin/execute_skill.ts

  • Test: extend packages/backend/test/tool_process_events.test.ts

  • Step 1: Add the failing test

Append to packages/backend/test/tool_process_events.test.ts:

import { createExecuteSkillTool } from '../src/modules/netaclaw/tools/builtin/execute_skill';

describe('execute_skill process bridge', () => {
  it('forwards skill process events to the tool update callback', async () => {
    const executor = {
      execute: jest.fn(async (params: any) => {
        params.onProcessEvent({
          source: 'skill',
          targetType: 'skill',
          taskId: 'task-1',
          stage: 'frames',
          status: 'running',
          message: 'extracting frames',
          current: 1,
          total: 4,
        });
        return { success: true, output: { ok: true }, duration: 10 };
      }),
    } as any;
    const updates: any[] = [];
    const tool = createExecuteSkillTool(executor);

    await tool.execute(
      'call-skill-1',
      { name: 'vehicle-damage-inspection', input: { taskId: 'task-1' } },
      event => updates.push(event),
    );

    expect(executor.execute).toHaveBeenCalledWith(expect.objectContaining({
      skillName: 'vehicle-damage-inspection',
      input: { taskId: 'task-1' },
      toolCallId: 'call-skill-1',
    }));
    expect(updates).toEqual([
      expect.objectContaining({
        source: 'skill',
        targetType: 'skill',
        taskId: 'task-1',
        stage: 'frames',
        message: 'extracting frames',
        current: 1,
        total: 4,
        percent: 25,
      }),
    ]);
  });
});
  • Step 2: Run the test and confirm it fails
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts
  • Step 3: Modify execute_skill

In packages/backend/src/modules/netaclaw/tools/builtin/execute_skill.ts, change execute to accept a third parameter:

async execute(id, params, onUpdate) {

Change the executor call to:

const result = await executor.execute({
  skillName: params.name,
  input: params.input as Record<string, unknown>,
  toolCallId: id,
  onProcessEvent: event => {
    const total = typeof event.total === 'number' ? event.total : undefined;
    const current = typeof event.current === 'number' ? event.current : undefined;
    onUpdate?.({
      ...event,
      source: event.source || 'skill', // keep 'runtime' on failure events emitted by the executor
      targetType: event.targetType || 'skill',
      percent: typeof event.percent === 'number'
        ? event.percent
        : total && current !== undefined
          ? Math.max(0, Math.min(100, Math.round((current / total) * 100)))
          : undefined,
    });
  },
});
  • Step 4: Run the tests
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts test/skill_process_events.test.ts

Expected: PASS.


Task 4: Gateway protocol, throttling, metadata summary, and JSONL logging

Files:

  • Modify: packages/backend/src/modules/netaclaw/runtime/process_events.ts

  • Modify: packages/backend/src/modules/netaclaw/gateway/protocol.ts

  • Modify: packages/backend/src/modules/netaclaw/gateway/server.ts

  • Test: packages/backend/test/process_event_buffer.test.ts

  • Step 1: Write the lightweight buffer test

Create packages/backend/test/process_event_buffer.test.ts:

import { ProcessEventBuffer } from '../src/modules/netaclaw/runtime/process_events';
import type { RuntimeProcessEvent } from '../src/modules/netaclaw/runtime/process_events';

function event(overrides: Partial<RuntimeProcessEvent>): RuntimeProcessEvent {
  return {
    version: 1,
    operationId: 'tool-call-1',
    targetType: 'skill',
    source: 'skill',
    status: 'running',
    level: 'info',
    message: 'batch completed',
    timestamp: '2026-05-06T04:00:00.000Z',
    ...overrides,
  };
}

describe('ProcessEventBuffer', () => {
  it('coalesces repeated process events and flushes the latest one', () => {
    const emitted: RuntimeProcessEvent[] = [];
    const buffer = new ProcessEventBuffer(e => emitted.push(e), { throttleMs: 500 });

    buffer.push(event({ operationId: 'op-1', stage: 'detect', sequence: 1 }));
    buffer.push(event({ operationId: 'op-1', stage: 'detect', sequence: 2 }));
    buffer.flushAll();

    expect(emitted).toHaveLength(1);
    expect(emitted[0]).toEqual(expect.objectContaining({
      operationId: 'op-1',
      stage: 'detect',
      sequence: 2,
      payload: expect.objectContaining({ repeatCount: 2 }),
    }));
  });

  it('sends terminal and error events immediately', () => {
    const emitted: RuntimeProcessEvent[] = [];
    const buffer = new ProcessEventBuffer(e => emitted.push(e), { throttleMs: 500 });

    buffer.push(event({ operationId: 'op-1', status: 'failed', level: 'error', message: 'failed' }));

    expect(emitted).toHaveLength(1);
    expect(emitted[0]).toEqual(expect.objectContaining({ status: 'failed', message: 'failed' }));
  });
});
  • Step 2: Run the test and confirm it fails
pnpm --filter @neta/backend test -- --runInBand test/process_event_buffer.test.ts
  • Step 3: Add the lightweight buffer to process_events.ts

Append to packages/backend/src/modules/netaclaw/runtime/process_events.ts:

export interface ProcessEventBufferOptions {
  throttleMs?: number;
}

interface BufferedEvent {
  event: RuntimeProcessEvent;
  repeatCount: number;
  timer?: NodeJS.Timeout;
}

export class ProcessEventBuffer {
  private readonly throttleMs: number;
  private readonly pending = new Map<string, BufferedEvent>();

  constructor(
    private readonly emit: (event: RuntimeProcessEvent) => void,
    options: ProcessEventBufferOptions = {},
  ) {
    this.throttleMs = options.throttleMs ?? 400;
  }

  push(event: RuntimeProcessEvent) {
    if (isTerminalProcessEvent(event)) {
      this.flushOperation(event.operationId);
      this.emit(event);
      return;
    }

    const key = `${event.operationId}:${event.stage || ''}:${event.message}`;
    const existing = this.pending.get(key);
    if (existing) {
      existing.event = event;
      existing.repeatCount += 1;
      return;
    }

    const buffered: BufferedEvent = {
      event,
      repeatCount: 1,
      timer: setTimeout(() => this.flushKey(key), this.throttleMs),
    };
    this.pending.set(key, buffered);
  }

  flushAll() {
    for (const key of [...this.pending.keys()]) {
      this.flushKey(key);
    }
  }

  private flushOperation(operationId: string) {
    for (const [key, buffered] of [...this.pending.entries()]) {
      if (buffered.event.operationId === operationId) {
        this.flushKey(key);
      }
    }
  }

  private flushKey(key: string) {
    const buffered = this.pending.get(key);
    if (!buffered) return;
    if (buffered.timer) clearTimeout(buffered.timer);
    this.pending.delete(key);
    this.emit({
      ...buffered.event,
      payload: buffered.repeatCount > 1
        ? { ...(buffered.event.payload || {}), repeatCount: buffered.repeatCount }
        : buffered.event.payload,
    });
  }
}

function isTerminalProcessEvent(event: RuntimeProcessEvent): boolean {
  return event.status === 'completed'
    || event.status === 'failed'
    || event.status === 'cancelled'
    || event.status === 'skipped'
    || event.level === 'error';
}
  • Step 4: Add the WebSocket protocol

Add to packages/backend/src/modules/netaclaw/gateway/protocol.ts:

export interface ServerToolProcessEvent {
  type: 'tool_process_event';
  sessionId: string;
  data: {
    version: 1;
    operationId: string;
    parentOperationId?: string;
    targetType: 'tool' | 'skill' | 'subagent' | 'task' | 'runtime';
    toolCallId: string;
    assistantEntryId?: string;
    toolName: string;
    toolLabel?: string;
    source: 'skill' | 'tool' | 'subagent' | 'runtime';
    taskId?: string;
    stage?: string;
    status: 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped';
    level?: 'debug' | 'info' | 'warning' | 'error';
    message: string;
    detail?: string;
    current?: number;
    total?: number;
    percent?: number;
    payload?: Record<string, unknown>;
    timestamp: string;
    sequence: number;
  };
}

Add ServerToolProcessEvent to the ServerEvent union.
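
For orientation, a message on the wire might look like the following. The values are illustrative (mostly borrowed from the test fixtures in this plan), and the local ToolProcessEventMessage type is a trimmed stand-in for ServerToolProcessEvent so that the snippet compiles on its own.

```typescript
// Trimmed stand-in for ServerToolProcessEvent; some optional fields are omitted.
interface ToolProcessEventMessage {
  type: 'tool_process_event';
  sessionId: string;
  data: {
    version: 1;
    operationId: string;
    targetType: 'tool' | 'skill' | 'subagent' | 'task' | 'runtime';
    toolCallId: string;
    toolName: string;
    source: 'skill' | 'tool' | 'subagent' | 'runtime';
    stage?: string;
    status: 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped';
    message: string;
    current?: number;
    total?: number;
    percent?: number;
    payload?: Record<string, unknown>;
    timestamp: string;
    sequence: number;
  };
}

// Illustrative values only; 'session-1' and 'call-skill-1' are made up.
const exampleMessage: ToolProcessEventMessage = {
  type: 'tool_process_event',
  sessionId: 'session-1',
  data: {
    version: 1,
    operationId: 'tool-call-skill-1', // buildOperationId prefixes the toolCallId with "tool-"
    targetType: 'skill',
    toolCallId: 'call-skill-1',
    toolName: 'execute_skill',
    source: 'skill',
    stage: 'detect',
    status: 'running',
    message: 'batch completed',
    current: 3,
    total: 8,
    percent: 38,
    payload: { damageCount: 3 },
    timestamp: '2026-05-06T04:08:11.443Z',
    sequence: 8,
  },
};

const wireText = JSON.stringify(exampleMessage);
```

The frontend matches the event to a streaming tool through data.toolCallId, so that field has to be present on every message the gateway sends.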

  • Step 5: Gateway sends live events and stores the metadata summary

Add the import in packages/backend/src/modules/netaclaw/gateway/server.ts:

import {
  ProcessEventBuffer,
  RuntimeProcessEvent,
  RuntimeProcessLogRef,
  appendRuntimeProcessLog,
  buildProcessLogRef,
  sampleProcessEventsForMetadata,
} from '../runtime/process_events.js';

Extend the toolExecutions item:

processEvents?: RuntimeProcessEvent[];
processLogRef?: RuntimeProcessLogRef;

Add to each item in buildToolExecutionMetadata():

latestProcessEvent: te.processEvents?.at(-1),
processEvents: sampleProcessEventsForMetadata(te.processEvents || [], 50),
processLogRef: te.processLogRef,
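
The sampling keeps the start of a run plus its most recent tail. A standalone copy of sampleProcessEventsForMetadata, applied to 120 numbered events, shows which ones survive with the default limit of 50.

```typescript
// Standalone copy of the helper from process_events.ts, for demonstration.
function sampleProcessEventsForMetadata<T>(events: T[], limit = 50): T[] {
  if (events.length <= limit) return events;
  const headCount = Math.min(10, Math.floor(limit / 5));
  return [...events.slice(0, headCount), ...events.slice(-(limit - headCount))];
}

// 120 fake events, identified by their index.
const all: number[] = [];
for (let i = 0; i < 120; i++) all.push(i);

const sampled = sampleProcessEventsForMetadata(all);
// sampled holds indices 0..9 (head) followed by 80..119 (tail): 50 items in total.
```

The head is only preserved if the list passed in still contains it. The onToolProcessEvent handler in this step trims te.processEvents to the last 50 entries, so with that handler the sampling call returns its input unchanged.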

Create the buffer inside the runner:

const processEventBuffer = new ProcessEventBuffer(event => {
  this.send({
    type: 'tool_process_event',
    sessionId: sid,
    data: event as any,
  });
});

Add to the runAgent call:

onToolProcessEvent: ({ toolCallId, operationId, toolName, event }) => {
  // randomUUID must be imported from 'crypto' if server.ts does not already do so.
  const projectionId = toolCallId || randomUUID();
  const te = [...toolExecutions].reverse().find(t => t.toolCallId === projectionId)
    || [...toolExecutions].reverse().find(t => t.name === toolName && t.status === 'running');
  // Continue from the last stored sequence; processEvents is capped at 50,
  // so its length alone would stop increasing after the 50th event.
  const sequence = (te?.processEvents?.at(-1)?.sequence ?? 0) + 1;
  const processEvent: RuntimeProcessEvent = {
    ...event,
    sequence,
  };

  if (te) {
    te.processEvents = [...(te.processEvents || []), processEvent].slice(-50);
    te.processLogRef = appendRuntimeProcessLog({
      sessionId: sid,
      operationId: processEvent.operationId || operationId,
      toolCallId: projectionId,
      event: processEvent,
    });
  }

  // Attach the projection fields so the frontend can match the event to its tool.
  processEventBuffer.push({
    ...processEvent,
    toolCallId: projectionId,
    toolName,
  } as RuntimeProcessEvent & {
    toolCallId: string;
    assistantEntryId?: string;
    toolName: string;
    toolLabel?: string;
  });
},

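Note: if TypeScript does not accept the extra fields, build a separate object for the outgoing payload instead of polluting the base RuntimeProcessEvent type.

  • Step 6: Emit a terminal event on failure

In failRunningTools(), append a process event with status: 'failed' for every running tool and call processEventBuffer.push(...).

Use the following for the logical reference: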

te.processLogRef = buildProcessLogRef({
  sessionId: sid,
  operationId: `tool-${te.toolCallId}`,
  toolCallId: te.toolCallId,
});

Do not write the local absolute path of the JSONL file into the metadata.
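
If a later phase needs to read the log back, the backend can turn the logical reference into a path internally. The sketch below is hypothetical: resolveProcessLogRelativePath is not part of this plan. It mirrors the directory layout and the safeFilePart rule used by appendRuntimeProcessLog, and assumes session IDs consist only of letters, digits, underscores, and hyphens, rejecting anything else.

```typescript
interface RuntimeProcessLogRef {
  kind: 'runtime_process_log';
  sessionId: string;
  operationId: string;
  toolCallId?: string;
}

// Same sanitization rule as safeFilePart in process_events.ts.
function safeFilePart(value: string): string {
  return value.replace(/[^a-zA-Z0-9_.-]/g, '_').slice(0, 120);
}

// Hypothetical backend-only helper: maps a logical ref to a path relative to
// the data directory. The caller would join it with pDataPath() on the server.
function resolveProcessLogRelativePath(ref: RuntimeProcessLogRef): string {
  // Assumption: session IDs never contain dots or separators, so values such
  // as ".." or "a/b" coming from a client are rejected outright.
  if (!/^[a-zA-Z0-9_-]+$/.test(ref.sessionId)) {
    throw new Error('invalid sessionId in process log ref');
  }
  return ['netaclaw-process-events', ref.sessionId, `${safeFilePart(ref.operationId)}.jsonl`].join('/');
}

const relPath = resolveProcessLogRelativePath({
  kind: 'runtime_process_log',
  sessionId: 's1',
  operationId: 'tool-call/1',
});
// relPath: 'netaclaw-process-events/s1/tool-call_1.jsonl'
```

Keeping this mapping on the server is what lets processLogRef stay a purely logical value in the frontend payload.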

  • Step 7: Flush pending events

After runAgent completes normally and before the final metadata is built, call:

processEventBuffer.flushAll();

Also call it in the catch block, after failRunningTools(...).

  • Step 8: Run the tests and the build
pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts test/skill_process_events.test.ts test/process_event_buffer.test.ts
pnpm --filter @neta/backend build

Expected: PASS.


Task 5: Frontend types and chat store projection

Files:

  • Modify: packages/frontend/src/modules/agent/types/index.d.ts

  • Modify: packages/frontend/src/modules/agent/store/chat.ts

  • Step 1: Add the frontend types

Add to packages/frontend/src/modules/agent/types/index.d.ts:

export interface RuntimeProcessLogRef {
	kind: 'runtime_process_log';
	sessionId: string;
	operationId: string;
	toolCallId?: string;
}

export interface RuntimeProcessEventProjection {
	version: 1;
	operationId: string;
	parentOperationId?: string;
	targetType: 'tool' | 'skill' | 'subagent' | 'task' | 'runtime';
	source: 'skill' | 'tool' | 'subagent' | 'runtime';
	taskId?: string;
	stage?: string;
	status: 'queued' | 'started' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped';
	level?: 'debug' | 'info' | 'warning' | 'error';
	message: string;
	detail?: string;
	current?: number;
	total?: number;
	percent?: number;
	payload?: Record<string, unknown>;
	timestamp: string;
	sequence: number;
}

Add to StreamingToolProjection:

latestProcessEvent?: RuntimeProcessEventProjection;
processEvents?: RuntimeProcessEventProjection[];
processLogRef?: RuntimeProcessLogRef;

Add 'tool_process_event' to WSServerEvent.type.

  • Step 2: Restore process events from metadata

In toStreamingToolsFromEntry() in packages/frontend/src/modules/agent/store/chat.ts, add to each projected tool:

latestProcessEvent: normalizeToolProcessEvent(item.latestProcessEvent),
processEvents: normalizeToolProcessEvents(item.processEvents),
processLogRef: normalizeProcessLogRef(item.processLogRef),

Add the helpers:

function normalizeToolProcessEvents(value: unknown): import('../types/index.d').RuntimeProcessEventProjection[] | undefined {
	if (!Array.isArray(value)) return undefined;
	const events = value
		.map(normalizeToolProcessEvent)
		.filter((item): item is import('../types/index.d').RuntimeProcessEventProjection => Boolean(item));
	return events.length > 0 ? events.slice(-50) : undefined;
}

function normalizeToolProcessEvent(value: unknown): import('../types/index.d').RuntimeProcessEventProjection | undefined {
	if (!value || typeof value !== 'object' || Array.isArray(value)) return undefined;
	const record = value as Record<string, any>;
	if (typeof record.message !== 'string') return undefined;
	return {
		version: 1,
		operationId: typeof record.operationId === 'string' ? record.operationId : `legacy-${record.toolCallId || record.sequence || Date.now()}`,
		parentOperationId: typeof record.parentOperationId === 'string' ? record.parentOperationId : undefined,
		targetType: ['tool', 'skill', 'subagent', 'task', 'runtime'].includes(record.targetType) ? record.targetType : 'tool',
		source: ['skill', 'tool', 'subagent', 'runtime'].includes(record.source) ? record.source : 'tool',
		taskId: typeof record.taskId === 'string' ? record.taskId : undefined,
		stage: typeof record.stage === 'string' ? record.stage : undefined,
		status: ['queued', 'started', 'running', 'completed', 'failed', 'cancelled', 'skipped'].includes(record.status) ? record.status : 'running',
		level: ['debug', 'info', 'warning', 'error'].includes(record.level) ? record.level : undefined,
		message: record.message,
		detail: typeof record.detail === 'string' ? record.detail : undefined,
		current: typeof record.current === 'number' ? record.current : undefined,
		total: typeof record.total === 'number' ? record.total : undefined,
		percent: typeof record.percent === 'number' ? record.percent : undefined,
		payload: record.payload && typeof record.payload === 'object' && !Array.isArray(record.payload) ? record.payload : undefined,
		timestamp: typeof record.timestamp === 'string' ? record.timestamp : new Date().toISOString(),
		sequence: typeof record.sequence === 'number' ? record.sequence : 0,
	};
}

function normalizeProcessLogRef(value: unknown): import('../types/index.d').RuntimeProcessLogRef | undefined {
	if (!value || typeof value !== 'object' || Array.isArray(value)) return undefined;
	const record = value as Record<string, any>;
	if (record.kind !== 'runtime_process_log') return undefined;
	if (typeof record.sessionId !== 'string' || typeof record.operationId !== 'string') return undefined;
	return {
		kind: 'runtime_process_log',
		sessionId: record.sessionId,
		operationId: record.operationId,
		toolCallId: typeof record.toolCallId === 'string' ? record.toolCallId : undefined,
	};
}
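The normalizers above repeat the same pattern several times: accept a value only if it is on an allow-list, otherwise fall back to a safe default. A minimal sketch of that pattern as a reusable function is shown below. `pickEnum` and the two constant arrays are hypothetical names introduced for illustration; the plan itself inlines the checks.

```typescript
// Hypothetical helper, not part of the plan's code: return `value` when it is
// one of the allowed string literals, otherwise return the fallback.
function pickEnum<T extends string>(value: unknown, allowed: readonly T[], fallback: T): T {
  return typeof value === 'string' && (allowed as readonly string[]).includes(value)
    ? (value as T)
    : fallback;
}

const PROCESS_STATUSES = ['queued', 'started', 'running', 'completed', 'failed', 'cancelled', 'skipped'] as const;
const PROCESS_TARGET_TYPES = ['tool', 'skill', 'subagent', 'task', 'runtime'] as const;

// An unknown status collapses to 'running', matching the default used above.
const normalizedStatus = pickEnum('finished', PROCESS_STATUSES, 'running');
// A known target type passes through unchanged.
const normalizedTargetType = pickEnum('skill', PROCESS_TARGET_TYPES, 'tool');
```

Falling back instead of rejecting the whole event keeps older or slightly malformed metadata renderable, at the cost of hiding the original bad value.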
  • Step 3: Handle live tool_process_event messages

In handleWSEvent(), add:

case 'tool_process_event':
	if (event.data?.toolCallId) {
		const processEvent = normalizeToolProcessEvent(event.data);
		if (processEvent) {
			const existing = streamingTools.value.find(item => item.toolCallId === event.data.toolCallId);
			const processEvents = [...(existing?.processEvents || []), processEvent].slice(-50);
			upsertStreamingTool({
				toolCallId: event.data.toolCallId,
				assistantEntryId: event.data.assistantEntryId,
				toolName: event.data.toolName,
				label: event.data.toolLabel,
				status: existing?.status || 'running',
				args: existing?.args,
				result: existing?.result,
				rawResult: existing?.rawResult,
				runtimeRoute: existing?.runtimeRoute,
				workerRoutingHint: existing?.workerRoutingHint,
				blockedReason: existing?.blockedReason,
				policySources: existing?.policySources,
				processEvents,
				latestProcessEvent: processEvent,
				processLogRef: existing?.processLogRef,
			});
		}
	}
	break;
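The handler above keeps at most the 50 most recent events per tool card. That bounded append can be sketched as a small pure function, which is easier to unit test than the store code. `appendBounded` and `MAX_PROCESS_EVENTS` are hypothetical names; the plan inlines `slice(-50)` instead.

```typescript
// Assumed cap, mirroring the slice(-50) calls in the plan.
const MAX_PROCESS_EVENTS = 50;

// Append `next` and keep only the newest `limit` items, without mutating `existing`.
function appendBounded<T>(existing: readonly T[] | undefined, next: T, limit: number = MAX_PROCESS_EVENTS): T[] {
  // slice(-0) is slice(0) and would return everything, so guard non-positive limits.
  if (limit <= 0) return [];
  return [...(existing ?? []), next].slice(-limit);
}

// 50 earlier events with sequence 0..49, then one more arrives.
const previousEvents = Array.from({ length: 50 }, (_, i) => ({ sequence: i }));
const updatedEvents = appendBounded(previousEvents, { sequence: 50 });
```

In this sketch the oldest event is dropped first, so the card always shows the tail of the process, while the full history stays in the backend JSONL log.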
  • Step 4: Run the frontend type check
pnpm --filter @neta/frontend type-check

Expected: PASS. If unrelated pre-existing errors appear, record them and run pnpm --filter @neta/frontend build as the final check.


Task 6: Frontend process timeline UI

Files:

  • Add: packages/frontend/src/modules/agent/components/tool-process-timeline.vue

  • Modify: packages/frontend/src/modules/agent/components/skill-card.vue

  • Modify: the parent component or renderer that actually renders streamingTools

  • Step 1: Add a standalone timeline component

Create packages/frontend/src/modules/agent/components/tool-process-timeline.vue:

<template>
  <div v-if="events.length" class="tool-process">
    <el-button text size="small" @click="showProcess = !showProcess">
      {{ showProcess ? '收起过程' : `查看过程 ${events.length}` }}
    </el-button>
    <el-collapse-transition>
      <ol v-if="showProcess" class="tool-process__timeline">
        <li
          v-for="event in events"
          :key="`${event.operationId}-${event.sequence}-${event.timestamp}`"
          class="tool-process__item"
          :class="[`is-${event.level || 'info'}`]"
        >
          <span class="tool-process__dot" />
          <span class="tool-process__main">
            <span class="tool-process__message">{{ event.message }}</span>
            <span v-if="event.current !== undefined && event.total !== undefined" class="tool-process__count">
              {{ event.current }}/{{ event.total }}
            </span>
          </span>
        </li>
      </ol>
    </el-collapse-transition>
  </div>
</template>

<script lang="ts" setup>
import { computed, ref } from 'vue';
import type { RuntimeProcessEventProjection } from '../types/index.d';

const props = defineProps<{
  events?: RuntimeProcessEventProjection[];
}>();

const showProcess = ref(false);
const events = computed(() => props.events || []);
</script>

<style lang="scss" scoped>
.tool-process {
  margin-top: 6px;
}

.tool-process__timeline {
  list-style: none;
  margin: 8px 0 0;
  padding: 0;
  display: grid;
  gap: 6px;
}

.tool-process__item {
  display: grid;
  grid-template-columns: 10px 1fr;
  gap: 8px;
  align-items: start;
  color: var(--el-text-color-secondary);
  font-size: 12px;
  line-height: 1.45;
}

.tool-process__dot {
  width: 6px;
  height: 6px;
  margin-top: 6px;
  border-radius: 50%;
  background: var(--el-color-primary);
}

.tool-process__item.is-warning .tool-process__dot {
  background: var(--el-color-warning);
}

.tool-process__item.is-error .tool-process__dot {
  background: var(--el-color-danger);
}

.tool-process__main {
  min-width: 0;
  display: flex;
  gap: 6px;
  align-items: baseline;
}

.tool-process__message {
  min-width: 0;
  overflow-wrap: anywhere;
}

.tool-process__count {
  flex: none;
  color: var(--el-text-color-placeholder);
}
</style>
  • Step 2: Update skill-card

In skill-card.vue, add:

import type { RuntimeProcessEventProjection } from '../types/index.d';
import ToolProcessTimeline from './tool-process-timeline.vue';

Add to props:

latestProcessEvent?: RuntimeProcessEventProjection;
processEvents?: RuntimeProcessEventProjection[];

Change the running section to:

<transition name="fade">
  <div v-if="status === 'running' || latestProcessEvent" class="skill-card__progress">
    <el-progress
      v-if="typeof latestProcessEvent?.percent === 'number' || typeof percent === 'number'"
      :percentage="latestProcessEvent?.percent ?? percent ?? 0"
      :stroke-width="4"
      :show-text="false"
    />
    <span v-if="latestProcessEvent?.message || step || detail" class="skill-card__step">
      {{ latestProcessEvent?.message || step || detail }}
    </span>
    <span v-if="latestProcessEvent?.detail" class="skill-card__detail">
      {{ latestProcessEvent.detail }}
    </span>
  </div>
</transition>

<ToolProcessTimeline :events="processEvents" />

Add styles:

.skill-card__detail {
  display: block;
  margin-top: 2px;
  font-size: 12px;
  color: var(--el-text-color-placeholder);
  line-height: 1.45;
}
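The progress bar binding in the template falls back from the event's percent to the legacy percent prop and then to 0. If an event ever arrives with only current and total, or with an out-of-range percent, a small resolver could normalize the value before it reaches el-progress, which expects a percentage between 0 and 100. The sketch below is an assumption rather than part of the plan: `resolvePercent` and `ProgressLike` are hypothetical names, and the backend helper may already guarantee a valid percent.

```typescript
// Hypothetical shape: the subset of a process event that describes progress.
interface ProgressLike {
  percent?: number;
  current?: number;
  total?: number;
}

// Prefer an explicit percent, then current/total, then the legacy fallback.
// The result is always an integer clamped to the 0..100 range.
function resolvePercent(progress: ProgressLike = {}, fallback?: number): number {
  const { percent, current, total } = progress;
  let raw: number | undefined = percent;
  if (raw === undefined && current !== undefined && total !== undefined && total > 0) {
    raw = (current / total) * 100;
  }
  if (raw === undefined) raw = fallback;
  if (raw === undefined || Number.isNaN(raw)) return 0;
  return Math.min(100, Math.max(0, Math.round(raw)));
}
```

A computed property in skill-card.vue could call this with latestProcessEvent and the existing percent prop, so the template binds to a single number.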
  • Step 3: Confirm that the parent component passes the props

Search:

rg -n "<skill-card|SkillCard|streamingTools" packages/frontend/src/modules/agent

Make sure the place that finally renders the tool card passes:

:latest-process-event="tool.latestProcessEvent"
:process-events="tool.processEvents"

If rendering goes through the renderer registry, make sure the renderer preserves:

latestProcessEvent: tool.latestProcessEvent,
processEvents: tool.processEvents,
processLogRef: tool.processLogRef,
  • Step 4: Run the build
pnpm --filter @neta/frontend build

Expected: PASS.


Task 7: End-to-end verification

Files:

  • No new source files are required.

  • Use the existing vehicle-damage-inspection skill.

  • Step 1: Run the backend tests

pnpm --filter @neta/backend test -- --runInBand test/tool_process_events.test.ts test/skill_process_events.test.ts test/process_event_buffer.test.ts

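Expected: PASS.

  • Step 2: Run the backend build
pnpm --filter @neta/backend build

Expected: PASS.

  • Step 3: Run the frontend build
pnpm --filter @neta/frontend build

Expected: PASS.

  • Step 4: Manually verify the old-damage detection process

Start the project: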

pnpm dev

In the Agent chat page, enter the following prompt (Chinese for "Please check whether there is any old damage"):

请检查是否有旧伤

Expected:

  • The execute_skill tool card appears.

  • While old-damage detection runs, the card keeps showing the current stage, for example:

    • creating workspace
    • extracting frames
    • frames extracted
    • batch started
    • batch completed
    • grounding damage candidates
    • selecting best frames
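  • After expanding the process view, the sampled process timeline is visible.

  • The assistant metadata contains latestProcessEvent, processEvents, and a logical processLogRef.

  • processLogRef does not contain a local absolute path.

  • A complete JSONL process log exists under the backend data directory.

  • After a page refresh, a completed session still shows the process summary saved in metadata.

  • The model's final reply and the tool result are unaffected.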
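The "no local absolute path" expectation above can be checked mechanically instead of by eye. The following is a minimal sketch of such a check, meant for a test or a debug console. `findAbsolutePaths` and `ABSOLUTE_PATH` are hypothetical names, and the pattern only catches string values that start like a POSIX, Windows drive, or UNC path; a path embedded in the middle of a longer message would not be detected.

```typescript
// Matches strings that begin like "/var/...", "C:\..." or "\\server\...".
const ABSOLUTE_PATH = /^(?:\/|[A-Za-z]:[\\/]|\\\\)/;

// Walk any JSON-like value and collect the locations of path-like strings.
function findAbsolutePaths(value: unknown, trail: string = '$', hits: string[] = []): string[] {
  if (typeof value === 'string') {
    if (ABSOLUTE_PATH.test(value)) hits.push(trail);
  } else if (Array.isArray(value)) {
    value.forEach((item, i) => findAbsolutePaths(item, `${trail}[${i}]`, hits));
  } else if (value !== null && typeof value === 'object') {
    const record = value as Record<string, unknown>;
    for (const key of Object.keys(record)) {
      findAbsolutePaths(record[key], `${trail}.${key}`, hits);
    }
  }
  return hits;
}

// A logical reference like the one normalizeProcessLogRef() accepts yields no hits.
const logicalRefHits = findAbsolutePaths({
  kind: 'runtime_process_log',
  sessionId: 's1',
  operationId: 'op1',
});
```

Running the same function over the whole assistant metadata object would also flag any path that leaked through an event payload.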

  • Step 5: Check the model context boundary

Inspect buildLLMMessages and the logic that builds the session-tree runtime context, and confirm that:

latestProcessEvent
processEvents
processLogRef

are not injected into the LLM history.
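One way to make this boundary testable is a defensive filter that drops the three process-only fields from any metadata-shaped value before it could reach message building. The sketch below is illustrative only: `omitProcessFields` is a hypothetical name, the metadata shape is assumed, and this is not a description of how buildLLMMessages actually works.

```typescript
// The three fields that must stay out of the LLM context.
const PROCESS_ONLY_KEYS = ['latestProcessEvent', 'processEvents', 'processLogRef'];

// Return a deep copy of a JSON-like value with the process-only keys removed.
function omitProcessFields(value: unknown): unknown {
  if (Array.isArray(value)) return value.map(item => omitProcessFields(item));
  if (value !== null && typeof value === 'object') {
    const source = value as Record<string, unknown>;
    const result: Record<string, unknown> = {};
    for (const key of Object.keys(source)) {
      if (PROCESS_ONLY_KEYS.indexOf(key) === -1) {
        result[key] = omitProcessFields(source[key]);
      }
    }
    return result;
  }
  return value;
}

// Assumed metadata shape: a list of projected tools with process fields attached.
const sampleMetadata = {
  tools: [
    {
      toolCallId: 't1',
      result: 'ok',
      latestProcessEvent: { message: 'selecting best frames' },
      processEvents: [{ message: 'extracting frames' }],
      processLogRef: { kind: 'runtime_process_log', sessionId: 's1', operationId: 'op1' },
    },
  ],
};
const llmSafeMetadata = omitProcessFields(sampleMetadata);
```

A Jest test could assert that the serialized LLM messages never contain any of the three key names, which catches regressions even if the filter is bypassed.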

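  • Step 6: Verify concurrent correlation

If you can trigger multiple tools or multiple skill calls within the same turn, verify that:

  • Process events are merged into the correct tool card by toolCallId.
  • Calls to the same skill do not get their events crossed.
  • Every event carries a stable version=1, operationId, targetType, and timestamp.
  • On failure, the card receives a failed process event and leaves the running state.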
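The per-event checks in this step can be scripted against events captured from the WebSocket or copied out of the JSONL log. Below is a minimal sketch under that assumption; `ObservedProcessEvent`, `missingStableFields`, and `countByToolCallId` are hypothetical names, and only the fields listed above are inspected.

```typescript
// Hypothetical loose shape for an event captured during manual verification.
interface ObservedProcessEvent {
  toolCallId?: string;
  version?: unknown;
  operationId?: unknown;
  targetType?: unknown;
  timestamp?: unknown;
}

// List which of the required stable fields are missing or malformed.
function missingStableFields(observed: ObservedProcessEvent): string[] {
  const missing: string[] = [];
  if (observed.version !== 1) missing.push('version');
  if (typeof observed.operationId !== 'string' || observed.operationId === '') missing.push('operationId');
  if (typeof observed.targetType !== 'string' || observed.targetType === '') missing.push('targetType');
  if (typeof observed.timestamp !== 'string' || Number.isNaN(Date.parse(observed.timestamp))) missing.push('timestamp');
  return missing;
}

// Count events per toolCallId; events without one are grouped under '(missing)'.
function countByToolCallId(observedEvents: readonly ObservedProcessEvent[]): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const observed of observedEvents) {
    const key = observed.toolCallId || '(missing)';
    counts[key] = (counts[key] || 0) + 1;
  }
  return counts;
}
```

If two concurrent calls to the same skill show up under a single toolCallId, or any events land under '(missing)', the correlation is broken somewhere between the skill bridge and the gateway.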

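Self-check list

  • The first version only makes the execute_skill process visible; it is not expanded into a full task center.
  • The event structure supports future subagent/tool/background-task sources, but they are not forced in during this phase.
  • processLogRef is a logical reference and does not expose local absolute paths.
  • Process events live only in metadata, UI replay, and logs; they do not enter the LLM context.
  • Payloads have sensitive fields redacted and long strings truncated.
  • stderr parsing only accepts type: "process_event" and does not misparse ordinary JSON logs.
  • The gateway keeps only the necessary bridging logic; the generic normalize/sample/log/buffer logic lives in the lightweight helper.
  • The frontend is not just a progress bar; it shows the latest step plus an expandable process timeline.
  • Backend tests, backend build, and frontend build all pass.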
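The stderr rule in the checklist above is easy to get subtly wrong, so a small reference sketch may help when reviewing the executor. It treats a line as a process event only when it parses as a JSON object whose type is exactly "process_event"; everything else is left for the normal backend log. `parseProcessEventLine` is a hypothetical name and this is not the actual skill_executor.ts code.

```typescript
// Return the parsed record for an explicit process event line, otherwise undefined.
function parseProcessEventLine(line: string): Record<string, unknown> | undefined {
  const trimmed = line.trim();
  // Cheap reject: plain-text stderr output does not start with '{'.
  if (trimmed.charAt(0) !== '{') return undefined;
  let parsed: unknown;
  try {
    parsed = JSON.parse(trimmed);
  } catch {
    // Truncated or invalid JSON is treated as ordinary log output.
    return undefined;
  }
  if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) return undefined;
  const record = parsed as Record<string, unknown>;
  // Ordinary JSON logs (for example type: "log") must not be misread as events.
  return record.type === 'process_event' ? record : undefined;
}

const acceptedLine = parseProcessEventLine('{"type":"process_event","message":"extracting frames"}');
const ignoredLine = parseProcessEventLine('{"type":"log","message":"model loaded"}');
```

Lines for which the function returns undefined should still be forwarded to the backend log unchanged, so no diagnostic output is lost.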