GPU_GUARD_MONOREPO/docs/superpowers/plans/2026-05-13-weixin-archive-sync.md
2026-05-20 21:39:12 +08:00

Weixin Group Chat History Sync to the Neta Local Archive (weixin-archive sync) Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

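Goal: add a "同步聊天记录" (Sync chat history) button to the group-chat management page → the backend decrypts WCDB → new messages are incrementally persisted to the SQLite file <dataDir>/weixin-archive/cid-<channelId>.db.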

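Architecture: add a standalone WeixinArchiveSyncService that reuses the existing wcdb_codec / RoomResolver / MessageRepo and obtains the already-bound reader and key via WeixinDbService.getRuntime(cid). Each sync runs under a channel-level mutex; better-sqlite3 writes one file per channel; INSERT OR IGNORE plus a last_sync_ts watermark makes the incremental sync idempotent.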
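How INSERT OR IGNORE and the last_sync_ts watermark fit together can be shown with a minimal in-memory sketch for a single room. It assumes the reader re-reads rows with create_time >= last_sync_ts (inclusive); the plan does not state which comparison it uses. SimArchive and syncOnce are hypothetical names, not part of the plan.

```typescript
// Hypothetical in-memory stand-in for one room's archive; not the real ArchiveDb API.
interface Msg {
  roomId: string;
  localId: number;
  createTime: number; // Unix seconds
}

interface SimArchive {
  rows: Map<string, Msg>; // key `${roomId}:${localId}` plays the role of UNIQUE(room_id, local_id)
  lastSyncTs: number;     // create_time high watermark (sync_state.last_sync_ts)
}

// One incremental pass. Rows are re-read from createTime >= lastSyncTs (inclusive),
// so a message that lands in the same second as the previous watermark is not skipped;
// the duplicates this re-read produces are absorbed by the "OR IGNORE" check.
function syncOnce(archive: SimArchive, source: Msg[]): number {
  let inserted = 0;
  for (const m of source) {
    if (m.createTime < archive.lastSyncTs) continue; // below the watermark: already archived
    const key = `${m.roomId}:${m.localId}`;
    if (archive.rows.has(key)) continue;             // INSERT OR IGNORE
    archive.rows.set(key, m);
    inserted++;
  }
  // Advance the watermark to the newest create_time seen in this pass.
  archive.lastSyncTs = source.reduce((max, m) => Math.max(max, m.createTime), archive.lastSyncTs);
  return inserted;
}
```

Rerunning the same batch inserts nothing, which is what makes a repeated button click safe. A message that arrives in the same second as the old watermark is still picked up; a strict > comparison would drop it.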

Tech Stack: Midway.js 3.20 + TypeORM (MySQL, read-only group/channel metadata) + better-sqlite3 (archive files) + Vue 3 + Element Plus + fetch.

Spec: docs/superpowers/specs/2026-05-13-weixin-archive-sync-design.md

Prerequisites:

  • The 2026-05-12 weixin-db channel plan has been merged: wcdb_codec / RoomResolver / MessageRepo / WeixinDbService.bindChannel already work
  • The channel has been bound successfully (loginStatus='connected'), and the WeixinDbService.runtimes map holds its runtime state

Key constraints:

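  • One commit per Task
  • TDD: write the test first, then the implementation
  • Do not touch IncrementalReader.readIncrement / WalWatcher / agent_channel.routeInboundMessage (the listening pipeline stays independent)
  • The archive does not re-spawn the ps1 script; it reuses the key that bindChannel has already extracted
  • Channel-level mutex, not group-level
  • The backend blocks for the whole sync while the frontend shows a loading state; no job queue (YAGNI)
  • Error codes are string tokens (channel-not-connected / room-not-found, etc.); the frontend maps them to display text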
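Every group on a channel writes to the same cid-<channelId>.db file, so serializing per channel is what keeps better-sqlite3 single-writer while different channels still sync in parallel. A minimal way to get that without a job queue, assuming all syncs run inside one Node process, is to chain each task onto the previous promise for the same key. KeyedMutex is a hypothetical name; the plan's own implementation may differ.

```typescript
// Hypothetical per-key async mutex: one promise chain per channelId.
class KeyedMutex {
  private tails = new Map<number, Promise<void>>();

  async run<T>(key: number, task: () => Promise<T>): Promise<T> {
    const prev = this.tails.get(key) ?? Promise.resolve();
    let release!: () => void;
    const current = new Promise<void>((resolve) => { release = resolve; });
    // The new tail settles only after the previous holder and this task are both done.
    const tail = prev.then(() => current);
    this.tails.set(key, tail);
    await prev; // wait for earlier syncs on the same channel
    try {
      return await task();
    } finally {
      release();
      // Drop the entry if nobody queued behind us, so the map does not grow forever.
      if (this.tails.get(key) === tail) this.tails.delete(key);
    }
  }
}
```

A failing task still releases the lock in finally, and its error propagates to the caller, which matches the "block and return an error token" behavior the constraints describe.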

File Structure

New files

| File | Responsibility |
| --- | --- |
| packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_decryptor.ts | Extracts the shared decryptToWork logic as a public function |
| packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_db.ts | better-sqlite3 wrapper: openOrCreate / insertMessages / sync_state CRUD / countMessages |
| packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_types.ts | ArchiveMessage / SyncState interfaces |
| packages/backend/src/modules/netaclaw/runtime/weixin_db/sender_parser.ts | parseSenderPrefix, extracted from build_pseudo.ts as a shared helper |
| packages/backend/src/modules/netaclaw/runtime/weixin_db/project_archive.ts | projectToArchive(row, channelId, room): ArchiveMessage pure function |
| packages/backend/src/modules/netaclaw/service/weixin_archive_sync.ts | Main service: syncGroup(groupId) + channel-level mutex + deleteChannelArchive(cid) |
| packages/backend/test/modules/netaclaw/runtime/weixin_db/archive_db.test.ts | Table creation / idempotent inserts / sync_state CRUD |
| packages/backend/test/modules/netaclaw/runtime/weixin_db/project_archive.test.ts | Projection pure function |
| packages/backend/test/modules/netaclaw/runtime/weixin_db/sender_parser.test.ts | Sender prefix parsing |
| packages/backend/test/modules/netaclaw/runtime/weixin_db/room_resolver_find.test.ts | RoomResolver.findRoomsByName reverse lookup |
| packages/backend/test/modules/netaclaw/service/weixin_archive_sync.test.ts | Service flow tests with mocks |
| packages/backend/test/modules/netaclaw/service/weixin_archive_sync.concurrency.test.ts | Channel-level mutex: serial / parallel verification |

Modified files

| File | Change |
| --- | --- |
| packages/backend/src/modules/netaclaw/runtime/weixin_db/incremental_reader.ts | decryptToWork replaced by an import of archive_decryptor.decryptDbToWorkDir; new getRoomResolver() accessor |
| packages/backend/src/modules/netaclaw/runtime/weixin_db/build_pseudo.ts | parseSenderPrefix now imported from sender_parser |
| packages/backend/src/modules/netaclaw/runtime/weixin_db/room_resolver.ts | New findRoomsByName(roomName) method |
| packages/backend/src/modules/netaclaw/service/weixin_db.ts | Adds a messageKey field to ChannelRuntime + new getRuntime(cid) |
| packages/backend/src/modules/netaclaw/service/agent_channel.ts | delete(ids) cascades to remove the archive file |
| packages/backend/src/modules/netaclaw/service/agent_channel_group.ts | New sync(groupId) delegate |
| packages/backend/src/modules/netaclaw/controller/admin/agent_channel_group.ts | New POST /sync endpoint |
| packages/frontend/src/modules/agent/components/channel-group-panel.vue | Adds a "同步聊天记录" (Sync chat history) button + handler to each group card |
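For the agent_channel.ts cascade delete and deleteChannelArchive(cid), note that ArchiveDb opens the file with journal_mode = WAL, so SQLite can leave cid-<id>.db-wal and cid-<id>.db-shm next to the main file. A minimal sketch, assuming the helper reuses the path layout of ArchiveDb.archivePath from Task 5, removes all three. The function bodies here are illustrative, not the plan's code; the caller should close any open ArchiveDb handle first, since on Windows an open handle typically blocks deletion.

```typescript
import * as fs from 'node:fs';
import * as path from 'node:path';

// Mirrors ArchiveDb.archivePath: <dataDir>/weixin-archive/cid-<channelId>.db
function archivePath(channelId: number, dataDir: string): string {
  return path.join(dataDir, 'weixin-archive', `cid-${channelId}.db`);
}

// Hypothetical cascade-delete helper: removes the archive and its WAL sidecars together.
function deleteChannelArchive(channelId: number, dataDir: string): void {
  const file = archivePath(channelId, dataDir);
  for (const p of [file, `${file}-wal`, `${file}-shm`]) {
    fs.rmSync(p, { force: true }); // force: no error when the file does not exist
  }
}
```

Because force is set, calling it for a channel that never synced is a no-op, so channel deletion does not need to check first.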

Phase A · Extract shared functions (pure refactor, zero behavior change)

First extract decryptToWork and parseSenderPrefix; the sync service will import them directly later.

Task 1: Extract archive_decryptor.ts

Files:

  • Create: packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_decryptor.ts

  • Modify: packages/backend/src/modules/netaclaw/runtime/weixin_db/incremental_reader.ts

  • Step 1: Write the new file archive_decryptor.ts

import * as fs from 'node:fs';
import * as path from 'node:path';
import { decryptDatabase } from './wcdb_codec.js';

export interface DecryptOptions {
  srcDb: string;
  rawKey: Buffer;
  workDir: string;
}

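/**
 * Copies the encrypted DB as-is into workDir/src/, decrypts it into workDir/decrypted/, and returns the plaintext DB path.
 * WeChat holds an exclusive lock, so we must work on a copy; -wal / -shm are copied too when present (copy failures on lock files are ignored).
 * The decrypted DB carries the complete data, so any leftover -wal / -shm next to it are deleted to keep better-sqlite3 from misreading them.
 */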
export function decryptDbToWorkDir(opts: DecryptOptions): string {
  const { srcDb, rawKey, workDir } = opts;
  const srcDir = path.join(workDir, 'src');
  const outDir = path.join(workDir, 'decrypted');
  fs.mkdirSync(srcDir, { recursive: true });
  fs.mkdirSync(outDir, { recursive: true });

  const name = path.basename(srcDb);
  const srcCopy = path.join(srcDir, name);
  fs.copyFileSync(srcDb, srcCopy);
  for (const suffix of ['-wal', '-shm']) {
    const s = srcDb + suffix;
    if (fs.existsSync(s)) {
      try { fs.copyFileSync(s, srcCopy + suffix); } catch { /* copy failures on lock files are ignored */ }
    }
  }

  const encrypted = fs.readFileSync(srcCopy);
  const decrypted = decryptDatabase(encrypted, rawKey);
  const outPath = path.join(outDir, name);
  fs.writeFileSync(outPath, decrypted);
  for (const suffix of ['-wal', '-shm']) {
    const p = outPath + suffix;
    if (fs.existsSync(p)) {
      try { fs.unlinkSync(p); } catch { /* ignore */ }
    }
  }
  return outPath;
}
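  • Step 2: Switch incremental_reader.ts over to this function

Locate the private decryptToWork(srcDb, rawKey) method (around lines 83-108) and delete it entirely. Add this import at the top of the file:

import { decryptDbToWorkDir } from './archive_decryptor.js';

Also delete the line import { decryptDatabase } from './wcdb_codec.js'; (archive_decryptor uses it internally; this file no longer uses it directly).

Then replace all 4 internal calls to this.decryptToWork(X, Y) with: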

decryptDbToWorkDir({ srcDb: X, rawKey: Y, workDir: this.cfg.workDir })

Replacement points:

  • initialize(): 3 calls (once each for sessionDb / contactDb / messageDb)

  • readIncrement(): 1 call (messageDb)

  • Step 3: Run the existing weixin_db unit tests to confirm zero regressions

Run: pnpm --filter @neta/backend test -- runtime/weixin_db 2>&1 | tail -15 Expected: all pass (the existing wcdb_codec / room_resolver / build_pseudo / db_paths tests should still run)

  • Step 4: Commit
git add packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_decryptor.ts \
        packages/backend/src/modules/netaclaw/runtime/weixin_db/incremental_reader.ts
git commit -m "refactor(weixin-db): extract shared decryptDbToWorkDir function (reused by archive)"

Task 2: Extract sender_parser.ts

Files:

  • Create: packages/backend/src/modules/netaclaw/runtime/weixin_db/sender_parser.ts

  • Create: packages/backend/test/modules/netaclaw/runtime/weixin_db/sender_parser.test.ts

  • Modify: packages/backend/src/modules/netaclaw/runtime/weixin_db/build_pseudo.ts

  • Step 1: Write the failing test sender_parser.test.ts

import { parseSenderPrefix } from '../../../../../src/modules/netaclaw/runtime/weixin_db/sender_parser.js';

describe('parseSenderPrefix', () => {
  it('parses "wxid_alice:\\n..." prefix', () => {
    const r = parseSenderPrefix('wxid_alice:\n你好');
    expect(r).toEqual({ senderWxid: 'wxid_alice', body: '你好' });
  });

  it('returns null when no colon-newline', () => {
    expect(parseSenderPrefix('系统消息')).toBeNull();
  });

  it('returns null when prefix not starting with wxid_', () => {
    expect(parseSenderPrefix('alice:\n你好')).toBeNull();
  });

  it('returns null when prefix is empty (colon at index 0)', () => {
    expect(parseSenderPrefix(':\nfoo')).toBeNull();
  });
});
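  • Step 2: Run the test and confirm it fails

Run: pnpm --filter @neta/backend test -- runtime/weixin_db/sender_parser 2>&1 | tail -10 Expected: FAIL (module does not exist)

  • Step 3: Write the implementation sender_parser.ts
/**
 * Sender prefix format of a group message's message_content: "wxid_xxx:\n<body>".
 * - Non-group / system messages have no such prefix → returns null
 * - The prefix must start with "wxid_", and ":\n" must not sit at index 0 (avoids misreading an empty prefix)
 */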
export function parseSenderPrefix(content: string): { senderWxid: string; body: string } | null {
  const idx = content.indexOf(':\n');
  if (idx <= 0) return null;
  const prefix = content.slice(0, idx);
  if (!prefix.startsWith('wxid_')) return null;
  return { senderWxid: prefix, body: content.slice(idx + 2) };
}
  • Step 4: Run the test; it passes

Run: pnpm --filter @neta/backend test -- runtime/weixin_db/sender_parser 2>&1 | tail -10 Expected: 4 passed

  • Step 5: Switch build_pseudo.ts to the shared function

Delete the function parseSenderPrefix(...) at the bottom of the file, and add this import at the top:

import { parseSenderPrefix } from './sender_parser.js';
  • Step 6: Run the existing build_pseudo tests to confirm zero regressions

Run: pnpm --filter @neta/backend test -- runtime/weixin_db/build_pseudo 2>&1 | tail -10 Expected: all pass

  • Step 7: Commit
git add packages/backend/src/modules/netaclaw/runtime/weixin_db/sender_parser.ts \
        packages/backend/src/modules/netaclaw/runtime/weixin_db/build_pseudo.ts \
        packages/backend/test/modules/netaclaw/runtime/weixin_db/sender_parser.test.ts
git commit -m "refactor(weixin-db): extract shared parseSenderPrefix function"

Phase B · ArchiveDb module (TDD)

Task 3: archive_types.ts type definitions

Files:

  • Create: packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_types.ts

  • Step 1: Write the implementation

/**
 * One archived message (the row projection written into the SQLite message table).
 */
export interface ArchiveMessage {
  channelId: number;
  roomId: string;                  // group username, e.g. xxx@chatroom
  roomName: string | null;         // group display name at sync time
  msgTable: string;                // 'Msg_<md5>'
  localId: number;
  serverId: string | null;         // BigInt → TEXT to preserve precision
  localType: number;
  senderWxid: string | null;
  createTime: number;              // Unix seconds
  content: string | null;          // UTF-8 after zstd decompression (text = body; non-text = full XML)
  rawHex: string | null;           // fallback hex for the rare rows that fail to decode
}

/**
 * Per-room watermark (one row of the sync_state table).
 */
export interface SyncState {
  roomId: string;
  roomName: string | null;
  lastSyncTs: number;              // create_time high watermark
  lastSyncCount: number;           // rows actually archived by the last INSERT OR IGNORE
  totalCount: number;              // current cumulative archive total for this room
  lastSyncAt: number;              // completion time of the last sync (Unix seconds)
  lastError: string | null;
}
  • Step 2: Commit
git add packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_types.ts
git commit -m "feat(weixin-archive): define ArchiveMessage / SyncState types"

Task 4: Write the failing archive_db.ts tests

Files:

  • Create: packages/backend/test/modules/netaclaw/runtime/weixin_db/archive_db.test.ts

  • Step 1: Write the tests

import * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';
import { ArchiveDb } from '../../../../../src/modules/netaclaw/runtime/weixin_db/archive_db.js';
import type { ArchiveMessage } from '../../../../../src/modules/netaclaw/runtime/weixin_db/archive_types.js';

const tempDirs: string[] = [];
function tmpDataDir(): string {
  const d = fs.mkdtempSync(path.join(os.tmpdir(), 'archivedb-'));
  tempDirs.push(d);
  return d;
}
afterAll(() => {
  for (const d of tempDirs) {
    try { fs.rmSync(d, { recursive: true, force: true }); } catch { /* ignore */ }
  }
});

function row(over: Partial<ArchiveMessage> = {}): ArchiveMessage {
  return {
    channelId: 6,
    roomId: 'gA@chatroom',
    roomName: '产品研发群',
    msgTable: 'Msg_abc',
    localId: 1,
    serverId: '999',
    localType: 1,
    senderWxid: 'wxid_a',
    createTime: 1700000000,
    content: 'hello',
    rawHex: null,
    ...over,
  };
}

describe('ArchiveDb', () => {
  it('archivePath returns <dataDir>/weixin-archive/cid-<id>.db', () => {
    expect(ArchiveDb.archivePath(7, '/data').replace(/\\/g, '/'))
      .toBe('/data/weixin-archive/cid-7.db');
  });

  it('openOrCreate creates file + tables', () => {
    const dir = tmpDataDir();
    const db = ArchiveDb.openOrCreate(1, dir);
    try {
      expect(fs.existsSync(path.join(dir, 'weixin-archive', 'cid-1.db'))).toBe(true);
      expect(db.countMessages('any-room')).toBe(0);
      expect(db.getSyncState('any-room')).toBeNull();
    } finally {
      db.close();
    }
  });

  it('insertMessages returns number of new rows (INSERT OR IGNORE)', () => {
    const db = ArchiveDb.openOrCreate(1, tmpDataDir());
    try {
      const first = db.insertMessages([row({ localId: 1 }), row({ localId: 2 })]);
      expect(first).toBe(2);
      // a repeated (room_id, local_id) insert must be ignored
      const second = db.insertMessages([row({ localId: 1 }), row({ localId: 3 })]);
      expect(second).toBe(1);
      expect(db.countMessages('gA@chatroom')).toBe(3);
    } finally {
      db.close();
    }
  });

  it('updateSyncState writes and reads back', () => {
    const db = ArchiveDb.openOrCreate(1, tmpDataDir());
    try {
      db.updateSyncState('gA@chatroom', {
        roomName: '产品研发群',
        lastSyncTs: 1700000100,
        lastSyncCount: 12,
        totalCount: 100,
        lastSyncAt: 1700000200,
        lastError: null,
      });
      const s = db.getSyncState('gA@chatroom');
      expect(s).not.toBeNull();
      expect(s!.lastSyncTs).toBe(1700000100);
      expect(s!.lastSyncCount).toBe(12);
      expect(s!.totalCount).toBe(100);
      expect(s!.lastError).toBeNull();
    } finally {
      db.close();
    }
  });

  it('updateSyncState merges partial patches (UPSERT semantics)', () => {
    const db = ArchiveDb.openOrCreate(1, tmpDataDir());
    try {
      db.updateSyncState('r', {
        roomName: 'R',
        lastSyncTs: 100,
        lastSyncCount: 5,
        totalCount: 5,
        lastSyncAt: 200,
        lastError: 'oops',
      });
      // partial update: only clear the error + bump lastSyncAt
      db.updateSyncState('r', { lastSyncAt: 300, lastError: null });
      const s = db.getSyncState('r')!;
      expect(s.lastSyncAt).toBe(300);
      expect(s.lastError).toBeNull();
      expect(s.lastSyncTs).toBe(100); // unchanged
      expect(s.lastSyncCount).toBe(5); // unchanged
    } finally {
      db.close();
    }
  });

  it('insertMessages stores rawHex when content is null', () => {
    const db = ArchiveDb.openOrCreate(1, tmpDataDir());
    try {
      db.insertMessages([row({ localId: 9, content: null, rawHex: '28b52ffd' })]);
      // read the underlying sqlite handle directly to verify the columns
      const handle = (db as any).db as import('better-sqlite3').Database;
      const r = handle.prepare('SELECT content, raw_hex FROM message WHERE local_id=9').get() as any;
      expect(r.content).toBeNull();
      expect(r.raw_hex).toBe('28b52ffd');
    } finally {
      db.close();
    }
  });
});
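  • Step 2: Run the tests and confirm they fail

Run: pnpm --filter @neta/backend test -- runtime/weixin_db/archive_db 2>&1 | tail -15 Expected: FAIL ("Cannot find module")

No commit in this task: the test file is committed together with the implementation in Task 5.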


Task 5: Implement archive_db.ts

Files:

  • Create: packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_db.ts

  • Step 1: Write the implementation

import Database = require('better-sqlite3');
import * as fs from 'node:fs';
import * as path from 'node:path';
import type { ArchiveMessage, SyncState } from './archive_types.js';

const CREATE_MESSAGE_SQL = `
CREATE TABLE IF NOT EXISTS message (
  id            INTEGER PRIMARY KEY AUTOINCREMENT,
  channel_id    INTEGER NOT NULL,
  room_id       TEXT    NOT NULL,
  room_name     TEXT,
  msg_table     TEXT    NOT NULL,
  local_id      INTEGER NOT NULL,
  server_id     TEXT,
  local_type    INTEGER NOT NULL,
  sender_wxid   TEXT,
  create_time   INTEGER NOT NULL,
  content       TEXT,
  raw_hex       TEXT,
  synced_at     INTEGER NOT NULL,
  UNIQUE(room_id, local_id)
)`;

const CREATE_INDEX_ROOM_TIME = `
CREATE INDEX IF NOT EXISTS idx_msg_room_time ON message(room_id, create_time DESC)`;

const CREATE_INDEX_TIME = `
CREATE INDEX IF NOT EXISTS idx_msg_create_time ON message(create_time DESC)`;

const CREATE_SYNC_STATE_SQL = `
CREATE TABLE IF NOT EXISTS sync_state (
  room_id          TEXT PRIMARY KEY,
  room_name        TEXT,
  last_sync_ts     INTEGER NOT NULL DEFAULT 0,
  last_sync_count  INTEGER NOT NULL DEFAULT 0,
  total_count      INTEGER NOT NULL DEFAULT 0,
  last_sync_at     INTEGER NOT NULL,
  last_error       TEXT
)`;

/**
 * One archive sqlite file per channel.
 * File path: `<dataDir>/weixin-archive/cid-<channelId>.db`
 * Single-writer mode: callers serialize writes through the channel-level mutex.
 */
export class ArchiveDb {
  private constructor(private readonly db: Database.Database) {}

  static archivePath(channelId: number, dataDir: string): string {
    return path.join(dataDir, 'weixin-archive', `cid-${channelId}.db`);
  }

  static openOrCreate(channelId: number, dataDir: string): ArchiveDb {
    const file = ArchiveDb.archivePath(channelId, dataDir);
    fs.mkdirSync(path.dirname(file), { recursive: true });
    const db = new Database(file);
    db.pragma('journal_mode = WAL');
    db.pragma('synchronous = NORMAL');
    db.exec(CREATE_MESSAGE_SQL);
    db.exec(CREATE_INDEX_ROOM_TIME);
    db.exec(CREATE_INDEX_TIME);
    db.exec(CREATE_SYNC_STATE_SQL);
    return new ArchiveDb(db);
  }

  /**
   * Batch insert; `INSERT OR IGNORE` keeps (room_id, local_id) idempotent.
   * Returns the number of rows actually written.
   */
  insertMessages(rows: ArchiveMessage[]): number {
    if (rows.length === 0) return 0;
    const now = Math.floor(Date.now() / 1000);
    const stmt = this.db.prepare(`
      INSERT OR IGNORE INTO message
        (channel_id, room_id, room_name, msg_table, local_id, server_id, local_type,
         sender_wxid, create_time, content, raw_hex, synced_at)
      VALUES
        (@channelId, @roomId, @roomName, @msgTable, @localId, @serverId, @localType,
         @senderWxid, @createTime, @content, @rawHex, @syncedAt)
    `);
    const txn = this.db.transaction((batch: ArchiveMessage[]) => {
      let inserted = 0;
      for (const r of batch) {
        const info = stmt.run({
          channelId: r.channelId,
          roomId: r.roomId,
          roomName: r.roomName,
          msgTable: r.msgTable,
          localId: r.localId,
          serverId: r.serverId,
          localType: r.localType,
          senderWxid: r.senderWxid,
          createTime: r.createTime,
          content: r.content,
          rawHex: r.rawHex,
          syncedAt: now,
        });
        if (info.changes > 0) inserted++;
      }
      return inserted;
    });
    return txn(rows);
  }

  getSyncState(roomId: string): SyncState | null {
    const r = this.db.prepare(`
      SELECT room_id, room_name, last_sync_ts, last_sync_count, total_count, last_sync_at, last_error
      FROM sync_state WHERE room_id = ?
    `).get(roomId) as any;
    if (!r) return null;
    return {
      roomId: r.room_id,
      roomName: r.room_name,
      lastSyncTs: r.last_sync_ts,
      lastSyncCount: r.last_sync_count,
      totalCount: r.total_count,
      lastSyncAt: r.last_sync_at,
      lastError: r.last_error,
    };
  }

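  /**
   * UPSERT semantics: insert if the row is absent; otherwise update only the fields present in patch.
   * A field explicitly set to null is written as null; an undefined field keeps its existing value.
   */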
  updateSyncState(roomId: string, patch: Partial<Omit<SyncState, 'roomId'>>): void {
    const existing = this.getSyncState(roomId);
    const merged: SyncState = existing
      ? {
          roomId,
          roomName: patch.roomName !== undefined ? patch.roomName : existing.roomName,
          lastSyncTs: patch.lastSyncTs !== undefined ? patch.lastSyncTs : existing.lastSyncTs,
          lastSyncCount: patch.lastSyncCount !== undefined ? patch.lastSyncCount : existing.lastSyncCount,
          totalCount: patch.totalCount !== undefined ? patch.totalCount : existing.totalCount,
          lastSyncAt: patch.lastSyncAt !== undefined ? patch.lastSyncAt : existing.lastSyncAt,
          lastError: patch.lastError !== undefined ? patch.lastError : existing.lastError,
        }
      : {
          roomId,
          roomName: patch.roomName ?? null,
          lastSyncTs: patch.lastSyncTs ?? 0,
          lastSyncCount: patch.lastSyncCount ?? 0,
          totalCount: patch.totalCount ?? 0,
          lastSyncAt: patch.lastSyncAt ?? Math.floor(Date.now() / 1000),
          lastError: patch.lastError ?? null,
        };
    this.db.prepare(`
      INSERT INTO sync_state
        (room_id, room_name, last_sync_ts, last_sync_count, total_count, last_sync_at, last_error)
      VALUES (?, ?, ?, ?, ?, ?, ?)
      ON CONFLICT(room_id) DO UPDATE SET
        room_name       = excluded.room_name,
        last_sync_ts    = excluded.last_sync_ts,
        last_sync_count = excluded.last_sync_count,
        total_count     = excluded.total_count,
        last_sync_at    = excluded.last_sync_at,
        last_error      = excluded.last_error
    `).run(
      merged.roomId,
      merged.roomName,
      merged.lastSyncTs,
      merged.lastSyncCount,
      merged.totalCount,
      merged.lastSyncAt,
      merged.lastError,
    );
  }

  countMessages(roomId: string): number {
    const r = this.db.prepare(`SELECT COUNT(*) AS c FROM message WHERE room_id = ?`).get(roomId) as any;
    return r.c as number;
  }

  close(): void {
    this.db.close();
  }
}
  • Step 2: Run the tests; they pass

Run: pnpm --filter @neta/backend test -- runtime/weixin_db/archive_db 2>&1 | tail -15 Expected: 6 passed

  • Step 3: Commit
git add packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_db.ts \
        packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_types.ts \
        packages/backend/test/modules/netaclaw/runtime/weixin_db/archive_db.test.ts
git commit -m "feat(weixin-archive): ArchiveDb module (per-channel sqlite + idempotent inserts + sync_state UPSERT)"
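Task 4's partial-patch test clears lastError by passing null, which is why the existing-row branch of updateSyncState compares against undefined instead of using ??: `null ?? old` would silently keep the old error. The same rule as a generic helper is sketched below; mergeDefined is a hypothetical name, offered as one way the seven hand-written ternaries could be collapsed, not as part of the plan.

```typescript
// A key present in `patch` (even with value null) overwrites; an undefined key keeps the base value.
function mergeDefined<T extends object>(base: T, patch: Partial<T>): T {
  const out: T = { ...base }; // shallow copy; `base` is not mutated
  for (const key of Object.keys(patch) as Array<keyof T>) {
    const value = patch[key];
    if (value !== undefined) out[key] = value as T[keyof T];
  }
  return out;
}
```

With this helper, `merged` in updateSyncState would be `mergeDefined(existing, patch)` with roomId reapplied; the new-row branch still needs its own defaults.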

Phase C · Projection function projectToArchive (TDD)

Task 6: project_archive.ts

Files:

  • Create: packages/backend/src/modules/netaclaw/runtime/weixin_db/project_archive.ts

  • Create: packages/backend/test/modules/netaclaw/runtime/weixin_db/project_archive.test.ts

  • Step 1: Write the tests

import { projectToArchive } from '../../../../../src/modules/netaclaw/runtime/weixin_db/project_archive.js';
import type { MessageRow } from '../../../../../src/modules/netaclaw/runtime/weixin_db/message_repo.js';
import type { RoomInfo } from '../../../../../src/modules/netaclaw/runtime/weixin_db/room_resolver.js';

const room: RoomInfo = {
  tableName: 'Msg_abc',
  username: 'gA@chatroom',
  roomName: '产品研发群',
  isGroup: true,
};

describe('projectToArchive', () => {
  it('parses sender prefix for text message', () => {
    const row: MessageRow = {
      localId: 11,
      serverId: 999n,
      localType: 1,
      realSenderId: 42n,
      createTime: 1700000000,
      content: 'wxid_alice:\n你好',
      tableName: 'Msg_abc',
    };
    const a = projectToArchive(row, 6, room);
    expect(a.channelId).toBe(6);
    expect(a.roomId).toBe('gA@chatroom');
    expect(a.roomName).toBe('产品研发群');
    expect(a.msgTable).toBe('Msg_abc');
    expect(a.localId).toBe(11);
    expect(a.serverId).toBe('999');
    expect(a.localType).toBe(1);
    expect(a.senderWxid).toBe('wxid_alice');
    expect(a.createTime).toBe(1700000000);
    expect(a.content).toBe('你好');
    expect(a.rawHex).toBeNull();
  });

  it('keeps full XML content for non-text (no sender prefix)', () => {
    const xml = '<msg><img md5="abc"/></msg>';
    const row: MessageRow = {
      localId: 12, serverId: null, localType: 3, realSenderId: 7n,
      createTime: 1700001000, content: xml, tableName: 'Msg_abc',
    };
    const a = projectToArchive(row, 6, room);
    expect(a.senderWxid).toBeNull();        // no prefix
    expect(a.content).toBe(xml);            // the whole XML is kept
    expect(a.localType).toBe(3);
  });

  it('serverId BigInt → string preserves precision', () => {
    const big = 9_999_999_999_999_999n;
    const row: MessageRow = {
      localId: 1, serverId: big, localType: 1, realSenderId: null,
      createTime: 1, content: 'x', tableName: 'Msg_abc',
    };
    const a = projectToArchive(row, 1, room);
    expect(a.serverId).toBe(big.toString());
  });

  it('rawHex null by default (content always set)', () => {
    const row: MessageRow = {
      localId: 1, serverId: null, localType: 1, realSenderId: null,
      createTime: 1, content: '', tableName: 'Msg_abc',
    };
    const a = projectToArchive(row, 1, room);
    expect(a.content).toBe('');
    expect(a.rawHex).toBeNull();
  });
});
  • Step 2: Run the tests and confirm they fail

Run: pnpm --filter @neta/backend test -- runtime/weixin_db/project_archive 2>&1 | tail -10 Expected: FAIL("Cannot find module")

  • Step 3: Write the implementation
import { parseSenderPrefix } from './sender_parser.js';
import type { MessageRow } from './message_repo.js';
import type { RoomInfo } from './room_resolver.js';
import type { ArchiveMessage } from './archive_types.js';

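/**
 * Projects a row read by MessageRepo into an archive row.
 * - Text messages (with a wxid_xxx:\n prefix): senderWxid and body are split apart
 * - Non-text (no prefix): senderWxid=null, content = the full XML, kept as-is
 * - serverId: bigint → string to preserve precision
 * - rawHex: this projection only handles rows whose content is already a string; when zstd decoding fails, the caller fills rawHex
 */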
export function projectToArchive(
  row: MessageRow,
  channelId: number,
  room: RoomInfo,
): ArchiveMessage {
  const parsed = parseSenderPrefix(row.content);
  return {
    channelId,
    roomId: room.username,
    roomName: room.roomName ?? null,
    msgTable: row.tableName,
    localId: row.localId,
    serverId: row.serverId != null ? row.serverId.toString() : null,
    localType: row.localType,
    senderWxid: parsed?.senderWxid ?? null,
    createTime: row.createTime,
    content: parsed ? parsed.body : row.content,
    rawHex: null,
  };
}
  • Step 4: Run the tests; they pass

Run: pnpm --filter @neta/backend test -- runtime/weixin_db/project_archive 2>&1 | tail -10 Expected: 4 passed

  • Step 5: Commit
git add packages/backend/src/modules/netaclaw/runtime/weixin_db/project_archive.ts \
        packages/backend/test/modules/netaclaw/runtime/weixin_db/project_archive.test.ts
git commit -m "feat(weixin-archive): projectToArchive projection function (MessageRow → ArchiveMessage)"
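The serverId → string conversion in projectToArchive matters because a JS number is exact only up to Number.MAX_SAFE_INTEGER (2^53 - 1), while WeChat server ids are assumed here to be 64-bit integers. A small sketch using the same value as the "serverId BigInt → string preserves precision" test shows what TEXT storage avoids; serverIdToText is a hypothetical helper restating the one-line expression from the implementation.

```typescript
// Same expression projectToArchive uses for serverId.
function serverIdToText(id: bigint | null): string | null {
  return id != null ? id.toString() : null;
}

const big = BigInt('9999999999999999'); // same value as the project_archive test
const viaText = serverIdToText(big);    // exact decimal string
const viaNumber = String(Number(big));  // rounded to the nearest double, so not exact
```

Going through Number would store a different id than the one WeChat assigned, which would make later cross-referencing by server_id unreliable.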

Phase D · Expose the weixin-db runtime + RoomResolver reverse lookup by name

Task 7: RoomResolver.findRoomsByName

Files:

  • Modify: packages/backend/src/modules/netaclaw/runtime/weixin_db/room_resolver.ts

  • Create: packages/backend/test/modules/netaclaw/runtime/weixin_db/room_resolver_find.test.ts

  • Step 1: Write the tests

import { RoomResolver, type RoomInfo } from '../../../../../src/modules/netaclaw/runtime/weixin_db/room_resolver.js';

function mk(over: Partial<RoomInfo>): RoomInfo {
  return {
    tableName: 'Msg_x',
    username: 'u',
    roomName: 'R',
    isGroup: true,
    ...over,
  };
}

describe('RoomResolver.findRoomsByName', () => {
  it('returns all rooms with matching roomName (groups only)', () => {
    const r = new RoomResolver();
    r._setMap(new Map([
      ['t1', mk({ tableName: 't1', username: 'g1@chatroom', roomName: '产品研发群', isGroup: true })],
      ['t2', mk({ tableName: 't2', username: 'g2@chatroom', roomName: '产品研发群', isGroup: true })],  // same name
      ['t3', mk({ tableName: 't3', username: 'wxid_x',     roomName: '产品研发群', isGroup: false })], // DM with the same name
      ['t4', mk({ tableName: 't4', username: 'g3@chatroom', roomName: '家庭群', isGroup: true })],
    ]));
    const hits = r.findRoomsByName('产品研发群');
    expect(hits).toHaveLength(2);
    expect(hits.map(h => h.username).sort()).toEqual(['g1@chatroom', 'g2@chatroom']);
  });

  it('returns empty when no match', () => {
    const r = new RoomResolver();
    expect(r.findRoomsByName('不存在的群')).toEqual([]);
  });

  it('returns empty for empty/whitespace input', () => {
    const r = new RoomResolver();
    expect(r.findRoomsByName('')).toEqual([]);
    expect(r.findRoomsByName('   ')).toEqual([]);
  });

  it('trims both sides before comparison', () => {
    const r = new RoomResolver();
    r._setMap(new Map([
      ['t1', mk({ tableName: 't1', username: 'g1@chatroom', roomName: '产品研发群 ', isGroup: true })],
    ]));
    expect(r.findRoomsByName('  产品研发群  ')).toHaveLength(1);
  });
});
  • Step 2: Run the tests and confirm they fail

Run: pnpm --filter @neta/backend test -- runtime/weixin_db/room_resolver_find 2>&1 | tail -10 Expected: FAIL ("findRoomsByName is not a function", or a TypeScript compile error if the test runner type-checks)

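  • Step 3: Edit room_resolver.ts and insert this after the lookup method
  /**
   * Reverse-looks-up every group whose roomName matches (isGroup=true only).
   * Several groups can share a name; the caller decides which one to use.
   * Both sides are trimmed before comparing, to tolerate leading/trailing spaces and small differences between user input and the contact.db display name.
   */
  findRoomsByName(roomName: string): RoomInfo[] {
    const needle = (roomName ?? '').trim();
    if (!needle) return [];
    const result: RoomInfo[] = [];
    for (const info of this.roomsByTable.values()) {
      // guard against a missing roomName instead of crashing on .trim()
      if (info.isGroup && (info.roomName ?? '').trim() === needle) result.push(info);
    }
    return result;
  }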
  • Step 4: Run the tests; they pass

Run: pnpm --filter @neta/backend test -- runtime/weixin_db/room_resolver_find 2>&1 | tail -10 Expected: 4 passed

  • Step 5: Commit
git add packages/backend/src/modules/netaclaw/runtime/weixin_db/room_resolver.ts \
        packages/backend/test/modules/netaclaw/runtime/weixin_db/room_resolver_find.test.ts
git commit -m "feat(weixin-db): RoomResolver.findRoomsByName reverse lookup of groups by name"
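findRoomsByName deliberately returns every same-name hit and leaves the choice to the caller. One possible caller-side policy is sketched below, assuming the group record already stores a roomId (as mkGroup in Task 10 does): prefer an exact username match, accept a single unique hit, otherwise refuse to guess. pickRoom, RoomInfoLike and the 'room-ambiguous' token are hypothetical; only 'room-not-found' appears in the plan.

```typescript
interface RoomInfoLike {
  tableName: string;
  username: string; // e.g. xxx@chatroom
  roomName: string | null;
  isGroup: boolean;
}

type PickResult =
  | { ok: true; room: RoomInfoLike }
  | { ok: false; error: 'room-not-found' | 'room-ambiguous' };

// Hypothetical policy: stored roomId wins, then a unique name match, else an error token.
function pickRoom(hits: RoomInfoLike[], storedRoomId: string | null): PickResult {
  if (storedRoomId) {
    const exact = hits.find((h) => h.username === storedRoomId);
    if (exact) return { ok: true, room: exact };
  }
  const [only] = hits;
  if (hits.length === 1 && only) return { ok: true, room: only };
  return { ok: false, error: hits.length === 0 ? 'room-not-found' : 'room-ambiguous' };
}
```

Refusing to pick among several same-name groups avoids silently archiving the wrong group's history into the channel file.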

Task 8: WeixinDbService.getRuntime + ChannelRuntime.messageKey

Files:

  • Modify: packages/backend/src/modules/netaclaw/service/weixin_db.ts

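archive sync needs the runtime state that bind has already set up (paths / messageKey / reader.roomResolver, etc.); this task exposes it.

  • Step 1: Add messageKey: Buffer to the ChannelRuntime interface

Locate the interface definition (around lines 16-23) and change it to: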

interface ChannelRuntime {
  channelId: number;
  reader: IncrementalReader;
  watcher: WalWatcher;
  weixinPid: number;
  paths: DbPaths;
  messageKey: Buffer;
  onInbound: (channelId: number, pseudo: unknown) => Promise<void>;
}
  • Step 2: In bindChannel, add messageKey to the this.runtimes.set(channel.id, { ... }) call

Locate this.runtimes.set(channel.id, { channelId: channel.id, reader, watcher, weixinPid: keys.pid, paths, onInbound }); (around lines 129-132) and change it to:

    this.runtimes.set(channel.id, {
      channelId: channel.id,
      reader,
      watcher,
      weixinPid: keys.pid,
      paths,
      messageKey: Buffer.from(msgKeyHex, 'hex'),
      onInbound,
    });
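  • Step 3: Export the type + add a getRuntime method

Change the existing interface ChannelRuntime line at the top of the file to export interface ChannelRuntime so other modules can import it.

Before the unbindChannel method (or anywhere else inside the class), add: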

  /**
   * Exposes a bound runtime to other services in the same process (e.g. weixin_archive_sync).
   * Returns undefined when the channel is not bound.
   */
  getRuntime(channelId: number): ChannelRuntime | undefined {
    return this.runtimes.get(channelId);
  }
  • Step 4: Run the existing weixin_db.ts tests to confirm zero regressions

Run: pnpm --filter @neta/backend test -- service/weixin_db 2>&1 | tail -10 Expected: all pass

  • Step 5: Commit
git add packages/backend/src/modules/netaclaw/service/weixin_db.ts
git commit -m "feat(weixin-db): expose messageKey on ChannelRuntime + public getRuntime(cid) accessor"
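Step 2 converts the key with Buffer.from(msgKeyHex, 'hex'). Node's hex decoding does not throw on malformed input: it stops at the first character pair that is not valid hex, so a bad key silently becomes a short Buffer and only fails later inside decryption. A hypothetical guard is sketched below, assuming a 32-byte key (the size the Task 10 mocks use via Buffer.alloc(32)); parseKeyHex is not part of the plan.

```typescript
import { Buffer } from 'node:buffer';

// Hypothetical guard: reject anything that is not exactly expectedBytes of hex
// before it is stored as ChannelRuntime.messageKey.
function parseKeyHex(hex: string, expectedBytes = 32): Buffer {
  if (hex.length !== expectedBytes * 2 || !/^[0-9a-fA-F]*$/.test(hex)) {
    throw new Error(`invalid-key: expected ${expectedBytes * 2} hex chars`);
  }
  return Buffer.from(hex, 'hex');
}
```

Failing at bind time keeps the error next to the key extraction step instead of surfacing later as an opaque decryption failure during sync.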

Task 9: Expose roomResolver on IncrementalReader (for the archive's reverse lookup by name)

Files:

  • Modify: packages/backend/src/modules/netaclaw/runtime/weixin_db/incremental_reader.ts

  • Step 1: Expose a getter

Locate the line private roomResolver = new RoomResolver(); (around line 19). Keep the field private, and add this after the resolveRoom method in the class:

  /** Exposed to other services in the same process (e.g. archive_sync) for findRoomsByName and other RoomResolver APIs. */
  getRoomResolver(): RoomResolver {
    return this.roomResolver;
  }
  • Step 2: Run the existing tests to confirm zero regressions

Run: pnpm --filter @neta/backend test -- runtime/weixin_db 2>&1 | tail -10 Expected: all pass

  • Step 3: Commit
git add packages/backend/src/modules/netaclaw/runtime/weixin_db/incremental_reader.ts
git commit -m "feat(weixin-db): public IncrementalReader.getRoomResolver accessor"

Phase E · Main service WeixinArchiveSyncService (TDD)

Task 10: Service unit tests with mocks

Files:

  • Create: packages/backend/test/modules/netaclaw/service/weixin_archive_sync.test.ts

This task only writes the tests. WeixinArchiveSyncService does not exist yet, so the tests are expected to FAIL; Task 11 writes the implementation that makes them pass.
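The first tests pin down the guard checks syncGroup must run before doing any work. A sketch of that guard as a pure function, using the tokens named in the tests and key constraints (unsupported-platform, unsupported-channel-type, channel-not-connected). Mapping a connected channel with no bound runtime to channel-not-connected is an assumption, and assertSyncable, ChannelLike and RuntimeLike are hypothetical simplified shapes.

```typescript
interface ChannelLike {
  id: number;
  type: string;        // e.g. 'weixin-db'
  loginStatus: string; // e.g. 'connected'
}

interface RuntimeLike {
  channelId: number;
}

// Hypothetical pre-flight guard for syncGroup. The platform check comes first,
// so a non-Windows host fails before anything touches WeChat's files.
function assertSyncable(platform: string, channel: ChannelLike, runtime: RuntimeLike | undefined): RuntimeLike {
  if (platform !== 'win32') throw new Error('unsupported-platform');
  if (channel.type !== 'weixin-db') throw new Error('unsupported-channel-type');
  // Assumption: "not connected" and "connected but not bound in this process" share one token.
  if (channel.loginStatus !== 'connected' || runtime === undefined) {
    throw new Error('channel-not-connected');
  }
  return runtime;
}
```

Throwing the bare token as the message lets the tests match with toThrow(/token/) and lets the frontend map it to display text.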

  • Step 1: Write the tests
import * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';
import { WeixinArchiveSyncService } from '../../../../src/modules/netaclaw/service/weixin_archive_sync.js';

function mkLogger() {
  return { info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn() };
}

function tmpDataDir(): string {
  return fs.mkdtempSync(path.join(os.tmpdir(), 'arcsync-'));
}

function mkRuntime(overrides: any = {}) {
  return {
    channelId: 6,
    paths: { messageDb: '/fake/message_0.db' },
    messageKey: Buffer.alloc(32),
    reader: {
      getRoomResolver: () => ({
        findRoomsByName: jest.fn().mockReturnValue([
          { tableName: 'Msg_abc', username: 'gA@chatroom', roomName: '产品研发群', isGroup: true },
        ]),
      }),
    },
    ...overrides,
  };
}

function mkGroup(over: any = {}) {
  return { id: 1, channelId: 6, roomId: 'gA@chatroom', roomName: '产品研发群', ...over };
}

function mkChannel(over: any = {}) {
  return { id: 6, type: 'weixin-db', loginStatus: 'connected', ...over };
}

describe('WeixinArchiveSyncService.syncGroup', () => {
  let svc: WeixinArchiveSyncService;
  let dataDir: string;
  const dirsToCleanup: string[] = [];

  beforeEach(() => {
    dataDir = tmpDataDir();
    dirsToCleanup.push(dataDir);
    svc = new WeixinArchiveSyncService();
    (svc as any).logger = mkLogger();
    (svc as any).resolveDataDir = () => dataDir;
    (svc as any).platform = 'win32';

    // Mock decryptDbToWorkDir to return a fake path (the path is then passed to MessageRepo, which is mocked too)
    (svc as any).decryptDb = jest.fn().mockReturnValue('/fake/plain.db');

    // Inject a MessageRepo mock that never opens a real sqlite file
    (svc as any).openMessageRepo = jest.fn().mockReturnValue({
      listSince: jest.fn()
        .mockResolvedValueOnce([
          { localId: 1, serverId: 100n, localType: 1, realSenderId: null, createTime: 100,
            content: 'wxid_a:\nhi', tableName: 'Msg_abc' },
          { localId: 2, serverId: 101n, localType: 1, realSenderId: null, createTime: 101,
            content: 'wxid_b:\nyo', tableName: 'Msg_abc' },
        ])
        .mockResolvedValueOnce([]),
      close: jest.fn(),
    });
  });

  afterAll(() => {
    for (const d of dirsToCleanup) {
      try { fs.rmSync(d, { recursive: true, force: true }); } catch { /* ignore */ }
    }
  });

  it('rejects non-Windows platforms', async () => {
    (svc as any).platform = 'linux';
    (svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel()) };
    (svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
    (svc as any).weixinDbService = { getRuntime: () => mkRuntime() };
    await expect(svc.syncGroup(1)).rejects.toThrow(/unsupported-platform/);
  });

  it('rejects when channel not weixin-db', async () => {
    (svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel({ type: 'weixin' })) };
    (svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
    (svc as any).weixinDbService = { getRuntime: () => mkRuntime() };
    await expect(svc.syncGroup(1)).rejects.toThrow(/unsupported-channel-type/);
  });

  it('rejects when loginStatus !== connected', async () => {
    (svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel({ loginStatus: 'disconnected' })) };
    (svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
    (svc as any).weixinDbService = { getRuntime: () => mkRuntime() };
    await expect(svc.syncGroup(1)).rejects.toThrow(/channel-not-connected/);
  });

  it('rejects when runtime not bound', async () => {
    (svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel()) };
    (svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
    (svc as any).weixinDbService = { getRuntime: () => undefined };
    await expect(svc.syncGroup(1)).rejects.toThrow(/channel-not-bound/);
  });

  it('rejects when room not found in resolver', async () => {
    const rt = mkRuntime();
    (rt.reader.getRoomResolver as any) = () => ({ findRoomsByName: () => [] });
    (svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel()) };
    (svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
    (svc as any).weixinDbService = { getRuntime: () => rt };
    await expect(svc.syncGroup(1)).rejects.toThrow(/room-not-found/);
  });

  it('happy path: writes rows to archive db + updates sync_state', async () => {
    (svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel()) };
    (svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
    (svc as any).weixinDbService = { getRuntime: () => mkRuntime() };

    const result = await svc.syncGroup(1);
    expect(result.newCount).toBe(2);
    expect(result.totalCount).toBe(2);
    expect(result.lastSyncTs).toBe(101);
    expect(result.durationMs).toBeGreaterThanOrEqual(0);

    // Second sync: the data has not changed, so the mock returns no rows
    (svc as any).openMessageRepo = jest.fn().mockReturnValue({
      listSince: jest.fn().mockResolvedValue([]),
      close: jest.fn(),
    });
    const r2 = await svc.syncGroup(1);
    expect(r2.newCount).toBe(0);
    expect(r2.totalCount).toBe(2);
    expect(r2.lastSyncTs).toBe(101); // watermark unchanged
  });

  it('uses isolated archive workDir (not the listener cid-<id>/ root)', async () => {
    (svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel()) };
    (svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
    (svc as any).weixinDbService = { getRuntime: () => mkRuntime() };

    const decryptSpy = jest.fn().mockReturnValue('/fake/plain.db');
    (svc as any).decryptDb = decryptSpy;

    await svc.syncGroup(1);
    const passedWorkDir = decryptSpy.mock.calls[0][2] as string;
    // Must end with .../archive rather than being cid-6 itself
    expect(passedWorkDir.replace(/\\/g, '/')).toMatch(/cid-6\/archive$/);
  });

  it('writes lastError to sync_state on listSince failure, then rethrows', async () => {
    (svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel()) };
    (svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
    (svc as any).weixinDbService = { getRuntime: () => mkRuntime() };
    (svc as any).openMessageRepo = jest.fn().mockReturnValue({
      listSince: jest.fn().mockRejectedValue(new Error('disk-corrupt')),
      close: jest.fn(),
    });

    await expect(svc.syncGroup(1)).rejects.toThrow(/disk-corrupt/);

    // Read sync_state directly through ArchiveDb; last_error should be set
    const { ArchiveDb } = await import('../../../../src/modules/netaclaw/runtime/weixin_db/archive_db.js');
    const db = ArchiveDb.openOrCreate(6, dataDir);
    try {
      const st = db.getSyncState('gA@chatroom');
      expect(st).not.toBeNull();
      expect(st!.lastError).toMatch(/disk-corrupt/);
    } finally {
      db.close();
    }
  });

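  it('closes archive db even when error occurs after open', async () => {
    const { ArchiveDb } = await import('../../../../src/modules/netaclaw/runtime/weixin_db/archive_db.js');
    const closeSpy = jest.spyOn(ArchiveDb.prototype, 'close');
    (svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel()) };
    (svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
    (svc as any).weixinDbService = { getRuntime: () => mkRuntime() };
    (svc as any).openMessageRepo = jest.fn().mockReturnValue({
      listSince: jest.fn().mockRejectedValue(new Error('boom')),
      close: jest.fn(),
    });

    try {
      await expect(svc.syncGroup(1)).rejects.toThrow(/boom/);
      // Assert close() directly: on Linux/Mac a leaked handle does not block a second open,
      // so a successful re-sync alone would not prove the handle was released
      expect(closeSpy).toHaveBeenCalled();
    } finally {
      closeSpy.mockRestore();
    }

    // A follow-up sync still succeeds after the failure
    (svc as any).openMessageRepo = jest.fn().mockReturnValue({
      listSince: jest.fn().mockResolvedValue([]),
      close: jest.fn(),
    });
    await expect(svc.syncGroup(1)).resolves.toBeTruthy();
  });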
});
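The happy-path test above chains mockResolvedValueOnce once per batch, so it depends on exactly how many times doSyncGroup calls listSince. One option for future tests is an in-memory fake that paginates by cursor. This is a minimal sketch, and it assumes listSince(tableName, lastTs, limit) returns rows with createTime > lastTs in ascending order. MessageRepo's real semantics come from the earlier weixin-db plan and are not shown here. FakeRow and makeFakeMessageRepo are hypothetical helper names.

```typescript
// Hypothetical test helper: an in-memory stand-in for MessageRepo.
// ASSUMPTION: listSince returns rows with createTime > lastTs, ascending, at most `limit` rows.
interface FakeRow {
  localId: number;
  createTime: number;
  tableName: string;
  // Extra columns (serverId, localType, realSenderId, content, ...) pass through untouched
  [column: string]: unknown;
}

interface FakeMessageRepo {
  listSince(tableName: string, lastTs: number, limit?: number): Promise<FakeRow[]>;
  close(): void;
  closed: boolean;
  calls: number;
}

function makeFakeMessageRepo(rows: FakeRow[]): FakeMessageRepo {
  const repo: FakeMessageRepo = {
    closed: false,
    calls: 0,
    async listSince(tableName, lastTs, limit = 500) {
      repo.calls++;
      // filter() copies, so sorting never mutates the caller's array
      return rows
        .filter(r => r.tableName === tableName && r.createTime > lastTs)
        .sort((a, b) => a.createTime - b.createTime || a.localId - b.localId)
        .slice(0, limit);
    },
    close() {
      repo.closed = true;
    },
  };
  return repo;
}
```

It could be plugged in as (svc as any).openMessageRepo = () => makeFakeMessageRepo(rows), where rows carry the serverId / localType / realSenderId / content columns that projectToArchive reads.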
  • Step 2: Run the tests and confirm they fail

Run: pnpm --filter @neta/backend test -- service/weixin_archive_sync.test 2>&1 | tail -10 Expected: FAIL ("Cannot find module .../weixin_archive_sync")


Task 11: Main service implementation (weixin_archive_sync.ts)

Files:

  • Create: packages/backend/src/modules/netaclaw/service/weixin_archive_sync.ts

  • Step 1: Write the implementation

import * as fs from 'node:fs';
import * as path from 'node:path';
import { Provide, Scope, ScopeEnum, Logger, Inject } from '@midwayjs/core';
import type { ILogger } from '@midwayjs/logger';
import { InjectEntityModel } from '@midwayjs/typeorm';
import { Repository } from 'typeorm';
import { resolveDataDir } from '../../../comm/data-dir.js';
import { NetaClawAgentChannelEntity } from '../entity/agent_channel.js';
import { NetaClawAgentChannelGroupEntity } from '../entity/agent_channel_group.js';
import { WeixinDbService } from './weixin_db.js';
import { ArchiveDb } from '../runtime/weixin_db/archive_db.js';
import { MessageRepo } from '../runtime/weixin_db/message_repo.js';
import { projectToArchive } from '../runtime/weixin_db/project_archive.js';
import { decryptDbToWorkDir } from '../runtime/weixin_db/archive_decryptor.js';

export interface SyncResult {
  newCount: number;
  totalCount: number;
  lastSyncTs: number;
  durationMs: number;
}

const BATCH_LIMIT = 500;

@Provide()
@Scope(ScopeEnum.Singleton)
export class WeixinArchiveSyncService {
  @Logger() logger: ILogger;

  @InjectEntityModel(NetaClawAgentChannelEntity)
  channelRepo: Repository<NetaClawAgentChannelEntity>;

  @InjectEntityModel(NetaClawAgentChannelGroupEntity)
  groupRepo: Repository<NetaClawAgentChannelGroupEntity>;

  @Inject() weixinDbService: WeixinDbService;

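  /**
   * Test hooks: by default these use process.platform / resolveDataDir / decryptDbToWorkDir / new MessageRepo;
   * tests can monkey-patch these fields on the instance to inject mocks.
   * (Inside the arrow below, resolveDataDir refers to the imported function, not this field.)
   */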
  platform: NodeJS.Platform = process.platform;
  resolveDataDir = (): string => resolveDataDir();
  decryptDb = (srcDb: string, rawKey: Buffer, workDir: string): string =>
    decryptDbToWorkDir({ srcDb, rawKey, workDir });
  openMessageRepo = (plainDb: string): {
    listSince(tableName: string, lastTs: number, limit?: number): Promise<any[]>;
    close(): void;
  } => new MessageRepo(plainDb);

  private readonly channelLocks = new Map<number, Promise<void>>();

  /**
   * Sync one group's chat history into the archive sqlite. A channel-level mutex serializes calls
   * within a channel; different channels run in parallel.
   */
  async syncGroup(groupId: number): Promise<SyncResult> {
    const group = await this.groupRepo.findOne({ where: { id: groupId } });
    if (!group) throw new Error('group-not-found');

    const cid = group.channelId;
    const prev = this.channelLocks.get(cid) ?? Promise.resolve();
    let release!: () => void;
    const next = new Promise<void>(r => (release = r));
    // Store the chain tail (not `next`) so the cleanup check below can match it by identity
    const tail = prev.then(() => next);
    this.channelLocks.set(cid, tail);

    await prev;
    try {
      return await this.doSyncGroup(group);
    } finally {
      release();
      // Only the last waiter removes the entry, so no stale promise stays in the map per channel
      if (this.channelLocks.get(cid) === tail) this.channelLocks.delete(cid);
    }
  }

  /**
   * When a channel is deleted, cascade-delete its archive files so they do not leak.
   * Missing files are not an error.
   */
  async deleteChannelArchive(channelId: number): Promise<void> {
    const file = ArchiveDb.archivePath(channelId, this.resolveDataDir());
    for (const suffix of ['', '-wal', '-shm']) {
      const p = file + suffix;
      if (fs.existsSync(p)) {
        try { fs.unlinkSync(p); }
        catch (err: any) {
          this.logger.warn('[archive] rm %s failed: %s', p, err?.message || err);
        }
      }
    }
  }

  private async doSyncGroup(group: NetaClawAgentChannelGroupEntity): Promise<SyncResult> {
    if (this.platform !== 'win32') throw new Error('unsupported-platform');

    const channel = await this.channelRepo.findOne({ where: { id: group.channelId } });
    if (!channel) throw new Error('channel-not-found');
    if (channel.type !== 'weixin-db') throw new Error('unsupported-channel-type');
    if (channel.loginStatus !== 'connected') throw new Error('channel-not-connected');

    const runtime = this.weixinDbService.getRuntime(channel.id);
    if (!runtime) throw new Error('channel-not-bound');

    // Find the room by name (the trimmed comparison tolerates leading/trailing whitespace)
    const resolver = runtime.reader.getRoomResolver();
    const matches = resolver.findRoomsByName(group.roomName ?? '');
    if (matches.length === 0) throw new Error('room-not-found');
    if (matches.length > 1) {
      this.logger.warn('[archive] %d rooms with same name=%s, using first', matches.length, group.roomName);
    }
    const room = matches[0];

    const dataDir = this.resolveDataDir();
    // ★ Key point: the archive uses its own subdirectory to avoid racing with the listener's
    //   IncrementalReader (cid-<id>/), which overwrites the decrypted message_0.db every 500ms
    const workDir = path.join(dataDir, 'weixin-db-work', `cid-${channel.id}`, 'archive');

    const t0 = Date.now();
    // Open the archive db only after the room is resolved, so failures from here on can be recorded in sync_state.last_error
    const archive = ArchiveDb.openOrCreate(channel.id, dataDir);
    let cursorTs = archive.getSyncState(room.username)?.lastSyncTs ?? 0;
    let newCount = 0;

    let plainDb: string;
    try {
      plainDb = this.decryptDb(runtime.paths.messageDb, runtime.messageKey, workDir);
    } catch (err: any) {
      this.logger.error('[archive] decrypt failed cid=%s: %s', channel.id, err?.message || err);
      this.tryWriteLastError(archive, room.username, room.roomName, `decrypt-failed: ${err?.message || err}`);
      archive.close();
      throw new Error('decrypt-failed');
    }

    let repo: ReturnType<WeixinArchiveSyncService['openMessageRepo']> | null = null;
    try {
      repo = this.openMessageRepo(plainDb);
      while (true) {
        const rows = await repo.listSince(room.tableName, cursorTs, BATCH_LIMIT);
        if (!rows || rows.length === 0) break;
        const archiveRows = rows.map(r => projectToArchive(r, channel.id, room));
        newCount += archive.insertMessages(archiveRows);
        cursorTs = rows[rows.length - 1].createTime;
        if (rows.length < BATCH_LIMIT) break;
      }

      const totalCount = archive.countMessages(room.username);
      archive.updateSyncState(room.username, {
        roomName: room.roomName,
        lastSyncTs: cursorTs,
        lastSyncCount: newCount,
        totalCount,
        lastSyncAt: Math.floor(Date.now() / 1000),
        lastError: null,
      });
      const durationMs = Date.now() - t0;
      this.logger.info('[archive] sync cid=%s room=%s new=%d total=%d ts=%d in %dms',
        channel.id, group.roomName, newCount, totalCount, cursorTs, durationMs);
      return { newCount, totalCount, lastSyncTs: cursorTs, durationMs };
    } catch (err: any) {
      this.tryWriteLastError(archive, room.username, room.roomName, err?.message || String(err));
      throw err;
    } finally {
      try { repo?.close(); } catch { /* ignore */ }
      try { archive.close(); } catch { /* ignore */ }
    }
  }

  /**
   * Best-effort write of sync_state.last_error. If this write fails too, the failure is swallowed
   * so it never masks the original exception.
   */
  private tryWriteLastError(
    archive: ArchiveDb,
    roomId: string,
    roomName: string | null,
    message: string,
  ): void {
    try {
      archive.updateSyncState(roomId, {
        roomName,
        lastSyncAt: Math.floor(Date.now() / 1000),
        lastError: message,
      });
    } catch (e: any) {
      this.logger.warn('[archive] write lastError failed: %s', e?.message || e);
    }
  }
}
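The loop in doSyncGroup advances cursorTs to the createTime of the last row in each batch. Whether that can skip messages depends on the filter inside listSince, which is defined in the earlier weixin-db plan and not shown here. Suppose it filters with createTime > lastTs. If a 500-row batch ends partway through several messages that share one second, the next query skips the rest of that second. The minimal sketch below contrasts that behavior with a (createTime, localId) keyset cursor over an in-memory array. Row, Cursor, pageByTs, pageByKeyset and drain are hypothetical names. The sketch also assumes localId is unique and increasing within one Msg_ table.

```typescript
// Sketch only: a createTime-only watermark vs. a (createTime, localId) keyset cursor.
// ASSUMPTION: localId is unique per table; these helpers are hypothetical, not MessageRepo's API.
interface Row {
  localId: number;
  createTime: number;
}

interface Cursor {
  ts: number;
  localId: number;
}

const byKey = (a: Row, b: Row): number => a.createTime - b.createTime || a.localId - b.localId;

// Mirrors doSyncGroup if listSince filters with `createTime > lastTs`
function pageByTs(rows: Row[], c: Cursor, limit: number): Row[] {
  return rows.filter(r => r.createTime > c.ts).sort(byKey).slice(0, limit);
}

// Keyset pagination: resume strictly after the last (createTime, localId) pair seen
function pageByKeyset(rows: Row[], c: Cursor, limit: number): Row[] {
  return rows
    .filter(r => r.createTime > c.ts || (r.createTime === c.ts && r.localId > c.localId))
    .sort(byKey)
    .slice(0, limit);
}

// Same loop shape as doSyncGroup: fetch, move the cursor to the last row, stop on a short batch
function drain(rows: Row[], limit: number, page: typeof pageByTs): Row[] {
  const out: Row[] = [];
  let c: Cursor = { ts: 0, localId: 0 };
  while (true) {
    const batch = page(rows, c, limit);
    if (batch.length === 0) break;
    out.push(...batch);
    const last = batch[batch.length - 1];
    c = { ts: last.createTime, localId: last.localId };
    if (batch.length < limit) break;
  }
  return out;
}
```

With three messages at createTime 100 and a batch size of 2, the timestamp-only loop returns ids 1, 2, 4 and drops id 3, while the keyset loop returns all four. Persisting a keyset cursor would need a second watermark column (for example a hypothetical last_local_id) next to last_sync_ts. The Task 5 schema may not have that column. Another option is a `>=` filter, which relies on INSERT OR IGNORE to deduplicate the re-read boundary rows. That variant can loop forever if a full batch shares one timestamp.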
  • Step 2: Run the tests and confirm they pass

Run: pnpm --filter @neta/backend test -- service/weixin_archive_sync.test 2>&1 | tail -15 Expected: 9 passed

  • Step 3: Commit
git add packages/backend/src/modules/netaclaw/service/weixin_archive_sync.ts \
        packages/backend/test/modules/netaclaw/service/weixin_archive_sync.test.ts
git commit -m "feat(weixin-archive): WeixinArchiveSyncService syncGroup 主流程 (mutex + 增量水位)"

Task 12: Concurrency tests (channel-level mutex)

Files:

  • Create: packages/backend/test/modules/netaclaw/service/weixin_archive_sync.concurrency.test.ts

  • Step 1: Write the tests

import { WeixinArchiveSyncService } from '../../../../src/modules/netaclaw/service/weixin_archive_sync.js';

function mkLogger() {
  return { info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn() };
}

describe('WeixinArchiveSyncService concurrency', () => {
  it('serializes calls within the same channel', async () => {
    const svc = new WeixinArchiveSyncService();
    (svc as any).logger = mkLogger();

    const startTimes: Record<number, number> = {};
    const endTimes: Record<number, number> = {};

    // mock doSyncGroup so we don't touch fs / db
    let counter = 0;
    (svc as any).doSyncGroup = async (group: any) => {
      const id = ++counter;
      startTimes[id] = Date.now();
      await new Promise(r => setTimeout(r, 50));
      endTimes[id] = Date.now();
      return { newCount: 0, totalCount: 0, lastSyncTs: 0, durationMs: 50 };
    };
    (svc as any).groupRepo = {
      findOne: jest.fn().mockImplementation(async ({ where: { id } }: any) => ({ id, channelId: 1 })),
    };

    // Three concurrent requests on the same channel → must run serially
    await Promise.all([svc.syncGroup(11), svc.syncGroup(12), svc.syncGroup(13)]);
    expect(startTimes[2]).toBeGreaterThanOrEqual(endTimes[1]);
    expect(startTimes[3]).toBeGreaterThanOrEqual(endTimes[2]);
  });

  it('runs in parallel across different channels', async () => {
    const svc = new WeixinArchiveSyncService();
    (svc as any).logger = mkLogger();

    const startTimes: Record<number, number> = {};
    (svc as any).doSyncGroup = async (group: any) => {
      startTimes[group.id] = Date.now();
      await new Promise(r => setTimeout(r, 50));
      return { newCount: 0, totalCount: 0, lastSyncTs: 0, durationMs: 50 };
    };
    (svc as any).groupRepo = {
      findOne: jest.fn()
        .mockResolvedValueOnce({ id: 21, channelId: 1 })
        .mockResolvedValueOnce({ id: 22, channelId: 2 }),
    };
    await Promise.all([svc.syncGroup(21), svc.syncGroup(22)]);
    // The two channels start at (almost) the same time: gap < 20ms
    expect(Math.abs(startTimes[21] - startTimes[22])).toBeLessThan(20);
  });
});
  • Step 2: Run the tests and confirm they pass

Run: pnpm --filter @neta/backend test -- service/weixin_archive_sync.concurrency 2>&1 | tail -10 Expected: 2 passed

  • Step 3: Commit
git add packages/backend/test/modules/netaclaw/service/weixin_archive_sync.concurrency.test.ts
git commit -m "test(weixin-archive): channel-level mutex 串行/并行行为"
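The promise-chain lock in syncGroup can be read on its own as follows. This is a minimal sketch, and KeyedMutex is a hypothetical name, not an existing class in the repo. It also shows the cleanup rule: a caller removes the map entry only if that entry is still the tail it stored. The accompanying check compares the order of events instead of wall-clock timestamps. That may be less flaky on a loaded CI machine than the < 20ms threshold in the parallel test above.

```typescript
// Sketch of a per-key promise-chain mutex, the same shape as syncGroup's channelLocks.
// `KeyedMutex` is hypothetical; cleanup compares against the tail promise this caller stored.
class KeyedMutex<K> {
  private readonly tails = new Map<K, Promise<void>>();

  async run<T>(key: K, fn: () => Promise<T>): Promise<T> {
    const prev = this.tails.get(key) ?? Promise.resolve();
    let release!: () => void;
    const gate = new Promise<void>(resolve => (release = resolve));
    // The stored tail settles only after every earlier holder and this one have released
    const tail = prev.then(() => gate);
    this.tails.set(key, tail);

    await prev;
    try {
      return await fn();
    } finally {
      release();
      // A later caller has replaced `tail` if one is queued; then it owns the cleanup
      if (this.tails.get(key) === tail) this.tails.delete(key);
    }
  }

  get size(): number {
    return this.tails.size;
  }
}
```

Because release() runs in finally, a task that throws still unblocks the next waiter for that key.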

Phase F · Wire up the business layer + frontend

Task 13: agent_channel_group.service.sync delegates to the archive service

Files:

  • Modify: packages/backend/src/modules/netaclaw/service/agent_channel_group.ts

  • Step 1: Add the Inject and the delegating method

Add to the imports at the top of the file:

import { Inject } from '@midwayjs/core';
import { WeixinArchiveSyncService, type SyncResult } from './weixin_archive_sync.js';

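(Provide / Scope etc. are already imported from @midwayjs/core; if so, add Inject to that existing import instead of writing a second import line.)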

Add the following anywhere in the class (ideally after the list method):

  @Inject() archiveSyncService: WeixinArchiveSyncService;

  /** Delegates the sync to WeixinArchiveSyncService. */
  async sync(groupId: number): Promise<SyncResult> {
    return this.archiveSyncService.syncGroup(groupId);
  }
  • Step 2: Commit
git add packages/backend/src/modules/netaclaw/service/agent_channel_group.ts
git commit -m "feat(netaclaw): agent_channel_group.sync 委托给 WeixinArchiveSyncService"

Task 14: Controller endpoint POST /sync

Files:

  • Modify: packages/backend/src/modules/netaclaw/controller/admin/agent_channel_group.ts

  • Step 1: Add the endpoint

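The mapping from error tokens to user-facing messages lives in the controller (the service only throws tokens), so the frontend never has to interpret token strings.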

Add the following anywhere in the controller class (ideally before the delete method):

  @Post('/sync')
  async sync(@Body() body: { groupId: number }) {
    if (!body?.groupId) return { code: 1003, message: 'groupId is required' };
    try {
      const data = await this.groupService.sync(body.groupId);
      return { code: 1000, data };
    } catch (err: any) {
      const token = err?.message || 'sync-failed';
      const message = ERROR_MESSAGES[token] ?? `同步失败: ${token}`;
      return { code: 1003, message };
    }
  }

Then add the error-message dictionary at the top of the file (after the imports, before the decorators):

const ERROR_MESSAGES: Record<string, string> = {
  'group-not-found': '群记录不存在',
  'channel-not-found': '频道记录不存在',
  'unsupported-channel-type': '本频道不支持同步',
  'channel-not-connected': '频道未连接,请检查 PC 微信是否登录',
  'channel-not-bound': '频道未连接,请检查 PC 微信是否登录',
  'room-not-found': '未找到该群消息,请检查群名是否与 PC 微信中显示的完全一致',
  'decrypt-failed': '解密失败,可能登录态已失效,请重启 PC 微信后重试',
  'unsupported-platform': '本功能需要 Windows + PC 微信',
};
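ERROR_MESSAGES is typed as Record<string, string>, so nothing at compile time ties it to the tokens the service throws. One optional hardening step is to derive the token type from a single `as const` list. Then a token without a message becomes a type error. The sketch below does this under that assumption. SYNC_ERROR_TOKENS, SyncErrorToken, SYNC_ERROR_MESSAGES and toUserMessage are hypothetical names. In practice the list would be exported from a module shared by the service and the controller. The token strings and messages are copied from Task 11 and the dictionary above.

```typescript
// Hypothetical hardening sketch: one source of truth for sync error tokens.
const SYNC_ERROR_TOKENS = [
  'group-not-found',
  'channel-not-found',
  'unsupported-channel-type',
  'channel-not-connected',
  'channel-not-bound',
  'room-not-found',
  'decrypt-failed',
  'unsupported-platform',
] as const;

type SyncErrorToken = (typeof SYNC_ERROR_TOKENS)[number];

// Record<SyncErrorToken, string> makes a missing or misspelled key a compile error
const SYNC_ERROR_MESSAGES: Record<SyncErrorToken, string> = {
  'group-not-found': '群记录不存在',
  'channel-not-found': '频道记录不存在',
  'unsupported-channel-type': '本频道不支持同步',
  'channel-not-connected': '频道未连接,请检查 PC 微信是否登录',
  'channel-not-bound': '频道未连接,请检查 PC 微信是否登录',
  'room-not-found': '未找到该群消息,请检查群名是否与 PC 微信中显示的完全一致',
  'decrypt-failed': '解密失败,可能登录态已失效,请重启 PC 微信后重试',
  'unsupported-platform': '本功能需要 Windows + PC 微信',
};

function isSyncErrorToken(s: string): s is SyncErrorToken {
  return (SYNC_ERROR_TOKENS as readonly string[]).includes(s);
}

// Same fallback as the controller: unknown tokens become "同步失败: <token>"
function toUserMessage(err: unknown): string {
  const token = err instanceof Error && err.message ? err.message : 'sync-failed';
  return isSyncErrorToken(token) ? SYNC_ERROR_MESSAGES[token] : `同步失败: ${token}`;
}
```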
  • Step 2: Start the backend and verify the route is registered

Run: pnpm --filter @neta/backend dev & (starts it in the background), then:

curl -s -X POST http://localhost:8003/admin/netaclaw/agent_channel_group/sync \
  -H 'Content-Type: application/json' \
  -H 'Authorization: <your token>' \
  -d '{"groupId": 1}'

Expected: a JSON response (either success or a sensible error message, not a 404). Once you see the result, kill the background backend.

  • Step 3: Commit
git add packages/backend/src/modules/netaclaw/controller/admin/agent_channel_group.ts
git commit -m "feat(netaclaw): POST /admin/netaclaw/agent_channel_group/sync 端点 + 错误码映射"

Task 15: agent_channel.service.delete cascades to remove the archive files

Files:

  • Modify: packages/backend/src/modules/netaclaw/service/agent_channel.ts

  • Step 1: Find the delete method

The current method in the file (around lines 126-133, as read earlier):

  async delete(ids: number[]) {
    for (const id of ids) {
      this.stopRunner(id);
      await this.groupService.cascadeDeleteByChannel(id);
      this.weixinDbService.unbindChannel(id);
    }
    await this.channelRepo.delete(ids);
  }
  • Step 2: Add the archive cleanup + Inject

Add to the Inject block at the top of the class:

  @Inject() archiveSyncService: WeixinArchiveSyncService;

(Also add import { WeixinArchiveSyncService } from './weixin_archive_sync.js'; to the imports at the top of the file.)

Inside the delete loop, add:

  async delete(ids: number[]) {
    for (const id of ids) {
      this.stopRunner(id);
      await this.groupService.cascadeDeleteByChannel(id);
      this.weixinDbService.unbindChannel(id);
      await this.archiveSyncService.deleteChannelArchive(id);
    }
    await this.channelRepo.delete(ids);
  }
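deleteChannelArchive (Task 11) calls existsSync and then unlinkSync for the main file and its -wal / -shm sidecars. An alternative is fs.rmSync with force: true. With that option, rmSync does not throw when the path does not exist, so the separate existence check (and its check-then-delete race) goes away. The minimal sketch below takes this approach. removeSqliteFiles is a hypothetical helper. It collects failures instead of throwing. On Windows, a file that is still open (for example, by an in-flight sync on the same channel) may refuse deletion, and channel deletion should still continue. This matches the original's warn-and-continue behavior.

```typescript
import * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';

// Hypothetical helper: delete a SQLite file plus its WAL/SHM sidecars, returning any failures.
// fs.rmSync with force: true ignores paths that do not exist, so no existsSync check is needed.
function removeSqliteFiles(file: string): string[] {
  const failures: string[] = [];
  for (const suffix of ['', '-wal', '-shm']) {
    const p = file + suffix;
    try {
      fs.rmSync(p, { force: true });
    } catch (err) {
      // e.g. a handle still open on Windows; report it and keep going with the other files
      failures.push(`${p}: ${err instanceof Error ? err.message : String(err)}`);
    }
  }
  return failures;
}
```

Inside deleteChannelArchive, each returned entry would go to this.logger.warn, just like the current catch branch.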
  • Step 3: Run the existing agent_channel tests to confirm zero regressions

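Run: pnpm --filter @neta/backend test -- service/agent_channel 2>&1 | tail -10 Expected: all pass (if an existing test calls delete() on a hand-built service instance, archiveSyncService will be undefined there; stub it first, e.g. { deleteChannelArchive: jest.fn() })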

  • Step 4: Commit
git add packages/backend/src/modules/netaclaw/service/agent_channel.ts
git commit -m "feat(netaclaw): channel.delete cascade 清理 weixin-archive sqlite 文件"

Task 16: Frontend: add the "同步聊天记录" (sync chat history) button + handler

Files:

  • Modify: packages/frontend/src/modules/agent/components/channel-group-panel.vue

  • Step 1: Add a _syncing field to GroupItem

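The file has an interface GroupItem or a similar type. Locate it and add the field next to the client-only transient fields (if there is no such interface, attach the field directly when initializing the ref):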

interface GroupItem {
  id: number;
  channelId: number;
  roomId: string;
  roomName: string | null;
  status: number;
  /* ... other existing fields ... */
  _syncing?: boolean;
}

(If GroupItem uses an inline type or any, no change is needed; just use group._syncing in the template and handler.)

  • Step 2: Add the button to the template

Locate the group-card__actions div (around lines 83-87):

<div class="group-card__actions">
  <el-button size="small" @click="jumpToChat(group)">查看对话记录</el-button>
  <el-button size="small" type="primary" @click="handleSavePolicy(group)">保存策略</el-button>
  <el-button size="small" type="danger" link @click="handleDelete(group)">删除</el-button>
</div>

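Change it to the following. This also disables the existing "查看对话记录" (view chat history) button, because it jumps to a chat session that does not exist and would confuse users; a tooltip explains why: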

<div class="group-card__actions">
  <el-button
    size="small"
    :loading="group._syncing"
    @click="handleSync(group)"
  >
    {{ group._syncing ? '同步中...' : '同步聊天记录' }}
  </el-button>
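  <el-tooltip content="本功能开发中,请先用'同步聊天记录'按钮把消息存档到本地" placement="top">
    <!-- Element Plus tooltips do not fire on disabled form elements, so wrap the button in a span -->
    <span>
      <el-button size="small" disabled>查看对话记录</el-button>
    </span>
  </el-tooltip>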
  <el-button size="small" type="primary" @click="handleSavePolicy(group)">保存策略</el-button>
  <el-button size="small" type="danger" link @click="handleDelete(group)">删除</el-button>
</div>
  • Step 3: Add the handler in <script setup>

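Locate function jumpToChat or function handleDelete and add the following next to it. This assumes apiPost and ElMessage are already imported in the component, as the existing handlers presumably use them; add the imports if not: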

async function handleSync(group: GroupItem) {
  if (group._syncing) return;
  group._syncing = true;
  try {
    const resp = await apiPost('/admin/netaclaw/agent_channel_group/sync', { groupId: group.id });
    if (resp.code !== 1000) {
      ElMessage.error(resp.message || '同步失败');
      return;
    }
    const { newCount, totalCount, durationMs } = resp.data as {
      newCount: number; totalCount: number; durationMs: number;
    };
    ElMessage.success(`已同步 ${newCount} 条新消息(总计 ${totalCount} 条,${(durationMs / 1000).toFixed(1)}s)`);
  } catch (err: any) {
    ElMessage.error(err?.message || '同步失败');
  } finally {
    group._syncing = false;
  }
}
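handleSync's branching can be pulled out of the component into a pure function so it can be unit-tested without mounting Vue. The minimal sketch below assumes the { code, message?, data? } response shape that the controller returns. ApiResponse, SyncResultView and describeSyncResponse are hypothetical names. The malformed-data branch is an addition that is not in the handler above.

```typescript
// Hypothetical pure version of handleSync's branching; the caller would pass the result to ElMessage.
interface ApiResponse {
  code: number;
  message?: string;
  data?: unknown;
}

interface SyncResultView {
  newCount: number;
  totalCount: number;
  durationMs: number;
}

function describeSyncResponse(resp: ApiResponse): { type: 'success' | 'error'; text: string } {
  if (resp.code !== 1000) return { type: 'error', text: resp.message || '同步失败' };
  const d = resp.data as Partial<SyncResultView> | null | undefined;
  // Guard against a success code carrying an unexpected payload (not handled in the original handler)
  if (!d || typeof d.newCount !== 'number' || typeof d.totalCount !== 'number' || typeof d.durationMs !== 'number') {
    return { type: 'error', text: '同步失败' };
  }
  // Same template as handleSync: duration in seconds with one decimal place
  return {
    type: 'success',
    text: `已同步 ${d.newCount} 条新消息(总计 ${d.totalCount} 条,${(d.durationMs / 1000).toFixed(1)}s)`,
  };
}
```

Inside handleSync, the call would become ElMessage[msg.type](msg.text), where msg = describeSyncResponse(resp).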
  • Step 4: type-check + manual smoke test

Run: pnpm --filter @neta/frontend type-check 2>&1 | tail -10 Expected: no new errors (if GroupItem is typed as any, no change is needed)

Start the backend and frontend:

pnpm --filter @neta/backend dev &
pnpm --filter @neta/frontend dev &

Open the group management page in the browser → click "同步聊天记录" → the button shows loading, and when it finishes a toast shows "已同步 N 条新消息(总计 M 条,X.Xs)". If so, the check passes.

  • Step 5: Commit
git add packages/frontend/src/modules/agent/components/channel-group-panel.vue
git commit -m "feat(agent-fe): 群卡片新增 '同步聊天记录' 按钮 + handler"

Phase G · End-to-end manual smoke test + verification report

Task 17: E2E checklist + verification report

Files:

  • Create: docs/superpowers/followups/2026-05-13-weixin-archive-sync-e2e.md

Prerequisites:

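  • Windows + Weixin 4.x logged in + at least one test group
  • A channel with type=weixin-db bound successfully (loginStatus='connected')
  • The group has been added to the whitelist via "+ 添加群" (add group), i.e. the netaclaw_agent_channel_group table has a matching row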

Checklist:

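  • E2E-1: Send 3 text messages in the group first (so the archive has content to pull)

  • E2E-2: On the group card in the frontend, click "同步聊天记录" → the button shows loading → a few seconds later a toast shows "已同步 N 条新消息(总计 M 条,X.Xs)" with N ≥ 3

  • E2E-3: Confirm on the filesystem that <dataDir>/weixin-archive/cid-<channelId>.db exists and its size is > 0

  • E2E-4: Open the archive file with the sqlite3 CLI or DB Browser; SELECT COUNT(*) FROM message WHERE room_id='<the group's wxid@chatroom>' returns ≥ 3

  • E2E-5: SELECT * FROM sync_state returns one row with last_sync_ts > 0 and last_error IS NULL

  • E2E-6: Click "同步聊天记录" again (with no new messages in between) → the toast shows "已同步 0 条新消息(总计 M 条,...)", confirming the incremental sync is idempotent

  • E2E-7: Send 2 more messages in the group → sync → the toast shows "已同步 2 条新消息(总计 M+2 条,...)"

  • E2E-8: Disable the channel (status=0) and click sync again → the toast shows "频道未连接,请检查 PC 微信是否登录" or a similar error (confirming the error-code mapping works)

  • E2E-9: Point the group at a nonexistent name (set group.roomName for that groupId to a name that does not exist) → sync → the toast shows "未找到该群消息..."

  • E2E-10: Delete the channel in the frontend → <dataDir>/weixin-archive/cid-<channelId>.db should disappear from the filesystem

  • E2E-11: Concurrent syncs across groups (click sync on two groups of the same channel at the same time) → they complete one after the other, with no db corruption

  • E2E-12: Run a sync on Linux/Mac → the toast shows "本功能需要 Windows + PC 微信"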

  • Step 1: Run each item manually

  • Step 2: Write the verification report

mkdir -p docs/superpowers/followups

Write the findings to docs/superpowers/followups/2026-05-13-weixin-archive-sync-e2e.md:

# weixin-archive sync E2E verification (2026-05-13)

## Environment
- Weixin version: <>
- channel id / name: <>
- group id / roomName: <>

## Checklist results
- [x] E2E-1 ...
- [x] E2E-2 ...
- [ ] E2E-X failure reason: <>

## Known issues / Follow-ups
- (record any minor issues that do not block the merge here)
  • Step 3: Commit
git add docs/superpowers/followups/2026-05-13-weixin-archive-sync-e2e.md
git commit -m "docs(weixin-archive): E2E 验证报告"

Self-Review

1. Spec coverage:

| Spec section | Covering task |
|---|---|
| §1 Background & goals + out-of-scope | The whole plan leaves out the viewer UI / attachment decryption / listener changes |
| §2.1 Trigger ("同步聊天记录" button) | Task 16 |
| §2.2 First full sync + later incremental syncs (watermark) | Task 11 `cursorTs = archive.getSyncState(room.username)?.lastSyncTs ?? 0` |
| §2.3 Error feedback mapping | Task 14 ERROR_MESSAGES dictionary |
| §3 Overall architecture (mutex + decrypt + resolve + INSERT OR IGNORE) | Task 11 doSyncGroup |
| §3.1 ★ Work-directory isolation (archive uses the `cid-<id>/archive` subdirectory) | Task 11 `` workDir = path.join(dataDir, 'weixin-db-work', `cid-${cid}`, 'archive') `` + the new "uses isolated archive workDir" test in Task 10 |
| §3.2 Resource release + best-effort lastError | Task 11 two-level try/finally + tryWriteLastError + the new "writes lastError" / "closes archive db even when error" tests in Task 10 |
| §4.1 File location `<dataDir>/weixin-archive/cid-<id>.db` | Task 5 archivePath |
| §4.2 Table schema message / sync_state | Task 5 CREATE SQL |
| §5.1 New files: archive_db / archive_decryptor / archive_types / sender_parser / project_archive / weixin_archive_sync | Task 1 / 2 / 3 / 5 / 6 / 11 |
| §5.2 Modified: incremental_reader / build_pseudo / room_resolver / weixin_db / agent_channel / agent_channel_group / controller / frontend | Task 1 / 2 / 7 / 8 / 9 / 13 / 14 / 15 / 16 |
| §5.3 API contract POST /sync | Task 14 |
| §5.3 Service interface syncGroup / deleteChannelArchive | Task 11 / 15 |
| §5.4 Channel-level mutex | Task 11, verified by Task 12 |
| §5.5 Streaming batches, LIMIT 500 | Task 11 BATCH_LIMIT |
| §6 Frontend button + handler | Task 16 |
| §6.4 list metadata | Explicitly out-of-scope in the spec; no corresponding task, consistent |
| §7 Error codes, 9 scenarios | Task 14 dictionary + 5 error branches tested in Task 10 |
| §8.1 Unit tests archive_db / weixin_archive_sync / concurrency / parseSenderPrefix | Task 4 / 10 / 12 / 2 |
| §8.3 E2E checklist | Task 17 |
| §10 Group-name trim fallback | Task 7 implementation + unit test "trims both sides" |

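2. Placeholder scan:

  • No TBD / TODO.
  • Every Step has concrete code or a concrete command.
  • Location instructions such as "insert after the list method" all give a line number or surrounding context.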

3. Type consistency:

  • ArchiveMessage fields are consistent across Task 3 / 4 / 5 / 6 / 11 (channelId/roomId/roomName/msgTable/localId/serverId/localType/senderWxid/createTime/content/rawHex)
  • SyncState fields are consistent across Task 3 / 4 / 5 / 11
  • SyncResult fields are consistent across Task 10 / 11 / 16 (newCount / totalCount / lastSyncTs / durationMs)
  • Error-code tokens are consistent across Task 10 / 11 / 14 (channel-not-connected / room-not-found, etc.)
  • WeixinDbService.getRuntime returns ChannelRuntime | undefined; it is defined in Task 8 and used consistently in Task 11
  • IncrementalReader.getRoomResolver() returns RoomResolver; it is defined in Task 9 and used consistently via .findRoomsByName in Task 11

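4. Cross-phase handoffs:

  • Phase A extracts shared functions → Phases B/C/E import them directly
  • Phase D exposes runtime accessors → Phase E reaches the runtime state through getRuntime / getRoomResolver
  • Phase F wires up the service / controller / frontend → Phase G verifies the whole chain end to end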

5. DEV feasibility:

  • All Phase A-E unit tests can run on Linux/Mac (better-sqlite3 is cross-platform; just mock platform / decryptDb)
  • Phase G E2E requires Windows + a logged-in Weixin

Execution Handoff

The full plan is saved at docs/superpowers/plans/2026-05-13-weixin-archive-sync.md