Implementation plan: syncing Weixin group chat history into the Neta local archive (weixin-archive sync)
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: add a "Sync chat history" button to the group management page → the backend decrypts WCDB → messages land incrementally in the SQLite file <dataDir>/weixin-archive/cid-<channelId>.db.
Architecture: a new standalone WeixinArchiveSyncService reuses the existing wcdb_codec / RoomResolver / MessageRepo and obtains the already-bound reader and key through WeixinDbService.getRuntime(cid). Sync runs under a channel-level mutex; better-sqlite3 writes to a per-channel file, and INSERT OR IGNORE plus a last_sync_ts watermark make the increments idempotent.
Tech Stack: Midway.js 3.20 + TypeORM (MySQL, read-only access to group/channel metadata) + better-sqlite3 (archive file) + Vue 3 + Element Plus + fetch.
Spec: docs/superpowers/specs/2026-05-13-weixin-archive-sync-design.md
Prerequisites:
- The 2026-05-12 weixin-db channel plan has been merged: wcdb_codec / RoomResolver / MessageRepo / WeixinDbService.bindChannel already work
- The channel is bound successfully (loginStatus='connected') and the WeixinDbService.runtimes map holds its runtime state
Key constraints:
- One commit per Task
- TDD: write the test before the implementation
- Do not touch IncrementalReader.readIncrement / WalWatcher / agent_channel.routeInboundMessage (the listener pipeline stays independent)
- archive does not re-spawn ps1; it reuses the key already extracted by bindChannel
- The mutex is channel-level, not group-level
- Sync blocks on the backend while the frontend shows a loading state; no job queue (YAGNI)
- Error codes are string tokens (channel-not-connected / room-not-found, etc.); the frontend maps them to display text
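The channel-level mutex constraint can be illustrated with a minimal sketch. KeyedMutex and runExclusive are hypothetical names, not part of the plan; the sketch only assumes that sync calls for the same channelId should run one at a time while different channels proceed independently.

```typescript
// Hypothetical helper: serializes async tasks that share the same key (here, a channel id).
class KeyedMutex {
  // For each key, a promise that settles when the most recently queued task has finished.
  private readonly tails = new Map<number, Promise<void>>();

  async runExclusive<T>(key: number, task: () => Promise<T>): Promise<T> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    let release!: () => void;
    const gate = new Promise<void>(resolve => { release = resolve; });
    const tail = previous.then(() => gate);
    this.tails.set(key, tail);
    await previous; // wait for every earlier task on this key
    try {
      return await task();
    } finally {
      release(); // let the next queued task (if any) start, even when task() threw
      if (this.tails.get(key) === tail) this.tails.delete(key);
    }
  }
}

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Two tasks on channel 6 must not overlap; the task on channel 7 is not blocked by them.
async function demo(): Promise<string[]> {
  const mutex = new KeyedMutex();
  const order: string[] = [];
  await Promise.all([
    mutex.runExclusive(6, async () => { order.push('a:start'); await sleep(20); order.push('a:end'); }),
    mutex.runExclusive(6, async () => { order.push('b:start'); order.push('b:end'); }),
    mutex.runExclusive(7, async () => { order.push('c:start'); order.push('c:end'); }),
  ]);
  return order;
}

demo().then(order => console.log(order.join(' ')));
```

Keying the lock by channel rather than by group matches the fact that all groups of one channel write into the same archive file.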
File structure
New files
| File | Responsibility |
|---|---|
| packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_decryptor.ts | Shared function extracted from decryptToWork |
| packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_db.ts | better-sqlite3 wrapper: openOrCreate / insertMessages / sync_state CRUD / countMessages |
| packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_types.ts | ArchiveMessage / SyncState interfaces |
| packages/backend/src/modules/netaclaw/runtime/weixin_db/sender_parser.ts | parseSenderPrefix, extracted from build_pseudo.ts as a shared function |
| packages/backend/src/modules/netaclaw/runtime/weixin_db/project_archive.ts | projectToArchive(row, channelId, room): ArchiveMessage, a pure function |
| packages/backend/src/modules/netaclaw/service/weixin_archive_sync.ts | Main service: syncGroup(groupId) + channel-level mutex + deleteChannelArchive(cid) |
| packages/backend/test/modules/netaclaw/runtime/weixin_db/archive_db.test.ts | Table creation / idempotent inserts / sync_state CRUD |
| packages/backend/test/modules/netaclaw/runtime/weixin_db/project_archive.test.ts | Projection pure function |
| packages/backend/test/modules/netaclaw/runtime/weixin_db/sender_parser.test.ts | Sender-prefix parsing |
| packages/backend/test/modules/netaclaw/service/weixin_archive_sync.test.ts | Service flow tests with mocks |
| packages/backend/test/modules/netaclaw/service/weixin_archive_sync.concurrency.test.ts | Channel-level mutex: serial / parallel verification |
Modified files
| File | Change |
|---|---|
| packages/backend/src/modules/netaclaw/runtime/weixin_db/incremental_reader.ts | decryptToWork replaced by importing decryptDbToWorkDir from archive_decryptor |
| packages/backend/src/modules/netaclaw/runtime/weixin_db/build_pseudo.ts | parseSenderPrefix imported from sender_parser instead |
| packages/backend/src/modules/netaclaw/runtime/weixin_db/room_resolver.ts | New findRoomsByName(roomName) method |
| packages/backend/src/modules/netaclaw/service/weixin_db.ts | ChannelRuntime gains a messageKey field; new getRuntime(cid) |
| packages/backend/src/modules/netaclaw/service/agent_channel.ts | delete(ids) cascades to removing the archive file |
| packages/backend/src/modules/netaclaw/service/agent_channel_group.ts | New sync(groupId) delegate |
| packages/backend/src/modules/netaclaw/controller/admin/agent_channel_group.ts | New POST /sync endpoint |
| packages/frontend/src/modules/agent/components/channel-group-panel.vue | Group card gains a "Sync chat history" button + handler |
Phase A · Extracting shared functions (pure refactor, zero behavior change)
First extract decryptToWork and parseSenderPrefix so that the sync service can import them directly later.
Task 1: Extract archive_decryptor.ts
Files:
- Create: packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_decryptor.ts
- Modify: packages/backend/src/modules/netaclaw/runtime/weixin_db/incremental_reader.ts
- Step 1: Write the new file archive_decryptor.ts
import * as fs from 'node:fs';
import * as path from 'node:path';
import { decryptDatabase } from './wcdb_codec.js';
export interface DecryptOptions {
srcDb: string;
rawKey: Buffer;
workDir: string;
}
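/**
 * Copies the encrypted DB into workDir/src/, decrypts the copy into workDir/decrypted/, and returns the plaintext DB path.
 * Weixin holds an exclusive lock on the original, so a copy must be taken first; -wal / -shm are copied too when present (a failed copy of these lock-held files can be ignored).
 * The decrypted DB is self-contained, so leftover -wal / -shm files next to it are deleted to keep better-sqlite3 from misreading them.
 */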
export function decryptDbToWorkDir(opts: DecryptOptions): string {
const { srcDb, rawKey, workDir } = opts;
const srcDir = path.join(workDir, 'src');
const outDir = path.join(workDir, 'decrypted');
fs.mkdirSync(srcDir, { recursive: true });
fs.mkdirSync(outDir, { recursive: true });
const name = path.basename(srcDb);
const srcCopy = path.join(srcDir, name);
fs.copyFileSync(srcDb, srcCopy);
for (const suffix of ['-wal', '-shm']) {
const s = srcDb + suffix;
if (fs.existsSync(s)) {
try { fs.copyFileSync(s, srcCopy + suffix); } catch { /* a failed copy of a lock-held file can be ignored */ }
}
}
const encrypted = fs.readFileSync(srcCopy);
const decrypted = decryptDatabase(encrypted, rawKey);
const outPath = path.join(outDir, name);
fs.writeFileSync(outPath, decrypted);
for (const suffix of ['-wal', '-shm']) {
const p = outPath + suffix;
if (fs.existsSync(p)) {
try { fs.unlinkSync(p); } catch { /* ignore */ }
}
}
return outPath;
}
- Step 2: Switch incremental_reader.ts to this function
Locate the private decryptToWork(srcDb, rawKey) method (around lines 83-108) and delete it entirely. Add to the imports at the top of the file:
import { decryptDbToWorkDir } from './archive_decryptor.js';
Also delete the line import { decryptDatabase } from './wcdb_codec.js'; (archive_decryptor now uses it internally; this file no longer uses it directly).
Then replace all 4 internal calls to this.decryptToWork(X, Y) with:
decryptDbToWorkDir({ srcDb: X, rawKey: Y, workDir: this.cfg.workDir })
Replacement sites:
- 3 in initialize() (one each for sessionDb / contactDb / messageDb)
- 1 in readIncrement() (messageDb)
- Step 3: Run the existing weixin_db unit tests to confirm zero regressions
Run: pnpm --filter @neta/backend test -- runtime/weixin_db 2>&1 | tail -15
Expected: all pass (the existing wcdb_codec / room_resolver / build_pseudo / db_paths tests should still run)
- Step 4: Commit
git add packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_decryptor.ts \
packages/backend/src/modules/netaclaw/runtime/weixin_db/incremental_reader.ts
git commit -m "refactor(weixin-db): 抽出 decryptDbToWorkDir 公共函数 (供 archive 复用)"
Task 2: Extract sender_parser.ts
Files:
- Create: packages/backend/src/modules/netaclaw/runtime/weixin_db/sender_parser.ts
- Create: packages/backend/test/modules/netaclaw/runtime/weixin_db/sender_parser.test.ts
- Modify: packages/backend/src/modules/netaclaw/runtime/weixin_db/build_pseudo.ts
- Step 1: Write the failing test sender_parser.test.ts
import { parseSenderPrefix } from '../../../../../src/modules/netaclaw/runtime/weixin_db/sender_parser.js';
describe('parseSenderPrefix', () => {
it('parses "wxid_alice:\\n..." prefix', () => {
const r = parseSenderPrefix('wxid_alice:\n你好');
expect(r).toEqual({ senderWxid: 'wxid_alice', body: '你好' });
});
it('returns null when no colon-newline', () => {
expect(parseSenderPrefix('系统消息')).toBeNull();
});
it('returns null when prefix not starting with wxid_', () => {
expect(parseSenderPrefix('alice:\n你好')).toBeNull();
});
it('returns null when prefix is empty (colon at index 0)', () => {
expect(parseSenderPrefix(':\nfoo')).toBeNull();
});
});
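- Step 2: Run the test and confirm it fails
Run: pnpm --filter @neta/backend test -- runtime/weixin_db/sender_parser 2>&1 | tail -10
Expected: FAIL (module does not exist)
- Step 3: Write the implementation sender_parser.ts
/**
 * Sender prefix format of a group message's message_content: "wxid_xxx:\n<body>".
 * - Non-group / system messages have no such prefix → returns null
 * - The prefix must start with "wxid_", and ":\n" must not be at index 0 (guards against an empty prefix)
 */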
export function parseSenderPrefix(content: string): { senderWxid: string; body: string } | null {
const idx = content.indexOf(':\n');
if (idx <= 0) return null;
const prefix = content.slice(0, idx);
if (!prefix.startsWith('wxid_')) return null;
return { senderWxid: prefix, body: content.slice(idx + 2) };
}
- Step 4: Run the test and confirm it passes
Run: pnpm --filter @neta/backend test -- runtime/weixin_db/sender_parser 2>&1 | tail -10
Expected: 4 passed
- Step 5: Switch build_pseudo.ts to the shared function
Delete the entire function parseSenderPrefix(...) at the bottom of the file and add to the imports at the top:
import { parseSenderPrefix } from './sender_parser.js';
- Step 6: Run the existing build_pseudo tests to confirm zero regressions
Run: pnpm --filter @neta/backend test -- runtime/weixin_db/build_pseudo 2>&1 | tail -10
Expected: all pass
- Step 7: Commit
git add packages/backend/src/modules/netaclaw/runtime/weixin_db/sender_parser.ts \
packages/backend/src/modules/netaclaw/runtime/weixin_db/build_pseudo.ts \
packages/backend/test/modules/netaclaw/runtime/weixin_db/sender_parser.test.ts
git commit -m "refactor(weixin-db): 抽出 parseSenderPrefix 公共函数"
Phase B · ArchiveDb module (TDD)
Task 3: archive_types.ts type definitions
Files:
- Create: packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_types.ts
- Step 1: Write the implementation
/**
 * One archived message (the row projection written to the SQLite message table).
 */
export interface ArchiveMessage {
  channelId: number;
  roomId: string; // group username, e.g. xxx@chatroom
  roomName: string | null; // group display name at sync time
  msgTable: string; // 'Msg_<md5>'
  localId: number;
  serverId: string | null; // BigInt → TEXT to preserve precision
  localType: number;
  senderWxid: string | null;
  createTime: number; // Unix seconds
  content: string | null; // UTF-8 after zstd decoding (text = body; non-text = full XML)
  rawHex: string | null; // fallback hex for the rare rows that fail to decode
}
/**
 * Per-group watermark (a row of the sync_state table).
 */
export interface SyncState {
  roomId: string;
  roomName: string | null;
  lastSyncTs: number; // create_time high-water mark
  lastSyncCount: number; // rows actually archived by the last INSERT OR IGNORE pass
  totalCount: number; // cumulative archive total for this room
  lastSyncAt: number; // time the last sync completed (Unix seconds)
  lastError: string | null;
}
- Step 2: Commit
git add packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_types.ts
git commit -m "feat(weixin-archive): 定义 ArchiveMessage / SyncState 类型"
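How lastSyncTs and the idempotent insert might cooperate can be sketched in memory. Everything below is illustrative: InMemoryArchive and syncOnce are hypothetical names, and reading with >= at the watermark is an assumption (the plan does not state which comparison MessageRepo uses). With >=, rows sharing the watermark second are re-read and the INSERT OR IGNORE style dedupe absorbs them.

```typescript
interface Msg {
  roomId: string;
  localId: number;
  createTime: number; // Unix seconds
}

// Hypothetical in-memory stand-in for the archive file.
class InMemoryArchive {
  private readonly keys = new Set<string>();
  private readonly watermarks = new Map<string, number>();

  // Mimics INSERT OR IGNORE: a (roomId, localId) pair is stored at most once.
  insert(rows: Msg[]): number {
    let inserted = 0;
    for (const r of rows) {
      const key = `${r.roomId}#${r.localId}`;
      if (!this.keys.has(key)) {
        this.keys.add(key);
        inserted++;
      }
    }
    return inserted;
  }

  lastSyncTs(roomId: string): number {
    return this.watermarks.get(roomId) ?? 0;
  }

  setLastSyncTs(roomId: string, ts: number): void {
    this.watermarks.set(roomId, ts);
  }
}

// One sync pass: read from the watermark onward, insert, then advance the watermark.
function syncOnce(source: Msg[], archive: InMemoryArchive, roomId: string) {
  const since = archive.lastSyncTs(roomId);
  // Assumption: >= so that messages in the same second as the watermark are not skipped.
  const batch = source.filter(m => m.roomId === roomId && m.createTime >= since);
  const newCount = archive.insert(batch);
  const lastSyncTs = batch.reduce((max, m) => Math.max(max, m.createTime), since);
  archive.setLastSyncTs(roomId, lastSyncTs);
  return { newCount, lastSyncTs };
}
```

Running syncOnce twice over unchanged data yields newCount 0 on the second pass while the watermark stays put, which is the behavior the service tests later in this plan check for.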
Task 4: Write the failing tests for archive_db.ts
Files:
- Create: packages/backend/test/modules/netaclaw/runtime/weixin_db/archive_db.test.ts
- Step 1: Write the tests
import * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';
import { ArchiveDb } from '../../../../../src/modules/netaclaw/runtime/weixin_db/archive_db.js';
import type { ArchiveMessage } from '../../../../../src/modules/netaclaw/runtime/weixin_db/archive_types.js';
const tempDirs: string[] = [];
function tmpDataDir(): string {
const d = fs.mkdtempSync(path.join(os.tmpdir(), 'archivedb-'));
tempDirs.push(d);
return d;
}
afterAll(() => {
for (const d of tempDirs) {
try { fs.rmSync(d, { recursive: true, force: true }); } catch { /* ignore */ }
}
});
function row(over: Partial<ArchiveMessage> = {}): ArchiveMessage {
return {
channelId: 6,
roomId: 'gA@chatroom',
roomName: '产品研发群',
msgTable: 'Msg_abc',
localId: 1,
serverId: '999',
localType: 1,
senderWxid: 'wxid_a',
createTime: 1700000000,
content: 'hello',
rawHex: null,
...over,
};
}
describe('ArchiveDb', () => {
it('archivePath returns <dataDir>/weixin-archive/cid-<id>.db', () => {
expect(ArchiveDb.archivePath(7, '/data').replace(/\\/g, '/'))
.toBe('/data/weixin-archive/cid-7.db');
});
it('openOrCreate creates file + tables', () => {
const dir = tmpDataDir();
const db = ArchiveDb.openOrCreate(1, dir);
try {
expect(fs.existsSync(path.join(dir, 'weixin-archive', 'cid-1.db'))).toBe(true);
expect(db.countMessages('any-room')).toBe(0);
expect(db.getSyncState('any-room')).toBeNull();
} finally {
db.close();
}
});
it('insertMessages returns number of new rows (INSERT OR IGNORE)', () => {
const db = ArchiveDb.openOrCreate(1, tmpDataDir());
try {
const first = db.insertMessages([row({ localId: 1 }), row({ localId: 2 })]);
expect(first).toBe(2);
// A repeated insert of the same (room_id, local_id) should be ignored
const second = db.insertMessages([row({ localId: 1 }), row({ localId: 3 })]);
expect(second).toBe(1);
expect(db.countMessages('gA@chatroom')).toBe(3);
} finally {
db.close();
}
});
it('updateSyncState writes and reads back', () => {
const db = ArchiveDb.openOrCreate(1, tmpDataDir());
try {
db.updateSyncState('gA@chatroom', {
roomName: '产品研发群',
lastSyncTs: 1700000100,
lastSyncCount: 12,
totalCount: 100,
lastSyncAt: 1700000200,
lastError: null,
});
const s = db.getSyncState('gA@chatroom');
expect(s).not.toBeNull();
expect(s!.lastSyncTs).toBe(1700000100);
expect(s!.lastSyncCount).toBe(12);
expect(s!.totalCount).toBe(100);
expect(s!.lastError).toBeNull();
} finally {
db.close();
}
});
it('updateSyncState merges partial patches (UPSERT semantics)', () => {
const db = ArchiveDb.openOrCreate(1, tmpDataDir());
try {
db.updateSyncState('r', {
roomName: 'R',
lastSyncTs: 100,
lastSyncCount: 5,
totalCount: 5,
lastSyncAt: 200,
lastError: 'oops',
});
// Partial update: only clear the error and update lastSyncAt
db.updateSyncState('r', { lastSyncAt: 300, lastError: null });
const s = db.getSyncState('r')!;
expect(s.lastSyncAt).toBe(300);
expect(s.lastError).toBeNull();
expect(s.lastSyncTs).toBe(100); // unchanged
expect(s.lastSyncCount).toBe(5); // unchanged
} finally {
db.close();
}
});
it('insertMessages stores rawHex when content is null', () => {
const db = ArchiveDb.openOrCreate(1, tmpDataDir());
try {
db.insertMessages([row({ localId: 9, content: null, rawHex: '28b52ffd' })]);
// Read the underlying sqlite handle directly to verify the columns
const handle = (db as any).db as import('better-sqlite3').Database;
const r = handle.prepare('SELECT content, raw_hex FROM message WHERE local_id=9').get() as any;
expect(r.content).toBeNull();
expect(r.raw_hex).toBe('28b52ffd');
} finally {
db.close();
}
});
});
- Step 2: Run the tests and confirm they fail
Run: pnpm --filter @neta/backend test -- runtime/weixin_db/archive_db 2>&1 | tail -15
Expected: FAIL ("Cannot find module")
Task 5: Implement archive_db.ts
Files:
- Create: packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_db.ts
- Step 1: Write the implementation
import Database = require('better-sqlite3');
import * as fs from 'node:fs';
import * as path from 'node:path';
import type { ArchiveMessage, SyncState } from './archive_types.js';
const CREATE_MESSAGE_SQL = `
CREATE TABLE IF NOT EXISTS message (
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel_id INTEGER NOT NULL,
room_id TEXT NOT NULL,
room_name TEXT,
msg_table TEXT NOT NULL,
local_id INTEGER NOT NULL,
server_id TEXT,
local_type INTEGER NOT NULL,
sender_wxid TEXT,
create_time INTEGER NOT NULL,
content TEXT,
raw_hex TEXT,
synced_at INTEGER NOT NULL,
UNIQUE(room_id, local_id)
)`;
const CREATE_INDEX_ROOM_TIME = `
CREATE INDEX IF NOT EXISTS idx_msg_room_time ON message(room_id, create_time DESC)`;
const CREATE_INDEX_TIME = `
CREATE INDEX IF NOT EXISTS idx_msg_create_time ON message(create_time DESC)`;
const CREATE_SYNC_STATE_SQL = `
CREATE TABLE IF NOT EXISTS sync_state (
room_id TEXT PRIMARY KEY,
room_name TEXT,
last_sync_ts INTEGER NOT NULL DEFAULT 0,
last_sync_count INTEGER NOT NULL DEFAULT 0,
total_count INTEGER NOT NULL DEFAULT 0,
last_sync_at INTEGER NOT NULL,
last_error TEXT
)`;
/**
 * One archive sqlite file per channel.
 * File path: `<dataDir>/weixin-archive/cid-<channelId>.db`
 * Single-writer mode: callers serialize writes through the channel-level mutex.
 */
export class ArchiveDb {
private constructor(private readonly db: Database.Database) {}
static archivePath(channelId: number, dataDir: string): string {
return path.join(dataDir, 'weixin-archive', `cid-${channelId}.db`);
}
static openOrCreate(channelId: number, dataDir: string): ArchiveDb {
const file = ArchiveDb.archivePath(channelId, dataDir);
fs.mkdirSync(path.dirname(file), { recursive: true });
const db = new Database(file);
db.pragma('journal_mode = WAL');
db.pragma('synchronous = NORMAL');
db.exec(CREATE_MESSAGE_SQL);
db.exec(CREATE_INDEX_ROOM_TIME);
db.exec(CREATE_INDEX_TIME);
db.exec(CREATE_SYNC_STATE_SQL);
return new ArchiveDb(db);
}
/**
 * Batch insert; `INSERT OR IGNORE` makes (room_id, local_id) idempotent.
 * Returns the number of rows actually written.
 */
insertMessages(rows: ArchiveMessage[]): number {
if (rows.length === 0) return 0;
const now = Math.floor(Date.now() / 1000);
const stmt = this.db.prepare(`
INSERT OR IGNORE INTO message
(channel_id, room_id, room_name, msg_table, local_id, server_id, local_type,
sender_wxid, create_time, content, raw_hex, synced_at)
VALUES
(@channelId, @roomId, @roomName, @msgTable, @localId, @serverId, @localType,
@senderWxid, @createTime, @content, @rawHex, @syncedAt)
`);
const txn = this.db.transaction((batch: ArchiveMessage[]) => {
let inserted = 0;
for (const r of batch) {
const info = stmt.run({
channelId: r.channelId,
roomId: r.roomId,
roomName: r.roomName,
msgTable: r.msgTable,
localId: r.localId,
serverId: r.serverId,
localType: r.localType,
senderWxid: r.senderWxid,
createTime: r.createTime,
content: r.content,
rawHex: r.rawHex,
syncedAt: now,
});
if (info.changes > 0) inserted++;
}
return inserted;
});
return txn(rows);
}
getSyncState(roomId: string): SyncState | null {
const r = this.db.prepare(`
SELECT room_id, room_name, last_sync_ts, last_sync_count, total_count, last_sync_at, last_error
FROM sync_state WHERE room_id = ?
`).get(roomId) as any;
if (!r) return null;
return {
roomId: r.room_id,
roomName: r.room_name,
lastSyncTs: r.last_sync_ts,
lastSyncCount: r.last_sync_count,
totalCount: r.total_count,
lastSyncAt: r.last_sync_at,
lastError: r.last_error,
};
}
/**
 * UPSERT semantics: insert when the row does not exist; otherwise update only the fields present in the patch.
 */
updateSyncState(roomId: string, patch: Partial<Omit<SyncState, 'roomId'>>): void {
const existing = this.getSyncState(roomId);
const merged: SyncState = existing
? {
roomId,
roomName: patch.roomName !== undefined ? patch.roomName : existing.roomName,
lastSyncTs: patch.lastSyncTs !== undefined ? patch.lastSyncTs : existing.lastSyncTs,
lastSyncCount: patch.lastSyncCount !== undefined ? patch.lastSyncCount : existing.lastSyncCount,
totalCount: patch.totalCount !== undefined ? patch.totalCount : existing.totalCount,
lastSyncAt: patch.lastSyncAt !== undefined ? patch.lastSyncAt : existing.lastSyncAt,
lastError: patch.lastError !== undefined ? patch.lastError : existing.lastError,
}
: {
roomId,
roomName: patch.roomName ?? null,
lastSyncTs: patch.lastSyncTs ?? 0,
lastSyncCount: patch.lastSyncCount ?? 0,
totalCount: patch.totalCount ?? 0,
lastSyncAt: patch.lastSyncAt ?? Math.floor(Date.now() / 1000),
lastError: patch.lastError ?? null,
};
this.db.prepare(`
INSERT INTO sync_state
(room_id, room_name, last_sync_ts, last_sync_count, total_count, last_sync_at, last_error)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(room_id) DO UPDATE SET
room_name = excluded.room_name,
last_sync_ts = excluded.last_sync_ts,
last_sync_count = excluded.last_sync_count,
total_count = excluded.total_count,
last_sync_at = excluded.last_sync_at,
last_error = excluded.last_error
`).run(
merged.roomId,
merged.roomName,
merged.lastSyncTs,
merged.lastSyncCount,
merged.totalCount,
merged.lastSyncAt,
merged.lastError,
);
}
countMessages(roomId: string): number {
const r = this.db.prepare(`SELECT COUNT(*) AS c FROM message WHERE room_id = ?`).get(roomId) as any;
return r.c as number;
}
close(): void {
this.db.close();
}
}
- Step 2: Run the tests and confirm they pass
Run: pnpm --filter @neta/backend test -- runtime/weixin_db/archive_db 2>&1 | tail -15
Expected: 6 passed
- Step 3: Commit
git add packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_db.ts \
packages/backend/src/modules/netaclaw/runtime/weixin_db/archive_types.ts \
packages/backend/test/modules/netaclaw/runtime/weixin_db/archive_db.test.ts
git commit -m "feat(weixin-archive): ArchiveDb 模块 (per-channel sqlite + 幂等插入 + sync_state UPSERT)"
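The partial-update test relies on distinguishing a field that is absent from the patch from a field explicitly set to null. A small sketch of that distinction follows; State, mergeStrict, and mergeNullish are hypothetical names used only for illustration.

```typescript
interface State {
  lastSyncTs: number;
  lastError: string | null;
}

// Treats only `undefined` as "not provided", so an explicit null overwrites the old value.
function mergeStrict(existing: State, patch: Partial<State>): State {
  return {
    lastSyncTs: patch.lastSyncTs !== undefined ? patch.lastSyncTs : existing.lastSyncTs,
    lastError: patch.lastError !== undefined ? patch.lastError : existing.lastError,
  };
}

// Contrast: `??` also falls back on null, so a stored error could never be cleared.
function mergeNullish(existing: State, patch: Partial<State>): State {
  return {
    lastSyncTs: patch.lastSyncTs ?? existing.lastSyncTs,
    lastError: patch.lastError ?? existing.lastError,
  };
}

const before: State = { lastSyncTs: 100, lastError: 'oops' };
const cleared = mergeStrict(before, { lastError: null });
const stuck = mergeNullish(before, { lastError: null });
console.log(cleared.lastError, stuck.lastError);
```

This is why the existing-row branch of updateSyncState compares against undefined, while the new-row branch can safely use ?? to fill defaults.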
Phase C · Projection function projectToArchive (TDD)
Task 6: project_archive.ts
Files:
- Create: packages/backend/src/modules/netaclaw/runtime/weixin_db/project_archive.ts
- Create: packages/backend/test/modules/netaclaw/runtime/weixin_db/project_archive.test.ts
- Step 1: Write the tests
import { projectToArchive } from '../../../../../src/modules/netaclaw/runtime/weixin_db/project_archive.js';
import type { MessageRow } from '../../../../../src/modules/netaclaw/runtime/weixin_db/message_repo.js';
import type { RoomInfo } from '../../../../../src/modules/netaclaw/runtime/weixin_db/room_resolver.js';
const room: RoomInfo = {
tableName: 'Msg_abc',
username: 'gA@chatroom',
roomName: '产品研发群',
isGroup: true,
};
describe('projectToArchive', () => {
it('parses sender prefix for text message', () => {
const row: MessageRow = {
localId: 11,
serverId: 999n,
localType: 1,
realSenderId: 42n,
createTime: 1700000000,
content: 'wxid_alice:\n你好',
tableName: 'Msg_abc',
};
const a = projectToArchive(row, 6, room);
expect(a.channelId).toBe(6);
expect(a.roomId).toBe('gA@chatroom');
expect(a.roomName).toBe('产品研发群');
expect(a.msgTable).toBe('Msg_abc');
expect(a.localId).toBe(11);
expect(a.serverId).toBe('999');
expect(a.localType).toBe(1);
expect(a.senderWxid).toBe('wxid_alice');
expect(a.createTime).toBe(1700000000);
expect(a.content).toBe('你好');
expect(a.rawHex).toBeNull();
});
it('keeps full XML content for non-text (no sender prefix)', () => {
const xml = '<msg><img md5="abc"/></msg>';
const row: MessageRow = {
localId: 12, serverId: null, localType: 3, realSenderId: 7n,
createTime: 1700001000, content: xml, tableName: 'Msg_abc',
};
const a = projectToArchive(row, 6, room);
expect(a.senderWxid).toBeNull(); // no prefix
expect(a.content).toBe(xml); // full XML preserved
expect(a.localType).toBe(3);
});
it('serverId BigInt → string preserves precision', () => {
const big = 9_999_999_999_999_999n;
const row: MessageRow = {
localId: 1, serverId: big, localType: 1, realSenderId: null,
createTime: 1, content: 'x', tableName: 'Msg_abc',
};
const a = projectToArchive(row, 1, room);
expect(a.serverId).toBe(big.toString());
});
it('rawHex null by default (content always set)', () => {
const row: MessageRow = {
localId: 1, serverId: null, localType: 1, realSenderId: null,
createTime: 1, content: '', tableName: 'Msg_abc',
};
const a = projectToArchive(row, 1, room);
expect(a.content).toBe('');
expect(a.rawHex).toBeNull();
});
});
- Step 2: Run the tests and confirm they fail
Run: pnpm --filter @neta/backend test -- runtime/weixin_db/project_archive 2>&1 | tail -10
Expected: FAIL ("Cannot find module")
- Step 3: Write the implementation
import { parseSenderPrefix } from './sender_parser.js';
import type { MessageRow } from './message_repo.js';
import type { RoomInfo } from './room_resolver.js';
import type { ArchiveMessage } from './archive_types.js';
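/**
 * Projects a row read by MessageRepo into an archive row.
 * - Text message (has the wxid_xxx:\n prefix): senderWxid and body are split apart
 * - Non-text (no prefix): senderWxid=null, content=the full XML kept verbatim
 * - serverId: bigint → string to preserve precision
 * - rawHex: this projection only handles content that is already a string; when zstd decoding fails, the caller fills rawHex
 */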
export function projectToArchive(
row: MessageRow,
channelId: number,
room: RoomInfo,
): ArchiveMessage {
const parsed = parseSenderPrefix(row.content);
return {
channelId,
roomId: room.username,
roomName: room.roomName ?? null,
msgTable: row.tableName,
localId: row.localId,
serverId: row.serverId != null ? row.serverId.toString() : null,
localType: row.localType,
senderWxid: parsed?.senderWxid ?? null,
createTime: row.createTime,
content: parsed ? parsed.body : row.content,
rawHex: null,
};
}
- Step 4: Run the tests and confirm they pass
Run: pnpm --filter @neta/backend test -- runtime/weixin_db/project_archive 2>&1 | tail -10
Expected: 4 passed
- Step 5: Commit
git add packages/backend/src/modules/netaclaw/runtime/weixin_db/project_archive.ts \
packages/backend/test/modules/netaclaw/runtime/weixin_db/project_archive.test.ts
git commit -m "feat(weixin-archive): projectToArchive 投影函数 (MessageRow → ArchiveMessage)"
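The reason serverId is stored as TEXT can be checked directly: an id above Number.MAX_SAFE_INTEGER does not survive conversion to a JavaScript number. The sketch below reuses the value from the test and assumes a TypeScript target of ES2020 or later for BigInt literals.

```typescript
const serverId = 9_999_999_999_999_999n;

// Converting to number rounds to the nearest representable double.
const viaNumber = BigInt(Number(serverId));
// Converting to a decimal string is lossless.
const viaText = BigInt(serverId.toString());

console.log(viaNumber === serverId); // false
console.log(viaText === serverId); // true
console.log(serverId > BigInt(Number.MAX_SAFE_INTEGER)); // true
```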
Phase D · Exposing the weixin-db runtime + RoomResolver lookup by name
Task 7: RoomResolver.findRoomsByName
Files:
- Modify: packages/backend/src/modules/netaclaw/runtime/weixin_db/room_resolver.ts
- Create: packages/backend/test/modules/netaclaw/runtime/weixin_db/room_resolver_find.test.ts
- Step 1: Write the tests
import { RoomResolver, type RoomInfo } from '../../../../../src/modules/netaclaw/runtime/weixin_db/room_resolver.js';
function mk(over: Partial<RoomInfo>): RoomInfo {
return {
tableName: 'Msg_x',
username: 'u',
roomName: 'R',
isGroup: true,
...over,
};
}
describe('RoomResolver.findRoomsByName', () => {
it('returns all rooms with matching roomName (groups only)', () => {
const r = new RoomResolver();
r._setMap(new Map([
['t1', mk({ tableName: 't1', username: 'g1@chatroom', roomName: '产品研发群', isGroup: true })],
['t2', mk({ tableName: 't2', username: 'g2@chatroom', roomName: '产品研发群', isGroup: true })], // same name
['t3', mk({ tableName: 't3', username: 'wxid_x', roomName: '产品研发群', isGroup: false })], // DM with the same name
['t4', mk({ tableName: 't4', username: 'g3@chatroom', roomName: '家庭群', isGroup: true })],
]));
const hits = r.findRoomsByName('产品研发群');
expect(hits).toHaveLength(2);
expect(hits.map(h => h.username).sort()).toEqual(['g1@chatroom', 'g2@chatroom']);
});
it('returns empty when no match', () => {
const r = new RoomResolver();
expect(r.findRoomsByName('不存在的群')).toEqual([]);
});
it('returns empty for empty/whitespace input', () => {
const r = new RoomResolver();
expect(r.findRoomsByName('')).toEqual([]);
expect(r.findRoomsByName(' ')).toEqual([]);
});
it('trims both sides before comparison', () => {
const r = new RoomResolver();
r._setMap(new Map([
['t1', mk({ tableName: 't1', username: 'g1@chatroom', roomName: '产品研发群 ', isGroup: true })],
]));
expect(r.findRoomsByName(' 产品研发群 ')).toHaveLength(1);
});
});
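- Step 2: Run the tests and confirm they fail
Run: pnpm --filter @neta/backend test -- runtime/weixin_db/room_resolver_find 2>&1 | tail -10
Expected: FAIL ("findRoomsByName is not a function")
- Step 3: Edit room_resolver.ts, inserting the following after the lookup method
/**
 * Looks up all groups matching roomName (isGroup=true only).
 * Several groups may share a name; the caller decides which one to use.
 * Both sides are trimmed before comparison, which tolerates leading/trailing spaces and small differences between user input and the contact.db display name.
 */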
findRoomsByName(roomName: string): RoomInfo[] {
const needle = (roomName ?? '').trim();
if (!needle) return [];
const result: RoomInfo[] = [];
for (const info of this.roomsByTable.values()) {
if (info.isGroup && (info.roomName ?? '').trim() === needle) result.push(info);
}
return result;
}
- Step 4: Run the tests and confirm they pass
Run: pnpm --filter @neta/backend test -- runtime/weixin_db/room_resolver_find 2>&1 | tail -10
Expected: 4 passed
- Step 5: Commit
git add packages/backend/src/modules/netaclaw/runtime/weixin_db/room_resolver.ts \
packages/backend/test/modules/netaclaw/runtime/weixin_db/room_resolver_find.test.ts
git commit -m "feat(weixin-db): RoomResolver.findRoomsByName 名字反查群信息"
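Because findRoomsByName can return several groups with the same display name, the caller has to pick one. A possible selection rule is sketched below; pickRoom, the storedRoomId parameter, and the room-ambiguous token are hypothetical and not taken from the plan (only room-not-found appears in it). The RoomInfo shape is copied from the test fixtures.

```typescript
interface RoomInfo {
  tableName: string;
  username: string;
  roomName: string;
  isGroup: boolean;
}

// Hypothetical rule: a single hit wins; otherwise prefer the hit whose username
// equals a previously stored room id; otherwise refuse to guess.
function pickRoom(hits: RoomInfo[], storedRoomId?: string): RoomInfo {
  if (hits.length === 0) throw new Error('room-not-found');
  if (hits.length === 1) return hits[0];
  const exact = hits.find(h => h.username === storedRoomId);
  if (exact) return exact;
  throw new Error('room-ambiguous'); // assumed token, not defined by the plan
}
```

Failing loudly on an unresolved duplicate avoids archiving the wrong group under a shared name.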
Task 8: WeixinDbService.getRuntime + ChannelRuntime.messageKey
Files:
- Modify:
packages/backend/src/modules/netaclaw/service/weixin_db.ts
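archive sync needs the runtime state of the bound channel (paths / messageKey / reader.roomResolver, etc.); this task exposes it.
- Step 1: Add messageKey: Buffer to the ChannelRuntime interface
Locate the interface definition (around lines 16-23) and change it to: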
interface ChannelRuntime {
channelId: number;
reader: IncrementalReader;
watcher: WalWatcher;
weixinPid: number;
paths: DbPaths;
messageKey: Buffer;
onInbound: (channelId: number, pseudo: unknown) => Promise<void>;
}
- Step 2: In bindChannel, add messageKey at the this.runtimes.set(channel.id, { ... }) call
Locate this.runtimes.set(channel.id, { channelId: channel.id, reader, watcher, weixinPid: keys.pid, paths, onInbound }); (around lines 129-132) and change it to:
this.runtimes.set(channel.id, {
channelId: channel.id,
reader,
watcher,
weixinPid: keys.pid,
paths,
messageKey: Buffer.from(msgKeyHex, 'hex'),
onInbound,
});
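- Step 3: Export the type and add the getRuntime method
Change the existing interface ChannelRuntime declaration near the top of the file to export interface ChannelRuntime so that it can be imported externally.
Add the following before the unbindChannel method (or anywhere inside the class):
/**
 * Exposes the bound runtime to other services in the same process (e.g. weixin_archive_sync).
 * Returns undefined when the channel is not bound.
 */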
getRuntime(channelId: number): ChannelRuntime | undefined {
return this.runtimes.get(channelId);
}
- Step 4: Run the existing weixin_db.ts tests to confirm zero regressions
Run: pnpm --filter @neta/backend test -- service/weixin_db 2>&1 | tail -10
Expected: all pass
- Step 5: Commit
git add packages/backend/src/modules/netaclaw/service/weixin_db.ts
git commit -m "feat(weixin-db): ChannelRuntime 暴露 messageKey + getRuntime(cid) 公开访问器"
Task 9: Expose roomResolver on IncrementalReader (for archive lookup by name)
Files:
- Modify: packages/backend/src/modules/netaclaw/runtime/weixin_db/incremental_reader.ts
- Step 1: Expose a getter
Locate the line private roomResolver = new RoomResolver(); (around line 19), keep the field private, and add after the resolveRoom method inside the class:
/** Exposed so that other services in the same process (e.g. archive_sync) can call findRoomsByName and other RoomResolver APIs. */
getRoomResolver(): RoomResolver {
return this.roomResolver;
}
- Step 2: Run the existing tests to confirm zero regressions
Run: pnpm --filter @neta/backend test -- runtime/weixin_db 2>&1 | tail -10
Expected: all pass
- Step 3: Commit
git add packages/backend/src/modules/netaclaw/runtime/weixin_db/incremental_reader.ts
git commit -m "feat(weixin-db): IncrementalReader.getRoomResolver 公开访问器"
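Phase E · Main service WeixinArchiveSyncService (TDD)
Task 10: Service unit tests with mocks
Files:
- Create: packages/backend/test/modules/netaclaw/service/weixin_archive_sync.test.ts
This task only writes tests. The WeixinArchiveSyncService class does not exist yet, so the tests are expected to FAIL. Task 11 writes the implementation that makes them pass.
- Step 1: Write the tests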
import * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';
import { WeixinArchiveSyncService } from '../../../../src/modules/netaclaw/service/weixin_archive_sync.js';
function mkLogger() {
return { info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn() };
}
function tmpDataDir(): string {
return fs.mkdtempSync(path.join(os.tmpdir(), 'arcsync-'));
}
function mkRuntime(overrides: any = {}) {
return {
channelId: 6,
paths: { messageDb: '/fake/message_0.db' },
messageKey: Buffer.alloc(32),
reader: {
getRoomResolver: () => ({
findRoomsByName: jest.fn().mockReturnValue([
{ tableName: 'Msg_abc', username: 'gA@chatroom', roomName: '产品研发群', isGroup: true },
]),
}),
},
...overrides,
};
}
function mkGroup(over: any = {}) {
return { id: 1, channelId: 6, roomId: 'gA@chatroom', roomName: '产品研发群', ...over };
}
function mkChannel(over: any = {}) {
return { id: 6, type: 'weixin-db', loginStatus: 'connected', ...over };
}
describe('WeixinArchiveSyncService.syncGroup', () => {
let svc: WeixinArchiveSyncService;
let dataDir: string;
const dirsToCleanup: string[] = [];
beforeEach(() => {
dataDir = tmpDataDir();
dirsToCleanup.push(dataDir);
svc = new WeixinArchiveSyncService();
(svc as any).logger = mkLogger();
(svc as any).resolveDataDir = () => dataDir;
(svc as any).platform = 'win32';
// Mock decryptDbToWorkDir to return a fake path (the path is later handed to MessageRepo, which is mocked as well)
(svc as any).decryptDb = jest.fn().mockReturnValue('/fake/plain.db');
// Injected mock: a MessageRepo that never opens a real sqlite file
(svc as any).openMessageRepo = jest.fn().mockReturnValue({
listSince: jest.fn()
.mockResolvedValueOnce([
{ localId: 1, serverId: 100n, localType: 1, realSenderId: null, createTime: 100,
content: 'wxid_a:\nhi', tableName: 'Msg_abc' },
{ localId: 2, serverId: 101n, localType: 1, realSenderId: null, createTime: 101,
content: 'wxid_b:\nyo', tableName: 'Msg_abc' },
])
.mockResolvedValueOnce([]),
close: jest.fn(),
});
});
afterAll(() => {
for (const d of dirsToCleanup) {
try { fs.rmSync(d, { recursive: true, force: true }); } catch { /* ignore */ }
}
});
it('rejects non-Windows platforms', async () => {
(svc as any).platform = 'linux';
(svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel()) };
(svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
(svc as any).weixinDbService = { getRuntime: () => mkRuntime() };
await expect(svc.syncGroup(1)).rejects.toThrow(/unsupported-platform/);
});
it('rejects when channel not weixin-db', async () => {
(svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel({ type: 'weixin' })) };
(svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
(svc as any).weixinDbService = { getRuntime: () => mkRuntime() };
await expect(svc.syncGroup(1)).rejects.toThrow(/unsupported-channel-type/);
});
it('rejects when loginStatus !== connected', async () => {
(svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel({ loginStatus: 'disconnected' })) };
(svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
(svc as any).weixinDbService = { getRuntime: () => mkRuntime() };
await expect(svc.syncGroup(1)).rejects.toThrow(/channel-not-connected/);
});
it('rejects when runtime not bound', async () => {
(svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel()) };
(svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
(svc as any).weixinDbService = { getRuntime: () => undefined };
await expect(svc.syncGroup(1)).rejects.toThrow(/channel-not-bound/);
});
it('rejects when room not found in resolver', async () => {
const rt = mkRuntime();
(rt.reader.getRoomResolver as any) = () => ({ findRoomsByName: () => [] });
(svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel()) };
(svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
(svc as any).weixinDbService = { getRuntime: () => rt };
await expect(svc.syncGroup(1)).rejects.toThrow(/room-not-found/);
});
it('happy path: writes rows to archive db + updates sync_state', async () => {
(svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel()) };
(svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
(svc as any).weixinDbService = { getRuntime: () => mkRuntime() };
const result = await svc.syncGroup(1);
expect(result.newCount).toBe(2);
expect(result.totalCount).toBe(2);
expect(result.lastSyncTs).toBe(101);
expect(result.durationMs).toBeGreaterThanOrEqual(0);
// Second sync: data unchanged, the mock returns an empty list
(svc as any).openMessageRepo = jest.fn().mockReturnValue({
listSince: jest.fn().mockResolvedValue([]),
close: jest.fn(),
});
const r2 = await svc.syncGroup(1);
expect(r2.newCount).toBe(0);
expect(r2.totalCount).toBe(2);
expect(r2.lastSyncTs).toBe(101); // watermark unchanged
});
it('uses isolated archive workDir (not the listener cid-<id>/ root)', async () => {
(svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel()) };
(svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
(svc as any).weixinDbService = { getRuntime: () => mkRuntime() };
const decryptSpy = jest.fn().mockReturnValue('/fake/plain.db');
(svc as any).decryptDb = decryptSpy;
await svc.syncGroup(1);
const passedWorkDir = decryptSpy.mock.calls[0][2] as string;
// Must end with .../archive; it must not be the bare cid-6 directory
expect(passedWorkDir.replace(/\\/g, '/')).toMatch(/cid-6\/archive$/);
});
it('writes lastError to sync_state on listSince failure, then rethrows', async () => {
(svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel()) };
(svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
(svc as any).weixinDbService = { getRuntime: () => mkRuntime() };
(svc as any).openMessageRepo = jest.fn().mockReturnValue({
listSince: jest.fn().mockRejectedValue(new Error('disk-corrupt')),
close: jest.fn(),
});
await expect(svc.syncGroup(1)).rejects.toThrow(/disk-corrupt/);
// Read sync_state directly through ArchiveDb; last_error should be set
const { ArchiveDb } = await import('../../../../src/modules/netaclaw/runtime/weixin_db/archive_db.js');
const db = ArchiveDb.openOrCreate(6, dataDir);
try {
const st = db.getSyncState('gA@chatroom');
expect(st).not.toBeNull();
expect(st!.lastError).toMatch(/disk-corrupt/);
} finally {
db.close();
}
});
it('closes archive db even when error occurs after open', async () => {
(svc as any).channelRepo = { findOne: jest.fn().mockResolvedValue(mkChannel()) };
(svc as any).groupRepo = { findOne: jest.fn().mockResolvedValue(mkGroup()) };
(svc as any).weixinDbService = { getRuntime: () => mkRuntime() };
(svc as any).openMessageRepo = jest.fn().mockReturnValue({
listSince: jest.fn().mockRejectedValue(new Error('boom')),
close: jest.fn(),
});
await expect(svc.syncGroup(1)).rejects.toThrow();
// A successful second sync shows the db handle was released
(svc as any).openMessageRepo = jest.fn().mockReturnValue({
listSince: jest.fn().mockResolvedValue([]),
close: jest.fn(),
});
await expect(svc.syncGroup(1)).resolves.toBeTruthy();
});
});
- [ ] Step 2: Run the test and confirm it fails
Run: pnpm --filter @neta/backend test -- service/weixin_archive_sync.test 2>&1 | tail -10
Expected: FAIL("Cannot find module .../weixin_archive_sync")
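The "uses isolated archive workDir" case above normalizes backslashes before matching because the service builds the path with `path.join`, which yields `\` separators on Windows. A minimal sketch of that check follows; `archiveWorkDir` and `isIsolatedArchiveDir` are hypothetical helpers written for illustration only and are not part of the plan's code.

```typescript
import * as path from 'node:path';

// Hypothetical helper mirroring the workDir layout used by the service in Task 11.
// `pathApi` is injectable so the Windows behaviour can be exercised on any OS.
function archiveWorkDir(
  dataDir: string,
  channelId: number,
  pathApi: path.PlatformPath = path,
): string {
  return pathApi.join(dataDir, 'weixin-db-work', `cid-${channelId}`, 'archive');
}

// Normalize separators first, then require the path to end in cid-<id>/archive.
function isIsolatedArchiveDir(dir: string, channelId: number): boolean {
  return new RegExp(`cid-${channelId}/archive$`).test(dir.replace(/\\/g, '/'));
}

const winDir = archiveWorkDir('C:\\neta-data', 6, path.win32);
const posixDir = archiveWorkDir('/var/neta-data', 6, path.posix);
const listenerRoot = path.posix.join('/var/neta-data', 'weixin-db-work', 'cid-6');

console.log(winDir, isIsolatedArchiveDir(winDir, 6));
console.log(posixDir, isIsolatedArchiveDir(posixDir, 6));
console.log(listenerRoot, isIsolatedArchiveDir(listenerRoot, 6));
```

Injecting `path.win32` lets the Windows separator behaviour run on Linux/Mac CI, in line with the plan's goal that Phases A-E are testable off Windows.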
Task 11: Implement the main service weixin_archive_sync.ts
Files:
- Create: `packages/backend/src/modules/netaclaw/service/weixin_archive_sync.ts`
- [ ] Step 1: Write the implementation
import * as fs from 'node:fs';
import * as path from 'node:path';
import { Provide, Scope, ScopeEnum, Logger, Inject } from '@midwayjs/core';
import type { ILogger } from '@midwayjs/logger';
import { InjectEntityModel } from '@midwayjs/typeorm';
import { Repository } from 'typeorm';
import { resolveDataDir } from '../../../comm/data-dir.js';
import { NetaClawAgentChannelEntity } from '../entity/agent_channel.js';
import { NetaClawAgentChannelGroupEntity } from '../entity/agent_channel_group.js';
import { WeixinDbService } from './weixin_db.js';
import { ArchiveDb } from '../runtime/weixin_db/archive_db.js';
import { MessageRepo } from '../runtime/weixin_db/message_repo.js';
import { projectToArchive } from '../runtime/weixin_db/project_archive.js';
import { decryptDbToWorkDir } from '../runtime/weixin_db/archive_decryptor.js';
export interface SyncResult {
newCount: number;
totalCount: number;
lastSyncTs: number;
durationMs: number;
}
const BATCH_LIMIT = 500;
@Provide()
@Scope(ScopeEnum.Singleton)
export class WeixinArchiveSyncService {
@Logger() logger: ILogger;
@InjectEntityModel(NetaClawAgentChannelEntity)
channelRepo: Repository<NetaClawAgentChannelEntity>;
@InjectEntityModel(NetaClawAgentChannelGroupEntity)
groupRepo: Repository<NetaClawAgentChannelGroupEntity>;
@Inject() weixinDbService: WeixinDbService;
/**
* Test hooks: the defaults use process.platform / resolveDataDir / decryptDbToWorkDir / new MessageRepo;
* tests can monkey-patch these fields on the instance to inject mocks.
*/
platform: NodeJS.Platform = process.platform;
resolveDataDir = (): string => resolveDataDir();
decryptDb = (srcDb: string, rawKey: Buffer, workDir: string): string =>
decryptDbToWorkDir({ srcDb, rawKey, workDir });
openMessageRepo = (plainDb: string): {
listSince(tableName: string, lastTs: number, limit?: number): Promise<any[]>;
close(): void;
} => new MessageRepo(plainDb);
private readonly channelLocks = new Map<number, Promise<void>>();
/**
* Syncs the chat history of the given group into the archive sqlite. Serialized by a channel-level mutex; different channels run in parallel.
*/
async syncGroup(groupId: number): Promise<SyncResult> {
const group = await this.groupRepo.findOne({ where: { id: groupId } });
if (!group) throw new Error('group-not-found');
const cid = group.channelId;
const prev = this.channelLocks.get(cid) ?? Promise.resolve();
let release: () => void;
const next = new Promise<void>(r => (release = r));
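// Keep the chained tail in a variable: the map stores this promise, so the cleanup below must compare against it (comparing against `next` would never match and the entry would never be removed).
const tail = prev.then(() => next);
this.channelLocks.set(cid, tail);
await prev;
try {
return await this.doSyncGroup(group);
} finally {
release!();
if (this.channelLocks.get(cid) === tail) this.channelLocks.delete(cid);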
}
}
/**
* When a channel is deleted, cascade-remove its archive files so they do not leak.
* A missing file is not an error.
*/
async deleteChannelArchive(channelId: number): Promise<void> {
const file = ArchiveDb.archivePath(channelId, this.resolveDataDir());
for (const suffix of ['', '-wal', '-shm']) {
const p = file + suffix;
if (fs.existsSync(p)) {
try { fs.unlinkSync(p); }
catch (err: any) {
this.logger.warn('[archive] rm %s failed: %s', p, err?.message || err);
}
}
}
}
private async doSyncGroup(group: NetaClawAgentChannelGroupEntity): Promise<SyncResult> {
if (this.platform !== 'win32') throw new Error('unsupported-platform');
const channel = await this.channelRepo.findOne({ where: { id: group.channelId } });
if (!channel) throw new Error('channel-not-found');
if (channel.type !== 'weixin-db') throw new Error('unsupported-channel-type');
if (channel.loginStatus !== 'connected') throw new Error('channel-not-connected');
const runtime = this.weixinDbService.getRuntime(channel.id);
if (!runtime) throw new Error('channel-not-bound');
// Look up the room by name (compared after trim, so leading/trailing whitespace is tolerated)
const resolver = runtime.reader.getRoomResolver();
const matches = resolver.findRoomsByName(group.roomName ?? '');
if (matches.length === 0) throw new Error('room-not-found');
if (matches.length > 1) {
this.logger.warn('[archive] %d rooms with same name=%s, using first', matches.length, group.roomName);
}
const room = matches[0];
const dataDir = this.resolveDataDir();
// ★ Key point: the archive uses its own subdirectory so it does not race with the listener's
// IncrementalReader, which overwrites the decrypted message_0.db under cid-<id>/ every 500 ms
const workDir = path.join(dataDir, 'weixin-db-work', `cid-${channel.id}`, 'archive');
const t0 = Date.now();
// Open the archive db only after the room is resolved, so that later failures can be recorded in sync_state.last_error
const archive = ArchiveDb.openOrCreate(channel.id, dataDir);
let cursorTs = archive.getSyncState(room.username)?.lastSyncTs ?? 0;
let newCount = 0;
let plainDb: string;
try {
plainDb = this.decryptDb(runtime.paths.messageDb, runtime.messageKey, workDir);
} catch (err: any) {
this.logger.error('[archive] decrypt failed cid=%s: %s', channel.id, err?.message || err);
this.tryWriteLastError(archive, room.username, room.roomName, `decrypt-failed: ${err?.message || err}`);
archive.close();
throw new Error('decrypt-failed');
}
let repo: ReturnType<WeixinArchiveSyncService['openMessageRepo']> | null = null;
try {
repo = this.openMessageRepo(plainDb);
while (true) {
const rows = await repo.listSince(room.tableName, cursorTs, BATCH_LIMIT);
if (!rows || rows.length === 0) break;
const archiveRows = rows.map(r => projectToArchive(r, channel.id, room));
newCount += archive.insertMessages(archiveRows);
cursorTs = rows[rows.length - 1].createTime;
if (rows.length < BATCH_LIMIT) break;
}
const totalCount = archive.countMessages(room.username);
archive.updateSyncState(room.username, {
roomName: room.roomName,
lastSyncTs: cursorTs,
lastSyncCount: newCount,
totalCount,
lastSyncAt: Math.floor(Date.now() / 1000),
lastError: null,
});
const durationMs = Date.now() - t0;
this.logger.info('[archive] sync cid=%s room=%s new=%d total=%d ts=%d in %dms',
channel.id, group.roomName, newCount, totalCount, cursorTs, durationMs);
return { newCount, totalCount, lastSyncTs: cursorTs, durationMs };
} catch (err: any) {
this.tryWriteLastError(archive, room.username, room.roomName, err?.message || String(err));
throw err;
} finally {
try { repo?.close(); } catch { /* ignore */ }
try { archive.close(); } catch { /* ignore */ }
}
}
/**
* Best-effort write of sync_state.last_error; a failed write is swallowed so it never masks the original exception.
*/
private tryWriteLastError(
archive: ArchiveDb,
roomId: string,
roomName: string | null,
message: string,
): void {
try {
archive.updateSyncState(roomId, {
roomName,
lastSyncAt: Math.floor(Date.now() / 1000),
lastError: message,
});
} catch (e: any) {
this.logger.warn('[archive] write lastError failed: %s', e?.message || e);
}
}
}
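- [ ] Step 2: Run the tests and confirm they pass
Run: pnpm --filter @neta/backend test -- service/weixin_archive_sync.test 2>&1 | tail -15
Expected: all tests pass (the Task 10 suite defines more than 6 cases, so the count will be higher than 6)
- [ ] Step 3: Commit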
git add packages/backend/src/modules/netaclaw/service/weixin_archive_sync.ts \
packages/backend/test/modules/netaclaw/service/weixin_archive_sync.test.ts
git commit -m "feat(weixin-archive): WeixinArchiveSyncService syncGroup 主流程 (mutex + 增量水位)"
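The batching loop in `doSyncGroup` combines a timestamp watermark with an idempotent insert. The sketch below reproduces that shape with in-memory stand-ins so the behaviour can be checked without better-sqlite3. `MemoryArchive`, this `listSince`, and `syncOnce` are illustrative assumptions: they are not the real `ArchiveDb` or `MessageRepo`, and the real `listSince` semantics are defined outside this section.

```typescript
interface SourceRow { localId: number; createTime: number; content: string }

// Stand-in for the decrypted message table. Assumption: rows come back in
// ascending createTime order and strictly after the cursor.
function listSince(source: SourceRow[], lastTs: number, limit: number): SourceRow[] {
  return source
    .filter(r => r.createTime > lastTs)
    .sort((a, b) => a.createTime - b.createTime)
    .slice(0, limit);
}

// Stand-in for the archive: a keyed insert emulates INSERT OR IGNORE.
class MemoryArchive {
  private readonly rows = new Map<number, SourceRow>();
  lastSyncTs = 0;

  insertMessages(batch: SourceRow[]): number {
    let inserted = 0;
    for (const r of batch) {
      if (!this.rows.has(r.localId)) { this.rows.set(r.localId, r); inserted++; }
    }
    return inserted; // only rows that were actually new
  }

  count(): number { return this.rows.size; }
}

function syncOnce(source: SourceRow[], archive: MemoryArchive, batchLimit: number) {
  let cursorTs = archive.lastSyncTs; // resume from the stored watermark
  let newCount = 0;
  while (true) {
    const rows = listSince(source, cursorTs, batchLimit);
    if (rows.length === 0) break;
    newCount += archive.insertMessages(rows);
    cursorTs = rows[rows.length - 1].createTime; // advance the watermark
    if (rows.length < batchLimit) break;         // short batch means the source is drained
  }
  archive.lastSyncTs = cursorTs;
  return { newCount, totalCount: archive.count(), lastSyncTs: cursorTs };
}

const source: SourceRow[] = [1, 2, 3, 4, 5].map(i => ({ localId: i, createTime: 100 + i, content: `m${i}` }));
const archive = new MemoryArchive();
const first = syncOnce(source, archive, 2);   // full sync in batches of 2
const second = syncOnce(source, archive, 2);  // nothing new
source.push({ localId: 6, createTime: 106, content: 'm6' });
const third = syncOnce(source, archive, 2);   // picks up only the new row
console.log(first, second, third);
```

One thing worth checking against the real `MessageRepo.listSince`: if it compares with a strict `>` on `createTime`, messages that share the timestamp at a batch boundary could be skipped. This sketch avoids the problem only because its timestamps are unique.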
Task 12: Concurrency test (channel-level mutex)
Files:
- Create: `packages/backend/test/modules/netaclaw/service/weixin_archive_sync.concurrency.test.ts`
- [ ] Step 1: Write the test
import { WeixinArchiveSyncService } from '../../../../src/modules/netaclaw/service/weixin_archive_sync.js';
function mkLogger() {
return { info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn() };
}
describe('WeixinArchiveSyncService concurrency', () => {
it('serializes calls within the same channel', async () => {
const svc = new WeixinArchiveSyncService();
(svc as any).logger = mkLogger();
const startTimes: Record<number, number> = {};
const endTimes: Record<number, number> = {};
// mock doSyncGroup so we don't touch fs / db
let counter = 0;
(svc as any).doSyncGroup = async (group: any) => {
const id = ++counter;
startTimes[id] = Date.now();
await new Promise(r => setTimeout(r, 50));
endTimes[id] = Date.now();
return { newCount: 0, totalCount: 0, lastSyncTs: 0, durationMs: 50 };
};
(svc as any).groupRepo = {
findOne: jest.fn().mockImplementation(async ({ where: { id } }: any) => ({ id, channelId: 1 })),
};
// Three concurrent requests on the same channel → must run serially
await Promise.all([svc.syncGroup(11), svc.syncGroup(12), svc.syncGroup(13)]);
expect(startTimes[2]).toBeGreaterThanOrEqual(endTimes[1]);
expect(startTimes[3]).toBeGreaterThanOrEqual(endTimes[2]);
});
it('runs in parallel across different channels', async () => {
const svc = new WeixinArchiveSyncService();
(svc as any).logger = mkLogger();
const startTimes: Record<number, number> = {};
(svc as any).doSyncGroup = async (group: any) => {
startTimes[group.id] = Date.now();
await new Promise(r => setTimeout(r, 50));
return { newCount: 0, totalCount: 0, lastSyncTs: 0, durationMs: 50 };
};
(svc as any).groupRepo = {
findOne: jest.fn()
.mockResolvedValueOnce({ id: 21, channelId: 1 })
.mockResolvedValueOnce({ id: 22, channelId: 2 }),
};
await Promise.all([svc.syncGroup(21), svc.syncGroup(22)]);
// Both channels start at the same time; the gap must be < 20 ms
expect(Math.abs(startTimes[21] - startTimes[22])).toBeLessThan(20);
});
});
- [ ] Step 2: Run the tests and confirm they pass
Run: pnpm --filter @neta/backend test -- service/weixin_archive_sync.concurrency 2>&1 | tail -10
Expected: 2 passed
- [ ] Step 3: Commit
git add packages/backend/test/modules/netaclaw/service/weixin_archive_sync.concurrency.test.ts
git commit -m "test(weixin-archive): channel-level mutex 串行/并行行为"
Phase F · Wire into the business layer + frontend
Task 13: Delegate agent_channel_group.service.sync
Files:
- Modify: `packages/backend/src/modules/netaclaw/service/agent_channel_group.ts`
- [ ] Step 1: Add the Inject + delegating method
Add to the imports at the top of the file:
import { Inject } from '@midwayjs/core';
import { WeixinArchiveSyncService, type SyncResult } from './weixin_archive_sync.js';
(Provide / Scope etc. are already imported; only Inject needs to be added.)
Inside the class, anywhere (preferably right after the list method), add:
@Inject() archiveSyncService: WeixinArchiveSyncService;
/** Delegates the sync to WeixinArchiveSyncService. */
async sync(groupId: number): Promise<SyncResult> {
return this.archiveSyncService.syncGroup(groupId);
}
- [ ] Step 2: Commit
git add packages/backend/src/modules/netaclaw/service/agent_channel_group.ts
git commit -m "feat(netaclaw): agent_channel_group.sync 委托给 WeixinArchiveSyncService"
Task 14: Controller endpoint POST /sync
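Files:
- Modify: `packages/backend/src/modules/netaclaw/controller/admin/agent_channel_group.ts`
- [ ] Step 1: Add the endpoint
The token-to-message mapping lives in the controller layer (the service only throws tokens), so the frontend displays the returned message without interpreting the token itself.
Inside the controller class, anywhere (preferably just before the delete method), add: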
@Post('/sync')
async sync(@Body() body: { groupId: number }) {
if (!body?.groupId) return { code: 1003, message: 'groupId is required' };
try {
const data = await this.groupService.sync(body.groupId);
return { code: 1000, data };
} catch (err: any) {
const token = err?.message || 'sync-failed';
const message = ERROR_MESSAGES[token] ?? `同步失败: ${token}`;
return { code: 1003, message };
}
}
Also add the error-message dictionary at the top of the file (after the imports, before the decorators):
const ERROR_MESSAGES: Record<string, string> = {
'group-not-found': '群记录不存在',
'channel-not-found': '频道记录不存在',
'unsupported-channel-type': '本频道不支持同步',
'channel-not-connected': '频道未连接,请检查 PC 微信是否登录',
'channel-not-bound': '频道未连接,请检查 PC 微信是否登录',
'room-not-found': '未找到该群消息,请检查群名是否与 PC 微信中显示的完全一致',
'decrypt-failed': '解密失败,可能登录态已失效,请重启 PC 微信后重试',
'unsupported-platform': '本功能需要 Windows + PC 微信',
};
- [ ] Step 2: Start the backend and verify the route is registered
Run: pnpm --filter @neta/backend dev & (starts in the background)
Then:
curl -s -X POST http://localhost:8003/admin/netaclaw/agent_channel_group/sync \
-H 'Content-Type: application/json' \
-H 'Authorization: <your token>' \
-d '{"groupId": 1}'
Expected: a JSON response (success or a sensible error message, not a 404). Once you have seen the result, kill the background backend.
- [ ] Step 3: Commit
git add packages/backend/src/modules/netaclaw/controller/admin/agent_channel_group.ts
git commit -m "feat(netaclaw): POST /admin/netaclaw/agent_channel_group/sync 端点 + 错误码映射"
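Task 14's dictionary is typed as `Record<string, string>`, so a token added to the service later can silently fall through to the generic message. One possible tightening, shown here as a sketch and not as what the plan prescribes, is to derive a union type from the token list so the compiler flags a missing entry. `SYNC_ERROR_TOKENS`, `SyncErrorToken`, `isSyncErrorToken` and `toUserMessage` are hypothetical names; the tokens and messages are copied from the dictionary above.

```typescript
// Tokens thrown by the service, as listed in the Task 14 dictionary.
const SYNC_ERROR_TOKENS = [
  'group-not-found',
  'channel-not-found',
  'unsupported-channel-type',
  'channel-not-connected',
  'channel-not-bound',
  'room-not-found',
  'decrypt-failed',
  'unsupported-platform',
] as const;

type SyncErrorToken = (typeof SYNC_ERROR_TOKENS)[number];

// Record<SyncErrorToken, string> makes a missing or misspelled key a compile error.
const MESSAGES: Record<SyncErrorToken, string> = {
  'group-not-found': '群记录不存在',
  'channel-not-found': '频道记录不存在',
  'unsupported-channel-type': '本频道不支持同步',
  'channel-not-connected': '频道未连接,请检查 PC 微信是否登录',
  'channel-not-bound': '频道未连接,请检查 PC 微信是否登录',
  'room-not-found': '未找到该群消息,请检查群名是否与 PC 微信中显示的完全一致',
  'decrypt-failed': '解密失败,可能登录态已失效,请重启 PC 微信后重试',
  'unsupported-platform': '本功能需要 Windows + PC 微信',
};

function isSyncErrorToken(value: string): value is SyncErrorToken {
  return (SYNC_ERROR_TOKENS as readonly string[]).includes(value);
}

// Same fallback behaviour as the controller: unknown tokens get the generic prefix.
function toUserMessage(err: unknown): string {
  const token = err instanceof Error && err.message ? err.message : 'sync-failed';
  return isSyncErrorToken(token) ? MESSAGES[token] : `同步失败: ${token}`;
}

console.log(toUserMessage(new Error('room-not-found')));
console.log(toUserMessage(new Error('disk-corrupt')));
```

Note that the fallback echoes the raw error message to the user. That matches the controller code above, but it means low-level errors such as `disk-corrupt` reach the toast unchanged.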
Task 15: agent_channel.service.delete cascades removal of the archive files
Files:
- Modify: `packages/backend/src/modules/netaclaw/service/agent_channel.ts`
- [ ] Step 1: Locate the `delete` method
Current code in the file (see lines 126-133, read earlier):
async delete(ids: number[]) {
for (const id of ids) {
this.stopRunner(id);
await this.groupService.cascadeDeleteByChannel(id);
this.weixinDbService.unbindChannel(id);
}
await this.channelRepo.delete(ids);
}
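- [ ] Step 2: Add the archive cleanup + Inject
Add to the Inject block at the top of the class:
@Inject() archiveSyncService: WeixinArchiveSyncService;
(Also add import { WeixinArchiveSyncService } from './weixin_archive_sync.js'; to the imports at the top of the file.)
Add the cleanup call inside the loop of the delete method: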
async delete(ids: number[]) {
for (const id of ids) {
this.stopRunner(id);
await this.groupService.cascadeDeleteByChannel(id);
this.weixinDbService.unbindChannel(id);
await this.archiveSyncService.deleteChannelArchive(id);
}
await this.channelRepo.delete(ids);
}
- [ ] Step 3: Run the existing agent_channel tests and confirm zero regressions
Run: pnpm --filter @neta/backend test -- service/agent_channel 2>&1 | tail -10
Expected: all pass
- [ ] Step 4: Commit
git add packages/backend/src/modules/netaclaw/service/agent_channel.ts
git commit -m "feat(netaclaw): channel.delete cascade 清理 weixin-archive sqlite 文件"
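`deleteChannelArchive` removes the main database file plus its `-wal` and `-shm` sidecars, which SQLite creates next to a database in WAL mode. The sketch below shows the same cleanup with `fs.rmSync(..., { force: true })` in a throwaway temp directory. `removeSqliteFiles` is a hypothetical helper and the paths are illustrative only.

```typescript
import * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';

// Main file plus the sidecars SQLite keeps alongside it in WAL mode.
const SQLITE_SUFFIXES = ['', '-wal', '-shm'] as const;

// Removes the database file and its sidecars; returns the paths that could not be removed.
function removeSqliteFiles(dbFile: string): string[] {
  const failed: string[] = [];
  for (const suffix of SQLITE_SUFFIXES) {
    try {
      // force: true turns a missing file into a no-op instead of an ENOENT error.
      fs.rmSync(dbFile + suffix, { force: true });
    } catch {
      failed.push(dbFile + suffix);
    }
  }
  return failed;
}

// Demo: create a fake archive with one sidecar, remove it, then remove it again.
const tmp = fs.mkdtempSync(path.join(os.tmpdir(), 'archive-demo-'));
const dbFile = path.join(tmp, 'cid-6.db');
fs.writeFileSync(dbFile, '');
fs.writeFileSync(dbFile + '-wal', '');

const failedFirst = removeSqliteFiles(dbFile);
const leftovers = fs.readdirSync(tmp);
const failedSecond = removeSqliteFiles(dbFile); // already gone: still no error

fs.rmSync(tmp, { recursive: true, force: true });
console.log({ failedFirst, leftovers, failedSecond });
```

On Windows a file may fail to delete while another handle still has it open, so it is worth confirming that no sync for the channel is in flight when the delete runs. The service version logs a warning in that case and continues.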
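Task 16: Frontend: add the "同步聊天记录" (sync chat history) button + handler
Files:
- Modify: `packages/frontend/src/modules/agent/components/channel-group-panel.vue`
- [ ] Step 1: Add a `_syncing` field to GroupItem
The file has an interface GroupItem or a similar type. Locate it and add the field alongside the other client-only transient fields (if there is no such interface, attach the field when the ref is initialized):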
interface GroupItem {
id: number;
channelId: number;
roomId: string;
roomName: string | null;
status: number;
/* ... other existing fields ... */
_syncing?: boolean;
}
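(If GroupItem already uses an inline type / any, no change is needed; just use group._syncing directly in the template and handler.)
- [ ] Step 2: Add the button to the template
Locate the group-card__actions div (around lines 83-87):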
<div class="group-card__actions">
<el-button size="small" @click="jumpToChat(group)">查看对话记录</el-button>
<el-button size="small" type="primary" @click="handleSavePolicy(group)">保存策略</el-button>
<el-button size="small" type="danger" link @click="handleDelete(group)">删除</el-button>
</div>
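Change it to the following. This also disables the original "查看对话记录" (view conversation) button, which would jump to a chat session that does not exist and confuse users; a tooltip explains why: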
<div class="group-card__actions">
<el-button
size="small"
:loading="group._syncing"
@click="handleSync(group)"
>
{{ group._syncing ? '同步中...' : '同步聊天记录' }}
</el-button>
<el-tooltip content="本功能开发中,请先用'同步聊天记录'按钮把消息存档到本地" placement="top">
<el-button size="small" disabled>查看对话记录</el-button>
</el-tooltip>
<el-button size="small" type="primary" @click="handleSavePolicy(group)">保存策略</el-button>
<el-button size="small" type="danger" link @click="handleDelete(group)">删除</el-button>
</div>
- [ ] Step 3: Add the handler in `<script setup>`
Locate function jumpToChat or function handleDelete and add the following next to it:
async function handleSync(group: GroupItem) {
if (group._syncing) return;
group._syncing = true;
try {
const resp = await apiPost('/admin/netaclaw/agent_channel_group/sync', { groupId: group.id });
if (resp.code !== 1000) {
ElMessage.error(resp.message || '同步失败');
return;
}
const { newCount, totalCount, durationMs } = resp.data as {
newCount: number; totalCount: number; durationMs: number;
};
ElMessage.success(`已同步 ${newCount} 条新消息(总计 ${totalCount} 条,${(durationMs / 1000).toFixed(1)}s)`);
} catch (err: any) {
ElMessage.error(err?.message || '同步失败');
} finally {
group._syncing = false;
}
}
- [ ] Step 4: Type-check + manual smoke test
Run: pnpm --filter @neta/frontend type-check 2>&1 | tail -10
Expected: no new errors (nothing to change if GroupItem is typed as any)
Start the backend and the frontend:
pnpm --filter @neta/backend dev &
pnpm --filter @neta/frontend dev &
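Open the group management page in a browser → click the "同步聊天记录" button → the button shows a loading state and, on completion, a toast reading "已同步 N 条新消息(总计 M 条,X.Xs)" appears. That counts as a pass.
- [ ] Step 5: Commit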
git add packages/frontend/src/modules/agent/components/channel-group-panel.vue
git commit -m "feat(agent-fe): 群卡片新增 '同步聊天记录' 按钮 + handler"
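The success toast built in `handleSync` can be factored into a pure function, which makes the exact wording easy to unit-test and to compare against the E2E checklist. This is only a sketch: `formatSyncToast` and `SyncResultView` are hypothetical names, and the template string is copied from the handler above.

```typescript
// Subset of SyncResult that the toast needs.
interface SyncResultView {
  newCount: number;
  totalCount: number;
  durationMs: number;
}

// Formats the success toast; the duration is shown in seconds with one decimal place.
function formatSyncToast(r: SyncResultView): string {
  const seconds = (r.durationMs / 1000).toFixed(1);
  return `已同步 ${r.newCount} 条新消息(总计 ${r.totalCount} 条,${seconds}s)`;
}

const firstSync = formatSyncToast({ newCount: 3, totalCount: 3, durationMs: 1500 });
const noopSync = formatSyncToast({ newCount: 0, totalCount: 3, durationMs: 250 });
console.log(firstSync);
console.log(noopSync);
```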
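Phase G · End-to-end manual smoke test + verification report
Task 17: E2E checklist + verification report
Files:
- Create: `docs/superpowers/followups/2026-05-13-weixin-archive-sync-e2e.md`
Prerequisites:
- Windows + Weixin 4.x logged in + at least one test group
- A channel with type=weixin-db is bound successfully (`loginStatus='connected'`)
- The group has been added to the whitelist via "+ 添加群" (a matching row exists in the `netaclaw_agent_channel_group` table)
Checklist: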
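- [ ] E2E-1: Send 3 text messages in the group first (so the archive has something to pull)
- [ ] E2E-2: On the frontend group card, click "同步聊天记录" → the button shows loading → after a few seconds the toast reads "已同步 N 条新消息(总计 M 条,X.Xs)" with N ≥ 3
- [ ] E2E-3: Confirm on the file system that `<dataDir>/weixin-archive/cid-<channelId>.db` exists and its size is > 0
- [ ] E2E-4: Open the archive file with the sqlite3 CLI or DB Browser; `SELECT COUNT(*) FROM message WHERE room_id='<room wxid>@chatroom'` returns ≥ 3
- [ ] E2E-5: `SELECT * FROM sync_state` contains one row with `last_sync_ts > 0` and `last_error IS NULL`
- [ ] E2E-6: Click "同步聊天记录" again without sending new messages → the toast reads "已同步 0 条新消息(总计 M 条,X.Xs)", which shows the incremental sync is idempotent
- [ ] E2E-7: Send 2 more messages in the group → sync → the toast reports 2 new messages and a total of M+2
- [ ] E2E-8: Disable the channel (`status=0`) and click sync again → the toast shows "频道未连接,请检查 PC 微信是否登录" or a similar error, which confirms the error mapping works (both `channel-not-connected` and `channel-not-bound` map to this text)
- [ ] E2E-9: Change the group's roomName (for the given `groupId`) to a name that does not exist → sync → the toast shows "未找到该群消息..."
- [ ] E2E-10: Delete the channel from the frontend → `<dataDir>/weixin-archive/cid-<channelId>.db` should disappear from the file system
- [ ] E2E-11: Concurrent syncs across groups (click sync on two groups of the same channel at once) → they should complete one after the other with no db corruption
- [ ] E2E-12: Run a sync on Linux/Mac → the toast shows "本功能需要 Windows + PC 微信"
- [ ] Step 1: Run each item by hand
- [ ] Step 2: Write the verification report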
mkdir -p docs/superpowers/followups
Write the findings to docs/superpowers/followups/2026-05-13-weixin-archive-sync-e2e.md:
# weixin-archive sync E2E verification (2026-05-13)
## Environment
- Weixin version: <fill in>
- channel id / name: <fill in>
- group id / roomName: <fill in>
## Checklist results
- [x] E2E-1 ...
- [x] E2E-2 ...
- [ ] E2E-X failure reason: <fill in>
## Known issues / Follow-ups
- (Record minor issues here if they were found but do not block the merge)
- [ ] Step 3: Commit
git add docs/superpowers/followups/2026-05-13-weixin-archive-sync-e2e.md
git commit -m "docs(weixin-archive): E2E 验证报告"
Self-Review
1. Spec coverage:
| Spec section | Covered by Task |
|---|---|
| §1 Background and goals + out-of-scope | The whole plan leaves out the viewing UI / attachment decryption / listener changes |
| §2.1 Trigger ("同步聊天记录" button) | Task 16 |
| §2.2 Initial full sync + subsequent incremental sync (watermark) | Task 11 `cursorTs = state?.lastSyncTs ?? 0` |
| §2.3 Error feedback mapping | Task 14 ERROR_MESSAGES dictionary |
| §3 Overall architecture (mutex + decrypt + resolve + INSERT OR IGNORE) | Task 11 doSyncGroup |
| §3.1 ★ Work directory isolation (archive uses the `cid-<id>/archive` subdirectory) | Task 11 `` workDir = path.join(dataDir, 'weixin-db-work', `cid-${cid}`, 'archive') `` + the new Task 10 test "uses isolated archive workDir" |
| §3.2 Resource release + best-effort lastError | Task 11 two-level try/finally + tryWriteLastError + the new Task 10 tests "writes lastError" / "closes archive db even when error" |
| §4.1 File location `<dataDir>/weixin-archive/cid-<id>.db` | Task 5 archivePath |
| §4.2 Table schema message / sync_state | Task 5 CREATE SQL |
| §5.1 New files: archive_db / archive_decryptor / archive_types / sender_parser / project_archive / weixin_archive_sync | Task 1 / 2 / 3 / 5 / 6 / 11 |
| §5.2 Modified: incremental_reader / build_pseudo / room_resolver / weixin_db / agent_channel / agent_channel_group / controller / frontend | Task 1 / 2 / 7 / 8 / 9 / 13 / 14 / 15 / 16 |
| §5.3 API contract POST /sync | Task 14 |
| §5.3 Service interface syncGroup / deleteChannelArchive | Task 11 / 15 |
| §5.4 Channel-level mutex | Task 11 + verified by Task 12 |
| §5.5 Streaming in batches, LIMIT 500 | Task 11 BATCH_LIMIT |
| §6 Frontend button + handler | Task 16 |
| §6.4 list metadata | Explicitly marked out-of-scope in the spec; no corresponding task, which is consistent |
| §7 Error codes, 9 scenarios | Task 14 dictionary + 5 error branches tested in Task 10 |
| §8.1 Unit tests archive_db / weixin_archive_sync / concurrency / parseSenderPrefix | Task 4 / 10 / 12 / 2 |
| §8.3 E2E checklist | Task 17 |
| §10 Group-name trim fallback | Task 7 implementation + unit test "trims both sides" |
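2. Placeholder scan:
- No TBD / TODO.
- Every Step has concrete code or a concrete command.
- Location hints such as "insert after the list method" all come with line numbers or surrounding context.
3. Type consistency:
- `ArchiveMessage` fields are consistent across Task 3 / 4 / 5 / 6 / 11 (channelId/roomId/roomName/msgTable/localId/serverId/localType/senderWxid/createTime/content/rawHex)
- `SyncState` fields are consistent across Task 3 / 4 / 5 / 11
- `SyncResult` fields are consistent across Task 10 / 11 / 16 (newCount / totalCount / lastSyncTs / durationMs)
- Error tokens are consistent across Task 10 / 11 / 14 (channel-not-connected / room-not-found etc.)
- `WeixinDbService.getRuntime` returns `ChannelRuntime | undefined`; the Task 8 definition and the Task 11 usage agree
- `IncrementalReader.getRoomResolver()` returns `RoomResolver`; the Task 9 definition and the Task 11 use of `.findRoomsByName` agree
4. Cross-phase handoff:
- Phase A extracts the shared functions → Phases B/C/E import them directly
- Phase D exposes the runtime accessors → Phase E obtains the runtime state through `getRuntime` / `getRoomResolver`
- Phase F wires up the service / controller / frontend → Phase G verifies the whole chain end to end
5. Dev feasibility:
- Phases A-E can all run their unit tests on Linux/Mac (better-sqlite3 is cross-platform; mocking platform / decryptDb is enough)
- Phase G E2E requires Windows + a logged-in Weixin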
Execution Handoff
The complete plan is saved at docs/superpowers/plans/2026-05-13-weixin-archive-sync.md.