582 lines
26 KiB
Markdown
582 lines
26 KiB
Markdown
# Weixin 群聊记录同步到 Neta 本地存档(weixin-archive sync)
|
|
|
|
> Status: Draft 2026-05-13
|
|
> Owner: 与 lixin 共识
|
|
> 关联 spec: `2026-05-08-weixin-group-channel-design.md`(架构 C)
|
|
> 关联 plan: `2026-05-12-weixin-db-channel.md`(本 spec 的前置)
|
|
|
|
## 1. 背景与目标
|
|
|
|
架构 C 的 weixin-db 渠道已经把"实时监听 + 路由到 Agent"打通(`WeixinDbService.bindChannel` + `WalWatcher` + `IncrementalReader` + `routeInboundMessage`)。但当前体验有两个问题:
|
|
|
|
1. **历史消息丢失**:bindChannel 只读 wal mtime 触发后的**新行**(`WHERE create_time > MAX(create_time) at bind`),群里之前已有的几千条历史**完全读不到**。
|
|
2. **依赖 Weixin 在线**:任何"查看群聊天记录"都要 Weixin 跑着、key 还有效、IncrementalReader 在线。Weixin 退出 / 关机 / key 过期 → 全部不可用。
|
|
|
|
本 spec 引入**独立的 archive 同步管线**:用户在群聊管理页点"同步聊天记录"按钮 → 后端把该群所有历史消息从 WCDB 解出来,落地到 neta 自己的 SQLite 文件。
|
|
|
|
**核心原则**:
|
|
- archive 与监听是**两条独立链路**,共用底层 wcdb_codec / room_resolver / message_repo,但不互相阻塞、不共享状态。
|
|
- archive 文件是**永久存档**;Weixin 自己清理旧消息后,neta 这边仍可查。
|
|
- archive 只做"读 → 解 → 落地",**不做查看 UI、不解附件文件内容、不混排到 Agent session**。这些是后续独立 spec。
|
|
|
|
**显式 out-of-scope**(避免 scope creep):
|
|
- ❌ 查看/展示 UI(按钮、抽屉、聊天回放页)
|
|
- ❌ 附件文件解密(图片 .dat / 语音 SILK / 视频 / 收藏)
|
|
- ❌ 把存档消息回放到 `/agent/chat` session
|
|
- ❌ 跨 channel 合库 / 全文搜索
|
|
- ❌ 实时监听管线的任何改动(walWatcher / IncrementalReader / agent_channel.routeInboundMessage 不动)
|
|
|
|
## 2. 用户视角
|
|
|
|
### 2.1 触发
|
|
|
|
群聊管理面板每个群卡片上,在原"查看对话记录"按钮**旁边**新增一个按钮 **"同步聊天记录"**。
|
|
|
|
> 注:原"查看对话记录"按钮当前会跳到一个并不存在的 chat session(报 `Session not found`)。本 spec **不动这个按钮**(它的修复属于后续"查看"spec)。本 spec 只新增"同步"按钮。
|
|
|
|
点击后:
|
|
- 按钮变 loading + 文案"同步中..."
|
|
- 整个群卡片其它操作禁用,避免重复点
|
|
- 后端阻塞执行,5-30s 后返回结果
|
|
- toast 提示:"已同步 N 条新消息(总计 M 条)"
|
|
|
|
### 2.2 同步范围
|
|
|
|
| 触发时机 | 行为 |
|
|
|---|---|
|
|
| **首次** (sync_state.last_sync_ts === 0) | 全量:WHERE create_time > 0,把该群在 WCDB 里能看到的所有历史消息都拉一遍 |
|
|
| **后续** | 增量:WHERE create_time > sync_state.last_sync_ts,只拉自上次同步后的新消息 |
|
|
|
|
> 一万条群首次同步预计 5-15 秒(本地 SQLite + zstd 解压 + 写入)。增量同步通常秒级。
|
|
|
|
### 2.3 错误反馈
|
|
|
|
| 失败场景 | 提示 |
|
|
|---|---|
|
|
| 频道未连接 (loginStatus ≠ connected) | "频道未连接,请检查 PC 微信是否登录" |
|
|
| 群不在 WCDB(对方退群 / 群被解散) | "未找到该群消息,可能群已不存在" |
|
|
| key 已失效(Weixin 升级 / 重登) | "登录态失效,请重新触发频道连接" |
|
|
| 解密失败 | "解密失败,请联系管理员"(error 写日志) |
|
|
| 非 Windows 平台 | "本功能需要 Windows + PC 微信" |
|
|
|
|
## 3. 整体架构
|
|
|
|
```
|
|
[群聊管理 frontend]
|
|
│ 点击"同步聊天记录"按钮
|
|
▼
|
|
POST /admin/netaclaw/agent_channel_group/sync { groupId }
|
|
│
|
|
▼
|
|
[NetaClawAgentChannelGroupController.sync]
|
|
│
|
|
▼
|
|
[WeixinArchiveSyncService.syncGroup(groupId)]
|
|
│
|
|
├─ 加 channel-level mutex(同 channel 串行,避免双写 sqlite)
|
|
│
|
|
├─ 取 group + channel; 校验 channel.type === 'weixin-db' && loginStatus === 'connected'
|
|
│
|
|
├─ 取 WeixinDbService.runtimes.get(channelId) 拿到已 bind 的 reader/key/paths
|
|
│ ↑ archive 不重新 spawn ps1, 复用 bindChannel 已抽好的 key
|
|
│
|
|
├─ ArchiveDb.openOrCreate(channelId) → <dataDir>/weixin-archive/cid-<id>.db
|
|
│ └─ CREATE TABLE IF NOT EXISTS message / sync_state
|
|
│
|
|
├─ 解密 message_0.db → 临时明文 (★ 独立 work 目录, 见 §3.1 工作目录隔离)
|
|
│
|
|
├─ 用 IncrementalReader.getRoomResolver().findRoomsByName(group.roomName)
|
|
│ → 拿到 Msg_<sha> 表名 + roomInfo
|
|
│ ↑ 复用 bindChannel 时已加载好的 session.db + contact.db 映射;
|
|
│ archive 自身不再解 session.db/contact.db (只解 message_0.db),
|
|
│ 减少一半 IO 且避免与监听链路争抢 RoomResolver 重建。
|
|
│
|
|
├─ lastSyncTs = ArchiveDb.getSyncState(roomId).lastSyncTs (首次=0)
|
|
│
|
|
├─ 分批 SELECT * FROM Msg_<sha> WHERE create_time > ? ORDER BY create_time ASC LIMIT 500
|
|
│ └─ 流式分页,避免一次加载万条
|
|
│
|
|
├─ 对每行: zstd 解 message_content → 解析 sender prefix → 投影成 ArchiveMessage
|
|
│
|
|
├─ INSERT OR IGNORE INTO message (...) [batch transaction]
|
|
│ └─ UNIQUE(room_id, local_id) 保证幂等
|
|
│
|
|
├─ UPDATE sync_state SET last_sync_ts = MAX(create_time), last_sync_count = N, last_error = NULL
|
|
│
|
|
├─ 任意失败 → catch 块尝试 best-effort 写 sync_state.last_error 后重新 throw
|
|
│
|
|
├─ 释放锁 (try/finally 保证 archive db + repo 都 close)
|
|
│
|
|
└─ 返回 { newCount, totalCount, durationMs, lastSyncTs }
|
|
```
|
|
|
|
**关键复用**:
|
|
- `wcdb_codec.decryptDatabase` —— 解密 WCDB(Phase C-2 Task 7)
|
|
- `RoomResolver.findRoomsByName`(新增,本 spec)—— roomName → 群信息(支持同名歧义)
|
|
- `MessageRepo.listSince` —— 流式 SELECT + zstd 解(Phase C-2 Task 11)
|
|
- 公共 `decryptDbToWorkDir`(从 IncrementalReader 抽出)—— 拷贝 + 解密整 db
|
|
- `WeixinDbService.getRuntime(cid)`(新增,本 spec)—— 已 bind 的 reader / paths / messageKey
|
|
|
|
archive 不重新 spawn ps1 抽 key —— **必须先 bindChannel 成功**。这意味着同步按钮的前置条件是"频道已连接"。
|
|
|
|
### 3.1 工作目录隔离 (★ 关键)
|
|
|
|
`IncrementalReader` 监听过程每 ~500ms 把 `message_0.db` 解密**覆盖写到** `<dataDir>/weixin-db-work/cid-<id>/decrypted/message_0.db`。如果 archive sync **复用同一个 workDir**:
|
|
|
|
```
|
|
监听线程: fs.writeFileSync(workDir/decrypted/message_0.db, ...) ← 持续覆盖
|
|
archive 线程: better-sqlite3 readonly open(workDir/decrypted/message_0.db) ← 同一文件
|
|
```
|
|
|
|
**会读到撕裂的文件**(写到一半被读)→ 可能 SIGBUS / corrupt page / wrong key 误报。
|
|
|
|
**强制约束**:archive 必须用**独立子目录**,本 spec 固定为:
|
|
|
|
```
|
|
<dataDir>/weixin-db-work/cid-<id>/ ← 监听 (IncrementalReader)
|
|
<dataDir>/weixin-db-work/cid-<id>/archive/ ← archive sync (本 spec)
|
|
```
|
|
|
|
实现:`WeixinArchiveSyncService.doSyncGroup` 计算 `workDir = path.join(dataDir, 'weixin-db-work', \`cid-${cid}\`, 'archive')`,传给 `decryptDbToWorkDir`。
|
|
|
|
监听链路代码**不动**(它已经在用 `cid-<id>/` 根目录)。仅 archive 进入子目录。
|
|
|
|
### 3.2 资源释放与失败恢复
|
|
|
|
- archive sqlite handle 和 MessageRepo 都用 `try/finally` 保证 `close()`,不能只 close 一个。
|
|
- 任意异常 → 在解出 `room.username` 之**后**的失败,best-effort 写 `sync_state.last_error`(再失败也吞掉,不掩盖原异常)。解出 room 之**前**的失败只 log,不写 sync_state(无 room_id 主键)。
|
|
- channel-level mutex 的释放必须放 `finally`,即使 doSyncGroup 抛错也要释放,否则下次同 channel 永远等待。
|
|
|
|
## 4. 数据模型
|
|
|
|
### 4.1 文件位置
|
|
|
|
```
|
|
<dataDir>/weixin-archive/
|
|
├── cid-6.db ← channel 6 (各群消息合存一文件)
|
|
├── cid-6.db-wal ← SQLite WAL (better-sqlite3 默认 WAL mode)
|
|
└── cid-7.db ← channel 7
|
|
```
|
|
|
|
每个 channel 一个独立 db 文件。理由:
|
|
- 频道删除时 cascade rm 整个文件,不留垃圾
|
|
- 单 channel 同步只锁自己的文件,跨 channel 并发互不影响
|
|
- 备份/迁移单 channel 简单
|
|
|
|
### 4.2 表结构
|
|
|
|
```sql
|
|
-- 消息主表
|
|
CREATE TABLE IF NOT EXISTS message (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
channel_id INTEGER NOT NULL, -- 冗余,便于以后跨库合并
|
|
room_id TEXT NOT NULL, -- 群 username,如 xxx@chatroom
|
|
room_name TEXT, -- 同步时刻的显示名 (可能变,仅参考)
|
|
msg_table TEXT NOT NULL, -- 来源 'Msg_<md5>',方便回查源
|
|
local_id INTEGER NOT NULL, -- WCDB local_id
|
|
server_id TEXT, -- WCDB server_id (BigInt → TEXT 避免精度丢失)
|
|
local_type INTEGER NOT NULL, -- 1 文本 / 3 图片 / 34 语音 / 43 视频 / 49 应用消息(含引用) ...
|
|
sender_wxid TEXT, -- 解析自 message_content "wxid_xxx:\n" 前缀
|
|
create_time INTEGER NOT NULL, -- Unix 秒
|
|
content TEXT, -- zstd 解压后的 UTF-8 (文本消息=正文; 非文本=完整 XML)
|
|
raw_hex TEXT, -- 当 message_content 非 UTF-8 / zstd 解压失败时的兜底 hex (诊断用)
|
|
synced_at INTEGER NOT NULL, -- 入档时间戳 (Unix 秒)
|
|
UNIQUE(room_id, local_id)
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_msg_room_time ON message(room_id, create_time DESC);
|
|
CREATE INDEX IF NOT EXISTS idx_msg_create_time ON message(create_time DESC);
|
|
|
|
-- 每群的同步水位
|
|
CREATE TABLE IF NOT EXISTS sync_state (
|
|
room_id TEXT PRIMARY KEY,
|
|
room_name TEXT,
|
|
last_sync_ts INTEGER NOT NULL DEFAULT 0, -- create_time 高水位
|
|
last_sync_count INTEGER NOT NULL DEFAULT 0, -- 上次同步入档数 (新行)
|
|
total_count INTEGER NOT NULL DEFAULT 0, -- 该 room 累计存档总数 (本表内 COUNT(*))
|
|
last_sync_at INTEGER NOT NULL, -- 上次完成时间戳 (Unix 秒)
|
|
last_error TEXT -- 上次失败原因 (成功时清空)
|
|
);
|
|
```
|
|
|
|
**字段决策说明**:
|
|
|
|
- `local_type` 不做枚举映射,**留 raw 数字**。WCDB type 体系复杂(49 下还有 sub-type),应用层有需要再解析,数据库层就只是数字。
|
|
- `content` 对于文本消息(type=1)就是消息正文;对于 type 3/34/43/49 等,是**完整的 message_content XML**(zstd 解后)。这样未来要做附件、引用、卡片解析,**不用回头重新读 WCDB** —— 数据已经在 archive 里了。
|
|
- `raw_hex` 是兜底 —— 极少数情况 zstd 解失败 / 数据损坏 / 二进制内容,把原始字节存 hex,避免数据丢失。正常 99%+ 的消息只填 `content` 不填 `raw_hex`。
|
|
- `UNIQUE(room_id, local_id)` 保证**重复点同步幂等**:同一条消息再次 INSERT OR IGNORE 不会写重。
|
|
|
|
## 5. 后端代码结构
|
|
|
|
### 5.1 新增文件
|
|
|
|
| 文件 | 责任 |
|
|
|---|---|
|
|
| `runtime/weixin_db/archive_db.ts` | better-sqlite3 包装:openOrCreate / migrate / insertMessages(batch) / getSyncState / updateSyncState / countMessages |
|
|
| `runtime/weixin_db/archive_decryptor.ts` | 抽出 IncrementalReader.decryptToWork 的公共逻辑(拷贝 + 解密整 db → 临时文件)。**Refactor**: IncrementalReader 改用此模块,不再内联实现。 |
|
|
| `service/weixin_archive_sync.ts` | 主服务:`syncGroup(groupId): Promise<SyncResult>` + channel-level mutex |
|
|
| `test/.../archive_db.test.ts` | 表创建 / 重复插入幂等 / sync_state 读写 |
|
|
| `test/.../weixin_archive_sync.test.ts` | mock IncrementalReader + WeixinDbService,跑同步流程 |
|
|
|
|
### 5.2 修改文件
|
|
|
|
| 文件 | 改动 |
|
|
|---|---|
|
|
| `service/agent_channel_group.ts` | 新增 `sync(groupId)` 委托给 `WeixinArchiveSyncService.syncGroup` |
|
|
| `controller/admin/agent_channel_group.ts` | 新增 `@Post('/sync')` endpoint |
|
|
| `service/agent_channel.ts` | `delete(ids)` cascade 删 `<dataDir>/weixin-archive/cid-{id}.db`(避免泄漏) |
|
|
| `runtime/weixin_db/incremental_reader.ts` | `decryptToWork` 提取到 `archive_decryptor.ts`,本文件 import 复用 |
|
|
| `frontend/.../channel-group-panel.vue` | 群卡片新增"同步聊天记录"按钮 + handler + loading 态 |
|
|
|
|
### 5.3 接口契约
|
|
|
|
#### 5.3.1 后端 API
|
|
|
|
```
|
|
POST /admin/netaclaw/agent_channel_group/sync
|
|
Body: { groupId: number }
|
|
Response success:
|
|
{
|
|
code: 1000,
|
|
data: {
|
|
newCount: number, // 本次新入档消息数
|
|
totalCount: number, // 该群累计入档总数
|
|
lastSyncTs: number, // 新水位 (Unix 秒)
|
|
durationMs: number
|
|
}
|
|
}
|
|
Response error:
|
|
{ code: 1003, message: string }
|
|
```
|
|
|
|
错误码使用现有体系(channel-not-connected / room-not-found / decrypt-failed / unsupported-platform)。
|
|
|
|
#### 5.3.2 Service 接口
|
|
|
|
```ts
|
|
// service/weixin_archive_sync.ts
|
|
@Provide()
|
|
@Scope(ScopeEnum.Singleton)
|
|
export class WeixinArchiveSyncService {
|
|
async syncGroup(groupId: number): Promise<SyncResult>;
|
|
async deleteChannelArchive(channelId: number): Promise<void>; // delete cascade 用
|
|
}
|
|
|
|
export interface SyncResult {
|
|
newCount: number;
|
|
totalCount: number;
|
|
lastSyncTs: number;
|
|
durationMs: number;
|
|
}
|
|
```
|
|
|
|
```ts
|
|
// runtime/weixin_db/archive_db.ts
|
|
export class ArchiveDb {
|
|
static openOrCreate(channelId: number, dataDir: string): ArchiveDb;
|
|
insertMessages(rows: ArchiveMessage[]): number; // 返回 INSERT OR IGNORE 真插入数
|
|
getSyncState(roomId: string): SyncState | null;
|
|
updateSyncState(roomId: string, patch: Partial<SyncState>): void;
|
|
countMessages(roomId: string): number;
|
|
close(): void;
|
|
|
|
static archivePath(channelId: number, dataDir: string): string; // for cascade delete
|
|
}
|
|
|
|
export interface ArchiveMessage {
|
|
channelId: number;
|
|
roomId: string;
|
|
roomName: string | null;
|
|
msgTable: string;
|
|
localId: number;
|
|
serverId: string | null;
|
|
localType: number;
|
|
senderWxid: string | null;
|
|
createTime: number;
|
|
content: string | null;
|
|
rawHex: string | null;
|
|
}
|
|
|
|
export interface SyncState {
|
|
roomId: string;
|
|
roomName: string | null;
|
|
lastSyncTs: number;
|
|
lastSyncCount: number;
|
|
totalCount: number;
|
|
lastSyncAt: number;
|
|
lastError: string | null;
|
|
}
|
|
```
|
|
|
|
```ts
|
|
// runtime/weixin_db/archive_decryptor.ts
|
|
export function decryptDbToWorkDir(opts: {
|
|
srcDb: string;
|
|
rawKey: Buffer;
|
|
workDir: string;
|
|
}): string; // returns plaintext db path
|
|
```
|
|
|
|
### 5.4 并发控制
|
|
|
|
**Channel-level mutex**,不是 group-level:
|
|
|
|
```ts
|
|
private readonly channelLocks = new Map<number, Promise<void>>();
|
|
|
|
async syncGroup(groupId: number): Promise<SyncResult> {
|
|
const group = ...;
|
|
const channelId = group.channelId;
|
|
const prev = this.channelLocks.get(channelId) ?? Promise.resolve();
|
|
let release: () => void;
|
|
const next = new Promise<void>(r => (release = r));
|
|
this.channelLocks.set(channelId, prev.then(() => next));
|
|
|
|
await prev;
|
|
try {
|
|
return await this.doSyncGroup(group);
|
|
} finally {
|
|
release!();
|
|
if (this.channelLocks.get(channelId) === next) this.channelLocks.delete(channelId);
|
|
}
|
|
}
|
|
```
|
|
|
|
理由:同一个 cid-X.db 文件 SQLite better-sqlite3 是**同步 single-writer**,跨群同时写同一 db 会抢锁;channel-level 串行最简单稳妥。跨 channel 并发不互相阻塞。
|
|
|
|
### 5.5 同步循环细节
|
|
|
|
```ts
|
|
async doSyncGroup(group: GroupEntity): Promise<SyncResult> {
|
|
// 1. 前置校验
|
|
const channel = await this.channelRepo.findOne({ where: { id: group.channelId } });
|
|
if (channel?.type !== 'weixin-db') throw new Error('only weixin-db supported');
|
|
if (channel.loginStatus !== 'connected') throw new Error('channel-not-connected');
|
|
if (process.platform !== 'win32') throw new Error('unsupported-platform');
|
|
|
|
const runtime = this.weixinDbService.getRuntime(channel.id); // 新增 getter
|
|
if (!runtime) throw new Error('channel-not-bound');
|
|
|
|
// 2. 解整库到 work
|
|
const plainMsgDb = decryptDbToWorkDir({
|
|
srcDb: runtime.paths.messageDb,
|
|
rawKey: runtime.messageKey,
|
|
workDir: path.join(resolveDataDir(), 'weixin-db-work', `cid-${channel.id}`),
|
|
});
|
|
|
|
// 3. 解析 roomName → tableName (用 runtime 已加载好的 RoomResolver)
|
|
const matches = runtime.reader.findRoomsByName(group.roomName); // 新增方法
|
|
if (matches.length === 0) throw new Error('room-not-found');
|
|
if (matches.length > 1) {
|
|
// 同名群,警告;按 lastSeenAt 取最近的那个
|
|
this.logger.warn('[archive-sync] %d rooms with same name=%s, using most recent',
|
|
matches.length, group.roomName);
|
|
}
|
|
const room = matches[0];
|
|
|
|
// 4. 打开 archive db
|
|
const archive = ArchiveDb.openOrCreate(channel.id, resolveDataDir());
|
|
|
|
// 5. 取水位 + 分批拉
|
|
const t0 = Date.now();
|
|
const state = archive.getSyncState(room.username);
|
|
const sinceTs = state?.lastSyncTs ?? 0;
|
|
const repo = new MessageRepo(plainMsgDb);
|
|
let newCount = 0;
|
|
let cursorTs = sinceTs;
|
|
try {
|
|
while (true) {
|
|
const rows = await repo.listSince(room.tableName, cursorTs, 500);
|
|
if (rows.length === 0) break;
|
|
const archiveRows = rows.map(r => projectToArchive(r, channel.id, room));
|
|
newCount += archive.insertMessages(archiveRows);
|
|
cursorTs = rows[rows.length - 1].createTime;
|
|
if (rows.length < 500) break;
|
|
}
|
|
} finally {
|
|
repo.close();
|
|
}
|
|
|
|
// 6. 更新水位
|
|
const totalCount = archive.countMessages(room.username);
|
|
archive.updateSyncState(room.username, {
|
|
roomName: room.roomName,
|
|
lastSyncTs: cursorTs,
|
|
lastSyncCount: newCount,
|
|
totalCount,
|
|
lastSyncAt: Math.floor(Date.now() / 1000),
|
|
lastError: null,
|
|
});
|
|
archive.close();
|
|
|
|
return { newCount, totalCount, lastSyncTs: cursorTs, durationMs: Date.now() - t0 };
|
|
}
|
|
```
|
|
|
|
`projectToArchive` 是纯函数,把 `MessageRow` 投影成 `ArchiveMessage`。**与 `build_pseudo.ts` 解析 sender prefix 的逻辑共用一份**(抽出 `parseSenderPrefix` 到 weixin_db/types 或类似公共位置,避免双份实现漂移)。
|
|
|
|
## 6. 前端
|
|
|
|
### 6.1 改动文件
|
|
|
|
`packages/frontend/src/modules/agent/components/channel-group-panel.vue`
|
|
|
|
### 6.2 UI 改动
|
|
|
|
群卡片操作区,在现有按钮旁新增:
|
|
|
|
```vue
|
|
<el-button
|
|
size="small"
|
|
:loading="group._syncing"
|
|
@click="handleSync(group)"
|
|
>
|
|
{{ group._syncing ? '同步中...' : '同步聊天记录' }}
|
|
</el-button>
|
|
```
|
|
|
|
群卡片元数据区可显示:
|
|
- 上次同步时间(读自 group.lastSyncAt,后端 list 返回时附加,见 §6.4)
|
|
- 上次同步条数 / 累计存档数
|
|
|
|
### 6.3 handler
|
|
|
|
```ts
|
|
async function handleSync(group: GroupItem) {
|
|
group._syncing = true;
|
|
try {
|
|
const resp = await apiPost('/admin/netaclaw/agent_channel_group/sync', { groupId: group.id });
|
|
if (resp.code !== 1000) {
|
|
ElMessage.error(resp.message || '同步失败');
|
|
return;
|
|
}
|
|
const { newCount, totalCount, durationMs } = resp.data;
|
|
ElMessage.success(`已同步 ${newCount} 条新消息(总计 ${totalCount} 条,${(durationMs/1000).toFixed(1)}s)`);
|
|
await loadList(); // 拉最新元数据回填
|
|
} catch (err: any) {
|
|
ElMessage.error(err?.message || '同步失败');
|
|
} finally {
|
|
group._syncing = false;
|
|
}
|
|
}
|
|
```
|
|
|
|
### 6.4 list 接口元数据扩展(★ **不在本 spec**,留给查看 spec)
|
|
|
|
群卡片显示"上次同步时间 / 累计存档数"是个不错的体验加分,但需要 list 接口对每个 weixin-db 群打开 ArchiveDb 读 sync_state,本 spec 不做。
|
|
|
|
理由:
|
|
- 本 spec 焦点是同步本身,UI 元数据回显属于"展示"范畴
|
|
- 同步完成后 toast 已经告诉用户 newCount / totalCount,够用
|
|
- 未来"查看聊天记录"功能(独立 spec)会重做 list 接口或新增 archive list 接口,届时再统一加
|
|
|
|
## 7. 错误处理
|
|
|
|
| 场景 | 后端处理 | 前端提示 |
|
|
|---|---|---|
|
|
| `channel.type !== 'weixin-db'` | throw `unsupported-channel-type` | "本频道不支持同步" |
|
|
| `loginStatus !== 'connected'` | throw `channel-not-connected` | "频道未连接,请检查 PC 微信是否登录" |
|
|
| `WeixinDbService.runtimes` 不含该 channel | throw `channel-not-bound` | "频道未连接,请检查 PC 微信是否登录" |
|
|
| RoomResolver 找不到 roomName | throw `room-not-found` | "未找到该群消息,可能群已不存在" |
|
|
| 同名群多个 → warn,取 lastSeenAt 最新的 | log warn | (用户无感) |
|
|
| `decryptDbToWorkDir` 失败(key 失效) | throw `decrypt-failed` | "登录态失效,请重新触发频道连接" |
|
|
| `MessageRepo` SQL 失败 | throw,日志 + lastError 写 sync_state | "同步失败:<原因>" |
|
|
| 跨 channel 并发同步 | channel-level mutex 串行,不报错 | (用户无感,只是排队) |
|
|
| 同 channel 同时同步两个群 | mutex 串行 | (用户无感) |
|
|
| 同 group 重复点 | mutex 自然排队;前端 _syncing flag 也阻止 | 按钮 loading 中无法再点 |
|
|
| sqlite WAL 损坏 / 磁盘满 | throw,日志,sync_state.last_error 留痕 | "同步失败:<原因>" |
|
|
| 非 Windows | throw `unsupported-platform` | "本功能需要 Windows + PC 微信" |
|
|
|
|
`lastError` 写到 `sync_state.last_error` 后,下次同步成功会被清空,便于排查。
|
|
|
|
## 8. 测试策略
|
|
|
|
### 8.1 单元测试(可在 Linux/Mac 跑)
|
|
|
|
| 测试文件 | 覆盖 |
|
|
|---|---|
|
|
| `archive_db.test.ts` | openOrCreate 创建表 / 重复 INSERT OR IGNORE 幂等 / sync_state CRUD / archivePath 计算 |
|
|
| `weixin_archive_sync.test.ts` | mock WeixinDbService.runtimes + IncrementalReader, 跑 syncGroup 流程,断言 newCount/totalCount/sqlite 内容 |
|
|
| `weixin_archive_sync.concurrency.test.ts` | mock 慢速 syncGroup,并发调两次同 channel → 串行;并发不同 channel → 并行 |
|
|
| `parseSenderPrefix.test.ts`(若抽公共) | 文本消息 / 引用消息 / 系统消息 sender 解析 |
|
|
|
|
### 8.2 集成测试(条件 skip,Windows-only)
|
|
|
|
| 测试 | 前置 | 内容 |
|
|
|---|---|---|
|
|
| `archive_e2e.test.ts` | Weixin 登录 + 至少一个测试群且已 bindChannel | 触发同步 → 断言 archive db 有行 → lastSyncTs 单调递增 |
|
|
|
|
### 8.3 手工冒烟(E2E checklist)
|
|
|
|
放在 plan 的 Phase 末尾。
|
|
|
|
---
|
|
|
|
## 9. 里程碑
|
|
|
|
本 spec 较小,**单 plan 单 milestone**,不分 M1/M2:
|
|
|
|
| 阶段 | 任务 | 产出 |
|
|
|---|---|---|
|
|
| **A. ArchiveDb 模块**(TDD) | 写 archive_db.ts + 单测 | 表创建 / 幂等插入 / sync_state CRUD |
|
|
| **B. archive_decryptor 抽出** | 重构 IncrementalReader 用同款逻辑 | 不破坏现有监听链路(回归测试) |
|
|
| **C. WeixinArchiveSyncService 主服务**(TDD) | 写 service + 单测(mock) | syncGroup 流程闭环 |
|
|
| **D. WeixinDbService 暴露 runtime getter + RoomResolver 多名匹配** | 改动最小 | reader.findRoomsByName |
|
|
| **E. Controller endpoint** | `POST /sync` | API 通 |
|
|
| **F. agent_channel.delete cascade rm archive 文件** | 防止泄漏 | rm `cid-X.db` 在 delete 时 |
|
|
| **G. 前端按钮 + handler** | UI + apiPost | 用户能点能看 toast |
|
|
| **H. list 接口附加 archive 元数据**(可选) | 群卡片显示上次同步时间 | 用户体验加分 |
|
|
| **I. E2E 手工冒烟** | Windows + 真群 | 通过即合并 |
|
|
|
|
每阶段一个 commit。
|
|
|
|
---
|
|
|
|
## 10. 已对齐的关键决策(避免实施时反悔)
|
|
|
|
| 决策 | 结论 | 理由 |
|
|
|---|---|---|
|
|
| 文件粒度 | per-channel (`cid-X.db`) | cascade 删除简单,跨 channel 无锁竞争 |
|
|
| 同步策略 | 首次全量 + 后续增量(基于 `last_sync_ts` 水位) | 自然语义,重复点幂等 |
|
|
| 同步反馈 | 后端阻塞,前端 loading | 首次最长 ~30s,简单可接受 |
|
|
| 附件 | 只存 raw XML 到 `content`,不解附件文件 | 数据不丢,未来再做附件解析 spec |
|
|
| 与监听的关系 | 两条独立链路,共享底层模块 | archive 不依赖 walWatcher,监听不依赖 archive |
|
|
| 工作目录 | archive 用 `cid-<id>/archive/` 子目录,与监听隔离 | 防止与监听 500ms 覆盖写 `message_0.db` 撕裂 |
|
|
| 前置条件 | 频道必须已 `bindChannel` 成功 (loginStatus=connected) | 复用已抽好的 key,archive 不重新 spawn ps1 |
|
|
| 并发 | channel-level mutex | better-sqlite3 是 single-writer,channel 串行最稳 |
|
|
| 复用 RoomResolver | archive 只用 `runtime.reader.getRoomResolver()`,不重建 | 避免 archive 也解 session.db/contact.db,IO 减半 + 状态一致 |
|
|
| 失败时 sync_state | best-effort 写 `last_error`,不掩盖原异常 | 便于排查,同步链路对一致性容忍度高 |
|
|
| 查看 UI | **不在本 spec** | 后续独立 spec |
|
|
| 重复点行为 | 幂等(`INSERT OR IGNORE`) | 用户无心理负担 |
|
|
| 群名匹配 | trim 后精确等于 | `addByName` 已 trim 入库,RoomResolver 输入也 trim 兜底 |
|
|
|
|
---
|
|
|
|
## 11. 风险与兜底
|
|
|
|
| 风险 | 概率 | 兜底 |
|
|
|---|---|---|
|
|
| WCDB schema 在 Weixin 新版本变化 | 中 | listMsgTables / listSince 已有,沿用现有抽象;若变,改 message_repo 即可 |
|
|
| 同名群歧义 | 低 | warn log + 取首条;未来支持用 username 直接选(下一个 spec) |
|
|
| sqlite 文件损坏 | 低 | better-sqlite3 WAL mode + 单写;损坏时手动 rm 文件,下次同步全量重建 |
|
|
| 万条群同步超时 | 中 | 分批 500 条 LIMIT 流式;若仍超 30s,改用后台 job 模式(非本 spec) |
|
|
| 用户在同步中删除频道 | 低 | mutex 持有 archive db 文件句柄;syncGroup 抛错 → 前端提示;cascade delete 在 mutex 释放后执行 |
|
|
| ★ archive 与监听双写 workDir 解密文件撕裂 | **高(若不隔离)** | **必须用 `cid-<id>/archive/` 子目录,见 §3.1** |
|
|
| ★ catch 路径漏 close archive db handle | 中 | `try/finally` 保证 archive + repo 都 close(spec §3.2) |
|
|
| 异常未写 sync_state.last_error 难排查 | 中 | best-effort 写 `last_error`,见 §3.2 |
|
|
| 群名首尾空格 / 不可见字符 | 低 | `addByName` 已 trim;`findRoomsByName` 内部也 trim 比较 |
|
|
| 漏释放 mutex 导致 channel 永远阻塞 | 低 | release 放 finally,doSyncGroup 抛错也释放 |
|
|
|
|
---
|
|
|
|
## 12. Self-Review(spec 写完检查)
|
|
|
|
- [x] **Placeholder 扫描**:无 TBD/TODO,所有字段已定。
|
|
- [x] **内部一致性**:数据模型 §4.2 字段与 service §5.3 ArchiveMessage interface 对齐;接口契约与 controller / service 调用一致。
|
|
- [x] **scope 检查**:已显式列出 out-of-scope(查看 UI / 附件解密 / 实时监听不动);本 spec 焦点收敛。
|
|
- [x] **歧义检查**:
|
|
- "全量"明确为 `WHERE create_time > 0`(不是"重置 + 全部覆盖")。
|
|
- "上次同步条数" = `INSERT OR IGNORE` 实际入库数,不是 SELECT 出来的数。
|
|
- "totalCount" = 当前 archive 表 `COUNT(*) WHERE room_id = ?`,不是 WCDB 源数。
|
|
- [x] **决策点都有理由**:每个决策都附了"理由"列。
|